[Part 2] Improve RayActorDiedError: Store why the actor is dead to the actor table. (#20528)

This PR adds the precise reason why an actor died to the `ActorTable`. The `death_cause` stored in the table is propagated to the core worker through pubsub, so that the core worker can eventually raise an informative error message with metadata.
SangBin Cho 2021-11-25 21:39:02 +09:00 committed by GitHub
parent e37afe0425
commit 31f378e45a
17 changed files with 226 additions and 101 deletions
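
To make the end-to-end behavior concrete, here is a rough usage sketch (hedged, not taken from the diff itself; the actor class name and the sleep are illustrative) of how the new fields surface through `ray.state.actors()` after this change. The example message follows the `GenKilledByApplicationCause` helper added below; exact timing and text may differ.

import time
import ray

ray.init()

@ray.remote
class Worker:
    pass

w = Worker.remote()
ray.kill(w)
time.sleep(1)  # give the GCS a moment to record the death

info = ray.state.actors(actor_id=w._actor_id.hex())
print(info["State"])       # now the enum name, e.g. "DEAD", instead of the raw integer
print(info["DeathCause"])  # e.g. killed_by_app_context with the message
                           #   "The actor is dead because it was killed by `ray.kill`."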


@ -25,6 +25,7 @@ import ray._private.gcs_utils as gcs_utils
import ray._private.memory_monitor as memory_monitor
from ray.core.generated import node_manager_pb2
from ray.core.generated import node_manager_pb2_grpc
from ray.core.generated import gcs_pb2
from ray._private.gcs_pubsub import gcs_pubsub_enabled, GcsSubscriber
from ray._private.tls_utils import generate_self_signed_tls_certs
from ray.util.queue import Queue, _QueueActor, Empty
@ -235,7 +236,15 @@ def run_string_as_driver_nonblocking(driver_script, env: Dict = None):
return proc
def convert_actor_state(state):
if not state:
return None
return (gcs_pb2.ActorTableData.ActorState.DESCRIPTOR.values_by_number[
state].name)
def wait_for_num_actors(num_actors, state=None, timeout=10):
state = convert_actor_state(state)
start_time = time.time()
while time.time() - start_time < timeout:
if len([
@ -282,7 +291,8 @@ def kill_actor_and_wait_for_failure(actor, timeout=10, retry_interval_ms=100):
start = time.time()
while time.time() - start <= timeout:
actor_status = ray.state.actors(actor_id)
if actor_status["State"] == gcs_utils.ActorTableData.DEAD \
if actor_status["State"] == convert_actor_state(
gcs_utils.ActorTableData.DEAD) \
or actor_status["NumRestarts"] > current_num_restarts:
return
time.sleep(retry_interval_ms / 1000.0)
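
For reference, a minimal sketch of what the new `convert_actor_state` test helper does, assuming `gcs_utils` and `test_utils` import as in the tests above: it maps the numeric `ActorTableData.ActorState` enum onto its string name, which is what `ray.state.actors()` now reports for "State".

import ray._private.gcs_utils as gcs_utils
from ray._private.test_utils import convert_actor_state

# The helper maps the numeric ActorState enum value to its string name.
assert convert_actor_state(gcs_utils.ActorTableData.ALIVE) == "ALIVE"
assert convert_actor_state(gcs_utils.ActorTableData.DEAD) == "DEAD"
# A falsy state (e.g. None) is passed through as None.
assert convert_actor_state(None) is None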


@ -22,7 +22,8 @@ from ray.serve.utils import (block_until_http_ready, get_all_node_ids,
format_actor_name)
from ray.serve.config import HTTPOptions
from ray.serve.api import _get_global_client
from ray._private.test_utils import run_string_as_driver, wait_for_condition
from ray._private.test_utils import (run_string_as_driver, wait_for_condition,
convert_actor_state)
from ray._private.services import new_port
import ray._private.gcs_utils as gcs_utils
@ -345,8 +346,8 @@ def test_no_http(ray_shutdown):
# Only controller actor should exist
live_actors = [
actor for actor in ray.state.actors().values()
if actor["State"] == gcs_utils.ActorTableData.ALIVE
actor for actor in ray.state.actors().values() if actor["State"] ==
convert_actor_state(gcs_utils.ActorTableData.ALIVE)
]
assert len(live_actors) == 1
controller = serve.api._global_client._controller


@ -124,6 +124,8 @@ class GlobalState:
"""
actor_info = {
"ActorID": binary_to_hex(actor_table_data.actor_id),
"ActorClassName": actor_table_data.class_name,
"IsDetached": actor_table_data.is_detached,
"Name": actor_table_data.name,
"JobID": binary_to_hex(actor_table_data.job_id),
"Address": {
@ -137,11 +139,13 @@ class GlobalState:
"NodeID": binary_to_hex(
actor_table_data.owner_address.raylet_id),
},
"State": actor_table_data.state,
"State": gcs_pb2.ActorTableData.ActorState.DESCRIPTOR.
values_by_number[actor_table_data.state].name,
"NumRestarts": actor_table_data.num_restarts,
"Timestamp": actor_table_data.timestamp,
"StartTime": actor_table_data.start_time,
"EndTime": actor_table_data.end_time,
"DeathCause": actor_table_data.death_cause
}
return actor_info


@ -13,7 +13,7 @@ import ray.cluster_utils
import ray._private.gcs_utils as gcs_utils
from ray._private.test_utils import (
run_string_as_driver, get_non_head_nodes, kill_actor_and_wait_for_failure,
SignalActor, wait_for_condition, wait_for_pid_to_exit)
SignalActor, wait_for_condition, wait_for_pid_to_exit, convert_actor_state)
from ray.experimental.internal_kv import _internal_kv_get, _internal_kv_put
from ray._raylet import GlobalStateAccessor
@ -756,7 +756,8 @@ def test_detached_actor_cleanup(ray_start_regular):
actor_id=detached_actor._actor_id.hex())
max_wait_time = 10
wait_time = 0
while actor_status["State"] != gcs_utils.ActorTableData.DEAD:
while actor_status["State"] != convert_actor_state(
gcs_utils.ActorTableData.DEAD):
actor_status = ray.state.actors(
actor_id=detached_actor._actor_id.hex())
time.sleep(1.0)
@ -777,6 +778,7 @@ def test_detached_actor_cleanup(ray_start_regular):
import ray
import ray._private.gcs_utils as gcs_utils
import time
from ray._private.test_utils import convert_actor_state
ray.init(address="{}", namespace="default_test_namespace")
@ray.remote
@ -792,7 +794,7 @@ ray.kill(detached_actor)
actor_status = ray.state.actors(actor_id=detached_actor._actor_id.hex())
max_wait_time = 10
wait_time = 0
while actor_status["State"] != gcs_utils.ActorTableData.DEAD:
while actor_status["State"] != convert_actor_state(gcs_utils.ActorTableData.DEAD): # noqa
actor_status = ray.state.actors(actor_id=detached_actor._actor_id.hex())
time.sleep(1.0)
wait_time += 1
@ -875,7 +877,8 @@ def test_detached_actor_cleanup_due_to_failure(ray_start_cluster):
actor_status = ray.state.actors(actor_id=handle._actor_id.hex())
max_wait_time = 10
wait_time = 0
while actor_status["State"] != gcs_utils.ActorTableData.DEAD:
while actor_status["State"] != convert_actor_state(
gcs_utils.ActorTableData.DEAD):
actor_status = ray.state.actors(actor_id=handle._actor_id.hex())
time.sleep(1.0)
wait_time += 1


@ -4,7 +4,7 @@ import time
from ray.util.client.ray_client_helpers import ray_start_client_server
from ray.tests.client_test_utils import create_remote_signal_actor
from ray._private.test_utils import wait_for_condition
from ray._private.test_utils import wait_for_condition, convert_actor_state
from ray.exceptions import TaskCancelledError
from ray.exceptions import RayTaskError
from ray.exceptions import WorkerCrashedError
@ -25,7 +25,8 @@ def _all_actors_dead(ray):
import ray._private.gcs_utils as gcs_utils
def _all_actors_dead_internal():
return all(actor["State"] == gcs_utils.ActorTableData.DEAD
return all(actor["State"] == convert_actor_state(
gcs_utils.ActorTableData.DEAD)
for actor in list(real_ray.state.actors().values()))
return _all_actors_dead_internal


@ -13,7 +13,8 @@ import ray.ray_constants as ray_constants
from ray.exceptions import RayTaskError, RayActorError, GetTimeoutError
from ray._private.gcs_pubsub import gcs_pubsub_enabled, GcsPublisher
from ray._private.test_utils import (wait_for_condition, SignalActor,
init_error_pubsub, get_error_message)
init_error_pubsub, get_error_message,
convert_actor_state)
def test_unhandled_errors(ray_start_regular):
@ -663,8 +664,8 @@ def test_actor_failover_with_bad_network(ray_start_cluster_head):
actors = list(ray.state.actors().values())
assert len(actors) == 1
print(actors)
return (actors[0]["State"] == gcs_utils.ActorTableData.ALIVE
and actors[0]["NumRestarts"] == 1)
return (actors[0]["State"] == convert_actor_state(
gcs_utils.ActorTableData.ALIVE) and actors[0]["NumRestarts"] == 1)
wait_for_condition(check_actor_restart)


@ -3,11 +3,9 @@ import sys
import ray
import ray._private.gcs_utils as gcs_utils
import pytest
from ray._private.test_utils import (
generate_system_config_map,
wait_for_condition,
wait_for_pid_to_exit,
)
from ray._private.test_utils import (generate_system_config_map,
wait_for_condition, wait_for_pid_to_exit,
convert_actor_state)
@ray.remote
@ -151,7 +149,8 @@ def test_del_actor_after_gcs_server_restart(ray_start_regular):
def condition():
actor_status = ray.state.actors(actor_id=actor_id)
if actor_status["State"] == gcs_utils.ActorTableData.DEAD:
if actor_status["State"] == convert_actor_state(
gcs_utils.ActorTableData.DEAD):
return True
else:
return False


@ -8,7 +8,7 @@ import time
import ray
import ray.ray_constants
import ray._private.gcs_utils as gcs_utils
from ray._private.test_utils import wait_for_condition
from ray._private.test_utils import wait_for_condition, convert_actor_state
from ray._raylet import GlobalStateAccessor
@ -102,7 +102,7 @@ def test_global_state_actor_table(ray_start_regular):
def get_state():
return list(ray.state.actors().values())[0]["State"]
dead_state = gcs_utils.ActorTableData.DEAD
dead_state = convert_actor_state(gcs_utils.ActorTableData.DEAD)
for _ in range(10):
if get_state() == dead_state:
break
@ -137,10 +137,12 @@ def test_global_state_actor_entry(ray_start_regular):
b_actor_id = b._actor_id.hex()
assert ray.state.actors(actor_id=a_actor_id)["ActorID"] == a_actor_id
assert ray.state.actors(
actor_id=a_actor_id)["State"] == gcs_utils.ActorTableData.ALIVE
actor_id=a_actor_id)["State"] == convert_actor_state(
gcs_utils.ActorTableData.ALIVE)
assert ray.state.actors(actor_id=b_actor_id)["ActorID"] == b_actor_id
assert ray.state.actors(
actor_id=b_actor_id)["State"] == gcs_utils.ActorTableData.ALIVE
actor_id=b_actor_id)["State"] == convert_actor_state(
gcs_utils.ActorTableData.ALIVE)
@pytest.mark.parametrize("max_shapes", [0, 2, -1])


@ -14,7 +14,7 @@ import ray._private.gcs_utils as gcs_utils
from ray._private.test_utils import (
get_other_nodes, generate_system_config_map,
kill_actor_and_wait_for_failure, run_string_as_driver, wait_for_condition,
get_error_message, placement_group_assert_no_leak)
get_error_message, placement_group_assert_no_leak, convert_actor_state)
from ray.util.placement_group import get_current_placement_group
from ray.util.client.ray_client_helpers import connect_to_client_or_not
@ -395,7 +395,8 @@ def test_capture_child_actors(ray_start_cluster, connect_to_client):
# (why? The placement group has STRICT_PACK strategy).
node_id_set = set()
for actor_info in ray.state.actors().values():
if actor_info["State"] == gcs_utils.ActorTableData.ALIVE:
if actor_info["State"] == convert_actor_state(
gcs_utils.ActorTableData.ALIVE):
node_id = actor_info["Address"]["NodeID"]
node_id_set.add(node_id)
@ -418,7 +419,8 @@ def test_capture_child_actors(ray_start_cluster, connect_to_client):
# placement group.
node_id_set = set()
for actor_info in ray.state.actors().values():
if actor_info["State"] == gcs_utils.ActorTableData.ALIVE:
if actor_info["State"] == convert_actor_state(
gcs_utils.ActorTableData.ALIVE):
node_id = actor_info["Address"]["NodeID"]
node_id_set.add(node_id)
@ -441,7 +443,8 @@ def test_capture_child_actors(ray_start_cluster, connect_to_client):
# placement group.
node_id_set = set()
for actor_info in ray.state.actors().values():
if actor_info["State"] == gcs_utils.ActorTableData.ALIVE:
if actor_info["State"] == convert_actor_state(
gcs_utils.ActorTableData.ALIVE):
node_id = actor_info["Address"]["NodeID"]
node_id_set.add(node_id)


@ -14,7 +14,8 @@ import ray._private.gcs_utils as gcs_utils
from ray.autoscaler._private.commands import debug_status
from ray._private.test_utils import (
generate_system_config_map, kill_actor_and_wait_for_failure,
run_string_as_driver, wait_for_condition, is_placement_group_removed)
run_string_as_driver, wait_for_condition, is_placement_group_removed,
convert_actor_state)
from ray.exceptions import RaySystemError
from ray._raylet import PlacementGroupID
from ray.util.placement_group import (PlacementGroup, placement_group,
@ -173,7 +174,8 @@ ray.shutdown()
def assert_alive_num_actor(expected_num_actor):
alive_num_actor = 0
for actor_info in ray.state.actors().values():
if actor_info["State"] == gcs_utils.ActorTableData.ALIVE:
if actor_info["State"] == convert_actor_state(
gcs_utils.ActorTableData.ALIVE):
alive_num_actor += 1
return alive_num_actor == expected_num_actor


@ -14,7 +14,7 @@ import ray.cluster_utils
import ray._private.gcs_utils as gcs_utils
from ray._private.test_utils import (
SignalActor, kill_actor_and_wait_for_failure, put_object,
wait_for_condition, new_scheduler_enabled)
wait_for_condition, new_scheduler_enabled, convert_actor_state)
logger = logging.getLogger(__name__)
@ -549,8 +549,9 @@ def test_basic_nested_ids(one_worker_100MiB):
def _all_actors_dead():
return all(actor["State"] == gcs_utils.ActorTableData.DEAD
for actor in list(ray.state.actors().values()))
return all(
actor["State"] == convert_actor_state(gcs_utils.ActorTableData.DEAD)
for actor in list(ray.state.actors().values()))
@pytest.mark.skipif(sys.platform == "win32", reason="Failing on Windows.")


@ -21,6 +21,52 @@
#include "ray/gcs/pb_util.h"
#include "ray/stats/stats.h"
namespace {
const ray::rpc::ActorDeathCause GenRuntimeEnvFailedCause(const std::string &error_msg) {
ray::rpc::ActorDeathCause death_cause;
death_cause.mutable_runtime_env_failed_context()->set_error_message(error_msg);
return death_cause;
}
const ray::rpc::ActorDeathCause GenNodeDiedCause() {
ray::rpc::ActorDeathCause death_cause;
death_cause.mutable_node_died_context()->set_error_message(
"The actor is dead because its node has died.");
return death_cause;
}
const ray::rpc::ActorDeathCause GenWorkerDiedCause() {
ray::rpc::ActorDeathCause death_cause;
death_cause.mutable_worker_died_context()->set_error_message(
"The actor is dead because its worker process has died.");
return death_cause;
}
const ray::rpc::ActorDeathCause GenOwnerDiedCause(
const WorkerID &owner_id, const ray::rpc::WorkerExitType disconnect_type) {
ray::rpc::ActorDeathCause death_cause;
ray::rpc::ActorDeathOwnerDiedContext owner_died_context;
owner_died_context.set_owner_id(owner_id.Binary());
owner_died_context.set_error_message("The actor is dead because its owner has died.");
owner_died_context.set_owner_worker_exit_type(disconnect_type);
death_cause.mutable_owner_died_context()->Swap(&owner_died_context);
return death_cause;
}
const ray::rpc::ActorDeathCause GenKilledByApplicationCause() {
ray::rpc::ActorDeathCause death_cause;
death_cause.mutable_killed_by_app_context()->set_error_message(
"The actor is dead because it was killed by `ray.kill`.");
return death_cause;
}
const ray::rpc::ActorDeathCause GenActorOutOfScopeCause() {
ray::rpc::ActorDeathCause death_cause;
death_cause.mutable_out_of_scope_context()->set_error_message(
"The actor is dead because all references to the actor were removed.");
return death_cause;
}
} // namespace
namespace ray {
namespace gcs {
@ -308,7 +354,7 @@ void GcsActorManager::HandleKillActorViaGcs(const rpc::KillActorViaGcsRequest &r
bool force_kill = request.force_kill();
bool no_restart = request.no_restart();
if (no_restart) {
DestroyActor(actor_id);
DestroyActor(actor_id, GenKilledByApplicationCause());
} else {
KillActor(actor_id, force_kill, no_restart);
}
@ -596,13 +642,13 @@ void GcsActorManager::PollOwnerForActorOutOfScope(
if (node_it != owners_.end() && node_it->second.count(owner_id)) {
// Only destroy the actor if its owner is still alive. The actor may
// have already been destroyed if the owner died.
DestroyActor(actor_id);
DestroyActor(actor_id, GenActorOutOfScopeCause());
}
});
}
void GcsActorManager::DestroyActor(const ActorID &actor_id,
const rpc::ActorDeathCause *death_cause) {
const rpc::ActorDeathCause &death_cause) {
RAY_LOG(INFO) << "Destroying actor, actor id = " << actor_id
<< ", job id = " << actor_id.JobId();
actor_to_register_callbacks_.erase(actor_id);
@ -671,9 +717,7 @@ void GcsActorManager::DestroyActor(const ActorID &actor_id,
auto time = current_sys_time_ms();
mutable_actor_table_data->set_end_time(time);
mutable_actor_table_data->set_timestamp(time);
if (death_cause != nullptr) {
mutable_actor_table_data->mutable_death_cause()->CopyFrom(*death_cause);
}
mutable_actor_table_data->mutable_death_cause()->CopyFrom(death_cause);
auto actor_table_data =
std::make_shared<rpc::ActorTableData>(*mutable_actor_table_data);
@ -688,16 +732,18 @@ void GcsActorManager::DestroyActor(const ActorID &actor_id,
}));
}
absl::flat_hash_set<ActorID> GcsActorManager::GetUnresolvedActorsByOwnerNode(
const NodeID &node_id) const {
absl::flat_hash_set<ActorID> actor_ids;
absl::flat_hash_map<WorkerID, absl::flat_hash_set<ActorID>>
GcsActorManager::GetUnresolvedActorsByOwnerNode(const NodeID &node_id) const {
absl::flat_hash_map<WorkerID, absl::flat_hash_set<ActorID>> actor_ids_map;
auto iter = unresolved_actors_.find(node_id);
if (iter != unresolved_actors_.end()) {
for (auto &entry : iter->second) {
for (const auto &entry : iter->second) {
const auto &owner_id = entry.first;
auto &actor_ids = actor_ids_map[owner_id];
actor_ids.insert(entry.second.begin(), entry.second.end());
}
}
return actor_ids;
return actor_ids_map;
}
absl::flat_hash_set<ActorID> GcsActorManager::GetUnresolvedActorsByOwnerWorker(
@ -730,16 +776,6 @@ void GcsActorManager::OnWorkerDead(const ray::NodeID &node_id,
"Worker ", worker_id.Hex(), " on node ", node_id.Hex(),
" exits, type=", rpc::WorkerExitType_Name(disconnect_type),
", has creation_task_exception = ", (creation_task_exception != nullptr));
std::unique_ptr<rpc::ActorDeathCause> death_cause = nullptr;
if (creation_task_exception != nullptr) {
absl::StrAppend(&message, " Formatted creation task exception: ",
creation_task_exception->formatted_exception_string());
death_cause = std::make_unique<rpc::ActorDeathCause>();
death_cause->mutable_creation_task_failure_context()
->mutable_creation_task_exception()
->CopyFrom(*creation_task_exception);
}
if (disconnect_type == rpc::WorkerExitType::INTENDED_EXIT ||
disconnect_type == rpc::WorkerExitType::IDLE_EXIT) {
RAY_LOG(DEBUG) << message;
@ -757,7 +793,7 @@ void GcsActorManager::OnWorkerDead(const ray::NodeID &node_id,
// list.
const auto children_ids = owner->second.children_actor_ids;
for (const auto &child_id : children_ids) {
DestroyActor(child_id);
DestroyActor(child_id, GenOwnerDiedCause(worker_id, disconnect_type));
}
}
@ -767,7 +803,7 @@ void GcsActorManager::OnWorkerDead(const ray::NodeID &node_id,
auto unresolved_actors = GetUnresolvedActorsByOwnerWorker(node_id, worker_id);
for (auto &actor_id : unresolved_actors) {
if (registered_actors_.count(actor_id)) {
DestroyActor(actor_id);
DestroyActor(actor_id, GenOwnerDiedCause(worker_id, disconnect_type));
}
}
@ -788,35 +824,46 @@ void GcsActorManager::OnWorkerDead(const ray::NodeID &node_id,
}
}
rpc::ActorDeathCause death_cause;
if (creation_task_exception != nullptr) {
absl::StrAppend(&message, ": ",
creation_task_exception->formatted_exception_string());
death_cause.mutable_creation_task_failure_context()
->mutable_creation_task_exception()
->CopyFrom(*creation_task_exception);
} else {
death_cause = GenWorkerDiedCause();
}
// Otherwise, try to reconstruct the actor that was already created or in the creation
// process.
ReconstructActor(actor_id, /*need_reschedule=*/need_reconstruct, death_cause.get());
ReconstructActor(actor_id, /*need_reschedule=*/need_reconstruct, death_cause);
}
void GcsActorManager::OnNodeDead(const NodeID &node_id) {
RAY_LOG(INFO) << "Node " << node_id << " failed, reconstructing actors.";
// Kill all children of owner actors on a dead node.
const auto it = owners_.find(node_id);
if (it != owners_.end()) {
std::vector<ActorID> children_ids;
absl::flat_hash_map<WorkerID, ActorID> children_ids;
// Make a copy of all the actor IDs owned by workers on the dead node.
for (const auto &owner : it->second) {
for (const auto &child_id : owner.second.children_actor_ids) {
children_ids.push_back(child_id);
children_ids.emplace(owner.first, child_id);
}
}
for (const auto &child_id : children_ids) {
DestroyActor(child_id);
for (const auto &[owner_id, child_id] : children_ids) {
DestroyActor(child_id, GenOwnerDiedCause(owner_id, rpc::WorkerExitType::NODE_DIED));
}
}
// Cancel the scheduling of all related actors.
// Cancel scheduling actors that haven't been created on the node.
auto scheduling_actor_ids = gcs_actor_scheduler_->CancelOnNode(node_id);
for (auto &actor_id : scheduling_actor_ids) {
// Reconstruct the canceled actor.
ReconstructActor(actor_id);
ReconstructActor(actor_id, /*need_reschedule=*/true, GenNodeDiedCause());
}
// Find all actors that were created on this node.
// Try reconstructing all actors that were created on the node.
auto iter = created_actors_.find(node_id);
if (iter != created_actors_.end()) {
auto created_actors = std::move(iter->second);
@ -824,7 +871,7 @@ void GcsActorManager::OnNodeDead(const NodeID &node_id) {
created_actors_.erase(iter);
for (auto &entry : created_actors) {
// Reconstruct the removed actor.
ReconstructActor(entry.second);
ReconstructActor(entry.second, /*need_reschedule=*/true, GenNodeDiedCause());
}
}
@ -832,19 +879,18 @@ void GcsActorManager::OnNodeDead(const NodeID &node_id) {
// case, these actors will never be created successfully. So we need to destroy them,
// to prevent actor tasks from hanging forever.
auto unresolved_actors = GetUnresolvedActorsByOwnerNode(node_id);
for (auto &actor_id : unresolved_actors) {
if (registered_actors_.count(actor_id)) {
DestroyActor(actor_id);
for (const auto &[owner_id, actor_ids] : unresolved_actors) {
for (const auto &actor_id : actor_ids) {
if (registered_actors_.count(actor_id)) {
DestroyActor(actor_id,
GenOwnerDiedCause(owner_id, rpc::WorkerExitType::NODE_DIED));
}
}
}
}
void GcsActorManager::ReconstructActor(const ActorID &actor_id) {
ReconstructActor(actor_id, /*need_reschedule=*/true);
}
void GcsActorManager::ReconstructActor(const ActorID &actor_id, bool need_reschedule,
const rpc::ActorDeathCause *death_cause) {
const rpc::ActorDeathCause &death_cause) {
// If the owner and this actor are dead at the same time, the actor
// could've been destroyed and deregistered before reconstruction.
auto iter = registered_actors_.find(actor_id);
@ -875,7 +921,7 @@ void GcsActorManager::ReconstructActor(const ActorID &actor_id, bool need_resche
RAY_LOG(INFO) << "Actor " << actor_id << " is failed on worker " << worker_id
<< " at node " << node_id << ", need_reschedule = " << need_reschedule
<< ", death context type = " << GetDeathCauseString(death_cause)
<< ", death context type = " << GetDeathCauseString(&death_cause)
<< ", remaining_restarts = " << remaining_restarts
<< ", job id = " << actor_id.JobId();
@ -899,21 +945,20 @@ void GcsActorManager::ReconstructActor(const ActorID &actor_id, bool need_resche
} else {
RemoveActorNameFromRegistry(actor);
mutable_actor_table_data->set_state(rpc::ActorTableData::DEAD);
if (death_cause != nullptr) {
mutable_actor_table_data->mutable_death_cause()->CopyFrom(*death_cause);
}
mutable_actor_table_data->mutable_death_cause()->CopyFrom(death_cause);
auto time = current_sys_time_ms();
mutable_actor_table_data->set_end_time(time);
mutable_actor_table_data->set_timestamp(time);
// The backend storage is reliable in the future, so the status must be ok.
RAY_CHECK_OK(gcs_table_storage_->ActorTable().Put(
actor_id, *mutable_actor_table_data,
[this, actor, actor_id, mutable_actor_table_data](Status status) {
[this, actor, actor_id, mutable_actor_table_data, death_cause](Status status) {
// If the actor was a detached actor, make sure to destroy it.
// We need to do this because detached actors are not destroyed
// when their owners die, since they do not have owners.
if (actor->IsDetached()) {
DestroyActor(actor_id);
DestroyActor(actor_id, death_cause);
}
RAY_CHECK_OK(gcs_publisher_->PublishActor(
actor_id, *GenActorDataOnlyWithStates(*mutable_actor_table_data), nullptr));
@ -934,13 +979,11 @@ void GcsActorManager::OnActorSchedulingFailed(std::shared_ptr<GcsActor> actor,
return;
}
auto death_cause = std::make_unique<rpc::ActorDeathCause>();
// TODO(sang, lixin) 1. Make this message more friendly 2. Show this message in
// object.get()'s error.
death_cause->mutable_runtime_env_setup_failure_context()->set_error_message(
"Cannot create an actor because the associated runtime env couldn't be created.");
// If there is runtime env failure, mark this actor as dead immediately.
DestroyActor(actor->GetActorID(), death_cause.get());
DestroyActor(
actor->GetActorID(),
GenRuntimeEnvFailedCause("Could not create the actor because its associated "
"runtime env failed to be created."));
}
void GcsActorManager::OnActorCreationSuccess(const std::shared_ptr<GcsActor> &actor,
@ -1210,7 +1253,7 @@ void GcsActorManager::KillActor(const ActorID &actor_id, bool force_kill,
RAY_LOG(DEBUG) << "The actor " << actor->GetActorID()
<< " hasn't been created yet, cancel scheduling " << task_id;
CancelActorInScheduling(actor, task_id);
ReconstructActor(actor_id, /*need_reschedule=*/true);
ReconstructActor(actor_id, /*need_reschedule=*/true, GenKilledByApplicationCause());
}
}


@ -381,12 +381,14 @@ class GcsActorManager : public rpc::ActorInfoHandler {
/// scope or the owner has died.
/// NOTE: This method can be called multiple times, out of order, and should be
/// idempotent.
void DestroyActor(const ActorID &actor_id,
const rpc::ActorDeathCause *death_cause = nullptr);
///
/// \param[in] actor_id The actor id to destroy.
/// \param[in] death_cause The reason why the actor is destroyed.
void DestroyActor(const ActorID &actor_id, const rpc::ActorDeathCause &death_cause);
/// Get unresolved actors that were submitted from the specified node.
absl::flat_hash_set<ActorID> GetUnresolvedActorsByOwnerNode(
const NodeID &node_id) const;
absl::flat_hash_map<WorkerID, absl::flat_hash_set<ActorID>>
GetUnresolvedActorsByOwnerNode(const NodeID &node_id) const;
/// Get unresolved actors that were submitted from the specified worker.
absl::flat_hash_set<ActorID> GetUnresolvedActorsByOwnerWorker(
@ -401,10 +403,7 @@ class GcsActorManager : public rpc::ActorInfoHandler {
/// \param death_cause Context about why this actor is dead. Should only be set when
/// need_reschedule=false.
void ReconstructActor(const ActorID &actor_id, bool need_reschedule,
const rpc::ActorDeathCause *death_cause = nullptr);
/// Reconstruct the specified actor and reschedule it.
void ReconstructActor(const ActorID &actor_id);
const rpc::ActorDeathCause &death_cause);
/// Remove the specified actor from `unresolved_actors_`.
///


@ -313,6 +313,7 @@ TEST_F(GcsActorManagerTest, TestWorkerFailure) {
// Remove worker and then check that the actor is dead.
gcs_actor_manager_->OnWorkerDead(node_id, worker_id);
ASSERT_EQ(actor->GetState(), rpc::ActorTableData::DEAD);
ASSERT_TRUE(actor->GetActorTableData().death_cause().has_worker_died_context());
// No more actors to schedule.
gcs_actor_manager_->SchedulePendingActors();
ASSERT_EQ(mock_actor_scheduler_->actors.size(), 0);
@ -358,6 +359,7 @@ TEST_F(GcsActorManagerTest, TestNodeFailure) {
OnNodeDead(node_id);
ASSERT_EQ(actor->GetState(), rpc::ActorTableData::DEAD);
ASSERT_TRUE(actor->GetActorTableData().death_cause().has_node_died_context());
// No more actors to schedule.
gcs_actor_manager_->SchedulePendingActors();
ASSERT_EQ(mock_actor_scheduler_->actors.size(), 0);
@ -422,6 +424,7 @@ TEST_F(GcsActorManagerTest, TestActorReconstruction) {
EXPECT_CALL(*mock_actor_scheduler_, CancelOnNode(node_id2));
OnNodeDead(node_id2);
ASSERT_EQ(actor->GetState(), rpc::ActorTableData::DEAD);
ASSERT_TRUE(actor->GetActorTableData().death_cause().has_node_died_context());
// No more actors to schedule.
gcs_actor_manager_->SchedulePendingActors();
ASSERT_EQ(mock_actor_scheduler_->actors.size(), 0);
@ -463,6 +466,7 @@ TEST_F(GcsActorManagerTest, TestActorRestartWhenOwnerDead) {
OnNodeDead(owner_node_id);
// The child actor should be marked as dead.
ASSERT_EQ(actor->GetState(), rpc::ActorTableData::DEAD);
ASSERT_TRUE(actor->GetActorTableData().death_cause().has_owner_died_context());
ASSERT_EQ(worker_client_->killed_actors_.size(), 1);
ASSERT_EQ(worker_client_->killed_actors_.front(), actor->GetActorID());
@ -607,6 +611,7 @@ TEST_F(GcsActorManagerTest, TestNamedActorDeletionWorkerFailure) {
// Remove worker and then check that the actor is dead.
gcs_actor_manager_->OnWorkerDead(node_id, worker_id);
ASSERT_EQ(actor->GetState(), rpc::ActorTableData::DEAD);
ASSERT_TRUE(actor->GetActorTableData().death_cause().has_worker_died_context());
ASSERT_EQ(gcs_actor_manager_->GetActorIDByName(actor_name, ""), ActorID::Nil());
// Create an actor with the same name. This ensures that the name has been properly
@ -655,6 +660,7 @@ TEST_F(GcsActorManagerTest, TestNamedActorDeletionNodeFailure) {
EXPECT_CALL(*mock_actor_scheduler_, CancelOnNode(node_id));
OnNodeDead(node_id);
ASSERT_EQ(actor->GetState(), rpc::ActorTableData::DEAD);
ASSERT_TRUE(actor->GetActorTableData().death_cause().has_node_died_context());
// Create an actor with the same name. This ensures that the name has been properly
// deleted.
@ -744,6 +750,7 @@ TEST_F(GcsActorManagerTest, TestDestroyActorBeforeActorCreationCompletes) {
actor->UpdateAddress(RandomAddress());
gcs_actor_manager_->OnActorCreationSuccess(actor, rpc::PushTaskReply());
ASSERT_EQ(actor->GetState(), rpc::ActorTableData::DEAD);
ASSERT_TRUE(actor->GetActorTableData().death_cause().has_out_of_scope_context());
}
TEST_F(GcsActorManagerTest, TestRaceConditionCancelLease) {
@ -781,6 +788,7 @@ TEST_F(GcsActorManagerTest, TestRaceConditionCancelLease) {
TaskID::FromBinary(registered_actor->GetActorTableData().task_spec().task_id());
EXPECT_CALL(*mock_actor_scheduler_, CancelOnLeasing(node_id, actor_id, task_id));
gcs_actor_manager_->OnWorkerDead(owner_node_id, owner_worker_id);
ASSERT_TRUE(actor->GetActorTableData().death_cause().has_owner_died_context());
}
TEST_F(GcsActorManagerTest, TestRegisterActor) {
@ -821,6 +829,8 @@ TEST_F(GcsActorManagerTest, TestOwnerWorkerDieBeforeActorDependenciesResolved) {
auto worker_id = WorkerID::FromBinary(owner_address.worker_id());
gcs_actor_manager_->OnWorkerDead(node_id, worker_id);
ASSERT_EQ(registered_actor->GetState(), rpc::ActorTableData::DEAD);
ASSERT_TRUE(
registered_actor->GetActorTableData().death_cause().has_owner_died_context());
// Make sure the actor gets cleaned up.
const auto &registered_actors = gcs_actor_manager_->GetRegisteredActors();
@ -837,6 +847,8 @@ TEST_F(GcsActorManagerTest, TestOwnerWorkerDieBeforeDetachedActorDependenciesRes
auto worker_id = WorkerID::FromBinary(owner_address.worker_id());
gcs_actor_manager_->OnWorkerDead(node_id, worker_id);
ASSERT_EQ(registered_actor->GetState(), rpc::ActorTableData::DEAD);
ASSERT_TRUE(
registered_actor->GetActorTableData().death_cause().has_owner_died_context());
// Make sure the actor gets cleaned up.
const auto &registered_actors = gcs_actor_manager_->GetRegisteredActors();
@ -852,6 +864,8 @@ TEST_F(GcsActorManagerTest, TestOwnerNodeDieBeforeActorDependenciesResolved) {
auto node_id = NodeID::FromBinary(owner_address.raylet_id());
OnNodeDead(node_id);
ASSERT_EQ(registered_actor->GetState(), rpc::ActorTableData::DEAD);
ASSERT_TRUE(
registered_actor->GetActorTableData().death_cause().has_owner_died_context());
// Make sure the actor gets cleaned up.
const auto &registered_actors = gcs_actor_manager_->GetRegisteredActors();
@ -867,6 +881,8 @@ TEST_F(GcsActorManagerTest, TestOwnerNodeDieBeforeDetachedActorDependenciesResol
auto node_id = NodeID::FromBinary(owner_address.raylet_id());
OnNodeDead(node_id);
ASSERT_EQ(registered_actor->GetState(), rpc::ActorTableData::DEAD);
ASSERT_TRUE(
registered_actor->GetActorTableData().death_cause().has_owner_died_context());
// Make sure the actor gets cleaned up.
const auto &registered_actors = gcs_actor_manager_->GetRegisteredActors();


@ -133,7 +133,7 @@ inline rpc::ErrorType GenErrorTypeFromDeathCause(
if (death_cause->context_case() == ContextCase::kCreationTaskFailureContext) {
return rpc::ErrorType::ACTOR_DIED;
}
if (death_cause->context_case() == ContextCase::kRuntimeEnvSetupFailureContext) {
if (death_cause->context_case() == ContextCase::kRuntimeEnvFailedContext) {
return rpc::ErrorType::RUNTIME_ENV_SETUP_FAILED;
}
return rpc::ErrorType::ACTOR_DIED;
@ -142,13 +142,21 @@ inline rpc::ErrorType GenErrorTypeFromDeathCause(
inline const std::string &GetDeathCauseString(const rpc::ActorDeathCause *death_cause) {
static absl::flat_hash_map<ContextCase, std::string> death_cause_string{
{ContextCase::CONTEXT_NOT_SET, "CONTEXT_NOT_SET"},
{ContextCase::kRuntimeEnvFailedContext, "RuntimeEnvFailedContext"},
{ContextCase::kCreationTaskFailureContext, "CreationTaskFailureContext"},
{ContextCase::kRuntimeEnvSetupFailureContext, "RuntimeEnvSetupFailureContext"}};
{ContextCase::kWorkerDiedContext, "WorkerDiedContext"},
{ContextCase::kNodeDiedContext, "NodeDiedContext"},
{ContextCase::kOwnerDiedContext, "OwnerDiedContext"},
{ContextCase::kKilledByAppContext, "KilledByAppContext"},
{ContextCase::kOutOfScopeContext, "OutOfScopeContext"}};
ContextCase death_cause_case = ContextCase::CONTEXT_NOT_SET;
if (death_cause != nullptr) {
death_cause_case = death_cause->context_case();
}
return death_cause_string.at(death_cause_case);
auto it = death_cause_string.find(death_cause_case);
RAY_CHECK(it != death_cause_string.end())
<< "Given death cause case " << death_cause_case << " doesn't exist.";
return it->second;
}
/// Get the error information from the actor death cause.


@ -527,6 +527,8 @@ enum WorkerExitType {
CREATION_TASK_ERROR = 4;
// Worker killed by raylet if it has been idle for too long.
IDLE_EXIT = 5;
// Worker killed because its node is dead.
NODE_DIED = 6;
}
///////////////////////////////////////////////////////////////////////////////


@ -98,22 +98,52 @@ message TaskTableData {
message ActorDeathCause {
oneof context {
CreationTaskFailureContext creation_task_failure_context = 1;
RuntimeEnvSetupFailureContext runtime_env_setup_failure_context = 2;
ActorDeathRuntimeEnvFailedContext runtime_env_failed_context = 2;
ActorDeathWorkerDiedContext worker_died_context = 3;
ActorDeathNodeDiedContext node_died_context = 4;
ActorDeathOwnerDiedContext owner_died_context = 5;
ActorDeathKilledByApplicationContext killed_by_app_context = 6;
ActorDeathOutOfScopeContext out_of_scope_context = 7;
}
}
// ---Actor death contexts start----
// Indicates that this actor is marked as DEAD due to actor creation task failure.
// TODO(sang): Refactor it to ActorInitError
message CreationTaskFailureContext {
// The exception thrown in creation task.
RayException creation_task_exception = 1;
}
// Indicates that this actor is marked as DEAD due to runtime environment setup failure.
message RuntimeEnvSetupFailureContext {
message ActorDeathRuntimeEnvFailedContext {
// TODO(sang,lixin) Get this error message from agent.
string error_message = 1;
}
message ActorDeathWorkerDiedContext {
string error_message = 1;
}
message ActorDeathNodeDiedContext {
string error_message = 1;
}
message ActorDeathOwnerDiedContext {
// The id of the dead owner.
bytes owner_id = 1;
string error_message = 2;
// The exit type of dead owner.
WorkerExitType owner_worker_exit_type = 3;
}
message ActorDeathKilledByApplicationContext {
string error_message = 1;
}
message ActorDeathOutOfScopeContext {
string error_message = 1;
}
// ---Actor death contexts end----
message ActorTableData {
@ -180,7 +210,7 @@ message ActorTableData {
// The actor's class name. This is necessary because the task spec's lifetime
// is shorter than the ActorTableData.
string class_name = 23;
// Will only be set when state=DEAD. Offers detailed context of why this actor is dead.
// Contains metadata about why the actor is dead.
ActorDeathCause death_cause = 24;
}