2020-07-27 11:34:47 +08:00
|
|
|
import json
|
|
|
|
import logging
|
|
|
|
|
|
|
|
import aiohttp.web
|
|
|
|
from aioredis.pubsub import Receiver
|
|
|
|
from grpc.experimental import aio as aiogrpc
|
|
|
|
|
|
|
|
import ray
|
|
|
|
import ray.gcs_utils
|
|
|
|
import ray.new_dashboard.modules.reporter.reporter_consts as reporter_consts
|
|
|
|
import ray.new_dashboard.utils as dashboard_utils
|
|
|
|
import ray.services
|
|
|
|
import ray.utils
|
|
|
|
from ray.core.generated import reporter_pb2
|
|
|
|
from ray.core.generated import reporter_pb2_grpc
|
|
|
|
from ray.new_dashboard.datacenter import DataSource
|
|
|
|
|
|
|
|
# Route table whose ``get`` decorator registers ReportHead's HTTP handlers.
routes = dashboard_utils.ClassMethodRouteTable

# Logger named after this module.
logger = logging.getLogger(name=__name__)
|
|
|
|
|
|
|
|
|
|
|
|
class ReportHead(dashboard_utils.DashboardHeadModule):
    """Dashboard head module that talks to the per-node reporter agents.

    It keeps one gRPC ``ReporterServiceStub`` per agent IP (kept in sync
    through the ``DataSource.agents`` change signal), serves the
    ``/api/launch_profiling`` endpoint, and copies node physical stats
    published to Redis into ``DataSource.node_physical_stats``.
    """

    def __init__(self, dashboard_head):
        super().__init__(dashboard_head)
        # Maps agent IP -> ReporterServiceStub for that node's agent.
        self._stubs = {}
        DataSource.agents.signal.append(self._update_stubs)

    async def _update_stubs(self, change):
        """Keep ``self._stubs`` in sync with ``DataSource.agents``.

        Args:
            change: Notification delivered by ``DataSource.agents.signal``.
                ``change.old`` and ``change.new`` are each either falsy or
                an ``(ip, ports)`` pair; ``ports[1]`` is used as the agent's
                gRPC port when building the new stub.
        """
        if change.old:
            ip, _ = change.old
            # Tolerate an old entry for which no stub was ever built
            # (presumably possible if the agent was registered before this
            # handler was attached — verify); a bare ``pop(ip)`` would raise
            # KeyError inside the signal handler.
            self._stubs.pop(ip, None)
        if change.new:
            ip, ports = change.new
            channel = aiogrpc.insecure_channel(f"{ip}:{ports[1]}")
            stub = reporter_pb2_grpc.ReporterServiceStub(channel)
            self._stubs[ip] = stub

    @routes.get("/api/launch_profiling")
    async def launch_profiling(self, req) -> aiohttp.web.Response:
        """Profile a process through the reporter agent on the given node.

        Query parameters:
            ip: IP of the node whose reporter agent should run the profiler.
            pid: Process id to profile (parsed as an int).
            duration: Profiling duration forwarded to the agent (parsed as
                an int).

        Returns:
            A ``rest_response`` with ``profiling_info`` set to the
            JSON-decoded ``profiling_stats`` from the agent, or to the
            agent's ``std_out`` when no stats were returned. If no reporter
            stub is known for ``ip``, a ``success=False`` response is
            returned instead.
        """
        ip = req.query["ip"]
        pid = int(req.query["pid"])
        duration = int(req.query["duration"])
        reporter_stub = self._stubs.get(ip)
        if reporter_stub is None:
            # Report an unknown node explicitly instead of letting a
            # KeyError escape (aiohttp would turn it into an HTTP 500).
            return await dashboard_utils.rest_response(
                success=False,
                message=f"No reporter agent found for node {ip}.")
        reply = await reporter_stub.GetProfilingStats(
            reporter_pb2.GetProfilingStatsRequest(pid=pid, duration=duration))
        profiling_info = (json.loads(reply.profiling_stats)
                          if reply.profiling_stats else reply.std_out)
        return await dashboard_utils.rest_response(
            success=True,
            message="Profiling success.",
            profiling_info=profiling_info)

    async def run(self, server):
        """Copy reporter stats published to Redis into ``DataSource``.

        Pattern-subscribes to ``REPORTER_PREFIX + "*"``. Each received
        payload is decoded as JSON and stored in
        ``DataSource.node_physical_stats`` under its ``"ip"`` field. Any
        message that fails to decode or store is logged and skipped. The
        loop runs for as long as the receiver keeps yielding messages.

        Args:
            server: Not used by this module; accepted to match the
                ``DashboardHeadModule.run`` call signature.
        """
        aioredis_client = self._dashboard_head.aioredis_client
        receiver = Receiver()

        reporter_key = "{}*".format(reporter_consts.REPORTER_PREFIX)
        await aioredis_client.psubscribe(receiver.pattern(reporter_key))
        logger.info(f"Subscribed to {reporter_key}")

        async for _sender, msg in receiver.iter():
            try:
                # Each pattern message is unpacked as (channel, payload).
                _, data = msg
                data = json.loads(ray.utils.decode(data))
                DataSource.node_physical_stats[data["ip"]] = data
            except Exception:
                logger.exception(
                    "Error receiving node physical stats from reporter agent.")
|