From e8d1fc36cb0d6b37248f047daef42119e5a86fea Mon Sep 17 00:00:00 2001
From: Sasha Sobol
Date: Sun, 10 Oct 2021 11:56:47 -0700
Subject: [PATCH] Make binpacking prioritize nodes better (#19212)

* Make binpacking prioritize nodes better

Make binpacking prefer nodes that match multiple resource types.

* spelling

* order demands when binpacking, starting from complex ones

* add stability to resource demand ordering

* lint

* logging

* add comments

* +comment

* use set
---
 .../_private/resource_demand_scheduler.py    |  24 +++-
 python/ray/tests/test_client.py              |   2 +-
 .../tests/test_resource_demand_scheduler.py  | 125 +++++++++++++-----
 3 files changed, 116 insertions(+), 35 deletions(-)
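
As a quick illustration of the new scoring rule: the sketch below mirrors the
logic this patch adds to _utilization_score, but as a minimal, self-contained
snippet. toy_utilization_score is a hypothetical stand-in (not part of
ray.autoscaler), and numpy's mean is replaced by a plain average.

def toy_utilization_score(node_resources, resource_demands):
    remaining = dict(node_resources)
    # Resource types that appear (with a nonzero amount) in any demand.
    demanded = {k for r in resource_demands for k, v in r.items() if v > 0}
    fittable = []
    for r in resource_demands:
        if all(remaining.get(k, 0) >= v for k, v in r.items()):
            fittable.append(r)
            for k, v in r.items():
                remaining[k] -= v
    if not fittable:
        return None
    num_matching = 0
    utils = []
    for k, v in node_resources.items():
        if v < 1:  # don't divide by zero; node resources are integers
            continue
        if k in demanded:
            num_matching += 1
        util = (v - remaining[k]) / v
        utils.append(v * util**3)
    if not utils:
        return None
    # Compared lexicographically; larger tuples win.
    return (num_matching, min(utils), sum(utils) / len(utils))

# A node that matches both resource types of a GPU+CPU bundle now
# outranks one that would leave a resource type idle:
assert toy_utilization_score({"GPU": 1, "CPU": 1},
                             [{"GPU": 1, "CPU": 1}]) == (2, 1.0, 1.0)
assert toy_utilization_score({"GPU": 1, "CPU": 1},
                             [{"GPU": 1}]) == (1, 0.0, 0.5)

Because the count of matching resource types is the first element of the
returned tuple, a node type that satisfies every resource type in a bundle
sorts ahead of one that merely has spare capacity; the min/mean terms keep
the previous utilization-balance behavior as tie-breakers.
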
diff --git a/python/ray/autoscaler/_private/resource_demand_scheduler.py b/python/ray/autoscaler/_private/resource_demand_scheduler.py
index 517f49f63..bc3fe5925 100644
--- a/python/ray/autoscaler/_private/resource_demand_scheduler.py
+++ b/python/ray/autoscaler/_private/resource_demand_scheduler.py
@@ -764,7 +764,11 @@ def _utilization_score(node_resources: ResourceDict,
             return None
 
     fittable = []
+    resource_types = set()
     for r in resources:
+        for k, v in r.items():
+            if v > 0:
+                resource_types.add(k)
         if _fits(remaining, r):
             fittable.append(r)
             _inplace_subtract(remaining, r)
@@ -772,12 +776,15 @@ def _utilization_score(node_resources: ResourceDict,
         return None
 
     util_by_resources = []
+    num_matching_resource_types = 0
     for k, v in node_resources.items():
         # Don't divide by zero.
         if v < 1:
             # Could test v == 0 on the nose, but v < 1 feels safer.
             # (Note that node resources are integers.)
             continue
+        if k in resource_types:
+            num_matching_resource_types += 1
         util = (v - remaining[k]) / v
         util_by_resources.append(v * (util**3))
 
     if not util_by_resources:
         return None
 
-    # Prioritize using all resources first, then prioritize overall balance
+    # Prioritize matching multiple resource types first, then prioritize
+    # using all resources, then prioritize overall balance
     # of multiple resources.
-    return (min(util_by_resources), np.mean(util_by_resources))
+    return (num_matching_resource_types, min(util_by_resources),
+            np.mean(util_by_resources))
 
 
 def get_bin_pack_residual(node_resources: List[ResourceDict],
@@ -818,7 +827,16 @@ def get_bin_pack_residual(node_resources: List[ResourceDict],
     nodes = copy.deepcopy(node_resources)
     # List of nodes that cannot be used again due to strict spread.
     used = []
-    for demand in resource_demands:
+    # We order the resource demands in the following way:
+    # More complex demands first.
+    # Break ties: heavier demands first.
+    # Break ties: lexicographically (to ensure stable ordering).
+    for demand in sorted(
+            resource_demands,
+            key=lambda demand: (len(demand.values()),
+                                sum(demand.values()),
+                                sorted(demand.items())),
+            reverse=True):
         found = False
         node = None
         for i in range(len(nodes)):
diff --git a/python/ray/tests/test_client.py b/python/ray/tests/test_client.py
index da4d53853..4ab08dc95 100644
--- a/python/ray/tests/test_client.py
+++ b/python/ray/tests/test_client.py
@@ -469,7 +469,7 @@ def test_stdout_log_stream(ray_start_regular_shared):
     time.sleep(1)
     print_on_stderr_and_stdout.remote("Hello world")
     time.sleep(1)
-    assert len(log_msgs) == 2
+    assert len(log_msgs) == 2, log_msgs
     assert all((msg.find("Hello world") for msg in log_msgs))
 
 
diff --git a/python/ray/tests/test_resource_demand_scheduler.py b/python/ray/tests/test_resource_demand_scheduler.py
index eae1fae08..eb1260db3 100644
--- a/python/ray/tests/test_resource_demand_scheduler.py
+++ b/python/ray/tests/test_resource_demand_scheduler.py
@@ -46,24 +46,52 @@ def get_nodes_for(*a, **kw):
 
 def test_util_score():
     assert _utilization_score({"CPU": 64}, [{"TPU": 16}]) is None
-    assert _utilization_score({"GPU": 4}, [{"GPU": 2}]) == (0.5, 0.5)
+    assert _utilization_score({"GPU": 4}, [{"GPU": 2}]) == (1, 0.5, 0.5)
     assert _utilization_score({"GPU": 4}, [{"GPU": 1}, {"GPU": 1}]) == \
-        (0.5, 0.5)
-    assert _utilization_score({"GPU": 2}, [{"GPU": 2}]) == (2, 2)
-    assert _utilization_score({"GPU": 2}, [{"GPU": 1}, {"GPU": 1}]) == (2, 2)
-    assert _utilization_score({"GPU": 2, "TPU": 1}, [{"GPU": 2}]) == (0, 1)
-    assert _utilization_score({"CPU": 64}, [{"CPU": 64}]) == (64, 64)
-    assert _utilization_score({"CPU": 64}, [{"CPU": 32}]) == (8, 8)
+        (1, 0.5, 0.5)
+    assert _utilization_score({"GPU": 2}, [{"GPU": 2}]) == (1, 2, 2)
+    assert _utilization_score({
+        "GPU": 2
+    }, [{
+        "GPU": 1
+    }, {
+        "GPU": 1
+    }]) == (1, 2, 2)
+    assert _utilization_score({
+        "GPU": 1
+    }, [{
+        "GPU": 1,
+        "CPU": 1
+    }, {
+        "GPU": 1
+    }]) == (1, 1, 1)
+    assert _utilization_score({
+        "GPU": 1,
+        "CPU": 1
+    }, [{
+        "GPU": 1,
+        "CPU": 1
+    }, {
+        "GPU": 1
+    }]) == (2, 1, 1)
+    assert _utilization_score({"GPU": 2, "TPU": 1}, [{"GPU": 2}]) == (1, 0, 1)
+    assert _utilization_score({"CPU": 64}, [{"CPU": 64}]) == (1, 64, 64)
+    assert _utilization_score({"CPU": 64}, [{"CPU": 32}]) == (1, 8, 8)
     assert _utilization_score({"CPU": 64}, [{"CPU": 16}, {"CPU": 16}]) == \
-        (8, 8)
+        (1, 8, 8)
 
 
 def test_gpu_node_util_score():
     # Avoid scheduling CPU tasks on GPU node.
     assert _utilization_score({"GPU": 1, "CPU": 1}, [{"CPU": 1}]) is None
     assert _utilization_score({"GPU": 1, "CPU": 1}, [{"CPU": 1, "GPU": 1}]) \
-        == (1.0, 1.0)
-    assert _utilization_score({"GPU": 1, "CPU": 1}, [{"GPU": 1}]) == (0.0, 0.5)
+        == (2, 1.0, 1.0)
+    assert _utilization_score({
+        "GPU": 1,
+        "CPU": 1
+    }, [{
+        "GPU": 1
+    }]) == (1, 0.0, 0.5)
 
 
 def test_zero_resource():
@@ -197,7 +225,7 @@ def test_get_nodes_packing_heuristic():
     }] * 8) + ([{
         "CPU": 1
     }] * 64)) == {
-        "m4.16xlarge": 1,
+        "m4.4xlarge": 2,
         "p2.8xlarge": 1
     }
 
@@ -215,6 +243,47 @@
     }
 
 
+def test_node_packing_gpu_cpu_bundles():
+    TYPES = {
+        "cpu": {
+            "resources": {
+                "CPU": 16,
+            },
+            "max_workers": 10,
+        },
+        "gpu": {
+            "resources": {
+                "CPU": 16,
+                "GPU": 1,
+            },
+            "max_workers": 10,
+        },
+    }
+    nodes = get_nodes_for(TYPES, {}, "cpu", 9999, ([{
+        "CPU": 1
+    }] * 30 + [{
+        "GPU": 1,
+        "CPU": 1
+    }]))
+    assert nodes == {"gpu": 1, "cpu": 1}
+
+    nodes = get_nodes_for(TYPES, {}, "cpu", 9999, ([{
+        "GPU": 1,
+        "CPU": 1
+    }] + [{
+        "CPU": 1
+    }] * 30))
+    assert nodes == {"gpu": 1, "cpu": 1}
+
+    nodes = get_nodes_for(TYPES, {}, "cpu", 9999, ([{
+        "GPU": 1,
+        "CPU": 1
+    }] + [{
+        "CPU": 1
+    }] * 15))
+    assert nodes == {"gpu": 1}
+
+
 def test_gpu_node_avoid_cpu_task():
     types = {
         "cpu": {
@@ -630,13 +699,8 @@ def test_backlog_queue_impact_on_binpacking_time():
         "CPU": 1
     }])
     # If not for the max launch concurrency the next assert should be:
-    # {'m4.large': 4, 'm4.4xlarge': 2, 'm4.16xlarge': 15, 'p2.8xlarge': 125}.
-    assert to_launch == {
-        "m4.large": 4,
-        "m4.4xlarge": 2,
-        "m4.16xlarge": 5,
-        "p2.8xlarge": 5
-    }
+    # {'m4.16xlarge': 1, 'p2.8xlarge': 125, 'p2.xlarge': 1}
+    assert to_launch == {"m4.16xlarge": 1, "p2.8xlarge": 5, "p2.xlarge": 1}
 
     # Check the time it takes when there are 100 nodes available and the demand
     # requires another 75 nodes.
@@ -1667,7 +1731,7 @@ class AutoscalingTest(unittest.TestCase):
         assert cnt == 2
 
     def testScaleUpIgnoreUsed(self):
-        config = MULTI_WORKER_CLUSTER.copy()
+        config = copy.deepcopy(MULTI_WORKER_CLUSTER)
         # Commenting out this line causes the test case to fail?!?!
         config["min_workers"] = 0
         config["target_utilization_fraction"] = 1.0
@@ -1708,7 +1772,7 @@ class AutoscalingTest(unittest.TestCase):
         assert self.provider.mock_nodes[1].node_type == "p2.xlarge"
 
     def testRequestBundlesAccountsForHeadNode(self):
-        config = MULTI_WORKER_CLUSTER.copy()
+        config = copy.deepcopy(MULTI_WORKER_CLUSTER)
         config["head_node_type"] = "p2.8xlarge"
         config["min_workers"] = 0
         config["max_workers"] = 50
@@ -1747,7 +1811,7 @@ class AutoscalingTest(unittest.TestCase):
         assert self.provider.mock_nodes[1].node_type == "p2.8xlarge"
 
     def testRequestBundles(self):
-        config = MULTI_WORKER_CLUSTER.copy()
+        config = copy.deepcopy(MULTI_WORKER_CLUSTER)
         config["min_workers"] = 0
         config["max_workers"] = 50
         config_path = self.write_config(config)
@@ -1784,7 +1848,7 @@ class AutoscalingTest(unittest.TestCase):
         assert self.provider.mock_nodes[4].node_type == "m4.16xlarge"
 
     def testResourcePassing(self):
-        config = MULTI_WORKER_CLUSTER.copy()
+        config = copy.deepcopy(MULTI_WORKER_CLUSTER)
         config["min_workers"] = 0
         config["max_workers"] = 50
         config_path = self.write_config(config)
@@ -1815,7 +1879,7 @@ class AutoscalingTest(unittest.TestCase):
         assert self.provider.mock_nodes[2].node_type == "p2.8xlarge"
 
         # TODO (Alex): Autoscaler creates the node during one update then
-        # starts the updater in the enxt update. The sleep is largely
+        # starts the updater in the next update. The sleep is largely
         # unavoidable because the updater runs in its own thread and we have no
         # good way of ensuring that the commands are sent in time.
         autoscaler.update()
@@ -1830,7 +1894,7 @@ class AutoscalingTest(unittest.TestCase):
         runner.assert_has_call("172.0.0.2", "\"GPU\":8")
 
     def testScaleUpLoadMetrics(self):
-        config = MULTI_WORKER_CLUSTER.copy()
+        config = copy.deepcopy(MULTI_WORKER_CLUSTER)
         config["min_workers"] = 0
         config["max_workers"] = 50
         config_path = self.write_config(config)
@@ -1861,16 +1925,15 @@ class AutoscalingTest(unittest.TestCase):
             "CPU": 16
         }])
         autoscaler.update()
-        self.waitForNodes(2, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER})
+        self.waitForNodes(1, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER})
         nodes = {
             self.provider.mock_nodes[1].node_type,
-            self.provider.mock_nodes[2].node_type
         }
-        assert nodes == {"p2.xlarge", "m4.4xlarge"}
+        assert nodes == {"p2.xlarge"}
 
     def testCommandPassing(self):
         t = "custom"
-        config = MULTI_WORKER_CLUSTER.copy()
+        config = copy.deepcopy(MULTI_WORKER_CLUSTER)
         config["available_node_types"]["p2.8xlarge"][
             "worker_setup_commands"] = ["new_worker_setup_command"]
         config["available_node_types"]["p2.xlarge"][
@@ -1926,7 +1989,7 @@ class AutoscalingTest(unittest.TestCase):
             "init_cmd")
 
     def testDockerWorkers(self):
-        config = MULTI_WORKER_CLUSTER.copy()
+        config = copy.deepcopy(MULTI_WORKER_CLUSTER)
         config["available_node_types"]["p2.8xlarge"]["docker"] = {
             "worker_image": "p2.8x_image:latest",
             "worker_run_options": ["p2.8x-run-options"]
         }
@@ -1984,7 +2047,7 @@ class AutoscalingTest(unittest.TestCase):
         }])
         autoscaler.update()
         self.waitForNodes(5)
-        assert self.provider.mock_nodes[4].node_type == "m4.16xlarge"
+        assert self.provider.mock_nodes[4].node_type == "m4.large"
         autoscaler.update()
         sleep(0.1)
         runner.assert_has_call(self.provider.mock_nodes[2].internal_ip,
@@ -2047,7 +2110,7 @@ class AutoscalingTest(unittest.TestCase):
         self.waitForNodes(0, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER})
 
     def testEmptyDocker(self):
-        config = MULTI_WORKER_CLUSTER.copy()
+        config = copy.deepcopy(MULTI_WORKER_CLUSTER)
         del config["docker"]
         config["min_workers"] = 0
         config["max_workers"] = 10