From e310f6c76f33bb2a54feb8ec2db069da23aaf2d0 Mon Sep 17 00:00:00 2001
From: SangBin Cho
Date: Wed, 24 Nov 2021 22:11:33 +0900
Subject: [PATCH] [Part 1] Improve RayActorDeadError: Refactoring (#20458)

This is the first step to improve `RayActorError`, which doesn't provide any
information to the user. In this first step, we re-define ambiguous / confusing
APIs and code paths.

1. Change the name of APIs that expose too little information
   - MarkPendingTaskFailed -> MarkTaskReturnObjectsFailed (the old API name was
     too general compared to what the method does)
   - PendingTaskFailed -> FailOrRetryPendingTask (the old API name didn't match
     the method's behavior).
2. Change the name of arguments that expose too much implementation detail
   - immediately_mark_object_fail -> mark_task_object_failed (no need to
     specify "immediately")
3. Move msgpack serialization to a util function instead of embedding it in
   the task manager function.
---
 src/mock/ray/core_worker/task_manager.h       |   6 +-
 src/ray/common/buffer.h                       |   5 +-
 src/ray/core_worker/task_manager.cc           | 102 +++++++++++-------
 src/ray/core_worker/task_manager.h            |  33 +++---
 .../test/direct_actor_transport_mock_test.cc  |   8 +-
 .../test/direct_actor_transport_test.cc       |  40 ++++---
 .../test/direct_task_transport_mock_test.cc   |   4 +-
 .../test/direct_task_transport_test.cc        |  10 +-
 src/ray/core_worker/test/task_manager_test.cc |   8 +-
 .../transport/direct_actor_task_submitter.cc  |  44 ++++----
 .../transport/direct_task_transport.cc        |  12 +--
 11 files changed, 160 insertions(+), 112 deletions(-)

diff --git a/src/mock/ray/core_worker/task_manager.h b/src/mock/ray/core_worker/task_manager.h
index 26567f3fc..0d3657f8a 100644
--- a/src/mock/ray/core_worker/task_manager.h
+++ b/src/mock/ray/core_worker/task_manager.h
@@ -23,17 +23,17 @@ class MockTaskFinisherInterface : public TaskFinisherInterface {
               (const TaskID &task_id, const rpc::PushTaskReply &reply,
                const rpc::Address &actor_addr),
               (override));
-  MOCK_METHOD(bool, PendingTaskFailed,
+  MOCK_METHOD(bool, FailOrRetryPendingTask,
               (const TaskID &task_id, rpc::ErrorType error_type, const Status *status,
                const rpc::RayException *creation_task_exception,
-               bool immediately_mark_object_fail),
+               bool mark_task_object_failed),
               (override));
   MOCK_METHOD(void, OnTaskDependenciesInlined,
               (const std::vector<ObjectID> &inlined_dependency_ids,
                const std::vector<ObjectID> &contained_ids),
               (override));
   MOCK_METHOD(bool, MarkTaskCanceled, (const TaskID &task_id), (override));
-  MOCK_METHOD(void, MarkPendingTaskFailed,
+  MOCK_METHOD(void, MarkTaskReturnObjectsFailed,
               (const TaskSpecification &spec, rpc::ErrorType error_type,
                const rpc::RayException *creation_task_exception),
               (override));
diff --git a/src/ray/common/buffer.h b/src/ray/common/buffer.h
index 6dc25033a..e75b1ebb9 100644
--- a/src/ray/common/buffer.h
+++ b/src/ray/common/buffer.h
@@ -16,7 +16,6 @@
 #include
 #include
-
 #include
 #include
@@ -112,9 +111,9 @@ class LocalMemoryBuffer : public Buffer {
   /// Pointer to the data.
   uint8_t *data_;
   /// Size of the buffer.
-  size_t size_;
+  size_t size_ = 0;
   /// Whether this buffer holds a copy of data.
-  bool has_data_copy_;
+  bool has_data_copy_ = false;
   /// This is only valid when `should_copy` is true.
   uint8_t *buffer_ = NULL;
 };
diff --git a/src/ray/core_worker/task_manager.cc b/src/ray/core_worker/task_manager.cc
index 0397bc54e..2f81919f1 100644
--- a/src/ray/core_worker/task_manager.cc
+++ b/src/ray/core_worker/task_manager.cc
@@ -20,6 +20,62 @@
 #include "ray/common/constants.h"
 #include "ray/util/util.h"
+namespace {
+
+/// Serialize the protobuf message to msgpack.
+///
+/// Ray uses Msgpack for cross-language object serialization.
+/// This method creates a msgpack-serialized buffer that contains
+/// the serialized protobuf message.
+///
+/// Language frontends can deserialize this object to obtain the
+/// data stored in the given protobuf. Check `serialization.py` to see
+/// how this works.
+///
+/// NOTE: The function guarantees that the returned buffer contains data.
+///
+/// \param protobuf_message The protobuf message to serialize.
+/// \return The buffer that contains the serialized msgpack message.
+template <class ProtobufMessage>
+std::unique_ptr<LocalMemoryBuffer> SerializePBToMsgPack(
+    const ProtobufMessage *protobuf_message) {
+  RAY_CHECK(protobuf_message != nullptr);
+  // Structure of bytes stored in object store:
+
+  // First serialize RayException by the following steps:
+  // PB's RayException
+  // --(PB Serialization)-->
+  // --(msgpack Serialization)-->
+  // msgpack_serialized_exception(MSE)
+
+  // Then add its length to the head (for cross-language deserialization):
+  // [MSE's length(9 bytes)] [MSE]
+
+  std::string pb_serialized_exception;
+  protobuf_message->SerializeToString(&pb_serialized_exception);
+  msgpack::sbuffer msgpack_serialized_exception;
+  msgpack::packer<msgpack::sbuffer> packer(msgpack_serialized_exception);
+  packer.pack_bin(pb_serialized_exception.size());
+  packer.pack_bin_body(pb_serialized_exception.data(), pb_serialized_exception.size());
+  std::unique_ptr<LocalMemoryBuffer> final_buffer =
+      std::make_unique<LocalMemoryBuffer>(msgpack_serialized_exception.size() +
+                                          kMessagePackOffset);
+  // Copy the msgpack-serialized bytes.
+  std::memcpy(final_buffer->Data() + kMessagePackOffset,
+              msgpack_serialized_exception.data(), msgpack_serialized_exception.size());
+  // Copy the msgpack-serialized length to the head.
+  msgpack::sbuffer msgpack_int;
+  msgpack::pack(msgpack_int, msgpack_serialized_exception.size());
+  std::memcpy(final_buffer->Data(), msgpack_int.data(), msgpack_int.size());
+  RAY_CHECK(final_buffer->Data() != nullptr);
+  RAY_CHECK(final_buffer->Size() != 0);
+
+  return final_buffer;
+}
+
+}  // namespace
+
 namespace ray {
 namespace core {
@@ -380,10 +436,10 @@ bool TaskManager::RetryTaskIfPossible(const TaskID &task_id) {
   }
 }
-bool TaskManager::PendingTaskFailed(const TaskID &task_id, rpc::ErrorType error_type,
-                                    const Status *status,
-                                    const rpc::RayException *creation_task_exception,
-                                    bool immediately_mark_object_fail) {
+bool TaskManager::FailOrRetryPendingTask(const TaskID &task_id, rpc::ErrorType error_type,
+                                         const Status *status,
+                                         const rpc::RayException *creation_task_exception,
+                                         bool mark_task_object_failed) {
   // Note that this might be the __ray_terminate__ task, so we don't log
   // loudly with ERROR here.
   RAY_LOG(DEBUG) << "Task " << task_id << " failed with error "
@@ -430,8 +486,8 @@ bool TaskManager::PendingTaskFailed(const TaskID &task_id, rpc::ErrorType error_
   // objects.
RemoveFinishedTaskReferences(spec, release_lineage, rpc::Address(), ReferenceCounter::ReferenceTableProto()); - if (immediately_mark_object_fail) { - MarkPendingTaskFailed(spec, error_type, creation_task_exception); + if (mark_task_object_failed) { + MarkTaskReturnObjectsFailed(spec, error_type, creation_task_exception); } } @@ -555,7 +611,7 @@ bool TaskManager::MarkTaskCanceled(const TaskID &task_id) { return it != submissible_tasks_.end(); } -void TaskManager::MarkPendingTaskFailed( +void TaskManager::MarkTaskReturnObjectsFailed( const TaskSpecification &spec, rpc::ErrorType error_type, const rpc::RayException *creation_task_exception) { const TaskID task_id = spec.TaskId(); @@ -565,36 +621,10 @@ void TaskManager::MarkPendingTaskFailed( for (int i = 0; i < num_returns; i++) { const auto object_id = ObjectID::FromIndex(task_id, /*index=*/i + 1); if (creation_task_exception != nullptr) { - // Structure of bytes stored in object store: - - // First serialize RayException by the following steps: - // PB's RayException - // --(PB Serialization)--> - // --(msgpack Serialization)--> - // msgpack_serialized_exception(MSE) - - // Then add it's length to the head(for coross-language deserialization): - // [MSE's length(9 bytes)] [MSE] - - std::string pb_serialized_exception; - creation_task_exception->SerializeToString(&pb_serialized_exception); - msgpack::sbuffer msgpack_serialized_exception; - msgpack::packer packer(msgpack_serialized_exception); - packer.pack_bin(pb_serialized_exception.size()); - packer.pack_bin_body(pb_serialized_exception.data(), - pb_serialized_exception.size()); - LocalMemoryBuffer final_buffer(msgpack_serialized_exception.size() + - kMessagePackOffset); - // copy msgpack-serialized bytes - std::memcpy(final_buffer.Data() + kMessagePackOffset, - msgpack_serialized_exception.data(), - msgpack_serialized_exception.size()); - // copy offset - msgpack::sbuffer msgpack_int; - msgpack::pack(msgpack_int, msgpack_serialized_exception.size()); - std::memcpy(final_buffer.Data(), msgpack_int.data(), msgpack_int.size()); + const auto final_buffer = + SerializePBToMsgPack(creation_task_exception); RAY_UNUSED(in_memory_store_->Put( - RayObject(error_type, final_buffer.Data(), final_buffer.Size()), object_id)); + RayObject(error_type, final_buffer->Data(), final_buffer->Size()), object_id)); } else { RAY_UNUSED(in_memory_store_->Put(RayObject(error_type), object_id)); } diff --git a/src/ray/core_worker/task_manager.h b/src/ray/core_worker/task_manager.h index 276fd0c32..7f7ef4ee4 100644 --- a/src/ray/core_worker/task_manager.h +++ b/src/ray/core_worker/task_manager.h @@ -33,10 +33,10 @@ class TaskFinisherInterface { virtual bool RetryTaskIfPossible(const TaskID &task_id) = 0; - virtual bool PendingTaskFailed( + virtual bool FailOrRetryPendingTask( const TaskID &task_id, rpc::ErrorType error_type, const Status *status, const rpc::RayException *creation_task_exception = nullptr, - bool immediately_mark_object_fail = true) = 0; + bool mark_task_object_failed = true) = 0; virtual void OnTaskDependenciesInlined( const std::vector &inlined_dependency_ids, @@ -44,7 +44,7 @@ class TaskFinisherInterface { virtual bool MarkTaskCanceled(const TaskID &task_id) = 0; - virtual void MarkPendingTaskFailed( + virtual void MarkTaskReturnObjectsFailed( const TaskSpecification &spec, rpc::ErrorType error_type, const rpc::RayException *creation_task_exception = nullptr) = 0; @@ -142,19 +142,24 @@ class TaskManager : public TaskFinisherInterface, public TaskResubmissionInterfa /// \param[in] 
creation_task_exception If this arg is set, it means this task failed /// because the callee actor is dead caused by an exception thrown in creation task, /// only applies when error_type=ACTOR_DIED. - /// \param[in] immediately_mark_object_fail whether immediately mark the task - /// result object as failed. + /// \param[in] mark_task_object_failed whether or not it marks the task + /// return object as failed. /// \return Whether the task will be retried or not. - bool PendingTaskFailed(const TaskID &task_id, rpc::ErrorType error_type, - const Status *status = nullptr, - const rpc::RayException *creation_task_exception = nullptr, - bool immediately_mark_object_fail = true) override; + bool FailOrRetryPendingTask(const TaskID &task_id, rpc::ErrorType error_type, + const Status *status = nullptr, + const rpc::RayException *creation_task_exception = nullptr, + bool mark_task_object_failed = true) override; - /// Treat a pending task as failed. The lock should not be held when calling - /// this method because it may trigger callbacks in this or other classes. - void MarkPendingTaskFailed(const TaskSpecification &spec, rpc::ErrorType error_type, - const rpc::RayException *creation_task_exception = - nullptr) override LOCKS_EXCLUDED(mu_); + /// Treat a pending task's returned Ray object as failed. The lock should not be held + /// when calling this method because it may trigger callbacks in this or other classes. + /// + /// \param[in] spec The TaskSpec that contains return object. + /// \param[in] error_type The error type the returned Ray object will store. + /// \param[in] creation_task_exception The serialized actor init exception. + void MarkTaskReturnObjectsFailed( + const TaskSpecification &spec, rpc::ErrorType error_type, + const rpc::RayException *creation_task_exception = nullptr) override + LOCKS_EXCLUDED(mu_); /// A task's dependencies were inlined in the task spec. This will decrement /// the ref count for the dependency IDs. 
If the dependencies contained other diff --git a/src/ray/core_worker/test/direct_actor_transport_mock_test.cc b/src/ray/core_worker/test/direct_actor_transport_mock_test.cc index 28a4e28f1..a6b02692f 100644 --- a/src/ray/core_worker/test/direct_actor_transport_mock_test.cc +++ b/src/ray/core_worker/test/direct_actor_transport_mock_test.cc @@ -83,9 +83,9 @@ TEST_F(DirectTaskTransportTest, ActorRegisterFailure) { ASSERT_TRUE(actor_creator->IsActorInRegistering(actor_id)); actor_task_submitter->AddActorQueueIfNotExists(actor_id); ASSERT_TRUE(actor_task_submitter->SubmitTask(task_spec).ok()); - EXPECT_CALL(*task_finisher, - PendingTaskFailed(task_spec.TaskId(), - rpc::ErrorType::DEPENDENCY_RESOLUTION_FAILED, _, _, _)); + EXPECT_CALL(*task_finisher, FailOrRetryPendingTask( + task_spec.TaskId(), + rpc::ErrorType::DEPENDENCY_RESOLUTION_FAILED, _, _, _)); register_cb(Status::IOError("")); } @@ -107,7 +107,7 @@ TEST_F(DirectTaskTransportTest, ActorRegisterOk) { ASSERT_TRUE(actor_creator->IsActorInRegistering(actor_id)); actor_task_submitter->AddActorQueueIfNotExists(actor_id); ASSERT_TRUE(actor_task_submitter->SubmitTask(task_spec).ok()); - EXPECT_CALL(*task_finisher, PendingTaskFailed(_, _, _, _, _)).Times(0); + EXPECT_CALL(*task_finisher, FailOrRetryPendingTask(_, _, _, _, _)).Times(0); register_cb(Status::OK()); } diff --git a/src/ray/core_worker/test/direct_actor_transport_test.cc b/src/ray/core_worker/test/direct_actor_transport_test.cc index 0cdc30458..ad9e4a745 100644 --- a/src/ray/core_worker/test/direct_actor_transport_test.cc +++ b/src/ray/core_worker/test/direct_actor_transport_test.cc @@ -140,7 +140,7 @@ TEST_P(DirectActorSubmitterTest, TestSubmitTask) { EXPECT_CALL(*task_finisher_, CompletePendingTask(_, _, _)) .Times(worker_client_->callbacks.size()); - EXPECT_CALL(*task_finisher_, PendingTaskFailed(_, _, _, _, _)).Times(0); + EXPECT_CALL(*task_finisher_, FailOrRetryPendingTask(_, _, _, _, _)).Times(0); while (!worker_client_->callbacks.empty()) { ASSERT_TRUE(worker_client_->ReplyPushTask()); } @@ -289,16 +289,18 @@ TEST_P(DirectActorSubmitterTest, TestActorDead) { ASSERT_EQ(worker_client_->callbacks.size(), 1); // Simulate the actor dying. All in-flight tasks should get failed. - EXPECT_CALL(*task_finisher_, PendingTaskFailed(task1.TaskId(), _, _, _, _)).Times(1); + EXPECT_CALL(*task_finisher_, FailOrRetryPendingTask(task1.TaskId(), _, _, _, _)) + .Times(1); EXPECT_CALL(*task_finisher_, CompletePendingTask(_, _, _)).Times(0); while (!worker_client_->callbacks.empty()) { ASSERT_TRUE(worker_client_->ReplyPushTask(Status::IOError(""))); } - EXPECT_CALL(*task_finisher_, PendingTaskFailed(_, _, _, _, _)).Times(0); + EXPECT_CALL(*task_finisher_, FailOrRetryPendingTask(_, _, _, _, _)).Times(0); submitter_.DisconnectActor(actor_id, 1, /*dead=*/false); // Actor marked as dead. All queued tasks should get failed. 
- EXPECT_CALL(*task_finisher_, PendingTaskFailed(task2.TaskId(), _, _, _, _)).Times(1); + EXPECT_CALL(*task_finisher_, FailOrRetryPendingTask(task2.TaskId(), _, _, _, _)) + .Times(1); submitter_.DisconnectActor(actor_id, 2, /*dead=*/true); } @@ -324,8 +326,10 @@ TEST_P(DirectActorSubmitterTest, TestActorRestartNoRetry) { ASSERT_TRUE(submitter_.SubmitTask(task3).ok()); EXPECT_CALL(*task_finisher_, CompletePendingTask(task1.TaskId(), _, _)).Times(1); - EXPECT_CALL(*task_finisher_, PendingTaskFailed(task2.TaskId(), _, _, _, _)).Times(1); - EXPECT_CALL(*task_finisher_, PendingTaskFailed(task3.TaskId(), _, _, _, _)).Times(1); + EXPECT_CALL(*task_finisher_, FailOrRetryPendingTask(task2.TaskId(), _, _, _, _)) + .Times(1); + EXPECT_CALL(*task_finisher_, FailOrRetryPendingTask(task3.TaskId(), _, _, _, _)) + .Times(1); EXPECT_CALL(*task_finisher_, CompletePendingTask(task4.TaskId(), _, _)).Times(1); // First task finishes. Second task fails. ASSERT_TRUE(worker_client_->ReplyPushTask(Status::OK())); @@ -376,10 +380,10 @@ TEST_P(DirectActorSubmitterTest, TestActorRestartRetry) { // All tasks will eventually finish. EXPECT_CALL(*task_finisher_, CompletePendingTask(_, _, _)).Times(4); // Tasks 2 and 3 will be retried. - EXPECT_CALL(*task_finisher_, PendingTaskFailed(task2.TaskId(), _, _, _, _)) + EXPECT_CALL(*task_finisher_, FailOrRetryPendingTask(task2.TaskId(), _, _, _, _)) .Times(1) .WillRepeatedly(Return(true)); - EXPECT_CALL(*task_finisher_, PendingTaskFailed(task3.TaskId(), _, _, _, _)) + EXPECT_CALL(*task_finisher_, FailOrRetryPendingTask(task3.TaskId(), _, _, _, _)) .Times(1) .WillRepeatedly(Return(true)); // First task finishes. Second task fails. @@ -435,7 +439,7 @@ TEST_P(DirectActorSubmitterTest, TestActorRestartOutOfOrderRetry) { EXPECT_CALL(*task_finisher_, CompletePendingTask(_, _, _)).Times(3); // Tasks 2 will be retried - EXPECT_CALL(*task_finisher_, PendingTaskFailed(task2.TaskId(), _, _, _, _)) + EXPECT_CALL(*task_finisher_, FailOrRetryPendingTask(task2.TaskId(), _, _, _, _)) .Times(1) .WillRepeatedly(Return(true)); // First task finishes. Second task hang. Third task finishes. @@ -526,7 +530,8 @@ TEST_P(DirectActorSubmitterTest, TestActorRestartOutOfOrderGcs) { ASSERT_EQ(num_clients_connected_, 2); // The actor dies permanently. All tasks are failed. - EXPECT_CALL(*task_finisher_, PendingTaskFailed(task.TaskId(), _, _, _, _)).Times(1); + EXPECT_CALL(*task_finisher_, FailOrRetryPendingTask(task.TaskId(), _, _, _, _)) + .Times(1); submitter_.DisconnectActor(actor_id, 3, /*dead=*/true); ASSERT_EQ(num_clients_connected_, 2); @@ -537,7 +542,8 @@ TEST_P(DirectActorSubmitterTest, TestActorRestartOutOfOrderGcs) { ASSERT_EQ(num_clients_connected_, 2); // Submit a task. task = CreateActorTaskHelper(actor_id, worker_id, 4); - EXPECT_CALL(*task_finisher_, PendingTaskFailed(task.TaskId(), _, _, _, _)).Times(1); + EXPECT_CALL(*task_finisher_, FailOrRetryPendingTask(task.TaskId(), _, _, _, _)) + .Times(1); ASSERT_TRUE(submitter_.SubmitTask(task).ok()); } @@ -567,16 +573,20 @@ TEST_P(DirectActorSubmitterTest, TestActorRestartFailInflightTasks) { ASSERT_TRUE(submitter_.SubmitTask(task3).ok()); // Actor failed, but the task replies are delayed (or in some scenarios, lost). // We should still be able to fail the inflight tasks. 
- EXPECT_CALL(*task_finisher_, PendingTaskFailed(task2.TaskId(), _, _, _, _)).Times(1); - EXPECT_CALL(*task_finisher_, PendingTaskFailed(task3.TaskId(), _, _, _, _)).Times(1); + EXPECT_CALL(*task_finisher_, FailOrRetryPendingTask(task2.TaskId(), _, _, _, _)) + .Times(1); + EXPECT_CALL(*task_finisher_, FailOrRetryPendingTask(task3.TaskId(), _, _, _, _)) + .Times(1); submitter_.DisconnectActor(actor_id, 1, /*dead=*/false); // The task replies are now received. Since the tasks are already failed, they will not // be marked as failed or finished again. EXPECT_CALL(*task_finisher_, CompletePendingTask(task2.TaskId(), _, _)).Times(0); - EXPECT_CALL(*task_finisher_, PendingTaskFailed(task2.TaskId(), _, _, _, _)).Times(0); + EXPECT_CALL(*task_finisher_, FailOrRetryPendingTask(task2.TaskId(), _, _, _, _)) + .Times(0); EXPECT_CALL(*task_finisher_, CompletePendingTask(task3.TaskId(), _, _)).Times(0); - EXPECT_CALL(*task_finisher_, PendingTaskFailed(task3.TaskId(), _, _, _, _)).Times(0); + EXPECT_CALL(*task_finisher_, FailOrRetryPendingTask(task3.TaskId(), _, _, _, _)) + .Times(0); // Task 2 replied with OK. ASSERT_TRUE(worker_client_->ReplyPushTask(Status::OK())); // Task 3 replied with error. diff --git a/src/ray/core_worker/test/direct_task_transport_mock_test.cc b/src/ray/core_worker/test/direct_task_transport_mock_test.cc index 75fc08822..e7bfd83be 100644 --- a/src/ray/core_worker/test/direct_task_transport_mock_test.cc +++ b/src/ray/core_worker/test/direct_task_transport_mock_test.cc @@ -79,8 +79,8 @@ TEST_F(DirectTaskTransportTest, ActorCreationFail) { auto task_spec = GetCreatingTaskSpec(actor_id); EXPECT_CALL(*task_finisher, CompletePendingTask(_, _, _)).Times(0); EXPECT_CALL(*task_finisher, - PendingTaskFailed(task_spec.TaskId(), rpc::ErrorType::ACTOR_CREATION_FAILED, - _, _, true)); + FailOrRetryPendingTask(task_spec.TaskId(), + rpc::ErrorType::ACTOR_CREATION_FAILED, _, _, true)); rpc::ClientCallback create_cb; EXPECT_CALL(*actor_creator, AsyncCreateActor(task_spec, _)) .WillOnce(DoAll(SaveArg<1>(&create_cb), Return(Status::OK()))); diff --git a/src/ray/core_worker/test/direct_task_transport_test.cc b/src/ray/core_worker/test/direct_task_transport_test.cc index 7a3ced76b..f0906925c 100644 --- a/src/ray/core_worker/test/direct_task_transport_test.cc +++ b/src/ray/core_worker/test/direct_task_transport_test.cc @@ -120,10 +120,10 @@ class MockTaskFinisher : public TaskFinisherInterface { return false; } - bool PendingTaskFailed(const TaskID &task_id, rpc::ErrorType error_type, - const Status *status, - const rpc::RayException *creation_task_exception = nullptr, - bool immediately_mark_object_fail = true) override { + bool FailOrRetryPendingTask(const TaskID &task_id, rpc::ErrorType error_type, + const Status *status, + const rpc::RayException *creation_task_exception = nullptr, + bool mark_task_object_failed = true) override { num_tasks_failed++; return true; } @@ -134,7 +134,7 @@ class MockTaskFinisher : public TaskFinisherInterface { num_contained_ids += contained_ids.size(); } - void MarkPendingTaskFailed( + void MarkTaskReturnObjectsFailed( const TaskSpecification &spec, rpc::ErrorType error_type, const rpc::RayException *creation_task_exception = nullptr) override {} diff --git a/src/ray/core_worker/test/task_manager_test.cc b/src/ray/core_worker/test/task_manager_test.cc index af0ce6549..1692e80c5 100644 --- a/src/ray/core_worker/test/task_manager_test.cc +++ b/src/ray/core_worker/test/task_manager_test.cc @@ -145,7 +145,7 @@ TEST_F(TaskManagerTest, TestTaskFailure) { 
ASSERT_TRUE(reference_counter_->IsObjectPendingCreation(return_id)); auto error = rpc::ErrorType::WORKER_DIED; - manager_.PendingTaskFailed(spec.TaskId(), error); + manager_.FailOrRetryPendingTask(spec.TaskId(), error); ASSERT_FALSE(manager_.IsTaskPending(spec.TaskId())); // Only the return object reference should remain. ASSERT_EQ(reference_counter_->NumObjectIDsInScope(), 1); @@ -210,7 +210,7 @@ TEST_F(TaskManagerTest, TestTaskReconstruction) { auto error = rpc::ErrorType::WORKER_DIED; for (int i = 0; i < num_retries; i++) { RAY_LOG(INFO) << "Retry " << i; - manager_.PendingTaskFailed(spec.TaskId(), error); + manager_.FailOrRetryPendingTask(spec.TaskId(), error); ASSERT_TRUE(manager_.IsTaskPending(spec.TaskId())); ASSERT_TRUE(reference_counter_->IsObjectPendingCreation(return_id)); ASSERT_EQ(reference_counter_->NumObjectIDsInScope(), 3); @@ -219,7 +219,7 @@ TEST_F(TaskManagerTest, TestTaskReconstruction) { ASSERT_EQ(num_retries_, i + 1); } - manager_.PendingTaskFailed(spec.TaskId(), error); + manager_.FailOrRetryPendingTask(spec.TaskId(), error); ASSERT_FALSE(manager_.IsTaskPending(spec.TaskId())); // Only the return object reference should remain. ASSERT_EQ(reference_counter_->NumObjectIDsInScope(), 1); @@ -253,7 +253,7 @@ TEST_F(TaskManagerTest, TestTaskKill) { manager_.MarkTaskCanceled(spec.TaskId()); auto error = rpc::ErrorType::TASK_CANCELLED; - manager_.PendingTaskFailed(spec.TaskId(), error); + manager_.FailOrRetryPendingTask(spec.TaskId(), error); ASSERT_FALSE(manager_.IsTaskPending(spec.TaskId())); std::vector> results; RAY_CHECK_OK(store_->Get({return_id}, 1, 0, ctx, false, &results)); diff --git a/src/ray/core_worker/transport/direct_actor_task_submitter.cc b/src/ray/core_worker/transport/direct_actor_task_submitter.cc index 72174cb81..4e7b33d06 100644 --- a/src/ray/core_worker/transport/direct_actor_task_submitter.cc +++ b/src/ray/core_worker/transport/direct_actor_task_submitter.cc @@ -101,7 +101,7 @@ Status CoreWorkerDirectActorTaskSubmitter::SubmitTask(TaskSpecification task_spe } else { auto task_id = actor_submit_queue->Get(send_pos).first.TaskId(); actor_submit_queue->MarkDependencyFailed(send_pos); - task_finisher_.PendingTaskFailed( + task_finisher_.FailOrRetryPendingTask( task_id, rpc::ErrorType::DEPENDENCY_RESOLUTION_FAILED, &status); } } @@ -121,8 +121,8 @@ Status CoreWorkerDirectActorTaskSubmitter::SubmitTask(TaskSpecification task_spe auto status = Status::IOError("cancelling task of dead actor"); // No need to increment the number of completed tasks since the actor is // dead. - RAY_UNUSED(!task_finisher_.PendingTaskFailed(task_id, error_type, &status, - creation_task_exception)); + RAY_UNUSED(!task_finisher_.FailOrRetryPendingTask(task_id, error_type, &status, + creation_task_exception)); } // If the task submission subsequently fails, then the client will receive @@ -141,7 +141,7 @@ void CoreWorkerDirectActorTaskSubmitter::FailInflightTasks( const std::unordered_map> &inflight_task_callbacks) { // NOTE(kfstorm): We invoke the callbacks with a bad status to act like there's a - // network issue. We don't call `task_finisher_.PendingTaskFailed` directly because + // network issue. We don't call `task_finisher_.FailOrRetryPendingTask` directly because // there's much more work to do in the callback. 
  auto status = Status::IOError("Fail all inflight tasks due to actor state change.");
  rpc::PushTaskReply reply;
@@ -267,8 +267,8 @@ void CoreWorkerDirectActorTaskSubmitter::DisconnectActor(
         task_finisher_.MarkTaskCanceled(task_id);
         // No need to increment the number of completed tasks since the actor is
         // dead.
-        RAY_UNUSED(!task_finisher_.PendingTaskFailed(task_id, error_type, &status,
-                                                     creation_task_exception));
+        RAY_UNUSED(!task_finisher_.FailOrRetryPendingTask(task_id, error_type, &status,
+                                                          creation_task_exception));
       }
       auto &wait_for_death_info_tasks = queue->second.wait_for_death_info_tasks;
@@ -276,8 +276,8 @@
       RAY_LOG(INFO) << "Failing tasks waiting for death info, size="
                     << wait_for_death_info_tasks.size() << ", actor_id=" << actor_id;
       for (auto &net_err_task : wait_for_death_info_tasks) {
-        RAY_UNUSED(task_finisher_.MarkPendingTaskFailed(net_err_task.second, error_type,
-                                                        creation_task_exception));
+        RAY_UNUSED(task_finisher_.MarkTaskReturnObjectsFailed(
+            net_err_task.second, error_type, creation_task_exception));
       }
       // No need to clean up tasks that have been sent and are waiting for
@@ -304,7 +304,7 @@ void CoreWorkerDirectActorTaskSubmitter::CheckTimeoutTasks() {
     while (deque_itr != queue.wait_for_death_info_tasks.end() &&
            /*timeout timestamp*/ deque_itr->first < current_time_ms()) {
       auto task_spec = deque_itr->second;
-      task_finisher_.MarkPendingTaskFailed(task_spec, rpc::ErrorType::ACTOR_DIED);
+      task_finisher_.MarkTaskReturnObjectsFailed(task_spec, rpc::ErrorType::ACTOR_DIED);
       deque_itr = queue.wait_for_death_info_tasks.erase(deque_itr);
     }
   }
@@ -390,7 +390,8 @@ void CoreWorkerDirectActorTaskSubmitter::PushActorTask(ClientQueue &queue,
   rpc::ClientCallback<rpc::PushTaskReply> reply_callback =
       [this, addr, task_id, actor_id, actor_counter, task_spec, task_skipped](
           const Status &status, const rpc::PushTaskReply &reply) {
-        bool increment_completed_tasks = true;
+        /// Whether or not we will retry this actor task.
+        auto will_retry = false;
         if (task_skipped) {
           // NOTE(simon):Increment the task counter regardless of the status because the
@@ -407,19 +408,21 @@
           RAY_CHECK(queue_pair != client_queues_.end());
           auto &queue = queue_pair->second;
-          bool immediately_mark_object_fail = (queue.state == rpc::ActorTableData::DEAD);
-          bool will_retry = task_finisher_.PendingTaskFailed(
+          bool is_actor_dead = (queue.state == rpc::ActorTableData::DEAD);
+          // If the actor is already dead, immediately mark the task's return objects as
+          // failed. Otherwise, the task gets a grace period before they are marked failed.
+          will_retry = task_finisher_.FailOrRetryPendingTask(
               task_id, GenErrorTypeFromDeathCause(queue.death_cause.get()), &status,
               GetCreationTaskExceptionFromDeathCause(queue.death_cause.get()),
-              immediately_mark_object_fail);
-          if (will_retry) {
-            increment_completed_tasks = false;
-          } else if (!immediately_mark_object_fail) {
-            // put it to wait_for_death_info_tasks and wait for Death info
-            int64_t death_info_timeout_ts =
+              /*mark_task_object_failed*/ is_actor_dead);
+          if (!is_actor_dead && !will_retry) {
+            // No retry means the actor is dead or dying.
+            // If the actor is not marked dead yet, wait for the grace period before we
+            // mark the return objects as failed.
+ int64_t death_info_grace_period_ms = current_time_ms() + RayConfig::instance().timeout_ms_task_wait_for_death_info(); - queue.wait_for_death_info_tasks.emplace_back(death_info_timeout_ts, + queue.wait_for_death_info_tasks.emplace_back(death_info_grace_period_ms, task_spec); RAY_LOG(INFO) << "PushActorTask failed because of network error, this task " @@ -429,7 +432,8 @@ void CoreWorkerDirectActorTaskSubmitter::PushActorTask(ClientQueue &queue, } } - if (increment_completed_tasks) { + if (!will_retry) { + // If we don't need to retry, mark the task as completed. absl::MutexLock lock(&mu_); auto queue_pair = client_queues_.find(actor_id); RAY_CHECK(queue_pair != client_queues_.end()); diff --git a/src/ray/core_worker/transport/direct_task_transport.cc b/src/ray/core_worker/transport/direct_task_transport.cc index 3132fd3e7..8bc371a3b 100644 --- a/src/ray/core_worker/transport/direct_task_transport.cc +++ b/src/ray/core_worker/transport/direct_task_transport.cc @@ -26,7 +26,7 @@ Status CoreWorkerDirectTaskSubmitter::SubmitTask(TaskSpecification task_spec) { resolver_.ResolveDependencies(task_spec, [this, task_spec](Status status) { if (!status.ok()) { RAY_LOG(WARNING) << "Resolving task dependencies failed " << status.ToString(); - RAY_UNUSED(task_finisher_->PendingTaskFailed( + RAY_UNUSED(task_finisher_->FailOrRetryPendingTask( task_spec.TaskId(), rpc::ErrorType::DEPENDENCY_RESOLUTION_FAILED, &status)); return; } @@ -52,7 +52,7 @@ Status CoreWorkerDirectTaskSubmitter::SubmitTask(TaskSpecification task_spec) { } else { RAY_LOG(ERROR) << "Failed to create actor " << actor_id << " with status: " << status.ToString(); - RAY_UNUSED(task_finisher_->PendingTaskFailed( + RAY_UNUSED(task_finisher_->FailOrRetryPendingTask( task_id, rpc::ErrorType::ACTOR_CREATION_FAILED, &status)); } })); @@ -106,7 +106,7 @@ Status CoreWorkerDirectTaskSubmitter::SubmitTask(TaskSpecification task_spec) { } } if (!keep_executing) { - RAY_UNUSED(task_finisher_->PendingTaskFailed( + RAY_UNUSED(task_finisher_->FailOrRetryPendingTask( task_spec.TaskId(), rpc::ErrorType::TASK_CANCELLED, nullptr)); } }); @@ -553,7 +553,7 @@ void CoreWorkerDirectTaskSubmitter::RequestNewWorkerIfNeeded( auto &task_queue = scheduling_key_entry.task_queue; while (!task_queue.empty()) { auto &task_spec = task_queue.front(); - RAY_UNUSED(task_finisher_->MarkPendingTaskFailed( + RAY_UNUSED(task_finisher_->MarkTaskReturnObjectsFailed( task_spec, rpc::ErrorType::RUNTIME_ENV_SETUP_FAILED, nullptr)); task_queue.pop_front(); } @@ -696,7 +696,7 @@ void CoreWorkerDirectTaskSubmitter::PushNormalTask( // failure (e.g., by contacting the raylet). If it was a process // failure, it may have been an application-level error and it may // not make sense to retry the task. - RAY_UNUSED(task_finisher_->PendingTaskFailed( + RAY_UNUSED(task_finisher_->FailOrRetryPendingTask( task_id, is_actor ? rpc::ErrorType::ACTOR_DIED : rpc::ErrorType::WORKER_DIED, &status)); @@ -738,7 +738,7 @@ Status CoreWorkerDirectTaskSubmitter::CancelTask(TaskSpecification task_spec, if (scheduled_tasks.empty()) { CancelWorkerLeaseIfNeeded(scheduling_key); } - RAY_UNUSED(task_finisher_->PendingTaskFailed( + RAY_UNUSED(task_finisher_->FailOrRetryPendingTask( task_spec.TaskId(), rpc::ErrorType::TASK_CANCELLED, nullptr)); return Status::OK(); }
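
Reviewer note, not part of the patch: to make the byte layout produced by `SerializePBToMsgPack` concrete, here is a minimal decoding sketch. It assumes msgpack-c and the layout described in the comments above (the first 9 bytes reserved for the msgpack-encoded MSE length, followed by the MSE, a msgpack bin wrapping the protobuf bytes). `ExtractSerializedRayException` and `kOffset` are illustrative names, not APIs introduced by this change.

// Decoding sketch only; mirrors the layout written by SerializePBToMsgPack above.
#include <msgpack.hpp>

#include <cstddef>
#include <cstdint>
#include <string>

// Hypothetical helper: given the bytes of the error object written above,
// return the protobuf-serialized RayException payload, which a frontend could
// then parse (e.g. with rpc::RayException::ParseFromString).
std::string ExtractSerializedRayException(const uint8_t *data, size_t size) {
  // Mirrors kMessagePackOffset: the first 9 bytes hold the msgpack-encoded
  // length of the msgpack-serialized exception (MSE).
  constexpr size_t kOffset = 9;
  // The MSE itself was memcpy'd to Data() + offset, so unpack from there.
  msgpack::object_handle handle = msgpack::unpack(
      reinterpret_cast<const char *>(data) + kOffset, size - kOffset);
  // The MSE is a msgpack bin; converting it to std::string yields the raw
  // protobuf bytes that were packed with pack_bin/pack_bin_body.
  return handle.get().as<std::string>();
}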
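
A second illustrative sketch, also not part of the patch: the grace-period bookkeeping that `wait_for_death_info_tasks` and `CheckTimeoutTasks()` implement, reduced to standard-library types so the effect of `mark_task_object_failed` is easier to follow. All names here (`PendingDeathInfoQueue`, `Park`, `CheckTimeouts`, `CurrentTimeMs`) are invented for illustration and do not exist in Ray.

// Standalone sketch of the grace-period mechanism (illustration only).
#include <chrono>
#include <cstdint>
#include <deque>
#include <functional>
#include <iostream>
#include <string>
#include <utility>

int64_t CurrentTimeMs() {
  using std::chrono::duration_cast;
  using std::chrono::milliseconds;
  using std::chrono::steady_clock;
  return duration_cast<milliseconds>(steady_clock::now().time_since_epoch()).count();
}

struct PendingDeathInfoQueue {
  // (deadline in ms, task label) pairs, oldest first -- the analogue of
  // wait_for_death_info_tasks in the patch.
  std::deque<std::pair<int64_t, std::string>> tasks;

  // Called when a task reply fails but the actor is not confirmed dead:
  // the return objects are not failed yet, only parked with a deadline.
  void Park(std::string task, int64_t grace_period_ms) {
    tasks.emplace_back(CurrentTimeMs() + grace_period_ms, std::move(task));
  }

  // The analogue of CheckTimeoutTasks(): once the deadline has passed,
  // fail the parked task's return objects.
  void CheckTimeouts(const std::function<void(const std::string &)> &mark_failed) {
    auto it = tasks.begin();
    while (it != tasks.end() && it->first < CurrentTimeMs()) {
      mark_failed(it->second);
      it = tasks.erase(it);
    }
  }
};

int main() {
  PendingDeathInfoQueue queue;
  // Negative grace period so the deadline is already expired for the demo.
  queue.Park("task_2", /*grace_period_ms=*/-1);
  queue.CheckTimeouts([](const std::string &task) {
    std::cout << "MarkTaskReturnObjectsFailed(" << task << ")\n";
  });
  return 0;
}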