[Dashboard] Optimize dashboard datacenter (#11391)

* Optimize dashboard datacenter

* Fix tests

* Fix tests

* Fix

* Fix CI

* python/build-wheel-macos.sh

Co-authored-by: 刘宝 <po.lb@antfin.com>
Co-authored-by: Max Fitton <maxfitton@anyscale.com>
This commit is contained in:
fyrestone 2020-10-28 14:49:31 +08:00 committed by GitHub
parent 55a090fb16
commit 05ad4c7499
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
18 changed files with 580 additions and 241 deletions

View file

@ -8,6 +8,7 @@ UPDATE_NODES_INTERVAL_SECONDS = 5
CONNECT_GCS_INTERVAL_SECONDS = 2
CONNECT_REDIS_INTERNAL_SECONDS = 2
PURGE_DATA_INTERVAL_SECONDS = 60 * 10
ORGANIZE_DATA_INTERVAL_SECONDS = 2
REDIS_KEY_DASHBOARD = "dashboard"
REDIS_KEY_DASHBOARD_RPC = "dashboard_rpc"
REDIS_KEY_GCS_SERVER_ADDRESS = "GcsServerAddress"
@ -19,6 +20,11 @@ AIOHTTP_CACHE_MAX_SIZE = 128
AIOHTTP_CACHE_DISABLE_ENVIRONMENT_KEY = "RAY_DASHBOARD_NO_CACHE"
# Named signals
SIGNAL_NODE_INFO_FETCHED = "node_info_fetched"
SIGNAL_NODE_SUMMARY_FETCHED = "node_summary_fetched"
SIGNAL_WORKER_INFO_FETCHED = "worker_info_fetched"
# Default param for RotatingFileHandler
LOGGING_ROTATE_BYTES = 100 * 1000 * 1000 # maxBytes
LOGGING_ROTATE_BACKUP_COUNT = 5 # backupCount
# Default value for datacenter (the default value in protobuf)
DEFAULT_LANGUAGE = "PYTHON"
DEFAULT_JOB_ID = "ffff"

View file

@ -1,15 +1,16 @@
import logging
import ray.new_dashboard.consts as dashboard_consts
import ray.new_dashboard.memory_utils as memory_utils
from collections import defaultdict
from ray.new_dashboard.actor_utils import actor_classname_from_task_spec
from ray.new_dashboard.utils import Dict, Signal
from ray.new_dashboard.utils import Dict, Signal, async_loop_forever
logger = logging.getLogger(__name__)
class GlobalSignals:
    """Signals that dashboard modules can subscribe to.

    Each signal is created from the matching ``SIGNAL_*`` name defined in
    ``dashboard_consts``.
    """
    # Sent by DataOrganizer.get_node_info with the assembled node info dict.
    node_info_fetched = Signal(dashboard_consts.SIGNAL_NODE_INFO_FETCHED)
    # Sent by DataOrganizer.get_node_summary with the node summary dict.
    node_summary_fetched = Signal(dashboard_consts.SIGNAL_NODE_SUMMARY_FETCHED)
    # Sent by DataOrganizer.get_node_workers with (node id, worker dict).
    worker_info_fetched = Signal(dashboard_consts.SIGNAL_WORKER_INFO_FETCHED)
class DataSource:
@ -29,6 +30,16 @@ class DataSource:
node_id_to_ip = Dict()
# {node id hex(str): hostname(str)}
node_id_to_hostname = Dict()
# {node id hex(str): worker list}
node_workers = Dict()
# {node id hex(str): {actor id hex(str): actor table data}}
node_actors = Dict()
# {job id hex(str): worker list}
job_workers = Dict()
# {job id hex(str): {actor id hex(str): actor table data}}
job_actors = Dict()
# {worker id(str): core worker stats}
core_worker_stats = Dict()
# {node ip (str): log entries by pid
# (dict from pid to list of latest log entries)}
ip_and_pid_to_logs = Dict()
@ -39,6 +50,7 @@ class DataSource:
class DataOrganizer:
@staticmethod
@async_loop_forever(dashboard_consts.PURGE_DATA_INTERVAL_SECONDS)
async def purge():
# Purge data that is out of date.
# These data sources are maintained by DashboardHead,
@ -60,56 +72,57 @@ class DataOrganizer:
DataSource.node_physical_stats.pop(key)
@classmethod
async def get_node_actors(cls, node_id):
node_stats = DataSource.node_stats.get(node_id, {})
@async_loop_forever(dashboard_consts.ORGANIZE_DATA_INTERVAL_SECONDS)
async def organize(cls):
    """Rebuild the derived worker indexes stored in DataSource.

    For every node id in ``DataSource.nodes`` the merged worker list is
    obtained from ``get_node_workers`` and regrouped into three mappings:

    * job id -> list of workers,
    * node id -> list of workers,
    * worker id -> core worker stats.

    The three mappings then replace the contents of
    ``DataSource.job_workers``, ``DataSource.node_workers`` and
    ``DataSource.core_worker_stats`` through ``reset``.
    """
    job_workers = {}
    node_workers = {}
    core_worker_stats = {}
    # Snapshot the node ids first: the loop body awaits, so another
    # coroutine may add or remove nodes before the loop resumes, and
    # iterating a live key view across such a change raises RuntimeError.
    for node_id in list(DataSource.nodes.keys()):
        workers = await cls.get_node_workers(node_id)
        for worker in workers:
            job_id = worker["jobId"]
            job_workers.setdefault(job_id, []).append(worker)
            for stats in worker.get("coreWorkerStats", []):
                worker_id = stats["workerId"]
                core_worker_stats[worker_id] = stats
        node_workers[node_id] = workers
    DataSource.job_workers.reset(job_workers)
    DataSource.node_workers.reset(node_workers)
    DataSource.core_worker_stats.reset(core_worker_stats)
@classmethod
async def get_node_workers(cls, node_id):
workers = []
node_physical_stats = DataSource.node_physical_stats.get(node_id, {})
worker_id_to_raylet_info = {}
pid_to_worker_id = {}
node_stats = DataSource.node_stats.get(node_id, {})
# Merge coreWorkerStats (node stats) to workers (node physical stats)
pid_to_worker_stats = {}
pid_to_language = {}
pid_to_job_id = {}
for core_worker_stats in node_stats.get("coreWorkersStats", []):
pid = core_worker_stats["pid"]
pid_to_worker_stats.setdefault(pid, []).append(core_worker_stats)
pid_to_language[pid] = core_worker_stats["language"]
pid_to_job_id[pid] = core_worker_stats["jobId"]
for worker in node_physical_stats.get("workers", []):
worker = dict(worker)
pid = worker["pid"]
worker["coreWorkerStats"] = pid_to_worker_stats.get(pid, [])
worker["language"] = pid_to_language.get(
pid, dashboard_consts.DEFAULT_LANGUAGE)
worker["jobId"] = pid_to_job_id.get(
pid, dashboard_consts.DEFAULT_JOB_ID)
for worker_stats in node_stats.get("workersStats", []):
worker_id_to_raylet_info[worker_stats["workerId"]] = worker_stats
pid_to_worker_id[worker_stats["pid"]] = worker_stats["workerId"]
worker_id_to_process_info = {}
await GlobalSignals.worker_info_fetched.send(node_id, worker)
for process_stats in node_physical_stats.get("workers"):
if process_stats["pid"] in pid_to_worker_id:
worker_id = pid_to_worker_id[process_stats["pid"]]
worker_id_to_process_info[worker_id] = process_stats
worker_id_to_gpu_stats = defaultdict(list)
for gpu_stats in node_physical_stats.get("gpus"):
for process in gpu_stats.get("processes", []):
if process["pid"] in pid_to_worker_id:
worker_id = pid_to_worker_id[process["pid"]]
worker_id_to_gpu_stats[worker_id].append(gpu_stats)
node_actors = {}
for actor_id, actor_table_data in DataSource.actors.items():
worker_id = actor_table_data["address"]["workerId"]
if worker_id in worker_id_to_raylet_info:
worker_raylet_stats = worker_id_to_raylet_info[worker_id]
core_worker = worker_raylet_stats.get("coreWorkerStats", {})
actor_constructor = core_worker.get(
"actorTitle", "Unknown actor constructor")
actor_table_data["actorConstructor"] = actor_constructor
actor_class = actor_classname_from_task_spec(
actor_table_data.get("taskSpec", {}))
actor_table_data["actorClass"] = actor_class
actor_table_data.update(core_worker)
node_actors[actor_id] = actor_table_data
actor_table_data["gpus"] = worker_id_to_gpu_stats.get(
worker_id, [])
actor_table_data["processStats"] = worker_id_to_process_info.get(
worker_id, {})
return node_actors
workers.append(worker)
return workers
@classmethod
async def get_node_info(cls, node_id):
node_physical_stats = DataSource.node_physical_stats.get(node_id, {})
node_stats = DataSource.node_stats.get(node_id, {})
node_physical_stats = dict(
DataSource.node_physical_stats.get(node_id, {}))
node_stats = dict(DataSource.node_stats.get(node_id, {}))
node = DataSource.nodes.get(node_id, {})
node_ip = DataSource.node_id_to_ip.get(node_id)
# Merge node log count information into the payload
@ -122,29 +135,9 @@ class DataOrganizer:
for entries in error_info.values():
node_err_count += len(entries)
# Merge coreWorkerStats (node stats) to workers (node physical stats)
workers_stats = node_stats.pop("workersStats", {})
pid_to_worker_stats = {}
pid_to_language = {}
pid_to_job_id = {}
for stats in workers_stats:
d = pid_to_worker_stats.setdefault(stats["pid"], {}).setdefault(
stats["workerId"], stats["coreWorkerStats"])
d["workerId"] = stats["workerId"]
pid_to_language.setdefault(stats["pid"],
stats.get("language", "PYTHON"))
pid_to_job_id.setdefault(stats["pid"],
stats["coreWorkerStats"]["jobId"])
node_stats.pop("coreWorkersStats", None)
for worker in node_physical_stats.get("workers", []):
worker_stats = pid_to_worker_stats.get(worker["pid"], {})
worker["coreWorkerStats"] = list(worker_stats.values())
worker["language"] = pid_to_language.get(worker["pid"], "")
worker["jobId"] = pid_to_job_id.get(worker["pid"], "ffff")
worker["logCount"] = len(log_info.get(str(worker["pid"]), []))
worker["errorCount"] = len(error_info.get(str(worker["pid"]), []))
ray_stats = _extract_view_data(
ray_stats = cls._extract_view_data(
node_stats["viewData"],
{"object_store_used_memory", "object_store_available_memory"})
@ -157,57 +150,132 @@ class DataOrganizer:
node_info["raylet"].update(node)
# Merge actors to node physical stats
node_info["actors"] = await cls.get_node_actors(node_id)
# Update workers to node physical stats
node_info["workers"] = DataSource.node_workers.get(node_id, [])
node_info["logCount"] = node_log_count
node_info["errorCount"] = node_err_count
await GlobalSignals.node_info_fetched.send(node_info)
return node_info
@staticmethod
async def get_node_summary(node_id):
    """Build the lightweight summary of one node.

    The summary is a shallow copy of the node's physical stats without the
    "workers" entry. Its "raylet" entry holds a shallow copy of the node
    stats without "workersStats" and "viewData", updated with the node's
    entry from ``DataSource.nodes``. The result is sent through
    ``GlobalSignals.node_summary_fetched`` before being returned.
    """
    # dict(...) copies keep the pops and updates below local to this call.
    summary = dict(DataSource.node_physical_stats.get(node_id, {}))
    raylet = dict(DataSource.node_stats.get(node_id, {}))
    gcs_node_info = DataSource.nodes.get(node_id, {})

    summary.pop("workers", None)
    for dropped_key in ("workersStats", "viewData"):
        raylet.pop(dropped_key, None)

    # Merge GcsNodeInfo into the node stats, then attach them to the
    # physical stats under "raylet".
    raylet.update(gcs_node_info)
    summary["raylet"] = raylet

    await GlobalSignals.node_summary_fetched.send(summary)
    return summary
@classmethod
async def get_all_node_summary(cls):
all_nodes_summary = []
for node_id in DataSource.nodes.keys():
node_info = await cls.get_node_info(node_id)
node_info.pop("workers", None)
node_info.pop("actors", None)
node_info["raylet"].pop("workersStats", None)
node_info["raylet"].pop("viewData", None)
all_nodes_summary.append(node_info)
return all_nodes_summary
return [
await DataOrganizer.get_node_summary(node_id)
for node_id in DataSource.nodes.keys()
]
@classmethod
async def get_all_node_details(cls):
node_details = []
for node_id in DataSource.nodes.keys():
node_details.append(await cls.get_node_info(node_id))
return node_details
return [
await DataOrganizer.get_node_info(node_id)
for node_id in DataSource.nodes.keys()
]
@classmethod
async def get_node_actors(cls, node_id):
    """Return ``{actor id: actor dict}`` for the actors of one node.

    The actors are read from ``DataSource.node_actors`` and each one is
    passed through ``_get_actor`` before being returned.
    """
    enriched_actors = {}
    actors_on_node = DataSource.node_actors.get(node_id, {})
    for actor_id, actor in actors_on_node.items():
        enriched_actors[actor_id] = await cls._get_actor(actor)
    return enriched_actors
@classmethod
async def get_job_actors(cls, job_id):
    """Return ``{actor id: actor dict}`` for the actors of one job.

    The actors are read from ``DataSource.job_actors`` and each one is
    passed through ``_get_actor`` before being returned.
    """
    enriched_actors = {}
    actors_of_job = DataSource.job_actors.get(job_id, {})
    for actor_id, actor in actors_of_job.items():
        enriched_actors[actor_id] = await cls._get_actor(actor)
    return enriched_actors
@classmethod
async def get_all_actors(cls):
all_actors = {}
for node_id in DataSource.nodes.keys():
all_actors.update(await cls.get_node_actors(node_id))
return all_actors
return {
actor_id: await cls._get_actor(actor)
for actor_id, actor in DataSource.actors.items()
}
@staticmethod
async def _get_actor(actor):
    """Return a copy of ``actor`` enriched with worker and process data.

    The copy gains:

    * "actorConstructor": the worker's "actorTitle", or
      "Unknown actor constructor" when no title is recorded,
    * every key of the worker's core worker stats,
    * "processStats": the entry of the node's physical "workers" stats
      whose pid matches the worker, or None,
    * "gpus": the first entry of the node's physical "gpus" stats that
      lists a process with the worker's pid, or None.

    Args:
        actor: mapping with an "address" entry carrying "workerId" and
            "rayletId".
    """
    actor = dict(actor)
    worker_id = actor["address"]["workerId"]
    core_worker_stats = DataSource.core_worker_stats.get(worker_id, {})
    actor_constructor = core_worker_stats.get("actorTitle",
                                              "Unknown actor constructor")
    actor["actorConstructor"] = actor_constructor
    actor.update(core_worker_stats)

    # TODO(fyrestone): remove this, give a link from actor
    # info to worker info in front-end.
    node_id = actor["address"]["rayletId"]
    # The stats lookup above falls back to {}, so "pid" may be missing;
    # in that case no process can be matched and both results stay None.
    pid = core_worker_stats.get("pid")
    node_physical_stats = DataSource.node_physical_stats.get(node_id, {})
    actor_process_stats = None
    actor_process_gpu_stats = None
    if pid is not None:
        # Default to an empty list: the physical stats of a node may not
        # have been reported yet, or may lack these keys.
        for process_stats in node_physical_stats.get("workers", []):
            if process_stats["pid"] == pid:
                actor_process_stats = process_stats
                break

        for gpu_stats in node_physical_stats.get("gpus", []):
            if any(process["pid"] == pid
                   for process in gpu_stats.get("processes", [])):
                actor_process_gpu_stats = gpu_stats
                break

    actor["gpus"] = actor_process_gpu_stats
    actor["processStats"] = actor_process_stats
    return actor
@classmethod
async def get_actor_creation_tasks(cls):
infeasible_tasks = sum(
(node_stats.get("infeasibleTasks", [])
(list(node_stats.get("infeasibleTasks", []))
for node_stats in DataSource.node_stats.values()), [])
new_infeasible_tasks = []
for task in infeasible_tasks:
task = dict(task)
task["actorClass"] = actor_classname_from_task_spec(task)
task["state"] = "INFEASIBLE"
new_infeasible_tasks.append(task)
resource_pending_tasks = sum(
(data.get("readyTasks", [])
(list(data.get("readyTasks", []))
for data in DataSource.node_stats.values()), [])
new_resource_pending_tasks = []
for task in resource_pending_tasks:
task = dict(task)
task["actorClass"] = actor_classname_from_task_spec(task)
task["state"] = "PENDING_RESOURCES"
new_resource_pending_tasks.append(task)
results = {
task["actorCreationTaskSpec"]["actorId"]: task
for task in resource_pending_tasks + infeasible_tasks
for task in new_resource_pending_tasks + new_infeasible_tasks
}
return results
@ -217,27 +285,27 @@ class DataOrganizer:
group_by=memory_utils.GroupByType.STACK_TRACE):
all_worker_stats = []
for node_stats in DataSource.node_stats.values():
all_worker_stats.extend(node_stats.get("workersStats", []))
all_worker_stats.extend(node_stats.get("coreWorkersStats", []))
memory_information = memory_utils.construct_memory_table(
all_worker_stats, group_by=group_by, sort_by=sort_by)
return memory_information
@staticmethod
def _extract_view_data(views, data_keys):
view_data = {}
for view in views:
view_name = view["viewName"]
if view_name in data_keys:
if not view.get("measures"):
view_data[view_name] = 0
continue
measure = view["measures"][0]
if "doubleValue" in measure:
measure_value = measure["doubleValue"]
elif "intValue" in measure:
measure_value = measure["intValue"]
else:
measure_value = 0
view_data[view_name] = measure_value
def _extract_view_data(views, data_keys):
view_data = {}
for view in views:
view_name = view["viewName"]
if view_name in data_keys:
if not view.get("measures"):
view_data[view_name] = 0
continue
measure = view["measures"][0]
if "doubleValue" in measure:
measure_value = measure["doubleValue"]
elif "intValue" in measure:
measure_value = measure["intValue"]
else:
measure_value = 0
view_data[view_name] = measure_value
return view_data
return view_data

View file

@ -191,16 +191,6 @@ class DashboardHead:
except Exception:
logger.exception(f"Error notifying coroutine {co}")
async def _purge_data():
"""Purge data in datacenter."""
while True:
await asyncio.sleep(
dashboard_consts.PURGE_DATA_INTERVAL_SECONDS)
try:
await DataOrganizer.purge()
except Exception:
logger.exception("Error purging data.")
modules = self._load_modules()
# Http server should be initialized after all modules loaded.
@ -219,7 +209,13 @@ class DashboardHead:
# Freeze signal after all modules loaded.
dashboard_utils.SignalManager.freeze()
await asyncio.gather(self._update_nodes(), _async_notify(),
_purge_data(), web_server,
concurrent_tasks = [
self._update_nodes(),
_async_notify(),
DataOrganizer.purge(),
DataOrganizer.organize(),
web_server,
]
await asyncio.gather(*concurrent_tasks,
*(m.run(self.server) for m in modules))
await self.server.wait_for_termination()

View file

@ -278,14 +278,13 @@ class MemoryTable:
return self.__repr__()
def construct_memory_table(workers_info: List,
def construct_memory_table(workers_stats: List,
group_by: GroupByType = GroupByType.NODE_ADDRESS,
sort_by=SortingType.OBJECT_SIZE) -> MemoryTable:
memory_table_entries = []
for worker_info in workers_info:
pid = worker_info["pid"]
is_driver = worker_info.get("isDriver", False)
core_worker_stats = worker_info["coreWorkerStats"]
for core_worker_stats in workers_stats:
pid = core_worker_stats["pid"]
is_driver = core_worker_stats.get("workerType") == "DRIVER"
node_address = core_worker_stats["ipAddress"]
object_refs = core_worker_stats.get("objectRefs", [])

View file

@ -20,6 +20,8 @@ class LogHead(dashboard_utils.DashboardHeadModule):
routes.static("/logs", self._dashboard_head.log_dir, show_index=True)
GlobalSignals.node_info_fetched.append(
self.insert_log_url_to_node_info)
GlobalSignals.node_summary_fetched.append(
self.insert_log_url_to_node_info)
async def insert_log_url_to_node_info(self, node_info):
node_id = node_info.get("raylet", {}).get("nodeId")

View file

@ -10,6 +10,7 @@ import ray.gcs_utils
import ray.new_dashboard.modules.stats_collector.stats_collector_consts \
as stats_collector_consts
import ray.new_dashboard.utils as dashboard_utils
from ray.new_dashboard.actor_utils import actor_classname_from_task_spec
from ray.new_dashboard.utils import async_loop_forever
from ray.new_dashboard.memory_utils import GroupByType, SortingType
from ray.core.generated import node_manager_pb2
@ -17,25 +18,36 @@ from ray.core.generated import node_manager_pb2_grpc
from ray.core.generated import gcs_service_pb2
from ray.core.generated import gcs_service_pb2_grpc
from ray.new_dashboard.datacenter import DataSource, DataOrganizer
from ray.utils import binary_to_hex
logger = logging.getLogger(__name__)
routes = dashboard_utils.ClassMethodRouteTable
def node_stats_to_dict(message):
return dashboard_utils.message_to_dict(
message, {
"actorId", "jobId", "taskId", "parentTaskId", "sourceActorId",
"callerId", "rayletId", "workerId"
})
decode_keys = {
"actorId", "jobId", "taskId", "parentTaskId", "sourceActorId",
"callerId", "rayletId", "workerId", "placementGroupId"
}
core_workers_stats = message.core_workers_stats
message.ClearField("core_workers_stats")
try:
result = dashboard_utils.message_to_dict(message, decode_keys)
result["coreWorkersStats"] = [
dashboard_utils.message_to_dict(
m, decode_keys, including_default_value_fields=True)
for m in core_workers_stats
]
return result
finally:
message.core_workers_stats.extend(core_workers_stats)
def actor_table_data_to_dict(message):
return dashboard_utils.message_to_dict(
message, {
"actorId", "parentId", "jobId", "workerId", "rayletId",
"actorCreationDummyObjectId"
"actorCreationDummyObjectId", "callerId", "taskId", "parentTaskId",
"sourceActorId", "placementGroupId"
},
including_default_value_fields=True)
@ -160,21 +172,40 @@ class StatsCollector(dashboard_utils.DashboardHeadModule):
await aioredis_client.psubscribe(pattern)
logger.info("Subscribed to %s", key)
def _process_actor_table_data(data):
    """Add an "actorClass" entry to ``data`` in place.

    The class name is derived from the "taskSpec" entry (an empty dict
    when absent) by ``actor_classname_from_task_spec``.
    """
    task_spec = data.get("taskSpec", {})
    data["actorClass"] = actor_classname_from_task_spec(task_spec)
# Get all actor info.
while True:
try:
logger.info("Getting all actor info from GCS.")
request = gcs_service_pb2.GetAllActorInfoRequest()
reply = await self._gcs_actor_info_stub.GetAllActorInfo(
request, timeout=2)
request, timeout=5)
if reply.status.code == 0:
result = {}
for actor_info in reply.actor_table_data:
result[binary_to_hex(actor_info.actor_id)] = \
actor_table_data_to_dict(actor_info)
DataSource.actors.reset(result)
actors = {}
for message in reply.actor_table_data:
actor_table_data = actor_table_data_to_dict(message)
_process_actor_table_data(actor_table_data)
actors[actor_table_data["actorId"]] = actor_table_data
# Update actors.
DataSource.actors.reset(actors)
# Update node actors and job actors.
job_actors = {}
node_actors = {}
for actor_id, actor_table_data in actors.items():
job_id = actor_table_data["jobId"]
node_id = actor_table_data["address"]["rayletId"]
job_actors.setdefault(job_id,
{})[actor_id] = actor_table_data
node_actors.setdefault(node_id,
{})[actor_id] = actor_table_data
DataSource.job_actors.reset(job_actors)
DataSource.node_actors.reset(node_actors)
logger.info("Received %d actor info from GCS.",
len(result))
len(actors))
break
else:
raise Exception(
@ -187,12 +218,26 @@ class StatsCollector(dashboard_utils.DashboardHeadModule):
# Receive actors from channel.
async for sender, msg in receiver.iter():
try:
_, data = msg
pubsub_message = ray.gcs_utils.PubSubMessage.FromString(data)
actor_info = ray.gcs_utils.ActorTableData.FromString(
_, actor_table_data = msg
pubsub_message = ray.gcs_utils.PubSubMessage.FromString(
actor_table_data)
message = ray.gcs_utils.ActorTableData.FromString(
pubsub_message.data)
DataSource.actors[binary_to_hex(actor_info.actor_id)] = \
actor_table_data_to_dict(actor_info)
actor_table_data = actor_table_data_to_dict(message)
_process_actor_table_data(actor_table_data)
actor_id = actor_table_data["actorId"]
job_id = actor_table_data["jobId"]
node_id = actor_table_data["address"]["rayletId"]
# Update actors.
DataSource.actors[actor_id] = actor_table_data
# Update node actors.
node_actors = dict(DataSource.node_actors.get(node_id, {}))
node_actors[actor_id] = actor_table_data
DataSource.node_actors[node_id] = node_actors
# Update job actors.
job_actors = dict(DataSource.job_actors.get(job_id, {}))
job_actors[actor_id] = actor_table_data
DataSource.job_actors[job_id] = job_actors
except Exception:
logger.exception("Error receiving actor info.")
@ -224,11 +269,10 @@ class StatsCollector(dashboard_utils.DashboardHeadModule):
async for sender, msg in receiver.iter():
try:
data = json.loads(ray.utils.decode(msg))
logger.error(f"data={data}")
ip = data["ip"]
pid = str(data["pid"])
logs_for_ip = DataSource.ip_and_pid_to_logs.get(ip, {})
logs_for_pid = logs_for_ip.get(pid, [])
logs_for_ip = dict(DataSource.ip_and_pid_to_logs.get(ip, {}))
logs_for_pid = list(logs_for_ip.get(pid, []))
logs_for_pid.extend(data["lines"])
logs_for_ip[pid] = logs_for_pid
DataSource.ip_and_pid_to_logs[ip] = logs_for_ip

View file

@ -91,7 +91,7 @@ def test_node_info(disable_aiohttp_cache, ray_start_with_dashboard):
raise Exception(f"Timed out while testing, {ex_stack}")
def test_memory_table(ray_start_with_dashboard):
def test_memory_table(disable_aiohttp_cache, ray_start_with_dashboard):
assert (wait_until_server_available(ray_start_with_dashboard["webui_url"]))
@ray.remote
@ -129,7 +129,7 @@ def test_memory_table(ray_start_with_dashboard):
wait_for_condition(check_mem_table, 10)
def test_get_all_node_details(ray_start_with_dashboard):
def test_get_all_node_details(disable_aiohttp_cache, ray_start_with_dashboard):
assert (wait_until_server_available(ray_start_with_dashboard["webui_url"]))
webui_url = format_web_url(ray_start_with_dashboard["webui_url"])
@ -193,6 +193,10 @@ def test_multi_nodes_info(enable_test_module, disable_aiohttp_cache,
assert detail["result"] is True, detail["msg"]
detail = detail["data"]["detail"]
assert detail["raylet"]["state"] == "ALIVE"
response = requests.get(webui_url + "/test/dump?key=agents")
response.raise_for_status()
agents = response.json()
assert len(agents["data"]["agents"]) == 3
return True
except Exception as ex:
logger.info(ex)

View file

@ -1,11 +1,13 @@
import os
import sys
import copy
import json
import time
import logging
import asyncio
import collections
import numpy as np
import aiohttp.web
import ray
import psutil
@ -493,5 +495,81 @@ def test_get_cluster_status(ray_start_with_dashboard):
assert response.json()["data"]["autoscalingError"] == "world"
def test_immutable_types():
    """Exercise ImmutableDict / ImmutableList as built by make_immutable.

    Covers equality with plain containers, conversion of nested values,
    JSON encoding through CustomEncoder, copy / deepcopy, wrapping of
    ``get`` defaults, and the errors raised on mutation attempts or on
    values that cannot be made immutable.
    """
    # Source data: a flat dict that also nests a list (whose first element
    # is a dict) and a dict.
    d = {str(i): i for i in range(1000)}
    d["list"] = list(range(1000))
    d["list"][0] = {str(i): i for i in range(1000)}
    d["dict"] = {str(i): i for i in range(1000)}

    immutable_dict = dashboard_utils.make_immutable(d)
    assert type(immutable_dict) == dashboard_utils.ImmutableDict
    # Equality holds against another wrapper, the plain dict, and a
    # wrapper built from a wrapper.
    assert immutable_dict == dashboard_utils.ImmutableDict(d)
    assert immutable_dict == d
    assert dashboard_utils.ImmutableDict(immutable_dict) == immutable_dict
    assert dashboard_utils.ImmutableList(
        immutable_dict["list"]) == immutable_dict["list"]
    # NOTE(review): these three membership checks run against the plain
    # source dict `d`, not `immutable_dict` - confirm that is intended.
    assert "512" in d
    assert "512" in d["list"][0]
    assert "512" in d["dict"]

    # Test type conversion
    assert type(dict(immutable_dict)["list"]) == dashboard_utils.ImmutableList
    assert type(list(
        immutable_dict["list"])[0]) == dashboard_utils.ImmutableDict

    # Test json dumps / loads
    json_str = json.dumps(immutable_dict, cls=dashboard_utils.CustomEncoder)
    deserialized_immutable_dict = json.loads(json_str)
    assert type(deserialized_immutable_dict) == dict
    assert type(deserialized_immutable_dict["list"]) == list
    assert immutable_dict.mutable() == deserialized_immutable_dict
    dashboard_utils.rest_response(True, "OK", data=immutable_dict)
    dashboard_utils.rest_response(True, "OK", **immutable_dict)

    # Test copy
    copy_of_immutable = copy.copy(immutable_dict)
    assert copy_of_immutable == immutable_dict
    deepcopy_of_immutable = copy.deepcopy(immutable_dict)
    assert deepcopy_of_immutable == immutable_dict

    # Test get default immutable
    immutable_default_value = immutable_dict.get("not exist list", [1, 2])
    assert type(immutable_default_value) == dashboard_utils.ImmutableList

    # Test recursive immutable
    assert type(immutable_dict["list"]) == dashboard_utils.ImmutableList
    assert type(immutable_dict["dict"]) == dashboard_utils.ImmutableDict
    assert type(immutable_dict["list"][0]) == dashboard_utils.ImmutableDict

    # Test exception
    # The constructors reject anything that is not a list / dict.
    with pytest.raises(TypeError):
        dashboard_utils.ImmutableList((1, 2))
    with pytest.raises(TypeError):
        dashboard_utils.ImmutableDict([1, 2])
    # Item assignment and the mutating methods are unavailable.
    with pytest.raises(TypeError):
        immutable_dict["list"] = []
    with pytest.raises(AttributeError):
        immutable_dict.update({1: 3})
    with pytest.raises(TypeError):
        immutable_dict["list"][0] = 0
    with pytest.raises(AttributeError):
        immutable_dict["list"].extend([1, 2])
    with pytest.raises(AttributeError):
        immutable_dict["list"].insert(1, 2)
    # A numpy array is accepted at construction time; the TypeError is
    # only expected once the value is read.
    d2 = dashboard_utils.ImmutableDict({1: np.zeros([3, 5])})
    with pytest.raises(TypeError):
        print(d2[1])
    d3 = dashboard_utils.ImmutableList([1, np.zeros([3, 5])])
    with pytest.raises(TypeError):
        print(d3[1])
if __name__ == "__main__":
sys.exit(pytest.main(["-v", __file__]))

View file

@ -1,9 +1,9 @@
import abc
import os
import socket
import time
import asyncio
import collections
import copy
import json
import datetime
import functools
@ -13,10 +13,11 @@ import logging
import pkgutil
import traceback
from base64 import b64decode
from collections.abc import MutableMapping, Mapping
from abc import ABCMeta, abstractmethod
from collections.abc import MutableMapping, Mapping, Sequence
from collections import namedtuple
from typing import Any
import os
import aioredis
import aiohttp.web
import ray.new_dashboard.consts as dashboard_consts
@ -129,6 +130,7 @@ class ClassMethodRouteTable:
req = args[-1]
return await handler(bind_info.instance, req)
except Exception:
logger.exception("Handle %s %s failed.", method, path)
return rest_response(
success=False, message=traceback.format_exc())
@ -224,6 +226,8 @@ class CustomEncoder(json.JSONEncoder):
def default(self, obj):
    """Encode the extra object types that ``json`` cannot handle itself.

    ``bytes`` are converted with ``binary_to_hex`` and ``Immutable``
    wrappers are replaced by the container they wrap; anything else is
    delegated to ``json.JSONEncoder.default``.
    """
    if isinstance(obj, bytes):
        encoded = binary_to_hex(obj)
    elif isinstance(obj, Immutable):
        encoded = obj.mutable()
    else:
        # Let the base class default method raise the TypeError
        encoded = json.JSONEncoder.default(self, obj)
    return encoded
@ -427,7 +431,8 @@ class Change:
self.new = new
def __str__(self):
return f"Change(owner: {self.owner}, old: {self.old}, new: {self.new}"
return f"Change(owner: {type(self.owner)}), " \
f"old: {self.old}, new: {self.new}"
class NotifyQueue:
@ -444,7 +449,163 @@ class NotifyQueue:
return await cls._queue.get()
class Dict(MutableMapping):
"""
https://docs.python.org/3/library/json.html?highlight=json#json.JSONEncoder
+-------------------+---------------+
| Python | JSON |
+===================+===============+
| dict | object |
+-------------------+---------------+
| list, tuple | array |
+-------------------+---------------+
| str | string |
+-------------------+---------------+
| int, float | number |
+-------------------+---------------+
| True | true |
+-------------------+---------------+
| False | false |
+-------------------+---------------+
| None | null |
+-------------------+---------------+
"""
_json_compatible_types = {
dict, list, tuple, str, int, float, bool,
type(None), bytes
}
def is_immutable(self):
    """Always raise TypeError naming the class of ``self``.

    NOTE(review): presumably meant to be bound in place of mutating
    methods on immutable classes; no caller is visible here - confirm.
    """
    class_name = self.__class__.__name__
    raise TypeError("%r objects are immutable" % class_name)
def make_immutable(value, strict=True):
    """Return an immutable counterpart of ``value``.

    Args:
        value: object to convert. A value whose type is exactly ``dict``
            or ``list`` is wrapped in ``ImmutableDict`` /
            ``ImmutableList``; any other value is returned unchanged.
        strict: when true, a value that is not wrapped must have a type
            listed in ``_json_compatible_types``.

    Raises:
        TypeError: in strict mode, for a value of any other type.
    """
    kind = type(value)
    if kind is dict:
        result = ImmutableDict(value)
    elif kind is list:
        result = ImmutableList(value)
    elif strict and kind not in _json_compatible_types:
        raise TypeError("Type {} can't be immutable.".format(kind))
    else:
        result = value
    return result
class Immutable(metaclass=ABCMeta):
    """Abstract base class of the immutable container wrappers."""

    @abstractmethod
    def mutable(self):
        """Return the mutable object behind this wrapper."""
class ImmutableList(Immutable, Sequence):
    """Makes a :class:`list` immutable.

    The list is wrapped by reference, not copied, and only the read-only
    ``Sequence`` interface is exposed. An element is converted with
    ``make_immutable`` the first time it is read; the converted value is
    cached in ``_proxy``.
    """

    __slots__ = ("_list", "_proxy")

    def __init__(self, list_value):
        """Wrap ``list_value``.

        Raises:
            TypeError: if the type of ``list_value`` is neither ``list``
                nor ``ImmutableList``.
        """
        if type(list_value) not in (list, ImmutableList):
            raise TypeError(f"{type(list_value)} object is not a list.")
        if isinstance(list_value, ImmutableList):
            list_value = list_value.mutable()
        self._list = list_value
        # One cache slot per element; None marks "not converted yet".
        self._proxy = [None] * len(list_value)

    def __reduce_ex__(self, protocol):
        # Used by pickle / copy: rebuild the wrapper from the wrapped list.
        return type(self), (self._list, )

    def mutable(self):
        """Return the wrapped list itself (not a copy)."""
        return self._list

    def __eq__(self, other):
        if isinstance(other, ImmutableList):
            other = other.mutable()
        return list.__eq__(self._list, other)

    def __ne__(self, other):
        if isinstance(other, ImmutableList):
            other = other.mutable()
        return list.__ne__(self._list, other)

    def __contains__(self, item):
        # Compare against the wrapped data, so unwrap wrappers first.
        if isinstance(item, Immutable):
            item = item.mutable()
        return list.__contains__(self._list, item)

    def __getitem__(self, item):
        if isinstance(item, slice):
            # Indexing the cache with a slice would return the raw cache
            # list, including None placeholders for elements that were
            # never read, so wrap the requested slice of the data instead.
            return ImmutableList(self._list[item])
        proxy = self._proxy[item]
        if proxy is None:
            proxy = self._proxy[item] = make_immutable(self._list[item])
        return proxy

    def __len__(self):
        return len(self._list)

    def __repr__(self):
        return "%s(%s)" % (self.__class__.__name__, list.__repr__(self._list))
class ImmutableDict(Immutable, Mapping):
    """Makes a :class:`dict` immutable.

    The dict is wrapped by reference in ``_dict``. ``_proxy`` caches, per
    key, the ``make_immutable`` result of the corresponding value.
    """

    __slots__ = ("_dict", "_proxy")

    def __init__(self, dict_value):
        """Wrap ``dict_value``; its type must be dict or ImmutableDict."""
        if type(dict_value) not in (dict, ImmutableDict):
            raise TypeError(f"{type(dict_value)} object is not a dict.")
        # Wrapping a wrapper shares the same underlying plain dict.
        if isinstance(dict_value, ImmutableDict):
            dict_value = dict_value.mutable()
        self._dict = dict_value
        self._proxy = {}

    def __reduce_ex__(self, protocol):
        # Used by pickle / copy: rebuild the wrapper from the wrapped dict.
        return type(self), (self._dict, )

    def mutable(self):
        """Return the wrapped dict itself (not a copy)."""
        return self._dict

    def get(self, key, default=None):
        """Return the value for ``key``, else ``default`` made immutable."""
        try:
            found = self[key]
        except KeyError:
            return make_immutable(default)
        return found

    def __eq__(self, other):
        plain = other.mutable() if isinstance(other, ImmutableDict) else other
        return dict.__eq__(self._dict, plain)

    def __ne__(self, other):
        plain = other.mutable() if isinstance(other, ImmutableDict) else other
        return dict.__ne__(self._dict, plain)

    def __contains__(self, item):
        key = item.mutable() if isinstance(item, Immutable) else item
        return dict.__contains__(self._dict, key)

    def __getitem__(self, item):
        cached = self._proxy.get(item, None)
        if cached is not None:
            return cached
        # Convert first, then cache: a missing key raises KeyError before
        # the cache is touched.
        converted = make_immutable(self._dict[item])
        self._proxy[item] = converted
        return converted

    def __len__(self) -> int:
        return len(self._dict)

    def __iter__(self):
        # Fill the cache for keys not converted yet, then iterate the
        # cache's keys.
        if len(self._proxy) != len(self._dict):
            uncached_keys = self._dict.keys() - self._proxy.keys()
            for key in uncached_keys:
                self._proxy[key] = make_immutable(self._dict[key])
        return iter(self._proxy)

    def __repr__(self):
        return "%s(%s)" % (self.__class__.__name__, dict.__repr__(self._dict))
class Dict(ImmutableDict, MutableMapping):
"""A simple descriptor for dict type to notify data changes.
:note: Only the first level data report change.
@ -453,12 +614,13 @@ class Dict(MutableMapping):
ChangeItem = namedtuple("DictChangeItem", ["key", "value"])
def __init__(self, *args, **kwargs):
self._data = dict(*args, **kwargs)
super().__init__(dict(*args, **kwargs))
self.signal = Signal(self)
def __setitem__(self, key, value):
old = self._data.pop(key, None)
self._data[key] = value
old = self._dict.pop(key, None)
self._proxy.pop(key, None)
self._dict[key] = value
if len(self.signal) and old != value:
if old is None:
co = self.signal.send(
@ -471,30 +633,25 @@ class Dict(MutableMapping):
new=Dict.ChangeItem(key, value)))
NotifyQueue.put(co)
def __getitem__(self, item):
return copy.deepcopy(self._data[item])
def __delitem__(self, key):
old = self._data.pop(key, None)
old = self._dict.pop(key, None)
self._proxy.pop(key, None)
if len(self.signal) and old is not None:
co = self.signal.send(
Change(owner=self, old=Dict.ChangeItem(key, old)))
NotifyQueue.put(co)
def __len__(self):
return len(self._data)
def __iter__(self):
return iter(copy.deepcopy(self._data))
def __str__(self):
return str(self._data)
def reset(self, d):
assert isinstance(d, Mapping)
for key in self._data.keys() - d.keys():
self.pop(key)
self.update(d)
for key in self._dict.keys() - d.keys():
del self[key]
for key, value in d.items():
self[key] = value
# Register immutable types.
# Adding the wrapper classes to _json_compatible_types lets the strict check
# in make_immutable accept a value that is already an ImmutableList or
# ImmutableDict.
# NOTE(review): __subclasses__() returns direct subclasses only, so classes
# deriving from ImmutableDict (such as Dict) are not registered here -
# confirm that is intended.
for immutable_type in Immutable.__subclasses__():
    _json_compatible_types.add(immutable_type)
async def get_aioredis_client(redis_address, redis_password,

View file

@ -13,6 +13,7 @@ MACPYTHON_URL=https://www.python.org/ftp/python
MACPYTHON_PY_PREFIX=/Library/Frameworks/Python.framework/Versions
DOWNLOAD_DIR=python_downloads
NODE_VERSION="14"
PY_VERSIONS=("3.6.1"
"3.7.0"
"3.8.2")
@ -36,6 +37,7 @@ mkdir -p .whl
# Use the latest version of Node.js in order to build the dashboard.
source "$HOME"/.nvm/nvm.sh
nvm install $NODE_VERSION
nvm use node
# Build the dashboard so its static assets can be included in the wheel.

View file

@ -81,7 +81,7 @@ class DashboardController(BaseDashboardController):
def _construct_raylet_info(self):
D = self.raylet_stats.get_raylet_stats()
workers_info_by_node = {
data["nodeId"]: data.get("workersStats")
data["nodeId"]: data.get("coreWorkersStats")
for data in D.values()
}

View file

@ -159,20 +159,18 @@ class NodeStats(threading.Thread):
actors[actor_id].update(self._addr_to_extra_info_dict[addr])
for node_id, workers_info in workers_info_by_node.items():
for worker_info in workers_info:
if "coreWorkerStats" in worker_info:
core_worker_stats = worker_info["coreWorkerStats"]
addr = (core_worker_stats["ipAddress"],
str(core_worker_stats["port"]))
if addr in self._addr_to_actor_id:
actor_info = actors[self._addr_to_actor_id[addr]]
format_reply_id(core_worker_stats)
actor_info.update(core_worker_stats)
actor_info["averageTaskExecutionSpeed"] = round(
actor_info["numExecutedTasks"] /
(now - actor_info["timestamp"] / 1000), 2)
actor_info["nodeId"] = node_id
actor_info["pid"] = worker_info["pid"]
for core_worker_stats in workers_info:
addr = (core_worker_stats["ipAddress"],
str(core_worker_stats["port"]))
if addr in self._addr_to_actor_id:
actor_info = actors[self._addr_to_actor_id[addr]]
format_reply_id(core_worker_stats)
actor_info.update(core_worker_stats)
actor_info["averageTaskExecutionSpeed"] = round(
actor_info["numExecutedTasks"] /
(now - actor_info["timestamp"] / 1000), 2)
actor_info["nodeId"] = node_id
actor_info["pid"] = core_worker_stats["pid"]
def _update_from_actor_tasks(task, task_spec_type,
invalid_state_type):
@ -183,8 +181,9 @@ class NodeStats(threading.Thread):
elif invalid_state_type == "infeasibleActor":
task["state"] = -2
else:
raise ValueError(f"Invalid argument"
"invalid_state_type={invalid_state_type}")
raise ValueError(
"Invalid argument"
f"invalid_state_type={invalid_state_type}")
task["actorTitle"] = task["functionDescriptor"][
"pythonFunctionDescriptor"]["className"]
format_reply_id(task)

View file

@ -4,6 +4,7 @@ import requests
import time
import ray
from ray.core.generated import common_pb2
from ray.core.generated import node_manager_pb2
from ray.core.generated import node_manager_pb2_grpc
from ray.test_utils import (RayTestTimeoutException,
@ -36,7 +37,10 @@ def test_worker_stats(shutdown_only):
reply = try_get_node_stats()
# Check that there is one connected driver.
drivers = [worker for worker in reply.workers_stats if worker.is_driver]
drivers = [
worker for worker in reply.core_workers_stats
if worker.worker_type == common_pb2.DRIVER
]
assert len(drivers) == 1
assert os.getpid() == drivers[0].pid
@ -58,11 +62,10 @@ def test_worker_stats(shutdown_only):
worker_pid = ray.get(f.remote())
reply = try_get_node_stats()
target_worker_present = False
for worker in reply.workers_stats:
stats = worker.core_worker_stats
for stats in reply.core_workers_stats:
if stats.webui_display[""] == '{"message": "test", "dtype": "text"}':
target_worker_present = True
assert worker.pid == worker_pid
assert stats.pid == worker_pid
else:
assert stats.webui_display[""] == "" # Empty proto
assert target_worker_present
@ -72,11 +75,10 @@ def test_worker_stats(shutdown_only):
worker_pid = ray.get(a.f.remote())
reply = try_get_node_stats()
target_worker_present = False
for worker in reply.workers_stats:
stats = worker.core_worker_stats
for stats in reply.core_workers_stats:
if stats.webui_display[""] == '{"message": "test", "dtype": "text"}':
target_worker_present = True
assert worker.pid == worker_pid
assert stats.pid == worker_pid
else:
assert stats.webui_display[""] == "" # Empty proto
assert target_worker_present
@ -89,15 +91,15 @@ def test_worker_stats(shutdown_only):
"Timed out while waiting for worker processes")
# Wait for the workers to start.
if len(reply.workers_stats) < num_cpus + 1:
if len(reply.core_workers_stats) < num_cpus + 1:
time.sleep(1)
reply = try_get_node_stats()
continue
# Check that the rest of the processes are workers, 1 for each CPU.
assert len(reply.workers_stats) == num_cpus + 1
assert len(reply.core_workers_stats) == num_cpus + 1
# Check that all processes are Python.
pids = [worker.pid for worker in reply.workers_stats]
pids = [worker.pid for worker in reply.core_workers_stats]
processes = [
p.info["name"] for p in psutil.process_iter(attrs=["pid", "name"])
if p.info["pid"] in pids

View file

@ -8,6 +8,7 @@ import pytest
import ray
import ray.test_utils
from ray.core.generated import common_pb2
from ray.core.generated import node_manager_pb2, node_manager_pb2_grpc
from ray.test_utils import (wait_for_condition, wait_for_pid_to_exit,
run_string_as_driver,
@ -22,8 +23,8 @@ def get_workers():
stub = node_manager_pb2_grpc.NodeManagerServiceStub(channel)
return [
worker for worker in stub.GetNodeStats(
node_manager_pb2.GetNodeStatsRequest()).workers_stats
if not worker.is_driver
node_manager_pb2.GetNodeStatsRequest()).core_workers_stats
if worker.worker_type != common_pb2.DRIVER
]

View file

@ -2182,8 +2182,12 @@ void CoreWorker::HandleGetCoreWorkerStats(const rpc::GetCoreWorkerStatsRequest &
stats->set_current_task_func_desc(current_task_.FunctionDescriptor()->ToString());
stats->set_ip_address(rpc_address_.ip_address());
stats->set_port(rpc_address_.port());
stats->set_pid(getpid());
stats->set_language(options_.language);
stats->set_job_id(worker_context_.GetCurrentJobID().Binary());
stats->set_worker_id(worker_context_.GetWorkerID().Binary());
stats->set_actor_id(actor_id_.Binary());
stats->set_worker_type(worker_context_.GetWorkerType());
auto used_resources_map = stats->mutable_used_resources();
for (auto const &it : *resource_ids_) {
rpc::ResourceAllocations allocations;

View file

@ -392,6 +392,14 @@ message CoreWorkerStats {
repeated ObjectRefInfo object_refs = 18;
// Job ID.
bytes job_id = 19;
// Worker id of core worker.
bytes worker_id = 20;
// Language
Language language = 21;
// PID of the worker process.
uint32 pid = 22;
// The worker type.
WorkerType worker_type = 23;
}
message MetricPoint {

View file

@ -115,28 +115,12 @@ message GetNodeStatsRequest {
bool include_memory_info = 1;
}
message WorkerStats {
// PID of the worker process.
uint32 pid = 1;
// Whether this is a driver.
bool is_driver = 2;
// Debug information returned from the core worker.
CoreWorkerStats core_worker_stats = 3;
// Error string if fetching core worker stats failed.
string fetch_error = 4;
// Worker id of core worker.
bytes worker_id = 5;
// Worker language.
Language language = 6;
}
message GetNodeStatsReply {
repeated WorkerStats workers_stats = 1;
repeated CoreWorkerStats core_workers_stats = 1;
repeated ViewData view_data = 2;
uint32 num_workers = 3;
repeated TaskSpec infeasible_tasks = 4;
repeated TaskSpec ready_tasks = 5;
int32 pid = 6;
}
message GlobalGCRequest {

View file

@ -3174,7 +3174,6 @@ void NodeManager::FlushObjectsToFree() {
void NodeManager::HandleGetNodeStats(const rpc::GetNodeStatsRequest &node_stats_request,
rpc::GetNodeStatsReply *reply,
rpc::SendReplyCallback send_reply_callback) {
reply->set_pid(getpid());
for (const auto &task : local_queues_.GetTasks(TaskState::INFEASIBLE)) {
if (task.GetTaskSpecification().IsActorCreationTask()) {
auto infeasible_task = reply->add_infeasible_tasks();
@ -3254,22 +3253,8 @@ void NodeManager::HandleGetNodeStats(const rpc::GetNodeStatsRequest &node_stats_
worker->rpc_client()->GetCoreWorkerStats(
request, [reply, worker, all_workers, driver_ids, send_reply_callback](
const ray::Status &status, const rpc::GetCoreWorkerStatsReply &r) {
auto worker_stats = reply->add_workers_stats();
worker_stats->set_pid(worker->GetProcess().GetId());
worker_stats->set_worker_id(worker->WorkerId().Binary());
worker_stats->set_is_driver(driver_ids.contains(worker->WorkerId()));
worker_stats->set_language(worker->GetLanguage());
reply->add_core_workers_stats()->MergeFrom(r.core_worker_stats());
reply->set_num_workers(reply->num_workers() + 1);
if (status.ok()) {
worker_stats->mutable_core_worker_stats()->MergeFrom(r.core_worker_stats());
} else {
RAY_LOG(WARNING) << "Failed to send get core worker stats request, "
<< "worker id is " << worker->WorkerId() << ", status is "
<< status.ToString()
<< ". This is likely since the worker has died before the "
"request was sent.";
worker_stats->set_fetch_error(status.ToString());
}
if (reply->num_workers() == all_workers.size()) {
send_reply_callback(Status::OK(), nullptr, nullptr);
}
@ -3281,8 +3266,8 @@ std::string FormatMemoryInfo(std::vector<rpc::GetNodeStatsReply> node_stats) {
// First pass to compute object sizes.
absl::flat_hash_map<ObjectID, int64_t> object_sizes;
for (const auto &reply : node_stats) {
for (const auto &worker_stats : reply.workers_stats()) {
for (const auto &object_ref : worker_stats.core_worker_stats().object_refs()) {
for (const auto &core_worker_stats : reply.core_workers_stats()) {
for (const auto &object_ref : core_worker_stats.object_refs()) {
auto obj_id = ObjectID::FromBinary(object_ref.object_id());
if (object_ref.object_size() > 0) {
object_sizes[obj_id] = object_ref.object_size();
@ -3304,9 +3289,9 @@ std::string FormatMemoryInfo(std::vector<rpc::GetNodeStatsReply> node_stats) {
// Second pass builds the summary string for each node.
for (const auto &reply : node_stats) {
for (const auto &worker_stats : reply.workers_stats()) {
for (const auto &core_worker_stats : reply.core_workers_stats()) {
bool pid_printed = false;
for (const auto &object_ref : worker_stats.core_worker_stats().object_refs()) {
for (const auto &object_ref : core_worker_stats.object_refs()) {
auto obj_id = ObjectID::FromBinary(object_ref.object_id());
if (!object_ref.pinned_in_memory() && object_ref.local_ref_count() == 0 &&
object_ref.submitted_task_ref_count() == 0 &&
@ -3317,10 +3302,10 @@ std::string FormatMemoryInfo(std::vector<rpc::GetNodeStatsReply> node_stats) {
continue;
}
if (!pid_printed) {
if (worker_stats.is_driver()) {
builder << "; driver pid=" << worker_stats.pid() << "\n";
if (core_worker_stats.worker_type() == rpc::WorkerType::DRIVER) {
builder << "; driver pid=" << core_worker_stats.pid() << "\n";
} else {
builder << "; worker pid=" << worker_stats.pid() << "\n";
builder << "; worker pid=" << core_worker_stats.pid() << "\n";
}
pid_printed = true;
}