diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc
index e3c0730c6..b394702b2 100644
--- a/src/ray/raylet/node_manager.cc
+++ b/src/ray/raylet/node_manager.cc
@@ -1398,37 +1398,35 @@ void NodeManager::ProcessDisconnectClientMessage(
   RAY_CHECK(!(is_worker && is_driver));
   // If the client has any blocked tasks, mark them as unblocked. In
   // particular, we are no longer waiting for their dependencies.
-  if (worker) {
-    if (is_worker && worker->IsDead()) {
-      // If the worker was killed by us because the driver exited,
-      // treat it as intentionally disconnected.
-      intentional_disconnect = true;
-      // Don't need to unblock the client if it's a worker and is already dead.
-      // Because in this case, its task is already cleaned up.
-      RAY_LOG(DEBUG) << "Skip unblocking worker because it's already dead.";
-    } else {
-      // Clean up any open ray.get calls that the worker made.
-      while (!worker->GetBlockedTaskIds().empty()) {
-        // NOTE(swang): AsyncResolveObjectsFinish will modify the worker, so it is
-        // not safe to pass in the iterator directly.
-        const TaskID task_id = *worker->GetBlockedTaskIds().begin();
-        AsyncResolveObjectsFinish(client, task_id, true);
-      }
-      // Clean up any open ray.wait calls that the worker made.
-      task_dependency_manager_.UnsubscribeWaitDependencies(worker->WorkerId());
+  if (is_worker && worker->IsDead()) {
+    // If the worker was killed by us because the driver exited,
+    // treat it as intentionally disconnected.
+    intentional_disconnect = true;
+    // Don't need to unblock the client if it's a worker and is already dead.
+    // Because in this case, its task is already cleaned up.
+    RAY_LOG(DEBUG) << "Skip unblocking worker because it's already dead.";
+  } else {
+    // Clean up any open ray.get calls that the worker made.
+    while (!worker->GetBlockedTaskIds().empty()) {
+      // NOTE(swang): AsyncResolveObjectsFinish will modify the worker, so it is
+      // not safe to pass in the iterator directly.
+      const TaskID task_id = *worker->GetBlockedTaskIds().begin();
+      AsyncResolveObjectsFinish(client, task_id, true);
     }
-
-    // Erase any lease metadata.
-    leased_workers_.erase(worker->WorkerId());
-
-    // Publish the worker failure.
-    auto worker_failure_data_ptr = gcs::CreateWorkerFailureData(
-        self_node_id_, worker->WorkerId(), worker->IpAddress(), worker->Port(),
-        time(nullptr), intentional_disconnect);
-    RAY_CHECK_OK(gcs_client_->Workers().AsyncReportWorkerFailure(worker_failure_data_ptr,
-                                                                 nullptr));
+    // Clean up any open ray.wait calls that the worker made.
+    task_dependency_manager_.UnsubscribeWaitDependencies(worker->WorkerId());
   }
+  // Erase any lease metadata.
+  leased_workers_.erase(worker->WorkerId());
+
+  // Publish the worker failure.
+  auto worker_failure_data_ptr =
+      gcs::CreateWorkerFailureData(self_node_id_, worker->WorkerId(), worker->IpAddress(),
+                                   worker->Port(), time(nullptr), intentional_disconnect);
+  RAY_CHECK_OK(
+      gcs_client_->Workers().AsyncReportWorkerFailure(worker_failure_data_ptr, nullptr));
+

   if (is_worker) {
     const ActorID &actor_id = worker->GetActorId();
     const TaskID &task_id = worker->GetAssignedTaskId();