diff --git a/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.cc b/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.cc index 902c391f5..56169a3b8 100644 --- a/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.cc +++ b/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.cc @@ -261,41 +261,10 @@ void GcsPlacementGroupScheduler::ScheduleUnplacedBundles( void GcsPlacementGroupScheduler::DestroyPlacementGroupBundleResourcesIfExists( const PlacementGroupID &placement_group_id) { - std::shared_ptr committed_bundle_locations = - std::make_shared(); - std::shared_ptr leasing_bundle_locations = - std::make_shared(); - - // Check if we can find committed bundle locations. - const auto &maybe_bundle_locations = - committed_bundle_location_index_.GetBundleLocations(placement_group_id); - if (maybe_bundle_locations.has_value()) { - committed_bundle_locations = maybe_bundle_locations.value(); - } - - // Now let's see if there are leasing bundles. There could be leasing bundles and - // committed bundles at the same time if placement groups are rescheduling. - auto it = placement_group_leasing_in_progress_.find(placement_group_id); - if (it != placement_group_leasing_in_progress_.end()) { - const auto &leasing_context = it->second; - leasing_bundle_locations = leasing_context->GetPreparedBundleLocations(); - } - - // Cancel all resource reservation. 
- RAY_LOG(INFO) << "Cancelling all bundles of a placement group, id is " - << placement_group_id; - for (const auto &iter : *(committed_bundle_locations)) { - auto &bundle_spec = iter.second.second; - auto &node_id = iter.second.first; - CancelResourceReserve(bundle_spec, gcs_node_manager_.GetNode(node_id)); - } - - for (const auto &iter : *(leasing_bundle_locations)) { - auto &bundle_spec = iter.second.second; - auto &node_id = iter.second.first; - CancelResourceReserve(bundle_spec, gcs_node_manager_.GetNode(node_id)); - } - committed_bundle_location_index_.Erase(placement_group_id); + // There could be leasing bundles and committed bundles at the same time if placement + // groups are rescheduling. + DestroyPlacementGroupPreparedBundleResources(placement_group_id); + DestroyPlacementGroupCommittedBundleResources(placement_group_id); } void GcsPlacementGroupScheduler::MarkScheduleCancelled( @@ -453,7 +422,10 @@ void GcsPlacementGroupScheduler::OnAllBundlePrepareRequestReturned( if (!lease_status_tracker->AllPrepareRequestsSuccessful()) { // Erase the status tracker from a in-memory map if exists. - DestroyPlacementGroupBundleResourcesIfExists(placement_group_id); + // NOTE: A placement group may need several scheduling attempts to succeed. + // If a prepare failure occurs during an attempt, we only need to release the prepared + // bundle resources of this attempt. + DestroyPlacementGroupPreparedBundleResources(placement_group_id); auto it = placement_group_leasing_in_progress_.find(placement_group_id); RAY_CHECK(it != placement_group_leasing_in_progress_.end()); placement_group_leasing_in_progress_.erase(it); @@ -595,20 +567,49 @@ void GcsPlacementGroupScheduler::ReleaseUnusedBundles( } } +void GcsPlacementGroupScheduler::DestroyPlacementGroupPreparedBundleResources( + const PlacementGroupID &placement_group_id) { + // Get the locations of prepared bundles. 
+ auto it = placement_group_leasing_in_progress_.find(placement_group_id); + if (it != placement_group_leasing_in_progress_.end()) { + const auto &leasing_context = it->second; + const auto &leasing_bundle_locations = leasing_context->GetPreparedBundleLocations(); + + // Cancel all resource reservations of prepared bundles. + RAY_LOG(INFO) << "Cancelling all prepared bundles of a placement group, id is " + << placement_group_id; + for (const auto &iter : *(leasing_bundle_locations)) { + auto &bundle_spec = iter.second.second; + auto &node_id = iter.second.first; + CancelResourceReserve(bundle_spec, gcs_node_manager_.GetNode(node_id)); + } + } +} + +void GcsPlacementGroupScheduler::DestroyPlacementGroupCommittedBundleResources( + const PlacementGroupID &placement_group_id) { + // Get the locations of committed bundles. + const auto &maybe_bundle_locations = + committed_bundle_location_index_.GetBundleLocations(placement_group_id); + if (maybe_bundle_locations.has_value()) { + const auto &committed_bundle_locations = maybe_bundle_locations.value(); + + // Cancel all resource reservations of committed bundles. + RAY_LOG(INFO) << "Cancelling all committed bundles of a placement group, id is " + << placement_group_id; + for (const auto &iter : *(committed_bundle_locations)) { + auto &bundle_spec = iter.second.second; + auto &node_id = iter.second.first; + CancelResourceReserve(bundle_spec, gcs_node_manager_.GetNode(node_id)); + } + committed_bundle_location_index_.Erase(placement_group_id); + } +} + void BundleLocationIndex::AddBundleLocations( const PlacementGroupID &placement_group_id, std::shared_ptr bundle_locations) { - // Update `placement_group_to_bundle_locations_`. - // The placement group may be scheduled several times to succeed, so we need to merge - // `bundle_locations` instead of covering it directly. 
- auto iter = placement_group_to_bundle_locations_.find(placement_group_id); - if (iter == placement_group_to_bundle_locations_.end()) { - placement_group_to_bundle_locations_.emplace(placement_group_id, bundle_locations); - } else { - iter->second->insert(bundle_locations->begin(), bundle_locations->end()); - } - - // Update `node_to_leased_bundles_`. + placement_group_to_bundle_locations_.emplace(placement_group_id, bundle_locations); for (auto iter : *bundle_locations) { const auto &node_id = iter.second.first; if (!node_to_leased_bundles_.contains(node_id)) { diff --git a/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.h b/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.h index d64a7ce41..2444c4804 100644 --- a/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.h +++ b/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.h @@ -459,6 +459,24 @@ class GcsPlacementGroupScheduler : public GcsPlacementGroupSchedulerInterface { const std::function)> &schedule_success_handler); + /// Cancel the resource reservations of the prepared bundles of this placement group. + /// This does nothing if the group has no entry in + /// `placement_group_leasing_in_progress_`; the entry itself is not erased here. + /// + /// \param placement_group_id The id of the placement group whose prepared bundles + /// should be cancelled. + void DestroyPlacementGroupPreparedBundleResources( + const PlacementGroupID &placement_group_id); + + /// Cancel the resource reservations of the committed bundles of this placement group + /// and erase the group from `committed_bundle_location_index_`. + /// This does nothing if the index has no bundle locations for the group. + /// + /// \param placement_group_id The id of the placement group whose committed bundles + /// should be cancelled. + void DestroyPlacementGroupCommittedBundleResources( + const PlacementGroupID &placement_group_id); + /// Generate schedule context. std::unique_ptr GetScheduleContext( const PlacementGroupID &placement_group_id);