mirror of https://github.com/vale981/ray, synced 2025-03-06 02:21:39 -05:00
[Dashboard] Move gcs health check to a separate thread to avoid crashing due to excessive CPU usage. (#18236)
commit e61160d514
parent e049d52d29
7 changed files with 79 additions and 46 deletions
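The core of the change is a standard asyncio pattern: the GCS health check moves onto a dedicated daemon thread that owns its own event loop, so a CPU-saturated dashboard main loop can no longer delay the CheckAlive RPC past its timeout and produce a false GCS-death verdict. Below is a minimal, self-contained sketch of that pattern; the names are illustrative rather than taken from Ray, and the real check issues a gRPC CheckAlive call where this sketch merely sleeps.

import asyncio
import threading


class HealthCheckThread(threading.Thread):
    """Owns a private event loop and runs health checks on it."""

    def __init__(self):
        super().__init__(daemon=True)  # never blocks process exit
        self.loop = asyncio.new_event_loop()

    def run(self):
        # Work arrives from other threads via run_coroutine_threadsafe().
        self.loop.run_forever()

    async def _check_once(self) -> bool:
        # Stand-in for the real gRPC CheckAlive round trip.
        await asyncio.sleep(0.1)
        return True

    async def check_once(self) -> bool:
        # Schedule the coroutine on this thread's loop, then convert the
        # resulting concurrent.futures.Future into something the caller's
        # loop can await.
        future = asyncio.run_coroutine_threadsafe(self._check_once(), self.loop)
        return await asyncio.wrap_future(future)


async def main():
    checker = HealthCheckThread()
    checker.start()
    # Even if main() were burning CPU elsewhere, the check itself runs
    # undisturbed on the checker's loop.
    print(await checker.check_once())  # -> True


asyncio.run(main())

asyncio.run_coroutine_threadsafe returns a concurrent.futures.Future; asyncio.wrap_future converts it into an awaitable bound to the caller's loop, which is what lets the main loop await the result without blocking either loop.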
@@ -15,6 +15,7 @@ GCS_CHECK_ALIVE_MAX_COUNT_OF_RPC_ERROR = env_integer(
     "GCS_CHECK_ALIVE_MAX_COUNT_OF_RPC_ERROR", 10)
 GCS_CHECK_ALIVE_INTERVAL_SECONDS = env_integer(
     "GCS_CHECK_ALIVE_INTERVAL_SECONDS", 5)
+GCS_CHECK_ALIVE_RPC_TIMEOUT = env_integer("GCS_CHECK_ALIVE_RPC_TIMEOUT", 10)
 GCS_RETRY_CONNECT_INTERVAL_SECONDS = env_integer(
     "GCS_RETRY_CONNECT_INTERVAL_SECONDS", 2)
 # aiohttp_cache
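The new GCS_CHECK_ALIVE_RPC_TIMEOUT constant follows the same env_integer convention as its neighbors: an environment variable overrides the compiled-in default. A minimal sketch of what such a helper presumably looks like (assumed shape, not copied from Ray):

import os


def env_integer(key, default):
    # Prefer an integer value from the environment, else the default.
    if key in os.environ:
        return int(os.environ[key])
    return default


GCS_CHECK_ALIVE_RPC_TIMEOUT = env_integer("GCS_CHECK_ALIVE_RPC_TIMEOUT", 10)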
@@ -4,6 +4,7 @@ import socket
 import asyncio
 import logging
 import ipaddress
+import threading
 
 from grpc.experimental import aio as aiogrpc
@@ -28,12 +29,73 @@ routes = dashboard_utils.ClassMethodRouteTable
 aiogrpc.init_grpc_aio()
 
 
+async def make_gcs_grpc_channel(redis_client):
+    while True:
+        try:
+            gcs_address = await redis_client.get(
+                dashboard_consts.REDIS_KEY_GCS_SERVER_ADDRESS)
+            if not gcs_address:
+                raise Exception("GCS address not found.")
+            logger.info("Connect to GCS at %s", gcs_address)
+            options = (("grpc.enable_http_proxy", 0), )
+            channel = aiogrpc.insecure_channel(gcs_address, options=options)
+            return channel
+        except Exception as ex:
+            logger.error("Connect to GCS failed: %s, retry...", ex)
+            await asyncio.sleep(
+                dashboard_consts.GCS_RETRY_CONNECT_INTERVAL_SECONDS)
+
+
+class GCSHealthCheckThread(threading.Thread):
+    def __init__(self, redis_client):
+        self.thread_local_loop = asyncio.new_event_loop()
+        self.aiogrpc_gcs_channel = None
+        self.gcs_heartbeat_info_stub = None
+
+        async def on_startup():
+            aiogrpc.init_grpc_aio()
+            self.aiogrpc_gcs_channel = await (
+                make_gcs_grpc_channel(redis_client))
+            self.gcs_heartbeat_info_stub = (
+                gcs_service_pb2_grpc.HeartbeatInfoGcsServiceStub(
+                    self.aiogrpc_gcs_channel))
+
+        self.startup_task = self.thread_local_loop.create_task(on_startup())
+
+        super().__init__(daemon=True)
+
+    async def _check_once(self) -> bool:
+        request = gcs_service_pb2.CheckAliveRequest()
+        try:
+            reply = await self.gcs_heartbeat_info_stub.CheckAlive(
+                request, timeout=dashboard_consts.GCS_CHECK_ALIVE_RPC_TIMEOUT)
+            if reply.status.code != 0:
+                logger.exception(
+                    f"Failed to CheckAlive: {reply.status.message}")
+                return False
+        except aiogrpc.AioRpcError:  # Deadline Exceeded
+            logger.exception("Got AioRpcError when checking GCS is alive")
+            return False
+        return True
+
+    async def check_once(self) -> bool:
+        # Make sure startup is complete
+        if not self.startup_task.done():
+            return False
+        # Make the grpc call inside the thread loop so it's not blocked by
+        # potentially busy main loop.
+        return await asyncio.wrap_future(
+            asyncio.run_coroutine_threadsafe(self._check_once(),
+                                             self.thread_local_loop))
+
+    def run(self) -> None:
+        self.thread_local_loop.run_forever()
+
+
 class DashboardHead:
     def __init__(self, http_host, http_port, http_port_retries, redis_address,
                  redis_password, log_dir):
         # HeartbeatInfoGcsService
         self._gcs_heartbeat_info_stub = None
-        self._gcs_check_alive_seq = 0
+        self.health_check_thread: GCSHealthCheckThread = None
         self._gcs_rpc_error_counter = 0
         # Public attributes are accessible for all head modules.
         # Walkaround for issue: https://github.com/ray-project/ray/issues/7084
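Two details of GCSHealthCheckThread are worth calling out. The on_startup() task is created on thread_local_loop before the thread ever starts, and nothing executes until run() enters run_forever(); that is why check_once() reports False while startup_task is still pending instead of blocking the caller. And the thread builds its own gRPC channel and stub inside its own loop rather than sharing the main loop's channel, which matters because grpc's aio objects are tied to the event loop they were created on; daemon=True ensures the extra thread never keeps the process alive on shutdown.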
@@ -56,20 +118,10 @@ class DashboardHead:
 
     @async_loop_forever(dashboard_consts.GCS_CHECK_ALIVE_INTERVAL_SECONDS)
     async def _gcs_check_alive(self):
-        try:
-            self._gcs_check_alive_seq += 1
-            request = gcs_service_pb2.CheckAliveRequest(
-                seq=self._gcs_check_alive_seq)
-            reply = await self._gcs_heartbeat_info_stub.CheckAlive(
-                request, timeout=2)
-            if reply.status.code != 0:
-                raise Exception(
-                    f"Failed to CheckAlive: {reply.status.message}")
+        is_alive = await self.health_check_thread.check_once()
+        if is_alive:
             self._gcs_rpc_error_counter = 0
-        except aiogrpc.AioRpcError:
-            logger.exception(
-                "Got AioRpcError when checking GCS is alive, seq=%s.",
-                self._gcs_check_alive_seq)
+        else:
             self._gcs_rpc_error_counter += 1
             if self._gcs_rpc_error_counter > \
                     dashboard_consts.GCS_CHECK_ALIVE_MAX_COUNT_OF_RPC_ERROR:
@@ -84,9 +136,6 @@ class DashboardHead:
                 # shutdown(). Please refer to:
                 # https://github.com/ray-project/ray/issues/16328
                 os._exit(-1)
-        except Exception:
-            logger.exception("Error checking GCS is alive, seq=%s.",
-                             self._gcs_check_alive_seq)
 
     def _load_modules(self):
         """Load dashboard head modules."""
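The check loop's cadence comes from async_loop_forever in the dashboard utilities, which reruns the decorated coroutine at a fixed interval; that the except Exception block above could be dropped suggests error handling now lives in the decorator. A plausible minimal sketch, under the assumption that it just logs failures and sleeps between iterations (the real implementation may differ, for example around cancellation):

import asyncio
import functools
import logging

logger = logging.getLogger(__name__)


def async_loop_forever(interval_seconds):
    """Rerun the wrapped coroutine forever with a fixed pause between runs."""
    def decorator(coro):
        @functools.wraps(coro)
        async def looped(*args, **kwargs):
            while True:
                try:
                    await coro(*args, **kwargs)
                except Exception:
                    logger.exception("%s failed, will retry.", coro.__name__)
                await asyncio.sleep(interval_seconds)
        return looped
    return decorator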
@@ -120,28 +169,12 @@ class DashboardHead:
             loop=asyncio.get_event_loop())
 
         # Waiting for GCS is ready.
-        while True:
-            try:
-                gcs_address = await self.aioredis_client.get(
-                    dashboard_consts.REDIS_KEY_GCS_SERVER_ADDRESS)
-                if not gcs_address:
-                    raise Exception("GCS address not found.")
-                logger.info("Connect to GCS at %s", gcs_address)
-                options = (("grpc.enable_http_proxy", 0), )
-                channel = aiogrpc.insecure_channel(
-                    gcs_address, options=options)
-            except Exception as ex:
-                logger.error("Connect to GCS failed: %s, retry...", ex)
-                await asyncio.sleep(
-                    dashboard_consts.GCS_RETRY_CONNECT_INTERVAL_SECONDS)
-            else:
-                self.aiogrpc_gcs_channel = channel
-                break
+        self.aiogrpc_gcs_channel = await make_gcs_grpc_channel(
+            self.aioredis_client)
 
         # Create a HeartbeatInfoGcsServiceStub.
         self._gcs_heartbeat_info_stub = \
             gcs_service_pb2_grpc.HeartbeatInfoGcsServiceStub(
                 self.aiogrpc_gcs_channel)
+        self.health_check_thread = GCSHealthCheckThread(
+            redis_client=self.aioredis_client)
+        self.health_check_thread.start()
 
         # Start a grpc asyncio server.
         await self.server.start()
@@ -83,7 +83,7 @@ def test_log(disable_aiohttp_cache, ray_start_with_dashboard):
     # Test range request.
     response = requests.get(
         webui_url + "/logs/dashboard.log",
-        headers={"Range": "bytes=43-51"})
+        headers={"Range": "bytes=44-52"})
     response.raise_for_status()
     assert response.text == "Dashboard"
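The range moves from 43-51 to 44-52 presumably because the content of dashboard.log preceding the "Dashboard" substring shifted by one byte after this change. HTTP byte ranges are inclusive on both ends, so bytes=44-52 still selects 52 - 44 + 1 = 9 bytes, exactly the length of "Dashboard".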
@@ -122,7 +122,7 @@ def test_log_proxy(ray_start_with_dashboard):
     # Test range request.
     response = requests.get(
         f"{webui_url}/log_proxy?url={webui_url}/logs/dashboard.log",
-        headers={"Range": "bytes=43-51"})
+        headers={"Range": "bytes=44-52"})
     response.raise_for_status()
     assert response.text == "Dashboard"
     # Test 404.
@@ -64,7 +64,9 @@ def fast_gcs_failure_detection():
     os.environ["GCS_CHECK_ALIVE_MAX_COUNT_OF_RPC_ERROR"] = "2"
     os.environ["GCS_CHECK_ALIVE_INTERVAL_SECONDS"] = "1"
     os.environ["GCS_RETRY_CONNECT_INTERVAL_SECONDS"] = "1"
+    os.environ["GCS_CHECK_ALIVE_RPC_TIMEOUT"] = "1"
     yield
     os.environ.pop("GCS_CHECK_ALIVE_MAX_COUNT_OF_RPC_ERROR", None)
     os.environ.pop("GCS_CHECK_ALIVE_INTERVAL_SECONDS", None)
     os.environ.pop("GCS_RETRY_CONNECT_INTERVAL_SECONDS", None)
+    os.environ.pop("GCS_CHECK_ALIVE_RPC_TIMEOUT", None)
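Because the fixture works by mutating os.environ around the test body, it has to run before the cluster is started so the spawned dashboard process inherits the tightened values; in pytest, listing it ahead of the cluster fixture in the signature achieves that. A hypothetical test using it (the test name and body are illustrative, not from this commit):

def test_dashboard_exits_when_gcs_dies(fast_gcs_failure_detection,
                                       ray_start_with_dashboard):
    # With CheckAlive timing out after 1s and a threshold of 2 errors,
    # the dashboard should notice a killed GCS within a few seconds and
    # exit via os._exit(-1).
    ...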
@@ -1786,7 +1786,7 @@ def healthcheck(address, redis_password, component):
         options = (("grpc.enable_http_proxy", 0), )
         channel = grpc.insecure_channel(gcs_address, options=options)
         stub = gcs_service_pb2_grpc.HeartbeatInfoGcsServiceStub(channel)
-        request = gcs_service_pb2.CheckAliveRequest(seq=0)
+        request = gcs_service_pb2.CheckAliveRequest()
         reply = stub.CheckAlive(
             request, timeout=ray.ray_constants.HEALTHCHECK_EXPIRATION_S)
         if reply.status.code == 0:
@@ -91,7 +91,6 @@ void GcsHeartbeatManager::HandleReportHeartbeat(
 void GcsHeartbeatManager::HandleCheckAlive(const rpc::CheckAliveRequest &request,
                                            rpc::CheckAliveReply *reply,
                                            rpc::SendReplyCallback send_reply_callback) {
-  reply->set_seq(request.seq());
   GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK());
 }
@@ -190,12 +190,10 @@ message ReportHeartbeatReply {
 }
 
 message CheckAliveRequest {
-  int32 seq = 1;
 }
 
 message CheckAliveReply {
   GcsStatus status = 1;
-  int32 seq = 2;
 }
 
 message GetInternalConfigRequest {
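Removing seq from both messages is wire-compatible in proto3, since readers ignore fields they do not recognize; reserving the freed field numbers (reserved 1; in CheckAliveRequest, reserved 2; in CheckAliveReply) would additionally prevent a later field from reusing them with a different meaning.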