import dataclasses
import importlib
import logging
import json
import yaml
from pathlib import Path
import tempfile
from typing import Any, Dict, List, Optional

from pkg_resources import packaging

import ray

try:
    import requests
except ImportError:
    requests = None

from ray._private.runtime_env.packaging import (
    create_package,
    get_uri_for_directory,
    get_uri_for_package,
)
from ray._private.runtime_env.py_modules import upload_py_modules_if_needed
from ray._private.runtime_env.working_dir import upload_working_dir_if_needed
from ray.dashboard.modules.job.common import uri_to_http_components
from ray.util.annotations import PublicAPI
from ray.client_builder import _split_address
from ray.autoscaler._private.cli_logger import cli_logger

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

# By default, connect to local cluster.
DEFAULT_DASHBOARD_ADDRESS = "http://localhost:8265"


def parse_runtime_env_args(
    runtime_env: Optional[str] = None,
    runtime_env_json: Optional[str] = None,
    working_dir: Optional[str] = None,
):
    """
    Generates a runtime_env dictionary using the `runtime_env`,
    `runtime_env_json`, and `working_dir` CLI options. Only one of
    `runtime_env` or `runtime_env_json` may be defined. `working_dir`
    overwrites the `working_dir` from any other option.
    """

    final_runtime_env = {}
    if runtime_env is not None:
        if runtime_env_json is not None:
            raise ValueError(
                "Only one of --runtime-env and --runtime-env-json can be provided."
            )
        with open(runtime_env, "r") as f:
            final_runtime_env = yaml.safe_load(f)

    elif runtime_env_json is not None:
        final_runtime_env = json.loads(runtime_env_json)

    if working_dir is not None:
        if "working_dir" in final_runtime_env:
            cli_logger.warning(
                "Overriding runtime_env working_dir with --working-dir option"
            )

        final_runtime_env["working_dir"] = working_dir

    return final_runtime_env
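
# Illustrative example (not executed here): calling
# parse_runtime_env_args(runtime_env_json='{"pip": ["requests"]}',
# working_dir="/tmp/my_app") returns
# {"pip": ["requests"], "working_dir": "/tmp/my_app"}; the explicit
# --working-dir value takes precedence over any working_dir already present.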


@dataclasses.dataclass
class ClusterInfo:
    address: str
    cookies: Optional[Dict[str, Any]] = None
    metadata: Optional[Dict[str, Any]] = None
    headers: Optional[Dict[str, Any]] = None


# TODO (shrekris-anyscale): renaming breaks compatibility, do NOT rename
def get_job_submission_client_cluster_info(
    address: str,
    # For backwards compatibility
    *,
    # Only used in the importlib case in parse_cluster_info, but needed
    # in the function signature.
    create_cluster_if_needed: Optional[bool] = False,
    cookies: Optional[Dict[str, Any]] = None,
    metadata: Optional[Dict[str, Any]] = None,
    headers: Optional[Dict[str, Any]] = None,
    _use_tls: Optional[bool] = False,
) -> ClusterInfo:
    """Get the address, cookies, and metadata used for SubmissionClient.

    If no port is specified in `address`, the Ray dashboard default will be
    inserted.

    Args:
        address: Address without the module prefix that is passed
            to SubmissionClient.
        create_cluster_if_needed: Indicates whether the cluster at the
            returned address needs to be running. Ray doesn't start a
            cluster before interacting with jobs, but other
            implementations may do so.

    Returns:
        ClusterInfo object consisting of the address, cookies, and metadata
        for SubmissionClient to use.
    """

    scheme = "https" if _use_tls else "http"
    return ClusterInfo(
        address=f"{scheme}://{address}",
        cookies=cookies,
        metadata=metadata,
        headers=headers,
    )
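
# Illustrative example: get_job_submission_client_cluster_info("127.0.0.1:8265")
# returns ClusterInfo(address="http://127.0.0.1:8265", cookies=None,
# metadata=None, headers=None); with _use_tls=True the address uses "https://".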


def parse_cluster_info(
    address: Optional[str] = None,
    create_cluster_if_needed: bool = False,
    cookies: Optional[Dict[str, Any]] = None,
    metadata: Optional[Dict[str, Any]] = None,
    headers: Optional[Dict[str, Any]] = None,
) -> ClusterInfo:
    if address is None:
        if (
            ray.is_initialized()
            and ray._private.worker.global_worker.node.address_info["webui_url"]
            is not None
        ):
            address = (
                "http://"
                f"{ray._private.worker.global_worker.node.address_info['webui_url']}"
            )
        else:
            logger.info(
                f"No address provided, defaulting to {DEFAULT_DASHBOARD_ADDRESS}."
            )
            address = DEFAULT_DASHBOARD_ADDRESS

    module_string, inner_address = _split_address(address)
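    # e.g. "http://127.0.0.1:8265" splits into ("http", "127.0.0.1:8265"), and
    # "ray://127.0.0.1:10001" into ("ray", "127.0.0.1:10001").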

    # If user passes in ray://, raise error. Dashboard submission should
    # not use a Ray client address.
    if module_string == "ray":
        raise ValueError(
            f'Got an unexpected Ray client address "{address}" while trying '
            "to connect to the Ray dashboard. The dashboard SDK requires the "
            "Ray dashboard server's HTTP(S) address (which should start with "
            '"http://" or "https://", not "ray://"). If this address '
            "wasn't passed explicitly, it may be set in the RAY_ADDRESS "
            "environment variable."
        )

    # If user passes http(s)://, go through normal parsing.
    if module_string in {"http", "https"}:
        return get_job_submission_client_cluster_info(
            inner_address,
            create_cluster_if_needed=create_cluster_if_needed,
            cookies=cookies,
            metadata=metadata,
            headers=headers,
            _use_tls=module_string == "https",
        )
    # Try to dynamically import the function to get cluster info.
    else:
        try:
            module = importlib.import_module(module_string)
        except Exception:
            raise RuntimeError(
                f"Module: {module_string} does not exist.\n"
                f"This module was parsed from Address: {address}"
            ) from None
        assert "get_job_submission_client_cluster_info" in dir(module), (
            f"Module: {module_string} does "
            "not have `get_job_submission_client_cluster_info`."
        )

        return module.get_job_submission_client_cluster_info(
            inner_address,
            create_cluster_if_needed=create_cluster_if_needed,
            cookies=cookies,
            metadata=metadata,
            headers=headers,
        )
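
# Illustrative sketch: an address like "my_provider://cluster-1" (hypothetical
# module name) makes parse_cluster_info import `my_provider` and call its
# `get_job_submission_client_cluster_info("cluster-1", ...)`, which must return
# a ClusterInfo.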


class SubmissionClient:
    def __init__(
        self,
        address: Optional[str] = None,
        create_cluster_if_needed: bool = False,
        cookies: Optional[Dict[str, Any]] = None,
        metadata: Optional[Dict[str, Any]] = None,
        headers: Optional[Dict[str, Any]] = None,
    ):

        # Remove any trailing slashes
        if address is not None and address.endswith("/"):
            address = address.rstrip("/")
            logger.debug(
                "The submission address cannot contain trailing slashes. Removing "
                f'them from the requested submission address of "{address}".'
            )

        cluster_info = parse_cluster_info(
            address, create_cluster_if_needed, cookies, metadata, headers
        )
        self._address = cluster_info.address
        self._cookies = cluster_info.cookies
        self._default_metadata = cluster_info.metadata or {}
        # Headers used for all requests sent to the job server; optional and
        # only needed for cases like authentication to a remote cluster.
        self._headers = cluster_info.headers
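
    # Illustrative construction: SubmissionClient("http://127.0.0.1:8265")
    # resolves the address via parse_cluster_info and stores the cookies,
    # default metadata, and headers attached to subsequent requests.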

    def _check_connection_and_version(
        self, min_version: str = "1.9", version_error_message: Optional[str] = None
    ):
        self._check_connection_and_version_with_url(min_version, version_error_message)

    def _check_connection_and_version_with_url(
        self,
        min_version: str = "1.9",
        version_error_message: Optional[str] = None,
        url: str = "/api/version",
    ):
        if version_error_message is None:
            version_error_message = (
                f"Please ensure the cluster is running Ray {min_version} or higher."
            )

        try:
            r = self._do_request("GET", url)
            if r.status_code == 404:
                raise RuntimeError(version_error_message)
            r.raise_for_status()

            running_ray_version = r.json()["ray_version"]
            if packaging.version.parse(running_ray_version) < packaging.version.parse(
                min_version
            ):
                raise RuntimeError(version_error_message)
            # TODO(edoakes): check the version if/when we break compatibility.
        except requests.exceptions.ConnectionError:
            raise ConnectionError(
                f"Failed to connect to Ray at address: {self._address}."
            )
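
    # For example, a cluster reporting ray_version "1.8.0" fails the default
    # min_version "1.9" check above, since packaging.version.parse("1.8.0") is
    # less than packaging.version.parse("1.9").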

    def _raise_error(self, r: "requests.Response"):
        raise RuntimeError(
            f"Request failed with status code {r.status_code}: {r.text}."
        )

    def _do_request(
        self,
        method: str,
        endpoint: str,
        *,
        data: Optional[bytes] = None,
        json_data: Optional[dict] = None,
        **kwargs,
    ) -> "requests.Response":
        """Perform the actual HTTP request.

        Keyword arguments other than "cookies" and "headers" are forwarded to
        `requests.request()`.
        """
        url = self._address + endpoint
        logger.debug(f"Sending request to {url} with json data: {json_data or {}}.")
        return requests.request(
            method,
            url,
            cookies=self._cookies,
            data=data,
            json=json_data,
            headers=self._headers,
            **kwargs,
        )
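
    # e.g. self._do_request("GET", "/api/version") issues a GET request to
    # f"{self._address}/api/version" with the client's cookies and headers.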

    def _package_exists(
        self,
        package_uri: str,
    ) -> bool:
        protocol, package_name = uri_to_http_components(package_uri)
        r = self._do_request("GET", f"/api/packages/{protocol}/{package_name}")

        if r.status_code == 200:
            logger.debug(f"Package {package_uri} already exists.")
            return True
        elif r.status_code == 404:
            logger.debug(f"Package {package_uri} does not exist.")
            return False
        else:
            self._raise_error(r)
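
    # Illustrative mapping (assuming a typical content-addressed package URI):
    # "gcs://_ray_pkg_<hash>.zip" is split by uri_to_http_components into the
    # protocol "gcs" and package name "_ray_pkg_<hash>.zip", which form the
    # /api/packages/<protocol>/<package_name> route used above.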

    def _upload_package(
        self,
        package_uri: str,
        package_path: str,
        include_parent_dir: Optional[bool] = False,
        excludes: Optional[List[str]] = None,
        is_file: bool = False,
    ) -> None:
        logger.info(f"Uploading package {package_uri}.")
        with tempfile.TemporaryDirectory() as tmp_dir:
            protocol, package_name = uri_to_http_components(package_uri)
            if is_file:
                package_file = Path(package_path)
            else:
                package_file = Path(tmp_dir) / package_name
                create_package(
                    package_path,
                    package_file,
                    include_parent_dir=include_parent_dir,
                    excludes=excludes,
                )
            try:
                r = self._do_request(
                    "PUT",
                    f"/api/packages/{protocol}/{package_name}",
                    data=package_file.read_bytes(),
                )
                if r.status_code != 200:
                    self._raise_error(r)
            finally:
                # If the package is a user's existing file, don't delete it.
                if not is_file:
                    package_file.unlink()

    def _upload_package_if_needed(
        self,
        package_path: str,
        include_parent_dir: bool = False,
        excludes: Optional[List[str]] = None,
        is_file: bool = False,
    ) -> str:
        if is_file:
            package_uri = get_uri_for_package(Path(package_path))
        else:
            package_uri = get_uri_for_directory(package_path, excludes=excludes)

        if not self._package_exists(package_uri):
            self._upload_package(
                package_uri,
                package_path,
                include_parent_dir=include_parent_dir,
                excludes=excludes,
                is_file=is_file,
            )
        else:
            logger.info(f"Package {package_uri} already exists, skipping upload.")

        return package_uri

    def _upload_working_dir_if_needed(self, runtime_env: Dict[str, Any]):
        def _upload_fn(working_dir, excludes, is_file=False):
            self._upload_package_if_needed(
                working_dir,
                include_parent_dir=False,
                excludes=excludes,
                is_file=is_file,
            )

        upload_working_dir_if_needed(runtime_env, upload_fn=_upload_fn)
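
    # Sketch of the expected flow, assuming a runtime_env such as
    # {"working_dir": "/path/to/project"}: upload_working_dir_if_needed invokes
    # _upload_fn, which packages and uploads the directory only if a package
    # with the same content hash isn't already stored on the cluster.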

    def _upload_py_modules_if_needed(self, runtime_env: Dict[str, Any]):
        def _upload_fn(module_path, excludes, is_file=False):
            self._upload_package_if_needed(
                module_path, include_parent_dir=True, excludes=excludes, is_file=is_file
            )

        upload_py_modules_if_needed(runtime_env, upload_fn=_upload_fn)

    @PublicAPI(stability="beta")
    def get_version(self) -> str:
        r = self._do_request("GET", "/api/version")
        if r.status_code == 200:
            return r.json().get("version")
        else:
            self._raise_error(r)
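
    # Illustrative usage (assumes a dashboard reachable at the default address):
    #     client = SubmissionClient("http://127.0.0.1:8265")
    #     client.get_version()  # e.g. "2.0.0", whatever version the server reports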