From 405418f8e87ccbfa780cf98015dc7fa0dfa6fc76 Mon Sep 17 00:00:00 2001 From: SangBin Cho Date: Thu, 26 Aug 2021 10:23:53 -0700 Subject: [PATCH] [Object Spilling] Unpin before updating URL (#17994) * Unpin before updating URL * Remove unnecessary logs. * update compiling issue * Check the consistent local state instead of stale information from obod. * Fix the test * Addressed code review. --- src/ray/object_manager/object_manager.cc | 2 +- src/ray/object_manager/pull_manager.cc | 11 ++- src/ray/object_manager/pull_manager.h | 7 +- .../object_manager/test/pull_manager_test.cc | 18 +++- src/ray/raylet/local_object_manager.cc | 94 ++++++++----------- src/ray/raylet/local_object_manager.h | 27 +++--- src/ray/raylet/node_manager.cc | 2 +- .../raylet/test/local_object_manager_test.cc | 62 +++++++----- 8 files changed, 122 insertions(+), 101 deletions(-) diff --git a/src/ray/object_manager/object_manager.cc b/src/ray/object_manager/object_manager.cc index e45485174..90156e935 100644 --- a/src/ray/object_manager/object_manager.cc +++ b/src/ray/object_manager/object_manager.cc @@ -124,7 +124,7 @@ ObjectManager::ObjectManager( pull_manager_.reset(new PullManager(self_node_id_, object_is_local, send_pull_request, cancel_pull_request, restore_spilled_object_, get_time, config.pull_timeout_ms, available_memory, - pin_object)); + pin_object, get_spilled_object_url)); // Start object manager rpc server and send & receive request threads StartRpcService(); } diff --git a/src/ray/object_manager/pull_manager.cc b/src/ray/object_manager/pull_manager.cc index 34678ea5a..707801f99 100644 --- a/src/ray/object_manager/pull_manager.cc +++ b/src/ray/object_manager/pull_manager.cc @@ -25,7 +25,8 @@ PullManager::PullManager( const RestoreSpilledObjectCallback restore_spilled_object, const std::function get_time, int pull_timeout_ms, int64_t num_bytes_available, - std::function(const ObjectID &)> pin_object) + std::function(const ObjectID &)> pin_object, + std::function get_locally_spilled_object_url) : self_node_id_(self_node_id), object_is_local_(object_is_local), send_pull_request_(send_pull_request), @@ -35,6 +36,7 @@ PullManager::PullManager( pull_timeout_ms_(pull_timeout_ms), num_bytes_available_(num_bytes_available), pin_object_(pin_object), + get_locally_spilled_object_url_(get_locally_spilled_object_url), gen_(std::chrono::high_resolution_clock::now().time_since_epoch().count()) {} uint64_t PullManager::Pull(const std::vector &object_ref_bundle, @@ -459,9 +461,12 @@ void PullManager::TryToMakeObjectLocal(const ObjectID &object_id) { } // If we can restore directly from this raylet, then try to do so. + std::string spilled_url = get_locally_spilled_object_url_(object_id); bool can_restore_directly = - !request.spilled_url.empty() && - (request.spilled_node_id.IsNil() || request.spilled_node_id == self_node_id_); + !spilled_url.empty() || // If the object is spilled locally + (!request.spilled_url.empty() && + request.spilled_node_id + .IsNil()); // Or if the object is spilled on external storages. if (can_restore_directly) { UpdateRetryTimer(request, object_id); restore_spilled_object_(object_id, request.spilled_url, diff --git a/src/ray/object_manager/pull_manager.h b/src/ray/object_manager/pull_manager.h index cb889a859..9b7f20c14 100644 --- a/src/ray/object_manager/pull_manager.h +++ b/src/ray/object_manager/pull_manager.h @@ -66,7 +66,8 @@ class PullManager { const RestoreSpilledObjectCallback restore_spilled_object, const std::function get_time, int pull_timeout_ms, int64_t num_bytes_available, - std::function(const ObjectID &object_id)> pin_object); + std::function(const ObjectID &object_id)> pin_object, + std::function get_locally_spilled_object_url); /// Add a new pull request for a bundle of objects. The objects in the /// request will get pulled once: @@ -358,6 +359,10 @@ class PullManager { /// The total size of pinned objects. int64_t pinned_objects_size_ = 0; + // A callback to get the spilled object URL if the object is spilled locally. + // It will return an empty string otherwise. + std::function get_locally_spilled_object_url_; + /// Internally maintained random number generator. std::mt19937_64 gen_; int64_t max_timeout_ = 0; diff --git a/src/ray/object_manager/test/pull_manager_test.cc b/src/ray/object_manager/test/pull_manager_test.cc index ba522845e..b3610350b 100644 --- a/src/ray/object_manager/test/pull_manager_test.cc +++ b/src/ray/object_manager/test/pull_manager_test.cc @@ -43,7 +43,10 @@ class PullManagerTestWithCapacity { restore_object_callback_ = callback; }, [this]() { return fake_time_; }, 10000, num_available_bytes, - [this](const ObjectID &object_id) { return PinReturn(); }) {} + [this](const ObjectID &object_id) { return PinReturn(); }, + [this](const ObjectID &object_id) { + return GetLocalSpilledObjectURL(object_id); + }) {} void AssertNoLeaks() { ASSERT_TRUE(pull_manager_.get_request_bundles_.empty()); @@ -70,6 +73,10 @@ class PullManagerTestWithCapacity { } } + std::string GetLocalSpilledObjectURL(const ObjectID &oid) { return spilled_url_[oid]; } + + void ObjectSpilled(const ObjectID &oid, std::string url) { spilled_url_[oid] = url; } + NodeID self_node_id_; bool object_is_local_; bool allow_pin_ = false; @@ -79,6 +86,7 @@ class PullManagerTestWithCapacity { double fake_time_; PullManager pull_manager_; std::unordered_map num_abort_calls_; + std::unordered_map spilled_url_; }; class PullManagerTest : public PullManagerTestWithCapacity, @@ -187,6 +195,7 @@ TEST_P(PullManagerTest, TestRestoreSpilledObjectRemote) { NodeID node_that_object_spilled = NodeID::FromRandom(); fake_time_ += 10.; + ObjectSpilled(obj1, "remote_url/foo/bar"); pull_manager_.OnLocationChange(obj1, client_ids, "remote_url/foo/bar", node_that_object_spilled, 0); @@ -195,6 +204,7 @@ TEST_P(PullManagerTest, TestRestoreSpilledObjectRemote) { ASSERT_EQ(num_restore_spilled_object_calls_, 0); // No retry yet. + ObjectSpilled(obj1, "remote_url/foo/bar"); pull_manager_.OnLocationChange(obj1, client_ids, "remote_url/foo/bar", node_that_object_spilled, 0); ASSERT_EQ(num_send_pull_request_calls_, 1); @@ -203,6 +213,7 @@ TEST_P(PullManagerTest, TestRestoreSpilledObjectRemote) { // The call can be retried after a delay. client_ids.insert(node_that_object_spilled); fake_time_ += 10.; + ObjectSpilled(obj1, "remote_url/foo/bar"); pull_manager_.OnLocationChange(obj1, client_ids, "remote_url/foo/bar", node_that_object_spilled, 0); ASSERT_EQ(num_send_pull_request_calls_, 2); @@ -210,6 +221,7 @@ TEST_P(PullManagerTest, TestRestoreSpilledObjectRemote) { // Don't restore an object if it's local. object_is_local_ = true; + ObjectSpilled(obj1, "remote_url/foo/bar"); pull_manager_.OnLocationChange(obj1, client_ids, "remote_url/foo/bar", NodeID::FromRandom(), 0); ASSERT_EQ(num_send_pull_request_calls_, 2); @@ -245,6 +257,7 @@ TEST_P(PullManagerTest, TestRestoreSpilledObjectLocal) { ASSERT_EQ(num_restore_spilled_object_calls_, 0); fake_time_ += 10.; + ObjectSpilled(obj1, "remote_url/foo/bar"); pull_manager_.OnLocationChange(obj1, client_ids, "remote_url/foo/bar", self_node_id_, 0); @@ -253,6 +266,7 @@ TEST_P(PullManagerTest, TestRestoreSpilledObjectLocal) { ASSERT_EQ(num_restore_spilled_object_calls_, 1); // No retry yet. + ObjectSpilled(obj1, "remote_url/foo/bar"); pull_manager_.OnLocationChange(obj1, client_ids, "remote_url/foo/bar", self_node_id_, 0); ASSERT_EQ(num_send_pull_request_calls_, 0); @@ -260,6 +274,7 @@ TEST_P(PullManagerTest, TestRestoreSpilledObjectLocal) { // The call can be retried after a delay. fake_time_ += 10.; + ObjectSpilled(obj1, "remote_url/foo/bar"); pull_manager_.OnLocationChange(obj1, client_ids, "remote_url/foo/bar", self_node_id_, 0); ASSERT_EQ(num_send_pull_request_calls_, 0); @@ -297,6 +312,7 @@ TEST_P(PullManagerTest, TestLoadBalancingRestorationRequest) { const auto remote_node_that_spilled_object = NodeID::FromRandom(); client_ids.insert(copy_node1); client_ids.insert(copy_node2); + ObjectSpilled(obj1, "remote_url/foo/bar"); pull_manager_.OnLocationChange(obj1, client_ids, "remote_url/foo/bar", remote_node_that_spilled_object, 0); diff --git a/src/ray/raylet/local_object_manager.cc b/src/ray/raylet/local_object_manager.cc index c8c930a22..8a3cd3105 100644 --- a/src/ray/raylet/local_object_manager.cc +++ b/src/ray/raylet/local_object_manager.cc @@ -261,6 +261,7 @@ void LocalObjectManager::SpillObjectsInternal( auto it = objects_pending_spill_.find(object_id); RAY_CHECK(it != objects_pending_spill_.end()); pinned_objects_size_ += it->second.first->GetSize(); + num_bytes_pending_spill_ -= it->second.first->GetSize(); pinned_objects_.emplace(object_id, std::move(it->second)); objects_pending_spill_.erase(it); } @@ -268,45 +269,18 @@ void LocalObjectManager::SpillObjectsInternal( if (!status.ok()) { RAY_LOG(ERROR) << "Failed to send object spilling request: " << status.ToString(); - if (callback) { - callback(status); - } } else { - AddSpilledUrls(objects_to_spill, r, callback); + OnObjectSpilled(objects_to_spill, r); + } + if (callback) { + callback(status); } }); }); } -void LocalObjectManager::UnpinSpilledObjectCallback( - const ObjectID &object_id, const std::string &object_url, - std::shared_ptr num_remaining, - std::function callback, ray::Status status) { - if (!status.ok()) { - RAY_LOG(DEBUG) << "Failed to send spilled url for object " << object_id - << " to object directory, considering the object to have been freed: " - << status.ToString(); - } else { - RAY_LOG(DEBUG) << "Object " << object_id << " spilled to " << object_url - << " and object directory has been informed"; - } - RAY_LOG(DEBUG) << "Unpinning pending spill object " << object_id; - // Unpin the object. - auto it = objects_pending_spill_.find(object_id); - RAY_CHECK(it != objects_pending_spill_.end()); - num_bytes_pending_spill_ -= it->second.first->GetSize(); - objects_pending_spill_.erase(it); - - (*num_remaining)--; - if (*num_remaining == 0 && callback) { - callback(status); - } -} - -void LocalObjectManager::AddSpilledUrls( - const std::vector &object_ids, const rpc::SpillObjectsReply &worker_reply, - std::function callback) { - auto num_remaining = std::make_shared(object_ids.size()); +void LocalObjectManager::OnObjectSpilled(const std::vector &object_ids, + const rpc::SpillObjectsReply &worker_reply) { for (size_t i = 0; i < static_cast(worker_reply.spilled_objects_url_size()); ++i) { const ObjectID &object_id = object_ids[i]; @@ -317,17 +291,6 @@ void LocalObjectManager::AddSpilledUrls( const auto node_id_object_spilled = is_external_storage_type_fs_ ? self_node_id_ : NodeID::Nil(); - auto it = objects_pending_spill_.find(object_id); - RAY_CHECK(it != objects_pending_spill_.end()); - - // There are times that restore request comes before the url is added to the object - // directory. By adding the spilled url "before" adding it to the object directory, we - // can process the restore request before object directory replies. - spilled_objects_url_.emplace(object_id, object_url); - auto unpin_callback = - std::bind(&LocalObjectManager::UnpinSpilledObjectCallback, this, object_id, - object_url, num_remaining, callback, std::placeholders::_1); - // Update the object_id -> url_ref_count to use it for deletion later. // We need to track the references here because a single file can contain // multiple objects, and we shouldn't delete the file until @@ -342,26 +305,50 @@ void LocalObjectManager::AddSpilledUrls( url_ref_count_[base_url_it->second] += 1; } - // TODO(Clark): Don't send RPC to owner if we're fulfilling an owner-initiated - // spill RPC. + // Mark that the object is spilled and unpin the pending requests. + spilled_objects_url_.emplace(object_id, object_url); + RAY_LOG(DEBUG) << "Unpinning pending spill object " << object_id; + auto it = objects_pending_spill_.find(object_id); + RAY_CHECK(it != objects_pending_spill_.end()); + const auto object_size = it->second.first->GetSize(); + const auto worker_addr = it->second.second; + num_bytes_pending_spill_ -= object_size; + objects_pending_spill_.erase(it); + + // Asynchronously Update the spilled URL. rpc::AddSpilledUrlRequest request; request.set_object_id(object_id.Binary()); request.set_spilled_url(object_url); request.set_spilled_node_id(node_id_object_spilled.Binary()); - request.set_size(it->second.first->GetSize()); + request.set_size(object_size); - auto owner_client = owner_client_pool_.GetOrConnect(it->second.second); + auto owner_client = owner_client_pool_.GetOrConnect(worker_addr); RAY_LOG(DEBUG) << "Sending spilled URL " << object_url << " for object " << object_id - << " to owner " << WorkerID::FromBinary(it->second.second.worker_id()); - // Send spilled URL, spilled node ID, and object size to owner. + << " to owner " << WorkerID::FromBinary(worker_addr.worker_id()); owner_client->AddSpilledUrl( - request, [unpin_callback](Status status, const rpc::AddSpilledUrlReply &reply) { - unpin_callback(status); + request, + [object_id, object_url](Status status, const rpc::AddSpilledUrlReply &reply) { + // TODO(sang): Currently we assume there's no network failure. We should handle + // it properly. + if (!status.ok()) { + RAY_LOG(DEBUG) + << "Failed to send spilled url for object " << object_id + << " to object directory, considering the object to have been freed: " + << status.ToString(); + } else { + RAY_LOG(DEBUG) << "Object " << object_id << " spilled to " << object_url + << " and object directory has been informed"; + } }); } } -std::string LocalObjectManager::GetSpilledObjectURL(const ObjectID &object_id) { +std::string LocalObjectManager::GetLocalSpilledObjectURL(const ObjectID &object_id) { + if (!is_external_storage_type_fs_) { + // If the external storage is cloud storage like S3, returns the empty string. + // In that case, the URL is supposed to be obtained by OBOD. + return ""; + } auto entry = spilled_objects_url_.find(object_id); if (entry != spilled_objects_url_.end()) { return entry->second; @@ -427,7 +414,6 @@ void LocalObjectManager::AsyncRestoreSpilledObject( void LocalObjectManager::ProcessSpilledObjectsDeleteQueue(uint32_t max_batch_size) { std::vector object_urls_to_delete; - // Process upto batch size of objects to delete. while (!spilled_object_pending_delete_.empty() && object_urls_to_delete.size() < max_batch_size) { diff --git a/src/ray/raylet/local_object_manager.h b/src/ray/raylet/local_object_manager.h index 7fded067a..83c2b5612 100644 --- a/src/ray/raylet/local_object_manager.h +++ b/src/ray/raylet/local_object_manager.h @@ -140,8 +140,11 @@ class LocalObjectManager { /// Record object spilling stats to metrics. void RecordObjectSpillingStats() const; - /// Return the spilled object URL or the empty string. - std::string GetSpilledObjectURL(const ObjectID &object_id); + /// Return the spilled object URL if the object is spilled locally, + /// or the empty string otherwise. + /// If the external storage is cloud, this will always return an empty string. + /// In that case, the URL is supposed to be obtained by the object directory. + std::string GetLocalSpilledObjectURL(const ObjectID &object_id); std::string DebugString() const; @@ -168,19 +171,13 @@ class LocalObjectManager { /// Release an object that has been freed by its owner. void ReleaseFreedObject(const ObjectID &object_id); - // A callback for unpinning spilled objects. This should be invoked after the object - // has been spilled and after the object directory has been sent the spilled URL. - void UnpinSpilledObjectCallback(const ObjectID &object_id, - const std::string &object_url, - std::shared_ptr num_remaining, - std::function callback, - ray::Status status); - - /// Add objects' spilled URLs to the global object directory. Call the - /// callback once all URLs have been added. - void AddSpilledUrls(const std::vector &object_ids, - const rpc::SpillObjectsReply &worker_reply, - std::function callback); + /// Do operations that are needed after spilling objects such as + /// 1. Unpin the pending spilling object. + /// 2. Update the spilled URL to the owner. + /// 3. Update the spilled URL to the local directory if it doesn't + /// use the external storages like S3. + void OnObjectSpilled(const std::vector &object_ids, + const rpc::SpillObjectsReply &worker_reply); /// Delete spilled objects stored in given urls. /// diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index c5cc84511..e791a992c 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -210,7 +210,7 @@ NodeManager::NodeManager(instrumented_io_context &io_service, const NodeID &self }, /*get_spilled_object_url=*/ [this](const ObjectID &object_id) { - return GetLocalObjectManager().GetSpilledObjectURL(object_id); + return GetLocalObjectManager().GetLocalSpilledObjectURL(object_id); }, /*spill_objects_callback=*/ [this]() { diff --git a/src/ray/raylet/test/local_object_manager_test.cc b/src/ray/raylet/test/local_object_manager_test.cc index 7f27aa614..0fba68bd9 100644 --- a/src/ray/raylet/test/local_object_manager_test.cc +++ b/src/ray/raylet/test/local_object_manager_test.cc @@ -404,14 +404,15 @@ TEST_F(LocalObjectManagerTest, TestRestoreSpilledObject) { manager.SpillObjects(object_ids, [&](const Status &status) mutable { ASSERT_TRUE(status.ok()); }); + std::vector urls; for (size_t i = 0; i < object_ids.size(); i++) { - ASSERT_TRUE(manager.GetSpilledObjectURL(object_ids[i]).empty()); + ASSERT_TRUE(manager.GetLocalSpilledObjectURL(object_ids[i]).empty()); urls.push_back(BuildURL("url" + std::to_string(i))); } ASSERT_TRUE(worker_pool.io_worker_client->ReplySpillObjects(urls)); for (size_t i = 0; i < object_ids.size(); i++) { - ASSERT_FALSE(manager.GetSpilledObjectURL(object_ids[i]).empty()); + ASSERT_FALSE(manager.GetLocalSpilledObjectURL(object_ids[i]).empty()); } for (size_t i = 0; i < object_ids.size(); i++) { ASSERT_TRUE(owner_client->ReplyAddSpilledUrl()); @@ -821,9 +822,9 @@ TEST_F(LocalObjectManagerTest, TestPartialSpillError) { for (size_t i = 0; i < free_objects_batch_size; i++) { if (i < urls.size()) { - ASSERT_EQ(urls[i], manager.GetSpilledObjectURL(object_ids[i])); + ASSERT_EQ(urls[i], manager.GetLocalSpilledObjectURL(object_ids[i])); } else { - ASSERT_TRUE(manager.GetSpilledObjectURL(object_ids[i]).empty()); + ASSERT_TRUE(manager.GetLocalSpilledObjectURL(object_ids[i]).empty()); } } } @@ -972,9 +973,10 @@ TEST_F(LocalObjectManagerTest, TestDeleteSpillingObjectsBlocking) { owner_address.set_worker_id(WorkerID::FromRandom().Binary()); std::vector object_ids; std::vector> objects; + size_t spilled_urls_size = 2; // Objects are pinned. - for (size_t i = 0; i < free_objects_batch_size; i++) { + for (size_t i = 0; i < spilled_urls_size; i++) { ObjectID object_id = ObjectID::FromRandom(); object_ids.push_back(object_id); auto data_buffer = std::make_shared(0, object_id, unpins); @@ -986,48 +988,58 @@ TEST_F(LocalObjectManagerTest, TestDeleteSpillingObjectsBlocking) { manager.WaitForObjectFree(owner_address, object_ids); // Objects are spilled. - std::vector object_ids_to_spill; - int spilled_urls_size = free_objects_batch_size; - for (int i = 0; i < spilled_urls_size; i++) { - object_ids_to_spill.push_back(object_ids[i]); + std::vector spill_set_1; + std::vector spill_set_2; + size_t spill_set_1_size = spilled_urls_size / 2; + size_t spill_set_2_size = spilled_urls_size - spill_set_1_size; + + for (size_t i = 0; i < spill_set_1_size; i++) { + spill_set_1.push_back(object_ids[i]); } - manager.SpillObjects(object_ids_to_spill, + for (size_t i = spill_set_1_size; i < spilled_urls_size; i++) { + spill_set_2.push_back(object_ids[i]); + } + manager.SpillObjects(spill_set_1, + [&](const Status &status) mutable { ASSERT_TRUE(status.ok()); }); + manager.SpillObjects(spill_set_2, [&](const Status &status) mutable { ASSERT_TRUE(status.ok()); }); - std::vector urls; - // Only 1 object's spilling is done. Everything else is still spilling. - for (size_t i = 0; i < object_ids_to_spill.size(); i++) { - urls.push_back(BuildURL("url" + std::to_string(i))); + std::vector urls_spill_set_1; + std::vector urls_spill_set_2; + for (size_t i = 0; i < spill_set_1_size; i++) { + urls_spill_set_1.push_back(BuildURL("url" + std::to_string(i))); } - ASSERT_TRUE(worker_pool.io_worker_client->ReplySpillObjects(urls)); - for (size_t i = 0; i < 1; i++) { + for (size_t i = spill_set_1_size; i < spilled_urls_size; i++) { + urls_spill_set_2.push_back(BuildURL("url" + std::to_string(i))); + } + + // Spillset 1 objects are spilled. + ASSERT_TRUE(worker_pool.io_worker_client->ReplySpillObjects(urls_spill_set_1)); + for (size_t i = 0; i < spill_set_1_size; i++) { ASSERT_TRUE(owner_client->ReplyAddSpilledUrl()); } // Every object has gone out of scope. - for (size_t i = 0; i < free_objects_batch_size; i++) { + for (size_t i = 0; i < spilled_urls_size; i++) { EXPECT_CALL(*subscriber_, Unsubscribe(_, _, object_ids[i].Binary())); ASSERT_TRUE(subscriber_->PublishObjectEviction()); } - // // Now, deletion queue would process only the first object. Everything else won't be + // Now, deletion queue would process only the first spill set. Everything else won't be // deleted although it is out of scope because they are still spilling. manager.ProcessSpilledObjectsDeleteQueue(/* max_batch_size */ 30); int deleted_urls_size = worker_pool.io_worker_client->ReplyDeleteSpilledObjects(); // Only the first entry that is already spilled will be deleted. - ASSERT_EQ(deleted_urls_size, 1); + ASSERT_EQ(deleted_urls_size, spill_set_1_size); // Now spilling is completely done. - std::vector new_urls; - for (size_t i = 1; i < object_ids_to_spill.size(); i++) { - new_urls.push_back(BuildURL("url" + std::to_string(i))); - } - for (size_t i = 1; i < object_ids_to_spill.size(); i++) { + ASSERT_TRUE(worker_pool.io_worker_client->ReplySpillObjects(urls_spill_set_2)); + for (size_t i = 0; i < spill_set_2_size; i++) { ASSERT_TRUE(owner_client->ReplyAddSpilledUrl()); } // Every object is now deleted. manager.ProcessSpilledObjectsDeleteQueue(/* max_batch_size */ 30); deleted_urls_size = worker_pool.io_worker_client->ReplyDeleteSpilledObjects(); - ASSERT_EQ(deleted_urls_size, object_ids_to_spill.size() - 1); + ASSERT_EQ(deleted_urls_size, spill_set_2_size); } TEST_F(LocalObjectManagerTest, TestDeleteMaxObjects) {