From 65563e994bad523cc0f7bda365e96e3f2ef77922 Mon Sep 17 00:00:00 2001
From: Yi Cheng <74173148+iycheng@users.noreply.github.com>
Date: Tue, 26 Jul 2022 06:29:08 +0000
Subject: [PATCH] [core] Allow 0 waiting for death info to fail a task faster
 (#26993)

When a task fails, it waits for the death info and then fails. The wait
timeout is 1s and the check runs every 1s. This is good for usability, but it
causes issues in some cases because it delays the task's return by at least 1s
and at most 2s.

This PR introduces an early exit: when the timeout is set to 0, the task fails
immediately. The semantics don't change, and most users will still get the
death info message.
---
 python/ray/tests/test_failure_3.py                 | 45 +++++++++++++++++++
 src/ray/common/ray_config_def.h                    |  4 ++
 src/ray/core_worker/core_worker.cc                 |  8 ++--
 .../transport/direct_actor_task_submitter.cc       | 26 ++++++-----
 4 files changed, 68 insertions(+), 15 deletions(-)

diff --git a/python/ray/tests/test_failure_3.py b/python/ray/tests/test_failure_3.py
index 969b69a97..b5f6a9655 100644
--- a/python/ray/tests/test_failure_3.py
+++ b/python/ray/tests/test_failure_3.py
@@ -307,6 +307,51 @@ def test_actor_failure_async_4(ray_start_regular, tmp_path):
         ray.get(t)
 
 
+@pytest.mark.skipif(sys.platform == "win32", reason="Fails on Windows")
+@pytest.mark.parametrize(
+    "ray_start_regular",
+    [
+        {
+            "_system_config": {
+                "timeout_ms_task_wait_for_death_info": 0,
+                "core_worker_internal_heartbeat_ms": 1000000,
+            }
+        }
+    ],
+    indirect=True,
+)
+def test_actor_failure_no_wait(ray_start_regular, tmp_path):
+    p = tmp_path / "a_pid"
+    time.sleep(1)
+
+    # Make sure the request fails immediately without waiting for the death info.
+    @ray.remote(max_restarts=1, max_task_retries=0)
+    class A:
+        def __init__(self):
+            pid = os.getpid()
+            # On the second start it blocks, so that
+            # we know the actor is restarting.
+            if p.exists():
+                p.write_text(str(pid))
+                time.sleep(100000)
+            else:
+                p.write_text(str(pid))
+
+        def p(self):
+            time.sleep(100000)
+
+        def pid(self):
+            return os.getpid()
+
+    a = A.remote()
+    pid = ray.get(a.pid.remote())
+    t = a.p.remote()
+    os.kill(int(pid), signal.SIGKILL)
+    with pytest.raises(ray.exceptions.RayActorError):
+        # Make sure it returns within 1s.
+        ray.get(t)
+
+
 if __name__ == "__main__":
     import pytest
 
diff --git a/src/ray/common/ray_config_def.h b/src/ray/common/ray_config_def.h
index 88d1e0ea1..2c37bed4e 100644
--- a/src/ray/common/ray_config_def.h
+++ b/src/ray/common/ray_config_def.h
@@ -518,6 +518,10 @@ RAY_CONFIG(int64_t, log_rotation_backup_count, 5)
 /// as failed.
 RAY_CONFIG(int64_t, timeout_ms_task_wait_for_death_info, 1000)
 
+/// The core worker heartbeat interval. During the heartbeat,
+/// it reports the loads to the raylet.
+RAY_CONFIG(int64_t, core_worker_internal_heartbeat_ms, 1000);
+
 /// Maximum amount of memory that will be used by running tasks' args.
 RAY_CONFIG(float, max_task_args_memory_fraction, 0.7)
 
diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc
index 9ec4fbe5a..bd5aa9ecf 100644
--- a/src/ray/core_worker/core_worker.cc
+++ b/src/ray/core_worker/core_worker.cc
@@ -37,9 +37,6 @@ namespace ray {
 namespace core {
 namespace {
 
-// Duration between internal book-keeping heartbeats.
-const uint64_t kInternalHeartbeatMillis = 1000;
-
 using ActorLifetime = ray::rpc::JobConfig_ActorLifetime;
 
 JobID GetProcessJobID(const CoreWorkerOptions &options) {
@@ -508,8 +505,9 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_
       },
       100);
 
-  periodical_runner_.RunFnPeriodically([this] { InternalHeartbeat(); },
-                                       kInternalHeartbeatMillis);
+  periodical_runner_.RunFnPeriodically(
+      [this] { InternalHeartbeat(); },
+      RayConfig::instance().core_worker_internal_heartbeat_ms());
 
 #ifndef _WIN32
   // Doing this last during CoreWorker initialization, so initialization logic like
diff --git a/src/ray/core_worker/transport/direct_actor_task_submitter.cc b/src/ray/core_worker/transport/direct_actor_task_submitter.cc
index 9d6cfe637..d00861fcf 100644
--- a/src/ray/core_worker/transport/direct_actor_task_submitter.cc
+++ b/src/ray/core_worker/transport/direct_actor_task_submitter.cc
@@ -309,12 +309,13 @@ void CoreWorkerDirectActorTaskSubmitter::DisconnectActor(
       GetTaskFinisherWithoutMu().FailOrRetryPendingTask(
           task_id, error_type, &status, &error_info);
     }
-
-    RAY_LOG(DEBUG) << "Failing tasks waiting for death info, size="
-                   << wait_for_death_info_tasks.size() << ", actor_id=" << actor_id;
-    for (auto &net_err_task : wait_for_death_info_tasks) {
-      RAY_UNUSED(GetTaskFinisherWithoutMu().MarkTaskReturnObjectsFailed(
-          net_err_task.second, error_type, &error_info));
+    if (!wait_for_death_info_tasks.empty()) {
+      RAY_LOG(DEBUG) << "Failing tasks waiting for death info, size="
+                     << wait_for_death_info_tasks.size() << ", actor_id=" << actor_id;
+      for (auto &net_err_task : wait_for_death_info_tasks) {
+        RAY_UNUSED(GetTaskFinisherWithoutMu().MarkTaskReturnObjectsFailed(
+            net_err_task.second, error_type, &error_info));
+      }
     }
   }
   // NOTE(kfstorm): We need to make sure the lock is released before invoking callbacks.
@@ -525,9 +526,10 @@ void CoreWorkerDirectActorTaskSubmitter::HandlePushTaskReply(
       // No retry == actor is dead.
       // If actor is not dead yet, wait for the grace period until we mark the
       // return object as failed.
-      int64_t death_info_grace_period_ms =
-          current_time_ms() + RayConfig::instance().timeout_ms_task_wait_for_death_info();
-      {
+      if (RayConfig::instance().timeout_ms_task_wait_for_death_info() != 0) {
+        int64_t death_info_grace_period_ms =
+            current_time_ms() +
+            RayConfig::instance().timeout_ms_task_wait_for_death_info();
         absl::MutexLock lock(&mu_);
         auto queue_pair = client_queues_.find(actor_id);
         RAY_CHECK(queue_pair != client_queues_.end());
@@ -538,7 +540,11 @@ void CoreWorkerDirectActorTaskSubmitter::HandlePushTaskReply(
             << "PushActorTask failed because of network error, this task "
                "will be stashed away and waiting for Death info from GCS, task_id="
             << task_spec.TaskId()
-            << ", wait queue size=" << queue.wait_for_death_info_tasks.size();
+            << ", wait_queue_size=" << queue.wait_for_death_info_tasks.size();
+      } else {
+        // If we don't need the death info, just fail the request.
+        GetTaskFinisherWithoutMu().MarkTaskReturnObjectsFailed(
+            task_spec, rpc::ErrorType::ACTOR_DIED);
       }
     }
   }
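
Usage sketch (not part of the patch): the snippet below shows how a user could
opt into the fast-fail behavior this change enables, mirroring the new test
above. It assumes a POSIX platform (os.kill with SIGKILL) and that the internal
_system_config override and the timeout_ms_task_wait_for_death_info key behave
as described in this patch; exact behavior may differ across Ray versions.

    import os
    import signal
    import time

    import ray
    from ray.exceptions import RayActorError

    # Disable the death-info wait so an in-flight call to a dead actor fails
    # immediately instead of being stashed for the 1s grace period.
    ray.init(_system_config={"timeout_ms_task_wait_for_death_info": 0})


    @ray.remote(max_restarts=0, max_task_retries=0)
    class Worker:
        def pid(self):
            return os.getpid()

        def block(self):
            # A long-running call that will be in flight when the actor dies.
            time.sleep(10000)


    worker = Worker.remote()
    pid = ray.get(worker.pid.remote())
    ref = worker.block.remote()

    # Kill the actor process out from under the in-flight task.
    os.kill(pid, signal.SIGKILL)

    try:
        ray.get(ref)
    except RayActorError:
        # With the timeout set to 0, the failure surfaces right away instead
        # of after the 1-2s wait for death info.
        print("in-flight actor task failed fast")

The trade-off visible in the last hunk is that with a timeout of 0 the return
objects are marked failed with a plain ACTOR_DIED error immediately, instead of
being stashed to wait for death info from the GCS; the default of 1000 ms keeps
the previous behavior.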