From a0c7907d8808a193411a24b7d1363116084aaa99 Mon Sep 17 00:00:00 2001
From: SangBin Cho
Date: Mon, 31 Aug 2020 15:54:34 -0700
Subject: [PATCH] [Placement Group] Leasing context refactoring part 2 (#10413)

* In progress.

* Refactoring done, but still failing tests.

* Fix issues.

* Addressed code review.

* Addressed code review.
---
 .../gcs_placement_group_scheduler.cc          | 145 +++++++++++++-----
 .../gcs_placement_group_scheduler.h           |  75 +++++++--
 .../gcs_placement_group_scheduler_test.cc     |   1 -
 3 files changed, 162 insertions(+), 59 deletions(-)

diff --git a/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.cc b/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.cc
index 20ed16cac..b836eed5a 100644
--- a/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.cc
+++ b/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.cc
@@ -214,43 +214,26 @@ void GcsPlacementGroupScheduler::ScheduleUnplacedBundles(
     return;
   }
 
-  // If schedule success, the decision will be set as schedule_map[bundles[pos]]
-  // else will be set ClientID::Nil().
-  auto bundle_locations = std::make_shared<BundleLocations>();
-  // To count how many scheduler have been return, which include success and failure.
-  auto finished_count = std::make_shared<size_t>();
-  RAY_CHECK(
-      placement_group_leasing_in_progress_.emplace(placement_group->GetPlacementGroupID())
-          .second);
+  auto lease_status_tracker =
+      std::make_shared<LeaseStatusTracker>(placement_group, bundles);
+  RAY_CHECK(placement_group_leasing_in_progress_
+                .emplace(placement_group->GetPlacementGroupID(), lease_status_tracker)
+                .second);
 
   /// TODO(AlisaWu): Change the strategy when reserve resource failed.
   for (auto &bundle : bundles) {
     const auto &bundle_id = bundle->BundleId();
     const auto &node_id = selected_nodes[bundle_id];
-    RAY_CHECK(node_to_bundles_when_leasing_[node_id].emplace(bundle_id).second);
-
+    lease_status_tracker->MarkLeaseStarted(node_id, bundle);
+    // TODO(sang): The callback might not be called at all if nodes are dead. We should
+    // handle this case properly.
     ReserveResourceFromNode(
         bundle, gcs_node_manager_.GetNode(node_id),
-        [this, bundle_id, bundle, bundles, node_id, placement_group, bundle_locations,
-         finished_count, failure_callback, success_callback](const Status &status) {
-          auto leasing_bundles = node_to_bundles_when_leasing_.find(node_id);
-          RAY_CHECK(leasing_bundles != node_to_bundles_when_leasing_.end());
-          auto bundle_iter = leasing_bundles->second.find(bundle->BundleId());
-          RAY_CHECK(bundle_iter != leasing_bundles->second.end());
-          // Remove the bundle from the leasing map as the reply is returned from the
-          // remote node.
-          leasing_bundles->second.erase(bundle_iter);
-          if (leasing_bundles->second.empty()) {
-            node_to_bundles_when_leasing_.erase(leasing_bundles);
-          }
-
-          if (status.ok()) {
-            (*bundle_locations)[bundle_id] = std::make_pair(node_id, bundle);
-          }
-
-          if (++(*finished_count) == bundles.size()) {
-            OnAllBundleSchedulingRequestReturned(placement_group, bundles,
-                                                 bundle_locations, failure_callback,
+        [this, bundle, node_id, lease_status_tracker, failure_callback,
+         success_callback](const Status &status) {
+          lease_status_tracker->MarkLeaseReturned(node_id, bundle, status);
+          if (lease_status_tracker->IsAllLeaseRequestReturned()) {
+            OnAllBundleSchedulingRequestReturned(lease_status_tracker, failure_callback,
                                                  success_callback);
           }
         });
@@ -281,7 +264,7 @@ void GcsPlacementGroupScheduler::MarkScheduleCancelled(
     const PlacementGroupID &placement_group_id) {
   auto it = placement_group_leasing_in_progress_.find(placement_group_id);
   RAY_CHECK(it != placement_group_leasing_in_progress_.end());
-  placement_group_leasing_in_progress_.erase(it);
+  it->second->MarkPlacementGroupScheduleCancelled();
 }
 
 void GcsPlacementGroupScheduler::ReserveResourceFromNode(
@@ -350,19 +333,21 @@ GcsPlacementGroupScheduler::GetOrConnectLeaseClient(const rpc::Address &raylet_a
 }
 
 void GcsPlacementGroupScheduler::OnAllBundleSchedulingRequestReturned(
-    const std::shared_ptr<GcsPlacementGroup> &placement_group,
-    const std::vector<std::shared_ptr<BundleSpecification>> &bundles,
-    const std::shared_ptr<BundleLocations> &bundle_locations,
+    const std::shared_ptr<LeaseStatusTracker> &lease_status_tracker,
     const std::function<void(std::shared_ptr<GcsPlacementGroup>)>
        &schedule_failure_handler,
    const std::function<void(std::shared_ptr<GcsPlacementGroup>)>
        &schedule_success_handler) {
+  RAY_CHECK(lease_status_tracker->IsAllLeaseRequestReturned())
+      << "This method can be called only after all bundle scheduling requests are "
+         "returned.";
+  const auto &placement_group = lease_status_tracker->GetPlacementGroup();
+  const auto &bundles = lease_status_tracker->GetUnplacedBundles();
+  const auto &bundle_locations = lease_status_tracker->GetBundleLocations();
   const auto &placement_group_id = placement_group->GetPlacementGroupID();
   bundle_location_index_.AddBundleLocations(placement_group_id, bundle_locations);
 
-  if (placement_group_leasing_in_progress_.find(placement_group_id) ==
-          placement_group_leasing_in_progress_.end() ||
-      bundle_locations->size() != bundles.size()) {
+  if (!lease_status_tracker->IsLeasingSucceed()) {
     // If the lease request has been already cancelled
     // or not every lease request succeeds.
     DestroyPlacementGroupBundleResourcesIfExists(placement_group_id);
@@ -388,12 +373,9 @@ void GcsPlacementGroupScheduler::OnAllBundleSchedulingRequestReturned(
         ->set_node_id(location.first.Binary());
     }
   }
-  // Erase leasing in progress placement group.
-  // This could've been removed if the leasing request is cancelled already.
   auto it = placement_group_leasing_in_progress_.find(placement_group_id);
-  if (it != placement_group_leasing_in_progress_.end()) {
-    placement_group_leasing_in_progress_.erase(it);
-  }
+  RAY_CHECK(it != placement_group_leasing_in_progress_.end());
+  placement_group_leasing_in_progress_.erase(it);
 }
 
 std::unique_ptr<ScheduleContext> GcsPlacementGroupScheduler::GetScheduleContext(
@@ -522,5 +504,84 @@ void BundleLocationIndex::AddNodes(
   }
 }
 
+LeaseStatusTracker::LeaseStatusTracker(
+    std::shared_ptr<GcsPlacementGroup> placement_group,
+    std::vector<std::shared_ptr<BundleSpecification>> &unplaced_bundles)
+    : placement_group_(placement_group), unplaced_bundles_(unplaced_bundles) {
+  bundle_locations_ = std::make_shared<BundleLocations>();
+}
+
+bool LeaseStatusTracker::MarkLeaseStarted(const ClientID &node_id,
+                                          std::shared_ptr<BundleSpecification> bundle) {
+  const auto &bundle_id = bundle->BundleId();
+  return node_to_bundles_when_leasing_[node_id].emplace(bundle_id).second;
+}
+
+void LeaseStatusTracker::MarkLeaseReturned(
+    const ClientID &node_id, const std::shared_ptr<BundleSpecification> bundle,
+    const Status &status) {
+  RAY_CHECK(returned_count_ <= unplaced_bundles_.size());
+  auto leasing_bundles = node_to_bundles_when_leasing_.find(node_id);
+  RAY_CHECK(leasing_bundles != node_to_bundles_when_leasing_.end());
+  auto bundle_iter = leasing_bundles->second.find(bundle->BundleId());
+  RAY_CHECK(bundle_iter != leasing_bundles->second.end());
+
+  // Remove the bundle from the leasing map as the reply is returned from the
+  // remote node.
+  leasing_bundles->second.erase(bundle_iter);
+  if (leasing_bundles->second.empty()) {
+    node_to_bundles_when_leasing_.erase(leasing_bundles);
+  }
+
+  // If the request succeeds, record it.
+  const auto &bundle_id = bundle->BundleId();
+  if (status.ok()) {
+    bundle_locations_->emplace(bundle_id, std::make_pair(node_id, bundle));
+  }
+  returned_count_ += 1;
+  // If every bundle lease request is returned, mark it as all returned.
+  if (IsAllLeaseRequestReturned()) {
+    UpdateLeasingState(LeasingState::ALL_RETURNED);
+  }
+}
+
+bool LeaseStatusTracker::IsAllLeaseRequestReturned() const {
+  return returned_count_ == unplaced_bundles_.size();
+}
+
+bool LeaseStatusTracker::IsLeasingSucceed() const {
+  return IsAllLeaseRequestReturned() &&
+         (bundle_locations_->size() == unplaced_bundles_.size()) &&
+         (leasing_state_ != LeasingState::CANCELLED);
+}
+
+const std::shared_ptr<GcsPlacementGroup> &LeaseStatusTracker::GetPlacementGroup() const {
+  return placement_group_;
+}
+
+const std::shared_ptr<BundleLocations> &LeaseStatusTracker::GetBundleLocations() const {
+  return bundle_locations_;
+}
+
+const std::vector<std::shared_ptr<BundleSpecification>>
+    &LeaseStatusTracker::GetUnplacedBundles() const {
+  return unplaced_bundles_;
+}
+
+const LeasingState LeaseStatusTracker::GetLeasingState() const { return leasing_state_; }
+
+void LeaseStatusTracker::MarkPlacementGroupScheduleCancelled() {
+  UpdateLeasingState(LeasingState::CANCELLED);
+}
+
+bool LeaseStatusTracker::UpdateLeasingState(LeasingState leasing_state) {
+  // If the lease was cancelled, we cannot update the state.
+  if (leasing_state_ == LeasingState::CANCELLED) {
+    return false;
+  }
+  leasing_state_ = leasing_state;
+  return true;
+}
+
 }  // namespace gcs
 }  // namespace ray
diff --git a/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.h b/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.h
index 4a80dbeee..d886ebb5a 100644
--- a/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.h
+++ b/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.h
@@ -131,17 +131,68 @@ class GcsStrictSpreadStrategy : public GcsScheduleStrategy {
                       const std::unique_ptr<ScheduleContext> &context) override;
 };
 
+enum class LeasingState {
+  // TODO(sang): Use prepare and commit instead for 2PC.
+  /// The phase where lease requests haven't been returned.
+  SCHEDULING,
+  /// The phase where lease requests have been returned.
+  ALL_RETURNED,
+  /// Placement group has been removed, and this leasing is not valid.
+  CANCELLED
+};
+
 /// A data structure that encapsulates information regarding bundle resource leasing
 /// status.
-class LeasingContext {
-  // TODO(sang): Implement in the next PR.
+class LeaseStatusTracker {
+ public:
+  LeaseStatusTracker(std::shared_ptr<GcsPlacementGroup> placement_group,
+                     std::vector<std::shared_ptr<BundleSpecification>> &unplaced_bundles);
+  ~LeaseStatusTracker() = default;
+
+  bool MarkLeaseStarted(const ClientID &node_id,
+                        std::shared_ptr<BundleSpecification> bundle);
+  void MarkLeaseReturned(const ClientID &node_id,
+                         std::shared_ptr<BundleSpecification> bundle,
+                         const Status &status);
+  bool IsAllLeaseRequestReturned() const;
+  bool IsLeasingSucceed() const;
+  const std::shared_ptr<GcsPlacementGroup> &GetPlacementGroup() const;
+  const std::vector<std::shared_ptr<BundleSpecification>> &GetUnplacedBundles() const;
+  const std::shared_ptr<BundleLocations> &GetBundleLocations() const;
+  const LeasingState GetLeasingState() const;
+  void MarkPlacementGroupScheduleCancelled();
+
+ private:
+  /// Method to update leasing states.
+  ///
+  /// \param leasing_state The state to update.
+  /// \return True if the update succeeds. False otherwise.
+  bool UpdateLeasingState(LeasingState leasing_state);
+  /// Placement group that this leasing context is associated with.
+  std::shared_ptr<GcsPlacementGroup> placement_group_;
+  /// Locations of bundles to which lease requests were sent.
+  /// If scheduling succeeds, the decision will be set as schedule_map[bundles[pos]];
+  /// otherwise it will be set to ClientID::Nil().
+  std::shared_ptr<BundleLocations> bundle_locations_;
+  /// Number of lease requests that have been returned.
+  size_t returned_count_ = 0;
+  /// The leasing stage. This is used to know the state of the current leasing context.
+  LeasingState leasing_state_ = LeasingState::SCHEDULING;
+  /// Map from node ID to the set of bundles for whom we are trying to acquire a lease
+  /// from that node. This is needed so that we can retry lease requests from the node
+  /// until we receive a reply or the node is removed.
+  /// TODO(sang): We don't currently handle retry.
+  absl::flat_hash_map<ClientID, absl::flat_hash_set<BundleID>>
+      node_to_bundles_when_leasing_;
+  /// Unplaced bundle specifications for this leasing context.
+  std::vector<std::shared_ptr<BundleSpecification>> unplaced_bundles_;
 };
 
 /// A data structure that helps fast bundle location lookup.
 class BundleLocationIndex {
  public:
   BundleLocationIndex() {}
-  ~BundleLocationIndex() {}
+  ~BundleLocationIndex() = default;
 
   /// Add bundle locations to index.
   ///
@@ -265,9 +316,7 @@ class GcsPlacementGroupScheduler : public GcsPlacementGroupSchedulerInterface {
       const rpc::Address &raylet_address);
 
   void OnAllBundleSchedulingRequestReturned(
-      const std::shared_ptr<GcsPlacementGroup> &placement_group,
-      const std::vector<std::shared_ptr<BundleSpecification>> &bundles,
-      const std::shared_ptr<BundleLocations> &bundle_locations,
+      const std::shared_ptr<LeaseStatusTracker> &lease_status_tracker,
       const std::function<void(std::shared_ptr<GcsPlacementGroup>)>
          &schedule_failure_handler,
      const std::function<void(std::shared_ptr<GcsPlacementGroup>)>
          &schedule_success_handler);
@@ -296,19 +345,13 @@ class GcsPlacementGroupScheduler : public GcsPlacementGroupSchedulerInterface {
   /// A vector to store all the schedule strategy.
   std::vector<std::shared_ptr<GcsScheduleStrategy>> scheduler_strategies_;
 
-  /// Map from node ID to the set of bundles for whom we are trying to acquire a lease
-  /// from that node. This is needed so that we can retry lease requests from the node
-  /// until we receive a reply or the node is removed.
-  /// TODO(sang): We don't currently handle retry.
-  absl::flat_hash_map<ClientID, absl::flat_hash_set<BundleID>>
-      node_to_bundles_when_leasing_;
+  /// Index to lookup bundle locations of node or placement group.
+  BundleLocationIndex bundle_location_index_;
 
   /// Set of placement group that have lease requests in flight to nodes.
   /// It is required to know if placement group has been removed or not.
-  absl::flat_hash_set<PlacementGroupID> placement_group_leasing_in_progress_;
-
-  /// Index to lookup bundle locations of node or placement group.
-  BundleLocationIndex bundle_location_index_;
+  absl::flat_hash_map<PlacementGroupID, std::shared_ptr<LeaseStatusTracker>>
+      placement_group_leasing_in_progress_;
 };
 
 }  // namespace gcs
diff --git a/src/ray/gcs/gcs_server/test/gcs_placement_group_scheduler_test.cc b/src/ray/gcs/gcs_server/test/gcs_placement_group_scheduler_test.cc
index a42c7ea42..a02343003 100644
--- a/src/ray/gcs/gcs_server/test/gcs_placement_group_scheduler_test.cc
+++ b/src/ray/gcs/gcs_server/test/gcs_placement_group_scheduler_test.cc
@@ -502,7 +502,6 @@ TEST_F(GcsPlacementGroupSchedulerTest, TestRescheduleWhenNodeDead) {
   auto bundles_on_node1 =
       scheduler_->GetBundlesOnNode(ClientID::FromBinary(node1->node_id()));
   ASSERT_EQ(1, bundles_on_node1.size());
-  // One node is dead, reschedule the placement group.
   auto bundle_on_dead_node = placement_group->GetMutableBundle(0);
   bundle_on_dead_node->clear_node_id();
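
For readers following the refactoring, the sketch below models the lifecycle that LeaseStatusTracker now encapsulates. It is a minimal, self-contained approximation, not Ray's actual API: ToyLeaseTracker, string node IDs, int bundle IDs, and a bool reply flag are hypothetical stand-ins for LeaseStatusTracker, ClientID, BundleSpecification, and Status. It illustrates why CANCELLED is modeled as a sticky state: even if every lease reply eventually comes back successfully, a cancelled schedule must never be reported as a success.

#include <cassert>
#include <cstddef>
#include <map>
#include <set>
#include <string>

enum class LeasingState { SCHEDULING, ALL_RETURNED, CANCELLED };

// Simplified stand-in for LeaseStatusTracker (hypothetical, for illustration only).
class ToyLeaseTracker {
 public:
  explicit ToyLeaseTracker(size_t total_bundles) : total_(total_bundles) {}

  // Record that a lease request for `bundle` is in flight to `node`.
  void MarkLeaseStarted(const std::string &node, int bundle) {
    in_flight_[node].insert(bundle);
  }

  // Record a reply; once every request has replied, flip to ALL_RETURNED.
  void MarkLeaseReturned(const std::string &node, int bundle, bool ok) {
    in_flight_[node].erase(bundle);
    if (in_flight_[node].empty()) in_flight_.erase(node);
    if (ok) ++succeeded_;
    ++returned_;
    if (returned_ == total_) UpdateState(LeasingState::ALL_RETURNED);
  }

  void Cancel() { UpdateState(LeasingState::CANCELLED); }

  bool AllReturned() const { return returned_ == total_; }

  // Success requires every reply back, every lease granted, and no cancellation.
  bool Succeeded() const {
    return AllReturned() && succeeded_ == total_ &&
           state_ != LeasingState::CANCELLED;
  }

 private:
  // CANCELLED is terminal: once set, no later transition can overwrite it.
  bool UpdateState(LeasingState next) {
    if (state_ == LeasingState::CANCELLED) return false;
    state_ = next;
    return true;
  }

  size_t total_;
  size_t returned_ = 0;
  size_t succeeded_ = 0;
  LeasingState state_ = LeasingState::SCHEDULING;
  std::map<std::string, std::set<int>> in_flight_;
};

int main() {
  ToyLeaseTracker tracker(2);
  tracker.MarkLeaseStarted("node1", 0);
  tracker.MarkLeaseStarted("node2", 1);

  // The placement group is removed while replies are still in flight.
  tracker.Cancel();

  tracker.MarkLeaseReturned("node1", 0, /*ok=*/true);
  tracker.MarkLeaseReturned("node2", 1, /*ok=*/true);

  // All replies arrived and all leases were granted, but cancellation wins.
  assert(tracker.AllReturned());
  assert(!tracker.Succeeded());
  return 0;
}

Compare this with MarkScheduleCancelled in the diff above: instead of erasing the in-progress map entry (the pre-patch behavior), cancellation now just flips the tracker's state. That way OnAllBundleSchedulingRequestReturned can still find the tracker once the last reply arrives, observe that IsLeasingSucceed() is false, and release any resources that were already reserved.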