[Placement Group] Support infeasible placement groups for Placement Group. (#16188)

* init

* update comment

* update logical

* ut failing

* compile passing

* add ut

* lint

* fix comment

* lint

* fix ut and typo

* fix ut and typo

* lint

* typo
DK.Pino 2021-06-17 12:48:39 +08:00 committed by GitHub
parent 45357ff590
commit 7f91cfedd5
11 changed files with 380 additions and 190 deletions


@ -105,7 +105,7 @@ def verify_load_metrics(monitor, expected_resource_usage=None, timeout=30):
strategy = "STRICT_PACK"
pg = placement_group(pg_demands, strategy=strategy)
pg.ready()
time.sleep(2) # wait for placemnt groups to propogate.
time.sleep(2) # wait for placement groups to propagate.
# Disable event clearing for test.
monitor.event_summarizer.clear = lambda *a: None


@ -221,27 +221,33 @@ PlacementGroupID GcsPlacementGroupManager::GetPlacementGroupIDByName(
}
void GcsPlacementGroupManager::OnPlacementGroupCreationFailed(
std::shared_ptr<GcsPlacementGroup> placement_group) {
std::shared_ptr<GcsPlacementGroup> placement_group, bool is_feasible) {
RAY_LOG(DEBUG) << "Failed to create placement group " << placement_group->GetName()
<< ", id: " << placement_group->GetPlacementGroupID() << ", try again.";
// We will attempt to schedule this placement_group once an eligible node is
// registered.
auto state = placement_group->GetState();
RAY_CHECK(state == rpc::PlacementGroupTableData::RESCHEDULING ||
state == rpc::PlacementGroupTableData::PENDING ||
state == rpc::PlacementGroupTableData::REMOVED)
<< "State: " << state;
if (state == rpc::PlacementGroupTableData::RESCHEDULING) {
// NOTE: If a node is dead, the placement group scheduler should try to recover the
// group by rescheduling the bundles of the dead node. This should have higher
// priority than trying to place other placement groups.
pending_placement_groups_.emplace_front(std::move(placement_group));
} else {
pending_placement_groups_.emplace_back(std::move(placement_group));
}
MarkSchedulingDone();
RetryCreatingPlacementGroup();
if (!is_feasible) {
// We will attempt to schedule this placement_group once an eligible node is
// registered.
infeasible_placement_groups_.emplace_back(std::move(placement_group));
MarkSchedulingDone();
} else {
auto state = placement_group->GetState();
RAY_CHECK(state == rpc::PlacementGroupTableData::RESCHEDULING ||
state == rpc::PlacementGroupTableData::PENDING ||
state == rpc::PlacementGroupTableData::REMOVED)
<< "State: " << state;
if (state == rpc::PlacementGroupTableData::RESCHEDULING) {
// NOTE: If a node is dead, the placement group scheduler should try to recover the
// group by rescheduling the bundles of the dead node. This should have higher
// priority than trying to place other placement groups.
pending_placement_groups_.emplace_front(std::move(placement_group));
} else {
pending_placement_groups_.emplace_back(std::move(placement_group));
}
MarkSchedulingDone();
RetryCreatingPlacementGroup();
}
}
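
To make the branch above easier to follow, here is a minimal standalone sketch of the two-queue routing that OnPlacementGroupCreationFailed now performs. The PlacementGroup struct and the free-standing queues are simplified stand-ins for illustration, not Ray's real types.

#include <deque>
#include <memory>

// Hypothetical stand-in for GcsPlacementGroup.
struct PlacementGroup {
  bool rescheduling = false;  // true when recovering bundles of a dead node
};

std::deque<std::shared_ptr<PlacementGroup>> pending;     // retried by the scheduling loop
std::deque<std::shared_ptr<PlacementGroup>> infeasible;  // parked until a new node registers

void OnCreationFailed(std::shared_ptr<PlacementGroup> pg, bool is_feasible) {
  if (!is_feasible) {
    // Non-retryable under the current cluster shape: park it.
    infeasible.emplace_back(std::move(pg));
  } else if (pg->rescheduling) {
    // Recovering a dead node's bundles takes priority over new groups.
    pending.emplace_front(std::move(pg));
  } else {
    pending.emplace_back(std::move(pg));
  }
}
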
void GcsPlacementGroupManager::OnPlacementGroupCreationSuccess(
@ -284,8 +290,8 @@ void GcsPlacementGroupManager::SchedulePendingPlacementGroups() {
MarkSchedulingStarted(placement_group_id);
gcs_placement_group_scheduler_->ScheduleUnplacedBundles(
placement_group,
[this](std::shared_ptr<GcsPlacementGroup> placement_group) {
OnPlacementGroupCreationFailed(std::move(placement_group));
[this](std::shared_ptr<GcsPlacementGroup> placement_group, bool is_feasible) {
OnPlacementGroupCreationFailed(std::move(placement_group), is_feasible);
},
[this](std::shared_ptr<GcsPlacementGroup> placement_group) {
OnPlacementGroupCreationSuccess(std::move(placement_group));
@ -383,6 +389,17 @@ void GcsPlacementGroupManager::RemovePlacementGroup(
pending_placement_groups_.erase(pending_it);
}
// Remove the placement group from the infeasible queue if it exists.
pending_it = std::find_if(
infeasible_placement_groups_.begin(), infeasible_placement_groups_.end(),
[placement_group_id](const std::shared_ptr<GcsPlacementGroup> &placement_group) {
return placement_group->GetPlacementGroupID() == placement_group_id;
});
if (pending_it != infeasible_placement_groups_.end()) {
// The placement group is currently in the infeasible queue; remove it.
infeasible_placement_groups_.erase(pending_it);
}
// Flush the status and respond to workers.
placement_group->UpdateState(rpc::PlacementGroupTableData::REMOVED);
RAY_CHECK_OK(gcs_table_storage_->PlacementGroupTable().Put(
@ -570,6 +587,22 @@ void GcsPlacementGroupManager::OnNodeDead(const NodeID &node_id) {
SchedulePendingPlacementGroups();
}
void GcsPlacementGroupManager::OnNodeAdd(const NodeID &node_id) {
RAY_LOG(INFO)
<< "A new node: " << node_id
<< " registered, will try to reschedule all the infeasible placement groups.";
// Move all the infeasible placement groups to the pending queue so that we can
// reschedule them.
if (infeasible_placement_groups_.size() > 0) {
auto end_it = pending_placement_groups_.end();
pending_placement_groups_.insert(end_it, infeasible_placement_groups_.cbegin(),
infeasible_placement_groups_.cend());
infeasible_placement_groups_.clear();
}
SchedulePendingPlacementGroups();
}
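
The requeue step above is just a bulk move between two deques; a compact sketch of the same idiom with simplified types:

#include <deque>
#include <memory>

struct PlacementGroup {};  // hypothetical stand-in for GcsPlacementGroup
using PGQueue = std::deque<std::shared_ptr<PlacementGroup>>;

// On node registration, everything parked as infeasible becomes schedulable
// again: append it to the pending queue in one shot and clear the parking lot.
void RequeueInfeasible(PGQueue &pending, PGQueue &infeasible) {
  if (!infeasible.empty()) {
    pending.insert(pending.end(), infeasible.cbegin(), infeasible.cend());
    infeasible.clear();
  }
}

Using std::deque here (rather than a priority queue) keeps this bulk append cheap while still letting RESCHEDULING groups jump the line via emplace_front in the failure path.
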
void GcsPlacementGroupManager::CleanPlacementGroupIfNeededWhenJobDead(
const JobID &job_id) {
for (const auto &it : registered_placement_groups_) {
@ -620,6 +653,17 @@ void GcsPlacementGroupManager::UpdatePlacementGroupLoad() {
break;
}
}
// NOTE: Infeasible placement groups are also counted as pending when
// reporting load metrics.
for (const auto &pending_pg_spec : infeasible_placement_groups_) {
auto placement_group_data = placement_group_load->add_placement_group_data();
auto placement_group_table_data = pending_pg_spec->GetPlacementGroupTableData();
placement_group_data->Swap(&placement_group_table_data);
total_cnt += 1;
if (total_cnt >= RayConfig::instance().max_placement_group_load_report_size()) {
break;
}
}
gcs_resource_manager_.UpdatePlacementGroupLoad(move(placement_group_load));
}


@ -207,7 +207,9 @@ class GcsPlacementGroupManager : public rpc::PlacementGroupInfoHandler {
/// a placement_group creation task is infeasible.
///
/// \param placement_group The placement_group whose creation task is infeasible.
void OnPlacementGroupCreationFailed(std::shared_ptr<GcsPlacementGroup> placement_group);
/// \param is_feasible Whether scheduling can currently be retried; false means
/// the group is infeasible for now.
void OnPlacementGroupCreationFailed(std::shared_ptr<GcsPlacementGroup> placement_group,
bool is_feasible = true);
/// Handle placement_group creation task success. This should be called when the
/// placement_group creation task has been scheduled successfully.
@ -226,6 +228,12 @@ class GcsPlacementGroupManager : public rpc::PlacementGroupInfoHandler {
/// \param node_id The specified node id.
void OnNodeDead(const NodeID &node_id);
/// Handle a node registration. This will try to reschedule all the infeasible
/// placement groups.
///
/// \param node_id The specified node id.
void OnNodeAdd(const NodeID &node_id);
/// Clean placement group that belongs to the job id if necessary.
///
/// This interface is a part of automatic lifecycle management for placement groups.
@ -321,6 +329,9 @@ class GcsPlacementGroupManager : public rpc::PlacementGroupInfoHandler {
/// `std::priority_queue`.
std::deque<std::shared_ptr<GcsPlacementGroup>> pending_placement_groups_;
/// The infeasible placement_groups that can't be scheduled currently.
std::deque<std::shared_ptr<GcsPlacementGroup>> infeasible_placement_groups_;
/// The scheduler to schedule all registered placement_groups.
std::shared_ptr<gcs::GcsPlacementGroupSchedulerInterface>
gcs_placement_group_scheduler_;


@ -47,55 +47,58 @@ std::vector<ResourceSet> GcsScheduleStrategy::GetRequiredResourcesFromBundles(
return required_resources;
}
ScheduleMap GcsScheduleStrategy::GenerateScheduleMap(
ScheduleResult GcsScheduleStrategy::GenerateScheduleResult(
const std::vector<std::shared_ptr<ray::BundleSpecification>> &bundles,
const std::vector<NodeID> &selected_nodes) {
const std::vector<NodeID> &selected_nodes, const SchedulingResultStatus &status) {
ScheduleMap schedule_map;
if (!selected_nodes.empty()) {
if (status == SUCCESS && !selected_nodes.empty()) {
RAY_CHECK(bundles.size() == selected_nodes.size());
int index = 0;
for (const auto &bundle : bundles) {
schedule_map[bundle->BundleId()] = selected_nodes[index++];
}
}
return schedule_map;
return std::make_pair(status, schedule_map);
}
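
For readers tracking the type change: Schedule() now returns a status paired with the bundle-to-node map instead of a bare map. A minimal sketch of producing and consuming a ScheduleResult, with BundleID and NodeID replaced by hypothetical stand-ins:

#include <cstdint>
#include <functional>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

// Hypothetical simplifications of Ray's types.
using BundleID = std::pair<std::string, int>;
using NodeID = uint64_t;
enum SchedulingResultStatus { FAILED = 0, INFEASIBLE = 1, SUCCESS = 2 };

struct PairHash {
  std::size_t operator()(const BundleID &id) const {
    return std::hash<std::string>()(id.first) ^ std::hash<int>()(id.second);
  }
};
using ScheduleMap = std::unordered_map<BundleID, NodeID, PairHash>;
using ScheduleResult = std::pair<SchedulingResultStatus, ScheduleMap>;

ScheduleResult MakeScheduleResult(const std::vector<BundleID> &bundles,
                                  const std::vector<NodeID> &selected_nodes,
                                  SchedulingResultStatus status) {
  ScheduleMap schedule_map;
  // Only a successful result carries a bundle-to-node mapping; a FAILED or
  // INFEASIBLE result pairs the status with an empty map.
  if (status == SUCCESS && !selected_nodes.empty()) {
    for (std::size_t i = 0; i < bundles.size(); ++i) {
      schedule_map[bundles[i]] = selected_nodes[i];
    }
  }
  return std::make_pair(status, schedule_map);
}
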
ScheduleMap GcsStrictPackStrategy::Schedule(
std::vector<std::shared_ptr<ray::BundleSpecification>> &bundles,
ScheduleResult GcsStrictPackStrategy::Schedule(
const std::vector<std::shared_ptr<ray::BundleSpecification>> &bundles,
const std::unique_ptr<ScheduleContext> &context,
GcsResourceScheduler &gcs_resource_scheduler) {
const auto &required_resources = GetRequiredResourcesFromBundles(bundles);
const auto &selected_nodes =
const auto &scheduling_result =
gcs_resource_scheduler.Schedule(required_resources, SchedulingType::STRICT_PACK);
return GenerateScheduleMap(bundles, selected_nodes);
return GenerateScheduleResult(bundles, scheduling_result.second,
scheduling_result.first);
}
ScheduleMap GcsPackStrategy::Schedule(
std::vector<std::shared_ptr<ray::BundleSpecification>> &bundles,
ScheduleResult GcsPackStrategy::Schedule(
const std::vector<std::shared_ptr<ray::BundleSpecification>> &bundles,
const std::unique_ptr<ScheduleContext> &context,
GcsResourceScheduler &gcs_resource_scheduler) {
// The current algorithm is to select a node and deploy as many bundles as possible.
// First fill up a node. If the node resource is insufficient, select a new node.
// TODO(ffbin): We will speed this up in next PR. Currently it is a double for loop.
const auto &required_resources = GetRequiredResourcesFromBundles(bundles);
const auto &selected_nodes =
const auto &scheduling_result =
gcs_resource_scheduler.Schedule(required_resources, SchedulingType::PACK);
return GenerateScheduleMap(bundles, selected_nodes);
return GenerateScheduleResult(bundles, scheduling_result.second,
scheduling_result.first);
}
ScheduleMap GcsSpreadStrategy::Schedule(
std::vector<std::shared_ptr<ray::BundleSpecification>> &bundles,
ScheduleResult GcsSpreadStrategy::Schedule(
const std::vector<std::shared_ptr<ray::BundleSpecification>> &bundles,
const std::unique_ptr<ScheduleContext> &context,
GcsResourceScheduler &gcs_resource_scheduler) {
const auto &required_resources = GetRequiredResourcesFromBundles(bundles);
const auto &selected_nodes =
const auto &scheduling_result =
gcs_resource_scheduler.Schedule(required_resources, SchedulingType::SPREAD);
return GenerateScheduleMap(bundles, selected_nodes);
return GenerateScheduleResult(bundles, scheduling_result.second,
scheduling_result.first);
}
ScheduleMap GcsStrictSpreadStrategy::Schedule(
std::vector<std::shared_ptr<ray::BundleSpecification>> &bundles,
ScheduleResult GcsStrictSpreadStrategy::Schedule(
const std::vector<std::shared_ptr<ray::BundleSpecification>> &bundles,
const std::unique_ptr<ScheduleContext> &context,
GcsResourceScheduler &gcs_resource_scheduler) {
// TODO(ffbin): A bundle may require special resources, such as GPU. We need to
@ -112,18 +115,19 @@ ScheduleMap GcsStrictSpreadStrategy::Schedule(
}
const auto &required_resources = GetRequiredResourcesFromBundles(bundles);
const auto &selected_nodes = gcs_resource_scheduler.Schedule(
const auto &scheduling_result = gcs_resource_scheduler.Schedule(
required_resources, SchedulingType::STRICT_SPREAD,
/*node_filter_func=*/[&nodes_in_use](const NodeID &node_id) {
return nodes_in_use.count(node_id) == 0;
});
return GenerateScheduleMap(bundles, selected_nodes);
return GenerateScheduleResult(bundles, scheduling_result.second,
scheduling_result.first);
}
void GcsPlacementGroupScheduler::ScheduleUnplacedBundles(
std::shared_ptr<GcsPlacementGroup> placement_group,
std::function<void(std::shared_ptr<GcsPlacementGroup>)> failure_callback,
std::function<void(std::shared_ptr<GcsPlacementGroup>)> success_callback) {
PGSchedulingFailureCallback failure_callback,
PGSchedulingSuccessfulCallback success_callback) {
// We need to ensure that the PrepareBundleResources won't be sent before the reply of
// ReleaseUnusedBundles is returned.
if (!nodes_of_releasing_unused_bundles_.empty()) {
@ -131,26 +135,29 @@ void GcsPlacementGroupScheduler::ScheduleUnplacedBundles(
<< ", id: " << placement_group->GetPlacementGroupID() << ", because "
<< nodes_of_releasing_unused_bundles_.size()
<< " nodes have not released unused bundles.";
failure_callback(placement_group);
failure_callback(placement_group, true);
return;
}
auto bundles = placement_group->GetUnplacedBundles();
auto strategy = placement_group->GetStrategy();
const auto &bundles = placement_group->GetUnplacedBundles();
const auto &strategy = placement_group->GetStrategy();
RAY_LOG(DEBUG) << "Scheduling placement group " << placement_group->GetName()
<< ", id: " << placement_group->GetPlacementGroupID()
<< ", bundles size = " << bundles.size();
auto selected_nodes = scheduler_strategies_[strategy]->Schedule(
auto scheduling_result = scheduler_strategies_[strategy]->Schedule(
bundles, GetScheduleContext(placement_group->GetPlacementGroupID()),
gcs_resource_scheduler_);
// If no nodes are available, scheduling fails.
if (selected_nodes.empty()) {
auto result_status = scheduling_result.first;
auto selected_nodes = scheduling_result.second;
if (result_status != SUCCESS) {
RAY_LOG(DEBUG) << "Failed to schedule placement group " << placement_group->GetName()
<< ", id: " << placement_group->GetPlacementGroupID()
<< ", because no nodes are available.";
failure_callback(placement_group);
<< ", because current reource can't satisfied this required resource.";
const bool &retryable = (result_status == FAILED) ? true : false;
failure_callback(placement_group, retryable);
return;
}
@ -296,10 +303,8 @@ GcsPlacementGroupScheduler::GetLeaseClientFromNode(
void GcsPlacementGroupScheduler::CommitAllBundles(
const std::shared_ptr<LeaseStatusTracker> &lease_status_tracker,
const std::function<void(std::shared_ptr<GcsPlacementGroup>)>
&schedule_failure_handler,
const std::function<void(std::shared_ptr<GcsPlacementGroup>)>
&schedule_success_handler) {
const PGSchedulingFailureCallback &schedule_failure_handler,
const PGSchedulingSuccessfulCallback &schedule_success_handler) {
const std::shared_ptr<BundleLocations> &prepared_bundle_locations =
lease_status_tracker->GetPreparedBundleLocations();
lease_status_tracker->MarkCommitPhaseStarted();
@ -330,10 +335,8 @@ void GcsPlacementGroupScheduler::CommitAllBundles(
void GcsPlacementGroupScheduler::OnAllBundlePrepareRequestReturned(
const std::shared_ptr<LeaseStatusTracker> &lease_status_tracker,
const std::function<void(std::shared_ptr<GcsPlacementGroup>)>
&schedule_failure_handler,
const std::function<void(std::shared_ptr<GcsPlacementGroup>)>
&schedule_success_handler) {
const PGSchedulingFailureCallback &schedule_failure_handler,
const PGSchedulingSuccessfulCallback &schedule_success_handler) {
RAY_CHECK(lease_status_tracker->AllPrepareRequestsReturned())
<< "This method can be called only after all bundle scheduling requests are "
"returned.";
@ -353,7 +356,7 @@ void GcsPlacementGroupScheduler::OnAllBundlePrepareRequestReturned(
RAY_CHECK(it != placement_group_leasing_in_progress_.end());
placement_group_leasing_in_progress_.erase(it);
ReturnBundleResources(lease_status_tracker->GetBundleLocations());
schedule_failure_handler(placement_group);
schedule_failure_handler(placement_group, true);
return;
}
@ -383,10 +386,8 @@ void GcsPlacementGroupScheduler::OnAllBundlePrepareRequestReturned(
void GcsPlacementGroupScheduler::OnAllBundleCommitRequestReturned(
const std::shared_ptr<LeaseStatusTracker> &lease_status_tracker,
const std::function<void(std::shared_ptr<GcsPlacementGroup>)>
&schedule_failure_handler,
const std::function<void(std::shared_ptr<GcsPlacementGroup>)>
&schedule_success_handler) {
const PGSchedulingFailureCallback &schedule_failure_handler,
const PGSchedulingSuccessfulCallback &schedule_success_handler) {
const auto &placement_group = lease_status_tracker->GetPlacementGroup();
const auto &prepared_bundle_locations =
lease_status_tracker->GetPreparedBundleLocations();
@ -410,7 +411,7 @@ void GcsPlacementGroupScheduler::OnAllBundleCommitRequestReturned(
if (lease_status_tracker->GetLeasingState() == LeasingState::CANCELLED) {
DestroyPlacementGroupCommittedBundleResources(placement_group_id);
ReturnBundleResources(lease_status_tracker->GetBundleLocations());
schedule_failure_handler(placement_group);
schedule_failure_handler(placement_group, true);
return;
}
@ -429,7 +430,7 @@ void GcsPlacementGroupScheduler::OnAllBundleCommitRequestReturned(
}
placement_group->UpdateState(rpc::PlacementGroupTableData::RESCHEDULING);
ReturnBundleResources(uncommitted_bundle_locations);
schedule_failure_handler(placement_group);
schedule_failure_handler(placement_group, true);
} else {
schedule_success_handler(placement_group);
}


@ -31,9 +31,16 @@
namespace ray {
namespace gcs {
class GcsPlacementGroup;
using ReserveResourceClientFactoryFn =
std::function<std::shared_ptr<ResourceReserveInterface>(const rpc::Address &address)>;
using PGSchedulingFailureCallback =
std::function<void(std::shared_ptr<GcsPlacementGroup>, bool)>;
using PGSchedulingSuccessfulCallback =
std::function<void(std::shared_ptr<GcsPlacementGroup>)>;
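
A hedged sketch of what a caller might supply for these aliases; the lambda body is illustrative only, not from the source:

#include <functional>
#include <iostream>
#include <memory>

class GcsPlacementGroup;  // forward declaration, as in the header

using PGSchedulingFailureCallback =
    std::function<void(std::shared_ptr<GcsPlacementGroup>, bool)>;
using PGSchedulingSuccessfulCallback =
    std::function<void(std::shared_ptr<GcsPlacementGroup>)>;

// The bool tells the failure handler whether the group is still feasible
// (requeue and retry) or must wait for a new node to register.
PGSchedulingFailureCallback on_failure =
    [](std::shared_ptr<GcsPlacementGroup> pg, bool is_feasible) {
      std::cout << (is_feasible ? "requeue for retry" : "park as infeasible")
                << std::endl;
    };
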
struct pair_hash {
template <class T1, class T2>
std::size_t operator()(const std::pair<T1, T2> &pair) const {
@ -41,12 +48,11 @@ struct pair_hash {
}
};
using ScheduleMap = std::unordered_map<BundleID, NodeID, pair_hash>;
using ScheduleResult = std::pair<SchedulingResultStatus, ScheduleMap>;
using BundleLocations =
absl::flat_hash_map<BundleID, std::pair<NodeID, std::shared_ptr<BundleSpecification>>,
pair_hash>;
class GcsPlacementGroup;
class GcsPlacementGroupSchedulerInterface {
public:
/// Schedule unplaced bundles of the specified placement group.
@ -56,8 +62,8 @@ class GcsPlacementGroupSchedulerInterface {
/// \param success_callback This function is called if the schedule is successful.
virtual void ScheduleUnplacedBundles(
std::shared_ptr<GcsPlacementGroup> placement_group,
std::function<void(std::shared_ptr<GcsPlacementGroup>)> failure_callback,
std::function<void(std::shared_ptr<GcsPlacementGroup>)> success_callback) = 0;
PGSchedulingFailureCallback failure_callback,
PGSchedulingSuccessfulCallback success_callback) = 0;
/// Get bundles belonging to the specified node.
///
@ -105,8 +111,8 @@ class ScheduleContext {
class GcsScheduleStrategy {
public:
virtual ~GcsScheduleStrategy() {}
virtual ScheduleMap Schedule(
std::vector<std::shared_ptr<ray::BundleSpecification>> &bundles,
virtual ScheduleResult Schedule(
const std::vector<std::shared_ptr<ray::BundleSpecification>> &bundles,
const std::unique_ptr<ScheduleContext> &context,
GcsResourceScheduler &gcs_resource_scheduler) = 0;
@ -118,14 +124,15 @@ class GcsScheduleStrategy {
std::vector<ResourceSet> GetRequiredResourcesFromBundles(
const std::vector<std::shared_ptr<ray::BundleSpecification>> &bundles);
/// Generate `ScheduleMap` from bundles and nodes.
/// Generate `ScheduleResult` from bundles and nodes.
///
/// \param bundles Bundles to be scheduled.
/// \param selected_nodes The nodes selected for scheduling.
/// \return Required resources.
ScheduleMap GenerateScheduleMap(
/// \param status Status of the scheduling result.
/// \return The scheduling result for the required resources.
ScheduleResult GenerateScheduleResult(
const std::vector<std::shared_ptr<ray::BundleSpecification>> &bundles,
const std::vector<NodeID> &selected_nodes);
const std::vector<NodeID> &selected_nodes, const SchedulingResultStatus &status);
};
/// The `GcsPackStrategy` packs all bundles onto one node as much as possible.
@ -133,26 +140,29 @@ class GcsScheduleStrategy {
/// nodes.
class GcsPackStrategy : public GcsScheduleStrategy {
public:
ScheduleMap Schedule(std::vector<std::shared_ptr<ray::BundleSpecification>> &bundles,
const std::unique_ptr<ScheduleContext> &context,
GcsResourceScheduler &gcs_resource_scheduler) override;
ScheduleResult Schedule(
const std::vector<std::shared_ptr<ray::BundleSpecification>> &bundles,
const std::unique_ptr<ScheduleContext> &context,
GcsResourceScheduler &gcs_resource_scheduler) override;
};
/// The `GcsSpreadStrategy` spreads all bundles across different nodes.
class GcsSpreadStrategy : public GcsScheduleStrategy {
public:
ScheduleMap Schedule(std::vector<std::shared_ptr<ray::BundleSpecification>> &bundles,
const std::unique_ptr<ScheduleContext> &context,
GcsResourceScheduler &gcs_resource_scheduler) override;
ScheduleResult Schedule(
const std::vector<std::shared_ptr<ray::BundleSpecification>> &bundles,
const std::unique_ptr<ScheduleContext> &context,
GcsResourceScheduler &gcs_resource_scheduler) override;
};
/// The `GcsStrictPackStrategy` requires all bundles to be scheduled onto a single
/// node. If no single node has enough resources, scheduling fails.
class GcsStrictPackStrategy : public GcsScheduleStrategy {
public:
ScheduleMap Schedule(std::vector<std::shared_ptr<ray::BundleSpecification>> &bundles,
const std::unique_ptr<ScheduleContext> &context,
GcsResourceScheduler &gcs_resource_scheduler) override;
ScheduleResult Schedule(
const std::vector<std::shared_ptr<ray::BundleSpecification>> &bundles,
const std::unique_ptr<ScheduleContext> &context,
GcsResourceScheduler &gcs_resource_scheduler) override;
};
/// The `GcsStrictSpreadStrategy` requires all bundles to be spread across distinct nodes.
@ -160,9 +170,10 @@ class GcsStrictPackStrategy : public GcsScheduleStrategy {
/// If the node resource is insufficient, it will fail to schedule.
class GcsStrictSpreadStrategy : public GcsScheduleStrategy {
public:
ScheduleMap Schedule(std::vector<std::shared_ptr<ray::BundleSpecification>> &bundles,
const std::unique_ptr<ScheduleContext> &context,
GcsResourceScheduler &gcs_resource_scheduler) override;
ScheduleResult Schedule(
const std::vector<std::shared_ptr<ray::BundleSpecification>> &bundles,
const std::unique_ptr<ScheduleContext> &context,
GcsResourceScheduler &gcs_resource_scheduler) override;
};
enum class LeasingState {
@ -407,10 +418,9 @@ class GcsPlacementGroupScheduler : public GcsPlacementGroupSchedulerInterface {
/// \param placement_group to be scheduled.
/// \param failure_callback This function is called if scheduling fails.
/// \param success_callback This function is called if the schedule is successful.
void ScheduleUnplacedBundles(
std::shared_ptr<GcsPlacementGroup> placement_group,
std::function<void(std::shared_ptr<GcsPlacementGroup>)> failure_handler,
std::function<void(std::shared_ptr<GcsPlacementGroup>)> success_handler) override;
void ScheduleUnplacedBundles(std::shared_ptr<GcsPlacementGroup> placement_group,
PGSchedulingFailureCallback failure_handler,
PGSchedulingSuccessfulCallback success_handler) override;
/// Destroy the actual bundle resources or locked resources (for 2PC)
/// on all nodes associated with this placement group.
@ -485,25 +495,19 @@ class GcsPlacementGroupScheduler : public GcsPlacementGroupSchedulerInterface {
/// Called when all prepare requests are returned from nodes.
void OnAllBundlePrepareRequestReturned(
const std::shared_ptr<LeaseStatusTracker> &lease_status_tracker,
const std::function<void(std::shared_ptr<GcsPlacementGroup>)>
&schedule_failure_handler,
const std::function<void(std::shared_ptr<GcsPlacementGroup>)>
&schedule_success_handler);
const PGSchedulingFailureCallback &schedule_failure_handler,
const PGSchedulingSuccessfulCallback &schedule_success_handler);
/// Called when all commit requests are returned from nodes.
void OnAllBundleCommitRequestReturned(
const std::shared_ptr<LeaseStatusTracker> &lease_status_tracker,
const std::function<void(std::shared_ptr<GcsPlacementGroup>)>
&schedule_failure_handler,
const std::function<void(std::shared_ptr<GcsPlacementGroup>)>
&schedule_success_handler);
const PGSchedulingFailureCallback &schedule_failure_handler,
const PGSchedulingSuccessfulCallback &schedule_success_handler);
/// Commit all bundles recorded in lease status tracker.
void CommitAllBundles(const std::shared_ptr<LeaseStatusTracker> &lease_status_tracker,
const std::function<void(std::shared_ptr<GcsPlacementGroup>)>
&schedule_failure_handler,
const std::function<void(std::shared_ptr<GcsPlacementGroup>)>
&schedule_success_handler);
const PGSchedulingFailureCallback &schedule_failure_handler,
const PGSchedulingSuccessfulCallback &schedule_success_handler);
/// Destroy the prepared bundle resources with this placement group.
/// The method is idempotent, meaning if all bundles are already cancelled,


@ -54,7 +54,7 @@ double LeastResourceScorer::Calculate(const FixedPoint &requested,
/////////////////////////////////////////////////////////////////////////////////////////
std::vector<NodeID> GcsResourceScheduler::Schedule(
SchedulingResult GcsResourceScheduler::Schedule(
const std::vector<ResourceSet> &required_resources_list,
const SchedulingType &scheduling_type,
const std::function<bool(const NodeID &)> &node_filter_func) {
@ -65,7 +65,7 @@ std::vector<NodeID> GcsResourceScheduler::Schedule(
FilterCandidateNodes(cluster_resources, node_filter_func);
if (candidate_nodes.empty()) {
RAY_LOG(DEBUG) << "The candidate nodes is empty, return directly.";
return {};
return std::make_pair(SchedulingResultStatus::INFEASIBLE, std::vector<NodeID>());
}
// First schedule scarce resources (such as GPU) and large capacity resources to improve
@ -73,7 +73,7 @@ std::vector<NodeID> GcsResourceScheduler::Schedule(
const auto &to_schedule_resources = SortRequiredResources(required_resources_list);
// Score and rank nodes.
std::vector<NodeID> result;
SchedulingResult result;
switch (scheduling_type) {
case SPREAD:
result = SpreadSchedule(to_schedule_resources, candidate_nodes);
@ -115,18 +115,18 @@ const std::vector<ResourceSet> &GcsResourceScheduler::SortRequiredResources(
return required_resources;
}
std::vector<NodeID> GcsResourceScheduler::StrictSpreadSchedule(
SchedulingResult GcsResourceScheduler::StrictSpreadSchedule(
const std::vector<ResourceSet> &required_resources_list,
const absl::flat_hash_set<NodeID> &candidate_nodes) {
std::vector<NodeID> result;
if (required_resources_list.size() > candidate_nodes.size()) {
RAY_LOG(DEBUG) << "The number of required resources "
<< required_resources_list.size()
<< " is greater than the number of candidate nodes "
<< candidate_nodes.size() << ", scheduling fails.";
return result;
return std::make_pair(SchedulingResultStatus::INFEASIBLE, std::vector<NodeID>());
}
std::vector<NodeID> result_nodes;
absl::flat_hash_set<NodeID> candidate_nodes_copy(candidate_nodes);
for (const auto &iter : required_resources_list) {
// Score and sort nodes.
@ -135,7 +135,7 @@ std::vector<NodeID> GcsResourceScheduler::StrictSpreadSchedule(
// There are nodes to meet the scheduling requirements.
if (!node_scores.empty() && node_scores.front().second >= 0) {
const auto &highest_score_node_id = node_scores.front().first;
result.push_back(highest_score_node_id);
result_nodes.push_back(highest_score_node_id);
candidate_nodes_copy.erase(highest_score_node_id);
} else {
// There is no node to meet the scheduling requirements.
@ -143,17 +143,17 @@ std::vector<NodeID> GcsResourceScheduler::StrictSpreadSchedule(
}
}
if (result.size() != required_resources_list.size()) {
// Unable to meet the resources required for scheduling, scheduling failed.
result.clear();
if (result_nodes.size() != required_resources_list.size()) {
// Can't meet the scheduling requirements at the moment.
return std::make_pair(SchedulingResultStatus::FAILED, std::vector<NodeID>());
}
return result;
return std::make_pair(SchedulingResultStatus::SUCCESS, result_nodes);
}
std::vector<NodeID> GcsResourceScheduler::SpreadSchedule(
SchedulingResult GcsResourceScheduler::SpreadSchedule(
const std::vector<ResourceSet> &required_resources_list,
const absl::flat_hash_set<NodeID> &candidate_nodes) {
std::vector<NodeID> result;
std::vector<NodeID> result_nodes;
absl::flat_hash_set<NodeID> candidate_nodes_copy(candidate_nodes);
absl::flat_hash_set<NodeID> selected_nodes;
for (const auto &iter : required_resources_list) {
@ -163,7 +163,7 @@ std::vector<NodeID> GcsResourceScheduler::SpreadSchedule(
// There are nodes to meet the scheduling requirements.
if (!node_scores.empty() && node_scores.front().second >= 0) {
const auto &highest_score_node_id = node_scores.front().first;
result.push_back(highest_score_node_id);
result_nodes.push_back(highest_score_node_id);
RAY_CHECK(gcs_resource_manager_.AcquireResources(highest_score_node_id, iter));
candidate_nodes_copy.erase(highest_score_node_id);
selected_nodes.insert(highest_score_node_id);
@ -172,7 +172,7 @@ std::vector<NodeID> GcsResourceScheduler::SpreadSchedule(
const auto &node_scores = ScoreNodes(iter, selected_nodes);
if (!node_scores.empty() && node_scores.front().second >= 0) {
const auto &highest_score_node_id = node_scores.front().first;
result.push_back(highest_score_node_id);
result_nodes.push_back(highest_score_node_id);
RAY_CHECK(gcs_resource_manager_.AcquireResources(highest_score_node_id, iter));
} else {
break;
@ -181,26 +181,39 @@ std::vector<NodeID> GcsResourceScheduler::SpreadSchedule(
}
// Releasing the resources temporarily deducted from `gcs_resource_manager_`.
ReleaseTemporarilyDeductedResources(required_resources_list, result);
ReleaseTemporarilyDeductedResources(required_resources_list, result_nodes);
if (result.size() != required_resources_list.size()) {
// Unable to meet the resources required for scheduling, scheduling failed.
result.clear();
if (result_nodes.size() != required_resources_list.size()) {
// Can't meet the scheduling requirements at the moment.
return std::make_pair(SchedulingResultStatus::FAILED, std::vector<NodeID>());
}
return result;
return std::make_pair(SchedulingResultStatus::SUCCESS, result_nodes);
}
std::vector<NodeID> GcsResourceScheduler::StrictPackSchedule(
SchedulingResult GcsResourceScheduler::StrictPackSchedule(
const std::vector<ResourceSet> &required_resources_list,
const absl::flat_hash_set<NodeID> &candidate_nodes) {
std::vector<NodeID> result;
// Aggregate required resources.
ResourceSet required_resources;
for (const auto &iter : required_resources_list) {
required_resources.AddResources(iter);
}
const auto &cluster_resource = gcs_resource_manager_.GetClusterResources();
const auto &right_node_it = std::find_if(
cluster_resource.begin(), cluster_resource.end(),
[required_resources](const auto &node_resource) {
return required_resources.IsSubset(node_resource.second.GetTotalResources());
});
if (right_node_it == cluster_resource.end()) {
RAY_LOG(DEBUG) << "The required resource is bigger than the maximum resource in the "
"whole cluster, schedule failed.";
return std::make_pair(SchedulingResultStatus::INFEASIBLE, std::vector<NodeID>());
}
std::vector<NodeID> result_nodes;
// Score and sort nodes.
const auto &node_scores = ScoreNodes(required_resources, candidate_nodes);
@ -209,17 +222,22 @@ std::vector<NodeID> GcsResourceScheduler::StrictPackSchedule(
// only schedules to a node and triggers rescheduling when node dead.
if (!node_scores.empty() && node_scores.front().second >= 0) {
for (int index = 0; index < (int)required_resources_list.size(); ++index) {
result.push_back(node_scores.front().first);
result_nodes.push_back(node_scores.front().first);
}
}
return result;
if (result_nodes.empty()) {
// Can't meet the scheduling requirements at the moment.
return std::make_pair(SchedulingResultStatus::FAILED, std::vector<NodeID>());
}
return std::make_pair(SchedulingResultStatus::SUCCESS, result_nodes);
}
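
The pre-check added above asks whether any single node's total capacity could ever hold the aggregated demand; only then is a retryable FAILED result possible. A standalone sketch of the same idea, using plain std::map in place of Ray's ResourceSet (all names here are hypothetical simplifications):

#include <algorithm>
#include <map>
#include <string>

using Resources = std::map<std::string, double>;

// True if every demanded resource fits within the given totals.
bool IsSubset(const Resources &demand, const Resources &total) {
  for (const auto &entry : demand) {
    auto it = total.find(entry.first);
    if (it == total.end() || it->second < entry.second) return false;
  }
  return true;
}

// True if at least one node in the cluster could ever host the whole demand,
// regardless of what is currently available on it.
bool StrictPackFeasible(const Resources &aggregated_demand,
                        const std::map<std::string, Resources> &node_totals) {
  return std::any_of(node_totals.begin(), node_totals.end(),
                     [&](const std::pair<const std::string, Resources> &node) {
                       return IsSubset(aggregated_demand, node.second);
                     });
}
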
std::vector<NodeID> GcsResourceScheduler::PackSchedule(
SchedulingResult GcsResourceScheduler::PackSchedule(
const std::vector<ResourceSet> &required_resources_list,
const absl::flat_hash_set<NodeID> &candidate_nodes) {
std::vector<NodeID> result;
result.resize(required_resources_list.size());
std::vector<NodeID> result_nodes;
result_nodes.resize(required_resources_list.size());
absl::flat_hash_set<NodeID> candidate_nodes_copy(candidate_nodes);
std::list<std::pair<int, ResourceSet>> required_resources_list_copy;
int index = 0;
@ -240,14 +258,14 @@ std::vector<NodeID> GcsResourceScheduler::PackSchedule(
const auto &highest_score_node_id = node_scores.front().first;
RAY_CHECK(gcs_resource_manager_.AcquireResources(highest_score_node_id,
required_resources));
result[required_resources_index] = highest_score_node_id;
result_nodes[required_resources_index] = highest_score_node_id;
required_resources_list_copy.pop_front();
// We try to schedule more resources on one node.
for (auto iter = required_resources_list_copy.begin();
iter != required_resources_list_copy.end();) {
if (gcs_resource_manager_.AcquireResources(highest_score_node_id, iter->second)) {
result[iter->first] = highest_score_node_id;
result_nodes[iter->first] = highest_score_node_id;
required_resources_list_copy.erase(iter++);
} else {
++iter;
@ -257,13 +275,13 @@ std::vector<NodeID> GcsResourceScheduler::PackSchedule(
}
// Releasing the resources temporarily deducted from `gcs_resource_manager_`.
ReleaseTemporarilyDeductedResources(required_resources_list, result);
ReleaseTemporarilyDeductedResources(required_resources_list, result_nodes);
if (!required_resources_list_copy.empty()) {
// Unable to meet the resources required for scheduling, scheduling failed.
result.clear();
// Can't meet the scheduling requirements at the moment.
return std::make_pair(SchedulingResultStatus::FAILED, std::vector<NodeID>());
}
return result;
return std::make_pair(SchedulingResultStatus::SUCCESS, result_nodes);
}
std::list<NodeScore> GcsResourceScheduler::ScoreNodes(


@ -35,7 +35,18 @@ enum SchedulingType {
SchedulingType_MAX = 4,
};
// Status of resource scheduling result.
enum SchedulingResultStatus {
// Scheduling failed but retryable.
FAILED = 0,
// Scheduling failed and non-retryable.
INFEASIBLE = 1,
// Scheduling successful.
SUCCESS = 2,
};
typedef std::pair<NodeID, double> NodeScore;
typedef std::pair<SchedulingResultStatus, std::vector<NodeID>> SchedulingResult;
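
The three-valued status is what lets callers distinguish "retry soon" from "park until the cluster grows". A small illustrative helper (not from the source) that captures the mapping used by the scheduler's failure path, relying on the enum defined above:

// FAILED means resources are busy right now; INFEASIBLE means no node in the
// current cluster can ever satisfy the request, so only a new node can help.
inline bool IsRetryable(SchedulingResultStatus status) { return status == FAILED; }
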
/// NodeScorer is a scorer to make a grade to the node, which is used for scheduling
/// decision.
@ -86,9 +97,10 @@ class GcsResourceScheduler {
/// \param node_filter_func This function is used to filter candidate nodes. If a node
/// returns true, it can be used for scheduling. By default, all nodes in the cluster
/// can be used for scheduling.
/// \return Scheduling selected nodes, it corresponds to `required_resources_list` one
/// by one. If the scheduling fails, an empty vector is returned.
std::vector<NodeID> Schedule(
/// \return `SchedulingResult`: the selected nodes if scheduling succeeds;
/// otherwise an empty vector and a status flag indicating whether the
/// request can be retried.
SchedulingResult Schedule(
const std::vector<ResourceSet> &required_resources_list,
const SchedulingType &scheduling_type,
const std::function<bool(const NodeID &)> &node_filter_func = nullptr);
@ -118,9 +130,10 @@ class GcsResourceScheduler {
///
/// \param required_resources_list The resources to be scheduled.
/// \param candidate_nodes The nodes can be used for scheduling.
/// \return Scheduling selected nodes, it corresponds to `required_resources_list` one
/// by one. If the scheduling fails, an empty vector is returned.
std::vector<NodeID> StrictSpreadSchedule(
/// \return `SchedulingResult`: the selected nodes if scheduling succeeds;
/// otherwise an empty vector and a status flag indicating whether the
/// request can be retried.
SchedulingResult StrictSpreadSchedule(
const std::vector<ResourceSet> &required_resources_list,
const absl::flat_hash_set<NodeID> &candidate_nodes);
@ -128,19 +141,20 @@ class GcsResourceScheduler {
///
/// \param required_resources_list The resources to be scheduled.
/// \param candidate_nodes The nodes can be used for scheduling.
/// \return Scheduling selected nodes, it corresponds to `required_resources_list` one
/// by one. If the scheduling fails, an empty vector is returned.
std::vector<NodeID> SpreadSchedule(
const std::vector<ResourceSet> &required_resources_list,
const absl::flat_hash_set<NodeID> &candidate_nodes);
/// \return `SchedulingResult`: the selected nodes if scheduling succeeds;
/// otherwise an empty vector and a status flag indicating whether the
/// request can be retried.
SchedulingResult SpreadSchedule(const std::vector<ResourceSet> &required_resources_list,
const absl::flat_hash_set<NodeID> &candidate_nodes);
/// Schedule resources according to `STRICT_PACK` strategy.
///
/// \param required_resources_list The resources to be scheduled.
/// \param candidate_nodes The nodes can be used for scheduling.
/// \return Scheduling selected nodes, it corresponds to `required_resources_list` one
/// by one. If the scheduling fails, an empty vector is returned.
std::vector<NodeID> StrictPackSchedule(
/// \return `SchedulingResult`: the selected nodes if scheduling succeeds;
/// otherwise an empty vector and a status flag indicating whether the
/// request can be retried.
SchedulingResult StrictPackSchedule(
const std::vector<ResourceSet> &required_resources_list,
const absl::flat_hash_set<NodeID> &candidate_nodes);
@ -148,11 +162,11 @@ class GcsResourceScheduler {
///
/// \param required_resources_list The resources to be scheduled.
/// \param candidate_nodes The nodes can be used for scheduling.
/// \return Scheduling selected nodes, it corresponds to `required_resources_list` one
/// by one. If the scheduling fails, an empty vector is returned.
std::vector<NodeID> PackSchedule(
const std::vector<ResourceSet> &required_resources_list,
const absl::flat_hash_set<NodeID> &candidate_nodes);
/// \return `SchedulingResult`: the selected nodes if scheduling succeeds;
/// otherwise an empty vector and a status flag indicating whether the
/// request can be retried.
SchedulingResult PackSchedule(const std::vector<ResourceSet> &required_resources_list,
const absl::flat_hash_set<NodeID> &candidate_nodes);
/// Score all nodes according to the specified resources.
///


@ -408,7 +408,7 @@ void GcsServer::InstallEventListeners() {
// Because a new node has been added, we need to try to schedule the pending
// placement groups and the pending actors.
gcs_resource_manager_->OnNodeAdd(*node);
gcs_placement_group_manager_->SchedulePendingPlacementGroups();
gcs_placement_group_manager_->OnNodeAdd(NodeID::FromBinary(node->node_id()));
gcs_actor_manager_->SchedulePendingActors();
gcs_heartbeat_manager_->AddNode(NodeID::FromBinary(node->node_id()));
if (config_.pull_based_resource_reporting) {


@ -30,7 +30,7 @@ class MockPlacementGroupScheduler : public gcs::GcsPlacementGroupSchedulerInterf
void ScheduleUnplacedBundles(
std::shared_ptr<gcs::GcsPlacementGroup> placement_group,
std::function<void(std::shared_ptr<gcs::GcsPlacementGroup>)> failure_handler,
std::function<void(std::shared_ptr<gcs::GcsPlacementGroup>, bool)> failure_handler,
std::function<void(std::shared_ptr<gcs::GcsPlacementGroup>)> success_handler)
override {
absl::MutexLock lock(&mutex_);
@ -510,6 +510,39 @@ TEST_F(GcsPlacementGroupManagerTest,
gcs_placement_group_manager_->CleanPlacementGroupIfNeededWhenJobDead(different_job_id);
}
TEST_F(GcsPlacementGroupManagerTest, TestSchedulingCanceledWhenPgIsInfeasible) {
auto request = Mocker::GenCreatePlacementGroupRequest();
std::atomic<int> registered_placement_group_count(0);
RegisterPlacementGroup(request,
[&registered_placement_group_count](const Status &status) {
++registered_placement_group_count;
});
ASSERT_EQ(registered_placement_group_count, 1);
WaitForExpectedPgCount(1);
auto placement_group = mock_placement_group_scheduler_->placement_groups_.back();
mock_placement_group_scheduler_->placement_groups_.clear();
// Mark it non-retryable.
gcs_placement_group_manager_->OnPlacementGroupCreationFailed(placement_group, false);
// Schedule twice to make sure it will not be scheduled afterward.
gcs_placement_group_manager_->SchedulePendingPlacementGroups();
ASSERT_EQ(mock_placement_group_scheduler_->placement_groups_.size(), 0);
gcs_placement_group_manager_->SchedulePendingPlacementGroups();
ASSERT_EQ(mock_placement_group_scheduler_->placement_groups_.size(), 0);
// Add a node and make sure it will reschedule the infeasible placement group.
const auto &node_id = NodeID::FromRandom();
gcs_placement_group_manager_->OnNodeAdd(node_id);
ASSERT_EQ(mock_placement_group_scheduler_->placement_groups_.size(), 1);
mock_placement_group_scheduler_->placement_groups_.clear();
OnPlacementGroupCreationSuccess(placement_group);
ASSERT_EQ(placement_group->GetState(), rpc::PlacementGroupTableData::CREATED);
}
TEST_F(GcsPlacementGroupManagerTest, TestRayNamespace) {
auto request1 = Mocker::GenCreatePlacementGroupRequest("test_name");
job_namespace_table_[JobID::FromInt(11)] = "another_namespace";


@ -107,8 +107,7 @@ class GcsPlacementGroupSchedulerTest : public ::testing::Test {
const auto &node_id = NodeID::FromBinary(node->node_id());
std::unordered_map<std::string, double> resource_map;
resource_map["CPU"] = cpu_num;
ResourceSet resources(resource_map);
gcs_resource_manager_->SetAvailableResources(node_id, resources);
gcs_resource_manager_->UpdateResourceCapacity(node_id, resource_map);
}
void ScheduleFailedWithZeroNodeTest(rpc::PlacementStrategy strategy) {
@ -119,7 +118,8 @@ class GcsPlacementGroupSchedulerTest : public ::testing::Test {
// Schedule the placement_group with zero node.
scheduler_->ScheduleUnplacedBundles(
placement_group,
[this](std::shared_ptr<gcs::GcsPlacementGroup> placement_group) {
[this](std::shared_ptr<gcs::GcsPlacementGroup> placement_group,
bool is_feasible) {
absl::MutexLock lock(&placement_group_requests_mutex_);
failure_placement_groups_.emplace_back(std::move(placement_group));
},
@ -148,7 +148,8 @@ class GcsPlacementGroupSchedulerTest : public ::testing::Test {
// send to the node.
scheduler_->ScheduleUnplacedBundles(
placement_group,
[this](std::shared_ptr<gcs::GcsPlacementGroup> placement_group) {
[this](std::shared_ptr<gcs::GcsPlacementGroup> placement_group,
bool is_feasible) {
absl::MutexLock lock(&placement_group_requests_mutex_);
failure_placement_groups_.emplace_back(std::move(placement_group));
},
@ -171,11 +172,11 @@ class GcsPlacementGroupSchedulerTest : public ::testing::Test {
void ReschedulingWhenNodeAddTest(rpc::PlacementStrategy strategy) {
AddNode(Mocker::GenNodeInfo(0), 1);
auto failure_handler =
[this](std::shared_ptr<gcs::GcsPlacementGroup> placement_group) {
absl::MutexLock lock(&placement_group_requests_mutex_);
failure_placement_groups_.emplace_back(std::move(placement_group));
};
auto failure_handler = [this](std::shared_ptr<gcs::GcsPlacementGroup> placement_group,
bool is_feasible) {
absl::MutexLock lock(&placement_group_requests_mutex_);
failure_placement_groups_.emplace_back(std::move(placement_group));
};
auto success_handler =
[this](std::shared_ptr<gcs::GcsPlacementGroup> placement_group) {
absl::MutexLock lock(&placement_group_requests_mutex_);
@ -264,7 +265,8 @@ TEST_F(GcsPlacementGroupSchedulerTest, TestSchedulePlacementGroupReplyFailure) {
// send to the node.
scheduler_->ScheduleUnplacedBundles(
placement_group,
[this](std::shared_ptr<gcs::GcsPlacementGroup> placement_group) {
[this](std::shared_ptr<gcs::GcsPlacementGroup> placement_group,
bool is_feasible) {
absl::MutexLock lock(&placement_group_requests_mutex_);
failure_placement_groups_.emplace_back(std::move(placement_group));
},
@ -288,7 +290,8 @@ TEST_F(GcsPlacementGroupSchedulerTest, TestSchedulePlacementGroupReplyFailure) {
TEST_F(GcsPlacementGroupSchedulerTest, TestSpreadStrategyResourceCheck) {
auto node = Mocker::GenNodeInfo(0);
AddNode(node, 2);
auto failure_handler = [this](std::shared_ptr<gcs::GcsPlacementGroup> placement_group) {
auto failure_handler = [this](std::shared_ptr<gcs::GcsPlacementGroup> placement_group,
bool is_feasible) {
absl::MutexLock lock(&placement_group_requests_mutex_);
failure_placement_groups_.emplace_back(std::move(placement_group));
};
@ -322,7 +325,8 @@ TEST_F(GcsPlacementGroupSchedulerTest, TestSchedulePlacementGroupReturnResource)
// send to the node.
scheduler_->ScheduleUnplacedBundles(
placement_group,
[this](std::shared_ptr<gcs::GcsPlacementGroup> placement_group) {
[this](std::shared_ptr<gcs::GcsPlacementGroup> placement_group,
bool is_feasible) {
absl::MutexLock lock(&placement_group_requests_mutex_);
failure_placement_groups_.emplace_back(std::move(placement_group));
},
@ -347,7 +351,8 @@ TEST_F(GcsPlacementGroupSchedulerTest, TestSchedulePlacementGroupReturnResource)
TEST_F(GcsPlacementGroupSchedulerTest, TestStrictPackStrategyBalancedScheduling) {
AddNode(Mocker::GenNodeInfo(0));
AddNode(Mocker::GenNodeInfo(1));
auto failure_handler = [this](std::shared_ptr<gcs::GcsPlacementGroup> placement_group) {
auto failure_handler = [this](std::shared_ptr<gcs::GcsPlacementGroup> placement_group,
bool is_feasible) {
absl::MutexLock lock(&placement_group_requests_mutex_);
failure_placement_groups_.emplace_back(std::move(placement_group));
};
@ -394,7 +399,8 @@ TEST_F(GcsPlacementGroupSchedulerTest, TestStrictPackStrategyReschedulingWhenNod
TEST_F(GcsPlacementGroupSchedulerTest, TestStrictPackStrategyResourceCheck) {
auto node0 = Mocker::GenNodeInfo(0);
AddNode(node0);
auto failure_handler = [this](std::shared_ptr<gcs::GcsPlacementGroup> placement_group) {
auto failure_handler = [this](std::shared_ptr<gcs::GcsPlacementGroup> placement_group,
bool is_feasible) {
absl::MutexLock lock(&placement_group_requests_mutex_);
failure_placement_groups_.emplace_back(std::move(placement_group));
};
@ -443,7 +449,8 @@ TEST_F(GcsPlacementGroupSchedulerTest, DestroyPlacementGroup) {
// send to the node.
scheduler_->ScheduleUnplacedBundles(
placement_group,
[this](std::shared_ptr<gcs::GcsPlacementGroup> placement_group) {
[this](std::shared_ptr<gcs::GcsPlacementGroup> placement_group,
bool is_feasible) {
absl::MutexLock lock(&placement_group_requests_mutex_);
failure_placement_groups_.emplace_back(std::move(placement_group));
},
@ -482,7 +489,8 @@ TEST_F(GcsPlacementGroupSchedulerTest, DestroyCancelledPlacementGroup) {
// send to the node.
scheduler_->ScheduleUnplacedBundles(
placement_group,
[this](std::shared_ptr<gcs::GcsPlacementGroup> placement_group) {
[this](std::shared_ptr<gcs::GcsPlacementGroup> placement_group,
bool is_feasible) {
absl::MutexLock lock(&placement_group_requests_mutex_);
failure_placement_groups_.emplace_back(std::move(placement_group));
},
@ -514,7 +522,8 @@ TEST_F(GcsPlacementGroupSchedulerTest, PlacementGroupCancelledDuringCommit) {
// send to the node.
scheduler_->ScheduleUnplacedBundles(
placement_group,
[this](std::shared_ptr<gcs::GcsPlacementGroup> placement_group) {
[this](std::shared_ptr<gcs::GcsPlacementGroup> placement_group,
bool is_feasible) {
absl::MutexLock lock(&placement_group_requests_mutex_);
failure_placement_groups_.emplace_back(std::move(placement_group));
},
@ -542,7 +551,8 @@ TEST_F(GcsPlacementGroupSchedulerTest, TestPackStrategyReschedulingWhenNodeAdd)
TEST_F(GcsPlacementGroupSchedulerTest, TestPackStrategyLargeBundlesScheduling) {
AddNode(Mocker::GenNodeInfo(0));
AddNode(Mocker::GenNodeInfo(1));
auto failure_handler = [this](std::shared_ptr<gcs::GcsPlacementGroup> placement_group) {
auto failure_handler = [this](std::shared_ptr<gcs::GcsPlacementGroup> placement_group,
bool is_feasible) {
absl::MutexLock lock(&placement_group_requests_mutex_);
failure_placement_groups_.emplace_back(std::move(placement_group));
};
@ -593,7 +603,8 @@ TEST_F(GcsPlacementGroupSchedulerTest, TestStrictSpreadRescheduleWhenNodeDead) {
std::make_shared<gcs::GcsPlacementGroup>(create_placement_group_request, "");
// Schedule the placement group successfully.
auto failure_handler = [this](std::shared_ptr<gcs::GcsPlacementGroup> placement_group) {
auto failure_handler = [this](std::shared_ptr<gcs::GcsPlacementGroup> placement_group,
bool is_feasible) {
absl::MutexLock lock(&placement_group_requests_mutex_);
failure_placement_groups_.emplace_back(std::move(placement_group));
};
@ -659,7 +670,8 @@ TEST_F(GcsPlacementGroupSchedulerTest, TestStrictSpreadRescheduleWhenNodeDead) {
TEST_F(GcsPlacementGroupSchedulerTest, TestStrictSpreadStrategyResourceCheck) {
auto node0 = Mocker::GenNodeInfo(0);
AddNode(node0);
auto failure_handler = [this](std::shared_ptr<gcs::GcsPlacementGroup> placement_group) {
auto failure_handler = [this](std::shared_ptr<gcs::GcsPlacementGroup> placement_group,
bool is_feasible) {
absl::MutexLock lock(&placement_group_requests_mutex_);
failure_placement_groups_.emplace_back(std::move(placement_group));
};
@ -783,7 +795,8 @@ TEST_F(GcsPlacementGroupSchedulerTest, TestNodeDeadDuringPreparingResources) {
// Schedule the placement group.
// One node is dead, so one bundle failed to schedule.
auto failure_handler = [this](std::shared_ptr<gcs::GcsPlacementGroup> placement_group) {
auto failure_handler = [this](std::shared_ptr<gcs::GcsPlacementGroup> placement_group,
bool is_feasible) {
absl::MutexLock lock(&placement_group_requests_mutex_);
ASSERT_TRUE(placement_group->GetUnplacedBundles().size() == 2);
failure_placement_groups_.emplace_back(std::move(placement_group));
@ -819,7 +832,8 @@ TEST_F(GcsPlacementGroupSchedulerTest,
// Schedule the placement group.
// One node is dead, so one bundle failed to schedule.
auto failure_handler = [this](std::shared_ptr<gcs::GcsPlacementGroup> placement_group) {
auto failure_handler = [this](std::shared_ptr<gcs::GcsPlacementGroup> placement_group,
bool is_feasible) {
absl::MutexLock lock(&placement_group_requests_mutex_);
ASSERT_TRUE(placement_group->GetUnplacedBundles().size() == 1);
failure_placement_groups_.emplace_back(std::move(placement_group));
@ -859,7 +873,8 @@ TEST_F(GcsPlacementGroupSchedulerTest, TestNodeDeadDuringCommittingResources) {
// Schedule the placement group.
// One node is dead, so one bundle failed to schedule.
auto failure_handler = [this](std::shared_ptr<gcs::GcsPlacementGroup> placement_group) {
auto failure_handler = [this](std::shared_ptr<gcs::GcsPlacementGroup> placement_group,
bool is_feasible) {
absl::MutexLock lock(&placement_group_requests_mutex_);
ASSERT_TRUE(placement_group->GetUnplacedBundles().size() == 2);
failure_placement_groups_.emplace_back(std::move(placement_group));
@ -893,7 +908,8 @@ TEST_F(GcsPlacementGroupSchedulerTest, TestNodeDeadDuringRescheduling) {
std::make_shared<gcs::GcsPlacementGroup>(create_placement_group_request, "");
// Schedule the placement group successfully.
auto failure_handler = [this](std::shared_ptr<gcs::GcsPlacementGroup> placement_group) {
auto failure_handler = [this](std::shared_ptr<gcs::GcsPlacementGroup> placement_group,
bool is_feasible) {
absl::MutexLock lock(&placement_group_requests_mutex_);
failure_placement_groups_.emplace_back(std::move(placement_group));
};
@ -947,7 +963,8 @@ TEST_F(GcsPlacementGroupSchedulerTest, TestPGCancelledDuringReschedulingCommit)
std::make_shared<gcs::GcsPlacementGroup>(create_placement_group_request, "");
// Schedule the placement group successfully.
auto failure_handler = [this](std::shared_ptr<gcs::GcsPlacementGroup> placement_group) {
auto failure_handler = [this](std::shared_ptr<gcs::GcsPlacementGroup> placement_group,
bool is_feasible) {
absl::MutexLock lock(&placement_group_requests_mutex_);
failure_placement_groups_.emplace_back(std::move(placement_group));
};
@ -1003,7 +1020,8 @@ TEST_F(GcsPlacementGroupSchedulerTest, TestPGCancelledDuringReschedulingCommitPr
std::make_shared<gcs::GcsPlacementGroup>(create_placement_group_request, "");
// Schedule the placement group successfully.
auto failure_handler = [this](std::shared_ptr<gcs::GcsPlacementGroup> placement_group) {
auto failure_handler = [this](std::shared_ptr<gcs::GcsPlacementGroup> placement_group,
bool is_feasible) {
absl::MutexLock lock(&placement_group_requests_mutex_);
failure_placement_groups_.emplace_back(std::move(placement_group));
};


@ -70,7 +70,8 @@ class GcsResourceSchedulerTest : public ::testing::Test {
}
const auto &result1 =
gcs_resource_scheduler_->Schedule(required_resources_list, scheduling_type);
ASSERT_EQ(result1.size(), 3);
ASSERT_TRUE(result1.first == gcs::SchedulingResultStatus::SUCCESS);
ASSERT_EQ(result1.second.size(), 3);
// Check for resource leaks.
CheckClusterAvailableResources(node_id, cpu_resource, node_cpu_num);
@ -80,7 +81,8 @@ class GcsResourceSchedulerTest : public ::testing::Test {
required_resources_list.emplace_back(resource_map);
const auto &result2 =
gcs_resource_scheduler_->Schedule(required_resources_list, scheduling_type);
ASSERT_EQ(result2.size(), 0);
ASSERT_TRUE(result2.first == gcs::SchedulingResultStatus::FAILED);
ASSERT_EQ(result2.second.size(), 0);
// Check for resource leaks.
CheckClusterAvailableResources(node_id, cpu_resource, node_cpu_num);
@ -116,13 +118,58 @@ TEST_F(GcsResourceSchedulerTest, TestNodeFilter) {
const auto &result1 = gcs_resource_scheduler_->Schedule(
required_resources_list, gcs::SchedulingType::STRICT_SPREAD,
[](const NodeID &) { return false; });
ASSERT_EQ(result1.size(), 0);
ASSERT_TRUE(result1.first == gcs::SchedulingResultStatus::INFEASIBLE);
ASSERT_EQ(result1.second.size(), 0);
// Scheduling succeeded.
const auto &result2 = gcs_resource_scheduler_->Schedule(
required_resources_list, gcs::SchedulingType::STRICT_SPREAD,
[](const NodeID &) { return true; });
ASSERT_EQ(result2.size(), 1);
ASSERT_TRUE(result2.first == gcs::SchedulingResultStatus::SUCCESS);
ASSERT_EQ(result2.second.size(), 1);
}
TEST_F(GcsResourceSchedulerTest, TestSchedulingResultStatusForStrictStrategy) {
// Init resources with two nodes.
const auto &node_one_id = NodeID::FromRandom();
const auto &node_two_id = NodeID::FromRandom();
const std::string cpu_resource = "CPU";
const double node_cpu_num = 10.0;
AddClusterResources(node_one_id, cpu_resource, node_cpu_num);
AddClusterResources(node_two_id, cpu_resource, node_cpu_num);
// Mock a request that has three required resources.
std::vector<ResourceSet> required_resources_list;
std::unordered_map<std::string, double> resource_map;
resource_map[cpu_resource] = 1;
for (int node_number = 0; node_number < 3; node_number++) {
required_resources_list.emplace_back(resource_map);
}
const auto &result1 = gcs_resource_scheduler_->Schedule(
required_resources_list, gcs::SchedulingType::STRICT_SPREAD);
ASSERT_TRUE(result1.first == gcs::SchedulingResultStatus::INFEASIBLE);
ASSERT_EQ(result1.second.size(), 0);
// Check for resource leaks.
CheckClusterAvailableResources(node_one_id, cpu_resource, node_cpu_num);
CheckClusterAvailableResources(node_two_id, cpu_resource, node_cpu_num);
// Mock a request with a single required resource that is bigger than any
// node's total resources.
required_resources_list.clear();
resource_map.clear();
resource_map[cpu_resource] = 50;
required_resources_list.emplace_back(resource_map);
const auto &result2 = gcs_resource_scheduler_->Schedule(
required_resources_list, gcs::SchedulingType::STRICT_PACK);
ASSERT_TRUE(result2.first == gcs::SchedulingResultStatus::INFEASIBLE);
ASSERT_EQ(result2.second.size(), 0);
// Check for resource leaks.
CheckClusterAvailableResources(node_one_id, cpu_resource, node_cpu_num);
CheckClusterAvailableResources(node_two_id, cpu_resource, node_cpu_num);
}
} // namespace ray