[Part 5] Set actor died error message in ActorDiedError (#20903)

This is the second-to-last PR in the series improving the `ActorDiedError` exception.

This PR propagates the actor death cause metadata to the Ray error object, which lets us raise a more informative actor died error exception.
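
For example, once the death cause is propagated, a caller that hits a dead actor sees the cause in the exception message and can check whether the failure happened during `__init__`. A minimal sketch based on the new `test_actor_failure_per_type` test; the `Actor`/`ping` names are illustrative and the exact message text may still change:

```python
import ray
from ray.exceptions import RayActorError

ray.init()


@ray.remote
class Actor:
    def ping(self):
        return "pong"


a = Actor.remote()
ray.kill(a)  # Kill the actor so the next call fails.

try:
    ray.get(a.ping.remote())
except RayActorError as e:
    # The message now carries the death cause metadata
    # (class name, actor id, and, when available, pid/name/namespace/ip).
    print(e)
    # True only when the actor died inside its __init__.
    print(e.actor_init_failed)
```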

After this PR is merged, I will add more metadata to each error message and write documentation that explains when each error happens. 

TODO
- [x] Fix test failures
- [x] Add unit tests
- [x] Fix Java/C++ cases

Follow up PRs
- Disallow `nullptr` for the `RayErrorInfo` input.
SangBin Cho 2022-01-21 15:11:11 +09:00 committed by GitHub
parent 9728d98586
commit 5514711a35
18 changed files with 273 additions and 136 deletions

View file

@ -26,14 +26,19 @@ public class RayException extends RuntimeException {
return builder.build().toByteArray();
}
public static RayException fromRayExceptionPB(
io.ray.runtime.generated.Common.RayException rayExceptionPB) {
if (rayExceptionPB.getLanguage() == Language.JAVA) {
return Serializer.decode(
rayExceptionPB.getSerializedException().toByteArray(), RayException.class);
} else {
return new CrossLanguageException(rayExceptionPB);
}
}
public static RayException fromBytes(byte[] serialized) throws InvalidProtocolBufferException {
io.ray.runtime.generated.Common.RayException exception =
io.ray.runtime.generated.Common.RayException.parseFrom(serialized);
if (exception.getLanguage() == Language.JAVA) {
return Serializer.decode(
exception.getSerializedException().toByteArray(), RayException.class);
} else {
return new CrossLanguageException(exception);
}
return fromRayExceptionPB(exception);
}
}

View file

@ -100,7 +100,7 @@ public class ObjectSerializer {
} else if (Bytes.indexOf(meta, ACTOR_EXCEPTION_META) == 0) {
ActorId actorId = IdUtil.getActorIdFromObjectId(objectId);
if (data != null && data.length > 0) {
RayException exception = deserializeRayException(data, objectId);
RayException exception = deserializeActorException(data, actorId, objectId);
if (exception != null) {
return exception;
}
@ -219,12 +219,44 @@ public class ObjectSerializer {
// No RayException provided
return null;
}
try {
return RayException.fromBytes(pbData);
} catch (InvalidProtocolBufferException e) {
throw new IllegalArgumentException(
"Can't deserialize RayActorCreationTaskException object: " + objectId.toString());
"Can't deserialize RayActorCreationTaskException object: " + objectId.toString(), e);
}
}
private static RayException deserializeActorException(
byte[] msgPackData, ActorId actorId, ObjectId objectId) {
// Serialization logic of task execution exception: an instance of
// `io.ray.runtime.exception.RayTaskException`
// -> a `RayException` protobuf message
// -> protobuf-serialized bytes
// -> MessagePack-serialized bytes.
// So here the `msgPackData` parameter holds MessagePack-serialized bytes, and the `pbData`
// variable holds protobuf-serialized bytes. They are not the same.
byte[] pbData = Serializer.decode(msgPackData, byte[].class);
if (pbData == null || pbData.length == 0) {
// No RayException provided
return null;
}
try {
io.ray.runtime.generated.Common.RayErrorInfo rayErrorInfo =
io.ray.runtime.generated.Common.RayErrorInfo.parseFrom(pbData);
if (rayErrorInfo.getActorDiedError().hasCreationTaskFailureContext()) {
return RayException.fromRayExceptionPB(
rayErrorInfo.getActorDiedError().getCreationTaskFailureContext());
} else {
// TODO(lixin) Generate friendly error message from RayErrorInfo.ActorDiedError's field
// type.
return new RayActorException(actorId);
}
} catch (InvalidProtocolBufferException e) {
throw new IllegalArgumentException(
"Can't deserialize RayActorCreationTaskException object: " + objectId.toString(), e);
}
}
}

View file

@ -774,7 +774,7 @@ cdef CRayStatus task_execution_handler(
except Exception as e:
sys_exit = SystemExit()
if isinstance(e, RayActorError) and \
e.has_creation_task_error():
e.actor_init_failed:
traceback_str = str(e)
logger.error("Exception raised "
f"in creation task: {traceback_str}")

View file

@ -1,11 +1,13 @@
import os
from traceback import format_exception
from typing import Union
import ray.cloudpickle as pickle
from ray.core.generated.common_pb2 import RayException, Language, PYTHON
from ray.core.generated.common_pb2 import Address
from ray.core.generated.common_pb2 import Address, ActorDiedErrorContext
import ray.ray_constants as ray_constants
from ray._raylet import WorkerID
from ray._raylet import WorkerID, ActorID
import colorama
import setproctitle
@ -27,6 +29,10 @@ class RayError(Exception):
def from_bytes(b):
ray_exception = RayException()
ray_exception.ParseFromString(b)
return RayError.from_ray_exception(ray_exception)
@staticmethod
def from_ray_exception(ray_exception):
if ray_exception.language == PYTHON:
try:
return pickle.loads(ray_exception.serialized_exception)
@ -83,7 +89,6 @@ class RayTaskError(RayError):
actor_repr=None):
"""Initialize a RayTaskError."""
import ray
# BaseException implements a __reduce__ method that returns
# a tuple with the type and the value of self.args.
# https://stackoverflow.com/a/49715949/2213289
@ -219,46 +224,61 @@ class RayActorError(RayError):
executing a task, or because a task is submitted to a dead actor.
If the actor is dead because of an exception thrown in its creation tasks,
RayActorError will contain this exception.
RayActorError will contain the creation_task_error, which is used to
reconstruct the exception on the caller side.
cause: The cause of the actor error. `RayTaskError` type means
the actor has died because of an exception within `__init__`.
`ActorDiedErrorContext` means the actor has died because of
an unexpected system error. None means the cause is not known.
Theoretically, this should not happen,
but it is there as a safety check.
"""
def __init__(self,
function_name=None,
traceback_str=None,
cause=None,
proctitle=None,
pid=None,
ip=None):
# Traceback handling is similar to RayTaskError, so we create a
# RayTaskError to reuse its function.
# But we don't want RayActorError to inherit from RayTaskError, since
# they have different meanings.
self.creation_task_error = None
if function_name and traceback_str and cause:
self.creation_task_error = RayTaskError(
function_name, traceback_str, cause, proctitle, pid, ip)
cause: Union[RayTaskError, ActorDiedErrorContext] = None):
# -- If the actor has failed in the middle of __init__, this is set. --
self._actor_init_failed = False
# -- The base actor error message. --
self.base_error_msg = (
"The actor died unexpectedly before finishing this task.")
def has_creation_task_error(self):
return self.creation_task_error is not None
if not cause:
self.error_msg = self.base_error_msg
elif isinstance(cause, RayTaskError):
self._actor_init_failed = True
self.error_msg = ("The actor died because of an error"
" raised in its creation task, "
f"{cause.__str__()}")
else:
# Indicating system-level actor failures.
assert isinstance(cause, ActorDiedErrorContext)
error_msg_lines = [self.base_error_msg]
error_msg_lines.append(f"\tclass_name: {cause.class_name}")
error_msg_lines.append(
f"\tactor_id: {ActorID(cause.actor_id).hex()}")
# Below items are optional fields.
if cause.pid != 0:
error_msg_lines.append(f"\tpid: {cause.pid}")
if cause.name != "":
error_msg_lines.append(f"\tname: {cause.name}")
if cause.ray_namespace != "":
error_msg_lines.append(f"\tnamespace: {cause.ray_namespace}")
if cause.node_ip_address != "":
error_msg_lines.append(f"\tip: {cause.node_ip_address}")
error_msg_lines.append(cause.error_message)
self.error_msg = "\n".join(error_msg_lines)
def get_creation_task_error(self):
if self.creation_task_error is not None:
return self.creation_task_error
return None
@property
def actor_init_failed(self) -> bool:
return self._actor_init_failed
def __str__(self):
if self.creation_task_error:
return ("The actor died because of an error" +
" raised in its creation task, " +
self.creation_task_error.__str__())
return ("The actor died unexpectedly before finishing this task.")
def __str__(self) -> str:
return self.error_msg
@staticmethod
def from_task_error(task_error):
return RayActorError(task_error.function_name,
task_error.traceback_str, task_error.cause,
task_error.proctitle, task_error.pid,
task_error.ip)
def from_task_error(task_error: RayTaskError):
return RayActorError(task_error)
class RaySystemError(RayError):

View file

@ -6,6 +6,7 @@ import ray.cloudpickle as pickle
from ray import ray_constants
import ray._private.utils
from ray._private.gcs_utils import ErrorType
from ray.core.generated.common_pb2 import RayErrorInfo
from ray.exceptions import (
RayError, PlasmaObjectNotAvailable, RayTaskError, RayActorError,
TaskCancelledError, WorkerCrashedError, ObjectLostError,
@ -184,6 +185,25 @@ class SerializationContext:
raise DeserializationError()
return obj
def _deserialize_actor_died_error(self, data, metadata_fields):
if not data:
return RayActorError()
pb_bytes = self._deserialize_msgpack_data(data, metadata_fields)
assert pb_bytes
ray_error_info = RayErrorInfo()
ray_error_info.ParseFromString(pb_bytes)
assert ray_error_info.HasField("actor_died_error")
if ray_error_info.actor_died_error.HasField(
"creation_task_failure_context"):
return RayError.from_ray_exception(
ray_error_info.actor_died_error.creation_task_failure_context)
else:
assert ray_error_info.actor_died_error.HasField(
"actor_died_error_context")
return RayActorError(
cause=ray_error_info.actor_died_error.actor_died_error_context)
def _deserialize_object(self, data, metadata, object_ref):
if metadata:
metadata_fields = metadata.split(b",")
@ -218,12 +238,8 @@ class SerializationContext:
elif error_type == ErrorType.Value("WORKER_DIED"):
return WorkerCrashedError()
elif error_type == ErrorType.Value("ACTOR_DIED"):
if data:
pb_bytes = self._deserialize_msgpack_data(
data, metadata_fields)
if pb_bytes:
return RayError.from_bytes(pb_bytes)
return RayActorError()
return self._deserialize_actor_died_error(
data, metadata_fields)
elif error_type == ErrorType.Value("LOCAL_RAYLET_DIED"):
return LocalRayletDiedError()
elif error_type == ErrorType.Value("TASK_CANCELLED"):

View file

@ -665,6 +665,69 @@ def test_recreate_child_actor(ray_start_cluster):
ray.get(p.ready.remote())
def test_actor_failure_per_type(ray_start_cluster):
cluster = ray_start_cluster
cluster.add_node()
ray.init(address="auto")
@ray.remote
class Actor:
def check_alive(self):
return os.getpid()
def create_actor(self):
self.a = Actor.remote()
return self.a
# Test actor is dead because its reference is gone.
# Q(sang): Should we raise RayActorError in this case?
with pytest.raises(
RuntimeError, match="Lost reference to actor") as exc_info:
ray.get(Actor.remote().check_alive.remote())
print(exc_info._excinfo[1])
# Test actor killed by ray.kill
a = Actor.remote()
ray.kill(a)
with pytest.raises(
ray.exceptions.RayActorError,
match="it was killed by `ray.kill") as exc_info:
ray.get(a.check_alive.remote())
print(exc_info._excinfo[1])
# Test actor killed because of worker failure.
a = Actor.remote()
pid = ray.get(a.check_alive.remote())
os.kill(pid, 9)
with pytest.raises(
ray.exceptions.RayActorError,
match=("The actor is dead because its worker process has died"
)) as exc_info:
ray.get(a.check_alive.remote())
print(exc_info._excinfo[1])
# Test actor killed because of owner failure.
owner = Actor.remote()
a = ray.get(owner.create_actor.remote())
ray.kill(owner)
with pytest.raises(
ray.exceptions.RayActorError,
match="The actor is dead because its owner has died") as exc_info:
ray.get(a.check_alive.remote())
print(exc_info._excinfo[1])
# Test actor killed because the node is dead.
node_to_kill = cluster.add_node(resources={"worker": 1})
a = Actor.options(resources={"worker": 1}).remote()
ray.get(a.check_alive.remote())
cluster.remove_node(node_to_kill)
with pytest.raises(
ray.exceptions.RayActorError,
match="The actor is dead because its node has died.") as exc_info:
ray.get(a.check_alive.remote())
print(exc_info._excinfo[1])
if __name__ == "__main__":
import pytest
sys.exit(pytest.main(["-v", __file__]))

View file

@ -441,7 +441,7 @@ def test_basic_reconstruction_actor_constructor(ray_start_cluster,
ray.get(a.dependent_task.remote(obj))
return True
except ray.exceptions.RayActorError as e:
return e.has_creation_task_error()
return e.actor_init_failed
except (ray.exceptions.RayTaskError, ray.exceptions.ObjectLostError):
return True

View file

@ -27,9 +27,11 @@ these tests will fail.
def scrub_traceback(ex):
assert isinstance(ex, str)
print(ex)
ex = ex.strip("\n")
ex = re.sub("pid=.*,", "pid=XXX,", ex)
ex = re.sub("ip=.*\)", "ip=YYY)", ex)
ex = re.sub("pid=[0-9]+,", "pid=XXX,", ex)
ex = re.sub("ip=[0-9]+\.[0-9]+\.[0-9]+\.[0-9]+", "ip=YYY", ex)
ex = re.sub("repr=.*\)", "repr=ZZZ)", ex)
ex = re.sub("line .*,", "line ZZ,", ex)
ex = re.sub('".*"', '"FILE"', ex)
# These are used to color the string.
@ -52,7 +54,7 @@ def clean_noqa(ex):
reason="Clean stacktrace not supported on Windows")
def test_actor_creation_stacktrace(ray_start_regular):
"""Test the actor creation task stacktrace."""
expected_output = """The actor died because of an error raised in its creation task, ray::A.__init__() (pid=XXX, ip=YYY) # noqa
expected_output = """The actor died because of an error raised in its creation task, ray::A.__init__() (pid=XXX, ip=YYY, repr=ZZZ) # noqa
File "FILE", line ZZ, in __init__
g(3)
File "FILE", line ZZ, in g
@ -113,7 +115,7 @@ ValueError: 7"""
reason="Clean stacktrace not supported on Windows")
def test_actor_task_stacktrace(ray_start_regular):
"""Test the actor task stacktrace."""
expected_output = """ray::A.f() (pid=XXX, repr=<test_traceback.A object at ADDRESS>) # noqa
expected_output = """ray::A.f() (pid=XXX, ip=YYY, repr=ZZZ) # noqa
File "FILE", line ZZ, in f
return g(c)
File "FILE", line ZZ, in g
@ -274,7 +276,7 @@ def test_actor_repr_in_traceback(ray_start_regular):
def test_unpickleable_stacktrace(shutdown_only):
expected_output = """System error: Failed to unpickle serialized exception
traceback: Traceback (most recent call last):
File "FILE", line ZZ, in from_bytes
File "FILE", line ZZ, in from_ray_exception
return pickle.loads(ray_exception.serialized_exception)
TypeError: __init__() missing 1 required positional argument: 'arg'
@ -286,6 +288,8 @@ Traceback (most recent call last):
File "FILE", line ZZ, in _deserialize_object
return RayError.from_bytes(obj)
File "FILE", line ZZ, in from_bytes
return RayError.from_ray_exception(ray_exception)
File "FILE", line ZZ, in from_ray_exception
raise RuntimeError(msg) from e
RuntimeError: Failed to unpickle serialized exception"""
@ -306,7 +310,6 @@ RuntimeError: Failed to unpickle serialized exception"""
try:
ray.get(f.remote())
except Exception as ex:
print(repr(scrub_traceback(str(ex))))
assert clean_noqa(expected_output) == scrub_traceback(str(ex))

View file

@ -17,7 +17,6 @@
#include "msgpack.hpp"
namespace {
std::shared_ptr<ray::LocalMemoryBuffer> MakeBufferFromString(const uint8_t *data,
size_t data_size) {
auto metadata = const_cast<uint8_t *>(data);
@ -51,7 +50,7 @@ std::shared_ptr<ray::LocalMemoryBuffer> MakeErrorMetadataBuffer(
/// \param protobuf_message The protobuf message to serialize.
/// \return The buffer that contains serialized msgpack message.
template <class ProtobufMessage>
std::shared_ptr<ray::LocalMemoryBuffer> MakeSerializeErrorBuffer(
std::shared_ptr<ray::LocalMemoryBuffer> MakeSerializedErrorBuffer(
const ProtobufMessage &protobuf_message) {
// Structure of bytes stored in object store:
@ -96,12 +95,7 @@ RayObject::RayObject(rpc::ErrorType error_type, const rpc::RayErrorInfo *ray_err
return;
}
RAY_CHECK(ray_error_info->has_actor_init_failure());
// This is temporarily here because changing this requires changes in all language
// frontend.
// TODO(sang, lixin): Remove it.
const auto error_buffer =
MakeSerializeErrorBuffer<rpc::RayException>(ray_error_info->actor_init_failure());
const auto error_buffer = MakeSerializedErrorBuffer<rpc::RayErrorInfo>(*ray_error_info);
Init(std::move(error_buffer), MakeErrorMetadataBuffer(error_type), {});
return;
}

View file

@ -224,14 +224,14 @@ void ActorManager::HandleActorStateNotification(const ActorID &actor_id,
<< ", raylet_id: " << NodeID::FromBinary(actor_data.address().raylet_id())
<< ", num_restarts: " << actor_data.num_restarts()
<< ", death context type="
<< gcs::GetDeathCauseString(&actor_data.death_cause());
<< gcs::GetActorDeathCauseString(actor_data.death_cause());
if (actor_data.state() == rpc::ActorTableData::RESTARTING) {
direct_actor_submitter_->DisconnectActor(actor_id, actor_data.num_restarts(),
/*is_dead=*/false);
/*is_dead=*/false, actor_data.death_cause());
} else if (actor_data.state() == rpc::ActorTableData::DEAD) {
OnActorKilled(actor_id);
direct_actor_submitter_->DisconnectActor(actor_id, actor_data.num_restarts(),
/*is_dead=*/true, &actor_data.death_cause());
/*is_dead=*/true, actor_data.death_cause());
// We cannot erase the actor handle here because clients can still
// submit tasks to dead actors. This also means we defer unsubscription,
// otherwise we crash when bulk unsubscribing all actor handles.

View file

@ -145,6 +145,8 @@ class TaskManager : public TaskFinisherInterface, public TaskResubmissionInterfa
/// \param[in] error_type The type of the specific error.
/// \param[in] status Optional status message.
/// \param[in] ray_error_info The error information of a given error type.
/// Nullptr means that there's no error information.
/// TODO(sang): Remove nullptr case. Every error message should have metadata.
/// \param[in] mark_task_object_failed whether or not it marks the task
/// return object as failed.
/// \return Whether the task will be retried or not.

View file

@ -82,7 +82,7 @@ class MockDirectActorSubmitter : public CoreWorkerDirectActorTaskSubmitterInterf
MOCK_METHOD3(ConnectActor, void(const ActorID &actor_id, const rpc::Address &address,
int64_t num_restarts));
MOCK_METHOD4(DisconnectActor, void(const ActorID &actor_id, int64_t num_restarts,
bool dead, const rpc::ActorDeathCause *death_cause));
bool dead, const rpc::ActorDeathCause &death_cause));
MOCK_METHOD3(KillActor,
void(const ActorID &actor_id, bool force_kill, bool no_restart));

View file

@ -37,6 +37,12 @@ using ::testing::_;
using ::testing::ElementsAre;
using ::testing::Return;
rpc::ActorDeathCause CreateMockDeathCause() {
ray::rpc::ActorDeathCause death_cause;
death_cause.mutable_runtime_env_failed_context()->set_error_message("failed");
return death_cause;
}
TaskSpecification CreateActorTaskHelper(ActorID actor_id, WorkerID caller_worker_id,
int64_t counter,
TaskID caller_id = TaskID::Nil()) {
@ -310,11 +316,12 @@ TEST_P(DirectActorSubmitterTest, TestActorDead) {
}
EXPECT_CALL(*task_finisher_, FailOrRetryPendingTask(_, _, _, _, _)).Times(0);
submitter_.DisconnectActor(actor_id, 1, /*dead=*/false);
const auto death_cause = CreateMockDeathCause();
submitter_.DisconnectActor(actor_id, 1, /*dead=*/false, death_cause);
// Actor marked as dead. All queued tasks should get failed.
EXPECT_CALL(*task_finisher_, FailOrRetryPendingTask(task2.TaskId(), _, _, _, _))
.Times(1);
submitter_.DisconnectActor(actor_id, 2, /*dead=*/true);
submitter_.DisconnectActor(actor_id, 2, /*dead=*/true, death_cause);
}
TEST_P(DirectActorSubmitterTest, TestActorRestartNoRetry) {
@ -349,7 +356,8 @@ TEST_P(DirectActorSubmitterTest, TestActorRestartNoRetry) {
ASSERT_TRUE(worker_client_->ReplyPushTask(Status::IOError("")));
// Simulate the actor failing.
submitter_.DisconnectActor(actor_id, 1, /*dead=*/false);
const auto death_cause = CreateMockDeathCause();
submitter_.DisconnectActor(actor_id, 1, /*dead=*/false, death_cause);
// Third task fails after the actor is disconnected. It should not get
// retried.
ASSERT_TRUE(worker_client_->ReplyPushTask(Status::IOError("")));
@ -404,7 +412,8 @@ TEST_P(DirectActorSubmitterTest, TestActorRestartRetry) {
ASSERT_TRUE(worker_client_->ReplyPushTask(Status::IOError("")));
// Simulate the actor failing.
submitter_.DisconnectActor(actor_id, 1, /*dead=*/false);
const auto death_cause = CreateMockDeathCause();
submitter_.DisconnectActor(actor_id, 1, /*dead=*/false, death_cause);
// Third task fails after the actor is disconnected.
ASSERT_TRUE(worker_client_->ReplyPushTask(Status::IOError("")));
@ -460,7 +469,8 @@ TEST_P(DirectActorSubmitterTest, TestActorRestartOutOfOrderRetry) {
ASSERT_TRUE(worker_client_->ReplyPushTask(Status::OK(), /*index=*/1));
// Simulate the actor failing.
ASSERT_TRUE(worker_client_->ReplyPushTask(Status::IOError(""), /*index=*/0));
submitter_.DisconnectActor(actor_id, 1, /*dead=*/false);
const auto death_cause = CreateMockDeathCause();
submitter_.DisconnectActor(actor_id, 1, /*dead=*/false, death_cause);
// Actor gets restarted.
addr.set_port(1);
@ -519,7 +529,8 @@ TEST_P(DirectActorSubmitterTest, TestActorRestartOutOfOrderGcs) {
ASSERT_TRUE(worker_client_->ReplyPushTask(Status::OK()));
// We receive the RESTART message late. Nothing happens.
submitter_.DisconnectActor(actor_id, 1, /*dead=*/false);
const auto death_cause = CreateMockDeathCause();
submitter_.DisconnectActor(actor_id, 1, /*dead=*/false, death_cause);
ASSERT_EQ(num_clients_connected_, 2);
// Submit a task.
task = CreateActorTaskHelper(actor_id, worker_id, 2);
@ -528,7 +539,7 @@ TEST_P(DirectActorSubmitterTest, TestActorRestartOutOfOrderGcs) {
ASSERT_TRUE(worker_client_->ReplyPushTask(Status::OK()));
// The actor dies twice. We receive the last RESTART message first.
submitter_.DisconnectActor(actor_id, 3, /*dead=*/false);
submitter_.DisconnectActor(actor_id, 3, /*dead=*/false, death_cause);
ASSERT_EQ(num_clients_connected_, 2);
// Submit a task.
task = CreateActorTaskHelper(actor_id, worker_id, 3);
@ -539,17 +550,17 @@ TEST_P(DirectActorSubmitterTest, TestActorRestartOutOfOrderGcs) {
// We receive the late messages. Nothing happens.
addr.set_port(2);
submitter_.ConnectActor(actor_id, addr, 2);
submitter_.DisconnectActor(actor_id, 2, /*dead=*/false);
submitter_.DisconnectActor(actor_id, 2, /*dead=*/false, death_cause);
ASSERT_EQ(num_clients_connected_, 2);
// The actor dies permanently. All tasks are failed.
EXPECT_CALL(*task_finisher_, FailOrRetryPendingTask(task.TaskId(), _, _, _, _))
.Times(1);
submitter_.DisconnectActor(actor_id, 3, /*dead=*/true);
submitter_.DisconnectActor(actor_id, 3, /*dead=*/true, death_cause);
ASSERT_EQ(num_clients_connected_, 2);
// We receive more late messages. Nothing happens because the actor is dead.
submitter_.DisconnectActor(actor_id, 4, /*dead=*/false);
submitter_.DisconnectActor(actor_id, 4, /*dead=*/false, death_cause);
addr.set_port(3);
submitter_.ConnectActor(actor_id, addr, 4);
ASSERT_EQ(num_clients_connected_, 2);
@ -590,7 +601,8 @@ TEST_P(DirectActorSubmitterTest, TestActorRestartFailInflightTasks) {
.Times(1);
EXPECT_CALL(*task_finisher_, FailOrRetryPendingTask(task3.TaskId(), _, _, _, _))
.Times(1);
submitter_.DisconnectActor(actor_id, 1, /*dead=*/false);
const auto death_cause = CreateMockDeathCause();
submitter_.DisconnectActor(actor_id, 1, /*dead=*/false, death_cause);
// The task replies are now received. Since the tasks are already failed, they will not
// be marked as failed or finished again.

View file

@ -120,19 +120,19 @@ Status CoreWorkerDirectActorTaskSubmitter::SubmitTask(TaskSpecification task_spe
// Do not hold the lock while calling into task_finisher_.
task_finisher_.MarkTaskCanceled(task_id);
rpc::ErrorType error_type;
std::unique_ptr<rpc::RayErrorInfo> error_info = nullptr;
rpc::RayErrorInfo error_info;
{
absl::MutexLock lock(&mu_);
auto queue = client_queues_.find(task_spec.ActorId());
auto &death_cause = queue->second.death_cause;
error_type = GenErrorTypeFromDeathCause(death_cause.get());
error_info = GetErrorInfoFromActorDeathCause(death_cause.get());
const auto queue_it = client_queues_.find(task_spec.ActorId());
const auto &death_cause = queue_it->second.death_cause;
error_type = GenErrorTypeFromDeathCause(death_cause);
error_info = GetErrorInfoFromActorDeathCause(death_cause);
}
auto status = Status::IOError("cancelling task of dead actor");
// No need to increment the number of completed tasks since the actor is
// dead.
RAY_UNUSED(!task_finisher_.FailOrRetryPendingTask(task_id, error_type, &status,
error_info.get()));
&error_info));
}
// If the task submission subsequently fails, then the client will receive
@ -223,9 +223,9 @@ void CoreWorkerDirectActorTaskSubmitter::ConnectActor(const ActorID &actor_id,
void CoreWorkerDirectActorTaskSubmitter::DisconnectActor(
const ActorID &actor_id, int64_t num_restarts, bool dead,
const rpc::ActorDeathCause *death_cause) {
const rpc::ActorDeathCause &death_cause) {
RAY_LOG(DEBUG) << "Disconnecting from actor " << actor_id
<< ", death context type=" << GetDeathCauseString(death_cause);
<< ", death context type=" << GetActorDeathCauseString(death_cause);
std::unordered_map<TaskID, rpc::ClientCallback<rpc::PushTaskReply>>
inflight_task_callbacks;
@ -255,9 +255,7 @@ void CoreWorkerDirectActorTaskSubmitter::DisconnectActor(
if (dead) {
queue->second.state = rpc::ActorTableData::DEAD;
if (death_cause != nullptr) {
queue->second.death_cause = std::make_unique<rpc::ActorDeathCause>(*death_cause);
}
queue->second.death_cause = death_cause;
// If there are pending requests, treat the pending tasks as failed.
RAY_LOG(INFO) << "Failing pending tasks for actor " << actor_id
<< " because the actor is already dead.";
@ -272,7 +270,7 @@ void CoreWorkerDirectActorTaskSubmitter::DisconnectActor(
// No need to increment the number of completed tasks since the actor is
// dead.
RAY_UNUSED(!task_finisher_.FailOrRetryPendingTask(task_id, error_type, &status,
error_info.get()));
&error_info));
}
auto &wait_for_death_info_tasks = queue->second.wait_for_death_info_tasks;
@ -280,8 +278,8 @@ void CoreWorkerDirectActorTaskSubmitter::DisconnectActor(
RAY_LOG(INFO) << "Failing tasks waiting for death info, size="
<< wait_for_death_info_tasks.size() << ", actor_id=" << actor_id;
for (auto &net_err_task : wait_for_death_info_tasks) {
RAY_UNUSED(task_finisher_.MarkTaskReturnObjectsFailed(
net_err_task.second, error_type, error_info.get()));
RAY_UNUSED(task_finisher_.MarkTaskReturnObjectsFailed(net_err_task.second,
error_type, &error_info));
}
// No need to clean up tasks that have been sent and are waiting for
@ -422,13 +420,13 @@ void CoreWorkerDirectActorTaskSubmitter::PushActorTask(ClientQueue &queue,
auto &queue = queue_pair->second;
bool is_actor_dead = (queue.state == rpc::ActorTableData::DEAD);
const auto error_info =
GetErrorInfoFromActorDeathCause(queue.death_cause.get());
const auto &death_cause = queue.death_cause;
const auto &error_info = GetErrorInfoFromActorDeathCause(death_cause);
const auto &error_type = GenErrorTypeFromDeathCause(death_cause);
// If the actor is already dead, immediately mark the task object as failed.
// Otherwise, it will have a grace period until it marks the object as dead.
will_retry = task_finisher_.FailOrRetryPendingTask(
task_id, GenErrorTypeFromDeathCause(queue.death_cause.get()), &status,
error_info.get(),
task_id, error_type, &status, &error_info,
/*mark_task_object_failed*/ is_actor_dead);
if (!is_actor_dead && !will_retry) {
// No retry == actor is dead.

View file

@ -51,7 +51,7 @@ class CoreWorkerDirectActorTaskSubmitterInterface {
virtual void ConnectActor(const ActorID &actor_id, const rpc::Address &address,
int64_t num_restarts) = 0;
virtual void DisconnectActor(const ActorID &actor_id, int64_t num_restarts, bool dead,
const rpc::ActorDeathCause *death_cause = nullptr) = 0;
const rpc::ActorDeathCause &death_cause) = 0;
virtual void KillActor(const ActorID &actor_id, bool force_kill, bool no_restart) = 0;
virtual void CheckTimeoutTasks() = 0;
@ -123,7 +123,7 @@ class CoreWorkerDirectActorTaskSubmitter
/// pending tasks for the actor should be failed.
/// \param[in] death_cause Context about why this actor is dead.
void DisconnectActor(const ActorID &actor_id, int64_t num_restarts, bool dead,
const rpc::ActorDeathCause *death_cause = nullptr);
const rpc::ActorDeathCause &death_cause);
/// Set the timestamp for the caller.
void SetCallerCreationTimestamp(int64_t timestamp);
@ -159,8 +159,9 @@ class CoreWorkerDirectActorTaskSubmitter
/// an RPC client to the actor. If this is DEAD, then all tasks in the
/// queue will be marked failed and all other ClientQueue state is ignored.
rpc::ActorTableData::ActorState state = rpc::ActorTableData::DEPENDENCIES_UNREADY;
/// Only applies when state=DEAD.
std::unique_ptr<rpc::ActorDeathCause> death_cause = nullptr;
/// The reason why this actor is dead.
/// If the context is not set, it means the actor is not dead.
rpc::ActorDeathCause death_cause;
/// How many times this actor has been restarted before. Starts at -1 to
/// indicate that the actor is not yet created. This is used to drop stale
/// messages from the GCS.

View file

@ -54,8 +54,7 @@ const ray::rpc::ActorDeathCause GenNodeDiedCause(const ray::gcs::GcsActor *actor
auto actor_died_error_ctx = death_cause.mutable_actor_died_error_context();
AddActorInfo(actor, actor_died_error_ctx);
actor_died_error_ctx->set_error_message(absl::StrCat(
"(ip=", ip_address,
") The actor is dead because its node has died. Node Id: ", node_id.Hex()));
"The actor is dead because its node has died. Node Id: ", node_id.Hex()));
return death_cause;
}
@ -66,8 +65,7 @@ const ray::rpc::ActorDeathCause GenWorkerDiedCause(
auto actor_died_error_ctx = death_cause.mutable_actor_died_error_context();
AddActorInfo(actor, actor_died_error_ctx);
actor_died_error_ctx->set_error_message(absl::StrCat(
"(ip=", ip_address,
") The actor is dead because its worker process has died. Worker exit type: ",
"The actor is dead because its worker process has died. Worker exit type: ",
ray::rpc::WorkerExitType_Name(disconnect_type)));
return death_cause;
}
@ -976,7 +974,7 @@ void GcsActorManager::ReconstructActor(const ActorID &actor_id, bool need_resche
RAY_LOG(INFO) << "Actor " << actor_id << " is failed on worker " << worker_id
<< " at node " << node_id << ", need_reschedule = " << need_reschedule
<< ", death context type = " << GetDeathCauseString(&death_cause)
<< ", death context type = " << GetActorDeathCauseString(death_cause)
<< ", remaining_restarts = " << remaining_restarts
<< ", job id = " << actor_id.JobId();

View file

@ -126,49 +126,42 @@ inline const rpc::RayException *GetCreationTaskExceptionFromDeathCause(
/// Generate object error type from ActorDeathCause.
inline rpc::ErrorType GenErrorTypeFromDeathCause(
const rpc::ActorDeathCause *death_cause) {
if (death_cause == nullptr) {
const rpc::ActorDeathCause &death_cause) {
if (death_cause.context_case() == ContextCase::kCreationTaskFailureContext) {
return rpc::ErrorType::ACTOR_DIED;
}
if (death_cause->context_case() == ContextCase::kCreationTaskFailureContext) {
return rpc::ErrorType::ACTOR_DIED;
}
if (death_cause->context_case() == ContextCase::kRuntimeEnvFailedContext) {
} else if (death_cause.context_case() == ContextCase::kRuntimeEnvFailedContext) {
return rpc::ErrorType::RUNTIME_ENV_SETUP_FAILED;
} else {
return rpc::ErrorType::ACTOR_DIED;
}
return rpc::ErrorType::ACTOR_DIED;
}
inline const std::string &GetDeathCauseString(const rpc::ActorDeathCause *death_cause) {
inline const std::string &GetActorDeathCauseString(
const rpc::ActorDeathCause &death_cause) {
static absl::flat_hash_map<ContextCase, std::string> death_cause_string{
{ContextCase::CONTEXT_NOT_SET, "CONTEXT_NOT_SET"},
{ContextCase::kRuntimeEnvFailedContext, "RuntimeEnvFailedContext"},
{ContextCase::kCreationTaskFailureContext, "CreationTaskFailureContext"},
{ContextCase::kActorDiedErrorContext, "ActorDiedErrorContext"}};
ContextCase death_cause_case = ContextCase::CONTEXT_NOT_SET;
if (death_cause != nullptr) {
death_cause_case = death_cause->context_case();
}
auto it = death_cause_string.find(death_cause_case);
auto it = death_cause_string.find(death_cause.context_case());
RAY_CHECK(it != death_cause_string.end())
<< "Given death cause case " << death_cause_case << " doesn't exist.";
<< "Given death cause case " << death_cause.context_case() << " doesn't exist.";
return it->second;
}
/// Get the error information from the actor death cause.
///
/// \param[in] death_cause The rpc message that contains the actor's death information.
/// \return RayErrorInfo that has propagated death cause. Nullptr if not sufficient
/// information is provided.
inline std::unique_ptr<rpc::RayErrorInfo> GetErrorInfoFromActorDeathCause(
const rpc::ActorDeathCause *death_cause) {
auto creation_task_exception = GetCreationTaskExceptionFromDeathCause(death_cause);
if (creation_task_exception == nullptr) {
return nullptr;
}
auto error_info = std::make_unique<rpc::RayErrorInfo>();
if (creation_task_exception != nullptr) {
// Shouldn't use Swap here because we don't own the pointer.
error_info->mutable_actor_init_failure()->CopyFrom(*creation_task_exception);
/// \return RayErrorInfo that has propagated death cause.
inline rpc::RayErrorInfo GetErrorInfoFromActorDeathCause(
const rpc::ActorDeathCause &death_cause) {
rpc::RayErrorInfo error_info;
if (death_cause.context_case() == ContextCase::kRuntimeEnvFailedContext ||
death_cause.context_case() == ContextCase::kActorDiedErrorContext ||
death_cause.context_case() == ContextCase::kCreationTaskFailureContext) {
error_info.mutable_actor_died_error()->CopyFrom(death_cause);
} else {
RAY_CHECK(death_cause.context_case() == ContextCase::CONTEXT_NOT_SET);
}
return error_info;
}

View file

@ -181,7 +181,7 @@ enum ErrorType {
/// The information per ray error type.
message RayErrorInfo {
oneof error {
RayException actor_init_failure = 1;
ActorDeathCause actor_died_error = 2;
}
}