diff --git a/python/ray/tests/test_basic_2.py b/python/ray/tests/test_basic_2.py index d10abb31d..fc6befc7b 100644 --- a/python/ray/tests/test_basic_2.py +++ b/python/ray/tests/test_basic_2.py @@ -348,7 +348,8 @@ def test_system_config_when_connecting(ray_start_cluster): obj_ref = ray.put(np.zeros(40 * 1024 * 1024, dtype=np.uint8)) for _ in range(5): - ray.put(np.zeros(40 * 1024 * 1024, dtype=np.uint8)) + put_ref = ray.put(np.zeros(40 * 1024 * 1024, dtype=np.uint8)) + del put_ref # This would not raise an exception if object pinning was enabled. with pytest.raises(ray.exceptions.ObjectLostError): diff --git a/python/ray/tests/test_object_manager.py b/python/ray/tests/test_object_manager.py index 3d46c133d..c474fced4 100644 --- a/python/ray/tests/test_object_manager.py +++ b/python/ray/tests/test_object_manager.py @@ -7,6 +7,7 @@ import warnings import ray from ray.cluster_utils import Cluster +from ray.exceptions import GetTimeoutError if (multiprocessing.cpu_count() < 40 or ray.utils.get_system_memory() < 50 * 10**9): @@ -33,6 +34,29 @@ def ray_start_cluster_with_resource(): cluster.shutdown() +@pytest.mark.parametrize( + "ray_start_cluster_head", [{ + "num_cpus": 0, + "object_store_memory": 75 * 1024 * 1024, + }], + indirect=True) +def test_object_transfer_during_oom(ray_start_cluster_head): + cluster = ray_start_cluster_head + cluster.add_node(object_store_memory=75 * 1024 * 1024) + + @ray.remote + def put(): + return np.random.rand(5 * 1024 * 1024) # 40 MB data + + local_ref = ray.put(np.random.rand(5 * 1024 * 1024)) + remote_ref = put.remote() + + with pytest.raises(GetTimeoutError): + ray.get(remote_ref, timeout=1) + del local_ref + ray.get(remote_ref) + + # This test is here to make sure that when we broadcast an object to a bunch of # machines, we don't have too many excess object transfers. @pytest.mark.skip(reason="TODO(ekl)") diff --git a/python/ray/tests/test_object_spilling.py b/python/ray/tests/test_object_spilling.py index 7a0a76c9d..2baa5e31b 100644 --- a/python/ray/tests/test_object_spilling.py +++ b/python/ray/tests/test_object_spilling.py @@ -12,7 +12,7 @@ import psutil import ray from ray.external_storage import (create_url_with_offset, parse_url_with_offset) -from ray.test_utils import wait_for_condition +from ray.test_utils import new_scheduler_enabled, wait_for_condition bucket_name = "object-spilling-test" spill_local_path = "/tmp/spill" @@ -338,16 +338,19 @@ def test_spill_objects_automatically(object_spilling_config, shutdown_only): @pytest.mark.skipif( platform.system() == "Windows", reason="Failing on Windows.") -@pytest.mark.skip( - "Temporarily disabled until OutOfMemory retries can be moved " - "into the plasma store") +@pytest.mark.skipif(new_scheduler_enabled(), reason="hangs") def test_spill_during_get(object_spilling_config, shutdown_only): ray.init( num_cpus=4, object_store_memory=100 * 1024 * 1024, _system_config={ "automatic_object_spilling_enabled": True, - "max_io_workers": 2, + "object_store_full_initial_delay_ms": 100, + # NOTE(swang): Use infinite retries because the OOM timer can still + # get accidentally triggered when objects are released too slowly + # (see github.com/ray-project/ray/issues/12040). 
+ "object_store_full_max_retries": -1, + "max_io_workers": 1, "object_spilling_config": object_spilling_config, "min_spilling_size": 0, }, diff --git a/python/ray/tests/test_reference_counting.py b/python/ray/tests/test_reference_counting.py index a0dc10f6b..b9f3b0906 100644 --- a/python/ray/tests/test_reference_counting.py +++ b/python/ray/tests/test_reference_counting.py @@ -10,8 +10,8 @@ import pytest import ray import ray.cluster_utils -from ray.test_utils import SignalActor, put_object, wait_for_condition, \ - new_scheduler_enabled +from ray.test_utils import (SignalActor, put_object, wait_for_condition, + new_scheduler_enabled) logger = logging.getLogger(__name__) @@ -167,7 +167,7 @@ def test_dependency_refcounts(ray_start_regular): check_refcounts({}) -@pytest.mark.skipif(new_scheduler_enabled(), reason="dynres notimpl") +@pytest.mark.skipif(new_scheduler_enabled(), reason="hangs") def test_actor_creation_task(ray_start_regular): @ray.remote def large_object(): @@ -269,7 +269,16 @@ def test_feature_flag(shutdown_only): # The ray.get below fails with only LRU eviction, as the object # that was ray.put by the actor should have been evicted. - _fill_object_store_and_get(actor.get_large_object.remote(), succeed=False) + ref = actor.get_large_object.remote() + ray.get(ref) + + # Keep refs in scope so that they don't get GCed immediately. + for _ in range(5): + put_ref = ray.put(np.zeros(40 * 1024 * 1024, dtype=np.uint8)) + del put_ref + + wait_for_condition( + lambda: not ray.worker.global_worker.core_worker.object_exists(ref)) def test_out_of_band_serialized_object_ref(one_worker_100MiB): diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 96ffaf944..fcff4356b 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -411,11 +411,9 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_ plasma_store_provider_.reset(new CoreWorkerPlasmaStoreProvider( options_.store_socket, local_raylet_client_, reference_counter_, options_.check_signals, - /*evict_if_full=*/RayConfig::instance().object_pinning_enabled(), /*warmup=*/ (options_.worker_type != ray::WorkerType::SPILL_WORKER && options_.worker_type != ray::WorkerType::RESTORE_WORKER), - /*on_store_full=*/boost::bind(&CoreWorker::TriggerGlobalGC, this), /*get_current_call_site=*/boost::bind(&CoreWorker::CurrentCallSite, this))); memory_store_.reset(new CoreWorkerMemoryStore( [this](const RayObject &object, const ObjectID &object_id) { diff --git a/src/ray/core_worker/store_provider/plasma_store_provider.cc b/src/ray/core_worker/store_provider/plasma_store_provider.cc index b3772abe8..2faa8be51 100644 --- a/src/ray/core_worker/store_provider/plasma_store_provider.cc +++ b/src/ray/core_worker/store_provider/plasma_store_provider.cc @@ -25,14 +25,11 @@ CoreWorkerPlasmaStoreProvider::CoreWorkerPlasmaStoreProvider( const std::string &store_socket, const std::shared_ptr raylet_client, const std::shared_ptr reference_counter, - std::function check_signals, bool evict_if_full, bool warmup, - std::function on_store_full, + std::function check_signals, bool warmup, std::function get_current_call_site) : raylet_client_(raylet_client), reference_counter_(reference_counter), - check_signals_(check_signals), - evict_if_full_(evict_if_full), - on_store_full_(on_store_full) { + check_signals_(check_signals) { if (get_current_call_site != nullptr) { get_current_call_site_ = get_current_call_site; } else { @@ -86,59 +83,52 @@ Status 
CoreWorkerPlasmaStoreProvider::Create(const std::shared_ptr &meta const ObjectID &object_id, const rpc::Address &owner_address, std::shared_ptr *data) { - int32_t retries = 0; - int32_t max_retries = RayConfig::instance().object_store_full_max_retries(); - uint32_t delay = RayConfig::instance().object_store_full_initial_delay_ms(); Status status; - bool should_retry = true; - // If we cannot retry, then always evict on the first attempt. - bool evict_if_full = max_retries == 0 ? true : evict_if_full_; - while (should_retry) { - should_retry = false; - Status plasma_status; - std::shared_ptr arrow_buffer; + std::shared_ptr arrow_buffer; + uint64_t retry_with_request_id = 0; + { + std::lock_guard guard(store_client_mutex_); + status = store_client_.Create( + object_id, owner_address, data_size, metadata ? metadata->Data() : nullptr, + metadata ? metadata->Size() : 0, &retry_with_request_id, &arrow_buffer, + /*device_num=*/0); + } + + while (retry_with_request_id > 0) { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); { std::lock_guard guard(store_client_mutex_); - plasma_status = store_client_.Create(object_id, owner_address, data_size, - metadata ? metadata->Data() : nullptr, - metadata ? metadata->Size() : 0, &arrow_buffer, - /*device_num=*/0, evict_if_full); - // Always try to evict after the first attempt. - evict_if_full = true; - } - if (plasma_status.IsObjectStoreFull()) { - std::ostringstream message; - message << "Failed to put object " << object_id << " in object store because it " - << "is full. Object size is " << data_size << " bytes."; - status = Status::ObjectStoreFull(message.str()); - if (max_retries < 0 || retries < max_retries) { - RAY_LOG(ERROR) << message.str() << "\nWaiting " << delay - << "ms for space to free up..."; - if (on_store_full_) { - on_store_full_(); - } - std::this_thread::sleep_for(std::chrono::milliseconds(delay)); - delay *= 2; - retries += 1; - should_retry = true; - } else { - RAY_LOG(ERROR) << "Failed to put object " << object_id << " after " - << (max_retries + 1) << " attempts. Plasma store status:\n" - << MemoryUsageString() << "\n---\n" - << "--- Tip: Use the `ray memory` command to list active objects " - "in the cluster." - << "\n---\n"; - } - } else if (plasma_status.IsObjectExists()) { - RAY_LOG(WARNING) << "Trying to put an object that already existed in plasma: " - << object_id << "."; - status = Status::OK(); - } else { - RAY_RETURN_NOT_OK(plasma_status); - *data = std::make_shared(PlasmaBuffer(arrow_buffer)); - status = Status::OK(); + RAY_LOG(DEBUG) << "Retrying request for object " << object_id << " with request ID " + << retry_with_request_id; + status = store_client_.RetryCreate(object_id, retry_with_request_id, + metadata ? metadata->Data() : nullptr, + &retry_with_request_id, &arrow_buffer); } } + + if (status.IsObjectStoreFull()) { + RAY_LOG(ERROR) << "Failed to put object " << object_id + << " in object store because it " + << "is full. Object size is " << data_size << " bytes.\n" + << "Plasma store status:\n" + << MemoryUsageString() << "\n---\n" + << "--- Tip: Use the `ray memory` command to list active objects " + "in the cluster." + << "\n---\n"; + + // Replace the status with a more helpful error message. + std::ostringstream message; + message << "Failed to put object " << object_id << " in object store because it " + << "is full. 
Object size is " << data_size << " bytes."; + status = Status::ObjectStoreFull(message.str()); + } else if (status.IsObjectExists()) { + RAY_LOG(WARNING) << "Trying to put an object that already existed in plasma: " + << object_id << "."; + status = Status::OK(); + } else { + RAY_RETURN_NOT_OK(status); + *data = std::make_shared(PlasmaBuffer(arrow_buffer)); + } return status; } diff --git a/src/ray/core_worker/store_provider/plasma_store_provider.h b/src/ray/core_worker/store_provider/plasma_store_provider.h index 4c485687e..ef9f4f850 100644 --- a/src/ray/core_worker/store_provider/plasma_store_provider.h +++ b/src/ray/core_worker/store_provider/plasma_store_provider.h @@ -37,8 +37,7 @@ class CoreWorkerPlasmaStoreProvider { const std::string &store_socket, const std::shared_ptr raylet_client, const std::shared_ptr reference_counter, - std::function check_signals, bool evict_if_full, bool warmup, - std::function on_store_full = nullptr, + std::function check_signals, bool warmup, std::function get_current_call_site = nullptr); ~CoreWorkerPlasmaStoreProvider(); @@ -154,8 +153,6 @@ class CoreWorkerPlasmaStoreProvider { const std::shared_ptr reference_counter_; std::mutex store_client_mutex_; std::function check_signals_; - const bool evict_if_full_; - std::function on_store_full_; std::function get_current_call_site_; // Active buffers tracker. This must be allocated as a separate structure since its diff --git a/src/ray/object_manager/object_buffer_pool.cc b/src/ray/object_manager/object_buffer_pool.cc index a81126289..cdeaa7b84 100644 --- a/src/ray/object_manager/object_buffer_pool.cc +++ b/src/ray/object_manager/object_buffer_pool.cc @@ -104,8 +104,8 @@ std::pair ObjectBufferPool::Cr int64_t object_size = data_size - metadata_size; // Try to create shared buffer. std::shared_ptr data; - Status s = store_client_.Create(object_id, owner_address, object_size, NULL, - metadata_size, &data); + Status s = store_client_.TryCreateImmediately(object_id, owner_address, object_size, + NULL, metadata_size, &data); std::vector buffer; if (!s.ok()) { // Create failed. The object may already exist locally. If something else went diff --git a/src/ray/object_manager/object_manager.cc b/src/ray/object_manager/object_manager.cc index 6f054603f..90380e1b0 100644 --- a/src/ray/object_manager/object_manager.cc +++ b/src/ray/object_manager/object_manager.cc @@ -27,7 +27,8 @@ namespace object_manager_protocol = ray::object_manager::protocol; namespace ray { ObjectStoreRunner::ObjectStoreRunner(const ObjectManagerConfig &config, - SpillObjectsCallback spill_objects_callback) { + SpillObjectsCallback spill_objects_callback, + std::function object_store_full_callback) { if (config.object_store_memory > 0) { plasma::plasma_store_runner.reset(new plasma::PlasmaStoreRunner( config.store_socket_name, config.object_store_memory, config.huge_pages, @@ -35,7 +36,7 @@ ObjectStoreRunner::ObjectStoreRunner(const ObjectManagerConfig &config, // Initialize object store. store_thread_ = std::thread(&plasma::PlasmaStoreRunner::Start, plasma::plasma_store_runner.get(), - spill_objects_callback); + spill_objects_callback, object_store_full_callback); // Sleep for sometime until the store is working. This can suppress some // connection warnings. 
std::this_thread::sleep_for(std::chrono::microseconds(500)); @@ -54,11 +55,12 @@ ObjectManager::ObjectManager(asio::io_service &main_service, const NodeID &self_ const ObjectManagerConfig &config, std::shared_ptr object_directory, RestoreSpilledObjectCallback restore_spilled_object, - SpillObjectsCallback spill_objects_callback) + SpillObjectsCallback spill_objects_callback, + std::function object_store_full_callback) : self_node_id_(self_node_id), config_(config), object_directory_(std::move(object_directory)), - object_store_internal_(config, spill_objects_callback), + object_store_internal_(config, spill_objects_callback, object_store_full_callback), buffer_pool_(config_.store_socket_name, config_.object_chunk_size), rpc_work_(rpc_service_), gen_(std::chrono::high_resolution_clock::now().time_since_epoch().count()), diff --git a/src/ray/object_manager/object_manager.h b/src/ray/object_manager/object_manager.h index 7f8167fb8..d7d630c01 100644 --- a/src/ray/object_manager/object_manager.h +++ b/src/ray/object_manager/object_manager.h @@ -82,7 +82,8 @@ struct LocalObjectInfo { class ObjectStoreRunner { public: ObjectStoreRunner(const ObjectManagerConfig &config, - SpillObjectsCallback spill_objects_callback); + SpillObjectsCallback spill_objects_callback, + std::function object_store_full_callback); ~ObjectStoreRunner(); private: @@ -194,7 +195,8 @@ class ObjectManager : public ObjectManagerInterface, const NodeID &self_node_id, const ObjectManagerConfig &config, std::shared_ptr object_directory, RestoreSpilledObjectCallback restore_spilled_object, - SpillObjectsCallback spill_objects_callback = nullptr); + SpillObjectsCallback spill_objects_callback = nullptr, + std::function object_store_full_callback = nullptr); ~ObjectManager(); diff --git a/src/ray/object_manager/plasma/client.cc b/src/ray/object_manager/plasma/client.cc index 03b47c13b..e6a71ece1 100644 --- a/src/ray/object_manager/plasma/client.cc +++ b/src/ray/object_manager/plasma/client.cc @@ -155,8 +155,17 @@ class PlasmaClient::Impl : public std::enable_shared_from_this *data, int device_num = 0, - bool evict_if_full = true); + uint64_t *retry_with_request_id, std::shared_ptr *data, + int device_num = 0); + + Status RetryCreate(const ObjectID &object_id, uint64_t request_id, + const uint8_t *metadata, uint64_t *retry_with_request_id, + std::shared_ptr *data); + + Status TryCreateImmediately(const ObjectID &object_id, + const ray::rpc::Address &owner_address, int64_t data_size, + const uint8_t *metadata, int64_t metadata_size, + std::shared_ptr *data, int device_num); Status Get(const std::vector &object_ids, int64_t timeout_ms, std::vector *object_buffers); @@ -187,6 +196,11 @@ class PlasmaClient::Impl : public std::enable_shared_from_this *data); + /// Check if store_fd has already been received from the store. If yes, /// return it. Otherwise, receive it from the store (see analogous logic /// in store.cc). 
@@ -307,42 +321,46 @@ void PlasmaClient::Impl::IncrementObjectCount(const ObjectID &object_id, object_entry->count += 1; } -Status PlasmaClient::Impl::Create(const ObjectID &object_id, - const ray::rpc::Address &owner_address, - int64_t data_size, const uint8_t *metadata, - int64_t metadata_size, std::shared_ptr *data, - int device_num, bool evict_if_full) { - std::lock_guard guard(client_mutex_); - - RAY_LOG(DEBUG) << "called plasma_create on conn " << store_conn_ << " with size " - << data_size << " and metadata size " << metadata_size; - RAY_RETURN_NOT_OK(SendCreateRequest(store_conn_, object_id, owner_address, - evict_if_full, data_size, metadata_size, - device_num)); +Status PlasmaClient::Impl::HandleCreateReply(const ObjectID &object_id, + const uint8_t *metadata, + uint64_t *retry_with_request_id, + std::shared_ptr *data) { std::vector buffer; RAY_RETURN_NOT_OK(PlasmaReceive(store_conn_, MessageType::PlasmaCreateReply, &buffer)); ObjectID id; PlasmaObject object; MEMFD_TYPE store_fd; int64_t mmap_size; - RAY_RETURN_NOT_OK( - ReadCreateReply(buffer.data(), buffer.size(), &id, &object, &store_fd, &mmap_size)); + + if (retry_with_request_id) { + RAY_RETURN_NOT_OK(ReadCreateReply(buffer.data(), buffer.size(), &id, + retry_with_request_id, &object, &store_fd, + &mmap_size)); + if (*retry_with_request_id > 0) { + // The client should retry the request. + return Status::OK(); + } + } else { + uint64_t unused = 0; + RAY_RETURN_NOT_OK(ReadCreateReply(buffer.data(), buffer.size(), &id, &unused, &object, + &store_fd, &mmap_size)); + RAY_CHECK(unused == 0); + } + // If the CreateReply included an error, then the store will not send a file // descriptor. - if (device_num == 0) { - RAY_CHECK(object.data_size == data_size); - RAY_CHECK(object.metadata_size == metadata_size); + if (object.device_num == 0) { // The metadata should come right after the data. - RAY_CHECK(object.metadata_offset == object.data_offset + data_size); + RAY_CHECK(object.metadata_offset == object.data_offset + object.data_size); *data = std::make_shared( shared_from_this(), GetStoreFdAndMmap(store_fd, mmap_size) + object.data_offset, - data_size); + object.data_size); // If plasma_create is being called from a transfer, then we will not copy the // metadata here. The metadata will be written along with the data streamed // from the transfer. if (metadata != NULL) { // Copy the metadata to the buffer. 
- memcpy((*data)->mutable_data() + object.data_size, metadata, metadata_size); + memcpy((*data)->mutable_data() + object.data_size, metadata, object.metadata_size); } } else { #ifdef PLASMA_CUDA @@ -378,6 +396,44 @@ Status PlasmaClient::Impl::Create(const ObjectID &object_id, return Status::OK(); } +Status PlasmaClient::Impl::Create(const ObjectID &object_id, + const ray::rpc::Address &owner_address, + int64_t data_size, const uint8_t *metadata, + int64_t metadata_size, uint64_t *retry_with_request_id, + std::shared_ptr *data, int device_num) { + std::lock_guard guard(client_mutex_); + + RAY_LOG(DEBUG) << "called plasma_create on conn " << store_conn_ << " with size " + << data_size << " and metadata size " << metadata_size; + RAY_RETURN_NOT_OK(SendCreateRequest(store_conn_, object_id, owner_address, data_size, + metadata_size, device_num, + /*try_immediately=*/false)); + return HandleCreateReply(object_id, metadata, retry_with_request_id, data); +} + +Status PlasmaClient::Impl::RetryCreate(const ObjectID &object_id, uint64_t request_id, + const uint8_t *metadata, + uint64_t *retry_with_request_id, + std::shared_ptr *data) { + std::lock_guard guard(client_mutex_); + RAY_RETURN_NOT_OK(SendCreateRetryRequest(store_conn_, object_id, request_id)); + return HandleCreateReply(object_id, metadata, retry_with_request_id, data); +} + +Status PlasmaClient::Impl::TryCreateImmediately( + const ObjectID &object_id, const ray::rpc::Address &owner_address, int64_t data_size, + const uint8_t *metadata, int64_t metadata_size, std::shared_ptr *data, + int device_num) { + std::lock_guard guard(client_mutex_); + + RAY_LOG(DEBUG) << "called plasma_create on conn " << store_conn_ << " with size " + << data_size << " and metadata size " << metadata_size; + RAY_RETURN_NOT_OK(SendCreateRequest(store_conn_, object_id, owner_address, data_size, + metadata_size, device_num, + /*try_immediately=*/true)); + return HandleCreateReply(object_id, metadata, nullptr, data); +} + Status PlasmaClient::Impl::GetBuffers( const ObjectID *object_ids, int64_t num_objects, int64_t timeout_ms, const std::function( @@ -798,10 +854,25 @@ Status PlasmaClient::SetClientOptions(const std::string &client_name, Status PlasmaClient::Create(const ObjectID &object_id, const ray::rpc::Address &owner_address, int64_t data_size, const uint8_t *metadata, int64_t metadata_size, - std::shared_ptr *data, int device_num, - bool evict_if_full) { - return impl_->Create(object_id, owner_address, data_size, metadata, metadata_size, data, - device_num, evict_if_full); + uint64_t *retry_with_request_id, + std::shared_ptr *data, int device_num) { + return impl_->Create(object_id, owner_address, data_size, metadata, metadata_size, + retry_with_request_id, data, device_num); +} + +Status PlasmaClient::RetryCreate(const ObjectID &object_id, uint64_t request_id, + const uint8_t *metadata, uint64_t *retry_with_request_id, + std::shared_ptr *data) { + return impl_->RetryCreate(object_id, request_id, metadata, retry_with_request_id, data); +} + +Status PlasmaClient::TryCreateImmediately(const ObjectID &object_id, + const ray::rpc::Address &owner_address, + int64_t data_size, const uint8_t *metadata, + int64_t metadata_size, + std::shared_ptr *data, int device_num) { + return impl_->TryCreateImmediately(object_id, owner_address, data_size, metadata, + metadata_size, data, device_num); } Status PlasmaClient::Get(const std::vector &object_ids, int64_t timeout_ms, diff --git a/src/ray/object_manager/plasma/client.h b/src/ray/object_manager/plasma/client.h index 
7e5f5561b..4c1d2745a 100644 --- a/src/ray/object_manager/plasma/client.h +++ b/src/ray/object_manager/plasma/client.h @@ -77,6 +77,54 @@ class RAY_EXPORT PlasmaClient { /// Create an object in the Plasma Store. Any metadata for this object must be /// be passed in when the object is created. /// + /// If this request cannot be fulfilled immediately, the client will be + /// returned a request ID, which it should use to retry the request. + /// + /// \param object_id The ID to use for the newly created object. + /// \param owner_address The address of the object's owner. + /// \param data_size The size in bytes of the space to be allocated for this + /// object's + /// data (this does not include space used for metadata). + /// \param metadata The object's metadata. If there is no metadata, this + /// pointer should be NULL. + /// \param metadata_size The size in bytes of the metadata. If there is no + /// metadata, this should be 0. + /// \param retry_with_request_id If the request is not yet fulfilled, this + /// will be set to a unique ID with which the client should retry. + /// \param data The address of the newly created object will be written here. + /// \param device_num The number of the device where the object is being + /// created. + /// device_num = 0 corresponds to the host, + /// device_num = 1 corresponds to GPU0, + /// device_num = 2 corresponds to GPU1, etc. + /// \return The return status. + /// + /// The returned object must be released once it is done with. It must also + /// be either sealed or aborted. + Status Create(const ObjectID &object_id, const ray::rpc::Address &owner_address, + int64_t data_size, const uint8_t *metadata, int64_t metadata_size, + uint64_t *retry_with_request_id, std::shared_ptr *data, + int device_num = 0); + + /// Retry a previous Create call using the returned request ID. + /// + /// \param object_id The ID to use for the newly created object. + /// \param request_id The request ID returned by the previous Create call. + /// \param metadata The object's metadata. If there is no metadata, this + /// pointer should be NULL. + /// \param retry_with_request_id If the request is not yet fulfilled, this + /// will be set to a unique ID with which the client should retry. + /// \param data The address of the newly created object will be written here. + Status RetryCreate(const ObjectID &object_id, uint64_t request_id, + const uint8_t *metadata, uint64_t *retry_with_request_id, + std::shared_ptr *data); + + /// Create an object in the Plasma Store. Any metadata for this object must be + /// be passed in when the object is created. + /// + /// The plasma store will attempt to fulfill this request immediately. If it + /// cannot be fulfilled immediately, an error will be returned to the client. + /// /// \param object_id The ID to use for the newly created object. /// \param owner_address The address of the object's owner. /// \param data_size The size in bytes of the space to be allocated for this @@ -93,16 +141,14 @@ class RAY_EXPORT PlasmaClient { /// device_num = 0 corresponds to the host, /// device_num = 1 corresponds to GPU0, /// device_num = 2 corresponds to GPU1, etc. - /// \param evict_if_full Whether to evict other objects to make space for - /// this object. /// \return The return status. /// /// The returned object must be released once it is done with. It must also /// be either sealed or aborted. 
- Status Create(const ObjectID &object_id, const ray::rpc::Address &owner_address, - int64_t data_size, const uint8_t *metadata, int64_t metadata_size, - std::shared_ptr *data, int device_num = 0, - bool evict_if_full = true); + Status TryCreateImmediately(const ObjectID &object_id, + const ray::rpc::Address &owner_address, int64_t data_size, + const uint8_t *metadata, int64_t metadata_size, + std::shared_ptr *data, int device_num = 0); /// Get some objects from the Plasma Store. This function will block until the /// objects have all been created and sealed in the Plasma Store or the diff --git a/src/ray/object_manager/plasma/create_request_queue.cc b/src/ray/object_manager/plasma/create_request_queue.cc index 0daec6c3d..500fb1659 100644 --- a/src/ray/object_manager/plasma/create_request_queue.cc +++ b/src/ray/object_manager/plasma/create_request_queue.cc @@ -24,31 +24,149 @@ namespace plasma { -void CreateRequestQueue::AddRequest(const std::shared_ptr &client, - const CreateObjectCallback &request_callback) { - queue_.push_back({client, request_callback}); +uint64_t CreateRequestQueue::AddRequest(const ObjectID &object_id, + const std::shared_ptr &client, + const CreateObjectCallback &create_callback) { + auto req_id = next_req_id_++; + fulfilled_requests_[req_id] = nullptr; + queue_.emplace_back(new CreateRequest(object_id, req_id, client, create_callback)); + return req_id; +} + +bool CreateRequestQueue::GetRequestResult(uint64_t req_id, PlasmaObject *result, + PlasmaError *error) { + auto it = fulfilled_requests_.find(req_id); + if (it == fulfilled_requests_.end()) { + RAY_LOG(ERROR) + << "Object store client requested the result of a previous request to create an " + "object, but the result has already been returned to the client. This client " + "may hang because the creation request cannot be fulfilled."; + *error = PlasmaError::UnexpectedError; + return true; + } + + if (!it->second) { + return false; + } + + *result = it->second->result; + *error = it->second->error; + fulfilled_requests_.erase(it); + return true; +} + +std::pair CreateRequestQueue::TryRequestImmediately( + const ObjectID &object_id, const std::shared_ptr &client, + const CreateObjectCallback &create_callback) { + PlasmaObject result = {}; + + if (!queue_.empty()) { + // There are other requests queued. Return an out-of-memory error + // immediately because this request cannot be served. + return {result, PlasmaError::OutOfMemory}; + } + + auto req_id = AddRequest(object_id, client, create_callback); + if (!ProcessRequests().ok()) { + // If the request was not immediately fulfillable, finish it. + RAY_CHECK(!queue_.empty()); + FinishRequest(queue_.begin()); + } + PlasmaError error; + RAY_CHECK(GetRequestResult(req_id, &result, &error)); + return {result, error}; +} + +Status CreateRequestQueue::ProcessRequest(std::unique_ptr &request) { + // Return an OOM error to the client if we have hit the maximum number of + // retries. + bool evict_if_full = evict_if_full_; + if (max_retries_ == 0) { + // If we cannot retry, then always evict on the first attempt. + evict_if_full = true; + } else if (num_retries_ > 0) { + // Always try to evict after the first attempt. 
+ evict_if_full = true; + } + + request->error = request->create_callback(evict_if_full, &request->result); + Status status; + auto should_retry_on_oom = max_retries_ == -1 || num_retries_ < max_retries_; + if (request->error == PlasmaError::TransientOutOfMemory) { + // The object store is full, but we should wait for space to be made + // through spilling, so do nothing. The caller must guarantee that + // ProcessRequests is called again so that we can try this request again. + // NOTE(swang): There could be other requests behind this one that are + // actually serviceable. This may be inefficient, but eventually this + // request will get served and unblock the following requests, once + // enough objects have been spilled. + // TODO(swang): Ask the raylet to spill enough space for multiple requests + // at once, instead of just the head of the queue. + num_retries_ = 0; + status = + Status::TransientObjectStoreFull("Object store full, queueing creation request"); + } else if (request->error == PlasmaError::OutOfMemory && should_retry_on_oom) { + num_retries_++; + RAY_LOG(DEBUG) << "Not enough memory to create the object, after " << num_retries_ + << " tries"; + + if (trigger_global_gc_) { + trigger_global_gc_(); + } + + status = Status::ObjectStoreFull("Object store full, should retry on timeout"); + } else if (request->error == PlasmaError::OutOfMemory) { + RAY_LOG(ERROR) << "Not enough memory to create object " << request->object_id + << " after " << num_retries_ + << " tries, will return OutOfMemory to the client"; + } + + return status; } Status CreateRequestQueue::ProcessRequests() { - for (auto request_it = queue_.begin(); request_it != queue_.end();) { - auto status = request_it->second(); - if (status.IsTransientObjectStoreFull()) { + while (!queue_.empty()) { + auto request_it = queue_.begin(); + auto status = ProcessRequest(*request_it); + if (status.IsTransientObjectStoreFull() || status.IsObjectStoreFull()) { return status; } - request_it = queue_.erase(request_it); + FinishRequest(request_it); } return Status::OK(); } +void CreateRequestQueue::FinishRequest( + std::list>::iterator request_it) { + // Fulfill the request. + auto &request = *request_it; + auto it = fulfilled_requests_.find(request->request_id); + RAY_CHECK(it != fulfilled_requests_.end()); + RAY_CHECK(it->second == nullptr); + it->second = std::move(request); + queue_.erase(request_it); + + // Reset the number of retries since we are no longer trying this request. 
+ num_retries_ = 0; +} + void CreateRequestQueue::RemoveDisconnectedClientRequests( const std::shared_ptr &client) { for (auto it = queue_.begin(); it != queue_.end();) { - if (it->first == client) { + if ((*it)->client == client) { + fulfilled_requests_.erase((*it)->request_id); it = queue_.erase(it); } else { it++; } } + + for (auto it = fulfilled_requests_.begin(); it != fulfilled_requests_.end();) { + if (it->second && it->second->client == client) { + fulfilled_requests_.erase(it); + } + it++; + } } } // namespace plasma diff --git a/src/ray/object_manager/plasma/create_request_queue.h b/src/ray/object_manager/plasma/create_request_queue.h index 425faf70a..f7d21fb97 100644 --- a/src/ray/object_manager/plasma/create_request_queue.h +++ b/src/ray/object_manager/plasma/create_request_queue.h @@ -18,28 +18,75 @@ #include #include +#include "absl/container/flat_hash_map.h" + #include "ray/common/status.h" #include "ray/object_manager/plasma/common.h" #include "ray/object_manager/plasma/connection.h" +#include "ray/object_manager/plasma/plasma.h" +#include "ray/object_manager/plasma/protocol.h" namespace plasma { -using ray::Status; - -using CreateObjectCallback = std::function; - class CreateRequestQueue { public: - CreateRequestQueue() {} + using CreateObjectCallback = + std::function; - /// Add a request to the queue. + CreateRequestQueue(int32_t max_retries, bool evict_if_full, + std::function trigger_global_gc) + : max_retries_(max_retries), + evict_if_full_(evict_if_full), + trigger_global_gc_(trigger_global_gc) { + RAY_LOG(DEBUG) << "Starting plasma::CreateRequestQueue with " << max_retries_ + << " retries on OOM, evict if full? " << (evict_if_full_ ? 1 : 0); + } + + /// Add a request to the queue. The caller should use the returned request ID + /// to later get the result of the request. /// /// The request may not get tried immediately if the head of the queue is not /// serviceable. /// - /// \param client The client that sent the request. - void AddRequest(const std::shared_ptr &client, - const CreateObjectCallback &request_callback); + /// \param object_id The ID of the object to create. + /// \param client The client that sent the request. This is used as a key to + /// drop this request if the client disconnects. + /// \param create_callback A callback to attempt to create the object. + /// \return A request ID that can be used to get the result. + uint64_t AddRequest(const ObjectID &object_id, + const std::shared_ptr &client, + const CreateObjectCallback &create_callback); + + /// Get the result of a request. + /// + /// This method should only be called with a request ID returned by a + /// previous call to add a request. The result will be popped, so this method + /// should not be called again with the same request ID once a result is + /// returned. If either of these is violated, an error will be returned. + /// + /// \param[in] req_id The request ID that was returned to the caller by a + /// previous call to add a request. + /// \param[out] result The resulting object information will be stored here, + /// if the request was finished and successful. + /// \param[out] error The error code returned by the creation handler will be + /// stored here, if the request finished. This will also return an error if + /// there is no information about this request ID. + /// \return Whether the result and error are ready. This returns false if the + /// request is still pending. 
+ bool GetRequestResult(uint64_t req_id, PlasmaObject *result, PlasmaError *error); + + /// Try to fulfill a request immediately, for clients that cannot retry. + /// + /// \param object_id The ID of the object to create. + /// \param client The client that sent the request. This is used as a key to + /// drop this request if the client disconnects. + /// \param create_callback A callback to attempt to create the object. + /// \return The result of the call. This will return an out-of-memory error + /// if there are other requests queued or there is not enough space left in + /// the object store, this will return an out-of-memory error. + std::pair TryRequestImmediately( + const ObjectID &object_id, const std::shared_ptr &client, + const CreateObjectCallback &create_callback); /// Process requests in the queue. /// @@ -57,6 +104,62 @@ class CreateRequestQueue { void RemoveDisconnectedClientRequests(const std::shared_ptr &client); private: + struct CreateRequest { + CreateRequest(const ObjectID &object_id, uint64_t request_id, + const std::shared_ptr &client, + CreateObjectCallback create_callback) + : object_id(object_id), + request_id(request_id), + client(client), + create_callback(create_callback) {} + + // The ObjectID to create. + const ObjectID object_id; + + // A request ID that can be returned to the caller to get the result once + // ready. + const uint64_t request_id; + + // A pointer to the client, used as a key to delete requests that were made + // by a client that is now disconnected. + const std::shared_ptr client; + + // A callback to attempt to create the object. + const CreateObjectCallback create_callback; + + // The results of the creation call. These should be sent back to the + // client once ready. + PlasmaError error = PlasmaError::OK; + PlasmaObject result = {}; + }; + + /// Process a single request. Sets the request's error result to the error + /// returned by the request handler inside. Returns OK if the request can be + /// finished. + Status ProcessRequest(std::unique_ptr &request); + + /// Finish a queued request and remove it from the queue. + void FinishRequest(std::list>::iterator request_it); + + /// The next request ID to assign, so that the caller can get the results of + /// a request by retrying. Start at 1 because 0 means "do not retry". + uint64_t next_req_id_ = 1; + + /// The maximum number of times to retry each request upon OOM. + const int32_t max_retries_; + + /// The number of times the request at the head of the queue has been tried. + int32_t num_retries_ = 0; + + /// On the first attempt to create an object, whether to evict from the + /// object store to make space. If the first attempt fails, then we will + /// always try to evict. + const bool evict_if_full_; + + /// A callback to trigger global GC in the cluster if the object store is + /// full. + const std::function trigger_global_gc_; + /// Queue of object creation requests to respond to. Requests will be placed /// on this queue if the object store does not have enough room at the time /// that the client made the creation request, but space may be made through @@ -68,8 +171,14 @@ class CreateRequestQueue { /// in the object store. Then, the client does not need to poll on an /// OutOfMemory error and we can just respond to them once there is enough /// space made, or after a timeout. - std::list, const CreateObjectCallback>> - queue_; + std::list> queue_; + + /// A buffer of the results of fulfilled requests. 
The value will be null + /// while the request is pending and will be set once the request has + /// finished. + absl::flat_hash_map> fulfilled_requests_; + + friend class CreateRequestQueueTest; }; } // namespace plasma diff --git a/src/ray/object_manager/plasma/plasma.fbs b/src/ray/object_manager/plasma/plasma.fbs index 456f73cfc..ff8099ea7 100644 --- a/src/ray/object_manager/plasma/plasma.fbs +++ b/src/ray/object_manager/plasma/plasma.fbs @@ -23,6 +23,7 @@ enum MessageType:long { PlasmaDisconnectClient = 0, // Create a new object. PlasmaCreateRequest, + PlasmaCreateRetryRequest, PlasmaCreateReply, PlasmaAbortRequest, PlasmaAbortReply, @@ -90,6 +91,10 @@ enum PlasmaError:int { ObjectNotSealed, // Trying to delete an object but it's in use. ObjectInUse, + // An unexpected error occurred during object creation, such as trying to get + // the result of the same request twice. This is most likely due to a system + // bug in the plasma store or caller. + UnexpectedError, } // Plasma store messages @@ -141,16 +146,24 @@ table PlasmaCreateRequest { owner_port: int; // Unique id for the owner worker. owner_worker_id: string; - // Whether to evict other objects to make room for this one. - evict_if_full: bool; // The size of the object's data in bytes. data_size: ulong; // The size of the object's metadata in bytes. metadata_size: ulong; // Device to create buffer on. device_num: int; + // Try the creation request immediately. If this is not possible (due to + // out-of-memory), the error will be returned immediately to the client. + try_immediately: bool; } +table PlasmaCreateRetryRequest { + // ID of the object to be created. + object_id: string; + // The ID of the request to retry. + request_id: uint64; + } + table CudaHandle { handle: [ubyte]; } @@ -158,6 +171,9 @@ table CudaHandle { table PlasmaCreateReply { // ID of the object that was created. object_id: string; + // The client should retry the request if this is > 0. This + // is the request ID to include in the retry. + retry_with_request_id: uint64; // The object that is returned with this reply. plasma_object: PlasmaObjectSpec; // Error that occurred for this call. diff --git a/src/ray/object_manager/plasma/plasma.h b/src/ray/object_manager/plasma/plasma.h index 9957df224..bc5605c4d 100644 --- a/src/ray/object_manager/plasma/plasma.h +++ b/src/ray/object_manager/plasma/plasma.h @@ -55,6 +55,8 @@ struct PlasmaObject { int64_t metadata_size; /// Device number object is on. int device_num; + /// Set if device_num is equal to 0. 
+ int64_t mmap_size; bool operator==(const PlasmaObject &other) const { return ( diff --git a/src/ray/object_manager/plasma/protocol.cc b/src/ray/object_manager/plasma/protocol.cc index 6481b6547..e80d0fc05 100644 --- a/src/ray/object_manager/plasma/protocol.cc +++ b/src/ray/object_manager/plasma/protocol.cc @@ -131,6 +131,12 @@ Status PlasmaErrorStatus(fb::PlasmaError plasma_error) { return Status::ObjectNotFound("object does not exist in the plasma store"); case fb::PlasmaError::OutOfMemory: return Status::ObjectStoreFull("object does not fit in the plasma store"); + case fb::PlasmaError::TransientOutOfMemory: + return Status::ObjectStoreFull( + "object does not fit in the plasma store, spilling objects to make room"); + case fb::PlasmaError::UnexpectedError: + return Status::UnknownError( + "an unexpected error occurred, likely due to a bug in the system or caller"); default: RAY_LOG(FATAL) << "unknown plasma error code " << static_cast(plasma_error); } @@ -196,27 +202,34 @@ Status ReadGetDebugStringReply(uint8_t *data, size_t size, std::string *debug_st // Create messages. +Status SendCreateRetryRequest(const std::shared_ptr &store_conn, + ObjectID object_id, uint64_t request_id) { + flatbuffers::FlatBufferBuilder fbb; + auto message = fb::CreatePlasmaCreateRetryRequest( + fbb, fbb.CreateString(object_id.Binary()), request_id); + return PlasmaSend(store_conn, MessageType::PlasmaCreateRetryRequest, &fbb, message); +} + Status SendCreateRequest(const std::shared_ptr &store_conn, ObjectID object_id, - const ray::rpc::Address &owner_address, bool evict_if_full, - int64_t data_size, int64_t metadata_size, int device_num) { + const ray::rpc::Address &owner_address, int64_t data_size, + int64_t metadata_size, int device_num, bool try_immediately) { flatbuffers::FlatBufferBuilder fbb; auto message = fb::CreatePlasmaCreateRequest( fbb, fbb.CreateString(object_id.Binary()), fbb.CreateString(owner_address.raylet_id()), fbb.CreateString(owner_address.ip_address()), owner_address.port(), - fbb.CreateString(owner_address.worker_id()), evict_if_full, data_size, - metadata_size, device_num); + fbb.CreateString(owner_address.worker_id()), data_size, metadata_size, device_num, + try_immediately); return PlasmaSend(store_conn, MessageType::PlasmaCreateRequest, &fbb, message); } -Status ReadCreateRequest(uint8_t *data, size_t size, ObjectID *object_id, - NodeID *owner_raylet_id, std::string *owner_ip_address, - int *owner_port, WorkerID *owner_worker_id, bool *evict_if_full, - int64_t *data_size, int64_t *metadata_size, int *device_num) { +void ReadCreateRequest(uint8_t *data, size_t size, ObjectID *object_id, + NodeID *owner_raylet_id, std::string *owner_ip_address, + int *owner_port, WorkerID *owner_worker_id, int64_t *data_size, + int64_t *metadata_size, int *device_num) { RAY_DCHECK(data); auto message = flatbuffers::GetRoot(data); RAY_DCHECK(VerifyFlatbuffer(message, data, size)); - *evict_if_full = message->evict_if_full(); *data_size = message->data_size(); *metadata_size = message->metadata_size(); *object_id = ObjectID::FromBinary(message->object_id()->str()); @@ -225,21 +238,32 @@ Status ReadCreateRequest(uint8_t *data, size_t size, ObjectID *object_id, *owner_port = message->owner_port(); *owner_worker_id = WorkerID::FromBinary(message->owner_worker_id()->str()); *device_num = message->device_num(); - return Status::OK(); + return; +} + +Status SendUnfinishedCreateReply(const std::shared_ptr &client, + ObjectID object_id, uint64_t retry_with_request_id) { + flatbuffers::FlatBufferBuilder fbb; + 
auto object_string = fbb.CreateString(object_id.Binary()); + fb::PlasmaCreateReplyBuilder crb(fbb); + crb.add_object_id(object_string); + crb.add_retry_with_request_id(retry_with_request_id); + auto message = crb.Finish(); + return PlasmaSend(client, MessageType::PlasmaCreateReply, &fbb, message); } Status SendCreateReply(const std::shared_ptr &client, ObjectID object_id, - PlasmaObject *object, PlasmaError error_code, int64_t mmap_size) { + const PlasmaObject &object, PlasmaError error_code) { flatbuffers::FlatBufferBuilder fbb; - PlasmaObjectSpec plasma_object(FD2INT(object->store_fd), object->data_offset, - object->data_size, object->metadata_offset, - object->metadata_size, object->device_num); + PlasmaObjectSpec plasma_object(FD2INT(object.store_fd), object.data_offset, + object.data_size, object.metadata_offset, + object.metadata_size, object.device_num); auto object_string = fbb.CreateString(object_id.Binary()); #ifdef PLASMA_CUDA flatbuffers::Offset ipc_handle; - if (object->device_num != 0) { + if (object.device_num != 0) { std::shared_ptr handle; - ARROW_ASSIGN_OR_RAISE(handle, object->ipc_handle->Serialize()); + ARROW_ASSIGN_OR_RAISE(handle, object.ipc_handle->Serialize()); ipc_handle = fb::CreateCudaHandle(fbb, fbb.CreateVector(handle->data(), handle->size())); } @@ -248,9 +272,10 @@ Status SendCreateReply(const std::shared_ptr &client, ObjectID object_id crb.add_error(static_cast(error_code)); crb.add_plasma_object(&plasma_object); crb.add_object_id(object_string); - crb.add_store_fd(FD2INT(object->store_fd)); - crb.add_mmap_size(mmap_size); - if (object->device_num != 0) { + crb.add_retry_with_request_id(0); + crb.add_store_fd(FD2INT(object.store_fd)); + crb.add_mmap_size(object.mmap_size); + if (object.device_num != 0) { #ifdef PLASMA_CUDA crb.add_ipc_handle(ipc_handle); #else @@ -262,11 +287,18 @@ Status SendCreateReply(const std::shared_ptr &client, ObjectID object_id } Status ReadCreateReply(uint8_t *data, size_t size, ObjectID *object_id, - PlasmaObject *object, MEMFD_TYPE *store_fd, int64_t *mmap_size) { + uint64_t *retry_with_request_id, PlasmaObject *object, + MEMFD_TYPE *store_fd, int64_t *mmap_size) { RAY_DCHECK(data); auto message = flatbuffers::GetRoot(data); RAY_DCHECK(VerifyFlatbuffer(message, data, size)); *object_id = ObjectID::FromBinary(message->object_id()->str()); + *retry_with_request_id = message->retry_with_request_id(); + if (*retry_with_request_id > 0) { + // The client should retry the request. + return Status::OK(); + } + object->store_fd = INT2FD(message->plasma_object()->segment_index()); object->data_offset = message->plasma_object()->data_offset(); object->data_size = message->plasma_object()->data_size(); diff --git a/src/ray/object_manager/plasma/protocol.h b/src/ray/object_manager/plasma/protocol.h index 1f7786d3e..a8ba71b46 100644 --- a/src/ray/object_manager/plasma/protocol.h +++ b/src/ray/object_manager/plasma/protocol.h @@ -37,6 +37,8 @@ using ray::Status; using flatbuf::MessageType; using flatbuf::PlasmaError; +Status PlasmaErrorStatus(flatbuf::PlasmaError plasma_error); + template bool VerifyFlatbuffer(T *object, uint8_t *data, size_t size) { flatbuffers::Verifier verifier(data, size); @@ -82,20 +84,27 @@ Status ReadGetDebugStringReply(uint8_t *data, size_t size, std::string *debug_st /* Plasma Create message functions. 
*/ -Status SendCreateRequest(const std::shared_ptr &store_conn, ObjectID object_id, - const ray::rpc::Address &owner_address, bool evict_if_full, - int64_t data_size, int64_t metadata_size, int device_num); +Status SendCreateRetryRequest(const std::shared_ptr &store_conn, + ObjectID object_id, uint64_t request_id); -Status ReadCreateRequest(uint8_t *data, size_t size, ObjectID *object_id, - NodeID *owner_raylet_id, std::string *owner_ip_address, - int *owner_port, WorkerID *owner_worker_id, bool *evict_if_full, - int64_t *data_size, int64_t *metadata_size, int *device_num); +Status SendCreateRequest(const std::shared_ptr &store_conn, ObjectID object_id, + const ray::rpc::Address &owner_address, int64_t data_size, + int64_t metadata_size, int device_num, bool try_immediately); + +void ReadCreateRequest(uint8_t *data, size_t size, ObjectID *object_id, + NodeID *owner_raylet_id, std::string *owner_ip_address, + int *owner_port, WorkerID *owner_worker_id, int64_t *data_size, + int64_t *metadata_size, int *device_num); + +Status SendUnfinishedCreateReply(const std::shared_ptr &client, + ObjectID object_id, uint64_t retry_with_request_id); Status SendCreateReply(const std::shared_ptr &client, ObjectID object_id, - PlasmaObject *object, PlasmaError error, int64_t mmap_size); + const PlasmaObject &object, PlasmaError error); Status ReadCreateReply(uint8_t *data, size_t size, ObjectID *object_id, - PlasmaObject *object, MEMFD_TYPE *store_fd, int64_t *mmap_size); + uint64_t *retry_with_request_id, PlasmaObject *object, + MEMFD_TYPE *store_fd, int64_t *mmap_size); Status SendAbortRequest(const std::shared_ptr &store_conn, ObjectID object_id); diff --git a/src/ray/object_manager/plasma/store.cc b/src/ray/object_manager/plasma/store.cc index 7a38f9934..21f50161e 100644 --- a/src/ray/object_manager/plasma/store.cc +++ b/src/ray/object_manager/plasma/store.cc @@ -122,7 +122,9 @@ GetRequest::GetRequest(boost::asio::io_service &io_context, PlasmaStore::PlasmaStore(boost::asio::io_service &main_service, std::string directory, bool hugepages_enabled, const std::string &socket_name, std::shared_ptr external_store, - ray::SpillObjectsCallback spill_objects_callback) + uint32_t delay_on_oom_ms, + ray::SpillObjectsCallback spill_objects_callback, + std::function object_store_full_callback) : io_context_(main_service), socket_name_(socket_name), acceptor_(main_service, ParseUrlEndpoint(socket_name)), @@ -130,7 +132,11 @@ PlasmaStore::PlasmaStore(boost::asio::io_service &main_service, std::string dire eviction_policy_(&store_info_, PlasmaAllocator::GetFootprintLimit()), external_store_(external_store), spill_objects_callback_(spill_objects_callback), - create_request_queue_() { + delay_on_oom_ms_(delay_on_oom_ms), + create_request_queue_( + RayConfig::instance().object_store_full_max_retries(), + /*evict_if_full=*/RayConfig::instance().object_pinning_enabled(), + object_store_full_callback) { store_info_.directory = directory; store_info_.hugepages_enabled = hugepages_enabled; #ifdef PLASMA_CUDA @@ -274,54 +280,35 @@ Status PlasmaStore::FreeCudaMemory(int device_num, int64_t size, uint8_t *pointe } #endif -Status PlasmaStore::HandleCreateObjectRequest(const std::shared_ptr &client, - const std::vector &message) { +PlasmaError PlasmaStore::HandleCreateObjectRequest(const std::shared_ptr &client, + const std::vector &message, + bool evict_if_full, + PlasmaObject *object) { uint8_t *input = (uint8_t *)message.data(); size_t input_size = message.size(); ObjectID object_id; - PlasmaObject object = {}; NodeID 
owner_raylet_id; std::string owner_ip_address; int owner_port; WorkerID owner_worker_id; - bool evict_if_full; int64_t data_size; int64_t metadata_size; int device_num; - RAY_RETURN_NOT_OK(ReadCreateRequest( - input, input_size, &object_id, &owner_raylet_id, &owner_ip_address, &owner_port, - &owner_worker_id, &evict_if_full, &data_size, &metadata_size, &device_num)); - PlasmaError error_code = CreateObject( - object_id, owner_raylet_id, owner_ip_address, owner_port, owner_worker_id, - evict_if_full, data_size, metadata_size, device_num, client, &object); - Status status; - if (error_code == PlasmaError::TransientOutOfMemory) { - RAY_LOG(DEBUG) << "Create object " << object_id - << " failed, waiting for object spill"; - status = - Status::TransientObjectStoreFull("Object store full, queueing creation request"); - } else if (error_code == PlasmaError::OutOfMemory) { - RAY_LOG(ERROR) << "Not enough memory to create the object " << object_id - << ", data_size=" << data_size << ", metadata_size=" << metadata_size - << ", will send a reply of PlasmaError::OutOfMemory"; - RAY_RETURN_NOT_OK( - SendCreateReply(client, object_id, &object, error_code, /*mmap_size=*/0)); - } else { - int64_t mmap_size = 0; - if (error_code == PlasmaError::OK && device_num == 0) { - mmap_size = GetMmapSize(object.store_fd); - } - RAY_RETURN_NOT_OK(SendCreateReply(client, object_id, &object, error_code, mmap_size)); - if (error_code == PlasmaError::OK && device_num == 0) { - RAY_RETURN_NOT_OK(client->SendFd(object.store_fd)); - } + ReadCreateRequest(input, input_size, &object_id, &owner_raylet_id, &owner_ip_address, + &owner_port, &owner_worker_id, &data_size, &metadata_size, + &device_num); + auto error = CreateObject(object_id, owner_raylet_id, owner_ip_address, owner_port, + owner_worker_id, evict_if_full, data_size, metadata_size, + device_num, client, object); + if (error == PlasmaError::OutOfMemory) { + RAY_LOG(WARNING) << "Not enough memory to create the object " << object_id + << ", data_size=" << data_size + << ", metadata_size=" << metadata_size; } - - return status; + return error; } -// Create a new object buffer in the hash table. PlasmaError PlasmaStore::CreateObject( const ObjectID &object_id, const NodeID &owner_raylet_id, const std::string &owner_ip_address, int owner_port, const WorkerID &owner_worker_id, @@ -393,6 +380,10 @@ PlasmaError PlasmaStore::CreateObject( result->data_size = data_size; result->metadata_size = metadata_size; result->device_num = device_num; + if (device_num == 0) { + result->mmap_size = GetMmapSize(fd); + } + // Notify the eviction policy that this object was created. This must be done // immediately before the call to AddToClientObjectIds so that the // eviction policy does not have an opportunity to evict the object. @@ -980,12 +971,38 @@ Status PlasmaStore::ProcessMessage(const std::shared_ptr &client, // Process the different types of requests. 
switch (type) { case fb::MessageType::PlasmaCreateRequest: { - RAY_LOG(DEBUG) << "Received create request for object " - << GetCreateRequestObjectId(message); - create_request_queue_.AddRequest(client, [this, client, message]() { - return HandleCreateObjectRequest(client, message); - }); - ProcessCreateRequests(); + const auto &object_id = GetCreateRequestObjectId(message); + const auto &request = flatbuffers::GetRoot(input); + + auto handle_create = [this, client, message](bool evict_if_full, + PlasmaObject *result) { + return HandleCreateObjectRequest(client, message, evict_if_full, result); + }; + + if (request->try_immediately()) { + RAY_LOG(DEBUG) << "Received request to create object " << object_id + << " immediately"; + auto result_error = + create_request_queue_.TryRequestImmediately(object_id, client, handle_create); + const auto &result = result_error.first; + const auto &error = result_error.second; + if (SendCreateReply(client, object_id, result, error).ok() && + error == PlasmaError::OK && result.device_num == 0) { + static_cast(client->SendFd(result.store_fd)); + } + } else { + auto req_id = create_request_queue_.AddRequest(object_id, client, handle_create); + RAY_LOG(DEBUG) << "Received create request for object " << object_id + << " assigned request ID " << req_id; + ProcessCreateRequests(); + ReplyToCreateClient(client, object_id, req_id); + } + } break; + case fb::MessageType::PlasmaCreateRetryRequest: { + auto request = flatbuffers::GetRoot(input); + RAY_DCHECK(plasma::VerifyFlatbuffer(request, input, input_size)); + const auto &object_id = ObjectID::FromBinary(request->object_id()->str()); + ReplyToCreateClient(client, object_id, request->request_id()); } break; case fb::MessageType::PlasmaAbortRequest: { RAY_RETURN_NOT_OK(ReadAbortRequest(input, input_size, &object_id)); @@ -1089,7 +1106,14 @@ void PlasmaStore::ProcessCreateRequests() { } auto status = create_request_queue_.ProcessRequests(); + uint32_t retry_after_ms = 0; if (status.IsTransientObjectStoreFull()) { + retry_after_ms = delay_on_transient_oom_ms_; + } else if (status.IsObjectStoreFull()) { + retry_after_ms = delay_on_oom_ms_; + } + + if (retry_after_ms > 0) { // Try to process requests later, after space has been made. create_timer_ = execute_after(io_context_, [this]() { @@ -1098,7 +1122,23 @@ void PlasmaStore::ProcessCreateRequests() { create_timer_ = nullptr; ProcessCreateRequests(); }, - delay_on_transient_oom_ms_); + retry_after_ms); + } +} + +void PlasmaStore::ReplyToCreateClient(const std::shared_ptr &client, + const ObjectID &object_id, uint64_t req_id) { + PlasmaObject result = {}; + PlasmaError error; + bool finished = create_request_queue_.GetRequestResult(req_id, &result, &error); + if (finished) { + RAY_LOG(DEBUG) << "Finishing create object " << object_id << " request ID " << req_id; + if (SendCreateReply(client, object_id, result, error).ok() && + error == PlasmaError::OK && result.device_num == 0) { + static_cast(client->SendFd(result.store_fd)); + } + } else { + static_cast(SendUnfinishedCreateReply(client, object_id, req_id)); } } diff --git a/src/ray/object_manager/plasma/store.h b/src/ray/object_manager/plasma/store.h index 4b34221a1..4ea4ab51f 100644 --- a/src/ray/object_manager/plasma/store.h +++ b/src/ray/object_manager/plasma/store.h @@ -55,8 +55,9 @@ class PlasmaStore { // TODO: PascalCase PlasmaStore methods. 
diff --git a/src/ray/object_manager/plasma/store.h b/src/ray/object_manager/plasma/store.h
index 4b34221a1..4ea4ab51f 100644
--- a/src/ray/object_manager/plasma/store.h
+++ b/src/ray/object_manager/plasma/store.h
@@ -55,8 +55,9 @@ class PlasmaStore {
   // TODO: PascalCase PlasmaStore methods.
   PlasmaStore(boost::asio::io_service &main_service, std::string directory,
               bool hugepages_enabled, const std::string &socket_name,
-              std::shared_ptr<ExternalStore> external_store,
-              ray::SpillObjectsCallback spill_objects_callback);
+              std::shared_ptr<ExternalStore> external_store, uint32_t delay_on_oom_ms,
+              ray::SpillObjectsCallback spill_objects_callback,
+              std::function<void()> object_store_full_callback);
 
   ~PlasmaStore();
 
@@ -206,8 +207,12 @@ class PlasmaStore {
   void ProcessCreateRequests();
 
  private:
-  Status HandleCreateObjectRequest(const std::shared_ptr<Client> &client,
-                                   const std::vector<uint8_t> &message);
+  PlasmaError HandleCreateObjectRequest(const std::shared_ptr<Client> &client,
+                                        const std::vector<uint8_t> &message,
+                                        bool evict_if_full, PlasmaObject *object);
+
+  void ReplyToCreateClient(const std::shared_ptr<Client> &client,
+                           const ObjectID &object_id, uint64_t req_id);
 
   void PushNotification(ObjectInfoT *object_notification);
 
@@ -283,6 +288,10 @@ class PlasmaStore {
   /// complete.
   ray::SpillObjectsCallback spill_objects_callback_;
 
+  /// The amount of time to wait before retrying a creation request after an
+  /// OOM error.
+  const uint32_t delay_on_oom_ms_;
+
   /// The amount of time to wait before retrying a creation request after a
   /// transient OOM error.
   const uint32_t delay_on_transient_oom_ms_ = 10;
diff --git a/src/ray/object_manager/plasma/store_runner.cc b/src/ray/object_manager/plasma/store_runner.cc
index 0e966c097..152e386aa 100644
--- a/src/ray/object_manager/plasma/store_runner.cc
+++ b/src/ray/object_manager/plasma/store_runner.cc
@@ -75,7 +75,8 @@ PlasmaStoreRunner::PlasmaStoreRunner(std::string socket_name, int64_t system_mem
   plasma_directory_ = plasma_directory;
 }
 
-void PlasmaStoreRunner::Start(ray::SpillObjectsCallback spill_objects_callback) {
+void PlasmaStoreRunner::Start(ray::SpillObjectsCallback spill_objects_callback,
+                              std::function<void()> object_store_full_callback) {
   // Get external store
   std::shared_ptr<ExternalStore> external_store{nullptr};
   if (!external_store_endpoint_.empty()) {
@@ -93,8 +94,10 @@ void PlasmaStoreRunner::Start(ray::SpillObjectsCallback spill_objects_callback) {
     absl::MutexLock lock(&store_runner_mutex_);
-    store_.reset(new PlasmaStore(main_service_, plasma_directory_, hugepages_enabled_,
-                                 socket_name_, external_store, spill_objects_callback));
+    store_.reset(new PlasmaStore(
+        main_service_, plasma_directory_, hugepages_enabled_, socket_name_,
+        external_store, RayConfig::instance().object_store_full_initial_delay_ms(),
+        spill_objects_callback, object_store_full_callback));
     plasma_config = store_->GetPlasmaStoreInfo();
 
     // We are using a single memory-mapped file by mallocing and freeing a single
diff --git a/src/ray/object_manager/plasma/store_runner.h b/src/ray/object_manager/plasma/store_runner.h
index 98e16e311..2f6a61cd5 100644
--- a/src/ray/object_manager/plasma/store_runner.h
+++ b/src/ray/object_manager/plasma/store_runner.h
@@ -15,18 +15,14 @@ class PlasmaStoreRunner {
   PlasmaStoreRunner(std::string socket_name, int64_t system_memory,
                     bool hugepages_enabled, std::string plasma_directory,
                     const std::string external_store_endpoint);
-  void Start(ray::SpillObjectsCallback spill_objects_callback = nullptr);
+  void Start(ray::SpillObjectsCallback spill_objects_callback = nullptr,
+             std::function<void()> object_store_full_callback = nullptr);
   void Stop();
   void SetNotificationListener(
      const std::shared_ptr &notification_listener) {
    store_->SetNotificationListener(notification_listener);
   }
-  ray::SpaceReleasedCallback OnSpaceReleased() {
-    return
-        [this]() { main_service_.post([this]() { store_->ProcessCreateRequests(); }); };
-  }
-
  private:
   void Shutdown();
   absl::Mutex store_runner_mutex_;
diff --git a/src/ray/object_manager/test/create_request_queue_test.cc b/src/ray/object_manager/test/create_request_queue_test.cc
index 5355fe4f7..7d16d0b80 100644
--- a/src/ray/object_manager/test/create_request_queue_test.cc
+++ b/src/ray/object_manager/test/create_request_queue_test.cc
@@ -24,51 +24,313 @@ class MockClient : public ClientInterface {
   MockClient() {}
 };
 
-TEST(CreateRequestQueueTest, TestSimple) {
-  CreateRequestQueue queue;
+#define ASSERT_REQUEST_UNFINISHED(queue, req_id)                    \
+  {                                                                 \
+    PlasmaObject result = {};                                       \
+    PlasmaError status;                                             \
+    ASSERT_FALSE(queue.GetRequestResult(req_id, &result, &status)); \
+  }
 
-  bool created = false;
-  auto request = [&]() {
-    created = true;
-    return Status();
+#define ASSERT_REQUEST_FINISHED(queue, req_id, expected_status)     \
+  {                                                                 \
+    PlasmaObject result = {};                                       \
+    PlasmaError status;                                             \
+                                                                    \
+    ASSERT_TRUE(queue.GetRequestResult(req_id, &result, &status));  \
+    if (expected_status == PlasmaError::OK) {                       \
+      ASSERT_EQ(result.data_size, 1234);                            \
+    }                                                               \
+    ASSERT_EQ(status, expected_status);                             \
+  }
+
+class CreateRequestQueueTest : public ::testing::Test {
+ public:
+  CreateRequestQueueTest()
+      : queue_(
+            /*max_retries=*/2,
+            /*evict_if_full=*/true,
+            /*on_global_gc=*/[&]() { num_global_gc_++; }) {}
+
+  void AssertNoLeaks() {
+    ASSERT_TRUE(queue_.queue_.empty());
+    ASSERT_TRUE(queue_.fulfilled_requests_.empty());
+  }
+
+  CreateRequestQueue queue_;
+  int num_global_gc_ = 0;
+};
+
+TEST_F(CreateRequestQueueTest, TestSimple) {
+  auto request = [&](bool evict_if_full, PlasmaObject *result) {
+    result->data_size = 1234;
+    return PlasmaError::OK;
   };
 
   auto client = std::make_shared<MockClient>();
-  queue.AddRequest(client, request);
-  ASSERT_FALSE(created);
-  ASSERT_TRUE(queue.ProcessRequests().ok());
-  ASSERT_TRUE(created);
+  auto req_id = queue_.AddRequest(ObjectID::Nil(), client, request);
+  ASSERT_REQUEST_UNFINISHED(queue_, req_id);
+
+  ASSERT_TRUE(queue_.ProcessRequests().ok());
+  ASSERT_REQUEST_FINISHED(queue_, req_id, PlasmaError::OK);
+  ASSERT_EQ(num_global_gc_, 0);
+  // Request gets cleaned up after we get it.
+  ASSERT_REQUEST_FINISHED(queue_, req_id, PlasmaError::UnexpectedError);
+
+  auto req_id1 = queue_.AddRequest(ObjectID::Nil(), client, request);
+  auto req_id2 = queue_.AddRequest(ObjectID::Nil(), client, request);
+  auto req_id3 = queue_.AddRequest(ObjectID::Nil(), client, request);
+  ASSERT_REQUEST_UNFINISHED(queue_, req_id1);
+  ASSERT_REQUEST_UNFINISHED(queue_, req_id2);
+  ASSERT_REQUEST_UNFINISHED(queue_, req_id3);
+
+  ASSERT_TRUE(queue_.ProcessRequests().ok());
+  ASSERT_REQUEST_FINISHED(queue_, req_id1, PlasmaError::OK);
+  ASSERT_REQUEST_FINISHED(queue_, req_id2, PlasmaError::OK);
+  ASSERT_REQUEST_FINISHED(queue_, req_id3, PlasmaError::OK);
+  ASSERT_EQ(num_global_gc_, 0);
+  // Request gets cleaned up after we get it.
+  ASSERT_REQUEST_FINISHED(queue_, req_id1, PlasmaError::UnexpectedError);
+  ASSERT_REQUEST_FINISHED(queue_, req_id2, PlasmaError::UnexpectedError);
+  ASSERT_REQUEST_FINISHED(queue_, req_id3, PlasmaError::UnexpectedError);
+  AssertNoLeaks();
 }
 
-TEST(CreateRequestQueueTest, TestTransientOom) {
-  CreateRequestQueue queue;
+TEST_F(CreateRequestQueueTest, TestOom) {
+  auto oom_request = [&](bool evict_if_full, PlasmaObject *result) {
+    return PlasmaError::OutOfMemory;
+  };
+  auto blocked_request = [&](bool evict_if_full, PlasmaObject *result) {
+    result->data_size = 1234;
+    return PlasmaError::OK;
   };
-  int num_created = 0;
-  Status return_status = Status::TransientObjectStoreFull("");
-  auto oom_request = [&]() {
-    if (return_status.ok()) {
-      num_created++;
+  auto client = std::make_shared<MockClient>();
+  auto req_id1 = queue_.AddRequest(ObjectID::Nil(), client, oom_request);
+  auto req_id2 = queue_.AddRequest(ObjectID::Nil(), client, blocked_request);
+
+  // Neither request was fulfilled.
+  ASSERT_TRUE(queue_.ProcessRequests().IsObjectStoreFull());
+  ASSERT_TRUE(queue_.ProcessRequests().IsObjectStoreFull());
+  ASSERT_REQUEST_UNFINISHED(queue_, req_id1);
+  ASSERT_REQUEST_UNFINISHED(queue_, req_id2);
+  ASSERT_EQ(num_global_gc_, 2);
+
+  // Retries used up. The first request should reply with OOM and the second
+  // request should also be served.
+  ASSERT_TRUE(queue_.ProcessRequests().ok());
+  ASSERT_EQ(num_global_gc_, 2);
+
+  // Both requests fulfilled.
+  ASSERT_REQUEST_FINISHED(queue_, req_id1, PlasmaError::OutOfMemory);
+  ASSERT_REQUEST_FINISHED(queue_, req_id2, PlasmaError::OK);
+
+  AssertNoLeaks();
+}
+
+TEST(CreateRequestQueueParameterTest, TestOomInfiniteRetry) {
+  int num_global_gc_ = 0;
+  CreateRequestQueue queue(
+      /*max_retries=*/-1,
+      /*evict_if_full=*/true,
+      /*on_global_gc=*/[&]() { num_global_gc_++; });
+
+  auto oom_request = [&](bool evict_if_full, PlasmaObject *result) {
+    return PlasmaError::OutOfMemory;
+  };
+  auto blocked_request = [&](bool evict_if_full, PlasmaObject *result) {
+    result->data_size = 1234;
+    return PlasmaError::OK;
+  };
+
+  auto client = std::make_shared<MockClient>();
+  auto req_id1 = queue.AddRequest(ObjectID::Nil(), client, oom_request);
+  auto req_id2 = queue.AddRequest(ObjectID::Nil(), client, blocked_request);
+
+  for (int i = 0; i < 3; i++) {
+    ASSERT_TRUE(queue.ProcessRequests().IsObjectStoreFull());
+    ASSERT_EQ(num_global_gc_, i + 1);
+  }
+
+  // Neither request was fulfilled.
+  ASSERT_REQUEST_UNFINISHED(queue, req_id1);
+  ASSERT_REQUEST_UNFINISHED(queue, req_id2);
+}
+
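The retry accounting these tests pin down is small enough to spell out. The sketch below is a hypothetical RetryPolicy, not Ray's CreateRequestQueue: a hard OOM consumes one retry and fires the global-GC hook until max_retries is exhausted, max_retries == -1 retries forever, and a transient OOM never consumes a retry.

#include <cassert>
#include <cstdint>
#include <functional>

enum class Error { OK, OutOfMemory, TransientOutOfMemory };

// Decides whether a failed create attempt should stay queued for another try.
class RetryPolicy {
 public:
  RetryPolicy(int32_t max_retries, std::function<void()> on_global_gc)
      : max_retries_(max_retries), on_global_gc_(std::move(on_global_gc)) {}

  // Returns true if the request should be retried later, false if it is done
  // (either fulfilled or permanently failed with OutOfMemory).
  bool ShouldRetry(Error error) {
    if (error == Error::OK) {
      return false;  // Fulfilled.
    }
    if (error == Error::TransientOutOfMemory) {
      return true;  // Space is already being made (e.g. spilling); just wait.
    }
    // Hard OOM: give up once the retry budget is spent, otherwise trigger a
    // global GC and try again later.
    if (max_retries_ >= 0 && num_retries_ >= max_retries_) {
      return false;
    }
    num_retries_++;
    on_global_gc_();
    return true;
  }

 private:
  const int32_t max_retries_;  // -1 means retry forever.
  int32_t num_retries_ = 0;
  std::function<void()> on_global_gc_;
};

int main() {
  int gc_calls = 0;
  RetryPolicy policy(/*max_retries=*/2, [&gc_calls]() { gc_calls++; });
  assert(policy.ShouldRetry(Error::TransientOutOfMemory));  // No retry consumed.
  assert(policy.ShouldRetry(Error::OutOfMemory));           // Retry 1, GC 1.
  assert(policy.ShouldRetry(Error::OutOfMemory));           // Retry 2, GC 2.
  assert(!policy.ShouldRetry(Error::OutOfMemory));          // Budget spent: fail.
  assert(gc_calls == 2);
  return 0;
}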
+TEST_F(CreateRequestQueueTest, TestTransientOom) {
+  auto return_status = PlasmaError::TransientOutOfMemory;
+  auto oom_request = [&](bool evict_if_full, PlasmaObject *result) {
+    if (return_status == PlasmaError::OK) {
+      result->data_size = 1234;
     }
     return return_status;
   };
-  auto blocked_request = [&]() {
-    num_created++;
-    return Status();
+  auto blocked_request = [&](bool evict_if_full, PlasmaObject *result) {
+    result->data_size = 1234;
+    return PlasmaError::OK;
   };
 
   auto client = std::make_shared<MockClient>();
-  queue.AddRequest(client, oom_request);
-  queue.AddRequest(client, blocked_request);
+  auto req_id1 = queue_.AddRequest(ObjectID::Nil(), client, oom_request);
+  auto req_id2 = queue_.AddRequest(ObjectID::Nil(), client, blocked_request);
 
   // Transient OOM should not use up any retries.
   for (int i = 0; i < 3; i++) {
-    ASSERT_TRUE(queue.ProcessRequests().IsTransientObjectStoreFull());
-    ASSERT_EQ(num_created, 0);
+    ASSERT_TRUE(queue_.ProcessRequests().IsTransientObjectStoreFull());
+    ASSERT_REQUEST_UNFINISHED(queue_, req_id1);
+    ASSERT_REQUEST_UNFINISHED(queue_, req_id2);
+    ASSERT_EQ(num_global_gc_, 0);
   }
 
   // Return OK for the first request. The second request should also be served.
-  return_status = Status();
-  ASSERT_TRUE(queue.ProcessRequests().ok());
-  ASSERT_EQ(num_created, 2);
+  return_status = PlasmaError::OK;
+  ASSERT_TRUE(queue_.ProcessRequests().ok());
+  ASSERT_REQUEST_FINISHED(queue_, req_id1, PlasmaError::OK);
+  ASSERT_REQUEST_FINISHED(queue_, req_id2, PlasmaError::OK);
+
+  AssertNoLeaks();
+}
+
+TEST_F(CreateRequestQueueTest, TestTransientOomThenOom) {
+  auto return_status = PlasmaError::TransientOutOfMemory;
+  auto oom_request = [&](bool evict_if_full, PlasmaObject *result) {
+    if (return_status == PlasmaError::OK) {
+      result->data_size = 1234;
+    }
+    return return_status;
+  };
+  auto blocked_request = [&](bool evict_if_full, PlasmaObject *result) {
+    result->data_size = 1234;
+    return PlasmaError::OK;
+  };
+
+  auto client = std::make_shared<MockClient>();
+  auto req_id1 = queue_.AddRequest(ObjectID::Nil(), client, oom_request);
+  auto req_id2 = queue_.AddRequest(ObjectID::Nil(), client, blocked_request);
+
+  // Transient OOM should not use up any retries.
+  for (int i = 0; i < 3; i++) {
+    ASSERT_TRUE(queue_.ProcessRequests().IsTransientObjectStoreFull());
+    ASSERT_REQUEST_UNFINISHED(queue_, req_id1);
+    ASSERT_REQUEST_UNFINISHED(queue_, req_id2);
+    ASSERT_EQ(num_global_gc_, 0);
+  }
+
+  // Now we are actually OOM.
+  return_status = PlasmaError::OutOfMemory;
+  ASSERT_TRUE(queue_.ProcessRequests().IsObjectStoreFull());
+  ASSERT_TRUE(queue_.ProcessRequests().IsObjectStoreFull());
+  ASSERT_REQUEST_UNFINISHED(queue_, req_id1);
+  ASSERT_REQUEST_UNFINISHED(queue_, req_id2);
+  ASSERT_EQ(num_global_gc_, 2);
+
+  // Retries used up. The first request should reply with OOM and the second
+  // request should also be served.
+  ASSERT_TRUE(queue_.ProcessRequests().ok());
+  ASSERT_REQUEST_FINISHED(queue_, req_id1, PlasmaError::OutOfMemory);
+  ASSERT_REQUEST_FINISHED(queue_, req_id2, PlasmaError::OK);
+  ASSERT_EQ(num_global_gc_, 2);
+
+  AssertNoLeaks();
+}
+
+TEST_F(CreateRequestQueueTest, TestEvictIfFull) {
+  auto oom_request = [&](bool evict_if_full, PlasmaObject *result) {
+    RAY_CHECK(evict_if_full);
+    return PlasmaError::OutOfMemory;
+  };
+
+  auto client = std::make_shared<MockClient>();
+  static_cast<void>(queue_.AddRequest(ObjectID::Nil(), client, oom_request));
+  ASSERT_TRUE(queue_.ProcessRequests().IsObjectStoreFull());
+  ASSERT_TRUE(queue_.ProcessRequests().IsObjectStoreFull());
+}
+
+TEST(CreateRequestQueueParameterTest, TestNoEvictIfFull) {
+  CreateRequestQueue queue(
+      /*max_retries=*/2,
+      /*evict_if_full=*/false,
+      /*on_global_gc=*/[&]() {});
+
+  bool first_try = true;
+  auto oom_request = [&](bool evict_if_full, PlasmaObject *result) {
+    if (first_try) {
+      RAY_CHECK(!evict_if_full);
+      first_try = false;
+    } else {
+      RAY_CHECK(evict_if_full);
+    }
+    return PlasmaError::OutOfMemory;
+  };
+
+  auto client = std::make_shared<MockClient>();
+  static_cast<void>(queue.AddRequest(ObjectID::Nil(), client, oom_request));
+  ASSERT_TRUE(queue.ProcessRequests().IsObjectStoreFull());
+  ASSERT_TRUE(queue.ProcessRequests().IsObjectStoreFull());
+}
+
+TEST_F(CreateRequestQueueTest, TestClientDisconnected) {
+  auto request = [&](bool evict_if_full, PlasmaObject *result) {
+    result->data_size = 1234;
+    return PlasmaError::OK;
+  };
+
+  // Client makes two requests. One is processed, the other is still in the
+  // queue.
+  auto client = std::make_shared<MockClient>();
+  auto req_id1 = queue_.AddRequest(ObjectID::Nil(), client, request);
+  ASSERT_TRUE(queue_.ProcessRequests().ok());
+  auto req_id2 = queue_.AddRequest(ObjectID::Nil(), client, request);
+
+  // Another client makes a concurrent request.
+  auto client2 = std::make_shared<MockClient>();
+  auto req_id3 = queue_.AddRequest(ObjectID::Nil(), client2, request);
+
+  // Client disconnects.
+  queue_.RemoveDisconnectedClientRequests(client);
+
+  // Both requests should be cleaned up.
+  ASSERT_REQUEST_FINISHED(queue_, req_id1, PlasmaError::UnexpectedError);
+  ASSERT_REQUEST_FINISHED(queue_, req_id2, PlasmaError::UnexpectedError);
+  // Other client's request was fulfilled.
+  ASSERT_TRUE(queue_.ProcessRequests().ok());
+  ASSERT_REQUEST_FINISHED(queue_, req_id3, PlasmaError::OK);
+  AssertNoLeaks();
+}
+
+TEST_F(CreateRequestQueueTest, TestTryRequestImmediately) {
+  auto request = [&](bool evict_if_full, PlasmaObject *result) {
+    result->data_size = 1234;
+    return PlasmaError::OK;
+  };
+  auto client = std::make_shared<MockClient>();
+
+  // Queue is empty, request can be fulfilled.
+  auto result = queue_.TryRequestImmediately(ObjectID::Nil(), client, request);
+  ASSERT_EQ(result.first.data_size, 1234);
+  ASSERT_EQ(result.second, PlasmaError::OK);
+
+  // Request would block.
+  auto req_id = queue_.AddRequest(ObjectID::Nil(), client, request);
+  result = queue_.TryRequestImmediately(ObjectID::Nil(), client, request);
+  ASSERT_EQ(result.first.data_size, 0);
+  ASSERT_EQ(result.second, PlasmaError::OutOfMemory);
+  ASSERT_TRUE(queue_.ProcessRequests().ok());
+
+  // Queue is empty again, request can be fulfilled.
+  result = queue_.TryRequestImmediately(ObjectID::Nil(), client, request);
+  ASSERT_EQ(result.first.data_size, 1234);
+  ASSERT_EQ(result.second, PlasmaError::OK);
+
+  // Queue is empty, but request would block. Check that we do not attempt to
+  // retry the request.
+  auto oom_request = [&](bool evict_if_full, PlasmaObject *result) {
+    return PlasmaError::OutOfMemory;
+  };
+  result = queue_.TryRequestImmediately(ObjectID::Nil(), client, oom_request);
+  ASSERT_EQ(result.first.data_size, 0);
+  ASSERT_EQ(result.second, PlasmaError::OutOfMemory);
+
+  ASSERT_REQUEST_FINISHED(queue_, req_id, PlasmaError::OK);
+  AssertNoLeaks();
 }
 
 }  // namespace plasma
diff --git a/src/ray/object_manager/test/object_manager_stress_test.cc b/src/ray/object_manager/test/object_manager_stress_test.cc
index c0e163f7c..83daf8297 100644
--- a/src/ray/object_manager/test/object_manager_stress_test.cc
+++ b/src/ray/object_manager/test/object_manager_stress_test.cc
@@ -146,9 +146,11 @@ class TestObjectManagerBase : public ::testing::Test {
     RAY_LOG(DEBUG) << "ObjectID Created: " << object_id;
     uint8_t metadata[] = {5};
     int64_t metadata_size = sizeof(metadata);
+    uint64_t retry_with_request_id = 0;
     std::shared_ptr<Buffer> data;
     RAY_CHECK_OK(client.Create(object_id, ray::rpc::Address(), data_size, metadata,
-                               metadata_size, &data));
+                               metadata_size, &retry_with_request_id, &data));
+    RAY_CHECK(retry_with_request_id == 0);
     RAY_CHECK_OK(client.Seal(object_id));
     return object_id;
   }
diff --git a/src/ray/object_manager/test/object_manager_test.cc b/src/ray/object_manager/test/object_manager_test.cc
index da8a80775..75f3b5c70 100644
--- a/src/ray/object_manager/test/object_manager_test.cc
+++ b/src/ray/object_manager/test/object_manager_test.cc
@@ -145,9 +145,11 @@ class TestObjectManagerBase : public ::testing::Test {
     RAY_LOG(DEBUG) << "ObjectID Created: " << object_id;
     uint8_t metadata[] = {5};
     int64_t metadata_size = sizeof(metadata);
+    uint64_t retry_with_request_id = 0;
     std::shared_ptr<Buffer> data;
     RAY_CHECK_OK(client.Create(object_id, ray::rpc::Address(), data_size, metadata,
-                               metadata_size, &data));
+                               metadata_size, &retry_with_request_id, &data));
+    RAY_CHECK(retry_with_request_id == 0);
     RAY_CHECK_OK(client.Seal(object_id));
     return object_id;
   }
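Both fixtures above now thread a retry_with_request_id out-parameter through Create() and check that it comes back 0, meaning the store satisfied the request immediately. When the store is under memory pressure it instead queues the request and hands back a non-zero ID for the caller to poll. The sketch below shows the general shape of such a caller-side loop; CreateResult, TryCreate and RetryCreate are hypothetical stand-ins, not the real plasma client API.

#include <chrono>
#include <cstdint>
#include <thread>

struct CreateResult {
  bool ok = false;
  // Non-zero means "the request was queued; poll again with this ID".
  uint64_t retry_with_request_id = 0;
};

// Stubs so the sketch is self-contained; a real client would send RPCs here.
CreateResult TryCreate() { return {true, 0}; }
CreateResult RetryCreate(uint64_t request_id) {
  (void)request_id;
  return {true, 0};
}

// Blocking create built on top of the two-phase create/retry protocol.
bool CreateBlocking() {
  CreateResult result = TryCreate();
  while (result.retry_with_request_id != 0) {
    // The store queued the request; keep polling by ID until it finishes.
    std::this_thread::sleep_for(std::chrono::milliseconds(10));
    result = RetryCreate(result.retry_with_request_id);
  }
  return result.ok;
}

int main() { return CreateBlocking() ? 0 : 1; }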
diff --git a/src/ray/raylet/local_object_manager.cc b/src/ray/raylet/local_object_manager.cc
index 8c9a1ec37..bd0ca72b1 100644
--- a/src/ray/raylet/local_object_manager.cc
+++ b/src/ray/raylet/local_object_manager.cc
@@ -210,7 +210,6 @@ void LocalObjectManager::SpillObjectsInternal(
                          << status.ToString();
           if (callback) {
             callback(status);
-            on_objects_spilled_();
           }
         } else {
           AddSpilledUrls(objects_to_spill, r, callback);
@@ -262,7 +261,6 @@ void LocalObjectManager::AddSpilledUrls(
           (*num_remaining)--;
           if (*num_remaining == 0 && callback) {
             callback(status);
-            on_objects_spilled_();
           }
         }));
   }
diff --git a/src/ray/raylet/local_object_manager.h b/src/ray/raylet/local_object_manager.h
index b615af346..31adada2c 100644
--- a/src/ray/raylet/local_object_manager.h
+++ b/src/ray/raylet/local_object_manager.h
@@ -39,8 +39,7 @@ class LocalObjectManager {
       gcs::ObjectInfoAccessor &object_info_accessor,
       rpc::CoreWorkerClientPool &owner_client_pool, bool object_pinning_enabled,
       bool automatic_object_deletion_enabled,
-      std::function<void(const std::vector<ObjectID> &)> on_objects_freed,
-      SpaceReleasedCallback on_objects_spilled)
+      std::function<void(const std::vector<ObjectID> &)> on_objects_freed)
       : free_objects_period_ms_(free_objects_period_ms),
         free_objects_batch_size_(free_objects_batch_size),
         io_worker_pool_(io_worker_pool),
@@ -49,7 +48,6 @@ class LocalObjectManager {
         object_pinning_enabled_(object_pinning_enabled),
         automatic_object_deletion_enabled_(automatic_object_deletion_enabled),
         on_objects_freed_(on_objects_freed),
-        on_objects_spilled_(on_objects_spilled),
         last_free_objects_at_ms_(current_time_ms()) {}
 
   /// Pin objects.
@@ -175,10 +173,6 @@ class LocalObjectManager {
   absl::flat_hash_map<ObjectID, std::unique_ptr<RayObject>> objects_pending_spill_
       GUARDED_BY(mutex_);
 
-  /// Callback to call whenever objects have been spilled or failed to be
-  /// spilled.
-  SpaceReleasedCallback on_objects_spilled_;
-
   /// The time that we last sent a FreeObjects request to other nodes for
   /// objects that have gone out of scope in the application.
   uint64_t last_free_objects_at_ms_ = 0;
diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc
index 9b77a6e38..9d0359e66 100644
--- a/src/ray/raylet/node_manager.cc
+++ b/src/ray/raylet/node_manager.cc
@@ -116,8 +116,7 @@ std::string WorkerOwnerString(std::shared_ptr<WorkerInterface> &worker) {
 NodeManager::NodeManager(boost::asio::io_service &io_service, const NodeID &self_node_id,
                          const NodeManagerConfig &config, ObjectManager &object_manager,
                          std::shared_ptr<gcs::GcsClient> gcs_client,
-                         std::shared_ptr<ObjectDirectoryInterface> object_directory,
-                         SpaceReleasedCallback on_objects_spilled)
+                         std::shared_ptr<ObjectDirectoryInterface> object_directory)
     : self_node_id_(self_node_id),
       io_service_(io_service),
       object_manager_(object_manager),
@@ -166,8 +165,7 @@ NodeManager::NodeManager(boost::asio::io_service &io_service, const NodeID &self
           [this](const std::vector<ObjectID> &object_ids) {
             object_manager_.FreeObjects(object_ids, /*local_only=*/false);
-          },
-          on_objects_spilled),
+          }),
       new_scheduler_enabled_(RayConfig::instance().new_scheduler_enabled()),
       report_worker_backlog_(RayConfig::instance().report_worker_backlog()),
       record_metrics_period_(config.record_metrics_period_ms) {
@@ -2140,9 +2138,9 @@ void NodeManager::MarkObjectsAsFailed(
     ObjectID object_id = ObjectID::FromBinary(ref.object_id());
     std::shared_ptr<Buffer> data;
     Status status;
-    status = store_client_.Create(object_id, ref.owner_address(), 0,
-                                  reinterpret_cast<const uint8_t *>(meta.c_str()),
-                                  meta.length(), &data);
+    status = store_client_.TryCreateImmediately(
+        object_id, ref.owner_address(), 0,
+        reinterpret_cast<const uint8_t *>(meta.c_str()), meta.length(), &data);
     if (status.ok()) {
       status = store_client_.Seal(object_id);
     }
diff --git a/src/ray/raylet/node_manager.h b/src/ray/raylet/node_manager.h
index ccc1fb1ae..b73eb6876 100644
--- a/src/ray/raylet/node_manager.h
+++ b/src/ray/raylet/node_manager.h
@@ -135,8 +135,7 @@ class NodeManager : public rpc::NodeManagerServiceHandler {
   NodeManager(boost::asio::io_service &io_service, const NodeID &self_node_id,
               const NodeManagerConfig &config, ObjectManager &object_manager,
               std::shared_ptr<gcs::GcsClient> gcs_client,
-              std::shared_ptr<ObjectDirectoryInterface> object_directory_,
-              SpaceReleasedCallback on_objects_spilled);
+              std::shared_ptr<ObjectDirectoryInterface> object_directory_);
 
   /// Process a new client connection.
   ///
@@ -176,6 +175,10 @@ class NodeManager : public rpc::NodeManagerServiceHandler {
   LocalObjectManager &GetLocalObjectManager() { return local_object_manager_; }
 
+  /// Trigger global GC across the cluster to free up references to actors or
+  /// object ids.
+  void TriggerGlobalGC();
+
  private:
   /// Methods for handling nodes.
@@ -637,10 +640,6 @@ class NodeManager : public rpc::NodeManagerServiceHandler {
                                   rpc::ReleaseUnusedBundlesReply *reply,
                                   rpc::SendReplyCallback send_reply_callback) override;
 
-  /// Trigger global GC across the cluster to free up references to actors or
-  /// object ids.
-  void TriggerGlobalGC();
-
   /// Trigger local GC on each worker of this raylet.
   void DoLocalGC();
diff --git a/src/ray/raylet/raylet.cc b/src/ray/raylet/raylet.cc
index 467a5301c..26e03b12e 100644
--- a/src/ray/raylet/raylet.cc
+++ b/src/ray/raylet/raylet.cc
@@ -60,7 +60,8 @@ Raylet::Raylet(boost::asio::io_service &main_service, const std::string &socket_
                const NodeManagerConfig &node_manager_config,
                const ObjectManagerConfig &object_manager_config,
                std::shared_ptr<gcs::GcsClient> gcs_client, int metrics_export_port)
-    : self_node_id_(NodeID::FromRandom()),
+    : main_service_(main_service),
+      self_node_id_(NodeID::FromRandom()),
       gcs_client_(gcs_client),
       object_directory_(
           RayConfig::instance().ownership_based_object_directory_enabled()
@@ -79,10 +80,14 @@ Raylet::Raylet(boost::asio::io_service &main_service, const std::string &socket_
           [this](int64_t num_bytes_to_spill, int64_t min_bytes_to_spill) {
             return node_manager_.GetLocalObjectManager().SpillObjectsOfSize(
                 num_bytes_to_spill, min_bytes_to_spill);
+          },
+          [this]() {
+            // Post on the node manager's event loop since this
+            // will be called from the plasma store thread.
+            main_service_.post([this]() { node_manager_.TriggerGlobalGC(); });
           }),
       node_manager_(main_service, self_node_id_, node_manager_config, object_manager_,
-                    gcs_client_, object_directory_,
-                    plasma::plasma_store_runner->OnSpaceReleased()),
+                    gcs_client_, object_directory_),
       socket_name_(socket_name),
       acceptor_(main_service, ParseUrlEndpoint(socket_name)),
       socket_(main_service) {
diff --git a/src/ray/raylet/raylet.h b/src/ray/raylet/raylet.h
index 99b754fc8..81b40b9fb 100644
--- a/src/ray/raylet/raylet.h
+++ b/src/ray/raylet/raylet.h
@@ -76,6 +76,9 @@ class Raylet {
 
   friend class TestObjectManagerIntegration;
 
+  // Main event loop.
+  boost::asio::io_service &main_service_;
+
   /// ID of this node.
   NodeID self_node_id_;
   /// Information of this node.
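The new callback wired up here never runs TriggerGlobalGC() on the plasma store thread; it only posts the call onto the node manager's event loop, which keeps NodeManager effectively single-threaded. A standalone sketch of that cross-thread posting pattern (Boost.Asio assumed; all names illustrative):

#include <boost/asio.hpp>
#include <iostream>
#include <thread>

int main() {
  boost::asio::io_context main_service;
  // Keep run() from returning until we are done posting work.
  auto work = boost::asio::make_work_guard(main_service);
  std::thread event_loop_thread([&main_service]() { main_service.run(); });

  // This callback can be handed to code running on another thread (e.g. the
  // object store). It touches no shared state directly; it only posts, the
  // same way the raylet posts node_manager_.TriggerGlobalGC() above.
  auto trigger_global_gc = [&main_service]() {
    boost::asio::post(main_service, []() {
      std::cout << "global GC triggered on the event-loop thread" << std::endl;
    });
  };

  std::thread store_thread(trigger_global_gc);
  store_thread.join();

  work.reset();  // Allow run() to return once the posted handler has executed.
  event_loop_thread.join();
  return 0;
}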
diff --git a/src/ray/raylet/test/local_object_manager_test.cc b/src/ray/raylet/test/local_object_manager_test.cc
index eab6ea87d..96ee66638 100644
--- a/src/ray/raylet/test/local_object_manager_test.cc
+++ b/src/ray/raylet/test/local_object_manager_test.cc
@@ -240,8 +240,7 @@ class LocalObjectManagerTest : public ::testing::Test {
               for (const auto &object_id : object_ids) {
                 freed.insert(object_id);
               }
-            },
-            [&]() { num_callbacks_fired++; }),
+            }),
         unpins(std::make_shared>()) {
     RayConfig::instance().initialize({{"object_spilling_config", "mock_config"}});
   }
@@ -263,7 +262,6 @@ class LocalObjectManagerTest : public ::testing::Test {
   // This hashmap is incremented when objects are unpinned by destroying their
   // unique_ptr.
   std::shared_ptr> unpins;
-  size_t num_callbacks_fired = 0;
 };
 
 TEST_F(LocalObjectManagerTest, TestPin) {
@@ -292,7 +290,6 @@ TEST_F(LocalObjectManagerTest, TestPin) {
   }
   std::unordered_set<ObjectID> expected(object_ids.begin(), object_ids.end());
   ASSERT_EQ(freed, expected);
-  ASSERT_EQ(num_callbacks_fired, 0);
 }
 
 TEST_F(LocalObjectManagerTest, TestRestoreSpilledObject) {
@@ -305,7 +302,6 @@ TEST_F(LocalObjectManagerTest, TestRestoreSpilledObject) {
         num_times_fired++;
       });
   ASSERT_EQ(num_times_fired, 1);
-  ASSERT_EQ(num_callbacks_fired, 0);
 }
 
 TEST_F(LocalObjectManagerTest, TestExplicitSpill) {
@@ -348,7 +344,6 @@ TEST_F(LocalObjectManagerTest, TestExplicitSpill) {
   for (const auto &id : object_ids) {
    ASSERT_EQ((*unpins)[id], 1);
   }
-  ASSERT_TRUE(num_callbacks_fired > 0);
 }
 
 TEST_F(LocalObjectManagerTest, TestDuplicateSpill) {
@@ -400,7 +395,6 @@ TEST_F(LocalObjectManagerTest, TestDuplicateSpill) {
   for (const auto &id : object_ids) {
    ASSERT_EQ((*unpins)[id], 1);
   }
-  ASSERT_TRUE(num_callbacks_fired > 0);
 }
 
 TEST_F(LocalObjectManagerTest, TestSpillObjectsOfSize) {
@@ -456,7 +450,6 @@ TEST_F(LocalObjectManagerTest, TestSpillObjectsOfSize) {
   // Check that this returns the total number of bytes currently being spilled.
   num_bytes_required = manager.SpillObjectsOfSize(0, 0);
   ASSERT_EQ(num_bytes_required, 0);
-  ASSERT_TRUE(num_callbacks_fired > 0);
 }
 
 TEST_F(LocalObjectManagerTest, TestSpillError) {
@@ -500,7 +493,6 @@ TEST_F(LocalObjectManagerTest, TestSpillError) {
   ASSERT_EQ(num_times_fired, 2);
   ASSERT_EQ(object_table.object_urls[object_id], url);
   ASSERT_EQ((*unpins)[object_id], 1);
-  ASSERT_TRUE(num_callbacks_fired > 0);
 }
 
 TEST_F(LocalObjectManagerTest, TestDeleteNoSpilledObjects) {