Exit worker when parent raylet dies (#20777)

* Exit worker when parent raylet dies

* Exit worker when parent raylet dies

* Exit worker when parent raylet dies
This commit is contained in:
Jiajun Yao 2021-11-30 18:04:11 +00:00 committed by GitHub
parent efbb815402
commit e3e2739164
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 64 additions and 21 deletions

View file

@ -40,7 +40,7 @@ remaining_args.append("--worker-shim-pid={}".format(os.getpid()))
py_executable: str = sys.executable
command_str = " ".join([f"exec {py_executable}"] + remaining_args)
child_pid = os.fork()
print("shim pid:{} , worker pid:{}", os.getpid(), child_pid)
print(f"shim pid:{os.getpid()} , worker pid:{child_pid}")
if child_pid == 0:
# child process
os.execvp("bash", ["bash", "-c", command_str])

View file

@ -1,5 +1,6 @@
import os
import sys
import signal
import ray
@ -7,7 +8,36 @@ import numpy as np
import pytest
import time
from ray._private.test_utils import SignalActor
from ray._private.test_utils import SignalActor, wait_for_pid_to_exit
SIGKILL = signal.SIGKILL if sys.platform != "win32" else signal.SIGTERM
def test_worker_exit_after_parent_raylet_dies(ray_start_cluster):
cluster = ray_start_cluster
cluster.add_node(num_cpus=0)
cluster.add_node(num_cpus=8, resources={"foo": 1})
cluster.wait_for_nodes()
ray.init(address=cluster.address)
@ray.remote(resources={"foo": 1})
class Actor():
def get_worker_pid(self):
return os.getpid()
def get_raylet_pid(self):
return int(os.environ["RAY_RAYLET_PID"])
actor = Actor.remote()
worker_pid = ray.get(actor.get_worker_pid.remote())
raylet_pid = ray.get(actor.get_raylet_pid.remote())
# Kill the parent raylet.
os.kill(raylet_pid, SIGKILL)
os.waitpid(raylet_pid, 0)
wait_for_pid_to_exit(raylet_pid)
# Make sure the worker process exits as well.
wait_for_pid_to_exit(worker_pid)
@pytest.mark.parametrize(

View file

@ -44,6 +44,7 @@ constexpr char kPublicDNSServerIp[] = "8.8.8.8";
constexpr int kPublicDNSServerPort = 53;
constexpr char kEnvVarKeyJobId[] = "RAY_JOB_ID";
constexpr char kEnvVarKeyRayletPid[] = "RAY_RAYLET_PID";
/// for cross-langueage serialization
constexpr int kMessagePackOffset = 9;

View file

@ -236,7 +236,7 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_
if (options_.worker_type == WorkerType::WORKER) {
periodical_runner_.RunFnPeriodically(
[this] { CheckForRayletFailure(); },
[this] { ExitIfParentRayletDies(); },
RayConfig::instance().raylet_death_check_interval_milliseconds());
}
@ -668,23 +668,17 @@ void CoreWorker::RegisterToGcs() {
RAY_CHECK_OK(gcs_client_->Workers().AsyncAdd(worker_data, nullptr));
}
void CoreWorker::CheckForRayletFailure() {
auto env_pid = RayConfig::instance().RAYLET_PID();
bool should_shutdown = IsRayletFailed(env_pid);
void CoreWorker::ExitIfParentRayletDies() {
RAY_CHECK(options_.worker_type == WorkerType::WORKER);
RAY_CHECK(!RayConfig::instance().RAYLET_PID().empty());
auto raylet_pid = static_cast<pid_t>(std::stoi(RayConfig::instance().RAYLET_PID()));
bool should_shutdown = !IsProcessAlive(raylet_pid);
if (should_shutdown) {
std::ostringstream stream;
stream << "Shutting down the core worker because the local raylet failed. "
<< "Check out the raylet.out log file.";
if (!env_pid.empty()) {
auto pid = static_cast<pid_t>(std::stoi(env_pid));
stream << " Raylet pid: " << pid;
}
<< "Check out the raylet.out log file. Raylet pid: " << raylet_pid;
RAY_LOG(WARNING) << stream.str();
if (options_.worker_type == WorkerType::WORKER) {
task_execution_service_.post([this]() { Shutdown(); }, "CoreWorker.Shutdown");
} else {
Shutdown();
}
task_execution_service_.post([this]() { Shutdown(); }, "CoreWorker.Shutdown");
}
}

View file

@ -803,8 +803,8 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
/// Register this worker or driver to GCS.
void RegisterToGcs();
/// Check if the raylet has failed. If so, shutdown.
void CheckForRayletFailure();
/// (WORKER mode only) Check if the raylet has failed. If so, shutdown.
void ExitIfParentRayletDies();
/// Heartbeat for internal bookkeeping.
void InternalHeartbeat();

View file

@ -328,6 +328,7 @@ Process WorkerPool::StartWorkerProcess(
// need to add a new CLI parameter for both Python and Java workers.
env.emplace(kEnvVarKeyJobId, job_id.Hex());
}
env.emplace(kEnvVarKeyRayletPid, std::to_string(GetPID()));
// TODO(SongGuyang): Maybe Python and Java also need native library path in future.
if (language == Language::CPP) {

View file

@ -569,11 +569,26 @@ pid_t GetParentPID() {
pid_t GetParentPID() { return getppid(); }
#endif // #ifdef _WIN32
pid_t GetPID() {
#ifdef _WIN32
return GetCurrentProcessId();
#else
return getpid();
#endif
}
bool IsParentProcessAlive() { return GetParentPID() != 1; }
bool IsProcessAlive(pid_t pid) {
#ifdef _WIN32
RAY_LOG(FATAL) << "IsProcessAlive not implement on windows";
if (HANDLE handle =
OpenProcess(PROCESS_QUERY_INFORMATION, FALSE, static_cast<DWORD>(pid))) {
DWORD exit_code;
if (GetExitCodeProcess(handle, &exit_code) && exit_code == STILL_ACTIVE) {
return true;
}
CloseHandle(handle);
}
return false;
#else
if (kill(pid, 0) == -1 && errno == ESRCH) {
@ -594,8 +609,8 @@ bool equal_to<ray::Process>::operator()(const ray::Process &x,
? !y.IsNull()
? x.IsValid()
? y.IsValid() ? equal_to<pid_t>()(x.GetId(), y.GetId()) : false
: y.IsValid() ? false
: equal_to<void const *>()(x.Get(), y.Get())
: y.IsValid() ? false
: equal_to<void const *>()(x.Get(), y.Get())
: false
: y.IsNull();
}

View file

@ -102,6 +102,8 @@ class Process {
// will be 1 (this simulates POSIX getppid()).
pid_t GetParentPID();
pid_t GetPID();
bool IsParentProcessAlive();
bool IsProcessAlive(pid_t pid);