import asyncio
from asyncio.tasks import FIRST_COMPLETED
import copy
import os
import json
import logging
import time
import traceback
import random
import subprocess
import string
from collections import deque
from typing import Any, Dict, Iterator, Tuple, Optional

import ray
from ray.exceptions import RuntimeEnvSetupError
import ray.ray_constants as ray_constants
from ray.actor import ActorHandle
from ray.job_submission import JobStatus
from ray.dashboard.modules.job.common import (
    JobInfo,
    JobInfoStorageClient,
    JOB_ID_METADATA_KEY,
    JOB_NAME_METADATA_KEY,
)
from ray.dashboard.modules.job.utils import file_tail_iterator
from ray._private.runtime_env.constants import RAY_JOB_CONFIG_JSON_ENV_VAR
logger = logging.getLogger(__name__)

# asyncio python version compatibility: ``asyncio.create_task`` only exists
# on Python 3.7+, so fall back to ``asyncio.ensure_future`` on older versions.
try:
    create_task = asyncio.create_task
except AttributeError:
    create_task = asyncio.ensure_future
def generate_job_id() -> str:
    """Returns a job_id of the form 'raysubmit_XYZ'.

    Prefixed with 'raysubmit' to avoid confusion with Ray JobID (driver ID).
    The random part is 16 characters drawn with ``random.SystemRandom`` from
    ASCII letters and digits, excluding visually ambiguous characters.
    """
    rand = random.SystemRandom()
    # Sorted so the candidate alphabet has a stable order (set iteration order
    # is arbitrary); this does not change the distribution of generated IDs.
    possible_characters = sorted(
        set(string.ascii_letters + string.digits)
        - {"I", "l", "o", "O", "0"}  # No confusing characters
    )
    id_part = "".join(rand.choices(possible_characters, k=16))
    return f"raysubmit_{id_part}"
class JobLogStorageClient:
    """
    Disk storage for stdout / stderr of driver script logs.

    Each job's combined output lives in one file named after ``JOB_LOGS_PATH``
    inside the logs directory.
    """

    JOB_LOGS_PATH = "job-driver-{job_id}.log"
    # Number of last N lines to put in job message upon failure.
    # NOTE(review): the original value was lost from this copy of the file;
    # 10 is assumed here — confirm against upstream.
    NUM_LOG_LINES_ON_ERROR = 10

    def __init__(self, logs_dir: Optional[str] = None):
        """
        Args:
            logs_dir: Directory holding the job log files. When None (the
                default, used by existing callers), the current Ray node's
                logs directory is resolved lazily on every path lookup.
        """
        self._logs_dir = logs_dir

    def get_logs(self, job_id: str) -> str:
        """Return the whole log file of ``job_id``, or "" if it does not exist
        yet."""
        try:
            with open(self.get_log_file_path(job_id), "r") as f:
                return f.read()
        except FileNotFoundError:
            return ""

    def tail_logs(self, job_id: str) -> Iterator[str]:
        """Return an iterator following the log file of ``job_id``.

        Callers in this module treat a yielded ``None`` as "no new line is
        available right now".
        """
        return file_tail_iterator(self.get_log_file_path(job_id))

    def get_last_n_log_lines(
        self, job_id: str, num_log_lines=NUM_LOG_LINES_ON_ERROR
    ) -> str:
        """Return at most the last ``num_log_lines`` lines currently in the
        job's log, concatenated into one string ("" if there are none).

        Reading stops at the first ``None`` from the tail iterator (nothing
        more available right now) or when the iterator is exhausted.
        """
        # A bounded deque keeps only the newest ``num_log_lines`` entries.
        log_tail_deque = deque(maxlen=num_log_lines)
        for line in self.tail_logs(job_id):
            if line is None:
                break
            log_tail_deque.append(line)
        return "".join(log_tail_deque)

    def get_log_file_path(self, job_id: str) -> str:
        """
        Get the file path to the logs of a given job. Example:
            {logs_dir}/job-driver-{job_id}.log
        """
        logs_dir = self._logs_dir
        if logs_dir is None:
            # NOTE(review): the directory expression was lost from this copy of
            # the file; the current Ray node's logs dir is assumed — confirm.
            logs_dir = ray.worker._global_node.get_logs_dir_path()
        return os.path.join(logs_dir, self.JOB_LOGS_PATH.format(job_id=job_id))
# Ray actor class (JobManager wraps it with ray.remote) that runs one job's
# entrypoint command in a shell subprocess, writes its output to the job's
# log file and records status transitions through JobInfoStorageClient.
# NOTE(review): this copy of the file is a lossy, unindented dump. Blame
# timestamp lines are interleaved with the code, and a number of source
# lines are missing (docstring quotes, `try:`/`else:` headers, call openers
# and closers). The NOTE(review) comments below mark the gaps visible from
# the remaining code; restore them from version control, not by hand.
# NOTE(review): the class docstring below has lost its triple quotes.
# SUBPROCESS_POLL_PERIOD_S, which _polling() reads, is not defined anywhere
# in the visible class body; _polling's comment says the period is "0.1s by
# default".
class JobSupervisor:
Ray actor created by JobManager for each submitted job, responsible to
setup runtime_env, execute given shell command in subprocess, update job
2021-11-04 11:59:47 -07:00
status, persist job logs and manage subprocess group cleaning.
2021-10-22 12:18:11 -07:00
One job supervisor actor maps to one subprocess, for one job_id.
Job supervisor actor should fate share with subprocess it created.
2021-11-04 11:59:47 -07:00
2022-01-29 18:41:57 -08:00
# Per-job state: job id, info/log storage clients, the runtime_env the
# driver will use, the entrypoint command, driver metadata and the stop event.
def __init__(self, job_id: str, entrypoint: str, user_metadata: Dict[str, str]):
2021-10-22 12:18:11 -07:00
self._job_id = job_id
2022-02-22 16:18:16 -06:00
self._job_info_client = JobInfoStorageClient()
2021-10-22 12:18:11 -07:00
self._log_client = JobLogStorageClient()
2022-05-10 11:43:04 -05:00
self._driver_runtime_env = self._get_driver_runtime_env()
2021-11-18 10:15:23 -06:00
self._entrypoint = entrypoint
2021-11-03 09:49:28 -05:00
2021-11-17 21:48:22 -06:00
# Default metadata if not passed by the user.
2022-01-29 18:41:57 -08:00
self._metadata = {JOB_ID_METADATA_KEY: job_id, JOB_NAME_METADATA_KEY: job_id}
# NOTE(review): `user_metadata` is never read in the visible code. Given the
# "Default metadata" comment above, a line merging it into self._metadata
# appears to be missing — confirm.
2021-11-17 21:48:22 -06:00
2021-10-22 12:18:11 -07:00
2021-11-04 11:59:47 -07:00
# fire and forget call from outer job manager to this actor
# run() races the child process against this event.
self._stop_event = asyncio.Event()
2022-05-10 11:43:04 -05:00
# Return a top-level copy of this actor's runtime_env to use for the driver.
# NOTE(review): dict() is a shallow copy, so the nested "env_vars" dict may
# be shared with the runtime context's runtime_env — verify before mutating.
def _get_driver_runtime_env(self) -> Dict[str, Any]:
# Get the runtime_env set for the supervisor actor.
curr_runtime_env = dict(ray.get_runtime_context().runtime_env)
# Allow CUDA_VISIBLE_DEVICES to be set normally for the driver's tasks
# & actors.
env_vars = curr_runtime_env.get("env_vars", {})
# NOTE(review): despite the comment above, no visible statement removes
# ray_constants.NOSET_CUDA_VISIBLE_DEVICES_ENV_VAR from env_vars.
# JobManager._get_supervisor_runtime_env adds that variable for this actor,
# so as visible the driver would inherit it; a removal line looks missing.
curr_runtime_env["env_vars"] = env_vars
return curr_runtime_env
2022-02-07 15:25:25 -06:00
# Health-check endpoint. JobManager._monitor_job awaits ping.remote() in a
# loop and treats an exception as the supervisor having gone away.
def ping(self):
"""Used to check the health of the actor."""
2021-10-22 12:18:11 -07:00
2021-11-18 10:15:23 -06:00
# Start the entrypoint as a child process with output going to logs_path.
# Also start a shell watchdog that SIGKILLs the child's process group once
# this process is gone. Returns the child's Popen handle.
# NOTE(review): the docstring below has lost its quotes and its
# "Args:"/"Returns:" headers.
def _exec_entrypoint(self, logs_path: str) -> subprocess.Popen:
2021-11-04 11:59:47 -07:00
2021-11-18 10:15:23 -06:00
Runs the entrypoint command as a child process, streaming stderr &
stdout to given log files.
2021-11-04 11:59:47 -07:00
Meanwhile we start a demon process and group driver
subprocess in same pgid, such that if job actor dies, entire process
group also fate share with it.
2021-11-09 22:34:12 -08:00
logs_path: File path on head node's local disk to store driver
command's stdout & stderr.
2021-11-04 11:59:47 -07:00
child_process: Child process that runs the driver command. Can be
terminated or killed upon user calling stop().
2021-11-09 22:34:12 -08:00
with open(logs_path, "w") as logs_file:
2021-11-04 11:59:47 -07:00
# NOTE(review): the arguments and closing parenthesis of this Popen call are
# missing. The `kill -9 -{child_pgid}` below assumes the child runs in its
# own process group (see "Create new pgid" below) — confirm it is started
# that way, otherwise this actor's own group would be killed.
child_process = subprocess.Popen(
2021-11-18 10:15:23 -06:00
2021-11-04 11:59:47 -07:00
2021-11-09 22:34:12 -08:00
2022-01-29 18:41:57 -08:00
2021-11-04 11:59:47 -07:00
parent_pid = os.getpid()
# Create new pgid with new subprocess to execute driver command
child_pid = child_process.pid
child_pgid = os.getpgid(child_pid)
# Open a new subprocess to kill the child process when the parent
# process dies kill -s 0 parent_pid will succeed if the parent is
# alive. If it fails, SIGKILL the child process group and exit
# NOTE(review): this command string is not inside any visible call. The
# Popen(...) that runs it (shell syntax implies a shell) and its
# output-suppression arguments are missing.
f"while kill -s 0 {parent_pid}; do sleep 1; done; kill -9 -{child_pgid}", # noqa: E501
# Suppress output
return child_process
2022-05-10 11:43:04 -05:00
def _get_driver_env_vars(self) -> Dict[str, str]:
"""Returns environment variables that should be set in the driver."""
# NOTE(review): .pop() takes one element from whatever collection
# find_bootstrap_address() returns and raises if that collection is empty.
ray_addr = ray._private.services.find_bootstrap_address().pop()
return {
# Set JobConfig for the child process (runtime_env, metadata).
"runtime_env": self._driver_runtime_env,
"metadata": self._metadata,
# NOTE(review): RAY_JOB_CONFIG_JSON_ENV_VAR and `json` are imported but
# unused in the visible file, and the return type promises str values.
# The "runtime_env"/"metadata" pair above is presumably meant to be
# json.dumps()-ed under that key; the wrapping lines are missing.
# Always set RAY_ADDRESS as find_bootstrap_address address for
# job submission. In case of local development, prevent user from
# re-using http://{address}:{dashboard_port} to interact with
# jobs SDK.
# TODO:(mwtian) Check why "auto" does not work in entrypoint script
# Set PYTHONUNBUFFERED=1 to stream logs during the job instead of
# only streaming them upon completion of the job.
# NOTE(review): the RAY_ADDRESS entry (ray_addr is otherwise unused), the
# PYTHONUNBUFFERED="1" entry and the dict's closing brace are missing.
2021-11-04 11:59:47 -07:00
# Poll child_process every SUBPROCESS_POLL_PERIOD_S seconds without blocking
# the event loop and return its exit code; on an unexpected exception,
# return 1.
# NOTE(review): if child_process is None the loop is skipped and the
# coroutine returns None despite the `-> int` annotation.
async def _polling(self, child_process) -> int:
# NOTE(review): the `try:` matching the `except Exception:` below is missing.
while child_process is not None:
return_code = child_process.poll()
if return_code is not None:
# subprocess finished with return code
return return_code
# still running, yield control, 0.1s by default
await asyncio.sleep(self.SUBPROCESS_POLL_PERIOD_S)
except Exception:
if child_process:
# TODO (jiaodong): Improve this with SIGTERM then SIGKILL
# NOTE(review): this `if` has no statement in its body here. Judging by
# the TODO, a call that kills child_process is missing.
return 1
# Run the job:
# - wait for the optional test start signal;
# - mark the job RUNNING and launch the entrypoint;
# - race the subprocess against self._stop_event;
# - record STOPPED, SUCCEEDED or FAILED (with the log tail on failure).
# NOTE(review): the `self,` parameter and the closing `):` of this signature
# are missing, and the docstring below has lost its quotes.
async def run(
2022-01-29 18:41:57 -08:00
# Signal actor used in testing to capture PENDING -> RUNNING cases
_start_signal_actor: Optional[ActorHandle] = None,
2021-11-04 11:59:47 -07:00
Stop and start both happen asynchrously, coordinated by asyncio event
and coroutine, respectively.
2021-10-22 12:18:11 -07:00
2021-11-04 11:59:47 -07:00
1) Sets job status as running
2) Pass runtime env and metadata to subprocess as serialized env
3) Handle concurrent events of driver execution and
2021-10-22 12:18:11 -07:00
2022-02-22 16:18:16 -06:00
curr_status = self._job_info_client.get_status(self._job_id)
2022-02-18 07:54:37 -08:00
# NOTE(review): `assert` is skipped under `python -O`; raise an exception
# if this single-call guard must hold in production.
assert curr_status == JobStatus.PENDING, "Run should only be called once."
2021-11-04 11:59:47 -07:00
if _start_signal_actor:
# Block in PENDING state until start signal received.
await _start_signal_actor.wait.remote()
2022-02-22 16:18:16 -06:00
self._job_info_client.put_status(self._job_id, JobStatus.RUNNING)
2021-10-22 12:18:11 -07:00
2022-05-10 11:43:04 -05:00
# NOTE(review): the `try:` matching the `except Exception:` near the end of
# run() is missing.
# Configure environment variables for the child process. These
# will *not* be set in the runtime_env, so they apply to the driver
# only, not its tasks & actors.
# NOTE(review): the statement that applies self._get_driver_env_vars() and
# the logger call that the string fragment below belongs to are missing.
2022-02-04 15:51:43 -08:00
"Submitting job with RAY_ADDRESS = "
2021-11-09 22:34:12 -08:00
log_path = self._log_client.get_log_file_path(self._job_id)
2021-11-18 10:15:23 -06:00
child_process = self._exec_entrypoint(log_path)
2021-11-04 11:59:47 -07:00
polling_task = create_task(self._polling(child_process))
# NOTE(review): self._stop_event.wait() is passed to asyncio.wait() as a bare
# coroutine. That is deprecated since Python 3.8 and rejected from 3.11, so
# wrap it with create_task(). The awaitable that does not finish is also
# never cancelled in the visible code.
finished, _ = await asyncio.wait(
2022-01-29 18:41:57 -08:00
[polling_task, self._stop_event.wait()], return_when=FIRST_COMPLETED
2021-11-04 11:59:47 -07:00
if self._stop_event.is_set():
# TODO (jiaodong): Improve this with SIGTERM then SIGKILL
# NOTE(review): no visible statement cancels polling_task or kills
# child_process here, although the TODO refers to killing it.
2022-02-22 16:18:16 -06:00
self._job_info_client.put_status(self._job_id, JobStatus.STOPPED)
2021-10-22 12:18:11 -07:00
2021-11-04 11:59:47 -07:00
# NOTE(review): an `else:` header is missing before this branch.
# Child process finished execution and no stop event is set
# at the same time
2022-01-29 18:41:57 -08:00
assert len(finished) == 1, "Should have only one coroutine done"
2021-11-04 11:59:47 -07:00
[child_process_task] = finished
return_code = child_process_task.result()
if return_code == 0:
2022-02-22 16:18:16 -06:00
self._job_info_client.put_status(self._job_id, JobStatus.SUCCEEDED)
2021-11-04 11:59:47 -07:00
2022-01-29 18:41:57 -08:00
# NOTE(review): an `else:` header (non-zero return code) is missing here.
# On failure the last log lines are attached to the FAILED status message.
log_tail = self._log_client.get_last_n_log_lines(self._job_id)
2021-11-18 10:15:23 -06:00
if log_tail is not None and log_tail != "":
2022-01-29 18:41:57 -08:00
message = (
"Job failed due to an application error, "
"last available logs:\n" + log_tail
2021-11-18 10:15:23 -06:00
# NOTE(review): the closing parenthesis above and an `else:` before the
# next line are missing.
message = None
2022-02-22 16:18:16 -06:00
2022-02-18 07:54:37 -08:00
# NOTE(review): the opening `self._job_info_client.put_status(` of this call
# and its closing parenthesis are missing.
self._job_id, JobStatus.FAILED, message=message
2022-01-29 18:41:57 -08:00
2021-11-04 11:59:47 -07:00
except Exception:
# NOTE(review): the logger call wrapping this message is missing.
"Got unexpected exception while trying to execute driver "
2022-01-29 18:41:57 -08:00
f"command. {traceback.format_exc()}"
2021-11-04 11:59:47 -07:00
# clean up actor after tasks are finished
# NOTE(review): the cleanup statement this comment describes (and the
# `finally:` it presumably belongs to) is missing.
2021-10-22 12:18:11 -07:00
# Request cancellation of a running job; run() reacts via self._stop_event.
def stop(self):
2022-01-29 18:41:57 -08:00
"""Set self._stop_event and let run() handle the rest in its asyncio.wait()."""
# NOTE(review): the statement that sets self._stop_event is missing; as
# visible, this method does nothing.
2021-11-04 11:59:47 -07:00
2021-10-22 12:18:11 -07:00
# Python API used to submit, stop, inspect and tail jobs. Each submitted job
# gets its own JobSupervisor actor; job state is read and written through
# JobInfoStorageClient and logs through JobLogStorageClient.
# NOTE(review): like the rest of this copy, this class is an unindented dump
# with blame timestamps interleaved and several source lines missing; the
# NOTE(review) comments below mark the visible gaps.
# NOTE(review): the class docstring below has lost its closing quotes.
# LOG_TAIL_SLEEP_S (read by tail_job_logs) and JOB_MONITOR_LOOP_PERIOD_S
# (read by _monitor_job) are not defined in the visible class body.
class JobManager:
2021-11-18 10:15:23 -06:00
"""Provide python APIs for job submission and management.
It does not provide persistence, all info will be lost if the cluster
goes down.
2021-10-22 12:18:11 -07:00
2022-01-29 18:41:57 -08:00
2021-10-22 12:18:11 -07:00
# Name under which each job's supervisor actor is looked up.
JOB_ACTOR_NAME = "_ray_internal_job_actor_{job_id}"
2021-12-14 17:01:53 -08:00
# Time that we will sleep while tailing logs if no new log line is
# available.
# NOTE(review): the LOG_TAIL_SLEEP_S assignment this comment describes is
# missing.
2022-02-07 15:25:25 -06:00
2021-10-22 12:18:11 -07:00
# Create the storage clients and the remote actor class for supervisors.
# NOTE(review): _recover_running_jobs() is not called anywhere in the
# visible code, and no self._running_jobs attribute (mentioned in its
# docstring) is created here.
def __init__(self):
2022-02-22 16:18:16 -06:00
self._job_info_client = JobInfoStorageClient()
2021-10-22 12:18:11 -07:00
self._log_client = JobLogStorageClient()
self._supervisor_actor_cls = ray.remote(JobSupervisor)
2021-10-23 10:48:16 -07:00
2022-02-07 15:25:25 -06:00
# NOTE(review): the docstring's closing quotes and the body of the `if`
# below are missing. Per the docstring, that body presumably starts
# _monitor_job for each non-terminal job.
def _recover_running_jobs(self):
"""Recovers all running jobs from the status client.
For each job, we will spawn a coroutine to monitor it.
Each will be added to self._running_jobs and reconciled.
2022-02-22 16:18:16 -06:00
all_jobs = self._job_info_client.get_all_jobs()
for job_id, job_info in all_jobs.items():
if not job_info.status.is_terminal():
2022-02-07 15:25:25 -06:00
2021-10-22 12:18:11 -07:00
# Look up a job's supervisor actor by name; None if no such actor exists.
def _get_actor_for_job(self, job_id: str) -> Optional[ActorHandle]:
# NOTE(review): the `try:` for the `except ValueError` below is missing.
return ray.get_actor(self.JOB_ACTOR_NAME.format(job_id=job_id))
except ValueError: # Ray returns ValueError for nonexistent actor.
return None
2022-02-07 15:25:25 -06:00
# Ping the supervisor until the ping fails, then record the failure unless
# the job already reached a terminal state, and kill the actor defensively.
# NOTE(review): the signature's closing `):` and the docstring's closing
# quotes are missing.
async def _monitor_job(
self, job_id: str, job_supervisor: Optional[ActorHandle] = None
"""Monitors the specified job until it enters a terminal state.
This is necessary because we need to handle the case where the
JobSupervisor dies unexpectedly.
is_alive = True
if job_supervisor is None:
job_supervisor = self._get_actor_for_job(job_id)
if job_supervisor is None:
logger.error(f"Failed to get job supervisor for job {job_id}.")
# NOTE(review): the opening of the status-update call that the message=
# argument below belongs to (and its closing parenthesis) is missing.
2022-02-22 16:18:16 -06:00
2022-02-07 15:25:25 -06:00
2022-02-18 07:54:37 -08:00
message="Unexpected error occurred: Failed to get job supervisor.",
2022-02-07 15:25:25 -06:00
is_alive = False
while is_alive:
# NOTE(review): the `try:` for the `except Exception as e:` below is missing.
await job_supervisor.ping.remote()
await asyncio.sleep(self.JOB_MONITOR_LOOP_PERIOD_S)
except Exception as e:
is_alive = False
2022-02-22 16:18:16 -06:00
# NOTE(review): get_status() can return None (submit_job treats None as
# "job does not exist"), in which case .is_terminal() raises
# AttributeError inside this handler.
if self._job_info_client.get_status(job_id).is_terminal():
2022-02-07 15:25:25 -06:00
# If the job is already in a terminal state, then the actor
# exiting is expected.
# NOTE(review): this branch has no statement here (a `pass` is missing).
elif isinstance(e, RuntimeEnvSetupError):
logger.info(f"Failed to set up runtime_env for job {job_id}.")
# NOTE(review): the opening of the status-update call for this message is
# missing.
2022-02-22 16:18:16 -06:00
2022-02-07 15:25:25 -06:00
2022-02-18 07:54:37 -08:00
message=f"runtime_env setup failed: {e}",
2022-02-07 15:25:25 -06:00
# NOTE(review): the `else:` header, the logger call wrapping the next string
# and the opening of the following status-update call are missing.
f"Job supervisor for job {job_id} failed unexpectedly: {e}."
2022-02-22 16:18:16 -06:00
2022-02-07 15:25:25 -06:00
2022-02-18 07:54:37 -08:00
message=f"Unexpected error occurred: {e}",
2022-02-07 15:25:25 -06:00
# Kill the actor defensively to avoid leaking actors in unexpected error cases.
if job_supervisor is not None:
ray.kill(job_supervisor, no_restart=True)
2021-10-29 16:09:18 -07:00
# Return this node's "node:..." resource key. submit_job uses it to place
# supervisor actors on the node running JobManager. Raises ValueError if no
# such key is found.
# NOTE(review): the docstring's closing quotes are missing.
def _get_current_node_resource_key(self) -> str:
"""Get the Ray resource key for current node.
It can be used for actor placement.
current_node_id = ray.get_runtime_context().node_id.hex()
for node in ray.nodes():
if node["NodeID"] == current_node_id:
# Found the node.
for key in node["Resources"].keys():
if key.startswith("node:"):
return key
2022-01-29 18:41:57 -08:00
raise ValueError("Cannot find the node dictionary for current node.")
2021-11-18 10:15:23 -06:00
2022-01-29 18:41:57 -08:00
# NOTE(review): nearly all of this method is missing (docstring close and
# both branches). Only `if result is None:` survives, and the method is not
# called anywhere in the visible code.
def _handle_supervisor_startup(self, job_id: str, result: Optional[Exception]):
2021-11-18 10:15:23 -06:00
"""Handle the result of starting a job supervisor actor.
If started successfully, result should be None. Otherwise it should be
an Exception.
On failure, the job will be marked failed with a relevant error
if result is None:
2021-10-29 16:09:18 -07:00
2022-05-10 11:43:04 -05:00
# Deep-copy the user's runtime_env (or start from {}) and set
# NOSET_CUDA_VISIBLE_DEVICES_ENV_VAR="1" in its env_vars for the supervisor.
# NOTE(review): this method is not called in the visible submit_job.
def _get_supervisor_runtime_env(
self, user_runtime_env: Optional[Dict[str, Any]]
) -> Dict[str, Any]:
"""Configure and return the runtime_env for the supervisor actor."""
# Make a copy to avoid mutating passed runtime_env.
runtime_env = (
copy.deepcopy(user_runtime_env) if user_runtime_env is not None else {}
# NOTE(review): the closing parenthesis of the expression above is missing.
# Don't set CUDA_VISIBLE_DEVICES for the supervisor actor so the
# driver can use GPUs if it wants to. This will be removed from
# the driver's runtime_env so it isn't inherited by tasks & actors.
env_vars = runtime_env.get("env_vars", {})
env_vars[ray_constants.NOSET_CUDA_VISIBLE_DEVICES_ENV_VAR] = "1"
runtime_env["env_vars"] = env_vars
return runtime_env
2022-01-29 18:41:57 -08:00
# Register a new job, start its JobSupervisor actor on this node and start
# monitoring it in the background; returns the job id. Raises RuntimeError
# if an explicitly given job_id already exists.
# NOTE(review): the `self,` parameter is missing from this signature, and
# the docstring below has lost its quotes and "Args:"/"Returns:" headers.
def submit_job(
entrypoint: str,
job_id: Optional[str] = None,
runtime_env: Optional[Dict[str, Any]] = None,
metadata: Optional[Dict[str, str]] = None,
_start_signal_actor: Optional[ActorHandle] = None,
) -> str:
2021-10-22 12:18:11 -07:00
2021-11-04 11:59:47 -07:00
Job execution happens asynchronously.
1) Generate a new unique id for this job submission, each call of this
method assumes they're independent submission with its own new
2021-11-17 21:48:22 -06:00
ID, job supervisor actor, and child process.
2021-11-04 11:59:47 -07:00
2) Create new detached actor with same runtime_env as job spec
Actual setting up runtime_env, subprocess group, driver command
execution, subprocess cleaning up and running status update to GCS
is all handled by job supervisor actor.
entrypoint: Driver command to execute in subprocess shell.
Represents the entrypoint to start user application.
runtime_env: Runtime environment used to execute driver command,
which could contain its own ray.init() to configure runtime
2021-11-18 10:15:23 -06:00
env at ray cluster, task and actor level.
2021-11-04 11:59:47 -07:00
metadata: Support passing arbitrary data to driver command in
case needed.
_start_signal_actor: Used in testing only to capture state
transitions between PENDING -> RUNNING. Regular user shouldn't
need this.
job_id: Generated uuid for further job management. Only valid
within the same ray cluster.
2021-10-22 12:18:11 -07:00
2021-11-08 23:10:27 -08:00
if job_id is None:
2021-11-17 21:48:22 -06:00
job_id = generate_job_id()
2022-02-22 16:18:16 -06:00
elif self._job_info_client.get_status(job_id) is not None:
2021-11-08 23:10:27 -08:00
raise RuntimeError(f"Job {job_id} already exists.")
2021-11-18 10:15:23 -06:00
logger.info(f"Starting job with job_id: {job_id}")
2022-02-22 16:18:16 -06:00
# NOTE(review): most JobInfo(...) arguments and its closing parenthesis are
# missing. The lines from "[Jobs] Change jobs start_time ..." through "...
# timestamps to ms." are stray commit-message text pasted into this call
# and should be deleted.
job_info = JobInfo(
2022-03-16 20:02:22 -07:00
2022-02-18 07:54:37 -08:00
[Jobs] Change jobs start_time end_time from seconds to ms for consistency (#24123)
In the snapshot, all timestamps are given in ms except for Jobs:
wget -q -O -
"message":"Job finished successfully.",
"message":"Job finished successfully.",
"entrypoint":"echo hi"
"message":"Job finished successfully.",
"entrypoint":"echo hi"
This PR fixes the inconsistency by changing Jobs start/end timestamps to ms.
2022-04-26 08:37:41 -07:00
# Start time is recorded in milliseconds since the epoch.
start_time=int(time.time() * 1000),
2022-02-18 07:54:37 -08:00
2022-02-22 16:18:16 -06:00
self._job_info_client.put_info(job_id, job_info)
2021-10-22 12:18:11 -07:00
2021-11-18 10:15:23 -06:00
# Wait for the actor to start up asynchronously so this call always
# returns immediately and we can catch errors with the actor starting
2022-02-07 15:25:25 -06:00
# up.
2021-10-22 12:18:11 -07:00
2022-02-07 15:25:25 -06:00
# NOTE(review): the `try:` for the `except Exception as e:` below is missing.
# NOTE(review): several .options(...) arguments are missing here, including
# the actor name (JOB_ACTOR_NAME, which _get_actor_for_job relies on) and
# the opening of the resources mapping that the key below belongs to.
supervisor = self._supervisor_actor_cls.options(
2021-11-04 11:59:47 -07:00
# Currently we assume JobManager is created by dashboard server
# running on headnode, same for job supervisor actors scheduled
self._get_current_node_resource_key(): 0.001,
2022-05-10 11:43:04 -05:00
2022-01-29 18:41:57 -08:00
).remote(job_id, entrypoint, metadata or {})
# NOTE(review): nothing visible calls supervisor.run.remote(...), so the job
# would never start and _start_signal_actor is never used; that line
# appears to be missing.
2022-02-07 15:25:25 -06:00
2021-10-22 12:18:11 -07:00
2022-02-07 15:25:25 -06:00
# Monitor the job in the background so we can detect errors without
# requiring a client to poll.
create_task(self._monitor_job(job_id, job_supervisor=supervisor))
2021-11-18 10:15:23 -06:00
except Exception as e:
# NOTE(review): the opening of the status-update call for this message is
# missing.
2022-02-22 16:18:16 -06:00
2022-02-07 15:25:25 -06:00
2022-02-18 07:54:37 -08:00
message=f"Failed to start job supervisor: {e}.",
2022-02-07 15:25:25 -06:00
2021-10-22 12:18:11 -07:00
return job_id
# Ask the job's supervisor actor (if it still exists) to stop; returns True
# if the actor was found, False otherwise.
# NOTE(review): the docstring's closing quotes and the fire-and-forget stop
# call on job_supervisor_actor (described by the comment below) are missing.
def stop_job(self, job_id) -> bool:
2022-02-07 15:25:25 -06:00
"""Request a job to exit, fire and forget.
2021-11-04 11:59:47 -07:00
2022-02-07 15:25:25 -06:00
Returns whether or not the job was running.
2021-11-04 11:59:47 -07:00
2021-10-23 10:48:16 -07:00
job_supervisor_actor = self._get_actor_for_job(job_id)
if job_supervisor_actor is not None:
2021-11-04 11:59:47 -07:00
# Actor is still alive, signal it to stop the driver, fire and
# forget
2021-10-23 10:48:16 -07:00
2021-11-04 11:59:47 -07:00
return True
return False
2021-12-14 17:01:53 -08:00
# Returns None when no job with this id has been recorded.
def get_job_status(self, job_id: str) -> Optional[JobStatus]:
2022-02-07 15:25:25 -06:00
"""Get latest status of a job."""
2022-02-22 16:18:16 -06:00
return self._job_info_client.get_status(job_id)
2022-02-18 07:54:37 -08:00
2022-02-22 16:18:16 -06:00
def get_job_info(self, job_id: str) -> Optional[JobInfo]:
"""Get latest info of a job."""
return self._job_info_client.get_info(job_id)
2021-10-22 12:18:11 -07:00
2022-03-01 19:27:09 -08:00
def list_jobs(self) -> Dict[str, JobInfo]:
"""Get info for all jobs."""
return self._job_info_client.get_all_jobs()
2021-12-14 17:01:53 -08:00
def get_job_logs(self, job_id: str) -> str:
2022-02-07 15:25:25 -06:00
"""Get all logs produced by a job."""
2021-11-09 22:34:12 -08:00
return self._log_client.get_logs(job_id)
2021-12-14 17:01:53 -08:00
# Async generator over a job's log lines; raises RuntimeError for an unknown
# job.
# NOTE(review): because this is an async generator, the return annotation
# should be AsyncIterator[str]. As visible, the loop has no exit and the
# None placeholders from the tail iterator would be yielded to the caller.
# A `break` after the terminal-status check and an else/continue around the
# sleep appear to be missing.
async def tail_job_logs(self, job_id: str) -> Iterator[str]:
2022-02-07 15:25:25 -06:00
"""Return an iterator following the logs of a job."""
2021-12-14 17:01:53 -08:00
if self.get_job_status(job_id) is None:
raise RuntimeError(f"Job '{job_id}' does not exist.")
for line in self._log_client.tail_logs(job_id):
if line is None:
# Return if the job has exited and there are no new log lines.
status = self.get_job_status(job_id)
2022-02-18 07:54:37 -08:00
if status not in {JobStatus.PENDING, JobStatus.RUNNING}:
2021-12-14 17:01:53 -08:00
await asyncio.sleep(self.LOG_TAIL_SLEEP_S)
yield line