diff --git a/python/ray/autoscaler/_private/autoscaler.py b/python/ray/autoscaler/_private/autoscaler.py
index ba1fb4fe4..cf4f07478 100644
--- a/python/ray/autoscaler/_private/autoscaler.py
+++ b/python/ray/autoscaler/_private/autoscaler.py
@@ -13,16 +13,17 @@ import collections
 from ray.experimental.internal_kv import _internal_kv_put, \
     _internal_kv_initialized
-from ray.autoscaler.tags import (TAG_RAY_LAUNCH_CONFIG, TAG_RAY_RUNTIME_CONFIG,
-                                 TAG_RAY_FILE_MOUNTS_CONTENTS,
-                                 TAG_RAY_NODE_STATUS, TAG_RAY_NODE_KIND,
-                                 TAG_RAY_USER_NODE_TYPE, STATUS_UP_TO_DATE,
-                                 NODE_KIND_WORKER, NODE_KIND_UNMANAGED)
+from ray.autoscaler.tags import (
+    TAG_RAY_LAUNCH_CONFIG, TAG_RAY_RUNTIME_CONFIG,
+    TAG_RAY_FILE_MOUNTS_CONTENTS, TAG_RAY_NODE_STATUS, TAG_RAY_NODE_KIND,
+    TAG_RAY_USER_NODE_TYPE, STATUS_UP_TO_DATE, NODE_KIND_WORKER,
+    NODE_KIND_UNMANAGED, NODE_KIND_HEAD)
 from ray.autoscaler._private.providers import _get_node_provider
 from ray.autoscaler._private.updater import NodeUpdaterThread
 from ray.autoscaler._private.node_launcher import NodeLauncher
 from ray.autoscaler._private.resource_demand_scheduler import \
-    ResourceDemandScheduler, NodeType, NodeID
+    get_bin_pack_residual, ResourceDemandScheduler, NodeType, NodeID, NodeIP, \
+    ResourceDict
 from ray.autoscaler._private.util import ConcurrentCounter, validate_config, \
     with_head_node_ip, hash_launch_conf, hash_runtime_conf, \
     DEBUG_AUTOSCALING_STATUS, DEBUG_AUTOSCALING_ERROR
@@ -164,27 +165,35 @@ class StandardAutoscaler:
         last_used = self.load_metrics.last_used_time_by_ip
         horizon = now - (60 * self.config["idle_timeout_minutes"])
 
-        nodes_to_terminate = []
+        nodes_to_terminate: List[NodeID] = []
         node_type_counts = collections.defaultdict(int)
         # Sort based on last used to make sure to keep min_workers that
         # were most recently used. Otherwise, _keep_min_workers_of_node_type
         # might keep a node that should be terminated.
-        for node_id in self._sort_based_on_last_used(nodes, last_used):
+        sorted_node_ids = self._sort_based_on_last_used(nodes, last_used)
+        # Don't terminate nodes needed by request_resources()
+        nodes_allowed_to_terminate: Dict[NodeID, bool] = {}
+        if self.resource_demand_vector:
+            nodes_allowed_to_terminate = self._get_nodes_allowed_to_terminate(
+                sorted_node_ids)
+
+        for node_id in sorted_node_ids:
             # Make sure to not kill idle node types if the number of workers
-            # of that type is lower/equal to the min_workers of that type.
-            if self._keep_min_worker_of_node_type(
-                    node_id,
-                    node_type_counts) and self.launch_config_ok(node_id):
+            # of that type is lower/equal to the min_workers of that type
+            # or it is needed for request_resources().
+ if (self._keep_min_worker_of_node_type(node_id, node_type_counts) + or not nodes_allowed_to_terminate.get( + node_id, True)) and self.launch_config_ok(node_id): continue node_ip = self.provider.internal_ip(node_id) if node_ip in last_used and last_used[node_ip] < horizon: logger.info("StandardAutoscaler: " - "{}: Terminating idle node".format(node_id)) + "{}: Terminating idle node.".format(node_id)) nodes_to_terminate.append(node_id) elif not self.launch_config_ok(node_id): logger.info("StandardAutoscaler: " - "{}: Terminating outdated node".format(node_id)) + "{}: Terminating outdated node.".format(node_id)) nodes_to_terminate.append(node_id) if nodes_to_terminate: @@ -198,7 +207,7 @@ class StandardAutoscaler: len(nodes_to_terminate)) > self.config["max_workers"] and nodes: to_terminate = nodes.pop() logger.info("StandardAutoscaler: " - "{}: Terminating unneeded node".format(to_terminate)) + "{}: Terminating unneeded node.".format(to_terminate)) nodes_to_terminate.append(to_terminate) if nodes_to_terminate: @@ -226,15 +235,23 @@ class StandardAutoscaler: if not updater.is_alive(): completed.append(node_id) if completed: + nodes_to_terminate: List[NodeID] = [] for node_id in completed: if self.updaters[node_id].exitcode == 0: self.num_successful_updates[node_id] += 1 + # Mark the node as active to prevent the node recovery + # logic immediately trying to restart Ray on the new node. + self.load_metrics.mark_active( + self.provider.internal_ip(node_id)) else: + logger.error(f"StandardAutoscaler: {node_id}: Terminating " + "failed to setup/initialize node.") + nodes_to_terminate.append(node_id) self.num_failed_updates[node_id] += 1 del self.updaters[node_id] - # Mark the node as active to prevent the node recovery logic - # immediately trying to restart Ray on the new node. - self.load_metrics.mark_active(self.provider.internal_ip(node_id)) + if nodes_to_terminate: + self.provider.terminate_nodes(nodes_to_terminate) + nodes = self.workers() self.log_info_string(nodes) @@ -266,14 +283,16 @@ class StandardAutoscaler: last_used: Dict[str, float]) -> List[NodeID]: """Sort the nodes based on the last time they were used. - The first item in the return list is the least recently used. + The first item in the return list is the most recently used. """ updated_last_used = copy.deepcopy(last_used) - now = time.time() + # Add the unconnected nodes as the least recently used (the end of + # list). This prioritizes connected nodes. + least_recently_used = -1 for node_id in nodes: node_ip = self.provider.internal_ip(node_id) if node_ip not in updated_last_used: - updated_last_used[node_ip] = now + updated_last_used[node_ip] = least_recently_used def last_time_used(node_id: NodeID): node_ip = self.provider.internal_ip(node_id) @@ -281,9 +300,86 @@ class StandardAutoscaler: return sorted(nodes, key=last_time_used, reverse=True) - def _keep_min_worker_of_node_type(self, node_id: NodeID, - node_type_counts: Dict[NodeType, int]): - """Returns if workers of node_type should be terminated. + def _get_nodes_allowed_to_terminate( + self, sorted_node_ids: List[NodeID]) -> Dict[NodeID, bool]: + # TODO(ameer): try merging this with resource_demand_scheduler + # code responsible for adding nodes for request_resources(). + """Returns the nodes allowed to terminate for request_resources(). + + Args: + sorted_node_ids: the node ids sorted based on last used (LRU last). + + Returns: + nodes_allowed_to_terminate: whether the node id is allowed to + terminate or not. 
+ """ + nodes_allowed_to_terminate: Dict[NodeID, bool] = {} + head_node_resources: ResourceDict = copy.deepcopy( + self.available_node_types[self.config["head_node_type"]][ + "resources"]) + if not head_node_resources: + # Legacy yaml might include {} in the resources field. + # TODO(ameer): this is somewhat duplicated in + # resource_demand_scheduler.py. + head_id: List[NodeID] = self.provider.non_terminated_nodes({ + TAG_RAY_NODE_KIND: NODE_KIND_HEAD + }) + if head_id: + head_ip = self.provider.internal_ip(head_id[0]) + static_nodes: Dict[ + NodeIP, + ResourceDict] = \ + self.load_metrics.get_static_node_resources_by_ip() + head_node_resources = static_nodes[head_ip] + else: + head_node_resources = {} + + max_node_resources: List[ResourceDict] = [head_node_resources] + resource_demand_vector_worker_node_ids = [] + # Get max resources on all the non terminated nodes. + for node_id in sorted_node_ids: + tags = self.provider.node_tags(node_id) + if TAG_RAY_USER_NODE_TYPE in tags: + node_type = tags[TAG_RAY_USER_NODE_TYPE] + node_resources: ResourceDict = copy.deepcopy( + self.available_node_types[node_type]["resources"]) + if not node_resources: + # Legacy yaml might include {} in the resources field. + static_nodes: Dict[ + NodeIP, + ResourceDict] = \ + self.load_metrics.get_static_node_resources_by_ip() + node_ip = self.provider.internal_ip(node_id) + node_resources = static_nodes.get(node_ip, {}) + max_node_resources.append(node_resources) + resource_demand_vector_worker_node_ids.append(node_id) + # Since it is sorted based on last used, we "keep" nodes that are + # most recently used when we binpack. We assume get_bin_pack_residual + # is following the given order here. + used_resource_requests: List[ResourceDict] + _, used_resource_requests = \ + get_bin_pack_residual(max_node_resources, + self.resource_demand_vector) + # Remove the first entry (the head node). + max_node_resources.pop(0) + # Remove the first entry (the head node). + used_resource_requests.pop(0) + for i, node_id in enumerate(resource_demand_vector_worker_node_ids): + if used_resource_requests[i] == max_node_resources[i] \ + and max_node_resources[i]: + # No resources of the node were needed for request_resources(). + # max_node_resources[i] is an empty dict for legacy yamls + # before the node is connected. + nodes_allowed_to_terminate[node_id] = True + else: + nodes_allowed_to_terminate[node_id] = False + return nodes_allowed_to_terminate + + def _keep_min_worker_of_node_type( + self, node_id: NodeID, + node_type_counts: Dict[NodeType, int]) -> bool: + """Returns if workers of node_type can be terminated. + The worker cannot be terminated to respect min_workers constraint. Receives the counters of running nodes so far and determines if idle node_id should be terminated or not. It also updates the counters @@ -293,7 +389,7 @@ class StandardAutoscaler: node_type_counts(Dict[NodeType, int]): The non_terminated node types counted so far. Returns: - bool: if workers of node_types should be terminated or not. + bool: if workers of node_types can be terminated or not. """ tags = self.provider.node_tags(node_id) if TAG_RAY_USER_NODE_TYPE in tags: diff --git a/python/ray/autoscaler/_private/resource_demand_scheduler.py b/python/ray/autoscaler/_private/resource_demand_scheduler.py index e75364e8c..6bbae1762 100644 --- a/python/ray/autoscaler/_private/resource_demand_scheduler.py +++ b/python/ray/autoscaler/_private/resource_demand_scheduler.py @@ -135,24 +135,6 @@ class ResourceDemandScheduler: this set of resources. 
This differs from resources_demands in that we don't take into account existing usage. """ - - # If the user is using request_resources() API, calculate the remaining - # delta resources required to meet their requested cluster size. - if ensure_min_cluster_size is not None: - used_resources = [] - for ip, max_res in max_resources_by_ip.items(): - res = copy.deepcopy(max_res) - _inplace_subtract(res, unused_resources_by_ip.get(ip, {})) - used_resources.append(res) - # Example: user requests 1000 CPUs, but the cluster is currently - # 500 CPUs in size with 250 used. Then, the delta is 750 CPUs that - # we need to fit to get the cluster to scale to 1000. - resource_requests, _ = get_bin_pack_residual( - used_resources, ensure_min_cluster_size) - resource_demands += resource_requests - else: - resource_requests = [] - if self.is_legacy_yaml(): # When using legacy yaml files we need to infer the head & worker # node resources from the static node resources from LoadMetrics. @@ -166,9 +148,12 @@ class ResourceDemandScheduler: logger.info("Cluster resources: {}".format(node_resources)) logger.info("Node counts: {}".format(node_type_counts)) # Step 2: add nodes to add to satisfy min_workers for each type - node_resources, node_type_counts, min_workers_nodes_to_add = \ + (node_resources, + node_type_counts, + adjusted_min_workers) = \ _add_min_workers_nodes( - node_resources, node_type_counts, self.node_types) + node_resources, node_type_counts, self.node_types, + self.max_workers, ensure_min_cluster_size) # Step 3: add nodes for strict spread groups logger.info(f"Placement group demands: {pending_placement_groups}") @@ -180,8 +165,16 @@ class ResourceDemandScheduler: not self.node_types[NODE_TYPE_LEGACY_WORKER]["resources"]: # Need to launch worker nodes to later infer their # resources. + # We add request_resources() demands here to make sure we launch + # a single worker sometimes even if min_workers = 0 and resource + # demands is empty. + if ensure_min_cluster_size: + request_resources_demands = ensure_min_cluster_size + else: + request_resources_demands = [] return self._legacy_worker_node_to_launch( - nodes, launching_nodes, node_resources, resource_demands) + nodes, launching_nodes, node_resources, + resource_demands + request_resources_demands) placement_group_nodes_to_add, node_resources, node_type_counts = \ self.reserve_and_allocate_spread( strict_spreads, node_resources, node_type_counts) @@ -194,20 +187,15 @@ class ResourceDemandScheduler: logger.info("Unfulfilled demands: {}".format(unfulfilled)) # Add 1 to account for the head node. max_to_add = self.max_workers + 1 - sum(node_type_counts.values()) - if resource_requests: - nodes_to_add_based_on_requests = get_nodes_for( - self.node_types, node_type_counts, max_to_add, - resource_requests) - else: - nodes_to_add_based_on_requests = {} nodes_to_add_based_on_demand = get_nodes_for( self.node_types, node_type_counts, max_to_add, unfulfilled) # Merge nodes to add based on demand and nodes to add based on # min_workers constraint. We add them because nodes to add based on # demand was calculated after the min_workers constraint was respected. 
         total_nodes_to_add = {}
+
         for node_type in self.node_types:
-            nodes_to_add = (min_workers_nodes_to_add.get(
+            nodes_to_add = (adjusted_min_workers.get(
                 node_type, 0) + placement_group_nodes_to_add.get(node_type, 0)
                 + nodes_to_add_based_on_demand.get(node_type, 0))
             if nodes_to_add > 0:
@@ -216,7 +204,7 @@
         # Limit the number of concurrent launches
         total_nodes_to_add = self._get_concurrent_resource_demand_to_launch(
             total_nodes_to_add, unused_resources_by_ip.keys(), nodes,
-            launching_nodes, nodes_to_add_based_on_requests)
+            launching_nodes, adjusted_min_workers)
 
         logger.info("Node requests: {}".format(total_nodes_to_add))
         return total_nodes_to_add
@@ -294,7 +282,7 @@
             connected_nodes: List[NodeIP],
             non_terminated_nodes: List[NodeID],
             pending_launches_nodes: Dict[NodeType, int],
-            nodes_to_add_based_on_requests: Dict[NodeType, int],
+            adjusted_min_workers: Dict[NodeType, int],
     ) -> Dict[NodeType, int]:
         """Updates the max concurrent resources to launch for each node type.
@@ -314,9 +302,10 @@
             connected_nodes: Running nodes (from LoadMetrics).
             non_terminated_nodes: Non terminated nodes (pending/running).
             pending_launches_nodes: Nodes that are in the launch queue.
-            nodes_to_add_based_on_requests: Nodes to launch to satisfy
-                request_resources(). This overrides the launch limits since the
-                user is hinting to immediately scale up to this size.
+            adjusted_min_workers: Nodes to launch to satisfy
+                min_workers and request_resources(). This overrides the launch
+                limits since the user is hinting to immediately scale up to
+                this size.
         Returns:
             Dict[NodeType, int]: Maximum number of nodes to launch for each
                 node type.
@@ -338,13 +327,9 @@
             upper_bound = max(
                 max_allowed_pending_nodes - total_pending_nodes,
-                # Allow more nodes if this is to respect min_workers.
-                self.node_types[node_type].get("min_workers", 0) -
-                total_pending_nodes - running_nodes[node_type],
-
-                # Allow more nodes from request_resources API.
-                nodes_to_add_based_on_requests.get(node_type,
-                                                   0) - total_pending_nodes)
+                # Allow more nodes if this is to respect min_workers or
+                # request_resources().
+                adjusted_min_workers.get(node_type, 0))
 
             if upper_bound > 0:
                 updated_nodes_to_launch[node_type] = min(
@@ -504,21 +489,26 @@ def _node_type_counts_to_node_resources(
 def _add_min_workers_nodes(
         node_resources: List[ResourceDict],
         node_type_counts: Dict[NodeType, int],
-        node_types: Dict[NodeType, NodeTypeConfigDict],
+        node_types: Dict[NodeType, NodeTypeConfigDict], max_workers: int,
+        ensure_min_cluster_size: List[ResourceDict]
 ) -> (List[ResourceDict], Dict[NodeType, int], Dict[NodeType, int]):
-    """Updates resource demands to respect the min_workers constraint.
+    """Updates resource demands to respect the min_workers and
+    request_resources() constraints.
 
     Args:
         node_resources: Resources of existing nodes already launched/pending.
         node_type_counts: Counts of existing nodes already launched/pending.
         node_types: Node types config.
+        max_workers: global max_workers constraint.
+        ensure_min_cluster_size: resource demands from request_resources().
 
     Returns:
         node_resources: The updated node resources after adding min_workers
-            constraint per node type.
+            and request_resources() constraints per node type.
         node_type_counts: The updated node counts after adding min_workers
-            constraint per node type.
-        total_nodes_to_add: The nodes to add to respect min_workers constraint.
+            and request_resources() constraints per node type.
+ total_nodes_to_add_dict: The nodes to add to respect min_workers and + request_resources() constraints. """ total_nodes_to_add_dict = {} for node_type, config in node_types.items(): @@ -528,10 +518,41 @@ def _add_min_workers_nodes( if existing < target: total_nodes_to_add_dict[node_type] = target - existing node_type_counts[node_type] = target - available = copy.deepcopy(node_types[node_type]["resources"]) - node_resources.extend( - [available] * total_nodes_to_add_dict[node_type]) + node_resources.extend([ + copy.deepcopy(node_types[node_type]["resources"]) + for _ in range(total_nodes_to_add_dict[node_type]) + ]) + if ensure_min_cluster_size: + max_to_add = max_workers + 1 - sum(node_type_counts.values()) + max_node_resources = [] + # Fit request_resources() on all the resources as if they are idle. + for node_type in node_type_counts: + max_node_resources.extend([ + copy.deepcopy(node_types[node_type]["resources"]) + for _ in range(node_type_counts[node_type]) + ]) + # Get the unfulfilled to ensure min cluster size. + resource_requests_unfulfilled, _ = get_bin_pack_residual( + max_node_resources, ensure_min_cluster_size) + # Get the nodes to meet the unfulfilled. + nodes_to_add_request_resources = get_nodes_for( + node_types, node_type_counts, max_to_add, + resource_requests_unfulfilled) + # Update the resources, counts and total nodes to add. + for node_type in nodes_to_add_request_resources: + nodes_to_add = nodes_to_add_request_resources.get(node_type, 0) + if nodes_to_add > 0: + node_type_counts[ + node_type] = nodes_to_add + node_type_counts.get( + node_type, 0) + node_resources.extend([ + copy.deepcopy(node_types[node_type]["resources"]) + for _ in range(nodes_to_add) + ]) + total_nodes_to_add_dict[ + node_type] = nodes_to_add + total_nodes_to_add_dict.get( + node_type, 0) return node_resources, node_type_counts, total_nodes_to_add_dict @@ -623,7 +644,8 @@ def _utilization_score(node_resources: ResourceDict, def get_bin_pack_residual(node_resources: List[ResourceDict], resource_demands: List[ResourceDict], - strict_spread: bool = False) -> List[ResourceDict]: + strict_spread: bool = False + ) -> (List[ResourceDict], List[ResourceDict]): """Return a subset of resource_demands that cannot fit in the cluster. TODO(ekl): this currently does not guarantee the resources will be packed @@ -638,7 +660,7 @@ def get_bin_pack_residual(node_resources: List[ResourceDict], placed on a different entry in `node_resources`. Returns: - List[ResourceDict] the residual list resources that do not fit. + List[ResourceDict]: the residual list resources that do not fit. List[ResourceDict]: The updated node_resources after the method. 
""" diff --git a/python/ray/tests/test_autoscaler.py b/python/ray/tests/test_autoscaler.py index f99b3791c..e685f34d6 100644 --- a/python/ray/tests/test_autoscaler.py +++ b/python/ray/tests/test_autoscaler.py @@ -537,6 +537,7 @@ class AutoscalingTest(unittest.TestCase): self.provider = MockProvider() self.provider.create_node({}, {TAG_RAY_NODE_KIND: "worker"}, 10) runner = MockProcessRunner() + runner.respond_to_call("json .Config.Env", ["[]" for i in range(10)]) autoscaler = StandardAutoscaler( config_path, LoadMetrics(), @@ -558,6 +559,7 @@ class AutoscalingTest(unittest.TestCase): config_path = self.write_config(SMALL_CLUSTER) self.provider = MockProvider() runner = MockProcessRunner() + runner.respond_to_call("json .Config.Env", ["[]" for i in range(11)]) lm = LoadMetrics() autoscaler = StandardAutoscaler( config_path, @@ -613,6 +615,70 @@ class AutoscalingTest(unittest.TestCase): autoscaler.update() self.waitForNodes(0) + def testLegacyYamlWithRequestResources(self): + """Test when using legacy yamls request_resources() adds workers. + + Makes sure that requested resources are added for legacy yamls when + necessary. So if requested resources for instance fit on the headnode + we don't add more nodes. But we add more nodes when they don't fit. + """ + config = SMALL_CLUSTER.copy() + config["min_workers"] = 0 + config["max_workers"] = 100 + config["idle_timeout_minutes"] = 0 + config["upscaling_speed"] = 1 + config_path = self.write_config(config) + + self.provider = MockProvider() + self.provider.create_node({}, { + TAG_RAY_NODE_KIND: NODE_KIND_HEAD, + TAG_RAY_USER_NODE_TYPE: NODE_TYPE_LEGACY_HEAD + }, 1) + head_ip = self.provider.non_terminated_node_ips( + tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_HEAD}, )[0] + runner = MockProcessRunner() + runner.respond_to_call("json .Config.Env", ["[]" for i in range(10)]) + + lm = LoadMetrics() + lm.local_ip = head_ip + lm.update(head_ip, {"CPU": 1}, {"CPU": 1}, {}) + autoscaler = StandardAutoscaler( + config_path, + lm, + max_launch_batch=5, + max_concurrent_launches=5, + max_failures=0, + process_runner=runner, + update_interval_s=0) + autoscaler.update() + # 1 head node. + self.waitForNodes(1) + autoscaler.request_resources([{"CPU": 1}]) + autoscaler.update() + # still 1 head node because request_resources fits in the headnode. + self.waitForNodes(1) + autoscaler.request_resources([{"CPU": 1}] + [{"CPU": 2}] * 9) + autoscaler.update() + self.waitForNodes(2) # Adds a single worker to get its resources. + autoscaler.update() + self.waitForNodes(2) # Still 1 worker because its resources + # aren't known. + lm.update("172.0.0.1", {"CPU": 2}, {"CPU": 2}, {}) + autoscaler.update() + self.waitForNodes(10) # 9 workers and 1 head node, scaled immediately. + lm.update( + "172.0.0.1", {"CPU": 2}, {"CPU": 2}, {}, + waiting_bundles=[{ + "CPU": 2 + }] * 9, + infeasible_bundles=[{ + "CPU": 1 + }] * 1) + autoscaler.update() + # Make sure that if all the resources fit on the exising nodes not + # to add any more. 
+ self.waitForNodes(10) + def testAggressiveAutoscaling(self): config = SMALL_CLUSTER.copy() config["min_workers"] = 0 @@ -896,6 +962,7 @@ class AutoscalingTest(unittest.TestCase): config_path = self.write_config(SMALL_CLUSTER) self.provider = MockProvider() runner = MockProcessRunner() + runner.respond_to_call("json .Config.Env", ["[]" for i in range(10)]) lm = LoadMetrics() autoscaler = StandardAutoscaler( config_path, @@ -1038,6 +1105,7 @@ class AutoscalingTest(unittest.TestCase): self.provider = MockProvider() lm = LoadMetrics() runner = MockProcessRunner() + runner.respond_to_call("json .Config.Env", ["[]" for i in range(5)]) autoscaler = StandardAutoscaler( config_path, lm, @@ -1087,12 +1155,22 @@ class AutoscalingTest(unittest.TestCase): autoscaler.update() assert autoscaler.pending_launches.value == 0 - assert len(self.provider.non_terminated_nodes({})) == 3 + # This actually remained 4 instead of 3, because the other 2 nodes + # are not connected and hence we rely more on connected nodes for + # min_workers. When the "pending" nodes show up as connected, + # then we can terminate the ones connected before. + assert len(self.provider.non_terminated_nodes({})) == 4 lm.last_used_time_by_ip["172.0.0.2"] = 0 lm.last_used_time_by_ip["172.0.0.3"] = 0 autoscaler.update() assert autoscaler.pending_launches.value == 0 - assert len(self.provider.non_terminated_nodes({})) == 1 + # 2 nodes and not 1 because 1 is needed for min_worker and the other 1 + # is still not connected. + self.waitForNodes(2) + # when we connect it, we will see 1 node. + lm.last_used_time_by_ip["172.0.0.4"] = 0 + autoscaler.update() + self.waitForNodes(1) def testTargetUtilizationFraction(self): config = SMALL_CLUSTER.copy() diff --git a/python/ray/tests/test_resource_demand_scheduler.py b/python/ray/tests/test_resource_demand_scheduler.py index f20462993..50d899af0 100644 --- a/python/ray/tests/test_resource_demand_scheduler.py +++ b/python/ray/tests/test_resource_demand_scheduler.py @@ -256,19 +256,19 @@ def test_add_min_workers_nodes(): } assert _add_min_workers_nodes([], {}, - types) == \ + types, None, None) == \ ([{"CPU": 2}]*50+[{"GPU": 1}]*99999, {"m2.large": 50, "gpu": 99999}, {"m2.large": 50, "gpu": 99999}) assert _add_min_workers_nodes([{"CPU": 2}]*5, {"m2.large": 5}, - types) == \ + types, None, None) == \ ([{"CPU": 2}]*50+[{"GPU": 1}]*99999, {"m2.large": 50, "gpu": 99999}, {"m2.large": 45, "gpu": 99999}) assert _add_min_workers_nodes([{"CPU": 2}]*60, {"m2.large": 60}, - types) == \ + types, None, None) == \ ([{"CPU": 2}]*60+[{"GPU": 1}]*99999, {"m2.large": 60, "gpu": 99999}, {"gpu": 99999}) @@ -279,7 +279,7 @@ def test_add_min_workers_nodes(): }] * 99999, { "m2.large": 50, "gpu": 99999 - }, types) == ([{ + }, types, None, None) == ([{ "CPU": 2 }] * 50 + [{ "GPU": 1 @@ -288,17 +288,18 @@ def test_add_min_workers_nodes(): "gpu": 99999 }, {}) - assert _add_min_workers_nodes([], {}, - {"gpubla": types["gpubla"]}) == ([], {}, {}) + assert _add_min_workers_nodes([], {}, {"gpubla": types["gpubla"]}, None, + None) == ([], {}, {}) types["gpubla"]["max_workers"] = 10 - assert _add_min_workers_nodes([], {}, {"gpubla": types["gpubla"]}) == ([{ - "GPU": 1 - }] * 10, { - "gpubla": 10 - }, { - "gpubla": 10 - }) + assert _add_min_workers_nodes([], {}, {"gpubla": types["gpubla"]}, None, + None) == ([{ + "GPU": 1 + }] * 10, { + "gpubla": 10 + }, { + "gpubla": 10 + }) def test_get_nodes_to_launch_with_min_workers(): @@ -1406,7 +1407,7 @@ class AutoscalingTest(unittest.TestCase): config_path = self.write_config(config) 
self.provider = MockProvider() runner = MockProcessRunner() - runner.respond_to_call("json .Config.Env", ["[]" for i in range(2)]) + runner.respond_to_call("json .Config.Env", ["[]" for i in range(3)]) autoscaler = StandardAutoscaler( config_path, LoadMetrics(), @@ -1432,6 +1433,7 @@ class AutoscalingTest(unittest.TestCase): sleep(0.1) runner.assert_has_call(self.provider.mock_nodes[1].internal_ip, "new_worker_setup_command") + runner.assert_not_has_call(self.provider.mock_nodes[1].internal_ip, "setup_cmd") runner.assert_not_has_call(self.provider.mock_nodes[1].internal_ip, @@ -1571,6 +1573,312 @@ class AutoscalingTest(unittest.TestCase): self.waitForNodes(2) assert self.provider.mock_nodes[1].node_type == "p2.8xlarge" + def testRequestResourcesIdleTimeout(self): + """Test request_resources() with and without idle timeout.""" + config = copy.deepcopy(MULTI_WORKER_CLUSTER) + config["max_workers"] = 4 + config["idle_timeout_minutes"] = 0 + config["available_node_types"] = { + "empty_node": { + "node_config": {}, + "resources": { + "CPU": 2 + }, + "max_workers": 1 + }, + "def_worker": { + "node_config": {}, + "resources": { + "CPU": 2, + "GPU": 1, + "WORKER": 1 + }, + "max_workers": 3 + } + } + config_path = self.write_config(config) + self.provider = MockProvider() + runner = MockProcessRunner() + lm = LoadMetrics() + runner.respond_to_call("json .Config.Env", ["[]" for i in range(2)]) + autoscaler = StandardAutoscaler( + config_path, + lm, + max_failures=0, + process_runner=runner, + update_interval_s=0) + autoscaler.update() + self.waitForNodes(0) + autoscaler.request_resources([{"CPU": 0.2, "WORKER": 1.0}]) + autoscaler.update() + self.waitForNodes(1) + non_terminated_nodes = autoscaler.provider.non_terminated_nodes({}) + assert len(non_terminated_nodes) == 1 + node_id = non_terminated_nodes[0] + node_ip = autoscaler.provider.non_terminated_node_ips({})[0] + + # A hack to check if the node was terminated when it shouldn't. + autoscaler.provider.mock_nodes[node_id].state = "unterminatable" + lm.update( + node_ip, + config["available_node_types"]["def_worker"]["resources"], + config["available_node_types"]["def_worker"]["resources"], {}, + waiting_bundles=[{ + "CPU": 0.2, + "WORKER": 1.0 + }]) + autoscaler.update() + # this fits on request_resources()! + self.waitForNodes(1) + autoscaler.request_resources([{"CPU": 0.2, "WORKER": 1.0}] * 2) + autoscaler.update() + self.waitForNodes(2) + autoscaler.request_resources([{"CPU": 0.2, "WORKER": 1.0}]) + lm.update( + node_ip, + config["available_node_types"]["def_worker"]["resources"], {}, {}, + waiting_bundles=[{ + "CPU": 0.2, + "WORKER": 1.0 + }]) + autoscaler.update() + self.waitForNodes(2) + lm.update( + node_ip, + config["available_node_types"]["def_worker"]["resources"], + config["available_node_types"]["def_worker"]["resources"], {}, + waiting_bundles=[{ + "CPU": 0.2, + "WORKER": 1.0 + }]) + autoscaler.update() + # Still 2 as the second node did not show up a heart beat. + self.waitForNodes(2) + # If node {node_id} was terminated any time then it's state will be set + # to terminated. + assert autoscaler.provider.mock_nodes[ + node_id].state == "unterminatable" + lm.update( + "172.0.0.1", + config["available_node_types"]["def_worker"]["resources"], + config["available_node_types"]["def_worker"]["resources"], {}, + waiting_bundles=[{ + "CPU": 0.2, + "WORKER": 1.0 + }]) + autoscaler.update() + # Now it is 1 because it showed up in last used (heart beat). + # The remaining one is 127.0.0.1. 
+ self.waitForNodes(1) + + def testRequestResourcesRaceConditionsLong(self): + """Test request_resources(), race conditions & demands/min_workers. + + Tests when request_resources() is called simultaneously with resource + demands and min_workers constraint in multiple orders upscaling and + downscaling. + """ + config = copy.deepcopy(MULTI_WORKER_CLUSTER) + config["max_workers"] = 4 + config["idle_timeout_minutes"] = 0 + config["available_node_types"] = { + "empty_node": { + "node_config": {}, + "resources": { + "CPU": 2 + }, + "max_workers": 1 + }, + "def_worker": { + "node_config": {}, + "resources": { + "CPU": 2, + "GPU": 1, + "WORKER": 1 + }, + "max_workers": 3, + "min_workers": 1 + } + } + config_path = self.write_config(config) + self.provider = MockProvider() + runner = MockProcessRunner() + runner.respond_to_call("json .Config.Env", ["[]" for i in range(3)]) + lm = LoadMetrics() + autoscaler = StandardAutoscaler( + config_path, + lm, + max_failures=0, + process_runner=runner, + update_interval_s=0) + autoscaler.request_resources([{"CPU": 0.2, "WORKER": 1.0}]) + autoscaler.update() + # 1 min worker for both min_worker and request_resources() + self.waitForNodes(1) + non_terminated_nodes = autoscaler.provider.non_terminated_nodes({}) + assert len(non_terminated_nodes) == 1 + node_id = non_terminated_nodes[0] + node_ip = autoscaler.provider.non_terminated_node_ips({})[0] + + # A hack to check if the node was terminated when it shouldn't. + autoscaler.provider.mock_nodes[node_id].state = "unterminatable" + lm.update( + node_ip, + config["available_node_types"]["def_worker"]["resources"], + config["available_node_types"]["def_worker"]["resources"], {}, + waiting_bundles=[{ + "CPU": 0.2, + "WORKER": 1.0 + }]) + autoscaler.request_resources([{"CPU": 0.2, "WORKER": 1.0}] * 2) + autoscaler.update() + # 2 requested_resource, 1 min worker, 1 free node -> 2 nodes total + self.waitForNodes(2) + autoscaler.request_resources([{"CPU": 0.2, "WORKER": 1.0}]) + autoscaler.update() + # Still 2 because the second one is not connected and hence + # request_resources occupies the connected node. + self.waitForNodes(2) + autoscaler.request_resources([{"CPU": 0.2, "WORKER": 1.0}] * 3) + lm.update( + node_ip, + config["available_node_types"]["def_worker"]["resources"], {}, {}, + waiting_bundles=[{ + "CPU": 0.2, + "WORKER": 1.0 + }] * 3) + autoscaler.update() + self.waitForNodes(3) + autoscaler.request_resources([]) + + lm.update("172.0.0.1", + config["available_node_types"]["def_worker"]["resources"], + config["available_node_types"]["def_worker"]["resources"], + {}) + lm.update("172.0.0.2", + config["available_node_types"]["def_worker"]["resources"], + config["available_node_types"]["def_worker"]["resources"], + {}) + lm.update(node_ip, + config["available_node_types"]["def_worker"]["resources"], + {}, {}) + autoscaler.update() + self.waitForNodes(1) + # If node {node_id} was terminated any time then it's state will be set + # to terminated. + assert autoscaler.provider.mock_nodes[ + node_id].state == "unterminatable" + + def testRequestResourcesRaceConditionWithMinWorker(self): + """Test request_resources() with min_workers. + + Tests when request_resources() is called simultaneously with adding + min_workers constraint. 
+ """ + config = copy.deepcopy(MULTI_WORKER_CLUSTER) + config["available_node_types"] = { + "empty_node": { + "node_config": {}, + "resources": { + "CPU": 2 + }, + "max_workers": 1 + }, + "def_worker": { + "node_config": {}, + "resources": { + "CPU": 2, + "GPU": 1, + "WORKER": 1 + }, + "max_workers": 3, + "min_workers": 1 + } + } + config_path = self.write_config(config) + self.provider = MockProvider() + runner = MockProcessRunner() + runner.respond_to_call("json .Config.Env", ["[]" for i in range(2)]) + lm = LoadMetrics() + autoscaler = StandardAutoscaler( + config_path, + lm, + max_failures=0, + process_runner=runner, + update_interval_s=0) + autoscaler.request_resources([{"CPU": 2, "WORKER": 1.0}] * 2) + autoscaler.update() + # 2 min worker for both min_worker and request_resources(), not 3. + self.waitForNodes(2) + + def testRequestResourcesRaceConditionWithResourceDemands(self): + """Test request_resources() with resource_demands. + + Tests when request_resources() is called simultaneously with resource + demands in multiple orders. + """ + config = copy.deepcopy(MULTI_WORKER_CLUSTER) + config["available_node_types"].update({ + "empty_node": { + "node_config": {}, + "resources": { + "CPU": 2, + "GPU": 1 + }, + "max_workers": 1 + }, + "def_worker": { + "node_config": {}, + "resources": { + "CPU": 2, + "GPU": 1, + "WORKER": 1 + }, + "max_workers": 3, + } + }) + + config_path = self.write_config(config) + self.provider = MockProvider() + self.provider.create_node({}, { + TAG_RAY_NODE_KIND: "head", + TAG_RAY_USER_NODE_TYPE: "empty_node" + }, 1) + + runner = MockProcessRunner() + runner.respond_to_call("json .Config.Env", ["[]" for i in range(2)]) + lm = LoadMetrics() + autoscaler = StandardAutoscaler( + config_path, + lm, + max_failures=0, + process_runner=runner, + update_interval_s=0) + lm.update( + "127.0.0.0", { + "CPU": 2, + "GPU": 1 + }, {"CPU": 2}, {}, + waiting_bundles=[{ + "CPU": 2 + }]) + autoscaler.request_resources([{"CPU": 2, "GPU": 1}] * 2) + autoscaler.update() + # 1 head, 1 worker. + self.waitForNodes(2) + lm.update( + "127.0.0.0", { + "CPU": 2, + "GPU": 1 + }, {"CPU": 2}, {}, + waiting_bundles=[{ + "CPU": 2 + }]) + # make sure it stays consistent. + for _ in range(10): + autoscaler.update() + self.waitForNodes(2) + if __name__ == "__main__": import sys
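To illustrate the keep/terminate rule introduced in _get_nodes_allowed_to_terminate above, here is a minimal standalone sketch (not part of the patch). The helper simple_bin_pack_residual is an invented, simplified stand-in for Ray's get_bin_pack_residual; like the real helper (per the updated docstring in this diff), it packs each demand onto the first node that fits, in the given order, and returns the unfulfilled demands together with the updated per-node resources.

import copy
from typing import Dict, List

ResourceDict = Dict[str, float]


def simple_bin_pack_residual(node_resources: List[ResourceDict],
                             demands: List[ResourceDict]):
    # Simplified stand-in for get_bin_pack_residual: pack each demand onto
    # the first node that fits, in the given order, and return the demands
    # that do not fit plus the updated per-node resources.
    nodes = copy.deepcopy(node_resources)
    unfulfilled = []
    for demand in demands:
        for node in nodes:
            if all(node.get(k, 0) >= v for k, v in demand.items()):
                for k, v in demand.items():
                    node[k] -= v
                break
        else:
            unfulfilled.append(demand)
    return unfulfilled, nodes


# Head node first, then workers ordered most recently used first.
max_node_resources = [{"CPU": 2}, {"CPU": 4}, {"CPU": 4}]
requested = [{"CPU": 4}]  # e.g. from request_resources()

_, remaining = simple_bin_pack_residual(max_node_resources, requested)
max_node_resources.pop(0)  # drop the head node entry
remaining.pop(0)           # drop the head node entry

# A worker whose resources were left untouched by the packing is not needed
# for request_resources() and may be terminated once it is idle.
allowed_to_terminate = [
    remaining[i] == max_node_resources[i] and bool(max_node_resources[i])
    for i in range(len(remaining))
]
print(allowed_to_terminate)  # [False, True]

Because the most recently used workers come first, the requested bundles land on the nodes the autoscaler prefers to keep, and only the untouched, least recently used workers become candidates for idle termination; a worker with an empty resource dict (legacy yaml before the node connects) is also kept.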
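Similarly, the change to _sort_based_on_last_used can be shown with a short sketch (again not part of the patch, with made-up IPs and timestamps): nodes that have never reported a heartbeat are assigned a last-used time of -1, so connected nodes sort to the front and unconnected ones fall to the end.

last_used = {"172.0.0.0": 100.0, "172.0.0.1": 250.0}  # ip -> last-used time
ips = ["172.0.0.2", "172.0.0.0", "172.0.0.1"]  # 172.0.0.2 never connected

# Most recently used first; unconnected nodes (-1) go last, matching the
# updated docstring: "The first item in the return list is the most
# recently used."
ordered = sorted(ips, key=lambda ip: last_used.get(ip, -1), reverse=True)
print(ordered)  # ['172.0.0.1', '172.0.0.0', '172.0.0.2']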