[Object Spilling] Fusion small objects (#12087)

This commit is contained in:
SangBin Cho 2020-11-25 10:13:32 -08:00 committed by GitHub
parent 09d5413f70
commit 2e4e285ef0
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
18 changed files with 413 additions and 117 deletions

View file

@ -611,17 +611,19 @@ cdef c_vector[c_string] spill_objects_handler(
cdef void restore_spilled_objects_handler(
const c_vector[CObjectID]& object_ids_to_restore,
const c_vector[c_string]& object_urls) nogil:
with gil:
urls = []
size = object_urls.size()
for i in range(size):
urls.append(object_urls[i])
object_refs = VectorToObjectRefs(object_ids_to_restore)
try:
with ray.worker._changeproctitle(
ray_constants.WORKER_PROCESS_TYPE_RESTORE_WORKER,
ray_constants.WORKER_PROCESS_TYPE_RESTORE_WORKER_IDLE):
external_storage.restore_spilled_objects(urls)
external_storage.restore_spilled_objects(object_refs, urls)
except Exception:
exception_str = (
"An unexpected internal error occurred while the IO worker "

View file

@ -1,8 +1,64 @@
import abc
import os
from typing import List
import urllib
from collections import namedtuple
from typing import List, IO, Tuple
import ray
from ray.ray_constants import DEFAULT_OBJECT_PREFIX
from ray._raylet import ObjectRef
ParsedURL = namedtuple("ParsedURL", "base_url, offset, size")
def create_url_with_offset(*, url: str, offset: int, size: int) -> str:
"""Methods to create a URL with offset.
When ray spills objects, it fuses multiple objects
into one file to optimize the performance. That says, each object
needs to keep tracking of its own special url to store metadata.
This method creates an url_with_offset, which is used internally
by Ray.
Created url_with_offset can be passed to the self._get_base_url method
to parse the filename used to store files.
Example) file://path/to/file?offset=""&size=""
Args:
url(str): url to the object stored in the external storage.
offset(int): Offset from the beginning of the file to
the first bytes of this object.
size(int): Size of the object that is stored in the url.
It is used to calculate the last offset.
Returns:
url_with_offset stored internally to find
objects from external storage.
"""
return f"{url}?offset={offset}&size={size}"
def parse_url_with_offset(url_with_offset: str) -> Tuple[str, int, int]:
"""Parse url_with_offset to retrieve information.
base_url is the url where the object ref
is stored in the external storage.
Args:
url_with_offset(str): url created by create_url_with_offset.
Returns:
named tuple of base_url, offset, and size.
"""
parsed_result = urllib.parse.urlparse(url_with_offset)
query_dict = urllib.parse.parse_qs(parsed_result.query)
# Split by ? to remove the query from the url.
base_url = parsed_result.geturl().split("?")[0]
offset = int(query_dict["offset"][0])
size = int(query_dict["size"][0])
return ParsedURL(base_url=base_url, offset=offset, size=size)
class ExternalStorage(metaclass=abc.ABCMeta):
@ -35,34 +91,88 @@ class ExternalStorage(metaclass=abc.ABCMeta):
worker.core_worker.put_file_like_object(metadata, data_size, file_like,
object_ref)
def _write_multiple_objects(self, f: IO, object_refs: List[ObjectRef],
url: str) -> List[str]:
"""Fuse all given objects into a given file handle.
Args:
f(IO): File handle to fusion all given object refs.
object_refs(list): Object references to fusion to a single file.
url(str): url where the object ref is stored
in the external storage.
Return:
List of urls_with_offset of fusioned objects.
The order of returned keys are equivalent to the one
with given object_refs.
"""
keys = []
offset = 0
ray_object_pairs = self._get_objects_from_store(object_refs)
for ref, (buf, metadata) in zip(object_refs, ray_object_pairs):
metadata_len = len(metadata)
buf_len = len(buf)
# 16 bytes to store metadata and buffer length.
data_size_in_bytes = metadata_len + buf_len + 16
f.write(metadata_len.to_bytes(8, byteorder="little"))
f.write(buf_len.to_bytes(8, byteorder="little"))
f.write(metadata)
f.write(memoryview(buf))
url_with_offset = create_url_with_offset(
url=url, offset=offset, size=data_size_in_bytes)
keys.append(url_with_offset.encode())
offset += data_size_in_bytes
return keys
def _size_check(self, metadata_len, buffer_len, obtained_data_size):
"""Check whether or not the obtained_data_size is as expected.
Args:
metadata_len(int): Actual metadata length of the object.
buffer_len(int): Actual buffer length of the object.
obtained_data_size(int): Data size specified in the
url_with_offset.
Raises:
ValueError if obtained_data_size is different from
metadata_len + buffer_len + 16(first 8 bytes to store length).
"""
data_size_in_bytes = metadata_len + buffer_len + 16
if data_size_in_bytes != obtained_data_size:
raise ValueError(
f"Obtained data has a size of {data_size_in_bytes}, "
"although it is supposed to have the "
f"size of {obtained_data_size}.")
@abc.abstractmethod
def spill_objects(self, object_refs):
def spill_objects(self, object_refs) -> List[str]:
"""Spill objects to the external storage. Objects are specified
by their object refs.
Args:
object_refs: The list of the refs of the objects to be spilled.
Returns:
A list of keys corresponding to the input object refs.
A list of internal URLs with object offset.
"""
@abc.abstractmethod
def restore_spilled_objects(self, keys: List[bytes]):
"""Spill objects to the external storage. Objects are specified
by their object refs.
def restore_spilled_objects(self, object_refs: List[ObjectRef],
url_with_offset_list: List[str]):
"""Restore objects from the external storage.
Args:
keys: A list of bytes corresponding to the spilled objects.
object_refs: List of object IDs (note that it is not ref).
url_with_offset_list: List of url_with_offset.
"""
class NullStorage(ExternalStorage):
"""The class that represents an uninitialized external storage."""
def spill_objects(self, object_refs):
def spill_objects(self, object_refs) -> List[str]:
raise NotImplementedError("External storage is not initialized")
def restore_spilled_objects(self, keys):
def restore_spilled_objects(self, object_refs, url_with_offset_list):
raise NotImplementedError("External storage is not initialized")
@ -76,37 +186,40 @@ class FileSystemStorage(ExternalStorage):
def __init__(self, directory_path):
self.directory_path = directory_path
self.prefix = "ray_spilled_object_"
self.prefix = DEFAULT_OBJECT_PREFIX
os.makedirs(self.directory_path, exist_ok=True)
if not os.path.exists(self.directory_path):
raise ValueError("The given directory path to store objects, "
f"{self.directory_path}, could not be created.")
def spill_objects(self, object_refs):
keys = []
ray_object_pairs = self._get_objects_from_store(object_refs)
for ref, (buf, metadata) in zip(object_refs, ray_object_pairs):
filename = self.prefix + ref.hex()
with open(os.path.join(self.directory_path, filename), "wb") as f:
metadata_len = len(metadata)
buf_len = len(buf)
f.write(metadata_len.to_bytes(8, byteorder="little"))
f.write(buf_len.to_bytes(8, byteorder="little"))
f.write(metadata)
f.write(memoryview(buf))
keys.append(filename.encode())
return keys
def spill_objects(self, object_refs) -> List[str]:
if len(object_refs) == 0:
return []
# Always use the first object ref as a key when fusioning objects.
first_ref = object_refs[0]
filename = f"{self.prefix}-{first_ref.hex()}-multi-{len(object_refs)}"
url = f"{os.path.join(self.directory_path, filename)}"
with open(url, "wb") as f:
return self._write_multiple_objects(f, object_refs, url)
def restore_spilled_objects(self, keys):
for k in keys:
filename = k.decode()
ref = ray.ObjectRef(bytes.fromhex(filename[len(self.prefix):]))
with open(os.path.join(self.directory_path, filename), "rb") as f:
def restore_spilled_objects(self, object_refs: List[ObjectRef],
url_with_offset_list: List[str]):
for i in range(len(object_refs)):
object_ref = object_refs[i]
url_with_offset = url_with_offset_list[i].decode()
# Retrieve the information needed.
parsed_result = parse_url_with_offset(url_with_offset)
base_url = parsed_result.base_url
offset = parsed_result.offset
# Read a part of the file and recover the object.
with open(base_url, "rb") as f:
f.seek(offset)
metadata_len = int.from_bytes(f.read(8), byteorder="little")
buf_len = int.from_bytes(f.read(8), byteorder="little")
self._size_check(metadata_len, buf_len, parsed_result.size)
metadata = f.read(metadata_len)
# read remaining data to our buffer
self._put_object_to_store(metadata, buf_len, f, ref)
self._put_object_to_store(metadata, buf_len, f, object_ref)
class ExternalStorageSmartOpenImpl(ExternalStorage):
@ -115,6 +228,10 @@ class ExternalStorageSmartOpenImpl(ExternalStorage):
Smart open supports multiple backend with the same APIs.
To use this implementation, you should pre-create the given uri.
For example, if your uri is a local file path, you should pre-create
the directory.
Args:
uri(str): Storage URI used for smart open.
prefix(str): Prefix of objects that are stored.
@ -129,7 +246,7 @@ class ExternalStorageSmartOpenImpl(ExternalStorage):
def __init__(self,
uri: str,
prefix: str = "ray_spilled_object_",
prefix: str = DEFAULT_OBJECT_PREFIX,
override_transport_params: dict = None):
try:
from smart_open import open # noqa
@ -142,49 +259,49 @@ class ExternalStorageSmartOpenImpl(ExternalStorage):
self.uri = uri.strip("/")
self.prefix = prefix
self.override_transport_params = override_transport_params or {}
self.transport_params = {}.update(self.override_transport_params)
# smart_open always seek to 0 if we don't set this argument.
# This will lead us to call a Object.get when it is not necessary,
# so defer seek and call seek before reading objects instead.
self.transport_params = {"defer_seek": True}
self.transport_params.update(self.override_transport_params)
def spill_objects(self, object_refs):
keys = []
ray_object_pairs = self._get_objects_from_store(object_refs)
for ref, (buf, metadata) in zip(object_refs, ray_object_pairs):
key = self.prefix + ref.hex()
self._spill_object(key, ref, buf, metadata)
keys.append(key.encode())
return keys
def restore_spilled_objects(self, keys):
for k in keys:
key = k.decode()
ref = ray.ObjectRef(bytes.fromhex(key[len(self.prefix):]))
self._restore_spilled_object(key, ref)
def _spill_object(self, key, ref, buf, metadata):
def spill_objects(self, object_refs) -> List[str]:
if len(object_refs) == 0:
return []
from smart_open import open
# Always use the first object ref as a key when fusioning objects.
first_ref = object_refs[0]
key = f"{self.prefix}-{first_ref.hex()}-multi-{len(object_refs)}"
url = f"{self.uri}/{key}"
with open(
self._build_uri(key), "wb",
url, "wb",
transport_params=self.transport_params) as file_like:
metadata_len = len(metadata)
buf_len = len(buf)
file_like.write(metadata_len.to_bytes(8, byteorder="little"))
file_like.write(buf_len.to_bytes(8, byteorder="little"))
file_like.write(metadata)
file_like.write(memoryview(buf))
return self._write_multiple_objects(file_like, object_refs, url)
def _restore_spilled_object(self, key, ref):
def restore_spilled_objects(self, object_refs: List[ObjectRef],
url_with_offset_list: List[str]):
from smart_open import open
for i in range(len(object_refs)):
object_ref = object_refs[i]
url_with_offset = url_with_offset_list[i].decode()
# Retrieve the information needed.
parsed_result = parse_url_with_offset(url_with_offset)
base_url = parsed_result.base_url
offset = parsed_result.offset
with open(
self._build_uri(key), "rb",
transport_params=self.transport_params) as file_like:
metadata_len = int.from_bytes(
file_like.read(8), byteorder="little")
buf_len = int.from_bytes(file_like.read(8), byteorder="little")
metadata = file_like.read(metadata_len)
base_url, "rb",
transport_params=self.transport_params) as f:
# smart open seek reads the file from offset-end_of_the_file
# when the seek is called.
f.seek(offset)
metadata_len = int.from_bytes(f.read(8), byteorder="little")
buf_len = int.from_bytes(f.read(8), byteorder="little")
self._size_check(metadata_len, buf_len, parsed_result.size)
metadata = f.read(metadata_len)
# read remaining data to our buffer
self._put_object_to_store(metadata, buf_len, file_like, ref)
def _build_uri(self, key):
return f"{self.uri}/{key}"
self._put_object_to_store(metadata, buf_len, f, object_ref)
_external_storage = NullStorage()
@ -223,11 +340,13 @@ def spill_objects(object_refs):
return _external_storage.spill_objects(object_refs)
def restore_spilled_objects(keys: List[bytes]):
"""Spill objects to the external storage. Objects are specified
by their object refs.
def restore_spilled_objects(object_refs: List[ObjectRef],
url_with_offset_list: List[str]):
"""Restore objects from the external storage.
Args:
keys: A list of bytes corresponding to the spilled objects.
object_refs: List of object IDs (note that it is not ref).
url_with_offset_list: List of url_with_offset.
"""
_external_storage.restore_spilled_objects(keys)
_external_storage.restore_spilled_objects(object_refs,
url_with_offset_list)

View file

@ -233,7 +233,9 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:
(CRayStatus() nogil) check_signals
(void() nogil) gc_collect
(c_vector[c_string](const c_vector[CObjectID] &) nogil) spill_objects
(void(const c_vector[c_string]&) nogil) restore_spilled_objects
(void(
const c_vector[CObjectID] &,
const c_vector[c_string] &) nogil) restore_spilled_objects
(void(c_string *stack_out) nogil) get_lang_stack
c_bool ref_counting_enabled
c_bool is_local_mode

View file

@ -52,6 +52,9 @@ cdef class ObjectRef(BaseID):
def task_id(self):
return TaskID(self.data.TaskId().Binary())
def job_id(self):
return self.task_id().job_id()
cdef size_t hash(self):
return self.data.Hash()

View file

@ -124,6 +124,8 @@ cdef extern from "ray/common/id.h" namespace "ray" nogil:
CActorID ActorId() const
CJobID JobId() const
cdef cppclass CObjectID" ray::ObjectID"(CBaseID[CObjectID]):
@staticmethod

View file

@ -152,6 +152,9 @@ cdef class TaskID(BaseID):
def actor_id(self):
return ActorID(self.data.ActorId().Binary())
def job_id(self):
return JobID(self.data.JobId().Binary())
cdef size_t hash(self):
return self.data.Hash()

View file

@ -207,3 +207,6 @@ MACH_PAGE_SIZE_BYTES = 4096
# Max 64 bit integer value, which is needed to ensure against overflow
# in C++ when passing integer values cross-language.
MAX_INT64_VALUE = 9223372036854775807
# Object Spilling related constants
DEFAULT_OBJECT_PREFIX = "ray_spilled_object"

View file

@ -9,12 +9,15 @@ import numpy as np
import pytest
import psutil
import ray
from ray.external_storage import (create_url_with_offset,
parse_url_with_offset)
bucket_name = "object-spilling-test"
spill_local_path = "/tmp/spill"
file_system_object_spilling_config = {
"type": "filesystem",
"params": {
"directory_path": "/tmp"
"directory_path": spill_local_path
}
}
smart_open_object_spilling_config = {
@ -108,6 +111,17 @@ def test_invalid_config_raises_exception(shutdown_only):
})
def test_url_generation_and_parse():
url = "s3://abc/def/ray_good"
offset = 10
size = 30
url_with_offset = create_url_with_offset(url=url, offset=offset, size=size)
parsed_result = parse_url_with_offset(url_with_offset)
assert parsed_result.base_url == url
assert parsed_result.offset == offset
assert parsed_result.size == size
@pytest.mark.skipif(
platform.system() == "Windows", reason="Failing on Windows.")
def test_spill_objects_manually(object_spilling_config, shutdown_only):
@ -119,6 +133,7 @@ def test_spill_objects_manually(object_spilling_config, shutdown_only):
"automatic_object_spilling_enabled": False,
"max_io_workers": 4,
"object_spilling_config": object_spilling_config,
"min_spilling_size": 0,
})
arr = np.random.rand(1024 * 1024) # 8 MB data
replay_buffer = []
@ -179,6 +194,7 @@ def test_spill_objects_manually_from_workers(object_spilling_config,
"automatic_object_spilling_enabled": False,
"max_io_workers": 4,
"object_spilling_config": object_spilling_config,
"min_spilling_size": 0,
})
@ray.remote
@ -209,6 +225,7 @@ def test_spill_objects_manually_with_workers(object_spilling_config,
"automatic_object_spilling_enabled": False,
"max_io_workers": 4,
"object_spilling_config": object_spilling_config,
"min_spilling_size": 0,
})
arrays = [np.random.rand(100 * 1024) for _ in range(50)]
objects = [ray.put(arr) for arr in arrays]
@ -240,6 +257,7 @@ def test_spill_objects_manually_with_workers(object_spilling_config,
"directory_path": "/tmp"
}
}),
"min_spilling_size": 0,
},
}],
indirect=True)
@ -278,6 +296,7 @@ def test_spill_remote_object(ray_start_cluster_head):
def test_spill_objects_automatically(object_spilling_config, shutdown_only):
# Limit our object store to 75 MiB of memory.
ray.init(
num_cpus=1,
object_store_memory=75 * 1024 * 1024,
_system_config={
"max_io_workers": 4,
@ -285,27 +304,30 @@ def test_spill_objects_automatically(object_spilling_config, shutdown_only):
"object_store_full_max_retries": 4,
"object_store_full_initial_delay_ms": 100,
"object_spilling_config": object_spilling_config,
"min_spilling_size": 0
})
arr = np.random.rand(1024 * 1024) # 8 MB data
replay_buffer = []
# Wait raylet for starting an IO worker.
time.sleep(1)
solution_buffer = []
buffer_length = 100
# Create objects of more than 800 MiB.
for _ in range(100):
for _ in range(buffer_length):
ref = None
while ref is None:
multiplier = random.choice([1, 2, 3])
arr = np.random.rand(multiplier * 1024 * 1024)
ref = ray.put(arr)
replay_buffer.append(ref)
solution_buffer.append(arr)
print("-----------------------------------")
# randomly sample objects
for _ in range(1000):
ref = random.choice(replay_buffer)
index = random.choice(list(range(buffer_length)))
ref = replay_buffer[index]
solution = solution_buffer[index]
sample = ray.get(ref, timeout=0)
assert np.array_equal(sample, arr)
assert np.array_equal(sample, solution)
@pytest.mark.skipif(
@ -321,6 +343,7 @@ def test_spill_during_get(object_spilling_config, shutdown_only):
"automatic_object_spilling_enabled": True,
"max_io_workers": 2,
"object_spilling_config": object_spilling_config,
"min_spilling_size": 0,
},
)
@ -352,13 +375,11 @@ def test_spill_deadlock(object_spilling_config, shutdown_only):
"object_store_full_max_retries": 4,
"object_store_full_initial_delay_ms": 100,
"object_spilling_config": object_spilling_config,
"min_spilling_size": 0,
})
arr = np.random.rand(1024 * 1024) # 8 MB data
replay_buffer = []
# Wait raylet for starting an IO worker.
time.sleep(1)
# Create objects of more than 400 MiB.
for _ in range(50):
ref = None
@ -373,5 +394,61 @@ def test_spill_deadlock(object_spilling_config, shutdown_only):
assert np.array_equal(sample, arr)
@pytest.mark.skipif(
platform.system() == "Windows", reason="Failing on Windows.")
def test_fusion_objects(tmp_path, shutdown_only):
# Limit our object store to 75 MiB of memory.
temp_folder = tmp_path / "spill"
temp_folder.mkdir()
min_spilling_size = 30 * 1024 * 1024
ray.init(
object_store_memory=75 * 1024 * 1024,
_system_config={
"max_io_workers": 4,
"automatic_object_spilling_enabled": True,
"object_store_full_max_retries": 4,
"object_store_full_initial_delay_ms": 100,
"object_spilling_config": json.dumps({
"type": "filesystem",
"params": {
"directory_path": str(temp_folder)
}
}),
"min_spilling_size": min_spilling_size,
})
replay_buffer = []
solution_buffer = []
buffer_length = 100
# Create objects of more than 800 MiB.
for _ in range(buffer_length):
ref = None
while ref is None:
multiplier = random.choice([1, 2, 3])
arr = np.random.rand(multiplier * 1024 * 1024)
ref = ray.put(arr)
replay_buffer.append(ref)
solution_buffer.append(arr)
print("-----------------------------------")
# randomly sample objects
for _ in range(1000):
index = random.choice(list(range(buffer_length)))
ref = replay_buffer[index]
solution = solution_buffer[index]
sample = ray.get(ref, timeout=0)
assert np.array_equal(sample, solution)
is_test_passing = False
for path in temp_folder.iterdir():
file_size = path.stat().st_size
# Make sure there are at least one
# file_size that exceeds the min_spilling_size.
# If we don't fusion correctly, this cannot happen.
if file_size >= min_spilling_size:
is_test_passing = True
assert is_test_passing
if __name__ == "__main__":
sys.exit(pytest.main(["-sv", __file__]))

View file

@ -332,9 +332,17 @@ RAY_CONFIG(int64_t, max_placement_group_load_report_size, 100)
/// Python IO workers to determine how to store/restore an object to/from
/// external storage.
RAY_CONFIG(std::string, object_spilling_config, "")
/// Whether to enable automatic object spilling. If enabled, then
/// Ray will choose objects to spill when the object store is out of
/// memory.
RAY_CONFIG(bool, automatic_object_spilling_enabled, true)
/// The maximum number of I/O worker that raylet starts.
RAY_CONFIG(int, max_io_workers, 1)
/// Ray's object spilling fuses small objects into a single file before flushing them
/// to optimize the performance.
/// The minimum object size that can be spilled by each spill operation. 100 MB by
/// default. This value is not recommended to set beyond --object-store-memory.
RAY_CONFIG(int64_t, min_spilling_size, 100 * 1024 * 1024)

View file

@ -2346,12 +2346,19 @@ void CoreWorker::HandleRestoreSpilledObjects(
const rpc::RestoreSpilledObjectsRequest &request,
rpc::RestoreSpilledObjectsReply *reply, rpc::SendReplyCallback send_reply_callback) {
if (options_.restore_spilled_objects != nullptr) {
// Get a list of object ids.
std::vector<ObjectID> object_ids_to_restore;
object_ids_to_restore.reserve(request.object_ids_to_restore_size());
for (const auto &id_binary : request.object_ids_to_restore()) {
object_ids_to_restore.push_back(ObjectID::FromBinary(id_binary));
}
// Get a list of spilled_object_urls.
std::vector<std::string> spilled_objects_url;
spilled_objects_url.reserve(request.spilled_objects_url_size());
for (const auto &url : request.spilled_objects_url()) {
spilled_objects_url.push_back(url);
}
options_.restore_spilled_objects(spilled_objects_url);
options_.restore_spilled_objects(object_ids_to_restore, spilled_objects_url);
send_reply_callback(Status::OK(), nullptr, nullptr);
} else {
send_reply_callback(

View file

@ -138,7 +138,8 @@ struct CoreWorkerOptions {
/// Application-language callback to spill objects to external storage.
std::function<std::vector<std::string>(const std::vector<ObjectID> &)> spill_objects;
/// Application-language callback to restore objects from external storage.
std::function<void(const std::vector<std::string> &)> restore_spilled_objects;
std::function<void(const std::vector<ObjectID> &, const std::vector<std::string> &)>
restore_spilled_objects;
/// Language worker callback to get the current call stack.
std::function<void(std::string *)> get_lang_stack;
// Function that tries to interrupt the currently running Python thread.

View file

@ -5,10 +5,15 @@
namespace ray {
/// A callback to asynchronously spill objects when space is needed. The
/// callback returns the amount of space still needed after the spilling is
/// complete.
using SpillObjectsCallback = std::function<int64_t(int64_t num_bytes_required)>;
/// A callback to asynchronously spill objects when space is needed.
/// The callback tries to spill objects as much as num_bytes_to_spill and returns
/// the amount of space needed after the spilling is complete.
/// The returned value is calculated based off of min_bytes_to_spill. That says,
/// although it fails to spill num_bytes_to_spill, as long as it spills more than
/// min_bytes_to_spill, it will return the value that is less than 0 (meaning we
/// don't need any more additional space).
using SpillObjectsCallback =
std::function<int64_t(int64_t num_bytes_to_spill, int64_t min_bytes_to_spill)>;
/// A callback to call when space has been released.
using SpaceReleasedCallback = std::function<void()>;

View file

@ -216,13 +216,18 @@ uint8_t *PlasmaStore::AllocateMemory(size_t size, bool evict_if_full, MEMFD_TYPE
// make room.
if (space_needed > 0) {
if (spill_objects_callback_) {
// If the space needed is too small, we'd like to bump up to the minimum spilling
// size. Cap the max size to be lower than the plasma store limit.
int64_t byte_to_spill =
std::min(PlasmaAllocator::GetFootprintLimit(),
std::max(space_needed, RayConfig::instance().min_spilling_size()));
// Object spilling is asynchronous so that we do not block the plasma
// store thread. Therefore the client must try again, even if enough
// space will be made after the spill is complete.
// TODO(swang): Only respond to the client with OutOfMemory if we could not
// make enough space through spilling. If we could make enough space,
// respond to the plasma client once spilling is complete.
space_needed = spill_objects_callback_(space_needed);
space_needed = spill_objects_callback_(byte_to_spill, space_needed);
}
if (space_needed > 0) {
// There is still not enough space, even once all evictable objects

View file

@ -309,6 +309,9 @@ message SpillObjectsReply {
message RestoreSpilledObjectsRequest {
// The URLs of spilled objects.
repeated string spilled_objects_url = 1;
// Object ids to restore. The order of object ids
// must be the same as spilled_objects_url.
repeated bytes object_ids_to_restore = 2;
}
message RestoreSpilledObjectsReply {

View file

@ -89,32 +89,34 @@ void LocalObjectManager::FlushFreeObjectsIfNeeded(int64_t now_ms) {
}
}
int64_t LocalObjectManager::SpillObjectsOfSize(int64_t num_bytes_required) {
int64_t LocalObjectManager::SpillObjectsOfSize(int64_t num_bytes_to_spill,
int64_t min_bytes_to_spill) {
RAY_CHECK(num_bytes_to_spill >= min_bytes_to_spill);
if (RayConfig::instance().object_spilling_config().empty() ||
!RayConfig::instance().automatic_object_spilling_enabled()) {
return num_bytes_required;
return min_bytes_to_spill;
}
absl::MutexLock lock(&mutex_);
RAY_LOG(INFO) << "Choosing objects to spill of total size " << num_bytes_required;
int64_t num_bytes_to_spill = 0;
RAY_LOG(INFO) << "Choosing objects to spill of total size " << num_bytes_to_spill;
int64_t bytes_to_spill = 0;
auto it = pinned_objects_.begin();
std::vector<ObjectID> objects_to_spill;
while (num_bytes_to_spill < num_bytes_required && it != pinned_objects_.end()) {
num_bytes_to_spill += it->second->GetSize();
while (bytes_to_spill < num_bytes_to_spill && it != pinned_objects_.end()) {
bytes_to_spill += it->second->GetSize();
objects_to_spill.push_back(it->first);
it++;
}
if (!objects_to_spill.empty()) {
RAY_LOG(INFO) << "Spilling objects of total size " << num_bytes_to_spill;
RAY_LOG(INFO) << "Spilling objects of total size " << bytes_to_spill;
auto start_time = current_time_ms();
SpillObjectsInternal(
objects_to_spill, [num_bytes_to_spill, start_time](const Status &status) {
objects_to_spill, [bytes_to_spill, start_time](const Status &status) {
if (!status.ok()) {
RAY_LOG(ERROR) << "Error spilling objects " << status.ToString();
} else {
RAY_LOG(INFO) << "Spilled " << num_bytes_to_spill << " in "
RAY_LOG(INFO) << "Spilled " << bytes_to_spill << " in "
<< (current_time_ms() - start_time) << "ms";
}
});
@ -124,8 +126,7 @@ int64_t LocalObjectManager::SpillObjectsOfSize(int64_t num_bytes_required) {
// bytes that are currently being spilled from the amount of space
// requested. If the space is claimed by another client, this client may
// need to request space again.
num_bytes_required -= num_bytes_pending_spill_;
return num_bytes_required;
return min_bytes_to_spill - num_bytes_pending_spill_;
}
void LocalObjectManager::SpillObjects(const std::vector<ObjectID> &object_ids,
@ -169,7 +170,6 @@ void LocalObjectManager::SpillObjectsInternal(
}
return;
}
io_worker_pool_.PopSpillWorker(
[this, objects_to_spill, callback](std::shared_ptr<WorkerInterface> io_worker) {
rpc::SpillObjectsRequest request;
@ -246,6 +246,7 @@ void LocalObjectManager::AsyncRestoreSpilledObject(
RAY_LOG(DEBUG) << "Sending restore spilled object request";
rpc::RestoreSpilledObjectsRequest request;
request.add_spilled_objects_url(std::move(object_url));
request.add_object_ids_to_restore(object_id.Binary());
io_worker->rpc_client()->RestoreSpilledObjects(
request,
[this, object_id, callback, io_worker](const ray::Status &status,

View file

@ -65,13 +65,22 @@ class LocalObjectManager {
void WaitForObjectFree(const rpc::Address &owner_address,
const std::vector<ObjectID> &object_ids);
/// Asynchronously spill objects whose total size adds up to at least the
/// specified number of bytes.
/// Asynchronously spill objects when space is needed.
/// The callback tries to spill objects as much as num_bytes_to_spill and returns
/// the amount of space needed after the spilling is complete.
/// The returned value is calculated based off of min_bytes_to_spill. That says,
/// although it fails to spill num_bytes_to_spill, as long as it spills more than
/// min_bytes_to_spill, it will return the value that is less than 0 (meaning we
/// don't need any more additional space).
///
/// \param num_bytes_to_spill The total number of bytes to spill.
/// \return The number of bytes of space still required after the spill is
/// complete.
int64_t SpillObjectsOfSize(int64_t num_bytes_to_spill);
/// \param num_bytes_to_spill The total number of bytes to spill. The method tries to
/// spill bytes as much as this value.
/// \param min_bytes_to_spill The minimum bytes that
/// need to be spilled.
/// \return The number of bytes of space still required after the
/// spill is complete. This return the value is less than 0 if it satifies the
/// min_bytes_to_spill.
int64_t SpillObjectsOfSize(int64_t num_bytes_to_spill, int64_t min_bytes_to_spill);
/// Spill objects to external storage.
///

View file

@ -76,9 +76,9 @@ Raylet::Raylet(boost::asio::io_service &main_service, const std::string &socket_
node_manager_.GetLocalObjectManager().AsyncRestoreSpilledObject(
object_id, spilled_url, callback);
},
[this](int64_t num_bytes_required) {
[this](int64_t num_bytes_to_spill, int64_t min_bytes_to_spill) {
return node_manager_.GetLocalObjectManager().SpillObjectsOfSize(
num_bytes_required);
num_bytes_to_spill, min_bytes_to_spill);
}),
node_manager_(main_service, self_node_id_, node_manager_config, object_manager_,
gcs_client_, object_directory_,

View file

@ -380,14 +380,14 @@ TEST_F(LocalObjectManagerTest, TestSpillObjectsOfSize) {
}
manager.PinObjects(object_ids, std::move(objects));
int64_t num_bytes_required = manager.SpillObjectsOfSize(total_size / 2);
int64_t num_bytes_required = manager.SpillObjectsOfSize(total_size / 2, total_size / 2);
ASSERT_EQ(num_bytes_required, -object_size / 2);
for (const auto &id : object_ids) {
ASSERT_EQ((*unpins)[id], 0);
}
// Check that this returns the total number of bytes currently being spilled.
num_bytes_required = manager.SpillObjectsOfSize(0);
num_bytes_required = manager.SpillObjectsOfSize(0, 0);
ASSERT_EQ(num_bytes_required, -2 * object_size);
// Check that half the objects get spilled and the URLs get added to the
@ -411,7 +411,7 @@ TEST_F(LocalObjectManagerTest, TestSpillObjectsOfSize) {
}
// Check that this returns the total number of bytes currently being spilled.
num_bytes_required = manager.SpillObjectsOfSize(0);
num_bytes_required = manager.SpillObjectsOfSize(0, 0);
ASSERT_EQ(num_bytes_required, 0);
ASSERT_TRUE(num_callbacks_fired > 0);
}
@ -460,6 +460,52 @@ TEST_F(LocalObjectManagerTest, TestSpillError) {
ASSERT_TRUE(num_callbacks_fired > 0);
}
TEST_F(LocalObjectManagerTest,
TestSpillObjectsOfSizeNumBytesToSpillHigherThanMinBytesToSpill) {
/// Test the case SpillObjectsOfSize(num_bytes_to_spill, min_bytes_to_spill
/// where num_bytes_to_spill > min_bytes_to_spill.
rpc::Address owner_address;
owner_address.set_worker_id(WorkerID::FromRandom().Binary());
std::vector<ObjectID> object_ids;
std::vector<std::unique_ptr<RayObject>> objects;
int64_t total_size = 0;
int64_t object_size = 1000;
size_t object_len = 3;
for (size_t i = 0; i < object_len; i++) {
ObjectID object_id = ObjectID::FromRandom();
object_ids.push_back(object_id);
auto data_buffer = std::make_shared<MockObjectBuffer>(object_size, object_id, unpins);
total_size += object_size;
std::unique_ptr<RayObject> object(
new RayObject(data_buffer, nullptr, std::vector<ObjectID>()));
objects.push_back(std::move(object));
}
manager.PinObjects(object_ids, std::move(objects));
// First test when num_bytes_to_spill > min_bytes to spill.
// It means that we cannot spill the num_bytes_required, but we at least spilled the
// required amount, which is the min_bytes_to_spill.
int64_t num_bytes_required = manager.SpillObjectsOfSize(8000, object_size);
// only min bytes to spill is considered.
ASSERT_TRUE(num_bytes_required <= 0);
// Make sure the spilling is done properly.
std::vector<std::string> urls;
for (size_t i = 0; i < object_ids.size(); i++) {
urls.push_back("url" + std::to_string(i));
}
EXPECT_CALL(worker_pool, PushSpillWorker(_));
ASSERT_TRUE(worker_pool.io_worker_client->ReplySpillObjects(urls));
for (size_t i = 0; i < object_ids.size(); i++) {
ASSERT_TRUE(object_table.ReplyAsyncAddSpilledUrl());
}
for (size_t i = 0; i < object_ids.size(); i++) {
ASSERT_EQ((*unpins).size(), object_len);
}
}
} // namespace raylet
} // namespace ray