[hotfix][autoscaler] Request resources refactor2 (#12661)

* prepare for head node

* move command runner interface outside _private

* remove space

* Eric

* flake

* min_workers in multi node type

* fixing edge cases

* eric not idle

* fix target_workers to consider min_workers of node types

* idle timeout

* minor

* minor fix

* test

* lint

* eric v2

* eric 3

* min_workers constraint before bin packing

* Update resource_demand_scheduler.py

* Revert "Update resource_demand_scheduler.py"

This reverts commit 818a63a2c86d8437b3ef21c5035d701c1d1127b5.

* reducing diff

* make get_nodes_to_launch return a dict

* merge

* weird merge fix

* auto fill instance types for AWS

* Alex/Eric

* Update doc/source/cluster/autoscaling.rst

* merge autofill and input from user

* logger.exception

* make the yaml use the default autofill

* docs Eric

* remove test_autoscaler_yaml from windows tests

* lets try changing the test a bit

* return test

* lets see

* edward

* Limit max launch concurrency

* commenting frac TODO

* move to resource demand scheduler

* use STATUS UP TO DATE

* Eric

* make logger of gc freed refs debug instead of info

* add cluster name to docker mount prefix directory

* grrR

* fix tests

* moving docker directory to sdk

* move the import to prevent circular dependency

* small fix

* ian

* fix max launch concurrency bug to assume failing nodes as pending and consider only load_metric's connected nodes as running

* small fix

* request_resources -> min workers

* test fixes

* add race condition tests

* Eric

* fixes

* semi final

* semi final

* lint

* lint

Co-authored-by: Ameer Haj Ali <ameerhajali@ameers-mbp.lan>
Co-authored-by: Alex Wu <alex@anyscale.io>
Co-authored-by: Alex Wu <itswu.alex@gmail.com>
Co-authored-by: Eric Liang <ekhliang@gmail.com>
Co-authored-by: Ameer Haj Ali <ameerhajali@Ameers-MacBook-Pro.local>
Ameer Haj Ali authored on 2020-12-09 04:41:30 +02:00; committed by GitHub
parent 343b479ae2
commit a4dbb271bd
4 changed files with 595 additions and 91 deletions
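For context, the user-facing entry point whose scaling semantics this refactor reworks is request_resources(). A minimal, hedged usage sketch; the ray.autoscaler.sdk import path and exact signature should be checked against the Ray version in use:

import ray
from ray.autoscaler.sdk import request_resources

ray.init(address="auto")  # connect to a running autoscaling cluster
# Ask the autoscaler to size the cluster so that these bundles could be
# scheduled, independent of the current resource demand.
request_resources(bundles=[{"CPU": 2, "GPU": 1}] * 4)
# Passing an empty list clears the request (mirroring the
# autoscaler.request_resources([]) call exercised in the tests below),
# letting idle nodes be reclaimed again.
request_resources(bundles=[])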


@ -13,16 +13,17 @@ import collections
from ray.experimental.internal_kv import _internal_kv_put, \
_internal_kv_initialized
from ray.autoscaler.tags import (TAG_RAY_LAUNCH_CONFIG, TAG_RAY_RUNTIME_CONFIG,
TAG_RAY_FILE_MOUNTS_CONTENTS,
TAG_RAY_NODE_STATUS, TAG_RAY_NODE_KIND,
TAG_RAY_USER_NODE_TYPE, STATUS_UP_TO_DATE,
NODE_KIND_WORKER, NODE_KIND_UNMANAGED)
from ray.autoscaler.tags import (
TAG_RAY_LAUNCH_CONFIG, TAG_RAY_RUNTIME_CONFIG,
TAG_RAY_FILE_MOUNTS_CONTENTS, TAG_RAY_NODE_STATUS, TAG_RAY_NODE_KIND,
TAG_RAY_USER_NODE_TYPE, STATUS_UP_TO_DATE, NODE_KIND_WORKER,
NODE_KIND_UNMANAGED, NODE_KIND_HEAD)
from ray.autoscaler._private.providers import _get_node_provider
from ray.autoscaler._private.updater import NodeUpdaterThread
from ray.autoscaler._private.node_launcher import NodeLauncher
from ray.autoscaler._private.resource_demand_scheduler import \
ResourceDemandScheduler, NodeType, NodeID
get_bin_pack_residual, ResourceDemandScheduler, NodeType, NodeID, NodeIP, \
ResourceDict
from ray.autoscaler._private.util import ConcurrentCounter, validate_config, \
with_head_node_ip, hash_launch_conf, hash_runtime_conf, \
DEBUG_AUTOSCALING_STATUS, DEBUG_AUTOSCALING_ERROR
@ -164,27 +165,35 @@ class StandardAutoscaler:
last_used = self.load_metrics.last_used_time_by_ip
horizon = now - (60 * self.config["idle_timeout_minutes"])
nodes_to_terminate = []
nodes_to_terminate: Dict[NodeID, bool] = []
node_type_counts = collections.defaultdict(int)
# Sort based on last used to make sure to keep min_workers that
# were most recently used. Otherwise, _keep_min_workers_of_node_type
# might keep a node that should be terminated.
for node_id in self._sort_based_on_last_used(nodes, last_used):
sorted_node_ids = self._sort_based_on_last_used(nodes, last_used)
# Don't terminate nodes needed by request_resources()
nodes_allowed_to_terminate: Dict[NodeID, bool] = {}
if self.resource_demand_vector:
nodes_allowed_to_terminate = self._get_nodes_allowed_to_terminate(
sorted_node_ids)
for node_id in sorted_node_ids:
# Make sure to not kill idle node types if the number of workers
# of that type is lower/equal to the min_workers of that type.
if self._keep_min_worker_of_node_type(
node_id,
node_type_counts) and self.launch_config_ok(node_id):
# of that type is lower/equal to the min_workers of that type
# or it is needed for request_resources().
if (self._keep_min_worker_of_node_type(node_id, node_type_counts)
or not nodes_allowed_to_terminate.get(
node_id, True)) and self.launch_config_ok(node_id):
continue
node_ip = self.provider.internal_ip(node_id)
if node_ip in last_used and last_used[node_ip] < horizon:
logger.info("StandardAutoscaler: "
"{}: Terminating idle node".format(node_id))
"{}: Terminating idle node.".format(node_id))
nodes_to_terminate.append(node_id)
elif not self.launch_config_ok(node_id):
logger.info("StandardAutoscaler: "
"{}: Terminating outdated node".format(node_id))
"{}: Terminating outdated node.".format(node_id))
nodes_to_terminate.append(node_id)
if nodes_to_terminate:
@ -198,7 +207,7 @@ class StandardAutoscaler:
len(nodes_to_terminate)) > self.config["max_workers"] and nodes:
to_terminate = nodes.pop()
logger.info("StandardAutoscaler: "
"{}: Terminating unneeded node".format(to_terminate))
"{}: Terminating unneeded node.".format(to_terminate))
nodes_to_terminate.append(to_terminate)
if nodes_to_terminate:
@ -226,15 +235,23 @@ class StandardAutoscaler:
if not updater.is_alive():
completed.append(node_id)
if completed:
nodes_to_terminate: List[NodeID] = []
for node_id in completed:
if self.updaters[node_id].exitcode == 0:
self.num_successful_updates[node_id] += 1
# Mark the node as active to prevent the node recovery
# logic immediately trying to restart Ray on the new node.
self.load_metrics.mark_active(
self.provider.internal_ip(node_id))
else:
logger.error(f"StandardAutoscaler: {node_id}: Terminating "
"failed to setup/initialize node.")
nodes_to_terminate.append(node_id)
self.num_failed_updates[node_id] += 1
del self.updaters[node_id]
# Mark the node as active to prevent the node recovery logic
# immediately trying to restart Ray on the new node.
self.load_metrics.mark_active(self.provider.internal_ip(node_id))
if nodes_to_terminate:
self.provider.terminate_nodes(nodes_to_terminate)
nodes = self.workers()
self.log_info_string(nodes)
@ -266,14 +283,16 @@ class StandardAutoscaler:
last_used: Dict[str, float]) -> List[NodeID]:
"""Sort the nodes based on the last time they were used.
The first item in the return list is the least recently used.
The first item in the return list is the most recently used.
"""
updated_last_used = copy.deepcopy(last_used)
now = time.time()
# Add the unconnected nodes as the least recently used (the end of
# the list). This prioritizes connected nodes.
least_recently_used = -1
for node_id in nodes:
node_ip = self.provider.internal_ip(node_id)
if node_ip not in updated_last_used:
updated_last_used[node_ip] = now
updated_last_used[node_ip] = least_recently_used
def last_time_used(node_id: NodeID):
node_ip = self.provider.internal_ip(node_id)
@ -281,9 +300,86 @@ class StandardAutoscaler:
return sorted(nodes, key=last_time_used, reverse=True)
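To illustrate the ordering established above, a toy standalone sketch (made-up IPs and timestamps, not the autoscaler's exact code): unconnected nodes are assigned -1 so they sort to the tail and become the first candidates for termination.

last_used = {"172.0.0.1": 1000.0, "172.0.0.2": 2000.0}  # connected nodes only
node_ips = ["172.0.0.1", "172.0.0.2", "172.0.0.3"]  # 172.0.0.3 never connected
ordered = sorted(node_ips, key=lambda ip: last_used.get(ip, -1), reverse=True)
print(ordered)
# ['172.0.0.2', '172.0.0.1', '172.0.0.3'] -> most recently used first,
# unconnected node last (preferred for termination when downscaling).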
def _keep_min_worker_of_node_type(self, node_id: NodeID,
node_type_counts: Dict[NodeType, int]):
"""Returns if workers of node_type should be terminated.
def _get_nodes_allowed_to_terminate(
self, sorted_node_ids: List[NodeID]) -> Dict[NodeID, bool]:
# TODO(ameer): try merging this with resource_demand_scheduler
# code responsible for adding nodes for request_resources().
"""Returns the nodes allowed to terminate for request_resources().
Args:
sorted_node_ids: the node ids sorted based on last used (LRU last).
Returns:
nodes_allowed_to_terminate: whether the node id is allowed to
terminate or not.
"""
nodes_allowed_to_terminate: Dict[NodeID, bool] = {}
head_node_resources: ResourceDict = copy.deepcopy(
self.available_node_types[self.config["head_node_type"]][
"resources"])
if not head_node_resources:
# Legacy yaml might include {} in the resources field.
# TODO(ameer): this is somewhat duplicated in
# resource_demand_scheduler.py.
head_id: List[NodeID] = self.provider.non_terminated_nodes({
TAG_RAY_NODE_KIND: NODE_KIND_HEAD
})
if head_id:
head_ip = self.provider.internal_ip(head_id[0])
static_nodes: Dict[
NodeIP,
ResourceDict] = \
self.load_metrics.get_static_node_resources_by_ip()
head_node_resources = static_nodes[head_ip]
else:
head_node_resources = {}
max_node_resources: List[ResourceDict] = [head_node_resources]
resource_demand_vector_worker_node_ids = []
# Get max resources on all the non terminated nodes.
for node_id in sorted_node_ids:
tags = self.provider.node_tags(node_id)
if TAG_RAY_USER_NODE_TYPE in tags:
node_type = tags[TAG_RAY_USER_NODE_TYPE]
node_resources: ResourceDict = copy.deepcopy(
self.available_node_types[node_type]["resources"])
if not node_resources:
# Legacy yaml might include {} in the resources field.
static_nodes: Dict[
NodeIP,
ResourceDict] = \
self.load_metrics.get_static_node_resources_by_ip()
node_ip = self.provider.internal_ip(node_id)
node_resources = static_nodes.get(node_ip, {})
max_node_resources.append(node_resources)
resource_demand_vector_worker_node_ids.append(node_id)
# Since it is sorted based on last used, we "keep" nodes that are
# most recently used when we binpack. We assume get_bin_pack_residual
# is following the given order here.
used_resource_requests: List[ResourceDict]
_, used_resource_requests = \
get_bin_pack_residual(max_node_resources,
self.resource_demand_vector)
# Remove the first entry (the head node).
max_node_resources.pop(0)
# Remove the first entry (the head node).
used_resource_requests.pop(0)
for i, node_id in enumerate(resource_demand_vector_worker_node_ids):
if used_resource_requests[i] == max_node_resources[i] \
and max_node_resources[i]:
# No resources of the node were needed for request_resources().
# max_node_resources[i] is an empty dict for legacy yamls
# before the node is connected.
nodes_allowed_to_terminate[node_id] = True
else:
nodes_allowed_to_terminate[node_id] = False
return nodes_allowed_to_terminate
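The decision above amounts to packing the request_resources() vector onto the maximum capacities of the head node and the workers (most recently used first) and keeping every worker that absorbed part of the request. A minimal sketch with a naive first-fit packer, not Ray's get_bin_pack_residual; node capacities and demands are made up:

from typing import Dict, List

def first_fit_pack(nodes: List[Dict[str, float]],
                   demands: List[Dict[str, float]]) -> List[Dict[str, float]]:
    """Place each demand on the first node with enough remaining capacity."""
    remaining = [dict(n) for n in nodes]
    for demand in demands:
        for node in remaining:
            if all(node.get(k, 0) >= v for k, v in demand.items()):
                for k, v in demand.items():
                    node[k] -= v
                break
    return remaining

# Head node (index 0) plus two workers, most recently used first.
max_resources = [{"CPU": 4}, {"CPU": 2}, {"CPU": 2}]
requested = [{"CPU": 1}] * 5  # request_resources() vector
leftover = first_fit_pack(max_resources, requested)
# A worker whose leftover equals its full (non-empty) capacity absorbed no
# requests, so it is allowed to terminate once idle; the head node at
# index 0 is skipped.
allowed = [left == full and bool(full)
           for left, full in zip(leftover[1:], max_resources[1:])]
print(allowed)  # [False, True]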
def _keep_min_worker_of_node_type(
self, node_id: NodeID,
node_type_counts: Dict[NodeType, int]) -> bool:
"""Returns if workers of node_type can be terminated.
The worker cannot be terminated to respect min_workers constraint.
Receives the counters of running nodes so far and determines if idle
node_id should be terminated or not. It also updates the counters
@ -293,7 +389,7 @@ class StandardAutoscaler:
node_type_counts(Dict[NodeType, int]): The non_terminated node
types counted so far.
Returns:
bool: if workers of node_types should be terminated or not.
bool: if workers of node_types can be terminated or not.
"""
tags = self.provider.node_tags(node_id)
if TAG_RAY_USER_NODE_TYPE in tags:


@ -135,24 +135,6 @@ class ResourceDemandScheduler:
this set of resources. This differs from resources_demands in
that we don't take into account existing usage.
"""
# If the user is using request_resources() API, calculate the remaining
# delta resources required to meet their requested cluster size.
if ensure_min_cluster_size is not None:
used_resources = []
for ip, max_res in max_resources_by_ip.items():
res = copy.deepcopy(max_res)
_inplace_subtract(res, unused_resources_by_ip.get(ip, {}))
used_resources.append(res)
# Example: user requests 1000 CPUs, but the cluster is currently
# 500 CPUs in size with 250 used. Then, the delta is 750 CPUs that
# we need to fit to get the cluster to scale to 1000.
resource_requests, _ = get_bin_pack_residual(
used_resources, ensure_min_cluster_size)
resource_demands += resource_requests
else:
resource_requests = []
if self.is_legacy_yaml():
# When using legacy yaml files we need to infer the head & worker
# node resources from the static node resources from LoadMetrics.
@ -166,9 +148,12 @@ class ResourceDemandScheduler:
logger.info("Cluster resources: {}".format(node_resources))
logger.info("Node counts: {}".format(node_type_counts))
# Step 2: add nodes to add to satisfy min_workers for each type
node_resources, node_type_counts, min_workers_nodes_to_add = \
(node_resources,
node_type_counts,
adjusted_min_workers) = \
_add_min_workers_nodes(
node_resources, node_type_counts, self.node_types)
node_resources, node_type_counts, self.node_types,
self.max_workers, ensure_min_cluster_size)
# Step 3: add nodes for strict spread groups
logger.info(f"Placement group demands: {pending_placement_groups}")
@ -180,8 +165,16 @@ class ResourceDemandScheduler:
not self.node_types[NODE_TYPE_LEGACY_WORKER]["resources"]:
# Need to launch worker nodes to later infer their
# resources.
# We add request_resources() demands here to make sure we launch
# a single worker sometimes even if min_workers = 0 and resource
# demands is empty.
if ensure_min_cluster_size:
request_resources_demands = ensure_min_cluster_size
else:
request_resources_demands = []
return self._legacy_worker_node_to_launch(
nodes, launching_nodes, node_resources, resource_demands)
nodes, launching_nodes, node_resources,
resource_demands + request_resources_demands)
placement_group_nodes_to_add, node_resources, node_type_counts = \
self.reserve_and_allocate_spread(
strict_spreads, node_resources, node_type_counts)
@ -194,20 +187,15 @@ class ResourceDemandScheduler:
logger.info("Unfulfilled demands: {}".format(unfulfilled))
# Add 1 to account for the head node.
max_to_add = self.max_workers + 1 - sum(node_type_counts.values())
if resource_requests:
nodes_to_add_based_on_requests = get_nodes_for(
self.node_types, node_type_counts, max_to_add,
resource_requests)
else:
nodes_to_add_based_on_requests = {}
nodes_to_add_based_on_demand = get_nodes_for(
self.node_types, node_type_counts, max_to_add, unfulfilled)
# Merge nodes to add based on demand and nodes to add based on
# min_workers constraint. We add them because nodes to add based on
# demand was calculated after the min_workers constraint was respected.
total_nodes_to_add = {}
for node_type in self.node_types:
nodes_to_add = (min_workers_nodes_to_add.get(
nodes_to_add = (adjusted_min_workers.get(
node_type, 0) + placement_group_nodes_to_add.get(node_type, 0)
+ nodes_to_add_based_on_demand.get(node_type, 0))
if nodes_to_add > 0:
@ -216,7 +204,7 @@ class ResourceDemandScheduler:
# Limit the number of concurrent launches
total_nodes_to_add = self._get_concurrent_resource_demand_to_launch(
total_nodes_to_add, unused_resources_by_ip.keys(), nodes,
launching_nodes, nodes_to_add_based_on_requests)
launching_nodes, adjusted_min_workers)
logger.info("Node requests: {}".format(total_nodes_to_add))
return total_nodes_to_add
@ -294,7 +282,7 @@ class ResourceDemandScheduler:
connected_nodes: List[NodeIP],
non_terminated_nodes: List[NodeID],
pending_launches_nodes: Dict[NodeType, int],
nodes_to_add_based_on_requests: Dict[NodeType, int],
adjusted_min_workers: Dict[NodeType, int],
) -> Dict[NodeType, int]:
"""Updates the max concurrent resources to launch for each node type.
@ -314,9 +302,10 @@ class ResourceDemandScheduler:
connected_nodes: Running nodes (from LoadMetrics).
non_terminated_nodes: Non terminated nodes (pending/running).
pending_launches_nodes: Nodes that are in the launch queue.
nodes_to_add_based_on_requests: Nodes to launch to satisfy
request_resources(). This overrides the launch limits since the
user is hinting to immediately scale up to this size.
adjusted_min_workers: Nodes to launch to satisfy
min_workers and request_resources(). This overrides the launch
limits since the user is hinting to immediately scale up to
this size.
Returns:
Dict[NodeType, int]: Maximum number of nodes to launch for each
node type.
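A rough sketch of how such a per-type cap can be applied; the constant 5 and the upscaling_speed scaling are illustrative assumptions rather than this commit's exact formula:

def cap_concurrent_launches(nodes_to_launch, pending_counts, running_counts,
                            adjusted_min_workers, upscaling_speed=1.0):
    """Limit launches per node type, but never below what min_workers or
    request_resources() (adjusted_min_workers) requires."""
    capped = {}
    for node_type, wanted in nodes_to_launch.items():
        max_allowed_pending = max(
            5, int(upscaling_speed * running_counts.get(node_type, 0)))
        total_pending = pending_counts.get(node_type, 0)
        upper_bound = max(
            max_allowed_pending - total_pending,
            # min_workers / request_resources() override the limit.
            adjusted_min_workers.get(node_type, 0))
        if upper_bound > 0:
            capped[node_type] = min(upper_bound, wanted)
    return capped

print(cap_concurrent_launches({"gpu": 20}, {"gpu": 4}, {"gpu": 2}, {"gpu": 0}))
# {'gpu': 1}: with 4 already pending, only 5 - 4 = 1 more launch is allowed.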
@ -338,13 +327,9 @@ class ResourceDemandScheduler:
upper_bound = max(
max_allowed_pending_nodes - total_pending_nodes,
# Allow more nodes if this is to respect min_workers.
self.node_types[node_type].get("min_workers", 0) -
total_pending_nodes - running_nodes[node_type],
# Allow more nodes from request_resources API.
nodes_to_add_based_on_requests.get(node_type,
0) - total_pending_nodes)
# Allow more nodes if this is to respect min_workers or
# request_resources().
adjusted_min_workers.get(node_type, 0))
if upper_bound > 0:
updated_nodes_to_launch[node_type] = min(
@ -504,21 +489,26 @@ def _node_type_counts_to_node_resources(
def _add_min_workers_nodes(
node_resources: List[ResourceDict],
node_type_counts: Dict[NodeType, int],
node_types: Dict[NodeType, NodeTypeConfigDict],
node_types: Dict[NodeType, NodeTypeConfigDict], max_workers: int,
ensure_min_cluster_size: List[ResourceDict]
) -> (List[ResourceDict], Dict[NodeType, int], Dict[NodeType, int]):
"""Updates resource demands to respect the min_workers constraint.
"""Updates resource demands to respect the min_workers and
request_resources() constraints.
Args:
node_resources: Resources of existing nodes already launched/pending.
node_type_counts: Counts of existing nodes already launched/pending.
node_types: Node types config.
max_workers: global max_workers constraint.
ensure_min_cluster_size: resource demands from request_resources().
Returns:
node_resources: The updated node resources after adding min_workers
constraint per node type.
and request_resources() constraints per node type.
node_type_counts: The updated node counts after adding min_workers
constraint per node type.
total_nodes_to_add: The nodes to add to respect min_workers constraint.
and request_resources() constraints per node type.
total_nodes_to_add_dict: The nodes to add to respect min_workers and
request_resources() constraints.
"""
total_nodes_to_add_dict = {}
for node_type, config in node_types.items():
@ -528,10 +518,41 @@ def _add_min_workers_nodes(
if existing < target:
total_nodes_to_add_dict[node_type] = target - existing
node_type_counts[node_type] = target
available = copy.deepcopy(node_types[node_type]["resources"])
node_resources.extend(
[available] * total_nodes_to_add_dict[node_type])
node_resources.extend([
copy.deepcopy(node_types[node_type]["resources"])
for _ in range(total_nodes_to_add_dict[node_type])
])
if ensure_min_cluster_size:
max_to_add = max_workers + 1 - sum(node_type_counts.values())
max_node_resources = []
# Fit request_resources() on all the resources as if they are idle.
for node_type in node_type_counts:
max_node_resources.extend([
copy.deepcopy(node_types[node_type]["resources"])
for _ in range(node_type_counts[node_type])
])
# Get the unfulfilled to ensure min cluster size.
resource_requests_unfulfilled, _ = get_bin_pack_residual(
max_node_resources, ensure_min_cluster_size)
# Get the nodes to meet the unfulfilled.
nodes_to_add_request_resources = get_nodes_for(
node_types, node_type_counts, max_to_add,
resource_requests_unfulfilled)
# Update the resources, counts and total nodes to add.
for node_type in nodes_to_add_request_resources:
nodes_to_add = nodes_to_add_request_resources.get(node_type, 0)
if nodes_to_add > 0:
node_type_counts[
node_type] = nodes_to_add + node_type_counts.get(
node_type, 0)
node_resources.extend([
copy.deepcopy(node_types[node_type]["resources"])
for _ in range(nodes_to_add)
])
total_nodes_to_add_dict[
node_type] = nodes_to_add + total_nodes_to_add_dict.get(
node_type, 0)
return node_resources, node_type_counts, total_nodes_to_add_dict
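A hedged usage sketch of the extended helper with a single hypothetical node type; the expected result assumes straightforward packing and is meant only to show the shape of the inputs and outputs:

node_types = {
    "cpu_worker": {
        "resources": {"CPU": 4},
        "min_workers": 1,
        "max_workers": 10,
    },
}
resources, counts, to_add = _add_min_workers_nodes(
    [],                  # no launched/pending node resources yet
    {},                  # no existing node counts
    node_types,
    10,                  # global max_workers
    [{"CPU": 4}] * 3,    # ensure_min_cluster_size from request_resources()
)
# Roughly expected: 1 node from min_workers plus 2 more so that the three
# CPU:4 bundles fit, i.e. to_add == {"cpu_worker": 3}.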
@ -623,7 +644,8 @@ def _utilization_score(node_resources: ResourceDict,
def get_bin_pack_residual(node_resources: List[ResourceDict],
resource_demands: List[ResourceDict],
strict_spread: bool = False) -> List[ResourceDict]:
strict_spread: bool = False
) -> (List[ResourceDict], List[ResourceDict]):
"""Return a subset of resource_demands that cannot fit in the cluster.
TODO(ekl): this currently does not guarantee the resources will be packed
@ -638,7 +660,7 @@ def get_bin_pack_residual(node_resources: List[ResourceDict],
placed on a different entry in `node_resources`.
Returns:
List[ResourceDict] the residual list resources that do not fit.
List[ResourceDict]: the residual list resources that do not fit.
List[ResourceDict]: The updated node_resources after the method.
"""


@ -537,6 +537,7 @@ class AutoscalingTest(unittest.TestCase):
self.provider = MockProvider()
self.provider.create_node({}, {TAG_RAY_NODE_KIND: "worker"}, 10)
runner = MockProcessRunner()
runner.respond_to_call("json .Config.Env", ["[]" for i in range(10)])
autoscaler = StandardAutoscaler(
config_path,
LoadMetrics(),
@ -558,6 +559,7 @@ class AutoscalingTest(unittest.TestCase):
config_path = self.write_config(SMALL_CLUSTER)
self.provider = MockProvider()
runner = MockProcessRunner()
runner.respond_to_call("json .Config.Env", ["[]" for i in range(11)])
lm = LoadMetrics()
autoscaler = StandardAutoscaler(
config_path,
@ -613,6 +615,70 @@ class AutoscalingTest(unittest.TestCase):
autoscaler.update()
self.waitForNodes(0)
def testLegacyYamlWithRequestResources(self):
"""Test when using legacy yamls request_resources() adds workers.
Makes sure that requested resources are added for legacy yamls when
necessary. So if requested resources for instance fit on the headnode
we don't add more nodes. But we add more nodes when they don't fit.
"""
config = SMALL_CLUSTER.copy()
config["min_workers"] = 0
config["max_workers"] = 100
config["idle_timeout_minutes"] = 0
config["upscaling_speed"] = 1
config_path = self.write_config(config)
self.provider = MockProvider()
self.provider.create_node({}, {
TAG_RAY_NODE_KIND: NODE_KIND_HEAD,
TAG_RAY_USER_NODE_TYPE: NODE_TYPE_LEGACY_HEAD
}, 1)
head_ip = self.provider.non_terminated_node_ips(
tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_HEAD}, )[0]
runner = MockProcessRunner()
runner.respond_to_call("json .Config.Env", ["[]" for i in range(10)])
lm = LoadMetrics()
lm.local_ip = head_ip
lm.update(head_ip, {"CPU": 1}, {"CPU": 1}, {})
autoscaler = StandardAutoscaler(
config_path,
lm,
max_launch_batch=5,
max_concurrent_launches=5,
max_failures=0,
process_runner=runner,
update_interval_s=0)
autoscaler.update()
# 1 head node.
self.waitForNodes(1)
autoscaler.request_resources([{"CPU": 1}])
autoscaler.update()
# Still 1 head node because the request_resources() demand fits on the head node.
self.waitForNodes(1)
autoscaler.request_resources([{"CPU": 1}] + [{"CPU": 2}] * 9)
autoscaler.update()
self.waitForNodes(2) # Adds a single worker to get its resources.
autoscaler.update()
self.waitForNodes(2) # Still 1 worker because its resources
# aren't known.
lm.update("172.0.0.1", {"CPU": 2}, {"CPU": 2}, {})
autoscaler.update()
self.waitForNodes(10) # 9 workers and 1 head node, scaled immediately.
lm.update(
"172.0.0.1", {"CPU": 2}, {"CPU": 2}, {},
waiting_bundles=[{
"CPU": 2
}] * 9,
infeasible_bundles=[{
"CPU": 1
}] * 1)
autoscaler.update()
# Make sure that if all the resources fit on the existing nodes,
# no more nodes are added.
self.waitForNodes(10)
def testAggressiveAutoscaling(self):
config = SMALL_CLUSTER.copy()
config["min_workers"] = 0
@ -896,6 +962,7 @@ class AutoscalingTest(unittest.TestCase):
config_path = self.write_config(SMALL_CLUSTER)
self.provider = MockProvider()
runner = MockProcessRunner()
runner.respond_to_call("json .Config.Env", ["[]" for i in range(10)])
lm = LoadMetrics()
autoscaler = StandardAutoscaler(
config_path,
@ -1038,6 +1105,7 @@ class AutoscalingTest(unittest.TestCase):
self.provider = MockProvider()
lm = LoadMetrics()
runner = MockProcessRunner()
runner.respond_to_call("json .Config.Env", ["[]" for i in range(5)])
autoscaler = StandardAutoscaler(
config_path,
lm,
@ -1087,12 +1155,22 @@ class AutoscalingTest(unittest.TestCase):
autoscaler.update()
assert autoscaler.pending_launches.value == 0
assert len(self.provider.non_terminated_nodes({})) == 3
# This actually remained 4 instead of 3, because the other 2 nodes
# are not connected and hence we rely more on the connected nodes for
# min_workers. Once the "pending" nodes show up as connected,
# the previously connected ones can be terminated.
assert len(self.provider.non_terminated_nodes({})) == 4
lm.last_used_time_by_ip["172.0.0.2"] = 0
lm.last_used_time_by_ip["172.0.0.3"] = 0
autoscaler.update()
assert autoscaler.pending_launches.value == 0
assert len(self.provider.non_terminated_nodes({})) == 1
# 2 nodes and not 1 because 1 is needed for min_workers and the other
# one is still not connected.
self.waitForNodes(2)
# when we connect it, we will see 1 node.
lm.last_used_time_by_ip["172.0.0.4"] = 0
autoscaler.update()
self.waitForNodes(1)
def testTargetUtilizationFraction(self):
config = SMALL_CLUSTER.copy()


@ -256,19 +256,19 @@ def test_add_min_workers_nodes():
}
assert _add_min_workers_nodes([],
{},
types) == \
types, None, None) == \
([{"CPU": 2}]*50+[{"GPU": 1}]*99999, {"m2.large": 50, "gpu": 99999},
{"m2.large": 50, "gpu": 99999})
assert _add_min_workers_nodes([{"CPU": 2}]*5,
{"m2.large": 5},
types) == \
types, None, None) == \
([{"CPU": 2}]*50+[{"GPU": 1}]*99999, {"m2.large": 50, "gpu": 99999},
{"m2.large": 45, "gpu": 99999})
assert _add_min_workers_nodes([{"CPU": 2}]*60,
{"m2.large": 60},
types) == \
types, None, None) == \
([{"CPU": 2}]*60+[{"GPU": 1}]*99999, {"m2.large": 60, "gpu": 99999},
{"gpu": 99999})
@ -279,7 +279,7 @@ def test_add_min_workers_nodes():
}] * 99999, {
"m2.large": 50,
"gpu": 99999
}, types) == ([{
}, types, None, None) == ([{
"CPU": 2
}] * 50 + [{
"GPU": 1
@ -288,17 +288,18 @@ def test_add_min_workers_nodes():
"gpu": 99999
}, {})
assert _add_min_workers_nodes([], {},
{"gpubla": types["gpubla"]}) == ([], {}, {})
assert _add_min_workers_nodes([], {}, {"gpubla": types["gpubla"]}, None,
None) == ([], {}, {})
types["gpubla"]["max_workers"] = 10
assert _add_min_workers_nodes([], {}, {"gpubla": types["gpubla"]}) == ([{
"GPU": 1
}] * 10, {
"gpubla": 10
}, {
"gpubla": 10
})
assert _add_min_workers_nodes([], {}, {"gpubla": types["gpubla"]}, None,
None) == ([{
"GPU": 1
}] * 10, {
"gpubla": 10
}, {
"gpubla": 10
})
def test_get_nodes_to_launch_with_min_workers():
@ -1406,7 +1407,7 @@ class AutoscalingTest(unittest.TestCase):
config_path = self.write_config(config)
self.provider = MockProvider()
runner = MockProcessRunner()
runner.respond_to_call("json .Config.Env", ["[]" for i in range(2)])
runner.respond_to_call("json .Config.Env", ["[]" for i in range(3)])
autoscaler = StandardAutoscaler(
config_path,
LoadMetrics(),
@ -1432,6 +1433,7 @@ class AutoscalingTest(unittest.TestCase):
sleep(0.1)
runner.assert_has_call(self.provider.mock_nodes[1].internal_ip,
"new_worker_setup_command")
runner.assert_not_has_call(self.provider.mock_nodes[1].internal_ip,
"setup_cmd")
runner.assert_not_has_call(self.provider.mock_nodes[1].internal_ip,
@ -1571,6 +1573,312 @@ class AutoscalingTest(unittest.TestCase):
self.waitForNodes(2)
assert self.provider.mock_nodes[1].node_type == "p2.8xlarge"
def testRequestResourcesIdleTimeout(self):
"""Test request_resources() with and without idle timeout."""
config = copy.deepcopy(MULTI_WORKER_CLUSTER)
config["max_workers"] = 4
config["idle_timeout_minutes"] = 0
config["available_node_types"] = {
"empty_node": {
"node_config": {},
"resources": {
"CPU": 2
},
"max_workers": 1
},
"def_worker": {
"node_config": {},
"resources": {
"CPU": 2,
"GPU": 1,
"WORKER": 1
},
"max_workers": 3
}
}
config_path = self.write_config(config)
self.provider = MockProvider()
runner = MockProcessRunner()
lm = LoadMetrics()
runner.respond_to_call("json .Config.Env", ["[]" for i in range(2)])
autoscaler = StandardAutoscaler(
config_path,
lm,
max_failures=0,
process_runner=runner,
update_interval_s=0)
autoscaler.update()
self.waitForNodes(0)
autoscaler.request_resources([{"CPU": 0.2, "WORKER": 1.0}])
autoscaler.update()
self.waitForNodes(1)
non_terminated_nodes = autoscaler.provider.non_terminated_nodes({})
assert len(non_terminated_nodes) == 1
node_id = non_terminated_nodes[0]
node_ip = autoscaler.provider.non_terminated_node_ips({})[0]
# A hack to check if the node was terminated when it shouldn't.
autoscaler.provider.mock_nodes[node_id].state = "unterminatable"
lm.update(
node_ip,
config["available_node_types"]["def_worker"]["resources"],
config["available_node_types"]["def_worker"]["resources"], {},
waiting_bundles=[{
"CPU": 0.2,
"WORKER": 1.0
}])
autoscaler.update()
# this fits on request_resources()!
self.waitForNodes(1)
autoscaler.request_resources([{"CPU": 0.2, "WORKER": 1.0}] * 2)
autoscaler.update()
self.waitForNodes(2)
autoscaler.request_resources([{"CPU": 0.2, "WORKER": 1.0}])
lm.update(
node_ip,
config["available_node_types"]["def_worker"]["resources"], {}, {},
waiting_bundles=[{
"CPU": 0.2,
"WORKER": 1.0
}])
autoscaler.update()
self.waitForNodes(2)
lm.update(
node_ip,
config["available_node_types"]["def_worker"]["resources"],
config["available_node_types"]["def_worker"]["resources"], {},
waiting_bundles=[{
"CPU": 0.2,
"WORKER": 1.0
}])
autoscaler.update()
# Still 2 as the second node has not sent a heartbeat.
self.waitForNodes(2)
# If node {node_id} was terminated at any time, its state will be set
# to terminated.
assert autoscaler.provider.mock_nodes[
node_id].state == "unterminatable"
lm.update(
"172.0.0.1",
config["available_node_types"]["def_worker"]["resources"],
config["available_node_types"]["def_worker"]["resources"], {},
waiting_bundles=[{
"CPU": 0.2,
"WORKER": 1.0
}])
autoscaler.update()
# Now it is 1 because it showed up in last used (heartbeat).
# The remaining one is 127.0.0.1.
self.waitForNodes(1)
def testRequestResourcesRaceConditionsLong(self):
"""Test request_resources(), race conditions & demands/min_workers.
Tests when request_resources() is called simultaneously with resource
demands and min_workers constraint in multiple orders upscaling and
downscaling.
"""
config = copy.deepcopy(MULTI_WORKER_CLUSTER)
config["max_workers"] = 4
config["idle_timeout_minutes"] = 0
config["available_node_types"] = {
"empty_node": {
"node_config": {},
"resources": {
"CPU": 2
},
"max_workers": 1
},
"def_worker": {
"node_config": {},
"resources": {
"CPU": 2,
"GPU": 1,
"WORKER": 1
},
"max_workers": 3,
"min_workers": 1
}
}
config_path = self.write_config(config)
self.provider = MockProvider()
runner = MockProcessRunner()
runner.respond_to_call("json .Config.Env", ["[]" for i in range(3)])
lm = LoadMetrics()
autoscaler = StandardAutoscaler(
config_path,
lm,
max_failures=0,
process_runner=runner,
update_interval_s=0)
autoscaler.request_resources([{"CPU": 0.2, "WORKER": 1.0}])
autoscaler.update()
# 1 min worker for both min_worker and request_resources()
self.waitForNodes(1)
non_terminated_nodes = autoscaler.provider.non_terminated_nodes({})
assert len(non_terminated_nodes) == 1
node_id = non_terminated_nodes[0]
node_ip = autoscaler.provider.non_terminated_node_ips({})[0]
# A hack to check if the node was terminated when it shouldn't.
autoscaler.provider.mock_nodes[node_id].state = "unterminatable"
lm.update(
node_ip,
config["available_node_types"]["def_worker"]["resources"],
config["available_node_types"]["def_worker"]["resources"], {},
waiting_bundles=[{
"CPU": 0.2,
"WORKER": 1.0
}])
autoscaler.request_resources([{"CPU": 0.2, "WORKER": 1.0}] * 2)
autoscaler.update()
# 2 requested_resource, 1 min worker, 1 free node -> 2 nodes total
self.waitForNodes(2)
autoscaler.request_resources([{"CPU": 0.2, "WORKER": 1.0}])
autoscaler.update()
# Still 2 because the second one is not connected and hence
# request_resources occupies the connected node.
self.waitForNodes(2)
autoscaler.request_resources([{"CPU": 0.2, "WORKER": 1.0}] * 3)
lm.update(
node_ip,
config["available_node_types"]["def_worker"]["resources"], {}, {},
waiting_bundles=[{
"CPU": 0.2,
"WORKER": 1.0
}] * 3)
autoscaler.update()
self.waitForNodes(3)
autoscaler.request_resources([])
lm.update("172.0.0.1",
config["available_node_types"]["def_worker"]["resources"],
config["available_node_types"]["def_worker"]["resources"],
{})
lm.update("172.0.0.2",
config["available_node_types"]["def_worker"]["resources"],
config["available_node_types"]["def_worker"]["resources"],
{})
lm.update(node_ip,
config["available_node_types"]["def_worker"]["resources"],
{}, {})
autoscaler.update()
self.waitForNodes(1)
# If node {node_id} was terminated at any time, its state will be set
# to terminated.
assert autoscaler.provider.mock_nodes[
node_id].state == "unterminatable"
def testRequestResourcesRaceConditionWithMinWorker(self):
"""Test request_resources() with min_workers.
Tests when request_resources() is called simultaneously with adding
min_workers constraint.
"""
config = copy.deepcopy(MULTI_WORKER_CLUSTER)
config["available_node_types"] = {
"empty_node": {
"node_config": {},
"resources": {
"CPU": 2
},
"max_workers": 1
},
"def_worker": {
"node_config": {},
"resources": {
"CPU": 2,
"GPU": 1,
"WORKER": 1
},
"max_workers": 3,
"min_workers": 1
}
}
config_path = self.write_config(config)
self.provider = MockProvider()
runner = MockProcessRunner()
runner.respond_to_call("json .Config.Env", ["[]" for i in range(2)])
lm = LoadMetrics()
autoscaler = StandardAutoscaler(
config_path,
lm,
max_failures=0,
process_runner=runner,
update_interval_s=0)
autoscaler.request_resources([{"CPU": 2, "WORKER": 1.0}] * 2)
autoscaler.update()
# 2 min worker for both min_worker and request_resources(), not 3.
self.waitForNodes(2)
def testRequestResourcesRaceConditionWithResourceDemands(self):
"""Test request_resources() with resource_demands.
Tests when request_resources() is called simultaneously with resource
demands in multiple orders.
"""
config = copy.deepcopy(MULTI_WORKER_CLUSTER)
config["available_node_types"].update({
"empty_node": {
"node_config": {},
"resources": {
"CPU": 2,
"GPU": 1
},
"max_workers": 1
},
"def_worker": {
"node_config": {},
"resources": {
"CPU": 2,
"GPU": 1,
"WORKER": 1
},
"max_workers": 3,
}
})
config_path = self.write_config(config)
self.provider = MockProvider()
self.provider.create_node({}, {
TAG_RAY_NODE_KIND: "head",
TAG_RAY_USER_NODE_TYPE: "empty_node"
}, 1)
runner = MockProcessRunner()
runner.respond_to_call("json .Config.Env", ["[]" for i in range(2)])
lm = LoadMetrics()
autoscaler = StandardAutoscaler(
config_path,
lm,
max_failures=0,
process_runner=runner,
update_interval_s=0)
lm.update(
"127.0.0.0", {
"CPU": 2,
"GPU": 1
}, {"CPU": 2}, {},
waiting_bundles=[{
"CPU": 2
}])
autoscaler.request_resources([{"CPU": 2, "GPU": 1}] * 2)
autoscaler.update()
# 1 head, 1 worker.
self.waitForNodes(2)
lm.update(
"127.0.0.0", {
"CPU": 2,
"GPU": 1
}, {"CPU": 2}, {},
waiting_bundles=[{
"CPU": 2
}])
# make sure it stays consistent.
for _ in range(10):
autoscaler.update()
self.waitForNodes(2)
if __name__ == "__main__":
import sys