From c9cf912a15557867187a57d677d8ec39e8be0201 Mon Sep 17 00:00:00 2001
From: Dmitri Gekhtman <62982571+DmitriGekhtman@users.noreply.github.com>
Date: Tue, 21 Dec 2021 01:23:17 -0500
Subject: [PATCH] [autoscaler] Pass on provider.internal_ip() exceptions
 during scale down (#21204)

Treats failures of provider.internal_ip during node drain as non-fatal.
For example, if a node is deleted by a third party between the time it's
scheduled for termination and the time it's drained, there will now be no
error on GCP.

Closes #21151
---
 python/ray/autoscaler/_private/autoscaler.py | 21 +++++++++--
 python/ray/tests/test_autoscaler.py          | 39 ++++++++++++++++++--
 2 files changed, 53 insertions(+), 7 deletions(-)

diff --git a/python/ray/autoscaler/_private/autoscaler.py b/python/ray/autoscaler/_private/autoscaler.py
index fe977e58e..523c1d86e 100644
--- a/python/ray/autoscaler/_private/autoscaler.py
+++ b/python/ray/autoscaler/_private/autoscaler.py
@@ -480,10 +480,20 @@ class StandardAutoscaler:
         # node provider node id -> ip -> raylet id

         # Convert node provider node ids to ips.
-        node_ips = {
-            self.provider.internal_ip(provider_node_id)
-            for provider_node_id in provider_node_ids_to_drain
-        }
+        node_ips = set()
+        failed_ip_fetch = False
+        for provider_node_id in provider_node_ids_to_drain:
+            # If the provider's call to fetch ip fails, the exception is not
+            # fatal. Log the exception and proceed.
+            try:
+                ip = self.provider.internal_ip(provider_node_id)
+                node_ips.add(ip)
+            except Exception:
+                logger.exception("Failed to get ip of node with id"
+                                 f" {provider_node_id} during scale-down.")
+                failed_ip_fetch = True
+        if failed_ip_fetch:
+            self.prom_metrics.drain_node_exceptions.inc()

         # Only attempt to drain connected nodes, i.e. nodes with ips in
         # LoadMetrics.
@@ -498,6 +508,9 @@ class StandardAutoscaler:
             for ip in connected_node_ips
         }

+        if not raylet_ids_to_drain:
+            return
+
         logger.info(f"Draining {len(raylet_ids_to_drain)} raylet(s).")
         try:
             request = gcs_service_pb2.DrainNodeRequest(drain_node_data=[
diff --git a/python/ray/tests/test_autoscaler.py b/python/ray/tests/test_autoscaler.py
index cfbc712d5..165527690 100644
--- a/python/ray/tests/test_autoscaler.py
+++ b/python/ray/tests/test_autoscaler.py
@@ -54,6 +54,8 @@ class DrainNodeOutcome(str, Enum):
     GenericRpcError = "GenericRpcError"
     # Raise a generic unexpected exception.
     GenericException = "GenericException"
+    # Tell the autoscaler to fail finding ips during drain.
+    FailedToFindIp = "FailedToFindIp"


 class MockRpcException(grpc.RpcError):
@@ -115,7 +117,9 @@ class MockNodeInfoStub():
         # All but the last.
         not_all_drained_status = all_nodes_drained_status[:-1]

-        if self.drain_node_outcome == DrainNodeOutcome.Succeeded:
+        if self.drain_node_outcome in [
+                DrainNodeOutcome.Succeeded, DrainNodeOutcome.FailedToFindIp
+        ]:
             drain_node_status = all_nodes_drained_status
         elif self.drain_node_outcome == DrainNodeOutcome.NotAllDrained:
             drain_node_status = not_all_drained_status
@@ -294,6 +298,7 @@ class MockProvider(NodeProvider):
         self.ready_to_create.set()
         self.cache_stopped = cache_stopped
         self.unique_ips = unique_ips
+        self.fail_to_fetch_ip = False
         # Many of these functions are called by node_launcher or updater in
         # different threads. This can be treated as a global lock for
         # everything.
@@ -345,6 +350,8 @@ class MockProvider(NodeProvider):
             return self.mock_nodes[node_id].tags

     def internal_ip(self, node_id):
+        if self.fail_to_fetch_ip:
+            raise Exception("Failed to fetch ip on purpose.")
         if node_id is None:
             # Circumvent test-cases where there's no head node.
return "mock" @@ -401,6 +408,10 @@ class MockAutoscaler(StandardAutoscaler): autoscaler update issues at most one provider.non_terminated_nodes call. """ + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.fail_to_find_ip_during_drain = False + def _update(self): # Only works with MockProvider assert isinstance(self.provider, MockProvider) @@ -412,6 +423,12 @@ class MockAutoscaler(StandardAutoscaler): # interval `self.update_interval_s` assert end_calls <= start_calls + 1 + def drain_nodes_via_gcs(self, provider_node_ids_to_drain): + if self.fail_to_find_ip_during_drain: + self.provider.fail_to_fetch_ip = True + super().drain_nodes_via_gcs(provider_node_ids_to_drain) + self.provider.fail_to_fetch_ip = False + SMALL_CLUSTER = { "cluster_name": "default", @@ -1443,6 +1460,9 @@ class AutoscalingTest(unittest.TestCase): def testDynamicScaling5(self): self.helperDynamicScaling(DrainNodeOutcome.GenericException) + def testDynamicScaling6(self): + self.helperDynamicScaling(DrainNodeOutcome.FailedToFindIp) + def helperDynamicScaling(self, drain_node_outcome: DrainNodeOutcome): mock_metrics = Mock(spec=AutoscalerPrometheusMetrics()) mock_node_info_stub = MockNodeInfoStub(drain_node_outcome) @@ -1452,21 +1472,25 @@ class AutoscalingTest(unittest.TestCase): # Make assertions about DrainNode error handling during scale-down. - # DrainNode call was made. - assert mock_node_info_stub.drain_node_call_count > 0 if drain_node_outcome == DrainNodeOutcome.Succeeded: + # DrainNode call was made. + mock_node_info_stub.drain_node_call_count > 0 # No drain node exceptions. assert mock_metrics.drain_node_exceptions.inc.call_count == 0 # Each drain node call succeeded. assert (mock_node_info_stub.drain_node_reply_success == mock_node_info_stub.drain_node_call_count) elif drain_node_outcome == DrainNodeOutcome.Unimplemented: + # DrainNode call was made. + mock_node_info_stub.drain_node_call_count > 0 # All errors were supressed. assert mock_metrics.drain_node_exceptions.inc.call_count == 0 # Every call failed. assert mock_node_info_stub.drain_node_reply_success == 0 elif drain_node_outcome in (DrainNodeOutcome.GenericRpcError, DrainNodeOutcome.GenericException): + # DrainNode call was made. + mock_node_info_stub.drain_node_call_count > 0 # We encountered an exception. assert mock_metrics.drain_node_exceptions.inc.call_count > 0 @@ -1474,6 +1498,12 @@ class AutoscalingTest(unittest.TestCase): assert (mock_metrics.drain_node_exceptions.inc.call_count == mock_node_info_stub.drain_node_call_count) assert mock_node_info_stub.drain_node_reply_success == 0 + elif drain_node_outcome == DrainNodeOutcome.FailedToFindIp: + # We never called the drain node api because we were unable to + # fetch ips + assert mock_node_info_stub.drain_node_call_count == 0 + # We encountered an exception fetching ip. + assert mock_metrics.drain_node_exceptions.inc.call_count > 0 def _helperDynamicScaling(self, mock_metrics, mock_node_info_stub): config_path = self.write_config(SMALL_CLUSTER) @@ -1497,6 +1527,9 @@ class AutoscalingTest(unittest.TestCase): process_runner=runner, update_interval_s=0, prom_metrics=mock_metrics) + if (mock_node_info_stub.drain_node_outcome == + DrainNodeOutcome.FailedToFindIp): + autoscaler.fail_to_find_ip_during_drain = True self.waitForNodes(0, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER}) autoscaler.update() self.waitForNodes(2, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER})