From 492076806d2f691fc43d35016cba857a2235d0b3 Mon Sep 17 00:00:00 2001 From: Jialing He Date: Wed, 21 Jul 2021 02:06:00 +0800 Subject: [PATCH] [object store] Assign the object owner in `ray.put()` (#16833) --- python/ray/_raylet.pxd | 4 +- python/ray/_raylet.pyx | 60 +++++++--- python/ray/includes/common.pxd | 4 +- python/ray/includes/libcoreworker.pxd | 9 +- python/ray/tests/BUILD | 1 + python/ray/tests/test_object_assign_owner.py | 77 +++++++++++++ python/ray/worker.py | 34 +++++- src/ray/core_worker/core_worker.cc | 115 +++++++++++++++---- src/ray/core_worker/core_worker.h | 19 ++- src/ray/core_worker/reference_count.cc | 21 ++++ src/ray/core_worker/reference_count.h | 9 ++ src/ray/protobuf/core_worker.proto | 18 +++ src/ray/rpc/worker/core_worker_client.h | 6 + src/ray/rpc/worker/core_worker_server.h | 6 +- 14 files changed, 334 insertions(+), 49 deletions(-) create mode 100644 python/ray/tests/test_object_assign_owner.py diff --git a/python/ray/_raylet.pxd b/python/ray/_raylet.pxd index 2788a49c8..3ed354524 100644 --- a/python/ray/_raylet.pxd +++ b/python/ray/_raylet.pxd @@ -15,7 +15,8 @@ from libcpp.memory cimport ( from ray.includes.common cimport ( CBuffer, - CRayObject + CRayObject, + CAddress, ) from ray.includes.libcoreworker cimport ( ActorHandleSharedPtr, @@ -106,6 +107,7 @@ cdef class CoreWorker: CObjectID *c_object_id, shared_ptr[CBuffer] *data, c_bool created_by_worker, owner_address=*) + cdef unique_ptr[CAddress] _convert_python_address(self, address=*) cdef store_task_outputs( self, worker, outputs, const c_vector[CObjectID] return_ids, c_vector[shared_ptr[CRayObject]] *returns) diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index 67cf87548..812068e57 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -27,11 +27,12 @@ from libc.stdint cimport ( uint64_t, uint8_t, ) -from libcpp cimport bool as c_bool +from libcpp cimport bool as c_bool, nullptr from libcpp.memory cimport ( dynamic_pointer_cast, make_shared, shared_ptr, + make_unique, unique_ptr, ) from libcpp.string cimport string as c_string @@ -1054,25 +1055,28 @@ cdef class CoreWorker: c_bool created_by_worker, owner_address=None): cdef: - CAddress c_owner_address + unique_ptr[CAddress] c_owner_address + + c_owner_address = move(self._convert_python_address(owner_address)) if object_ref is None: with nogil: check_status(CCoreWorkerProcess.GetCoreWorker().CreateOwned( metadata, data_size, contained_ids, - c_object_id, data, created_by_worker)) + c_object_id, data, created_by_worker, + move(c_owner_address))) else: c_object_id[0] = object_ref.native() if owner_address is None: - c_owner_address = CCoreWorkerProcess.GetCoreWorker( - ).GetRpcAddress() - else: - c_owner_address = CAddress() - c_owner_address.ParseFromString(owner_address) + c_owner_address = make_unique[CAddress]() + dereference( + c_owner_address + ).CopyFrom(CCoreWorkerProcess.GetCoreWorker().GetRpcAddress()) with nogil: check_status(CCoreWorkerProcess.GetCoreWorker().CreateExisting( metadata, data_size, c_object_id[0], - c_owner_address, data, created_by_worker)) + dereference(c_owner_address), data, + created_by_worker)) # If data is nullptr, that means the ObjectRef already existed, # which we ignore. @@ -1080,6 +1084,21 @@ cdef class CoreWorker: # and deal with it here. return data.get() == NULL + cdef unique_ptr[CAddress] _convert_python_address(self, address=None): + """ convert python address to `CAddress`, If not provided, + return nullptr. + + Args: + address: worker address. 
+ """ + cdef: + unique_ptr[CAddress] c_address + + if address is not None: + c_address = make_unique[CAddress]() + dereference(c_address).ParseFromString(address) + return move(c_address) + def put_file_like_object( self, metadata, data_size, file_like, ObjectRef object_ref, owner_address): @@ -1101,6 +1120,7 @@ cdef class CoreWorker: int64_t put_threshold c_bool put_small_object_in_memory_store c_vector[CObjectID] c_object_id_vector + unique_ptr[CAddress] c_owner_address # TODO(suquark): This method does not support put objects to # in memory store currently. metadata_buf = string_to_buffer(metadata) @@ -1117,17 +1137,20 @@ cdef class CoreWorker: while index < data_size: bytes_read = file_like.readinto(view[index:]) index += bytes_read + c_owner_address = move(self._convert_python_address(owner_address)) with nogil: # Using custom object refs is not supported because we # can't track their lifecycle, so we don't pin the object # in this case. check_status( CCoreWorkerProcess.GetCoreWorker().SealExisting( - c_object_id, pin_object=False)) + c_object_id, pin_object=False, + owner_address=move(c_owner_address))) def put_serialized_object(self, serialized_object, ObjectRef object_ref=None, - c_bool pin_object=True): + c_bool pin_object=True, + owner_address=None): cdef: CObjectID c_object_id shared_ptr[CBuffer] data @@ -1135,6 +1158,7 @@ cdef class CoreWorker: int64_t put_threshold c_bool put_small_object_in_memory_store c_vector[CObjectID] c_object_id_vector + unique_ptr[CAddress] c_owner_address metadata = string_to_buffer(serialized_object.metadata) put_threshold = RayConfig.instance().max_direct_call_object_size() @@ -1144,7 +1168,7 @@ cdef class CoreWorker: object_already_exists = self._create_put_buffer( metadata, total_bytes, object_ref, ObjectRefsToVector(serialized_object.contained_object_refs), - &c_object_id, &data, True) + &c_object_id, &data, True, owner_address) if not object_already_exists: if total_bytes > 0: @@ -1152,24 +1176,32 @@ cdef class CoreWorker: Buffer.make(data)) if self.is_local_mode or (put_small_object_in_memory_store and total_bytes < put_threshold): + if owner_address is not None: + raise Exception( + "cannot put data into memory store directly" + " and assign owner at the same time") c_object_id_vector.push_back(c_object_id) check_status(CCoreWorkerProcess.GetCoreWorker().Put( CRayObject(data, metadata, c_object_id_vector), c_object_id_vector, c_object_id)) else: + c_owner_address = move(self._convert_python_address( + owner_address)) with nogil: if object_ref is None: check_status( CCoreWorkerProcess.GetCoreWorker().SealOwned( c_object_id, - pin_object)) + pin_object, + move(c_owner_address))) else: # Using custom object refs is not supported because we # can't track their lifecycle, so we don't pin the # object in this case. 
check_status( CCoreWorkerProcess.GetCoreWorker().SealExisting( - c_object_id, pin_object=False)) + c_object_id, pin_object=False, + owner_address=move(c_owner_address))) return c_object_id.Binary() diff --git a/python/ray/includes/common.pxd b/python/ray/includes/common.pxd index 14948613e..04f8da761 100644 --- a/python/ray/includes/common.pxd +++ b/python/ray/includes/common.pxd @@ -21,7 +21,7 @@ from ray.includes.function_descriptor cimport ( ) -cdef extern from * namespace "polyfill": +cdef extern from * namespace "polyfill" nogil: """ namespace polyfill { @@ -154,6 +154,8 @@ cdef extern from "src/ray/protobuf/common.pb.h" nogil: CAddress() const c_string &SerializeAsString() void ParseFromString(const c_string &serialized) + void CopyFrom(const CAddress& address) + const c_string &worker_id() # This is a workaround for C++ enum class since Cython has no corresponding diff --git a/python/ray/includes/libcoreworker.pxd b/python/ray/includes/libcoreworker.pxd index b5171175c..94a2cb482 100644 --- a/python/ray/includes/libcoreworker.pxd +++ b/python/ray/includes/libcoreworker.pxd @@ -190,15 +190,18 @@ cdef extern from "ray/core_worker/core_worker.h" nogil: const size_t data_size, const c_vector[CObjectID] &contained_object_ids, CObjectID *object_id, shared_ptr[CBuffer] *data, - c_bool created_by_worker) + c_bool created_by_worker, + const unique_ptr[CAddress] &owner_address) CRayStatus CreateExisting(const shared_ptr[CBuffer] &metadata, const size_t data_size, const CObjectID &object_id, const CAddress &owner_address, shared_ptr[CBuffer] *data, c_bool created_by_worker) - CRayStatus SealOwned(const CObjectID &object_id, c_bool pin_object) - CRayStatus SealExisting(const CObjectID &object_id, c_bool pin_object) + CRayStatus SealOwned(const CObjectID &object_id, c_bool pin_object, + const unique_ptr[CAddress] &owner_address) + CRayStatus SealExisting(const CObjectID &object_id, c_bool pin_object, + const unique_ptr[CAddress] &owner_address) CRayStatus Get(const c_vector[CObjectID] &ids, int64_t timeout_ms, c_vector[shared_ptr[CRayObject]] *results, c_bool plasma_objects_only) diff --git a/python/ray/tests/BUILD b/python/ray/tests/BUILD index 96cd3fed9..e19775a24 100644 --- a/python/ray/tests/BUILD +++ b/python/ray/tests/BUILD @@ -63,6 +63,7 @@ py_test_module_list( "test_multinode_failures.py", "test_multinode_failures_2.py", "test_multiprocessing.py", + "test_object_assign_owner.py", "test_output.py", "test_ray_init.py", "test_reconstruction.py", diff --git a/python/ray/tests/test_object_assign_owner.py b/python/ray/tests/test_object_assign_owner.py new file mode 100644 index 000000000..a6a323049 --- /dev/null +++ b/python/ray/tests/test_object_assign_owner.py @@ -0,0 +1,77 @@ +import pytest +import ray +import time + + +@pytest.mark.parametrize( + "actor_resources", + [ + dict( + zip(["owner", "creator", "borrower"], [{ + f"node{i}": 1 + } for i in _])) for _ in [ + [1, 2, 3], # None of them is on the same node. + [1, 1, 3], # Owner and creator are on the same node. + [3, 2, 3], # Owner and borrower are on the same node. + [1, 3, 3], # Creator and borrower are on the same node. + [3, 3, 3], # All of them are on the same node. 
+ ] + ]) +def test_owner_assign_when_put(ray_start_cluster, actor_resources): + cluster_node_config = [{ + "num_cpus": 1, + "resources": { + f"node{i+1}": 10 + } + } for i in range(3)] + cluster = ray_start_cluster + for kwargs in cluster_node_config: + cluster.add_node(**kwargs) + ray.init(address=cluster.address) + + @ray.remote(resources=actor_resources["creator"], num_cpus=0) + class Creator: + def gen_object_ref(self, data="test", owner=None): + return ray.put(data, _owner=owner) + + @ray.remote(resources=actor_resources["owner"], num_cpus=0) + class Owner: + def __init__(self): + self.ref = None + + def set_object_ref(self, ref): + self.ref = ref + + def warmup(self): + return 0 + + @ray.remote(resources=actor_resources["borrower"], num_cpus=0) + class Borrower: + def get_object(self, ref): + return ray.get(ref) + + owner = Owner.remote() + creator = Creator.remote() + borrower = Borrower.remote() + + # Make sure the owner actor is alive. + ray.get(owner.warmup.remote()) + + object_ref = creator.gen_object_ref.remote(data="test1", owner=owner) + # TODO(Catch-Bull): Ideally, deleting this line can also work normally, + # cause driver keep a reference of the object. But, for now, it still + # requires the owner to keep a reference of the object to make it + # available. + ray.get(owner.set_object_ref.remote(object_ref)) + + ray.kill(creator) + time.sleep(10) + + data = ray.get(borrower.get_object.remote(object_ref)) + assert data == "test1" + + ray.kill(owner) + time.sleep(2) + with pytest.raises(ray.exceptions.RayTaskError) as error: + ray.get(borrower.get_object.remote(object_ref), timeout=2) + assert "ObjectLostError" in error.value.args[1] diff --git a/python/ray/worker.py b/python/ray/worker.py index aeac8f99e..32069ebd5 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -249,7 +249,7 @@ class Worker: def set_load_code_from_local(self, load_code_from_local): self._load_code_from_local = load_code_from_local - def put_object(self, value, object_ref=None): + def put_object(self, value, object_ref=None, owner_address=None): """Put value in the local object store with object reference `object_ref`. This assumes that the value for `object_ref` has not yet been placed in @@ -263,6 +263,7 @@ class Worker: value: The value to put in the object store. object_ref (ObjectRef): The object ref of the value to be put. If None, one will be generated. + owner_address: The serialized address of object's owner. Returns: ObjectRef: The object ref the object was put under. @@ -294,7 +295,9 @@ class Worker: # reference counter. return ray.ObjectRef( self.core_worker.put_serialized_object( - serialized_value, object_ref=object_ref)) + serialized_value, + object_ref=object_ref, + owner_address=owner_address)) def raise_errors(self, data_metadata_pairs, object_refs): out = self.deserialize_objects(data_metadata_pairs, object_refs) @@ -1595,22 +1598,45 @@ def get(object_refs, *, timeout=None): @PublicAPI @client_mode_hook -def put(value): +def put(value, *, _owner=None): """Store an object in the object store. The object may not be evicted while a reference to the returned ID exists. Args: value: The Python object to be stored. + _owner: The actor that should own this object. This allows creating + objects with lifetimes decoupled from that of the creating process. + Note that the owner actor must be passed a reference to the object + prior to the object creator exiting, otherwise the reference will + still be lost. Returns: The object ref assigned to this value. 
""" worker = global_worker worker.check_connected() + + if _owner is None: + serialize_owner_address = None + elif isinstance(_owner, ray.actor.ActorHandle): + # Ensure `ray.state.state.global_state_accessor` is not None + ray.state.state._check_connected() + owner_address = ray.gcs_utils.ActorTableData.FromString( + ray.state.state.global_state_accessor.get_actor_info( + _owner._actor_id)).address + if len(owner_address.worker_id) == 0: + raise RuntimeError( + f"{_owner} is not alive, it's worker_id is empty!") + serialize_owner_address = owner_address.SerializeToString() + else: + raise TypeError( + f"Expect an `ray.actor.ActorHandle`, but got: {type(_owner)}") + with profiling.profile("ray.put"): try: - object_ref = worker.put_object(value) + object_ref = worker.put_object( + value, owner_address=serialize_owner_address) except ObjectStoreFullError: logger.info( "Put failed since the value was either too large or the " diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 8a280e8de..01cf10edd 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -1109,23 +1109,66 @@ Status CoreWorker::CreateOwned(const std::shared_ptr &metadata, const size_t data_size, const std::vector &contained_object_ids, ObjectID *object_id, std::shared_ptr *data, - bool created_by_worker) { + bool created_by_worker, + const std::unique_ptr owner_address) { *object_id = ObjectID::FromIndex(worker_context_.GetCurrentTaskID(), worker_context_.GetNextPutIndex()); - reference_counter_->AddOwnedObject(*object_id, contained_object_ids, rpc_address_, - CurrentCallSite(), data_size + metadata->Size(), - /*is_reconstructable=*/false, - NodeID::FromBinary(rpc_address_.raylet_id())); - if (options_.is_local_mode || - (RayConfig::instance().put_small_object_in_memory_store() && - static_cast(data_size) < max_direct_call_object_size_)) { + rpc::Address real_owner_address = + owner_address != nullptr ? *owner_address : rpc_address_; + bool owned_by_us = real_owner_address.worker_id() == rpc_address_.worker_id(); + auto status = Status::OK(); + if (owned_by_us) { + reference_counter_->AddOwnedObject(*object_id, contained_object_ids, rpc_address_, + CurrentCallSite(), data_size + metadata->Size(), + /*is_reconstructable=*/false, + NodeID::FromBinary(rpc_address_.raylet_id())); + } else { + // Because in the remote worker's `HandleAssignObjectOwner`, + // a `WaitForRefRemoved` RPC request will be sent back to + // the current worker. So we need to make sure ref count is > 0 + // by invoking `AddLocalReference` first. + AddLocalReference(*object_id); + RAY_UNUSED(reference_counter_->AddBorrowedObject(*object_id, ObjectID::Nil(), + real_owner_address)); + + // Remote call `AssignObjectOwner()`. + rpc::AssignObjectOwnerRequest request; + request.set_object_id(object_id->Binary()); + request.mutable_borrower_address()->CopyFrom(rpc_address_); + request.set_call_site(CurrentCallSite()); + + for (auto &contained_object_id : contained_object_ids) { + request.add_contained_object_ids(contained_object_id.Binary()); + } + request.set_object_size(data_size + metadata->Size()); + auto conn = core_worker_client_pool_->GetOrConnect(real_owner_address); + std::promise status_promise; + conn->AssignObjectOwner(request, + [&status_promise](const Status &returned_status, + const rpc::AssignObjectOwnerReply &reply) { + status_promise.set_value(returned_status); + }); + // Block until the remote call `AssignObjectOwner` returns. 
+    status = status_promise.get_future().get();
+  }
+
+  if ((options_.is_local_mode ||
+       (RayConfig::instance().put_small_object_in_memory_store() &&
+        static_cast<int64_t>(data_size) < max_direct_call_object_size_)) &&
+      owned_by_us) {
     *data = std::make_shared<LocalMemoryBuffer>(data_size);
   } else {
-    auto status = plasma_store_provider_->Create(metadata, data_size, *object_id,
-                                                 /* owner_address = */ rpc_address_, data,
-                                                 created_by_worker);
+    if (status.ok()) {
+      status = plasma_store_provider_->Create(metadata, data_size, *object_id,
+                                              /* owner_address = */ rpc_address_, data,
+                                              created_by_worker);
+    }
     if (!status.ok() || !data) {
-      reference_counter_->RemoveOwnedObject(*object_id);
+      if (owned_by_us) {
+        reference_counter_->RemoveOwnedObject(*object_id);
+      } else {
+        RemoveLocalReference(*object_id);
+      }
       return status;
     }
   }
@@ -1146,22 +1189,30 @@ Status CoreWorker::CreateExisting(const std::shared_ptr<Buffer> &metadata,
   }
 }
 
-Status CoreWorker::SealOwned(const ObjectID &object_id, bool pin_object) {
-  auto status = SealExisting(object_id, pin_object);
-  if (!status.ok()) {
+Status CoreWorker::SealOwned(const ObjectID &object_id, bool pin_object,
+                             const std::unique_ptr<rpc::Address> &owner_address) {
+  bool owned_by_us = owner_address != nullptr
+                         ? WorkerID::FromBinary(owner_address->worker_id()) ==
+                               WorkerID::FromBinary(rpc_address_.worker_id())
+                         : true;
+  auto status = SealExisting(object_id, pin_object, std::move(owner_address));
+  if (status.ok()) return status;
+  if (owned_by_us) {
     reference_counter_->RemoveOwnedObject(object_id);
+  } else {
+    RemoveLocalReference(object_id);
   }
   return status;
 }
 
 Status CoreWorker::SealExisting(const ObjectID &object_id, bool pin_object,
-                                const absl::optional<rpc::Address> &owner_address) {
+                                const std::unique_ptr<rpc::Address> &owner_address) {
   RAY_RETURN_NOT_OK(plasma_store_provider_->Seal(object_id));
   if (pin_object) {
     // Tell the raylet to pin the object **after** it is created.
     RAY_LOG(DEBUG) << "Pinning sealed object " << object_id;
     local_raylet_client_->PinObjectIDs(
-        owner_address.has_value() ? *owner_address : rpc_address_, {object_id},
+        owner_address != nullptr ? *owner_address : rpc_address_, {object_id},
         [this, object_id](const Status &status, const rpc::PinObjectIDsReply &reply) {
           // Only release the object once the raylet has responded to avoid the race
           // condition that the object could be evicted before the raylet pins it.
@@ -2280,11 +2331,12 @@ Status CoreWorker::SealReturnObject(const ObjectID &return_id,
   if (!return_object) {
     return status;
   }
-  absl::optional<rpc::Address> caller_address(
-      options_.is_local_mode ? absl::optional<rpc::Address>()
-                             : worker_context_.GetCurrentTask()->CallerAddress());
+  std::unique_ptr<rpc::Address> caller_address =
+      options_.is_local_mode ? nullptr
+                             : std::make_unique<rpc::Address>(
+                                   worker_context_.GetCurrentTask()->CallerAddress());
   if (return_object->GetData() != nullptr && return_object->GetData()->IsPlasmaBuffer()) {
-    status = SealExisting(return_id, /*pin_object=*/true, caller_address);
+    status = SealExisting(return_id, /*pin_object=*/true, std::move(caller_address));
     if (!status.ok()) {
       RAY_LOG(FATAL) << "Failed to seal object " << return_id
                      << " in store: " << status.message();
@@ -3032,6 +3084,27 @@ void CoreWorker::HandleExit(const rpc::ExitRequest &request, rpc::ExitReply *rep
       [this]() { Exit(rpc::WorkerExitType::INTENDED_EXIT); });
 }
 
+void CoreWorker::HandleAssignObjectOwner(const rpc::AssignObjectOwnerRequest &request,
+                                         rpc::AssignObjectOwnerReply *reply,
+                                         rpc::SendReplyCallback send_reply_callback) {
+  ObjectID object_id = ObjectID::FromBinary(request.object_id());
+  const auto &borrower_address = request.borrower_address();
+  std::string call_site = request.call_site();
+  // Get a list of contained object ids.
+  std::vector<ObjectID> contained_object_ids;
+  contained_object_ids.reserve(request.contained_object_ids_size());
+  for (const auto &id_binary : request.contained_object_ids()) {
+    contained_object_ids.push_back(ObjectID::FromBinary(id_binary));
+  }
+  reference_counter_->AddOwnedObject(
+      object_id, contained_object_ids, rpc_address_, call_site, request.object_size(),
+      /*is_reconstructable=*/false,
+      /*pinned_at_raylet_id=*/NodeID::FromBinary(borrower_address.raylet_id()));
+  reference_counter_->AddBorrowerAddress(object_id, borrower_address);
+  RAY_CHECK(memory_store_->Put(RayObject(rpc::ErrorType::OBJECT_IN_PLASMA), object_id));
+  send_reply_callback(Status::OK(), nullptr, nullptr);
+}
+
 void CoreWorker::YieldCurrentFiber(FiberEvent &event) {
   RAY_CHECK(worker_context_.CurrentActorIsAsync());
   boost::this_fiber::yield();
diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h
index 2bef89c10..3bb2f14e7 100644
--- a/src/ray/core_worker/core_worker.h
+++ b/src/ray/core_worker/core_worker.h
@@ -536,11 +536,15 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
   /// \param[in] contained_object_ids The IDs serialized in this object.
   /// \param[out] object_id Object ID generated for the put.
   /// \param[out] data Buffer for the user to write the object into.
+  /// \param[in] created_by_worker Whether the object is created by a worker.
+  /// \param[in] owner_address The address of the object's owner. If not provided,
+  /// defaults to this worker.
   /// \return Status.
   Status CreateOwned(const std::shared_ptr<Buffer> &metadata, const size_t data_size,
                      const std::vector<ObjectID> &contained_object_ids,
                      ObjectID *object_id, std::shared_ptr<Buffer> *data,
-                     bool created_by_worker);
+                     bool created_by_worker,
+                     const std::unique_ptr<rpc::Address> owner_address = nullptr);
 
   /// Create and return a buffer in the object store that can be directly written
   /// into, for an object ID that already exists. After writing to the buffer, the
@@ -563,8 +567,11 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
   ///
   /// \param[in] object_id Object ID corresponding to the object.
   /// \param[in] pin_object Whether or not to pin the object at the local raylet.
+  /// \param[in] owner_address The address of the object's owner. If not provided,
+  /// defaults to this worker.
   /// \return Status.
-  Status SealOwned(const ObjectID &object_id, bool pin_object);
+  Status SealOwned(const ObjectID &object_id, bool pin_object,
+                   const std::unique_ptr<rpc::Address> &owner_address = nullptr);
 
   /// Finalize placing an object into the object store. This should be called after
   /// a corresponding `CreateExisting()` call and then writing into the returned buffer.
@@ -575,7 +582,7 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
   /// the raylet if the object is pinned. If not provided, defaults to this worker.
   /// \return Status.
   Status SealExisting(const ObjectID &object_id, bool pin_object,
-                      const absl::optional<rpc::Address> &owner_address = absl::nullopt);
+                      const std::unique_ptr<rpc::Address> &owner_address = nullptr);
 
   /// Get a list of objects from the object store. Objects that failed to be retrieved
   /// will be returned as nullptrs.
@@ -1005,6 +1012,12 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
   void HandleExit(const rpc::ExitRequest &request, rpc::ExitReply *reply,
                   rpc::SendReplyCallback send_reply_callback) override;
 
+  // Set the local worker as the owner of the object.
+  // Requested by the borrower's worker, executed on the owner's worker.
+  void HandleAssignObjectOwner(const rpc::AssignObjectOwnerRequest &request,
+                               rpc::AssignObjectOwnerReply *reply,
+                               rpc::SendReplyCallback send_reply_callback) override;
+
   ///
   /// Public methods related to async actor call. This should only be used when
   /// the actor is (1) direct actor and (2) using asyncio mode.
diff --git a/src/ray/core_worker/reference_count.cc b/src/ray/core_worker/reference_count.cc
index 42adfc932..e95009903 100644
--- a/src/ray/core_worker/reference_count.cc
+++ b/src/ray/core_worker/reference_count.cc
@@ -1094,6 +1094,27 @@ bool ReferenceCounter::ReportLocalityData(const ObjectID &object_id,
   return true;
 }
 
+void ReferenceCounter::AddBorrowerAddress(const ObjectID &object_id,
+                                          const rpc::Address &borrower_address) {
+  absl::MutexLock lock(&mutex_);
+  auto it = object_id_refs_.find(object_id);
+  RAY_CHECK(it != object_id_refs_.end());
+
+  RAY_CHECK(it->second.owned_by_us)
+      << "AddBorrowerAddress should only be used for owner references.";
+
+  rpc::WorkerAddress borrower_worker_address = rpc::WorkerAddress(borrower_address);
+  RAY_CHECK(borrower_worker_address.worker_id != rpc_address_.worker_id)
+      << "The borrower cannot be the owner itself";
+
+  RAY_LOG(DEBUG) << "Add borrower " << borrower_address.DebugString() << " for object "
+                 << object_id;
+  auto inserted = it->second.borrowers.insert(borrower_worker_address).second;
+  if (inserted) {
+    WaitForRefRemoved(it, borrower_worker_address);
+  }
+}
+
 void ReferenceCounter::PushToLocationSubscribers(ReferenceTable::iterator it) {
   const auto &object_id = it->first;
   const auto &locations = it->second.locations;
diff --git a/src/ray/core_worker/reference_count.h b/src/ray/core_worker/reference_count.h
index 4c26d45f9..330ee4089 100644
--- a/src/ray/core_worker/reference_count.h
+++ b/src/ray/core_worker/reference_count.h
@@ -455,6 +455,15 @@ class ReferenceCounter : public ReferenceCounterInterface,
                          const absl::flat_hash_set<NodeID> &locations,
                          uint64_t object_size);
 
+  /// Add a borrower address on the owner's worker. This adds the borrower address
+  /// to `object_id_refs_` and then calls WaitForRefRemoved() to monitor the borrowed
+  /// object on the borrower's worker.
+  ///
+  /// \param[in] object_id The ID of the object that is being borrowed.
+  /// \param[in] borrower_address The address of the borrower.
+  void AddBorrowerAddress(const ObjectID &object_id, const rpc::Address &borrower_address)
+      LOCKS_EXCLUDED(mutex_);
+
  private:
   struct Reference {
     /// Constructor for a reference whose origin is unknown.
diff --git a/src/ray/protobuf/core_worker.proto b/src/ray/protobuf/core_worker.proto
index 377031ee0..8117dcd0c 100644
--- a/src/ray/protobuf/core_worker.proto
+++ b/src/ray/protobuf/core_worker.proto
@@ -335,6 +335,22 @@ message RunOnUtilWorkerRequest {
 message RunOnUtilWorkerReply {
 }
 
+message AssignObjectOwnerRequest {
+  // The ID of the added object.
+  bytes object_id = 1;
+  // The size of the object in bytes.
+  uint64 object_size = 2;
+  // The IDs of contained objects.
+  repeated bytes contained_object_ids = 3;
+  // The borrower address.
+  Address borrower_address = 4;
+  // Description of the call site where the reference was created.
+  string call_site = 5;
+}
+
+message AssignObjectOwnerReply {
+}
+
 service CoreWorkerService {
   // Push a task directly to this worker from another.
   rpc PushTask(PushTaskRequest) returns (PushTaskReply);
@@ -392,4 +408,6 @@ service CoreWorkerService {
   rpc RunOnUtilWorker(RunOnUtilWorkerRequest) returns (RunOnUtilWorkerReply);
   // Request for a worker to exit.
   rpc Exit(ExitRequest) returns (ExitReply);
+  // Assign the owner of an object to the intended worker.
+  rpc AssignObjectOwner(AssignObjectOwnerRequest) returns (AssignObjectOwnerReply);
 }
diff --git a/src/ray/rpc/worker/core_worker_client.h b/src/ray/rpc/worker/core_worker_client.h
index 2950af635..3252fbd9c 100644
--- a/src/ray/rpc/worker/core_worker_client.h
+++ b/src/ray/rpc/worker/core_worker_client.h
@@ -201,6 +201,10 @@ class CoreWorkerClientInterface : public pubsub::SubscriberClientInterface {
   virtual void Exit(const ExitRequest &request,
                     const ClientCallback<ExitReply> &callback) {}
 
+  virtual void AssignObjectOwner(const AssignObjectOwnerRequest &request,
+                                 const ClientCallback<AssignObjectOwnerReply> &callback) {
+  }
+
   virtual ~CoreWorkerClientInterface(){};
 };
 
@@ -266,6 +270,8 @@ class CoreWorkerClient : public std::enable_shared_from_this<CoreWorkerClient>,
 
   VOID_RPC_CLIENT_METHOD(CoreWorkerService, Exit, grpc_client_, override)
 
+  VOID_RPC_CLIENT_METHOD(CoreWorkerService, AssignObjectOwner, grpc_client_, override)
+
   void PushActorTask(std::unique_ptr<PushTaskRequest> request, bool skip_queue,
                      const ClientCallback<PushTaskReply> &callback) override {
     if (skip_queue) {
diff --git a/src/ray/rpc/worker/core_worker_server.h b/src/ray/rpc/worker/core_worker_server.h
index d52881970..05629d7d6 100644
--- a/src/ray/rpc/worker/core_worker_server.h
+++ b/src/ray/rpc/worker/core_worker_server.h
@@ -49,7 +49,8 @@ namespace rpc {
   RPC_SERVICE_HANDLER(CoreWorkerService, AddSpilledUrl)       \
   RPC_SERVICE_HANDLER(CoreWorkerService, PlasmaObjectReady)   \
   RPC_SERVICE_HANDLER(CoreWorkerService, RunOnUtilWorker)     \
-  RPC_SERVICE_HANDLER(CoreWorkerService, Exit)
+  RPC_SERVICE_HANDLER(CoreWorkerService, Exit)                \
+  RPC_SERVICE_HANDLER(CoreWorkerService, AssignObjectOwner)
 
 #define RAY_CORE_WORKER_DECLARE_RPC_HANDLERS              \
   DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(PushTask)       \
@@ -73,7 +74,8 @@ namespace rpc {
   DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(AddSpilledUrl)       \
   DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(PlasmaObjectReady)   \
   DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(RunOnUtilWorker)     \
-  DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(Exit)
+  DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(Exit)                \
+  DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(AssignObjectOwner)
 
 /// Interface of the `CoreWorkerServiceHandler`, see `src/ray/protobuf/core_worker.proto`.
 class CoreWorkerServiceHandler {
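
Usage note (not part of the patch): the sketch below shows how the new `_owner`
keyword argument to `ray.put()` is intended to be used, condensed from the
`test_object_assign_owner.py` test added above. It is a minimal single-node
sketch; the `Owner` actor name, its `keep()` method, and the bare `ray.init()`
call are illustrative assumptions, whereas the test itself runs on a
three-node cluster with a separate creator and borrower.

import ray

ray.init()


@ray.remote
class Owner:
    """Long-lived actor that will own objects created elsewhere."""

    def __init__(self):
        self.refs = []

    def keep(self, refs):
        # The owner must be handed a reference before the creating
        # process exits; otherwise the object can still be lost.
        self.refs.extend(refs)
        return len(self.refs)


owner = Owner.remote()

# Ownership of the object is assigned to `owner` instead of this driver.
ref = ray.put("payload", _owner=owner)

# Wrap the ref in a list so the actor receives the ObjectRef itself
# rather than the resolved value.
ray.get(owner.keep.remote([ref]))

# The object remains retrievable while `owner` is alive and holds a
# reference, even if the creating worker later exits.
assert ray.get(ref) == "payload"

Passing anything other than an `ray.actor.ActorHandle` as `_owner` raises a
TypeError, and the owner actor must be alive when `ray.put()` is called.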