Make binbacking prioritize nodes better (#19212)

* Make binbacking prioritize nodes better

Make binpacking prefer nodes that match multiple
resource types.

* spelling

* order demands when binpacking, starting from complex ones

* add stability to resource demand ordering

* lint

* logging

* add comments

* +comment

* use set
This commit is contained in:
Sasha Sobol 2021-10-10 11:56:47 -07:00 committed by GitHub
parent bd2d2079d2
commit e8d1fc36cb
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 116 additions and 35 deletions

View file

@ -764,7 +764,11 @@ def _utilization_score(node_resources: ResourceDict,
return None
fittable = []
resource_types = set()
for r in resources:
for k, v in r.items():
if v > 0:
resource_types.add(k)
if _fits(remaining, r):
fittable.append(r)
_inplace_subtract(remaining, r)
@ -772,12 +776,15 @@ def _utilization_score(node_resources: ResourceDict,
return None
util_by_resources = []
num_matching_resource_types = 0
for k, v in node_resources.items():
# Don't divide by zero.
if v < 1:
# Could test v == 0 on the nose, but v < 1 feels safer.
# (Note that node resources are integers.)
continue
if k in resource_types:
num_matching_resource_types += 1
util = (v - remaining[k]) / v
util_by_resources.append(v * (util**3))
@ -785,9 +792,11 @@ def _utilization_score(node_resources: ResourceDict,
if not util_by_resources:
return None
# Prioritize using all resources first, then prioritize overall balance
# Prioritize matching multiple resource types first, then prioritize
# using all resources, then prioritize overall balance
# of multiple resources.
return (min(util_by_resources), np.mean(util_by_resources))
return (num_matching_resource_types, min(util_by_resources),
np.mean(util_by_resources))
def get_bin_pack_residual(node_resources: List[ResourceDict],
@ -818,7 +827,16 @@ def get_bin_pack_residual(node_resources: List[ResourceDict],
nodes = copy.deepcopy(node_resources)
# List of nodes that cannot be used again due to strict spread.
used = []
for demand in resource_demands:
# We order the resource demands in the following way:
# More complex demands first.
# Break ties: heavier demands first.
# Break ties: lexicographically (to ensure stable ordering).
for demand in sorted(
resource_demands,
key=lambda demand: (len(demand.values()),
sum(demand.values()),
sorted(demand.items())),
reverse=True):
found = False
node = None
for i in range(len(nodes)):

View file

@ -469,7 +469,7 @@ def test_stdout_log_stream(ray_start_regular_shared):
time.sleep(1)
print_on_stderr_and_stdout.remote("Hello world")
time.sleep(1)
assert len(log_msgs) == 2
assert len(log_msgs) == 2, log_msgs
assert all((msg.find("Hello world") for msg in log_msgs))

View file

@ -46,24 +46,52 @@ def get_nodes_for(*a, **kw):
def test_util_score():
assert _utilization_score({"CPU": 64}, [{"TPU": 16}]) is None
assert _utilization_score({"GPU": 4}, [{"GPU": 2}]) == (0.5, 0.5)
assert _utilization_score({"GPU": 4}, [{"GPU": 2}]) == (1, 0.5, 0.5)
assert _utilization_score({"GPU": 4}, [{"GPU": 1}, {"GPU": 1}]) == \
(0.5, 0.5)
assert _utilization_score({"GPU": 2}, [{"GPU": 2}]) == (2, 2)
assert _utilization_score({"GPU": 2}, [{"GPU": 1}, {"GPU": 1}]) == (2, 2)
assert _utilization_score({"GPU": 2, "TPU": 1}, [{"GPU": 2}]) == (0, 1)
assert _utilization_score({"CPU": 64}, [{"CPU": 64}]) == (64, 64)
assert _utilization_score({"CPU": 64}, [{"CPU": 32}]) == (8, 8)
(1, 0.5, 0.5)
assert _utilization_score({"GPU": 2}, [{"GPU": 2}]) == (1, 2, 2)
assert _utilization_score({
"GPU": 2
}, [{
"GPU": 1
}, {
"GPU": 1
}]) == (1, 2, 2)
assert _utilization_score({
"GPU": 1
}, [{
"GPU": 1,
"CPU": 1
}, {
"GPU": 1
}]) == (1, 1, 1)
assert _utilization_score({
"GPU": 1,
"CPU": 1
}, [{
"GPU": 1,
"CPU": 1
}, {
"GPU": 1
}]) == (2, 1, 1)
assert _utilization_score({"GPU": 2, "TPU": 1}, [{"GPU": 2}]) == (1, 0, 1)
assert _utilization_score({"CPU": 64}, [{"CPU": 64}]) == (1, 64, 64)
assert _utilization_score({"CPU": 64}, [{"CPU": 32}]) == (1, 8, 8)
assert _utilization_score({"CPU": 64}, [{"CPU": 16}, {"CPU": 16}]) == \
(8, 8)
(1, 8, 8)
def test_gpu_node_util_score():
# Avoid scheduling CPU tasks on GPU node.
assert _utilization_score({"GPU": 1, "CPU": 1}, [{"CPU": 1}]) is None
assert _utilization_score({"GPU": 1, "CPU": 1}, [{"CPU": 1, "GPU": 1}]) \
== (1.0, 1.0)
assert _utilization_score({"GPU": 1, "CPU": 1}, [{"GPU": 1}]) == (0.0, 0.5)
== (2, 1.0, 1.0)
assert _utilization_score({
"GPU": 1,
"CPU": 1
}, [{
"GPU": 1
}]) == (1, 0.0, 0.5)
def test_zero_resource():
@ -197,7 +225,7 @@ def test_get_nodes_packing_heuristic():
}] * 8) + ([{
"CPU": 1
}] * 64)) == {
"m4.16xlarge": 1,
"m4.4xlarge": 2,
"p2.8xlarge": 1
}
@ -215,6 +243,47 @@ def test_get_nodes_packing_heuristic():
}
def test_node_packing_gpu_cpu_bundles():
TYPES = {
"cpu": {
"resources": {
"CPU": 16,
},
"max_workers": 10,
},
"gpu": {
"resources": {
"CPU": 16,
"GPU": 1,
},
"max_workers": 10,
},
}
nodes = get_nodes_for(TYPES, {}, "cpu", 9999, ([{
"CPU": 1
}] * 30 + [{
"GPU": 1,
"CPU": 1
}]))
assert nodes == {"gpu": 1, "cpu": 1}
nodes = get_nodes_for(TYPES, {}, "cpu", 9999, ([{
"GPU": 1,
"CPU": 1
}] + [{
"CPU": 1
}] * 30))
assert nodes == {"gpu": 1, "cpu": 1}
nodes = get_nodes_for(TYPES, {}, "cpu", 9999, ([{
"GPU": 1,
"CPU": 1
}] + [{
"CPU": 1
}] * 15))
assert nodes == {"gpu": 1}
def test_gpu_node_avoid_cpu_task():
types = {
"cpu": {
@ -630,13 +699,8 @@ def test_backlog_queue_impact_on_binpacking_time():
"CPU": 1
}])
# If not for the max launch concurrency the next assert should be:
# {'m4.large': 4, 'm4.4xlarge': 2, 'm4.16xlarge': 15, 'p2.8xlarge': 125}.
assert to_launch == {
"m4.large": 4,
"m4.4xlarge": 2,
"m4.16xlarge": 5,
"p2.8xlarge": 5
}
# {'m4.16xlarge': 1, 'p2.8xlarge': 125, 'p2.xlarge': 1}
assert to_launch == {"m4.16xlarge": 1, "p2.8xlarge": 5, "p2.xlarge": 1}
# Check the time it takes when there are 100 nodes available and the demand
# requires another 75 nodes.
@ -1667,7 +1731,7 @@ class AutoscalingTest(unittest.TestCase):
assert cnt == 2
def testScaleUpIgnoreUsed(self):
config = MULTI_WORKER_CLUSTER.copy()
config = copy.deepcopy(MULTI_WORKER_CLUSTER)
# Commenting out this line causes the test case to fail?!?!
config["min_workers"] = 0
config["target_utilization_fraction"] = 1.0
@ -1708,7 +1772,7 @@ class AutoscalingTest(unittest.TestCase):
assert self.provider.mock_nodes[1].node_type == "p2.xlarge"
def testRequestBundlesAccountsForHeadNode(self):
config = MULTI_WORKER_CLUSTER.copy()
config = copy.deepcopy(MULTI_WORKER_CLUSTER)
config["head_node_type"] = "p2.8xlarge"
config["min_workers"] = 0
config["max_workers"] = 50
@ -1747,7 +1811,7 @@ class AutoscalingTest(unittest.TestCase):
assert self.provider.mock_nodes[1].node_type == "p2.8xlarge"
def testRequestBundles(self):
config = MULTI_WORKER_CLUSTER.copy()
config = copy.deepcopy(MULTI_WORKER_CLUSTER)
config["min_workers"] = 0
config["max_workers"] = 50
config_path = self.write_config(config)
@ -1784,7 +1848,7 @@ class AutoscalingTest(unittest.TestCase):
assert self.provider.mock_nodes[4].node_type == "m4.16xlarge"
def testResourcePassing(self):
config = MULTI_WORKER_CLUSTER.copy()
config = copy.deepcopy(MULTI_WORKER_CLUSTER)
config["min_workers"] = 0
config["max_workers"] = 50
config_path = self.write_config(config)
@ -1815,7 +1879,7 @@ class AutoscalingTest(unittest.TestCase):
assert self.provider.mock_nodes[2].node_type == "p2.8xlarge"
# TODO (Alex): Autoscaler creates the node during one update then
# starts the updater in the enxt update. The sleep is largely
# starts the updater in the next update. The sleep is largely
# unavoidable because the updater runs in its own thread and we have no
# good way of ensuring that the commands are sent in time.
autoscaler.update()
@ -1830,7 +1894,7 @@ class AutoscalingTest(unittest.TestCase):
runner.assert_has_call("172.0.0.2", "\"GPU\":8")
def testScaleUpLoadMetrics(self):
config = MULTI_WORKER_CLUSTER.copy()
config = copy.deepcopy(MULTI_WORKER_CLUSTER)
config["min_workers"] = 0
config["max_workers"] = 50
config_path = self.write_config(config)
@ -1861,16 +1925,15 @@ class AutoscalingTest(unittest.TestCase):
"CPU": 16
}])
autoscaler.update()
self.waitForNodes(2, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER})
self.waitForNodes(1, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER})
nodes = {
self.provider.mock_nodes[1].node_type,
self.provider.mock_nodes[2].node_type
}
assert nodes == {"p2.xlarge", "m4.4xlarge"}
assert nodes == {"p2.xlarge"}
def testCommandPassing(self):
t = "custom"
config = MULTI_WORKER_CLUSTER.copy()
config = copy.deepcopy(MULTI_WORKER_CLUSTER)
config["available_node_types"]["p2.8xlarge"][
"worker_setup_commands"] = ["new_worker_setup_command"]
config["available_node_types"]["p2.xlarge"][
@ -1926,7 +1989,7 @@ class AutoscalingTest(unittest.TestCase):
"init_cmd")
def testDockerWorkers(self):
config = MULTI_WORKER_CLUSTER.copy()
config = copy.deepcopy(MULTI_WORKER_CLUSTER)
config["available_node_types"]["p2.8xlarge"]["docker"] = {
"worker_image": "p2.8x_image:latest",
"worker_run_options": ["p2.8x-run-options"]
@ -1984,7 +2047,7 @@ class AutoscalingTest(unittest.TestCase):
}])
autoscaler.update()
self.waitForNodes(5)
assert self.provider.mock_nodes[4].node_type == "m4.16xlarge"
assert self.provider.mock_nodes[4].node_type == "m4.large"
autoscaler.update()
sleep(0.1)
runner.assert_has_call(self.provider.mock_nodes[2].internal_ip,
@ -2047,7 +2110,7 @@ class AutoscalingTest(unittest.TestCase):
self.waitForNodes(0, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER})
def testEmptyDocker(self):
config = MULTI_WORKER_CLUSTER.copy()
config = copy.deepcopy(MULTI_WORKER_CLUSTER)
del config["docker"]
config["min_workers"] = 0
config["max_workers"] = 10