[State Observability] Support output formatting (#24847)

This PR supports various output formatting. By default, we support yaml format. But this can be changed depending on the UX research we will conduct in the future.

1cb0f4b51a5799e0360a66db01000000:
  actor_id: 1cb0f4b51a5799e0360a66db01000000
  class_name: A
  state: ALIVE
f90ba34fa27f79a808b4b5aa01000000:
  actor_id: f90ba34fa27f79a808b4b5aa01000000
  class_name: A
  state: ALIVE
Table format is not supported yet. We will support this once we enhance the API output (which I will create an initial API review soon).
This commit is contained in:
SangBin Cho 2022-05-19 07:00:40 +09:00 committed by GitHub
parent 448f5273bc
commit 2fa7a00588
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 144 additions and 21 deletions

View file

@ -1377,7 +1377,6 @@ def start_dashboard(
# Make sure the process can start.
minimal = not ray._private.utils.check_dashboard_dependencies_installed()
# Start the dashboard process.
dashboard_dir = "dashboard"
dashboard_filepath = os.path.join(RAY_PATH, dashboard_dir, "dashboard.py")
@ -1487,7 +1486,6 @@ def start_dashboard(
# If it is the minimal installation, the web url (dashboard url)
# shouldn't be configured because it doesn't start a server.
dashboard_url = ""
return dashboard_url, process_info
except Exception as e:
if require_dashboard:

View file

@ -1,6 +1,10 @@
from pprint import pprint
import click
import json
import yaml
from enum import Enum, unique
from typing import Union, List
import ray
import ray._private.services as services
@ -20,6 +24,41 @@ from ray.experimental.state.api import (
)
@unique
class AvailableFormat(Enum):
DEFAULT = "default"
JSON = "json"
YAML = "yaml"
TABLE = "table"
def _get_available_formats() -> List[str]:
"""Return the available formats in a list of string"""
return [format_enum.value for format_enum in AvailableFormat]
def get_state_api_output_to_print(
state_data: Union[dict, list], *, format: AvailableFormat = AvailableFormat.DEFAULT
):
if len(state_data) == 0:
return "No resource in the cluster"
# Default is yaml.
if format == AvailableFormat.DEFAULT:
return yaml.dump(state_data, indent=4, explicit_start=True)
if format == AvailableFormat.YAML:
return yaml.dump(state_data, indent=4, explicit_start=True)
elif format == AvailableFormat.JSON:
return json.dumps(state_data)
elif format == AvailableFormat.TABLE:
raise NotImplementedError("Table formatter is not implemented yet.")
else:
raise ValueError(
f"Unexpected format: {format}. "
f"Supported formatting: {_get_available_formats()}"
)
@click.group("list")
@click.pass_context
def list_state_cli_group(ctx):
@ -47,56 +86,114 @@ def list_state_cli_group(ctx):
@list_state_cli_group.command()
@click.option(
"--format", default="default", type=click.Choice(_get_available_formats())
)
@click.pass_context
def actors(ctx):
def actors(ctx, format: str):
url = ctx.obj["api_server_url"]
pprint(list_actors(url))
print(
get_state_api_output_to_print(
list_actors(api_server_url=url), format=AvailableFormat(format)
)
)
@list_state_cli_group.command()
@click.option(
"--format", default="default", type=click.Choice(_get_available_formats())
)
@click.pass_context
def placement_groups(ctx):
def placement_groups(ctx, format: str):
url = ctx.obj["api_server_url"]
pprint(list_placement_groups(url))
print(
get_state_api_output_to_print(
list_placement_groups(api_server_url=url),
format=AvailableFormat(format),
)
)
@list_state_cli_group.command()
@click.option(
"--format", default="default", type=click.Choice(_get_available_formats())
)
@click.pass_context
def nodes(ctx):
def nodes(ctx, format: str):
url = ctx.obj["api_server_url"]
pprint(list_nodes(url))
print(
get_state_api_output_to_print(
list_nodes(api_server_url=url), format=AvailableFormat(format)
)
)
@list_state_cli_group.command()
@click.option(
"--format", default="default", type=click.Choice(_get_available_formats())
)
@click.pass_context
def jobs(ctx):
def jobs(ctx, format: str):
url = ctx.obj["api_server_url"]
pprint(list_jobs(url))
print(
get_state_api_output_to_print(
list_jobs(api_server_url=url), format=AvailableFormat(format)
)
)
@list_state_cli_group.command()
@click.option(
"--format", default="default", type=click.Choice(_get_available_formats())
)
@click.pass_context
def workers(ctx):
def workers(ctx, format: str):
url = ctx.obj["api_server_url"]
pprint(list_workers(url))
print(
get_state_api_output_to_print(
list_workers(api_server_url=url), format=AvailableFormat(format)
)
)
@list_state_cli_group.command()
@click.option(
"--format", default="default", type=click.Choice(_get_available_formats())
)
@click.pass_context
def tasks(ctx):
def tasks(ctx, format: str):
url = ctx.obj["api_server_url"]
pprint(list_tasks(url))
print(
get_state_api_output_to_print(
list_tasks(api_server_url=url), format=AvailableFormat(format)
)
)
@list_state_cli_group.command()
@click.option(
"--format", default="default", type=click.Choice(_get_available_formats())
)
@click.pass_context
def objects(ctx):
def objects(ctx, format: str):
url = ctx.obj["api_server_url"]
pprint(list_objects(url))
print(
get_state_api_output_to_print(
list_objects(api_server_url=url), format=AvailableFormat(format)
)
)
@list_state_cli_group.command()
@click.option(
"--format", default="default", type=click.Choice(_get_available_formats())
)
@click.pass_context
def runtime_envs(ctx):
def runtime_envs(ctx, format: str):
url = ctx.obj["api_server_url"]
pprint(list_runtime_envs(url))
print(
get_state_api_output_to_print(
list_runtime_envs(api_server_url=url),
format=AvailableFormat(format),
)
)

View file

@ -1,6 +1,7 @@
import json
import sys
import pytest
import yaml
from typing import List
from dataclasses import fields
@ -67,7 +68,11 @@ from ray.experimental.state.state_manager import (
StateDataSourceClient,
StateSourceNetworkException,
)
from ray.experimental.state.state_cli import list_state_cli_group
from ray.experimental.state.state_cli import (
list_state_cli_group,
get_state_api_output_to_print,
AvailableFormat,
)
from ray.runtime_env import RuntimeEnv
from ray._private.test_utils import wait_for_condition
from ray.job_submission import JobSubmissionClient
@ -909,6 +914,29 @@ def test_limit(shutdown_only):
assert output == list_actors(limit=2)
@pytest.mark.asyncio
async def test_cli_format_print(state_api_manager):
data_source_client = state_api_manager.data_source_client
actor_id = b"1234"
data_source_client.get_all_actor_info.return_value = GetAllActorInfoReply(
actor_table_data=[generate_actor_data(actor_id), generate_actor_data(b"12345")]
)
result = await state_api_manager.list_actors(option=list_api_options())
# If the format is not yaml, it will raise an exception.
yaml.load(
get_state_api_output_to_print(result, format=AvailableFormat.YAML),
Loader=yaml.FullLoader,
)
# If the format is not json, it will raise an exception.
json.loads(get_state_api_output_to_print(result, format=AvailableFormat.JSON))
# Verify the default format is yaml
yaml.load(get_state_api_output_to_print(result), Loader=yaml.FullLoader)
with pytest.raises(ValueError):
get_state_api_output_to_print(result, format="random_format")
with pytest.raises(NotImplementedError):
get_state_api_output_to_print(result, format=AvailableFormat.TABLE)
if __name__ == "__main__":
import sys