From 7f91cfedd51cef0db6382b858391a58406251d6d Mon Sep 17 00:00:00 2001
From: "DK.Pino"
Date: Thu, 17 Jun 2021 12:48:39 +0800
Subject: [PATCH] [Placement Group] Support infeasible placement groups for
 Placement Group. (#16188)

* init

* update comment

* update logical

* ut failing

* compile passing

* add ut

* lint

* fix comment

* lint

* fix ut and typo

* fix ut and typo

* lint

* typo
---
 python/ray/tests/test_multi_node_2.py         |  2 +-
 .../gcs_server/gcs_placement_group_manager.cc | 84 +++++++++++++----
 .../gcs_server/gcs_placement_group_manager.h  | 13 ++-
 .../gcs_placement_group_scheduler.cc          | 91 ++++++++++---------
 .../gcs_placement_group_scheduler.h           | 80 ++++++++--------
 .../gcs/gcs_server/gcs_resource_scheduler.cc  | 86 +++++++++++------
 .../gcs/gcs_server/gcs_resource_scheduler.h   | 52 +++++++----
 src/ray/gcs/gcs_server/gcs_server.cc          |  2 +-
 .../test/gcs_placement_group_manager_test.cc  | 35 ++++++-
 .../gcs_placement_group_scheduler_test.cc     | 70 ++++++++------
 .../test/gcs_resource_scheduler_test.cc       | 55 ++++++++++-
 11 files changed, 380 insertions(+), 190 deletions(-)

diff --git a/python/ray/tests/test_multi_node_2.py b/python/ray/tests/test_multi_node_2.py
index 6c7184d31..96fb06f7b 100644
--- a/python/ray/tests/test_multi_node_2.py
+++ b/python/ray/tests/test_multi_node_2.py
@@ -105,7 +105,7 @@ def verify_load_metrics(monitor, expected_resource_usage=None, timeout=30):
     strategy = "STRICT_PACK"
     pg = placement_group(pg_demands, strategy=strategy)
     pg.ready()
-    time.sleep(2)  # wait for placemnt groups to propogate.
+    time.sleep(2)  # wait for placement groups to propagate.
 
     # Disable event clearing for test.
     monitor.event_summarizer.clear = lambda *a: None
diff --git a/src/ray/gcs/gcs_server/gcs_placement_group_manager.cc b/src/ray/gcs/gcs_server/gcs_placement_group_manager.cc
index a9c306966..177801c5c 100644
--- a/src/ray/gcs/gcs_server/gcs_placement_group_manager.cc
+++ b/src/ray/gcs/gcs_server/gcs_placement_group_manager.cc
@@ -221,27 +221,33 @@ PlacementGroupID GcsPlacementGroupManager::GetPlacementGroupIDByName(
 }
 
 void GcsPlacementGroupManager::OnPlacementGroupCreationFailed(
-    std::shared_ptr<GcsPlacementGroup> placement_group) {
+    std::shared_ptr<GcsPlacementGroup> placement_group, bool is_feasible) {
   RAY_LOG(DEBUG) << "Failed to create placement group " << placement_group->GetName()
                  << ", id: " << placement_group->GetPlacementGroupID() << ", try again.";
-  // We will attempt to schedule this placement_group once an eligible node is
-  // registered.
-  auto state = placement_group->GetState();
-  RAY_CHECK(state == rpc::PlacementGroupTableData::RESCHEDULING ||
-            state == rpc::PlacementGroupTableData::PENDING ||
-            state == rpc::PlacementGroupTableData::REMOVED)
-      << "State: " << state;
-  if (state == rpc::PlacementGroupTableData::RESCHEDULING) {
-    // NOTE: If a node is dead, the placement group scheduler should try to recover the
-    // group by rescheduling the bundles of the dead node. This should have higher
-    // priority than trying to place other placement groups.
-    pending_placement_groups_.emplace_front(std::move(placement_group));
-  } else {
-    pending_placement_groups_.emplace_back(std::move(placement_group));
-  }
-  MarkSchedulingDone();
-  RetryCreatingPlacementGroup();
+  if (!is_feasible) {
+    // We will attempt to schedule this placement_group once an eligible node is
+    // registered.
+    infeasible_placement_groups_.emplace_back(std::move(placement_group));
+    MarkSchedulingDone();
+  } else {
+    auto state = placement_group->GetState();
+    RAY_CHECK(state == rpc::PlacementGroupTableData::RESCHEDULING ||
+              state == rpc::PlacementGroupTableData::PENDING ||
+              state == rpc::PlacementGroupTableData::REMOVED)
+        << "State: " << state;
+    if (state == rpc::PlacementGroupTableData::RESCHEDULING) {
+      // NOTE: If a node is dead, the placement group scheduler should try to recover the
+      // group by rescheduling the bundles of the dead node. This should have higher
+      // priority than trying to place other placement groups.
+      pending_placement_groups_.emplace_front(std::move(placement_group));
+    } else {
+      pending_placement_groups_.emplace_back(std::move(placement_group));
+    }
+
+    MarkSchedulingDone();
+    RetryCreatingPlacementGroup();
+  }
 }
 
 void GcsPlacementGroupManager::OnPlacementGroupCreationSuccess(
@@ -284,8 +290,8 @@ void GcsPlacementGroupManager::SchedulePendingPlacementGroups() {
   MarkSchedulingStarted(placement_group_id);
   gcs_placement_group_scheduler_->ScheduleUnplacedBundles(
       placement_group,
-      [this](std::shared_ptr<GcsPlacementGroup> placement_group) {
-        OnPlacementGroupCreationFailed(std::move(placement_group));
+      [this](std::shared_ptr<GcsPlacementGroup> placement_group, bool is_feasible) {
+        OnPlacementGroupCreationFailed(std::move(placement_group), is_feasible);
       },
       [this](std::shared_ptr<GcsPlacementGroup> placement_group) {
         OnPlacementGroupCreationSuccess(std::move(placement_group));
@@ -383,6 +389,17 @@ void GcsPlacementGroupManager::RemovePlacementGroup(
     pending_placement_groups_.erase(pending_it);
   }
 
+  // Remove the placement group from the infeasible queue if it is there.
+  pending_it = std::find_if(
+      infeasible_placement_groups_.begin(), infeasible_placement_groups_.end(),
+      [placement_group_id](const std::shared_ptr<GcsPlacementGroup> &placement_group) {
+        return placement_group->GetPlacementGroupID() == placement_group_id;
+      });
+  if (pending_it != infeasible_placement_groups_.end()) {
+    // The placement group is currently infeasible; remove it from the queue.
+    infeasible_placement_groups_.erase(pending_it);
+  }
+
   // Flush the status and respond to workers.
   placement_group->UpdateState(rpc::PlacementGroupTableData::REMOVED);
   RAY_CHECK_OK(gcs_table_storage_->PlacementGroupTable().Put(
@@ -570,6 +587,22 @@ void GcsPlacementGroupManager::OnNodeDead(const NodeID &node_id) {
   SchedulePendingPlacementGroups();
 }
 
+void GcsPlacementGroupManager::OnNodeAdd(const NodeID &node_id) {
+  RAY_LOG(INFO)
+      << "A new node " << node_id
+      << " is registered; trying to reschedule all the infeasible placement groups.";
+
+  // Move all the infeasible placement groups to the pending queue so that we can
+  // reschedule them.
+  if (infeasible_placement_groups_.size() > 0) {
+    auto end_it = pending_placement_groups_.end();
+    pending_placement_groups_.insert(end_it, infeasible_placement_groups_.cbegin(),
+                                     infeasible_placement_groups_.cend());
+    infeasible_placement_groups_.clear();
+  }
+  SchedulePendingPlacementGroups();
+}
+
 void GcsPlacementGroupManager::CleanPlacementGroupIfNeededWhenJobDead(
     const JobID &job_id) {
   for (const auto &it : registered_placement_groups_) {
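The two-queue flow added above is the heart of this patch: a retryable failure (is_feasible == true) keeps the group in pending_placement_groups_, while a non-retryable one parks it in infeasible_placement_groups_ until OnNodeAdd merges it back. A minimal standalone sketch of that flow, using hypothetical toy types (Pg, Manager) rather than Ray's real classes:

#include <cstdio>
#include <deque>
#include <memory>

// Toy stand-in for GcsPlacementGroup; only the queue mechanics are modeled.
struct Pg {
  int id;
};

struct Manager {
  std::deque<std::shared_ptr<Pg>> pending;     // retryable failures wait here
  std::deque<std::shared_ptr<Pg>> infeasible;  // parked until a new node registers

  // Mirrors OnPlacementGroupCreationFailed(placement_group, is_feasible).
  void OnCreationFailed(std::shared_ptr<Pg> pg, bool is_feasible) {
    if (!is_feasible) {
      infeasible.push_back(std::move(pg));  // no retry until OnNodeAdd
    } else {
      pending.push_back(std::move(pg));  // retried by the pending-queue scheduler
    }
  }

  // Mirrors OnNodeAdd: a new node may make infeasible groups schedulable again.
  void OnNodeAdd() {
    pending.insert(pending.end(), infeasible.cbegin(), infeasible.cend());
    infeasible.clear();
  }
};

int main() {
  Manager m;
  m.OnCreationFailed(std::make_shared<Pg>(Pg{1}), /*is_feasible=*/false);
  m.OnCreationFailed(std::make_shared<Pg>(Pg{2}), /*is_feasible=*/true);
  std::printf("pending=%zu infeasible=%zu\n", m.pending.size(), m.infeasible.size());
  m.OnNodeAdd();  // infeasible group 1 moves back to pending
  std::printf("pending=%zu infeasible=%zu\n", m.pending.size(), m.infeasible.size());
  return 0;
}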
@@ -620,6 +653,17 @@ void GcsPlacementGroupManager::UpdatePlacementGroupLoad() {
       break;
     }
   }
+  // NOTE: Infeasible placement groups are also reported as part of the pending
+  // queue when the load metrics are reported.
+  for (const auto &pending_pg_spec : infeasible_placement_groups_) {
+    auto placement_group_data = placement_group_load->add_placement_group_data();
+    auto placement_group_table_data = pending_pg_spec->GetPlacementGroupTableData();
+    placement_group_data->Swap(&placement_group_table_data);
+    total_cnt += 1;
+    if (total_cnt >= RayConfig::instance().max_placement_group_load_report_size()) {
+      break;
+    }
+  }
   gcs_resource_manager_.UpdatePlacementGroupLoad(move(placement_group_load));
 }
 
diff --git a/src/ray/gcs/gcs_server/gcs_placement_group_manager.h b/src/ray/gcs/gcs_server/gcs_placement_group_manager.h
index b76f4bd00..bc3407fd8 100644
--- a/src/ray/gcs/gcs_server/gcs_placement_group_manager.h
+++ b/src/ray/gcs/gcs_server/gcs_placement_group_manager.h
@@ -207,7 +207,9 @@ class GcsPlacementGroupManager : public rpc::PlacementGroupInfoHandler {
   /// a placement_group creation task is infeasible.
   ///
   /// \param placement_group The placement_group whose creation task is infeasible.
-  void OnPlacementGroupCreationFailed(std::shared_ptr<GcsPlacementGroup> placement_group);
+  /// \param is_feasible Whether the scheduling can currently be retried or not.
+  void OnPlacementGroupCreationFailed(std::shared_ptr<GcsPlacementGroup> placement_group,
+                                      bool is_feasible = true);
 
   /// Handle placement_group creation task success. This should be called when the
   /// placement_group creation task has been scheduled successfully.
@@ -226,6 +228,12 @@ class GcsPlacementGroupManager : public rpc::PlacementGroupInfoHandler {
   /// \param node_id The specified node id.
   void OnNodeDead(const NodeID &node_id);
 
+  /// Handle a newly registered node. This will try to reschedule all the
+  /// infeasible placement groups.
+  ///
+  /// \param node_id The specified node id.
+  void OnNodeAdd(const NodeID &node_id);
+
   /// Clean placement group that belongs to the job id if necessary.
   ///
   /// This interface is a part of automatic lifecycle management for placement groups.
@@ -321,6 +329,9 @@ class GcsPlacementGroupManager : public rpc::PlacementGroupInfoHandler {
   /// `std::priority_queue`.
   std::deque<std::shared_ptr<GcsPlacementGroup>> pending_placement_groups_;
 
+  /// The infeasible placement_groups that can't be scheduled currently.
+  std::deque<std::shared_ptr<GcsPlacementGroup>> infeasible_placement_groups_;
+
   /// The scheduler to schedule all registered placement_groups.
  std::shared_ptr<GcsPlacementGroupSchedulerInterface> gcs_placement_group_scheduler_;
 
diff --git a/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.cc b/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.cc
index 7071f86fe..c2ca3c3c8 100644
--- a/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.cc
+++ b/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.cc
@@ -47,55 +47,58 @@ std::vector<ResourceSet> GcsScheduleStrategy::GetRequiredResourcesFromBundles(
   return required_resources;
 }
 
-ScheduleMap GcsScheduleStrategy::GenerateScheduleMap(
+ScheduleResult GcsScheduleStrategy::GenerateScheduleResult(
     const std::vector<std::shared_ptr<BundleSpecification>> &bundles,
-    const std::vector<NodeID> &selected_nodes) {
+    const std::vector<NodeID> &selected_nodes, const SchedulingResultStatus &status) {
   ScheduleMap schedule_map;
-  if (!selected_nodes.empty()) {
+  if (status == SUCCESS && !selected_nodes.empty()) {
     RAY_CHECK(bundles.size() == selected_nodes.size());
     int index = 0;
     for (const auto &bundle : bundles) {
      schedule_map[bundle->BundleId()] = selected_nodes[index++];
     }
   }
-  return schedule_map;
+  return std::make_pair(status, schedule_map);
 }
 
-ScheduleMap GcsStrictPackStrategy::Schedule(
-    std::vector<std::shared_ptr<BundleSpecification>> &bundles,
+ScheduleResult GcsStrictPackStrategy::Schedule(
+    const std::vector<std::shared_ptr<BundleSpecification>> &bundles,
     const std::unique_ptr<ScheduleContext> &context,
     GcsResourceScheduler &gcs_resource_scheduler) {
   const auto &required_resources = GetRequiredResourcesFromBundles(bundles);
-  const auto &selected_nodes =
+  const auto &scheduling_result =
       gcs_resource_scheduler.Schedule(required_resources, SchedulingType::STRICT_PACK);
-  return GenerateScheduleMap(bundles, selected_nodes);
+  return GenerateScheduleResult(bundles, scheduling_result.second,
+                                scheduling_result.first);
 }
 
-ScheduleMap GcsPackStrategy::Schedule(
-    std::vector<std::shared_ptr<BundleSpecification>> &bundles,
+ScheduleResult GcsPackStrategy::Schedule(
+    const std::vector<std::shared_ptr<BundleSpecification>> &bundles,
     const std::unique_ptr<ScheduleContext> &context,
     GcsResourceScheduler &gcs_resource_scheduler) {
   // The current algorithm is to select a node and deploy as many bundles as possible.
   // First fill up a node. If the node resource is insufficient, select a new node.
   // TODO(ffbin): We will speed this up in next PR. Currently it is a double for loop.
   const auto &required_resources = GetRequiredResourcesFromBundles(bundles);
-  const auto &selected_nodes =
+  const auto &scheduling_result =
       gcs_resource_scheduler.Schedule(required_resources, SchedulingType::PACK);
-  return GenerateScheduleMap(bundles, selected_nodes);
+  return GenerateScheduleResult(bundles, scheduling_result.second,
+                                scheduling_result.first);
 }
 
-ScheduleMap GcsSpreadStrategy::Schedule(
-    std::vector<std::shared_ptr<BundleSpecification>> &bundles,
+ScheduleResult GcsSpreadStrategy::Schedule(
+    const std::vector<std::shared_ptr<BundleSpecification>> &bundles,
     const std::unique_ptr<ScheduleContext> &context,
     GcsResourceScheduler &gcs_resource_scheduler) {
   const auto &required_resources = GetRequiredResourcesFromBundles(bundles);
-  const auto &selected_nodes =
+  const auto &scheduling_result =
       gcs_resource_scheduler.Schedule(required_resources, SchedulingType::SPREAD);
-  return GenerateScheduleMap(bundles, selected_nodes);
+  return GenerateScheduleResult(bundles, scheduling_result.second,
+                                scheduling_result.first);
 }
 
-ScheduleMap GcsStrictSpreadStrategy::Schedule(
-    std::vector<std::shared_ptr<BundleSpecification>> &bundles,
+ScheduleResult GcsStrictSpreadStrategy::Schedule(
+    const std::vector<std::shared_ptr<BundleSpecification>> &bundles,
     const std::unique_ptr<ScheduleContext> &context,
     GcsResourceScheduler &gcs_resource_scheduler) {
   // TODO(ffbin): A bundle may require special resources, such as GPU. We need to
@@ -112,18 +115,19 @@ ScheduleMap GcsStrictSpreadStrategy::Schedule(
   }
 
   const auto &required_resources = GetRequiredResourcesFromBundles(bundles);
-  const auto &selected_nodes = gcs_resource_scheduler.Schedule(
+  const auto &scheduling_result = gcs_resource_scheduler.Schedule(
       required_resources, SchedulingType::STRICT_SPREAD,
       /*node_filter_func=*/[&nodes_in_use](const NodeID &node_id) {
         return nodes_in_use.count(node_id) == 0;
       });
-  return GenerateScheduleMap(bundles, selected_nodes);
+  return GenerateScheduleResult(bundles, scheduling_result.second,
+                                scheduling_result.first);
 }
 
 void GcsPlacementGroupScheduler::ScheduleUnplacedBundles(
     std::shared_ptr<GcsPlacementGroup> placement_group,
-    std::function<void(std::shared_ptr<GcsPlacementGroup>)> failure_callback,
-    std::function<void(std::shared_ptr<GcsPlacementGroup>)> success_callback) {
+    PGSchedulingFailureCallback failure_callback,
+    PGSchedulingSuccessfulCallback success_callback) {
   // We need to ensure that the PrepareBundleResources won't be sent before the reply of
   // ReleaseUnusedBundles is returned.
   if (!nodes_of_releasing_unused_bundles_.empty()) {
     RAY_LOG(INFO) << "Failed to schedule placement group " << placement_group->GetName()
                   << ", id: " << placement_group->GetPlacementGroupID() << ", because "
                   << nodes_of_releasing_unused_bundles_.size()
                   << " nodes have not released unused bundles.";
-    failure_callback(placement_group);
+    failure_callback(placement_group, true);
     return;
   }
 
-  auto bundles = placement_group->GetUnplacedBundles();
-  auto strategy = placement_group->GetStrategy();
+  const auto &bundles = placement_group->GetUnplacedBundles();
+  const auto &strategy = placement_group->GetStrategy();
 
   RAY_LOG(DEBUG) << "Scheduling placement group " << placement_group->GetName()
                  << ", id: " << placement_group->GetPlacementGroupID()
                  << ", bundles size = " << bundles.size();
-  auto selected_nodes = scheduler_strategies_[strategy]->Schedule(
+  auto scheduling_result = scheduler_strategies_[strategy]->Schedule(
       bundles, GetScheduleContext(placement_group->GetPlacementGroupID()),
       gcs_resource_scheduler_);
 
-  // If no nodes are available, scheduling fails.
-  if (selected_nodes.empty()) {
+  auto result_status = scheduling_result.first;
+  auto selected_nodes = scheduling_result.second;
+
+  if (result_status != SUCCESS) {
     RAY_LOG(DEBUG) << "Failed to schedule placement group " << placement_group->GetName()
                    << ", id: " << placement_group->GetPlacementGroupID()
-                   << ", because no nodes are available.";
-    failure_callback(placement_group);
+                   << ", because the current resources cannot satisfy the required "
+                      "resources.";
+    const bool retryable = (result_status == FAILED);
+    failure_callback(placement_group, retryable);
     return;
   }
 
@@ -296,10 +303,8 @@ GcsPlacementGroupScheduler::GetLeaseClientFromNode(
 
 void GcsPlacementGroupScheduler::CommitAllBundles(
     const std::shared_ptr<LeaseStatusTracker> &lease_status_tracker,
-    const std::function<void(std::shared_ptr<GcsPlacementGroup>)>
-        &schedule_failure_handler,
-    const std::function<void(std::shared_ptr<GcsPlacementGroup>)>
-        &schedule_success_handler) {
+    const PGSchedulingFailureCallback &schedule_failure_handler,
+    const PGSchedulingSuccessfulCallback &schedule_success_handler) {
   const std::shared_ptr<BundleLocations> &prepared_bundle_locations =
       lease_status_tracker->GetPreparedBundleLocations();
   lease_status_tracker->MarkCommitPhaseStarted();
@@ -330,10 +335,8 @@ void GcsPlacementGroupScheduler::CommitAllBundles(
 
 void GcsPlacementGroupScheduler::OnAllBundlePrepareRequestReturned(
     const std::shared_ptr<LeaseStatusTracker> &lease_status_tracker,
-    const std::function<void(std::shared_ptr<GcsPlacementGroup>)>
-        &schedule_failure_handler,
-    const std::function<void(std::shared_ptr<GcsPlacementGroup>)>
-        &schedule_success_handler) {
+    const PGSchedulingFailureCallback &schedule_failure_handler,
+    const PGSchedulingSuccessfulCallback &schedule_success_handler) {
   RAY_CHECK(lease_status_tracker->AllPrepareRequestsReturned())
       << "This method can be called only after all bundle scheduling requests are "
         "returned.";
@@ -353,7 +356,7 @@ void GcsPlacementGroupScheduler::OnAllBundlePrepareRequestReturned(
     RAY_CHECK(it != placement_group_leasing_in_progress_.end());
     placement_group_leasing_in_progress_.erase(it);
     ReturnBundleResources(lease_status_tracker->GetBundleLocations());
-    schedule_failure_handler(placement_group);
+    schedule_failure_handler(placement_group, true);
     return;
   }
 
@@ -383,10 +386,8 @@ void GcsPlacementGroupScheduler::OnAllBundlePrepareRequestReturned(
 
 void GcsPlacementGroupScheduler::OnAllBundleCommitRequestReturned(
     const std::shared_ptr<LeaseStatusTracker> &lease_status_tracker,
-    const std::function<void(std::shared_ptr<GcsPlacementGroup>)>
-        &schedule_failure_handler,
-    const std::function<void(std::shared_ptr<GcsPlacementGroup>)>
-        &schedule_success_handler) {
+    const PGSchedulingFailureCallback &schedule_failure_handler,
+    const PGSchedulingSuccessfulCallback &schedule_success_handler) {
   const auto &placement_group = lease_status_tracker->GetPlacementGroup();
   const auto &prepared_bundle_locations =
       lease_status_tracker->GetPreparedBundleLocations();
@@ -410,7 +411,7 @@ void GcsPlacementGroupScheduler::OnAllBundleCommitRequestReturned(
   if (lease_status_tracker->GetLeasingState() == LeasingState::CANCELLED) {
     DestroyPlacementGroupCommittedBundleResources(placement_group_id);
     ReturnBundleResources(lease_status_tracker->GetBundleLocations());
-    schedule_failure_handler(placement_group);
+    schedule_failure_handler(placement_group, true);
     return;
   }
 
@@ -429,7 +430,7 @@ void GcsPlacementGroupScheduler::OnAllBundleCommitRequestReturned(
     }
     placement_group->UpdateState(rpc::PlacementGroupTableData::RESCHEDULING);
     ReturnBundleResources(uncommitted_bundle_locations);
-    schedule_failure_handler(placement_group);
+    schedule_failure_handler(placement_group, true);
   } else {
     schedule_success_handler(placement_group);
   }
diff --git a/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.h b/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.h
index 1e768a4f9..bdfee4276 100644
--- a/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.h
+++ b/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.h
@@ -31,9 +31,16 @@ namespace ray {
 namespace gcs {
 
+class GcsPlacementGroup;
+
 using ReserveResourceClientFactoryFn =
     std::function<std::shared_ptr<ResourceReserveInterface>(const rpc::Address &address)>;
 
+using PGSchedulingFailureCallback =
+    std::function<void(std::shared_ptr<GcsPlacementGroup>, bool)>;
+using PGSchedulingSuccessfulCallback =
+    std::function<void(std::shared_ptr<GcsPlacementGroup>)>;
+
 struct pair_hash {
   template <class T1, class T2>
   std::size_t operator()(const std::pair<T1, T2> &pair) const {
     return std::hash<T1>()(pair.first) ^ std::hash<T2>()(pair.second);
   }
 };
 using ScheduleMap = std::unordered_map<BundleID, NodeID, pair_hash>;
+using ScheduleResult = std::pair<SchedulingResultStatus, ScheduleMap>;
 using BundleLocations = absl::flat_hash_map<
     BundleID, std::pair<NodeID, std::shared_ptr<const BundleSpecification>>, pair_hash>;
 
-class GcsPlacementGroup;
-
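As background for the block above: std::unordered_map has no default std::hash for std::pair keys, which is why ScheduleMap passes pair_hash as its third template argument. A minimal standalone sketch of the same pattern, with a hypothetical BundleKey standing in for BundleID (not Ray code):

#include <cstdio>
#include <functional>
#include <string>
#include <unordered_map>
#include <utility>

// Same shape as pair_hash above: XOR-combine the hashes of both members.
struct PairHash {
  template <class T1, class T2>
  std::size_t operator()(const std::pair<T1, T2> &p) const {
    return std::hash<T1>()(p.first) ^ std::hash<T2>()(p.second);
  }
};

// Stand-in for BundleID: (placement group id, bundle index).
using BundleKey = std::pair<std::string, int>;

int main() {
  std::unordered_map<BundleKey, int, PairHash> schedule;  // bundle -> node id
  schedule[{"pg_a", 0}] = 1;
  schedule[{"pg_a", 1}] = 2;
  std::printf("(pg_a, 1) -> node %d\n", schedule.at({"pg_a", 1}));
  return 0;
}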
 class GcsPlacementGroupSchedulerInterface {
  public:
   /// Schedule unplaced bundles of the specified placement group.
   ///
   /// \param success_callback This function is called if the schedule is successful.
   virtual void ScheduleUnplacedBundles(
       std::shared_ptr<GcsPlacementGroup> placement_group,
-      std::function<void(std::shared_ptr<GcsPlacementGroup>)> failure_callback,
-      std::function<void(std::shared_ptr<GcsPlacementGroup>)> success_callback) = 0;
+      PGSchedulingFailureCallback failure_callback,
+      PGSchedulingSuccessfulCallback success_callback) = 0;
 
   /// Get bundles belong to the specified node.
   ///
@@ -105,8 +111,8 @@ class ScheduleContext {
 class GcsScheduleStrategy {
  public:
   virtual ~GcsScheduleStrategy() {}
-  virtual ScheduleMap Schedule(
-      std::vector<std::shared_ptr<BundleSpecification>> &bundles,
+  virtual ScheduleResult Schedule(
+      const std::vector<std::shared_ptr<BundleSpecification>> &bundles,
       const std::unique_ptr<ScheduleContext> &context,
       GcsResourceScheduler &gcs_resource_scheduler) = 0;
 
@@ -118,14 +124,15 @@ class GcsScheduleStrategy {
   std::vector<ResourceSet> GetRequiredResourcesFromBundles(
       const std::vector<std::shared_ptr<BundleSpecification>> &bundles);
 
-  /// Generate a `ScheduleMap` from bundles and nodes.
+  /// Generate a `ScheduleResult` from bundles and nodes.
   ///
   /// \param bundles Bundles to be scheduled.
   /// \param selected_nodes The nodes selected for the bundles.
-  /// \return Required resources.
-  ScheduleMap GenerateScheduleMap(
+  /// \param status Status of the scheduling result.
+  /// \return The scheduling result for the required resources.
+  ScheduleResult GenerateScheduleResult(
       const std::vector<std::shared_ptr<BundleSpecification>> &bundles,
-      const std::vector<NodeID> &selected_nodes);
+      const std::vector<NodeID> &selected_nodes, const SchedulingResultStatus &status);
 };
 
 /// The `GcsPackStrategy` packs all bundles into one node as much as possible.
@@ -133,26 +140,29 @@ class GcsScheduleStrategy {
 /// If one node does not have enough resources, we need to divide bundles to multiple
 /// nodes.
 class GcsPackStrategy : public GcsScheduleStrategy {
  public:
-  ScheduleMap Schedule(std::vector<std::shared_ptr<BundleSpecification>> &bundles,
-                       const std::unique_ptr<ScheduleContext> &context,
-                       GcsResourceScheduler &gcs_resource_scheduler) override;
+  ScheduleResult Schedule(
+      const std::vector<std::shared_ptr<BundleSpecification>> &bundles,
+      const std::unique_ptr<ScheduleContext> &context,
+      GcsResourceScheduler &gcs_resource_scheduler) override;
 };
 
 /// The `GcsSpreadStrategy` spreads all bundles across different nodes.
 class GcsSpreadStrategy : public GcsScheduleStrategy {
  public:
-  ScheduleMap Schedule(std::vector<std::shared_ptr<BundleSpecification>> &bundles,
-                       const std::unique_ptr<ScheduleContext> &context,
-                       GcsResourceScheduler &gcs_resource_scheduler) override;
+  ScheduleResult Schedule(
+      const std::vector<std::shared_ptr<BundleSpecification>> &bundles,
+      const std::unique_ptr<ScheduleContext> &context,
+      GcsResourceScheduler &gcs_resource_scheduler) override;
 };
 
 /// The `GcsStrictPackStrategy` requires that all bundles be scheduled to one node. If
 /// one node does not have enough resources, it will fail to schedule.
 class GcsStrictPackStrategy : public GcsScheduleStrategy {
  public:
-  ScheduleMap Schedule(std::vector<std::shared_ptr<BundleSpecification>> &bundles,
-                       const std::unique_ptr<ScheduleContext> &context,
-                       GcsResourceScheduler &gcs_resource_scheduler) override;
+  ScheduleResult Schedule(
+      const std::vector<std::shared_ptr<BundleSpecification>> &bundles,
+      const std::unique_ptr<ScheduleContext> &context,
+      GcsResourceScheduler &gcs_resource_scheduler) override;
 };
 
 /// The `GcsStrictSpreadStrategy` spreads all bundles across different nodes.
@@ -160,9 +170,10 @@ class GcsStrictPackStrategy : public GcsScheduleStrategy {
 /// If the node resource is insufficient, it will fail to schedule.
 class GcsStrictSpreadStrategy : public GcsScheduleStrategy {
  public:
-  ScheduleMap Schedule(std::vector<std::shared_ptr<BundleSpecification>> &bundles,
-                       const std::unique_ptr<ScheduleContext> &context,
-                       GcsResourceScheduler &gcs_resource_scheduler) override;
+  ScheduleResult Schedule(
+      const std::vector<std::shared_ptr<BundleSpecification>> &bundles,
+      const std::unique_ptr<ScheduleContext> &context,
+      GcsResourceScheduler &gcs_resource_scheduler) override;
 };
 
 enum class LeasingState {
@@ -407,10 +418,9 @@ class GcsPlacementGroupScheduler : public GcsPlacementGroupSchedulerInterface {
   /// \param placement_group The placement group to be scheduled.
   /// \param failure_callback This function is called if the schedule fails.
   /// \param success_callback This function is called if the schedule is successful.
-  void ScheduleUnplacedBundles(
-      std::shared_ptr<GcsPlacementGroup> placement_group,
-      std::function<void(std::shared_ptr<GcsPlacementGroup>)> failure_handler,
-      std::function<void(std::shared_ptr<GcsPlacementGroup>)> success_handler) override;
+  void ScheduleUnplacedBundles(std::shared_ptr<GcsPlacementGroup> placement_group,
+                               PGSchedulingFailureCallback failure_handler,
+                               PGSchedulingSuccessfulCallback success_handler) override;
 
   /// Destroy the actual bundle resources or locked resources (for 2PC)
   /// on all nodes associated with this placement group.
@@ -485,25 +495,19 @@ class GcsPlacementGroupScheduler : public GcsPlacementGroupSchedulerInterface {
   /// Called when all prepare requests are returned from nodes.
   void OnAllBundlePrepareRequestReturned(
       const std::shared_ptr<LeaseStatusTracker> &lease_status_tracker,
-      const std::function<void(std::shared_ptr<GcsPlacementGroup>)>
-          &schedule_failure_handler,
-      const std::function<void(std::shared_ptr<GcsPlacementGroup>)>
-          &schedule_success_handler);
+      const PGSchedulingFailureCallback &schedule_failure_handler,
+      const PGSchedulingSuccessfulCallback &schedule_success_handler);
 
   /// Called when all commit requests are returned from nodes.
   void OnAllBundleCommitRequestReturned(
       const std::shared_ptr<LeaseStatusTracker> &lease_status_tracker,
-      const std::function<void(std::shared_ptr<GcsPlacementGroup>)>
-          &schedule_failure_handler,
-      const std::function<void(std::shared_ptr<GcsPlacementGroup>)>
-          &schedule_success_handler);
+      const PGSchedulingFailureCallback &schedule_failure_handler,
+      const PGSchedulingSuccessfulCallback &schedule_success_handler);
 
   /// Commit all bundles recorded in lease status tracker.
   void CommitAllBundles(const std::shared_ptr<LeaseStatusTracker> &lease_status_tracker,
-                        const std::function<void(std::shared_ptr<GcsPlacementGroup>)>
-                            &schedule_failure_handler,
-                        const std::function<void(std::shared_ptr<GcsPlacementGroup>)>
-                            &schedule_success_handler);
+                        const PGSchedulingFailureCallback &schedule_failure_handler,
+                        const PGSchedulingSuccessfulCallback &schedule_success_handler);
 
   /// Destroy the prepared bundle resources with this placement group.
  /// The method is idempotent, meaning if all bundles are already cancelled,
diff --git a/src/ray/gcs/gcs_server/gcs_resource_scheduler.cc b/src/ray/gcs/gcs_server/gcs_resource_scheduler.cc
index 2c1d49883..ea211b292 100644
--- a/src/ray/gcs/gcs_server/gcs_resource_scheduler.cc
+++ b/src/ray/gcs/gcs_server/gcs_resource_scheduler.cc
@@ -54,7 +54,7 @@ double LeastResourceScorer::Calculate(const FixedPoint &requested,
 
 /////////////////////////////////////////////////////////////////////////////////////////
 
-std::vector<NodeID> GcsResourceScheduler::Schedule(
+SchedulingResult GcsResourceScheduler::Schedule(
     const std::vector<ResourceSet> &required_resources_list,
     const SchedulingType &scheduling_type,
     const std::function<bool(const NodeID &)> &node_filter_func) {
@@ -65,7 +65,7 @@
       FilterCandidateNodes(cluster_resources, node_filter_func);
   if (candidate_nodes.empty()) {
     RAY_LOG(DEBUG) << "The candidate nodes are empty, return directly.";
-    return {};
+    return std::make_pair(SchedulingResultStatus::INFEASIBLE, std::vector<NodeID>());
   }
 
   // First schedule scarce resources (such as GPU) and large capacity resources to
   // improve the scheduling success rate.
   const auto &to_schedule_resources = SortRequiredResources(required_resources_list);
 
   // Score and rank nodes.
-  std::vector<NodeID> result;
+  SchedulingResult result;
   switch (scheduling_type) {
   case SPREAD:
     result = SpreadSchedule(to_schedule_resources, candidate_nodes);
@@ -115,18 +115,18 @@ const std::vector<ResourceSet> &GcsResourceScheduler::SortRequiredResources(
   return required_resources;
 }
 
-std::vector<NodeID> GcsResourceScheduler::StrictSpreadSchedule(
+SchedulingResult GcsResourceScheduler::StrictSpreadSchedule(
     const std::vector<ResourceSet> &required_resources_list,
     const absl::flat_hash_set<NodeID> &candidate_nodes) {
-  std::vector<NodeID> result;
   if (required_resources_list.size() > candidate_nodes.size()) {
     RAY_LOG(DEBUG) << "The number of required resources "
                    << required_resources_list.size()
                    << " is greater than the number of candidate nodes "
                    << candidate_nodes.size() << ", scheduling fails.";
-    return result;
+    return std::make_pair(SchedulingResultStatus::INFEASIBLE, std::vector<NodeID>());
   }
 
+  std::vector<NodeID> result_nodes;
   absl::flat_hash_set<NodeID> candidate_nodes_copy(candidate_nodes);
   for (const auto &iter : required_resources_list) {
     // Score and sort nodes.
     const auto &node_scores = ScoreNodes(iter, candidate_nodes_copy);
 
     // There are nodes to meet the scheduling requirements.
     if (!node_scores.empty() && node_scores.front().second >= 0) {
       const auto &highest_score_node_id = node_scores.front().first;
-      result.push_back(highest_score_node_id);
+      result_nodes.push_back(highest_score_node_id);
       candidate_nodes_copy.erase(highest_score_node_id);
     } else {
       // There is no node to meet the scheduling requirements.
       break;
     }
   }
 
-  if (result.size() != required_resources_list.size()) {
-    // Unable to meet the resources required for scheduling, scheduling failed.
-    result.clear();
+  if (result_nodes.size() != required_resources_list.size()) {
+    // Can't meet the scheduling requirements at the moment.
+    return std::make_pair(SchedulingResultStatus::FAILED, std::vector<NodeID>());
   }
-  return result;
+  return std::make_pair(SchedulingResultStatus::SUCCESS, result_nodes);
 }
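Note the asymmetry encoded in StrictSpreadSchedule above: requesting more bundles than there are candidate nodes can never succeed, so it returns INFEASIBLE, while a post-scoring size mismatch only means the cluster is busy right now, so it returns FAILED and stays retryable. A standalone toy sketch of that distinction, using hypothetical types rather than Ray's (not Ray code):

#include <cstdio>
#include <utility>
#include <vector>

enum Status { FAILED = 0, INFEASIBLE = 1, SUCCESS = 2 };
using Result = std::pair<Status, std::vector<int>>;  // (status, node ids)

// One bundle per distinct node; each node has `available` free slots (0 or 1).
Result StrictSpread(int bundles, const std::vector<int> &available) {
  if (bundles > static_cast<int>(available.size())) {
    return {INFEASIBLE, {}};  // can never fit, no matter what frees up later
  }
  std::vector<int> nodes;
  for (int i = 0; i < static_cast<int>(available.size()) &&
                  static_cast<int>(nodes.size()) < bundles;
       ++i) {
    if (available[i] > 0) nodes.push_back(i);
  }
  if (static_cast<int>(nodes.size()) != bundles) {
    return {FAILED, {}};  // busy now; retry when resources free up
  }
  return {SUCCESS, nodes};
}

int main() {
  std::printf("%d\n", StrictSpread(3, {1, 1}).first);  // INFEASIBLE (1)
  std::printf("%d\n", StrictSpread(2, {1, 0}).first);  // FAILED (0)
  std::printf("%d\n", StrictSpread(2, {1, 1}).first);  // SUCCESS (2)
  return 0;
}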
-std::vector<NodeID> GcsResourceScheduler::SpreadSchedule(
+SchedulingResult GcsResourceScheduler::SpreadSchedule(
     const std::vector<ResourceSet> &required_resources_list,
     const absl::flat_hash_set<NodeID> &candidate_nodes) {
-  std::vector<NodeID> result;
+  std::vector<NodeID> result_nodes;
   absl::flat_hash_set<NodeID> candidate_nodes_copy(candidate_nodes);
   absl::flat_hash_set<NodeID> selected_nodes;
   for (const auto &iter : required_resources_list) {
     // Score and sort nodes.
     const auto &node_scores = ScoreNodes(iter, candidate_nodes_copy);
 
     // There are nodes to meet the scheduling requirements.
     if (!node_scores.empty() && node_scores.front().second >= 0) {
       const auto &highest_score_node_id = node_scores.front().first;
-      result.push_back(highest_score_node_id);
+      result_nodes.push_back(highest_score_node_id);
       RAY_CHECK(gcs_resource_manager_.AcquireResources(highest_score_node_id, iter));
       candidate_nodes_copy.erase(highest_score_node_id);
       selected_nodes.insert(highest_score_node_id);
     } else {
       // Scheduling from selected nodes.
       const auto &node_scores = ScoreNodes(iter, selected_nodes);
       if (!node_scores.empty() && node_scores.front().second >= 0) {
         const auto &highest_score_node_id = node_scores.front().first;
-        result.push_back(highest_score_node_id);
+        result_nodes.push_back(highest_score_node_id);
         RAY_CHECK(gcs_resource_manager_.AcquireResources(highest_score_node_id, iter));
       } else {
         break;
       }
     }
   }
 
   // Releasing the resources temporarily deducted from `gcs_resource_manager_`.
-  ReleaseTemporarilyDeductedResources(required_resources_list, result);
+  ReleaseTemporarilyDeductedResources(required_resources_list, result_nodes);
 
-  if (result.size() != required_resources_list.size()) {
-    // Unable to meet the resources required for scheduling, scheduling failed.
-    result.clear();
+  if (result_nodes.size() != required_resources_list.size()) {
+    // Can't meet the scheduling requirements at the moment.
+    return std::make_pair(SchedulingResultStatus::FAILED, std::vector<NodeID>());
   }
-  return result;
+  return std::make_pair(SchedulingResultStatus::SUCCESS, result_nodes);
 }
 
-std::vector<NodeID> GcsResourceScheduler::StrictPackSchedule(
+SchedulingResult GcsResourceScheduler::StrictPackSchedule(
     const std::vector<ResourceSet> &required_resources_list,
     const absl::flat_hash_set<NodeID> &candidate_nodes) {
-  std::vector<NodeID> result;
-
   // Aggregate required resources.
   ResourceSet required_resources;
   for (const auto &iter : required_resources_list) {
     required_resources.AddResources(iter);
   }
 
+  const auto &cluster_resource = gcs_resource_manager_.GetClusterResources();
+
+  const auto &right_node_it = std::find_if(
+      cluster_resource.begin(), cluster_resource.end(),
+      [required_resources](const auto &node_resource) {
+        return required_resources.IsSubset(node_resource.second.GetTotalResources());
+      });
+
+  if (right_node_it == cluster_resource.end()) {
+    RAY_LOG(DEBUG) << "The required resources exceed the total resources of every "
+                      "node in the cluster, so scheduling is infeasible.";
+    return std::make_pair(SchedulingResultStatus::INFEASIBLE, std::vector<NodeID>());
+  }
+
+  std::vector<NodeID> result_nodes;
 
   // Score and sort nodes.
   const auto &node_scores = ScoreNodes(required_resources, candidate_nodes);
 
@@ -209,17 +222,22 @@ std::vector<NodeID> GcsResourceScheduler::StrictPackSchedule(
   // only schedules to a node and triggers rescheduling when node dead.
   if (!node_scores.empty() && node_scores.front().second >= 0) {
     for (int index = 0; index < (int)required_resources_list.size(); ++index) {
-      result.push_back(node_scores.front().first);
+      result_nodes.push_back(node_scores.front().first);
     }
   }
 
-  return result;
+  if (result_nodes.empty()) {
+    // Can't meet the scheduling requirements at the moment.
+    return std::make_pair(SchedulingResultStatus::FAILED, std::vector<NodeID>());
+  }
+
+  return std::make_pair(SchedulingResultStatus::SUCCESS, result_nodes);
 }
 
-std::vector<NodeID> GcsResourceScheduler::PackSchedule(
+SchedulingResult GcsResourceScheduler::PackSchedule(
     const std::vector<ResourceSet> &required_resources_list,
     const absl::flat_hash_set<NodeID> &candidate_nodes) {
-  std::vector<NodeID> result;
-  result.resize(required_resources_list.size());
+  std::vector<NodeID> result_nodes;
+  result_nodes.resize(required_resources_list.size());
   absl::flat_hash_set<NodeID> candidate_nodes_copy(candidate_nodes);
   std::list<std::pair<int, ResourceSet>> required_resources_list_copy;
   int index = 0;
@@ -240,14 +258,14 @@ SchedulingResult GcsResourceScheduler::PackSchedule(
     const auto &highest_score_node_id = node_scores.front().first;
     RAY_CHECK(gcs_resource_manager_.AcquireResources(highest_score_node_id,
                                                      required_resources));
-    result[required_resources_index] = highest_score_node_id;
+    result_nodes[required_resources_index] = highest_score_node_id;
     required_resources_list_copy.pop_front();
 
     // We try to schedule more resources on one node.
     for (auto iter = required_resources_list_copy.begin();
          iter != required_resources_list_copy.end();) {
       if (gcs_resource_manager_.AcquireResources(highest_score_node_id, iter->second)) {
-        result[iter->first] = highest_score_node_id;
+        result_nodes[iter->first] = highest_score_node_id;
         required_resources_list_copy.erase(iter++);
       } else {
         ++iter;
@@ -257,13 +275,13 @@ SchedulingResult GcsResourceScheduler::PackSchedule(
   }
 
   // Releasing the resources temporarily deducted from `gcs_resource_manager_`.
-  ReleaseTemporarilyDeductedResources(required_resources_list, result);
+  ReleaseTemporarilyDeductedResources(required_resources_list, result_nodes);
 
   if (!required_resources_list_copy.empty()) {
-    // Unable to meet the resources required for scheduling, scheduling failed.
-    result.clear();
+    // Can't meet the scheduling requirements at the moment.
+    return std::make_pair(SchedulingResultStatus::FAILED, std::vector<NodeID>());
   }
-  return result;
+  return std::make_pair(SchedulingResultStatus::SUCCESS, result_nodes);
 }
 
 std::list<NodeScore> GcsResourceScheduler::ScoreNodes(
diff --git a/src/ray/gcs/gcs_server/gcs_resource_scheduler.h b/src/ray/gcs/gcs_server/gcs_resource_scheduler.h
index 4d8fd264c..4d78fa1d6 100644
--- a/src/ray/gcs/gcs_server/gcs_resource_scheduler.h
+++ b/src/ray/gcs/gcs_server/gcs_resource_scheduler.h
@@ -35,7 +35,18 @@ enum SchedulingType {
   SchedulingType_MAX = 4,
 };
 
+// Status of a resource scheduling result.
+enum SchedulingResultStatus {
+  // Scheduling failed but is retryable.
+  FAILED = 0,
+  // Scheduling failed and is non-retryable.
+  INFEASIBLE = 1,
+  // Scheduling succeeded.
+  SUCCESS = 2,
+};
+
 typedef std::pair<NodeID, double> NodeScore;
+typedef std::pair<SchedulingResultStatus, std::vector<NodeID>> SchedulingResult;
 
 /// NodeScorer is a scorer that grades nodes, which is used for scheduling decisions.
@@ -86,9 +97,10 @@ class GcsResourceScheduler {
   /// \param node_filter_func This function is used to filter candidate nodes. If a node
   /// returns true, it can be used for scheduling. By default, all nodes in the cluster
   /// can be used for scheduling.
-  /// \return Scheduling selected nodes, it corresponds to `required_resources_list` one
-  /// by one. If the scheduling fails, an empty vector is returned.
-  std::vector<NodeID> Schedule(
+  /// \return `SchedulingResult`, which contains the selected nodes if scheduling
+  /// succeeded; otherwise it contains an empty vector and a status flag indicating
+  /// whether the request can be retried.
+  SchedulingResult Schedule(
       const std::vector<ResourceSet> &required_resources_list,
       const SchedulingType &scheduling_type,
       const std::function<bool(const NodeID &)> &node_filter_func = nullptr);
 
@@ -118,9 +130,10 @@ class GcsResourceScheduler {
   ///
   /// \param required_resources_list The resources to be scheduled.
   /// \param candidate_nodes The nodes that can be used for scheduling.
-  /// \return Scheduling selected nodes, it corresponds to `required_resources_list` one
-  /// by one. If the scheduling fails, an empty vector is returned.
-  std::vector<NodeID> StrictSpreadSchedule(
+  /// \return `SchedulingResult`, which contains the selected nodes if scheduling
+  /// succeeded; otherwise it contains an empty vector and a status flag indicating
+  /// whether the request can be retried.
+  SchedulingResult StrictSpreadSchedule(
       const std::vector<ResourceSet> &required_resources_list,
       const absl::flat_hash_set<NodeID> &candidate_nodes);
 
   /// Schedule resources according to `SPREAD` strategy.
   ///
   /// \param required_resources_list The resources to be scheduled.
   /// \param candidate_nodes The nodes that can be used for scheduling.
-  /// \return Scheduling selected nodes, it corresponds to `required_resources_list` one
-  /// by one. If the scheduling fails, an empty vector is returned.
-  std::vector<NodeID> SpreadSchedule(
-      const std::vector<ResourceSet> &required_resources_list,
-      const absl::flat_hash_set<NodeID> &candidate_nodes);
+  /// \return `SchedulingResult`, which contains the selected nodes if scheduling
+  /// succeeded; otherwise it contains an empty vector and a status flag indicating
+  /// whether the request can be retried.
+  SchedulingResult SpreadSchedule(const std::vector<ResourceSet> &required_resources_list,
+                                  const absl::flat_hash_set<NodeID> &candidate_nodes);
 
@@ -128,19 +141,20 @@ class GcsResourceScheduler {
   /// Schedule resources according to `STRICT_PACK` strategy.
   ///
   /// \param required_resources_list The resources to be scheduled.
   /// \param candidate_nodes The nodes that can be used for scheduling.
-  /// \return Scheduling selected nodes, it corresponds to `required_resources_list` one
-  /// by one. If the scheduling fails, an empty vector is returned.
-  std::vector<NodeID> StrictPackSchedule(
+  /// \return `SchedulingResult`, which contains the selected nodes if scheduling
+  /// succeeded; otherwise it contains an empty vector and a status flag indicating
+  /// whether the request can be retried.
+  SchedulingResult StrictPackSchedule(
       const std::vector<ResourceSet> &required_resources_list,
       const absl::flat_hash_set<NodeID> &candidate_nodes);
 
@@ -148,11 +162,11 @@ class GcsResourceScheduler {
   /// Schedule resources according to `PACK` strategy.
   ///
   /// \param required_resources_list The resources to be scheduled.
   /// \param candidate_nodes The nodes that can be used for scheduling.
-  /// \return Scheduling selected nodes, it corresponds to `required_resources_list` one
-  /// by one. If the scheduling fails, an empty vector is returned.
-  std::vector<NodeID> PackSchedule(
-      const std::vector<ResourceSet> &required_resources_list,
-      const absl::flat_hash_set<NodeID> &candidate_nodes);
+  /// \return `SchedulingResult`, which contains the selected nodes if scheduling
+  /// succeeded; otherwise it contains an empty vector and a status flag indicating
+  /// whether the request can be retried.
+  SchedulingResult PackSchedule(const std::vector<ResourceSet> &required_resources_list,
+                                const absl::flat_hash_set<NodeID> &candidate_nodes);
 
   /// Score all nodes according to the specified resources.
   ///
diff --git a/src/ray/gcs/gcs_server/gcs_server.cc b/src/ray/gcs/gcs_server/gcs_server.cc
index 72749cd0a..9e0e1d6f3 100644
--- a/src/ray/gcs/gcs_server/gcs_server.cc
+++ b/src/ray/gcs/gcs_server/gcs_server.cc
@@ -408,7 +408,7 @@ void GcsServer::InstallEventListeners() {
     // Because a new node has been added, we need to try to schedule the pending
     // placement groups and the pending actors.
     gcs_resource_manager_->OnNodeAdd(*node);
-    gcs_placement_group_manager_->SchedulePendingPlacementGroups();
+    gcs_placement_group_manager_->OnNodeAdd(NodeID::FromBinary(node->node_id()));
     gcs_actor_manager_->SchedulePendingActors();
     gcs_heartbeat_manager_->AddNode(NodeID::FromBinary(node->node_id()));
     if (config_.pull_based_resource_reporting) {
diff --git a/src/ray/gcs/gcs_server/test/gcs_placement_group_manager_test.cc b/src/ray/gcs/gcs_server/test/gcs_placement_group_manager_test.cc
index d5b421b11..c5c2ca2c5 100644
--- a/src/ray/gcs/gcs_server/test/gcs_placement_group_manager_test.cc
+++ b/src/ray/gcs/gcs_server/test/gcs_placement_group_manager_test.cc
@@ -30,7 +30,7 @@ class MockPlacementGroupScheduler : public gcs::GcsPlacementGroupSchedulerInterf
 
   void ScheduleUnplacedBundles(
       std::shared_ptr<gcs::GcsPlacementGroup> placement_group,
-      std::function<void(std::shared_ptr<gcs::GcsPlacementGroup>)> failure_handler,
+      std::function<void(std::shared_ptr<gcs::GcsPlacementGroup>, bool)> failure_handler,
       std::function<void(std::shared_ptr<gcs::GcsPlacementGroup>)> success_handler)
       override {
     absl::MutexLock lock(&mutex_);
@@ -510,6 +510,39 @@ TEST_F(GcsPlacementGroupManagerTest,
   gcs_placement_group_manager_->CleanPlacementGroupIfNeededWhenJobDead(different_job_id);
 }
 
+TEST_F(GcsPlacementGroupManagerTest, TestSchedulingCanceledWhenPgIsInfeasible) {
+  auto request = Mocker::GenCreatePlacementGroupRequest();
+  std::atomic<int> registered_placement_group_count(0);
+  RegisterPlacementGroup(request,
+                         [&registered_placement_group_count](const Status &status) {
+                           ++registered_placement_group_count;
+                         });
+
+  ASSERT_EQ(registered_placement_group_count, 1);
+  WaitForExpectedPgCount(1);
+  auto placement_group = mock_placement_group_scheduler_->placement_groups_.back();
+  mock_placement_group_scheduler_->placement_groups_.clear();
+
+  // Mark it non-retryable.
+  gcs_placement_group_manager_->OnPlacementGroupCreationFailed(placement_group, false);
+
+  // Schedule twice to make sure it will not be scheduled afterward.
+  gcs_placement_group_manager_->SchedulePendingPlacementGroups();
+  ASSERT_EQ(mock_placement_group_scheduler_->placement_groups_.size(), 0);
+  gcs_placement_group_manager_->SchedulePendingPlacementGroups();
+  ASSERT_EQ(mock_placement_group_scheduler_->placement_groups_.size(), 0);
+
+  // Add a node and make sure it will reschedule the infeasible placement group.
+  const auto &node_id = NodeID::FromRandom();
+  gcs_placement_group_manager_->OnNodeAdd(node_id);
+
+  ASSERT_EQ(mock_placement_group_scheduler_->placement_groups_.size(), 1);
+  mock_placement_group_scheduler_->placement_groups_.clear();
+
+  OnPlacementGroupCreationSuccess(placement_group);
+  ASSERT_EQ(placement_group->GetState(), rpc::PlacementGroupTableData::CREATED);
+}
+
 TEST_F(GcsPlacementGroupManagerTest, TestRayNamespace) {
   auto request1 = Mocker::GenCreatePlacementGroupRequest("test_name");
   job_namespace_table_[JobID::FromInt(11)] = "another_namespace";
diff --git a/src/ray/gcs/gcs_server/test/gcs_placement_group_scheduler_test.cc b/src/ray/gcs/gcs_server/test/gcs_placement_group_scheduler_test.cc
index 83f62c4ee..ec4c7d781 100644
--- a/src/ray/gcs/gcs_server/test/gcs_placement_group_scheduler_test.cc
+++ b/src/ray/gcs/gcs_server/test/gcs_placement_group_scheduler_test.cc
@@ -107,8 +107,7 @@ class GcsPlacementGroupSchedulerTest : public ::testing::Test {
     const auto &node_id = NodeID::FromBinary(node->node_id());
     std::unordered_map<std::string, double> resource_map;
     resource_map["CPU"] = cpu_num;
-    ResourceSet resources(resource_map);
-    gcs_resource_manager_->SetAvailableResources(node_id, resources);
+    gcs_resource_manager_->UpdateResourceCapacity(node_id, resource_map);
   }
 
   void ScheduleFailedWithZeroNodeTest(rpc::PlacementStrategy strategy) {
@@ -119,7 +118,8 @@
     // Schedule the placement_group with zero node.
     scheduler_->ScheduleUnplacedBundles(
         placement_group,
-        [this](std::shared_ptr<gcs::GcsPlacementGroup> placement_group) {
+        [this](std::shared_ptr<gcs::GcsPlacementGroup> placement_group,
+               bool is_feasible) {
          absl::MutexLock lock(&placement_group_requests_mutex_);
          failure_placement_groups_.emplace_back(std::move(placement_group));
         },
@@ -148,7 +148,8 @@
     // send to the node.
     scheduler_->ScheduleUnplacedBundles(
         placement_group,
-        [this](std::shared_ptr<gcs::GcsPlacementGroup> placement_group) {
+        [this](std::shared_ptr<gcs::GcsPlacementGroup> placement_group,
+               bool is_feasible) {
          absl::MutexLock lock(&placement_group_requests_mutex_);
          failure_placement_groups_.emplace_back(std::move(placement_group));
         },
@@ -171,11 +172,11 @@
   void ReschedulingWhenNodeAddTest(rpc::PlacementStrategy strategy) {
     AddNode(Mocker::GenNodeInfo(0), 1);
-    auto failure_handler =
-        [this](std::shared_ptr<gcs::GcsPlacementGroup> placement_group) {
-          absl::MutexLock lock(&placement_group_requests_mutex_);
-          failure_placement_groups_.emplace_back(std::move(placement_group));
-        };
+    auto failure_handler = [this](std::shared_ptr<gcs::GcsPlacementGroup> placement_group,
+                                  bool is_feasible) {
+      absl::MutexLock lock(&placement_group_requests_mutex_);
+      failure_placement_groups_.emplace_back(std::move(placement_group));
+    };
     auto success_handler =
         [this](std::shared_ptr<gcs::GcsPlacementGroup> placement_group) {
           absl::MutexLock lock(&placement_group_requests_mutex_);
@@ -264,7 +265,8 @@ TEST_F(GcsPlacementGroupSchedulerTest, TestSchedulePlacementGroupReplyFailure) {
   // send to the node.
   scheduler_->ScheduleUnplacedBundles(
       placement_group,
-      [this](std::shared_ptr<gcs::GcsPlacementGroup> placement_group) {
+      [this](std::shared_ptr<gcs::GcsPlacementGroup> placement_group,
+             bool is_feasible) {
        absl::MutexLock lock(&placement_group_requests_mutex_);
        failure_placement_groups_.emplace_back(std::move(placement_group));
       },
@@ -288,7 +290,8 @@ TEST_F(GcsPlacementGroupSchedulerTest, TestSchedulePlacementGroupReplyFailure) {
 TEST_F(GcsPlacementGroupSchedulerTest, TestSpreadStrategyResourceCheck) {
   auto node = Mocker::GenNodeInfo(0);
   AddNode(node, 2);
-  auto failure_handler = [this](std::shared_ptr<gcs::GcsPlacementGroup> placement_group) {
+  auto failure_handler = [this](std::shared_ptr<gcs::GcsPlacementGroup> placement_group,
+                                bool is_feasible) {
     absl::MutexLock lock(&placement_group_requests_mutex_);
     failure_placement_groups_.emplace_back(std::move(placement_group));
   };
@@ -322,7 +325,8 @@ TEST_F(GcsPlacementGroupSchedulerTest, TestSchedulePlacementGroupReturnResource)
   // send to the node.
   scheduler_->ScheduleUnplacedBundles(
       placement_group,
-      [this](std::shared_ptr<gcs::GcsPlacementGroup> placement_group) {
+      [this](std::shared_ptr<gcs::GcsPlacementGroup> placement_group,
+             bool is_feasible) {
        absl::MutexLock lock(&placement_group_requests_mutex_);
        failure_placement_groups_.emplace_back(std::move(placement_group));
       },
@@ -347,7 +351,8 @@ TEST_F(GcsPlacementGroupSchedulerTest, TestSchedulePlacementGroupReturnResource)
 TEST_F(GcsPlacementGroupSchedulerTest, TestStrictPackStrategyBalancedScheduling) {
   AddNode(Mocker::GenNodeInfo(0));
   AddNode(Mocker::GenNodeInfo(1));
-  auto failure_handler = [this](std::shared_ptr<gcs::GcsPlacementGroup> placement_group) {
+  auto failure_handler = [this](std::shared_ptr<gcs::GcsPlacementGroup> placement_group,
+                                bool is_feasible) {
     absl::MutexLock lock(&placement_group_requests_mutex_);
     failure_placement_groups_.emplace_back(std::move(placement_group));
   };
@@ -394,7 +399,8 @@ TEST_F(GcsPlacementGroupSchedulerTest, TestStrictPackStrategyReschedulingWhenNod
 TEST_F(GcsPlacementGroupSchedulerTest, TestStrictPackStrategyResourceCheck) {
   auto node0 = Mocker::GenNodeInfo(0);
   AddNode(node0);
-  auto failure_handler = [this](std::shared_ptr<gcs::GcsPlacementGroup> placement_group) {
+  auto failure_handler = [this](std::shared_ptr<gcs::GcsPlacementGroup> placement_group,
+                                bool is_feasible) {
    absl::MutexLock lock(&placement_group_requests_mutex_);
    failure_placement_groups_.emplace_back(std::move(placement_group));
   };
@@ -443,7 +449,8 @@ TEST_F(GcsPlacementGroupSchedulerTest, DestroyPlacementGroup) {
   // send to the node.
   scheduler_->ScheduleUnplacedBundles(
       placement_group,
-      [this](std::shared_ptr<gcs::GcsPlacementGroup> placement_group) {
+      [this](std::shared_ptr<gcs::GcsPlacementGroup> placement_group,
+             bool is_feasible) {
        absl::MutexLock lock(&placement_group_requests_mutex_);
        failure_placement_groups_.emplace_back(std::move(placement_group));
       },
@@ -482,7 +489,8 @@ TEST_F(GcsPlacementGroupSchedulerTest, DestroyCancelledPlacementGroup) {
   // send to the node.
   scheduler_->ScheduleUnplacedBundles(
       placement_group,
-      [this](std::shared_ptr<gcs::GcsPlacementGroup> placement_group) {
+      [this](std::shared_ptr<gcs::GcsPlacementGroup> placement_group,
+             bool is_feasible) {
        absl::MutexLock lock(&placement_group_requests_mutex_);
        failure_placement_groups_.emplace_back(std::move(placement_group));
       },
@@ -514,7 +522,8 @@ TEST_F(GcsPlacementGroupSchedulerTest, PlacementGroupCancelledDuringCommit) {
   // send to the node.
   scheduler_->ScheduleUnplacedBundles(
       placement_group,
-      [this](std::shared_ptr<gcs::GcsPlacementGroup> placement_group) {
+      [this](std::shared_ptr<gcs::GcsPlacementGroup> placement_group,
+             bool is_feasible) {
        absl::MutexLock lock(&placement_group_requests_mutex_);
        failure_placement_groups_.emplace_back(std::move(placement_group));
       },
@@ -542,7 +551,8 @@ TEST_F(GcsPlacementGroupSchedulerTest, TestPackStrategyReschedulingWhenNodeAdd)
 TEST_F(GcsPlacementGroupSchedulerTest, TestPackStrategyLargeBundlesScheduling) {
   AddNode(Mocker::GenNodeInfo(0));
   AddNode(Mocker::GenNodeInfo(1));
-  auto failure_handler = [this](std::shared_ptr<gcs::GcsPlacementGroup> placement_group) {
+  auto failure_handler = [this](std::shared_ptr<gcs::GcsPlacementGroup> placement_group,
+                                bool is_feasible) {
     absl::MutexLock lock(&placement_group_requests_mutex_);
     failure_placement_groups_.emplace_back(std::move(placement_group));
   };
@@ -593,7 +603,8 @@ TEST_F(GcsPlacementGroupSchedulerTest, TestStrictSpreadRescheduleWhenNodeDead) {
       std::make_shared<gcs::GcsPlacementGroup>(create_placement_group_request, "");
 
   // Schedule the placement group successfully.
-  auto failure_handler = [this](std::shared_ptr<gcs::GcsPlacementGroup> placement_group) {
+  auto failure_handler = [this](std::shared_ptr<gcs::GcsPlacementGroup> placement_group,
+                                bool is_feasible) {
    absl::MutexLock lock(&placement_group_requests_mutex_);
    failure_placement_groups_.emplace_back(std::move(placement_group));
   };
@@ -659,7 +670,8 @@ TEST_F(GcsPlacementGroupSchedulerTest, TestStrictSpreadRescheduleWhenNodeDead) {
 TEST_F(GcsPlacementGroupSchedulerTest, TestStrictSpreadStrategyResourceCheck) {
   auto node0 = Mocker::GenNodeInfo(0);
   AddNode(node0);
-  auto failure_handler = [this](std::shared_ptr<gcs::GcsPlacementGroup> placement_group) {
+  auto failure_handler = [this](std::shared_ptr<gcs::GcsPlacementGroup> placement_group,
+                                bool is_feasible) {
    absl::MutexLock lock(&placement_group_requests_mutex_);
    failure_placement_groups_.emplace_back(std::move(placement_group));
   };
@@ -783,7 +795,8 @@ TEST_F(GcsPlacementGroupSchedulerTest, TestNodeDeadDuringPreparingResources) {
 
   // Schedule the placement group.
   // One node is dead, so one bundle failed to schedule.
-  auto failure_handler = [this](std::shared_ptr<gcs::GcsPlacementGroup> placement_group) {
+  auto failure_handler = [this](std::shared_ptr<gcs::GcsPlacementGroup> placement_group,
+                                bool is_feasible) {
     absl::MutexLock lock(&placement_group_requests_mutex_);
     ASSERT_TRUE(placement_group->GetUnplacedBundles().size() == 2);
     failure_placement_groups_.emplace_back(std::move(placement_group));
   };
@@ -819,7 +832,8 @@ TEST_F(GcsPlacementGroupSchedulerTest,
 
   // Schedule the placement group.
   // One node is dead, so one bundle failed to schedule.
-  auto failure_handler = [this](std::shared_ptr<gcs::GcsPlacementGroup> placement_group) {
+  auto failure_handler = [this](std::shared_ptr<gcs::GcsPlacementGroup> placement_group,
+                                bool is_feasible) {
     absl::MutexLock lock(&placement_group_requests_mutex_);
    ASSERT_TRUE(placement_group->GetUnplacedBundles().size() == 1);
    failure_placement_groups_.emplace_back(std::move(placement_group));
  };
@@ -859,7 +873,8 @@ TEST_F(GcsPlacementGroupSchedulerTest, TestNodeDeadDuringCommittingResources) {
 
   // Schedule the placement group.
   // One node is dead, so one bundle failed to schedule.
-  auto failure_handler = [this](std::shared_ptr<gcs::GcsPlacementGroup> placement_group) {
+  auto failure_handler = [this](std::shared_ptr<gcs::GcsPlacementGroup> placement_group,
+                                bool is_feasible) {
     absl::MutexLock lock(&placement_group_requests_mutex_);
     ASSERT_TRUE(placement_group->GetUnplacedBundles().size() == 2);
     failure_placement_groups_.emplace_back(std::move(placement_group));
   };
@@ -893,7 +908,8 @@ TEST_F(GcsPlacementGroupSchedulerTest, TestNodeDeadDuringRescheduling) {
       std::make_shared<gcs::GcsPlacementGroup>(create_placement_group_request, "");
 
   // Schedule the placement group successfully.
-  auto failure_handler = [this](std::shared_ptr<gcs::GcsPlacementGroup> placement_group) {
+  auto failure_handler = [this](std::shared_ptr<gcs::GcsPlacementGroup> placement_group,
+                                bool is_feasible) {
    absl::MutexLock lock(&placement_group_requests_mutex_);
    failure_placement_groups_.emplace_back(std::move(placement_group));
   };
@@ -947,7 +963,8 @@ TEST_F(GcsPlacementGroupSchedulerTest, TestPGCancelledDuringReschedulingCommit)
       std::make_shared<gcs::GcsPlacementGroup>(create_placement_group_request, "");
 
   // Schedule the placement group successfully.
-  auto failure_handler = [this](std::shared_ptr<gcs::GcsPlacementGroup> placement_group) {
+  auto failure_handler = [this](std::shared_ptr<gcs::GcsPlacementGroup> placement_group,
+                                bool is_feasible) {
    absl::MutexLock lock(&placement_group_requests_mutex_);
    failure_placement_groups_.emplace_back(std::move(placement_group));
   };
@@ -1003,7 +1020,8 @@ TEST_F(GcsPlacementGroupSchedulerTest, TestPGCancelledDuringReschedulingCommitPr
       std::make_shared<gcs::GcsPlacementGroup>(create_placement_group_request, "");
 
   // Schedule the placement group successfully.
-  auto failure_handler = [this](std::shared_ptr<gcs::GcsPlacementGroup> placement_group) {
+  auto failure_handler = [this](std::shared_ptr<gcs::GcsPlacementGroup> placement_group,
+                                bool is_feasible) {
    absl::MutexLock lock(&placement_group_requests_mutex_);
    failure_placement_groups_.emplace_back(std::move(placement_group));
   };
diff --git a/src/ray/gcs/gcs_server/test/gcs_resource_scheduler_test.cc b/src/ray/gcs/gcs_server/test/gcs_resource_scheduler_test.cc
index 4e3d72092..393183a28 100644
--- a/src/ray/gcs/gcs_server/test/gcs_resource_scheduler_test.cc
+++ b/src/ray/gcs/gcs_server/test/gcs_resource_scheduler_test.cc
@@ -70,7 +70,8 @@ class GcsResourceSchedulerTest : public ::testing::Test {
     }
     const auto &result1 =
         gcs_resource_scheduler_->Schedule(required_resources_list, scheduling_type);
-    ASSERT_EQ(result1.size(), 3);
+    ASSERT_TRUE(result1.first == gcs::SchedulingResultStatus::SUCCESS);
+    ASSERT_EQ(result1.second.size(), 3);
 
     // Check for resource leaks.
     CheckClusterAvailableResources(node_id, cpu_resource, node_cpu_num);
@@ -80,7 +81,8 @@ class GcsResourceSchedulerTest : public ::testing::Test {
     required_resources_list.emplace_back(resource_map);
     const auto &result2 =
         gcs_resource_scheduler_->Schedule(required_resources_list, scheduling_type);
-    ASSERT_EQ(result2.size(), 0);
+    ASSERT_TRUE(result2.first == gcs::SchedulingResultStatus::FAILED);
+    ASSERT_EQ(result2.second.size(), 0);
 
     // Check for resource leaks.
     CheckClusterAvailableResources(node_id, cpu_resource, node_cpu_num);
@@ -116,13 +118,58 @@ TEST_F(GcsResourceSchedulerTest, TestNodeFilter) {
   const auto &result1 = gcs_resource_scheduler_->Schedule(
       required_resources_list, gcs::SchedulingType::STRICT_SPREAD,
       [](const NodeID &) { return false; });
-  ASSERT_EQ(result1.size(), 0);
+  ASSERT_TRUE(result1.first == gcs::SchedulingResultStatus::INFEASIBLE);
+  ASSERT_EQ(result1.second.size(), 0);
 
   // Scheduling succeeded.
   const auto &result2 = gcs_resource_scheduler_->Schedule(
       required_resources_list, gcs::SchedulingType::STRICT_SPREAD,
       [](const NodeID &) { return true; });
-  ASSERT_EQ(result2.size(), 1);
+  ASSERT_TRUE(result2.first == gcs::SchedulingResultStatus::SUCCESS);
+  ASSERT_EQ(result2.second.size(), 1);
+}
+
+TEST_F(GcsResourceSchedulerTest, TestSchedulingResultStatusForStrictStrategy) {
+  // Init resources with two nodes.
+  const auto &node_one_id = NodeID::FromRandom();
+  const auto &node_two_id = NodeID::FromRandom();
+  const std::string cpu_resource = "CPU";
+  const double node_cpu_num = 10.0;
+  AddClusterResources(node_one_id, cpu_resource, node_cpu_num);
+  AddClusterResources(node_two_id, cpu_resource, node_cpu_num);
+
+  // Mock a request that has three required resources.
+  std::vector<ResourceSet> required_resources_list;
+  std::unordered_map<std::string, double> resource_map;
+  resource_map[cpu_resource] = 1;
+  for (int node_number = 0; node_number < 3; node_number++) {
+    required_resources_list.emplace_back(resource_map);
+  }
+
+  const auto &result1 = gcs_resource_scheduler_->Schedule(
+      required_resources_list, gcs::SchedulingType::STRICT_SPREAD);
+  ASSERT_TRUE(result1.first == gcs::SchedulingResultStatus::INFEASIBLE);
+  ASSERT_EQ(result1.second.size(), 0);
+
+  // Check for resource leaks.
+  CheckClusterAvailableResources(node_one_id, cpu_resource, node_cpu_num);
+  CheckClusterAvailableResources(node_two_id, cpu_resource, node_cpu_num);
+
+  // Mock a request that has only one required resource, but it is bigger than the
+  // largest node in the cluster.
+  required_resources_list.clear();
+  resource_map.clear();
+  resource_map[cpu_resource] = 50;
+  required_resources_list.emplace_back(resource_map);
+
+  const auto &result2 = gcs_resource_scheduler_->Schedule(
+      required_resources_list, gcs::SchedulingType::STRICT_PACK);
+  ASSERT_TRUE(result2.first == gcs::SchedulingResultStatus::INFEASIBLE);
+  ASSERT_EQ(result2.second.size(), 0);
+
+  // Check for resource leaks.
+  CheckClusterAvailableResources(node_one_id, cpu_resource, node_cpu_num);
+  CheckClusterAvailableResources(node_two_id, cpu_resource, node_cpu_num);
 }
 
 }  // namespace ray
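A closing illustration of the STRICT_PACK behavior exercised by the last test above: infeasibility is decided against a node's total capacity, not its currently available capacity, so a 50-CPU request on a cluster of 10-CPU nodes is INFEASIBLE rather than retryable. A minimal standalone sketch under that assumption (toy types, not Ray code):

#include <cstdio>
#include <utility>
#include <vector>

enum Status { FAILED = 0, INFEASIBLE = 1, SUCCESS = 2 };

// Mirrors the new pre-check in StrictPackSchedule: aggregate all bundle
// demands, then ask whether ANY node's *total* capacity could ever hold them.
Status StrictPackStatus(const std::vector<double> &bundle_cpus,
                        const std::vector<std::pair<double, double>> &nodes
                        /* (total, available) per node */) {
  double aggregate = 0;
  for (double c : bundle_cpus) aggregate += c;
  bool fits_some_total = false;
  bool fits_some_available = false;
  for (const auto &n : nodes) {
    if (aggregate <= n.first) fits_some_total = true;
    if (aggregate <= n.second) fits_some_available = true;
  }
  if (!fits_some_total) return INFEASIBLE;  // no node could ever fit it
  if (!fits_some_available) return FAILED;  // fits in theory, busy right now
  return SUCCESS;
}

int main() {
  // Two 10-CPU nodes, as in TestSchedulingResultStatusForStrictStrategy.
  std::vector<std::pair<double, double>> nodes = {{10, 10}, {10, 10}};
  std::printf("%d\n", StrictPackStatus({50}, nodes));     // INFEASIBLE (1)
  std::printf("%d\n", StrictPackStatus({4, 4}, nodes));   // SUCCESS (2)
  std::printf("%d\n", StrictPackStatus({8}, {{10, 6}}));  // FAILED (0)
  return 0;
}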