diff --git a/BUILD.bazel b/BUILD.bazel index a435005b6..42a953ab4 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -748,6 +748,31 @@ cc_binary( ], ) +cc_library( + name = "platform_shims", + srcs = [] + select({ + "@bazel_tools//src/conditions:windows": glob([ + "src/shims/windows/**/*.c", + "src/shims/windows/**/*.cc", + ]), + "//conditions:default": [], + }), + hdrs = [] + select({ + "@bazel_tools//src/conditions:windows": glob([ + "src/shims/windows/**/*.h", + ]), + "//conditions:default": [], + }), + copts = COPTS, + includes = [] + select({ + "@bazel_tools//src/conditions:windows": [ + "src/shims/windows", + ], + "//conditions:default": [], + }), + visibility = ["//visibility:public"], +) + cc_library( name = "ray_util", srcs = glob( diff --git a/bazel/BUILD.plasma b/bazel/BUILD.plasma index a6309449a..527bea6c1 100644 --- a/bazel/BUILD.plasma +++ b/bazel/BUILD.plasma @@ -116,6 +116,7 @@ cc_library( ":common_fbs", ":plasma_fbs", "@com_github_google_glog//:glog", + "@com_github_ray_project_ray//:platform_shims", ], ) @@ -210,6 +211,7 @@ cc_library( ":ae", ":plasma_client", "@com_github_google_glog//:glog", + "@com_github_ray_project_ray//:platform_shims", ], ) diff --git a/bazel/BUILD.redis b/bazel/BUILD.redis index 6354a09c0..ba055c423 100644 --- a/bazel/BUILD.redis +++ b/bazel/BUILD.redis @@ -28,5 +28,8 @@ cc_library( copts = COPTS, includes = ["deps"], strip_include_prefix = "deps", + deps = [ + "@com_github_ray_project_ray//:platform_shims", + ], visibility = ["//visibility:public"], ) diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index b58010c04..f024eda81 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -262,12 +262,16 @@ void CoreWorker::Disconnect() { } void CoreWorker::RunIOService() { +#ifdef _WIN32 + // TODO(mehrdadn): Is there an equivalent for Windows we need here? +#else // Block SIGINT and SIGTERM so they will be handled by the main thread. sigset_t mask; sigemptyset(&mask); sigaddset(&mask, SIGINT); sigaddset(&mask, SIGTERM); pthread_sigmask(SIG_BLOCK, &mask, NULL); +#endif io_service_.run(); } diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 921752d78..e480da84c 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -266,8 +266,10 @@ void NodeManager::KillWorker(std::shared_ptr worker) { retry_timer->expires_from_now(retry_duration); retry_timer->async_wait([retry_timer, worker](const boost::system::error_code &error) { RAY_LOG(DEBUG) << "Send SIGKILL to worker, pid=" << worker->Pid(); - // Force kill worker. TODO(rkn): Is there some small danger that the worker - // has already died and the PID has been reassigned to a different process? + // Force kill worker. TODO(mehrdadn, rkn): The worker may have already died + // and had its PID reassigned to a different process, at least on Windows. + // On Linux, this may or may not be the case, depending on e.g. whether + // the process has been already waited on. Regardless, this must be fixed. kill(worker->Pid(), SIGKILL); }); } diff --git a/src/ray/raylet/worker_pool.cc b/src/ray/raylet/worker_pool.cc index 878259ffe..b1df7e37b 100644 --- a/src/ray/raylet/worker_pool.cc +++ b/src/ray/raylet/worker_pool.cc @@ -1,5 +1,10 @@ #include "ray/raylet/worker_pool.h" +#ifdef _WIN32 +#include +#include +#endif + #include #include @@ -46,9 +51,13 @@ WorkerPool::WorkerPool(int num_workers, int maximum_startup_concurrency, : maximum_startup_concurrency_(maximum_startup_concurrency), gcs_client_(std::move(gcs_client)) { RAY_CHECK(maximum_startup_concurrency > 0); +#ifdef _WIN32 + // TODO(mehrdadn): Is there an equivalent of this we need for Windows? +#else // Ignore SIGCHLD signals. If we don't do this, then worker processes will // become zombies instead of dying gracefully. signal(SIGCHLD, SIG_IGN); +#endif for (const auto &entry : worker_commands) { // Initialize the pool state for this language. auto &state = states_by_lang_[entry.first]; @@ -198,6 +207,56 @@ int WorkerPool::StartWorkerProcess(const Language &language, return -1; } +#ifdef _WIN32 +// Fork + exec combo for Windows. Returns -1 on failure. +// TODO(mehrdadn): This is dangerous on Windows. +// We need to keep the actual process handle alive for the PID to stay valid. +// Make this change as soon as possible, or the PID may refer to the wrong process. +static pid_t spawnvp_wrapper(std::vector const &args) { + pid_t pid; + std::vector str_args; + for (const auto &arg : args) { + str_args.push_back(arg.c_str()); + } + str_args.push_back(NULL); + HANDLE handle = (HANDLE)spawnvp(P_NOWAIT, str_args[0], str_args.data()); + if (handle != INVALID_HANDLE_VALUE) { + pid = static_cast(GetProcessId(handle)); + if (pid == 0) { + pid = -1; + } + CloseHandle(handle); + } else { + pid = -1; + errno = EINVAL; + } + return pid; +} +#else +// Fork + exec combo for POSIX. Returns -1 on failure. +static pid_t spawnvp_wrapper(std::vector const &args) { + pid_t pid; + std::vector str_args; + for (const auto &arg : args) { + str_args.push_back(arg.c_str()); + } + str_args.push_back(NULL); + pid = fork(); + if (pid == 0) { + // Child process case. + // Reset the SIGCHLD handler for the worker. + // TODO(mehrdadn): Move any work here to the child process itself + // so that it can also be implemented on Windows. + signal(SIGCHLD, SIG_DFL); + if (execvp(str_args[0], const_cast(str_args.data())) == -1) { + pid = -1; + abort(); // fork() succeeded but exec() failed, so abort the child + } + } + return pid; +} +#endif + pid_t WorkerPool::StartProcess(const std::vector &worker_command_args) { if (RAY_LOG_ENABLED(DEBUG)) { std::stringstream stream; @@ -209,29 +268,12 @@ pid_t WorkerPool::StartProcess(const std::vector &worker_command_ar } // Launch the process to create the worker. - pid_t pid = fork(); - - if (pid != 0) { - return pid; + pid_t pid = spawnvp_wrapper(worker_command_args); + if (pid == -1) { + RAY_LOG(FATAL) << "Failed to start worker with error " << errno << ": " + << strerror(errno); } - - // Child process case. - // Reset the SIGCHLD handler for the worker. - signal(SIGCHLD, SIG_DFL); - - // Try to execute the worker command. - std::vector worker_command_args_str; - for (const auto &arg : worker_command_args) { - worker_command_args_str.push_back(arg.c_str()); - } - worker_command_args_str.push_back(nullptr); - int rv = execvp(worker_command_args_str[0], - const_cast(worker_command_args_str.data())); - - // The worker failed to start. This is a fatal error. - RAY_LOG(FATAL) << "Failed to start worker with return value " << rv << ": " - << strerror(errno); - return 0; + return pid; } Status WorkerPool::RegisterWorker(const std::shared_ptr &worker) { diff --git a/src/ray/util/signal_test.cc b/src/ray/util/signal_test.cc index 119aeee19..afa568bf1 100644 --- a/src/ray/util/signal_test.cc +++ b/src/ray/util/signal_test.cc @@ -8,6 +8,7 @@ // This test just print some call stack information. namespace ray { +#ifndef _WIN32 void Sleep() { usleep(100000); } @@ -77,6 +78,7 @@ TEST(SignalTest, SIGILL_Test) { } } +#endif // !_WIN32 } // namespace ray int main(int argc, char **argv) { diff --git a/src/shims/windows/unistd.cc b/src/shims/windows/unistd.cc new file mode 100644 index 000000000..e1705c35d --- /dev/null +++ b/src/shims/windows/unistd.cc @@ -0,0 +1,28 @@ +#include + +#ifndef WIN32_LEAN_AND_MEAN +#define WIN32_LEAN_AND_MEAN 1 +#endif +#include + +int kill(pid_t pid, int sig) { + int result; + if (HANDLE process = OpenProcess(PROCESS_TERMINATE, FALSE, pid)) { + if (sig == SIGKILL) { + if (TerminateProcess(process, ERROR_PROCESS_ABORTED)) { + result = 0; + } else { + result = -1; + errno = EPERM; + } + } else { + result = -1; + errno = EINVAL; + } + CloseHandle(process); + } else { + result = -1; + errno = ESRCH; + } + return result; +} diff --git a/src/shims/windows/unistd.h b/src/shims/windows/unistd.h new file mode 100644 index 000000000..a5c191160 --- /dev/null +++ b/src/shims/windows/unistd.h @@ -0,0 +1,17 @@ +#ifndef UNISTD_H +#define UNISTD_H + +typedef int pid_t /* technically unsigned on Windows, but no practical concern */; +enum { SIGKILL = 9 }; + +__declspec( + deprecated("Killing a process by ID has an inherent race condition on Windows" + " and is HIGHLY discouraged. " + "Furthermore, signals other than SIGKILL are NOT portable. " + "Please use a wrapper that keeps the process handle alive" + " and terminates it directly as needed. " + "For SIGTERM or other signals, a different IPC mechanism may be" + " more appropriate (such as window messages on Windows)." + "")) int kill(pid_t pid, int sig); + +#endif /* UNISTD_H */