[autoscaler] Log ips and ids when terminating nodes, code structure (#18180)

* recovery failure uses same termination function

* More cleanup

* More cleanup

* ips

* wip

* wip

* wip

* Fix tests

* tweak
Dmitri Gekhtman 2021-09-19 15:44:38 -07:00 committed by GitHub
parent 35aa944ef4
commit ffe533b297
2 changed files with 148 additions and 122 deletions

@@ -1,5 +1,5 @@
from collections import defaultdict, namedtuple, Counter
from typing import Any, Optional, Dict, List, Set, FrozenSet, Tuple
from typing import Callable, Any, Optional, Dict, List, Set, FrozenSet, Tuple
import copy
import logging
import math
@@ -141,6 +141,11 @@ class StandardAutoscaler:
self.last_update_time = 0.0
self.update_interval_s = update_interval_s
# Tracks active worker nodes
self.workers = []
# Tracks nodes scheduled for termination
self.nodes_to_terminate = []
# Disable NodeUpdater threads if true.
# Should be set to true in situations where another component, such as
# a Kubernetes operator, is responsible for Ray setup on nodes.
@@ -207,23 +212,48 @@ class StandardAutoscaler:
return
self.last_update_time = now
nodes = self.workers()
self.update_worker_list()
self.load_metrics.prune_active_ips([
self.provider.internal_ip(node_id)
for node_id in self.all_workers()
self.provider.internal_ip(node_id) for node_id in self.all_workers
])
# Terminate any idle or out of date nodes
self.terminate_nodes_to_enforce_config_constraints(now)
self.launch_required_nodes()
if self.disable_node_updaters:
self.terminate_unhealthy_nodes(now)
else:
self.process_completed_updates()
self.update_nodes()
self.attempt_to_recover_unhealthy_nodes(now)
self.set_prometheus_updater_data()
logger.info(self.info_string())
legacy_log_info_string(self, self.workers)
def terminate_nodes_to_enforce_config_constraints(self, now: float):
"""Terminates nodes to enforce constraints defined by the autoscaling
config.
(1) Terminates nodes in excess of `max_workers`.
(2) Terminates nodes idle for longer than `idle_timeout_minutes`.
(3) Terminates outdated nodes,
namely nodes whose configs don't match `node_config` for the
relevant node type.
Avoids terminating non-outdated nodes required by
autoscaler.sdk.request_resources().
"""
last_used = self.load_metrics.last_used_time_by_ip
horizon = now - (60 * self.config["idle_timeout_minutes"])
nodes_to_terminate: List[NodeID] = []
node_type_counts = defaultdict(int)
# Sort based on last used to make sure to keep min_workers that
# were most recently used. Otherwise, _keep_min_workers_of_node_type
# might keep a node that should be terminated.
sorted_node_ids = self._sort_based_on_last_used(nodes, last_used)
sorted_node_ids = self._sort_based_on_last_used(
self.workers, last_used)
# Don't terminate nodes needed by request_resources()
nodes_not_allowed_to_terminate: FrozenSet[NodeID] = {}
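For illustration (this snippet is not part of the commit): the idle check described in the docstring above compares each node's last-used timestamp against a horizon of now minus 60 * idle_timeout_minutes. A minimal sketch, where idle_node_ips and the sample values are invented for the example:

import time

def idle_node_ips(last_used_time_by_ip, idle_timeout_minutes, now):
    # A node is idle if it was last used before the horizon.
    horizon = now - 60 * idle_timeout_minutes
    return [ip for ip, t in last_used_time_by_ip.items() if t < horizon]

now = time.time()
# With a 5-minute timeout, a node last used 10 minutes ago is idle;
# one used 1 minute ago is not.
assert idle_node_ips({"10.0.0.3": now - 600, "10.0.0.4": now - 60}, 5, now) == ["10.0.0.3"]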
@@ -231,28 +261,16 @@ class StandardAutoscaler:
nodes_not_allowed_to_terminate = \
self._get_nodes_needed_for_request_resources(sorted_node_ids)
# Tracks counts of nodes we intend to keep for each node type.
node_type_counts = defaultdict(int)
def keep_node(node_id: NodeID) -> None:
# Update per-type counts and add node_id to nodes_to_keep.
# Update per-type counts.
tags = self.provider.node_tags(node_id)
if TAG_RAY_USER_NODE_TYPE in tags:
node_type = tags[TAG_RAY_USER_NODE_TYPE]
node_type_counts[node_type] += 1
def schedule_node_termination(node_id: NodeID,
reason_opt: Optional[str]) -> None:
if reason_opt is None:
raise Exception("reason should not be None.")
reason: str = reason_opt
# Log, record an event, and add node_id to nodes_to_terminate.
logger.info("StandardAutoscaler: "
"{}: Terminating {} node.".format(node_id, reason))
self.event_summarizer.add(
"Removing {} nodes of type " + self._get_node_type(node_id) +
" ({}).".format(reason),
quantity=1,
aggregate=operator.add)
nodes_to_terminate.append(node_id)
# Nodes that we could terminate, if needed.
nodes_we_could_terminate: List[NodeID] = []
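For illustration (not part of the commit): the event_summarizer.add(...) call above aggregates repeated removals into one counted message. The toy summarizer below is not the real EventSummarizer API; it only shows how quantity and aggregate=operator.add combine messages that share a template:

import operator
from collections import defaultdict

class ToySummarizer:
    def __init__(self):
        self._counts = defaultdict(int)

    def add(self, template, quantity, aggregate=operator.add):
        # Aggregate per message template; templates keep a "{}" for the count.
        self._counts[template] = aggregate(self._counts[template], quantity)

    def summary(self):
        return [template.format(count) for template, count in self._counts.items()]

s = ToySummarizer()
s.add("Removing {} nodes of type worker (idle).", quantity=1)
s.add("Removing {} nodes of type worker (idle).", quantity=1)
assert s.summary() == ["Removing 2 nodes of type worker (idle)."]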
@@ -263,7 +281,7 @@
should_keep_or_terminate, reason = self._keep_worker_of_node_type(
node_id, node_type_counts)
if should_keep_or_terminate == KeepOrTerminate.terminate:
schedule_node_termination(node_id, reason)
self.schedule_node_termination(node_id, reason, logger.info)
continue
if ((should_keep_or_terminate == KeepOrTerminate.keep
or node_id in nodes_not_allowed_to_terminate)
@@ -273,16 +291,17 @@
node_ip = self.provider.internal_ip(node_id)
if node_ip in last_used and last_used[node_ip] < horizon:
schedule_node_termination(node_id, "idle")
self.schedule_node_termination(node_id, "idle", logger.info)
elif not self.launch_config_ok(node_id):
schedule_node_termination(node_id, "outdated")
self.schedule_node_termination(node_id, "outdated",
logger.info)
else:
keep_node(node_id)
nodes_we_could_terminate.append(node_id)
# Terminate nodes if there are too many
num_extra_nodes_to_terminate = (
len(nodes) - len(nodes_to_terminate) - self.config["max_workers"])
num_extra_nodes_to_terminate = (len(self.workers) - len(
self.nodes_to_terminate) - self.config["max_workers"])
if num_extra_nodes_to_terminate > len(nodes_we_could_terminate):
logger.warning(
@@ -299,12 +318,43 @@
extra_nodes_to_terminate = nodes_we_could_terminate[
-num_extra_nodes_to_terminate:]
for node_id in extra_nodes_to_terminate:
schedule_node_termination(node_id, "max workers")
self.schedule_node_termination(node_id, "max workers",
logger.info)
if nodes_to_terminate:
self._terminate_nodes_and_cleanup(nodes_to_terminate)
nodes = self.workers()
self.terminate_scheduled_nodes()
def schedule_node_termination(self, node_id: NodeID,
reason_opt: Optional[str],
logger_method: Callable) -> None:
if reason_opt is None:
raise Exception("reason should not be None.")
reason: str = reason_opt
node_ip = self.provider.internal_ip(node_id)
# Log, record an event, and add node_id to nodes_to_terminate.
logger_method("StandardAutoscaler: "
f"Terminating the node with id {node_id}"
f" and ip {node_ip}."
f" ({reason})")
self.event_summarizer.add(
"Removing {} nodes of type " + self._get_node_type(node_id) +
" ({}).".format(reason),
quantity=1,
aggregate=operator.add)
self.nodes_to_terminate.append(node_id)
def terminate_scheduled_nodes(self):
"""Terminate scheduled nodes and clean associated autoscaler state."""
if not self.nodes_to_terminate:
return
self.provider.terminate_nodes(self.nodes_to_terminate)
for node in self.nodes_to_terminate:
self.node_tracker.untrack(node)
self.prom_metrics.stopped_nodes.inc()
self.nodes_to_terminate = []
self.update_worker_list()
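For illustration (not part of the commit): the two methods above form a schedule-then-flush pattern, where schedule_node_termination logs the node's id and ip and queues it, and terminate_scheduled_nodes terminates the queued batch in one provider call. A standalone sketch under simplified assumptions; StubProvider and TerminationQueue are invented names, and the real methods also record an event, untrack the node, and bump Prometheus counters:

from typing import Dict, List

class StubProvider:
    def __init__(self, ips: Dict[str, str]):
        self._ips = ips

    def internal_ip(self, node_id: str) -> str:
        return self._ips[node_id]

    def terminate_nodes(self, node_ids: List[str]) -> None:
        print(f"terminating batch: {node_ids}")

class TerminationQueue:
    def __init__(self, provider: StubProvider):
        self.provider = provider
        self.nodes_to_terminate: List[str] = []

    def schedule(self, node_id: str, reason: str, log=print) -> None:
        # Log both the node id and its ip, as schedule_node_termination now does.
        ip = self.provider.internal_ip(node_id)
        log(f"Terminating the node with id {node_id} and ip {ip}. ({reason})")
        self.nodes_to_terminate.append(node_id)

    def flush(self) -> None:
        # Terminate every scheduled node in one provider call, then reset.
        if not self.nodes_to_terminate:
            return
        self.provider.terminate_nodes(self.nodes_to_terminate)
        self.nodes_to_terminate = []

queue = TerminationQueue(StubProvider({"n1": "10.0.0.1", "n2": "10.0.0.2"}))
queue.schedule("n1", "idle")
queue.schedule("n2", "outdated")
queue.flush()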
def launch_required_nodes(self):
to_launch = self.resource_demand_scheduler.get_nodes_to_launch(
self.provider.non_terminated_nodes(tag_filters={}),
self.pending_launches.breakdown(),
@@ -313,13 +363,38 @@
self.load_metrics.get_pending_placement_groups(),
self.load_metrics.get_static_node_resources_by_ip(),
ensure_min_cluster_size=self.load_metrics.get_resource_requests())
for node_type, count in to_launch.items():
self.launch_new_node(count, node_type=node_type)
if to_launch:
nodes = self.workers()
for node_type, count in to_launch.items():
self.launch_new_node(count, node_type=node_type)
self.update_worker_list()
# Process any completed updates
def update_nodes(self):
"""Run NodeUpdaterThreads to run setup commands, sync files,
and/or start Ray.
"""
# Update nodes with out-of-date files.
# TODO(edoakes): Spawning these threads directly seems to cause
# problems. They should at a minimum be spawned as daemon threads.
# See https://github.com/ray-project/ray/pull/5903 for more info.
T = []
for node_id, setup_commands, ray_start_commands, docker_config in (
self.should_update(node_id) for node_id in self.workers):
if node_id is not None:
resources = self._node_resources(node_id)
logger.debug(f"{node_id}: Starting new thread runner.")
T.append(
threading.Thread(
target=self.spawn_updater,
args=(node_id, setup_commands, ray_start_commands,
resources, docker_config)))
for t in T:
t.start()
for t in T:
t.join()
def process_completed_updates(self):
"""Clean up completed NodeUpdaterThreads.
"""
completed_nodes = []
for node_id, updater in self.updaters.items():
if not updater.is_alive():
@@ -350,76 +425,32 @@
del self.updaters[node_id]
if failed_nodes:
# Some nodes in failed_nodes may have been terminated
# Some nodes in failed_nodes may already have been terminated
# during an update (for being idle after missing a heartbeat).
# Only terminate currently non-terminated nodes.
non_terminated_nodes = self.workers()
nodes_to_terminate: List[NodeID] = []
# Update the list of non-terminated workers.
self.update_worker_list()
for node_id in failed_nodes:
if node_id in non_terminated_nodes:
nodes_to_terminate.append(node_id)
logger.error(f"StandardAutoscaler: {node_id}:"
" Terminating. Failed to setup/initialize"
" node.")
self.event_summarizer.add(
"Removing {} nodes of type " +
self._get_node_type(node_id) + " (launch failed).",
quantity=1,
aggregate=operator.add)
# Check if the node has already been terminated.
if node_id in self.workers:
self.schedule_node_termination(
node_id, "launch failed", logger.error)
else:
logger.warning(f"StandardAutoscaler: {node_id}:"
" Failed to update node."
" Node has already been terminated.")
if nodes_to_terminate:
self._terminate_nodes_and_cleanup(nodes_to_terminate)
nodes = self.workers()
# Update nodes with out-of-date files.
# TODO(edoakes): Spawning these threads directly seems to cause
# problems. They should at a minimum be spawned as daemon threads.
# See https://github.com/ray-project/ray/pull/5903 for more info.
T = []
for node_id, setup_commands, ray_start_commands, docker_config in (
self.should_update(node_id) for node_id in nodes):
if node_id is not None:
resources = self._node_resources(node_id)
logger.debug(f"{node_id}: Starting new thread runner.")
T.append(
threading.Thread(
target=self.spawn_updater,
args=(node_id, setup_commands, ray_start_commands,
resources, docker_config)))
for t in T:
t.start()
for t in T:
t.join()
if self.disable_node_updaters:
# If updaters are unavailable, terminate unhealthy nodes.
nodes_to_terminate = self.get_unhealthy_nodes(nodes, now)
if nodes_to_terminate:
self._terminate_nodes_and_cleanup(nodes_to_terminate)
nodes = self.workers()
else:
# Attempt to recover unhealthy nodes
for node_id in nodes:
self.recover_if_needed(node_id, now)
self.terminate_scheduled_nodes()
def set_prometheus_updater_data(self):
"""Record total number of active NodeUpdaterThreads and how many of
these are being run to recover nodes.
"""
self.prom_metrics.updating_nodes.set(len(self.updaters))
num_recovering = 0
for updater in self.updaters.values():
if updater.for_recovery:
num_recovering += 1
self.prom_metrics.recovering_nodes.set(num_recovering)
logger.info(self.info_string())
legacy_log_info_string(self, nodes)
def _terminate_nodes_and_cleanup(self, nodes_to_terminate: List[str]):
"""Terminate specified nodes and clean associated autoscaler state."""
self.provider.terminate_nodes(nodes_to_terminate)
for node in nodes_to_terminate:
self.node_tracker.untrack(node)
self.prom_metrics.stopped_nodes.inc()
def _sort_based_on_last_used(self, nodes: List[NodeID],
last_used: Dict[str, float]) -> List[NodeID]:
@@ -719,15 +750,11 @@ class StandardAutoscaler:
return True
return False
def get_unhealthy_nodes(self, nodes: List[NodeID],
now: float) -> List[NodeID]:
"""Determine nodes for which we haven't received a heartbeat on time.
def terminate_unhealthy_nodes(self, now: float):
"""Terminated nodes for which we haven't received a heartbeat on time.
These nodes are subsequently terminated.
Used when node updaters are not available for recovery.
"""
nodes_to_terminate = []
for node_id in nodes:
for node_id in self.workers:
node_status = self.provider.node_tags(node_id)[TAG_RAY_NODE_STATUS]
# We're not responsible for taking down
# nodes with pending or failed status:
@@ -742,18 +769,13 @@
# Heartbeat indicates node is healthy:
if self.heartbeat_on_time(node_id, now):
continue
# Node is unhealthy, terminate:
logger.warning("StandardAutoscaler: "
"{}: No recent heartbeat, "
"terminating node.".format(node_id))
self.event_summarizer.add(
"Terminating {} nodes of type " + self._get_node_type(node_id)
+ " (lost contact with raylet).",
quantity=1,
aggregate=operator.add)
nodes_to_terminate.append(node_id)
self.schedule_node_termination(node_id, "lost contact with raylet",
logger.warning)
self.terminate_scheduled_nodes()
return nodes_to_terminate
def attempt_to_recover_unhealthy_nodes(self, now):
for node_id in self.workers:
self.recover_if_needed(node_id, now)
def recover_if_needed(self, node_id, now):
if not self.can_update(node_id):
@@ -913,16 +935,17 @@
node_type))
count -= self.max_launch_batch
@property
def all_workers(self):
return self.workers() + self.unmanaged_workers()
return self.workers + self.unmanaged_workers
def workers(self):
nodes = self.provider.non_terminated_nodes(
def update_worker_list(self):
self.workers = self.provider.non_terminated_nodes(
tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER})
# Update running nodes gauge whenever we check workers
self.prom_metrics.running_workers.set(len(nodes))
return nodes
self.prom_metrics.running_workers.set(len(self.workers))
@property
def unmanaged_workers(self):
return self.provider.non_terminated_nodes(
tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_UNMANAGED})
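For illustration (not part of the commit): workers is now a cached attribute refreshed by update_worker_list() rather than a method that queries the provider on each call, and all_workers/unmanaged_workers are properties. A minimal sketch assuming a simplified provider; the tag names "kind", "worker", and "unmanaged" are stand-ins for the real TAG_RAY_NODE_KIND constants:

class StubProvider:
    def __init__(self, workers, unmanaged):
        self._nodes = {"worker": list(workers), "unmanaged": list(unmanaged)}

    def non_terminated_nodes(self, tag_filters):
        return list(self._nodes[tag_filters["kind"]])

class MiniAutoscaler:
    def __init__(self, provider):
        self.provider = provider
        self.workers = []  # refreshed explicitly, not on every access

    def update_worker_list(self):
        self.workers = self.provider.non_terminated_nodes(
            tag_filters={"kind": "worker"})

    @property
    def unmanaged_workers(self):
        return self.provider.non_terminated_nodes(
            tag_filters={"kind": "unmanaged"})

    @property
    def all_workers(self):
        return self.workers + self.unmanaged_workers

a = MiniAutoscaler(StubProvider(["w1", "w2"], ["u1"]))
a.update_worker_list()
assert a.all_workers == ["w1", "w2", "u1"]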

@@ -1857,7 +1857,8 @@ class AutoscalingTest(unittest.TestCase):
assert "empty_node" not in event
node_type_counts = defaultdict(int)
for node_id in autoscaler.workers():
autoscaler.update_worker_list()
for node_id in autoscaler.workers:
tags = self.provider.node_tags(node_id)
if TAG_RAY_USER_NODE_TYPE in tags:
node_type = tags[TAG_RAY_USER_NODE_TYPE]
@@ -2115,7 +2116,7 @@ class AutoscalingTest(unittest.TestCase):
# Check the node removal event is generated.
autoscaler.update()
events = autoscaler.event_summarizer.summary()
assert ("Terminating 1 nodes of type "
assert ("Removing 1 nodes of type "
"ray-legacy-worker-node-type (lost contact with raylet)." in
events), events
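For illustration (not part of the commit): the expected text changes from "Terminating ..." to "Removing ..." because the unhealthy-node path now goes through schedule_node_termination, which uses the shared removal template. A quick sanity check of the resulting string, with values copied from the assertion above:

node_type = "ray-legacy-worker-node-type"
reason = "lost contact with raylet"
# Same construction as in schedule_node_termination: one "{}" is left for the
# aggregated count, and the reason is formatted in immediately.
template = "Removing {} nodes of type " + node_type + " ({}).".format(reason)
assert template.format(1) == ("Removing 1 nodes of type "
                              "ray-legacy-worker-node-type "
                              "(lost contact with raylet).")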
@@ -2820,7 +2821,8 @@ MemAvailable: 33000000 kB
# Node 0 was terminated during the last update.
# Node 1's updater failed, but node 1 won't be terminated until the
# next autoscaler update.
assert 0 not in autoscaler.workers(), "Node zero still non-terminated."
autoscaler.update_worker_list()
assert 0 not in autoscaler.workers, "Node zero still non-terminated."
assert not self.provider.is_terminated(1),\
"Node one terminated prematurely."
@@ -2848,7 +2850,8 @@ MemAvailable: 33000000 kB
# Should get two new nodes after the next update.
autoscaler.update()
self.waitForNodes(2)
assert set(autoscaler.workers()) == {2, 3},\
autoscaler.update_worker_list()
assert set(autoscaler.workers) == {2, 3},\
"Unexpected node_ids"
assert mock_metrics.stopped_nodes.inc.call_count == 1
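For illustration (not part of the commit): stopped_nodes is incremented once per node inside terminate_scheduled_nodes, which is what the call_count assertion above relies on. A self-contained sketch of that assertion style, using unittest.mock purely as a stand-in for the test's mocked metrics:

from unittest.mock import MagicMock

stopped_nodes = MagicMock()
for _ in ["node-1"]:      # one node scheduled and then terminated
    stopped_nodes.inc()   # incremented once per terminated node
assert stopped_nodes.inc.call_count == 1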