mirror of
https://github.com/vale981/ray
synced 2025-03-05 18:11:42 -05:00
[job submission] Fix address defaulting behavior (#24970)
Per the discussion in https://github.com/ray-project/ray/issues/24858: - If an address without a port is provided, don't append a port. - Default to `http://localhost:8265` if nothing is provided.
This commit is contained in:
parent
85fe5346c8
commit
cb7bcbd651
7 changed files with 41 additions and 44 deletions
|
@ -24,7 +24,6 @@ from ray._private.runtime_env.py_modules import upload_py_modules_if_needed
|
|||
from ray._private.runtime_env.working_dir import upload_working_dir_if_needed
|
||||
from ray.dashboard.modules.job.common import uri_to_http_components
|
||||
|
||||
from ray.ray_constants import DEFAULT_DASHBOARD_PORT
|
||||
from ray.util.annotations import PublicAPI
|
||||
from ray.client_builder import _split_address
|
||||
from ray.autoscaler._private.cli_logger import cli_logger
|
||||
|
@ -32,6 +31,9 @@ from ray.autoscaler._private.cli_logger import cli_logger
|
|||
logger = logging.getLogger(__name__)
|
||||
logger.setLevel(logging.INFO)
|
||||
|
||||
# By default, connect to local cluster.
|
||||
DEFAULT_DASHBOARD_ADDRESS = "http://localhost:8265"
|
||||
|
||||
|
||||
def parse_runtime_env_args(
|
||||
runtime_env: Optional[str] = None,
|
||||
|
@ -108,18 +110,8 @@ def get_job_submission_client_cluster_info(
|
|||
"""
|
||||
|
||||
scheme = "https" if _use_tls else "http"
|
||||
|
||||
split = address.split(":")
|
||||
host = split[0]
|
||||
if len(split) == 1:
|
||||
port = DEFAULT_DASHBOARD_PORT
|
||||
elif len(split) == 2:
|
||||
port = int(split[1])
|
||||
else:
|
||||
raise ValueError(f"Invalid address: {address}.")
|
||||
|
||||
return ClusterInfo(
|
||||
address=f"{scheme}://{host}:{port}",
|
||||
address=f"{scheme}://{address}",
|
||||
cookies=cookies,
|
||||
metadata=metadata,
|
||||
headers=headers,
|
||||
|
@ -127,12 +119,16 @@ def get_job_submission_client_cluster_info(
|
|||
|
||||
|
||||
def parse_cluster_info(
|
||||
address: str,
|
||||
address: Optional[str] = None,
|
||||
create_cluster_if_needed: bool = False,
|
||||
cookies: Optional[Dict[str, Any]] = None,
|
||||
metadata: Optional[Dict[str, Any]] = None,
|
||||
headers: Optional[Dict[str, Any]] = None,
|
||||
) -> ClusterInfo:
|
||||
if address is None:
|
||||
logger.info(f"No address provided, defaulting to {DEFAULT_DASHBOARD_ADDRESS}.")
|
||||
address = DEFAULT_DASHBOARD_ADDRESS
|
||||
|
||||
module_string, inner_address = _split_address(address)
|
||||
|
||||
# If user passes in ray://, raise error. Dashboard submission should
|
||||
|
@ -183,8 +179,8 @@ def parse_cluster_info(
|
|||
class SubmissionClient:
|
||||
def __init__(
|
||||
self,
|
||||
address: str,
|
||||
create_cluster_if_needed=False,
|
||||
address: Optional[str] = None,
|
||||
create_cluster_if_needed: bool = False,
|
||||
cookies: Optional[Dict[str, Any]] = None,
|
||||
metadata: Optional[Dict[str, Any]] = None,
|
||||
headers: Optional[Dict[str, Any]] = None,
|
||||
|
|
|
@ -17,12 +17,7 @@ def _get_sdk_client(
|
|||
address: Optional[str], create_cluster_if_needed: bool = False
|
||||
) -> JobSubmissionClient:
|
||||
|
||||
if address is None:
|
||||
if "RAY_ADDRESS" not in os.environ:
|
||||
raise ValueError(
|
||||
"Address must be specified using either the --address flag "
|
||||
"or RAY_ADDRESS environment variable."
|
||||
)
|
||||
if address is None and "RAY_ADDRESS" in os.environ:
|
||||
address = os.environ["RAY_ADDRESS"]
|
||||
|
||||
cli_logger.labeled_value("Job submission server address", address)
|
||||
|
|
|
@ -35,7 +35,7 @@ class JobSubmissionClient(SubmissionClient):
|
|||
|
||||
def __init__(
|
||||
self,
|
||||
address: str,
|
||||
address: Optional[str] = None,
|
||||
create_cluster_if_needed: bool = False,
|
||||
cookies: Optional[Dict[str, Any]] = None,
|
||||
metadata: Optional[Dict[str, Any]] = None,
|
||||
|
@ -44,7 +44,8 @@ class JobSubmissionClient(SubmissionClient):
|
|||
"""Initialize a JobSubmissionClient and check the connection to the cluster.
|
||||
|
||||
Args:
|
||||
address: The IP address and port of the head node.
|
||||
address: The IP address and port of the head node. Defaults to
|
||||
http://localhost:8265.
|
||||
create_cluster_if_needed: Indicates whether the cluster at the specified
|
||||
address needs to already be running. Ray doesn't start a cluster
|
||||
before interacting with jobs, but external job managers may do so.
|
||||
|
|
|
@ -91,8 +91,8 @@ def _job_cli_group_test_address(mock_sdk_client, cmd, *args):
|
|||
assert mock_sdk_client.called_with("env_addr")
|
||||
# Test passing no address.
|
||||
result = runner.invoke(job_cli_group, [cmd, *args])
|
||||
assert result.exit_code == 1
|
||||
assert "Address must be specified" in str(result.exception)
|
||||
assert result.exit_code == 0
|
||||
assert mock_sdk_client.called_with(None)
|
||||
|
||||
|
||||
class TestList:
|
||||
|
|
|
@ -89,11 +89,9 @@ class TestRayAddress:
|
|||
|
||||
def test_empty_ray_address(self, ray_start_stop):
|
||||
with set_env_var("RAY_ADDRESS", None):
|
||||
_, stderr = _run_cmd("ray job submit -- echo hello", should_fail=True)
|
||||
assert (
|
||||
"Address must be specified using either the "
|
||||
"--address flag or RAY_ADDRESS environment"
|
||||
) in stderr
|
||||
stdout, _ = _run_cmd("ray job submit -- echo hello")
|
||||
assert "hello" in stdout
|
||||
assert "succeeded" in stdout
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"ray_client_address", ["127.0.0.1:8265", "ray://127.0.0.1:8265"]
|
||||
|
|
|
@ -19,7 +19,6 @@ from ray.dashboard.modules.dashboard_sdk import (
|
|||
parse_cluster_info,
|
||||
)
|
||||
from ray.dashboard.tests.conftest import * # noqa
|
||||
from ray.ray_constants import DEFAULT_DASHBOARD_PORT
|
||||
from ray.tests.conftest import _ray_start
|
||||
from ray._private.test_utils import (
|
||||
chdir,
|
||||
|
@ -481,10 +480,9 @@ def test_parse_cluster_info(scheme: str, host: str, port: Optional[int]):
|
|||
if port is not None:
|
||||
address += f":{port}"
|
||||
|
||||
final_port = port if port is not None else DEFAULT_DASHBOARD_PORT
|
||||
if scheme in {"http", "https"}:
|
||||
assert parse_cluster_info(address, False) == ClusterInfo(
|
||||
address=f"{scheme}://{host}:{final_port}",
|
||||
address=address,
|
||||
cookies=None,
|
||||
metadata=None,
|
||||
headers=None,
|
||||
|
|
|
@ -1,8 +1,13 @@
|
|||
import pytest
|
||||
import sys
|
||||
from typing import Dict, Optional, Tuple
|
||||
from unittest.mock import Mock, patch
|
||||
|
||||
from ray.dashboard.modules.dashboard_sdk import parse_cluster_info
|
||||
from ray.dashboard.modules.dashboard_sdk import (
|
||||
ClusterInfo,
|
||||
DEFAULT_DASHBOARD_ADDRESS,
|
||||
parse_cluster_info,
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
|
@ -42,7 +47,7 @@ def test_parse_cluster_info(
|
|||
get_job_submission_client_cluster_info=mock_get_job_submission_client_cluster,
|
||||
), patch.multiple("importlib", import_module=mock_import_module):
|
||||
if module_string == "ray":
|
||||
assert (
|
||||
with pytest.raises(ValueError, match="ray://"):
|
||||
parse_cluster_info(
|
||||
address,
|
||||
create_cluster_if_needed=create_cluster_if_needed,
|
||||
|
@ -50,15 +55,6 @@ def test_parse_cluster_info(
|
|||
metadata=metadata,
|
||||
headers=headers,
|
||||
)
|
||||
== "Ray ClusterInfo"
|
||||
)
|
||||
mock_get_job_submission_client_cluster.assert_called_once_with(
|
||||
inner_address,
|
||||
create_cluster_if_needed=create_cluster_if_needed,
|
||||
cookies=cookies,
|
||||
metadata=metadata,
|
||||
headers=headers,
|
||||
)
|
||||
elif module_string == "other_module":
|
||||
assert (
|
||||
parse_cluster_info(
|
||||
|
@ -78,3 +74,16 @@ def test_parse_cluster_info(
|
|||
metadata=metadata,
|
||||
headers=headers,
|
||||
)
|
||||
|
||||
|
||||
def test_parse_cluster_info_default_address():
|
||||
assert (
|
||||
parse_cluster_info(
|
||||
address=None,
|
||||
)
|
||||
== ClusterInfo(address=DEFAULT_DASHBOARD_ADDRESS)
|
||||
)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(pytest.main(["-v", __file__]))
|
||||
|
|
Loading…
Add table
Reference in a new issue