From c092a5d1847c636708353823cb73bbe675f335cb Mon Sep 17 00:00:00 2001 From: Clark Zinzow Date: Thu, 18 Feb 2021 15:09:43 -0700 Subject: [PATCH] Cancel object location long-poll on object free. (#14165) --- .../ownership_based_object_directory.cc | 22 +++++++++++-------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/src/ray/object_manager/ownership_based_object_directory.cc b/src/ray/object_manager/ownership_based_object_directory.cc index e5477c0c2..85b0fa578 100644 --- a/src/ray/object_manager/ownership_based_object_directory.cc +++ b/src/ray/object_manager/ownership_based_object_directory.cc @@ -215,15 +215,19 @@ void OwnershipBasedObjectDirectory::SubscriptionCallback( } } - auto worker_it = worker_rpc_clients_.find(worker_id); - rpc::GetObjectLocationsOwnerRequest request; - request.set_intended_worker_id(worker_id.Binary()); - request.set_object_id(object_id.Binary()); - request.set_last_version(reply.current_version()); - worker_it->second->GetObjectLocationsOwner( - request, - std::bind(&OwnershipBasedObjectDirectory::SubscriptionCallback, this, object_id, - worker_id, std::placeholders::_1, std::placeholders::_2)); + // Only send the next long-polling RPC if the last one was successful. + // If the last RPC failed, we consider the object to have been freed. + if (status.ok()) { + auto worker_it = worker_rpc_clients_.find(worker_id); + rpc::GetObjectLocationsOwnerRequest request; + request.set_intended_worker_id(worker_id.Binary()); + request.set_object_id(object_id.Binary()); + request.set_last_version(reply.current_version()); + worker_it->second->GetObjectLocationsOwner( + request, + std::bind(&OwnershipBasedObjectDirectory::SubscriptionCallback, this, object_id, + worker_id, std::placeholders::_1, std::placeholders::_2)); + } } ray::Status OwnershipBasedObjectDirectory::SubscribeObjectLocations(