diff --git a/dashboard/consts.py b/dashboard/consts.py index c1d3ec474..e73d8eb50 100644 --- a/dashboard/consts.py +++ b/dashboard/consts.py @@ -15,6 +15,7 @@ GCS_CHECK_ALIVE_MAX_COUNT_OF_RPC_ERROR = env_integer( "GCS_CHECK_ALIVE_MAX_COUNT_OF_RPC_ERROR", 10) GCS_CHECK_ALIVE_INTERVAL_SECONDS = env_integer( "GCS_CHECK_ALIVE_INTERVAL_SECONDS", 5) +GCS_CHECK_ALIVE_RPC_TIMEOUT = env_integer("GCS_CHECK_ALIVE_RPC_TIMEOUT", 10) GCS_RETRY_CONNECT_INTERVAL_SECONDS = env_integer( "GCS_RETRY_CONNECT_INTERVAL_SECONDS", 2) # aiohttp_cache diff --git a/dashboard/head.py b/dashboard/head.py index d25cf34f7..525128144 100644 --- a/dashboard/head.py +++ b/dashboard/head.py @@ -4,6 +4,7 @@ import socket import asyncio import logging import ipaddress +import threading from grpc.experimental import aio as aiogrpc @@ -28,12 +29,73 @@ routes = dashboard_utils.ClassMethodRouteTable aiogrpc.init_grpc_aio() +async def make_gcs_grpc_channel(redis_client): + while True: + try: + gcs_address = await redis_client.get( + dashboard_consts.REDIS_KEY_GCS_SERVER_ADDRESS) + if not gcs_address: + raise Exception("GCS address not found.") + logger.info("Connect to GCS at %s", gcs_address) + options = (("grpc.enable_http_proxy", 0), ) + channel = aiogrpc.insecure_channel(gcs_address, options=options) + return channel + except Exception as ex: + logger.error("Connect to GCS failed: %s, retry...", ex) + await asyncio.sleep( + dashboard_consts.GCS_RETRY_CONNECT_INTERVAL_SECONDS) + + +class GCSHealthCheckThread(threading.Thread): + def __init__(self, redis_client): + self.thread_local_loop = asyncio.new_event_loop() + self.aiogrpc_gcs_channel = None + self.gcs_heartbeat_info_stub = None + + async def on_startup(): + aiogrpc.init_grpc_aio() + self.aiogrpc_gcs_channel = await ( + make_gcs_grpc_channel(redis_client)) + self.gcs_heartbeat_info_stub = ( + gcs_service_pb2_grpc.HeartbeatInfoGcsServiceStub( + self.aiogrpc_gcs_channel)) + + self.startup_task = self.thread_local_loop.create_task(on_startup()) + + super().__init__(daemon=True) + + async def _check_once(self) -> bool: + request = gcs_service_pb2.CheckAliveRequest() + try: + reply = await self.gcs_heartbeat_info_stub.CheckAlive( + request, timeout=dashboard_consts.GCS_CHECK_ALIVE_RPC_TIMEOUT) + if reply.status.code != 0: + logger.exception( + f"Failed to CheckAlive: {reply.status.message}") + return False + except aiogrpc.AioRpcError: # Deadline Exceeded + logger.exception("Got AioRpcError when checking GCS is alive") + return False + return True + + async def check_once(self) -> bool: + # Make sure startup is complete + if not self.startup_task.done(): + return False + # Make the grpc call inside the thread loop so it's not blocked by + # potentially busy main loop. + return await asyncio.wrap_future( + asyncio.run_coroutine_threadsafe(self._check_once(), + self.thread_local_loop)) + + def run(self) -> None: + self.thread_local_loop.run_forever() + + class DashboardHead: def __init__(self, http_host, http_port, http_port_retries, redis_address, redis_password, log_dir): - # HeartbeatInfoGcsService - self._gcs_heartbeat_info_stub = None - self._gcs_check_alive_seq = 0 + self.health_check_thread: GCSHealthCheckThread = None self._gcs_rpc_error_counter = 0 # Public attributes are accessible for all head modules. # Walkaround for issue: https://github.com/ray-project/ray/issues/7084 @@ -56,20 +118,10 @@ class DashboardHead: @async_loop_forever(dashboard_consts.GCS_CHECK_ALIVE_INTERVAL_SECONDS) async def _gcs_check_alive(self): - try: - self._gcs_check_alive_seq += 1 - request = gcs_service_pb2.CheckAliveRequest( - seq=self._gcs_check_alive_seq) - reply = await self._gcs_heartbeat_info_stub.CheckAlive( - request, timeout=2) - if reply.status.code != 0: - raise Exception( - f"Failed to CheckAlive: {reply.status.message}") + is_alive = await self.health_check_thread.check_once() + if is_alive: self._gcs_rpc_error_counter = 0 - except aiogrpc.AioRpcError: - logger.exception( - "Got AioRpcError when checking GCS is alive, seq=%s.", - self._gcs_check_alive_seq) + else: self._gcs_rpc_error_counter += 1 if self._gcs_rpc_error_counter > \ dashboard_consts.GCS_CHECK_ALIVE_MAX_COUNT_OF_RPC_ERROR: @@ -84,9 +136,6 @@ class DashboardHead: # shutdown(). Please refer to: # https://github.com/ray-project/ray/issues/16328 os._exit(-1) - except Exception: - logger.exception("Error checking GCS is alive, seq=%s.", - self._gcs_check_alive_seq) def _load_modules(self): """Load dashboard head modules.""" @@ -120,28 +169,12 @@ class DashboardHead: loop=asyncio.get_event_loop()) # Waiting for GCS is ready. - while True: - try: - gcs_address = await self.aioredis_client.get( - dashboard_consts.REDIS_KEY_GCS_SERVER_ADDRESS) - if not gcs_address: - raise Exception("GCS address not found.") - logger.info("Connect to GCS at %s", gcs_address) - options = (("grpc.enable_http_proxy", 0), ) - channel = aiogrpc.insecure_channel( - gcs_address, options=options) - except Exception as ex: - logger.error("Connect to GCS failed: %s, retry...", ex) - await asyncio.sleep( - dashboard_consts.GCS_RETRY_CONNECT_INTERVAL_SECONDS) - else: - self.aiogrpc_gcs_channel = channel - break + self.aiogrpc_gcs_channel = await make_gcs_grpc_channel( + self.aioredis_client) - # Create a HeartbeatInfoGcsServiceStub. - self._gcs_heartbeat_info_stub = \ - gcs_service_pb2_grpc.HeartbeatInfoGcsServiceStub( - self.aiogrpc_gcs_channel) + self.health_check_thread = GCSHealthCheckThread( + redis_client=self.aioredis_client) + self.health_check_thread.start() # Start a grpc asyncio server. await self.server.start() diff --git a/dashboard/modules/log/tests/test_log.py b/dashboard/modules/log/tests/test_log.py index 09651a94c..db6549918 100644 --- a/dashboard/modules/log/tests/test_log.py +++ b/dashboard/modules/log/tests/test_log.py @@ -83,7 +83,7 @@ def test_log(disable_aiohttp_cache, ray_start_with_dashboard): # Test range request. response = requests.get( webui_url + "/logs/dashboard.log", - headers={"Range": "bytes=43-51"}) + headers={"Range": "bytes=44-52"}) response.raise_for_status() assert response.text == "Dashboard" @@ -122,7 +122,7 @@ def test_log_proxy(ray_start_with_dashboard): # Test range request. response = requests.get( f"{webui_url}/log_proxy?url={webui_url}/logs/dashboard.log", - headers={"Range": "bytes=43-51"}) + headers={"Range": "bytes=44-52"}) response.raise_for_status() assert response.text == "Dashboard" # Test 404. diff --git a/dashboard/tests/conftest.py b/dashboard/tests/conftest.py index d607c6aaa..34acc9925 100644 --- a/dashboard/tests/conftest.py +++ b/dashboard/tests/conftest.py @@ -64,7 +64,9 @@ def fast_gcs_failure_detection(): os.environ["GCS_CHECK_ALIVE_MAX_COUNT_OF_RPC_ERROR"] = "2" os.environ["GCS_CHECK_ALIVE_INTERVAL_SECONDS"] = "1" os.environ["GCS_RETRY_CONNECT_INTERVAL_SECONDS"] = "1" + os.environ["GCS_CHECK_ALIVE_RPC_TIMEOUT"] = "1" yield os.environ.pop("GCS_CHECK_ALIVE_MAX_COUNT_OF_RPC_ERROR", None) os.environ.pop("GCS_CHECK_ALIVE_INTERVAL_SECONDS", None) os.environ.pop("GCS_RETRY_CONNECT_INTERVAL_SECONDS", None) + os.environ.pop("GCS_CHECK_ALIVE_RPC_TIMEOUT", None) diff --git a/python/ray/scripts/scripts.py b/python/ray/scripts/scripts.py index 66f9cd1b8..a8ac4008f 100644 --- a/python/ray/scripts/scripts.py +++ b/python/ray/scripts/scripts.py @@ -1786,7 +1786,7 @@ def healthcheck(address, redis_password, component): options = (("grpc.enable_http_proxy", 0), ) channel = grpc.insecure_channel(gcs_address, options=options) stub = gcs_service_pb2_grpc.HeartbeatInfoGcsServiceStub(channel) - request = gcs_service_pb2.CheckAliveRequest(seq=0) + request = gcs_service_pb2.CheckAliveRequest() reply = stub.CheckAlive( request, timeout=ray.ray_constants.HEALTHCHECK_EXPIRATION_S) if reply.status.code == 0: diff --git a/src/ray/gcs/gcs_server/gcs_heartbeat_manager.cc b/src/ray/gcs/gcs_server/gcs_heartbeat_manager.cc index fd01207af..49ffc1e04 100644 --- a/src/ray/gcs/gcs_server/gcs_heartbeat_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_heartbeat_manager.cc @@ -91,7 +91,6 @@ void GcsHeartbeatManager::HandleReportHeartbeat( void GcsHeartbeatManager::HandleCheckAlive(const rpc::CheckAliveRequest &request, rpc::CheckAliveReply *reply, rpc::SendReplyCallback send_reply_callback) { - reply->set_seq(request.seq()); GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK()); } diff --git a/src/ray/protobuf/gcs_service.proto b/src/ray/protobuf/gcs_service.proto index 3caee9334..3c1d16e24 100644 --- a/src/ray/protobuf/gcs_service.proto +++ b/src/ray/protobuf/gcs_service.proto @@ -190,12 +190,10 @@ message ReportHeartbeatReply { } message CheckAliveRequest { - int32 seq = 1; } message CheckAliveReply { GcsStatus status = 1; - int32 seq = 2; } message GetInternalConfigRequest {