[core] Fix bug in ref counting protocol for nested objects (#18821)

* Fix assertion crash

* test, lint

* todo

* tests

* protocol

* test

* fix

* lint

* header

* recursive

* note

* forward test

* lock

* lint

* unneeded check
This commit is contained in:
Stephanie Wang 2021-09-23 09:45:12 -07:00 committed by GitHub
parent 5d57eed598
commit 7b1e594412
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 247 additions and 84 deletions

View file

@ -661,11 +661,45 @@ def test_deep_nested_refs(shutdown_only):
while isinstance(r, ray.ObjectRef):
print(i, r)
i += 1
try:
r = ray.get(r)
except ray.exceptions.ReferenceCountingAssertionError:
# TODO(swang): https://github.com/ray-project/ray/issues/18751.
break
@pytest.mark.skipif(sys.platform == "win32", reason="Failing on Windows.")
def test_forward_nested_ref(shutdown_only):
ray.init(object_store_memory=100 * 1024 * 1024)
@ray.remote
def nested_ref():
return ray.put(1)
@ray.remote
def nested_nested_ref():
return nested_ref.remote()
@ray.remote
class Borrower:
def __init__(self):
return
def pass_ref(self, middle_ref):
self.inner_ref = ray.get(middle_ref)
def check_ref(self):
ray.get(self.inner_ref)
@ray.remote
def pass_nested_ref(borrower, outer_ref):
ray.get(borrower.pass_ref.remote(outer_ref[0]))
b = Borrower.remote()
outer_ref = nested_nested_ref.remote()
x = pass_nested_ref.remote(b, [outer_ref])
del outer_ref
ray.get(x)
for _ in range(3):
ray.get(b.check_ref.remote())
time.sleep(1)
if __name__ == "__main__":

View file

@ -20,9 +20,7 @@
<< " submitted_count: " << it->second.submitted_task_ref_count \
<< " contained_in_owned: " << it->second.contained_in_owned.size() \
<< " contained_in_borrowed: " \
<< (it->second.contained_in_borrowed_id.has_value() \
? *it->second.contained_in_borrowed_id \
: ObjectID::Nil()) \
<< (it)->second.contained_in_borrowed_ids.size() \
<< " contains: " << it->second.contains.size() \
<< " lineage_ref_count: " << it->second.lineage_ref_count;
@ -100,14 +98,6 @@ bool ReferenceCounter::AddBorrowedObjectInternal(const ObjectID &object_id,
}
RAY_LOG(DEBUG) << "Adding borrowed object " << object_id;
// Skip adding this object as a borrower if we already have ownership info.
// If we already have ownership info, then either we are the owner or someone
// else already knows that we are a borrower.
if (it->second.owner_address) {
RAY_LOG(DEBUG) << "Skipping add borrowed object " << object_id;
return false;
}
it->second.owner_address = owner_address;
if (!outer_id.IsNil()) {
@ -115,9 +105,13 @@ bool ReferenceCounter::AddBorrowedObjectInternal(const ObjectID &object_id,
if (outer_it != object_id_refs_.end() && !outer_it->second.owned_by_us) {
RAY_LOG(DEBUG) << "Setting borrowed inner ID " << object_id
<< " contained_in_borrowed: " << outer_id;
RAY_CHECK(!it->second.contained_in_borrowed_id.has_value());
it->second.contained_in_borrowed_id = outer_id;
it->second.contained_in_borrowed_ids.insert(outer_id);
outer_it->second.contains.insert(object_id);
// The inner object ref is in use. We must report our ref to the object's
// owner.
if (it->second.RefCount() > 0) {
SetNestedRefInUseRecursive(it);
}
}
}
@ -224,9 +218,25 @@ void ReferenceCounter::AddLocalReference(const ObjectID &object_id,
// NOTE: ownership info for these objects must be added later via AddBorrowedObject.
it = object_id_refs_.emplace(object_id, Reference(call_site, -1)).first;
}
bool was_in_use = it->second.RefCount() > 0;
it->second.local_ref_count++;
RAY_LOG(DEBUG) << "Add local reference " << object_id;
PRINT_REF_COUNT(it);
if (!was_in_use && it->second.RefCount() > 0) {
SetNestedRefInUseRecursive(it);
}
}
void ReferenceCounter::SetNestedRefInUseRecursive(ReferenceTable::iterator inner_ref_it) {
for (const auto &contained_in_borrowed_id :
inner_ref_it->second.contained_in_borrowed_ids) {
auto contained_in_it = object_id_refs_.find(contained_in_borrowed_id);
RAY_CHECK(contained_in_it != object_id_refs_.end());
if (!contained_in_it->second.has_nested_refs_to_report) {
contained_in_it->second.has_nested_refs_to_report = true;
SetNestedRefInUseRecursive(contained_in_it);
}
}
}
void ReferenceCounter::RemoveLocalReference(const ObjectID &object_id,
@ -264,10 +274,14 @@ void ReferenceCounter::UpdateSubmittedTaskReferences(
// because we don't hold a Python reference to its ObjectID.
it = object_id_refs_.emplace(argument_id, Reference()).first;
}
bool was_in_use = it->second.RefCount() > 0;
it->second.submitted_task_ref_count++;
// The lineage ref will get released once the task finishes and cannot be
// retried again.
it->second.lineage_ref_count++;
if (!was_in_use && it->second.RefCount() > 0) {
SetNestedRefInUseRecursive(it);
}
}
// Release the submitted task ref and the lineage ref for any argument IDs
// whose values were inlined.
@ -281,7 +295,11 @@ void ReferenceCounter::UpdateResubmittedTaskReferences(
for (const ObjectID &argument_id : argument_ids) {
auto it = object_id_refs_.find(argument_id);
RAY_CHECK(it != object_id_refs_.end());
bool was_in_use = it->second.RefCount() > 0;
it->second.submitted_task_ref_count++;
if (!was_in_use && it->second.RefCount() > 0) {
SetNestedRefInUseRecursive(it);
}
}
}
@ -469,14 +487,7 @@ void ReferenceCounter::DeleteReferenceInternal(ReferenceTable::iterator it,
// object.
RAY_CHECK(inner_it->second.contained_in_owned.erase(id));
} else {
// If this object ID was nested in a borrowed object, make sure that
// we have already returned this information through a previous
// GetAndClearLocalBorrowers call.
if (inner_it->second.contained_in_borrowed_id.has_value()) {
RAY_LOG(DEBUG) << "Object " << id
<< " deleted, still contains inner borrowed Ref " << inner_id;
should_delete_ref = false;
}
RAY_CHECK(inner_it->second.contained_in_borrowed_ids.erase(id));
}
DeleteReferenceInternal(inner_it, deleted);
}
@ -694,21 +705,12 @@ bool ReferenceCounter::GetAndClearLocalBorrowersInternal(const ObjectID &object_
// until all active borrowers are merged into the owner.
it->second.borrowers.clear();
it->second.stored_in_objects.clear();
if (it->second.contained_in_borrowed_id.has_value()) {
/// This ID was nested in another ID that we (or a nested task) borrowed.
/// Make sure that we also returned the ID that contained it.
RAY_CHECK(borrowed_refs->count(it->second.contained_in_borrowed_id.value()) > 0);
/// Clear the fact that this ID was nested because we are including it in
/// the returned borrowed_refs. If the nested ID is not being borrowed by
/// us, then it will be deleted recursively when deleting the outer ID.
it->second.contained_in_borrowed_id.reset();
}
// Attempt to pop children.
for (const auto &contained_id : it->second.contains) {
GetAndClearLocalBorrowersInternal(contained_id, borrowed_refs);
}
// We've reported our nested refs.
it->second.has_nested_refs_to_report = false;
return true;
}
@ -732,14 +734,6 @@ void ReferenceCounter::MergeRemoteBorrowers(const ObjectID &object_id,
if (it == object_id_refs_.end()) {
it = object_id_refs_.emplace(object_id, Reference()).first;
}
if (!it->second.owner_address && borrower_ref.contained_in_borrowed_id.has_value()) {
// We don't have owner information about this object ID yet and the worker
// received it because it was nested in another ID that the worker was
// borrowing. Copy this information to our local table.
RAY_CHECK(borrower_ref.owner_address);
AddBorrowedObjectInternal(object_id, *borrower_it->second.contained_in_borrowed_id,
*borrower_ref.owner_address);
}
std::vector<rpc::WorkerAddress> new_borrowers;
// The worker is still using the reference, so it is still a borrower.
@ -763,12 +757,25 @@ void ReferenceCounter::MergeRemoteBorrowers(const ObjectID &object_id,
}
}
// This ref was nested inside another object. Copy this information to our
// local table.
for (const auto &contained_in_borrowed_id :
borrower_it->second.contained_in_borrowed_ids) {
RAY_CHECK(borrower_ref.owner_address);
AddBorrowedObjectInternal(object_id, contained_in_borrowed_id,
*borrower_ref.owner_address);
}
// If we own this ID, then wait for all new borrowers to reach a ref count
// of 0 before GCing the object value.
if (it->second.owned_by_us) {
for (const auto &addr : new_borrowers) {
WaitForRefRemoved(it, addr);
}
} else {
// We received ref counts from another borrower. Make sure we forward it
// back to the owner.
SetNestedRefInUseRecursive(it);
}
// If the borrower stored this object ID inside another object ID that it did
@ -872,7 +879,11 @@ void ReferenceCounter::AddNestedObjectIdsInternal(
// That's why we use two loops, and we should avoid using `it` hearafter.
for (const auto &inner_id : inner_ids) {
auto inner_it = object_id_refs_.emplace(inner_id, Reference()).first;
bool was_in_use = inner_it->second.RefCount() > 0;
inner_it->second.contained_in_owned.insert(object_id);
if (!was_in_use && inner_it->second.RefCount() > 0) {
SetNestedRefInUseRecursive(inner_it);
}
}
}
} else {
@ -911,14 +922,6 @@ void ReferenceCounter::HandleRefRemoved(const ObjectID &object_id) {
RAY_LOG(DEBUG) << pair.first << " has " << pair.second.borrowers.size()
<< " borrowers";
}
auto it = object_id_refs_.find(object_id);
if (it != object_id_refs_.end()) {
// We should only have called this callback once our local ref count for
// the object was zero. Also, we should have stripped all distributed ref
// count information and returned it to the owner. Therefore, it should be
// okay to delete the object, if it wasn't already deleted.
RAY_CHECK(it->second.OutOfScope(lineage_pinning_enabled_));
}
// Send the owner information about any new borrowers.
rpc::PubMessage pub_message;
@ -1245,11 +1248,10 @@ ReferenceCounter::Reference ReferenceCounter::Reference::FromProto(
for (const auto &id : ref_count.contains()) {
ref.contains.insert(ObjectID::FromBinary(id));
}
const auto contained_in_borrowed_id =
ObjectID::FromBinary(ref_count.contained_in_borrowed_id());
if (!contained_in_borrowed_id.IsNil()) {
ref.contained_in_borrowed_id = contained_in_borrowed_id;
}
const auto contained_in_borrowed_ids =
IdVectorFromProtobuf<ObjectID>(ref_count.contained_in_borrowed_ids());
ref.contained_in_borrowed_ids.insert(contained_in_borrowed_ids.begin(),
contained_in_borrowed_ids.end());
return ref;
}
@ -1267,8 +1269,8 @@ void ReferenceCounter::Reference::ToProto(rpc::ObjectReferenceCount *ref) const
ref_object->set_object_id(object.first.Binary());
ref_object->mutable_owner_address()->CopyFrom(object.second.ToProto());
}
if (contained_in_borrowed_id.has_value()) {
ref->set_contained_in_borrowed_id(contained_in_borrowed_id->Binary());
for (const auto &contained_in_borrowed_id : contained_in_borrowed_ids) {
ref->add_contained_in_borrowed_ids(contained_in_borrowed_id.Binary());
}
for (const auto &contains_id : contains) {
ref->add_contains(contains_id.Binary());

View file

@ -496,8 +496,8 @@ class ReferenceCounter : public ReferenceCounterInterface,
/// The reference count. This number includes:
/// - Python references to the ObjectID.
/// - Pending submitted tasks that depend on the object.
/// - ObjectIDs that we own, that contain this ObjectID, and that are still
/// in scope.
/// - ObjectIDs containing this ObjectID that we own and that are still in
/// scope.
size_t RefCount() const {
return local_ref_count + submitted_task_ref_count + contained_in_owned.size();
}
@ -510,7 +510,7 @@ class ReferenceCounter : public ReferenceCounterInterface,
/// - We gave the reference to at least one other process.
bool OutOfScope(bool lineage_pinning_enabled) const {
bool in_scope = RefCount() > 0;
bool was_contained_in_borrowed_id = contained_in_borrowed_id.has_value();
bool is_nested = contained_in_borrowed_ids.size();
bool has_borrowers = borrowers.size() > 0;
bool was_stored_in_objects = stored_in_objects.size() > 0;
@ -519,7 +519,7 @@ class ReferenceCounter : public ReferenceCounterInterface,
has_lineage_references = lineage_ref_count > 0;
}
return !(in_scope || was_contained_in_borrowed_id || has_borrowers ||
return !(in_scope || is_nested || has_nested_refs_to_report || has_borrowers ||
was_stored_in_objects || has_lineage_references);
}
@ -573,30 +573,23 @@ class ReferenceCounter : public ReferenceCounterInterface,
/// 2. A task that we submitted returned an ID(s).
/// ObjectIDs are erased from this field when their Reference is deleted.
absl::flat_hash_set<ObjectID> contained_in_owned;
/// An Object ID that we (or one of our children) borrowed that contains
/// this object ID, which is also borrowed. This is used in cases where an
/// ObjectID is nested. We need to notify the owner of the outer ID of any
/// borrowers of this object, so we keep this field around until
/// GetAndClearLocalBorrowersInternal is called on the outer ID. This field
/// is updated in 2 cases:
/// 1. We deserialize an ID that we do not own and that was stored in
/// another object that we do not own.
/// 2. Case (1) occurred for a task that we submitted and we also do not
/// own the inner or outer object. Then, we need to notify our caller
/// that the task we submitted is a borrower for the inner ID.
/// This field is reset to null once GetAndClearLocalBorrowersInternal is
/// called on contained_in_borrowed_id. For each borrower, this field is
/// set at most once during the reference's lifetime. If the object ID is
/// later found to be nested in a second object, we do not need to remember
/// the second ID because we will already have notified the owner of the
/// first outer object about our reference.
absl::optional<ObjectID> contained_in_borrowed_id;
/// Object IDs that we borrowed and that contain this object ID.
/// ObjectIDs are added to this field when we get the value of an ObjectRef
/// (either by deserializing the object or receiving the GetObjectStatus
/// reply for inlined objects) and it contains another ObjectRef.
absl::flat_hash_set<ObjectID> contained_in_borrowed_ids;
/// Reverse pointer for contained_in_owned and contained_in_borrowed_ids.
/// The object IDs contained in this object. These could be objects that we
/// own or are borrowing. This field is updated in 2 cases:
/// 1. We call ray.put() on this ID and store the contained IDs.
/// 2. We call ray.get() on an ID whose contents we do not know and we
/// discover that it contains these IDs.
absl::flat_hash_set<ObjectID> contains;
/// ObjectRefs nested in this object that are or were in use. These objects
/// are not owned by us, and we need to report that we are borrowing them
/// to their owner. Nesting is transitive, so this flag is set as long as
/// any child object is in scope.
bool has_nested_refs_to_report = false;
/// A list of processes that are we gave a reference to that are still
/// borrowing the ID. This field is updated in 2 cases:
/// 1. If we are a borrower of the ID, then we add a process to this list
@ -640,6 +633,9 @@ class ReferenceCounter : public ReferenceCounterInterface,
using ReferenceTable = absl::flat_hash_map<ObjectID, Reference>;
void SetNestedRefInUseRecursive(ReferenceTable::iterator inner_ref_it)
EXCLUSIVE_LOCKS_REQUIRED(mutex_);
bool GetOwnerInternal(const ObjectID &object_id,
rpc::Address *owner_address = nullptr) const
EXCLUSIVE_LOCKS_REQUIRED(mutex_);

View file

@ -44,11 +44,14 @@ class ReferenceCountTest : public ::testing::Test {
}
virtual void TearDown() {
AssertNoLeaks();
publisher_.reset();
subscriber_.reset();
rc.reset();
}
void AssertNoLeaks() { ASSERT_EQ(rc->NumObjectIDsInScope(), 0); }
std::shared_ptr<mock_pubsub::MockPublisher> publisher_;
std::shared_ptr<mock_pubsub::MockSubscriber> subscriber_;
};
@ -255,6 +258,12 @@ class MockWorkerClient : public MockCoreWorkerClientInterface {
rc_(rpc::WorkerAddress(address_), publisher_.get(), subscriber_.get(),
/*lineage_pinning_enabled=*/false, client_factory) {}
~MockWorkerClient() override {
if (!failed_) {
AssertNoLeaks();
}
}
void WaitForRefRemoved(const ObjectID object_id, const ObjectID contained_in_id,
rpc::Address owner_address) override {
auto r = num_requests_;
@ -297,6 +306,7 @@ class MockWorkerClient : public MockCoreWorkerClientInterface {
}
}
subscription_failure_callback_map.clear();
failed_ = true;
}
// The below methods mirror a core worker's operations, e.g., `Put` simulates
@ -326,11 +336,14 @@ class MockWorkerClient : public MockCoreWorkerClientInterface {
}
ObjectID SubmitTaskWithArg(const ObjectID &arg_id) {
if (!arg_id.IsNil()) {
rc_.UpdateSubmittedTaskReferences({arg_id});
}
ObjectID return_id = ObjectID::FromRandom();
rc_.AddOwnedObject(return_id, {}, address_, "", 0, false);
// Add a sentinel reference to keep all nested object IDs in scope.
rc_.AddLocalReference(return_id, "");
return_ids_.push_back(return_id);
return return_id;
}
@ -368,6 +381,18 @@ class MockWorkerClient : public MockCoreWorkerClientInterface {
WorkerID GetID() const { return WorkerID::FromBinary(address_.worker_id()); }
void AssertNoLeaks() {
for (const auto &return_id : return_ids_) {
if (rc_.HasReference(return_id)) {
rc_.RemoveLocalReference(return_id, nullptr);
}
}
for (const auto &id : rc_.GetAllInScopeObjectIDs()) {
RAY_LOG(INFO) << id;
}
ASSERT_EQ(rc_.NumObjectIDsInScope(), 0);
}
// Global map from Worker ID -> MockWorkerClient.
// Global map from Object ID -> owner worker ID, list of objects that it depends on,
// worker address that it's scheduled on. Worker map of pending return IDs.
@ -379,6 +404,8 @@ class MockWorkerClient : public MockCoreWorkerClientInterface {
ReferenceCounter rc_;
std::unordered_map<int, std::function<void()>> borrower_callbacks_;
int num_requests_ = 0;
std::vector<ObjectID> return_ids_;
bool failed_ = false;
};
// Tests basic incrementing/decrementing of direct/submitted task reference counts. An
@ -508,6 +535,9 @@ TEST_F(ReferenceCountTest, TestReferenceStats) {
ASSERT_EQ(stats2.object_refs(0).local_ref_count(), 0);
ASSERT_EQ(stats2.object_refs(0).object_size(), 100);
ASSERT_EQ(stats2.object_refs(0).call_site(), "file2.py:43");
rc->AddLocalReference(id2, "");
rc->RemoveLocalReference(id2, nullptr);
}
// Tests fetching of locality data from reference table.
@ -575,6 +605,11 @@ TEST_F(ReferenceCountTest, TestGetLocalityData) {
absl::optional<NodeID>(node2));
auto locality_data_obj2_no_object_size = rc->GetLocalityData(obj2);
ASSERT_FALSE(locality_data_obj2_no_object_size.has_value());
rc->AddLocalReference(obj1, "");
rc->RemoveLocalReference(obj1, nullptr);
rc->AddLocalReference(obj2, "");
rc->RemoveLocalReference(obj2, nullptr);
}
// Tests that we can get the owner address correctly for objects that we own,
@ -601,6 +636,12 @@ TEST_F(ReferenceCountTest, TestOwnerAddress) {
ASSERT_FALSE(rc->GetOwner(object_id3, &added_address));
rc->AddLocalReference(object_id3, "");
ASSERT_FALSE(rc->GetOwner(object_id3, &added_address));
rc->AddLocalReference(object_id, "");
rc->RemoveLocalReference(object_id, nullptr);
rc->AddLocalReference(object_id2, "");
rc->RemoveLocalReference(object_id2, nullptr);
rc->RemoveLocalReference(object_id3, nullptr);
}
// Tests that the ref counts are properly integrated into the local
@ -2416,6 +2457,96 @@ TEST_F(ReferenceCountTest, TestDelayedWaitForRefRemoved) {
ASSERT_FALSE(owner->rc_.HasReference(inner_id));
}
TEST_F(ReferenceCountTest, TestRepeatedDeserialization) {
auto borrower = std::make_shared<MockWorkerClient>("1");
auto owner = std::make_shared<MockWorkerClient>(
"2", [&](const rpc::Address &addr) { return borrower; });
// Owner owns a nested object ref, borrower is using the outer ObjectRef.
ObjectID outer_id = ObjectID::FromRandom();
ObjectID middle_id = ObjectID::FromRandom();
ObjectID inner_id = ObjectID::FromRandom();
owner->rc_.AddOwnedObject(inner_id, {}, owner->address_, "", 0, false);
owner->rc_.AddOwnedObject(middle_id, {inner_id}, owner->address_, "", 0, false);
owner->rc_.AddOwnedObject(outer_id, {middle_id}, owner->address_, "", 0, false);
owner->rc_.AddBorrowerAddress(outer_id, borrower->address_);
ASSERT_TRUE(owner->rc_.HasReference(outer_id));
ASSERT_TRUE(owner->rc_.HasReference(middle_id));
ASSERT_TRUE(owner->rc_.HasReference(inner_id));
borrower->rc_.AddLocalReference(outer_id, "");
borrower->rc_.AddBorrowedObject(outer_id, ObjectID::Nil(), owner->address_);
borrower->rc_.AddLocalReference(middle_id, "");
borrower->rc_.AddBorrowedObject(middle_id, outer_id, owner->address_);
// Borrower receives the inlined inner ObjectRef.
// This also simulates the case where the borrower deserializes the inner
// ObjectRef, then deletes it.
borrower->rc_.AddBorrowedObject(inner_id, middle_id, owner->address_);
borrower->rc_.RemoveLocalReference(outer_id, nullptr);
ASSERT_TRUE(borrower->FlushBorrowerCallbacks());
ASSERT_FALSE(owner->rc_.HasReference(outer_id));
ASSERT_TRUE(owner->rc_.HasReference(middle_id));
ASSERT_TRUE(owner->rc_.HasReference(inner_id));
// Borrower deserializes the inner ObjectRef.
borrower->rc_.AddLocalReference(inner_id, "");
borrower->rc_.RemoveLocalReference(middle_id, nullptr);
ASSERT_TRUE(borrower->FlushBorrowerCallbacks());
ASSERT_FALSE(owner->rc_.HasReference(middle_id));
ASSERT_TRUE(owner->rc_.HasReference(inner_id));
borrower->rc_.RemoveLocalReference(inner_id, nullptr);
ASSERT_TRUE(borrower->FlushBorrowerCallbacks());
ASSERT_FALSE(owner->rc_.HasReference(inner_id));
}
// Matches test_reference_counting_2.py::test_forward_nested_ref.
TEST_F(ReferenceCountTest, TestForwardNestedRefs) {
auto borrower1 = std::make_shared<MockWorkerClient>("1");
auto borrower2 = std::make_shared<MockWorkerClient>("2");
bool first_borrower = true;
auto owner = std::make_shared<MockWorkerClient>("2", [&](const rpc::Address &addr) {
return first_borrower ? borrower1 : borrower2;
});
// Owner owns a nested object ref, borrower1 is using the outer ObjectRef.
ObjectID outer_id = ObjectID::FromRandom();
ObjectID middle_id = ObjectID::FromRandom();
ObjectID inner_id = ObjectID::FromRandom();
owner->rc_.AddOwnedObject(inner_id, {}, owner->address_, "", 0, false);
owner->rc_.AddOwnedObject(middle_id, {inner_id}, owner->address_, "", 0, false);
owner->rc_.AddOwnedObject(outer_id, {middle_id}, owner->address_, "", 0, false);
owner->rc_.AddBorrowerAddress(outer_id, borrower1->address_);
ASSERT_TRUE(owner->rc_.HasReference(outer_id));
ASSERT_TRUE(owner->rc_.HasReference(middle_id));
ASSERT_TRUE(owner->rc_.HasReference(inner_id));
// Borrower 1 forwards the ObjectRef to borrower 2 via task submission.
borrower1->rc_.AddLocalReference(outer_id, "");
borrower1->rc_.AddBorrowedObject(outer_id, ObjectID::Nil(), owner->address_);
borrower1->SubmitTaskWithArg(outer_id);
// Borrower 2 executes the task, keeps ref to inner ref.
borrower2->ExecuteTaskWithArg(outer_id, middle_id, owner->address_);
borrower2->GetSerializedObjectId(middle_id, inner_id, owner->address_);
borrower2->rc_.RemoveLocalReference(middle_id, nullptr);
auto borrower_refs = borrower2->FinishExecutingTask(outer_id, ObjectID::Nil());
borrower1->HandleSubmittedTaskFinished(outer_id, {}, borrower2->address_,
borrower_refs);
borrower1->rc_.RemoveLocalReference(outer_id, nullptr);
// Now the owner should contact borrower 2.
first_borrower = false;
ASSERT_TRUE(borrower1->FlushBorrowerCallbacks());
ASSERT_FALSE(owner->rc_.HasReference(outer_id));
ASSERT_FALSE(owner->rc_.HasReference(middle_id));
ASSERT_TRUE(owner->rc_.HasReference(inner_id));
ASSERT_TRUE(borrower2->FlushBorrowerCallbacks());
borrower2->rc_.RemoveLocalReference(inner_id, nullptr);
}
} // namespace core
} // namespace ray

View file

@ -288,7 +288,7 @@ message ObjectReferenceCount {
repeated ObjectReference stored_in_objects = 4;
// The borrowed object ID that contained this object, if any. This is used
// for nested object IDs.
bytes contained_in_borrowed_id = 5;
repeated bytes contained_in_borrowed_ids = 5;
// The object IDs that this object contains, if any. This is used for nested
// object IDs.
repeated bytes contains = 6;