diff --git a/python/ray/tests/test_placement_group.py b/python/ray/tests/test_placement_group.py
index fd292f124..348e5e9cc 100644
--- a/python/ray/tests/test_placement_group.py
+++ b/python/ray/tests/test_placement_group.py
@@ -1178,7 +1178,7 @@ def test_create_placement_group_after_gcs_server_restarts(
 
     # Create placement group 1 successfully.
     placement_group1 = ray.util.placement_group([{"CPU": 1}, {"CPU": 1}])
-    ray.get(placement_group1.ready(), timeout=2)
+    ray.get(placement_group1.ready(), timeout=10)
     table = ray.util.placement_group_table(placement_group1)
     assert table["state"] == "CREATED"
 
@@ -1188,7 +1188,7 @@ def test_create_placement_group_after_gcs_server_restarts(
 
     # Create placement group 2 successfully.
    placement_group2 = ray.util.placement_group([{"CPU": 1}, {"CPU": 1}])
-    ray.get(placement_group2.ready(), timeout=2)
+    ray.get(placement_group2.ready(), timeout=10)
     table = ray.util.placement_group_table(placement_group2)
     assert table["state"] == "CREATED"
 
@@ -1253,7 +1253,7 @@ def test_create_placement_group_during_gcs_server_restart(
     cluster.head_node.start_gcs_server()
 
     for i in range(0, 10):
-        ray.get(placement_groups[i].ready(), timeout=2)
+        ray.get(placement_groups[i].ready())
 
 
 if __name__ == "__main__":
diff --git a/src/ray/gcs/gcs_server/gcs_placement_group_manager.cc b/src/ray/gcs/gcs_server/gcs_placement_group_manager.cc
index 74d95e82a..1028b6337 100644
--- a/src/ray/gcs/gcs_server/gcs_placement_group_manager.cc
+++ b/src/ray/gcs/gcs_server/gcs_placement_group_manager.cc
@@ -470,6 +470,7 @@ void GcsPlacementGroupManager::LoadInitialData(const EmptyCallback &done) {
   auto callback = [this, done](
                       const std::unordered_map<PlacementGroupID,
                                                rpc::PlacementGroupTableData> &result) {
+    std::unordered_map<NodeID, std::vector<rpc::Bundle>> node_to_bundles;
     for (auto &item : result) {
       auto placement_group = std::make_shared<GcsPlacementGroup>(item.second);
       if (item.second.state() != rpc::PlacementGroupTableData::REMOVED) {
@@ -479,9 +480,20 @@ void GcsPlacementGroupManager::LoadInitialData(const EmptyCallback &done) {
             item.second.state() == rpc::PlacementGroupTableData::RESCHEDULING) {
           pending_placement_groups_.emplace_back(std::move(placement_group));
         }
+
+        if (item.second.state() == rpc::PlacementGroupTableData::CREATED ||
+            item.second.state() == rpc::PlacementGroupTableData::RESCHEDULING) {
+          const auto &bundles = item.second.bundles();
+          for (auto &bundle : bundles) {
+            node_to_bundles[NodeID::FromBinary(bundle.node_id())].emplace_back(bundle);
+          }
+        }
       }
     }
+    // Notify raylets to release unused bundles.
+    gcs_placement_group_scheduler_->ReleaseUnusedBundles(node_to_bundles);
+
     SchedulePendingPlacementGroups();
     RAY_LOG(INFO) << "Finished loading initial data.";
     done();
diff --git a/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.cc b/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.cc
index 58e12d213..5dc85a047 100644
--- a/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.cc
+++ b/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.cc
@@ -204,6 +204,17 @@ void GcsPlacementGroupScheduler::ScheduleUnplacedBundles(
     std::shared_ptr<GcsPlacementGroup> placement_group,
     std::function<void(std::shared_ptr<GcsPlacementGroup>)> failure_callback,
     std::function<void(std::shared_ptr<GcsPlacementGroup>)> success_callback) {
+  // We need to ensure that no PrepareBundleResources request is sent out before the
+  // replies to ReleaseUnusedBundles have been returned.
+  if (!nodes_of_releasing_unused_bundles_.empty()) {
+    RAY_LOG(INFO) << "Failed to schedule placement group " << placement_group->GetName()
+                  << ", id: " << placement_group->GetPlacementGroupID() << ", because "
+                  << nodes_of_releasing_unused_bundles_.size()
+                  << " nodes have not released unused bundles.";
+    failure_callback(placement_group);
+    return;
+  }
+
   auto bundles = placement_group->GetUnplacedBundles();
   auto strategy = placement_group->GetStrategy();
@@ -556,6 +567,34 @@ GcsPlacementGroupScheduler::GetBundlesOnNode(const NodeID &node_id) {
   return bundles_on_node;
 }
 
+void GcsPlacementGroupScheduler::ReleaseUnusedBundles(
+    const std::unordered_map<NodeID, std::vector<rpc::Bundle>> &node_to_bundles) {
+  // The purpose of this function is to release bundles that may have leaked.
+  // When the GCS restarts, it does not know which bundles it scheduled in its
+  // previous lifecycle, so it sends each raylet the list of bundles that are
+  // still in use and the raylet releases all other bundles. If a node is dead,
+  // there is no need to send it a release request.
+  const auto &alive_nodes = gcs_node_manager_.GetAllAliveNodes();
+  for (const auto &alive_node : alive_nodes) {
+    const auto &node_id = alive_node.first;
+    nodes_of_releasing_unused_bundles_.insert(node_id);
+
+    auto lease_client = GetLeaseClientFromNode(alive_node.second);
+    auto release_unused_bundles_callback =
+        [this, node_id](const Status &status,
+                        const rpc::ReleaseUnusedBundlesReply &reply) {
+          nodes_of_releasing_unused_bundles_.erase(node_id);
+        };
+    auto iter = node_to_bundles.find(alive_node.first);
+
+    // After a GCS restart, some nodes may not have any bundles in use.
+    // In this case, GCS sends an empty list.
+    auto bundles_in_use =
+        iter != node_to_bundles.end() ? iter->second : std::vector<rpc::Bundle>{};
+    lease_client->ReleaseUnusedBundles(bundles_in_use, release_unused_bundles_callback);
+  }
+}
+
 void BundleLocationIndex::AddBundleLocations(
     const PlacementGroupID &placement_group_id,
     std::shared_ptr<BundleLocations> bundle_locations) {
diff --git a/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.h b/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.h
index 5baadc6d4..7ccabdc12 100644
--- a/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.h
+++ b/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.h
@@ -74,6 +74,12 @@ class GcsPlacementGroupSchedulerInterface {
   /// \param placement_group_id The placement group id scheduling is in progress.
   virtual void MarkScheduleCancelled(const PlacementGroupID &placement_group_id) = 0;
 
+  /// Notify raylets to release unused bundles.
+  ///
+  /// \param node_to_bundles Bundles used by each node.
+  virtual void ReleaseUnusedBundles(
+      const std::unordered_map<NodeID, std::vector<rpc::Bundle>> &node_to_bundles) = 0;
+
   virtual ~GcsPlacementGroupSchedulerInterface() {}
 };
@@ -385,6 +391,12 @@ class GcsPlacementGroupScheduler : public GcsPlacementGroupSchedulerInterface {
   absl::flat_hash_map<PlacementGroupID, std::vector<int64_t>> GetBundlesOnNode(
       const NodeID &node_id) override;
 
+  /// Notify raylets to release unused bundles.
+  ///
+  /// \param node_to_bundles Bundles used by each node.
+  void ReleaseUnusedBundles(const std::unordered_map<NodeID, std::vector<rpc::Bundle>>
+                                &node_to_bundles) override;
+
  protected:
   /// Send a bundle PREPARE request to a node. The PREPARE request will lock resources
   /// on a node until COMMIT or CANCEL requests are sent to a node.
@@ -477,6 +489,9 @@ class GcsPlacementGroupScheduler : public GcsPlacementGroupSchedulerInterface {
   /// Set of placement group that have lease requests in flight to nodes.
   absl::flat_hash_map<PlacementGroupID, std::shared_ptr<LeaseStatusTracker>>
       placement_group_leasing_in_progress_;
+
+  /// The nodes which are releasing unused bundles.
+  absl::flat_hash_set<NodeID> nodes_of_releasing_unused_bundles_;
 };
 
 }  // namespace gcs
diff --git a/src/ray/gcs/gcs_server/test/gcs_placement_group_manager_test.cc b/src/ray/gcs/gcs_server/test/gcs_placement_group_manager_test.cc
index af86eab3d..9f6050ace 100644
--- a/src/ray/gcs/gcs_server/test/gcs_placement_group_manager_test.cc
+++ b/src/ray/gcs/gcs_server/test/gcs_placement_group_manager_test.cc
@@ -40,6 +40,10 @@ class MockPlacementGroupScheduler : public gcs::GcsPlacementGroupSchedulerInterface {
   MOCK_METHOD1(MarkScheduleCancelled, void(const PlacementGroupID &placement_group_id));
 
+  MOCK_METHOD1(
+      ReleaseUnusedBundles,
+      void(const std::unordered_map<NodeID, std::vector<rpc::Bundle>> &node_to_bundles));
+
   absl::flat_hash_map<PlacementGroupID, std::vector<int64_t>> GetBundlesOnNode(
       const NodeID &node_id) override {
     absl::flat_hash_map<PlacementGroupID, std::vector<int64_t>> bundles;
diff --git a/src/ray/gcs/gcs_server/test/gcs_placement_group_scheduler_test.cc b/src/ray/gcs/gcs_server/test/gcs_placement_group_scheduler_test.cc
index 5e6fb352b..021ccdef4 100644
--- a/src/ray/gcs/gcs_server/test/gcs_placement_group_scheduler_test.cc
+++ b/src/ray/gcs/gcs_server/test/gcs_placement_group_scheduler_test.cc
@@ -1014,6 +1014,13 @@ TEST_F(GcsPlacementGroupSchedulerTest, TestPGCancelledDuringReschedulingCommitPr
   WaitPlacementGroupPendingDone(1, GcsPlacementGroupStatus::FAILURE);
 }
 
+TEST_F(GcsPlacementGroupSchedulerTest, TestReleaseUnusedBundles) {
+  SchedulePlacementGroupSuccessTest(rpc::PlacementStrategy::SPREAD);
+  std::unordered_map<NodeID, std::vector<rpc::Bundle>> node_to_bundle;
+  scheduler_->ReleaseUnusedBundles(node_to_bundle);
+  ASSERT_EQ(1, raylet_clients_[0]->num_release_unused_bundles_requested);
+}
+
 }  // namespace ray
 
 int main(int argc, char **argv) {
diff --git a/src/ray/gcs/gcs_server/test/gcs_server_test_util.h b/src/ray/gcs/gcs_server/test/gcs_server_test_util.h
index c6134e08e..24b71c4ee 100644
--- a/src/ray/gcs/gcs_server/test/gcs_server_test_util.h
+++ b/src/ray/gcs/gcs_server/test/gcs_server_test_util.h
@@ -183,6 +183,12 @@ struct GcsServerMocker {
      return_callbacks.push_back(callback);
    }
 
+    void ReleaseUnusedBundles(
+        const std::vector<rpc::Bundle> &bundles_in_use,
+        const rpc::ClientCallback<rpc::ReleaseUnusedBundlesReply> &callback) override {
+      ++num_release_unused_bundles_requested;
+    }
+
     // Trigger reply to PrepareBundleResources.
     bool GrantPrepareBundleResources(bool success = true) {
       Status status = Status::OK();
@@ -231,6 +237,7 @@ struct GcsServerMocker {
     int num_lease_requested = 0;
     int num_return_requested = 0;
     int num_commit_requested = 0;
+    int num_release_unused_bundles_requested = 0;
     NodeID node_id = NodeID::FromRandom();
     std::list<rpc::ClientCallback<rpc::RequestWorkerLeaseReply>> lease_callbacks = {};
     std::list<rpc::ClientCallback<rpc::CommitBundleResourcesReply>> commit_callbacks = {};
diff --git a/src/ray/protobuf/node_manager.proto b/src/ray/protobuf/node_manager.proto
index 887787cc4..087135e39 100644
--- a/src/ray/protobuf/node_manager.proto
+++ b/src/ray/protobuf/node_manager.proto
@@ -148,6 +148,13 @@ message RequestObjectSpillageReply {
   bool success = 1;
 }
 
+message ReleaseUnusedBundlesRequest {
+  repeated Bundle bundles_in_use = 1;
+}
+
+message ReleaseUnusedBundlesReply {
+}
+
 // Service for inter-node-manager communication.
 service NodeManagerService {
   // Request a worker from the raylet.
@@ -186,4 +193,10 @@ service NodeManagerService {
   // Ask the raylet to spill an object to external storage.
   rpc RequestObjectSpillage(RequestObjectSpillageRequest)
       returns (RequestObjectSpillageReply);
+  // This method is only used by the GCS, and its purpose is to release bundles
+  // that may have leaked.
+  // When the GCS restarts, it does not know which bundles it leased previously,
+  // so it sends the list of bundles still in use and the raylet releases the rest.
+  rpc ReleaseUnusedBundles(ReleaseUnusedBundlesRequest)
+      returns (ReleaseUnusedBundlesReply);
 }
diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc
index a8bf79699..60705cef6 100644
--- a/src/ray/raylet/node_manager.cc
+++ b/src/ray/raylet/node_manager.cc
@@ -558,6 +558,36 @@ void NodeManager::HandleRequestObjectSpillage(
       });
 }
 
+void NodeManager::HandleReleaseUnusedBundles(
+    const rpc::ReleaseUnusedBundlesRequest &request,
+    rpc::ReleaseUnusedBundlesReply *reply, rpc::SendReplyCallback send_reply_callback) {
+  RAY_CHECK(!new_scheduler_enabled_) << "Not implemented";
+  RAY_LOG(DEBUG) << "Releasing unused bundles.";
+  std::unordered_set<BundleID, pair_hash> in_use_bundles;
+  for (int index = 0; index < request.bundles_in_use_size(); ++index) {
+    const auto &bundle_id = request.bundles_in_use(index).bundle_id();
+    in_use_bundles.emplace(
+        std::make_pair(PlacementGroupID::FromBinary(bundle_id.placement_group_id()),
+                       bundle_id.bundle_index()));
+  }
+
+  // TODO(ffbin): Kill all workers that are currently associated with the unused
+  // bundles. At present, a worker does not carry a bundle ID, so we cannot identify
+  // the workers that belong to unused bundles. This will be addressed in a follow-up PR.
+
+  // Return unused bundle resources.
+  for (auto iter = bundle_spec_map_.begin(); iter != bundle_spec_map_.end();) {
+    if (0 == in_use_bundles.count(iter->first)) {
+      ReturnBundleResources(*iter->second);
+      bundle_spec_map_.erase(iter++);
+    } else {
+      iter++;
+    }
+  }
+
+  send_reply_callback(Status::OK(), nullptr, nullptr);
+}
+
 // TODO(edoakes): this function is problematic because it both sends warnings spuriously
 // under normal conditions and sometimes doesn't send a warning under actual deadlock
 // conditions. The current logic is to push a warning when: all running tasks are
@@ -1986,6 +2016,8 @@ bool NodeManager::PrepareBundle(
     // Register bundle state.
     bundle_state->state = CommitState::PREPARED;
     bundle_state_map_.emplace(bundle_id, bundle_state);
+    bundle_spec_map_.emplace(
+        bundle_id, std::make_shared<BundleSpecification>(bundle_spec.GetMessage()));
   }
   return bundle_state->acquired_resources.AvailableResources().size() > 0;
 }
diff --git a/src/ray/raylet/node_manager.h b/src/ray/raylet/node_manager.h
index d911155d2..6e7726528 100644
--- a/src/ray/raylet/node_manager.h
+++ b/src/ray/raylet/node_manager.h
@@ -645,6 +645,11 @@ class NodeManager : public rpc::NodeManagerServiceHandler {
                                    rpc::RequestObjectSpillageReply *reply,
                                    rpc::SendReplyCallback send_reply_callback) override;
 
+  /// Handle a `ReleaseUnusedBundles` request.
+  void HandleReleaseUnusedBundles(const rpc::ReleaseUnusedBundlesRequest &request,
+                                  rpc::ReleaseUnusedBundlesReply *reply,
+                                  rpc::SendReplyCallback send_reply_callback) override;
+
   /// Trigger global GC across the cluster to free up references to actors or
   /// object ids.
   void TriggerGlobalGC();
@@ -816,6 +821,10 @@ class NodeManager : public rpc::NodeManagerServiceHandler {
   /// creation.
   absl::flat_hash_map<BundleID, std::shared_ptr<BundleState>, pair_hash>
       bundle_state_map_;
+
+  /// Save `BundleSpecification` for cleaning leaked bundles after GCS restart.
+  absl::flat_hash_map<BundleID, std::shared_ptr<BundleSpecification>, pair_hash>
+      bundle_spec_map_;
 };
 
 }  // namespace raylet
diff --git a/src/ray/raylet_client/raylet_client.cc b/src/ray/raylet_client/raylet_client.cc
index 77c4aff93..e541ee413 100644
--- a/src/ray/raylet_client/raylet_client.cc
+++ b/src/ray/raylet_client/raylet_client.cc
@@ -402,6 +402,25 @@ void raylet::RayletClient::CancelResourceReserve(
   grpc_client_->CancelResourceReserve(request, callback);
 }
 
+void raylet::RayletClient::ReleaseUnusedBundles(
+    const std::vector<rpc::Bundle> &bundles_in_use,
+    const rpc::ClientCallback<rpc::ReleaseUnusedBundlesReply> &callback) {
+  rpc::ReleaseUnusedBundlesRequest request;
+  for (auto &bundle : bundles_in_use) {
+    request.add_bundles_in_use()->CopyFrom(bundle);
+  }
+  grpc_client_->ReleaseUnusedBundles(
+      request,
+      [callback](const Status &status, const rpc::ReleaseUnusedBundlesReply &reply) {
+        if (!status.ok()) {
+          RAY_LOG(WARNING)
+              << "Error releasing bundles from raylet, the raylet may have died: "
+              << status;
+        }
+        callback(status, reply);
+      });
+}
+
 void raylet::RayletClient::PinObjectIDs(
     const rpc::Address &caller_address, const std::vector<ObjectID> &object_ids,
     const rpc::ClientCallback<rpc::PinObjectIDsReply> &callback) {
diff --git a/src/ray/raylet_client/raylet_client.h b/src/ray/raylet_client/raylet_client.h
index 936f6a3c1..8799366a6 100644
--- a/src/ray/raylet_client/raylet_client.h
+++ b/src/ray/raylet_client/raylet_client.h
@@ -118,6 +118,10 @@ class ResourceReserveInterface {
       BundleSpecification &bundle_spec,
      const ray::rpc::ClientCallback<ray::rpc::CancelResourceReserveReply> &callback) = 0;
 
+  virtual void ReleaseUnusedBundles(
+      const std::vector<rpc::Bundle> &bundles_in_use,
+      const rpc::ClientCallback<rpc::ReleaseUnusedBundlesReply> &callback) = 0;
+
   virtual ~ResourceReserveInterface(){};
 };
@@ -380,6 +384,11 @@ class RayletClient : public PinObjectsInterface,
       const ray::rpc::ClientCallback<ray::rpc::CancelResourceReserveReply> &callback) override;
 
+  /// Implements ReleaseUnusedBundlesInterface.
+  void ReleaseUnusedBundles(
+      const std::vector<rpc::Bundle> &bundles_in_use,
+      const rpc::ClientCallback<rpc::ReleaseUnusedBundlesReply> &callback) override;
+
   void PinObjectIDs(
       const rpc::Address &caller_address, const std::vector<ObjectID> &object_ids,
       const ray::rpc::ClientCallback<rpc::PinObjectIDsReply> &callback) override;
diff --git a/src/ray/rpc/node_manager/node_manager_client.h b/src/ray/rpc/node_manager/node_manager_client.h
index e82c45b7e..1c9b16c18 100644
--- a/src/ray/rpc/node_manager/node_manager_client.h
+++ b/src/ray/rpc/node_manager/node_manager_client.h
@@ -100,6 +100,9 @@ class NodeManagerWorkerClient
   /// Ask the raylet to spill an object to external storage.
   VOID_RPC_CLIENT_METHOD(NodeManagerService, RequestObjectSpillage, grpc_client_, )
 
+  /// Release unused bundles.
+  VOID_RPC_CLIENT_METHOD(NodeManagerService, ReleaseUnusedBundles, grpc_client_, )
+
  private:
   /// Constructor.
   ///
diff --git a/src/ray/rpc/node_manager/node_manager_server.h b/src/ray/rpc/node_manager/node_manager_server.h
index 2db6706eb..08893d49f 100644
--- a/src/ray/rpc/node_manager/node_manager_server.h
+++ b/src/ray/rpc/node_manager/node_manager_server.h
@@ -35,7 +35,8 @@ namespace rpc {
   RPC_SERVICE_HANDLER(NodeManagerService, PrepareBundleResources)   \
   RPC_SERVICE_HANDLER(NodeManagerService, CommitBundleResources)    \
   RPC_SERVICE_HANDLER(NodeManagerService, CancelResourceReserve)    \
-  RPC_SERVICE_HANDLER(NodeManagerService, RequestObjectSpillage)
+  RPC_SERVICE_HANDLER(NodeManagerService, RequestObjectSpillage)    \
+  RPC_SERVICE_HANDLER(NodeManagerService, ReleaseUnusedBundles)
 
 /// Interface of the `NodeManagerService`, see `src/ray/protobuf/node_manager.proto`.
 class NodeManagerServiceHandler {
@@ -100,6 +101,10 @@ class NodeManagerServiceHandler {
   virtual void HandleRequestObjectSpillage(const RequestObjectSpillageRequest &request,
                                            RequestObjectSpillageReply *reply,
                                            SendReplyCallback send_reply_callback) = 0;
+
+  virtual void HandleReleaseUnusedBundles(const ReleaseUnusedBundlesRequest &request,
+                                          ReleaseUnusedBundlesReply *reply,
+                                          SendReplyCallback send_reply_callback) = 0;
 };
 
 /// The `GrpcService` for `NodeManagerService`.
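
Reviewer note: the snippet below is a minimal, self-contained sketch of the handshake this diff introduces, kept separate from the patch itself. On restart the GCS rebuilds a node-to-in-use-bundles map from the persisted CREATED/RESCHEDULING groups, asks each alive raylet to release everything else, and refuses to send new PREPARE requests until every node has replied. All names in it (BundleId, FakeRaylet, RestartCoordinator) are invented for illustration and are not Ray classes or APIs.

// Illustrative sketch only; compiles standalone, no Ray headers.
#include <cstdint>
#include <iostream>
#include <map>
#include <set>
#include <string>
#include <utility>
#include <vector>

// (placement group id, bundle index), analogous to BundleID in the patch.
using BundleId = std::pair<std::string, int64_t>;

// Stands in for a raylet that still holds resources for bundles it prepared
// before the GCS went down.
struct FakeRaylet {
  std::string node_id;
  std::set<BundleId> held_bundles;

  // Analogous to HandleReleaseUnusedBundles: keep only the in-use bundles.
  void ReleaseUnusedBundles(const std::set<BundleId> &in_use) {
    for (auto it = held_bundles.begin(); it != held_bundles.end();) {
      if (in_use.count(*it)) {
        ++it;
      } else {
        it = held_bundles.erase(it);  // Leaked bundle: return its resources.
      }
    }
  }
};

// Stands in for the GCS-side gate: scheduling stays blocked until every alive
// node has acknowledged the release request.
class RestartCoordinator {
 public:
  void NotifyRelease(std::vector<FakeRaylet> &nodes,
                     const std::map<std::string, std::set<BundleId>> &in_use_per_node) {
    for (auto &node : nodes) {
      releasing_.insert(node.node_id);
      auto it = in_use_per_node.find(node.node_id);
      // Nodes with no registered bundles simply receive an empty list.
      node.ReleaseUnusedBundles(it == in_use_per_node.end() ? std::set<BundleId>{}
                                                            : it->second);
      // In Ray this erase happens in the RPC reply callback; here we ack inline.
      releasing_.erase(node.node_id);
    }
  }

  // Mirrors the nodes_of_releasing_unused_bundles_.empty() check in
  // ScheduleUnplacedBundles.
  bool CanSchedule() const { return releasing_.empty(); }

 private:
  std::set<std::string> releasing_;
};

int main() {
  // node-1 holds two live bundles of pg-1 plus one bundle leaked by a group
  // that no longer exists in GCS storage.
  std::vector<FakeRaylet> nodes{
      {"node-1", {{"pg-1", 0}, {"pg-1", 1}, {"pg-leaked", 0}}}};

  // Rebuilt from persisted CREATED/RESCHEDULING placement groups on restart.
  std::map<std::string, std::set<BundleId>> in_use{
      {"node-1", {{"pg-1", 0}, {"pg-1", 1}}}};

  RestartCoordinator coordinator;
  coordinator.NotifyRelease(nodes, in_use);

  std::cout << "bundles left on node-1: " << nodes[0].held_bundles.size() << "\n";  // 2
  std::cout << "can schedule again: " << std::boolalpha << coordinator.CanSchedule()
            << "\n";  // true
  return 0;
}

Unlike this synchronous toy, the real flow is asynchronous: while nodes_of_releasing_unused_bundles_ is non-empty, ScheduleUnplacedBundles fails fast via the failure callback, and the group is expected to be scheduled again once the ReleaseUnusedBundles replies have drained.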