This commit is contained in:
scv119 2021-09-05 15:45:59 -07:00
parent ba58f5edb1
commit 87381acba0
4 changed files with 297 additions and 0 deletions

View file

@ -329,6 +329,20 @@ cc_library(
],
)
cc_library(
name = "spill_worker_pool",
hdrs = [
"src/ray/object_manager/plasma/spill_worker_pool.h",
],
copts = PLASMA_COPTS,
linkopts = PLASMA_LINKOPTS,
strip_include_prefix = "src",
deps = [
":ray_common",
"//src/ray/protobuf:common_cc_proto",
],
)
cc_library(
name = "plasma_store_server_lib",
srcs = [
@ -337,6 +351,7 @@ cc_library(
"src/ray/object_manager/plasma/eviction_policy.cc",
"src/ray/object_manager/plasma/get_request_queue.cc",
"src/ray/object_manager/plasma/object_lifecycle_manager.cc",
"src/ray/object_manager/plasma/object_spill_manager.cc",
"src/ray/object_manager/plasma/object_store.cc",
"src/ray/object_manager/plasma/plasma_allocator.cc",
"src/ray/object_manager/plasma/stats_collector.cc",
@ -350,6 +365,7 @@ cc_library(
"src/ray/object_manager/plasma/eviction_policy.h",
"src/ray/object_manager/plasma/get_request_queue.h",
"src/ray/object_manager/plasma/object_lifecycle_manager.h",
"src/ray/object_manager/plasma/object_spill_manager.h",
"src/ray/object_manager/plasma/object_store.h",
"src/ray/object_manager/plasma/plasma_allocator.h",
"src/ray/object_manager/plasma/stats_collector.h",
@ -362,6 +378,7 @@ cc_library(
strip_include_prefix = "src",
deps = [
":plasma_client",
":spill_worker_pool",
],
)

View file

@ -0,0 +1,151 @@
// Copyright 2017 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "ray/object_manager/plasma/object_spill_manager.h"
#include <regex>
namespace plasma {
ObjectSpillManager::ObjectSpillManager(
instrumented_io_context &io_context,
std::unique_ptr<ISpillWorkerPool> spill_worker_pool, int64_t max_active_worker)
: io_context_(io_context),
spill_worker_pool_(std::move(spill_worker_pool)),
kMaxActiveWorker(max_active_worker),
num_active_workers_(0),
spilled_objects_(),
spilled_files_() {}
bool ObjectSpillManager::SubmitSpillTask(
std::vector<ObjectID> objects_to_spill, std::vector<ray::rpc::Address> object_owners,
std::function<void(ray::Status status, std::vector<ObjectID>)> callback) {
if (!CanSubmitSpillTask()) {
return false;
}
num_active_workers_ += 1;
spill_worker_pool_->StartSpillWorker(
objects_to_spill, std::move(object_owners),
[this, objects_to_spill = std::move(objects_to_spill),
callback = std::move(callback)](ray::Status status,
std::vector<std::string> spilled_urls) {
io_context_.post([this, objects_to_spill = std::move(objects_to_spill),
callback = std::move(callback), status,
spilled_urls = std::move(spilled_urls)]() {
OnSpillTaskFinished(std::move(objects_to_spill), status,
std::move(spilled_urls), std::move(callback));
});
});
return true;
}
bool ObjectSpillManager::CanSubmitSpillTask() const {
return num_active_workers_ < kMaxActiveWorker;
}
/// Delete a spilled object from external storage.
void ObjectSpillManager::DeleteSpilledObject(const ObjectID &object_id) {
if (!spilled_objects_.contains(object_id)) {
RAY_LOG(WARNING) << "Deleting an non-existing spilled object " << object_id;
return;
}
auto uri = spilled_objects_.at(object_id);
spilled_objects_.erase(object_id);
RAY_CHECK(spilled_files_.contains(uri.file));
spilled_files_[uri.file]--;
if (spilled_files_[uri.file] > 0) {
return;
}
spilled_files_.erase(uri.file);
DeletedSpilledFile(uri.file);
}
void ObjectSpillManager::DeletedSpilledFile(const std::string &file_to_remove) {
spill_worker_pool_->StartDeleteWorker({file_to_remove}, [](auto status) {
if (!status.ok()) {
RAY_LOG(ERROR) << "Failed to send delete spilled object request: "
<< status.ToString();
}
});
}
absl::optional<std::string> ObjectSpillManager::GetSpilledUrl(
const ObjectID &object_id) const {
auto it = spilled_objects_.find(object_id);
if (it == spilled_objects_.end()) {
return absl::nullopt;
}
return it->second.ToString();
}
void ObjectSpillManager::OnSpillTaskFinished(
std::vector<ObjectID> objects_to_spill, ray::Status status,
std::vector<std::string> spilled_urls,
std::function<void(ray::Status status, std::vector<ObjectID>)>
task_finished_callback) {
num_active_workers_ -= 1;
if (!status.ok()) {
task_finished_callback(status, {});
return;
}
std::vector<ObjectID> spilled_objects;
for (size_t i = 0; i < spilled_urls.size(); ++i) {
auto uri = ParseSpilledURI(spilled_urls.at(i));
if (!uri.has_value()) {
RAY_LOG(WARNING) << "bad";
continue;
}
auto &object_id = objects_to_spill.at(i);
if (spilled_objects_.contains(object_id)) {
RAY_LOG(WARNING) << "bad";
continue;
}
spilled_objects_[object_id] = uri.value();
spilled_files_[uri->file]++;
spilled_objects.push_back(object_id);
}
task_finished_callback(status, std::move(spilled_objects));
}
absl::optional<ObjectSpillManager::SpilledObjectURI> ObjectSpillManager::ParseSpilledURI(
const std::string &object_url) {
static const std::regex kObjectURLPattern("^(.*)\\?offset=(\\d+)&size=(\\d+)$");
std::smatch match_groups;
if (!std::regex_match(object_url, match_groups, kObjectURLPattern) ||
match_groups.size() != 4) {
return absl::nullopt;
}
SpilledObjectURI uri;
uri.file = match_groups[1].str();
try {
uri.offset = std::stoll(match_groups[2].str());
uri.size = std::stoll(match_groups[3].str());
if (uri.offset < 0 || uri.size < 0) {
RAY_LOG(ERROR) << "Offset and size can't be negative. offset: " << uri.offset
<< ", size: " << uri.size;
return absl::nullopt;
}
} catch (...) {
RAY_LOG(ERROR) << "Failed to parse offset: " << match_groups[2].str()
<< " and size: " << match_groups[3].str();
return absl::nullopt;
}
return std::move(uri);
}
} // namespace plasma

View file

@ -0,0 +1,92 @@
// Copyright 2017 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <string>
#include <vector>
#include "absl/container/flat_hash_map.h"
#include "absl/strings/str_format.h"
#include "absl/types/optional.h"
#include "ray/common/asio/instrumented_io_context.h"
#include "ray/common/id.h"
#include "ray/object_manager/plasma/common.h"
#include "ray/object_manager/plasma/spill_worker_pool.h"
#include "src/ray/protobuf/common.pb.h"
namespace plasma {
class ObjectSpillManager {
public:
ObjectSpillManager(instrumented_io_context &callback_executor,
std::unique_ptr<ISpillWorkerPool> spill_worker_pool,
int64_t max_active_worker);
/// Spill a list of objects in an asynchronous task.
///
/// \param objects_to_spill List of objects to spill.
/// \param task_finished_callback The callback once the async spill task finishes.
/// - If the spill task succeed it returns Status::OK() and a map from object_id
/// to the spilled location.
/// - If the spill task failed, it returns failure Status and a map from object_id
/// to empty strings.
/// \return wether the spill task is successfully scheduled.
/// - It returns true if the spill task is successfuly scheduled,
/// where the callback is guarantee to be called.
/// - Otherwise it returns false and the callback will not be called
bool SubmitSpillTask(std::vector<ObjectID> objects_to_spill,
std::vector<ray::rpc::Address> object_owners,
std::function<void(ray::Status status, std::vector<ObjectID>)>
task_finished_callback);
/// Wether we can submit another spill task.
bool CanSubmitSpillTask() const;
/// Delete a spilled object from external storage.
void DeleteSpilledObject(const ObjectID &object_id);
absl::optional<std::string> GetSpilledUrl(const ObjectID &object_id) const;
private:
struct SpilledObjectURI {
std::string file;
int64_t offset;
int64_t size;
std::string ToString() const {
return absl::StrFormat("%s?offset=%d&size=%d", file, offset, size);
}
};
void OnSpillTaskFinished(std::vector<ObjectID> objects_to_spill, ray::Status status,
std::vector<std::string> spilled_urls,
std::function<void(ray::Status status, std::vector<ObjectID>)>
task_finished_callback);
absl::optional<SpilledObjectURI> ParseSpilledURI(const std::string &s);
void DeletedSpilledFile(const std::string &spilled_file);
private:
instrumented_io_context &io_context_;
std::unique_ptr<ISpillWorkerPool> spill_worker_pool_;
const int64_t kMaxActiveWorker;
int64_t num_active_workers_;
absl::flat_hash_map<ObjectID, SpilledObjectURI> spilled_objects_;
absl::flat_hash_map<std::string, int64_t> spilled_files_;
};
} // namespace plasma

View file

@ -0,0 +1,37 @@
// Copyright 2017 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <functional>
#include <string>
#include <vector>
#include "ray/common/id.h"
#include "src/ray/protobuf/common.pb.h"
namespace plasma {
class ISpillWorkerPool {
public:
virtual ~ISpillWorkerPool() = default;
virtual void StartSpillWorker(
std::vector<ObjectID> objects_to_spill,
std::vector<ray::rpc::Address> object_owners,
std::function<void(ray::Status, std::vector<std::string>)> spill_callback) = 0;
virtual void StartDeleteWorker(std::vector<std::string> files_to_delete,
std::function<void(ray::Status)> delete_callback) = 0;
};
} // namespace plasma