[hotfix][core] Load balancing spillback feature flag (#14457)

Alex Wu 2021-03-05 16:45:33 -08:00 committed by GitHub
parent 2002cff42e
commit 2395e25fc0
4 changed files with 43 additions and 38 deletions

View file

@@ -1,3 +1,4 @@
+from collections import Counter
 import os
 import pytest
 import subprocess
@@ -8,6 +9,7 @@ from ray.test_utils import (
     check_call_ray, run_string_as_driver, run_string_as_driver_nonblocking,
     wait_for_children_of_pid, wait_for_children_of_pid_to_exit,
     wait_for_children_names_of_pid, kill_process_by_name, Semaphore)
+from time import sleep


 def test_calling_start_ray_head(call_ray_stop_only):
@@ -392,25 +394,32 @@ ray.get(main_wait.release.remote())
 def test_spillback_distribution(ray_start_cluster):
     cluster = ray_start_cluster
     # Create a head node and wait until it is up.
-    cluster.add_node(num_cpus=0)
+    cluster.add_node(
+        num_cpus=0, _system_config={"scheduler_loadbalance_spillback": True})
     ray.init(address=cluster.address)
     cluster.wait_for_nodes()

-    num_nodes = 3
+    num_nodes = 2
     # create 2 worker nodes.
     for _ in range(num_nodes):
-        cluster.add_node(num_cpus=5)
+        cluster.add_node(num_cpus=8)
     cluster.wait_for_nodes()
+    assert ray.cluster_resources()["CPU"] == 16

     @ray.remote
     def task():
-        import time
-        time.sleep(1)
-        return ray.worker._global_node.unique_id
+        sleep(1)
+        return ray.worker.global_worker.current_node_id

     # Make sure tasks are spilled back non-deterministically.
-    task_refs = [task.remote() for _ in range(5)]
-    assert len(set(ray.get(task_refs))) != 1
+    locations = ray.get([task.remote() for _ in range(8)])
+    counter = Counter(locations)
+    spread = max(counter.values()) - min(counter.values())
+    # Ideally we'd want 4 tasks to go to each node, but we'll settle for
+    # anything better than a 1-7 split since randomness is noisy.
+    assert spread < 6
+    assert len(counter) > 1

     @ray.remote(num_cpus=1)
     class Actor1:
@@ -418,26 +427,14 @@ def test_spillback_distribution(ray_start_cluster):
             pass

         def get_location(self):
-            return ray.worker.global_worker.node.unique_id
+            return ray.worker.global_worker.current_node_id

     # Create a bunch of actors.
-    num_actors = 10
-    num_attempts = 20
-    minimum_count = 2
-    # Make sure that actors are spread between the raylets.
-    attempts = 0
-    while attempts < num_attempts:
-        actors = [Actor1.remote() for _ in range(num_actors)]
-        locations = ray.get([actor.get_location.remote() for actor in actors])
-        names = set(locations)
-        counts = [locations.count(name) for name in names]
-        print("Counts are {}.".format(counts))
-        if (len(names) == num_nodes
-                and all(count >= minimum_count for count in counts)):
-            break
-        attempts += 1
-    assert attempts < num_attempts
+    actors = [Actor1.remote() for _ in range(10)]
+    locations = ray.get([actor.get_location.remote() for actor in actors])
+    counter = Counter(locations)
+    spread = max(counter.values()) - min(counter.values())
+    assert spread < 6
+    assert len(counter) > 1

 if __name__ == "__main__":
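
Note (editor's sketch, not part of the diff): the rewritten assertions use a simple spread metric over a `Counter` of observed node IDs. A standalone illustration with made-up node IDs:

```python
from collections import Counter

# Hypothetical node IDs standing in for the values returned by
# ray.worker.global_worker.current_node_id in the test above.
locations = ["node_a", "node_b", "node_a", "node_b",
             "node_a", "node_b", "node_a", "node_b"]

counter = Counter(locations)                            # Counter({"node_a": 4, "node_b": 4})
spread = max(counter.values()) - min(counter.values())  # 0 for a perfect 4-4 split

# Same thresholds as the test: reject a 1-7 (or worse) split, and require
# that more than one node actually ran work.
assert spread < 6
assert len(counter) > 1
```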

View file

@@ -18,11 +18,6 @@
 // Macro definition format: RAY_CONFIG(type, name, default_value).
 // NOTE: This file should NOT be included in any file other than ray_config.h.

-// IF YOU MODIFY THIS FILE and add a configuration parameter, you must change
-// at least two additional things:
-// 1. You must update the file "ray/python/ray/includes/ray_config.pxd".
-// 2. You must update the file "ray/python/ray/includes/ray_config.pxi".
-
 /// In theory, this is used to detect Ray cookie mismatches.
 /// This magic number (hex for "RAY") is used instead of zero, rationale is
 /// that it could still be possible that some random program sends an int64_t
@@ -100,6 +95,13 @@ RAY_CONFIG(size_t, free_objects_batch_size, 100)

 RAY_CONFIG(bool, lineage_pinning_enabled, false)

+/// Pick between 2 scheduling spillback strategies. Load balancing mode picks the node at
+/// uniform random from the valid options. The other mode is more likely to spill back
+/// many tasks to the same node.
+RAY_CONFIG(bool, scheduler_loadbalance_spillback,
+           getenv("RAY_SCHEDULER_LOADBALANCE_SPILLBACK") != nullptr &&
+               getenv("RAY_SCHEDULER_LOADBALANCE_SPILLBACK") == std::string("1"))
+
 // The max allowed size in bytes of a return object from direct actor calls.
 // Objects larger than this size will be spilled/promoted to plasma.
 RAY_CONFIG(int64_t, max_direct_call_object_size, 100 * 1024)
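
Note (editor's sketch, not part of the diff): the flag can be flipped either through the environment variable read above or through `_system_config`, which is how the updated test enables it on the head node. A rough usage sketch; how the env var reaches the backend processes is an assumption, not shown in this commit:

```python
import os

# Option 1 (assumption): export the env var before Ray's backend processes start,
# so the getenv() check in the config definition above sees it.
os.environ["RAY_SCHEDULER_LOADBALANCE_SPILLBACK"] = "1"

import ray

# Option 2: pass the system config directly, mirroring the updated test
# (the test sets it via cluster.add_node on the head node).
ray.init(_system_config={"scheduler_loadbalance_spillback": True})
```

Only one of the two is needed; both end up setting `scheduler_loadbalance_spillback`.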

View file

@@ -30,7 +30,8 @@ ClusterResourceScheduler::ClusterResourceScheduler(
 ClusterResourceScheduler::ClusterResourceScheduler(
     const std::string &local_node_id,
     const std::unordered_map<std::string, double> &local_node_resources,
-    std::function<int64_t(void)> get_used_object_store_memory) {
+    std::function<int64_t(void)> get_used_object_store_memory)
+    : loadbalance_spillback_(RayConfig::instance().scheduler_loadbalance_spillback()) {
   local_node_id_ = string_to_int_map_.Insert(local_node_id);
   NodeResources node_resources = ResourceMapToNodeResources(
       string_to_int_map_, local_node_resources, local_node_resources);
@@ -318,13 +319,17 @@ int64_t ClusterResourceScheduler::GetBestSchedulableNode(const TaskRequest &task
   *total_violations = min_violations;

   // Randomly select one of the best nodes to spillback.
-  int64_t best_node;
-  if (best_nodes.empty()) {
-    best_node = -1;
-  } else {
-    int idx = std::rand() % best_nodes.size();
+  int64_t best_node = -1;
+  if (!best_nodes.empty()) {
+    int idx;
+    if (loadbalance_spillback_) {
+      idx = std::rand() % best_nodes.size();
+    } else {
+      idx = 0;
+    }
     best_node = best_nodes[idx];
   }
+
   // If there's no best node, and the task is not feasible locally,
   // it means the task is infeasible.
   *is_infeasible = best_node == -1 && !local_node_feasible;
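
Note (editor's sketch, not part of the diff): in Python terms, the change above makes spillback target selection look roughly like this, where `best_nodes` is the list of equally good candidates already computed by `GetBestSchedulableNode`:

```python
import random


def pick_spillback_node(best_nodes, loadbalance_spillback):
    # Paraphrase of the selection logic in GetBestSchedulableNode above.
    if not best_nodes:
        return -1  # no candidate; the caller combines this with local feasibility
    if loadbalance_spillback:
        idx = random.randrange(len(best_nodes))  # uniform over the tied candidates
    else:
        # Legacy behavior: always the first candidate, so repeated
        # spillbacks tend to land on the same node.
        idx = 0
    return best_nodes[idx]
```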

View file

@@ -435,6 +435,7 @@ class ClusterResourceScheduler : public ClusterResourceSchedulerInterface {
   bool SubtractRemoteNodeAvailableResources(int64_t node_id,
                                             const TaskRequest &task_request);
+  bool loadbalance_spillback_;

   /// List of nodes in the clusters and their resources organized as a map.
   /// The key of the map is the node ID.
   absl::flat_hash_map<int64_t, Node> nodes_;