Round robin during spread scheduling (#21303)

- Separate spread scheduling from default hybrid scheduling (i.e. SpreadScheduling != HybridScheduling(threshold=0)): the two are already separated at the API layer and have different end goals, so it makes sense to separate their implementations and evolve them independently.
- Use simple round robin for spread scheduling: this is just a starting implementation that can be optimized later.
- Prefer not to spill back tasks that are waiting for args, since their argument pull is already in progress.
Jiajun Yao 2022-02-18 15:05:35 -08:00 committed by GitHub
parent 5a4c6d2e88
commit baa14d695a
19 changed files with 156 additions and 104 deletions
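
The user-facing entry point for all of this is the existing scheduling_strategy="SPREAD" option on tasks. A minimal usage sketch before the per-file diffs (assumes a running multi-node cluster; the task body mirrors the tests in this diff):

```python
import ray

ray.init(address="auto")  # assumes an existing multi-node cluster

# With this change, SPREAD tasks are placed by a dedicated round-robin
# policy instead of the hybrid policy with its threshold forced to 0.
@ray.remote(scheduling_strategy="SPREAD")
def get_node_id():
    return ray.worker.global_worker.current_node_id

# Consecutive tasks should rotate across feasible, available nodes.
print(set(ray.get([get_node_id.remote() for _ in range(4)])))
```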

View file

@@ -31,6 +31,8 @@ def simple_shuffle(
         map_ray_remote_args = {}
     if reduce_ray_remote_args is None:
         reduce_ray_remote_args = {}
+    if "scheduling_strategy" not in reduce_ray_remote_args:
+        reduce_ray_remote_args["scheduling_strategy"] = "SPREAD"
     input_num_blocks = len(input_blocks)
     if _spread_resource_prefix is not None:
         # Use given spread resource prefix for round-robin resource-based

View file

@@ -252,6 +252,8 @@ def read_datasource(
         # Note that the too many workers warning triggers at 4x subscription,
         # so we go at 0.5 to avoid the warning message.
         ray_remote_args["num_cpus"] = 0.5
+    if "scheduling_strategy" not in ray_remote_args:
+        ray_remote_args["scheduling_strategy"] = "SPREAD"
     remote_read = cached_remote_fn(remote_read)

     if _spread_resource_prefix is not None:
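
The guard above only fills in a default, so a caller that already set a strategy is left alone. A hedged example of opting a read back out of the SPREAD default (the path is illustrative):

```python
import ray

# An explicit strategy in ray_remote_args suppresses the SPREAD default.
ds = ray.data.read_parquet(
    "s3://bucket/table.parquet",  # illustrative path
    ray_remote_args={"scheduling_strategy": "DEFAULT"},
)
```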

View file

@@ -3197,13 +3197,15 @@ def test_random_shuffle(shutdown_only, pipelined):
     assert r1.take() == ds.take()


-def test_random_shuffle_spread(ray_start_cluster):
+@pytest.mark.parametrize("use_spread_resource_prefix", [False, True])
+def test_random_shuffle_spread(ray_start_cluster, use_spread_resource_prefix):
     cluster = ray_start_cluster
     cluster.add_node(
-        resources={"foo": 100}, _system_config={"max_direct_call_object_size": 0}
+        resources={"bar:1": 100},
+        num_cpus=10,
+        _system_config={"max_direct_call_object_size": 0},
     )
-    cluster.add_node(resources={"bar:1": 100})
-    cluster.add_node(resources={"bar:2": 100})
+    cluster.add_node(resources={"bar:2": 100}, num_cpus=10)
+    cluster.add_node(resources={"bar:3": 100}, num_cpus=0)
     ray.init(cluster.address)

@@ -3216,7 +3218,7 @@ def test_random_shuffle_spread(ray_start_cluster):
     node2_id = ray.get(get_node_id.options(resources={"bar:2": 1}).remote())

     ds = ray.data.range(100, parallelism=2).random_shuffle(
-        _spread_resource_prefix="bar:"
+        _spread_resource_prefix=("bar:" if use_spread_resource_prefix else None)
     )
     blocks = ds.get_internal_block_refs()
     ray.wait(blocks, num_returns=len(blocks), fetch_local=False)

@@ -3227,13 +3229,15 @@ def test_random_shuffle_spread(ray_start_cluster):
     assert set(locations) == {node1_id, node2_id}


-def test_parquet_read_spread(ray_start_cluster, tmp_path):
+@pytest.mark.parametrize("use_spread_resource_prefix", [False, True])
+def test_parquet_read_spread(ray_start_cluster, tmp_path, use_spread_resource_prefix):
     cluster = ray_start_cluster
     cluster.add_node(
-        resources={"foo": 100}, _system_config={"max_direct_call_object_size": 0}
+        resources={"bar:1": 100},
+        num_cpus=10,
+        _system_config={"max_direct_call_object_size": 0},
     )
-    cluster.add_node(resources={"bar:1": 100})
-    cluster.add_node(resources={"bar:2": 100})
+    cluster.add_node(resources={"bar:2": 100}, num_cpus=10)
+    cluster.add_node(resources={"bar:3": 100}, num_cpus=0)
     ray.init(cluster.address)

@@ -3253,7 +3257,10 @@ def test_parquet_read_spread(ray_start_cluster, tmp_path):
     path2 = os.path.join(data_path, "test2.parquet")
     df2.to_parquet(path2)

-    ds = ray.data.read_parquet(data_path, _spread_resource_prefix="bar:")
+    ds = ray.data.read_parquet(
+        data_path,
+        _spread_resource_prefix=("bar:" if use_spread_resource_prefix else None),
+    )

     # Force reads.
     blocks = ds.get_internal_block_refs()

View file

@@ -273,7 +273,7 @@ def test_spread_scheduling_overrides_locality_aware_scheduling(ray_start_cluster):
         },
     )
     ray.init(address=cluster.address)
-    cluster.add_node(num_cpus=8, resources={"pin": 1})
+    remote_node = cluster.add_node(num_cpus=8, resources={"pin": 1})
     cluster.wait_for_nodes()

     @ray.remote(resources={"pin": 1})

@@ -284,9 +284,14 @@ def test_spread_scheduling_overrides_locality_aware_scheduling(ray_start_cluster):
     def f(x):
         return ray.worker.global_worker.node.unique_id

-    # Test that task f() runs on the local node
-    # even though non local node has the dependencies.
-    assert ray.get(f.remote(non_local.remote())) == local_node.unique_id
+    # Test that task f() runs on the local node as well
+    # even though remote node has the dependencies.
+    obj1 = non_local.remote()
+    obj2 = non_local.remote()
+    assert {ray.get(f.remote(obj1)), ray.get(f.remote(obj2))} == {
+        local_node.unique_id,
+        remote_node.unique_id,
+    }


 def test_locality_aware_leasing(ray_start_cluster):

View file

@@ -268,12 +268,11 @@ def test_spread_scheduling_strategy(ray_start_cluster, connect_to_client):
             "scheduler_spread_threshold": 1,
         },
     )
-    ray.init(address=cluster.address)
     for i in range(2):
         cluster.add_node(num_cpus=8, resources={f"foo:{i}": 1})
     cluster.wait_for_nodes()
+    ray.init(address=cluster.address)

     with connect_to_client_or_not(connect_to_client):

         @ray.remote

@@ -314,28 +313,6 @@ def test_spread_scheduling_strategy(ray_start_cluster, connect_to_client):
         internal_kv._internal_kv_del("test_task2")
         assert set(ray.get(locations)) == worker_node_ids

-        # Wait for updating driver raylet's resource view.
-        time.sleep(5)
-
-        @ray.remote(scheduling_strategy=SPREAD_SCHEDULING_STRATEGY, num_cpus=1)
-        class Actor1:
-            def get_node_id(self):
-                return ray.worker.global_worker.current_node_id
-
-        @ray.remote(num_cpus=1)
-        class Actor2:
-            def get_node_id(self):
-                return ray.worker.global_worker.current_node_id
-
-        locations = []
-        actor1 = Actor1.remote()
-        locations.append(ray.get(actor1.get_node_id.remote()))
-        # Wait for updating driver raylet's resource view.
-        time.sleep(5)
-        actor2 = Actor2.options(scheduling_strategy=SPREAD_SCHEDULING_STRATEGY).remote()
-        locations.append(ray.get(actor2.get_node_id.remote()))
-        assert set(locations) == worker_node_ids


 if __name__ == "__main__":
     import pytest

View file

@@ -12,7 +12,7 @@ SPREAD_SCHEDULING_STRATEGY = "SPREAD"


 @PublicAPI(stability="beta")
-class PlacementGroupSchedulingStrategy(object):
+class PlacementGroupSchedulingStrategy:
     """Placement group based scheduling strategy.

     Attributes:

View file

@@ -221,7 +221,7 @@ def create_torch_iterator(split, batch_size, rank=None):

 def create_dataset(files, num_workers=4, epochs=50, num_windows=1):
     if num_windows > 1:
         num_rows = ray.data.read_parquet(
-            files, _spread_resource_prefix="node:"
+            files
         ).count()  # This should only read Parquet metadata.
         file_splits = np.array_split(files, num_windows)

@@ -238,20 +238,18 @@ def create_dataset(files, num_workers=4, epochs=50, num_windows=1):
                     raise StopIteration()
                 split = file_splits[self.i % num_windows]
                 self.i += 1
-                return lambda: ray.data.read_parquet(
-                    list(split), _spread_resource_prefix="node:"
-                )
+                return lambda: ray.data.read_parquet(list(split))

         pipe = DatasetPipeline.from_iterable(Windower())
         split_indices = [
             i * num_rows // num_windows // num_workers for i in range(1, num_workers)
         ]
-        pipe = pipe.random_shuffle_each_window(_spread_resource_prefix="node:")
+        pipe = pipe.random_shuffle_each_window()
         pipe_shards = pipe.split_at_indices(split_indices)
     else:
-        ds = ray.data.read_parquet(files, _spread_resource_prefix="node:")
+        ds = ray.data.read_parquet(files)
         pipe = ds.repeat(epochs)
-        pipe = pipe.random_shuffle_each_window(_spread_resource_prefix="node:")
+        pipe = pipe.random_shuffle_each_window()
         pipe_shards = pipe.split(num_workers, equal=True)
     return pipe_shards

View file

@@ -218,16 +218,14 @@ def create_dataset_pipeline(files, epochs, num_windows):
                     raise StopIteration()
                 split = file_splits[self.i % num_windows]
                 self.i += 1
-                return lambda: ray.data.read_parquet(
-                    list(split), _spread_resource_prefix="node:"
-                )
+                return lambda: ray.data.read_parquet(list(split))

         pipe = DatasetPipeline.from_iterable(Windower())
-        pipe = pipe.random_shuffle_each_window(_spread_resource_prefix="node:")
+        pipe = pipe.random_shuffle_each_window()
     else:
-        ds = ray.data.read_parquet(files, _spread_resource_prefix="node:")
+        ds = ray.data.read_parquet(files)
         pipe = ds.repeat(epochs)
-        pipe = pipe.random_shuffle_each_window(_spread_resource_prefix="node:")
+        pipe = pipe.random_shuffle_each_window()
     return pipe

View file

@@ -24,11 +24,7 @@ from ray.train.callbacks import MLflowLoggerCallback, TBXLoggerCallback

 def read_dataset(path: str) -> ray.data.Dataset:
     print(f"reading data from {path}")
-    return (
-        ray.data.read_parquet(path, _spread_resource_prefix="node:")
-        .repartition(400)
-        .random_shuffle(_spread_resource_prefix="node:")
-    )
+    return ray.data.read_parquet(path).repartition(400).random_shuffle()


 class DataPreprocessor:

@@ -547,7 +543,7 @@ if __name__ == "__main__":
     num_gpus = 1 if use_gpu else 0
     shards = (
         train_dataset.repeat(num_epochs)
-        .random_shuffle_each_window(_spread_resource_prefix="node:")
+        .random_shuffle_each_window()
         .split(num_workers)
     )
     del train_dataset

@@ -568,9 +564,7 @@ if __name__ == "__main__":
         exit()

     # Random global shuffle
-    train_dataset_pipeline = train_dataset.repeat().random_shuffle_each_window(
-        _spread_resource_prefix="node:"
-    )
+    train_dataset_pipeline = train_dataset.repeat().random_shuffle_each_window()
     del train_dataset

     datasets = {"train_dataset": train_dataset_pipeline, "test_dataset": test_dataset}

View file

@@ -422,7 +422,7 @@ void CoreWorkerDirectTaskSubmitter::RequestNewWorkerIfNeeded(
           // assign work to the worker.
           rpc::WorkerAddress addr(reply.worker_address());
           RAY_LOG(DEBUG) << "Lease granted to task " << task_id << " from raylet "
-                         << addr.raylet_id;
+                         << addr.raylet_id << " with worker " << addr.worker_id;

           auto resources_copy = reply.resource_mapping();

View file

@@ -165,16 +165,21 @@ int64_t ClusterResourceScheduler::GetBestSchedulableNode(
     return best_node;
   }

-  // TODO (Alex): Setting require_available == force_spillback is a hack in order to
-  // remain bug compatible with the legacy scheduling algorithms.
-  int64_t best_node_id = scheduling_policy_->HybridPolicy(
-      resource_request,
-      scheduling_strategy.scheduling_strategy_case() ==
-              rpc::SchedulingStrategy::SchedulingStrategyCase::kSpreadSchedulingStrategy
-          ? 0.0
-          : RayConfig::instance().scheduler_spread_threshold(),
-      force_spillback, force_spillback,
-      [this](auto node_id) { return this->NodeAlive(node_id); });
+  int64_t best_node_id = -1;
+  if (scheduling_strategy.scheduling_strategy_case() ==
+      rpc::SchedulingStrategy::SchedulingStrategyCase::kSpreadSchedulingStrategy) {
+    best_node_id = scheduling_policy_->SpreadPolicy(
+        resource_request, force_spillback, force_spillback,
+        [this](auto node_id) { return this->NodeAlive(node_id); });
+  } else {
+    // TODO (Alex): Setting require_available == force_spillback is a hack in order to
+    // remain bug compatible with the legacy scheduling algorithms.
+    best_node_id = scheduling_policy_->HybridPolicy(
+        resource_request, RayConfig::instance().scheduler_spread_threshold(),
+        force_spillback, force_spillback,
+        [this](auto node_id) { return this->NodeAlive(node_id); });
+  }
   *is_infeasible = best_node_id == -1 ? true : false;
   if (!*is_infeasible) {
     // TODO (Alex): Support soft constraints if needed later.
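
This hunk is the heart of the first bullet in the description: SPREAD is no longer emulated as HybridPolicy with a 0.0 threshold but dispatched to its own policy. A minimal sketch of the dispatch shape (illustrative names, not the real API):

```python
SCHEDULER_SPREAD_THRESHOLD = 0.5  # illustrative stand-in for the Ray config value

def best_schedulable_node(strategy, request, policy):
    # SPREAD now takes a dedicated round-robin path; every other strategy
    # keeps the hybrid policy and its configured threshold.
    if strategy == "SPREAD":
        return policy.spread(request)
    return policy.hybrid(request, threshold=SCHEDULER_SPREAD_THRESHOLD)
```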

View file

@@ -334,17 +334,17 @@ TEST_F(ClusterResourceSchedulerTest, SpreadSchedulingStrategyTest) {
   bool is_infeasible;
   rpc::SchedulingStrategy scheduling_strategy;
   scheduling_strategy.mutable_spread_scheduling_strategy();
-  std::string node_id = resource_scheduler.GetBestSchedulableNode(
+  std::string node_id_1 = resource_scheduler.GetBestSchedulableNode(
       resource_request, scheduling_strategy, false, false, false, &violations,
       &is_infeasible);
-  ASSERT_EQ(node_id, local_node_id);
   absl::flat_hash_map<std::string, double> resource_available({{"CPU", 9}});
   resource_scheduler.GetClusterResourceManager().AddOrUpdateNode(
-      local_node_id, resource_total, resource_available);
-  node_id = resource_scheduler.GetBestSchedulableNode(resource_request,
-                                                      scheduling_strategy, false, false,
-                                                      false, &violations, &is_infeasible);
-  ASSERT_EQ(node_id, remote_node_id);
+      node_id_1, resource_total, resource_available);
+  std::string node_id_2 = resource_scheduler.GetBestSchedulableNode(
+      resource_request, scheduling_strategy, false, false, false, &violations,
+      &is_infeasible);
+  ASSERT_EQ((std::set<std::string>{node_id_1, node_id_2}),
+            (std::set<std::string>{local_node_id, remote_node_id}));
 }

 TEST_F(ClusterResourceSchedulerTest, SchedulingUpdateAvailableResourcesTest) {

View file

@@ -77,10 +77,7 @@ void ClusterTaskManager::ScheduleAndDispatchTasks() {
       RayTask task = work->task;
       RAY_LOG(DEBUG) << "Scheduling pending task "
                     << task.GetTaskSpecification().TaskId();
-      std::string node_id_string =
-          GetBestSchedulableNode(*work,
-                                 /*requires_object_store_memory=*/false,
-                                 /*force_spillback=*/false, &is_infeasible);
+      std::string node_id_string = GetBestSchedulableNode(*work, &is_infeasible);

       // There is no node that has available resources to run the request.
       // Move on to the next shape.

@@ -129,10 +126,7 @@ void ClusterTaskManager::TryScheduleInfeasibleTask() {
   RAY_LOG(DEBUG) << "Check if the infeasible task is schedulable in any node. task_id:"
                  << task.GetTaskSpecification().TaskId();
   bool is_infeasible;
-  std::string node_id_string =
-      GetBestSchedulableNode(*work,
-                             /*requires_object_store_memory=*/false,
-                             /*force_spillback=*/false, &is_infeasible);
+  std::string node_id_string = GetBestSchedulableNode(*work, &is_infeasible);

   // There is no node that has available resources to run the request.
   // Move on to the next shape.

@@ -318,12 +312,10 @@ bool ClusterTaskManager::IsLocallySchedulable(const RayTask &task) const {
 }

 std::string ClusterTaskManager::GetBestSchedulableNode(const internal::Work &work,
-                                                       bool requires_object_store_memory,
-                                                       bool force_spillback,
                                                        bool *is_infeasible) {
   // If the local node is available, we should directly return it instead of
   // going through the full hybrid policy since we don't want spillback.
-  if ((work.grant_or_reject || work.is_selected_based_on_locality) && !force_spillback &&
+  if ((work.grant_or_reject || work.is_selected_based_on_locality) &&
       IsLocallySchedulable(work.task)) {
     *is_infeasible = false;
     return self_node_id_.Binary();

@@ -334,9 +326,9 @@ std::string ClusterTaskManager::GetBestSchedulableNode(const internal::Work &work,
   return cluster_resource_scheduler_->GetBestSchedulableNode(
       work.task.GetTaskSpecification().GetRequiredPlacementResources().GetResourceMap(),
       work.task.GetTaskSpecification().GetMessage().scheduling_strategy(),
-      requires_object_store_memory,
-      work.task.GetTaskSpecification().IsActorCreationTask(), force_spillback, &_unused,
-      is_infeasible);
+      /*requires_object_store_memory=*/false,
+      work.task.GetTaskSpecification().IsActorCreationTask(),
+      /*force_spillback=*/false, &_unused, is_infeasible);
 }

 }  // namespace raylet

View file

@@ -144,9 +144,7 @@ class ClusterTaskManager : public ClusterTaskManagerInterface {
  private:
   /// Helper method to get the best node for running the task.
-  std::string GetBestSchedulableNode(const internal::Work &work,
-                                     bool requires_object_store_memory,
-                                     bool force_spillback, bool *is_infeasible);
+  std::string GetBestSchedulableNode(const internal::Work &work, bool *is_infeasible);

   void TryScheduleInfeasibleTask();

View file

@@ -317,7 +317,7 @@ void LocalTaskManager::SpillWaitingTasks() {
       // the most memory availability.
       std::string node_id_string =
           GetBestSchedulableNode(*(*it),
-                                 /*requires_object_store_memory=*/true,
+                                 /*spill_waiting_task=*/true,
                                  /*force_spillback=*/force_spillback, &is_infeasible);
       if (!node_id_string.empty() && node_id_string != self_node_id_.Binary()) {
         NodeID node_id = NodeID::FromBinary(node_id_string);

@@ -348,7 +348,7 @@ bool LocalTaskManager::TrySpillback(const std::shared_ptr<internal::Work> &work,
                                    bool &is_infeasible) {
   std::string node_id_string =
       GetBestSchedulableNode(*work,
-                             /*requires_object_store_memory=*/false,
+                             /*spill_waiting_task=*/false,
                              /*force_spillback=*/false, &is_infeasible);

   if (is_infeasible || node_id_string == self_node_id_.Binary() ||

@@ -1036,13 +1036,14 @@ uint64_t LocalTaskManager::MaxRunningTasksPerSchedulingClass(
 }

 std::string LocalTaskManager::GetBestSchedulableNode(const internal::Work &work,
-                                                     bool requires_object_store_memory,
+                                                     bool spill_waiting_task,
                                                      bool force_spillback,
                                                      bool *is_infeasible) {
   // If the local node is available, we should directly return it instead of
   // going through the full hybrid policy since we don't want spillback.
-  if ((work.grant_or_reject || work.is_selected_based_on_locality) && !force_spillback &&
-      IsLocallySchedulable(work.task)) {
+  if ((work.grant_or_reject || work.is_selected_based_on_locality ||
+       spill_waiting_task) &&
+      !force_spillback && IsLocallySchedulable(work.task)) {
     *is_infeasible = false;
     return self_node_id_.Binary();
   }

@@ -1052,7 +1053,7 @@ std::string LocalTaskManager::GetBestSchedulableNode(const internal::Work &work,
   return cluster_resource_scheduler_->GetBestSchedulableNode(
       work.task.GetTaskSpecification().GetRequiredPlacementResources().GetResourceMap(),
       work.task.GetTaskSpecification().GetMessage().scheduling_strategy(),
-      requires_object_store_memory,
+      /*requires_object_store_memory=*/spill_waiting_task,
       work.task.GetTaskSpecification().IsActorCreationTask(), force_spillback, &_unused,
       is_infeasible);
 }
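
Taken together, the local_task_manager.cc changes implement the third bullet of the description: a task that is already waiting on its arguments locally is kept local whenever it can run here, and object store memory on the target only matters when such a waiting task really is spilled. A simplified Python sketch of that decision, with names mirroring the C++ above (an illustration, not the actual implementation):

```python
def pick_node_for(work, spill_waiting_task, force_spillback,
                  locally_schedulable, local_node_id, cluster_best_node):
    """Simplified stand-in for LocalTaskManager::GetBestSchedulableNode."""
    # Keep tasks whose argument pull is already in progress on this node;
    # spilling them back would waste the in-flight pull.
    if ((work["grant_or_reject"] or work["is_selected_based_on_locality"]
         or spill_waiting_task)
            and not force_spillback and locally_schedulable(work)):
        return local_node_id
    # Only a spilled waiting task needs the target node to have object
    # store room for the arguments it will pull there.
    return cluster_best_node(
        requires_object_store_memory=spill_waiting_task,
        force_spillback=force_spillback,
    )
```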

View file

@@ -210,8 +210,7 @@ class LocalTaskManager {
   uint64_t MaxRunningTasksPerSchedulingClass(SchedulingClass sched_cls_id) const;

   /// Helper method to get the best node for running the task.
-  std::string GetBestSchedulableNode(const internal::Work &work,
-                                     bool requires_object_store_memory,
+  std::string GetBestSchedulableNode(const internal::Work &work, bool spill_waiting_task,
                                      bool force_spillback, bool *is_infeasible);

   /// Recompute the debug stats.

View file

@@ -16,6 +16,8 @@

 #include <functional>

+#include "ray/util/container_util.h"
+
 namespace ray {

 namespace raylet_scheduling_policy {

@@ -36,6 +38,37 @@ bool DoesNodeHaveGPUs(const NodeResources &resources) {
 }
 }  // namespace

+int64_t SchedulingPolicy::SpreadPolicy(const ResourceRequest &resource_request,
+                                       bool force_spillback, bool require_available,
+                                       std::function<bool(int64_t)> is_node_available) {
+  std::vector<int64_t> round;
+  round.reserve(nodes_.size());
+  for (const auto &pair : nodes_) {
+    round.emplace_back(pair.first);
+  }
+  std::sort(round.begin(), round.end());
+
+  size_t round_index = spread_scheduling_next_index_;
+  for (size_t i = 0; i < round.size(); ++i, ++round_index) {
+    const auto &node_id = round[round_index % round.size()];
+    const auto &node = map_find_or_die(nodes_, node_id);
+    if (node_id == local_node_id_ && force_spillback) {
+      continue;
+    }
+    if (!is_node_available(node_id) ||
+        !node.GetLocalView().IsFeasible(resource_request) ||
+        !node.GetLocalView().IsAvailable(resource_request, true)) {
+      continue;
+    }
+    spread_scheduling_next_index_ = ((round_index + 1) % round.size());
+    return node_id;
+  }
+
+  return HybridPolicy(resource_request, 0, force_spillback, require_available,
+                      is_node_available);
+}
+
 int64_t SchedulingPolicy::HybridPolicyWithFilter(
     const ResourceRequest &resource_request, float spread_threshold, bool force_spillback,
     bool require_available, std::function<bool(int64_t)> is_node_available,
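
For readers who prefer Python, here is a self-contained sketch of the SpreadPolicy above, simplified so that a node is just its available CPU count and feasibility/availability collapse into a single capacity check; the walk-through at the end mirrors the SpreadPolicyTest later in this diff:

```python
def spread_policy(nodes, cpus_needed, next_index, is_alive):
    """Round-robin over node IDs in sorted order.

    Returns (node_id, new_cursor), or (None, next_index) when nothing fits,
    in which case the real scheduler falls back to HybridPolicy.
    """
    order = sorted(nodes)
    for i in range(len(order)):
        idx = (next_index + i) % len(order)
        node_id = order[idx]
        if not is_alive(node_id) or nodes[node_id] < cpus_needed:
            continue  # skip dead, infeasible, or currently-full nodes
        # Persist the cursor so the next request starts after this node.
        return node_id, (idx + 1) % len(order)
    return None, next_index


# Node 1 has no CPUs free and node 2 has none at all, so both are skipped.
nodes = {0: 20, 1: 0, 2: 0, 3: 20}
picked, cursor = spread_policy(nodes, 1, 0, lambda _: True)
assert picked == 0
picked, cursor = spread_policy(nodes, 1, cursor, lambda _: True)
assert picked == 3
```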

View file

@@ -66,12 +66,22 @@ class SchedulingPolicy {
       std::function<bool(int64_t)> is_node_available,
       bool scheduler_avoid_gpu_nodes = RayConfig::instance().scheduler_avoid_gpu_nodes());

+  /// Round robin among available nodes.
+  /// If there are no available nodes, fallback to hybrid policy.
+  int64_t SpreadPolicy(const ResourceRequest &resource_request, bool force_spillback,
+                       bool require_available,
+                       std::function<bool(int64_t)> is_node_available);
+
  private:
   /// Identifier of local node.
   const int64_t local_node_id_;
   /// List of nodes in the clusters and their resources organized as a map.
   /// The key of the map is the node ID.
   const absl::flat_hash_map<int64_t, Node> &nodes_;
+  // The node to start round robin if it's spread scheduling.
+  // The index may be inaccurate when nodes are added or removed dynamically,
+  // but it should still be better than always scanning from 0 for spread scheduling.
+  size_t spread_scheduling_next_index_ = 0;

   enum class NodeFilter {
     /// Default scheduling.

View file

@@ -35,6 +35,37 @@ NodeResources CreateNodeResources(double available_cpu, double total_cpu,

 class SchedulingPolicyTest : public ::testing::Test {};

+TEST_F(SchedulingPolicyTest, SpreadPolicyTest) {
+  StringIdMap map;
+  ResourceRequest req = ResourceMapToResourceRequest(map, {{"CPU", 1}}, false);
+  int64_t local_node = 0;
+  int64_t remote_node_1 = 1;
+  int64_t remote_node_2 = 2;
+  int64_t remote_node_3 = 3;
+
+  absl::flat_hash_map<int64_t, Node> nodes;
+  nodes.emplace(local_node, CreateNodeResources(20, 20, 0, 0, 0, 0));
+  // Unavailable node
+  nodes.emplace(remote_node_1, CreateNodeResources(0, 20, 0, 0, 0, 0));
+  // Infeasible node
+  nodes.emplace(remote_node_2, CreateNodeResources(0, 0, 0, 0, 0, 0));
+  nodes.emplace(remote_node_3, CreateNodeResources(20, 20, 0, 0, 0, 0));
+
+  raylet_scheduling_policy::SchedulingPolicy scheduling_policy(local_node, nodes);
+
+  int64_t to_schedule =
+      scheduling_policy.SpreadPolicy(req, false, false, [](auto) { return true; });
+  ASSERT_EQ(to_schedule, local_node);
+
+  to_schedule =
+      scheduling_policy.SpreadPolicy(req, false, false, [](auto) { return true; });
+  ASSERT_EQ(to_schedule, remote_node_3);
+
+  to_schedule = scheduling_policy.SpreadPolicy(req, /*force_spillback=*/true, false,
+                                               [](auto) { return true; });
+  ASSERT_EQ(to_schedule, remote_node_3);
+}
+
 TEST_F(SchedulingPolicyTest, FeasibleDefinitionTest) {
   StringIdMap map;
   auto task_req1 =