Windows process issues (#7739)

This commit is contained in:
mehrdadn 2020-03-29 12:48:32 -07:00 committed by GitHub
parent 6ce8b63bb6
commit fc23f79f82
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 150 additions and 34 deletions

View file

@ -6,9 +6,9 @@ import os
import logging import logging
import signal import signal
import socket import socket
import subprocess
import sys import sys
import tempfile import tempfile
import threading
import time import time
import ray import ray
@ -185,7 +185,7 @@ class Node:
self.kill_all_processes(check_alive=False, allow_graceful=True) self.kill_all_processes(check_alive=False, allow_graceful=True)
sys.exit(1) sys.exit(1)
signal.signal(signal.SIGTERM, sigterm_handler) ray.utils.set_sigterm_handler(sigterm_handler)
def _init_temp(self, redis_client): def _init_temp(self, redis_client):
# Create an dictionary to store temp file index. # Create an dictionary to store temp file index.
@ -718,20 +718,16 @@ class Node:
time.sleep(0.1) time.sleep(0.1)
if allow_graceful: if allow_graceful:
# Allow the process one second to exit gracefully.
process.terminate() process.terminate()
timer = threading.Timer(1, lambda process: process.kill(), # Allow the process one second to exit gracefully.
[process]) timeout_seconds = 1
try: try:
timer.start() process.wait(timeout_seconds)
process.wait() except subprocess.TimeoutExpired:
finally: pass
timer.cancel()
if process.poll() is not None: # If the process did not exit, force kill it.
continue if process.poll() is None:
# If the process did not exit within one second, force kill it.
process.kill() process.kill()
# The reason we usually don't call process.wait() here is that # The reason we usually don't call process.wait() here is that
# there's some chance we'd end up waiting a really long time. # there's some chance we'd end up waiting a really long time.

View file

@ -1,10 +1,12 @@
import collections import collections
import errno import errno
import io
import json import json
import logging import logging
import multiprocessing import multiprocessing
import os import os
import random import random
import signal
import socket import socket
import subprocess import subprocess
import sys import sys
@ -76,6 +78,32 @@ ProcessInfo = collections.namedtuple("ProcessInfo", [
]) ])
class ConsolePopen(subprocess.Popen):
if sys.platform == "win32":
def terminate(self):
if isinstance(self.stdin, io.IOBase):
self.stdin.close()
if self._use_signals:
self.send_signal(signal.CTRL_BREAK_EVENT)
else:
super(ConsolePopen, self).terminate()
def __init__(self, *args, **kwargs):
# CREATE_NEW_PROCESS_GROUP is used to send Ctrl+C on Windows:
# https://docs.python.org/3/library/subprocess.html#subprocess.Popen.send_signal
new_pgroup = subprocess.CREATE_NEW_PROCESS_GROUP
flags = 0
if ray.utils.detect_fate_sharing_support():
# If we don't have kernel-mode fate-sharing, then don't do this
# because our children need to be in out process group for
# the process reaper to properly terminate them.
flags = new_pgroup
kwargs.setdefault("creationflags", flags)
self._use_signals = (kwargs["creationflags"] & new_pgroup)
super(ConsolePopen, self).__init__(*args, **kwargs)
def address(ip_address, port): def address(ip_address, port):
return ip_address + ":" + str(port) return ip_address + ":" + str(port)
@ -464,7 +492,7 @@ def start_ray_process(command,
if fate_share and sys.platform.startswith("linux"): if fate_share and sys.platform.startswith("linux"):
ray.utils.set_kill_on_parent_death_linux() ray.utils.set_kill_on_parent_death_linux()
process = subprocess.Popen( process = ConsolePopen(
command, command,
env=modified_env, env=modified_env,
cwd=cwd, cwd=cwd,

View file

@ -5,6 +5,7 @@ import inspect
import logging import logging
import numpy as np import numpy as np
import os import os
import signal
import subprocess import subprocess
import sys import sys
import tempfile import tempfile
@ -561,10 +562,12 @@ def detect_fate_sharing_support_win32():
# https://docs.microsoft.com/en-us/windows/win32/api/jobapi2/nf-jobapi2-setinformationjobobject # https://docs.microsoft.com/en-us/windows/win32/api/jobapi2/nf-jobapi2-setinformationjobobject
JobObjectExtendedLimitInformation = 9 JobObjectExtendedLimitInformation = 9
JOB_OBJECT_LIMIT_BREAKAWAY_OK = 0x00000800 JOB_OBJECT_LIMIT_BREAKAWAY_OK = 0x00000800
JOB_OBJECT_LIMIT_DIE_ON_UNHANDLED_EXCEPTION = 0x00000400
JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE = 0x00002000 JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE = 0x00002000
buf = JOBOBJECT_EXTENDED_LIMIT_INFORMATION() buf = JOBOBJECT_EXTENDED_LIMIT_INFORMATION()
buf.BasicLimitInformation.LimitFlags = ( buf.BasicLimitInformation.LimitFlags = (
JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE
| JOB_OBJECT_LIMIT_DIE_ON_UNHANDLED_EXCEPTION
| JOB_OBJECT_LIMIT_BREAKAWAY_OK) | JOB_OBJECT_LIMIT_BREAKAWAY_OK)
infoclass = JobObjectExtendedLimitInformation infoclass = JobObjectExtendedLimitInformation
if not kernel32.SetInformationJobObject( if not kernel32.SetInformationJobObject(
@ -636,6 +639,17 @@ def set_kill_child_on_death_win32(child_proc):
assert False, "AssignProcessToJobObject used despite being unavailable" assert False, "AssignProcessToJobObject used despite being unavailable"
def set_sigterm_handler(sigterm_handler):
"""Registers a handler for SIGTERM in a platform-compatible manner."""
if sys.platform == "win32":
# Note that these signal handlers only work for console applications.
# TODO(mehrdadn): implement graceful process termination mechanism
# SIGINT is Ctrl+C, SIGBREAK is Ctrl+Break.
signal.signal(signal.SIGBREAK, sigterm_handler)
else:
signal.signal(signal.SIGTERM, sigterm_handler)
def try_make_directory_shared(directory_path): def try_make_directory_shared(directory_path):
try: try:
os.chmod(directory_path, 0o0777) os.chmod(directory_path, 0o0777)

View file

@ -422,7 +422,7 @@ class Worker:
shutdown(True) shutdown(True)
sys.exit(1) sys.exit(1)
signal.signal(signal.SIGTERM, sigterm_handler) ray.utils.set_sigterm_handler(sigterm_handler)
self.core_worker.run_task_loop() self.core_worker.run_task_loop()
sys.exit(0) sys.exit(0)
@ -882,7 +882,7 @@ def sigterm_handler(signum, frame):
try: try:
signal.signal(signal.SIGTERM, sigterm_handler) ray.utils.set_sigterm_handler(sigterm_handler)
except ValueError: except ValueError:
logger.warning("Failed to set SIGTERM handler, processes might" logger.warning("Failed to set SIGTERM handler, processes might"
"not be cleaned up properly on exit.") "not be cleaned up properly on exit.")
@ -1222,13 +1222,17 @@ def connect(node,
# Redirect stdout/stderr at the file descriptor level. If we simply # Redirect stdout/stderr at the file descriptor level. If we simply
# set sys.stdout and sys.stderr, then logging from C++ can fail to # set sys.stdout and sys.stderr, then logging from C++ can fail to
# be redirected. # be redirected.
if log_stdout_file is not None:
os.dup2(log_stdout_file.fileno(), sys.stdout.fileno()) os.dup2(log_stdout_file.fileno(), sys.stdout.fileno())
if log_stderr_file is not None:
os.dup2(log_stderr_file.fileno(), sys.stderr.fileno()) os.dup2(log_stderr_file.fileno(), sys.stderr.fileno())
# We also manually set sys.stdout and sys.stderr because that seems # We also manually set sys.stdout and sys.stderr because that seems
# to have an affect on the output buffering. Without doing this, # to have an affect on the output buffering. Without doing this,
# stdout and stderr are heavily buffered resulting in seemingly # stdout and stderr are heavily buffered resulting in seemingly
# lost logging statements. # lost logging statements.
if log_stdout_file is not None:
sys.stdout = log_stdout_file sys.stdout = log_stdout_file
if log_stderr_file is not None:
sys.stderr = log_stderr_file sys.stderr = log_stderr_file
# This should always be the first message to appear in the worker's # This should always be the first message to appear in the worker's
# stdout and stderr log files. The string "Ray worker pid:" is # stdout and stderr log files. The string "Ray worker pid:" is
@ -1238,8 +1242,12 @@ def connect(node,
sys.stdout.flush() sys.stdout.flush()
sys.stderr.flush() sys.stderr.flush()
worker_dict["stdout_file"] = os.path.abspath(log_stdout_file.name) worker_dict["stdout_file"] = os.path.abspath(
worker_dict["stderr_file"] = os.path.abspath(log_stderr_file.name) (log_stdout_file
if log_stdout_file is not None else sys.stdout).name)
worker_dict["stderr_file"] = os.path.abspath(
(log_stderr_file
if log_stderr_file is not None else sys.stderr).name)
worker.redis_client.hmset(b"Workers:" + worker.worker_id, worker_dict) worker.redis_client.hmset(b"Workers:" + worker.worker_id, worker_dict)
else: else:
raise ValueError("Invalid worker mode. Expected DRIVER or WORKER.") raise ValueError("Invalid worker mode. Expected DRIVER or WORKER.")

View file

@ -375,14 +375,10 @@ void CoreWorker::SetCurrentTaskId(const TaskID &task_id) {
void CoreWorker::CheckForRayletFailure() { void CoreWorker::CheckForRayletFailure() {
// If the raylet fails, we will be reassigned to init (PID=1). // If the raylet fails, we will be reassigned to init (PID=1).
#ifdef _WIN32
// TODO(mehrdadn): need a different solution for Windows.
#else
if (getppid() == 1) { if (getppid() == 1) {
RAY_LOG(ERROR) << "Raylet failed. Shutting down."; RAY_LOG(ERROR) << "Raylet failed. Shutting down.";
Shutdown(); Shutdown();
} }
#endif
// Reset the timer from the previous expiration time to avoid drift. // Reset the timer from the previous expiration time to avoid drift.
death_check_timer_.expires_at( death_check_timer_.expires_at(

View file

@ -201,7 +201,12 @@ int main(int argc, char *argv[]) {
main_service.stop(); main_service.stop();
remove(raylet_socket_name.c_str()); remove(raylet_socket_name.c_str());
}; };
boost::asio::signal_set signals(main_service, SIGTERM); boost::asio::signal_set signals(main_service);
#ifdef _WIN32
signals.add(SIGBREAK);
#else
signals.add(SIGTERM);
#endif
signals.async_wait(handler); signals.async_wait(handler);
main_service.run(); main_service.run();

View file

@ -65,7 +65,12 @@ int main(int argc, char *argv[]) {
// // instead of returning immediately. // // instead of returning immediately.
// auto handler = [&io_service](const boost::system::error_code &error, // auto handler = [&io_service](const boost::system::error_code &error,
// int signal_number) { io_service.stop(); }; // int signal_number) { io_service.stop(); };
// boost::asio::signal_set signals(io_service, SIGTERM); // boost::asio::signal_set signals(io_service);
// #ifdef _WIN32
// signals.add(SIGBREAK);
// #else
// signals.add(SIGTERM);
// #endif
// signals.async_wait(handler); // signals.async_wait(handler);
// Initialize the monitor. // Initialize the monitor.

View file

@ -68,7 +68,12 @@ WorkerPool::WorkerPool(boost::asio::io_service &io_service, int num_workers,
raylet_config_(raylet_config), raylet_config_(raylet_config),
starting_worker_timeout_callback_(starting_worker_timeout_callback) { starting_worker_timeout_callback_(starting_worker_timeout_callback) {
RAY_CHECK(maximum_startup_concurrency > 0); RAY_CHECK(maximum_startup_concurrency > 0);
#ifndef _WIN32 #ifdef _WIN32
// If worker processes fail to initialize, don't display an error window.
SetErrorMode(GetErrorMode() | SEM_FAILCRITICALERRORS);
// If worker processes crash, don't display an error window.
SetErrorMode(GetErrorMode() | SEM_NOGPFAULTERRORBOX);
#else
// Ignore SIGCHLD signals. If we don't do this, then worker processes will // Ignore SIGCHLD signals. If we don't do this, then worker processes will
// become zombies instead of dying gracefully. // become zombies instead of dying gracefully.
signal(SIGCHLD, SIG_IGN); signal(SIGCHLD, SIG_IGN);

View file

@ -1,9 +1,66 @@
#include <unistd.h> #include <unistd.h>
#include <atomic>
#ifndef WIN32_LEAN_AND_MEAN #ifndef WIN32_LEAN_AND_MEAN
#define WIN32_LEAN_AND_MEAN 1 #define WIN32_LEAN_AND_MEAN 1
#endif #endif
#include <Windows.h> #include <Windows.h>
#include <Winternl.h>
#ifndef STATUS_BUFFER_OVERFLOW
#define STATUS_BUFFER_OVERFLOW ((NTSTATUS)0x80000005L)
#endif
typedef LONG NTSTATUS;
typedef NTSTATUS WINAPI NtQueryInformationProcess_t(HANDLE ProcessHandle,
ULONG ProcessInformationClass,
PVOID ProcessInformation,
ULONG ProcessInformationLength,
ULONG *ReturnLength);
static std::atomic<NtQueryInformationProcess_t *> NtQueryInformationProcess_ =
ATOMIC_VAR_INIT(NULL);
pid_t getppid() {
NtQueryInformationProcess_t *NtQueryInformationProcess = ::NtQueryInformationProcess_;
if (!NtQueryInformationProcess) {
NtQueryInformationProcess = reinterpret_cast<NtQueryInformationProcess_t *>(
GetProcAddress(GetModuleHandle(TEXT("ntdll.dll")),
_CRT_STRINGIZE(NtQueryInformationProcess)));
::NtQueryInformationProcess_ = NtQueryInformationProcess;
}
DWORD ppid = 0;
PROCESS_BASIC_INFORMATION info;
ULONG cb = sizeof(info);
NTSTATUS status = NtQueryInformationProcess(GetCurrentProcess(), 0, &info, cb, &cb);
if ((status >= 0 || status == STATUS_BUFFER_OVERFLOW) && cb >= sizeof(info)) {
ppid = reinterpret_cast<DWORD>(info.Reserved3);
}
pid_t result = 0;
if (ppid > 0) {
// For now, assume PPID = 1 (simulating the reassignment to "init" on Linux)
result = 1;
if (HANDLE parent = OpenProcess(PROCESS_QUERY_INFORMATION, FALSE, ppid)) {
long long me_created, parent_created;
FILETIME unused;
if (GetProcessTimes(GetCurrentProcess(), reinterpret_cast<FILETIME *>(&me_created),
&unused, &unused, &unused) &&
GetProcessTimes(parent, reinterpret_cast<FILETIME *>(&parent_created), &unused,
&unused, &unused)) {
if (me_created >= parent_created) {
// We verified the child is younger than the parent, so we know the parent
// is still alive.
// (Note that the parent can still die by the time this function returns,
// but that race condition exists on POSIX too, which we're emulating here.)
result = static_cast<pid_t>(ppid);
}
}
CloseHandle(parent);
}
}
return result;
}
int usleep(useconds_t usec) { int usleep(useconds_t usec) {
Sleep((usec + (1000 - 1)) / 1000); Sleep((usec + (1000 - 1)) / 1000);

View file

@ -42,6 +42,8 @@ typedef unsigned int useconds_t;
int usleep(useconds_t usec); int usleep(useconds_t usec);
unsigned sleep(unsigned seconds); unsigned sleep(unsigned seconds);
pid_t getppid();
__declspec( __declspec(
deprecated("Killing a process by ID has an inherent race condition on Windows" deprecated("Killing a process by ID has an inherent race condition on Windows"
" and is HIGHLY discouraged. " " and is HIGHLY discouraged. "