From b7e35acf14849b3d57ac85fcbdeafd3b2604fe64 Mon Sep 17 00:00:00 2001 From: Lixin Wei Date: Mon, 15 Nov 2021 23:43:35 +0800 Subject: [PATCH] [RuntimeEnv] Raise RuntimeEnvSetupError when Actor Creation Failed due to It (#19888) * ray_pkg passed * fix * fix typo * fix test * fix test * fix test * fix * draft * compile OK * lint * fix * lint * fix ci * Update src/ray/gcs/gcs_server/gcs_actor_manager.cc Co-authored-by: SangBin Cho * remove comment * rename * resolve conflict * use unique ownership * use DestroyActor instead of ReconstructActor * fix sigment fault * fix crash in debug log * Revert "fix crash in debug log" This reverts commit 8f0e3d37f062b664d8d0e07c6c1a9a715b8ba1ee. Co-authored-by: SangBin Cho --- dashboard/modules/actor/tests/test_actor.py | 5 +- python/ray/_raylet.pyx | 1 - python/ray/tests/test_runtime_env.py | 7 +-- src/mock/ray/core_worker/task_manager.h | 4 +- .../transport/direct_actor_transport.h | 2 +- src/ray/core_worker/actor_manager.cc | 19 ++----- src/ray/core_worker/task_manager.cc | 24 +++++--- src/ray/core_worker/task_manager.h | 17 +++--- .../core_worker/test/actor_manager_test.cc | 7 +-- .../test/direct_task_transport_test.cc | 14 ++--- .../transport/direct_actor_task_submitter.cc | 41 ++++++++++---- .../transport/direct_actor_task_submitter.h | 16 ++---- .../transport/direct_actor_transport.cc | 2 + .../gcs/gcs_server/gcs_actor_distribution.cc | 4 +- src/ray/gcs/gcs_server/gcs_actor_manager.cc | 55 ++++++++++++------- src/ray/gcs/gcs_server/gcs_actor_manager.h | 30 +++++----- src/ray/gcs/gcs_server/gcs_actor_scheduler.cc | 26 ++++----- src/ray/gcs/gcs_server/gcs_actor_scheduler.h | 6 ++ src/ray/gcs/gcs_server/gcs_server.cc | 10 ++-- .../gcs_server/test/gcs_actor_manager_test.cc | 2 +- src/ray/gcs/pb_util.h | 42 +++++++++++++- src/ray/protobuf/gcs.proto | 26 ++++++++- src/ray/raylet/node_manager.cc | 12 ++-- src/ray/raylet/node_manager.h | 2 +- 24 files changed, 232 insertions(+), 142 deletions(-) diff --git a/dashboard/modules/actor/tests/test_actor.py b/dashboard/modules/actor/tests/test_actor.py index aa83e8cdd..3b34a6425 100644 --- a/dashboard/modules/actor/tests/test_actor.py +++ b/dashboard/modules/actor/tests/test_actor.py @@ -273,9 +273,8 @@ def test_actor_pubsub(disable_aiohttp_cache, ray_start_with_dashboard): # For status that is not DEPENDENCIES_UNREADY, only states fields will # be published. elif actor_data_dict["state"] in ("ALIVE", "DEAD"): - assert actor_data_dict.keys() == { - "state", "address", "timestamp", "pid", - "creationTaskException", "rayNamespace" + assert actor_data_dict.keys() >= { + "state", "address", "timestamp", "pid", "rayNamespace" } elif actor_data_dict["state"] == "PENDING_CREATION": assert actor_data_dict.keys() == { diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index 746cda831..a2b4640ca 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -732,7 +732,6 @@ cdef execute_task( exit.is_ray_terminate = True raise exit -# return a protobuf-serialized ray_exception cdef shared_ptr[LocalMemoryBuffer] ray_error_to_memory_buf(ray_error): cdef bytes py_bytes = ray_error.to_bytes() return make_shared[LocalMemoryBuffer]( diff --git a/python/ray/tests/test_runtime_env.py b/python/ray/tests/test_runtime_env.py index 92295d508..44a3c962f 100644 --- a/python/ray/tests/test_runtime_env.py +++ b/python/ray/tests/test_runtime_env.py @@ -147,10 +147,8 @@ def test_invalid_conda_env(shutdown_only): # Check that another valid task can run. ray.get(f.remote()) - # Check actor is also broken. 
- # TODO(sang): It should raise RuntimeEnvSetupError a = A.options(runtime_env=bad_env).remote() - with pytest.raises(ray.exceptions.RayActorError): + with pytest.raises(ray.exceptions.RuntimeEnvSetupError): ray.get(a.f.remote()) # The second time this runs it should be faster as the error is cached. @@ -281,8 +279,7 @@ def test_runtime_env_broken(set_agent_failure_env_var, ray_start_cluster_head): Test actor task raises an exception. """ a = A.options(runtime_env=runtime_env).remote() - # TODO(sang): Raise a RuntimeEnvSetupError with proper error. - with pytest.raises(ray.exceptions.RayActorError): + with pytest.raises(ray.exceptions.RuntimeEnvSetupError): ray.get(a.ready.remote()) diff --git a/src/mock/ray/core_worker/task_manager.h b/src/mock/ray/core_worker/task_manager.h index 6e193c612..26567f3fc 100644 --- a/src/mock/ray/core_worker/task_manager.h +++ b/src/mock/ray/core_worker/task_manager.h @@ -25,7 +25,7 @@ class MockTaskFinisherInterface : public TaskFinisherInterface { (override)); MOCK_METHOD(bool, PendingTaskFailed, (const TaskID &task_id, rpc::ErrorType error_type, const Status *status, - const std::shared_ptr &creation_task_exception, + const rpc::RayException *creation_task_exception, bool immediately_mark_object_fail), (override)); MOCK_METHOD(void, OnTaskDependenciesInlined, @@ -35,7 +35,7 @@ class MockTaskFinisherInterface : public TaskFinisherInterface { MOCK_METHOD(bool, MarkTaskCanceled, (const TaskID &task_id), (override)); MOCK_METHOD(void, MarkPendingTaskFailed, (const TaskSpecification &spec, rpc::ErrorType error_type, - const std::shared_ptr &creation_task_exception), + const rpc::RayException *creation_task_exception), (override)); MOCK_METHOD(absl::optional, GetTaskSpec, (const TaskID &task_id), (const, override)); diff --git a/src/mock/ray/core_worker/transport/direct_actor_transport.h b/src/mock/ray/core_worker/transport/direct_actor_transport.h index 2fbcfeb2c..d7b22f3f7 100644 --- a/src/mock/ray/core_worker/transport/direct_actor_transport.h +++ b/src/mock/ray/core_worker/transport/direct_actor_transport.h @@ -24,7 +24,7 @@ class MockCoreWorkerDirectActorTaskSubmitterInterface (override)); MOCK_METHOD(void, DisconnectActor, (const ActorID &actor_id, int64_t num_restarts, bool dead, - const std::shared_ptr &creation_task_exception), + const rpc::RayException *creation_task_exception), (override)); MOCK_METHOD(void, KillActor, (const ActorID &actor_id, bool force_kill, bool no_restart), (override)); diff --git a/src/ray/core_worker/actor_manager.cc b/src/ray/core_worker/actor_manager.cc index a3dd3614b..134f5a07d 100644 --- a/src/ray/core_worker/actor_manager.cc +++ b/src/ray/core_worker/actor_manager.cc @@ -232,22 +232,15 @@ void ActorManager::HandleActorStateNotification(const ActorID &actor_id, << WorkerID::FromBinary(actor_data.address().worker_id()) << ", raylet_id: " << NodeID::FromBinary(actor_data.address().raylet_id()) << ", num_restarts: " << actor_data.num_restarts() - << ", has creation_task_exception=" - << actor_data.has_creation_task_exception(); + << ", death context type=" + << gcs::GetDeathCauseString(&actor_data.death_cause()); if (actor_data.state() == rpc::ActorTableData::RESTARTING) { - direct_actor_submitter_->DisconnectActor(actor_id, actor_data.num_restarts(), false); + direct_actor_submitter_->DisconnectActor(actor_id, actor_data.num_restarts(), + /*is_dead=*/false); } else if (actor_data.state() == rpc::ActorTableData::DEAD) { OnActorKilled(actor_id); - std::shared_ptr creation_task_exception = nullptr; - if 
(actor_data.has_creation_task_exception()) { - RAY_LOG(INFO) << "Creation task formatted exception: " - << actor_data.creation_task_exception().formatted_exception_string() - << ", actor_id: " << actor_id; - creation_task_exception = - std::make_shared(actor_data.creation_task_exception()); - } - direct_actor_submitter_->DisconnectActor(actor_id, actor_data.num_restarts(), true, - creation_task_exception); + direct_actor_submitter_->DisconnectActor(actor_id, actor_data.num_restarts(), + /*is_dead=*/true, &actor_data.death_cause()); // We cannot erase the actor handle here because clients can still // submit tasks to dead actors. This also means we defer unsubscription, // otherwise we crash when bulk unsubscribing all actor handles. diff --git a/src/ray/core_worker/task_manager.cc b/src/ray/core_worker/task_manager.cc index 5b79afdbe..0397bc54e 100644 --- a/src/ray/core_worker/task_manager.cc +++ b/src/ray/core_worker/task_manager.cc @@ -380,10 +380,10 @@ bool TaskManager::RetryTaskIfPossible(const TaskID &task_id) { } } -bool TaskManager::PendingTaskFailed( - const TaskID &task_id, rpc::ErrorType error_type, const Status *status, - const std::shared_ptr &creation_task_exception, - bool immediately_mark_object_fail) { +bool TaskManager::PendingTaskFailed(const TaskID &task_id, rpc::ErrorType error_type, + const Status *status, + const rpc::RayException *creation_task_exception, + bool immediately_mark_object_fail) { // Note that this might be the __ray_terminate__ task, so we don't log // loudly with ERROR here. RAY_LOG(DEBUG) << "Task " << task_id << " failed with error " @@ -557,7 +557,7 @@ bool TaskManager::MarkTaskCanceled(const TaskID &task_id) { void TaskManager::MarkPendingTaskFailed( const TaskSpecification &spec, rpc::ErrorType error_type, - const std::shared_ptr &creation_task_exception) { + const rpc::RayException *creation_task_exception) { const TaskID task_id = spec.TaskId(); RAY_LOG(DEBUG) << "Treat task as failed. 
task_id: " << task_id << ", error_type: " << ErrorType_Name(error_type); @@ -566,10 +566,16 @@ void TaskManager::MarkPendingTaskFailed( const auto object_id = ObjectID::FromIndex(task_id, /*index=*/i + 1); if (creation_task_exception != nullptr) { // Structure of bytes stored in object store: - // rpc::RayException - // ->pb-serialized bytes - // ->msgpack-serialized bytes - // ->[offset][msgpack-serialized bytes] + + // First serialize RayException by the following steps: + // PB's RayException + // --(PB Serialization)--> + // --(msgpack Serialization)--> + // msgpack_serialized_exception(MSE) + + // Then add it's length to the head(for coross-language deserialization): + // [MSE's length(9 bytes)] [MSE] + std::string pb_serialized_exception; creation_task_exception->SerializeToString(&pb_serialized_exception); msgpack::sbuffer msgpack_serialized_exception; diff --git a/src/ray/core_worker/task_manager.h b/src/ray/core_worker/task_manager.h index 0a8262de3..276fd0c32 100644 --- a/src/ray/core_worker/task_manager.h +++ b/src/ray/core_worker/task_manager.h @@ -35,7 +35,7 @@ class TaskFinisherInterface { virtual bool PendingTaskFailed( const TaskID &task_id, rpc::ErrorType error_type, const Status *status, - const std::shared_ptr &creation_task_exception = nullptr, + const rpc::RayException *creation_task_exception = nullptr, bool immediately_mark_object_fail = true) = 0; virtual void OnTaskDependenciesInlined( @@ -46,7 +46,7 @@ class TaskFinisherInterface { virtual void MarkPendingTaskFailed( const TaskSpecification &spec, rpc::ErrorType error_type, - const std::shared_ptr &creation_task_exception = nullptr) = 0; + const rpc::RayException *creation_task_exception = nullptr) = 0; virtual absl::optional GetTaskSpec(const TaskID &task_id) const = 0; @@ -145,17 +145,16 @@ class TaskManager : public TaskFinisherInterface, public TaskResubmissionInterfa /// \param[in] immediately_mark_object_fail whether immediately mark the task /// result object as failed. /// \return Whether the task will be retried or not. - bool PendingTaskFailed( - const TaskID &task_id, rpc::ErrorType error_type, const Status *status = nullptr, - const std::shared_ptr &creation_task_exception = nullptr, - bool immediately_mark_object_fail = true) override; + bool PendingTaskFailed(const TaskID &task_id, rpc::ErrorType error_type, + const Status *status = nullptr, + const rpc::RayException *creation_task_exception = nullptr, + bool immediately_mark_object_fail = true) override; /// Treat a pending task as failed. The lock should not be held when calling /// this method because it may trigger callbacks in this or other classes. void MarkPendingTaskFailed(const TaskSpecification &spec, rpc::ErrorType error_type, - const std::shared_ptr - &creation_task_exception = nullptr) override - LOCKS_EXCLUDED(mu_); + const rpc::RayException *creation_task_exception = + nullptr) override LOCKS_EXCLUDED(mu_); /// A task's dependencies were inlined in the task spec. This will decrement /// the ref count for the dependency IDs. 
If the dependencies contained other diff --git a/src/ray/core_worker/test/actor_manager_test.cc b/src/ray/core_worker/test/actor_manager_test.cc index 2be9a8c1e..027997472 100644 --- a/src/ray/core_worker/test/actor_manager_test.cc +++ b/src/ray/core_worker/test/actor_manager_test.cc @@ -76,9 +76,8 @@ class MockDirectActorSubmitter : public CoreWorkerDirectActorTaskSubmitterInterf MOCK_METHOD1(AddActorQueueIfNotExists, void(const ActorID &actor_id)); MOCK_METHOD3(ConnectActor, void(const ActorID &actor_id, const rpc::Address &address, int64_t num_restarts)); - MOCK_METHOD4(DisconnectActor, - void(const ActorID &actor_id, int64_t num_restarts, bool dead, - const std::shared_ptr &creation_task_exception)); + MOCK_METHOD4(DisconnectActor, void(const ActorID &actor_id, int64_t num_restarts, + bool dead, const rpc::ActorDeathCause *death_cause)); MOCK_METHOD3(KillActor, void(const ActorID &actor_id, bool force_kill, bool no_restart)); @@ -198,7 +197,7 @@ TEST_F(ActorManagerTest, TestAddAndGetActorHandleEndToEnd) { actor_table_data.set_state(rpc::ActorTableData::ALIVE); actor_info_accessor_->ActorStateNotificationPublished(actor_id, actor_table_data); - // Now actor state is updated to DEAD. Make sure it is diconnected. + // Now actor state is updated to DEAD. Make sure it is disconnected. EXPECT_CALL(*direct_actor_submitter_, DisconnectActor(_, _, _, _)).Times(1); actor_table_data.set_actor_id(actor_id.Binary()); actor_table_data.set_state(rpc::ActorTableData::DEAD); diff --git a/src/ray/core_worker/test/direct_task_transport_test.cc b/src/ray/core_worker/test/direct_task_transport_test.cc index deb99d146..25e5163e3 100644 --- a/src/ray/core_worker/test/direct_task_transport_test.cc +++ b/src/ray/core_worker/test/direct_task_transport_test.cc @@ -120,10 +120,10 @@ class MockTaskFinisher : public TaskFinisherInterface { return false; } - bool PendingTaskFailed( - const TaskID &task_id, rpc::ErrorType error_type, const Status *status, - const std::shared_ptr &creation_task_exception = nullptr, - bool immediately_mark_object_fail = true) override { + bool PendingTaskFailed(const TaskID &task_id, rpc::ErrorType error_type, + const Status *status, + const rpc::RayException *creation_task_exception = nullptr, + bool immediately_mark_object_fail = true) override { num_tasks_failed++; return true; } @@ -134,9 +134,9 @@ class MockTaskFinisher : public TaskFinisherInterface { num_contained_ids += contained_ids.size(); } - void MarkPendingTaskFailed(const TaskSpecification &spec, rpc::ErrorType error_type, - const std::shared_ptr - &creation_task_exception = nullptr) override {} + void MarkPendingTaskFailed( + const TaskSpecification &spec, rpc::ErrorType error_type, + const rpc::RayException *creation_task_exception = nullptr) override {} bool MarkTaskCanceled(const TaskID &task_id) override { return true; } diff --git a/src/ray/core_worker/transport/direct_actor_task_submitter.cc b/src/ray/core_worker/transport/direct_actor_task_submitter.cc index 714dd35df..8c2f828a4 100644 --- a/src/ray/core_worker/transport/direct_actor_task_submitter.cc +++ b/src/ray/core_worker/transport/direct_actor_task_submitter.cc @@ -17,8 +17,10 @@ #include #include "ray/common/task/task.h" +#include "ray/gcs/pb_util.h" using ray::rpc::ActorTableData; +using namespace ray::gcs; namespace ray { namespace core { @@ -107,17 +109,20 @@ Status CoreWorkerDirectActorTaskSubmitter::SubmitTask(TaskSpecification task_spe } else { // Do not hold the lock while calling into task_finisher_. 
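The new comment in TaskManager::MarkPendingTaskFailed above documents the cross-language layout of a stored creation task exception: the protobuf-serialized RayException is wrapped in a msgpack bin object (MSE), and the MSE's length is prepended as a fixed 9-byte header. A minimal standalone sketch of that layout (not part of this diff), assuming msgpack-c and that the 9-byte header is the fixed-width msgpack uint64 encoding of the payload length; the helper name below is hypothetical:

    // Hypothetical sketch of the byte layout only; the real packing lives in
    // TaskManager::MarkPendingTaskFailed.
    #include <msgpack.hpp>
    #include <cstdint>
    #include <string>

    std::string PackCreationTaskException(const std::string &pb_serialized_exception) {
      // Wrap the protobuf-serialized RayException in a msgpack bin object (MSE).
      msgpack::sbuffer mse;
      msgpack::packer<msgpack::sbuffer> packer(mse);
      packer.pack_bin(static_cast<std::uint32_t>(pb_serialized_exception.size()));
      packer.pack_bin_body(pb_serialized_exception.data(),
                           static_cast<std::uint32_t>(pb_serialized_exception.size()));

      // Prepend the MSE's length as a fixed-width msgpack uint64
      // (1 tag byte + 8 bytes = 9 bytes) so readers in other languages can locate the payload.
      msgpack::sbuffer header;
      msgpack::packer<msgpack::sbuffer> header_packer(header);
      header_packer.pack_fix_uint64(mse.size());

      std::string out;
      out.append(header.data(), header.size());  // [MSE's length (9 bytes)]
      out.append(mse.data(), mse.size());        // [MSE]
      return out;
    }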
task_finisher_.MarkTaskCanceled(task_id); - std::shared_ptr creation_task_exception = nullptr; + rpc::ErrorType error_type; + const rpc::RayException *creation_task_exception = nullptr; { absl::MutexLock lock(&mu_); auto queue = client_queues_.find(task_spec.ActorId()); - creation_task_exception = queue->second.creation_task_exception; + auto &death_cause = queue->second.death_cause; + error_type = GenErrorTypeFromDeathCause(death_cause.get()); + creation_task_exception = GetCreationTaskExceptionFromDeathCause(death_cause.get()); } auto status = Status::IOError("cancelling task of dead actor"); // No need to increment the number of completed tasks since the actor is // dead. - RAY_UNUSED(!task_finisher_.PendingTaskFailed(task_id, rpc::ErrorType::ACTOR_DIED, - &status, creation_task_exception)); + RAY_UNUSED(!task_finisher_.PendingTaskFailed(task_id, error_type, &status, + creation_task_exception)); } // If the task submission subsequently fails, then the client will receive @@ -208,8 +213,9 @@ void CoreWorkerDirectActorTaskSubmitter::ConnectActor(const ActorID &actor_id, void CoreWorkerDirectActorTaskSubmitter::DisconnectActor( const ActorID &actor_id, int64_t num_restarts, bool dead, - const std::shared_ptr &creation_task_exception) { - RAY_LOG(DEBUG) << "Disconnecting from actor " << actor_id; + const rpc::ActorDeathCause *death_cause) { + RAY_LOG(DEBUG) << "Disconnecting from actor " << actor_id + << ", death context type=" << GetDeathCauseString(death_cause); std::unordered_map> inflight_task_callbacks; @@ -239,20 +245,30 @@ void CoreWorkerDirectActorTaskSubmitter::DisconnectActor( if (dead) { queue->second.state = rpc::ActorTableData::DEAD; - queue->second.creation_task_exception = creation_task_exception; + if (death_cause != nullptr) { + queue->second.death_cause = std::make_unique(*death_cause); + } // If there are pending requests, treat the pending tasks as failed. RAY_LOG(INFO) << "Failing pending tasks for actor " << actor_id << " because the actor is already dead."; auto status = Status::IOError("cancelling all pending tasks of dead actor"); auto task_ids = queue->second.actor_submit_queue->ClearAllTasks(); + rpc::ErrorType error_type = GenErrorTypeFromDeathCause(death_cause); + const rpc::RayException *creation_task_exception = + GetCreationTaskExceptionFromDeathCause(death_cause); + if (creation_task_exception != nullptr) { + RAY_LOG(INFO) << "Creation task formatted exception: " + << creation_task_exception->formatted_exception_string() + << ", actor_id: " << actor_id; + } for (auto &task_id : task_ids) { task_finisher_.MarkTaskCanceled(task_id); // No need to increment the number of completed tasks since the actor is // dead. 
- RAY_UNUSED(!task_finisher_.PendingTaskFailed(task_id, rpc::ErrorType::ACTOR_DIED, - &status, creation_task_exception)); + RAY_UNUSED(!task_finisher_.PendingTaskFailed(task_id, error_type, &status, + creation_task_exception)); } auto &wait_for_death_info_tasks = queue->second.wait_for_death_info_tasks; @@ -260,8 +276,8 @@ void CoreWorkerDirectActorTaskSubmitter::DisconnectActor( RAY_LOG(INFO) << "Failing tasks waiting for death info, size=" << wait_for_death_info_tasks.size() << ", actor_id=" << actor_id; for (auto &net_err_task : wait_for_death_info_tasks) { - RAY_UNUSED(task_finisher_.MarkPendingTaskFailed( - net_err_task.second, rpc::ErrorType::ACTOR_DIED, creation_task_exception)); + RAY_UNUSED(task_finisher_.MarkPendingTaskFailed(net_err_task.second, error_type, + creation_task_exception)); } // No need to clean up tasks that have been sent and are waiting for @@ -393,7 +409,8 @@ void CoreWorkerDirectActorTaskSubmitter::PushActorTask(ClientQueue &queue, bool immediately_mark_object_fail = (queue.state == rpc::ActorTableData::DEAD); bool will_retry = task_finisher_.PendingTaskFailed( - task_id, rpc::ErrorType::ACTOR_DIED, &status, queue.creation_task_exception, + task_id, GenErrorTypeFromDeathCause(queue.death_cause.get()), &status, + GetCreationTaskExceptionFromDeathCause(queue.death_cause.get()), immediately_mark_object_fail); if (will_retry) { increment_completed_tasks = false; diff --git a/src/ray/core_worker/transport/direct_actor_task_submitter.h b/src/ray/core_worker/transport/direct_actor_task_submitter.h index c007c2160..4c0e188a6 100644 --- a/src/ray/core_worker/transport/direct_actor_task_submitter.h +++ b/src/ray/core_worker/transport/direct_actor_task_submitter.h @@ -47,9 +47,8 @@ class CoreWorkerDirectActorTaskSubmitterInterface { virtual void AddActorQueueIfNotExists(const ActorID &actor_id) = 0; virtual void ConnectActor(const ActorID &actor_id, const rpc::Address &address, int64_t num_restarts) = 0; - virtual void DisconnectActor( - const ActorID &actor_id, int64_t num_restarts, bool dead, - const std::shared_ptr &creation_task_exception = nullptr) = 0; + virtual void DisconnectActor(const ActorID &actor_id, int64_t num_restarts, bool dead, + const rpc::ActorDeathCause *death_cause = nullptr) = 0; virtual void KillActor(const ActorID &actor_id, bool force_kill, bool no_restart) = 0; virtual void CheckTimeoutTasks() = 0; @@ -114,12 +113,9 @@ class CoreWorkerDirectActorTaskSubmitter /// ignore the command to connect. /// \param[in] dead Whether the actor is permanently dead. In this case, all /// pending tasks for the actor should be failed. - /// \param[in] creation_task_exception Reason why the actor is dead, only applies when - /// dead = true. If this arg is set, it means this actor died because of an exception - /// thrown in creation task. - void DisconnectActor( - const ActorID &actor_id, int64_t num_restarts, bool dead, - const std::shared_ptr &creation_task_exception = nullptr); + /// \param[in] death_cause Context about why this actor is dead. + void DisconnectActor(const ActorID &actor_id, int64_t num_restarts, bool dead, + const rpc::ActorDeathCause *death_cause = nullptr); /// Set the timerstamp for the caller. void SetCallerCreationTimestamp(int64_t timestamp); @@ -137,7 +133,7 @@ class CoreWorkerDirectActorTaskSubmitter /// queue will be marked failed and all other ClientQueue state is ignored. rpc::ActorTableData::ActorState state = rpc::ActorTableData::DEPENDENCIES_UNREADY; /// Only applies when state=DEAD. 
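Both the SubmitTask dead-actor branch and DisconnectActor above now derive the object error type and the optional creation task exception from the stored death cause instead of hard-coding ACTOR_DIED. A condensed sketch of that translation (not part of this diff), using the helpers this patch adds to src/ray/gcs/pb_util.h; the free function name and the exact include set are illustrative:

    #include "ray/core_worker/task_manager.h"
    #include "ray/gcs/pb_util.h"

    // Fail one pending task of a dead actor with an error type that reflects why the
    // actor died: RUNTIME_ENV_SETUP_FAILED for runtime env failures, ACTOR_DIED otherwise.
    void FailPendingTaskOfDeadActor(ray::core::TaskFinisherInterface &task_finisher,
                                    const ray::TaskID &task_id,
                                    const ray::rpc::ActorDeathCause *death_cause) {
      const ray::rpc::ErrorType error_type =
          ray::gcs::GenErrorTypeFromDeathCause(death_cause);
      // Non-null only when the actor died because its creation task threw.
      const ray::rpc::RayException *creation_task_exception =
          ray::gcs::GetCreationTaskExceptionFromDeathCause(death_cause);
      auto status = ray::Status::IOError("cancelling task of dead actor");
      task_finisher.MarkTaskCanceled(task_id);
      task_finisher.PendingTaskFailed(task_id, error_type, &status, creation_task_exception);
    }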
- std::shared_ptr creation_task_exception = nullptr; + std::unique_ptr death_cause = nullptr; /// How many times this actor has been restarted before. Starts at -1 to /// indicate that the actor is not yet created. This is used to drop stale /// messages from the GCS. diff --git a/src/ray/core_worker/transport/direct_actor_transport.cc b/src/ray/core_worker/transport/direct_actor_transport.cc index cbd29d454..0bb9f0184 100644 --- a/src/ray/core_worker/transport/direct_actor_transport.cc +++ b/src/ray/core_worker/transport/direct_actor_transport.cc @@ -17,8 +17,10 @@ #include #include "ray/common/task/task.h" +#include "ray/gcs/pb_util.h" using ray::rpc::ActorTableData; +using namespace ray::gcs; namespace ray { namespace core { diff --git a/src/ray/gcs/gcs_server/gcs_actor_distribution.cc b/src/ray/gcs/gcs_server/gcs_actor_distribution.cc index af90515af..e408c8937 100644 --- a/src/ray/gcs/gcs_server/gcs_actor_distribution.cc +++ b/src/ray/gcs/gcs_server/gcs_actor_distribution.cc @@ -191,7 +191,9 @@ void GcsBasedActorScheduler::HandleWorkerLeaseReply( if (iter->second.empty()) { node_to_actors_when_leasing_.erase(iter); } - if (reply.rejected()) { + if (reply.runtime_env_setup_failed()) { + OnRuntimeEnvSetupFailure(actor, node_id); + } else if (reply.rejected()) { RAY_LOG(INFO) << "Failed to lease worker from node " << node_id << " for actor " << actor->GetActorID() << " as the resources are seized by normal tasks, job id = " diff --git a/src/ray/gcs/gcs_server/gcs_actor_manager.cc b/src/ray/gcs/gcs_server/gcs_actor_manager.cc index 70ad49636..69cbd83ff 100644 --- a/src/ray/gcs/gcs_server/gcs_actor_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_actor_manager.cc @@ -601,7 +601,8 @@ void GcsActorManager::PollOwnerForActorOutOfScope( }); } -void GcsActorManager::DestroyActor(const ActorID &actor_id) { +void GcsActorManager::DestroyActor(const ActorID &actor_id, + const rpc::ActorDeathCause *death_cause) { RAY_LOG(INFO) << "Destroying actor, actor id = " << actor_id << ", job id = " << actor_id.JobId(); actor_to_register_callbacks_.erase(actor_id); @@ -670,6 +671,9 @@ void GcsActorManager::DestroyActor(const ActorID &actor_id) { auto time = current_sys_time_ms(); mutable_actor_table_data->set_end_time(time); mutable_actor_table_data->set_timestamp(time); + if (death_cause != nullptr) { + mutable_actor_table_data->mutable_death_cause()->CopyFrom(*death_cause); + } auto actor_table_data = std::make_shared(*mutable_actor_table_data); @@ -718,17 +722,23 @@ void GcsActorManager::OnWorkerDead(const ray::NodeID &node_id, OnWorkerDead(node_id, worker_id, rpc::WorkerExitType::SYSTEM_ERROR_EXIT); } -void GcsActorManager::OnWorkerDead( - const ray::NodeID &node_id, const ray::WorkerID &worker_id, - const rpc::WorkerExitType disconnect_type, - const std::shared_ptr &creation_task_exception) { +void GcsActorManager::OnWorkerDead(const ray::NodeID &node_id, + const ray::WorkerID &worker_id, + const rpc::WorkerExitType disconnect_type, + const rpc::RayException *creation_task_exception) { std::string message = absl::StrCat( "Worker ", worker_id.Hex(), " on node ", node_id.Hex(), " exits, type=", rpc::WorkerExitType_Name(disconnect_type), ", has creation_task_exception = ", (creation_task_exception != nullptr)); + std::unique_ptr death_cause = nullptr; if (creation_task_exception != nullptr) { absl::StrAppend(&message, " Formatted creation task exception: ", creation_task_exception->formatted_exception_string()); + + death_cause = std::make_unique(); + death_cause->mutable_creation_task_failure_context() + 
->mutable_creation_task_exception() + ->CopyFrom(*creation_task_exception); } if (disconnect_type == rpc::WorkerExitType::INTENDED_EXIT || disconnect_type == rpc::WorkerExitType::IDLE_EXIT) { @@ -780,8 +790,7 @@ void GcsActorManager::OnWorkerDead( // Otherwise, try to reconstruct the actor that was already created or in the creation // process. - ReconstructActor(actor_id, /*need_reschedule=*/need_reconstruct, - creation_task_exception); + ReconstructActor(actor_id, /*need_reschedule=*/need_reconstruct, death_cause.get()); } void GcsActorManager::OnNodeDead(const NodeID &node_id) { @@ -834,9 +843,8 @@ void GcsActorManager::ReconstructActor(const ActorID &actor_id) { ReconstructActor(actor_id, /*need_reschedule=*/true); } -void GcsActorManager::ReconstructActor( - const ActorID &actor_id, bool need_reschedule, - const std::shared_ptr &creation_task_exception) { +void GcsActorManager::ReconstructActor(const ActorID &actor_id, bool need_reschedule, + const rpc::ActorDeathCause *death_cause) { // If the owner and this actor is dead at the same time, the actor // could've been destroyed and dereigstered before reconstruction. auto iter = registered_actors_.find(actor_id); @@ -864,10 +872,13 @@ void GcsActorManager::ReconstructActor( int64_t remaining = max_restarts - num_restarts; remaining_restarts = std::max(remaining, static_cast(0)); } - RAY_LOG(INFO) << "Actor is failed " << actor_id << " on worker " << worker_id + + RAY_LOG(INFO) << "Actor " << actor_id << " is failed on worker " << worker_id << " at node " << node_id << ", need_reschedule = " << need_reschedule + << ", death context type = " << GetDeathCauseString(death_cause) << ", remaining_restarts = " << remaining_restarts << ", job id = " << actor_id.JobId(); + if (remaining_restarts != 0) { // num_restarts must be set before updating GCS, or num_restarts will be inconsistent // between memory cache and storage. @@ -888,9 +899,8 @@ void GcsActorManager::ReconstructActor( } else { RemoveActorNameFromRegistry(actor); mutable_actor_table_data->set_state(rpc::ActorTableData::DEAD); - if (creation_task_exception != nullptr) { - mutable_actor_table_data->set_allocated_creation_task_exception( - new rpc::RayException(*creation_task_exception)); + if (death_cause != nullptr) { + mutable_actor_table_data->mutable_death_cause()->CopyFrom(*death_cause); } auto time = current_sys_time_ms(); mutable_actor_table_data->set_end_time(time); @@ -915,15 +925,22 @@ void GcsActorManager::ReconstructActor( } } -void GcsActorManager::OnActorCreationFailed(std::shared_ptr actor, - bool destroy_actor) { - if (destroy_actor) { - DestroyActor(actor->GetActorID()); - } else { +void GcsActorManager::OnActorSchedulingFailed(std::shared_ptr actor, + bool runtime_env_setup_failed) { + if (!runtime_env_setup_failed) { // We will attempt to schedule this actor once an eligible node is // registered. pending_actors_.emplace_back(std::move(actor)); + return; } + + auto death_cause = std::make_unique(); + // TODO(sang, lixin) 1. Make this message more friendly 2. Show this message in + // object.get()'s error. + death_cause->mutable_runtime_env_setup_failure_context()->set_error_message( + "Cannot create an actor because the associated runtime env couldn't be created."); + // If there is runtime env failure, mark this actor as dead immediately. 
+ DestroyActor(actor->GetActorID(), death_cause.get()); } void GcsActorManager::OnActorCreationSuccess(const std::shared_ptr &actor, diff --git a/src/ray/gcs/gcs_server/gcs_actor_manager.h b/src/ray/gcs/gcs_server/gcs_actor_manager.h index fa64b7312..6b7e4640f 100644 --- a/src/ray/gcs/gcs_server/gcs_actor_manager.h +++ b/src/ray/gcs/gcs_server/gcs_actor_manager.h @@ -300,10 +300,9 @@ class GcsActorManager : public rpc::ActorInfoHandler { /// \param exit_type exit reason of the dead worker. /// \param creation_task_exception if this arg is set, this worker is died because of an /// exception thrown in actor's creation task. - void OnWorkerDead( - const NodeID &node_id, const WorkerID &worker_id, - const rpc::WorkerExitType disconnect_type, - const std::shared_ptr &creation_task_exception = nullptr); + void OnWorkerDead(const NodeID &node_id, const WorkerID &worker_id, + const rpc::WorkerExitType disconnect_type, + const rpc::RayException *creation_task_exception = nullptr); void OnWorkerDead(const NodeID &node_id, const WorkerID &worker_id); @@ -313,11 +312,12 @@ class GcsActorManager : public rpc::ActorInfoHandler { /// failed). /// /// \param actor The actor whose creation task is infeasible. - /// \param destroy_actor Whether or not we should destroy an actor. - /// If false is given, the actor will be rescheduled. Otherwise, all + /// \param runtime_env_setup_failed Whether creation is failed due to runtime env setup + /// failure. If false is given, the actor will be rescheduled. Otherwise, all /// the interest party (driver that has actor handles) will notify /// that the actor is dead. - void OnActorCreationFailed(std::shared_ptr actor, bool destroy_actor = false); + void OnActorSchedulingFailed(std::shared_ptr actor, + bool runtime_env_setup_failed = false); /// Handle actor creation task success. This should be called when the actor /// creation task has been scheduled successfully. @@ -381,7 +381,8 @@ class GcsActorManager : public rpc::ActorInfoHandler { /// scope or the owner has died. /// NOTE: This method can be called multiple times in out-of-order and should be /// idempotent. - void DestroyActor(const ActorID &actor_id); + void DestroyActor(const ActorID &actor_id, + const rpc::ActorDeathCause *death_cause = nullptr); /// Get unresolved actors that were submitted from the specified node. absl::flat_hash_set GetUnresolvedActorsByOwnerNode( @@ -397,12 +398,10 @@ class GcsActorManager : public rpc::ActorInfoHandler { /// \param need_reschedule Whether to reschedule the actor creation task, sometimes /// users want to kill an actor intentionally and don't want it to be reconstructed /// again. - /// \param creation_task_exception Only applies when need_reschedule=false, decribing - /// why this actor failed. If this arg is set, it means this actor died because of an - /// exception thrown in creation task. - void ReconstructActor( - const ActorID &actor_id, bool need_reschedule, - const std::shared_ptr &creation_task_exception = nullptr); + /// \param death_cause Context about why this actor is dead. Should only be set when + /// need_reschedule=false. + void ReconstructActor(const ActorID &actor_id, bool need_reschedule, + const rpc::ActorDeathCause *death_cause = nullptr); /// Reconstruct the specified actor and reschedule it. 
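After this change the GCS produces exactly two ActorDeathCause variants: OnWorkerDead fills creation_task_failure_context when the dead worker reported a creation task exception, and OnActorSchedulingFailed fills runtime_env_setup_failure_context when the runtime env could not be set up. A minimal sketch of both constructions (not part of this diff) using the C++ API generated from the gcs.proto change further below; the two factory function names are illustrative:

    #include <string>

    #include "src/ray/protobuf/gcs.pb.h"

    // Variant 1: the actor's creation task threw; keep the serialized exception.
    ray::rpc::ActorDeathCause MakeCreationTaskFailureCause(
        const ray::rpc::RayException &creation_task_exception) {
      ray::rpc::ActorDeathCause cause;
      cause.mutable_creation_task_failure_context()
          ->mutable_creation_task_exception()
          ->CopyFrom(creation_task_exception);
      return cause;
    }

    // Variant 2: the runtime env could not be set up; record a human-readable message.
    ray::rpc::ActorDeathCause MakeRuntimeEnvSetupFailureCause(const std::string &error_message) {
      ray::rpc::ActorDeathCause cause;
      cause.mutable_runtime_env_setup_failure_context()->set_error_message(error_message);
      return cause;
    }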
void ReconstructActor(const ActorID &actor_id); @@ -442,8 +441,7 @@ class GcsActorManager : public rpc::ActorInfoHandler { const rpc::ActorTableData &actor) { auto actor_delta = std::make_shared(); actor_delta->set_state(actor.state()); - actor_delta->set_allocated_creation_task_exception( - new rpc::RayException(actor.creation_task_exception())); + actor_delta->mutable_death_cause()->CopyFrom(actor.death_cause()); actor_delta->mutable_address()->CopyFrom(actor.address()); actor_delta->set_num_restarts(actor.num_restarts()); actor_delta->set_timestamp(actor.timestamp()); diff --git a/src/ray/gcs/gcs_server/gcs_actor_scheduler.cc b/src/ray/gcs/gcs_server/gcs_actor_scheduler.cc index 9394d74bf..3ced57725 100644 --- a/src/ray/gcs/gcs_server/gcs_actor_scheduler.cc +++ b/src/ray/gcs/gcs_server/gcs_actor_scheduler.cc @@ -51,7 +51,7 @@ void GcsActorScheduler::Schedule(std::shared_ptr actor) { if (!node.has_value()) { // There are no available nodes to schedule the actor, so just trigger the failed // handler. - schedule_failure_handler_(std::move(actor), /*destroy_actor*/ false); + schedule_failure_handler_(std::move(actor), /*runtime_env_setup_failed=*/false); return; } @@ -311,6 +311,17 @@ void GcsActorScheduler::HandleWorkerLeaseGrantedReply( } } +void GcsActorScheduler::OnRuntimeEnvSetupFailure(std::shared_ptr actor, + const NodeID &node_id) { + RAY_LOG(ERROR) + << "Failed to lease worker from node " << node_id << " for actor " + << actor->GetActorID() << "(" + << actor->GetCreationTaskSpecification().FunctionDescriptor()->CallString() << ")" + << " as the runtime environment setup failed, job id = " + << actor->GetActorID().JobId(); + schedule_failure_handler_(actor, /*runtime_env_setup_failed=*/true); +} + void GcsActorScheduler::CreateActorOnWorker(std::shared_ptr actor, std::shared_ptr worker) { RAY_CHECK(actor && worker); @@ -491,18 +502,7 @@ void RayletBasedActorScheduler::HandleWorkerLeaseReply( // The runtime environment has failed by an unrecoverable error. // We cannot create this actor anymore. if (reply.runtime_env_setup_failed()) { - // Right now, the way to report error message back to actor is not that great. - // This message is used to notify users the cause of actor death (with ERROR - // severity). - // TODO(sang): It is a temporary solution. Improve the error reporting here. - RAY_LOG(ERROR) - << "Cannot create an actor " - << actor->GetCreationTaskSpecification().FunctionDescriptor()->CallString() - << " of an id " << actor->GetActorID() - << " because the runtime environment setup failed on a node of address " - << node->node_manager_address() << "."; - RAY_LOG(INFO) << "Actor failed to be scheduled on a node " << node_id; - schedule_failure_handler_(std::move(actor), /*destroy_actor*/ true); + OnRuntimeEnvSetupFailure(actor, node_id); return; } diff --git a/src/ray/gcs/gcs_server/gcs_actor_scheduler.h b/src/ray/gcs/gcs_server/gcs_actor_scheduler.h index 7782d5808..21cddd718 100644 --- a/src/ray/gcs/gcs_server/gcs_actor_scheduler.h +++ b/src/ray/gcs/gcs_server/gcs_actor_scheduler.h @@ -257,6 +257,12 @@ class GcsActorScheduler : public GcsActorSchedulerInterface { void HandleWorkerLeaseGrantedReply(std::shared_ptr actor, const rpc::RequestWorkerLeaseReply &reply); + /// Handler to process runtime env setup failure. + /// + /// \param actor Contains the resources needed to lease workers from the specified node. + /// \param node_id The node where the runtime env is failed to setup. 
+ void OnRuntimeEnvSetupFailure(std::shared_ptr actor, const NodeID &node_id); + /// Create the specified actor on the specified worker. /// /// \param actor The actor to be created. diff --git a/src/ray/gcs/gcs_server/gcs_server.cc b/src/ray/gcs/gcs_server/gcs_server.cc index 71398360c..549dd3502 100644 --- a/src/ray/gcs/gcs_server/gcs_server.cc +++ b/src/ray/gcs/gcs_server/gcs_server.cc @@ -250,12 +250,13 @@ void GcsServer::InitGcsActorManager(const GcsInitData &gcs_init_data) { RAY_CHECK(gcs_table_storage_ && gcs_publisher_ && gcs_node_manager_); std::unique_ptr scheduler; auto schedule_failure_handler = [this](std::shared_ptr actor, - bool destroy_actor) { + bool runtime_env_setup_failed) { // When there are no available nodes to schedule the actor the // gcs_actor_scheduler will treat it as failed and invoke this handler. In // this case, the actor manager should schedule the actor once an // eligible node is registered. - gcs_actor_manager_->OnActorCreationFailed(std::move(actor), destroy_actor); + gcs_actor_manager_->OnActorSchedulingFailed(std::move(actor), + runtime_env_setup_failed); }; auto schedule_success_handler = [this](std::shared_ptr actor, const rpc::PushTaskReply &reply) { @@ -479,10 +480,9 @@ void GcsServer::InstallEventListeners() { auto &worker_address = worker_failure_data->worker_address(); auto worker_id = WorkerID::FromBinary(worker_address.worker_id()); auto node_id = NodeID::FromBinary(worker_address.raylet_id()); - std::shared_ptr creation_task_exception = nullptr; + const rpc::RayException *creation_task_exception = nullptr; if (worker_failure_data->has_creation_task_exception()) { - creation_task_exception = std::make_shared( - worker_failure_data->creation_task_exception()); + creation_task_exception = &worker_failure_data->creation_task_exception(); } gcs_actor_manager_->OnWorkerDead(node_id, worker_id, worker_failure_data->exit_type(), diff --git a/src/ray/gcs/gcs_server/test/gcs_actor_manager_test.cc b/src/ray/gcs/gcs_server/test/gcs_actor_manager_test.cc index 1c7583af2..efa31debb 100644 --- a/src/ray/gcs/gcs_server/test/gcs_actor_manager_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_actor_manager_test.cc @@ -264,7 +264,7 @@ TEST_F(GcsActorManagerTest, TestSchedulingFailed) { auto actor = mock_actor_scheduler_->actors.back(); mock_actor_scheduler_->actors.clear(); - gcs_actor_manager_->OnActorCreationFailed(actor); + gcs_actor_manager_->OnActorSchedulingFailed(actor); gcs_actor_manager_->SchedulePendingActors(); ASSERT_EQ(mock_actor_scheduler_->actors.size(), 1); mock_actor_scheduler_->actors.clear(); diff --git a/src/ray/gcs/pb_util.h b/src/ray/gcs/pb_util.h index 75167d544..fd18c0778 100644 --- a/src/ray/gcs/pb_util.h +++ b/src/ray/gcs/pb_util.h @@ -25,6 +25,8 @@ namespace ray { namespace gcs { +using ContextCase = rpc::ActorDeathCause::ContextCase; + /// Helper function to produce job table data (for newly created job or updated job). /// /// \param job_id The ID of job that need to be registered or updated. 
@@ -95,7 +97,7 @@ inline std::shared_ptr CreateActorTableData( inline std::shared_ptr CreateWorkerFailureData( const NodeID &raylet_id, const WorkerID &worker_id, const std::string &address, int32_t port, int64_t timestamp, rpc::WorkerExitType disconnect_type, - const std::shared_ptr &creation_task_exception = nullptr) { + const rpc::RayException *creation_task_exception = nullptr) { auto worker_failure_info_ptr = std::make_shared(); worker_failure_info_ptr->mutable_worker_address()->set_raylet_id(raylet_id.Binary()); worker_failure_info_ptr->mutable_worker_address()->set_worker_id(worker_id.Binary()); @@ -111,6 +113,44 @@ inline std::shared_ptr CreateWorkerFailureData( return worker_failure_info_ptr; } +/// Get actor creation task exception from ActorDeathCause. +/// Returns nullptr if actor isn't dead due to creation task failure. +inline const rpc::RayException *GetCreationTaskExceptionFromDeathCause( + const rpc::ActorDeathCause *death_cause) { + if (death_cause == nullptr || + death_cause->context_case() != ContextCase::kCreationTaskFailureContext) { + return nullptr; + } + return &(death_cause->creation_task_failure_context().creation_task_exception()); +} + +/// Generate object error type from ActorDeathCause. +inline rpc::ErrorType GenErrorTypeFromDeathCause( + const rpc::ActorDeathCause *death_cause) { + if (death_cause == nullptr) { + return rpc::ErrorType::ACTOR_DIED; + } + if (death_cause->context_case() == ContextCase::kCreationTaskFailureContext) { + return rpc::ErrorType::ACTOR_DIED; + } + if (death_cause->context_case() == ContextCase::kRuntimeEnvSetupFailureContext) { + return rpc::ErrorType::RUNTIME_ENV_SETUP_FAILED; + } + return rpc::ErrorType::ACTOR_DIED; +} + +inline const std::string &GetDeathCauseString(const rpc::ActorDeathCause *death_cause) { + static absl::flat_hash_map death_cause_string{ + {ContextCase::CONTEXT_NOT_SET, "CONTEXT_NOT_SET"}, + {ContextCase::kCreationTaskFailureContext, "CreationTaskFailureContext"}, + {ContextCase::kRuntimeEnvSetupFailureContext, "RuntimeEnvSetupFailureContext"}}; + ContextCase death_cause_case = ContextCase::CONTEXT_NOT_SET; + if (death_cause != nullptr) { + death_cause_case = death_cause->context_case(); + } + return death_cause_string.at(death_cause_case); +} + } // namespace gcs } // namespace ray diff --git a/src/ray/protobuf/gcs.proto b/src/ray/protobuf/gcs.proto index 58371b0e6..6e8e150de 100644 --- a/src/ray/protobuf/gcs.proto +++ b/src/ray/protobuf/gcs.proto @@ -94,6 +94,27 @@ message TaskTableData { Task task = 1; } +message ActorDeathCause { + oneof context { + CreationTaskFailureContext creation_task_failure_context = 1; + RuntimeEnvSetupFailureContext runtime_env_setup_failure_context = 2; + } +} +// ---Actor death contexts start---- + +// Indicates that this actor is marked as DEAD due to actor creation task failure. +message CreationTaskFailureContext { + // The exception thrown in creation task. + RayException creation_task_exception = 1; +} + +// Indicates that this actor is marked as DEAD due to runtime environment setup failure. +message RuntimeEnvSetupFailureContext { + // TODO(sang,lixin) Get this error message from agent. + string error_message = 1; +} +// ---Actor death contexts end---- + message ActorTableData { // State of an actor. enum ActorState { @@ -145,9 +166,6 @@ message ActorTableData { repeated ResourceMapEntry resource_mapping = 15; // The process id of this actor. uint32 pid = 16; - // The exception thrown in creation task. 
This field is set if this actor died because - // of exception thrown in creation task. Only applies when state=DEAD. - RayException creation_task_exception = 18; // The actor's namespace. Named `ray_namespace` to avoid confusions when invoked in c++. string ray_namespace = 19; // The unix ms timestamp the actor was started at. @@ -161,6 +179,8 @@ message ActorTableData { // The actor's class name. This is necessary because the task spec's lifetime // is shorter than the ActorTableData. string class_name = 23; + // Will only be set when state=DEAD. Offers detailed context of why this actor is dead. + ActorDeathCause death_cause = 24; } message ErrorTableData { diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 2b744c89a..00eaaf893 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -1207,9 +1207,9 @@ void NodeManager::HandleWorkerAvailable(const std::shared_ptr & cluster_task_manager_->ScheduleAndDispatchTasks(); } -void NodeManager::DisconnectClient( - const std::shared_ptr &client, rpc::WorkerExitType disconnect_type, - const std::shared_ptr &creation_task_exception) { +void NodeManager::DisconnectClient(const std::shared_ptr &client, + rpc::WorkerExitType disconnect_type, + const rpc::RayException *creation_task_exception) { RAY_LOG(INFO) << "NodeManager::DisconnectClient, disconnect_type=" << disconnect_type << ", has creation task exception = " << (creation_task_exception != nullptr); @@ -1337,13 +1337,13 @@ void NodeManager::ProcessDisconnectClientMessage( const flatbuffers::Vector *exception_pb = message->creation_task_exception_pb(); - std::shared_ptr creation_task_exception = nullptr; + std::unique_ptr creation_task_exception = nullptr; if (exception_pb != nullptr) { - creation_task_exception = std::make_shared(); + creation_task_exception = std::make_unique(); creation_task_exception->ParseFromString(std::string( reinterpret_cast(exception_pb->data()), exception_pb->size())); } - DisconnectClient(client, disconnect_type, creation_task_exception); + DisconnectClient(client, disconnect_type, creation_task_exception.get()); } void NodeManager::ProcessFetchOrReconstructMessage( diff --git a/src/ray/raylet/node_manager.h b/src/ray/raylet/node_manager.h index 44457c081..79810c690 100644 --- a/src/ray/raylet/node_manager.h +++ b/src/ray/raylet/node_manager.h @@ -605,7 +605,7 @@ class NodeManager : public rpc::NodeManagerServiceHandler { void DisconnectClient( const std::shared_ptr &client, rpc::WorkerExitType disconnect_type = rpc::WorkerExitType::SYSTEM_ERROR_EXIT, - const std::shared_ptr &creation_task_exception = nullptr); + const rpc::RayException *creation_task_exception = nullptr); /// ID of this node. NodeID self_node_id_;
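With creation_task_exception on ActorTableData replaced by the death_cause field, consumers of actor state notifications (for example HandleActorStateNotification in actor_manager.cc above) can branch on the oneof to decide how the death should be reported: runtime env failures surface to the driver as ray.exceptions.RuntimeEnvSetupError, while creation task exceptions still surface as a RayActorError carrying the original exception. A subscriber-side sketch of that pattern (not part of this diff); the function name is illustrative and it assumes the regenerated protobuf headers plus Ray's RAY_LOG macro:

    #include "ray/util/logging.h"
    #include "src/ray/protobuf/gcs.pb.h"

    void LogActorDeathCause(const ray::rpc::ActorTableData &actor_data) {
      if (actor_data.state() != ray::rpc::ActorTableData::DEAD) {
        return;
      }
      const ray::rpc::ActorDeathCause &cause = actor_data.death_cause();
      switch (cause.context_case()) {
      case ray::rpc::ActorDeathCause::kRuntimeEnvSetupFailureContext:
        // Surfaced to the user as ray.exceptions.RuntimeEnvSetupError.
        RAY_LOG(INFO) << "Runtime env setup failed: "
                      << cause.runtime_env_setup_failure_context().error_message();
        break;
      case ray::rpc::ActorDeathCause::kCreationTaskFailureContext:
        // Surfaced as a RayActorError that carries the original exception.
        RAY_LOG(INFO) << "Creation task failed: "
                      << cause.creation_task_failure_context()
                             .creation_task_exception()
                             .formatted_exception_string();
        break;
      default:
        RAY_LOG(INFO) << "Actor died without a detailed death cause.";
        break;
      }
    }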