From 5ce79d0a461ab715d20f2d65efe147ddf6f233e7 Mon Sep 17 00:00:00 2001 From: Jiao Date: Wed, 24 Nov 2021 15:38:26 -0800 Subject: [PATCH] [jobs] Fix job server's ray init(to use redis address rather than auto (#20705) * [job submission] Use specific redis_address and redis_password instead of "auto" (#20687) Co-authored-by: Edward Oakes Co-authored-by: Jiao Dong --- dashboard/modules/job/cli.py | 2 + dashboard/modules/job/job_head.py | 21 +++- dashboard/modules/job/tests/test_cli.py | 13 ++- .../modules/job/tests/test_cli_integration.py | 107 ++++++++++++++++++ 4 files changed, 138 insertions(+), 5 deletions(-) create mode 100644 dashboard/modules/job/tests/test_cli_integration.py diff --git a/dashboard/modules/job/cli.py b/dashboard/modules/job/cli.py index 0ffd1a6d5..e1fa2b670 100644 --- a/dashboard/modules/job/cli.py +++ b/dashboard/modules/job/cli.py @@ -14,6 +14,7 @@ logger = logging.getLogger(__name__) def _get_sdk_client(address: Optional[str], create_cluster_if_needed: bool = False ) -> JobSubmissionClient: + if address is None: if "RAY_ADDRESS" not in os.environ: raise ValueError( @@ -21,6 +22,7 @@ def _get_sdk_client(address: Optional[str], "or RAY_ADDRESS environment variable.") address = os.environ["RAY_ADDRESS"] + logger.info(f"Creating JobSubmissionClient at address: {address}") return JobSubmissionClient(address, create_cluster_if_needed) diff --git a/dashboard/modules/job/job_head.py b/dashboard/modules/job/job_head.py index ad4e5baef..0e035ca70 100644 --- a/dashboard/modules/job/job_head.py +++ b/dashboard/modules/job/job_head.py @@ -27,6 +27,8 @@ from ray.dashboard.modules.job.common import ( from ray.dashboard.modules.job.job_manager import JobManager logger = logging.getLogger(__name__) +logger.setLevel(logging.INFO) + routes = dashboard_utils.ClassMethodRouteTable RAY_INTERNAL_JOBS_NAMESPACE = "_ray_internal_jobs" @@ -35,9 +37,21 @@ RAY_INTERNAL_JOBS_NAMESPACE = "_ray_internal_jobs" def _init_ray_and_catch_exceptions(f: Callable) -> Callable: @wraps(f) async def check(self, *args, **kwargs): - if not ray.is_initialized(): - ray.init(address="auto", namespace=RAY_INTERNAL_JOBS_NAMESPACE) try: + if not ray.is_initialized(): + try: + address = self._redis_address + redis_pw = self._redis_password + logger.info(f"Connecting to ray with address={address}, " + f"redis_pw={redis_pw}") + ray.init( + address=address, + namespace=RAY_INTERNAL_JOBS_NAMESPACE, + _redis_password=redis_pw) + except Exception as e: + ray.shutdown() + raise e from None + return await f(self, *args, **kwargs) except Exception as e: logger.exception(f"Unexpected error in handler: {e}") @@ -52,6 +66,9 @@ class JobHead(dashboard_utils.DashboardHeadModule): def __init__(self, dashboard_head): super().__init__(dashboard_head) + ip, port = dashboard_head.redis_address + self._redis_address = f"{ip}:{port}" + self._redis_password = dashboard_head.redis_password self._job_manager = None async def _parse_and_validate_request(self, req: Request, diff --git a/dashboard/modules/job/tests/test_cli.py b/dashboard/modules/job/tests/test_cli.py index 30175c934..4aa95d4ec 100644 --- a/dashboard/modules/job/tests/test_cli.py +++ b/dashboard/modules/job/tests/test_cli.py @@ -6,6 +6,7 @@ import tempfile import os from unittest import mock import yaml +from typing import Optional from click.testing import CliRunner @@ -47,11 +48,17 @@ def runtime_env_formats(): @contextmanager -def set_env_var(key: str, val: str): +def set_env_var(key: str, val: Optional[str] = None): old_val = os.environ.get(key, None) - os.environ[key] = val + if val is not None: + os.environ[key] = val + elif key in os.environ: + del os.environ[key] + yield - del os.environ[key] + + if key in os.environ: + del os.environ[key] if old_val is not None: os.environ[key] = old_val diff --git a/dashboard/modules/job/tests/test_cli_integration.py b/dashboard/modules/job/tests/test_cli_integration.py new file mode 100644 index 000000000..86bcee67b --- /dev/null +++ b/dashboard/modules/job/tests/test_cli_integration.py @@ -0,0 +1,107 @@ +from contextlib import contextmanager +import os +import logging +import sys +import subprocess +from typing import Optional + +import pytest + +logger = logging.getLogger(__name__) + + +@contextmanager +def set_env_var(key: str, val: Optional[str] = None): + old_val = os.environ.get(key, None) + if val is not None: + os.environ[key] = val + elif key in os.environ: + del os.environ[key] + + yield + + if key in os.environ: + del os.environ[key] + if old_val is not None: + os.environ[key] = old_val + + +@pytest.fixture +def ray_start_stop(): + subprocess.check_output(["ray", "start", "--head"]) + yield + subprocess.check_output(["ray", "stop", "--force"]) + + +@contextmanager +def ray_cluster_manager(): + """ + Used not as fixture in case we want to set RAY_ADDRESS first. + """ + subprocess.check_output(["ray", "start", "--head"]) + yield + subprocess.check_output(["ray", "stop", "--force"]) + + +class TestSubmitIntegration: + """ + Integration version of job CLI test that ensures interaction with the + following components are working as expected: + + 1) Ray client: use of RAY_ADDRESS and ray.init() in job_head.py + 2) Ray dashboard: `ray start --head` + """ + + def test_empty_ray_address(self, ray_start_stop): + with set_env_var("RAY_ADDRESS", None): + completed_process = subprocess.run( + ["ray", "job", "submit", "--", "echo hello"], + stderr=subprocess.PIPE) + stderr = completed_process.stderr.decode("utf-8") + # Current dashboard module that raises no exception from requests.. + assert ("Address must be specified using either the " + "--address flag or RAY_ADDRESS environment") in stderr + + def test_ray_client_adress(self, ray_start_stop): + with set_env_var("RAY_ADDRESS", "127.0.0.1:8265"): + completed_process = subprocess.run( + ["ray", "job", "submit", "--", "echo hello"], + stderr=subprocess.PIPE) + stderr = completed_process.stderr.decode("utf-8") + # Current dashboard module that raises no exception from requests.. + assert "Query the status of the job" in stderr + + def test_valid_http_ray_address(self, ray_start_stop): + with set_env_var("RAY_ADDRESS", "http://127.0.0.1:8265"): + completed_process = subprocess.run( + ["ray", "job", "submit", "--", "echo hello"], + stderr=subprocess.PIPE) + stderr = completed_process.stderr.decode("utf-8") + # Current dashboard module that raises no exception from requests.. + assert "Query the status of the job" in stderr + + def test_set_ray_http_address_first(self): + with set_env_var("RAY_ADDRESS", "http://127.0.0.1:8265"): + with ray_cluster_manager(): + completed_process = subprocess.run( + ["ray", "job", "submit", "--", "echo hello"], + stderr=subprocess.PIPE) + stderr = completed_process.stderr.decode("utf-8") + # Current dashboard module that raises no exception from + # requests.. + assert "Query the status of the job" in stderr + + def test_set_ray_client_address_first(self): + with set_env_var("RAY_ADDRESS", "127.0.0.1:8265"): + with ray_cluster_manager(): + completed_process = subprocess.run( + ["ray", "job", "submit", "--", "echo hello"], + stderr=subprocess.PIPE) + stderr = completed_process.stderr.decode("utf-8") + # Current dashboard module that raises no exception from + # requests.. + assert "Query the status of the job" in stderr + + +if __name__ == "__main__": + sys.exit(pytest.main(["-v", __file__]))