2022-01-21 17:55:11 -08:00
|
|
|
import asyncio
|
2020-07-27 11:34:47 +08:00
|
|
|
import logging
|
2021-09-15 11:17:15 -05:00
|
|
|
import ray.dashboard.consts as dashboard_consts
|
|
|
|
import ray.dashboard.memory_utils as memory_utils
|
2022-01-29 18:41:57 -08:00
|
|
|
|
2021-06-02 21:58:30 +08:00
|
|
|
# TODO(fyrestone): Do not import from the dashboard module.
|
2022-01-29 18:41:57 -08:00
|
|
|
from ray.dashboard.modules.actor.actor_utils import actor_classname_from_task_spec
|
2021-09-15 11:17:15 -05:00
|
|
|
from ray.dashboard.utils import Dict, Signal, async_loop_forever
|
2020-07-27 11:34:47 +08:00
|
|
|
|
|
|
|
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
|
|
|
class GlobalSignals:
    """Namespace of class-level ``Signal`` objects used by the dashboard.

    Every signal is constructed from the matching
    ``dashboard_consts.SIGNAL_*`` constant.
    """

    # Sent by DataOrganizer.get_node_info() with the merged node info dict.
    node_info_fetched = Signal(dashboard_consts.SIGNAL_NODE_INFO_FETCHED)

    # Sent by DataOrganizer.get_node_summary() with the node summary dict.
    node_summary_fetched = Signal(dashboard_consts.SIGNAL_NODE_SUMMARY_FETCHED)

    # NOTE(review): no sender is visible in this file; presumably emitted by
    # the job-handling code -- verify against the callers.
    job_info_fetched = Signal(dashboard_consts.SIGNAL_JOB_INFO_FETCHED)

    # Sent by DataOrganizer.get_node_workers() as (node_id, worker dict).
    worker_info_fetched = Signal(dashboard_consts.SIGNAL_WORKER_INFO_FETCHED)
|
2020-07-27 11:34:47 +08:00
|
|
|
|
|
|
|
|
|
|
|
class DataSource:
    """Class-level ``Dict`` tables holding the dashboard's cached cluster state.

    ``job_workers``, ``node_workers`` and ``core_worker_stats`` are rebuilt by
    ``DataOrganizer.organize()``; the shapes noted beside the other tables are
    the ones their producers are expected to write.
    """

    # node id hex (str) -> node stats
    # (dict form of GetNodeStatsReply in node_manager.proto)
    node_stats = Dict()

    # node id hex (str) -> node physical stats (dict from reporter_agent.py)
    node_physical_stats = Dict()

    # actor id hex (str) -> actor table data
    # (dict form of ActorTableData in gcs.proto)
    actors = Dict()

    # job id hex (str) -> job table data
    # (dict form of JobTableData in gcs.proto)
    jobs = Dict()

    # node id hex (str) -> dashboard agent [http port (int), grpc port (int)]
    agents = Dict()

    # node id hex (str) -> GCS node info
    # (dict form of GcsNodeInfo in gcs.proto)
    nodes = Dict()

    # node id hex (str) -> ip address (str)
    node_id_to_ip = Dict()

    # node id hex (str) -> hostname (str)
    node_id_to_hostname = Dict()

    # node id hex (str) -> list of workers
    node_workers = Dict()

    # node id hex (str) -> {actor id hex (str): actor table data}
    node_actors = Dict()

    # job id hex (str) -> list of workers
    job_workers = Dict()

    # job id hex (str) -> {actor id hex (str): actor table data}
    job_actors = Dict()

    # worker id (str) -> core worker stats
    core_worker_stats = Dict()

    # job id hex (str) -> {event id (str): event dict}
    events = Dict()

    # node ip (str) -> {pid: list of the latest log entries}
    ip_and_pid_to_logs = Dict()

    # node ip (str) -> {pid: list of the latest error entries}
    ip_and_pid_to_errors = Dict()
|
2020-07-27 11:34:47 +08:00
|
|
|
|
|
|
|
|
|
|
|
class DataOrganizer:
|
|
|
|
@staticmethod
@async_loop_forever(dashboard_consts.PURGE_DATA_INTERVAL_SECONDS)
async def purge():
    """Drop cached per-node stats for nodes that are no longer ALIVE.

    Wrapped by ``async_loop_forever`` with
    ``dashboard_consts.PURGE_DATA_INTERVAL_SECONDS``.

    Only ``DataSource.node_stats`` and ``DataSource.node_physical_stats`` are
    purged.  The following tables are maintained by DashboardHead and are
    deliberately left untouched here:
      * agents
      * nodes
      * node_id_to_ip
      * node_id_to_hostname
    """
    logger.info("Purge data.")

    # Ids of every node whose GCS state is still "ALIVE".
    live_node_ids = set()
    for node_id, node_info in DataSource.nodes.items():
        if node_info["state"] == "ALIVE":
            live_node_ids.add(node_id)

    # ``keys() - live_node_ids`` builds a fresh set, so popping inside the
    # loop does not disturb the iteration.
    for table in (DataSource.node_stats, DataSource.node_physical_stats):
        for stale_node_id in table.keys() - live_node_ids:
            table.pop(stale_node_id)
|
|
|
|
|
|
|
|
@classmethod
@async_loop_forever(dashboard_consts.ORGANIZE_DATA_INTERVAL_SECONDS)
async def organize(cls):
    """Rebuild the worker-oriented tables of ``DataSource``.

    Wrapped by ``async_loop_forever`` with
    ``dashboard_consts.ORGANIZE_DATA_INTERVAL_SECONDS``.  For every known
    node the merged worker list is fetched with ``get_node_workers()`` and
    indexed three ways, after which ``job_workers``, ``node_workers`` and
    ``core_worker_stats`` are reset to the new content.
    """
    workers_by_job = {}
    workers_by_node = {}
    stats_by_worker_id = {}

    # get_node_workers() is awaited inside the loop, so iterate over a
    # snapshot of the keys rather than the live view.
    known_node_ids = list(DataSource.nodes.keys())
    for node_id in known_node_ids:
        node_worker_list = await cls.get_node_workers(node_id)
        for worker in node_worker_list:
            workers_by_job.setdefault(worker["jobId"], []).append(worker)
            stats_by_worker_id.update(
                (stats["workerId"], stats)
                for stats in worker.get("coreWorkerStats", [])
            )
        workers_by_node[node_id] = node_worker_list

    DataSource.job_workers.reset(workers_by_job)
    DataSource.node_workers.reset(workers_by_node)
    DataSource.core_worker_stats.reset(stats_by_worker_id)
|
2020-10-13 21:23:23 -04:00
|
|
|
|
2020-10-28 14:49:31 +08:00
|
|
|
@classmethod
async def get_node_workers(cls, node_id):
    """Return the worker dicts of one node, enriched with raylet-side data.

    Each entry of the node's physical stats ``"workers"`` list is copied and
    extended with ``logCount``, ``errorCount``, ``coreWorkerStats``,
    ``language`` and ``jobId``.  ``GlobalSignals.worker_info_fetched`` is
    sent with ``(node_id, worker)`` for every worker before it is appended
    to the result.

    As a side effect, log entries stored for pids that no longer appear in
    the node's ``coreWorkersStats`` are removed from the node's log table.

    Args:
        node_id: Node id hex string used as key in the ``DataSource`` tables.

    Returns:
        A list of worker dicts (possibly empty).
    """
    workers = []
    # Use .get() like get_node_info(): a node whose IP is not registered yet
    # simply has no logs/errors instead of raising KeyError.
    node_ip = DataSource.node_id_to_ip.get(node_id)
    node_logs = DataSource.ip_and_pid_to_logs.get(node_ip, {})
    node_errs = DataSource.ip_and_pid_to_errors.get(node_ip, {})
    node_physical_stats = DataSource.node_physical_stats.get(node_id, {})
    node_stats = DataSource.node_stats.get(node_id, {})

    # Merge coreWorkerStats (node stats) to workers (node physical stats).
    pid_to_worker_stats = {}
    pid_to_language = {}
    pid_to_job_id = {}
    # The log/error tables are looked up with str(pid) below, so the set of
    # live pids is kept in the same string form.  Comparing raw pids against
    # string keys would mark every log entry as belonging to a dead pid.
    live_pid_keys = set()
    for core_worker_stats in node_stats.get("coreWorkersStats", []):
        pid = core_worker_stats["pid"]
        live_pid_keys.add(str(pid))
        pid_to_worker_stats.setdefault(pid, []).append(core_worker_stats)
        pid_to_language[pid] = core_worker_stats["language"]
        pid_to_job_id[pid] = core_worker_stats["jobId"]

    # Clean up logs from a dead pid.
    dead_pids = {key for key in node_logs.keys() if str(key) not in live_pid_keys}
    for dead_pid in dead_pids:
        if dead_pid in node_logs:
            node_logs.mutable().pop(dead_pid)

    for worker in node_physical_stats.get("workers", []):
        # Copy so the cached physical stats are not modified.
        worker = dict(worker)
        pid = worker["pid"]
        worker["logCount"] = len(node_logs.get(str(pid), []))
        worker["errorCount"] = len(node_errs.get(str(pid), []))
        worker["coreWorkerStats"] = pid_to_worker_stats.get(pid, [])
        worker["language"] = pid_to_language.get(
            pid, dashboard_consts.DEFAULT_LANGUAGE
        )
        worker["jobId"] = pid_to_job_id.get(pid, dashboard_consts.DEFAULT_JOB_ID)

        await GlobalSignals.worker_info_fetched.send(node_id, worker)

        workers.append(worker)
    return workers
|
2020-07-27 11:34:47 +08:00
|
|
|
|
|
|
|
@classmethod
async def get_node_info(cls, node_id):
    """Assemble the detailed info dict for one node.

    The result starts as a copy of the node's physical stats and gains:
      * ``raylet``: a copy of the node stats without ``coreWorkersStats``,
        updated with the extracted object-store view data and then with the
        GCS node info,
      * ``actors`` / ``workers``: taken from ``DataSource.node_actors`` and
        ``DataSource.node_workers``,
      * ``logCount`` / ``errorCount``: total entries over all pids of the
        node's IP.

    ``GlobalSignals.node_info_fetched`` is sent with the dict before it is
    returned.

    Args:
        node_id: Node id hex string used as key in the ``DataSource`` tables.

    Returns:
        The merged node info dict.
    """
    node_info = dict(DataSource.node_physical_stats.get(node_id, {}))
    raylet = dict(DataSource.node_stats.get(node_id, {}))
    gcs_node_info = DataSource.nodes.get(node_id, {})
    node_ip = DataSource.node_id_to_ip.get(node_id)

    # Count log and error entries across every pid on this node's IP.
    logs_by_pid = DataSource.ip_and_pid_to_logs.get(node_ip, {})
    log_total = sum(len(entries) for entries in logs_by_pid.values())
    errors_by_pid = DataSource.ip_and_pid_to_errors.get(node_ip, {})
    error_total = sum(len(entries) for entries in errors_by_pid.values())

    # Per-worker stats are exposed through "workers", not through "raylet".
    raylet.pop("coreWorkersStats", None)

    object_store_stats = cls._extract_view_data(
        raylet.get("viewData", []),
        {"object_store_used_memory", "object_store_available_memory"},
    )
    # Node stats first, then view data, then GcsNodeInfo: later updates win.
    raylet.update(object_store_stats)
    raylet.update(gcs_node_info)

    node_info["raylet"] = raylet
    node_info["actors"] = DataSource.node_actors.get(node_id, {})
    node_info["workers"] = DataSource.node_workers.get(node_id, [])
    node_info["logCount"] = log_total
    node_info["errorCount"] = error_total

    await GlobalSignals.node_info_fetched.send(node_info)

    return node_info
|
|
|
|
|
2021-05-11 00:32:14 +08:00
|
|
|
@classmethod
async def get_node_summary(cls, node_id):
    """Assemble the lightweight summary dict for one node.

    The result is a copy of the node's physical stats without ``workers``
    plus a ``raylet`` entry: a copy of the node stats stripped of per-worker
    stats and ``viewData``, updated with the extracted object-store view
    data and then with the GCS node info.

    ``GlobalSignals.node_summary_fetched`` is sent with the dict before it
    is returned.

    Args:
        node_id: Node id hex string used as key in the ``DataSource`` tables.

    Returns:
        The node summary dict.
    """
    node_summary = dict(DataSource.node_physical_stats.get(node_id, {}))
    raylet = dict(DataSource.node_stats.get(node_id, {}))
    node = DataSource.nodes.get(node_id, {})

    node_summary.pop("workers", None)
    # The rest of this class reads the per-worker stats of the node stats
    # under "coreWorkersStats"; popping only "workersStats" left them in the
    # summary.  The old key is still removed in case a producer emits it.
    raylet.pop("coreWorkersStats", None)
    raylet.pop("workersStats", None)

    # The raw view data is replaced by the few values extracted from it.
    view_data = raylet.pop("viewData", [])
    ray_stats = cls._extract_view_data(
        view_data, {"object_store_used_memory", "object_store_available_memory"}
    )

    # Merge node stats to node physical stats.
    node_summary["raylet"] = raylet
    raylet.update(ray_stats)
    # Merge GcsNodeInfo to node physical stats.
    raylet.update(node)

    await GlobalSignals.node_summary_fetched.send(node_summary)

    return node_summary
|
|
|
|
|
2020-07-27 11:34:47 +08:00
|
|
|
@classmethod
async def get_all_node_summary(cls):
    """Return the summaries of all nodes currently in ``DataSource.nodes``.

    Returns:
        A list with one ``get_node_summary()`` result per node id.
    """
    # get_node_summary() awaits, so DataSource.nodes may be modified while
    # this coroutine is suspended.  Iterate over a snapshot of the keys, as
    # organize() does, instead of the live view.
    node_ids = list(DataSource.nodes.keys())
    return [await DataOrganizer.get_node_summary(node_id) for node_id in node_ids]
|
2020-10-02 17:58:44 -07:00
|
|
|
|
|
|
|
@classmethod
async def get_all_node_details(cls):
    """Return the detailed info of all nodes currently in ``DataSource.nodes``.

    Returns:
        A list with one ``get_node_info()`` result per node id.
    """
    # get_node_info() awaits, so DataSource.nodes may be modified while this
    # coroutine is suspended.  Iterate over a snapshot of the keys, as
    # organize() does, instead of the live view.
    node_ids = list(DataSource.nodes.keys())
    return [await DataOrganizer.get_node_info(node_id) for node_id in node_ids]
|
|
|
|
|
2020-10-02 17:58:44 -07:00
|
|
|
@classmethod
async def get_all_actors(cls):
    """Return ``{actor id: enriched actor dict}`` for every known actor.

    Each actor table entry is passed through ``_get_actor()``.

    Returns:
        A dict keyed by actor id hex string.
    """
    result = {}
    # This coroutine yields to the event loop below, so other handlers may
    # add or remove actors in the meantime.  Iterate over a snapshot of the
    # items so such a change cannot invalidate the iteration.
    actor_items = list(DataSource.actors.items())
    for index, (actor_id, actor) in enumerate(actor_items):
        result[actor_id] = await cls._get_actor(actor)
        # There can be thousands of actors including dead ones. Processing
        # them all can take many seconds, which blocks all other requests
        # to the dashboard. The ideal solution might be to implement
        # pagination. For now, use a workaround to yield to the event loop
        # periodically, so other request handlers have a chance to run and
        # avoid long latencies.
        if index % 1000 == 0 and index > 0:
            # Canonical way to yield to the event loop:
            # https://github.com/python/asyncio/issues/284
            await asyncio.sleep(0)
    return result
|
2020-10-28 14:49:31 +08:00
|
|
|
|
|
|
|
@staticmethod
async def _get_actor(actor):
    """Return a copy of an actor table entry enriched with worker data.

    The copy gains ``actorConstructor`` (the worker's ``actorTitle`` or a
    placeholder), every key of the worker's core worker stats, ``gpus``
    (the node's GPU entries that list the worker's pid) and ``processStats``
    (the node's physical worker entry with that pid, or ``None``).

    Args:
        actor: Mapping that must contain ``address`` with ``workerId`` and
            ``rayletId``.

    Returns:
        The enriched actor dict.
    """
    enriched = dict(actor)
    address = enriched["address"]
    worker_stats = DataSource.core_worker_stats.get(address["workerId"], {})

    enriched["actorConstructor"] = worker_stats.get(
        "actorTitle", "Unknown actor constructor"
    )
    enriched.update(worker_stats)

    # TODO(fyrestone): remove this, give a link from actor
    # info to worker info in front-end.
    # "address" is read again because the update above may have replaced it.
    physical_stats = DataSource.node_physical_stats.get(
        enriched["address"]["rayletId"], {}
    )
    pid = worker_stats.get("pid")

    process_stats = None
    gpus_in_use = []
    if pid:
        # First physical worker entry that belongs to the actor's process.
        process_stats = next(
            (
                candidate
                for candidate in physical_stats.get("workers", [])
                if candidate["pid"] == pid
            ),
            None,
        )
        # gpu_stats.get("processes") can be None, an empty list or a list
        # of dictionaries; each GPU is reported at most once.
        gpus_in_use = [
            gpu_stats
            for gpu_stats in physical_stats.get("gpus", [])
            if any(
                process["pid"] == pid
                for process in gpu_stats.get("processes") or []
            )
        ]

    enriched["gpus"] = gpus_in_use
    enriched["processStats"] = process_stats
    return enriched
|
2020-10-02 17:58:44 -07:00
|
|
|
|
|
|
|
@classmethod
async def get_actor_creation_tasks(cls):
    """Return the pending actor-creation tasks reported by all nodes.

    Tasks found under ``infeasibleTasks`` are tagged with state
    ``"INFEASIBLE"`` and those under ``readyTasks`` with
    ``"PENDING_RESOURCES"``.  Every task is copied and given an
    ``actorClass`` computed by ``actor_classname_from_task_spec``.

    Returns:
        A dict keyed by ``task["actorCreationTaskSpec"]["actorId"]``.  If an
        actor id appears in both groups, the infeasible entry wins.
    """

    def collect(stats_key, state):
        # Gather, copy and tag the tasks stored under ``stats_key`` in a
        # single linear pass (``sum(lists, [])`` re-copies the accumulated
        # list for every node).
        tagged = []
        for node_stats in DataSource.node_stats.values():
            for task in node_stats.get(stats_key, []):
                task = dict(task)
                task["actorClass"] = actor_classname_from_task_spec(task)
                task["state"] = state
                tagged.append(task)
        return tagged

    infeasible_tasks = collect("infeasibleTasks", "INFEASIBLE")
    resource_pending_tasks = collect("readyTasks", "PENDING_RESOURCES")

    # Pending tasks go first so that infeasible ones override duplicates.
    return {
        task["actorCreationTaskSpec"]["actorId"]: task
        for task in resource_pending_tasks + infeasible_tasks
    }
|
|
|
|
|
|
|
|
@classmethod
async def get_memory_table(
    cls,
    sort_by=memory_utils.SortingType.OBJECT_SIZE,
    group_by=memory_utils.GroupByType.STACK_TRACE,
):
    """Build the memory table from the core worker stats of all nodes.

    Args:
        sort_by: Forwarded to ``memory_utils.construct_memory_table``.
        group_by: Forwarded to ``memory_utils.construct_memory_table``.

    Returns:
        Whatever ``memory_utils.construct_memory_table`` returns.
    """
    # Flatten every node's "coreWorkersStats" list into one list.
    every_worker_stats = [
        worker_stats
        for node_stats in DataSource.node_stats.values()
        for worker_stats in node_stats.get("coreWorkersStats", [])
    ]
    return memory_utils.construct_memory_table(
        every_worker_stats, group_by=group_by, sort_by=sort_by
    )
|
|
|
|
|
2020-10-28 14:49:31 +08:00
|
|
|
@staticmethod
def _extract_view_data(views, data_keys):
    """Pick the first measure value of selected views.

    Args:
        views: Iterable of view dicts, each with a ``viewName`` and an
            optional ``measures`` list.
        data_keys: Container of the view names to extract.

    Returns:
        A dict mapping each selected view name found in ``views`` to the
        ``doubleValue`` of its first measure, else that measure's
        ``intValue``, else ``0`` (also ``0`` when the view has no measures).
    """
    extracted = {}
    for view in views:
        name = view["viewName"]
        if name not in data_keys:
            continue

        measures = view.get("measures")
        if not measures:
            # Missing or empty measures count as zero.
            extracted[name] = 0
            continue

        first_measure = measures[0]
        # "doubleValue" takes precedence over "intValue".
        for value_key in ("doubleValue", "intValue"):
            if value_key in first_measure:
                extracted[name] = first_measure[value_key]
                break
        else:
            extracted[name] = 0

    return extracted
|