mirror of
https://github.com/vale981/ray
synced 2025-03-06 18:41:40 -05:00
[core] Move out-of-memory handling into the plasma store and support async object creation (#12186)
* Refactor to extract creation request queue * timer on oom * move timer out * Move evict_if_full and on_store_full into plasma store * Remove client-side code * revert * Distinguish between transient and permanent OOM delays * update * Move out create request queue, unit test * unit test * Fix max retries * test * Do not pin restored objects * First pass to add polling requests, unit test passes * worker plasma client retries plasma requests * cleanup * Clean up after disconnected clients, check memory leaks * Support immediate requests in request queue * Option to try creating immediately * lint * Fix build, address comments * doc * fixes * debug travis * debug * debug * debug * debug * Revert "debug" This reverts commit 6bf2f6ee5640e71630c4aecdb7ebf54911ea32db. Revert "debug" This reverts commit 73017099c9b06cdaae1217bf0e0f4d23ed68a9e5. Revert "debug" This reverts commit 5a155529e28cee9461a598b0cdf7b6a3cc194c93. Revert "debug" This reverts commit b50c2101afd45d4cf663daae857bfe1b40387703. Revert "debug travis" This reverts commit 012b8721dedf9bca46294ae75eee2815b160368b. * Skip if new scheduler enabled * error message * merge
This commit is contained in:
parent
786f839ff3
commit
443339ab19
32 changed files with 1010 additions and 278 deletions
|
@ -348,7 +348,8 @@ def test_system_config_when_connecting(ray_start_cluster):
|
|||
obj_ref = ray.put(np.zeros(40 * 1024 * 1024, dtype=np.uint8))
|
||||
|
||||
for _ in range(5):
|
||||
ray.put(np.zeros(40 * 1024 * 1024, dtype=np.uint8))
|
||||
put_ref = ray.put(np.zeros(40 * 1024 * 1024, dtype=np.uint8))
|
||||
del put_ref
|
||||
|
||||
# This would not raise an exception if object pinning was enabled.
|
||||
with pytest.raises(ray.exceptions.ObjectLostError):
|
||||
|
|
|
@ -7,6 +7,7 @@ import warnings
|
|||
|
||||
import ray
|
||||
from ray.cluster_utils import Cluster
|
||||
from ray.exceptions import GetTimeoutError
|
||||
|
||||
if (multiprocessing.cpu_count() < 40
|
||||
or ray.utils.get_system_memory() < 50 * 10**9):
|
||||
|
@ -33,6 +34,29 @@ def ray_start_cluster_with_resource():
|
|||
cluster.shutdown()
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"ray_start_cluster_head", [{
|
||||
"num_cpus": 0,
|
||||
"object_store_memory": 75 * 1024 * 1024,
|
||||
}],
|
||||
indirect=True)
|
||||
def test_object_transfer_during_oom(ray_start_cluster_head):
|
||||
cluster = ray_start_cluster_head
|
||||
cluster.add_node(object_store_memory=75 * 1024 * 1024)
|
||||
|
||||
@ray.remote
|
||||
def put():
|
||||
return np.random.rand(5 * 1024 * 1024) # 40 MB data
|
||||
|
||||
local_ref = ray.put(np.random.rand(5 * 1024 * 1024))
|
||||
remote_ref = put.remote()
|
||||
|
||||
with pytest.raises(GetTimeoutError):
|
||||
ray.get(remote_ref, timeout=1)
|
||||
del local_ref
|
||||
ray.get(remote_ref)
|
||||
|
||||
|
||||
# This test is here to make sure that when we broadcast an object to a bunch of
|
||||
# machines, we don't have too many excess object transfers.
|
||||
@pytest.mark.skip(reason="TODO(ekl)")
|
||||
|
|
|
@ -12,7 +12,7 @@ import psutil
|
|||
import ray
|
||||
from ray.external_storage import (create_url_with_offset,
|
||||
parse_url_with_offset)
|
||||
from ray.test_utils import wait_for_condition
|
||||
from ray.test_utils import new_scheduler_enabled, wait_for_condition
|
||||
|
||||
bucket_name = "object-spilling-test"
|
||||
spill_local_path = "/tmp/spill"
|
||||
|
@ -338,16 +338,19 @@ def test_spill_objects_automatically(object_spilling_config, shutdown_only):
|
|||
|
||||
@pytest.mark.skipif(
|
||||
platform.system() == "Windows", reason="Failing on Windows.")
|
||||
@pytest.mark.skip(
|
||||
"Temporarily disabled until OutOfMemory retries can be moved "
|
||||
"into the plasma store")
|
||||
@pytest.mark.skipif(new_scheduler_enabled(), reason="hangs")
|
||||
def test_spill_during_get(object_spilling_config, shutdown_only):
|
||||
ray.init(
|
||||
num_cpus=4,
|
||||
object_store_memory=100 * 1024 * 1024,
|
||||
_system_config={
|
||||
"automatic_object_spilling_enabled": True,
|
||||
"max_io_workers": 2,
|
||||
"object_store_full_initial_delay_ms": 100,
|
||||
# NOTE(swang): Use infinite retries because the OOM timer can still
|
||||
# get accidentally triggered when objects are released too slowly
|
||||
# (see github.com/ray-project/ray/issues/12040).
|
||||
"object_store_full_max_retries": -1,
|
||||
"max_io_workers": 1,
|
||||
"object_spilling_config": object_spilling_config,
|
||||
"min_spilling_size": 0,
|
||||
},
|
||||
|
|
|
@ -10,8 +10,8 @@ import pytest
|
|||
|
||||
import ray
|
||||
import ray.cluster_utils
|
||||
from ray.test_utils import SignalActor, put_object, wait_for_condition, \
|
||||
new_scheduler_enabled
|
||||
from ray.test_utils import (SignalActor, put_object, wait_for_condition,
|
||||
new_scheduler_enabled)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
@ -167,7 +167,7 @@ def test_dependency_refcounts(ray_start_regular):
|
|||
check_refcounts({})
|
||||
|
||||
|
||||
@pytest.mark.skipif(new_scheduler_enabled(), reason="dynres notimpl")
|
||||
@pytest.mark.skipif(new_scheduler_enabled(), reason="hangs")
|
||||
def test_actor_creation_task(ray_start_regular):
|
||||
@ray.remote
|
||||
def large_object():
|
||||
|
@ -269,7 +269,16 @@ def test_feature_flag(shutdown_only):
|
|||
|
||||
# The ray.get below fails with only LRU eviction, as the object
|
||||
# that was ray.put by the actor should have been evicted.
|
||||
_fill_object_store_and_get(actor.get_large_object.remote(), succeed=False)
|
||||
ref = actor.get_large_object.remote()
|
||||
ray.get(ref)
|
||||
|
||||
# Keep refs in scope so that they don't get GCed immediately.
|
||||
for _ in range(5):
|
||||
put_ref = ray.put(np.zeros(40 * 1024 * 1024, dtype=np.uint8))
|
||||
del put_ref
|
||||
|
||||
wait_for_condition(
|
||||
lambda: not ray.worker.global_worker.core_worker.object_exists(ref))
|
||||
|
||||
|
||||
def test_out_of_band_serialized_object_ref(one_worker_100MiB):
|
||||
|
|
|
@ -411,11 +411,9 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_
|
|||
plasma_store_provider_.reset(new CoreWorkerPlasmaStoreProvider(
|
||||
options_.store_socket, local_raylet_client_, reference_counter_,
|
||||
options_.check_signals,
|
||||
/*evict_if_full=*/RayConfig::instance().object_pinning_enabled(),
|
||||
/*warmup=*/
|
||||
(options_.worker_type != ray::WorkerType::SPILL_WORKER &&
|
||||
options_.worker_type != ray::WorkerType::RESTORE_WORKER),
|
||||
/*on_store_full=*/boost::bind(&CoreWorker::TriggerGlobalGC, this),
|
||||
/*get_current_call_site=*/boost::bind(&CoreWorker::CurrentCallSite, this)));
|
||||
memory_store_.reset(new CoreWorkerMemoryStore(
|
||||
[this](const RayObject &object, const ObjectID &object_id) {
|
||||
|
|
|
@ -25,14 +25,11 @@ CoreWorkerPlasmaStoreProvider::CoreWorkerPlasmaStoreProvider(
|
|||
const std::string &store_socket,
|
||||
const std::shared_ptr<raylet::RayletClient> raylet_client,
|
||||
const std::shared_ptr<ReferenceCounter> reference_counter,
|
||||
std::function<Status()> check_signals, bool evict_if_full, bool warmup,
|
||||
std::function<void()> on_store_full,
|
||||
std::function<Status()> check_signals, bool warmup,
|
||||
std::function<std::string()> get_current_call_site)
|
||||
: raylet_client_(raylet_client),
|
||||
reference_counter_(reference_counter),
|
||||
check_signals_(check_signals),
|
||||
evict_if_full_(evict_if_full),
|
||||
on_store_full_(on_store_full) {
|
||||
check_signals_(check_signals) {
|
||||
if (get_current_call_site != nullptr) {
|
||||
get_current_call_site_ = get_current_call_site;
|
||||
} else {
|
||||
|
@ -86,59 +83,52 @@ Status CoreWorkerPlasmaStoreProvider::Create(const std::shared_ptr<Buffer> &meta
|
|||
const ObjectID &object_id,
|
||||
const rpc::Address &owner_address,
|
||||
std::shared_ptr<Buffer> *data) {
|
||||
int32_t retries = 0;
|
||||
int32_t max_retries = RayConfig::instance().object_store_full_max_retries();
|
||||
uint32_t delay = RayConfig::instance().object_store_full_initial_delay_ms();
|
||||
Status status;
|
||||
bool should_retry = true;
|
||||
// If we cannot retry, then always evict on the first attempt.
|
||||
bool evict_if_full = max_retries == 0 ? true : evict_if_full_;
|
||||
while (should_retry) {
|
||||
should_retry = false;
|
||||
Status plasma_status;
|
||||
std::shared_ptr<arrow::Buffer> arrow_buffer;
|
||||
std::shared_ptr<arrow::Buffer> arrow_buffer;
|
||||
uint64_t retry_with_request_id = 0;
|
||||
{
|
||||
std::lock_guard<std::mutex> guard(store_client_mutex_);
|
||||
status = store_client_.Create(
|
||||
object_id, owner_address, data_size, metadata ? metadata->Data() : nullptr,
|
||||
metadata ? metadata->Size() : 0, &retry_with_request_id, &arrow_buffer,
|
||||
/*device_num=*/0);
|
||||
}
|
||||
|
||||
while (retry_with_request_id > 0) {
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(10));
|
||||
{
|
||||
std::lock_guard<std::mutex> guard(store_client_mutex_);
|
||||
plasma_status = store_client_.Create(object_id, owner_address, data_size,
|
||||
metadata ? metadata->Data() : nullptr,
|
||||
metadata ? metadata->Size() : 0, &arrow_buffer,
|
||||
/*device_num=*/0, evict_if_full);
|
||||
// Always try to evict after the first attempt.
|
||||
evict_if_full = true;
|
||||
}
|
||||
if (plasma_status.IsObjectStoreFull()) {
|
||||
std::ostringstream message;
|
||||
message << "Failed to put object " << object_id << " in object store because it "
|
||||
<< "is full. Object size is " << data_size << " bytes.";
|
||||
status = Status::ObjectStoreFull(message.str());
|
||||
if (max_retries < 0 || retries < max_retries) {
|
||||
RAY_LOG(ERROR) << message.str() << "\nWaiting " << delay
|
||||
<< "ms for space to free up...";
|
||||
if (on_store_full_) {
|
||||
on_store_full_();
|
||||
}
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(delay));
|
||||
delay *= 2;
|
||||
retries += 1;
|
||||
should_retry = true;
|
||||
} else {
|
||||
RAY_LOG(ERROR) << "Failed to put object " << object_id << " after "
|
||||
<< (max_retries + 1) << " attempts. Plasma store status:\n"
|
||||
<< MemoryUsageString() << "\n---\n"
|
||||
<< "--- Tip: Use the `ray memory` command to list active objects "
|
||||
"in the cluster."
|
||||
<< "\n---\n";
|
||||
}
|
||||
} else if (plasma_status.IsObjectExists()) {
|
||||
RAY_LOG(WARNING) << "Trying to put an object that already existed in plasma: "
|
||||
<< object_id << ".";
|
||||
status = Status::OK();
|
||||
} else {
|
||||
RAY_RETURN_NOT_OK(plasma_status);
|
||||
*data = std::make_shared<PlasmaBuffer>(PlasmaBuffer(arrow_buffer));
|
||||
status = Status::OK();
|
||||
RAY_LOG(DEBUG) << "Retrying request for object " << object_id << " with request ID "
|
||||
<< retry_with_request_id;
|
||||
status = store_client_.RetryCreate(object_id, retry_with_request_id,
|
||||
metadata ? metadata->Data() : nullptr,
|
||||
&retry_with_request_id, &arrow_buffer);
|
||||
}
|
||||
}
|
||||
|
||||
if (status.IsObjectStoreFull()) {
|
||||
RAY_LOG(ERROR) << "Failed to put object " << object_id
|
||||
<< " in object store because it "
|
||||
<< "is full. Object size is " << data_size << " bytes.\n"
|
||||
<< "Plasma store status:\n"
|
||||
<< MemoryUsageString() << "\n---\n"
|
||||
<< "--- Tip: Use the `ray memory` command to list active objects "
|
||||
"in the cluster."
|
||||
<< "\n---\n";
|
||||
|
||||
// Replace the status with a more helpful error message.
|
||||
std::ostringstream message;
|
||||
message << "Failed to put object " << object_id << " in object store because it "
|
||||
<< "is full. Object size is " << data_size << " bytes.";
|
||||
status = Status::ObjectStoreFull(message.str());
|
||||
} else if (status.IsObjectExists()) {
|
||||
RAY_LOG(WARNING) << "Trying to put an object that already existed in plasma: "
|
||||
<< object_id << ".";
|
||||
status = Status::OK();
|
||||
} else {
|
||||
RAY_RETURN_NOT_OK(status);
|
||||
*data = std::make_shared<PlasmaBuffer>(PlasmaBuffer(arrow_buffer));
|
||||
}
|
||||
return status;
|
||||
}
|
||||
|
||||
|
|
|
@ -37,8 +37,7 @@ class CoreWorkerPlasmaStoreProvider {
|
|||
const std::string &store_socket,
|
||||
const std::shared_ptr<raylet::RayletClient> raylet_client,
|
||||
const std::shared_ptr<ReferenceCounter> reference_counter,
|
||||
std::function<Status()> check_signals, bool evict_if_full, bool warmup,
|
||||
std::function<void()> on_store_full = nullptr,
|
||||
std::function<Status()> check_signals, bool warmup,
|
||||
std::function<std::string()> get_current_call_site = nullptr);
|
||||
|
||||
~CoreWorkerPlasmaStoreProvider();
|
||||
|
@ -154,8 +153,6 @@ class CoreWorkerPlasmaStoreProvider {
|
|||
const std::shared_ptr<ReferenceCounter> reference_counter_;
|
||||
std::mutex store_client_mutex_;
|
||||
std::function<Status()> check_signals_;
|
||||
const bool evict_if_full_;
|
||||
std::function<void()> on_store_full_;
|
||||
std::function<std::string()> get_current_call_site_;
|
||||
|
||||
// Active buffers tracker. This must be allocated as a separate structure since its
|
||||
|
|
|
@ -104,8 +104,8 @@ std::pair<const ObjectBufferPool::ChunkInfo &, ray::Status> ObjectBufferPool::Cr
|
|||
int64_t object_size = data_size - metadata_size;
|
||||
// Try to create shared buffer.
|
||||
std::shared_ptr<Buffer> data;
|
||||
Status s = store_client_.Create(object_id, owner_address, object_size, NULL,
|
||||
metadata_size, &data);
|
||||
Status s = store_client_.TryCreateImmediately(object_id, owner_address, object_size,
|
||||
NULL, metadata_size, &data);
|
||||
std::vector<boost::asio::mutable_buffer> buffer;
|
||||
if (!s.ok()) {
|
||||
// Create failed. The object may already exist locally. If something else went
|
||||
|
|
|
@ -27,7 +27,8 @@ namespace object_manager_protocol = ray::object_manager::protocol;
|
|||
namespace ray {
|
||||
|
||||
ObjectStoreRunner::ObjectStoreRunner(const ObjectManagerConfig &config,
|
||||
SpillObjectsCallback spill_objects_callback) {
|
||||
SpillObjectsCallback spill_objects_callback,
|
||||
std::function<void()> object_store_full_callback) {
|
||||
if (config.object_store_memory > 0) {
|
||||
plasma::plasma_store_runner.reset(new plasma::PlasmaStoreRunner(
|
||||
config.store_socket_name, config.object_store_memory, config.huge_pages,
|
||||
|
@ -35,7 +36,7 @@ ObjectStoreRunner::ObjectStoreRunner(const ObjectManagerConfig &config,
|
|||
// Initialize object store.
|
||||
store_thread_ =
|
||||
std::thread(&plasma::PlasmaStoreRunner::Start, plasma::plasma_store_runner.get(),
|
||||
spill_objects_callback);
|
||||
spill_objects_callback, object_store_full_callback);
|
||||
// Sleep for sometime until the store is working. This can suppress some
|
||||
// connection warnings.
|
||||
std::this_thread::sleep_for(std::chrono::microseconds(500));
|
||||
|
@ -54,11 +55,12 @@ ObjectManager::ObjectManager(asio::io_service &main_service, const NodeID &self_
|
|||
const ObjectManagerConfig &config,
|
||||
std::shared_ptr<ObjectDirectoryInterface> object_directory,
|
||||
RestoreSpilledObjectCallback restore_spilled_object,
|
||||
SpillObjectsCallback spill_objects_callback)
|
||||
SpillObjectsCallback spill_objects_callback,
|
||||
std::function<void()> object_store_full_callback)
|
||||
: self_node_id_(self_node_id),
|
||||
config_(config),
|
||||
object_directory_(std::move(object_directory)),
|
||||
object_store_internal_(config, spill_objects_callback),
|
||||
object_store_internal_(config, spill_objects_callback, object_store_full_callback),
|
||||
buffer_pool_(config_.store_socket_name, config_.object_chunk_size),
|
||||
rpc_work_(rpc_service_),
|
||||
gen_(std::chrono::high_resolution_clock::now().time_since_epoch().count()),
|
||||
|
|
|
@ -82,7 +82,8 @@ struct LocalObjectInfo {
|
|||
class ObjectStoreRunner {
|
||||
public:
|
||||
ObjectStoreRunner(const ObjectManagerConfig &config,
|
||||
SpillObjectsCallback spill_objects_callback);
|
||||
SpillObjectsCallback spill_objects_callback,
|
||||
std::function<void()> object_store_full_callback);
|
||||
~ObjectStoreRunner();
|
||||
|
||||
private:
|
||||
|
@ -194,7 +195,8 @@ class ObjectManager : public ObjectManagerInterface,
|
|||
const NodeID &self_node_id, const ObjectManagerConfig &config,
|
||||
std::shared_ptr<ObjectDirectoryInterface> object_directory,
|
||||
RestoreSpilledObjectCallback restore_spilled_object,
|
||||
SpillObjectsCallback spill_objects_callback = nullptr);
|
||||
SpillObjectsCallback spill_objects_callback = nullptr,
|
||||
std::function<void()> object_store_full_callback = nullptr);
|
||||
|
||||
~ObjectManager();
|
||||
|
||||
|
|
|
@ -155,8 +155,17 @@ class PlasmaClient::Impl : public std::enable_shared_from_this<PlasmaClient::Imp
|
|||
|
||||
Status Create(const ObjectID &object_id, const ray::rpc::Address &owner_address,
|
||||
int64_t data_size, const uint8_t *metadata, int64_t metadata_size,
|
||||
std::shared_ptr<Buffer> *data, int device_num = 0,
|
||||
bool evict_if_full = true);
|
||||
uint64_t *retry_with_request_id, std::shared_ptr<Buffer> *data,
|
||||
int device_num = 0);
|
||||
|
||||
Status RetryCreate(const ObjectID &object_id, uint64_t request_id,
|
||||
const uint8_t *metadata, uint64_t *retry_with_request_id,
|
||||
std::shared_ptr<Buffer> *data);
|
||||
|
||||
Status TryCreateImmediately(const ObjectID &object_id,
|
||||
const ray::rpc::Address &owner_address, int64_t data_size,
|
||||
const uint8_t *metadata, int64_t metadata_size,
|
||||
std::shared_ptr<Buffer> *data, int device_num);
|
||||
|
||||
Status Get(const std::vector<ObjectID> &object_ids, int64_t timeout_ms,
|
||||
std::vector<ObjectBuffer> *object_buffers);
|
||||
|
@ -187,6 +196,11 @@ class PlasmaClient::Impl : public std::enable_shared_from_this<PlasmaClient::Imp
|
|||
int64_t store_capacity() { return store_capacity_; }
|
||||
|
||||
private:
|
||||
/// Helper method to read and process the reply of a create request.
|
||||
Status HandleCreateReply(const ObjectID &object_id, const uint8_t *metadata,
|
||||
uint64_t *retry_with_request_id,
|
||||
std::shared_ptr<Buffer> *data);
|
||||
|
||||
/// Check if store_fd has already been received from the store. If yes,
|
||||
/// return it. Otherwise, receive it from the store (see analogous logic
|
||||
/// in store.cc).
|
||||
|
@ -307,42 +321,46 @@ void PlasmaClient::Impl::IncrementObjectCount(const ObjectID &object_id,
|
|||
object_entry->count += 1;
|
||||
}
|
||||
|
||||
Status PlasmaClient::Impl::Create(const ObjectID &object_id,
|
||||
const ray::rpc::Address &owner_address,
|
||||
int64_t data_size, const uint8_t *metadata,
|
||||
int64_t metadata_size, std::shared_ptr<Buffer> *data,
|
||||
int device_num, bool evict_if_full) {
|
||||
std::lock_guard<std::recursive_mutex> guard(client_mutex_);
|
||||
|
||||
RAY_LOG(DEBUG) << "called plasma_create on conn " << store_conn_ << " with size "
|
||||
<< data_size << " and metadata size " << metadata_size;
|
||||
RAY_RETURN_NOT_OK(SendCreateRequest(store_conn_, object_id, owner_address,
|
||||
evict_if_full, data_size, metadata_size,
|
||||
device_num));
|
||||
Status PlasmaClient::Impl::HandleCreateReply(const ObjectID &object_id,
|
||||
const uint8_t *metadata,
|
||||
uint64_t *retry_with_request_id,
|
||||
std::shared_ptr<Buffer> *data) {
|
||||
std::vector<uint8_t> buffer;
|
||||
RAY_RETURN_NOT_OK(PlasmaReceive(store_conn_, MessageType::PlasmaCreateReply, &buffer));
|
||||
ObjectID id;
|
||||
PlasmaObject object;
|
||||
MEMFD_TYPE store_fd;
|
||||
int64_t mmap_size;
|
||||
RAY_RETURN_NOT_OK(
|
||||
ReadCreateReply(buffer.data(), buffer.size(), &id, &object, &store_fd, &mmap_size));
|
||||
|
||||
if (retry_with_request_id) {
|
||||
RAY_RETURN_NOT_OK(ReadCreateReply(buffer.data(), buffer.size(), &id,
|
||||
retry_with_request_id, &object, &store_fd,
|
||||
&mmap_size));
|
||||
if (*retry_with_request_id > 0) {
|
||||
// The client should retry the request.
|
||||
return Status::OK();
|
||||
}
|
||||
} else {
|
||||
uint64_t unused = 0;
|
||||
RAY_RETURN_NOT_OK(ReadCreateReply(buffer.data(), buffer.size(), &id, &unused, &object,
|
||||
&store_fd, &mmap_size));
|
||||
RAY_CHECK(unused == 0);
|
||||
}
|
||||
|
||||
// If the CreateReply included an error, then the store will not send a file
|
||||
// descriptor.
|
||||
if (device_num == 0) {
|
||||
RAY_CHECK(object.data_size == data_size);
|
||||
RAY_CHECK(object.metadata_size == metadata_size);
|
||||
if (object.device_num == 0) {
|
||||
// The metadata should come right after the data.
|
||||
RAY_CHECK(object.metadata_offset == object.data_offset + data_size);
|
||||
RAY_CHECK(object.metadata_offset == object.data_offset + object.data_size);
|
||||
*data = std::make_shared<PlasmaMutableBuffer>(
|
||||
shared_from_this(), GetStoreFdAndMmap(store_fd, mmap_size) + object.data_offset,
|
||||
data_size);
|
||||
object.data_size);
|
||||
// If plasma_create is being called from a transfer, then we will not copy the
|
||||
// metadata here. The metadata will be written along with the data streamed
|
||||
// from the transfer.
|
||||
if (metadata != NULL) {
|
||||
// Copy the metadata to the buffer.
|
||||
memcpy((*data)->mutable_data() + object.data_size, metadata, metadata_size);
|
||||
memcpy((*data)->mutable_data() + object.data_size, metadata, object.metadata_size);
|
||||
}
|
||||
} else {
|
||||
#ifdef PLASMA_CUDA
|
||||
|
@ -378,6 +396,44 @@ Status PlasmaClient::Impl::Create(const ObjectID &object_id,
|
|||
return Status::OK();
|
||||
}
|
||||
|
||||
Status PlasmaClient::Impl::Create(const ObjectID &object_id,
|
||||
const ray::rpc::Address &owner_address,
|
||||
int64_t data_size, const uint8_t *metadata,
|
||||
int64_t metadata_size, uint64_t *retry_with_request_id,
|
||||
std::shared_ptr<Buffer> *data, int device_num) {
|
||||
std::lock_guard<std::recursive_mutex> guard(client_mutex_);
|
||||
|
||||
RAY_LOG(DEBUG) << "called plasma_create on conn " << store_conn_ << " with size "
|
||||
<< data_size << " and metadata size " << metadata_size;
|
||||
RAY_RETURN_NOT_OK(SendCreateRequest(store_conn_, object_id, owner_address, data_size,
|
||||
metadata_size, device_num,
|
||||
/*try_immediately=*/false));
|
||||
return HandleCreateReply(object_id, metadata, retry_with_request_id, data);
|
||||
}
|
||||
|
||||
Status PlasmaClient::Impl::RetryCreate(const ObjectID &object_id, uint64_t request_id,
|
||||
const uint8_t *metadata,
|
||||
uint64_t *retry_with_request_id,
|
||||
std::shared_ptr<Buffer> *data) {
|
||||
std::lock_guard<std::recursive_mutex> guard(client_mutex_);
|
||||
RAY_RETURN_NOT_OK(SendCreateRetryRequest(store_conn_, object_id, request_id));
|
||||
return HandleCreateReply(object_id, metadata, retry_with_request_id, data);
|
||||
}
|
||||
|
||||
Status PlasmaClient::Impl::TryCreateImmediately(
|
||||
const ObjectID &object_id, const ray::rpc::Address &owner_address, int64_t data_size,
|
||||
const uint8_t *metadata, int64_t metadata_size, std::shared_ptr<Buffer> *data,
|
||||
int device_num) {
|
||||
std::lock_guard<std::recursive_mutex> guard(client_mutex_);
|
||||
|
||||
RAY_LOG(DEBUG) << "called plasma_create on conn " << store_conn_ << " with size "
|
||||
<< data_size << " and metadata size " << metadata_size;
|
||||
RAY_RETURN_NOT_OK(SendCreateRequest(store_conn_, object_id, owner_address, data_size,
|
||||
metadata_size, device_num,
|
||||
/*try_immediately=*/true));
|
||||
return HandleCreateReply(object_id, metadata, nullptr, data);
|
||||
}
|
||||
|
||||
Status PlasmaClient::Impl::GetBuffers(
|
||||
const ObjectID *object_ids, int64_t num_objects, int64_t timeout_ms,
|
||||
const std::function<std::shared_ptr<Buffer>(
|
||||
|
@ -798,10 +854,25 @@ Status PlasmaClient::SetClientOptions(const std::string &client_name,
|
|||
Status PlasmaClient::Create(const ObjectID &object_id,
|
||||
const ray::rpc::Address &owner_address, int64_t data_size,
|
||||
const uint8_t *metadata, int64_t metadata_size,
|
||||
std::shared_ptr<Buffer> *data, int device_num,
|
||||
bool evict_if_full) {
|
||||
return impl_->Create(object_id, owner_address, data_size, metadata, metadata_size, data,
|
||||
device_num, evict_if_full);
|
||||
uint64_t *retry_with_request_id,
|
||||
std::shared_ptr<Buffer> *data, int device_num) {
|
||||
return impl_->Create(object_id, owner_address, data_size, metadata, metadata_size,
|
||||
retry_with_request_id, data, device_num);
|
||||
}
|
||||
|
||||
Status PlasmaClient::RetryCreate(const ObjectID &object_id, uint64_t request_id,
|
||||
const uint8_t *metadata, uint64_t *retry_with_request_id,
|
||||
std::shared_ptr<Buffer> *data) {
|
||||
return impl_->RetryCreate(object_id, request_id, metadata, retry_with_request_id, data);
|
||||
}
|
||||
|
||||
Status PlasmaClient::TryCreateImmediately(const ObjectID &object_id,
|
||||
const ray::rpc::Address &owner_address,
|
||||
int64_t data_size, const uint8_t *metadata,
|
||||
int64_t metadata_size,
|
||||
std::shared_ptr<Buffer> *data, int device_num) {
|
||||
return impl_->TryCreateImmediately(object_id, owner_address, data_size, metadata,
|
||||
metadata_size, data, device_num);
|
||||
}
|
||||
|
||||
Status PlasmaClient::Get(const std::vector<ObjectID> &object_ids, int64_t timeout_ms,
|
||||
|
|
|
@ -77,6 +77,54 @@ class RAY_EXPORT PlasmaClient {
|
|||
/// Create an object in the Plasma Store. Any metadata for this object must be
|
||||
/// be passed in when the object is created.
|
||||
///
|
||||
/// If this request cannot be fulfilled immediately, the client will be
|
||||
/// returned a request ID, which it should use to retry the request.
|
||||
///
|
||||
/// \param object_id The ID to use for the newly created object.
|
||||
/// \param owner_address The address of the object's owner.
|
||||
/// \param data_size The size in bytes of the space to be allocated for this
|
||||
/// object's
|
||||
/// data (this does not include space used for metadata).
|
||||
/// \param metadata The object's metadata. If there is no metadata, this
|
||||
/// pointer should be NULL.
|
||||
/// \param metadata_size The size in bytes of the metadata. If there is no
|
||||
/// metadata, this should be 0.
|
||||
/// \param retry_with_request_id If the request is not yet fulfilled, this
|
||||
/// will be set to a unique ID with which the client should retry.
|
||||
/// \param data The address of the newly created object will be written here.
|
||||
/// \param device_num The number of the device where the object is being
|
||||
/// created.
|
||||
/// device_num = 0 corresponds to the host,
|
||||
/// device_num = 1 corresponds to GPU0,
|
||||
/// device_num = 2 corresponds to GPU1, etc.
|
||||
/// \return The return status.
|
||||
///
|
||||
/// The returned object must be released once it is done with. It must also
|
||||
/// be either sealed or aborted.
|
||||
Status Create(const ObjectID &object_id, const ray::rpc::Address &owner_address,
|
||||
int64_t data_size, const uint8_t *metadata, int64_t metadata_size,
|
||||
uint64_t *retry_with_request_id, std::shared_ptr<Buffer> *data,
|
||||
int device_num = 0);
|
||||
|
||||
/// Retry a previous Create call using the returned request ID.
|
||||
///
|
||||
/// \param object_id The ID to use for the newly created object.
|
||||
/// \param request_id The request ID returned by the previous Create call.
|
||||
/// \param metadata The object's metadata. If there is no metadata, this
|
||||
/// pointer should be NULL.
|
||||
/// \param retry_with_request_id If the request is not yet fulfilled, this
|
||||
/// will be set to a unique ID with which the client should retry.
|
||||
/// \param data The address of the newly created object will be written here.
|
||||
Status RetryCreate(const ObjectID &object_id, uint64_t request_id,
|
||||
const uint8_t *metadata, uint64_t *retry_with_request_id,
|
||||
std::shared_ptr<Buffer> *data);
|
||||
|
||||
/// Create an object in the Plasma Store. Any metadata for this object must be
|
||||
/// be passed in when the object is created.
|
||||
///
|
||||
/// The plasma store will attempt to fulfill this request immediately. If it
|
||||
/// cannot be fulfilled immediately, an error will be returned to the client.
|
||||
///
|
||||
/// \param object_id The ID to use for the newly created object.
|
||||
/// \param owner_address The address of the object's owner.
|
||||
/// \param data_size The size in bytes of the space to be allocated for this
|
||||
|
@ -93,16 +141,14 @@ class RAY_EXPORT PlasmaClient {
|
|||
/// device_num = 0 corresponds to the host,
|
||||
/// device_num = 1 corresponds to GPU0,
|
||||
/// device_num = 2 corresponds to GPU1, etc.
|
||||
/// \param evict_if_full Whether to evict other objects to make space for
|
||||
/// this object.
|
||||
/// \return The return status.
|
||||
///
|
||||
/// The returned object must be released once it is done with. It must also
|
||||
/// be either sealed or aborted.
|
||||
Status Create(const ObjectID &object_id, const ray::rpc::Address &owner_address,
|
||||
int64_t data_size, const uint8_t *metadata, int64_t metadata_size,
|
||||
std::shared_ptr<Buffer> *data, int device_num = 0,
|
||||
bool evict_if_full = true);
|
||||
Status TryCreateImmediately(const ObjectID &object_id,
|
||||
const ray::rpc::Address &owner_address, int64_t data_size,
|
||||
const uint8_t *metadata, int64_t metadata_size,
|
||||
std::shared_ptr<Buffer> *data, int device_num = 0);
|
||||
|
||||
/// Get some objects from the Plasma Store. This function will block until the
|
||||
/// objects have all been created and sealed in the Plasma Store or the
|
||||
|
|
|
@ -24,31 +24,149 @@
|
|||
|
||||
namespace plasma {
|
||||
|
||||
void CreateRequestQueue::AddRequest(const std::shared_ptr<ClientInterface> &client,
|
||||
const CreateObjectCallback &request_callback) {
|
||||
queue_.push_back({client, request_callback});
|
||||
uint64_t CreateRequestQueue::AddRequest(const ObjectID &object_id,
|
||||
const std::shared_ptr<ClientInterface> &client,
|
||||
const CreateObjectCallback &create_callback) {
|
||||
auto req_id = next_req_id_++;
|
||||
fulfilled_requests_[req_id] = nullptr;
|
||||
queue_.emplace_back(new CreateRequest(object_id, req_id, client, create_callback));
|
||||
return req_id;
|
||||
}
|
||||
|
||||
bool CreateRequestQueue::GetRequestResult(uint64_t req_id, PlasmaObject *result,
|
||||
PlasmaError *error) {
|
||||
auto it = fulfilled_requests_.find(req_id);
|
||||
if (it == fulfilled_requests_.end()) {
|
||||
RAY_LOG(ERROR)
|
||||
<< "Object store client requested the result of a previous request to create an "
|
||||
"object, but the result has already been returned to the client. This client "
|
||||
"may hang because the creation request cannot be fulfilled.";
|
||||
*error = PlasmaError::UnexpectedError;
|
||||
return true;
|
||||
}
|
||||
|
||||
if (!it->second) {
|
||||
return false;
|
||||
}
|
||||
|
||||
*result = it->second->result;
|
||||
*error = it->second->error;
|
||||
fulfilled_requests_.erase(it);
|
||||
return true;
|
||||
}
|
||||
|
||||
std::pair<PlasmaObject, PlasmaError> CreateRequestQueue::TryRequestImmediately(
|
||||
const ObjectID &object_id, const std::shared_ptr<ClientInterface> &client,
|
||||
const CreateObjectCallback &create_callback) {
|
||||
PlasmaObject result = {};
|
||||
|
||||
if (!queue_.empty()) {
|
||||
// There are other requests queued. Return an out-of-memory error
|
||||
// immediately because this request cannot be served.
|
||||
return {result, PlasmaError::OutOfMemory};
|
||||
}
|
||||
|
||||
auto req_id = AddRequest(object_id, client, create_callback);
|
||||
if (!ProcessRequests().ok()) {
|
||||
// If the request was not immediately fulfillable, finish it.
|
||||
RAY_CHECK(!queue_.empty());
|
||||
FinishRequest(queue_.begin());
|
||||
}
|
||||
PlasmaError error;
|
||||
RAY_CHECK(GetRequestResult(req_id, &result, &error));
|
||||
return {result, error};
|
||||
}
|
||||
|
||||
Status CreateRequestQueue::ProcessRequest(std::unique_ptr<CreateRequest> &request) {
|
||||
// Return an OOM error to the client if we have hit the maximum number of
|
||||
// retries.
|
||||
bool evict_if_full = evict_if_full_;
|
||||
if (max_retries_ == 0) {
|
||||
// If we cannot retry, then always evict on the first attempt.
|
||||
evict_if_full = true;
|
||||
} else if (num_retries_ > 0) {
|
||||
// Always try to evict after the first attempt.
|
||||
evict_if_full = true;
|
||||
}
|
||||
|
||||
request->error = request->create_callback(evict_if_full, &request->result);
|
||||
Status status;
|
||||
auto should_retry_on_oom = max_retries_ == -1 || num_retries_ < max_retries_;
|
||||
if (request->error == PlasmaError::TransientOutOfMemory) {
|
||||
// The object store is full, but we should wait for space to be made
|
||||
// through spilling, so do nothing. The caller must guarantee that
|
||||
// ProcessRequests is called again so that we can try this request again.
|
||||
// NOTE(swang): There could be other requests behind this one that are
|
||||
// actually serviceable. This may be inefficient, but eventually this
|
||||
// request will get served and unblock the following requests, once
|
||||
// enough objects have been spilled.
|
||||
// TODO(swang): Ask the raylet to spill enough space for multiple requests
|
||||
// at once, instead of just the head of the queue.
|
||||
num_retries_ = 0;
|
||||
status =
|
||||
Status::TransientObjectStoreFull("Object store full, queueing creation request");
|
||||
} else if (request->error == PlasmaError::OutOfMemory && should_retry_on_oom) {
|
||||
num_retries_++;
|
||||
RAY_LOG(DEBUG) << "Not enough memory to create the object, after " << num_retries_
|
||||
<< " tries";
|
||||
|
||||
if (trigger_global_gc_) {
|
||||
trigger_global_gc_();
|
||||
}
|
||||
|
||||
status = Status::ObjectStoreFull("Object store full, should retry on timeout");
|
||||
} else if (request->error == PlasmaError::OutOfMemory) {
|
||||
RAY_LOG(ERROR) << "Not enough memory to create object " << request->object_id
|
||||
<< " after " << num_retries_
|
||||
<< " tries, will return OutOfMemory to the client";
|
||||
}
|
||||
|
||||
return status;
|
||||
}
|
||||
|
||||
Status CreateRequestQueue::ProcessRequests() {
|
||||
for (auto request_it = queue_.begin(); request_it != queue_.end();) {
|
||||
auto status = request_it->second();
|
||||
if (status.IsTransientObjectStoreFull()) {
|
||||
while (!queue_.empty()) {
|
||||
auto request_it = queue_.begin();
|
||||
auto status = ProcessRequest(*request_it);
|
||||
if (status.IsTransientObjectStoreFull() || status.IsObjectStoreFull()) {
|
||||
return status;
|
||||
}
|
||||
request_it = queue_.erase(request_it);
|
||||
FinishRequest(request_it);
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
void CreateRequestQueue::FinishRequest(
|
||||
std::list<std::unique_ptr<CreateRequest>>::iterator request_it) {
|
||||
// Fulfill the request.
|
||||
auto &request = *request_it;
|
||||
auto it = fulfilled_requests_.find(request->request_id);
|
||||
RAY_CHECK(it != fulfilled_requests_.end());
|
||||
RAY_CHECK(it->second == nullptr);
|
||||
it->second = std::move(request);
|
||||
queue_.erase(request_it);
|
||||
|
||||
// Reset the number of retries since we are no longer trying this request.
|
||||
num_retries_ = 0;
|
||||
}
|
||||
|
||||
void CreateRequestQueue::RemoveDisconnectedClientRequests(
|
||||
const std::shared_ptr<ClientInterface> &client) {
|
||||
for (auto it = queue_.begin(); it != queue_.end();) {
|
||||
if (it->first == client) {
|
||||
if ((*it)->client == client) {
|
||||
fulfilled_requests_.erase((*it)->request_id);
|
||||
it = queue_.erase(it);
|
||||
} else {
|
||||
it++;
|
||||
}
|
||||
}
|
||||
|
||||
for (auto it = fulfilled_requests_.begin(); it != fulfilled_requests_.end();) {
|
||||
if (it->second && it->second->client == client) {
|
||||
fulfilled_requests_.erase(it);
|
||||
}
|
||||
it++;
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace plasma
|
||||
|
|
|
@ -18,28 +18,75 @@
|
|||
#include <string>
|
||||
#include <vector>
|
||||
|
||||
#include "absl/container/flat_hash_map.h"
|
||||
|
||||
#include "ray/common/status.h"
|
||||
#include "ray/object_manager/plasma/common.h"
|
||||
#include "ray/object_manager/plasma/connection.h"
|
||||
#include "ray/object_manager/plasma/plasma.h"
|
||||
#include "ray/object_manager/plasma/protocol.h"
|
||||
|
||||
namespace plasma {
|
||||
|
||||
using ray::Status;
|
||||
|
||||
using CreateObjectCallback = std::function<Status()>;
|
||||
|
||||
class CreateRequestQueue {
|
||||
public:
|
||||
CreateRequestQueue() {}
|
||||
using CreateObjectCallback =
|
||||
std::function<PlasmaError(bool evict_if_full, PlasmaObject *result)>;
|
||||
|
||||
/// Add a request to the queue.
|
||||
CreateRequestQueue(int32_t max_retries, bool evict_if_full,
|
||||
std::function<void()> trigger_global_gc)
|
||||
: max_retries_(max_retries),
|
||||
evict_if_full_(evict_if_full),
|
||||
trigger_global_gc_(trigger_global_gc) {
|
||||
RAY_LOG(DEBUG) << "Starting plasma::CreateRequestQueue with " << max_retries_
|
||||
<< " retries on OOM, evict if full? " << (evict_if_full_ ? 1 : 0);
|
||||
}
|
||||
|
||||
/// Add a request to the queue. The caller should use the returned request ID
|
||||
/// to later get the result of the request.
|
||||
///
|
||||
/// The request may not get tried immediately if the head of the queue is not
|
||||
/// serviceable.
|
||||
///
|
||||
/// \param client The client that sent the request.
|
||||
void AddRequest(const std::shared_ptr<ClientInterface> &client,
|
||||
const CreateObjectCallback &request_callback);
|
||||
/// \param object_id The ID of the object to create.
|
||||
/// \param client The client that sent the request. This is used as a key to
|
||||
/// drop this request if the client disconnects.
|
||||
/// \param create_callback A callback to attempt to create the object.
|
||||
/// \return A request ID that can be used to get the result.
|
||||
uint64_t AddRequest(const ObjectID &object_id,
|
||||
const std::shared_ptr<ClientInterface> &client,
|
||||
const CreateObjectCallback &create_callback);
|
||||
|
||||
/// Get the result of a request.
|
||||
///
|
||||
/// This method should only be called with a request ID returned by a
|
||||
/// previous call to add a request. The result will be popped, so this method
|
||||
/// should not be called again with the same request ID once a result is
|
||||
/// returned. If either of these is violated, an error will be returned.
|
||||
///
|
||||
/// \param[in] req_id The request ID that was returned to the caller by a
|
||||
/// previous call to add a request.
|
||||
/// \param[out] result The resulting object information will be stored here,
|
||||
/// if the request was finished and successful.
|
||||
/// \param[out] error The error code returned by the creation handler will be
|
||||
/// stored here, if the request finished. This will also return an error if
|
||||
/// there is no information about this request ID.
|
||||
/// \return Whether the result and error are ready. This returns false if the
|
||||
/// request is still pending.
|
||||
bool GetRequestResult(uint64_t req_id, PlasmaObject *result, PlasmaError *error);
|
||||
|
||||
/// Try to fulfill a request immediately, for clients that cannot retry.
|
||||
///
|
||||
/// \param object_id The ID of the object to create.
|
||||
/// \param client The client that sent the request. This is used as a key to
|
||||
/// drop this request if the client disconnects.
|
||||
/// \param create_callback A callback to attempt to create the object.
|
||||
/// \return The result of the call. This will return an out-of-memory error
|
||||
/// if there are other requests queued or there is not enough space left in
|
||||
/// the object store, this will return an out-of-memory error.
|
||||
std::pair<PlasmaObject, PlasmaError> TryRequestImmediately(
|
||||
const ObjectID &object_id, const std::shared_ptr<ClientInterface> &client,
|
||||
const CreateObjectCallback &create_callback);
|
||||
|
||||
/// Process requests in the queue.
|
||||
///
|
||||
|
@ -57,6 +104,62 @@ class CreateRequestQueue {
|
|||
void RemoveDisconnectedClientRequests(const std::shared_ptr<ClientInterface> &client);
|
||||
|
||||
private:
|
||||
struct CreateRequest {
|
||||
CreateRequest(const ObjectID &object_id, uint64_t request_id,
|
||||
const std::shared_ptr<ClientInterface> &client,
|
||||
CreateObjectCallback create_callback)
|
||||
: object_id(object_id),
|
||||
request_id(request_id),
|
||||
client(client),
|
||||
create_callback(create_callback) {}
|
||||
|
||||
// The ObjectID to create.
|
||||
const ObjectID object_id;
|
||||
|
||||
// A request ID that can be returned to the caller to get the result once
|
||||
// ready.
|
||||
const uint64_t request_id;
|
||||
|
||||
// A pointer to the client, used as a key to delete requests that were made
|
||||
// by a client that is now disconnected.
|
||||
const std::shared_ptr<ClientInterface> client;
|
||||
|
||||
// A callback to attempt to create the object.
|
||||
const CreateObjectCallback create_callback;
|
||||
|
||||
// The results of the creation call. These should be sent back to the
|
||||
// client once ready.
|
||||
PlasmaError error = PlasmaError::OK;
|
||||
PlasmaObject result = {};
|
||||
};
|
||||
|
||||
/// Process a single request. Sets the request's error result to the error
|
||||
/// returned by the request handler inside. Returns OK if the request can be
|
||||
/// finished.
|
||||
Status ProcessRequest(std::unique_ptr<CreateRequest> &request);
|
||||
|
||||
/// Finish a queued request and remove it from the queue.
|
||||
void FinishRequest(std::list<std::unique_ptr<CreateRequest>>::iterator request_it);
|
||||
|
||||
/// The next request ID to assign, so that the caller can get the results of
|
||||
/// a request by retrying. Start at 1 because 0 means "do not retry".
|
||||
uint64_t next_req_id_ = 1;
|
||||
|
||||
/// The maximum number of times to retry each request upon OOM.
|
||||
const int32_t max_retries_;
|
||||
|
||||
/// The number of times the request at the head of the queue has been tried.
|
||||
int32_t num_retries_ = 0;
|
||||
|
||||
/// On the first attempt to create an object, whether to evict from the
|
||||
/// object store to make space. If the first attempt fails, then we will
|
||||
/// always try to evict.
|
||||
const bool evict_if_full_;
|
||||
|
||||
/// A callback to trigger global GC in the cluster if the object store is
|
||||
/// full.
|
||||
const std::function<void()> trigger_global_gc_;
|
||||
|
||||
/// Queue of object creation requests to respond to. Requests will be placed
|
||||
/// on this queue if the object store does not have enough room at the time
|
||||
/// that the client made the creation request, but space may be made through
|
||||
|
@ -68,8 +171,14 @@ class CreateRequestQueue {
|
|||
/// in the object store. Then, the client does not need to poll on an
|
||||
/// OutOfMemory error and we can just respond to them once there is enough
|
||||
/// space made, or after a timeout.
|
||||
std::list<std::pair<const std::shared_ptr<ClientInterface>, const CreateObjectCallback>>
|
||||
queue_;
|
||||
std::list<std::unique_ptr<CreateRequest>> queue_;
|
||||
|
||||
/// A buffer of the results of fulfilled requests. The value will be null
|
||||
/// while the request is pending and will be set once the request has
|
||||
/// finished.
|
||||
absl::flat_hash_map<uint64_t, std::unique_ptr<CreateRequest>> fulfilled_requests_;
|
||||
|
||||
friend class CreateRequestQueueTest;
|
||||
};
|
||||
|
||||
} // namespace plasma
|
||||
|
|
|
@ -23,6 +23,7 @@ enum MessageType:long {
|
|||
PlasmaDisconnectClient = 0,
|
||||
// Create a new object.
|
||||
PlasmaCreateRequest,
|
||||
PlasmaCreateRetryRequest,
|
||||
PlasmaCreateReply,
|
||||
PlasmaAbortRequest,
|
||||
PlasmaAbortReply,
|
||||
|
@ -90,6 +91,10 @@ enum PlasmaError:int {
|
|||
ObjectNotSealed,
|
||||
// Trying to delete an object but it's in use.
|
||||
ObjectInUse,
|
||||
// An unexpected error occurred during object creation, such as trying to get
|
||||
// the result of the same request twice. This is most likely due to a system
|
||||
// bug in the plasma store or caller.
|
||||
UnexpectedError,
|
||||
}
|
||||
|
||||
// Plasma store messages
|
||||
|
@ -141,16 +146,24 @@ table PlasmaCreateRequest {
|
|||
owner_port: int;
|
||||
// Unique id for the owner worker.
|
||||
owner_worker_id: string;
|
||||
// Whether to evict other objects to make room for this one.
|
||||
evict_if_full: bool;
|
||||
// The size of the object's data in bytes.
|
||||
data_size: ulong;
|
||||
// The size of the object's metadata in bytes.
|
||||
metadata_size: ulong;
|
||||
// Device to create buffer on.
|
||||
device_num: int;
|
||||
// Try the creation request immediately. If this is not possible (due to
|
||||
// out-of-memory), the error will be returned immediately to the client.
|
||||
try_immediately: bool;
|
||||
}
|
||||
|
||||
table PlasmaCreateRetryRequest {
|
||||
// ID of the object to be created.
|
||||
object_id: string;
|
||||
// The ID of the request to retry.
|
||||
request_id: uint64;
|
||||
}
|
||||
|
||||
table CudaHandle {
|
||||
handle: [ubyte];
|
||||
}
|
||||
|
@ -158,6 +171,9 @@ table CudaHandle {
|
|||
table PlasmaCreateReply {
|
||||
// ID of the object that was created.
|
||||
object_id: string;
|
||||
// The client should retry the request if this is > 0. This
|
||||
// is the request ID to include in the retry.
|
||||
retry_with_request_id: uint64;
|
||||
// The object that is returned with this reply.
|
||||
plasma_object: PlasmaObjectSpec;
|
||||
// Error that occurred for this call.
|
||||
|
|
|
@ -55,6 +55,8 @@ struct PlasmaObject {
|
|||
int64_t metadata_size;
|
||||
/// Device number object is on.
|
||||
int device_num;
|
||||
/// Set if device_num is equal to 0.
|
||||
int64_t mmap_size;
|
||||
|
||||
bool operator==(const PlasmaObject &other) const {
|
||||
return (
|
||||
|
|
|
@ -131,6 +131,12 @@ Status PlasmaErrorStatus(fb::PlasmaError plasma_error) {
|
|||
return Status::ObjectNotFound("object does not exist in the plasma store");
|
||||
case fb::PlasmaError::OutOfMemory:
|
||||
return Status::ObjectStoreFull("object does not fit in the plasma store");
|
||||
case fb::PlasmaError::TransientOutOfMemory:
|
||||
return Status::ObjectStoreFull(
|
||||
"object does not fit in the plasma store, spilling objects to make room");
|
||||
case fb::PlasmaError::UnexpectedError:
|
||||
return Status::UnknownError(
|
||||
"an unexpected error occurred, likely due to a bug in the system or caller");
|
||||
default:
|
||||
RAY_LOG(FATAL) << "unknown plasma error code " << static_cast<int>(plasma_error);
|
||||
}
|
||||
|
@ -196,27 +202,34 @@ Status ReadGetDebugStringReply(uint8_t *data, size_t size, std::string *debug_st
|
|||
|
||||
// Create messages.
|
||||
|
||||
Status SendCreateRetryRequest(const std::shared_ptr<StoreConn> &store_conn,
|
||||
ObjectID object_id, uint64_t request_id) {
|
||||
flatbuffers::FlatBufferBuilder fbb;
|
||||
auto message = fb::CreatePlasmaCreateRetryRequest(
|
||||
fbb, fbb.CreateString(object_id.Binary()), request_id);
|
||||
return PlasmaSend(store_conn, MessageType::PlasmaCreateRetryRequest, &fbb, message);
|
||||
}
|
||||
|
||||
Status SendCreateRequest(const std::shared_ptr<StoreConn> &store_conn, ObjectID object_id,
|
||||
const ray::rpc::Address &owner_address, bool evict_if_full,
|
||||
int64_t data_size, int64_t metadata_size, int device_num) {
|
||||
const ray::rpc::Address &owner_address, int64_t data_size,
|
||||
int64_t metadata_size, int device_num, bool try_immediately) {
|
||||
flatbuffers::FlatBufferBuilder fbb;
|
||||
auto message = fb::CreatePlasmaCreateRequest(
|
||||
fbb, fbb.CreateString(object_id.Binary()),
|
||||
fbb.CreateString(owner_address.raylet_id()),
|
||||
fbb.CreateString(owner_address.ip_address()), owner_address.port(),
|
||||
fbb.CreateString(owner_address.worker_id()), evict_if_full, data_size,
|
||||
metadata_size, device_num);
|
||||
fbb.CreateString(owner_address.worker_id()), data_size, metadata_size, device_num,
|
||||
try_immediately);
|
||||
return PlasmaSend(store_conn, MessageType::PlasmaCreateRequest, &fbb, message);
|
||||
}
|
||||
|
||||
Status ReadCreateRequest(uint8_t *data, size_t size, ObjectID *object_id,
|
||||
NodeID *owner_raylet_id, std::string *owner_ip_address,
|
||||
int *owner_port, WorkerID *owner_worker_id, bool *evict_if_full,
|
||||
int64_t *data_size, int64_t *metadata_size, int *device_num) {
|
||||
void ReadCreateRequest(uint8_t *data, size_t size, ObjectID *object_id,
|
||||
NodeID *owner_raylet_id, std::string *owner_ip_address,
|
||||
int *owner_port, WorkerID *owner_worker_id, int64_t *data_size,
|
||||
int64_t *metadata_size, int *device_num) {
|
||||
RAY_DCHECK(data);
|
||||
auto message = flatbuffers::GetRoot<fb::PlasmaCreateRequest>(data);
|
||||
RAY_DCHECK(VerifyFlatbuffer(message, data, size));
|
||||
*evict_if_full = message->evict_if_full();
|
||||
*data_size = message->data_size();
|
||||
*metadata_size = message->metadata_size();
|
||||
*object_id = ObjectID::FromBinary(message->object_id()->str());
|
||||
|
@ -225,21 +238,32 @@ Status ReadCreateRequest(uint8_t *data, size_t size, ObjectID *object_id,
|
|||
*owner_port = message->owner_port();
|
||||
*owner_worker_id = WorkerID::FromBinary(message->owner_worker_id()->str());
|
||||
*device_num = message->device_num();
|
||||
return Status::OK();
|
||||
return;
|
||||
}
|
||||
|
||||
Status SendUnfinishedCreateReply(const std::shared_ptr<Client> &client,
|
||||
ObjectID object_id, uint64_t retry_with_request_id) {
|
||||
flatbuffers::FlatBufferBuilder fbb;
|
||||
auto object_string = fbb.CreateString(object_id.Binary());
|
||||
fb::PlasmaCreateReplyBuilder crb(fbb);
|
||||
crb.add_object_id(object_string);
|
||||
crb.add_retry_with_request_id(retry_with_request_id);
|
||||
auto message = crb.Finish();
|
||||
return PlasmaSend(client, MessageType::PlasmaCreateReply, &fbb, message);
|
||||
}
|
||||
|
||||
Status SendCreateReply(const std::shared_ptr<Client> &client, ObjectID object_id,
|
||||
PlasmaObject *object, PlasmaError error_code, int64_t mmap_size) {
|
||||
const PlasmaObject &object, PlasmaError error_code) {
|
||||
flatbuffers::FlatBufferBuilder fbb;
|
||||
PlasmaObjectSpec plasma_object(FD2INT(object->store_fd), object->data_offset,
|
||||
object->data_size, object->metadata_offset,
|
||||
object->metadata_size, object->device_num);
|
||||
PlasmaObjectSpec plasma_object(FD2INT(object.store_fd), object.data_offset,
|
||||
object.data_size, object.metadata_offset,
|
||||
object.metadata_size, object.device_num);
|
||||
auto object_string = fbb.CreateString(object_id.Binary());
|
||||
#ifdef PLASMA_CUDA
|
||||
flatbuffers::Offset<fb::CudaHandle> ipc_handle;
|
||||
if (object->device_num != 0) {
|
||||
if (object.device_num != 0) {
|
||||
std::shared_ptr<arrow::Buffer> handle;
|
||||
ARROW_ASSIGN_OR_RAISE(handle, object->ipc_handle->Serialize());
|
||||
ARROW_ASSIGN_OR_RAISE(handle, object.ipc_handle->Serialize());
|
||||
ipc_handle =
|
||||
fb::CreateCudaHandle(fbb, fbb.CreateVector(handle->data(), handle->size()));
|
||||
}
|
||||
|
@ -248,9 +272,10 @@ Status SendCreateReply(const std::shared_ptr<Client> &client, ObjectID object_id
|
|||
crb.add_error(static_cast<PlasmaError>(error_code));
|
||||
crb.add_plasma_object(&plasma_object);
|
||||
crb.add_object_id(object_string);
|
||||
crb.add_store_fd(FD2INT(object->store_fd));
|
||||
crb.add_mmap_size(mmap_size);
|
||||
if (object->device_num != 0) {
|
||||
crb.add_retry_with_request_id(0);
|
||||
crb.add_store_fd(FD2INT(object.store_fd));
|
||||
crb.add_mmap_size(object.mmap_size);
|
||||
if (object.device_num != 0) {
|
||||
#ifdef PLASMA_CUDA
|
||||
crb.add_ipc_handle(ipc_handle);
|
||||
#else
|
||||
|
@ -262,11 +287,18 @@ Status SendCreateReply(const std::shared_ptr<Client> &client, ObjectID object_id
|
|||
}
|
||||
|
||||
Status ReadCreateReply(uint8_t *data, size_t size, ObjectID *object_id,
|
||||
PlasmaObject *object, MEMFD_TYPE *store_fd, int64_t *mmap_size) {
|
||||
uint64_t *retry_with_request_id, PlasmaObject *object,
|
||||
MEMFD_TYPE *store_fd, int64_t *mmap_size) {
|
||||
RAY_DCHECK(data);
|
||||
auto message = flatbuffers::GetRoot<fb::PlasmaCreateReply>(data);
|
||||
RAY_DCHECK(VerifyFlatbuffer(message, data, size));
|
||||
*object_id = ObjectID::FromBinary(message->object_id()->str());
|
||||
*retry_with_request_id = message->retry_with_request_id();
|
||||
if (*retry_with_request_id > 0) {
|
||||
// The client should retry the request.
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
object->store_fd = INT2FD(message->plasma_object()->segment_index());
|
||||
object->data_offset = message->plasma_object()->data_offset();
|
||||
object->data_size = message->plasma_object()->data_size();
|
||||
|
|
|
@ -37,6 +37,8 @@ using ray::Status;
|
|||
using flatbuf::MessageType;
|
||||
using flatbuf::PlasmaError;
|
||||
|
||||
Status PlasmaErrorStatus(flatbuf::PlasmaError plasma_error);
|
||||
|
||||
template <class T>
|
||||
bool VerifyFlatbuffer(T *object, uint8_t *data, size_t size) {
|
||||
flatbuffers::Verifier verifier(data, size);
|
||||
|
@ -82,20 +84,27 @@ Status ReadGetDebugStringReply(uint8_t *data, size_t size, std::string *debug_st
|
|||
|
||||
/* Plasma Create message functions. */
|
||||
|
||||
Status SendCreateRequest(const std::shared_ptr<StoreConn> &store_conn, ObjectID object_id,
|
||||
const ray::rpc::Address &owner_address, bool evict_if_full,
|
||||
int64_t data_size, int64_t metadata_size, int device_num);
|
||||
Status SendCreateRetryRequest(const std::shared_ptr<StoreConn> &store_conn,
|
||||
ObjectID object_id, uint64_t request_id);
|
||||
|
||||
Status ReadCreateRequest(uint8_t *data, size_t size, ObjectID *object_id,
|
||||
NodeID *owner_raylet_id, std::string *owner_ip_address,
|
||||
int *owner_port, WorkerID *owner_worker_id, bool *evict_if_full,
|
||||
int64_t *data_size, int64_t *metadata_size, int *device_num);
|
||||
Status SendCreateRequest(const std::shared_ptr<StoreConn> &store_conn, ObjectID object_id,
|
||||
const ray::rpc::Address &owner_address, int64_t data_size,
|
||||
int64_t metadata_size, int device_num, bool try_immediately);
|
||||
|
||||
void ReadCreateRequest(uint8_t *data, size_t size, ObjectID *object_id,
|
||||
NodeID *owner_raylet_id, std::string *owner_ip_address,
|
||||
int *owner_port, WorkerID *owner_worker_id, int64_t *data_size,
|
||||
int64_t *metadata_size, int *device_num);
|
||||
|
||||
Status SendUnfinishedCreateReply(const std::shared_ptr<Client> &client,
|
||||
ObjectID object_id, uint64_t retry_with_request_id);
|
||||
|
||||
Status SendCreateReply(const std::shared_ptr<Client> &client, ObjectID object_id,
|
||||
PlasmaObject *object, PlasmaError error, int64_t mmap_size);
|
||||
const PlasmaObject &object, PlasmaError error);
|
||||
|
||||
Status ReadCreateReply(uint8_t *data, size_t size, ObjectID *object_id,
|
||||
PlasmaObject *object, MEMFD_TYPE *store_fd, int64_t *mmap_size);
|
||||
uint64_t *retry_with_request_id, PlasmaObject *object,
|
||||
MEMFD_TYPE *store_fd, int64_t *mmap_size);
|
||||
|
||||
Status SendAbortRequest(const std::shared_ptr<StoreConn> &store_conn, ObjectID object_id);
|
||||
|
||||
|
|
|
@ -122,7 +122,9 @@ GetRequest::GetRequest(boost::asio::io_service &io_context,
|
|||
PlasmaStore::PlasmaStore(boost::asio::io_service &main_service, std::string directory,
|
||||
bool hugepages_enabled, const std::string &socket_name,
|
||||
std::shared_ptr<ExternalStore> external_store,
|
||||
ray::SpillObjectsCallback spill_objects_callback)
|
||||
uint32_t delay_on_oom_ms,
|
||||
ray::SpillObjectsCallback spill_objects_callback,
|
||||
std::function<void()> object_store_full_callback)
|
||||
: io_context_(main_service),
|
||||
socket_name_(socket_name),
|
||||
acceptor_(main_service, ParseUrlEndpoint(socket_name)),
|
||||
|
@ -130,7 +132,11 @@ PlasmaStore::PlasmaStore(boost::asio::io_service &main_service, std::string dire
|
|||
eviction_policy_(&store_info_, PlasmaAllocator::GetFootprintLimit()),
|
||||
external_store_(external_store),
|
||||
spill_objects_callback_(spill_objects_callback),
|
||||
create_request_queue_() {
|
||||
delay_on_oom_ms_(delay_on_oom_ms),
|
||||
create_request_queue_(
|
||||
RayConfig::instance().object_store_full_max_retries(),
|
||||
/*evict_if_full=*/RayConfig::instance().object_pinning_enabled(),
|
||||
object_store_full_callback) {
|
||||
store_info_.directory = directory;
|
||||
store_info_.hugepages_enabled = hugepages_enabled;
|
||||
#ifdef PLASMA_CUDA
|
||||
|
@ -274,54 +280,35 @@ Status PlasmaStore::FreeCudaMemory(int device_num, int64_t size, uint8_t *pointe
|
|||
}
|
||||
#endif
|
||||
|
||||
Status PlasmaStore::HandleCreateObjectRequest(const std::shared_ptr<Client> &client,
|
||||
const std::vector<uint8_t> &message) {
|
||||
PlasmaError PlasmaStore::HandleCreateObjectRequest(const std::shared_ptr<Client> &client,
|
||||
const std::vector<uint8_t> &message,
|
||||
bool evict_if_full,
|
||||
PlasmaObject *object) {
|
||||
uint8_t *input = (uint8_t *)message.data();
|
||||
size_t input_size = message.size();
|
||||
ObjectID object_id;
|
||||
PlasmaObject object = {};
|
||||
|
||||
NodeID owner_raylet_id;
|
||||
std::string owner_ip_address;
|
||||
int owner_port;
|
||||
WorkerID owner_worker_id;
|
||||
bool evict_if_full;
|
||||
int64_t data_size;
|
||||
int64_t metadata_size;
|
||||
int device_num;
|
||||
RAY_RETURN_NOT_OK(ReadCreateRequest(
|
||||
input, input_size, &object_id, &owner_raylet_id, &owner_ip_address, &owner_port,
|
||||
&owner_worker_id, &evict_if_full, &data_size, &metadata_size, &device_num));
|
||||
PlasmaError error_code = CreateObject(
|
||||
object_id, owner_raylet_id, owner_ip_address, owner_port, owner_worker_id,
|
||||
evict_if_full, data_size, metadata_size, device_num, client, &object);
|
||||
Status status;
|
||||
if (error_code == PlasmaError::TransientOutOfMemory) {
|
||||
RAY_LOG(DEBUG) << "Create object " << object_id
|
||||
<< " failed, waiting for object spill";
|
||||
status =
|
||||
Status::TransientObjectStoreFull("Object store full, queueing creation request");
|
||||
} else if (error_code == PlasmaError::OutOfMemory) {
|
||||
RAY_LOG(ERROR) << "Not enough memory to create the object " << object_id
|
||||
<< ", data_size=" << data_size << ", metadata_size=" << metadata_size
|
||||
<< ", will send a reply of PlasmaError::OutOfMemory";
|
||||
RAY_RETURN_NOT_OK(
|
||||
SendCreateReply(client, object_id, &object, error_code, /*mmap_size=*/0));
|
||||
} else {
|
||||
int64_t mmap_size = 0;
|
||||
if (error_code == PlasmaError::OK && device_num == 0) {
|
||||
mmap_size = GetMmapSize(object.store_fd);
|
||||
}
|
||||
RAY_RETURN_NOT_OK(SendCreateReply(client, object_id, &object, error_code, mmap_size));
|
||||
if (error_code == PlasmaError::OK && device_num == 0) {
|
||||
RAY_RETURN_NOT_OK(client->SendFd(object.store_fd));
|
||||
}
|
||||
ReadCreateRequest(input, input_size, &object_id, &owner_raylet_id, &owner_ip_address,
|
||||
&owner_port, &owner_worker_id, &data_size, &metadata_size,
|
||||
&device_num);
|
||||
auto error = CreateObject(object_id, owner_raylet_id, owner_ip_address, owner_port,
|
||||
owner_worker_id, evict_if_full, data_size, metadata_size,
|
||||
device_num, client, object);
|
||||
if (error == PlasmaError::OutOfMemory) {
|
||||
RAY_LOG(WARNING) << "Not enough memory to create the object " << object_id
|
||||
<< ", data_size=" << data_size
|
||||
<< ", metadata_size=" << metadata_size;
|
||||
}
|
||||
|
||||
return status;
|
||||
return error;
|
||||
}
|
||||
|
||||
// Create a new object buffer in the hash table.
|
||||
PlasmaError PlasmaStore::CreateObject(
|
||||
const ObjectID &object_id, const NodeID &owner_raylet_id,
|
||||
const std::string &owner_ip_address, int owner_port, const WorkerID &owner_worker_id,
|
||||
|
@ -393,6 +380,10 @@ PlasmaError PlasmaStore::CreateObject(
|
|||
result->data_size = data_size;
|
||||
result->metadata_size = metadata_size;
|
||||
result->device_num = device_num;
|
||||
if (device_num == 0) {
|
||||
result->mmap_size = GetMmapSize(fd);
|
||||
}
|
||||
|
||||
// Notify the eviction policy that this object was created. This must be done
|
||||
// immediately before the call to AddToClientObjectIds so that the
|
||||
// eviction policy does not have an opportunity to evict the object.
|
||||
|
@ -980,12 +971,38 @@ Status PlasmaStore::ProcessMessage(const std::shared_ptr<Client> &client,
|
|||
// Process the different types of requests.
|
||||
switch (type) {
|
||||
case fb::MessageType::PlasmaCreateRequest: {
|
||||
RAY_LOG(DEBUG) << "Received create request for object "
|
||||
<< GetCreateRequestObjectId(message);
|
||||
create_request_queue_.AddRequest(client, [this, client, message]() {
|
||||
return HandleCreateObjectRequest(client, message);
|
||||
});
|
||||
ProcessCreateRequests();
|
||||
const auto &object_id = GetCreateRequestObjectId(message);
|
||||
const auto &request = flatbuffers::GetRoot<fb::PlasmaCreateRequest>(input);
|
||||
|
||||
auto handle_create = [this, client, message](bool evict_if_full,
|
||||
PlasmaObject *result) {
|
||||
return HandleCreateObjectRequest(client, message, evict_if_full, result);
|
||||
};
|
||||
|
||||
if (request->try_immediately()) {
|
||||
RAY_LOG(DEBUG) << "Received request to create object " << object_id
|
||||
<< " immediately";
|
||||
auto result_error =
|
||||
create_request_queue_.TryRequestImmediately(object_id, client, handle_create);
|
||||
const auto &result = result_error.first;
|
||||
const auto &error = result_error.second;
|
||||
if (SendCreateReply(client, object_id, result, error).ok() &&
|
||||
error == PlasmaError::OK && result.device_num == 0) {
|
||||
static_cast<void>(client->SendFd(result.store_fd));
|
||||
}
|
||||
} else {
|
||||
auto req_id = create_request_queue_.AddRequest(object_id, client, handle_create);
|
||||
RAY_LOG(DEBUG) << "Received create request for object " << object_id
|
||||
<< " assigned request ID " << req_id;
|
||||
ProcessCreateRequests();
|
||||
ReplyToCreateClient(client, object_id, req_id);
|
||||
}
|
||||
} break;
|
||||
case fb::MessageType::PlasmaCreateRetryRequest: {
|
||||
auto request = flatbuffers::GetRoot<fb::PlasmaCreateRetryRequest>(input);
|
||||
RAY_DCHECK(plasma::VerifyFlatbuffer(request, input, input_size));
|
||||
const auto &object_id = ObjectID::FromBinary(request->object_id()->str());
|
||||
ReplyToCreateClient(client, object_id, request->request_id());
|
||||
} break;
|
||||
case fb::MessageType::PlasmaAbortRequest: {
|
||||
RAY_RETURN_NOT_OK(ReadAbortRequest(input, input_size, &object_id));
|
||||
|
@ -1089,7 +1106,14 @@ void PlasmaStore::ProcessCreateRequests() {
|
|||
}
|
||||
|
||||
auto status = create_request_queue_.ProcessRequests();
|
||||
uint32_t retry_after_ms = 0;
|
||||
if (status.IsTransientObjectStoreFull()) {
|
||||
retry_after_ms = delay_on_transient_oom_ms_;
|
||||
} else if (status.IsObjectStoreFull()) {
|
||||
retry_after_ms = delay_on_oom_ms_;
|
||||
}
|
||||
|
||||
if (retry_after_ms > 0) {
|
||||
// Try to process requests later, after space has been made.
|
||||
create_timer_ = execute_after(io_context_,
|
||||
[this]() {
|
||||
|
@ -1098,7 +1122,23 @@ void PlasmaStore::ProcessCreateRequests() {
|
|||
create_timer_ = nullptr;
|
||||
ProcessCreateRequests();
|
||||
},
|
||||
delay_on_transient_oom_ms_);
|
||||
retry_after_ms);
|
||||
}
|
||||
}
|
||||
|
||||
void PlasmaStore::ReplyToCreateClient(const std::shared_ptr<Client> &client,
|
||||
const ObjectID &object_id, uint64_t req_id) {
|
||||
PlasmaObject result = {};
|
||||
PlasmaError error;
|
||||
bool finished = create_request_queue_.GetRequestResult(req_id, &result, &error);
|
||||
if (finished) {
|
||||
RAY_LOG(DEBUG) << "Finishing create object " << object_id << " request ID " << req_id;
|
||||
if (SendCreateReply(client, object_id, result, error).ok() &&
|
||||
error == PlasmaError::OK && result.device_num == 0) {
|
||||
static_cast<void>(client->SendFd(result.store_fd));
|
||||
}
|
||||
} else {
|
||||
static_cast<void>(SendUnfinishedCreateReply(client, object_id, req_id));
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -55,8 +55,9 @@ class PlasmaStore {
|
|||
// TODO: PascalCase PlasmaStore methods.
|
||||
PlasmaStore(boost::asio::io_service &main_service, std::string directory,
|
||||
bool hugepages_enabled, const std::string &socket_name,
|
||||
std::shared_ptr<ExternalStore> external_store,
|
||||
ray::SpillObjectsCallback spill_objects_callback);
|
||||
std::shared_ptr<ExternalStore> external_store, uint32_t delay_on_oom_ms,
|
||||
ray::SpillObjectsCallback spill_objects_callback,
|
||||
std::function<void()> object_store_full_callback);
|
||||
|
||||
~PlasmaStore();
|
||||
|
||||
|
@ -206,8 +207,12 @@ class PlasmaStore {
|
|||
void ProcessCreateRequests();
|
||||
|
||||
private:
|
||||
Status HandleCreateObjectRequest(const std::shared_ptr<Client> &client,
|
||||
const std::vector<uint8_t> &message);
|
||||
PlasmaError HandleCreateObjectRequest(const std::shared_ptr<Client> &client,
|
||||
const std::vector<uint8_t> &message,
|
||||
bool evict_if_full, PlasmaObject *object);
|
||||
|
||||
void ReplyToCreateClient(const std::shared_ptr<Client> &client,
|
||||
const ObjectID &object_id, uint64_t req_id);
|
||||
|
||||
void PushNotification(ObjectInfoT *object_notification);
|
||||
|
||||
|
@ -283,6 +288,10 @@ class PlasmaStore {
|
|||
/// complete.
|
||||
ray::SpillObjectsCallback spill_objects_callback_;
|
||||
|
||||
/// The amount of time to wait before retrying a creation request after an
|
||||
/// OOM error.
|
||||
const uint32_t delay_on_oom_ms_;
|
||||
|
||||
/// The amount of time to wait before retrying a creation request after a
|
||||
/// transient OOM error.
|
||||
const uint32_t delay_on_transient_oom_ms_ = 10;
|
||||
|
|
|
@ -75,7 +75,8 @@ PlasmaStoreRunner::PlasmaStoreRunner(std::string socket_name, int64_t system_mem
|
|||
plasma_directory_ = plasma_directory;
|
||||
}
|
||||
|
||||
void PlasmaStoreRunner::Start(ray::SpillObjectsCallback spill_objects_callback) {
|
||||
void PlasmaStoreRunner::Start(ray::SpillObjectsCallback spill_objects_callback,
|
||||
std::function<void()> object_store_full_callback) {
|
||||
// Get external store
|
||||
std::shared_ptr<plasma::ExternalStore> external_store{nullptr};
|
||||
if (!external_store_endpoint_.empty()) {
|
||||
|
@ -93,8 +94,10 @@ void PlasmaStoreRunner::Start(ray::SpillObjectsCallback spill_objects_callback)
|
|||
|
||||
{
|
||||
absl::MutexLock lock(&store_runner_mutex_);
|
||||
store_.reset(new PlasmaStore(main_service_, plasma_directory_, hugepages_enabled_,
|
||||
socket_name_, external_store, spill_objects_callback));
|
||||
store_.reset(new PlasmaStore(
|
||||
main_service_, plasma_directory_, hugepages_enabled_, socket_name_,
|
||||
external_store, RayConfig::instance().object_store_full_initial_delay_ms(),
|
||||
spill_objects_callback, object_store_full_callback));
|
||||
plasma_config = store_->GetPlasmaStoreInfo();
|
||||
|
||||
// We are using a single memory-mapped file by mallocing and freeing a single
|
||||
|
|
|
@ -15,18 +15,14 @@ class PlasmaStoreRunner {
|
|||
PlasmaStoreRunner(std::string socket_name, int64_t system_memory,
|
||||
bool hugepages_enabled, std::string plasma_directory,
|
||||
const std::string external_store_endpoint);
|
||||
void Start(ray::SpillObjectsCallback spill_objects_callback = nullptr);
|
||||
void Start(ray::SpillObjectsCallback spill_objects_callback = nullptr,
|
||||
std::function<void()> object_store_full_callback = nullptr);
|
||||
void Stop();
|
||||
void SetNotificationListener(
|
||||
const std::shared_ptr<ray::ObjectStoreNotificationManager> ¬ification_listener) {
|
||||
store_->SetNotificationListener(notification_listener);
|
||||
}
|
||||
|
||||
ray::SpaceReleasedCallback OnSpaceReleased() {
|
||||
return
|
||||
[this]() { main_service_.post([this]() { store_->ProcessCreateRequests(); }); };
|
||||
}
|
||||
|
||||
private:
|
||||
void Shutdown();
|
||||
absl::Mutex store_runner_mutex_;
|
||||
|
|
|
@ -24,51 +24,313 @@ class MockClient : public ClientInterface {
|
|||
MockClient() {}
|
||||
};
|
||||
|
||||
TEST(CreateRequestQueueTest, TestSimple) {
|
||||
CreateRequestQueue queue;
|
||||
#define ASSERT_REQUEST_UNFINISHED(queue, req_id) \
|
||||
{ \
|
||||
PlasmaObject result = {}; \
|
||||
PlasmaError status; \
|
||||
ASSERT_FALSE(queue.GetRequestResult(req_id, &result, &status)); \
|
||||
}
|
||||
|
||||
bool created = false;
|
||||
auto request = [&]() {
|
||||
created = true;
|
||||
return Status();
|
||||
#define ASSERT_REQUEST_FINISHED(queue, req_id, expected_status) \
|
||||
{ \
|
||||
PlasmaObject result = {}; \
|
||||
PlasmaError status; \
|
||||
\
|
||||
ASSERT_TRUE(queue.GetRequestResult(req_id, &result, &status)); \
|
||||
if (expected_status == PlasmaError::OK) { \
|
||||
ASSERT_EQ(result.data_size, 1234); \
|
||||
} \
|
||||
ASSERT_EQ(status, expected_status); \
|
||||
}
|
||||
|
||||
class CreateRequestQueueTest : public ::testing::Test {
|
||||
public:
|
||||
CreateRequestQueueTest()
|
||||
: queue_(
|
||||
/*max_retries=*/2,
|
||||
/*evict_if_full=*/true,
|
||||
/*on_global_gc=*/[&]() { num_global_gc_++; }) {}
|
||||
|
||||
void AssertNoLeaks() {
|
||||
ASSERT_TRUE(queue_.queue_.empty());
|
||||
ASSERT_TRUE(queue_.fulfilled_requests_.empty());
|
||||
}
|
||||
|
||||
CreateRequestQueue queue_;
|
||||
int num_global_gc_ = 0;
|
||||
};
|
||||
|
||||
TEST_F(CreateRequestQueueTest, TestSimple) {
|
||||
auto request = [&](bool evict_if_full, PlasmaObject *result) {
|
||||
result->data_size = 1234;
|
||||
return PlasmaError::OK;
|
||||
};
|
||||
auto client = std::make_shared<MockClient>();
|
||||
queue.AddRequest(client, request);
|
||||
ASSERT_FALSE(created);
|
||||
ASSERT_TRUE(queue.ProcessRequests().ok());
|
||||
ASSERT_TRUE(created);
|
||||
auto req_id = queue_.AddRequest(ObjectID::Nil(), client, request);
|
||||
ASSERT_REQUEST_UNFINISHED(queue_, req_id);
|
||||
|
||||
ASSERT_TRUE(queue_.ProcessRequests().ok());
|
||||
ASSERT_REQUEST_FINISHED(queue_, req_id, PlasmaError::OK);
|
||||
ASSERT_EQ(num_global_gc_, 0);
|
||||
// Request gets cleaned up after we get it.
|
||||
ASSERT_REQUEST_FINISHED(queue_, req_id, PlasmaError::UnexpectedError);
|
||||
|
||||
auto req_id1 = queue_.AddRequest(ObjectID::Nil(), client, request);
|
||||
auto req_id2 = queue_.AddRequest(ObjectID::Nil(), client, request);
|
||||
auto req_id3 = queue_.AddRequest(ObjectID::Nil(), client, request);
|
||||
ASSERT_REQUEST_UNFINISHED(queue_, req_id1);
|
||||
ASSERT_REQUEST_UNFINISHED(queue_, req_id2);
|
||||
ASSERT_REQUEST_UNFINISHED(queue_, req_id3);
|
||||
|
||||
ASSERT_TRUE(queue_.ProcessRequests().ok());
|
||||
ASSERT_REQUEST_FINISHED(queue_, req_id1, PlasmaError::OK);
|
||||
ASSERT_REQUEST_FINISHED(queue_, req_id2, PlasmaError::OK);
|
||||
ASSERT_REQUEST_FINISHED(queue_, req_id3, PlasmaError::OK);
|
||||
ASSERT_EQ(num_global_gc_, 0);
|
||||
// Request gets cleaned up after we get it.
|
||||
ASSERT_REQUEST_FINISHED(queue_, req_id1, PlasmaError::UnexpectedError);
|
||||
ASSERT_REQUEST_FINISHED(queue_, req_id2, PlasmaError::UnexpectedError);
|
||||
ASSERT_REQUEST_FINISHED(queue_, req_id3, PlasmaError::UnexpectedError);
|
||||
AssertNoLeaks();
|
||||
}
|
||||
|
||||
TEST(CreateRequestQueueTest, TestTransientOom) {
|
||||
CreateRequestQueue queue;
|
||||
TEST_F(CreateRequestQueueTest, TestOom) {
|
||||
auto oom_request = [&](bool evict_if_full, PlasmaObject *result) {
|
||||
return PlasmaError::OutOfMemory;
|
||||
};
|
||||
auto blocked_request = [&](bool evict_if_full, PlasmaObject *result) {
|
||||
result->data_size = 1234;
|
||||
return PlasmaError::OK;
|
||||
};
|
||||
|
||||
int num_created = 0;
|
||||
Status return_status = Status::TransientObjectStoreFull("");
|
||||
auto oom_request = [&]() {
|
||||
if (return_status.ok()) {
|
||||
num_created++;
|
||||
auto client = std::make_shared<MockClient>();
|
||||
auto req_id1 = queue_.AddRequest(ObjectID::Nil(), client, oom_request);
|
||||
auto req_id2 = queue_.AddRequest(ObjectID::Nil(), client, blocked_request);
|
||||
|
||||
// Neither request was fulfilled.
|
||||
ASSERT_TRUE(queue_.ProcessRequests().IsObjectStoreFull());
|
||||
ASSERT_TRUE(queue_.ProcessRequests().IsObjectStoreFull());
|
||||
ASSERT_REQUEST_UNFINISHED(queue_, req_id1);
|
||||
ASSERT_REQUEST_UNFINISHED(queue_, req_id2);
|
||||
ASSERT_EQ(num_global_gc_, 2);
|
||||
|
||||
// Retries used up. The first request should reply with OOM and the second
|
||||
// request should also be served.
|
||||
ASSERT_TRUE(queue_.ProcessRequests().ok());
|
||||
ASSERT_EQ(num_global_gc_, 2);
|
||||
|
||||
// Both requests fulfilled.
|
||||
ASSERT_REQUEST_FINISHED(queue_, req_id1, PlasmaError::OutOfMemory);
|
||||
ASSERT_REQUEST_FINISHED(queue_, req_id2, PlasmaError::OK);
|
||||
|
||||
AssertNoLeaks();
|
||||
}
|
||||
|
||||
TEST(CreateRequestQueueParameterTest, TestOomInfiniteRetry) {
|
||||
int num_global_gc_ = 0;
|
||||
CreateRequestQueue queue(
|
||||
/*max_retries=*/-1,
|
||||
/*evict_if_full=*/true,
|
||||
/*on_global_gc=*/[&]() { num_global_gc_++; });
|
||||
|
||||
auto oom_request = [&](bool evict_if_full, PlasmaObject *result) {
|
||||
return PlasmaError::OutOfMemory;
|
||||
};
|
||||
auto blocked_request = [&](bool evict_if_full, PlasmaObject *result) {
|
||||
result->data_size = 1234;
|
||||
return PlasmaError::OK;
|
||||
};
|
||||
|
||||
auto client = std::make_shared<MockClient>();
|
||||
auto req_id1 = queue.AddRequest(ObjectID::Nil(), client, oom_request);
|
||||
auto req_id2 = queue.AddRequest(ObjectID::Nil(), client, blocked_request);
|
||||
|
||||
for (int i = 0; i < 3; i++) {
|
||||
ASSERT_TRUE(queue.ProcessRequests().IsObjectStoreFull());
|
||||
ASSERT_EQ(num_global_gc_, i + 1);
|
||||
}
|
||||
|
||||
// Neither request was fulfilled.
|
||||
ASSERT_REQUEST_UNFINISHED(queue, req_id1);
|
||||
ASSERT_REQUEST_UNFINISHED(queue, req_id2);
|
||||
}
|
||||
|
||||
TEST_F(CreateRequestQueueTest, TestTransientOom) {
|
||||
auto return_status = PlasmaError::TransientOutOfMemory;
|
||||
auto oom_request = [&](bool evict_if_full, PlasmaObject *result) {
|
||||
if (return_status == PlasmaError::OK) {
|
||||
result->data_size = 1234;
|
||||
}
|
||||
return return_status;
|
||||
};
|
||||
auto blocked_request = [&]() {
|
||||
num_created++;
|
||||
return Status();
|
||||
auto blocked_request = [&](bool evict_if_full, PlasmaObject *result) {
|
||||
result->data_size = 1234;
|
||||
return PlasmaError::OK;
|
||||
};
|
||||
|
||||
auto client = std::make_shared<MockClient>();
|
||||
queue.AddRequest(client, oom_request);
|
||||
queue.AddRequest(client, blocked_request);
|
||||
auto req_id1 = queue_.AddRequest(ObjectID::Nil(), client, oom_request);
|
||||
auto req_id2 = queue_.AddRequest(ObjectID::Nil(), client, blocked_request);
|
||||
|
||||
// Transient OOM should not use up any retries.
|
||||
for (int i = 0; i < 3; i++) {
|
||||
ASSERT_TRUE(queue.ProcessRequests().IsTransientObjectStoreFull());
|
||||
ASSERT_EQ(num_created, 0);
|
||||
ASSERT_TRUE(queue_.ProcessRequests().IsTransientObjectStoreFull());
|
||||
ASSERT_REQUEST_UNFINISHED(queue_, req_id1);
|
||||
ASSERT_REQUEST_UNFINISHED(queue_, req_id2);
|
||||
ASSERT_EQ(num_global_gc_, 0);
|
||||
}
|
||||
|
||||
// Return OK for the first request. The second request should also be served.
|
||||
return_status = Status();
|
||||
ASSERT_TRUE(queue.ProcessRequests().ok());
|
||||
ASSERT_EQ(num_created, 2);
|
||||
return_status = PlasmaError::OK;
|
||||
ASSERT_TRUE(queue_.ProcessRequests().ok());
|
||||
ASSERT_REQUEST_FINISHED(queue_, req_id1, PlasmaError::OK);
|
||||
ASSERT_REQUEST_FINISHED(queue_, req_id2, PlasmaError::OK);
|
||||
|
||||
AssertNoLeaks();
|
||||
}
|
||||
|
||||
TEST_F(CreateRequestQueueTest, TestTransientOomThenOom) {
|
||||
auto return_status = PlasmaError::TransientOutOfMemory;
|
||||
auto oom_request = [&](bool evict_if_full, PlasmaObject *result) {
|
||||
if (return_status == PlasmaError::OK) {
|
||||
result->data_size = 1234;
|
||||
}
|
||||
return return_status;
|
||||
};
|
||||
auto blocked_request = [&](bool evict_if_full, PlasmaObject *result) {
|
||||
result->data_size = 1234;
|
||||
return PlasmaError::OK;
|
||||
};
|
||||
|
||||
auto client = std::make_shared<MockClient>();
|
||||
auto req_id1 = queue_.AddRequest(ObjectID::Nil(), client, oom_request);
|
||||
auto req_id2 = queue_.AddRequest(ObjectID::Nil(), client, blocked_request);
|
||||
|
||||
// Transient OOM should not use up any retries.
|
||||
for (int i = 0; i < 3; i++) {
|
||||
ASSERT_TRUE(queue_.ProcessRequests().IsTransientObjectStoreFull());
|
||||
ASSERT_REQUEST_UNFINISHED(queue_, req_id1);
|
||||
ASSERT_REQUEST_UNFINISHED(queue_, req_id2);
|
||||
ASSERT_EQ(num_global_gc_, 0);
|
||||
}
|
||||
|
||||
// Now we are actually OOM.
|
||||
return_status = PlasmaError::OutOfMemory;
|
||||
ASSERT_TRUE(queue_.ProcessRequests().IsObjectStoreFull());
|
||||
ASSERT_TRUE(queue_.ProcessRequests().IsObjectStoreFull());
|
||||
ASSERT_REQUEST_UNFINISHED(queue_, req_id1);
|
||||
ASSERT_REQUEST_UNFINISHED(queue_, req_id2);
|
||||
ASSERT_EQ(num_global_gc_, 2);
|
||||
|
||||
// Retries used up. The first request should reply with OOM and the second
|
||||
// request should also be served.
|
||||
ASSERT_TRUE(queue_.ProcessRequests().ok());
|
||||
ASSERT_REQUEST_FINISHED(queue_, req_id1, PlasmaError::OutOfMemory);
|
||||
ASSERT_REQUEST_FINISHED(queue_, req_id2, PlasmaError::OK);
|
||||
ASSERT_EQ(num_global_gc_, 2);
|
||||
|
||||
AssertNoLeaks();
|
||||
}
|
||||
|
||||
TEST_F(CreateRequestQueueTest, TestEvictIfFull) {
|
||||
auto oom_request = [&](bool evict_if_full, PlasmaObject *result) {
|
||||
RAY_CHECK(evict_if_full);
|
||||
return PlasmaError::OutOfMemory;
|
||||
};
|
||||
|
||||
auto client = std::make_shared<MockClient>();
|
||||
static_cast<void>(queue_.AddRequest(ObjectID::Nil(), client, oom_request));
|
||||
ASSERT_TRUE(queue_.ProcessRequests().IsObjectStoreFull());
|
||||
ASSERT_TRUE(queue_.ProcessRequests().IsObjectStoreFull());
|
||||
}
|
||||
|
||||
TEST(CreateRequestQueueParameterTest, TestNoEvictIfFull) {
|
||||
CreateRequestQueue queue(
|
||||
/*max_retries=*/2,
|
||||
/*evict_if_full=*/false,
|
||||
/*on_global_gc=*/[&]() {});
|
||||
|
||||
bool first_try = true;
|
||||
auto oom_request = [&](bool evict_if_full, PlasmaObject *result) {
|
||||
if (first_try) {
|
||||
RAY_CHECK(!evict_if_full);
|
||||
first_try = false;
|
||||
} else {
|
||||
RAY_CHECK(evict_if_full);
|
||||
}
|
||||
return PlasmaError::OutOfMemory;
|
||||
};
|
||||
|
||||
auto client = std::make_shared<MockClient>();
|
||||
static_cast<void>(queue.AddRequest(ObjectID::Nil(), client, oom_request));
|
||||
ASSERT_TRUE(queue.ProcessRequests().IsObjectStoreFull());
|
||||
ASSERT_TRUE(queue.ProcessRequests().IsObjectStoreFull());
|
||||
}
|
||||
|
||||
TEST_F(CreateRequestQueueTest, TestClientDisconnected) {
|
||||
auto request = [&](bool evict_if_full, PlasmaObject *result) {
|
||||
result->data_size = 1234;
|
||||
return PlasmaError::OK;
|
||||
};
|
||||
|
||||
// Client makes two requests. One is processed, the other is still in the
|
||||
// queue.
|
||||
auto client = std::make_shared<MockClient>();
|
||||
auto req_id1 = queue_.AddRequest(ObjectID::Nil(), client, request);
|
||||
ASSERT_TRUE(queue_.ProcessRequests().ok());
|
||||
auto req_id2 = queue_.AddRequest(ObjectID::Nil(), client, request);
|
||||
|
||||
// Another client makes a concurrent request.
|
||||
auto client2 = std::make_shared<MockClient>();
|
||||
auto req_id3 = queue_.AddRequest(ObjectID::Nil(), client2, request);
|
||||
|
||||
// Client disconnects.
|
||||
queue_.RemoveDisconnectedClientRequests(client);
|
||||
|
||||
// Both requests should be cleaned up.
|
||||
ASSERT_REQUEST_FINISHED(queue_, req_id1, PlasmaError::UnexpectedError);
|
||||
ASSERT_REQUEST_FINISHED(queue_, req_id2, PlasmaError::UnexpectedError);
|
||||
// Other client's request was fulfilled.
|
||||
ASSERT_TRUE(queue_.ProcessRequests().ok());
|
||||
ASSERT_REQUEST_FINISHED(queue_, req_id3, PlasmaError::OK);
|
||||
AssertNoLeaks();
|
||||
}
|
||||
|
||||
TEST_F(CreateRequestQueueTest, TestTryRequestImmediately) {
|
||||
auto request = [&](bool evict_if_full, PlasmaObject *result) {
|
||||
result->data_size = 1234;
|
||||
return PlasmaError::OK;
|
||||
};
|
||||
auto client = std::make_shared<MockClient>();
|
||||
|
||||
// Queue is empty, request can be fulfilled.
|
||||
auto result = queue_.TryRequestImmediately(ObjectID::Nil(), client, request);
|
||||
ASSERT_EQ(result.first.data_size, 1234);
|
||||
ASSERT_EQ(result.second, PlasmaError::OK);
|
||||
|
||||
// Request would block.
|
||||
auto req_id = queue_.AddRequest(ObjectID::Nil(), client, request);
|
||||
result = queue_.TryRequestImmediately(ObjectID::Nil(), client, request);
|
||||
ASSERT_EQ(result.first.data_size, 0);
|
||||
ASSERT_EQ(result.second, PlasmaError::OutOfMemory);
|
||||
ASSERT_TRUE(queue_.ProcessRequests().ok());
|
||||
|
||||
// Queue is empty again, request can be fulfilled.
|
||||
result = queue_.TryRequestImmediately(ObjectID::Nil(), client, request);
|
||||
ASSERT_EQ(result.first.data_size, 1234);
|
||||
ASSERT_EQ(result.second, PlasmaError::OK);
|
||||
|
||||
// Queue is empty, but request would block. Check that we do not attempt to
|
||||
// retry the request.
|
||||
auto oom_request = [&](bool evict_if_full, PlasmaObject *result) {
|
||||
return PlasmaError::OutOfMemory;
|
||||
};
|
||||
result = queue_.TryRequestImmediately(ObjectID::Nil(), client, oom_request);
|
||||
ASSERT_EQ(result.first.data_size, 0);
|
||||
ASSERT_EQ(result.second, PlasmaError::OutOfMemory);
|
||||
|
||||
ASSERT_REQUEST_FINISHED(queue_, req_id, PlasmaError::OK);
|
||||
AssertNoLeaks();
|
||||
}
|
||||
|
||||
} // namespace plasma
|
||||
|
|
|
@ -146,9 +146,11 @@ class TestObjectManagerBase : public ::testing::Test {
|
|||
RAY_LOG(DEBUG) << "ObjectID Created: " << object_id;
|
||||
uint8_t metadata[] = {5};
|
||||
int64_t metadata_size = sizeof(metadata);
|
||||
uint64_t retry_with_request_id = 0;
|
||||
std::shared_ptr<arrow::Buffer> data;
|
||||
RAY_CHECK_OK(client.Create(object_id, ray::rpc::Address(), data_size, metadata,
|
||||
metadata_size, &data));
|
||||
metadata_size, &retry_with_request_id, &data));
|
||||
RAY_CHECK(retry_with_request_id == 0);
|
||||
RAY_CHECK_OK(client.Seal(object_id));
|
||||
return object_id;
|
||||
}
|
||||
|
|
|
@ -145,9 +145,11 @@ class TestObjectManagerBase : public ::testing::Test {
|
|||
RAY_LOG(DEBUG) << "ObjectID Created: " << object_id;
|
||||
uint8_t metadata[] = {5};
|
||||
int64_t metadata_size = sizeof(metadata);
|
||||
uint64_t retry_with_request_id = 0;
|
||||
std::shared_ptr<arrow::Buffer> data;
|
||||
RAY_CHECK_OK(client.Create(object_id, ray::rpc::Address(), data_size, metadata,
|
||||
metadata_size, &data));
|
||||
metadata_size, &retry_with_request_id, &data));
|
||||
RAY_CHECK(retry_with_request_id == 0);
|
||||
RAY_CHECK_OK(client.Seal(object_id));
|
||||
return object_id;
|
||||
}
|
||||
|
|
|
@ -210,7 +210,6 @@ void LocalObjectManager::SpillObjectsInternal(
|
|||
<< status.ToString();
|
||||
if (callback) {
|
||||
callback(status);
|
||||
on_objects_spilled_();
|
||||
}
|
||||
} else {
|
||||
AddSpilledUrls(objects_to_spill, r, callback);
|
||||
|
@ -262,7 +261,6 @@ void LocalObjectManager::AddSpilledUrls(
|
|||
(*num_remaining)--;
|
||||
if (*num_remaining == 0 && callback) {
|
||||
callback(status);
|
||||
on_objects_spilled_();
|
||||
}
|
||||
}));
|
||||
}
|
||||
|
|
|
@ -39,8 +39,7 @@ class LocalObjectManager {
|
|||
gcs::ObjectInfoAccessor &object_info_accessor,
|
||||
rpc::CoreWorkerClientPool &owner_client_pool,
|
||||
bool object_pinning_enabled, bool automatic_object_deletion_enabled,
|
||||
std::function<void(const std::vector<ObjectID> &)> on_objects_freed,
|
||||
SpaceReleasedCallback on_objects_spilled)
|
||||
std::function<void(const std::vector<ObjectID> &)> on_objects_freed)
|
||||
: free_objects_period_ms_(free_objects_period_ms),
|
||||
free_objects_batch_size_(free_objects_batch_size),
|
||||
io_worker_pool_(io_worker_pool),
|
||||
|
@ -49,7 +48,6 @@ class LocalObjectManager {
|
|||
object_pinning_enabled_(object_pinning_enabled),
|
||||
automatic_object_deletion_enabled_(automatic_object_deletion_enabled),
|
||||
on_objects_freed_(on_objects_freed),
|
||||
on_objects_spilled_(on_objects_spilled),
|
||||
last_free_objects_at_ms_(current_time_ms()) {}
|
||||
|
||||
/// Pin objects.
|
||||
|
@ -175,10 +173,6 @@ class LocalObjectManager {
|
|||
absl::flat_hash_map<ObjectID, std::unique_ptr<RayObject>> objects_pending_spill_
|
||||
GUARDED_BY(mutex_);
|
||||
|
||||
/// Callback to call whenever objects have been spilled or failed to be
|
||||
/// spilled.
|
||||
SpaceReleasedCallback on_objects_spilled_;
|
||||
|
||||
/// The time that we last sent a FreeObjects request to other nodes for
|
||||
/// objects that have gone out of scope in the application.
|
||||
uint64_t last_free_objects_at_ms_ = 0;
|
||||
|
|
|
@ -116,8 +116,7 @@ std::string WorkerOwnerString(std::shared_ptr<WorkerInterface> &worker) {
|
|||
NodeManager::NodeManager(boost::asio::io_service &io_service, const NodeID &self_node_id,
|
||||
const NodeManagerConfig &config, ObjectManager &object_manager,
|
||||
std::shared_ptr<gcs::GcsClient> gcs_client,
|
||||
std::shared_ptr<ObjectDirectoryInterface> object_directory,
|
||||
SpaceReleasedCallback on_objects_spilled)
|
||||
std::shared_ptr<ObjectDirectoryInterface> object_directory)
|
||||
: self_node_id_(self_node_id),
|
||||
io_service_(io_service),
|
||||
object_manager_(object_manager),
|
||||
|
@ -166,8 +165,7 @@ NodeManager::NodeManager(boost::asio::io_service &io_service, const NodeID &self
|
|||
[this](const std::vector<ObjectID> &object_ids) {
|
||||
object_manager_.FreeObjects(object_ids,
|
||||
/*local_only=*/false);
|
||||
},
|
||||
on_objects_spilled),
|
||||
}),
|
||||
new_scheduler_enabled_(RayConfig::instance().new_scheduler_enabled()),
|
||||
report_worker_backlog_(RayConfig::instance().report_worker_backlog()),
|
||||
record_metrics_period_(config.record_metrics_period_ms) {
|
||||
|
@ -2140,9 +2138,9 @@ void NodeManager::MarkObjectsAsFailed(
|
|||
ObjectID object_id = ObjectID::FromBinary(ref.object_id());
|
||||
std::shared_ptr<arrow::Buffer> data;
|
||||
Status status;
|
||||
status = store_client_.Create(object_id, ref.owner_address(), 0,
|
||||
reinterpret_cast<const uint8_t *>(meta.c_str()),
|
||||
meta.length(), &data);
|
||||
status = store_client_.TryCreateImmediately(
|
||||
object_id, ref.owner_address(), 0,
|
||||
reinterpret_cast<const uint8_t *>(meta.c_str()), meta.length(), &data);
|
||||
if (status.ok()) {
|
||||
status = store_client_.Seal(object_id);
|
||||
}
|
||||
|
|
|
@ -135,8 +135,7 @@ class NodeManager : public rpc::NodeManagerServiceHandler {
|
|||
NodeManager(boost::asio::io_service &io_service, const NodeID &self_node_id,
|
||||
const NodeManagerConfig &config, ObjectManager &object_manager,
|
||||
std::shared_ptr<gcs::GcsClient> gcs_client,
|
||||
std::shared_ptr<ObjectDirectoryInterface> object_directory_,
|
||||
SpaceReleasedCallback on_objects_spilled);
|
||||
std::shared_ptr<ObjectDirectoryInterface> object_directory_);
|
||||
|
||||
/// Process a new client connection.
|
||||
///
|
||||
|
@ -176,6 +175,10 @@ class NodeManager : public rpc::NodeManagerServiceHandler {
|
|||
|
||||
LocalObjectManager &GetLocalObjectManager() { return local_object_manager_; }
|
||||
|
||||
/// Trigger global GC across the cluster to free up references to actors or
|
||||
/// object ids.
|
||||
void TriggerGlobalGC();
|
||||
|
||||
private:
|
||||
/// Methods for handling nodes.
|
||||
|
||||
|
@ -637,10 +640,6 @@ class NodeManager : public rpc::NodeManagerServiceHandler {
|
|||
rpc::ReleaseUnusedBundlesReply *reply,
|
||||
rpc::SendReplyCallback send_reply_callback) override;
|
||||
|
||||
/// Trigger global GC across the cluster to free up references to actors or
|
||||
/// object ids.
|
||||
void TriggerGlobalGC();
|
||||
|
||||
/// Trigger local GC on each worker of this raylet.
|
||||
void DoLocalGC();
|
||||
|
||||
|
|
|
@ -60,7 +60,8 @@ Raylet::Raylet(boost::asio::io_service &main_service, const std::string &socket_
|
|||
const NodeManagerConfig &node_manager_config,
|
||||
const ObjectManagerConfig &object_manager_config,
|
||||
std::shared_ptr<gcs::GcsClient> gcs_client, int metrics_export_port)
|
||||
: self_node_id_(NodeID::FromRandom()),
|
||||
: main_service_(main_service),
|
||||
self_node_id_(NodeID::FromRandom()),
|
||||
gcs_client_(gcs_client),
|
||||
object_directory_(
|
||||
RayConfig::instance().ownership_based_object_directory_enabled()
|
||||
|
@ -79,10 +80,14 @@ Raylet::Raylet(boost::asio::io_service &main_service, const std::string &socket_
|
|||
[this](int64_t num_bytes_to_spill, int64_t min_bytes_to_spill) {
|
||||
return node_manager_.GetLocalObjectManager().SpillObjectsOfSize(
|
||||
num_bytes_to_spill, min_bytes_to_spill);
|
||||
},
|
||||
[this]() {
|
||||
// Post on the node manager's event loop since this
|
||||
// will be called from the plasma store thread.
|
||||
main_service_.post([this]() { node_manager_.TriggerGlobalGC(); });
|
||||
}),
|
||||
node_manager_(main_service, self_node_id_, node_manager_config, object_manager_,
|
||||
gcs_client_, object_directory_,
|
||||
plasma::plasma_store_runner->OnSpaceReleased()),
|
||||
gcs_client_, object_directory_),
|
||||
socket_name_(socket_name),
|
||||
acceptor_(main_service, ParseUrlEndpoint(socket_name)),
|
||||
socket_(main_service) {
|
||||
|
|
|
@ -76,6 +76,9 @@ class Raylet {
|
|||
|
||||
friend class TestObjectManagerIntegration;
|
||||
|
||||
// Main event loop.
|
||||
boost::asio::io_service &main_service_;
|
||||
|
||||
/// ID of this node.
|
||||
NodeID self_node_id_;
|
||||
/// Information of this node.
|
||||
|
|
|
@ -240,8 +240,7 @@ class LocalObjectManagerTest : public ::testing::Test {
|
|||
for (const auto &object_id : object_ids) {
|
||||
freed.insert(object_id);
|
||||
}
|
||||
},
|
||||
[&]() { num_callbacks_fired++; }),
|
||||
}),
|
||||
unpins(std::make_shared<std::unordered_map<ObjectID, int>>()) {
|
||||
RayConfig::instance().initialize({{"object_spilling_config", "mock_config"}});
|
||||
}
|
||||
|
@ -263,7 +262,6 @@ class LocalObjectManagerTest : public ::testing::Test {
|
|||
// This hashmap is incremented when objects are unpinned by destroying their
|
||||
// unique_ptr.
|
||||
std::shared_ptr<std::unordered_map<ObjectID, int>> unpins;
|
||||
size_t num_callbacks_fired = 0;
|
||||
};
|
||||
|
||||
TEST_F(LocalObjectManagerTest, TestPin) {
|
||||
|
@ -292,7 +290,6 @@ TEST_F(LocalObjectManagerTest, TestPin) {
|
|||
}
|
||||
std::unordered_set<ObjectID> expected(object_ids.begin(), object_ids.end());
|
||||
ASSERT_EQ(freed, expected);
|
||||
ASSERT_EQ(num_callbacks_fired, 0);
|
||||
}
|
||||
|
||||
TEST_F(LocalObjectManagerTest, TestRestoreSpilledObject) {
|
||||
|
@ -305,7 +302,6 @@ TEST_F(LocalObjectManagerTest, TestRestoreSpilledObject) {
|
|||
num_times_fired++;
|
||||
});
|
||||
ASSERT_EQ(num_times_fired, 1);
|
||||
ASSERT_EQ(num_callbacks_fired, 0);
|
||||
}
|
||||
|
||||
TEST_F(LocalObjectManagerTest, TestExplicitSpill) {
|
||||
|
@ -348,7 +344,6 @@ TEST_F(LocalObjectManagerTest, TestExplicitSpill) {
|
|||
for (const auto &id : object_ids) {
|
||||
ASSERT_EQ((*unpins)[id], 1);
|
||||
}
|
||||
ASSERT_TRUE(num_callbacks_fired > 0);
|
||||
}
|
||||
|
||||
TEST_F(LocalObjectManagerTest, TestDuplicateSpill) {
|
||||
|
@ -400,7 +395,6 @@ TEST_F(LocalObjectManagerTest, TestDuplicateSpill) {
|
|||
for (const auto &id : object_ids) {
|
||||
ASSERT_EQ((*unpins)[id], 1);
|
||||
}
|
||||
ASSERT_TRUE(num_callbacks_fired > 0);
|
||||
}
|
||||
|
||||
TEST_F(LocalObjectManagerTest, TestSpillObjectsOfSize) {
|
||||
|
@ -456,7 +450,6 @@ TEST_F(LocalObjectManagerTest, TestSpillObjectsOfSize) {
|
|||
// Check that this returns the total number of bytes currently being spilled.
|
||||
num_bytes_required = manager.SpillObjectsOfSize(0, 0);
|
||||
ASSERT_EQ(num_bytes_required, 0);
|
||||
ASSERT_TRUE(num_callbacks_fired > 0);
|
||||
}
|
||||
|
||||
TEST_F(LocalObjectManagerTest, TestSpillError) {
|
||||
|
@ -500,7 +493,6 @@ TEST_F(LocalObjectManagerTest, TestSpillError) {
|
|||
ASSERT_EQ(num_times_fired, 2);
|
||||
ASSERT_EQ(object_table.object_urls[object_id], url);
|
||||
ASSERT_EQ((*unpins)[object_id], 1);
|
||||
ASSERT_TRUE(num_callbacks_fired > 0);
|
||||
}
|
||||
|
||||
TEST_F(LocalObjectManagerTest, TestDeleteNoSpilledObjects) {
|
||||
|
|
Loading…
Add table
Reference in a new issue