From f4e6623522525177c2385b1cf2625d15643f6ea9 Mon Sep 17 00:00:00 2001 From: Yi Cheng <74173148+iycheng@users.noreply.github.com> Date: Sat, 11 Dec 2021 20:49:47 -0800 Subject: [PATCH] Revert "Revert "[core] Ensure failed to register worker is killed and print better log"" (#21028) Reverts ray-project/ray#21023 Revert this one since https://github.com/ray-project/ray/commit/7fc9a9c2270bd7b75601874f24db9e21be0be1a8 has fixed the issue --- python/ray/tests/test_failure_4.py | 35 ++++++++++++++++++++++++++++++ src/ray/raylet/worker_pool.cc | 17 +++++++++++---- src/ray/util/process.cc | 7 ++++++ src/ray/util/process.h | 2 ++ 4 files changed, 57 insertions(+), 4 deletions(-) diff --git a/python/ray/tests/test_failure_4.py b/python/ray/tests/test_failure_4.py index 3fcdc374c..971c951be 100644 --- a/python/ray/tests/test_failure_4.py +++ b/python/ray/tests/test_failure_4.py @@ -7,6 +7,7 @@ import pytest import grpc from grpc._channel import _InactiveRpcError import psutil +import subprocess import ray.ray_constants as ray_constants @@ -436,6 +437,40 @@ def test_gcs_drain(ray_start_cluster_head, error_pubsub): ray.get(a.ready.remote()) +def test_worker_start_timeout(monkeypatch, ray_start_cluster): + # This test is to make sure + # 1. when worker failed to register, raylet will print useful log + # 2. raylet will kill hanging worker + with monkeypatch.context() as m: + # this delay will make worker start slow + m.setenv( + "RAY_testing_asio_delay_us", + "InternalKVGcsService.grpc_server.InternalKVGet" + "=2000000:2000000") + m.setenv("RAY_worker_register_timeout_seconds", "1") + cluster = ray_start_cluster + cluster.add_node(num_cpus=4, object_store_memory=1e9) + script = """ +import ray +ray.init(address='auto') + +@ray.remote +def task(): + return None + +ray.get(task.remote(), timeout=3) +""" + with pytest.raises(subprocess.CalledProcessError) as e: + run_string_as_driver(script) + + # make sure log is correct + assert ("The process is still alive, probably " + "it's hanging during start") in e.value.output.decode() + # worker will be killed so it won't try to register to raylet + assert ("Received a register request from an " + "unknown worker shim process") not in e.value.output.decode() + + def test_task_failure_when_driver_local_raylet_dies(ray_start_cluster): cluster = ray_start_cluster head = cluster.add_node(num_cpus=4, resources={"foo": 1}) diff --git a/src/ray/raylet/worker_pool.cc b/src/ray/raylet/worker_pool.cc index 2332c36ce..ea629acda 100644 --- a/src/ray/raylet/worker_pool.cc +++ b/src/ray/raylet/worker_pool.cc @@ -438,16 +438,25 @@ void WorkerPool::MonitorStartingWorkerProcess(const Process &proc, *io_service_, boost::posix_time::seconds( RayConfig::instance().worker_register_timeout_seconds())); // Capture timer in lambda to copy it once, so that it can avoid destructing timer. - timer->async_wait([timer, language, proc, proc_startup_token, worker_type, - this](const boost::system::error_code e) { + timer->async_wait([timer, language, proc = proc, proc_startup_token, worker_type, + this](const boost::system::error_code e) mutable { // check the error code. auto &state = this->GetStateForLanguage(language); // Since this process times out to start, remove it from starting_worker_processes // to avoid the zombie worker. auto it = state.starting_worker_processes.find(proc_startup_token); if (it != state.starting_worker_processes.end()) { - RAY_LOG(INFO) << "Some workers of the worker process(" << proc.GetId() - << ") have not registered to raylet within timeout."; + RAY_LOG(ERROR) + << "Some workers of the worker process(" << proc.GetId() + << ") have not registered within the timeout. " + << (proc.IsAlive() + ? "The process is still alive, probably it's hanging during start." + : "The process is dead, probably it crashed during start."); + + if (proc.IsAlive()) { + proc.Kill(); + } + PopWorkerStatus status = PopWorkerStatus::WorkerPendingRegistration; process_failed_pending_registration_++; bool found; diff --git a/src/ray/util/process.cc b/src/ray/util/process.cc index 0b615271e..69be1158c 100644 --- a/src/ray/util/process.cc +++ b/src/ray/util/process.cc @@ -462,6 +462,13 @@ int Process::Wait() const { return status; } +bool Process::IsAlive() const { + if (p_) { + return IsProcessAlive(p_->GetId()); + } + return false; +} + void Process::Kill() { if (p_) { pid_t pid = p_->GetId(); diff --git a/src/ray/util/process.h b/src/ray/util/process.h index 8dd22fc89..82edf3dec 100644 --- a/src/ray/util/process.h +++ b/src/ray/util/process.h @@ -88,6 +88,8 @@ class Process { bool IsValid() const; /// Forcefully kills the process. Unsafe for unowned processes. void Kill(); + /// Check whether the process is alive. + bool IsAlive() const; /// Convenience function to start a process in the background. /// \param pid_file A file to write the PID of the spawned process in. static std::pair Spawn(