Mirror of https://github.com/vale981/ray (synced 2025-03-12 06:06:39 -04:00)
[Core] Remove unnecessary if judgment (#10971)

* Remove unnecessary if judgment
* format code style

parent 2a79571c29
commit 4fa6523e4e

1 changed file with 26 additions and 28 deletions
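The change is safe because worker cannot be null at the point where the removed "if (worker)" wrapper sat: earlier in ProcessDisconnectClientMessage (outside this hunk), the client is resolved to a registered worker or driver, and the handler returns early when it is neither. Below is a minimal, self-contained sketch of the same refactor pattern; all names are hypothetical stand-ins, not Ray's actual API.

#include <iostream>
#include <memory>

struct Worker {
  int id = 0;
  bool dead = false;
};

// Hypothetical lookup standing in for the worker/driver registry query.
std::shared_ptr<Worker> FindWorker(int client_id) {
  if (client_id < 0) return nullptr;  // unknown client
  auto worker = std::make_shared<Worker>();
  worker->id = client_id;
  return worker;
}

void HandleDisconnect(int client_id) {
  std::shared_ptr<Worker> worker = FindWorker(client_id);
  if (!worker) {
    std::cout << "ignoring disconnect from unknown client\n";
    return;  // early return: below this line, worker is always non-null
  }
  // Before the commit, the cleanup below sat inside "if (worker) { ... }".
  // That test can never be false here, so removing it flattens the code
  // without changing behavior.
  if (worker->dead) {
    std::cout << "worker " << worker->id << " already dead, skip unblocking\n";
  } else {
    std::cout << "unblocking worker " << worker->id << "\n";
  }
  std::cout << "publishing failure for worker " << worker->id << "\n";
}

int main() {
  HandleDisconnect(7);   // registered client: cleanup runs
  HandleDisconnect(-1);  // unknown client: early return
}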
@@ -1398,37 +1398,35 @@ void NodeManager::ProcessDisconnectClientMessage(
   RAY_CHECK(!(is_worker && is_driver));
   // If the client has any blocked tasks, mark them as unblocked. In
   // particular, we are no longer waiting for their dependencies.
-  if (worker) {
-    if (is_worker && worker->IsDead()) {
-      // If the worker was killed by us because the driver exited,
-      // treat it as intentionally disconnected.
-      intentional_disconnect = true;
-      // Don't need to unblock the client if it's a worker and is already dead.
-      // Because in this case, its task is already cleaned up.
-      RAY_LOG(DEBUG) << "Skip unblocking worker because it's already dead.";
-    } else {
-      // Clean up any open ray.get calls that the worker made.
-      while (!worker->GetBlockedTaskIds().empty()) {
-        // NOTE(swang): AsyncResolveObjectsFinish will modify the worker, so it is
-        // not safe to pass in the iterator directly.
-        const TaskID task_id = *worker->GetBlockedTaskIds().begin();
-        AsyncResolveObjectsFinish(client, task_id, true);
-      }
-    }
-    // Clean up any open ray.wait calls that the worker made.
-    task_dependency_manager_.UnsubscribeWaitDependencies(worker->WorkerId());
-    // Erase any lease metadata.
-    leased_workers_.erase(worker->WorkerId());
-    // Publish the worker failure.
-    auto worker_failure_data_ptr = gcs::CreateWorkerFailureData(
-        self_node_id_, worker->WorkerId(), worker->IpAddress(), worker->Port(),
-        time(nullptr), intentional_disconnect);
-    RAY_CHECK_OK(gcs_client_->Workers().AsyncReportWorkerFailure(worker_failure_data_ptr,
-                                                                 nullptr));
-  }
+  if (is_worker && worker->IsDead()) {
+    // If the worker was killed by us because the driver exited,
+    // treat it as intentionally disconnected.
+    intentional_disconnect = true;
+    // Don't need to unblock the client if it's a worker and is already dead.
+    // Because in this case, its task is already cleaned up.
+    RAY_LOG(DEBUG) << "Skip unblocking worker because it's already dead.";
+  } else {
+    // Clean up any open ray.get calls that the worker made.
+    while (!worker->GetBlockedTaskIds().empty()) {
+      // NOTE(swang): AsyncResolveObjectsFinish will modify the worker, so it is
+      // not safe to pass in the iterator directly.
+      const TaskID task_id = *worker->GetBlockedTaskIds().begin();
+      AsyncResolveObjectsFinish(client, task_id, true);
+    }
+  }
+  // Clean up any open ray.wait calls that the worker made.
+  task_dependency_manager_.UnsubscribeWaitDependencies(worker->WorkerId());
+  // Erase any lease metadata.
+  leased_workers_.erase(worker->WorkerId());
+  // Publish the worker failure.
+  auto worker_failure_data_ptr =
+      gcs::CreateWorkerFailureData(self_node_id_, worker->WorkerId(), worker->IpAddress(),
+                                   worker->Port(), time(nullptr), intentional_disconnect);
+  RAY_CHECK_OK(
+      gcs_client_->Workers().AsyncReportWorkerFailure(worker_failure_data_ptr, nullptr));
   if (is_worker) {
     const ActorID &actor_id = worker->GetActorId();
     const TaskID &task_id = worker->GetAssignedTaskId();
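The NOTE(swang) comment in the hunk flags a general hazard: AsyncResolveObjectsFinish mutates the worker's blocked-task set, so an iterator held across the call would be invalidated, which is why the loop re-reads begin() on every pass. Below is a minimal sketch of the same loop shape, using a plain std::set as a hypothetical stand-in for the blocked-task set.

#include <iostream>
#include <set>

// Stand-in for AsyncResolveObjectsFinish: resolving one task mutates the
// container, invalidating any iterator held across the call.
void ResolveTask(std::set<int> &blocked, int task_id) {
  blocked.erase(task_id);
  std::cout << "resolved task " << task_id << "\n";
}

int main() {
  std::set<int> blocked = {1, 2, 3};
  // Mirrors the loop in the diff: copy *begin() afresh each iteration
  // rather than iterating with a soon-invalidated iterator.
  while (!blocked.empty()) {
    const int task_id = *blocked.begin();
    ResolveTask(blocked, task_id);
  }
}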