[Object Spilling] Implement level triggered logic to make streaming shuffle work + additional cleanup (#12773)

SangBin Cho 2020-12-18 19:31:14 -08:00 committed by GitHub
parent 404161a3ff
commit 9d939e6674
41 changed files with 654 additions and 543 deletions

View file

@ -638,9 +638,11 @@ cdef c_vector[c_string] spill_objects_handler(
return return_urls
cdef void restore_spilled_objects_handler(
cdef int64_t restore_spilled_objects_handler(
const c_vector[CObjectID]& object_ids_to_restore,
const c_vector[c_string]& object_urls) nogil:
cdef:
int64_t bytes_restored = 0
with gil:
urls = []
size = object_urls.size()
@ -651,7 +653,8 @@ cdef void restore_spilled_objects_handler(
with ray.worker._changeproctitle(
ray_constants.WORKER_PROCESS_TYPE_RESTORE_WORKER,
ray_constants.WORKER_PROCESS_TYPE_RESTORE_WORKER_IDLE):
external_storage.restore_spilled_objects(object_refs, urls)
bytes_restored = external_storage.restore_spilled_objects(
object_refs, urls)
except Exception:
exception_str = (
"An unexpected internal error occurred while the IO worker "
@ -662,6 +665,7 @@ cdef void restore_spilled_objects_handler(
"restore_spilled_objects_error",
traceback.format_exc() + exception_str,
job_id=None)
return bytes_restored
cdef void delete_spilled_objects_handler(
@ -873,7 +877,8 @@ cdef class CoreWorker:
return self.plasma_event_handler
def get_objects(self, object_refs, TaskID current_task_id,
int64_t timeout_ms=-1, plasma_objects_only=False):
int64_t timeout_ms=-1,
plasma_objects_only=False):
cdef:
c_vector[shared_ptr[CRayObject]] results
CTaskID c_task_id = current_task_id.native()
@ -1573,17 +1578,6 @@ cdef class CoreWorker:
resource_name.encode("ascii"), capacity,
CNodeID.FromBinary(client_id.binary()))
def force_spill_objects(self, object_refs):
cdef c_vector[CObjectID] object_ids
object_ids = ObjectRefsToVector(object_refs)
assert not RayConfig.instance().automatic_object_deletion_enabled(), (
"Automatic object deletion is not supported for"
"force_spill_objects yet. Please set"
"automatic_object_deletion_enabled: False in Ray's system config.")
with nogil:
check_status(CCoreWorkerProcess.GetCoreWorker()
.SpillObjects(object_ids))
cdef void async_set_result(shared_ptr[CRayObject] obj,
CObjectID object_ref,
void *future) with gil:

View file

@ -1,6 +1,4 @@
from .dynamic_resources import set_resource
from .object_spilling import force_spill_objects
__all__ = [
"set_resource",
"force_spill_objects",
]

View file

@ -1,18 +0,0 @@
import ray
def force_spill_objects(object_refs):
"""Force spilling objects to external storage.
Args:
object_refs: Object refs of the objects to be
spilled.
"""
core_worker = ray.worker.global_worker.core_worker
# Make sure that the values are object refs.
for object_ref in object_refs:
if not isinstance(object_ref, ray.ObjectRef):
raise TypeError(
f"Attempting to call `force_spill_objects` on the "
f"value {object_ref}, which is not an ray.ObjectRef.")
return core_worker.force_spill_objects(object_refs)
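With `force_spill_objects` removed, spilling is driven automatically by the plasma store whenever it runs out of space. A minimal sketch of how a user relies on the new behavior; the config keys mirror the ones used in the tests in this commit, and the spill directory path is just an example:

import json
import numpy as np
import ray

# Sketch: rely on automatic spilling instead of ray.experimental.force_spill_objects.
ray.init(
    object_store_memory=75 * 1024 * 1024,
    _system_config={
        "automatic_object_spilling_enabled": True,
        "max_io_workers": 4,
        "min_spilling_size": 0,
        "object_spilling_config": json.dumps({
            "type": "filesystem",
            "params": {"directory_path": "/tmp/ray_spill"},
        }),
    })

# Putting more data than fits in the store now triggers spilling transparently;
# ray.get restores spilled objects on demand.
refs = [ray.put(np.random.rand(1024 * 1024)) for _ in range(25)]  # ~200 MB total
_ = ray.get(refs[0])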

View file

@ -157,12 +157,15 @@ class ExternalStorage(metaclass=abc.ABCMeta):
@abc.abstractmethod
def restore_spilled_objects(self, object_refs: List[ObjectRef],
url_with_offset_list: List[str]):
url_with_offset_list: List[str]) -> int:
"""Restore objects from the external storage.
Args:
object_refs: List of object IDs (note that it is not ref).
url_with_offset_list: List of url_with_offset.
Returns:
The total number of bytes restored.
"""
@abc.abstractmethod
@ -215,6 +218,7 @@ class FileSystemStorage(ExternalStorage):
def restore_spilled_objects(self, object_refs: List[ObjectRef],
url_with_offset_list: List[str]):
total = 0
for i in range(len(object_refs)):
object_ref = object_refs[i]
url_with_offset = url_with_offset_list[i].decode()
@ -228,9 +232,11 @@ class FileSystemStorage(ExternalStorage):
metadata_len = int.from_bytes(f.read(8), byteorder="little")
buf_len = int.from_bytes(f.read(8), byteorder="little")
self._size_check(metadata_len, buf_len, parsed_result.size)
total += buf_len
metadata = f.read(metadata_len)
# read remaining data to our buffer
self._put_object_to_store(metadata, buf_len, f, object_ref)
return total
def delete_spilled_objects(self, urls: List[str]):
for url in urls:
@ -297,6 +303,7 @@ class ExternalStorageSmartOpenImpl(ExternalStorage):
def restore_spilled_objects(self, object_refs: List[ObjectRef],
url_with_offset_list: List[str]):
from smart_open import open
total = 0
for i in range(len(object_refs)):
object_ref = object_refs[i]
url_with_offset = url_with_offset_list[i].decode()
@ -315,9 +322,11 @@ class ExternalStorageSmartOpenImpl(ExternalStorage):
metadata_len = int.from_bytes(f.read(8), byteorder="little")
buf_len = int.from_bytes(f.read(8), byteorder="little")
self._size_check(metadata_len, buf_len, parsed_result.size)
total += buf_len
metadata = f.read(metadata_len)
# read remaining data to our buffer
self._put_object_to_store(metadata, buf_len, f, object_ref)
return total
def delete_spilled_objects(self, urls: List[str]):
pass
@ -367,8 +376,8 @@ def restore_spilled_objects(object_refs: List[ObjectRef],
object_refs: List of object IDs (note that it is not ref).
url_with_offset_list: List of url_with_offset.
"""
_external_storage.restore_spilled_objects(object_refs,
url_with_offset_list)
return _external_storage.restore_spilled_objects(object_refs,
url_with_offset_list)
def delete_spilled_objects(urls: List[str]):
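`restore_spilled_objects` now reports how many bytes it brought back so the raylet can track restore throughput. A minimal sketch of the restore loop the file-system implementations run (seek to the parsed offset, read an 8-byte little-endian metadata length, an 8-byte little-endian buffer length, then the metadata and the payload); `put_to_store` and the (path, offset) pairs are hypothetical stand-ins for `_put_object_to_store` and the parsed url_with_offset:

from typing import Callable, List, Tuple

def restore_from_files(paths_with_offsets: List[Tuple[str, int]],
                       put_to_store: Callable[[bytes, bytes], None]) -> int:
    """Sketch of a restore loop that returns the total payload bytes restored."""
    total = 0
    for path, offset in paths_with_offsets:
        with open(path, "rb") as f:
            f.seek(offset)
            metadata_len = int.from_bytes(f.read(8), byteorder="little")
            buf_len = int.from_bytes(f.read(8), byteorder="little")
            metadata = f.read(metadata_len)
            payload = f.read(buf_len)
            put_to_store(metadata, payload)
            total += buf_len  # only payload bytes are counted, matching the diff
    return total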

View file

@ -233,7 +233,7 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:
(CRayStatus() nogil) check_signals
(void() nogil) gc_collect
(c_vector[c_string](const c_vector[CObjectID] &) nogil) spill_objects
(void(
(int64_t(
const c_vector[CObjectID] &,
const c_vector[c_string] &) nogil) restore_spilled_objects
(void(

View file

@ -23,7 +23,7 @@ def get_default_fixure_system_config():
"object_timeout_milliseconds": 200,
"num_heartbeats_timeout": 10,
"object_store_full_max_retries": 3,
"object_store_full_initial_delay_ms": 100,
"object_store_full_delay_ms": 100,
}
return system_config

View file

@ -4,11 +4,9 @@ import os
import random
import platform
import sys
import time
import numpy as np
import pytest
import psutil
import ray
from ray.external_storage import (create_url_with_offset,
parse_url_with_offset)
@ -43,57 +41,6 @@ def object_spilling_config(request, tmpdir):
yield json.dumps(request.param)
@pytest.mark.skip("This test is for local benchmark.")
def test_sample_benchmark(object_spilling_config, shutdown_only):
# --Config values--
max_io_workers = 10
object_store_limit = 500 * 1024 * 1024
eight_mb = 1024 * 1024
object_size = 12 * eight_mb
spill_cnt = 50
# Limit our object store to 200 MiB of memory.
ray.init(
object_store_memory=object_store_limit,
_system_config={
"object_store_full_max_retries": 0,
"max_io_workers": max_io_workers,
"object_spilling_config": object_spilling_config,
"automatic_object_deletion_enabled": False,
})
arr = np.random.rand(object_size)
replay_buffer = []
pinned_objects = set()
# Create objects of more than 200 MiB.
spill_start = time.perf_counter()
for _ in range(spill_cnt):
ref = None
while ref is None:
try:
ref = ray.put(arr)
replay_buffer.append(ref)
pinned_objects.add(ref)
except ray.exceptions.ObjectStoreFullError:
ref_to_spill = pinned_objects.pop()
ray.experimental.force_spill_objects([ref_to_spill])
spill_end = time.perf_counter()
# Make sure to remove unpinned objects.
del pinned_objects
restore_start = time.perf_counter()
while replay_buffer:
ref = replay_buffer.pop()
sample = ray.get(ref) # noqa
restore_end = time.perf_counter()
print(f"Object spilling benchmark for the config {object_spilling_config}")
print(f"Spilling {spill_cnt} number of objects of size {object_size}B "
f"takes {spill_end - spill_start} seconds with {max_io_workers} "
"number of io workers.")
print(f"Getting all objects takes {restore_end - restore_start} seconds.")
def test_invalid_config_raises_exception(shutdown_only):
# Make sure ray.init raises an exception before
# it starts processes when invalid object spilling
@ -127,123 +74,38 @@ def test_url_generation_and_parse():
@pytest.mark.skipif(
platform.system() == "Windows", reason="Failing on Windows.")
def test_spill_objects_manually(object_spilling_config, shutdown_only):
def test_spilling_not_done_for_pinned_object(tmp_path, shutdown_only):
# Limit our object store to 75 MiB of memory.
temp_folder = tmp_path / "spill"
temp_folder.mkdir()
ray.init(
object_store_memory=75 * 1024 * 1024,
_system_config={
"object_store_full_max_retries": 0,
"automatic_object_spilling_enabled": False,
"max_io_workers": 4,
"object_spilling_config": object_spilling_config,
"automatic_object_spilling_enabled": True,
"object_store_full_max_retries": 4,
"object_store_full_delay_ms": 100,
"object_spilling_config": json.dumps({
"type": "filesystem",
"params": {
"directory_path": str(temp_folder)
}
}),
"min_spilling_size": 0,
"automatic_object_deletion_enabled": False,
})
arr = np.random.rand(1024 * 1024) # 8 MB data
replay_buffer = []
pinned_objects = set()
arr = np.random.rand(5 * 1024 * 1024) # 40 MB
ref = ray.get(ray.put(arr)) # noqa
# Since the ref exists, it should raise OOM.
with pytest.raises(ray.exceptions.ObjectStoreFullError):
ref2 = ray.put(arr) # noqa
# Create objects of more than 200 MiB.
for _ in range(25):
ref = None
while ref is None:
try:
ref = ray.put(arr)
replay_buffer.append(ref)
pinned_objects.add(ref)
except ray.exceptions.ObjectStoreFullError:
ref_to_spill = pinned_objects.pop()
ray.experimental.force_spill_objects([ref_to_spill])
def is_dir_empty():
num_files = 0
for path in temp_folder.iterdir():
num_files += 1
return num_files == 0
def is_worker(cmdline):
return cmdline and cmdline[0].startswith("ray::")
# Make sure io workers are spawned with proper name.
processes = [
x.cmdline()[0] for x in psutil.process_iter(attrs=["cmdline"])
if is_worker(x.info["cmdline"])
]
assert (
ray.ray_constants.WORKER_PROCESS_TYPE_SPILL_WORKER_IDLE in processes)
# Spill 2 more objects so we will always have enough space for
# restoring objects back.
refs_to_spill = (pinned_objects.pop(), pinned_objects.pop())
ray.experimental.force_spill_objects(refs_to_spill)
# randomly sample objects
for _ in range(100):
ref = random.choice(replay_buffer)
sample = ray.get(ref)
assert np.array_equal(sample, arr)
# Make sure io workers are spawned with proper name.
processes = [
x.cmdline()[0] for x in psutil.process_iter(attrs=["cmdline"])
if is_worker(x.info["cmdline"])
]
assert (
ray.ray_constants.WORKER_PROCESS_TYPE_RESTORE_WORKER_IDLE in processes)
@pytest.mark.skipif(
platform.system() == "Windows", reason="Failing on Windows.")
def test_spill_objects_manually_from_workers(object_spilling_config,
shutdown_only):
# Limit our object store to 100 MiB of memory.
ray.init(
object_store_memory=100 * 1024 * 1024,
_system_config={
"object_store_full_max_retries": 0,
"automatic_object_spilling_enabled": False,
"max_io_workers": 4,
"object_spilling_config": object_spilling_config,
"min_spilling_size": 0,
"automatic_object_deletion_enabled": False,
})
@ray.remote
def _worker():
arr = np.random.rand(1024 * 1024) # 8 MB data
ref = ray.put(arr)
ray.experimental.force_spill_objects([ref])
return ref
# Create objects of more than 200 MiB.
replay_buffer = [ray.get(_worker.remote()) for _ in range(25)]
values = {ref: np.copy(ray.get(ref)) for ref in replay_buffer}
# Randomly sample objects.
for _ in range(100):
ref = random.choice(replay_buffer)
sample = ray.get(ref)
assert np.array_equal(sample, values[ref])
@pytest.mark.skip(reason="Not implemented yet.")
def test_spill_objects_manually_with_workers(object_spilling_config,
shutdown_only):
# Limit our object store to 75 MiB of memory.
ray.init(
object_store_memory=100 * 1024 * 1024,
_system_config={
"object_store_full_max_retries": 0,
"automatic_object_spilling_enabled": False,
"max_io_workers": 4,
"object_spilling_config": object_spilling_config,
"min_spilling_size": 0,
"automatic_object_deletion_enabled": False,
})
arrays = [np.random.rand(100 * 1024) for _ in range(50)]
objects = [ray.put(arr) for arr in arrays]
@ray.remote
def _worker(object_refs):
ray.experimental.force_spill_objects(object_refs)
ray.get([_worker.remote([o]) for o in objects])
for restored, arr in zip(ray.get(objects), arrays):
assert np.array_equal(restored, arr)
wait_for_condition(is_dir_empty)
@pytest.mark.skipif(
@ -255,7 +117,7 @@ def test_spill_objects_manually_with_workers(object_spilling_config,
"_system_config": {
"automatic_object_spilling_enabled": True,
"object_store_full_max_retries": 4,
"object_store_full_initial_delay_ms": 100,
"object_store_full_delay_ms": 100,
"max_io_workers": 4,
"object_spilling_config": json.dumps({
"type": "filesystem",
@ -308,7 +170,7 @@ def test_spill_objects_automatically(object_spilling_config, shutdown_only):
"max_io_workers": 4,
"automatic_object_spilling_enabled": True,
"object_store_full_max_retries": 4,
"object_store_full_initial_delay_ms": 100,
"object_store_full_delay_ms": 100,
"object_spilling_config": object_spilling_config,
"min_spilling_size": 0
})
@ -344,7 +206,7 @@ def test_spill_during_get(object_spilling_config, shutdown_only):
object_store_memory=100 * 1024 * 1024,
_system_config={
"automatic_object_spilling_enabled": True,
"object_store_full_initial_delay_ms": 100,
"object_store_full_delay_ms": 100,
# NOTE(swang): Use infinite retries because the OOM timer can still
# get accidentally triggered when objects are released too slowly
# (see github.com/ray-project/ray/issues/12040).
@ -381,7 +243,7 @@ def test_spill_deadlock(object_spilling_config, shutdown_only):
"max_io_workers": 1,
"automatic_object_spilling_enabled": True,
"object_store_full_max_retries": 4,
"object_store_full_initial_delay_ms": 100,
"object_store_full_delay_ms": 100,
"object_spilling_config": object_spilling_config,
"min_spilling_size": 0,
})
@ -411,10 +273,11 @@ def test_delete_objects(tmp_path, shutdown_only):
ray.init(
object_store_memory=75 * 1024 * 1024,
_system_config={
"max_io_workers": 4,
"max_io_workers": 1,
"min_spilling_size": 0,
"automatic_object_spilling_enabled": True,
"object_store_full_max_retries": 4,
"object_store_full_initial_delay_ms": 100,
"object_store_full_delay_ms": 100,
"object_spilling_config": json.dumps({
"type": "filesystem",
"params": {
@ -454,9 +317,10 @@ def test_delete_objects_delete_while_creating(tmp_path, shutdown_only):
object_store_memory=75 * 1024 * 1024,
_system_config={
"max_io_workers": 4,
"min_spilling_size": 0,
"automatic_object_spilling_enabled": True,
"object_store_full_max_retries": 4,
"object_store_full_initial_delay_ms": 100,
"object_store_full_delay_ms": 100,
"object_spilling_config": json.dumps({
"type": "filesystem",
"params": {
@ -506,7 +370,7 @@ def test_delete_objects_on_worker_failure(tmp_path, shutdown_only):
"max_io_workers": 4,
"automatic_object_spilling_enabled": True,
"object_store_full_max_retries": 4,
"object_store_full_initial_delay_ms": 100,
"object_store_full_delay_ms": 100,
"object_spilling_config": json.dumps({
"type": "filesystem",
"params": {
@ -579,9 +443,10 @@ def test_delete_objects_multi_node(tmp_path, ray_start_cluster):
object_store_memory=75 * 1024 * 1024,
_system_config={
"max_io_workers": 2,
"min_spilling_size": 20 * 1024 * 1024,
"automatic_object_spilling_enabled": True,
"object_store_full_max_retries": 4,
"object_store_full_initial_delay_ms": 100,
"object_store_full_delay_ms": 100,
"object_spilling_config": json.dumps({
"type": "filesystem",
"params": {
@ -648,14 +513,14 @@ def test_fusion_objects(tmp_path, shutdown_only):
# Limit our object store to 75 MiB of memory.
temp_folder = tmp_path / "spill"
temp_folder.mkdir()
min_spilling_size = 30 * 1024 * 1024
min_spilling_size = 10 * 1024 * 1024
ray.init(
object_store_memory=75 * 1024 * 1024,
_system_config={
"max_io_workers": 4,
"max_io_workers": 3,
"automatic_object_spilling_enabled": True,
"object_store_full_max_retries": 4,
"object_store_full_initial_delay_ms": 100,
"object_store_full_delay_ms": 100,
"object_spilling_config": json.dumps({
"type": "filesystem",
"params": {

View file

@ -19,7 +19,6 @@ logger = logging.getLogger(__name__)
@pytest.fixture
def one_worker_100MiB(request):
config = {
"object_store_full_max_retries": 2,
"task_retry_delay_ms": 0,
}
yield ray.init(

View file

@ -243,10 +243,9 @@ RAY_CONFIG(int64_t, gcs_dump_debug_log_interval_minutes, 1)
/// Maximum number of times to retry putting an object when the plasma store is full.
/// Can be set to -1 to enable unlimited retries.
RAY_CONFIG(int32_t, object_store_full_max_retries, 5)
RAY_CONFIG(int32_t, object_store_full_max_retries, 1000)
/// Duration to sleep after failing to put an object in plasma because it is full.
/// This will be exponentially increased for each retry.
RAY_CONFIG(uint32_t, object_store_full_initial_delay_ms, 1000)
RAY_CONFIG(uint32_t, object_store_full_delay_ms, 10)
/// The amount of time to wait between logging plasma space usage debug messages.
RAY_CONFIG(uint64_t, object_store_usage_log_interval_s, 10 * 60)
@ -254,6 +253,9 @@ RAY_CONFIG(uint64_t, object_store_usage_log_interval_s, 10 * 60)
/// The amount of time between automatic local Python GC triggers.
RAY_CONFIG(uint64_t, local_gc_interval_s, 10 * 60)
/// The min amount of time between local GCs (whether auto or mem pressure triggered).
RAY_CONFIG(uint64_t, local_gc_min_interval_s, 10)
/// Duration to wait between retries for failed tasks.
RAY_CONFIG(uint32_t, task_retry_delay_ms, 5000)
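The exponentially growing `object_store_full_initial_delay_ms` is replaced with a flat `object_store_full_delay_ms`, so the worst-case time a create request spends retrying before it fails with OOM is roughly retries × delay. A rough back-of-the-envelope sketch, assuming the new defaults shown above:

# Rough sketch: with a flat delay there is no exponential growth, so the upper
# bound on time spent retrying a plasma create request is linear in the retry count.
object_store_full_max_retries = 1000   # new default from this diff
object_store_full_delay_ms = 10        # new default from this diff

worst_case_wait_s = object_store_full_max_retries * object_store_full_delay_ms / 1000.0
print(f"worst-case retry wait ~= {worst_case_wait_s:.1f}s")  # ~10.0s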

View file

@ -575,7 +575,8 @@ void CoreWorker::Exit(bool intentional) {
<< " received, this process will exit after all outstanding tasks have finished";
exiting_ = true;
// Release the resources early in case draining takes a long time.
RAY_CHECK_OK(local_raylet_client_->NotifyDirectCallTaskBlocked());
RAY_CHECK_OK(
local_raylet_client_->NotifyDirectCallTaskBlocked(/*release_resources*/ true));
// Callback to shutdown.
auto shutdown = [this, intentional]() {
@ -2369,7 +2370,9 @@ void CoreWorker::HandleRestoreSpilledObjects(
for (const auto &url : request.spilled_objects_url()) {
spilled_objects_url.push_back(url);
}
options_.restore_spilled_objects(object_ids_to_restore, spilled_objects_url);
auto total =
options_.restore_spilled_objects(object_ids_to_restore, spilled_objects_url);
reply->set_bytes_restored_total(total);
send_reply_callback(Status::OK(), nullptr, nullptr);
} else {
send_reply_callback(

View file

@ -139,7 +139,7 @@ struct CoreWorkerOptions {
/// Application-language callback to spill objects to external storage.
std::function<std::vector<std::string>(const std::vector<ObjectID> &)> spill_objects;
/// Application-language callback to restore objects from external storage.
std::function<void(const std::vector<ObjectID> &, const std::vector<std::string> &)>
std::function<int64_t(const std::vector<ObjectID> &, const std::vector<std::string> &)>
restore_spilled_objects;
/// Application-language callback to delete objects from external storage.
std::function<void(const std::vector<std::string> &, rpc::WorkerType)>

View file

@ -232,16 +232,18 @@ bool CoreWorkerMemoryStore::Put(const RayObject &object, const ObjectID &object_
Status CoreWorkerMemoryStore::Get(const std::vector<ObjectID> &object_ids,
int num_objects, int64_t timeout_ms,
const WorkerContext &ctx, bool remove_after_get,
std::vector<std::shared_ptr<RayObject>> *results) {
std::vector<std::shared_ptr<RayObject>> *results,
bool release_resources) {
return GetImpl(object_ids, num_objects, timeout_ms, ctx, remove_after_get, results,
/*abort_if_any_object_is_exception=*/true);
/*abort_if_any_object_is_exception=*/true, release_resources);
}
Status CoreWorkerMemoryStore::GetImpl(const std::vector<ObjectID> &object_ids,
int num_objects, int64_t timeout_ms,
const WorkerContext &ctx, bool remove_after_get,
std::vector<std::shared_ptr<RayObject>> *results,
bool abort_if_any_object_is_exception) {
bool abort_if_any_object_is_exception,
bool release_resources) {
(*results).resize(object_ids.size(), nullptr);
std::shared_ptr<GetRequest> get_request;
@ -299,7 +301,8 @@ Status CoreWorkerMemoryStore::GetImpl(const std::vector<ObjectID> &object_ids,
// Wait for remaining objects (or timeout).
if (should_notify_raylet) {
RAY_CHECK_OK(raylet_client_->NotifyDirectCallTaskBlocked());
// SANG-TODO Implement memory store get
RAY_CHECK_OK(raylet_client_->NotifyDirectCallTaskBlocked(release_resources));
}
bool done = false;
@ -374,11 +377,11 @@ Status CoreWorkerMemoryStore::Get(
const absl::flat_hash_set<ObjectID> &object_ids, int64_t timeout_ms,
const WorkerContext &ctx,
absl::flat_hash_map<ObjectID, std::shared_ptr<RayObject>> *results,
bool *got_exception) {
bool *got_exception, bool release_resources) {
const std::vector<ObjectID> id_vector(object_ids.begin(), object_ids.end());
std::vector<std::shared_ptr<RayObject>> result_objects;
RAY_RETURN_NOT_OK(Get(id_vector, id_vector.size(), timeout_ms, ctx,
/*remove_after_get=*/false, &result_objects));
/*remove_after_get=*/false, &result_objects, release_resources));
for (size_t i = 0; i < id_vector.size(); i++) {
if (result_objects[i] != nullptr) {
@ -401,8 +404,9 @@ Status CoreWorkerMemoryStore::Wait(const absl::flat_hash_set<ObjectID> &object_i
std::vector<ObjectID> id_vector(object_ids.begin(), object_ids.end());
std::vector<std::shared_ptr<RayObject>> result_objects;
RAY_CHECK(object_ids.size() == id_vector.size());
auto status = GetImpl(id_vector, num_objects, timeout_ms, ctx, false, &result_objects,
/*abort_if_any_object_is_exception=*/false);
auto status =
GetImpl(id_vector, num_objects, timeout_ms, ctx, false, &result_objects,
/*abort_if_any_object_is_exception=*/false, /*release_resources=*/true);
// Ignore TimedOut statuses since we return ready objects explicitly.
if (!status.IsTimedOut()) {
RAY_RETURN_NOT_OK(status);

View file

@ -58,13 +58,14 @@ class CoreWorkerMemoryStore {
/// \return Status.
Status Get(const std::vector<ObjectID> &object_ids, int num_objects, int64_t timeout_ms,
const WorkerContext &ctx, bool remove_after_get,
std::vector<std::shared_ptr<RayObject>> *results);
std::vector<std::shared_ptr<RayObject>> *results,
bool release_resources = true);
/// Convenience wrapper around Get() that stores results in a given result map.
Status Get(const absl::flat_hash_set<ObjectID> &object_ids, int64_t timeout_ms,
const WorkerContext &ctx,
absl::flat_hash_map<ObjectID, std::shared_ptr<RayObject>> *results,
bool *got_exception);
bool *got_exception, bool release_resources = true);
/// Convenience wrapper around Get() that stores ready objects in a given result set.
Status Wait(const absl::flat_hash_set<ObjectID> &object_ids, int num_objects,
@ -137,11 +138,12 @@ class CoreWorkerMemoryStore {
private:
/// See the public version of `Get` for meaning of the other arguments.
/// \param[in] abort_if_any_object_is_exception Whether we should abort if any object
/// is an exception.
/// \param[in] release_resources true if a blocking memory store get needs to release
/// resources while it waits.
Status GetImpl(const std::vector<ObjectID> &object_ids, int num_objects,
int64_t timeout_ms, const WorkerContext &ctx, bool remove_after_get,
std::vector<std::shared_ptr<RayObject>> *results,
bool abort_if_any_object_is_exception);
bool abort_if_any_object_is_exception, bool release_resources);
/// Optional callback for putting objects into the plasma store.
std::function<void(const RayObject &, const ObjectID &)> store_in_plasma_;
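The new `release_resources` flag defaults to true and is threaded from the public `Get` overloads down to `NotifyDirectCallTaskBlocked`, so a caller can opt out of releasing the worker's resources while it blocks. A small Python sketch of the same default-parameter plumbing, with hypothetical stand-in names:

class MemoryStoreSketch:
    def __init__(self, notify_blocked):
        # notify_blocked(release_resources: bool) stands in for
        # raylet_client_->NotifyDirectCallTaskBlocked(release_resources).
        self._notify_blocked = notify_blocked

    def get(self, object_ids, timeout_ms, release_resources=True):
        # Public entry point; keeps the old behavior unless a caller opts out.
        return self._get_impl(object_ids, timeout_ms, release_resources)

    def _get_impl(self, object_ids, timeout_ms, release_resources):
        # Before blocking, tell the raylet whether it may release this
        # worker's resources while the get is outstanding.
        self._notify_blocked(release_resources)
        return [None for _ in object_ids]  # placeholder for the blocking wait


store = MemoryStoreSketch(notify_blocked=lambda release: print("blocked, release =", release))
store.get(["obj1"], timeout_ms=100)                           # blocked, release = True
store.get(["obj1"], timeout_ms=100, release_resources=False)  # blocked, release = False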

View file

@ -35,6 +35,7 @@ CoreWorkerPlasmaStoreProvider::CoreWorkerPlasmaStoreProvider(
} else {
get_current_call_site_ = []() { return "<no callsite callback>"; };
}
object_store_full_delay_ms_ = RayConfig::instance().object_store_full_delay_ms();
buffer_tracker_ = std::make_shared<BufferTracker>();
RAY_CHECK_OK(store_client_.Connect(store_socket));
if (warmup) {
@ -95,7 +96,8 @@ Status CoreWorkerPlasmaStoreProvider::Create(const std::shared_ptr<Buffer> &meta
}
while (retry_with_request_id > 0) {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
// TODO(sang): Use exponential backoff instead.
std::this_thread::sleep_for(std::chrono::milliseconds(object_store_full_delay_ms_));
{
std::lock_guard<std::mutex> guard(store_client_mutex_);
RAY_LOG(DEBUG) << "Retrying request for object " << object_id << " with request ID "
@ -224,7 +226,7 @@ Status CoreWorkerPlasmaStoreProvider::Get(
const absl::flat_hash_set<ObjectID> &object_ids, int64_t timeout_ms,
const WorkerContext &ctx,
absl::flat_hash_map<ObjectID, std::shared_ptr<RayObject>> *results,
bool *got_exception) {
bool *got_exception, bool release_resources) {
int64_t batch_size = RayConfig::instance().worker_fetch_request_size();
std::vector<ObjectID> batch_ids;
absl::flat_hash_set<ObjectID> remaining(object_ids.begin(), object_ids.end());
@ -275,7 +277,7 @@ Status CoreWorkerPlasmaStoreProvider::Get(
size_t previous_size = remaining.size();
// This is a separate IPC from the FetchAndGet in direct call mode.
if (ctx.CurrentTaskIsDirectCall() && ctx.ShouldReleaseResourcesOnBlockingCalls()) {
RAY_RETURN_NOT_OK(raylet_client_->NotifyDirectCallTaskBlocked());
RAY_RETURN_NOT_OK(raylet_client_->NotifyDirectCallTaskBlocked(release_resources));
}
RAY_RETURN_NOT_OK(
FetchAndGetFromPlasmaStore(remaining, batch_ids, batch_timeout,
@ -332,7 +334,9 @@ Status CoreWorkerPlasmaStoreProvider::Wait(
// This is a separate IPC from the Wait in direct call mode.
if (ctx.CurrentTaskIsDirectCall() && ctx.ShouldReleaseResourcesOnBlockingCalls()) {
RAY_RETURN_NOT_OK(raylet_client_->NotifyDirectCallTaskBlocked());
// SANG-TODO Implement wait
RAY_RETURN_NOT_OK(
raylet_client_->NotifyDirectCallTaskBlocked(/*release_resources*/ true));
}
const auto owner_addresses = reference_counter_->GetOwnerAddresses(id_vector);
RAY_RETURN_NOT_OK(

View file

@ -90,7 +90,7 @@ class CoreWorkerPlasmaStoreProvider {
Status Get(const absl::flat_hash_set<ObjectID> &object_ids, int64_t timeout_ms,
const WorkerContext &ctx,
absl::flat_hash_map<ObjectID, std::shared_ptr<RayObject>> *results,
bool *got_exception);
bool *got_exception, bool release_resources = true);
Status Contains(const ObjectID &object_id, bool *has_object);
@ -154,6 +154,7 @@ class CoreWorkerPlasmaStoreProvider {
std::mutex store_client_mutex_;
std::function<Status()> check_signals_;
std::function<std::string()> get_current_call_site_;
uint32_t object_store_full_delay_ms_;
// Active buffers tracker. This must be allocated as a separate structure since its
// lifetime can exceed that of the store provider due to callback references.

View file

@ -72,7 +72,9 @@ void GcsObjectManager::HandleAddObjectLocation(
AddObjectLocationInCache(object_id, node_id);
} else {
absl::MutexLock lock(&mutex_);
object_to_locations_[object_id].spilled_url = request.spilled_url();
RAY_CHECK(!request.spilled_url().empty());
spilled_url = request.spilled_url();
object_to_locations_[object_id].spilled_url = spilled_url;
RAY_LOG(DEBUG) << "Adding object spilled location, object id = " << object_id;
}
@ -91,7 +93,8 @@ void GcsObjectManager::HandleAddObjectLocation(
notification.SerializeAsString(), nullptr));
RAY_LOG(DEBUG) << "Finished adding object location, job id = "
<< object_id.TaskId().JobId() << ", object id = " << object_id
<< ", node id = " << node_id << ", task id = " << object_id.TaskId();
<< ", node id = " << node_id << ", task id = " << object_id.TaskId()
<< ", spilled_url = " << spilled_url;
} else {
RAY_LOG(ERROR) << "Failed to add object location: " << status.ToString()
<< ", job id = " << object_id.TaskId().JobId()

View file

@ -10,14 +10,8 @@
namespace ray {
/// A callback to asynchronously spill objects when space is needed.
/// The callback tries to spill objects as much as num_bytes_to_spill and returns
/// the amount of space needed after the spilling is complete.
/// The returned value is calculated based off of min_bytes_to_spill. That says,
/// although it fails to spill num_bytes_to_spill, as long as it spills more than
/// min_bytes_to_spill, it will return the value that is less than 0 (meaning we
/// don't need any more additional space).
using SpillObjectsCallback =
std::function<int64_t(int64_t num_bytes_to_spill, int64_t min_bytes_to_spill)>;
/// It spills enough objects to saturate all spill IO workers.
using SpillObjectsCallback = std::function<bool()>;
/// A callback to call when space has been released.
using SpaceReleasedCallback = std::function<void()>;
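The spill callback is now level-triggered: instead of being asked for a byte amount, it keeps dispatching spill batches until every spill IO worker is busy and reports whether any spilling could be started. An illustrative Python sketch of that saturation loop, modeled on `SpillObjectUptoMaxThroughput` in local_object_manager.cc further down (names are illustrative):

import threading

class SpillerSketch:
    """Illustrative level-triggered spiller: keep spilling until IO workers saturate."""

    def __init__(self, max_io_workers, spill_one_batch):
        # spill_one_batch() -> bool stands in for SpillObjectsOfSize(min_spilling_size_):
        # it returns True if a batch of spillable objects was handed to an IO worker.
        self._max_io_workers = max_io_workers
        self._spill_one_batch = spill_one_batch
        self._num_active_workers = 0
        self._lock = threading.Lock()

    def spill_up_to_max_throughput(self) -> bool:
        started_any = False
        can_spill_more = True
        while can_spill_more:
            if not self._spill_one_batch():
                break  # nothing left that is safe to spill right now
            started_any = True
            with self._lock:
                self._num_active_workers += 1
                can_spill_more = self._num_active_workers < self._max_io_workers
        return started_any

    def on_batch_done(self):
        # Called when an IO worker finishes its batch, freeing a slot.
        with self._lock:
            self._num_active_workers -= 1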

View file

@ -118,6 +118,13 @@ void ObjectManager::Stop() {
}
}
bool ObjectManager::IsPlasmaObjectSpillable(const ObjectID &object_id) {
if (plasma::plasma_store_runner != nullptr) {
return plasma::plasma_store_runner->IsPlasmaObjectSpillable(object_id);
}
return false;
}
void ObjectManager::RunRpcService() { rpc_service_.run(); }
void ObjectManager::StartRpcService() {

View file

@ -206,6 +206,13 @@ class ObjectManager : public ObjectManagerInterface,
/// signals from Raylet.
void Stop();
/// This method calls into the plasma store, which runs in a separate thread.
/// Check if the given object id is spillable by directly calling the plasma store.
/// The plasma store returns true if the object is spillable, meaning it is only
/// pinned by the raylet, so it can safely be evicted after the local object manager
/// spills it. Returns false otherwise.
bool IsPlasmaObjectSpillable(const ObjectID &object_id);
/// Subscribe to notifications of objects added to local store.
/// Upon subscribing, the callback will be invoked for all objects that
///

View file

@ -69,17 +69,21 @@ std::pair<PlasmaObject, PlasmaError> CreateRequestQueue::TryRequestImmediately(
auto req_id = AddRequest(object_id, client, create_callback);
if (!ProcessRequests().ok()) {
// If the request was not immediately fulfillable, finish it.
RAY_CHECK(!queue_.empty());
FinishRequest(queue_.begin());
if (!queue_.empty()) {
// Some errors, such as a transient OOM error, don't finish the request, so we
// should finish it here.
FinishRequest(queue_.begin());
}
}
PlasmaError error;
RAY_CHECK(GetRequestResult(req_id, &result, &error));
return {result, error};
}
Status CreateRequestQueue::ProcessRequest(std::unique_ptr<CreateRequest> &request) {
bool CreateRequestQueue::ProcessRequest(std::unique_ptr<CreateRequest> &request) {
// Return an OOM error to the client if we have hit the maximum number of
// retries.
// TODO(sang): Delete this logic?
bool evict_if_full = evict_if_full_;
if (max_retries_ == 0) {
// If we cannot retry, then always evict on the first attempt.
@ -88,50 +92,36 @@ Status CreateRequestQueue::ProcessRequest(std::unique_ptr<CreateRequest> &reques
// Always try to evict after the first attempt.
evict_if_full = true;
}
request->error = request->create_callback(evict_if_full, &request->result);
Status status;
auto should_retry_on_oom = max_retries_ == -1 || num_retries_ < max_retries_;
if (request->error == PlasmaError::TransientOutOfMemory) {
// The object store is full, but we should wait for space to be made
// through spilling, so do nothing. The caller must guarantee that
// ProcessRequests is called again so that we can try this request again.
// NOTE(swang): There could be other requests behind this one that are
// actually serviceable. This may be inefficient, but eventually this
// request will get served and unblock the following requests, once
// enough objects have been spilled.
// TODO(swang): Ask the raylet to spill enough space for multiple requests
// at once, instead of just the head of the queue.
num_retries_ = 0;
status =
Status::TransientObjectStoreFull("Object store full, queueing creation request");
} else if (request->error == PlasmaError::OutOfMemory && should_retry_on_oom) {
num_retries_++;
RAY_LOG(DEBUG) << "Not enough memory to create the object, after " << num_retries_
<< " tries";
if (trigger_global_gc_) {
trigger_global_gc_();
}
status = Status::ObjectStoreFull("Object store full, should retry on timeout");
} else if (request->error == PlasmaError::OutOfMemory) {
RAY_LOG(ERROR) << "Not enough memory to create object " << request->object_id
<< " after " << num_retries_
<< " tries, will return OutOfMemory to the client";
}
return status;
return request->error != PlasmaError::OutOfMemory;
}
Status CreateRequestQueue::ProcessRequests() {
while (!queue_.empty()) {
auto request_it = queue_.begin();
auto status = ProcessRequest(*request_it);
if (status.IsTransientObjectStoreFull() || status.IsObjectStoreFull()) {
return status;
auto create_ok = ProcessRequest(*request_it);
if (create_ok) {
FinishRequest(request_it);
} else {
if (trigger_global_gc_) {
trigger_global_gc_();
}
if (spill_objects_callback_()) {
return Status::TransientObjectStoreFull("Waiting for spilling.");
} else if (num_retries_ < max_retries_ || max_retries_ == -1) {
// We need a grace period since (1) global GC takes a bit of time to
// kick in, and (2) there is a race between spilling finishing and space
// actually freeing up in the object store.
// If max_retries == -1, we retry infinitely.
num_retries_ += 1;
return Status::ObjectStoreFull("Waiting for grace period.");
} else {
// Raise OOM. In this case, the request will be marked as OOM.
// We don't return so that we can process the next entry right away.
FinishRequest(request_it);
}
}
FinishRequest(request_it);
}
return Status::OK();
}
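Instead of relying on a transient OOM error from the allocator, the queue itself now decides what a failed create means: trigger global GC, ask the spill callback for space, and then either wait for spilling, burn a retry as a grace period, or finally fail the request with OOM. A compact Python sketch of that decision flow, with illustrative names and string stand-ins for the Status results:

def process_requests(queue, process_request, spill_objects_callback,
                     trigger_global_gc, max_retries, state):
    """Sketch of the new level-triggered retry flow (names are illustrative).

    process_request(req) -> bool mimics ProcessRequest: True means the create
    succeeded (or failed permanently) and the request can be finished.
    state is a dict holding num_retries, mirroring num_retries_.
    """
    while queue:
        request = queue[0]
        if process_request(request):
            queue.pop(0)  # FinishRequest: reply to the client and move on
            continue
        trigger_global_gc()
        if spill_objects_callback():
            # Spilling is in flight; keep the request queued and try again later.
            return "TransientObjectStoreFull"
        if max_retries == -1 or state["num_retries"] < max_retries:
            # Grace period: global GC and finished spills need time to free space.
            state["num_retries"] += 1
            return "ObjectStoreFull"
        # Out of retries and nothing left to spill: fail this request with OOM
        # and immediately continue with the next one.
        queue.pop(0)
    return "OK"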

View file

@ -21,6 +21,7 @@
#include "absl/container/flat_hash_map.h"
#include "ray/common/status.h"
#include "ray/object_manager/common.h"
#include "ray/object_manager/plasma/common.h"
#include "ray/object_manager/plasma/connection.h"
#include "ray/object_manager/plasma/plasma.h"
@ -34,9 +35,11 @@ class CreateRequestQueue {
std::function<PlasmaError(bool evict_if_full, PlasmaObject *result)>;
CreateRequestQueue(int32_t max_retries, bool evict_if_full,
ray::SpillObjectsCallback spill_objects_callback,
std::function<void()> trigger_global_gc)
: max_retries_(max_retries),
evict_if_full_(evict_if_full),
spill_objects_callback_(spill_objects_callback),
trigger_global_gc_(trigger_global_gc) {
RAY_LOG(DEBUG) << "Starting plasma::CreateRequestQueue with " << max_retries_
<< " retries on OOM, evict if full? " << (evict_if_full_ ? 1 : 0);
@ -136,7 +139,7 @@ class CreateRequestQueue {
/// Process a single request. Sets the request's error result to the error
/// returned by the request handler inside. Returns true if the request can be
/// finished.
Status ProcessRequest(std::unique_ptr<CreateRequest> &request);
bool ProcessRequest(std::unique_ptr<CreateRequest> &request);
/// Finish a queued request and remove it from the queue.
void FinishRequest(std::list<std::unique_ptr<CreateRequest>>::iterator request_it);
@ -156,6 +159,11 @@ class CreateRequestQueue {
/// always try to evict.
const bool evict_if_full_;
/// A callback to trigger object spilling. It tries to spill objects up to the max
/// throughput. It returns true if space is being made by object spilling, and false
/// if no more space can be made.
ray::SpillObjectsCallback spill_objects_callback_;
/// A callback to trigger global GC in the cluster if the object store is
/// full.
const std::function<void()> trigger_global_gc_;
@ -178,6 +186,9 @@ class CreateRequestQueue {
/// finished.
absl::flat_hash_map<uint64_t, std::unique_ptr<CreateRequest>> fulfilled_requests_;
/// Last time global gc was invoked in ms.
uint64_t last_global_gc_ms_;
friend class CreateRequestQueueTest;
};

View file

@ -132,10 +132,10 @@ int64_t EvictionPolicy::RequireSpace(int64_t size,
RAY_LOG(DEBUG) << "not enough space to create this object, so evicting objects";
// Choose some objects to evict, and update the return pointers.
int64_t num_bytes_evicted = ChooseObjectsToEvict(space_to_free, objects_to_evict);
RAY_LOG(INFO) << "There is not enough space to create this object, so evicting "
<< objects_to_evict->size() << " objects to free up " << num_bytes_evicted
<< " bytes. The number of bytes in use (before "
<< "this eviction) is " << PlasmaAllocator::Allocated() << ".";
RAY_LOG(DEBUG) << "There is not enough space to create this object, so evicting "
<< objects_to_evict->size() << " objects to free up "
<< num_bytes_evicted << " bytes. The number of bytes in use (before "
<< "this eviction) is " << PlasmaAllocator::Allocated() << ".";
return required_space - num_bytes_evicted;
}

View file

@ -82,11 +82,6 @@ enum PlasmaError:int {
ObjectNonexistent,
// Trying to create an object but there isn't enough space in the store.
OutOfMemory,
// Trying to create an object but there isn't enough space in the store.
// However, objects are currently being spilled to make enough space. The
// client should try again soon, and there will be enough space (assuming the
// space is not taken by another client).
TransientOutOfMemory,
// Trying to delete an object but it's not sealed.
ObjectNotSealed,
// Trying to delete an object but it's in use.
@ -162,7 +157,7 @@ table PlasmaCreateRetryRequest {
object_id: string;
// The ID of the request to retry.
request_id: uint64;
}
}
table CudaHandle {
handle: [ubyte];

View file

@ -131,9 +131,6 @@ Status PlasmaErrorStatus(fb::PlasmaError plasma_error) {
return Status::ObjectNotFound("object does not exist in the plasma store");
case fb::PlasmaError::OutOfMemory:
return Status::ObjectStoreFull("object does not fit in the plasma store");
case fb::PlasmaError::TransientOutOfMemory:
return Status::ObjectStoreFull(
"object does not fit in the plasma store, spilling objects to make room");
case fb::PlasmaError::UnexpectedError:
return Status::UnknownError(
"an unexpected error occurred, likely due to a bug in the system or caller");

View file

@ -138,7 +138,7 @@ PlasmaStore::PlasmaStore(boost::asio::io_service &main_service, std::string dire
create_request_queue_(
RayConfig::instance().object_store_full_max_retries(),
/*evict_if_full=*/RayConfig::instance().object_pinning_enabled(),
object_store_full_callback) {
spill_objects_callback, object_store_full_callback) {
store_info_.directory = directory;
store_info_.hugepages_enabled = hugepages_enabled;
#ifdef PLASMA_CUDA
@ -223,34 +223,7 @@ uint8_t *PlasmaStore::AllocateMemory(size_t size, bool evict_if_full, MEMFD_TYPE
// More space is still needed. Try to spill objects to external storage to
// make room.
if (space_needed > 0) {
if (spill_objects_callback_) {
// If the space needed is too small, we'd like to bump up to the minimum
// size. Cap the max size to be lower than the plasma store limit.
int64_t byte_to_spill =
std::min(PlasmaAllocator::GetFootprintLimit(),
std::max(space_needed, RayConfig::instance().min_spilling_size()));
// Object spilling is asynchronous so that we do not block the plasma
// store thread. Therefore the client must try again, even if enough
// space will be made after the spill is complete.
// TODO(swang): Only respond to the client with OutOfMemory if we could not
// make enough space through spilling. If we could make enough space,
// respond to the plasma client once spilling is complete.
space_needed = spill_objects_callback_(byte_to_spill, space_needed);
}
if (space_needed > 0) {
// There is still not enough space, even once all evictable objects
// were evicted and all pending object spills have finished. The
// client may choose to try again, or throw an OutOfMemory error to
// the application immediately.
*error = PlasmaError::OutOfMemory;
} else {
// Once all pending object spills have finished, there should be
// enough space for this allocation. Return a transient error to the
// client so that they try again soon.
*error = PlasmaError::TransientOutOfMemory;
}
// Return an error to the client if not enough space could be freed to
// create the object.
*error = PlasmaError::OutOfMemory;
break;
}
}
@ -311,9 +284,8 @@ PlasmaError PlasmaStore::HandleCreateObjectRequest(const std::shared_ptr<Client>
owner_worker_id, evict_if_full, data_size, metadata_size,
device_num, client, object);
if (error == PlasmaError::OutOfMemory) {
RAY_LOG(WARNING) << "Not enough memory to create the object " << object_id
<< ", data_size=" << data_size
<< ", metadata_size=" << metadata_size;
RAY_LOG(DEBUG) << "Not enough memory to create the object " << object_id
<< ", data_size=" << data_size << ", metadata_size=" << metadata_size;
}
return error;
}
@ -551,8 +523,8 @@ void PlasmaStore::ProcessGetRequest(const std::shared_ptr<Client> &client,
std::vector<ObjectID> evicted_ids;
std::vector<ObjectTableEntry *> evicted_entries;
for (auto object_id : object_ids) {
// Check if this object is already present locally. If so, record that the
// object is being used and mark it as accounted for.
// Check if this object is already present
// locally. If so, record that the object is being used and mark it as accounted for.
auto entry = GetObjectTableEntry(&store_info_, object_id);
if (entry && entry->state == ObjectState::PLASMA_SEALED) {
// Update the get request to take into account the present object.
@ -972,6 +944,9 @@ void PlasmaStore::SubscribeToUpdates(const std::shared_ptr<Client> &client) {
Status PlasmaStore::ProcessMessage(const std::shared_ptr<Client> &client,
fb::MessageType type,
const std::vector<uint8_t> &message) {
// A global lock is used here so that the raylet can directly access some of the
// methods required for object spilling without the store releasing the lock.
std::lock_guard<std::recursive_mutex> guard(mutex_);
// TODO(suquark): We should convert these interfaces to const later.
uint8_t *input = (uint8_t *)message.data();
size_t input_size = message.size();
@ -1116,9 +1091,7 @@ void PlasmaStore::ProcessCreateRequests() {
auto status = create_request_queue_.ProcessRequests();
uint32_t retry_after_ms = 0;
if (status.IsTransientObjectStoreFull()) {
retry_after_ms = delay_on_transient_oom_ms_;
} else if (status.IsObjectStoreFull()) {
if (!status.ok()) {
retry_after_ms = delay_on_oom_ms_;
}
@ -1151,4 +1124,12 @@ void PlasmaStore::ReplyToCreateClient(const std::shared_ptr<Client> &client,
}
}
bool PlasmaStore::IsObjectSpillable(const ObjectID &object_id) {
// The lock is acquired whenever the plasma store receives a request.
// A recursive mutex is used here so this method can also be called on a code path
// that already holds the lock.
std::lock_guard<std::recursive_mutex> guard(mutex_);
auto entry = GetObjectTableEntry(&store_info_, object_id);
return entry->ref_count == 1;
}
} // namespace plasma
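An object is considered spillable when its only remaining reference is the raylet's pin, i.e. `ref_count == 1`; any additional reference means a client may still be reading the buffer, which is why `test_spilling_not_done_for_pinned_object` above expects an OOM error instead of a spill. A toy illustration of the check, assuming a simple entry record:

from dataclasses import dataclass

@dataclass
class EntrySketch:
    # Stand-in for the plasma object table entry: ref_count counts the raylet pin
    # plus any clients that currently have the buffer mapped.
    ref_count: int

def is_object_spillable(entry: EntrySketch) -> bool:
    # Only the raylet pin remains, so no worker can be reading the buffer and
    # it is safe to spill and then evict the object.
    return entry.ref_count == 1

assert is_object_spillable(EntrySketch(ref_count=1))      # pinned only by the raylet
assert not is_object_spillable(EntrySketch(ref_count=2))  # a client still holds it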

View file

@ -99,9 +99,6 @@ class PlasmaStore {
/// - PlasmaError::OutOfMemory, if the store is out of memory and
/// cannot create the object. In this case, the client should not call
/// plasma_release.
/// - PlasmaError::TransientOutOfMemory, if the store is temporarily out of
/// memory but there may be space soon to create the object. In this
/// case, the client should not call plasma_release.
PlasmaError CreateObject(const ObjectID &object_id, const NodeID &owner_raylet_id,
const std::string &owner_ip_address, int owner_port,
const WorkerID &owner_worker_id, bool evict_if_full,
@ -186,6 +183,14 @@ class PlasmaStore {
plasma::flatbuf::MessageType type,
const std::vector<uint8_t> &message);
/// Return true if the given object id has only one reference.
/// Only one reference means only the raylet pins the object, so it is safe to
/// spill the object.
/// NOTE: Avoid using this method outside the object spilling context (i.e., unless you
/// absolutely know what's going on). This method won't work correctly if it is used
/// before the object is pinned by the raylet for the first time.
bool IsObjectSpillable(const ObjectID &object_id);
void SetNotificationListener(
const std::shared_ptr<ray::ObjectStoreNotificationManager> &notification_listener) {
notification_listener_ = notification_listener;
@ -286,16 +291,14 @@ class PlasmaStore {
/// A callback to asynchronously spill objects when space is needed. The
/// callback returns true if space is being made by spilling, and false if no
/// more space can be made.
/// NOTE: This function should guarantee the thread-safety because the callback is
/// shared with the main raylet thread.
ray::SpillObjectsCallback spill_objects_callback_;
/// The amount of time to wait before retrying a creation request after an
/// OOM error.
const uint32_t delay_on_oom_ms_;
/// The amount of time to wait before retrying a creation request after a
/// transient OOM error.
const uint32_t delay_on_transient_oom_ms_ = 10;
/// The amount of time to wait between logging space usage debug messages.
const uint64_t usage_log_interval_ns_;
@ -309,6 +312,14 @@ class PlasmaStore {
/// Queue of object creation requests.
CreateRequestQueue create_request_queue_;
/// This mutex is used in order to make the plasma store thread-safe with the raylet.
/// The raylet's local_object_manager needs to directly access the plasma store's methods
/// in order to see a correct view of the object store. A recursive_mutex is used to avoid
/// deadlock while keeping the change as simple as possible. NOTE(sang): Avoid adding more
/// interfaces through which the node manager or object manager access the plasma store
/// under this mutex unless it is absolutely necessary.
std::recursive_mutex mutex_;
};
} // namespace plasma
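The recursive mutex matters because the store can be re-entered on a path where its lock is already held (for example, the spill callback invoked while a create request is being processed eventually asks whether an object is spillable); a plain mutex would self-deadlock there. Python's `threading.RLock` shows the same property in miniature, as an illustration only:

import threading

# Illustration: a re-entrant lock lets the same thread re-acquire a lock it
# already holds, which is what the recursive_mutex above relies on.
lock = threading.RLock()

def is_object_spillable():
    with lock:          # re-acquired by the same thread: fine with an RLock,
        return True     # a plain threading.Lock() would block forever here

def process_message():
    with lock:
        # ... handling a create request may trigger spilling, which eventually
        # asks whether an object is spillable on this same thread:
        return is_object_spillable()

print(process_message())  # True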

View file

@ -94,10 +94,10 @@ void PlasmaStoreRunner::Start(ray::SpillObjectsCallback spill_objects_callback,
{
absl::MutexLock lock(&store_runner_mutex_);
store_.reset(new PlasmaStore(
main_service_, plasma_directory_, hugepages_enabled_, socket_name_,
external_store, RayConfig::instance().object_store_full_initial_delay_ms(),
spill_objects_callback, object_store_full_callback));
store_.reset(new PlasmaStore(main_service_, plasma_directory_, hugepages_enabled_,
socket_name_, external_store,
RayConfig::instance().object_store_full_delay_ms(),
spill_objects_callback, object_store_full_callback));
plasma_config = store_->GetPlasmaStoreInfo();
// We are using a single memory-mapped file by mallocing and freeing a single
@ -134,6 +134,10 @@ void PlasmaStoreRunner::Shutdown() {
}
}
bool PlasmaStoreRunner::IsPlasmaObjectSpillable(const ObjectID &object_id) {
return store_->IsObjectSpillable(object_id);
}
std::unique_ptr<PlasmaStoreRunner> plasma_store_runner;
} // namespace plasma

View file

@ -22,6 +22,7 @@ class PlasmaStoreRunner {
const std::shared_ptr<ray::ObjectStoreNotificationManager> &notification_listener) {
store_->SetNotificationListener(notification_listener);
}
bool IsPlasmaObjectSpillable(const ObjectID &object_id);
private:
void Shutdown();

View file

@ -46,6 +46,8 @@ void PullManager::OnLocationChange(const ObjectID &object_id,
// before.
it->second.client_locations = std::vector<NodeID>(client_ids.begin(), client_ids.end());
if (!spilled_url.empty()) {
RAY_LOG(DEBUG) << "OnLocationChange " << spilled_url << " num clients "
<< client_ids.size();
// Try to restore the spilled object.
restore_spilled_object_(object_id, spilled_url,
[this, object_id](const ray::Status &status) {

View file

@ -49,6 +49,7 @@ class CreateRequestQueueTest : public ::testing::Test {
: queue_(
/*max_retries=*/2,
/*evict_if_full=*/true,
/*spill_object_callback=*/[&]() { return false; },
/*on_global_gc=*/[&]() { num_global_gc_++; }) {}
void AssertNoLeaks() {
@ -117,7 +118,7 @@ TEST_F(CreateRequestQueueTest, TestOom) {
// Retries used up. The first request should reply with OOM and the second
// request should also be served.
ASSERT_TRUE(queue_.ProcessRequests().ok());
ASSERT_EQ(num_global_gc_, 2);
ASSERT_EQ(num_global_gc_, 3);
// Both requests fulfilled.
ASSERT_REQUEST_FINISHED(queue_, req_id1, PlasmaError::OutOfMemory);
@ -131,6 +132,8 @@ TEST(CreateRequestQueueParameterTest, TestOomInfiniteRetry) {
CreateRequestQueue queue(
/*max_retries=*/-1,
/*evict_if_full=*/true,
// Spilling is failing.
/*spill_object_callback=*/[&]() { return false; },
/*on_global_gc=*/[&]() { num_global_gc_++; });
auto oom_request = [&](bool evict_if_full, PlasmaObject *result) {
@ -156,7 +159,13 @@ TEST(CreateRequestQueueParameterTest, TestOomInfiniteRetry) {
}
TEST_F(CreateRequestQueueTest, TestTransientOom) {
auto return_status = PlasmaError::TransientOutOfMemory;
CreateRequestQueue queue(
/*max_retries=*/2,
/*evict_if_full=*/false,
/*spill_object_callback=*/[&]() { return true; },
/*on_global_gc=*/[&]() { num_global_gc_++; });
auto return_status = PlasmaError::OutOfMemory;
auto oom_request = [&](bool evict_if_full, PlasmaObject *result) {
if (return_status == PlasmaError::OK) {
result->data_size = 1234;
@ -169,28 +178,35 @@ TEST_F(CreateRequestQueueTest, TestTransientOom) {
};
auto client = std::make_shared<MockClient>();
auto req_id1 = queue_.AddRequest(ObjectID::Nil(), client, oom_request);
auto req_id2 = queue_.AddRequest(ObjectID::Nil(), client, blocked_request);
auto req_id1 = queue.AddRequest(ObjectID::Nil(), client, oom_request);
auto req_id2 = queue.AddRequest(ObjectID::Nil(), client, blocked_request);
// Transient OOM should not use up any retries.
for (int i = 0; i < 3; i++) {
ASSERT_TRUE(queue_.ProcessRequests().IsTransientObjectStoreFull());
ASSERT_REQUEST_UNFINISHED(queue_, req_id1);
ASSERT_REQUEST_UNFINISHED(queue_, req_id2);
ASSERT_EQ(num_global_gc_, 0);
ASSERT_TRUE(queue.ProcessRequests().IsTransientObjectStoreFull());
ASSERT_REQUEST_UNFINISHED(queue, req_id1);
ASSERT_REQUEST_UNFINISHED(queue, req_id2);
ASSERT_EQ(num_global_gc_, i + 1);
}
// Return OK for the first request. The second request should also be served.
return_status = PlasmaError::OK;
ASSERT_TRUE(queue_.ProcessRequests().ok());
ASSERT_REQUEST_FINISHED(queue_, req_id1, PlasmaError::OK);
ASSERT_REQUEST_FINISHED(queue_, req_id2, PlasmaError::OK);
ASSERT_TRUE(queue.ProcessRequests().ok());
ASSERT_REQUEST_FINISHED(queue, req_id1, PlasmaError::OK);
ASSERT_REQUEST_FINISHED(queue, req_id2, PlasmaError::OK);
AssertNoLeaks();
}
TEST_F(CreateRequestQueueTest, TestTransientOomThenOom) {
auto return_status = PlasmaError::TransientOutOfMemory;
bool is_spilling_possible = true;
CreateRequestQueue queue(
/*max_retries=*/2,
/*evict_if_full=*/false,
/*spill_object_callback=*/[&]() { return is_spilling_possible; },
/*on_global_gc=*/[&]() { num_global_gc_++; });
auto return_status = PlasmaError::OutOfMemory;
auto oom_request = [&](bool evict_if_full, PlasmaObject *result) {
if (return_status == PlasmaError::OK) {
result->data_size = 1234;
@ -203,31 +219,31 @@ TEST_F(CreateRequestQueueTest, TestTransientOomThenOom) {
};
auto client = std::make_shared<MockClient>();
auto req_id1 = queue_.AddRequest(ObjectID::Nil(), client, oom_request);
auto req_id2 = queue_.AddRequest(ObjectID::Nil(), client, blocked_request);
auto req_id1 = queue.AddRequest(ObjectID::Nil(), client, oom_request);
auto req_id2 = queue.AddRequest(ObjectID::Nil(), client, blocked_request);
// Transient OOM should not use up any retries.
for (int i = 0; i < 3; i++) {
ASSERT_TRUE(queue_.ProcessRequests().IsTransientObjectStoreFull());
ASSERT_REQUEST_UNFINISHED(queue_, req_id1);
ASSERT_REQUEST_UNFINISHED(queue_, req_id2);
ASSERT_EQ(num_global_gc_, 0);
ASSERT_TRUE(queue.ProcessRequests().IsTransientObjectStoreFull());
ASSERT_REQUEST_UNFINISHED(queue, req_id1);
ASSERT_REQUEST_UNFINISHED(queue, req_id2);
ASSERT_EQ(num_global_gc_, i + 1);
}
// Now we are actually OOM.
return_status = PlasmaError::OutOfMemory;
ASSERT_TRUE(queue_.ProcessRequests().IsObjectStoreFull());
ASSERT_TRUE(queue_.ProcessRequests().IsObjectStoreFull());
ASSERT_REQUEST_UNFINISHED(queue_, req_id1);
ASSERT_REQUEST_UNFINISHED(queue_, req_id2);
ASSERT_EQ(num_global_gc_, 2);
// Now spilling is not possible. We should start raising OOM with retry.
is_spilling_possible = false;
ASSERT_TRUE(queue.ProcessRequests().IsObjectStoreFull());
ASSERT_TRUE(queue.ProcessRequests().IsObjectStoreFull());
ASSERT_REQUEST_UNFINISHED(queue, req_id1);
ASSERT_REQUEST_UNFINISHED(queue, req_id2);
ASSERT_EQ(num_global_gc_, 5);
// Retries used up. The first request should reply with OOM and the second
// request should also be served.
ASSERT_TRUE(queue_.ProcessRequests().ok());
ASSERT_REQUEST_FINISHED(queue_, req_id1, PlasmaError::OutOfMemory);
ASSERT_REQUEST_FINISHED(queue_, req_id2, PlasmaError::OK);
ASSERT_EQ(num_global_gc_, 2);
ASSERT_TRUE(queue.ProcessRequests().ok());
ASSERT_REQUEST_FINISHED(queue, req_id1, PlasmaError::OutOfMemory);
ASSERT_REQUEST_FINISHED(queue, req_id2, PlasmaError::OK);
ASSERT_EQ(num_global_gc_, 6);
AssertNoLeaks();
}
@ -248,6 +264,7 @@ TEST(CreateRequestQueueParameterTest, TestNoEvictIfFull) {
CreateRequestQueue queue(
/*max_retries=*/2,
/*evict_if_full=*/false,
/*spill_object_callback=*/[&]() { return false; },
/*on_global_gc=*/[&]() {});
bool first_try = true;

View file

@ -315,6 +315,7 @@ message RestoreSpilledObjectsRequest {
}
message RestoreSpilledObjectsReply {
int64 bytes_restored_total = 1;
}
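`bytes_restored_total` flows back from the IO worker so the raylet can log restore throughput; the accounting further down in local_object_manager.cc subtracts any overlap with the previously finished operation before adding to the cumulative time. A small Python sketch of that concurrency-adjusted bookkeeping, with illustrative names:

class ThroughputStatsSketch:
    """Sketch of the concurrency-adjusted bookkeeping used for spills and restores."""

    def __init__(self):
        self.bytes_total = 0
        self.time_total_s = 0.0
        self.last_finish_ns = 0

    def record(self, start_ns, now_ns, num_bytes):
        self.bytes_total += num_bytes
        # If another operation finished after we started, that overlap is
        # already counted, so start the clock at whichever is later.
        self.time_total_s += (now_ns - max(start_ns, self.last_finish_ns)) / 1e9
        self.last_finish_ns = now_ns

    def mib_per_s(self):
        return self.bytes_total / (1024 * 1024) / self.time_total_s


stats = ThroughputStatsSketch()
stats.record(start_ns=0, now_ns=int(2e9), num_bytes=200 * 1024 * 1024)
stats.record(start_ns=int(1e9), now_ns=int(3e9), num_bytes=200 * 1024 * 1024)  # 1s overlap
print(round(stats.mib_per_s()))  # ~133 MiB/s over 3s of wall-clock time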
message DeleteSpilledObjectsRequest {

View file

@ -194,6 +194,7 @@ table NotifyUnblocked {
}
table NotifyDirectCallTaskBlocked {
release_resources: bool;
}
table NotifyDirectCallTaskUnblocked {

View file

@ -22,7 +22,6 @@ namespace raylet {
void LocalObjectManager::PinObjects(const std::vector<ObjectID> &object_ids,
std::vector<std::unique_ptr<RayObject>> &&objects) {
absl::MutexLock lock(&mutex_);
RAY_CHECK(object_pinning_enabled_);
for (size_t i = 0; i < object_ids.size(); i++) {
const auto &object_id = object_ids[i];
@ -62,7 +61,6 @@ void LocalObjectManager::WaitForObjectFree(const rpc::Address &owner_address,
void LocalObjectManager::ReleaseFreedObject(const ObjectID &object_id) {
// object_pinning_enabled_ flag is off when the --lru-evict flag is on.
if (object_pinning_enabled_) {
absl::MutexLock lock(&mutex_);
RAY_LOG(DEBUG) << "Unpinning object " << object_id;
// The object should be in one of these stats. pinned, spilling, or spilled.
RAY_CHECK((pinned_objects_.count(object_id) > 0) ||
@ -104,50 +102,85 @@ void LocalObjectManager::FlushFreeObjectsIfNeeded(int64_t now_ms) {
}
}
int64_t LocalObjectManager::SpillObjectsOfSize(int64_t num_bytes_to_spill,
int64_t min_bytes_to_spill) {
RAY_CHECK(num_bytes_to_spill >= min_bytes_to_spill);
void LocalObjectManager::SpillObjectUptoMaxThroughput() {
if (RayConfig::instance().object_spilling_config().empty() ||
!RayConfig::instance().automatic_object_spilling_enabled()) {
return min_bytes_to_spill;
return;
}
absl::MutexLock lock(&mutex_);
// Spill as fast as we can using all our spill workers.
bool can_spill_more = true;
while (can_spill_more) {
if (!SpillObjectsOfSize(min_spilling_size_)) {
break;
}
{
absl::MutexLock lock(&mutex_);
num_active_workers_ += 1;
can_spill_more = num_active_workers_ < max_active_workers_;
}
}
}
RAY_LOG(INFO) << "Choosing objects to spill of total size " << num_bytes_to_spill;
bool LocalObjectManager::IsSpillingInProgress() {
absl::MutexLock lock(&mutex_);
return num_active_workers_ > 0;
}
bool LocalObjectManager::SpillObjectsOfSize(int64_t num_bytes_to_spill) {
if (RayConfig::instance().object_spilling_config().empty() ||
!RayConfig::instance().automatic_object_spilling_enabled()) {
return false;
}
RAY_LOG(DEBUG) << "Choosing objects to spill of total size " << num_bytes_to_spill;
int64_t bytes_to_spill = 0;
auto it = pinned_objects_.begin();
std::vector<ObjectID> objects_to_spill;
while (bytes_to_spill < num_bytes_to_spill && it != pinned_objects_.end()) {
bytes_to_spill += it->second->GetSize();
objects_to_spill.push_back(it->first);
while (bytes_to_spill <= num_bytes_to_spill && it != pinned_objects_.end()) {
if (is_plasma_object_spillable_(it->first)) {
bytes_to_spill += it->second->GetSize();
objects_to_spill.push_back(it->first);
}
it++;
}
if (!objects_to_spill.empty()) {
RAY_LOG(INFO) << "Spilling objects of total size " << bytes_to_spill;
auto start_time = current_time_ms();
SpillObjectsInternal(
objects_to_spill, [bytes_to_spill, start_time](const Status &status) {
if (!status.ok()) {
RAY_LOG(ERROR) << "Error spilling objects " << status.ToString();
} else {
RAY_LOG(INFO) << "Spilled " << bytes_to_spill << " in "
<< (current_time_ms() - start_time) << "ms";
}
});
RAY_LOG(DEBUG) << "Spilling objects of total size " << bytes_to_spill
<< " num objects " << objects_to_spill.size();
auto start_time = absl::GetCurrentTimeNanos();
SpillObjectsInternal(objects_to_spill, [this, bytes_to_spill, objects_to_spill,
start_time](const Status &status) {
if (!status.ok()) {
RAY_LOG(ERROR) << "Error spilling objects " << status.ToString();
} else {
auto now = absl::GetCurrentTimeNanos();
RAY_LOG(DEBUG) << "Spilled " << bytes_to_spill << " bytes in "
<< (now - start_time) / 1e6 << "ms";
spilled_bytes_total_ += bytes_to_spill;
spilled_objects_total_ += objects_to_spill.size();
// Adjust throughput timing to account for concurrent spill operations.
spill_time_total_s_ += (now - std::max(start_time, last_spill_finish_ns_)) / 1e9;
if (now - last_spill_log_ns_ > 1e9) {
last_spill_log_ns_ = now;
// TODO(ekl) logging at error level until we add a better UX indicator.
RAY_LOG(ERROR) << "Spilled "
<< static_cast<int>(spilled_bytes_total_ / (1024 * 1024))
<< " MiB, " << spilled_objects_total_
<< " objects, write throughput "
<< static_cast<int>(spilled_bytes_total_ / (1024 * 1024) /
spill_time_total_s_)
<< " MiB/s";
}
last_spill_finish_ns_ = now;
}
});
return true;
}
// We do not track a mapping between objects that need to be created to
// objects that are being spilled, so we just subtract the total number of
// bytes that are currently being spilled from the amount of space
// requested. If the space is claimed by another client, this client may
// need to request space again.
return min_bytes_to_spill - num_bytes_pending_spill_;
return false;
}
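The spill completion callback above keeps throughput statistics meaningful when batches overlap: only the portion of each batch's duration that extends past the previously recorded finish time is added to `spill_time_total_s_`. The self-contained sketch below works through that accounting for three overlapping 100 MiB batches; the variable names are illustrative assumptions, not the raylet's.

```cpp
// Hedged sketch of throughput accounting for overlapping operations.
// Only the time not already covered by an earlier finish is accumulated,
// so the aggregate MiB/s figure is not distorted by concurrency.
#include <algorithm>
#include <cstdint>
#include <iostream>

int main() {
  int64_t last_finish_ns = 0;  // When the previous operation finished.
  double busy_time_s = 0;      // Total non-overlapping wall time, in seconds.
  int64_t bytes_total = 0;

  // (start_ns, end_ns, bytes) for three overlapping 100 MiB spill batches,
  // processed in completion order.
  struct Op {
    int64_t start_ns, end_ns, bytes;
  };
  const Op ops[] = {{0, 2000000000, 100 << 20},
                    {500000000, 2500000000, 100 << 20},
                    {1000000000, 3000000000, 100 << 20}};

  for (const auto &op : ops) {
    // Count only the part of this op that extends past the last finish time.
    busy_time_s += (op.end_ns - std::max(op.start_ns, last_finish_ns)) / 1e9;
    last_finish_ns = op.end_ns;
    bytes_total += op.bytes;
  }

  // 300 MiB over 3 s of wall time -> 100 MiB/s. A naive sum of per-batch
  // durations (6 s) would understate this as 50 MiB/s.
  std::cout << (bytes_total / (1024.0 * 1024.0)) / busy_time_s << " MiB/s\n";
  return 0;
}
```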
void LocalObjectManager::SpillObjects(const std::vector<ObjectID> &object_ids,
std::function<void(const ray::Status &)> callback) {
absl::MutexLock lock(&mutex_);
SpillObjectsInternal(object_ids, callback);
}
@ -196,7 +229,10 @@ void LocalObjectManager::SpillObjectsInternal(
io_worker->rpc_client()->SpillObjects(
request, [this, objects_to_spill, callback, io_worker](
const ray::Status &status, const rpc::SpillObjectsReply &r) {
absl::MutexLock lock(&mutex_);
{
absl::MutexLock lock(&mutex_);
num_active_workers_ -= 1;
}
io_worker_pool_.PushSpillWorker(io_worker);
if (!status.ok()) {
for (const auto &object_id : objects_to_spill) {
@ -222,7 +258,6 @@ void LocalObjectManager::AddSpilledUrls(
const std::vector<ObjectID> &object_ids, const rpc::SpillObjectsReply &worker_reply,
std::function<void(const ray::Status &)> callback) {
auto num_remaining = std::make_shared<size_t>(object_ids.size());
auto num_bytes_spilled = std::make_shared<size_t>(0);
for (size_t i = 0; i < object_ids.size(); ++i) {
const ObjectID &object_id = object_ids[i];
const std::string &object_url = worker_reply.spilled_objects_url(i);
@ -232,15 +267,12 @@ void LocalObjectManager::AddSpilledUrls(
// be retrieved by other raylets.
RAY_CHECK_OK(object_info_accessor_.AsyncAddSpilledUrl(
object_id, object_url,
[this, object_id, object_url, callback, num_remaining,
num_bytes_spilled](Status status) {
[this, object_id, object_url, callback, num_remaining](Status status) {
RAY_CHECK_OK(status);
absl::MutexLock lock(&mutex_);
// Unpin the object.
auto it = objects_pending_spill_.find(object_id);
RAY_CHECK(it != objects_pending_spill_.end());
num_bytes_pending_spill_ -= it->second->GetSize();
*num_bytes_spilled += it->second->GetSize();
objects_pending_spill_.erase(it);
// Update the object_id -> url_ref_count to use it for deletion later.
@ -273,20 +305,41 @@ void LocalObjectManager::AsyncRestoreSpilledObject(
<< object_url;
io_worker_pool_.PopRestoreWorker([this, object_id, object_url, callback](
std::shared_ptr<WorkerInterface> io_worker) {
auto start_time = absl::GetCurrentTimeNanos();
RAY_LOG(DEBUG) << "Sending restore spilled object request";
rpc::RestoreSpilledObjectsRequest request;
request.add_spilled_objects_url(std::move(object_url));
request.add_object_ids_to_restore(object_id.Binary());
io_worker->rpc_client()->RestoreSpilledObjects(
request,
[this, object_id, callback, io_worker](const ray::Status &status,
const rpc::RestoreSpilledObjectsReply &r) {
[this, start_time, object_id, callback, io_worker](
const ray::Status &status, const rpc::RestoreSpilledObjectsReply &r) {
io_worker_pool_.PushRestoreWorker(io_worker);
if (!status.ok()) {
RAY_LOG(ERROR) << "Failed to send restore spilled object request: "
<< status.ToString();
} else {
RAY_LOG(DEBUG) << "Restored object " << object_id;
auto now = absl::GetCurrentTimeNanos();
auto restored_bytes = r.bytes_restored_total();
RAY_LOG(DEBUG) << "Restored " << restored_bytes << " in "
<< (now - start_time) / 1e6 << "ms. Object id:" << object_id;
restored_bytes_total_ += restored_bytes;
restored_objects_total_ += 1;
// Adjust throughput timing to account for concurrent restore operations.
restore_time_total_s_ +=
(now - std::max(start_time, last_restore_finish_ns_)) / 1e9;
if (now - last_restore_log_ns_ > 1e9) {
last_restore_log_ns_ = now;
// TODO(ekl) logging at error level until we add a better UX indicator.
RAY_LOG(ERROR) << "Restored "
<< static_cast<int>(restored_bytes_total_ / (1024 * 1024))
<< " MiB, " << restored_objects_total_
<< " objects, read throughput "
<< static_cast<int>(restored_bytes_total_ / (1024 * 1024) /
restore_time_total_s_)
<< " MiB/s";
}
last_restore_finish_ns_ = now;
}
if (callback) {
callback(status);
@ -296,7 +349,6 @@ void LocalObjectManager::AsyncRestoreSpilledObject(
}
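Both the spill and restore paths also rate-limit their progress output: the cumulative MiB and MiB/s summary is logged at most once per second, gated by `last_spill_log_ns_` and `last_restore_log_ns_`. Below is a hedged, standalone sketch of such a once-per-second throttle; the class name is made up for illustration.

```cpp
// Hypothetical sketch of the once-per-second throttle that guards the
// spill/restore progress lines. Names here are illustrative only.
#include <chrono>
#include <cstdint>
#include <iostream>
#include <string>

class ThrottledProgressLogger {
 public:
  // Logs at most once per `min_interval_ns` nanoseconds (default: one second).
  explicit ThrottledProgressLogger(int64_t min_interval_ns = 1000000000)
      : min_interval_ns_(min_interval_ns) {}

  void MaybeLog(const std::string &message) {
    const int64_t now =
        std::chrono::duration_cast<std::chrono::nanoseconds>(
            std::chrono::steady_clock::now().time_since_epoch())
            .count();
    if (now - last_log_ns_ > min_interval_ns_) {
      last_log_ns_ = now;
      std::cout << message << std::endl;
    }
  }

 private:
  const int64_t min_interval_ns_;
  int64_t last_log_ns_ = 0;
};
```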
void LocalObjectManager::ProcessSpilledObjectsDeleteQueue(uint32_t max_batch_size) {
absl::MutexLock lock(&mutex_);
std::vector<std::string> object_urls_to_delete;
// Process up to batch size of objects to delete.

View file

@ -33,13 +33,15 @@ namespace raylet {
/// have been freed, and objects that have been spilled.
class LocalObjectManager {
public:
LocalObjectManager(boost::asio::io_service &io_context, size_t free_objects_batch_size,
int64_t free_objects_period_ms,
IOWorkerPoolInterface &io_worker_pool,
gcs::ObjectInfoAccessor &object_info_accessor,
rpc::CoreWorkerClientPool &owner_client_pool,
bool object_pinning_enabled, bool automatic_object_deletion_enabled,
std::function<void(const std::vector<ObjectID> &)> on_objects_freed)
LocalObjectManager(
boost::asio::io_service &io_context, size_t free_objects_batch_size,
int64_t free_objects_period_ms, IOWorkerPoolInterface &io_worker_pool,
gcs::ObjectInfoAccessor &object_info_accessor,
rpc::CoreWorkerClientPool &owner_client_pool, bool object_pinning_enabled,
bool automatic_object_deletion_enabled, int max_io_workers,
int64_t min_spilling_size,
std::function<void(const std::vector<ObjectID> &)> on_objects_freed,
std::function<bool(const ray::ObjectID &)> is_plasma_object_spillable)
: free_objects_period_ms_(free_objects_period_ms),
free_objects_batch_size_(free_objects_batch_size),
io_worker_pool_(io_worker_pool),
@ -48,7 +50,11 @@ class LocalObjectManager {
object_pinning_enabled_(object_pinning_enabled),
automatic_object_deletion_enabled_(automatic_object_deletion_enabled),
on_objects_freed_(on_objects_freed),
last_free_objects_at_ms_(current_time_ms()) {}
last_free_objects_at_ms_(current_time_ms()),
min_spilling_size_(min_spilling_size),
num_active_workers_(0),
max_active_workers_(max_io_workers),
is_plasma_object_spillable_(is_plasma_object_spillable) {}
/// Pin objects.
///
@ -67,22 +73,10 @@ class LocalObjectManager {
void WaitForObjectFree(const rpc::Address &owner_address,
const std::vector<ObjectID> &object_ids);
/// Asynchronously spill objects when space is needed.
/// The callback tries to spill objects as much as num_bytes_to_spill and returns
/// the amount of space needed after the spilling is complete.
/// The returned value is calculated based off of min_bytes_to_spill. That says,
/// although it fails to spill num_bytes_to_spill, as long as it spills more than
/// min_bytes_to_spill, it will return the value that is less than 0 (meaning we
/// don't need any more additional space).
/// Spill as many objects as possible, as fast as possible, up to the max throughput.
///
/// \param num_bytes_to_spill The total number of bytes to spill. The method tries to
/// spill bytes as much as this value.
/// \param min_bytes_to_spill The minimum bytes that
/// need to be spilled.
/// \return The number of bytes of space still required after the
/// spill is complete. This return the value is less than 0 if it satifies the
/// min_bytes_to_spill.
int64_t SpillObjectsOfSize(int64_t num_bytes_to_spill, int64_t min_bytes_to_spill);
/// Each call dispatches spill requests until the max number of IO workers is reached.
void SpillObjectUptoMaxThroughput();
/// Spill objects to external storage.
///
@ -114,11 +108,33 @@ class LocalObjectManager {
/// invocation.
void ProcessSpilledObjectsDeleteQueue(uint32_t max_batch_size);
/// Return True if spilling is in progress.
/// This is a narrow interface that is accessed by the plasma store.
/// We use a narrow interface here because the plasma store runs in a
/// different thread, and we'd like to avoid making this component thread-safe,
/// which is against the general raylet design.
///
/// \return True if spilling is still in progress. False otherwise.
bool IsSpillingInProgress();
private:
FRIEND_TEST(LocalObjectManagerTest, TestSpillObjectsOfSize);
FRIEND_TEST(LocalObjectManagerTest,
TestSpillObjectsOfSizeNumBytesToSpillHigherThanMinBytesToSpill);
FRIEND_TEST(LocalObjectManagerTest, TestSpillObjectNotEvictable);
/// Asynchronously spill objects when space is needed.
/// It tries to spill objects totaling at least num_bytes_to_spill and returns
/// true if spill requests for the corresponding bytes were submitted.
/// NOTE(sang): If 0 is given, this method spills a single object.
///
/// \param num_bytes_to_spill The total number of bytes to spill.
/// \return True if it can spill num_bytes_to_spill. False otherwise.
bool SpillObjectsOfSize(int64_t num_bytes_to_spill);
/// Internal helper method for spilling objects.
void SpillObjectsInternal(const std::vector<ObjectID> &objects_ids,
std::function<void(const ray::Status &)> callback)
EXCLUSIVE_LOCKS_REQUIRED(mutex_);
std::function<void(const ray::Status &)> callback);
/// Release an object that has been freed by its owner.
void ReleaseFreedObject(const ObjectID &object_id);
@ -164,14 +180,12 @@ class LocalObjectManager {
std::function<void(const std::vector<ObjectID> &)> on_objects_freed_;
// Objects that are pinned on this node.
absl::flat_hash_map<ObjectID, std::unique_ptr<RayObject>> pinned_objects_
GUARDED_BY(mutex_);
absl::flat_hash_map<ObjectID, std::unique_ptr<RayObject>> pinned_objects_;
// Objects that were pinned on this node but that are being spilled.
// These objects will be released once spilling is complete and the URL is
// written to the object directory.
absl::flat_hash_map<ObjectID, std::unique_ptr<RayObject>> objects_pending_spill_
GUARDED_BY(mutex_);
absl::flat_hash_map<ObjectID, std::unique_ptr<RayObject>> objects_pending_spill_;
/// The time that we last sent a FreeObjects request to other nodes for
/// objects that have gone out of scope in the application.
@ -185,7 +199,7 @@ class LocalObjectManager {
/// The total size of the objects that are currently being
/// spilled from this node, in bytes.
size_t num_bytes_pending_spill_ GUARDED_BY(mutex_) = 0;
size_t num_bytes_pending_spill_;
/// This class is accessed by both the raylet and plasma store threads. The
/// mutex protects private members that relate to object spilling.
@ -198,16 +212,63 @@ class LocalObjectManager {
/// A list of object id and url pairs that need to be deleted.
/// We don't delete objects from external storage as soon as they go out of scope
/// because those objects could still be in the process of being spilled.
std::queue<ObjectID> spilled_object_pending_delete_ GUARDED_BY(mutex_);
std::queue<ObjectID> spilled_object_pending_delete_;
/// Mapping from object id to url_with_offsets. We cannot reuse pinned_objects_ because
/// pinned_objects_ entries are deleted when spilling happens.
absl::flat_hash_map<ObjectID, std::string> spilled_objects_url_ GUARDED_BY(mutex_);
absl::flat_hash_map<ObjectID, std::string> spilled_objects_url_;
/// Base URL -> ref_count. It is used because there could be multiple objects
/// within a single spilled file. We need to ref count to avoid deleting the file
/// before all objects within that file are out of scope.
absl::flat_hash_map<std::string, uint64_t> url_ref_count_ GUARDED_BY(mutex_);
absl::flat_hash_map<std::string, uint64_t> url_ref_count_;
/// Minimum bytes to spill to a single IO spill worker.
int64_t min_spilling_size_;
/// The current number of active spill workers.
int64_t num_active_workers_ GUARDED_BY(mutex_);
/// The max number of active spill workers.
const int64_t max_active_workers_;
/// Callback to check if a plasma object is pinned in workers.
/// Return true if unpinned, meaning we can safely spill the object. False otherwise.
std::function<bool(const ray::ObjectID &)> is_plasma_object_spillable_;
///
/// Stats
///
/// The last time a spill operation finished.
int64_t last_spill_finish_ns_ = 0;
/// The total wall time in seconds spent in spilling.
double spill_time_total_s_ = 0;
/// The total number of bytes spilled.
int64_t spilled_bytes_total_ = 0;
/// The total number of objects spilled.
int64_t spilled_objects_total_ = 0;
/// The last time a restore operation finished.
int64_t last_restore_finish_ns_ = 0;
/// The total wall time in seconds spent in restoring.
double restore_time_total_s_ = 0;
/// The total number of bytes restored.
int64_t restored_bytes_total_ = 0;
/// The total number of objects restored.
int64_t restored_objects_total_ = 0;
/// The last time a spill log finished.
int64_t last_spill_log_ns_ = 0;
/// The last time a restore log finished.
int64_t last_restore_log_ns_ = 0;
};
}; // namespace raylet

View file

@ -229,6 +229,8 @@ int main(int argc, char *argv[]) {
node_manager_config.store_socket_name = store_socket_name;
node_manager_config.temp_dir = temp_dir;
node_manager_config.session_dir = session_dir;
node_manager_config.max_io_workers = RayConfig::instance().max_io_workers();
node_manager_config.min_spilling_size = RayConfig::instance().min_spilling_size();
// Configuration for the object manager.
ray::ObjectManagerConfig object_manager_config;

View file

@ -116,7 +116,8 @@ std::string WorkerOwnerString(std::shared_ptr<WorkerInterface> &worker) {
NodeManager::NodeManager(boost::asio::io_service &io_service, const NodeID &self_node_id,
const NodeManagerConfig &config, ObjectManager &object_manager,
std::shared_ptr<gcs::GcsClient> gcs_client,
std::shared_ptr<ObjectDirectoryInterface> object_directory)
std::shared_ptr<ObjectDirectoryInterface> object_directory,
std::function<bool(const ObjectID &)> is_plasma_object_spillable)
: self_node_id_(self_node_id),
io_service_(io_service),
object_manager_(object_manager),
@ -171,14 +172,18 @@ NodeManager::NodeManager(boost::asio::io_service &io_service, const NodeID &self
/* object_pinning_enabled */ config.object_pinning_enabled,
/* automatic_object_deletion_enabled */
config.automatic_object_deletion_enabled,
/*max_io_workers*/ config.max_io_workers,
/*min_spilling_size*/ config.min_spilling_size,
[this](const std::vector<ObjectID> &object_ids) {
object_manager_.FreeObjects(object_ids,
/*local_only=*/false);
}),
},
is_plasma_object_spillable),
new_scheduler_enabled_(RayConfig::instance().new_scheduler_enabled()),
report_worker_backlog_(RayConfig::instance().report_worker_backlog()),
last_local_gc_ns_(absl::GetCurrentTimeNanos()),
local_gc_interval_ns_(RayConfig::instance().local_gc_interval_s() * 1e9),
local_gc_min_interval_ns_(RayConfig::instance().local_gc_min_interval_s() * 1e9),
record_metrics_period_(config.record_metrics_period_ms) {
RAY_LOG(INFO) << "Initializing NodeManager with ID " << self_node_id_;
RAY_CHECK(heartbeat_period_.count() > 0);
@ -553,7 +558,8 @@ void NodeManager::ReportResourceUsage() {
// Trigger local GC if needed. This throttles the frequency of local GC calls
// to at most once per heartbeat interval.
auto now = absl::GetCurrentTimeNanos();
if (should_local_gc_ || now - last_local_gc_ns_ > local_gc_interval_ns_) {
if ((should_local_gc_ || now - last_local_gc_ns_ > local_gc_interval_ns_) &&
now - last_local_gc_ns_ > local_gc_min_interval_ns_) {
DoLocalGC();
should_local_gc_ = false;
last_local_gc_ns_ = now;
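This widened condition means a local GC pass runs either on demand (`should_local_gc_`) or when the periodic interval elapses, but in both cases never more often than the configured minimum interval. A tiny sketch of that predicate, with assumed parameter names, makes the two thresholds explicit.

```cpp
// Hedged sketch of the dual-threshold GC trigger; parameter names are
// illustrative, not the raylet's.
#include <cstdint>

bool ShouldRunLocalGc(bool gc_requested, int64_t now_ns, int64_t last_gc_ns,
                      int64_t interval_ns, int64_t min_interval_ns) {
  const bool due = gc_requested || (now_ns - last_gc_ns > interval_ns);
  const bool not_too_soon = now_ns - last_gc_ns > min_interval_ns;
  // Requested or overdue, but never back-to-back within the minimum interval.
  return due && not_too_soon;
}
```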
@ -1186,8 +1192,7 @@ void NodeManager::ProcessClientMessage(const std::shared_ptr<ClientConnection> &
ProcessFetchOrReconstructMessage(client, message_data);
} break;
case protocol::MessageType::NotifyDirectCallTaskBlocked: {
std::shared_ptr<WorkerInterface> worker = worker_pool_.GetRegisteredWorker(client);
HandleDirectCallTaskBlocked(worker);
ProcessDirectCallTaskBlocked(client, message_data);
} break;
case protocol::MessageType::NotifyDirectCallTaskUnblocked: {
std::shared_ptr<WorkerInterface> worker = worker_pool_.GetRegisteredWorker(client);
@ -1534,6 +1539,15 @@ void NodeManager::ProcessFetchOrReconstructMessage(
}
}
void NodeManager::ProcessDirectCallTaskBlocked(
const std::shared_ptr<ClientConnection> &client, const uint8_t *message_data) {
auto message =
flatbuffers::GetRoot<protocol::NotifyDirectCallTaskBlocked>(message_data);
bool release_resources = message->release_resources();
std::shared_ptr<WorkerInterface> worker = worker_pool_.GetRegisteredWorker(client);
HandleDirectCallTaskBlocked(worker, release_resources);
}
void NodeManager::ProcessWaitRequestMessage(
const std::shared_ptr<ClientConnection> &client, const uint8_t *message_data) {
// Read the data.
@ -2148,9 +2162,9 @@ void NodeManager::SubmitTask(const Task &task) {
}
void NodeManager::HandleDirectCallTaskBlocked(
const std::shared_ptr<WorkerInterface> &worker) {
const std::shared_ptr<WorkerInterface> &worker, bool release_resources) {
if (new_scheduler_enabled_) {
if (!worker || worker->IsBlocked()) {
if (!worker || worker->IsBlocked() || !release_resources) {
return;
}
std::vector<double> cpu_instances;
@ -2169,7 +2183,8 @@ void NodeManager::HandleDirectCallTaskBlocked(
return;
}
if (!worker || worker->GetAssignedTaskId().IsNil() || worker->IsBlocked()) {
if (!worker || worker->GetAssignedTaskId().IsNil() || worker->IsBlocked() ||
!release_resources) {
return; // The worker may have died or is no longer processing the task.
}
auto const cpu_resource_ids = worker->ReleaseTaskCpuResources();
@ -2297,7 +2312,6 @@ void NodeManager::AsyncResolveObjectsFinish(
const std::shared_ptr<ClientConnection> &client, const TaskID &current_task_id,
bool was_blocked) {
std::shared_ptr<WorkerInterface> worker = worker_pool_.GetRegisteredWorker(client);
// TODO(swang): Because the object dependencies are tracked in the task
// dependency manager, we could actually remove this message entirely and
// instead unblock the worker once all the objects become available.
@ -3154,9 +3168,6 @@ void NodeManager::HandleGlobalGC(const rpc::GlobalGCRequest &request,
}
void NodeManager::TriggerGlobalGC() {
RAY_LOG(INFO) << "Broadcasting Python GC request to all raylets since the cluster "
<< "is low on resources. This removes Ray actor and object refs "
<< "that are stuck in Python reference cycles.";
should_global_gc_ = true;
// We won't see our own request, so trigger local GC in the next heartbeat.
should_local_gc_ = true;

View file

@ -104,6 +104,10 @@ struct NodeManagerConfig {
std::unordered_map<std::string, std::string> raylet_config;
// The time between record metrics in milliseconds, or -1 to disable.
uint64_t record_metrics_period_ms;
// The max number of IO workers.
int max_io_workers;
// The minimum object size that can be spilled by each spill operation.
int64_t min_spilling_size;
};
class NodeManager : public rpc::NodeManagerServiceHandler {
@ -115,7 +119,8 @@ class NodeManager : public rpc::NodeManagerServiceHandler {
NodeManager(boost::asio::io_service &io_service, const NodeID &self_node_id,
const NodeManagerConfig &config, ObjectManager &object_manager,
std::shared_ptr<gcs::GcsClient> gcs_client,
std::shared_ptr<ObjectDirectoryInterface> object_directory);
std::shared_ptr<ObjectDirectoryInterface> object_directory_,
std::function<bool(const ObjectID &)> is_plasma_object_spillable);
/// Process a new client connection.
///
@ -375,7 +380,8 @@ class NodeManager : public rpc::NodeManagerServiceHandler {
/// arrive after the worker lease has been returned to the node manager.
///
/// \param worker Shared ptr to the worker, or nullptr if lost.
void HandleDirectCallTaskBlocked(const std::shared_ptr<WorkerInterface> &worker);
void HandleDirectCallTaskBlocked(const std::shared_ptr<WorkerInterface> &worker,
bool release_resources);
/// Handle a direct call task that is unblocked. Note that this callback may
/// arrive after the worker lease has been returned to the node manager.
@ -437,6 +443,13 @@ class NodeManager : public rpc::NodeManagerServiceHandler {
/// \return Void.
void ProcessSubmitTaskMessage(const uint8_t *message_data);
/// Process client message of NotifyDirectCallTaskBlocked
///
/// \param message_data A pointer to the message data.
/// \return Void.
void ProcessDirectCallTaskBlocked(const std::shared_ptr<ClientConnection> &client,
const uint8_t *message_data);
/// Process client message of RegisterClientRequest
///
/// \param client The client that sent the message.
@ -745,11 +758,15 @@ class NodeManager : public rpc::NodeManagerServiceHandler {
/// on all local workers of this raylet.
bool should_local_gc_ = false;
/// The last time local GC was triggered.
/// The last time local GC was run.
int64_t last_local_gc_ns_ = 0;
/// The interval in nanoseconds between local GC automatic triggers.
const int64_t local_gc_interval_ns_ = 10 * 60 * 1e9;
const int64_t local_gc_interval_ns_;
/// The min interval in nanoseconds between local GC runs (auto + memory pressure
/// triggered).
const int64_t local_gc_min_interval_ns_;
/// These two classes make up the new scheduler. ClusterResourceScheduler is
/// responsible for maintaining a view of the cluster state w.r.t resource

View file

@ -70,24 +70,33 @@ Raylet::Raylet(boost::asio::io_service &main_service, const std::string &socket_
gcs_client_))
: std::dynamic_pointer_cast<ObjectDirectoryInterface>(
std::make_shared<ObjectDirectory>(main_service, gcs_client_))),
object_manager_(main_service, self_node_id_, object_manager_config,
object_directory_,
[this](const ObjectID &object_id, const std::string &spilled_url,
std::function<void(const ray::Status &)> callback) {
node_manager_.GetLocalObjectManager().AsyncRestoreSpilledObject(
object_id, spilled_url, callback);
},
[this](int64_t num_bytes_to_spill, int64_t min_bytes_to_spill) {
return node_manager_.GetLocalObjectManager().SpillObjectsOfSize(
num_bytes_to_spill, min_bytes_to_spill);
},
[this]() {
// Post on the node manager's event loop since this
// will be called from the plasma store thread.
main_service_.post([this]() { node_manager_.TriggerGlobalGC(); });
}),
object_manager_(
main_service, self_node_id_, object_manager_config, object_directory_,
[this](const ObjectID &object_id, const std::string &spilled_url,
std::function<void(const ray::Status &)> callback) {
node_manager_.GetLocalObjectManager().AsyncRestoreSpilledObject(
object_id, spilled_url, callback);
},
[this]() {
// This callback is called from the plasma store thread.
// NOTE: It means the local object manager should be thread-safe.
main_service_.post([this]() {
node_manager_.GetLocalObjectManager().SpillObjectUptoMaxThroughput();
});
return node_manager_.GetLocalObjectManager().IsSpillingInProgress();
},
[this]() {
// Post on the node manager's event loop since this
// callback is called from the plasma store thread.
// This will help keep node manager lock-less.
main_service_.post([this]() { node_manager_.TriggerGlobalGC(); });
}),
node_manager_(main_service, self_node_id_, node_manager_config, object_manager_,
gcs_client_, object_directory_),
gcs_client_, object_directory_,
[this](const ObjectID &object_id) {
// It is used by the local object manager.
return object_manager_.IsPlasmaObjectSpillable(object_id);
}),
socket_name_(socket_name),
acceptor_(main_service, ParseUrlEndpoint(socket_name)),
socket_(main_service) {
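The spill callback wired into the object manager here carries the level-triggered contract across threads: the plasma store can invoke it on any allocation attempt that finds the store full, the raylet event loop is asked (via `post`) to spill more, and the boolean return tells the store whether spilling is still in progress and therefore worth waiting for. The following is a hypothetical sketch of how a store-side caller might consume such a callback; the names and control flow are assumptions, not the plasma store code.

```cpp
// Hypothetical store-side consumer of a level-triggered spill callback.
// Names are illustrative; this is not the plasma store implementation.
#include <functional>

// Posts more spill work as a side effect and reports whether spilling is
// (still) in progress.
using SpillObjectsCallback = std::function<bool()>;

enum class AllocationRetry { kRetryLater, kOutOfMemory };

AllocationRetry OnAllocationFailure(
    const SpillObjectsCallback &spill_objects_callback) {
  // Level triggered: re-invoke on every failed allocation. The callback posts
  // additional spill work to the raylet event loop and returns whether any
  // spilling is still in flight.
  if (spill_objects_callback()) {
    return AllocationRetry::kRetryLater;  // Space should free up soon; retry.
  }
  // Nothing is spilling and nothing more can be spilled right now, so the
  // caller has to fall back to eviction or report an out-of-memory error.
  return AllocationRetry::kOutOfMemory;
}
```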

View file

@ -236,15 +236,23 @@ class LocalObjectManagerTest : public ::testing::Test {
/*free_objects_period_ms=*/1000, worker_pool, object_table, client_pool,
/*object_pinning_enabled=*/true,
/*automatic_object_delete_enabled=*/true,
/*max_io_workers=*/2,
/*min_spilling_size=*/0,
[&](const std::vector<ObjectID> &object_ids) {
for (const auto &object_id : object_ids) {
freed.insert(object_id);
}
},
/*is_plasma_object_spillable=*/
[&](const ray::ObjectID &object_id) {
return unevictable_objects_.count(object_id) == 0;
}),
unpins(std::make_shared<std::unordered_map<ObjectID, int>>()) {
RayConfig::instance().initialize({{"object_spilling_config", "mock_config"}});
}
void TearDown() { unevictable_objects_.clear(); }
std::string BuildURL(const std::string url, int offset = 0, int num_objects = 1) {
return url + "?" + "num_objects=" + std::to_string(num_objects) +
"&offset=" + std::to_string(offset);
@ -262,6 +270,8 @@ class LocalObjectManagerTest : public ::testing::Test {
// This hashmap is incremented when objects are unpinned by destroying their
// unique_ptr.
std::shared_ptr<std::unordered_map<ObjectID, int>> unpins;
// Object ids in this field won't be evictable.
std::unordered_set<ObjectID> unevictable_objects_;
};
TEST_F(LocalObjectManagerTest, TestPin) {
@ -416,17 +426,11 @@ TEST_F(LocalObjectManagerTest, TestSpillObjectsOfSize) {
objects.push_back(std::move(object));
}
manager.PinObjects(object_ids, std::move(objects));
int64_t num_bytes_required = manager.SpillObjectsOfSize(total_size / 2, total_size / 2);
ASSERT_EQ(num_bytes_required, -object_size / 2);
ASSERT_TRUE(manager.SpillObjectsOfSize(total_size / 2));
for (const auto &id : object_ids) {
ASSERT_EQ((*unpins)[id], 0);
}
// Check that this returns the total number of bytes currently being spilled.
num_bytes_required = manager.SpillObjectsOfSize(0, 0);
ASSERT_EQ(num_bytes_required, -2 * object_size);
// Check that half the objects get spilled and the URLs get added to the
// global object directory.
std::vector<std::string> urls;
@ -447,9 +451,124 @@ TEST_F(LocalObjectManagerTest, TestSpillObjectsOfSize) {
ASSERT_EQ((*unpins)[object_url.first], 1);
}
// Check that this returns the total number of bytes currently being spilled.
num_bytes_required = manager.SpillObjectsOfSize(0, 0);
ASSERT_EQ(num_bytes_required, 0);
// Make sure providing 0 bytes to SpillObjectsOfSize will spill one object.
// This is important to cover min_spilling_size_ == 0.
ASSERT_TRUE(manager.SpillObjectsOfSize(0));
EXPECT_CALL(worker_pool, PushSpillWorker(_));
const std::string url = BuildURL("url" + std::to_string(object_ids.size()));
ASSERT_TRUE(worker_pool.io_worker_client->ReplySpillObjects({url}));
ASSERT_TRUE(object_table.ReplyAsyncAddSpilledUrl());
ASSERT_EQ(object_table.object_urls.size(), 3);
urls.push_back(url);
for (auto &object_url : object_table.object_urls) {
auto it = std::find(urls.begin(), urls.end(), object_url.second);
ASSERT_TRUE(it != urls.end());
ASSERT_EQ((*unpins)[object_url.first], 1);
}
// Since there are no more objects to spill, this should fail.
ASSERT_FALSE(manager.SpillObjectsOfSize(0));
}
TEST_F(LocalObjectManagerTest, TestSpillObjectNotEvictable) {
rpc::Address owner_address;
owner_address.set_worker_id(WorkerID::FromRandom().Binary());
std::vector<ObjectID> object_ids;
std::vector<std::unique_ptr<RayObject>> objects;
int64_t total_size = 0;
int64_t object_size = 1000;
const ObjectID object_id = ObjectID::FromRandom();
object_ids.push_back(object_id);
unevictable_objects_.emplace(object_id);
auto data_buffer = std::make_shared<MockObjectBuffer>(object_size, object_id, unpins);
total_size += object_size;
std::unique_ptr<RayObject> object(
new RayObject(data_buffer, nullptr, std::vector<ObjectID>()));
objects.push_back(std::move(object));
manager.PinObjects(object_ids, std::move(objects));
ASSERT_FALSE(manager.SpillObjectsOfSize(1000));
for (const auto &id : object_ids) {
ASSERT_EQ((*unpins)[id], 0);
}
// Now object is evictable. Spill should succeed.
unevictable_objects_.erase(object_id);
ASSERT_TRUE(manager.SpillObjectsOfSize(1000));
}
TEST_F(LocalObjectManagerTest, TestSpillUptoMaxThroughput) {
rpc::Address owner_address;
owner_address.set_worker_id(WorkerID::FromRandom().Binary());
std::vector<ObjectID> object_ids;
std::vector<std::unique_ptr<RayObject>> objects;
int64_t object_size = 1000;
size_t total_objects = 3;
// Pin 3 objects.
for (size_t i = 0; i < total_objects; i++) {
ObjectID object_id = ObjectID::FromRandom();
object_ids.push_back(object_id);
auto data_buffer = std::make_shared<MockObjectBuffer>(object_size, object_id, unpins);
std::unique_ptr<RayObject> object(
new RayObject(data_buffer, nullptr, std::vector<ObjectID>()));
objects.push_back(std::move(object));
}
manager.PinObjects(object_ids, std::move(objects));
// This will spill until 2 workers are occupied.
manager.SpillObjectUptoMaxThroughput();
ASSERT_TRUE(manager.IsSpillingInProgress());
// Spilling is still in progress, so IsSpillingInProgress should keep returning true.
manager.SpillObjectUptoMaxThroughput();
ASSERT_TRUE(manager.IsSpillingInProgress());
// No object ids are spilled yet.
for (const auto &id : object_ids) {
ASSERT_EQ((*unpins)[id], 0);
}
// Spill one object.
std::vector<std::string> urls;
urls.push_back(BuildURL("url" + std::to_string(0)));
ASSERT_TRUE(worker_pool.io_worker_client->ReplySpillObjects({urls[0]}));
ASSERT_TRUE(object_table.ReplyAsyncAddSpilledUrl());
// Make sure object is spilled.
ASSERT_EQ(object_table.object_urls.size(), 1);
for (auto &object_url : object_table.object_urls) {
if (urls[0] == object_url.second) {
ASSERT_EQ((*unpins)[object_url.first], 1);
}
}
// Now, there's only one object that is currently spilling.
// SpillObjectUptoMaxThroughput will spill one more object (since one worker is
// available).
manager.SpillObjectUptoMaxThroughput();
ASSERT_TRUE(manager.IsSpillingInProgress());
manager.SpillObjectUptoMaxThroughput();
ASSERT_TRUE(manager.IsSpillingInProgress());
// Spilling is done for all objects.
for (size_t i = 1; i < object_ids.size(); i++) {
urls.push_back(BuildURL("url" + std::to_string(i)));
}
for (size_t i = 1; i < urls.size(); i++) {
ASSERT_TRUE(worker_pool.io_worker_client->ReplySpillObjects({urls[i]}));
ASSERT_TRUE(object_table.ReplyAsyncAddSpilledUrl());
}
ASSERT_EQ(object_table.object_urls.size(), 3);
for (auto &object_url : object_table.object_urls) {
auto it = std::find(urls.begin(), urls.end(), object_url.second);
ASSERT_TRUE(it != urls.end());
ASSERT_EQ((*unpins)[object_url.first], 1);
}
// We cannot spill any more as there are no more pinned objects.
manager.SpillObjectUptoMaxThroughput();
ASSERT_FALSE(manager.IsSpillingInProgress());
}
TEST_F(LocalObjectManagerTest, TestSpillError) {
@ -739,52 +858,6 @@ TEST_F(LocalObjectManagerTest, TestDeleteMaxObjects) {
ASSERT_EQ(deleted_urls_size, free_objects_batch_size);
}
TEST_F(LocalObjectManagerTest,
TestSpillObjectsOfSizeNumBytesToSpillHigherThanMinBytesToSpill) {
/// Test the case SpillObjectsOfSize(num_bytes_to_spill, min_bytes_to_spill
/// where num_bytes_to_spill > min_bytes_to_spill.
rpc::Address owner_address;
owner_address.set_worker_id(WorkerID::FromRandom().Binary());
std::vector<ObjectID> object_ids;
std::vector<std::unique_ptr<RayObject>> objects;
int64_t total_size = 0;
int64_t object_size = 1000;
size_t object_len = 3;
for (size_t i = 0; i < object_len; i++) {
ObjectID object_id = ObjectID::FromRandom();
object_ids.push_back(object_id);
auto data_buffer = std::make_shared<MockObjectBuffer>(object_size, object_id, unpins);
total_size += object_size;
std::unique_ptr<RayObject> object(
new RayObject(data_buffer, nullptr, std::vector<ObjectID>()));
objects.push_back(std::move(object));
}
manager.PinObjects(object_ids, std::move(objects));
// First test when num_bytes_to_spill > min_bytes to spill.
// It means that we cannot spill the num_bytes_required, but we at least spilled the
// required amount, which is the min_bytes_to_spill.
int64_t num_bytes_required = manager.SpillObjectsOfSize(8000, object_size);
// only min bytes to spill is considered.
ASSERT_TRUE(num_bytes_required <= 0);
// Make sure the spilling is done properly.
std::vector<std::string> urls;
for (size_t i = 0; i < object_ids.size(); i++) {
urls.push_back(BuildURL("url" + std::to_string(i)));
}
EXPECT_CALL(worker_pool, PushSpillWorker(_));
ASSERT_TRUE(worker_pool.io_worker_client->ReplySpillObjects(urls));
for (size_t i = 0; i < object_ids.size(); i++) {
ASSERT_TRUE(object_table.ReplyAsyncAddSpilledUrl());
}
for (size_t i = 0; i < object_ids.size(); i++) {
ASSERT_EQ((*unpins).size(), object_len);
}
}
} // namespace raylet
} // namespace ray

View file

@ -189,9 +189,9 @@ Status raylet::RayletClient::NotifyUnblocked(const TaskID &current_task_id) {
return conn_->WriteMessage(MessageType::NotifyUnblocked, &fbb);
}
Status raylet::RayletClient::NotifyDirectCallTaskBlocked() {
Status raylet::RayletClient::NotifyDirectCallTaskBlocked(bool release_resources) {
flatbuffers::FlatBufferBuilder fbb;
auto message = protocol::CreateNotifyDirectCallTaskBlocked(fbb);
auto message = protocol::CreateNotifyDirectCallTaskBlocked(fbb, release_resources);
fbb.Finish(message);
return conn_->WriteMessage(MessageType::NotifyDirectCallTaskBlocked, &fbb);
}
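With the new argument, a blocked worker can tell the raylet whether its resources should be released while it waits. A hedged caller-side fragment is shown below; the helper name and the choice of `false` are illustrative assumptions, not code from this change.

```cpp
// Hypothetical caller-side fragment; assumes `raylet_client` is a connected
// raylet::RayletClient (include of its header omitted here).
ray::Status NotifyBlockedWithoutReleasingResources(
    raylet::RayletClient &raylet_client) {
  // Tell the raylet this worker is blocked, but ask it to keep the worker's
  // resources allocated while it waits.
  return raylet_client.NotifyDirectCallTaskBlocked(/*release_resources=*/false);
}
```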

View file

@ -256,8 +256,9 @@ class RayletClient : public RayletClientInterface {
/// Notify the raylet that this client is blocked. This is only used for direct task
/// calls. Note that ordering of this with respect to Unblock calls is important.
///
/// \return ray::Status.
ray::Status NotifyDirectCallTaskBlocked();
/// \param release_resources: true if the direct call blocking needs to release
/// resources.
/// \return ray::Status.
ray::Status NotifyDirectCallTaskBlocked(bool release_resources);
/// Notify the raylet that this client is unblocked. This is only used for direct task
/// calls. Note that ordering of this with respect to Block calls is important.