ray/dashboard/modules/actor/actor_utils.py
Tao Wang 6aefe9b36e
[Core]Save task spec in separate table (#22650)
This is a rebase version of #11592. As task spec info is only needed when gcs create or start an actor, so we can remove it from actor table and save the serialization time and memory/network cost when gcs clients get actor infos from gcs.

As internal repository varies very much from the community. This pr just add some manual check with simple cherry pick. Welcome to comment first and at the meantime I'll see if there's any test case failed or some points were missed.
2022-04-12 12:24:26 -07:00

78 lines
2.3 KiB
Python

import time
import re
from collections import defaultdict
PYCLASSNAME_RE = re.compile(r"(.+?)\(")
def construct_actor_groups(actors):
"""actors is a dict from actor id to an actor or an
actor creation task The shared fields currently are
"actorClass", "actorId", and "state" """
actor_groups = _group_actors_by_python_class(actors)
stats_by_group = {
name: _get_actor_group_stats(group) for name, group in actor_groups.items()
}
summarized_actor_groups = {}
for name, group in actor_groups.items():
summarized_actor_groups[name] = {
"entries": group,
"summary": stats_by_group[name],
}
return summarized_actor_groups
def actor_classname_from_task_spec(task_spec):
return (
task_spec.get("functionDescriptor", {})
.get("pythonFunctionDescriptor", {})
.get("className", "Unknown actor class")
.split(".")[-1]
)
def actor_classname_from_func_descriptor(func_desc):
return (
func_desc.get("pythonFunctionDescriptor", {})
.get("className", "Unknown actor class")
.split(".")[-1]
)
def _group_actors_by_python_class(actors):
groups = defaultdict(list)
for actor in actors.values():
actor_class = actor["actorClass"]
groups[actor_class].append(actor)
return dict(groups)
def _get_actor_group_stats(group):
state_to_count = defaultdict(lambda: 0)
executed_tasks = 0
min_timestamp = None
num_timestamps = 0
sum_timestamps = 0
now = time.time() * 1000 # convert S -> MS
for actor in group:
state_to_count[actor["state"]] += 1
if "timestamp" in actor:
if not min_timestamp or actor["timestamp"] < min_timestamp:
min_timestamp = actor["timestamp"]
num_timestamps += 1
sum_timestamps += now - actor["timestamp"]
if "numExecutedTasks" in actor:
executed_tasks += actor["numExecutedTasks"]
if num_timestamps > 0:
avg_lifetime = int((sum_timestamps / num_timestamps) / 1000)
max_lifetime = int((now - min_timestamp) / 1000)
else:
avg_lifetime = 0
max_lifetime = 0
return {
"stateToCount": state_to_count,
"avgLifetime": avg_lifetime,
"maxLifetime": max_lifetime,
"numExecutedTasks": executed_tasks,
}