[autoscaler][kuberay] Disable autoscaler health check and drain functionality (#26764)

Signed-off-by: Dmitri Gekhtman <dmitri.m.gekhtman@gmail.com>

For KubeRay, this change:

- Disables the autoscaler's RPC drain of worker nodes prior to termination.
- Disables the autoscaler's termination of nodes disconnected from the GCS.
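Both behaviors are now gated by provider-level config flags. As a sketch, this is roughly the provider section this commit generates for KubeRay (flag values taken from the diff below; other provider fields omitted):

```python
# Sketch of the KubeRay provider config after this commit.
# Both new flags default to True elsewhere; KubeRay sets them to False
# because the operator owns worker health checks and pod termination.
kuberay_provider_config = {
    "type": "kuberay",
    "namespace": "default",
    "disable_node_updaters": True,
    "disable_launch_config_check": True,
    "foreground_node_launch": True,
    "worker_liveness_check": False,  # operator handles health checks
    "worker_rpc_drain": False,  # skip the GCS drain RPC before termination
}
```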
Dmitri Gekhtman 2022-07-25 23:35:01 -07:00 committed by GitHub
parent 084f06f49a
commit b2b11316cd
6 changed files with 154 additions and 31 deletions

python/ray/autoscaler/_private/autoscaler.py

@@ -24,6 +24,8 @@ from ray.autoscaler._private.constants import (
DISABLE_LAUNCH_CONFIG_CHECK_KEY,
DISABLE_NODE_UPDATERS_KEY,
FOREGROUND_NODE_LAUNCH_KEY,
WORKER_LIVENESS_CHECK_KEY,
WORKER_RPC_DRAIN_KEY,
)
from ray.autoscaler._private.event_summarizer import EventSummarizer
from ray.autoscaler._private.legacy_info_string import legacy_log_info_string
@@ -263,10 +265,28 @@ class StandardAutoscaler:
# are launched in the main thread, all in one batch, blocking until all
# NodeProvider.create_node calls have returned.
self.foreground_node_launch = self.config["provider"].get(
FOREGROUND_NODE_LAUNCH_KEY
FOREGROUND_NODE_LAUNCH_KEY, False
)
logger.info(f"{FOREGROUND_NODE_LAUNCH_KEY}:{self.foreground_node_launch}")
# By default, the autoscaler kills and/or tries to recover
# a worker node if it hasn't produced a resource heartbeat in the last 30
# seconds. The worker_liveness_check flag allows disabling this behavior in
# settings where another component, such as a Kubernetes operator, is
# responsible for health checks.
self.worker_liveness_check = self.config["provider"].get(
WORKER_LIVENESS_CHECK_KEY, True
)
logger.info(f"{WORKER_LIVENESS_CHECK_KEY}:{self.worker_liveness_check}")
# By default, before worker node termination, the autoscaler sends an RPC to the
# GCS asking to kill the worker node.
# The worker_rpc_drain flag allows disabling this behavior in settings where
# another component, such as a Kubernetes operator, is responsible for worker
# lifecycle.
self.worker_rpc_drain = self.config["provider"].get(WORKER_RPC_DRAIN_KEY, True)
logger.info(f"{WORKER_RPC_DRAIN_KEY}:{self.worker_rpc_drain}")
# Node launchers
self.foreground_node_launcher: Optional[BaseNodeLauncher] = None
self.launch_queue: Optional[queue.Queue[NodeLaunchData]] = None
@@ -370,11 +390,17 @@ class StandardAutoscaler:
self.terminate_nodes_to_enforce_config_constraints(now)
if self.disable_node_updaters:
self.terminate_unhealthy_nodes(now)
# Don't handle unhealthy nodes if the liveness check is disabled.
# self.worker_liveness_check is True by default.
if self.worker_liveness_check:
self.terminate_unhealthy_nodes(now)
else:
self.process_completed_updates()
self.update_nodes()
self.attempt_to_recover_unhealthy_nodes(now)
# Don't handle unhealthy nodes if the liveness check is disabled.
# self.worker_liveness_check is True by default.
if self.worker_liveness_check:
self.attempt_to_recover_unhealthy_nodes(now)
self.set_prometheus_updater_data()
# Dict[NodeType, int], List[ResourceDict]
@@ -539,8 +565,10 @@ class StandardAutoscaler:
if not self.nodes_to_terminate:
return
# Do Ray-internal preparation for termination
self.drain_nodes_via_gcs(self.nodes_to_terminate)
# Do Ray-internal preparation for termination, unless this behavior is
# explicitly disabled.
if self.worker_rpc_drain:
self.drain_nodes_via_gcs(self.nodes_to_terminate)
# Terminate the nodes
self.provider.terminate_nodes(self.nodes_to_terminate)
for node in self.nodes_to_terminate:
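Taken together, the hunks above add two provider-level booleans that default to the legacy behavior. A minimal runnable sketch of the gating pattern (not the actual StandardAutoscaler; the method bodies are stand-ins):

```python
# Minimal sketch of the gating above; key names match this commit's constants.
WORKER_LIVENESS_CHECK_KEY = "worker_liveness_check"
WORKER_RPC_DRAIN_KEY = "worker_rpc_drain"


class AutoscalerSketch:
    def __init__(self, config: dict):
        provider_cfg = config["provider"]
        # Both flags default to True, so existing deployments keep the old
        # behavior unless the provider config explicitly opts out.
        self.worker_liveness_check = provider_cfg.get(WORKER_LIVENESS_CHECK_KEY, True)
        self.worker_rpc_drain = provider_cfg.get(WORKER_RPC_DRAIN_KEY, True)

    def handle_unhealthy(self, nodes: list) -> list:
        # With the liveness check disabled, unhealthy workers are left for an
        # external component (e.g. the KubeRay operator) to manage.
        if not self.worker_liveness_check:
            return []
        return self.terminate(nodes)

    def terminate(self, nodes: list) -> list:
        # The Ray-internal drain RPC is skipped when worker_rpc_drain is off.
        if self.worker_rpc_drain:
            print(f"Draining {nodes} via GCS before termination")
        print(f"Terminating {nodes}")
        return nodes


# KubeRay-style settings: both behaviors are handed off to the operator.
sketch = AutoscalerSketch(
    {"provider": {WORKER_LIVENESS_CHECK_KEY: False, WORKER_RPC_DRAIN_KEY: False}}
)
assert sketch.handle_unhealthy(["worker-1"]) == []  # unhealthy node left alone
```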

python/ray/autoscaler/_private/constants.py

@@ -109,3 +109,5 @@ MAX_PARALLEL_SHUTDOWN_WORKERS = env_integer("MAX_PARALLEL_SHUTDOWN_WORKERS", 50)
DISABLE_NODE_UPDATERS_KEY = "disable_node_updaters"
DISABLE_LAUNCH_CONFIG_CHECK_KEY = "disable_launch_config_check"
FOREGROUND_NODE_LAUNCH_KEY = "foreground_node_launch"
WORKER_LIVENESS_CHECK_KEY = "worker_liveness_check"
WORKER_RPC_DRAIN_KEY = "worker_rpc_drain"

python/ray/autoscaler/_private/kuberay/autoscaling_config.py

@@ -11,6 +11,8 @@ from ray.autoscaler._private.constants import (
DISABLE_LAUNCH_CONFIG_CHECK_KEY,
DISABLE_NODE_UPDATERS_KEY,
FOREGROUND_NODE_LAUNCH_KEY,
WORKER_LIVENESS_CHECK_KEY,
WORKER_RPC_DRAIN_KEY,
)
from ray.autoscaler._private.kuberay import node_provider
from ray.autoscaler._private.util import validate_config
@@ -145,6 +147,8 @@ def _generate_provider_config(ray_cluster_namespace: str) -> Dict[str, Any]:
DISABLE_NODE_UPDATERS_KEY: True,
DISABLE_LAUNCH_CONFIG_CHECK_KEY: True,
FOREGROUND_NODE_LAUNCH_KEY: True,
WORKER_LIVENESS_CHECK_KEY: False,
WORKER_RPC_DRAIN_KEY: False,
}

python/ray/autoscaler/_private/kuberay/node_provider.py

@@ -8,6 +8,8 @@ from ray.autoscaler._private.constants import (
DISABLE_LAUNCH_CONFIG_CHECK_KEY,
DISABLE_NODE_UPDATERS_KEY,
FOREGROUND_NODE_LAUNCH_KEY,
WORKER_LIVENESS_CHECK_KEY,
WORKER_RPC_DRAIN_KEY,
)
from ray.autoscaler.node_provider import NodeProvider
from ray.autoscaler.tags import (
@@ -172,6 +174,12 @@ class KuberayNodeProvider(NodeProvider): # type: ignore
assert (
provider_config.get(FOREGROUND_NODE_LAUNCH_KEY, False) is True
), f"To use KuberayNodeProvider, must set `{FOREGROUND_NODE_LAUNCH_KEY}:True`."
assert (
provider_config.get(WORKER_LIVENESS_CHECK_KEY, True) is False
), f"To use KuberayNodeProvider, must set `{WORKER_LIVENESS_CHECK_KEY}:False`."
assert (
provider_config.get(WORKER_RPC_DRAIN_KEY, True) is False
), f"To use KuberayNodeProvider, must set `{WORKER_RPC_DRAIN_KEY}:False`."
provider_exists = True
super().__init__(provider_config, cluster_name)

python/ray/tests/kuberay/test_autoscaling_config.py

@@ -42,9 +42,11 @@ def _get_basic_autoscaling_config() -> dict:
return {
"cluster_name": "raycluster-complete",
"provider": {
"disable_launch_config_check": True,
"disable_node_updaters": True,
"disable_launch_config_check": True,
"foreground_node_launch": True,
"worker_liveness_check": False,
"worker_rpc_drain": False,
"namespace": "default",
"type": "kuberay",
},

python/ray/tests/test_autoscaler.py

@@ -27,7 +27,11 @@ from ray._private.test_utils import RayTestTimeoutException
from ray.autoscaler._private import commands
from ray.autoscaler._private.autoscaler import NonTerminatedNodes, StandardAutoscaler
from ray.autoscaler._private.commands import get_or_create_head_node
from ray.autoscaler._private.constants import FOREGROUND_NODE_LAUNCH_KEY
from ray.autoscaler._private.constants import (
FOREGROUND_NODE_LAUNCH_KEY,
WORKER_LIVENESS_CHECK_KEY,
WORKER_RPC_DRAIN_KEY,
)
from ray.autoscaler._private.load_metrics import LoadMetrics
from ray.autoscaler._private.monitor import Monitor
from ray.autoscaler._private.prom_metrics import AutoscalerPrometheusMetrics
@@ -75,6 +79,8 @@ class DrainNodeOutcome(str, Enum):
GenericException = "GenericException"
# Tell the autoscaler to fail finding ips during drain
FailedToFindIp = "FailedToFindIp"
# Represents the situation in which draining nodes before termination is disabled.
DrainDisabled = "DrainDisabled"
class MockRpcException(grpc.RpcError):
@@ -450,6 +456,13 @@ class MockAutoscaler(StandardAutoscaler):
self.provider.fail_to_fetch_ip = False
class NoUpdaterMockAutoscaler(MockAutoscaler):
def update_nodes(self):
raise AssertionError(
"Node updaters are disabled. This method should not be accessed!"
)
SMALL_CLUSTER = {
"cluster_name": "default",
"min_workers": 2,
@@ -1405,7 +1418,13 @@ class AutoscalingTest(unittest.TestCase):
self.provider = MockProvider()
runner = MockProcessRunner()
mock_metrics = Mock(spec=AutoscalerPrometheusMetrics())
autoscaler = MockAutoscaler(
if disable_node_updaters:
# This class raises an assertion error if we try to create
# a node updater thread.
autoscaler_class = NoUpdaterMockAutoscaler
else:
autoscaler_class = MockAutoscaler
autoscaler = autoscaler_class(
config_path,
LoadMetrics(),
MockNodeInfoStub(),
@@ -1434,7 +1453,6 @@ class AutoscalingTest(unittest.TestCase):
if disable_node_updaters:
# Node Updaters have NOT been invoked because they were explicitly
# disabled.
time.sleep(1)
assert len(runner.calls) == 0
# Nodes were created uninitialized and not updated.
self.waitForNodes(
@@ -1528,6 +1546,9 @@ class AutoscalingTest(unittest.TestCase):
def testDynamicScaling6(self):
self.helperDynamicScaling(DrainNodeOutcome.FailedToFindIp)
def testDynamicScaling7(self):
self.helperDynamicScaling(DrainNodeOutcome.DrainDisabled)
def helperDynamicScaling(
self,
drain_node_outcome: DrainNodeOutcome = DrainNodeOutcome.Succeeded,
@@ -1535,19 +1556,21 @@
):
mock_metrics = Mock(spec=AutoscalerPrometheusMetrics())
mock_node_info_stub = MockNodeInfoStub(drain_node_outcome)
disable_drain = drain_node_outcome == DrainNodeOutcome.DrainDisabled
# Run the core of the test logic.
self._helperDynamicScaling(
mock_metrics,
mock_node_info_stub,
foreground_node_launcher=foreground_node_launcher,
disable_drain=disable_drain,
)
# Make assertions about DrainNode error handling during scale-down.
if drain_node_outcome == DrainNodeOutcome.Succeeded:
# DrainNode call was made.
mock_node_info_stub.drain_node_call_count > 0
assert mock_node_info_stub.drain_node_call_count > 0
# No drain node exceptions.
assert mock_metrics.drain_node_exceptions.inc.call_count == 0
# Each drain node call succeeded.
@@ -1557,7 +1580,7 @@
)
elif drain_node_outcome == DrainNodeOutcome.Unimplemented:
# DrainNode call was made.
mock_node_info_stub.drain_node_call_count > 0
assert mock_node_info_stub.drain_node_call_count > 0
# All errors were suppressed.
assert mock_metrics.drain_node_exceptions.inc.call_count == 0
# Every call failed.
@@ -1567,7 +1590,7 @@
DrainNodeOutcome.GenericException,
):
# DrainNode call was made.
mock_node_info_stub.drain_node_call_count > 0
assert mock_node_info_stub.drain_node_call_count > 0
# We encountered an exception.
assert mock_metrics.drain_node_exceptions.inc.call_count > 0
@@ -1583,17 +1606,30 @@
assert mock_node_info_stub.drain_node_call_count == 0
# We encountered an exception fetching ip.
assert mock_metrics.drain_node_exceptions.inc.call_count > 0
elif drain_node_outcome == DrainNodeOutcome.DrainDisabled:
# We never called this API.
assert mock_node_info_stub.drain_node_call_count == 0
# There were no failed calls.
assert mock_metrics.drain_node_exceptions.inc.call_count == 0
# There were no successful calls either.
assert mock_node_info_stub.drain_node_reply_success == 0
def testDynamicScalingForegroundLauncher(self):
"""Test autoscaling with node launcher in the foreground."""
self.helperDynamicScaling(foreground_node_launcher=True)
def _helperDynamicScaling(
self, mock_metrics, mock_node_info_stub, foreground_node_launcher=False
self,
mock_metrics,
mock_node_info_stub,
foreground_node_launcher=False,
disable_drain=False,
):
config = copy.deepcopy(SMALL_CLUSTER)
if foreground_node_launcher:
config["provider"][FOREGROUND_NODE_LAUNCH_KEY] = True
if disable_drain:
config["provider"][WORKER_RPC_DRAIN_KEY] = False
config_path = self.write_config(config)
self.provider = MockProvider()
@@ -2681,7 +2717,28 @@
Similar to testRecoverUnhealthyWorkers.
"""
config_path = self.write_config(SMALL_CLUSTER)
self.unhealthyWorkerHelper(disable_liveness_check=False)
def testDontTerminateUnhealthyWorkers(self):
"""Test that the autoscaler leaves unhealthy workers alone when the worker
liveness check is disabled.
"""
self.unhealthyWorkerHelper(disable_liveness_check=True)
def unhealthyWorkerHelper(self, disable_liveness_check: bool):
"""Helper used to test the autoscaler's handling of unhealthy worker nodes.
If disable_liveness_check is False, the default code path is tested and we
expect to see workers terminated.
If disable_liveness_check is True, we expect the autoscaler not to take action
on unhealthy nodes, instead delegating node management to another component.
"""
config = copy.deepcopy(SMALL_CLUSTER)
# Make it clear we're not timing out idle nodes here.
config["idle_timeout_minutes"] = 1000000000
if disable_liveness_check:
config["provider"][WORKER_LIVENESS_CHECK_KEY] = False
config_path = self.write_config(config)
self.provider = MockProvider()
runner = MockProcessRunner()
runner.respond_to_call("json .Config.Env", ["[]" for i in range(3)])
@@ -2702,13 +2759,16 @@
autoscaler.update()
self.waitForNodes(2, tag_filters={TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE})
# Mark a node as unhealthy
# Clear out updaters.
for _ in range(5):
if autoscaler.updaters:
time.sleep(0.05)
autoscaler.update()
assert not autoscaler.updaters
num_calls = len(runner.calls)
# Mark a node as unhealthy
lm.last_heartbeat_time_by_ip["172.0.0.0"] = 0
# Turn off updaters.
autoscaler.disable_node_updaters = True
@@ -2717,24 +2777,43 @@
"min_workers"
] = 1
fill_in_raylet_ids(self.provider, lm)
autoscaler.update()
# Stopped node metric incremented.
mock_metrics.stopped_nodes.inc.assert_called_once_with()
# One node left.
self.waitForNodes(1)
# Check the node removal event is generated.
autoscaler.update()
events = autoscaler.event_summarizer.summary()
assert (
"Removing 1 nodes of type "
"ray-legacy-worker-node-type (lost contact with raylet)." in events
), events
if disable_liveness_check:
# We've disabled the liveness check, so the unhealthy node should stick
# around until someone else takes care of it.
# Do several autoscaler updates to confirm that the autoscaler
# never takes down the unhealthy nodes.
for _ in range(10):
autoscaler.update()
# The nodes are still there.
assert self.num_nodes() == 2
# There's no synchronization required to make the last assertion valid:
# The autoscaler's node termination is synchronous and blocking, as is
# the terminate_node method of the mock node provider used in this test.
# No additional runner calls, since updaters were disabled.
time.sleep(1)
assert len(runner.calls) == num_calls
assert mock_metrics.drain_node_exceptions.inc.call_count == 0
# No events generated indicating that we are removing nodes.
for event in autoscaler.event_summarizer.summary():
assert "Removing" not in event
else:
# We expect the unhealthy node to be cleared out with a single
# autoscaler update.
autoscaler.update()
# Stopped node metric incremented.
mock_metrics.stopped_nodes.inc.assert_called_once_with()
# One node left.
self.waitForNodes(1)
# Check the node removal event is generated.
autoscaler.update()
events = autoscaler.event_summarizer.summary()
assert (
"Removing 1 nodes of type "
"ray-legacy-worker-node-type (lost contact with raylet)." in events
), events
# No additional runner calls, since updaters were disabled.
assert len(runner.calls) == num_calls
assert mock_metrics.drain_node_exceptions.inc.call_count == 0
def testTerminateUnhealthyWorkers2(self):
"""Tests finer details of termination of unhealthy workers when