mirror of
https://github.com/vale981/ray
synced 2025-03-05 10:01:43 -05:00
[core] Increment ref count when creating an ObjectRef to prevent object from going out of scope (#22120)
When a Ray program first creates an ObjectRef (via ray.put or task call), we add it with a ref count of 0 in the C++ backend because the language frontend will increment the initial local ref once we return the allocated ObjectID, then delete the local ref once the ObjectRef goes out of scope. Thus, there is a brief window where the object ref will appear to be out of scope. This can cause problems with async protocols that check whether the object is in scope or not, such as the previous bug fixed in #19910. Now that we plan to enable lineage reconstruction to automatically recover lost objects, this race condition can also be problematic because we use the ref count to decide whether an object needs to be recovered or not. This PR avoids these race conditions by incrementing the local ref count in the C++ backend when executing ray.put() and task calls. The frontend is then responsible for skipping the initial local ref increment when creating the ObjectRef. This is the same fix used in #19910, but generalized to all initial ObjectRefs. This is a re-merge for #21719 with a fix for removing the owned object ref if creation fails.
This commit is contained in:
parent
6b7d995e64
commit
dcd96ca348
21 changed files with 304 additions and 232 deletions
|
@ -163,7 +163,12 @@ inline ray::ObjectRef<T> Put(const T &obj) {
|
|||
auto buffer =
|
||||
std::make_shared<msgpack::sbuffer>(ray::internal::Serializer::Serialize(obj));
|
||||
auto id = ray::internal::GetRayRuntime()->Put(buffer);
|
||||
return ray::ObjectRef<T>(id);
|
||||
auto ref = ObjectRef<T>(id);
|
||||
// The core worker will add an initial ref to the put ID to
|
||||
// keep it in scope. Now that we've created the frontend
|
||||
// ObjectRef, remove this initial ref.
|
||||
ray::internal::GetRayRuntime()->RemoveLocalReference(id);
|
||||
return ref;
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
|
|
|
@ -82,7 +82,12 @@ ObjectRef<boost::callable_traits::return_type_t<F>> ActorTaskCaller<F>::Remote(
|
|||
using ReturnType = boost::callable_traits::return_type_t<F>;
|
||||
auto returned_object_id =
|
||||
runtime_->CallActor(remote_function_holder_, id_, args_, task_options_);
|
||||
return ObjectRef<ReturnType>(returned_object_id);
|
||||
auto return_ref = ObjectRef<ReturnType>(returned_object_id);
|
||||
// The core worker will add an initial ref to each return ID to keep it in
|
||||
// scope. Now that we've created the frontend ObjectRef, remove this initial
|
||||
// ref.
|
||||
runtime_->RemoveLocalReference(returned_object_id);
|
||||
return return_ref;
|
||||
}
|
||||
|
||||
} // namespace internal
|
||||
|
|
|
@ -89,7 +89,12 @@ ObjectRef<boost::callable_traits::return_type_t<F>> TaskCaller<F>::Remote(
|
|||
|
||||
auto returned_object_id = runtime_->Call(remote_function_holder_, args_, task_options_);
|
||||
using ReturnType = boost::callable_traits::return_type_t<F>;
|
||||
return ObjectRef<ReturnType>(returned_object_id);
|
||||
auto return_ref = ObjectRef<ReturnType>(returned_object_id);
|
||||
// The core worker will add an initial ref to each return ID to keep it in
|
||||
// scope. Now that we've created the frontend ObjectRef, remove this initial
|
||||
// ref.
|
||||
runtime_->RemoveLocalReference(returned_object_id);
|
||||
return return_ref;
|
||||
}
|
||||
} // namespace internal
|
||||
} // namespace ray
|
||||
|
|
|
@ -76,7 +76,10 @@ public abstract class AbstractRayRuntime implements RayRuntimeInternal {
|
|||
LOGGER.debug("Putting Object in Task {}.", workerContext.getCurrentTaskId());
|
||||
}
|
||||
ObjectId objectId = objectStore.put(obj);
|
||||
return new ObjectRefImpl<T>(objectId, (Class<T>) (obj == null ? Object.class : obj.getClass()));
|
||||
return new ObjectRefImpl<T>(
|
||||
objectId,
|
||||
(Class<T>) (obj == null ? Object.class : obj.getClass()),
|
||||
/*skipAddingLocalRef=*/ true);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -88,7 +91,10 @@ public abstract class AbstractRayRuntime implements RayRuntimeInternal {
|
|||
ownerActor.getId());
|
||||
}
|
||||
ObjectId objectId = objectStore.put(obj, ownerActor.getId());
|
||||
return new ObjectRefImpl<T>(objectId, (Class<T>) (obj == null ? Object.class : obj.getClass()));
|
||||
return new ObjectRefImpl<T>(
|
||||
objectId,
|
||||
(Class<T>) (obj == null ? Object.class : obj.getClass()),
|
||||
/*skipAddingLocalRef=*/ true);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -295,7 +301,7 @@ public abstract class AbstractRayRuntime implements RayRuntimeInternal {
|
|||
if (returnIds.isEmpty()) {
|
||||
return null;
|
||||
} else {
|
||||
return new ObjectRefImpl(returnIds.get(0), returnType.get());
|
||||
return new ObjectRefImpl(returnIds.get(0), returnType.get(), /*skipAddingLocalRef=*/ true);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -317,7 +323,7 @@ public abstract class AbstractRayRuntime implements RayRuntimeInternal {
|
|||
if (returnIds.isEmpty()) {
|
||||
return null;
|
||||
} else {
|
||||
return new ObjectRefImpl(returnIds.get(0), returnType.get());
|
||||
return new ObjectRefImpl(returnIds.get(0), returnType.get(), /*skipAddingLocalRef=*/ true);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -32,10 +32,22 @@ public final class ObjectRefImpl<T> implements ObjectRef<T>, Externalizable {
|
|||
|
||||
private Class<T> type;
|
||||
|
||||
public ObjectRefImpl(ObjectId id, Class<T> type) {
|
||||
public ObjectRefImpl(ObjectId id, Class<T> type, boolean skipAddingLocalRef) {
|
||||
this.id = id;
|
||||
this.type = type;
|
||||
addLocalReference();
|
||||
RayRuntimeInternal runtime = (RayRuntimeInternal) Ray.internal();
|
||||
Preconditions.checkState(workerId == null);
|
||||
workerId = runtime.getWorkerContext().getCurrentWorkerId();
|
||||
if (!skipAddingLocalRef) {
|
||||
runtime.getObjectStore().addLocalReference(workerId, id);
|
||||
}
|
||||
// We still add the reference so that the local ref count will be properly
|
||||
// decremented once this object is GCed.
|
||||
new ObjectRefImplReference(this);
|
||||
}
|
||||
|
||||
public ObjectRefImpl(ObjectId id, Class<T> type) {
|
||||
this(id, type, /*skipAddingLocalRef=*/ false);
|
||||
}
|
||||
|
||||
public ObjectRefImpl() {}
|
||||
|
@ -81,22 +93,19 @@ public final class ObjectRefImpl<T> implements ObjectRef<T>, Externalizable {
|
|||
int len = in.readInt();
|
||||
byte[] ownerAddress = new byte[len];
|
||||
in.readFully(ownerAddress);
|
||||
addLocalReference();
|
||||
|
||||
RayRuntimeInternal runtime = (RayRuntimeInternal) Ray.internal();
|
||||
Preconditions.checkState(workerId == null);
|
||||
workerId = runtime.getWorkerContext().getCurrentWorkerId();
|
||||
runtime.getObjectStore().addLocalReference(workerId, id);
|
||||
new ObjectRefImplReference(this);
|
||||
|
||||
runtime
|
||||
.getObjectStore()
|
||||
.registerOwnershipInfoAndResolveFuture(
|
||||
this.id, ObjectSerializer.getOuterObjectId(), ownerAddress);
|
||||
}
|
||||
|
||||
private void addLocalReference() {
|
||||
Preconditions.checkState(workerId == null);
|
||||
RayRuntimeInternal runtime = (RayRuntimeInternal) Ray.internal();
|
||||
workerId = runtime.getWorkerContext().getCurrentWorkerId();
|
||||
runtime.getObjectStore().addLocalReference(workerId, id);
|
||||
new ObjectRefImplReference(this);
|
||||
}
|
||||
|
||||
private static final class ObjectRefImplReference
|
||||
extends FinalizableWeakReference<ObjectRefImpl<?>> {
|
||||
|
||||
|
|
|
@ -551,15 +551,6 @@ def put_object(obj, use_ray_put):
|
|||
return _put.remote(obj)
|
||||
|
||||
|
||||
def put_unpinned_object(obj):
|
||||
value = ray.worker.global_worker.get_serialization_context().serialize(obj)
|
||||
return ray.ObjectRef(
|
||||
ray.worker.global_worker.core_worker.put_serialized_object(
|
||||
value, pin_object=False
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
def wait_until_server_available(address, timeout_ms=5000, retry_interval_ms=100):
|
||||
ip_port = address.split(":")
|
||||
ip = ip_port[0]
|
||||
|
|
|
@ -194,13 +194,15 @@ cdef RayObjectsToDataMetadataPairs(
|
|||
return data_metadata_pairs
|
||||
|
||||
|
||||
cdef VectorToObjectRefs(const c_vector[CObjectReference] &object_refs):
|
||||
cdef VectorToObjectRefs(const c_vector[CObjectReference] &object_refs,
|
||||
skip_adding_local_ref):
|
||||
result = []
|
||||
for i in range(object_refs.size()):
|
||||
result.append(ObjectRef(
|
||||
object_refs[i].object_id(),
|
||||
object_refs[i].owner_address().SerializeAsString(),
|
||||
object_refs[i].call_site()))
|
||||
object_refs[i].call_site(),
|
||||
skip_adding_local_ref=skip_adding_local_ref))
|
||||
return result
|
||||
|
||||
|
||||
|
@ -361,10 +363,28 @@ cdef int prepare_actor_concurrency_groups(
|
|||
concurrency_groups.push_back(cg)
|
||||
return 1
|
||||
|
||||
cdef prepare_args(
|
||||
cdef prepare_args_and_increment_put_refs(
|
||||
CoreWorker core_worker,
|
||||
Language language, args,
|
||||
c_vector[unique_ptr[CTaskArg]] *args_vector, function_descriptor):
|
||||
c_vector[unique_ptr[CTaskArg]] *args_vector, function_descriptor,
|
||||
c_vector[CObjectID] *incremented_put_arg_ids):
|
||||
try:
|
||||
prepare_args_internal(core_worker, language, args, args_vector,
|
||||
function_descriptor, incremented_put_arg_ids)
|
||||
except Exception as e:
|
||||
# An error occurred during arg serialization. We must remove the
|
||||
# initial local ref for all args that were successfully put into the
|
||||
# local plasma store. These objects will then get released.
|
||||
for put_arg_id in dereference(incremented_put_arg_ids):
|
||||
CCoreWorkerProcess.GetCoreWorker().RemoveLocalReference(
|
||||
put_arg_id)
|
||||
raise e
|
||||
|
||||
cdef prepare_args_internal(
|
||||
CoreWorker core_worker,
|
||||
Language language, args,
|
||||
c_vector[unique_ptr[CTaskArg]] *args_vector, function_descriptor,
|
||||
c_vector[CObjectID] *incremented_put_arg_ids):
|
||||
cdef:
|
||||
size_t size
|
||||
int64_t put_threshold
|
||||
|
@ -437,12 +457,16 @@ cdef prepare_args(
|
|||
inlined_ids.clear()
|
||||
total_inlined += <int64_t>size
|
||||
else:
|
||||
put_id = CObjectID.FromBinary(
|
||||
core_worker.put_serialized_object_and_increment_local_ref(
|
||||
serialized_arg, inline_small_object=False))
|
||||
args_vector.push_back(unique_ptr[CTaskArg](
|
||||
new CTaskArgByReference(CObjectID.FromBinary(
|
||||
core_worker.put_serialized_object(
|
||||
serialized_arg, inline_small_object=False)),
|
||||
CCoreWorkerProcess.GetCoreWorker().GetRpcAddress(),
|
||||
put_arg_call_site)))
|
||||
new CTaskArgByReference(
|
||||
put_id,
|
||||
CCoreWorkerProcess.GetCoreWorker().GetRpcAddress(),
|
||||
put_arg_call_site
|
||||
)))
|
||||
incremented_put_arg_ids.push_back(put_id)
|
||||
|
||||
|
||||
cdef raise_if_dependency_failed(arg):
|
||||
|
@ -604,7 +628,9 @@ cdef execute_task(
|
|||
args, kwargs = [], {}
|
||||
else:
|
||||
metadata_pairs = RayObjectsToDataMetadataPairs(c_args)
|
||||
object_refs = VectorToObjectRefs(c_arg_refs)
|
||||
object_refs = VectorToObjectRefs(
|
||||
c_arg_refs,
|
||||
skip_adding_local_ref=False)
|
||||
|
||||
if core_worker.current_actor_is_asyncio():
|
||||
# We deserialize objects in event loop thread to
|
||||
|
@ -860,7 +886,9 @@ cdef c_vector[c_string] spill_objects_handler(
|
|||
c_vector[c_string] owner_addresses
|
||||
|
||||
with gil:
|
||||
object_refs = VectorToObjectRefs(object_refs_to_spill)
|
||||
object_refs = VectorToObjectRefs(
|
||||
object_refs_to_spill,
|
||||
skip_adding_local_ref=False)
|
||||
for i in range(object_refs_to_spill.size()):
|
||||
owner_addresses.push_back(
|
||||
object_refs_to_spill[i].owner_address()
|
||||
|
@ -896,7 +924,9 @@ cdef int64_t restore_spilled_objects_handler(
|
|||
size = object_urls.size()
|
||||
for i in range(size):
|
||||
urls.append(object_urls[i])
|
||||
object_refs = VectorToObjectRefs(object_refs_to_restore)
|
||||
object_refs = VectorToObjectRefs(
|
||||
object_refs_to_restore,
|
||||
skip_adding_local_ref=False)
|
||||
try:
|
||||
with ray.worker._changeproctitle(
|
||||
ray_constants.WORKER_PROCESS_TYPE_RESTORE_WORKER,
|
||||
|
@ -1211,7 +1241,8 @@ cdef class CoreWorker:
|
|||
|
||||
if object_ref is None:
|
||||
with nogil:
|
||||
check_status(CCoreWorkerProcess.GetCoreWorker().CreateOwned(
|
||||
check_status(CCoreWorkerProcess.GetCoreWorker()
|
||||
.CreateOwnedAndIncrementLocalRef(
|
||||
metadata, data_size, contained_ids,
|
||||
c_object_id, data, created_by_worker,
|
||||
move(c_owner_address),
|
||||
|
@ -1265,20 +1296,25 @@ cdef class CoreWorker:
|
|||
owner_address: Owner address for this object ref.
|
||||
"""
|
||||
cdef:
|
||||
CObjectID c_object_id
|
||||
CObjectID c_object_id = object_ref.native()
|
||||
shared_ptr[CBuffer] data_buf
|
||||
shared_ptr[CBuffer] metadata_buf
|
||||
int64_t put_threshold
|
||||
c_vector[CObjectID] c_object_id_vector
|
||||
unique_ptr[CAddress] c_owner_address
|
||||
unique_ptr[CAddress] c_owner_address = move(self._convert_python_address(
|
||||
object_ref.owner_address()))
|
||||
|
||||
# TODO(suquark): This method does not support put objects to
|
||||
# in memory store currently.
|
||||
metadata_buf = string_to_buffer(metadata)
|
||||
object_already_exists = self._create_put_buffer(
|
||||
metadata_buf, data_size, object_ref,
|
||||
ObjectRefsToVector([]),
|
||||
&c_object_id, &data_buf, False, owner_address)
|
||||
if object_already_exists:
|
||||
|
||||
status = CCoreWorkerProcess.GetCoreWorker().CreateExisting(
|
||||
metadata_buf, data_size, object_ref.native(),
|
||||
dereference(c_owner_address), &data_buf,
|
||||
False)
|
||||
if not status.ok():
|
||||
logger.debug("Error putting restored object into plasma.")
|
||||
return
|
||||
if data_buf == NULL:
|
||||
logger.debug("Object already exists in 'put_file_like_object'.")
|
||||
return
|
||||
data = Buffer.make(data_buf)
|
||||
|
@ -1287,7 +1323,6 @@ cdef class CoreWorker:
|
|||
while index < data_size:
|
||||
bytes_read = file_like.readinto(view[index:])
|
||||
index += bytes_read
|
||||
c_owner_address = move(self._convert_python_address(owner_address))
|
||||
with nogil:
|
||||
# Using custom object refs is not supported because we
|
||||
# can't track their lifecycle, so we don't pin the object
|
||||
|
@ -1295,13 +1330,13 @@ cdef class CoreWorker:
|
|||
check_status(
|
||||
CCoreWorkerProcess.GetCoreWorker().SealExisting(
|
||||
c_object_id, pin_object=False,
|
||||
owner_address=move(c_owner_address)))
|
||||
owner_address=c_owner_address))
|
||||
|
||||
def put_serialized_object(self, serialized_object,
|
||||
ObjectRef object_ref=None,
|
||||
c_bool pin_object=True,
|
||||
owner_address=None,
|
||||
c_bool inline_small_object=True):
|
||||
def put_serialized_object_and_increment_local_ref(self, serialized_object,
|
||||
ObjectRef object_ref=None,
|
||||
c_bool pin_object=True,
|
||||
owner_address=None,
|
||||
c_bool inline_small_object=True):
|
||||
cdef:
|
||||
CObjectID c_object_id
|
||||
shared_ptr[CBuffer] data
|
||||
|
@ -1471,6 +1506,7 @@ cdef class CoreWorker:
|
|||
c_vector[unique_ptr[CTaskArg]] args_vector
|
||||
c_vector[CObjectReference] return_refs
|
||||
CSchedulingStrategy c_scheduling_strategy
|
||||
c_vector[CObjectID] incremented_put_arg_ids
|
||||
|
||||
self.python_scheduling_strategy_to_c(
|
||||
scheduling_strategy, &c_scheduling_strategy)
|
||||
|
@ -1479,8 +1515,9 @@ cdef class CoreWorker:
|
|||
prepare_resources(resources, &c_resources)
|
||||
ray_function = CRayFunction(
|
||||
language.lang, function_descriptor.descriptor)
|
||||
prepare_args(
|
||||
self, language, args, &args_vector, function_descriptor)
|
||||
prepare_args_and_increment_put_refs(
|
||||
self, language, args, &args_vector, function_descriptor,
|
||||
&incremented_put_arg_ids)
|
||||
|
||||
# NOTE(edoakes): releasing the GIL while calling this method causes
|
||||
# segfaults. See relevant issue for details:
|
||||
|
@ -1494,7 +1531,18 @@ cdef class CoreWorker:
|
|||
c_scheduling_strategy,
|
||||
debugger_breakpoint)
|
||||
|
||||
return VectorToObjectRefs(return_refs)
|
||||
# These arguments were serialized and put into the local object
|
||||
# store during task submission. The backend increments their local
|
||||
# ref count initially to ensure that they remain in scope until we
|
||||
# add to their submitted task ref count. Now that the task has
|
||||
# been submitted, it's safe to remove the initial local ref.
|
||||
for put_arg_id in incremented_put_arg_ids:
|
||||
CCoreWorkerProcess.GetCoreWorker().RemoveLocalReference(
|
||||
put_arg_id)
|
||||
|
||||
# The initial local reference is already acquired internally when
|
||||
# adding the pending task.
|
||||
return VectorToObjectRefs(return_refs, skip_adding_local_ref=True)
|
||||
|
||||
def create_actor(self,
|
||||
Language language,
|
||||
|
@ -1524,6 +1572,7 @@ cdef class CoreWorker:
|
|||
CActorID c_actor_id
|
||||
c_vector[CConcurrencyGroup] c_concurrency_groups
|
||||
CSchedulingStrategy c_scheduling_strategy
|
||||
c_vector[CObjectID] incremented_put_arg_ids
|
||||
optional[c_bool] is_detached_optional = nullopt
|
||||
|
||||
self.python_scheduling_strategy_to_c(
|
||||
|
@ -1534,8 +1583,9 @@ cdef class CoreWorker:
|
|||
prepare_resources(placement_resources, &c_placement_resources)
|
||||
ray_function = CRayFunction(
|
||||
language.lang, function_descriptor.descriptor)
|
||||
prepare_args(
|
||||
self, language, args, &args_vector, function_descriptor)
|
||||
prepare_args_and_increment_put_refs(
|
||||
self, language, args, &args_vector, function_descriptor,
|
||||
&incremented_put_arg_ids)
|
||||
prepare_actor_concurrency_groups(
|
||||
concurrency_groups_dict, &c_concurrency_groups)
|
||||
|
||||
|
@ -1544,7 +1594,7 @@ cdef class CoreWorker:
|
|||
True if is_detached else False)
|
||||
|
||||
with nogil:
|
||||
check_status(CCoreWorkerProcess.GetCoreWorker().CreateActor(
|
||||
status = CCoreWorkerProcess.GetCoreWorker().CreateActor(
|
||||
ray_function, args_vector,
|
||||
CActorCreationOptions(
|
||||
max_restarts, max_task_retries, max_concurrency,
|
||||
|
@ -1560,7 +1610,18 @@ cdef class CoreWorker:
|
|||
is_asyncio or max_concurrency > 1,
|
||||
max_pending_calls),
|
||||
extension_data,
|
||||
&c_actor_id))
|
||||
&c_actor_id)
|
||||
|
||||
# These arguments were serialized and put into the local object
|
||||
# store during task submission. The backend increments their local
|
||||
# ref count initially to ensure that they remain in scope until we
|
||||
# add to their submitted task ref count. Now that the task has
|
||||
# been submitted, it's safe to remove the initial local ref.
|
||||
for put_arg_id in incremented_put_arg_ids:
|
||||
CCoreWorkerProcess.GetCoreWorker().RemoveLocalReference(
|
||||
put_arg_id)
|
||||
|
||||
check_status(status)
|
||||
|
||||
return ActorID(c_actor_id.Binary())
|
||||
|
||||
|
@ -1641,14 +1702,16 @@ cdef class CoreWorker:
|
|||
CRayFunction ray_function
|
||||
c_vector[unique_ptr[CTaskArg]] args_vector
|
||||
optional[c_vector[CObjectReference]] return_refs
|
||||
c_vector[CObjectID] incremented_put_arg_ids
|
||||
|
||||
with self.profile_event(b"submit_task"):
|
||||
if num_method_cpus > 0:
|
||||
c_resources[b"CPU"] = num_method_cpus
|
||||
ray_function = CRayFunction(
|
||||
language.lang, function_descriptor.descriptor)
|
||||
prepare_args(
|
||||
self, language, args, &args_vector, function_descriptor)
|
||||
prepare_args_and_increment_put_refs(
|
||||
self, language, args, &args_vector, function_descriptor,
|
||||
&incremented_put_arg_ids)
|
||||
|
||||
# NOTE(edoakes): releasing the GIL while calling this method causes
|
||||
# segfaults. See relevant issue for details:
|
||||
|
@ -1659,8 +1722,20 @@ cdef class CoreWorker:
|
|||
args_vector,
|
||||
CTaskOptions(
|
||||
name, num_returns, c_resources, concurrency_group_name))
|
||||
# These arguments were serialized and put into the local object
|
||||
# store during task submission. The backend increments their local
|
||||
# ref count initially to ensure that they remain in scope until we
|
||||
# add to their submitted task ref count. Now that the task has
|
||||
# been submitted, it's safe to remove the initial local ref.
|
||||
for put_arg_id in incremented_put_arg_ids:
|
||||
CCoreWorkerProcess.GetCoreWorker().RemoveLocalReference(
|
||||
put_arg_id)
|
||||
|
||||
if return_refs.has_value():
|
||||
return VectorToObjectRefs(return_refs.value())
|
||||
# The initial local reference is already acquired internally
|
||||
# when adding the pending task.
|
||||
return VectorToObjectRefs(return_refs.value(),
|
||||
skip_adding_local_ref=True)
|
||||
else:
|
||||
actor = self.get_actor_handle(actor_id)
|
||||
actor_handle = (CCoreWorkerProcess.GetCoreWorker()
|
||||
|
|
|
@ -199,13 +199,14 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:
|
|||
CRayStatus Put(const CRayObject &object,
|
||||
const c_vector[CObjectID] &contained_object_ids,
|
||||
const CObjectID &object_id)
|
||||
CRayStatus CreateOwned(const shared_ptr[CBuffer] &metadata,
|
||||
const size_t data_size,
|
||||
const c_vector[CObjectID] &contained_object_ids,
|
||||
CObjectID *object_id, shared_ptr[CBuffer] *data,
|
||||
c_bool created_by_worker,
|
||||
const unique_ptr[CAddress] &owner_address,
|
||||
c_bool inline_small_object)
|
||||
CRayStatus CreateOwnedAndIncrementLocalRef(
|
||||
const shared_ptr[CBuffer] &metadata,
|
||||
const size_t data_size,
|
||||
const c_vector[CObjectID] &contained_object_ids,
|
||||
CObjectID *object_id, shared_ptr[CBuffer] *data,
|
||||
c_bool created_by_worker,
|
||||
const unique_ptr[CAddress] &owner_address,
|
||||
c_bool inline_small_object)
|
||||
CRayStatus CreateExisting(const shared_ptr[CBuffer] &metadata,
|
||||
const size_t data_size,
|
||||
const CObjectID &object_id,
|
||||
|
|
|
@ -314,14 +314,11 @@ class Worker:
|
|||
# removed before this one, it will corrupt the state in the
|
||||
# reference counter.
|
||||
return ray.ObjectRef(
|
||||
self.core_worker.put_serialized_object(
|
||||
self.core_worker.put_serialized_object_and_increment_local_ref(
|
||||
serialized_value, object_ref=object_ref, owner_address=owner_address
|
||||
),
|
||||
# If the owner address is set, then the initial reference is
|
||||
# already acquired internally in CoreWorker::CreateOwned.
|
||||
# TODO(ekl) we should unify the code path more with the others
|
||||
# to avoid this special case.
|
||||
skip_adding_local_ref=(owner_address is not None),
|
||||
# The initial local reference is already acquired internally.
|
||||
skip_adding_local_ref=True,
|
||||
)
|
||||
|
||||
def raise_errors(self, data_metadata_pairs, object_refs):
|
||||
|
|
|
@ -111,10 +111,13 @@ bool ActorManager::AddNewActorHandle(std::unique_ptr<ActorHandle> actor_handle,
|
|||
const auto actor_creation_return_id = ObjectID::ForActorHandle(actor_id);
|
||||
// Detached actor doesn't need ref counting.
|
||||
if (!is_detached) {
|
||||
// We don't need to add an initial local ref here because it will get added
|
||||
// in AddActorHandle.
|
||||
reference_counter_->AddOwnedObject(actor_creation_return_id,
|
||||
/*inner_ids=*/{}, caller_address, call_site,
|
||||
/*object_size*/ -1,
|
||||
/*is_reconstructable=*/true);
|
||||
/*is_reconstructable=*/true,
|
||||
/*add_local_ref=*/false);
|
||||
}
|
||||
|
||||
return AddActorHandle(std::move(actor_handle), cached_actor_name,
|
||||
|
|
|
@ -827,12 +827,13 @@ Status CoreWorker::Put(const RayObject &object,
|
|||
ObjectID *object_id) {
|
||||
*object_id = ObjectID::FromIndex(worker_context_.GetCurrentInternalTaskId(),
|
||||
worker_context_.GetNextPutIndex());
|
||||
reference_counter_->AddOwnedObject(
|
||||
*object_id, contained_object_ids, rpc_address_, CurrentCallSite(), object.GetSize(),
|
||||
/*is_reconstructable=*/false, NodeID::FromBinary(rpc_address_.raylet_id()));
|
||||
reference_counter_->AddOwnedObject(*object_id, contained_object_ids, rpc_address_,
|
||||
CurrentCallSite(), object.GetSize(),
|
||||
/*is_reconstructable=*/false, /*add_local_ref=*/true,
|
||||
NodeID::FromBinary(rpc_address_.raylet_id()));
|
||||
auto status = Put(object, contained_object_ids, *object_id, /*pin_object=*/true);
|
||||
if (!status.ok()) {
|
||||
reference_counter_->RemoveOwnedObject(*object_id);
|
||||
RemoveLocalReference(*object_id);
|
||||
}
|
||||
return status;
|
||||
}
|
||||
|
@ -876,13 +877,11 @@ Status CoreWorker::Put(const RayObject &object,
|
|||
return PutInLocalPlasmaStore(object, object_id, pin_object);
|
||||
}
|
||||
|
||||
Status CoreWorker::CreateOwned(const std::shared_ptr<Buffer> &metadata,
|
||||
const size_t data_size,
|
||||
const std::vector<ObjectID> &contained_object_ids,
|
||||
ObjectID *object_id, std::shared_ptr<Buffer> *data,
|
||||
bool created_by_worker,
|
||||
const std::unique_ptr<rpc::Address> &owner_address,
|
||||
bool inline_small_object) {
|
||||
Status CoreWorker::CreateOwnedAndIncrementLocalRef(
|
||||
const std::shared_ptr<Buffer> &metadata, const size_t data_size,
|
||||
const std::vector<ObjectID> &contained_object_ids, ObjectID *object_id,
|
||||
std::shared_ptr<Buffer> *data, bool created_by_worker,
|
||||
const std::unique_ptr<rpc::Address> &owner_address, bool inline_small_object) {
|
||||
auto status = WaitForActorRegistered(contained_object_ids);
|
||||
if (!status.ok()) {
|
||||
return status;
|
||||
|
@ -896,6 +895,7 @@ Status CoreWorker::CreateOwned(const std::shared_ptr<Buffer> &metadata,
|
|||
reference_counter_->AddOwnedObject(*object_id, contained_object_ids, rpc_address_,
|
||||
CurrentCallSite(), data_size + metadata->Size(),
|
||||
/*is_reconstructable=*/false,
|
||||
/*add_local_ref=*/true,
|
||||
NodeID::FromBinary(rpc_address_.raylet_id()));
|
||||
} else {
|
||||
// Because in the remote worker's `HandleAssignObjectOwner`,
|
||||
|
@ -938,11 +938,7 @@ Status CoreWorker::CreateOwned(const std::shared_ptr<Buffer> &metadata,
|
|||
created_by_worker);
|
||||
}
|
||||
if (!status.ok()) {
|
||||
if (owned_by_us) {
|
||||
reference_counter_->RemoveOwnedObject(*object_id);
|
||||
} else {
|
||||
RemoveLocalReference(*object_id);
|
||||
}
|
||||
RemoveLocalReference(*object_id);
|
||||
return status;
|
||||
} else if (*data == nullptr) {
|
||||
// Object already exists in plasma. Store the in-memory value so that the
|
||||
|
@ -970,16 +966,13 @@ Status CoreWorker::CreateExisting(const std::shared_ptr<Buffer> &metadata,
|
|||
|
||||
Status CoreWorker::SealOwned(const ObjectID &object_id, bool pin_object,
|
||||
const std::unique_ptr<rpc::Address> &owner_address) {
|
||||
bool owned_by_us = owner_address != nullptr
|
||||
? WorkerID::FromBinary(owner_address->worker_id()) ==
|
||||
WorkerID::FromBinary(rpc_address_.worker_id())
|
||||
: true;
|
||||
auto status = SealExisting(object_id, pin_object, std::move(owner_address));
|
||||
if (status.ok()) return status;
|
||||
if (owned_by_us) {
|
||||
reference_counter_->RemoveOwnedObject(object_id);
|
||||
} else {
|
||||
RemoveLocalReference(object_id);
|
||||
RemoveLocalReference(object_id);
|
||||
if (reference_counter_->HasReference(object_id)) {
|
||||
RAY_LOG(WARNING)
|
||||
<< "Object " << object_id
|
||||
<< " failed to be put but has a nonzero ref count. This object may leak.";
|
||||
}
|
||||
return status;
|
||||
}
|
||||
|
@ -2295,7 +2288,8 @@ std::vector<rpc::ObjectReference> CoreWorker::ExecuteTaskLocalMode(
|
|||
reference_counter_->AddOwnedObject(task_spec.ReturnId(i),
|
||||
/*inner_ids=*/{}, rpc_address_,
|
||||
CurrentCallSite(), -1,
|
||||
/*is_reconstructable=*/false);
|
||||
/*is_reconstructable=*/false,
|
||||
/*add_local_ref=*/true);
|
||||
}
|
||||
rpc::ObjectReference ref;
|
||||
ref.set_object_id(task_spec.ReturnId(i).Binary());
|
||||
|
@ -3049,6 +3043,7 @@ void CoreWorker::HandleAssignObjectOwner(const rpc::AssignObjectOwnerRequest &re
|
|||
reference_counter_->AddOwnedObject(
|
||||
object_id, contained_object_ids, rpc_address_, call_site, request.object_size(),
|
||||
/*is_reconstructable=*/false,
|
||||
/*add_local_ref=*/false,
|
||||
/*pinned_at_raylet_id=*/NodeID::FromBinary(borrower_address.raylet_id()));
|
||||
reference_counter_->AddBorrowerAddress(object_id, borrower_address);
|
||||
RAY_CHECK(memory_store_->Put(RayObject(rpc::ErrorType::OBJECT_IN_PLASMA), object_id));
|
||||
|
|
|
@ -260,9 +260,14 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
|
|||
|
||||
/// Create and return a buffer in the object store that can be directly written
|
||||
/// into. After writing to the buffer, the caller must call `SealOwned()` to
|
||||
/// finalize the object. The `CreateOwned()` and `SealOwned()` combination is
|
||||
/// an alternative interface to `Put()` that allows frontends to avoid an extra
|
||||
/// copy when possible.
|
||||
/// finalize the object. The `CreateOwnedAndIncrementLocalRef()` and
|
||||
/// `SealOwned()` combination is an alternative interface to `Put()` that
|
||||
/// allows frontends to avoid an extra copy when possible.
|
||||
///
|
||||
/// Note that this call also initializes the local reference count for the
|
||||
/// object to 1 so that the ref is considered in scope. The caller must
|
||||
/// ensure that they decrement the ref count once the returned ObjectRef has
|
||||
/// gone out of scope.
|
||||
///
|
||||
/// \param[in] metadata Metadata of the object to be written.
|
||||
/// \param[in] data_size Size of the object to be written.
|
||||
|
@ -275,12 +280,12 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
|
|||
/// \param[in] inline_small_object Whether to inline create this object if it's
|
||||
/// small.
|
||||
/// \return Status.
|
||||
Status CreateOwned(const std::shared_ptr<Buffer> &metadata, const size_t data_size,
|
||||
const std::vector<ObjectID> &contained_object_ids,
|
||||
ObjectID *object_id, std::shared_ptr<Buffer> *data,
|
||||
bool created_by_worker,
|
||||
const std::unique_ptr<rpc::Address> &owner_address = nullptr,
|
||||
bool inline_small_object = true);
|
||||
Status CreateOwnedAndIncrementLocalRef(
|
||||
const std::shared_ptr<Buffer> &metadata, const size_t data_size,
|
||||
const std::vector<ObjectID> &contained_object_ids, ObjectID *object_id,
|
||||
std::shared_ptr<Buffer> *data, bool created_by_worker,
|
||||
const std::unique_ptr<rpc::Address> &owner_address = nullptr,
|
||||
bool inline_small_object = true);
|
||||
|
||||
/// Create and return a buffer in the object store that can be directly written
|
||||
/// into, for an object ID that already exists. After writing to the buffer, the
|
||||
|
@ -301,6 +306,10 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
|
|||
/// Finalize placing an object into the object store. This should be called after
|
||||
/// a corresponding `CreateOwned()` call and then writing into the returned buffer.
|
||||
///
|
||||
/// If the object seal fails, then the initial local reference that was added
|
||||
/// in CreateOwnedAndIncrementLocalRef will be deleted and the object will be
|
||||
/// released by the ref counter.
|
||||
///
|
||||
/// \param[in] object_id Object ID corresponding to the object.
|
||||
/// \param[in] pin_object Whether or not to pin the object at the local raylet.
|
||||
/// \param[in] The address of object's owner. If not provided,
|
||||
|
|
|
@ -38,7 +38,7 @@ Status PutSerializedObject(JNIEnv *env, jobject obj, ObjectID object_id,
|
|||
for (const auto &ref : native_ray_object->GetNestedRefs()) {
|
||||
nested_ids.push_back(ObjectID::FromBinary(ref.object_id()));
|
||||
}
|
||||
status = CoreWorkerProcess::GetCoreWorker().CreateOwned(
|
||||
status = CoreWorkerProcess::GetCoreWorker().CreateOwnedAndIncrementLocalRef(
|
||||
native_ray_object->GetMetadata(), data_size, nested_ids, out_object_id, &data,
|
||||
/*created_by_worker=*/true,
|
||||
/*owner_address=*/owner_address);
|
||||
|
|
|
@ -169,6 +169,7 @@ void ReferenceCounter::AddOwnedObject(const ObjectID &object_id,
|
|||
const rpc::Address &owner_address,
|
||||
const std::string &call_site,
|
||||
const int64_t object_size, bool is_reconstructable,
|
||||
bool add_local_ref,
|
||||
const absl::optional<NodeID> &pinned_at_raylet_id) {
|
||||
RAY_LOG(DEBUG) << "Adding owned object " << object_id;
|
||||
absl::MutexLock lock(&mutex_);
|
||||
|
@ -197,20 +198,10 @@ void ReferenceCounter::AddOwnedObject(const ObjectID &object_id,
|
|||
auto back_it = reconstructable_owned_objects_.end();
|
||||
back_it--;
|
||||
RAY_CHECK(reconstructable_owned_objects_index_.emplace(object_id, back_it).second);
|
||||
}
|
||||
|
||||
void ReferenceCounter::RemoveOwnedObject(const ObjectID &object_id) {
|
||||
absl::MutexLock lock(&mutex_);
|
||||
auto it = object_id_refs_.find(object_id);
|
||||
RAY_CHECK(it != object_id_refs_.end())
|
||||
<< "Tried to remove reference for nonexistent owned object " << object_id
|
||||
<< ", object must be added with ReferenceCounter::AddOwnedObject() before it "
|
||||
<< "can be removed";
|
||||
RAY_CHECK(it->second.RefCount() == 0)
|
||||
<< "Tried to remove reference for owned object " << object_id << " that has "
|
||||
<< it->second.RefCount() << " references, must have 0 references to be removed";
|
||||
RAY_LOG(DEBUG) << "Removing owned object " << object_id;
|
||||
DeleteReferenceInternal(it, nullptr);
|
||||
if (add_local_ref) {
|
||||
it->second.local_ref_count++;
|
||||
}
|
||||
}
|
||||
|
||||
void ReferenceCounter::UpdateObjectSize(const ObjectID &object_id, int64_t object_size) {
|
||||
|
|
|
@ -42,7 +42,7 @@ class ReferenceCounterInterface {
|
|||
virtual void AddOwnedObject(
|
||||
const ObjectID &object_id, const std::vector<ObjectID> &contained_ids,
|
||||
const rpc::Address &owner_address, const std::string &call_site,
|
||||
const int64_t object_size, bool is_reconstructable,
|
||||
const int64_t object_size, bool is_reconstructable, bool add_local_ref,
|
||||
const absl::optional<NodeID> &pinned_at_raylet_id = absl::optional<NodeID>()) = 0;
|
||||
virtual bool SetDeleteCallback(
|
||||
const ObjectID &object_id,
|
||||
|
@ -167,22 +167,18 @@ class ReferenceCounter : public ReferenceCounterInterface,
|
|||
/// \param[in] object_size Object size if known, otherwise -1;
|
||||
/// \param[in] is_reconstructable Whether the object can be reconstructed
|
||||
/// through lineage re-execution.
|
||||
/// \param[in] add_local_ref Whether to initialize the local ref count to 1.
|
||||
/// This is used to ensure that the ref is considered in scope before the
|
||||
/// corresponding ObjectRef has been returned to the language frontend.
|
||||
/// \param[in] pinned_at_raylet_id The primary location for the object, if it
|
||||
/// is already known. This is only used for ray.put calls.
|
||||
void AddOwnedObject(
|
||||
const ObjectID &object_id, const std::vector<ObjectID> &contained_ids,
|
||||
const rpc::Address &owner_address, const std::string &call_site,
|
||||
const int64_t object_size, bool is_reconstructable,
|
||||
const int64_t object_size, bool is_reconstructable, bool add_local_ref,
|
||||
const absl::optional<NodeID> &pinned_at_raylet_id = absl::optional<NodeID>())
|
||||
LOCKS_EXCLUDED(mutex_);
|
||||
|
||||
/// Remove reference for an object that we own. The reference will only be
|
||||
/// removed if the object's ref count is 0. This should only be used when
|
||||
/// speculatively adding an owned reference that may need to be rolled back, e.g. if
|
||||
/// the creation of the corresponding Plasma object fails. All other references will
|
||||
/// be cleaned up via the reference counting protocol.
|
||||
///
|
||||
/// \param[in] object_id The ID of the object that we own and wish to remove.
|
||||
void RemoveOwnedObject(const ObjectID &object_id) LOCKS_EXCLUDED(mutex_);
|
||||
|
||||
/// Update the size of the object.
|
||||
///
|
||||
/// \param[in] object_id The ID of the object.
|
||||
|
|
|
@ -337,8 +337,7 @@ class MockWorkerClient : public MockCoreWorkerClientInterface {
|
|||
// The below methods mirror a core worker's operations, e.g., `Put` simulates
|
||||
// a ray.put().
|
||||
void Put(const ObjectID &object_id) {
|
||||
rc_.AddOwnedObject(object_id, {}, address_, "", 0, false);
|
||||
rc_.AddLocalReference(object_id, "");
|
||||
rc_.AddOwnedObject(object_id, {}, address_, "", 0, false, /*add_local_ref=*/true);
|
||||
}
|
||||
|
||||
void PutWithForeignOwner(const ObjectID &object_id, const rpc::Address &owner_address) {
|
||||
|
@ -347,8 +346,8 @@ class MockWorkerClient : public MockCoreWorkerClientInterface {
|
|||
}
|
||||
|
||||
void PutWrappedId(const ObjectID outer_id, const ObjectID &inner_id) {
|
||||
rc_.AddOwnedObject(outer_id, {inner_id}, address_, "", 0, false);
|
||||
rc_.AddLocalReference(outer_id, "");
|
||||
rc_.AddOwnedObject(outer_id, {inner_id}, address_, "", 0, false,
|
||||
/*add_local_ref=*/true);
|
||||
}
|
||||
|
||||
void GetSerializedObjectId(const ObjectID outer_id, const ObjectID &inner_id,
|
||||
|
@ -370,9 +369,7 @@ class MockWorkerClient : public MockCoreWorkerClientInterface {
|
|||
if (!arg_id.IsNil()) {
|
||||
rc_.UpdateSubmittedTaskReferences({return_id}, {arg_id});
|
||||
}
|
||||
rc_.AddOwnedObject(return_id, {}, address_, "", 0, false);
|
||||
// Add a sentinel reference to keep all nested object IDs in scope.
|
||||
rc_.AddLocalReference(return_id, "");
|
||||
rc_.AddOwnedObject(return_id, {}, address_, "", 0, false, /*add_local_ref=*/true);
|
||||
return_ids_.push_back(return_id);
|
||||
return return_id;
|
||||
}
|
||||
|
@ -536,11 +533,9 @@ TEST_F(ReferenceCountTest, TestUnreconstructableObjectOutOfScope) {
|
|||
// The object goes out of scope once it has no more refs.
|
||||
std::vector<ObjectID> out;
|
||||
ASSERT_FALSE(rc->SetDeleteCallback(id, callback));
|
||||
rc->AddOwnedObject(id, {}, address, "", 0, false);
|
||||
rc->AddOwnedObject(id, {}, address, "", 0, false, /*add_local_ref=*/true);
|
||||
ASSERT_TRUE(rc->SetDeleteCallback(id, callback));
|
||||
ASSERT_FALSE(*out_of_scope);
|
||||
rc->AddLocalReference(id, "");
|
||||
ASSERT_FALSE(*out_of_scope);
|
||||
rc->RemoveLocalReference(id, &out);
|
||||
ASSERT_TRUE(*out_of_scope);
|
||||
|
||||
|
@ -548,7 +543,7 @@ TEST_F(ReferenceCountTest, TestUnreconstructableObjectOutOfScope) {
|
|||
// lineage ref count.
|
||||
*out_of_scope = false;
|
||||
ASSERT_FALSE(rc->SetDeleteCallback(id, callback));
|
||||
rc->AddOwnedObject(id, {}, address, "", 0, false);
|
||||
rc->AddOwnedObject(id, {}, address, "", 0, false, /*add_local_ref=*/false);
|
||||
ASSERT_TRUE(rc->SetDeleteCallback(id, callback));
|
||||
rc->UpdateSubmittedTaskReferences({}, {id});
|
||||
ASSERT_FALSE(*out_of_scope);
|
||||
|
@ -575,16 +570,14 @@ TEST_F(ReferenceCountTest, TestReferenceStats) {
|
|||
ASSERT_EQ(stats.object_refs(0).call_site(), "file.py:42");
|
||||
rc->RemoveLocalReference(id1, nullptr);
|
||||
|
||||
rc->AddOwnedObject(id2, {}, address, "file2.py:43", 100, false);
|
||||
rc->AddOwnedObject(id2, {}, address, "file2.py:43", 100, false, /*add_local_ref=*/true);
|
||||
rpc::CoreWorkerStats stats2;
|
||||
rc->AddObjectRefStats({}, &stats2);
|
||||
ASSERT_EQ(stats2.object_refs_size(), 1);
|
||||
ASSERT_EQ(stats2.object_refs(0).object_id(), id2.Binary());
|
||||
ASSERT_EQ(stats2.object_refs(0).local_ref_count(), 0);
|
||||
ASSERT_EQ(stats2.object_refs(0).local_ref_count(), 1);
|
||||
ASSERT_EQ(stats2.object_refs(0).object_size(), 100);
|
||||
ASSERT_EQ(stats2.object_refs(0).call_site(), "file2.py:43");
|
||||
|
||||
rc->AddLocalReference(id2, "");
|
||||
rc->RemoveLocalReference(id2, nullptr);
|
||||
}
|
||||
|
||||
|
@ -601,7 +594,7 @@ TEST_F(ReferenceCountTest, TestGetLocalityData) {
|
|||
// locality data.
|
||||
int64_t object_size = 100;
|
||||
rc->AddOwnedObject(obj1, {}, address, "file2.py:42", object_size, false,
|
||||
absl::optional<NodeID>(node1));
|
||||
/*add_local_ref=*/true, absl::optional<NodeID>(node1));
|
||||
auto locality_data_obj1 = rc->GetLocalityData(obj1);
|
||||
ASSERT_TRUE(locality_data_obj1.has_value());
|
||||
ASSERT_EQ(locality_data_obj1->object_size, object_size);
|
||||
|
@ -663,13 +656,11 @@ TEST_F(ReferenceCountTest, TestGetLocalityData) {
|
|||
// Fetching locality data for an object that doesn't have an object size defined
|
||||
// should return a null optional.
|
||||
rc->AddOwnedObject(obj2, {}, address, "file2.py:43", -1, false,
|
||||
absl::optional<NodeID>(node2));
|
||||
/*add_local_ref=*/true, absl::optional<NodeID>(node2));
|
||||
auto locality_data_obj2_no_object_size = rc->GetLocalityData(obj2);
|
||||
ASSERT_FALSE(locality_data_obj2_no_object_size.has_value());
|
||||
|
||||
rc->AddLocalReference(obj1, "");
|
||||
rc->RemoveLocalReference(obj1, nullptr);
|
||||
rc->AddLocalReference(obj2, "");
|
||||
rc->RemoveLocalReference(obj2, nullptr);
|
||||
}
|
||||
|
||||
|
@ -680,7 +671,7 @@ TEST_F(ReferenceCountTest, TestOwnerAddress) {
|
|||
auto object_id = ObjectID::FromRandom();
|
||||
rpc::Address address;
|
||||
address.set_ip_address("1234");
|
||||
rc->AddOwnedObject(object_id, {}, address, "", 0, false);
|
||||
rc->AddOwnedObject(object_id, {}, address, "", 0, false, /*add_local_ref=*/true);
|
||||
|
||||
TaskID added_id;
|
||||
rpc::Address added_address;
|
||||
|
@ -689,7 +680,7 @@ TEST_F(ReferenceCountTest, TestOwnerAddress) {
|
|||
|
||||
auto object_id2 = ObjectID::FromRandom();
|
||||
address.set_ip_address("5678");
|
||||
rc->AddOwnedObject(object_id2, {}, address, "", 0, false);
|
||||
rc->AddOwnedObject(object_id2, {}, address, "", 0, false, /*add_local_ref=*/true);
|
||||
ASSERT_TRUE(rc->GetOwner(object_id2, &added_address));
|
||||
ASSERT_EQ(address.ip_address(), added_address.ip_address());
|
||||
|
||||
|
@ -698,9 +689,7 @@ TEST_F(ReferenceCountTest, TestOwnerAddress) {
|
|||
rc->AddLocalReference(object_id3, "");
|
||||
ASSERT_FALSE(rc->GetOwner(object_id3, &added_address));
|
||||
|
||||
rc->AddLocalReference(object_id, "");
|
||||
rc->RemoveLocalReference(object_id, nullptr);
|
||||
rc->AddLocalReference(object_id2, "");
|
||||
rc->RemoveLocalReference(object_id2, nullptr);
|
||||
rc->RemoveLocalReference(object_id3, nullptr);
|
||||
}
|
||||
|
@ -1669,7 +1658,8 @@ TEST(DistributedReferenceCountTest, TestForeignOwner) {
|
|||
// Phase 3 -- foreign owner gets ref removed information.
|
||||
//
|
||||
// Emulate ref removed callback.
|
||||
foreign_owner->rc_.AddOwnedObject(inner_id, {}, foreign_owner->address_, "", 0, false);
|
||||
foreign_owner->rc_.AddOwnedObject(inner_id, {}, foreign_owner->address_, "", 0, false,
|
||||
/*add_local_ref=*/false);
|
||||
foreign_owner->rc_.AddBorrowerAddress(inner_id, owner->address_);
|
||||
|
||||
// Foreign owner waits on owner.
|
||||
|
@ -2317,10 +2307,9 @@ TEST_F(ReferenceCountLineageEnabledTest, TestUnreconstructableObjectOutOfScope)
|
|||
// The object goes out of scope once it has no more refs.
|
||||
std::vector<ObjectID> out;
|
||||
ASSERT_FALSE(rc->SetDeleteCallback(id, callback));
|
||||
rc->AddOwnedObject(id, {}, address, "", 0, false);
|
||||
rc->AddOwnedObject(id, {}, address, "", 0, false, /*add_local_ref=*/true);
|
||||
ASSERT_TRUE(rc->SetDeleteCallback(id, callback));
|
||||
ASSERT_FALSE(*out_of_scope);
|
||||
rc->AddLocalReference(id, "");
|
||||
ASSERT_FALSE(*out_of_scope);
|
||||
rc->RemoveLocalReference(id, &out);
|
||||
ASSERT_TRUE(*out_of_scope);
|
||||
|
@ -2331,7 +2320,7 @@ TEST_F(ReferenceCountLineageEnabledTest, TestUnreconstructableObjectOutOfScope)
|
|||
// count.
|
||||
*out_of_scope = false;
|
||||
ASSERT_FALSE(rc->SetDeleteCallback(id, callback));
|
||||
rc->AddOwnedObject(id, {}, address, "", 0, false);
|
||||
rc->AddOwnedObject(id, {}, address, "", 0, false, /*add_local_ref=*/false);
|
||||
ASSERT_TRUE(rc->SetDeleteCallback(id, callback));
|
||||
rc->UpdateSubmittedTaskReferences({return_id}, {id});
|
||||
ASSERT_TRUE(rc->IsObjectPendingCreation(return_id));
|
||||
|
@ -2371,8 +2360,7 @@ TEST_F(ReferenceCountLineageEnabledTest, TestBasicLineage) {
|
|||
ASSERT_TRUE(lineage_deleted.empty());
|
||||
|
||||
// We should keep lineage for owned objects.
|
||||
rc->AddOwnedObject(id, {}, rpc::Address(), "", 0, false);
|
||||
rc->AddLocalReference(id, "");
|
||||
rc->AddOwnedObject(id, {}, rpc::Address(), "", 0, false, /*add_local_ref=*/true);
|
||||
ASSERT_TRUE(rc->HasReference(id));
|
||||
rc->RemoveLocalReference(id, nullptr);
|
||||
ASSERT_EQ(lineage_deleted.size(), 1);
|
||||
|
@ -2390,7 +2378,7 @@ TEST_F(ReferenceCountLineageEnabledTest, TestPinLineageRecursive) {
|
|||
for (int i = 0; i < 3; i++) {
|
||||
ObjectID id = ObjectID::FromRandom();
|
||||
ids.push_back(id);
|
||||
rc->AddOwnedObject(id, {}, rpc::Address(), "", 0, true);
|
||||
rc->AddOwnedObject(id, {}, rpc::Address(), "", 0, true, /*add_local_ref=*/false);
|
||||
}
|
||||
|
||||
rc->SetReleaseLineageCallback(
|
||||
|
@ -2413,7 +2401,6 @@ TEST_F(ReferenceCountLineageEnabledTest, TestPinLineageRecursive) {
|
|||
for (size_t i = 0; i < ids.size() - 1; i++) {
|
||||
auto id = ids[i];
|
||||
// Submit a dependent task on id.
|
||||
rc->AddLocalReference(id, "");
|
||||
ASSERT_TRUE(rc->HasReference(id));
|
||||
rc->UpdateSubmittedTaskReferences({}, {id});
|
||||
rc->RemoveLocalReference(id, nullptr);
|
||||
|
@ -2445,7 +2432,7 @@ TEST_F(ReferenceCountLineageEnabledTest, TestEvictLineage) {
|
|||
for (int i = 0; i < 3; i++) {
|
||||
ObjectID id = ObjectID::FromRandom();
|
||||
ids.push_back(id);
|
||||
rc->AddOwnedObject(id, {}, rpc::Address(), "", 0, true);
|
||||
rc->AddOwnedObject(id, {}, rpc::Address(), "", 0, true, /*add_local_ref=*/true);
|
||||
}
|
||||
std::vector<ObjectID> lineage_deleted;
|
||||
rc->SetReleaseLineageCallback(
|
||||
|
@ -2461,10 +2448,9 @@ TEST_F(ReferenceCountLineageEnabledTest, TestEvictLineage) {
|
|||
|
||||
// ID1 depends on ID0.
|
||||
rc->UpdateSubmittedTaskReferences({ids[1]}, {ids[0]});
|
||||
rc->RemoveLocalReference(ids[0], nullptr);
|
||||
rc->UpdateFinishedTaskReferences({ids[1]}, {ids[0]}, /*release_lineage=*/false,
|
||||
empty_borrower, empty_refs, nullptr);
|
||||
rc->AddLocalReference(ids[1], "");
|
||||
rc->AddLocalReference(ids[2], "");
|
||||
|
||||
bool lineage_evicted = false;
|
||||
for (const auto &id : ids) {
|
||||
|
@ -2492,7 +2478,7 @@ TEST_F(ReferenceCountLineageEnabledTest, TestResubmittedTask) {
|
|||
std::vector<ObjectID> lineage_deleted;
|
||||
|
||||
ObjectID id = ObjectID::FromRandom();
|
||||
rc->AddOwnedObject(id, {}, rpc::Address(), "", 0, true);
|
||||
rc->AddOwnedObject(id, {}, rpc::Address(), "", 0, true, /*add_local_ref=*/true);
|
||||
|
||||
rc->SetReleaseLineageCallback(
|
||||
[&](const ObjectID &object_id, std::vector<ObjectID> *ids_to_release) {
|
||||
|
@ -2501,7 +2487,6 @@ TEST_F(ReferenceCountLineageEnabledTest, TestResubmittedTask) {
|
|||
});
|
||||
|
||||
// Local references.
|
||||
rc->AddLocalReference(id, "");
|
||||
ASSERT_TRUE(rc->HasReference(id));
|
||||
|
||||
// Submit 2 dependent tasks.
|
||||
|
@ -2540,8 +2525,7 @@ TEST_F(ReferenceCountLineageEnabledTest, TestPlasmaLocation) {
|
|||
|
||||
ObjectID id = ObjectID::FromRandom();
|
||||
NodeID node_id = NodeID::FromRandom();
|
||||
rc->AddOwnedObject(id, {}, rpc::Address(), "", 0, true);
|
||||
rc->AddLocalReference(id, "");
|
||||
rc->AddOwnedObject(id, {}, rpc::Address(), "", 0, true, /*add_local_ref=*/true);
|
||||
ASSERT_TRUE(rc->SetDeleteCallback(id, callback));
|
||||
ASSERT_TRUE(rc->IsPlasmaObjectPinnedOrSpilled(id, &owned_by_us, &pinned_at, &spilled));
|
||||
ASSERT_TRUE(owned_by_us);
|
||||
|
@ -2556,8 +2540,7 @@ TEST_F(ReferenceCountLineageEnabledTest, TestPlasmaLocation) {
|
|||
ASSERT_TRUE(deleted->count(id) > 0);
|
||||
deleted->clear();
|
||||
|
||||
rc->AddOwnedObject(id, {}, rpc::Address(), "", 0, true);
|
||||
rc->AddLocalReference(id, "");
|
||||
rc->AddOwnedObject(id, {}, rpc::Address(), "", 0, true, /*add_local_ref=*/true);
|
||||
ASSERT_TRUE(rc->SetDeleteCallback(id, callback));
|
||||
rc->UpdateObjectPinnedAtRaylet(id, node_id);
|
||||
auto objects = rc->ResetObjectsOnRemovedNode(node_id);
|
||||
|
@ -2578,9 +2561,8 @@ TEST_F(ReferenceCountTest, TestFree) {
|
|||
NodeID node_id = NodeID::FromRandom();
|
||||
|
||||
// Test free before receiving information about where the object is pinned.
|
||||
rc->AddOwnedObject(id, {}, rpc::Address(), "", 0, true);
|
||||
rc->AddOwnedObject(id, {}, rpc::Address(), "", 0, true, /*add_local_ref=*/true);
|
||||
ASSERT_FALSE(rc->IsPlasmaObjectFreed(id));
|
||||
rc->AddLocalReference(id, "");
|
||||
rc->FreePlasmaObjects({id});
|
||||
ASSERT_TRUE(rc->IsPlasmaObjectFreed(id));
|
||||
ASSERT_FALSE(rc->SetDeleteCallback(id, callback));
|
||||
|
@ -2597,8 +2579,7 @@ TEST_F(ReferenceCountTest, TestFree) {
|
|||
ASSERT_FALSE(rc->IsPlasmaObjectFreed(id));
|
||||
|
||||
// Test free after receiving information about where the object is pinned.
|
||||
rc->AddOwnedObject(id, {}, rpc::Address(), "", 0, true);
|
||||
rc->AddLocalReference(id, "");
|
||||
rc->AddOwnedObject(id, {}, rpc::Address(), "", 0, true, /*add_local_ref=*/true);
|
||||
ASSERT_TRUE(rc->SetDeleteCallback(id, callback));
|
||||
rc->UpdateObjectPinnedAtRaylet(id, node_id);
|
||||
ASSERT_FALSE(rc->IsPlasmaObjectFreed(id));
|
||||
|
@ -2612,16 +2593,6 @@ TEST_F(ReferenceCountTest, TestFree) {
|
|||
ASSERT_FALSE(rc->IsPlasmaObjectFreed(id));
|
||||
}
|
||||
|
||||
TEST_F(ReferenceCountTest, TestRemoveOwnedObject) {
|
||||
ObjectID id = ObjectID::FromRandom();
|
||||
|
||||
// Test remove owned object.
|
||||
rc->AddOwnedObject(id, {}, rpc::Address(), "", 0, false);
|
||||
ASSERT_TRUE(rc->HasReference(id));
|
||||
rc->RemoveOwnedObject(id);
|
||||
ASSERT_FALSE(rc->HasReference(id));
|
||||
}
|
||||
|
||||
TEST_F(ReferenceCountTest, TestGetObjectStatusReplyDelayed) {
|
||||
// https://github.com/ray-project/ray/issues/18557.
|
||||
// Check that we track an ObjectRef nested inside another borrowed ObjectRef.
|
||||
|
@ -2668,10 +2639,11 @@ TEST_F(ReferenceCountTest, TestDelayedWaitForRefRemoved) {
|
|||
// Owner owns a nested object ref, borrower is using the outer ObjectRef.
|
||||
ObjectID outer_id = ObjectID::FromRandom();
|
||||
ObjectID inner_id = ObjectID::FromRandom();
|
||||
owner->rc_.AddOwnedObject(outer_id, {}, owner->address_, "", 0, false);
|
||||
owner->rc_.AddOwnedObject(outer_id, {}, owner->address_, "", 0, false,
|
||||
/*add_local_ref=*/false);
|
||||
owner->rc_.AddBorrowerAddress(outer_id, borrower->address_);
|
||||
owner->rc_.AddOwnedObject(inner_id, {}, owner->address_, "", 0, false);
|
||||
owner->rc_.AddLocalReference(inner_id, "");
|
||||
owner->rc_.AddOwnedObject(inner_id, {}, owner->address_, "", 0, false,
|
||||
/*add_local_ref=*/true);
|
||||
ASSERT_TRUE(owner->rc_.HasReference(outer_id));
|
||||
ASSERT_TRUE(owner->rc_.HasReference(inner_id));
|
||||
|
||||
|
@ -2710,9 +2682,12 @@ TEST_F(ReferenceCountTest, TestRepeatedDeserialization) {
|
|||
ObjectID outer_id = ObjectID::FromRandom();
|
||||
ObjectID middle_id = ObjectID::FromRandom();
|
||||
ObjectID inner_id = ObjectID::FromRandom();
|
||||
owner->rc_.AddOwnedObject(inner_id, {}, owner->address_, "", 0, false);
|
||||
owner->rc_.AddOwnedObject(middle_id, {inner_id}, owner->address_, "", 0, false);
|
||||
owner->rc_.AddOwnedObject(outer_id, {middle_id}, owner->address_, "", 0, false);
|
||||
owner->rc_.AddOwnedObject(inner_id, {}, owner->address_, "", 0, false,
|
||||
/*add_local_ref=*/false);
|
||||
owner->rc_.AddOwnedObject(middle_id, {inner_id}, owner->address_, "", 0, false,
|
||||
/*add_local_ref=*/false);
|
||||
owner->rc_.AddOwnedObject(outer_id, {middle_id}, owner->address_, "", 0, false,
|
||||
/*add_local_ref=*/false);
|
||||
owner->rc_.AddBorrowerAddress(outer_id, borrower->address_);
|
||||
ASSERT_TRUE(owner->rc_.HasReference(outer_id));
|
||||
ASSERT_TRUE(owner->rc_.HasReference(middle_id));
|
||||
|
@ -2758,9 +2733,12 @@ TEST_F(ReferenceCountTest, TestForwardNestedRefs) {
|
|||
ObjectID outer_id = ObjectID::FromRandom();
|
||||
ObjectID middle_id = ObjectID::FromRandom();
|
||||
ObjectID inner_id = ObjectID::FromRandom();
|
||||
owner->rc_.AddOwnedObject(inner_id, {}, owner->address_, "", 0, false);
|
||||
owner->rc_.AddOwnedObject(middle_id, {inner_id}, owner->address_, "", 0, false);
|
||||
owner->rc_.AddOwnedObject(outer_id, {middle_id}, owner->address_, "", 0, false);
|
||||
owner->rc_.AddOwnedObject(inner_id, {}, owner->address_, "", 0, false,
|
||||
/*add_local_ref=*/false);
|
||||
owner->rc_.AddOwnedObject(middle_id, {inner_id}, owner->address_, "", 0, false,
|
||||
/*add_local_ref=*/false);
|
||||
owner->rc_.AddOwnedObject(outer_id, {middle_id}, owner->address_, "", 0, false,
|
||||
/*add_local_ref=*/false);
|
||||
owner->rc_.AddBorrowerAddress(outer_id, borrower1->address_);
|
||||
ASSERT_TRUE(owner->rc_.HasReference(outer_id));
|
||||
ASSERT_TRUE(owner->rc_.HasReference(middle_id));
|
||||
|
|
|
@ -70,9 +70,14 @@ std::vector<rpc::ObjectReference> TaskManager::AddPendingTask(
|
|||
// publish the WaitForRefRemoved message that we are now a borrower for
|
||||
// the inner IDs. Note that this message can be received *before* the
|
||||
// PushTaskReply.
|
||||
// NOTE(swang): We increment the local ref count to ensure that the
|
||||
// object is considered in scope before we return the ObjectRef to the
|
||||
// language frontend. Note that the language bindings should set
|
||||
// skip_adding_local_ref=True to avoid double referencing the object.
|
||||
reference_counter_->AddOwnedObject(return_id,
|
||||
/*inner_ids=*/{}, caller_address, call_site, -1,
|
||||
/*is_reconstructable=*/is_reconstructable);
|
||||
/*is_reconstructable=*/is_reconstructable,
|
||||
/*add_local_ref=*/true);
|
||||
}
|
||||
|
||||
return_ids.push_back(return_id);
|
||||
|
|
|
@ -99,6 +99,11 @@ class TaskManager : public TaskFinisherInterface, public TaskResubmissionInterfa
|
|||
|
||||
/// Add a task that is pending execution.
|
||||
///
|
||||
/// The local ref count for all return refs (excluding actor creation tasks)
|
||||
/// will be initialized to 1 so that the ref is considered in scope before
|
||||
/// returning to the language frontend. The caller is responsible for
|
||||
/// decrementing the ref count once the frontend ref has gone out of scope.
|
||||
///
|
||||
/// \param[in] caller_address The rpc address of the calling task.
|
||||
/// \param[in] spec The spec of the pending task.
|
||||
/// \param[in] max_retries Number of times this task may be retried
|
||||
|
|
|
@ -103,10 +103,11 @@ class MockReferenceCounter : public ReferenceCounterInterface {
|
|||
const rpc::Address &owner_address,
|
||||
bool foreign_owner_already_monitoring));
|
||||
|
||||
MOCK_METHOD7(AddOwnedObject,
|
||||
MOCK_METHOD8(AddOwnedObject,
|
||||
void(const ObjectID &object_id, const std::vector<ObjectID> &contained_ids,
|
||||
const rpc::Address &owner_address, const std::string &call_site,
|
||||
const int64_t object_size, bool is_reconstructable,
|
||||
bool add_local_ref,
|
||||
const absl::optional<NodeID> &pinned_at_raylet_id));
|
||||
|
||||
MOCK_METHOD2(SetDeleteCallback,
|
||||
|
|
|
@ -170,7 +170,8 @@ class ObjectRecoveryManagerTest : public ObjectRecoveryManagerTestBase {
|
|||
TEST_F(ObjectRecoveryLineageDisabledTest, TestNoReconstruction) {
|
||||
// Lineage recording disabled.
|
||||
ObjectID object_id = ObjectID::FromRandom();
|
||||
ref_counter_->AddOwnedObject(object_id, {}, rpc::Address(), "", 0, true);
|
||||
ref_counter_->AddOwnedObject(object_id, {}, rpc::Address(), "", 0, true,
|
||||
/*add_local_ref=*/true);
|
||||
ASSERT_TRUE(manager_.RecoverObject(object_id));
|
||||
ASSERT_TRUE(failed_reconstructions_.empty());
|
||||
ASSERT_TRUE(object_directory_->Flush() == 1);
|
||||
|
@ -192,7 +193,8 @@ TEST_F(ObjectRecoveryLineageDisabledTest, TestNoReconstruction) {
|
|||
|
||||
TEST_F(ObjectRecoveryLineageDisabledTest, TestPinNewCopy) {
|
||||
ObjectID object_id = ObjectID::FromRandom();
|
||||
ref_counter_->AddOwnedObject(object_id, {}, rpc::Address(), "", 0, true);
|
||||
ref_counter_->AddOwnedObject(object_id, {}, rpc::Address(), "", 0, true,
|
||||
/*add_local_ref=*/true);
|
||||
std::vector<rpc::Address> addresses({rpc::Address()});
|
||||
object_directory_->SetLocations(object_id, addresses);
|
||||
|
||||
|
@ -205,7 +207,8 @@ TEST_F(ObjectRecoveryLineageDisabledTest, TestPinNewCopy) {
|
|||
|
||||
TEST_F(ObjectRecoveryManagerTest, TestPinNewCopy) {
|
||||
ObjectID object_id = ObjectID::FromRandom();
|
||||
ref_counter_->AddOwnedObject(object_id, {}, rpc::Address(), "", 0, true);
|
||||
ref_counter_->AddOwnedObject(object_id, {}, rpc::Address(), "", 0, true,
|
||||
/*add_local_ref=*/true);
|
||||
std::vector<rpc::Address> addresses({rpc::Address()});
|
||||
object_directory_->SetLocations(object_id, addresses);
|
||||
|
||||
|
@ -218,7 +221,8 @@ TEST_F(ObjectRecoveryManagerTest, TestPinNewCopy) {
|
|||
|
||||
TEST_F(ObjectRecoveryManagerTest, TestReconstruction) {
|
||||
ObjectID object_id = ObjectID::FromRandom();
|
||||
ref_counter_->AddOwnedObject(object_id, {}, rpc::Address(), "", 0, true);
|
||||
ref_counter_->AddOwnedObject(object_id, {}, rpc::Address(), "", 0, true,
|
||||
/*add_local_ref=*/true);
|
||||
task_resubmitter_->AddTask(object_id.TaskId(), {});
|
||||
|
||||
ASSERT_TRUE(manager_.RecoverObject(object_id));
|
||||
|
@ -230,7 +234,8 @@ TEST_F(ObjectRecoveryManagerTest, TestReconstruction) {
|
|||
|
||||
TEST_F(ObjectRecoveryManagerTest, TestReconstructionSuppression) {
|
||||
ObjectID object_id = ObjectID::FromRandom();
|
||||
ref_counter_->AddOwnedObject(object_id, {}, rpc::Address(), "", 0, true);
|
||||
ref_counter_->AddOwnedObject(object_id, {}, rpc::Address(), "", 0, true,
|
||||
/*add_local_ref=*/true);
|
||||
ref_counter_->AddLocalReference(object_id, "");
|
||||
|
||||
ASSERT_TRUE(manager_.RecoverObject(object_id));
|
||||
|
@ -265,7 +270,8 @@ TEST_F(ObjectRecoveryManagerTest, TestReconstructionChain) {
|
|||
std::vector<ObjectID> dependencies;
|
||||
for (int i = 0; i < 3; i++) {
|
||||
ObjectID object_id = ObjectID::FromRandom();
|
||||
ref_counter_->AddOwnedObject(object_id, {}, rpc::Address(), "", 0, true);
|
||||
ref_counter_->AddOwnedObject(object_id, {}, rpc::Address(), "", 0, true,
|
||||
/*add_local_ref=*/true);
|
||||
task_resubmitter_->AddTask(object_id.TaskId(), dependencies);
|
||||
dependencies = {object_id};
|
||||
object_ids.push_back(object_id);
|
||||
|
@ -282,7 +288,8 @@ TEST_F(ObjectRecoveryManagerTest, TestReconstructionChain) {
|
|||
|
||||
TEST_F(ObjectRecoveryManagerTest, TestReconstructionFails) {
|
||||
ObjectID object_id = ObjectID::FromRandom();
|
||||
ref_counter_->AddOwnedObject(object_id, {}, rpc::Address(), "", 0, true);
|
||||
ref_counter_->AddOwnedObject(object_id, {}, rpc::Address(), "", 0, true,
|
||||
/*add_local_ref=*/true);
|
||||
|
||||
ASSERT_TRUE(manager_.RecoverObject(object_id));
|
||||
ASSERT_TRUE(object_directory_->Flush() == 1);
|
||||
|
@ -294,10 +301,12 @@ TEST_F(ObjectRecoveryManagerTest, TestReconstructionFails) {
|
|||
|
||||
TEST_F(ObjectRecoveryManagerTest, TestDependencyReconstructionFails) {
|
||||
ObjectID dep_id = ObjectID::FromRandom();
|
||||
ref_counter_->AddOwnedObject(dep_id, {}, rpc::Address(), "", 0, true);
|
||||
ref_counter_->AddOwnedObject(dep_id, {}, rpc::Address(), "", 0, true,
|
||||
/*add_local_ref=*/true);
|
||||
|
||||
ObjectID object_id = ObjectID::FromRandom();
|
||||
ref_counter_->AddOwnedObject(object_id, {}, rpc::Address(), "", 0, true);
|
||||
ref_counter_->AddOwnedObject(object_id, {}, rpc::Address(), "", 0, true,
|
||||
/*add_local_ref=*/true);
|
||||
task_resubmitter_->AddTask(object_id.TaskId(), {dep_id});
|
||||
RAY_LOG(INFO) << object_id;
|
||||
|
||||
|
@ -313,7 +322,8 @@ TEST_F(ObjectRecoveryManagerTest, TestDependencyReconstructionFails) {
|
|||
|
||||
TEST_F(ObjectRecoveryManagerTest, TestLineageEvicted) {
|
||||
ObjectID object_id = ObjectID::FromRandom();
|
||||
ref_counter_->AddOwnedObject(object_id, {}, rpc::Address(), "", 0, true);
|
||||
ref_counter_->AddOwnedObject(object_id, {}, rpc::Address(), "", 0, true,
|
||||
/*add_local_ref=*/true);
|
||||
ref_counter_->AddLocalReference(object_id, "");
|
||||
ref_counter_->EvictLineage(1);
|
||||
|
||||
|
|
|
@ -124,7 +124,6 @@ TEST_F(TaskManagerTest, TestTaskSuccess) {
|
|||
ASSERT_EQ(num_retries_, 0);
|
||||
|
||||
std::vector<ObjectID> removed;
|
||||
reference_counter_->AddLocalReference(return_id, "");
|
||||
reference_counter_->RemoveLocalReference(return_id, &removed);
|
||||
ASSERT_EQ(removed[0], return_id);
|
||||
ASSERT_EQ(reference_counter_->NumObjectIDsInScope(), 0);
|
||||
|
@ -160,7 +159,6 @@ TEST_F(TaskManagerTest, TestTaskFailure) {
|
|||
ASSERT_EQ(num_retries_, 0);
|
||||
|
||||
std::vector<ObjectID> removed;
|
||||
reference_counter_->AddLocalReference(return_id, "");
|
||||
reference_counter_->RemoveLocalReference(return_id, &removed);
|
||||
ASSERT_EQ(removed[0], return_id);
|
||||
ASSERT_EQ(reference_counter_->NumObjectIDsInScope(), 0);
|
||||
|
@ -221,7 +219,6 @@ TEST_F(TaskManagerTest, TestFailPendingTask) {
|
|||
ASSERT_EQ(stored_error, rpc::ErrorType::LOCAL_RAYLET_DIED);
|
||||
|
||||
std::vector<ObjectID> removed;
|
||||
reference_counter_->AddLocalReference(return_id, "");
|
||||
reference_counter_->RemoveLocalReference(return_id, &removed);
|
||||
ASSERT_EQ(removed[0], return_id);
|
||||
ASSERT_EQ(reference_counter_->NumObjectIDsInScope(), 0);
|
||||
|
@ -268,7 +265,6 @@ TEST_F(TaskManagerTest, TestTaskReconstruction) {
|
|||
ASSERT_EQ(stored_error, error);
|
||||
|
||||
std::vector<ObjectID> removed;
|
||||
reference_counter_->AddLocalReference(return_id, "");
|
||||
reference_counter_->RemoveLocalReference(return_id, &removed);
|
||||
ASSERT_EQ(removed[0], return_id);
|
||||
ASSERT_EQ(reference_counter_->NumObjectIDsInScope(), 0);
|
||||
|
@ -326,7 +322,6 @@ TEST_F(TaskManagerTest, TestLineageEvicted) {
|
|||
|
||||
// Once the return ID goes out of scope, the task spec and its dependencies
|
||||
// are released.
|
||||
reference_counter_->AddLocalReference(return_id, "");
|
||||
reference_counter_->RemoveLocalReference(return_id, nullptr);
|
||||
ASSERT_FALSE(manager_.IsTaskSubmissible(spec.TaskId()));
|
||||
ASSERT_FALSE(reference_counter_->HasReference(return_id));
|
||||
|
@ -345,7 +340,6 @@ TEST_F(TaskManagerLineageTest, TestLineagePinned) {
|
|||
int num_retries = 3;
|
||||
manager_.AddPendingTask(caller_address, spec, "", num_retries);
|
||||
auto return_id = spec.ReturnId(0);
|
||||
reference_counter_->AddLocalReference(return_id, "");
|
||||
ASSERT_TRUE(manager_.IsTaskPending(spec.TaskId()));
|
||||
ASSERT_EQ(reference_counter_->NumObjectIDsInScope(), 3);
|
||||
|
||||
|
@ -384,7 +378,6 @@ TEST_F(TaskManagerLineageTest, TestDirectObjectNoLineage) {
|
|||
int num_retries = 3;
|
||||
manager_.AddPendingTask(caller_address, spec, "", num_retries);
|
||||
auto return_id = spec.ReturnId(0);
|
||||
reference_counter_->AddLocalReference(return_id, "");
|
||||
ASSERT_TRUE(manager_.IsTaskPending(spec.TaskId()));
|
||||
ASSERT_EQ(reference_counter_->NumObjectIDsInScope(), 3);
|
||||
|
||||
|
@ -418,7 +411,6 @@ TEST_F(TaskManagerLineageTest, TestLineagePinnedOutOfOrder) {
|
|||
int num_retries = 3;
|
||||
manager_.AddPendingTask(caller_address, spec, "", num_retries);
|
||||
auto return_id = spec.ReturnId(0);
|
||||
reference_counter_->AddLocalReference(return_id, "");
|
||||
ASSERT_TRUE(manager_.IsTaskPending(spec.TaskId()));
|
||||
ASSERT_EQ(reference_counter_->NumObjectIDsInScope(), 3);
|
||||
|
||||
|
@ -452,13 +444,11 @@ TEST_F(TaskManagerLineageTest, TestRecursiveLineagePinned) {
|
|||
rpc::Address caller_address;
|
||||
|
||||
ObjectID dep = ObjectID::FromRandom();
|
||||
reference_counter_->AddLocalReference(dep, "");
|
||||
for (int i = 0; i < 3; i++) {
|
||||
auto spec = CreateTaskHelper(1, {dep});
|
||||
int num_retries = 3;
|
||||
manager_.AddPendingTask(caller_address, spec, "", num_retries);
|
||||
auto return_id = spec.ReturnId(0);
|
||||
reference_counter_->AddLocalReference(return_id, "");
|
||||
|
||||
// The task completes.
|
||||
rpc::PushTaskReply reply;
|
||||
|
@ -499,7 +489,7 @@ TEST_F(TaskManagerLineageTest, TestRecursiveDirectObjectNoLineage) {
|
|||
int num_retries = 3;
|
||||
manager_.AddPendingTask(caller_address, spec, "", num_retries);
|
||||
auto return_id = spec.ReturnId(0);
|
||||
reference_counter_->AddLocalReference(return_id, "");
|
||||
reference_counter_->RemoveLocalReference(dep, nullptr);
|
||||
|
||||
// The task completes.
|
||||
rpc::PushTaskReply reply;
|
||||
|
@ -512,15 +502,13 @@ TEST_F(TaskManagerLineageTest, TestRecursiveDirectObjectNoLineage) {
|
|||
|
||||
// No tasks should be pinned because they returned direct objects.
|
||||
ASSERT_EQ(manager_.NumSubmissibleTasks(), 0);
|
||||
// Only the dependency and the newest return ID should be in scope because
|
||||
// all objects in the lineage were direct.
|
||||
ASSERT_EQ(reference_counter_->NumObjectIDsInScope(), 2);
|
||||
// Only the newest return ID should be in scope because all objects in the
|
||||
// lineage were direct.
|
||||
ASSERT_EQ(reference_counter_->NumObjectIDsInScope(), 1);
|
||||
|
||||
reference_counter_->RemoveLocalReference(dep, nullptr);
|
||||
dep = return_id;
|
||||
}
|
||||
|
||||
// The task's return ID goes out of scope before the task finishes.
|
||||
reference_counter_->RemoveLocalReference(dep, nullptr);
|
||||
ASSERT_EQ(manager_.NumSubmissibleTasks(), 0);
|
||||
ASSERT_EQ(reference_counter_->NumObjectIDsInScope(), 0);
|
||||
|
@ -552,7 +540,6 @@ TEST_F(TaskManagerLineageTest, TestResubmitTask) {
|
|||
ASSERT_TRUE(reference_counter_->IsObjectPendingCreation(return_id));
|
||||
|
||||
// The task completes.
|
||||
reference_counter_->AddLocalReference(return_id, "");
|
||||
rpc::PushTaskReply reply;
|
||||
auto return_object = reply.add_return_objects();
|
||||
return_object->set_object_id(return_id.Binary());
|
||||
|
@ -604,8 +591,6 @@ TEST_F(TaskManagerLineageTest, TestResubmittedTaskNondeterministicReturns) {
|
|||
|
||||
// The task completes. Both return objects are stored in plasma.
|
||||
{
|
||||
reference_counter_->AddLocalReference(return_id1, "");
|
||||
reference_counter_->AddLocalReference(return_id2, "");
|
||||
rpc::PushTaskReply reply;
|
||||
auto return_object1 = reply.add_return_objects();
|
||||
return_object1->set_object_id(return_id1.Binary());
|
||||
|
|
Loading…
Add table
Reference in a new issue