[autoscaler] Fewer non_terminated_nodes calls (#20359)

non_terminated_nodes calls are expensive for some node provider implementations.

This PR refactors autoscaler._update() so that each update issues at most one non_terminated_nodes call.
Conceptually, the change is that the autoscaler only needs a consistent view of the world once per update interval.

The structure of an autoscaler update is now (see the sketch below):

1. Call non_terminated_nodes to update internal state.
2. Update autoscaler status strings.
3. Terminate nodes we don't need, removing them from internal state as we go.
4. Run node updaters if needed.
5. Get nodes to launch based on internal state.
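
Below is a condensed sketch of the new StandardAutoscaler._update() flow, using the method names visible in the diffs further down; error handling, Prometheus metrics, and the read-only provider check are elided, so treat it as illustrative rather than the exact implementation:

    def _update(self):
        now = time.time()
        # (1) The one non_terminated_nodes call for this update; all later
        # steps read from this snapshot instead of re-querying the provider.
        self.non_terminated_nodes = NonTerminatedNodes(self.provider)
        self.load_metrics.prune_active_ips(active_ips=[
            self.provider.internal_ip(node_id)
            for node_id in self.non_terminated_nodes.all_node_ids
        ])
        # (2) Status strings come from the freshly fetched snapshot.
        logger.info(self.info_string())
        # (3) Terminating nodes removes them from the snapshot in place
        # (NonTerminatedNodes.remove_terminating_nodes), so no re-query.
        self.terminate_nodes_to_enforce_config_constraints(now)
        # (4) Run node updaters (or reap/recover unhealthy nodes).
        if self.disable_node_updaters:
            self.terminate_unhealthy_nodes(now)
        else:
            self.process_completed_updates()
            self.update_nodes()
            self.attempt_to_recover_unhealthy_nodes(now)
        # (5) Launches are computed from the same internal state; the
        # remaining get_nodes_to_launch arguments are elided here.
        to_launch, unfulfilled = (
            self.resource_demand_scheduler.get_nodes_to_launch(
                self.non_terminated_nodes.all_node_ids, ...))
        self.launch_required_nodes(to_launch)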
There's a small operational difference introduced:
Previously -- after a node was created, its NodeUpdater thread was initiated immediately.
Now -- after a node is created, its NodeUpdater thread is initiated in the next autoscaler update.

This typically will not introduce latency, since the time to get SSH access (a few minutes) is much longer than the autoscaler update interval (5 seconds by default).

Along the way, I've removed the local_ip initialization parameter of LoadMetrics, because it was confusing and not useful (and caused some tests to fail).
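
For callers, the visible change is just the constructor; a minimal before/after sketch (the ip literal is only illustrative):

    # Before: the head node ip was threaded in at construction time,
    # with a fallback to services.get_node_ip_address() when omitted.
    lm = LoadMetrics(local_ip="172.0.0.0")

    # After: no ip at construction; the head node ip is looked up from
    # the NodeProvider at the points where it is actually used.
    lm = LoadMetrics()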
Dmitri Gekhtman 2021-11-21 12:22:24 -05:00 committed by GitHub
parent 36ee3fd46b
commit 0f70f40a2a
9 changed files with 335 additions and 207 deletions

@ -2,6 +2,7 @@ from collections import defaultdict, namedtuple, Counter
from typing import Any, Optional, Dict, List, Set, FrozenSet, Tuple, Union, \
Callable
import copy
from dataclasses import dataclass
import logging
import math
import operator
@ -19,6 +20,7 @@ try:
except ImportError:
MaxRetryError = None
from ray.autoscaler.node_provider import NodeProvider
from ray.autoscaler.tags import (
TAG_RAY_LAUNCH_CONFIG, TAG_RAY_RUNTIME_CONFIG,
TAG_RAY_FILE_MOUNTS_CONTENTS, TAG_RAY_NODE_STATUS, TAG_RAY_NODE_KIND,
@ -50,15 +52,57 @@ from six.moves import queue
logger = logging.getLogger(__name__)
# Status of a node e.g. "up-to-date", see ray/autoscaler/tags.py
NodeStatus = str
# Tuple of modified fields for the given node_id returned by should_update
# that will be passed into a NodeUpdaterThread.
UpdateInstructions = namedtuple(
"UpdateInstructions",
["node_id", "setup_commands", "ray_start_commands", "docker_config"])
AutoscalerSummary = namedtuple(
"AutoscalerSummary",
["active_nodes", "pending_nodes", "pending_launches", "failed_nodes"])
@dataclass
class AutoscalerSummary:
active_nodes: Dict[NodeType, int]
pending_nodes: List[Tuple[NodeIP, NodeType, NodeStatus]]
pending_launches: Dict[NodeType, int]
failed_nodes: List[Tuple[NodeIP, NodeType]]
class NonTerminatedNodes:
"""Class to extract and organize information on non-terminated nodes."""
def __init__(self, provider: NodeProvider):
# All non-terminated nodes
self.all_node_ids = provider.non_terminated_nodes({})
# Managed worker nodes (node kind "worker"):
self.worker_ids: List[NodeID] = []
# The head node (node kind "head")
self.head_id: Optional[NodeID] = None
for node in self.all_node_ids:
node_kind = provider.node_tags(node)[TAG_RAY_NODE_KIND]
if node_kind == NODE_KIND_WORKER:
self.worker_ids.append(node)
elif node_kind == NODE_KIND_HEAD:
self.head_id = node
# Note: For typical use-cases,
# self.all_node_ids == self.worker_ids + [self.head_id]
def remove_terminating_nodes(self,
terminating_nodes: List[NodeID]) -> None:
"""Remove nodes we're in the process of terminating from internal
state."""
def not_terminating(node):
return node not in terminating_nodes
self.worker_ids = list(filter(not_terminating, self.worker_ids))
self.all_node_ids = list(filter(not_terminating, self.all_node_ids))
# Whether a worker should be kept based on the min_workers and
# max_workers constraints.
@ -145,7 +189,6 @@ class StandardAutoscaler:
AutoscalerPrometheusMetrics()
self.resource_demand_scheduler = None
self.reset(errors_fatal=True)
self.head_node_ip = load_metrics.local_ip
self.load_metrics = load_metrics
self.max_failures = max_failures
@ -162,10 +205,11 @@ class StandardAutoscaler:
self.last_update_time = 0.0
self.update_interval_s = update_interval_s
# Tracks active worker nodes
self.workers = []
# Keeps track of pending and running nodes
self.non_terminated_nodes: Optional[NonTerminatedNodes] = None
# Tracks nodes scheduled for termination
self.nodes_to_terminate = []
self.nodes_to_terminate: List[NodeID] = []
# Disable NodeUpdater threads if true.
# Should be set to true in situations where another component, such as
@ -241,20 +285,39 @@ class StandardAutoscaler:
return
self.last_update_time = now
self.update_worker_list()
# Query the provider to update the list of non-terminated nodes
self.non_terminated_nodes = NonTerminatedNodes(self.provider)
# Update running nodes gauge
num_workers = len(self.non_terminated_nodes.worker_ids)
self.prom_metrics.running_workers.set(num_workers)
# Remove from LoadMetrics the ips unknown to the NodeProvider.
self.load_metrics.prune_active_ips([
self.provider.internal_ip(node_id) for node_id in self.all_workers
self.load_metrics.prune_active_ips(active_ips=[
self.provider.internal_ip(node_id)
for node_id in self.non_terminated_nodes.all_node_ids
])
# Update status strings
logger.info(self.info_string())
legacy_log_info_string(self, self.non_terminated_nodes.worker_ids)
if not self.provider.is_readonly():
self.terminate_nodes_to_enforce_config_constraints(now)
if self.disable_node_updaters:
self.terminate_unhealthy_nodes(now)
else:
self.process_completed_updates()
self.update_nodes()
self.attempt_to_recover_unhealthy_nodes(now)
self.set_prometheus_updater_data()
# Dict[NodeType, int], List[ResourceDict]
to_launch, unfulfilled = (
self.resource_demand_scheduler.get_nodes_to_launch(
self.provider.non_terminated_nodes(tag_filters={}),
self.non_terminated_nodes.all_node_ids,
self.pending_launches.breakdown(),
self.load_metrics.get_resource_demand_vector(),
self.load_metrics.get_resource_utilization(),
@ -267,17 +330,6 @@ class StandardAutoscaler:
if not self.provider.is_readonly():
self.launch_required_nodes(to_launch)
if self.disable_node_updaters:
self.terminate_unhealthy_nodes(now)
else:
self.process_completed_updates()
self.update_nodes()
self.attempt_to_recover_unhealthy_nodes(now)
self.set_prometheus_updater_data()
logger.info(self.info_string())
legacy_log_info_string(self, self.workers)
def terminate_nodes_to_enforce_config_constraints(self, now: float):
"""Terminates nodes to enforce constraints defined by the autoscaling
config.
@ -298,7 +350,7 @@ class StandardAutoscaler:
# were most recently used. Otherwise, _keep_min_workers_of_node_type
# might keep a node that should be terminated.
sorted_node_ids = self._sort_based_on_last_used(
self.workers, last_used)
self.non_terminated_nodes.worker_ids, last_used)
# Don't terminate nodes needed by request_resources()
nodes_not_allowed_to_terminate: FrozenSet[NodeID] = {}
@ -345,7 +397,8 @@ class StandardAutoscaler:
nodes_we_could_terminate.append(node_id)
# Terminate nodes if there are too many
num_extra_nodes_to_terminate = (len(self.workers) - len(
num_workers = len(self.non_terminated_nodes.worker_ids)
num_extra_nodes_to_terminate = (num_workers - len(
self.nodes_to_terminate) - self.config["max_workers"])
if num_extra_nodes_to_terminate > len(nodes_we_could_terminate):
@ -391,14 +444,20 @@ class StandardAutoscaler:
"""Terminate scheduled nodes and clean associated autoscaler state."""
if not self.nodes_to_terminate:
return
# Do Ray-internal preparation for termination
self.drain_nodes_via_gcs(self.nodes_to_terminate)
# Terminate the nodes
self.provider.terminate_nodes(self.nodes_to_terminate)
for node in self.nodes_to_terminate:
self.node_tracker.untrack(node)
self.prom_metrics.stopped_nodes.inc()
# Update internal node lists
self.non_terminated_nodes.remove_terminating_nodes(
self.nodes_to_terminate)
self.nodes_to_terminate = []
self.update_worker_list()
def drain_nodes_via_gcs(self, provider_node_ids_to_drain: List[NodeID]):
"""Send an RPC request to the GCS to drain (prepare for termination)
@ -480,7 +539,6 @@ class StandardAutoscaler:
if to_launch:
for node_type, count in to_launch.items():
self.launch_new_node(count, node_type=node_type)
self.update_worker_list()
def update_nodes(self):
"""Run NodeUpdaterThreads to run setup commands, sync files,
@ -492,7 +550,8 @@ class StandardAutoscaler:
# See https://github.com/ray-project/ray/pull/5903 for more info.
T = []
for node_id, setup_commands, ray_start_commands, docker_config in (
self.should_update(node_id) for node_id in self.workers):
self.should_update(node_id)
for node_id in self.non_terminated_nodes.worker_ids):
if node_id is not None:
resources = self._node_resources(node_id)
logger.debug(f"{node_id}: Starting new thread runner.")
@ -543,10 +602,9 @@ class StandardAutoscaler:
# during an update (for being idle after missing a heartbeat).
# Update the list of non-terminated workers.
self.update_worker_list()
for node_id in failed_nodes:
# Check if the node has already been terminated.
if node_id in self.workers:
if node_id in self.non_terminated_nodes.worker_ids:
self.schedule_node_termination(
node_id, "launch failed", logger.error)
else:
@ -648,18 +706,13 @@ class StandardAutoscaler:
# Legacy yaml might include {} in the resources field.
# TODO(ameer): this is somewhat duplicated in
# resource_demand_scheduler.py.
head_id: List[NodeID] = self.provider.non_terminated_nodes({
TAG_RAY_NODE_KIND: NODE_KIND_HEAD
})
if head_id:
head_ip = self.provider.internal_ip(head_id[0])
static_nodes: Dict[
NodeIP,
ResourceDict] = \
self.load_metrics.get_static_node_resources_by_ip()
head_node_resources = static_nodes.get(head_ip, {})
else:
head_node_resources = {}
static_nodes: Dict[
NodeIP,
ResourceDict] = \
self.load_metrics.get_static_node_resources_by_ip()
head_node_ip = self.provider.internal_ip(
self.non_terminated_nodes.head_id)
head_node_resources = static_nodes.get(head_node_ip, {})
max_node_resources: List[ResourceDict] = [head_node_resources]
resource_demand_vector_worker_node_ids = []
@ -909,7 +962,7 @@ class StandardAutoscaler:
"""Terminated nodes for which we haven't received a heartbeat on time.
These nodes are subsequently terminated.
"""
for node_id in self.workers:
for node_id in self.non_terminated_nodes.worker_ids:
node_status = self.provider.node_tags(node_id)[TAG_RAY_NODE_STATUS]
# We're not responsible for taking down
# nodes with pending or failed status:
@ -929,7 +982,7 @@ class StandardAutoscaler:
self.terminate_scheduled_nodes()
def attempt_to_recover_unhealthy_nodes(self, now):
for node_id in self.workers:
for node_id in self.non_terminated_nodes.worker_ids:
self.recover_if_needed(node_id, now)
def recover_if_needed(self, node_id, now):
@ -946,6 +999,8 @@ class StandardAutoscaler:
" (lost contact with raylet).",
quantity=1,
aggregate=operator.add)
head_node_ip = self.provider.internal_ip(
self.non_terminated_nodes.head_id)
updater = NodeUpdaterThread(
node_id=node_id,
provider_config=self.config["provider"],
@ -956,7 +1011,7 @@ class StandardAutoscaler:
initialization_commands=[],
setup_commands=[],
ray_start_commands=with_head_node_ip(
self.config["worker_start_ray_commands"], self.head_node_ip),
self.config["worker_start_ray_commands"], head_node_ip),
runtime_hash=self.runtime_hash,
file_mounts_contents_hash=self.file_mounts_contents_hash,
process_runner=self.process_runner,
@ -1032,6 +1087,8 @@ class StandardAutoscaler:
ip = self.provider.internal_ip(node_id)
node_type = self._get_node_type(node_id)
self.node_tracker.track(node_id, ip, node_type)
head_node_ip = self.provider.internal_ip(
self.non_terminated_nodes.head_id)
updater = NodeUpdaterThread(
node_id=node_id,
provider_config=self.config["provider"],
@ -1041,11 +1098,10 @@ class StandardAutoscaler:
file_mounts=self.config["file_mounts"],
initialization_commands=with_head_node_ip(
self._get_node_type_specific_fields(
node_id, "initialization_commands"), self.head_node_ip),
setup_commands=with_head_node_ip(setup_commands,
self.head_node_ip),
node_id, "initialization_commands"), head_node_ip),
setup_commands=with_head_node_ip(setup_commands, head_node_ip),
ray_start_commands=with_head_node_ip(ray_start_commands,
self.head_node_ip),
head_node_ip),
runtime_hash=self.runtime_hash,
file_mounts_contents_hash=self.file_mounts_contents_hash,
is_head_node=False,
@ -1090,21 +1146,6 @@ class StandardAutoscaler:
node_type))
count -= self.max_launch_batch
@property
def all_workers(self):
return self.workers + self.unmanaged_workers
def update_worker_list(self):
self.workers = self.provider.non_terminated_nodes(
tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER})
# Update running nodes gauge whenever we check workers
self.prom_metrics.running_workers.set(len(self.workers))
@property
def unmanaged_workers(self):
return self.provider.non_terminated_nodes(
tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_UNMANAGED})
def kill_workers(self):
logger.error("StandardAutoscaler: kill_workers triggered")
nodes = self.workers()
@ -1127,14 +1168,12 @@ class StandardAutoscaler:
Returns:
AutoscalerSummary: The summary.
"""
all_node_ids = self.provider.non_terminated_nodes(tag_filters={})
active_nodes = Counter()
pending_nodes = []
failed_nodes = []
non_failed = set()
for node_id in all_node_ids:
for node_id in self.non_terminated_nodes.all_node_ids:
ip = self.provider.internal_ip(node_id)
node_tags = self.provider.node_tags(node_id)
@ -1174,7 +1213,8 @@ class StandardAutoscaler:
pending_launches[node_type] = count
return AutoscalerSummary(
active_nodes=active_nodes,
# Convert active_nodes from counter to dict for later serialization
active_nodes=dict(active_nodes),
pending_nodes=pending_nodes,
pending_launches=pending_launches,
failed_nodes=failed_nodes)

@ -1,12 +1,13 @@
from collections import namedtuple, Counter
from collections import Counter
from dataclasses import dataclass
from functools import reduce
import logging
from numbers import Number
import time
from typing import Dict, List
from typing import Dict, List, Tuple
import numpy as np
import ray.ray_constants
import ray._private.services as services
from ray.autoscaler._private.constants import MEMORY_RESOURCE_UNIT_BYTES,\
AUTOSCALER_MAX_RESOURCE_DEMAND_VECTOR_SIZE
from ray._private.gcs_utils import PlacementGroupTableData
@ -16,10 +17,23 @@ from ray.core.generated.common_pb2 import PlacementStrategy
logger = logging.getLogger(__name__)
LoadMetricsSummary = namedtuple("LoadMetricsSummary", [
"head_ip", "usage", "resource_demand", "pg_demand", "request_demand",
"node_types"
])
# A Dict and the count of how many times it occurred.
# Refer to freq_of_dicts() below.
DictCount = Tuple[Dict, Number]
@dataclass
class LoadMetricsSummary:
# Map of resource name (e.g. "memory") to pair of (Used, Available) numbers
usage: Dict[str, Tuple[Number, Number]]
# Counts of demand bundles from task/actor demand.
# e.g. [({"CPU": 1}, 5), ({"GPU":1}, 2)]
resource_demand: List[DictCount]
# Counts of pending placement groups
pg_demand: List[DictCount]
# Counts of demand bundles requested by autoscaler.sdk.request_resources
request_demand: List[DictCount]
node_types: List[DictCount]
def add_resources(dict1: Dict[str, float],
@ -37,7 +51,7 @@ def add_resources(dict1: Dict[str, float],
def freq_of_dicts(dicts: List[Dict],
serializer=lambda d: frozenset(d.items()),
deserializer=dict):
deserializer=dict) -> DictCount:
"""Count a list of dictionaries (or unhashable types).
This is somewhat annoying because mutable data structures aren't hashable,
@ -45,7 +59,7 @@ def freq_of_dicts(dicts: List[Dict],
Args:
dicts (List[D]): A list of dictionaries to be counted.
serializer (D -> S): A custom serailization function. The output type S
serializer (D -> S): A custom serialization function. The output type S
must be hashable. The default serializer converts a dictionary into
a frozenset of KV pairs.
deserializer (S -> U): A custom deserialization function. See the
@ -71,15 +85,13 @@ class LoadMetrics:
can be removed.
"""
def __init__(self, local_ip=None):
def __init__(self):
self.last_used_time_by_ip = {}
self.last_heartbeat_time_by_ip = {}
self.static_resources_by_ip = {}
self.dynamic_resources_by_ip = {}
self.raylet_id_by_ip = {}
self.resource_load_by_ip = {}
self.local_ip = services.get_node_ip_address(
) if local_ip is None else local_ip
self.waiting_bundles = []
self.infeasible_bundles = []
self.pending_placement_groups = []
@ -149,7 +161,6 @@ class LoadMetrics:
active_ips (List[str]): The node ips known to the autoscaler.
"""
active_ips = set(active_ips)
active_ips.add(self.local_ip)
def prune(mapping, should_log):
unwanted_ips = set(mapping) - active_ips
@ -307,7 +318,6 @@ class LoadMetrics:
nodes_summary = freq_of_dicts(self.static_resources_by_ip.values())
return LoadMetricsSummary(
head_ip=self.local_ip,
usage=usage_dict,
resource_demand=summarized_demand_vector,
pg_demand=summarized_placement_groups,

@ -1,6 +1,7 @@
"""Autoscaler monitoring loop daemon."""
import argparse
from dataclasses import asdict
import logging.handlers
import os
import sys
@ -26,8 +27,6 @@ from ray.autoscaler._private.prom_metrics import AutoscalerPrometheusMetrics
from ray.autoscaler._private.load_metrics import LoadMetrics
from ray.autoscaler._private.constants import \
AUTOSCALER_MAX_RESOURCE_DEMAND_VECTOR_SIZE
from ray.autoscaler._private.fake_multi_node.node_provider import \
FAKE_HEAD_NODE_ID
from ray.autoscaler._private.util import DEBUG_AUTOSCALING_STATUS, \
DEBUG_AUTOSCALING_ERROR, format_readonly_node_type
@ -168,10 +167,7 @@ class Monitor:
head_node_ip = redis_address.split(":")[0]
self.redis_address = redis_address
self.redis_password = redis_password
if os.environ.get("RAY_FAKE_CLUSTER"):
self.load_metrics = LoadMetrics(local_ip=FAKE_HEAD_NODE_ID)
else:
self.load_metrics = LoadMetrics(local_ip=head_node_ip)
self.load_metrics = LoadMetrics()
self.last_avail_resources = None
self.event_summarizer = EventSummarizer()
self.prefix_cluster_info = prefix_cluster_info
@ -319,7 +315,7 @@ class Monitor:
self.update_resource_requests()
self.update_event_summary()
status = {
"load_metrics_report": self.load_metrics.summary()._asdict(),
"load_metrics_report": asdict(self.load_metrics.summary()),
"time": time.time(),
"monitor_pid": os.getpid()
}
@ -328,8 +324,7 @@ class Monitor:
if self.autoscaler:
# Only used to update the load metrics for the autoscaler.
self.autoscaler.update()
status[
"autoscaler_report"] = self.autoscaler.summary()._asdict()
status["autoscaler_report"] = asdict(self.autoscaler.summary())
for msg in self.event_summarizer.summary():
logger.info("{}{}".format(

@ -38,8 +38,6 @@ class NodeLauncher(threading.Thread):
node_type: Optional[str]):
if self.node_types:
assert node_type, node_type
worker_filter = {TAG_RAY_NODE_KIND: NODE_KIND_WORKER}
before = self.provider.non_terminated_nodes(tag_filters=worker_filter)
# The `worker_nodes` field is deprecated in favor of per-node-type
# node_configs. We allow it for backwards-compatibility.
@ -76,9 +74,6 @@ class NodeLauncher(threading.Thread):
# second create time 4 times.
self.prom_metrics.worker_create_node_time.observe(launch_time)
self.prom_metrics.started_nodes.inc(count)
after = self.provider.non_terminated_nodes(tag_filters=worker_filter)
if set(after).issubset(before):
self.log("No new nodes reported after node creation.")
def run(self):
while True:

@ -15,6 +15,7 @@ from numbers import Real
from typing import Dict
from typing import List
from typing import Optional
from typing import Tuple
from ray.autoscaler.node_provider import NodeProvider
from ray._private.gcs_utils import PlacementGroupTableData
@ -60,6 +61,20 @@ class ResourceDemandScheduler:
self.head_node_type = head_node_type
self.upscaling_speed = upscaling_speed
def _get_head_and_workers(
self, nodes: List[NodeID]) -> Tuple[NodeID, List[NodeID]]:
"""Returns the head node's id and the list of all worker node ids,
given a list `nodes` of all node ids in the cluster.
"""
head_id, worker_ids = None, []
for node in nodes:
tags = self.provider.node_tags(node)
if tags[TAG_RAY_NODE_KIND] == NODE_KIND_HEAD:
head_id = node
elif tags[TAG_RAY_NODE_KIND] == NODE_KIND_WORKER:
worker_ids.append(node)
return head_id, worker_ids
def reset_config(self,
provider: NodeProvider,
node_types: Dict[NodeType, NodeTypeConfigDict],
@ -161,7 +176,8 @@ class ResourceDemandScheduler:
if self.is_legacy_yaml():
# When using legacy yaml files we need to infer the head & worker
# node resources from the static node resources from LoadMetrics.
self._infer_legacy_node_resources_if_needed(max_resources_by_ip)
self._infer_legacy_node_resources_if_needed(
nodes, max_resources_by_ip)
self._update_node_resources_from_runtime(nodes, max_resources_by_ip)
@ -265,8 +281,9 @@ class ResourceDemandScheduler:
workers, it returns max(1, min_workers) worker nodes from which we
later calculate the node resources.
"""
worker_nodes = self.provider.non_terminated_nodes(
tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER})
# Populate worker list.
_, worker_nodes = self._get_head_and_workers(nodes)
if self.max_workers == 0:
return {}
elif sum(launching_nodes.values()) + len(worker_nodes) > 0:
@ -332,22 +349,22 @@ class ResourceDemandScheduler:
self.node_resource_updated.add(node_type)
def _infer_legacy_node_resources_if_needed(
self, max_resources_by_ip: Dict[NodeIP, ResourceDict]
self, nodes: List[NodeIP],
max_resources_by_ip: Dict[NodeIP, ResourceDict]
) -> (bool, Dict[NodeType, int]):
"""Infers node resources for legacy config files.
Updates the resources of the head and worker node types in
self.node_types.
Args:
nodes: List of all node ids in the cluster
max_resources_by_ip: Mapping from ip to static node resources.
"""
head_node, worker_nodes = self._get_head_and_workers(nodes)
# We fill the head node resources only once.
if not self.node_types[NODE_TYPE_LEGACY_HEAD]["resources"]:
try:
head_ip = self.provider.internal_ip(
self.provider.non_terminated_nodes({
TAG_RAY_NODE_KIND: NODE_KIND_HEAD
})[0])
head_ip = self.provider.internal_ip(head_node)
self.node_types[NODE_TYPE_LEGACY_HEAD]["resources"] = \
copy.deepcopy(max_resources_by_ip[head_ip])
except (IndexError, KeyError):
@ -356,8 +373,6 @@ class ResourceDemandScheduler:
if not self.node_types[NODE_TYPE_LEGACY_WORKER]["resources"]:
# Set the node_types here in case we already launched a worker node
# from which we can directly get the node_resources.
worker_nodes = self.provider.non_terminated_nodes(
tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER})
worker_node_ips = [
self.provider.internal_ip(node_id) for node_id in worker_nodes
]

@ -89,6 +89,7 @@ if __name__ == "__main__":
do_link("_private", force=args.yes)
do_link("node.py", force=args.yes)
do_link("cluster_utils.py", force=args.yes)
do_link("ray_constants.py", force=args.yes)
# Link package's `dashboard` directly to local (repo's) dashboard.
# The repo's `dashboard` is a file, soft-linking to which will not work
# on Mac.

@ -23,7 +23,8 @@ from ray.autoscaler._private.util import prepare_config, validate_config
from ray.autoscaler._private import commands
from ray.autoscaler.sdk import get_docker_host_mount_location
from ray.autoscaler._private.load_metrics import LoadMetrics
from ray.autoscaler._private.autoscaler import StandardAutoscaler
from ray.autoscaler._private.autoscaler import StandardAutoscaler,\
NonTerminatedNodes
from ray.autoscaler._private.prom_metrics import AutoscalerPrometheusMetrics
from ray.autoscaler._private.providers import (
_NODE_PROVIDERS, _clear_provider_cache, _DEFAULT_CONFIGS)
@ -297,9 +298,11 @@ class MockProvider(NodeProvider):
# different threads. This can be treated as a global lock for
# everything.
self.lock = threading.Lock()
self.num_non_terminated_nodes_calls = 0
super().__init__(None, None)
def non_terminated_nodes(self, tag_filters):
self.num_non_terminated_nodes_calls += 1
with self.lock:
if self.throw:
raise Exception("oops")
@ -324,10 +327,16 @@ class MockProvider(NodeProvider):
return self.mock_nodes[node_id].state == "running"
def is_terminated(self, node_id):
if node_id is None:
# Circumvent test-cases where there's no head node.
return True
with self.lock:
return self.mock_nodes[node_id].state in ["stopped", "terminated"]
def node_tags(self, node_id):
if node_id is None:
# Circumvent test-cases where there's no head node.
return {}
# Don't assume that node providers can retrieve tags from
# terminated nodes.
if self.is_terminated(node_id):
@ -336,6 +345,9 @@ class MockProvider(NodeProvider):
return self.mock_nodes[node_id].tags
def internal_ip(self, node_id):
if node_id is None:
# Circumvent test-cases where there's no head node.
return "mock"
with self.lock:
return self.mock_nodes[node_id].internal_ip
@ -384,6 +396,23 @@ class MockProvider(NodeProvider):
node.state = "running"
class MockAutoscaler(StandardAutoscaler):
"""Test autoscaler constructed to verify the property that each
autoscaler update issues at most one provider.non_terminated_nodes call.
"""
def _update(self):
# Only works with MockProvider
assert isinstance(self.provider, MockProvider)
start_calls = self.provider.num_non_terminated_nodes_calls
super()._update()
end_calls = self.provider.num_non_terminated_nodes_calls
# Strict inequality if update is called twice within the throttling
# interval `self.update_interval_s`
assert end_calls <= start_calls + 1
SMALL_CLUSTER = {
"cluster_name": "default",
"min_workers": 2,
@ -578,6 +607,13 @@ class AutoscalingTest(unittest.TestCase):
fail_msg = fail_msg or "Timed out waiting for {}".format(condition)
raise RayTestTimeoutException(fail_msg)
def waitForUpdatersToFinish(self, autoscaler):
self.waitFor(
lambda: all(not updater.is_alive()
for updater in autoscaler.updaters.values()),
num_retries=500,
fail_msg="Last round of updaters didn't complete on time.")
def waitForNodes(self, expected, comparison=None, tag_filters=None):
if tag_filters is None:
tag_filters = {}
@ -616,7 +652,7 @@ class AutoscalingTest(unittest.TestCase):
config_path = self.write_config(invalid_config)
self.provider = MockProvider()
runner = MockProcessRunner()
autoscaler = StandardAutoscaler(
autoscaler = MockAutoscaler(
config_path,
LoadMetrics(),
MockNodeInfoStub(),
@ -771,7 +807,7 @@ class AutoscalingTest(unittest.TestCase):
_runner=runner)
self.waitForNodes(1)
lm = LoadMetrics()
autoscaler = StandardAutoscaler(
autoscaler = MockAutoscaler(
config_path,
lm,
MockNodeInfoStub(),
@ -1269,7 +1305,7 @@ class AutoscalingTest(unittest.TestCase):
self.provider = MockProvider()
runner = MockProcessRunner()
mock_metrics = Mock(spec=AutoscalerPrometheusMetrics())
autoscaler = StandardAutoscaler(
autoscaler = MockAutoscaler(
config_path,
LoadMetrics(),
MockNodeInfoStub(),
@ -1328,7 +1364,7 @@ class AutoscalingTest(unittest.TestCase):
runner.respond_to_call("json .Config.Env", ["[]" for i in range(10)])
mock_metrics = Mock(spec=AutoscalerPrometheusMetrics())
lm = LoadMetrics()
autoscaler = StandardAutoscaler(
autoscaler = MockAutoscaler(
config_path,
lm,
MockNodeInfoStub(),
@ -1420,7 +1456,7 @@ class AutoscalingTest(unittest.TestCase):
TAG_RAY_USER_NODE_TYPE: NODE_TYPE_LEGACY_HEAD
}, 1)
lm.update("172.0.0.0", mock_raylet_id(), {"CPU": 1}, {"CPU": 0}, {})
autoscaler = StandardAutoscaler(
autoscaler = MockAutoscaler(
config_path,
lm,
mock_node_info_stub,
@ -1474,7 +1510,7 @@ class AutoscalingTest(unittest.TestCase):
config_path = self.write_config(config)
self.provider = MockProvider()
runner = MockProcessRunner()
autoscaler = StandardAutoscaler(
autoscaler = MockAutoscaler(
config_path,
LoadMetrics(),
MockNodeInfoStub(),
@ -1514,7 +1550,7 @@ class AutoscalingTest(unittest.TestCase):
lm = LoadMetrics()
lm.local_ip = head_ip
lm.update(head_ip, mock_raylet_id(), {"CPU": 1}, {"CPU": 1}, {})
autoscaler = StandardAutoscaler(
autoscaler = MockAutoscaler(
config_path,
lm,
MockNodeInfoStub(),
@ -1575,9 +1611,8 @@ class AutoscalingTest(unittest.TestCase):
runner = MockProcessRunner()
runner.respond_to_call("json .Config.Env", ["[]" for i in range(11)])
lm = LoadMetrics()
lm.local_ip = head_ip
autoscaler = StandardAutoscaler(
autoscaler = MockAutoscaler(
config_path,
lm,
MockNodeInfoStub(),
@ -1619,7 +1654,6 @@ class AutoscalingTest(unittest.TestCase):
for ip in worker_ips:
# Mark workers inactive.
lm.last_used_time_by_ip[ip] = 0
fill_in_raylet_ids(self.provider, lm)
autoscaler.update()
self.waitForNodes(1) # only the head node
# Make sure they don't get overwritten.
@ -1660,7 +1694,7 @@ class AutoscalingTest(unittest.TestCase):
lm = LoadMetrics()
lm.local_ip = head_ip
autoscaler = StandardAutoscaler(
autoscaler = MockAutoscaler(
config_path,
lm,
MockNodeInfoStub(),
@ -1717,7 +1751,7 @@ class AutoscalingTest(unittest.TestCase):
lm = LoadMetrics()
lm.local_ip = head_ip
autoscaler = StandardAutoscaler(
autoscaler = MockAutoscaler(
config_path,
lm,
MockNodeInfoStub(),
@ -1745,7 +1779,7 @@ class AutoscalingTest(unittest.TestCase):
runner = MockProcessRunner()
runner.respond_to_call("json .Config.Env", ["[]" for i in range(2)])
lm = LoadMetrics()
autoscaler = StandardAutoscaler(
autoscaler = MockAutoscaler(
config_path,
lm,
MockNodeInfoStub(),
@ -1784,7 +1818,7 @@ class AutoscalingTest(unittest.TestCase):
runner = MockProcessRunner()
runner.respond_to_call("json .Config.Env", ["[]" for i in range(10)])
mock_metrics = Mock(spec=AutoscalerPrometheusMetrics())
autoscaler = StandardAutoscaler(
autoscaler = MockAutoscaler(
config_path,
LoadMetrics(),
MockNodeInfoStub(),
@ -1825,7 +1859,7 @@ class AutoscalingTest(unittest.TestCase):
config_path = self.write_config(SMALL_CLUSTER)
self.provider = MockProvider()
runner = MockProcessRunner()
autoscaler = StandardAutoscaler(
autoscaler = MockAutoscaler(
config_path,
LoadMetrics(),
MockNodeInfoStub(),
@ -1851,7 +1885,7 @@ class AutoscalingTest(unittest.TestCase):
config_path = self.write_config(SMALL_CLUSTER)
self.provider = MockProvider()
lm = LoadMetrics()
autoscaler = StandardAutoscaler(
autoscaler = MockAutoscaler(
config_path,
lm,
MockNodeInfoStub(),
@ -1885,7 +1919,7 @@ class AutoscalingTest(unittest.TestCase):
lm = LoadMetrics()
lm.update("172.0.0.0", mock_raylet_id(), {"CPU": 1}, {"CPU": 0}, {})
mock_metrics = Mock(spec=AutoscalerPrometheusMetrics())
autoscaler = StandardAutoscaler(
autoscaler = MockAutoscaler(
config_path,
lm,
MockNodeInfoStub(),
@ -1932,7 +1966,7 @@ class AutoscalingTest(unittest.TestCase):
self.provider.throw = True
runner = MockProcessRunner()
mock_metrics = Mock(spec=AutoscalerPrometheusMetrics())
autoscaler = StandardAutoscaler(
autoscaler = MockAutoscaler(
config_path,
LoadMetrics(),
MockNodeInfoStub(),
@ -1953,7 +1987,7 @@ class AutoscalingTest(unittest.TestCase):
self.provider = MockProvider()
runner = MockProcessRunner()
runner.respond_to_call("json .Config.Env", ["[]" for i in range(4)])
autoscaler = StandardAutoscaler(
autoscaler = MockAutoscaler(
config_path,
LoadMetrics(),
MockNodeInfoStub(),
@ -1974,7 +2008,7 @@ class AutoscalingTest(unittest.TestCase):
self.provider = MockProvider()
runner = MockProcessRunner()
runner.respond_to_call("json .Config.Env", ["[]" for i in range(2)])
autoscaler = StandardAutoscaler(
autoscaler = MockAutoscaler(
config_path,
LoadMetrics(),
MockNodeInfoStub(),
@ -1997,7 +2031,7 @@ class AutoscalingTest(unittest.TestCase):
runner = MockProcessRunner(fail_cmds=["setup_cmd"])
runner.respond_to_call("json .Config.Env", ["[]" for i in range(2)])
lm = LoadMetrics()
autoscaler = StandardAutoscaler(
autoscaler = MockAutoscaler(
config_path,
lm,
MockNodeInfoStub(),
@ -2005,7 +2039,6 @@ class AutoscalingTest(unittest.TestCase):
process_runner=runner,
update_interval_s=0)
autoscaler.update()
autoscaler.update()
self.waitForNodes(2)
self.provider.finish_starting_nodes()
fill_in_raylet_ids(self.provider, lm)
@ -2037,7 +2070,7 @@ class AutoscalingTest(unittest.TestCase):
self.provider = MockProvider()
runner = MockProcessRunner()
runner.respond_to_call("json .Config.Env", ["[]" for i in range(4)])
autoscaler = StandardAutoscaler(
autoscaler = MockAutoscaler(
config_path,
LoadMetrics(),
MockNodeInfoStub(),
@ -2091,7 +2124,7 @@ class AutoscalingTest(unittest.TestCase):
_runner=runner)
self.waitForNodes(1)
autoscaler = StandardAutoscaler(
autoscaler = MockAutoscaler(
config_path,
lm,
MockNodeInfoStub(),
@ -2120,7 +2153,7 @@ class AutoscalingTest(unittest.TestCase):
fill_in_raylet_ids(self.provider, lm)
autoscaler.update()
events = autoscaler.event_summarizer.summary()
assert autoscaler.pending_launches.value == 0
self.waitFor(lambda: autoscaler.pending_launches.value == 0)
self.waitForNodes(8, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER})
assert autoscaler.pending_launches.value == 0
events = autoscaler.event_summarizer.summary()
@ -2134,8 +2167,7 @@ class AutoscalingTest(unittest.TestCase):
assert "empty_node" not in event
node_type_counts = defaultdict(int)
autoscaler.update_worker_list()
for node_id in autoscaler.workers:
for node_id in NonTerminatedNodes(self.provider).worker_ids:
tags = self.provider.node_tags(node_id)
if TAG_RAY_USER_NODE_TYPE in tags:
node_type = tags[TAG_RAY_USER_NODE_TYPE]
@ -2158,7 +2190,7 @@ class AutoscalingTest(unittest.TestCase):
TAG_RAY_USER_NODE_TYPE: NODE_TYPE_LEGACY_HEAD
}, 1)
lm.update("172.0.0.0", mock_raylet_id(), {"CPU": 1}, {"CPU": 0}, {})
autoscaler = StandardAutoscaler(
autoscaler = MockAutoscaler(
config_path,
lm,
MockNodeInfoStub(),
@ -2254,7 +2286,7 @@ class AutoscalingTest(unittest.TestCase):
lm = LoadMetrics()
runner = MockProcessRunner()
runner.respond_to_call("json .Config.Env", ["[]" for i in range(12)])
autoscaler = StandardAutoscaler(
autoscaler = MockAutoscaler(
config_path,
lm,
MockNodeInfoStub(),
@ -2322,7 +2354,7 @@ class AutoscalingTest(unittest.TestCase):
runner.respond_to_call("json .Config.Env", ["[]" for i in range(3)])
lm = LoadMetrics()
mock_metrics = Mock(spec=AutoscalerPrometheusMetrics())
autoscaler = StandardAutoscaler(
autoscaler = MockAutoscaler(
config_path,
lm,
MockNodeInfoStub(),
@ -2370,7 +2402,7 @@ class AutoscalingTest(unittest.TestCase):
runner.respond_to_call("json .Config.Env", ["[]" for i in range(3)])
lm = LoadMetrics()
mock_metrics = Mock(spec=AutoscalerPrometheusMetrics())
autoscaler = StandardAutoscaler(
autoscaler = MockAutoscaler(
config_path,
lm,
MockNodeInfoStub(),
@ -2431,7 +2463,7 @@ class AutoscalingTest(unittest.TestCase):
runner = MockProcessRunner()
mock_metrics = Mock(spec=AutoscalerPrometheusMetrics())
lm = LoadMetrics()
autoscaler = StandardAutoscaler(
autoscaler = MockAutoscaler(
config_path,
lm,
MockNodeInfoStub(),
@ -2478,7 +2510,7 @@ class AutoscalingTest(unittest.TestCase):
"module": "ray.autoscaler.node_provider.NodeProvider",
}
config_path = self.write_config(config)
autoscaler = StandardAutoscaler(
autoscaler = MockAutoscaler(
config_path,
LoadMetrics(),
MockNodeInfoStub(),
@ -2516,7 +2548,7 @@ class AutoscalingTest(unittest.TestCase):
}
invalid_provider = self.write_config(config)
with pytest.raises(ImportError):
StandardAutoscaler(
MockAutoscaler(
invalid_provider,
LoadMetrics(),
MockNodeInfoStub(),
@ -2530,7 +2562,7 @@ class AutoscalingTest(unittest.TestCase):
}
invalid_provider = self.write_config(config, call_prepare_config=False)
with pytest.raises(ValueError):
StandardAutoscaler(
MockAutoscaler(
invalid_provider,
LoadMetrics(),
MockNodeInfoStub(),
@ -2545,7 +2577,7 @@ class AutoscalingTest(unittest.TestCase):
runner = MockProcessRunner()
runner.respond_to_call("json .Config.Env", ["[]" for i in range(2)])
lm = LoadMetrics()
autoscaler = StandardAutoscaler(
autoscaler = MockAutoscaler(
config_path,
lm,
MockNodeInfoStub(),
@ -2590,7 +2622,7 @@ class AutoscalingTest(unittest.TestCase):
runner = MockProcessRunner()
runner.respond_to_call("json .Config.Env", ["[]" for i in range(3)])
lm = LoadMetrics()
autoscaler = StandardAutoscaler(
autoscaler = MockAutoscaler(
config_path,
lm,
MockNodeInfoStub(),
@ -2660,7 +2692,7 @@ class AutoscalingTest(unittest.TestCase):
runner = MockProcessRunner()
runner.respond_to_call("json .Config.Env", ["[]" for i in range(3)])
lm = LoadMetrics()
autoscaler = StandardAutoscaler(
autoscaler = MockAutoscaler(
config_path,
lm,
MockNodeInfoStub(),
@ -2739,7 +2771,7 @@ class AutoscalingTest(unittest.TestCase):
self.provider = MockProvider(cache_stopped=True)
runner = MockProcessRunner()
lm = LoadMetrics()
autoscaler = StandardAutoscaler(
autoscaler = MockAutoscaler(
config_path,
lm,
MockNodeInfoStub(),
@ -2792,7 +2824,7 @@ class AutoscalingTest(unittest.TestCase):
runner.respond_to_call("command -v docker",
["docker" for _ in range(4)])
lm = LoadMetrics()
autoscaler = StandardAutoscaler(
autoscaler = MockAutoscaler(
config_path,
lm,
MockNodeInfoStub(),
@ -2848,7 +2880,7 @@ class AutoscalingTest(unittest.TestCase):
runner = MockProcessRunner()
runner.respond_to_call("json .Config.Env", ["[]" for i in range(2)])
lm = LoadMetrics()
autoscaler = StandardAutoscaler(
autoscaler = MockAutoscaler(
config_path,
lm,
MockNodeInfoStub(),
@ -2895,7 +2927,7 @@ class AutoscalingTest(unittest.TestCase):
runner = MockProcessRunner()
runner.respond_to_call("json .Config.Env", ["[]" for i in range(2)])
lm = LoadMetrics()
autoscaler = StandardAutoscaler(
autoscaler = MockAutoscaler(
config_path,
lm,
MockNodeInfoStub(),
@ -2931,7 +2963,7 @@ MemAvailable: 33000000 kB
runner.respond_to_call("nvidia-smi", 2 * ["works"])
runner.respond_to_call("json .Config.Env", 2 * ["[]"])
lm = LoadMetrics()
autoscaler = StandardAutoscaler(
autoscaler = MockAutoscaler(
config_path,
lm,
MockNodeInfoStub(),
@ -2958,7 +2990,7 @@ MemAvailable: 33000000 kB
self.provider = MockProvider()
runner = MockProcessRunner()
runner.respond_to_call("json .Config.Env", ["[]" for i in range(1)])
autoscaler = StandardAutoscaler(
autoscaler = MockAutoscaler(
config_path,
LoadMetrics(),
MockNodeInfoStub(),
@ -3067,7 +3099,7 @@ MemAvailable: 33000000 kB
runner = MockProcessRunner()
lm = LoadMetrics()
mock_metrics = Mock(spec=AutoscalerPrometheusMetrics())
autoscaler = StandardAutoscaler(
autoscaler = MockAutoscaler(
config_path,
lm,
MockNodeInfoStub(),
@ -3116,11 +3148,7 @@ MemAvailable: 33000000 kB
mock_metrics.recovering_nodes.set.assert_called_with(2)
autoscaler.process_runner.ready_to_run.set()
# Wait for updaters spawned by last autoscaler update to finish.
self.waitFor(
lambda: all(not updater.is_alive()
for updater in autoscaler.updaters.values()),
num_retries=500,
fail_msg="Last round of updaters didn't complete on time.")
self.waitForUpdatersToFinish(autoscaler)
# Check that updaters processed some commands in the last autoscaler
# update.
assert len(autoscaler.process_runner.calls) > num_calls,\
@ -3134,8 +3162,8 @@ MemAvailable: 33000000 kB
# Node 0 was terminated during the last update.
# Node 1's updater failed, but node 1 won't be terminated until the
# next autoscaler update.
autoscaler.update_worker_list()
assert 0 not in autoscaler.workers, "Node zero still non-terminated."
assert 0 not in NonTerminatedNodes(
self.provider).worker_ids, "Node zero still non-terminated."
assert not self.provider.is_terminated(1),\
"Node one terminated prematurely."
@ -3165,8 +3193,7 @@ MemAvailable: 33000000 kB
fill_in_raylet_ids(self.provider, lm)
autoscaler.update()
self.waitForNodes(2)
autoscaler.update_worker_list()
assert set(autoscaler.workers) == {2, 3},\
assert set(NonTerminatedNodes(self.provider).worker_ids) == {2, 3},\
"Unexpected node_ids"
assert mock_metrics.stopped_nodes.inc.call_count == 1
@ -3178,7 +3205,7 @@ MemAvailable: 33000000 kB
self.provider.error_creates = True
runner = MockProcessRunner()
mock_metrics = Mock(spec=AutoscalerPrometheusMetrics())
autoscaler = StandardAutoscaler(
autoscaler = MockAutoscaler(
config_path,
LoadMetrics(),
MockNodeInfoStub(),

@ -11,13 +11,13 @@ import pytest
import ray
from ray.tests.test_autoscaler import (MockProvider, MockProcessRunner,
MockNodeInfoStub, mock_raylet_id)
MockNodeInfoStub, mock_raylet_id,
MockAutoscaler)
from ray.tests.test_resource_demand_scheduler import MULTI_WORKER_CLUSTER
from ray.autoscaler._private.providers import (
_NODE_PROVIDERS,
_clear_provider_cache,
)
from ray.autoscaler._private.autoscaler import StandardAutoscaler
from ray.autoscaler._private.load_metrics import LoadMetrics
from ray.autoscaler._private.node_launcher import NodeLauncher
from ray.autoscaler.tags import (TAG_RAY_USER_NODE_TYPE, TAG_RAY_NODE_KIND,
@ -179,8 +179,8 @@ class Simulator:
)
self.head_ip = self.provider.non_terminated_node_ips({})[0]
self.load_metrics = LoadMetrics(local_ip=self.head_ip)
self.autoscaler = StandardAutoscaler(
self.load_metrics = LoadMetrics()
self.autoscaler = MockAutoscaler(
self.config_path,
self.load_metrics,
MockNodeInfoStub(),

@ -1,5 +1,7 @@
import pytest
from datetime import datetime
from dataclasses import asdict
import json
import time
import yaml
import tempfile
@ -13,11 +15,10 @@ import ray.ray_constants
from ray.autoscaler._private.util import prepare_config, format_info_string
from ray.tests.test_autoscaler import SMALL_CLUSTER, MOCK_DEFAULT_CONFIG, \
MULTI_WORKER_CLUSTER, TYPES_A, MockProvider, MockProcessRunner, \
MockNodeInfoStub, mock_raylet_id, fill_in_raylet_ids
MockNodeInfoStub, mock_raylet_id, fill_in_raylet_ids, MockAutoscaler
from ray.autoscaler._private.providers import (_NODE_PROVIDERS,
_clear_provider_cache)
from ray.autoscaler._private.autoscaler import StandardAutoscaler, \
AutoscalerSummary
from ray.autoscaler._private.autoscaler import AutoscalerSummary
from ray.autoscaler._private.load_metrics import LoadMetrics, \
LoadMetricsSummary
from ray.autoscaler._private.commands import get_or_create_head_node
@ -1282,7 +1283,7 @@ class LoadMetricsTest(unittest.TestCase):
assert lm.get_pending_placement_groups() == pending_placement_groups
def testSummary(self):
lm = LoadMetrics(local_ip="1.1.1.1")
lm = LoadMetrics()
assert lm.summary() is not None
pending_placement_groups = [
PlacementGroupTableData(
@ -1346,8 +1347,6 @@ class LoadMetricsTest(unittest.TestCase):
summary = lm.summary()
assert summary.head_ip == "1.1.1.1"
assert summary.usage["CPU"] == (190, 194)
assert summary.usage["GPU"] == (15, 16)
assert summary.usage["memory"] == (500 * 2**20, 1000 * 2**20)
@ -1378,6 +1377,38 @@ class LoadMetricsTest(unittest.TestCase):
# should ever have the same set of resources.
assert len(summary.node_types) == 3, summary.node_types
# Ensure correct dict-conversion
summary_dict = asdict(summary)
assert summary_dict["usage"]["CPU"] == (190, 194)
assert summary_dict["usage"]["GPU"] == (15, 16)
assert summary_dict["usage"]["memory"] == (500 * 2**20, 1000 * 2**20)
assert summary_dict["usage"]["object_store_memory"] == \
(1000 * 2**20, 2000 * 2**20)
assert summary_dict["usage"]["accelerator_type:V100"][1] == 2, \
"Not comparing the usage value due to floating point error."
assert ({"GPU": 2}, 11) in summary_dict["resource_demand"]
assert ({"CPU": 16}, 1) in summary_dict["resource_demand"]
assert ({"CPU": 16, "GPU": 2}, 1) in summary_dict["resource_demand"]
assert len(summary_dict["resource_demand"]) == 3
assert ({
"bundles": [({
"GPU": 2
}, 2)],
"strategy": "PACK"
}, 2) in summary_dict["pg_demand"]
assert len(summary_dict["pg_demand"]) == 1
assert ({"GPU": 8}, 2) in summary_dict["request_demand"]
assert ({"CPU": 64}, 1) in summary_dict["request_demand"]
assert len(summary_dict["request_demand"]) == 2
assert len(summary_dict["node_types"]) == 3, summary_dict["node_types"]
# Ensure summary_dict is json-serializable
json.dumps(summary_dict)
class AutoscalingTest(unittest.TestCase):
def setUp(self):
@ -1493,8 +1524,8 @@ class AutoscalingTest(unittest.TestCase):
TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE
}, 1)
head_ip = self.provider.non_terminated_node_ips({})[0]
lm = LoadMetrics(local_ip=head_ip)
autoscaler = StandardAutoscaler(
lm = LoadMetrics()
autoscaler = MockAutoscaler(
config_path,
lm,
MockNodeInfoStub(),
@ -1556,6 +1587,22 @@ class AutoscalingTest(unittest.TestCase):
assert summary.failed_nodes == [("172.0.0.4", "m4.4xlarge")]
# Check dict conversion
summary_dict = asdict(summary)
assert summary_dict["active_nodes"]["m4.large"] == 2
assert summary_dict["active_nodes"]["empty_node"] == 1
assert len(
summary_dict["active_nodes"]) == 2, summary_dict["active_nodes"]
assert summary_dict["pending_nodes"] == [("172.0.0.3", "p2.xlarge",
STATUS_WAITING_FOR_SSH)]
assert summary_dict["pending_launches"] == {"m4.16xlarge": 2}
assert summary_dict["failed_nodes"] == [("172.0.0.4", "m4.4xlarge")]
# Ensure summary is json-serializable
json.dumps(summary_dict)
# Make sure we return something (and don't throw exceptions). Let's not
# get bogged down with a full cli test here.
assert len(autoscaler.info_string()) > 1
@ -1572,9 +1619,9 @@ class AutoscalingTest(unittest.TestCase):
TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE,
TAG_RAY_USER_NODE_TYPE: "empty_node"
}, 1)
autoscaler = StandardAutoscaler(
autoscaler = MockAutoscaler(
config_path,
LoadMetrics("172.0.0.0"),
LoadMetrics(),
MockNodeInfoStub(),
max_failures=0,
process_runner=runner,
@ -1599,9 +1646,9 @@ class AutoscalingTest(unittest.TestCase):
TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE,
TAG_RAY_USER_NODE_TYPE: "empty_node"
}, 1)
autoscaler = StandardAutoscaler(
autoscaler = MockAutoscaler(
config_path,
LoadMetrics("172.0.0.0"),
LoadMetrics(),
MockNodeInfoStub(),
max_failures=0,
process_runner=runner,
@ -1628,8 +1675,8 @@ class AutoscalingTest(unittest.TestCase):
TAG_RAY_USER_NODE_TYPE: "m4.4xlarge"
}, 1)
head_ip = self.provider.non_terminated_node_ips({})[0]
lm = LoadMetrics(head_ip)
autoscaler = StandardAutoscaler(
lm = LoadMetrics()
autoscaler = MockAutoscaler(
config_path,
lm,
MockNodeInfoStub(),
@ -1698,8 +1745,8 @@ class AutoscalingTest(unittest.TestCase):
TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE,
TAG_RAY_USER_NODE_TYPE: "empty_node"
}, 1)
lm = LoadMetrics("172.0.0.0")
autoscaler = StandardAutoscaler(
lm = LoadMetrics()
autoscaler = MockAutoscaler(
config_path,
lm,
MockNodeInfoStub(),
@ -1763,8 +1810,8 @@ class AutoscalingTest(unittest.TestCase):
head_ip = self.provider.non_terminated_node_ips({})[0]
self.provider.finish_starting_nodes()
runner = MockProcessRunner()
lm = LoadMetrics(local_ip=head_ip)
autoscaler = StandardAutoscaler(
lm = LoadMetrics()
autoscaler = MockAutoscaler(
config_path,
lm,
MockNodeInfoStub(),
@ -1803,7 +1850,7 @@ class AutoscalingTest(unittest.TestCase):
TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE
}, 1)
runner = MockProcessRunner()
autoscaler = StandardAutoscaler(
autoscaler = MockAutoscaler(
config_path,
LoadMetrics(),
MockNodeInfoStub(),
@ -1842,9 +1889,9 @@ class AutoscalingTest(unittest.TestCase):
TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE,
TAG_RAY_USER_NODE_TYPE: "empty_node"
}, 1)
autoscaler = StandardAutoscaler(
autoscaler = MockAutoscaler(
config_path,
LoadMetrics("172.0.0.0"),
LoadMetrics(),
MockNodeInfoStub(),
max_failures=0,
process_runner=runner,
@ -1880,9 +1927,9 @@ class AutoscalingTest(unittest.TestCase):
TAG_RAY_NODE_KIND: NODE_KIND_HEAD,
TAG_RAY_USER_NODE_TYPE: "empty_node"
}, 1)
autoscaler = StandardAutoscaler(
autoscaler = MockAutoscaler(
config_path,
LoadMetrics("172.0.0.0"),
LoadMetrics(),
MockNodeInfoStub(),
max_failures=0,
process_runner=runner,
@ -1926,8 +1973,8 @@ class AutoscalingTest(unittest.TestCase):
TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE,
TAG_RAY_USER_NODE_TYPE: "empty_node"
}, 1)
lm = LoadMetrics("172.0.0.0")
autoscaler = StandardAutoscaler(
lm = LoadMetrics()
autoscaler = MockAutoscaler(
config_path,
lm,
MockNodeInfoStub(),
@ -1974,9 +2021,9 @@ class AutoscalingTest(unittest.TestCase):
TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE,
TAG_RAY_USER_NODE_TYPE: "empty_node"
}, 1)
lm = LoadMetrics("172.0.0.0")
lm = LoadMetrics()
lm.update("172.0.0.0", mock_raylet_id(), {"CPU": 0}, {"CPU": 0}, {})
autoscaler = StandardAutoscaler(
autoscaler = MockAutoscaler(
config_path,
lm,
MockNodeInfoStub(),
@ -2037,9 +2084,9 @@ class AutoscalingTest(unittest.TestCase):
TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE,
TAG_RAY_USER_NODE_TYPE: "empty_node"
}, 1)
autoscaler = StandardAutoscaler(
autoscaler = MockAutoscaler(
config_path,
LoadMetrics("172.0.0.0"),
LoadMetrics(),
MockNodeInfoStub(),
max_failures=0,
process_runner=runner,
@ -2118,8 +2165,8 @@ class AutoscalingTest(unittest.TestCase):
TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE,
TAG_RAY_USER_NODE_TYPE: "empty_node"
}, 1)
lm = LoadMetrics("172.0.0.0")
autoscaler = StandardAutoscaler(
lm = LoadMetrics()
autoscaler = MockAutoscaler(
config_path,
lm,
MockNodeInfoStub(),
@ -2150,9 +2197,9 @@ class AutoscalingTest(unittest.TestCase):
TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE,
TAG_RAY_USER_NODE_TYPE: "empty_node"
}, 1)
autoscaler = StandardAutoscaler(
autoscaler = MockAutoscaler(
config_path,
LoadMetrics("172.0.0.0"),
LoadMetrics(),
MockNodeInfoStub(),
max_failures=0,
process_runner=runner,
@ -2199,9 +2246,9 @@ class AutoscalingTest(unittest.TestCase):
TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE,
TAG_RAY_USER_NODE_TYPE: "empty_node"
}, 1)
lm = LoadMetrics("172.0.0.0")
lm = LoadMetrics()
runner.respond_to_call("json .Config.Env", ["[]" for i in range(3)])
autoscaler = StandardAutoscaler(
autoscaler = MockAutoscaler(
config_path,
lm,
MockNodeInfoStub(),
@ -2322,8 +2369,8 @@ class AutoscalingTest(unittest.TestCase):
TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE,
TAG_RAY_USER_NODE_TYPE: "empty_node"
}, 1)
lm = LoadMetrics("172.0.0.0")
autoscaler = StandardAutoscaler(
lm = LoadMetrics()
autoscaler = MockAutoscaler(
config_path,
lm,
MockNodeInfoStub(),
@ -2438,8 +2485,8 @@ class AutoscalingTest(unittest.TestCase):
TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE,
TAG_RAY_USER_NODE_TYPE: "empty_node"
}, 1)
lm = LoadMetrics("172.0.0.0")
autoscaler = StandardAutoscaler(
lm = LoadMetrics()
autoscaler = MockAutoscaler(
config_path,
lm,
MockNodeInfoStub(),
@ -2493,7 +2540,7 @@ class AutoscalingTest(unittest.TestCase):
runner = MockProcessRunner()
runner.respond_to_call("json .Config.Env", ["[]" for i in range(2)])
lm = LoadMetrics()
autoscaler = StandardAutoscaler(
autoscaler = MockAutoscaler(
config_path,
lm,
MockNodeInfoStub(),
@ -2543,7 +2590,6 @@ def format_pg(pg):
def test_info_string():
lm_summary = LoadMetricsSummary(
head_ip="0.0.0.0",
usage={
"CPU": (530.0, 544.0),
"GPU": (2, 2),
@ -2612,7 +2658,6 @@ Demands:
def test_info_string_failed_node_cap():
lm_summary = LoadMetricsSummary(
head_ip="0.0.0.0",
usage={
"CPU": (530.0, 544.0),
"GPU": (2, 2),