From 2e4e285ef079a07fa4f54269751040f5a176785a Mon Sep 17 00:00:00 2001 From: SangBin Cho Date: Wed, 25 Nov 2020 10:13:32 -0800 Subject: [PATCH] [Object Spilling] Fusion small objects (#12087) --- python/ray/_raylet.pyx | 4 +- python/ray/external_storage.py | 263 +++++++++++++----- python/ray/includes/libcoreworker.pxd | 6 +- python/ray/includes/object_ref.pxi | 3 + python/ray/includes/unique_ids.pxd | 2 + python/ray/includes/unique_ids.pxi | 3 + python/ray/ray_constants.py | 3 + python/ray/tests/test_object_spilling.py | 101 ++++++- src/ray/common/ray_config_def.h | 8 + src/ray/core_worker/core_worker.cc | 9 +- src/ray/core_worker/core_worker.h | 3 +- src/ray/object_manager/common.h | 13 +- src/ray/object_manager/plasma/store.cc | 7 +- src/ray/protobuf/core_worker.proto | 3 + src/ray/raylet/local_object_manager.cc | 25 +- src/ray/raylet/local_object_manager.h | 21 +- src/ray/raylet/raylet.cc | 4 +- .../raylet/test/local_object_manager_test.cc | 52 +++- 18 files changed, 413 insertions(+), 117 deletions(-) diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index 31e97798e..6b6741f1c 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -611,17 +611,19 @@ cdef c_vector[c_string] spill_objects_handler( cdef void restore_spilled_objects_handler( + const c_vector[CObjectID]& object_ids_to_restore, const c_vector[c_string]& object_urls) nogil: with gil: urls = [] size = object_urls.size() for i in range(size): urls.append(object_urls[i]) + object_refs = VectorToObjectRefs(object_ids_to_restore) try: with ray.worker._changeproctitle( ray_constants.WORKER_PROCESS_TYPE_RESTORE_WORKER, ray_constants.WORKER_PROCESS_TYPE_RESTORE_WORKER_IDLE): - external_storage.restore_spilled_objects(urls) + external_storage.restore_spilled_objects(object_refs, urls) except Exception: exception_str = ( "An unexpected internal error occurred while the IO worker " diff --git a/python/ray/external_storage.py b/python/ray/external_storage.py index 45aeb2771..8011f023c 
100644 --- a/python/ray/external_storage.py +++ b/python/ray/external_storage.py @@ -1,8 +1,64 @@ import abc import os -from typing import List +import urllib +from collections import namedtuple +from typing import List, IO, Tuple import ray +from ray.ray_constants import DEFAULT_OBJECT_PREFIX +from ray._raylet import ObjectRef + +ParsedURL = namedtuple("ParsedURL", "base_url, offset, size") + + +def create_url_with_offset(*, url: str, offset: int, size: int) -> str: + """Methods to create a URL with offset. + + When ray spills objects, it fuses multiple objects + into one file to optimize the performance. That says, each object + needs to keep tracking of its own special url to store metadata. + + This method creates an url_with_offset, which is used internally + by Ray. + + Created url_with_offset can be passed to the self._get_base_url method + to parse the filename used to store files. + + Example) file://path/to/file?offset=""&size="" + + Args: + url(str): url to the object stored in the external storage. + offset(int): Offset from the beginning of the file to + the first bytes of this object. + size(int): Size of the object that is stored in the url. + It is used to calculate the last offset. + + Returns: + url_with_offset stored internally to find + objects from external storage. + """ + return f"{url}?offset={offset}&size={size}" + + +def parse_url_with_offset(url_with_offset: str) -> Tuple[str, int, int]: + """Parse url_with_offset to retrieve information. + + base_url is the url where the object ref + is stored in the external storage. + + Args: + url_with_offset(str): url created by create_url_with_offset. + + Returns: + named tuple of base_url, offset, and size. + """ + parsed_result = urllib.parse.urlparse(url_with_offset) + query_dict = urllib.parse.parse_qs(parsed_result.query) + # Split by ? to remove the query from the url. 
+ base_url = parsed_result.geturl().split("?")[0] + offset = int(query_dict["offset"][0]) + size = int(query_dict["size"][0]) + return ParsedURL(base_url=base_url, offset=offset, size=size) class ExternalStorage(metaclass=abc.ABCMeta): @@ -35,34 +91,88 @@ class ExternalStorage(metaclass=abc.ABCMeta): worker.core_worker.put_file_like_object(metadata, data_size, file_like, object_ref) + def _write_multiple_objects(self, f: IO, object_refs: List[ObjectRef], + url: str) -> List[str]: + """Fuse all given objects into a given file handle. + + Args: + f(IO): File handle to fusion all given object refs. + object_refs(list): Object references to fusion to a single file. + url(str): url where the object ref is stored + in the external storage. + + Return: + List of urls_with_offset of fusioned objects. + The order of returned keys are equivalent to the one + with given object_refs. + """ + keys = [] + offset = 0 + ray_object_pairs = self._get_objects_from_store(object_refs) + for ref, (buf, metadata) in zip(object_refs, ray_object_pairs): + metadata_len = len(metadata) + buf_len = len(buf) + # 16 bytes to store metadata and buffer length. + data_size_in_bytes = metadata_len + buf_len + 16 + f.write(metadata_len.to_bytes(8, byteorder="little")) + f.write(buf_len.to_bytes(8, byteorder="little")) + f.write(metadata) + f.write(memoryview(buf)) + url_with_offset = create_url_with_offset( + url=url, offset=offset, size=data_size_in_bytes) + keys.append(url_with_offset.encode()) + offset += data_size_in_bytes + return keys + + def _size_check(self, metadata_len, buffer_len, obtained_data_size): + """Check whether or not the obtained_data_size is as expected. + + Args: + metadata_len(int): Actual metadata length of the object. + buffer_len(int): Actual buffer length of the object. + obtained_data_size(int): Data size specified in the + url_with_offset. + + Raises: + ValueError if obtained_data_size is different from + metadata_len + buffer_len + 16(first 8 bytes to store length). 
+ """ + data_size_in_bytes = metadata_len + buffer_len + 16 + if data_size_in_bytes != obtained_data_size: + raise ValueError( + f"Obtained data has a size of {data_size_in_bytes}, " + "although it is supposed to have the " + f"size of {obtained_data_size}.") + @abc.abstractmethod - def spill_objects(self, object_refs): + def spill_objects(self, object_refs) -> List[str]: """Spill objects to the external storage. Objects are specified by their object refs. Args: object_refs: The list of the refs of the objects to be spilled. Returns: - A list of keys corresponding to the input object refs. + A list of internal URLs with object offset. """ @abc.abstractmethod - def restore_spilled_objects(self, keys: List[bytes]): - """Spill objects to the external storage. Objects are specified - by their object refs. + def restore_spilled_objects(self, object_refs: List[ObjectRef], + url_with_offset_list: List[str]): + """Restore objects from the external storage. Args: - keys: A list of bytes corresponding to the spilled objects. + object_refs: List of object IDs (note that it is not ref). + url_with_offset_list: List of url_with_offset. 
""" class NullStorage(ExternalStorage): """The class that represents an uninitialized external storage.""" - def spill_objects(self, object_refs): + def spill_objects(self, object_refs) -> List[str]: raise NotImplementedError("External storage is not initialized") - def restore_spilled_objects(self, keys): + def restore_spilled_objects(self, object_refs, url_with_offset_list): raise NotImplementedError("External storage is not initialized") @@ -76,37 +186,40 @@ class FileSystemStorage(ExternalStorage): def __init__(self, directory_path): self.directory_path = directory_path - self.prefix = "ray_spilled_object_" + self.prefix = DEFAULT_OBJECT_PREFIX os.makedirs(self.directory_path, exist_ok=True) if not os.path.exists(self.directory_path): raise ValueError("The given directory path to store objects, " f"{self.directory_path}, could not be created.") - def spill_objects(self, object_refs): - keys = [] - ray_object_pairs = self._get_objects_from_store(object_refs) - for ref, (buf, metadata) in zip(object_refs, ray_object_pairs): - filename = self.prefix + ref.hex() - with open(os.path.join(self.directory_path, filename), "wb") as f: - metadata_len = len(metadata) - buf_len = len(buf) - f.write(metadata_len.to_bytes(8, byteorder="little")) - f.write(buf_len.to_bytes(8, byteorder="little")) - f.write(metadata) - f.write(memoryview(buf)) - keys.append(filename.encode()) - return keys + def spill_objects(self, object_refs) -> List[str]: + if len(object_refs) == 0: + return [] + # Always use the first object ref as a key when fusioning objects. 
+ first_ref = object_refs[0] + filename = f"{self.prefix}-{first_ref.hex()}-multi-{len(object_refs)}" + url = f"{os.path.join(self.directory_path, filename)}" + with open(url, "wb") as f: + return self._write_multiple_objects(f, object_refs, url) - def restore_spilled_objects(self, keys): - for k in keys: - filename = k.decode() - ref = ray.ObjectRef(bytes.fromhex(filename[len(self.prefix):])) - with open(os.path.join(self.directory_path, filename), "rb") as f: + def restore_spilled_objects(self, object_refs: List[ObjectRef], + url_with_offset_list: List[str]): + for i in range(len(object_refs)): + object_ref = object_refs[i] + url_with_offset = url_with_offset_list[i].decode() + # Retrieve the information needed. + parsed_result = parse_url_with_offset(url_with_offset) + base_url = parsed_result.base_url + offset = parsed_result.offset + # Read a part of the file and recover the object. + with open(base_url, "rb") as f: + f.seek(offset) metadata_len = int.from_bytes(f.read(8), byteorder="little") buf_len = int.from_bytes(f.read(8), byteorder="little") + self._size_check(metadata_len, buf_len, parsed_result.size) metadata = f.read(metadata_len) # read remaining data to our buffer - self._put_object_to_store(metadata, buf_len, f, ref) + self._put_object_to_store(metadata, buf_len, f, object_ref) class ExternalStorageSmartOpenImpl(ExternalStorage): @@ -115,6 +228,10 @@ class ExternalStorageSmartOpenImpl(ExternalStorage): Smart open supports multiple backend with the same APIs. + To use this implementation, you should pre-create the given uri. + For example, if your uri is a local file path, you should pre-create + the directory. + Args: uri(str): Storage URI used for smart open. prefix(str): Prefix of objects that are stored. 
@@ -129,7 +246,7 @@ class ExternalStorageSmartOpenImpl(ExternalStorage): def __init__(self, uri: str, - prefix: str = "ray_spilled_object_", + prefix: str = DEFAULT_OBJECT_PREFIX, override_transport_params: dict = None): try: from smart_open import open # noqa @@ -142,49 +259,49 @@ class ExternalStorageSmartOpenImpl(ExternalStorage): self.uri = uri.strip("/") self.prefix = prefix self.override_transport_params = override_transport_params or {} - self.transport_params = {}.update(self.override_transport_params) + # smart_open always seek to 0 if we don't set this argument. + # This will lead us to call a Object.get when it is not necessary, + # so defer seek and call seek before reading objects instead. + self.transport_params = {"defer_seek": True} + self.transport_params.update(self.override_transport_params) - def spill_objects(self, object_refs): - keys = [] - ray_object_pairs = self._get_objects_from_store(object_refs) - for ref, (buf, metadata) in zip(object_refs, ray_object_pairs): - key = self.prefix + ref.hex() - self._spill_object(key, ref, buf, metadata) - keys.append(key.encode()) - return keys - - def restore_spilled_objects(self, keys): - for k in keys: - key = k.decode() - ref = ray.ObjectRef(bytes.fromhex(key[len(self.prefix):])) - self._restore_spilled_object(key, ref) - - def _spill_object(self, key, ref, buf, metadata): + def spill_objects(self, object_refs) -> List[str]: + if len(object_refs) == 0: + return [] from smart_open import open + # Always use the first object ref as a key when fusioning objects. 
+ first_ref = object_refs[0] + key = f"{self.prefix}-{first_ref.hex()}-multi-{len(object_refs)}" + url = f"{self.uri}/{key}" with open( - self._build_uri(key), "wb", + url, "wb", transport_params=self.transport_params) as file_like: - metadata_len = len(metadata) - buf_len = len(buf) - file_like.write(metadata_len.to_bytes(8, byteorder="little")) - file_like.write(buf_len.to_bytes(8, byteorder="little")) - file_like.write(metadata) - file_like.write(memoryview(buf)) + return self._write_multiple_objects(file_like, object_refs, url) - def _restore_spilled_object(self, key, ref): + def restore_spilled_objects(self, object_refs: List[ObjectRef], + url_with_offset_list: List[str]): from smart_open import open - with open( - self._build_uri(key), "rb", - transport_params=self.transport_params) as file_like: - metadata_len = int.from_bytes( - file_like.read(8), byteorder="little") - buf_len = int.from_bytes(file_like.read(8), byteorder="little") - metadata = file_like.read(metadata_len) - # read remaining data to our buffer - self._put_object_to_store(metadata, buf_len, file_like, ref) + for i in range(len(object_refs)): + object_ref = object_refs[i] + url_with_offset = url_with_offset_list[i].decode() - def _build_uri(self, key): - return f"{self.uri}/{key}" + # Retrieve the information needed. + parsed_result = parse_url_with_offset(url_with_offset) + base_url = parsed_result.base_url + offset = parsed_result.offset + + with open( + base_url, "rb", + transport_params=self.transport_params) as f: + # smart open seek reads the file from offset-end_of_the_file + # when the seek is called. 
+ f.seek(offset) + metadata_len = int.from_bytes(f.read(8), byteorder="little") + buf_len = int.from_bytes(f.read(8), byteorder="little") + self._size_check(metadata_len, buf_len, parsed_result.size) + metadata = f.read(metadata_len) + # read remaining data to our buffer + self._put_object_to_store(metadata, buf_len, f, object_ref) _external_storage = NullStorage() @@ -223,11 +340,13 @@ def spill_objects(object_refs): return _external_storage.spill_objects(object_refs) -def restore_spilled_objects(keys: List[bytes]): - """Spill objects to the external storage. Objects are specified - by their object refs. +def restore_spilled_objects(object_refs: List[ObjectRef], + url_with_offset_list: List[str]): + """Restore objects from the external storage. Args: - keys: A list of bytes corresponding to the spilled objects. + object_refs: List of object IDs (note that it is not ref). + url_with_offset_list: List of url_with_offset. """ - _external_storage.restore_spilled_objects(keys) + _external_storage.restore_spilled_objects(object_refs, + url_with_offset_list) diff --git a/python/ray/includes/libcoreworker.pxd b/python/ray/includes/libcoreworker.pxd index 195882938..fa25d6f66 100644 --- a/python/ray/includes/libcoreworker.pxd +++ b/python/ray/includes/libcoreworker.pxd @@ -232,8 +232,10 @@ cdef extern from "ray/core_worker/core_worker.h" nogil: (void(const CWorkerID &) nogil) on_worker_shutdown (CRayStatus() nogil) check_signals (void() nogil) gc_collect - (c_vector[c_string](const c_vector[CObjectID]&) nogil) spill_objects - (void(const c_vector[c_string]&) nogil) restore_spilled_objects + (c_vector[c_string](const c_vector[CObjectID] &) nogil) spill_objects + (void( + const c_vector[CObjectID] &, + const c_vector[c_string] &) nogil) restore_spilled_objects (void(c_string *stack_out) nogil) get_lang_stack c_bool ref_counting_enabled c_bool is_local_mode diff --git a/python/ray/includes/object_ref.pxi b/python/ray/includes/object_ref.pxi index ae99c088c..3353e696e 100644 
--- a/python/ray/includes/object_ref.pxi +++ b/python/ray/includes/object_ref.pxi @@ -52,6 +52,9 @@ cdef class ObjectRef(BaseID): def task_id(self): return TaskID(self.data.TaskId().Binary()) + def job_id(self): + return self.task_id().job_id() + cdef size_t hash(self): return self.data.Hash() diff --git a/python/ray/includes/unique_ids.pxd b/python/ray/includes/unique_ids.pxd index e79da124d..f0c444a29 100644 --- a/python/ray/includes/unique_ids.pxd +++ b/python/ray/includes/unique_ids.pxd @@ -124,6 +124,8 @@ cdef extern from "ray/common/id.h" namespace "ray" nogil: CActorID ActorId() const + CJobID JobId() const + cdef cppclass CObjectID" ray::ObjectID"(CBaseID[CObjectID]): @staticmethod diff --git a/python/ray/includes/unique_ids.pxi b/python/ray/includes/unique_ids.pxi index 409c44382..d98179f2d 100644 --- a/python/ray/includes/unique_ids.pxi +++ b/python/ray/includes/unique_ids.pxi @@ -152,6 +152,9 @@ cdef class TaskID(BaseID): def actor_id(self): return ActorID(self.data.ActorId().Binary()) + def job_id(self): + return JobID(self.data.JobId().Binary()) + cdef size_t hash(self): return self.data.Hash() diff --git a/python/ray/ray_constants.py b/python/ray/ray_constants.py index 421bd58ba..9e39b3e7a 100644 --- a/python/ray/ray_constants.py +++ b/python/ray/ray_constants.py @@ -207,3 +207,6 @@ MACH_PAGE_SIZE_BYTES = 4096 # Max 64 bit integer value, which is needed to ensure against overflow # in C++ when passing integer values cross-language. 
MAX_INT64_VALUE = 9223372036854775807 + +# Object Spilling related constants +DEFAULT_OBJECT_PREFIX = "ray_spilled_object" diff --git a/python/ray/tests/test_object_spilling.py b/python/ray/tests/test_object_spilling.py index 22bd3989e..883a960ab 100644 --- a/python/ray/tests/test_object_spilling.py +++ b/python/ray/tests/test_object_spilling.py @@ -9,12 +9,15 @@ import numpy as np import pytest import psutil import ray +from ray.external_storage import (create_url_with_offset, + parse_url_with_offset) bucket_name = "object-spilling-test" +spill_local_path = "/tmp/spill" file_system_object_spilling_config = { "type": "filesystem", "params": { - "directory_path": "/tmp" + "directory_path": spill_local_path } } smart_open_object_spilling_config = { @@ -108,6 +111,17 @@ def test_invalid_config_raises_exception(shutdown_only): }) +def test_url_generation_and_parse(): + url = "s3://abc/def/ray_good" + offset = 10 + size = 30 + url_with_offset = create_url_with_offset(url=url, offset=offset, size=size) + parsed_result = parse_url_with_offset(url_with_offset) + assert parsed_result.base_url == url + assert parsed_result.offset == offset + assert parsed_result.size == size + + @pytest.mark.skipif( platform.system() == "Windows", reason="Failing on Windows.") def test_spill_objects_manually(object_spilling_config, shutdown_only): @@ -119,6 +133,7 @@ def test_spill_objects_manually(object_spilling_config, shutdown_only): "automatic_object_spilling_enabled": False, "max_io_workers": 4, "object_spilling_config": object_spilling_config, + "min_spilling_size": 0, }) arr = np.random.rand(1024 * 1024) # 8 MB data replay_buffer = [] @@ -179,6 +194,7 @@ def test_spill_objects_manually_from_workers(object_spilling_config, "automatic_object_spilling_enabled": False, "max_io_workers": 4, "object_spilling_config": object_spilling_config, + "min_spilling_size": 0, }) @ray.remote @@ -209,6 +225,7 @@ def test_spill_objects_manually_with_workers(object_spilling_config, 
"automatic_object_spilling_enabled": False, "max_io_workers": 4, "object_spilling_config": object_spilling_config, + "min_spilling_size": 0, }) arrays = [np.random.rand(100 * 1024) for _ in range(50)] objects = [ray.put(arr) for arr in arrays] @@ -240,6 +257,7 @@ def test_spill_objects_manually_with_workers(object_spilling_config, "directory_path": "/tmp" } }), + "min_spilling_size": 0, }, }], indirect=True) @@ -278,6 +296,7 @@ def test_spill_remote_object(ray_start_cluster_head): def test_spill_objects_automatically(object_spilling_config, shutdown_only): # Limit our object store to 75 MiB of memory. ray.init( + num_cpus=1, object_store_memory=75 * 1024 * 1024, _system_config={ "max_io_workers": 4, @@ -285,27 +304,30 @@ def test_spill_objects_automatically(object_spilling_config, shutdown_only): "object_store_full_max_retries": 4, "object_store_full_initial_delay_ms": 100, "object_spilling_config": object_spilling_config, + "min_spilling_size": 0 }) - arr = np.random.rand(1024 * 1024) # 8 MB data replay_buffer = [] - - # Wait raylet for starting an IO worker. - time.sleep(1) + solution_buffer = [] + buffer_length = 100 # Create objects of more than 800 MiB. 
- for _ in range(100): + for _ in range(buffer_length): ref = None while ref is None: + multiplier = random.choice([1, 2, 3]) + arr = np.random.rand(multiplier * 1024 * 1024) ref = ray.put(arr) replay_buffer.append(ref) + solution_buffer.append(arr) print("-----------------------------------") - # randomly sample objects for _ in range(1000): - ref = random.choice(replay_buffer) + index = random.choice(list(range(buffer_length))) + ref = replay_buffer[index] + solution = solution_buffer[index] sample = ray.get(ref, timeout=0) - assert np.array_equal(sample, arr) + assert np.array_equal(sample, solution) @pytest.mark.skipif( @@ -321,6 +343,7 @@ def test_spill_during_get(object_spilling_config, shutdown_only): "automatic_object_spilling_enabled": True, "max_io_workers": 2, "object_spilling_config": object_spilling_config, + "min_spilling_size": 0, }, ) @@ -352,13 +375,11 @@ def test_spill_deadlock(object_spilling_config, shutdown_only): "object_store_full_max_retries": 4, "object_store_full_initial_delay_ms": 100, "object_spilling_config": object_spilling_config, + "min_spilling_size": 0, }) arr = np.random.rand(1024 * 1024) # 8 MB data replay_buffer = [] - # Wait raylet for starting an IO worker. - time.sleep(1) - # Create objects of more than 400 MiB. for _ in range(50): ref = None @@ -373,5 +394,61 @@ def test_spill_deadlock(object_spilling_config, shutdown_only): assert np.array_equal(sample, arr) +@pytest.mark.skipif( + platform.system() == "Windows", reason="Failing on Windows.") +def test_fusion_objects(tmp_path, shutdown_only): + # Limit our object store to 75 MiB of memory. 
+ temp_folder = tmp_path / "spill" + temp_folder.mkdir() + min_spilling_size = 30 * 1024 * 1024 + ray.init( + object_store_memory=75 * 1024 * 1024, + _system_config={ + "max_io_workers": 4, + "automatic_object_spilling_enabled": True, + "object_store_full_max_retries": 4, + "object_store_full_initial_delay_ms": 100, + "object_spilling_config": json.dumps({ + "type": "filesystem", + "params": { + "directory_path": str(temp_folder) + } + }), + "min_spilling_size": min_spilling_size, + }) + replay_buffer = [] + solution_buffer = [] + buffer_length = 100 + + # Create objects of more than 800 MiB. + for _ in range(buffer_length): + ref = None + while ref is None: + multiplier = random.choice([1, 2, 3]) + arr = np.random.rand(multiplier * 1024 * 1024) + ref = ray.put(arr) + replay_buffer.append(ref) + solution_buffer.append(arr) + + print("-----------------------------------") + # randomly sample objects + for _ in range(1000): + index = random.choice(list(range(buffer_length))) + ref = replay_buffer[index] + solution = solution_buffer[index] + sample = ray.get(ref, timeout=0) + assert np.array_equal(sample, solution) + + is_test_passing = False + for path in temp_folder.iterdir(): + file_size = path.stat().st_size + # Make sure there are at least one + # file_size that exceeds the min_spilling_size. + # If we don't fusion correctly, this cannot happen. + if file_size >= min_spilling_size: + is_test_passing = True + assert is_test_passing + + if __name__ == "__main__": sys.exit(pytest.main(["-sv", __file__])) diff --git a/src/ray/common/ray_config_def.h b/src/ray/common/ray_config_def.h index 85b8a21a7..3b9dd9b0a 100644 --- a/src/ray/common/ray_config_def.h +++ b/src/ray/common/ray_config_def.h @@ -332,9 +332,17 @@ RAY_CONFIG(int64_t, max_placement_group_load_report_size, 100) /// Python IO workers to determine how to store/restore an object to/from /// external storage. 
RAY_CONFIG(std::string, object_spilling_config, "") + /// Whether to enable automatic object spilling. If enabled, then /// Ray will choose objects to spill when the object store is out of /// memory. RAY_CONFIG(bool, automatic_object_spilling_enabled, true) + /// The maximum number of I/O worker that raylet starts. RAY_CONFIG(int, max_io_workers, 1) + +/// Ray's object spilling fuses small objects into a single file before flushing them +/// to optimize the performance. +/// The minimum object size that can be spilled by each spill operation. 100 MB by +/// default. This value is not recommended to set beyond --object-store-memory. +RAY_CONFIG(int64_t, min_spilling_size, 100 * 1024 * 1024) diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 39aa6a147..e0c2bad94 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -2346,12 +2346,19 @@ void CoreWorker::HandleRestoreSpilledObjects( const rpc::RestoreSpilledObjectsRequest &request, rpc::RestoreSpilledObjectsReply *reply, rpc::SendReplyCallback send_reply_callback) { if (options_.restore_spilled_objects != nullptr) { + // Get a list of object ids. + std::vector object_ids_to_restore; + object_ids_to_restore.reserve(request.object_ids_to_restore_size()); + for (const auto &id_binary : request.object_ids_to_restore()) { + object_ids_to_restore.push_back(ObjectID::FromBinary(id_binary)); + } + // Get a list of spilled_object_urls. 
std::vector spilled_objects_url; spilled_objects_url.reserve(request.spilled_objects_url_size()); for (const auto &url : request.spilled_objects_url()) { spilled_objects_url.push_back(url); } - options_.restore_spilled_objects(spilled_objects_url); + options_.restore_spilled_objects(object_ids_to_restore, spilled_objects_url); send_reply_callback(Status::OK(), nullptr, nullptr); } else { send_reply_callback( diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index 7612f1642..dae8ecd9e 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -138,7 +138,8 @@ struct CoreWorkerOptions { /// Application-language callback to spill objects to external storage. std::function(const std::vector &)> spill_objects; /// Application-language callback to restore objects from external storage. - std::function &)> restore_spilled_objects; + std::function &, const std::vector &)> + restore_spilled_objects; /// Language worker callback to get the current call stack. std::function get_lang_stack; // Function that tries to interrupt the currently running Python thread. diff --git a/src/ray/object_manager/common.h b/src/ray/object_manager/common.h index 063f1fb51..eacfb4b13 100644 --- a/src/ray/object_manager/common.h +++ b/src/ray/object_manager/common.h @@ -5,10 +5,15 @@ namespace ray { -/// A callback to asynchronously spill objects when space is needed. The -/// callback returns the amount of space still needed after the spilling is -/// complete. -using SpillObjectsCallback = std::function; +/// A callback to asynchronously spill objects when space is needed. +/// The callback tries to spill objects as much as num_bytes_to_spill and returns +/// the amount of space needed after the spilling is complete. +/// The returned value is calculated based off of min_bytes_to_spill. 
That says, +/// although it fails to spill num_bytes_to_spill, as long as it spills more than +/// min_bytes_to_spill, it will return the value that is less than 0 (meaning we +/// don't need any more additional space). +using SpillObjectsCallback = + std::function; /// A callback to call when space has been released. using SpaceReleasedCallback = std::function; diff --git a/src/ray/object_manager/plasma/store.cc b/src/ray/object_manager/plasma/store.cc index f3586d0b2..7a38f9934 100644 --- a/src/ray/object_manager/plasma/store.cc +++ b/src/ray/object_manager/plasma/store.cc @@ -216,13 +216,18 @@ uint8_t *PlasmaStore::AllocateMemory(size_t size, bool evict_if_full, MEMFD_TYPE // make room. if (space_needed > 0) { if (spill_objects_callback_) { + // If the space needed is too small, we'd like to bump up to the minimum spilling + // size. Cap the max size to be lower than the plasma store limit. + int64_t byte_to_spill = + std::min(PlasmaAllocator::GetFootprintLimit(), + std::max(space_needed, RayConfig::instance().min_spilling_size())); // Object spilling is asynchronous so that we do not block the plasma // store thread. Therefore the client must try again, even if enough // space will be made after the spill is complete. // TODO(swang): Only respond to the client with OutOfMemory if we could not // make enough space through spilling. If we could make enough space, // respond to the plasma client once spilling is complete. - space_needed = spill_objects_callback_(space_needed); + space_needed = spill_objects_callback_(byte_to_spill, space_needed); } if (space_needed > 0) { // There is still not enough space, even once all evictable objects diff --git a/src/ray/protobuf/core_worker.proto b/src/ray/protobuf/core_worker.proto index bb61612eb..da411294e 100644 --- a/src/ray/protobuf/core_worker.proto +++ b/src/ray/protobuf/core_worker.proto @@ -309,6 +309,9 @@ message SpillObjectsReply { message RestoreSpilledObjectsRequest { // The URLs of spilled objects. 
repeated string spilled_objects_url = 1; + // Object ids to restore. The order of object ids + // must be the same as spilled_objects_url. + repeated bytes object_ids_to_restore = 2; } message RestoreSpilledObjectsReply { diff --git a/src/ray/raylet/local_object_manager.cc b/src/ray/raylet/local_object_manager.cc index b3888b28b..0440276b2 100644 --- a/src/ray/raylet/local_object_manager.cc +++ b/src/ray/raylet/local_object_manager.cc @@ -89,32 +89,34 @@ void LocalObjectManager::FlushFreeObjectsIfNeeded(int64_t now_ms) { } } -int64_t LocalObjectManager::SpillObjectsOfSize(int64_t num_bytes_required) { +int64_t LocalObjectManager::SpillObjectsOfSize(int64_t num_bytes_to_spill, + int64_t min_bytes_to_spill) { + RAY_CHECK(num_bytes_to_spill >= min_bytes_to_spill); if (RayConfig::instance().object_spilling_config().empty() || !RayConfig::instance().automatic_object_spilling_enabled()) { - return num_bytes_required; + return min_bytes_to_spill; } absl::MutexLock lock(&mutex_); - RAY_LOG(INFO) << "Choosing objects to spill of total size " << num_bytes_required; - int64_t num_bytes_to_spill = 0; + RAY_LOG(INFO) << "Choosing objects to spill of total size " << num_bytes_to_spill; + int64_t bytes_to_spill = 0; auto it = pinned_objects_.begin(); std::vector objects_to_spill; - while (num_bytes_to_spill < num_bytes_required && it != pinned_objects_.end()) { - num_bytes_to_spill += it->second->GetSize(); + while (bytes_to_spill < num_bytes_to_spill && it != pinned_objects_.end()) { + bytes_to_spill += it->second->GetSize(); objects_to_spill.push_back(it->first); it++; } if (!objects_to_spill.empty()) { - RAY_LOG(INFO) << "Spilling objects of total size " << num_bytes_to_spill; + RAY_LOG(INFO) << "Spilling objects of total size " << bytes_to_spill; auto start_time = current_time_ms(); SpillObjectsInternal( - objects_to_spill, [num_bytes_to_spill, start_time](const Status &status) { + objects_to_spill, [bytes_to_spill, start_time](const Status &status) { if (!status.ok()) { 
RAY_LOG(ERROR) << "Error spilling objects " << status.ToString(); } else { - RAY_LOG(INFO) << "Spilled " << num_bytes_to_spill << " in " + RAY_LOG(INFO) << "Spilled " << bytes_to_spill << " in " << (current_time_ms() - start_time) << "ms"; } }); @@ -124,8 +126,7 @@ int64_t LocalObjectManager::SpillObjectsOfSize(int64_t num_bytes_required) { // bytes that are currently being spilled from the amount of space // requested. If the space is claimed by another client, this client may // need to request space again. - num_bytes_required -= num_bytes_pending_spill_; - return num_bytes_required; + return min_bytes_to_spill - num_bytes_pending_spill_; } void LocalObjectManager::SpillObjects(const std::vector &object_ids, @@ -169,7 +170,6 @@ void LocalObjectManager::SpillObjectsInternal( } return; } - io_worker_pool_.PopSpillWorker( [this, objects_to_spill, callback](std::shared_ptr io_worker) { rpc::SpillObjectsRequest request; @@ -246,6 +246,7 @@ void LocalObjectManager::AsyncRestoreSpilledObject( RAY_LOG(DEBUG) << "Sending restore spilled object request"; rpc::RestoreSpilledObjectsRequest request; request.add_spilled_objects_url(std::move(object_url)); + request.add_object_ids_to_restore(object_id.Binary()); io_worker->rpc_client()->RestoreSpilledObjects( request, [this, object_id, callback, io_worker](const ray::Status &status, diff --git a/src/ray/raylet/local_object_manager.h b/src/ray/raylet/local_object_manager.h index 8305d49ec..f645062ae 100644 --- a/src/ray/raylet/local_object_manager.h +++ b/src/ray/raylet/local_object_manager.h @@ -65,13 +65,22 @@ class LocalObjectManager { void WaitForObjectFree(const rpc::Address &owner_address, const std::vector &object_ids); - /// Asynchronously spill objects whose total size adds up to at least the - /// specified number of bytes. + /// Asynchronously spill objects when space is needed. 
+ /// The callback tries to spill objects as much as num_bytes_to_spill and returns + /// the amount of space needed after the spilling is complete. + /// The returned value is calculated based off of min_bytes_to_spill. That says, + /// although it fails to spill num_bytes_to_spill, as long as it spills more than + /// min_bytes_to_spill, it will return the value that is less than 0 (meaning we + /// don't need any more additional space). /// - /// \param num_bytes_to_spill The total number of bytes to spill. - /// \return The number of bytes of space still required after the spill is - /// complete. - int64_t SpillObjectsOfSize(int64_t num_bytes_to_spill); + /// \param num_bytes_to_spill The total number of bytes to spill. The method tries to + /// spill bytes as much as this value. + /// \param min_bytes_to_spill The minimum bytes that + /// need to be spilled. + /// \return The number of bytes of space still required after the + /// spill is complete. This return the value is less than 0 if it satifies the + /// min_bytes_to_spill. + int64_t SpillObjectsOfSize(int64_t num_bytes_to_spill, int64_t min_bytes_to_spill); /// Spill objects to external storage. 
/// diff --git a/src/ray/raylet/raylet.cc b/src/ray/raylet/raylet.cc index c7623f420..467a5301c 100644 --- a/src/ray/raylet/raylet.cc +++ b/src/ray/raylet/raylet.cc @@ -76,9 +76,9 @@ Raylet::Raylet(boost::asio::io_service &main_service, const std::string &socket_ node_manager_.GetLocalObjectManager().AsyncRestoreSpilledObject( object_id, spilled_url, callback); }, - [this](int64_t num_bytes_required) { + [this](int64_t num_bytes_to_spill, int64_t min_bytes_to_spill) { return node_manager_.GetLocalObjectManager().SpillObjectsOfSize( - num_bytes_required); + num_bytes_to_spill, min_bytes_to_spill); }), node_manager_(main_service, self_node_id_, node_manager_config, object_manager_, gcs_client_, object_directory_, diff --git a/src/ray/raylet/test/local_object_manager_test.cc b/src/ray/raylet/test/local_object_manager_test.cc index e6e7c532b..4311c613b 100644 --- a/src/ray/raylet/test/local_object_manager_test.cc +++ b/src/ray/raylet/test/local_object_manager_test.cc @@ -380,14 +380,14 @@ TEST_F(LocalObjectManagerTest, TestSpillObjectsOfSize) { } manager.PinObjects(object_ids, std::move(objects)); - int64_t num_bytes_required = manager.SpillObjectsOfSize(total_size / 2); + int64_t num_bytes_required = manager.SpillObjectsOfSize(total_size / 2, total_size / 2); ASSERT_EQ(num_bytes_required, -object_size / 2); for (const auto &id : object_ids) { ASSERT_EQ((*unpins)[id], 0); } // Check that this returns the total number of bytes currently being spilled. - num_bytes_required = manager.SpillObjectsOfSize(0); + num_bytes_required = manager.SpillObjectsOfSize(0, 0); ASSERT_EQ(num_bytes_required, -2 * object_size); // Check that half the objects get spilled and the URLs get added to the @@ -411,7 +411,7 @@ TEST_F(LocalObjectManagerTest, TestSpillObjectsOfSize) { } // Check that this returns the total number of bytes currently being spilled. 
- num_bytes_required = manager.SpillObjectsOfSize(0); + num_bytes_required = manager.SpillObjectsOfSize(0, 0); ASSERT_EQ(num_bytes_required, 0); ASSERT_TRUE(num_callbacks_fired > 0); } @@ -460,6 +460,52 @@ TEST_F(LocalObjectManagerTest, TestSpillError) { ASSERT_TRUE(num_callbacks_fired > 0); } +TEST_F(LocalObjectManagerTest, + TestSpillObjectsOfSizeNumBytesToSpillHigherThanMinBytesToSpill) { + /// Test the case SpillObjectsOfSize(num_bytes_to_spill, min_bytes_to_spill) + /// where num_bytes_to_spill > min_bytes_to_spill. + rpc::Address owner_address; + owner_address.set_worker_id(WorkerID::FromRandom().Binary()); + + std::vector object_ids; + std::vector> objects; + int64_t total_size = 0; + int64_t object_size = 1000; + size_t object_len = 3; + + for (size_t i = 0; i < object_len; i++) { + ObjectID object_id = ObjectID::FromRandom(); + object_ids.push_back(object_id); + auto data_buffer = std::make_shared(object_size, object_id, unpins); + total_size += object_size; + std::unique_ptr object( + new RayObject(data_buffer, nullptr, std::vector())); + objects.push_back(std::move(object)); + } + manager.PinObjects(object_ids, std::move(objects)); + + // First test when num_bytes_to_spill > min_bytes_to_spill. + // It means that we cannot spill num_bytes_to_spill, but we at least spilled the + // required amount, which is the min_bytes_to_spill. + int64_t num_bytes_required = manager.SpillObjectsOfSize(8000, object_size); + // Only min_bytes_to_spill is considered. + ASSERT_TRUE(num_bytes_required <= 0); + + // Make sure the spilling is done properly.
+ std::vector urls; + for (size_t i = 0; i < object_ids.size(); i++) { + urls.push_back("url" + std::to_string(i)); + } + EXPECT_CALL(worker_pool, PushSpillWorker(_)); + ASSERT_TRUE(worker_pool.io_worker_client->ReplySpillObjects(urls)); + for (size_t i = 0; i < object_ids.size(); i++) { + ASSERT_TRUE(object_table.ReplyAsyncAddSpilledUrl()); + } + for (size_t i = 0; i < object_ids.size(); i++) { + ASSERT_EQ((*unpins).size(), object_len); + } +} + } // namespace raylet } // namespace ray