[RuntimeEnv] Raise RuntimeEnvSetupError when actor creation fails due to runtime env setup failure (#19888)

* ray_pkg passed

* fix

* fix typo

* fix test

* fix test

* fix test

* fix

* draft

* compile OK

* lint

* fix

* lint

* fix ci

* Update src/ray/gcs/gcs_server/gcs_actor_manager.cc

Co-authored-by: SangBin Cho <rkooo567@gmail.com>

* remove comment

* rename

* resolve conflict

* use unique ownership

* use DestroyActor instead of ReconstructActor

* fix segment fault

* fix crash in debug log

* Revert "fix crash in debug log"

This reverts commit 8f0e3d37f062b664d8d0e07c6c1a9a715b8ba1ee.

Co-authored-by: SangBin Cho <rkooo567@gmail.com>
Lixin Wei 2021-11-15 23:43:35 +08:00 committed by GitHub
parent fa878e2d4d
commit b7e35acf14
24 changed files with 232 additions and 142 deletions

View file

@ -273,9 +273,8 @@ def test_actor_pubsub(disable_aiohttp_cache, ray_start_with_dashboard):
# For status that is not DEPENDENCIES_UNREADY, only states fields will
# be published.
elif actor_data_dict["state"] in ("ALIVE", "DEAD"):
assert actor_data_dict.keys() == {
"state", "address", "timestamp", "pid",
"creationTaskException", "rayNamespace"
assert actor_data_dict.keys() >= {
"state", "address", "timestamp", "pid", "rayNamespace"
}
elif actor_data_dict["state"] == "PENDING_CREATION":
assert actor_data_dict.keys() == {

View file

@ -732,7 +732,6 @@ cdef execute_task(
exit.is_ray_terminate = True
raise exit
# return a protobuf-serialized ray_exception
cdef shared_ptr[LocalMemoryBuffer] ray_error_to_memory_buf(ray_error):
cdef bytes py_bytes = ray_error.to_bytes()
return make_shared[LocalMemoryBuffer](

View file

@ -147,10 +147,8 @@ def test_invalid_conda_env(shutdown_only):
# Check that another valid task can run.
ray.get(f.remote())
# Check actor is also broken.
# TODO(sang): It should raise RuntimeEnvSetupError
a = A.options(runtime_env=bad_env).remote()
with pytest.raises(ray.exceptions.RayActorError):
with pytest.raises(ray.exceptions.RuntimeEnvSetupError):
ray.get(a.f.remote())
# The second time this runs it should be faster as the error is cached.
@ -281,8 +279,7 @@ def test_runtime_env_broken(set_agent_failure_env_var, ray_start_cluster_head):
Test actor task raises an exception.
"""
a = A.options(runtime_env=runtime_env).remote()
# TODO(sang): Raise a RuntimeEnvSetupError with proper error.
with pytest.raises(ray.exceptions.RayActorError):
with pytest.raises(ray.exceptions.RuntimeEnvSetupError):
ray.get(a.ready.remote())
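
The two test hunks above capture the user-facing change of this PR: when an actor's runtime env cannot be set up, ray.get on its method calls now raises RuntimeEnvSetupError instead of the generic RayActorError. A minimal sketch of the behavior these tests pin down, assuming a nonexistent conda env name as the illustrative failure mode:

    import pytest
    import ray

    ray.init()

    @ray.remote
    class A:
        def ready(self):
            return "ok"

    # A runtime env that can never be built; a nonexistent conda env is just
    # an illustrative way to trigger the setup failure.
    bad_env = {"conda": "env-that-does-not-exist"}
    a = A.options(runtime_env=bad_env).remote()

    # With this change, the setup failure surfaces as RuntimeEnvSetupError
    # rather than a generic RayActorError.
    with pytest.raises(ray.exceptions.RuntimeEnvSetupError):
        ray.get(a.ready.remote())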

View file

@ -25,7 +25,7 @@ class MockTaskFinisherInterface : public TaskFinisherInterface {
(override));
MOCK_METHOD(bool, PendingTaskFailed,
(const TaskID &task_id, rpc::ErrorType error_type, const Status *status,
const std::shared_ptr<rpc::RayException> &creation_task_exception,
const rpc::RayException *creation_task_exception,
bool immediately_mark_object_fail),
(override));
MOCK_METHOD(void, OnTaskDependenciesInlined,
@ -35,7 +35,7 @@ class MockTaskFinisherInterface : public TaskFinisherInterface {
MOCK_METHOD(bool, MarkTaskCanceled, (const TaskID &task_id), (override));
MOCK_METHOD(void, MarkPendingTaskFailed,
(const TaskSpecification &spec, rpc::ErrorType error_type,
const std::shared_ptr<rpc::RayException> &creation_task_exception),
const rpc::RayException *creation_task_exception),
(override));
MOCK_METHOD(absl::optional<TaskSpecification>, GetTaskSpec, (const TaskID &task_id),
(const, override));

View file

@ -24,7 +24,7 @@ class MockCoreWorkerDirectActorTaskSubmitterInterface
(override));
MOCK_METHOD(void, DisconnectActor,
(const ActorID &actor_id, int64_t num_restarts, bool dead,
const std::shared_ptr<rpc::RayException> &creation_task_exception),
const rpc::RayException *creation_task_exception),
(override));
MOCK_METHOD(void, KillActor,
(const ActorID &actor_id, bool force_kill, bool no_restart), (override));

View file

@ -232,22 +232,15 @@ void ActorManager::HandleActorStateNotification(const ActorID &actor_id,
<< WorkerID::FromBinary(actor_data.address().worker_id())
<< ", raylet_id: " << NodeID::FromBinary(actor_data.address().raylet_id())
<< ", num_restarts: " << actor_data.num_restarts()
<< ", has creation_task_exception="
<< actor_data.has_creation_task_exception();
<< ", death context type="
<< gcs::GetDeathCauseString(&actor_data.death_cause());
if (actor_data.state() == rpc::ActorTableData::RESTARTING) {
direct_actor_submitter_->DisconnectActor(actor_id, actor_data.num_restarts(), false);
direct_actor_submitter_->DisconnectActor(actor_id, actor_data.num_restarts(),
/*is_dead=*/false);
} else if (actor_data.state() == rpc::ActorTableData::DEAD) {
OnActorKilled(actor_id);
std::shared_ptr<rpc::RayException> creation_task_exception = nullptr;
if (actor_data.has_creation_task_exception()) {
RAY_LOG(INFO) << "Creation task formatted exception: "
<< actor_data.creation_task_exception().formatted_exception_string()
<< ", actor_id: " << actor_id;
creation_task_exception =
std::make_shared<rpc::RayException>(actor_data.creation_task_exception());
}
direct_actor_submitter_->DisconnectActor(actor_id, actor_data.num_restarts(), true,
creation_task_exception);
direct_actor_submitter_->DisconnectActor(actor_id, actor_data.num_restarts(),
/*is_dead=*/true, &actor_data.death_cause());
// We cannot erase the actor handle here because clients can still
// submit tasks to dead actors. This also means we defer unsubscription,
// otherwise we crash when bulk unsubscribing all actor handles.

View file

@ -380,10 +380,10 @@ bool TaskManager::RetryTaskIfPossible(const TaskID &task_id) {
}
}
bool TaskManager::PendingTaskFailed(
const TaskID &task_id, rpc::ErrorType error_type, const Status *status,
const std::shared_ptr<rpc::RayException> &creation_task_exception,
bool immediately_mark_object_fail) {
bool TaskManager::PendingTaskFailed(const TaskID &task_id, rpc::ErrorType error_type,
const Status *status,
const rpc::RayException *creation_task_exception,
bool immediately_mark_object_fail) {
// Note that this might be the __ray_terminate__ task, so we don't log
// loudly with ERROR here.
RAY_LOG(DEBUG) << "Task " << task_id << " failed with error "
@ -557,7 +557,7 @@ bool TaskManager::MarkTaskCanceled(const TaskID &task_id) {
void TaskManager::MarkPendingTaskFailed(
const TaskSpecification &spec, rpc::ErrorType error_type,
const std::shared_ptr<rpc::RayException> &creation_task_exception) {
const rpc::RayException *creation_task_exception) {
const TaskID task_id = spec.TaskId();
RAY_LOG(DEBUG) << "Treat task as failed. task_id: " << task_id
<< ", error_type: " << ErrorType_Name(error_type);
@ -566,10 +566,16 @@ void TaskManager::MarkPendingTaskFailed(
const auto object_id = ObjectID::FromIndex(task_id, /*index=*/i + 1);
if (creation_task_exception != nullptr) {
// Structure of bytes stored in object store:
// rpc::RayException
// ->pb-serialized bytes
// ->msgpack-serialized bytes
// ->[offset][msgpack-serialized bytes]
// First serialize RayException by the following steps:
// PB's RayException
// --(PB Serialization)-->
// --(msgpack Serialization)-->
// msgpack_serialized_exception(MSE)
// Then add its length to the head (for cross-language deserialization):
// [MSE's length(9 bytes)] [MSE]
std::string pb_serialized_exception;
creation_task_exception->SerializeToString(&pb_serialized_exception);
msgpack::sbuffer msgpack_serialized_exception;
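
The updated comment above describes a length-prefixed framing: the RayException is protobuf-serialized, wrapped in msgpack, and the wrapped payload's length is prepended as a fixed 9-byte header so a cross-language reader can locate the payload. A small Python sketch of that framing, assuming the header is the length encoded as a msgpack uint64 (0xcf marker plus 8 big-endian bytes, i.e. exactly 9 bytes); the real buffer handling lives in LocalMemoryBuffer and may differ in detail:

    import struct
    import msgpack

    def frame_exception(pb_serialized_exception: bytes) -> bytes:
        # msgpack-serialized exception (MSE): the protobuf bytes wrapped as a msgpack bin.
        mse = msgpack.packb(pb_serialized_exception)
        # Fixed-width 9-byte header: msgpack uint64 = 0xcf marker + 8 big-endian bytes.
        header = struct.pack(">BQ", 0xCF, len(mse))
        return header + mse

    def unframe_exception(buf: bytes) -> bytes:
        # Read the MSE length from the 9-byte header, then unwrap the msgpack bin
        # to recover the protobuf-serialized RayException.
        (length,) = struct.unpack(">Q", buf[1:9])
        return msgpack.unpackb(buf[9:9 + length])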

View file

@ -35,7 +35,7 @@ class TaskFinisherInterface {
virtual bool PendingTaskFailed(
const TaskID &task_id, rpc::ErrorType error_type, const Status *status,
const std::shared_ptr<rpc::RayException> &creation_task_exception = nullptr,
const rpc::RayException *creation_task_exception = nullptr,
bool immediately_mark_object_fail = true) = 0;
virtual void OnTaskDependenciesInlined(
@ -46,7 +46,7 @@ class TaskFinisherInterface {
virtual void MarkPendingTaskFailed(
const TaskSpecification &spec, rpc::ErrorType error_type,
const std::shared_ptr<rpc::RayException> &creation_task_exception = nullptr) = 0;
const rpc::RayException *creation_task_exception = nullptr) = 0;
virtual absl::optional<TaskSpecification> GetTaskSpec(const TaskID &task_id) const = 0;
@ -145,17 +145,16 @@ class TaskManager : public TaskFinisherInterface, public TaskResubmissionInterfa
/// \param[in] immediately_mark_object_fail whether immediately mark the task
/// result object as failed.
/// \return Whether the task will be retried or not.
bool PendingTaskFailed(
const TaskID &task_id, rpc::ErrorType error_type, const Status *status = nullptr,
const std::shared_ptr<rpc::RayException> &creation_task_exception = nullptr,
bool immediately_mark_object_fail = true) override;
bool PendingTaskFailed(const TaskID &task_id, rpc::ErrorType error_type,
const Status *status = nullptr,
const rpc::RayException *creation_task_exception = nullptr,
bool immediately_mark_object_fail = true) override;
/// Treat a pending task as failed. The lock should not be held when calling
/// this method because it may trigger callbacks in this or other classes.
void MarkPendingTaskFailed(const TaskSpecification &spec, rpc::ErrorType error_type,
const std::shared_ptr<rpc::RayException>
&creation_task_exception = nullptr) override
LOCKS_EXCLUDED(mu_);
const rpc::RayException *creation_task_exception =
nullptr) override LOCKS_EXCLUDED(mu_);
/// A task's dependencies were inlined in the task spec. This will decrement
/// the ref count for the dependency IDs. If the dependencies contained other

View file

@ -76,9 +76,8 @@ class MockDirectActorSubmitter : public CoreWorkerDirectActorTaskSubmitterInterf
MOCK_METHOD1(AddActorQueueIfNotExists, void(const ActorID &actor_id));
MOCK_METHOD3(ConnectActor, void(const ActorID &actor_id, const rpc::Address &address,
int64_t num_restarts));
MOCK_METHOD4(DisconnectActor,
void(const ActorID &actor_id, int64_t num_restarts, bool dead,
const std::shared_ptr<rpc::RayException> &creation_task_exception));
MOCK_METHOD4(DisconnectActor, void(const ActorID &actor_id, int64_t num_restarts,
bool dead, const rpc::ActorDeathCause *death_cause));
MOCK_METHOD3(KillActor,
void(const ActorID &actor_id, bool force_kill, bool no_restart));
@ -198,7 +197,7 @@ TEST_F(ActorManagerTest, TestAddAndGetActorHandleEndToEnd) {
actor_table_data.set_state(rpc::ActorTableData::ALIVE);
actor_info_accessor_->ActorStateNotificationPublished(actor_id, actor_table_data);
// Now actor state is updated to DEAD. Make sure it is diconnected.
// Now actor state is updated to DEAD. Make sure it is disconnected.
EXPECT_CALL(*direct_actor_submitter_, DisconnectActor(_, _, _, _)).Times(1);
actor_table_data.set_actor_id(actor_id.Binary());
actor_table_data.set_state(rpc::ActorTableData::DEAD);

View file

@ -120,10 +120,10 @@ class MockTaskFinisher : public TaskFinisherInterface {
return false;
}
bool PendingTaskFailed(
const TaskID &task_id, rpc::ErrorType error_type, const Status *status,
const std::shared_ptr<rpc::RayException> &creation_task_exception = nullptr,
bool immediately_mark_object_fail = true) override {
bool PendingTaskFailed(const TaskID &task_id, rpc::ErrorType error_type,
const Status *status,
const rpc::RayException *creation_task_exception = nullptr,
bool immediately_mark_object_fail = true) override {
num_tasks_failed++;
return true;
}
@ -134,9 +134,9 @@ class MockTaskFinisher : public TaskFinisherInterface {
num_contained_ids += contained_ids.size();
}
void MarkPendingTaskFailed(const TaskSpecification &spec, rpc::ErrorType error_type,
const std::shared_ptr<rpc::RayException>
&creation_task_exception = nullptr) override {}
void MarkPendingTaskFailed(
const TaskSpecification &spec, rpc::ErrorType error_type,
const rpc::RayException *creation_task_exception = nullptr) override {}
bool MarkTaskCanceled(const TaskID &task_id) override { return true; }

View file

@ -17,8 +17,10 @@
#include <thread>
#include "ray/common/task/task.h"
#include "ray/gcs/pb_util.h"
using ray::rpc::ActorTableData;
using namespace ray::gcs;
namespace ray {
namespace core {
@ -107,17 +109,20 @@ Status CoreWorkerDirectActorTaskSubmitter::SubmitTask(TaskSpecification task_spe
} else {
// Do not hold the lock while calling into task_finisher_.
task_finisher_.MarkTaskCanceled(task_id);
std::shared_ptr<rpc::RayException> creation_task_exception = nullptr;
rpc::ErrorType error_type;
const rpc::RayException *creation_task_exception = nullptr;
{
absl::MutexLock lock(&mu_);
auto queue = client_queues_.find(task_spec.ActorId());
creation_task_exception = queue->second.creation_task_exception;
auto &death_cause = queue->second.death_cause;
error_type = GenErrorTypeFromDeathCause(death_cause.get());
creation_task_exception = GetCreationTaskExceptionFromDeathCause(death_cause.get());
}
auto status = Status::IOError("cancelling task of dead actor");
// No need to increment the number of completed tasks since the actor is
// dead.
RAY_UNUSED(!task_finisher_.PendingTaskFailed(task_id, rpc::ErrorType::ACTOR_DIED,
&status, creation_task_exception));
RAY_UNUSED(!task_finisher_.PendingTaskFailed(task_id, error_type, &status,
creation_task_exception));
}
// If the task submission subsequently fails, then the client will receive
@ -208,8 +213,9 @@ void CoreWorkerDirectActorTaskSubmitter::ConnectActor(const ActorID &actor_id,
void CoreWorkerDirectActorTaskSubmitter::DisconnectActor(
const ActorID &actor_id, int64_t num_restarts, bool dead,
const std::shared_ptr<rpc::RayException> &creation_task_exception) {
RAY_LOG(DEBUG) << "Disconnecting from actor " << actor_id;
const rpc::ActorDeathCause *death_cause) {
RAY_LOG(DEBUG) << "Disconnecting from actor " << actor_id
<< ", death context type=" << GetDeathCauseString(death_cause);
std::unordered_map<TaskID, rpc::ClientCallback<rpc::PushTaskReply>>
inflight_task_callbacks;
@ -239,20 +245,30 @@ void CoreWorkerDirectActorTaskSubmitter::DisconnectActor(
if (dead) {
queue->second.state = rpc::ActorTableData::DEAD;
queue->second.creation_task_exception = creation_task_exception;
if (death_cause != nullptr) {
queue->second.death_cause = std::make_unique<rpc::ActorDeathCause>(*death_cause);
}
// If there are pending requests, treat the pending tasks as failed.
RAY_LOG(INFO) << "Failing pending tasks for actor " << actor_id
<< " because the actor is already dead.";
auto status = Status::IOError("cancelling all pending tasks of dead actor");
auto task_ids = queue->second.actor_submit_queue->ClearAllTasks();
rpc::ErrorType error_type = GenErrorTypeFromDeathCause(death_cause);
const rpc::RayException *creation_task_exception =
GetCreationTaskExceptionFromDeathCause(death_cause);
if (creation_task_exception != nullptr) {
RAY_LOG(INFO) << "Creation task formatted exception: "
<< creation_task_exception->formatted_exception_string()
<< ", actor_id: " << actor_id;
}
for (auto &task_id : task_ids) {
task_finisher_.MarkTaskCanceled(task_id);
// No need to increment the number of completed tasks since the actor is
// dead.
RAY_UNUSED(!task_finisher_.PendingTaskFailed(task_id, rpc::ErrorType::ACTOR_DIED,
&status, creation_task_exception));
RAY_UNUSED(!task_finisher_.PendingTaskFailed(task_id, error_type, &status,
creation_task_exception));
}
auto &wait_for_death_info_tasks = queue->second.wait_for_death_info_tasks;
@ -260,8 +276,8 @@ void CoreWorkerDirectActorTaskSubmitter::DisconnectActor(
RAY_LOG(INFO) << "Failing tasks waiting for death info, size="
<< wait_for_death_info_tasks.size() << ", actor_id=" << actor_id;
for (auto &net_err_task : wait_for_death_info_tasks) {
RAY_UNUSED(task_finisher_.MarkPendingTaskFailed(
net_err_task.second, rpc::ErrorType::ACTOR_DIED, creation_task_exception));
RAY_UNUSED(task_finisher_.MarkPendingTaskFailed(net_err_task.second, error_type,
creation_task_exception));
}
// No need to clean up tasks that have been sent and are waiting for
@ -393,7 +409,8 @@ void CoreWorkerDirectActorTaskSubmitter::PushActorTask(ClientQueue &queue,
bool immediately_mark_object_fail = (queue.state == rpc::ActorTableData::DEAD);
bool will_retry = task_finisher_.PendingTaskFailed(
task_id, rpc::ErrorType::ACTOR_DIED, &status, queue.creation_task_exception,
task_id, GenErrorTypeFromDeathCause(queue.death_cause.get()), &status,
GetCreationTaskExceptionFromDeathCause(queue.death_cause.get()),
immediately_mark_object_fail);
if (will_retry) {
increment_completed_tasks = false;

View file

@ -47,9 +47,8 @@ class CoreWorkerDirectActorTaskSubmitterInterface {
virtual void AddActorQueueIfNotExists(const ActorID &actor_id) = 0;
virtual void ConnectActor(const ActorID &actor_id, const rpc::Address &address,
int64_t num_restarts) = 0;
virtual void DisconnectActor(
const ActorID &actor_id, int64_t num_restarts, bool dead,
const std::shared_ptr<rpc::RayException> &creation_task_exception = nullptr) = 0;
virtual void DisconnectActor(const ActorID &actor_id, int64_t num_restarts, bool dead,
const rpc::ActorDeathCause *death_cause = nullptr) = 0;
virtual void KillActor(const ActorID &actor_id, bool force_kill, bool no_restart) = 0;
virtual void CheckTimeoutTasks() = 0;
@ -114,12 +113,9 @@ class CoreWorkerDirectActorTaskSubmitter
/// ignore the command to connect.
/// \param[in] dead Whether the actor is permanently dead. In this case, all
/// pending tasks for the actor should be failed.
/// \param[in] creation_task_exception Reason why the actor is dead, only applies when
/// dead = true. If this arg is set, it means this actor died because of an exception
/// thrown in creation task.
void DisconnectActor(
const ActorID &actor_id, int64_t num_restarts, bool dead,
const std::shared_ptr<rpc::RayException> &creation_task_exception = nullptr);
/// \param[in] death_cause Context about why this actor is dead.
void DisconnectActor(const ActorID &actor_id, int64_t num_restarts, bool dead,
const rpc::ActorDeathCause *death_cause = nullptr);
/// Set the timestamp for the caller.
void SetCallerCreationTimestamp(int64_t timestamp);
@ -137,7 +133,7 @@ class CoreWorkerDirectActorTaskSubmitter
/// queue will be marked failed and all other ClientQueue state is ignored.
rpc::ActorTableData::ActorState state = rpc::ActorTableData::DEPENDENCIES_UNREADY;
/// Only applies when state=DEAD.
std::shared_ptr<rpc::RayException> creation_task_exception = nullptr;
std::unique_ptr<rpc::ActorDeathCause> death_cause = nullptr;
/// How many times this actor has been restarted before. Starts at -1 to
/// indicate that the actor is not yet created. This is used to drop stale
/// messages from the GCS.

View file

@ -17,8 +17,10 @@
#include <thread>
#include "ray/common/task/task.h"
#include "ray/gcs/pb_util.h"
using ray::rpc::ActorTableData;
using namespace ray::gcs;
namespace ray {
namespace core {

View file

@ -191,7 +191,9 @@ void GcsBasedActorScheduler::HandleWorkerLeaseReply(
if (iter->second.empty()) {
node_to_actors_when_leasing_.erase(iter);
}
if (reply.rejected()) {
if (reply.runtime_env_setup_failed()) {
OnRuntimeEnvSetupFailure(actor, node_id);
} else if (reply.rejected()) {
RAY_LOG(INFO) << "Failed to lease worker from node " << node_id << " for actor "
<< actor->GetActorID()
<< " as the resources are seized by normal tasks, job id = "

View file

@ -601,7 +601,8 @@ void GcsActorManager::PollOwnerForActorOutOfScope(
});
}
void GcsActorManager::DestroyActor(const ActorID &actor_id) {
void GcsActorManager::DestroyActor(const ActorID &actor_id,
const rpc::ActorDeathCause *death_cause) {
RAY_LOG(INFO) << "Destroying actor, actor id = " << actor_id
<< ", job id = " << actor_id.JobId();
actor_to_register_callbacks_.erase(actor_id);
@ -670,6 +671,9 @@ void GcsActorManager::DestroyActor(const ActorID &actor_id) {
auto time = current_sys_time_ms();
mutable_actor_table_data->set_end_time(time);
mutable_actor_table_data->set_timestamp(time);
if (death_cause != nullptr) {
mutable_actor_table_data->mutable_death_cause()->CopyFrom(*death_cause);
}
auto actor_table_data =
std::make_shared<rpc::ActorTableData>(*mutable_actor_table_data);
@ -718,17 +722,23 @@ void GcsActorManager::OnWorkerDead(const ray::NodeID &node_id,
OnWorkerDead(node_id, worker_id, rpc::WorkerExitType::SYSTEM_ERROR_EXIT);
}
void GcsActorManager::OnWorkerDead(
const ray::NodeID &node_id, const ray::WorkerID &worker_id,
const rpc::WorkerExitType disconnect_type,
const std::shared_ptr<rpc::RayException> &creation_task_exception) {
void GcsActorManager::OnWorkerDead(const ray::NodeID &node_id,
const ray::WorkerID &worker_id,
const rpc::WorkerExitType disconnect_type,
const rpc::RayException *creation_task_exception) {
std::string message = absl::StrCat(
"Worker ", worker_id.Hex(), " on node ", node_id.Hex(),
" exits, type=", rpc::WorkerExitType_Name(disconnect_type),
", has creation_task_exception = ", (creation_task_exception != nullptr));
std::unique_ptr<rpc::ActorDeathCause> death_cause = nullptr;
if (creation_task_exception != nullptr) {
absl::StrAppend(&message, " Formatted creation task exception: ",
creation_task_exception->formatted_exception_string());
death_cause = std::make_unique<rpc::ActorDeathCause>();
death_cause->mutable_creation_task_failure_context()
->mutable_creation_task_exception()
->CopyFrom(*creation_task_exception);
}
if (disconnect_type == rpc::WorkerExitType::INTENDED_EXIT ||
disconnect_type == rpc::WorkerExitType::IDLE_EXIT) {
@ -780,8 +790,7 @@ void GcsActorManager::OnWorkerDead(
// Otherwise, try to reconstruct the actor that was already created or in the creation
// process.
ReconstructActor(actor_id, /*need_reschedule=*/need_reconstruct,
creation_task_exception);
ReconstructActor(actor_id, /*need_reschedule=*/need_reconstruct, death_cause.get());
}
void GcsActorManager::OnNodeDead(const NodeID &node_id) {
@ -834,9 +843,8 @@ void GcsActorManager::ReconstructActor(const ActorID &actor_id) {
ReconstructActor(actor_id, /*need_reschedule=*/true);
}
void GcsActorManager::ReconstructActor(
const ActorID &actor_id, bool need_reschedule,
const std::shared_ptr<rpc::RayException> &creation_task_exception) {
void GcsActorManager::ReconstructActor(const ActorID &actor_id, bool need_reschedule,
const rpc::ActorDeathCause *death_cause) {
// If the owner and this actor are dead at the same time, the actor
// could've been destroyed and deregistered before reconstruction.
auto iter = registered_actors_.find(actor_id);
@ -864,10 +872,13 @@ void GcsActorManager::ReconstructActor(
int64_t remaining = max_restarts - num_restarts;
remaining_restarts = std::max(remaining, static_cast<int64_t>(0));
}
RAY_LOG(INFO) << "Actor is failed " << actor_id << " on worker " << worker_id
RAY_LOG(INFO) << "Actor " << actor_id << " is failed on worker " << worker_id
<< " at node " << node_id << ", need_reschedule = " << need_reschedule
<< ", death context type = " << GetDeathCauseString(death_cause)
<< ", remaining_restarts = " << remaining_restarts
<< ", job id = " << actor_id.JobId();
if (remaining_restarts != 0) {
// num_restarts must be set before updating GCS, or num_restarts will be inconsistent
// between memory cache and storage.
@ -888,9 +899,8 @@ void GcsActorManager::ReconstructActor(
} else {
RemoveActorNameFromRegistry(actor);
mutable_actor_table_data->set_state(rpc::ActorTableData::DEAD);
if (creation_task_exception != nullptr) {
mutable_actor_table_data->set_allocated_creation_task_exception(
new rpc::RayException(*creation_task_exception));
if (death_cause != nullptr) {
mutable_actor_table_data->mutable_death_cause()->CopyFrom(*death_cause);
}
auto time = current_sys_time_ms();
mutable_actor_table_data->set_end_time(time);
@ -915,15 +925,22 @@ void GcsActorManager::ReconstructActor(
}
}
void GcsActorManager::OnActorCreationFailed(std::shared_ptr<GcsActor> actor,
bool destroy_actor) {
if (destroy_actor) {
DestroyActor(actor->GetActorID());
} else {
void GcsActorManager::OnActorSchedulingFailed(std::shared_ptr<GcsActor> actor,
bool runtime_env_setup_failed) {
if (!runtime_env_setup_failed) {
// We will attempt to schedule this actor once an eligible node is
// registered.
pending_actors_.emplace_back(std::move(actor));
return;
}
auto death_cause = std::make_unique<rpc::ActorDeathCause>();
// TODO(sang, lixin) 1. Make this message more friendly 2. Show this message in
// object.get()'s error.
death_cause->mutable_runtime_env_setup_failure_context()->set_error_message(
"Cannot create an actor because the associated runtime env couldn't be created.");
// If there is runtime env failure, mark this actor as dead immediately.
DestroyActor(actor->GetActorID(), death_cause.get());
}
void GcsActorManager::OnActorCreationSuccess(const std::shared_ptr<GcsActor> &actor,

View file

@ -300,10 +300,9 @@ class GcsActorManager : public rpc::ActorInfoHandler {
/// \param exit_type exit reason of the dead worker.
/// \param creation_task_exception if this arg is set, this worker died because of an
/// exception thrown in actor's creation task.
void OnWorkerDead(
const NodeID &node_id, const WorkerID &worker_id,
const rpc::WorkerExitType disconnect_type,
const std::shared_ptr<rpc::RayException> &creation_task_exception = nullptr);
void OnWorkerDead(const NodeID &node_id, const WorkerID &worker_id,
const rpc::WorkerExitType disconnect_type,
const rpc::RayException *creation_task_exception = nullptr);
void OnWorkerDead(const NodeID &node_id, const WorkerID &worker_id);
@ -313,11 +312,12 @@ class GcsActorManager : public rpc::ActorInfoHandler {
/// failed).
///
/// \param actor The actor whose creation task is infeasible.
/// \param destroy_actor Whether or not we should destroy an actor.
/// If false is given, the actor will be rescheduled. Otherwise, all
/// \param runtime_env_setup_failed Whether creation is failed due to runtime env setup
/// failure. If false is given, the actor will be rescheduled. Otherwise, all
/// the interest party (driver that has actor handles) will notify
/// that the actor is dead.
void OnActorCreationFailed(std::shared_ptr<GcsActor> actor, bool destroy_actor = false);
void OnActorSchedulingFailed(std::shared_ptr<GcsActor> actor,
bool runtime_env_setup_failed = false);
/// Handle actor creation task success. This should be called when the actor
/// creation task has been scheduled successfully.
@ -381,7 +381,8 @@ class GcsActorManager : public rpc::ActorInfoHandler {
/// scope or the owner has died.
/// NOTE: This method can be called multiple times in out-of-order and should be
/// idempotent.
void DestroyActor(const ActorID &actor_id);
void DestroyActor(const ActorID &actor_id,
const rpc::ActorDeathCause *death_cause = nullptr);
/// Get unresolved actors that were submitted from the specified node.
absl::flat_hash_set<ActorID> GetUnresolvedActorsByOwnerNode(
@ -397,12 +398,10 @@ class GcsActorManager : public rpc::ActorInfoHandler {
/// \param need_reschedule Whether to reschedule the actor creation task, sometimes
/// users want to kill an actor intentionally and don't want it to be reconstructed
/// again.
/// \param creation_task_exception Only applies when need_reschedule=false, decribing
/// why this actor failed. If this arg is set, it means this actor died because of an
/// exception thrown in creation task.
void ReconstructActor(
const ActorID &actor_id, bool need_reschedule,
const std::shared_ptr<rpc::RayException> &creation_task_exception = nullptr);
/// \param death_cause Context about why this actor is dead. Should only be set when
/// need_reschedule=false.
void ReconstructActor(const ActorID &actor_id, bool need_reschedule,
const rpc::ActorDeathCause *death_cause = nullptr);
/// Reconstruct the specified actor and reschedule it.
void ReconstructActor(const ActorID &actor_id);
@ -442,8 +441,7 @@ class GcsActorManager : public rpc::ActorInfoHandler {
const rpc::ActorTableData &actor) {
auto actor_delta = std::make_shared<rpc::ActorTableData>();
actor_delta->set_state(actor.state());
actor_delta->set_allocated_creation_task_exception(
new rpc::RayException(actor.creation_task_exception()));
actor_delta->mutable_death_cause()->CopyFrom(actor.death_cause());
actor_delta->mutable_address()->CopyFrom(actor.address());
actor_delta->set_num_restarts(actor.num_restarts());
actor_delta->set_timestamp(actor.timestamp());

View file

@ -51,7 +51,7 @@ void GcsActorScheduler::Schedule(std::shared_ptr<GcsActor> actor) {
if (!node.has_value()) {
// There are no available nodes to schedule the actor, so just trigger the failed
// handler.
schedule_failure_handler_(std::move(actor), /*destroy_actor*/ false);
schedule_failure_handler_(std::move(actor), /*runtime_env_setup_failed=*/false);
return;
}
@ -311,6 +311,17 @@ void GcsActorScheduler::HandleWorkerLeaseGrantedReply(
}
}
void GcsActorScheduler::OnRuntimeEnvSetupFailure(std::shared_ptr<GcsActor> actor,
const NodeID &node_id) {
RAY_LOG(ERROR)
<< "Failed to lease worker from node " << node_id << " for actor "
<< actor->GetActorID() << "("
<< actor->GetCreationTaskSpecification().FunctionDescriptor()->CallString() << ")"
<< " as the runtime environment setup failed, job id = "
<< actor->GetActorID().JobId();
schedule_failure_handler_(actor, /*runtime_env_setup_failed=*/true);
}
void GcsActorScheduler::CreateActorOnWorker(std::shared_ptr<GcsActor> actor,
std::shared_ptr<GcsLeasedWorker> worker) {
RAY_CHECK(actor && worker);
@ -491,18 +502,7 @@ void RayletBasedActorScheduler::HandleWorkerLeaseReply(
// The runtime environment has failed by an unrecoverable error.
// We cannot create this actor anymore.
if (reply.runtime_env_setup_failed()) {
// Right now, the way to report error message back to actor is not that great.
// This message is used to notify users the cause of actor death (with ERROR
// severity).
// TODO(sang): It is a temporary solution. Improve the error reporting here.
RAY_LOG(ERROR)
<< "Cannot create an actor "
<< actor->GetCreationTaskSpecification().FunctionDescriptor()->CallString()
<< " of an id " << actor->GetActorID()
<< " because the runtime environment setup failed on a node of address "
<< node->node_manager_address() << ".";
RAY_LOG(INFO) << "Actor failed to be scheduled on a node " << node_id;
schedule_failure_handler_(std::move(actor), /*destroy_actor*/ true);
OnRuntimeEnvSetupFailure(actor, node_id);
return;
}

View file

@ -257,6 +257,12 @@ class GcsActorScheduler : public GcsActorSchedulerInterface {
void HandleWorkerLeaseGrantedReply(std::shared_ptr<GcsActor> actor,
const rpc::RequestWorkerLeaseReply &reply);
/// Handler to process runtime env setup failure.
///
/// \param actor Contains the resources needed to lease workers from the specified node.
/// \param node_id The node where the runtime env is failed to setup.
void OnRuntimeEnvSetupFailure(std::shared_ptr<GcsActor> actor, const NodeID &node_id);
/// Create the specified actor on the specified worker.
///
/// \param actor The actor to be created.

View file

@ -250,12 +250,13 @@ void GcsServer::InitGcsActorManager(const GcsInitData &gcs_init_data) {
RAY_CHECK(gcs_table_storage_ && gcs_publisher_ && gcs_node_manager_);
std::unique_ptr<GcsActorSchedulerInterface> scheduler;
auto schedule_failure_handler = [this](std::shared_ptr<GcsActor> actor,
bool destroy_actor) {
bool runtime_env_setup_failed) {
// When there are no available nodes to schedule the actor the
// gcs_actor_scheduler will treat it as failed and invoke this handler. In
// this case, the actor manager should schedule the actor once an
// eligible node is registered.
gcs_actor_manager_->OnActorCreationFailed(std::move(actor), destroy_actor);
gcs_actor_manager_->OnActorSchedulingFailed(std::move(actor),
runtime_env_setup_failed);
};
auto schedule_success_handler = [this](std::shared_ptr<GcsActor> actor,
const rpc::PushTaskReply &reply) {
@ -479,10 +480,9 @@ void GcsServer::InstallEventListeners() {
auto &worker_address = worker_failure_data->worker_address();
auto worker_id = WorkerID::FromBinary(worker_address.worker_id());
auto node_id = NodeID::FromBinary(worker_address.raylet_id());
std::shared_ptr<rpc::RayException> creation_task_exception = nullptr;
const rpc::RayException *creation_task_exception = nullptr;
if (worker_failure_data->has_creation_task_exception()) {
creation_task_exception = std::make_shared<rpc::RayException>(
worker_failure_data->creation_task_exception());
creation_task_exception = &worker_failure_data->creation_task_exception();
}
gcs_actor_manager_->OnWorkerDead(node_id, worker_id,
worker_failure_data->exit_type(),

View file

@ -264,7 +264,7 @@ TEST_F(GcsActorManagerTest, TestSchedulingFailed) {
auto actor = mock_actor_scheduler_->actors.back();
mock_actor_scheduler_->actors.clear();
gcs_actor_manager_->OnActorCreationFailed(actor);
gcs_actor_manager_->OnActorSchedulingFailed(actor);
gcs_actor_manager_->SchedulePendingActors();
ASSERT_EQ(mock_actor_scheduler_->actors.size(), 1);
mock_actor_scheduler_->actors.clear();

View file

@ -25,6 +25,8 @@ namespace ray {
namespace gcs {
using ContextCase = rpc::ActorDeathCause::ContextCase;
/// Helper function to produce job table data (for newly created job or updated job).
///
/// \param job_id The ID of job that need to be registered or updated.
@ -95,7 +97,7 @@ inline std::shared_ptr<ray::rpc::ActorTableData> CreateActorTableData(
inline std::shared_ptr<ray::rpc::WorkerTableData> CreateWorkerFailureData(
const NodeID &raylet_id, const WorkerID &worker_id, const std::string &address,
int32_t port, int64_t timestamp, rpc::WorkerExitType disconnect_type,
const std::shared_ptr<rpc::RayException> &creation_task_exception = nullptr) {
const rpc::RayException *creation_task_exception = nullptr) {
auto worker_failure_info_ptr = std::make_shared<ray::rpc::WorkerTableData>();
worker_failure_info_ptr->mutable_worker_address()->set_raylet_id(raylet_id.Binary());
worker_failure_info_ptr->mutable_worker_address()->set_worker_id(worker_id.Binary());
@ -111,6 +113,44 @@ inline std::shared_ptr<ray::rpc::WorkerTableData> CreateWorkerFailureData(
return worker_failure_info_ptr;
}
/// Get actor creation task exception from ActorDeathCause.
/// Returns nullptr if actor isn't dead due to creation task failure.
inline const rpc::RayException *GetCreationTaskExceptionFromDeathCause(
const rpc::ActorDeathCause *death_cause) {
if (death_cause == nullptr ||
death_cause->context_case() != ContextCase::kCreationTaskFailureContext) {
return nullptr;
}
return &(death_cause->creation_task_failure_context().creation_task_exception());
}
/// Generate object error type from ActorDeathCause.
inline rpc::ErrorType GenErrorTypeFromDeathCause(
const rpc::ActorDeathCause *death_cause) {
if (death_cause == nullptr) {
return rpc::ErrorType::ACTOR_DIED;
}
if (death_cause->context_case() == ContextCase::kCreationTaskFailureContext) {
return rpc::ErrorType::ACTOR_DIED;
}
if (death_cause->context_case() == ContextCase::kRuntimeEnvSetupFailureContext) {
return rpc::ErrorType::RUNTIME_ENV_SETUP_FAILED;
}
return rpc::ErrorType::ACTOR_DIED;
}
inline const std::string &GetDeathCauseString(const rpc::ActorDeathCause *death_cause) {
static absl::flat_hash_map<ContextCase, std::string> death_cause_string{
{ContextCase::CONTEXT_NOT_SET, "CONTEXT_NOT_SET"},
{ContextCase::kCreationTaskFailureContext, "CreationTaskFailureContext"},
{ContextCase::kRuntimeEnvSetupFailureContext, "RuntimeEnvSetupFailureContext"}};
ContextCase death_cause_case = ContextCase::CONTEXT_NOT_SET;
if (death_cause != nullptr) {
death_cause_case = death_cause->context_case();
}
return death_cause_string.at(death_cause_case);
}
} // namespace gcs
} // namespace ray

View file

@ -94,6 +94,27 @@ message TaskTableData {
Task task = 1;
}
message ActorDeathCause {
oneof context {
CreationTaskFailureContext creation_task_failure_context = 1;
RuntimeEnvSetupFailureContext runtime_env_setup_failure_context = 2;
}
}
// ---Actor death contexts start----
// Indicates that this actor is marked as DEAD due to actor creation task failure.
message CreationTaskFailureContext {
// The exception thrown in creation task.
RayException creation_task_exception = 1;
}
// Indicates that this actor is marked as DEAD due to runtime environment setup failure.
message RuntimeEnvSetupFailureContext {
// TODO(sang,lixin) Get this error message from agent.
string error_message = 1;
}
// ---Actor death contexts end----
message ActorTableData {
// State of an actor.
enum ActorState {
@ -145,9 +166,6 @@ message ActorTableData {
repeated ResourceMapEntry resource_mapping = 15;
// The process id of this actor.
uint32 pid = 16;
// The exception thrown in creation task. This field is set if this actor died because
// of exception thrown in creation task. Only applies when state=DEAD.
RayException creation_task_exception = 18;
// The actor's namespace. Named `ray_namespace` to avoid confusions when invoked in c++.
string ray_namespace = 19;
// The unix ms timestamp the actor was started at.
@ -161,6 +179,8 @@ message ActorTableData {
// The actor's class name. This is necessary because the task spec's lifetime
// is shorter than the ActorTableData.
string class_name = 23;
// Will only be set when state=DEAD. Offers detailed context of why this actor is dead.
ActorDeathCause death_cause = 24;
}
message ErrorTableData {
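
The ActorDeathCause oneof above replaces the flat creation_task_exception field on ActorTableData, so consumers branch on the death context instead of checking a nullable exception. A rough sketch of how a consumer of the published table data might do that, assuming the generated Python module path ray.core.generated.gcs_pb2:

    from ray.core.generated import gcs_pb2

    def describe_death(actor_data: gcs_pb2.ActorTableData) -> str:
        cause = actor_data.death_cause
        context = cause.WhichOneof("context")
        if context == "creation_task_failure_context":
            # The actor's creation task raised; surface the formatted exception
            # (this is the path reported back as RayActorError).
            return cause.creation_task_failure_context.creation_task_exception.formatted_exception_string
        if context == "runtime_env_setup_failure_context":
            # The runtime env could not be built; this is the path reported
            # back as RuntimeEnvSetupError.
            return cause.runtime_env_setup_failure_context.error_message
        return "actor died without a recorded death cause"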

View file

@ -1207,9 +1207,9 @@ void NodeManager::HandleWorkerAvailable(const std::shared_ptr<WorkerInterface> &
cluster_task_manager_->ScheduleAndDispatchTasks();
}
void NodeManager::DisconnectClient(
const std::shared_ptr<ClientConnection> &client, rpc::WorkerExitType disconnect_type,
const std::shared_ptr<rpc::RayException> &creation_task_exception) {
void NodeManager::DisconnectClient(const std::shared_ptr<ClientConnection> &client,
rpc::WorkerExitType disconnect_type,
const rpc::RayException *creation_task_exception) {
RAY_LOG(INFO) << "NodeManager::DisconnectClient, disconnect_type=" << disconnect_type
<< ", has creation task exception = "
<< (creation_task_exception != nullptr);
@ -1337,13 +1337,13 @@ void NodeManager::ProcessDisconnectClientMessage(
const flatbuffers::Vector<uint8_t> *exception_pb =
message->creation_task_exception_pb();
std::shared_ptr<rpc::RayException> creation_task_exception = nullptr;
std::unique_ptr<rpc::RayException> creation_task_exception = nullptr;
if (exception_pb != nullptr) {
creation_task_exception = std::make_shared<rpc::RayException>();
creation_task_exception = std::make_unique<rpc::RayException>();
creation_task_exception->ParseFromString(std::string(
reinterpret_cast<const char *>(exception_pb->data()), exception_pb->size()));
}
DisconnectClient(client, disconnect_type, creation_task_exception);
DisconnectClient(client, disconnect_type, creation_task_exception.get());
}
void NodeManager::ProcessFetchOrReconstructMessage(

View file

@ -605,7 +605,7 @@ class NodeManager : public rpc::NodeManagerServiceHandler {
void DisconnectClient(
const std::shared_ptr<ClientConnection> &client,
rpc::WorkerExitType disconnect_type = rpc::WorkerExitType::SYSTEM_ERROR_EXIT,
const std::shared_ptr<rpc::RayException> &creation_task_exception = nullptr);
const rpc::RayException *creation_task_exception = nullptr);
/// ID of this node.
NodeID self_node_id_;