[Job submission] Add list_jobs API (#22679)

Adds an API to the REST server, the SDK, and the CLI for listing all jobs that have been submitted, along with their information.

Co-authored-by: Edward Oakes <ed.nmi.oakes@gmail.com>
This commit is contained in:
Archit Kulkarni 2022-03-01 19:27:09 -08:00 committed by GitHub
parent d97afb9e60
commit 1752f17c6d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 241 additions and 29 deletions

View file

@ -1,6 +1,7 @@
import asyncio
import json
import os
import pprint
from subprocess import list2cmdline
import time
from typing import Optional, Tuple
@ -314,4 +315,31 @@ def logs(address: Optional[str], job_id: str, follow: bool):
"for this feature."
)
else:
print(client.get_job_logs(job_id), end="")
# Set no_format to True because the logs may have unescaped "{" and "}"
# and the CLILogger calls str.format().
cli_logger.print(client.get_job_logs(job_id), end="", no_format=True)
@job_cli_group.command()
@click.option(
"--address",
type=str,
default=None,
required=False,
help=(
"Address of the Ray cluster to connect to. Can also be specified "
"using the RAY_ADDRESS environment variable."
),
)
@add_click_logging_options
@PublicAPI(stability="beta")
def list(address: Optional[str]):
"""Lists all running jobs and their information.
Example:
>>> ray job list
"""
client = _get_sdk_client(address)
# Set no_format to True because the logs may have unescaped "{" and "}"
# and the CLILogger calls str.format().
cli_logger.print(pprint.pformat(client.list_jobs()), no_format=True)

View file

@ -177,6 +177,20 @@ class JobHead(dashboard_utils.DashboardHeadModule):
text=json.dumps(dataclasses.asdict(data)), content_type="application/json"
)
@routes.get("/api/jobs/")
@optional_utils.init_ray_and_catch_exceptions(connect_to_serve=False)
async def list_jobs(self, req: Request) -> Response:
data: dict[str, JobInfo] = self._job_manager.list_jobs()
return Response(
text=json.dumps(
{
job_id: dataclasses.asdict(job_info)
for job_id, job_info in data.items()
}
),
content_type="application/json",
)
@routes.get("/api/jobs/{job_id}/logs")
@optional_utils.init_ray_and_catch_exceptions(connect_to_serve=False)
async def get_job_logs(self, req: Request) -> Response:

View file

@ -489,6 +489,10 @@ class JobManager:
"""Get latest info of a job."""
return self._job_info_client.get_info(job_id)
def list_jobs(self) -> Dict[str, JobInfo]:
"""Get info for all jobs."""
return self._job_info_client.get_all_jobs()
def get_job_logs(self, job_id: str) -> str:
"""Get all logs produced by a job."""
return self._log_client.get_logs(job_id)

View file

@ -26,6 +26,7 @@ from ray.dashboard.modules.job.common import (
JobLogsResponse,
uri_to_http_components,
)
from ray.ray_constants import DEFAULT_DASHBOARD_PORT
from ray.util.annotations import PublicAPI
from ray.client_builder import _split_address
@ -189,7 +190,7 @@ class JobSubmissionClient:
*,
data: Optional[bytes] = None,
json_data: Optional[dict] = None,
) -> Optional[object]:
) -> "requests.Response":
url = self._address + endpoint
logger.debug(f"Sending request to {url} with json data: {json_data or {}}.")
return requests.request(
@ -335,6 +336,20 @@ class JobSubmissionClient:
else:
self._raise_error(r)
@PublicAPI(stability="beta")
def list_jobs(self) -> Dict[str, JobInfo]:
r = self._do_request("GET", "/api/jobs/")
if r.status_code == 200:
jobs_info_json = r.json()
jobs_info = {
job_id: JobInfo(**job_info_json)
for job_id, job_info_json in jobs_info_json.items()
}
return jobs_info
else:
self._raise_error(r)
@PublicAPI(stability="beta")
def get_job_status(self, job_id: str) -> JobStatus:
return self.get_job_info(job_id).status

View file

@ -77,25 +77,50 @@ def set_env_var(key: str, val: Optional[str] = None):
os.environ[key] = old_val
class TestSubmit:
def test_address(self, mock_sdk_client):
runner = CliRunner()
def _job_cli_group_test_address(mock_sdk_client, cmd, *args):
runner = CliRunner()
# Test passing address via command line.
result = runner.invoke(
job_cli_group, ["submit", "--address=arg_addr", "--", "echo hello"]
)
assert mock_sdk_client.called_with("arg_addr")
# Test passing address via command line.
result = runner.invoke(job_cli_group, [cmd, "--address=arg_addr", *args])
assert mock_sdk_client.called_with("arg_addr")
assert result.exit_code == 0
# Test passing address via env var.
with set_env_var("RAY_ADDRESS", "env_addr"):
result = runner.invoke(job_cli_group, [cmd, *args])
assert result.exit_code == 0
# Test passing address via env var.
assert mock_sdk_client.called_with("env_addr")
# Test passing no address.
result = runner.invoke(job_cli_group, [cmd, *args])
assert result.exit_code == 1
assert "Address must be specified" in str(result.exception)
class TestList:
def test_address(self, mock_sdk_client):
_job_cli_group_test_address(mock_sdk_client, "list")
def test_list(self, mock_sdk_client):
runner = CliRunner()
with set_env_var("RAY_ADDRESS", "env_addr"):
result = runner.invoke(
job_cli_group,
["list"],
)
assert result.exit_code == 0
result = runner.invoke(job_cli_group, ["submit", "--", "echo hello"])
assert result.exit_code == 0
assert mock_sdk_client.called_with("env_addr")
# Test passing no address.
result = runner.invoke(job_cli_group, ["submit", "--", "echo hello"])
assert result.exit_code == 1
assert "Address must be specified" in str(result.exception)
result = runner.invoke(
job_cli_group,
["list"],
)
assert result.exit_code == 0
class TestSubmit:
def test_address(self, mock_sdk_client):
_job_cli_group_test_address(mock_sdk_client, "submit", "--", "echo", "hello")
def test_working_dir(self, mock_sdk_client):
runner = CliRunner()

View file

@ -1,4 +1,5 @@
from contextlib import contextmanager
import json
import os
import logging
import sys
@ -157,6 +158,26 @@ class TestJobStop:
assert f"Job '{job_id}' was stopped" not in stdout
class TestJobList:
def test_empty(self, ray_start_stop):
stdout, _ = _run_cmd("ray job list")
assert "{}" in stdout
def test_list(self, ray_start_stop):
_run_cmd("ray job submit --job-id='hello_id' -- echo hello")
runtime_env = {"env_vars": {"TEST": "123"}}
_run_cmd(
"ray job submit --job-id='hi_id' "
f"--runtime-env-json='{json.dumps(runtime_env)}' -- echo hi"
)
stdout, _ = _run_cmd("ray job list")
assert "JobInfo" in stdout
assert "123" in stdout
assert "hello_id" in stdout
assert "hi_id" in stdout
def test_quote_escaping(ray_start_stop):
cmd = "echo \"hello 'world'\""
job_id = "test_quote_escaping"

View file

@ -1,6 +1,7 @@
import logging
from pathlib import Path
import sys
import json
import tempfile
from typing import Optional
@ -9,7 +10,7 @@ from unittest.mock import patch
import ray
from ray.job_submission import JobSubmissionClient, JobStatus
from ray.dashboard.modules.job.common import CURRENT_VERSION
from ray.dashboard.modules.job.common import CURRENT_VERSION, JobInfo
from ray.dashboard.modules.job.sdk import (
ClusterInfo,
parse_cluster_info,
@ -33,12 +34,61 @@ def headers():
@pytest.fixture(scope="module")
def job_sdk_client(headers):
with _ray_start(include_dashboard=True, num_cpus=1) as address_info:
address = address_info["webui_url"]
with _ray_start(include_dashboard=True, num_cpus=1) as ctx:
address = ctx.address_info["webui_url"]
assert wait_until_server_available(address)
yield JobSubmissionClient(format_web_url(address), headers=headers)
# NOTE(architkulkarni): This test must be run first in order for the job
# submission history of the shared Ray runtime to be empty.
@pytest.mark.parametrize("use_sdk", [True, False])
def test_list_jobs_empty(job_sdk_client: JobSubmissionClient, use_sdk: bool):
client = job_sdk_client
if use_sdk:
assert client.list_jobs() == dict()
else:
r = client._do_request(
"GET",
"/api/jobs/",
)
assert r.status_code == 200
assert json.loads(r.text) == dict()
@pytest.mark.parametrize("use_sdk", [True, False])
def test_list_jobs(job_sdk_client: JobSubmissionClient, use_sdk: bool):
client = job_sdk_client
runtime_env = {"env_vars": {"TEST": "123"}}
metadata = {"foo": "bar"}
job_id = client.submit_job(
entrypoint="echo hello", runtime_env=runtime_env, metadata=metadata
)
wait_for_condition(_check_job_succeeded, client=client, job_id=job_id)
if use_sdk:
info: JobInfo = client.list_jobs()[job_id]
else:
r = client._do_request(
"GET",
"/api/jobs/",
)
assert r.status_code == 200
jobs_info_json = json.loads(r.text)
info_json = jobs_info_json[job_id]
info = JobInfo(**info_json)
assert info.status == JobStatus.SUCCEEDED
assert info.message is not None
assert info.end_time >= info.start_time
assert info.runtime_env == runtime_env
assert info.metadata == metadata
def _check_job_succeeded(client: JobSubmissionClient, job_id: str) -> bool:
status = client.get_job_status(job_id)
if status == JobStatus.FAILED:

View file

@ -20,7 +20,7 @@ from ray._private.test_utils import SignalActor, async_wait_for_condition
TEST_NAMESPACE = "jobs_test_namespace"
@pytest.fixture(scope="session")
@pytest.fixture(scope="module")
def shared_ray_instance():
# Remove ray address for test ray cluster in case we have
# lingering RAY_ADDRESS="http://127.0.0.1:8265" from previous local job
@ -118,6 +118,39 @@ def test_generate_job_id():
assert len(ids) == 10000
# NOTE(architkulkarni): This test must be run first in order for the job
# submission history of the shared Ray runtime to be empty.
def test_list_jobs_empty(job_manager: JobManager):
assert job_manager.list_jobs() == dict()
@pytest.mark.asyncio
async def test_list_jobs(job_manager: JobManager):
job_manager.submit_job(entrypoint="echo hi", job_id="1")
runtime_env = {"env_vars": {"TEST": "123"}}
metadata = {"foo": "bar"}
job_manager.submit_job(
entrypoint="echo hello", job_id="2", runtime_env=runtime_env, metadata=metadata
)
await async_wait_for_condition(
check_job_succeeded, job_manager=job_manager, job_id="1"
)
await async_wait_for_condition(
check_job_succeeded, job_manager=job_manager, job_id="2"
)
jobs_info = job_manager.list_jobs()
assert "1" in jobs_info
assert jobs_info["1"].status == JobStatus.SUCCEEDED
assert "2" in jobs_info
assert jobs_info["2"].status == JobStatus.SUCCEEDED
assert jobs_info["2"].message is not None
assert jobs_info["2"].end_time >= jobs_info["2"].start_time
assert jobs_info["2"].runtime_env == runtime_env
assert jobs_info["2"].metadata == metadata
@pytest.mark.asyncio
async def test_pass_job_id(job_manager):
job_id = "my_custom_id"

View file

@ -113,7 +113,7 @@ Now you may run the following CLI commands:
.. code-block::
ray job submit --runtime-env-json='{"working_dir": "./", "pip": ["requests==2.26.0"]}' -- "python script.py"
ray job submit --runtime-env-json='{"working_dir": "./", "pip": ["requests==2.26.0"]}' -- python script.py
2021-12-01 23:04:52,672 INFO cli.py:25 -- Creating JobSubmissionClient at address: http://127.0.0.1:8265
2021-12-01 23:04:52,809 INFO sdk.py:144 -- Uploading package gcs://_ray_pkg_bbcc8ca7e83b4dc0.zip.
2021-12-01 23:04:52,810 INFO packaging.py:352 -- Creating a file package for local directory './'.
@ -143,6 +143,10 @@ Now you may run the following CLI commands:
5
2.26.0
ray job list
{'raysubmit_AYhLMgDJ6XBQFvFP': JobInfo(status='SUCCEEDED', message='Job finished successfully.', error_type=None, start_time=1645908622, end_time=1645908623, metadata={}, runtime_env={}),
'raysubmit_su9UcdUviUZ86b1t': JobInfo(status='SUCCEEDED', message='Job finished successfully.', error_type=None, start_time=1645908669, end_time=1645908670, metadata={}, runtime_env={})}
Using the CLI with the Ray Cluster Launcher
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
@ -252,6 +256,8 @@ A submitted Job can be stopped by the user before it finishes executing.
wait_until_finish(job_id)
logs = client.get_job_logs(job_id)
To get information about all jobs, call ``client.list_jobs()``. This returns a `Dict[str, JobInfo]` object mapping Job IDs to their information.
REST API
------------
@ -300,6 +306,16 @@ Under the hood, both the Job Client and the CLI make HTTP calls to the job serve
rst = json.loads(resp.text)
logs = rst["logs"]
**List all jobs**
.. code-block:: python
resp = requests.get(
"http://127.0.0.1:8265/api/jobs/"
)
print(resp.json())
# {"job_id": {"metadata": ..., "status": ..., "message": ...}, ...}
Job Submission Architecture
----------------------------

View file

@ -33,6 +33,12 @@ Job Submission CLI
:prog: ray job logs
:show-nested:
.. _ray-job-list-doc:
.. click:: ray.dashboard.modules.job.cli:list
:prog: ray job list
:show-nested:
.. _ray-job-submission-sdk-ref:
Job Submission SDK

View file

@ -158,9 +158,9 @@ def _external_caller_info():
def _format_msg(
msg: str,
*args: Any,
no_format: bool = None,
_tags: Dict[str, Any] = None,
_numbered: Tuple[str, int, int] = None,
_no_format: bool = None,
**kwargs: Any
):
"""Formats a message for printing.
@ -170,6 +170,12 @@ def _format_msg(
Args:
*args (Any): `.format` arguments for `msg`.
no_format (bool):
If `no_format` is `True`,
`.format` will not be called on the message.
Useful if the output is user-provided or may otherwise
contain an unexpected formatting string (e.g. "{}").
_tags (Dict[str, Any]):
key-value pairs to display at the end of
the message in square brackets.
@ -193,12 +199,6 @@ def _format_msg(
E.g. `_format_msg("hello", _numbered=("[]", 0, 5))`
`[0/5] hello`
_no_format (bool):
If `_no_format` is `True`,
`.format` will not be called on the message.
Useful if the output is user-provided or may otherwise
contain an unexpected formatting string (e.g. "{}").
Returns:
The formatted message.
@ -224,7 +224,7 @@ def _format_msg(
chars, i, n = _numbered
numbering_str = cf.dimmed(chars[0] + str(i) + "/" + str(n) + chars[1]) + " "
if _no_format:
if no_format:
# todo: throw if given args/kwargs?
return numbering_str + msg + tags_str
return numbering_str + msg.format(*args, **kwargs) + tags_str