Revert "Revert "[core] Ensure failed to register worker is killed and print better log"" (#21028)

Reverts ray-project/ray#21023
Re-landing the original change (reverting the revert), since commit 7fc9a9c227 has fixed the issue that caused it to be reverted.
This commit is contained in:
Yi Cheng 2021-12-11 20:49:47 -08:00 committed by GitHub
parent db058d0fb3
commit f4e6623522
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 57 additions and 4 deletions

View file

@ -7,6 +7,7 @@ import pytest
import grpc
from grpc._channel import _InactiveRpcError
import psutil
import subprocess
import ray.ray_constants as ray_constants
@ -436,6 +437,40 @@ def test_gcs_drain(ray_start_cluster_head, error_pubsub):
ray.get(a.ready.remote())
def test_worker_start_timeout(monkeypatch, ray_start_cluster):
    """Check raylet's handling of a worker that fails to register in time.

    The test verifies two things:
      1. When a worker fails to register, the driver output contains the
         "still alive, probably it's hanging during start" log message.
      2. The hanging worker is killed, so the "unknown worker shim process"
         register-request message never shows up.
    """
    with monkeypatch.context() as m:
        # Delay the InternalKVGet handler so that worker start-up is slow.
        # NOTE(review): the value is presumably a min:max delay in
        # microseconds (i.e. 2 seconds), judging by the variable name --
        # confirm against the asio delay testing hook.
        m.setenv(
            "RAY_testing_asio_delay_us",
            "InternalKVGcsService.grpc_server.InternalKVGet"
            "=2000000:2000000")
        # Make the registration timeout shorter than the injected delay.
        m.setenv("RAY_worker_register_timeout_seconds", "1")
        cluster = ray_start_cluster
        cluster.add_node(num_cpus=4, object_store_memory=1e9)
        # The driver script must itself be valid Python: the body of `task`
        # has to be indented, otherwise the driver dies with an
        # IndentationError before it ever submits a task.
        script = """
import ray
ray.init(address='auto')
@ray.remote
def task():
    return None
ray.get(task.remote(), timeout=3)
"""
        # The driver is expected to exit with a non-zero status.
        with pytest.raises(subprocess.CalledProcessError) as e:
            run_string_as_driver(script)

        # Decode the captured driver output once and reuse it below.
        driver_output = e.value.output.decode()

        # make sure log is correct
        assert ("The process is still alive, probably "
                "it's hanging during start") in driver_output
        # worker will be killed so it won't try to register to raylet
        assert ("Received a register request from an "
                "unknown worker shim process") not in driver_output
def test_task_failure_when_driver_local_raylet_dies(ray_start_cluster):
cluster = ray_start_cluster
head = cluster.add_node(num_cpus=4, resources={"foo": 1})

View file

@ -438,16 +438,25 @@ void WorkerPool::MonitorStartingWorkerProcess(const Process &proc,
*io_service_, boost::posix_time::seconds(
RayConfig::instance().worker_register_timeout_seconds()));
// Capture timer in lambda to copy it once, so that it can avoid destructing timer.
timer->async_wait([timer, language, proc, proc_startup_token, worker_type,
this](const boost::system::error_code e) {
timer->async_wait([timer, language, proc = proc, proc_startup_token, worker_type,
this](const boost::system::error_code e) mutable {
// check the error code.
auto &state = this->GetStateForLanguage(language);
// Since this process times out to start, remove it from starting_worker_processes
// to avoid the zombie worker.
auto it = state.starting_worker_processes.find(proc_startup_token);
if (it != state.starting_worker_processes.end()) {
RAY_LOG(INFO) << "Some workers of the worker process(" << proc.GetId()
<< ") have not registered to raylet within timeout.";
RAY_LOG(ERROR)
<< "Some workers of the worker process(" << proc.GetId()
<< ") have not registered within the timeout. "
<< (proc.IsAlive()
? "The process is still alive, probably it's hanging during start."
: "The process is dead, probably it crashed during start.");
if (proc.IsAlive()) {
proc.Kill();
}
PopWorkerStatus status = PopWorkerStatus::WorkerPendingRegistration;
process_failed_pending_registration_++;
bool found;

View file

@ -462,6 +462,13 @@ int Process::Wait() const {
return status;
}
/// Report whether the underlying process is currently alive.
/// Returns false when this object holds no process handle (p_ is null);
/// otherwise defers to IsProcessAlive() with the handle's process id.
bool Process::IsAlive() const {
  // No handle means there is nothing that could be alive.
  if (!p_) {
    return false;
  }
  return IsProcessAlive(p_->GetId());
}
void Process::Kill() {
if (p_) {
pid_t pid = p_->GetId();

View file

@ -88,6 +88,8 @@ class Process {
bool IsValid() const;
/// Forcefully kills the process. Unsafe for unowned processes.
void Kill();
/// Check whether the process is alive.
/// \return false if this object holds no underlying process handle;
/// otherwise the result of IsProcessAlive() for the handle's process id.
bool IsAlive() const;
/// Convenience function to start a process in the background.
/// \param pid_file A file to write the PID of the spawned process in.
static std::pair<Process, std::error_code> Spawn(