Mirror of https://github.com/vale981/ray (synced 2025-03-10 05:16:49 -04:00)

## Why are these changes needed?

- Fixes the Jobs tab in the new dashboard. Previously it didn't load.
- Combines the old job concept, "driver jobs", and the new job submission concept into a single concept called "jobs". The Jobs tab shows information about both kinds of jobs.
- Updates all job APIs: they now return both submission jobs and driver jobs, include additional data in the response ("id", "job_id", "submission_id", and "driver"), and accept either a job ID or a submission ID as input.
- Job ID is the same as the "Ray Core job ID" concept. It is in the form "0100000" and is the primary ID used to represent jobs.
- Submission ID is an ID generated for each Ray job submission. It is in the form "raysubmit_12345...". It is a secondary ID that can be used if a client needs to provide a self-generated ID, or if no job ID exists (e.g., if the submission job doesn't create a Ray driver).

**This PR has 2 deprecations:**

- The `submit_job` SDK now accepts a new kwarg `submission_id`; `job_id` is deprecated (see the usage sketch below).
- The `ray job submit` CLI now accepts `--submission-id`; `--job-id` is deprecated.

**This PR has 4 backwards-incompatible changes:**

- The `list_jobs` SDK now returns a list instead of a dictionary.
- The `ray job list` CLI now prints a list instead of a dictionary.
- The `/api/jobs` endpoint returns a list instead of a dictionary.
- The `POST /api/jobs` endpoint (submit job) now returns JSON with a `submission_id` field instead of `job_id`.
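
Below is a minimal usage sketch of the API changes described above. It assumes a running cluster whose dashboard is reachable at `http://127.0.0.1:8265` and a hypothetical `script.py` entrypoint; the IDs are illustrative.

```python
from ray.job_submission import JobSubmissionClient

client = JobSubmissionClient("http://127.0.0.1:8265")

# Pass a self-generated ID via the new `submission_id` kwarg
# (the old `job_id` kwarg is deprecated).
submission_id = client.submit_job(
    entrypoint="python script.py",             # hypothetical entrypoint
    submission_id="my-custom-submission-id",   # hypothetical self-generated ID
)

# `list_jobs()` now returns a flat list covering both submission jobs and
# driver jobs, instead of a dictionary keyed by job ID.
for job in client.list_jobs():
    print(job.job_id, job.submission_id, job.type, job.status)
```

The CLI equivalent of the deprecation is `ray job submit --submission-id my-custom-submission-id -- python script.py` in place of the old `--job-id` flag.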
395 lines
14 KiB
Python
import asyncio
import concurrent
import dataclasses
import json
import logging
import traceback
from dataclasses import dataclass
from typing import Any, Dict, Optional, Tuple

import aiohttp.web
from aiohttp.web import Request, Response

import ray
from ray._private import ray_constants
import ray.dashboard.optional_utils as optional_utils
import ray.dashboard.utils as dashboard_utils
from ray._private.runtime_env.packaging import (
    package_exists,
    pin_runtime_env_uri,
    upload_package_to_gcs,
)
from ray.core.generated import gcs_service_pb2, gcs_service_pb2_grpc
from ray.dashboard.modules.job.common import (
    http_uri_components_to_uri,
    JobStatus,
    JobSubmitRequest,
    JobSubmitResponse,
    JobStopResponse,
    JobLogsResponse,
    validate_request_type,
    JOB_ID_METADATA_KEY,
)
from ray.dashboard.modules.job.pydantic_models import (
    DriverInfo,
    JobDetails,
    JobType,
)
from ray.dashboard.modules.version import (
    CURRENT_VERSION,
    VersionResponse,
)
from ray.dashboard.modules.job.job_manager import JobManager
from ray.runtime_env import RuntimeEnv

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

routes = optional_utils.ClassMethodRouteTable


class JobHead(dashboard_utils.DashboardHeadModule):
    def __init__(self, dashboard_head):
        super().__init__(dashboard_head)
        self._dashboard_head = dashboard_head
        self._job_manager = None
        self._gcs_job_info_stub = None
        self._executor = concurrent.futures.ThreadPoolExecutor(max_workers=10)

    async def _parse_and_validate_request(
        self, req: Request, request_type: dataclass
    ) -> Any:
        """Parse the request and cast it to the given request type.

        If parsing fails, return a Response object with status 400 and the
        stacktrace instead.
        """
        try:
            return validate_request_type(await req.json(), request_type)
        except Exception as e:
            logger.info(f"Got invalid request type: {e}")
            return Response(
                text=traceback.format_exc(),
                status=aiohttp.web.HTTPBadRequest.status_code,
            )

    async def find_job_by_ids(self, job_or_submission_id: str) -> Optional[JobDetails]:
        """Attempt to find the job with the given job ID or submission ID."""
        # First, try to find a driver job with the given job ID.
        driver_jobs, submission_job_drivers = await self._get_driver_jobs()
        job = driver_jobs.get(job_or_submission_id)
        if job:
            return job

        # Next, try to find a submission job whose driver has the given job ID.
        submission_id = next(
            (
                id
                for id, driver in submission_job_drivers.items()
                if driver.id == job_or_submission_id
            ),
            None,
        )

        if not submission_id:
            # If we didn't find a driver with the given ID, fall back to
            # treating the input as a submission ID.
            submission_id = job_or_submission_id

        job_info = await asyncio.get_event_loop().run_in_executor(
            self._executor, lambda: self._job_manager.get_job_info(submission_id)
        )
        if job_info:
            driver = submission_job_drivers.get(submission_id)
            job = JobDetails(
                **dataclasses.asdict(job_info),
                submission_id=submission_id,
                job_id=driver.id if driver else None,
                driver_info=driver,
                type=JobType.SUBMISSION,
            )
            return job

        return None
@routes.get("/api/version")
|
|
async def get_version(self, req: Request) -> Response:
|
|
# NOTE(edoakes): CURRENT_VERSION should be bumped and checked on the
|
|
# client when we have backwards-incompatible changes.
|
|
resp = VersionResponse(
|
|
version=CURRENT_VERSION,
|
|
ray_version=ray.__version__,
|
|
ray_commit=ray.__commit__,
|
|
)
|
|
return Response(
|
|
text=json.dumps(dataclasses.asdict(resp)),
|
|
content_type="application/json",
|
|
status=aiohttp.web.HTTPOk.status_code,
|
|
)
|
|
|
|
@routes.get("/api/packages/{protocol}/{package_name}")
|
|
@optional_utils.init_ray_and_catch_exceptions(connect_to_serve=False)
|
|
async def get_package(self, req: Request) -> Response:
|
|
package_uri = http_uri_components_to_uri(
|
|
protocol=req.match_info["protocol"],
|
|
package_name=req.match_info["package_name"],
|
|
)
|
|
|
|
logger.debug(f"Adding temporary reference to package {package_uri}.")
|
|
try:
|
|
pin_runtime_env_uri(package_uri)
|
|
except Exception:
|
|
return Response(
|
|
text=traceback.format_exc(),
|
|
status=aiohttp.web.HTTPInternalServerError.status_code,
|
|
)
|
|
|
|
if not package_exists(package_uri):
|
|
return Response(
|
|
text=f"Package {package_uri} does not exist",
|
|
status=aiohttp.web.HTTPNotFound.status_code,
|
|
)
|
|
|
|
return Response()
|
|
|
|

    @routes.put("/api/packages/{protocol}/{package_name}")
    @optional_utils.init_ray_and_catch_exceptions(connect_to_serve=False)
    async def upload_package(self, req: Request):
        package_uri = http_uri_components_to_uri(
            protocol=req.match_info["protocol"],
            package_name=req.match_info["package_name"],
        )
        logger.info(f"Uploading package {package_uri} to the GCS.")
        try:
            upload_package_to_gcs(package_uri, await req.read())
        except Exception:
            return Response(
                text=traceback.format_exc(),
                status=aiohttp.web.HTTPInternalServerError.status_code,
            )

        return Response(status=aiohttp.web.HTTPOk.status_code)
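
    # NOTE: The submit endpoint accepts an optional client-provided
    # `submission_id` (the older `job_id` argument is deprecated at the
    # SDK/CLI level) and returns both `submission_id` and, for backwards
    # compatibility, a `job_id` set to the same value.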
@routes.post("/api/jobs/")
|
|
@optional_utils.init_ray_and_catch_exceptions(connect_to_serve=False)
|
|
async def submit_job(self, req: Request) -> Response:
|
|
result = await self._parse_and_validate_request(req, JobSubmitRequest)
|
|
# Request parsing failed, returned with Response object.
|
|
if isinstance(result, Response):
|
|
return result
|
|
else:
|
|
submit_request = result
|
|
|
|
try:
|
|
submission_id = self._job_manager.submit_job(
|
|
entrypoint=submit_request.entrypoint,
|
|
submission_id=submit_request.submission_id,
|
|
runtime_env=submit_request.runtime_env,
|
|
metadata=submit_request.metadata,
|
|
)
|
|
|
|
resp = JobSubmitResponse(job_id=submission_id, submission_id=submission_id)
|
|
except (TypeError, ValueError):
|
|
return Response(
|
|
text=traceback.format_exc(),
|
|
status=aiohttp.web.HTTPBadRequest.status_code,
|
|
)
|
|
except Exception:
|
|
return Response(
|
|
text=traceback.format_exc(),
|
|
status=aiohttp.web.HTTPInternalServerError.status_code,
|
|
)
|
|
|
|
return Response(
|
|
text=json.dumps(dataclasses.asdict(resp)),
|
|
content_type="application/json",
|
|
status=aiohttp.web.HTTPOk.status_code,
|
|
)
|
|
|
|
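
    # The endpoints below accept either a Ray Core job ID or a submission ID
    # in the path; operations like stopping a job and retrieving logs only
    # apply to submission-type jobs.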
@routes.post("/api/jobs/{job_or_submission_id}/stop")
|
|
@optional_utils.init_ray_and_catch_exceptions(connect_to_serve=False)
|
|
async def stop_job(self, req: Request) -> Response:
|
|
job_or_submission_id = req.match_info["job_or_submission_id"]
|
|
job = await self.find_job_by_ids(job_or_submission_id)
|
|
if not job:
|
|
return Response(
|
|
text=f"Job {job_or_submission_id} does not exist",
|
|
status=aiohttp.web.HTTPNotFound.status_code,
|
|
)
|
|
if job.type is not JobType.SUBMISSION:
|
|
return Response(
|
|
text="Can only stop submission type jobs",
|
|
status=aiohttp.web.HTTPBadRequest.status_code,
|
|
)
|
|
|
|
try:
|
|
stopped = self._job_manager.stop_job(job.submission_id)
|
|
resp = JobStopResponse(stopped=stopped)
|
|
except Exception:
|
|
return Response(
|
|
text=traceback.format_exc(),
|
|
status=aiohttp.web.HTTPInternalServerError.status_code,
|
|
)
|
|
|
|
return Response(
|
|
text=json.dumps(dataclasses.asdict(resp)), content_type="application/json"
|
|
)
|
|
|
|
@routes.get("/api/jobs/{job_or_submission_id}")
|
|
@optional_utils.init_ray_and_catch_exceptions(connect_to_serve=False)
|
|
async def get_job_info(self, req: Request) -> Response:
|
|
job_or_submission_id = req.match_info["job_or_submission_id"]
|
|
job = await self.find_job_by_ids(job_or_submission_id)
|
|
if not job:
|
|
return Response(
|
|
text=f"Job {job_or_submission_id} does not exist",
|
|
status=aiohttp.web.HTTPNotFound.status_code,
|
|
)
|
|
|
|
return Response(
|
|
text=json.dumps(job.dict()),
|
|
content_type="application/json",
|
|
)
|
|
|
|
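
    # Lists all jobs as a single flat JSON list that combines submission jobs
    # and driver jobs (previously this endpoint returned a dictionary).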
@routes.get("/api/jobs/")
|
|
@optional_utils.init_ray_and_catch_exceptions(connect_to_serve=False)
|
|
async def list_jobs(self, req: Request) -> Response:
|
|
driver_jobs, submission_job_drivers = await self._get_driver_jobs()
|
|
|
|
# TODO(aguo): convert _job_manager.list_jobs to an async function.
|
|
submission_jobs = await asyncio.get_event_loop().run_in_executor(
|
|
self._executor, self._job_manager.list_jobs
|
|
)
|
|
submission_jobs = [
|
|
JobDetails(
|
|
**dataclasses.asdict(job),
|
|
submission_id=submission_id,
|
|
job_id=submission_job_drivers.get(submission_id).id
|
|
if submission_id in submission_job_drivers
|
|
else None,
|
|
driver_info=submission_job_drivers.get(submission_id),
|
|
type=JobType.SUBMISSION,
|
|
)
|
|
for submission_id, job in submission_jobs.items()
|
|
]
|
|
return Response(
|
|
text=json.dumps(
|
|
[
|
|
*[submission_job.dict() for submission_job in submission_jobs],
|
|
*[job_info.dict() for job_info in driver_jobs.values()],
|
|
]
|
|
),
|
|
content_type="application/json",
|
|
)
|
|
|
|

    async def _get_driver_jobs(
        self,
    ) -> Tuple[Dict[str, JobDetails], Dict[str, DriverInfo]]:
        """Returns a tuple of dictionaries related to drivers.

        The first dictionary contains all driver jobs and is keyed by the job's id.
        The second dictionary contains drivers that belong to submission jobs.
        It's keyed by the submission job's submission id.
        Only the last driver of a submission job is returned.
        """
        request = gcs_service_pb2.GetAllJobInfoRequest()
        reply = await self._gcs_job_info_stub.GetAllJobInfo(request, timeout=5)

        jobs = {}
        submission_job_drivers = {}
        for job_table_entry in reply.job_info_list:
            if job_table_entry.config.ray_namespace.startswith(
                ray_constants.RAY_INTERNAL_NAMESPACE_PREFIX
            ):
                # Skip jobs in any _ray_internal_ namespace
                continue
            job_id = job_table_entry.job_id.hex()
            metadata = dict(job_table_entry.config.metadata)
            job_submission_id = metadata.get(JOB_ID_METADATA_KEY)
            if not job_submission_id:
                driver = DriverInfo(
                    id=job_id,
                    node_ip_address=job_table_entry.driver_ip_address,
                    pid=job_table_entry.driver_pid,
                )
                job = JobDetails(
                    job_id=job_id,
                    type=JobType.DRIVER,
                    status=JobStatus.SUCCEEDED
                    if job_table_entry.is_dead
                    else JobStatus.RUNNING,
                    entrypoint="",
                    start_time=job_table_entry.start_time,
                    end_time=job_table_entry.end_time,
                    metadata=metadata,
                    runtime_env=RuntimeEnv.deserialize(
                        job_table_entry.config.runtime_env_info.serialized_runtime_env
                    ).to_dict(),
                    driver_info=driver,
                )
                jobs[job_id] = job
            else:
                driver = DriverInfo(
                    id=job_id,
                    node_ip_address=job_table_entry.driver_ip_address,
                    pid=job_table_entry.driver_pid,
                )
                submission_job_drivers[job_submission_id] = driver

        return jobs, submission_job_drivers
@routes.get("/api/jobs/{job_or_submission_id}/logs")
|
|
@optional_utils.init_ray_and_catch_exceptions(connect_to_serve=False)
|
|
async def get_job_logs(self, req: Request) -> Response:
|
|
job_or_submission_id = req.match_info["job_or_submission_id"]
|
|
job = await self.find_job_by_ids(job_or_submission_id)
|
|
if not job:
|
|
return Response(
|
|
text=f"Job {job_or_submission_id} does not exist",
|
|
status=aiohttp.web.HTTPNotFound.status_code,
|
|
)
|
|
|
|
if job.type is not JobType.SUBMISSION:
|
|
return Response(
|
|
text="Can only get logs of submission type jobs",
|
|
status=aiohttp.web.HTTPBadRequest.status_code,
|
|
)
|
|
|
|
resp = JobLogsResponse(logs=self._job_manager.get_job_logs(job.submission_id))
|
|
return Response(
|
|
text=json.dumps(dataclasses.asdict(resp)), content_type="application/json"
|
|
)
|
|
|
|
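
    # Streams a submission job's logs to the client over a WebSocket.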

    @routes.get("/api/jobs/{job_or_submission_id}/logs/tail")
    @optional_utils.init_ray_and_catch_exceptions(connect_to_serve=False)
    async def tail_job_logs(self, req: Request) -> Response:
        job_or_submission_id = req.match_info["job_or_submission_id"]
        job = await self.find_job_by_ids(job_or_submission_id)
        if not job:
            return Response(
                text=f"Job {job_or_submission_id} does not exist",
                status=aiohttp.web.HTTPNotFound.status_code,
            )

        if job.type is not JobType.SUBMISSION:
            return Response(
                text="Can only get logs of submission type jobs",
                status=aiohttp.web.HTTPBadRequest.status_code,
            )

        ws = aiohttp.web.WebSocketResponse()
        await ws.prepare(req)

        async for lines in self._job_manager.tail_job_logs(job.submission_id):
            await ws.send_str(lines)
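
    # Module entrypoint: lazily creates the JobManager and the GCS job-info
    # stub used to look up driver jobs.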

    async def run(self, server):
        if not self._job_manager:
            self._job_manager = JobManager()

        self._gcs_job_info_stub = gcs_service_pb2_grpc.JobInfoGcsServiceStub(
            self._dashboard_head.aiogrpc_gcs_channel
        )

    @staticmethod
    def is_minimal_module():
        return False