[State Observability] Use a table format by default (#26159)

NOTE: tabulate is copied/pasted to the codebase for table formatting.

This PR changes the default layout to be the table format for both summary and list APIs.
This commit is contained in:
SangBin Cho 2022-07-19 00:54:16 -07:00 committed by GitHub
parent 518b6b427b
commit adf24bfa97
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 2399 additions and 36 deletions

24
LICENSE
View file

@ -402,6 +402,30 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
---------------------------------------------------------------------------------------------------------------
Code in python/ray/_private/thirdparty/tabulate/tabulate.py is adapted from https://github.com/astanin/python-tabulate/blob/4892c6e9a79638c7897ccea68b602040da9cc7a7/tabulate.py
Copyright (c) 2011-2020 Sergey Astanin and contributors
Permission is hereby granted, free of charge, to any person obtaining
a copy of this software and associated documentation files (the
"Software"), to deal in the Software without restriction, including
without limitation the rights to use, copy, modify, merge, publish,
distribute, sublicense, and/or sell copies of the Software, and to
permit persons to whom the Software is furnished to do so, subject to
the following conditions:
The above copyright notice and this permission notice shall be
included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
--------------------------------------------------------------------------------
Code in python/ray/_private/thirdparty/dacite is adapted from https://github.com/konradhalas/dacite/blob/master/dacite

View file

@ -155,6 +155,7 @@ class StateAPIManager:
match = True
for filter_column, filter_predicate, filter_value in filters:
filterable_columns = state_dataclass.filterable_columns()
filter_column = filter_column.lower()
if filter_column not in filterable_columns:
raise ValueError(
f"The given filter column {filter_column} is not supported. "

View file

File diff suppressed because it is too large Load diff

View file

@ -408,8 +408,8 @@ class TaskSummaries:
total_tasks: int
# Total actor tasks
total_actor_tasks: int
# Total actor scheduling tasks
total_actor_scheduling_tasks: int
# Total scheduling actors
total_actor_scheduled: int
summary_by: str = "func_name"
@classmethod
@ -422,7 +422,7 @@ class TaskSummaries:
summary = {}
total_tasks = 0
total_actor_tasks = 0
total_actor_scheduling_tasks = 0
total_actor_scheduled = 0
for task in tasks:
key = task["func_or_class_name"]
@ -442,7 +442,7 @@ class TaskSummaries:
if type_enum == TaskType.NORMAL_TASK:
total_tasks += 1
elif type_enum == TaskType.ACTOR_CREATION_TASK:
total_actor_scheduling_tasks += 1
total_actor_scheduled += 1
elif type_enum == TaskType.ACTOR_TASK:
total_actor_tasks += 1
@ -450,7 +450,7 @@ class TaskSummaries:
summary=summary,
total_tasks=total_tasks,
total_actor_tasks=total_actor_tasks,
total_actor_scheduling_tasks=total_actor_scheduling_tasks,
total_actor_scheduled=total_actor_scheduled,
)

View file

@ -1,7 +1,8 @@
import json
import logging
from datetime import datetime
from enum import Enum, unique
from typing import List, Optional, Tuple, Union
from typing import Dict, List, Optional, Tuple, Union
import click
import yaml
@ -10,6 +11,7 @@ import ray
import ray._private.ray_constants as ray_constants
import ray._private.services as services
from ray._private.gcs_utils import GcsClient
from ray._private.thirdparty.tabulate.tabulate import tabulate
from ray.experimental.state.api import (
StateApiClient,
summarize_actors,
@ -143,18 +145,41 @@ def get_api_server_url() -> str:
return api_server_url
def get_table_output(state_data: List) -> str:
time = datetime.now()
header = "=" * 8 + f" List: {time} " + "=" * 8
headers = []
table = []
for data in state_data:
for key, val in data.items():
if isinstance(val, dict):
data[key] = yaml.dump(val, indent=2)
headers = sorted([key.upper() for key in data.keys()])
table.append([data[header.lower()] for header in headers])
return f"""
{header}
Stats:
------------------------------
Total: {len(state_data)}
Table:
------------------------------
{tabulate(table, headers=headers, showindex=True, tablefmt="plain", floatfmt=".3f")}
"""
def output_with_format(
state_data: Union[dict, list], format: AvailableFormat = AvailableFormat.DEFAULT
):
state_data: List, format: AvailableFormat = AvailableFormat.DEFAULT
) -> str:
# Default is yaml.
if format == AvailableFormat.DEFAULT:
return yaml.dump(state_data, indent=4, explicit_start=True)
return get_table_output(state_data)
if format == AvailableFormat.YAML:
return yaml.dump(state_data, indent=4, explicit_start=True)
elif format == AvailableFormat.JSON:
return json.dumps(state_data)
elif format == AvailableFormat.TABLE:
raise NotImplementedError("Table formatter is not implemented yet.")
return get_table_output(state_data)
else:
raise ValueError(
f"Unexpected format: {format}. "
@ -162,11 +187,101 @@ def output_with_format(
)
def format_summary_output(state_data: Dict, *, resource: StateResource) -> str:
if len(state_data) == 0:
return "No resource in the cluster"
# Parse the data.
cluster_data = state_data["cluster"]
summaries = cluster_data["summary"]
summary_by = cluster_data["summary_by"]
del cluster_data["summary_by"]
del cluster_data["summary"]
cluster_info_table = yaml.dump(cluster_data, indent=2)
# Create a table.
table = []
headers = []
for summary in summaries.values():
# Convert dict to yaml for better formatting.
for key, val in summary.items():
if isinstance(val, dict):
summary[key] = yaml.dump(val, indent=2)
headers = sorted([key.upper() for key in summary.keys()])
table.append([summary[header.lower()] for header in headers])
summary_table = tabulate(
table, headers=headers, showindex=True, tablefmt="plain", numalign="left"
)
time = datetime.now()
header = "=" * 8 + f" {resource.value.capitalize()} Summary: {time} " + "=" * 8
return f"""
{header}
Stats:
------------------------------------
{cluster_info_table}
Table (group by {summary_by}):
------------------------------------
{summary_table}
"""
def format_object_summary_output(state_data: Dict) -> str:
if len(state_data) == 0:
return "No resource in the cluster"
# Parse the data.
cluster_data = state_data["cluster"]
summaries = cluster_data["summary"]
summary_by = cluster_data["summary_by"]
del cluster_data["summary_by"]
del cluster_data["summary"]
cluster_info_table = yaml.dump(cluster_data, indent=2)
# Create a table per callsite.
tables = []
for callsite, summary in summaries.items():
# Convert dict to yaml for better formatting.
for key, val in summary.items():
if isinstance(val, dict):
summary[key] = yaml.dump(val, indent=2)
table = []
headers = sorted([key.upper() for key in summary.keys()])
table.append([summary[header.lower()] for header in headers])
table_for_callsite = tabulate(
table, headers=headers, showindex=True, numalign="left"
)
# Format callsite. | is a separator for ray callsite.
formatted_callsite = callsite.replace("|", "\n|")
tables.append(f"{formatted_callsite}\n{table_for_callsite}")
time = datetime.now()
header = "=" * 8 + f" Object Summary: {time} " + "=" * 8
table_string = "\n\n\n\n".join(tables)
return f"""
{header}
Stats:
------------------------------------
{cluster_info_table}
Table (group by {summary_by})
------------------------------------
{table_string}
"""
def format_get_api_output(
state_data: Union[dict, list],
id: str,
format: AvailableFormat = AvailableFormat.DEFAULT,
):
) -> str:
if len(state_data) == 0:
return f"Resource with id={id} not found in the cluster."
@ -175,13 +290,13 @@ def format_get_api_output(
def format_list_api_output(
state_data: Union[dict, list], *, format: AvailableFormat = AvailableFormat.DEFAULT
):
) -> str:
if len(state_data) == 0:
return "No resource in the cluster"
return output_with_format(state_data, format)
def _should_explain(format: AvailableFormat):
def _should_explain(format: AvailableFormat) -> bool:
# If the format is json or yaml, it should not print stats because
# users don't want additional strings.
return format == AvailableFormat.DEFAULT or format == AvailableFormat.TABLE
@ -381,13 +496,13 @@ def summary_state_cli_group(ctx):
@click.pass_context
def task_summary(ctx, timeout: float, address: str):
print(
output_with_format(
format_summary_output(
summarize_tasks(
address=address,
timeout=timeout,
_explain=True,
),
format=AvailableFormat.YAML,
resource=StateResource.TASKS,
)
)
@ -398,13 +513,13 @@ def task_summary(ctx, timeout: float, address: str):
@click.pass_context
def actor_summary(ctx, timeout: float, address: str):
print(
output_with_format(
format_summary_output(
summarize_actors(
address=address,
timeout=timeout,
_explain=True,
),
format=AvailableFormat.YAML,
resource=StateResource.ACTORS,
)
)
@ -415,12 +530,11 @@ def actor_summary(ctx, timeout: float, address: str):
@click.pass_context
def object_summary(ctx, timeout: float, address: str):
print(
output_with_format(
format_object_summary_output(
summarize_objects(
address=address,
timeout=timeout,
_explain=True,
),
format=AvailableFormat.YAML,
)
)

View file

@ -53,6 +53,7 @@ from ray.experimental.state.state_cli import (
get_api_server_url,
output_with_format,
summary_state_cli_group,
AvailableFormat,
)
logger = logging.getLogger(__name__)
@ -2104,7 +2105,7 @@ def logs(
print(f"Node ID: {node_id}")
elif node_ip:
print(f"Node IP: {node_ip}")
print(output_with_format(logs))
print(output_with_format(logs, format=AvailableFormat.YAML))
# If there's an unique match, print the log file.
if match_unique:

View file

@ -1,3 +1,4 @@
import time
import json
import sys
from dataclasses import dataclass
@ -1434,17 +1435,31 @@ def test_cli_apis_sanity_check(ray_start_cluster):
print(result.output)
return exit_code_correct and substring_matched
wait_for_condition(lambda: verify_output(cli_list, ["actors"], ["actor_id"]))
wait_for_condition(lambda: verify_output(cli_list, ["workers"], ["worker_id"]))
wait_for_condition(lambda: verify_output(cli_list, ["nodes"], ["node_id"]))
wait_for_condition(
lambda: verify_output(cli_list, ["placement-groups"], ["placement_group_id"])
lambda: verify_output(cli_list, ["actors"], ["Stats:", "Table:", "ACTOR_ID"])
)
wait_for_condition(
lambda: verify_output(cli_list, ["workers"], ["Stats:", "Table:", "WORKER_ID"])
)
wait_for_condition(
lambda: verify_output(cli_list, ["nodes"], ["Stats:", "Table:", "NODE_ID"])
)
wait_for_condition(
lambda: verify_output(
cli_list, ["placement-groups"], ["Stats:", "Table:", "PLACEMENT_GROUP_ID"]
)
)
wait_for_condition(lambda: verify_output(cli_list, ["jobs"], ["raysubmit"]))
wait_for_condition(lambda: verify_output(cli_list, ["tasks"], ["task_id"]))
wait_for_condition(lambda: verify_output(cli_list, ["objects"], ["object_id"]))
wait_for_condition(
lambda: verify_output(cli_list, ["runtime-envs"], ["runtime_env"])
lambda: verify_output(cli_list, ["tasks"], ["Stats:", "Table:", "TASK_ID"])
)
wait_for_condition(
lambda: verify_output(cli_list, ["objects"], ["Stats:", "Table:", "OBJECT_ID"])
)
wait_for_condition(
lambda: verify_output(
cli_list, ["runtime-envs"], ["Stats:", "Table:", "RUNTIME_ENV"]
)
)
# Test get node by id
@ -1953,12 +1968,17 @@ async def test_cli_format_print(state_api_manager):
)
# If the format is not json, it will raise an exception.
json.loads(format_list_api_output(result, format=AvailableFormat.JSON))
# Verify the default format is yaml
yaml.load(format_list_api_output(result), Loader=yaml.FullLoader)
# Test a table formatting.
output = format_list_api_output(result, format=AvailableFormat.TABLE)
assert "Table:" in output
assert "Stats:" in output
with pytest.raises(ValueError):
format_list_api_output(result, format="random_format")
with pytest.raises(NotImplementedError):
format_list_api_output(result, format=AvailableFormat.TABLE)
# Verify the default format.
output = format_list_api_output(result)
assert "Table:" in output
assert "Stats:" in output
def test_filter(shutdown_only):
@ -2102,11 +2122,14 @@ def test_detail(shutdown_only):
"""
runner = CliRunner()
result = runner.invoke(cli_list, ["actors", "--detail"])
print(result.output)
assert result.exit_code == 0
# The column for --detail should be in the output.
assert "serialized_runtime_env" in result.output
assert "test_detail" in result.output
assert "actor_id" in result.output
# Columns are upper case in the default formatting (table).
assert "serialized_runtime_env".upper() in result.output
assert "actor_id".upper() in result.output
def _try_state_query_expect_rate_limit(api_func, res_q, start_q=None):
@ -2145,7 +2168,7 @@ def test_state_api_rate_limit_with_failure(monkeypatch, shutdown_only):
"RAY_testing_asio_delay_us",
(
"NodeManagerService.grpc_server.GetTasksInfo=10000000:10000000,"
"NodeInfoGcsService.grpc_server.GetAllNodeInfo=1000000:1000000,"
"WorkerInfoGcsService.grpc_server.GetAllWorkerInfo=10000000:10000000,"
"ActorInfoGcsService.grpc_server.GetAllActorInfo=10000000:10000000"
),
)
@ -2183,7 +2206,7 @@ def test_state_api_rate_limit_with_failure(monkeypatch, shutdown_only):
mp.Process(
target=_try_state_query_expect_rate_limit,
args=(
list_nodes,
list_workers,
res_q,
start_q,
),
@ -2216,6 +2239,9 @@ def test_state_api_rate_limit_with_failure(monkeypatch, shutdown_only):
return started == 3
wait_for_condition(_wait_to_start)
# Wait 1 more second to make sure the API call happens after all
# process has a call.
time.sleep(1)
# Running another 1 should return error
with pytest.raises(RayStateApiException) as e:

View file

@ -125,7 +125,7 @@ async def test_api_manager_summary_tasks(state_api_manager):
assert data.total_tasks == 4
assert data.total_actor_tasks == 1
assert data.total_actor_scheduling_tasks == 0
assert data.total_actor_scheduled == 0
"""
Test if it can be correctly modified to a dictionary.