diff --git a/BUILD.bazel b/BUILD.bazel index b9c21b71b..4a6343984 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -685,6 +685,7 @@ cc_library( ":ray_util", ":runtime_env_rpc", ":service_based_gcs_client_lib", + ":spill_worker_pool", ":stats_lib", ":worker_rpc", "//src/ray/protobuf:common_cc_proto", diff --git a/src/ray/raylet/io_spill_worker_pool.cpp b/src/ray/raylet/io_spill_worker_pool.cpp new file mode 100644 index 000000000..8270a4687 --- /dev/null +++ b/src/ray/raylet/io_spill_worker_pool.cpp @@ -0,0 +1,95 @@ +// Copyright 2017 The Ray Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "ray/raylet/io_spill_worker_pool.h" + +namespace ray { +namespace raylet { + +IOSpillWorkerPool::IOSpillWorkerPool(instrumented_io_context &worker_pooler_executor, + IOWorkerPoolInterface &io_worker_pool) + : executor_(worker_pooler_executor), io_worker_pool_(io_worker_pool) {} + +void IOSpillWorkerPool::StartSpillWorker( + std::vector objects_to_spill, std::vector object_owners, + std::function)> spill_callback) { + executor_.post([this, objects_to_spill = std::move(objects_to_spill), + object_owners = std:: : move(object_owners), + spill_callback = std::move(spill_callback)]() { + StartSpillWorkerImpl(objects_to_spill, object_owners, spill_callback); + }); +} + +void IOSpillWorkerPool::StartDeleteWorker( + std::vector files_to_delete, + std::function delete_callback) { + executor_.post([this, files_to_delete = std::move(files_to_delete), + delete_callback = std::move(delete_callback)]() { + StartDeleteWorkerImpl(files_to_delete, delete_callback); + }); +} + +void IOSpillWorkerPool::StartSpillWorkerImpl( + std::vector objects_to_spill, std::vector object_owners, + std::function)> spill_callback) { + io_worker_pool_.PopSpillWorker( + [this, objects_to_spill = std::move(objects_to_spill), + object_owners = std::move(object_owners), + callback = std::move(spill_callback)](std::shared_ptr io_worker) { + RAY_CHECK(objects_to_spill.size() == object_owners.size()); + + rpc::SpillObjectsRequest request; + for (size_t i = 0; i < objects_to_spill.size(); i++) { + const auto &object_id = objects_to_spill.at(i); + const auto &owner = object_owners.at(i); + RAY_LOG(DEBUG) << "Sending spill request for object " << object_id; + auto ref = request.add_object_refs_to_spill(); + ref->set_object_id(object_id.Binary()); + ref->mutable_owner_address()->CopyFrom(owner); + } + + io_worker->rpc_client()->SpillObjects( + request, [this, objects_to_spill = std::move(objects_to_spill), + callback = std::move(callback), io_worker]( + const ray::Status &status, const rpc::SpillObjectsReply &reply) { + io_worker_pool_.PushSpillWorker(io_worker); + std::vector spilled_urls; + for (int64_t i = 0; i < reply.spilled_objects_url_size(); ++i) { + spilled_urls.push_back(reply.spilled_objects_url(i)); + } + callback(status, std::move(spilled_urls)); + }); + }); +} + +void IOSpillWorkerPool::StartDeleteWorkerImpl( + std::vector files_to_delete, + std::function delete_callback) { + io_worker_pool_.PopDeleteWorker([this, files_to_delete = std::move(files_to_delete)]( + std::shared_ptr io_worker) { + RAY_LOG(DEBUG) << "Sending delete spilled object request. Length: " + << files_to_delete.size(); + rpc::DeleteSpilledObjectsRequest request; + request.add_spilled_objects_url(std::move(files_to_delete)); + io_worker->rpc_client()->DeleteSpilledObjects( + request, [this, io_worker](const ray::Status &status, + const rpc::DeleteSpilledObjectsReply /* unused */) { + io_worker_pool_.PushDeleteWorker(io_worker); + delete_callback(status); + }); + }); +} + +} // namespace raylet +} // namespace ray \ No newline at end of file diff --git a/src/ray/raylet/io_spill_worker_pool.h b/src/ray/raylet/io_spill_worker_pool.h new file mode 100644 index 000000000..7f0a78440 --- /dev/null +++ b/src/ray/raylet/io_spill_worker_pool.h @@ -0,0 +1,52 @@ +// Copyright 2017 The Ray Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include "ray/object_manager/plasma/spill_worker_pool.h" +#include "ray/raylet/worker_pool.h" + +namespace ray { +namespace raylet { +class IOSpillWorkerPool : plasma::ISpillWorkerPool { + public: + IOSpillWorkerPool(instrumented_io_context &worker_pooler_executor, + IOWorkerPoolInterface &io_worker_pool); + + void StartSpillWorker( + std::vector objects_to_spill, + std::vector object_owners, + std::function)> spill_callback) override; + + void StartDeleteWorker(std::vector files_to_delete, + std::function delete_callback) override; + + private: + void StartSpillWorkerImpl( + std::vector objects_to_spill, + std::vector object_owners, + std::function)> spill_callback); + + void StartDeleteWorkerImpl( + std::vector objects_to_spill, + std::vector object_owners, + std::function)> spill_callback); + + private: + instrumented_io_context &executor_; + IOWorkerPoolInterface &io_worker_pool_; +}; + +} // namespace raylet +} // namespace ray \ No newline at end of file