[autoscaler] Enforce per-node-type max workers (#17352)

* Enforce per-node-type max workers

* type annonation

Co-authored-by: Ameer Haj Ali <ameerh@berkeley.edu>

* cleanup. comments. type annotations

* additional type annotation

Co-authored-by: Ameer Haj Ali <ameerh@berkeley.edu>

* additional cleanup. comments. type annotations

* _get_nodes_needed_for_request_resources to use FrozenSet

* comments

* whitespace

* [Placement Group] Fix resource index assignment between with bundle index and without bundle index pg (#17318)

* [serve] Add Ray API stability annotations (#17295)

* Support streaming output of runtime env setup to logger/driver (#17306)

* [SGD] v2 prototype: ``WorkerGroup`` implementation (#17330)

* wip

* formatting

* increase timeouts

* address comments

* comments

* fix

* address comments

* Update python/ray/util/sgd/v2/worker_group.py

Co-authored-by: Richard Liaw <rliaw@berkeley.edu>

* Update python/ray/util/sgd/v2/worker_group.py

Co-authored-by: Richard Liaw <rliaw@berkeley.edu>

* address comments

* formatting

* fix

* avoid race condition

Co-authored-by: Richard Liaw <rliaw@berkeley.edu>

* [RLlib] Discussion 3001: Fix comment on internal state shape (must be [B x S=state dim]). (#17341)

* [autoscaler] GCP TPU VM autoscaler (#17278)

* [Rllib] set self._allow_unknown_config (#17335)

Co-authored-by: Sven Mika <sven@anyscale.io>

* [RLlib] Discussion 2294: Custom vector env example and fix. (#16083)

* [docs] Link broken in Tune's page (#17394) (#17407)

* [Serve] Fix response_model for class based view routes as well (#17376)

* [serve] Fix single deployment nightly test (#17368)

* [RLlib] SAC tuple observation space fix (#17356)

* Support schema on read for csv/json (#17354)

* [RLlib] New and changed version of parametric actions cartpole example + small suggested update in policy_client.py (#15664)

* [gcs] Fix GCS related issues: ByteSizeLong and redis connection (#17373)

* [runtime_env] Gracefully fail tasks when an environment fails to be set up (#17249)

* [docs] update docs with pip requirements (#17317)

* removed nodes_to_keep. cleanup

* formatting

* +comment

* treat max_workers=0 as 0 workers (as opposed to unlimited)

* fix wrong comment

* warning for inconsistent config

* terminate nodes with no matching node type right away

* quotes

* special handling for head node when enforcing max_workers per type. tests. cleanup

* cleanup comments and prints

* comments

* cleanup. removed special handling of head node.

* adding an eplicit non-None check in schedule_node_termination

* raise the exception

Co-authored-by: Ameer Haj Ali <ameerh@berkeley.edu>

Co-authored-by: Ameer Haj Ali <ameerh@berkeley.edu>
Co-authored-by: DK.Pino <loushang.ls@antfin.com>
Co-authored-by: Edward Oakes <ed.nmi.oakes@gmail.com>
Co-authored-by: Simon Mo <simon.mo@hey.com>
Co-authored-by: Amog Kamsetty <amogkam@users.noreply.github.com>
Co-authored-by: Richard Liaw <rliaw@berkeley.edu>
Co-authored-by: Sven Mika <sven@anyscale.io>
Co-authored-by: Antoni Baum <antoni.baum@protonmail.com>
Co-authored-by: Rohan138 <66227218+Rohan138@users.noreply.github.com>
Co-authored-by: amavilla <takashi.tameshige.jj@hitachi.com>
Co-authored-by: Jiao <sophchess@gmail.com>
Co-authored-by: Julius Frost <33183774+juliusfrost@users.noreply.github.com>
Co-authored-by: Eric Liang <ekhliang@gmail.com>
Co-authored-by: kk-55 <63732956+kk-55@users.noreply.github.com>
Co-authored-by: Yi Cheng <74173148+iycheng@users.noreply.github.com>
Co-authored-by: matthewdeng <matt@anyscale.com>
This commit is contained in:
Sasha Sobol 2021-08-03 08:31:32 -07:00 committed by GitHub
parent c40555c82b
commit 5dbbaf7261
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 263 additions and 129 deletions

View file

@ -1,5 +1,5 @@
from collections import defaultdict, namedtuple, Counter
from typing import Any, Optional, Dict, List, Tuple
from typing import Any, Optional, Dict, List, Set, FrozenSet, Tuple
import copy
import logging
import math
@ -9,7 +9,7 @@ import subprocess
import threading
import time
import yaml
import collections
from enum import Enum
try:
from urllib3.exceptions import MaxRetryError
@ -55,6 +55,14 @@ AutoscalerSummary = namedtuple(
"AutoscalerSummary",
["active_nodes", "pending_nodes", "pending_launches", "failed_nodes"])
# Whether a worker should be kept based on the min_workers and
# max_workers constraints.
#
# keep: should keep the worker
# terminate: should terminate the worker
# decide_later: the worker can be terminated if needed
KeepOrTerminate = Enum("KeepOrTerminate", "keep terminate decide_later")
class StandardAutoscaler:
"""The autoscaling control loop for a Ray cluster.
@ -210,66 +218,88 @@ class StandardAutoscaler:
last_used = self.load_metrics.last_used_time_by_ip
horizon = now - (60 * self.config["idle_timeout_minutes"])
nodes_to_terminate: Dict[NodeID, bool] = []
node_type_counts = collections.defaultdict(int)
nodes_to_terminate: List[NodeID] = []
node_type_counts = defaultdict(int)
# Sort based on last used to make sure to keep min_workers that
# were most recently used. Otherwise, _keep_min_workers_of_node_type
# might keep a node that should be terminated.
sorted_node_ids = self._sort_based_on_last_used(nodes, last_used)
# Don't terminate nodes needed by request_resources()
nodes_allowed_to_terminate: Dict[NodeID, bool] = {}
nodes_not_allowed_to_terminate: FrozenSet[NodeID] = {}
if self.load_metrics.get_resource_requests():
nodes_allowed_to_terminate = self._get_nodes_allowed_to_terminate(
sorted_node_ids)
nodes_not_allowed_to_terminate = \
self._get_nodes_needed_for_request_resources(sorted_node_ids)
def keep_node(node_id: NodeID) -> None:
# Update per-type counts and add node_id to nodes_to_keep.
tags = self.provider.node_tags(node_id)
if TAG_RAY_USER_NODE_TYPE in tags:
node_type = tags[TAG_RAY_USER_NODE_TYPE]
node_type_counts[node_type] += 1
def schedule_node_termination(node_id: NodeID,
reason_opt: Optional[str]) -> None:
if reason_opt is None:
raise Exception("reason should be not None.")
reason: str = reason_opt
# Log, record an event, and add node_id to nodes_to_terminate.
logger.info("StandardAutoscaler: "
"{}: Terminating {} node.".format(node_id, reason))
self.event_summarizer.add(
"Removing {} nodes of type " + self._get_node_type(node_id) +
" ({}).".format(reason),
quantity=1,
aggregate=operator.add)
nodes_to_terminate.append(node_id)
# Nodes that we could terminate, if needed.
nodes_we_could_terminate: List[NodeID] = []
for node_id in sorted_node_ids:
# Make sure to not kill idle node types if the number of workers
# of that type is lower/equal to the min_workers of that type
# or it is needed for request_resources().
should_keep_worker_of_node_type, node_type_counts = \
self._keep_worker_of_node_type(node_id, node_type_counts)
if ((should_keep_worker_of_node_type
or not nodes_allowed_to_terminate.get(node_id, True))
should_keep_or_terminate, reason = self._keep_worker_of_node_type(
node_id, node_type_counts)
if should_keep_or_terminate == KeepOrTerminate.terminate:
schedule_node_termination(node_id, reason)
continue
if ((should_keep_or_terminate == KeepOrTerminate.keep
or node_id in nodes_not_allowed_to_terminate)
and self.launch_config_ok(node_id)):
keep_node(node_id)
continue
node_ip = self.provider.internal_ip(node_id)
if node_ip in last_used and last_used[node_ip] < horizon:
logger.info("StandardAutoscaler: "
"{}: Terminating idle node.".format(node_id))
self.event_summarizer.add(
"Removing {} nodes of type " + self._get_node_type(node_id)
+ " (idle).",
quantity=1,
aggregate=operator.add)
nodes_to_terminate.append(node_id)
schedule_node_termination(node_id, "idle")
elif not self.launch_config_ok(node_id):
logger.info("StandardAutoscaler: "
"{}: Terminating outdated node.".format(node_id))
self.event_summarizer.add(
"Removing {} nodes of type " + self._get_node_type(node_id)
+ " (outdated).",
quantity=1,
aggregate=operator.add)
nodes_to_terminate.append(node_id)
if nodes_to_terminate:
self._terminate_nodes_and_cleanup(nodes_to_terminate)
nodes = self.workers()
schedule_node_termination(node_id, "outdated")
else:
keep_node(node_id)
nodes_we_could_terminate.append(node_id)
# Terminate nodes if there are too many
nodes_to_terminate = []
while (len(nodes) -
len(nodes_to_terminate)) > self.config["max_workers"] and nodes:
to_terminate = nodes.pop()
logger.info("StandardAutoscaler: "
"{}: Terminating unneeded node.".format(to_terminate))
self.event_summarizer.add(
"Removing {} nodes of type " +
self._get_node_type(to_terminate) + " (max workers).",
quantity=1,
aggregate=operator.add)
nodes_to_terminate.append(to_terminate)
num_extra_nodes_to_terminate = (
len(nodes) - len(nodes_to_terminate) - self.config["max_workers"])
if num_extra_nodes_to_terminate > len(nodes_we_could_terminate):
logger.warning(
"StandardAutoscaler: trying to terminate "
f"{num_extra_nodes_to_terminate} nodes, while only "
f"{len(nodes_we_could_terminate)} are safe to terminate."
" Inconsistent config is likely.")
num_extra_nodes_to_terminate = len(nodes_we_could_terminate)
# If num_extra_nodes_to_terminate is negative or zero,
# we would have less than max_workers nodes after terminating
# nodes_to_terminate and we do not need to terminate anything else.
if num_extra_nodes_to_terminate > 0:
extra_nodes_to_terminate = nodes_we_could_terminate[
-num_extra_nodes_to_terminate:]
for node_id in extra_nodes_to_terminate:
schedule_node_termination(node_id, "max workers")
if nodes_to_terminate:
self._terminate_nodes_and_cleanup(nodes_to_terminate)
@ -408,20 +438,20 @@ class StandardAutoscaler:
return sorted(nodes, key=last_time_used, reverse=True)
def _get_nodes_allowed_to_terminate(
self, sorted_node_ids: List[NodeID]) -> Dict[NodeID, bool]:
def _get_nodes_needed_for_request_resources(
self, sorted_node_ids: List[NodeID]) -> FrozenSet[NodeID]:
# TODO(ameer): try merging this with resource_demand_scheduler
# code responsible for adding nodes for request_resources().
"""Returns the nodes allowed to terminate for request_resources().
"""Returns the nodes NOT allowed to terminate due to request_resources().
Args:
sorted_node_ids: the node ids sorted based on last used (LRU last).
Returns:
nodes_allowed_to_terminate: whether the node id is allowed to
terminate or not.
FrozenSet[NodeID]: a set of nodes (node ids) that
we should NOT terminate.
"""
nodes_allowed_to_terminate: Dict[NodeID, bool] = {}
nodes_not_allowed_to_terminate: Set[NodeID] = set()
head_node_resources: ResourceDict = copy.deepcopy(
self.available_node_types[self.config["head_node_type"]][
"resources"])
@ -478,50 +508,63 @@ class StandardAutoscaler:
# No resources of the node were needed for request_resources().
# max_node_resources[i] is an empty dict for legacy yamls
# before the node is connected.
nodes_allowed_to_terminate[node_id] = True
pass
else:
nodes_allowed_to_terminate[node_id] = False
return nodes_allowed_to_terminate
nodes_not_allowed_to_terminate.add(node_id)
return frozenset(nodes_not_allowed_to_terminate)
def _keep_worker_of_node_type(self, node_id: NodeID,
node_type_counts: Dict[NodeType, int]
) -> Tuple[bool, Dict[NodeType, int]]:
) -> Tuple[KeepOrTerminate, Optional[str]]:
"""Determines if a worker should be kept based on the min_workers
constraint of the worker's node_type.
and max_workers constraint of the worker's node_type.
Returns True exactly when both of the following hold:
Returns KeepOrTerminate.keep when both of the following hold:
(a) The worker's node_type is present among the keys of the current
config's available_node_types dict.
(b) Deleting the node would violate the min_workers constraint for that
worker's node_type.
Also updates and returns the dictionary of node type counts.
Returns KeepOrTerminate.terminate when both the following hold:
(a) The worker's node_type is not present among the keys of the current
config's available_node_types dict.
(b) Keeping the node would violate the max_workers constraint for that
worker's node_type.
Return KeepOrTerminate.decide_later otherwise.
Args:
node_type_counts(Dict[NodeType, int]): The non_terminated node
types counted so far.
Returns:
bool: True if the node should be kept. False otherwise.
Dict[NodeType, int]: Updated node type counts
KeepOrTerminate: keep if the node should be kept, terminate if the
node should be terminated, decide_later if we are allowed
to terminate it, but do not have to.
Optional[str]: reason for termination. Not None on
KeepOrTerminate.terminate, None otherwise.
"""
new_node_type_counts = copy.deepcopy(node_type_counts)
tags = self.provider.node_tags(node_id)
if TAG_RAY_USER_NODE_TYPE in tags:
node_type = tags[TAG_RAY_USER_NODE_TYPE]
min_workers = self.available_node_types.get(node_type, {}).get(
"min_workers", 0)
max_workers = self.available_node_types.get(node_type, {}).get(
"max_workers", 0)
if node_type not in self.available_node_types:
# The node type has been deleted from the cluster config.
# Don't keep the node.
return False, new_node_type_counts
new_node_type_counts[node_type] += 1
min_workers = self.available_node_types[node_type].get(
"min_workers", 0)
max_workers = self.available_node_types[node_type].get(
"max_workers", 0)
if new_node_type_counts[node_type] <= min(min_workers,
max_workers):
return True, new_node_type_counts
# Allow terminating it if needed.
available_node_types = list(self.available_node_types.keys())
return (KeepOrTerminate.terminate,
f"not in available_node_types: {available_node_types}")
new_count = node_type_counts[node_type] + 1
if new_count <= min(min_workers, max_workers):
return KeepOrTerminate.keep, None
if new_count > max_workers:
return KeepOrTerminate.terminate, "max_workers_per_type"
return False, new_node_type_counts
return KeepOrTerminate.decide_later, None
def _node_resources(self, node_id):
node_type = self.provider.node_tags(node_id).get(

View file

@ -10,6 +10,8 @@ import unittest
from unittest.mock import Mock
import yaml
import copy
from collections import defaultdict
from ray.autoscaler._private.commands import get_or_create_head_node
from jsonschema.exceptions import ValidationError
from typing import Dict, Callable
@ -340,6 +342,59 @@ MOCK_DEFAULT_CONFIG = {
"worker_start_ray_commands": [],
}
TYPES_A = {
"empty_node": {
"node_config": {
"FooProperty": 42,
},
"resources": {},
"max_workers": 0,
},
"m4.large": {
"node_config": {},
"resources": {
"CPU": 2
},
"max_workers": 10,
},
"m4.4xlarge": {
"node_config": {},
"resources": {
"CPU": 16
},
"max_workers": 8,
},
"m4.16xlarge": {
"node_config": {},
"resources": {
"CPU": 64
},
"max_workers": 4,
},
"p2.xlarge": {
"node_config": {},
"resources": {
"CPU": 16,
"GPU": 1
},
"max_workers": 10,
},
"p2.8xlarge": {
"node_config": {},
"resources": {
"CPU": 32,
"GPU": 8
},
"max_workers": 4,
},
}
MULTI_WORKER_CLUSTER = dict(
SMALL_CLUSTER, **{
"available_node_types": TYPES_A,
"head_node_type": "empty_node"
})
class LoadMetricsTest(unittest.TestCase):
def testHeartbeat(self):
@ -644,6 +699,16 @@ class AutoscalingTest(unittest.TestCase):
autoscaler.update()
self.waitForNodes(2)
events = autoscaler.event_summarizer.summary()
# Just one node (node_id 1) terminated in the last update.
# Validates that we didn't try to double-terminate node 0.
assert (sorted(events) == [
"Adding 1 nodes of type ray.worker.new.",
"Adding 1 nodes of type ray.worker.old.",
"Removing 1 nodes of type ray.worker.old (not "
"in available_node_types: ['ray.head.new', 'ray.worker.new'])."
])
head_list = self.provider.non_terminated_nodes({
TAG_RAY_NODE_KIND: NODE_KIND_HEAD
})
@ -1125,7 +1190,7 @@ class AutoscalingTest(unittest.TestCase):
autoscaler.update()
self.waitForNodes(1, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER})
# Update the config to reduce the cluster size
# Update the config to increase the cluster size
new_config["min_workers"] = 10
new_config["max_workers"] = 10
self.write_config(new_config)
@ -1143,8 +1208,8 @@ class AutoscalingTest(unittest.TestCase):
# Check the launch failure event is generated.
autoscaler.update()
events = autoscaler.event_summarizer.summary()
assert ("Removing 1 nodes of type "
"ray-legacy-worker-node-type (max workers)." in events), events
assert ("Removing 1 nodes of type ray-legacy-worker-node-type "
"(max_workers_per_type)." in events)
assert mock_metrics.stopped_nodes.inc.call_count == 1
mock_metrics.running_workers.set.assert_called_with(10)
@ -1708,6 +1773,86 @@ class AutoscalingTest(unittest.TestCase):
autoscaler.update()
self.waitFor(lambda: len(runner.calls) > 0)
def testScaleDownMaxWorkers(self):
"""Tests terminating nodes due to max_nodes per type."""
config = copy.deepcopy(MULTI_WORKER_CLUSTER)
config["available_node_types"]["m4.large"]["min_workers"] = 3
config["available_node_types"]["m4.large"]["max_workers"] = 3
config["available_node_types"]["m4.large"]["resources"] = {}
config["available_node_types"]["m4.16xlarge"]["resources"] = {}
config["available_node_types"]["p2.xlarge"]["min_workers"] = 5
config["available_node_types"]["p2.xlarge"]["max_workers"] = 8
config["available_node_types"]["p2.xlarge"]["resources"] = {}
config["available_node_types"]["p2.8xlarge"]["min_workers"] = 2
config["available_node_types"]["p2.8xlarge"]["max_workers"] = 4
config["available_node_types"]["p2.8xlarge"]["resources"] = {}
config["max_workers"] = 13
config_path = self.write_config(config)
self.provider = MockProvider()
runner = MockProcessRunner()
runner.respond_to_call("json .Config.Env", ["[]" for i in range(15)])
lm = LoadMetrics()
get_or_create_head_node(
config,
printable_config_file=config_path,
no_restart=False,
restart_only=False,
yes=True,
override_cluster_name=None,
_provider=self.provider,
_runner=runner)
self.waitForNodes(1)
autoscaler = StandardAutoscaler(
config_path,
lm,
max_failures=0,
max_concurrent_launches=13,
max_launch_batch=13,
process_runner=runner,
update_interval_s=0)
autoscaler.update()
self.waitForNodes(11)
assert autoscaler.pending_launches.value == 0
assert len(
self.provider.non_terminated_nodes({
TAG_RAY_NODE_KIND: NODE_KIND_WORKER
})) == 10
# Terminate some nodes
config["available_node_types"]["m4.large"]["min_workers"] = 2 # 3
config["available_node_types"]["m4.large"]["max_workers"] = 2
config["available_node_types"]["p2.8xlarge"]["min_workers"] = 0 # 2
config["available_node_types"]["p2.8xlarge"]["max_workers"] = 0
# And spawn one.
config["available_node_types"]["p2.xlarge"]["min_workers"] = 6 # 5
config["available_node_types"]["p2.xlarge"]["max_workers"] = 6
self.write_config(config)
autoscaler.update()
events = autoscaler.event_summarizer.summary()
assert autoscaler.pending_launches.value == 0
self.waitForNodes(8, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER})
assert autoscaler.pending_launches.value == 0
events = autoscaler.event_summarizer.summary()
assert ("Removing 1 nodes of type m4.large (max_workers_per_type)." in
events)
assert ("Removing 2 nodes of type p2.8xlarge (max_workers_per_type)."
in events)
# We should not be starting/stopping empty_node at all.
for event in events:
assert "empty_node" not in event
node_type_counts = defaultdict(int)
for node_id in autoscaler.workers():
tags = self.provider.node_tags(node_id)
if TAG_RAY_USER_NODE_TYPE in tags:
node_type = tags[TAG_RAY_USER_NODE_TYPE]
node_type_counts[node_type] += 1
assert node_type_counts == {"m4.large": 2, "p2.xlarge": 6}
def testScaleUpBasedOnLoad(self):
config = SMALL_CLUSTER.copy()
config["min_workers"] = 1
@ -2617,8 +2762,7 @@ MemAvailable: 33000000 kB
lambda: all(not updater.is_alive()
for updater in autoscaler.updaters.values()),
num_retries=500,
fail_msg="Last round of updaters didn't complete on time."
)
fail_msg="Last round of updaters didn't complete on time.")
# Check that updaters processed some commands in the last autoscaler
# update.
assert len(autoscaler.process_runner.calls) > num_calls,\

View file

@ -14,7 +14,7 @@ from ray.autoscaler._private.util import \
prepare_config, format_info_string, \
format_info_string_no_node_types
from ray.tests.test_autoscaler import SMALL_CLUSTER, MOCK_DEFAULT_CONFIG, \
MockProvider, MockProcessRunner
MULTI_WORKER_CLUSTER, TYPES_A, MockProvider, MockProcessRunner
from ray.autoscaler._private.providers import (_NODE_PROVIDERS,
_clear_provider_cache)
from ray.autoscaler._private.autoscaler import StandardAutoscaler, \
@ -41,59 +41,6 @@ from time import sleep
GET_DEFAULT_METHOD = "ray.autoscaler._private.util._get_default_config"
TYPES_A = {
"empty_node": {
"node_config": {
"FooProperty": 42,
},
"resources": {},
"max_workers": 0,
},
"m4.large": {
"node_config": {},
"resources": {
"CPU": 2
},
"max_workers": 10,
},
"m4.4xlarge": {
"node_config": {},
"resources": {
"CPU": 16
},
"max_workers": 8,
},
"m4.16xlarge": {
"node_config": {},
"resources": {
"CPU": 64
},
"max_workers": 4,
},
"p2.xlarge": {
"node_config": {},
"resources": {
"CPU": 16,
"GPU": 1
},
"max_workers": 10,
},
"p2.8xlarge": {
"node_config": {},
"resources": {
"CPU": 32,
"GPU": 8
},
"max_workers": 4,
},
}
MULTI_WORKER_CLUSTER = dict(
SMALL_CLUSTER, **{
"available_node_types": TYPES_A,
"head_node_type": "empty_node"
})
def test_util_score():
assert _utilization_score({"CPU": 64}, [{"TPU": 16}]) is None