[Job Submission] Add stop API to http & sdk, with better status code + stacktrace (#20094)

This commit is contained in:
Jiao 2021-11-06 10:37:54 -07:00 committed by GitHub
parent 7a18d90a25
commit 9ef75b27ac
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 195 additions and 21 deletions

View file

@ -40,6 +40,14 @@ class JobSubmitResponse:
job_id: str job_id: str
# ==== Job Stop ====
@dataclass
class JobStopResponse:
stopped: bool
# ==== Job Status ==== # ==== Job Status ====

View file

@ -1,9 +1,11 @@
import aiohttp.web import aiohttp.web
import dataclasses
from functools import wraps from functools import wraps
import logging import logging
from typing import Callable from typing import Any, Callable
import json import json
import dataclasses import traceback
from dataclasses import dataclass
import ray import ray
import ray.dashboard.utils as dashboard_utils import ray.dashboard.utils as dashboard_utils
@ -12,7 +14,7 @@ from ray._private.runtime_env.packaging import (package_exists,
upload_package_to_gcs) upload_package_to_gcs)
from ray.dashboard.modules.job.data_types import ( from ray.dashboard.modules.job.data_types import (
GetPackageResponse, JobStatus, JobSubmitRequest, JobSubmitResponse, GetPackageResponse, JobStatus, JobSubmitRequest, JobSubmitResponse,
JobStatusResponse, JobLogsResponse) JobStopResponse, JobStatusResponse, JobLogsResponse)
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
routes = dashboard_utils.ClassMethodRouteTable routes = dashboard_utils.ClassMethodRouteTable
@ -22,6 +24,7 @@ RAY_INTERNAL_JOBS_NAMESPACE = "_ray_internal_jobs_"
JOBS_API_PREFIX = "/api/jobs/" JOBS_API_PREFIX = "/api/jobs/"
JOBS_API_ROUTE_LOGS = JOBS_API_PREFIX + "logs" JOBS_API_ROUTE_LOGS = JOBS_API_PREFIX + "logs"
JOBS_API_ROUTE_SUBMIT = JOBS_API_PREFIX + "submit" JOBS_API_ROUTE_SUBMIT = JOBS_API_PREFIX + "submit"
JOBS_API_ROUTE_STOP = JOBS_API_PREFIX + "stop"
JOBS_API_ROUTE_STATUS = JOBS_API_PREFIX + "status" JOBS_API_ROUTE_STATUS = JOBS_API_PREFIX + "status"
JOBS_API_ROUTE_PACKAGE = JOBS_API_PREFIX + "package" JOBS_API_ROUTE_PACKAGE = JOBS_API_PREFIX + "package"
@ -42,12 +45,34 @@ class JobHead(dashboard_utils.DashboardHeadModule):
self._job_manager = None self._job_manager = None
async def _parse_and_validate_request(self, req: aiohttp.web.Request,
request_type: dataclass) -> Any:
"""Parse request and cast to request type. If parsing failed, return a
Response object with status 400 and stacktrace instead.
"""
try:
# TODO: (jiaodong) Validate if job request is valid without using
# pydantic.
result = request_type(**(await req.json()))
except Exception:
return aiohttp.web.Response(
reason=traceback.format_exc().encode("utf-8"),
status=aiohttp.web.HTTPBadRequest.status_code)
return result
@routes.get(JOBS_API_ROUTE_PACKAGE) @routes.get(JOBS_API_ROUTE_PACKAGE)
@_ensure_ray_initialized @_ensure_ray_initialized
async def get_package(self, async def get_package(self,
req: aiohttp.web.Request) -> aiohttp.web.Response: req: aiohttp.web.Request) -> aiohttp.web.Response:
package_uri = req.query["package_uri"] package_uri = req.query["package_uri"]
resp = GetPackageResponse(package_exists=package_exists(package_uri)) try:
exists = package_exists(package_uri)
except Exception:
return aiohttp.web.Response(
reason=traceback.format_exc().encode("utf-8"),
status=aiohttp.web.HTTPInternalServerError.status_code)
resp = GetPackageResponse(package_exists=exists)
return aiohttp.web.Response( return aiohttp.web.Response(
text=json.dumps(dataclasses.asdict(resp)), text=json.dumps(dataclasses.asdict(resp)),
content_type="application/json") content_type="application/json")
@ -57,22 +82,55 @@ class JobHead(dashboard_utils.DashboardHeadModule):
async def upload_package(self, req: aiohttp.web.Request): async def upload_package(self, req: aiohttp.web.Request):
package_uri = req.query["package_uri"] package_uri = req.query["package_uri"]
logger.info(f"Uploading package {package_uri} to the GCS.") logger.info(f"Uploading package {package_uri} to the GCS.")
upload_package_to_gcs(package_uri, await req.read()) try:
upload_package_to_gcs(package_uri, await req.read())
except Exception:
return aiohttp.web.Response(
reason=traceback.format_exc().encode("utf-8"),
status=aiohttp.web.HTTPInternalServerError.status_code)
return aiohttp.web.Response() return aiohttp.web.Response(status=aiohttp.web.HTTPOk.status_code, )
@routes.post(JOBS_API_ROUTE_SUBMIT) @routes.post(JOBS_API_ROUTE_SUBMIT)
@_ensure_ray_initialized @_ensure_ray_initialized
async def submit(self, req: aiohttp.web.Request) -> aiohttp.web.Response: async def submit(self, req: aiohttp.web.Request) -> aiohttp.web.Response:
# TODO: (jiaodong) Validate if job request is valid without using result = await self._parse_and_validate_request(req, JobSubmitRequest)
# pydantic. # Request parsing failed, returned with Response object.
submit_request = JobSubmitRequest(**(await req.json())) if isinstance(result, aiohttp.web.Response):
job_id = self._job_manager.submit_job( return result
entrypoint=submit_request.entrypoint, else:
runtime_env=submit_request.runtime_env, submit_request = result
metadata=submit_request.metadata)
try:
job_id = self._job_manager.submit_job(
entrypoint=submit_request.entrypoint,
runtime_env=submit_request.runtime_env,
metadata=submit_request.metadata)
resp = JobSubmitResponse(job_id=job_id)
except Exception:
return aiohttp.web.Response(
reason=traceback.format_exc().encode("utf-8"),
status=aiohttp.web.HTTPInternalServerError.status_code)
return aiohttp.web.Response(
text=json.dumps(dataclasses.asdict(resp)),
content_type="application/json",
status=aiohttp.web.HTTPOk.status_code,
)
@routes.post(JOBS_API_ROUTE_STOP)
@_ensure_ray_initialized
async def stop(self, req: aiohttp.web.Request) -> aiohttp.web.Response:
job_id = req.query["job_id"]
try:
stopped = self._job_manager.stop_job(job_id)
resp = JobStopResponse(stopped=stopped)
except Exception:
return aiohttp.web.Response(
reason=traceback.format_exc().encode("utf-8"),
status=aiohttp.web.HTTPInternalServerError.status_code)
resp = JobSubmitResponse(job_id=job_id)
return aiohttp.web.Response( return aiohttp.web.Response(
text=json.dumps(dataclasses.asdict(resp)), text=json.dumps(dataclasses.asdict(resp)),
content_type="application/json") content_type="application/json")
@ -82,7 +140,6 @@ class JobHead(dashboard_utils.DashboardHeadModule):
async def status(self, req: aiohttp.web.Request) -> aiohttp.web.Response: async def status(self, req: aiohttp.web.Request) -> aiohttp.web.Response:
job_id = req.query["job_id"] job_id = req.query["job_id"]
status: JobStatus = self._job_manager.get_job_status(job_id) status: JobStatus = self._job_manager.get_job_status(job_id)
resp = JobStatusResponse(job_status=status) resp = JobStatusResponse(job_status=status)
return aiohttp.web.Response( return aiohttp.web.Response(
text=json.dumps(dataclasses.asdict(resp)), text=json.dumps(dataclasses.asdict(resp)),
@ -92,6 +149,7 @@ class JobHead(dashboard_utils.DashboardHeadModule):
@_ensure_ray_initialized @_ensure_ray_initialized
async def logs(self, req: aiohttp.web.Request) -> aiohttp.web.Response: async def logs(self, req: aiohttp.web.Request) -> aiohttp.web.Response:
job_id = req.query["job_id"] job_id = req.query["job_id"]
stdout: bytes = self._job_manager.get_job_stdout(job_id) stdout: bytes = self._job_manager.get_job_stdout(job_id)
stderr: bytes = self._job_manager.get_job_stderr(job_id) stderr: bytes = self._job_manager.get_job_stderr(job_id)

View file

@ -10,12 +10,12 @@ from ray._private.runtime_env.packaging import (
create_package, get_uri_for_directory, parse_uri) create_package, get_uri_for_directory, parse_uri)
from ray._private.job_manager import JobStatus from ray._private.job_manager import JobStatus
from ray.dashboard.modules.job.data_types import ( from ray.dashboard.modules.job.data_types import (
GetPackageResponse, JobSubmitRequest, JobSubmitResponse, JobStatusResponse, GetPackageResponse, JobSubmitRequest, JobSubmitResponse, JobStopResponse,
JobLogsResponse) JobStatusResponse, JobLogsResponse)
from ray.dashboard.modules.job.job_head import ( from ray.dashboard.modules.job.job_head import (
JOBS_API_ROUTE_LOGS, JOBS_API_ROUTE_SUBMIT, JOBS_API_ROUTE_STATUS, JOBS_API_ROUTE_LOGS, JOBS_API_ROUTE_SUBMIT, JOBS_API_ROUTE_STOP,
JOBS_API_ROUTE_PACKAGE) JOBS_API_ROUTE_STATUS, JOBS_API_ROUTE_PACKAGE)
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO) logger.setLevel(logging.INFO)
@ -46,7 +46,6 @@ class JobSubmissionClient:
f"json: {json_data}, params: {params}.") f"json: {json_data}, params: {params}.")
r = requests.request( r = requests.request(
method, url, data=data, json=json_data, params=params) method, url, data=data, json=json_data, params=params)
r.raise_for_status() r.raise_for_status()
if response_type is None: if response_type is None:
return None return None
@ -130,6 +129,14 @@ class JobSubmissionClient:
response_type=JobSubmitResponse) response_type=JobSubmitResponse)
return resp.job_id return resp.job_id
def stop_job(self, job_id: str) -> bool:
resp = self._do_request(
"POST",
JOBS_API_ROUTE_STOP,
params={"job_id": job_id},
response_type=JobStopResponse)
return resp.stopped
def get_job_status(self, job_id: str) -> JobStatus: def get_job_status(self, job_id: str) -> JobStatus:
resp = self._do_request( resp = self._do_request(
"GET", "GET",

View file

@ -2,6 +2,7 @@ import logging
from pathlib import Path from pathlib import Path
import sys import sys
import tempfile import tempfile
import requests
import pytest import pytest
@ -10,9 +11,9 @@ from ray._private.test_utils import (format_web_url, wait_for_condition,
wait_until_server_available) wait_until_server_available)
from ray._private.job_manager import JobStatus from ray._private.job_manager import JobStatus
from ray.dashboard.modules.job.sdk import JobSubmissionClient from ray.dashboard.modules.job.sdk import JobSubmissionClient
from ray.dashboard.modules.job.job_head import JOBS_API_ROUTE_SUBMIT
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
@pytest.fixture @pytest.fixture
@ -31,6 +32,16 @@ def _check_job_succeeded(client: JobSubmissionClient, job_id: str) -> bool:
return status == JobStatus.SUCCEEDED return status == JobStatus.SUCCEEDED
def _check_job_failed(client: JobSubmissionClient, job_id: str) -> bool:
status = client.get_job_status(job_id)
return status == JobStatus.FAILED
def _check_job_stopped(client: JobSubmissionClient, job_id: str) -> bool:
status = client.get_job_status(job_id)
return status == JobStatus.STOPPED
@pytest.fixture( @pytest.fixture(
scope="function", scope="function",
params=["no_working_dir", "local_working_dir", "s3_working_dir"]) params=["no_working_dir", "local_working_dir", "s3_working_dir"])
@ -98,6 +109,96 @@ def test_submit_job(job_sdk_client, working_dir_option):
assert stderr == working_dir_option["expected_stderr"] assert stderr == working_dir_option["expected_stderr"]
def test_http_bad_request(job_sdk_client):
"""
Send bad requests to job http server and ensure right return code and
error message is returned via http.
"""
client = job_sdk_client
# 400 - HTTPBadRequest
with pytest.raises(requests.exceptions.HTTPError) as e:
_ = client._do_request(
"POST",
JOBS_API_ROUTE_SUBMIT,
json_data={"key": "baaaad request"},
)
ex_message = str(e.value)
assert "400 Client Error" in ex_message
assert "TypeError: __init__() got an unexpected keyword argument" in ex_message # noqa: E501
# 405 - HTTPMethodNotAllowed
with pytest.raises(requests.exceptions.HTTPError) as e:
_ = client._do_request(
"GET",
JOBS_API_ROUTE_SUBMIT,
json_data={"key": "baaaad request"},
)
ex_message = str(e.value)
assert "405 Client Error: Method Not Allowed" in ex_message
# 500 - HTTPInternalServerError
with pytest.raises(requests.exceptions.HTTPError) as e:
_ = client.submit_job(
entrypoint="echo hello",
runtime_env={"working_dir": "s3://does_not_exist"})
ex_message = str(e.value)
assert "500 Server Error" in ex_message
assert "Only .zip files supported for S3 URIs" in ex_message
def test_submit_job_with_exception_in_driver(job_sdk_client):
"""
Submit a job that's expected to throw exception while executing.
"""
client = job_sdk_client
with tempfile.TemporaryDirectory() as tmp_dir:
path = Path(tmp_dir)
driver_script = """
print('Hello !')
raise RuntimeError('Intentionally failed.')
"""
test_script_file = path / "test_script.py"
with open(test_script_file, "w+") as file:
file.write(driver_script)
job_id = client.submit_job(
entrypoint="python test_script.py",
runtime_env={"working_dir": tmp_dir})
wait_for_condition(_check_job_failed, client=client, job_id=job_id)
stdout, stderr = client.get_job_logs(job_id)
assert stdout == "Hello !"
assert "RuntimeError: Intentionally failed." in stderr
def test_stop_long_running_job(job_sdk_client):
"""
Submit a job that runs for a while and stop it in the middle.
"""
client = job_sdk_client
with tempfile.TemporaryDirectory() as tmp_dir:
path = Path(tmp_dir)
driver_script = """
print('Hello !')
import time
time.sleep(300) # This should never finish
raise RuntimeError('Intentionally failed.')
"""
test_script_file = path / "test_script.py"
with open(test_script_file, "w+") as file:
file.write(driver_script)
job_id = client.submit_job(
entrypoint="python test_script.py",
runtime_env={"working_dir": tmp_dir})
assert client.stop_job(job_id) is True
wait_for_condition(_check_job_stopped, client=client, job_id=job_id)
def test_job_metadata(job_sdk_client): def test_job_metadata(job_sdk_client):
client = job_sdk_client client = job_sdk_client