[RFC] Ray memory improvements: format and summary (#14520)

* Better formatting when terminal size doesn't support tabular

* Summary now displays size of reference types

* Add unit conversion support (e.g. b, kb, mb, gb)

* Format and test

* Add ability to specify the number of sorted entries

* Linting

* Clean up group summary, move import defaultdict, comment num entries counter, n

* Format and lint
This commit is contained in:
Micah Yong 2021-03-28 21:03:06 -07:00 committed by GitHub
parent 374d166f6d
commit b3089b31f2
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 67 additions and 15 deletions

View file

@ -328,10 +328,28 @@ def construct_memory_table(workers_stats: List,
return memory_table
def track_reference_size(group):
"""Returns dictionary mapping reference type
to memory usage for a given memory table group."""
d = defaultdict(int)
table_name = {
"LOCAL_REFERENCE": "total_local_ref_count",
"PINNED_IN_MEMORY": "total_pinned_in_memory",
"USED_BY_PENDING_TASK": "total_used_by_pending_task",
"CAPTURED_IN_OBJECT": "total_captured_in_objects",
"ACTOR_HANDLE": "total_actor_handles"
}
for entry in group["entries"]:
d[table_name[entry["reference_type"]]] += entry["object_size"]
return d
def memory_summary(state,
group_by="NODE_ADDRESS",
sort_by="OBJECT_SIZE",
line_wrap=True) -> str:
line_wrap=True,
unit="B",
num_entries=None) -> str:
from ray.new_dashboard.modules.stats_collector.stats_collector_head\
import node_stats_to_dict
@ -340,6 +358,9 @@ def memory_summary(state,
size = shutil.get_terminal_size((80, 20)).columns
line_wrap_threshold = 137
# Unit conversions
units = {"B": 10**0, "KB": 10**3, "MB": 10**6, "GB": 10**9}
# Fetch core memory worker stats, store as a dictionary
core_worker_stats = []
for raylet in state.node_table():
@ -360,8 +381,8 @@ def memory_summary(state,
group_by, sort_by = group_by.name.lower().replace(
"_", " "), sort_by.name.lower().replace("_", " ")
summary_labels = [
"Mem Used by Objects", "Local References", "Pinned Count",
"Pending Tasks", "Captured in Objects", "Actor Handles"
"Mem Used by Objects", "Local References", "Pinned", "Pending Tasks",
"Captured in Objects", "Actor Handles"
]
summary_string = "{:<19} {:<16} {:<12} {:<13} {:<19} {:<13}\n"
@ -369,18 +390,28 @@ def memory_summary(state,
"IP Address", "PID", "Type", "Call Site", "Size", "Reference Type",
"Object Ref"
]
object_ref_string = "{:<8} {:<3} {:<4} {:<9} {:<4} {:<14} {:<10}\n"
object_ref_string = "{:<8} | {:<3} | {:<4} | {:<9} \
| {:<4} | {:<14} | {:<10}\n"
if size > line_wrap_threshold and line_wrap:
object_ref_string = "{:<12} {:<5} {:<6} {:<22} {:<6} {:<18} \
{:<56}\n"
mem += f"Grouping by {group_by}...\
Sorting by {sort_by}...\n\n\n\n"
Sorting by {sort_by}...\
Display {num_entries if num_entries is not None else 'all'}\
entries per group...\n\n\n"
for key, group in memory_table["group"].items():
# Group summary
summary = group["summary"]
summary["total_object_size"] = str(summary["total_object_size"]) + " B"
ref_size = track_reference_size(group)
for key in summary:
if key == "total_object_size":
summary[key] = str(summary[key] / units[unit]) + f" {unit}"
else:
summary[key] = str(
summary[key]) + f", ({ref_size[key] / units[unit]} {unit})"
mem += f"--- Summary for {group_by}: {key} ---\n"
mem += summary_string\
.format(*summary_labels)
@ -391,10 +422,13 @@ def memory_summary(state,
mem += f"--- Object references for {group_by}: {key} ---\n"
mem += object_ref_string\
.format(*object_ref_labels)
n = 1 # Counter for num entries per group
for entry in group["entries"]:
if num_entries is not None and n > num_entries:
break
entry["object_size"] = str(
entry["object_size"]
) + " B" if entry["object_size"] > -1 else "?"
entry["object_size"] /
units[unit]) + f" {unit}" if entry["object_size"] > -1 else "?"
num_lines = 1
if size > line_wrap_threshold and line_wrap:
call_site_length = 22
@ -403,6 +437,8 @@ def memory_summary(state,
0, len(entry["call_site"]), call_site_length)
]
num_lines = len(entry["call_site"])
else:
mem += "\n"
object_ref_values = [
entry["node_ip_address"], entry["pid"], entry["type"],
entry["call_site"], entry["object_size"],
@ -418,5 +454,6 @@ def memory_summary(state,
mem += object_ref_string\
.format(*row)
mem += "\n"
mem += "\n\n\n"
n += 1
mem += "\n\n"
return mem

View file

@ -20,8 +20,10 @@ def memory_summary(address=None,
redis_password=ray_constants.REDIS_DEFAULT_PASSWORD,
group_by="NODE_ADDRESS",
sort_by="OBJECT_SIZE",
units="B",
line_wrap=True,
stats_only=False):
stats_only=False,
num_entries=None):
from ray.new_dashboard.memory_utils import memory_summary
if not address:
address = services.get_ray_address_to_use_or_die()
@ -29,8 +31,8 @@ def memory_summary(address=None,
state._initialize_global_state(address, redis_password)
if stats_only:
return get_store_stats(state)
return (memory_summary(state, group_by, sort_by, line_wrap) +
get_store_stats(state))
return (memory_summary(state, group_by, sort_by, line_wrap, units,
num_entries) + get_store_stats(state))
def get_store_stats(state, node_manager_address=None, node_manager_port=None):

View file

@ -1365,6 +1365,12 @@ def timeline(address):
default="OBJECT_SIZE",
help="Sort object references in ascending order by a SortingType \
(e.g. PID, OBJECT_SIZE, or REFERENCE_TYPE).")
@click.option(
"--units",
type=click.Choice(["B", "KB", "MB", "GB"]),
default="B",
help="Specify unit metrics for displaying object sizes \
(e.g. B, KB, MB, GB).")
@click.option(
"--no-format",
is_flag=True,
@ -1377,14 +1383,21 @@ terminal width is less than 137 characters.")
is_flag=True,
default=False,
help="Display plasma store stats only.")
def memory(address, redis_password, group_by, sort_by, no_format, stats_only):
@click.option(
"--num-entries",
"--n",
type=int,
default=None,
help="Specify number of sorted entries per group.")
def memory(address, redis_password, group_by, sort_by, units, no_format,
stats_only, num_entries):
"""Print object references held in a Ray cluster."""
if not address:
address = services.get_ray_address_to_use_or_die()
time = datetime.now()
header = "=" * 8 + f" Object references status: {time} " + "=" * 8
mem_stats = memory_summary(address, redis_password, group_by, sort_by,
no_format, stats_only)
units, no_format, stats_only, num_entries)
print(f"{header}\n{mem_stats}")

View file

@ -267,7 +267,7 @@ def test_memory_used_output(ray_start_regular):
info = memory_summary(address)
print(info)
assert count(info, "Plasma memory usage 8 MiB") == 1, info
assert count(info, "8388861 B") == 2, info
assert count(info, "8388861.0 B") == 2, info
if __name__ == "__main__":