[Part 1] Improve RayActorDeadError: Refactoring (#20458)

This is the first step toward improving `RayActorError`, which currently provides no useful information to the user.

In this first step, we rename ambiguous or confusing APIs and clean up the related code paths.

1. Rename APIs whose names expose too little information
- MarkPendingTaskFailed -> MarkTaskReturnObjectsFailed (the old name was too general compared to what the method does; it only marks the task's return objects as failed)
- PendingTaskFailed -> FailOrRetryPendingTask (the old name didn't reflect the behavior; the task may be retried instead of failed)

2. Rename arguments that expose too much implementation detail
- immediately_mark_object_fail -> mark_task_object_failed (there is no need to specify "immediately")

3. Move the msgpack serialization into a utility function instead of embedding it in the task manager method (a decoding sketch of the resulting buffer layout follows this list).
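For context, the new utility wraps the serialized protobuf in a msgpack `bin` and prepends its msgpack-encoded length in a fixed 9-byte header; language frontends read the length first and then unpack the payload. Below is a minimal, self-contained sketch of that framing (not Ray code: `EncodeFramed`, `DecodeFramed`, and `kHeaderSize` are illustrative names, and only msgpack-c is assumed). Ray's Python-side decoding lives in `serialization.py`.

```cpp
// Sketch of the [msgpack length header (9 bytes)][msgpack bin of protobuf bytes]
// framing. Not Ray code; names are illustrative.
#include <msgpack.hpp>

#include <cstddef>
#include <cstdint>
#include <cstring>
#include <iostream>
#include <stdexcept>
#include <string>
#include <vector>

namespace {

// Fixed room reserved for the msgpack-encoded length (a msgpack uint64 needs
// at most 1 type byte + 8 payload bytes = 9 bytes).
constexpr std::size_t kHeaderSize = 9;

// Mirrors what SerializePBToMsgPack in the diff does, but over raw bytes.
std::vector<char> EncodeFramed(const std::string &pb_bytes) {
  msgpack::sbuffer body;
  msgpack::packer<msgpack::sbuffer> packer(body);
  packer.pack_bin(pb_bytes.size());
  packer.pack_bin_body(pb_bytes.data(), pb_bytes.size());

  msgpack::sbuffer header;
  msgpack::pack(header, body.size());  // msgpack-encoded length of the body

  std::vector<char> out(kHeaderSize + body.size(), 0);
  std::memcpy(out.data(), header.data(), header.size());
  std::memcpy(out.data() + kHeaderSize, body.data(), body.size());
  return out;
}

// Reads the length from the fixed-size header, then unpacks the bin payload.
std::string DecodeFramed(const std::vector<char> &framed) {
  msgpack::object_handle header = msgpack::unpack(framed.data(), kHeaderSize);
  const auto body_size = header.get().as<std::uint64_t>();

  msgpack::object_handle body =
      msgpack::unpack(framed.data() + kHeaderSize, body_size);
  const msgpack::object obj = body.get();
  if (obj.type != msgpack::type::BIN) {
    throw std::runtime_error("expected a msgpack bin payload");
  }
  return std::string(obj.via.bin.ptr, obj.via.bin.size);
}

}  // namespace

int main() {
  // Stand-in for protobuf_message->SerializeToString(...) output.
  const std::string pb_bytes = "serialized-protobuf-bytes";
  const std::vector<char> framed = EncodeFramed(pb_bytes);
  std::cout << (DecodeFramed(framed) == pb_bytes ? "round trip ok" : "mismatch")
            << std::endl;
  return 0;
}
```

The fixed 9-byte header leaves room for the largest msgpack integer encoding, so a reader can always parse the payload length from a fixed-size prefix.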
SangBin Cho 2021-11-24 22:11:33 +09:00 committed by GitHub
parent a8d7897a56
commit e310f6c76f
11 changed files with 160 additions and 112 deletions


@ -23,17 +23,17 @@ class MockTaskFinisherInterface : public TaskFinisherInterface {
(const TaskID &task_id, const rpc::PushTaskReply &reply,
const rpc::Address &actor_addr),
(override));
MOCK_METHOD(bool, PendingTaskFailed,
MOCK_METHOD(bool, FailOrRetryPendingTask,
(const TaskID &task_id, rpc::ErrorType error_type, const Status *status,
const rpc::RayException *creation_task_exception,
bool immediately_mark_object_fail),
bool mark_task_object_failed),
(override));
MOCK_METHOD(void, OnTaskDependenciesInlined,
(const std::vector<ObjectID> &inlined_dependency_ids,
const std::vector<ObjectID> &contained_ids),
(override));
MOCK_METHOD(bool, MarkTaskCanceled, (const TaskID &task_id), (override));
MOCK_METHOD(void, MarkPendingTaskFailed,
MOCK_METHOD(void, MarkTaskReturnObjectsFailed,
(const TaskSpecification &spec, rpc::ErrorType error_type,
const rpc::RayException *creation_task_exception),
(override));


@ -16,7 +16,6 @@
#include <cstdint>
#include <cstdio>
#include <functional>
#include <memory>
@ -112,9 +111,9 @@ class LocalMemoryBuffer : public Buffer {
/// Pointer to the data.
uint8_t *data_;
/// Size of the buffer.
size_t size_;
size_t size_ = 0;
/// Whether this buffer holds a copy of data.
bool has_data_copy_;
bool has_data_copy_ = false;
/// This is only valid when `should_copy` is true.
uint8_t *buffer_ = NULL;
};


@ -20,6 +20,62 @@
#include "ray/common/constants.h"
#include "ray/util/util.h"
namespace {
///
/// Serialize the protobuf message to msgpack.
///
/// Ray uses Msgpack for cross-language object serialization.
/// This method creates a msgpack serialized buffer that contains
/// serialized protobuf message.
///
/// Language frontends can deserialize this object to obtain
/// data stored in a given protobuf. Check `serialization.py` to see
/// how this works.
///
/// NOTE: The function guarantees that the returned buffer contains data.
///
/// \param protobuf_message The protobuf message to serialize.
/// \return The buffer that contains serialized msgpack message.
template <class ProtobufMessage>
std::unique_ptr<ray::LocalMemoryBuffer> SerializePBToMsgPack(
const ProtobufMessage *protobuf_message) {
RAY_CHECK(protobuf_message != nullptr);
// Structure of bytes stored in object store:
// First serialize RayException by the following steps:
// PB's RayException
// --(PB Serialization)-->
// --(msgpack Serialization)-->
// msgpack_serialized_exception(MSE)
// Then add its length to the head (for cross-language deserialization):
// [MSE's length (9 bytes)] [MSE]
std::string pb_serialized_exception;
protobuf_message->SerializeToString(&pb_serialized_exception);
msgpack::sbuffer msgpack_serialized_exception;
msgpack::packer<msgpack::sbuffer> packer(msgpack_serialized_exception);
packer.pack_bin(pb_serialized_exception.size());
packer.pack_bin_body(pb_serialized_exception.data(), pb_serialized_exception.size());
std::unique_ptr<ray::LocalMemoryBuffer> final_buffer =
std::make_unique<ray::LocalMemoryBuffer>(msgpack_serialized_exception.size() +
kMessagePackOffset);
// copy msgpack-serialized bytes
std::memcpy(final_buffer->Data() + kMessagePackOffset,
msgpack_serialized_exception.data(), msgpack_serialized_exception.size());
// copy offset
msgpack::sbuffer msgpack_int;
msgpack::pack(msgpack_int, msgpack_serialized_exception.size());
std::memcpy(final_buffer->Data(), msgpack_int.data(), msgpack_int.size());
RAY_CHECK(final_buffer->Data() != nullptr);
RAY_CHECK(final_buffer->Size() != 0);
return final_buffer;
}
} // namespace
namespace ray {
namespace core {
@ -380,10 +436,10 @@ bool TaskManager::RetryTaskIfPossible(const TaskID &task_id) {
}
}
bool TaskManager::PendingTaskFailed(const TaskID &task_id, rpc::ErrorType error_type,
const Status *status,
const rpc::RayException *creation_task_exception,
bool immediately_mark_object_fail) {
bool TaskManager::FailOrRetryPendingTask(const TaskID &task_id, rpc::ErrorType error_type,
const Status *status,
const rpc::RayException *creation_task_exception,
bool mark_task_object_failed) {
// Note that this might be the __ray_terminate__ task, so we don't log
// loudly with ERROR here.
RAY_LOG(DEBUG) << "Task " << task_id << " failed with error "
@ -430,8 +486,8 @@ bool TaskManager::PendingTaskFailed(const TaskID &task_id, rpc::ErrorType error_
// objects.
RemoveFinishedTaskReferences(spec, release_lineage, rpc::Address(),
ReferenceCounter::ReferenceTableProto());
if (immediately_mark_object_fail) {
MarkPendingTaskFailed(spec, error_type, creation_task_exception);
if (mark_task_object_failed) {
MarkTaskReturnObjectsFailed(spec, error_type, creation_task_exception);
}
}
@ -555,7 +611,7 @@ bool TaskManager::MarkTaskCanceled(const TaskID &task_id) {
return it != submissible_tasks_.end();
}
void TaskManager::MarkPendingTaskFailed(
void TaskManager::MarkTaskReturnObjectsFailed(
const TaskSpecification &spec, rpc::ErrorType error_type,
const rpc::RayException *creation_task_exception) {
const TaskID task_id = spec.TaskId();
@ -565,36 +621,10 @@ void TaskManager::MarkPendingTaskFailed(
for (int i = 0; i < num_returns; i++) {
const auto object_id = ObjectID::FromIndex(task_id, /*index=*/i + 1);
if (creation_task_exception != nullptr) {
// Structure of bytes stored in object store:
// First serialize RayException by the following steps:
// PB's RayException
// --(PB Serialization)-->
// --(msgpack Serialization)-->
// msgpack_serialized_exception(MSE)
// Then add it's length to the head(for coross-language deserialization):
// [MSE's length(9 bytes)] [MSE]
std::string pb_serialized_exception;
creation_task_exception->SerializeToString(&pb_serialized_exception);
msgpack::sbuffer msgpack_serialized_exception;
msgpack::packer<msgpack::sbuffer> packer(msgpack_serialized_exception);
packer.pack_bin(pb_serialized_exception.size());
packer.pack_bin_body(pb_serialized_exception.data(),
pb_serialized_exception.size());
LocalMemoryBuffer final_buffer(msgpack_serialized_exception.size() +
kMessagePackOffset);
// copy msgpack-serialized bytes
std::memcpy(final_buffer.Data() + kMessagePackOffset,
msgpack_serialized_exception.data(),
msgpack_serialized_exception.size());
// copy offset
msgpack::sbuffer msgpack_int;
msgpack::pack(msgpack_int, msgpack_serialized_exception.size());
std::memcpy(final_buffer.Data(), msgpack_int.data(), msgpack_int.size());
const auto final_buffer =
SerializePBToMsgPack<rpc::RayException>(creation_task_exception);
RAY_UNUSED(in_memory_store_->Put(
RayObject(error_type, final_buffer.Data(), final_buffer.Size()), object_id));
RayObject(error_type, final_buffer->Data(), final_buffer->Size()), object_id));
} else {
RAY_UNUSED(in_memory_store_->Put(RayObject(error_type), object_id));
}


@ -33,10 +33,10 @@ class TaskFinisherInterface {
virtual bool RetryTaskIfPossible(const TaskID &task_id) = 0;
virtual bool PendingTaskFailed(
virtual bool FailOrRetryPendingTask(
const TaskID &task_id, rpc::ErrorType error_type, const Status *status,
const rpc::RayException *creation_task_exception = nullptr,
bool immediately_mark_object_fail = true) = 0;
bool mark_task_object_failed = true) = 0;
virtual void OnTaskDependenciesInlined(
const std::vector<ObjectID> &inlined_dependency_ids,
@ -44,7 +44,7 @@ class TaskFinisherInterface {
virtual bool MarkTaskCanceled(const TaskID &task_id) = 0;
virtual void MarkPendingTaskFailed(
virtual void MarkTaskReturnObjectsFailed(
const TaskSpecification &spec, rpc::ErrorType error_type,
const rpc::RayException *creation_task_exception = nullptr) = 0;
@ -142,19 +142,24 @@ class TaskManager : public TaskFinisherInterface, public TaskResubmissionInterfa
/// \param[in] creation_task_exception If this arg is set, it means this task failed
/// because the callee actor is dead caused by an exception thrown in creation task,
/// only applies when error_type=ACTOR_DIED.
/// \param[in] immediately_mark_object_fail whether immediately mark the task
/// result object as failed.
/// \param[in] mark_task_object_failed whether or not to mark the task's
/// return objects as failed.
/// \return Whether the task will be retried or not.
bool PendingTaskFailed(const TaskID &task_id, rpc::ErrorType error_type,
const Status *status = nullptr,
const rpc::RayException *creation_task_exception = nullptr,
bool immediately_mark_object_fail = true) override;
bool FailOrRetryPendingTask(const TaskID &task_id, rpc::ErrorType error_type,
const Status *status = nullptr,
const rpc::RayException *creation_task_exception = nullptr,
bool mark_task_object_failed = true) override;
/// Treat a pending task as failed. The lock should not be held when calling
/// this method because it may trigger callbacks in this or other classes.
void MarkPendingTaskFailed(const TaskSpecification &spec, rpc::ErrorType error_type,
const rpc::RayException *creation_task_exception =
nullptr) override LOCKS_EXCLUDED(mu_);
/// Treat a pending task's returned Ray object as failed. The lock should not be held
/// when calling this method because it may trigger callbacks in this or other classes.
///
/// \param[in] spec The TaskSpec that contains the return objects.
/// \param[in] error_type The error type the returned Ray object will store.
/// \param[in] creation_task_exception The serialized actor init exception.
void MarkTaskReturnObjectsFailed(
const TaskSpecification &spec, rpc::ErrorType error_type,
const rpc::RayException *creation_task_exception = nullptr) override
LOCKS_EXCLUDED(mu_);
/// A task's dependencies were inlined in the task spec. This will decrement
/// the ref count for the dependency IDs. If the dependencies contained other


@ -83,9 +83,9 @@ TEST_F(DirectTaskTransportTest, ActorRegisterFailure) {
ASSERT_TRUE(actor_creator->IsActorInRegistering(actor_id));
actor_task_submitter->AddActorQueueIfNotExists(actor_id);
ASSERT_TRUE(actor_task_submitter->SubmitTask(task_spec).ok());
EXPECT_CALL(*task_finisher,
PendingTaskFailed(task_spec.TaskId(),
rpc::ErrorType::DEPENDENCY_RESOLUTION_FAILED, _, _, _));
EXPECT_CALL(*task_finisher, FailOrRetryPendingTask(
task_spec.TaskId(),
rpc::ErrorType::DEPENDENCY_RESOLUTION_FAILED, _, _, _));
register_cb(Status::IOError(""));
}
@ -107,7 +107,7 @@ TEST_F(DirectTaskTransportTest, ActorRegisterOk) {
ASSERT_TRUE(actor_creator->IsActorInRegistering(actor_id));
actor_task_submitter->AddActorQueueIfNotExists(actor_id);
ASSERT_TRUE(actor_task_submitter->SubmitTask(task_spec).ok());
EXPECT_CALL(*task_finisher, PendingTaskFailed(_, _, _, _, _)).Times(0);
EXPECT_CALL(*task_finisher, FailOrRetryPendingTask(_, _, _, _, _)).Times(0);
register_cb(Status::OK());
}


@ -140,7 +140,7 @@ TEST_P(DirectActorSubmitterTest, TestSubmitTask) {
EXPECT_CALL(*task_finisher_, CompletePendingTask(_, _, _))
.Times(worker_client_->callbacks.size());
EXPECT_CALL(*task_finisher_, PendingTaskFailed(_, _, _, _, _)).Times(0);
EXPECT_CALL(*task_finisher_, FailOrRetryPendingTask(_, _, _, _, _)).Times(0);
while (!worker_client_->callbacks.empty()) {
ASSERT_TRUE(worker_client_->ReplyPushTask());
}
@ -289,16 +289,18 @@ TEST_P(DirectActorSubmitterTest, TestActorDead) {
ASSERT_EQ(worker_client_->callbacks.size(), 1);
// Simulate the actor dying. All in-flight tasks should get failed.
EXPECT_CALL(*task_finisher_, PendingTaskFailed(task1.TaskId(), _, _, _, _)).Times(1);
EXPECT_CALL(*task_finisher_, FailOrRetryPendingTask(task1.TaskId(), _, _, _, _))
.Times(1);
EXPECT_CALL(*task_finisher_, CompletePendingTask(_, _, _)).Times(0);
while (!worker_client_->callbacks.empty()) {
ASSERT_TRUE(worker_client_->ReplyPushTask(Status::IOError("")));
}
EXPECT_CALL(*task_finisher_, PendingTaskFailed(_, _, _, _, _)).Times(0);
EXPECT_CALL(*task_finisher_, FailOrRetryPendingTask(_, _, _, _, _)).Times(0);
submitter_.DisconnectActor(actor_id, 1, /*dead=*/false);
// Actor marked as dead. All queued tasks should get failed.
EXPECT_CALL(*task_finisher_, PendingTaskFailed(task2.TaskId(), _, _, _, _)).Times(1);
EXPECT_CALL(*task_finisher_, FailOrRetryPendingTask(task2.TaskId(), _, _, _, _))
.Times(1);
submitter_.DisconnectActor(actor_id, 2, /*dead=*/true);
}
@ -324,8 +326,10 @@ TEST_P(DirectActorSubmitterTest, TestActorRestartNoRetry) {
ASSERT_TRUE(submitter_.SubmitTask(task3).ok());
EXPECT_CALL(*task_finisher_, CompletePendingTask(task1.TaskId(), _, _)).Times(1);
EXPECT_CALL(*task_finisher_, PendingTaskFailed(task2.TaskId(), _, _, _, _)).Times(1);
EXPECT_CALL(*task_finisher_, PendingTaskFailed(task3.TaskId(), _, _, _, _)).Times(1);
EXPECT_CALL(*task_finisher_, FailOrRetryPendingTask(task2.TaskId(), _, _, _, _))
.Times(1);
EXPECT_CALL(*task_finisher_, FailOrRetryPendingTask(task3.TaskId(), _, _, _, _))
.Times(1);
EXPECT_CALL(*task_finisher_, CompletePendingTask(task4.TaskId(), _, _)).Times(1);
// First task finishes. Second task fails.
ASSERT_TRUE(worker_client_->ReplyPushTask(Status::OK()));
@ -376,10 +380,10 @@ TEST_P(DirectActorSubmitterTest, TestActorRestartRetry) {
// All tasks will eventually finish.
EXPECT_CALL(*task_finisher_, CompletePendingTask(_, _, _)).Times(4);
// Tasks 2 and 3 will be retried.
EXPECT_CALL(*task_finisher_, PendingTaskFailed(task2.TaskId(), _, _, _, _))
EXPECT_CALL(*task_finisher_, FailOrRetryPendingTask(task2.TaskId(), _, _, _, _))
.Times(1)
.WillRepeatedly(Return(true));
EXPECT_CALL(*task_finisher_, PendingTaskFailed(task3.TaskId(), _, _, _, _))
EXPECT_CALL(*task_finisher_, FailOrRetryPendingTask(task3.TaskId(), _, _, _, _))
.Times(1)
.WillRepeatedly(Return(true));
// First task finishes. Second task fails.
@ -435,7 +439,7 @@ TEST_P(DirectActorSubmitterTest, TestActorRestartOutOfOrderRetry) {
EXPECT_CALL(*task_finisher_, CompletePendingTask(_, _, _)).Times(3);
// Tasks 2 will be retried
EXPECT_CALL(*task_finisher_, PendingTaskFailed(task2.TaskId(), _, _, _, _))
EXPECT_CALL(*task_finisher_, FailOrRetryPendingTask(task2.TaskId(), _, _, _, _))
.Times(1)
.WillRepeatedly(Return(true));
// First task finishes. Second task hang. Third task finishes.
@ -526,7 +530,8 @@ TEST_P(DirectActorSubmitterTest, TestActorRestartOutOfOrderGcs) {
ASSERT_EQ(num_clients_connected_, 2);
// The actor dies permanently. All tasks are failed.
EXPECT_CALL(*task_finisher_, PendingTaskFailed(task.TaskId(), _, _, _, _)).Times(1);
EXPECT_CALL(*task_finisher_, FailOrRetryPendingTask(task.TaskId(), _, _, _, _))
.Times(1);
submitter_.DisconnectActor(actor_id, 3, /*dead=*/true);
ASSERT_EQ(num_clients_connected_, 2);
@ -537,7 +542,8 @@ TEST_P(DirectActorSubmitterTest, TestActorRestartOutOfOrderGcs) {
ASSERT_EQ(num_clients_connected_, 2);
// Submit a task.
task = CreateActorTaskHelper(actor_id, worker_id, 4);
EXPECT_CALL(*task_finisher_, PendingTaskFailed(task.TaskId(), _, _, _, _)).Times(1);
EXPECT_CALL(*task_finisher_, FailOrRetryPendingTask(task.TaskId(), _, _, _, _))
.Times(1);
ASSERT_TRUE(submitter_.SubmitTask(task).ok());
}
@ -567,16 +573,20 @@ TEST_P(DirectActorSubmitterTest, TestActorRestartFailInflightTasks) {
ASSERT_TRUE(submitter_.SubmitTask(task3).ok());
// Actor failed, but the task replies are delayed (or in some scenarios, lost).
// We should still be able to fail the inflight tasks.
EXPECT_CALL(*task_finisher_, PendingTaskFailed(task2.TaskId(), _, _, _, _)).Times(1);
EXPECT_CALL(*task_finisher_, PendingTaskFailed(task3.TaskId(), _, _, _, _)).Times(1);
EXPECT_CALL(*task_finisher_, FailOrRetryPendingTask(task2.TaskId(), _, _, _, _))
.Times(1);
EXPECT_CALL(*task_finisher_, FailOrRetryPendingTask(task3.TaskId(), _, _, _, _))
.Times(1);
submitter_.DisconnectActor(actor_id, 1, /*dead=*/false);
// The task replies are now received. Since the tasks are already failed, they will not
// be marked as failed or finished again.
EXPECT_CALL(*task_finisher_, CompletePendingTask(task2.TaskId(), _, _)).Times(0);
EXPECT_CALL(*task_finisher_, PendingTaskFailed(task2.TaskId(), _, _, _, _)).Times(0);
EXPECT_CALL(*task_finisher_, FailOrRetryPendingTask(task2.TaskId(), _, _, _, _))
.Times(0);
EXPECT_CALL(*task_finisher_, CompletePendingTask(task3.TaskId(), _, _)).Times(0);
EXPECT_CALL(*task_finisher_, PendingTaskFailed(task3.TaskId(), _, _, _, _)).Times(0);
EXPECT_CALL(*task_finisher_, FailOrRetryPendingTask(task3.TaskId(), _, _, _, _))
.Times(0);
// Task 2 replied with OK.
ASSERT_TRUE(worker_client_->ReplyPushTask(Status::OK()));
// Task 3 replied with error.


@ -79,8 +79,8 @@ TEST_F(DirectTaskTransportTest, ActorCreationFail) {
auto task_spec = GetCreatingTaskSpec(actor_id);
EXPECT_CALL(*task_finisher, CompletePendingTask(_, _, _)).Times(0);
EXPECT_CALL(*task_finisher,
PendingTaskFailed(task_spec.TaskId(), rpc::ErrorType::ACTOR_CREATION_FAILED,
_, _, true));
FailOrRetryPendingTask(task_spec.TaskId(),
rpc::ErrorType::ACTOR_CREATION_FAILED, _, _, true));
rpc::ClientCallback<rpc::CreateActorReply> create_cb;
EXPECT_CALL(*actor_creator, AsyncCreateActor(task_spec, _))
.WillOnce(DoAll(SaveArg<1>(&create_cb), Return(Status::OK())));


@ -120,10 +120,10 @@ class MockTaskFinisher : public TaskFinisherInterface {
return false;
}
bool PendingTaskFailed(const TaskID &task_id, rpc::ErrorType error_type,
const Status *status,
const rpc::RayException *creation_task_exception = nullptr,
bool immediately_mark_object_fail = true) override {
bool FailOrRetryPendingTask(const TaskID &task_id, rpc::ErrorType error_type,
const Status *status,
const rpc::RayException *creation_task_exception = nullptr,
bool mark_task_object_failed = true) override {
num_tasks_failed++;
return true;
}
@ -134,7 +134,7 @@ class MockTaskFinisher : public TaskFinisherInterface {
num_contained_ids += contained_ids.size();
}
void MarkPendingTaskFailed(
void MarkTaskReturnObjectsFailed(
const TaskSpecification &spec, rpc::ErrorType error_type,
const rpc::RayException *creation_task_exception = nullptr) override {}


@ -145,7 +145,7 @@ TEST_F(TaskManagerTest, TestTaskFailure) {
ASSERT_TRUE(reference_counter_->IsObjectPendingCreation(return_id));
auto error = rpc::ErrorType::WORKER_DIED;
manager_.PendingTaskFailed(spec.TaskId(), error);
manager_.FailOrRetryPendingTask(spec.TaskId(), error);
ASSERT_FALSE(manager_.IsTaskPending(spec.TaskId()));
// Only the return object reference should remain.
ASSERT_EQ(reference_counter_->NumObjectIDsInScope(), 1);
@ -210,7 +210,7 @@ TEST_F(TaskManagerTest, TestTaskReconstruction) {
auto error = rpc::ErrorType::WORKER_DIED;
for (int i = 0; i < num_retries; i++) {
RAY_LOG(INFO) << "Retry " << i;
manager_.PendingTaskFailed(spec.TaskId(), error);
manager_.FailOrRetryPendingTask(spec.TaskId(), error);
ASSERT_TRUE(manager_.IsTaskPending(spec.TaskId()));
ASSERT_TRUE(reference_counter_->IsObjectPendingCreation(return_id));
ASSERT_EQ(reference_counter_->NumObjectIDsInScope(), 3);
@ -219,7 +219,7 @@ TEST_F(TaskManagerTest, TestTaskReconstruction) {
ASSERT_EQ(num_retries_, i + 1);
}
manager_.PendingTaskFailed(spec.TaskId(), error);
manager_.FailOrRetryPendingTask(spec.TaskId(), error);
ASSERT_FALSE(manager_.IsTaskPending(spec.TaskId()));
// Only the return object reference should remain.
ASSERT_EQ(reference_counter_->NumObjectIDsInScope(), 1);
@ -253,7 +253,7 @@ TEST_F(TaskManagerTest, TestTaskKill) {
manager_.MarkTaskCanceled(spec.TaskId());
auto error = rpc::ErrorType::TASK_CANCELLED;
manager_.PendingTaskFailed(spec.TaskId(), error);
manager_.FailOrRetryPendingTask(spec.TaskId(), error);
ASSERT_FALSE(manager_.IsTaskPending(spec.TaskId()));
std::vector<std::shared_ptr<RayObject>> results;
RAY_CHECK_OK(store_->Get({return_id}, 1, 0, ctx, false, &results));


@ -101,7 +101,7 @@ Status CoreWorkerDirectActorTaskSubmitter::SubmitTask(TaskSpecification task_spe
} else {
auto task_id = actor_submit_queue->Get(send_pos).first.TaskId();
actor_submit_queue->MarkDependencyFailed(send_pos);
task_finisher_.PendingTaskFailed(
task_finisher_.FailOrRetryPendingTask(
task_id, rpc::ErrorType::DEPENDENCY_RESOLUTION_FAILED, &status);
}
}
@ -121,8 +121,8 @@ Status CoreWorkerDirectActorTaskSubmitter::SubmitTask(TaskSpecification task_spe
auto status = Status::IOError("cancelling task of dead actor");
// No need to increment the number of completed tasks since the actor is
// dead.
RAY_UNUSED(!task_finisher_.PendingTaskFailed(task_id, error_type, &status,
creation_task_exception));
RAY_UNUSED(!task_finisher_.FailOrRetryPendingTask(task_id, error_type, &status,
creation_task_exception));
}
// If the task submission subsequently fails, then the client will receive
@ -141,7 +141,7 @@ void CoreWorkerDirectActorTaskSubmitter::FailInflightTasks(
const std::unordered_map<TaskID, rpc::ClientCallback<rpc::PushTaskReply>>
&inflight_task_callbacks) {
// NOTE(kfstorm): We invoke the callbacks with a bad status to act like there's a
// network issue. We don't call `task_finisher_.PendingTaskFailed` directly because
// network issue. We don't call `task_finisher_.FailOrRetryPendingTask` directly because
// there's much more work to do in the callback.
auto status = Status::IOError("Fail all inflight tasks due to actor state change.");
rpc::PushTaskReply reply;
@ -267,8 +267,8 @@ void CoreWorkerDirectActorTaskSubmitter::DisconnectActor(
task_finisher_.MarkTaskCanceled(task_id);
// No need to increment the number of completed tasks since the actor is
// dead.
RAY_UNUSED(!task_finisher_.PendingTaskFailed(task_id, error_type, &status,
creation_task_exception));
RAY_UNUSED(!task_finisher_.FailOrRetryPendingTask(task_id, error_type, &status,
creation_task_exception));
}
auto &wait_for_death_info_tasks = queue->second.wait_for_death_info_tasks;
@ -276,8 +276,8 @@ void CoreWorkerDirectActorTaskSubmitter::DisconnectActor(
RAY_LOG(INFO) << "Failing tasks waiting for death info, size="
<< wait_for_death_info_tasks.size() << ", actor_id=" << actor_id;
for (auto &net_err_task : wait_for_death_info_tasks) {
RAY_UNUSED(task_finisher_.MarkPendingTaskFailed(net_err_task.second, error_type,
creation_task_exception));
RAY_UNUSED(task_finisher_.MarkTaskReturnObjectsFailed(
net_err_task.second, error_type, creation_task_exception));
}
// No need to clean up tasks that have been sent and are waiting for
@ -304,7 +304,7 @@ void CoreWorkerDirectActorTaskSubmitter::CheckTimeoutTasks() {
while (deque_itr != queue.wait_for_death_info_tasks.end() &&
/*timeout timestamp*/ deque_itr->first < current_time_ms()) {
auto task_spec = deque_itr->second;
task_finisher_.MarkPendingTaskFailed(task_spec, rpc::ErrorType::ACTOR_DIED);
task_finisher_.MarkTaskReturnObjectsFailed(task_spec, rpc::ErrorType::ACTOR_DIED);
deque_itr = queue.wait_for_death_info_tasks.erase(deque_itr);
}
}
@ -390,7 +390,8 @@ void CoreWorkerDirectActorTaskSubmitter::PushActorTask(ClientQueue &queue,
rpc::ClientCallback<rpc::PushTaskReply> reply_callback =
[this, addr, task_id, actor_id, actor_counter, task_spec, task_skipped](
const Status &status, const rpc::PushTaskReply &reply) {
bool increment_completed_tasks = true;
/// Whether or not we will retry this actor task.
auto will_retry = false;
if (task_skipped) {
// NOTE(simon):Increment the task counter regardless of the status because the
@ -407,19 +408,21 @@ void CoreWorkerDirectActorTaskSubmitter::PushActorTask(ClientQueue &queue,
RAY_CHECK(queue_pair != client_queues_.end());
auto &queue = queue_pair->second;
bool immediately_mark_object_fail = (queue.state == rpc::ActorTableData::DEAD);
bool will_retry = task_finisher_.PendingTaskFailed(
bool is_actor_dead = (queue.state == rpc::ActorTableData::DEAD);
// If the actor is already dead, immediately mark the task's return object as failed.
// Otherwise, give it a grace period before marking the object as failed.
will_retry = task_finisher_.FailOrRetryPendingTask(
task_id, GenErrorTypeFromDeathCause(queue.death_cause.get()), &status,
GetCreationTaskExceptionFromDeathCause(queue.death_cause.get()),
immediately_mark_object_fail);
if (will_retry) {
increment_completed_tasks = false;
} else if (!immediately_mark_object_fail) {
// put it to wait_for_death_info_tasks and wait for Death info
int64_t death_info_timeout_ts =
/*mark_task_object_failed*/ is_actor_dead);
if (!is_actor_dead && !will_retry) {
// No retry == actor is dead.
// If actor is not dead yet, wait for the grace period until we mark the
// return object as failed.
int64_t death_info_grace_period_ms =
current_time_ms() +
RayConfig::instance().timeout_ms_task_wait_for_death_info();
queue.wait_for_death_info_tasks.emplace_back(death_info_timeout_ts,
queue.wait_for_death_info_tasks.emplace_back(death_info_grace_period_ms,
task_spec);
RAY_LOG(INFO)
<< "PushActorTask failed because of network error, this task "
@ -429,7 +432,8 @@ void CoreWorkerDirectActorTaskSubmitter::PushActorTask(ClientQueue &queue,
}
}
if (increment_completed_tasks) {
if (!will_retry) {
// If we don't need to retry, mark the task as completed.
absl::MutexLock lock(&mu_);
auto queue_pair = client_queues_.find(actor_id);
RAY_CHECK(queue_pair != client_queues_.end());


@ -26,7 +26,7 @@ Status CoreWorkerDirectTaskSubmitter::SubmitTask(TaskSpecification task_spec) {
resolver_.ResolveDependencies(task_spec, [this, task_spec](Status status) {
if (!status.ok()) {
RAY_LOG(WARNING) << "Resolving task dependencies failed " << status.ToString();
RAY_UNUSED(task_finisher_->PendingTaskFailed(
RAY_UNUSED(task_finisher_->FailOrRetryPendingTask(
task_spec.TaskId(), rpc::ErrorType::DEPENDENCY_RESOLUTION_FAILED, &status));
return;
}
@ -52,7 +52,7 @@ Status CoreWorkerDirectTaskSubmitter::SubmitTask(TaskSpecification task_spec) {
} else {
RAY_LOG(ERROR) << "Failed to create actor " << actor_id
<< " with status: " << status.ToString();
RAY_UNUSED(task_finisher_->PendingTaskFailed(
RAY_UNUSED(task_finisher_->FailOrRetryPendingTask(
task_id, rpc::ErrorType::ACTOR_CREATION_FAILED, &status));
}
}));
@ -106,7 +106,7 @@ Status CoreWorkerDirectTaskSubmitter::SubmitTask(TaskSpecification task_spec) {
}
}
if (!keep_executing) {
RAY_UNUSED(task_finisher_->PendingTaskFailed(
RAY_UNUSED(task_finisher_->FailOrRetryPendingTask(
task_spec.TaskId(), rpc::ErrorType::TASK_CANCELLED, nullptr));
}
});
@ -553,7 +553,7 @@ void CoreWorkerDirectTaskSubmitter::RequestNewWorkerIfNeeded(
auto &task_queue = scheduling_key_entry.task_queue;
while (!task_queue.empty()) {
auto &task_spec = task_queue.front();
RAY_UNUSED(task_finisher_->MarkPendingTaskFailed(
RAY_UNUSED(task_finisher_->MarkTaskReturnObjectsFailed(
task_spec, rpc::ErrorType::RUNTIME_ENV_SETUP_FAILED, nullptr));
task_queue.pop_front();
}
@ -696,7 +696,7 @@ void CoreWorkerDirectTaskSubmitter::PushNormalTask(
// failure (e.g., by contacting the raylet). If it was a process
// failure, it may have been an application-level error and it may
// not make sense to retry the task.
RAY_UNUSED(task_finisher_->PendingTaskFailed(
RAY_UNUSED(task_finisher_->FailOrRetryPendingTask(
task_id,
is_actor ? rpc::ErrorType::ACTOR_DIED : rpc::ErrorType::WORKER_DIED,
&status));
@ -738,7 +738,7 @@ Status CoreWorkerDirectTaskSubmitter::CancelTask(TaskSpecification task_spec,
if (scheduled_tasks.empty()) {
CancelWorkerLeaseIfNeeded(scheduling_key);
}
RAY_UNUSED(task_finisher_->PendingTaskFailed(
RAY_UNUSED(task_finisher_->FailOrRetryPendingTask(
task_spec.TaskId(), rpc::ErrorType::TASK_CANCELLED, nullptr));
return Status::OK();
}