[autoscaler] Limit max launch concurrency per node type (#11242)

Ameer Haj Ali 2020-10-12 09:45:52 -07:00 committed by GitHub
parent 92a58aabce
commit 06fe690682
3 changed files with 233 additions and 3 deletions

View file

@@ -216,7 +216,6 @@ class StandardAutoscaler:
                 self.pending_launches.breakdown(),
                 resource_demand_vector,
                 self.load_metrics.get_resource_utilization()))
-        # TODO(ekl) also enforce max launch concurrency here?
         for node_type, count in to_launch.items():
             self.launch_new_node(count, node_type=node_type)
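
With this change, `get_nodes_to_launch` already returns per-node-type counts that are capped for launch concurrency, so the loop above can launch them directly and the old TODO is resolved. Below is a minimal sketch of that flow; `update_sketch`, its parameters, and the `print` stand-in for `launch_new_node` are hypothetical names for illustration, not the autoscaler's actual internals.

def update_sketch(scheduler, provider, pending_launches, demands, usage):
    # The scheduler now enforces the per-type launch concurrency cap inside
    # get_nodes_to_launch, so no extra limiting is needed in this loop.
    to_launch = scheduler.get_nodes_to_launch(
        provider.non_terminated_nodes({}),
        pending_launches,  # e.g. {"p2.8xlarge": 1}
        demands,           # e.g. [{"GPU": 8}] * 4
        usage)             # e.g. {"10.0.0.1": {"GPU": 8}}
    for node_type, count in to_launch.items():
        print("launching {} x {}".format(count, node_type))  # launch_new_node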

View file

@@ -14,7 +14,8 @@ import collections
 from typing import List, Dict
 from ray.autoscaler.node_provider import NodeProvider
-from ray.autoscaler.tags import TAG_RAY_USER_NODE_TYPE, NODE_KIND_UNMANAGED
+from ray.autoscaler.tags import TAG_RAY_USER_NODE_TYPE, NODE_KIND_UNMANAGED, \
+    STATUS_UPDATE_FAILED, STATUS_UP_TO_DATE, TAG_RAY_NODE_STATUS
 logger = logging.getLogger(__name__)
@@ -88,9 +89,83 @@ class ResourceDemandScheduler:
            if nodes_to_add > 0:
                total_nodes_to_add[node_type] = nodes_to_add

        # Limit the number of concurrent launches
        total_nodes_to_add = self._get_concurrent_resource_demand_to_launch(
            total_nodes_to_add, nodes, pending_nodes)

        logger.info("Node requests: {}".format(total_nodes_to_add))
        return total_nodes_to_add

    def _get_concurrent_resource_demand_to_launch(
            self, to_launch: Dict[NodeType, int],
            non_terminated_nodes: List[NodeID],
            pending_launches_nodes: Dict[NodeType, int]
    ) -> Dict[NodeType, int]:
        """Updates the max concurrent resources to launch for each node type.

        Given the current nodes that should be launched, the non-terminated
        nodes (running and pending), and the nodes pending launch, this
        method calculates the maximum number of nodes to launch concurrently
        for each node type as follows:

        1) Calculates the running nodes.
        2) Calculates the pending nodes and gets the launching nodes.
        3) Limits the total number of pending + currently-launching +
           to-be-launched nodes to max(5, frac * running_nodes[node_type]).

        Args:
            to_launch: Number of nodes to launch based on resource demand.
            non_terminated_nodes: Non-terminated nodes (pending/running).
            pending_launches_nodes: Nodes that are in the launch queue.

        Returns:
            Dict[NodeType, int]: Maximum number of nodes to launch for each
                node type.
        """
        # TODO(ameer): Consider making frac configurable.
        frac = 1
        updated_nodes_to_launch = {}
        running_nodes, pending_nodes = \
            self._separate_running_and_pending_nodes(
                non_terminated_nodes
            )
        for node_type in to_launch:
            # Enforce that the number of pending + launching nodes is at most
            # frac of the running nodes of this type (but at least 5).
            max_allowed_pending_nodes = max(
                5, int(frac * running_nodes[node_type]))
            total_pending_nodes = pending_launches_nodes.get(
                node_type, 0) + pending_nodes[node_type]
            # Allow more nodes if needed to respect the min_workers
            # constraint.
            nodes_to_add = max(
                max_allowed_pending_nodes - total_pending_nodes,
                self.node_types[node_type].get("min_workers", 0) -
                total_pending_nodes - running_nodes[node_type])
            if nodes_to_add > 0:
                updated_nodes_to_launch[node_type] = min(
                    nodes_to_add, to_launch[node_type])

        return updated_nodes_to_launch

    def _separate_running_and_pending_nodes(
            self,
            non_terminated_nodes: List[NodeID],
    ) -> (Dict[NodeType, int], Dict[NodeType, int]):
        """Splits non-terminated nodes into pending and running counts."""
        running_nodes = collections.defaultdict(int)
        pending_nodes = collections.defaultdict(int)
        for node_id in non_terminated_nodes:
            tags = self.provider.node_tags(node_id)
            if TAG_RAY_USER_NODE_TYPE in tags:
                node_type = tags[TAG_RAY_USER_NODE_TYPE]
                status = tags.get(TAG_RAY_NODE_STATUS)
                if status == STATUS_UP_TO_DATE:
                    running_nodes[node_type] += 1
                elif status != STATUS_UPDATE_FAILED:
                    pending_nodes[node_type] += 1
        return running_nodes, pending_nodes

    def calculate_node_resources(
            self, nodes: List[NodeID], pending_nodes: Dict[NodeID, int],
            usage_by_ip: Dict[str, ResourceDict]
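
As a standalone illustration of the cap computed by `_get_concurrent_resource_demand_to_launch` above, the sketch below reimplements the same arithmetic outside the scheduler class. It is a minimal sketch assuming frac = 1 as in this commit; `cap_launches` and its dictionary arguments are hypothetical names for illustration, not part of the Ray API.

def cap_launches(to_launch, running, pending, launching, min_workers, frac=1):
    # Allow at most max(5, frac * running) nodes of a type to be pending at
    # once, unless more are needed to satisfy the min_workers constraint.
    capped = {}
    for node_type, requested in to_launch.items():
        max_allowed_pending = max(5, int(frac * running.get(node_type, 0)))
        total_pending = pending.get(node_type, 0) + launching.get(node_type, 0)
        allowed = max(
            max_allowed_pending - total_pending,
            min_workers.get(node_type, 0) - total_pending
            - running.get(node_type, 0))
        if allowed > 0:
            capped[node_type] = min(allowed, requested)
    return capped

# Mirrors the first concurrency assertion in the tests below: 0 running
# p2.8xlarge, 1 pending + 1 launching, 4 requested, min_workers = 1
# -> max(5, 0) = 5 allowed pending, 2 already in flight, so cap at 3.
print(cap_launches({"p2.8xlarge": 4}, running={}, pending={"p2.8xlarge": 1},
                   launching={"p2.8xlarge": 1}, min_workers={"p2.8xlarge": 1}))
# -> {'p2.8xlarge': 3}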

View file

@@ -18,7 +18,8 @@ from ray.autoscaler._private.resource_demand_scheduler import \
     _utilization_score, _add_min_workers_nodes, \
     get_bin_pack_residual, get_nodes_for, ResourceDemandScheduler
 from ray.autoscaler.tags import TAG_RAY_USER_NODE_TYPE, TAG_RAY_NODE_KIND, \
-    NODE_KIND_WORKER
+    NODE_KIND_WORKER, TAG_RAY_NODE_STATUS, \
+    STATUS_UP_TO_DATE, STATUS_UNINITIALIZED
 from ray.test_utils import same_elements
 from time import sleep
@@ -321,6 +322,161 @@ def test_calculate_node_resources():
    assert to_launch == {"p2.8xlarge": 1}

def test_get_concurrent_resource_demand_to_launch():
    node_types = copy.deepcopy(TYPES_A)
    node_types["p2.8xlarge"]["min_workers"] = 1
    node_types["p2.8xlarge"]["max_workers"] = 10
    node_types["m4.large"]["min_workers"] = 2
    node_types["m4.large"]["max_workers"] = 100
    provider = MockProvider()
    scheduler = ResourceDemandScheduler(provider, node_types, 200)

    # Sanity check.
    assert len(provider.non_terminated_nodes({})) == 0

    # Sanity check.
    updated_to_launch = \
        scheduler._get_concurrent_resource_demand_to_launch({}, [], {})
    assert updated_to_launch == {}

    provider.create_node({}, {
        TAG_RAY_USER_NODE_TYPE: "p2.8xlarge",
        TAG_RAY_NODE_KIND: NODE_KIND_WORKER,
        TAG_RAY_NODE_STATUS: STATUS_UNINITIALIZED
    }, 1)
    provider.create_node({}, {
        TAG_RAY_USER_NODE_TYPE: "m4.large",
        TAG_RAY_NODE_KIND: NODE_KIND_WORKER,
        TAG_RAY_NODE_STATUS: STATUS_UNINITIALIZED
    }, 2)

    # All nodes so far are pending/launching here.
    to_launch = {"p2.8xlarge": 4, "m4.large": 40}
    non_terminated_nodes = provider.non_terminated_nodes({})
    pending_launches_nodes = {"p2.8xlarge": 1, "m4.large": 1}
    updated_to_launch = \
        scheduler._get_concurrent_resource_demand_to_launch(
            to_launch, non_terminated_nodes, pending_launches_nodes)
    # Note: we have 2 pending/launching GPU nodes, 3 pending/launching
    # CPU nodes, 0 running GPU nodes, and 0 running CPU nodes.
    assert updated_to_launch == {"p2.8xlarge": 3, "m4.large": 2}

    # This starts the min workers only, so we have no more pending workers.
    # The workers here are either running or in pending_launches_nodes,
    # which is "launching".
    for node_id in non_terminated_nodes:
        provider.set_node_tags(node_id,
                               {TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE})
    updated_to_launch = \
        scheduler._get_concurrent_resource_demand_to_launch(
            to_launch, non_terminated_nodes, pending_launches_nodes)
    # Note that here we have 1 launching GPU node, 1 launching CPU node,
    # 1 running GPU node, and 2 running CPU nodes.
    assert updated_to_launch == {"p2.8xlarge": 4, "m4.large": 4}

    # Launch the nodes. Note, after create_node the node is pending.
    provider.create_node({}, {
        TAG_RAY_USER_NODE_TYPE: "p2.8xlarge",
        TAG_RAY_NODE_KIND: NODE_KIND_WORKER,
        TAG_RAY_NODE_STATUS: STATUS_UNINITIALIZED
    }, 5)
    provider.create_node({}, {
        TAG_RAY_USER_NODE_TYPE: "m4.large",
        TAG_RAY_NODE_KIND: NODE_KIND_WORKER,
        TAG_RAY_NODE_STATUS: STATUS_UNINITIALIZED
    }, 5)

    # Continue scaling.
    non_terminated_nodes = provider.non_terminated_nodes({})
    to_launch = {"m4.large": 36}  # No more GPU nodes are necessary.
    pending_launches_nodes = {}  # No pending launches.
    updated_to_launch = \
        scheduler._get_concurrent_resource_demand_to_launch(
            to_launch, non_terminated_nodes, pending_launches_nodes)
    # Note: we have 5 pending CPU nodes, so we are not allowed to start any.
    # Still only 2 running CPU nodes.
    assert updated_to_launch == {}

    for node_id in non_terminated_nodes:
        provider.set_node_tags(node_id,
                               {TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE})
    updated_to_launch = \
        scheduler._get_concurrent_resource_demand_to_launch(
            to_launch, non_terminated_nodes, pending_launches_nodes)
    # Note that here we have 7 running CPU nodes and nothing
    # pending/launching.
    assert updated_to_launch == {"m4.large": 7}

    # Launch the nodes. Note, after create_node the node is pending.
    provider.create_node({}, {
        TAG_RAY_USER_NODE_TYPE: "m4.large",
        TAG_RAY_NODE_KIND: NODE_KIND_WORKER,
        TAG_RAY_NODE_STATUS: STATUS_UNINITIALIZED
    }, 7)

    # Continue scaling.
    non_terminated_nodes = provider.non_terminated_nodes({})
    to_launch = {"m4.large": 29}
    pending_launches_nodes = {"m4.large": 1}
    updated_to_launch = \
        scheduler._get_concurrent_resource_demand_to_launch(
            to_launch, non_terminated_nodes, pending_launches_nodes)
    # Note: we have 8 pending/launching CPU nodes and only 7 running,
    # so we should not launch anything (8 >= max(5, 7)).
    assert updated_to_launch == {}

    for node_id in non_terminated_nodes:
        provider.set_node_tags(node_id,
                               {TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE})
    updated_to_launch = \
        scheduler._get_concurrent_resource_demand_to_launch(
            to_launch, non_terminated_nodes, pending_launches_nodes)
    # Note that here we have 14 running CPU nodes and 1 launching.
    assert updated_to_launch == {"m4.large": 13}

def test_get_nodes_to_launch_max_launch_concurrency():
    provider = MockProvider()
    new_types = copy.deepcopy(TYPES_A)
    new_types["p2.8xlarge"]["min_workers"] = 4
    new_types["p2.8xlarge"]["max_workers"] = 40
    scheduler = ResourceDemandScheduler(provider, new_types, 30)

    to_launch = scheduler.get_nodes_to_launch([], {}, [], [])
    # Respects min_workers despite the concurrency limitation.
    assert to_launch == {"p2.8xlarge": 4}

    provider.create_node({}, {
        TAG_RAY_USER_NODE_TYPE: "p2.8xlarge",
        TAG_RAY_NODE_STATUS: STATUS_UNINITIALIZED
    }, 1)
    nodes = provider.non_terminated_nodes({})
    ips = provider.non_terminated_node_ips({})
    utilizations = {ip: {"GPU": 8} for ip in ips}
    launching_nodes = {"p2.8xlarge": 1}
    # Requires 41 p2.8xls (currently 1 pending, 1 launching, 0 running).
    demands = [{"GPU": 8}] * (len(utilizations) + 40)
    to_launch = scheduler.get_nodes_to_launch(nodes, launching_nodes, demands,
                                              utilizations)
    # Enforces a max launch of 5 when fewer than 5 nodes are running;
    # 2 are already pending/launching, so only 3 are launched.
    assert to_launch == {"p2.8xlarge": 3}

    provider.create_node({}, {
        TAG_RAY_USER_NODE_TYPE: "p2.8xlarge",
        TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE
    }, 8)
    nodes = provider.non_terminated_nodes({})
    ips = provider.non_terminated_node_ips({})
    utilizations = {ip: {"GPU": 8} for ip in ips}
    launching_nodes = {"p2.8xlarge": 1}
    # Requires 17 p2.8xls (currently 1 pending, 1 launching, 8 running).
    demands = [{"GPU": 8}] * (len(utilizations) + 15)
    to_launch = scheduler.get_nodes_to_launch(nodes, launching_nodes, demands,
                                              utilizations)
    # We are allowed to launch up to 8 more since 8 are running.
    # We already have 2 pending/launching, so only 6 remain.
    assert to_launch == {"p2.8xlarge": 6}


class LoadMetricsTest(unittest.TestCase):
    def testResourceDemandVector(self):
        lm = LoadMetrics()