[PlacementGroup]Support acquire and return bundle resource from gcs resource manager (#12349)

This commit is contained in:
fangfengbin 2020-12-08 10:29:57 +08:00 committed by GitHub
parent b1f2b142d5
commit 93c0eb249c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 97 additions and 15 deletions

View file

@ -261,7 +261,7 @@ void GcsPlacementGroupScheduler::ScheduleUnplacedBundles(
}
auto lease_status_tracker =
std::make_shared<LeaseStatusTracker>(placement_group, bundles);
std::make_shared<LeaseStatusTracker>(placement_group, bundles, selected_nodes);
RAY_CHECK(placement_group_leasing_in_progress_
.emplace(placement_group->GetPlacementGroupID(), lease_status_tracker)
.second);
@ -288,10 +288,18 @@ void GcsPlacementGroupScheduler::ScheduleUnplacedBundles(
void GcsPlacementGroupScheduler::DestroyPlacementGroupBundleResourcesIfExists(
const PlacementGroupID &placement_group_id) {
auto &bundle_locations =
committed_bundle_location_index_.GetBundleLocations(placement_group_id);
if (bundle_locations.has_value()) {
// There could be leasing bundles and committed bundles at the same time if placement
// groups are rescheduling.
// groups are rescheduling, so we need to destroy prepared bundles and committed
// bundles at the same time.
DestroyPlacementGroupPreparedBundleResources(placement_group_id);
DestroyPlacementGroupCommittedBundleResources(placement_group_id);
// Return destroyed bundles resources to the cluster resource.
ReturnBundleResources(bundle_locations.value());
}
}
void GcsPlacementGroupScheduler::MarkScheduleCancelled(
@ -456,6 +464,7 @@ void GcsPlacementGroupScheduler::OnAllBundlePrepareRequestReturned(
auto it = placement_group_leasing_in_progress_.find(placement_group_id);
RAY_CHECK(it != placement_group_leasing_in_progress_.end());
placement_group_leasing_in_progress_.erase(it);
ReturnBundleResources(lease_status_tracker->GetBundleLocations());
schedule_failure_handler(placement_group);
return;
}
@ -504,13 +513,24 @@ void GcsPlacementGroupScheduler::OnAllBundleCommitRequestReturned(
committed_bundle_location_index_.AddBundleLocations(placement_group_id,
prepared_bundle_locations);
// If the placement group scheduling has been cancelled, destroy them.
// NOTE: If the placement group scheduling has been cancelled, we just need to destroy
// the committed bundles. The reason is that only `RemovePlacementGroup` will mark the
// state of placement group as `CANCELLED` and it will also destroy all prepared and
// committed bundles of the placement group.
// However, it cannot destroy the newly submitted bundles in this scheduling, so we need
// to destroy them separately.
if (lease_status_tracker->GetLeasingState() == LeasingState::CANCELLED) {
DestroyPlacementGroupBundleResourcesIfExists(placement_group_id);
DestroyPlacementGroupCommittedBundleResources(placement_group_id);
ReturnBundleResources(lease_status_tracker->GetBundleLocations());
schedule_failure_handler(placement_group);
return;
}
// Acquire bundle resources from gcs resources manager.
const auto &committed_bundle_locations =
lease_status_tracker->GetCommittedBundleLocations();
AcquireBundleResources(committed_bundle_locations);
if (!lease_status_tracker->AllCommitRequestsSuccessful()) {
// Update the state to be reschedule so that the failure handle will reschedule the
// failed bundles.
@ -520,12 +540,12 @@ void GcsPlacementGroupScheduler::OnAllBundleCommitRequestReturned(
placement_group->GetMutableBundle(bundle.first.second)->clear_node_id();
}
placement_group->UpdateState(rpc::PlacementGroupTableData::RESCHEDULING);
ReturnBundleResources(uncommitted_bundle_locations);
schedule_failure_handler(placement_group);
return;
}
} else {
schedule_success_handler(placement_group);
}
}
std::unique_ptr<ScheduleContext> GcsPlacementGroupScheduler::GetScheduleContext(
const PlacementGroupID &placement_group_id) {
@ -634,6 +654,24 @@ void GcsPlacementGroupScheduler::DestroyPlacementGroupCommittedBundleResources(
}
}
void GcsPlacementGroupScheduler::AcquireBundleResources(
const std::shared_ptr<BundleLocations> &bundle_locations) {
// Acquire bundle resources from gcs resources manager.
for (auto &bundle : *bundle_locations) {
gcs_resource_manager_.AcquireResources(bundle.second.first,
bundle.second.second->GetRequiredResources());
}
}
void GcsPlacementGroupScheduler::ReturnBundleResources(
const std::shared_ptr<BundleLocations> &bundle_locations) {
// Release bundle resources to gcs resources manager.
for (auto &bundle : *bundle_locations) {
gcs_resource_manager_.ReleaseResources(bundle.second.first,
bundle.second.second->GetRequiredResources());
}
}
void BundleLocationIndex::AddBundleLocations(
const PlacementGroupID &placement_group_id,
std::shared_ptr<BundleLocations> bundle_locations) {
@ -734,10 +772,18 @@ void BundleLocationIndex::AddNodes(
LeaseStatusTracker::LeaseStatusTracker(
std::shared_ptr<GcsPlacementGroup> placement_group,
std::vector<std::shared_ptr<BundleSpecification>> &unplaced_bundles)
const std::vector<std::shared_ptr<BundleSpecification>> &unplaced_bundles,
const ScheduleMap &schedule_map)
: placement_group_(placement_group), bundles_to_schedule_(unplaced_bundles) {
preparing_bundle_locations_ = std::make_shared<BundleLocations>();
uncommitted_bundle_locations_ = std::make_shared<BundleLocations>();
committed_bundle_locations_ = std::make_shared<BundleLocations>();
bundle_locations_ = std::make_shared<BundleLocations>();
for (const auto &bundle : unplaced_bundles) {
const auto &iter = schedule_map.find(bundle->BundleId());
RAY_CHECK(iter != schedule_map.end());
(*bundle_locations_)[bundle->BundleId()] = std::make_pair(iter->second, bundle);
}
}
bool LeaseStatusTracker::MarkPreparePhaseStarted(
@ -788,6 +834,8 @@ void LeaseStatusTracker::MarkCommitRequestReturned(
const auto &bundle_id = bundle->BundleId();
if (!status.ok()) {
uncommitted_bundle_locations_->emplace(bundle_id, std::make_pair(node_id, bundle));
} else {
committed_bundle_locations_->emplace(bundle_id, std::make_pair(node_id, bundle));
}
}
@ -817,6 +865,15 @@ const std::shared_ptr<BundleLocations>
return uncommitted_bundle_locations_;
}
const std::shared_ptr<BundleLocations> &LeaseStatusTracker::GetCommittedBundleLocations()
const {
return committed_bundle_locations_;
}
const std::shared_ptr<BundleLocations> &LeaseStatusTracker::GetBundleLocations() const {
return bundle_locations_;
}
const std::vector<std::shared_ptr<BundleSpecification>>
&LeaseStatusTracker::GetBundlesToSchedule() const {
return bundles_to_schedule_;

View file

@ -64,6 +64,8 @@ class GcsPlacementGroupSchedulerInterface {
const NodeID &node_id) = 0;
/// Destroy bundle resources from all nodes in the placement group.
///
/// \param placement_group_id The id of the placement group to be destroyed.
virtual void DestroyPlacementGroupBundleResourcesIfExists(
const PlacementGroupID &placement_group_id) = 0;
@ -168,8 +170,10 @@ enum class LeasingState {
/// status.
class LeaseStatusTracker {
public:
LeaseStatusTracker(std::shared_ptr<GcsPlacementGroup> placement_group,
std::vector<std::shared_ptr<BundleSpecification>> &unplaced_bundles);
LeaseStatusTracker(
std::shared_ptr<GcsPlacementGroup> placement_group,
const std::vector<std::shared_ptr<BundleSpecification>> &unplaced_bundles,
const ScheduleMap &schedule_map);
~LeaseStatusTracker() = default;
/// Indicate the tracker that prepare requests are sent to a specific node.
@ -239,6 +243,16 @@ class LeaseStatusTracker {
/// \return Location of bundles that failed to commit resources on a node.
const std::shared_ptr<BundleLocations> &GetUnCommittedBundleLocations() const;
/// This method returns bundle locations that success to commit resources.
///
/// \return Location of bundles that success to commit resources on a node.
const std::shared_ptr<BundleLocations> &GetCommittedBundleLocations() const;
/// This method returns bundle locations.
///
/// \return Location of bundles.
const std::shared_ptr<BundleLocations> &GetBundleLocations() const;
/// Return the leasing state.
///
/// \return Leasing state.
@ -276,6 +290,9 @@ class LeaseStatusTracker {
/// Location of bundles that commit requests failed.
std::shared_ptr<BundleLocations> uncommitted_bundle_locations_;
/// Location of bundles that committed requests success.
std::shared_ptr<BundleLocations> committed_bundle_locations_;
/// The leasing stage. This is used to know the state of current leasing context.
LeasingState leasing_state_ = LeasingState::PREPARING;
@ -288,6 +305,9 @@ class LeaseStatusTracker {
/// Bundles to schedule.
std::vector<std::shared_ptr<BundleSpecification>> bundles_to_schedule_;
/// Location of bundles.
std::shared_ptr<BundleLocations> bundle_locations_;
};
/// A data structure that helps fast bundle location lookup.
@ -492,14 +512,20 @@ class GcsPlacementGroupScheduler : public GcsPlacementGroupSchedulerInterface {
void DestroyPlacementGroupCommittedBundleResources(
const PlacementGroupID &placement_group_id);
/// Acquire the bundle resources from the cluster resources.
void AcquireBundleResources(const std::shared_ptr<BundleLocations> &bundle_locations);
/// Return the bundle resources to the cluster resources.
void ReturnBundleResources(const std::shared_ptr<BundleLocations> &bundle_locations);
/// Generate schedule context.
std::unique_ptr<ScheduleContext> GetScheduleContext(
const PlacementGroupID &placement_group_id);
/// A timer that ticks every cancel resource failure milliseconds.
boost::asio::deadline_timer return_timer_;
/// Used to update placement group information upon creation, deletion, etc.
/// Used to update placement group information upon creation, deletion, etc.
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage_;
/// Reference of GcsNodeManager.

View file

@ -34,7 +34,6 @@ void GcsResourceManager::RemoveResources(const NodeID &node_id) {
bool GcsResourceManager::AcquireResources(const NodeID &node_id,
const ResourceSet &required_resources) {
auto iter = cluster_resources_.find(node_id);
//
RAY_CHECK(iter != cluster_resources_.end()) << "Node " << node_id << " not exist.";
if (!required_resources.IsSubset(iter->second)) {
return false;