[autoscaler] Add an aggressive_autoscaling flag (#4285)

This commit is contained in:
Daniel Edgecumbe 2019-04-14 02:44:32 +01:00 committed by Robert Nishihara
parent 56a78baf67
commit 3e1adafbce
11 changed files with 113 additions and 17 deletions

View file

@ -51,6 +51,9 @@ CLUSTER_CONFIG_SCHEMA = {
# The number of workers to launch initially, in addition to the head node.
"initial_workers": (int, OPTIONAL),
# The mode of the autoscaler, e.g. "default" or "aggressive".
"autoscaling_mode": (str, OPTIONAL),
# The autoscaler will scale up the cluster to this target fraction of
# resource usage. For example, if a cluster of 8 nodes is 100% busy
# and target_utilization is 0.8, it would resize the cluster to 10.
@ -519,9 +522,13 @@ class StandardAutoscaler(object):
ideal_num_nodes = int(np.ceil(cur_used / float(target_frac)))
ideal_num_workers = ideal_num_nodes - 1 # subtract 1 for head node
initial_workers = self.config["initial_workers"]
aggressive = self.config["autoscaling_mode"] == "aggressive"
if self.bringup:
ideal_num_workers = max(ideal_num_workers,
self.config["initial_workers"])
ideal_num_workers = max(ideal_num_workers, initial_workers)
elif aggressive and cur_used > 0:
# If we want any workers, we want at least initial_workers
ideal_num_workers = max(ideal_num_workers, initial_workers)
return min(self.config["max_workers"],
max(self.config["min_workers"], ideal_num_workers))
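
For readers skimming the diff, here is a minimal, self-contained sketch of the sizing rule above (the helper name target_workers and its flat argument list are illustrative, not Ray's API). In default mode the cluster grows only as far as the current load requires; in aggressive mode, any nonzero demand pulls the target back up to initial_workers.

import math

def target_workers(cur_used, target_frac, initial_workers, min_workers,
                   max_workers, aggressive=False, bringup=False):
    # cur_used is the load expressed in busy-node equivalents, as in the
    # autoscaler code above; this sketch mirrors its arithmetic only.
    ideal_num_nodes = int(math.ceil(cur_used / float(target_frac)))
    ideal_num_workers = ideal_num_nodes - 1  # subtract 1 for the head node
    if bringup:
        ideal_num_workers = max(ideal_num_workers, initial_workers)
    elif aggressive and cur_used > 0:
        # If we want any workers at all, we want at least initial_workers.
        ideal_num_workers = max(ideal_num_workers, initial_workers)
    return min(max_workers, max(min_workers, ideal_num_workers))

# The schema comment's example: 8 busy node-equivalents at target 0.8
# -> ceil(8 / 0.8) = 10 nodes, i.e. 9 workers plus the head.
assert target_workers(8, 0.8, 0, 0, 100) == 9
# Aggressive mode with initial_workers=10: any nonzero load asks for 10.
assert target_workers(1, 0.8, 10, 0, 20, aggressive=True) == 10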

View file

@ -14,6 +14,11 @@ max_workers: 2
# subsequent `ray up`) this number of nodes will be started.
initial_workers: 0
# Whether or not to autoscale aggressively. If this is enabled, then whenever
# the autoscaler would start any new workers, it starts at least enough of
# them to bring the cluster up to initial_workers.
autoscaling_mode: default
# This executes all commands on all nodes in the docker container,
# and opens all the necessary ports to support the Ray cluster.
# Empty string means disabled.
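
As a quick illustration of how a user would opt in, here is a small Python check of the relevant keys with aggressive mode enabled (PyYAML and the literal values are assumptions for the example; the example files in this diff keep the default mode).

import yaml  # PyYAML, used here only to keep the example self-contained

aggressive_cluster_yaml = """
min_workers: 0
max_workers: 20
initial_workers: 10
autoscaling_mode: aggressive
"""

config = yaml.safe_load(aggressive_cluster_yaml)
# The schema above types autoscaling_mode as an optional string; the two
# values mentioned in its comment are "default" and "aggressive".
assert config["autoscaling_mode"] in ("default", "aggressive")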

View file

@ -14,6 +14,11 @@ max_workers: 2
# subsequent `ray up`) this number of nodes will be started.
initial_workers: 0
# Whether or not to autoscale aggressively. If this is enabled, then whenever
# the autoscaler would start any new workers, it starts at least enough of
# them to bring the cluster up to initial_workers.
autoscaling_mode: default
# This executes all commands on all nodes in the docker container,
# and opens all the necessary ports to support the Ray cluster.
# Empty string means disabled.

View file

@ -14,6 +14,11 @@ max_workers: 2
# subsequent `ray up`) this number of nodes will be started.
initial_workers: 0
# Whether or not to autoscale aggressively. If this is enabled, then whenever
# the autoscaler would start any new workers, it starts at least enough of
# them to bring the cluster up to initial_workers.
autoscaling_mode: default
# This executes all commands on all nodes in the docker container,
# and opens all the necessary ports to support the Ray cluster.
# Empty string means disabled.

View file

@ -14,6 +14,11 @@ max_workers: 2
# subsequent `ray up`) this number of nodes will be started.
initial_workers: 0
# Whether or not to autoscale aggressively. If this is enabled, then whenever
# the autoscaler would start any new workers, it starts at least enough of
# them to bring the cluster up to initial_workers.
autoscaling_mode: default
# This executes all commands on all nodes in the docker container,
# and opens all the necessary ports to support the Ray cluster.
# Empty string means disabled.

View file

@ -2,6 +2,7 @@ cluster_name: default
min_workers: 0
max_workers: 0
initial_workers: 0
autoscaling_mode: default
docker:
image: ""
container_name: ""

View file

@ -159,10 +159,11 @@ class NodeUpdater(object):
try:
logger.debug("NodeUpdater: "
"{}: Waiting for SSH...".format(self.node_id))
self.ssh_cmd(
"uptime",
connect_timeout=5,
redirect=open("/dev/null", "w"))
with open("/dev/null", "w") as redirect:
self.ssh_cmd(
"uptime", connect_timeout=5, redirect=redirect)
return True
except Exception as e:
@ -206,12 +207,12 @@ class NodeUpdater(object):
m = "{}: Synced {} to {}".format(self.node_id, local_path,
remote_path)
with LogTimer("NodeUpdater {}".format(m)):
self.ssh_cmd(
"mkdir -p {}".format(os.path.dirname(remote_path)),
redirect=open("/dev/null", "w"),
)
self.rsync_up(
local_path, remote_path, redirect=open("/dev/null", "w"))
with open("/dev/null", "w") as redirect:
self.ssh_cmd(
"mkdir -p {}".format(os.path.dirname(remote_path)),
redirect=redirect,
)
self.rsync_up(local_path, remote_path, redirect=redirect)
# Run init commands
self.provider.set_node_tags(self.node_id,
@ -219,13 +220,15 @@ class NodeUpdater(object):
m = "{}: Initialization commands completed".format(self.node_id)
with LogTimer("NodeUpdater: {}".format(m)):
for cmd in self.initialization_commands:
self.ssh_cmd(cmd)
with open("/dev/null", "w") as redirect:
for cmd in self.initialization_commands:
self.ssh_cmd(cmd, redirect=redirect)
m = "{}: Setup commands completed".format(self.node_id)
with LogTimer("NodeUpdater: {}".format(m)):
for cmd in self.setup_commands:
self.ssh_cmd(cmd)
with open("/dev/null", "w") as redirect:
for cmd in self.setup_commands:
self.ssh_cmd(cmd, redirect=redirect)
def rsync_up(self, source, target, redirect=None, check_error=True):
self.set_ssh_ip_if_required()
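
The updater changes above all follow one pattern: the bare open("/dev/null", "w") arguments, which leaked file handles until garbage collection, are wrapped in with blocks so the handle is closed deterministically. A minimal sketch of the same pattern outside of NodeUpdater (the helper name run_quietly is illustrative):

import os
import subprocess

def run_quietly(cmd):
    # The context manager closes the null-device handle even if the
    # command fails, unlike passing open(os.devnull, "w") inline.
    with open(os.devnull, "w") as devnull:
        subprocess.check_call(cmd, stdout=devnull, stderr=devnull)

run_quietly(["uptime"])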

View file

@ -141,6 +141,7 @@ class Dashboard(object):
"min_workers": cfg["min_workers"],
"max_workers": cfg["max_workers"],
"initial_workers": cfg["initial_workers"],
"autoscaling_mode": cfg["autoscaling_mode"],
"idle_timeout_minutes": cfg["idle_timeout_minutes"],
}

View file

@ -256,6 +256,7 @@ def start(node_ip_address, redis_address, redis_port, num_redis_shards,
raylet_socket_name=raylet_socket_name,
temp_dir=temp_dir,
include_java=include_java,
include_webui=include_webui,
java_worker_options=java_worker_options,
load_code_from_local=load_code_from_local,
_internal_config=internal_config)
@ -292,7 +293,6 @@ def start(node_ip_address, redis_address, redis_port, num_redis_shards,
redis_max_memory=redis_max_memory,
num_redis_shards=num_redis_shards,
redis_max_clients=redis_max_clients,
include_webui=include_webui,
autoscaling_config=autoscaling_config,
include_java=False,
)

View file

@ -220,6 +220,8 @@ def get_node_ip_address(address="8.8.8.8:53"):
node_ip_address = socket.gethostbyname(host_name)
except Exception:
pass
finally:
s.close()
return node_ip_address
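
The finally: s.close() addition guarantees the probe socket is released even when the connect or lookup fails. A self-contained sketch of the same idea using contextlib.closing (the helper name probe_local_ip and its fallback are illustrative, not Ray's code):

import socket
from contextlib import closing

def probe_local_ip(probe_addr=("8.8.8.8", 53)):
    # Connecting a UDP socket sends no packets; it only asks the OS which
    # local interface would be used to reach probe_addr. closing() releases
    # the socket whether or not the lookup succeeds.
    with closing(socket.socket(socket.AF_INET, socket.SOCK_DGRAM)) as s:
        try:
            s.connect(probe_addr)
            return s.getsockname()[0]
        except OSError:
            # Fall back to resolving the hostname, as the code above does.
            return socket.gethostbyname(socket.gethostname())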

View file

@ -64,6 +64,14 @@ class MockProvider(NodeProvider):
if n.matches(tag_filters) and n.state != "terminated"
]
def non_terminated_node_ips(self, tag_filters):
if self.throw:
raise Exception("oops")
return [
n.internal_ip for n in self.mock_nodes.values()
if n.matches(tag_filters) and n.state != "terminated"
]
def is_running(self, node_id):
return self.mock_nodes[node_id].state == "running"
@ -99,6 +107,7 @@ SMALL_CLUSTER = {
"min_workers": 2,
"max_workers": 2,
"initial_workers": 0,
"autoscaling_mode": "default",
"target_utilization_fraction": 0.8,
"idle_timeout_minutes": 5,
"provider": {
@ -337,6 +346,59 @@ class AutoscalingTest(unittest.TestCase):
self.waitForNodes(10)
autoscaler.update()
def testAggressiveAutoscaling(self):
config = SMALL_CLUSTER.copy()
config["min_workers"] = 0
config["max_workers"] = 20
config["initial_workers"] = 10
config["idle_timeout_minutes"] = 0
config["autoscaling_mode"] = "aggressive"
config_path = self.write_config(config)
self.provider = MockProvider()
self.provider.create_node({}, {TAG_RAY_NODE_TYPE: "head"}, 1)
head_ip = self.provider.non_terminated_node_ips(
tag_filters={TAG_RAY_NODE_TYPE: "head"}, )[0]
lm = LoadMetrics()
lm.local_ip = head_ip
autoscaler = StandardAutoscaler(
config_path,
lm,
max_launch_batch=5,
max_concurrent_launches=5,
max_failures=0,
update_interval_s=0)
self.waitForNodes(1)
autoscaler.update()
self.waitForNodes(6) # expected due to batch sizes and concurrency
autoscaler.update()
self.waitForNodes(11)
# Connect the head and workers to end the bringup phase
addrs = self.provider.non_terminated_node_ips(
tag_filters={TAG_RAY_NODE_TYPE: "worker"}, )
addrs += [head_ip]
for addr in addrs:
lm.update(addr, {"CPU": 2}, {"CPU": 0})
lm.update(addr, {"CPU": 2}, {"CPU": 2})
assert autoscaler.bringup
autoscaler.update()
assert not autoscaler.bringup
autoscaler.update()
self.waitForNodes(1)
# All of the workers are now terminated. Simulate some load on the head node
lm.update(head_ip, {"CPU": 2}, {"CPU": 0})
autoscaler.update()
self.waitForNodes(6) # expected due to batch sizes and concurrency
autoscaler.update()
self.waitForNodes(11)
def testDelayedLaunch(self):
config_path = self.write_config(SMALL_CLUSTER)
self.provider = MockProvider()
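
For reference, the node counts the new test waits for follow from the launch limits it configures, assuming at most max_concurrent_launches (5) workers can start per update:

head = 1
initial_workers = 10
after_first_update = head + min(5, initial_workers)       # waitForNodes(6)
after_second_update = head + min(2 * 5, initial_workers)  # waitForNodes(11)
assert (after_first_update, after_second_update) == (6, 11)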