2021-07-27 14:05:44 +08:00
|
|
|
import os
|
2020-07-27 11:34:47 +08:00
|
|
|
import asyncio
|
|
|
|
import logging
|
2021-09-03 14:23:56 -07:00
|
|
|
import threading
|
2021-10-18 13:18:06 -07:00
|
|
|
from concurrent.futures import Future
|
|
|
|
from queue import Queue
|
2020-07-27 11:34:47 +08:00
|
|
|
|
2021-11-12 22:59:57 -08:00
|
|
|
try:
|
|
|
|
from grpc import aio as aiogrpc
|
|
|
|
except ImportError:
|
|
|
|
from grpc.experimental import aio as aiogrpc
|
2020-07-27 11:34:47 +08:00
|
|
|
|
2021-11-10 20:24:53 -08:00
|
|
|
import ray.experimental.internal_kv as internal_kv
|
2021-10-21 06:39:11 +01:00
|
|
|
import ray._private.utils
|
2022-04-18 09:58:45 -07:00
|
|
|
from ray._private.gcs_utils import GcsClient, check_health
|
2020-09-24 22:46:35 -07:00
|
|
|
import ray._private.services
|
2021-09-15 11:17:15 -05:00
|
|
|
import ray.dashboard.consts as dashboard_consts
|
|
|
|
import ray.dashboard.utils as dashboard_utils
|
2021-02-24 08:27:48 +08:00
|
|
|
from ray import ray_constants
|
2021-12-09 23:10:10 -08:00
|
|
|
from ray._private.gcs_pubsub import (
|
|
|
|
GcsAioErrorSubscriber,
|
|
|
|
GcsAioLogSubscriber,
|
2022-01-29 18:41:57 -08:00
|
|
|
)
|
2021-09-15 11:17:15 -05:00
|
|
|
from ray.dashboard.datacenter import DataOrganizer
|
|
|
|
from ray.dashboard.utils import async_loop_forever
|
2020-07-27 11:34:47 +08:00
|
|
|
|
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Runs once at import time, before any aio server or channel in this module
# is created.
aiogrpc.init_grpc_aio()

# Options for the asynchronous GCS channel: the project-wide gRPC options
# followed by the send/receive message size limits.
GRPC_CHANNEL_OPTIONS = tuple(ray_constants.GLOBAL_GRPC_OPTIONS) + (
    ("grpc.max_send_message_length", ray_constants.GRPC_CPP_MAX_MESSAGE_SIZE),
    ("grpc.max_receive_message_length", ray_constants.GRPC_CPP_MAX_MESSAGE_SIZE),
)
|
2020-07-27 11:34:47 +08:00
|
|
|
|
|
|
|
|
2021-09-03 14:23:56 -07:00
|
|
|
class GCSHealthCheckThread(threading.Thread):
    """Daemon thread that performs GCS health checks on request.

    Requests arrive as ``concurrent.futures.Future`` objects on
    ``work_queue``; for each one the thread calls ``check_health`` with the
    configured address and stores the returned value in the future.
    """

    def __init__(self, gcs_address: str):
        """Record the GCS address and create the request queue.

        Args:
            gcs_address: Address handed to ``check_health`` for every check.
        """
        self.gcs_address = gcs_address
        self.work_queue = Queue()

        super().__init__(daemon=True)

    def run(self) -> None:
        """Serve queued health-check requests forever, one at a time."""
        while True:
            # Blocks until check_once() enqueues a request.
            future = self.work_queue.get()
            check_result = check_health(self.gcs_address)
            future.set_result(check_result)

    async def check_once(self) -> bool:
        """Ask the thread to perform a healthcheck.

        Returns:
            Whatever ``check_health`` returned for this request.

        Raises:
            AssertionError: If called from the health-check thread itself,
                which could never complete because that thread is the one
                that has to service the request.
        """
        # ``current_thread`` has to be called: comparing the function object
        # itself against ``self`` is always unequal, which made this guard a
        # no-op.
        assert (
            threading.current_thread() is not self
        ), "caller shouldn't be from the same thread as GCSHealthCheckThread."

        future = Future()
        self.work_queue.put(future)
        # Bridge the concurrent future into the caller's event loop.
        return await asyncio.wrap_future(future)
|
2021-09-03 14:23:56 -07:00
|
|
|
|
|
|
|
|
2020-07-27 11:34:47 +08:00
|
|
|
class DashboardHead:
    """Dashboard head: owns the gRPC server, the optional HTTP server, the GCS
    connections and the loaded ``DashboardHeadModule`` instances.
    """

    def __init__(
        self,
        http_host,
        http_port,
        http_port_retries,
        gcs_address,
        log_dir,
        temp_dir,
        session_dir,
        minimal,
    ):
        """Store configuration and bind the gRPC server to a free port.

        Args:
            http_host: Host for the HTTP server; ``"localhost"`` is rewritten
                to ``"127.0.0.1"``.
            http_port: Port forwarded to the HTTP server.
            http_port_retries: Retry count forwarded to the HTTP server.
            gcs_address: GCS address; must not be ``None``. Used in ``run()``
                for the GCS client, the aio channel, the subscribers and the
                health-check thread.
            log_dir: Stored as a public attribute.
            temp_dir: Stored as a public attribute.
            session_dir: Stored as a public attribute.
            minimal: When true, ``run()`` does not start the HTTP server.
        """
        self.minimal = minimal
        self.health_check_thread: GCSHealthCheckThread = None
        self._gcs_rpc_error_counter = 0
        # Public attributes are accessible for all head modules.
        # Workaround for issue: https://github.com/ray-project/ray/issues/7084
        self.http_host = "127.0.0.1" if http_host == "localhost" else http_host
        self.http_port = http_port
        self.http_port_retries = http_port_retries

        assert gcs_address is not None
        # The address is kept as an opaque string. It is deliberately not
        # split on ":" here: the pieces were never used, and the unpacking
        # rejected any address that does not contain exactly one colon.
        self.gcs_address = gcs_address
        self.log_dir = log_dir
        self.temp_dir = temp_dir
        self.session_dir = session_dir
        # Populated in run().
        self.aiogrpc_gcs_channel = None
        self.gcs_error_subscriber = None
        self.gcs_log_subscriber = None
        self.ip = ray.util.get_node_ip_address()

        self.server = aiogrpc.server(options=(("grpc.so_reuseport", 0),))
        # Bind to loopback only when the node IP itself is loopback;
        # otherwise listen on all interfaces. Port 0 requests any free port.
        grpc_ip = "127.0.0.1" if self.ip == "127.0.0.1" else "0.0.0.0"
        self.grpc_port = ray._private.tls_utils.add_port_to_grpc_server(
            self.server, f"{grpc_ip}:0"
        )
        logger.info("Dashboard head grpc address: %s:%s", grpc_ip, self.grpc_port)
        # If the dashboard is started as non-minimal version, http server should
        # be configured to expose APIs.
        self.http_server = None

    async def _configure_http_server(self, modules):
        """Create the HTTP server, run it with ``modules`` and return it."""
        # Imported lazily so the minimal dashboard never loads this module.
        from ray.dashboard.http_server_head import HttpServerDashboardHead

        http_server = HttpServerDashboardHead(
            self.ip, self.http_host, self.http_port, self.http_port_retries
        )
        await http_server.run(modules)
        return http_server

    @property
    def http_session(self):
        """HTTP session of the HTTP server.

        Raises:
            AssertionError: If no HTTP server has been configured, which is
                always the case for a minimal dashboard.
        """
        assert self.http_server, "Accessing unsupported API in a minimal ray."
        return self.http_server.http_session

    @async_loop_forever(dashboard_consts.GCS_CHECK_ALIVE_INTERVAL_SECONDS)
    async def _gcs_check_alive(self):
        """Run one GCS health check and exit the process after too many
        consecutive failures.

        A successful check resets the failure counter. A failed or timed-out
        check increments it; once it exceeds
        ``GCS_CHECK_ALIVE_MAX_COUNT_OF_RPC_ERROR`` the process is terminated
        with ``os._exit(-1)``.
        """
        check_future = self.health_check_thread.check_once()

        # NOTE(simon): making sure the check procedure doesn't timeout itself.
        # Otherwise, the dashboard will always think that gcs is alive.
        try:
            is_alive = await asyncio.wait_for(
                check_future, dashboard_consts.GCS_CHECK_ALIVE_RPC_TIMEOUT + 1
            )
        except asyncio.TimeoutError:
            logger.error("Failed to check gcs health, client timed out.")
            is_alive = False

        if is_alive:
            self._gcs_rpc_error_counter = 0
        else:
            self._gcs_rpc_error_counter += 1
            if (
                self._gcs_rpc_error_counter
                > dashboard_consts.GCS_CHECK_ALIVE_MAX_COUNT_OF_RPC_ERROR
            ):
                logger.error(
                    "Dashboard exiting because it received too many GCS RPC "
                    "errors count: %s, threshold is %s.",
                    self._gcs_rpc_error_counter,
                    dashboard_consts.GCS_CHECK_ALIVE_MAX_COUNT_OF_RPC_ERROR,
                )
                # TODO(fyrestone): Do not use ray.state in
                # PrometheusServiceDiscoveryWriter.
                # Currently, we use os._exit() here to avoid hanging at the ray
                # shutdown(). Please refer to:
                # https://github.com/ray-project/ray/issues/16328
                os._exit(-1)

    def _load_modules(self):
        """Load dashboard head modules.

        Returns:
            A list with one instance, constructed with this head as its only
            argument, for every class returned by
            ``dashboard_utils.get_all_modules(DashboardHeadModule)``.
        """
        modules = []
        head_cls_list = dashboard_utils.get_all_modules(
            dashboard_utils.DashboardHeadModule
        )
        for cls in head_cls_list:
            logger.info(
                "Loading %s: %s", dashboard_utils.DashboardHeadModule.__name__, cls
            )
            c = cls(self)
            modules.append(c)
        logger.info("Loaded %d modules.", len(modules))
        return modules

    async def run(self):
        """Connect to GCS, start the servers, publish addresses and run all
        background tasks and modules.

        The steps, in order: create the GCS client and initialise internal
        KV, open the aio GCS channel, subscribe to GCS errors and logs, start
        the health-check thread, start the gRPC server, load the modules,
        start the HTTP server unless ``minimal`` is set, write the HTTP and
        gRPC addresses to internal KV, freeze the signal manager, then await
        the background tasks together with every module's ``run``.
        """
        gcs_address = self.gcs_address

        # Dashboard will handle connection failure automatically
        self.gcs_client = GcsClient(address=gcs_address, nums_reconnect_retry=0)
        internal_kv._initialize_internal_kv(self.gcs_client)
        self.aiogrpc_gcs_channel = ray._private.utils.init_grpc_channel(
            gcs_address, GRPC_CHANNEL_OPTIONS, asynchronous=True
        )

        self.gcs_error_subscriber = GcsAioErrorSubscriber(address=gcs_address)
        self.gcs_log_subscriber = GcsAioLogSubscriber(address=gcs_address)
        await self.gcs_error_subscriber.subscribe()
        await self.gcs_log_subscriber.subscribe()

        self.health_check_thread = GCSHealthCheckThread(gcs_address)
        self.health_check_thread.start()

        # Start a grpc asyncio server.
        await self.server.start()

        async def _async_notify():
            """Notify signals from queue."""
            while True:
                co = await dashboard_utils.NotifyQueue.get()
                try:
                    await co
                except Exception:
                    # Keep draining the queue even if one coroutine fails.
                    logger.exception("Error notifying coroutine %s", co)

        modules = self._load_modules()

        # Fall back to the configured host/port when no HTTP server is started.
        http_host, http_port = self.http_host, self.http_port
        if not self.minimal:
            self.http_server = await self._configure_http_server(modules)
            http_host, http_port = self.http_server.get_address()
        internal_kv._internal_kv_put(
            ray_constants.DASHBOARD_ADDRESS,
            f"{http_host}:{http_port}",
            namespace=ray_constants.KV_NAMESPACE_DASHBOARD,
        )

        # TODO: Use async version if performance is an issue
        # Write the dashboard head port to gcs kv.
        internal_kv._internal_kv_put(
            dashboard_consts.DASHBOARD_RPC_ADDRESS,
            f"{self.ip}:{self.grpc_port}",
            namespace=ray_constants.KV_NAMESPACE_DASHBOARD,
        )

        # Freeze signal after all modules loaded.
        dashboard_utils.SignalManager.freeze()
        concurrent_tasks = [
            self._gcs_check_alive(),
            _async_notify(),
            DataOrganizer.purge(),
            DataOrganizer.organize(),
        ]
        await asyncio.gather(*concurrent_tasks, *(m.run(self.server) for m in modules))
        await self.server.wait_for_termination()

        if self.http_server:
            await self.http_server.cleanup()
|