Fix possible deadlock in CoreWorkerDirectActorTaskSubmitter (#8973)

This commit is contained in:
Stephanie Wang 2020-06-16 15:30:15 -07:00 committed by GitHub
parent fa0a677aac
commit fa16c7666a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -68,14 +68,7 @@ Status CoreWorkerDirectActorTaskSubmitter::SubmitTask(TaskSpecification task_spe
absl::MutexLock lock(&mu_); absl::MutexLock lock(&mu_);
auto queue = client_queues_.find(task_spec.ActorId()); auto queue = client_queues_.find(task_spec.ActorId());
RAY_CHECK(queue != client_queues_.end()); RAY_CHECK(queue != client_queues_.end());
if (queue->second.state == rpc::ActorTableData::DEAD) { if (queue->second.state != rpc::ActorTableData::DEAD) {
task_finisher_->MarkTaskCanceled(task_spec.TaskId());
auto status = Status::IOError("cancelling all pending tasks of dead actor");
// No need to increment the number of completed tasks since the actor is
// dead.
RAY_UNUSED(!task_finisher_->PendingTaskFailed(task_spec.TaskId(),
rpc::ErrorType::ACTOR_DIED, &status));
} else {
// We must fix the send order prior to resolving dependencies, which may // We must fix the send order prior to resolving dependencies, which may
// complete out of order. This ensures that we will not deadlock due to // complete out of order. This ensures that we will not deadlock due to
// backpressure. The receiving actor will execute the tasks according to // backpressure. The receiving actor will execute the tasks according to
@ -90,6 +83,8 @@ Status CoreWorkerDirectActorTaskSubmitter::SubmitTask(TaskSpecification task_spe
if (task_queued) { if (task_queued) {
const auto actor_id = task_spec.ActorId(); const auto actor_id = task_spec.ActorId();
// We must release the lock before resolving the task dependencies since
// the callback may get called in the same call stack.
resolver_.ResolveDependencies(task_spec, [this, send_pos, actor_id]() { resolver_.ResolveDependencies(task_spec, [this, send_pos, actor_id]() {
absl::MutexLock lock(&mu_); absl::MutexLock lock(&mu_);
auto queue = client_queues_.find(actor_id); auto queue = client_queues_.find(actor_id);
@ -102,6 +97,14 @@ Status CoreWorkerDirectActorTaskSubmitter::SubmitTask(TaskSpecification task_spe
SendPendingTasks(actor_id); SendPendingTasks(actor_id);
} }
}); });
} else {
// Do not hold the lock while calling into task_finisher_.
task_finisher_->MarkTaskCanceled(task_spec.TaskId());
auto status = Status::IOError("cancelling all pending tasks of dead actor");
// No need to increment the number of completed tasks since the actor is
// dead.
RAY_UNUSED(!task_finisher_->PendingTaskFailed(task_spec.TaskId(),
rpc::ErrorType::ACTOR_DIED, &status));
} }
// If the task submission subsequently fails, then the client will receive // If the task submission subsequently fails, then the client will receive