diff --git a/dashboard/modules/job/common.py b/dashboard/modules/job/common.py index 01d01c0df..2d90b04ba 100644 --- a/dashboard/modules/job/common.py +++ b/dashboard/modules/job/common.py @@ -11,9 +11,10 @@ from ray.experimental.internal_kv import ( ) from ray._private.runtime_env.packaging import parse_uri -# NOTE(edoakes): constant should be considered a public API because it's -# exposed in the snapshot API. +# NOTE(edoakes): these constants should be considered a public API because +# they're exposed in the snapshot API. JOB_ID_METADATA_KEY = "job_submission_id" +JOB_NAME_METADATA_KEY = "job_name" class JobStatus(str, Enum): diff --git a/dashboard/modules/job/job_manager.py b/dashboard/modules/job/job_manager.py index ce2952932..8210e118c 100644 --- a/dashboard/modules/job/job_manager.py +++ b/dashboard/modules/job/job_manager.py @@ -4,16 +4,18 @@ import os import json import logging import traceback +import random import subprocess +import string from typing import Any, Dict, Tuple, Optional -from uuid import uuid4 import ray import ray.ray_constants as ray_constants from ray.actor import ActorHandle from ray.dashboard.modules.job.common import ( - JobStatus, JobStatusStorageClient, JOB_ID_METADATA_KEY) + JobStatus, JobStatusStorageClient, JOB_ID_METADATA_KEY, + JOB_NAME_METADATA_KEY) from ray._private.runtime_env.constants import RAY_JOB_CONFIG_JSON_ENV_VAR logger = logging.getLogger(__name__) @@ -25,6 +27,20 @@ except AttributeError: create_task = asyncio.ensure_future +def generate_job_id() -> str: + """Returns a job_id of the form 'raysubmit_XYZ'. + + Prefixed with 'raysubmit' to avoid confusion with Ray JobID (driver ID). + """ + rand = random.SystemRandom() + possible_characters = list( + set(string.ascii_letters + string.digits) - + {"I", "l", "o", "O", "0"} # No confusing characters + ) + id_part = "".join(rand.choices(possible_characters, k=16)) + return f"raysubmit_{id_part}" + + class JobLogStorageClient: """ Disk storage for stdout / stderr of driver script logs. @@ -60,14 +76,18 @@ class JobSupervisor: SUBPROCESS_POLL_PERIOD_S = 0.1 - def __init__(self, job_id: str, metadata: Dict[str, str]): + def __init__(self, job_id: str, user_metadata: Dict[str, str]): self._job_id = job_id self._status_client = JobStatusStorageClient() self._log_client = JobLogStorageClient() self._runtime_env = ray.get_runtime_context().runtime_env - self._metadata = metadata - self._metadata[JOB_ID_METADATA_KEY] = job_id + # Default metadata if not passed by the user. + self._metadata = { + JOB_ID_METADATA_KEY: job_id, + JOB_NAME_METADATA_KEY: job_id + } + self._metadata.update(user_metadata) # fire and forget call from outer job manager to this actor self._stop_event = asyncio.Event() @@ -263,7 +283,7 @@ class JobManager: 1) Generate a new unique id for this job submission, each call of this method assumes they're independent submission with its own new - uuid, job supervisor actor and child process. + ID, job supervisor actor, and child process. 2) Create new detached actor with same runtime_env as job spec Actual setting up runtime_env, subprocess group, driver command @@ -289,7 +309,7 @@ class JobManager: within the same ray cluster. """ if job_id is None: - job_id = str(uuid4()) + job_id = generate_job_id() elif self._status_client.get_status(job_id) is not None: raise RuntimeError(f"Job {job_id} already exists.") @@ -330,8 +350,7 @@ class JobManager: """Request job to exit, fire and forget. Args: - job_id: Generated uuid from submit_job. Only valid in same ray - cluster. 
+ job_id: ID of the job. Returns: stopped: True if there's running job @@ -354,8 +373,7 @@ class JobManager: All job status is stored and read only from GCS. Args: - job_id: Generated uuid from submit_job. Only valid in same ray - cluster. + job_id: ID of the job. Returns: job_status: Latest known job status """ diff --git a/dashboard/modules/job/tests/test_http_job_server.py b/dashboard/modules/job/tests/test_http_job_server.py index d96b566f9..5678bd546 100644 --- a/dashboard/modules/job/tests/test_http_job_server.py +++ b/dashboard/modules/job/tests/test_http_job_server.py @@ -202,6 +202,7 @@ def test_job_metadata(job_sdk_client): wait_for_condition(_check_job_succeeded, client=client, job_id=job_id) assert str({ + "job_name": job_id, "job_submission_id": job_id, "key1": "val1", "key2": "val2" diff --git a/dashboard/modules/job/tests/test_job_manager.py b/dashboard/modules/job/tests/test_job_manager.py index c7604fc16..11dac65e3 100644 --- a/dashboard/modules/job/tests/test_job_manager.py +++ b/dashboard/modules/job/tests/test_job_manager.py @@ -7,8 +7,9 @@ import psutil import pytest import ray -from ray.dashboard.modules.job.common import JobStatus, JOB_ID_METADATA_KEY -from ray.dashboard.modules.job.job_manager import JobManager +from ray.dashboard.modules.job.common import (JobStatus, JOB_ID_METADATA_KEY, + JOB_NAME_METADATA_KEY) +from ray.dashboard.modules.job.job_manager import generate_job_id, JobManager from ray._private.test_utils import SignalActor, wait_for_condition TEST_NAMESPACE = "jobs_test_namespace" @@ -56,6 +57,19 @@ def check_subprocess_cleaned(pid): return psutil.pid_exists(pid) is False +def test_generate_job_id(): + ids = set() + for _ in range(10000): + new_id = generate_job_id() + assert new_id.startswith("raysubmit_") + assert new_id.count("_") == 1 + assert "-" not in new_id + assert "/" not in new_id + ids.add(new_id) + + assert len(ids) == 10000 + + def test_pass_job_id(job_manager): job_id = "my_custom_id" @@ -218,12 +232,13 @@ class TestRuntimeEnv: "print(dict(sorted(job_config.metadata.items())))" "\"") - # Check that we default to only the job ID. + # Check that we default to only the job ID and job name. job_id = job_manager.submit_job(entrypoint=print_metadata_cmd) wait_for_condition( check_job_succeeded, job_manager=job_manager, job_id=job_id) assert dict_to_str({ + JOB_NAME_METADATA_KEY: job_id, JOB_ID_METADATA_KEY: job_id }) in job_manager.get_job_logs(job_id) @@ -238,11 +253,24 @@ class TestRuntimeEnv: wait_for_condition( check_job_succeeded, job_manager=job_manager, job_id=job_id) assert dict_to_str({ + JOB_NAME_METADATA_KEY: job_id, JOB_ID_METADATA_KEY: job_id, "key1": "val1", "key2": "val2" }) in job_manager.get_job_logs(job_id) + # Check that we can override job name. + job_id = job_manager.submit_job( + entrypoint=print_metadata_cmd, + metadata={JOB_NAME_METADATA_KEY: "custom_name"}) + + wait_for_condition( + check_job_succeeded, job_manager=job_manager, job_id=job_id) + assert dict_to_str({ + JOB_NAME_METADATA_KEY: "custom_name", + JOB_ID_METADATA_KEY: job_id + }) in job_manager.get_job_logs(job_id) + class TestAsyncAPI: def _run_hanging_command(self,
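
Reviewer note (not part of the patch): the sketch below is a minimal, self-contained illustration of the two behavior changes in job_manager.py, runnable without Ray. The build_metadata() helper is hypothetical; in the actual change the equivalent logic lives inline in JobSupervisor.__init__, but the key names, the defaulting order, and the ID format match the diff above.

import random
import string

JOB_ID_METADATA_KEY = "job_submission_id"
JOB_NAME_METADATA_KEY = "job_name"


def generate_job_id() -> str:
    # Same scheme as the new helper: 'raysubmit_' plus 16 characters drawn
    # from ASCII letters and digits, minus the easily confused I, l, o, O, 0.
    rand = random.SystemRandom()
    possible_characters = list(
        set(string.ascii_letters + string.digits) - {"I", "l", "o", "O", "0"})
    return "raysubmit_" + "".join(rand.choices(possible_characters, k=16))


def build_metadata(job_id: str, user_metadata: dict) -> dict:
    # Defaults: both the submission ID and the job name fall back to job_id;
    # anything supplied by the user then takes precedence via dict.update().
    metadata = {
        JOB_ID_METADATA_KEY: job_id,
        JOB_NAME_METADATA_KEY: job_id,
    }
    metadata.update(user_metadata)
    return metadata


if __name__ == "__main__":
    job_id = generate_job_id()
    print(build_metadata(job_id, {}))
    # -> {'job_submission_id': 'raysubmit_...', 'job_name': 'raysubmit_...'}
    print(build_metadata(job_id, {JOB_NAME_METADATA_KEY: "custom_name"}))
    # -> {'job_submission_id': 'raysubmit_...', 'job_name': 'custom_name'}

One design consequence of defaulting via dict.update(): user metadata can now also override JOB_ID_METADATA_KEY, whereas the previous code assigned that key after copying the user dict, so the generated job_id always won. Worth confirming that this change is intended.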