import asyncio
import copy
import json
import logging
import os
import random
import string
import subprocess
import time
import traceback
from asyncio.tasks import FIRST_COMPLETED
from collections import deque
from typing import Any, Dict, Iterator, Optional

import ray
import ray._private.ray_constants as ray_constants
from ray._private.runtime_env.constants import RAY_JOB_CONFIG_JSON_ENV_VAR
from ray.actor import ActorHandle
from ray.dashboard.modules.job.common import (
    JOB_ID_METADATA_KEY,
    JOB_NAME_METADATA_KEY,
    JobInfo,
    JobInfoStorageClient,
)
from ray.dashboard.modules.job.utils import file_tail_iterator
from ray.exceptions import RuntimeEnvSetupError
from ray.job_submission import JobStatus

logger = logging.getLogger(__name__)

# asyncio python version compatibility
try:
    create_task = asyncio.create_task
except AttributeError:
    create_task = asyncio.ensure_future


def generate_job_id() -> str:
    """Returns a job_id of the form 'raysubmit_XYZ'.

    Prefixed with 'raysubmit' to avoid confusion with Ray JobID (driver ID).
    """
    rand = random.SystemRandom()
    possible_characters = list(
        set(string.ascii_letters + string.digits)
        - {"I", "l", "o", "O", "0"}  # No confusing characters
    )
    id_part = "".join(rand.choices(possible_characters, k=16))
    return f"raysubmit_{id_part}"


class JobLogStorageClient:
    """
    Disk storage for stdout / stderr of driver script logs.
    """

    JOB_LOGS_PATH = "job-driver-{job_id}.log"
    # Number of last N lines to put in job message upon failure.
    NUM_LOG_LINES_ON_ERROR = 10

    def get_logs(self, job_id: str) -> str:
        try:
            with open(self.get_log_file_path(job_id), "r") as f:
                return f.read()
        except FileNotFoundError:
            return ""

    def tail_logs(self, job_id: str) -> Iterator[str]:
        return file_tail_iterator(self.get_log_file_path(job_id))

    def get_last_n_log_lines(
        self, job_id: str, num_log_lines=NUM_LOG_LINES_ON_ERROR
    ) -> str:
        log_tail_iter = self.tail_logs(job_id)
        log_tail_deque = deque(maxlen=num_log_lines)
        for line in log_tail_iter:
            if line is None:
                break
            else:
                log_tail_deque.append(line)
        return "".join(log_tail_deque)

    def get_log_file_path(self, job_id: str) -> str:
        """
        Get the file path to the logs of a given job. Example:
            /tmp/ray/session_date/logs/job-driver-{job_id}.log
        """
        return os.path.join(
            ray._private.worker._global_node.get_logs_dir_path(),
            self.JOB_LOGS_PATH.format(job_id=job_id),
        )
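
    # For illustration, a hedged usage sketch of JobLogStorageClient (requires a
    # running Ray node so the session log directory can be resolved; the job ID
    # below is a placeholder):
    #
    #     client = JobLogStorageClient()
    #     print(client.get_logs("raysubmit_XYZ"))              # full driver log, or ""
    #     print(client.get_last_n_log_lines("raysubmit_XYZ"))  # last 10 lines by default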


class JobSupervisor:
    """
    Ray actor created by JobManager for each submitted job. It is responsible
    for setting up the runtime_env, executing the given shell command in a
    subprocess, updating the job status, persisting job logs, and cleaning up
    the subprocess group.

    One job supervisor actor maps to one subprocess, for one job_id.
    The job supervisor actor fate-shares with the subprocess it creates.
    """

    SUBPROCESS_POLL_PERIOD_S = 0.1

    def __init__(self, job_id: str, entrypoint: str, user_metadata: Dict[str, str]):
        self._job_id = job_id
        self._job_info_client = JobInfoStorageClient()
        self._log_client = JobLogStorageClient()
        self._driver_runtime_env = self._get_driver_runtime_env()
        self._entrypoint = entrypoint

        # Default metadata if not passed by the user.
        self._metadata = {JOB_ID_METADATA_KEY: job_id, JOB_NAME_METADATA_KEY: job_id}
        self._metadata.update(user_metadata)

        # Set by a fire-and-forget call from the outer JobManager to this actor.
        self._stop_event = asyncio.Event()

    def _get_driver_runtime_env(self) -> Dict[str, Any]:
        # Get the runtime_env set for the supervisor actor.
        curr_runtime_env = dict(ray.get_runtime_context().runtime_env)
        # Allow CUDA_VISIBLE_DEVICES to be set normally for the driver's tasks
        # & actors.
        env_vars = curr_runtime_env.get("env_vars", {})
        env_vars.pop(ray_constants.NOSET_CUDA_VISIBLE_DEVICES_ENV_VAR)
        curr_runtime_env["env_vars"] = env_vars
        return curr_runtime_env

    def ping(self):
        """Used to check the health of the actor."""
        pass

    def _exec_entrypoint(self, logs_path: str) -> subprocess.Popen:
        """
        Runs the entrypoint command as a child process, streaming stderr &
        stdout to the given log file.

        We also start a daemon watcher process and place the driver subprocess
        in its own pgid, such that if the job actor dies, the entire process
        group fate-shares with it.

        Args:
            logs_path: File path on the head node's local disk to store the
                driver command's stdout & stderr.
        Returns:
            child_process: Child process that runs the driver command. Can be
                terminated or killed upon user calling stop().
        """
        with open(logs_path, "w") as logs_file:
            child_process = subprocess.Popen(
                self._entrypoint,
                shell=True,
                start_new_session=True,
                stdout=logs_file,
                stderr=subprocess.STDOUT,
            )
            parent_pid = os.getpid()
            # Create new pgid with new subprocess to execute driver command
            child_pid = child_process.pid
            child_pgid = os.getpgid(child_pid)

            # Open a new subprocess to kill the child process when the parent
            # process dies. `kill -s 0 parent_pid` will succeed if the parent
            # is alive; if it fails, SIGKILL the child process group and exit.
            subprocess.Popen(
                f"while kill -s 0 {parent_pid}; do sleep 1; done; kill -9 -{child_pgid}",  # noqa: E501
                shell=True,
                # Suppress output
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
            )
            return child_process
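
    # For illustration only, a minimal standalone sketch (outside Ray) of the
    # fate-sharing pattern used above; names and commands are hypothetical:
    #
    #     import os
    #     import subprocess
    #
    #     child = subprocess.Popen("sleep 1000", shell=True, start_new_session=True)
    #     pgid = os.getpgid(child.pid)
    #     # Watcher: poll `kill -s 0 <parent_pid>`; once the parent is gone,
    #     # SIGKILL the whole child process group (note the negative pgid).
    #     subprocess.Popen(
    #         f"while kill -s 0 {os.getpid()}; do sleep 1; done; kill -9 -{pgid}",
    #         shell=True,
    #     )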

    def _get_driver_env_vars(self) -> Dict[str, str]:
        """Returns environment variables that should be set in the driver."""
        ray_addr = ray._private.services.canonicalize_bootstrap_address_or_die(
            "auto", ray._private.worker._global_node._ray_params.temp_dir
        )
        assert ray_addr is not None
        return {
            # Set JobConfig for the child process (runtime_env, metadata).
            RAY_JOB_CONFIG_JSON_ENV_VAR: json.dumps(
                {
                    "runtime_env": self._driver_runtime_env,
                    "metadata": self._metadata,
                }
            ),
            # Always set RAY_ADDRESS to the canonicalized bootstrap address for
            # job submission. In the case of local development, this prevents
            # the user's http://{address}:{dashboard_port} address (used to
            # interact with the jobs SDK) from being re-used as the driver's
            # address.
            # TODO:(mwtian) Check why "auto" does not work in entrypoint script
            ray_constants.RAY_ADDRESS_ENVIRONMENT_VARIABLE: ray_addr,
            # Set PYTHONUNBUFFERED=1 to stream logs during the job instead of
            # only streaming them upon completion of the job.
            "PYTHONUNBUFFERED": "1",
        }
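
    # For illustration, a hedged sketch of the environment the driver ends up
    # with (keys are the values of the referenced constants; values are made-up
    # examples, not real output):
    #
    #     {
    #         <RAY_JOB_CONFIG_JSON_ENV_VAR>: '{"runtime_env": {...}, "metadata": {...}}',
    #         <RAY_ADDRESS_ENVIRONMENT_VARIABLE>: "172.31.0.2:6379",
    #         "PYTHONUNBUFFERED": "1",
    #     }
    #
    # The driver's `ray.init()` reads the serialized JobConfig from the first
    # variable, so the job's runtime_env and metadata apply to the driver.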

    async def _polling(self, child_process) -> int:
        try:
            while child_process is not None:
                return_code = child_process.poll()
                if return_code is not None:
                    # subprocess finished with return code
                    return return_code
                else:
                    # still running, yield control, 0.1s by default
                    await asyncio.sleep(self.SUBPROCESS_POLL_PERIOD_S)
        except Exception:
            if child_process:
                # TODO (jiaodong): Improve this with SIGTERM then SIGKILL
                child_process.kill()
            return 1

    async def run(
        self,
        # Signal actor used in testing to capture PENDING -> RUNNING cases
        _start_signal_actor: Optional[ActorHandle] = None,
    ):
        """
        Stop and start both happen asynchronously, coordinated by an asyncio
        event and coroutine, respectively.

        1) Sets the job status to RUNNING.
        2) Passes the runtime env and metadata to the subprocess as serialized
            env variables.
        3) Handles concurrent events of driver execution and termination
            requests from stop().
        """
        curr_status = self._job_info_client.get_status(self._job_id)
        assert curr_status == JobStatus.PENDING, "Run should only be called once."

        if _start_signal_actor:
            # Block in PENDING state until start signal received.
            await _start_signal_actor.wait.remote()

        self._job_info_client.put_status(self._job_id, JobStatus.RUNNING)

        try:
            # Configure environment variables for the child process. These
            # will *not* be set in the runtime_env, so they apply to the driver
            # only, not its tasks & actors.
            os.environ.update(self._get_driver_env_vars())
            logger.info(
                "Submitting job with RAY_ADDRESS = "
                f"{os.environ[ray_constants.RAY_ADDRESS_ENVIRONMENT_VARIABLE]}"
            )
            log_path = self._log_client.get_log_file_path(self._job_id)
            child_process = self._exec_entrypoint(log_path)

            polling_task = create_task(self._polling(child_process))
            finished, _ = await asyncio.wait(
                [polling_task, self._stop_event.wait()], return_when=FIRST_COMPLETED
            )

            if self._stop_event.is_set():
                polling_task.cancel()
                # TODO (jiaodong): Improve this with SIGTERM then SIGKILL
                child_process.kill()
                self._job_info_client.put_status(self._job_id, JobStatus.STOPPED)
            else:
                # Child process finished execution and no stop event is set
                # at the same time
                assert len(finished) == 1, "Should have only one coroutine done"
                [child_process_task] = finished
                return_code = child_process_task.result()
                if return_code == 0:
                    self._job_info_client.put_status(self._job_id, JobStatus.SUCCEEDED)
                else:
                    log_tail = self._log_client.get_last_n_log_lines(self._job_id)
                    if log_tail is not None and log_tail != "":
                        message = (
                            "Job failed due to an application error, "
                            "last available logs:\n" + log_tail
                        )
                    else:
                        message = None
                    self._job_info_client.put_status(
                        self._job_id, JobStatus.FAILED, message=message
                    )
        except Exception:
            logger.error(
                "Got unexpected exception while trying to execute driver "
                f"command. {traceback.format_exc()}"
            )
        finally:
            # clean up actor after tasks are finished
            ray.actor.exit_actor()

    def stop(self):
        """Set the stop event and let run() handle the rest in its asyncio.wait()."""
        self._stop_event.set()
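
    # For illustration, a minimal standalone sketch (not Ray-specific; names are
    # hypothetical) of the coordination pattern between run() and stop(): wait
    # on a long-running task and a stop event concurrently, then react to
    # whichever completes first.
    #
    #     import asyncio
    #
    #     async def demo():
    #         stop_event = asyncio.Event()
    #         work = asyncio.ensure_future(asyncio.sleep(60))
    #         asyncio.get_running_loop().call_later(0.1, stop_event.set)
    #         done, _ = await asyncio.wait(
    #             [work, stop_event.wait()], return_when=asyncio.FIRST_COMPLETED
    #         )
    #         if stop_event.is_set():
    #             work.cancel()  # mirrors killing the child process above
    #
    #     asyncio.run(demo())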


class JobManager:
    """Provide Python APIs for job submission and management.

    It does not provide persistence; all info will be lost if the cluster
    goes down.
    """

    JOB_ACTOR_NAME_TEMPLATE = (
        f"{ray_constants.RAY_INTERNAL_NAMESPACE_PREFIX}job_actor_" + "{job_id}"
    )
    # Time that we will sleep while tailing logs if no new log line is
    # available.
    LOG_TAIL_SLEEP_S = 1
    JOB_MONITOR_LOOP_PERIOD_S = 1

    def __init__(self):
        self._job_info_client = JobInfoStorageClient()
        self._log_client = JobLogStorageClient()
        self._supervisor_actor_cls = ray.remote(JobSupervisor)

        self._recover_running_jobs()

    def _recover_running_jobs(self):
        """Recovers all running jobs from the status client.

        For each non-terminal job, we spawn a coroutine to monitor it.
        """
        all_jobs = self._job_info_client.get_all_jobs()
        for job_id, job_info in all_jobs.items():
            if not job_info.status.is_terminal():
                create_task(self._monitor_job(job_id))

    def _get_actor_for_job(self, job_id: str) -> Optional[ActorHandle]:
        try:
            return ray.get_actor(self.JOB_ACTOR_NAME_TEMPLATE.format(job_id=job_id))
        except ValueError:  # Ray returns ValueError for nonexistent actor.
            return None

    async def _monitor_job(
        self, job_id: str, job_supervisor: Optional[ActorHandle] = None
    ):
        """Monitors the specified job until it enters a terminal state.

        This is necessary because we need to handle the case where the
        JobSupervisor dies unexpectedly.
        """
        is_alive = True
        if job_supervisor is None:
            job_supervisor = self._get_actor_for_job(job_id)

            if job_supervisor is None:
                logger.error(f"Failed to get job supervisor for job {job_id}.")
                self._job_info_client.put_status(
                    job_id,
                    JobStatus.FAILED,
                    message="Unexpected error occurred: Failed to get job supervisor.",
                )
                is_alive = False

        while is_alive:
            try:
                await job_supervisor.ping.remote()
                await asyncio.sleep(self.JOB_MONITOR_LOOP_PERIOD_S)
            except Exception as e:
                is_alive = False
                if self._job_info_client.get_status(job_id).is_terminal():
                    # If the job is already in a terminal state, then the actor
                    # exiting is expected.
                    pass
                elif isinstance(e, RuntimeEnvSetupError):
                    logger.info(f"Failed to set up runtime_env for job {job_id}.")
                    self._job_info_client.put_status(
                        job_id,
                        JobStatus.FAILED,
                        message=f"runtime_env setup failed: {e}",
                    )
                else:
                    logger.warning(
                        f"Job supervisor for job {job_id} failed unexpectedly: {e}."
                    )
                    self._job_info_client.put_status(
                        job_id,
                        JobStatus.FAILED,
                        message=f"Unexpected error occurred: {e}",
                    )

        # Kill the actor defensively to avoid leaking actors in unexpected error cases.
        if job_supervisor is not None:
            ray.kill(job_supervisor, no_restart=True)
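
    # For illustration, a minimal sketch (assuming a running Ray cluster; the
    # actor class is hypothetical) of how an actor's death surfaces as an
    # exception on a pending call, which is what the ping() loop above relies on:
    #
    #     import ray
    #     from ray.exceptions import RayActorError
    #
    #     @ray.remote
    #     class Dummy:
    #         def ping(self):
    #             return "ok"
    #
    #     a = Dummy.remote()
    #     ray.kill(a)
    #     try:
    #         ray.get(a.ping.remote())
    #     except RayActorError:
    #         print("actor is dead")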

    def _get_current_node_resource_key(self) -> str:
        """Get the Ray resource key for current node.

        It can be used for actor placement.
        """
        current_node_id = ray.get_runtime_context().node_id.hex()
        for node in ray.nodes():
            if node["NodeID"] == current_node_id:
                # Found the node.
                for key in node["Resources"].keys():
                    if key.startswith("node:"):
                        return key
        else:
            raise ValueError("Cannot find the node dictionary for current node.")
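
    # For illustration: the returned key has the form "node:<node_ip>" and can
    # be used to pin an actor to this node by requesting a small amount of that
    # custom resource, e.g. (sketch; the actor class is hypothetical):
    #
    #     SomeActor.options(resources={key: 0.001}).remote()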

    def _handle_supervisor_startup(self, job_id: str, result: Optional[Exception]):
        """Handle the result of starting a job supervisor actor.

        If started successfully, result should be None. Otherwise it should be
        an Exception.

        On failure, the job will be marked failed with a relevant error
        message.
        """
        if result is None:
            return

    def _get_supervisor_runtime_env(
        self, user_runtime_env: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Configure and return the runtime_env for the supervisor actor."""

        # Make a copy to avoid mutating passed runtime_env.
        runtime_env = (
            copy.deepcopy(user_runtime_env) if user_runtime_env is not None else {}
        )

        # NOTE(edoakes): Can't use .get("env_vars", {}) here because we need to
        # handle the case where env_vars is explicitly set to `None`.
        env_vars = runtime_env.get("env_vars")
        if env_vars is None:
            env_vars = {}

        # Don't set CUDA_VISIBLE_DEVICES for the supervisor actor so the
        # driver can use GPUs if it wants to. This will be removed from
        # the driver's runtime_env so it isn't inherited by tasks & actors.
        env_vars[ray_constants.NOSET_CUDA_VISIBLE_DEVICES_ENV_VAR] = "1"
        runtime_env["env_vars"] = env_vars
        return runtime_env
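
    # For illustration, a hedged sketch of the transformation (the input dict is
    # hypothetical):
    #
    #     manager._get_supervisor_runtime_env({"pip": ["requests"], "env_vars": None})
    #     # -> {"pip": ["requests"],
    #     #     "env_vars": {ray_constants.NOSET_CUDA_VISIBLE_DEVICES_ENV_VAR: "1"}}
    #
    # JobSupervisor._get_driver_runtime_env() later pops this flag back off so
    # the driver's own tasks & actors still see CUDA_VISIBLE_DEVICES normally.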

    def submit_job(
        self,
        *,
        entrypoint: str,
        job_id: Optional[str] = None,
        runtime_env: Optional[Dict[str, Any]] = None,
        metadata: Optional[Dict[str, str]] = None,
        _start_signal_actor: Optional[ActorHandle] = None,
    ) -> str:
        """
        Job execution happens asynchronously.

        1) Generate a new unique id for this job submission. Each call of this
            method is treated as an independent submission with its own new
            ID, job supervisor actor, and child process.
        2) Create a new detached actor with the same runtime_env as the job
            spec.

        Setting up the runtime_env, the subprocess group, driver command
        execution, subprocess cleanup, and status updates to the GCS are all
        handled by the job supervisor actor.

        Args:
            entrypoint: Driver command to execute in a subprocess shell.
                Represents the entrypoint to start the user application.
            runtime_env: Runtime environment used to execute the driver
                command, which could contain its own ray.init() to configure
                the runtime env at the Ray cluster, task, and actor level.
            metadata: Arbitrary data passed to the driver command, if needed.
            _start_signal_actor: Used in testing only to capture state
                transitions between PENDING -> RUNNING. Regular users
                shouldn't need this.

        Returns:
            job_id: Generated ID for further job management. Only valid
                within the same Ray cluster.
        """
        if job_id is None:
            job_id = generate_job_id()
        elif self._job_info_client.get_status(job_id) is not None:
            raise RuntimeError(f"Job {job_id} already exists.")

        logger.info(f"Starting job with job_id: {job_id}")
        job_info = JobInfo(
            entrypoint=entrypoint,
            status=JobStatus.PENDING,
            start_time=int(time.time() * 1000),
            metadata=metadata,
            runtime_env=runtime_env,
        )
        self._job_info_client.put_info(job_id, job_info)

        # Wait for the actor to start up asynchronously so this call always
        # returns immediately and we can catch errors with the actor starting
        # up.
        try:
            supervisor = self._supervisor_actor_cls.options(
                lifetime="detached",
                name=self.JOB_ACTOR_NAME_TEMPLATE.format(job_id=job_id),
                num_cpus=0,
                # Currently we assume the JobManager is created by the
                # dashboard server running on the head node, so job supervisor
                # actors are scheduled on the head node as well.
                resources={
                    self._get_current_node_resource_key(): 0.001,
                },
                runtime_env=self._get_supervisor_runtime_env(runtime_env),
            ).remote(job_id, entrypoint, metadata or {})
            supervisor.run.remote(_start_signal_actor=_start_signal_actor)

            # Monitor the job in the background so we can detect errors without
            # requiring a client to poll.
            create_task(self._monitor_job(job_id, job_supervisor=supervisor))
        except Exception as e:
            self._job_info_client.put_status(
                job_id,
                JobStatus.FAILED,
                message=f"Failed to start job supervisor: {e}.",
            )

        return job_id
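
    # For illustration, a hedged usage sketch (assumes this runs on the head
    # node with Ray already initialized; the entrypoint, working_dir, and
    # metadata values are hypothetical):
    #
    #     manager = JobManager()
    #     job_id = manager.submit_job(
    #         entrypoint="python my_script.py",
    #         runtime_env={"working_dir": "./my_project"},
    #         metadata={"owner": "alice"},
    #     )
    #     print(manager.get_job_status(job_id))  # e.g. JobStatus.PENDING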

    def stop_job(self, job_id) -> bool:
        """Request a job to exit, fire and forget.

        Returns whether or not the job was running.
        """
        job_supervisor_actor = self._get_actor_for_job(job_id)
        if job_supervisor_actor is not None:
            # Actor is still alive, signal it to stop the driver, fire and
            # forget
            job_supervisor_actor.stop.remote()
            return True
        else:
            return False

    def get_job_status(self, job_id: str) -> Optional[JobStatus]:
        """Get latest status of a job."""
        return self._job_info_client.get_status(job_id)

    def get_job_info(self, job_id: str) -> Optional[JobInfo]:
        """Get latest info of a job."""
        return self._job_info_client.get_info(job_id)

    def list_jobs(self) -> Dict[str, JobInfo]:
        """Get info for all jobs."""
        return self._job_info_client.get_all_jobs()

    def get_job_logs(self, job_id: str) -> str:
        """Get all logs produced by a job."""
        return self._log_client.get_logs(job_id)

    async def tail_job_logs(self, job_id: str) -> Iterator[str]:
        """Return an iterator following the logs of a job."""
        if self.get_job_status(job_id) is None:
            raise RuntimeError(f"Job '{job_id}' does not exist.")

        for line in self._log_client.tail_logs(job_id):
            if line is None:
                # Return if the job has exited and there are no new log lines.
                status = self.get_job_status(job_id)
                if status not in {JobStatus.PENDING, JobStatus.RUNNING}:
                    return

                await asyncio.sleep(self.LOG_TAIL_SLEEP_S)
            else:
                yield line
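
    # For illustration, a hedged usage sketch of following a job's logs from an
    # async context (the job_id is assumed to refer to an existing job):
    #
    #     async def follow_logs(manager: JobManager, job_id: str):
    #         async for lines in manager.tail_job_logs(job_id):
    #             print(lines, end="")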