[Doc][Core][State Observability] Adding Python SDK doc and docstring (#26997)

1. Add doc for python SDK and docstrings on public SDK
2. Rename list -> ray_list and get -> ray_get for better naming 
3. Fix some typos 
4. Auto translate address to api server url.

Co-authored-by: SangBin Cho <rkooo567@gmail.com>
This commit is contained in:
Ricky Xu 2022-08-02 09:24:59 -07:00 committed by GitHub
parent d527c7b335
commit 82a24f9319
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 956 additions and 294 deletions

View file

@ -34,7 +34,6 @@ from ray._private.test_utils import (
)
from ray.dashboard import dashboard
from ray.dashboard.head import DashboardHead
from ray.dashboard.modules.dashboard_sdk import DEFAULT_DASHBOARD_ADDRESS
from ray.experimental.state.api import StateApiClient
from ray.experimental.state.common import ListApiOptions, StateResource
from ray.experimental.state.exception import ServerUnavailable
@ -964,7 +963,7 @@ def test_dashboard_requests_fail_on_missing_deps(ray_start_with_dashboard):
response = None
with pytest.raises(ServerUnavailable):
client = StateApiClient(address=DEFAULT_DASHBOARD_ADDRESS)
client = StateApiClient()
response = client.list(
StateResource.NODES, options=ListApiOptions(), raise_on_missing_output=False
)

View file

@ -19,10 +19,10 @@ State CLI allows users to access the state of various resources (e.g., actor, ta
.. click:: ray.experimental.state.state_cli:object_summary
:prog: ray summary objects
.. click:: ray.experimental.state.state_cli:list
.. click:: ray.experimental.state.state_cli:ray_list
:prog: ray list
.. click:: ray.experimental.state.state_cli:get
.. click:: ray.experimental.state.state_cli:ray_get
:prog: ray get
.. _ray-logs-api-doc:
@ -36,6 +36,44 @@ Note that only the logs from alive nodes are available through this API.
.. click:: ray.scripts.scripts:ray_logs
:prog: ray logs
State Python SDK
-----------------
State APIs are also exported as functions.
Summary APIs
~~~~~~~~~~~~
.. autofunction:: ray.experimental.state.api.summarize_actors
.. autofunction:: ray.experimental.state.api.summarize_objects
.. autofunction:: ray.experimental.state.api.summarize_tasks
List APIs
~~~~~~~~~~
.. autofunction:: ray.experimental.state.api.list_actors
.. autofunction:: ray.experimental.state.api.list_placement_groups
.. autofunction:: ray.experimental.state.api.list_nodes
.. autofunction:: ray.experimental.state.api.list_jobs
.. autofunction:: ray.experimental.state.api.list_workers
.. autofunction:: ray.experimental.state.api.list_tasks
.. autofunction:: ray.experimental.state.api.list_objects
.. autofunction:: ray.experimental.state.api.list_runtime_envs
Get APIs
~~~~~~~~~
.. autofunction:: ray.experimental.state.api.get_actor
.. autofunction:: ray.experimental.state.api.get_placement_group
.. autofunction:: ray.experimental.state.api.get_node
.. autofunction:: ray.experimental.state.api.get_worker
.. autofunction:: ray.experimental.state.api.get_task
.. autofunction:: ray.experimental.state.api.get_objects
Log APIs
~~~~~~~~
.. autofunction:: ray.experimental.state.api.list_logs
.. autofunction:: ray.experimental.state.api.get_log
.. _state-api-schema:
State APIs Schema

View file

@ -43,10 +43,19 @@ Run any workload. In this example, you will use the following script that runs 2
Now, let's see the summarized states of tasks. If it doesn't return the output immediately, retry the command.
.. code-block:: bash
.. tabbed:: CLI
.. code-block:: bash
ray summary tasks
.. tabbed:: Python SDK
.. code-block:: python
from ray.experimental.state.api import summarize_tasks
print(summarize_tasks())
.. code-block:: text
======== Tasks Summary: 2022-07-22 08:54:38.332537 ========
@ -65,10 +74,19 @@ Now, let's see the summarized states of tasks. If it doesn't return the output i
Let's list all actors.
.. code-block:: bash
.. tabbed:: CLI
.. code-block:: bash
ray list actors
.. tabbed:: Python SDK
.. code-block:: python
from ray.experimental.state.api import list_actors
print(list_actors())
.. code-block:: text
======== List: 2022-07-23 21:29:39.323925 ========
@ -84,9 +102,21 @@ Let's list all actors.
You can get the state of a single task using the get API.
.. code-block:: bash
.. tabbed:: CLI
.. code-block:: bash
# In this case, 31405554844820381c2f0f8501000000
ray get actors <ACTOR_ID>
.. tabbed:: Python SDK
.. code-block:: python
from ray.experimental.state.api import get_actor
# In this case, 31405554844820381c2f0f8501000000
print(get_actor(id=<ACTOR_ID>))
ray get actors <ACTOR_ID> # In this case, 31405554844820381c2f0f8501000000
.. code-block:: text
@ -103,11 +133,24 @@ You can get the state of a single task using the get API.
You can also access logs through ``ray logs`` API.
.. code-block:: bash
.. tabbed:: CLI
.. code-block:: bash
ray list actors
# In this case, ACTOR_ID is 31405554844820381c2f0f8501000000
ray logs --actor-id <ACTOR_ID>
.. tabbed:: Python SDK
.. code-block:: python
from ray.experimental.state.api import get_log
# In this case, ACTOR_ID is 31405554844820381c2f0f8501000000
for line in get_log(actor_id=<ACTOR_ID>):
print(line)
.. code-block:: text
--- Log has been truncated to last 1000 lines. Use `--tail` flag to toggle. ---
@ -134,105 +177,242 @@ It is recommended to start monitoring states through summary APIs first. When yo
(e.g., actors running for a long time, tasks that are not scheduled for a long time),
you can use ``list`` or ``get`` APIs to get more details for an individual abnormal resource.
E.g., Summarize all actors (e.g., number of alive actors, different actor classes, etc)
E.g., Summarize all actors
~~~~~~~~~~~~~~~~~~~~~~~~~~~
.. code-block:: bash
.. tabbed:: CLI
.. code-block:: bash
ray summary actors
E.g., Summarize all tasks (e.g., task count in different states, type of different tasks, etc)
.. tabbed:: Python SDK
.. code-block:: bash
.. code-block:: python
from ray.experimental.state.api import summarize_actors
print(summarize_actors())
E.g., Summarize all tasks
~~~~~~~~~~~~~~~~~~~~~~~~~
.. tabbed:: CLI
.. code-block:: bash
ray summary tasks
E.g., Summarize all objects (e.g., the total number of objects, size of all objects, etc)
.. tabbed:: Python SDK
.. code-block:: bash
.. code-block:: python
from ray.experimental.state.api import summarize_tasks
print(summarize_tasks())
E.g., Summarize all objects
~~~~~~~~~~~~~~~~~~~~~~~~~~~~
.. note::
By default, objects are summarized by callsite. However, callsite is not recorded by Ray by default.
To get callsite info, set env variable `RAY_record_ref_creation_sites=1` when starting the ray cluster
RAY_record_ref_creation_sites=1 ray start --head
.. tabbed:: CLI
.. code-block:: bash
# To get callsite info, set env variable `RAY_record_ref_creation_sites=1` when starting the ray cluster
# RAY_record_ref_creation_sites=1 ray start --head
ray summary objects
.. tabbed:: Python SDK
.. code-block:: python
from ray.experimental.state.api import summarize_objects
print(summarize_objects())
List
----
Get a list of resources, possible resources include:
- :ref:`Actors <actor-guide>`
- :ref:`Tasks <ray-remote-functions>`
- :ref:`Objects <objects-in-ray>`
- :ref:`Jobs <jobs-overview>`
- :ref:`Placement Groups <ray-placement-group-doc-ref>`
- Nodes (Ray worker nodes)
- Workers (Ray worker processes)
- :ref:`Runtime environments <runtime-environments>`
- :ref:`Actors <actor-guide>`, e.g., actor id, state, pid, death_cause. (:ref:`output schema <state-api-schema-actor>`)
- :ref:`Tasks <ray-remote-functions>`, e.g., name, scheduling state, type, runtime env info (:ref:`output schema <state-api-schema-task>`)
- :ref:`Objects <objects-in-ray>`, e.g., object id, callsites, reference types. (:ref:`output schema <state-api-schema-obj>`)
- :ref:`Jobs <jobs-overview>`, e.g., start/end time, entrypoint, status. (:ref:`output schema <state-api-schema-job>`)
- :ref:`Placement Groups <ray-placement-group-doc-ref>`, e.g., name, bundles, stats. (:ref:`output schema <state-api-schema-pg>`)
- Nodes (Ray worker nodes), e.g., node id, node ip, node state. (:ref:`output schema <state-api-schema-node>`)
- Workers (Ray worker processes), e.g., worker id, type, exit type and details. (:ref:`output schema <state-api-schema-worker>`)
- :ref:`Runtime environments <runtime-environments>`, e.g., runtime envs, creation time, nodes (:ref:`output schema <state-api-schema-runtime-env>`)
E.g., List all nodes
~~~~~~~~~~~~~~~~~~~~~
.. code-block:: bash
.. tabbed:: CLI
.. code-block:: bash
ray list nodes
E.g., List all placement groups
.. tabbed:: Python SDK
.. code-block:: bash
.. code-block:: python
from ray.experimental.state.api import list_nodes()
list_nodes()
E.g., List all placement groups
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
.. tabbed:: CLI
.. code-block:: bash
ray list placement-groups
You can list resources with one or multiple filters.
.. tabbed:: Python SDK
.. code-block:: python
from ray.experimental.state.api import list_placement_groups
list_placement_groups()
E.g., List local referenced objects created by a process
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
.. code-block:: bash
.. tip:: You can list resources with one or multiple filters: using `--filter` or `-f`
ray list objects -f pid=12345 -f reference_type=LOCAL_REFERENCE
.. tabbed:: CLI
.. code-block:: bash
ray list objects -f pid=<PID> -f reference_type=LOCAL_REFERENCE
.. tabbed:: Python SDK
.. code-block:: python
from ray.experimental.state.api import list_objects
list_objects(filters=[("pid", "=", <PID>), ("reference_type", "=", "LOCAL_REFERENCE")])
E.g., List alive actors
~~~~~~~~~~~~~~~~~~~~~~~~~~~
.. code-block:: bash
.. tabbed:: CLI
.. code-block:: bash
ray list actors -f state=ALIVE
E.g., List running tasks
.. tabbed:: Python SDK
.. code-block:: bash
.. code-block:: python
from ray.experimental.state.api import list_actors
list_actors(filters=[("state", "=", "ALIVE")])
E.g., List running tasks
~~~~~~~~~~~~~~~~~~~~~~~~~~~
.. tabbed:: CLI
.. code-block:: bash
ray list tasks -f scheduling_state=RUNNING
.. tabbed:: Python SDK
.. code-block:: python
from ray.experimental.state.api import list_tasks
list_tasks(filters=[("scheduling_state", "=", "RUNNING")])
E.g., List non-running tasks
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
.. code-block:: bash
.. tabbed:: CLI
ray list tasks -f shceduling_state!=RUNNING
.. code-block:: bash
ray list tasks -f scheduling_state!=RUNNING
.. tabbed:: Python SDK
.. code-block:: python
from ray.experimental.state.api import list_tasks
list_tasks(filters=[("scheduling_state", "!=", "RUNNING")])
E.g., List running tasks that have a name func
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
.. code-block:: bash
.. tabbed:: CLI
ray list tasks -f scheduling_state=RUNNING -f name=func
.. code-block:: bash
E.g., List tasks with more details. When ``--detail`` is specified, the API can query more data sources to obtain state information in details.
ray list tasks -f scheduling_state=RUNNING -f name="task_running_300_seconds()"
.. code-block:: bash
.. tabbed:: Python SDK
.. code-block:: python
from ray.experimental.state.api import list_tasks
list_tasks(filters=[("scheduling_state", "=", "RUNNING"), ("name", "=", "task_running_300_seconds()")])
E.g., List tasks with more details
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
.. tip:: When ``--detail`` is specified, the API can query more data sources to obtain state information in details.
.. tabbed:: CLI
.. code-block:: bash
ray list tasks --detail
.. tabbed:: Python SDK
.. code-block:: python
from ray.experimental.state.api import list_tasks
list_tasks(detail=True)
Get
---
E.g., Get a task info
~~~~~~~~~~~~~~~~~~~~~~~
.. code-block:: bash
.. tabbed:: CLI
ray get tasks <worker_id>
.. code-block:: bash
ray get tasks <TASK_ID>
.. tabbed:: Python SDK
.. code-block:: python
from ray.experimental.state.api import get_task
get_task(id=<TASK_ID>)
E.g., Get a node info
~~~~~~~~~~~~~~~~~~~~~~
.. code-block:: bash
.. tabbed:: CLI
ray get nodes <node_id>
.. code-block:: bash
ray get nodes <NODE_ID>
.. tabbed:: Python SDK
.. code-block:: python
from ray.experimental.state.api import get_node
get_node(id=<NODE_ID>)
Logs
----
@ -240,37 +420,88 @@ Logs
State API also allows you to conveniently access ray logs. Note that you cannot access the logs from a dead node.
By default, the API prints log from a head node.
E.g., Get all retrievable log file names
E.g., Get all retrievable log file names from a head node
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
.. code-block:: bash
.. tabbed:: CLI
.. code-block:: bash
ray logs
E.g., Get a particular log file from a node
.. tabbed:: Python SDK
.. code-block:: bash
.. code-block:: python
# You could get the node id / node ip from `ray list nodes`
ray logs gcs_server.out --node-id <XYZ>
from ray.experimental.state.api import list_logs
# `ray logs` by default print logs from a head node.
# So in order to list the same logs, you should provide the head node id.
# You could get the node id / node ip from `ray list nodes`
list_logs(node_id=<HEAD_NODE_ID>)
E.g., Get a particular log file from a node
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
.. tabbed:: CLI
.. code-block:: bash
# You could get the node id / node ip from `ray list nodes`
ray logs gcs_server.out --node-id <NODE_ID>
.. tabbed:: Python SDK
.. code-block:: python
from ray.experimental.state.api import get_log
# Node IP could be retrieved from list_nodes() or ray.nodes()
for line in get_log(filename="gcs_server.out", node_id=<NODE_ID>):
print(line)
E.g., Stream a log file from a node
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
.. code-block:: bash
.. tabbed:: CLI
ray logs -f raylet.out --node-ip 172.31.47.143
.. code-block:: bash
E.g., Stream actor log with actor id
# You could get the node id / node ip from `ray list nodes`
ray logs -f raylet.out --node-ip <NODE_IP>
.. code-block:: bash
.. tabbed:: Python SDK
.. code-block:: python
from ray.experimental.state.api import get_log
# Node IP could be retrieved from list_nodes() or ray.nodes()
for line in get_log(filename="raylet.out", node_ip=<NODE_IP>, follow=True):
print(line)
# You could use ray list actors to get the actor ids
ray logs --actor-id=<XXX>
E.g., Stream log from a pid
~~~~~~~~~~~~~~~~~~~~~~~~~~~
.. code-block:: bash
.. tabbed:: CLI
ray logs --pid=<XXX> --follow
.. code-block:: bash
ray logs --pid=<PID> --follow
.. tabbed:: Python SDK
.. code-block:: python
from ray.experimental.state.api import get_log
# Node IP could be retrieved from list_nodes() or ray.nodes()
# You could get the pid of the worker running the actor easily when output
# of worker being directed to the driver (default)
# The loop will block with `follow=True`
for line in get_log(pid=<PID>, node_ip=<NODE_IP>, follow=True):
print(line)
Failure Semantics
-----------------

View file

@ -8,7 +8,6 @@ from typing import Any, Dict, Generator, List, Optional, Tuple, Union
import requests
import ray
from ray.dashboard.modules.dashboard_sdk import SubmissionClient
from ray.experimental.state.common import (
DEFAULT_LIMIT,
@ -17,7 +16,6 @@ from ray.experimental.state.common import (
ActorState,
GetApiOptions,
GetLogOptions,
JobState,
ListApiOptions,
NodeState,
ObjectState,
@ -29,6 +27,7 @@ from ray.experimental.state.common import (
SupportedFilterType,
TaskState,
WorkerState,
ray_address_to_api_server_url,
)
from ray.experimental.state.exception import RayStateApiException, ServerUnavailable
@ -92,7 +91,7 @@ If you have any feedback, you could do so at either way as below:
Usage:
1. [Recommended] With StateApiClient:
```
client = StateApiClient(address="localhost:8265")
client = StateApiClient(address="auto")
data = client.list(StateResource.NODES)
...
```
@ -102,7 +101,7 @@ Usage:
invocations of listing are used, it is better to reuse the `StateApiClient`
as suggested above.
```
data = list_nodes(address="localhost:8265")
data = list_nodes(address="auto")
```
"""
@ -119,8 +118,9 @@ class StateApiClient(SubmissionClient):
"""Initialize a StateApiClient and check the connection to the cluster.
Args:
address: The address of Ray API server. If not provided,
it will be configured automatically from querying the GCS server.
address: Ray bootstrap address. E.g. `127.0.0.0:6379`, `auto`.
If not provided, it will be detected automatically from any running
local ray cluster.
cookies: Cookies to use when sending requests to the HTTP job server.
headers: Headers to use when sending requests to the HTTP job server, used
for cases like authentication to a remote cluster.
@ -132,8 +132,12 @@ class StateApiClient(SubmissionClient):
)
if not headers:
headers = {"Content-Type": "application/json"}
# Resolve API server URL
api_server_url = ray_address_to_api_server_url(address)
super().__init__(
address,
address=api_server_url,
create_cluster_if_needed=False,
headers=headers,
cookies=cookies,
@ -155,7 +159,7 @@ class StateApiClient(SubmissionClient):
for filter in options.filters:
if len(filter) != 3:
raise ValueError(
f"The given filter has incorrect intput type, {filter}. "
f"The given filter has incorrect input type, {filter}. "
"Provide (key, predicate, value) tuples."
)
filter_k, filter_predicate, filter_val = filter
@ -176,7 +180,7 @@ class StateApiClient(SubmissionClient):
params: Dict,
timeout: float,
_explain: bool = False,
):
) -> Dict:
with warnings_on_slow_request(
address=self._address, endpoint=endpoint, timeout=timeout, explain=_explain
):
@ -249,7 +253,7 @@ class StateApiClient(SubmissionClient):
latency or failed query information.
Returns:
None if not found, and found:
None if not found, and if found, a dictionarified:
- ActorState for actors
- PlacementGroupState for placement groups
- NodeState for nodes
@ -387,8 +391,8 @@ class StateApiClient(SubmissionClient):
"""Raise an exception when the API resopnse contains a missing output.
Output can be missing if (1) Failures on some of data source queries (e.g.,
`ray list tasks` queries all raylets, and if some of quries fail, it will
contain missing output. If all quries fail, it will just fail). (2) Data
`ray list tasks` queries all raylets, and if some of queries fail, it will
contain missing output. If all queries fail, it will just fail). (2) Data
is truncated because the output is too large.
Args:
@ -511,17 +515,30 @@ class StateApiClient(SubmissionClient):
return summary_api_response["result"]["node_id_to_summary"]
"""
Convenient Methods for get_<RESOURCE> by id
"""
def get_actor(
id: str,
address: Optional[str] = None,
timeout: int = DEFAULT_RPC_TIMEOUT,
_explain: bool = False,
) -> Optional[ActorState]:
) -> Optional[Dict]:
"""Get an actor by id.
Args:
id: Id of the actor
address: Ray bootstrap address, could be `auto`, `localhost:6379`.
If None, it will be resolved automatically from an initialized ray.
timeout: Max timeout value for the state API requests made.
_explain: Print the API information such as API latency or
failed query information.
Returns:
None if actor not found, or dictionarified
:ref:`ActorState <state-api-schema-actor>`.
Raises:
Exceptions: :ref:`RayStateApiException <state-api-exceptions>` if the CLI
failed to query the data.
"""
return StateApiClient(address=address).get(
StateResource.ACTORS, id, GetApiOptions(timeout=timeout), _explain=_explain
)
@ -533,7 +550,7 @@ def get_job(
address: Optional[str] = None,
timeout: int = DEFAULT_RPC_TIMEOUT,
_explain: bool = False,
) -> Optional[JobState]:
) -> Optional[Dict]:
raise NotImplementedError("Get Job by id is currently not supported")
@ -542,7 +559,25 @@ def get_placement_group(
address: Optional[str] = None,
timeout: int = DEFAULT_RPC_TIMEOUT,
_explain: bool = False,
) -> Optional[PlacementGroupState]:
) -> Optional[Dict]:
"""Get a placement group by id.
Args:
id: Id of the placement group
address: Ray bootstrap address, could be `auto`, `localhost:6379`.
If None, it will be resolved automatically from an initialized ray.
timeout: Max timeout value for the state APIs requests made.
_explain: Print the API information such as API latency or
failed query information.
Returns:
None if actor not found, or dictionarified
:ref:`PlacementGroupState <state-api-schema-pg>`.
Raises:
Exceptions: :ref:`RayStateApiException <state-api-exceptions>` if the CLI
failed to query the data.
"""
return StateApiClient(address=address).get(
StateResource.PLACEMENT_GROUPS,
id,
@ -556,7 +591,25 @@ def get_node(
address: Optional[str] = None,
timeout: int = DEFAULT_RPC_TIMEOUT,
_explain: bool = False,
) -> Optional[NodeState]:
) -> Optional[Dict]:
"""Get a node by id.
Args:
id: Id of the node.
address: Ray bootstrap address, could be `auto`, `localhost:6379`.
If None, it will be resolved automatically from an initialized ray.
timeout: Max timeout value for the state APIs requests made.
_explain: Print the API information such as API latency or
failed query information.
Returns:
None if actor not found, or dictionarified
:ref:`NodeState <state-api-schema-node>`.
Raises:
Exceptions: :ref:`RayStateApiException <state-api-exceptions>`
if the CLI is failed to query the data.
"""
return StateApiClient(address=address).get(
StateResource.NODES,
id,
@ -570,7 +623,25 @@ def get_worker(
address: Optional[str] = None,
timeout: int = DEFAULT_RPC_TIMEOUT,
_explain: bool = False,
) -> Optional[WorkerState]:
) -> Optional[Dict]:
"""Get a worker by id.
Args:
id: Id of the worker
address: Ray bootstrap address, could be `auto`, `localhost:6379`.
If None, it will be resolved automatically from an initialized ray.
timeout: Max timeout value for the state APIs requests made.
_explain: Print the API information such as API latency or
failed query information.
Returns:
None if actor not found, or dictionarified
:ref:`WorkerState <state-api-schema-worker>`.
Raises:
Exceptions: :ref:`RayStateApiException <state-api-exceptions>` if the CLI
failed to query the data.
"""
return StateApiClient(address=address).get(
StateResource.WORKERS,
id,
@ -584,7 +655,25 @@ def get_task(
address: Optional[str] = None,
timeout: int = DEFAULT_RPC_TIMEOUT,
_explain: bool = False,
) -> Optional[TaskState]:
) -> Optional[Dict]:
"""Get a task by id.
Args:
id: Id of the task
address: Ray bootstrap address, could be `auto`, `localhost:6379`.
If None, it will be resolved automatically from an initialized ray.
timeout: Max timeout value for the state APIs requests made.
_explain: Print the API information such as API latency or
failed query information.
Returns:
None if actor not found, or dictionarified
:ref:`TaskState <state-api-schema-task>`.
Raises:
Exceptions: :ref:`RayStateApiException <state-api-exceptions>` if the CLI
failed to query the data.
"""
return StateApiClient(address=address).get(
StateResource.TASKS,
id,
@ -598,7 +687,27 @@ def get_objects(
address: Optional[str] = None,
timeout: int = DEFAULT_RPC_TIMEOUT,
_explain: bool = False,
) -> List[ObjectState]:
) -> List[Dict]:
"""Get objects by id.
There could be more than 1 entry returned since an object could be
referenced at different places.
Args:
id: Id of the object
address: Ray bootstrap address, could be `auto`, `localhost:6379`.
If None, it will be resolved automatically from an initialized ray.
timeout: Max timeout value for the state APIs requests made.
_explain: Print the API information such as API latency or
failed query information.
Returns:
List of dictionarified :ref:`ObjectState <state-api-schema-obj>`.
Raises:
Exceptions: :ref:`RayStateApiException <state-api-exceptions>` if the CLI
failed to query the data.
"""
return StateApiClient(address=address).get(
StateResource.OBJECTS,
id,
@ -607,20 +716,6 @@ def get_objects(
)
"""
Convenient methods for list_<RESOURCE>
Supported arguments to the below methods, see `ListApiOptions`:
address: The IP address and port of the head node. Defaults to
http://localhost:8265.
filters: Optional list of filter key-value pair.
timeout: Time for the request.
limit: Limit of entries in the result
detail: If True, APIs will return more detailed output.
In this case, it can query more sources (more expensive).
"""
def list_actors(
address: Optional[str] = None,
filters: Optional[List[Tuple[str, PredicateType, SupportedFilterType]]] = None,
@ -629,7 +724,32 @@ def list_actors(
detail: bool = False,
raise_on_missing_output: bool = True,
_explain: bool = False,
):
) -> List[Dict]:
"""List actors in the cluster.
Args:
address: Ray bootstrap address, could be `auto`, `localhost:6379`.
If None, it will be resolved automatically from an initialized ray.
filters: List of tuples of filter key, predicate (=, or !=), and
the filter value. E.g., `("id", "=", "abcd")`
limit: Max number of entries returned by the state backend.
timeout: Max timeout value for the state APIs requests made.
detail: When True, more details info (specified in `ActorState`)
will be queried and returned. See
:ref:`ActorState <state-api-schema-actor>`.
raise_on_missing_output: When True, exceptions will be raised if
there is missing data due to truncation/data source unavailable.
_explain: Print the API information such as API latency or
failed query information.
Returns:
List of dictionarified
:ref:`ActorState <state-api-schema-actor>`.
Raises:
Exceptions: :ref:`RayStateApiException <state-api-exceptions>` if the CLI
failed to query the data.
"""
return StateApiClient(address=address).list(
StateResource.ACTORS,
options=ListApiOptions(
@ -651,7 +771,32 @@ def list_placement_groups(
detail: bool = False,
raise_on_missing_output: bool = True,
_explain: bool = False,
):
) -> List[Dict]:
"""List placement groups in the cluster.
Args:
address: Ray bootstrap address, could be `auto`, `localhost:6379`.
If None, it will be resolved automatically from an initialized ray.
filters: List of tuples of filter key, predicate (=, or !=), and
the filter value. E.g., `("state", "=", "abcd")`
limit: Max number of entries returned by the state backend.
timeout: Max timeout value for the state APIs requests made.
detail: When True, more details info (specified in `PlacementGroupState`)
will be queried and returned. See
:ref:`PlacementGroupState <state-api-schema-pg>`.
raise_on_missing_output: When True, exceptions will be raised if
there is missing data due to truncation/data source unavailable.
_explain: Print the API information such as API latency or
failed query information.
Returns:
List of dictionarified
:ref:`PlacementGroupState <state-api-schema-pg>`.
Raises:
Exceptions: :ref:`RayStateApiException <state-api-exceptions>` if the CLI
failed to query the data.
"""
return StateApiClient(address=address).list(
StateResource.PLACEMENT_GROUPS,
options=ListApiOptions(
@ -670,7 +815,32 @@ def list_nodes(
detail: bool = False,
raise_on_missing_output: bool = True,
_explain: bool = False,
):
) -> List[Dict]:
"""List nodes in the cluster.
Args:
address: Ray bootstrap address, could be `auto`, `localhost:6379`.
If None, it will be resolved automatically from an initialized ray.
filters: List of tuples of filter key, predicate (=, or !=), and
the filter value. E.g., `("node_name", "=", "abcd")`
limit: Max number of entries returned by the state backend.
timeout: Max timeout value for the state APIs requests made.
detail: When True, more details info (specified in `NodeState`)
will be queried and returned. See
:ref:`NodeState <state-api-schema-node>`.
raise_on_missing_output: When True, exceptions will be raised if
there is missing data due to truncation/data source unavailable.
_explain: Print the API information such as API latency or
failed query information.
Returns:
List of dictionarified
:ref:`NodeState <state-api-schema-node>`.
Raises:
Exceptions: :ref:`RayStateApiException <state-api-exceptions>`
if the CLI failed to query the data.
"""
return StateApiClient(address=address).list(
StateResource.NODES,
options=ListApiOptions(
@ -689,7 +859,32 @@ def list_jobs(
detail: bool = False,
raise_on_missing_output: bool = True,
_explain: bool = False,
):
) -> List[Dict]:
"""List jobs submitted to the cluster by :ref: `ray job submission <jobs-overview>`.
Args:
address: Ray bootstrap address, could be `auto`, `localhost:6379`.
If None, it will be resolved automatically from an initialized ray.
filters: List of tuples of filter key, predicate (=, or !=), and
the filter value. E.g., `("status", "=", "abcd")`
limit: Max number of entries returned by the state backend.
timeout: Max timeout value for the state APIs requests made.
detail: When True, more details info (specified in `JobState`)
will be queried and returned. See
:ref:`JobState <state-api-schema-job>`.
raise_on_missing_output: When True, exceptions will be raised if
there is missing data due to truncation/data source unavailable.
_explain: Print the API information such as API latency or
failed query information.
Returns:
List of dictionarified
:ref:`JobState <state-api-schema-job>`.
Raises:
Exceptions: :ref:`RayStateApiException <state-api-exceptions>` if the CLI
failed to query the data.
"""
return StateApiClient(address=address).list(
StateResource.JOBS,
options=ListApiOptions(
@ -708,7 +903,32 @@ def list_workers(
detail: bool = False,
raise_on_missing_output: bool = True,
_explain: bool = False,
):
) -> List[Dict]:
"""List workers in the cluster.
Args:
address: Ray bootstrap address, could be `auto`, `localhost:6379`.
If None, it will be resolved automatically from an initialized ray.
filters: List of tuples of filter key, predicate (=, or !=), and
the filter value. E.g., `("is_alive", "=", "True")`
limit: Max number of entries returned by the state backend.
timeout: Max timeout value for the state APIs requests made.
detail: When True, more details info (specified in `WorkerState`)
will be queried and returned. See
:ref:`WorkerState <state-api-schema-worker>`.
raise_on_missing_output: When True, exceptions will be raised if
there is missing data due to truncation/data source unavailable.
_explain: Print the API information such as API latency or
failed query information.
Returns:
List of dictionarified
:ref:`WorkerState <state-api-schema-worker>`.
Raises:
Exceptions: :ref:`RayStateApiException <state-api-exceptions>` if the CLI
failed to query the data.
"""
return StateApiClient(address=address).list(
StateResource.WORKERS,
options=ListApiOptions(
@ -727,7 +947,32 @@ def list_tasks(
detail: bool = False,
raise_on_missing_output: bool = True,
_explain: bool = False,
):
) -> List[Dict]:
"""List tasks in the cluster.
Args:
address: Ray bootstrap address, could be `auto`, `localhost:6379`.
If None, it will be resolved automatically from an initialized ray.
filters: List of tuples of filter key, predicate (=, or !=), and
the filter value. E.g., `("is_alive", "=", "True")`
limit: Max number of entries returned by the state backend.
timeout: Max timeout value for the state APIs requests made.
detail: When True, more details info (specified in `WorkerState`)
will be queried and returned. See
:ref:`WorkerState <state-api-schema-worker>`.
raise_on_missing_output: When True, exceptions will be raised if
there is missing data due to truncation/data source unavailable.
_explain: Print the API information such as API latency or
failed query information.
Returns:
List of dictionarified
:ref:`WorkerState <state-api-schema-worker>`.
Raises:
Exceptions: :ref:`RayStateApiException <state-api-exceptions>` if the CLI
failed to query the data.
"""
return StateApiClient(address=address).list(
StateResource.TASKS,
options=ListApiOptions(
@ -746,7 +991,32 @@ def list_objects(
detail: bool = False,
raise_on_missing_output: bool = True,
_explain: bool = False,
):
) -> List[Dict]:
"""List objects in the cluster.
Args:
address: Ray bootstrap address, could be `auto`, `localhost:6379`.
If None, it will be resolved automatically from an initialized ray.
filters: List of tuples of filter key, predicate (=, or !=), and
the filter value. E.g., `("ip", "=", "0.0.0.0")`
limit: Max number of entries returned by the state backend.
timeout: Max timeout value for the state APIs requests made.
detail: When True, more details info (specified in `ObjectState`)
will be queried and returned. See
:ref:`ObjectState <state-api-schema-obj>`.
raise_on_missing_output: When True, exceptions will be raised if
there is missing data due to truncation/data source unavailable.
_explain: Print the API information such as API latency or
failed query information.
Returns:
List of dictionarified
:ref:`ObjectState <state-api-schema-obj>`.
Raises:
Exceptions: :ref:`RayStateApiException <state-api-exceptions>` if the CLI
failed to query the data.
"""
return StateApiClient(address=address).list(
StateResource.OBJECTS,
options=ListApiOptions(
@ -765,7 +1035,32 @@ def list_runtime_envs(
detail: bool = False,
raise_on_missing_output: bool = True,
_explain: bool = False,
):
) -> List[Dict]:
"""List runtime environments in the cluster.
Args:
address: Ray bootstrap address, could be `auto`, `localhost:6379`.
If None, it will be resolved automatically from an initialized ray.
filters: List of tuples of filter key, predicate (=, or !=), and
the filter value. E.g., `("node_id", "=", "abcdef")`
limit: Max number of entries returned by the state backend.
timeout: Max timeout value for the state APIs requests made.
detail: When True, more details info (specified in `RuntimeEnvState`)
will be queried and returned. See
:ref:`RuntimeEnvState <state-api-schema-runtime-env>`.
raise_on_missing_output: When True, exceptions will be raised if
there is missing data due to truncation/data source unavailable.
_explain: Print the API information such as API latency or
failed query information.
Returns:
List of dictionarified
:ref:`RuntimeEnvState <state-api-schema-runtime-env>`.
Raises:
Exceptions: :ref:`RayStateApiException <state-api-exceptions>` if the CLI
failed to query the data.
"""
return StateApiClient(address=address).list(
StateResource.RUNTIME_ENVS,
options=ListApiOptions(
@ -782,7 +1077,7 @@ Log APIs
def get_log(
api_server_url: str = None,
address: Optional[str] = None,
node_id: Optional[str] = None,
node_ip: Optional[str] = None,
filename: Optional[str] = None,
@ -794,12 +1089,44 @@ def get_log(
timeout: int = DEFAULT_RPC_TIMEOUT,
_interval: Optional[float] = None,
) -> Generator[str, None, None]:
if api_server_url is None:
assert ray.is_initialized()
api_server_url = (
f"http://{ray._private.worker.global_worker.node.address_info['webui_url']}"
)
"""Retrieve log file based on file name or some entities ids (pid, actor id, task id).
Examples:
>>> import ray
>>> from ray.experimental.state.api import get_log # doctest: +SKIP
# To connect to an existing ray instance if there is
>>> ray.init("auto") # doctest: +SKIP
# Node IP could be retrieved from list_nodes() or ray.nodes()
>>> node_ip = "172.31.47.143" # doctest: +SKIP
>>> filename = "gcs_server.out" # doctest: +SKIP
>>> for l in get_log(filename=filename, node_ip=node_ip): # doctest: +SKIP
>>> print(l) # doctest: +SKIP
Args:
address: Ray bootstrap address, could be `auto`, `localhost:6379`.
If not specified, it will be retrieved from the initialized ray cluster.
node_id: Id of the node containing the logs .
node_ip: Ip of the node containing the logs. (At least one of the node_id and
node_ip have to be supplied when identifying a node).
filename: Name of the file (relative to the ray log directory) to be retrieved.
actor_id: Id of the actor if getting logs from an actor.
task_id: Id of the task if getting logs generated by a task.
pid: PID of the worker if getting logs generated by a worker.
follow: When set to True, logs will be streamed and followed.
tail: Number of lines to get from the end of the log file. Set to -1 for getting
the entire log.
timeout: Max timeout for requests made when getting the logs.
_interval: The interval in secs to print new logs when `follow=True`.
Return:
A Generator of log line, None for SendType and ReturnType.
Raises:
Exceptions: :ref:`RayStateApiException <state-api-exceptions>` if the CLI
failed to query the data.
"""
api_server_url = ray_address_to_api_server_url(address)
media_type = "stream" if follow else "file"
options = GetLogOptions(
@ -841,17 +1168,36 @@ def get_log(
def list_logs(
api_server_url: str = None,
node_id: str = None,
node_ip: str = None,
glob_filter: str = None,
address: Optional[str] = None,
node_id: Optional[str] = None,
node_ip: Optional[str] = None,
glob_filter: Optional[str] = None,
timeout: int = DEFAULT_RPC_TIMEOUT,
) -> Dict[str, List[str]]:
if api_server_url is None:
assert ray.is_initialized()
api_server_url = (
f"http://{ray._private.worker.global_worker.node.address_info['webui_url']}"
)
"""Listing log files available.
Args:
address: Ray bootstrap address, could be `auto`, `localhost:6379`.
If not specified, it will be retrieved from the initialized ray cluster.
node_id: Id of the node containing the logs .
node_ip: Ip of the node containing the logs. (At least one of the node_id and
node_ip have to be supplied when identifying a node).
glob_filter: Name of the file (relative to the ray log directory) to be
retrieved. E.g. `glob_filter="*worker*"` for all worker logs.
actor_id: Id of the actor if getting logs from an actor.
timeout: Max timeout for requests made when getting the logs.
_interval: The interval in secs to print new logs when `follow=True`.
Return:
A dictionary where the keys are log groups (e.g. gcs, raylet, worker), and
values are list of log filenames.
Raises:
Exceptions: :ref:`RayStateApiException <state-api-exceptions>` if the CLI
failed to query the data.
"""
api_server_url = ray_address_to_api_server_url(address)
if not glob_filter:
glob_filter = "*"
@ -885,11 +1231,29 @@ Summary APIs
def summarize_tasks(
address: str = None,
address: Optional[str] = None,
timeout: int = DEFAULT_RPC_TIMEOUT,
raise_on_missing_output: bool = True,
_explain: bool = False,
):
) -> Dict:
"""Summarize the tasks in cluster.
Args:
address: Ray bootstrap address, could be `auto`, `localhost:6379`.
If None, it will be resolved automatically from an initialized ray.
timeout: Max timeout for requests made when getting the states.
raise_on_missing_output: When True, exceptions will be raised if
there is missing data due to truncation/data source unavailable.
_explain: Print the API information such as API latency or
failed query information.
Return:
Dictionarified :ref:`TaskSummaries <state-api-schema-task-summaries>`
Raises:
Exceptions: :ref:`RayStateApiException <state-api-exceptions>`
if the CLI is failed to query the data.
"""
return StateApiClient(address=address).summary(
SummaryResource.TASKS,
options=SummaryApiOptions(timeout=timeout),
@ -899,11 +1263,29 @@ def summarize_tasks(
def summarize_actors(
address: str = None,
address: Optional[str] = None,
timeout: int = DEFAULT_RPC_TIMEOUT,
raise_on_missing_output: bool = True,
_explain: bool = False,
):
) -> Dict:
"""Summarize the actors in cluster.
Args:
address: Ray bootstrap address, could be `auto`, `localhost:6379`.
If None, it will be resolved automatically from an initialized ray.
timeout: Max timeout for requests made when getting the states.
raise_on_missing_output: When True, exceptions will be raised if
there is missing data due to truncation/data source unavailable.
_explain: Print the API information such as API latency or
failed query information.
Return:
Dictionarified :ref:`ActorSummaries <state-api-schema-actor-summaries>`
Raises:
Exceptions: :ref:`RayStateApiException <state-api-exceptions>` if the CLI
failed to query the data.
"""
return StateApiClient(address=address).summary(
SummaryResource.ACTORS,
options=SummaryApiOptions(timeout=timeout),
@ -913,11 +1295,29 @@ def summarize_actors(
def summarize_objects(
address: str = None,
address: Optional[str] = None,
timeout: int = DEFAULT_RPC_TIMEOUT,
raise_on_missing_output: bool = True,
_explain: bool = False,
):
) -> Dict:
"""Summarize the objects in cluster.
Args:
address: Ray bootstrap address, could be `auto`, `localhost:6379`.
If None, it will be resolved automatically from an initialized ray.
timeout: Max timeout for requests made when getting the states.
raise_on_missing_output: When True, exceptions will be raised if
there is missing data due to truncation/data source unavailable.
_explain: Print the API information such as API latency or
failed query information.
Return:
Dictionarified :ref:`ObjectSummaries <state-api-schema-object-summaries>`
Raises:
Exceptions: :ref:`RayStateApiException <state-api-exceptions>` if the CLI
failed to query the data.
"""
return StateApiClient(address=address).summary(
SummaryResource.OBJECTS,
options=SummaryApiOptions(timeout=timeout),

View file

@ -4,6 +4,10 @@ from dataclasses import dataclass, field, fields
from enum import Enum, unique
from typing import Dict, List, Optional, Set, Tuple, Union
import ray
import ray._private.ray_constants as ray_constants
import ray._private.services as services
from ray._private.gcs_utils import GcsClient
from ray._private.ray_constants import env_integer
from ray.core.generated.common_pb2 import TaskType
from ray.dashboard.modules.job.common import JobInfo
@ -897,3 +901,41 @@ def resource_to_schema(resource: StateResource) -> StateSchema:
return WorkerState
else:
assert False, "Unreachable"
def ray_address_to_api_server_url(address: Optional[str]) -> str:
"""Parse a ray cluster bootstrap address into API server URL.
When an address is provided, it will be used to query GCS for
API server address from GCS.
When an address is not provided, it will first try to auto-detect
a running ray instance, or look for local GCS process.
Args:
address: Ray cluster bootstrap address. Could also be `auto`.
Return:
API server HTTP URL.
"""
address = services.canonicalize_bootstrap_address_or_die(address)
gcs_client = GcsClient(address=address, nums_reconnect_retry=0)
ray.experimental.internal_kv._initialize_internal_kv(gcs_client)
api_server_url = ray._private.utils.internal_kv_get_with_retry(
gcs_client,
ray_constants.DASHBOARD_ADDRESS,
namespace=ray_constants.KV_NAMESPACE_DASHBOARD,
num_retries=20,
)
if api_server_url is None:
raise ValueError(
(
"Couldn't obtain the API server address from GCS. It is likely that "
"the GCS server is down. Check gcs_server.[out | err] to see if it is "
"still alive."
)
)
api_server_url = f"http://{api_server_url.decode()}"
return api_server_url

View file

@ -7,10 +7,6 @@ from typing import Dict, List, Optional, Tuple
import click
import yaml
import ray
import ray._private.ray_constants as ray_constants
import ray._private.services as services
from ray._private.gcs_utils import GcsClient
from ray._private.thirdparty.tabulate.tabulate import tabulate
from ray.experimental.state.api import (
StateApiClient,
@ -112,30 +108,6 @@ def _get_available_resources(
]
def get_api_server_url() -> str:
address = services.canonicalize_bootstrap_address_or_die(None)
gcs_client = GcsClient(address=address, nums_reconnect_retry=0)
ray.experimental.internal_kv._initialize_internal_kv(gcs_client)
api_server_url = ray._private.utils.internal_kv_get_with_retry(
gcs_client,
ray_constants.DASHBOARD_ADDRESS,
namespace=ray_constants.KV_NAMESPACE_DASHBOARD,
num_retries=20,
)
if api_server_url is None:
raise ValueError(
(
"Couldn't obtain the API server address from GCS. It is likely that "
"the GCS server is down. Check gcs_server.[out | err] to see if it is "
"still alive."
)
)
api_server_url = f"http://{api_server_url.decode()}"
return api_server_url
def get_table_output(state_data: List, schema: StateSchema) -> str:
"""Display the table output.
@ -302,7 +274,7 @@ def format_get_api_output(
id: str,
*,
schema: StateSchema,
format: AvailableFormat = AvailableFormat.DEFAULT,
format: AvailableFormat = AvailableFormat.YAML,
) -> str:
if not state_data or len(state_data) == 0:
return f"Resource with id={id} not found in the cluster."
@ -345,9 +317,6 @@ address_option = click.option(
)
# TODO(rickyyx): Once we have other APIs stablized, we should refactor them to
# reuse some of the options, e.g. `--address`.
# list/get/summary could all go under a single command group for options sharing.
@click.command()
@click.argument(
"resource",
@ -364,7 +333,7 @@ address_option = click.option(
)
@address_option
@timeout_option
def get(
def ray_get(
resource: str,
id: str,
address: Optional[str],
@ -394,7 +363,7 @@ def get(
```
The API queries one or more components from the cluster to obtain the data.
The returned state snanpshot could be stale, and it is not guaranteed to return
The returned state snapshot could be stale, and it is not guaranteed to return
the live data.
Args:
@ -408,18 +377,10 @@ def get(
# All resource names use '_' rather than '-'. But users options have '-'
resource = StateResource(resource.replace("-", "_"))
# Get the state API server address from ray if not provided by user
address = address if address else get_api_server_url()
# Create the State API server and put it into context
logger.debug(f"Create StateApiClient at {address}...")
client = StateApiClient(
address=address,
)
options = GetApiOptions(
timeout=timeout,
)
logger.debug(f"Create StateApiClient to ray instance at: {address}...")
client = StateApiClient(address=address)
options = GetApiOptions(timeout=timeout)
# If errors occur, exceptions will be thrown.
data = client.get(
@ -478,7 +439,7 @@ def get(
)
@timeout_option
@address_option
def list(
def ray_list(
resource: str,
format: str,
filter: List[str],
@ -522,7 +483,7 @@ def list(
ray list actors --format yaml
```
List actors with details. When --detail is specifed, it might query
List actors with details. When --detail is specified, it might query
more data sources to obtain data in details.
```
@ -530,13 +491,13 @@ def list(
```
The API queries one or more components from the cluster to obtain the data.
The returned state snanpshot could be stale, and it is not guaranteed to return
The returned state snapshot could be stale, and it is not guaranteed to return
the live data.
The API can return partial or missing output upon the following scenarios.
- When the API queries more than 1 component, if some of them fail,
the API will return the partial result (with a suppressable warning).
the API will return the partial result (with a suppressible warning).
- When the API returns too many entries, the API
will truncate the output. Currently, truncated data cannot be
selected by users.
@ -553,9 +514,7 @@ def list(
format = AvailableFormat(format)
# Create the State API server and put it into context
client = StateApiClient(
address=address if address else get_api_server_url(),
)
client = StateApiClient(address=address)
filter = [_parse_filter(f) for f in filter]
@ -592,8 +551,7 @@ def list(
@click.pass_context
def summary_state_cli_group(ctx):
"""Return the summarized information of a given resource."""
ctx.ensure_object(dict)
ctx.obj["api_server_url"] = get_api_server_url()
pass
@summary_state_cli_group.command(name="tasks")
@ -613,7 +571,6 @@ def task_summary(ctx, timeout: float, address: str):
:ref:`RayStateApiException <state-api-exceptions>`
if the CLI is failed to query the data.
"""
address = address or ctx.obj["api_server_url"]
print(
format_summary_output(
summarize_tasks(
@ -645,7 +602,6 @@ def actor_summary(ctx, timeout: float, address: str):
:ref:`RayStateApiException <state-api-exceptions>`
if the CLI is failed to query the data.
"""
address = address or ctx.obj["api_server_url"]
print(
format_summary_output(
summarize_actors(
@ -696,7 +652,6 @@ def object_summary(ctx, timeout: float, address: str):
:ref:`RayStateApiException <state-api-exceptions>`
if the CLI is failed to query the data.
"""
address = address or ctx.obj["api_server_url"]
print(
format_object_summary_output(
summarize_objects(

View file

@ -48,9 +48,8 @@ from ray.experimental.state.common import DEFAULT_RPC_TIMEOUT, DEFAULT_LOG_LIMIT
from ray.util.annotations import PublicAPI
from ray.experimental.state.state_cli import (
get as state_cli_get,
list as state_cli_list,
get_api_server_url,
ray_get,
ray_list,
output_with_format,
summary_state_cli_group,
AvailableFormat,
@ -2034,6 +2033,14 @@ def local_dump(
"this option will be ignored."
),
)
@click.option(
"--address",
default=None,
help=(
"The address of Ray API server. If not provided, it will be configured "
"automatically from querying the GCS server."
),
)
def ray_logs(
glob_filter,
node_ip: str,
@ -2045,6 +2052,7 @@ def ray_logs(
tail: int,
interval: float,
timeout: int,
address: Optional[str],
):
"""Print the log file that matches the GLOB_FILTER.
@ -2089,13 +2097,9 @@ def ray_logs(
if task_id is not None:
raise NotImplementedError("--task-id is not yet supported")
api_server_url = get_api_server_url()
# If both id & ip are not provided, choose a head node as a default.
if node_id is None and node_ip is None:
# TODO(swang): This command should also support
# passing --address or RAY_ADDRESS, like others.
address = ray._private.services.canonicalize_bootstrap_address_or_die(None)
address = ray._private.services.canonicalize_bootstrap_address_or_die(address)
node_ip = address.split(":")[0]
filename = None
@ -2104,7 +2108,7 @@ def ray_logs(
# If there's no unique match, try listing logs based on the glob filter.
if not match_unique:
logs = list_logs(
api_server_url=api_server_url,
address=address,
node_id=node_id,
node_ip=node_ip,
glob_filter=glob_filter,
@ -2139,7 +2143,7 @@ def ray_logs(
)
for chunk in get_log(
api_server_url=api_server_url,
address=address,
node_id=node_id,
node_ip=node_ip,
filename=filename,
@ -2555,8 +2559,8 @@ cli.add_command(cpp)
cli.add_command(disable_usage_stats)
cli.add_command(enable_usage_stats)
add_command_alias(ray_logs, name="logs", hidden=False)
cli.add_command(state_cli_list)
cli.add_command(state_cli_get)
cli.add_command(ray_list, name="list")
cli.add_command(ray_get, name="get")
add_command_alias(summary_state_cli_group, name="summary", hidden=False)
try:

View file

@ -82,6 +82,7 @@ from ray.experimental.state.common import (
TaskState,
WorkerState,
StateSchema,
ray_address_to_api_server_url,
state_column,
)
from ray.experimental.state.exception import DataSourceUnavailable, RayStateApiException
@ -91,8 +92,8 @@ from ray.experimental.state.state_cli import (
_parse_filter,
summary_state_cli_group,
)
from ray.experimental.state.state_cli import get as cli_get
from ray.experimental.state.state_cli import list as cli_list
from ray.experimental.state.state_cli import ray_get
from ray.experimental.state.state_cli import ray_list
from ray.experimental.state.state_manager import IdToIpMap, StateDataSourceClient
from ray.job_submission import JobSubmissionClient
from ray.runtime_env import RuntimeEnv
@ -262,6 +263,25 @@ def create_api_options(
)
def test_ray_address_to_api_server_url(shutdown_only):
ctx = ray.init()
api_server_url = f'http://{ctx.address_info["webui_url"]}'
address = ctx.address_info["address"]
gcs_address = ctx.address_info["gcs_address"]
# None should auto detect current ray address
assert api_server_url == ray_address_to_api_server_url(None)
# 'auto' should get
assert api_server_url == ray_address_to_api_server_url("auto")
# ray address
assert api_server_url == ray_address_to_api_server_url(address)
# explicit head node gcs address
assert api_server_url == ray_address_to_api_server_url(gcs_address)
# localhost string
gcs_port = gcs_address.split(":")[1]
assert api_server_url == ray_address_to_api_server_url(f"localhost:{gcs_port}")
def test_state_schema():
@dataclass
class TestSchema(StateSchema):
@ -1459,29 +1479,29 @@ def test_cli_apis_sanity_check(ray_start_cluster):
return exit_code_correct and substring_matched
wait_for_condition(
lambda: verify_output(cli_list, ["actors"], ["Stats:", "Table:", "ACTOR_ID"])
lambda: verify_output(ray_list, ["actors"], ["Stats:", "Table:", "ACTOR_ID"])
)
wait_for_condition(
lambda: verify_output(cli_list, ["workers"], ["Stats:", "Table:", "WORKER_ID"])
lambda: verify_output(ray_list, ["workers"], ["Stats:", "Table:", "WORKER_ID"])
)
wait_for_condition(
lambda: verify_output(cli_list, ["nodes"], ["Stats:", "Table:", "NODE_ID"])
lambda: verify_output(ray_list, ["nodes"], ["Stats:", "Table:", "NODE_ID"])
)
wait_for_condition(
lambda: verify_output(
cli_list, ["placement-groups"], ["Stats:", "Table:", "PLACEMENT_GROUP_ID"]
ray_list, ["placement-groups"], ["Stats:", "Table:", "PLACEMENT_GROUP_ID"]
)
)
wait_for_condition(lambda: verify_output(cli_list, ["jobs"], ["raysubmit"]))
wait_for_condition(lambda: verify_output(ray_list, ["jobs"], ["raysubmit"]))
wait_for_condition(
lambda: verify_output(cli_list, ["tasks"], ["Stats:", "Table:", "TASK_ID"])
lambda: verify_output(ray_list, ["tasks"], ["Stats:", "Table:", "TASK_ID"])
)
wait_for_condition(
lambda: verify_output(cli_list, ["objects"], ["Stats:", "Table:", "OBJECT_ID"])
lambda: verify_output(ray_list, ["objects"], ["Stats:", "Table:", "OBJECT_ID"])
)
wait_for_condition(
lambda: verify_output(
cli_list, ["runtime-envs"], ["Stats:", "Table:", "RUNTIME_ENV"]
ray_list, ["runtime-envs"], ["Stats:", "Table:", "RUNTIME_ENV"]
)
)
@ -1489,7 +1509,7 @@ def test_cli_apis_sanity_check(ray_start_cluster):
nodes = ray.nodes()
wait_for_condition(
lambda: verify_output(
cli_get, ["nodes", nodes[0]["NodeID"]], ["node_id", nodes[0]["NodeID"]]
ray_get, ["nodes", nodes[0]["NodeID"]], ["node_id", nodes[0]["NodeID"]]
)
)
# Test get workers by id
@ -1497,13 +1517,13 @@ def test_cli_apis_sanity_check(ray_start_cluster):
assert len(workers) > 0
worker_id = list(workers.keys())[0]
wait_for_condition(
lambda: verify_output(cli_get, ["workers", worker_id], ["worker_id", worker_id])
lambda: verify_output(ray_get, ["workers", worker_id], ["worker_id", worker_id])
)
# Test get actors by id
wait_for_condition(
lambda: verify_output(
cli_get,
ray_get,
["actors", actor._actor_id.hex()],
["actor_id", actor._actor_id.hex()],
)
@ -1512,7 +1532,7 @@ def test_cli_apis_sanity_check(ray_start_cluster):
# Test get placement groups by id
wait_for_condition(
lambda: verify_output(
cli_get,
ray_get,
["placement-groups", pg.id.hex()],
["placement_group_id", pg.id.hex()],
)
@ -1520,7 +1540,21 @@ def test_cli_apis_sanity_check(ray_start_cluster):
# Test get objects by id
wait_for_condition(
lambda: verify_output(cli_get, ["objects", obj.hex()], ["object_id", obj.hex()])
lambda: verify_output(ray_get, ["objects", obj.hex()], ["object_id", obj.hex()])
)
# Test address flag auto detection
wait_for_condition(
lambda: verify_output(
ray_get,
["objects", obj.hex(), "--address", "auto"],
["object_id", obj.hex()],
)
)
wait_for_condition(
lambda: verify_output(
ray_list, ["tasks", "--address", "auto"], ["Stats:", "Table:", "TASK_ID"]
)
)
# TODO(rickyyx:alpha-obs):
@ -1917,7 +1951,7 @@ def test_network_failure(shutdown_only):
# Kill raylet so that list_tasks will have network error on querying raylets.
ray._private.worker._global_node.kill_raylet()
with pytest.raises(RayStateApiException):
with pytest.raises(ConnectionError):
list_tasks(_explain=True)
@ -2125,12 +2159,12 @@ def test_filter(shutdown_only):
dead_actor_id = list_actors(filters=[("state", "=", "DEAD")])[0]["actor_id"]
alive_actor_id = list_actors(filters=[("state", "=", "ALIVE")])[0]["actor_id"]
runner = CliRunner()
result = runner.invoke(cli_list, ["actors", "--filter", "state=DEAD"])
result = runner.invoke(ray_list, ["actors", "--filter", "state=DEAD"])
assert result.exit_code == 0
assert dead_actor_id in result.output
assert alive_actor_id not in result.output
result = runner.invoke(cli_list, ["actors", "--filter", "state!=DEAD"])
result = runner.invoke(ray_list, ["actors", "--filter", "state!=DEAD"])
assert result.exit_code == 0
assert dead_actor_id not in result.output
assert alive_actor_id in result.output
@ -2154,7 +2188,7 @@ def test_data_truncate(shutdown_only, monkeypatch):
]
runner = CliRunner()
with pytest.warns(UserWarning) as record:
result = runner.invoke(cli_list, ["placement-groups"])
result = runner.invoke(ray_list, ["placement-groups"])
assert (
f"{max_limit_data_source} ({max_limit_data_source + 1} total "
"from the cluster) placement_groups are retrieved from the "
@ -2183,7 +2217,7 @@ def test_data_truncate(shutdown_only, monkeypatch):
ray.get(a.ready.remote())
with pytest.warns(None) as record:
result = runner.invoke(cli_list, ["actors"])
result = runner.invoke(ray_list, ["actors"])
assert len(record) == 0
@ -2208,7 +2242,7 @@ def test_detail(shutdown_only):
Test CLI
"""
runner = CliRunner()
result = runner.invoke(cli_list, ["actors", "--detail"])
result = runner.invoke(ray_list, ["actors", "--detail"])
print(result.output)
assert result.exit_code == 0
# The column for --detail should be in the output.
@ -2228,19 +2262,19 @@ def test_detail(shutdown_only):
)
# When the format is given, it should respect that formatting.
result = runner.invoke(cli_list, ["actors", "--detail", "--format=table"])
result = runner.invoke(ray_list, ["actors", "--detail", "--format=table"])
assert result.exit_code == 0
with pytest.raises(yaml.YAMLError):
yaml.load(result.output, Loader=yaml.FullLoader)
def _try_state_query_expect_rate_limit(api_func, res_q, start_q=None):
def _try_state_query_expect_rate_limit(api_func, res_q, start_q=None, **kwargs):
"""Utility functions for rate limit related e2e tests below"""
try:
# Indicate start of the process
if start_q is not None:
start_q.put(1)
api_func()
api_func(**kwargs)
except RayStateApiException as e:
# Other exceptions will be thrown
if "Max number of in-progress requests" in str(e):
@ -2259,19 +2293,18 @@ def _try_state_query_expect_rate_limit(api_func, res_q, start_q=None):
)
def test_state_api_rate_limit_with_failure(monkeypatch, shutdown_only):
import queue
import multiprocessing as mp
import os
import signal
import threading
# Set environment
with monkeypatch.context() as m:
m.setenv("RAY_STATE_SERVER_MAX_HTTP_REQUEST", "3")
# These make list_nodes, list_workers, list_actors never return in 20secs
m.setenv(
"RAY_testing_asio_delay_us",
(
"NodeManagerService.grpc_server.GetTasksInfo=10000000:10000000,"
"WorkerInfoGcsService.grpc_server.GetAllWorkerInfo=10000000:10000000,"
"ActorInfoGcsService.grpc_server.GetAllActorInfo=10000000:10000000"
"NodeManagerService.grpc_server.GetTasksInfo=20000000:20000000,"
"WorkerInfoGcsService.grpc_server.GetAllWorkerInfo=20000000:20000000,"
"ActorInfoGcsService.grpc_server.GetAllActorInfo=20000000:20000000"
),
)
@ -2302,32 +2335,35 @@ def test_state_api_rate_limit_with_failure(monkeypatch, shutdown_only):
wait_for_condition(lambda: len(list_objects()) > 0)
# Running 3 slow apis to exhaust the limits
res_q = mp.Queue()
start_q = mp.Queue() # not used
res_q = queue.Queue()
start_q = queue.Queue() # used for sync
procs = [
mp.Process(
threading.Thread(
target=_try_state_query_expect_rate_limit,
args=(
list_workers,
res_q,
start_q,
),
kwargs={"timeout": 6},
),
mp.Process(
threading.Thread(
target=_try_state_query_expect_rate_limit,
args=(
list_tasks,
res_q,
start_q,
),
kwargs={"timeout": 6},
),
mp.Process(
threading.Thread(
target=_try_state_query_expect_rate_limit,
args=(
list_actors,
res_q,
start_q,
),
kwargs={"timeout": 6},
),
]
@ -2353,57 +2389,14 @@ def test_state_api_rate_limit_with_failure(monkeypatch, shutdown_only):
e
), f"Expect an exception raised due to rate limit, but have {str(e)}"
# Kill the 3 slow running threads
[os.kill(p.pid, signal.SIGKILL) for p in procs]
[p.join() for p in procs]
for p in procs:
assert not p.is_alive(), "Slow queries should be killed"
# Consecutive APIs should be successful after the previous delay ones timeout
def verify():
assert len(list_objects()) > 0, "non-delay APIs should be successful"
"after previous ones timeout"
# Running another 3 should return no error
q = mp.Queue()
procs = [
mp.Process(
target=_try_state_query_expect_rate_limit,
args=(
list_objects,
q,
),
),
mp.Process(
target=_try_state_query_expect_rate_limit,
args=(
list_runtime_envs,
q,
),
),
mp.Process(
target=_try_state_query_expect_rate_limit,
args=(
list_placement_groups,
q,
),
),
]
return True
[p.start() for p in procs]
max_concurrent_reqs_error = 0
for _ in range(len(procs)):
try:
res = q.get(timeout=10)
if isinstance(res, Exception):
assert False, f"State API error: {res}"
elif isinstance(res, int):
max_concurrent_reqs_error += res
else:
raise ValueError(res)
except queue.Empty:
assert False, "Failed to get some results from a subprocess"
assert max_concurrent_reqs_error == 0, "All requests should be successful"
[p.join(5) for p in procs]
for proc in procs:
assert not proc.is_alive(), "All processes should exit"
wait_for_condition(verify)
@pytest.mark.skipif(
@ -2520,7 +2513,7 @@ def test_callsite_warning(callsite_enabled, monkeypatch, shutdown_only):
wait_for_condition(lambda: len(list_objects()) > 0)
with pytest.warns(None) as record:
result = runner.invoke(cli_list, ["objects"])
result = runner.invoke(ray_list, ["objects"])
assert result.exit_code == 0
if callsite_enabled:
@ -2586,7 +2579,7 @@ def test_raise_on_missing_output_partial_failures(monkeypatch, ray_start_cluster
# Verify when CLI is used, exceptions are not raised.
with pytest.warns(None) as record:
result = runner.invoke(cli_list, ["tasks", "--timeout=3"])
result = runner.invoke(ray_list, ["tasks", "--timeout=3"])
assert len(record) == 1
assert result.exit_code == 0
@ -2647,7 +2640,7 @@ def test_raise_on_missing_output_truncation(monkeypatch, shutdown_only):
# Verify when CLI is used, exceptions are not raised.
with pytest.warns(None) as record:
result = runner.invoke(cli_list, ["tasks", "--timeout=3"])
result = runner.invoke(ray_list, ["tasks", "--timeout=3"])
assert len(record) == 1
assert result.exit_code == 0
@ -2668,7 +2661,7 @@ def test_get_id_not_found(shutdown_only):
"""
ray.init()
runner = CliRunner()
result = runner.invoke(cli_get, ["actors", "1234"])
result = runner.invoke(ray_get, ["actors", "1234"])
assert result.exit_code == 0
assert "Resource with id=1234 not found in the cluster." in result.output