mirror of
https://github.com/vale981/ray
synced 2025-03-05 10:01:43 -05:00
[Core] Support max cpu allocation per node for placement group scheduling (#26397)
The PR adds a new experimental flag to the placement group API to avoid placement group taking all cpus on each node. It is used internally by Air to avoid placement group (created by Tune) is using all CPU resources which are needed for dataset
This commit is contained in:
parent
34cf1f17ea
commit
0f0102666a
24 changed files with 381 additions and 87 deletions
|
@ -167,7 +167,8 @@ ray::PlacementGroup NativeTaskSubmitter::CreatePlacementGroup(
|
|||
create_options.name,
|
||||
(ray::core::PlacementStrategy)create_options.strategy,
|
||||
create_options.bundles,
|
||||
false);
|
||||
false,
|
||||
1.0);
|
||||
ray::PlacementGroupID placement_group_id;
|
||||
auto status = CoreWorkerProcess::GetCoreWorker().CreatePlacementGroup(
|
||||
options, &placement_group_id);
|
||||
|
|
|
@ -1659,7 +1659,8 @@ cdef class CoreWorker:
|
|||
c_string name,
|
||||
c_vector[unordered_map[c_string, double]] bundles,
|
||||
c_string strategy,
|
||||
c_bool is_detached):
|
||||
c_bool is_detached,
|
||||
double max_cpu_fraction_per_node):
|
||||
cdef:
|
||||
CPlacementGroupID c_placement_group_id
|
||||
CPlacementStrategy c_strategy
|
||||
|
@ -1684,8 +1685,8 @@ cdef class CoreWorker:
|
|||
name,
|
||||
c_strategy,
|
||||
bundles,
|
||||
is_detached
|
||||
),
|
||||
is_detached,
|
||||
max_cpu_fraction_per_node),
|
||||
&c_placement_group_id))
|
||||
|
||||
return PlacementGroupID(c_placement_group_id.Binary())
|
||||
|
|
|
@ -207,6 +207,21 @@ class ResourceDemandScheduler:
|
|||
# Step 3: get resource demands of placement groups and return the
|
||||
# groups that should be strictly spread.
|
||||
logger.debug(f"Placement group demands: {pending_placement_groups}")
|
||||
# TODO(Clark): Refactor placement group bundle demands such that their placement
|
||||
# group provenance is mantained, since we need to keep an accounting of the
|
||||
# cumulative CPU cores allocated as fulfilled during bin packing in order to
|
||||
# ensure that a placement group's cumulative allocation is under the placement
|
||||
# group's max CPU fraction per node. Without this, and placement group with many
|
||||
# bundles might not be schedulable, but will fail to trigger scale-up since the
|
||||
# max CPU fraction is properly applied to the cumulative bundle requests for a
|
||||
# single node.
|
||||
#
|
||||
# placement_group_demand_vector: List[Tuple[List[ResourceDict], double]]
|
||||
#
|
||||
# bin_pack_residual() can keep it's packing priority; we just need to account
|
||||
# for (1) the running CPU allocation for the bundle's placement group for that
|
||||
# particular node, and (2) the max CPU cores allocatable for a single placement
|
||||
# group for that particular node.
|
||||
(
|
||||
placement_group_demand_vector,
|
||||
strict_spreads,
|
||||
|
@ -244,13 +259,16 @@ class ResourceDemandScheduler:
|
|||
node_resources,
|
||||
node_type_counts,
|
||||
) = self.reserve_and_allocate_spread(
|
||||
strict_spreads, node_resources, node_type_counts
|
||||
strict_spreads,
|
||||
node_resources,
|
||||
node_type_counts,
|
||||
)
|
||||
|
||||
# Calculate the nodes to add for bypassing max launch limit for
|
||||
# placement groups and spreads.
|
||||
unfulfilled_placement_groups_demands, _ = get_bin_pack_residual(
|
||||
node_resources, placement_group_demand_vector
|
||||
node_resources,
|
||||
placement_group_demand_vector,
|
||||
)
|
||||
# Add 1 to account for the head node.
|
||||
max_to_add = self.max_workers + 1 - sum(node_type_counts.values())
|
||||
|
|
|
@ -288,7 +288,8 @@ cdef extern from "ray/core_worker/common.h" nogil:
|
|||
const c_string &name,
|
||||
CPlacementStrategy strategy,
|
||||
const c_vector[unordered_map[c_string, double]] &bundles,
|
||||
c_bool is_detached
|
||||
c_bool is_detached,
|
||||
double max_cpu_fraction_per_node
|
||||
)
|
||||
|
||||
cdef cppclass CObjectLocation "ray::core::ObjectLocation":
|
||||
|
|
|
@ -76,6 +76,83 @@ def test_placement_group_bin_packing_priority(
|
|||
)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("multi_bundle", [False, True])
|
||||
@pytest.mark.parametrize("even_pack", [False, True])
|
||||
@pytest.mark.parametrize("scheduling_strategy", ["SPREAD", "STRICT_PACK", "PACK"])
|
||||
def test_placement_group_max_cpu_frac(
|
||||
ray_start_cluster, multi_bundle, even_pack, scheduling_strategy
|
||||
):
|
||||
cluster = ray_start_cluster
|
||||
cluster.add_node(num_cpus=4)
|
||||
cluster.wait_for_nodes()
|
||||
ray.init(address=cluster.address)
|
||||
|
||||
if multi_bundle:
|
||||
bundles = [{"CPU": 1}] * 3
|
||||
else:
|
||||
bundles = [{"CPU": 3}]
|
||||
|
||||
# Input validation - max_cpu_fraction_per_node must be between 0 and 1.
|
||||
with pytest.raises(ValueError):
|
||||
ray.util.placement_group(bundles, _max_cpu_fraction_per_node=-1)
|
||||
with pytest.raises(ValueError):
|
||||
ray.util.placement_group(bundles, _max_cpu_fraction_per_node=2)
|
||||
|
||||
pg = ray.util.placement_group(
|
||||
bundles, strategy=scheduling_strategy, _max_cpu_fraction_per_node=0.5
|
||||
)
|
||||
|
||||
# Placement group will never be scheduled since it would violate the max CPU
|
||||
# fraction reservation.
|
||||
with pytest.raises(ray.exceptions.GetTimeoutError):
|
||||
ray.get(pg.ready(), timeout=5)
|
||||
|
||||
# Add new node with enough CPU cores to scheduled placement group bundle while
|
||||
# adhering to the max CPU fraction constraint.
|
||||
if even_pack:
|
||||
num_cpus = 6
|
||||
else:
|
||||
num_cpus = 8
|
||||
cluster.add_node(num_cpus=num_cpus)
|
||||
cluster.wait_for_nodes()
|
||||
# The placement group should be schedulable so this shouldn't raise.
|
||||
ray.get(pg.ready(), timeout=5)
|
||||
|
||||
|
||||
def test_placement_group_max_cpu_frac_multiple_pgs(ray_start_cluster):
|
||||
"""
|
||||
Make sure when there's more than 1 pg, they respect the fraction.
|
||||
"""
|
||||
cluster = ray_start_cluster
|
||||
cluster.add_node(num_cpus=8)
|
||||
cluster.wait_for_nodes()
|
||||
ray.init(address=cluster.address)
|
||||
|
||||
# This pg should be scheduable.
|
||||
pg = ray.util.placement_group([{"CPU": 4}], _max_cpu_fraction_per_node=0.5)
|
||||
ray.get(pg.ready())
|
||||
|
||||
# When we schedule another placement group, it shouldn't be scheduled.
|
||||
pg2 = ray.util.placement_group([{"CPU": 4}], _max_cpu_fraction_per_node=0.5)
|
||||
with pytest.raises(ray.exceptions.GetTimeoutError):
|
||||
ray.get(pg2.ready(), timeout=5)
|
||||
|
||||
cluster.add_node(num_cpus=8)
|
||||
ray.get(pg2.ready())
|
||||
|
||||
"""
|
||||
Make sure when the CPU * frac < 1, we can at least
|
||||
guarantee to have 1 CPU for pg.
|
||||
"""
|
||||
ray.util.remove_placement_group(pg)
|
||||
ray.util.remove_placement_group(pg2)
|
||||
|
||||
# We can reserve up to 0.8 CPU, but it should round up to 1, so this pg
|
||||
# is schedulable.
|
||||
pg = ray.util.placement_group([{"CPU": 1}], _max_cpu_fraction_per_node=0.1)
|
||||
ray.get(pg.ready())
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
import os
|
||||
|
||||
|
|
|
@ -127,7 +127,8 @@ def placement_group(
|
|||
bundles: List[Dict[str, float]],
|
||||
strategy: str = "PACK",
|
||||
name: str = "",
|
||||
lifetime=None,
|
||||
lifetime: Optional[str] = None,
|
||||
_max_cpu_fraction_per_node: Optional[float] = None,
|
||||
) -> PlacementGroup:
|
||||
"""Asynchronously creates a PlacementGroup.
|
||||
|
||||
|
@ -147,6 +148,21 @@ def placement_group(
|
|||
will fate share with its creator and will be deleted once its
|
||||
creator is dead, or "detached", which means the placement group
|
||||
will live as a global object independent of the creator.
|
||||
_max_cpu_fraction_per_node: THIS FEATURE IS EXPERIMENTAL
|
||||
The maximum fraction of CPU cores this placement group can take
|
||||
up on each node. This must be a float between 0 and 1.
|
||||
If provided, the placement group won't take up more than
|
||||
_max_cpu_fraction_per_node * node["num_cpus"] CPU cores on each node. This
|
||||
is useful for ensuring that some percentage of CPU cores are available on
|
||||
each node for workloads that aren't using this placement group.
|
||||
NOTE: When the _max_cpu_fraction_per_node * node["num_cpus"] < 1,
|
||||
it can reserve up to 1 CPU.
|
||||
NOTE: The fraction is applied per node, not per placement group.
|
||||
For example, if there are 2 placement groups each of which has
|
||||
the fraction 0.5, it doesn't mean 2 placement groups can take
|
||||
the whole CPUs. If there are 2 placement groups with bundles {CPU: 4},
|
||||
_max_cpu_fraction_per_node=0.5 and there is a node with 8 CPUs,
|
||||
only one placement group can be scheduled on this node.
|
||||
|
||||
Raises:
|
||||
ValueError if bundle type is not a list.
|
||||
|
@ -162,6 +178,11 @@ def placement_group(
|
|||
if not isinstance(bundles, list):
|
||||
raise ValueError("The type of bundles must be list, got {}".format(bundles))
|
||||
|
||||
if _max_cpu_fraction_per_node is None:
|
||||
_max_cpu_fraction_per_node = 1.0
|
||||
if _max_cpu_fraction_per_node < 0 or _max_cpu_fraction_per_node > 1:
|
||||
raise ValueError("max_cpu_fraction_per_node must be a float between 0 and 1.")
|
||||
|
||||
# Validate bundles
|
||||
for bundle in bundles:
|
||||
if len(bundle) == 0 or all(
|
||||
|
@ -187,7 +208,11 @@ def placement_group(
|
|||
)
|
||||
|
||||
placement_group_id = worker.core_worker.create_placement_group(
|
||||
name, bundles, strategy, detached
|
||||
name,
|
||||
bundles,
|
||||
strategy,
|
||||
detached,
|
||||
_max_cpu_fraction_per_node,
|
||||
)
|
||||
|
||||
return PlacementGroup(placement_group_id)
|
||||
|
|
|
@ -44,4 +44,8 @@ BundleSpecification PlacementGroupSpecification::GetBundle(int position) const {
|
|||
std::string PlacementGroupSpecification::GetName() const {
|
||||
return std::string(message_->name());
|
||||
}
|
||||
|
||||
double PlacementGroupSpecification::GetMaxCpuFractionPerNode() const {
|
||||
return message_->max_cpu_fraction_per_node();
|
||||
}
|
||||
} // namespace ray
|
||||
|
|
|
@ -60,6 +60,8 @@ class PlacementGroupSpecification : public MessageWrapper<rpc::PlacementGroupSpe
|
|||
BundleSpecification GetBundle(int position) const;
|
||||
/// Return the name of this placement group.
|
||||
std::string GetName() const;
|
||||
/// Return the max CPU fraction per node for this placement group.
|
||||
double GetMaxCpuFractionPerNode() const;
|
||||
|
||||
private:
|
||||
/// Construct bundle vector from protobuf.
|
||||
|
@ -82,6 +84,7 @@ class PlacementGroupSpecBuilder {
|
|||
const std::vector<std::unordered_map<std::string, double>> &bundles,
|
||||
const rpc::PlacementStrategy strategy,
|
||||
const bool is_detached,
|
||||
double max_cpu_fraction_per_node,
|
||||
const JobID &creator_job_id,
|
||||
const ActorID &creator_actor_id,
|
||||
bool is_creator_detached_actor) {
|
||||
|
@ -99,6 +102,7 @@ class PlacementGroupSpecBuilder {
|
|||
message_->set_creator_actor_id(creator_actor_id.Binary());
|
||||
message_->set_creator_actor_dead(creator_actor_id.IsNil());
|
||||
message_->set_is_detached(is_detached);
|
||||
message_->set_max_cpu_fraction_per_node(max_cpu_fraction_per_node);
|
||||
|
||||
for (size_t i = 0; i < bundles.size(); i++) {
|
||||
auto resources = bundles[i];
|
||||
|
|
|
@ -167,11 +167,13 @@ struct PlacementGroupCreationOptions {
|
|||
std::string name,
|
||||
PlacementStrategy strategy,
|
||||
std::vector<std::unordered_map<std::string, double>> bundles,
|
||||
bool is_detached)
|
||||
bool is_detached,
|
||||
double max_cpu_fraction_per_node)
|
||||
: name(std::move(name)),
|
||||
strategy(strategy),
|
||||
bundles(std::move(bundles)),
|
||||
is_detached(is_detached) {}
|
||||
is_detached(is_detached),
|
||||
max_cpu_fraction_per_node(max_cpu_fraction_per_node) {}
|
||||
|
||||
/// The name of the placement group.
|
||||
const std::string name;
|
||||
|
@ -181,6 +183,8 @@ struct PlacementGroupCreationOptions {
|
|||
const std::vector<std::unordered_map<std::string, double>> bundles;
|
||||
/// Whether to keep the placement group persistent after its creator dead.
|
||||
const bool is_detached = false;
|
||||
/// The maximum fraction of CPU cores this placement group can take up on each node.
|
||||
const double max_cpu_fraction_per_node;
|
||||
};
|
||||
|
||||
class ObjectLocation {
|
||||
|
|
|
@ -1797,14 +1797,16 @@ Status CoreWorker::CreatePlacementGroup(
|
|||
}
|
||||
const PlacementGroupID placement_group_id = PlacementGroupID::Of(GetCurrentJobId());
|
||||
PlacementGroupSpecBuilder builder;
|
||||
builder.SetPlacementGroupSpec(placement_group_id,
|
||||
placement_group_creation_options.name,
|
||||
placement_group_creation_options.bundles,
|
||||
placement_group_creation_options.strategy,
|
||||
placement_group_creation_options.is_detached,
|
||||
worker_context_.GetCurrentJobID(),
|
||||
worker_context_.GetCurrentActorID(),
|
||||
worker_context_.CurrentActorDetached());
|
||||
builder.SetPlacementGroupSpec(
|
||||
placement_group_id,
|
||||
placement_group_creation_options.name,
|
||||
placement_group_creation_options.bundles,
|
||||
placement_group_creation_options.strategy,
|
||||
placement_group_creation_options.is_detached,
|
||||
placement_group_creation_options.max_cpu_fraction_per_node,
|
||||
worker_context_.GetCurrentJobID(),
|
||||
worker_context_.GetCurrentActorID(),
|
||||
worker_context_.CurrentActorDetached());
|
||||
PlacementGroupSpecification placement_group_spec = builder.Build();
|
||||
*return_placement_group_id = placement_group_id;
|
||||
RAY_LOG(INFO) << "Submitting Placement Group creation to GCS: " << placement_group_id;
|
||||
|
|
|
@ -251,8 +251,8 @@ inline ActorCreationOptions ToActorCreationOptions(JNIEnv *env,
|
|||
serialized_runtime_env = JavaStringToNativeString(env, java_serialized_runtime_env);
|
||||
}
|
||||
|
||||
auto java_namespace = (jstring)env->GetObjectField(actorCreationOptions,
|
||||
java_actor_creation_options_namespace);
|
||||
auto java_namespace = (jstring)env->GetObjectField(
|
||||
actorCreationOptions, java_actor_creation_options_namespace);
|
||||
if (java_namespace) {
|
||||
ray_namespace = JavaStringToNativeString(env, java_namespace);
|
||||
}
|
||||
|
@ -337,7 +337,8 @@ inline PlacementGroupCreationOptions ToPlacementGroupCreationOptions(
|
|||
return PlacementGroupCreationOptions(name,
|
||||
ConvertStrategy(java_strategy),
|
||||
bundles,
|
||||
/*is_detached=*/false);
|
||||
/*is_detached=*/false,
|
||||
/*max_cpu_fraction_per_node*/ 1.0);
|
||||
}
|
||||
|
||||
#ifdef __cplusplus
|
||||
|
|
|
@ -117,6 +117,10 @@ bool GcsPlacementGroup::IsDetached() const {
|
|||
return placement_group_table_data_.is_detached();
|
||||
}
|
||||
|
||||
double GcsPlacementGroup::GetMaxCpuFractionPerNode() const {
|
||||
return placement_group_table_data_.max_cpu_fraction_per_node();
|
||||
}
|
||||
|
||||
const rpc::PlacementGroupStats &GcsPlacementGroup::GetStats() const {
|
||||
return placement_group_table_data_.stats();
|
||||
}
|
||||
|
|
|
@ -70,6 +70,8 @@ class GcsPlacementGroup {
|
|||
placement_group_table_data_.set_creator_actor_dead(
|
||||
placement_group_spec.creator_actor_dead());
|
||||
placement_group_table_data_.set_is_detached(placement_group_spec.is_detached());
|
||||
placement_group_table_data_.set_max_cpu_fraction_per_node(
|
||||
placement_group_spec.max_cpu_fraction_per_node());
|
||||
placement_group_table_data_.set_ray_namespace(ray_namespace);
|
||||
SetupStates();
|
||||
}
|
||||
|
@ -127,6 +129,9 @@ class GcsPlacementGroup {
|
|||
/// Returns whether or not this is a detached placement group.
|
||||
bool IsDetached() const;
|
||||
|
||||
/// Returns the maximum CPU fraction per node for this placement group.
|
||||
double GetMaxCpuFractionPerNode() const;
|
||||
|
||||
const rpc::PlacementGroupStats &GetStats() const;
|
||||
|
||||
rpc::PlacementGroupStats *GetMutableStats();
|
||||
|
|
|
@ -63,7 +63,9 @@ void GcsPlacementGroupScheduler::ScheduleUnplacedBundles(
|
|||
}
|
||||
|
||||
auto scheduling_options =
|
||||
CreateSchedulingOptions(placement_group->GetPlacementGroupID(), strategy);
|
||||
CreateSchedulingOptions(placement_group->GetPlacementGroupID(),
|
||||
strategy,
|
||||
placement_group->GetMaxCpuFractionPerNode());
|
||||
auto scheduling_result =
|
||||
cluster_resource_scheduler_.Schedule(resource_request_list, scheduling_options);
|
||||
|
||||
|
@ -71,9 +73,12 @@ void GcsPlacementGroupScheduler::ScheduleUnplacedBundles(
|
|||
const auto &selected_nodes = scheduling_result.selected_nodes;
|
||||
|
||||
if (!result_status.IsSuccess()) {
|
||||
RAY_LOG(DEBUG) << "Failed to schedule placement group " << placement_group->GetName()
|
||||
<< ", id: " << placement_group->GetPlacementGroupID()
|
||||
<< ", because current reource can't satisfy the required resource.";
|
||||
RAY_LOG(DEBUG)
|
||||
<< "Failed to schedule placement group " << placement_group->GetName()
|
||||
<< ", id: " << placement_group->GetPlacementGroupID()
|
||||
<< ", because current resources can't satisfy the required resource. IsFailed: "
|
||||
<< result_status.IsFailed() << " IsInfeasible: " << result_status.IsInfeasible()
|
||||
<< " IsPartialSuccess: " << result_status.IsPartialSuccess();
|
||||
bool infeasible = result_status.IsInfeasible();
|
||||
// If the placement group creation has failed,
|
||||
// but if it is not infeasible, it is retryable to create.
|
||||
|
@ -81,6 +86,10 @@ void GcsPlacementGroupScheduler::ScheduleUnplacedBundles(
|
|||
return;
|
||||
}
|
||||
|
||||
RAY_LOG(DEBUG) << "Can schedule a placement group "
|
||||
<< placement_group->GetPlacementGroupID()
|
||||
<< ". Selected node size: " << selected_nodes.size();
|
||||
|
||||
RAY_CHECK(bundles.size() == selected_nodes.size());
|
||||
|
||||
// Covert to a map of bundle to node.
|
||||
|
@ -439,17 +448,19 @@ GcsPlacementGroupScheduler::CreateSchedulingContext(
|
|||
}
|
||||
|
||||
SchedulingOptions GcsPlacementGroupScheduler::CreateSchedulingOptions(
|
||||
const PlacementGroupID &placement_group_id, rpc::PlacementStrategy strategy) {
|
||||
const PlacementGroupID &placement_group_id,
|
||||
rpc::PlacementStrategy strategy,
|
||||
double max_cpu_fraction_per_node) {
|
||||
switch (strategy) {
|
||||
case rpc::PlacementStrategy::PACK:
|
||||
return SchedulingOptions::BundlePack();
|
||||
return SchedulingOptions::BundlePack(max_cpu_fraction_per_node);
|
||||
case rpc::PlacementStrategy::SPREAD:
|
||||
return SchedulingOptions::BundleSpread();
|
||||
return SchedulingOptions::BundleSpread(max_cpu_fraction_per_node);
|
||||
case rpc::PlacementStrategy::STRICT_PACK:
|
||||
return SchedulingOptions::BundleStrictPack();
|
||||
return SchedulingOptions::BundleStrictPack(max_cpu_fraction_per_node);
|
||||
case rpc::PlacementStrategy::STRICT_SPREAD:
|
||||
return SchedulingOptions::BundleStrictSpread(
|
||||
CreateSchedulingContext(placement_group_id));
|
||||
max_cpu_fraction_per_node, CreateSchedulingContext(placement_group_id));
|
||||
default:
|
||||
RAY_LOG(FATAL) << "Unsupported scheduling type: "
|
||||
<< rpc::PlacementStrategy_Name(strategy);
|
||||
|
|
|
@ -426,7 +426,8 @@ class GcsPlacementGroupScheduler : public GcsPlacementGroupSchedulerInterface {
|
|||
|
||||
/// Create scheduling options.
|
||||
SchedulingOptions CreateSchedulingOptions(const PlacementGroupID &placement_group_id,
|
||||
rpc::PlacementStrategy strategy);
|
||||
rpc::PlacementStrategy strategy,
|
||||
double max_cpu_fraction_per_node);
|
||||
|
||||
/// A timer that ticks every cancel resource failure milliseconds.
|
||||
boost::asio::deadline_timer return_timer_;
|
||||
|
|
|
@ -160,6 +160,7 @@ struct Mocker {
|
|||
bundles,
|
||||
strategy,
|
||||
/* is_detached */ false,
|
||||
/* max_cpu_fraction_per_node */ 1.0,
|
||||
job_id,
|
||||
actor_id,
|
||||
/* is_creator_detached */ false);
|
||||
|
|
|
@ -374,6 +374,8 @@ message PlacementGroupSpec {
|
|||
bool creator_actor_dead = 8;
|
||||
// Whether the placement group is persistent.
|
||||
bool is_detached = 9;
|
||||
// The maximum fraction of CPU cores that this placement group can use on each node.
|
||||
double max_cpu_fraction_per_node = 10;
|
||||
}
|
||||
|
||||
message ObjectReference {
|
||||
|
|
|
@ -549,5 +549,7 @@ message PlacementGroupTableData {
|
|||
// The placement group's stats / information such as when it is created or
|
||||
// what's the current scheduling state.
|
||||
PlacementGroupStats stats = 12;
|
||||
// The maximum fraction of CPU cores that this placement group can use on each node.
|
||||
double max_cpu_fraction_per_node = 13;
|
||||
}
|
||||
///////////////////////////////////////////////////////////////////////////////
|
||||
|
|
|
@ -225,6 +225,7 @@ TEST_F(GcsResourceSchedulerTest, TestNodeFilter) {
|
|||
auto result1 = cluster_resource_scheduler_->Schedule(
|
||||
resource_request_list,
|
||||
SchedulingOptions::BundleStrictSpread(
|
||||
/*max_cpu_fraction_per_node*/ 1.0,
|
||||
std::make_unique<BundleSchedulingContext>(bundle_locations)));
|
||||
ASSERT_TRUE(result1.status.IsInfeasible());
|
||||
ASSERT_EQ(result1.selected_nodes.size(), 0);
|
||||
|
@ -233,6 +234,7 @@ TEST_F(GcsResourceSchedulerTest, TestNodeFilter) {
|
|||
auto result2 = cluster_resource_scheduler_->Schedule(
|
||||
resource_request_list,
|
||||
SchedulingOptions::BundleStrictSpread(
|
||||
/*max_cpu_fraction_per_node*/ 1.0,
|
||||
std::make_unique<BundleSchedulingContext>(nullptr)));
|
||||
ASSERT_TRUE(result2.status.IsSuccess());
|
||||
ASSERT_EQ(result2.selected_nodes.size(), 1);
|
||||
|
|
|
@ -14,6 +14,27 @@
|
|||
|
||||
#include "ray/raylet/scheduling/policy/bundle_scheduling_policy.h"
|
||||
|
||||
namespace {
|
||||
|
||||
bool AllocationWillExceedMaxCpuFraction(const ray::NodeResources &node_resources,
|
||||
const ray::ResourceRequest &resource_request,
|
||||
double max_cpu_fraction_per_node) {
|
||||
auto cpu_id = ray::ResourceID::CPU();
|
||||
auto remaining_cpus = node_resources.available.Get(cpu_id).Double() -
|
||||
resource_request.Get(cpu_id).Double();
|
||||
auto total_allocated_cpus = node_resources.total.Get(cpu_id).Double() - remaining_cpus;
|
||||
auto max_reservable_cpus =
|
||||
max_cpu_fraction_per_node * node_resources.total.Get(cpu_id).Double();
|
||||
|
||||
// If the max reservable cpu < 1, we allow at least 1 CPU.
|
||||
if (max_reservable_cpus < 1) {
|
||||
max_reservable_cpus = 1;
|
||||
}
|
||||
return total_allocated_cpus > max_reservable_cpus;
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
||||
namespace ray {
|
||||
namespace raylet_scheduling_policy {
|
||||
|
||||
|
@ -114,7 +135,8 @@ BundleSchedulingPolicy::SortRequiredResources(
|
|||
|
||||
std::pair<scheduling::NodeID, const Node *> BundleSchedulingPolicy::GetBestNode(
|
||||
const ResourceRequest &required_resources,
|
||||
const absl::flat_hash_map<scheduling::NodeID, const Node *> &candidate_nodes) const {
|
||||
const absl::flat_hash_map<scheduling::NodeID, const Node *> &candidate_nodes,
|
||||
const SchedulingOptions &options) const {
|
||||
double best_node_score = -1;
|
||||
auto best_node_id = scheduling::NodeID::Nil();
|
||||
const Node *best_node = nullptr;
|
||||
|
@ -122,6 +144,11 @@ std::pair<scheduling::NodeID, const Node *> BundleSchedulingPolicy::GetBestNode(
|
|||
// Score the nodes.
|
||||
for (const auto &[node_id, node] : candidate_nodes) {
|
||||
const auto &node_resources = node->GetLocalView();
|
||||
if (AllocationWillExceedMaxCpuFraction(
|
||||
node_resources, required_resources, options.max_cpu_fraction_per_node)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
double node_score = node_scorer_->Score(required_resources, node_resources);
|
||||
if (best_node_id.IsNil() || best_node_score < node_score) {
|
||||
best_node_id = node_id;
|
||||
|
@ -164,7 +191,7 @@ SchedulingResult BundlePackSchedulingPolicy::Schedule(
|
|||
while (!required_resources_list_copy.empty()) {
|
||||
const auto &required_resources_index = required_resources_list_copy.front().first;
|
||||
const auto &required_resources = required_resources_list_copy.front().second;
|
||||
auto best_node = GetBestNode(*required_resources, candidate_nodes);
|
||||
auto best_node = GetBestNode(*required_resources, candidate_nodes, options);
|
||||
if (best_node.first.IsNil()) {
|
||||
// There is no node to meet the scheduling requirements.
|
||||
break;
|
||||
|
@ -178,12 +205,20 @@ SchedulingResult BundlePackSchedulingPolicy::Schedule(
|
|||
// We try to schedule more resources on one node.
|
||||
for (auto iter = required_resources_list_copy.begin();
|
||||
iter != required_resources_list_copy.end();) {
|
||||
if (best_node.second->GetLocalView().IsAvailable(*iter->second)) {
|
||||
const auto &node_resources = best_node.second->GetLocalView();
|
||||
if (node_resources.IsAvailable(*iter->second) // If the node has enough resources.
|
||||
&& !AllocationWillExceedMaxCpuFraction( // and allocating resources won't
|
||||
// exceed max cpu fraction.
|
||||
node_resources,
|
||||
*iter->second,
|
||||
options.max_cpu_fraction_per_node)) {
|
||||
// Then allocate it.
|
||||
RAY_CHECK(cluster_resource_manager_.SubtractNodeAvailableResources(
|
||||
best_node.first, *iter->second));
|
||||
result_nodes[iter->first] = best_node.first;
|
||||
required_resources_list_copy.erase(iter++);
|
||||
} else {
|
||||
// Otherwise try other node.
|
||||
++iter;
|
||||
}
|
||||
}
|
||||
|
@ -229,7 +264,7 @@ SchedulingResult BundleSpreadSchedulingPolicy::Schedule(
|
|||
absl::flat_hash_map<scheduling::NodeID, const Node *> selected_nodes;
|
||||
for (const auto &resource_request : sorted_resource_request_list) {
|
||||
// Score and sort nodes.
|
||||
auto best_node = GetBestNode(*resource_request, candidate_nodes);
|
||||
auto best_node = GetBestNode(*resource_request, candidate_nodes, options);
|
||||
|
||||
// There are nodes to meet the scheduling requirements.
|
||||
if (!best_node.first.IsNil()) {
|
||||
|
@ -240,7 +275,7 @@ SchedulingResult BundleSpreadSchedulingPolicy::Schedule(
|
|||
selected_nodes.emplace(best_node);
|
||||
} else {
|
||||
// Scheduling from selected nodes.
|
||||
auto best_node = GetBestNode(*resource_request, selected_nodes);
|
||||
auto best_node = GetBestNode(*resource_request, selected_nodes, options);
|
||||
if (!best_node.first.IsNil()) {
|
||||
result_nodes.emplace_back(best_node.first);
|
||||
RAY_CHECK(cluster_resource_manager_.SubtractNodeAvailableResources(
|
||||
|
@ -293,8 +328,17 @@ SchedulingResult BundleStrictPackSchedulingPolicy::Schedule(
|
|||
const auto &right_node_it = std::find_if(
|
||||
candidate_nodes.begin(),
|
||||
candidate_nodes.end(),
|
||||
[&aggregated_resource_request](const auto &entry) {
|
||||
return entry.second->GetLocalView().IsAvailable(aggregated_resource_request);
|
||||
[&aggregated_resource_request, &options](const auto &entry) {
|
||||
const auto &node_resources = entry.second->GetLocalView();
|
||||
auto allocatable =
|
||||
(node_resources.IsAvailable(
|
||||
aggregated_resource_request) // If the resource is available
|
||||
&& !AllocationWillExceedMaxCpuFraction( // and allocating resources won't
|
||||
// exceed max cpu fraction.
|
||||
node_resources,
|
||||
aggregated_resource_request,
|
||||
options.max_cpu_fraction_per_node));
|
||||
return allocatable;
|
||||
});
|
||||
|
||||
if (right_node_it == candidate_nodes.end()) {
|
||||
|
@ -303,7 +347,7 @@ SchedulingResult BundleStrictPackSchedulingPolicy::Schedule(
|
|||
return SchedulingResult::Infeasible();
|
||||
}
|
||||
|
||||
auto best_node = GetBestNode(aggregated_resource_request, candidate_nodes);
|
||||
auto best_node = GetBestNode(aggregated_resource_request, candidate_nodes, options);
|
||||
|
||||
// Select the node with the highest score.
|
||||
// `StrictPackSchedule` does not need to consider the scheduling context, because it
|
||||
|
@ -349,7 +393,7 @@ SchedulingResult BundleStrictSpreadSchedulingPolicy::Schedule(
|
|||
std::vector<scheduling::NodeID> result_nodes;
|
||||
for (const auto &resource_request : sorted_resource_request_list) {
|
||||
// Score and sort nodes.
|
||||
auto best_node = GetBestNode(*resource_request, candidate_nodes);
|
||||
auto best_node = GetBestNode(*resource_request, candidate_nodes, options);
|
||||
|
||||
// There are nodes to meet the scheduling requirements.
|
||||
if (!best_node.first.IsNil()) {
|
||||
|
|
|
@ -58,7 +58,8 @@ class BundleSchedulingPolicy : public IBundleSchedulingPolicy {
|
|||
/// \return Score of all nodes.
|
||||
std::pair<scheduling::NodeID, const Node *> GetBestNode(
|
||||
const ResourceRequest &required_resources,
|
||||
const absl::flat_hash_map<scheduling::NodeID, const Node *> &candidate_nodes) const;
|
||||
const absl::flat_hash_map<scheduling::NodeID, const Node *> &candidate_nodes,
|
||||
const SchedulingOptions &options) const;
|
||||
|
||||
protected:
|
||||
/// The cluster resource manager.
|
||||
|
|
|
@ -77,44 +77,6 @@ struct SchedulingOptions {
|
|||
return scheduling_options;
|
||||
}
|
||||
|
||||
// construct option for soft pack scheduling policy.
|
||||
static SchedulingOptions BundlePack() {
|
||||
return SchedulingOptions(SchedulingType::BUNDLE_PACK,
|
||||
/*spread_threshold*/ 0,
|
||||
/*avoid_local_node*/ false,
|
||||
/*require_node_available*/ true,
|
||||
/*avoid_gpu_nodes*/ false);
|
||||
}
|
||||
|
||||
// construct option for strict spread scheduling policy.
|
||||
static SchedulingOptions BundleSpread() {
|
||||
return SchedulingOptions(SchedulingType::BUNDLE_SPREAD,
|
||||
/*spread_threshold*/ 0,
|
||||
/*avoid_local_node*/ false,
|
||||
/*require_node_available*/ true,
|
||||
/*avoid_gpu_nodes*/ false);
|
||||
}
|
||||
|
||||
// construct option for strict pack scheduling policy.
|
||||
static SchedulingOptions BundleStrictPack() {
|
||||
return SchedulingOptions(SchedulingType::BUNDLE_STRICT_PACK,
|
||||
/*spread_threshold*/ 0,
|
||||
/*avoid_local_node*/ false,
|
||||
/*require_node_available*/ true,
|
||||
/*avoid_gpu_nodes*/ false);
|
||||
}
|
||||
|
||||
// construct option for strict spread scheduling policy.
|
||||
static SchedulingOptions BundleStrictSpread(
|
||||
std::unique_ptr<SchedulingContext> scheduling_context = nullptr) {
|
||||
return SchedulingOptions(SchedulingType::BUNDLE_STRICT_SPREAD,
|
||||
/*spread_threshold*/ 0,
|
||||
/*avoid_local_node*/ false,
|
||||
/*require_node_available*/ true,
|
||||
/*avoid_gpu_nodes*/ false,
|
||||
/*scheduling_context*/ std::move(scheduling_context));
|
||||
}
|
||||
|
||||
// construct option for affinity with bundle scheduling policy.
|
||||
static SchedulingOptions AffinityWithBundle(const BundleID &bundle_id) {
|
||||
auto scheduling_context =
|
||||
|
@ -124,14 +86,68 @@ struct SchedulingOptions {
|
|||
/*avoid_local_node*/ false,
|
||||
/*require_node_available*/ true,
|
||||
/*avoid_gpu_nodes*/ false,
|
||||
/*max_cpu_fraction_per_node*/ 0,
|
||||
std::move(scheduling_context));
|
||||
}
|
||||
|
||||
/*
|
||||
* Bundle scheduling options.
|
||||
*/
|
||||
|
||||
// construct option for soft pack scheduling policy.
|
||||
static SchedulingOptions BundlePack(double max_cpu_fraction_per_node = 1.0) {
|
||||
return SchedulingOptions(SchedulingType::BUNDLE_PACK,
|
||||
/*spread_threshold*/ 0,
|
||||
/*avoid_local_node*/ false,
|
||||
/*require_node_available*/ true,
|
||||
/*avoid_gpu_nodes*/ false,
|
||||
/*max_cpu_fraction_per_node*/ max_cpu_fraction_per_node);
|
||||
}
|
||||
|
||||
// construct option for strict spread scheduling policy.
|
||||
static SchedulingOptions BundleSpread(double max_cpu_fraction_per_node = 1.0) {
|
||||
return SchedulingOptions(SchedulingType::BUNDLE_SPREAD,
|
||||
/*spread_threshold*/ 0,
|
||||
/*avoid_local_node*/ false,
|
||||
/*require_node_available*/ true,
|
||||
/*avoid_gpu_nodes*/ false,
|
||||
/*max_cpu_fraction_per_node*/ max_cpu_fraction_per_node);
|
||||
}
|
||||
|
||||
// construct option for strict pack scheduling policy.
|
||||
static SchedulingOptions BundleStrictPack(double max_cpu_fraction_per_node = 1.0) {
|
||||
return SchedulingOptions(SchedulingType::BUNDLE_STRICT_PACK,
|
||||
/*spread_threshold*/ 0,
|
||||
/*avoid_local_node*/ false,
|
||||
/*require_node_available*/ true,
|
||||
/*avoid_gpu_nodes*/ false,
|
||||
/*max_cpu_fraction_per_node*/ max_cpu_fraction_per_node);
|
||||
}
|
||||
|
||||
// construct option for strict spread scheduling policy.
|
||||
static SchedulingOptions BundleStrictSpread(
|
||||
double max_cpu_fraction_per_node = 1.0,
|
||||
std::unique_ptr<SchedulingContext> scheduling_context = nullptr) {
|
||||
return SchedulingOptions(SchedulingType::BUNDLE_STRICT_SPREAD,
|
||||
/*spread_threshold*/ 0,
|
||||
/*avoid_local_node*/ false,
|
||||
/*require_node_available*/ true,
|
||||
/*avoid_gpu_nodes*/ false,
|
||||
/*max_cpu_fraction_per_node*/ max_cpu_fraction_per_node,
|
||||
/*scheduling_context*/ std::move(scheduling_context));
|
||||
}
|
||||
|
||||
SchedulingType scheduling_type;
|
||||
float spread_threshold;
|
||||
bool avoid_local_node;
|
||||
bool require_node_available;
|
||||
bool avoid_gpu_nodes;
|
||||
// Maximum reservable CPU fraction per node. It is applied across multiple
|
||||
// bundles, individually. E.g., when you have 2 bundles {CPU: 4} from 2 different
|
||||
// scheduilng request, and there's one node with {CPU: 8}, only 1 bundle from 1 request
|
||||
// can be scheduled on this node. This is only used for bundle scheduling policies
|
||||
// (bundle pack, spread).
|
||||
double max_cpu_fraction_per_node;
|
||||
std::shared_ptr<SchedulingContext> scheduling_context;
|
||||
std::string node_affinity_node_id;
|
||||
bool node_affinity_soft = false;
|
||||
|
@ -142,12 +158,14 @@ struct SchedulingOptions {
|
|||
bool avoid_local_node,
|
||||
bool require_node_available,
|
||||
bool avoid_gpu_nodes,
|
||||
double max_cpu_fraction_per_node = 1.0,
|
||||
std::shared_ptr<SchedulingContext> scheduling_context = nullptr)
|
||||
: scheduling_type(type),
|
||||
spread_threshold(spread_threshold),
|
||||
avoid_local_node(avoid_local_node),
|
||||
require_node_available(require_node_available),
|
||||
avoid_gpu_nodes(avoid_gpu_nodes),
|
||||
max_cpu_fraction_per_node(max_cpu_fraction_per_node),
|
||||
scheduling_context(std::move(scheduling_context)) {}
|
||||
|
||||
friend class ::ray::raylet::SchedulingPolicyTest;
|
||||
|
|
|
@ -25,13 +25,6 @@ namespace raylet_scheduling_policy {
|
|||
|
||||
// Status of resource scheduling result.
|
||||
struct SchedulingResultStatus {
|
||||
bool IsFailed() const { return code == SchedulingResultStatusCode::FAILED; }
|
||||
bool IsInfeasible() const { return code == SchedulingResultStatusCode::INFEASIBLE; }
|
||||
bool IsSuccess() const { return code == SchedulingResultStatusCode::SUCCESS; }
|
||||
bool IsPartialSuccess() const {
|
||||
return code == SchedulingResultStatusCode::PARTIAL_SUCCESS;
|
||||
}
|
||||
|
||||
enum class SchedulingResultStatusCode {
|
||||
// Scheduling failed but retryable.
|
||||
FAILED = 0,
|
||||
|
@ -42,6 +35,15 @@ struct SchedulingResultStatus {
|
|||
// Only part of the requested resources succeed when batch scheduling.
|
||||
PARTIAL_SUCCESS = 3,
|
||||
};
|
||||
|
||||
bool IsFailed() const { return code == SchedulingResultStatusCode::FAILED; }
|
||||
bool IsInfeasible() const { return code == SchedulingResultStatusCode::INFEASIBLE; }
|
||||
bool IsSuccess() const { return code == SchedulingResultStatusCode::SUCCESS; }
|
||||
bool IsPartialSuccess() const {
|
||||
return code == SchedulingResultStatusCode::PARTIAL_SUCCESS;
|
||||
}
|
||||
SchedulingResultStatusCode Status() const { return code; }
|
||||
|
||||
SchedulingResultStatusCode code = SchedulingResultStatusCode::SUCCESS;
|
||||
};
|
||||
|
||||
|
|
|
@ -488,6 +488,69 @@ TEST_F(SchedulingPolicyTest, NonGpuNodePreferredSchedulingTest) {
|
|||
ASSERT_EQ(to_schedule, remote_node);
|
||||
}
|
||||
|
||||
TEST_F(SchedulingPolicyTest, BundleSchedulingMaxFractionTest) {
|
||||
/*
|
||||
* Test the bundle scheduling policy respects the max fraction request.
|
||||
*/
|
||||
|
||||
ResourceRequest req = ResourceMapToResourceRequest({{"CPU", 2}, {"GPU", 1}}, false);
|
||||
std::vector<const ResourceRequest *> req_list;
|
||||
req_list.push_back(&req);
|
||||
req_list.push_back(&req);
|
||||
auto pack_op = SchedulingOptions::BundlePack(/*max_cpu_fraction_per_node*/ 0.5);
|
||||
auto strict_pack_op =
|
||||
SchedulingOptions::BundleStrictPack(/*max_cpu_fraction_per_node*/ 0.5);
|
||||
auto spread_op = SchedulingOptions::BundleSpread(/*max_cpu_fraction_per_node*/ 0.5);
|
||||
auto strict_spread_op =
|
||||
SchedulingOptions::BundleStrictSpread(/*max_cpu_fraction_per_node*/ 0.5);
|
||||
|
||||
nodes.emplace(local_node, CreateNodeResources(7, 7, 0, 0, 2, 2));
|
||||
|
||||
auto cluster_resource_manager = MockClusterResourceManager(nodes);
|
||||
// req is unscheduleable because the max cpu fraction reaches 0.5.
|
||||
auto unscheduable = raylet_scheduling_policy::BundlePackSchedulingPolicy(
|
||||
cluster_resource_manager, [](auto) { return true; })
|
||||
.Schedule(req_list, pack_op);
|
||||
ASSERT_TRUE(unscheduable.status.IsFailed());
|
||||
|
||||
unscheduable = raylet_scheduling_policy::BundleSpreadSchedulingPolicy(
|
||||
cluster_resource_manager, [](auto) { return true; })
|
||||
.Schedule(req_list, spread_op);
|
||||
ASSERT_TRUE(unscheduable.status.IsFailed());
|
||||
|
||||
unscheduable = raylet_scheduling_policy::BundleStrictPackSchedulingPolicy(
|
||||
cluster_resource_manager, [](auto) { return true; })
|
||||
.Schedule(req_list, strict_pack_op);
|
||||
ASSERT_TRUE(unscheduable.status.IsInfeasible());
|
||||
|
||||
unscheduable = raylet_scheduling_policy::BundleStrictSpreadSchedulingPolicy(
|
||||
cluster_resource_manager, [](auto) { return true; })
|
||||
.Schedule(req_list, strict_spread_op);
|
||||
ASSERT_TRUE(unscheduable.status.IsInfeasible());
|
||||
}
|
||||
|
||||
TEST_F(SchedulingPolicyTest, BundleSchedulingMaxFractionOneCpuReservationGuaranteeTest) {
|
||||
/*
|
||||
* Test that when the max cpu fraction is provided, it reserves at least 1 CPU.
|
||||
*/
|
||||
|
||||
ResourceRequest req = ResourceMapToResourceRequest({{"CPU", 1}}, false);
|
||||
std::vector<const ResourceRequest *> req_list;
|
||||
req_list.push_back(&req);
|
||||
|
||||
// NOTE: We can only reserve up to 0.4 CPU, but it will round up to 1,
|
||||
// which means the placement group is schedulable.
|
||||
auto pack_op = SchedulingOptions::BundlePack(/*max_cpu_fraction_per_node*/ 0.1);
|
||||
nodes.emplace(local_node, CreateNodeResources(4, 4, 0, 0, 0, 0));
|
||||
|
||||
auto cluster_resource_manager = MockClusterResourceManager(nodes);
|
||||
// req is unscheduleable because the max cpu fraction reaches 0.5.
|
||||
auto to_schedule = raylet_scheduling_policy::BundlePackSchedulingPolicy(
|
||||
cluster_resource_manager, [](auto) { return true; })
|
||||
.Schedule(req_list, pack_op);
|
||||
ASSERT_TRUE(to_schedule.status.IsSuccess());
|
||||
}
|
||||
|
||||
int main(int argc, char **argv) {
|
||||
::testing::InitGoogleTest(&argc, argv);
|
||||
return RUN_ALL_TESTS();
|
||||
|
|
Loading…
Add table
Reference in a new issue