[PlacementGroup]Fix destroy bundle resources bug (#12336)

* [PlacementGroup]Fix destroy bundle resources bug

* revert AddBundleLocations code change

* add comment

* fix review comments

Co-authored-by: 灵洵 <fengbin.ffb@antgroup.com>
This commit is contained in:
fangfengbin 2020-11-25 09:45:26 +08:00 committed by GitHub
parent 9f322db71d
commit 5934b20b96
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 66 additions and 47 deletions

View file

@ -261,41 +261,10 @@ void GcsPlacementGroupScheduler::ScheduleUnplacedBundles(
void GcsPlacementGroupScheduler::DestroyPlacementGroupBundleResourcesIfExists(
const PlacementGroupID &placement_group_id) {
std::shared_ptr<BundleLocations> committed_bundle_locations =
std::make_shared<BundleLocations>();
std::shared_ptr<BundleLocations> leasing_bundle_locations =
std::make_shared<BundleLocations>();
// Check if we can find committed bundle locations.
const auto &maybe_bundle_locations =
committed_bundle_location_index_.GetBundleLocations(placement_group_id);
if (maybe_bundle_locations.has_value()) {
committed_bundle_locations = maybe_bundle_locations.value();
}
// Now let's see if there are leasing bundles. There could be leasing bundles and
// committed bundles at the same time if placement groups are rescheduling.
auto it = placement_group_leasing_in_progress_.find(placement_group_id);
if (it != placement_group_leasing_in_progress_.end()) {
const auto &leasing_context = it->second;
leasing_bundle_locations = leasing_context->GetPreparedBundleLocations();
}
// Cancel all resource reservation.
RAY_LOG(INFO) << "Cancelling all bundles of a placement group, id is "
<< placement_group_id;
for (const auto &iter : *(committed_bundle_locations)) {
auto &bundle_spec = iter.second.second;
auto &node_id = iter.second.first;
CancelResourceReserve(bundle_spec, gcs_node_manager_.GetNode(node_id));
}
for (const auto &iter : *(leasing_bundle_locations)) {
auto &bundle_spec = iter.second.second;
auto &node_id = iter.second.first;
CancelResourceReserve(bundle_spec, gcs_node_manager_.GetNode(node_id));
}
committed_bundle_location_index_.Erase(placement_group_id);
// There could be leasing bundles and committed bundles at the same time if placement
// groups are rescheduling.
DestroyPlacementGroupPreparedBundleResources(placement_group_id);
DestroyPlacementGroupCommittedBundleResources(placement_group_id);
}
void GcsPlacementGroupScheduler::MarkScheduleCancelled(
@ -453,7 +422,10 @@ void GcsPlacementGroupScheduler::OnAllBundlePrepareRequestReturned(
if (!lease_status_tracker->AllPrepareRequestsSuccessful()) {
// Erase the status tracker from a in-memory map if exists.
DestroyPlacementGroupBundleResourcesIfExists(placement_group_id);
// NOTE: A placement group may be scheduled several times to succeed.
// If a prepare failure occurs during scheduling, we just need to release the prepared
// bundle resources of this scheduling.
DestroyPlacementGroupPreparedBundleResources(placement_group_id);
auto it = placement_group_leasing_in_progress_.find(placement_group_id);
RAY_CHECK(it != placement_group_leasing_in_progress_.end());
placement_group_leasing_in_progress_.erase(it);
@ -595,20 +567,49 @@ void GcsPlacementGroupScheduler::ReleaseUnusedBundles(
}
}
void GcsPlacementGroupScheduler::DestroyPlacementGroupPreparedBundleResources(
const PlacementGroupID &placement_group_id) {
// Get the locations of prepared bundles.
auto it = placement_group_leasing_in_progress_.find(placement_group_id);
if (it != placement_group_leasing_in_progress_.end()) {
const auto &leasing_context = it->second;
const auto &leasing_bundle_locations = leasing_context->GetPreparedBundleLocations();
// Cancel all resource reservation of prepared bundles.
RAY_LOG(INFO) << "Cancelling all prepared bundles of a placement group, id is "
<< placement_group_id;
for (const auto &iter : *(leasing_bundle_locations)) {
auto &bundle_spec = iter.second.second;
auto &node_id = iter.second.first;
CancelResourceReserve(bundle_spec, gcs_node_manager_.GetNode(node_id));
}
}
}
void GcsPlacementGroupScheduler::DestroyPlacementGroupCommittedBundleResources(
const PlacementGroupID &placement_group_id) {
// Get the locations of committed bundles.
const auto &maybe_bundle_locations =
committed_bundle_location_index_.GetBundleLocations(placement_group_id);
if (maybe_bundle_locations.has_value()) {
const auto &committed_bundle_locations = maybe_bundle_locations.value();
// Cancel all resource reservation of committed bundles.
RAY_LOG(INFO) << "Cancelling all committed bundles of a placement group, id is "
<< placement_group_id;
for (const auto &iter : *(committed_bundle_locations)) {
auto &bundle_spec = iter.second.second;
auto &node_id = iter.second.first;
CancelResourceReserve(bundle_spec, gcs_node_manager_.GetNode(node_id));
}
committed_bundle_location_index_.Erase(placement_group_id);
}
}
void BundleLocationIndex::AddBundleLocations(
const PlacementGroupID &placement_group_id,
std::shared_ptr<BundleLocations> bundle_locations) {
// Update `placement_group_to_bundle_locations_`.
// The placement group may be scheduled several times to succeed, so we need to merge
// `bundle_locations` instead of covering it directly.
auto iter = placement_group_to_bundle_locations_.find(placement_group_id);
if (iter == placement_group_to_bundle_locations_.end()) {
placement_group_to_bundle_locations_.emplace(placement_group_id, bundle_locations);
} else {
iter->second->insert(bundle_locations->begin(), bundle_locations->end());
}
// Update `node_to_leased_bundles_`.
for (auto iter : *bundle_locations) {
const auto &node_id = iter.second.first;
if (!node_to_leased_bundles_.contains(node_id)) {

View file

@ -459,6 +459,24 @@ class GcsPlacementGroupScheduler : public GcsPlacementGroupSchedulerInterface {
const std::function<void(std::shared_ptr<GcsPlacementGroup>)>
&schedule_success_handler);
/// Destroy the prepared bundle resources with this placement group.
/// The method is idempotent, meaning if all bundles are already cancelled,
/// this method won't do anything.
///
/// \param placement_group_id The id of a placement group to destroy all prepared
/// bundles.
void DestroyPlacementGroupPreparedBundleResources(
const PlacementGroupID &placement_group_id);
/// Destroy the committed bundle resources with this placement group.
/// The method is idempotent, meaning if all bundles are already cancelled,
/// this method won't do anything.
///
/// \param placement_group_id The id of a placement group to destroy all committed
/// bundles.
void DestroyPlacementGroupCommittedBundleResources(
const PlacementGroupID &placement_group_id);
/// Generate schedule context.
std::unique_ptr<ScheduleContext> GetScheduleContext(
const PlacementGroupID &placement_group_id);