Reconstruct failed actors without sending tasks. (#5161)

* fast reconstruct dead actors

* add test

* fix typos

* remove debug print

* small fix

* fix typos

* Update test_actor.py
Authored by Hao Chen on 2019-07-16 01:25:09 +08:00, committed by Stephanie Wang
parent 7342117710
commit ea6aa6409a
3 changed files with 67 additions and 2 deletions
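For context, a minimal sketch of the user-visible behavior this change targets (not part of the commit; the Counter class and the 5-second wait are illustrative, and it assumes the Ray version of this PR, where max_reconstructions is the actor option): an actor declared reconstructable should be restarted by the raylet after its process dies, even if no new task is submitted to it first.

import os
import signal
import time

import ray

ray.init()


@ray.remote(max_reconstructions=1)
class Counter(object):
    def __init__(self):
        self.value = 0

    def get_pid(self):
        return os.getpid()

    def increase(self):
        self.value += 1
        return self.value


counter = Counter.remote()
pid = ray.get(counter.get_pid.remote())
os.kill(pid, signal.SIGKILL)

# Previously, reconstruction was only triggered when a new task was sent to
# the dead actor. With this change the raylet recreates the actor on its own;
# the sleep below just leaves it time to do so before the next call.
time.sleep(5)
assert ray.get(counter.increase.remote()) == 1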


@@ -11,6 +11,7 @@ import pytest
import signal
import sys
import time
from pyarrow import plasma
import ray
import ray.ray_constants as ray_constants
@@ -19,6 +20,7 @@ import ray.tests.cluster_utils
from ray.tests.conftest import generate_internal_config_map
from ray.tests.utils import (
    relevant_errors,
    wait_for_condition,
    wait_for_errors,
)
@@ -2162,6 +2164,39 @@ def test_actor_reconstruction(ray_start_regular):
    ray.get(actor.increase.remote())


def test_actor_reconstruction_without_task(ray_start_regular):
    """Test that a dead actor can be reconstructed without sending tasks."""

    def object_exists(obj_id):
        """Check whether an object exists in the plasma store."""
        plasma_client = ray.worker.global_worker.plasma_client
        plasma_id = plasma.ObjectID(obj_id.binary())
        return plasma_client.get(
            plasma_id, timeout_ms=0) != plasma.ObjectNotAvailable

    @ray.remote(max_reconstructions=1)
    class ReconstructableActor(object):
        def __init__(self, obj_ids):
            for obj_id in obj_ids:
                # Every time the actor gets constructed,
                # put a new object into the plasma store.
                if not object_exists(obj_id):
                    ray.worker.global_worker.put_object(obj_id, 1)
                    break

        def get_pid(self):
            return os.getpid()

    obj_ids = [ray.ObjectID.from_random() for _ in range(2)]
    actor = ReconstructableActor.remote(obj_ids)
    # Kill the actor.
    pid = ray.get(actor.get_pid.remote())
    os.kill(pid, signal.SIGKILL)
    # Wait until the actor is reconstructed.
    assert wait_for_condition(
        lambda: object_exists(obj_ids[1]), timeout_ms=5000)
def test_actor_reconstruction_on_node_failure(ray_start_cluster_head):
"""Test actor reconstruction when node dies unexpectedly."""
cluster = ray_start_cluster_head


@@ -94,3 +94,25 @@ def wait_for_errors(error_type, num_errors, timeout=10):
            return
        time.sleep(0.1)
    raise Exception("Timing out of wait.")


def wait_for_condition(condition_predictor,
                       timeout_ms=1000,
                       retry_interval_ms=100):
    """A helper function that waits until a condition is met.

    Args:
        condition_predictor: A function that returns whether the condition
            is met.
        timeout_ms: Maximum timeout in milliseconds.
        retry_interval_ms: Retry interval in milliseconds.

    Returns:
        Whether the condition is met within the timeout.
    """
    time_elapsed = 0
    while time_elapsed <= timeout_ms:
        if condition_predictor():
            return True
        time_elapsed += retry_interval_ms
        time.sleep(retry_interval_ms / 1000.0)
    return False
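A hypothetical usage sketch of this helper (not part of the diff), mirroring how the new actor test above calls it:

import time

# wait_for_condition as defined above (imported from ray.tests.utils in the
# test file's import block).
start = time.time()

# Poll every 100 ms, for at most 5 seconds, until the predicate returns True.
assert wait_for_condition(
    lambda: time.time() - start > 1, timeout_ms=5000, retry_interval_ms=100)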


@@ -655,6 +655,10 @@ void NodeManager::HandleActorStateTransition(const ActorID &actor_id,
                 << actor_registration.GetRemainingReconstructions();
  if (actor_registration.GetState() == ActorTableData::ALIVE) {
    // The actor is now alive (created for the first time or reconstructed). We can
    // stop listening for the actor creation task. This is needed because we use
    // `ListenAndMaybeReconstruct` to reconstruct the actor.
    reconstruction_policy_.Cancel(actor_registration.GetActorCreationDependency());
    // The actor's location is now known. Dequeue any methods that were
    // submitted before the actor's location was known.
    // (See design_docs/task_states.rst for the state transition diagram.)
@@ -692,6 +696,10 @@ void NodeManager::HandleActorStateTransition(const ActorID &actor_id,
  } else {
    RAY_CHECK(actor_registration.GetState() == ActorTableData::RECONSTRUCTING);
    RAY_LOG(DEBUG) << "Actor is being reconstructed: " << actor_id;
    // The actor is dead and needs reconstruction. Attempt to reconstruct its
    // creation task.
    reconstruction_policy_.ListenAndMaybeReconstruct(
        actor_registration.GetActorCreationDependency());
    // When an actor fails but can be reconstructed, resubmit all of the queued
    // tasks for that actor. This will mark the tasks as waiting for actor
    // creation.
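A minimal, self-contained sketch (hypothetical Python, not Ray code) of the control flow these two hunks add: start listening on the actor-creation dependency when the actor enters RECONSTRUCTING, and cancel that listener once the actor is ALIVE again. Names follow the C++ but are illustrative.

class ReconstructionPolicy(object):
    """Toy stand-in for the raylet's reconstruction_policy_."""

    def __init__(self):
        self.listening = set()

    def listen_and_maybe_reconstruct(self, object_id):
        # In the raylet this would eventually resubmit the creation task.
        self.listening.add(object_id)

    def cancel(self, object_id):
        self.listening.discard(object_id)


def handle_actor_state_transition(state, creation_dependency, policy):
    if state == "ALIVE":
        # Created for the first time or reconstructed: stop listening for the
        # actor creation task.
        policy.cancel(creation_dependency)
    else:
        assert state == "RECONSTRUCTING"
        # Dead but still reconstructable: try to reconstruct the creation task
        # even though no new task has been submitted to the actor.
        policy.listen_and_maybe_reconstruct(creation_dependency)


policy = ReconstructionPolicy()
dep = "actor_creation_dummy_object"
handle_actor_state_transition("RECONSTRUCTING", dep, policy)
assert dep in policy.listening
handle_actor_state_transition("ALIVE", dep, policy)
assert dep not in policy.listening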
@@ -2056,8 +2064,8 @@ void NodeManager::HandleTaskReconstruction(const TaskID &task_id) {
        // The task was not in the GCS task table. It must therefore be in the
        // lineage cache.
        RAY_CHECK(lineage_cache_.ContainsTask(task_id))
            << "Task metadata not found in either GCS or lineage cache. It may have been "
               "evicted "
            << "Metadata of task " << task_id
            << " not found in either GCS or lineage cache. It may have been evicted "
            << "by the redis LRU configuration. Consider increasing the memory "
               "allocation via "
            << "ray.init(redis_max_memory=<max_memory_bytes>).";