diff --git a/python/ray/autoscaler/_private/autoscaler.py b/python/ray/autoscaler/_private/autoscaler.py
index 3841b50a1..0649fbd67 100644
--- a/python/ray/autoscaler/_private/autoscaler.py
+++ b/python/ray/autoscaler/_private/autoscaler.py
@@ -1,5 +1,5 @@
 from collections import defaultdict, namedtuple, Counter
-from typing import Any, Optional, Dict, List, Tuple
+from typing import Any, Optional, Dict, List, Set, FrozenSet, Tuple
 import copy
 import logging
 import math
@@ -9,7 +9,7 @@ import subprocess
 import threading
 import time
 import yaml
-import collections
+from enum import Enum
 
 try:
     from urllib3.exceptions import MaxRetryError
@@ -55,6 +55,14 @@ AutoscalerSummary = namedtuple(
     "AutoscalerSummary",
     ["active_nodes", "pending_nodes", "pending_launches", "failed_nodes"])
 
+# Whether a worker should be kept based on the min_workers and
+# max_workers constraints.
+#
+# keep: should keep the worker
+# terminate: should terminate the worker
+# decide_later: the worker can be terminated if needed
+KeepOrTerminate = Enum("KeepOrTerminate", "keep terminate decide_later")
+
 
 class StandardAutoscaler:
     """The autoscaling control loop for a Ray cluster.
@@ -210,66 +218,88 @@ class StandardAutoscaler:
         last_used = self.load_metrics.last_used_time_by_ip
         horizon = now - (60 * self.config["idle_timeout_minutes"])
 
-        nodes_to_terminate: Dict[NodeID, bool] = []
-        node_type_counts = collections.defaultdict(int)
+        nodes_to_terminate: List[NodeID] = []
+        node_type_counts = defaultdict(int)
         # Sort based on last used to make sure to keep min_workers that
         # were most recently used. Otherwise, _keep_min_workers_of_node_type
         # might keep a node that should be terminated.
         sorted_node_ids = self._sort_based_on_last_used(nodes, last_used)
+
         # Don't terminate nodes needed by request_resources()
-        nodes_allowed_to_terminate: Dict[NodeID, bool] = {}
+        nodes_not_allowed_to_terminate: FrozenSet[NodeID] = frozenset()
         if self.load_metrics.get_resource_requests():
-            nodes_allowed_to_terminate = self._get_nodes_allowed_to_terminate(
-                sorted_node_ids)
+            nodes_not_allowed_to_terminate = \
+                self._get_nodes_needed_for_request_resources(sorted_node_ids)
+
+        def keep_node(node_id: NodeID) -> None:
+            # Update the per-type counts for the kept node.
+            tags = self.provider.node_tags(node_id)
+            if TAG_RAY_USER_NODE_TYPE in tags:
+                node_type = tags[TAG_RAY_USER_NODE_TYPE]
+                node_type_counts[node_type] += 1
+
+        def schedule_node_termination(node_id: NodeID,
+                                      reason_opt: Optional[str]) -> None:
+            if reason_opt is None:
+                raise Exception("reason should not be None.")
+            reason: str = reason_opt
+            # Log, record an event, and add node_id to nodes_to_terminate.
+            logger.info("StandardAutoscaler: "
+                        "{}: Terminating {} node.".format(node_id, reason))
+            self.event_summarizer.add(
+                "Removing {} nodes of type " + self._get_node_type(node_id)
+                + " ({}).".format(reason),
+                quantity=1,
+                aggregate=operator.add)
+            nodes_to_terminate.append(node_id)
+
+        # Nodes that we could terminate, if needed.
+        nodes_we_could_terminate: List[NodeID] = []
+
         for node_id in sorted_node_ids:
            # Make sure to not kill idle node types if the number of workers
            # of that type is lower/equal to the min_workers of that type
            # or it is needed for request_resources().
-            should_keep_worker_of_node_type, node_type_counts = \
-                self._keep_worker_of_node_type(node_id, node_type_counts)
-            if ((should_keep_worker_of_node_type
-                 or not nodes_allowed_to_terminate.get(node_id, True))
+            should_keep_or_terminate, reason = self._keep_worker_of_node_type(
+                node_id, node_type_counts)
+            if should_keep_or_terminate == KeepOrTerminate.terminate:
+                schedule_node_termination(node_id, reason)
+                continue
+            if ((should_keep_or_terminate == KeepOrTerminate.keep
+                 or node_id in nodes_not_allowed_to_terminate)
                     and self.launch_config_ok(node_id)):
+                keep_node(node_id)
                 continue
 
             node_ip = self.provider.internal_ip(node_id)
             if node_ip in last_used and last_used[node_ip] < horizon:
-                logger.info("StandardAutoscaler: "
-                            "{}: Terminating idle node.".format(node_id))
-                self.event_summarizer.add(
-                    "Removing {} nodes of type " + self._get_node_type(node_id)
-                    + " (idle).",
-                    quantity=1,
-                    aggregate=operator.add)
-                nodes_to_terminate.append(node_id)
+                schedule_node_termination(node_id, "idle")
             elif not self.launch_config_ok(node_id):
-                logger.info("StandardAutoscaler: "
-                            "{}: Terminating outdated node.".format(node_id))
-                self.event_summarizer.add(
-                    "Removing {} nodes of type " + self._get_node_type(node_id)
-                    + " (outdated).",
-                    quantity=1,
-                    aggregate=operator.add)
-                nodes_to_terminate.append(node_id)
-
-        if nodes_to_terminate:
-            self._terminate_nodes_and_cleanup(nodes_to_terminate)
-            nodes = self.workers()
+                schedule_node_termination(node_id, "outdated")
+            else:
+                keep_node(node_id)
+                nodes_we_could_terminate.append(node_id)
 
         # Terminate nodes if there are too many
-        nodes_to_terminate = []
-        while (len(nodes) -
-               len(nodes_to_terminate)) > self.config["max_workers"] and nodes:
-            to_terminate = nodes.pop()
-            logger.info("StandardAutoscaler: "
-                        "{}: Terminating unneeded node.".format(to_terminate))
-            self.event_summarizer.add(
-                "Removing {} nodes of type " +
-                self._get_node_type(to_terminate) + " (max workers).",
-                quantity=1,
-                aggregate=operator.add)
-            nodes_to_terminate.append(to_terminate)
+        num_extra_nodes_to_terminate = (
+            len(nodes) - len(nodes_to_terminate) - self.config["max_workers"])
+
+        if num_extra_nodes_to_terminate > len(nodes_we_could_terminate):
+            logger.warning(
+                "StandardAutoscaler: trying to terminate "
+                f"{num_extra_nodes_to_terminate} nodes, while only "
+                f"{len(nodes_we_could_terminate)} are safe to terminate."
+                " Inconsistent config is likely.")
+            num_extra_nodes_to_terminate = len(nodes_we_could_terminate)
+
+        # If num_extra_nodes_to_terminate is negative or zero,
+        # we would have no more than max_workers nodes after terminating
+        # nodes_to_terminate and we do not need to terminate anything else.
+        if num_extra_nodes_to_terminate > 0:
+            extra_nodes_to_terminate = nodes_we_could_terminate[
+                -num_extra_nodes_to_terminate:]
+            for node_id in extra_nodes_to_terminate:
+                schedule_node_termination(node_id, "max workers")
 
         if nodes_to_terminate:
             self._terminate_nodes_and_cleanup(nodes_to_terminate)
@@ -408,20 +438,20 @@ class StandardAutoscaler:
 
         return sorted(nodes, key=last_time_used, reverse=True)
 
-    def _get_nodes_allowed_to_terminate(
-            self, sorted_node_ids: List[NodeID]) -> Dict[NodeID, bool]:
+    def _get_nodes_needed_for_request_resources(
+            self, sorted_node_ids: List[NodeID]) -> FrozenSet[NodeID]:
         # TODO(ameer): try merging this with resource_demand_scheduler
         # code responsible for adding nodes for request_resources().
-        """Returns the nodes allowed to terminate for request_resources().
+ """Returns the nodes NOT allowed to terminate due to request_resources(). Args: sorted_node_ids: the node ids sorted based on last used (LRU last). Returns: - nodes_allowed_to_terminate: whether the node id is allowed to - terminate or not. + FrozenSet[NodeID]: a set of nodes (node ids) that + we should NOT terminate. """ - nodes_allowed_to_terminate: Dict[NodeID, bool] = {} + nodes_not_allowed_to_terminate: Set[NodeID] = set() head_node_resources: ResourceDict = copy.deepcopy( self.available_node_types[self.config["head_node_type"]][ "resources"]) @@ -478,50 +508,63 @@ class StandardAutoscaler: # No resources of the node were needed for request_resources(). # max_node_resources[i] is an empty dict for legacy yamls # before the node is connected. - nodes_allowed_to_terminate[node_id] = True + pass else: - nodes_allowed_to_terminate[node_id] = False - return nodes_allowed_to_terminate + nodes_not_allowed_to_terminate.add(node_id) + return frozenset(nodes_not_allowed_to_terminate) def _keep_worker_of_node_type(self, node_id: NodeID, node_type_counts: Dict[NodeType, int] - ) -> Tuple[bool, Dict[NodeType, int]]: + ) -> Tuple[KeepOrTerminate, Optional[str]]: """Determines if a worker should be kept based on the min_workers - constraint of the worker's node_type. + and max_workers constraint of the worker's node_type. - Returns True exactly when both of the following hold: + Returns KeepOrTerminate.keep when both of the following hold: (a) The worker's node_type is present among the keys of the current config's available_node_types dict. (b) Deleting the node would violate the min_workers constraint for that worker's node_type. - Also updates and returns the dictionary of node type counts. + Returns KeepOrTerminate.terminate when both the following hold: + (a) The worker's node_type is not present among the keys of the current + config's available_node_types dict. + (b) Keeping the node would violate the max_workers constraint for that + worker's node_type. + + Return KeepOrTerminate.decide_later otherwise. + Args: node_type_counts(Dict[NodeType, int]): The non_terminated node types counted so far. Returns: - bool: True if the node should be kept. False otherwise. - Dict[NodeType, int]: Updated node type counts + KeepOrTerminate: keep if the node should be kept, terminate if the + node should be terminated, decide_later if we are allowed + to terminate it, but do not have to. + Optional[str]: reason for termination. Not None on + KeepOrTerminate.terminate, None otherwise. """ - new_node_type_counts = copy.deepcopy(node_type_counts) tags = self.provider.node_tags(node_id) if TAG_RAY_USER_NODE_TYPE in tags: node_type = tags[TAG_RAY_USER_NODE_TYPE] + + min_workers = self.available_node_types.get(node_type, {}).get( + "min_workers", 0) + max_workers = self.available_node_types.get(node_type, {}).get( + "max_workers", 0) if node_type not in self.available_node_types: # The node type has been deleted from the cluster config. - # Don't keep the node. - return False, new_node_type_counts - new_node_type_counts[node_type] += 1 - min_workers = self.available_node_types[node_type].get( - "min_workers", 0) - max_workers = self.available_node_types[node_type].get( - "max_workers", 0) - if new_node_type_counts[node_type] <= min(min_workers, - max_workers): - return True, new_node_type_counts + # Allow terminating it if needed. 
+                available_node_types = list(self.available_node_types.keys())
+                return (KeepOrTerminate.terminate,
+                        f"not in available_node_types: {available_node_types}")
+            new_count = node_type_counts[node_type] + 1
+            if new_count <= min(min_workers, max_workers):
+                return KeepOrTerminate.keep, None
+            if new_count > max_workers:
+                return KeepOrTerminate.terminate, "max_workers_per_type"
 
-        return False, new_node_type_counts
+        return KeepOrTerminate.decide_later, None
 
     def _node_resources(self, node_id):
         node_type = self.provider.node_tags(node_id).get(
diff --git a/python/ray/tests/test_autoscaler.py b/python/ray/tests/test_autoscaler.py
index bafb9801e..57ee09638 100644
--- a/python/ray/tests/test_autoscaler.py
+++ b/python/ray/tests/test_autoscaler.py
@@ -10,6 +10,8 @@ import unittest
 from unittest.mock import Mock
 import yaml
 import copy
+from collections import defaultdict
+from ray.autoscaler._private.commands import get_or_create_head_node
 from jsonschema.exceptions import ValidationError
 from typing import Dict, Callable
 
@@ -340,6 +342,59 @@ MOCK_DEFAULT_CONFIG = {
     "worker_start_ray_commands": [],
 }
 
+TYPES_A = {
+    "empty_node": {
+        "node_config": {
+            "FooProperty": 42,
+        },
+        "resources": {},
+        "max_workers": 0,
+    },
+    "m4.large": {
+        "node_config": {},
+        "resources": {
+            "CPU": 2
+        },
+        "max_workers": 10,
+    },
+    "m4.4xlarge": {
+        "node_config": {},
+        "resources": {
+            "CPU": 16
+        },
+        "max_workers": 8,
+    },
+    "m4.16xlarge": {
+        "node_config": {},
+        "resources": {
+            "CPU": 64
+        },
+        "max_workers": 4,
+    },
+    "p2.xlarge": {
+        "node_config": {},
+        "resources": {
+            "CPU": 16,
+            "GPU": 1
+        },
+        "max_workers": 10,
+    },
+    "p2.8xlarge": {
+        "node_config": {},
+        "resources": {
+            "CPU": 32,
+            "GPU": 8
+        },
+        "max_workers": 4,
+    },
+}
+
+MULTI_WORKER_CLUSTER = dict(
+    SMALL_CLUSTER, **{
+        "available_node_types": TYPES_A,
+        "head_node_type": "empty_node"
+    })
+
 
 class LoadMetricsTest(unittest.TestCase):
     def testHeartbeat(self):
@@ -644,6 +699,16 @@ class AutoscalingTest(unittest.TestCase):
         autoscaler.update()
         self.waitForNodes(2)
 
+        events = autoscaler.event_summarizer.summary()
+        # Just one node (node_id 1) terminated in the last update.
+        # Validates that we didn't try to double-terminate node 0.
+        assert (sorted(events) == [
+            "Adding 1 nodes of type ray.worker.new.",
+            "Adding 1 nodes of type ray.worker.old.",
+            "Removing 1 nodes of type ray.worker.old (not "
+            "in available_node_types: ['ray.head.new', 'ray.worker.new'])."
+        ])
+
         head_list = self.provider.non_terminated_nodes({
             TAG_RAY_NODE_KIND: NODE_KIND_HEAD
         })
@@ -1125,7 +1190,7 @@ class AutoscalingTest(unittest.TestCase):
         autoscaler.update()
         self.waitForNodes(1, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER})
 
-        # Update the config to reduce the cluster size
+        # Update the config to increase the cluster size
         new_config["min_workers"] = 10
         new_config["max_workers"] = 10
         self.write_config(new_config)
@@ -1143,8 +1208,8 @@ class AutoscalingTest(unittest.TestCase):
         # Check the launch failure event is generated.
         autoscaler.update()
         events = autoscaler.event_summarizer.summary()
-        assert ("Removing 1 nodes of type "
-                "ray-legacy-worker-node-type (max workers)." in events), events
+        assert ("Removing 1 nodes of type ray-legacy-worker-node-type "
+                "(max_workers_per_type)." in events)
         assert mock_metrics.stopped_nodes.inc.call_count == 1
         mock_metrics.running_workers.set.assert_called_with(10)
 
@@ -1708,6 +1773,86 @@ class AutoscalingTest(unittest.TestCase):
         autoscaler.update()
         self.waitFor(lambda: len(runner.calls) > 0)
 
+    def testScaleDownMaxWorkers(self):
+        """Tests terminating nodes due to max_workers per node type."""
+        config = copy.deepcopy(MULTI_WORKER_CLUSTER)
+        config["available_node_types"]["m4.large"]["min_workers"] = 3
+        config["available_node_types"]["m4.large"]["max_workers"] = 3
+        config["available_node_types"]["m4.large"]["resources"] = {}
+        config["available_node_types"]["m4.16xlarge"]["resources"] = {}
+        config["available_node_types"]["p2.xlarge"]["min_workers"] = 5
+        config["available_node_types"]["p2.xlarge"]["max_workers"] = 8
+        config["available_node_types"]["p2.xlarge"]["resources"] = {}
+        config["available_node_types"]["p2.8xlarge"]["min_workers"] = 2
+        config["available_node_types"]["p2.8xlarge"]["max_workers"] = 4
+        config["available_node_types"]["p2.8xlarge"]["resources"] = {}
+        config["max_workers"] = 13
+
+        config_path = self.write_config(config)
+        self.provider = MockProvider()
+        runner = MockProcessRunner()
+        runner.respond_to_call("json .Config.Env", ["[]" for i in range(15)])
+        lm = LoadMetrics()
+
+        get_or_create_head_node(
+            config,
+            printable_config_file=config_path,
+            no_restart=False,
+            restart_only=False,
+            yes=True,
+            override_cluster_name=None,
+            _provider=self.provider,
+            _runner=runner)
+        self.waitForNodes(1)
+
+        autoscaler = StandardAutoscaler(
+            config_path,
+            lm,
+            max_failures=0,
+            max_concurrent_launches=13,
+            max_launch_batch=13,
+            process_runner=runner,
+            update_interval_s=0)
+        autoscaler.update()
+        self.waitForNodes(11)
+        assert autoscaler.pending_launches.value == 0
+        assert len(
+            self.provider.non_terminated_nodes({
+                TAG_RAY_NODE_KIND: NODE_KIND_WORKER
+            })) == 10
+
+        # Terminate some nodes
+        config["available_node_types"]["m4.large"]["min_workers"] = 2  # 3
+        config["available_node_types"]["m4.large"]["max_workers"] = 2
+        config["available_node_types"]["p2.8xlarge"]["min_workers"] = 0  # 2
+        config["available_node_types"]["p2.8xlarge"]["max_workers"] = 0
+        # And spawn one.
+        config["available_node_types"]["p2.xlarge"]["min_workers"] = 6  # 5
+        config["available_node_types"]["p2.xlarge"]["max_workers"] = 6
+        self.write_config(config)
+        autoscaler.update()
+        events = autoscaler.event_summarizer.summary()
+        assert autoscaler.pending_launches.value == 0
+        self.waitForNodes(8, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER})
+        assert autoscaler.pending_launches.value == 0
+        events = autoscaler.event_summarizer.summary()
+        assert ("Removing 1 nodes of type m4.large (max_workers_per_type)." in
+                events)
+        assert ("Removing 2 nodes of type p2.8xlarge (max_workers_per_type)."
+                in events)
+
+        # We should not be starting/stopping empty_node at all.
+        for event in events:
+            assert "empty_node" not in event
+
+        node_type_counts = defaultdict(int)
+        for node_id in autoscaler.workers():
+            tags = self.provider.node_tags(node_id)
+            if TAG_RAY_USER_NODE_TYPE in tags:
+                node_type = tags[TAG_RAY_USER_NODE_TYPE]
+                node_type_counts[node_type] += 1
+        assert node_type_counts == {"m4.large": 2, "p2.xlarge": 6}
+
     def testScaleUpBasedOnLoad(self):
         config = SMALL_CLUSTER.copy()
         config["min_workers"] = 1
@@ -2617,8 +2762,7 @@ MemAvailable: 33000000 kB
             lambda: all(not updater.is_alive()
                         for updater in autoscaler.updaters.values()),
             num_retries=500,
-            fail_msg="Last round of updaters didn't complete on time."
-        )
+            fail_msg="Last round of updaters didn't complete on time.")
         # Check that updaters processed some commands in the last autoscaler
         # update.
         assert len(autoscaler.process_runner.calls) > num_calls,\
diff --git a/python/ray/tests/test_resource_demand_scheduler.py b/python/ray/tests/test_resource_demand_scheduler.py
index 6f9eb353c..1c8c20b0d 100644
--- a/python/ray/tests/test_resource_demand_scheduler.py
+++ b/python/ray/tests/test_resource_demand_scheduler.py
@@ -14,7 +14,7 @@ from ray.autoscaler._private.util import \
     prepare_config, format_info_string, \
     format_info_string_no_node_types
 from ray.tests.test_autoscaler import SMALL_CLUSTER, MOCK_DEFAULT_CONFIG, \
-    MockProvider, MockProcessRunner
+    MULTI_WORKER_CLUSTER, TYPES_A, MockProvider, MockProcessRunner
 from ray.autoscaler._private.providers import (_NODE_PROVIDERS,
                                                _clear_provider_cache)
 from ray.autoscaler._private.autoscaler import StandardAutoscaler, \
@@ -41,59 +41,6 @@ from time import sleep
 
 GET_DEFAULT_METHOD = "ray.autoscaler._private.util._get_default_config"
 
-TYPES_A = {
-    "empty_node": {
-        "node_config": {
-            "FooProperty": 42,
-        },
-        "resources": {},
-        "max_workers": 0,
-    },
-    "m4.large": {
-        "node_config": {},
-        "resources": {
-            "CPU": 2
-        },
-        "max_workers": 10,
-    },
-    "m4.4xlarge": {
-        "node_config": {},
-        "resources": {
-            "CPU": 16
-        },
-        "max_workers": 8,
-    },
-    "m4.16xlarge": {
-        "node_config": {},
-        "resources": {
-            "CPU": 64
-        },
-        "max_workers": 4,
-    },
-    "p2.xlarge": {
-        "node_config": {},
-        "resources": {
-            "CPU": 16,
-            "GPU": 1
-        },
-        "max_workers": 10,
-    },
-    "p2.8xlarge": {
-        "node_config": {},
-        "resources": {
-            "CPU": 32,
-            "GPU": 8
-        },
-        "max_workers": 4,
-    },
-}
-
-MULTI_WORKER_CLUSTER = dict(
-    SMALL_CLUSTER, **{
-        "available_node_types": TYPES_A,
-        "head_node_type": "empty_node"
-    })
-
 
 def test_util_score():
     assert _utilization_score({"CPU": 64}, [{"TPU": 16}]) is None