[Object Spilling] Delete spilled objects when references are gone out of scope. (#12341)

This commit is contained in:
SangBin Cho 2020-12-01 13:10:39 -08:00 committed by GitHub
parent ef1b0c13c3
commit 0e892908f7
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
25 changed files with 968 additions and 11 deletions

View file

@ -54,6 +54,7 @@ from ray.includes.common cimport (
CTaskType,
CPlacementStrategy,
CRayFunction,
CWorkerType,
move,
LANGUAGE_CPP,
LANGUAGE_JAVA,
@ -631,6 +632,46 @@ cdef void restore_spilled_objects_handler(
job_id=None)
cdef void delete_spilled_objects_handler(
        const c_vector[c_string]& object_urls,
        CWorkerType worker_type) nogil:
    # Callback that deletes spilled objects from external storage.
    #
    # object_urls: URLs of the spilled objects to delete.
    # worker_type: type of the IO worker running this callback; only spill
    #     and restore workers are handled.
    #
    # The function is declared nogil, so the GIL is acquired for the whole
    # body before any Python object is touched. Errors are logged and pushed
    # to the driver instead of being raised.
    with gil:
        # Copy the C++ vector into a Python list, keeping the order.
        url_list = []
        num_urls = object_urls.size()
        for idx in range(num_urls):
            url_list.append(object_urls[idx])

        try:
            # Select the proctitle pair for this kind of IO worker.
            if <int> worker_type == <int> WORKER_TYPE_SPILL_WORKER:
                idle_title = (
                    ray_constants.WORKER_PROCESS_TYPE_SPILL_WORKER_IDLE)
                deleting_title = (
                    ray_constants.WORKER_PROCESS_TYPE_SPILL_WORKER_DELETE)
            elif <int> worker_type == <int> WORKER_TYPE_RESTORE_WORKER:
                idle_title = (
                    ray_constants.WORKER_PROCESS_TYPE_RESTORE_WORKER_IDLE)
                deleting_title = (
                    ray_constants.WORKER_PROCESS_TYPE_RESTORE_WORKER_DELETE)
            else:
                assert False, ("This line shouldn't be reachable.")

            # Run the deletion under the "delete" proctitle.
            # NOTE(review): presumably _changeproctitle restores the idle
            # title on exit -- confirm against ray.worker.
            with ray.worker._changeproctitle(deleting_title, idle_title):
                external_storage.delete_spilled_objects(url_list)
        except Exception:
            error_message = (
                "An unexpected internal error occurred while the IO worker "
                "was deleting spilled objects.")
            logger.exception(error_message)
            ray.utils.push_error_to_driver(
                ray.worker.global_worker,
                "delete_spilled_objects_error",
                traceback.format_exc() + error_message,
                job_id=None)
# This function introduces ~2-7us of overhead per call (i.e., it can be called
# up to hundreds of thousands of times per second).
cdef void get_py_stack(c_string* stack_out) nogil:
@ -739,6 +780,7 @@ cdef class CoreWorker:
options.gc_collect = gc_collect
options.spill_objects = spill_objects_handler
options.restore_spilled_objects = restore_spilled_objects_handler
options.delete_spilled_objects = delete_spilled_objects_handler
options.get_lang_stack = get_py_stack
options.ref_counting_enabled = True
options.is_local_mode = local_mode
@ -1473,6 +1515,10 @@ cdef class CoreWorker:
def force_spill_objects(self, object_refs):
    """Ask the core worker to spill the given objects.

    Args:
        object_refs: Object refs to spill; they are converted to a vector
            of object IDs before the call.

    Raises:
        AssertionError: If ``automatic_object_deletion_enabled`` is set in
            the Ray config, because forced spilling is not supported
            together with automatic deletion yet.
    """
    cdef c_vector[CObjectID] object_ids
    object_ids = ObjectRefsToVector(object_refs)
    # Adjacent string literals are concatenated verbatim, so each fragment
    # must end with a space to keep the message readable.
    assert not RayConfig.instance().automatic_object_deletion_enabled(), (
        "Automatic object deletion is not supported for "
        "force_spill_objects yet. Please set "
        "automatic_object_deletion_enabled: False in Ray's system config.")
    # The spill call does not need Python objects, so release the GIL.
    with nogil:
        check_status(CCoreWorkerProcess.GetCoreWorker()
                     .SpillObjects(object_ids))

View file

@ -165,6 +165,14 @@ class ExternalStorage(metaclass=abc.ABCMeta):
url_with_offset_list: List of url_with_offset.
"""
@abc.abstractmethod
def delete_spilled_objects(self, urls: List[str]):
    """Remove spilled objects from the external storage.

    Concrete storage backends must override this method.

    Args:
        urls: URLs of the files that hold the spilled objects.
    """
class NullStorage(ExternalStorage):
"""The class that represents an uninitialized external storage."""
@ -175,6 +183,9 @@ class NullStorage(ExternalStorage):
def restore_spilled_objects(self, object_refs, url_with_offset_list):
raise NotImplementedError("External storage is not initialized")
def delete_spilled_objects(self, urls: List[str]):
    """Always fail: no external storage has been set up.

    Raises:
        NotImplementedError: On every call.
    """
    raise NotImplementedError("External storage is not initialized")
class FileSystemStorage(ExternalStorage):
"""The class for filesystem-like external storage.
@ -221,6 +232,11 @@ class FileSystemStorage(ExternalStorage):
# read remaining data to our buffer
self._put_object_to_store(metadata, buf_len, f, object_ref)
def delete_spilled_objects(self, urls: List[str]):
    """Delete the files backing the given spilled-object URLs.

    Args:
        urls: URLs (with offset) of spilled objects. Each entry may be
            ``bytes`` or ``str``.

    Raises:
        OSError: Propagated from ``os.remove`` if a file cannot be removed.
    """
    for url in urls:
        # The original code decoded unconditionally, which fails with
        # AttributeError on the ``str`` values the annotation promises.
        # Decode only when the entry is actually bytes.
        if isinstance(url, bytes):
            url = url.decode()
        # Only the base URL names the file; the offset part is dropped.
        filename = parse_url_with_offset(url).base_url
        os.remove(os.path.join(self.directory_path, filename))
class ExternalStorageSmartOpenImpl(ExternalStorage):
"""The external storage class implemented by smart_open.
@ -303,6 +319,9 @@ class ExternalStorageSmartOpenImpl(ExternalStorage):
# read remaining data to our buffer
self._put_object_to_store(metadata, buf_len, f, object_ref)
def delete_spilled_objects(self, urls: List[str]):
    """Do nothing: this backend does not delete spilled objects.

    The call is accepted and ignored, so nothing is removed from the
    external storage.

    Args:
        urls: URLs that store spilled object files (unused).
    """
    return None
_external_storage = NullStorage()
@ -350,3 +369,12 @@ def restore_spilled_objects(object_refs: List[ObjectRef],
"""
_external_storage.restore_spilled_objects(object_refs,
url_with_offset_list)
def delete_spilled_objects(urls: List[str]):
    """Delete spilled objects through the configured external storage.

    This forwards to the module-level ``_external_storage`` instance.

    Args:
        urls: URLs that store spilled object files.
    """
    storage = _external_storage
    storage.delete_spilled_objects(urls)

View file

@ -231,6 +231,9 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:
(void(
const c_vector[CObjectID] &,
const c_vector[c_string] &) nogil) restore_spilled_objects
(void(
const c_vector[c_string]&,
CWorkerType) nogil) delete_spilled_objects
(void(c_string *stack_out) nogil) get_lang_stack
c_bool ref_counting_enabled
c_bool is_local_mode

View file

@ -66,3 +66,5 @@ cdef extern from "ray/common/ray_config.h" nogil:
uint64_t metrics_report_interval_ms() const
c_bool enable_timeline() const
c_bool automatic_object_deletion_enabled() const

View file

@ -115,3 +115,7 @@ cdef class Config:
@staticmethod
def enable_timeline():
return RayConfig.instance().enable_timeline()
@staticmethod
def automatic_object_deletion_enabled():
    """Return the ``automatic_object_deletion_enabled`` RayConfig value."""
    enabled = RayConfig.instance().automatic_object_deletion_enabled()
    return enabled

View file

@ -187,6 +187,10 @@ WORKER_PROCESS_TYPE_SPILL_WORKER = (
f"ray::SPILL_{WORKER_PROCESS_TYPE_SPILL_WORKER_NAME}")
WORKER_PROCESS_TYPE_RESTORE_WORKER = (
f"ray::RESTORE_{WORKER_PROCESS_TYPE_RESTORE_WORKER_NAME}")
# Titles of the form "ray::DELETE_<worker name>" for the spill and restore
# IO workers. NOTE(review): presumably shown as the process title while the
# worker is deleting spilled objects -- confirm against the callers.
WORKER_PROCESS_TYPE_SPILL_WORKER_DELETE = (
f"ray::DELETE_{WORKER_PROCESS_TYPE_SPILL_WORKER_NAME}")
WORKER_PROCESS_TYPE_RESTORE_WORKER_DELETE = (
f"ray::DELETE_{WORKER_PROCESS_TYPE_RESTORE_WORKER_NAME}")
LOG_MONITOR_MAX_OPEN_FILES = 200

View file

@ -1,5 +1,6 @@
import copy
import json
import os
import random
import platform
import sys
@ -11,6 +12,7 @@ import psutil
import ray
from ray.external_storage import (create_url_with_offset,
parse_url_with_offset)
from ray.test_utils import wait_for_condition
bucket_name = "object-spilling-test"
spill_local_path = "/tmp/spill"
@ -57,6 +59,7 @@ def test_sample_benchmark(object_spilling_config, shutdown_only):
"object_store_full_max_retries": 0,
"max_io_workers": max_io_workers,
"object_spilling_config": object_spilling_config,
"automatic_object_deletion_enabled": False,
})
arr = np.random.rand(object_size)
replay_buffer = []
@ -134,6 +137,7 @@ def test_spill_objects_manually(object_spilling_config, shutdown_only):
"max_io_workers": 4,
"object_spilling_config": object_spilling_config,
"min_spilling_size": 0,
"automatic_object_deletion_enabled": False,
})
arr = np.random.rand(1024 * 1024) # 8 MB data
replay_buffer = []
@ -195,6 +199,7 @@ def test_spill_objects_manually_from_workers(object_spilling_config,
"max_io_workers": 4,
"object_spilling_config": object_spilling_config,
"min_spilling_size": 0,
"automatic_object_deletion_enabled": False,
})
@ray.remote
@ -226,6 +231,7 @@ def test_spill_objects_manually_with_workers(object_spilling_config,
"max_io_workers": 4,
"object_spilling_config": object_spilling_config,
"min_spilling_size": 0,
"automatic_object_deletion_enabled": False,
})
arrays = [np.random.rand(100 * 1024) for _ in range(50)]
objects = [ray.put(arr) for arr in arrays]
@ -396,6 +402,246 @@ def test_spill_deadlock(object_spilling_config, shutdown_only):
@pytest.mark.skipif(
platform.system() == "Windows", reason="Failing on Windows.")
def test_delete_objects(tmp_path, shutdown_only):
# Checks that spilled files are removed from the spill directory once all
# references to the objects are dropped.
# Limit our object store to 75 MiB of memory.
temp_folder = tmp_path / "spill"
temp_folder.mkdir()
# Automatic spilling to a filesystem directory under tmp_path.
ray.init(
object_store_memory=75 * 1024 * 1024,
_system_config={
"max_io_workers": 4,
"automatic_object_spilling_enabled": True,
"object_store_full_max_retries": 4,
"object_store_full_initial_delay_ms": 100,
"object_spilling_config": json.dumps({
"type": "filesystem",
"params": {
"directory_path": str(temp_folder)
}
}),
})
arr = np.random.rand(1024 * 1024) # 8 MB data
replay_buffer = []
# Put 80 copies of the array; this exceeds the 75 MiB store limit set above.
for _ in range(80):
ref = None
while ref is None:
ref = ray.put(arr)
replay_buffer.append(ref)
print("-----------------------------------")
# True when the spill directory contains no entries.
def is_dir_empty():
num_files = 0
for path in temp_folder.iterdir():
num_files += 1
return num_files == 0
# Drop every reference held by the test, including the last loop variable.
del replay_buffer
del ref
wait_for_condition(is_dir_empty)
@pytest.mark.skipif(
platform.system() == "Windows", reason="Failing on Windows.")
def test_delete_objects_delete_while_creating(tmp_path, shutdown_only):
# Checks deletion when references are dropped while objects are still being
# created, and that the remaining objects stay readable until released.
# Limit our object store to 75 MiB of memory.
temp_folder = tmp_path / "spill"
temp_folder.mkdir()
# Automatic spilling to a filesystem directory under tmp_path.
ray.init(
object_store_memory=75 * 1024 * 1024,
_system_config={
"max_io_workers": 4,
"automatic_object_spilling_enabled": True,
"object_store_full_max_retries": 4,
"object_store_full_initial_delay_ms": 100,
"object_spilling_config": json.dumps({
"type": "filesystem",
"params": {
"directory_path": str(temp_folder)
}
}),
})
arr = np.random.rand(1024 * 1024) # 8 MB data
replay_buffer = []
for _ in range(80):
ref = None
while ref is None:
ref = ray.put(arr)
replay_buffer.append(ref)
# Remove the replay buffer with 60% probability.
# (randint(0, 9) < 6 holds for 6 of the 10 possible values; pop() drops
# the last ref in the list.)
if random.randint(0, 9) < 6:
replay_buffer.pop()
# Do random sampling.
for _ in range(200):
ref = random.choice(replay_buffer)
sample = ray.get(ref, timeout=0)
assert np.array_equal(sample, arr)
# True when the spill directory contains no entries.
def is_dir_empty():
num_files = 0
for path in temp_folder.iterdir():
num_files += 1
return num_files == 0
# After all, make sure all objects are killed without race condition.
del replay_buffer
del ref
wait_for_condition(is_dir_empty, timeout=1000)
@pytest.mark.skipif(
platform.system() == "Windows", reason="Failing on Windows.")
def test_delete_objects_on_worker_failure(tmp_path, shutdown_only):
# Checks that spilled files are removed after the actor that owns the
# references is killed with signal 9.
# Limit our object store to 75 MiB of memory.
temp_folder = tmp_path / "spill"
temp_folder.mkdir()
# Automatic spilling to a filesystem directory under tmp_path, with
# min_spilling_size set to 0.
ray.init(
object_store_memory=75 * 1024 * 1024,
_system_config={
"max_io_workers": 4,
"automatic_object_spilling_enabled": True,
"object_store_full_max_retries": 4,
"object_store_full_initial_delay_ms": 100,
"object_spilling_config": json.dumps({
"type": "filesystem",
"params": {
"directory_path": str(temp_folder)
}
}),
"min_spilling_size": 0,
})
arr = np.random.rand(1024 * 1024) # 8 MB data
# Actor that holds the object refs in its own replay buffer.
@ray.remote
class Actor:
def __init__(self):
self.replay_buffer = []
def get_pid(self):
return os.getpid()
def create_objects(self):
for _ in range(80):
ref = None
while ref is None:
ref = ray.put(arr)
self.replay_buffer.append(ref)
# Remove the replay buffer with 60% probability.
if random.randint(0, 9) < 6:
self.replay_buffer.pop()
# Do random sampling.
for _ in range(200):
ref = random.choice(self.replay_buffer)
sample = ray.get(ref, timeout=0)
assert np.array_equal(sample, arr)
a = Actor.remote()
actor_pid = ray.get(a.get_pid.remote())
ray.get(a.create_objects.remote())
# Kill the actor process directly (signal 9).
os.kill(actor_pid, 9)
# Returns True once a call to the actor raises RayActorError.
def wait_until_actor_dead():
try:
ray.get(a.get_pid.remote())
except ray.exceptions.RayActorError:
return True
return False
wait_for_condition(wait_until_actor_dead)
# True when the spill directory contains no entries.
def is_dir_empty():
num_files = 0
for path in temp_folder.iterdir():
num_files += 1
return num_files == 0
# After all, make sure all objects are deleted upon worker failures.
wait_for_condition(is_dir_empty, timeout=1000)
@pytest.mark.skipif(
platform.system() == "Windows", reason="Failing on Windows.")
def test_delete_objects_multi_node(tmp_path, ray_start_cluster):
# Checks that spilled files are removed in a three-node cluster after the
# actors holding the references are killed.
# Limit our object store to 75 MiB of memory.
temp_folder = tmp_path / "spill"
temp_folder.mkdir()
cluster = ray_start_cluster
# Head node.
# The spilling system config is passed only on this add_node call.
cluster.add_node(
num_cpus=1,
object_store_memory=75 * 1024 * 1024,
_system_config={
"max_io_workers": 2,
"automatic_object_spilling_enabled": True,
"object_store_full_max_retries": 4,
"object_store_full_initial_delay_ms": 100,
"object_spilling_config": json.dumps({
"type": "filesystem",
"params": {
"directory_path": str(temp_folder)
}
}),
})
# Add 2 worker nodes.
for _ in range(2):
cluster.add_node(num_cpus=1, object_store_memory=75 * 1024 * 1024)
ray.init(address=cluster.address)
arr = np.random.rand(1024 * 1024) # 8 MB data
# Each actor requests 1 CPU; every node above was added with num_cpus=1.
@ray.remote(num_cpus=1)
class Actor:
def __init__(self):
self.replay_buffer = []
def ping(self):
return
def create_objects(self):
for _ in range(80):
ref = None
while ref is None:
ref = ray.put(arr)
self.replay_buffer.append(ref)
# Remove the replay buffer with 60% probability.
if random.randint(0, 9) < 6:
self.replay_buffer.pop()
# Do random sampling.
for _ in range(200):
ref = random.choice(self.replay_buffer)
sample = ray.get(ref, timeout=0)
assert np.array_equal(sample, arr)
actors = [Actor.remote() for _ in range(3)]
ray.get([actor.create_objects.remote() for actor in actors])
# Returns True once a call to the given actor raises RayActorError.
def wait_until_actor_dead(actor):
try:
ray.get(actor.ping.remote())
except ray.exceptions.RayActorError:
return True
return False
# True when the spill directory contains no entries.
def is_dir_empty():
num_files = 0
for path in temp_folder.iterdir():
num_files += 1
return num_files == 0
# Kill actors to remove all references.
for actor in actors:
ray.kill(actor)
wait_for_condition(lambda: wait_until_actor_dead(actor))
# The multi node deletion should work.
wait_for_condition(is_dir_empty)
def test_fusion_objects(tmp_path, shutdown_only):
# Limit our object store to 75 MiB of memory.
temp_folder = tmp_path / "spill"

View file

@ -351,3 +351,8 @@ RAY_CONFIG(int, max_io_workers, 1)
/// The minimum object size that can be spilled by each spill operation. 100 MB by
/// default. This value is not recommended to set beyond --object-store-memory.
RAY_CONFIG(int64_t, min_spilling_size, 100 * 1024 * 1024)
/// Whether to automatically delete spilled objects once their references go out
/// of scope. Enabled by default.
/// When it is true, manual (force) spilling is not available.
/// TODO(sang): Lift this restriction so that manual spilling works together with
/// automatic deletion.
RAY_CONFIG(bool, automatic_object_deletion_enabled, true)

View file

@ -2358,6 +2358,24 @@ void CoreWorker::HandleRestoreSpilledObjects(
}
}
/// Handle a request to delete spilled objects from external storage.
///
/// The URLs in the request are forwarded to the language-level
/// `delete_spilled_objects` callback together with this worker's type. If no
/// callback was configured, the reply carries a NotImplemented status.
void CoreWorker::HandleDeleteSpilledObjects(
    const rpc::DeleteSpilledObjectsRequest &request,
    rpc::DeleteSpilledObjectsReply *reply, rpc::SendReplyCallback send_reply_callback) {
  // Nothing can perform the deletion when the callback is missing.
  if (options_.delete_spilled_objects == nullptr) {
    send_reply_callback(
        Status::NotImplemented("Delete spilled objects callback not defined"), nullptr,
        nullptr);
    return;
  }

  // Copy the repeated proto field into a plain vector for the callback.
  const auto &requested_urls = request.spilled_objects_url();
  std::vector<std::string> urls_to_delete(requested_urls.begin(), requested_urls.end());

  options_.delete_spilled_objects(urls_to_delete, worker_context_.GetWorkerType());
  send_reply_callback(Status::OK(), nullptr, nullptr);
}
void CoreWorker::HandleExit(const rpc::ExitRequest &request, rpc::ExitReply *reply,
rpc::SendReplyCallback send_reply_callback) {
send_reply_callback(Status::OK(), nullptr, nullptr);

View file

@ -81,6 +81,7 @@ struct CoreWorkerOptions {
gc_collect(nullptr),
spill_objects(nullptr),
restore_spilled_objects(nullptr),
delete_spilled_objects(nullptr),
get_lang_stack(nullptr),
kill_main(nullptr),
ref_counting_enabled(false),
@ -140,6 +141,9 @@ struct CoreWorkerOptions {
/// Application-language callback to restore objects from external storage.
std::function<void(const std::vector<ObjectID> &, const std::vector<std::string> &)>
restore_spilled_objects;
/// Application-language callback to delete objects from external storage.
/// The first argument is the list of spilled-object URLs to delete; the second is
/// the worker type passed along by the caller.
std::function<void(const std::vector<std::string> &, rpc::WorkerType)>
delete_spilled_objects;
/// Language worker callback to get the current call stack.
std::function<void(std::string *)> get_lang_stack;
// Function that tries to interrupt the currently running Python thread.
@ -874,6 +878,11 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
rpc::RestoreSpilledObjectsReply *reply,
rpc::SendReplyCallback send_reply_callback) override;
// Handle an RPC asking this worker to delete spilled objects from external
// storage. The request carries the URLs of the spilled objects.
void HandleDeleteSpilledObjects(const rpc::DeleteSpilledObjectsRequest &request,
rpc::DeleteSpilledObjectsReply *reply,
rpc::SendReplyCallback send_reply_callback) override;
// Make the this worker exit.
void HandleExit(const rpc::ExitRequest &request, rpc::ExitReply *reply,
rpc::SendReplyCallback send_reply_callback) override;

View file

@ -317,6 +317,14 @@ message RestoreSpilledObjectsRequest {
message RestoreSpilledObjectsReply {
}
// Request to delete spilled objects from external storage.
message DeleteSpilledObjectsRequest {
// The URLs of spilled objects.
repeated string spilled_objects_url = 1;
}
// Reply to DeleteSpilledObjectsRequest. It has no fields.
message DeleteSpilledObjectsReply {
}
message ExitRequest {
}
@ -366,6 +374,9 @@ service CoreWorkerService {
// Restore spilled objects from external storage. Caller: raylet; callee: I/O worker.
rpc RestoreSpilledObjects(RestoreSpilledObjectsRequest)
returns (RestoreSpilledObjectsReply);
// Delete spilled objects from external storage. Caller: raylet; callee: I/O worker.
rpc DeleteSpilledObjects(DeleteSpilledObjectsRequest)
returns (DeleteSpilledObjectsReply);
// Notification from raylet that an object ID is available in local plasma.
rpc PlasmaObjectReady(PlasmaObjectReadyRequest) returns (PlasmaObjectReadyReply);
// Request for a worker to exit.

View file

@ -13,6 +13,8 @@
// limitations under the License.
#include "ray/raylet/local_object_manager.h"
#include "ray/util/asio_util.h"
#include "ray/util/util.h"
namespace ray {
@ -21,6 +23,7 @@ namespace raylet {
void LocalObjectManager::PinObjects(const std::vector<ObjectID> &object_ids,
std::vector<std::unique_ptr<RayObject>> &&objects) {
absl::MutexLock lock(&mutex_);
RAY_CHECK(object_pinning_enabled_);
for (size_t i = 0; i < object_ids.size(); i++) {
const auto &object_id = object_ids[i];
auto &object = objects[i];
@ -57,9 +60,17 @@ void LocalObjectManager::WaitForObjectFree(const rpc::Address &owner_address,
}
void LocalObjectManager::ReleaseFreedObject(const ObjectID &object_id) {
{
// object_pinning_enabled_ flag is off when the --lru-evict flag is on.
if (object_pinning_enabled_) {
absl::MutexLock lock(&mutex_);
RAY_LOG(DEBUG) << "Unpinning object " << object_id;
// The object should be in one of these stats. pinned, spilling, or spilled.
RAY_CHECK((pinned_objects_.count(object_id) > 0) ||
(spilled_objects_url_.count(object_id) > 0) ||
(objects_pending_spill_.count(object_id) > 0));
if (automatic_object_deletion_enabled_) {
spilled_object_pending_delete_.push(object_id);
}
pinned_objects_.erase(object_id);
}
@ -79,6 +90,10 @@ void LocalObjectManager::FlushFreeObjects() {
on_objects_freed_(objects_to_free_);
objects_to_free_.clear();
}
if (object_pinning_enabled_ && automatic_object_deletion_enabled_) {
// Deletion wouldn't work when the object pinning is not enabled.
ProcessSpilledObjectsDeleteQueue(free_objects_batch_size_);
}
last_free_objects_at_ms_ = current_time_ms();
}
@ -92,6 +107,7 @@ void LocalObjectManager::FlushFreeObjectsIfNeeded(int64_t now_ms) {
int64_t LocalObjectManager::SpillObjectsOfSize(int64_t num_bytes_to_spill,
int64_t min_bytes_to_spill) {
RAY_CHECK(num_bytes_to_spill >= min_bytes_to_spill);
if (RayConfig::instance().object_spilling_config().empty() ||
!RayConfig::instance().automatic_object_spilling_enabled()) {
return min_bytes_to_spill;
@ -180,8 +196,8 @@ void LocalObjectManager::SpillObjectsInternal(
io_worker->rpc_client()->SpillObjects(
request, [this, objects_to_spill, callback, io_worker](
const ray::Status &status, const rpc::SpillObjectsReply &r) {
io_worker_pool_.PushSpillWorker(io_worker);
absl::MutexLock lock(&mutex_);
io_worker_pool_.PushSpillWorker(io_worker);
if (!status.ok()) {
for (const auto &object_id : objects_to_spill) {
auto it = objects_pending_spill_.find(object_id);
@ -217,7 +233,8 @@ void LocalObjectManager::AddSpilledUrls(
// be retrieved by other raylets.
RAY_CHECK_OK(object_info_accessor_.AsyncAddSpilledUrl(
object_id, object_url,
[this, object_id, callback, num_remaining, num_bytes_spilled](Status status) {
[this, object_id, object_url, callback, num_remaining,
num_bytes_spilled](Status status) {
RAY_CHECK_OK(status);
absl::MutexLock lock(&mutex_);
// Unpin the object.
@ -227,6 +244,21 @@ void LocalObjectManager::AddSpilledUrls(
*num_bytes_spilled += it->second->GetSize();
objects_pending_spill_.erase(it);
// Update the object_id -> url_ref_count to use it for deletion later.
// We need to track the references here because a single file can contain
// multiple objects, and we shouldn't delete the file until
// all the objects are gone out of scope.
// object_url is equivalent to url_with_offset.
auto parsed_url = ParseURL(object_url);
const auto base_url_it = parsed_url->find("url");
RAY_CHECK(base_url_it != parsed_url->end());
if (!url_ref_count_.contains(base_url_it->second)) {
url_ref_count_[base_url_it->second] = 1;
} else {
url_ref_count_[base_url_it->second] += 1;
}
spilled_objects_url_.emplace(object_id, object_url);
(*num_remaining)--;
if (*num_remaining == 0 && callback) {
callback(status);
@ -265,6 +297,73 @@ void LocalObjectManager::AsyncRestoreSpilledObject(
});
}
void LocalObjectManager::ProcessSpilledObjectsDeleteQueue(uint32_t max_batch_size) {
absl::MutexLock lock(&mutex_);
std::vector<std::string> object_urls_to_delete;
// Process upto batch size of objects to delete.
while (!spilled_object_pending_delete_.empty() &&
object_urls_to_delete.size() < max_batch_size) {
auto &object_id = spilled_object_pending_delete_.front();
// If the object is still spilling, do nothing. This will block other entries to be
// processed, but it should be fine because the spilling will be eventually done, and
// deleting objects is the low priority tasks.
// This will instead enable simpler logic after this block.
if (objects_pending_spill_.contains(object_id)) {
break;
}
// Object id is either spilled or not spilled at this point.
const auto spilled_objects_url_it = spilled_objects_url_.find(object_id);
if (spilled_objects_url_it != spilled_objects_url_.end()) {
// If the object was spilled, see if we can delete it. We should first check the ref
// count.
std::string &object_url = spilled_objects_url_it->second;
// Note that here, we need to parse the object url to obtain the base_url.
auto parsed_url = ParseURL(object_url);
const auto base_url_it = parsed_url->find("url");
RAY_CHECK(base_url_it != parsed_url->end());
const auto &url_ref_count_it = url_ref_count_.find(base_url_it->second);
RAY_CHECK(url_ref_count_it != url_ref_count_.end())
<< "url_ref_count_ should exist when spilled_objects_url_ exists. Please "
"submit a Github issue if you see this error.";
url_ref_count_it->second -= 1;
// If there's no more refs, delete the object.
if (url_ref_count_it->second == 0) {
url_ref_count_.erase(url_ref_count_it);
object_urls_to_delete.emplace_back(object_url);
}
spilled_objects_url_.erase(spilled_objects_url_it);
}
spilled_object_pending_delete_.pop();
}
if (object_urls_to_delete.size() > 0) {
DeleteSpilledObjects(object_urls_to_delete);
}
}
/// Send a request to a delete worker to remove the given spilled-object URLs.
///
/// The worker is returned to the pool when the reply arrives. A failed request is
/// only logged; nothing in this function retries it.
///
/// \param urls_to_delete URLs to delete from external storage.
void LocalObjectManager::DeleteSpilledObjects(std::vector<std::string> &urls_to_delete) {
  // The URL list is captured by value, so the callback does not depend on the
  // caller's vector staying alive.
  io_worker_pool_.PopDeleteWorker(
      [this, urls_to_delete](std::shared_ptr<WorkerInterface> io_worker) {
        RAY_LOG(DEBUG) << "Sending delete spilled object request. Length: "
                       << urls_to_delete.size();
        rpc::DeleteSpilledObjectsRequest request;
        for (const auto &url : urls_to_delete) {
          // `url` is const here, so std::move would silently copy anyway; pass it
          // directly to make the copy explicit.
          request.add_spilled_objects_url(url);
        }
        io_worker->rpc_client()->DeleteSpilledObjects(
            request, [this, io_worker](const ray::Status &status,
                                       const rpc::DeleteSpilledObjectsReply &reply) {
              io_worker_pool_.PushDeleteWorker(io_worker);
              if (!status.ok()) {
                RAY_LOG(ERROR) << "Failed to send delete spilled object request: "
                               << status.ToString();
              }
            });
      });
}
}; // namespace raylet
}; // namespace ray

View file

@ -33,10 +33,12 @@ namespace raylet {
/// have been freed, and objects that have been spilled.
class LocalObjectManager {
public:
LocalObjectManager(size_t free_objects_batch_size, int64_t free_objects_period_ms,
LocalObjectManager(boost::asio::io_service &io_context, size_t free_objects_batch_size,
int64_t free_objects_period_ms,
IOWorkerPoolInterface &io_worker_pool,
gcs::ObjectInfoAccessor &object_info_accessor,
rpc::CoreWorkerClientPool &owner_client_pool,
bool object_pinning_enabled, bool automatic_object_deletion_enabled,
std::function<void(const std::vector<ObjectID> &)> on_objects_freed,
SpaceReleasedCallback on_objects_spilled)
: free_objects_period_ms_(free_objects_period_ms),
@ -44,6 +46,8 @@ class LocalObjectManager {
io_worker_pool_(io_worker_pool),
object_info_accessor_(object_info_accessor),
owner_client_pool_(owner_client_pool),
object_pinning_enabled_(object_pinning_enabled),
automatic_object_deletion_enabled_(automatic_object_deletion_enabled),
on_objects_freed_(on_objects_freed),
on_objects_spilled_(on_objects_spilled),
last_free_objects_at_ms_(current_time_ms()) {}
@ -102,6 +106,16 @@ class LocalObjectManager {
/// Try to clear any objects that have been freed.
void FlushFreeObjectsIfNeeded(int64_t now_ms);
/// Check which objects in the pending-delete queue can be deleted and delete
/// them if necessary.
/// TODO(sang): We currently use only 1 IO worker per call to this method because
/// deletion is a low-priority task. But we can potentially support more workers
/// being used at once.
///
/// \param max_batch_size Maximum number of objects that can be deleted by one
/// invocation.
void ProcessSpilledObjectsDeleteQueue(uint32_t max_batch_size);
private:
/// Internal helper method for spilling objects.
void SpillObjectsInternal(const std::vector<ObjectID> &objects_ids,
@ -121,6 +135,11 @@ class LocalObjectManager {
const rpc::SpillObjectsReply &worker_reply,
std::function<void(const ray::Status &)> callback);
/// Delete the spilled objects stored at the given URLs.
///
/// \param urls_to_delete List of URLs to delete from external storage.
void DeleteSpilledObjects(std::vector<std::string> &urls_to_delete);
/// The period between attempts to eagerly evict objects from plasma.
const int64_t free_objects_period_ms_;
@ -137,6 +156,12 @@ class LocalObjectManager {
/// this node.
rpc::CoreWorkerClientPool &owner_client_pool_;
/// Whether to enable pinning for plasma objects.
bool object_pinning_enabled_;
/// Whether to enable automatic deletion when refs are gone out of scope.
bool automatic_object_deletion_enabled_;
/// A callback to call when an object has been freed.
std::function<void(const std::vector<ObjectID> &)> on_objects_freed_;
@ -171,6 +196,24 @@ class LocalObjectManager {
/// This class is accessed by both the raylet and plasma store threads. The
/// mutex protects private members that relate to object spilling.
mutable absl::Mutex mutex_;
///
/// Fields below are used to delete spilled objects.
///
/// A FIFO queue of IDs of objects whose spilled copies may need to be deleted.
/// We don't delete objects from external storage the instant they go out of scope
/// because those objects could still be in the middle of spilling.
std::queue<ObjectID> spilled_object_pending_delete_ GUARDED_BY(mutex_);
/// Mapping from object id to url_with_offsets. We cannot reuse pinned_objects_ because
/// pinned_objects_ entries are deleted when spilling happens.
absl::flat_hash_map<ObjectID, std::string> spilled_objects_url_ GUARDED_BY(mutex_);
/// Base URL -> ref_count. It is used because there could be multiple objects
/// within a single spilled file. We need to ref count to avoid deleting the file
/// before all objects within that file are out of scope.
absl::flat_hash_map<std::string, uint64_t> url_ref_count_ GUARDED_BY(mutex_);
};
}; // namespace raylet

View file

@ -225,6 +225,8 @@ int main(int argc, char *argv[]) {
RayConfig::instance().fair_queueing_enabled();
node_manager_config.object_pinning_enabled =
RayConfig::instance().object_pinning_enabled();
node_manager_config.automatic_object_deletion_enabled =
RayConfig::instance().automatic_object_deletion_enabled();
node_manager_config.store_socket_name = store_socket_name;
node_manager_config.temp_dir = temp_dir;
node_manager_config.session_dir = session_dir;

View file

@ -157,9 +157,12 @@ NodeManager::NodeManager(boost::asio::io_service &io_service, const NodeID &self
agent_manager_service_(io_service, *agent_manager_service_handler_),
client_call_manager_(io_service),
worker_rpc_pool_(client_call_manager_),
local_object_manager_(RayConfig::instance().free_objects_batch_size(),
local_object_manager_(io_service_, RayConfig::instance().free_objects_batch_size(),
RayConfig::instance().free_objects_period_milliseconds(),
worker_pool_, gcs_client_->Objects(), worker_rpc_pool_,
/* object_pinning_enabled */ config.object_pinning_enabled,
/* automatic_object_deletion_enabled */
config.automatic_object_deletion_enabled,
[this](const std::vector<ObjectID> &object_ids) {
object_manager_.FreeObjects(object_ids,
/*local_only=*/false);

View file

@ -91,6 +91,8 @@ struct NodeManagerConfig {
bool fair_queueing_enabled;
/// Whether to enable pinning for plasma objects.
bool object_pinning_enabled;
/// Whether to enable automatic object deletion for object spilling.
bool automatic_object_deletion_enabled;
/// The store socket name.
std::string store_socket_name;
/// The path to the ray temp dir.

View file

@ -85,7 +85,35 @@ class MockIOWorkerClient : public rpc::CoreWorkerClientInterface {
callback(Status(), reply);
}
/// Record the delete request and its callback without replying.
/// The callback stays queued until the test triggers it explicitly.
void DeleteSpilledObjects(
    const rpc::DeleteSpilledObjectsRequest &request,
    const rpc::ClientCallback<rpc::DeleteSpilledObjectsReply> &callback) override {
  // No reply object is built here; the unused local from the original was dropped.
  delete_requests.push_back(request);
  delete_callbacks.push_back(callback);
}
/// Answer the oldest queued delete request with the given status.
///
/// \param status Status handed to the queued callback.
/// \return The number of URLs in the answered request, or 0 when no delete
/// request is queued.
int ReplyDeleteSpilledObjects(Status status = Status::OK()) {
  if (delete_callbacks.empty()) {
    return 0;
  }
  // Fire the callback first, then count and discard the matching request.
  auto pending_callback = delete_callbacks.front();
  auto empty_reply = rpc::DeleteSpilledObjectsReply();
  pending_callback(status, empty_reply);

  const int num_deleted_urls = delete_requests.front().spilled_objects_url_size();
  delete_callbacks.pop_front();
  delete_requests.pop_front();
  return num_deleted_urls;
}
std::list<rpc::ClientCallback<rpc::SpillObjectsReply>> callbacks;
std::list<rpc::ClientCallback<rpc::DeleteSpilledObjectsReply>> delete_callbacks;
std::list<rpc::DeleteSpilledObjectsRequest const> delete_requests;
};
class MockIOWorker : public MockWorker {
@ -105,6 +133,8 @@ class MockIOWorkerPool : public IOWorkerPoolInterface {
MOCK_METHOD1(PushRestoreWorker, void(const std::shared_ptr<WorkerInterface> &worker));
MOCK_METHOD1(PushDeleteWorker, void(const std::shared_ptr<WorkerInterface> &worker));
void PopSpillWorker(
std::function<void(std::shared_ptr<WorkerInterface>)> callback) override {
callback(io_worker);
@ -115,6 +145,11 @@ class MockIOWorkerPool : public IOWorkerPoolInterface {
callback(io_worker);
}
/// Mock implementation: invoke the callback right away with the pool's single
/// `io_worker`.
void PopDeleteWorker(
std::function<void(std::shared_ptr<WorkerInterface>)> callback) override {
callback(io_worker);
}
std::shared_ptr<MockIOWorkerClient> io_worker_client =
std::make_shared<MockIOWorkerClient>();
std::shared_ptr<WorkerInterface> io_worker =
@ -197,8 +232,10 @@ class LocalObjectManagerTest : public ::testing::Test {
LocalObjectManagerTest()
: owner_client(std::make_shared<MockWorkerClient>()),
client_pool([&](const rpc::Address &addr) { return owner_client; }),
manager(free_objects_batch_size,
manager(io_service_, free_objects_batch_size,
/*free_objects_period_ms=*/1000, worker_pool, object_table, client_pool,
/*object_pinning_enabled=*/true,
/*automatic_object_delete_enabled=*/true,
[&](const std::vector<ObjectID> &object_ids) {
for (const auto &object_id : object_ids) {
freed.insert(object_id);
@ -209,6 +246,12 @@ class LocalObjectManagerTest : public ::testing::Test {
RayConfig::instance().initialize({{"object_spilling_config", "mock_config"}});
}
/// Build a spilled-object URL of the form
/// `<url>?num_objects=<num_objects>&offset=<offset>`.
///
/// \param url Base URL placed before the query string.
/// \param offset Value of the `offset` query parameter.
/// \param num_objects Value of the `num_objects` query parameter.
/// \return The base URL with both query parameters appended.
std::string BuildURL(const std::string &url, int offset = 0, int num_objects = 1) {
  // Taking `url` by const reference avoids the copy the by-value parameter made.
  return url + "?" + "num_objects=" + std::to_string(num_objects) +
         "&offset=" + std::to_string(offset);
}
boost::asio::io_service io_service_;
size_t free_objects_batch_size = 3;
std::shared_ptr<MockWorkerClient> owner_client;
rpc::CoreWorkerClientPool client_pool;
@ -292,7 +335,7 @@ TEST_F(LocalObjectManagerTest, TestExplicitSpill) {
EXPECT_CALL(worker_pool, PushSpillWorker(_));
std::vector<std::string> urls;
for (size_t i = 0; i < object_ids.size(); i++) {
urls.push_back("url" + std::to_string(i));
urls.push_back(BuildURL("url" + std::to_string(i)));
}
ASSERT_TRUE(worker_pool.io_worker_client->ReplySpillObjects(urls));
for (size_t i = 0; i < object_ids.size(); i++) {
@ -342,7 +385,7 @@ TEST_F(LocalObjectManagerTest, TestDuplicateSpill) {
std::vector<std::string> urls;
for (size_t i = 0; i < object_ids.size(); i++) {
urls.push_back("url" + std::to_string(i));
urls.push_back(BuildURL("url" + std::to_string(i)));
}
EXPECT_CALL(worker_pool, PushSpillWorker(_));
ASSERT_TRUE(worker_pool.io_worker_client->ReplySpillObjects(urls));
@ -394,7 +437,7 @@ TEST_F(LocalObjectManagerTest, TestSpillObjectsOfSize) {
// global object directory.
std::vector<std::string> urls;
for (size_t i = 0; i < object_ids.size() / 2 + 1; i++) {
urls.push_back("url" + std::to_string(i));
urls.push_back(BuildURL("url" + std::to_string(i)));
}
EXPECT_CALL(worker_pool, PushSpillWorker(_));
// Objects should get freed even though we didn't wait for the owner's notice
@ -450,7 +493,7 @@ TEST_F(LocalObjectManagerTest, TestSpillError) {
ASSERT_TRUE(status.ok());
num_times_fired++;
});
std::string url = "url";
std::string url = BuildURL("url");
EXPECT_CALL(worker_pool, PushSpillWorker(_));
ASSERT_TRUE(worker_pool.io_worker_client->ReplySpillObjects({url}));
ASSERT_TRUE(object_table.ReplyAsyncAddSpilledUrl());
@ -460,6 +503,250 @@ TEST_F(LocalObjectManagerTest, TestSpillError) {
ASSERT_TRUE(num_callbacks_fired > 0);
}
TEST_F(LocalObjectManagerTest, TestDeleteNoSpilledObjects) {
// Make sure the delete queue won't delete any object when there are no spilled objects.
rpc::Address owner_address;
owner_address.set_worker_id(WorkerID::FromRandom().Binary());
std::vector<ObjectID> object_ids;
std::vector<std::unique_ptr<RayObject>> objects;
// Pin free_objects_batch_size objects. None of them is ever spilled in this test.
for (size_t i = 0; i < free_objects_batch_size; i++) {
ObjectID object_id = ObjectID::FromRandom();
object_ids.push_back(object_id);
auto data_buffer = std::make_shared<MockObjectBuffer>(0, object_id, unpins);
std::unique_ptr<RayObject> object(
new RayObject(data_buffer, nullptr, std::vector<ObjectID>()));
objects.push_back(std::move(object));
}
manager.PinObjects(object_ids, std::move(objects));
manager.WaitForObjectFree(owner_address, object_ids);
// `freed` must still be empty right before each of the eviction replies is delivered.
for (size_t i = 0; i < free_objects_batch_size; i++) {
ASSERT_TRUE(freed.empty());
ASSERT_TRUE(owner_client->ReplyObjectEviction());
}
// Process the delete queue and check that the value returned by the mock IO worker
// client (presumably the number of URLs it was asked to delete) is zero.
manager.ProcessSpilledObjectsDeleteQueue(/* max_batch_size */ 30);
int deleted_urls_size = worker_pool.io_worker_client->ReplyDeleteSpilledObjects();
ASSERT_EQ(deleted_urls_size, 0);
}
TEST_F(LocalObjectManagerTest, TestDeleteSpilledObjects) {
// Make sure spilled objects are deleted when the delete queue is processed.
rpc::Address owner_address;
owner_address.set_worker_id(WorkerID::FromRandom().Binary());
std::vector<ObjectID> object_ids;
std::vector<std::unique_ptr<RayObject>> objects;
// Pin free_objects_batch_size objects.
for (size_t i = 0; i < free_objects_batch_size; i++) {
ObjectID object_id = ObjectID::FromRandom();
object_ids.push_back(object_id);
auto data_buffer = std::make_shared<MockObjectBuffer>(0, object_id, unpins);
std::unique_ptr<RayObject> object(
new RayObject(data_buffer, nullptr, std::vector<ObjectID>()));
objects.push_back(std::move(object));
}
manager.PinObjects(object_ids, std::move(objects));
manager.WaitForObjectFree(owner_address, object_ids);
// 2 Objects are spilled out of 3.
std::vector<ObjectID> object_ids_to_spill;
int spilled_urls_size = free_objects_batch_size - 1;
for (int i = 0; i < spilled_urls_size; i++) {
object_ids_to_spill.push_back(object_ids[i]);
}
manager.SpillObjects(object_ids_to_spill,
[&](const Status &status) mutable { ASSERT_TRUE(status.ok()); });
// Each spilled object gets its own URL ("url0", "url1", ...).
std::vector<std::string> urls;
for (size_t i = 0; i < object_ids_to_spill.size(); i++) {
urls.push_back(BuildURL("url" + std::to_string(i)));
}
ASSERT_TRUE(worker_pool.io_worker_client->ReplySpillObjects(urls));
for (size_t i = 0; i < object_ids_to_spill.size(); i++) {
ASSERT_TRUE(object_table.ReplyAsyncAddSpilledUrl());
}
// All objects are out of scope now.
for (size_t i = 0; i < free_objects_batch_size; i++) {
ASSERT_TRUE(owner_client->ReplyObjectEviction());
}
// Make sure all spilled objects are deleted: one URL is expected per spilled object.
manager.ProcessSpilledObjectsDeleteQueue(/* max_batch_size */ 30);
int deleted_urls_size = worker_pool.io_worker_client->ReplyDeleteSpilledObjects();
ASSERT_EQ(deleted_urls_size, object_ids_to_spill.size());
}
TEST_F(LocalObjectManagerTest, TestDeleteURLRefCount) {
// Make sure a URL is deleted only when every object stored in that URL is deleted
// (when ref_count == 0).
rpc::Address owner_address;
owner_address.set_worker_id(WorkerID::FromRandom().Binary());
std::vector<ObjectID> object_ids;
std::vector<std::unique_ptr<RayObject>> objects;
// Objects are pinned.
for (size_t i = 0; i < free_objects_batch_size; i++) {
ObjectID object_id = ObjectID::FromRandom();
object_ids.push_back(object_id);
auto data_buffer = std::make_shared<MockObjectBuffer>(0, object_id, unpins);
std::unique_ptr<RayObject> object(
new RayObject(data_buffer, nullptr, std::vector<ObjectID>()));
objects.push_back(std::move(object));
}
manager.PinObjects(object_ids, std::move(objects));
manager.WaitForObjectFree(owner_address, object_ids);
// Every object is spilled.
std::vector<ObjectID> object_ids_to_spill;
int spilled_urls_size = free_objects_batch_size;
for (int i = 0; i < spilled_urls_size; i++) {
object_ids_to_spill.push_back(object_ids[i]);
}
manager.SpillObjects(object_ids_to_spill,
[&](const Status &status) mutable { ASSERT_TRUE(status.ok()); });
std::vector<std::string> urls;
// Note every object has the same base url. It means all objects are fused; only the
// offset differs between entries.
for (size_t i = 0; i < object_ids_to_spill.size(); i++) {
// Simulate the situation where there's a single file that contains multiple objects.
urls.push_back(BuildURL("unified_url",
/*offset=*/i,
/*num_objects*/ object_ids_to_spill.size()));
}
ASSERT_TRUE(worker_pool.io_worker_client->ReplySpillObjects(urls));
for (size_t i = 0; i < object_ids_to_spill.size(); i++) {
ASSERT_TRUE(object_table.ReplyAsyncAddSpilledUrl());
}
// Everything is evicted except the last object. In this case, ref count is still > 0.
for (size_t i = 0; i < free_objects_batch_size - 1; i++) {
ASSERT_TRUE(owner_client->ReplyObjectEviction());
}
manager.ProcessSpilledObjectsDeleteQueue(/* max_batch_size */ 30);
int deleted_urls_size = worker_pool.io_worker_client->ReplyDeleteSpilledObjects();
// Nothing is deleted yet because the ref count is > 0.
ASSERT_EQ(deleted_urls_size, 0);
// The last reference is deleted.
ASSERT_TRUE(owner_client->ReplyObjectEviction());
manager.ProcessSpilledObjectsDeleteQueue(/* max_batch_size */ 30);
deleted_urls_size = worker_pool.io_worker_client->ReplyDeleteSpilledObjects();
// Now the shared URL is deleted exactly once, although it backed several objects.
ASSERT_EQ(deleted_urls_size, 1);
}
TEST_F(LocalObjectManagerTest, TestDeleteSpillingObjectsBlocking) {
  // Make sure the object delete queue is blocked when there are spilling objects.
  rpc::Address owner_address;
  owner_address.set_worker_id(WorkerID::FromRandom().Binary());
  std::vector<ObjectID> object_ids;
  std::vector<std::unique_ptr<RayObject>> objects;

  // Objects are pinned.
  for (size_t i = 0; i < free_objects_batch_size; i++) {
    ObjectID object_id = ObjectID::FromRandom();
    object_ids.push_back(object_id);
    auto data_buffer = std::make_shared<MockObjectBuffer>(0, object_id, unpins);
    std::unique_ptr<RayObject> object(
        new RayObject(data_buffer, nullptr, std::vector<ObjectID>()));
    objects.push_back(std::move(object));
  }
  manager.PinObjects(object_ids, std::move(objects));
  manager.WaitForObjectFree(owner_address, object_ids);

  // Objects are spilled.
  std::vector<ObjectID> object_ids_to_spill;
  int spilled_urls_size = free_objects_batch_size;
  for (int i = 0; i < spilled_urls_size; i++) {
    object_ids_to_spill.push_back(object_ids[i]);
  }
  manager.SpillObjects(object_ids_to_spill,
                       [&](const Status &status) mutable { ASSERT_TRUE(status.ok()); });
  std::vector<std::string> urls;
  for (size_t i = 0; i < object_ids_to_spill.size(); i++) {
    urls.push_back(BuildURL("url" + std::to_string(i)));
  }
  ASSERT_TRUE(worker_pool.io_worker_client->ReplySpillObjects(urls));
  // Only 1 object's spilling is done. Everything else is still spilling because its
  // AsyncAddSpilledUrl reply has not been delivered yet.
  for (size_t i = 0; i < 1; i++) {
    ASSERT_TRUE(object_table.ReplyAsyncAddSpilledUrl());
  }

  // Every object has gone out of scope.
  for (size_t i = 0; i < free_objects_batch_size; i++) {
    ASSERT_TRUE(owner_client->ReplyObjectEviction());
  }

  // Now, deletion queue would process only the first object. Everything else won't be
  // deleted although it is out of scope because they are still spilling.
  manager.ProcessSpilledObjectsDeleteQueue(/* max_batch_size */ 30);
  int deleted_urls_size = worker_pool.io_worker_client->ReplyDeleteSpilledObjects();
  // Only the first entry that is already spilled will be deleted.
  ASSERT_EQ(deleted_urls_size, 1);

  // Now spilling is completely done: deliver the remaining AsyncAddSpilledUrl replies.
  // (The URLs themselves were already handed over via ReplySpillObjects above, so no
  // new URL list is needed here.)
  for (size_t i = 1; i < object_ids_to_spill.size(); i++) {
    ASSERT_TRUE(object_table.ReplyAsyncAddSpilledUrl());
  }

  // Every object is now deleted.
  manager.ProcessSpilledObjectsDeleteQueue(/* max_batch_size */ 30);
  deleted_urls_size = worker_pool.io_worker_client->ReplyDeleteSpilledObjects();
  ASSERT_EQ(deleted_urls_size, object_ids_to_spill.size() - 1);
}
TEST_F(LocalObjectManagerTest, TestDeleteMaxObjects) {
// Make sure deletion queue can only process up to X entries.
rpc::Address owner_address;
owner_address.set_worker_id(WorkerID::FromRandom().Binary());
std::vector<ObjectID> object_ids;
std::vector<std::unique_ptr<RayObject>> objects;
// Pin one more object than free_objects_batch_size.
for (size_t i = 0; i < free_objects_batch_size + 1; i++) {
ObjectID object_id = ObjectID::FromRandom();
object_ids.push_back(object_id);
auto data_buffer = std::make_shared<MockObjectBuffer>(0, object_id, unpins);
std::unique_ptr<RayObject> object(
new RayObject(data_buffer, nullptr, std::vector<ObjectID>()));
objects.push_back(std::move(object));
}
manager.PinObjects(object_ids, std::move(objects));
manager.WaitForObjectFree(owner_address, object_ids);
// Only the first free_objects_batch_size objects are selected for spilling.
std::vector<ObjectID> object_ids_to_spill;
int spilled_urls_size = free_objects_batch_size;
for (int i = 0; i < spilled_urls_size; i++) {
object_ids_to_spill.push_back(object_ids[i]);
}
// All the selected entries are spilled.
manager.SpillObjects(object_ids_to_spill,
[&](const Status &status) mutable { ASSERT_TRUE(status.ok()); });
std::vector<std::string> urls;
for (size_t i = 0; i < object_ids_to_spill.size(); i++) {
urls.push_back(BuildURL("url" + std::to_string(i)));
}
ASSERT_TRUE(worker_pool.io_worker_client->ReplySpillObjects(urls));
for (size_t i = 0; i < object_ids_to_spill.size(); i++) {
ASSERT_TRUE(object_table.ReplyAsyncAddSpilledUrl());
}
// free_objects_batch_size eviction replies are delivered, so one pinned object is
// still in scope.
for (size_t i = 0; i < free_objects_batch_size; i++) {
ASSERT_TRUE(owner_client->ReplyObjectEviction());
}
// The spilled objects should be deleted as number of spilled urls exceeds the batch
// size.
// NOTE: Unlike the other delete tests, ProcessSpilledObjectsDeleteQueue is not called
// explicitly here; the delete request is expected to have been issued already.
int deleted_urls_size = worker_pool.io_worker_client->ReplyDeleteSpilledObjects();
ASSERT_EQ(deleted_urls_size, free_objects_batch_size);
}
TEST_F(LocalObjectManagerTest,
TestSpillObjectsOfSizeNumBytesToSpillHigherThanMinBytesToSpill) {
/// Test the case SpillObjectsOfSize(num_bytes_to_spill, min_bytes_to_spill
@ -494,7 +781,7 @@ TEST_F(LocalObjectManagerTest,
// Make sure the spilling is done properly.
std::vector<std::string> urls;
for (size_t i = 0; i < object_ids.size(); i++) {
urls.push_back("url" + std::to_string(i));
urls.push_back(BuildURL("url" + std::to_string(i)));
}
EXPECT_CALL(worker_pool, PushSpillWorker(_));
ASSERT_TRUE(worker_pool.io_worker_client->ReplySpillObjects(urls));

View file

@ -656,6 +656,29 @@ void WorkerPool::PopIOWorkerInternal(
}
}
void WorkerPool::PushDeleteWorker(const std::shared_ptr<WorkerInterface> &worker) {
  // Deletion borrows workers from the spill/restore pools, so only I/O workers are
  // accepted here.
  RAY_CHECK(IsIOWorkerType(worker->GetWorkerType()));
  // Hand the worker back to the pool that matches its own type.
  if (worker->GetWorkerType() != rpc::WorkerType::RESTORE_WORKER) {
    PushSpillWorker(worker);
    return;
  }
  PushRestoreWorker(worker);
}
void WorkerPool::PopDeleteWorker(
    std::function<void(std::shared_ptr<WorkerInterface>)> callback) {
  auto &python_state = GetStateForLanguage(Language::PYTHON);
  const size_t idle_spill_count =
      python_state.spill_io_worker_state.idle_io_workers.size();
  const size_t idle_restore_count =
      python_state.restore_io_worker_state.idle_io_workers.size();
  // Borrow from whichever pool has more idle workers; ties go to the restore pool.
  const bool spill_pool_has_more_idle = idle_spill_count > idle_restore_count;
  if (!spill_pool_has_more_idle) {
    PopRestoreWorker(callback);
    return;
  }
  PopSpillWorker(callback);
}
void WorkerPool::PushWorker(const std::shared_ptr<WorkerInterface> &worker) {
// Since the worker is now idle, unset its assigned task ID.
RAY_CHECK(worker->GetAssignedTaskId().IsNil())

View file

@ -72,6 +72,11 @@ class IOWorkerPoolInterface {
virtual void PopRestoreWorker(
std::function<void(std::shared_ptr<WorkerInterface>)> callback) = 0;
/// Add an idle I/O worker that is used for deleting spilled objects to the pool.
///
/// \param worker The idle I/O worker to add.
virtual void PushDeleteWorker(const std::shared_ptr<WorkerInterface> &worker) = 0;
/// Pop an I/O worker to be used for deleting spilled objects and pass it to the
/// callback.
///
/// \param callback The callback that receives the popped I/O worker.
virtual void PopDeleteWorker(
std::function<void(std::shared_ptr<WorkerInterface>)> callback) = 0;
virtual ~IOWorkerPoolInterface(){};
};
@ -215,6 +220,22 @@ class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface {
/// \param callback The callback that returns an available restore I/O worker.
void PopRestoreWorker(std::function<void(std::shared_ptr<WorkerInterface>)> callback);
/// Add an idle delete I/O worker to the pool.
///
/// NOTE: There's currently no concept of delete workers or delete worker pools.
/// When deleting objects, it shares the workers within restore or spill worker pools.
/// This method is just a higher level abstraction to hide that implementation detail.
/// A restore worker is pushed back to the restore pool; any other I/O worker is pushed
/// back to the spill pool.
///
/// \param worker The idle I/O worker. It could be either spill or restore I/O worker.
void PushDeleteWorker(const std::shared_ptr<WorkerInterface> &worker);
/// Pop an idle delete I/O worker from the pool and trigger a callback
/// when a delete I/O worker is available.
///
/// NOTE: There's currently no concept of delete workers or delete worker pools.
/// This method pops a worker from whichever of the spill and restore pools currently
/// has more idle workers (the restore pool is used when both have the same number).
///
/// \param callback The callback that receives the popped I/O worker.
void PopDeleteWorker(std::function<void(std::shared_ptr<WorkerInterface>)> callback);
/// Add an idle worker to the pool.
///
/// \param The idle worker to add.

View file

@ -732,6 +732,44 @@ TEST_P(WorkerPoolTest, MaxSpillRestoreWorkersIntegrationTest) {
ASSERT_EQ(worker_pool_->GetProcessSize(), 2 * MAX_IO_WORKER_SIZE);
}
TEST_P(WorkerPoolTest, DeleteWorkerPushPop) {
/// Make sure PopDeleteWorker always pops an I/O worker from the pool that has more
/// idle workers.
// 2 spill worker and 1 restore worker.
std::unordered_set<std::shared_ptr<WorkerInterface>> spill_workers;
spill_workers.insert(CreateSpillWorker(Process::CreateNewDummy()));
spill_workers.insert(CreateSpillWorker(Process::CreateNewDummy()));
std::unordered_set<std::shared_ptr<WorkerInterface>> restore_workers;
restore_workers.insert(CreateRestoreWorker(Process::CreateNewDummy()));
for (const auto &worker : spill_workers) {
worker_pool_->PushSpillWorker(worker);
}
for (const auto &worker : restore_workers) {
worker_pool_->PushRestoreWorker(worker);
}
// PopDeleteWorker should pop a spill worker in this case.
// The worker is pushed back inside the callback so the pool sizes are unchanged.
worker_pool_->PopDeleteWorker([this](std::shared_ptr<WorkerInterface> worker) {
ASSERT_EQ(worker->GetWorkerType(), rpc::WorkerType::SPILL_WORKER);
worker_pool_->PushDeleteWorker(worker);
});
// Add 2 more restore workers. Now we have 2 spill workers and 3 restore workers.
for (int i = 0; i < 2; i++) {
auto restore_worker = CreateRestoreWorker(Process::CreateNewDummy());
restore_workers.insert(restore_worker);
worker_pool_->PushRestoreWorker(restore_worker);
}
// PopDeleteWorker should pop a restore worker in this case.
worker_pool_->PopDeleteWorker([this](std::shared_ptr<WorkerInterface> worker) {
ASSERT_EQ(worker->GetWorkerType(), rpc::WorkerType::RESTORE_WORKER);
worker_pool_->PushDeleteWorker(worker);
});
}
INSTANTIATE_TEST_CASE_P(WorkerPoolMultiTenancyTest, WorkerPoolTest,
::testing::Values(true, false));

View file

@ -182,6 +182,10 @@ class CoreWorkerClientInterface {
const RestoreSpilledObjectsRequest &request,
const ClientCallback<RestoreSpilledObjectsReply> &callback) {}
/// Send a DeleteSpilledObjects request.
///
/// The default implementation here is a no-op: the request is ignored and the
/// callback is never invoked.
///
/// \param request The DeleteSpilledObjects request.
/// \param callback The callback for the DeleteSpilledObjects reply.
virtual void DeleteSpilledObjects(
const DeleteSpilledObjectsRequest &request,
const ClientCallback<DeleteSpilledObjectsReply> &callback) {}
virtual void PlasmaObjectReady(const PlasmaObjectReadyRequest &request,
const ClientCallback<PlasmaObjectReadyReply> &callback) {
}
@ -245,6 +249,8 @@ class CoreWorkerClient : public std::enable_shared_from_this<CoreWorkerClient>,
VOID_RPC_CLIENT_METHOD(CoreWorkerService, RestoreSpilledObjects, grpc_client_, override)
VOID_RPC_CLIENT_METHOD(CoreWorkerService, DeleteSpilledObjects, grpc_client_, override)
VOID_RPC_CLIENT_METHOD(CoreWorkerService, PlasmaObjectReady, grpc_client_, override)
VOID_RPC_CLIENT_METHOD(CoreWorkerService, Exit, grpc_client_, override)

View file

@ -43,6 +43,7 @@ namespace rpc {
RPC_SERVICE_HANDLER(CoreWorkerService, LocalGC) \
RPC_SERVICE_HANDLER(CoreWorkerService, SpillObjects) \
RPC_SERVICE_HANDLER(CoreWorkerService, RestoreSpilledObjects) \
RPC_SERVICE_HANDLER(CoreWorkerService, DeleteSpilledObjects) \
RPC_SERVICE_HANDLER(CoreWorkerService, PlasmaObjectReady) \
RPC_SERVICE_HANDLER(CoreWorkerService, Exit)
@ -63,6 +64,7 @@ namespace rpc {
DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(LocalGC) \
DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(SpillObjects) \
DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(RestoreSpilledObjects) \
DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(DeleteSpilledObjects) \
DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(PlasmaObjectReady) \
DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(Exit)

View file

@ -327,3 +327,38 @@ std::string CreateCommandLine(const std::vector<std::string> &args,
}
return result;
}
/// Parse a url of the form `<base_url>?<key>=<value>&<key>=<value>...`.
///
/// The text before the first '?' is stored under the key "url". Every '&'-separated
/// component of the query string is stored as `key -> value`, split at its first '='.
/// If a key appears more than once (including a query key named "url"), the first
/// occurrence wins.
///
/// Malformed components are tolerated instead of throwing std::out_of_range:
/// - an empty component (e.g. a trailing '&', "a=1&&b=2", or an empty query) is skipped;
/// - a component without '=' is stored with an empty value.
///
/// \param url The url to parse.
/// \return A map containing "url" and the query entries. The map is empty when the
/// url contains no '?'.
std::shared_ptr<std::unordered_map<std::string, std::string>> ParseURL(std::string url) {
  auto result = std::make_shared<std::unordered_map<std::string, std::string>>();
  const std::string base_delimiter = "?";
  const std::string query_delimiter = "&";
  const std::string key_value_delimiter = "=";

  const size_t query_pos = url.find(base_delimiter);
  if (query_pos == std::string::npos) {
    // No query string at all: callers get an empty map (without the "url" key).
    return result;
  }
  result->emplace("url", url.substr(0, query_pos));

  // Store a single `key=value` component of the query string.
  auto add_key_value = [&](const std::string &token) {
    if (token.empty()) {
      return;
    }
    const size_t key_value_pos = token.find(key_value_delimiter);
    if (key_value_pos == std::string::npos) {
      result->emplace(token, "");
      return;
    }
    result->emplace(token.substr(0, key_value_pos),
                    token.substr(key_value_pos + key_value_delimiter.length()));
  };

  // Walk the query string by position instead of repeatedly erasing the prefix.
  size_t begin = query_pos + base_delimiter.length();
  while (begin <= url.size()) {
    size_t end = url.find(query_delimiter, begin);
    if (end == std::string::npos) {
      end = url.size();
    }
    add_key_value(url.substr(begin, end - begin));
    begin = end + query_delimiter.length();
  }
  return result;
}

View file

@ -116,6 +116,16 @@ std::string EndpointToUrl(
boost::asio::generic::basic_endpoint<boost::asio::generic::stream_protocol>
ParseUrlEndpoint(const std::string &endpoint, int default_port = 0);
/// Parse the url and return a map holding the base url and the query string entries.
/// EX) http://abc?num_objects=9&offset=8388878
/// will be returned as
/// {
///   url: http://abc,
///   num_objects: 9,
///   offset: 8388878
/// }
/// NOTE: All keys and values are strings. If the url does not contain a '?', the
/// returned map is empty (it does not contain the "url" key either).
///
/// \param url The url to parse.
/// \return A shared pointer to the map described above.
std::shared_ptr<std::unordered_map<std::string, std::string>> ParseURL(std::string url);
class InitShutdownRAII {
public:
/// Type of the Shutdown function.

View file

@ -5,6 +5,7 @@
#include <boost/asio/generic/basic_endpoint.hpp>
#include "gtest/gtest.h"
#include "ray/util/logging.h"
static const char *argv0 = NULL;
@ -87,6 +88,15 @@ TEST(UtilTest, ParseCommandLineTest) {
ASSERT_EQ(ParseCommandLine(R"(x' a \b')", win32), ArgList({R"(x')", R"(a)", R"(\b')"}));
}
TEST(UtilTest, ParseURLTest) {
  // A url with three query entries must yield the base url plus one entry per key.
  auto fields = *ParseURL("http://abc?num_objects=9&offset=8388878&size=8388878");
  ASSERT_EQ(fields["url"], "http://abc");
  ASSERT_EQ(fields["num_objects"], "9");
  ASSERT_EQ(fields["offset"], "8388878");
  ASSERT_EQ(fields["size"], "8388878");
}
TEST(UtilTest, CreateCommandLineTest) {
typedef std::vector<std::string> ArgList;
CommandLineSyntax posix = CommandLineSyntax::POSIX, win32 = CommandLineSyntax::Windows,