From b2b11316cd0be45857b157d39ab4a89ffaf059b3 Mon Sep 17 00:00:00 2001
From: Dmitri Gekhtman <62982571+DmitriGekhtman@users.noreply.github.com>
Date: Mon, 25 Jul 2022 23:35:01 -0700
Subject: [PATCH] [autoscaler][kuberay] Disable autoscaler health check and
 drain functionality (#26764)

Signed-off-by: Dmitri Gekhtman

For KubeRay, this change:
- Disables the autoscaler's RPC drain of worker nodes prior to termination.
- Disables the autoscaler's termination of nodes disconnected from the GCS.
---
 python/ray/autoscaler/_private/autoscaler.py  |  38 +++++-
 python/ray/autoscaler/_private/constants.py   |   2 +
 .../_private/kuberay/autoscaling_config.py    |   4 +
 .../_private/kuberay/node_provider.py         |   8 ++
 .../tests/kuberay/test_autoscaling_config.py  |   4 +-
 python/ray/tests/test_autoscaler.py           | 129 ++++++++++++++----
 6 files changed, 154 insertions(+), 31 deletions(-)

diff --git a/python/ray/autoscaler/_private/autoscaler.py b/python/ray/autoscaler/_private/autoscaler.py
index 022e9a067..75aaf81a0 100644
--- a/python/ray/autoscaler/_private/autoscaler.py
+++ b/python/ray/autoscaler/_private/autoscaler.py
@@ -24,6 +24,8 @@ from ray.autoscaler._private.constants import (
     DISABLE_LAUNCH_CONFIG_CHECK_KEY,
     DISABLE_NODE_UPDATERS_KEY,
     FOREGROUND_NODE_LAUNCH_KEY,
+    WORKER_LIVENESS_CHECK_KEY,
+    WORKER_RPC_DRAIN_KEY,
 )
 from ray.autoscaler._private.event_summarizer import EventSummarizer
 from ray.autoscaler._private.legacy_info_string import legacy_log_info_string
@@ -263,10 +265,28 @@ class StandardAutoscaler:
         # are launched in the main thread, all in one batch, blocking until all
         # NodeProvider.create_node calls have returned.
         self.foreground_node_launch = self.config["provider"].get(
-            FOREGROUND_NODE_LAUNCH_KEY
+            FOREGROUND_NODE_LAUNCH_KEY, False
         )
         logger.info(f"{FOREGROUND_NODE_LAUNCH_KEY}:{self.foreground_node_launch}")
 
+        # By default, the autoscaler kills and/or tries to recover
+        # a worker node if it hasn't produced a resource heartbeat in the last 30
+        # seconds. The worker_liveness_check flag allows disabling this behavior in
+        # settings where another component, such as a Kubernetes operator, is
+        # responsible for healthchecks.
+        self.worker_liveness_check = self.config["provider"].get(
+            WORKER_LIVENESS_CHECK_KEY, True
+        )
+        logger.info(f"{WORKER_LIVENESS_CHECK_KEY}:{self.worker_liveness_check}")
+
+        # By default, before worker node termination, the autoscaler sends an RPC to the
+        # GCS asking to kill the worker node.
+        # The worker_rpc_drain flag allows disabling this behavior in settings where
+        # another component, such as a Kubernetes operator, is responsible for worker
+        # lifecycle.
+        self.worker_rpc_drain = self.config["provider"].get(WORKER_RPC_DRAIN_KEY, True)
+        logger.info(f"{WORKER_RPC_DRAIN_KEY}:{self.worker_rpc_drain}")
+
         # Node launchers
         self.foreground_node_launcher: Optional[BaseNodeLauncher] = None
         self.launch_queue: Optional[queue.Queue[NodeLaunchData]] = None
@@ -370,11 +390,17 @@ class StandardAutoscaler:
         self.terminate_nodes_to_enforce_config_constraints(now)
 
         if self.disable_node_updaters:
-            self.terminate_unhealthy_nodes(now)
+            # Don't handle unhealthy nodes if the liveness check is disabled.
+            # self.worker_liveness_check is True by default.
+            if self.worker_liveness_check:
+                self.terminate_unhealthy_nodes(now)
         else:
             self.process_completed_updates()
             self.update_nodes()
-            self.attempt_to_recover_unhealthy_nodes(now)
+            # Don't handle unhealthy nodes if the liveness check is disabled.
+            # self.worker_liveness_check is True by default.
+            if self.worker_liveness_check:
+                self.attempt_to_recover_unhealthy_nodes(now)
         self.set_prometheus_updater_data()
 
         # Dict[NodeType, int], List[ResourceDict]
@@ -539,8 +565,10 @@ class StandardAutoscaler:
         if not self.nodes_to_terminate:
             return
 
-        # Do Ray-internal preparation for termination
-        self.drain_nodes_via_gcs(self.nodes_to_terminate)
+        # Do Ray-internal preparation for termination, unless this behavior is
+        # explicitly disabled.
+        if self.worker_rpc_drain:
+            self.drain_nodes_via_gcs(self.nodes_to_terminate)
         # Terminate the nodes
         self.provider.terminate_nodes(self.nodes_to_terminate)
         for node in self.nodes_to_terminate:
diff --git a/python/ray/autoscaler/_private/constants.py b/python/ray/autoscaler/_private/constants.py
index 4b6bf039c..7338e035e 100644
--- a/python/ray/autoscaler/_private/constants.py
+++ b/python/ray/autoscaler/_private/constants.py
@@ -109,3 +109,5 @@ MAX_PARALLEL_SHUTDOWN_WORKERS = env_integer("MAX_PARALLEL_SHUTDOWN_WORKERS", 50)
 DISABLE_NODE_UPDATERS_KEY = "disable_node_updaters"
 DISABLE_LAUNCH_CONFIG_CHECK_KEY = "disable_launch_config_check"
 FOREGROUND_NODE_LAUNCH_KEY = "foreground_node_launch"
+WORKER_LIVENESS_CHECK_KEY = "worker_liveness_check"
+WORKER_RPC_DRAIN_KEY = "worker_rpc_drain"
diff --git a/python/ray/autoscaler/_private/kuberay/autoscaling_config.py b/python/ray/autoscaler/_private/kuberay/autoscaling_config.py
index 8a3af3a52..6b44aa4cf 100644
--- a/python/ray/autoscaler/_private/kuberay/autoscaling_config.py
+++ b/python/ray/autoscaler/_private/kuberay/autoscaling_config.py
@@ -11,6 +11,8 @@ from ray.autoscaler._private.constants import (
     DISABLE_LAUNCH_CONFIG_CHECK_KEY,
     DISABLE_NODE_UPDATERS_KEY,
     FOREGROUND_NODE_LAUNCH_KEY,
+    WORKER_LIVENESS_CHECK_KEY,
+    WORKER_RPC_DRAIN_KEY,
 )
 from ray.autoscaler._private.kuberay import node_provider
 from ray.autoscaler._private.util import validate_config
@@ -145,6 +147,8 @@ def _generate_provider_config(ray_cluster_namespace: str) -> Dict[str, Any]:
         DISABLE_NODE_UPDATERS_KEY: True,
         DISABLE_LAUNCH_CONFIG_CHECK_KEY: True,
         FOREGROUND_NODE_LAUNCH_KEY: True,
+        WORKER_LIVENESS_CHECK_KEY: False,
+        WORKER_RPC_DRAIN_KEY: False,
     }
 
 
diff --git a/python/ray/autoscaler/_private/kuberay/node_provider.py b/python/ray/autoscaler/_private/kuberay/node_provider.py
index 7048f633f..35bc9d9e9 100644
--- a/python/ray/autoscaler/_private/kuberay/node_provider.py
+++ b/python/ray/autoscaler/_private/kuberay/node_provider.py
@@ -8,6 +8,8 @@ from ray.autoscaler._private.constants import (
     DISABLE_LAUNCH_CONFIG_CHECK_KEY,
     DISABLE_NODE_UPDATERS_KEY,
     FOREGROUND_NODE_LAUNCH_KEY,
+    WORKER_LIVENESS_CHECK_KEY,
+    WORKER_RPC_DRAIN_KEY,
 )
 from ray.autoscaler.node_provider import NodeProvider
 from ray.autoscaler.tags import (
@@ -172,6 +174,12 @@ class KuberayNodeProvider(NodeProvider):  # type: ignore
         assert (
             provider_config.get(FOREGROUND_NODE_LAUNCH_KEY, False) is True
         ), f"To use KuberayNodeProvider, must set `{FOREGROUND_NODE_LAUNCH_KEY}:True`."
+        assert (
+            provider_config.get(WORKER_LIVENESS_CHECK_KEY, True) is False
+        ), f"To use KuberayNodeProvider, must set `{WORKER_LIVENESS_CHECK_KEY}:False`."
+        assert (
+            provider_config.get(WORKER_RPC_DRAIN_KEY, True) is False
+        ), f"To use KuberayNodeProvider, must set `{WORKER_RPC_DRAIN_KEY}:False`."
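The assertions above encode the contract between the KubeRay node provider and the autoscaler. As a rough sketch (not part of the patch itself), the provider config that _generate_provider_config now emits, and the way StandardAutoscaler consumes the two new keys with permissive defaults, look roughly like the following; the "namespace" value is only an example:

    # Sketch assembled from the constants and defaults introduced in this patch.
    provider_config = {
        "type": "kuberay",
        "namespace": "default",                # example namespace
        "disable_node_updaters": True,         # DISABLE_NODE_UPDATERS_KEY
        "disable_launch_config_check": True,   # DISABLE_LAUNCH_CONFIG_CHECK_KEY
        "foreground_node_launch": True,        # FOREGROUND_NODE_LAUNCH_KEY
        "worker_liveness_check": False,        # WORKER_LIVENESS_CHECK_KEY
        "worker_rpc_drain": False,             # WORKER_RPC_DRAIN_KEY
    }

    # StandardAutoscaler reads both flags with a default of True, so existing
    # (non-KubeRay) cluster configs that never set them keep the old behavior.
    worker_liveness_check = provider_config.get("worker_liveness_check", True)
    worker_rpc_drain = provider_config.get("worker_rpc_drain", True)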
         provider_exists = True
 
         super().__init__(provider_config, cluster_name)
diff --git a/python/ray/tests/kuberay/test_autoscaling_config.py b/python/ray/tests/kuberay/test_autoscaling_config.py
index d67b82e40..4bbec8c9b 100644
--- a/python/ray/tests/kuberay/test_autoscaling_config.py
+++ b/python/ray/tests/kuberay/test_autoscaling_config.py
@@ -42,9 +42,11 @@ def _get_basic_autoscaling_config() -> dict:
     return {
         "cluster_name": "raycluster-complete",
         "provider": {
-            "disable_launch_config_check": True,
             "disable_node_updaters": True,
+            "disable_launch_config_check": True,
             "foreground_node_launch": True,
+            "worker_liveness_check": False,
+            "worker_rpc_drain": False,
             "namespace": "default",
             "type": "kuberay",
         },
diff --git a/python/ray/tests/test_autoscaler.py b/python/ray/tests/test_autoscaler.py
index 772492a5b..2e33c9dc5 100644
--- a/python/ray/tests/test_autoscaler.py
+++ b/python/ray/tests/test_autoscaler.py
@@ -27,7 +27,11 @@ from ray._private.test_utils import RayTestTimeoutException
 from ray.autoscaler._private import commands
 from ray.autoscaler._private.autoscaler import NonTerminatedNodes, StandardAutoscaler
 from ray.autoscaler._private.commands import get_or_create_head_node
-from ray.autoscaler._private.constants import FOREGROUND_NODE_LAUNCH_KEY
+from ray.autoscaler._private.constants import (
+    FOREGROUND_NODE_LAUNCH_KEY,
+    WORKER_LIVENESS_CHECK_KEY,
+    WORKER_RPC_DRAIN_KEY,
+)
 from ray.autoscaler._private.load_metrics import LoadMetrics
 from ray.autoscaler._private.monitor import Monitor
 from ray.autoscaler._private.prom_metrics import AutoscalerPrometheusMetrics
@@ -75,6 +79,8 @@ class DrainNodeOutcome(str, Enum):
     GenericException = "GenericException"
     # Tell the autoscaler to fail finding ips during drain
     FailedToFindIp = "FailedToFindIp"
+    # Represents the situation in which draining nodes before termination is disabled.
+    DrainDisabled = "DrainDisabled"
 
 
 class MockRpcException(grpc.RpcError):
@@ -450,6 +456,13 @@ class MockAutoscaler(StandardAutoscaler):
         self.provider.fail_to_fetch_ip = False
 
 
+class NoUpdaterMockAutoscaler(MockAutoscaler):
+    def update_nodes(self):
+        raise AssertionError(
+            "Node updaters are disabled. This method should not be accessed!"
+        )
+
+
 SMALL_CLUSTER = {
     "cluster_name": "default",
     "min_workers": 2,
@@ -1405,7 +1418,13 @@ class AutoscalingTest(unittest.TestCase):
         self.provider = MockProvider()
         runner = MockProcessRunner()
         mock_metrics = Mock(spec=AutoscalerPrometheusMetrics())
-        autoscaler = MockAutoscaler(
+        if disable_node_updaters:
+            # This class raises an assertion error if we try to create
+            # a node updater thread.
+            autoscaler_class = NoUpdaterMockAutoscaler
+        else:
+            autoscaler_class = MockAutoscaler
+        autoscaler = autoscaler_class(
             config_path,
             LoadMetrics(),
             MockNodeInfoStub(),
@@ -1434,7 +1453,6 @@ class AutoscalingTest(unittest.TestCase):
         if disable_node_updaters:
             # Node Updaters have NOT been invoked because they were explicitly
             # disabled.
-            time.sleep(1)
             assert len(runner.calls) == 0
             # Nodes were create in uninitialized and not updated.
             self.waitForNodes(
@@ -1528,6 +1546,9 @@ class AutoscalingTest(unittest.TestCase):
     def testDynamicScaling6(self):
         self.helperDynamicScaling(DrainNodeOutcome.FailedToFindIp)
 
+    def testDynamicScaling7(self):
+        self.helperDynamicScaling(DrainNodeOutcome.DrainDisabled)
+
     def helperDynamicScaling(
         self,
         drain_node_outcome: DrainNodeOutcome = DrainNodeOutcome.Succeeded,
@@ -1535,19 +1556,21 @@ class AutoscalingTest(unittest.TestCase):
     ):
         mock_metrics = Mock(spec=AutoscalerPrometheusMetrics())
         mock_node_info_stub = MockNodeInfoStub(drain_node_outcome)
+        disable_drain = drain_node_outcome == DrainNodeOutcome.DrainDisabled
 
         # Run the core of the test logic.
         self._helperDynamicScaling(
             mock_metrics,
             mock_node_info_stub,
             foreground_node_launcher=foreground_node_launcher,
+            disable_drain=disable_drain,
         )
 
         # Make assertions about DrainNode error handling during scale-down.
         if drain_node_outcome == DrainNodeOutcome.Succeeded:
             # DrainNode call was made.
-            mock_node_info_stub.drain_node_call_count > 0
+            assert mock_node_info_stub.drain_node_call_count > 0
             # No drain node exceptions.
             assert mock_metrics.drain_node_exceptions.inc.call_count == 0
             # Each drain node call succeeded.
             assert (
                 mock_node_info_stub.drain_node_reply_success
                 == mock_node_info_stub.drain_node_call_count
             )
         elif drain_node_outcome == DrainNodeOutcome.Unimplemented:
             # DrainNode call was made.
-            mock_node_info_stub.drain_node_call_count > 0
+            assert mock_node_info_stub.drain_node_call_count > 0
             # All errors were supressed.
             assert mock_metrics.drain_node_exceptions.inc.call_count == 0
             # Every call failed.
             assert mock_node_info_stub.drain_node_reply_success == 0
         elif drain_node_outcome in (
             DrainNodeOutcome.GenericRpcError,
             DrainNodeOutcome.GenericException,
         ):
             # DrainNode call was made.
-            mock_node_info_stub.drain_node_call_count > 0
+            assert mock_node_info_stub.drain_node_call_count > 0
             # We encountered an exception.
             assert mock_metrics.drain_node_exceptions.inc.call_count > 0
@@ -1583,17 +1606,30 @@ class AutoscalingTest(unittest.TestCase):
             assert mock_node_info_stub.drain_node_call_count == 0
             # We encountered an exception fetching ip.
             assert mock_metrics.drain_node_exceptions.inc.call_count > 0
+        elif drain_node_outcome == DrainNodeOutcome.DrainDisabled:
+            # We never called this API.
+            assert mock_node_info_stub.drain_node_call_count == 0
+            # There were no failed calls.
+            assert mock_metrics.drain_node_exceptions.inc.call_count == 0
+            # There were no successful calls either.
+            assert mock_node_info_stub.drain_node_reply_success == 0
 
     def testDynamicScalingForegroundLauncher(self):
         """Test autoscaling with node launcher in the foreground."""
         self.helperDynamicScaling(foreground_node_launcher=True)
 
     def _helperDynamicScaling(
-        self, mock_metrics, mock_node_info_stub, foreground_node_launcher=False
+        self,
+        mock_metrics,
+        mock_node_info_stub,
+        foreground_node_launcher=False,
+        disable_drain=False,
     ):
         config = copy.deepcopy(SMALL_CLUSTER)
         if foreground_node_launcher:
             config["provider"][FOREGROUND_NODE_LAUNCH_KEY] = True
+        if disable_drain:
+            config["provider"][WORKER_RPC_DRAIN_KEY] = False
         config_path = self.write_config(config)
 
         self.provider = MockProvider()
@@ -2681,7 +2717,28 @@ class AutoscalingTest(unittest.TestCase):
 
         Similar to testRecoverUnhealthyWorkers.
         """
-        config_path = self.write_config(SMALL_CLUSTER)
+        self.unhealthyWorkerHelper(disable_liveness_check=False)
+
+    def testDontTerminateUnhealthyWorkers(self):
+        """Test that the autoscaler leaves unhealthy workers alone when the worker
+        liveness check is disabled.
+ """ + self.unhealthyWorkerHelper(disable_liveness_check=True) + + def unhealthyWorkerHelper(self, disable_liveness_check: bool): + """Helper used to test the autoscaler's handling of unhealthy worker nodes. + If disable liveness check is False, the default code path is tested and we + expect to see workers terminated. + + If disable liveness check is True, we expect the autoscaler not to take action + on unhealthy nodes, instead delegating node management to another component. + """ + config = copy.deepcopy(SMALL_CLUSTER) + # Make it clear we're not timing out idle nodes here. + config["idle_timeout_minutes"] = 1000000000 + if disable_liveness_check: + config["provider"][WORKER_LIVENESS_CHECK_KEY] = False + config_path = self.write_config(config) self.provider = MockProvider() runner = MockProcessRunner() runner.respond_to_call("json .Config.Env", ["[]" for i in range(3)]) @@ -2702,13 +2759,16 @@ class AutoscalingTest(unittest.TestCase): autoscaler.update() self.waitForNodes(2, tag_filters={TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE}) - # Mark a node as unhealthy + # Clear out updaters. for _ in range(5): if autoscaler.updaters: time.sleep(0.05) autoscaler.update() assert not autoscaler.updaters + num_calls = len(runner.calls) + + # Mark a node as unhealthy lm.last_heartbeat_time_by_ip["172.0.0.0"] = 0 # Turn off updaters. autoscaler.disable_node_updaters = True @@ -2717,24 +2777,43 @@ class AutoscalingTest(unittest.TestCase): "min_workers" ] = 1 fill_in_raylet_ids(self.provider, lm) - autoscaler.update() - # Stopped node metric incremented. - mock_metrics.stopped_nodes.inc.assert_called_once_with() - # One node left. - self.waitForNodes(1) - # Check the node removal event is generated. - autoscaler.update() - events = autoscaler.event_summarizer.summary() - assert ( - "Removing 1 nodes of type " - "ray-legacy-worker-node-type (lost contact with raylet)." in events - ), events + if disable_liveness_check: + # We've disabled the liveness check, so the unhealthy node should stick + # around until someone else takes care of it. + # Do several autoscaler updates, to reinforce the fact that the + # autoscaler will never take down the unhealthy nodes. + for _ in range(10): + autoscaler.update() + # The nodes are still there. + assert self.num_nodes() == 2 + # There's no synchronization required to make the last assertion valid: + # The autoscaler's node termination is synchronous and blocking, as is + # the terminate_node method of the mock node provider used in this test. - # No additional runner calls, since updaters were disabled. - time.sleep(1) - assert len(runner.calls) == num_calls - assert mock_metrics.drain_node_exceptions.inc.call_count == 0 + # No events generated indicating that we are removing nodes. + for event in autoscaler.event_summarizer.summary(): + assert "Removing" not in event + else: + # We expect the unhealthy node to be cleared out with a single + # autoscaler update. + autoscaler.update() + # Stopped node metric incremented. + mock_metrics.stopped_nodes.inc.assert_called_once_with() + # One node left. + self.waitForNodes(1) + + # Check the node removal event is generated. + autoscaler.update() + events = autoscaler.event_summarizer.summary() + assert ( + "Removing 1 nodes of type " + "ray-legacy-worker-node-type (lost contact with raylet)." in events + ), events + + # No additional runner calls, since updaters were disabled. 
+            assert len(runner.calls) == num_calls
+            assert mock_metrics.drain_node_exceptions.inc.call_count == 0
 
     def testTerminateUnhealthyWorkers2(self):
         """Tests finer details of termination of unhealthy workers when
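Taken together, the tests above pin down how the two new provider flags gate the autoscaler. A small self-contained sketch of that behavior (toy code, not Ray's implementation; plan_actions is a hypothetical helper used only for illustration):

    # Toy illustration of the gating introduced in this patch. Both flags
    # default to True; the generated KubeRay provider config sets them to False.
    def plan_actions(provider_config: dict) -> list:
        """Return the node-management actions the autoscaler would still perform."""
        actions = []
        if provider_config.get("worker_liveness_check", True):
            # terminate_unhealthy_nodes / attempt_to_recover_unhealthy_nodes
            actions.append("handle_unhealthy_nodes")
        if provider_config.get("worker_rpc_drain", True):
            # drain_nodes_via_gcs before NodeProvider.terminate_nodes
            actions.append("drain_via_gcs_before_termination")
        return actions

    # Default (non-KubeRay) provider config: both behaviors remain enabled.
    assert plan_actions({}) == [
        "handle_unhealthy_nodes",
        "drain_via_gcs_before_termination",
    ]
    # KubeRay provider config: both behaviors are delegated to the KubeRay operator.
    assert plan_actions({"worker_liveness_check": False, "worker_rpc_drain": False}) == []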