[core] Allow 0 waiting for death info to fail a task faster (#26993)

When a task failed, it'll wait for the death info and then fail. The waiting is 1s and the checking is every 1s. This is good for usability, but it causes issues for some cases because it'll delay the task return at most 2s and at least 1s.

This PR introduce an early cut where when the timeout is set to be 0, it'll just return immediately. The semantics doesn't change and for most users they are still going to get the message.
This commit is contained in:
Yi Cheng 2022-07-26 06:29:08 +00:00 committed by GitHub
parent 419e78180a
commit 65563e994b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 68 additions and 15 deletions

View file

@ -307,6 +307,51 @@ def test_actor_failure_async_4(ray_start_regular, tmp_path):
ray.get(t)
@pytest.mark.skipif(sys.platform == "win32", reason="Fail on windowns")
@pytest.mark.parametrize(
"ray_start_regular",
[
{
"_system_config": {
"timeout_ms_task_wait_for_death_info": 0,
"core_worker_internal_heartbeat_ms": 1000000,
}
}
],
indirect=True,
)
def test_actor_failure_no_wait(ray_start_regular, tmp_path):
p = tmp_path / "a_pid"
time.sleep(1)
# Make sure the request will fail immediately without waiting for the death info
@ray.remote(max_restarts=1, max_task_retries=0)
class A:
def __init__(self):
pid = os.getpid()
# The second time start, it'll block,
# so that we'll know the actor is restarting.
if p.exists():
p.write_text(str(pid))
time.sleep(100000)
else:
p.write_text(str(pid))
def p(self):
time.sleep(100000)
def pid(self):
return os.getpid()
a = A.remote()
pid = ray.get(a.pid.remote())
t = a.p.remote()
os.kill(int(pid), signal.SIGKILL)
with pytest.raises(ray.exceptions.RayActorError):
# Make sure it'll return within 1s
ray.get(t)
if __name__ == "__main__":
import pytest

View file

@ -518,6 +518,10 @@ RAY_CONFIG(int64_t, log_rotation_backup_count, 5)
/// as failed.
RAY_CONFIG(int64_t, timeout_ms_task_wait_for_death_info, 1000)
/// The core worker heartbeat interval. During heartbeat, it'll
/// report the loads to raylet.
RAY_CONFIG(int64_t, core_worker_internal_heartbeat_ms, 1000);
/// Maximum amount of memory that will be used by running tasks' args.
RAY_CONFIG(float, max_task_args_memory_fraction, 0.7)

View file

@ -37,9 +37,6 @@ namespace ray {
namespace core {
namespace {
// Duration between internal book-keeping heartbeats.
const uint64_t kInternalHeartbeatMillis = 1000;
using ActorLifetime = ray::rpc::JobConfig_ActorLifetime;
JobID GetProcessJobID(const CoreWorkerOptions &options) {
@ -508,8 +505,9 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_
},
100);
periodical_runner_.RunFnPeriodically([this] { InternalHeartbeat(); },
kInternalHeartbeatMillis);
periodical_runner_.RunFnPeriodically(
[this] { InternalHeartbeat(); },
RayConfig::instance().core_worker_internal_heartbeat_ms());
#ifndef _WIN32
// Doing this last during CoreWorker initialization, so initialization logic like

View file

@ -309,12 +309,13 @@ void CoreWorkerDirectActorTaskSubmitter::DisconnectActor(
GetTaskFinisherWithoutMu().FailOrRetryPendingTask(
task_id, error_type, &status, &error_info);
}
RAY_LOG(DEBUG) << "Failing tasks waiting for death info, size="
<< wait_for_death_info_tasks.size() << ", actor_id=" << actor_id;
for (auto &net_err_task : wait_for_death_info_tasks) {
RAY_UNUSED(GetTaskFinisherWithoutMu().MarkTaskReturnObjectsFailed(
net_err_task.second, error_type, &error_info));
if (!wait_for_death_info_tasks.empty()) {
RAY_LOG(DEBUG) << "Failing tasks waiting for death info, size="
<< wait_for_death_info_tasks.size() << ", actor_id=" << actor_id;
for (auto &net_err_task : wait_for_death_info_tasks) {
RAY_UNUSED(GetTaskFinisherWithoutMu().MarkTaskReturnObjectsFailed(
net_err_task.second, error_type, &error_info));
}
}
}
// NOTE(kfstorm): We need to make sure the lock is released before invoking callbacks.
@ -525,9 +526,10 @@ void CoreWorkerDirectActorTaskSubmitter::HandlePushTaskReply(
// No retry == actor is dead.
// If actor is not dead yet, wait for the grace period until we mark the
// return object as failed.
int64_t death_info_grace_period_ms =
current_time_ms() + RayConfig::instance().timeout_ms_task_wait_for_death_info();
{
if (RayConfig::instance().timeout_ms_task_wait_for_death_info() != 0) {
int64_t death_info_grace_period_ms =
current_time_ms() +
RayConfig::instance().timeout_ms_task_wait_for_death_info();
absl::MutexLock lock(&mu_);
auto queue_pair = client_queues_.find(actor_id);
RAY_CHECK(queue_pair != client_queues_.end());
@ -538,7 +540,11 @@ void CoreWorkerDirectActorTaskSubmitter::HandlePushTaskReply(
<< "PushActorTask failed because of network error, this task "
"will be stashed away and waiting for Death info from GCS, task_id="
<< task_spec.TaskId()
<< ", wait queue size=" << queue.wait_for_death_info_tasks.size();
<< ", wait_queue_size=" << queue.wait_for_death_info_tasks.size();
} else {
// If we don't need death info, just fail the request.
GetTaskFinisherWithoutMu().MarkTaskReturnObjectsFailed(
task_spec, rpc::ErrorType::ACTOR_DIED);
}
}
}