Revert "[core] Increment ref count when creating an ObjectRef to prev… (#22106)

This reverts commit e3af828220.
This commit is contained in:
SangBin Cho 2022-02-04 17:55:45 +09:00 committed by GitHub
parent a887763b38
commit 6dda196f47
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
21 changed files with 197 additions and 305 deletions

View file

@ -163,12 +163,7 @@ inline ray::ObjectRef<T> Put(const T &obj) {
auto buffer =
std::make_shared<msgpack::sbuffer>(ray::internal::Serializer::Serialize(obj));
auto id = ray::internal::GetRayRuntime()->Put(buffer);
auto ref = ObjectRef<T>(id);
// The core worker will add an initial ref to the put ID to
// keep it in scope. Now that we've created the frontend
// ObjectRef, remove this initial ref.
ray::internal::GetRayRuntime()->RemoveLocalReference(id);
return ref;
return ray::ObjectRef<T>(id);
}
template <typename T>

View file

@ -82,12 +82,7 @@ ObjectRef<boost::callable_traits::return_type_t<F>> ActorTaskCaller<F>::Remote(
using ReturnType = boost::callable_traits::return_type_t<F>;
auto returned_object_id =
runtime_->CallActor(remote_function_holder_, id_, args_, task_options_);
auto return_ref = ObjectRef<ReturnType>(returned_object_id);
// The core worker will add an initial ref to each return ID to keep it in
// scope. Now that we've created the frontend ObjectRef, remove this initial
// ref.
runtime_->RemoveLocalReference(returned_object_id);
return return_ref;
return ObjectRef<ReturnType>(returned_object_id);
}
} // namespace internal

View file

@ -89,12 +89,7 @@ ObjectRef<boost::callable_traits::return_type_t<F>> TaskCaller<F>::Remote(
auto returned_object_id = runtime_->Call(remote_function_holder_, args_, task_options_);
using ReturnType = boost::callable_traits::return_type_t<F>;
auto return_ref = ObjectRef<ReturnType>(returned_object_id);
// The core worker will add an initial ref to each return ID to keep it in
// scope. Now that we've created the frontend ObjectRef, remove this initial
// ref.
runtime_->RemoveLocalReference(returned_object_id);
return return_ref;
return ObjectRef<ReturnType>(returned_object_id);
}
} // namespace internal
} // namespace ray

View file

@ -76,10 +76,7 @@ public abstract class AbstractRayRuntime implements RayRuntimeInternal {
LOGGER.debug("Putting Object in Task {}.", workerContext.getCurrentTaskId());
}
ObjectId objectId = objectStore.put(obj);
return new ObjectRefImpl<T>(
objectId,
(Class<T>) (obj == null ? Object.class : obj.getClass()),
/*skipAddingLocalRef=*/ true);
return new ObjectRefImpl<T>(objectId, (Class<T>) (obj == null ? Object.class : obj.getClass()));
}
@Override
@ -91,10 +88,7 @@ public abstract class AbstractRayRuntime implements RayRuntimeInternal {
ownerActor.getId());
}
ObjectId objectId = objectStore.put(obj, ownerActor.getId());
return new ObjectRefImpl<T>(
objectId,
(Class<T>) (obj == null ? Object.class : obj.getClass()),
/*skipAddingLocalRef=*/ true);
return new ObjectRefImpl<T>(objectId, (Class<T>) (obj == null ? Object.class : obj.getClass()));
}
@Override
@ -301,7 +295,7 @@ public abstract class AbstractRayRuntime implements RayRuntimeInternal {
if (returnIds.isEmpty()) {
return null;
} else {
return new ObjectRefImpl(returnIds.get(0), returnType.get(), /*skipAddingLocalRef=*/ true);
return new ObjectRefImpl(returnIds.get(0), returnType.get());
}
}
@ -323,7 +317,7 @@ public abstract class AbstractRayRuntime implements RayRuntimeInternal {
if (returnIds.isEmpty()) {
return null;
} else {
return new ObjectRefImpl(returnIds.get(0), returnType.get(), /*skipAddingLocalRef=*/ true);
return new ObjectRefImpl(returnIds.get(0), returnType.get());
}
}

View file

@ -32,22 +32,10 @@ public final class ObjectRefImpl<T> implements ObjectRef<T>, Externalizable {
private Class<T> type;
public ObjectRefImpl(ObjectId id, Class<T> type, boolean skipAddingLocalRef) {
public ObjectRefImpl(ObjectId id, Class<T> type) {
this.id = id;
this.type = type;
RayRuntimeInternal runtime = (RayRuntimeInternal) Ray.internal();
Preconditions.checkState(workerId == null);
workerId = runtime.getWorkerContext().getCurrentWorkerId();
if (!skipAddingLocalRef) {
runtime.getObjectStore().addLocalReference(workerId, id);
}
// We still add the reference so that the local ref count will be properly
// decremented once this object is GCed.
new ObjectRefImplReference(this);
}
public ObjectRefImpl(ObjectId id, Class<T> type) {
this(id, type, /*skipAddingLocalRef=*/ false);
addLocalReference();
}
public ObjectRefImpl() {}
@ -93,19 +81,22 @@ public final class ObjectRefImpl<T> implements ObjectRef<T>, Externalizable {
int len = in.readInt();
byte[] ownerAddress = new byte[len];
in.readFully(ownerAddress);
addLocalReference();
RayRuntimeInternal runtime = (RayRuntimeInternal) Ray.internal();
Preconditions.checkState(workerId == null);
workerId = runtime.getWorkerContext().getCurrentWorkerId();
runtime.getObjectStore().addLocalReference(workerId, id);
new ObjectRefImplReference(this);
runtime
.getObjectStore()
.registerOwnershipInfoAndResolveFuture(
this.id, ObjectSerializer.getOuterObjectId(), ownerAddress);
}
private void addLocalReference() {
Preconditions.checkState(workerId == null);
RayRuntimeInternal runtime = (RayRuntimeInternal) Ray.internal();
workerId = runtime.getWorkerContext().getCurrentWorkerId();
runtime.getObjectStore().addLocalReference(workerId, id);
new ObjectRefImplReference(this);
}
private static final class ObjectRefImplReference
extends FinalizableWeakReference<ObjectRefImpl<?>> {

View file

@ -523,6 +523,15 @@ def put_object(obj, use_ray_put):
return _put.remote(obj)
def put_unpinned_object(obj):
value = ray.worker.global_worker.get_serialization_context().serialize(obj)
return ray.ObjectRef(
ray.worker.global_worker.core_worker.put_serialized_object(
value, pin_object=False
)
)
def wait_until_server_available(address, timeout_ms=5000, retry_interval_ms=100):
ip_port = address.split(":")
ip = ip_port[0]

View file

@ -194,15 +194,13 @@ cdef RayObjectsToDataMetadataPairs(
return data_metadata_pairs
cdef VectorToObjectRefs(const c_vector[CObjectReference] &object_refs,
skip_adding_local_ref):
cdef VectorToObjectRefs(const c_vector[CObjectReference] &object_refs):
result = []
for i in range(object_refs.size()):
result.append(ObjectRef(
object_refs[i].object_id(),
object_refs[i].owner_address().SerializeAsString(),
object_refs[i].call_site(),
skip_adding_local_ref=skip_adding_local_ref))
object_refs[i].call_site()))
return result
@ -363,28 +361,10 @@ cdef int prepare_actor_concurrency_groups(
concurrency_groups.push_back(cg)
return 1
cdef prepare_args_and_increment_put_refs(
cdef prepare_args(
CoreWorker core_worker,
Language language, args,
c_vector[unique_ptr[CTaskArg]] *args_vector, function_descriptor,
c_vector[CObjectID] *incremented_put_arg_ids):
try:
prepare_args_internal(core_worker, language, args, args_vector,
function_descriptor, incremented_put_arg_ids)
except Exception as e:
# An error occurred during arg serialization. We must remove the
# initial local ref for all args that were successfully put into the
# local plasma store. These objects will then get released.
for put_arg_id in dereference(incremented_put_arg_ids):
CCoreWorkerProcess.GetCoreWorker().RemoveLocalReference(
put_arg_id)
raise e
cdef prepare_args_internal(
CoreWorker core_worker,
Language language, args,
c_vector[unique_ptr[CTaskArg]] *args_vector, function_descriptor,
c_vector[CObjectID] *incremented_put_arg_ids):
c_vector[unique_ptr[CTaskArg]] *args_vector, function_descriptor):
cdef:
size_t size
int64_t put_threshold
@ -457,16 +437,12 @@ cdef prepare_args_internal(
inlined_ids.clear()
total_inlined += <int64_t>size
else:
put_id = CObjectID.FromBinary(
core_worker.put_serialized_object_and_increment_local_ref(
serialized_arg, inline_small_object=False))
args_vector.push_back(unique_ptr[CTaskArg](
new CTaskArgByReference(
put_id,
CCoreWorkerProcess.GetCoreWorker().GetRpcAddress(),
put_arg_call_site
)))
incremented_put_arg_ids.push_back(put_id)
new CTaskArgByReference(CObjectID.FromBinary(
core_worker.put_serialized_object(
serialized_arg, inline_small_object=False)),
CCoreWorkerProcess.GetCoreWorker().GetRpcAddress(),
put_arg_call_site)))
cdef raise_if_dependency_failed(arg):
@ -628,9 +604,7 @@ cdef execute_task(
args, kwargs = [], {}
else:
metadata_pairs = RayObjectsToDataMetadataPairs(c_args)
object_refs = VectorToObjectRefs(
c_arg_refs,
skip_adding_local_ref=False)
object_refs = VectorToObjectRefs(c_arg_refs)
if core_worker.current_actor_is_asyncio():
# We deserialize objects in event loop thread to
@ -886,9 +860,7 @@ cdef c_vector[c_string] spill_objects_handler(
c_vector[c_string] owner_addresses
with gil:
object_refs = VectorToObjectRefs(
object_refs_to_spill,
skip_adding_local_ref=False)
object_refs = VectorToObjectRefs(object_refs_to_spill)
for i in range(object_refs_to_spill.size()):
owner_addresses.push_back(
object_refs_to_spill[i].owner_address()
@ -924,9 +896,7 @@ cdef int64_t restore_spilled_objects_handler(
size = object_urls.size()
for i in range(size):
urls.append(object_urls[i])
object_refs = VectorToObjectRefs(
object_refs_to_restore,
skip_adding_local_ref=False)
object_refs = VectorToObjectRefs(object_refs_to_restore)
try:
with ray.worker._changeproctitle(
ray_constants.WORKER_PROCESS_TYPE_RESTORE_WORKER,
@ -1241,8 +1211,7 @@ cdef class CoreWorker:
if object_ref is None:
with nogil:
check_status(CCoreWorkerProcess.GetCoreWorker()
.CreateOwnedAndIncrementLocalRef(
check_status(CCoreWorkerProcess.GetCoreWorker().CreateOwned(
metadata, data_size, contained_ids,
c_object_id, data, created_by_worker,
move(c_owner_address),
@ -1296,25 +1265,20 @@ cdef class CoreWorker:
owner_address: Owner address for this object ref.
"""
cdef:
CObjectID c_object_id = object_ref.native()
CObjectID c_object_id
shared_ptr[CBuffer] data_buf
shared_ptr[CBuffer] metadata_buf
int64_t put_threshold
unique_ptr[CAddress] c_owner_address = move(self._convert_python_address(
object_ref.owner_address()))
c_vector[CObjectID] c_object_id_vector
unique_ptr[CAddress] c_owner_address
# TODO(suquark): This method does not support put objects to
# in memory store currently.
metadata_buf = string_to_buffer(metadata)
status = CCoreWorkerProcess.GetCoreWorker().CreateExisting(
metadata_buf, data_size, object_ref.native(),
dereference(c_owner_address), &data_buf,
False)
if not status.ok():
logger.debug("Error putting restored object into plasma.")
return
if data_buf == NULL:
object_already_exists = self._create_put_buffer(
metadata_buf, data_size, object_ref,
ObjectRefsToVector([]),
&c_object_id, &data_buf, False, owner_address)
if object_already_exists:
logger.debug("Object already exists in 'put_file_like_object'.")
return
data = Buffer.make(data_buf)
@ -1323,6 +1287,7 @@ cdef class CoreWorker:
while index < data_size:
bytes_read = file_like.readinto(view[index:])
index += bytes_read
c_owner_address = move(self._convert_python_address(owner_address))
with nogil:
# Using custom object refs is not supported because we
# can't track their lifecycle, so we don't pin the object
@ -1330,13 +1295,13 @@ cdef class CoreWorker:
check_status(
CCoreWorkerProcess.GetCoreWorker().SealExisting(
c_object_id, pin_object=False,
owner_address=c_owner_address))
owner_address=move(c_owner_address)))
def put_serialized_object_and_increment_local_ref(self, serialized_object,
ObjectRef object_ref=None,
c_bool pin_object=True,
owner_address=None,
c_bool inline_small_object=True):
def put_serialized_object(self, serialized_object,
ObjectRef object_ref=None,
c_bool pin_object=True,
owner_address=None,
c_bool inline_small_object=True):
cdef:
CObjectID c_object_id
shared_ptr[CBuffer] data
@ -1506,7 +1471,6 @@ cdef class CoreWorker:
c_vector[unique_ptr[CTaskArg]] args_vector
c_vector[CObjectReference] return_refs
CSchedulingStrategy c_scheduling_strategy
c_vector[CObjectID] incremented_put_arg_ids
self.python_scheduling_strategy_to_c(
scheduling_strategy, &c_scheduling_strategy)
@ -1515,9 +1479,8 @@ cdef class CoreWorker:
prepare_resources(resources, &c_resources)
ray_function = CRayFunction(
language.lang, function_descriptor.descriptor)
prepare_args_and_increment_put_refs(
self, language, args, &args_vector, function_descriptor,
&incremented_put_arg_ids)
prepare_args(
self, language, args, &args_vector, function_descriptor)
# NOTE(edoakes): releasing the GIL while calling this method causes
# segfaults. See relevant issue for details:
@ -1531,18 +1494,7 @@ cdef class CoreWorker:
c_scheduling_strategy,
debugger_breakpoint)
# These arguments were serialized and put into the local object
# store during task submission. The backend increments their local
# ref count initially to ensure that they remain in scope until we
# add to their submitted task ref count. Now that the task has
# been submitted, it's safe to remove the initial local ref.
for put_arg_id in incremented_put_arg_ids:
CCoreWorkerProcess.GetCoreWorker().RemoveLocalReference(
put_arg_id)
# The initial local reference is already acquired internally when
# adding the pending task.
return VectorToObjectRefs(return_refs, skip_adding_local_ref=True)
return VectorToObjectRefs(return_refs)
def create_actor(self,
Language language,
@ -1572,7 +1524,6 @@ cdef class CoreWorker:
CActorID c_actor_id
c_vector[CConcurrencyGroup] c_concurrency_groups
CSchedulingStrategy c_scheduling_strategy
c_vector[CObjectID] incremented_put_arg_ids
optional[c_bool] is_detached_optional = nullopt
self.python_scheduling_strategy_to_c(
@ -1583,9 +1534,8 @@ cdef class CoreWorker:
prepare_resources(placement_resources, &c_placement_resources)
ray_function = CRayFunction(
language.lang, function_descriptor.descriptor)
prepare_args_and_increment_put_refs(
self, language, args, &args_vector, function_descriptor,
&incremented_put_arg_ids)
prepare_args(
self, language, args, &args_vector, function_descriptor)
prepare_actor_concurrency_groups(
concurrency_groups_dict, &c_concurrency_groups)
@ -1594,7 +1544,7 @@ cdef class CoreWorker:
True if is_detached else False)
with nogil:
status = CCoreWorkerProcess.GetCoreWorker().CreateActor(
check_status(CCoreWorkerProcess.GetCoreWorker().CreateActor(
ray_function, args_vector,
CActorCreationOptions(
max_restarts, max_task_retries, max_concurrency,
@ -1610,18 +1560,7 @@ cdef class CoreWorker:
is_asyncio or max_concurrency > 1,
max_pending_calls),
extension_data,
&c_actor_id)
# These arguments were serialized and put into the local object
# store during task submission. The backend increments their local
# ref count initially to ensure that they remain in scope until we
# add to their submitted task ref count. Now that the task has
# been submitted, it's safe to remove the initial local ref.
for put_arg_id in incremented_put_arg_ids:
CCoreWorkerProcess.GetCoreWorker().RemoveLocalReference(
put_arg_id)
check_status(status)
&c_actor_id))
return ActorID(c_actor_id.Binary())
@ -1702,16 +1641,14 @@ cdef class CoreWorker:
CRayFunction ray_function
c_vector[unique_ptr[CTaskArg]] args_vector
optional[c_vector[CObjectReference]] return_refs
c_vector[CObjectID] incremented_put_arg_ids
with self.profile_event(b"submit_task"):
if num_method_cpus > 0:
c_resources[b"CPU"] = num_method_cpus
ray_function = CRayFunction(
language.lang, function_descriptor.descriptor)
prepare_args_and_increment_put_refs(
self, language, args, &args_vector, function_descriptor,
&incremented_put_arg_ids)
prepare_args(
self, language, args, &args_vector, function_descriptor)
# NOTE(edoakes): releasing the GIL while calling this method causes
# segfaults. See relevant issue for details:
@ -1722,20 +1659,8 @@ cdef class CoreWorker:
args_vector,
CTaskOptions(
name, num_returns, c_resources, concurrency_group_name))
# These arguments were serialized and put into the local object
# store during task submission. The backend increments their local
# ref count initially to ensure that they remain in scope until we
# add to their submitted task ref count. Now that the task has
# been submitted, it's safe to remove the initial local ref.
for put_arg_id in incremented_put_arg_ids:
CCoreWorkerProcess.GetCoreWorker().RemoveLocalReference(
put_arg_id)
if return_refs.has_value():
# The initial local reference is already acquired internally
# when adding the pending task.
return VectorToObjectRefs(return_refs.value(),
skip_adding_local_ref=True)
return VectorToObjectRefs(return_refs.value())
else:
actor = self.get_actor_handle(actor_id)
actor_handle = (CCoreWorkerProcess.GetCoreWorker()

View file

@ -199,14 +199,13 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:
CRayStatus Put(const CRayObject &object,
const c_vector[CObjectID] &contained_object_ids,
const CObjectID &object_id)
CRayStatus CreateOwnedAndIncrementLocalRef(
const shared_ptr[CBuffer] &metadata,
const size_t data_size,
const c_vector[CObjectID] &contained_object_ids,
CObjectID *object_id, shared_ptr[CBuffer] *data,
c_bool created_by_worker,
const unique_ptr[CAddress] &owner_address,
c_bool inline_small_object)
CRayStatus CreateOwned(const shared_ptr[CBuffer] &metadata,
const size_t data_size,
const c_vector[CObjectID] &contained_object_ids,
CObjectID *object_id, shared_ptr[CBuffer] *data,
c_bool created_by_worker,
const unique_ptr[CAddress] &owner_address,
c_bool inline_small_object)
CRayStatus CreateExisting(const shared_ptr[CBuffer] &metadata,
const size_t data_size,
const CObjectID &object_id,

View file

@ -314,11 +314,14 @@ class Worker:
# removed before this one, it will corrupt the state in the
# reference counter.
return ray.ObjectRef(
self.core_worker.put_serialized_object_and_increment_local_ref(
self.core_worker.put_serialized_object(
serialized_value, object_ref=object_ref, owner_address=owner_address
),
# The initial local reference is already acquired internally.
skip_adding_local_ref=True,
# If the owner address is set, then the initial reference is
# already acquired internally in CoreWorker::CreateOwned.
# TODO(ekl) we should unify the code path more with the others
# to avoid this special case.
skip_adding_local_ref=(owner_address is not None),
)
def raise_errors(self, data_metadata_pairs, object_refs):

View file

@ -111,13 +111,10 @@ bool ActorManager::AddNewActorHandle(std::unique_ptr<ActorHandle> actor_handle,
const auto actor_creation_return_id = ObjectID::ForActorHandle(actor_id);
// Detached actor doesn't need ref counting.
if (!is_detached) {
// We don't need to add an initial local ref here because it will get added
// in AddActorHandle.
reference_counter_->AddOwnedObject(actor_creation_return_id,
/*inner_ids=*/{}, caller_address, call_site,
/*object_size*/ -1,
/*is_reconstructable=*/true,
/*add_local_ref=*/false);
/*is_reconstructable=*/true);
}
return AddActorHandle(std::move(actor_handle), cached_actor_name,

View file

@ -826,10 +826,9 @@ Status CoreWorker::Put(const RayObject &object,
ObjectID *object_id) {
*object_id = ObjectID::FromIndex(worker_context_.GetCurrentInternalTaskId(),
worker_context_.GetNextPutIndex());
reference_counter_->AddOwnedObject(*object_id, contained_object_ids, rpc_address_,
CurrentCallSite(), object.GetSize(),
/*is_reconstructable=*/false, /*add_local_ref=*/true,
NodeID::FromBinary(rpc_address_.raylet_id()));
reference_counter_->AddOwnedObject(
*object_id, contained_object_ids, rpc_address_, CurrentCallSite(), object.GetSize(),
/*is_reconstructable=*/false, NodeID::FromBinary(rpc_address_.raylet_id()));
auto status = Put(object, contained_object_ids, *object_id, /*pin_object=*/true);
if (!status.ok()) {
reference_counter_->RemoveOwnedObject(*object_id);
@ -876,11 +875,13 @@ Status CoreWorker::Put(const RayObject &object,
return PutInLocalPlasmaStore(object, object_id, pin_object);
}
Status CoreWorker::CreateOwnedAndIncrementLocalRef(
const std::shared_ptr<Buffer> &metadata, const size_t data_size,
const std::vector<ObjectID> &contained_object_ids, ObjectID *object_id,
std::shared_ptr<Buffer> *data, bool created_by_worker,
const std::unique_ptr<rpc::Address> &owner_address, bool inline_small_object) {
Status CoreWorker::CreateOwned(const std::shared_ptr<Buffer> &metadata,
const size_t data_size,
const std::vector<ObjectID> &contained_object_ids,
ObjectID *object_id, std::shared_ptr<Buffer> *data,
bool created_by_worker,
const std::unique_ptr<rpc::Address> &owner_address,
bool inline_small_object) {
auto status = WaitForActorRegistered(contained_object_ids);
if (!status.ok()) {
return status;
@ -894,7 +895,6 @@ Status CoreWorker::CreateOwnedAndIncrementLocalRef(
reference_counter_->AddOwnedObject(*object_id, contained_object_ids, rpc_address_,
CurrentCallSite(), data_size + metadata->Size(),
/*is_reconstructable=*/false,
/*add_local_ref=*/true,
NodeID::FromBinary(rpc_address_.raylet_id()));
} else {
// Because in the remote worker's `HandleAssignObjectOwner`,
@ -937,9 +937,10 @@ Status CoreWorker::CreateOwnedAndIncrementLocalRef(
created_by_worker);
}
if (!status.ok()) {
RemoveLocalReference(*object_id);
if (owned_by_us) {
reference_counter_->RemoveOwnedObject(*object_id);
} else {
RemoveLocalReference(*object_id);
}
return status;
} else if (*data == nullptr) {
@ -968,13 +969,16 @@ Status CoreWorker::CreateExisting(const std::shared_ptr<Buffer> &metadata,
Status CoreWorker::SealOwned(const ObjectID &object_id, bool pin_object,
const std::unique_ptr<rpc::Address> &owner_address) {
bool owned_by_us = owner_address != nullptr
? WorkerID::FromBinary(owner_address->worker_id()) ==
WorkerID::FromBinary(rpc_address_.worker_id())
: true;
auto status = SealExisting(object_id, pin_object, std::move(owner_address));
if (status.ok()) return status;
RemoveLocalReference(object_id);
if (reference_counter_->HasReference(object_id)) {
RAY_LOG(WARNING)
<< "Object " << object_id
<< " failed to be put but has a nonzero ref count. This object may leak.";
if (owned_by_us) {
reference_counter_->RemoveOwnedObject(object_id);
} else {
RemoveLocalReference(object_id);
}
return status;
}
@ -2290,8 +2294,7 @@ std::vector<rpc::ObjectReference> CoreWorker::ExecuteTaskLocalMode(
reference_counter_->AddOwnedObject(task_spec.ReturnId(i),
/*inner_ids=*/{}, rpc_address_,
CurrentCallSite(), -1,
/*is_reconstructable=*/false,
/*add_local_ref=*/true);
/*is_reconstructable=*/false);
}
rpc::ObjectReference ref;
ref.set_object_id(task_spec.ReturnId(i).Binary());
@ -3045,7 +3048,6 @@ void CoreWorker::HandleAssignObjectOwner(const rpc::AssignObjectOwnerRequest &re
reference_counter_->AddOwnedObject(
object_id, contained_object_ids, rpc_address_, call_site, request.object_size(),
/*is_reconstructable=*/false,
/*add_local_ref=*/false,
/*pinned_at_raylet_id=*/NodeID::FromBinary(borrower_address.raylet_id()));
reference_counter_->AddBorrowerAddress(object_id, borrower_address);
RAY_CHECK(memory_store_->Put(RayObject(rpc::ErrorType::OBJECT_IN_PLASMA), object_id));

View file

@ -260,14 +260,9 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
/// Create and return a buffer in the object store that can be directly written
/// into. After writing to the buffer, the caller must call `SealOwned()` to
/// finalize the object. The `CreateOwnedAndIncrementLocalRef()` and
/// `SealOwned()` combination is an alternative interface to `Put()` that
/// allows frontends to avoid an extra copy when possible.
///
/// Note that this call also initializes the local reference count for the
/// object to 1 so that the ref is considered in scope. The caller must
/// ensure that they decrement the ref count once the returned ObjectRef has
/// gone out of scope.
/// finalize the object. The `CreateOwned()` and `SealOwned()` combination is
/// an alternative interface to `Put()` that allows frontends to avoid an extra
/// copy when possible.
///
/// \param[in] metadata Metadata of the object to be written.
/// \param[in] data_size Size of the object to be written.
@ -280,12 +275,12 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
/// \param[in] inline_small_object Whether to inline create this object if it's
/// small.
/// \return Status.
Status CreateOwnedAndIncrementLocalRef(
const std::shared_ptr<Buffer> &metadata, const size_t data_size,
const std::vector<ObjectID> &contained_object_ids, ObjectID *object_id,
std::shared_ptr<Buffer> *data, bool created_by_worker,
const std::unique_ptr<rpc::Address> &owner_address = nullptr,
bool inline_small_object = true);
Status CreateOwned(const std::shared_ptr<Buffer> &metadata, const size_t data_size,
const std::vector<ObjectID> &contained_object_ids,
ObjectID *object_id, std::shared_ptr<Buffer> *data,
bool created_by_worker,
const std::unique_ptr<rpc::Address> &owner_address = nullptr,
bool inline_small_object = true);
/// Create and return a buffer in the object store that can be directly written
/// into, for an object ID that already exists. After writing to the buffer, the
@ -306,10 +301,6 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
/// Finalize placing an object into the object store. This should be called after
/// a corresponding `CreateOwned()` call and then writing into the returned buffer.
///
/// If the object seal fails, then the initial local reference that was added
/// in CreateOwnedAndIncrementLocalRef will be deleted and the object will be
/// released by the ref counter.
///
/// \param[in] object_id Object ID corresponding to the object.
/// \param[in] pin_object Whether or not to pin the object at the local raylet.
/// \param[in] The address of object's owner. If not provided,

View file

@ -38,7 +38,7 @@ Status PutSerializedObject(JNIEnv *env, jobject obj, ObjectID object_id,
for (const auto &ref : native_ray_object->GetNestedRefs()) {
nested_ids.push_back(ObjectID::FromBinary(ref.object_id()));
}
status = CoreWorkerProcess::GetCoreWorker().CreateOwnedAndIncrementLocalRef(
status = CoreWorkerProcess::GetCoreWorker().CreateOwned(
native_ray_object->GetMetadata(), data_size, nested_ids, out_object_id, &data,
/*created_by_worker=*/true,
/*owner_address=*/owner_address);

View file

@ -169,7 +169,6 @@ void ReferenceCounter::AddOwnedObject(const ObjectID &object_id,
const rpc::Address &owner_address,
const std::string &call_site,
const int64_t object_size, bool is_reconstructable,
bool add_local_ref,
const absl::optional<NodeID> &pinned_at_raylet_id) {
RAY_LOG(DEBUG) << "Adding owned object " << object_id;
absl::MutexLock lock(&mutex_);
@ -198,10 +197,6 @@ void ReferenceCounter::AddOwnedObject(const ObjectID &object_id,
auto back_it = reconstructable_owned_objects_.end();
back_it--;
RAY_CHECK(reconstructable_owned_objects_index_.emplace(object_id, back_it).second);
if (add_local_ref) {
it->second.local_ref_count++;
}
}
void ReferenceCounter::RemoveOwnedObject(const ObjectID &object_id) {

View file

@ -42,7 +42,7 @@ class ReferenceCounterInterface {
virtual void AddOwnedObject(
const ObjectID &object_id, const std::vector<ObjectID> &contained_ids,
const rpc::Address &owner_address, const std::string &call_site,
const int64_t object_size, bool is_reconstructable, bool add_local_ref,
const int64_t object_size, bool is_reconstructable,
const absl::optional<NodeID> &pinned_at_raylet_id = absl::optional<NodeID>()) = 0;
virtual bool SetDeleteCallback(
const ObjectID &object_id,
@ -167,15 +167,10 @@ class ReferenceCounter : public ReferenceCounterInterface,
/// \param[in] object_size Object size if known, otherwise -1;
/// \param[in] is_reconstructable Whether the object can be reconstructed
/// through lineage re-execution.
/// \param[in] add_local_ref Whether to initialize the local ref count to 1.
/// This is used to ensure that the ref is considered in scope before the
/// corresponding ObjectRef has been returned to the language frontend.
/// \param[in] pinned_at_raylet_id The primary location for the object, if it
/// is already known. This is only used for ray.put calls.
void AddOwnedObject(
const ObjectID &object_id, const std::vector<ObjectID> &contained_ids,
const rpc::Address &owner_address, const std::string &call_site,
const int64_t object_size, bool is_reconstructable, bool add_local_ref,
const int64_t object_size, bool is_reconstructable,
const absl::optional<NodeID> &pinned_at_raylet_id = absl::optional<NodeID>())
LOCKS_EXCLUDED(mutex_);

View file

@ -337,7 +337,8 @@ class MockWorkerClient : public MockCoreWorkerClientInterface {
// The below methods mirror a core worker's operations, e.g., `Put` simulates
// a ray.put().
void Put(const ObjectID &object_id) {
rc_.AddOwnedObject(object_id, {}, address_, "", 0, false, /*add_local_ref=*/true);
rc_.AddOwnedObject(object_id, {}, address_, "", 0, false);
rc_.AddLocalReference(object_id, "");
}
void PutWithForeignOwner(const ObjectID &object_id, const rpc::Address &owner_address) {
@ -346,8 +347,8 @@ class MockWorkerClient : public MockCoreWorkerClientInterface {
}
void PutWrappedId(const ObjectID outer_id, const ObjectID &inner_id) {
rc_.AddOwnedObject(outer_id, {inner_id}, address_, "", 0, false,
/*add_local_ref=*/true);
rc_.AddOwnedObject(outer_id, {inner_id}, address_, "", 0, false);
rc_.AddLocalReference(outer_id, "");
}
void GetSerializedObjectId(const ObjectID outer_id, const ObjectID &inner_id,
@ -369,7 +370,9 @@ class MockWorkerClient : public MockCoreWorkerClientInterface {
if (!arg_id.IsNil()) {
rc_.UpdateSubmittedTaskReferences({return_id}, {arg_id});
}
rc_.AddOwnedObject(return_id, {}, address_, "", 0, false, /*add_local_ref=*/true);
rc_.AddOwnedObject(return_id, {}, address_, "", 0, false);
// Add a sentinel reference to keep all nested object IDs in scope.
rc_.AddLocalReference(return_id, "");
return_ids_.push_back(return_id);
return return_id;
}
@ -533,9 +536,11 @@ TEST_F(ReferenceCountTest, TestUnreconstructableObjectOutOfScope) {
// The object goes out of scope once it has no more refs.
std::vector<ObjectID> out;
ASSERT_FALSE(rc->SetDeleteCallback(id, callback));
rc->AddOwnedObject(id, {}, address, "", 0, false, /*add_local_ref=*/true);
rc->AddOwnedObject(id, {}, address, "", 0, false);
ASSERT_TRUE(rc->SetDeleteCallback(id, callback));
ASSERT_FALSE(*out_of_scope);
rc->AddLocalReference(id, "");
ASSERT_FALSE(*out_of_scope);
rc->RemoveLocalReference(id, &out);
ASSERT_TRUE(*out_of_scope);
@ -543,7 +548,7 @@ TEST_F(ReferenceCountTest, TestUnreconstructableObjectOutOfScope) {
// lineage ref count.
*out_of_scope = false;
ASSERT_FALSE(rc->SetDeleteCallback(id, callback));
rc->AddOwnedObject(id, {}, address, "", 0, false, /*add_local_ref=*/false);
rc->AddOwnedObject(id, {}, address, "", 0, false);
ASSERT_TRUE(rc->SetDeleteCallback(id, callback));
rc->UpdateSubmittedTaskReferences({}, {id});
ASSERT_FALSE(*out_of_scope);
@ -570,14 +575,16 @@ TEST_F(ReferenceCountTest, TestReferenceStats) {
ASSERT_EQ(stats.object_refs(0).call_site(), "file.py:42");
rc->RemoveLocalReference(id1, nullptr);
rc->AddOwnedObject(id2, {}, address, "file2.py:43", 100, false, /*add_local_ref=*/true);
rc->AddOwnedObject(id2, {}, address, "file2.py:43", 100, false);
rpc::CoreWorkerStats stats2;
rc->AddObjectRefStats({}, &stats2);
ASSERT_EQ(stats2.object_refs_size(), 1);
ASSERT_EQ(stats2.object_refs(0).object_id(), id2.Binary());
ASSERT_EQ(stats2.object_refs(0).local_ref_count(), 1);
ASSERT_EQ(stats2.object_refs(0).local_ref_count(), 0);
ASSERT_EQ(stats2.object_refs(0).object_size(), 100);
ASSERT_EQ(stats2.object_refs(0).call_site(), "file2.py:43");
rc->AddLocalReference(id2, "");
rc->RemoveLocalReference(id2, nullptr);
}
@ -594,7 +601,7 @@ TEST_F(ReferenceCountTest, TestGetLocalityData) {
// locality data.
int64_t object_size = 100;
rc->AddOwnedObject(obj1, {}, address, "file2.py:42", object_size, false,
/*add_local_ref=*/true, absl::optional<NodeID>(node1));
absl::optional<NodeID>(node1));
auto locality_data_obj1 = rc->GetLocalityData(obj1);
ASSERT_TRUE(locality_data_obj1.has_value());
ASSERT_EQ(locality_data_obj1->object_size, object_size);
@ -656,11 +663,13 @@ TEST_F(ReferenceCountTest, TestGetLocalityData) {
// Fetching locality data for an object that doesn't have an object size defined
// should return a null optional.
rc->AddOwnedObject(obj2, {}, address, "file2.py:43", -1, false,
/*add_local_ref=*/true, absl::optional<NodeID>(node2));
absl::optional<NodeID>(node2));
auto locality_data_obj2_no_object_size = rc->GetLocalityData(obj2);
ASSERT_FALSE(locality_data_obj2_no_object_size.has_value());
rc->AddLocalReference(obj1, "");
rc->RemoveLocalReference(obj1, nullptr);
rc->AddLocalReference(obj2, "");
rc->RemoveLocalReference(obj2, nullptr);
}
@ -671,7 +680,7 @@ TEST_F(ReferenceCountTest, TestOwnerAddress) {
auto object_id = ObjectID::FromRandom();
rpc::Address address;
address.set_ip_address("1234");
rc->AddOwnedObject(object_id, {}, address, "", 0, false, /*add_local_ref=*/true);
rc->AddOwnedObject(object_id, {}, address, "", 0, false);
TaskID added_id;
rpc::Address added_address;
@ -680,7 +689,7 @@ TEST_F(ReferenceCountTest, TestOwnerAddress) {
auto object_id2 = ObjectID::FromRandom();
address.set_ip_address("5678");
rc->AddOwnedObject(object_id2, {}, address, "", 0, false, /*add_local_ref=*/true);
rc->AddOwnedObject(object_id2, {}, address, "", 0, false);
ASSERT_TRUE(rc->GetOwner(object_id2, &added_address));
ASSERT_EQ(address.ip_address(), added_address.ip_address());
@ -689,7 +698,9 @@ TEST_F(ReferenceCountTest, TestOwnerAddress) {
rc->AddLocalReference(object_id3, "");
ASSERT_FALSE(rc->GetOwner(object_id3, &added_address));
rc->AddLocalReference(object_id, "");
rc->RemoveLocalReference(object_id, nullptr);
rc->AddLocalReference(object_id2, "");
rc->RemoveLocalReference(object_id2, nullptr);
rc->RemoveLocalReference(object_id3, nullptr);
}
@ -1658,8 +1669,7 @@ TEST(DistributedReferenceCountTest, TestForeignOwner) {
// Phase 3 -- foreign owner gets ref removed information.
//
// Emulate ref removed callback.
foreign_owner->rc_.AddOwnedObject(inner_id, {}, foreign_owner->address_, "", 0, false,
/*add_local_ref=*/false);
foreign_owner->rc_.AddOwnedObject(inner_id, {}, foreign_owner->address_, "", 0, false);
foreign_owner->rc_.AddBorrowerAddress(inner_id, owner->address_);
// Foreign owner waits on owner.
@ -2307,9 +2317,10 @@ TEST_F(ReferenceCountLineageEnabledTest, TestUnreconstructableObjectOutOfScope)
// The object goes out of scope once it has no more refs.
std::vector<ObjectID> out;
ASSERT_FALSE(rc->SetDeleteCallback(id, callback));
rc->AddOwnedObject(id, {}, address, "", 0, false, /*add_local_ref=*/true);
rc->AddOwnedObject(id, {}, address, "", 0, false);
ASSERT_TRUE(rc->SetDeleteCallback(id, callback));
ASSERT_FALSE(*out_of_scope);
rc->AddLocalReference(id, "");
ASSERT_FALSE(*out_of_scope);
rc->RemoveLocalReference(id, &out);
ASSERT_TRUE(*out_of_scope);
@ -2320,7 +2331,7 @@ TEST_F(ReferenceCountLineageEnabledTest, TestUnreconstructableObjectOutOfScope)
// count.
*out_of_scope = false;
ASSERT_FALSE(rc->SetDeleteCallback(id, callback));
rc->AddOwnedObject(id, {}, address, "", 0, false, /*add_local_ref=*/false);
rc->AddOwnedObject(id, {}, address, "", 0, false);
ASSERT_TRUE(rc->SetDeleteCallback(id, callback));
rc->UpdateSubmittedTaskReferences({return_id}, {id});
ASSERT_TRUE(rc->IsObjectPendingCreation(return_id));
@ -2360,7 +2371,8 @@ TEST_F(ReferenceCountLineageEnabledTest, TestBasicLineage) {
ASSERT_TRUE(lineage_deleted.empty());
// We should keep lineage for owned objects.
rc->AddOwnedObject(id, {}, rpc::Address(), "", 0, false, /*add_local_ref=*/true);
rc->AddOwnedObject(id, {}, rpc::Address(), "", 0, false);
rc->AddLocalReference(id, "");
ASSERT_TRUE(rc->HasReference(id));
rc->RemoveLocalReference(id, nullptr);
ASSERT_EQ(lineage_deleted.size(), 1);
@ -2378,7 +2390,7 @@ TEST_F(ReferenceCountLineageEnabledTest, TestPinLineageRecursive) {
for (int i = 0; i < 3; i++) {
ObjectID id = ObjectID::FromRandom();
ids.push_back(id);
rc->AddOwnedObject(id, {}, rpc::Address(), "", 0, true, /*add_local_ref=*/false);
rc->AddOwnedObject(id, {}, rpc::Address(), "", 0, true);
}
rc->SetReleaseLineageCallback(
@ -2401,6 +2413,7 @@ TEST_F(ReferenceCountLineageEnabledTest, TestPinLineageRecursive) {
for (size_t i = 0; i < ids.size() - 1; i++) {
auto id = ids[i];
// Submit a dependent task on id.
rc->AddLocalReference(id, "");
ASSERT_TRUE(rc->HasReference(id));
rc->UpdateSubmittedTaskReferences({}, {id});
rc->RemoveLocalReference(id, nullptr);
@ -2432,7 +2445,7 @@ TEST_F(ReferenceCountLineageEnabledTest, TestEvictLineage) {
for (int i = 0; i < 3; i++) {
ObjectID id = ObjectID::FromRandom();
ids.push_back(id);
rc->AddOwnedObject(id, {}, rpc::Address(), "", 0, true, /*add_local_ref=*/true);
rc->AddOwnedObject(id, {}, rpc::Address(), "", 0, true);
}
std::vector<ObjectID> lineage_deleted;
rc->SetReleaseLineageCallback(
@ -2448,9 +2461,10 @@ TEST_F(ReferenceCountLineageEnabledTest, TestEvictLineage) {
// ID1 depends on ID0.
rc->UpdateSubmittedTaskReferences({ids[1]}, {ids[0]});
rc->RemoveLocalReference(ids[0], nullptr);
rc->UpdateFinishedTaskReferences({ids[1]}, {ids[0]}, /*release_lineage=*/false,
empty_borrower, empty_refs, nullptr);
rc->AddLocalReference(ids[1], "");
rc->AddLocalReference(ids[2], "");
bool lineage_evicted = false;
for (const auto &id : ids) {
@ -2478,7 +2492,7 @@ TEST_F(ReferenceCountLineageEnabledTest, TestResubmittedTask) {
std::vector<ObjectID> lineage_deleted;
ObjectID id = ObjectID::FromRandom();
rc->AddOwnedObject(id, {}, rpc::Address(), "", 0, true, /*add_local_ref=*/true);
rc->AddOwnedObject(id, {}, rpc::Address(), "", 0, true);
rc->SetReleaseLineageCallback(
[&](const ObjectID &object_id, std::vector<ObjectID> *ids_to_release) {
@ -2487,6 +2501,7 @@ TEST_F(ReferenceCountLineageEnabledTest, TestResubmittedTask) {
});
// Local references.
rc->AddLocalReference(id, "");
ASSERT_TRUE(rc->HasReference(id));
// Submit 2 dependent tasks.
@ -2525,7 +2540,8 @@ TEST_F(ReferenceCountLineageEnabledTest, TestPlasmaLocation) {
ObjectID id = ObjectID::FromRandom();
NodeID node_id = NodeID::FromRandom();
rc->AddOwnedObject(id, {}, rpc::Address(), "", 0, true, /*add_local_ref=*/true);
rc->AddOwnedObject(id, {}, rpc::Address(), "", 0, true);
rc->AddLocalReference(id, "");
ASSERT_TRUE(rc->SetDeleteCallback(id, callback));
ASSERT_TRUE(rc->IsPlasmaObjectPinnedOrSpilled(id, &owned_by_us, &pinned_at, &spilled));
ASSERT_TRUE(owned_by_us);
@ -2540,7 +2556,8 @@ TEST_F(ReferenceCountLineageEnabledTest, TestPlasmaLocation) {
ASSERT_TRUE(deleted->count(id) > 0);
deleted->clear();
rc->AddOwnedObject(id, {}, rpc::Address(), "", 0, true, /*add_local_ref=*/true);
rc->AddOwnedObject(id, {}, rpc::Address(), "", 0, true);
rc->AddLocalReference(id, "");
ASSERT_TRUE(rc->SetDeleteCallback(id, callback));
rc->UpdateObjectPinnedAtRaylet(id, node_id);
auto objects = rc->ResetObjectsOnRemovedNode(node_id);
@ -2561,8 +2578,9 @@ TEST_F(ReferenceCountTest, TestFree) {
NodeID node_id = NodeID::FromRandom();
// Test free before receiving information about where the object is pinned.
rc->AddOwnedObject(id, {}, rpc::Address(), "", 0, true, /*add_local_ref=*/true);
rc->AddOwnedObject(id, {}, rpc::Address(), "", 0, true);
ASSERT_FALSE(rc->IsPlasmaObjectFreed(id));
rc->AddLocalReference(id, "");
rc->FreePlasmaObjects({id});
ASSERT_TRUE(rc->IsPlasmaObjectFreed(id));
ASSERT_FALSE(rc->SetDeleteCallback(id, callback));
@ -2579,7 +2597,8 @@ TEST_F(ReferenceCountTest, TestFree) {
ASSERT_FALSE(rc->IsPlasmaObjectFreed(id));
// Test free after receiving information about where the object is pinned.
rc->AddOwnedObject(id, {}, rpc::Address(), "", 0, true, /*add_local_ref=*/true);
rc->AddOwnedObject(id, {}, rpc::Address(), "", 0, true);
rc->AddLocalReference(id, "");
ASSERT_TRUE(rc->SetDeleteCallback(id, callback));
rc->UpdateObjectPinnedAtRaylet(id, node_id);
ASSERT_FALSE(rc->IsPlasmaObjectFreed(id));
@ -2597,7 +2616,7 @@ TEST_F(ReferenceCountTest, TestRemoveOwnedObject) {
ObjectID id = ObjectID::FromRandom();
// Test remove owned object.
rc->AddOwnedObject(id, {}, rpc::Address(), "", 0, false, /*add_local_ref=*/false);
rc->AddOwnedObject(id, {}, rpc::Address(), "", 0, false);
ASSERT_TRUE(rc->HasReference(id));
rc->RemoveOwnedObject(id);
ASSERT_FALSE(rc->HasReference(id));
@ -2649,11 +2668,10 @@ TEST_F(ReferenceCountTest, TestDelayedWaitForRefRemoved) {
// Owner owns a nested object ref, borrower is using the outer ObjectRef.
ObjectID outer_id = ObjectID::FromRandom();
ObjectID inner_id = ObjectID::FromRandom();
owner->rc_.AddOwnedObject(outer_id, {}, owner->address_, "", 0, false,
/*add_local_ref=*/false);
owner->rc_.AddOwnedObject(outer_id, {}, owner->address_, "", 0, false);
owner->rc_.AddBorrowerAddress(outer_id, borrower->address_);
owner->rc_.AddOwnedObject(inner_id, {}, owner->address_, "", 0, false,
/*add_local_ref=*/true);
owner->rc_.AddOwnedObject(inner_id, {}, owner->address_, "", 0, false);
owner->rc_.AddLocalReference(inner_id, "");
ASSERT_TRUE(owner->rc_.HasReference(outer_id));
ASSERT_TRUE(owner->rc_.HasReference(inner_id));
@ -2692,12 +2710,9 @@ TEST_F(ReferenceCountTest, TestRepeatedDeserialization) {
ObjectID outer_id = ObjectID::FromRandom();
ObjectID middle_id = ObjectID::FromRandom();
ObjectID inner_id = ObjectID::FromRandom();
owner->rc_.AddOwnedObject(inner_id, {}, owner->address_, "", 0, false,
/*add_local_ref=*/false);
owner->rc_.AddOwnedObject(middle_id, {inner_id}, owner->address_, "", 0, false,
/*add_local_ref=*/false);
owner->rc_.AddOwnedObject(outer_id, {middle_id}, owner->address_, "", 0, false,
/*add_local_ref=*/false);
owner->rc_.AddOwnedObject(inner_id, {}, owner->address_, "", 0, false);
owner->rc_.AddOwnedObject(middle_id, {inner_id}, owner->address_, "", 0, false);
owner->rc_.AddOwnedObject(outer_id, {middle_id}, owner->address_, "", 0, false);
owner->rc_.AddBorrowerAddress(outer_id, borrower->address_);
ASSERT_TRUE(owner->rc_.HasReference(outer_id));
ASSERT_TRUE(owner->rc_.HasReference(middle_id));
@ -2743,12 +2758,9 @@ TEST_F(ReferenceCountTest, TestForwardNestedRefs) {
ObjectID outer_id = ObjectID::FromRandom();
ObjectID middle_id = ObjectID::FromRandom();
ObjectID inner_id = ObjectID::FromRandom();
owner->rc_.AddOwnedObject(inner_id, {}, owner->address_, "", 0, false,
/*add_local_ref=*/false);
owner->rc_.AddOwnedObject(middle_id, {inner_id}, owner->address_, "", 0, false,
/*add_local_ref=*/false);
owner->rc_.AddOwnedObject(outer_id, {middle_id}, owner->address_, "", 0, false,
/*add_local_ref=*/false);
owner->rc_.AddOwnedObject(inner_id, {}, owner->address_, "", 0, false);
owner->rc_.AddOwnedObject(middle_id, {inner_id}, owner->address_, "", 0, false);
owner->rc_.AddOwnedObject(outer_id, {middle_id}, owner->address_, "", 0, false);
owner->rc_.AddBorrowerAddress(outer_id, borrower1->address_);
ASSERT_TRUE(owner->rc_.HasReference(outer_id));
ASSERT_TRUE(owner->rc_.HasReference(middle_id));

View file

@ -70,14 +70,9 @@ std::vector<rpc::ObjectReference> TaskManager::AddPendingTask(
// publish the WaitForRefRemoved message that we are now a borrower for
// the inner IDs. Note that this message can be received *before* the
// PushTaskReply.
// NOTE(swang): We increment the local ref count to ensure that the
// object is considered in scope before we return the ObjectRef to the
// language frontend. Note that the language bindings should set
// skip_adding_local_ref=True to avoid double referencing the object.
reference_counter_->AddOwnedObject(return_id,
/*inner_ids=*/{}, caller_address, call_site, -1,
/*is_reconstructable=*/is_reconstructable,
/*add_local_ref=*/true);
/*is_reconstructable=*/is_reconstructable);
}
return_ids.push_back(return_id);

View file

@ -99,11 +99,6 @@ class TaskManager : public TaskFinisherInterface, public TaskResubmissionInterfa
/// Add a task that is pending execution.
///
/// The local ref count for all return refs (excluding actor creation tasks)
/// will be initialized to 1 so that the ref is considered in scope before
/// returning to the language frontend. The caller is responsible for
/// decrementing the ref count once the frontend ref has gone out of scope.
///
/// \param[in] caller_address The rpc address of the calling task.
/// \param[in] spec The spec of the pending task.
/// \param[in] max_retries Number of times this task may be retried

View file

@ -103,11 +103,10 @@ class MockReferenceCounter : public ReferenceCounterInterface {
const rpc::Address &owner_address,
bool foreign_owner_already_monitoring));
MOCK_METHOD8(AddOwnedObject,
MOCK_METHOD7(AddOwnedObject,
void(const ObjectID &object_id, const std::vector<ObjectID> &contained_ids,
const rpc::Address &owner_address, const std::string &call_site,
const int64_t object_size, bool is_reconstructable,
bool add_local_ref,
const absl::optional<NodeID> &pinned_at_raylet_id));
MOCK_METHOD2(SetDeleteCallback,

View file

@ -170,8 +170,7 @@ class ObjectRecoveryManagerTest : public ObjectRecoveryManagerTestBase {
TEST_F(ObjectRecoveryLineageDisabledTest, TestNoReconstruction) {
// Lineage recording disabled.
ObjectID object_id = ObjectID::FromRandom();
ref_counter_->AddOwnedObject(object_id, {}, rpc::Address(), "", 0, true,
/*add_local_ref=*/true);
ref_counter_->AddOwnedObject(object_id, {}, rpc::Address(), "", 0, true);
ASSERT_TRUE(manager_.RecoverObject(object_id));
ASSERT_TRUE(failed_reconstructions_.empty());
ASSERT_TRUE(object_directory_->Flush() == 1);
@ -193,8 +192,7 @@ TEST_F(ObjectRecoveryLineageDisabledTest, TestNoReconstruction) {
TEST_F(ObjectRecoveryLineageDisabledTest, TestPinNewCopy) {
ObjectID object_id = ObjectID::FromRandom();
ref_counter_->AddOwnedObject(object_id, {}, rpc::Address(), "", 0, true,
/*add_local_ref=*/true);
ref_counter_->AddOwnedObject(object_id, {}, rpc::Address(), "", 0, true);
std::vector<rpc::Address> addresses({rpc::Address()});
object_directory_->SetLocations(object_id, addresses);
@ -207,8 +205,7 @@ TEST_F(ObjectRecoveryLineageDisabledTest, TestPinNewCopy) {
TEST_F(ObjectRecoveryManagerTest, TestPinNewCopy) {
ObjectID object_id = ObjectID::FromRandom();
ref_counter_->AddOwnedObject(object_id, {}, rpc::Address(), "", 0, true,
/*add_local_ref=*/true);
ref_counter_->AddOwnedObject(object_id, {}, rpc::Address(), "", 0, true);
std::vector<rpc::Address> addresses({rpc::Address()});
object_directory_->SetLocations(object_id, addresses);
@ -221,8 +218,7 @@ TEST_F(ObjectRecoveryManagerTest, TestPinNewCopy) {
TEST_F(ObjectRecoveryManagerTest, TestReconstruction) {
ObjectID object_id = ObjectID::FromRandom();
ref_counter_->AddOwnedObject(object_id, {}, rpc::Address(), "", 0, true,
/*add_local_ref=*/true);
ref_counter_->AddOwnedObject(object_id, {}, rpc::Address(), "", 0, true);
task_resubmitter_->AddTask(object_id.TaskId(), {});
ASSERT_TRUE(manager_.RecoverObject(object_id));
@ -234,8 +230,7 @@ TEST_F(ObjectRecoveryManagerTest, TestReconstruction) {
TEST_F(ObjectRecoveryManagerTest, TestReconstructionSuppression) {
ObjectID object_id = ObjectID::FromRandom();
ref_counter_->AddOwnedObject(object_id, {}, rpc::Address(), "", 0, true,
/*add_local_ref=*/true);
ref_counter_->AddOwnedObject(object_id, {}, rpc::Address(), "", 0, true);
ref_counter_->AddLocalReference(object_id, "");
ASSERT_TRUE(manager_.RecoverObject(object_id));
@ -270,8 +265,7 @@ TEST_F(ObjectRecoveryManagerTest, TestReconstructionChain) {
std::vector<ObjectID> dependencies;
for (int i = 0; i < 3; i++) {
ObjectID object_id = ObjectID::FromRandom();
ref_counter_->AddOwnedObject(object_id, {}, rpc::Address(), "", 0, true,
/*add_local_ref=*/true);
ref_counter_->AddOwnedObject(object_id, {}, rpc::Address(), "", 0, true);
task_resubmitter_->AddTask(object_id.TaskId(), dependencies);
dependencies = {object_id};
object_ids.push_back(object_id);
@ -288,8 +282,7 @@ TEST_F(ObjectRecoveryManagerTest, TestReconstructionChain) {
TEST_F(ObjectRecoveryManagerTest, TestReconstructionFails) {
ObjectID object_id = ObjectID::FromRandom();
ref_counter_->AddOwnedObject(object_id, {}, rpc::Address(), "", 0, true,
/*add_local_ref=*/true);
ref_counter_->AddOwnedObject(object_id, {}, rpc::Address(), "", 0, true);
ASSERT_TRUE(manager_.RecoverObject(object_id));
ASSERT_TRUE(object_directory_->Flush() == 1);
@ -301,12 +294,10 @@ TEST_F(ObjectRecoveryManagerTest, TestReconstructionFails) {
TEST_F(ObjectRecoveryManagerTest, TestDependencyReconstructionFails) {
ObjectID dep_id = ObjectID::FromRandom();
ref_counter_->AddOwnedObject(dep_id, {}, rpc::Address(), "", 0, true,
/*add_local_ref=*/true);
ref_counter_->AddOwnedObject(dep_id, {}, rpc::Address(), "", 0, true);
ObjectID object_id = ObjectID::FromRandom();
ref_counter_->AddOwnedObject(object_id, {}, rpc::Address(), "", 0, true,
/*add_local_ref=*/true);
ref_counter_->AddOwnedObject(object_id, {}, rpc::Address(), "", 0, true);
task_resubmitter_->AddTask(object_id.TaskId(), {dep_id});
RAY_LOG(INFO) << object_id;
@ -322,8 +313,7 @@ TEST_F(ObjectRecoveryManagerTest, TestDependencyReconstructionFails) {
TEST_F(ObjectRecoveryManagerTest, TestLineageEvicted) {
ObjectID object_id = ObjectID::FromRandom();
ref_counter_->AddOwnedObject(object_id, {}, rpc::Address(), "", 0, true,
/*add_local_ref=*/true);
ref_counter_->AddOwnedObject(object_id, {}, rpc::Address(), "", 0, true);
ref_counter_->AddLocalReference(object_id, "");
ref_counter_->EvictLineage(1);

View file

@ -124,6 +124,7 @@ TEST_F(TaskManagerTest, TestTaskSuccess) {
ASSERT_EQ(num_retries_, 0);
std::vector<ObjectID> removed;
reference_counter_->AddLocalReference(return_id, "");
reference_counter_->RemoveLocalReference(return_id, &removed);
ASSERT_EQ(removed[0], return_id);
ASSERT_EQ(reference_counter_->NumObjectIDsInScope(), 0);
@ -159,6 +160,7 @@ TEST_F(TaskManagerTest, TestTaskFailure) {
ASSERT_EQ(num_retries_, 0);
std::vector<ObjectID> removed;
reference_counter_->AddLocalReference(return_id, "");
reference_counter_->RemoveLocalReference(return_id, &removed);
ASSERT_EQ(removed[0], return_id);
ASSERT_EQ(reference_counter_->NumObjectIDsInScope(), 0);
@ -219,6 +221,7 @@ TEST_F(TaskManagerTest, TestFailPendingTask) {
ASSERT_EQ(stored_error, rpc::ErrorType::LOCAL_RAYLET_DIED);
std::vector<ObjectID> removed;
reference_counter_->AddLocalReference(return_id, "");
reference_counter_->RemoveLocalReference(return_id, &removed);
ASSERT_EQ(removed[0], return_id);
ASSERT_EQ(reference_counter_->NumObjectIDsInScope(), 0);
@ -265,6 +268,7 @@ TEST_F(TaskManagerTest, TestTaskReconstruction) {
ASSERT_EQ(stored_error, error);
std::vector<ObjectID> removed;
reference_counter_->AddLocalReference(return_id, "");
reference_counter_->RemoveLocalReference(return_id, &removed);
ASSERT_EQ(removed[0], return_id);
ASSERT_EQ(reference_counter_->NumObjectIDsInScope(), 0);
@ -322,6 +326,7 @@ TEST_F(TaskManagerTest, TestLineageEvicted) {
// Once the return ID goes out of scope, the task spec and its dependencies
// are released.
reference_counter_->AddLocalReference(return_id, "");
reference_counter_->RemoveLocalReference(return_id, nullptr);
ASSERT_FALSE(manager_.IsTaskSubmissible(spec.TaskId()));
ASSERT_FALSE(reference_counter_->HasReference(return_id));
@ -340,6 +345,7 @@ TEST_F(TaskManagerLineageTest, TestLineagePinned) {
int num_retries = 3;
manager_.AddPendingTask(caller_address, spec, "", num_retries);
auto return_id = spec.ReturnId(0);
reference_counter_->AddLocalReference(return_id, "");
ASSERT_TRUE(manager_.IsTaskPending(spec.TaskId()));
ASSERT_EQ(reference_counter_->NumObjectIDsInScope(), 3);
@ -378,6 +384,7 @@ TEST_F(TaskManagerLineageTest, TestDirectObjectNoLineage) {
int num_retries = 3;
manager_.AddPendingTask(caller_address, spec, "", num_retries);
auto return_id = spec.ReturnId(0);
reference_counter_->AddLocalReference(return_id, "");
ASSERT_TRUE(manager_.IsTaskPending(spec.TaskId()));
ASSERT_EQ(reference_counter_->NumObjectIDsInScope(), 3);
@ -411,6 +418,7 @@ TEST_F(TaskManagerLineageTest, TestLineagePinnedOutOfOrder) {
int num_retries = 3;
manager_.AddPendingTask(caller_address, spec, "", num_retries);
auto return_id = spec.ReturnId(0);
reference_counter_->AddLocalReference(return_id, "");
ASSERT_TRUE(manager_.IsTaskPending(spec.TaskId()));
ASSERT_EQ(reference_counter_->NumObjectIDsInScope(), 3);
@ -444,11 +452,13 @@ TEST_F(TaskManagerLineageTest, TestRecursiveLineagePinned) {
rpc::Address caller_address;
ObjectID dep = ObjectID::FromRandom();
reference_counter_->AddLocalReference(dep, "");
for (int i = 0; i < 3; i++) {
auto spec = CreateTaskHelper(1, {dep});
int num_retries = 3;
manager_.AddPendingTask(caller_address, spec, "", num_retries);
auto return_id = spec.ReturnId(0);
reference_counter_->AddLocalReference(return_id, "");
// The task completes.
rpc::PushTaskReply reply;
@ -489,7 +499,7 @@ TEST_F(TaskManagerLineageTest, TestRecursiveDirectObjectNoLineage) {
int num_retries = 3;
manager_.AddPendingTask(caller_address, spec, "", num_retries);
auto return_id = spec.ReturnId(0);
reference_counter_->RemoveLocalReference(dep, nullptr);
reference_counter_->AddLocalReference(return_id, "");
// The task completes.
rpc::PushTaskReply reply;
@ -502,13 +512,15 @@ TEST_F(TaskManagerLineageTest, TestRecursiveDirectObjectNoLineage) {
// No tasks should be pinned because they returned direct objects.
ASSERT_EQ(manager_.NumSubmissibleTasks(), 0);
// Only the newest return ID should be in scope because all objects in the
// lineage were direct.
ASSERT_EQ(reference_counter_->NumObjectIDsInScope(), 1);
// Only the dependency and the newest return ID should be in scope because
// all objects in the lineage were direct.
ASSERT_EQ(reference_counter_->NumObjectIDsInScope(), 2);
reference_counter_->RemoveLocalReference(dep, nullptr);
dep = return_id;
}
// The task's return ID goes out of scope before the task finishes.
reference_counter_->RemoveLocalReference(dep, nullptr);
ASSERT_EQ(manager_.NumSubmissibleTasks(), 0);
ASSERT_EQ(reference_counter_->NumObjectIDsInScope(), 0);
@ -540,6 +552,7 @@ TEST_F(TaskManagerLineageTest, TestResubmitTask) {
ASSERT_TRUE(reference_counter_->IsObjectPendingCreation(return_id));
// The task completes.
reference_counter_->AddLocalReference(return_id, "");
rpc::PushTaskReply reply;
auto return_object = reply.add_return_objects();
return_object->set_object_id(return_id.Binary());
@ -591,6 +604,8 @@ TEST_F(TaskManagerLineageTest, TestResubmittedTaskNondeterministicReturns) {
// The task completes. Both return objects are stored in plasma.
{
reference_counter_->AddLocalReference(return_id1, "");
reference_counter_->AddLocalReference(return_id2, "");
rpc::PushTaskReply reply;
auto return_object1 = reply.add_return_objects();
return_object1->set_object_id(return_id1.Binary());