[Object Spilling] Unpin before updating URL (#17994)

* Unpin before updating URL

* Remove unnecessary logs.

* update compiling issue

* Check the consistent local state instead of stale information from obod.

* Fix the test

* Addressed code review.
This commit is contained in:
SangBin Cho 2021-08-26 10:23:53 -07:00 committed by GitHub
parent ea4f54f8ef
commit 405418f8e8
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 122 additions and 101 deletions

View file

@ -124,7 +124,7 @@ ObjectManager::ObjectManager(
pull_manager_.reset(new PullManager(self_node_id_, object_is_local, send_pull_request,
cancel_pull_request, restore_spilled_object_,
get_time, config.pull_timeout_ms, available_memory,
pin_object));
pin_object, get_spilled_object_url));
// Start object manager rpc server and send & receive request threads
StartRpcService();
}

View file

@ -25,7 +25,8 @@ PullManager::PullManager(
const RestoreSpilledObjectCallback restore_spilled_object,
const std::function<double()> get_time, int pull_timeout_ms,
int64_t num_bytes_available,
std::function<std::unique_ptr<RayObject>(const ObjectID &)> pin_object)
std::function<std::unique_ptr<RayObject>(const ObjectID &)> pin_object,
std::function<std::string(const ObjectID &)> get_locally_spilled_object_url)
: self_node_id_(self_node_id),
object_is_local_(object_is_local),
send_pull_request_(send_pull_request),
@ -35,6 +36,7 @@ PullManager::PullManager(
pull_timeout_ms_(pull_timeout_ms),
num_bytes_available_(num_bytes_available),
pin_object_(pin_object),
get_locally_spilled_object_url_(get_locally_spilled_object_url),
gen_(std::chrono::high_resolution_clock::now().time_since_epoch().count()) {}
uint64_t PullManager::Pull(const std::vector<rpc::ObjectReference> &object_ref_bundle,
@ -459,9 +461,12 @@ void PullManager::TryToMakeObjectLocal(const ObjectID &object_id) {
}
// If we can restore directly from this raylet, then try to do so.
std::string spilled_url = get_locally_spilled_object_url_(object_id);
bool can_restore_directly =
!request.spilled_url.empty() &&
(request.spilled_node_id.IsNil() || request.spilled_node_id == self_node_id_);
!spilled_url.empty() || // If the object is spilled locally
(!request.spilled_url.empty() &&
request.spilled_node_id
.IsNil()); // Or if the object is spilled on external storages.
if (can_restore_directly) {
UpdateRetryTimer(request, object_id);
restore_spilled_object_(object_id, request.spilled_url,

View file

@ -66,7 +66,8 @@ class PullManager {
const RestoreSpilledObjectCallback restore_spilled_object,
const std::function<double()> get_time, int pull_timeout_ms,
int64_t num_bytes_available,
std::function<std::unique_ptr<RayObject>(const ObjectID &object_id)> pin_object);
std::function<std::unique_ptr<RayObject>(const ObjectID &object_id)> pin_object,
std::function<std::string(const ObjectID &)> get_locally_spilled_object_url);
/// Add a new pull request for a bundle of objects. The objects in the
/// request will get pulled once:
@ -358,6 +359,10 @@ class PullManager {
/// The total size of pinned objects.
int64_t pinned_objects_size_ = 0;
// A callback to get the spilled object URL if the object is spilled locally.
// It will return an empty string otherwise.
std::function<std::string(const ObjectID &)> get_locally_spilled_object_url_;
/// Internally maintained random number generator.
std::mt19937_64 gen_;
int64_t max_timeout_ = 0;

View file

@ -43,7 +43,10 @@ class PullManagerTestWithCapacity {
restore_object_callback_ = callback;
},
[this]() { return fake_time_; }, 10000, num_available_bytes,
[this](const ObjectID &object_id) { return PinReturn(); }) {}
[this](const ObjectID &object_id) { return PinReturn(); },
[this](const ObjectID &object_id) {
return GetLocalSpilledObjectURL(object_id);
}) {}
void AssertNoLeaks() {
ASSERT_TRUE(pull_manager_.get_request_bundles_.empty());
@ -70,6 +73,10 @@ class PullManagerTestWithCapacity {
}
}
std::string GetLocalSpilledObjectURL(const ObjectID &oid) { return spilled_url_[oid]; }
void ObjectSpilled(const ObjectID &oid, std::string url) { spilled_url_[oid] = url; }
NodeID self_node_id_;
bool object_is_local_;
bool allow_pin_ = false;
@ -79,6 +86,7 @@ class PullManagerTestWithCapacity {
double fake_time_;
PullManager pull_manager_;
std::unordered_map<ObjectID, int> num_abort_calls_;
std::unordered_map<ObjectID, std::string> spilled_url_;
};
class PullManagerTest : public PullManagerTestWithCapacity,
@ -187,6 +195,7 @@ TEST_P(PullManagerTest, TestRestoreSpilledObjectRemote) {
NodeID node_that_object_spilled = NodeID::FromRandom();
fake_time_ += 10.;
ObjectSpilled(obj1, "remote_url/foo/bar");
pull_manager_.OnLocationChange(obj1, client_ids, "remote_url/foo/bar",
node_that_object_spilled, 0);
@ -195,6 +204,7 @@ TEST_P(PullManagerTest, TestRestoreSpilledObjectRemote) {
ASSERT_EQ(num_restore_spilled_object_calls_, 0);
// No retry yet.
ObjectSpilled(obj1, "remote_url/foo/bar");
pull_manager_.OnLocationChange(obj1, client_ids, "remote_url/foo/bar",
node_that_object_spilled, 0);
ASSERT_EQ(num_send_pull_request_calls_, 1);
@ -203,6 +213,7 @@ TEST_P(PullManagerTest, TestRestoreSpilledObjectRemote) {
// The call can be retried after a delay.
client_ids.insert(node_that_object_spilled);
fake_time_ += 10.;
ObjectSpilled(obj1, "remote_url/foo/bar");
pull_manager_.OnLocationChange(obj1, client_ids, "remote_url/foo/bar",
node_that_object_spilled, 0);
ASSERT_EQ(num_send_pull_request_calls_, 2);
@ -210,6 +221,7 @@ TEST_P(PullManagerTest, TestRestoreSpilledObjectRemote) {
// Don't restore an object if it's local.
object_is_local_ = true;
ObjectSpilled(obj1, "remote_url/foo/bar");
pull_manager_.OnLocationChange(obj1, client_ids, "remote_url/foo/bar",
NodeID::FromRandom(), 0);
ASSERT_EQ(num_send_pull_request_calls_, 2);
@ -245,6 +257,7 @@ TEST_P(PullManagerTest, TestRestoreSpilledObjectLocal) {
ASSERT_EQ(num_restore_spilled_object_calls_, 0);
fake_time_ += 10.;
ObjectSpilled(obj1, "remote_url/foo/bar");
pull_manager_.OnLocationChange(obj1, client_ids, "remote_url/foo/bar", self_node_id_,
0);
@ -253,6 +266,7 @@ TEST_P(PullManagerTest, TestRestoreSpilledObjectLocal) {
ASSERT_EQ(num_restore_spilled_object_calls_, 1);
// No retry yet.
ObjectSpilled(obj1, "remote_url/foo/bar");
pull_manager_.OnLocationChange(obj1, client_ids, "remote_url/foo/bar", self_node_id_,
0);
ASSERT_EQ(num_send_pull_request_calls_, 0);
@ -260,6 +274,7 @@ TEST_P(PullManagerTest, TestRestoreSpilledObjectLocal) {
// The call can be retried after a delay.
fake_time_ += 10.;
ObjectSpilled(obj1, "remote_url/foo/bar");
pull_manager_.OnLocationChange(obj1, client_ids, "remote_url/foo/bar", self_node_id_,
0);
ASSERT_EQ(num_send_pull_request_calls_, 0);
@ -297,6 +312,7 @@ TEST_P(PullManagerTest, TestLoadBalancingRestorationRequest) {
const auto remote_node_that_spilled_object = NodeID::FromRandom();
client_ids.insert(copy_node1);
client_ids.insert(copy_node2);
ObjectSpilled(obj1, "remote_url/foo/bar");
pull_manager_.OnLocationChange(obj1, client_ids, "remote_url/foo/bar",
remote_node_that_spilled_object, 0);

View file

@ -261,6 +261,7 @@ void LocalObjectManager::SpillObjectsInternal(
auto it = objects_pending_spill_.find(object_id);
RAY_CHECK(it != objects_pending_spill_.end());
pinned_objects_size_ += it->second.first->GetSize();
num_bytes_pending_spill_ -= it->second.first->GetSize();
pinned_objects_.emplace(object_id, std::move(it->second));
objects_pending_spill_.erase(it);
}
@ -268,45 +269,18 @@ void LocalObjectManager::SpillObjectsInternal(
if (!status.ok()) {
RAY_LOG(ERROR) << "Failed to send object spilling request: "
<< status.ToString();
} else {
OnObjectSpilled(objects_to_spill, r);
}
if (callback) {
callback(status);
}
} else {
AddSpilledUrls(objects_to_spill, r, callback);
}
});
});
}
void LocalObjectManager::UnpinSpilledObjectCallback(
const ObjectID &object_id, const std::string &object_url,
std::shared_ptr<size_t> num_remaining,
std::function<void(const ray::Status &)> callback, ray::Status status) {
if (!status.ok()) {
RAY_LOG(DEBUG) << "Failed to send spilled url for object " << object_id
<< " to object directory, considering the object to have been freed: "
<< status.ToString();
} else {
RAY_LOG(DEBUG) << "Object " << object_id << " spilled to " << object_url
<< " and object directory has been informed";
}
RAY_LOG(DEBUG) << "Unpinning pending spill object " << object_id;
// Unpin the object.
auto it = objects_pending_spill_.find(object_id);
RAY_CHECK(it != objects_pending_spill_.end());
num_bytes_pending_spill_ -= it->second.first->GetSize();
objects_pending_spill_.erase(it);
(*num_remaining)--;
if (*num_remaining == 0 && callback) {
callback(status);
}
}
void LocalObjectManager::AddSpilledUrls(
const std::vector<ObjectID> &object_ids, const rpc::SpillObjectsReply &worker_reply,
std::function<void(const ray::Status &)> callback) {
auto num_remaining = std::make_shared<size_t>(object_ids.size());
void LocalObjectManager::OnObjectSpilled(const std::vector<ObjectID> &object_ids,
const rpc::SpillObjectsReply &worker_reply) {
for (size_t i = 0; i < static_cast<size_t>(worker_reply.spilled_objects_url_size());
++i) {
const ObjectID &object_id = object_ids[i];
@ -317,17 +291,6 @@ void LocalObjectManager::AddSpilledUrls(
const auto node_id_object_spilled =
is_external_storage_type_fs_ ? self_node_id_ : NodeID::Nil();
auto it = objects_pending_spill_.find(object_id);
RAY_CHECK(it != objects_pending_spill_.end());
// There are times that restore request comes before the url is added to the object
// directory. By adding the spilled url "before" adding it to the object directory, we
// can process the restore request before object directory replies.
spilled_objects_url_.emplace(object_id, object_url);
auto unpin_callback =
std::bind(&LocalObjectManager::UnpinSpilledObjectCallback, this, object_id,
object_url, num_remaining, callback, std::placeholders::_1);
// Update the object_id -> url_ref_count to use it for deletion later.
// We need to track the references here because a single file can contain
// multiple objects, and we shouldn't delete the file until
@ -342,26 +305,50 @@ void LocalObjectManager::AddSpilledUrls(
url_ref_count_[base_url_it->second] += 1;
}
// TODO(Clark): Don't send RPC to owner if we're fulfilling an owner-initiated
// spill RPC.
// Mark that the object is spilled and unpin the pending requests.
spilled_objects_url_.emplace(object_id, object_url);
RAY_LOG(DEBUG) << "Unpinning pending spill object " << object_id;
auto it = objects_pending_spill_.find(object_id);
RAY_CHECK(it != objects_pending_spill_.end());
const auto object_size = it->second.first->GetSize();
const auto worker_addr = it->second.second;
num_bytes_pending_spill_ -= object_size;
objects_pending_spill_.erase(it);
// Asynchronously Update the spilled URL.
rpc::AddSpilledUrlRequest request;
request.set_object_id(object_id.Binary());
request.set_spilled_url(object_url);
request.set_spilled_node_id(node_id_object_spilled.Binary());
request.set_size(it->second.first->GetSize());
request.set_size(object_size);
auto owner_client = owner_client_pool_.GetOrConnect(it->second.second);
auto owner_client = owner_client_pool_.GetOrConnect(worker_addr);
RAY_LOG(DEBUG) << "Sending spilled URL " << object_url << " for object " << object_id
<< " to owner " << WorkerID::FromBinary(it->second.second.worker_id());
// Send spilled URL, spilled node ID, and object size to owner.
<< " to owner " << WorkerID::FromBinary(worker_addr.worker_id());
owner_client->AddSpilledUrl(
request, [unpin_callback](Status status, const rpc::AddSpilledUrlReply &reply) {
unpin_callback(status);
request,
[object_id, object_url](Status status, const rpc::AddSpilledUrlReply &reply) {
// TODO(sang): Currently we assume there's no network failure. We should handle
// it properly.
if (!status.ok()) {
RAY_LOG(DEBUG)
<< "Failed to send spilled url for object " << object_id
<< " to object directory, considering the object to have been freed: "
<< status.ToString();
} else {
RAY_LOG(DEBUG) << "Object " << object_id << " spilled to " << object_url
<< " and object directory has been informed";
}
});
}
}
std::string LocalObjectManager::GetSpilledObjectURL(const ObjectID &object_id) {
std::string LocalObjectManager::GetLocalSpilledObjectURL(const ObjectID &object_id) {
if (!is_external_storage_type_fs_) {
// If the external storage is cloud storage like S3, returns the empty string.
// In that case, the URL is supposed to be obtained by OBOD.
return "";
}
auto entry = spilled_objects_url_.find(object_id);
if (entry != spilled_objects_url_.end()) {
return entry->second;
@ -427,7 +414,6 @@ void LocalObjectManager::AsyncRestoreSpilledObject(
void LocalObjectManager::ProcessSpilledObjectsDeleteQueue(uint32_t max_batch_size) {
std::vector<std::string> object_urls_to_delete;
// Process upto batch size of objects to delete.
while (!spilled_object_pending_delete_.empty() &&
object_urls_to_delete.size() < max_batch_size) {

View file

@ -140,8 +140,11 @@ class LocalObjectManager {
/// Record object spilling stats to metrics.
void RecordObjectSpillingStats() const;
/// Return the spilled object URL or the empty string.
std::string GetSpilledObjectURL(const ObjectID &object_id);
/// Return the spilled object URL if the object is spilled locally,
/// or the empty string otherwise.
/// If the external storage is cloud, this will always return an empty string.
/// In that case, the URL is supposed to be obtained by the object directory.
std::string GetLocalSpilledObjectURL(const ObjectID &object_id);
std::string DebugString() const;
@ -168,19 +171,13 @@ class LocalObjectManager {
/// Release an object that has been freed by its owner.
void ReleaseFreedObject(const ObjectID &object_id);
// A callback for unpinning spilled objects. This should be invoked after the object
// has been spilled and after the object directory has been sent the spilled URL.
void UnpinSpilledObjectCallback(const ObjectID &object_id,
const std::string &object_url,
std::shared_ptr<size_t> num_remaining,
std::function<void(const ray::Status &)> callback,
ray::Status status);
/// Add objects' spilled URLs to the global object directory. Call the
/// callback once all URLs have been added.
void AddSpilledUrls(const std::vector<ObjectID> &object_ids,
const rpc::SpillObjectsReply &worker_reply,
std::function<void(const ray::Status &)> callback);
/// Do operations that are needed after spilling objects such as
/// 1. Unpin the pending spilling object.
/// 2. Update the spilled URL to the owner.
/// 3. Update the spilled URL to the local directory if it doesn't
/// use the external storages like S3.
void OnObjectSpilled(const std::vector<ObjectID> &object_ids,
const rpc::SpillObjectsReply &worker_reply);
/// Delete spilled objects stored in given urls.
///

View file

@ -210,7 +210,7 @@ NodeManager::NodeManager(instrumented_io_context &io_service, const NodeID &self
},
/*get_spilled_object_url=*/
[this](const ObjectID &object_id) {
return GetLocalObjectManager().GetSpilledObjectURL(object_id);
return GetLocalObjectManager().GetLocalSpilledObjectURL(object_id);
},
/*spill_objects_callback=*/
[this]() {

View file

@ -404,14 +404,15 @@ TEST_F(LocalObjectManagerTest, TestRestoreSpilledObject) {
manager.SpillObjects(object_ids,
[&](const Status &status) mutable { ASSERT_TRUE(status.ok()); });
std::vector<std::string> urls;
for (size_t i = 0; i < object_ids.size(); i++) {
ASSERT_TRUE(manager.GetSpilledObjectURL(object_ids[i]).empty());
ASSERT_TRUE(manager.GetLocalSpilledObjectURL(object_ids[i]).empty());
urls.push_back(BuildURL("url" + std::to_string(i)));
}
ASSERT_TRUE(worker_pool.io_worker_client->ReplySpillObjects(urls));
for (size_t i = 0; i < object_ids.size(); i++) {
ASSERT_FALSE(manager.GetSpilledObjectURL(object_ids[i]).empty());
ASSERT_FALSE(manager.GetLocalSpilledObjectURL(object_ids[i]).empty());
}
for (size_t i = 0; i < object_ids.size(); i++) {
ASSERT_TRUE(owner_client->ReplyAddSpilledUrl());
@ -821,9 +822,9 @@ TEST_F(LocalObjectManagerTest, TestPartialSpillError) {
for (size_t i = 0; i < free_objects_batch_size; i++) {
if (i < urls.size()) {
ASSERT_EQ(urls[i], manager.GetSpilledObjectURL(object_ids[i]));
ASSERT_EQ(urls[i], manager.GetLocalSpilledObjectURL(object_ids[i]));
} else {
ASSERT_TRUE(manager.GetSpilledObjectURL(object_ids[i]).empty());
ASSERT_TRUE(manager.GetLocalSpilledObjectURL(object_ids[i]).empty());
}
}
}
@ -972,9 +973,10 @@ TEST_F(LocalObjectManagerTest, TestDeleteSpillingObjectsBlocking) {
owner_address.set_worker_id(WorkerID::FromRandom().Binary());
std::vector<ObjectID> object_ids;
std::vector<std::unique_ptr<RayObject>> objects;
size_t spilled_urls_size = 2;
// Objects are pinned.
for (size_t i = 0; i < free_objects_batch_size; i++) {
for (size_t i = 0; i < spilled_urls_size; i++) {
ObjectID object_id = ObjectID::FromRandom();
object_ids.push_back(object_id);
auto data_buffer = std::make_shared<MockObjectBuffer>(0, object_id, unpins);
@ -986,48 +988,58 @@ TEST_F(LocalObjectManagerTest, TestDeleteSpillingObjectsBlocking) {
manager.WaitForObjectFree(owner_address, object_ids);
// Objects are spilled.
std::vector<ObjectID> object_ids_to_spill;
int spilled_urls_size = free_objects_batch_size;
for (int i = 0; i < spilled_urls_size; i++) {
object_ids_to_spill.push_back(object_ids[i]);
std::vector<ObjectID> spill_set_1;
std::vector<ObjectID> spill_set_2;
size_t spill_set_1_size = spilled_urls_size / 2;
size_t spill_set_2_size = spilled_urls_size - spill_set_1_size;
for (size_t i = 0; i < spill_set_1_size; i++) {
spill_set_1.push_back(object_ids[i]);
}
manager.SpillObjects(object_ids_to_spill,
for (size_t i = spill_set_1_size; i < spilled_urls_size; i++) {
spill_set_2.push_back(object_ids[i]);
}
manager.SpillObjects(spill_set_1,
[&](const Status &status) mutable { ASSERT_TRUE(status.ok()); });
manager.SpillObjects(spill_set_2,
[&](const Status &status) mutable { ASSERT_TRUE(status.ok()); });
std::vector<std::string> urls;
// Only 1 object's spilling is done. Everything else is still spilling.
for (size_t i = 0; i < object_ids_to_spill.size(); i++) {
urls.push_back(BuildURL("url" + std::to_string(i)));
std::vector<std::string> urls_spill_set_1;
std::vector<std::string> urls_spill_set_2;
for (size_t i = 0; i < spill_set_1_size; i++) {
urls_spill_set_1.push_back(BuildURL("url" + std::to_string(i)));
}
ASSERT_TRUE(worker_pool.io_worker_client->ReplySpillObjects(urls));
for (size_t i = 0; i < 1; i++) {
for (size_t i = spill_set_1_size; i < spilled_urls_size; i++) {
urls_spill_set_2.push_back(BuildURL("url" + std::to_string(i)));
}
// Spillset 1 objects are spilled.
ASSERT_TRUE(worker_pool.io_worker_client->ReplySpillObjects(urls_spill_set_1));
for (size_t i = 0; i < spill_set_1_size; i++) {
ASSERT_TRUE(owner_client->ReplyAddSpilledUrl());
}
// Every object has gone out of scope.
for (size_t i = 0; i < free_objects_batch_size; i++) {
for (size_t i = 0; i < spilled_urls_size; i++) {
EXPECT_CALL(*subscriber_, Unsubscribe(_, _, object_ids[i].Binary()));
ASSERT_TRUE(subscriber_->PublishObjectEviction());
}
// // Now, deletion queue would process only the first object. Everything else won't be
// Now, deletion queue would process only the first spill set. Everything else won't be
// deleted although it is out of scope because they are still spilling.
manager.ProcessSpilledObjectsDeleteQueue(/* max_batch_size */ 30);
int deleted_urls_size = worker_pool.io_worker_client->ReplyDeleteSpilledObjects();
// Only the first entry that is already spilled will be deleted.
ASSERT_EQ(deleted_urls_size, 1);
ASSERT_EQ(deleted_urls_size, spill_set_1_size);
// Now spilling is completely done.
std::vector<std::string> new_urls;
for (size_t i = 1; i < object_ids_to_spill.size(); i++) {
new_urls.push_back(BuildURL("url" + std::to_string(i)));
}
for (size_t i = 1; i < object_ids_to_spill.size(); i++) {
ASSERT_TRUE(worker_pool.io_worker_client->ReplySpillObjects(urls_spill_set_2));
for (size_t i = 0; i < spill_set_2_size; i++) {
ASSERT_TRUE(owner_client->ReplyAddSpilledUrl());
}
// Every object is now deleted.
manager.ProcessSpilledObjectsDeleteQueue(/* max_batch_size */ 30);
deleted_urls_size = worker_pool.io_worker_client->ReplyDeleteSpilledObjects();
ASSERT_EQ(deleted_urls_size, object_ids_to_spill.size() - 1);
ASSERT_EQ(deleted_urls_size, spill_set_2_size);
}
TEST_F(LocalObjectManagerTest, TestDeleteMaxObjects) {