From c0fd69a33bd5e4dbe2002b4eccb79dbe9e3f8123 Mon Sep 17 00:00:00 2001 From: scv119 Date: Fri, 29 Jul 2022 15:37:52 -0700 Subject: [PATCH] Revert "[autoscaler] Remove deprecated fields from schema (#27040) (#27200)" This reverts commit cd1ba2da8080bf1c317af37079a63f3a96b6be7d. --- python/ray/autoscaler/_private/commands.py | 7 - .../_private/kuberay/autoscaling_config.py | 2 + python/ray/autoscaler/_private/util.py | 19 - .../autoscaler/local/development-example.yaml | 2 + python/ray/autoscaler/ray-schema.json | 35 +- .../tests/kuberay/test_autoscaling_config.py | 2 + python/ray/tests/test_autoscaler.py | 467 +++++++++++++++--- python/ray/tests/test_autoscaler_yaml.py | 2 +- .../tests/test_resource_demand_scheduler.py | 199 +++++++- python/ray/tune/examples/tune-default.yaml | 8 +- 10 files changed, 625 insertions(+), 118 deletions(-) diff --git a/python/ray/autoscaler/_private/commands.py b/python/ray/autoscaler/_private/commands.py index 155e41bca..ffd20f5c8 100644 --- a/python/ray/autoscaler/_private/commands.py +++ b/python/ray/autoscaler/_private/commands.py @@ -923,13 +923,6 @@ def _set_up_config_for_head_node( # head node won't be able to connect to workers remote_config["auth"].pop("ssh_proxy_command", None) - # Drop the head_node field if it was introduced. It is technically not a - # valid field in the config, but it may have been introduced after - # validation (see _bootstrap_config() call to - # provider_cls.bootstrap_config(config)). The head node will never try to - # launch a head node so it doesn't need these defaults. - remote_config.pop("head_node", None) - if "ssh_private_key" in config["auth"]: remote_key_path = "~/ray_bootstrap_key.pem" remote_config["auth"]["ssh_private_key"] = remote_key_path diff --git a/python/ray/autoscaler/_private/kuberay/autoscaling_config.py b/python/ray/autoscaler/_private/kuberay/autoscaling_config.py index b10c3d4b1..29c0f0141 100644 --- a/python/ray/autoscaler/_private/kuberay/autoscaling_config.py +++ b/python/ray/autoscaler/_private/kuberay/autoscaling_config.py @@ -182,6 +182,8 @@ def _generate_legacy_autoscaling_config_fields() -> Dict[str, Any]: "head_start_ray_commands": [], "worker_start_ray_commands": [], "auth": {}, + "head_node": {}, + "worker_nodes": {}, } diff --git a/python/ray/autoscaler/_private/util.py b/python/ray/autoscaler/_private/util.py index a34bd73f5..6be87b6ca 100644 --- a/python/ray/autoscaler/_private/util.py +++ b/python/ray/autoscaler/_private/util.py @@ -222,23 +222,6 @@ def prepare_config(config: Dict[str, Any]) -> Dict[str, Any]: return with_defaults -def translate_trivial_legacy_config(config: Dict[str, Any]): - """ - Drop empty deprecated fields ("head_node" and "worker_node"). - """ - - REMOVABLE_FIELDS = ["head_node", "worker_nodes"] - - for field in REMOVABLE_FIELDS: - if field in config and not config[field]: - logger.warning( - f"Dropping the empty legacy field {field}. {field}" - "is not supported for ray>=2.0.0. It is recommended to remove" - f"{field} from the cluster config." - ) - del config[field] - - def fillout_defaults(config: Dict[str, Any]) -> Dict[str, Any]: defaults = _get_default_config(config["provider"]) defaults.update(config) @@ -264,8 +247,6 @@ def fillout_defaults(config: Dict[str, Any]) -> Dict[str, Any]: # workers, node types, but does specify min workers: merged_config.pop("min_workers", None) - translate_trivial_legacy_config(merged_config) - return merged_config diff --git a/python/ray/autoscaler/local/development-example.yaml b/python/ray/autoscaler/local/development-example.yaml index 12aebe319..117b5881d 100644 --- a/python/ray/autoscaler/local/development-example.yaml +++ b/python/ray/autoscaler/local/development-example.yaml @@ -9,6 +9,8 @@ provider: auth: ssh_user: YOUR_USERNAME ssh_private_key: ~/.ssh/id_rsa +head_node: {} +worker_nodes: {} file_mounts: "/tmp/ray_sha": "/YOUR/LOCAL/RAY/REPO/.git/refs/heads/YOUR_BRANCH" setup_commands: [] diff --git a/python/ray/autoscaler/ray-schema.json b/python/ray/autoscaler/ray-schema.json index ac59b65a5..32232c5fb 100644 --- a/python/ray/autoscaler/ray-schema.json +++ b/python/ray/autoscaler/ray-schema.json @@ -23,11 +23,32 @@ "description": "A unique identifier for the head node and workers of this cluster.", "type": "string" }, + "min_workers": { + "description": "DEPRECATED. Use the per node_type min_workers field instead.", + "type": "integer", + "minimum": 0 + }, "max_workers": { "description": "The maximum number of workers nodes to launch in addition to the head node. This takes precedence over min_workers.", "type": "integer", "minimum": 0 }, + "initial_workers": { + "description": "DEPRECATED.", + "type": "integer", + "minimum": 0 + }, + "autoscaling_mode": { + "description": "DEPRECATED. Use upscaling_speed instead.", + "type": "string", + "enum": [ "default", "aggressive" ] + }, + "target_utilization_fraction": { + "description": "DEPRECATED. Use upscaling_speed instead.", + "type": "number", + "minimum": 0, + "maximum": 1 + }, "upscaling_speed": { "description": "The autoscaler will scale up the cluster faster with higher upscaling speed. E.g., if the task requires adding more nodes then autoscaler will gradually scale up the cluster in chunks of upscaling_speed*currently_running_nodes. This number should be > 0.", "type": "number", @@ -48,7 +69,7 @@ "type": "string", "description": "e.g. aws, azure, gcp,..." }, - "region": { + "region": { "type": "string", "description": "e.g. us-east-1" }, @@ -280,6 +301,18 @@ "type": "string", "description": "If using multiple node types, specifies the head node type." }, + "worker_default_node_type": { + "type": "string", + "description": "DEPRECATED." + }, + "head_node": { + "type": "object", + "description": "Provider-specific config for the head node, e.g. instance type." + }, + "worker_nodes": { + "type": "object", + "description": "Provider-specific config for worker nodes. e.g. instance type." + }, "file_mounts": { "type": "object", "description": "Map of remote paths to local paths, e.g. {\"/tmp/data\": \"/my/local/data\"}" diff --git a/python/ray/tests/kuberay/test_autoscaling_config.py b/python/ray/tests/kuberay/test_autoscaling_config.py index c142213e8..f6d4b8121 100644 --- a/python/ray/tests/kuberay/test_autoscaling_config.py +++ b/python/ray/tests/kuberay/test_autoscaling_config.py @@ -91,6 +91,7 @@ def _get_basic_autoscaling_config() -> dict: "cluster_synced_files": [], "file_mounts": {}, "file_mounts_sync_continuously": False, + "head_node": {}, "head_node_type": "head-group", "head_setup_commands": [], "head_start_ray_commands": [], @@ -99,6 +100,7 @@ def _get_basic_autoscaling_config() -> dict: "max_workers": 600, "setup_commands": [], "upscaling_speed": 1000, + "worker_nodes": {}, "worker_setup_commands": [], "worker_start_ray_commands": [], } diff --git a/python/ray/tests/test_autoscaler.py b/python/ray/tests/test_autoscaler.py index 0eceba815..2e33c9dc5 100644 --- a/python/ray/tests/test_autoscaler.py +++ b/python/ray/tests/test_autoscaler.py @@ -17,6 +17,7 @@ from unittest.mock import ( ) import grpc +import jsonschema import pytest import yaml from jsonschema.exceptions import ValidationError @@ -464,8 +465,12 @@ class NoUpdaterMockAutoscaler(MockAutoscaler): SMALL_CLUSTER = { "cluster_name": "default", - "idle_timeout_minutes": 5, + "min_workers": 2, "max_workers": 2, + "initial_workers": 0, + "autoscaling_mode": "default", + "target_utilization_fraction": 0.8, + "idle_timeout_minutes": 5, "provider": { "type": "mock", "region": "us-east-1", @@ -479,24 +484,12 @@ SMALL_CLUSTER = { "ssh_user": "ubuntu", "ssh_private_key": os.devnull, }, - "available_node_types": { - NODE_TYPE_LEGACY_HEAD: { - "node_config": { - "TestProp": 1, - }, - "resources": {}, - "max_workers": 0, - }, - NODE_TYPE_LEGACY_WORKER: { - "node_config": { - "TestProp": 2, - }, - "resources": {}, - "min_workers": 2, - "max_workers": 2, - }, + "head_node": { + "TestProp": 1, + }, + "worker_nodes": { + "TestProp": 2, }, - "head_node_type": NODE_TYPE_LEGACY_HEAD, "file_mounts": {}, "cluster_synced_files": [], "initialization_commands": ["init_cmd"], @@ -551,7 +544,6 @@ TYPES_A = { "empty_node": { "node_config": { "FooProperty": 42, - "TestProp": 1, }, "resources": {}, "max_workers": 0, @@ -813,7 +805,7 @@ class AutoscalingTest(unittest.TestCase): runner.assert_has_call("1.2.3.4", "init_cmd") runner.assert_has_call("1.2.3.4", "head_setup_cmd") runner.assert_has_call("1.2.3.4", "start_ray_head") - self.assertEqual(self.provider.mock_nodes[0].node_type, NODE_TYPE_LEGACY_HEAD) + self.assertEqual(self.provider.mock_nodes[0].node_type, None) runner.assert_has_call("1.2.3.4", pattern="docker run") runner.assert_has_call("1.2.3.4", pattern=head_run_option) runner.assert_has_call("1.2.3.4", pattern=standard_run_option) @@ -1006,7 +998,7 @@ class AutoscalingTest(unittest.TestCase): runner.assert_has_call("1.2.3.4", "init_cmd") runner.assert_has_call("1.2.3.4", "head_setup_cmd") runner.assert_has_call("1.2.3.4", "start_ray_head") - self.assertEqual(self.provider.mock_nodes[0].node_type, NODE_TYPE_LEGACY_HEAD) + self.assertEqual(self.provider.mock_nodes[0].node_type, None) runner.assert_has_call("1.2.3.4", pattern="podman run") docker_mount_prefix = get_docker_host_mount_location( @@ -1056,7 +1048,7 @@ class AutoscalingTest(unittest.TestCase): runner.assert_has_call("1.2.3.4", "init_cmd") runner.assert_has_call("1.2.3.4", "head_setup_cmd") runner.assert_has_call("1.2.3.4", "start_ray_head") - self.assertEqual(self.provider.mock_nodes[0].node_type, NODE_TYPE_LEGACY_HEAD) + self.assertEqual(self.provider.mock_nodes[0].node_type, None) runner.assert_has_call("1.2.3.4", pattern="docker run") docker_mount_prefix = get_docker_host_mount_location( @@ -1168,7 +1160,7 @@ class AutoscalingTest(unittest.TestCase): runner.assert_has_call("1.2.3.4", "init_cmd") runner.assert_has_call("1.2.3.4", "head_setup_cmd") runner.assert_has_call("1.2.3.4", "start_ray_head") - self.assertEqual(self.provider.mock_nodes[0].node_type, NODE_TYPE_LEGACY_HEAD) + self.assertEqual(self.provider.mock_nodes[0].node_type, None) runner.assert_has_call("1.2.3.4", pattern="docker stop") runner.assert_has_call("1.2.3.4", pattern="docker run") @@ -1220,7 +1212,7 @@ class AutoscalingTest(unittest.TestCase): runner.assert_has_call("1.2.3.4", "init_cmd") runner.assert_has_call("1.2.3.4", "head_setup_cmd") runner.assert_has_call("1.2.3.4", "start_ray_head") - self.assertEqual(self.provider.mock_nodes[0].node_type, NODE_TYPE_LEGACY_HEAD) + self.assertEqual(self.provider.mock_nodes[0].node_type, None) # We only removed amount from the YAML, no changes should happen. runner.assert_not_has_call("1.2.3.4", pattern="docker stop") runner.assert_not_has_call("1.2.3.4", pattern="docker run") @@ -1296,7 +1288,7 @@ class AutoscalingTest(unittest.TestCase): runner.assert_has_call("172.0.0.4", pattern="rsync --rsh") def testRsyncCommandWithoutDocker(self): - cluster_cfg = copy.deepcopy(SMALL_CLUSTER) + cluster_cfg = SMALL_CLUSTER.copy() cluster_cfg["docker"] = {} config_path = self.write_config(cluster_cfg) self.provider = MockProvider(unique_ips=True) @@ -1482,10 +1474,9 @@ class AutoscalingTest(unittest.TestCase): self.ScaleUpHelper(disable_node_updaters=True) def testTerminateOutdatedNodesGracefully(self): - config = copy.deepcopy(SMALL_CLUSTER) - config["available_node_types"][NODE_TYPE_LEGACY_WORKER]["min_workers"] = 5 + config = SMALL_CLUSTER.copy() + config["min_workers"] = 5 config["max_workers"] = 5 - config["available_node_types"][NODE_TYPE_LEGACY_WORKER]["max_workers"] = 5 config_path = self.write_config(config) self.provider = MockProvider() self.provider.create_node( @@ -1680,15 +1671,13 @@ class AutoscalingTest(unittest.TestCase): # Update the config to reduce the cluster size new_config = copy.deepcopy(SMALL_CLUSTER) new_config["max_workers"] = 1 - new_config["available_node_types"][NODE_TYPE_LEGACY_WORKER]["max_workers"] = 1 self.write_config(new_config) fill_in_raylet_ids(self.provider, lm) autoscaler.update() self.waitForNodes(1, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER}) # Update the config to increase the cluster size - new_config["available_node_types"][NODE_TYPE_LEGACY_WORKER]["min_workers"] = 10 - new_config["available_node_types"][NODE_TYPE_LEGACY_WORKER]["max_workers"] = 10 + new_config["min_workers"] = 10 new_config["max_workers"] = 10 self.write_config(new_config) autoscaler.update() @@ -1719,6 +1708,101 @@ class AutoscalingTest(unittest.TestCase): assert mock_metrics.stopped_nodes.inc.call_count == 1 mock_metrics.running_workers.set.assert_called_with(10) + def testInitialWorkers(self): + """initial_workers is deprecated, this tests that it is ignored.""" + config = SMALL_CLUSTER.copy() + config["min_workers"] = 0 + config["max_workers"] = 20 + config["initial_workers"] = 10 + config_path = self.write_config(config) + self.provider = MockProvider() + runner = MockProcessRunner() + autoscaler = MockAutoscaler( + config_path, + LoadMetrics(), + MockNodeInfoStub(), + max_launch_batch=5, + max_concurrent_launches=5, + max_failures=0, + process_runner=runner, + update_interval_s=0, + ) + self.waitForNodes(0) + autoscaler.update() + self.waitForNodes(0) + + def testLegacyYamlWithRequestResources(self): + """Test when using legacy yamls request_resources() adds workers. + + Makes sure that requested resources are added for legacy yamls when + necessary. So if requested resources for instance fit on the headnode + we don't add more nodes. But we add more nodes when they don't fit. + """ + config = SMALL_CLUSTER.copy() + config["min_workers"] = 0 + config["max_workers"] = 100 + config["idle_timeout_minutes"] = 0 + config["upscaling_speed"] = 1 + config_path = self.write_config(config) + + self.provider = MockProvider() + self.provider.create_node( + {}, + { + TAG_RAY_NODE_KIND: NODE_KIND_HEAD, + TAG_RAY_USER_NODE_TYPE: NODE_TYPE_LEGACY_HEAD, + }, + 1, + ) + head_ip = self.provider.non_terminated_node_ips( + tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_HEAD}, + )[0] + runner = MockProcessRunner() + runner.respond_to_call("json .Config.Env", ["[]" for i in range(10)]) + + lm = LoadMetrics() + lm.local_ip = head_ip + lm.update(head_ip, mock_raylet_id(), {"CPU": 1}, {"CPU": 1}, {}) + autoscaler = MockAutoscaler( + config_path, + lm, + MockNodeInfoStub(), + max_launch_batch=5, + max_concurrent_launches=5, + max_failures=0, + process_runner=runner, + update_interval_s=0, + ) + autoscaler.update() + # 1 head node. + self.waitForNodes(1) + autoscaler.load_metrics.set_resource_requests([{"CPU": 1}]) + autoscaler.update() + # still 1 head node because request_resources fits in the headnode. + self.waitForNodes(1) + autoscaler.load_metrics.set_resource_requests([{"CPU": 1}] + [{"CPU": 2}] * 9) + autoscaler.update() + self.waitForNodes(2) # Adds a single worker to get its resources. + autoscaler.update() + self.waitForNodes(2) # Still 1 worker because its resources + # aren't known. + lm.update("172.0.0.1", mock_raylet_id(), {"CPU": 2}, {"CPU": 2}, {}) + autoscaler.update() + self.waitForNodes(10) # 9 workers and 1 head node, scaled immediately. + lm.update( + "172.0.0.1", + mock_raylet_id(), + {"CPU": 2}, + {"CPU": 2}, + {}, + waiting_bundles=[{"CPU": 2}] * 9, + infeasible_bundles=[{"CPU": 1}] * 1, + ) + autoscaler.update() + # Make sure that if all the resources fit on the exising nodes not + # to add any more. + self.waitForNodes(10) + def testAggressiveAutoscaling(self): self._aggressiveAutoscalingHelper() @@ -1727,13 +1811,10 @@ class AutoscalingTest(unittest.TestCase): def _aggressiveAutoscalingHelper(self, foreground_node_launcher: bool = False): config = copy.deepcopy(SMALL_CLUSTER) - config["available_node_types"][NODE_TYPE_LEGACY_WORKER]["min_workers"] = 0 - config["available_node_types"][NODE_TYPE_LEGACY_WORKER]["max_workers"] = 10 + config["min_workers"] = 0 config["max_workers"] = 10 config["idle_timeout_minutes"] = 0 - config["upscaling_speed"] = config["available_node_types"][ - NODE_TYPE_LEGACY_WORKER - ]["max_workers"] + config["upscaling_speed"] = config["max_workers"] if foreground_node_launcher: config["provider"][FOREGROUND_NODE_LAUNCH_KEY] = True config_path = self.write_config(config) @@ -1829,12 +1910,13 @@ class AutoscalingTest(unittest.TestCase): ] == {"CPU": 1} def testUnmanagedNodes(self): - config = copy.deepcopy(SMALL_CLUSTER) - config["available_node_types"][NODE_TYPE_LEGACY_WORKER]["min_workers"] = 0 - config["available_node_types"][NODE_TYPE_LEGACY_WORKER]["max_workers"] = 20 + config = SMALL_CLUSTER.copy() + config["min_workers"] = 0 config["max_workers"] = 20 + config["initial_workers"] = 0 config["idle_timeout_minutes"] = 0 - config["upscaling_speed"] = 9999 + config["autoscaling_mode"] = "aggressive" + config["target_utilization_fraction"] = 0.8 config_path = self.write_config(config) self.provider = MockProvider() @@ -1892,13 +1974,13 @@ class AutoscalingTest(unittest.TestCase): self.waitForNodes(3) def testUnmanagedNodes2(self): - config = copy.deepcopy(SMALL_CLUSTER) - config = copy.deepcopy(SMALL_CLUSTER) - config["available_node_types"][NODE_TYPE_LEGACY_WORKER]["min_workers"] = 0 - config["available_node_types"][NODE_TYPE_LEGACY_WORKER]["max_workers"] = 20 + config = SMALL_CLUSTER.copy() + config["min_workers"] = 0 config["max_workers"] = 20 + config["initial_workers"] = 0 config["idle_timeout_minutes"] = 0 - config["upscaling_speed"] = 9999 + config["autoscaling_mode"] = "aggressive" + config["target_utilization_fraction"] = 1.0 config_path = self.write_config(config) self.provider = MockProvider() @@ -1980,17 +2062,16 @@ class AutoscalingTest(unittest.TestCase): assert autoscaler.pending_launches.value == 0 # Update the config to reduce the cluster size - new_config = copy.deepcopy(SMALL_CLUSTER) - new_config["available_node_types"][NODE_TYPE_LEGACY_WORKER]["max_workers"] = 1 + new_config = SMALL_CLUSTER.copy() + new_config["max_workers"] = 1 self.write_config(new_config) fill_in_raylet_ids(self.provider, lm) autoscaler.update() assert len(self.provider.non_terminated_nodes({})) == 1 def testDelayedLaunchWithMinWorkers(self): - config = copy.deepcopy(SMALL_CLUSTER) - config["available_node_types"][NODE_TYPE_LEGACY_WORKER]["min_workers"] = 10 - config["available_node_types"][NODE_TYPE_LEGACY_WORKER]["max_workers"] = 10 + config = SMALL_CLUSTER.copy() + config["min_workers"] = 10 config["max_workers"] = 10 config_path = self.write_config(config) self.provider = MockProvider() @@ -2052,7 +2133,7 @@ class AutoscalingTest(unittest.TestCase): autoscaler.update() self.waitForNodes(2) assert autoscaler.pending_launches.value == 0 - new_config = copy.deepcopy(SMALL_CLUSTER) + new_config = SMALL_CLUSTER.copy() new_config["max_workers"] = 1 self.write_config(new_config) autoscaler.update() @@ -2073,10 +2154,8 @@ class AutoscalingTest(unittest.TestCase): self.waitForNodes(2) # Update the config to change the node type - new_config = copy.deepcopy(SMALL_CLUSTER) - new_config["available_node_types"][NODE_TYPE_LEGACY_WORKER]["node_config"][ - "InstanceType" - ] = "updated" + new_config = SMALL_CLUSTER.copy() + new_config["worker_nodes"]["InstanceType"] = "updated" self.write_config(new_config) self.provider.ready_to_create.clear() fill_in_raylet_ids(self.provider, lm) @@ -2115,7 +2194,6 @@ class AutoscalingTest(unittest.TestCase): prom_metrics=mock_metrics, ) autoscaler.update() - assert mock_metrics.config_validation_exceptions.inc.call_count == 0 self.waitForNodes(2, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER}) # Write a corrupted config @@ -2136,10 +2214,9 @@ class AutoscalingTest(unittest.TestCase): ) # New a good config again - new_config = copy.deepcopy(SMALL_CLUSTER) - new_config["available_node_types"][NODE_TYPE_LEGACY_WORKER]["min_workers"] = 10 + new_config = SMALL_CLUSTER.copy() + new_config["min_workers"] = 10 new_config["max_workers"] = 10 - new_config["available_node_types"][NODE_TYPE_LEGACY_WORKER]["max_workers"] = 10 self.write_config(new_config) worker_ip = self.provider.non_terminated_node_ips( tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER}, @@ -2281,7 +2358,7 @@ class AutoscalingTest(unittest.TestCase): autoscaler.update() self.waitForNodes(2, tag_filters={TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE}) runner.calls = [] - new_config = copy.deepcopy(SMALL_CLUSTER) + new_config = SMALL_CLUSTER.copy() new_config["worker_setup_commands"] = ["cmdX", "cmdY"] self.write_config(new_config) autoscaler.update() @@ -2380,6 +2457,216 @@ class AutoscalingTest(unittest.TestCase): lm.update("172.0.0.0", mock_raylet_id(), {"CPU": 1}, {"CPU": 0}, {}) assert lm + def testScaleUpBasedOnLoad(self): + config = SMALL_CLUSTER.copy() + config["min_workers"] = 1 + config["max_workers"] = 10 + config["target_utilization_fraction"] = 0.5 + config_path = self.write_config(config) + self.provider = MockProvider() + lm = LoadMetrics() + runner = MockProcessRunner() + runner.respond_to_call("json .Config.Env", ["[]" for i in range(6)]) + self.provider.create_node( + {}, + { + TAG_RAY_NODE_KIND: NODE_KIND_HEAD, + TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE, + TAG_RAY_USER_NODE_TYPE: NODE_TYPE_LEGACY_HEAD, + }, + 1, + ) + lm.update("172.0.0.0", mock_raylet_id(), {"CPU": 1}, {"CPU": 0}, {}) + autoscaler = MockAutoscaler( + config_path, + lm, + MockNodeInfoStub(), + max_failures=0, + process_runner=runner, + update_interval_s=0, + ) + assert ( + len( + self.provider.non_terminated_nodes( + {TAG_RAY_NODE_KIND: NODE_KIND_WORKER} + ) + ) + == 0 + ) + autoscaler.update() + self.waitForNodes(1, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER}) + autoscaler.update() + assert autoscaler.pending_launches.value == 0 + assert ( + len( + self.provider.non_terminated_nodes( + {TAG_RAY_NODE_KIND: NODE_KIND_WORKER} + ) + ) + == 1 + ) + + autoscaler.update() + lm.update( + "172.0.0.1", + mock_raylet_id(), + {"CPU": 2}, + {"CPU": 0}, + {}, + waiting_bundles=2 * [{"CPU": 2}], + ) + autoscaler.update() + self.waitForNodes(3, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER}) + lm.update( + "172.0.0.2", + mock_raylet_id(), + {"CPU": 2}, + {"CPU": 0}, + {}, + waiting_bundles=3 * [{"CPU": 2}], + ) + autoscaler.update() + self.waitForNodes(5, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER}) + + # Holds steady when load is removed + lm.update("172.0.0.1", mock_raylet_id(), {"CPU": 2}, {"CPU": 2}, {}) + lm.update("172.0.0.2", mock_raylet_id(), {"CPU": 2}, {"CPU": 2}, {}) + autoscaler.update() + assert autoscaler.pending_launches.value == 0 + assert ( + len( + self.provider.non_terminated_nodes( + {TAG_RAY_NODE_KIND: NODE_KIND_WORKER} + ) + ) + == 5 + ) + + # Scales down as nodes become unused + lm.last_used_time_by_ip["172.0.0.1"] = 0 + lm.last_used_time_by_ip["172.0.0.2"] = 0 + autoscaler.update() + + assert autoscaler.pending_launches.value == 0 + # This actually remained 4 instead of 3, because the other 2 nodes + # are not connected and hence we rely more on connected nodes for + # min_workers. When the "pending" nodes show up as connected, + # then we can terminate the ones connected before. + assert ( + len( + self.provider.non_terminated_nodes( + {TAG_RAY_NODE_KIND: NODE_KIND_WORKER} + ) + ) + == 4 + ) + lm.last_used_time_by_ip["172.0.0.3"] = 0 + lm.last_used_time_by_ip["172.0.0.4"] = 0 + fill_in_raylet_ids(self.provider, lm) + autoscaler.update() + assert autoscaler.pending_launches.value == 0 + # 2 nodes and not 1 because 1 is needed for min_worker and the other 1 + # is still not connected. + self.waitForNodes(2, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER}) + # when we connect it, we will see 1 node. + lm.last_used_time_by_ip["172.0.0.5"] = 0 + autoscaler.update() + self.waitForNodes(1, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER}) + + # Check add/remove events. + events = autoscaler.event_summarizer.summary() + assert "Adding 5 nodes of type ray-legacy-worker-node-type." in events, events + assert ( + "Removing 4 nodes of type ray-legacy-worker-node-type (idle)." in events + ), events + + summary = autoscaler.summary() + assert ( + len(summary.failed_nodes) == 0 + ), "Autoscaling policy decisions shouldn't result in failed nodes" + + def testTargetUtilizationFraction(self): + config = SMALL_CLUSTER.copy() + config["min_workers"] = 0 + config["max_workers"] = 20 + config["upscaling_speed"] = 10 + config_path = self.write_config(config) + self.provider = MockProvider() + lm = LoadMetrics() + runner = MockProcessRunner() + runner.respond_to_call("json .Config.Env", ["[]" for i in range(12)]) + autoscaler = MockAutoscaler( + config_path, + lm, + MockNodeInfoStub(), + max_failures=0, + process_runner=runner, + update_interval_s=0, + ) + assert len(self.provider.non_terminated_nodes({})) == 0 + autoscaler.update() + assert autoscaler.pending_launches.value == 0 + assert len(self.provider.non_terminated_nodes({})) == 0 + self.provider.create_node( + {}, + { + TAG_RAY_NODE_KIND: NODE_KIND_HEAD, + TAG_RAY_USER_NODE_TYPE: NODE_TYPE_LEGACY_HEAD, + }, + 1, + ) + head_ip = self.provider.non_terminated_node_ips({})[0] + lm.local_ip = head_ip + lm.update( + head_ip, + mock_raylet_id(), + {"CPU": 2}, + {"CPU": 1}, + {}, + waiting_bundles=[{"CPU": 1}], + ) # head + # The headnode should be sufficient for now + autoscaler.update() + self.waitForNodes(1) + + # Requires 1 more worker as the head node is fully used. + lm.update( + head_ip, + mock_raylet_id(), + {"CPU": 2}, + {"CPU": 0}, + {}, + waiting_bundles=[{"CPU": 1}], + ) + autoscaler.update() + self.waitForNodes(2) # 1 worker is added to get its resources. + worker_ip = self.provider.non_terminated_node_ips( + tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER}, + )[0] + lm.update( + worker_ip, + mock_raylet_id(), + {"CPU": 1}, + {"CPU": 1}, + {}, + waiting_bundles=[{"CPU": 1}] * 7, + infeasible_bundles=[{"CPU": 1}] * 4, + ) + # Add another 10 workers (frac=1/0.1=10, 1 worker running, 10*1=10) + # and bypass constraint of 5 due to target utiization fraction. + autoscaler.update() + self.waitForNodes(12) + + worker_ips = self.provider.non_terminated_node_ips( + tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER}, + ) + for ip in worker_ips: + lm.last_used_time_by_ip[ip] = 0 + fill_in_raylet_ids(self.provider, lm) + autoscaler.update() + self.waitForNodes(1) # only the head node + assert len(self.provider.non_terminated_nodes({})) == 1 + def testRecoverUnhealthyWorkers(self): config_path = self.write_config(SMALL_CLUSTER) self.provider = MockProvider() @@ -2583,7 +2870,7 @@ class AutoscalingTest(unittest.TestCase): assert mock_metrics.drain_node_exceptions.inc.call_count == 0 def testExternalNodeScaler(self): - config = copy.deepcopy(SMALL_CLUSTER) + config = SMALL_CLUSTER.copy() config["provider"] = { "type": "external", "module": "ray.autoscaler.node_provider.NodeProvider", @@ -2598,8 +2885,30 @@ class AutoscalingTest(unittest.TestCase): ) assert isinstance(autoscaler.provider, NodeProvider) + def testLegacyExternalNodeScalerMissingFields(self): + """Should fail to validate legacy external config with missing + head_node, worker_nodes, or both.""" + external_config = copy.deepcopy(SMALL_CLUSTER) + external_config["provider"] = { + "type": "external", + "module": "ray.autoscaler.node_provider.NodeProvider", + } + + missing_workers, missing_head, missing_both = [ + copy.deepcopy(external_config) for _ in range(3) + ] + del missing_workers["worker_nodes"] + del missing_head["head_node"] + del missing_both["worker_nodes"] + del missing_both["head_node"] + + for faulty_config in missing_workers, missing_head, missing_both: + faulty_config = prepare_config(faulty_config) + with pytest.raises(jsonschema.ValidationError): + validate_config(faulty_config) + def testExternalNodeScalerWrongImport(self): - config = copy.deepcopy(SMALL_CLUSTER) + config = SMALL_CLUSTER.copy() config["provider"] = { "type": "external", "module": "mymodule.provider_class", @@ -2611,7 +2920,7 @@ class AutoscalingTest(unittest.TestCase): ) def testExternalNodeScalerWrongModuleFormat(self): - config = copy.deepcopy(SMALL_CLUSTER) + config = SMALL_CLUSTER.copy() config["provider"] = { "type": "external", "module": "does-not-exist", @@ -2623,9 +2932,8 @@ class AutoscalingTest(unittest.TestCase): ) def testSetupCommandsWithNoNodeCaching(self): - config = copy.deepcopy(SMALL_CLUSTER) - config["available_node_types"][NODE_TYPE_LEGACY_WORKER]["min_workers"] = 1 - config["available_node_types"][NODE_TYPE_LEGACY_WORKER]["max_workers"] = 1 + config = SMALL_CLUSTER.copy() + config["min_workers"] = 1 config["max_workers"] = 1 config_path = self.write_config(config) self.provider = MockProvider(cache_stopped=False) @@ -2665,12 +2973,11 @@ class AutoscalingTest(unittest.TestCase): def testSetupCommandsWithStoppedNodeCachingNoDocker(self): file_mount_dir = tempfile.mkdtemp() - config = copy.deepcopy(SMALL_CLUSTER) + config = SMALL_CLUSTER.copy() del config["docker"] config["file_mounts"] = {"/root/test-folder": file_mount_dir} config["file_mounts_sync_continuously"] = True - config["available_node_types"][NODE_TYPE_LEGACY_WORKER]["min_workers"] = 1 - config["available_node_types"][NODE_TYPE_LEGACY_WORKER]["max_workers"] = 1 + config["min_workers"] = 1 config["max_workers"] = 1 config_path = self.write_config(config) self.provider = MockProvider(cache_stopped=True) @@ -2735,11 +3042,10 @@ class AutoscalingTest(unittest.TestCase): # NOTE(ilr) Setup & Init commands **should** run with stopped nodes # when Docker is in use. file_mount_dir = tempfile.mkdtemp() - config = copy.deepcopy(SMALL_CLUSTER) + config = SMALL_CLUSTER.copy() config["file_mounts"] = {"/root/test-folder": file_mount_dir} config["file_mounts_sync_continuously"] = True - config["available_node_types"][NODE_TYPE_LEGACY_WORKER]["min_workers"] = 1 - config["available_node_types"][NODE_TYPE_LEGACY_WORKER]["max_workers"] = 1 + config["min_workers"] = 1 config["max_workers"] = 1 config_path = self.write_config(config) self.provider = MockProvider(cache_stopped=True) @@ -2812,11 +3118,10 @@ class AutoscalingTest(unittest.TestCase): runner.assert_not_has_call("172.0.0.1", " ") def testMultiNodeReuse(self): - config = copy.deepcopy(SMALL_CLUSTER) + config = SMALL_CLUSTER.copy() # Docker re-runs setup commands when nodes are reused. del config["docker"] - config["available_node_types"][NODE_TYPE_LEGACY_WORKER]["min_workers"] = 3 - config["available_node_types"][NODE_TYPE_LEGACY_WORKER]["max_workers"] = 3 + config["min_workers"] = 3 config["max_workers"] = 3 config_path = self.write_config(config) self.provider = MockProvider(cache_stopped=True) @@ -2842,8 +3147,7 @@ class AutoscalingTest(unittest.TestCase): runner.clear_history() # Scale up to 10 nodes, check we reuse the first 3 and add 7 more. - config["available_node_types"][NODE_TYPE_LEGACY_WORKER]["min_workers"] = 10 - config["available_node_types"][NODE_TYPE_LEGACY_WORKER]["max_workers"] = 10 + config["min_workers"] = 10 config["max_workers"] = 10 self.write_config(config) autoscaler.update() @@ -2864,7 +3168,7 @@ class AutoscalingTest(unittest.TestCase): file_mount_dir = tempfile.mkdtemp() self.provider = MockProvider() - config = copy.deepcopy(SMALL_CLUSTER) + config = SMALL_CLUSTER.copy() config["file_mounts"] = {"/home/test-folder": file_mount_dir} config["file_mounts_sync_continuously"] = True config["min_workers"] = 2 @@ -2924,7 +3228,7 @@ class AutoscalingTest(unittest.TestCase): file_mount_dir = tempfile.mkdtemp() self.provider = MockProvider() - config = copy.deepcopy(SMALL_CLUSTER) + config = SMALL_CLUSTER.copy() config["file_mounts"] = {"/home/test-folder": file_mount_dir} config["min_workers"] = 2 config["max_workers"] = 2 @@ -3007,7 +3311,7 @@ class AutoscalingTest(unittest.TestCase): def testAutodetectResources(self): self.provider = MockProvider() - config = copy.deepcopy(SMALL_CLUSTER) + config = SMALL_CLUSTER.copy() config_path = self.write_config(config) runner = MockProcessRunner() proc_meminfo = """ @@ -3040,8 +3344,7 @@ MemAvailable: 33000000 kB def testDockerImageExistsBeforeInspect(self): config = copy.deepcopy(SMALL_CLUSTER) - config["available_node_types"][NODE_TYPE_LEGACY_WORKER]["min_workers"] = 1 - config["available_node_types"][NODE_TYPE_LEGACY_WORKER]["max_workers"] = 1 + config["min_workers"] = 1 config["max_workers"] = 1 config["docker"]["pull_before_run"] = False config_path = self.write_config(config) diff --git a/python/ray/tests/test_autoscaler_yaml.py b/python/ray/tests/test_autoscaler_yaml.py index ffcb07587..fd648562d 100644 --- a/python/ray/tests/test_autoscaler_yaml.py +++ b/python/ray/tests/test_autoscaler_yaml.py @@ -301,7 +301,7 @@ class AutoscalingConfigTest(unittest.TestCase): ] = 0 assert prepared_config == expected_prepared - def testValidateNetworkConfigForBackwardsCompatibility(self): + def testValidateNetworkConfig(self): web_yaml = ( "https://raw.githubusercontent.com/ray-project/ray/" "master/python/ray/autoscaler/aws/example-full.yaml" diff --git a/python/ray/tests/test_resource_demand_scheduler.py b/python/ray/tests/test_resource_demand_scheduler.py index c5d708597..68992d31f 100644 --- a/python/ray/tests/test_resource_demand_scheduler.py +++ b/python/ray/tests/test_resource_demand_scheduler.py @@ -32,10 +32,13 @@ from ray.autoscaler._private.util import ( LoadMetricsSummary, format_info_string, is_placement_group_resource, + prepare_config, ) from ray.autoscaler.tags import ( NODE_KIND_HEAD, NODE_KIND_WORKER, + NODE_TYPE_LEGACY_HEAD, + NODE_TYPE_LEGACY_WORKER, STATUS_UNINITIALIZED, STATUS_UP_TO_DATE, STATUS_WAITING_FOR_SSH, @@ -45,7 +48,9 @@ from ray.autoscaler.tags import ( ) from ray.core.generated.common_pb2 import Bundle, PlacementStrategy from ray.tests.test_autoscaler import ( + MOCK_DEFAULT_CONFIG, MULTI_WORKER_CLUSTER, + SMALL_CLUSTER, TYPES_A, MockAutoscaler, MockNodeInfoStub, @@ -1168,6 +1173,192 @@ def test_get_nodes_to_launch_max_launch_concurrency(): assert not rem +def test_rewrite_legacy_yaml_to_available_node_types(): + with mock.patch(GET_DEFAULT_METHOD, return_value=MOCK_DEFAULT_CONFIG): + cluster_config = copy.deepcopy(SMALL_CLUSTER) # Legacy cluster_config. + cluster_config = prepare_config(cluster_config) + assert ( + cluster_config["available_node_types"][NODE_TYPE_LEGACY_HEAD]["max_workers"] + == 0 + ) + assert ( + cluster_config["available_node_types"][NODE_TYPE_LEGACY_HEAD]["min_workers"] + == 0 + ) + assert ( + cluster_config["available_node_types"][NODE_TYPE_LEGACY_HEAD]["node_config"] + == SMALL_CLUSTER["head_node"] + ) + + assert ( + cluster_config["available_node_types"][NODE_TYPE_LEGACY_WORKER][ + "node_config" + ] + == SMALL_CLUSTER["worker_nodes"] + ) + assert ( + cluster_config["available_node_types"][NODE_TYPE_LEGACY_WORKER][ + "max_workers" + ] + == SMALL_CLUSTER["max_workers"] + ) + assert ( + cluster_config["available_node_types"][NODE_TYPE_LEGACY_WORKER][ + "min_workers" + ] + == SMALL_CLUSTER["min_workers"] + ) + + +def test_handle_legacy_cluster_config_yaml(): + with mock.patch(GET_DEFAULT_METHOD, return_value=MOCK_DEFAULT_CONFIG): + provider = MockProvider() + head_resources = {"CPU": 8, "GPU": 1} + worker_resources = {"CPU": 32, "GPU": 8} + cluster_config = copy.deepcopy(SMALL_CLUSTER) # Legacy cluster_config. + cluster_config = prepare_config(cluster_config) + scheduler = ResourceDemandScheduler( + provider, + cluster_config["available_node_types"], + 0, + head_node_type=NODE_TYPE_LEGACY_HEAD, + ) + provider.create_node( + {}, + { + TAG_RAY_NODE_KIND: NODE_KIND_HEAD, + TAG_RAY_USER_NODE_TYPE: NODE_TYPE_LEGACY_HEAD, + }, + 1, + ) + head_ip = provider.non_terminated_node_ips({})[0] + head_node_id = provider.non_terminated_nodes({})[0] + to_launch, rem = scheduler.get_nodes_to_launch( + [], {}, [], {}, [], {head_ip: head_resources} + ) + assert to_launch == {} # Should always be empty with max_workers = 0. + assert not rem + + scheduler.max_workers = 30 + min_workers = scheduler.node_types[NODE_TYPE_LEGACY_WORKER]["min_workers"] + scheduler.node_types[NODE_TYPE_LEGACY_WORKER]["min_workers"] = 0 + to_launch, rem = scheduler.get_nodes_to_launch( + [head_node_id], {}, [], {}, [], {head_ip: head_resources} + ) + assert ( + to_launch == {} + ) # Since the resource demand does not require adding nodes. + assert not rem + to_launch, rem = scheduler.get_nodes_to_launch( + [head_node_id], {}, [head_resources], {}, [], {head_ip: head_resources} + ) + assert ( + to_launch == {} + ) # Since the resource demand does not require adding nodes. + assert not rem + + scheduler.node_types[NODE_TYPE_LEGACY_WORKER]["min_workers"] = min_workers + # Returns min_workers when min_workers>0. + to_launch, rem = scheduler.get_nodes_to_launch( + [head_node_id], {}, [head_resources], {}, [], {head_ip: head_resources} + ) + assert to_launch == {NODE_TYPE_LEGACY_WORKER: min_workers} + assert not rem + + provider.create_node( + {}, + { + TAG_RAY_NODE_KIND: NODE_KIND_WORKER, + TAG_RAY_NODE_STATUS: STATUS_UNINITIALIZED, + TAG_RAY_USER_NODE_TYPE: NODE_TYPE_LEGACY_WORKER, + }, + min_workers, + ) + nodes = provider.non_terminated_nodes({}) + to_launch, rem = scheduler.get_nodes_to_launch( + nodes, {}, [head_resources], {}, [], {head_ip: head_resources} + ) + # A node is running, at some point it'll connect. + assert to_launch == {} + assert not rem + pending_launches = {NODE_TYPE_LEGACY_WORKER: 4} + to_launch, rem = scheduler.get_nodes_to_launch( + [], pending_launches, [head_resources], {}, [], {head_ip: head_resources} + ) + # A node is launching, at some point it'll connect. + assert to_launch == {} + assert not rem + + # Now assume that we already launched/connected the nodes. + ips = provider.non_terminated_node_ips({}) + lm = LoadMetrics() + worker_ips = [] + for ip in ips: + if ip == head_ip: + lm.update(ip, mock_raylet_id(), head_resources, head_resources, {}) + else: + lm.update(ip, mock_raylet_id(), worker_resources, worker_resources, {}) + worker_ips.append(ip) + + assert not scheduler.node_types[NODE_TYPE_LEGACY_WORKER]["resources"] + to_launch, rem = scheduler.get_nodes_to_launch( + nodes, {}, [], {}, [], lm.get_static_node_resources_by_ip() + ) + assert ( + scheduler.node_types[NODE_TYPE_LEGACY_WORKER]["resources"] + == worker_resources + ) + assert to_launch == {} + assert not rem + utilizations = {ip: worker_resources for ip in worker_ips} + utilizations[head_ip] = head_resources + # Needs 4 nodes since worker resources is bigger than head reasources. + demands = [worker_resources] * (len(utilizations) + 3) + to_launch, rem = scheduler.get_nodes_to_launch( + nodes, {}, demands, utilizations, [], lm.get_static_node_resources_by_ip() + ) + # 4 nodes are necessary to meet resource demand, but we never exceed + # max_workers. + assert to_launch == {} + assert rem == [{"CPU": 32, "GPU": 8}] * 4 + scheduler.max_workers = 10 + to_launch, rem = scheduler.get_nodes_to_launch( + nodes, {}, demands, utilizations, [], lm.get_static_node_resources_by_ip() + ) + # 4 nodes are necessary to meet resource demand, but we never exceed + # max_workers. + assert to_launch == {} + assert rem == [{"CPU": 32, "GPU": 8}] * 4 + scheduler.node_types[NODE_TYPE_LEGACY_WORKER]["max_workers"] = 10 + to_launch, rem = scheduler.get_nodes_to_launch( + nodes, {}, demands, utilizations, [], lm.get_static_node_resources_by_ip() + ) + # 4 nodes are necessary to meet resource demand. + assert to_launch == {NODE_TYPE_LEGACY_WORKER: 4} + assert not rem + to_launch, rem = scheduler.get_nodes_to_launch( + nodes, + pending_launches, + demands, + utilizations, + [], + lm.get_static_node_resources_by_ip(), + ) + # 0 because there are 4 pending launches and we only need 4. + assert to_launch == {} + to_launch, rem = scheduler.get_nodes_to_launch( + nodes, + pending_launches, + demands * 2, + utilizations, + [], + lm.get_static_node_resources_by_ip(), + ) + # 1 because there are 4 pending launches and we only allow a max of 5. + assert to_launch == {NODE_TYPE_LEGACY_WORKER: 1} + assert rem == [{"CPU": 32, "GPU": 8}] * 2 + + class LoadMetricsTest(unittest.TestCase): def testResourceDemandVector(self): lm = LoadMetrics() @@ -1547,7 +1738,9 @@ class AutoscalingTest(unittest.TestCase): def testScaleUpMinSanity(self): config = copy.deepcopy(MULTI_WORKER_CLUSTER) - config["available_node_types"]["m4.large"]["min_workers"] = 2 + config["available_node_types"]["m4.large"]["min_workers"] = config[ + "min_workers" + ] config_path = self.write_config(config) self.provider = MockProvider() runner = MockProcessRunner() @@ -2176,7 +2369,9 @@ class AutoscalingTest(unittest.TestCase): def testUpdateConfig(self): config = copy.deepcopy(MULTI_WORKER_CLUSTER) - config["available_node_types"]["m4.large"]["min_workers"] = 2 + config["available_node_types"]["m4.large"]["min_workers"] = config[ + "min_workers" + ] config_path = self.write_config(config) self.provider = MockProvider() runner = MockProcessRunner() diff --git a/python/ray/tune/examples/tune-default.yaml b/python/ray/tune/examples/tune-default.yaml index 31df4bf29..1f51cf4ce 100644 --- a/python/ray/tune/examples/tune-default.yaml +++ b/python/ray/tune/examples/tune-default.yaml @@ -4,11 +4,7 @@ auth: {ssh_user: ubuntu} min_workers: 3 max_workers: 3 # Deep Learning AMI (Ubuntu) Version 21.0 -available_node_types: - head_node: - node_config: {InstanceType: c5.xlarge, ImageId: ami-0b294f219d14e6a82} - worker_nodes: - node_config: {InstanceType: c5.xlarge, ImageId: ami-0b294f219d14e6a82} -head_node_type: head_node +head_node: {InstanceType: c5.xlarge, ImageId: ami-0b294f219d14e6a82} +worker_nodes: {InstanceType: c5.xlarge, ImageId: ami-0b294f219d14e6a82} setup_commands: # Set up each node. - pip install ray torch torchvision tabulate tensorboard