Don't pass in TaskID to TaskManager::MarkPendingTaskFailed since it can (#18532)

be got from TaskSpecification
This commit is contained in:
Jiajun Yao 2021-09-13 11:27:42 -07:00 committed by GitHub
parent a0336578a9
commit f8ae2b2b62
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 13 additions and 17 deletions

View file

@ -34,8 +34,7 @@ class MockTaskFinisherInterface : public TaskFinisherInterface {
(override));
MOCK_METHOD(bool, MarkTaskCanceled, (const TaskID &task_id), (override));
MOCK_METHOD(void, MarkPendingTaskFailed,
(const TaskID &task_id, const TaskSpecification &spec,
rpc::ErrorType error_type,
(const TaskSpecification &spec, rpc::ErrorType error_type,
const std::shared_ptr<rpc::RayException> &creation_task_exception),
(override));
MOCK_METHOD(absl::optional<TaskSpecification>, GetTaskSpec, (const TaskID &task_id),

View file

@ -383,7 +383,7 @@ bool TaskManager::PendingTaskFailed(
RemoveFinishedTaskReferences(spec, release_lineage, rpc::Address(),
ReferenceCounter::ReferenceTableProto());
if (immediately_mark_object_fail) {
MarkPendingTaskFailed(task_id, spec, error_type, creation_task_exception);
MarkPendingTaskFailed(spec, error_type, creation_task_exception);
}
}
@ -492,8 +492,9 @@ bool TaskManager::MarkTaskCanceled(const TaskID &task_id) {
}
void TaskManager::MarkPendingTaskFailed(
const TaskID &task_id, const TaskSpecification &spec, rpc::ErrorType error_type,
const TaskSpecification &spec, rpc::ErrorType error_type,
const std::shared_ptr<rpc::RayException> &creation_task_exception) {
const TaskID task_id = spec.TaskId();
RAY_LOG(DEBUG) << "Treat task as failed. task_id: " << task_id
<< ", error_type: " << ErrorType_Name(error_type);
int64_t num_returns = spec.NumReturns();

View file

@ -45,7 +45,7 @@ class TaskFinisherInterface {
virtual bool MarkTaskCanceled(const TaskID &task_id) = 0;
virtual void MarkPendingTaskFailed(
const TaskID &task_id, const TaskSpecification &spec, rpc::ErrorType error_type,
const TaskSpecification &spec, rpc::ErrorType error_type,
const std::shared_ptr<rpc::RayException> &creation_task_exception = nullptr) = 0;
virtual absl::optional<TaskSpecification> GetTaskSpec(const TaskID &task_id) const = 0;
@ -148,10 +148,10 @@ class TaskManager : public TaskFinisherInterface, public TaskResubmissionInterfa
/// Treat a pending task as failed. The lock should not be held when calling
/// this method because it may trigger callbacks in this or other classes.
void MarkPendingTaskFailed(
const TaskID &task_id, const TaskSpecification &spec, rpc::ErrorType error_type,
const std::shared_ptr<rpc::RayException> &creation_task_exception =
nullptr) override LOCKS_EXCLUDED(mu_);
void MarkPendingTaskFailed(const TaskSpecification &spec, rpc::ErrorType error_type,
const std::shared_ptr<rpc::RayException>
&creation_task_exception = nullptr) override
LOCKS_EXCLUDED(mu_);
/// A task's dependencies were inlined in the task spec. This will decrement
/// the ref count for the dependency IDs. If the dependencies contained other

View file

@ -123,8 +123,7 @@ class MockTaskFinisher : public TaskFinisherInterface {
num_contained_ids += contained_ids.size();
}
void MarkPendingTaskFailed(const TaskID &task_id, const TaskSpecification &spec,
rpc::ErrorType error_type,
void MarkPendingTaskFailed(const TaskSpecification &spec, rpc::ErrorType error_type,
const std::shared_ptr<rpc::RayException>
&creation_task_exception = nullptr) override {}

View file

@ -229,8 +229,7 @@ void CoreWorkerDirectActorTaskSubmitter::DisconnectActor(
<< wait_for_death_info_tasks.size() << ", actor_id=" << actor_id;
for (auto &net_err_task : wait_for_death_info_tasks) {
RAY_UNUSED(task_finisher_->MarkPendingTaskFailed(
net_err_task.second.TaskId(), net_err_task.second, rpc::ErrorType::ACTOR_DIED,
creation_task_exception));
net_err_task.second, rpc::ErrorType::ACTOR_DIED, creation_task_exception));
}
// No need to clean up tasks that have been sent and are waiting for
@ -253,8 +252,7 @@ void CoreWorkerDirectActorTaskSubmitter::CheckTimeoutTasks() {
while (deque_itr != queue.wait_for_death_info_tasks.end() &&
/*timeout timestamp*/ deque_itr->first < current_time_ms()) {
auto task_spec = deque_itr->second;
task_finisher_->MarkPendingTaskFailed(task_spec.TaskId(), task_spec,
rpc::ErrorType::ACTOR_DIED);
task_finisher_->MarkPendingTaskFailed(task_spec, rpc::ErrorType::ACTOR_DIED);
deque_itr = queue.wait_for_death_info_tasks.erase(deque_itr);
}
}

View file

@ -517,8 +517,7 @@ void CoreWorkerDirectTaskSubmitter::RequestNewWorkerIfNeeded(
while (!task_queue.empty()) {
auto &task_spec = task_queue.front();
RAY_UNUSED(task_finisher_->MarkPendingTaskFailed(
task_spec.TaskId(), task_spec, rpc::ErrorType::RUNTIME_ENV_SETUP_FAILED,
nullptr));
task_spec, rpc::ErrorType::RUNTIME_ENV_SETUP_FAILED, nullptr));
task_queue.pop_front();
}
if (scheduling_key_entry.CanDelete()) {