From d240d2652500d232266727756ab06aa57b2981fa Mon Sep 17 00:00:00 2001 From: SangBin Cho Date: Tue, 31 Aug 2021 10:10:28 -0700 Subject: [PATCH] [Object Spilling] Fix a bug where object url is empty. (#18193) * Fix a bug * Addressed code review. * Fix a test --- src/ray/object_manager/pull_manager.cc | 20 ++-- .../object_manager/test/pull_manager_test.cc | 105 ++++++++++++++++++ 2 files changed, 116 insertions(+), 9 deletions(-) diff --git a/src/ray/object_manager/pull_manager.cc b/src/ray/object_manager/pull_manager.cc index 707801f99..d5290f724 100644 --- a/src/ray/object_manager/pull_manager.cc +++ b/src/ray/object_manager/pull_manager.cc @@ -460,16 +460,18 @@ void PullManager::TryToMakeObjectLocal(const ObjectID &object_id) { return; } - // If we can restore directly from this raylet, then try to do so. - std::string spilled_url = get_locally_spilled_object_url_(object_id); - bool can_restore_directly = - !spilled_url.empty() || // If the object is spilled locally - (!request.spilled_url.empty() && - request.spilled_node_id - .IsNil()); // Or if the object is spilled on external storages. - if (can_restore_directly) { + // check if we can restore the object directly in the current raylet. 
+ // first check local spilled objects + std::string direct_restore_url = get_locally_spilled_object_url_(object_id); + if (direct_restore_url.empty()) { + if (!request.spilled_url.empty() && request.spilled_node_id.IsNil()) { + direct_restore_url = request.spilled_url; + } + } + if (!direct_restore_url.empty()) { + // Select a URL from the object directory update UpdateRetryTimer(request, object_id); - restore_spilled_object_(object_id, request.spilled_url, + restore_spilled_object_(object_id, direct_restore_url, [object_id](const ray::Status &status) { if (!status.ok()) { RAY_LOG(ERROR) << "Object restore for " << object_id diff --git a/src/ray/object_manager/test/pull_manager_test.cc b/src/ray/object_manager/test/pull_manager_test.cc index b3610350b..664ccdc20 100644 --- a/src/ray/object_manager/test/pull_manager_test.cc +++ b/src/ray/object_manager/test/pull_manager_test.cc @@ -289,6 +289,111 @@ TEST_P(PullManagerTest, TestRestoreSpilledObjectLocal) { AssertNoLeaks(); } +TEST_P(PullManagerTest, TestRestoreSpilledObjectOnLocalStorage) { + /// Test the scenario where the object is spilled to local storage, like filesystems. + auto prio = BundlePriority::TASK_ARGS; + if (GetParam()) { + prio = BundlePriority::GET_REQUEST; + } + auto refs = CreateObjectRefs(1); + auto obj1 = ObjectRefsToIds(refs)[0]; + rpc::Address addr1; + AssertNumActiveRequestsEquals(0); + std::vector objects_to_locate; + auto req_id = pull_manager_.Pull(refs, prio, &objects_to_locate); + ASSERT_EQ(ObjectRefsToIds(objects_to_locate), ObjectRefsToIds(refs)); + + std::unordered_set client_ids; + pull_manager_.OnLocationChange(obj1, client_ids, "", NodeID::Nil(), 0); + + // client_ids is empty here, so there's nowhere to pull from. + ASSERT_EQ(num_send_pull_request_calls_, 0); + ASSERT_EQ(num_restore_spilled_object_calls_, 0); + + fake_time_ += 10.; + // Objects are spilled locally, but the remote object directory doesn't have the + // information. It should still restore objects. 
+ ObjectSpilled(obj1, "remote_url/foo/bar"); + pull_manager_.OnLocationChange(obj1, client_ids, "", self_node_id_, 0); + + // We request a local restore. + ASSERT_EQ(num_send_pull_request_calls_, 0); + ASSERT_EQ(num_restore_spilled_object_calls_, 1); + + // The call can be retried after a delay, and the url in the remote object directory is + // updated now. + fake_time_ += 10.; + pull_manager_.OnLocationChange(obj1, client_ids, "remote_url/foo/bar", self_node_id_, + 0); + ASSERT_EQ(num_send_pull_request_calls_, 0); + ASSERT_EQ(num_restore_spilled_object_calls_, 2); + + ASSERT_TRUE(num_abort_calls_.empty()); + ASSERT_TRUE(pull_manager_.PullRequestActiveOrWaitingForMetadata(req_id)); + auto objects_to_cancel = pull_manager_.CancelPull(req_id); + ASSERT_EQ(objects_to_cancel, ObjectRefsToIds(refs)); + ASSERT_EQ(num_abort_calls_[obj1], 1); + + AssertNoLeaks(); +} + +TEST_P(PullManagerTest, TestRestoreSpilledObjectOnExternalStorage) { + /// Test the scenario where the object is spilled to external storage, such as S3. + auto prio = BundlePriority::TASK_ARGS; + if (GetParam()) { + prio = BundlePriority::GET_REQUEST; + } + auto refs = CreateObjectRefs(1); + auto obj1 = ObjectRefsToIds(refs)[0]; + rpc::Address addr1; + AssertNumActiveRequestsEquals(0); + std::vector objects_to_locate; + auto req_id = pull_manager_.Pull(refs, prio, &objects_to_locate); + ASSERT_EQ(ObjectRefsToIds(objects_to_locate), ObjectRefsToIds(refs)); + + std::unordered_set client_ids; + pull_manager_.OnLocationChange(obj1, client_ids, "", NodeID::Nil(), 0); + + // client_ids is empty here, so there's nowhere to pull from. + ASSERT_EQ(num_send_pull_request_calls_, 0); + ASSERT_EQ(num_restore_spilled_object_calls_, 0); + + fake_time_ += 10.; + // The locally recorded spilled URL is empty when the object is spilled to external storage. + ObjectSpilled(obj1, ""); + // If objects are spilled to external storage, the node id should be Nil(). + // So this shouldn't invoke restoration. 
+ pull_manager_.OnLocationChange(obj1, client_ids, "remote_url/foo/bar", self_node_id_, + 0); + + // No restore is requested because the spilled node id is not Nil. + ASSERT_EQ(num_send_pull_request_calls_, 0); + ASSERT_EQ(num_restore_spilled_object_calls_, 0); + + // Now the spilled node id is updated to Nil. + pull_manager_.OnLocationChange(obj1, client_ids, "remote_url/foo/bar", NodeID::Nil(), + 0); + + // We request a local restore. + ASSERT_EQ(num_send_pull_request_calls_, 0); + ASSERT_EQ(num_restore_spilled_object_calls_, 1); + + // The call can be retried after a delay. + fake_time_ += 10.; + pull_manager_.OnLocationChange(obj1, client_ids, "remote_url/foo/bar", NodeID::Nil(), + 0); + ASSERT_EQ(num_send_pull_request_calls_, 0); + ASSERT_EQ(num_restore_spilled_object_calls_, 2); + + ASSERT_TRUE(num_abort_calls_.empty()); + ASSERT_TRUE(pull_manager_.PullRequestActiveOrWaitingForMetadata(req_id)); + auto objects_to_cancel = pull_manager_.CancelPull(req_id); + ASSERT_EQ(objects_to_cancel, ObjectRefsToIds(refs)); + ASSERT_EQ(num_abort_calls_[obj1], 1); + + AssertNoLeaks(); +} + TEST_P(PullManagerTest, TestLoadBalancingRestorationRequest) { /* Make sure when the object copy is in other raylet, we pull object from there instead * of requesting the owner node to restore the object. */