ray/dashboard/modules/reporter/reporter_head.py

95 lines
3.6 KiB
Python
Raw Normal View History

import json
import logging
import uuid
import aiohttp.web
from aioredis.pubsub import Receiver
from grpc.experimental import aio as aiogrpc
import ray
import ray.gcs_utils
import ray.new_dashboard.modules.reporter.reporter_consts as reporter_consts
import ray.new_dashboard.utils as dashboard_utils
import ray.services
import ray.utils
from ray.core.generated import reporter_pb2
from ray.core.generated import reporter_pb2_grpc
from ray.new_dashboard.datacenter import DataSource
logger = logging.getLogger(__name__)
routes = dashboard_utils.ClassMethodRouteTable
class ReportHead(dashboard_utils.DashboardHeadModule):
def __init__(self, dashboard_head):
super().__init__(dashboard_head)
self._stubs = {}
self._profiling_stats = {}
DataSource.agents.signal.append(self._update_stubs)
async def _update_stubs(self, change):
if change.new:
ip, ports = next(iter(change.new.items()))
channel = aiogrpc.insecure_channel("{}:{}".format(ip, ports[1]))
stub = reporter_pb2_grpc.ReporterServiceStub(channel)
self._stubs[ip] = stub
if change.old:
ip, port = next(iter(change.old.items()))
self._stubs.pop(ip)
@routes.get("/api/launch_profiling")
async def launch_profiling(self, req) -> aiohttp.web.Response:
node_id = req.query.get("node_id")
pid = int(req.query.get("pid"))
duration = int(req.query.get("duration"))
profiling_id = str(uuid.uuid4())
reporter_stub = self._stubs[node_id]
reply = await reporter_stub.GetProfilingStats(
reporter_pb2.GetProfilingStatsRequest(pid=pid, duration=duration))
self._profiling_stats[profiling_id] = reply
return await dashboard_utils.rest_response(
success=True,
message="Profiling launched.",
profiling_id=profiling_id)
@routes.get("/api/check_profiling_status")
async def check_profiling_status(self, req) -> aiohttp.web.Response:
profiling_id = req.query.get("profiling_id")
is_present = profiling_id in self._profiling_stats
if not is_present:
status = {"status": "pending"}
else:
reply = self._profiling_stats[profiling_id]
if reply.stderr:
status = {"status": "error", "error": reply.stderr}
else:
status = {"status": "finished"}
return await dashboard_utils.rest_response(
success=True, message="Profiling status fetched.", status=status)
@routes.get("/api/get_profiling_info")
async def get_profiling_info(self, req) -> aiohttp.web.Response:
profiling_id = req.query.get("profiling_id")
profiling_stats = self._profiling_stats.get(profiling_id)
assert profiling_stats, "profiling not finished"
return await dashboard_utils.rest_response(
success=True,
message="Profiling info fetched.",
profiling_info=json.loads(profiling_stats.profiling_stats))
async def run(self, server):
aioredis_client = self._dashboard_head.aioredis_client
receiver = Receiver()
reporter_key = "{}*".format(reporter_consts.REPORTER_PREFIX)
await aioredis_client.psubscribe(receiver.pattern(reporter_key))
logger.info("Subscribed to {}".format(reporter_key))
async for sender, msg in receiver.iter():
try:
_, data = msg
data = json.loads(ray.utils.decode(data))
DataSource.node_physical_stats[data["ip"]] = data
except Exception as ex:
logger.exception(ex)