diff --git a/src/ray/core_worker/transport/direct_actor_transport.cc b/src/ray/core_worker/transport/direct_actor_transport.cc index 296db7f7e..33c2769e2 100644 --- a/src/ray/core_worker/transport/direct_actor_transport.cc +++ b/src/ray/core_worker/transport/direct_actor_transport.cc @@ -68,14 +68,7 @@ Status CoreWorkerDirectActorTaskSubmitter::SubmitTask(TaskSpecification task_spe absl::MutexLock lock(&mu_); auto queue = client_queues_.find(task_spec.ActorId()); RAY_CHECK(queue != client_queues_.end()); - if (queue->second.state == rpc::ActorTableData::DEAD) { - task_finisher_->MarkTaskCanceled(task_spec.TaskId()); - auto status = Status::IOError("cancelling all pending tasks of dead actor"); - // No need to increment the number of completed tasks since the actor is - // dead. - RAY_UNUSED(!task_finisher_->PendingTaskFailed(task_spec.TaskId(), - rpc::ErrorType::ACTOR_DIED, &status)); - } else { + if (queue->second.state != rpc::ActorTableData::DEAD) { // We must fix the send order prior to resolving dependencies, which may // complete out of order. This ensures that we will not deadlock due to // backpressure. The receiving actor will execute the tasks according to @@ -90,6 +83,8 @@ Status CoreWorkerDirectActorTaskSubmitter::SubmitTask(TaskSpecification task_spe if (task_queued) { const auto actor_id = task_spec.ActorId(); + // We must release the lock before resolving the task dependencies since + // the callback may get called in the same call stack. resolver_.ResolveDependencies(task_spec, [this, send_pos, actor_id]() { absl::MutexLock lock(&mu_); auto queue = client_queues_.find(actor_id); @@ -102,6 +97,14 @@ Status CoreWorkerDirectActorTaskSubmitter::SubmitTask(TaskSpecification task_spe SendPendingTasks(actor_id); } }); + } else { + // Do not hold the lock while calling into task_finisher_. + task_finisher_->MarkTaskCanceled(task_spec.TaskId()); + auto status = Status::IOError("cancelling all pending tasks of dead actor"); + // No need to increment the number of completed tasks since the actor is + // dead. + RAY_UNUSED(!task_finisher_->PendingTaskFailed(task_spec.TaskId(), + rpc::ErrorType::ACTOR_DIED, &status)); } // If the task submission subsequently fails, then the client will receive