[job submission] Prefix job ID with raysubmit_ and pass job_name metadata (#20490)

Edward Oakes 2021-11-17 21:48:22 -06:00 committed by GitHub
parent 9796ae56d5
commit eae523159f
4 changed files with 64 additions and 16 deletions

View file

@@ -11,9 +11,10 @@ from ray.experimental.internal_kv import (
)
from ray._private.runtime_env.packaging import parse_uri
# NOTE(edoakes): constant should be considered a public API because it's
# exposed in the snapshot API.
# NOTE(edoakes): these constants should be considered a public API because
# they're exposed in the snapshot API.
JOB_ID_METADATA_KEY = "job_submission_id"
JOB_NAME_METADATA_KEY = "job_name"
class JobStatus(str, Enum):
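
Since both keys are part of the public snapshot API, downstream consumers can look them up in a job's metadata dict. A minimal sketch, assuming a consumer outside this commit (the display_name helper below is hypothetical, not part of the change):

from ray.dashboard.modules.job.common import (JOB_ID_METADATA_KEY,
                                               JOB_NAME_METADATA_KEY)

def display_name(metadata: dict) -> str:
    # Hypothetical helper: prefer the human-readable job name, falling back
    # to the submission ID when no explicit name was provided.
    return metadata.get(JOB_NAME_METADATA_KEY,
                        metadata.get(JOB_ID_METADATA_KEY, "<unknown>"))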

View file

@@ -4,16 +4,18 @@ import os
import json
import logging
import traceback
import random
import subprocess
import string
from typing import Any, Dict, Tuple, Optional
from uuid import uuid4
import ray
import ray.ray_constants as ray_constants
from ray.actor import ActorHandle
from ray.dashboard.modules.job.common import (
JobStatus, JobStatusStorageClient, JOB_ID_METADATA_KEY)
JobStatus, JobStatusStorageClient, JOB_ID_METADATA_KEY,
JOB_NAME_METADATA_KEY)
from ray._private.runtime_env.constants import RAY_JOB_CONFIG_JSON_ENV_VAR
logger = logging.getLogger(__name__)
@@ -25,6 +27,20 @@ except AttributeError:
create_task = asyncio.ensure_future
def generate_job_id() -> str:
"""Returns a job_id of the form 'raysubmit_XYZ'.
Prefixed with 'raysubmit' to avoid confusion with Ray JobID (driver ID).
"""
rand = random.SystemRandom()
possible_characters = list(
set(string.ascii_letters + string.digits) -
{"I", "l", "o", "O", "0"} # No confusing characters
)
id_part = "".join(rand.choices(possible_characters, k=16))
return f"raysubmit_{id_part}"
class JobLogStorageClient:
"""
Disk storage for stdout / stderr of driver script logs.
@@ -60,14 +76,18 @@ class JobSupervisor:
SUBPROCESS_POLL_PERIOD_S = 0.1
def __init__(self, job_id: str, metadata: Dict[str, str]):
def __init__(self, job_id: str, user_metadata: Dict[str, str]):
self._job_id = job_id
self._status_client = JobStatusStorageClient()
self._log_client = JobLogStorageClient()
self._runtime_env = ray.get_runtime_context().runtime_env
self._metadata = metadata
self._metadata[JOB_ID_METADATA_KEY] = job_id
# Default metadata if not passed by the user.
self._metadata = {
JOB_ID_METADATA_KEY: job_id,
JOB_NAME_METADATA_KEY: job_id
}
self._metadata.update(user_metadata)
# fire and forget call from outer job manager to this actor
self._stop_event = asyncio.Event()
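
The defaulting above is plain dict-update semantics: built-in keys first, then user_metadata layered on top so the caller can override job_name. A standalone sketch (build_metadata is a hypothetical mirror of the constructor logic, not part of this commit):

from typing import Dict

def build_metadata(job_id: str, user_metadata: Dict[str, str]) -> Dict[str, str]:
    # Hypothetical mirror of JobSupervisor.__init__ above: defaults first,
    # then user-supplied entries override them.
    metadata = {"job_submission_id": job_id, "job_name": job_id}
    metadata.update(user_metadata)
    return metadata

assert build_metadata("raysubmit_x", {}) == {
    "job_submission_id": "raysubmit_x",
    "job_name": "raysubmit_x",
}
assert build_metadata(
    "raysubmit_x", {"job_name": "custom_name"})["job_name"] == "custom_name"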
@@ -263,7 +283,7 @@ class JobManager:
1) Generate a new unique id for this job submission, each call of this
method assumes they're independent submission with its own new
uuid, job supervisor actor and child process.
ID, job supervisor actor, and child process.
2) Create new detached actor with same runtime_env as job spec
Actual setting up runtime_env, subprocess group, driver command
@ -289,7 +309,7 @@ class JobManager:
within the same ray cluster.
"""
if job_id is None:
job_id = str(uuid4())
job_id = generate_job_id()
elif self._status_client.get_status(job_id) is not None:
raise RuntimeError(f"Job {job_id} already exists.")
@@ -330,8 +350,7 @@
"""Request job to exit, fire and forget.
Args:
job_id: Generated uuid from submit_job. Only valid in same ray
cluster.
job_id: ID of the job.
Returns:
stopped:
True if there's running job
@@ -354,8 +373,7 @@
All job status is stored and read only from GCS.
Args:
job_id: Generated uuid from submit_job. Only valid in same ray
cluster.
job_id: ID of the job.
Returns:
job_status: Latest known job status
"""

View file

@@ -202,6 +202,7 @@ def test_job_metadata(job_sdk_client):
wait_for_condition(_check_job_succeeded, client=client, job_id=job_id)
assert str({
"job_name": job_id,
"job_submission_id": job_id,
"key1": "val1",
"key2": "val2"

View file

@@ -7,8 +7,9 @@ import psutil
import pytest
import ray
from ray.dashboard.modules.job.common import JobStatus, JOB_ID_METADATA_KEY
from ray.dashboard.modules.job.job_manager import JobManager
from ray.dashboard.modules.job.common import (JobStatus, JOB_ID_METADATA_KEY,
JOB_NAME_METADATA_KEY)
from ray.dashboard.modules.job.job_manager import generate_job_id, JobManager
from ray._private.test_utils import SignalActor, wait_for_condition
TEST_NAMESPACE = "jobs_test_namespace"
@@ -56,6 +57,19 @@ def check_subprocess_cleaned(pid):
return psutil.pid_exists(pid) is False
def test_generate_job_id():
ids = set()
for _ in range(10000):
new_id = generate_job_id()
assert new_id.startswith("raysubmit_")
assert new_id.count("_") == 1
assert "-" not in new_id
assert "/" not in new_id
ids.add(new_id)
assert len(ids) == 10000
def test_pass_job_id(job_manager):
job_id = "my_custom_id"
@@ -218,12 +232,13 @@ class TestRuntimeEnv:
"print(dict(sorted(job_config.metadata.items())))"
"\"")
# Check that we default to only the job ID.
# Check that we default to only the job ID and job name.
job_id = job_manager.submit_job(entrypoint=print_metadata_cmd)
wait_for_condition(
check_job_succeeded, job_manager=job_manager, job_id=job_id)
assert dict_to_str({
JOB_NAME_METADATA_KEY: job_id,
JOB_ID_METADATA_KEY: job_id
}) in job_manager.get_job_logs(job_id)
@@ -238,11 +253,24 @@
wait_for_condition(
check_job_succeeded, job_manager=job_manager, job_id=job_id)
assert dict_to_str({
JOB_NAME_METADATA_KEY: job_id,
JOB_ID_METADATA_KEY: job_id,
"key1": "val1",
"key2": "val2"
}) in job_manager.get_job_logs(job_id)
# Check that we can override job name.
job_id = job_manager.submit_job(
entrypoint=print_metadata_cmd,
metadata={JOB_NAME_METADATA_KEY: "custom_name"})
wait_for_condition(
check_job_succeeded, job_manager=job_manager, job_id=job_id)
assert dict_to_str({
JOB_NAME_METADATA_KEY: "custom_name",
JOB_ID_METADATA_KEY: job_id
}) in job_manager.get_job_logs(job_id)
class TestAsyncAPI:
def _run_hanging_command(self,