[core] Ref counting for returning object IDs created by a different process (#7221)

* Add regression tests

* Refactor, split RemoveSubmittedTaskReferences into submitted and finished paths

* Add nested return IDs to UpdateFinishedTaskRefs, rename WrapObjectIds

* Basic unit tests pass

* Fix unit test and add an out-of-order regression test

* Add stored_in_objects to ObjectReferenceCount, regression test now passes

* Add an Address to the ReferenceCounter so we can determine ownership

* Set the nested return IDs from the TaskManager

* Add another test

* Simplify

* Update src/ray/core_worker/reference_count_test.cc

Co-Authored-By: Edward Oakes <ed.nmi.oakes@gmail.com>

* Apply suggestions from code review

Co-Authored-By: Edward Oakes <ed.nmi.oakes@gmail.com>

* comments

* Add python test

Co-authored-by: Edward Oakes <ed.nmi.oakes@gmail.com>
This commit is contained in:
Stephanie Wang 2020-02-22 13:29:48 -08:00 committed by GitHub
parent e2edca45d4
commit 4c2de7be54
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
13 changed files with 916 additions and 460 deletions

View file

@ -596,6 +596,44 @@ def test_recursively_pass_returned_object_id(one_worker_100MiB):
_fill_object_store_and_get(inner_oid_bytes, succeed=False) _fill_object_store_and_get(inner_oid_bytes, succeed=False)
# Call a recursive chain of tasks. The final task in the chain returns an
# ObjectID returned by a task that it submitted. Every other task in the chain
# returns the same ObjectID by calling ray.get() on its submitted task and
# returning the result. The reference should still exist while the driver has a
# reference to the final task's ObjectID.
def test_recursively_return_borrowed_object_id(one_worker_100MiB):
@ray.remote
def put():
return np.zeros(40 * 1024 * 1024, dtype=np.uint8)
@ray.remote
def recursive(num_tasks_left):
if num_tasks_left == 0:
return put.remote()
final_id = ray.get(recursive.remote(num_tasks_left - 1))
ray.get(final_id)
return final_id
max_depth = 5
head_oid = recursive.remote(max_depth)
final_oid = ray.get(head_oid)
final_oid_bytes = final_oid.binary()
# Check that the driver's reference pins the object.
_fill_object_store_and_get(final_oid_bytes)
# Remove the local reference and try it again.
final_oid = ray.get(head_oid)
_fill_object_store_and_get(final_oid_bytes)
# Remove all references.
del head_oid
del final_oid
# Reference should be gone, check that returned ID gets evicted.
_fill_object_store_and_get(final_oid_bytes, succeed=False)
if __name__ == "__main__": if __name__ == "__main__":
import sys import sys
sys.exit(pytest.main(["-v", __file__])) sys.exit(pytest.main(["-v", __file__]))

View file

@ -22,13 +22,13 @@ class RayObject {
/// ///
/// \param[in] data Data of the ray object. /// \param[in] data Data of the ray object.
/// \param[in] metadata Metadata of the ray object. /// \param[in] metadata Metadata of the ray object.
/// \param[in] inlined_ids ObjectIDs that were serialized in data. /// \param[in] nested_ids ObjectIDs that were serialized in data.
/// \param[in] copy_data Whether this class should hold a copy of data. /// \param[in] copy_data Whether this class should hold a copy of data.
RayObject(const std::shared_ptr<Buffer> &data, const std::shared_ptr<Buffer> &metadata, RayObject(const std::shared_ptr<Buffer> &data, const std::shared_ptr<Buffer> &metadata,
const std::vector<ObjectID> &inlined_ids, bool copy_data = false) const std::vector<ObjectID> &nested_ids, bool copy_data = false)
: data_(data), : data_(data),
metadata_(metadata), metadata_(metadata),
inlined_ids_(inlined_ids), nested_ids_(nested_ids),
has_data_copy_(copy_data) { has_data_copy_(copy_data) {
if (has_data_copy_) { if (has_data_copy_) {
// If this object is required to hold a copy of the data, // If this object is required to hold a copy of the data,
@ -56,7 +56,7 @@ class RayObject {
const std::shared_ptr<Buffer> &GetMetadata() const { return metadata_; } const std::shared_ptr<Buffer> &GetMetadata() const { return metadata_; }
/// Return the object IDs that were serialized in data. /// Return the object IDs that were serialized in data.
const std::vector<ObjectID> &GetInlinedIds() const { return inlined_ids_; } const std::vector<ObjectID> &GetNestedIds() const { return nested_ids_; }
uint64_t GetSize() const { uint64_t GetSize() const {
uint64_t size = 0; uint64_t size = 0;
@ -81,7 +81,7 @@ class RayObject {
private: private:
std::shared_ptr<Buffer> data_; std::shared_ptr<Buffer> data_;
std::shared_ptr<Buffer> metadata_; std::shared_ptr<Buffer> metadata_;
const std::vector<ObjectID> inlined_ids_; const std::vector<ObjectID> nested_ids_;
/// Whether this class holds a data copy. /// Whether this class holds a data copy.
bool has_data_copy_; bool has_data_copy_;
}; };

View file

@ -92,8 +92,8 @@ class TaskSpecBuilder {
const auto &metadata = value.GetMetadata(); const auto &metadata = value.GetMetadata();
arg->set_metadata(metadata->Data(), metadata->Size()); arg->set_metadata(metadata->Data(), metadata->Size());
} }
for (const auto &inlined_id : value.GetInlinedIds()) { for (const auto &nested_id : value.GetNestedIds()) {
arg->add_nested_inlined_ids(inlined_id.Binary()); arg->add_nested_inlined_ids(nested_id.Binary());
} }
return *this; return *this;
} }

View file

@ -80,13 +80,6 @@ CoreWorker::CoreWorker(const WorkerType worker_type, const Language language,
death_check_timer_(io_service_), death_check_timer_(io_service_),
internal_timer_(io_service_), internal_timer_(io_service_),
core_worker_server_(WorkerTypeString(worker_type), 0 /* let grpc choose a port */), core_worker_server_(WorkerTypeString(worker_type), 0 /* let grpc choose a port */),
reference_counter_(std::make_shared<ReferenceCounter>(
/*distributed_ref_counting_enabled=*/RayConfig::instance()
.distributed_ref_counting_enabled(),
[this](const rpc::Address &addr) {
return std::shared_ptr<rpc::CoreWorkerClient>(
new rpc::CoreWorkerClient(addr, *client_call_manager_));
})),
task_queue_length_(0), task_queue_length_(0),
num_executed_tasks_(0), num_executed_tasks_(0),
task_execution_service_work_(task_execution_service_), task_execution_service_work_(task_execution_service_),
@ -166,6 +159,13 @@ CoreWorker::CoreWorker(const WorkerType worker_type, const Language language,
rpc_address_.set_raylet_id(local_raylet_id.Binary()); rpc_address_.set_raylet_id(local_raylet_id.Binary());
rpc_address_.set_worker_id(worker_context_.GetWorkerID().Binary()); rpc_address_.set_worker_id(worker_context_.GetWorkerID().Binary());
reference_counter_ = std::make_shared<ReferenceCounter>(
rpc_address_, RayConfig::instance().distributed_ref_counting_enabled(),
[this](const rpc::Address &addr) {
return std::shared_ptr<rpc::CoreWorkerClient>(
new rpc::CoreWorkerClient(addr, *client_call_manager_));
});
if (worker_type_ == ray::WorkerType::WORKER) { if (worker_type_ == ray::WorkerType::WORKER) {
death_check_timer_.expires_from_now(boost::asio::chrono::milliseconds( death_check_timer_.expires_from_now(boost::asio::chrono::milliseconds(
RayConfig::instance().raylet_death_check_interval_milliseconds())); RayConfig::instance().raylet_death_check_interval_milliseconds()));
@ -937,12 +937,7 @@ Status CoreWorker::AllocateReturnObjects(
RAY_CHECK(object_ids.size() == data_sizes.size()); RAY_CHECK(object_ids.size() == data_sizes.size());
return_objects->resize(object_ids.size(), nullptr); return_objects->resize(object_ids.size(), nullptr);
absl::optional<rpc::Address> owner_address( rpc::Address owner_address(worker_context_.GetCurrentTask()->CallerAddress());
worker_context_.GetCurrentTask()->CallerAddress());
bool owned_by_us = owner_address->worker_id() == rpc_address_.worker_id();
if (owned_by_us) {
owner_address.reset();
}
for (size_t i = 0; i < object_ids.size(); i++) { for (size_t i = 0; i < object_ids.size(); i++) {
bool object_already_exists = false; bool object_already_exists = false;
@ -952,8 +947,8 @@ Status CoreWorker::AllocateReturnObjects(
// Mark this object as containing other object IDs. The ref counter will // Mark this object as containing other object IDs. The ref counter will
// keep the inner IDs in scope until the outer one is out of scope. // keep the inner IDs in scope until the outer one is out of scope.
if (!contained_object_ids[i].empty()) { if (!contained_object_ids[i].empty()) {
reference_counter_->WrapObjectIds(object_ids[i], contained_object_ids[i], reference_counter_->AddNestedObjectIds(object_ids[i], contained_object_ids[i],
owner_address); owner_address);
} }
// Allocate a buffer for the return object. // Allocate a buffer for the return object.

View file

@ -84,9 +84,11 @@ void ReferenceCounter::AddOwnedObject(const ObjectID &object_id,
// because this corresponds to a submitted task whose return ObjectID will be created // because this corresponds to a submitted task whose return ObjectID will be created
// in the frontend language, incrementing the reference count. // in the frontend language, incrementing the reference count.
object_id_refs_.emplace(object_id, Reference(owner_id, owner_address)); object_id_refs_.emplace(object_id, Reference(owner_id, owner_address));
// Mark that this object ID contains other inner IDs. Then, we will not remove if (!inner_ids.empty()) {
// the inner objects until the outer object ID goes out of scope. // Mark that this object ID contains other inner IDs. Then, we will not GC
WrapObjectIdsInternal(object_id, inner_ids, absl::optional<rpc::WorkerAddress>()); // the inner objects until the outer object ID goes out of scope.
AddNestedObjectIdsInternal(object_id, inner_ids, rpc_address_);
}
} }
void ReferenceCounter::AddLocalReference(const ObjectID &object_id) { void ReferenceCounter::AddLocalReference(const ObjectID &object_id) {
@ -124,22 +126,24 @@ void ReferenceCounter::RemoveLocalReference(const ObjectID &object_id,
} }
} }
void ReferenceCounter::AddSubmittedTaskReferences( void ReferenceCounter::UpdateSubmittedTaskReferences(
const std::vector<ObjectID> &object_ids) { const std::vector<ObjectID> &argument_ids_to_add,
const std::vector<ObjectID> &argument_ids_to_remove, std::vector<ObjectID> *deleted) {
absl::MutexLock lock(&mutex_); absl::MutexLock lock(&mutex_);
for (const ObjectID &object_id : object_ids) { for (const ObjectID &argument_id : argument_ids_to_add) {
auto it = object_id_refs_.find(object_id); auto it = object_id_refs_.find(argument_id);
if (it == object_id_refs_.end()) { if (it == object_id_refs_.end()) {
// This happens if a large argument is transparently passed by reference // This happens if a large argument is transparently passed by reference
// because we don't hold a Python reference to its ObjectID. // because we don't hold a Python reference to its ObjectID.
it = object_id_refs_.emplace(object_id, Reference()).first; it = object_id_refs_.emplace(argument_id, Reference()).first;
} }
it->second.submitted_task_ref_count++; it->second.submitted_task_ref_count++;
} }
RemoveSubmittedTaskReferences(argument_ids_to_remove, deleted);
} }
void ReferenceCounter::UpdateSubmittedTaskReferences( void ReferenceCounter::UpdateFinishedTaskReferences(
const std::vector<ObjectID> &object_ids, const rpc::Address &worker_addr, const std::vector<ObjectID> &argument_ids, const rpc::Address &worker_addr,
const ReferenceTableProto &borrowed_refs, std::vector<ObjectID> *deleted) { const ReferenceTableProto &borrowed_refs, std::vector<ObjectID> *deleted) {
absl::MutexLock lock(&mutex_); absl::MutexLock lock(&mutex_);
// Must merge the borrower refs before decrementing any ref counts. This is // Must merge the borrower refs before decrementing any ref counts. This is
@ -150,15 +154,20 @@ void ReferenceCounter::UpdateSubmittedTaskReferences(
if (!refs.empty()) { if (!refs.empty()) {
RAY_CHECK(!WorkerID::FromBinary(worker_addr.worker_id()).IsNil()); RAY_CHECK(!WorkerID::FromBinary(worker_addr.worker_id()).IsNil());
} }
for (const ObjectID &object_id : object_ids) { for (const ObjectID &argument_id : argument_ids) {
MergeRemoteBorrowers(object_id, worker_addr, refs); MergeRemoteBorrowers(argument_id, worker_addr, refs);
} }
for (const ObjectID &object_id : object_ids) { RemoveSubmittedTaskReferences(argument_ids, deleted);
auto it = object_id_refs_.find(object_id); }
void ReferenceCounter::RemoveSubmittedTaskReferences(
const std::vector<ObjectID> &argument_ids, std::vector<ObjectID> *deleted) {
for (const ObjectID &argument_id : argument_ids) {
auto it = object_id_refs_.find(argument_id);
if (it == object_id_refs_.end()) { if (it == object_id_refs_.end()) {
RAY_LOG(WARNING) << "Tried to decrease ref count for nonexistent object ID: " RAY_LOG(WARNING) << "Tried to decrease ref count for nonexistent object ID: "
<< object_id; << argument_id;
return; return;
} }
it->second.submitted_task_ref_count--; it->second.submitted_task_ref_count--;
@ -359,6 +368,7 @@ bool ReferenceCounter::GetAndClearLocalBorrowersInternal(const ObjectID &object_
// of the returned borrowed_refs must merge this list into their own list // of the returned borrowed_refs must merge this list into their own list
// until all active borrowers are merged into the owner. // until all active borrowers are merged into the owner.
it->second.borrowers.clear(); it->second.borrowers.clear();
it->second.stored_in_objects.clear();
if (it->second.contained_in_borrowed_id.has_value()) { if (it->second.contained_in_borrowed_id.has_value()) {
/// This ID was nested in another ID that we (or a nested task) borrowed. /// This ID was nested in another ID that we (or a nested task) borrowed.
@ -437,6 +447,13 @@ void ReferenceCounter::MergeRemoteBorrowers(const ObjectID &object_id,
} }
} }
// If the borrower stored this object ID inside another object ID that it did
// not own, then mark that the object ID is nested inside another.
for (const auto &stored_in_object : borrower_ref.stored_in_objects) {
AddNestedObjectIdsInternal(stored_in_object.first, {object_id},
stored_in_object.second);
}
// Recursively merge any references that were contained in this object, to // Recursively merge any references that were contained in this object, to
// handle any borrowers of nested objects. // handle any borrowers of nested objects.
for (const auto &inner_id : borrower_ref.contains) { for (const auto &inner_id : borrower_ref.contains) {
@ -462,10 +479,10 @@ void ReferenceCounter::WaitForRefRemoved(const ReferenceTable::iterator &ref_it,
if (it == borrower_cache_.end()) { if (it == borrower_cache_.end()) {
RAY_CHECK(client_factory_ != nullptr); RAY_CHECK(client_factory_ != nullptr);
it = borrower_cache_.emplace(addr, client_factory_(addr.ToProto())).first; it = borrower_cache_.emplace(addr, client_factory_(addr.ToProto())).first;
RAY_LOG(DEBUG) << "Connected to borrower " << addr.ip_address << ":" << addr.port
<< " for object " << object_id;
} }
RAY_LOG(DEBUG) << "Sending WaitForRefRemoved to borrower " << addr.ip_address << ":"
<< addr.port << " for object " << object_id;
// Send the borrower a message about this object. The borrower responds once // Send the borrower a message about this object. The borrower responds once
// it is no longer using the object ID. // it is no longer using the object ID.
RAY_CHECK_OK(it->second->WaitForRefRemoved( RAY_CHECK_OK(it->second->WaitForRefRemoved(
@ -474,32 +491,35 @@ void ReferenceCounter::WaitForRefRemoved(const ReferenceTable::iterator &ref_it,
RAY_LOG(DEBUG) << "Received reply from borrower " << addr.ip_address << ":" RAY_LOG(DEBUG) << "Received reply from borrower " << addr.ip_address << ":"
<< addr.port << " of object " << object_id; << addr.port << " of object " << object_id;
absl::MutexLock lock(&mutex_); absl::MutexLock lock(&mutex_);
// Merge in any new borrowers that the previous borrower learned of.
const ReferenceTable new_borrower_refs =
ReferenceTableFromProto(reply.borrowed_refs());
MergeRemoteBorrowers(object_id, addr, new_borrower_refs);
// Erase the previous borrower.
auto it = object_id_refs_.find(object_id); auto it = object_id_refs_.find(object_id);
RAY_CHECK(it != object_id_refs_.end()); RAY_CHECK(it != object_id_refs_.end());
RAY_CHECK(it->second.borrowers.erase(addr)); RAY_CHECK(it->second.borrowers.erase(addr));
const ReferenceTable new_borrower_refs =
ReferenceTableFromProto(reply.borrowed_refs());
MergeRemoteBorrowers(object_id, addr, new_borrower_refs);
DeleteReferenceInternal(it, nullptr); DeleteReferenceInternal(it, nullptr);
})); }));
} }
void ReferenceCounter::WrapObjectIds( void ReferenceCounter::AddNestedObjectIds(const ObjectID &object_id,
const ObjectID &object_id, const std::vector<ObjectID> &inner_ids, const std::vector<ObjectID> &inner_ids,
const absl::optional<rpc::WorkerAddress> &owner_address) { const rpc::WorkerAddress &owner_address) {
absl::MutexLock lock(&mutex_); absl::MutexLock lock(&mutex_);
WrapObjectIdsInternal(object_id, inner_ids, owner_address); AddNestedObjectIdsInternal(object_id, inner_ids, owner_address);
} }
void ReferenceCounter::WrapObjectIdsInternal( void ReferenceCounter::AddNestedObjectIdsInternal(
const ObjectID &object_id, const std::vector<ObjectID> &inner_ids, const ObjectID &object_id, const std::vector<ObjectID> &inner_ids,
const absl::optional<rpc::WorkerAddress> &owner_address) { const rpc::WorkerAddress &owner_address) {
RAY_CHECK(!owner_address.worker_id.IsNil());
auto it = object_id_refs_.find(object_id); auto it = object_id_refs_.find(object_id);
if (!owner_address.has_value()) { if (owner_address.worker_id == rpc_address_.worker_id) {
// `ray.put()` case OR returning an object ID from a task and the task's // We own object_id. This is a `ray.put()` case OR returning an object ID
// caller executed in the same process as us. // from a task and the task's caller executed in the same process as us.
if (it != object_id_refs_.end()) { if (it != object_id_refs_.end()) {
RAY_CHECK(it->second.owned_by_us); RAY_CHECK(it->second.owned_by_us);
// The outer object is still in scope. Mark the inner ones as being // The outer object is still in scope. Mark the inner ones as being
@ -515,27 +535,26 @@ void ReferenceCounter::WrapObjectIdsInternal(
} }
} }
} else { } else {
// Returning an object ID from a task, and the task's caller executed in a // We do not own object_id. This is the case where we returned an object ID
// remote process. // from a task, and the task's caller executed in a remote process.
for (const auto &inner_id : inner_ids) { for (const auto &inner_id : inner_ids) {
RAY_LOG(DEBUG) << "Adding borrower " << owner_address.ip_address << " to id "
<< inner_id << ", borrower owns outer ID " << object_id;
auto inner_it = object_id_refs_.find(inner_id); auto inner_it = object_id_refs_.find(inner_id);
RAY_CHECK(inner_it != object_id_refs_.end()); RAY_CHECK(inner_it != object_id_refs_.end());
if (!inner_it->second.owned_by_us) {
RAY_LOG(WARNING)
<< "Ref counting currently does not support returning an object ID that was "
"not created by the worker executing the task. The object may be evicted "
"before all references are out of scope.";
// TODO: Do not return. Handle the case where we return a BORROWED id.
continue;
}
// Add the task's caller as a borrower. // Add the task's caller as a borrower.
auto inserted = inner_it->second.borrowers.insert(*owner_address).second; if (inner_it->second.owned_by_us) {
if (inserted) { auto inserted = inner_it->second.borrowers.insert(owner_address).second;
RAY_LOG(DEBUG) << "Adding borrower " << owner_address->ip_address << " to id " if (inserted) {
<< object_id << ", borrower owns outer ID " << object_id; // Wait for it to remove its reference.
// Wait for it to remove its WaitForRefRemoved(inner_it, owner_address, object_id);
// reference. }
WaitForRefRemoved(inner_it, *owner_address, object_id); } else {
auto inserted =
inner_it->second.stored_in_objects.emplace(object_id, owner_address).second;
// This should be the first time that we have stored this object ID
// inside this return ID.
RAY_CHECK(inserted);
} }
} }
} }
@ -583,9 +602,7 @@ void ReferenceCounter::SetRefRemovedCallback(
// add the outer object to the inner ID's ref count. We will not respond to // add the outer object to the inner ID's ref count. We will not respond to
// the owner of the inner ID until the outer object ID goes out of scope. // the owner of the inner ID until the outer object ID goes out of scope.
if (!contained_in_id.IsNil()) { if (!contained_in_id.IsNil()) {
AddBorrowedObjectInternal(object_id, contained_in_id, owner_id, owner_address); AddNestedObjectIdsInternal(contained_in_id, {object_id}, rpc_address_);
WrapObjectIdsInternal(contained_in_id, {object_id},
absl::optional<rpc::WorkerAddress>());
} }
if (it->second.RefCount() == 0) { if (it->second.RefCount() == 0) {
@ -620,6 +637,10 @@ ReferenceCounter::Reference ReferenceCounter::Reference::FromProto(
for (const auto &borrower : ref_count.borrowers()) { for (const auto &borrower : ref_count.borrowers()) {
ref.borrowers.insert(rpc::WorkerAddress(borrower)); ref.borrowers.insert(rpc::WorkerAddress(borrower));
} }
for (const auto &object : ref_count.stored_in_objects()) {
const auto &object_id = ObjectID::FromBinary(object.object_id());
ref.stored_in_objects.emplace(object_id, rpc::WorkerAddress(object.owner_address()));
}
for (const auto &id : ref_count.contains()) { for (const auto &id : ref_count.contains()) {
ref.contains.insert(ObjectID::FromBinary(id)); ref.contains.insert(ObjectID::FromBinary(id));
} }
@ -641,6 +662,11 @@ void ReferenceCounter::Reference::ToProto(rpc::ObjectReferenceCount *ref) const
for (const auto &borrower : borrowers) { for (const auto &borrower : borrowers) {
ref->add_borrowers()->CopyFrom(borrower.ToProto()); ref->add_borrowers()->CopyFrom(borrower.ToProto());
} }
for (const auto &object : stored_in_objects) {
auto ref_object = ref->add_stored_in_objects();
ref_object->set_object_id(object.first.Binary());
ref_object->mutable_owner_address()->CopyFrom(object.second.ToProto());
}
if (contained_in_borrowed_id.has_value()) { if (contained_in_borrowed_id.has_value()) {
ref->set_contained_in_borrowed_id(contained_in_borrowed_id->Binary()); ref->set_contained_in_borrowed_id(contained_in_borrowed_id->Binary());
} }

View file

@ -23,9 +23,11 @@ class ReferenceCounter {
::google::protobuf::RepeatedPtrField<rpc::ObjectReferenceCount>; ::google::protobuf::RepeatedPtrField<rpc::ObjectReferenceCount>;
using ReferenceRemovedCallback = std::function<void(const ObjectID &)>; using ReferenceRemovedCallback = std::function<void(const ObjectID &)>;
ReferenceCounter(bool distributed_ref_counting_enabled = true, ReferenceCounter(const rpc::WorkerAddress &rpc_address,
bool distributed_ref_counting_enabled = true,
rpc::ClientFactoryFn client_factory = nullptr) rpc::ClientFactoryFn client_factory = nullptr)
: distributed_ref_counting_enabled_(distributed_ref_counting_enabled), : rpc_address_(rpc_address),
distributed_ref_counting_enabled_(distributed_ref_counting_enabled),
client_factory_(client_factory) {} client_factory_(client_factory) {}
~ReferenceCounter() {} ~ReferenceCounter() {}
@ -48,8 +50,10 @@ class ReferenceCounter {
/// dependencies to a submitted task. /// dependencies to a submitted task.
/// ///
/// \param[in] object_ids The object IDs to add references for. /// \param[in] object_ids The object IDs to add references for.
void AddSubmittedTaskReferences(const std::vector<ObjectID> &object_ids) void UpdateSubmittedTaskReferences(
LOCKS_EXCLUDED(mutex_); const std::vector<ObjectID> &argument_ids_to_add,
const std::vector<ObjectID> &argument_ids_to_remove = std::vector<ObjectID>(),
std::vector<ObjectID> *deleted = nullptr) LOCKS_EXCLUDED(mutex_);
/// Update object references that were given to a submitted task. The task /// Update object references that were given to a submitted task. The task
/// may still be borrowing any object IDs that were contained in its /// may still be borrowing any object IDs that were contained in its
@ -64,10 +68,10 @@ class ReferenceCounter {
/// arguments. Some references in this table may still be borrowed by the /// arguments. Some references in this table may still be borrowed by the
/// worker and/or a task that the worker submitted. /// worker and/or a task that the worker submitted.
/// \param[out] deleted The object IDs whos reference counts reached zero. /// \param[out] deleted The object IDs whos reference counts reached zero.
void UpdateSubmittedTaskReferences(const std::vector<ObjectID> &object_ids, void UpdateFinishedTaskReferences(const std::vector<ObjectID> &argument_ids,
const rpc::Address &worker_addr, const rpc::Address &worker_addr,
const ReferenceTableProto &borrowed_refs, const ReferenceTableProto &borrowed_refs,
std::vector<ObjectID> *deleted) std::vector<ObjectID> *deleted)
LOCKS_EXCLUDED(mutex_); LOCKS_EXCLUDED(mutex_);
/// Add an object that we own. The object may depend on other objects. /// Add an object that we own. The object may depend on other objects.
@ -182,18 +186,25 @@ class ReferenceCounter {
void GetAndClearLocalBorrowers(const std::vector<ObjectID> &borrowed_ids, void GetAndClearLocalBorrowers(const std::vector<ObjectID> &borrowed_ids,
ReferenceTableProto *proto) LOCKS_EXCLUDED(mutex_); ReferenceTableProto *proto) LOCKS_EXCLUDED(mutex_);
/// Wrap ObjectIDs inside another object ID. /// Mark that this ObjectID contains another ObjectID(s). This should be
/// called in two cases:
/// 1. We are storing the value of an object and the value contains
/// serialized copies of other ObjectIDs. If the outer object is owned by a
/// remote process, then they are now a borrower of the nested IDs.
/// 2. We submitted a task that returned an ObjectID(s) in its return values
/// and we are processing the worker's reply. In this case, we own the task's
/// return objects and are borrowing the nested IDs.
/// ///
/// \param[in] object_id The object ID whose value we are storing. /// \param[in] object_id The ID of the object that contains other ObjectIDs.
/// \param[in] inner_ids The object IDs that we are storing in object_id. /// \param[in] inner_ids The object IDs are nested in object_id's value.
/// \param[in] owner_address The owner address of the outer object_id. If /// \param[in] owner_address The owner address of the outer object_id. If
/// this is not provided, then the outer object ID must be owned by us. the /// this is not provided, then the outer object ID must be owned by us. the
/// outer object ID is not owned by us, then this is used to contact the /// outer object ID is not owned by us, then this is used to contact the
/// outer object's owner, since it is considered a borrower for the inner /// outer object's owner, since it is considered a borrower for the inner
/// IDs. /// IDs.
void WrapObjectIds(const ObjectID &object_id, const std::vector<ObjectID> &inner_ids, void AddNestedObjectIds(const ObjectID &object_id,
const absl::optional<rpc::WorkerAddress> &owner_address) const std::vector<ObjectID> &inner_ids,
LOCKS_EXCLUDED(mutex_); const rpc::WorkerAddress &owner_address) LOCKS_EXCLUDED(mutex_);
/// Whether we have a reference to a particular ObjectID. /// Whether we have a reference to a particular ObjectID.
/// ///
@ -234,7 +245,9 @@ class ReferenceCounter {
bool in_scope = RefCount() > 0; bool in_scope = RefCount() > 0;
bool was_contained_in_borrowed_id = contained_in_borrowed_id.has_value(); bool was_contained_in_borrowed_id = contained_in_borrowed_id.has_value();
bool has_borrowers = borrowers.size() > 0; bool has_borrowers = borrowers.size() > 0;
return !(in_scope || was_contained_in_borrowed_id || has_borrowers); bool was_stored_in_objects = stored_in_objects.size() > 0;
return !(in_scope || was_contained_in_borrowed_id || has_borrowers ||
was_stored_in_objects);
} }
/// Whether we own the object. If we own the object, then we are /// Whether we own the object. If we own the object, then we are
@ -293,6 +306,13 @@ class ReferenceCounter {
/// borrowers. A borrower is removed from the list when it responds /// borrowers. A borrower is removed from the list when it responds
/// that it is no longer using the reference. /// that it is no longer using the reference.
absl::flat_hash_set<rpc::WorkerAddress> borrowers; absl::flat_hash_set<rpc::WorkerAddress> borrowers;
/// When a process that is borrowing an object ID stores the ID inside the
/// return value of a task that it executes, the caller of the task is also
/// considered a borrower for as long as its reference to the task's return
/// ID stays in scope. Thus, the borrower must notify the owner that the
/// task's caller is also a borrower. The key is the task's return ID, and
/// the value is the task ID and address of the task's caller.
absl::flat_hash_map<ObjectID, rpc::WorkerAddress> stored_in_objects;
/// Callback that will be called when this ObjectID no longer has /// Callback that will be called when this ObjectID no longer has
/// references. /// references.
@ -311,18 +331,26 @@ class ReferenceCounter {
static void ReferenceTableToProto(const ReferenceTable &table, static void ReferenceTableToProto(const ReferenceTable &table,
ReferenceTableProto *proto); ReferenceTableProto *proto);
/// Helper method to wrap an ObjectID(s) inside another object ID. /// Remove references for the provided object IDs that correspond to them
/// being dependencies to a submitted task. This should be called when
/// inlined dependencies are inlined or when the task finishes for plasma
/// dependencies.
void RemoveSubmittedTaskReferences(const std::vector<ObjectID> &argument_ids,
std::vector<ObjectID> *deleted)
EXCLUSIVE_LOCKS_REQUIRED(mutex_);
/// Helper method to mark that this ObjectID contains another ObjectID(s).
/// ///
/// \param[in] object_id The object ID whose value we are storing. /// \param[in] object_id The ID of the object that contains other ObjectIDs.
/// \param[in] inner_ids The object IDs that we are storing in object_id. /// \param[in] inner_ids The object IDs are nested in object_id's value.
/// \param[in] owner_address The owner address of the outer object_id. If /// \param[in] owner_address The owner address of the outer object_id. If
/// this is not provided, then the outer object ID must be owned by us. the /// this is not provided, then the outer object ID must be owned by us. the
/// outer object ID is not owned by us, then this is used to contact the /// outer object ID is not owned by us, then this is used to contact the
/// outer object's owner, since it is considered a borrower for the inner /// outer object's owner, since it is considered a borrower for the inner
/// IDs. /// IDs.
void WrapObjectIdsInternal(const ObjectID &object_id, void AddNestedObjectIdsInternal(const ObjectID &object_id,
const std::vector<ObjectID> &inner_ids, const std::vector<ObjectID> &inner_ids,
const absl::optional<rpc::WorkerAddress> &owner_address) const rpc::WorkerAddress &owner_address)
EXCLUSIVE_LOCKS_REQUIRED(mutex_); EXCLUSIVE_LOCKS_REQUIRED(mutex_);
/// Populates the table with the ObjectID that we were or are still /// Populates the table with the ObjectID that we were or are still
@ -394,6 +422,11 @@ class ReferenceCounter {
std::vector<ObjectID> *deleted) std::vector<ObjectID> *deleted)
EXCLUSIVE_LOCKS_REQUIRED(mutex_); EXCLUSIVE_LOCKS_REQUIRED(mutex_);
/// Address of our RPC server. This is used to determine whether we own a
/// given object or not, by comparing our WorkerID with the WorkerID of the
/// object's owner.
rpc::WorkerAddress rpc_address_;
/// Feature flag for distributed ref counting. If this is false, then we will /// Feature flag for distributed ref counting. If this is false, then we will
/// keep the distributed ref count, but only the local ref count will be used /// keep the distributed ref count, but only the local ref count will be used
/// to decide when objects can be evicted. /// to decide when objects can be evicted.

File diff suppressed because it is too large Load diff

View file

@ -161,7 +161,7 @@ Status CoreWorkerMemoryStore::Put(const RayObject &object, const ObjectID &objec
RAY_CHECK(object_id.IsDirectCallType()); RAY_CHECK(object_id.IsDirectCallType());
std::vector<std::function<void(std::shared_ptr<RayObject>)>> async_callbacks; std::vector<std::function<void(std::shared_ptr<RayObject>)>> async_callbacks;
auto object_entry = std::make_shared<RayObject>(object.GetData(), object.GetMetadata(), auto object_entry = std::make_shared<RayObject>(object.GetData(), object.GetMetadata(),
object.GetInlinedIds(), true); object.GetNestedIds(), true);
{ {
absl::MutexLock lock(&mu_); absl::MutexLock lock(&mu_);

View file

@ -34,7 +34,7 @@ void TaskManager::AddPendingTask(const TaskID &caller_id,
} }
} }
} }
reference_counter_->AddSubmittedTaskReferences(task_deps); reference_counter_->UpdateSubmittedTaskReferences(task_deps);
// Add new owned objects for the return values of the task. // Add new owned objects for the return values of the task.
size_t num_returns = spec.NumReturns(); size_t num_returns = spec.NumReturns();
@ -110,7 +110,7 @@ void TaskManager::CompletePendingTask(const TaskID &task_id,
} }
RAY_CHECK_OK(in_memory_store_->Put( RAY_CHECK_OK(in_memory_store_->Put(
RayObject(data_buffer, metadata_buffer, RayObject(data_buffer, metadata_buffer,
IdVectorFromProtobuf<ObjectID>(return_object.inlined_ids())), IdVectorFromProtobuf<ObjectID>(return_object.nested_inlined_ids())),
object_id)); object_id));
} }
} }
@ -186,20 +186,14 @@ void TaskManager::ShutdownIfNeeded() {
} }
} }
void TaskManager::RemoveSubmittedTaskReferences(
const std::vector<ObjectID> &object_ids, const rpc::Address &worker_addr,
const ReferenceCounter::ReferenceTableProto &borrowed_refs) {
std::vector<ObjectID> deleted;
reference_counter_->UpdateSubmittedTaskReferences(object_ids, worker_addr,
borrowed_refs, &deleted);
in_memory_store_->Delete(deleted);
}
void TaskManager::OnTaskDependenciesInlined( void TaskManager::OnTaskDependenciesInlined(
const std::vector<ObjectID> &inlined_dependency_ids, const std::vector<ObjectID> &inlined_dependency_ids,
const std::vector<ObjectID> &contained_ids) { const std::vector<ObjectID> &contained_ids) {
reference_counter_->AddSubmittedTaskReferences(contained_ids); std::vector<ObjectID> deleted;
RemoveSubmittedTaskReferences(inlined_dependency_ids); reference_counter_->UpdateSubmittedTaskReferences(
/*argument_ids_to_add=*/contained_ids,
/*argument_ids_to_remove=*/inlined_dependency_ids, &deleted);
in_memory_store_->Delete(deleted);
} }
void TaskManager::RemoveFinishedTaskReferences( void TaskManager::RemoveFinishedTaskReferences(
@ -217,7 +211,11 @@ void TaskManager::RemoveFinishedTaskReferences(
inlined_ids.end()); inlined_ids.end());
} }
} }
RemoveSubmittedTaskReferences(plasma_dependencies, borrower_addr, borrowed_refs);
std::vector<ObjectID> deleted;
reference_counter_->UpdateFinishedTaskReferences(plasma_dependencies, borrower_addr,
borrowed_refs, &deleted);
in_memory_store_->Delete(deleted);
} }
void TaskManager::MarkPendingTaskFailed(const TaskID &task_id, void TaskManager::MarkPendingTaskFailed(const TaskID &task_id,

View file

@ -105,17 +105,6 @@ class TaskManager : public TaskFinisherInterface {
void MarkPendingTaskFailed(const TaskID &task_id, const TaskSpecification &spec, void MarkPendingTaskFailed(const TaskID &task_id, const TaskSpecification &spec,
rpc::ErrorType error_type) LOCKS_EXCLUDED(mu_); rpc::ErrorType error_type) LOCKS_EXCLUDED(mu_);
/// Remove submitted task references in the reference counter for the object IDs.
/// If the references were borrowed by a worker while executing a task, then
/// merge in the ref counts for any references that the task (or a nested
/// task) is still borrowing. If any reference counts for the borrowed
/// objects reach zero, they are deleted from the in-memory store.
void RemoveSubmittedTaskReferences(
const std::vector<ObjectID> &object_ids,
const rpc::Address &worker_addr = rpc::Address(),
const ReferenceCounter::ReferenceTableProto &borrowed_refs =
ReferenceCounter::ReferenceTableProto());
/// Helper function to call RemoveSubmittedTaskReferences on the remaining /// Helper function to call RemoveSubmittedTaskReferences on the remaining
/// dependencies of the given task spec after the task has finished or /// dependencies of the given task spec after the task has finished or
/// failed. The remaining dependencies are plasma objects and any ObjectIDs /// failed. The remaining dependencies are plasma objects and any ObjectIDs

View file

@ -44,9 +44,9 @@ void InlineDependencies(
const auto &metadata = it->second->GetMetadata(); const auto &metadata = it->second->GetMetadata();
mutable_arg->set_metadata(metadata->Data(), metadata->Size()); mutable_arg->set_metadata(metadata->Data(), metadata->Size());
} }
for (const auto &inlined_id : it->second->GetInlinedIds()) { for (const auto &nested_id : it->second->GetNestedIds()) {
mutable_arg->add_nested_inlined_ids(inlined_id.Binary()); mutable_arg->add_nested_inlined_ids(nested_id.Binary());
contained_ids->push_back(inlined_id); contained_ids->push_back(nested_id);
} }
inlined_dependency_ids->push_back(id); inlined_dependency_ids->push_back(id);
} }

View file

@ -275,8 +275,8 @@ void CoreWorkerDirectTaskReceiver::HandlePushTask(
return_object->set_metadata(result->GetMetadata()->Data(), return_object->set_metadata(result->GetMetadata()->Data(),
result->GetMetadata()->Size()); result->GetMetadata()->Size());
} }
for (const auto &inlined_id : result->GetInlinedIds()) { for (const auto &nested_id : result->GetNestedIds()) {
return_object->add_inlined_ids(inlined_id.Binary()); return_object->add_nested_inlined_ids(nested_id.Binary());
} }
} }
} }

View file

@ -63,8 +63,8 @@ message ReturnObject {
bytes data = 3; bytes data = 3;
// Metadata of the object. // Metadata of the object.
bytes metadata = 4; bytes metadata = 4;
// ObjectIDs that were inlined in the data field. // ObjectIDs that were nested in data. This is only set for inlined objects.
repeated bytes inlined_ids = 5; repeated bytes nested_inlined_ids = 5;
} }
message PushTaskRequest { message PushTaskRequest {
@ -195,12 +195,19 @@ message ObjectReferenceCount {
bool has_local_ref = 2; bool has_local_ref = 2;
// Any other borrowers that the worker created (by passing the ID on to them). // Any other borrowers that the worker created (by passing the ID on to them).
repeated Address borrowers = 3; repeated Address borrowers = 3;
// The borrower may have returned the object ID nested inside the return
// value of a task that it executed. This list contains all task returns that
// were owned by a process other than the borrower. Then, the process that
// owns the task's return value is also a borrower for as long as it has the
// task return ID in scope. Note that only the object ID and owner address
// are used for elements in this list.
repeated ObjectReference stored_in_objects = 4;
// The borrowed object ID that contained this object, if any. This is used // The borrowed object ID that contained this object, if any. This is used
// for nested object IDs. // for nested object IDs.
bytes contained_in_borrowed_id = 4; bytes contained_in_borrowed_id = 5;
// The object IDs that this object contains, if any. This is used for nested // The object IDs that this object contains, if any. This is used for nested
// object IDs. // object IDs.
repeated bytes contains = 5; repeated bytes contains = 6;
} }
message WaitForRefRemovedReply { message WaitForRefRemovedReply {