[Jobs] Add log streaming for jobs (#20976)

The current logs API simply returns a str, which was enough to unblock development and integration. This change adds proper log streaming for a better UX and for integration with external job managers.

Co-authored-by: Sven Mika <sven@anyscale.io>
Co-authored-by: sven1977 <svenmika1977@gmail.com>
Co-authored-by: Ed Oakes <ed.nmi.oakes@gmail.com>
Co-authored-by: Richard Liaw <rliaw@berkeley.edu>
Co-authored-by: Simon Mo <simon.mo@hey.com>
Co-authored-by: Avnish Narayan <38871737+avnishn@users.noreply.github.com>
Co-authored-by: Jiao Dong <jiaodong@anyscale.com>
This commit is contained in:
Jiao 2021-12-14 17:01:53 -08:00 committed by GitHub
parent 10947c83b3
commit ed34434131
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
17 changed files with 847 additions and 191 deletions

View file

@ -1,15 +1,17 @@
import asyncio
import json
import logging
import os
import time
from typing import Optional, Tuple
import yaml
import click
from ray.autoscaler._private.cli_logger import (add_click_logging_options,
cli_logger, cf)
from ray.dashboard.modules.job.common import JobStatus
from ray.dashboard.modules.job.sdk import JobSubmissionClient
logger = logging.getLogger(__name__)
def _get_sdk_client(address: Optional[str],
create_cluster_if_needed: bool = False
@ -22,10 +24,50 @@ def _get_sdk_client(address: Optional[str],
"or RAY_ADDRESS environment variable.")
address = os.environ["RAY_ADDRESS"]
logger.info(f"Creating JobSubmissionClient at address: {address}")
cli_logger.labeled_value("Job submission server address", address)
return JobSubmissionClient(address, create_cluster_if_needed)
def _log_big_success_msg(success_msg):
    """Print ``success_msg`` as a success banner.

    The message is framed above and below by a dashed rule of the same
    length as the message, with an empty line before and after the banner.
    """
    rule = "-" * len(success_msg)
    cli_logger.newline()
    for text in (rule, success_msg, rule):
        cli_logger.success(text)
    cli_logger.newline()
def _log_big_error_msg(success_msg):
    """Print a message as an error banner.

    Despite its name, ``success_msg`` is the error text to display. It is
    framed above and below by a dashed rule of the same length, with an
    empty line before and after the banner.
    """
    rule = "-" * len(success_msg)
    cli_logger.newline()
    for text in (rule, success_msg, rule):
        cli_logger.error(text)
    cli_logger.newline()
def _log_job_status(client: JobSubmissionClient, job_id: str):
    """Fetch the status of ``job_id`` from ``client`` and print it.

    SUCCEEDED and FAILED are shown as banners and STOPPED as a warning;
    any other status is printed as plain text. The status message, when
    present, is printed for FAILED and for the catch-all case only.
    """
    info = client.get_job_status(job_id)

    if info.status == JobStatus.SUCCEEDED:
        _log_big_success_msg(f"Job '{job_id}' succeeded")
        return
    if info.status == JobStatus.STOPPED:
        cli_logger.warning(f"Job '{job_id}' was stopped")
        return

    if info.status == JobStatus.FAILED:
        _log_big_error_msg(f"Job '{job_id}' failed")
    else:
        # Catch-all.
        cli_logger.print(f"Status for job '{job_id}': {info.status}")

    # Both remaining cases append the status message when there is one.
    if info.message is not None:
        cli_logger.print(f"Status message: {info.message}")
async def _tail_logs(client: JobSubmissionClient, job_id: str):
    """Echo the log stream of ``job_id`` to stdout, then print its status.

    Each chunk yielded by ``client.tail_job_logs`` is printed without an
    added newline. Once the stream is exhausted the job status is logged.
    """
    log_stream = client.tail_job_logs(job_id)
    async for chunk in log_stream:
        print(chunk, end="")

    _log_job_status(client, job_id)
@click.group("job")
def job_cli_group():
pass
@ -68,10 +110,18 @@ def job_cli_group():
"local directory or a remote URI to a .zip file (S3, GS, HTTP). "
"If specified, this overrides the option in --runtime-env."),
)
@click.option(
"--no-wait",
is_flag=True,
type=bool,
default=False,
help="If set, will not stream logs and wait for the job to exit.")
@add_click_logging_options
@click.argument("entrypoint", nargs=-1, required=True, type=click.UNPROCESSED)
def job_submit(address: Optional[str], job_id: Optional[str],
runtime_env: Optional[str], runtime_env_json: Optional[str],
working_dir: Optional[str], entrypoint: Tuple[str]):
working_dir: Optional[str], entrypoint: Tuple[str],
no_wait: bool):
"""Submits a job to be run on the cluster.
Example:
@ -92,9 +142,8 @@ def job_submit(address: Optional[str], job_id: Optional[str],
if working_dir is not None:
if "working_dir" in final_runtime_env:
logger.warning(
"Overriding runtime_env working_dir with --working-dir option."
)
cli_logger.warning(
"Overriding runtime_env working_dir with --working-dir option")
final_runtime_env["working_dir"] = working_dir
@ -102,9 +151,36 @@ def job_submit(address: Optional[str], job_id: Optional[str],
entrypoint=" ".join(entrypoint),
job_id=job_id,
runtime_env=final_runtime_env)
logger.info(f"Job submitted successfully: {job_id}.")
logger.info(
f"Query the status of the job using: `ray job status {job_id}`.")
_log_big_success_msg(f"Job '{job_id}' submitted successfully")
with cli_logger.group("Next steps"):
cli_logger.print("Query the logs of the job:")
with cli_logger.indented():
cli_logger.print(cf.bold(f"ray job logs {job_id}"))
cli_logger.print("Query the status of the job:")
with cli_logger.indented():
cli_logger.print(cf.bold(f"ray job status {job_id}"))
cli_logger.print("Request the job to be stopped:")
with cli_logger.indented():
cli_logger.print(cf.bold(f"ray job stop {job_id}"))
cli_logger.newline()
sdk_version = client.get_version()
# sdk version 0 does not have log streaming
if not no_wait:
if int(sdk_version) > 0:
cli_logger.print("Tailing logs until the job exits "
"(disable with --no-wait):")
asyncio.get_event_loop().run_until_complete(
_tail_logs(client, job_id))
else:
cli_logger.warning(
"Tailing logs is not enabled for job sdk client version "
f"{sdk_version}. Please upgrade your ray to latest version "
"for this feature.")
@job_cli_group.command("status", help="Get the status of a running job.")
@ -116,6 +192,7 @@ def job_submit(address: Optional[str], job_id: Optional[str],
help=("Address of the Ray cluster to connect to. Can also be specified "
"using the RAY_ADDRESS environment variable."))
@click.argument("job-id", type=str)
@add_click_logging_options
def job_status(address: Optional[str], job_id: str):
"""Queries for the current status of a job.
@ -123,10 +200,7 @@ def job_status(address: Optional[str], job_id: str):
>>> ray job status <my_job_id>
"""
client = _get_sdk_client(address)
status = client.get_job_status(job_id)
logger.info(f"Job status for '{job_id}': {status.status}.")
if status.message is not None:
logger.info(status.message)
_log_job_status(client, job_id)
@job_cli_group.command("stop", help="Attempt to stop a running job.")
@ -137,17 +211,41 @@ def job_status(address: Optional[str], job_id: str):
required=False,
help=("Address of the Ray cluster to connect to. Can also be specified "
"using the RAY_ADDRESS environment variable."))
@click.option(
"--no-wait",
is_flag=True,
type=bool,
default=False,
help="If set, will not wait for the job to exit.")
@click.argument("job-id", type=str)
def job_stop(address: Optional[str], job_id: str):
@add_click_logging_options
def job_stop(address: Optional[str], no_wait: bool, job_id: str):
"""Attempts to stop a job.
Example:
>>> ray job stop <my_job_id>
"""
# TODO(edoakes): should we wait for the job to exit afterwards?
client = _get_sdk_client(address)
cli_logger.print(f"Attempting to stop job {job_id}")
client.stop_job(job_id)
if no_wait:
return
else:
cli_logger.print(f"Waiting for job '{job_id}' to exit "
f"(disable with --no-wait):")
while True:
status = client.get_job_status(job_id)
if status.status in {
JobStatus.STOPPED, JobStatus.SUCCEEDED, JobStatus.FAILED
}:
_log_job_status(client, job_id)
break
else:
cli_logger.print(f"Job has not exited yet. Status: {status}")
time.sleep(1)
@job_cli_group.command("logs", help="Get the logs of a running job.")
@click.option(
@ -158,11 +256,31 @@ def job_stop(address: Optional[str], job_id: str):
help=("Address of the Ray cluster to connect to. Can also be specified "
"using the RAY_ADDRESS environment variable."))
@click.argument("job-id", type=str)
def job_logs(address: Optional[str], job_id: str):
@click.option(
"-f",
"--follow",
is_flag=True,
type=bool,
default=False,
help="If set, follow the logs (like `tail -f`).")
@add_click_logging_options
def job_logs(address: Optional[str], job_id: str, follow: bool):
"""Gets the logs of a job.
Example:
>>> ray job logs <my_job_id>
"""
client = _get_sdk_client(address)
sdk_version = client.get_version()
# sdk version 0 did not have log streaming
if follow:
if int(sdk_version) > 0:
asyncio.get_event_loop().run_until_complete(
_tail_logs(client, job_id))
else:
cli_logger.warning(
"Tailing logs is not enabled for job sdk client version "
f"{sdk_version}. Please upgrade your ray to latest version "
"for this feature.")
else:
print(client.get_job_logs(job_id), end="")

View file

@ -16,7 +16,8 @@ from ray._private.runtime_env.packaging import parse_uri
JOB_ID_METADATA_KEY = "job_submission_id"
JOB_NAME_METADATA_KEY = "job_name"
CURRENT_VERSION = "0"
# Version 0 -> 1: Added log streaming and changed behavior of job logs cli.
CURRENT_VERSION = "1"
class JobStatus(str, Enum):

View file

@ -210,13 +210,26 @@ class JobHead(dashboard_utils.DashboardHeadModule):
text=f"Job {job_id} does not exist",
status=aiohttp.web.HTTPNotFound.status_code)
logs: str = self._job_manager.get_job_logs(job_id)
# TODO(jiaodong): Support log streaming #19415
resp = JobLogsResponse(logs=logs)
resp = JobLogsResponse(logs=self._job_manager.get_job_logs(job_id))
return Response(
text=json.dumps(dataclasses.asdict(resp)),
content_type="application/json")
@routes.get("/api/jobs/{job_id}/logs/tail")
@_init_ray_and_catch_exceptions
async def tail_job_logs(self, req: Request) -> Response:
    """Stream the logs of a job to the caller over a WebSocket.

    The job id is taken from the ``job_id`` segment of the request path.

    Returns:
        A plain 404 response if ``job_exists`` reports the job as unknown.
        Otherwise the WebSocket response, returned after every chunk
        produced by the job manager has been sent as a text message.
    """
    job_id = req.match_info["job_id"]
    if not self.job_exists(job_id):
        return Response(
            text=f"Job {job_id} does not exist",
            status=aiohttp.web.HTTPNotFound.status_code)

    ws = aiohttp.web.WebSocketResponse()
    await ws.prepare(req)

    async for lines in self._job_manager.tail_job_logs(job_id):
        await ws.send_str(lines)

    # An aiohttp handler has to hand its response object back to the
    # framework; falling off the end would return None instead of the
    # prepared WebSocket response.
    return ws
async def run(self, server):
if not self._job_manager:
self._job_manager = JobManager()

View file

@ -7,8 +7,8 @@ import traceback
import random
import subprocess
import string
from typing import Any, Dict, Tuple, Optional
from collections import deque
from typing import Any, Dict, Iterator, Tuple, Optional
import ray
from ray.exceptions import RuntimeEnvSetupError
@ -17,6 +17,7 @@ from ray.actor import ActorHandle
from ray.dashboard.modules.job.common import (
JobStatus, JobStatusInfo, JobStatusStorageClient, JOB_ID_METADATA_KEY,
JOB_NAME_METADATA_KEY)
from ray.dashboard.modules.job.utils import file_tail_iterator
from ray._private.runtime_env.constants import RAY_JOB_CONFIG_JSON_ENV_VAR
logger = logging.getLogger(__name__)
@ -47,6 +48,8 @@ class JobLogStorageClient:
Disk storage for stdout / stderr of driver script logs.
"""
JOB_LOGS_PATH = "job-driver-{job_id}.log"
# Number of last N lines to put in job message upon failure.
NUM_LOG_LINES_ON_ERROR = 10
def get_logs(self, job_id: str) -> str:
try:
@ -55,12 +58,20 @@ class JobLogStorageClient:
except FileNotFoundError:
return ""
def tail_logs(self, job_id: str, n_lines=10) -> str:
all_logs = self.get_logs(job_id)
# TODO(edoakes): optimize this to not read the whole file into memory.
log_lines = all_logs.split("\n")
start = max(0, len(log_lines) - n_lines)
return "\n".join(log_lines[start:])
def tail_logs(self, job_id: str) -> Iterator[str]:
return file_tail_iterator(self.get_log_file_path(job_id))
def get_last_n_log_lines(self,
                         job_id: str,
                         num_log_lines=NUM_LOG_LINES_ON_ERROR) -> str:
    """Return the most recent entries of the job's log as one string.

    Entries are read from ``tail_logs`` until it yields ``None`` or is
    exhausted. Only the last ``num_log_lines`` entries are kept, and they
    are concatenated without any separator.
    """
    recent = deque(maxlen=num_log_lines)
    entries = self.tail_logs(job_id)

    # A ``None`` entry and an exhausted iterator both end the scan.
    entry = next(entries, None)
    while entry is not None:
        recent.append(entry)
        entry = next(entries, None)

    return "".join(recent)
def get_log_file_path(self, job_id: str) -> Tuple[str, str]:
"""
@ -224,7 +235,8 @@ class JobSupervisor:
self._status_client.put_status(self._job_id,
JobStatus.SUCCEEDED)
else:
log_tail = self._log_client.tail_logs(self._job_id)
log_tail = self._log_client.get_last_n_log_lines(
self._job_id)
if log_tail is not None and log_tail != "":
message = ("Job failed due to an application error, "
"last available logs:\n" + log_tail)
@ -258,6 +270,9 @@ class JobManager:
goes down.
"""
JOB_ACTOR_NAME = "_ray_internal_job_actor_{job_id}"
# Time that we will sleep while tailing logs if no new log line is
# available.
LOG_TAIL_SLEEP_S = 1
def __init__(self):
self._status_client = JobStatusStorageClient()
@ -405,7 +420,7 @@ class JobManager:
else:
return False
def get_job_status(self, job_id: str) -> JobStatusInfo:
def get_job_status(self, job_id: str) -> Optional[JobStatus]:
"""Get latest status of a job. If job supervisor actor is no longer
alive, it will also attempt to make adjustments needed to bring job
to correct terminiation state.
@ -430,5 +445,20 @@ class JobManager:
return self._status_client.get_status(job_id)
def get_job_logs(self, job_id: str) -> bytes:
def get_job_logs(self, job_id: str) -> str:
return self._log_client.get_logs(job_id)
async def tail_job_logs(self, job_id: str) -> Iterator[str]:
    """Asynchronously yield log chunks of a job until it has exited.

    Raises:
        RuntimeError: if no status is known for ``job_id``.
    """
    if self.get_job_status(job_id) is None:
        raise RuntimeError(f"Job '{job_id}' does not exist.")

    still_active = {JobStatus.PENDING, JobStatus.RUNNING}
    for chunk in self._log_client.tail_logs(job_id):
        if chunk is not None:
            yield chunk
            continue

        # No new log lines right now: stop once the job has exited,
        # otherwise wait a little before polling the log again.
        current = self.get_job_status(job_id)
        if current.status not in still_active:
            return
        await asyncio.sleep(self.LOG_TAIL_SLEEP_S)

View file

@ -3,11 +3,13 @@ import importlib
import logging
from pathlib import Path
import tempfile
from typing import Any, Dict, List, Optional
from typing import Any, Dict, Iterator, List, Optional
try:
import aiohttp
import requests
except ImportError:
aiohttp = None
requests = None
from ray._private.runtime_env.packaging import (
@ -231,6 +233,13 @@ class JobSubmissionClient:
working_dir, excludes=runtime_env.get("excludes", None))
runtime_env["working_dir"] = package_uri
def get_version(self) -> str:
    """Query ``/api/version`` and return the reported ``version`` value.

    Any status code other than 200 is handed to ``_raise_error``.
    """
    response = self._do_request("GET", "/api/version")
    if response.status_code != 200:
        self._raise_error(response)
        return None
    return response.json().get("version")
def submit_job(
self,
*,
@ -291,3 +300,18 @@ class JobSubmissionClient:
return JobLogsResponse(**r.json()).logs
else:
self._raise_error(r)
async def tail_job_logs(self, job_id: str) -> Iterator[str]:
    """Asynchronously yield log text received over the job's WebSocket.

    Connects to ``/api/jobs/{job_id}/logs/tail`` and yields the payload of
    every TEXT message until a CLOSED message arrives. All other message
    types, including ERROR, are ignored.
    """
    url = f"{self._address}/api/jobs/{job_id}/logs/tail"
    async with aiohttp.ClientSession(cookies=self._cookies) as session:
        ws = await session.ws_connect(url)

        while True:
            msg = await ws.receive()
            kind = msg.type
            if kind == aiohttp.WSMsgType.CLOSED:
                break
            if kind == aiohttp.WSMsgType.TEXT:
                yield msg.data
            # ERROR (and any other message type) is deliberately skipped.

View file

@ -0,0 +1,25 @@
import ray
import requests
ray.init()
@ray.remote
class Counter:
def __init__(self):
self.counter = 0
def inc(self):
self.counter += 1
def get_counter(self):
return self.counter
counter = Counter.remote()
for _ in range(5):
ray.get(counter.inc.remote())
print(ray.get(counter.get_counter.remote()))
print(requests.__version__)

View file

@ -0,0 +1,86 @@
#!/usr/bin/env bash
unset RAY_ADDRESS
if ! [ -x "$(command -v conda)" ]; then
echo "conda doesn't exist. Please download conda for this machine"
exit 1
else
echo "conda exists"
fi
pip install --upgrade pip
# This is required to use conda activate
source "$(conda info --base)/etc/profile.d/conda.sh"
PYTHON_VERSION=$(python -c"from platform import python_version; print(python_version())")
RAY_VERSIONS=("1.9.0")
for RAY_VERSION in "${RAY_VERSIONS[@]}"
do
env_name=${JOB_COMPATIBILITY_TEST_TEMP_ENV}
# Clean up if env name is already taken from previous leaking runs
conda env remove --name="${env_name}"
printf "\n\n\n"
echo "========================================================================================="
printf "Creating new conda environment with python %s for ray %s \n" "${PYTHON_VERSION}" "${RAY_VERSION}"
echo "========================================================================================="
printf "\n\n\n"
conda create -y -n "${env_name}" python="${PYTHON_VERSION}"
conda activate "${env_name}"
pip install -U ray=="${RAY_VERSION}"
pip install -U ray[default]=="${RAY_VERSION}"
printf "\n\n\n"
echo "========================================================="
printf "Installed ray job server version: "
SERVER_RAY_VERSION=$(python -c "import ray; print(ray.__version__)")
printf "%s \n" "${SERVER_RAY_VERSION}"
echo "========================================================="
printf "\n\n\n"
ray stop --force
ray start --head
conda deactivate
CLIENT_RAY_VERSION=$(python -c "import ray; print(ray.__version__)")
CLIENT_RAY_COMMIT=$(python -c "import ray; print(ray.__commit__)")
printf "\n\n\n"
echo "========================================================================================="
printf "Using Ray %s on %s as job client \n" "${CLIENT_RAY_VERSION}" "${CLIENT_RAY_COMMIT}"
echo "========================================================================================="
printf "\n\n\n"
export RAY_ADDRESS=127.0.0.1:8265
cleanup () {
unset RAY_ADDRESS
ray stop --force
conda remove -y --name "${env_name}" --all
}
JOB_ID=$(python -c "import uuid; print(uuid.uuid4().hex)")
if ! ray job submit --job-id="${JOB_ID}" --runtime-env-json='{"working_dir": "./", "pip": ["requests==2.26.0"]}' -- "python script.py"; then
cleanup
exit 1
fi
if ! ray job status "${JOB_ID}"; then
cleanup
exit 1
fi
if ! ray job logs "${JOB_ID}"; then
cleanup
exit 1
fi
cleanup
done

View file

@ -0,0 +1,60 @@
import logging
import pytest
import sys
import os
import subprocess
import uuid
from contextlib import contextmanager
logger = logging.getLogger(__name__)
@contextmanager
def conda_env(env_name):
    """Expose ``env_name`` to the shell script and remove the env on exit.

    While the context is active, ``JOB_COMPATIBILITY_TEST_TEMP_ENV`` holds
    ``env_name``. On exit, whether or not the body raised, the variable is
    deleted and ``conda env remove`` is run for that name.
    """
    env_var = "JOB_COMPATIBILITY_TEST_TEMP_ENV"
    # Set env name for shell script
    os.environ[env_var] = env_name
    try:
        yield
    finally:
        # Clean up created conda env upon test exit to prevent leaking
        del os.environ[env_var]
        remove_cmd = f"conda env remove -y --name {env_name}"
        subprocess.run(remove_cmd, shell=True, stdout=subprocess.PIPE)
def _compatibility_script_path(file_name: str) -> str:
return os.path.join(
os.path.dirname(__file__), "backwards_compatibility_scripts",
file_name)
class TestBackwardsCompatibility:
def test_cli(self):
"""
1) Create a new conda environment with ray version X installed
inherits same env as current conda envionment except ray version
2) Start head node and dashboard with ray version X
3) Use current commit's CLI code to do sample job submission flow
4) Deactivate the new conda environment and back to original place
"""
# Shell script creates and cleans up tmp conda environment regardless
# of the outcome
env_name = f"jobs-backwards-compatibility-{uuid.uuid4().hex}"
with conda_env(env_name):
shell_cmd = f"{_compatibility_script_path('test_backwards_compatibility.sh')}" # noqa: E501
try:
subprocess.check_output(
shell_cmd, shell=True, stderr=subprocess.STDOUT)
except subprocess.CalledProcessError as e:
logger.error(str(e))
logger.error(e.stdout.decode())
raise e
if __name__ == "__main__":
sys.exit(pytest.main(["-v", __file__]))

View file

@ -19,10 +19,27 @@ logger = logging.getLogger(__name__)
@pytest.fixture
def mock_sdk_client():
class AsyncIterator:
def __init__(self, seq):
self.iter = iter(seq)
def __aiter__(self):
return self
async def __anext__(self):
try:
return next(self.iter)
except StopIteration:
raise StopAsyncIteration
if "RAY_ADDRESS" in os.environ:
del os.environ["RAY_ADDRESS"]
with mock.patch("ray.dashboard.modules.job.cli.JobSubmissionClient"
) as mock_client:
# In python 3.6 it will fail with error
# 'async for' requires an object with __aiter__ method, got MagicMock"
mock_client().tail_job_logs.return_value = AsyncIterator(range(10))
yield mock_client
@ -64,6 +81,7 @@ def set_env_var(key: str, val: Optional[str] = None):
class TestSubmit:
@pytest.mark.asyncio
def test_address(self, mock_sdk_client):
runner = CliRunner()

View file

@ -29,7 +29,10 @@ def set_env_var(key: str, val: Optional[str] = None):
@pytest.fixture
def ray_start_stop():
subprocess.check_output(["ray", "start", "--head"])
try:
with set_env_var("RAY_ADDRESS", "127.0.0.1:8265"):
yield
finally:
subprocess.check_output(["ray", "stop", "--force"])
@ -39,11 +42,13 @@ def ray_cluster_manager():
Used not as fixture in case we want to set RAY_ADDRESS first.
"""
subprocess.check_output(["ray", "start", "--head"])
try:
yield
finally:
subprocess.check_output(["ray", "stop", "--force"])
class TestSubmitIntegration:
class TestRayAddress:
"""
Integration version of job CLI test that ensures interaction with the
following components are working as expected:
@ -62,45 +67,95 @@ class TestSubmitIntegration:
assert ("Address must be specified using either the "
"--address flag or RAY_ADDRESS environment") in stderr
def test_ray_client_adress(self, ray_start_stop):
with set_env_var("RAY_ADDRESS", "127.0.0.1:8265"):
def test_ray_client_address(self, ray_start_stop):
completed_process = subprocess.run(
["ray", "job", "submit", "--", "echo hello"],
stderr=subprocess.PIPE)
stderr = completed_process.stderr.decode("utf-8")
# Current dashboard module that raises no exception from requests..
assert "Query the status of the job" in stderr
stdout=subprocess.PIPE)
stdout = completed_process.stdout.decode("utf-8")
assert "hello" in stdout
assert "succeeded" in stdout
def test_valid_http_ray_address(self, ray_start_stop):
with set_env_var("RAY_ADDRESS", "http://127.0.0.1:8265"):
completed_process = subprocess.run(
["ray", "job", "submit", "--", "echo hello"],
stderr=subprocess.PIPE)
stderr = completed_process.stderr.decode("utf-8")
# Current dashboard module that raises no exception from requests..
assert "Query the status of the job" in stderr
stdout=subprocess.PIPE)
stdout = completed_process.stdout.decode("utf-8")
assert "hello" in stdout
assert "succeeded" in stdout
def test_set_ray_http_address_first(self):
with set_env_var("RAY_ADDRESS", "http://127.0.0.1:8265"):
with ray_cluster_manager():
completed_process = subprocess.run(
["ray", "job", "submit", "--", "echo hello"],
stderr=subprocess.PIPE)
stderr = completed_process.stderr.decode("utf-8")
# Current dashboard module that raises no exception from
# requests..
assert "Query the status of the job" in stderr
stdout=subprocess.PIPE)
stdout = completed_process.stdout.decode("utf-8")
assert "hello" in stdout
assert "succeeded" in stdout
def test_set_ray_client_address_first(self):
with set_env_var("RAY_ADDRESS", "127.0.0.1:8265"):
with ray_cluster_manager():
completed_process = subprocess.run(
["ray", "job", "submit", "--", "echo hello"],
stderr=subprocess.PIPE)
stderr = completed_process.stderr.decode("utf-8")
# Current dashboard module that raises no exception from
# requests..
assert "Query the status of the job" in stderr
stdout=subprocess.PIPE)
stdout = completed_process.stdout.decode("utf-8")
assert "hello" in stdout
assert "succeeded" in stdout
class TestJobSubmit:
def test_basic_submit(self, ray_start_stop):
"""Should tail logs and wait for process to exit."""
cmd = "sleep 1 && echo hello && sleep 1 && echo hello"
completed_process = subprocess.run(
["ray", "job", "submit", "--", cmd], stdout=subprocess.PIPE)
stdout = completed_process.stdout.decode("utf-8")
assert "hello\nhello" in stdout
assert "succeeded" in stdout
def test_submit_no_wait(self, ray_start_stop):
"""Should exit immediately w/o printing logs."""
cmd = "echo hello && sleep 1000"
completed_process = subprocess.run(
["ray", "job", "submit", "--no-wait", "--", cmd],
stdout=subprocess.PIPE)
stdout = completed_process.stdout.decode("utf-8")
assert "hello" not in stdout
assert "Tailing logs until the job exits" not in stdout
class TestJobStop:
def test_basic_stop(self, ray_start_stop):
"""Should wait until the job is stopped."""
cmd = "sleep 1000"
job_id = "test_basic_stop"
completed_process = subprocess.run([
"ray", "job", "submit", "--no-wait", f"--job-id={job_id}", "--",
cmd
])
completed_process = subprocess.run(
["ray", "job", "stop", job_id], stdout=subprocess.PIPE)
stdout = completed_process.stdout.decode("utf-8")
assert "Waiting for job" in stdout
assert f"Job '{job_id}' was stopped" in stdout
def test_stop_no_wait(self, ray_start_stop):
"""Should not wait until the job is stopped."""
cmd = "echo hello && sleep 1000"
job_id = "test_stop_no_wait"
completed_process = subprocess.run([
"ray", "job", "submit", "--no-wait", f"--job-id={job_id}", "--",
cmd
])
completed_process = subprocess.run(
["ray", "job", "stop", "--no-wait", job_id],
stdout=subprocess.PIPE)
stdout = completed_process.stdout.decode("utf-8")
assert "Waiting for job" not in stdout
assert f"Job '{job_id}' was stopped" not in stdout
if __name__ == "__main__":

View file

@ -331,5 +331,34 @@ def test_parse_cluster_info(address: str):
parse_cluster_info(address, False)
@pytest.mark.asyncio
async def test_tail_job_logs(job_sdk_client):
client = job_sdk_client
with tempfile.TemporaryDirectory() as tmp_dir:
path = Path(tmp_dir)
driver_script = """
import time
for i in range(100):
print("Hello", i)
time.sleep(0.1)
"""
test_script_file = path / "test_script.py"
with open(test_script_file, "w+") as f:
f.write(driver_script)
job_id = client.submit_job(
entrypoint="python test_script.py",
runtime_env={"working_dir": tmp_dir})
i = 0
async for lines in client.tail_job_logs(job_id):
print(lines, end="")
for line in lines.strip().split("\n"):
assert line.split(" ") == ["Hello", str(i)]
i += 1
wait_for_condition(_check_job_succeeded, client=client, job_id=job_id)
if __name__ == "__main__":
sys.exit(pytest.main(["-v", __file__]))

View file

@ -3,6 +3,7 @@ import psutil
import tempfile
import sys
from uuid import uuid4
import signal
import pytest
@ -30,6 +31,34 @@ def _driver_script_path(file_name: str) -> str:
os.path.dirname(__file__), "subprocess_driver_scripts", file_name)
def _run_hanging_command(job_manager, tmp_dir, start_signal_actor=None):
    """Submit a job that blocks until a marker file appears in ``tmp_dir``.

    The entrypoint writes its shell pid to ``<tmp_dir>/pid`` and then
    prints "Waiting..." once per second until ``<tmp_dir>/hello`` exists.

    Args:
        job_manager: Object used to submit the job and query status/logs.
        tmp_dir: Directory that holds the pid file and the marker file.
        start_signal_actor: Optional actor forwarded to ``submit_job`` as
            ``_start_signal_actor``. When given, this helper checks that
            the job stays PENDING with empty logs instead of waiting for
            it to run.

    Returns:
        Tuple ``(pid_file, tmp_file, job_id)``.
    """
    tmp_file = os.path.join(tmp_dir, "hello")
    pid_file = os.path.join(tmp_dir, "pid")

    # Write subprocess pid to pid_file and block until tmp_file is present.
    wait_for_file_cmd = (f"echo $$ > {pid_file} && "
                         f"until [ -f {tmp_file} ]; "
                         "do echo 'Waiting...' && sleep 1; "
                         "done")
    job_id = job_manager.submit_job(
        entrypoint=wait_for_file_cmd, _start_signal_actor=start_signal_actor)

    if start_signal_actor:
        for _ in range(10):
            # Re-query on every iteration: a status fetched once before
            # the loop would turn the repeated assertion into a re-check
            # of the same stale value.
            status = job_manager.get_job_status(job_id)
            assert status.status == JobStatus.PENDING
            logs = job_manager.get_job_logs(job_id)
            assert logs == ""
    else:
        wait_for_condition(
            check_job_running, job_manager=job_manager, job_id=job_id)

        wait_for_condition(
            lambda: "Waiting..." in job_manager.get_job_logs(job_id))

    return pid_file, tmp_file, job_id
def check_job_succeeded(job_manager, job_id):
status = job_manager.get_job_status(job_id)
if status.status == JobStatus.FAILED:
@ -302,40 +331,9 @@ class TestRuntimeEnv:
class TestAsyncAPI:
def _run_hanging_command(self,
job_manager,
tmp_dir,
_start_signal_actor=None):
tmp_file = os.path.join(tmp_dir, "hello")
pid_file = os.path.join(tmp_dir, "pid")
# Write subprocess pid to pid_file and block until tmp_file is present.
wait_for_file_cmd = (f"echo $$ > {pid_file} && "
f"until [ -f {tmp_file} ]; "
"do echo 'Waiting...' && sleep 1; "
"done")
job_id = job_manager.submit_job(
entrypoint=wait_for_file_cmd,
_start_signal_actor=_start_signal_actor)
status = job_manager.get_job_status(job_id)
if _start_signal_actor:
for _ in range(10):
assert status.status == JobStatus.PENDING
logs = job_manager.get_job_logs(job_id)
assert logs == ""
else:
wait_for_condition(
check_job_running, job_manager=job_manager, job_id=job_id)
wait_for_condition(
lambda: "Waiting..." in job_manager.get_job_logs(job_id))
return pid_file, tmp_file, job_id
def test_status_and_logs_while_blocking(self, job_manager):
with tempfile.TemporaryDirectory() as tmp_dir:
pid_file, tmp_file, job_id = self._run_hanging_command(
pid_file, tmp_file, job_id = _run_hanging_command(
job_manager, tmp_dir)
with open(pid_file, "r") as file:
pid = int(file.read())
@ -354,7 +352,7 @@ class TestAsyncAPI:
def test_stop_job(self, job_manager):
with tempfile.TemporaryDirectory() as tmp_dir:
_, _, job_id = self._run_hanging_command(job_manager, tmp_dir)
_, _, job_id = _run_hanging_command(job_manager, tmp_dir)
assert job_manager.stop_job(job_id) is True
wait_for_condition(
@ -375,8 +373,7 @@ class TestAsyncAPI:
"""
with tempfile.TemporaryDirectory() as tmp_dir:
pid_file, _, job_id = self._run_hanging_command(
job_manager, tmp_dir)
pid_file, _, job_id = _run_hanging_command(job_manager, tmp_dir)
with open(pid_file, "r") as file:
pid = int(file.read())
assert psutil.pid_exists(pid), (
@ -398,18 +395,18 @@ class TestAsyncAPI:
1) Job can correctly be stop immediately with correct JobStatus
2) No dangling subprocess left.
"""
_start_signal_actor = SignalActor.remote()
start_signal_actor = SignalActor.remote()
with tempfile.TemporaryDirectory() as tmp_dir:
pid_file, _, job_id = self._run_hanging_command(
job_manager, tmp_dir, _start_signal_actor=_start_signal_actor)
pid_file, _, job_id = _run_hanging_command(
job_manager, tmp_dir, start_signal_actor=start_signal_actor)
assert not os.path.exists(pid_file), (
"driver subprocess should NOT be running while job is "
"still PENDING.")
assert job_manager.stop_job(job_id) is True
# Send run signal to unblock run function
ray.get(_start_signal_actor.send.remote())
ray.get(start_signal_actor.send.remote())
wait_for_condition(
check_job_stopped, job_manager=job_manager, job_id=job_id)
@ -420,11 +417,11 @@ class TestAsyncAPI:
1) Job can correctly be stop immediately with correct JobStatus
2) No dangling subprocess left.
"""
_start_signal_actor = SignalActor.remote()
start_signal_actor = SignalActor.remote()
with tempfile.TemporaryDirectory() as tmp_dir:
pid_file, _, job_id = self._run_hanging_command(
job_manager, tmp_dir, _start_signal_actor=_start_signal_actor)
pid_file, _, job_id = _run_hanging_command(
job_manager, tmp_dir, start_signal_actor=start_signal_actor)
assert not os.path.exists(pid_file), (
"driver subprocess should NOT be running while job is "
@ -443,8 +440,7 @@ class TestAsyncAPI:
SIGTERM first, SIGKILL after 3 seconds.
"""
with tempfile.TemporaryDirectory() as tmp_dir:
pid_file, _, job_id = self._run_hanging_command(
job_manager, tmp_dir)
pid_file, _, job_id = _run_hanging_command(job_manager, tmp_dir)
with open(pid_file, "r") as file:
pid = int(file.read())
assert psutil.pid_exists(pid), (
@ -459,5 +455,109 @@ class TestAsyncAPI:
wait_for_condition(check_subprocess_cleaned, pid=pid)
class TestTailLogs:
async def _tail_and_assert_logs(self,
job_id,
job_manager,
expected_log="",
num_iteration=5):
i = 0
async for lines in job_manager.tail_job_logs(job_id):
assert all(s == expected_log for s in lines.strip().split("\n"))
print(lines, end="")
if i == num_iteration:
break
i += 1
@pytest.mark.asyncio
async def test_unknown_job(self, job_manager):
with pytest.raises(
RuntimeError, match="Job 'unknown' does not exist."):
async for _ in job_manager.tail_job_logs("unknown"):
pass
@pytest.mark.asyncio
async def test_successful_job(self, job_manager):
"""Test tailing logs for a PENDING -> RUNNING -> SUCCESSFUL job."""
start_signal_actor = SignalActor.remote()
with tempfile.TemporaryDirectory() as tmp_dir:
_, tmp_file, job_id = _run_hanging_command(
job_manager, tmp_dir, start_signal_actor=start_signal_actor)
# TODO(edoakes): check we get no logs before actor starts (not sure
# how to timeout the iterator call).
assert job_manager.get_job_status(
job_id).status == JobStatus.PENDING
# Signal job to start.
ray.get(start_signal_actor.send.remote())
await self._tail_and_assert_logs(
job_id,
job_manager,
expected_log="Waiting...",
num_iteration=5)
# Signal the job to exit by writing to the file.
with open(tmp_file, "w") as f:
print("hello", file=f)
async for lines in job_manager.tail_job_logs(job_id):
assert all(
s == "Waiting..." for s in lines.strip().split("\n"))
print(lines, end="")
wait_for_condition(
check_job_succeeded, job_manager=job_manager, job_id=job_id)
@pytest.mark.asyncio
async def test_failed_job(self, job_manager):
"""Test tailing logs for a job that unexpectedly exits."""
with tempfile.TemporaryDirectory() as tmp_dir:
pid_file, _, job_id = _run_hanging_command(job_manager, tmp_dir)
await self._tail_and_assert_logs(
job_id,
job_manager,
expected_log="Waiting...",
num_iteration=5)
# Kill the job unexpectedly.
with open(pid_file, "r") as f:
os.kill(int(f.read()), signal.SIGKILL)
async for lines in job_manager.tail_job_logs(job_id):
assert all(
s == "Waiting..." for s in lines.strip().split("\n"))
print(lines, end="")
wait_for_condition(
check_job_failed, job_manager=job_manager, job_id=job_id)
@pytest.mark.asyncio
async def test_stopped_job(self, job_manager):
    """Test tailing logs for a job that is stopped via the API."""
    with tempfile.TemporaryDirectory() as tmp_dir:
        _, _, job_id = _run_hanging_command(job_manager, tmp_dir)

        # While the job runs, the first batches must contain only the
        # expected "Waiting..." line.
        await self._tail_and_assert_logs(
            job_id,
            job_manager,
            expected_log="Waiting...",
            num_iteration=5)

        # Stop the job via the API.
        job_manager.stop_job(job_id)

        # Consume the rest of the stream; every remaining line must still be
        # "Waiting...".
        async for lines in job_manager.tail_job_logs(job_id):
            assert all(
                s == "Waiting..." for s in lines.strip().split("\n"))
            print(lines, end="")

        wait_for_condition(
            check_job_stopped, job_manager=job_manager, job_id=job_id)
# Allow running this test module directly; the process exit code is whatever
# pytest.main returns for this file.
if __name__ == "__main__":
    sys.exit(pytest.main(["-v", __file__]))

View file

@ -0,0 +1,88 @@
from tempfile import NamedTemporaryFile
import sys
import pytest
from ray.dashboard.modules.job.utils import file_tail_iterator
@pytest.fixture
def tmp():
    """Yield the path of a NamedTemporaryFile kept open while the test runs."""
    with NamedTemporaryFile() as handle:
        yield handle.name
class TestIterLine():
    """Unit tests for ``file_tail_iterator``.

    Each test opens its own writer on the target path inside a ``with`` block
    so the handle is closed even when an assertion fails.
    """

    def test_invalid_type(self):
        """A non-string path is rejected with a TypeError on first use."""
        with pytest.raises(TypeError, match="path must be a string"):
            next(file_tail_iterator(1))

    def test_file_not_created(self, tmp):
        """The iterator yields None until content is written to the file."""
        it = file_tail_iterator(tmp)
        assert next(it) is None

        with open(tmp, "w") as f:
            f.write("hi\n")
            f.flush()
            assert next(it) is not None

    def test_wait_for_newline(self, tmp):
        """Text that has not been flushed yet is not visible to the reader."""
        it = file_tail_iterator(tmp)
        assert next(it) is None

        with open(tmp, "w") as f:
            # No flush here: the text is still in the writer's buffer, so the
            # iterator has nothing to read.
            f.write("no_newline_yet")
            assert next(it) is None

            f.write("\n")
            f.flush()
            assert next(it) == "no_newline_yet\n"

    def test_multiple_lines(self, tmp):
        """Lines flushed one at a time are returned one at a time."""
        it = file_tail_iterator(tmp)
        assert next(it) is None

        with open(tmp, "w") as f:
            num_lines = 10
            for i in range(num_lines):
                s = f"{i}\n"
                f.write(s)
                f.flush()
                assert next(it) == s

            assert next(it) is None

    def test_batching(self, tmp):
        """Lines flushed together are returned together as one string."""
        it = file_tail_iterator(tmp)
        assert next(it) is None

        with open(tmp, "w") as f:
            # Write lines in batches of 10, check that we get them back in
            # batches.
            for _ in range(100):
                num_lines = 10
                for i in range(num_lines):
                    f.write(f"{i}\n")
                f.flush()

                assert next(it) == "\n".join(
                    str(i) for i in range(10)) + "\n"

            assert next(it) is None

    def test_delete_file(self):
        """The iterator keeps yielding None after its file is deleted."""
        with NamedTemporaryFile() as tmp:
            it = file_tail_iterator(tmp.name)
            with open(tmp.name, "w") as f:
                assert next(it) is None

                f.write("hi\n")
                f.flush()
                assert next(it) == "hi\n"

        # Calls should continue returning None after file deleted.
        assert next(it) is None
# Allow running this test module directly; the process exit code is whatever
# pytest.main returns for this file.
if __name__ == "__main__":
    sys.exit(pytest.main(["-v", __file__]))

View file

@ -0,0 +1,31 @@
import logging
import os
from typing import Iterator, Optional
logger = logging.getLogger(__name__)
def file_tail_iterator(path: str) -> Iterator[Optional[str]]:
    """Yield lines from a file as it's written.

    Returns lines in batches opportunistically: everything that can be read
    before hitting the current end of the file is concatenated into a single
    string and yielded together.

    Returns None until the file exists or if no new line has been written.
    The generator never finishes on its own; callers decide when to stop
    iterating.

    Args:
        path: Path of the file to follow.

    Yields:
        A string holding one or more newly read lines (each keeps its
        trailing newline if it had one), or None when nothing new is
        available.

    Raises:
        TypeError: If ``path`` is not a string. Because this is a generator,
            the error surfaces on the first ``next()`` call.
    """
    if not isinstance(path, str):
        raise TypeError(f"path must be a string, got {type(path)}.")

    while not os.path.exists(path):
        logger.debug("Path %s doesn't exist yet.", path)
        yield None

    with open(path, "r") as f:
        # Collect pieces in a list and join once per batch instead of growing
        # a string with repeated concatenation.
        batch = []
        while True:
            curr_line = f.readline()
            # readline() returns empty string when there's no new line.
            if curr_line:
                batch.append(curr_line)
                continue

            # Reached the current end of file: hand back what was gathered,
            # or None if nothing new was read since the last yield.
            yield "".join(batch) or None
            batch.clear()

View file

@ -8,12 +8,12 @@ Supports color, bold text, italics, underlines, etc.
as well as indentation and other structured output.
"""
from contextlib import contextmanager
import sys
import logging
from functools import wraps
import inspect
import logging
import os
from typing import Any, Dict, Tuple, Optional, List
import sys
from typing import Any, Callable, Dict, Tuple, Optional, List
import click
@ -776,3 +776,35 @@ class SilentClickException(click.ClickException):
cli_logger = _CliLogger()

# Click options shared by CLI commands that configure `cli_logger`:
# --log-style, --log-color and -v/--verbose (a repeat-count flag).
# They are attached to a command by `add_click_logging_options`.
CLICK_LOGGING_OPTIONS = [
    click.option(
        "--log-style",
        required=False,
        type=click.Choice(cli_logger.VALID_LOG_STYLES, case_sensitive=False),
        default="auto",
        help=("If 'pretty', outputs with formatting and color. If 'record', "
              "outputs record-style without formatting. "
              "'auto' defaults to 'pretty', and disables pretty logging "
              "if stdin is *not* a TTY.")),
    click.option(
        "--log-color",
        required=False,
        type=click.Choice(["auto", "false", "true"], case_sensitive=False),
        default="auto",
        help=("Use color logging. "
              "Auto enables color logging if stdout is a TTY.")),
    click.option("-v", "--verbose", default=None, count=True)
]
def add_click_logging_options(f: Callable) -> Callable:
    """Decorate a CLI function with the shared logging options.

    Every entry of CLICK_LOGGING_OPTIONS is applied to ``f`` (last entry
    first). The returned callable takes the resulting ``log_style``,
    ``log_color`` and ``verbose`` keyword arguments, passes them to
    ``cli_logger.configure`` and then calls ``f`` with the remaining
    arguments only, so ``f`` itself does not need to declare them.

    Args:
        f: The command function to decorate.

    Returns:
        A wrapper around ``f`` that configures the CLI logger before
        delegating to it.
    """
    for attach_option in CLICK_LOGGING_OPTIONS[::-1]:
        f = attach_option(f)

    @wraps(f)
    def configure_then_call(*args,
                            log_style=None,
                            log_color=None,
                            verbose=None,
                            **kwargs):
        # Consume the logging-related kwargs here; forward everything else.
        cli_logger.configure(log_style, log_color, verbose)
        return f(*args, **kwargs)

    return configure_then_call

View file

@ -31,7 +31,8 @@ from ray.autoscaler._private.fake_multi_node.node_provider import \
from ray.autoscaler._private.util import DEBUG_AUTOSCALING_ERROR, \
DEBUG_AUTOSCALING_STATUS
from ray.internal.internal_api import memory_summary
from ray.autoscaler._private.cli_logger import cli_logger, cf
from ray.autoscaler._private.cli_logger import (add_click_logging_options,
cli_logger, cf)
from ray.core.generated import gcs_service_pb2
from ray.core.generated import gcs_service_pb2_grpc
from ray.dashboard.modules.job.cli import job_cli_group
@ -39,35 +40,6 @@ from distutils.dir_util import copy_tree
logger = logging.getLogger(__name__)
logging_options = [
click.option(
"--log-style",
required=False,
type=click.Choice(cli_logger.VALID_LOG_STYLES, case_sensitive=False),
default="auto",
help=("If 'pretty', outputs with formatting and color. If 'record', "
"outputs record-style without formatting. "
"'auto' defaults to 'pretty', and disables pretty logging "
"if stdin is *not* a TTY.")),
click.option(
"--log-color",
required=False,
type=click.Choice(["auto", "false", "true"], case_sensitive=False),
default="auto",
help=("Use color logging. "
"Auto enables color logging if stdout is a TTY.")),
click.option("-v", "--verbose", default=None, count=True)
]
def add_click_options(options):
def wrapper(f):
for option in reversed(logging_options):
f = option(f)
return f
return wrapper
@click.group()
@click.option(
@ -483,7 +455,7 @@ def debug(address):
default=False,
help="Make the Ray debugger available externally to the node. This is only"
"safe to activate if the node is behind a firewall.")
@add_click_options(logging_options)
@add_click_logging_options
def start(node_ip_address, address, port, redis_password, redis_shard_ports,
object_manager_port, node_manager_port, gcs_server_port,
min_worker_port, max_worker_port, worker_port_list,
@ -495,9 +467,8 @@ def start(node_ip_address, address, port, redis_password, redis_shard_ports,
no_redirect_output, plasma_store_socket_name, raylet_socket_name,
temp_dir, system_config, enable_object_reconstruction,
metrics_export_port, no_monitor, tracing_startup_hook,
ray_debugger_external, log_style, log_color, verbose):
ray_debugger_external):
"""Start Ray processes manually on the local machine."""
cli_logger.configure(log_style, log_color, verbose)
if gcs_server_port and not head:
raise ValueError(
"gcs_server_port can be only assigned when you specify --head.")
@ -799,12 +770,10 @@ def start(node_ip_address, address, port, redis_password, redis_shard_ports,
"--force",
is_flag=True,
help="If set, ray will send SIGKILL instead of SIGTERM.")
@add_click_options(logging_options)
def stop(force, verbose, log_style, log_color):
@add_click_logging_options
def stop(force):
"""Stop Ray processes manually on the local machine."""
cli_logger.configure(log_style, log_color, verbose)
# Note that raylet needs to exit before object store, otherwise
# it cannot exit gracefully.
is_linux = sys.platform.startswith("linux")
@ -945,13 +914,11 @@ def stop(force, verbose, log_style, log_color):
help=("Ray uses login shells (bash --login -i) to run cluster commands "
"by default. If your workflow is compatible with normal shells, "
"this can be disabled for a better user experience."))
@add_click_options(logging_options)
@add_click_logging_options
def up(cluster_config_file, min_workers, max_workers, no_restart, restart_only,
yes, cluster_name, no_config_cache, redirect_command_output,
use_login_shells, log_style, log_color, verbose):
use_login_shells):
"""Create or update a Ray cluster."""
cli_logger.configure(log_style, log_color, verbose)
if restart_only or no_restart:
cli_logger.doassert(restart_only != no_restart,
"`{}` is incompatible with `{}`.",
@ -1008,12 +975,10 @@ def up(cluster_config_file, min_workers, max_workers, no_restart, restart_only,
is_flag=True,
default=False,
help="Retain the minimal amount of workers specified in the config.")
@add_click_options(logging_options)
@add_click_logging_options
def down(cluster_config_file, yes, workers_only, cluster_name,
keep_min_workers, log_style, log_color, verbose):
keep_min_workers):
"""Tear down a Ray cluster."""
cli_logger.configure(log_style, log_color, verbose)
teardown_cluster(cluster_config_file, yes, workers_only, cluster_name,
keep_min_workers)
@ -1058,12 +1023,9 @@ def kill_random_node(cluster_config_file, yes, hard, cluster_name):
required=False,
type=str,
help="Override the configured cluster name.")
@add_click_options(logging_options)
def monitor(cluster_config_file, lines, cluster_name, log_style, log_color,
verbose):
@add_click_logging_options
def monitor(cluster_config_file, lines, cluster_name):
"""Tails the autoscaler logs of a Ray cluster."""
cli_logger.configure(log_style, log_color, verbose)
monitor_cluster(cluster_config_file, lines, cluster_name)
@ -1098,12 +1060,10 @@ def monitor(cluster_config_file, lines, cluster_name, log_style, log_color,
multiple=True,
type=int,
help="Port to forward. Use this multiple times to forward multiple ports.")
@add_click_options(logging_options)
@add_click_logging_options
def attach(cluster_config_file, start, screen, tmux, cluster_name,
no_config_cache, new, port_forward, log_style, log_color, verbose):
no_config_cache, new, port_forward):
"""Create or attach to a SSH session to a Ray cluster."""
cli_logger.configure(log_style, log_color, verbose)
port_forward = [(port, port) for port in list(port_forward)]
attach_cluster(
cluster_config_file,
@ -1126,12 +1086,9 @@ def attach(cluster_config_file, start, screen, tmux, cluster_name,
required=False,
type=str,
help="Override the configured cluster name.")
@add_click_options(logging_options)
def rsync_down(cluster_config_file, source, target, cluster_name, log_style,
log_color, verbose):
@add_click_logging_options
def rsync_down(cluster_config_file, source, target, cluster_name):
"""Download specific files from a Ray cluster."""
cli_logger.configure(log_style, log_color, verbose)
rsync(cluster_config_file, source, target, cluster_name, down=True)
@ -1151,12 +1108,9 @@ def rsync_down(cluster_config_file, source, target, cluster_name, log_style,
is_flag=True,
required=False,
help="Upload to all nodes (workers and head).")
@add_click_options(logging_options)
def rsync_up(cluster_config_file, source, target, cluster_name, all_nodes,
log_style, log_color, verbose):
@add_click_logging_options
def rsync_up(cluster_config_file, source, target, cluster_name, all_nodes):
"""Upload specific files to a Ray cluster."""
cli_logger.configure(log_style, log_color, verbose)
if all_nodes:
cli_logger.warning(
"WARNING: the `all_nodes` option is deprecated and will be "
@ -1218,10 +1172,9 @@ def rsync_up(cluster_config_file, source, target, cluster_name, all_nodes,
type=str,
help="(deprecated) Use '-- --arg1 --arg2' for script args.")
@click.argument("script_args", nargs=-1)
@add_click_options(logging_options)
@add_click_logging_options
def submit(cluster_config_file, screen, tmux, stop, start, cluster_name,
no_config_cache, port_forward, script, args, script_args, log_style,
log_color, verbose):
no_config_cache, port_forward, script, args, script_args):
"""Uploads and runs a script on the specified cluster.
The script is automatically synced to the following location:
@ -1231,8 +1184,6 @@ def submit(cluster_config_file, screen, tmux, stop, start, cluster_name,
Example:
>>> ray submit [CLUSTER.YAML] experiment.py -- --smoke-test
"""
cli_logger.configure(log_style, log_color, verbose)
cli_logger.doassert(not (screen and tmux),
"`{}` and `{}` are incompatible.", cf.bold("--screen"),
cf.bold("--tmux"))
@ -1342,13 +1293,10 @@ def submit(cluster_config_file, screen, tmux, stop, start, cluster_name,
multiple=True,
type=int,
help="Port to forward. Use this multiple times to forward multiple ports.")
@add_click_options(logging_options)
@add_click_logging_options
def exec(cluster_config_file, cmd, run_env, screen, tmux, stop, start,
cluster_name, no_config_cache, port_forward, log_style, log_color,
verbose):
cluster_name, no_config_cache, port_forward):
"""Execute a command via SSH on a Ray cluster."""
cli_logger.configure(log_style, log_color, verbose)
port_forward = [(port, port) for port in list(port_forward)]
exec_cluster(
@ -1899,15 +1847,13 @@ def install_nightly(verbose, dryrun):
type=str,
help="The directory to generate the bazel project template to,"
" if provided.")
@add_click_options(logging_options)
def cpp(show_library_path, generate_bazel_project_template_to, log_style,
log_color, verbose):
@add_click_logging_options
def cpp(show_library_path, generate_bazel_project_template_to):
"""Show the cpp library path and generate the bazel project template."""
if not show_library_path and not generate_bazel_project_template_to:
raise ValueError(
"Please input at least one option of '--show-library-path'"
" and '--generate-bazel-project-template-to'.")
cli_logger.configure(log_style, log_color, verbose)
raydir = os.path.abspath(os.path.dirname(ray.__file__))
cpp_dir = os.path.join(raydir, "cpp")
cpp_templete_dir = os.path.join(cpp_dir, "example")