[job submission] Allow passing job_id, return DOES_NOT_EXIST when applicable (#20164)

This commit is contained in:
Edward Oakes 2021-11-08 23:10:27 -08:00 committed by GitHub
parent d46caa9856
commit 50f2cf8a74
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 87 additions and 23 deletions

View file

@ -11,7 +11,7 @@ py_library(
)
py_test_run_all_subdirectory(
size = "small",
size = "medium",
include = ["**/test*.py"],
exclude = ["modules/test/**", "modules/node/tests/test_node.py", "tests/test_dashboard.py"],
extra_srcs = [],

View file

@ -1,5 +1,5 @@
from enum import Enum
from typing import Any, Dict
from typing import Any, Dict, Optional
from dataclasses import dataclass
@ -7,6 +7,7 @@ class JobStatus(str, Enum):
def __str__(self):
return f"{self.value}"
DOES_NOT_EXIST = "DOES_NOT_EXIST"
PENDING = "PENDING"
RUNNING = "RUNNING"
STOPPED = "STOPPED"
@ -31,6 +32,10 @@ class JobSubmitRequest:
runtime_env: Dict[str, Any]
# Command to start execution, ex: "python script.py"
entrypoint: str
# Optional job_id to specify for the job. If the job_id is not specified,
# one will be generated. If a job with the same job_id already exists, it
# will be rejected.
job_id: Optional[str]
# Metadata to pass in to the JobConfig.
metadata: Dict[str, str]

View file

@ -104,6 +104,7 @@ class JobHead(dashboard_utils.DashboardHeadModule):
try:
job_id = self._job_manager.submit_job(
entrypoint=submit_request.entrypoint,
job_id=submit_request.job_id,
runtime_env=submit_request.runtime_env,
metadata=submit_request.metadata)

View file

@ -113,7 +113,9 @@ class JobSubmissionClient:
runtime_env["working_dir"] = package_uri
def submit_job(self,
*,
entrypoint: str,
job_id: Optional[str] = None,
runtime_env: Optional[Dict[str, Any]] = None,
metadata: Optional[Dict[str, str]] = None) -> str:
runtime_env = runtime_env or {}
@ -121,7 +123,10 @@ class JobSubmissionClient:
self._upload_working_dir_if_needed(runtime_env)
req = JobSubmitRequest(
entrypoint=entrypoint, runtime_env=runtime_env, metadata=metadata)
entrypoint=entrypoint,
job_id=job_id,
runtime_env=runtime_env,
metadata=metadata)
resp = self._do_request(
"POST",
JOBS_API_ROUTE_SUBMIT,

View file

@ -42,6 +42,12 @@ def _check_job_stopped(client: JobSubmissionClient, job_id: str) -> bool:
return status == JobStatus.STOPPED
def _check_job_does_not_exist(client: JobSubmissionClient,
                              job_id: str) -> bool:
    """Return True iff the server reports DOES_NOT_EXIST for this job_id."""
    return client.get_job_status(job_id) == JobStatus.DOES_NOT_EXIST
@pytest.fixture(
scope="function",
params=["no_working_dir", "local_working_dir", "s3_working_dir"])
@ -227,5 +233,25 @@ def test_job_metadata(job_sdk_client):
})
def test_pass_job_id(job_sdk_client):
    """A caller-supplied job_id is echoed back and duplicates are rejected."""
    client = job_sdk_client
    custom_id = "my_custom_id"

    submitted_id = client.submit_job(entrypoint="echo hello", job_id=custom_id)
    assert submitted_id == custom_id
    wait_for_condition(_check_job_succeeded, client=client, job_id=submitted_id)

    # Submitting a second job under the same ID must be rejected.
    with pytest.raises(Exception, match=f"{custom_id} already exists"):
        client.submit_job(entrypoint="echo hello", job_id=custom_id)
def test_nonexistent_job(job_sdk_client):
    """Querying a job_id that was never submitted reports DOES_NOT_EXIST."""
    client = job_sdk_client
    # The helper returns a bool; without asserting it, this test could
    # never fail even if the server returned the wrong status.
    assert _check_job_does_not_exist(client, "nonexistent_job")
# Run this file's tests directly, propagating pytest's exit code.
if __name__ == "__main__":
    sys.exit(pytest.main(["-v", __file__]))

View file

@ -93,8 +93,10 @@ class JobStatusStorageClient:
def get_status(self, job_id: str) -> JobStatus:
    """Fetch the stored status for job_id from the internal KV store.

    Returns:
        JobStatus.DOES_NOT_EXIST when no status has ever been recorded
        for this job_id (instead of raising), so callers can safely
        probe unknown IDs; otherwise the unpickled stored status.
    """
    # The rendered span interleaved the pre-change `assert`/`return` with
    # the new None-branch, leaving DOES_NOT_EXIST unreachable; this is the
    # intended post-change body.
    pickled_status = _internal_kv_get(
        self.JOB_STATUS_KEY.format(job_id=job_id))
    if pickled_status is None:
        return JobStatus.DOES_NOT_EXIST
    return pickle.loads(pickled_status)
class JobSupervisor:
@ -307,7 +309,9 @@ class JobManager:
"Cannot found the node dictionary for current node.")
def submit_job(self,
*,
entrypoint: str,
job_id: Optional[str] = None,
runtime_env: Optional[Dict[str, Any]] = None,
metadata: Optional[Dict[str, str]] = None,
_start_signal_actor: Optional[ActorHandle] = None) -> str:
@ -341,10 +345,15 @@ class JobManager:
job_id: Generated uuid for further job management. Only valid
within the same ray cluster.
"""
job_id = str(uuid4())
self._status_client.put_status(job_id, JobStatus.PENDING)
supervisor = None
if job_id is None:
job_id = str(uuid4())
elif self._status_client.get_status(
job_id) != JobStatus.DOES_NOT_EXIST:
raise RuntimeError(f"Job {job_id} already exists.")
self._status_client.put_status(job_id, JobStatus.PENDING)
supervisor = None
try:
logger.debug(
f"Submitting job with generated internal job_id: {job_id}")

View file

@ -57,24 +57,39 @@ def check_subprocess_cleaned(pid):
return psutil.pid_exists(pid) is False
def test_pass_job_id(job_manager):
    """submit_job honors a caller-chosen job_id and rejects its reuse."""
    custom_id = "my_custom_id"
    assert job_manager.submit_job(
        entrypoint="echo hello", job_id=custom_id) == custom_id
    wait_for_condition(
        check_job_succeeded, job_manager=job_manager, job_id=custom_id)
    # A second submission with an already-used ID must raise.
    with pytest.raises(RuntimeError):
        job_manager.submit_job(entrypoint="echo hello", job_id=custom_id)
class TestShellScriptExecution:
def test_submit_basic_echo(self, job_manager):
    """Happy path: a shell echo runs to completion and stdout is captured."""
    # The span contained both the old positional call and the new
    # keyword-only form, submitting the job twice and discarding the first
    # id; submit exactly once with the keyword argument.
    job_id = job_manager.submit_job(entrypoint="echo hello")
    wait_for_condition(
        check_job_succeeded, job_manager=job_manager, job_id=job_id)
    assert job_manager.get_job_stdout(job_id) == b"hello"
def test_submit_stderr(self, job_manager):
    """Output written to stderr is captured separately from stdout."""
    # Duplicate old positional call removed; submit once via the
    # keyword-only entrypoint argument.
    job_id = job_manager.submit_job(entrypoint="echo error 1>&2")
    wait_for_condition(
        check_job_succeeded, job_manager=job_manager, job_id=job_id)
    assert job_manager.get_job_stderr(job_id) == b"error"
def test_submit_ls_grep(self, job_manager):
    """A shell pipeline (ls | grep) works as a job entrypoint."""
    # The span contained both the old positional call and the new
    # keyword form; keep only the post-change submission.
    grep_cmd = f"ls {os.path.dirname(__file__)} | grep test_job_manager.py"
    job_id = job_manager.submit_job(entrypoint=grep_cmd)
    wait_for_condition(
        check_job_succeeded, job_manager=job_manager, job_id=job_id)
@ -88,8 +103,8 @@ class TestShellScriptExecution:
3) Job no hanging job supervisor actor
4) Empty stdout
"""
job_id = job_manager.submit_job(
f"python {_driver_script_path('script_with_exception.py')}")
run_cmd = f"python {_driver_script_path('script_with_exception.py')}"
job_id = job_manager.submit_job(entrypoint=run_cmd)
wait_for_condition(
check_job_failed, job_manager=job_manager, job_id=job_id)
@ -101,7 +116,7 @@ class TestShellScriptExecution:
def test_submit_with_s3_runtime_env(self, job_manager):
job_id = job_manager.submit_job(
"python script.py",
entrypoint="python script.py",
runtime_env={"working_dir": "s3://runtime-env-test/script.zip"})
wait_for_condition(
@ -120,7 +135,7 @@ class TestRuntimeEnv:
driver script.
"""
job_id = job_manager.submit_job(
"echo $TEST_SUBPROCESS_JOB_CONFIG_ENV_VAR",
entrypoint="echo $TEST_SUBPROCESS_JOB_CONFIG_ENV_VAR",
runtime_env={
"env_vars": {
"TEST_SUBPROCESS_JOB_CONFIG_ENV_VAR": "233"
@ -134,7 +149,7 @@ class TestRuntimeEnv:
def test_multiple_runtime_envs(self, job_manager):
# Test that you can run two jobs in different envs without conflict.
job_id_1 = job_manager.submit_job(
f"python {_driver_script_path('print_runtime_env.py')}",
entrypoint=f"python {_driver_script_path('print_runtime_env.py')}",
runtime_env={
"env_vars": {
"TEST_SUBPROCESS_JOB_CONFIG_ENV_VAR": "JOB_1_VAR"
@ -148,7 +163,7 @@ class TestRuntimeEnv:
) == b"{'env_vars': {'TEST_SUBPROCESS_JOB_CONFIG_ENV_VAR': 'JOB_1_VAR'}}" # noqa: E501
job_id_2 = job_manager.submit_job(
f"python {_driver_script_path('print_runtime_env.py')}",
entrypoint=f"python {_driver_script_path('print_runtime_env.py')}",
runtime_env={
"env_vars": {
"TEST_SUBPROCESS_JOB_CONFIG_ENV_VAR": "JOB_2_VAR"
@ -166,7 +181,7 @@ class TestRuntimeEnv:
if user provided runtime_env in both driver script and submit()
"""
job_id = job_manager.submit_job(
f"python {_driver_script_path('override_env_var.py')}",
entrypoint=f"python {_driver_script_path('override_env_var.py')}",
runtime_env={
"env_vars": {
"TEST_SUBPROCESS_JOB_CONFIG_ENV_VAR": "JOB_1_VAR"
@ -186,8 +201,9 @@ class TestRuntimeEnv:
actor failed to setup runtime_env.
"""
with pytest.raises(RuntimeError):
run_cmd = f"python {_driver_script_path('override_env_var.py')}"
job_id = job_manager.submit_job(
f"python {_driver_script_path('override_env_var.py')}",
entrypoint=run_cmd,
runtime_env={"working_dir": "path_not_exist"})
assert job_manager.get_job_status(job_id) == JobStatus.FAILED
@ -205,7 +221,7 @@ class TestRuntimeEnv:
"\"")
# Check that we default to only the job ID.
job_id = job_manager.submit_job(print_metadata_cmd)
job_id = job_manager.submit_job(entrypoint=print_metadata_cmd)
wait_for_condition(
check_job_succeeded, job_manager=job_manager, job_id=job_id)
@ -215,7 +231,8 @@ class TestRuntimeEnv:
# Check that we can pass custom metadata.
job_id = job_manager.submit_job(
print_metadata_cmd, metadata={
entrypoint=print_metadata_cmd,
metadata={
"key1": "val1",
"key2": "val2"
})
@ -243,7 +260,8 @@ class TestAsyncAPI:
"do echo 'Waiting...' && sleep 1; "
"done")
job_id = job_manager.submit_job(
wait_for_file_cmd, _start_signal_actor=_start_signal_actor)
entrypoint=wait_for_file_cmd,
_start_signal_actor=_start_signal_actor)
for _ in range(10):
time.sleep(0.1)