diff --git a/python/ray/autoscaler/autoscaler.py b/python/ray/autoscaler/autoscaler.py
index d12c0adab..8eba80aaf 100644
--- a/python/ray/autoscaler/autoscaler.py
+++ b/python/ray/autoscaler/autoscaler.py
@@ -51,6 +51,9 @@ CLUSTER_CONFIG_SCHEMA = {
     # The number of workers to launch initially, in addition to the head node.
     "initial_workers": (int, OPTIONAL),
 
+    # The autoscaler mode, e.g. default or aggressive.
+    "autoscaling_mode": (str, OPTIONAL),
+
     # The autoscaler will scale up the cluster to this target fraction of
     # resources usage. For example, if a cluster of 8 nodes is 100% busy
     # and target_utilization was 0.8, it would resize the cluster to 10.
@@ -519,9 +522,13 @@ class StandardAutoscaler(object):
         ideal_num_nodes = int(np.ceil(cur_used / float(target_frac)))
         ideal_num_workers = ideal_num_nodes - 1  # subtract 1 for head node
 
+        initial_workers = self.config["initial_workers"]
+        aggressive = self.config["autoscaling_mode"] == "aggressive"
         if self.bringup:
-            ideal_num_workers = max(ideal_num_workers,
-                                    self.config["initial_workers"])
+            ideal_num_workers = max(ideal_num_workers, initial_workers)
+        elif aggressive and cur_used > 0:
+            # If we want any workers, we want at least initial_workers.
+            ideal_num_workers = max(ideal_num_workers, initial_workers)
 
         return min(self.config["max_workers"],
                    max(self.config["min_workers"], ideal_num_workers))
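For readers skimming the hunk above, the sizing rule can be isolated as a pure function. This is a minimal sketch that mirrors the patched `StandardAutoscaler` logic; the function name and flat parameter list are illustrative, not Ray's actual API:

```python
import math


def target_num_workers(cur_used, target_frac, min_workers, max_workers,
                       initial_workers, aggressive=False, bringup=False):
    """Sketch of the sizing rule in the hunk above (names illustrative).

    cur_used is the cluster load measured in node-equivalents, e.g. 8.0
    means eight fully busy nodes.
    """
    ideal_num_nodes = int(math.ceil(cur_used / float(target_frac)))
    ideal_num_workers = ideal_num_nodes - 1  # subtract 1 for the head node

    if bringup:
        # During bringup, always ask for at least initial_workers.
        ideal_num_workers = max(ideal_num_workers, initial_workers)
    elif aggressive and cur_used > 0:
        # Aggressive mode: any nonzero demand jumps straight to at least
        # initial_workers instead of growing incrementally.
        ideal_num_workers = max(ideal_num_workers, initial_workers)

    return min(max_workers, max(min_workers, ideal_num_workers))


# 8 nodes fully busy at target utilization 0.8 -> 10 nodes, i.e. 9 workers.
assert target_num_workers(8.0, 0.8, 0, 20, 10) == 9
# In aggressive mode, even a sliver of load requests initial_workers.
assert target_num_workers(0.5, 0.8, 0, 20, 10, aggressive=True) == 10
```

diff --git a/python/ray/autoscaler/aws/example-full.yaml b/python/ray/autoscaler/aws/example-full.yaml
index 82bf92a99..43e2172a3 100644
--- a/python/ray/autoscaler/aws/example-full.yaml
+++ b/python/ray/autoscaler/aws/example-full.yaml
@@ -14,6 +14,11 @@ max_workers: 2
 # subsequent `ray up`) this number of nodes will be started.
 initial_workers: 0
 
+# Whether or not to autoscale aggressively. If this is enabled, whenever
+# the autoscaler would start any workers, it starts at least enough to
+# bring the cluster up to initial_workers.
+autoscaling_mode: default
+
 # This executes all commands on all nodes in the docker container,
 # and opens all the necessary ports to support the Ray cluster.
 # Empty string means disabled.
diff --git a/python/ray/autoscaler/aws/example-gpu-docker.yaml b/python/ray/autoscaler/aws/example-gpu-docker.yaml
index 962685390..37c0323fc 100644
--- a/python/ray/autoscaler/aws/example-gpu-docker.yaml
+++ b/python/ray/autoscaler/aws/example-gpu-docker.yaml
@@ -14,6 +14,11 @@ max_workers: 2
 # subsequent `ray up`) this number of nodes will be started.
 initial_workers: 0
 
+# Whether or not to autoscale aggressively. If this is enabled, whenever
+# the autoscaler would start any workers, it starts at least enough to
+# bring the cluster up to initial_workers.
+autoscaling_mode: default
+
 # This executes all commands on all nodes in the docker container,
 # and opens all the necessary ports to support the Ray cluster.
 # Empty string means disabled.
diff --git a/python/ray/autoscaler/gcp/example-full.yaml b/python/ray/autoscaler/gcp/example-full.yaml
index a10c7b3c1..405185687 100644
--- a/python/ray/autoscaler/gcp/example-full.yaml
+++ b/python/ray/autoscaler/gcp/example-full.yaml
@@ -14,6 +14,11 @@ max_workers: 2
 # subsequent `ray up`) this number of nodes will be started.
 initial_workers: 0
 
+# Whether or not to autoscale aggressively. If this is enabled, whenever
+# the autoscaler would start any workers, it starts at least enough to
+# bring the cluster up to initial_workers.
+autoscaling_mode: default
+
 # This executes all commands on all nodes in the docker container,
 # and opens all the necessary ports to support the Ray cluster.
 # Empty string means disabled.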
diff --git a/python/ray/autoscaler/gcp/example-gpu-docker.yaml b/python/ray/autoscaler/gcp/example-gpu-docker.yaml
index fa1face51..fda967c56 100644
--- a/python/ray/autoscaler/gcp/example-gpu-docker.yaml
+++ b/python/ray/autoscaler/gcp/example-gpu-docker.yaml
@@ -14,6 +14,11 @@ max_workers: 2
 # subsequent `ray up`) this number of nodes will be started.
 initial_workers: 0
 
+# Whether or not to autoscale aggressively. If this is enabled, whenever
+# the autoscaler would start any workers, it starts at least enough to
+# bring the cluster up to initial_workers.
+autoscaling_mode: default
+
 # This executes all commands on all nodes in the docker container,
 # and opens all the necessary ports to support the Ray cluster.
 # Empty string means disabled.
diff --git a/python/ray/autoscaler/local/example-full.yaml b/python/ray/autoscaler/local/example-full.yaml
index 51f64cbfb..1d152807c 100644
--- a/python/ray/autoscaler/local/example-full.yaml
+++ b/python/ray/autoscaler/local/example-full.yaml
@@ -2,6 +2,7 @@ cluster_name: default
 min_workers: 0
 max_workers: 0
 initial_workers: 0
+autoscaling_mode: default
 docker:
     image: ""
     container_name: ""
diff --git a/python/ray/autoscaler/updater.py b/python/ray/autoscaler/updater.py
index 6cbcc761f..d679f7548 100644
--- a/python/ray/autoscaler/updater.py
+++ b/python/ray/autoscaler/updater.py
@@ -159,10 +159,11 @@ class NodeUpdater(object):
         try:
             logger.debug("NodeUpdater: "
                          "{}: Waiting for SSH...".format(self.node_id))
-            self.ssh_cmd(
-                "uptime",
-                connect_timeout=5,
-                redirect=open("/dev/null", "w"))
+
+            with open("/dev/null", "w") as redirect:
+                self.ssh_cmd(
+                    "uptime", connect_timeout=5, redirect=redirect)
+
             return True
         except Exception as e:
@@ -206,12 +207,12 @@ class NodeUpdater(object):
             m = "{}: Synced {} to {}".format(self.node_id, local_path,
                                              remote_path)
             with LogTimer("NodeUpdater {}".format(m)):
-                self.ssh_cmd(
-                    "mkdir -p {}".format(os.path.dirname(remote_path)),
-                    redirect=open("/dev/null", "w"),
-                )
-                self.rsync_up(
-                    local_path, remote_path, redirect=open("/dev/null", "w"))
+                with open("/dev/null", "w") as redirect:
+                    self.ssh_cmd(
+                        "mkdir -p {}".format(os.path.dirname(remote_path)),
+                        redirect=redirect,
+                    )
+                    self.rsync_up(local_path, remote_path, redirect=redirect)
 
         # Run init commands
         self.provider.set_node_tags(self.node_id,
@@ -219,13 +220,15 @@ class NodeUpdater(object):
         m = "{}: Initialization commands completed".format(self.node_id)
         with LogTimer("NodeUpdater: {}".format(m)):
-            for cmd in self.initialization_commands:
-                self.ssh_cmd(cmd)
+            with open("/dev/null", "w") as redirect:
+                for cmd in self.initialization_commands:
+                    self.ssh_cmd(cmd, redirect=redirect)
 
         m = "{}: Setup commands completed".format(self.node_id)
         with LogTimer("NodeUpdater: {}".format(m)):
-            for cmd in self.setup_commands:
-                self.ssh_cmd(cmd)
+            with open("/dev/null", "w") as redirect:
+                for cmd in self.setup_commands:
+                    self.ssh_cmd(cmd, redirect=redirect)
 
     def rsync_up(self, source, target, redirect=None, check_error=True):
         self.set_ssh_ip_if_required()
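The updater.py hunks above scope each `open("/dev/null", "w")` in a `with` block, so the file descriptor is released even when a command fails, rather than leaking one per call. A minimal sketch of the idiom using plain `subprocess` (not Ray's `NodeUpdater` API):

```python
import subprocess

# The pattern used in the patch: the redirect file is closed on success
# or on exception because the with block owns its lifetime.
with open("/dev/null", "w") as redirect:
    subprocess.check_call(["uptime"], stdout=redirect, stderr=redirect)

# On Python 3.3+, subprocess.DEVNULL gives the same effect without
# opening /dev/null by hand.
subprocess.check_call(
    ["uptime"], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
```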
diff --git a/python/ray/dashboard/dashboard.py b/python/ray/dashboard/dashboard.py
index 403a6c414..44f963e53 100644
--- a/python/ray/dashboard/dashboard.py
+++ b/python/ray/dashboard/dashboard.py
@@ -141,6 +141,7 @@ class Dashboard(object):
                 "min_workers": cfg["min_workers"],
                 "max_workers": cfg["max_workers"],
                 "initial_workers": cfg["initial_workers"],
+                "autoscaling_mode": cfg["autoscaling_mode"],
                 "idle_timeout_minutes": cfg["idle_timeout_minutes"],
             }
diff --git a/python/ray/scripts/scripts.py b/python/ray/scripts/scripts.py
index f4503c6c2..d870c655a 100644
--- a/python/ray/scripts/scripts.py
+++ b/python/ray/scripts/scripts.py
@@ -256,6 +256,7 @@ def start(node_ip_address, redis_address, redis_port, num_redis_shards,
         raylet_socket_name=raylet_socket_name,
         temp_dir=temp_dir,
         include_java=include_java,
+        include_webui=include_webui,
         java_worker_options=java_worker_options,
         load_code_from_local=load_code_from_local,
         _internal_config=internal_config)
@@ -292,7 +293,6 @@ def start(node_ip_address, redis_address, redis_port, num_redis_shards,
             redis_max_memory=redis_max_memory,
             num_redis_shards=num_redis_shards,
             redis_max_clients=redis_max_clients,
-            include_webui=include_webui,
             autoscaling_config=autoscaling_config,
             include_java=False,
         )
diff --git a/python/ray/services.py b/python/ray/services.py
index 4a6fd30f3..b9d5867c4 100644
--- a/python/ray/services.py
+++ b/python/ray/services.py
@@ -220,6 +220,8 @@ def get_node_ip_address(address="8.8.8.8:53"):
             node_ip_address = socket.gethostbyname(host_name)
         except Exception:
             pass
+        finally:
+            s.close()
 
     return node_ip_address
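The services.py fix above guarantees the probe socket is closed on every exit path. For context, the technique `get_node_ip_address` relies on looks roughly like this simplified sketch (the real function also carries fallback logic for hosts without a route):

```python
import socket


def local_ip(probe_address=("8.8.8.8", 53)):
    # "Connecting" a UDP socket sends no packets; it merely selects the
    # outbound interface, whose address getsockname() then reports.
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        s.connect(probe_address)
        return s.getsockname()[0]
    finally:
        # Mirrors the patch: the socket is closed on every code path.
        s.close()
```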
diff --git a/python/ray/tests/test_autoscaler.py b/python/ray/tests/test_autoscaler.py
index 6f0ac9f85..c0ecafc5c 100644
--- a/python/ray/tests/test_autoscaler.py
+++ b/python/ray/tests/test_autoscaler.py
@@ -64,6 +64,14 @@ class MockProvider(NodeProvider):
             if n.matches(tag_filters) and n.state != "terminated"
         ]
 
+    def non_terminated_node_ips(self, tag_filters):
+        if self.throw:
+            raise Exception("oops")
+        return [
+            n.internal_ip for n in self.mock_nodes.values()
+            if n.matches(tag_filters) and n.state != "terminated"
+        ]
+
     def is_running(self, node_id):
         return self.mock_nodes[node_id].state == "running"
@@ -99,6 +107,7 @@ SMALL_CLUSTER = {
     "min_workers": 2,
     "max_workers": 2,
     "initial_workers": 0,
+    "autoscaling_mode": "default",
     "target_utilization_fraction": 0.8,
     "idle_timeout_minutes": 5,
     "provider": {
@@ -337,6 +346,59 @@ class AutoscalingTest(unittest.TestCase):
         self.waitForNodes(10)
         autoscaler.update()
 
+    def testAggressiveAutoscaling(self):
+        config = SMALL_CLUSTER.copy()
+        config["min_workers"] = 0
+        config["max_workers"] = 20
+        config["initial_workers"] = 10
+        config["idle_timeout_minutes"] = 0
+        config["autoscaling_mode"] = "aggressive"
+        config_path = self.write_config(config)
+
+        self.provider = MockProvider()
+        self.provider.create_node({}, {TAG_RAY_NODE_TYPE: "head"}, 1)
+        head_ip = self.provider.non_terminated_node_ips(
+            tag_filters={TAG_RAY_NODE_TYPE: "head"}, )[0]
+
+        lm = LoadMetrics()
+        lm.local_ip = head_ip
+
+        autoscaler = StandardAutoscaler(
+            config_path,
+            lm,
+            max_launch_batch=5,
+            max_concurrent_launches=5,
+            max_failures=0,
+            update_interval_s=0)
+
+        self.waitForNodes(1)
+        autoscaler.update()
+        self.waitForNodes(6)  # expected due to batch sizes and concurrency
+        autoscaler.update()
+        self.waitForNodes(11)
+
+        # Connect the head and workers to end the bringup phase.
+        addrs = self.provider.non_terminated_node_ips(
+            tag_filters={TAG_RAY_NODE_TYPE: "worker"}, )
+        addrs += [head_ip]
+        for addr in addrs:
+            lm.update(addr, {"CPU": 2}, {"CPU": 0})
+            lm.update(addr, {"CPU": 2}, {"CPU": 2})
+        assert autoscaler.bringup
+        autoscaler.update()
+
+        assert not autoscaler.bringup
+        autoscaler.update()
+        self.waitForNodes(1)
+
+        # All of the nodes are down. Simulate some load on the head node.
+        lm.update(head_ip, {"CPU": 2}, {"CPU": 0})
+
+        autoscaler.update()
+        self.waitForNodes(6)  # expected due to batch sizes and concurrency
+        autoscaler.update()
+        self.waitForNodes(11)
+
     def testDelayedLaunch(self):
         config_path = self.write_config(SMALL_CLUSTER)
         self.provider = MockProvider()
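A small Python gotcha surfaces in the test above: `+=` between a list and a bare string splices in the string's individual characters, which is why `head_ip` is wrapped in a one-element list before being appended. For illustration:

```python
addrs = ["10.0.0.1"]
addrs += "10.0.0.2"    # wrong: splices chars -> ['10.0.0.1', '1', '0', ...]

addrs = ["10.0.0.1"]
addrs += ["10.0.0.2"]  # right: appends one element -> ['10.0.0.1', '10.0.0.2']
```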