Revert "Fix the jobs tab in the beta dashboard and fill it with data from both "submission" jobs and "driver" jobs (#25902)"

This reverts commit 5d6bc5360d.
This commit is contained in:
Jun Gong 2022-07-27 16:48:58 -07:00
parent 99e9cbcf33
commit 617c586b9b
17 changed files with 185 additions and 543 deletions

View file

@ -1,23 +1,24 @@
import { useCallback, useContext, useEffect, useRef, useState } from "react";
import { GlobalContext } from "../../../App";
import { useCallback, useEffect, useRef, useState } from "react";
import { getJobList } from "../../../service/job";
import { UnifiedJob } from "../../../type/job";
import { Job } from "../../../type/job";
export const useJobList = () => {
const [jobList, setList] = useState<UnifiedJob[]>([]);
const [jobList, setList] = useState<Job[]>([]);
const [page, setPage] = useState({ pageSize: 10, pageNo: 1 });
const [msg, setMsg] = useState("Loading the job list...");
const [isRefreshing, setRefresh] = useState(true);
const { ipLogMap } = useContext(GlobalContext);
const [filter, setFilter] = useState<
{
key: "job_id" | "status";
key: "jobId" | "name" | "language" | "state" | "namespaceId";
val: string;
}[]
>([]);
const refreshRef = useRef(isRefreshing);
const tot = useRef<NodeJS.Timeout>();
const changeFilter = (key: "job_id" | "status", val: string) => {
const changeFilter = (
key: "jobId" | "name" | "language" | "state" | "namespaceId",
val: string,
) => {
const f = filter.find((e) => e.key === key);
if (f) {
f.val = val;
@ -36,11 +37,9 @@ export const useJobList = () => {
}
const rsp = await getJobList();
if (rsp) {
setList(
rsp.data.sort((a, b) => (b.start_time ?? 0) - (a.start_time ?? 0)),
);
setMsg("Fetched jobs");
if (rsp?.data?.data?.summary) {
setList(rsp.data.data.summary.sort((a, b) => b.timestamp - a.timestamp));
setMsg(rsp.data.msg || "");
}
tot.current = setTimeout(getJob, 4000);
@ -56,7 +55,7 @@ export const useJobList = () => {
}, [getJob]);
return {
jobList: jobList.filter((node) =>
filter.every((f) => node[f.key] && (node[f.key] ?? "").includes(f.val)),
filter.every((f) => node[f.key] && node[f.key].includes(f.val)),
),
msg,
isRefreshing,
@ -65,6 +64,5 @@ export const useJobList = () => {
page,
originalJobs: jobList,
setPage: (key: string, val: number) => setPage({ ...page, [key]: val }),
ipLogMap,
};
};

View file

@ -13,7 +13,7 @@ import dayjs from "dayjs";
import React from "react";
import { Link } from "react-router-dom";
import Loading from "../../components/Loading";
import { SearchInput } from "../../components/SearchComponent";
import { SearchInput, SearchSelect } from "../../components/SearchComponent";
import TitleCard from "../../components/TitleCard";
import { useJobList } from "./hook/useJobList";
@ -25,13 +25,12 @@ const useStyles = makeStyles((theme) => ({
}));
const columns = [
"Job ID",
"Submission ID",
"Status",
"Logs",
"ID",
"DriverIpAddress",
"DriverPid",
"IsDead",
"StartTime",
"EndTime",
"Driver Pid",
];
const JobList = () => {
@ -44,7 +43,6 @@ const JobList = () => {
changeFilter,
page,
setPage,
ipLogMap,
} = useJobList();
return (
@ -64,8 +62,13 @@ const JobList = () => {
<TitleCard title="Job List">
<TableContainer>
<SearchInput
label="Job ID"
onChange={(value) => changeFilter("job_id", value)}
label="ID"
onChange={(value) => changeFilter("jobId", value)}
/>
<SearchSelect
label="Language"
onChange={(value) => changeFilter("language", value)}
options={["JAVA", "PYTHON"]}
/>
<SearchInput
label="Page Size"
@ -97,60 +100,31 @@ const JobList = () => {
page.pageNo * page.pageSize,
)
.map(
(
{
job_id,
submission_id,
driver_info,
type,
status,
start_time,
end_time,
},
index,
) => (
<TableRow key={job_id ?? submission_id ?? index}>
<TableCell align="center">{job_id ?? "-"}</TableCell>
({
jobId = "",
driverIpAddress,
isDead,
driverPid,
startTime,
endTime,
}) => (
<TableRow key={jobId}>
<TableCell align="center">
{submission_id ?? "-"}
<Link to={`/job/${jobId}`}>{jobId}</Link>
</TableCell>
<TableCell align="center">{status}</TableCell>
<TableCell align="center">{driverIpAddress}</TableCell>
<TableCell align="center">{driverPid}</TableCell>
<TableCell align="center">
{/* TODO(aguo): Also show logs for the job id instead
of just the submission's logs */}
{driver_info &&
ipLogMap[driver_info.node_ip_address] ? (
<Link
to={`/log/${encodeURIComponent(
ipLogMap[driver_info.node_ip_address],
)}?fileName=${
type === "DRIVER"
? job_id
: `driver-${submission_id}`
}`}
target="_blank"
>
Log
</Link>
) : (
"-"
)}
{isDead ? "true" : "false"}
</TableCell>
<TableCell align="center">
{dayjs(Number(start_time)).format(
"YYYY/MM/DD HH:mm:ss",
)}
{dayjs(Number(startTime)).format("YYYY/MM/DD HH:mm:ss")}
</TableCell>
<TableCell align="center">
{end_time && end_time > 0
? dayjs(Number(end_time)).format(
"YYYY/MM/DD HH:mm:ss",
)
{endTime > 0
? dayjs(Number(endTime)).format("YYYY/MM/DD HH:mm:ss")
: "-"}
</TableCell>
<TableCell align="center">
{driver_info?.pid ?? "-"}
</TableCell>
</TableRow>
),
)}

View file

@ -2,7 +2,7 @@ import { JobDetailRsp, JobListRsp } from "../type/job";
import { get } from "./requestHandlers";
export const getJobList = () => {
return get<JobListRsp>("api/jobs/");
return get<JobListRsp>("jobs?view=summary");
};
export const getJobDetail = (id: string) => {

View file

@ -63,26 +63,10 @@ export type JobDetailRsp = {
result: boolean;
};
export type JobListRsp = UnifiedJob[];
export type UnifiedJob = {
job_id: string | null;
submission_id: string | null;
type: string;
status: string;
entrypoint: string;
message: string | null;
error_type: string | null;
start_time: number | null;
end_time: number | null;
metadata: { [key: string]: string } | null;
runtime_env: { [key: string]: string } | null;
driver_info: DriverInfo | null;
};
export type DriverInfo = {
id: string;
node_ip_address: string;
node_id: string;
pid: string;
export type JobListRsp = {
data: {
summary: Job[];
};
msg: string;
result: boolean;
};

View file

@ -87,17 +87,7 @@ def job_cli_group():
type=str,
default=None,
required=False,
help=("DEPRECATED: Use -- submission-id instead."),
)
@click.option(
"--submission-id",
type=str,
default=None,
required=False,
help=(
"Submission ID to specify for the job. "
"If not provided, one will be generated."
),
help=("Job ID to specify for the job. " "If not provided, one will be generated."),
)
@click.option(
"--runtime-env",
@ -137,7 +127,6 @@ def job_cli_group():
def submit(
address: Optional[str],
job_id: Optional[str],
submission_id: Optional[str],
runtime_env: Optional[str],
runtime_env_json: Optional[str],
working_dir: Optional[str],
@ -150,19 +139,11 @@ def submit(
ray job submit -- python my_script.py --arg=val
"""
if job_id:
cli_logger.warning(
"--job-id option is deprecated. " "Please use --submission-id instead."
)
submission_id = submission_id or job_id
if ray_constants.RAY_JOB_SUBMIT_HOOK in os.environ:
# Submit all args as **kwargs per the JOB_SUBMIT_HOOK contract.
_load_class(os.environ[ray_constants.RAY_JOB_SUBMIT_HOOK])(
address=address,
job_id=submission_id,
submission_id=submission_id,
job_id=job_id,
runtime_env=runtime_env,
runtime_env_json=runtime_env_json,
working_dir=working_dir,
@ -180,7 +161,7 @@ def submit(
job_id = client.submit_job(
entrypoint=list2cmdline(entrypoint),
submission_id=submission_id,
job_id=job_id,
runtime_env=final_runtime_env,
)

View file

@ -49,7 +49,6 @@ class JobStatus(str, Enum):
return self.value in {"STOPPED", "SUCCEEDED", "FAILED"}
# TODO(aguo): Convert to pydantic model
@dataclass
class JobInfo:
"""A class for recording information associated with a job and its execution."""
@ -181,10 +180,10 @@ def validate_request_type(json_data: Dict[str, Any], request_type: dataclass) ->
class JobSubmitRequest:
# Command to start execution, ex: "python script.py"
entrypoint: str
# Optional submission_id to specify for the job. If the submission_id
# is not specified, one will be generated. If a job with the same
# submission_id already exists, it will be rejected.
submission_id: Optional[str] = None
# Optional job_id to specify for the job. If the job_id is not specified,
# one will be generated. If a job with the same job_id already exists, it
# will be rejected.
job_id: Optional[str] = None
# Dict to setup execution environment.
runtime_env: Optional[Dict[str, Any]] = None
# Metadata to pass in to the JobConfig.
@ -194,10 +193,9 @@ class JobSubmitRequest:
if not isinstance(self.entrypoint, str):
raise TypeError(f"entrypoint must be a string, got {type(self.entrypoint)}")
if self.submission_id is not None and not isinstance(self.submission_id, str):
if self.job_id is not None and not isinstance(self.job_id, str):
raise TypeError(
"submission_id must be a string if provided, "
f"got {type(self.submission_id)}"
f"job_id must be a string if provided, got {type(self.job_id)}"
)
if self.runtime_env is not None:
@ -228,9 +226,7 @@ class JobSubmitRequest:
@dataclass
class JobSubmitResponse:
# DEPRECATED: Use submission_id instead.
job_id: str
submission_id: str
@dataclass

View file

@ -1,46 +1,34 @@
import asyncio
import concurrent
import dataclasses
import json
import logging
import traceback
from dataclasses import dataclass
from typing import Any, Dict, Optional, Tuple
import aiohttp.web
from aiohttp.web import Request, Response
import dataclasses
import logging
from typing import Any
import json
import traceback
from dataclasses import dataclass
import ray
from ray._private import ray_constants
import ray.dashboard.optional_utils as optional_utils
import ray.dashboard.utils as dashboard_utils
import ray.dashboard.optional_utils as optional_utils
from ray._private.runtime_env.packaging import (
package_exists,
pin_runtime_env_uri,
upload_package_to_gcs,
pin_runtime_env_uri,
)
from ray.core.generated import gcs_service_pb2, gcs_service_pb2_grpc
from ray.dashboard.modules.job.common import (
http_uri_components_to_uri,
JobStatus,
JobInfo,
JobSubmitRequest,
JobSubmitResponse,
JobStopResponse,
JobLogsResponse,
validate_request_type,
JOB_ID_METADATA_KEY,
)
from ray.dashboard.modules.job.pydantic_models import (
DriverInfo,
JobDetails,
JobType,
)
from ray.dashboard.modules.version import (
CURRENT_VERSION,
VersionResponse,
)
from ray.dashboard.modules.job.job_manager import JobManager
from ray.runtime_env import RuntimeEnv
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
@ -51,10 +39,7 @@ routes = optional_utils.ClassMethodRouteTable
class JobHead(dashboard_utils.DashboardHeadModule):
def __init__(self, dashboard_head):
super().__init__(dashboard_head)
self._dashboard_head = dashboard_head
self._job_manager = None
self._gcs_job_info_stub = None
self._executor = concurrent.futures.ThreadPoolExecutor(max_workers=10)
async def _parse_and_validate_request(
self, req: Request, request_type: dataclass
@ -71,45 +56,9 @@ class JobHead(dashboard_utils.DashboardHeadModule):
status=aiohttp.web.HTTPBadRequest.status_code,
)
async def find_job_by_ids(self, job_or_submission_id: str) -> Optional[JobDetails]:
"""
Attempts to find the job with a given submission_id or job id.
"""
# First try to find by job_id
driver_jobs, submission_job_drivers = await self._get_driver_jobs()
job = driver_jobs.get(job_or_submission_id)
if job:
return job
# Try to find a driver with the given id
submission_id = next(
(
id
for id, driver in submission_job_drivers.items()
if driver.id == job_or_submission_id
),
None,
)
if not submission_id:
# If we didn't find a driver with the given id,
# then lets try to search for a submission with given id
submission_id = job_or_submission_id
job_info = await asyncio.get_event_loop().run_in_executor(
self._executor, lambda: self._job_manager.get_job_info(submission_id)
)
if job_info:
driver = submission_job_drivers.get(submission_id)
job = JobDetails(
**dataclasses.asdict(job_info),
submission_id=submission_id,
job_id=driver.id if driver else None,
driver_info=driver,
type=JobType.SUBMISSION,
)
return job
return None
def job_exists(self, job_id: str) -> bool:
status = self._job_manager.get_job_status(job_id)
return status is not None
@routes.get("/api/version")
async def get_version(self, req: Request) -> Response:
@ -180,14 +129,14 @@ class JobHead(dashboard_utils.DashboardHeadModule):
submit_request = result
try:
submission_id = self._job_manager.submit_job(
job_id = self._job_manager.submit_job(
entrypoint=submit_request.entrypoint,
submission_id=submit_request.submission_id,
job_id=submit_request.job_id,
runtime_env=submit_request.runtime_env,
metadata=submit_request.metadata,
)
resp = JobSubmitResponse(job_id=submission_id, submission_id=submission_id)
resp = JobSubmitResponse(job_id=job_id)
except (TypeError, ValueError):
return Response(
text=traceback.format_exc(),
@ -205,24 +154,18 @@ class JobHead(dashboard_utils.DashboardHeadModule):
status=aiohttp.web.HTTPOk.status_code,
)
@routes.post("/api/jobs/{job_or_submission_id}/stop")
@routes.post("/api/jobs/{job_id}/stop")
@optional_utils.init_ray_and_catch_exceptions()
async def stop_job(self, req: Request) -> Response:
job_or_submission_id = req.match_info["job_or_submission_id"]
job = await self.find_job_by_ids(job_or_submission_id)
if not job:
job_id = req.match_info["job_id"]
if not self.job_exists(job_id):
return Response(
text=f"Job {job_or_submission_id} does not exist",
text=f"Job {job_id} does not exist",
status=aiohttp.web.HTTPNotFound.status_code,
)
if job.type is not JobType.SUBMISSION:
return Response(
text="Can only stop submission type jobs",
status=aiohttp.web.HTTPBadRequest.status_code,
)
try:
stopped = self._job_manager.stop_job(job.submission_id)
stopped = self._job_manager.stop_job(job_id)
resp = JobStopResponse(stopped=stopped)
except Exception:
return Response(
@ -234,162 +177,70 @@ class JobHead(dashboard_utils.DashboardHeadModule):
text=json.dumps(dataclasses.asdict(resp)), content_type="application/json"
)
@routes.get("/api/jobs/{job_or_submission_id}")
@routes.get("/api/jobs/{job_id}")
@optional_utils.init_ray_and_catch_exceptions()
async def get_job_info(self, req: Request) -> Response:
job_or_submission_id = req.match_info["job_or_submission_id"]
job = await self.find_job_by_ids(job_or_submission_id)
if not job:
job_id = req.match_info["job_id"]
if not self.job_exists(job_id):
return Response(
text=f"Job {job_or_submission_id} does not exist",
text=f"Job {job_id} does not exist",
status=aiohttp.web.HTTPNotFound.status_code,
)
data: JobInfo = self._job_manager.get_job_info(job_id)
return Response(
text=json.dumps(job.dict()),
content_type="application/json",
text=json.dumps(dataclasses.asdict(data)), content_type="application/json"
)
@routes.get("/api/jobs/")
@optional_utils.init_ray_and_catch_exceptions()
async def list_jobs(self, req: Request) -> Response:
driver_jobs, submission_job_drivers = await self._get_driver_jobs()
# TODO(aguo): convert _job_manager.list_jobs to an async function.
submission_jobs = await asyncio.get_event_loop().run_in_executor(
self._executor, self._job_manager.list_jobs
)
submission_jobs = [
JobDetails(
**dataclasses.asdict(job),
submission_id=submission_id,
job_id=submission_job_drivers.get(submission_id).id
if submission_id in submission_job_drivers
else None,
driver_info=submission_job_drivers.get(submission_id),
type=JobType.SUBMISSION,
)
for submission_id, job in submission_jobs.items()
]
data: dict[str, JobInfo] = self._job_manager.list_jobs()
return Response(
text=json.dumps(
[
*[submission_job.dict() for submission_job in submission_jobs],
*[job_info.dict() for job_info in driver_jobs.values()],
]
{
job_id: dataclasses.asdict(job_info)
for job_id, job_info in data.items()
}
),
content_type="application/json",
)
async def _get_driver_jobs(
self,
) -> Tuple[Dict[str, JobDetails], Dict[str, DriverInfo]]:
"""Returns a tuple of dictionaries related to drivers.
The first dictionary contains all driver jobs and is keyed by the job's id.
The second dictionary contains drivers that belong to submission jobs.
It's keyed by the submission job's submission id.
Only the last driver of a submission job is returned.
"""
request = gcs_service_pb2.GetAllJobInfoRequest()
reply = await self._gcs_job_info_stub.GetAllJobInfo(request, timeout=5)
jobs = {}
submission_job_drivers = {}
for job_table_entry in reply.job_info_list:
if job_table_entry.config.ray_namespace.startswith(
ray_constants.RAY_INTERNAL_NAMESPACE_PREFIX
):
# Skip jobs in any _ray_internal_ namespace
continue
job_id = job_table_entry.job_id.hex()
metadata = dict(job_table_entry.config.metadata)
job_submission_id = metadata.get(JOB_ID_METADATA_KEY)
if not job_submission_id:
driver = DriverInfo(
id=job_id,
node_ip_address=job_table_entry.driver_ip_address,
pid=job_table_entry.driver_pid,
)
job = JobDetails(
job_id=job_id,
type=JobType.DRIVER,
status=JobStatus.SUCCEEDED
if job_table_entry.is_dead
else JobStatus.RUNNING,
entrypoint="",
start_time=job_table_entry.start_time,
end_time=job_table_entry.end_time,
metadata=metadata,
runtime_env=RuntimeEnv.deserialize(
job_table_entry.config.runtime_env_info.serialized_runtime_env
).to_dict(),
driver_info=driver,
)
jobs[job_id] = job
else:
driver = DriverInfo(
id=job_id,
node_ip_address=job_table_entry.driver_ip_address,
pid=job_table_entry.driver_pid,
)
submission_job_drivers[job_submission_id] = driver
return jobs, submission_job_drivers
@routes.get("/api/jobs/{job_or_submission_id}/logs")
@routes.get("/api/jobs/{job_id}/logs")
@optional_utils.init_ray_and_catch_exceptions()
async def get_job_logs(self, req: Request) -> Response:
job_or_submission_id = req.match_info["job_or_submission_id"]
job = await self.find_job_by_ids(job_or_submission_id)
if not job:
job_id = req.match_info["job_id"]
if not self.job_exists(job_id):
return Response(
text=f"Job {job_or_submission_id} does not exist",
text=f"Job {job_id} does not exist",
status=aiohttp.web.HTTPNotFound.status_code,
)
if job.type is not JobType.SUBMISSION:
return Response(
text="Can only get logs of submission type jobs",
status=aiohttp.web.HTTPBadRequest.status_code,
)
resp = JobLogsResponse(logs=self._job_manager.get_job_logs(job.submission_id))
resp = JobLogsResponse(logs=self._job_manager.get_job_logs(job_id))
return Response(
text=json.dumps(dataclasses.asdict(resp)), content_type="application/json"
)
@routes.get("/api/jobs/{job_or_submission_id}/logs/tail")
@routes.get("/api/jobs/{job_id}/logs/tail")
@optional_utils.init_ray_and_catch_exceptions()
async def tail_job_logs(self, req: Request) -> Response:
job_or_submission_id = req.match_info["job_or_submission_id"]
job = await self.find_job_by_ids(job_or_submission_id)
if not job:
job_id = req.match_info["job_id"]
if not self.job_exists(job_id):
return Response(
text=f"Job {job_or_submission_id} does not exist",
text=f"Job {job_id} does not exist",
status=aiohttp.web.HTTPNotFound.status_code,
)
if job.type is not JobType.SUBMISSION:
return Response(
text="Can only get logs of submission type jobs",
status=aiohttp.web.HTTPBadRequest.status_code,
)
ws = aiohttp.web.WebSocketResponse()
await ws.prepare(req)
async for lines in self._job_manager.tail_job_logs(job.submission_id):
async for lines in self._job_manager.tail_job_logs(job_id):
await ws.send_str(lines)
async def run(self, server):
if not self._job_manager:
self._job_manager = JobManager()
self._gcs_job_info_stub = gcs_service_pb2_grpc.JobInfoGcsServiceStub(
self._dashboard_head.aiogrpc_gcs_channel
)
@staticmethod
def is_minimal_module():
return False

View file

@ -438,7 +438,7 @@ class JobManager:
self,
*,
entrypoint: str,
submission_id: Optional[str] = None,
job_id: Optional[str] = None,
runtime_env: Optional[Dict[str, Any]] = None,
metadata: Optional[Dict[str, str]] = None,
_start_signal_actor: Optional[ActorHandle] = None,
@ -471,12 +471,12 @@ class JobManager:
job_id: Generated uuid for further job management. Only valid
within the same ray cluster.
"""
if submission_id is None:
submission_id = generate_job_id()
elif self._job_info_client.get_status(submission_id) is not None:
raise RuntimeError(f"Job {submission_id} already exists.")
if job_id is None:
job_id = generate_job_id()
elif self._job_info_client.get_status(job_id) is not None:
raise RuntimeError(f"Job {job_id} already exists.")
logger.info(f"Starting job with submission_id: {submission_id}")
logger.info(f"Starting job with job_id: {job_id}")
job_info = JobInfo(
entrypoint=entrypoint,
status=JobStatus.PENDING,
@ -484,7 +484,7 @@ class JobManager:
metadata=metadata,
runtime_env=runtime_env,
)
self._job_info_client.put_info(submission_id, job_info)
self._job_info_client.put_info(job_id, job_info)
# Wait for the actor to start up asynchronously so this call always
# returns immediately and we can catch errors with the actor starting
@ -492,7 +492,7 @@ class JobManager:
try:
supervisor = self._supervisor_actor_cls.options(
lifetime="detached",
name=self.JOB_ACTOR_NAME_TEMPLATE.format(job_id=submission_id),
name=self.JOB_ACTOR_NAME_TEMPLATE.format(job_id=job_id),
num_cpus=0,
# Currently we assume JobManager is created by dashboard server
# running on headnode, same for job supervisor actors scheduled
@ -500,20 +500,20 @@ class JobManager:
self._get_current_node_resource_key(): 0.001,
},
runtime_env=self._get_supervisor_runtime_env(runtime_env),
).remote(submission_id, entrypoint, metadata or {})
).remote(job_id, entrypoint, metadata or {})
supervisor.run.remote(_start_signal_actor=_start_signal_actor)
# Monitor the job in the background so we can detect errors without
# requiring a client to poll.
create_task(self._monitor_job(submission_id, job_supervisor=supervisor))
create_task(self._monitor_job(job_id, job_supervisor=supervisor))
except Exception as e:
self._job_info_client.put_status(
submission_id,
job_id,
JobStatus.FAILED,
message=f"Failed to start job supervisor: {e}.",
)
return submission_id
return job_id
def stop_job(self, job_id) -> bool:
"""Request a job to exit, fire and forget.

View file

@ -1,81 +0,0 @@
from enum import Enum
from typing import Any, Dict, Optional
from pydantic import BaseModel, Field
from ray.dashboard.modules.job.common import JobStatus
class DriverInfo(BaseModel):
    """A class for recording information about the driver related to the job.

    Every field is declared with ``Field(...)``, i.e. without a default value.
    """

    id: str = Field(..., description="The id of the driver")
    node_ip_address: str = Field(
        ..., description="The ip address of the node the driver is running on"
    )
    # NOTE(review): the pid is typed as ``str`` here; confirm that callers pass
    # a string, or that coercion from an integer pid is the intended behavior.
    pid: str = Field(
        ..., description="The pid of the worker process the driver is using."
    )
    # TODO(aguo): Add node_id as a field.
class JobType(str, Enum):
    """An enumeration for describing the different job types.

    Members also subclass ``str``, so each member compares equal to its plain
    string value (for example ``JobType.DRIVER == "DRIVER"``).
    """

    #: A job that was initiated by the job submission apis
    SUBMISSION = "SUBMISSION"
    #: A job that was initiated by a driver script.
    DRIVER = "DRIVER"
class JobDetails(BaseModel):
    """
    Job data with extra details about its driver and its submission.

    The first group of fields describes how the job relates to the job
    submission APIs and to its driver; the second group mirrors the fields of
    ``JobInfo``.
    """

    type: JobType = Field(..., description="The type of job.")
    # ``entrypoint`` used to be declared twice in this class body: once here as
    # ``Optional[str]`` with a ``None`` default and once more in the JobInfo
    # section as a required ``str``. The later declaration overrode the earlier
    # one, so the field was always a required ``str``. It is now declared
    # once, with that effective definition, in its original position so the
    # field order is unchanged.
    entrypoint: str = Field(..., description="The entrypoint command for this job.")
    job_id: Optional[str] = Field(
        None,
        description="The job id. An id that is created for every job that is "
        "launched in ray. This can be used to fetch data about jobs using ray "
        "core apis.",
    )
    submission_id: Optional[str] = Field(
        None,
        description="A submission id is an id created for every submission job. It can "
        "be used to fetch data about jobs using the job submission apis.",
    )
    driver_info: Optional[DriverInfo] = Field(
        None,
        description="The driver related to this job. For submission jobs, "
        "it is the last driver launched by that job submission, "
        "or None if there is no driver.",
    )

    # The following fields are copied from JobInfo (as is ``entrypoint`` above).
    # TODO(aguo): Inherit from JobInfo once it's migrated to pydantic.
    status: JobStatus = Field(..., description="The status of the job.")
    message: Optional[str] = Field(
        None, description="A message describing the status in more detail."
    )
    error_type: Optional[str] = Field(
        None, description="Internal error, user script error"
    )
    start_time: Optional[int] = Field(
        None,
        description="The time when the job was started. " "A Unix timestamp in ms.",
    )
    end_time: Optional[int] = Field(
        None,
        description="The time when the job moved into a terminal state. "
        "A Unix timestamp in ms.",
    )
    metadata: Optional[Dict[str, str]] = Field(
        None, description="Arbitrary user-provided metadata for the job."
    )
    runtime_env: Optional[Dict[str, Any]] = Field(
        None, description="The runtime environment for the job."
    )

View file

@ -1,26 +1,22 @@
import dataclasses
import logging
from typing import Any, Dict, Iterator, List, Optional
from typing import Any, Dict, Iterator, Optional
try:
import aiohttp
import requests
from ray.dashboard.modules.job.pydantic_models import (
JobDetails,
)
except ImportError:
aiohttp = None
requests = None
JobDetails = None
from ray.dashboard.modules.job.common import (
JobStatus,
JobSubmitRequest,
JobSubmitResponse,
JobStopResponse,
JobInfo,
JobLogsResponse,
)
from ray.dashboard.modules.dashboard_sdk import SubmissionClient
from ray.runtime_env import RuntimeEnv
@ -87,7 +83,6 @@ class JobSubmissionClient(SubmissionClient):
job_id: Optional[str] = None,
runtime_env: Optional[Dict[str, Any]] = None,
metadata: Optional[Dict[str, str]] = None,
submission_id: Optional[str] = None,
) -> str:
"""Submit and execute a job asynchronously.
@ -110,24 +105,18 @@ class JobSubmissionClient(SubmissionClient):
Args:
entrypoint: The shell command to run for this job.
submission_id: A unique ID for this job.
job_id: A unique ID for this job.
runtime_env: The runtime environment to install and run this job in.
metadata: Arbitrary data to store along with this job.
job_id: DEPRECATED. This has been renamed to submission_id
Returns:
The submission ID of the submitted job. If not specified,
this is a randomly generated unique ID.
The job ID of the submitted job. If not specified, this is a randomly
generated unique ID.
Raises:
RuntimeError: If the request to the job server fails, or if the specified
submission_id has already been used by a job on this cluster.
job_id has already been used by a job on this cluster.
"""
if job_id:
logger.warning(
"job_id kwarg is deprecated. Please use submission_id instead."
)
runtime_env = runtime_env or {}
metadata = metadata or {}
metadata.update(self._default_metadata)
@ -138,20 +127,18 @@ class JobSubmissionClient(SubmissionClient):
# Run the RuntimeEnv constructor to parse local pip/conda requirements files.
runtime_env = RuntimeEnv(**runtime_env).to_dict()
submission_id = submission_id or job_id
req = JobSubmitRequest(
entrypoint=entrypoint,
submission_id=submission_id,
job_id=job_id,
runtime_env=runtime_env,
metadata=metadata,
)
logger.debug(f"Submitting job with submission_id={submission_id}.")
logger.debug(f"Submitting job with job_id={job_id}.")
r = self._do_request("POST", "/api/jobs/", json_data=dataclasses.asdict(req))
if r.status_code == 200:
return JobSubmitResponse(**r.json()).submission_id
return JobSubmitResponse(**r.json()).job_id
else:
self._raise_error(r)
@ -165,12 +152,12 @@ class JobSubmissionClient(SubmissionClient):
Example:
>>> from ray.job_submission import JobSubmissionClient
>>> client = JobSubmissionClient("http://127.0.0.1:8265") # doctest: +SKIP
>>> sub_id = client.submit_job(entrypoint="sleep 10") # doctest: +SKIP
>>> client.stop_job(sub_id) # doctest: +SKIP
>>> job_id = client.submit_job(entrypoint="sleep 10") # doctest: +SKIP
>>> client.stop_job(job_id) # doctest: +SKIP
True
Args:
job_id: The job ID or submission ID for the job to be stopped.
job_id: The job ID for the job to be stopped.
Returns:
True if the job was running, otherwise False.
@ -191,21 +178,20 @@ class JobSubmissionClient(SubmissionClient):
def get_job_info(
self,
job_id: str,
) -> JobDetails:
) -> JobInfo:
"""Get the latest status and other information associated with a job.
Example:
>>> from ray.job_submission import JobSubmissionClient
>>> client = JobSubmissionClient("http://127.0.0.1:8265") # doctest: +SKIP
>>> submission_id = client.submit_job(entrypoint="sleep 1") # doctest: +SKIP
>>> client.get_job_info(submission_id) # doctest: +SKIP
>>> job_id = client.submit_job(entrypoint="sleep 1") # doctest: +SKIP
>>> client.get_job_info(job_id) # doctest: +SKIP
JobInfo(status='SUCCEEDED', message='Job finished successfully.',
error_type=None, start_time=1647388711, end_time=1647388712,
metadata={}, runtime_env={})
Args:
job_id: The job ID or submission ID of the job whose information
is being requested.
job_id: The ID of the job whose information is being requested.
Returns:
The JobInfo for the job.
@ -217,12 +203,12 @@ class JobSubmissionClient(SubmissionClient):
r = self._do_request("GET", f"/api/jobs/{job_id}")
if r.status_code == 200:
return JobDetails(**r.json())
return JobInfo(**r.json())
else:
self._raise_error(r)
@PublicAPI(stability="beta")
def list_jobs(self) -> List[JobDetails]:
def list_jobs(self) -> Dict[str, JobInfo]:
"""List all jobs along with their status and other information.
Lists all jobs that have ever run on the cluster, including jobs that are
@ -234,16 +220,12 @@ class JobSubmissionClient(SubmissionClient):
>>> client.submit_job(entrypoint="echo hello") # doctest: +SKIP
>>> client.submit_job(entrypoint="sleep 2") # doctest: +SKIP
>>> client.list_jobs() # doctest: +SKIP
[JobDetails(status='SUCCEEDED',
job_id='03000000', type='submission',
submission_id='raysubmit_4LamXRuQpYdSMg7J',
{'raysubmit_4LamXRuQpYdSMg7J': JobInfo(status='SUCCEEDED',
message='Job finished successfully.', error_type=None,
start_time=1647388711, end_time=1647388712, metadata={}, runtime_env={}),
JobDetails(status='RUNNING',
job_id='04000000', type='submission',
submission_id='raysubmit_1dxCeNvG1fCMVNHG',
'raysubmit_1dxCeNvG1fCMVNHG': JobInfo(status='RUNNING',
message='Job is currently running.', error_type=None,
start_time=1647454832, end_time=None, metadata={}, runtime_env={})]
start_time=1647454832, end_time=None, metadata={}, runtime_env={})}
Returns:
A dictionary mapping job_ids to their information.
@ -255,9 +237,10 @@ class JobSubmissionClient(SubmissionClient):
if r.status_code == 200:
jobs_info_json = r.json()
jobs_info = [
JobDetails(**job_info_json) for job_info_json in jobs_info_json
]
jobs_info = {
job_id: JobInfo(**job_info_json)
for job_id, job_info_json in jobs_info_json.items()
}
return jobs_info
else:
self._raise_error(r)
@ -274,8 +257,7 @@ class JobSubmissionClient(SubmissionClient):
'SUCCEEDED'
Args:
job_id: The job ID or submission ID of the job whose status is being
requested.
job_id: The ID of the job whose status is being requested.
Returns:
The JobStatus of the job.
@ -293,13 +275,12 @@ class JobSubmissionClient(SubmissionClient):
Example:
>>> from ray.job_submission import JobSubmissionClient
>>> client = JobSubmissionClient("http://127.0.0.1:8265") # doctest: +SKIP
>>> sub_id = client.submit_job(entrypoint="echo hello") # doctest: +SKIP
>>> client.get_job_logs(sub_id) # doctest: +SKIP
>>> job_id = client.submit_job(entrypoint="echo hello") # doctest: +SKIP
>>> client.get_job_logs(job_id) # doctest: +SKIP
'hello\\n'
Args:
job_id: The job ID or submission ID of the job whose logs are being
requested.
job_id: The ID of the job whose logs are being requested.
Returns:
A string containing the full logs of the job.
@ -322,7 +303,7 @@ class JobSubmissionClient(SubmissionClient):
Example:
>>> from ray.job_submission import JobSubmissionClient
>>> client = JobSubmissionClient("http://127.0.0.1:8265") # doctest: +SKIP
>>> submission_id = client.submit_job( # doctest: +SKIP
>>> job_id = client.submit_job( # doctest: +SKIP
... entrypoint="echo hi && sleep 5 && echo hi2")
>>> async for lines in client.tail_job_logs( # doctest: +SKIP
... 'raysubmit_Xe7cvjyGJCyuCvm2'):
@ -331,8 +312,7 @@ class JobSubmissionClient(SubmissionClient):
hi2
Args:
job_id: The job ID or submission ID of the job whose logs are being
requested.
job_id: The ID of the job whose logs are being requested.
Returns:
The iterator.

View file

@ -32,8 +32,6 @@ def _compatibility_script_path(file_name: str) -> str:
class TestBackwardsCompatibility:
# TODO(aguo): Unskip this test once 2.0.0 is released.
@pytest.mark.skip("#25902 breaks backwards compatibility of the REST api.")
def test_cli(self):
"""
1) Create a new conda environment with ray version X installed

View file

@ -220,14 +220,13 @@ class TestSubmit:
with set_env_var("RAY_ADDRESS", "env_addr"):
result = runner.invoke(job_cli_group, ["submit", "--", "echo hello"])
assert result.exit_code == 0
assert mock_client_instance.called_with(submission_id=None)
assert mock_client_instance.called_with(job_id=None)
result = runner.invoke(
job_cli_group,
["submit", "--", "--submission-id=my_job_id", "echo hello"],
job_cli_group, ["submit", "--", "--job-id=my_job_id", "echo hello"]
)
assert result.exit_code == 0
assert mock_client_instance.called_with(submission_id="my_job_id")
assert mock_client_instance.called_with(job_id="my_job_id")
if __name__ == "__main__":

View file

@ -19,13 +19,12 @@ def set_env_var(key: str, val: Optional[str] = None):
elif key in os.environ:
del os.environ[key]
try:
yield
finally:
if key in os.environ:
del os.environ[key]
if old_val is not None:
os.environ[key] = old_val
yield
if key in os.environ:
del os.environ[key]
if old_val is not None:
os.environ[key] = old_val
@pytest.fixture
@ -157,7 +156,7 @@ class TestJobStop:
class TestJobList:
def test_empty(self, ray_start_stop):
stdout, _ = _run_cmd("ray job list")
assert "[]" in stdout
assert "{}" in stdout
def test_list(self, ray_start_stop):
_run_cmd("ray job submit --job-id='hello_id' -- echo hello")
@ -168,6 +167,7 @@ class TestJobList:
f"--runtime-env-json='{json.dumps(runtime_env)}' -- echo hi"
)
stdout, _ = _run_cmd("ray job list")
assert "JobInfo" in stdout
assert "123" in stdout
assert "hello_id" in stdout
assert "hi_id" in stdout

View file

@ -19,21 +19,19 @@ class TestJobSubmitRequestValidation:
with pytest.raises(TypeError, match="must be a string"):
validate_request_type({"entrypoint": 123}, JobSubmitRequest)
def test_validate_submission_id(self):
def test_validate_job_id(self):
r = validate_request_type({"entrypoint": "abc"}, JobSubmitRequest)
assert r.entrypoint == "abc"
assert r.submission_id is None
assert r.job_id is None
r = validate_request_type(
{"entrypoint": "abc", "submission_id": "123"}, JobSubmitRequest
{"entrypoint": "abc", "job_id": "123"}, JobSubmitRequest
)
assert r.entrypoint == "abc"
assert r.submission_id == "123"
assert r.job_id == "123"
with pytest.raises(TypeError, match="must be a string"):
validate_request_type(
{"entrypoint": 123, "submission_id": 1}, JobSubmitRequest
)
validate_request_type({"entrypoint": 123, "job_id": 1}, JobSubmitRequest)
def test_validate_runtime_env(self):
r = validate_request_type({"entrypoint": "abc"}, JobSubmitRequest)

View file

@ -5,7 +5,6 @@ import shutil
import sys
import tempfile
from pathlib import Path
import subprocess
from typing import Optional
from unittest.mock import patch
@ -21,15 +20,11 @@ from ray._private.test_utils import (
wait_until_server_available,
)
from ray.dashboard.modules.dashboard_sdk import ClusterInfo, parse_cluster_info
from ray.dashboard.modules.job.pydantic_models import JobDetails
from ray.dashboard.modules.job.common import JobInfo
from ray.dashboard.modules.version import CURRENT_VERSION
from ray.dashboard.tests.conftest import * # noqa
from ray.job_submission import JobStatus, JobSubmissionClient
from ray.tests.conftest import _ray_start
from ray.dashboard.modules.job.tests.test_cli_integration import set_env_var
# This test requires you have AWS credentials set up (any AWS credentials will
# do, this test only accesses a public bucket).
logger = logging.getLogger(__name__)
@ -49,28 +44,22 @@ def job_sdk_client(headers):
yield JobSubmissionClient(format_web_url(address), headers=headers)
# NOTE(architkulkarni): This test must be run first in order for the job
# submission history of the shared Ray runtime to be empty.
@pytest.mark.parametrize("use_sdk", [True, False])
def test_list_jobs_empty(headers, use_sdk: bool):
# Create a cluster using `ray start` instead of `ray.init` to avoid creating a job
subprocess.check_output(["ray", "start", "--head"])
address = "http://127.0.0.1:8265"
try:
with set_env_var("RAY_ADDRESS", address):
client = JobSubmissionClient(format_web_url(address), headers=headers)
def test_list_jobs_empty(job_sdk_client: JobSubmissionClient, use_sdk: bool):
client = job_sdk_client
if use_sdk:
assert client.list_jobs() == []
else:
r = client._do_request(
"GET",
"/api/jobs/",
)
if use_sdk:
assert client.list_jobs() == dict()
else:
r = client._do_request(
"GET",
"/api/jobs/",
)
assert r.status_code == 200
assert json.loads(r.text) == []
finally:
subprocess.check_output(["ray", "stop", "--force"])
assert r.status_code == 200
assert json.loads(r.text) == dict()
@pytest.mark.parametrize("use_sdk", [True, False])
@ -80,17 +69,13 @@ def test_list_jobs(job_sdk_client: JobSubmissionClient, use_sdk: bool):
runtime_env = {"env_vars": {"TEST": "123"}}
metadata = {"foo": "bar"}
entrypoint = "echo hello"
submission_id = client.submit_job(
job_id = client.submit_job(
entrypoint=entrypoint, runtime_env=runtime_env, metadata=metadata
)
wait_for_condition(_check_job_succeeded, client=client, job_id=submission_id)
wait_for_condition(_check_job_succeeded, client=client, job_id=job_id)
if use_sdk:
info: JobDetails = next(
job_info
for job_info in client.list_jobs()
if job_info.submission_id == submission_id
)
info: JobInfo = client.list_jobs()[job_id]
else:
r = client._do_request(
"GET",
@ -99,12 +84,8 @@ def test_list_jobs(job_sdk_client: JobSubmissionClient, use_sdk: bool):
assert r.status_code == 200
jobs_info_json = json.loads(r.text)
info_json = next(
job_info
for job_info in jobs_info_json
if job_info["submission_id"] == submission_id
)
info = JobDetails(**info_json)
info_json = jobs_info_json[job_id]
info = JobInfo(**info_json)
assert info.entrypoint == entrypoint
assert info.status == JobStatus.SUCCEEDED
@ -113,10 +94,6 @@ def test_list_jobs(job_sdk_client: JobSubmissionClient, use_sdk: bool):
assert info.runtime_env == runtime_env
assert info.metadata == metadata
# Test get job status by job / driver id
status = client.get_job_status(info.submission_id)
assert status == JobStatus.SUCCEEDED
def _check_job_succeeded(client: JobSubmissionClient, job_id: str) -> bool:
status = client.get_job_status(job_id)
@ -491,9 +468,7 @@ def test_submit_optional_args(job_sdk_client):
json_data={"entrypoint": "ls"},
)
wait_for_condition(
_check_job_succeeded, client=client, job_id=r.json()["submission_id"]
)
wait_for_condition(_check_job_succeeded, client=client, job_id=r.json()["job_id"])
def test_missing_resources(job_sdk_client):

View file

@ -164,15 +164,12 @@ def test_list_jobs_empty(job_manager: JobManager):
@pytest.mark.asyncio
async def test_list_jobs(job_manager: JobManager):
job_manager.submit_job(entrypoint="echo hi", submission_id="1")
job_manager.submit_job(entrypoint="echo hi", job_id="1")
runtime_env = {"env_vars": {"TEST": "123"}}
metadata = {"foo": "bar"}
job_manager.submit_job(
entrypoint="echo hello",
submission_id="2",
runtime_env=runtime_env,
metadata=metadata,
entrypoint="echo hello", job_id="2", runtime_env=runtime_env, metadata=metadata
)
await async_wait_for_condition(
check_job_succeeded, job_manager=job_manager, job_id="1"
@ -194,20 +191,18 @@ async def test_list_jobs(job_manager: JobManager):
@pytest.mark.asyncio
async def test_pass_job_id(job_manager):
submission_id = "my_custom_id"
job_id = "my_custom_id"
returned_id = job_manager.submit_job(
entrypoint="echo hello", submission_id=submission_id
)
assert returned_id == submission_id
returned_id = job_manager.submit_job(entrypoint="echo hello", job_id=job_id)
assert returned_id == job_id
await async_wait_for_condition(
check_job_succeeded, job_manager=job_manager, job_id=submission_id
check_job_succeeded, job_manager=job_manager, job_id=job_id
)
# Check that the same job_id is rejected.
with pytest.raises(RuntimeError):
job_manager.submit_job(entrypoint="echo hello", submission_id=submission_id)
job_manager.submit_job(entrypoint="echo hello", job_id=job_id)
@pytest.mark.asyncio

View file

@ -43,6 +43,7 @@ from ray.autoscaler._private.commands import (
)
from ray.autoscaler._private.constants import RAY_PROCESSES
from ray.autoscaler._private.fake_multi_node.node_provider import FAKE_HEAD_NODE_ID
from ray.dashboard.modules.job.cli import job_cli_group
from ray.experimental.state.api import get_log, list_logs
from ray.experimental.state.common import DEFAULT_RPC_TIMEOUT, DEFAULT_LOG_LIMIT
from ray.util.annotations import PublicAPI
@ -2554,19 +2555,12 @@ cli.add_command(install_nightly)
cli.add_command(cpp)
cli.add_command(disable_usage_stats)
cli.add_command(enable_usage_stats)
add_command_alias(job_cli_group, name="job", hidden=True)
add_command_alias(ray_logs, name="logs", hidden=False)
cli.add_command(state_cli_list)
cli.add_command(state_cli_get)
add_command_alias(summary_state_cli_group, name="summary", hidden=False)
try:
from ray.dashboard.modules.job.cli import job_cli_group
add_command_alias(job_cli_group, name="job", hidden=True)
except Exception as e:
logger.debug(f"Integrating ray jobs command line tool failed with {e}")
try:
from ray.serve.scripts import serve_cli