Revert "[job submission] Use ray.init format addresses for JobSubmissionClient (#20245)" (#20314)

This reverts commit adc15a0fb0.
This commit is contained in:
Eric Liang 2021-11-12 21:11:24 -08:00 committed by GitHub
parent 7fe42341ed
commit 567e955810
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 7 additions and 93 deletions

View file

@ -9,9 +9,7 @@ from ray.dashboard.modules.job.sdk import JobSubmissionClient
logger = logging.getLogger(__name__)
def _get_sdk_client(address: Optional[str],
create_cluster_if_needed: bool = False
) -> JobSubmissionClient:
def _get_sdk_client(address: Optional[str]) -> JobSubmissionClient:
if address is None:
if "RAY_ADDRESS" not in os.environ:
raise ValueError(
@ -19,7 +17,7 @@ def _get_sdk_client(address: Optional[str],
"or RAY_ADDRESS environment variable.")
address = os.environ["RAY_ADDRESS"]
return JobSubmissionClient(address, create_cluster_if_needed)
return JobSubmissionClient(address)
@click.group("job")
@ -55,7 +53,7 @@ def job_submit(address: Optional[str], job_id: Optional[str],
Example:
>>> ray job submit -- python my_script.py --arg=val
"""
client = _get_sdk_client(address, create_cluster_if_needed=True)
client = _get_sdk_client(address)
runtime_env = {}
if working_dir is not None:

View file

@ -1,5 +1,4 @@
import dataclasses
import importlib
import logging
from pathlib import Path
import tempfile
@ -15,47 +14,13 @@ from ray.dashboard.modules.job.common import (
JOBS_API_ROUTE_SUBMIT, JOBS_API_ROUTE_STOP, JOBS_API_ROUTE_STATUS,
JOBS_API_ROUTE_PACKAGE)
from ray.client_builder import _split_address
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
@dataclasses.dataclass
class ClusterInfo:
address: str
cookies: Optional[Dict[str, Any]]
metadata: Optional[Dict[str, Any]]
def parse_cluster_info(address: str,
create_cluster_if_needed: bool) -> ClusterInfo:
module_string, inner_address = _split_address(address.rstrip("/"))
if module_string == "http" or module_string == "https":
return ClusterInfo(address=address, cookies=None, metadata=None)
else:
try:
module = importlib.import_module(module_string)
except Exception:
raise RuntimeError(
f"Module: {module_string} does not exist.\n"
f"This module was parsed from Address: {address}") from None
assert "get_job_submission_client_cluster_info" in dir(module), (
f"Module: {module_string} does "
"not have `get_job_submission_client_cluster_info`.")
return module.get_job_submission_client_cluster_info(
inner_address, create_cluster_if_needed)
class JobSubmissionClient:
def __init__(self, address: str, create_cluster_if_needed=False):
cluster_info = parse_cluster_info(address, create_cluster_if_needed)
self._address = cluster_info.address
self._cookies = cluster_info.cookies
self._default_metadata = cluster_info.metadata or {}
def __init__(self, address: str):
self._address: str = address.rstrip("/")
self._test_connection()
def _test_connection(self):
@ -77,12 +42,7 @@ class JobSubmissionClient:
logger.debug(f"Sending request to {url} with "
f"json: {json_data}, params: {params}.")
r = requests.request(
method,
url,
cookies=self._cookies,
data=data,
json=json_data,
params=params)
method, url, data=data, json=json_data, params=params)
r.raise_for_status()
if response_type is None:
return None
@ -157,7 +117,6 @@ class JobSubmissionClient:
metadata: Optional[Dict[str, str]] = None) -> str:
runtime_env = runtime_env or {}
metadata = metadata or {}
metadata.update(self._default_metadata)
self._upload_working_dir_if_needed(runtime_env)
req = JobSubmitRequest(

View file

@ -10,8 +10,7 @@ from ray.dashboard.tests.conftest import * # noqa
from ray._private.test_utils import (format_web_url, wait_for_condition,
wait_until_server_available)
from ray.dashboard.modules.job.common import JobStatus, JOBS_API_ROUTE_SUBMIT
from ray.dashboard.modules.job.sdk import (ClusterInfo, JobSubmissionClient,
parse_cluster_info)
from ray.dashboard.modules.job.sdk import JobSubmissionClient
logger = logging.getLogger(__name__)
@ -248,23 +247,5 @@ def test_nonexistent_job(job_sdk_client):
_check_job_does_not_exist(client, "nonexistent_job")
@pytest.mark.parametrize("address", [
"http://127.0.0.1", "https://127.0.0.1", "ray://127.0.0.1",
"fake_module://127.0.0.1"
])
def test_parse_cluster_info(address: str):
if address.startswith("ray"):
assert parse_cluster_info(address, False) == ClusterInfo(
address="http" + address[address.index("://"):],
cookies=None,
metadata=None)
elif address.startswith("http") or address.startswith("https"):
assert parse_cluster_info(address, False) == ClusterInfo(
address=address, cookies=None, metadata=None)
else:
with pytest.raises(RuntimeError):
parse_cluster_info(address, False)
if __name__ == "__main__":
sys.exit(pytest.main(["-v", __file__]))

View file

@ -118,8 +118,6 @@ from ray import _private # noqa: E402,F401
from ray import workflow # noqa: E402,F401
# We import ClientBuilder so that modules can inherit from `ray.ClientBuilder`.
from ray.client_builder import client, ClientBuilder # noqa: E402
from ray.cluster_utils import ( # noqa: E402,F401
get_job_submission_client_cluster_info)
__all__ = [
"__version__",

View file

@ -10,7 +10,6 @@ import time
import ray
import ray._private.services
from ray._private.client_mode_hook import disable_client_hook
from ray.dashboard.modules.job.sdk import ClusterInfo
from ray import ray_constants
logger = logging.getLogger(__name__)
@ -317,23 +316,3 @@ class Cluster:
self.remove_node(self.head_node)
# need to reset internal kv since gcs is down
ray.experimental.internal_kv._internal_kv_reset()
def get_job_submission_client_cluster_info(
address: str, create_cluster_if_needed: bool) -> ClusterInfo:
"""Get address, cookies, and metadata used for JobSubmissionClient.
Args:
address (str): Address without the module prefix that is passed
to JobSubmissionClient.
create_cluster_if_needed (bool): Indicates whether the cluster
of the address returned needs to be running. Ray doesn't
start a cluster before interacting with jobs, but other
implementations may do so.
Returns:
ClusterInfo object consisting of address, cookies, and metadata
for JobSubmissionClient to use.
"""
return ClusterInfo(
address="http://" + address, cookies=None, metadata=None)

View file

@ -20,7 +20,6 @@ def test_api_functions():
"cancel",
"get_actor",
"get_gpu_ids",
"get_job_submission_client_cluster_info",
"shutdown",
"method",
"nodes",