[autoscaler] Event summarizer reports launch failure (#20814)

Partly addresses #20774 by reporting node launcher failures to the driver logs via the event summarizer, so users can tell from the driver logs that a launch failed.

Also pushes the node creation exception's traceback to the driver logs, but only once per minute to avoid spamming them.
Dmitri Gekhtman 2021-12-07 16:23:45 -08:00 committed by GitHub
parent ea1d081aac
commit 94883f61b1
6 changed files with 84 additions and 25 deletions
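
As a rough sketch of the throttling described above (an editor's illustration, not part of the diff; the message text and key below are placeholders), repeated reports under the same key collapse into a single summary entry until the interval expires:

from ray.autoscaler._private.event_summarizer import EventSummarizer

summarizer = EventSummarizer()

# The first report within the interval is recorded for the next summary.
summarizer.add_once_per_interval(
    message="Node creation failed.\n<traceback here>",
    key="Failed to create node.",
    interval_s=60)

# A repeated report with the same key is dropped until the interval expires.
summarizer.add_once_per_interval(
    message="Node creation failed.\n<traceback here>",
    key="Failed to create node.",
    interval_s=60)

assert len(summarizer.summary()) == 1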


@@ -235,7 +235,8 @@ class StandardAutoscaler:
index=i,
pending=self.pending_launches,
node_types=self.available_node_types,
prom_metrics=self.prom_metrics)
prom_metrics=self.prom_metrics,
event_summarizer=self.event_summarizer)
node_launcher.daemon = True
node_launcher.start()


@@ -1,5 +1,6 @@
from typing import Any, Callable, Dict, List
import time
from threading import RLock
class EventSummarizer:
@@ -13,6 +14,10 @@ class EventSummarizer:
# added here, until its TTL expires.
self.throttled_messages: Dict[str, float] = {}
# Event summarizer is used by the main thread and
# by node launcher child threads.
self.lock = RLock()
def add(self, template: str, *, quantity: Any,
aggregate: Callable[[Any, Any], Any]) -> None:
"""Add a log message, which will be combined by template.
@@ -24,14 +29,15 @@
quantities. The result is inserted into the template to
produce the final log message.
"""
# Enforce proper sentence structure.
if not template.endswith("."):
template += "."
if template in self.events_by_key:
self.events_by_key[template] = aggregate(
self.events_by_key[template], quantity)
else:
self.events_by_key[template] = quantity
with self.lock:
# Enforce proper sentence structure.
if not template.endswith("."):
template += "."
if template in self.events_by_key:
self.events_by_key[template] = aggregate(
self.events_by_key[template], quantity)
else:
self.events_by_key[template] = quantity
def add_once_per_interval(self, message: str, key: str, interval_s: int):
"""Add a log message, which is throttled once per interval by a key.
@@ -41,24 +47,27 @@
key (str): The key to use to deduplicate the message.
interval_s (int): Throttling interval in seconds.
"""
if key not in self.throttled_messages:
self.throttled_messages[key] = time.time() + interval_s
self.messages_to_send.append(message)
with self.lock:
if key not in self.throttled_messages:
self.throttled_messages[key] = time.time() + interval_s
self.messages_to_send.append(message)
def summary(self) -> List[str]:
"""Generate the aggregated log summary of all added events."""
out = []
for template, quantity in self.events_by_key.items():
out.append(template.format(quantity))
out.extend(self.messages_to_send)
with self.lock:
out = []
for template, quantity in self.events_by_key.items():
out.append(template.format(quantity))
out.extend(self.messages_to_send)
return out
def clear(self) -> None:
"""Clear the events added."""
self.events_by_key.clear()
self.messages_to_send.clear()
# Expire any messages that have reached their TTL. This allows them
# to be sent again.
for k, t in list(self.throttled_messages.items()):
if time.time() > t:
del self.throttled_messages[k]
with self.lock:
self.events_by_key.clear()
self.messages_to_send.clear()
# Expire any messages that have reached their TTL. This allows them
# to be sent again.
for k, t in list(self.throttled_messages.items()):
if time.time() > t:
del self.throttled_messages[k]
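
Because the summarizer is now shared between the monitor's main thread and the node launcher threads, add() folds concurrent reports for the same template into a single line under the lock. A minimal sketch of that behavior (editor's illustration; the node type name is a placeholder):

import operator

from ray.autoscaler._private.event_summarizer import EventSummarizer

summarizer = EventSummarizer()

# Two launcher threads each report a failed launch of the same node type;
# the quantities are combined with operator.add.
summarizer.add("Failed to launch {} nodes of type worker.",
               quantity=2, aggregate=operator.add)
summarizer.add("Failed to launch {} nodes of type worker.",
               quantity=1, aggregate=operator.add)

assert summarizer.summary() == ["Failed to launch 3 nodes of type worker."]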


@@ -327,8 +327,11 @@ class Monitor:
status["autoscaler_report"] = asdict(self.autoscaler.summary())
for msg in self.event_summarizer.summary():
logger.info("{}{}".format(
ray_constants.LOG_PREFIX_EVENT_SUMMARY, msg))
# Need to prefix each line of the message for the lines to
# get pushed to the driver logs.
for line in msg.split("\n"):
logger.info("{}{}".format(
ray_constants.LOG_PREFIX_EVENT_SUMMARY, line))
self.event_summarizer.clear()
as_json = json.dumps(status)
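
The per-line prefixing above matters because the throttled message now contains a multi-line traceback, and only prefixed lines are forwarded to the driver logs. A standalone sketch of the idea (editor's illustration; the prefix value is a placeholder for ray_constants.LOG_PREFIX_EVENT_SUMMARY):

from typing import List

LOG_PREFIX_EVENT_SUMMARY = ":event_summary:"  # placeholder value for illustration


def prefix_for_driver(msg: str) -> List[str]:
    # Every line needs the prefix; otherwise lines after the first
    # (e.g. traceback frames) would not reach the driver logs.
    return ["{}{}".format(LOG_PREFIX_EVENT_SUMMARY, line)
            for line in msg.split("\n")]


for line in prefix_for_driver("Node creation failed.\nTraceback (most recent call last): ..."):
    print(line)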


@@ -1,7 +1,9 @@
from typing import Any, Optional, Dict
import copy
import logging
import operator
import threading
import traceback
import time
from ray.autoscaler.tags import (TAG_RAY_LAUNCH_CONFIG, TAG_RAY_NODE_STATUS,
@@ -21,6 +23,7 @@ class NodeLauncher(threading.Thread):
provider,
queue,
pending,
event_summarizer,
prom_metrics=None,
node_types=None,
index=None,
@@ -32,6 +35,7 @@
self.provider = provider
self.node_types = node_types
self.index = str(index) if index is not None else ""
self.event_summarizer = event_summarizer
super(NodeLauncher, self).__init__(*args, **kwargs)
def _launch_node(self, config: Dict[str, Any], count: int,
@@ -84,6 +88,18 @@
except Exception:
self.prom_metrics.node_launch_exceptions.inc()
self.prom_metrics.failed_create_nodes.inc(count)
self.event_summarizer.add(
"Failed to launch {} nodes of type " + node_type + ".",
quantity=count,
aggregate=operator.add)
# Log traceback from failed node creation only once per minute
# to avoid spamming driver logs with tracebacks.
self.event_summarizer.add_once_per_interval(
message="Node creation failed. See the traceback below."
" See autoscaler logs for further details.\n"
f"{traceback.format_exc()}",
key="Failed to create node.",
interval_s=60)
logger.exception("Launch failed")
finally:
self.pending.dec(node_type, count)


@@ -1259,6 +1259,34 @@ class AutoscalingTest(unittest.TestCase):
runner.assert_has_call("172.0.0.4", pattern="rsync")
runner.clear_history()
def testSummarizerFailedCreate(self):
"""Checks that event summarizer reports failed node creation.
"""
config = copy.deepcopy(SMALL_CLUSTER)
config_path = self.write_config(config)
self.provider = MockProvider()
self.provider.error_creates = True
runner = MockProcessRunner()
mock_metrics = Mock(spec=AutoscalerPrometheusMetrics())
autoscaler = MockAutoscaler(
config_path,
LoadMetrics(),
MockNodeInfoStub(),
max_failures=0,
process_runner=runner,
update_interval_s=0,
prom_metrics=mock_metrics)
assert len(self.provider.non_terminated_nodes({})) == 0
autoscaler.update()
# Expect the following message in the logs.
msg = "Failed to launch 2 nodes of type ray-legacy-worker-node-type."
def expected_message_logged():
return msg in autoscaler.event_summarizer.summary()
self.waitFor(expected_message_logged)
def testReadonlyNodeProvider(self):
config = copy.deepcopy(SMALL_CLUSTER)
config_path = self.write_config(config)


@@ -14,6 +14,7 @@ from ray.tests.test_autoscaler import (MockProvider, MockProcessRunner,
MockNodeInfoStub, mock_raylet_id,
MockAutoscaler)
from ray.tests.test_resource_demand_scheduler import MULTI_WORKER_CLUSTER
from ray.autoscaler._private.event_summarizer import EventSummarizer
from ray.autoscaler._private.providers import (
_NODE_PROVIDERS,
_clear_provider_cache,
@@ -195,6 +196,7 @@ class Simulator:
# Manually create a node launcher. Note that we won't start it as a
# separate thread.
self.node_launcher = NodeLauncher(
event_summarizer=EventSummarizer(),
provider=self.autoscaler.provider,
queue=self.autoscaler.launch_queue,
index=0,