diff --git a/dashboard/client/src/pages/job/hook/useJobList.ts b/dashboard/client/src/pages/job/hook/useJobList.ts index bc3bfa08a..04f97532f 100644 --- a/dashboard/client/src/pages/job/hook/useJobList.ts +++ b/dashboard/client/src/pages/job/hook/useJobList.ts @@ -1,23 +1,24 @@ -import { useCallback, useContext, useEffect, useRef, useState } from "react"; -import { GlobalContext } from "../../../App"; +import { useCallback, useEffect, useRef, useState } from "react"; import { getJobList } from "../../../service/job"; -import { UnifiedJob } from "../../../type/job"; +import { Job } from "../../../type/job"; export const useJobList = () => { - const [jobList, setList] = useState([]); + const [jobList, setList] = useState([]); const [page, setPage] = useState({ pageSize: 10, pageNo: 1 }); const [msg, setMsg] = useState("Loading the job list..."); const [isRefreshing, setRefresh] = useState(true); - const { ipLogMap } = useContext(GlobalContext); const [filter, setFilter] = useState< { - key: "job_id" | "status"; + key: "jobId" | "name" | "language" | "state" | "namespaceId"; val: string; }[] >([]); const refreshRef = useRef(isRefreshing); const tot = useRef(); - const changeFilter = (key: "job_id" | "status", val: string) => { + const changeFilter = ( + key: "jobId" | "name" | "language" | "state" | "namespaceId", + val: string, + ) => { const f = filter.find((e) => e.key === key); if (f) { f.val = val; @@ -36,11 +37,9 @@ export const useJobList = () => { } const rsp = await getJobList(); - if (rsp) { - setList( - rsp.data.sort((a, b) => (b.start_time ?? 0) - (a.start_time ?? 0)), - ); - setMsg("Fetched jobs"); + if (rsp?.data?.data?.summary) { + setList(rsp.data.data.summary.sort((a, b) => b.timestamp - a.timestamp)); + setMsg(rsp.data.msg || ""); } tot.current = setTimeout(getJob, 4000); @@ -56,7 +55,7 @@ export const useJobList = () => { }, [getJob]); return { jobList: jobList.filter((node) => - filter.every((f) => node[f.key] && (node[f.key] ?? "").includes(f.val)), + filter.every((f) => node[f.key] && node[f.key].includes(f.val)), ), msg, isRefreshing, @@ -65,6 +64,5 @@ export const useJobList = () => { page, originalJobs: jobList, setPage: (key: string, val: number) => setPage({ ...page, [key]: val }), - ipLogMap, }; }; diff --git a/dashboard/client/src/pages/job/index.tsx b/dashboard/client/src/pages/job/index.tsx index 5a0b2fd13..81be74b03 100644 --- a/dashboard/client/src/pages/job/index.tsx +++ b/dashboard/client/src/pages/job/index.tsx @@ -13,7 +13,7 @@ import dayjs from "dayjs"; import React from "react"; import { Link } from "react-router-dom"; import Loading from "../../components/Loading"; -import { SearchInput } from "../../components/SearchComponent"; +import { SearchInput, SearchSelect } from "../../components/SearchComponent"; import TitleCard from "../../components/TitleCard"; import { useJobList } from "./hook/useJobList"; @@ -25,13 +25,12 @@ const useStyles = makeStyles((theme) => ({ })); const columns = [ - "Job ID", - "Submission ID", - "Status", - "Logs", + "ID", + "DriverIpAddress", + "DriverPid", + "IsDead", "StartTime", "EndTime", - "Driver Pid", ]; const JobList = () => { @@ -44,7 +43,6 @@ const JobList = () => { changeFilter, page, setPage, - ipLogMap, } = useJobList(); return ( @@ -64,8 +62,13 @@ const JobList = () => { changeFilter("job_id", value)} + label="ID" + onChange={(value) => changeFilter("jobId", value)} + /> + changeFilter("language", value)} + options={["JAVA", "PYTHON"]} /> { page.pageNo * page.pageSize, ) .map( - ( - { - job_id, - submission_id, - driver_info, - type, - status, - start_time, - end_time, - }, - index, - ) => ( - - {job_id ?? "-"} + ({ + jobId = "", + driverIpAddress, + isDead, + driverPid, + startTime, + endTime, + }) => ( + - {submission_id ?? "-"} + {jobId} - {status} + {driverIpAddress} + {driverPid} - {/* TODO(aguo): Also show logs for the job id instead - of just the submission's logs */} - {driver_info && - ipLogMap[driver_info.node_ip_address] ? ( - - Log - - ) : ( - "-" - )} + {isDead ? "true" : "false"} - {dayjs(Number(start_time)).format( - "YYYY/MM/DD HH:mm:ss", - )} + {dayjs(Number(startTime)).format("YYYY/MM/DD HH:mm:ss")} - {end_time && end_time > 0 - ? dayjs(Number(end_time)).format( - "YYYY/MM/DD HH:mm:ss", - ) + {endTime > 0 + ? dayjs(Number(endTime)).format("YYYY/MM/DD HH:mm:ss") : "-"} - - {driver_info?.pid ?? "-"} - ), )} diff --git a/dashboard/client/src/service/job.ts b/dashboard/client/src/service/job.ts index ee0f699b5..f81d73414 100644 --- a/dashboard/client/src/service/job.ts +++ b/dashboard/client/src/service/job.ts @@ -2,7 +2,7 @@ import { JobDetailRsp, JobListRsp } from "../type/job"; import { get } from "./requestHandlers"; export const getJobList = () => { - return get("api/jobs/"); + return get("jobs?view=summary"); }; export const getJobDetail = (id: string) => { diff --git a/dashboard/client/src/type/job.d.ts b/dashboard/client/src/type/job.d.ts index b81a9510b..ef9181dd2 100644 --- a/dashboard/client/src/type/job.d.ts +++ b/dashboard/client/src/type/job.d.ts @@ -63,26 +63,10 @@ export type JobDetailRsp = { result: boolean; }; -export type JobListRsp = UnifiedJob[]; - -export type UnifiedJob = { - job_id: string | null; - submission_id: string | null; - type: string; - status: string; - entrypoint: string; - message: string | null; - error_type: string | null; - start_time: number | null; - end_time: number | null; - metadata: { [key: string]: string } | null; - runtime_env: { [key: string]: string } | null; - driver_info: DriverInfo | null; -}; - -export type DriverInfo = { - id: string; - node_ip_address: string; - node_id: string; - pid: string; +export type JobListRsp = { + data: { + summary: Job[]; + }; + msg: string; + result: boolean; }; diff --git a/dashboard/modules/job/cli.py b/dashboard/modules/job/cli.py index 7cf56a0f0..9f2253b2a 100644 --- a/dashboard/modules/job/cli.py +++ b/dashboard/modules/job/cli.py @@ -87,17 +87,7 @@ def job_cli_group(): type=str, default=None, required=False, - help=("DEPRECATED: Use -- submission-id instead."), -) -@click.option( - "--submission-id", - type=str, - default=None, - required=False, - help=( - "Submission ID to specify for the job. " - "If not provided, one will be generated." - ), + help=("Job ID to specify for the job. " "If not provided, one will be generated."), ) @click.option( "--runtime-env", @@ -137,7 +127,6 @@ def job_cli_group(): def submit( address: Optional[str], job_id: Optional[str], - submission_id: Optional[str], runtime_env: Optional[str], runtime_env_json: Optional[str], working_dir: Optional[str], @@ -150,19 +139,11 @@ def submit( ray job submit -- python my_script.py --arg=val """ - if job_id: - cli_logger.warning( - "--job-id option is deprecated. " "Please use --submission-id instead." - ) - - submission_id = submission_id or job_id - if ray_constants.RAY_JOB_SUBMIT_HOOK in os.environ: # Submit all args as **kwargs per the JOB_SUBMIT_HOOK contract. _load_class(os.environ[ray_constants.RAY_JOB_SUBMIT_HOOK])( address=address, - job_id=submission_id, - submission_id=submission_id, + job_id=job_id, runtime_env=runtime_env, runtime_env_json=runtime_env_json, working_dir=working_dir, @@ -180,7 +161,7 @@ def submit( job_id = client.submit_job( entrypoint=list2cmdline(entrypoint), - submission_id=submission_id, + job_id=job_id, runtime_env=final_runtime_env, ) diff --git a/dashboard/modules/job/common.py b/dashboard/modules/job/common.py index a60ca3c85..eab985912 100644 --- a/dashboard/modules/job/common.py +++ b/dashboard/modules/job/common.py @@ -49,7 +49,6 @@ class JobStatus(str, Enum): return self.value in {"STOPPED", "SUCCEEDED", "FAILED"} -# TODO(aguo): Convert to pydantic model @dataclass class JobInfo: """A class for recording information associated with a job and its execution.""" @@ -181,10 +180,10 @@ def validate_request_type(json_data: Dict[str, Any], request_type: dataclass) -> class JobSubmitRequest: # Command to start execution, ex: "python script.py" entrypoint: str - # Optional submission_id to specify for the job. If the submission_id - # is not specified, one will be generated. If a job with the same - # submission_id already exists, it will be rejected. - submission_id: Optional[str] = None + # Optional job_id to specify for the job. If the job_id is not specified, + # one will be generated. If a job with the same job_id already exists, it + # will be rejected. + job_id: Optional[str] = None # Dict to setup execution environment. runtime_env: Optional[Dict[str, Any]] = None # Metadata to pass in to the JobConfig. @@ -194,10 +193,9 @@ class JobSubmitRequest: if not isinstance(self.entrypoint, str): raise TypeError(f"entrypoint must be a string, got {type(self.entrypoint)}") - if self.submission_id is not None and not isinstance(self.submission_id, str): + if self.job_id is not None and not isinstance(self.job_id, str): raise TypeError( - "submission_id must be a string if provided, " - f"got {type(self.submission_id)}" + f"job_id must be a string if provided, got {type(self.job_id)}" ) if self.runtime_env is not None: @@ -228,9 +226,7 @@ class JobSubmitRequest: @dataclass class JobSubmitResponse: - # DEPRECATED: Use submission_id instead. job_id: str - submission_id: str @dataclass diff --git a/dashboard/modules/job/job_head.py b/dashboard/modules/job/job_head.py index 2682b3fc2..c46f4dd79 100644 --- a/dashboard/modules/job/job_head.py +++ b/dashboard/modules/job/job_head.py @@ -1,46 +1,34 @@ -import asyncio -import concurrent -import dataclasses -import json -import logging -import traceback -from dataclasses import dataclass -from typing import Any, Dict, Optional, Tuple - import aiohttp.web from aiohttp.web import Request, Response +import dataclasses +import logging +from typing import Any +import json +import traceback +from dataclasses import dataclass import ray -from ray._private import ray_constants -import ray.dashboard.optional_utils as optional_utils import ray.dashboard.utils as dashboard_utils +import ray.dashboard.optional_utils as optional_utils from ray._private.runtime_env.packaging import ( package_exists, - pin_runtime_env_uri, upload_package_to_gcs, + pin_runtime_env_uri, ) -from ray.core.generated import gcs_service_pb2, gcs_service_pb2_grpc from ray.dashboard.modules.job.common import ( http_uri_components_to_uri, - JobStatus, + JobInfo, JobSubmitRequest, JobSubmitResponse, JobStopResponse, JobLogsResponse, validate_request_type, - JOB_ID_METADATA_KEY, -) -from ray.dashboard.modules.job.pydantic_models import ( - DriverInfo, - JobDetails, - JobType, ) from ray.dashboard.modules.version import ( CURRENT_VERSION, VersionResponse, ) from ray.dashboard.modules.job.job_manager import JobManager -from ray.runtime_env import RuntimeEnv logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) @@ -51,10 +39,7 @@ routes = optional_utils.ClassMethodRouteTable class JobHead(dashboard_utils.DashboardHeadModule): def __init__(self, dashboard_head): super().__init__(dashboard_head) - self._dashboard_head = dashboard_head self._job_manager = None - self._gcs_job_info_stub = None - self._executor = concurrent.futures.ThreadPoolExecutor(max_workers=10) async def _parse_and_validate_request( self, req: Request, request_type: dataclass @@ -71,45 +56,9 @@ class JobHead(dashboard_utils.DashboardHeadModule): status=aiohttp.web.HTTPBadRequest.status_code, ) - async def find_job_by_ids(self, job_or_submission_id: str) -> Optional[JobDetails]: - """ - Attempts to find the job with a given submission_id or job id. - """ - # First try to find by job_id - driver_jobs, submission_job_drivers = await self._get_driver_jobs() - job = driver_jobs.get(job_or_submission_id) - if job: - return job - # Try to find a driver with the given id - submission_id = next( - ( - id - for id, driver in submission_job_drivers.items() - if driver.id == job_or_submission_id - ), - None, - ) - - if not submission_id: - # If we didn't find a driver with the given id, - # then lets try to search for a submission with given id - submission_id = job_or_submission_id - - job_info = await asyncio.get_event_loop().run_in_executor( - self._executor, lambda: self._job_manager.get_job_info(submission_id) - ) - if job_info: - driver = submission_job_drivers.get(submission_id) - job = JobDetails( - **dataclasses.asdict(job_info), - submission_id=submission_id, - job_id=driver.id if driver else None, - driver_info=driver, - type=JobType.SUBMISSION, - ) - return job - - return None + def job_exists(self, job_id: str) -> bool: + status = self._job_manager.get_job_status(job_id) + return status is not None @routes.get("/api/version") async def get_version(self, req: Request) -> Response: @@ -180,14 +129,14 @@ class JobHead(dashboard_utils.DashboardHeadModule): submit_request = result try: - submission_id = self._job_manager.submit_job( + job_id = self._job_manager.submit_job( entrypoint=submit_request.entrypoint, - submission_id=submit_request.submission_id, + job_id=submit_request.job_id, runtime_env=submit_request.runtime_env, metadata=submit_request.metadata, ) - resp = JobSubmitResponse(job_id=submission_id, submission_id=submission_id) + resp = JobSubmitResponse(job_id=job_id) except (TypeError, ValueError): return Response( text=traceback.format_exc(), @@ -205,24 +154,18 @@ class JobHead(dashboard_utils.DashboardHeadModule): status=aiohttp.web.HTTPOk.status_code, ) - @routes.post("/api/jobs/{job_or_submission_id}/stop") + @routes.post("/api/jobs/{job_id}/stop") @optional_utils.init_ray_and_catch_exceptions() async def stop_job(self, req: Request) -> Response: - job_or_submission_id = req.match_info["job_or_submission_id"] - job = await self.find_job_by_ids(job_or_submission_id) - if not job: + job_id = req.match_info["job_id"] + if not self.job_exists(job_id): return Response( - text=f"Job {job_or_submission_id} does not exist", + text=f"Job {job_id} does not exist", status=aiohttp.web.HTTPNotFound.status_code, ) - if job.type is not JobType.SUBMISSION: - return Response( - text="Can only stop submission type jobs", - status=aiohttp.web.HTTPBadRequest.status_code, - ) try: - stopped = self._job_manager.stop_job(job.submission_id) + stopped = self._job_manager.stop_job(job_id) resp = JobStopResponse(stopped=stopped) except Exception: return Response( @@ -234,162 +177,70 @@ class JobHead(dashboard_utils.DashboardHeadModule): text=json.dumps(dataclasses.asdict(resp)), content_type="application/json" ) - @routes.get("/api/jobs/{job_or_submission_id}") + @routes.get("/api/jobs/{job_id}") @optional_utils.init_ray_and_catch_exceptions() async def get_job_info(self, req: Request) -> Response: - job_or_submission_id = req.match_info["job_or_submission_id"] - job = await self.find_job_by_ids(job_or_submission_id) - if not job: + job_id = req.match_info["job_id"] + if not self.job_exists(job_id): return Response( - text=f"Job {job_or_submission_id} does not exist", + text=f"Job {job_id} does not exist", status=aiohttp.web.HTTPNotFound.status_code, ) + data: JobInfo = self._job_manager.get_job_info(job_id) return Response( - text=json.dumps(job.dict()), - content_type="application/json", + text=json.dumps(dataclasses.asdict(data)), content_type="application/json" ) @routes.get("/api/jobs/") @optional_utils.init_ray_and_catch_exceptions() async def list_jobs(self, req: Request) -> Response: - driver_jobs, submission_job_drivers = await self._get_driver_jobs() - - # TODO(aguo): convert _job_manager.list_jobs to an async function. - submission_jobs = await asyncio.get_event_loop().run_in_executor( - self._executor, self._job_manager.list_jobs - ) - submission_jobs = [ - JobDetails( - **dataclasses.asdict(job), - submission_id=submission_id, - job_id=submission_job_drivers.get(submission_id).id - if submission_id in submission_job_drivers - else None, - driver_info=submission_job_drivers.get(submission_id), - type=JobType.SUBMISSION, - ) - for submission_id, job in submission_jobs.items() - ] + data: dict[str, JobInfo] = self._job_manager.list_jobs() return Response( text=json.dumps( - [ - *[submission_job.dict() for submission_job in submission_jobs], - *[job_info.dict() for job_info in driver_jobs.values()], - ] + { + job_id: dataclasses.asdict(job_info) + for job_id, job_info in data.items() + } ), content_type="application/json", ) - async def _get_driver_jobs( - self, - ) -> Tuple[Dict[str, JobDetails], Dict[str, DriverInfo]]: - """Returns a tuple of dictionaries related to drivers. - - The first dictionary contains all driver jobs and is keyed by the job's id. - The second dictionary contains drivers that belong to submission jobs. - It's keyed by the submission job's submission id. - Only the last driver of a submission job is returned. - """ - request = gcs_service_pb2.GetAllJobInfoRequest() - reply = await self._gcs_job_info_stub.GetAllJobInfo(request, timeout=5) - - jobs = {} - submission_job_drivers = {} - for job_table_entry in reply.job_info_list: - if job_table_entry.config.ray_namespace.startswith( - ray_constants.RAY_INTERNAL_NAMESPACE_PREFIX - ): - # Skip jobs in any _ray_internal_ namespace - continue - job_id = job_table_entry.job_id.hex() - metadata = dict(job_table_entry.config.metadata) - job_submission_id = metadata.get(JOB_ID_METADATA_KEY) - if not job_submission_id: - driver = DriverInfo( - id=job_id, - node_ip_address=job_table_entry.driver_ip_address, - pid=job_table_entry.driver_pid, - ) - job = JobDetails( - job_id=job_id, - type=JobType.DRIVER, - status=JobStatus.SUCCEEDED - if job_table_entry.is_dead - else JobStatus.RUNNING, - entrypoint="", - start_time=job_table_entry.start_time, - end_time=job_table_entry.end_time, - metadata=metadata, - runtime_env=RuntimeEnv.deserialize( - job_table_entry.config.runtime_env_info.serialized_runtime_env - ).to_dict(), - driver_info=driver, - ) - jobs[job_id] = job - else: - driver = DriverInfo( - id=job_id, - node_ip_address=job_table_entry.driver_ip_address, - pid=job_table_entry.driver_pid, - ) - submission_job_drivers[job_submission_id] = driver - - return jobs, submission_job_drivers - - @routes.get("/api/jobs/{job_or_submission_id}/logs") + @routes.get("/api/jobs/{job_id}/logs") @optional_utils.init_ray_and_catch_exceptions() async def get_job_logs(self, req: Request) -> Response: - job_or_submission_id = req.match_info["job_or_submission_id"] - job = await self.find_job_by_ids(job_or_submission_id) - if not job: + job_id = req.match_info["job_id"] + if not self.job_exists(job_id): return Response( - text=f"Job {job_or_submission_id} does not exist", + text=f"Job {job_id} does not exist", status=aiohttp.web.HTTPNotFound.status_code, ) - if job.type is not JobType.SUBMISSION: - return Response( - text="Can only get logs of submission type jobs", - status=aiohttp.web.HTTPBadRequest.status_code, - ) - - resp = JobLogsResponse(logs=self._job_manager.get_job_logs(job.submission_id)) + resp = JobLogsResponse(logs=self._job_manager.get_job_logs(job_id)) return Response( text=json.dumps(dataclasses.asdict(resp)), content_type="application/json" ) - @routes.get("/api/jobs/{job_or_submission_id}/logs/tail") + @routes.get("/api/jobs/{job_id}/logs/tail") @optional_utils.init_ray_and_catch_exceptions() async def tail_job_logs(self, req: Request) -> Response: - job_or_submission_id = req.match_info["job_or_submission_id"] - job = await self.find_job_by_ids(job_or_submission_id) - if not job: + job_id = req.match_info["job_id"] + if not self.job_exists(job_id): return Response( - text=f"Job {job_or_submission_id} does not exist", + text=f"Job {job_id} does not exist", status=aiohttp.web.HTTPNotFound.status_code, ) - if job.type is not JobType.SUBMISSION: - return Response( - text="Can only get logs of submission type jobs", - status=aiohttp.web.HTTPBadRequest.status_code, - ) - ws = aiohttp.web.WebSocketResponse() await ws.prepare(req) - async for lines in self._job_manager.tail_job_logs(job.submission_id): + async for lines in self._job_manager.tail_job_logs(job_id): await ws.send_str(lines) async def run(self, server): if not self._job_manager: self._job_manager = JobManager() - self._gcs_job_info_stub = gcs_service_pb2_grpc.JobInfoGcsServiceStub( - self._dashboard_head.aiogrpc_gcs_channel - ) - @staticmethod def is_minimal_module(): return False diff --git a/dashboard/modules/job/job_manager.py b/dashboard/modules/job/job_manager.py index 561cbe940..72856f03c 100644 --- a/dashboard/modules/job/job_manager.py +++ b/dashboard/modules/job/job_manager.py @@ -438,7 +438,7 @@ class JobManager: self, *, entrypoint: str, - submission_id: Optional[str] = None, + job_id: Optional[str] = None, runtime_env: Optional[Dict[str, Any]] = None, metadata: Optional[Dict[str, str]] = None, _start_signal_actor: Optional[ActorHandle] = None, @@ -471,12 +471,12 @@ class JobManager: job_id: Generated uuid for further job management. Only valid within the same ray cluster. """ - if submission_id is None: - submission_id = generate_job_id() - elif self._job_info_client.get_status(submission_id) is not None: - raise RuntimeError(f"Job {submission_id} already exists.") + if job_id is None: + job_id = generate_job_id() + elif self._job_info_client.get_status(job_id) is not None: + raise RuntimeError(f"Job {job_id} already exists.") - logger.info(f"Starting job with submission_id: {submission_id}") + logger.info(f"Starting job with job_id: {job_id}") job_info = JobInfo( entrypoint=entrypoint, status=JobStatus.PENDING, @@ -484,7 +484,7 @@ class JobManager: metadata=metadata, runtime_env=runtime_env, ) - self._job_info_client.put_info(submission_id, job_info) + self._job_info_client.put_info(job_id, job_info) # Wait for the actor to start up asynchronously so this call always # returns immediately and we can catch errors with the actor starting @@ -492,7 +492,7 @@ class JobManager: try: supervisor = self._supervisor_actor_cls.options( lifetime="detached", - name=self.JOB_ACTOR_NAME_TEMPLATE.format(job_id=submission_id), + name=self.JOB_ACTOR_NAME_TEMPLATE.format(job_id=job_id), num_cpus=0, # Currently we assume JobManager is created by dashboard server # running on headnode, same for job supervisor actors scheduled @@ -500,20 +500,20 @@ class JobManager: self._get_current_node_resource_key(): 0.001, }, runtime_env=self._get_supervisor_runtime_env(runtime_env), - ).remote(submission_id, entrypoint, metadata or {}) + ).remote(job_id, entrypoint, metadata or {}) supervisor.run.remote(_start_signal_actor=_start_signal_actor) # Monitor the job in the background so we can detect errors without # requiring a client to poll. - create_task(self._monitor_job(submission_id, job_supervisor=supervisor)) + create_task(self._monitor_job(job_id, job_supervisor=supervisor)) except Exception as e: self._job_info_client.put_status( - submission_id, + job_id, JobStatus.FAILED, message=f"Failed to start job supervisor: {e}.", ) - return submission_id + return job_id def stop_job(self, job_id) -> bool: """Request a job to exit, fire and forget. diff --git a/dashboard/modules/job/pydantic_models.py b/dashboard/modules/job/pydantic_models.py deleted file mode 100644 index 20e93a136..000000000 --- a/dashboard/modules/job/pydantic_models.py +++ /dev/null @@ -1,81 +0,0 @@ -from enum import Enum -from typing import Any, Dict, Optional - -from pydantic import BaseModel, Field -from ray.dashboard.modules.job.common import JobStatus - - -class DriverInfo(BaseModel): - """A class for recording information about the driver related to the job.""" - - id: str = Field(..., description="The id of the driver") - node_ip_address: str = Field( - ..., description="The ip address of the node the driver is running on" - ) - pid: str = Field( - ..., description="The pid of the worker process the driver is using." - ) - # TODO(aguo): Add node_id as a field. - - -class JobType(str, Enum): - """An enumeration for describing the different job types.""" - - #: A job that was initiated by the job submission apis - SUBMISSION = "SUBMISSION" - #: A job that was initiated by a driver script. - DRIVER = "DRIVER" - - -class JobDetails(BaseModel): - """ - Job data with extra details about its driver and its submission. - """ - - type: JobType = Field(..., description="The type of job.") - entrypoint: Optional[str] = Field( - None, description="The entrypoint command for this job." - ) - job_id: Optional[str] = Field( - None, - description="The job id. An id that is created for every job that is " - "launched in ray. This can be used to fetch data about jobs using ray " - "core apis.", - ) - submission_id: Optional[str] = Field( - None, - description="A submission id is an id created for every submission job. It can " - "be used to fetch data about jobs using the job submission apis.", - ) - driver_info: Optional[DriverInfo] = Field( - None, - description="The driver related to this job. For submission jobs, " - "it is the last driver launched by that job submission, " - "or None if there is no driver.", - ) - - # The following fields are copied from JobInfo. - # TODO(aguo): Inherit from JobInfo once it's migrated to pydantic. - status: JobStatus = Field(..., description="The status of the job.") - entrypoint: str = Field(..., description="The entrypoint command for this job.") - message: Optional[str] = Field( - None, description="A message describing the status in more detail." - ) - error_type: Optional[str] = Field( - None, description="Internal error, user script error" - ) - start_time: Optional[int] = Field( - None, - description="The time when the job was started. " "A Unix timestamp in ms.", - ) - end_time: Optional[int] = Field( - None, - description="The time when the job moved into a terminal state. " - "A Unix timestamp in ms.", - ) - metadata: Optional[Dict[str, str]] = Field( - None, description="Arbitrary user-provided metadata for the job." - ) - runtime_env: Optional[Dict[str, Any]] = Field( - None, description="The runtime environment for the job." - ) diff --git a/dashboard/modules/job/sdk.py b/dashboard/modules/job/sdk.py index 144714af7..1c011b028 100644 --- a/dashboard/modules/job/sdk.py +++ b/dashboard/modules/job/sdk.py @@ -1,26 +1,22 @@ import dataclasses import logging -from typing import Any, Dict, Iterator, List, Optional +from typing import Any, Dict, Iterator, Optional try: import aiohttp import requests - from ray.dashboard.modules.job.pydantic_models import ( - JobDetails, - ) except ImportError: aiohttp = None requests = None - JobDetails = None from ray.dashboard.modules.job.common import ( JobStatus, JobSubmitRequest, JobSubmitResponse, JobStopResponse, + JobInfo, JobLogsResponse, ) - from ray.dashboard.modules.dashboard_sdk import SubmissionClient from ray.runtime_env import RuntimeEnv @@ -87,7 +83,6 @@ class JobSubmissionClient(SubmissionClient): job_id: Optional[str] = None, runtime_env: Optional[Dict[str, Any]] = None, metadata: Optional[Dict[str, str]] = None, - submission_id: Optional[str] = None, ) -> str: """Submit and execute a job asynchronously. @@ -110,24 +105,18 @@ class JobSubmissionClient(SubmissionClient): Args: entrypoint: The shell command to run for this job. - submission_id: A unique ID for this job. + job_id: A unique ID for this job. runtime_env: The runtime environment to install and run this job in. metadata: Arbitrary data to store along with this job. - job_id: DEPRECATED. This has been renamed to submission_id Returns: - The submission ID of the submitted job. If not specified, - this is a randomly generated unique ID. + The job ID of the submitted job. If not specified, this is a randomly + generated unique ID. Raises: RuntimeError: If the request to the job server fails, or if the specified - submission_id has already been used by a job on this cluster. + job_id has already been used by a job on this cluster. """ - if job_id: - logger.warning( - "job_id kwarg is deprecated. Please use submission_id instead." - ) - runtime_env = runtime_env or {} metadata = metadata or {} metadata.update(self._default_metadata) @@ -138,20 +127,18 @@ class JobSubmissionClient(SubmissionClient): # Run the RuntimeEnv constructor to parse local pip/conda requirements files. runtime_env = RuntimeEnv(**runtime_env).to_dict() - submission_id = submission_id or job_id - req = JobSubmitRequest( entrypoint=entrypoint, - submission_id=submission_id, + job_id=job_id, runtime_env=runtime_env, metadata=metadata, ) - logger.debug(f"Submitting job with submission_id={submission_id}.") + logger.debug(f"Submitting job with job_id={job_id}.") r = self._do_request("POST", "/api/jobs/", json_data=dataclasses.asdict(req)) if r.status_code == 200: - return JobSubmitResponse(**r.json()).submission_id + return JobSubmitResponse(**r.json()).job_id else: self._raise_error(r) @@ -165,12 +152,12 @@ class JobSubmissionClient(SubmissionClient): Example: >>> from ray.job_submission import JobSubmissionClient >>> client = JobSubmissionClient("http://127.0.0.1:8265") # doctest: +SKIP - >>> sub_id = client.submit_job(entrypoint="sleep 10") # doctest: +SKIP - >>> client.stop_job(sub_id) # doctest: +SKIP + >>> job_id = client.submit_job(entrypoint="sleep 10") # doctest: +SKIP + >>> client.stop_job(job_id) # doctest: +SKIP True Args: - job_id: The job ID or submission ID for the job to be stopped. + job_id: The job ID for the job to be stopped. Returns: True if the job was running, otherwise False. @@ -191,21 +178,20 @@ class JobSubmissionClient(SubmissionClient): def get_job_info( self, job_id: str, - ) -> JobDetails: + ) -> JobInfo: """Get the latest status and other information associated with a job. Example: >>> from ray.job_submission import JobSubmissionClient >>> client = JobSubmissionClient("http://127.0.0.1:8265") # doctest: +SKIP - >>> submission_id = client.submit_job(entrypoint="sleep 1") # doctest: +SKIP - >>> job_submission_client.get_job_info(submission_id) # doctest: +SKIP + >>> job_id = client.submit_job(entrypoint="sleep 1") # doctest: +SKIP + >>> job_submission_client.get_job_info(job_id) # doctest: +SKIP JobInfo(status='SUCCEEDED', message='Job finished successfully.', error_type=None, start_time=1647388711, end_time=1647388712, metadata={}, runtime_env={}) Args: - job_id: The job ID or submission ID of the job whose information - is being requested. + job_id: The ID of the job whose information is being requested. Returns: The JobInfo for the job. @@ -217,12 +203,12 @@ class JobSubmissionClient(SubmissionClient): r = self._do_request("GET", f"/api/jobs/{job_id}") if r.status_code == 200: - return JobDetails(**r.json()) + return JobInfo(**r.json()) else: self._raise_error(r) @PublicAPI(stability="beta") - def list_jobs(self) -> List[JobDetails]: + def list_jobs(self) -> Dict[str, JobInfo]: """List all jobs along with their status and other information. Lists all jobs that have ever run on the cluster, including jobs that are @@ -234,16 +220,12 @@ class JobSubmissionClient(SubmissionClient): >>> client.submit_job(entrypoint="echo hello") # doctest: +SKIP >>> client.submit_job(entrypoint="sleep 2") # doctest: +SKIP >>> client.list_jobs() # doctest: +SKIP - [JobDetails(status='SUCCEEDED', - job_id='03000000', type='submission', - submission_id='raysubmit_4LamXRuQpYdSMg7J', + {'raysubmit_4LamXRuQpYdSMg7J': JobInfo(status='SUCCEEDED', message='Job finished successfully.', error_type=None, start_time=1647388711, end_time=1647388712, metadata={}, runtime_env={}), - JobDetails(status='RUNNING', - job_id='04000000', type='submission', - submission_id='raysubmit_1dxCeNvG1fCMVNHG', + 'raysubmit_1dxCeNvG1fCMVNHG': JobInfo(status='RUNNING', message='Job is currently running.', error_type=None, - start_time=1647454832, end_time=None, metadata={}, runtime_env={})] + start_time=1647454832, end_time=None, metadata={}, runtime_env={})} Returns: A dictionary mapping job_ids to their information. @@ -255,9 +237,10 @@ class JobSubmissionClient(SubmissionClient): if r.status_code == 200: jobs_info_json = r.json() - jobs_info = [ - JobDetails(**job_info_json) for job_info_json in jobs_info_json - ] + jobs_info = { + job_id: JobInfo(**job_info_json) + for job_id, job_info_json in jobs_info_json.items() + } return jobs_info else: self._raise_error(r) @@ -274,8 +257,7 @@ class JobSubmissionClient(SubmissionClient): 'SUCCEEDED' Args: - job_id: The job ID or submission ID of the job whose status is being - requested. + job_id: The ID of the job whose status is being requested. Returns: The JobStatus of the job. @@ -293,13 +275,12 @@ class JobSubmissionClient(SubmissionClient): Example: >>> from ray.job_submission import JobSubmissionClient >>> client = JobSubmissionClient("http://127.0.0.1:8265") # doctest: +SKIP - >>> sub_id = client.submit_job(entrypoint="echo hello") # doctest: +SKIP - >>> client.get_job_logs(sub_id) # doctest: +SKIP + >>> job_id = client.submit_job(entrypoint="echo hello") # doctest: +SKIP + >>> client.get_job_logs(job_id) # doctest: +SKIP 'hello\\n' Args: - job_id: The job ID or submission ID of the job whose logs are being - requested. + job_id: The ID of the job whose logs are being requested. Returns: A string containing the full logs of the job. @@ -322,7 +303,7 @@ class JobSubmissionClient(SubmissionClient): Example: >>> from ray.job_submission import JobSubmissionClient >>> client = JobSubmissionClient("http://127.0.0.1:8265") # doctest: +SKIP - >>> submission_id = client.submit_job( # doctest: +SKIP + >>> job_id = client.submit_job( # doctest: +SKIP ... entrypoint="echo hi && sleep 5 && echo hi2") >>> async for lines in client.tail_job_logs( # doctest: +SKIP ... 'raysubmit_Xe7cvjyGJCyuCvm2'): @@ -331,8 +312,7 @@ class JobSubmissionClient(SubmissionClient): hi2 Args: - job_id: The job ID or submission ID of the job whose logs are being - requested. + job_id: The ID of the job whose logs are being requested. Returns: The iterator. diff --git a/dashboard/modules/job/tests/test_backwards_compatibility.py b/dashboard/modules/job/tests/test_backwards_compatibility.py index 702fe6f94..e9a27060b 100644 --- a/dashboard/modules/job/tests/test_backwards_compatibility.py +++ b/dashboard/modules/job/tests/test_backwards_compatibility.py @@ -32,8 +32,6 @@ def _compatibility_script_path(file_name: str) -> str: class TestBackwardsCompatibility: - # TODO(aguo): Unskip this test once 2.0.0 is released. - @pytest.mark.skip("#25902 breaks backwards compatibility of the REST api.") def test_cli(self): """ 1) Create a new conda environment with ray version X installed diff --git a/dashboard/modules/job/tests/test_cli.py b/dashboard/modules/job/tests/test_cli.py index 1f96e46ba..dd599f1bc 100644 --- a/dashboard/modules/job/tests/test_cli.py +++ b/dashboard/modules/job/tests/test_cli.py @@ -220,14 +220,13 @@ class TestSubmit: with set_env_var("RAY_ADDRESS", "env_addr"): result = runner.invoke(job_cli_group, ["submit", "--", "echo hello"]) assert result.exit_code == 0 - assert mock_client_instance.called_with(submission_id=None) + assert mock_client_instance.called_with(job_id=None) result = runner.invoke( - job_cli_group, - ["submit", "--", "--submission-id=my_job_id", "echo hello"], + job_cli_group, ["submit", "--", "--job-id=my_job_id", "echo hello"] ) assert result.exit_code == 0 - assert mock_client_instance.called_with(submission_id="my_job_id") + assert mock_client_instance.called_with(job_id="my_job_id") if __name__ == "__main__": diff --git a/dashboard/modules/job/tests/test_cli_integration.py b/dashboard/modules/job/tests/test_cli_integration.py index 18307be83..f70d86a4b 100644 --- a/dashboard/modules/job/tests/test_cli_integration.py +++ b/dashboard/modules/job/tests/test_cli_integration.py @@ -19,13 +19,12 @@ def set_env_var(key: str, val: Optional[str] = None): elif key in os.environ: del os.environ[key] - try: - yield - finally: - if key in os.environ: - del os.environ[key] - if old_val is not None: - os.environ[key] = old_val + yield + + if key in os.environ: + del os.environ[key] + if old_val is not None: + os.environ[key] = old_val @pytest.fixture @@ -157,7 +156,7 @@ class TestJobStop: class TestJobList: def test_empty(self, ray_start_stop): stdout, _ = _run_cmd("ray job list") - assert "[]" in stdout + assert "{}" in stdout def test_list(self, ray_start_stop): _run_cmd("ray job submit --job-id='hello_id' -- echo hello") @@ -168,6 +167,7 @@ class TestJobList: f"--runtime-env-json='{json.dumps(runtime_env)}' -- echo hi" ) stdout, _ = _run_cmd("ray job list") + assert "JobInfo" in stdout assert "123" in stdout assert "hello_id" in stdout assert "hi_id" in stdout diff --git a/dashboard/modules/job/tests/test_common.py b/dashboard/modules/job/tests/test_common.py index cb28bbe1c..2984c9f0f 100644 --- a/dashboard/modules/job/tests/test_common.py +++ b/dashboard/modules/job/tests/test_common.py @@ -19,21 +19,19 @@ class TestJobSubmitRequestValidation: with pytest.raises(TypeError, match="must be a string"): validate_request_type({"entrypoint": 123}, JobSubmitRequest) - def test_validate_submission_id(self): + def test_validate_job_id(self): r = validate_request_type({"entrypoint": "abc"}, JobSubmitRequest) assert r.entrypoint == "abc" - assert r.submission_id is None + assert r.job_id is None r = validate_request_type( - {"entrypoint": "abc", "submission_id": "123"}, JobSubmitRequest + {"entrypoint": "abc", "job_id": "123"}, JobSubmitRequest ) assert r.entrypoint == "abc" - assert r.submission_id == "123" + assert r.job_id == "123" with pytest.raises(TypeError, match="must be a string"): - validate_request_type( - {"entrypoint": 123, "submission_id": 1}, JobSubmitRequest - ) + validate_request_type({"entrypoint": 123, "job_id": 1}, JobSubmitRequest) def test_validate_runtime_env(self): r = validate_request_type({"entrypoint": "abc"}, JobSubmitRequest) diff --git a/dashboard/modules/job/tests/test_http_job_server.py b/dashboard/modules/job/tests/test_http_job_server.py index 041dc6c91..e54a30815 100644 --- a/dashboard/modules/job/tests/test_http_job_server.py +++ b/dashboard/modules/job/tests/test_http_job_server.py @@ -5,7 +5,6 @@ import shutil import sys import tempfile from pathlib import Path -import subprocess from typing import Optional from unittest.mock import patch @@ -21,15 +20,11 @@ from ray._private.test_utils import ( wait_until_server_available, ) from ray.dashboard.modules.dashboard_sdk import ClusterInfo, parse_cluster_info -from ray.dashboard.modules.job.pydantic_models import JobDetails +from ray.dashboard.modules.job.common import JobInfo from ray.dashboard.modules.version import CURRENT_VERSION from ray.dashboard.tests.conftest import * # noqa from ray.job_submission import JobStatus, JobSubmissionClient from ray.tests.conftest import _ray_start -from ray.dashboard.modules.job.tests.test_cli_integration import set_env_var - -# This test requires you have AWS credentials set up (any AWS credentials will -# do, this test only accesses a public bucket). logger = logging.getLogger(__name__) @@ -49,28 +44,22 @@ def job_sdk_client(headers): yield JobSubmissionClient(format_web_url(address), headers=headers) +# NOTE(architkulkarni): This test must be run first in order for the job +# submission history of the shared Ray runtime to be empty. @pytest.mark.parametrize("use_sdk", [True, False]) -def test_list_jobs_empty(headers, use_sdk: bool): - # Create a cluster using `ray start` instead of `ray.init` to avoid creating a job - subprocess.check_output(["ray", "start", "--head"]) - address = "http://127.0.0.1:8265" - try: - with set_env_var("RAY_ADDRESS", address): - client = JobSubmissionClient(format_web_url(address), headers=headers) +def test_list_jobs_empty(job_sdk_client: JobSubmissionClient, use_sdk: bool): + client = job_sdk_client - if use_sdk: - assert client.list_jobs() == [] - else: - r = client._do_request( - "GET", - "/api/jobs/", - ) + if use_sdk: + assert client.list_jobs() == dict() + else: + r = client._do_request( + "GET", + "/api/jobs/", + ) - assert r.status_code == 200 - assert json.loads(r.text) == [] - - finally: - subprocess.check_output(["ray", "stop", "--force"]) + assert r.status_code == 200 + assert json.loads(r.text) == dict() @pytest.mark.parametrize("use_sdk", [True, False]) @@ -80,17 +69,13 @@ def test_list_jobs(job_sdk_client: JobSubmissionClient, use_sdk: bool): runtime_env = {"env_vars": {"TEST": "123"}} metadata = {"foo": "bar"} entrypoint = "echo hello" - submission_id = client.submit_job( + job_id = client.submit_job( entrypoint=entrypoint, runtime_env=runtime_env, metadata=metadata ) - wait_for_condition(_check_job_succeeded, client=client, job_id=submission_id) + wait_for_condition(_check_job_succeeded, client=client, job_id=job_id) if use_sdk: - info: JobDetails = next( - job_info - for job_info in client.list_jobs() - if job_info.submission_id == submission_id - ) + info: JobInfo = client.list_jobs()[job_id] else: r = client._do_request( "GET", @@ -99,12 +84,8 @@ def test_list_jobs(job_sdk_client: JobSubmissionClient, use_sdk: bool): assert r.status_code == 200 jobs_info_json = json.loads(r.text) - info_json = next( - job_info - for job_info in jobs_info_json - if job_info["submission_id"] == submission_id - ) - info = JobDetails(**info_json) + info_json = jobs_info_json[job_id] + info = JobInfo(**info_json) assert info.entrypoint == entrypoint assert info.status == JobStatus.SUCCEEDED @@ -113,10 +94,6 @@ def test_list_jobs(job_sdk_client: JobSubmissionClient, use_sdk: bool): assert info.runtime_env == runtime_env assert info.metadata == metadata - # Test get job status by job / driver id - status = client.get_job_status(info.submission_id) - assert status == JobStatus.SUCCEEDED - def _check_job_succeeded(client: JobSubmissionClient, job_id: str) -> bool: status = client.get_job_status(job_id) @@ -491,9 +468,7 @@ def test_submit_optional_args(job_sdk_client): json_data={"entrypoint": "ls"}, ) - wait_for_condition( - _check_job_succeeded, client=client, job_id=r.json()["submission_id"] - ) + wait_for_condition(_check_job_succeeded, client=client, job_id=r.json()["job_id"]) def test_missing_resources(job_sdk_client): diff --git a/dashboard/modules/job/tests/test_job_manager.py b/dashboard/modules/job/tests/test_job_manager.py index b1cb05268..ed17b058a 100644 --- a/dashboard/modules/job/tests/test_job_manager.py +++ b/dashboard/modules/job/tests/test_job_manager.py @@ -164,15 +164,12 @@ def test_list_jobs_empty(job_manager: JobManager): @pytest.mark.asyncio async def test_list_jobs(job_manager: JobManager): - job_manager.submit_job(entrypoint="echo hi", submission_id="1") + job_manager.submit_job(entrypoint="echo hi", job_id="1") runtime_env = {"env_vars": {"TEST": "123"}} metadata = {"foo": "bar"} job_manager.submit_job( - entrypoint="echo hello", - submission_id="2", - runtime_env=runtime_env, - metadata=metadata, + entrypoint="echo hello", job_id="2", runtime_env=runtime_env, metadata=metadata ) await async_wait_for_condition( check_job_succeeded, job_manager=job_manager, job_id="1" @@ -194,20 +191,18 @@ async def test_list_jobs(job_manager: JobManager): @pytest.mark.asyncio async def test_pass_job_id(job_manager): - submission_id = "my_custom_id" + job_id = "my_custom_id" - returned_id = job_manager.submit_job( - entrypoint="echo hello", submission_id=submission_id - ) - assert returned_id == submission_id + returned_id = job_manager.submit_job(entrypoint="echo hello", job_id=job_id) + assert returned_id == job_id await async_wait_for_condition( - check_job_succeeded, job_manager=job_manager, job_id=submission_id + check_job_succeeded, job_manager=job_manager, job_id=job_id ) # Check that the same job_id is rejected. with pytest.raises(RuntimeError): - job_manager.submit_job(entrypoint="echo hello", submission_id=submission_id) + job_manager.submit_job(entrypoint="echo hello", job_id=job_id) @pytest.mark.asyncio diff --git a/python/ray/scripts/scripts.py b/python/ray/scripts/scripts.py index 2123a32ab..741117375 100644 --- a/python/ray/scripts/scripts.py +++ b/python/ray/scripts/scripts.py @@ -43,6 +43,7 @@ from ray.autoscaler._private.commands import ( ) from ray.autoscaler._private.constants import RAY_PROCESSES from ray.autoscaler._private.fake_multi_node.node_provider import FAKE_HEAD_NODE_ID +from ray.dashboard.modules.job.cli import job_cli_group from ray.experimental.state.api import get_log, list_logs from ray.experimental.state.common import DEFAULT_RPC_TIMEOUT, DEFAULT_LOG_LIMIT from ray.util.annotations import PublicAPI @@ -2554,19 +2555,12 @@ cli.add_command(install_nightly) cli.add_command(cpp) cli.add_command(disable_usage_stats) cli.add_command(enable_usage_stats) +add_command_alias(job_cli_group, name="job", hidden=True) add_command_alias(ray_logs, name="logs", hidden=False) cli.add_command(state_cli_list) cli.add_command(state_cli_get) add_command_alias(summary_state_cli_group, name="summary", hidden=False) -try: - from ray.dashboard.modules.job.cli import job_cli_group - - add_command_alias(job_cli_group, name="job", hidden=True) -except Exception as e: - logger.debug(f"Integrating ray jobs command line tool failed with {e}") - - try: from ray.serve.scripts import serve_cli