[autoscaler] Use drain node api in autoscaler before terminating nodes (#20013)

* wip

* Draft

* Use bytes for node id

* remove stray helm change

* fix autoscaler init arg

* don't forget to instantiate new load metrics dict

* remove extraneous diff

* Timeout, comments, function signature.

* typo

* another comment

* tweak

* docstring

* shorter timeout

* Use a better error code

* missing self

* Dedent example

* Add drain node prometheus metric.

* comment

* Update tests part 1: test_autoscaler.py

* Update tests part 2: test_resource_demand_scheduler

* lint

* Update tests part 3: test_autoscaling_policy

* Unit tests for new Prometheus metric and DrainNode error handling.

* comment

* removed unused function

* Try adding ability to mock out process termination to fake node provider

* Add integration test.

* fix

* fix

* lint

* Improve log message

* fix

* Simplify test

* Fix doc example

* remove unused dict

* Mock out process termination in a subclass

* Add docstring and comment explaining prune_active_ips.

* Comment: wtf is use_node_id_as_ip

* one more comment

* more explanation

* period

* tweak
Dmitri Gekhtman 2021-11-11 08:31:40 -08:00 committed by GitHub
parent 9fd8c6648c
commit 8971422d8f
14 changed files with 621 additions and 83 deletions

View file

@ -58,6 +58,7 @@ To programmatically create a fake multi-node autoscaling cluster and connect to
.. literalinclude:: /../../python/ray/tests/test_autoscaler_fake_multinode.py
:language: python
:dedent: 4
:start-after: __example_begin__
:end-before: __example_end__

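Note (not part of the diff): the :start-after:/:end-before: options make Sphinx include only the text between the __example_begin__ and __example_end__ sentinel comments in the referenced test file, and :dedent: 4 strips four leading spaces so code nested inside a test function renders flush left. Schematically (the enclosing function name here is hypothetical):

def test_fake_autoscaler_example():
    # __example_begin__
    # Only this region appears in the rendered docs,
    # with four spaces of indentation removed by ":dedent: 4".
    # __example_end__
    ...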
View file

@ -12,6 +12,8 @@ import time
import yaml
from enum import Enum
import grpc
try:
from urllib3.exceptions import MaxRetryError
except ImportError:
@ -42,6 +44,8 @@ from ray.autoscaler._private.util import ConcurrentCounter, validate_config, \
from ray.autoscaler._private.constants import AUTOSCALER_MAX_NUM_FAILURES, \
AUTOSCALER_MAX_LAUNCH_BATCH, AUTOSCALER_MAX_CONCURRENT_LAUNCHES, \
AUTOSCALER_UPDATE_INTERVAL_S, AUTOSCALER_HEARTBEAT_TIMEOUT_S
from ray.core.generated import gcs_service_pb2, gcs_service_pb2_grpc
from six.moves import queue
logger = logging.getLogger(__name__)
@ -87,6 +91,7 @@ class StandardAutoscaler:
# TODO(ekl): require config reader to be a callable always.
config_reader: Union[str, Callable[[], dict]],
load_metrics: LoadMetrics,
gcs_node_info_stub: gcs_service_pb2_grpc.NodeInfoGcsServiceStub,
max_launch_batch: int = AUTOSCALER_MAX_LAUNCH_BATCH,
max_concurrent_launches: int = AUTOSCALER_MAX_CONCURRENT_LAUNCHES,
max_failures: int = AUTOSCALER_MAX_NUM_FAILURES,
@ -94,7 +99,8 @@ class StandardAutoscaler:
update_interval_s: int = AUTOSCALER_UPDATE_INTERVAL_S,
prefix_cluster_info: bool = False,
event_summarizer: Optional[EventSummarizer] = None,
prom_metrics: Optional[AutoscalerPrometheusMetrics] = None):
prom_metrics: Optional[AutoscalerPrometheusMetrics] = None,
):
"""Create a StandardAutoscaler.
Args:
@ -112,6 +118,8 @@ class StandardAutoscaler:
prefix_cluster_info: Whether to add the cluster name to info strs.
event_summarizer: Utility to consolidate duplicated messages.
prom_metrics: Prometheus metrics for autoscaler-related operations.
gcs_node_info_stub: Stub for interactions with Ray nodes via gRPC
requests to the GCS. Used to drain nodes before termination.
"""
if isinstance(config_reader, str):
@ -199,6 +207,8 @@ class StandardAutoscaler:
for remote, local in self.config["file_mounts"].items()
}
self.gcs_node_info_stub = gcs_node_info_stub
for local_path in self.config["file_mounts"].values():
assert os.path.exists(local_path)
logger.info("StandardAutoscaler: {}".format(self.config))
@ -233,6 +243,7 @@ class StandardAutoscaler:
self.last_update_time = now
self.update_worker_list()
# Remove from LoadMetrics the ips unknown to the NodeProvider.
self.load_metrics.prune_active_ips([
self.provider.internal_ip(node_id) for node_id in self.all_workers
])
@ -380,6 +391,7 @@ class StandardAutoscaler:
"""Terminate scheduled nodes and clean associated autoscaler state."""
if not self.nodes_to_terminate:
return
self.drain_nodes_via_gcs(self.nodes_to_terminate)
self.provider.terminate_nodes(self.nodes_to_terminate)
for node in self.nodes_to_terminate:
self.node_tracker.untrack(node)
@ -388,6 +400,82 @@ class StandardAutoscaler:
self.nodes_to_terminate = []
self.update_worker_list()
def drain_nodes_via_gcs(self, provider_node_ids_to_drain: List[NodeID]):
"""Send an RPC request to the GCS to drain (prepare for termination)
the nodes with the given node provider ids.
Note: The current implementation of DrainNode on the GCS side is to
de-register and gracefully shut down the Raylets. In the future,
the behavior may change to better reflect the name "Drain."
See https://github.com/ray-project/ray/pull/19350.
"""
# The GCS expects Raylet ids in the request, rather than NodeProvider
# ids. To get the Raylet ids of the nodes we're draining, we make
# the following translations of identifiers:
# node provider node id -> ip -> raylet id
# Convert node provider node ids to ips.
node_ips = {
self.provider.internal_ip(provider_node_id)
for provider_node_id in provider_node_ids_to_drain
}
# Only attempt to drain connected nodes, i.e. nodes with ips in
# LoadMetrics.
connected_node_ips = (
node_ips & self.load_metrics.raylet_id_by_ip.keys())
# Convert ips to Raylet ids.
# (The assignment ip->raylet_id is well-defined under current
# assumptions. See "use_node_id_as_ip" in monitor.py)
raylet_ids_to_drain = {
self.load_metrics.raylet_id_by_ip[ip]
for ip in connected_node_ips
}
logger.info(f"Draining {len(raylet_ids_to_drain)} raylet(s).")
try:
request = gcs_service_pb2.DrainNodeRequest(drain_node_data=[
gcs_service_pb2.DrainNodeData(node_id=raylet_id)
for raylet_id in raylet_ids_to_drain
])
# A successful response indicates that the GCS has marked the
# desired nodes as "drained." The cloud provider can then terminate
# the nodes without the GCS printing an error.
response = self.gcs_node_info_stub.DrainNode(request, timeout=5)
# Check if we succeeded in draining all of the intended nodes by
# looking at the RPC response.
drained_raylet_ids = {
status_item.node_id
for status_item in response.drain_node_status
}
failed_to_drain = raylet_ids_to_drain - drained_raylet_ids
if failed_to_drain:
self.prom_metrics.drain_node_exceptions.inc()
logger.error(
f"Failed to drain {len(failed_to_drain)} raylet(s).")
# If we get a gRPC error with an UNIMPLEMENTED code, fail silently.
# This error indicates that the GCS is using Ray version < 1.8.0,
# for which DrainNode is not implemented.
except grpc.RpcError as e:
# If the code is UNIMPLEMENTED, pass.
if e.code() == grpc.StatusCode.UNIMPLEMENTED:
pass
# Otherwise, it's a plain old gRPC error and we should log it.
else:
self.prom_metrics.drain_node_exceptions.inc()
logger.exception(
"Failed to drain Ray nodes. Traceback follows.")
except Exception:
# We don't need to interrupt the autoscaler update with an
# exception, but we should log what went wrong and record the
# failure in Prometheus.
self.prom_metrics.drain_node_exceptions.inc()
logger.exception("Failed to drain Ray nodes. Traceback follows.")
def launch_required_nodes(self, to_launch: Dict[NodeType, int]) -> None:
if to_launch:
for node_type, count in to_launch.items():

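Note (not part of the diff): the identifier translation in drain_nodes_via_gcs can be read as a pure function. A minimal sketch with plain dicts standing in for the NodeProvider and LoadMetrics (all names and values here are illustrative):

from typing import Dict, List, Set


def raylet_ids_for(provider_node_ids: List[str],
                   internal_ip_of: Dict[str, str],
                   raylet_id_by_ip: Dict[str, bytes]) -> Set[bytes]:
    # node provider node id -> ip
    node_ips = {internal_ip_of[node_id] for node_id in provider_node_ids}
    # Keep only connected nodes, i.e. ips the GCS has reported.
    connected_node_ips = node_ips & raylet_id_by_ip.keys()
    # ip -> raylet id
    return {raylet_id_by_ip[ip] for ip in connected_node_ips}


# One connected worker and one node that never reported to the GCS.
ips = {"node-1": "172.0.0.1", "node-2": "172.0.0.2"}
raylets = {"172.0.0.1": b"\x01" * 28}  # illustrative raylet id bytes
assert raylet_ids_for(["node-1", "node-2"], ips, raylets) == {b"\x01" * 28}

The UNIMPLEMENTED branch exists purely for version compatibility: a GCS built from Ray < 1.8.0 has no DrainNode handler, so the autoscaler degrades to the old terminate-without-drain behavior instead of logging errors.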
View file

@ -107,6 +107,9 @@ class FakeMultiNodeProvider(NodeProvider):
def terminate_node(self, node_id):
node = self._nodes.pop(node_id)["node"]
self._kill_ray_processes(node)
def _kill_ray_processes(self, node):
node.kill_all_processes(check_alive=False, allow_graceful=True)
@staticmethod

View file

@ -76,6 +76,7 @@ class LoadMetrics:
self.last_heartbeat_time_by_ip = {}
self.static_resources_by_ip = {}
self.dynamic_resources_by_ip = {}
self.raylet_id_by_ip = {}
self.resource_load_by_ip = {}
self.local_ip = services.get_node_ip_address(
) if local_ip is None else local_ip
@ -87,6 +88,7 @@ class LoadMetrics:
def update(self,
ip: str,
raylet_id: bytes,
static_resources: Dict[str, Dict],
dynamic_resources: Dict[str, Dict],
resource_load: Dict[str, Dict],
@ -96,6 +98,7 @@ class LoadMetrics:
cluster_full_of_actors_detected: bool = False):
self.resource_load_by_ip[ip] = resource_load
self.static_resources_by_ip[ip] = static_resources
self.raylet_id_by_ip[ip] = raylet_id
self.cluster_full_of_actors_detected = cluster_full_of_actors_detected
if not waiting_bundles:
@ -133,28 +136,37 @@ class LoadMetrics:
def is_active(self, ip):
return ip in self.last_heartbeat_time_by_ip
def prune_active_ips(self, active_ips):
def prune_active_ips(self, active_ips: List[str]):
"""The Raylet ips stored by LoadMetrics are obtained by polling
the GCS in Monitor.update_load_metrics().
On the other hand, the autoscaler gets a list of node ips from
its NodeProvider.
This method removes from LoadMetrics the ips unknown to the autoscaler.
Args:
active_ips (List[str]): The node ips known to the autoscaler.
"""
active_ips = set(active_ips)
active_ips.add(self.local_ip)
def prune(mapping, should_log):
unwanted = set(mapping) - active_ips
for unwanted_key in unwanted:
unwanted_ips = set(mapping) - active_ips
for unwanted_ip in unwanted_ips:
if should_log:
logger.info("LoadMetrics: "
"Removed mapping: {} - {}".format(
unwanted_key, mapping[unwanted_key]))
del mapping[unwanted_key]
if unwanted and should_log:
# TODO (Alex): Change this back to info after #12138.
logger.info("LoadMetrics: " f"Removed ip: {unwanted_ip}.")
del mapping[unwanted_ip]
if unwanted_ips and should_log:
logger.info(
"LoadMetrics: "
"Removed {} stale ip mappings: {} not in {}".format(
len(unwanted), unwanted, active_ips))
assert not (unwanted & set(mapping))
len(unwanted_ips), unwanted_ips, active_ips))
assert not (unwanted_ips & set(mapping))
prune(self.last_used_time_by_ip, should_log=True)
prune(self.static_resources_by_ip, should_log=False)
prune(self.raylet_id_by_ip, should_log=False)
prune(self.dynamic_resources_by_ip, should_log=False)
prune(self.resource_load_by_ip, should_log=False)
prune(self.last_heartbeat_time_by_ip, should_log=False)

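Note (not part of the diff): each prune is a set difference keyed on ip. A self-contained sketch of the same idea (dict contents are illustrative):

import logging

logger = logging.getLogger(__name__)


def prune_to(active_ips, mapping, should_log=False):
    # Drop entries whose ip the NodeProvider no longer reports.
    unwanted_ips = set(mapping) - set(active_ips)
    for unwanted_ip in unwanted_ips:
        if should_log:
            logger.info(f"LoadMetrics: Removed ip: {unwanted_ip}.")
        del mapping[unwanted_ip]
    assert not (unwanted_ips & set(mapping))


static_resources_by_ip = {"172.0.0.1": {"CPU": 2}, "172.0.0.9": {"CPU": 4}}
prune_to(["172.0.0.1"], static_resources_by_ip)
assert static_resources_by_ip == {"172.0.0.1": {"CPU": 2}}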
View file

@ -156,6 +156,8 @@ class Monitor:
# TODO: Use gcs client for this
self.gcs_node_resources_stub = \
gcs_service_pb2_grpc.NodeResourceInfoGcsServiceStub(gcs_channel)
self.gcs_node_info_stub = \
gcs_service_pb2_grpc.NodeInfoGcsServiceStub(gcs_channel)
# Set the redis client and mode so _internal_kv works for autoscaler.
worker = ray.worker.global_worker
@ -219,6 +221,7 @@ class Monitor:
self.autoscaler = StandardAutoscaler(
autoscaling_config,
self.load_metrics,
self.gcs_node_info_stub,
prefix_cluster_info=self.prefix_cluster_info,
event_summarizer=self.event_summarizer,
prom_metrics=self.prom_metrics)
@ -242,11 +245,11 @@ class Monitor:
mirror_node_types = {}
cluster_full = False
for resource_message in resources_batch_data.batch:
node_id = resource_message.node_id
# Generate node type config based on GCS reported node list.
if self.readonly_config:
# Keep prefix in sync with ReadonlyNodeProvider.
node_type = format_readonly_node_type(
resource_message.node_id.hex())
node_type = format_readonly_node_type(node_id.hex())
resources = {}
for k, v in resource_message.resources_total.items():
resources[k] = v
@ -272,18 +275,23 @@ class Monitor:
use_node_id_as_ip = (self.autoscaler is not None
and self.autoscaler.config["provider"].get(
"use_node_id_as_ip", False))
# "use_node_id_as_ip" is a hack meant to address situations in
# which there's more than one Ray node residing at a given ip.
# TODO (Dmitri): Stop using ips as node identifiers.
# https://github.com/ray-project/ray/issues/19086
if use_node_id_as_ip:
peloton_id = total_resources.get("NODE_ID_AS_RESOURCE")
# Legacy support https://github.com/ray-project/ray/pull/17312
if peloton_id is not None:
ip = str(int(peloton_id))
else:
ip = resource_message.node_id.hex()
ip = node_id.hex()
else:
ip = resource_message.node_manager_address
self.load_metrics.update(ip, total_resources, available_resources,
resource_load, waiting_bundles,
infeasible_bundles,
self.load_metrics.update(ip, node_id, total_resources,
available_resources, resource_load,
waiting_bundles, infeasible_bundles,
pending_placement_groups, cluster_full)
if self.readonly_config:
self.readonly_config["available_node_types"].update(

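Note (not part of the diff): the ip-resolution branch above, extracted as a pure function for readability (argument names are illustrative):

def resolve_load_metrics_ip(use_node_id_as_ip: bool,
                            total_resources: dict,
                            node_id: bytes,
                            node_manager_address: str) -> str:
    # With use_node_id_as_ip, node ids double as "ips" so that several
    # Ray nodes on one host don't collide under the same address.
    if use_node_id_as_ip:
        # Legacy support https://github.com/ray-project/ray/pull/17312
        peloton_id = total_resources.get("NODE_ID_AS_RESOURCE")
        if peloton_id is not None:
            return str(int(peloton_id))
        return node_id.hex()
    return node_manager_address


assert resolve_load_metrics_ip(False, {}, b"\xab", "172.0.0.1") == "172.0.0.1"
assert resolve_load_metrics_ip(True, {}, b"\xab", "172.0.0.1") == "ab"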
View file

@ -138,6 +138,13 @@ try:
unit="exceptions",
namespace="autoscaler",
registry=self.registry)
self.drain_node_exceptions: Counter = Counter(
"drain_node_exceptions",
"Number of exceptions raised when making a DrainNode rpc"
"prior to node termination.",
unit="exceptions",
namespace="autoscaler",
registry=self.registry)
except ImportError:
class NullMetric:

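Note (not part of the diff): a sketch of how the new counter behaves with prometheus_client (registry and usage are illustrative). Because the metric name already ends in the declared unit, prometheus_client does not append the unit a second time, so the exposed counter sample is autoscaler_drain_node_exceptions_total:

from prometheus_client import CollectorRegistry, Counter, generate_latest

registry = CollectorRegistry()
drain_node_exceptions = Counter(
    "drain_node_exceptions",
    "Number of exceptions raised when making a DrainNode rpc "
    "prior to node termination.",
    unit="exceptions",
    namespace="autoscaler",
    registry=registry)

drain_node_exceptions.inc()
# Exposition includes: autoscaler_drain_node_exceptions_total 1.0
print(generate_latest(registry).decode())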
View file

@ -1,3 +1,4 @@
import copy
import logging
import json
import yaml
@ -27,20 +28,24 @@ class AutoscalingCluster:
head_resources: resources of the head node, including CPU.
worker_node_types: autoscaler node types config for worker nodes.
"""
self._head_resources = head_resources
self._config = self._generate_config(head_resources, worker_node_types)
self._process = None
def _generate_config(self, head_resources, worker_node_types):
base_config = yaml.safe_load(
open(
os.path.join(
os.path.dirname(ray.__file__),
"autoscaler/_private/fake_multi_node/example.yaml")))
base_config["available_node_types"] = worker_node_types
base_config["available_node_types"]["ray.head.default"] = {
custom_config = copy.deepcopy(base_config)
custom_config["available_node_types"] = worker_node_types
custom_config["available_node_types"]["ray.head.default"] = {
"resources": head_resources,
"node_config": {},
"max_workers": 0,
}
self._head_resources = head_resources
self._config = base_config
self._process = None
return custom_config
def start(self):
"""Start the cluster.

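Note (not part of the diff): the deep copy keeps the freshly loaded base config pristine while the customized copy is mutated (for example by subclasses that post-process the returned config). A minimal illustration of the leak a shallow copy would allow, since the customization touches nested dicts:

import copy


def load_base():
    # Stands in for yaml.safe_load(...) of example.yaml.
    return {"available_node_types": {"ray.head.default": {"max_workers": 0}}}


base = load_base()
shallow = dict(base)
shallow["available_node_types"]["ray.head.default"]["max_workers"] = 5
assert base["available_node_types"]["ray.head.default"]["max_workers"] == 5  # leaked

base = load_base()
deep = copy.deepcopy(base)
deep["available_node_types"]["ray.head.default"]["max_workers"] = 5
assert base["available_node_types"]["ray.head.default"]["max_workers"] == 0  # isolated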
View file

@ -88,6 +88,7 @@ if __name__ == "__main__":
do_link("serve", force=args.yes)
do_link("_private", force=args.yes)
do_link("node.py", force=args.yes)
do_link("cluster_utils.py", force=args.yes)
# Link package's `dashboard` directly to local (repo's) dashboard.
# The repo's `dashboard` is a file, soft-linking to which will not work
# on Mac.

View file

@ -105,6 +105,7 @@ py_test_module_list(
py_test_module_list(
files = [
"test_autoscaler_fake_multinode.py", # Temporarily owned by core.
"test_autoscaler_drain_node_api.py",
"test_args.py",
"test_asyncio_cluster.py",
"test_asyncio.py",

View file

@ -1,3 +1,4 @@
from enum import Enum
import json
import jsonschema
import os
@ -17,6 +18,7 @@ from jsonschema.exceptions import ValidationError
from typing import Dict, Callable, List, Optional
import ray
from ray.core.generated import gcs_service_pb2
from ray.autoscaler._private.util import prepare_config, validate_config
from ray.autoscaler._private import commands
from ray.autoscaler.sdk import get_docker_host_mount_location
@ -32,9 +34,115 @@ from ray.autoscaler.tags import TAG_RAY_NODE_KIND, TAG_RAY_NODE_STATUS, \
NODE_KIND_WORKER, STATUS_UNINITIALIZED, TAG_RAY_CLUSTER_NAME
from ray.autoscaler.node_provider import NodeProvider
from ray._private.test_utils import RayTestTimeoutException
import grpc
import pytest
class DrainNodeOutcome(str, Enum):
"""Potential outcomes of DrainNode calls, each of which is handled
differently by the autoscaler.
"""
# Return a response indicating all nodes were successfully drained.
Succeeded = "Succeeded"
# Return a response indicating at least one node failed to be drained.
NotAllDrained = "NotAllDrained"
# Return an unimplemented gRPC error, indicating an old GCS.
Unimplemented = "Unimplemented"
# Raise a generic unexpected RPC error.
GenericRpcError = "GenericRpcError"
# Raise a generic unexpected exception.
GenericException = "GenericException"
class MockRpcException(grpc.RpcError):
"""Mock RpcError with a specified status code.
Note (Dmitri): It might be possible to do this already with standard tools
in the `grpc` module, but how to do so wasn't immediately obvious to me.
"""
def __init__(self, status_code: grpc.StatusCode):
self.status_code = status_code
def code(self):
return self.status_code
class MockNodeInfoStub:
"""Mock for GCS node info stub used by autoscaler to drain Ray nodes.
Can simulate DrainNode failures via the `drain_node_outcome` parameter.
Comments in DrainNodeOutcome enum class indicate the behavior for each
outcome.
"""
def __init__(self, drain_node_outcome=DrainNodeOutcome.Succeeded):
self.drain_node_outcome = drain_node_outcome
# Tracks how many times we've called DrainNode.
self.drain_node_call_count = 0
# Tracks how many times DrainNode returned a successful RPC response.
self.drain_node_reply_success = 0
def DrainNode(self, drain_node_request, timeout: int):
"""Simulate NodeInfo stub's DrainNode call.
Outcome determined by self.drain_node_outcome.
"""
self.drain_node_call_count += 1
if self.drain_node_outcome == DrainNodeOutcome.Unimplemented:
raise MockRpcException(status_code=grpc.StatusCode.UNIMPLEMENTED)
elif self.drain_node_outcome == DrainNodeOutcome.GenericRpcError:
# Any StatusCode besides UNIMPLEMENTED will do here.
raise MockRpcException(status_code=grpc.StatusCode.UNAVAILABLE)
elif self.drain_node_outcome == DrainNodeOutcome.GenericException:
raise Exception("DrainNode failed in some unexpected way.")
node_ids_to_drain = [
data_item.node_id
for data_item in drain_node_request.drain_node_data
]
ok_gcs_status = gcs_service_pb2.GcsStatus(
code=0, message="Yeah, it's fine.")
all_nodes_drained_status = [
gcs_service_pb2.DrainNodeStatus(node_id=node_id)
for node_id in node_ids_to_drain
]
# All but the last.
not_all_drained_status = all_nodes_drained_status[:-1]
if self.drain_node_outcome == DrainNodeOutcome.Succeeded:
drain_node_status = all_nodes_drained_status
elif self.drain_node_outcome == DrainNodeOutcome.NotAllDrained:
drain_node_status = not_all_drained_status
else:
# Shouldn't land here.
assert False, "Possible drain node outcomes exhausted."
self.drain_node_reply_success += 1
return gcs_service_pb2.DrainNodeReply(
status=ok_gcs_status, drain_node_status=drain_node_status)
def mock_raylet_id() -> bytes:
"""Random raylet id to pass to load_metrics.update.
"""
return os.urandom(10)
def fill_in_raylet_ids(provider, load_metrics) -> None:
"""Raylet ids for each ip are usually obtained by polling the GCS
in monitor.py. For test purposes, we sometimes need to manually fill
these fields with mocks.
"""
for node in provider.non_terminated_nodes({}):
ip = provider.internal_ip(node)
load_metrics.raylet_id_by_ip[ip] = mock_raylet_id()
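Note (not part of the diff): a quick usage sketch of MockNodeInfoStub, driving DrainNode directly with a hand-built request. SimpleNamespace stands in for the protobuf request type, which works because the mock only reads .drain_node_data[i].node_id; the reply is built from the real gcs_service_pb2 types imported at the top of this module:

from types import SimpleNamespace

stub = MockNodeInfoStub(DrainNodeOutcome.NotAllDrained)
request = SimpleNamespace(drain_node_data=[
    SimpleNamespace(node_id=b"raylet-1"),
    SimpleNamespace(node_id=b"raylet-2"),
])
reply = stub.DrainNode(request, timeout=5)
# "NotAllDrained" drops the last node from the drained set.
assert {s.node_id for s in reply.drain_node_status} == {b"raylet-1"}
assert stub.drain_node_call_count == 1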
class MockNode:
def __init__(self, node_id, tags, node_config, node_type,
unique_ips=False):
@ -417,7 +525,7 @@ MULTI_WORKER_CLUSTER = dict(
class LoadMetricsTest(unittest.TestCase):
def testHeartbeat(self):
lm = LoadMetrics()
lm.update("1.1.1.1", {"CPU": 2}, {"CPU": 1}, {})
lm.update("1.1.1.1", mock_raylet_id(), {"CPU": 2}, {"CPU": 1}, {})
lm.mark_active("2.2.2.2")
assert "1.1.1.1" in lm.last_heartbeat_time_by_ip
assert "2.2.2.2" in lm.last_heartbeat_time_by_ip
@ -425,10 +533,16 @@ class LoadMetricsTest(unittest.TestCase):
def testDebugString(self):
lm = LoadMetrics()
lm.update("1.1.1.1", {"CPU": 2}, {"CPU": 0}, {})
lm.update("2.2.2.2", {"CPU": 2, "GPU": 16}, {"CPU": 2, "GPU": 2}, {})
lm.update("1.1.1.1", mock_raylet_id(), {"CPU": 2}, {"CPU": 0}, {})
lm.update("2.2.2.2", mock_raylet_id(), {
"CPU": 2,
"GPU": 16
}, {
"CPU": 2,
"GPU": 2
}, {})
lm.update(
"3.3.3.3", {
"3.3.3.3", mock_raylet_id(), {
"memory": 1.05 * 1024 * 1024 * 1024,
"object_store_memory": 2.1 * 1024 * 1024 * 1024,
}, {
@ -505,6 +619,7 @@ class AutoscalingTest(unittest.TestCase):
autoscaler = StandardAutoscaler(
config_path,
LoadMetrics(),
MockNodeInfoStub(),
max_failures=0,
process_runner=runner,
update_interval_s=0)
@ -655,9 +770,11 @@ class AutoscalingTest(unittest.TestCase):
_provider=self.provider,
_runner=runner)
self.waitForNodes(1)
lm = LoadMetrics()
autoscaler = StandardAutoscaler(
config_path,
LoadMetrics(),
lm,
MockNodeInfoStub(),
max_failures=0,
process_runner=runner,
update_interval_s=0)
@ -719,6 +836,7 @@ class AutoscalingTest(unittest.TestCase):
assert self.provider.node_tags(worker).get(
TAG_RAY_USER_NODE_TYPE) == "ray.worker.old"
fill_in_raylet_ids(self.provider, lm)
autoscaler.update()
self.waitForNodes(2)
events = autoscaler.event_summarizer.summary()
@ -1114,6 +1232,7 @@ class AutoscalingTest(unittest.TestCase):
autoscaler = StandardAutoscaler(
config_path,
LoadMetrics(),
MockNodeInfoStub(),
max_failures=0,
process_runner=runner,
update_interval_s=0,
@ -1138,6 +1257,7 @@ class AutoscalingTest(unittest.TestCase):
self.waitForNodes(3)
assert mock_metrics.started_nodes.inc.call_count == 0
assert mock_metrics.stopped_nodes.inc.call_count == 0
assert mock_metrics.drain_node_exceptions.inc.call_count == 0
assert len(runner.calls) == 0
events = autoscaler.event_summarizer.summary()
assert not events, events
@ -1152,6 +1272,7 @@ class AutoscalingTest(unittest.TestCase):
autoscaler = StandardAutoscaler(
config_path,
LoadMetrics(),
MockNodeInfoStub(),
max_failures=0,
process_runner=runner,
update_interval_s=0,
@ -1184,6 +1305,7 @@ class AutoscalingTest(unittest.TestCase):
# The updates failed. Key thing is that the updates completed.
self.waitForNodes(
2, tag_filters={TAG_RAY_NODE_STATUS: STATUS_UPDATE_FAILED})
assert mock_metrics.drain_node_exceptions.inc.call_count == 0
def testScaleUp(self):
self.ScaleUpHelper(disable_node_updaters=False)
@ -1205,15 +1327,18 @@ class AutoscalingTest(unittest.TestCase):
runner = MockProcessRunner()
runner.respond_to_call("json .Config.Env", ["[]" for i in range(10)])
mock_metrics = Mock(spec=AutoscalerPrometheusMetrics())
lm = LoadMetrics()
autoscaler = StandardAutoscaler(
config_path,
LoadMetrics(),
lm,
MockNodeInfoStub(),
max_failures=0,
process_runner=runner,
update_interval_s=0,
prom_metrics=mock_metrics)
self.waitForNodes(10)
fill_in_raylet_ids(self.provider, lm)
# Gradually scales down to meet target size, never going too low
for _ in range(10):
autoscaler.update()
@ -1231,8 +1356,59 @@ class AutoscalingTest(unittest.TestCase):
assert mock_metrics.stopped_nodes.inc.call_count == 10
mock_metrics.started_nodes.inc.assert_called_with(5)
assert mock_metrics.worker_create_node_time.observe.call_count == 5
assert mock_metrics.drain_node_exceptions.inc.call_count == 0
def testDynamicScaling(self):
# Parameterization functionality in the unittest module is not great.
# To test scale-down behavior, we parameterize the DynamicScaling test
# manually over outcomes for the DrainNode RPC call.
def testDynamicScaling1(self):
self.helperDynamicScaling(DrainNodeOutcome.Succeeded)
def testDynamicScaling2(self):
self.helperDynamicScaling(DrainNodeOutcome.NotAllDrained)
def testDynamicScaling3(self):
self.helperDynamicScaling(DrainNodeOutcome.Unimplemented)
def testDynamicScaling4(self):
self.helperDynamicScaling(DrainNodeOutcome.GenericRpcError)
def testDynamicScaling5(self):
self.helperDynamicScaling(DrainNodeOutcome.GenericException)
def helperDynamicScaling(self, drain_node_outcome: DrainNodeOutcome):
mock_metrics = Mock(spec=AutoscalerPrometheusMetrics())
mock_node_info_stub = MockNodeInfoStub(drain_node_outcome)
# Run the core of the test logic.
self._helperDynamicScaling(mock_metrics, mock_node_info_stub)
# Make assertions about DrainNode error handling during scale-down.
# DrainNode call was made.
assert mock_node_info_stub.drain_node_call_count > 0
if drain_node_outcome == DrainNodeOutcome.Succeeded:
# No drain node exceptions.
assert mock_metrics.drain_node_exceptions.inc.call_count == 0
# Each drain node call succeeded.
assert (mock_node_info_stub.drain_node_reply_success ==
mock_node_info_stub.drain_node_call_count)
elif drain_node_outcome == DrainNodeOutcome.Unimplemented:
# All errors were suppressed.
assert mock_metrics.drain_node_exceptions.inc.call_count == 0
# Every call failed.
assert mock_node_info_stub.drain_node_reply_success == 0
elif drain_node_outcome in (DrainNodeOutcome.GenericRpcError,
DrainNodeOutcome.GenericException):
# We encountered an exception.
assert mock_metrics.drain_node_exceptions.inc.call_count > 0
# Every call failed.
assert (mock_metrics.drain_node_exceptions.inc.call_count ==
mock_node_info_stub.drain_node_call_count)
assert mock_node_info_stub.drain_node_reply_success == 0
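Note (not part of the diff): if these were free functions rather than unittest.TestCase methods, the manual unrolling above could collapse into a single pytest parametrization, e.g.:

import pytest

@pytest.mark.parametrize("outcome", list(DrainNodeOutcome))
def test_dynamic_scaling(outcome):
    # Hypothetical pytest-style equivalent of testDynamicScaling1-5.
    ...

Since pytest.mark.parametrize does not apply to TestCase methods, the five thin wrappers are a reasonable workaround.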
def _helperDynamicScaling(self, mock_metrics, mock_node_info_stub):
config_path = self.write_config(SMALL_CLUSTER)
self.provider = MockProvider()
runner = MockProcessRunner()
@ -1243,11 +1419,11 @@ class AutoscalingTest(unittest.TestCase):
TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE,
TAG_RAY_USER_NODE_TYPE: NODE_TYPE_LEGACY_HEAD
}, 1)
lm.update("172.0.0.0", {"CPU": 1}, {"CPU": 0}, {})
mock_metrics = Mock(spec=AutoscalerPrometheusMetrics())
lm.update("172.0.0.0", mock_raylet_id(), {"CPU": 1}, {"CPU": 0}, {})
autoscaler = StandardAutoscaler(
config_path,
lm,
mock_node_info_stub,
max_launch_batch=5,
max_concurrent_launches=5,
max_failures=0,
@ -1262,6 +1438,7 @@ class AutoscalingTest(unittest.TestCase):
new_config = SMALL_CLUSTER.copy()
new_config["max_workers"] = 1
self.write_config(new_config)
fill_in_raylet_ids(self.provider, lm)
autoscaler.update()
self.waitForNodes(1, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER})
@ -1275,7 +1452,7 @@ class AutoscalingTest(unittest.TestCase):
self.waitForNodes(1, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER})
worker_ip = self.provider.non_terminated_node_ips(
tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER}, )[0]
lm.update(worker_ip, {"CPU": 1}, {"CPU": 1}, {})
lm.update(worker_ip, mock_raylet_id(), {"CPU": 1}, {"CPU": 1}, {})
autoscaler.update()
self.waitForNodes(
10, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER})
@ -1300,6 +1477,7 @@ class AutoscalingTest(unittest.TestCase):
autoscaler = StandardAutoscaler(
config_path,
LoadMetrics(),
MockNodeInfoStub(),
max_launch_batch=5,
max_concurrent_launches=5,
max_failures=0,
@ -1335,10 +1513,11 @@ class AutoscalingTest(unittest.TestCase):
lm = LoadMetrics()
lm.local_ip = head_ip
lm.update(head_ip, {"CPU": 1}, {"CPU": 1}, {})
lm.update(head_ip, mock_raylet_id(), {"CPU": 1}, {"CPU": 1}, {})
autoscaler = StandardAutoscaler(
config_path,
lm,
MockNodeInfoStub(),
max_launch_batch=5,
max_concurrent_launches=5,
max_failures=0,
@ -1361,11 +1540,12 @@ class AutoscalingTest(unittest.TestCase):
autoscaler.update()
self.waitForNodes(2) # Still 1 worker because its resources
# aren't known.
lm.update("172.0.0.1", {"CPU": 2}, {"CPU": 2}, {})
lm.update("172.0.0.1", mock_raylet_id(), {"CPU": 2}, {"CPU": 2}, {})
autoscaler.update()
self.waitForNodes(10) # 9 workers and 1 head node, scaled immediately.
lm.update(
"172.0.0.1", {"CPU": 2}, {"CPU": 2}, {},
"172.0.0.1",
mock_raylet_id(), {"CPU": 2}, {"CPU": 2}, {},
waiting_bundles=[{
"CPU": 2
}] * 9,
@ -1400,6 +1580,7 @@ class AutoscalingTest(unittest.TestCase):
autoscaler = StandardAutoscaler(
config_path,
lm,
MockNodeInfoStub(),
max_launch_batch=5,
max_concurrent_launches=5,
max_failures=0,
@ -1408,7 +1589,8 @@ class AutoscalingTest(unittest.TestCase):
self.waitForNodes(1)
lm.update(
head_ip, {"CPU": 1}, {"CPU": 0}, {},
head_ip,
mock_raylet_id(), {"CPU": 1}, {"CPU": 0}, {},
waiting_bundles=[{
"CPU": 1
}] * 7,
@ -1420,7 +1602,8 @@ class AutoscalingTest(unittest.TestCase):
worker_ip = self.provider.non_terminated_node_ips(
tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER}, )[0]
lm.update(
worker_ip, {"CPU": 1}, {"CPU": 1}, {},
worker_ip,
mock_raylet_id(), {"CPU": 1}, {"CPU": 1}, {},
waiting_bundles=[{
"CPU": 1
}] * 7,
@ -1434,7 +1617,9 @@ class AutoscalingTest(unittest.TestCase):
worker_ips = self.provider.non_terminated_node_ips(
tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER}, )
for ip in worker_ips:
# Mark workers inactive.
lm.last_used_time_by_ip[ip] = 0
fill_in_raylet_ids(self.provider, lm)
autoscaler.update()
self.waitForNodes(1) # only the head node
# Make sure they don't get overwritten.
@ -1478,6 +1663,7 @@ class AutoscalingTest(unittest.TestCase):
autoscaler = StandardAutoscaler(
config_path,
lm,
MockNodeInfoStub(),
max_launch_batch=5,
max_concurrent_launches=5,
max_failures=0,
@ -1487,13 +1673,14 @@ class AutoscalingTest(unittest.TestCase):
autoscaler.update()
self.waitForNodes(2)
# This node has num_cpus=0
lm.update(head_ip, {"CPU": 1}, {"CPU": 0}, {})
lm.update(unmanaged_ip, {"CPU": 0}, {"CPU": 0}, {})
lm.update(head_ip, mock_raylet_id(), {"CPU": 1}, {"CPU": 0}, {})
lm.update(unmanaged_ip, mock_raylet_id(), {"CPU": 0}, {"CPU": 0}, {})
autoscaler.update()
self.waitForNodes(2)
# 1 CPU task cannot be scheduled.
lm.update(
unmanaged_ip, {"CPU": 0}, {"CPU": 0}, {},
unmanaged_ip,
mock_raylet_id(), {"CPU": 0}, {"CPU": 0}, {},
waiting_bundles=[{
"CPU": 1
}])
@ -1533,14 +1720,16 @@ class AutoscalingTest(unittest.TestCase):
autoscaler = StandardAutoscaler(
config_path,
lm,
MockNodeInfoStub(),
max_launch_batch=5,
max_concurrent_launches=5,
max_failures=0,
process_runner=runner,
update_interval_s=0)
lm.update(head_ip, {"CPU": 1}, {"CPU": 0}, {"CPU": 1})
lm.update(unmanaged_ip, {"CPU": 0}, {"CPU": 0}, {})
lm.update(head_ip, mock_raylet_id(), {"CPU": 1}, {"CPU": 0},
{"CPU": 1})
lm.update(unmanaged_ip, mock_raylet_id(), {"CPU": 0}, {"CPU": 0}, {})
# Note that we shouldn't autoscale here because the resource demand
# vector is not set and target utilization fraction = 1.
@ -1555,9 +1744,11 @@ class AutoscalingTest(unittest.TestCase):
self.provider = MockProvider()
runner = MockProcessRunner()
runner.respond_to_call("json .Config.Env", ["[]" for i in range(2)])
lm = LoadMetrics()
autoscaler = StandardAutoscaler(
config_path,
LoadMetrics(),
lm,
MockNodeInfoStub(),
max_launch_batch=5,
max_concurrent_launches=5,
max_failures=0,
@ -1580,6 +1771,7 @@ class AutoscalingTest(unittest.TestCase):
new_config = SMALL_CLUSTER.copy()
new_config["max_workers"] = 1
self.write_config(new_config)
fill_in_raylet_ids(self.provider, lm)
autoscaler.update()
assert len(self.provider.non_terminated_nodes({})) == 1
@ -1595,6 +1787,7 @@ class AutoscalingTest(unittest.TestCase):
autoscaler = StandardAutoscaler(
config_path,
LoadMetrics(),
MockNodeInfoStub(),
max_launch_batch=5,
max_concurrent_launches=8,
max_failures=0,
@ -1626,6 +1819,7 @@ class AutoscalingTest(unittest.TestCase):
self.waitForNodes(10)
assert autoscaler.pending_launches.value == 0
mock_metrics.pending_nodes.set.assert_called_with(0)
assert mock_metrics.drain_node_exceptions.inc.call_count == 0
def testUpdateThrottling(self):
config_path = self.write_config(SMALL_CLUSTER)
@ -1634,6 +1828,7 @@ class AutoscalingTest(unittest.TestCase):
autoscaler = StandardAutoscaler(
config_path,
LoadMetrics(),
MockNodeInfoStub(),
max_launch_batch=5,
max_concurrent_launches=5,
max_failures=0,
@ -1655,8 +1850,13 @@ class AutoscalingTest(unittest.TestCase):
def testLaunchConfigChange(self):
config_path = self.write_config(SMALL_CLUSTER)
self.provider = MockProvider()
lm = LoadMetrics()
autoscaler = StandardAutoscaler(
config_path, LoadMetrics(), max_failures=0, update_interval_s=0)
config_path,
lm,
MockNodeInfoStub(),
max_failures=0,
update_interval_s=0)
autoscaler.update()
self.waitForNodes(2)
@ -1665,6 +1865,7 @@ class AutoscalingTest(unittest.TestCase):
new_config["worker_nodes"]["InstanceType"] = "updated"
self.write_config(new_config)
self.provider.ready_to_create.clear()
fill_in_raylet_ids(self.provider, lm)
for _ in range(5):
autoscaler.update()
self.waitForNodes(0)
@ -1682,11 +1883,12 @@ class AutoscalingTest(unittest.TestCase):
TAG_RAY_USER_NODE_TYPE: NODE_TYPE_LEGACY_HEAD
}, 1)
lm = LoadMetrics()
lm.update("172.0.0.0", {"CPU": 1}, {"CPU": 0}, {})
lm.update("172.0.0.0", mock_raylet_id(), {"CPU": 1}, {"CPU": 0}, {})
mock_metrics = Mock(spec=AutoscalerPrometheusMetrics())
autoscaler = StandardAutoscaler(
config_path,
lm,
MockNodeInfoStub(),
max_launch_batch=10,
max_concurrent_launches=10,
process_runner=runner,
@ -1718,10 +1920,11 @@ class AutoscalingTest(unittest.TestCase):
tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER}, )[0]
# Because one worker already started, the scheduler waits for its
# resources to be updated before it launches the remaining min_workers.
lm.update(worker_ip, {"CPU": 1}, {"CPU": 1}, {})
lm.update(worker_ip, mock_raylet_id(), {"CPU": 1}, {"CPU": 1}, {})
autoscaler.update()
self.waitForNodes(
10, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER})
assert mock_metrics.drain_node_exceptions.inc.call_count == 0
def testMaxFailures(self):
config_path = self.write_config(SMALL_CLUSTER)
@ -1732,6 +1935,7 @@ class AutoscalingTest(unittest.TestCase):
autoscaler = StandardAutoscaler(
config_path,
LoadMetrics(),
MockNodeInfoStub(),
max_failures=2,
process_runner=runner,
update_interval_s=0,
@ -1742,6 +1946,7 @@ class AutoscalingTest(unittest.TestCase):
assert mock_metrics.update_loop_exceptions.inc.call_count == 2
with pytest.raises(Exception):
autoscaler.update()
assert mock_metrics.drain_node_exceptions.inc.call_count == 0
def testLaunchNewNodeOnOutOfBandTerminate(self):
config_path = self.write_config(SMALL_CLUSTER)
@ -1751,6 +1956,7 @@ class AutoscalingTest(unittest.TestCase):
autoscaler = StandardAutoscaler(
config_path,
LoadMetrics(),
MockNodeInfoStub(),
max_failures=0,
process_runner=runner,
update_interval_s=0)
@ -1771,6 +1977,7 @@ class AutoscalingTest(unittest.TestCase):
autoscaler = StandardAutoscaler(
config_path,
LoadMetrics(),
MockNodeInfoStub(),
max_failures=0,
process_runner=runner,
update_interval_s=0)
@ -1789,9 +1996,11 @@ class AutoscalingTest(unittest.TestCase):
self.provider = MockProvider()
runner = MockProcessRunner(fail_cmds=["setup_cmd"])
runner.respond_to_call("json .Config.Env", ["[]" for i in range(2)])
lm = LoadMetrics()
autoscaler = StandardAutoscaler(
config_path,
LoadMetrics(),
lm,
MockNodeInfoStub(),
max_failures=0,
process_runner=runner,
update_interval_s=0)
@ -1799,6 +2008,7 @@ class AutoscalingTest(unittest.TestCase):
autoscaler.update()
self.waitForNodes(2)
self.provider.finish_starting_nodes()
fill_in_raylet_ids(self.provider, lm)
autoscaler.update()
try:
self.waitForNodes(
@ -1830,6 +2040,7 @@ class AutoscalingTest(unittest.TestCase):
autoscaler = StandardAutoscaler(
config_path,
LoadMetrics(),
MockNodeInfoStub(),
max_failures=0,
process_runner=runner,
update_interval_s=0)
@ -1883,6 +2094,7 @@ class AutoscalingTest(unittest.TestCase):
autoscaler = StandardAutoscaler(
config_path,
lm,
MockNodeInfoStub(),
max_failures=0,
max_concurrent_launches=13,
max_launch_batch=13,
@ -1905,6 +2117,7 @@ class AutoscalingTest(unittest.TestCase):
config["available_node_types"]["p2.xlarge"]["min_workers"] = 6 # 5
config["available_node_types"]["p2.xlarge"]["max_workers"] = 6
self.write_config(config)
fill_in_raylet_ids(self.provider, lm)
autoscaler.update()
events = autoscaler.event_summarizer.summary()
assert autoscaler.pending_launches.value == 0
@ -1944,10 +2157,11 @@ class AutoscalingTest(unittest.TestCase):
TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE,
TAG_RAY_USER_NODE_TYPE: NODE_TYPE_LEGACY_HEAD
}, 1)
lm.update("172.0.0.0", {"CPU": 1}, {"CPU": 0}, {})
lm.update("172.0.0.0", mock_raylet_id(), {"CPU": 1}, {"CPU": 0}, {})
autoscaler = StandardAutoscaler(
config_path,
lm,
MockNodeInfoStub(),
max_failures=0,
process_runner=runner,
update_interval_s=0)
@ -1966,14 +2180,16 @@ class AutoscalingTest(unittest.TestCase):
autoscaler.update()
lm.update(
"172.0.0.1", {"CPU": 2}, {"CPU": 0}, {},
"172.0.0.1",
mock_raylet_id(), {"CPU": 2}, {"CPU": 0}, {},
waiting_bundles=2 * [{
"CPU": 2
}])
autoscaler.update()
self.waitForNodes(3, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER})
lm.update(
"172.0.0.2", {"CPU": 2}, {"CPU": 0}, {},
"172.0.0.2",
mock_raylet_id(), {"CPU": 2}, {"CPU": 0}, {},
waiting_bundles=3 * [{
"CPU": 2
}])
@ -1981,8 +2197,8 @@ class AutoscalingTest(unittest.TestCase):
self.waitForNodes(5, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER})
# Holds steady when load is removed
lm.update("172.0.0.1", {"CPU": 2}, {"CPU": 2}, {})
lm.update("172.0.0.2", {"CPU": 2}, {"CPU": 2}, {})
lm.update("172.0.0.1", mock_raylet_id(), {"CPU": 2}, {"CPU": 2}, {})
lm.update("172.0.0.2", mock_raylet_id(), {"CPU": 2}, {"CPU": 2}, {})
autoscaler.update()
assert autoscaler.pending_launches.value == 0
assert len(
@ -2006,6 +2222,7 @@ class AutoscalingTest(unittest.TestCase):
})) == 4
lm.last_used_time_by_ip["172.0.0.3"] = 0
lm.last_used_time_by_ip["172.0.0.4"] = 0
fill_in_raylet_ids(self.provider, lm)
autoscaler.update()
assert autoscaler.pending_launches.value == 0
# 2 nodes and not 1 because 1 is needed for min_worker and the other 1
@ -2040,6 +2257,7 @@ class AutoscalingTest(unittest.TestCase):
autoscaler = StandardAutoscaler(
config_path,
lm,
MockNodeInfoStub(),
max_failures=0,
process_runner=runner,
update_interval_s=0)
@ -2054,7 +2272,9 @@ class AutoscalingTest(unittest.TestCase):
head_ip = self.provider.non_terminated_node_ips({})[0]
lm.local_ip = head_ip
lm.update(
head_ip, {"CPU": 2}, {"CPU": 1}, {}, waiting_bundles=[{
head_ip,
mock_raylet_id(), {"CPU": 2}, {"CPU": 1}, {},
waiting_bundles=[{
"CPU": 1
}]) # head
# The headnode should be sufficient for now
@ -2063,7 +2283,9 @@ class AutoscalingTest(unittest.TestCase):
# Requires 1 more worker as the head node is fully used.
lm.update(
head_ip, {"CPU": 2}, {"CPU": 0}, {}, waiting_bundles=[{
head_ip,
mock_raylet_id(), {"CPU": 2}, {"CPU": 0}, {},
waiting_bundles=[{
"CPU": 1
}])
autoscaler.update()
@ -2071,7 +2293,8 @@ class AutoscalingTest(unittest.TestCase):
worker_ip = self.provider.non_terminated_node_ips(
tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER}, )[0]
lm.update(
worker_ip, {"CPU": 1}, {"CPU": 1}, {},
worker_ip,
mock_raylet_id(), {"CPU": 1}, {"CPU": 1}, {},
waiting_bundles=[{
"CPU": 1
}] * 7,
@ -2087,6 +2310,7 @@ class AutoscalingTest(unittest.TestCase):
tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER}, )
for ip in worker_ips:
lm.last_used_time_by_ip[ip] = 0
fill_in_raylet_ids(self.provider, lm)
autoscaler.update()
self.waitForNodes(1) # only the head node
assert len(self.provider.non_terminated_nodes({})) == 1
@ -2101,6 +2325,7 @@ class AutoscalingTest(unittest.TestCase):
autoscaler = StandardAutoscaler(
config_path,
lm,
MockNodeInfoStub(),
max_failures=0,
process_runner=runner,
update_interval_s=0,
@ -2131,6 +2356,7 @@ class AutoscalingTest(unittest.TestCase):
assert ("Restarting 1 nodes of type "
"ray-legacy-worker-node-type (lost contact with raylet)." in
events), events
assert mock_metrics.drain_node_exceptions.inc.call_count == 0
def testTerminateUnhealthyWorkers(self):
"""Test termination of unhealthy workers, when
@ -2147,6 +2373,7 @@ class AutoscalingTest(unittest.TestCase):
autoscaler = StandardAutoscaler(
config_path,
lm,
MockNodeInfoStub(),
max_failures=0,
process_runner=runner,
update_interval_s=0,
@ -2171,6 +2398,7 @@ class AutoscalingTest(unittest.TestCase):
# Reduce min_workers to 1
autoscaler.config["available_node_types"][NODE_TYPE_LEGACY_WORKER][
"min_workers"] = 1
fill_in_raylet_ids(self.provider, lm)
autoscaler.update()
# Stopped node metric incremented.
mock_metrics.stopped_nodes.inc.assert_called_once_with()
@ -2187,6 +2415,7 @@ class AutoscalingTest(unittest.TestCase):
# No additional runner calls, since updaters were disabled.
time.sleep(1)
assert len(runner.calls) == num_calls
assert mock_metrics.drain_node_exceptions.inc.call_count == 0
def testTerminateUnhealthyWorkers2(self):
"""Tests finer details of termination of unhealthy workers when
@ -2205,6 +2434,7 @@ class AutoscalingTest(unittest.TestCase):
autoscaler = StandardAutoscaler(
config_path,
lm,
MockNodeInfoStub(),
max_failures=0,
process_runner=runner,
update_interval_s=0,
@ -2232,12 +2462,14 @@ class AutoscalingTest(unittest.TestCase):
# Mark nodes unhealthy.
for ip in ips:
lm.last_heartbeat_time_by_ip[ip] = 0
fill_in_raylet_ids(self.provider, lm)
autoscaler.update()
# Unhealthy nodes are gone.
self.waitForNodes(0)
autoscaler.update()
# IPs pruned
assert lm.last_heartbeat_time_by_ip == {}
assert mock_metrics.drain_node_exceptions.inc.call_count == 0
def testExternalNodeScaler(self):
config = SMALL_CLUSTER.copy()
@ -2247,7 +2479,11 @@ class AutoscalingTest(unittest.TestCase):
}
config_path = self.write_config(config)
autoscaler = StandardAutoscaler(
config_path, LoadMetrics(), max_failures=0, update_interval_s=0)
config_path,
LoadMetrics(),
MockNodeInfoStub(),
max_failures=0,
update_interval_s=0)
assert isinstance(autoscaler.provider, NodeProvider)
def testLegacyExternalNodeScalerMissingFields(self):
@ -2281,7 +2517,10 @@ class AutoscalingTest(unittest.TestCase):
invalid_provider = self.write_config(config)
with pytest.raises(ImportError):
StandardAutoscaler(
invalid_provider, LoadMetrics(), update_interval_s=0)
invalid_provider,
LoadMetrics(),
MockNodeInfoStub(),
update_interval_s=0)
def testExternalNodeScalerWrongModuleFormat(self):
config = SMALL_CLUSTER.copy()
@ -2292,7 +2531,10 @@ class AutoscalingTest(unittest.TestCase):
invalid_provider = self.write_config(config, call_prepare_config=False)
with pytest.raises(ValueError):
StandardAutoscaler(
invalid_provider, LoadMetrics(), update_interval_s=0)
invalid_provider,
LoadMetrics(),
MockNodeInfoStub(),
update_interval_s=0)
def testSetupCommandsWithNoNodeCaching(self):
config = SMALL_CLUSTER.copy()
@ -2306,6 +2548,7 @@ class AutoscalingTest(unittest.TestCase):
autoscaler = StandardAutoscaler(
config_path,
lm,
MockNodeInfoStub(),
max_failures=0,
process_runner=runner,
update_interval_s=0)
@ -2350,6 +2593,7 @@ class AutoscalingTest(unittest.TestCase):
autoscaler = StandardAutoscaler(
config_path,
lm,
MockNodeInfoStub(),
max_failures=0,
process_runner=runner,
update_interval_s=0)
@ -2419,6 +2663,7 @@ class AutoscalingTest(unittest.TestCase):
autoscaler = StandardAutoscaler(
config_path,
lm,
MockNodeInfoStub(),
max_failures=0,
process_runner=runner,
update_interval_s=0)
@ -2497,6 +2742,7 @@ class AutoscalingTest(unittest.TestCase):
autoscaler = StandardAutoscaler(
config_path,
lm,
MockNodeInfoStub(),
max_failures=0,
process_runner=runner,
update_interval_s=0)
@ -2549,6 +2795,7 @@ class AutoscalingTest(unittest.TestCase):
autoscaler = StandardAutoscaler(
config_path,
lm,
MockNodeInfoStub(),
max_failures=0,
process_runner=runner,
update_interval_s=0)
@ -2604,6 +2851,7 @@ class AutoscalingTest(unittest.TestCase):
autoscaler = StandardAutoscaler(
config_path,
lm,
MockNodeInfoStub(),
max_failures=0,
process_runner=runner,
update_interval_s=0)
@ -2650,6 +2898,7 @@ class AutoscalingTest(unittest.TestCase):
autoscaler = StandardAutoscaler(
config_path,
lm,
MockNodeInfoStub(),
max_failures=0,
process_runner=runner,
update_interval_s=0)
@ -2685,6 +2934,7 @@ MemAvailable: 33000000 kB
autoscaler = StandardAutoscaler(
config_path,
lm,
MockNodeInfoStub(),
max_failures=0,
process_runner=runner,
update_interval_s=0)
@ -2711,6 +2961,7 @@ MemAvailable: 33000000 kB
autoscaler = StandardAutoscaler(
config_path,
LoadMetrics(),
MockNodeInfoStub(),
max_failures=0,
process_runner=runner,
update_interval_s=0)
@ -2819,6 +3070,7 @@ MemAvailable: 33000000 kB
autoscaler = StandardAutoscaler(
config_path,
lm,
MockNodeInfoStub(),
max_failures=0,
process_runner=runner,
update_interval_s=0,
@ -2887,6 +3139,7 @@ MemAvailable: 33000000 kB
assert not self.provider.is_terminated(1),\
"Node one terminated prematurely."
fill_in_raylet_ids(self.provider, lm)
autoscaler.update()
# Failed updates processed are now processed.
assert autoscaler.num_failed_updates[0] == 1,\
@ -2909,6 +3162,7 @@ MemAvailable: 33000000 kB
"ray.worker.default (launch failed)." not in events), events
# Should get two new nodes after the next update.
fill_in_raylet_ids(self.provider, lm)
autoscaler.update()
self.waitForNodes(2)
autoscaler.update_worker_list()
@ -2916,6 +3170,7 @@ MemAvailable: 33000000 kB
"Unexpected node_ids"
assert mock_metrics.stopped_nodes.inc.call_count == 1
assert mock_metrics.drain_node_exceptions.inc.call_count == 0
def testProviderException(self):
config_path = self.write_config(SMALL_CLUSTER)
@ -2926,6 +3181,7 @@ MemAvailable: 33000000 kB
autoscaler = StandardAutoscaler(
config_path,
LoadMetrics(),
MockNodeInfoStub(),
max_failures=0,
process_runner=runner,
update_interval_s=0,
@ -2946,6 +3202,7 @@ MemAvailable: 33000000 kB
self.waitFor(
metrics_incremented, fail_msg="Expected metrics to update")
assert mock_metrics.drain_node_exceptions.inc.call_count == 0
def testDefaultMinMaxWorkers(self):
config = copy.deepcopy(MOCK_DEFAULT_CONFIG)

View file

@ -0,0 +1,109 @@
import platform
import logging
import time
import pytest
import ray
from ray.autoscaler._private.fake_multi_node.node_provider import\
FakeMultiNodeProvider
from ray.cluster_utils import AutoscalingCluster
import ray.ray_constants as ray_constants
from ray._private.test_utils import get_error_message, init_error_pubsub
logger = logging.getLogger(__name__)
class MockFakeProvider(FakeMultiNodeProvider):
"""FakeMultiNodeProvider, with Ray node process termination mocked out.
Used to check that a Ray node can be terminated by a DrainNode API call
from the autoscaler.
"""
def _kill_ray_processes(self, node):
logger.info("Leaving Raylet termination to autoscaler Drain API!")
class MockAutoscalingCluster(AutoscalingCluster):
"""AutoscalingCluster modified to used the above MockFakeProvider.
"""
def _generate_config(self, head_resources, worker_node_types):
config = super()._generate_config(head_resources, worker_node_types)
config["provider"]["type"] = "external"
config["provider"]["module"] = (
"ray.tests"
".test_autoscaler_drain_node_api.MockFakeProvider")
return config
@pytest.mark.skipif(
platform.system() == "Windows", reason="Failing on Windows.")
def test_drain_api(shutdown_only):
"""E2E test of the autoscaler's use of the DrainNode API.
Adapted from test_autoscaler_fake_multinode.py.
The strategy is to mock out Ray node process termination in
FakeMultiNodeProvider, leaving node termination to the DrainNode API.
Scale-down is verified by `ray.cluster_resources`. It is verified that
no removed_node errors are issued after scale-down.
Validity of this test depends on the current implementation of DrainNode.
DrainNode currently works by asking the GCS to de-register and shut down
Ray nodes.
"""
# Autoscaling cluster with Ray process termination mocked out in the node
# provider.
cluster = MockAutoscalingCluster(
head_resources={"CPU": 1},
worker_node_types={
"gpu_node": {
"resources": {
"CPU": 1,
"GPU": 1,
"object_store_memory": 1024 * 1024 * 1024,
},
"node_config": {},
"min_workers": 0,
"max_workers": 2,
},
})
try:
cluster.start()
ray.init("auto")
# Triggers the addition of a GPU node.
@ray.remote(num_gpus=1)
def f():
print("gpu ok")
ray.get(f.remote())
# Verify scale-up
assert ray.cluster_resources().get("GPU", 0) == 1
# Sleep for double the idle timeout of 6 seconds.
time.sleep(12)
# Verify scale-down
assert ray.cluster_resources().get("GPU", 0) == 0
# Check that no errors were raised while draining nodes.
# (Logic copied from test_failure4::test_gcs_drain.)
try:
p = init_error_pubsub()
errors = get_error_message(
p, 1, ray_constants.REMOVED_NODE_ERROR, timeout=5)
assert len(errors) == 0
finally:
p.close()
finally:
cluster.shutdown()
if __name__ == "__main__":
import sys
sys.exit(pytest.main(["-v", __file__]))

View file

@ -10,7 +10,8 @@ import unittest
import pytest
import ray
from ray.tests.test_autoscaler import MockProvider, MockProcessRunner
from ray.tests.test_autoscaler import (MockProvider, MockProcessRunner,
MockNodeInfoStub, mock_raylet_id)
from ray.tests.test_resource_demand_scheduler import MULTI_WORKER_CLUSTER
from ray.autoscaler._private.providers import (
_NODE_PROVIDERS,
@ -74,6 +75,7 @@ class Node:
self.in_cluster = in_cluster
self.node_type = node_type
self.start_time = start_time
self.raylet_id = mock_raylet_id()
def bundle_fits(self, bundle):
if not self.in_cluster:
@ -181,6 +183,7 @@ class Simulator:
self.autoscaler = StandardAutoscaler(
self.config_path,
self.load_metrics,
MockNodeInfoStub(),
# Don't let the autoscaler start any node launchers. Instead, we
# will launch nodes ourself after every update call.
max_concurrent_launches=0,
@ -353,6 +356,7 @@ class Simulator:
continue
self.load_metrics.update(
ip=ip,
raylet_id=node.raylet_id,
static_resources=node.total_resources,
dynamic_resources=node.available_resources,
resource_load={},

View file

@ -72,7 +72,8 @@ _AUTOSCALER_METRICS = [
"autoscaler_worker_update_time", "autoscaler_updating_nodes",
"autoscaler_successful_updates", "autoscaler_failed_updates",
"autoscaler_failed_create_nodes", "autoscaler_recovering_nodes",
"autoscaler_successful_recoveries", "autoscaler_failed_recoveries"
"autoscaler_successful_recoveries", "autoscaler_failed_recoveries",
"autoscaler_drain_node_exceptions"
]

View file

@ -12,7 +12,8 @@ import ray
import ray.ray_constants
from ray.autoscaler._private.util import prepare_config, format_info_string
from ray.tests.test_autoscaler import SMALL_CLUSTER, MOCK_DEFAULT_CONFIG, \
MULTI_WORKER_CLUSTER, TYPES_A, MockProvider, MockProcessRunner
MULTI_WORKER_CLUSTER, TYPES_A, MockProvider, MockProcessRunner, \
MockNodeInfoStub, mock_raylet_id, fill_in_raylet_ids
from ray.autoscaler._private.providers import (_NODE_PROVIDERS,
_clear_provider_cache)
from ray.autoscaler._private.autoscaler import StandardAutoscaler, \
@ -1191,9 +1192,11 @@ def test_handle_legacy_cluster_config_yaml():
worker_ips = []
for ip in ips:
if ip == head_ip:
lm.update(ip, head_resources, head_resources, {})
lm.update(ip, mock_raylet_id(), head_resources, head_resources,
{})
else:
lm.update(ip, worker_resources, worker_resources, {})
lm.update(ip, mock_raylet_id(), worker_resources,
worker_resources, {})
worker_ips.append(ip)
assert not scheduler.node_types[NODE_TYPE_LEGACY_WORKER]["resources"]
@ -1246,7 +1249,8 @@ class LoadMetricsTest(unittest.TestCase):
def testResourceDemandVector(self):
lm = LoadMetrics()
lm.update(
"1.1.1.1", {"CPU": 2}, {"CPU": 1}, {},
"1.1.1.1",
mock_raylet_id(), {"CPU": 2}, {"CPU": 1}, {},
waiting_bundles=[{
"GPU": 1
}],
@ -1272,7 +1276,8 @@ class LoadMetricsTest(unittest.TestCase):
bundles=([Bundle(unit_resources={"GPU": 2})] * 2)),
]
lm.update(
"1.1.1.1", {}, {}, {},
"1.1.1.1",
mock_raylet_id(), {}, {}, {},
pending_placement_groups=pending_placement_groups)
assert lm.get_pending_placement_groups() == pending_placement_groups
@ -1291,6 +1296,7 @@ class LoadMetricsTest(unittest.TestCase):
]
lm.update(
"1.1.1.1",
mock_raylet_id(),
{
"CPU": 64,
"memory": 1000 * 1024 * 1024,
@ -1302,7 +1308,7 @@ class LoadMetricsTest(unittest.TestCase):
"object_store_memory": 1000 * 1024 * 1024,
},
{})
lm.update("1.1.1.2", {
lm.update("1.1.1.2", mock_raylet_id(), {
"CPU": 64,
"GPU": 8,
"accelerator_type:V100": 1,
@ -1311,7 +1317,7 @@ class LoadMetricsTest(unittest.TestCase):
"GPU": 1,
"accelerator_type:V100": 1,
}, {})
lm.update("1.1.1.3", {
lm.update("1.1.1.3", mock_raylet_id(), {
"CPU": 64,
"GPU": 8,
"accelerator_type:V100": 1
@ -1321,7 +1327,8 @@ class LoadMetricsTest(unittest.TestCase):
"accelerator_type:V100": 0.92
}, {})
lm.update(
"1.1.1.4", {"CPU": 2}, {"CPU": 2}, {},
"1.1.1.4",
mock_raylet_id(), {"CPU": 2}, {"CPU": 2}, {},
waiting_bundles=[{
"GPU": 2
}] * 10,
@ -1490,6 +1497,7 @@ class AutoscalingTest(unittest.TestCase):
autoscaler = StandardAutoscaler(
config_path,
lm,
MockNodeInfoStub(),
max_failures=0,
max_launch_batch=1,
max_concurrent_launches=10,
@ -1500,9 +1508,9 @@ class AutoscalingTest(unittest.TestCase):
self.waitForNodes(3)
for ip in self.provider.non_terminated_node_ips({}):
lm.update(ip, {"CPU": 2}, {"CPU": 0}, {})
lm.update(ip, mock_raylet_id(), {"CPU": 2}, {"CPU": 0}, {})
lm.update(head_ip, {"CPU": 16}, {"CPU": 1}, {})
lm.update(head_ip, mock_raylet_id(), {"CPU": 16}, {"CPU": 1}, {})
autoscaler.update()
while True:
@ -1516,7 +1524,9 @@ class AutoscalingTest(unittest.TestCase):
runner.ready_to_run.clear()
lm.update(
head_ip, {"CPU": 16}, {"CPU": 1}, {}, waiting_bundles=[{
head_ip,
mock_raylet_id(), {"CPU": 16}, {"CPU": 1}, {},
waiting_bundles=[{
"GPU": 1
}])
@ -1565,6 +1575,7 @@ class AutoscalingTest(unittest.TestCase):
autoscaler = StandardAutoscaler(
config_path,
LoadMetrics("172.0.0.0"),
MockNodeInfoStub(),
max_failures=0,
process_runner=runner,
update_interval_s=0)
@ -1591,6 +1602,7 @@ class AutoscalingTest(unittest.TestCase):
autoscaler = StandardAutoscaler(
config_path,
LoadMetrics("172.0.0.0"),
MockNodeInfoStub(),
max_failures=0,
process_runner=runner,
update_interval_s=0)
@ -1620,6 +1632,7 @@ class AutoscalingTest(unittest.TestCase):
autoscaler = StandardAutoscaler(
config_path,
lm,
MockNodeInfoStub(),
max_failures=0,
process_runner=runner,
update_interval_s=0)
@ -1647,7 +1660,8 @@ class AutoscalingTest(unittest.TestCase):
"GPU_group_6c2506ac733bc37496295b02c4fad446": 0.0101
}]
lm.update(
head_ip, {"CPU": 16}, {"CPU": 16}, {},
head_ip,
mock_raylet_id(), {"CPU": 16}, {"CPU": 16}, {},
infeasible_bundles=placement_group_resource_demands,
waiting_bundles=[{
"GPU": 8
@ -1688,6 +1702,7 @@ class AutoscalingTest(unittest.TestCase):
autoscaler = StandardAutoscaler(
config_path,
lm,
MockNodeInfoStub(),
max_failures=0,
process_runner=runner,
update_interval_s=0)
@ -1716,6 +1731,7 @@ class AutoscalingTest(unittest.TestCase):
# min workers.
for node_id in self.provider.non_terminated_nodes({}):
lm.last_used_time_by_ip[self.provider.internal_ip(node_id)] = -60
fill_in_raylet_ids(self.provider, lm)
autoscaler.update()
self.waitForNodes(3)
@ -1751,16 +1767,18 @@ class AutoscalingTest(unittest.TestCase):
autoscaler = StandardAutoscaler(
config_path,
lm,
MockNodeInfoStub(),
max_failures=0,
process_runner=runner,
update_interval_s=0)
autoscaler.update()
self.waitForNodes(1)
lm.update(head_ip, {"CPU": 4, "GPU": 1}, {}, {})
lm.update(head_ip, mock_raylet_id(), {"CPU": 4, "GPU": 1}, {}, {})
self.waitForNodes(1)
lm.update(
head_ip, {
head_ip,
mock_raylet_id(), {
"CPU": 4,
"GPU": 1
}, {"GPU": 0}, {},
@ -1788,6 +1806,7 @@ class AutoscalingTest(unittest.TestCase):
autoscaler = StandardAutoscaler(
config_path,
LoadMetrics(),
MockNodeInfoStub(),
max_failures=0,
process_runner=runner,
update_interval_s=0)
@ -1826,6 +1845,7 @@ class AutoscalingTest(unittest.TestCase):
autoscaler = StandardAutoscaler(
config_path,
LoadMetrics("172.0.0.0"),
MockNodeInfoStub(),
max_failures=0,
process_runner=runner,
update_interval_s=0)
@ -1863,6 +1883,7 @@ class AutoscalingTest(unittest.TestCase):
autoscaler = StandardAutoscaler(
config_path,
LoadMetrics("172.0.0.0"),
MockNodeInfoStub(),
max_failures=0,
process_runner=runner,
update_interval_s=0)
@ -1909,6 +1930,7 @@ class AutoscalingTest(unittest.TestCase):
autoscaler = StandardAutoscaler(
config_path,
lm,
MockNodeInfoStub(),
max_failures=0,
process_runner=runner,
update_interval_s=0)
@ -1917,7 +1939,8 @@ class AutoscalingTest(unittest.TestCase):
self.waitForNodes(0, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER})
autoscaler.update()
lm.update(
"1.2.3.4", {}, {}, {},
"1.2.3.4",
mock_raylet_id(), {}, {}, {},
waiting_bundles=[{
"GPU": 1
}],
@ -1952,10 +1975,11 @@ class AutoscalingTest(unittest.TestCase):
TAG_RAY_USER_NODE_TYPE: "empty_node"
}, 1)
lm = LoadMetrics("172.0.0.0")
lm.update("172.0.0.0", {"CPU": 0}, {"CPU": 0}, {})
lm.update("172.0.0.0", mock_raylet_id(), {"CPU": 0}, {"CPU": 0}, {})
autoscaler = StandardAutoscaler(
config_path,
lm,
MockNodeInfoStub(),
max_failures=0,
process_runner=runner,
update_interval_s=0)
@ -2016,6 +2040,7 @@ class AutoscalingTest(unittest.TestCase):
autoscaler = StandardAutoscaler(
config_path,
LoadMetrics("172.0.0.0"),
MockNodeInfoStub(),
max_failures=0,
process_runner=runner,
update_interval_s=0)
@ -2093,9 +2118,11 @@ class AutoscalingTest(unittest.TestCase):
TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE,
TAG_RAY_USER_NODE_TYPE: "empty_node"
}, 1)
lm = LoadMetrics("172.0.0.0")
autoscaler = StandardAutoscaler(
config_path,
LoadMetrics("172.0.0.0"),
lm,
MockNodeInfoStub(),
max_failures=0,
process_runner=runner,
update_interval_s=0)
@ -2106,6 +2133,7 @@ class AutoscalingTest(unittest.TestCase):
config["available_node_types"]["m4.large"]["node_config"][
"field_changed"] = 1
config_path = self.write_config(config)
fill_in_raylet_ids(self.provider, lm)
autoscaler.update()
self.waitForNodes(0, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER})
@ -2125,6 +2153,7 @@ class AutoscalingTest(unittest.TestCase):
autoscaler = StandardAutoscaler(
config_path,
LoadMetrics("172.0.0.0"),
MockNodeInfoStub(),
max_failures=0,
process_runner=runner,
update_interval_s=0)
@ -2175,6 +2204,7 @@ class AutoscalingTest(unittest.TestCase):
autoscaler = StandardAutoscaler(
config_path,
lm,
MockNodeInfoStub(),
max_failures=0,
process_runner=runner,
update_interval_s=0)
@ -2195,6 +2225,7 @@ class AutoscalingTest(unittest.TestCase):
autoscaler.provider.mock_nodes[node_id].state = "unterminatable"
lm.update(
node_ip,
mock_raylet_id(),
config["available_node_types"]["def_worker"]["resources"],
config["available_node_types"]["def_worker"]["resources"], {},
waiting_bundles=[{
@ -2216,6 +2247,7 @@ class AutoscalingTest(unittest.TestCase):
}])
lm.update(
node_ip,
mock_raylet_id(),
config["available_node_types"]["def_worker"]["resources"], {}, {},
waiting_bundles=[{
"CPU": 0.2,
@ -2225,6 +2257,7 @@ class AutoscalingTest(unittest.TestCase):
self.waitForNodes(2, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER})
lm.update(
node_ip,
mock_raylet_id(),
config["available_node_types"]["def_worker"]["resources"],
config["available_node_types"]["def_worker"]["resources"], {},
waiting_bundles=[{
@ -2240,6 +2273,7 @@ class AutoscalingTest(unittest.TestCase):
node_id].state == "unterminatable"
lm.update(
"172.0.0.2",
mock_raylet_id(),
config["available_node_types"]["def_worker"]["resources"],
config["available_node_types"]["def_worker"]["resources"], {},
waiting_bundles=[{
@ -2292,6 +2326,7 @@ class AutoscalingTest(unittest.TestCase):
autoscaler = StandardAutoscaler(
config_path,
lm,
MockNodeInfoStub(),
max_failures=0,
process_runner=runner,
update_interval_s=0)
@ -2311,6 +2346,7 @@ class AutoscalingTest(unittest.TestCase):
autoscaler.provider.mock_nodes[node_id].state = "unterminatable"
lm.update(
node_ip,
mock_raylet_id(),
config["available_node_types"]["def_worker"]["resources"],
config["available_node_types"]["def_worker"]["resources"], {},
waiting_bundles=[{
@ -2338,6 +2374,7 @@ class AutoscalingTest(unittest.TestCase):
}] * 3)
lm.update(
node_ip,
mock_raylet_id(),
config["available_node_types"]["def_worker"]["resources"], {}, {},
waiting_bundles=[{
"CPU": 0.2,
@ -2347,15 +2384,15 @@ class AutoscalingTest(unittest.TestCase):
self.waitForNodes(3, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER})
autoscaler.load_metrics.set_resource_requests([])
lm.update("172.0.0.2",
lm.update("172.0.0.2", mock_raylet_id(),
config["available_node_types"]["def_worker"]["resources"],
config["available_node_types"]["def_worker"]["resources"],
{})
lm.update("172.0.0.3",
lm.update("172.0.0.3", mock_raylet_id(),
config["available_node_types"]["def_worker"]["resources"],
config["available_node_types"]["def_worker"]["resources"],
{})
lm.update(node_ip,
lm.update(node_ip, mock_raylet_id(),
config["available_node_types"]["def_worker"]["resources"],
{}, {})
print("============ Should scale down from here =============",
@ -2405,6 +2442,7 @@ class AutoscalingTest(unittest.TestCase):
autoscaler = StandardAutoscaler(
config_path,
lm,
MockNodeInfoStub(),
max_failures=0,
process_runner=runner,
update_interval_s=0)
@ -2458,11 +2496,13 @@ class AutoscalingTest(unittest.TestCase):
autoscaler = StandardAutoscaler(
config_path,
lm,
MockNodeInfoStub(),
max_failures=0,
process_runner=runner,
update_interval_s=0)
lm.update(
"127.0.0.0", {
"127.0.0.0",
mock_raylet_id(), {
"CPU": 2,
"GPU": 1
}, {"CPU": 2}, {},
@ -2477,7 +2517,8 @@ class AutoscalingTest(unittest.TestCase):
# 1 head, 1 worker.
self.waitForNodes(2)
lm.update(
"127.0.0.0", {
"127.0.0.0",
mock_raylet_id(), {
"CPU": 2,
"GPU": 1
}, {"CPU": 2}, {},