[autoscaler] Pass on provider.internal_ip() exceptions during scale down (#21204)

Treats failures of provider.internal_ip() during node drain as non-fatal.
For example, if a node is deleted by a third party between the time it is scheduled for termination and the time it is drained, GCP no longer surfaces an error.

Closes #21151
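
In effect, the ip lookup during scale-down becomes best-effort: each failure is logged and counted, and the drain proceeds with the nodes whose ips did resolve. A minimal sketch of the pattern, outside Ray's classes (the collect_node_ips helper and its provider parameter are illustrative names, not from the codebase):

import logging

logger = logging.getLogger(__name__)


def collect_node_ips(provider, node_ids):
    """Best-effort ip lookup: skip nodes whose ip can no longer be fetched."""
    node_ips = set()
    failed_ip_fetch = False
    for node_id in node_ids:
        try:
            node_ips.add(provider.internal_ip(node_id))
        except Exception:
            # E.g. the node was deleted by a third party after being
            # scheduled for termination. Log and keep going.
            logger.exception(f"Failed to get ip of node {node_id}.")
            failed_ip_fetch = True
    return node_ips, failed_ip_fetch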
Author: Dmitri Gekhtman
Date: 2021-12-21 01:23:17 -05:00
Commit: c9cf912a15 (parent: d1a27487a3)
2 changed files with 53 additions and 7 deletions

File 1 of 2: autoscaler implementation (StandardAutoscaler)

@@ -480,10 +480,20 @@ class StandardAutoscaler:
         # node provider node id -> ip -> raylet id
         # Convert node provider node ids to ips.
-        node_ips = {
-            self.provider.internal_ip(provider_node_id)
-            for provider_node_id in provider_node_ids_to_drain
-        }
+        node_ips = set()
+        failed_ip_fetch = False
+        for provider_node_id in provider_node_ids_to_drain:
+            # If the provider's call to fetch ip fails, the exception is not
+            # fatal. Log the exception and proceed.
+            try:
+                ip = self.provider.internal_ip(provider_node_id)
+                node_ips.add(ip)
+            except Exception:
+                logger.exception("Failed to get ip of node with id"
+                                 f" {provider_node_id} during scale-down.")
+                failed_ip_fetch = True
+        if failed_ip_fetch:
+            self.prom_metrics.drain_node_exceptions.inc()
         # Only attempt to drain connected nodes, i.e. nodes with ips in
         # LoadMetrics.
@@ -498,6 +508,9 @@ class StandardAutoscaler:
             for ip in connected_node_ips
         }
+        if not raylet_ids_to_drain:
+            return
+        logger.info(f"Draining {len(raylet_ids_to_drain)} raylet(s).")
         try:
             request = gcs_service_pb2.DrainNodeRequest(drain_node_data=[
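
Two details worth noting: drain_node_exceptions is incremented at most once per drain attempt regardless of how many ip lookups fail, and the new early return skips the DrainNode RPC entirely when no raylet ids resolve. The test changes below assert exactly this behavior for the FailedToFindIp outcome.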

File 2 of 2: autoscaler tests (mocks and AutoscalingTest)

@@ -54,6 +54,8 @@ class DrainNodeOutcome(str, Enum):
     GenericRpcError = "GenericRpcError"
     # Raise a generic unexpected exception.
     GenericException = "GenericException"
+    # Tell the autoscaler to fail finding ips during drain.
+    FailedToFindIp = "FailedToFindIp"


 class MockRpcException(grpc.RpcError):
@@ -115,7 +117,9 @@ class MockNodeInfoStub():
         # All but the last.
         not_all_drained_status = all_nodes_drained_status[:-1]
-        if self.drain_node_outcome == DrainNodeOutcome.Succeeded:
+        if self.drain_node_outcome in [
+                DrainNodeOutcome.Succeeded, DrainNodeOutcome.FailedToFindIp
+        ]:
             drain_node_status = all_nodes_drained_status
         elif self.drain_node_outcome == DrainNodeOutcome.NotAllDrained:
             drain_node_status = not_all_drained_status
@@ -294,6 +298,7 @@ class MockProvider(NodeProvider):
         self.ready_to_create.set()
         self.cache_stopped = cache_stopped
         self.unique_ips = unique_ips
+        self.fail_to_fetch_ip = False
         # Many of these functions are called by node_launcher or updater in
         # different threads. This can be treated as a global lock for
         # everything.
@@ -345,6 +350,8 @@ class MockProvider(NodeProvider):
             return self.mock_nodes[node_id].tags

     def internal_ip(self, node_id):
+        if self.fail_to_fetch_ip:
+            raise Exception("Failed to fetch ip on purpose.")
         if node_id is None:
             # Circumvent test-cases where there's no head node.
             return "mock"
@@ -401,6 +408,10 @@ class MockAutoscaler(StandardAutoscaler):
     autoscaler update issues at most one provider.non_terminated_nodes call.
     """

+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        self.fail_to_find_ip_during_drain = False
+
     def _update(self):
         # Only works with MockProvider
         assert isinstance(self.provider, MockProvider)
@@ -412,6 +423,12 @@ class MockAutoscaler(StandardAutoscaler):
         # interval `self.update_interval_s`
         assert end_calls <= start_calls + 1

+    def drain_nodes_via_gcs(self, provider_node_ids_to_drain):
+        if self.fail_to_find_ip_during_drain:
+            self.provider.fail_to_fetch_ip = True
+        super().drain_nodes_via_gcs(provider_node_ids_to_drain)
+        self.provider.fail_to_fetch_ip = False
+

 SMALL_CLUSTER = {
     "cluster_name": "default",
@@ -1443,6 +1460,9 @@ class AutoscalingTest(unittest.TestCase):
     def testDynamicScaling5(self):
         self.helperDynamicScaling(DrainNodeOutcome.GenericException)

+    def testDynamicScaling6(self):
+        self.helperDynamicScaling(DrainNodeOutcome.FailedToFindIp)
+
     def helperDynamicScaling(self, drain_node_outcome: DrainNodeOutcome):
         mock_metrics = Mock(spec=AutoscalerPrometheusMetrics())
         mock_node_info_stub = MockNodeInfoStub(drain_node_outcome)
@@ -1452,21 +1472,25 @@ class AutoscalingTest(unittest.TestCase):
         # Make assertions about DrainNode error handling during scale-down.
-        # DrainNode call was made.
-        assert mock_node_info_stub.drain_node_call_count > 0
         if drain_node_outcome == DrainNodeOutcome.Succeeded:
+            # DrainNode call was made.
+            assert mock_node_info_stub.drain_node_call_count > 0
             # No drain node exceptions.
             assert mock_metrics.drain_node_exceptions.inc.call_count == 0
             # Each drain node call succeeded.
             assert (mock_node_info_stub.drain_node_reply_success ==
                     mock_node_info_stub.drain_node_call_count)
         elif drain_node_outcome == DrainNodeOutcome.Unimplemented:
+            # DrainNode call was made.
+            assert mock_node_info_stub.drain_node_call_count > 0
             # All errors were suppressed.
             assert mock_metrics.drain_node_exceptions.inc.call_count == 0
             # Every call failed.
             assert mock_node_info_stub.drain_node_reply_success == 0
         elif drain_node_outcome in (DrainNodeOutcome.GenericRpcError,
                                     DrainNodeOutcome.GenericException):
+            # DrainNode call was made.
+            assert mock_node_info_stub.drain_node_call_count > 0
             # We encountered an exception.
             assert mock_metrics.drain_node_exceptions.inc.call_count > 0
@@ -1474,6 +1498,12 @@ class AutoscalingTest(unittest.TestCase):
             assert (mock_metrics.drain_node_exceptions.inc.call_count ==
                     mock_node_info_stub.drain_node_call_count)
             assert mock_node_info_stub.drain_node_reply_success == 0
+        elif drain_node_outcome == DrainNodeOutcome.FailedToFindIp:
+            # We never called the drain node API because we were unable to
+            # fetch ips.
+            assert mock_node_info_stub.drain_node_call_count == 0
+            # We encountered an exception fetching ip.
+            assert mock_metrics.drain_node_exceptions.inc.call_count > 0

     def _helperDynamicScaling(self, mock_metrics, mock_node_info_stub):
         config_path = self.write_config(SMALL_CLUSTER)
@@ -1497,6 +1527,9 @@ class AutoscalingTest(unittest.TestCase):
             process_runner=runner,
             update_interval_s=0,
             prom_metrics=mock_metrics)
+        if (mock_node_info_stub.drain_node_outcome ==
+                DrainNodeOutcome.FailedToFindIp):
+            autoscaler.fail_to_find_ip_during_drain = True
         self.waitForNodes(0, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER})
         autoscaler.update()
         self.waitForNodes(2, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER})
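
Assuming the standard Ray test layout (the changed file names are not shown in this view), the new case can be run on its own with pytest -k testDynamicScaling6 against the autoscaler test module.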