From 4c2de7be5401c6a42d0eb82164348fdf9303401d Mon Sep 17 00:00:00 2001 From: Stephanie Wang Date: Sat, 22 Feb 2020 13:29:48 -0800 Subject: [PATCH] [core] Ref counting for returning object IDs created by a different process (#7221) * Add regression tests * Refactor, split RemoveSubmittedTaskReferences into submitted and finished paths * Add nested return IDs to UpdateFinishedTaskRefs, rename WrapObjectIds * Basic unit tests pass * Fix unit test and add an out-of-order regression test * Add stored_in_objects to ObjectReferenceCount, regression test now passes * Add an Address to the ReferenceCounter so we can determine ownership * Set the nested return IDs from the TaskManager * Add another test * Simplify * Update src/ray/core_worker/reference_count_test.cc Co-Authored-By: Edward Oakes * Apply suggestions from code review Co-Authored-By: Edward Oakes * comments * Add python test Co-authored-by: Edward Oakes --- python/ray/tests/test_reference_counting.py | 38 + src/ray/common/ray_object.h | 10 +- src/ray/common/task/task_util.h | 4 +- src/ray/core_worker/core_worker.cc | 25 +- src/ray/core_worker/reference_count.cc | 128 +- src/ray/core_worker/reference_count.h | 75 +- src/ray/core_worker/reference_count_test.cc | 1032 +++++++++++------ .../memory_store/memory_store.cc | 2 +- src/ray/core_worker/task_manager.cc | 26 +- src/ray/core_worker/task_manager.h | 11 - .../transport/dependency_resolver.cc | 6 +- .../transport/direct_actor_transport.cc | 4 +- src/ray/protobuf/core_worker.proto | 15 +- 13 files changed, 916 insertions(+), 460 deletions(-) diff --git a/python/ray/tests/test_reference_counting.py b/python/ray/tests/test_reference_counting.py index 1d0466fb0..c6ab3527d 100644 --- a/python/ray/tests/test_reference_counting.py +++ b/python/ray/tests/test_reference_counting.py @@ -596,6 +596,44 @@ def test_recursively_pass_returned_object_id(one_worker_100MiB): _fill_object_store_and_get(inner_oid_bytes, succeed=False) +# Call a recursive chain of tasks. The final task in the chain returns an +# ObjectID returned by a task that it submitted. Every other task in the chain +# returns the same ObjectID by calling ray.get() on its submitted task and +# returning the result. The reference should still exist while the driver has a +# reference to the final task's ObjectID. +def test_recursively_return_borrowed_object_id(one_worker_100MiB): + @ray.remote + def put(): + return np.zeros(40 * 1024 * 1024, dtype=np.uint8) + + @ray.remote + def recursive(num_tasks_left): + if num_tasks_left == 0: + return put.remote() + + final_id = ray.get(recursive.remote(num_tasks_left - 1)) + ray.get(final_id) + return final_id + + max_depth = 5 + head_oid = recursive.remote(max_depth) + final_oid = ray.get(head_oid) + final_oid_bytes = final_oid.binary() + + # Check that the driver's reference pins the object. + _fill_object_store_and_get(final_oid_bytes) + + # Remove the local reference and try it again. + final_oid = ray.get(head_oid) + _fill_object_store_and_get(final_oid_bytes) + + # Remove all references. + del head_oid + del final_oid + # Reference should be gone, check that returned ID gets evicted. + _fill_object_store_and_get(final_oid_bytes, succeed=False) + + if __name__ == "__main__": import sys sys.exit(pytest.main(["-v", __file__])) diff --git a/src/ray/common/ray_object.h b/src/ray/common/ray_object.h index 395d369ed..e8f2f1c57 100644 --- a/src/ray/common/ray_object.h +++ b/src/ray/common/ray_object.h @@ -22,13 +22,13 @@ class RayObject { /// /// \param[in] data Data of the ray object. /// \param[in] metadata Metadata of the ray object. - /// \param[in] inlined_ids ObjectIDs that were serialized in data. + /// \param[in] nested_ids ObjectIDs that were serialized in data. /// \param[in] copy_data Whether this class should hold a copy of data. RayObject(const std::shared_ptr &data, const std::shared_ptr &metadata, - const std::vector &inlined_ids, bool copy_data = false) + const std::vector &nested_ids, bool copy_data = false) : data_(data), metadata_(metadata), - inlined_ids_(inlined_ids), + nested_ids_(nested_ids), has_data_copy_(copy_data) { if (has_data_copy_) { // If this object is required to hold a copy of the data, @@ -56,7 +56,7 @@ class RayObject { const std::shared_ptr &GetMetadata() const { return metadata_; } /// Return the object IDs that were serialized in data. - const std::vector &GetInlinedIds() const { return inlined_ids_; } + const std::vector &GetNestedIds() const { return nested_ids_; } uint64_t GetSize() const { uint64_t size = 0; @@ -81,7 +81,7 @@ class RayObject { private: std::shared_ptr data_; std::shared_ptr metadata_; - const std::vector inlined_ids_; + const std::vector nested_ids_; /// Whether this class holds a data copy. bool has_data_copy_; }; diff --git a/src/ray/common/task/task_util.h b/src/ray/common/task/task_util.h index b978de6b4..db3f8aa13 100644 --- a/src/ray/common/task/task_util.h +++ b/src/ray/common/task/task_util.h @@ -92,8 +92,8 @@ class TaskSpecBuilder { const auto &metadata = value.GetMetadata(); arg->set_metadata(metadata->Data(), metadata->Size()); } - for (const auto &inlined_id : value.GetInlinedIds()) { - arg->add_nested_inlined_ids(inlined_id.Binary()); + for (const auto &nested_id : value.GetNestedIds()) { + arg->add_nested_inlined_ids(nested_id.Binary()); } return *this; } diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index e6650426b..743d08a09 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -80,13 +80,6 @@ CoreWorker::CoreWorker(const WorkerType worker_type, const Language language, death_check_timer_(io_service_), internal_timer_(io_service_), core_worker_server_(WorkerTypeString(worker_type), 0 /* let grpc choose a port */), - reference_counter_(std::make_shared( - /*distributed_ref_counting_enabled=*/RayConfig::instance() - .distributed_ref_counting_enabled(), - [this](const rpc::Address &addr) { - return std::shared_ptr( - new rpc::CoreWorkerClient(addr, *client_call_manager_)); - })), task_queue_length_(0), num_executed_tasks_(0), task_execution_service_work_(task_execution_service_), @@ -166,6 +159,13 @@ CoreWorker::CoreWorker(const WorkerType worker_type, const Language language, rpc_address_.set_raylet_id(local_raylet_id.Binary()); rpc_address_.set_worker_id(worker_context_.GetWorkerID().Binary()); + reference_counter_ = std::make_shared( + rpc_address_, RayConfig::instance().distributed_ref_counting_enabled(), + [this](const rpc::Address &addr) { + return std::shared_ptr( + new rpc::CoreWorkerClient(addr, *client_call_manager_)); + }); + if (worker_type_ == ray::WorkerType::WORKER) { death_check_timer_.expires_from_now(boost::asio::chrono::milliseconds( RayConfig::instance().raylet_death_check_interval_milliseconds())); @@ -937,12 +937,7 @@ Status CoreWorker::AllocateReturnObjects( RAY_CHECK(object_ids.size() == data_sizes.size()); return_objects->resize(object_ids.size(), nullptr); - absl::optional owner_address( - worker_context_.GetCurrentTask()->CallerAddress()); - bool owned_by_us = owner_address->worker_id() == rpc_address_.worker_id(); - if (owned_by_us) { - owner_address.reset(); - } + rpc::Address owner_address(worker_context_.GetCurrentTask()->CallerAddress()); for (size_t i = 0; i < object_ids.size(); i++) { bool object_already_exists = false; @@ -952,8 +947,8 @@ Status CoreWorker::AllocateReturnObjects( // Mark this object as containing other object IDs. The ref counter will // keep the inner IDs in scope until the outer one is out of scope. if (!contained_object_ids[i].empty()) { - reference_counter_->WrapObjectIds(object_ids[i], contained_object_ids[i], - owner_address); + reference_counter_->AddNestedObjectIds(object_ids[i], contained_object_ids[i], + owner_address); } // Allocate a buffer for the return object. diff --git a/src/ray/core_worker/reference_count.cc b/src/ray/core_worker/reference_count.cc index 0daf0edad..04f42b107 100644 --- a/src/ray/core_worker/reference_count.cc +++ b/src/ray/core_worker/reference_count.cc @@ -84,9 +84,11 @@ void ReferenceCounter::AddOwnedObject(const ObjectID &object_id, // because this corresponds to a submitted task whose return ObjectID will be created // in the frontend language, incrementing the reference count. object_id_refs_.emplace(object_id, Reference(owner_id, owner_address)); - // Mark that this object ID contains other inner IDs. Then, we will not remove - // the inner objects until the outer object ID goes out of scope. - WrapObjectIdsInternal(object_id, inner_ids, absl::optional()); + if (!inner_ids.empty()) { + // Mark that this object ID contains other inner IDs. Then, we will not GC + // the inner objects until the outer object ID goes out of scope. + AddNestedObjectIdsInternal(object_id, inner_ids, rpc_address_); + } } void ReferenceCounter::AddLocalReference(const ObjectID &object_id) { @@ -124,22 +126,24 @@ void ReferenceCounter::RemoveLocalReference(const ObjectID &object_id, } } -void ReferenceCounter::AddSubmittedTaskReferences( - const std::vector &object_ids) { +void ReferenceCounter::UpdateSubmittedTaskReferences( + const std::vector &argument_ids_to_add, + const std::vector &argument_ids_to_remove, std::vector *deleted) { absl::MutexLock lock(&mutex_); - for (const ObjectID &object_id : object_ids) { - auto it = object_id_refs_.find(object_id); + for (const ObjectID &argument_id : argument_ids_to_add) { + auto it = object_id_refs_.find(argument_id); if (it == object_id_refs_.end()) { // This happens if a large argument is transparently passed by reference // because we don't hold a Python reference to its ObjectID. - it = object_id_refs_.emplace(object_id, Reference()).first; + it = object_id_refs_.emplace(argument_id, Reference()).first; } it->second.submitted_task_ref_count++; } + RemoveSubmittedTaskReferences(argument_ids_to_remove, deleted); } -void ReferenceCounter::UpdateSubmittedTaskReferences( - const std::vector &object_ids, const rpc::Address &worker_addr, +void ReferenceCounter::UpdateFinishedTaskReferences( + const std::vector &argument_ids, const rpc::Address &worker_addr, const ReferenceTableProto &borrowed_refs, std::vector *deleted) { absl::MutexLock lock(&mutex_); // Must merge the borrower refs before decrementing any ref counts. This is @@ -150,15 +154,20 @@ void ReferenceCounter::UpdateSubmittedTaskReferences( if (!refs.empty()) { RAY_CHECK(!WorkerID::FromBinary(worker_addr.worker_id()).IsNil()); } - for (const ObjectID &object_id : object_ids) { - MergeRemoteBorrowers(object_id, worker_addr, refs); + for (const ObjectID &argument_id : argument_ids) { + MergeRemoteBorrowers(argument_id, worker_addr, refs); } - for (const ObjectID &object_id : object_ids) { - auto it = object_id_refs_.find(object_id); + RemoveSubmittedTaskReferences(argument_ids, deleted); +} + +void ReferenceCounter::RemoveSubmittedTaskReferences( + const std::vector &argument_ids, std::vector *deleted) { + for (const ObjectID &argument_id : argument_ids) { + auto it = object_id_refs_.find(argument_id); if (it == object_id_refs_.end()) { RAY_LOG(WARNING) << "Tried to decrease ref count for nonexistent object ID: " - << object_id; + << argument_id; return; } it->second.submitted_task_ref_count--; @@ -359,6 +368,7 @@ bool ReferenceCounter::GetAndClearLocalBorrowersInternal(const ObjectID &object_ // of the returned borrowed_refs must merge this list into their own list // until all active borrowers are merged into the owner. it->second.borrowers.clear(); + it->second.stored_in_objects.clear(); if (it->second.contained_in_borrowed_id.has_value()) { /// This ID was nested in another ID that we (or a nested task) borrowed. @@ -437,6 +447,13 @@ void ReferenceCounter::MergeRemoteBorrowers(const ObjectID &object_id, } } + // If the borrower stored this object ID inside another object ID that it did + // not own, then mark that the object ID is nested inside another. + for (const auto &stored_in_object : borrower_ref.stored_in_objects) { + AddNestedObjectIdsInternal(stored_in_object.first, {object_id}, + stored_in_object.second); + } + // Recursively merge any references that were contained in this object, to // handle any borrowers of nested objects. for (const auto &inner_id : borrower_ref.contains) { @@ -462,10 +479,10 @@ void ReferenceCounter::WaitForRefRemoved(const ReferenceTable::iterator &ref_it, if (it == borrower_cache_.end()) { RAY_CHECK(client_factory_ != nullptr); it = borrower_cache_.emplace(addr, client_factory_(addr.ToProto())).first; - RAY_LOG(DEBUG) << "Connected to borrower " << addr.ip_address << ":" << addr.port - << " for object " << object_id; } + RAY_LOG(DEBUG) << "Sending WaitForRefRemoved to borrower " << addr.ip_address << ":" + << addr.port << " for object " << object_id; // Send the borrower a message about this object. The borrower responds once // it is no longer using the object ID. RAY_CHECK_OK(it->second->WaitForRefRemoved( @@ -474,32 +491,35 @@ void ReferenceCounter::WaitForRefRemoved(const ReferenceTable::iterator &ref_it, RAY_LOG(DEBUG) << "Received reply from borrower " << addr.ip_address << ":" << addr.port << " of object " << object_id; absl::MutexLock lock(&mutex_); + + // Merge in any new borrowers that the previous borrower learned of. + const ReferenceTable new_borrower_refs = + ReferenceTableFromProto(reply.borrowed_refs()); + MergeRemoteBorrowers(object_id, addr, new_borrower_refs); + + // Erase the previous borrower. auto it = object_id_refs_.find(object_id); RAY_CHECK(it != object_id_refs_.end()); RAY_CHECK(it->second.borrowers.erase(addr)); - - const ReferenceTable new_borrower_refs = - ReferenceTableFromProto(reply.borrowed_refs()); - - MergeRemoteBorrowers(object_id, addr, new_borrower_refs); DeleteReferenceInternal(it, nullptr); })); } -void ReferenceCounter::WrapObjectIds( - const ObjectID &object_id, const std::vector &inner_ids, - const absl::optional &owner_address) { +void ReferenceCounter::AddNestedObjectIds(const ObjectID &object_id, + const std::vector &inner_ids, + const rpc::WorkerAddress &owner_address) { absl::MutexLock lock(&mutex_); - WrapObjectIdsInternal(object_id, inner_ids, owner_address); + AddNestedObjectIdsInternal(object_id, inner_ids, owner_address); } -void ReferenceCounter::WrapObjectIdsInternal( +void ReferenceCounter::AddNestedObjectIdsInternal( const ObjectID &object_id, const std::vector &inner_ids, - const absl::optional &owner_address) { + const rpc::WorkerAddress &owner_address) { + RAY_CHECK(!owner_address.worker_id.IsNil()); auto it = object_id_refs_.find(object_id); - if (!owner_address.has_value()) { - // `ray.put()` case OR returning an object ID from a task and the task's - // caller executed in the same process as us. + if (owner_address.worker_id == rpc_address_.worker_id) { + // We own object_id. This is a `ray.put()` case OR returning an object ID + // from a task and the task's caller executed in the same process as us. if (it != object_id_refs_.end()) { RAY_CHECK(it->second.owned_by_us); // The outer object is still in scope. Mark the inner ones as being @@ -515,27 +535,26 @@ void ReferenceCounter::WrapObjectIdsInternal( } } } else { - // Returning an object ID from a task, and the task's caller executed in a - // remote process. + // We do not own object_id. This is the case where we returned an object ID + // from a task, and the task's caller executed in a remote process. for (const auto &inner_id : inner_ids) { + RAY_LOG(DEBUG) << "Adding borrower " << owner_address.ip_address << " to id " + << inner_id << ", borrower owns outer ID " << object_id; auto inner_it = object_id_refs_.find(inner_id); RAY_CHECK(inner_it != object_id_refs_.end()); - if (!inner_it->second.owned_by_us) { - RAY_LOG(WARNING) - << "Ref counting currently does not support returning an object ID that was " - "not created by the worker executing the task. The object may be evicted " - "before all references are out of scope."; - // TODO: Do not return. Handle the case where we return a BORROWED id. - continue; - } // Add the task's caller as a borrower. - auto inserted = inner_it->second.borrowers.insert(*owner_address).second; - if (inserted) { - RAY_LOG(DEBUG) << "Adding borrower " << owner_address->ip_address << " to id " - << object_id << ", borrower owns outer ID " << object_id; - // Wait for it to remove its - // reference. - WaitForRefRemoved(inner_it, *owner_address, object_id); + if (inner_it->second.owned_by_us) { + auto inserted = inner_it->second.borrowers.insert(owner_address).second; + if (inserted) { + // Wait for it to remove its reference. + WaitForRefRemoved(inner_it, owner_address, object_id); + } + } else { + auto inserted = + inner_it->second.stored_in_objects.emplace(object_id, owner_address).second; + // This should be the first time that we have stored this object ID + // inside this return ID. + RAY_CHECK(inserted); } } } @@ -583,9 +602,7 @@ void ReferenceCounter::SetRefRemovedCallback( // add the outer object to the inner ID's ref count. We will not respond to // the owner of the inner ID until the outer object ID goes out of scope. if (!contained_in_id.IsNil()) { - AddBorrowedObjectInternal(object_id, contained_in_id, owner_id, owner_address); - WrapObjectIdsInternal(contained_in_id, {object_id}, - absl::optional()); + AddNestedObjectIdsInternal(contained_in_id, {object_id}, rpc_address_); } if (it->second.RefCount() == 0) { @@ -620,6 +637,10 @@ ReferenceCounter::Reference ReferenceCounter::Reference::FromProto( for (const auto &borrower : ref_count.borrowers()) { ref.borrowers.insert(rpc::WorkerAddress(borrower)); } + for (const auto &object : ref_count.stored_in_objects()) { + const auto &object_id = ObjectID::FromBinary(object.object_id()); + ref.stored_in_objects.emplace(object_id, rpc::WorkerAddress(object.owner_address())); + } for (const auto &id : ref_count.contains()) { ref.contains.insert(ObjectID::FromBinary(id)); } @@ -641,6 +662,11 @@ void ReferenceCounter::Reference::ToProto(rpc::ObjectReferenceCount *ref) const for (const auto &borrower : borrowers) { ref->add_borrowers()->CopyFrom(borrower.ToProto()); } + for (const auto &object : stored_in_objects) { + auto ref_object = ref->add_stored_in_objects(); + ref_object->set_object_id(object.first.Binary()); + ref_object->mutable_owner_address()->CopyFrom(object.second.ToProto()); + } if (contained_in_borrowed_id.has_value()) { ref->set_contained_in_borrowed_id(contained_in_borrowed_id->Binary()); } diff --git a/src/ray/core_worker/reference_count.h b/src/ray/core_worker/reference_count.h index 919d5692b..8378cfa95 100644 --- a/src/ray/core_worker/reference_count.h +++ b/src/ray/core_worker/reference_count.h @@ -23,9 +23,11 @@ class ReferenceCounter { ::google::protobuf::RepeatedPtrField; using ReferenceRemovedCallback = std::function; - ReferenceCounter(bool distributed_ref_counting_enabled = true, + ReferenceCounter(const rpc::WorkerAddress &rpc_address, + bool distributed_ref_counting_enabled = true, rpc::ClientFactoryFn client_factory = nullptr) - : distributed_ref_counting_enabled_(distributed_ref_counting_enabled), + : rpc_address_(rpc_address), + distributed_ref_counting_enabled_(distributed_ref_counting_enabled), client_factory_(client_factory) {} ~ReferenceCounter() {} @@ -48,8 +50,10 @@ class ReferenceCounter { /// dependencies to a submitted task. /// /// \param[in] object_ids The object IDs to add references for. - void AddSubmittedTaskReferences(const std::vector &object_ids) - LOCKS_EXCLUDED(mutex_); + void UpdateSubmittedTaskReferences( + const std::vector &argument_ids_to_add, + const std::vector &argument_ids_to_remove = std::vector(), + std::vector *deleted = nullptr) LOCKS_EXCLUDED(mutex_); /// Update object references that were given to a submitted task. The task /// may still be borrowing any object IDs that were contained in its @@ -64,10 +68,10 @@ class ReferenceCounter { /// arguments. Some references in this table may still be borrowed by the /// worker and/or a task that the worker submitted. /// \param[out] deleted The object IDs whos reference counts reached zero. - void UpdateSubmittedTaskReferences(const std::vector &object_ids, - const rpc::Address &worker_addr, - const ReferenceTableProto &borrowed_refs, - std::vector *deleted) + void UpdateFinishedTaskReferences(const std::vector &argument_ids, + const rpc::Address &worker_addr, + const ReferenceTableProto &borrowed_refs, + std::vector *deleted) LOCKS_EXCLUDED(mutex_); /// Add an object that we own. The object may depend on other objects. @@ -182,18 +186,25 @@ class ReferenceCounter { void GetAndClearLocalBorrowers(const std::vector &borrowed_ids, ReferenceTableProto *proto) LOCKS_EXCLUDED(mutex_); - /// Wrap ObjectIDs inside another object ID. + /// Mark that this ObjectID contains another ObjectID(s). This should be + /// called in two cases: + /// 1. We are storing the value of an object and the value contains + /// serialized copies of other ObjectIDs. If the outer object is owned by a + /// remote process, then they are now a borrower of the nested IDs. + /// 2. We submitted a task that returned an ObjectID(s) in its return values + /// and we are processing the worker's reply. In this case, we own the task's + /// return objects and are borrowing the nested IDs. /// - /// \param[in] object_id The object ID whose value we are storing. - /// \param[in] inner_ids The object IDs that we are storing in object_id. + /// \param[in] object_id The ID of the object that contains other ObjectIDs. + /// \param[in] inner_ids The object IDs are nested in object_id's value. /// \param[in] owner_address The owner address of the outer object_id. If /// this is not provided, then the outer object ID must be owned by us. the /// outer object ID is not owned by us, then this is used to contact the /// outer object's owner, since it is considered a borrower for the inner /// IDs. - void WrapObjectIds(const ObjectID &object_id, const std::vector &inner_ids, - const absl::optional &owner_address) - LOCKS_EXCLUDED(mutex_); + void AddNestedObjectIds(const ObjectID &object_id, + const std::vector &inner_ids, + const rpc::WorkerAddress &owner_address) LOCKS_EXCLUDED(mutex_); /// Whether we have a reference to a particular ObjectID. /// @@ -234,7 +245,9 @@ class ReferenceCounter { bool in_scope = RefCount() > 0; bool was_contained_in_borrowed_id = contained_in_borrowed_id.has_value(); bool has_borrowers = borrowers.size() > 0; - return !(in_scope || was_contained_in_borrowed_id || has_borrowers); + bool was_stored_in_objects = stored_in_objects.size() > 0; + return !(in_scope || was_contained_in_borrowed_id || has_borrowers || + was_stored_in_objects); } /// Whether we own the object. If we own the object, then we are @@ -293,6 +306,13 @@ class ReferenceCounter { /// borrowers. A borrower is removed from the list when it responds /// that it is no longer using the reference. absl::flat_hash_set borrowers; + /// When a process that is borrowing an object ID stores the ID inside the + /// return value of a task that it executes, the caller of the task is also + /// considered a borrower for as long as its reference to the task's return + /// ID stays in scope. Thus, the borrower must notify the owner that the + /// task's caller is also a borrower. The key is the task's return ID, and + /// the value is the task ID and address of the task's caller. + absl::flat_hash_map stored_in_objects; /// Callback that will be called when this ObjectID no longer has /// references. @@ -311,18 +331,26 @@ class ReferenceCounter { static void ReferenceTableToProto(const ReferenceTable &table, ReferenceTableProto *proto); - /// Helper method to wrap an ObjectID(s) inside another object ID. + /// Remove references for the provided object IDs that correspond to them + /// being dependencies to a submitted task. This should be called when + /// inlined dependencies are inlined or when the task finishes for plasma + /// dependencies. + void RemoveSubmittedTaskReferences(const std::vector &argument_ids, + std::vector *deleted) + EXCLUSIVE_LOCKS_REQUIRED(mutex_); + + /// Helper method to mark that this ObjectID contains another ObjectID(s). /// - /// \param[in] object_id The object ID whose value we are storing. - /// \param[in] inner_ids The object IDs that we are storing in object_id. + /// \param[in] object_id The ID of the object that contains other ObjectIDs. + /// \param[in] inner_ids The object IDs are nested in object_id's value. /// \param[in] owner_address The owner address of the outer object_id. If /// this is not provided, then the outer object ID must be owned by us. the /// outer object ID is not owned by us, then this is used to contact the /// outer object's owner, since it is considered a borrower for the inner /// IDs. - void WrapObjectIdsInternal(const ObjectID &object_id, - const std::vector &inner_ids, - const absl::optional &owner_address) + void AddNestedObjectIdsInternal(const ObjectID &object_id, + const std::vector &inner_ids, + const rpc::WorkerAddress &owner_address) EXCLUSIVE_LOCKS_REQUIRED(mutex_); /// Populates the table with the ObjectID that we were or are still @@ -394,6 +422,11 @@ class ReferenceCounter { std::vector *deleted) EXCLUSIVE_LOCKS_REQUIRED(mutex_); + /// Address of our RPC server. This is used to determine whether we own a + /// given object or not, by comparing our WorkerID with the WorkerID of the + /// object's owner. + rpc::WorkerAddress rpc_address_; + /// Feature flag for distributed ref counting. If this is false, then we will /// keep the distributed ref count, but only the local ref count will be used /// to decide when objects can be evicted. diff --git a/src/ray/core_worker/reference_count_test.cc b/src/ray/core_worker/reference_count_test.cc index 05f2729b9..336fcc023 100644 --- a/src/ray/core_worker/reference_count_test.cc +++ b/src/ray/core_worker/reference_count_test.cc @@ -14,20 +14,31 @@ static const ReferenceCounter::ReferenceTableProto empty_refs; class ReferenceCountTest : public ::testing::Test { protected: std::unique_ptr rc; - virtual void SetUp() { rc = std::unique_ptr(new ReferenceCounter); } + virtual void SetUp() { + rpc::Address addr; + rc = std::unique_ptr(new ReferenceCounter(addr)); + } virtual void TearDown() {} }; class MockWorkerClient : public rpc::CoreWorkerClientInterface { public: - MockWorkerClient(ReferenceCounter &rc, const std::string &addr) - : rc_(rc), task_id_(TaskID::ForFakeTask()) { - address_.set_ip_address(addr); - address_.set_raylet_id(ClientID::FromRandom().Binary()); - address_.set_worker_id(WorkerID::FromRandom().Binary()); + // Helper function to generate a random address. + rpc::Address CreateRandomAddress(const std::string &addr) { + rpc::Address address; + address.set_ip_address(addr); + address.set_raylet_id(ClientID::FromRandom().Binary()); + address.set_worker_id(WorkerID::FromRandom().Binary()); + return address; } + MockWorkerClient(const std::string &addr, rpc::ClientFactoryFn client_factory = nullptr) + : task_id_(TaskID::ForFakeTask()), + address_(CreateRandomAddress(addr)), + rc_(rpc::WorkerAddress(address_), /*distributed_ref_counting_enabled=*/true, + client_factory) {} + ray::Status WaitForRefRemoved( const rpc::WaitForRefRemovedRequest &request, const rpc::ClientCallback &callback) override { @@ -97,7 +108,7 @@ class MockWorkerClient : public rpc::CoreWorkerClientInterface { } ObjectID SubmitTaskWithArg(const ObjectID &arg_id) { - rc_.AddSubmittedTaskReferences({arg_id}); + rc_.UpdateSubmittedTaskReferences({arg_id}); ObjectID return_id = ObjectID::FromRandom(); rc_.AddOwnedObject(return_id, {}, task_id_, address_); // Add a sentinel reference to keep all nested object IDs in scope. @@ -110,7 +121,7 @@ class MockWorkerClient : public rpc::CoreWorkerClientInterface { const ObjectID *return_wrapped_id = nullptr, const rpc::WorkerAddress *owner_address = nullptr) { if (return_wrapped_id) { - rc_.WrapObjectIds(return_id, {*return_wrapped_id}, *owner_address); + rc_.AddNestedObjectIds(return_id, {*return_wrapped_id}, *owner_address); } ReferenceCounter::ReferenceTableProto refs; @@ -123,22 +134,25 @@ class MockWorkerClient : public rpc::CoreWorkerClientInterface { } void HandleSubmittedTaskFinished( - const ObjectID &arg_id, const rpc::Address &borrower_address = empty_borrower, + const ObjectID &arg_id, + const std::unordered_map> &nested_return_ids = {}, + const rpc::Address &borrower_address = empty_borrower, const ReferenceCounter::ReferenceTableProto &borrower_refs = empty_refs) { + std::vector arguments; if (!arg_id.IsNil()) { - rc_.UpdateSubmittedTaskReferences({arg_id}, borrower_address, borrower_refs, - nullptr); + arguments.push_back(arg_id); } + rc_.UpdateFinishedTaskReferences(arguments, borrower_address, borrower_refs, nullptr); } // Global map from Worker ID -> MockWorkerClient. // Global map from Object ID -> owner worker ID, list of objects that it depends on, // worker address that it's scheduled on. Worker map of pending return IDs. - // The ReferenceCounter at the "client". - ReferenceCounter &rc_; TaskID task_id_; rpc::Address address_; + // The ReferenceCounter at the "client". + ReferenceCounter rc_; std::unordered_map> borrower_callbacks_; std::unordered_map, rpc::ClientCallback>> @@ -171,32 +185,32 @@ TEST_F(ReferenceCountTest, TestBasic) { out.clear(); // Submitted task references. - rc->AddSubmittedTaskReferences({id1}); - rc->AddSubmittedTaskReferences({id1, id2}); + rc->UpdateSubmittedTaskReferences({id1}); + rc->UpdateSubmittedTaskReferences({id1, id2}); ASSERT_EQ(rc->NumObjectIDsInScope(), 2); - rc->UpdateSubmittedTaskReferences({id1}, empty_borrower, empty_refs, &out); + rc->UpdateFinishedTaskReferences({id1}, empty_borrower, empty_refs, &out); ASSERT_EQ(rc->NumObjectIDsInScope(), 2); ASSERT_EQ(out.size(), 0); - rc->UpdateSubmittedTaskReferences({id2}, empty_borrower, empty_refs, &out); + rc->UpdateFinishedTaskReferences({id2}, empty_borrower, empty_refs, &out); ASSERT_EQ(rc->NumObjectIDsInScope(), 1); ASSERT_EQ(out.size(), 1); - rc->UpdateSubmittedTaskReferences({id1}, empty_borrower, empty_refs, &out); + rc->UpdateFinishedTaskReferences({id1}, empty_borrower, empty_refs, &out); ASSERT_EQ(rc->NumObjectIDsInScope(), 0); ASSERT_EQ(out.size(), 2); out.clear(); // Local & submitted task references. rc->AddLocalReference(id1); - rc->AddSubmittedTaskReferences({id1, id2}); + rc->UpdateSubmittedTaskReferences({id1, id2}); rc->AddLocalReference(id2); ASSERT_EQ(rc->NumObjectIDsInScope(), 2); rc->RemoveLocalReference(id1, &out); ASSERT_EQ(rc->NumObjectIDsInScope(), 2); ASSERT_EQ(out.size(), 0); - rc->UpdateSubmittedTaskReferences({id2}, empty_borrower, empty_refs, &out); + rc->UpdateFinishedTaskReferences({id2}, empty_borrower, empty_refs, &out); ASSERT_EQ(rc->NumObjectIDsInScope(), 2); ASSERT_EQ(out.size(), 0); - rc->UpdateSubmittedTaskReferences({id1}, empty_borrower, empty_refs, &out); + rc->UpdateFinishedTaskReferences({id1}, empty_borrower, empty_refs, &out); ASSERT_EQ(rc->NumObjectIDsInScope(), 1); ASSERT_EQ(out.size(), 1); rc->RemoveLocalReference(id2, &out); @@ -243,7 +257,8 @@ TEST(MemoryStoreIntegrationTest, TestSimple) { uint8_t data[] = {1, 2, 3, 4, 5, 6, 7, 8}; RayObject buffer(std::make_shared(data, sizeof(data)), nullptr, {}); - auto rc = std::shared_ptr(new ReferenceCounter()); + auto rc = std::shared_ptr( + new ReferenceCounter(rpc::WorkerAddress(rpc::Address()))); CoreWorkerMemoryStore store(nullptr, rc); // Tests putting an object with no references is ignored. @@ -274,10 +289,9 @@ TEST(MemoryStoreIntegrationTest, TestSimple) { // outer_id = ray.put([inner_id]) // res = borrower.remote(outer_id) TEST(DistributedReferenceCountTest, TestNoBorrow) { - ReferenceCounter borrower_rc; - auto borrower = std::make_shared(borrower_rc, "1"); - ReferenceCounter owner_rc(true, [&](const rpc::Address &addr) { return borrower; }); - auto owner = std::make_shared(owner_rc, "2"); + auto borrower = std::make_shared("1"); + auto owner = std::make_shared( + "2", [&](const rpc::Address &addr) { return borrower; }); // The owner creates an inner object and wraps it. auto inner_id = ObjectID::FromRandom(); @@ -289,33 +303,33 @@ TEST(DistributedReferenceCountTest, TestNoBorrow) { // be given a reference to inner_id. owner->SubmitTaskWithArg(outer_id); // The owner's references go out of scope. - owner_rc.RemoveLocalReference(outer_id, nullptr); - owner_rc.RemoveLocalReference(inner_id, nullptr); + owner->rc_.RemoveLocalReference(outer_id, nullptr); + owner->rc_.RemoveLocalReference(inner_id, nullptr); // The owner's ref count > 0 for both objects. - ASSERT_TRUE(owner_rc.HasReference(outer_id)); - ASSERT_TRUE(owner_rc.HasReference(inner_id)); + ASSERT_TRUE(owner->rc_.HasReference(outer_id)); + ASSERT_TRUE(owner->rc_.HasReference(inner_id)); // The borrower is given a reference to the inner object. borrower->ExecuteTaskWithArg(outer_id, inner_id, owner->task_id_, owner->address_); // The borrower submits a task that depends on the inner object. borrower->SubmitTaskWithArg(inner_id); - borrower_rc.RemoveLocalReference(inner_id, nullptr); - ASSERT_TRUE(borrower_rc.HasReference(inner_id)); + borrower->rc_.RemoveLocalReference(inner_id, nullptr); + ASSERT_TRUE(borrower->rc_.HasReference(inner_id)); // The borrower waits for the task to finish before returning to the owner. borrower->HandleSubmittedTaskFinished(inner_id); auto borrower_refs = borrower->FinishExecutingTask(outer_id, ObjectID::Nil()); // Check that the borrower's ref count is now 0 for all objects. - ASSERT_FALSE(borrower_rc.HasReference(inner_id)); - ASSERT_FALSE(borrower_rc.HasReference(outer_id)); + ASSERT_FALSE(borrower->rc_.HasReference(inner_id)); + ASSERT_FALSE(borrower->rc_.HasReference(outer_id)); // The owner receives the borrower's reply and merges the borrower's ref // count into its own. - owner->HandleSubmittedTaskFinished(outer_id, borrower->address_, borrower_refs); + owner->HandleSubmittedTaskFinished(outer_id, {}, borrower->address_, borrower_refs); borrower->FlushBorrowerCallbacks(); // Check that owner's ref count is now 0 for all objects. - ASSERT_FALSE(owner_rc.HasReference(inner_id)); - ASSERT_FALSE(owner_rc.HasReference(outer_id)); + ASSERT_FALSE(owner->rc_.HasReference(inner_id)); + ASSERT_FALSE(owner->rc_.HasReference(outer_id)); } // A borrower is given a reference to an object ID, submits a task, does not @@ -330,10 +344,9 @@ TEST(DistributedReferenceCountTest, TestNoBorrow) { // outer_id = ray.put([inner_id]) // res = borrower.remote(outer_id) TEST(DistributedReferenceCountTest, TestSimpleBorrower) { - ReferenceCounter borrower_rc; - auto borrower = std::make_shared(borrower_rc, "1"); - ReferenceCounter owner_rc(true, [&](const rpc::Address &addr) { return borrower; }); - auto owner = std::make_shared(owner_rc, "2"); + auto borrower = std::make_shared("1"); + auto owner = std::make_shared( + "2", [&](const rpc::Address &addr) { return borrower; }); // The owner creates an inner object and wraps it. auto inner_id = ObjectID::FromRandom(); @@ -345,44 +358,44 @@ TEST(DistributedReferenceCountTest, TestSimpleBorrower) { // be given a reference to inner_id. owner->SubmitTaskWithArg(outer_id); // The owner's references go out of scope. - owner_rc.RemoveLocalReference(outer_id, nullptr); - owner_rc.RemoveLocalReference(inner_id, nullptr); + owner->rc_.RemoveLocalReference(outer_id, nullptr); + owner->rc_.RemoveLocalReference(inner_id, nullptr); // The owner's ref count > 0 for both objects. - ASSERT_TRUE(owner_rc.HasReference(outer_id)); - ASSERT_TRUE(owner_rc.HasReference(inner_id)); + ASSERT_TRUE(owner->rc_.HasReference(outer_id)); + ASSERT_TRUE(owner->rc_.HasReference(inner_id)); // The borrower is given a reference to the inner object. borrower->ExecuteTaskWithArg(outer_id, inner_id, owner->task_id_, owner->address_); // The borrower submits a task that depends on the inner object. borrower->SubmitTaskWithArg(inner_id); - borrower_rc.RemoveLocalReference(inner_id, nullptr); - ASSERT_TRUE(borrower_rc.HasReference(inner_id)); + borrower->rc_.RemoveLocalReference(inner_id, nullptr); + ASSERT_TRUE(borrower->rc_.HasReference(inner_id)); // The borrower task returns to the owner without waiting for its submitted // task to finish. auto borrower_refs = borrower->FinishExecutingTask(outer_id, ObjectID::Nil()); - // ASSERT_FALSE(borrower_rc.HasReference(outer_id)); + // ASSERT_FALSE(borrower->rc_.HasReference(outer_id)); // Check that the borrower's ref count for inner_id > 0 because of the // pending task. - ASSERT_TRUE(borrower_rc.HasReference(inner_id)); + ASSERT_TRUE(borrower->rc_.HasReference(inner_id)); // The owner receives the borrower's reply and merges the borrower's ref // count into its own. - owner->HandleSubmittedTaskFinished(outer_id, borrower->address_, borrower_refs); + owner->HandleSubmittedTaskFinished(outer_id, {}, borrower->address_, borrower_refs); borrower->FlushBorrowerCallbacks(); // Check that owner now has borrower in inner's borrowers list. - ASSERT_TRUE(owner_rc.HasReference(inner_id)); + ASSERT_TRUE(owner->rc_.HasReference(inner_id)); // Check that owner's ref count for outer == 0 since the borrower task // returned and there were no local references to outer_id. - ASSERT_FALSE(owner_rc.HasReference(outer_id)); + ASSERT_FALSE(owner->rc_.HasReference(outer_id)); // The task submitted by the borrower returns. Everyone's ref count should go // to 0. borrower->HandleSubmittedTaskFinished(inner_id); - ASSERT_FALSE(borrower_rc.HasReference(inner_id)); - ASSERT_FALSE(borrower_rc.HasReference(outer_id)); - ASSERT_FALSE(owner_rc.HasReference(inner_id)); - ASSERT_FALSE(owner_rc.HasReference(outer_id)); + ASSERT_FALSE(borrower->rc_.HasReference(inner_id)); + ASSERT_FALSE(borrower->rc_.HasReference(outer_id)); + ASSERT_FALSE(owner->rc_.HasReference(inner_id)); + ASSERT_FALSE(owner->rc_.HasReference(outer_id)); } // A borrower is given a reference to an object ID, keeps the reference past @@ -398,10 +411,9 @@ TEST(DistributedReferenceCountTest, TestSimpleBorrower) { // outer_id = ray.put([inner_id]) // res = Borrower.remote(outer_id) TEST(DistributedReferenceCountTest, TestSimpleBorrowerReferenceRemoved) { - ReferenceCounter borrower_rc; - auto borrower = std::make_shared(borrower_rc, "1"); - ReferenceCounter owner_rc(true, [&](const rpc::Address &addr) { return borrower; }); - auto owner = std::make_shared(owner_rc, "2"); + auto borrower = std::make_shared("1"); + auto owner = std::make_shared( + "2", [&](const rpc::Address &addr) { return borrower; }); // The owner creates an inner object and wraps it. auto inner_id = ObjectID::FromRandom(); @@ -413,41 +425,41 @@ TEST(DistributedReferenceCountTest, TestSimpleBorrowerReferenceRemoved) { // be given a reference to inner_id. owner->SubmitTaskWithArg(outer_id); // The owner's references go out of scope. - owner_rc.RemoveLocalReference(outer_id, nullptr); - owner_rc.RemoveLocalReference(inner_id, nullptr); + owner->rc_.RemoveLocalReference(outer_id, nullptr); + owner->rc_.RemoveLocalReference(inner_id, nullptr); // The owner's ref count > 0 for both objects. - ASSERT_TRUE(owner_rc.HasReference(outer_id)); - ASSERT_TRUE(owner_rc.HasReference(inner_id)); + ASSERT_TRUE(owner->rc_.HasReference(outer_id)); + ASSERT_TRUE(owner->rc_.HasReference(inner_id)); // The borrower is given a reference to the inner object. borrower->ExecuteTaskWithArg(outer_id, inner_id, owner->task_id_, owner->address_); - ASSERT_TRUE(borrower_rc.HasReference(inner_id)); + ASSERT_TRUE(borrower->rc_.HasReference(inner_id)); // The borrower task returns to the owner while still using inner_id. auto borrower_refs = borrower->FinishExecutingTask(outer_id, ObjectID::Nil()); - ASSERT_FALSE(borrower_rc.HasReference(outer_id)); - ASSERT_TRUE(borrower_rc.HasReference(inner_id)); + ASSERT_FALSE(borrower->rc_.HasReference(outer_id)); + ASSERT_TRUE(borrower->rc_.HasReference(inner_id)); // The owner receives the borrower's reply and merges the borrower's ref // count into its own. - owner->HandleSubmittedTaskFinished(outer_id, borrower->address_, borrower_refs); + owner->HandleSubmittedTaskFinished(outer_id, {}, borrower->address_, borrower_refs); // Check that owner now has borrower in inner's borrowers list. - ASSERT_TRUE(owner_rc.HasReference(inner_id)); + ASSERT_TRUE(owner->rc_.HasReference(inner_id)); // Check that owner's ref count for outer == 0 since the borrower task // returned and there were no local references to outer_id. - ASSERT_FALSE(owner_rc.HasReference(outer_id)); + ASSERT_FALSE(owner->rc_.HasReference(outer_id)); // The borrower is no longer using inner_id, but it hasn't received the // message from the owner yet. - borrower_rc.RemoveLocalReference(inner_id, nullptr); - ASSERT_FALSE(borrower_rc.HasReference(inner_id)); - ASSERT_TRUE(owner_rc.HasReference(inner_id)); + borrower->rc_.RemoveLocalReference(inner_id, nullptr); + ASSERT_FALSE(borrower->rc_.HasReference(inner_id)); + ASSERT_TRUE(owner->rc_.HasReference(inner_id)); // The borrower receives the owner's wait message. It should return a reply // to the owner immediately saying that it is no longer using inner_id. borrower->FlushBorrowerCallbacks(); - ASSERT_FALSE(borrower_rc.HasReference(inner_id)); - ASSERT_FALSE(owner_rc.HasReference(inner_id)); + ASSERT_FALSE(borrower->rc_.HasReference(inner_id)); + ASSERT_FALSE(owner->rc_.HasReference(inner_id)); } // A borrower is given a reference to an object ID, passes the reference to @@ -465,18 +477,15 @@ TEST(DistributedReferenceCountTest, TestSimpleBorrowerReferenceRemoved) { // outer_id = ray.put([inner_id]) // res = borrower.remote(outer_id) TEST(DistributedReferenceCountTest, TestBorrowerTree) { - ReferenceCounter borrower_rc1; - auto borrower1 = std::make_shared(borrower_rc1, "1"); - ReferenceCounter borrower_rc2; - auto borrower2 = std::make_shared(borrower_rc2, "2"); - ReferenceCounter owner_rc(true, [&](const rpc::Address &addr) { + auto borrower1 = std::make_shared("1"); + auto borrower2 = std::make_shared("2"); + auto owner = std::make_shared("3", [&](const rpc::Address &addr) { if (addr.ip_address() == borrower1->address_.ip_address()) { return borrower1; } else { return borrower2; } }); - auto owner = std::make_shared(owner_rc, "3"); // The owner creates an inner object and wraps it. auto inner_id = ObjectID::FromRandom(); @@ -488,11 +497,11 @@ TEST(DistributedReferenceCountTest, TestBorrowerTree) { // be given a reference to inner_id. owner->SubmitTaskWithArg(outer_id); // The owner's references go out of scope. - owner_rc.RemoveLocalReference(outer_id, nullptr); - owner_rc.RemoveLocalReference(inner_id, nullptr); + owner->rc_.RemoveLocalReference(outer_id, nullptr); + owner->rc_.RemoveLocalReference(inner_id, nullptr); // The owner's ref count > 0 for both objects. - ASSERT_TRUE(owner_rc.HasReference(outer_id)); - ASSERT_TRUE(owner_rc.HasReference(inner_id)); + ASSERT_TRUE(owner->rc_.HasReference(outer_id)); + ASSERT_TRUE(owner->rc_.HasReference(inner_id)); // Borrower 1 is given a reference to the inner object. borrower1->ExecuteTaskWithArg(outer_id, inner_id, owner->task_id_, owner->address_); @@ -500,49 +509,50 @@ TEST(DistributedReferenceCountTest, TestBorrowerTree) { auto outer_id2 = ObjectID::FromRandom(); borrower1->PutWrappedId(outer_id2, inner_id); borrower1->SubmitTaskWithArg(outer_id2); - borrower_rc1.RemoveLocalReference(inner_id, nullptr); - borrower_rc1.RemoveLocalReference(outer_id2, nullptr); - ASSERT_TRUE(borrower_rc1.HasReference(inner_id)); - ASSERT_TRUE(borrower_rc1.HasReference(outer_id2)); + borrower1->rc_.RemoveLocalReference(inner_id, nullptr); + borrower1->rc_.RemoveLocalReference(outer_id2, nullptr); + ASSERT_TRUE(borrower1->rc_.HasReference(inner_id)); + ASSERT_TRUE(borrower1->rc_.HasReference(outer_id2)); // The borrower task returns to the owner without waiting for its submitted // task to finish. auto borrower_refs = borrower1->FinishExecutingTask(outer_id, ObjectID::Nil()); - ASSERT_TRUE(borrower_rc1.HasReference(inner_id)); - ASSERT_TRUE(borrower_rc1.HasReference(outer_id2)); - ASSERT_FALSE(borrower_rc1.HasReference(outer_id)); + ASSERT_TRUE(borrower1->rc_.HasReference(inner_id)); + ASSERT_TRUE(borrower1->rc_.HasReference(outer_id2)); + ASSERT_FALSE(borrower1->rc_.HasReference(outer_id)); // The owner receives the borrower's reply and merges the borrower's ref // count into its own. - owner->HandleSubmittedTaskFinished(outer_id, borrower1->address_, borrower_refs); + owner->HandleSubmittedTaskFinished(outer_id, {}, borrower1->address_, borrower_refs); borrower1->FlushBorrowerCallbacks(); // Check that owner now has borrower in inner's borrowers list. - ASSERT_TRUE(owner_rc.HasReference(inner_id)); + ASSERT_TRUE(owner->rc_.HasReference(inner_id)); // Check that owner's ref count for outer == 0 since the borrower task // returned and there were no local references to outer_id. - ASSERT_FALSE(owner_rc.HasReference(outer_id)); + ASSERT_FALSE(owner->rc_.HasReference(outer_id)); // Borrower 2 starts executing. It is given a reference to the inner object // when it gets outer_id2 as an argument. borrower2->ExecuteTaskWithArg(outer_id2, inner_id, owner->task_id_, owner->address_); - ASSERT_TRUE(borrower_rc2.HasReference(inner_id)); + ASSERT_TRUE(borrower2->rc_.HasReference(inner_id)); // Borrower 2 finishes but it is still using inner_id. borrower_refs = borrower2->FinishExecutingTask(outer_id2, ObjectID::Nil()); - ASSERT_TRUE(borrower_rc2.HasReference(inner_id)); - ASSERT_FALSE(borrower_rc2.HasReference(outer_id2)); - ASSERT_FALSE(borrower_rc2.HasReference(outer_id)); + ASSERT_TRUE(borrower2->rc_.HasReference(inner_id)); + ASSERT_FALSE(borrower2->rc_.HasReference(outer_id2)); + ASSERT_FALSE(borrower2->rc_.HasReference(outer_id)); - borrower1->HandleSubmittedTaskFinished(outer_id2, borrower2->address_, borrower_refs); + borrower1->HandleSubmittedTaskFinished(outer_id2, {}, borrower2->address_, + borrower_refs); borrower2->FlushBorrowerCallbacks(); // Borrower 1 no longer has a reference to any objects. - ASSERT_FALSE(borrower_rc1.HasReference(inner_id)); - ASSERT_FALSE(borrower_rc1.HasReference(outer_id2)); + ASSERT_FALSE(borrower1->rc_.HasReference(inner_id)); + ASSERT_FALSE(borrower1->rc_.HasReference(outer_id2)); // The owner should now have borrower 2 in its count. - ASSERT_TRUE(owner_rc.HasReference(inner_id)); + ASSERT_TRUE(owner->rc_.HasReference(inner_id)); - borrower_rc2.RemoveLocalReference(inner_id, nullptr); - ASSERT_FALSE(borrower_rc2.HasReference(inner_id)); - ASSERT_FALSE(owner_rc.HasReference(inner_id)); + borrower2->rc_.RemoveLocalReference(inner_id, nullptr); + ASSERT_FALSE(borrower2->rc_.HasReference(inner_id)); + ASSERT_FALSE(owner->rc_.HasReference(inner_id)); } // A task is given a reference to an object ID, whose value contains another @@ -559,10 +569,9 @@ TEST(DistributedReferenceCountTest, TestBorrowerTree) { // outer_id = ray.put([mid_id]) // res = borrower.remote(outer_id) TEST(DistributedReferenceCountTest, TestNestedObjectNoBorrow) { - ReferenceCounter borrower_rc; - auto borrower = std::make_shared(borrower_rc, "1"); - ReferenceCounter owner_rc(true, [&](const rpc::Address &addr) { return borrower; }); - auto owner = std::make_shared(owner_rc, "2"); + auto borrower = std::make_shared("1"); + auto owner = std::make_shared( + "2", [&](const rpc::Address &addr) { return borrower; }); // The owner creates an inner object and wraps it. auto inner_id = ObjectID::FromRandom(); @@ -576,39 +585,39 @@ TEST(DistributedReferenceCountTest, TestNestedObjectNoBorrow) { // be given a reference to mid_id. owner->SubmitTaskWithArg(outer_id); // The owner's references go out of scope. - owner_rc.RemoveLocalReference(outer_id, nullptr); - owner_rc.RemoveLocalReference(mid_id, nullptr); - owner_rc.RemoveLocalReference(inner_id, nullptr); + owner->rc_.RemoveLocalReference(outer_id, nullptr); + owner->rc_.RemoveLocalReference(mid_id, nullptr); + owner->rc_.RemoveLocalReference(inner_id, nullptr); // The owner's ref count > 0 for all objects. - ASSERT_TRUE(owner_rc.HasReference(outer_id)); - ASSERT_TRUE(owner_rc.HasReference(mid_id)); - ASSERT_TRUE(owner_rc.HasReference(inner_id)); + ASSERT_TRUE(owner->rc_.HasReference(outer_id)); + ASSERT_TRUE(owner->rc_.HasReference(mid_id)); + ASSERT_TRUE(owner->rc_.HasReference(inner_id)); // The borrower is given a reference to the middle object. borrower->ExecuteTaskWithArg(outer_id, mid_id, owner->task_id_, owner->address_); - ASSERT_TRUE(borrower_rc.HasReference(mid_id)); - ASSERT_FALSE(borrower_rc.HasReference(inner_id)); + ASSERT_TRUE(borrower->rc_.HasReference(mid_id)); + ASSERT_FALSE(borrower->rc_.HasReference(inner_id)); // The borrower unwraps the inner object with ray.get. borrower->GetSerializedObjectId(mid_id, inner_id, owner->task_id_, owner->address_); - borrower_rc.RemoveLocalReference(mid_id, nullptr); - ASSERT_TRUE(borrower_rc.HasReference(inner_id)); + borrower->rc_.RemoveLocalReference(mid_id, nullptr); + ASSERT_TRUE(borrower->rc_.HasReference(inner_id)); // The borrower's reference to inner_id goes out of scope. - borrower_rc.RemoveLocalReference(inner_id, nullptr); + borrower->rc_.RemoveLocalReference(inner_id, nullptr); // The borrower task returns to the owner. auto borrower_refs = borrower->FinishExecutingTask(outer_id, ObjectID::Nil()); - ASSERT_FALSE(borrower_rc.HasReference(outer_id)); - ASSERT_FALSE(borrower_rc.HasReference(mid_id)); - ASSERT_FALSE(borrower_rc.HasReference(inner_id)); + ASSERT_FALSE(borrower->rc_.HasReference(outer_id)); + ASSERT_FALSE(borrower->rc_.HasReference(mid_id)); + ASSERT_FALSE(borrower->rc_.HasReference(inner_id)); // The owner receives the borrower's reply and merges the borrower's ref // count into its own. - owner->HandleSubmittedTaskFinished(outer_id, borrower->address_, borrower_refs); + owner->HandleSubmittedTaskFinished(outer_id, {}, borrower->address_, borrower_refs); // Check that owner now has nothing in scope. - ASSERT_FALSE(owner_rc.HasReference(outer_id)); - ASSERT_FALSE(owner_rc.HasReference(mid_id)); - ASSERT_FALSE(owner_rc.HasReference(inner_id)); + ASSERT_FALSE(owner->rc_.HasReference(outer_id)); + ASSERT_FALSE(owner->rc_.HasReference(mid_id)); + ASSERT_FALSE(owner->rc_.HasReference(inner_id)); } // A task is given a reference to an object ID, whose value contains another @@ -625,10 +634,9 @@ TEST(DistributedReferenceCountTest, TestNestedObjectNoBorrow) { // outer_id = ray.put([mid_id]) // res = borrower.remote(outer_id) TEST(DistributedReferenceCountTest, TestNestedObject) { - ReferenceCounter borrower_rc; - auto borrower = std::make_shared(borrower_rc, "1"); - ReferenceCounter owner_rc(true, [&](const rpc::Address &addr) { return borrower; }); - auto owner = std::make_shared(owner_rc, "2"); + auto borrower = std::make_shared("1"); + auto owner = std::make_shared( + "2", [&](const rpc::Address &addr) { return borrower; }); // The owner creates an inner object and wraps it. auto inner_id = ObjectID::FromRandom(); @@ -642,51 +650,51 @@ TEST(DistributedReferenceCountTest, TestNestedObject) { // be given a reference to mid_id. owner->SubmitTaskWithArg(outer_id); // The owner's references go out of scope. - owner_rc.RemoveLocalReference(outer_id, nullptr); - owner_rc.RemoveLocalReference(mid_id, nullptr); - owner_rc.RemoveLocalReference(inner_id, nullptr); + owner->rc_.RemoveLocalReference(outer_id, nullptr); + owner->rc_.RemoveLocalReference(mid_id, nullptr); + owner->rc_.RemoveLocalReference(inner_id, nullptr); // The owner's ref count > 0 for all objects. - ASSERT_TRUE(owner_rc.HasReference(outer_id)); - ASSERT_TRUE(owner_rc.HasReference(mid_id)); - ASSERT_TRUE(owner_rc.HasReference(inner_id)); + ASSERT_TRUE(owner->rc_.HasReference(outer_id)); + ASSERT_TRUE(owner->rc_.HasReference(mid_id)); + ASSERT_TRUE(owner->rc_.HasReference(inner_id)); // The borrower is given a reference to the middle object. borrower->ExecuteTaskWithArg(outer_id, mid_id, owner->task_id_, owner->address_); - ASSERT_TRUE(borrower_rc.HasReference(mid_id)); - ASSERT_FALSE(borrower_rc.HasReference(inner_id)); + ASSERT_TRUE(borrower->rc_.HasReference(mid_id)); + ASSERT_FALSE(borrower->rc_.HasReference(inner_id)); // The borrower unwraps the inner object with ray.get. borrower->GetSerializedObjectId(mid_id, inner_id, owner->task_id_, owner->address_); - borrower_rc.RemoveLocalReference(mid_id, nullptr); - ASSERT_TRUE(borrower_rc.HasReference(inner_id)); + borrower->rc_.RemoveLocalReference(mid_id, nullptr); + ASSERT_TRUE(borrower->rc_.HasReference(inner_id)); // The borrower task returns to the owner while still using inner_id. auto borrower_refs = borrower->FinishExecutingTask(outer_id, ObjectID::Nil()); - ASSERT_FALSE(borrower_rc.HasReference(outer_id)); - ASSERT_FALSE(borrower_rc.HasReference(mid_id)); - ASSERT_TRUE(borrower_rc.HasReference(inner_id)); + ASSERT_FALSE(borrower->rc_.HasReference(outer_id)); + ASSERT_FALSE(borrower->rc_.HasReference(mid_id)); + ASSERT_TRUE(borrower->rc_.HasReference(inner_id)); // The owner receives the borrower's reply and merges the borrower's ref // count into its own. - owner->HandleSubmittedTaskFinished(outer_id, borrower->address_, borrower_refs); + owner->HandleSubmittedTaskFinished(outer_id, {}, borrower->address_, borrower_refs); // Check that owner now has borrower in inner's borrowers list. - ASSERT_TRUE(owner_rc.HasReference(inner_id)); + ASSERT_TRUE(owner->rc_.HasReference(inner_id)); // Check that owner's ref count for outer and mid are 0 since the borrower // task returned and there were no local references to outer_id. - ASSERT_FALSE(owner_rc.HasReference(outer_id)); - ASSERT_FALSE(owner_rc.HasReference(mid_id)); + ASSERT_FALSE(owner->rc_.HasReference(outer_id)); + ASSERT_FALSE(owner->rc_.HasReference(mid_id)); // The borrower receives the owner's wait message. It should return a reply // to the owner immediately saying that it is no longer using inner_id. borrower->FlushBorrowerCallbacks(); - ASSERT_TRUE(borrower_rc.HasReference(inner_id)); - ASSERT_TRUE(owner_rc.HasReference(inner_id)); + ASSERT_TRUE(borrower->rc_.HasReference(inner_id)); + ASSERT_TRUE(owner->rc_.HasReference(inner_id)); // The borrower is no longer using inner_id, but it hasn't received the // message from the owner yet. - borrower_rc.RemoveLocalReference(inner_id, nullptr); - ASSERT_FALSE(borrower_rc.HasReference(inner_id)); - ASSERT_FALSE(owner_rc.HasReference(inner_id)); + borrower->rc_.RemoveLocalReference(inner_id, nullptr); + ASSERT_FALSE(borrower->rc_.HasReference(inner_id)); + ASSERT_FALSE(owner->rc_.HasReference(inner_id)); } // A borrower is given a reference to an object ID, whose value contains @@ -708,18 +716,15 @@ TEST(DistributedReferenceCountTest, TestNestedObject) { // owner_id3 = ray.put([owner_id2]) // res = borrower1.remote(owner_id3) TEST(DistributedReferenceCountTest, TestNestedObjectDifferentOwners) { - ReferenceCounter borrower_rc1; - auto borrower1 = std::make_shared(borrower_rc1, "1"); - ReferenceCounter borrower_rc2; - auto borrower2 = std::make_shared(borrower_rc2, "2"); - ReferenceCounter owner_rc(true, [&](const rpc::Address &addr) { + auto borrower1 = std::make_shared("1"); + auto borrower2 = std::make_shared("2"); + auto owner = std::make_shared("3", [&](const rpc::Address &addr) { if (addr.ip_address() == borrower1->address_.ip_address()) { return borrower1; } else { return borrower2; } }); - auto owner = std::make_shared(owner_rc, "3"); // The owner creates an inner object and wraps it. auto owner_id1 = ObjectID::FromRandom(); @@ -733,61 +738,62 @@ TEST(DistributedReferenceCountTest, TestNestedObjectDifferentOwners) { // be given a reference to owner_id2. owner->SubmitTaskWithArg(owner_id3); // The owner's references go out of scope. - owner_rc.RemoveLocalReference(owner_id1, nullptr); - owner_rc.RemoveLocalReference(owner_id2, nullptr); - owner_rc.RemoveLocalReference(owner_id3, nullptr); + owner->rc_.RemoveLocalReference(owner_id1, nullptr); + owner->rc_.RemoveLocalReference(owner_id2, nullptr); + owner->rc_.RemoveLocalReference(owner_id3, nullptr); // The borrower is given a reference to the middle object. borrower1->ExecuteTaskWithArg(owner_id3, owner_id2, owner->task_id_, owner->address_); - ASSERT_TRUE(borrower_rc1.HasReference(owner_id2)); - ASSERT_FALSE(borrower_rc1.HasReference(owner_id1)); + ASSERT_TRUE(borrower1->rc_.HasReference(owner_id2)); + ASSERT_FALSE(borrower1->rc_.HasReference(owner_id1)); // The borrower wraps the object ID again. auto borrower_id = ObjectID::FromRandom(); borrower1->PutWrappedId(borrower_id, owner_id2); - borrower_rc1.RemoveLocalReference(owner_id2, nullptr); + borrower1->rc_.RemoveLocalReference(owner_id2, nullptr); // Borrower 1 submits a task that depends on the wrapped object. The task // will be given a reference to owner_id2. borrower1->SubmitTaskWithArg(borrower_id); - borrower_rc1.RemoveLocalReference(borrower_id, nullptr); + borrower1->rc_.RemoveLocalReference(borrower_id, nullptr); borrower2->ExecuteTaskWithArg(borrower_id, owner_id2, owner->task_id_, owner->address_); // The nested task returns while still using owner_id1. borrower2->GetSerializedObjectId(owner_id2, owner_id1, owner->task_id_, owner->address_); - borrower_rc2.RemoveLocalReference(owner_id2, nullptr); + borrower2->rc_.RemoveLocalReference(owner_id2, nullptr); auto borrower_refs = borrower2->FinishExecutingTask(borrower_id, ObjectID::Nil()); - ASSERT_TRUE(borrower_rc2.HasReference(owner_id1)); - ASSERT_FALSE(borrower_rc2.HasReference(owner_id2)); + ASSERT_TRUE(borrower2->rc_.HasReference(owner_id1)); + ASSERT_FALSE(borrower2->rc_.HasReference(owner_id2)); // Borrower 1 should now know that borrower 2 is borrowing the inner object // ID. - borrower1->HandleSubmittedTaskFinished(borrower_id, borrower2->address_, borrower_refs); - ASSERT_TRUE(borrower_rc1.HasReference(owner_id1)); + borrower1->HandleSubmittedTaskFinished(borrower_id, {}, borrower2->address_, + borrower_refs); + ASSERT_TRUE(borrower1->rc_.HasReference(owner_id1)); // Borrower 1 finishes. It should not have any references now because all // state has been merged into the owner. borrower_refs = borrower1->FinishExecutingTask(owner_id3, ObjectID::Nil()); - ASSERT_FALSE(borrower_rc1.HasReference(owner_id1)); - ASSERT_FALSE(borrower_rc1.HasReference(owner_id2)); - ASSERT_FALSE(borrower_rc1.HasReference(owner_id3)); - ASSERT_FALSE(borrower_rc1.HasReference(borrower_id)); + ASSERT_FALSE(borrower1->rc_.HasReference(owner_id1)); + ASSERT_FALSE(borrower1->rc_.HasReference(owner_id2)); + ASSERT_FALSE(borrower1->rc_.HasReference(owner_id3)); + ASSERT_FALSE(borrower1->rc_.HasReference(borrower_id)); // The owner receives the borrower's reply and merges the borrower's ref // count into its own. - owner->HandleSubmittedTaskFinished(owner_id3, borrower1->address_, borrower_refs); + owner->HandleSubmittedTaskFinished(owner_id3, {}, borrower1->address_, borrower_refs); // Check that owner now has borrower2 in inner's borrowers list. - ASSERT_TRUE(owner_rc.HasReference(owner_id1)); - ASSERT_FALSE(owner_rc.HasReference(owner_id2)); - ASSERT_FALSE(owner_rc.HasReference(owner_id3)); + ASSERT_TRUE(owner->rc_.HasReference(owner_id1)); + ASSERT_FALSE(owner->rc_.HasReference(owner_id2)); + ASSERT_FALSE(owner->rc_.HasReference(owner_id3)); // The borrower receives the owner's wait message. borrower2->FlushBorrowerCallbacks(); - ASSERT_TRUE(owner_rc.HasReference(owner_id1)); - borrower_rc2.RemoveLocalReference(owner_id1, nullptr); - ASSERT_FALSE(borrower_rc2.HasReference(owner_id1)); - ASSERT_FALSE(owner_rc.HasReference(owner_id1)); + ASSERT_TRUE(owner->rc_.HasReference(owner_id1)); + borrower2->rc_.RemoveLocalReference(owner_id1, nullptr); + ASSERT_FALSE(borrower2->rc_.HasReference(owner_id1)); + ASSERT_FALSE(owner->rc_.HasReference(owner_id1)); } // A borrower is given a reference to an object ID, whose value contains @@ -809,18 +815,15 @@ TEST(DistributedReferenceCountTest, TestNestedObjectDifferentOwners) { // owner_id3 = ray.put([owner_id2]) // res = borrower1.remote(owner_id3) TEST(DistributedReferenceCountTest, TestNestedObjectDifferentOwners2) { - ReferenceCounter borrower_rc1; - auto borrower1 = std::make_shared(borrower_rc1, "1"); - ReferenceCounter borrower_rc2; - auto borrower2 = std::make_shared(borrower_rc2, "2"); - ReferenceCounter owner_rc(true, [&](const rpc::Address &addr) { + auto borrower1 = std::make_shared("1"); + auto borrower2 = std::make_shared("2"); + auto owner = std::make_shared("3", [&](const rpc::Address &addr) { if (addr.ip_address() == borrower1->address_.ip_address()) { return borrower1; } else { return borrower2; } }); - auto owner = std::make_shared(owner_rc, "3"); // The owner creates an inner object and wraps it. auto owner_id1 = ObjectID::FromRandom(); @@ -834,19 +837,19 @@ TEST(DistributedReferenceCountTest, TestNestedObjectDifferentOwners2) { // be given a reference to owner_id2. owner->SubmitTaskWithArg(owner_id3); // The owner's references go out of scope. - owner_rc.RemoveLocalReference(owner_id1, nullptr); - owner_rc.RemoveLocalReference(owner_id2, nullptr); - owner_rc.RemoveLocalReference(owner_id3, nullptr); + owner->rc_.RemoveLocalReference(owner_id1, nullptr); + owner->rc_.RemoveLocalReference(owner_id2, nullptr); + owner->rc_.RemoveLocalReference(owner_id3, nullptr); // The borrower is given a reference to the middle object. borrower1->ExecuteTaskWithArg(owner_id3, owner_id2, owner->task_id_, owner->address_); - ASSERT_TRUE(borrower_rc1.HasReference(owner_id2)); - ASSERT_FALSE(borrower_rc1.HasReference(owner_id1)); + ASSERT_TRUE(borrower1->rc_.HasReference(owner_id2)); + ASSERT_FALSE(borrower1->rc_.HasReference(owner_id1)); // The borrower wraps the object ID again. auto borrower_id = ObjectID::FromRandom(); borrower1->PutWrappedId(borrower_id, owner_id2); - borrower_rc1.RemoveLocalReference(owner_id2, nullptr); + borrower1->rc_.RemoveLocalReference(owner_id2, nullptr); // Borrower 1 submits a task that depends on the wrapped object. The task // will be given a reference to owner_id2. @@ -856,44 +859,45 @@ TEST(DistributedReferenceCountTest, TestNestedObjectDifferentOwners2) { // The nested task returns while still using owner_id1. borrower2->GetSerializedObjectId(owner_id2, owner_id1, owner->task_id_, owner->address_); - borrower_rc2.RemoveLocalReference(owner_id2, nullptr); + borrower2->rc_.RemoveLocalReference(owner_id2, nullptr); auto borrower_refs = borrower2->FinishExecutingTask(borrower_id, ObjectID::Nil()); - ASSERT_TRUE(borrower_rc2.HasReference(owner_id1)); - ASSERT_FALSE(borrower_rc2.HasReference(owner_id2)); + ASSERT_TRUE(borrower2->rc_.HasReference(owner_id1)); + ASSERT_FALSE(borrower2->rc_.HasReference(owner_id2)); // Borrower 1 should now know that borrower 2 is borrowing the inner object // ID. - borrower1->HandleSubmittedTaskFinished(borrower_id, borrower2->address_, borrower_refs); - ASSERT_TRUE(borrower_rc1.HasReference(owner_id1)); - ASSERT_TRUE(borrower_rc1.HasReference(owner_id2)); + borrower1->HandleSubmittedTaskFinished(borrower_id, {}, borrower2->address_, + borrower_refs); + ASSERT_TRUE(borrower1->rc_.HasReference(owner_id1)); + ASSERT_TRUE(borrower1->rc_.HasReference(owner_id2)); // Borrower 1 finishes. It should only have its reference to owner_id2 now. borrower_refs = borrower1->FinishExecutingTask(owner_id3, ObjectID::Nil()); - ASSERT_TRUE(borrower_rc1.HasReference(owner_id2)); - ASSERT_FALSE(borrower_rc1.HasReference(owner_id3)); + ASSERT_TRUE(borrower1->rc_.HasReference(owner_id2)); + ASSERT_FALSE(borrower1->rc_.HasReference(owner_id3)); // The owner receives the borrower's reply and merges the borrower's ref // count into its own. - owner->HandleSubmittedTaskFinished(owner_id3, borrower1->address_, borrower_refs); + owner->HandleSubmittedTaskFinished(owner_id3, {}, borrower1->address_, borrower_refs); // Check that owner now has borrower2 in inner's borrowers list. - ASSERT_TRUE(owner_rc.HasReference(owner_id1)); - ASSERT_TRUE(owner_rc.HasReference(owner_id2)); - ASSERT_FALSE(owner_rc.HasReference(owner_id3)); + ASSERT_TRUE(owner->rc_.HasReference(owner_id1)); + ASSERT_TRUE(owner->rc_.HasReference(owner_id2)); + ASSERT_FALSE(owner->rc_.HasReference(owner_id3)); // The borrower receives the owner's wait message. borrower2->FlushBorrowerCallbacks(); - ASSERT_TRUE(owner_rc.HasReference(owner_id1)); - borrower_rc2.RemoveLocalReference(owner_id1, nullptr); - ASSERT_FALSE(borrower_rc2.HasReference(owner_id1)); - ASSERT_TRUE(owner_rc.HasReference(owner_id1)); + ASSERT_TRUE(owner->rc_.HasReference(owner_id1)); + borrower2->rc_.RemoveLocalReference(owner_id1, nullptr); + ASSERT_FALSE(borrower2->rc_.HasReference(owner_id1)); + ASSERT_TRUE(owner->rc_.HasReference(owner_id1)); // The borrower receives the owner's wait message. borrower1->FlushBorrowerCallbacks(); - ASSERT_TRUE(owner_rc.HasReference(owner_id2)); - borrower_rc1.RemoveLocalReference(borrower_id, nullptr); - ASSERT_FALSE(borrower_rc1.HasReference(owner_id2)); - ASSERT_FALSE(borrower_rc1.HasReference(owner_id1)); - ASSERT_FALSE(owner_rc.HasReference(owner_id2)); + ASSERT_TRUE(owner->rc_.HasReference(owner_id2)); + borrower1->rc_.RemoveLocalReference(borrower_id, nullptr); + ASSERT_FALSE(borrower1->rc_.HasReference(owner_id2)); + ASSERT_FALSE(borrower1->rc_.HasReference(owner_id1)); + ASSERT_FALSE(owner->rc_.HasReference(owner_id2)); } // A borrower is given a reference to an object ID and passes the reference to @@ -912,13 +916,11 @@ TEST(DistributedReferenceCountTest, TestNestedObjectDifferentOwners2) { // outer_id = ray.put([inner_id]) // res = borrower.remote(outer_id) TEST(DistributedReferenceCountTest, TestBorrowerPingPong) { - ReferenceCounter borrower_rc; - auto borrower = std::make_shared(borrower_rc, "1"); - ReferenceCounter owner_rc(true, [&](const rpc::Address &addr) { + auto borrower = std::make_shared("1"); + auto owner = std::make_shared("2", [&](const rpc::Address &addr) { RAY_CHECK(addr.ip_address() == borrower->address_.ip_address()); return borrower; }); - auto owner = std::make_shared(owner_rc, "2"); // The owner creates an inner object and wraps it. auto inner_id = ObjectID::FromRandom(); @@ -930,8 +932,8 @@ TEST(DistributedReferenceCountTest, TestBorrowerPingPong) { // be given a reference to inner_id. owner->SubmitTaskWithArg(outer_id); // The owner's references go out of scope. - owner_rc.RemoveLocalReference(outer_id, nullptr); - owner_rc.RemoveLocalReference(inner_id, nullptr); + owner->rc_.RemoveLocalReference(outer_id, nullptr); + owner->rc_.RemoveLocalReference(inner_id, nullptr); // Borrower 1 is given a reference to the inner object. borrower->ExecuteTaskWithArg(outer_id, inner_id, owner->task_id_, owner->address_); @@ -939,45 +941,45 @@ TEST(DistributedReferenceCountTest, TestBorrowerPingPong) { auto outer_id2 = ObjectID::FromRandom(); borrower->PutWrappedId(outer_id2, inner_id); borrower->SubmitTaskWithArg(outer_id2); - borrower_rc.RemoveLocalReference(inner_id, nullptr); - borrower_rc.RemoveLocalReference(outer_id2, nullptr); - ASSERT_TRUE(borrower_rc.HasReference(inner_id)); - ASSERT_TRUE(borrower_rc.HasReference(outer_id2)); + borrower->rc_.RemoveLocalReference(inner_id, nullptr); + borrower->rc_.RemoveLocalReference(outer_id2, nullptr); + ASSERT_TRUE(borrower->rc_.HasReference(inner_id)); + ASSERT_TRUE(borrower->rc_.HasReference(outer_id2)); // The borrower task returns to the owner without waiting for its submitted // task to finish. auto borrower_refs = borrower->FinishExecutingTask(outer_id, ObjectID::Nil()); - ASSERT_TRUE(borrower_rc.HasReference(inner_id)); - ASSERT_TRUE(borrower_rc.HasReference(outer_id2)); - ASSERT_FALSE(borrower_rc.HasReference(outer_id)); + ASSERT_TRUE(borrower->rc_.HasReference(inner_id)); + ASSERT_TRUE(borrower->rc_.HasReference(outer_id2)); + ASSERT_FALSE(borrower->rc_.HasReference(outer_id)); // The owner receives the borrower's reply and merges the borrower's ref // count into its own. - owner->HandleSubmittedTaskFinished(outer_id, borrower->address_, borrower_refs); + owner->HandleSubmittedTaskFinished(outer_id, {}, borrower->address_, borrower_refs); borrower->FlushBorrowerCallbacks(); // Check that owner now has a borrower for inner. - ASSERT_TRUE(owner_rc.HasReference(inner_id)); + ASSERT_TRUE(owner->rc_.HasReference(inner_id)); // Check that owner's ref count for outer == 0 since the borrower task // returned and there were no local references to outer_id. - ASSERT_FALSE(owner_rc.HasReference(outer_id)); + ASSERT_FALSE(owner->rc_.HasReference(outer_id)); // Owner starts executing the submitted task. It is given a second reference // to the inner object when it gets outer_id2 as an argument. owner->ExecuteTaskWithArg(outer_id2, inner_id, owner->task_id_, owner->address_); - ASSERT_TRUE(owner_rc.HasReference(inner_id)); + ASSERT_TRUE(owner->rc_.HasReference(inner_id)); // Owner finishes but it is still using inner_id. borrower_refs = owner->FinishExecutingTask(outer_id2, ObjectID::Nil()); - ASSERT_TRUE(owner_rc.HasReference(inner_id)); + ASSERT_TRUE(owner->rc_.HasReference(inner_id)); - borrower->HandleSubmittedTaskFinished(outer_id2, owner->address_, borrower_refs); + borrower->HandleSubmittedTaskFinished(outer_id2, {}, owner->address_, borrower_refs); borrower->FlushBorrowerCallbacks(); // Borrower no longer has a reference to any objects. - ASSERT_FALSE(borrower_rc.HasReference(inner_id)); - ASSERT_FALSE(borrower_rc.HasReference(outer_id2)); + ASSERT_FALSE(borrower->rc_.HasReference(inner_id)); + ASSERT_FALSE(borrower->rc_.HasReference(outer_id2)); // The owner should now have borrower 2 in its count. - ASSERT_TRUE(owner_rc.HasReference(inner_id)); - owner_rc.RemoveLocalReference(inner_id, nullptr); - ASSERT_FALSE(owner_rc.HasReference(inner_id)); + ASSERT_TRUE(owner->rc_.HasReference(inner_id)); + owner->rc_.RemoveLocalReference(inner_id, nullptr); + ASSERT_FALSE(owner->rc_.HasReference(inner_id)); } // A borrower is given two references to the same object ID. `task` and `Actor` @@ -997,10 +999,9 @@ TEST(DistributedReferenceCountTest, TestBorrowerPingPong) { // res = task.remote(outer_id) // Actor.remote(outer_id) TEST(DistributedReferenceCountTest, TestDuplicateBorrower) { - ReferenceCounter borrower_rc; - auto borrower = std::make_shared(borrower_rc, "1"); - ReferenceCounter owner_rc(true, [&](const rpc::Address &addr) { return borrower; }); - auto owner = std::make_shared(owner_rc, "2"); + auto borrower = std::make_shared("1"); + auto owner = std::make_shared( + "2", [&](const rpc::Address &addr) { return borrower; }); // The owner creates an inner object and wraps it. auto inner_id = ObjectID::FromRandom(); @@ -1012,67 +1013,64 @@ TEST(DistributedReferenceCountTest, TestDuplicateBorrower) { // be given a reference to inner_id. owner->SubmitTaskWithArg(outer_id); // The owner's references go out of scope. - owner_rc.RemoveLocalReference(inner_id, nullptr); - ASSERT_TRUE(owner_rc.HasReference(inner_id)); + owner->rc_.RemoveLocalReference(inner_id, nullptr); + ASSERT_TRUE(owner->rc_.HasReference(inner_id)); // The borrower is given a reference to the inner object. borrower->ExecuteTaskWithArg(outer_id, inner_id, owner->task_id_, owner->address_); // The borrower submits a task that depends on the inner object. borrower->SubmitTaskWithArg(inner_id); - borrower_rc.RemoveLocalReference(inner_id, nullptr); - ASSERT_TRUE(borrower_rc.HasReference(inner_id)); + borrower->rc_.RemoveLocalReference(inner_id, nullptr); + ASSERT_TRUE(borrower->rc_.HasReference(inner_id)); // The borrower task returns to the owner without waiting for its submitted // task to finish. auto borrower_refs1 = borrower->FinishExecutingTask(outer_id, ObjectID::Nil()); // Check that the borrower's ref count for inner_id > 0 because of the // pending task. - ASSERT_TRUE(borrower_rc.HasReference(inner_id)); + ASSERT_TRUE(borrower->rc_.HasReference(inner_id)); // The borrower is given a 2nd reference to the inner object. owner->SubmitTaskWithArg(outer_id); - owner_rc.RemoveLocalReference(outer_id, nullptr); + owner->rc_.RemoveLocalReference(outer_id, nullptr); borrower->ExecuteTaskWithArg(outer_id, inner_id, owner->task_id_, owner->address_); auto borrower_refs2 = borrower->FinishExecutingTask(outer_id, ObjectID::Nil()); // The owner receives the borrower's replies and merges the borrower's ref // count into its own. - owner->HandleSubmittedTaskFinished(outer_id, borrower->address_, borrower_refs1); - owner->HandleSubmittedTaskFinished(outer_id, borrower->address_, borrower_refs2); + owner->HandleSubmittedTaskFinished(outer_id, {}, borrower->address_, borrower_refs1); + owner->HandleSubmittedTaskFinished(outer_id, {}, borrower->address_, borrower_refs2); borrower->FlushBorrowerCallbacks(); // Check that owner now has borrower in inner's borrowers list. - ASSERT_TRUE(owner_rc.HasReference(inner_id)); + ASSERT_TRUE(owner->rc_.HasReference(inner_id)); // Check that owner's ref count for outer == 0 since the borrower task // returned and there were no local references to outer_id. - ASSERT_FALSE(owner_rc.HasReference(outer_id)); + ASSERT_FALSE(owner->rc_.HasReference(outer_id)); // The task submitted by the borrower returns and its second reference goes // out of scope. Everyone's ref count should go to 0. borrower->HandleSubmittedTaskFinished(inner_id); - ASSERT_TRUE(owner_rc.HasReference(inner_id)); - borrower_rc.RemoveLocalReference(inner_id, nullptr); - ASSERT_FALSE(owner_rc.HasReference(inner_id)); - ASSERT_FALSE(borrower_rc.HasReference(inner_id)); - ASSERT_FALSE(borrower_rc.HasReference(outer_id)); - ASSERT_FALSE(owner_rc.HasReference(outer_id)); + ASSERT_TRUE(owner->rc_.HasReference(inner_id)); + borrower->rc_.RemoveLocalReference(inner_id, nullptr); + ASSERT_FALSE(owner->rc_.HasReference(inner_id)); + ASSERT_FALSE(borrower->rc_.HasReference(inner_id)); + ASSERT_FALSE(borrower->rc_.HasReference(outer_id)); + ASSERT_FALSE(owner->rc_.HasReference(outer_id)); } // A borrower is given references to 2 different objects, which each contain a // reference to an object ID. The borrower unwraps both objects and receives a // duplicate reference to the inner ID. TEST(DistributedReferenceCountTest, TestDuplicateNestedObject) { - ReferenceCounter borrower_rc1; - auto borrower1 = std::make_shared(borrower_rc1, "1"); - ReferenceCounter borrower_rc2; - auto borrower2 = std::make_shared(borrower_rc2, "2"); - ReferenceCounter owner_rc(true, [&](const rpc::Address &addr) { + auto borrower1 = std::make_shared("1"); + auto borrower2 = std::make_shared("2"); + auto owner = std::make_shared("3", [&](const rpc::Address &addr) { if (addr.ip_address() == borrower1->address_.ip_address()) { return borrower1; } else { return borrower2; } }); - auto owner = std::make_shared(owner_rc, "3"); // The owner creates an inner object and wraps it. auto owner_id1 = ObjectID::FromRandom(); @@ -1084,17 +1082,17 @@ TEST(DistributedReferenceCountTest, TestDuplicateNestedObject) { owner->SubmitTaskWithArg(owner_id3); owner->SubmitTaskWithArg(owner_id2); - owner_rc.RemoveLocalReference(owner_id1, nullptr); - owner_rc.RemoveLocalReference(owner_id2, nullptr); - owner_rc.RemoveLocalReference(owner_id3, nullptr); + owner->rc_.RemoveLocalReference(owner_id1, nullptr); + owner->rc_.RemoveLocalReference(owner_id2, nullptr); + owner->rc_.RemoveLocalReference(owner_id3, nullptr); borrower2->ExecuteTaskWithArg(owner_id3, owner_id2, owner->task_id_, owner->address_); borrower2->GetSerializedObjectId(owner_id2, owner_id1, owner->task_id_, owner->address_); - borrower_rc2.RemoveLocalReference(owner_id2, nullptr); + borrower2->rc_.RemoveLocalReference(owner_id2, nullptr); // The nested task returns while still using owner_id1. auto borrower_refs = borrower2->FinishExecutingTask(owner_id3, ObjectID::Nil()); - owner->HandleSubmittedTaskFinished(owner_id3, borrower2->address_, borrower_refs); + owner->HandleSubmittedTaskFinished(owner_id3, {}, borrower2->address_, borrower_refs); ASSERT_TRUE(borrower2->FlushBorrowerCallbacks()); // The owner submits a task that is given a reference to owner_id1. @@ -1102,37 +1100,38 @@ TEST(DistributedReferenceCountTest, TestDuplicateNestedObject) { // The borrower wraps the object ID again. auto borrower_id = ObjectID::FromRandom(); borrower1->PutWrappedId(borrower_id, owner_id1); - borrower_rc1.RemoveLocalReference(owner_id1, nullptr); + borrower1->rc_.RemoveLocalReference(owner_id1, nullptr); // Borrower 1 submits a task that depends on the wrapped object. The task // will be given a reference to owner_id1. borrower1->SubmitTaskWithArg(borrower_id); - borrower_rc1.RemoveLocalReference(borrower_id, nullptr); + borrower1->rc_.RemoveLocalReference(borrower_id, nullptr); borrower2->ExecuteTaskWithArg(borrower_id, owner_id1, owner->task_id_, owner->address_); // The nested task returns while still using owner_id1. // It should now have 2 local references to owner_id1, one from the owner and // one from the borrower. borrower_refs = borrower2->FinishExecutingTask(borrower_id, ObjectID::Nil()); - borrower1->HandleSubmittedTaskFinished(borrower_id, borrower2->address_, borrower_refs); + borrower1->HandleSubmittedTaskFinished(borrower_id, {}, borrower2->address_, + borrower_refs); // Borrower 1 finishes. It should not have any references now because all // state has been merged into the owner. borrower_refs = borrower1->FinishExecutingTask(owner_id2, ObjectID::Nil()); - ASSERT_FALSE(borrower_rc1.HasReference(owner_id1)); - ASSERT_FALSE(borrower_rc1.HasReference(owner_id2)); - ASSERT_FALSE(borrower_rc1.HasReference(owner_id3)); - ASSERT_FALSE(borrower_rc1.HasReference(borrower_id)); + ASSERT_FALSE(borrower1->rc_.HasReference(owner_id1)); + ASSERT_FALSE(borrower1->rc_.HasReference(owner_id2)); + ASSERT_FALSE(borrower1->rc_.HasReference(owner_id3)); + ASSERT_FALSE(borrower1->rc_.HasReference(borrower_id)); // Borrower 1 should not have merge any refs into the owner because borrower 2's ref was // already merged into the owner. - owner->HandleSubmittedTaskFinished(owner_id2, borrower1->address_, borrower_refs); + owner->HandleSubmittedTaskFinished(owner_id2, {}, borrower1->address_, borrower_refs); // The borrower receives the owner's wait message. borrower2->FlushBorrowerCallbacks(); - ASSERT_TRUE(owner_rc.HasReference(owner_id1)); - borrower_rc2.RemoveLocalReference(owner_id1, nullptr); - ASSERT_TRUE(owner_rc.HasReference(owner_id1)); - borrower_rc2.RemoveLocalReference(owner_id1, nullptr); - ASSERT_FALSE(borrower_rc2.HasReference(owner_id1)); - ASSERT_FALSE(owner_rc.HasReference(owner_id1)); + ASSERT_TRUE(owner->rc_.HasReference(owner_id1)); + borrower2->rc_.RemoveLocalReference(owner_id1, nullptr); + ASSERT_TRUE(owner->rc_.HasReference(owner_id1)); + borrower2->rc_.RemoveLocalReference(owner_id1, nullptr); + ASSERT_FALSE(borrower2->rc_.HasReference(owner_id1)); + ASSERT_FALSE(owner->rc_.HasReference(owner_id1)); } // We submit a task and immediately delete the reference to the return ID. The @@ -1145,13 +1144,11 @@ TEST(DistributedReferenceCountTest, TestDuplicateNestedObject) { // // returns_id.remote() TEST(DistributedReferenceCountTest, TestReturnObjectIdNoBorrow) { - ReferenceCounter caller_rc; - auto caller = std::make_shared(caller_rc, "1"); - ReferenceCounter owner_rc(true, [&](const rpc::Address &addr) { + auto caller = std::make_shared("1"); + auto owner = std::make_shared("3", [&](const rpc::Address &addr) { RAY_CHECK(addr.ip_address() == caller->address_.ip_address()); return caller; }); - auto owner = std::make_shared(owner_rc, "3"); // Caller submits a task. auto return_id = caller->SubmitTaskWithArg(ObjectID::Nil()); @@ -1161,20 +1158,20 @@ TEST(DistributedReferenceCountTest, TestReturnObjectIdNoBorrow) { owner->Put(inner_id); rpc::WorkerAddress addr(caller->address_); auto refs = owner->FinishExecutingTask(ObjectID::Nil(), return_id, &inner_id, &addr); - owner_rc.RemoveLocalReference(inner_id, nullptr); + owner->rc_.RemoveLocalReference(inner_id, nullptr); ASSERT_TRUE(refs.empty()); - ASSERT_TRUE(owner_rc.HasReference(inner_id)); + ASSERT_TRUE(owner->rc_.HasReference(inner_id)); // Caller's ref to the task's return ID goes out of scope before it hears // from the owner of inner_id. - caller->HandleSubmittedTaskFinished(ObjectID::Nil()); - caller_rc.RemoveLocalReference(return_id, nullptr); - ASSERT_FALSE(caller_rc.HasReference(return_id)); - ASSERT_FALSE(caller_rc.HasReference(inner_id)); + caller->HandleSubmittedTaskFinished(ObjectID::Nil(), {{return_id, {inner_id}}}); + caller->rc_.RemoveLocalReference(return_id, nullptr); + ASSERT_FALSE(caller->rc_.HasReference(return_id)); + ASSERT_FALSE(caller->rc_.HasReference(inner_id)); // Caller should respond to the owner's message immediately. ASSERT_TRUE(caller->FlushBorrowerCallbacks()); - ASSERT_FALSE(owner_rc.HasReference(inner_id)); + ASSERT_FALSE(owner->rc_.HasReference(inner_id)); } // We submit a task and keep the reference to the return ID. The submitted task @@ -1187,13 +1184,11 @@ TEST(DistributedReferenceCountTest, TestReturnObjectIdNoBorrow) { // // return_id = returns_id.remote() TEST(DistributedReferenceCountTest, TestReturnObjectIdBorrow) { - ReferenceCounter caller_rc; - auto caller = std::make_shared(caller_rc, "1"); - ReferenceCounter owner_rc(true, [&](const rpc::Address &addr) { + auto caller = std::make_shared("1"); + auto owner = std::make_shared("3", [&](const rpc::Address &addr) { RAY_CHECK(addr.ip_address() == caller->address_.ip_address()); return caller; }); - auto owner = std::make_shared(owner_rc, "3"); // Caller submits a task. auto return_id = caller->SubmitTaskWithArg(ObjectID::Nil()); @@ -1203,22 +1198,22 @@ TEST(DistributedReferenceCountTest, TestReturnObjectIdBorrow) { owner->Put(inner_id); rpc::WorkerAddress addr(caller->address_); auto refs = owner->FinishExecutingTask(ObjectID::Nil(), return_id, &inner_id, &addr); - owner_rc.RemoveLocalReference(inner_id, nullptr); + owner->rc_.RemoveLocalReference(inner_id, nullptr); ASSERT_TRUE(refs.empty()); - ASSERT_TRUE(owner_rc.HasReference(inner_id)); + ASSERT_TRUE(owner->rc_.HasReference(inner_id)); // Caller receives the owner's message, but inner_id is still in scope // because caller has a reference to return_id. - caller->HandleSubmittedTaskFinished(ObjectID::Nil()); + caller->HandleSubmittedTaskFinished(ObjectID::Nil(), {{return_id, {inner_id}}}); ASSERT_TRUE(caller->FlushBorrowerCallbacks()); - ASSERT_TRUE(owner_rc.HasReference(inner_id)); + ASSERT_TRUE(owner->rc_.HasReference(inner_id)); // Caller's reference to return_id goes out of scope. The caller should // respond to the owner of inner_id so that inner_id can be deleted. - caller_rc.RemoveLocalReference(return_id, nullptr); - ASSERT_FALSE(caller_rc.HasReference(return_id)); - ASSERT_FALSE(caller_rc.HasReference(inner_id)); - ASSERT_FALSE(owner_rc.HasReference(inner_id)); + caller->rc_.RemoveLocalReference(return_id, nullptr); + ASSERT_FALSE(caller->rc_.HasReference(return_id)); + ASSERT_FALSE(caller->rc_.HasReference(inner_id)); + ASSERT_FALSE(owner->rc_.HasReference(inner_id)); } // We submit a task and submit another task that depends on the return ID. The @@ -1233,18 +1228,15 @@ TEST(DistributedReferenceCountTest, TestReturnObjectIdBorrow) { // return_id = returns_id.remote() // borrow.remote(return_id) TEST(DistributedReferenceCountTest, TestReturnObjectIdBorrowChain) { - ReferenceCounter caller_rc; - auto caller = std::make_shared(caller_rc, "1"); - ReferenceCounter borrower_rc; - auto borrower = std::make_shared(borrower_rc, "2"); - ReferenceCounter owner_rc(true, [&](const rpc::Address &addr) { + auto caller = std::make_shared("1"); + auto borrower = std::make_shared("2"); + auto owner = std::make_shared("3", [&](const rpc::Address &addr) { if (addr.ip_address() == caller->address_.ip_address()) { return caller; } else { return borrower; } }); - auto owner = std::make_shared(owner_rc, "3"); // Caller submits a task. auto return_id = caller->SubmitTaskWithArg(ObjectID::Nil()); @@ -1254,43 +1246,421 @@ TEST(DistributedReferenceCountTest, TestReturnObjectIdBorrowChain) { owner->Put(inner_id); rpc::WorkerAddress addr(caller->address_); auto refs = owner->FinishExecutingTask(ObjectID::Nil(), return_id, &inner_id, &addr); - owner_rc.RemoveLocalReference(inner_id, nullptr); + owner->rc_.RemoveLocalReference(inner_id, nullptr); ASSERT_TRUE(refs.empty()); - ASSERT_TRUE(owner_rc.HasReference(inner_id)); + ASSERT_TRUE(owner->rc_.HasReference(inner_id)); // Caller receives the owner's message, but inner_id is still in scope // because caller has a reference to return_id. - caller->HandleSubmittedTaskFinished(ObjectID::Nil()); + caller->HandleSubmittedTaskFinished(ObjectID::Nil(), {{return_id, {inner_id}}}); caller->SubmitTaskWithArg(return_id); - caller_rc.RemoveLocalReference(return_id, nullptr); + caller->rc_.RemoveLocalReference(return_id, nullptr); ASSERT_TRUE(caller->FlushBorrowerCallbacks()); - ASSERT_TRUE(owner_rc.HasReference(inner_id)); + ASSERT_TRUE(owner->rc_.HasReference(inner_id)); // Borrower receives a reference to inner_id. It still has a reference when // the task returns. borrower->ExecuteTaskWithArg(return_id, inner_id, owner->task_id_, owner->address_); - ASSERT_TRUE(borrower_rc.HasReference(inner_id)); + ASSERT_TRUE(borrower->rc_.HasReference(inner_id)); auto borrower_refs = borrower->FinishExecutingTask(return_id, return_id); - ASSERT_TRUE(borrower_rc.HasReference(inner_id)); + ASSERT_TRUE(borrower->rc_.HasReference(inner_id)); // Borrower merges ref count into the caller. - caller->HandleSubmittedTaskFinished(return_id, borrower->address_, borrower_refs); + caller->HandleSubmittedTaskFinished(return_id, {}, borrower->address_, borrower_refs); // The caller should not have a ref count anymore because it was merged into // the owner. - ASSERT_FALSE(caller_rc.HasReference(return_id)); - ASSERT_FALSE(caller_rc.HasReference(inner_id)); - ASSERT_TRUE(owner_rc.HasReference(inner_id)); + ASSERT_FALSE(caller->rc_.HasReference(return_id)); + ASSERT_FALSE(caller->rc_.HasReference(inner_id)); + ASSERT_TRUE(owner->rc_.HasReference(inner_id)); // The borrower's receives the owner's message and its reference goes out of // scope. ASSERT_TRUE(borrower->FlushBorrowerCallbacks()); - borrower_rc.RemoveLocalReference(inner_id, nullptr); - ASSERT_FALSE(borrower_rc.HasReference(return_id)); - ASSERT_FALSE(borrower_rc.HasReference(inner_id)); - ASSERT_FALSE(owner_rc.HasReference(inner_id)); + borrower->rc_.RemoveLocalReference(inner_id, nullptr); + ASSERT_FALSE(borrower->rc_.HasReference(return_id)); + ASSERT_FALSE(borrower->rc_.HasReference(inner_id)); + ASSERT_FALSE(owner->rc_.HasReference(inner_id)); +} + +// We submit a task and submit another task that depends on the return ID. The +// first submitted task returns an object ID, which will get borrowed by the second +// task. The second task returns the borrowed ID. +// +// @ray.remote +// def returns_id(): +// inner_id = ray.put() +// return inner_id +// +// @ray.remote +// def returns_borrowed_id(inner_ids): +// return inner_ids +// +// return_id = returns_id.remote() +// returns_borrowed_id.remote(return_id) +TEST(DistributedReferenceCountTest, TestReturnBorrowedId) { + auto caller = std::make_shared("1"); + auto borrower = std::make_shared("2"); + auto owner = std::make_shared("3", [&](const rpc::Address &addr) { + if (addr.ip_address() == caller->address_.ip_address()) { + return caller; + } else { + return borrower; + } + }); + + // Caller submits a task. + auto return_id = caller->SubmitTaskWithArg(ObjectID::Nil()); + + // Task returns inner_id as its return value. + auto inner_id = ObjectID::FromRandom(); + owner->Put(inner_id); + rpc::WorkerAddress addr(caller->address_); + auto refs = owner->FinishExecutingTask(ObjectID::Nil(), return_id, &inner_id, &addr); + owner->rc_.RemoveLocalReference(inner_id, nullptr); + ASSERT_TRUE(refs.empty()); + ASSERT_TRUE(owner->rc_.HasReference(inner_id)); + + // Caller receives the owner's message, but inner_id is still in scope + // because caller has a reference to return_id. + caller->HandleSubmittedTaskFinished(ObjectID::Nil(), {{return_id, {inner_id}}}); + auto borrower_return_id = caller->SubmitTaskWithArg(return_id); + caller->rc_.RemoveLocalReference(return_id, nullptr); + ASSERT_TRUE(caller->FlushBorrowerCallbacks()); + ASSERT_TRUE(owner->rc_.HasReference(inner_id)); + + // Borrower receives a reference to inner_id. It returns the inner_id as its + // return value. + borrower->ExecuteTaskWithArg(return_id, inner_id, owner->task_id_, owner->address_); + ASSERT_TRUE(borrower->rc_.HasReference(inner_id)); + auto borrower_refs = + borrower->FinishExecutingTask(return_id, borrower_return_id, &inner_id, &addr); + ASSERT_TRUE(borrower->rc_.HasReference(inner_id)); + + // Borrower merges ref count into the caller. + caller->HandleSubmittedTaskFinished(return_id, {{borrower_return_id, {inner_id}}}, + borrower->address_, borrower_refs); + // The caller should still have a ref count because it has a reference to + // borrower_return_id. + ASSERT_FALSE(caller->rc_.HasReference(return_id)); + ASSERT_TRUE(caller->rc_.HasReference(borrower_return_id)); + ASSERT_TRUE(caller->rc_.HasReference(inner_id)); + ASSERT_TRUE(owner->rc_.HasReference(inner_id)); + + // The borrower's receives the owner's message and its reference goes out of + // scope. + borrower->rc_.RemoveLocalReference(inner_id, nullptr); + ASSERT_FALSE(borrower->rc_.HasReference(borrower_return_id)); + ASSERT_FALSE(borrower->rc_.HasReference(return_id)); + ASSERT_FALSE(borrower->rc_.HasReference(inner_id)); + + // The caller's reference to the borrower's return value goes out of scope. + caller->rc_.RemoveLocalReference(borrower_return_id, nullptr); + ASSERT_FALSE(caller->rc_.HasReference(borrower_return_id)); + ASSERT_FALSE(caller->rc_.HasReference(inner_id)); + // The owner should still have the object ID in scope because it hasn't heard + // from borrower yet. + ASSERT_TRUE(owner->rc_.HasReference(inner_id)); + + ASSERT_TRUE(borrower->FlushBorrowerCallbacks()); + ASSERT_FALSE(owner->rc_.HasReference(inner_id)); +} + +// We submit a task and submit another task that depends on the return ID. The +// first submitted task returns an object ID, which will get borrowed by the second +// task. The second task returns the borrowed ID. The driver gets the value of +// the second task and now has a reference to the inner object ID. +// +// @ray.remote +// def returns_id(): +// inner_id = ray.put() +// return inner_id +// +// @ray.remote +// def returns_borrowed_id(inner_ids): +// return inner_ids +// +// return_id = returns_id.remote() +// inner_id = ray.get(returns_borrowed_id.remote(return_id))[0] +TEST(DistributedReferenceCountTest, TestReturnBorrowedIdDeserialize) { + auto caller = std::make_shared("1"); + auto borrower = std::make_shared("2"); + auto owner = std::make_shared("3", [&](const rpc::Address &addr) { + if (addr.ip_address() == caller->address_.ip_address()) { + return caller; + } else { + return borrower; + } + }); + + // Caller submits a task. + auto return_id = caller->SubmitTaskWithArg(ObjectID::Nil()); + + // Task returns inner_id as its return value. + auto inner_id = ObjectID::FromRandom(); + owner->Put(inner_id); + rpc::WorkerAddress addr(caller->address_); + auto refs = owner->FinishExecutingTask(ObjectID::Nil(), return_id, &inner_id, &addr); + owner->rc_.RemoveLocalReference(inner_id, nullptr); + ASSERT_TRUE(refs.empty()); + ASSERT_TRUE(owner->rc_.HasReference(inner_id)); + + // Caller receives the owner's message, but inner_id is still in scope + // because caller has a reference to return_id. + caller->HandleSubmittedTaskFinished(ObjectID::Nil(), {{return_id, {inner_id}}}); + auto borrower_return_id = caller->SubmitTaskWithArg(return_id); + caller->rc_.RemoveLocalReference(return_id, nullptr); + ASSERT_TRUE(owner->rc_.HasReference(inner_id)); + + // Borrower receives a reference to inner_id. It returns the inner_id as its + // return value. + borrower->ExecuteTaskWithArg(return_id, inner_id, owner->task_id_, owner->address_); + ASSERT_TRUE(borrower->rc_.HasReference(inner_id)); + auto borrower_refs = + borrower->FinishExecutingTask(return_id, borrower_return_id, &inner_id, &addr); + ASSERT_TRUE(borrower->rc_.HasReference(inner_id)); + + // Borrower merges ref count into the caller. + caller->HandleSubmittedTaskFinished(return_id, {{borrower_return_id, {inner_id}}}, + borrower->address_, borrower_refs); + // The caller should still have a ref count because it has a reference to + // borrower_return_id. + ASSERT_FALSE(caller->rc_.HasReference(return_id)); + ASSERT_TRUE(caller->rc_.HasReference(borrower_return_id)); + ASSERT_TRUE(owner->rc_.HasReference(inner_id)); + + caller->GetSerializedObjectId(borrower_return_id, inner_id, owner->task_id_, + owner->address_); + caller->rc_.RemoveLocalReference(borrower_return_id, nullptr); + ASSERT_TRUE(caller->FlushBorrowerCallbacks()); + caller->rc_.RemoveLocalReference(inner_id, nullptr); + ASSERT_FALSE(caller->rc_.HasReference(return_id)); + ASSERT_FALSE(caller->rc_.HasReference(borrower_return_id)); + ASSERT_FALSE(caller->rc_.HasReference(inner_id)); + ASSERT_TRUE(owner->rc_.HasReference(inner_id)); + + // The borrower's receives the owner's message and its reference goes out of + // scope. + ASSERT_TRUE(borrower->FlushBorrowerCallbacks()); + borrower->rc_.RemoveLocalReference(inner_id, nullptr); + ASSERT_FALSE(borrower->rc_.HasReference(borrower_return_id)); + ASSERT_FALSE(borrower->rc_.HasReference(return_id)); + ASSERT_FALSE(borrower->rc_.HasReference(inner_id)); + ASSERT_FALSE(owner->rc_.HasReference(inner_id)); +} + +// Recursively returning IDs. We submit a task, which submits another task and +// returns the submitted task's return ID. The nested task creates an object +// and returns that ID. +// +// @ray.remote +// def nested_worker(): +// inner_id = ray.put() +// return inner_id +// +// @ray.remote +// def worker(): +// return nested_worker.remote() +// +// return_id = worker.remote() +// nested_return_id = ray.get(return_id) +// inner_id = ray.get(nested_return_id) +TEST(DistributedReferenceCountTest, TestReturnIdChain) { + auto root = std::make_shared("1"); + auto worker = std::make_shared("2", [&](const rpc::Address &addr) { + RAY_CHECK(addr.ip_address() == root->address_.ip_address()); + return root; + }); + auto nested_worker = + std::make_shared("3", [&](const rpc::Address &addr) { + RAY_CHECK(addr.ip_address() == worker->address_.ip_address()); + return worker; + }); + + // Root submits a task. + auto return_id = root->SubmitTaskWithArg(ObjectID::Nil()); + + // Task submits a nested task and returns the return ID. + auto nested_return_id = worker->SubmitTaskWithArg(ObjectID::Nil()); + rpc::WorkerAddress addr(root->address_); + auto refs = + worker->FinishExecutingTask(ObjectID::Nil(), return_id, &nested_return_id, &addr); + + // The nested task returns an ObjectID that it owns. + auto inner_id = ObjectID::FromRandom(); + nested_worker->Put(inner_id); + rpc::WorkerAddress worker_addr(worker->address_); + auto nested_refs = nested_worker->FinishExecutingTask(ObjectID::Nil(), nested_return_id, + &inner_id, &worker_addr); + nested_worker->rc_.RemoveLocalReference(inner_id, nullptr); + ASSERT_TRUE(nested_worker->rc_.HasReference(inner_id)); + + // All task execution replies are received. + root->HandleSubmittedTaskFinished(ObjectID::Nil(), {{return_id, {nested_return_id}}}); + worker->HandleSubmittedTaskFinished(ObjectID::Nil(), {{nested_return_id, {inner_id}}}); + root->FlushBorrowerCallbacks(); + worker->FlushBorrowerCallbacks(); + + // The reference only goes out of scope once the other workers' references to + // their submitted tasks' return ID go out of scope. + ASSERT_TRUE(nested_worker->rc_.HasReference(inner_id)); + worker->rc_.RemoveLocalReference(nested_return_id, nullptr); + ASSERT_TRUE(nested_worker->rc_.HasReference(inner_id)); + root->rc_.RemoveLocalReference(return_id, nullptr); + ASSERT_FALSE(nested_worker->rc_.HasReference(inner_id)); +} + +// Recursively returning a borrowed object ID. We submit a task, which submits +// another task, calls ray.get() on the return ID and returns the value. The +// nested task creates an object and returns that ID. +// +// @ray.remote +// def nested_worker(): +// inner_id = ray.put() +// return inner_id +// +// @ray.remote +// def worker(): +// return ray.get(nested_worker.remote()) +// +// return_id = worker.remote() +// inner_id = ray.get(return_id) +TEST(DistributedReferenceCountTest, TestReturnBorrowedIdChain) { + auto root = std::make_shared("1"); + auto worker = std::make_shared("2", [&](const rpc::Address &addr) { + RAY_CHECK(addr.ip_address() == root->address_.ip_address()); + return root; + }); + auto nested_worker = + std::make_shared("3", [&](const rpc::Address &addr) { + if (addr.ip_address() == root->address_.ip_address()) { + return root; + } else { + return worker; + } + }); + + // Root submits a task. + auto return_id = root->SubmitTaskWithArg(ObjectID::Nil()); + + // Task submits a nested task. + auto nested_return_id = worker->SubmitTaskWithArg(ObjectID::Nil()); + + // The nested task returns an ObjectID that it owns. + auto inner_id = ObjectID::FromRandom(); + nested_worker->Put(inner_id); + rpc::WorkerAddress worker_addr(worker->address_); + auto nested_refs = nested_worker->FinishExecutingTask(ObjectID::Nil(), nested_return_id, + &inner_id, &worker_addr); + nested_worker->rc_.RemoveLocalReference(inner_id, nullptr); + ASSERT_TRUE(nested_worker->rc_.HasReference(inner_id)); + + // Worker receives the reply from the nested task. + worker->HandleSubmittedTaskFinished(ObjectID::Nil(), {{nested_return_id, {inner_id}}}); + worker->FlushBorrowerCallbacks(); + // Worker deserializes the inner_id and returns it. + worker->GetSerializedObjectId(nested_return_id, inner_id, nested_worker->task_id_, + nested_worker->address_); + rpc::WorkerAddress addr(root->address_); + auto refs = worker->FinishExecutingTask(ObjectID::Nil(), return_id, &inner_id, &addr); + + // Worker no longer borrowers the inner ID. + worker->rc_.RemoveLocalReference(inner_id, nullptr); + ASSERT_TRUE(worker->rc_.HasReference(inner_id)); + worker->rc_.RemoveLocalReference(nested_return_id, nullptr); + ASSERT_FALSE(worker->rc_.HasReference(inner_id)); + ASSERT_TRUE(nested_worker->rc_.HasReference(inner_id)); + + // Root receives worker's reply, then the WaitForRefRemovedRequest from + // nested_worker. + root->HandleSubmittedTaskFinished(ObjectID::Nil(), {{return_id, {inner_id}}}); + root->FlushBorrowerCallbacks(); + // Object is still in scope because root now knows that return_id contains + // inner_id. + ASSERT_TRUE(nested_worker->rc_.HasReference(inner_id)); + + root->rc_.RemoveLocalReference(return_id, nullptr); + ASSERT_FALSE(root->rc_.HasReference(return_id)); + ASSERT_FALSE(root->rc_.HasReference(inner_id)); + ASSERT_FALSE(nested_worker->rc_.HasReference(inner_id)); +} + +// Recursively returning a borrowed object ID. We submit a task, which submits +// another task, calls ray.get() on the return ID and returns the value. The +// nested task creates an object and returns that ID. +// +// This test is the same as above, except that it reorders messages so that the +// driver receives the WaitForRefRemovedRequest from nested_worker BEFORE it +// receives the reply from worker indicating that return_id contains inner_id. +// +// @ray.remote +// def nested_worker(): +// inner_id = ray.put() +// return inner_id +// +// @ray.remote +// def worker(): +// return ray.get(nested_worker.remote()) +// +// return_id = worker.remote() +// inner_id = ray.get(return_id) +TEST(DistributedReferenceCountTest, TestReturnBorrowedIdChainOutOfOrder) { + auto root = std::make_shared("1"); + auto worker = std::make_shared("2", [&](const rpc::Address &addr) { + RAY_CHECK(addr.ip_address() == root->address_.ip_address()); + return root; + }); + auto nested_worker = + std::make_shared("3", [&](const rpc::Address &addr) { + if (addr.ip_address() == root->address_.ip_address()) { + return root; + } else { + return worker; + } + }); + + // Root submits a task. + auto return_id = root->SubmitTaskWithArg(ObjectID::Nil()); + + // Task submits a nested task. + auto nested_return_id = worker->SubmitTaskWithArg(ObjectID::Nil()); + + // The nested task returns an ObjectID that it owns. + auto inner_id = ObjectID::FromRandom(); + nested_worker->Put(inner_id); + rpc::WorkerAddress worker_addr(worker->address_); + auto nested_refs = nested_worker->FinishExecutingTask(ObjectID::Nil(), nested_return_id, + &inner_id, &worker_addr); + nested_worker->rc_.RemoveLocalReference(inner_id, nullptr); + ASSERT_TRUE(nested_worker->rc_.HasReference(inner_id)); + + // Worker receives the reply from the nested task. + worker->HandleSubmittedTaskFinished(ObjectID::Nil(), {{nested_return_id, {inner_id}}}); + worker->FlushBorrowerCallbacks(); + // Worker deserializes the inner_id and returns it. + worker->GetSerializedObjectId(nested_return_id, inner_id, nested_worker->task_id_, + nested_worker->address_); + rpc::WorkerAddress addr(root->address_); + auto refs = worker->FinishExecutingTask(ObjectID::Nil(), return_id, &inner_id, &addr); + + // Worker no longer borrowers the inner ID. + worker->rc_.RemoveLocalReference(inner_id, nullptr); + ASSERT_TRUE(worker->rc_.HasReference(inner_id)); + worker->rc_.RemoveLocalReference(nested_return_id, nullptr); + ASSERT_FALSE(worker->rc_.HasReference(inner_id)); + ASSERT_TRUE(nested_worker->rc_.HasReference(inner_id)); + + // Root receives the WaitForRefRemovedRequest from nested_worker BEFORE the + // reply from worker. + root->FlushBorrowerCallbacks(); + ASSERT_TRUE(nested_worker->rc_.HasReference(inner_id)); + + root->HandleSubmittedTaskFinished(ObjectID::Nil(), {{return_id, {inner_id}}}); + root->rc_.RemoveLocalReference(return_id, nullptr); + ASSERT_FALSE(root->rc_.HasReference(return_id)); + ASSERT_FALSE(root->rc_.HasReference(inner_id)); + ASSERT_FALSE(nested_worker->rc_.HasReference(inner_id)); } -// TODO: Test returning an Object ID. // TODO: Test Pop and Merge individually. } // namespace ray diff --git a/src/ray/core_worker/store_provider/memory_store/memory_store.cc b/src/ray/core_worker/store_provider/memory_store/memory_store.cc index e7a366356..ead233a5f 100644 --- a/src/ray/core_worker/store_provider/memory_store/memory_store.cc +++ b/src/ray/core_worker/store_provider/memory_store/memory_store.cc @@ -161,7 +161,7 @@ Status CoreWorkerMemoryStore::Put(const RayObject &object, const ObjectID &objec RAY_CHECK(object_id.IsDirectCallType()); std::vector)>> async_callbacks; auto object_entry = std::make_shared(object.GetData(), object.GetMetadata(), - object.GetInlinedIds(), true); + object.GetNestedIds(), true); { absl::MutexLock lock(&mu_); diff --git a/src/ray/core_worker/task_manager.cc b/src/ray/core_worker/task_manager.cc index 9fbf30f1f..8817d79cc 100644 --- a/src/ray/core_worker/task_manager.cc +++ b/src/ray/core_worker/task_manager.cc @@ -34,7 +34,7 @@ void TaskManager::AddPendingTask(const TaskID &caller_id, } } } - reference_counter_->AddSubmittedTaskReferences(task_deps); + reference_counter_->UpdateSubmittedTaskReferences(task_deps); // Add new owned objects for the return values of the task. size_t num_returns = spec.NumReturns(); @@ -110,7 +110,7 @@ void TaskManager::CompletePendingTask(const TaskID &task_id, } RAY_CHECK_OK(in_memory_store_->Put( RayObject(data_buffer, metadata_buffer, - IdVectorFromProtobuf(return_object.inlined_ids())), + IdVectorFromProtobuf(return_object.nested_inlined_ids())), object_id)); } } @@ -186,20 +186,14 @@ void TaskManager::ShutdownIfNeeded() { } } -void TaskManager::RemoveSubmittedTaskReferences( - const std::vector &object_ids, const rpc::Address &worker_addr, - const ReferenceCounter::ReferenceTableProto &borrowed_refs) { - std::vector deleted; - reference_counter_->UpdateSubmittedTaskReferences(object_ids, worker_addr, - borrowed_refs, &deleted); - in_memory_store_->Delete(deleted); -} - void TaskManager::OnTaskDependenciesInlined( const std::vector &inlined_dependency_ids, const std::vector &contained_ids) { - reference_counter_->AddSubmittedTaskReferences(contained_ids); - RemoveSubmittedTaskReferences(inlined_dependency_ids); + std::vector deleted; + reference_counter_->UpdateSubmittedTaskReferences( + /*argument_ids_to_add=*/contained_ids, + /*argument_ids_to_remove=*/inlined_dependency_ids, &deleted); + in_memory_store_->Delete(deleted); } void TaskManager::RemoveFinishedTaskReferences( @@ -217,7 +211,11 @@ void TaskManager::RemoveFinishedTaskReferences( inlined_ids.end()); } } - RemoveSubmittedTaskReferences(plasma_dependencies, borrower_addr, borrowed_refs); + + std::vector deleted; + reference_counter_->UpdateFinishedTaskReferences(plasma_dependencies, borrower_addr, + borrowed_refs, &deleted); + in_memory_store_->Delete(deleted); } void TaskManager::MarkPendingTaskFailed(const TaskID &task_id, diff --git a/src/ray/core_worker/task_manager.h b/src/ray/core_worker/task_manager.h index 52f2c0333..e3f504b66 100644 --- a/src/ray/core_worker/task_manager.h +++ b/src/ray/core_worker/task_manager.h @@ -105,17 +105,6 @@ class TaskManager : public TaskFinisherInterface { void MarkPendingTaskFailed(const TaskID &task_id, const TaskSpecification &spec, rpc::ErrorType error_type) LOCKS_EXCLUDED(mu_); - /// Remove submitted task references in the reference counter for the object IDs. - /// If the references were borrowed by a worker while executing a task, then - /// merge in the ref counts for any references that the task (or a nested - /// task) is still borrowing. If any reference counts for the borrowed - /// objects reach zero, they are deleted from the in-memory store. - void RemoveSubmittedTaskReferences( - const std::vector &object_ids, - const rpc::Address &worker_addr = rpc::Address(), - const ReferenceCounter::ReferenceTableProto &borrowed_refs = - ReferenceCounter::ReferenceTableProto()); - /// Helper function to call RemoveSubmittedTaskReferences on the remaining /// dependencies of the given task spec after the task has finished or /// failed. The remaining dependencies are plasma objects and any ObjectIDs diff --git a/src/ray/core_worker/transport/dependency_resolver.cc b/src/ray/core_worker/transport/dependency_resolver.cc index 392e0067b..4fade5b1c 100644 --- a/src/ray/core_worker/transport/dependency_resolver.cc +++ b/src/ray/core_worker/transport/dependency_resolver.cc @@ -44,9 +44,9 @@ void InlineDependencies( const auto &metadata = it->second->GetMetadata(); mutable_arg->set_metadata(metadata->Data(), metadata->Size()); } - for (const auto &inlined_id : it->second->GetInlinedIds()) { - mutable_arg->add_nested_inlined_ids(inlined_id.Binary()); - contained_ids->push_back(inlined_id); + for (const auto &nested_id : it->second->GetNestedIds()) { + mutable_arg->add_nested_inlined_ids(nested_id.Binary()); + contained_ids->push_back(nested_id); } inlined_dependency_ids->push_back(id); } diff --git a/src/ray/core_worker/transport/direct_actor_transport.cc b/src/ray/core_worker/transport/direct_actor_transport.cc index 825771744..c5fbaebac 100644 --- a/src/ray/core_worker/transport/direct_actor_transport.cc +++ b/src/ray/core_worker/transport/direct_actor_transport.cc @@ -275,8 +275,8 @@ void CoreWorkerDirectTaskReceiver::HandlePushTask( return_object->set_metadata(result->GetMetadata()->Data(), result->GetMetadata()->Size()); } - for (const auto &inlined_id : result->GetInlinedIds()) { - return_object->add_inlined_ids(inlined_id.Binary()); + for (const auto &nested_id : result->GetNestedIds()) { + return_object->add_nested_inlined_ids(nested_id.Binary()); } } } diff --git a/src/ray/protobuf/core_worker.proto b/src/ray/protobuf/core_worker.proto index 2698574c0..12c4dc383 100644 --- a/src/ray/protobuf/core_worker.proto +++ b/src/ray/protobuf/core_worker.proto @@ -63,8 +63,8 @@ message ReturnObject { bytes data = 3; // Metadata of the object. bytes metadata = 4; - // ObjectIDs that were inlined in the data field. - repeated bytes inlined_ids = 5; + // ObjectIDs that were nested in data. This is only set for inlined objects. + repeated bytes nested_inlined_ids = 5; } message PushTaskRequest { @@ -195,12 +195,19 @@ message ObjectReferenceCount { bool has_local_ref = 2; // Any other borrowers that the worker created (by passing the ID on to them). repeated Address borrowers = 3; + // The borrower may have returned the object ID nested inside the return + // value of a task that it executed. This list contains all task returns that + // were owned by a process other than the borrower. Then, the process that + // owns the task's return value is also a borrower for as long as it has the + // task return ID in scope. Note that only the object ID and owner address + // are used for elements in this list. + repeated ObjectReference stored_in_objects = 4; // The borrowed object ID that contained this object, if any. This is used // for nested object IDs. - bytes contained_in_borrowed_id = 4; + bytes contained_in_borrowed_id = 5; // The object IDs that this object contains, if any. This is used for nested // object IDs. - repeated bytes contains = 5; + repeated bytes contains = 6; } message WaitForRefRemovedReply {