diff --git a/cpp/include/ray/api.h b/cpp/include/ray/api.h index 0a04aa6db..78b30790d 100644 --- a/cpp/include/ray/api.h +++ b/cpp/include/ray/api.h @@ -163,7 +163,12 @@ inline ray::ObjectRef Put(const T &obj) { auto buffer = std::make_shared(ray::internal::Serializer::Serialize(obj)); auto id = ray::internal::GetRayRuntime()->Put(buffer); - return ray::ObjectRef(id); + auto ref = ObjectRef(id); + // The core worker will add an initial ref to the put ID to + // keep it in scope. Now that we've created the frontend + // ObjectRef, remove this initial ref. + ray::internal::GetRayRuntime()->RemoveLocalReference(id); + return ref; } template diff --git a/cpp/include/ray/api/actor_task_caller.h b/cpp/include/ray/api/actor_task_caller.h index 9c9b3a0ea..5fb2ff510 100644 --- a/cpp/include/ray/api/actor_task_caller.h +++ b/cpp/include/ray/api/actor_task_caller.h @@ -82,7 +82,12 @@ ObjectRef> ActorTaskCaller::Remote( using ReturnType = boost::callable_traits::return_type_t; auto returned_object_id = runtime_->CallActor(remote_function_holder_, id_, args_, task_options_); - return ObjectRef(returned_object_id); + auto return_ref = ObjectRef(returned_object_id); + // The core worker will add an initial ref to each return ID to keep it in + // scope. Now that we've created the frontend ObjectRef, remove this initial + // ref. + runtime_->RemoveLocalReference(returned_object_id); + return return_ref; } } // namespace internal diff --git a/cpp/include/ray/api/task_caller.h b/cpp/include/ray/api/task_caller.h index 1f8109638..cf49b0902 100644 --- a/cpp/include/ray/api/task_caller.h +++ b/cpp/include/ray/api/task_caller.h @@ -89,7 +89,12 @@ ObjectRef> TaskCaller::Remote( auto returned_object_id = runtime_->Call(remote_function_holder_, args_, task_options_); using ReturnType = boost::callable_traits::return_type_t; - return ObjectRef(returned_object_id); + auto return_ref = ObjectRef(returned_object_id); + // The core worker will add an initial ref to each return ID to keep it in + // scope. Now that we've created the frontend ObjectRef, remove this initial + // ref. + runtime_->RemoveLocalReference(returned_object_id); + return return_ref; } } // namespace internal } // namespace ray diff --git a/java/runtime/src/main/java/io/ray/runtime/AbstractRayRuntime.java b/java/runtime/src/main/java/io/ray/runtime/AbstractRayRuntime.java index 2de394809..3f241cbee 100644 --- a/java/runtime/src/main/java/io/ray/runtime/AbstractRayRuntime.java +++ b/java/runtime/src/main/java/io/ray/runtime/AbstractRayRuntime.java @@ -76,7 +76,10 @@ public abstract class AbstractRayRuntime implements RayRuntimeInternal { LOGGER.debug("Putting Object in Task {}.", workerContext.getCurrentTaskId()); } ObjectId objectId = objectStore.put(obj); - return new ObjectRefImpl(objectId, (Class) (obj == null ? Object.class : obj.getClass())); + return new ObjectRefImpl( + objectId, + (Class) (obj == null ? Object.class : obj.getClass()), + /*skipAddingLocalRef=*/ true); } @Override @@ -88,7 +91,10 @@ public abstract class AbstractRayRuntime implements RayRuntimeInternal { ownerActor.getId()); } ObjectId objectId = objectStore.put(obj, ownerActor.getId()); - return new ObjectRefImpl(objectId, (Class) (obj == null ? Object.class : obj.getClass())); + return new ObjectRefImpl( + objectId, + (Class) (obj == null ? Object.class : obj.getClass()), + /*skipAddingLocalRef=*/ true); } @Override @@ -295,7 +301,7 @@ public abstract class AbstractRayRuntime implements RayRuntimeInternal { if (returnIds.isEmpty()) { return null; } else { - return new ObjectRefImpl(returnIds.get(0), returnType.get()); + return new ObjectRefImpl(returnIds.get(0), returnType.get(), /*skipAddingLocalRef=*/ true); } } @@ -317,7 +323,7 @@ public abstract class AbstractRayRuntime implements RayRuntimeInternal { if (returnIds.isEmpty()) { return null; } else { - return new ObjectRefImpl(returnIds.get(0), returnType.get()); + return new ObjectRefImpl(returnIds.get(0), returnType.get(), /*skipAddingLocalRef=*/ true); } } diff --git a/java/runtime/src/main/java/io/ray/runtime/object/ObjectRefImpl.java b/java/runtime/src/main/java/io/ray/runtime/object/ObjectRefImpl.java index cbe5e8a25..bb8aa0f9d 100644 --- a/java/runtime/src/main/java/io/ray/runtime/object/ObjectRefImpl.java +++ b/java/runtime/src/main/java/io/ray/runtime/object/ObjectRefImpl.java @@ -32,10 +32,22 @@ public final class ObjectRefImpl implements ObjectRef, Externalizable { private Class type; - public ObjectRefImpl(ObjectId id, Class type) { + public ObjectRefImpl(ObjectId id, Class type, boolean skipAddingLocalRef) { this.id = id; this.type = type; - addLocalReference(); + RayRuntimeInternal runtime = (RayRuntimeInternal) Ray.internal(); + Preconditions.checkState(workerId == null); + workerId = runtime.getWorkerContext().getCurrentWorkerId(); + if (!skipAddingLocalRef) { + runtime.getObjectStore().addLocalReference(workerId, id); + } + // We still add the reference so that the local ref count will be properly + // decremented once this object is GCed. + new ObjectRefImplReference(this); + } + + public ObjectRefImpl(ObjectId id, Class type) { + this(id, type, /*skipAddingLocalRef=*/ false); } public ObjectRefImpl() {} @@ -81,22 +93,19 @@ public final class ObjectRefImpl implements ObjectRef, Externalizable { int len = in.readInt(); byte[] ownerAddress = new byte[len]; in.readFully(ownerAddress); - addLocalReference(); + RayRuntimeInternal runtime = (RayRuntimeInternal) Ray.internal(); + Preconditions.checkState(workerId == null); + workerId = runtime.getWorkerContext().getCurrentWorkerId(); + runtime.getObjectStore().addLocalReference(workerId, id); + new ObjectRefImplReference(this); + runtime .getObjectStore() .registerOwnershipInfoAndResolveFuture( this.id, ObjectSerializer.getOuterObjectId(), ownerAddress); } - private void addLocalReference() { - Preconditions.checkState(workerId == null); - RayRuntimeInternal runtime = (RayRuntimeInternal) Ray.internal(); - workerId = runtime.getWorkerContext().getCurrentWorkerId(); - runtime.getObjectStore().addLocalReference(workerId, id); - new ObjectRefImplReference(this); - } - private static final class ObjectRefImplReference extends FinalizableWeakReference> { diff --git a/python/ray/_private/test_utils.py b/python/ray/_private/test_utils.py index 6ae3ed5cb..d1617f62b 100644 --- a/python/ray/_private/test_utils.py +++ b/python/ray/_private/test_utils.py @@ -551,15 +551,6 @@ def put_object(obj, use_ray_put): return _put.remote(obj) -def put_unpinned_object(obj): - value = ray.worker.global_worker.get_serialization_context().serialize(obj) - return ray.ObjectRef( - ray.worker.global_worker.core_worker.put_serialized_object( - value, pin_object=False - ) - ) - - def wait_until_server_available(address, timeout_ms=5000, retry_interval_ms=100): ip_port = address.split(":") ip = ip_port[0] diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index 8144bf555..183c68f06 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -194,13 +194,15 @@ cdef RayObjectsToDataMetadataPairs( return data_metadata_pairs -cdef VectorToObjectRefs(const c_vector[CObjectReference] &object_refs): +cdef VectorToObjectRefs(const c_vector[CObjectReference] &object_refs, + skip_adding_local_ref): result = [] for i in range(object_refs.size()): result.append(ObjectRef( object_refs[i].object_id(), object_refs[i].owner_address().SerializeAsString(), - object_refs[i].call_site())) + object_refs[i].call_site(), + skip_adding_local_ref=skip_adding_local_ref)) return result @@ -361,10 +363,28 @@ cdef int prepare_actor_concurrency_groups( concurrency_groups.push_back(cg) return 1 -cdef prepare_args( +cdef prepare_args_and_increment_put_refs( CoreWorker core_worker, Language language, args, - c_vector[unique_ptr[CTaskArg]] *args_vector, function_descriptor): + c_vector[unique_ptr[CTaskArg]] *args_vector, function_descriptor, + c_vector[CObjectID] *incremented_put_arg_ids): + try: + prepare_args_internal(core_worker, language, args, args_vector, + function_descriptor, incremented_put_arg_ids) + except Exception as e: + # An error occurred during arg serialization. We must remove the + # initial local ref for all args that were successfully put into the + # local plasma store. These objects will then get released. + for put_arg_id in dereference(incremented_put_arg_ids): + CCoreWorkerProcess.GetCoreWorker().RemoveLocalReference( + put_arg_id) + raise e + +cdef prepare_args_internal( + CoreWorker core_worker, + Language language, args, + c_vector[unique_ptr[CTaskArg]] *args_vector, function_descriptor, + c_vector[CObjectID] *incremented_put_arg_ids): cdef: size_t size int64_t put_threshold @@ -437,12 +457,16 @@ cdef prepare_args( inlined_ids.clear() total_inlined += size else: + put_id = CObjectID.FromBinary( + core_worker.put_serialized_object_and_increment_local_ref( + serialized_arg, inline_small_object=False)) args_vector.push_back(unique_ptr[CTaskArg]( - new CTaskArgByReference(CObjectID.FromBinary( - core_worker.put_serialized_object( - serialized_arg, inline_small_object=False)), - CCoreWorkerProcess.GetCoreWorker().GetRpcAddress(), - put_arg_call_site))) + new CTaskArgByReference( + put_id, + CCoreWorkerProcess.GetCoreWorker().GetRpcAddress(), + put_arg_call_site + ))) + incremented_put_arg_ids.push_back(put_id) cdef raise_if_dependency_failed(arg): @@ -604,7 +628,9 @@ cdef execute_task( args, kwargs = [], {} else: metadata_pairs = RayObjectsToDataMetadataPairs(c_args) - object_refs = VectorToObjectRefs(c_arg_refs) + object_refs = VectorToObjectRefs( + c_arg_refs, + skip_adding_local_ref=False) if core_worker.current_actor_is_asyncio(): # We deserialize objects in event loop thread to @@ -860,7 +886,9 @@ cdef c_vector[c_string] spill_objects_handler( c_vector[c_string] owner_addresses with gil: - object_refs = VectorToObjectRefs(object_refs_to_spill) + object_refs = VectorToObjectRefs( + object_refs_to_spill, + skip_adding_local_ref=False) for i in range(object_refs_to_spill.size()): owner_addresses.push_back( object_refs_to_spill[i].owner_address() @@ -896,7 +924,9 @@ cdef int64_t restore_spilled_objects_handler( size = object_urls.size() for i in range(size): urls.append(object_urls[i]) - object_refs = VectorToObjectRefs(object_refs_to_restore) + object_refs = VectorToObjectRefs( + object_refs_to_restore, + skip_adding_local_ref=False) try: with ray.worker._changeproctitle( ray_constants.WORKER_PROCESS_TYPE_RESTORE_WORKER, @@ -1211,7 +1241,8 @@ cdef class CoreWorker: if object_ref is None: with nogil: - check_status(CCoreWorkerProcess.GetCoreWorker().CreateOwned( + check_status(CCoreWorkerProcess.GetCoreWorker() + .CreateOwnedAndIncrementLocalRef( metadata, data_size, contained_ids, c_object_id, data, created_by_worker, move(c_owner_address), @@ -1265,20 +1296,25 @@ cdef class CoreWorker: owner_address: Owner address for this object ref. """ cdef: - CObjectID c_object_id + CObjectID c_object_id = object_ref.native() shared_ptr[CBuffer] data_buf shared_ptr[CBuffer] metadata_buf int64_t put_threshold - c_vector[CObjectID] c_object_id_vector - unique_ptr[CAddress] c_owner_address + unique_ptr[CAddress] c_owner_address = move(self._convert_python_address( + object_ref.owner_address())) + # TODO(suquark): This method does not support put objects to # in memory store currently. metadata_buf = string_to_buffer(metadata) - object_already_exists = self._create_put_buffer( - metadata_buf, data_size, object_ref, - ObjectRefsToVector([]), - &c_object_id, &data_buf, False, owner_address) - if object_already_exists: + + status = CCoreWorkerProcess.GetCoreWorker().CreateExisting( + metadata_buf, data_size, object_ref.native(), + dereference(c_owner_address), &data_buf, + False) + if not status.ok(): + logger.debug("Error putting restored object into plasma.") + return + if data_buf == NULL: logger.debug("Object already exists in 'put_file_like_object'.") return data = Buffer.make(data_buf) @@ -1287,7 +1323,6 @@ cdef class CoreWorker: while index < data_size: bytes_read = file_like.readinto(view[index:]) index += bytes_read - c_owner_address = move(self._convert_python_address(owner_address)) with nogil: # Using custom object refs is not supported because we # can't track their lifecycle, so we don't pin the object @@ -1295,13 +1330,13 @@ cdef class CoreWorker: check_status( CCoreWorkerProcess.GetCoreWorker().SealExisting( c_object_id, pin_object=False, - owner_address=move(c_owner_address))) + owner_address=c_owner_address)) - def put_serialized_object(self, serialized_object, - ObjectRef object_ref=None, - c_bool pin_object=True, - owner_address=None, - c_bool inline_small_object=True): + def put_serialized_object_and_increment_local_ref(self, serialized_object, + ObjectRef object_ref=None, + c_bool pin_object=True, + owner_address=None, + c_bool inline_small_object=True): cdef: CObjectID c_object_id shared_ptr[CBuffer] data @@ -1471,6 +1506,7 @@ cdef class CoreWorker: c_vector[unique_ptr[CTaskArg]] args_vector c_vector[CObjectReference] return_refs CSchedulingStrategy c_scheduling_strategy + c_vector[CObjectID] incremented_put_arg_ids self.python_scheduling_strategy_to_c( scheduling_strategy, &c_scheduling_strategy) @@ -1479,8 +1515,9 @@ cdef class CoreWorker: prepare_resources(resources, &c_resources) ray_function = CRayFunction( language.lang, function_descriptor.descriptor) - prepare_args( - self, language, args, &args_vector, function_descriptor) + prepare_args_and_increment_put_refs( + self, language, args, &args_vector, function_descriptor, + &incremented_put_arg_ids) # NOTE(edoakes): releasing the GIL while calling this method causes # segfaults. See relevant issue for details: @@ -1494,7 +1531,18 @@ cdef class CoreWorker: c_scheduling_strategy, debugger_breakpoint) - return VectorToObjectRefs(return_refs) + # These arguments were serialized and put into the local object + # store during task submission. The backend increments their local + # ref count initially to ensure that they remain in scope until we + # add to their submitted task ref count. Now that the task has + # been submitted, it's safe to remove the initial local ref. + for put_arg_id in incremented_put_arg_ids: + CCoreWorkerProcess.GetCoreWorker().RemoveLocalReference( + put_arg_id) + + # The initial local reference is already acquired internally when + # adding the pending task. + return VectorToObjectRefs(return_refs, skip_adding_local_ref=True) def create_actor(self, Language language, @@ -1524,6 +1572,7 @@ cdef class CoreWorker: CActorID c_actor_id c_vector[CConcurrencyGroup] c_concurrency_groups CSchedulingStrategy c_scheduling_strategy + c_vector[CObjectID] incremented_put_arg_ids optional[c_bool] is_detached_optional = nullopt self.python_scheduling_strategy_to_c( @@ -1534,8 +1583,9 @@ cdef class CoreWorker: prepare_resources(placement_resources, &c_placement_resources) ray_function = CRayFunction( language.lang, function_descriptor.descriptor) - prepare_args( - self, language, args, &args_vector, function_descriptor) + prepare_args_and_increment_put_refs( + self, language, args, &args_vector, function_descriptor, + &incremented_put_arg_ids) prepare_actor_concurrency_groups( concurrency_groups_dict, &c_concurrency_groups) @@ -1544,7 +1594,7 @@ cdef class CoreWorker: True if is_detached else False) with nogil: - check_status(CCoreWorkerProcess.GetCoreWorker().CreateActor( + status = CCoreWorkerProcess.GetCoreWorker().CreateActor( ray_function, args_vector, CActorCreationOptions( max_restarts, max_task_retries, max_concurrency, @@ -1560,7 +1610,18 @@ cdef class CoreWorker: is_asyncio or max_concurrency > 1, max_pending_calls), extension_data, - &c_actor_id)) + &c_actor_id) + + # These arguments were serialized and put into the local object + # store during task submission. The backend increments their local + # ref count initially to ensure that they remain in scope until we + # add to their submitted task ref count. Now that the task has + # been submitted, it's safe to remove the initial local ref. + for put_arg_id in incremented_put_arg_ids: + CCoreWorkerProcess.GetCoreWorker().RemoveLocalReference( + put_arg_id) + + check_status(status) return ActorID(c_actor_id.Binary()) @@ -1641,14 +1702,16 @@ cdef class CoreWorker: CRayFunction ray_function c_vector[unique_ptr[CTaskArg]] args_vector optional[c_vector[CObjectReference]] return_refs + c_vector[CObjectID] incremented_put_arg_ids with self.profile_event(b"submit_task"): if num_method_cpus > 0: c_resources[b"CPU"] = num_method_cpus ray_function = CRayFunction( language.lang, function_descriptor.descriptor) - prepare_args( - self, language, args, &args_vector, function_descriptor) + prepare_args_and_increment_put_refs( + self, language, args, &args_vector, function_descriptor, + &incremented_put_arg_ids) # NOTE(edoakes): releasing the GIL while calling this method causes # segfaults. See relevant issue for details: @@ -1659,8 +1722,20 @@ cdef class CoreWorker: args_vector, CTaskOptions( name, num_returns, c_resources, concurrency_group_name)) + # These arguments were serialized and put into the local object + # store during task submission. The backend increments their local + # ref count initially to ensure that they remain in scope until we + # add to their submitted task ref count. Now that the task has + # been submitted, it's safe to remove the initial local ref. + for put_arg_id in incremented_put_arg_ids: + CCoreWorkerProcess.GetCoreWorker().RemoveLocalReference( + put_arg_id) + if return_refs.has_value(): - return VectorToObjectRefs(return_refs.value()) + # The initial local reference is already acquired internally + # when adding the pending task. + return VectorToObjectRefs(return_refs.value(), + skip_adding_local_ref=True) else: actor = self.get_actor_handle(actor_id) actor_handle = (CCoreWorkerProcess.GetCoreWorker() diff --git a/python/ray/includes/libcoreworker.pxd b/python/ray/includes/libcoreworker.pxd index 054aa132b..dddccb4fb 100644 --- a/python/ray/includes/libcoreworker.pxd +++ b/python/ray/includes/libcoreworker.pxd @@ -199,13 +199,14 @@ cdef extern from "ray/core_worker/core_worker.h" nogil: CRayStatus Put(const CRayObject &object, const c_vector[CObjectID] &contained_object_ids, const CObjectID &object_id) - CRayStatus CreateOwned(const shared_ptr[CBuffer] &metadata, - const size_t data_size, - const c_vector[CObjectID] &contained_object_ids, - CObjectID *object_id, shared_ptr[CBuffer] *data, - c_bool created_by_worker, - const unique_ptr[CAddress] &owner_address, - c_bool inline_small_object) + CRayStatus CreateOwnedAndIncrementLocalRef( + const shared_ptr[CBuffer] &metadata, + const size_t data_size, + const c_vector[CObjectID] &contained_object_ids, + CObjectID *object_id, shared_ptr[CBuffer] *data, + c_bool created_by_worker, + const unique_ptr[CAddress] &owner_address, + c_bool inline_small_object) CRayStatus CreateExisting(const shared_ptr[CBuffer] &metadata, const size_t data_size, const CObjectID &object_id, diff --git a/python/ray/worker.py b/python/ray/worker.py index 59ad95e5c..1fdfed3bd 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -314,14 +314,11 @@ class Worker: # removed before this one, it will corrupt the state in the # reference counter. return ray.ObjectRef( - self.core_worker.put_serialized_object( + self.core_worker.put_serialized_object_and_increment_local_ref( serialized_value, object_ref=object_ref, owner_address=owner_address ), - # If the owner address is set, then the initial reference is - # already acquired internally in CoreWorker::CreateOwned. - # TODO(ekl) we should unify the code path more with the others - # to avoid this special case. - skip_adding_local_ref=(owner_address is not None), + # The initial local reference is already acquired internally. + skip_adding_local_ref=True, ) def raise_errors(self, data_metadata_pairs, object_refs): diff --git a/src/ray/core_worker/actor_manager.cc b/src/ray/core_worker/actor_manager.cc index 5e1ee7e09..e136c53b0 100644 --- a/src/ray/core_worker/actor_manager.cc +++ b/src/ray/core_worker/actor_manager.cc @@ -111,10 +111,13 @@ bool ActorManager::AddNewActorHandle(std::unique_ptr actor_handle, const auto actor_creation_return_id = ObjectID::ForActorHandle(actor_id); // Detached actor doesn't need ref counting. if (!is_detached) { + // We don't need to add an initial local ref here because it will get added + // in AddActorHandle. reference_counter_->AddOwnedObject(actor_creation_return_id, /*inner_ids=*/{}, caller_address, call_site, /*object_size*/ -1, - /*is_reconstructable=*/true); + /*is_reconstructable=*/true, + /*add_local_ref=*/false); } return AddActorHandle(std::move(actor_handle), cached_actor_name, diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 9a0c57291..0c8edfa2a 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -827,12 +827,13 @@ Status CoreWorker::Put(const RayObject &object, ObjectID *object_id) { *object_id = ObjectID::FromIndex(worker_context_.GetCurrentInternalTaskId(), worker_context_.GetNextPutIndex()); - reference_counter_->AddOwnedObject( - *object_id, contained_object_ids, rpc_address_, CurrentCallSite(), object.GetSize(), - /*is_reconstructable=*/false, NodeID::FromBinary(rpc_address_.raylet_id())); + reference_counter_->AddOwnedObject(*object_id, contained_object_ids, rpc_address_, + CurrentCallSite(), object.GetSize(), + /*is_reconstructable=*/false, /*add_local_ref=*/true, + NodeID::FromBinary(rpc_address_.raylet_id())); auto status = Put(object, contained_object_ids, *object_id, /*pin_object=*/true); if (!status.ok()) { - reference_counter_->RemoveOwnedObject(*object_id); + RemoveLocalReference(*object_id); } return status; } @@ -876,13 +877,11 @@ Status CoreWorker::Put(const RayObject &object, return PutInLocalPlasmaStore(object, object_id, pin_object); } -Status CoreWorker::CreateOwned(const std::shared_ptr &metadata, - const size_t data_size, - const std::vector &contained_object_ids, - ObjectID *object_id, std::shared_ptr *data, - bool created_by_worker, - const std::unique_ptr &owner_address, - bool inline_small_object) { +Status CoreWorker::CreateOwnedAndIncrementLocalRef( + const std::shared_ptr &metadata, const size_t data_size, + const std::vector &contained_object_ids, ObjectID *object_id, + std::shared_ptr *data, bool created_by_worker, + const std::unique_ptr &owner_address, bool inline_small_object) { auto status = WaitForActorRegistered(contained_object_ids); if (!status.ok()) { return status; @@ -896,6 +895,7 @@ Status CoreWorker::CreateOwned(const std::shared_ptr &metadata, reference_counter_->AddOwnedObject(*object_id, contained_object_ids, rpc_address_, CurrentCallSite(), data_size + metadata->Size(), /*is_reconstructable=*/false, + /*add_local_ref=*/true, NodeID::FromBinary(rpc_address_.raylet_id())); } else { // Because in the remote worker's `HandleAssignObjectOwner`, @@ -938,11 +938,7 @@ Status CoreWorker::CreateOwned(const std::shared_ptr &metadata, created_by_worker); } if (!status.ok()) { - if (owned_by_us) { - reference_counter_->RemoveOwnedObject(*object_id); - } else { - RemoveLocalReference(*object_id); - } + RemoveLocalReference(*object_id); return status; } else if (*data == nullptr) { // Object already exists in plasma. Store the in-memory value so that the @@ -970,16 +966,13 @@ Status CoreWorker::CreateExisting(const std::shared_ptr &metadata, Status CoreWorker::SealOwned(const ObjectID &object_id, bool pin_object, const std::unique_ptr &owner_address) { - bool owned_by_us = owner_address != nullptr - ? WorkerID::FromBinary(owner_address->worker_id()) == - WorkerID::FromBinary(rpc_address_.worker_id()) - : true; auto status = SealExisting(object_id, pin_object, std::move(owner_address)); if (status.ok()) return status; - if (owned_by_us) { - reference_counter_->RemoveOwnedObject(object_id); - } else { - RemoveLocalReference(object_id); + RemoveLocalReference(object_id); + if (reference_counter_->HasReference(object_id)) { + RAY_LOG(WARNING) + << "Object " << object_id + << " failed to be put but has a nonzero ref count. This object may leak."; } return status; } @@ -2295,7 +2288,8 @@ std::vector CoreWorker::ExecuteTaskLocalMode( reference_counter_->AddOwnedObject(task_spec.ReturnId(i), /*inner_ids=*/{}, rpc_address_, CurrentCallSite(), -1, - /*is_reconstructable=*/false); + /*is_reconstructable=*/false, + /*add_local_ref=*/true); } rpc::ObjectReference ref; ref.set_object_id(task_spec.ReturnId(i).Binary()); @@ -3049,6 +3043,7 @@ void CoreWorker::HandleAssignObjectOwner(const rpc::AssignObjectOwnerRequest &re reference_counter_->AddOwnedObject( object_id, contained_object_ids, rpc_address_, call_site, request.object_size(), /*is_reconstructable=*/false, + /*add_local_ref=*/false, /*pinned_at_raylet_id=*/NodeID::FromBinary(borrower_address.raylet_id())); reference_counter_->AddBorrowerAddress(object_id, borrower_address); RAY_CHECK(memory_store_->Put(RayObject(rpc::ErrorType::OBJECT_IN_PLASMA), object_id)); diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index dafde3e8e..5c80459f5 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -260,9 +260,14 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { /// Create and return a buffer in the object store that can be directly written /// into. After writing to the buffer, the caller must call `SealOwned()` to - /// finalize the object. The `CreateOwned()` and `SealOwned()` combination is - /// an alternative interface to `Put()` that allows frontends to avoid an extra - /// copy when possible. + /// finalize the object. The `CreateOwnedAndIncrementLocalRef()` and + /// `SealOwned()` combination is an alternative interface to `Put()` that + /// allows frontends to avoid an extra copy when possible. + /// + /// Note that this call also initializes the local reference count for the + /// object to 1 so that the ref is considered in scope. The caller must + /// ensure that they decrement the ref count once the returned ObjectRef has + /// gone out of scope. /// /// \param[in] metadata Metadata of the object to be written. /// \param[in] data_size Size of the object to be written. @@ -275,12 +280,12 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { /// \param[in] inline_small_object Whether to inline create this object if it's /// small. /// \return Status. - Status CreateOwned(const std::shared_ptr &metadata, const size_t data_size, - const std::vector &contained_object_ids, - ObjectID *object_id, std::shared_ptr *data, - bool created_by_worker, - const std::unique_ptr &owner_address = nullptr, - bool inline_small_object = true); + Status CreateOwnedAndIncrementLocalRef( + const std::shared_ptr &metadata, const size_t data_size, + const std::vector &contained_object_ids, ObjectID *object_id, + std::shared_ptr *data, bool created_by_worker, + const std::unique_ptr &owner_address = nullptr, + bool inline_small_object = true); /// Create and return a buffer in the object store that can be directly written /// into, for an object ID that already exists. After writing to the buffer, the @@ -301,6 +306,10 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { /// Finalize placing an object into the object store. This should be called after /// a corresponding `CreateOwned()` call and then writing into the returned buffer. /// + /// If the object seal fails, then the initial local reference that was added + /// in CreateOwnedAndIncrementLocalRef will be deleted and the object will be + /// released by the ref counter. + /// /// \param[in] object_id Object ID corresponding to the object. /// \param[in] pin_object Whether or not to pin the object at the local raylet. /// \param[in] The address of object's owner. If not provided, diff --git a/src/ray/core_worker/lib/java/io_ray_runtime_object_NativeObjectStore.cc b/src/ray/core_worker/lib/java/io_ray_runtime_object_NativeObjectStore.cc index 6af083669..c883b401a 100644 --- a/src/ray/core_worker/lib/java/io_ray_runtime_object_NativeObjectStore.cc +++ b/src/ray/core_worker/lib/java/io_ray_runtime_object_NativeObjectStore.cc @@ -38,7 +38,7 @@ Status PutSerializedObject(JNIEnv *env, jobject obj, ObjectID object_id, for (const auto &ref : native_ray_object->GetNestedRefs()) { nested_ids.push_back(ObjectID::FromBinary(ref.object_id())); } - status = CoreWorkerProcess::GetCoreWorker().CreateOwned( + status = CoreWorkerProcess::GetCoreWorker().CreateOwnedAndIncrementLocalRef( native_ray_object->GetMetadata(), data_size, nested_ids, out_object_id, &data, /*created_by_worker=*/true, /*owner_address=*/owner_address); diff --git a/src/ray/core_worker/reference_count.cc b/src/ray/core_worker/reference_count.cc index 93be2248d..88405f8ff 100644 --- a/src/ray/core_worker/reference_count.cc +++ b/src/ray/core_worker/reference_count.cc @@ -169,6 +169,7 @@ void ReferenceCounter::AddOwnedObject(const ObjectID &object_id, const rpc::Address &owner_address, const std::string &call_site, const int64_t object_size, bool is_reconstructable, + bool add_local_ref, const absl::optional &pinned_at_raylet_id) { RAY_LOG(DEBUG) << "Adding owned object " << object_id; absl::MutexLock lock(&mutex_); @@ -197,20 +198,10 @@ void ReferenceCounter::AddOwnedObject(const ObjectID &object_id, auto back_it = reconstructable_owned_objects_.end(); back_it--; RAY_CHECK(reconstructable_owned_objects_index_.emplace(object_id, back_it).second); -} -void ReferenceCounter::RemoveOwnedObject(const ObjectID &object_id) { - absl::MutexLock lock(&mutex_); - auto it = object_id_refs_.find(object_id); - RAY_CHECK(it != object_id_refs_.end()) - << "Tried to remove reference for nonexistent owned object " << object_id - << ", object must be added with ReferenceCounter::AddOwnedObject() before it " - << "can be removed"; - RAY_CHECK(it->second.RefCount() == 0) - << "Tried to remove reference for owned object " << object_id << " that has " - << it->second.RefCount() << " references, must have 0 references to be removed"; - RAY_LOG(DEBUG) << "Removing owned object " << object_id; - DeleteReferenceInternal(it, nullptr); + if (add_local_ref) { + it->second.local_ref_count++; + } } void ReferenceCounter::UpdateObjectSize(const ObjectID &object_id, int64_t object_size) { diff --git a/src/ray/core_worker/reference_count.h b/src/ray/core_worker/reference_count.h index 5600677ad..6c239db04 100644 --- a/src/ray/core_worker/reference_count.h +++ b/src/ray/core_worker/reference_count.h @@ -42,7 +42,7 @@ class ReferenceCounterInterface { virtual void AddOwnedObject( const ObjectID &object_id, const std::vector &contained_ids, const rpc::Address &owner_address, const std::string &call_site, - const int64_t object_size, bool is_reconstructable, + const int64_t object_size, bool is_reconstructable, bool add_local_ref, const absl::optional &pinned_at_raylet_id = absl::optional()) = 0; virtual bool SetDeleteCallback( const ObjectID &object_id, @@ -167,22 +167,18 @@ class ReferenceCounter : public ReferenceCounterInterface, /// \param[in] object_size Object size if known, otherwise -1; /// \param[in] is_reconstructable Whether the object can be reconstructed /// through lineage re-execution. + /// \param[in] add_local_ref Whether to initialize the local ref count to 1. + /// This is used to ensure that the ref is considered in scope before the + /// corresponding ObjectRef has been returned to the language frontend. + /// \param[in] pinned_at_raylet_id The primary location for the object, if it + /// is already known. This is only used for ray.put calls. void AddOwnedObject( const ObjectID &object_id, const std::vector &contained_ids, const rpc::Address &owner_address, const std::string &call_site, - const int64_t object_size, bool is_reconstructable, + const int64_t object_size, bool is_reconstructable, bool add_local_ref, const absl::optional &pinned_at_raylet_id = absl::optional()) LOCKS_EXCLUDED(mutex_); - /// Remove reference for an object that we own. The reference will only be - /// removed if the object's ref count is 0. This should only be used when - /// speculatively adding an owned reference that may need to be rolled back, e.g. if - /// the creation of the corresponding Plasma object fails. All other references will - /// be cleaned up via the reference counting protocol. - /// - /// \param[in] object_id The ID of the object that we own and wish to remove. - void RemoveOwnedObject(const ObjectID &object_id) LOCKS_EXCLUDED(mutex_); - /// Update the size of the object. /// /// \param[in] object_id The ID of the object. diff --git a/src/ray/core_worker/reference_count_test.cc b/src/ray/core_worker/reference_count_test.cc index 7c6e6970c..b9512b822 100644 --- a/src/ray/core_worker/reference_count_test.cc +++ b/src/ray/core_worker/reference_count_test.cc @@ -337,8 +337,7 @@ class MockWorkerClient : public MockCoreWorkerClientInterface { // The below methods mirror a core worker's operations, e.g., `Put` simulates // a ray.put(). void Put(const ObjectID &object_id) { - rc_.AddOwnedObject(object_id, {}, address_, "", 0, false); - rc_.AddLocalReference(object_id, ""); + rc_.AddOwnedObject(object_id, {}, address_, "", 0, false, /*add_local_ref=*/true); } void PutWithForeignOwner(const ObjectID &object_id, const rpc::Address &owner_address) { @@ -347,8 +346,8 @@ class MockWorkerClient : public MockCoreWorkerClientInterface { } void PutWrappedId(const ObjectID outer_id, const ObjectID &inner_id) { - rc_.AddOwnedObject(outer_id, {inner_id}, address_, "", 0, false); - rc_.AddLocalReference(outer_id, ""); + rc_.AddOwnedObject(outer_id, {inner_id}, address_, "", 0, false, + /*add_local_ref=*/true); } void GetSerializedObjectId(const ObjectID outer_id, const ObjectID &inner_id, @@ -370,9 +369,7 @@ class MockWorkerClient : public MockCoreWorkerClientInterface { if (!arg_id.IsNil()) { rc_.UpdateSubmittedTaskReferences({return_id}, {arg_id}); } - rc_.AddOwnedObject(return_id, {}, address_, "", 0, false); - // Add a sentinel reference to keep all nested object IDs in scope. - rc_.AddLocalReference(return_id, ""); + rc_.AddOwnedObject(return_id, {}, address_, "", 0, false, /*add_local_ref=*/true); return_ids_.push_back(return_id); return return_id; } @@ -536,11 +533,9 @@ TEST_F(ReferenceCountTest, TestUnreconstructableObjectOutOfScope) { // The object goes out of scope once it has no more refs. std::vector out; ASSERT_FALSE(rc->SetDeleteCallback(id, callback)); - rc->AddOwnedObject(id, {}, address, "", 0, false); + rc->AddOwnedObject(id, {}, address, "", 0, false, /*add_local_ref=*/true); ASSERT_TRUE(rc->SetDeleteCallback(id, callback)); ASSERT_FALSE(*out_of_scope); - rc->AddLocalReference(id, ""); - ASSERT_FALSE(*out_of_scope); rc->RemoveLocalReference(id, &out); ASSERT_TRUE(*out_of_scope); @@ -548,7 +543,7 @@ TEST_F(ReferenceCountTest, TestUnreconstructableObjectOutOfScope) { // lineage ref count. *out_of_scope = false; ASSERT_FALSE(rc->SetDeleteCallback(id, callback)); - rc->AddOwnedObject(id, {}, address, "", 0, false); + rc->AddOwnedObject(id, {}, address, "", 0, false, /*add_local_ref=*/false); ASSERT_TRUE(rc->SetDeleteCallback(id, callback)); rc->UpdateSubmittedTaskReferences({}, {id}); ASSERT_FALSE(*out_of_scope); @@ -575,16 +570,14 @@ TEST_F(ReferenceCountTest, TestReferenceStats) { ASSERT_EQ(stats.object_refs(0).call_site(), "file.py:42"); rc->RemoveLocalReference(id1, nullptr); - rc->AddOwnedObject(id2, {}, address, "file2.py:43", 100, false); + rc->AddOwnedObject(id2, {}, address, "file2.py:43", 100, false, /*add_local_ref=*/true); rpc::CoreWorkerStats stats2; rc->AddObjectRefStats({}, &stats2); ASSERT_EQ(stats2.object_refs_size(), 1); ASSERT_EQ(stats2.object_refs(0).object_id(), id2.Binary()); - ASSERT_EQ(stats2.object_refs(0).local_ref_count(), 0); + ASSERT_EQ(stats2.object_refs(0).local_ref_count(), 1); ASSERT_EQ(stats2.object_refs(0).object_size(), 100); ASSERT_EQ(stats2.object_refs(0).call_site(), "file2.py:43"); - - rc->AddLocalReference(id2, ""); rc->RemoveLocalReference(id2, nullptr); } @@ -601,7 +594,7 @@ TEST_F(ReferenceCountTest, TestGetLocalityData) { // locality data. int64_t object_size = 100; rc->AddOwnedObject(obj1, {}, address, "file2.py:42", object_size, false, - absl::optional(node1)); + /*add_local_ref=*/true, absl::optional(node1)); auto locality_data_obj1 = rc->GetLocalityData(obj1); ASSERT_TRUE(locality_data_obj1.has_value()); ASSERT_EQ(locality_data_obj1->object_size, object_size); @@ -663,13 +656,11 @@ TEST_F(ReferenceCountTest, TestGetLocalityData) { // Fetching locality data for an object that doesn't have an object size defined // should return a null optional. rc->AddOwnedObject(obj2, {}, address, "file2.py:43", -1, false, - absl::optional(node2)); + /*add_local_ref=*/true, absl::optional(node2)); auto locality_data_obj2_no_object_size = rc->GetLocalityData(obj2); ASSERT_FALSE(locality_data_obj2_no_object_size.has_value()); - rc->AddLocalReference(obj1, ""); rc->RemoveLocalReference(obj1, nullptr); - rc->AddLocalReference(obj2, ""); rc->RemoveLocalReference(obj2, nullptr); } @@ -680,7 +671,7 @@ TEST_F(ReferenceCountTest, TestOwnerAddress) { auto object_id = ObjectID::FromRandom(); rpc::Address address; address.set_ip_address("1234"); - rc->AddOwnedObject(object_id, {}, address, "", 0, false); + rc->AddOwnedObject(object_id, {}, address, "", 0, false, /*add_local_ref=*/true); TaskID added_id; rpc::Address added_address; @@ -689,7 +680,7 @@ TEST_F(ReferenceCountTest, TestOwnerAddress) { auto object_id2 = ObjectID::FromRandom(); address.set_ip_address("5678"); - rc->AddOwnedObject(object_id2, {}, address, "", 0, false); + rc->AddOwnedObject(object_id2, {}, address, "", 0, false, /*add_local_ref=*/true); ASSERT_TRUE(rc->GetOwner(object_id2, &added_address)); ASSERT_EQ(address.ip_address(), added_address.ip_address()); @@ -698,9 +689,7 @@ TEST_F(ReferenceCountTest, TestOwnerAddress) { rc->AddLocalReference(object_id3, ""); ASSERT_FALSE(rc->GetOwner(object_id3, &added_address)); - rc->AddLocalReference(object_id, ""); rc->RemoveLocalReference(object_id, nullptr); - rc->AddLocalReference(object_id2, ""); rc->RemoveLocalReference(object_id2, nullptr); rc->RemoveLocalReference(object_id3, nullptr); } @@ -1669,7 +1658,8 @@ TEST(DistributedReferenceCountTest, TestForeignOwner) { // Phase 3 -- foreign owner gets ref removed information. // // Emulate ref removed callback. - foreign_owner->rc_.AddOwnedObject(inner_id, {}, foreign_owner->address_, "", 0, false); + foreign_owner->rc_.AddOwnedObject(inner_id, {}, foreign_owner->address_, "", 0, false, + /*add_local_ref=*/false); foreign_owner->rc_.AddBorrowerAddress(inner_id, owner->address_); // Foreign owner waits on owner. @@ -2317,10 +2307,9 @@ TEST_F(ReferenceCountLineageEnabledTest, TestUnreconstructableObjectOutOfScope) // The object goes out of scope once it has no more refs. std::vector out; ASSERT_FALSE(rc->SetDeleteCallback(id, callback)); - rc->AddOwnedObject(id, {}, address, "", 0, false); + rc->AddOwnedObject(id, {}, address, "", 0, false, /*add_local_ref=*/true); ASSERT_TRUE(rc->SetDeleteCallback(id, callback)); ASSERT_FALSE(*out_of_scope); - rc->AddLocalReference(id, ""); ASSERT_FALSE(*out_of_scope); rc->RemoveLocalReference(id, &out); ASSERT_TRUE(*out_of_scope); @@ -2331,7 +2320,7 @@ TEST_F(ReferenceCountLineageEnabledTest, TestUnreconstructableObjectOutOfScope) // count. *out_of_scope = false; ASSERT_FALSE(rc->SetDeleteCallback(id, callback)); - rc->AddOwnedObject(id, {}, address, "", 0, false); + rc->AddOwnedObject(id, {}, address, "", 0, false, /*add_local_ref=*/false); ASSERT_TRUE(rc->SetDeleteCallback(id, callback)); rc->UpdateSubmittedTaskReferences({return_id}, {id}); ASSERT_TRUE(rc->IsObjectPendingCreation(return_id)); @@ -2371,8 +2360,7 @@ TEST_F(ReferenceCountLineageEnabledTest, TestBasicLineage) { ASSERT_TRUE(lineage_deleted.empty()); // We should keep lineage for owned objects. - rc->AddOwnedObject(id, {}, rpc::Address(), "", 0, false); - rc->AddLocalReference(id, ""); + rc->AddOwnedObject(id, {}, rpc::Address(), "", 0, false, /*add_local_ref=*/true); ASSERT_TRUE(rc->HasReference(id)); rc->RemoveLocalReference(id, nullptr); ASSERT_EQ(lineage_deleted.size(), 1); @@ -2390,7 +2378,7 @@ TEST_F(ReferenceCountLineageEnabledTest, TestPinLineageRecursive) { for (int i = 0; i < 3; i++) { ObjectID id = ObjectID::FromRandom(); ids.push_back(id); - rc->AddOwnedObject(id, {}, rpc::Address(), "", 0, true); + rc->AddOwnedObject(id, {}, rpc::Address(), "", 0, true, /*add_local_ref=*/false); } rc->SetReleaseLineageCallback( @@ -2413,7 +2401,6 @@ TEST_F(ReferenceCountLineageEnabledTest, TestPinLineageRecursive) { for (size_t i = 0; i < ids.size() - 1; i++) { auto id = ids[i]; // Submit a dependent task on id. - rc->AddLocalReference(id, ""); ASSERT_TRUE(rc->HasReference(id)); rc->UpdateSubmittedTaskReferences({}, {id}); rc->RemoveLocalReference(id, nullptr); @@ -2445,7 +2432,7 @@ TEST_F(ReferenceCountLineageEnabledTest, TestEvictLineage) { for (int i = 0; i < 3; i++) { ObjectID id = ObjectID::FromRandom(); ids.push_back(id); - rc->AddOwnedObject(id, {}, rpc::Address(), "", 0, true); + rc->AddOwnedObject(id, {}, rpc::Address(), "", 0, true, /*add_local_ref=*/true); } std::vector lineage_deleted; rc->SetReleaseLineageCallback( @@ -2461,10 +2448,9 @@ TEST_F(ReferenceCountLineageEnabledTest, TestEvictLineage) { // ID1 depends on ID0. rc->UpdateSubmittedTaskReferences({ids[1]}, {ids[0]}); + rc->RemoveLocalReference(ids[0], nullptr); rc->UpdateFinishedTaskReferences({ids[1]}, {ids[0]}, /*release_lineage=*/false, empty_borrower, empty_refs, nullptr); - rc->AddLocalReference(ids[1], ""); - rc->AddLocalReference(ids[2], ""); bool lineage_evicted = false; for (const auto &id : ids) { @@ -2492,7 +2478,7 @@ TEST_F(ReferenceCountLineageEnabledTest, TestResubmittedTask) { std::vector lineage_deleted; ObjectID id = ObjectID::FromRandom(); - rc->AddOwnedObject(id, {}, rpc::Address(), "", 0, true); + rc->AddOwnedObject(id, {}, rpc::Address(), "", 0, true, /*add_local_ref=*/true); rc->SetReleaseLineageCallback( [&](const ObjectID &object_id, std::vector *ids_to_release) { @@ -2501,7 +2487,6 @@ TEST_F(ReferenceCountLineageEnabledTest, TestResubmittedTask) { }); // Local references. - rc->AddLocalReference(id, ""); ASSERT_TRUE(rc->HasReference(id)); // Submit 2 dependent tasks. @@ -2540,8 +2525,7 @@ TEST_F(ReferenceCountLineageEnabledTest, TestPlasmaLocation) { ObjectID id = ObjectID::FromRandom(); NodeID node_id = NodeID::FromRandom(); - rc->AddOwnedObject(id, {}, rpc::Address(), "", 0, true); - rc->AddLocalReference(id, ""); + rc->AddOwnedObject(id, {}, rpc::Address(), "", 0, true, /*add_local_ref=*/true); ASSERT_TRUE(rc->SetDeleteCallback(id, callback)); ASSERT_TRUE(rc->IsPlasmaObjectPinnedOrSpilled(id, &owned_by_us, &pinned_at, &spilled)); ASSERT_TRUE(owned_by_us); @@ -2556,8 +2540,7 @@ TEST_F(ReferenceCountLineageEnabledTest, TestPlasmaLocation) { ASSERT_TRUE(deleted->count(id) > 0); deleted->clear(); - rc->AddOwnedObject(id, {}, rpc::Address(), "", 0, true); - rc->AddLocalReference(id, ""); + rc->AddOwnedObject(id, {}, rpc::Address(), "", 0, true, /*add_local_ref=*/true); ASSERT_TRUE(rc->SetDeleteCallback(id, callback)); rc->UpdateObjectPinnedAtRaylet(id, node_id); auto objects = rc->ResetObjectsOnRemovedNode(node_id); @@ -2578,9 +2561,8 @@ TEST_F(ReferenceCountTest, TestFree) { NodeID node_id = NodeID::FromRandom(); // Test free before receiving information about where the object is pinned. - rc->AddOwnedObject(id, {}, rpc::Address(), "", 0, true); + rc->AddOwnedObject(id, {}, rpc::Address(), "", 0, true, /*add_local_ref=*/true); ASSERT_FALSE(rc->IsPlasmaObjectFreed(id)); - rc->AddLocalReference(id, ""); rc->FreePlasmaObjects({id}); ASSERT_TRUE(rc->IsPlasmaObjectFreed(id)); ASSERT_FALSE(rc->SetDeleteCallback(id, callback)); @@ -2597,8 +2579,7 @@ TEST_F(ReferenceCountTest, TestFree) { ASSERT_FALSE(rc->IsPlasmaObjectFreed(id)); // Test free after receiving information about where the object is pinned. - rc->AddOwnedObject(id, {}, rpc::Address(), "", 0, true); - rc->AddLocalReference(id, ""); + rc->AddOwnedObject(id, {}, rpc::Address(), "", 0, true, /*add_local_ref=*/true); ASSERT_TRUE(rc->SetDeleteCallback(id, callback)); rc->UpdateObjectPinnedAtRaylet(id, node_id); ASSERT_FALSE(rc->IsPlasmaObjectFreed(id)); @@ -2612,16 +2593,6 @@ TEST_F(ReferenceCountTest, TestFree) { ASSERT_FALSE(rc->IsPlasmaObjectFreed(id)); } -TEST_F(ReferenceCountTest, TestRemoveOwnedObject) { - ObjectID id = ObjectID::FromRandom(); - - // Test remove owned object. - rc->AddOwnedObject(id, {}, rpc::Address(), "", 0, false); - ASSERT_TRUE(rc->HasReference(id)); - rc->RemoveOwnedObject(id); - ASSERT_FALSE(rc->HasReference(id)); -} - TEST_F(ReferenceCountTest, TestGetObjectStatusReplyDelayed) { // https://github.com/ray-project/ray/issues/18557. // Check that we track an ObjectRef nested inside another borrowed ObjectRef. @@ -2668,10 +2639,11 @@ TEST_F(ReferenceCountTest, TestDelayedWaitForRefRemoved) { // Owner owns a nested object ref, borrower is using the outer ObjectRef. ObjectID outer_id = ObjectID::FromRandom(); ObjectID inner_id = ObjectID::FromRandom(); - owner->rc_.AddOwnedObject(outer_id, {}, owner->address_, "", 0, false); + owner->rc_.AddOwnedObject(outer_id, {}, owner->address_, "", 0, false, + /*add_local_ref=*/false); owner->rc_.AddBorrowerAddress(outer_id, borrower->address_); - owner->rc_.AddOwnedObject(inner_id, {}, owner->address_, "", 0, false); - owner->rc_.AddLocalReference(inner_id, ""); + owner->rc_.AddOwnedObject(inner_id, {}, owner->address_, "", 0, false, + /*add_local_ref=*/true); ASSERT_TRUE(owner->rc_.HasReference(outer_id)); ASSERT_TRUE(owner->rc_.HasReference(inner_id)); @@ -2710,9 +2682,12 @@ TEST_F(ReferenceCountTest, TestRepeatedDeserialization) { ObjectID outer_id = ObjectID::FromRandom(); ObjectID middle_id = ObjectID::FromRandom(); ObjectID inner_id = ObjectID::FromRandom(); - owner->rc_.AddOwnedObject(inner_id, {}, owner->address_, "", 0, false); - owner->rc_.AddOwnedObject(middle_id, {inner_id}, owner->address_, "", 0, false); - owner->rc_.AddOwnedObject(outer_id, {middle_id}, owner->address_, "", 0, false); + owner->rc_.AddOwnedObject(inner_id, {}, owner->address_, "", 0, false, + /*add_local_ref=*/false); + owner->rc_.AddOwnedObject(middle_id, {inner_id}, owner->address_, "", 0, false, + /*add_local_ref=*/false); + owner->rc_.AddOwnedObject(outer_id, {middle_id}, owner->address_, "", 0, false, + /*add_local_ref=*/false); owner->rc_.AddBorrowerAddress(outer_id, borrower->address_); ASSERT_TRUE(owner->rc_.HasReference(outer_id)); ASSERT_TRUE(owner->rc_.HasReference(middle_id)); @@ -2758,9 +2733,12 @@ TEST_F(ReferenceCountTest, TestForwardNestedRefs) { ObjectID outer_id = ObjectID::FromRandom(); ObjectID middle_id = ObjectID::FromRandom(); ObjectID inner_id = ObjectID::FromRandom(); - owner->rc_.AddOwnedObject(inner_id, {}, owner->address_, "", 0, false); - owner->rc_.AddOwnedObject(middle_id, {inner_id}, owner->address_, "", 0, false); - owner->rc_.AddOwnedObject(outer_id, {middle_id}, owner->address_, "", 0, false); + owner->rc_.AddOwnedObject(inner_id, {}, owner->address_, "", 0, false, + /*add_local_ref=*/false); + owner->rc_.AddOwnedObject(middle_id, {inner_id}, owner->address_, "", 0, false, + /*add_local_ref=*/false); + owner->rc_.AddOwnedObject(outer_id, {middle_id}, owner->address_, "", 0, false, + /*add_local_ref=*/false); owner->rc_.AddBorrowerAddress(outer_id, borrower1->address_); ASSERT_TRUE(owner->rc_.HasReference(outer_id)); ASSERT_TRUE(owner->rc_.HasReference(middle_id)); diff --git a/src/ray/core_worker/task_manager.cc b/src/ray/core_worker/task_manager.cc index bf110ccd9..f8e55e4f5 100644 --- a/src/ray/core_worker/task_manager.cc +++ b/src/ray/core_worker/task_manager.cc @@ -70,9 +70,14 @@ std::vector TaskManager::AddPendingTask( // publish the WaitForRefRemoved message that we are now a borrower for // the inner IDs. Note that this message can be received *before* the // PushTaskReply. + // NOTE(swang): We increment the local ref count to ensure that the + // object is considered in scope before we return the ObjectRef to the + // language frontend. Note that the language bindings should set + // skip_adding_local_ref=True to avoid double referencing the object. reference_counter_->AddOwnedObject(return_id, /*inner_ids=*/{}, caller_address, call_site, -1, - /*is_reconstructable=*/is_reconstructable); + /*is_reconstructable=*/is_reconstructable, + /*add_local_ref=*/true); } return_ids.push_back(return_id); diff --git a/src/ray/core_worker/task_manager.h b/src/ray/core_worker/task_manager.h index 6ccdbda83..4a444418d 100644 --- a/src/ray/core_worker/task_manager.h +++ b/src/ray/core_worker/task_manager.h @@ -99,6 +99,11 @@ class TaskManager : public TaskFinisherInterface, public TaskResubmissionInterfa /// Add a task that is pending execution. /// + /// The local ref count for all return refs (excluding actor creation tasks) + /// will be initialized to 1 so that the ref is considered in scope before + /// returning to the language frontend. The caller is responsible for + /// decrementing the ref count once the frontend ref has gone out of scope. + /// /// \param[in] caller_address The rpc address of the calling task. /// \param[in] spec The spec of the pending task. /// \param[in] max_retries Number of times this task may be retried diff --git a/src/ray/core_worker/test/actor_manager_test.cc b/src/ray/core_worker/test/actor_manager_test.cc index 1a8fc0fe1..189783292 100644 --- a/src/ray/core_worker/test/actor_manager_test.cc +++ b/src/ray/core_worker/test/actor_manager_test.cc @@ -103,10 +103,11 @@ class MockReferenceCounter : public ReferenceCounterInterface { const rpc::Address &owner_address, bool foreign_owner_already_monitoring)); - MOCK_METHOD7(AddOwnedObject, + MOCK_METHOD8(AddOwnedObject, void(const ObjectID &object_id, const std::vector &contained_ids, const rpc::Address &owner_address, const std::string &call_site, const int64_t object_size, bool is_reconstructable, + bool add_local_ref, const absl::optional &pinned_at_raylet_id)); MOCK_METHOD2(SetDeleteCallback, diff --git a/src/ray/core_worker/test/object_recovery_manager_test.cc b/src/ray/core_worker/test/object_recovery_manager_test.cc index e1d9317f7..f3272b5e0 100644 --- a/src/ray/core_worker/test/object_recovery_manager_test.cc +++ b/src/ray/core_worker/test/object_recovery_manager_test.cc @@ -170,7 +170,8 @@ class ObjectRecoveryManagerTest : public ObjectRecoveryManagerTestBase { TEST_F(ObjectRecoveryLineageDisabledTest, TestNoReconstruction) { // Lineage recording disabled. ObjectID object_id = ObjectID::FromRandom(); - ref_counter_->AddOwnedObject(object_id, {}, rpc::Address(), "", 0, true); + ref_counter_->AddOwnedObject(object_id, {}, rpc::Address(), "", 0, true, + /*add_local_ref=*/true); ASSERT_TRUE(manager_.RecoverObject(object_id)); ASSERT_TRUE(failed_reconstructions_.empty()); ASSERT_TRUE(object_directory_->Flush() == 1); @@ -192,7 +193,8 @@ TEST_F(ObjectRecoveryLineageDisabledTest, TestNoReconstruction) { TEST_F(ObjectRecoveryLineageDisabledTest, TestPinNewCopy) { ObjectID object_id = ObjectID::FromRandom(); - ref_counter_->AddOwnedObject(object_id, {}, rpc::Address(), "", 0, true); + ref_counter_->AddOwnedObject(object_id, {}, rpc::Address(), "", 0, true, + /*add_local_ref=*/true); std::vector addresses({rpc::Address()}); object_directory_->SetLocations(object_id, addresses); @@ -205,7 +207,8 @@ TEST_F(ObjectRecoveryLineageDisabledTest, TestPinNewCopy) { TEST_F(ObjectRecoveryManagerTest, TestPinNewCopy) { ObjectID object_id = ObjectID::FromRandom(); - ref_counter_->AddOwnedObject(object_id, {}, rpc::Address(), "", 0, true); + ref_counter_->AddOwnedObject(object_id, {}, rpc::Address(), "", 0, true, + /*add_local_ref=*/true); std::vector addresses({rpc::Address()}); object_directory_->SetLocations(object_id, addresses); @@ -218,7 +221,8 @@ TEST_F(ObjectRecoveryManagerTest, TestPinNewCopy) { TEST_F(ObjectRecoveryManagerTest, TestReconstruction) { ObjectID object_id = ObjectID::FromRandom(); - ref_counter_->AddOwnedObject(object_id, {}, rpc::Address(), "", 0, true); + ref_counter_->AddOwnedObject(object_id, {}, rpc::Address(), "", 0, true, + /*add_local_ref=*/true); task_resubmitter_->AddTask(object_id.TaskId(), {}); ASSERT_TRUE(manager_.RecoverObject(object_id)); @@ -230,7 +234,8 @@ TEST_F(ObjectRecoveryManagerTest, TestReconstruction) { TEST_F(ObjectRecoveryManagerTest, TestReconstructionSuppression) { ObjectID object_id = ObjectID::FromRandom(); - ref_counter_->AddOwnedObject(object_id, {}, rpc::Address(), "", 0, true); + ref_counter_->AddOwnedObject(object_id, {}, rpc::Address(), "", 0, true, + /*add_local_ref=*/true); ref_counter_->AddLocalReference(object_id, ""); ASSERT_TRUE(manager_.RecoverObject(object_id)); @@ -265,7 +270,8 @@ TEST_F(ObjectRecoveryManagerTest, TestReconstructionChain) { std::vector dependencies; for (int i = 0; i < 3; i++) { ObjectID object_id = ObjectID::FromRandom(); - ref_counter_->AddOwnedObject(object_id, {}, rpc::Address(), "", 0, true); + ref_counter_->AddOwnedObject(object_id, {}, rpc::Address(), "", 0, true, + /*add_local_ref=*/true); task_resubmitter_->AddTask(object_id.TaskId(), dependencies); dependencies = {object_id}; object_ids.push_back(object_id); @@ -282,7 +288,8 @@ TEST_F(ObjectRecoveryManagerTest, TestReconstructionChain) { TEST_F(ObjectRecoveryManagerTest, TestReconstructionFails) { ObjectID object_id = ObjectID::FromRandom(); - ref_counter_->AddOwnedObject(object_id, {}, rpc::Address(), "", 0, true); + ref_counter_->AddOwnedObject(object_id, {}, rpc::Address(), "", 0, true, + /*add_local_ref=*/true); ASSERT_TRUE(manager_.RecoverObject(object_id)); ASSERT_TRUE(object_directory_->Flush() == 1); @@ -294,10 +301,12 @@ TEST_F(ObjectRecoveryManagerTest, TestReconstructionFails) { TEST_F(ObjectRecoveryManagerTest, TestDependencyReconstructionFails) { ObjectID dep_id = ObjectID::FromRandom(); - ref_counter_->AddOwnedObject(dep_id, {}, rpc::Address(), "", 0, true); + ref_counter_->AddOwnedObject(dep_id, {}, rpc::Address(), "", 0, true, + /*add_local_ref=*/true); ObjectID object_id = ObjectID::FromRandom(); - ref_counter_->AddOwnedObject(object_id, {}, rpc::Address(), "", 0, true); + ref_counter_->AddOwnedObject(object_id, {}, rpc::Address(), "", 0, true, + /*add_local_ref=*/true); task_resubmitter_->AddTask(object_id.TaskId(), {dep_id}); RAY_LOG(INFO) << object_id; @@ -313,7 +322,8 @@ TEST_F(ObjectRecoveryManagerTest, TestDependencyReconstructionFails) { TEST_F(ObjectRecoveryManagerTest, TestLineageEvicted) { ObjectID object_id = ObjectID::FromRandom(); - ref_counter_->AddOwnedObject(object_id, {}, rpc::Address(), "", 0, true); + ref_counter_->AddOwnedObject(object_id, {}, rpc::Address(), "", 0, true, + /*add_local_ref=*/true); ref_counter_->AddLocalReference(object_id, ""); ref_counter_->EvictLineage(1); diff --git a/src/ray/core_worker/test/task_manager_test.cc b/src/ray/core_worker/test/task_manager_test.cc index ec4ebad4a..8b054a5a0 100644 --- a/src/ray/core_worker/test/task_manager_test.cc +++ b/src/ray/core_worker/test/task_manager_test.cc @@ -124,7 +124,6 @@ TEST_F(TaskManagerTest, TestTaskSuccess) { ASSERT_EQ(num_retries_, 0); std::vector removed; - reference_counter_->AddLocalReference(return_id, ""); reference_counter_->RemoveLocalReference(return_id, &removed); ASSERT_EQ(removed[0], return_id); ASSERT_EQ(reference_counter_->NumObjectIDsInScope(), 0); @@ -160,7 +159,6 @@ TEST_F(TaskManagerTest, TestTaskFailure) { ASSERT_EQ(num_retries_, 0); std::vector removed; - reference_counter_->AddLocalReference(return_id, ""); reference_counter_->RemoveLocalReference(return_id, &removed); ASSERT_EQ(removed[0], return_id); ASSERT_EQ(reference_counter_->NumObjectIDsInScope(), 0); @@ -221,7 +219,6 @@ TEST_F(TaskManagerTest, TestFailPendingTask) { ASSERT_EQ(stored_error, rpc::ErrorType::LOCAL_RAYLET_DIED); std::vector removed; - reference_counter_->AddLocalReference(return_id, ""); reference_counter_->RemoveLocalReference(return_id, &removed); ASSERT_EQ(removed[0], return_id); ASSERT_EQ(reference_counter_->NumObjectIDsInScope(), 0); @@ -268,7 +265,6 @@ TEST_F(TaskManagerTest, TestTaskReconstruction) { ASSERT_EQ(stored_error, error); std::vector removed; - reference_counter_->AddLocalReference(return_id, ""); reference_counter_->RemoveLocalReference(return_id, &removed); ASSERT_EQ(removed[0], return_id); ASSERT_EQ(reference_counter_->NumObjectIDsInScope(), 0); @@ -326,7 +322,6 @@ TEST_F(TaskManagerTest, TestLineageEvicted) { // Once the return ID goes out of scope, the task spec and its dependencies // are released. - reference_counter_->AddLocalReference(return_id, ""); reference_counter_->RemoveLocalReference(return_id, nullptr); ASSERT_FALSE(manager_.IsTaskSubmissible(spec.TaskId())); ASSERT_FALSE(reference_counter_->HasReference(return_id)); @@ -345,7 +340,6 @@ TEST_F(TaskManagerLineageTest, TestLineagePinned) { int num_retries = 3; manager_.AddPendingTask(caller_address, spec, "", num_retries); auto return_id = spec.ReturnId(0); - reference_counter_->AddLocalReference(return_id, ""); ASSERT_TRUE(manager_.IsTaskPending(spec.TaskId())); ASSERT_EQ(reference_counter_->NumObjectIDsInScope(), 3); @@ -384,7 +378,6 @@ TEST_F(TaskManagerLineageTest, TestDirectObjectNoLineage) { int num_retries = 3; manager_.AddPendingTask(caller_address, spec, "", num_retries); auto return_id = spec.ReturnId(0); - reference_counter_->AddLocalReference(return_id, ""); ASSERT_TRUE(manager_.IsTaskPending(spec.TaskId())); ASSERT_EQ(reference_counter_->NumObjectIDsInScope(), 3); @@ -418,7 +411,6 @@ TEST_F(TaskManagerLineageTest, TestLineagePinnedOutOfOrder) { int num_retries = 3; manager_.AddPendingTask(caller_address, spec, "", num_retries); auto return_id = spec.ReturnId(0); - reference_counter_->AddLocalReference(return_id, ""); ASSERT_TRUE(manager_.IsTaskPending(spec.TaskId())); ASSERT_EQ(reference_counter_->NumObjectIDsInScope(), 3); @@ -452,13 +444,11 @@ TEST_F(TaskManagerLineageTest, TestRecursiveLineagePinned) { rpc::Address caller_address; ObjectID dep = ObjectID::FromRandom(); - reference_counter_->AddLocalReference(dep, ""); for (int i = 0; i < 3; i++) { auto spec = CreateTaskHelper(1, {dep}); int num_retries = 3; manager_.AddPendingTask(caller_address, spec, "", num_retries); auto return_id = spec.ReturnId(0); - reference_counter_->AddLocalReference(return_id, ""); // The task completes. rpc::PushTaskReply reply; @@ -499,7 +489,7 @@ TEST_F(TaskManagerLineageTest, TestRecursiveDirectObjectNoLineage) { int num_retries = 3; manager_.AddPendingTask(caller_address, spec, "", num_retries); auto return_id = spec.ReturnId(0); - reference_counter_->AddLocalReference(return_id, ""); + reference_counter_->RemoveLocalReference(dep, nullptr); // The task completes. rpc::PushTaskReply reply; @@ -512,15 +502,13 @@ TEST_F(TaskManagerLineageTest, TestRecursiveDirectObjectNoLineage) { // No tasks should be pinned because they returned direct objects. ASSERT_EQ(manager_.NumSubmissibleTasks(), 0); - // Only the dependency and the newest return ID should be in scope because - // all objects in the lineage were direct. - ASSERT_EQ(reference_counter_->NumObjectIDsInScope(), 2); + // Only the newest return ID should be in scope because all objects in the + // lineage were direct. + ASSERT_EQ(reference_counter_->NumObjectIDsInScope(), 1); - reference_counter_->RemoveLocalReference(dep, nullptr); dep = return_id; } - // The task's return ID goes out of scope before the task finishes. reference_counter_->RemoveLocalReference(dep, nullptr); ASSERT_EQ(manager_.NumSubmissibleTasks(), 0); ASSERT_EQ(reference_counter_->NumObjectIDsInScope(), 0); @@ -552,7 +540,6 @@ TEST_F(TaskManagerLineageTest, TestResubmitTask) { ASSERT_TRUE(reference_counter_->IsObjectPendingCreation(return_id)); // The task completes. - reference_counter_->AddLocalReference(return_id, ""); rpc::PushTaskReply reply; auto return_object = reply.add_return_objects(); return_object->set_object_id(return_id.Binary()); @@ -604,8 +591,6 @@ TEST_F(TaskManagerLineageTest, TestResubmittedTaskNondeterministicReturns) { // The task completes. Both return objects are stored in plasma. { - reference_counter_->AddLocalReference(return_id1, ""); - reference_counter_->AddLocalReference(return_id2, ""); rpc::PushTaskReply reply; auto return_object1 = reply.add_return_objects(); return_object1->set_object_id(return_id1.Binary());