ray/dashboard/modules/reporter/reporter_head.py

119 lines
4.4 KiB
Python
Raw Normal View History

import json
import logging
import yaml
import os
import aiohttp.web
from aioredis.pubsub import Receiver
from grpc.experimental import aio as aiogrpc
import ray
import ray.gcs_utils
import ray.new_dashboard.modules.reporter.reporter_consts as reporter_consts
import ray.new_dashboard.utils as dashboard_utils
import ray._private.services
import ray.utils
from ray.core.generated import reporter_pb2
from ray.core.generated import reporter_pb2_grpc
from ray.new_dashboard.datacenter import DataSource
logger = logging.getLogger(__name__)
routes = dashboard_utils.ClassMethodRouteTable
class ReportHead(dashboard_utils.DashboardHeadModule):
def __init__(self, dashboard_head):
super().__init__(dashboard_head)
self._stubs = {}
DataSource.agents.signal.append(self._update_stubs)
async def _update_stubs(self, change):
if change.old:
node_id, port = change.old
ip = DataSource.node_id_to_ip[node_id]
self._stubs.pop(ip)
if change.new:
node_id, ports = change.new
ip = DataSource.node_id_to_ip[node_id]
channel = aiogrpc.insecure_channel(f"{ip}:{ports[1]}")
stub = reporter_pb2_grpc.ReporterServiceStub(channel)
self._stubs[ip] = stub
@routes.get("/api/launch_profiling")
async def launch_profiling(self, req) -> aiohttp.web.Response:
ip = req.query["ip"]
pid = int(req.query["pid"])
duration = int(req.query["duration"])
reporter_stub = self._stubs[ip]
reply = await reporter_stub.GetProfilingStats(
reporter_pb2.GetProfilingStatsRequest(pid=pid, duration=duration))
profiling_info = (json.loads(reply.profiling_stats)
if reply.profiling_stats else reply.std_out)
return await dashboard_utils.rest_response(
success=True,
message="Profiling success.",
profiling_info=profiling_info)
@routes.get("/api/ray_config")
async def get_ray_config(self, req) -> aiohttp.web.Response:
if self._ray_config is None:
try:
config_path = os.path.expanduser("~/ray_bootstrap_config.yaml")
with open(config_path) as f:
cfg = yaml.safe_load(f)
except yaml.YAMLError:
return await dashboard_utils.rest_response(
success=False,
message=f"No config found at {config_path}.",
)
except FileNotFoundError:
return await dashboard_utils.rest_response(
success=False,
message=f"Invalid config, could not load YAML.")
payload = {
"min_workers": cfg["min_workers"],
"max_workers": cfg["max_workers"],
"initial_workers": cfg["initial_workers"],
"autoscaling_mode": cfg["autoscaling_mode"],
"idle_timeout_minutes": cfg["idle_timeout_minutes"],
}
try:
payload["head_type"] = cfg["head_node"]["InstanceType"]
except KeyError:
payload["head_type"] = "unknown"
try:
payload["worker_type"] = cfg["worker_nodes"]["InstanceType"]
except KeyError:
payload["worker_type"] = "unknown"
self._ray_config = payload
return await dashboard_utils.rest_response(
success=True,
message="Fetched ray config.",
**self._ray_config,
)
async def run(self, server):
aioredis_client = self._dashboard_head.aioredis_client
receiver = Receiver()
reporter_key = "{}*".format(reporter_consts.REPORTER_PREFIX)
await aioredis_client.psubscribe(receiver.pattern(reporter_key))
logger.info(f"Subscribed to {reporter_key}")
async for sender, msg in receiver.iter():
try:
# The key is b'RAY_REPORTER:{node id hex}',
# e.g. b'RAY_REPORTER:2b4fbd406898cc86fb88fb0acfd5456b0afd87cf'
key, data = msg
data = json.loads(ray.utils.decode(data))
key = key.decode("utf-8")
node_id = key.split(":")[-1]
DataSource.node_physical_stats[node_id] = data
except Exception:
logger.exception(
"Error receiving node physical stats from reporter agent.")