diff --git a/cpp/src/ray/runtime/task/task_executor.cc b/cpp/src/ray/runtime/task/task_executor.cc index 7d270bb17..b0ee726c6 100644 --- a/cpp/src/ray/runtime/task/task_executor.cc +++ b/cpp/src/ray/runtime/task/task_executor.cc @@ -198,7 +198,7 @@ Status TaskExecutor::ExecuteTask( int64_t task_output_inlined_bytes = 0; RAY_CHECK_OK(CoreWorkerProcess::GetCoreWorker().AllocateReturnObject( result_id, data_size, meta_buffer, std::vector(), - task_output_inlined_bytes, result_ptr)); + &task_output_inlined_bytes, result_ptr)); auto result = *result_ptr; if (result != nullptr) { diff --git a/python/ray/_raylet.pxd b/python/ray/_raylet.pxd index 95f0aa398..dd36e886b 100644 --- a/python/ray/_raylet.pxd +++ b/python/ray/_raylet.pxd @@ -5,6 +5,9 @@ from cpython.pystate cimport PyThreadState_Get +from libc.stdint cimport ( + int64_t, +) from libcpp cimport bool as c_bool from libcpp.string cimport string as c_string from libcpp.vector cimport vector as c_vector @@ -132,6 +135,11 @@ cdef class CoreWorker: owner_address=*, c_bool inline_small_object=*) cdef unique_ptr[CAddress] _convert_python_address(self, address=*) + cdef store_task_output( + self, serialized_object, const CObjectID &return_id, size_t + data_size, shared_ptr[CBuffer] &metadata, const c_vector[CObjectID] + &contained_id, int64_t *task_output_inlined_bytes, + shared_ptr[CRayObject] *return_ptr) cdef store_task_outputs( self, worker, outputs, const c_vector[CObjectID] return_ids, c_vector[shared_ptr[CRayObject]] *returns) diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index 8be6096ec..b53a1fee2 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -1869,6 +1869,41 @@ cdef class CoreWorker: c_owner_address, serialized_object_status)) + cdef store_task_output(self, serialized_object, const CObjectID &return_id, + size_t data_size, shared_ptr[CBuffer] &metadata, + const c_vector[CObjectID] &contained_id, + int64_t *task_output_inlined_bytes, + shared_ptr[CRayObject] *return_ptr): + """Store a task return value in plasma or as an inlined object.""" + with nogil: + check_status( + CCoreWorkerProcess.GetCoreWorker().AllocateReturnObject( + return_id, data_size, metadata, contained_id, + task_output_inlined_bytes, return_ptr)) + + if return_ptr.get() != NULL: + if return_ptr.get().HasData(): + (serialized_object).write_to( + Buffer.make(return_ptr.get().GetData())) + if self.is_local_mode: + check_status( + CCoreWorkerProcess.GetCoreWorker().Put( + CRayObject(return_ptr.get().GetData(), + return_ptr.get().GetMetadata(), + c_vector[CObjectReference]()), + c_vector[CObjectID](), return_id)) + else: + with nogil: + check_status( + CCoreWorkerProcess.GetCoreWorker().SealReturnObject( + return_id, return_ptr[0])) + return True + else: + with nogil: + success = (CCoreWorkerProcess.GetCoreWorker() + .PinExistingReturnObject(return_id, return_ptr)) + return success + cdef store_task_outputs( self, worker, outputs, const c_vector[CObjectID] return_ids, c_vector[shared_ptr[CRayObject]] *returns): @@ -1877,7 +1912,6 @@ cdef class CoreWorker: size_t data_size shared_ptr[CBuffer] metadata c_vector[CObjectID] contained_id - c_vector[CObjectID] return_ids_vector int64_t task_output_inlined_bytes if return_ids.size() == 0: @@ -1904,30 +1938,17 @@ cdef class CoreWorker: contained_id = ObjectRefsToVector( serialized_object.contained_object_refs) - with nogil: - check_status( - CCoreWorkerProcess.GetCoreWorker().AllocateReturnObject( - return_id, data_size, metadata, contained_id, - task_output_inlined_bytes, &returns[0][i])) - - if 
returns[0][i].get() != NULL: - if returns[0][i].get().HasData(): - (serialized_object).write_to( - Buffer.make(returns[0][i].get().GetData())) - if self.is_local_mode: - return_ids_vector.push_back(return_ids[i]) - check_status( - CCoreWorkerProcess.GetCoreWorker().Put( - CRayObject(returns[0][i].get().GetData(), - returns[0][i].get().GetMetadata(), - c_vector[CObjectReference]()), - c_vector[CObjectID](), return_ids[i])) - return_ids_vector.clear() - - with nogil: - check_status( - CCoreWorkerProcess.GetCoreWorker().SealReturnObject( - return_id, returns[0][i])) + if not self.store_task_output( + serialized_object, return_id, + data_size, metadata, contained_id, + &task_output_inlined_bytes, &returns[0][i]): + # If the object already exists, but we fail to pin the copy, it + # means the existing copy might've gotten evicted. Try to + # create another copy. + self.store_task_output( + serialized_object, return_id, data_size, metadata, + contained_id, &task_output_inlined_bytes, + &returns[0][i]) cdef c_function_descriptors_to_python( self, diff --git a/python/ray/includes/libcoreworker.pxd b/python/ray/includes/libcoreworker.pxd index 10d24ead9..fc7cb5193 100644 --- a/python/ray/includes/libcoreworker.pxd +++ b/python/ray/includes/libcoreworker.pxd @@ -139,12 +139,16 @@ cdef extern from "ray/core_worker/core_worker.h" nogil: const size_t &data_size, const shared_ptr[CBuffer] &metadata, const c_vector[CObjectID] &contained_object_id, - int64_t &task_output_inlined_bytes, + int64_t *task_output_inlined_bytes, shared_ptr[CRayObject] *return_object) CRayStatus SealReturnObject( const CObjectID& return_id, shared_ptr[CRayObject] return_object ) + c_bool PinExistingReturnObject( + const CObjectID& return_id, + shared_ptr[CRayObject] *return_object + ) CJobID GetCurrentJobId() CTaskID GetCurrentTaskId() diff --git a/python/ray/tests/test_reconstruction.py b/python/ray/tests/test_reconstruction.py index 09e3798e4..2070bf07f 100644 --- a/python/ray/tests/test_reconstruction.py +++ b/python/ray/tests/test_reconstruction.py @@ -10,6 +10,7 @@ import ray from ray._private.test_utils import ( wait_for_condition, wait_for_pid_to_exit, + SignalActor, ) SIGKILL = signal.SIGKILL if sys.platform != "win32" else signal.SIGTERM @@ -778,6 +779,132 @@ def test_lineage_evicted(ray_start_cluster): assert "ObjectReconstructionFailedLineageEvictedError" in str(e) +@pytest.mark.parametrize("reconstruction_enabled", [False, True]) +def test_multiple_returns(ray_start_cluster, reconstruction_enabled): + config = { + "num_heartbeats_timeout": 10, + "raylet_heartbeat_period_milliseconds": 100, + "object_timeout_milliseconds": 200, + } + # Workaround to reset the config to the default value. + if not reconstruction_enabled: + config["lineage_pinning_enabled"] = False + + cluster = ray_start_cluster + # Head node with no resources. + cluster.add_node( + num_cpus=0, + _system_config=config, + enable_object_reconstruction=reconstruction_enabled) + ray.init(address=cluster.address) + # Node to place the initial object. 
+ node_to_kill = cluster.add_node(num_cpus=1, object_store_memory=10**8) + cluster.wait_for_nodes() + + @ray.remote(num_returns=2) + def two_large_objects(): + return (np.zeros(10**7, dtype=np.uint8), np.zeros( + 10**7, dtype=np.uint8)) + + @ray.remote + def dependent_task(x): + return + + obj1, obj2 = two_large_objects.remote() + ray.get(dependent_task.remote(obj1)) + cluster.add_node( + num_cpus=1, resources={"node": 1}, object_store_memory=10**8) + ray.get(dependent_task.options(resources={"node": 1}).remote(obj1)) + + cluster.remove_node(node_to_kill, allow_graceful=False) + wait_for_condition( + lambda: not all(node["Alive"] for node in ray.nodes()), timeout=10) + + if reconstruction_enabled: + ray.get(dependent_task.remote(obj1)) + ray.get(dependent_task.remote(obj2)) + else: + with pytest.raises(ray.exceptions.RayTaskError): + ray.get(dependent_task.remote(obj1)) + ray.get(dependent_task.remote(obj2)) + with pytest.raises(ray.exceptions.ObjectLostError): + ray.get(obj2) + + +@pytest.mark.parametrize("reconstruction_enabled", [False, True]) +def test_nested(ray_start_cluster, reconstruction_enabled): + config = { + "num_heartbeats_timeout": 10, + "raylet_heartbeat_period_milliseconds": 100, + "object_timeout_milliseconds": 200, + "fetch_fail_timeout_milliseconds": 10_000, + } + # Workaround to reset the config to the default value. + if not reconstruction_enabled: + config["lineage_pinning_enabled"] = False + + cluster = ray_start_cluster + # Head node with no resources. + cluster.add_node( + num_cpus=0, + _system_config=config, + enable_object_reconstruction=reconstruction_enabled) + ray.init(address=cluster.address) + done_signal = SignalActor.remote() + exit_signal = SignalActor.remote() + ray.get(done_signal.wait.remote(should_wait=False)) + ray.get(exit_signal.wait.remote(should_wait=False)) + + # Node to place the initial object. + node_to_kill = cluster.add_node(num_cpus=1, object_store_memory=10**8) + cluster.wait_for_nodes() + + @ray.remote + def dependent_task(x): + return + + @ray.remote + def large_object(): + return np.zeros(10**7, dtype=np.uint8) + + @ray.remote + def nested(done_signal, exit_signal): + ref = ray.put(np.zeros(10**7, dtype=np.uint8)) + # Flush object store. + for _ in range(20): + ray.put(np.zeros(10**7, dtype=np.uint8)) + dep = dependent_task.options(resources={"node": 1}).remote(ref) + ray.get(done_signal.send.remote(clear=True)) + ray.get(dep) + return ray.get(ref) + + ref = nested.remote(done_signal, exit_signal) + # Wait for task to get scheduled on the node to kill. + ray.get(done_signal.wait.remote()) + # Wait for ray.put object to get transferred to the other node. + cluster.add_node( + num_cpus=2, resources={"node": 10}, object_store_memory=10**8) + ray.get(dependent_task.remote(ref)) + + # Destroy the task's output. + cluster.remove_node(node_to_kill, allow_graceful=False) + wait_for_condition( + lambda: not all(node["Alive"] for node in ray.nodes()), timeout=10) + + if reconstruction_enabled: + # NOTE(swang): This is supposed to work because nested doesn't actually + # return any ObjectRefs. However, currently the ray.put in `nested` + # fails because the object already exists with a different owner. + # See https://github.com/ray-project/ray/issues/20713. 
+ try: + ray.get(ref, timeout=60) + except ray.exceptions.RayTaskError as e: + assert isinstance(e.cause, ray.exceptions.ObjectFetchTimedOutError) + else: + with pytest.raises(ray.exceptions.ObjectLostError): + ray.get(ref, timeout=60) + + if __name__ == "__main__": import pytest sys.exit(pytest.main(["-v", __file__])) diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index e100cd9fa..66a31c26a 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -927,13 +927,18 @@ Status CoreWorker::CreateOwned(const std::shared_ptr &metadata, /* owner_address = */ rpc_address_, data, created_by_worker); } - if (!status.ok() || !data) { + if (!status.ok()) { if (owned_by_us) { reference_counter_->RemoveOwnedObject(*object_id); } else { RemoveLocalReference(*object_id); } return status; + } else if (*data == nullptr) { + // Object already exists in plasma. Store the in-memory value so that the + // client will check the plasma store. + RAY_CHECK( + memory_store_->Put(RayObject(rpc::ErrorType::OBJECT_IN_PLASMA), *object_id)); } } return Status::OK(); @@ -2005,7 +2010,7 @@ Status CoreWorker::AllocateReturnObject(const ObjectID &object_id, const size_t &data_size, const std::shared_ptr &metadata, const std::vector &contained_object_ids, - int64_t &task_output_inlined_bytes, + int64_t *task_output_inlined_bytes, std::shared_ptr *return_object) { rpc::Address owner_address(options_.is_local_mode ? rpc::Address() @@ -2026,10 +2031,10 @@ Status CoreWorker::AllocateReturnObject(const ObjectID &object_id, if (options_.is_local_mode || (static_cast(data_size) < max_direct_call_object_size_ && // ensure we don't exceed the limit if we allocate this object inline. - (task_output_inlined_bytes + static_cast(data_size) <= + (*task_output_inlined_bytes + static_cast(data_size) <= RayConfig::instance().task_rpc_inlined_bytes_limit()))) { data_buffer = std::make_shared(data_size); - task_output_inlined_bytes += static_cast(data_size); + *task_output_inlined_bytes += static_cast(data_size); } else { RAY_RETURN_NOT_OK(CreateExisting(metadata, data_size, object_id, owner_address, &data_buffer, @@ -2193,14 +2198,12 @@ Status CoreWorker::ExecuteTask(const TaskSpecification &task_spec, Status CoreWorker::SealReturnObject(const ObjectID &return_id, std::shared_ptr return_object) { + RAY_LOG(DEBUG) << "Sealing return object " << return_id; Status status = Status::OK(); - if (!return_object) { - return status; - } + RAY_CHECK(return_object); + RAY_CHECK(!options_.is_local_mode); std::unique_ptr caller_address = - options_.is_local_mode ? nullptr - : std::make_unique( - worker_context_.GetCurrentTask()->CallerAddress()); + std::make_unique(worker_context_.GetCurrentTask()->CallerAddress()); if (return_object->GetData() != nullptr && return_object->GetData()->IsPlasmaBuffer()) { status = SealExisting(return_id, /*pin_object=*/true, std::move(caller_address)); if (!status.ok()) { @@ -2211,6 +2214,56 @@ Status CoreWorker::SealReturnObject(const ObjectID &return_id, return status; } +bool CoreWorker::PinExistingReturnObject(const ObjectID &return_id, + std::shared_ptr *return_object) { + // TODO(swang): If there is already an existing copy of this object, then it + // might not have the same value as the new copy. It would be better to evict + // the existing copy here. + absl::flat_hash_map> result_map; + bool got_exception; + rpc::Address owner_address(worker_context_.GetCurrentTask()->CallerAddress()); + + // Temporarily set the return object's owner's address. 
This is needed to retrieve the + // value from plasma. + reference_counter_->AddLocalReference(return_id, ""); + reference_counter_->AddBorrowedObject(return_id, ObjectID::Nil(), owner_address); + + auto status = plasma_store_provider_->Get({return_id}, 0, worker_context_, &result_map, + &got_exception); + // Remove the temporary ref. + RemoveLocalReference(return_id); + + if (result_map.count(return_id)) { + *return_object = std::move(result_map[return_id]); + RAY_LOG(DEBUG) << "Pinning existing return object " << return_id + << " owned by worker " + << WorkerID::FromBinary(owner_address.worker_id()); + // Keep the object in scope until it's been pinned. + std::shared_ptr pinned_return_object = *return_object; + // Asynchronously ask the raylet to pin the object. Note that this can fail + // if the raylet fails. We expect the owner of the object to handle that + // case (e.g., by detecting the raylet failure and storing an error). + local_raylet_client_->PinObjectIDs( + owner_address, {return_id}, + [return_id, pinned_return_object](const Status &status, + const rpc::PinObjectIDsReply &reply) { + if (!status.ok()) { + RAY_LOG(INFO) << "Failed to pin existing copy of the task return object " + << return_id + << ". This object may get evicted while there are still " + "references to it."; + } + }); + return true; + } else { + // Failed to get the existing copy of the return object. It must have been + // evicted before we could pin it. + // TODO(swang): We should allow the owner to retry this task instead of + // immediately returning an error to the application. + return false; + } +} + std::vector CoreWorker::ExecuteTaskLocalMode( const TaskSpecification &task_spec, const ActorID &actor_id) { auto resource_ids = std::make_shared(); diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index 24a3bd3dc..0cc63d744 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -567,7 +567,7 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { Status AllocateReturnObject(const ObjectID &object_id, const size_t &data_size, const std::shared_ptr &metadata, const std::vector &contained_object_id, - int64_t &task_output_inlined_bytes, + int64_t *task_output_inlined_bytes, std::shared_ptr *return_object); /// Seal a return object for an executing task. The caller should already have @@ -579,6 +579,15 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { Status SealReturnObject(const ObjectID &return_id, std::shared_ptr return_object); + /// Pin the local copy of the return object, if one exists. + /// + /// \param[in] return_id ObjectID of the return value. + /// \param[out] return_object The object that was pinned. + /// \return success if the object still existed and was pinned. Note that + /// pinning is done asynchronously. + bool PinExistingReturnObject(const ObjectID &return_id, + std::shared_ptr *return_object); + /// Get a handle to an actor. /// /// NOTE: This function should be called ONLY WHEN we know actor handle exists. 
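
For context on the worker-side changes above: PinExistingReturnObject is exercised when a task is re-executed (for example during lineage reconstruction) and one of its return objects already has a copy in the local plasma store. AllocateReturnObject then yields a null data buffer, the worker pins the existing copy instead of failing, and only if that copy was evicted in the meantime does store_task_outputs retry and create a fresh object. The Python sketch below is an illustrative reduction of the test_multiple_returns case added later in this diff; it assumes the same ray_start_cluster pytest fixture, and the helper names (two_returns, consume) and the trimmed-down cluster configuration are placeholders rather than part of the patch.

import numpy as np
import ray


def test_multi_return_reconstruction_sketch(ray_start_cluster):
    cluster = ray_start_cluster
    # Head node runs no tasks; lineage reconstruction is enabled so lost
    # task returns can be recomputed.
    cluster.add_node(num_cpus=0, enable_object_reconstruction=True)
    ray.init(address=cluster.address)
    node_to_kill = cluster.add_node(num_cpus=1, object_store_memory=10**8)
    cluster.wait_for_nodes()

    @ray.remote(num_returns=2)
    def two_returns():
        return np.zeros(10**7, dtype=np.uint8), np.zeros(10**7, dtype=np.uint8)

    @ray.remote
    def consume(x):
        return x.sum()

    obj1, obj2 = two_returns.remote()
    ray.get(consume.remote(obj1))

    # Copy obj1 to a second node so a replica survives the failure.
    cluster.add_node(num_cpus=1, resources={"node": 1}, object_store_memory=10**8)
    ray.get(consume.options(resources={"node": 1}).remote(obj1))

    # Lose the node that produced both return objects.
    cluster.remove_node(node_to_kill, allow_graceful=False)

    # Reconstructing obj2 re-runs two_returns, likely on the surviving node
    # where a copy of obj1 already lives in plasma; the executing worker now
    # pins that existing copy instead of failing to allocate the return.
    assert ray.get(consume.remote(obj2)) == 0
    assert ray.get(consume.remote(obj1)) == 0
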
diff --git a/src/ray/core_worker/lib/java/io_ray_runtime_RayNativeRuntime.cc b/src/ray/core_worker/lib/java/io_ray_runtime_RayNativeRuntime.cc index ca00ef32e..468d6984d 100644 --- a/src/ray/core_worker/lib/java/io_ray_runtime_RayNativeRuntime.cc +++ b/src/ray/core_worker/lib/java/io_ray_runtime_RayNativeRuntime.cc @@ -185,7 +185,7 @@ JNIEXPORT void JNICALL Java_io_ray_runtime_RayNativeRuntime_nativeInitialize( RAY_CHECK_OK(CoreWorkerProcess::GetCoreWorker().AllocateReturnObject( result_id, data_size, metadata, contained_object_ids, - task_output_inlined_bytes, result_ptr)); + &task_output_inlined_bytes, result_ptr)); // A nullptr is returned if the object already exists. auto result = *result_ptr; diff --git a/src/ray/core_worker/transport/direct_actor_transport.cc b/src/ray/core_worker/transport/direct_actor_transport.cc index f75502550..cbf80a1b5 100644 --- a/src/ray/core_worker/transport/direct_actor_transport.cc +++ b/src/ray/core_worker/transport/direct_actor_transport.cc @@ -102,7 +102,13 @@ void CoreWorkerDirectTaskReceiver::HandleTask( ObjectID id = ObjectID::FromIndex(task_spec.TaskId(), /*index=*/i + 1); return_object->set_object_id(id.Binary()); - // The object is nullptr if it already existed in the object store. + if (!return_objects[i]) { + // This should only happen if the local raylet died. Caller should + // retry the task. + RAY_LOG(WARNING) << "Failed to create task return object " << id + << " in the object store, exiting."; + QuickExit(); + } const auto &result = return_objects[i]; return_object->set_size(result->GetSize()); if (result->GetData() != nullptr && result->GetData()->IsPlasmaBuffer()) { diff --git a/src/ray/core_worker/transport/direct_task_transport.cc b/src/ray/core_worker/transport/direct_task_transport.cc index 1bf67689b..7b48e5061 100644 --- a/src/ray/core_worker/transport/direct_task_transport.cc +++ b/src/ray/core_worker/transport/direct_task_transport.cc @@ -685,7 +685,6 @@ void CoreWorkerDirectTaskSubmitter::PushNormalTask( // need to do anything here. return; } else if (!status.ok() || !is_actor_creation) { - RAY_LOG(DEBUG) << "Task failed with error: " << status; // Successful actor creation leases the worker indefinitely from the raylet. 
OnWorkerIdle(addr, scheduling_key, /*error=*/!status.ok(), assigned_resources); diff --git a/src/ray/raylet/local_object_manager.cc b/src/ray/raylet/local_object_manager.cc index 7157d8bf2..4d9b3d712 100644 --- a/src/ray/raylet/local_object_manager.cc +++ b/src/ray/raylet/local_object_manager.cc @@ -22,9 +22,10 @@ namespace ray { namespace raylet { -void LocalObjectManager::PinObjects(const std::vector &object_ids, - std::vector> &&objects, - const rpc::Address &owner_address) { +void LocalObjectManager::PinObjectsAndWaitForFree( + const std::vector &object_ids, + std::vector> &&objects, + const rpc::Address &owner_address) { for (size_t i = 0; i < object_ids.size(); i++) { const auto &object_id = object_ids[i]; auto &object = objects[i]; @@ -33,15 +34,28 @@ void LocalObjectManager::PinObjects(const std::vector &object_ids, << " was evicted before the raylet could pin it."; continue; } - RAY_LOG(DEBUG) << "Pinning object " << object_id; - pinned_objects_size_ += object->GetSize(); - pinned_objects_.emplace(object_id, std::make_pair(std::move(object), owner_address)); - } -} -void LocalObjectManager::WaitForObjectFree(const rpc::Address &owner_address, - const std::vector &object_ids) { - for (const auto &object_id : object_ids) { + const auto inserted = objects_waiting_for_free_.emplace(object_id, owner_address); + if (inserted.second) { + // This is the first time we're pinning this object. + RAY_LOG(DEBUG) << "Pinning object " << object_id; + pinned_objects_size_ += object->GetSize(); + pinned_objects_.emplace(object_id, std::move(object)); + } else { + if (inserted.first->second.worker_id() != owner_address.worker_id()) { + // TODO(swang): Handle this case. We should use the new owner address + // and object copy. + auto original_worker_id = + WorkerID::FromBinary(inserted.first->second.worker_id()); + auto new_worker_id = WorkerID::FromBinary(owner_address.worker_id()); + RAY_LOG(WARNING) + << "Received PinObjects request from a different owner " << new_worker_id + << " from the original " << original_worker_id << ". Object " << object_id + << " may get freed while the new owner still has the object in scope."; + } + continue; + } + // Create a object eviction subscription message. auto wait_request = std::make_unique(); wait_request->set_object_id(object_id.Binary()); @@ -64,8 +78,8 @@ void LocalObjectManager::WaitForObjectFree(const rpc::Address &owner_address, }; // Callback that is invoked when the owner of the object id is dead. - auto owner_dead_callback = [this](const std::string &object_id_binary, - const Status &) { + auto owner_dead_callback = [this, owner_address](const std::string &object_id_binary, + const Status &) { const auto object_id = ObjectID::FromBinary(object_id_binary); ReleaseFreedObject(object_id); }; @@ -82,13 +96,16 @@ void LocalObjectManager::WaitForObjectFree(const rpc::Address &owner_address, void LocalObjectManager::ReleaseFreedObject(const ObjectID &object_id) { RAY_LOG(DEBUG) << "Unpinning object " << object_id; + if (!objects_waiting_for_free_.erase(object_id)) { + return; + } // The object should be in one of these stats. pinned, spilling, or spilled. 
RAY_CHECK((pinned_objects_.count(object_id) > 0) || (spilled_objects_url_.count(object_id) > 0) || (objects_pending_spill_.count(object_id) > 0)); spilled_object_pending_delete_.push(object_id); if (pinned_objects_.count(object_id)) { - pinned_objects_size_ -= pinned_objects_[object_id].first->GetSize(); + pinned_objects_size_ -= pinned_objects_[object_id]->GetSize(); pinned_objects_.erase(object_id); } @@ -150,7 +167,7 @@ bool LocalObjectManager::SpillObjectsOfSize(int64_t num_bytes_to_spill) { while (bytes_to_spill <= num_bytes_to_spill && it != pinned_objects_.end() && counts < max_fused_object_count_) { if (is_plasma_object_spillable_(it->first)) { - bytes_to_spill += it->second.first->GetSize(); + bytes_to_spill += it->second->GetSize(); objects_to_spill.push_back(it->first); } it++; @@ -220,7 +237,7 @@ void LocalObjectManager::SpillObjectsInternal( objects_to_spill.push_back(id); // Move a pinned object to the pending spill object. - auto object_size = it->second.first->GetSize(); + auto object_size = it->second->GetSize(); num_bytes_pending_spill_ += object_size; objects_pending_spill_[id] = std::move(it->second); @@ -238,16 +255,23 @@ void LocalObjectManager::SpillObjectsInternal( io_worker_pool_.PopSpillWorker( [this, objects_to_spill, callback](std::shared_ptr io_worker) { rpc::SpillObjectsRequest request; + std::vector requested_objects_to_spill; for (const auto &object_id : objects_to_spill) { - RAY_LOG(DEBUG) << "Sending spill request for object " << object_id; - auto ref = request.add_object_refs_to_spill(); - ref->set_object_id(object_id.Binary()); - auto it = objects_pending_spill_.find(object_id); - RAY_CHECK(it != objects_pending_spill_.end()); - ref->mutable_owner_address()->CopyFrom(it->second.second); + RAY_CHECK(objects_pending_spill_.count(object_id)); + auto owner_it = objects_waiting_for_free_.find(object_id); + // If the object hasn't already been freed, spill it. + if (owner_it == objects_waiting_for_free_.end()) { + objects_pending_spill_.erase(object_id); + } else { + auto ref = request.add_object_refs_to_spill(); + ref->set_object_id(object_id.Binary()); + ref->mutable_owner_address()->CopyFrom(owner_it->second); + RAY_LOG(DEBUG) << "Sending spill request for object " << object_id; + requested_objects_to_spill.push_back(object_id); + } } io_worker->rpc_client()->SpillObjects( - request, [this, objects_to_spill, callback, io_worker]( + request, [this, requested_objects_to_spill, callback, io_worker]( const ray::Status &status, const rpc::SpillObjectsReply &r) { { absl::MutexLock lock(&mutex_); @@ -258,13 +282,14 @@ void LocalObjectManager::SpillObjectsInternal( // Object spilling is always done in the order of the request. // For example, if an object succeeded, it'll guarentee that all objects // before this will succeed. 
- RAY_CHECK(num_objects_spilled <= objects_to_spill.size()); - for (size_t i = num_objects_spilled; i != objects_to_spill.size(); ++i) { - const auto &object_id = objects_to_spill[i]; + RAY_CHECK(num_objects_spilled <= requested_objects_to_spill.size()); + for (size_t i = num_objects_spilled; i != requested_objects_to_spill.size(); + ++i) { + const auto &object_id = requested_objects_to_spill[i]; auto it = objects_pending_spill_.find(object_id); RAY_CHECK(it != objects_pending_spill_.end()); - pinned_objects_size_ += it->second.first->GetSize(); - num_bytes_pending_spill_ -= it->second.first->GetSize(); + pinned_objects_size_ += it->second->GetSize(); + num_bytes_pending_spill_ -= it->second->GetSize(); pinned_objects_.emplace(object_id, std::move(it->second)); objects_pending_spill_.erase(it); } @@ -273,7 +298,7 @@ void LocalObjectManager::SpillObjectsInternal( RAY_LOG(ERROR) << "Failed to send object spilling request: " << status.ToString(); } else { - OnObjectSpilled(objects_to_spill, r); + OnObjectSpilled(requested_objects_to_spill, r); } if (callback) { callback(status); @@ -313,8 +338,7 @@ void LocalObjectManager::OnObjectSpilled(const std::vector &object_ids RAY_LOG(DEBUG) << "Unpinning pending spill object " << object_id; auto it = objects_pending_spill_.find(object_id); RAY_CHECK(it != objects_pending_spill_.end()); - const auto object_size = it->second.first->GetSize(); - const auto worker_addr = it->second.second; + const auto object_size = it->second->GetSize(); num_bytes_pending_spill_ -= object_size; objects_pending_spill_.erase(it); @@ -325,6 +349,14 @@ void LocalObjectManager::OnObjectSpilled(const std::vector &object_ids request.set_spilled_node_id(node_id_object_spilled.Binary()); request.set_size(object_size); + auto owner_it = objects_waiting_for_free_.find(object_id); + if (owner_it == objects_waiting_for_free_.end()) { + RAY_LOG(DEBUG) << "Spilled object already freed, skipping send of spilled URL to " + "object directory for object " + << object_id; + continue; + } + const auto &worker_addr = owner_it->second; auto owner_client = owner_client_pool_.GetOrConnect(worker_addr); RAY_LOG(DEBUG) << "Sending spilled URL " << object_url << " for object " << object_id << " to owner " << WorkerID::FromBinary(worker_addr.worker_id()); diff --git a/src/ray/raylet/local_object_manager.h b/src/ray/raylet/local_object_manager.h index 80df532fa..3c36dc91b 100644 --- a/src/ray/raylet/local_object_manager.h +++ b/src/ray/raylet/local_object_manager.h @@ -64,22 +64,17 @@ class LocalObjectManager { /// Pin objects. /// + /// Also wait for the objects' owner to free the object. The objects will be + /// released when the owner at the given address fails or replies that the + /// object can be evicted. + /// /// \param object_ids The objects to be pinned. /// \param objects Pointers to the objects to be pinned. The pointer should /// be kept in scope until the object can be released. /// \param owner_address The owner of the objects to be pinned. - void PinObjects(const std::vector &object_ids, - std::vector> &&objects, - const rpc::Address &owner_address); - - /// Wait for the objects' owner to free the object. The objects will be - /// released when the owner at the given address fails or replies that the - /// object can be evicted. - /// - /// \param owner_address The address of the owner of the objects. - /// \param object_ids The objects to be freed. 
- void WaitForObjectFree(const rpc::Address &owner_address, - const std::vector &object_ids); + void PinObjectsAndWaitForFree(const std::vector &object_ids, + std::vector> &&objects, + const rpc::Address &owner_address); /// Spill objects as much as possible as fast as possible up to the max throughput. /// @@ -201,9 +196,11 @@ class LocalObjectManager { /// A callback to call when an object has been freed. std::function &)> on_objects_freed_; + /// Hashmap from objects that we are waiting to free to their owner address. + absl::flat_hash_map objects_waiting_for_free_; + // Objects that are pinned on this node. - absl::flat_hash_map, rpc::Address>> - pinned_objects_; + absl::flat_hash_map> pinned_objects_; // Total size of objects pinned on this node. size_t pinned_objects_size_ = 0; @@ -211,8 +208,7 @@ class LocalObjectManager { // Objects that were pinned on this node but that are being spilled. // These objects will be released once spilling is complete and the URL is // written to the object directory. - absl::flat_hash_map, rpc::Address>> - objects_pending_spill_; + absl::flat_hash_map> objects_pending_spill_; /// Objects that were spilled on this node but that are being restored. /// The field is used to dedup the same restore request while restoration is in @@ -310,6 +306,8 @@ class LocalObjectManager { /// The last time a restore log finished. int64_t last_restore_log_ns_ = 0; + + friend class LocalObjectManagerTest; }; }; // namespace raylet diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 36334cf5b..81f70af74 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -2170,9 +2170,9 @@ void NodeManager::HandlePinObjectIDs(const rpc::PinObjectIDsRequest &request, send_reply_callback(Status::Invalid("Failed to get objects."), nullptr, nullptr); return; } - local_object_manager_.PinObjects(object_ids, std::move(results), owner_address); // Wait for the object to be freed by the owner, which keeps the ref count. 
- local_object_manager_.WaitForObjectFree(owner_address, object_ids); + local_object_manager_.PinObjectsAndWaitForFree(object_ids, std::move(results), + owner_address); send_reply_callback(Status::OK(), nullptr, nullptr); } diff --git a/src/ray/raylet/test/local_object_manager_test.cc b/src/ray/raylet/test/local_object_manager_test.cc index a91036845..39bd67579 100644 --- a/src/ray/raylet/test/local_object_manager_test.cc +++ b/src/ray/raylet/test/local_object_manager_test.cc @@ -43,24 +43,35 @@ class MockSubscriber : public pubsub::SubscriberInterface { pubsub::SubscribeDoneCallback subscribe_done_callback, pubsub::SubscriptionItemCallback subscription_callback, pubsub::SubscriptionFailureCallback subscription_failure_callback) override { - callbacks.push_back( + auto worker_id = WorkerID::FromBinary(owner_address.worker_id()); + callbacks[worker_id].push_back( std::make_pair(ObjectID::FromBinary(key_id_binary), subscription_callback)); return true; } - bool PublishObjectEviction() { + bool PublishObjectEviction(WorkerID worker_id = WorkerID::Nil()) { if (callbacks.empty()) { return false; } - auto object_id = callbacks.front().first; - auto callback = callbacks.front().second; + auto cbs = callbacks.begin(); + if (!worker_id.IsNil()) { + cbs = callbacks.find(worker_id); + } + if (cbs == callbacks.end() || cbs->second.empty()) { + return false; + } + auto object_id = cbs->second.front().first; + auto callback = cbs->second.front().second; auto msg = rpc::PubMessage(); msg.set_key_id(object_id.Binary()); msg.set_channel_type(channel_type_); auto *object_eviction_msg = msg.mutable_worker_object_eviction_message(); object_eviction_msg->set_object_id(object_id.Binary()); callback(msg); - callbacks.pop_front(); + cbs->second.pop_front(); + if (cbs->second.empty()) { + callbacks.erase(cbs); + } return true; } @@ -86,7 +97,9 @@ class MockSubscriber : public pubsub::SubscriberInterface { MOCK_CONST_METHOD0(DebugString, std::string()); rpc::ChannelType channel_type_ = rpc::ChannelType::WORKER_OBJECT_EVICTION; - std::deque> callbacks; + std::unordered_map>> + callbacks; }; class MockWorkerClient : public rpc::CoreWorkerClientInterface { @@ -289,6 +302,16 @@ class LocalObjectManagerTest : public ::testing::Test { RayConfig::instance().initialize(R"({"object_spilling_config": "dummy"})"); } + void AssertNoLeaks() { + // TODO(swang): Assert this for all tests. 
+ ASSERT_TRUE(manager.pinned_objects_size_ == 0); + ASSERT_TRUE(manager.pinned_objects_.empty()); + ASSERT_TRUE(manager.spilled_objects_url_.empty()); + ASSERT_TRUE(manager.objects_pending_spill_.empty()); + ASSERT_TRUE(manager.url_ref_count_.empty()); + ASSERT_TRUE(manager.objects_waiting_for_free_.empty()); + } + void TearDown() { unevictable_objects_.clear(); } std::string BuildURL(const std::string url, int offset = 0, int num_objects = 1) { @@ -331,8 +354,7 @@ TEST_F(LocalObjectManagerTest, TestPin) { std::vector()); objects.push_back(std::move(object)); } - manager.PinObjects(object_ids, std::move(objects), owner_address); - manager.WaitForObjectFree(owner_address, object_ids); + manager.PinObjectsAndWaitForFree(object_ids, std::move(objects), owner_address); for (size_t i = 0; i < free_objects_batch_size; i++) { ASSERT_TRUE(freed.empty()); @@ -358,7 +380,7 @@ TEST_F(LocalObjectManagerTest, TestRestoreSpilledObject) { std::vector()); objects.push_back(std::move(object)); } - manager.PinObjects(object_ids, std::move(objects), owner_address); + manager.PinObjectsAndWaitForFree(object_ids, std::move(objects), owner_address); manager.SpillObjects(object_ids, [&](const Status &status) mutable { ASSERT_TRUE(status.ok()); }); @@ -414,7 +436,7 @@ TEST_F(LocalObjectManagerTest, TestExplicitSpill) { std::vector()); objects.push_back(std::move(object)); } - manager.PinObjects(object_ids, std::move(objects), owner_address); + manager.PinObjectsAndWaitForFree(object_ids, std::move(objects), owner_address); int num_times_fired = 0; manager.SpillObjects(object_ids, [&](const Status &status) mutable { @@ -459,8 +481,7 @@ TEST_F(LocalObjectManagerTest, TestDuplicateSpill) { std::vector()); objects.push_back(std::move(object)); } - manager.PinObjects(object_ids, std::move(objects), owner_address); - manager.WaitForObjectFree(owner_address, object_ids); + manager.PinObjectsAndWaitForFree(object_ids, std::move(objects), owner_address); int num_times_fired = 0; manager.SpillObjects(object_ids, [&](const Status &status) mutable { @@ -513,7 +534,7 @@ TEST_F(LocalObjectManagerTest, TestSpillObjectsOfSize) { std::vector()); objects.push_back(std::move(object)); } - manager.PinObjects(object_ids, std::move(objects), owner_address); + manager.PinObjectsAndWaitForFree(object_ids, std::move(objects), owner_address); ASSERT_TRUE(manager.SpillObjectsOfSize(total_size / 2)); for (const auto &id : object_ids) { ASSERT_EQ((*unpins)[id], 0); @@ -580,7 +601,7 @@ TEST_F(LocalObjectManagerTest, TestSpillUptoMaxFuseCount) { std::vector()); objects.push_back(std::move(object)); } - manager.PinObjects(object_ids, std::move(objects), owner_address); + manager.PinObjectsAndWaitForFree(object_ids, std::move(objects), owner_address); ASSERT_TRUE(manager.SpillObjectsOfSize(total_size)); for (const auto &id : object_ids) { ASSERT_EQ((*unpins)[id], 0); @@ -626,7 +647,7 @@ TEST_F(LocalObjectManagerTest, TestSpillObjectNotEvictable) { std::vector()); objects.push_back(std::move(object)); - manager.PinObjects(object_ids, std::move(objects), owner_address); + manager.PinObjectsAndWaitForFree(object_ids, std::move(objects), owner_address); ASSERT_FALSE(manager.SpillObjectsOfSize(1000)); for (const auto &id : object_ids) { ASSERT_EQ((*unpins)[id], 0); @@ -655,7 +676,7 @@ TEST_F(LocalObjectManagerTest, TestSpillUptoMaxThroughput) { std::vector()); objects.push_back(std::move(object)); } - manager.PinObjects(object_ids, std::move(objects), owner_address); + manager.PinObjectsAndWaitForFree(object_ids, std::move(objects), owner_address); 
// This will spill until 2 workers are occupied. manager.SpillObjectUptoMaxThroughput(); @@ -722,7 +743,7 @@ TEST_F(LocalObjectManagerTest, TestSpillError) { std::vector> objects; objects.push_back(std::move(object)); - manager.PinObjects({object_id}, std::move(objects), owner_address); + manager.PinObjectsAndWaitForFree({object_id}, std::move(objects), owner_address); int num_times_fired = 0; manager.SpillObjects({object_id}, [&](const Status &status) mutable { @@ -767,7 +788,7 @@ TEST_F(LocalObjectManagerTest, TestPartialSpillError) { std::vector()); objects.push_back(std::move(object)); } - manager.PinObjects(object_ids, std::move(objects), owner_address); + manager.PinObjectsAndWaitForFree(object_ids, std::move(objects), owner_address); manager.SpillObjects(object_ids, [&](const Status &status) mutable { ASSERT_TRUE(status.ok()); }); @@ -803,8 +824,7 @@ TEST_F(LocalObjectManagerTest, TestDeleteNoSpilledObjects) { std::vector()); objects.push_back(std::move(object)); } - manager.PinObjects(object_ids, std::move(objects), owner_address); - manager.WaitForObjectFree(owner_address, object_ids); + manager.PinObjectsAndWaitForFree(object_ids, std::move(objects), owner_address); for (size_t i = 0; i < free_objects_batch_size; i++) { ASSERT_TRUE(freed.empty()); @@ -832,8 +852,7 @@ TEST_F(LocalObjectManagerTest, TestDeleteSpilledObjects) { std::vector()); objects.push_back(std::move(object)); } - manager.PinObjects(object_ids, std::move(objects), owner_address); - manager.WaitForObjectFree(owner_address, object_ids); + manager.PinObjectsAndWaitForFree(object_ids, std::move(objects), owner_address); // 2 Objects are spilled out of 3. std::vector object_ids_to_spill; @@ -881,8 +900,7 @@ TEST_F(LocalObjectManagerTest, TestDeleteURLRefCount) { std::vector()); objects.push_back(std::move(object)); } - manager.PinObjects(object_ids, std::move(objects), owner_address); - manager.WaitForObjectFree(owner_address, object_ids); + manager.PinObjectsAndWaitForFree(object_ids, std::move(objects), owner_address); // Every object is spilled. std::vector object_ids_to_spill; @@ -942,8 +960,7 @@ TEST_F(LocalObjectManagerTest, TestDeleteSpillingObjectsBlocking) { std::vector()); objects.push_back(std::move(object)); } - manager.PinObjects(object_ids, std::move(objects), owner_address); - manager.WaitForObjectFree(owner_address, object_ids); + manager.PinObjectsAndWaitForFree(object_ids, std::move(objects), owner_address); // Objects are spilled. std::vector spill_set_1; @@ -991,7 +1008,9 @@ TEST_F(LocalObjectManagerTest, TestDeleteSpillingObjectsBlocking) { // Now spilling is completely done. ASSERT_TRUE(worker_pool.io_worker_client->ReplySpillObjects(urls_spill_set_2)); for (size_t i = 0; i < spill_set_2_size; i++) { - ASSERT_TRUE(owner_client->ReplyAddSpilledUrl()); + // These fail because the object is already freed, so the raylet does not + // send the RPC. + ASSERT_FALSE(owner_client->ReplyAddSpilledUrl()); } // Every object is now deleted. 
@@ -1016,8 +1035,7 @@ TEST_F(LocalObjectManagerTest, TestDeleteMaxObjects) { std::vector()); objects.push_back(std::move(object)); } - manager.PinObjects(object_ids, std::move(objects), owner_address); - manager.WaitForObjectFree(owner_address, object_ids); + manager.PinObjectsAndWaitForFree(object_ids, std::move(objects), owner_address); std::vector object_ids_to_spill; int spilled_urls_size = free_objects_batch_size; @@ -1068,8 +1086,7 @@ TEST_F(LocalObjectManagerTest, TestDeleteURLRefCountRaceCondition) { std::vector()); objects.push_back(std::move(object)); } - manager.PinObjects(object_ids, std::move(objects), owner_address); - manager.WaitForObjectFree(owner_address, object_ids); + manager.PinObjectsAndWaitForFree(object_ids, std::move(objects), owner_address); // Every object is spilled. std::vector object_ids_to_spill; @@ -1110,6 +1127,70 @@ TEST_F(LocalObjectManagerTest, TestDeleteURLRefCountRaceCondition) { ASSERT_EQ(deleted_urls_size, 1); } +TEST_F(LocalObjectManagerTest, TestDuplicatePin) { + rpc::Address owner_address; + owner_address.set_worker_id(WorkerID::FromRandom().Binary()); + + std::vector object_ids; + for (size_t i = 0; i < free_objects_batch_size; i++) { + ObjectID object_id = ObjectID::FromRandom(); + object_ids.push_back(object_id); + } + + std::vector> objects; + for (size_t i = 0; i < free_objects_batch_size; i++) { + std::string meta = std::to_string(static_cast(rpc::ErrorType::OBJECT_IN_PLASMA)); + auto metadata = const_cast(reinterpret_cast(meta.data())); + auto meta_buffer = std::make_shared(metadata, meta.size()); + auto object = std::make_unique(nullptr, meta_buffer, + std::vector()); + objects.push_back(std::move(object)); + } + manager.PinObjectsAndWaitForFree(object_ids, std::move(objects), owner_address); + + // Receive a duplicate pin with the same owner. Same objects should not get + // pinned again. + objects.clear(); + for (size_t i = 0; i < free_objects_batch_size; i++) { + std::string meta = std::to_string(static_cast(rpc::ErrorType::OBJECT_IN_PLASMA)); + auto metadata = const_cast(reinterpret_cast(meta.data())); + auto meta_buffer = std::make_shared(metadata, meta.size()); + auto object = std::make_unique(nullptr, meta_buffer, + std::vector()); + objects.push_back(std::move(object)); + } + manager.PinObjectsAndWaitForFree(object_ids, std::move(objects), owner_address); + + // Receive a duplicate pin with a different owner. + objects.clear(); + for (size_t i = 0; i < free_objects_batch_size; i++) { + std::string meta = std::to_string(static_cast(rpc::ErrorType::OBJECT_IN_PLASMA)); + auto metadata = const_cast(reinterpret_cast(meta.data())); + auto meta_buffer = std::make_shared(metadata, meta.size()); + auto object = std::make_unique(nullptr, meta_buffer, + std::vector()); + objects.push_back(std::move(object)); + } + rpc::Address owner_address2; + owner_address2.set_worker_id(WorkerID::FromRandom().Binary()); + manager.PinObjectsAndWaitForFree(object_ids, std::move(objects), owner_address2); + // No subscribe to the second owner. + auto owner_id2 = WorkerID::FromBinary(owner_address2.worker_id()); + ASSERT_FALSE(subscriber_->PublishObjectEviction(owner_id2)); + + // Free on messages from the original owner. 
+ auto owner_id1 = WorkerID::FromBinary(owner_address.worker_id()); + for (size_t i = 0; i < free_objects_batch_size; i++) { + ASSERT_TRUE(freed.empty()); + EXPECT_CALL(*subscriber_, Unsubscribe(_, _, object_ids[i].Binary())); + ASSERT_TRUE(subscriber_->PublishObjectEviction(owner_id1)); + } + std::unordered_set expected(object_ids.begin(), object_ids.end()); + ASSERT_EQ(freed, expected); + + AssertNoLeaks(); +} + } // namespace raylet } // namespace ray
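
A closing note on the raylet side: PinObjects and WaitForObjectFree are folded into PinObjectsAndWaitForFree, and the owner address moves out of the per-object pair into the new objects_waiting_for_free_ map. The toy Python model below restates that bookkeeping to make the duplicate-pin behavior easier to follow; ToyLocalObjectManager and its methods only loosely mirror the C++ (objects are plain bytes payloads here) and are illustrative, not part of the patch.

from dataclasses import dataclass, field


@dataclass
class ToyLocalObjectManager:
    """Illustrative model of the new pin/free bookkeeping, not the real raylet."""

    # object_id -> owner worker id, recorded the first time the object is pinned.
    objects_waiting_for_free: dict = field(default_factory=dict)
    # object_id -> pinned payload; the owner address is no longer stored here.
    pinned_objects: dict = field(default_factory=dict)
    pinned_objects_size: int = 0

    def pin_objects_and_wait_for_free(self, object_ids, objects, owner):
        for object_id, payload in zip(object_ids, objects):
            if object_id not in self.objects_waiting_for_free:
                # First pin: remember the owner and hold the payload.
                self.objects_waiting_for_free[object_id] = owner
                self.pinned_objects[object_id] = payload
                self.pinned_objects_size += len(payload)
            elif self.objects_waiting_for_free[object_id] != owner:
                # Duplicate pin from a different owner: keep the original
                # subscription; the object may be freed while the new owner
                # still has it in scope (the TODO called out above).
                print(f"warning: duplicate pin of {object_id} by owner {owner}")
            # A duplicate pin from the same owner is a no-op.

    def release_freed_object(self, object_id):
        # Only objects we are still waiting on can be freed, and only once.
        if self.objects_waiting_for_free.pop(object_id, None) is None:
            return
        payload = self.pinned_objects.pop(object_id, None)
        if payload is not None:
            self.pinned_objects_size -= len(payload)

In this model a repeated pin from the original owner is deduplicated, a pin from a different owner keeps the original subscription, and a free message releases each object exactly once, which is what the new TestDuplicatePin case checks.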