[autoscaler] Additional Autoscaler Metrics (#16198)

Chris K. W 2021-06-04 23:19:17 -07:00 committed by GitHub
parent e5fad4bc2d
commit 2e11ac678f
6 changed files with 153 additions and 12 deletions
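As an orientation aid, here is a minimal sketch of how the metrics added in this commit could be exercised and inspected through the prometheus_client exposition format. The import path ray.autoscaler._private.prom_metrics is an assumption; the attribute names and the registry come from the definitions in the diff below.

# Sketch only: bump a few of the new metrics and print the exposition text.
# The module path below is an assumption, not shown in this diff.
from prometheus_client import generate_latest
from ray.autoscaler._private.prom_metrics import AutoscalerPrometheusMetrics

prom_metrics = AutoscalerPrometheusMetrics()  # creates its own CollectorRegistry

# Roughly what StandardAutoscaler does after one successful recovery update.
prom_metrics.successful_updates.inc()
prom_metrics.successful_recoveries.inc()
prom_metrics.worker_update_time.observe(42.0)
prom_metrics.updating_nodes.set(0)
prom_metrics.recovering_nodes.set(0)

# Counters are exposed with a "_total" suffix; the histograms expose
# *_bucket, *_sum and *_count series under the "autoscaler" namespace.
print(generate_latest(prom_metrics.registry).decode())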


@ -269,8 +269,15 @@ class StandardAutoscaler:
if completed_nodes:
failed_nodes = []
for node_id in completed_nodes:
if self.updaters[node_id].exitcode == 0:
updater = self.updaters[node_id]
if updater.exitcode == 0:
self.num_successful_updates[node_id] += 1
self.prom_metrics.successful_updates.inc()
if updater.for_recovery:
self.prom_metrics.successful_recoveries.inc()
if updater.update_time:
self.prom_metrics.worker_update_time.observe(
updater.update_time)
# Mark the node as active to prevent the node recovery
# logic from immediately trying to restart Ray on the new node.
self.load_metrics.mark_active(
@ -278,6 +285,9 @@ class StandardAutoscaler:
else:
failed_nodes.append(node_id)
self.num_failed_updates[node_id] += 1
self.prom_metrics.failed_updates.inc()
if updater.for_recovery:
self.prom_metrics.failed_recoveries.inc()
self.node_tracker.untrack(node_id)
del self.updaters[node_id]
@ -332,6 +342,12 @@ class StandardAutoscaler:
for node_id in nodes:
self.recover_if_needed(node_id, now)
self.prom_metrics.updating_nodes.set(len(self.updaters))
num_recovering = 0
for updater in self.updaters.values():
if updater.for_recovery:
num_recovering += 1
self.prom_metrics.recovering_nodes.set(num_recovering)
logger.info(self.info_string())
legacy_log_info_string(self, nodes)
@ -621,7 +637,8 @@ class StandardAutoscaler:
use_internal_ip=True,
is_head_node=False,
docker_config=self.config.get("docker"),
node_resources=self._node_resources(node_id))
node_resources=self._node_resources(node_id),
for_recovery=True)
updater.start()
self.updaters[node_id] = updater


@ -2,6 +2,7 @@ from typing import Any, Optional, Dict
import copy
import logging
import threading
import time
from ray.autoscaler.tags import (TAG_RAY_LAUNCH_CONFIG, TAG_RAY_NODE_STATUS,
TAG_RAY_NODE_KIND, TAG_RAY_NODE_NAME,
@ -60,7 +61,15 @@ class NodeLauncher(threading.Thread):
if node_type:
node_tags[TAG_RAY_USER_NODE_TYPE] = node_type
node_config.update(launch_config)
launch_start_time = time.time()
self.provider.create_node(node_config, node_tags, count)
launch_time = time.time() - launch_start_time
for _ in range(count):
# Note: when launching multiple nodes, we observe the time it
# took the whole batch to launch once for each node. For example,
# if 4 nodes were created in 25 seconds, we would observe the 25
# second create time 4 times.
self.prom_metrics.worker_create_node_time.observe(launch_time)
self.prom_metrics.started_nodes.inc(count)
after = self.provider.non_terminated_nodes(tag_filters=worker_filter)
if set(after).issubset(before):
@ -74,6 +83,7 @@ class NodeLauncher(threading.Thread):
self._launch_node(config, count, node_type)
except Exception:
self.prom_metrics.node_launch_exceptions.inc()
self.prom_metrics.failed_create_nodes.inc(count)
logger.exception("Launch failed")
finally:
self.pending.dec(node_type, count)
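The note in _launch_node above has a simple consequence: because the batch launch time is observed once per node, the histogram's sum/count average stays equal to the batch duration. A small standalone sketch with plain prometheus_client objects (mirroring, not reusing, the autoscaler's own metric):

# Sketch: a 25-second launch of 4 nodes, observed once per node.
from prometheus_client import CollectorRegistry, Histogram

registry = CollectorRegistry()
create_time = Histogram(
    "worker_create_node_time_seconds",
    "Example histogram mirroring the launch-time metric above.",
    namespace="autoscaler",
    registry=registry,
    buckets=[5, 10, 20, 30, 45, 60, 90, 120])

count = 4
launch_time = 25.0  # pretend provider.create_node(...) took 25 seconds
for _ in range(count):
    create_time.observe(launch_time)

# _count is 4 and _sum is 100, so the usual sum/count "average latency"
# query still reports 25 seconds per node.
samples = {s.name: s.value
           for metric in registry.collect() for s in metric.samples}
assert samples["autoscaler_worker_create_node_time_seconds_count"] == 4
assert samples["autoscaler_worker_create_node_time_seconds_sum"] == 100.0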


@ -2,6 +2,7 @@ from prometheus_client import (
CollectorRegistry,
Counter,
Gauge,
Histogram,
)
@ -11,6 +12,37 @@ class AutoscalerPrometheusMetrics:
def __init__(self, registry: CollectorRegistry = None):
self.registry: CollectorRegistry = registry or \
CollectorRegistry(auto_describe=True)
# Buckets: 5 seconds, 10 seconds, 20 seconds, 30 seconds,
# 45 seconds, 1 minute, 1.5 minutes, 2 minutes,
# 3 minutes, 4 minutes, 5 minutes, 6 minutes,
# 8 minutes, 10 minutes, 12 minutes, 15 minutes,
# 20 minutes, 25 minutes, 30 minutes.
# Used for both worker launch time and worker update time.
histogram_buckets = [
5, 10, 20, 30, 45, 60, 90, 120, 180, 240, 300, 360, 480, 600, 720,
900, 1200, 1500, 1800
]
self.worker_create_node_time: Histogram = Histogram(
"worker_create_node_time_seconds",
"Worker launch time. This is the time it takes for a call to "
"a node provider's create_node method to return. Note that "
"when nodes are launched in batches, the launch time for that "
"batch will be observed once for *each* node in that batch. For "
"example, if 8 nodes are launched in 3 minutes, a launch time "
"of 3 minutes will be observed 8 times.",
unit="seconds",
namespace="autoscaler",
registry=self.registry,
buckets=histogram_buckets)
self.worker_update_time: Histogram = Histogram(
"worker_update_time_seconds",
"Worker update time. This is the time between when an updater "
"thread begins executing and when it exits successfully. This "
"metric only observes times for successful updates.",
unit="seconds",
namespace="autoscaler",
registry=self.registry,
buckets=histogram_buckets)
self.pending_nodes: Gauge = Gauge(
"pending_nodes",
"Number of nodes pending to be started.",
@ -29,12 +61,55 @@ class AutoscalerPrometheusMetrics:
unit="nodes",
namespace="autoscaler",
registry=self.registry)
self.updating_nodes: Gauge = Gauge(
"updating_nodes",
"Number of nodes in the process of updating.",
unit="nodes",
namespace="autoscaler",
registry=self.registry)
self.recovering_nodes: Gauge = Gauge(
"recovering_nodes",
"Number of nodes in the process of recovering.",
unit="nodes",
namespace="autoscaler",
registry=self.registry)
self.running_workers: Gauge = Gauge(
"running_workers",
"Number of worker nodes running.",
unit="nodes",
namespace="autoscaler",
registry=self.registry)
self.failed_create_nodes: Counter = Counter(
"failed_create_nodes",
"Number of nodes that failed to be created due to an exception "
"in the node provider's create_node method.",
unit="nodes",
namespace="autoscaler",
registry=self.registry)
self.failed_updates: Counter = Counter(
"failed_updates",
"Number of failed worker node updates.",
unit="updates",
namespace="autoscaler",
registry=self.registry)
self.successful_updates: Counter = Counter(
"successful_updates",
"Number of successful worker node updates.",
unit="updates",
namespace="autoscaler",
registry=self.registry)
self.failed_recoveries: Counter = Counter(
"failed_recoveries",
"Number of failed node recoveries.",
unit="recoveries",
namespace="autoscaler",
registry=self.registry)
self.successful_recoveries: Counter = Counter(
"successful_recoveries",
"Number of successful node recoveries.",
unit="recoveries",
namespace="autoscaler",
registry=self.registry)
self.update_loop_exceptions: Counter = Counter(
"update_loop_exceptions",
"Number of exceptions raised in the update loop of the "

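Since a Prometheus histogram exports only cumulative buckets plus a running sum and count, a mean update time has to be derived rather than read directly. A minimal sketch, assuming access to an AutoscalerPrometheusMetrics instance; the helper name mean_worker_update_time is made up for illustration:

# Sketch: derive the mean successful update time from the exported
# _sum and _count samples. `prom_metrics` is assumed to be an
# AutoscalerPrometheusMetrics instance; the helper name is illustrative.
def mean_worker_update_time(prom_metrics):
    total = count = 0.0
    for metric in prom_metrics.registry.collect():
        for sample in metric.samples:
            if sample.name == "autoscaler_worker_update_time_seconds_sum":
                total = sample.value
            elif sample.name == "autoscaler_worker_update_time_seconds_count":
                count = sample.value
    return total / count if count else None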

@ -50,6 +50,8 @@ class NodeUpdater:
or external ip.
docker_config: Docker section of autoscaler yaml
restart_only: Whether to skip setup commands & just restart ray
for_recovery: True if updater is for a recovering node. Only used for
metric tracking.
"""
def __init__(self,
@ -71,7 +73,8 @@ class NodeUpdater:
process_runner=subprocess,
use_internal_ip=False,
docker_config=None,
restart_only=False):
restart_only=False,
for_recovery=False):
self.log_prefix = "NodeUpdater: {}: ".format(node_id)
use_internal_ip = (use_internal_ip
@ -110,8 +113,11 @@ class NodeUpdater:
self.is_head_node = is_head_node
self.docker_config = docker_config
self.restart_only = restart_only
self.update_time = None
self.for_recovery = for_recovery
def run(self):
update_start_time = time.time()
if cmd_output_util.does_allow_interactive(
) and cmd_output_util.is_output_redirected():
# this is most probably a bug since the user has no control
@ -159,6 +165,7 @@ class NodeUpdater:
self.provider.set_node_tags(self.node_id, tags_to_set)
cli_logger.labeled_value("New status", STATUS_UP_TO_DATE)
self.update_time = time.time() - update_start_time
self.exitcode = 0
def sync_file_mounts(self, sync_cmd, step_numbers=(0, 2)):


@ -897,6 +897,7 @@ class AutoscalingTest(unittest.TestCase):
# started_nodes metric should have been incremented by 2
assert mock_metrics.started_nodes.inc.call_count == 1
mock_metrics.started_nodes.inc.assert_called_with(2)
assert mock_metrics.worker_create_node_time.observe.call_count == 2
autoscaler.update()
self.waitForNodes(2)
@ -943,6 +944,7 @@ class AutoscalingTest(unittest.TestCase):
"ray-legacy-worker-node-type (outdated)." in events), events
assert mock_metrics.stopped_nodes.inc.call_count == 10
mock_metrics.started_nodes.inc.assert_called_with(5)
assert mock_metrics.worker_create_node_time.observe.call_count == 5
def testDynamicScaling(self):
config_path = self.write_config(SMALL_CLUSTER)
@ -1728,12 +1730,14 @@ class AutoscalingTest(unittest.TestCase):
runner = MockProcessRunner()
runner.respond_to_call("json .Config.Env", ["[]" for i in range(3)])
lm = LoadMetrics()
mock_metrics = Mock(spec=AutoscalerPrometheusMetrics())
autoscaler = StandardAutoscaler(
config_path,
lm,
max_failures=0,
process_runner=runner,
update_interval_s=0)
update_interval_s=0,
prom_metrics=mock_metrics)
autoscaler.update()
self.waitForNodes(2)
self.provider.finish_starting_nodes()
@ -1747,9 +1751,11 @@ class AutoscalingTest(unittest.TestCase):
time.sleep(0.05)
autoscaler.update()
assert not autoscaler.updaters
mock_metrics.recovering_nodes.set.assert_called_with(0)
num_calls = len(runner.calls)
lm.last_heartbeat_time_by_ip["172.0.0.0"] = 0
autoscaler.update()
mock_metrics.recovering_nodes.set.assert_called_with(1)
self.waitFor(lambda: len(runner.calls) > num_calls, num_retries=150)
# Check the node removal event is generated.
@ -2327,10 +2333,16 @@ MemAvailable: 33000000 kB
if autoscaler.updaters:
time.sleep(0.05)
autoscaler.update()
assert not autoscaler.updaters
lm.last_heartbeat_time_by_ip["172.0.0.0"] = 0
lm.last_heartbeat_time_by_ip["172.0.0.1"] = 0
# Expect both updates to be successful, no nodes in updating state
assert mock_metrics.successful_updates.inc.call_count == 2
assert mock_metrics.worker_update_time.observe.call_count == 2
mock_metrics.updating_nodes.set.assert_called_with(0)
assert not autoscaler.updaters
# Set up process runner to terminate worker 0 during missed heartbeat
# recovery and also cause the updater to fail.
def terminate_worker_zero():
@ -2339,9 +2351,14 @@ MemAvailable: 33000000 kB
autoscaler.process_runner = MockProcessRunner(
fail_cmds=["ray_start_cmd"],
cmd_to_callback={"ray_start_cmd": terminate_worker_zero})
# ensures that no updates are completed until after the next call
# to update()
autoscaler.process_runner.ready_to_run.clear()
num_calls = len(autoscaler.process_runner.calls)
autoscaler.update()
mock_metrics.updating_nodes.set.assert_called_with(2)
mock_metrics.recovering_nodes.set.assert_called_with(2)
autoscaler.process_runner.ready_to_run.set()
# Wait for updaters spawned by last autoscaler update to finish.
self.waitFor(
lambda: all(not updater.is_alive()
@ -2372,6 +2389,9 @@ MemAvailable: 33000000 kB
"Node zero update failure not registered"
assert autoscaler.num_failed_updates[1] == 1,\
"Node one update failure not registered"
assert mock_metrics.failed_updates.inc.call_count == 2
assert mock_metrics.failed_recoveries.inc.call_count == 2
assert mock_metrics.successful_recoveries.inc.call_count == 0
# Completed-update-processing logic should have terminated node 1.
assert self.provider.is_terminated(1), "Node 1 not terminated on time."
@ -2390,7 +2410,7 @@ MemAvailable: 33000000 kB
assert set(autoscaler.workers()) == {2, 3},\
"Unexpected node_ids"
assert len(mock_metrics.stopped_nodes.mock_calls) == 1
assert mock_metrics.stopped_nodes.inc.call_count == 1
def testProviderException(self):
config_path = self.write_config(SMALL_CLUSTER)
@ -2407,12 +2427,20 @@ MemAvailable: 33000000 kB
prom_metrics=mock_metrics)
autoscaler.update()
def exceptions_incremented():
return mock_metrics.node_launch_exceptions.inc.call_count == 1
def metrics_incremented():
exceptions = \
mock_metrics.node_launch_exceptions.inc.call_count == 1
create_failures = \
mock_metrics.failed_create_nodes.inc.call_count == 1
create_arg = False
if create_failures:
# number of failed creations should be incremented by 2
create_arg = mock_metrics.failed_create_nodes.inc.call_args[
0] == (2, )
return exceptions and create_failures and create_arg
self.waitFor(
exceptions_incremented,
fail_msg="Expected to see a node launch exception")
metrics_incremented, fail_msg="Expected metrics to update")
if __name__ == "__main__":


@ -56,7 +56,11 @@ _AUTOSCALER_METRICS = [
"autoscaler_node_launch_exceptions", "autoscaler_pending_nodes",
"autoscaler_reset_exceptions", "autoscaler_running_workers",
"autoscaler_started_nodes", "autoscaler_stopped_nodes",
"autoscaler_update_loop_exceptions"
"autoscaler_update_loop_exceptions", "autoscaler_worker_create_node_time",
"autoscaler_worker_update_time", "autoscaler_updating_nodes",
"autoscaler_successful_updates", "autoscaler_failed_updates",
"autoscaler_failed_create_nodes", "autoscaler_recovering_nodes",
"autoscaler_successful_recoveries", "autoscaler_failed_recoveries"
]