[object store] Assign the object owner in ray.put() (#16833)

parent 4ece5247d6
commit 492076806d

14 changed files with 334 additions and 49 deletions
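In short: `ray.put()` gains a keyword-only `_owner` argument that assigns ownership of the new object to a designated actor, decoupling the object's lifetime from the creating process. A minimal sketch of the user-facing behavior, distilled from the new test added below (the actor classes and payload are illustrative, not part of the diff):

import ray

ray.init()

@ray.remote
class Owner:
    def set_ref(self, ref):
        # The owner must be handed a reference before the creator exits.
        self.ref = ref

    def ping(self):
        return "pong"

@ray.remote
class Creator:
    def create(self, owner):
        return ray.put("payload", _owner=owner)

owner, creator = Owner.remote(), Creator.remote()
ray.get(owner.ping.remote())      # make sure the owner actor is alive

obj_ref = ray.get(creator.create.remote(owner))
# List-wrap the ref so it is passed as a reference, not auto-resolved.
ray.get(owner.set_ref.remote([obj_ref]))

ray.kill(creator)                 # the creating actor can now die...
assert ray.get(obj_ref) == "payload"   # ...the object remains reachable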
python/ray/_raylet.pxd

@@ -15,7 +15,8 @@ from libcpp.memory cimport (
 from ray.includes.common cimport (
     CBuffer,
-    CRayObject
+    CRayObject,
+    CAddress,
 )
 from ray.includes.libcoreworker cimport (
     ActorHandleSharedPtr,

@@ -106,6 +107,7 @@ cdef class CoreWorker:
                               CObjectID *c_object_id, shared_ptr[CBuffer] *data,
                               c_bool created_by_worker,
                               owner_address=*)
+    cdef unique_ptr[CAddress] _convert_python_address(self, address=*)
     cdef store_task_outputs(
             self, worker, outputs, const c_vector[CObjectID] return_ids,
             c_vector[shared_ptr[CRayObject]] *returns)
python/ray/_raylet.pyx

@@ -27,11 +27,12 @@ from libc.stdint cimport (
     uint64_t,
     uint8_t,
 )
-from libcpp cimport bool as c_bool
+from libcpp cimport bool as c_bool, nullptr
 from libcpp.memory cimport (
     dynamic_pointer_cast,
     make_shared,
     shared_ptr,
+    make_unique,
     unique_ptr,
 )
 from libcpp.string cimport string as c_string

@@ -1054,25 +1055,28 @@ cdef class CoreWorker:
                               c_bool created_by_worker,
                               owner_address=None):
         cdef:
-            CAddress c_owner_address
+            unique_ptr[CAddress] c_owner_address
+
+        c_owner_address = move(self._convert_python_address(owner_address))
 
         if object_ref is None:
             with nogil:
                 check_status(CCoreWorkerProcess.GetCoreWorker().CreateOwned(
                             metadata, data_size, contained_ids,
-                            c_object_id, data, created_by_worker))
+                            c_object_id, data, created_by_worker,
+                            move(c_owner_address)))
         else:
             c_object_id[0] = object_ref.native()
             if owner_address is None:
-                c_owner_address = CCoreWorkerProcess.GetCoreWorker(
-                ).GetRpcAddress()
-            else:
-                c_owner_address = CAddress()
-                c_owner_address.ParseFromString(owner_address)
+                c_owner_address = make_unique[CAddress]()
+                dereference(
+                    c_owner_address
+                ).CopyFrom(CCoreWorkerProcess.GetCoreWorker().GetRpcAddress())
             with nogil:
                 check_status(CCoreWorkerProcess.GetCoreWorker().CreateExisting(
                             metadata, data_size, c_object_id[0],
-                            c_owner_address, data, created_by_worker))
+                            dereference(c_owner_address), data,
+                            created_by_worker))
 
         # If data is nullptr, that means the ObjectRef already existed,
         # which we ignore.

@@ -1080,6 +1084,21 @@ cdef class CoreWorker:
         # and deal with it here.
         return data.get() == NULL
 
+    cdef unique_ptr[CAddress] _convert_python_address(self, address=None):
+        """Convert a Python address to `CAddress`. If no address is
+        provided, return nullptr.
+
+        Args:
+            address: worker address.
+        """
+        cdef:
+            unique_ptr[CAddress] c_address
+
+        if address is not None:
+            c_address = make_unique[CAddress]()
+            dereference(c_address).ParseFromString(address)
+        return move(c_address)
+
     def put_file_like_object(
             self, metadata, data_size, file_like, ObjectRef object_ref,
             owner_address):

@@ -1101,6 +1120,7 @@ cdef class CoreWorker:
             int64_t put_threshold
             c_bool put_small_object_in_memory_store
             c_vector[CObjectID] c_object_id_vector
+            unique_ptr[CAddress] c_owner_address
         # TODO(suquark): This method does not support put objects to
         # in memory store currently.
         metadata_buf = string_to_buffer(metadata)

@@ -1117,17 +1137,20 @@ cdef class CoreWorker:
             while index < data_size:
                 bytes_read = file_like.readinto(view[index:])
                 index += bytes_read
+        c_owner_address = move(self._convert_python_address(owner_address))
         with nogil:
             # Using custom object refs is not supported because we
             # can't track their lifecycle, so we don't pin the object
             # in this case.
             check_status(
                 CCoreWorkerProcess.GetCoreWorker().SealExisting(
-                    c_object_id, pin_object=False))
+                    c_object_id, pin_object=False,
+                    owner_address=move(c_owner_address)))
 
     def put_serialized_object(self, serialized_object,
                               ObjectRef object_ref=None,
-                              c_bool pin_object=True):
+                              c_bool pin_object=True,
+                              owner_address=None):
         cdef:
             CObjectID c_object_id
             shared_ptr[CBuffer] data

@@ -1135,6 +1158,7 @@ cdef class CoreWorker:
             int64_t put_threshold
             c_bool put_small_object_in_memory_store
             c_vector[CObjectID] c_object_id_vector
+            unique_ptr[CAddress] c_owner_address
 
         metadata = string_to_buffer(serialized_object.metadata)
         put_threshold = RayConfig.instance().max_direct_call_object_size()

@@ -1144,7 +1168,7 @@ cdef class CoreWorker:
         object_already_exists = self._create_put_buffer(
             metadata, total_bytes, object_ref,
             ObjectRefsToVector(serialized_object.contained_object_refs),
-            &c_object_id, &data, True)
+            &c_object_id, &data, True, owner_address)
 
         if not object_already_exists:
             if total_bytes > 0:

@@ -1152,24 +1176,32 @@ cdef class CoreWorker:
                     Buffer.make(data))
             if self.is_local_mode or (put_small_object_in_memory_store
                                       and <int64_t>total_bytes < put_threshold):
+                if owner_address is not None:
+                    raise Exception(
+                        "cannot put data into the memory store directly"
+                        " and assign an owner at the same time")
                 c_object_id_vector.push_back(c_object_id)
                 check_status(CCoreWorkerProcess.GetCoreWorker().Put(
                     CRayObject(data, metadata, c_object_id_vector),
                     c_object_id_vector, c_object_id))
             else:
+                c_owner_address = move(self._convert_python_address(
+                    owner_address))
                 with nogil:
                     if object_ref is None:
                         check_status(
                             CCoreWorkerProcess.GetCoreWorker().SealOwned(
                                 c_object_id,
-                                pin_object))
+                                pin_object,
+                                move(c_owner_address)))
                     else:
                         # Using custom object refs is not supported because we
                         # can't track their lifecycle, so we don't pin the
                         # object in this case.
                         check_status(
                             CCoreWorkerProcess.GetCoreWorker().SealExisting(
-                                c_object_id, pin_object=False))
+                                c_object_id, pin_object=False,
+                                owner_address=move(c_owner_address)))
 
         return c_object_id.Binary()
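The `_convert_python_address` helper above is the single point where an optional serialized owner address becomes a C++ `CAddress`. Below is a plain-Python sketch of its contract, assuming the usual location of Ray's generated `Address` protobuf (`ray.core.generated.common_pb2`); the Cython version returns a null `unique_ptr` where this sketch returns `None`:

# Plain-Python rendering of the helper's contract (illustrative only).
from ray.core.generated.common_pb2 import Address

def convert_python_address(serialized=None):
    if serialized is None:
        return None              # maps to a null unique_ptr[CAddress]
    address = Address()          # maps to make_unique[CAddress]()
    address.ParseFromString(serialized)
    return address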
python/ray/includes/common.pxd

@@ -21,7 +21,7 @@ from ray.includes.function_descriptor cimport (
 )
 
 
-cdef extern from * namespace "polyfill":
+cdef extern from * namespace "polyfill" nogil:
     """
     namespace polyfill {
 

@@ -154,6 +154,8 @@ cdef extern from "src/ray/protobuf/common.pb.h" nogil:
         CAddress()
         const c_string &SerializeAsString()
         void ParseFromString(const c_string &serialized)
+        void CopyFrom(const CAddress& address)
+        const c_string &worker_id()
 
 
 # This is a workaround for C++ enum class since Cython has no corresponding
python/ray/includes/libcoreworker.pxd

@@ -190,15 +190,18 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:
                               const size_t data_size,
                               const c_vector[CObjectID] &contained_object_ids,
                               CObjectID *object_id, shared_ptr[CBuffer] *data,
-                              c_bool created_by_worker)
+                              c_bool created_by_worker,
+                              const unique_ptr[CAddress] &owner_address)
         CRayStatus CreateExisting(const shared_ptr[CBuffer] &metadata,
                                   const size_t data_size,
                                   const CObjectID &object_id,
                                   const CAddress &owner_address,
                                   shared_ptr[CBuffer] *data,
                                   c_bool created_by_worker)
-        CRayStatus SealOwned(const CObjectID &object_id, c_bool pin_object)
-        CRayStatus SealExisting(const CObjectID &object_id, c_bool pin_object)
+        CRayStatus SealOwned(const CObjectID &object_id, c_bool pin_object,
+                             const unique_ptr[CAddress] &owner_address)
+        CRayStatus SealExisting(const CObjectID &object_id, c_bool pin_object,
+                                const unique_ptr[CAddress] &owner_address)
         CRayStatus Get(const c_vector[CObjectID] &ids, int64_t timeout_ms,
                        c_vector[shared_ptr[CRayObject]] *results,
                        c_bool plasma_objects_only)
python/ray/tests/BUILD

@@ -63,6 +63,7 @@ py_test_module_list(
         "test_multinode_failures.py",
         "test_multinode_failures_2.py",
         "test_multiprocessing.py",
+        "test_object_assign_owner.py",
         "test_output.py",
         "test_ray_init.py",
         "test_reconstruction.py",
python/ray/tests/test_object_assign_owner.py (new file, 77 lines)

@@ -0,0 +1,77 @@
+import pytest
+import ray
+import time
+
+
+@pytest.mark.parametrize(
+    "actor_resources",
+    [
+        dict(
+            zip(["owner", "creator", "borrower"], [{
+                f"node{i}": 1
+            } for i in _])) for _ in [
+                [1, 2, 3],  # None of them is on the same node.
+                [1, 1, 3],  # Owner and creator are on the same node.
+                [3, 2, 3],  # Owner and borrower are on the same node.
+                [1, 3, 3],  # Creator and borrower are on the same node.
+                [3, 3, 3],  # All of them are on the same node.
+        ]
+    ])
+def test_owner_assign_when_put(ray_start_cluster, actor_resources):
+    cluster_node_config = [{
+        "num_cpus": 1,
+        "resources": {
+            f"node{i+1}": 10
+        }
+    } for i in range(3)]
+    cluster = ray_start_cluster
+    for kwargs in cluster_node_config:
+        cluster.add_node(**kwargs)
+    ray.init(address=cluster.address)
+
+    @ray.remote(resources=actor_resources["creator"], num_cpus=0)
+    class Creator:
+        def gen_object_ref(self, data="test", owner=None):
+            return ray.put(data, _owner=owner)
+
+    @ray.remote(resources=actor_resources["owner"], num_cpus=0)
+    class Owner:
+        def __init__(self):
+            self.ref = None
+
+        def set_object_ref(self, ref):
+            self.ref = ref
+
+        def warmup(self):
+            return 0
+
+    @ray.remote(resources=actor_resources["borrower"], num_cpus=0)
+    class Borrower:
+        def get_object(self, ref):
+            return ray.get(ref)
+
+    owner = Owner.remote()
+    creator = Creator.remote()
+    borrower = Borrower.remote()
+
+    # Make sure the owner actor is alive.
+    ray.get(owner.warmup.remote())
+
+    object_ref = creator.gen_object_ref.remote(data="test1", owner=owner)
+    # TODO(Catch-Bull): Ideally, deleting this line should also work,
+    # because the driver keeps a reference to the object. But, for now,
+    # the owner still needs to keep a reference to the object to make it
+    # available.
+    ray.get(owner.set_object_ref.remote(object_ref))
+
+    ray.kill(creator)
+    time.sleep(10)
+
+    data = ray.get(borrower.get_object.remote(object_ref))
+    assert data == "test1"
+
+    ray.kill(owner)
+    time.sleep(2)
+    with pytest.raises(ray.exceptions.RayTaskError) as error:
+        ray.get(borrower.get_object.remote(object_ref), timeout=2)
+    assert "ObjectLostError" in error.value.args[1]
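A hypothetical way to run the new test locally, assuming a Ray development build with the `ray_start_cluster` fixture available on the pytest path:

# run_object_assign_owner_test.py -- hypothetical helper, not part of the diff.
import sys

import pytest

if __name__ == "__main__":
    # -s surfaces the actors' stdout; the path assumes a Ray source checkout.
    sys.exit(pytest.main(["-v", "-s",
                          "python/ray/tests/test_object_assign_owner.py"]))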
python/ray/worker.py

@@ -249,7 +249,7 @@ class Worker:
     def set_load_code_from_local(self, load_code_from_local):
         self._load_code_from_local = load_code_from_local
 
-    def put_object(self, value, object_ref=None):
+    def put_object(self, value, object_ref=None, owner_address=None):
         """Put value in the local object store with object reference `object_ref`.
 
         This assumes that the value for `object_ref` has not yet been placed in

@@ -263,6 +263,7 @@ class Worker:
             value: The value to put in the object store.
             object_ref (ObjectRef): The object ref of the value to be
                 put. If None, one will be generated.
+            owner_address: The serialized address of the object's owner.
 
         Returns:
             ObjectRef: The object ref the object was put under.

@@ -294,7 +295,9 @@ class Worker:
         # reference counter.
         return ray.ObjectRef(
             self.core_worker.put_serialized_object(
-                serialized_value, object_ref=object_ref))
+                serialized_value,
+                object_ref=object_ref,
+                owner_address=owner_address))
 
     def raise_errors(self, data_metadata_pairs, object_refs):
         out = self.deserialize_objects(data_metadata_pairs, object_refs)

@@ -1595,22 +1598,45 @@ def get(object_refs, *, timeout=None):
 
 @PublicAPI
 @client_mode_hook
-def put(value):
+def put(value, *, _owner=None):
     """Store an object in the object store.
 
     The object may not be evicted while a reference to the returned ID exists.
 
     Args:
         value: The Python object to be stored.
+        _owner: The actor that should own this object. This allows creating
+            objects with lifetimes decoupled from that of the creating process.
+            Note that the owner actor must be passed a reference to the object
+            prior to the object creator exiting, otherwise the reference will
+            still be lost.
 
     Returns:
         The object ref assigned to this value.
     """
     worker = global_worker
     worker.check_connected()
+
+    if _owner is None:
+        serialize_owner_address = None
+    elif isinstance(_owner, ray.actor.ActorHandle):
+        # Ensure `ray.state.state.global_state_accessor` is not None
+        ray.state.state._check_connected()
+        owner_address = ray.gcs_utils.ActorTableData.FromString(
+            ray.state.state.global_state_accessor.get_actor_info(
+                _owner._actor_id)).address
+        if len(owner_address.worker_id) == 0:
+            raise RuntimeError(
+                f"{_owner} is not alive, its worker_id is empty!")
+        serialize_owner_address = owner_address.SerializeToString()
+    else:
+        raise TypeError(
+            f"Expected a `ray.actor.ActorHandle`, but got: {type(_owner)}")
+
     with profiling.profile("ray.put"):
         try:
-            object_ref = worker.put_object(value)
+            object_ref = worker.put_object(
+                value, owner_address=serialize_owner_address)
         except ObjectStoreFullError:
            logger.info(
                "Put failed since the value was either too large or the "
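On the `worker.py` side, `_owner` is validated before anything is stored: `None` preserves the old behavior, a live `ActorHandle` is resolved through the GCS to a serialized worker address, and any other value raises `TypeError`. A short sketch of the visible behavior (the actor class is illustrative):

import ray

ray.init()

@ray.remote
class Owner:
    def ping(self):
        return "pong"

owner = Owner.remote()
ray.get(owner.ping.remote())   # the owner must be alive and registered in the GCS

ref = ray.put("payload", _owner=owner)   # owned by the actor, not the driver

try:
    ray.put("payload", _owner="not-an-actor-handle")
except TypeError as exc:
    print(exc)   # only a ray.actor.ActorHandle (or None) is accepted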
src/ray/core_worker/core_worker.cc

@@ -1109,23 +1109,66 @@ Status CoreWorker::CreateOwned(const std::shared_ptr<Buffer> &metadata,
                                const size_t data_size,
                                const std::vector<ObjectID> &contained_object_ids,
                                ObjectID *object_id, std::shared_ptr<Buffer> *data,
-                               bool created_by_worker) {
+                               bool created_by_worker,
+                               const std::unique_ptr<rpc::Address> owner_address) {
   *object_id = ObjectID::FromIndex(worker_context_.GetCurrentTaskID(),
                                    worker_context_.GetNextPutIndex());
-  reference_counter_->AddOwnedObject(*object_id, contained_object_ids, rpc_address_,
-                                     CurrentCallSite(), data_size + metadata->Size(),
-                                     /*is_reconstructable=*/false,
-                                     NodeID::FromBinary(rpc_address_.raylet_id()));
-  if (options_.is_local_mode ||
-      (RayConfig::instance().put_small_object_in_memory_store() &&
-       static_cast<int64_t>(data_size) < max_direct_call_object_size_)) {
+  rpc::Address real_owner_address =
+      owner_address != nullptr ? *owner_address : rpc_address_;
+  bool owned_by_us = real_owner_address.worker_id() == rpc_address_.worker_id();
+  auto status = Status::OK();
+  if (owned_by_us) {
+    reference_counter_->AddOwnedObject(*object_id, contained_object_ids, rpc_address_,
+                                       CurrentCallSite(), data_size + metadata->Size(),
+                                       /*is_reconstructable=*/false,
+                                       NodeID::FromBinary(rpc_address_.raylet_id()));
+  } else {
+    // The remote worker's `HandleAssignObjectOwner` will send a
+    // `WaitForRefRemoved` RPC request back to the current worker,
+    // so we need to make sure the ref count is > 0 by invoking
+    // `AddLocalReference` first.
+    AddLocalReference(*object_id);
+    RAY_UNUSED(reference_counter_->AddBorrowedObject(*object_id, ObjectID::Nil(),
+                                                     real_owner_address));
+
+    // Remote call `AssignObjectOwner()`.
+    rpc::AssignObjectOwnerRequest request;
+    request.set_object_id(object_id->Binary());
+    request.mutable_borrower_address()->CopyFrom(rpc_address_);
+    request.set_call_site(CurrentCallSite());
+
+    for (auto &contained_object_id : contained_object_ids) {
+      request.add_contained_object_ids(contained_object_id.Binary());
+    }
+    request.set_object_size(data_size + metadata->Size());
+    auto conn = core_worker_client_pool_->GetOrConnect(real_owner_address);
+    std::promise<Status> status_promise;
+    conn->AssignObjectOwner(request,
+                            [&status_promise](const Status &returned_status,
+                                              const rpc::AssignObjectOwnerReply &reply) {
+                              status_promise.set_value(returned_status);
+                            });
+    // Block until the remote call `AssignObjectOwner` returns.
+    status = status_promise.get_future().get();
+  }
+
+  if ((options_.is_local_mode ||
+       (RayConfig::instance().put_small_object_in_memory_store() &&
+        static_cast<int64_t>(data_size) < max_direct_call_object_size_)) &&
+      owned_by_us) {
     *data = std::make_shared<LocalMemoryBuffer>(data_size);
   } else {
-    auto status = plasma_store_provider_->Create(metadata, data_size, *object_id,
-                                                 /* owner_address = */ rpc_address_, data,
-                                                 created_by_worker);
+    if (status.ok()) {
+      status = plasma_store_provider_->Create(metadata, data_size, *object_id,
+                                              /* owner_address = */ rpc_address_, data,
+                                              created_by_worker);
+    }
     if (!status.ok() || !data) {
-      reference_counter_->RemoveOwnedObject(*object_id);
+      if (owned_by_us) {
+        reference_counter_->RemoveOwnedObject(*object_id);
+      } else {
+        RemoveLocalReference(*object_id);
+      }
       return status;
     }
   }

@@ -1146,22 +1189,30 @@ Status CoreWorker::CreateExisting(const std::shared_ptr<Buffer> &metadata,
   }
 }
 
-Status CoreWorker::SealOwned(const ObjectID &object_id, bool pin_object) {
-  auto status = SealExisting(object_id, pin_object);
-  if (!status.ok()) {
+Status CoreWorker::SealOwned(const ObjectID &object_id, bool pin_object,
+                             const std::unique_ptr<rpc::Address> &owner_address) {
+  bool owned_by_us = owner_address != nullptr
+                         ? WorkerID::FromBinary(owner_address->worker_id()) ==
+                               WorkerID::FromBinary(rpc_address_.worker_id())
+                         : true;
+  auto status = SealExisting(object_id, pin_object, std::move(owner_address));
+  if (status.ok()) return status;
+  if (owned_by_us) {
     reference_counter_->RemoveOwnedObject(object_id);
+  } else {
+    RemoveLocalReference(object_id);
   }
   return status;
 }
 
 Status CoreWorker::SealExisting(const ObjectID &object_id, bool pin_object,
-                                const absl::optional<rpc::Address> &owner_address) {
+                                const std::unique_ptr<rpc::Address> &owner_address) {
   RAY_RETURN_NOT_OK(plasma_store_provider_->Seal(object_id));
   if (pin_object) {
     // Tell the raylet to pin the object **after** it is created.
     RAY_LOG(DEBUG) << "Pinning sealed object " << object_id;
     local_raylet_client_->PinObjectIDs(
-        owner_address.has_value() ? *owner_address : rpc_address_, {object_id},
+        owner_address != nullptr ? *owner_address : rpc_address_, {object_id},
         [this, object_id](const Status &status, const rpc::PinObjectIDsReply &reply) {
           // Only release the object once the raylet has responded to avoid the race
           // condition that the object could be evicted before the raylet pins it.

@@ -2280,11 +2331,12 @@ Status CoreWorker::SealReturnObject(const ObjectID &return_id,
   if (!return_object) {
     return status;
   }
-  absl::optional<rpc::Address> caller_address(
-      options_.is_local_mode ? absl::optional<rpc::Address>()
-                             : worker_context_.GetCurrentTask()->CallerAddress());
+  std::unique_ptr<rpc::Address> caller_address =
+      options_.is_local_mode ? nullptr
+                             : std::make_unique<rpc::Address>(
+                                   worker_context_.GetCurrentTask()->CallerAddress());
   if (return_object->GetData() != nullptr && return_object->GetData()->IsPlasmaBuffer()) {
-    status = SealExisting(return_id, /*pin_object=*/true, caller_address);
+    status = SealExisting(return_id, /*pin_object=*/true, std::move(caller_address));
     if (!status.ok()) {
       RAY_LOG(FATAL) << "Failed to seal object " << return_id
                      << " in store: " << status.message();

@@ -3032,6 +3084,27 @@ void CoreWorker::HandleExit(const rpc::ExitRequest &request, rpc::ExitReply *rep
       [this]() { Exit(rpc::WorkerExitType::INTENDED_EXIT); });
 }
 
+void CoreWorker::HandleAssignObjectOwner(const rpc::AssignObjectOwnerRequest &request,
+                                         rpc::AssignObjectOwnerReply *reply,
+                                         rpc::SendReplyCallback send_reply_callback) {
+  ObjectID object_id = ObjectID::FromBinary(request.object_id());
+  const auto &borrower_address = request.borrower_address();
+  std::string call_site = request.call_site();
+  // Get a list of contained object ids.
+  std::vector<ObjectID> contained_object_ids;
+  contained_object_ids.reserve(request.contained_object_ids_size());
+  for (const auto &id_binary : request.contained_object_ids()) {
+    contained_object_ids.push_back(ObjectID::FromBinary(id_binary));
+  }
+  reference_counter_->AddOwnedObject(
+      object_id, contained_object_ids, rpc_address_, call_site, request.object_size(),
+      /*is_reconstructable=*/false,
+      /*pinned_at_raylet_id=*/NodeID::FromBinary(borrower_address.raylet_id()));
+  reference_counter_->AddBorrowerAddress(object_id, borrower_address);
+  RAY_CHECK(memory_store_->Put(RayObject(rpc::ErrorType::OBJECT_IN_PLASMA), object_id));
+  send_reply_callback(Status::OK(), nullptr, nullptr);
+}
+
 void CoreWorker::YieldCurrentFiber(FiberEvent &event) {
   RAY_CHECK(worker_context_.CurrentActorIsAsync());
   boost::this_fiber::yield();
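The C++ changes implement a small handshake: when the designated owner is a different worker, `CreateOwned` registers itself as a borrower, synchronously asks the owner to take ownership via the new `AssignObjectOwner` RPC, and only then creates the plasma buffer. A toy, self-contained simulation of that flow (hypothetical names, not Ray's API):

# A miniature model of the ownership-assignment handshake.
class Worker:
    def __init__(self, worker_id):
        self.worker_id = worker_id
        self.owned = {}       # object_id -> set of borrower worker ids
        self.borrowed = {}    # object_id -> owner worker id
        self.local_refs = {}  # object_id -> local ref count

    # Owner side: what HandleAssignObjectOwner does, in miniature.
    def handle_assign_object_owner(self, object_id, borrower):
        self.owned.setdefault(object_id, set()).add(borrower.worker_id)
        # Real Ray also subscribes via WaitForRefRemoved here.
        return "OK"

    # Creator side: what CreateOwned does when owned_by_us is False.
    def create_owned(self, object_id, owner):
        # Keep the local ref count > 0 before the owner can call back.
        self.local_refs[object_id] = self.local_refs.get(object_id, 0) + 1
        self.borrowed[object_id] = owner.worker_id
        # Synchronous RPC: block until the owner acknowledges.
        return owner.handle_assign_object_owner(object_id, self)

creator, owner = Worker("creator"), Worker("owner")
assert creator.create_owned("obj-1", owner) == "OK"
assert "creator" in owner.owned["obj-1"]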
src/ray/core_worker/core_worker.h

@@ -536,11 +536,15 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
   /// \param[in] contained_object_ids The IDs serialized in this object.
   /// \param[out] object_id Object ID generated for the put.
   /// \param[out] data Buffer for the user to write the object into.
   /// \param[in] object create by worker or not.
+  /// \param[in] owner_address The address of the object's owner. If not
+  /// provided, defaults to this worker.
   /// \return Status.
   Status CreateOwned(const std::shared_ptr<Buffer> &metadata, const size_t data_size,
                      const std::vector<ObjectID> &contained_object_ids,
                      ObjectID *object_id, std::shared_ptr<Buffer> *data,
-                     bool created_by_worker);
+                     bool created_by_worker,
+                     const std::unique_ptr<rpc::Address> owner_address = nullptr);
 
   /// Create and return a buffer in the object store that can be directly written
   /// into, for an object ID that already exists. After writing to the buffer, the

@@ -563,8 +567,11 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
   ///
   /// \param[in] object_id Object ID corresponding to the object.
   /// \param[in] pin_object Whether or not to pin the object at the local raylet.
+  /// \param[in] owner_address The address of the object's owner. If not
+  /// provided, defaults to this worker.
   /// \return Status.
-  Status SealOwned(const ObjectID &object_id, bool pin_object);
+  Status SealOwned(const ObjectID &object_id, bool pin_object,
+                   const std::unique_ptr<rpc::Address> &owner_address = nullptr);
 
   /// Finalize placing an object into the object store. This should be called after
   /// a corresponding `CreateExisting()` call and then writing into the returned buffer.

@@ -575,7 +582,7 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
   /// the raylet if the object is pinned. If not provided, defaults to this worker.
   /// \return Status.
   Status SealExisting(const ObjectID &object_id, bool pin_object,
-                      const absl::optional<rpc::Address> &owner_address = absl::nullopt);
+                      const std::unique_ptr<rpc::Address> &owner_address = nullptr);
 
   /// Get a list of objects from the object store. Objects that failed to be retrieved
   /// will be returned as nullptrs.

@@ -1005,6 +1012,12 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
   void HandleExit(const rpc::ExitRequest &request, rpc::ExitReply *reply,
                   rpc::SendReplyCallback send_reply_callback) override;
 
+  // Set the local worker as the owner of the object.
+  // Requested by the borrower's worker, executed by the owner's worker.
+  void HandleAssignObjectOwner(const rpc::AssignObjectOwnerRequest &request,
+                               rpc::AssignObjectOwnerReply *reply,
+                               rpc::SendReplyCallback send_reply_callback) override;
+
   ///
   /// Public methods related to async actor call. This should only be used when
   /// the actor is (1) direct actor and (2) using asyncio mode.
src/ray/core_worker/reference_count.cc

@@ -1094,6 +1094,27 @@ bool ReferenceCounter::ReportLocalityData(const ObjectID &object_id,
   return true;
 }
 
+void ReferenceCounter::AddBorrowerAddress(const ObjectID &object_id,
+                                          const rpc::Address &borrower_address) {
+  absl::MutexLock lock(&mutex_);
+  auto it = object_id_refs_.find(object_id);
+  RAY_CHECK(it != object_id_refs_.end());
+
+  RAY_CHECK(it->second.owned_by_us)
+      << "AddBorrowerAddress should only be used for owner references.";
+
+  rpc::WorkerAddress borrower_worker_address = rpc::WorkerAddress(borrower_address);
+  RAY_CHECK(borrower_worker_address.worker_id != rpc_address_.worker_id)
+      << "The borrower cannot be the owner itself";
+
+  RAY_LOG(DEBUG) << "Add borrower " << borrower_address.DebugString() << " for object "
+                 << object_id;
+  auto inserted = it->second.borrowers.insert(borrower_worker_address).second;
+  if (inserted) {
+    WaitForRefRemoved(it, borrower_worker_address);
+  }
+}
+
 void ReferenceCounter::PushToLocationSubscribers(ReferenceTable::iterator it) {
   const auto &object_id = it->first;
   const auto &locations = it->second.locations;
src/ray/core_worker/reference_count.h

@@ -455,6 +455,15 @@ class ReferenceCounter : public ReferenceCounterInterface,
                            const absl::flat_hash_set<NodeID> &locations,
                            uint64_t object_size);
 
+  /// Add a borrower address in the owner's worker. This function adds the borrower
+  /// address to `object_id_refs_`, then calls WaitForRefRemoved() to monitor the
+  /// borrowed object in the borrower's worker.
+  ///
+  /// \param[in] object_id The ID of the object being borrowed.
+  /// \param[in] borrower_address The address of the borrower.
+  void AddBorrowerAddress(const ObjectID &object_id, const rpc::Address &borrower_address)
+      LOCKS_EXCLUDED(mutex_);
+
  private:
   struct Reference {
     /// Constructor for a reference whose origin is unknown.
src/ray/protobuf/core_worker.proto

@@ -335,6 +335,22 @@ message RunOnUtilWorkerRequest {
 message RunOnUtilWorkerReply {
 }
 
+message AssignObjectOwnerRequest {
+  // The ID of the added object.
+  bytes object_id = 1;
+  // The size of the object in bytes.
+  uint64 object_size = 2;
+  // The IDs of contained objects.
+  repeated bytes contained_object_ids = 3;
+  // The borrower address.
+  Address borrower_address = 4;
+  // Description of the call site where the reference was created.
+  string call_site = 5;
+}
+
+message AssignObjectOwnerReply {
+}
+
 service CoreWorkerService {
   // Push a task directly to this worker from another.
   rpc PushTask(PushTaskRequest) returns (PushTaskReply);

@@ -392,4 +408,6 @@ service CoreWorkerService {
   rpc RunOnUtilWorker(RunOnUtilWorkerRequest) returns (RunOnUtilWorkerReply);
   // Request for a worker to exit.
   rpc Exit(ExitRequest) returns (ExitReply);
+  // Assign the owner of an object to the intended worker.
+  rpc AssignObjectOwner(AssignObjectOwnerRequest) returns (AssignObjectOwnerReply);
 }
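For reference, constructing the new request message from Python would look roughly like this; the generated-module path and the placeholder ID widths are assumptions, not something the diff shows:

# Sketch: building AssignObjectOwnerRequest by hand (illustrative only).
from ray.core.generated.core_worker_pb2 import AssignObjectOwnerRequest

request = AssignObjectOwnerRequest(
    object_id=b"\x00" * 28,        # placeholder binary ObjectID
    object_size=4,                 # e.g. len(b"test")
    contained_object_ids=[],
    call_site="ray.put")
request.borrower_address.worker_id = b"\x00" * 28  # placeholder WorkerID
print(len(request.SerializeToString()) > 0)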
src/ray/rpc/worker/core_worker_client.h

@@ -201,6 +201,10 @@ class CoreWorkerClientInterface : public pubsub::SubscriberClientInterface {
   virtual void Exit(const ExitRequest &request,
                     const ClientCallback<ExitReply> &callback) {}
 
+  virtual void AssignObjectOwner(const AssignObjectOwnerRequest &request,
+                                 const ClientCallback<AssignObjectOwnerReply> &callback) {
+  }
+
   virtual ~CoreWorkerClientInterface(){};
 };

@@ -266,6 +270,8 @@ class CoreWorkerClient : public std::enable_shared_from_this<CoreWorkerClient>,
 
   VOID_RPC_CLIENT_METHOD(CoreWorkerService, Exit, grpc_client_, override)
 
+  VOID_RPC_CLIENT_METHOD(CoreWorkerService, AssignObjectOwner, grpc_client_, override)
+
   void PushActorTask(std::unique_ptr<PushTaskRequest> request, bool skip_queue,
                      const ClientCallback<PushTaskReply> &callback) override {
     if (skip_queue) {
src/ray/rpc/worker/core_worker_server.h

@@ -49,7 +49,8 @@ namespace rpc {
   RPC_SERVICE_HANDLER(CoreWorkerService, AddSpilledUrl)     \
   RPC_SERVICE_HANDLER(CoreWorkerService, PlasmaObjectReady) \
   RPC_SERVICE_HANDLER(CoreWorkerService, RunOnUtilWorker)   \
-  RPC_SERVICE_HANDLER(CoreWorkerService, Exit)
+  RPC_SERVICE_HANDLER(CoreWorkerService, Exit)              \
+  RPC_SERVICE_HANDLER(CoreWorkerService, AssignObjectOwner)
 
 #define RAY_CORE_WORKER_DECLARE_RPC_HANDLERS              \
   DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(PushTask)       \

@@ -73,7 +74,8 @@ namespace rpc {
   DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(AddSpilledUrl)     \
   DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(PlasmaObjectReady) \
   DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(RunOnUtilWorker)   \
-  DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(Exit)
+  DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(Exit)              \
+  DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(AssignObjectOwner)
 
 /// Interface of the `CoreWorkerServiceHandler`, see `src/ray/protobuf/core_worker.proto`.
 class CoreWorkerServiceHandler {