[serve] Support working_dir in serve run (#22760)

#22714 added `serve run` to the Serve CLI. This change allows the user to specify a local or remote `working_dir` in `serve run`.
This commit is contained in:
shrekris-anyscale 2022-03-08 11:18:41 -08:00 committed by GitHub
parent d1009c8489
commit ab2741d64b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
13 changed files with 913 additions and 361 deletions

View file

@ -0,0 +1,315 @@
import dataclasses
import importlib
import logging
import json
import yaml
from pathlib import Path
import tempfile
from typing import Any, Dict, List, Optional
from pkg_resources import packaging
try:
import aiohttp
import requests
except ImportError:
aiohttp = None
requests = None
from ray._private.runtime_env.packaging import (
create_package,
get_uri_for_directory,
parse_uri,
)
from ray.dashboard.modules.job.common import uri_to_http_components
from ray.ray_constants import DEFAULT_DASHBOARD_PORT
from ray.util.annotations import PublicAPI
from ray.client_builder import _split_address
from ray.autoscaler._private.cli_logger import cli_logger
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
def parse_runtime_env_args(
    runtime_env: Optional[str] = None,
    runtime_env_json: Optional[str] = None,
    working_dir: Optional[str] = None,
) -> Dict[str, Any]:
    """Generates a runtime_env dictionary from CLI options.

    Only one of `runtime_env` (a path to a local YAML file) or
    `runtime_env_json` (a JSON-serialized string) may be provided.
    `working_dir`, if given, overwrites the `working_dir` from either
    other option.

    Args:
        runtime_env: Path to a local YAML file containing a runtime_env.
        runtime_env_json: JSON-serialized runtime_env dictionary.
        working_dir: Local directory or remote URI; takes precedence over
            any working_dir provided via the other two options.

    Returns:
        The combined runtime_env dictionary ({} when no option is given).

    Raises:
        ValueError: If both `runtime_env` and `runtime_env_json` are given.
    """
    final_runtime_env = {}
    if runtime_env is not None:
        if runtime_env_json is not None:
            # NOTE: message previously said "--runtime_env", but the CLI
            # option is spelled "--runtime-env".
            raise ValueError(
                "Only one of --runtime-env and --runtime-env-json can be provided."
            )
        with open(runtime_env, "r") as f:
            final_runtime_env = yaml.safe_load(f)
    elif runtime_env_json is not None:
        final_runtime_env = json.loads(runtime_env_json)
    if working_dir is not None:
        if "working_dir" in final_runtime_env:
            cli_logger.warning(
                "Overriding runtime_env working_dir with --working-dir option"
            )
        final_runtime_env["working_dir"] = working_dir
    return final_runtime_env
@dataclasses.dataclass
class ClusterInfo:
    """Connection details for a cluster's submission endpoint.

    Produced by `get_submission_client_cluster_info` / `parse_cluster_info`
    and consumed by `SubmissionClient.__init__`.
    """

    # Full "http(s)://host:port" URL of the dashboard server.
    address: str
    # Optional cookies attached to every request.
    cookies: Optional[Dict[str, Any]] = None
    # Optional default metadata; stored by SubmissionClient as
    # `_default_metadata` (falls back to {} when None).
    metadata: Optional[Dict[str, Any]] = None
    # Optional HTTP headers, e.g. for authentication to a remote cluster.
    headers: Optional[Dict[str, Any]] = None
def get_submission_client_cluster_info(
    address: str,
    # For backwards compatibility
    *,
    # only used in importlib case in parse_cluster_info, but needed
    # in function signature.
    create_cluster_if_needed: Optional[bool] = False,
    cookies: Optional[Dict[str, Any]] = None,
    metadata: Optional[Dict[str, Any]] = None,
    headers: Optional[Dict[str, Any]] = None,
    _use_tls: Optional[bool] = False,
) -> ClusterInfo:
    """Get address, cookies, and metadata used for SubmissionClient.
    If no port is specified in `address`, the Ray dashboard default will be
    inserted.
    Args:
        address (str): Address without the module prefix that is passed
            to SubmissionClient.
        create_cluster_if_needed (bool): Indicates whether the cluster
            of the address returned needs to be running. Ray doesn't
            start a cluster before interacting with jobs, but other
            implementations may do so.
    Returns:
        ClusterInfo object consisting of address, cookies, and metadata
        for SubmissionClient to use.
    """
    if ":" not in address:
        # No port given: fall back to the dashboard default.
        host = address
        dashboard_port = DEFAULT_DASHBOARD_PORT
    else:
        host, _, port_part = address.partition(":")
        if ":" in port_part:
            # More than one ":" means the address is malformed.
            raise ValueError(f"Invalid address: {address}.")
        dashboard_port = int(port_part)
    protocol = "https" if _use_tls else "http"
    return ClusterInfo(
        address=f"{protocol}://{host}:{dashboard_port}",
        cookies=cookies,
        metadata=metadata,
        headers=headers,
    )
def parse_cluster_info(
    address: str,
    create_cluster_if_needed: bool = False,
    cookies: Optional[Dict[str, Any]] = None,
    metadata: Optional[Dict[str, Any]] = None,
    headers: Optional[Dict[str, Any]] = None,
) -> ClusterInfo:
    """Resolve `address` into a ClusterInfo.

    "http://", "https://" and "ray://" addresses are handled natively; any
    other module prefix is imported dynamically and must expose a
    `get_submission_client_cluster_info` function.
    """
    module_string, inner_address = _split_address(address)
    # If user passes http(s):// or ray://, go through normal parsing.
    if module_string in {"http", "https", "ray"}:
        return get_submission_client_cluster_info(
            inner_address,
            create_cluster_if_needed=create_cluster_if_needed,
            cookies=cookies,
            metadata=metadata,
            headers=headers,
            _use_tls=module_string == "https",
        )
    # Otherwise, dynamically import the module named by the prefix and
    # delegate cluster-info resolution to it.
    try:
        info_module = importlib.import_module(module_string)
    except Exception:
        raise RuntimeError(
            f"Module: {module_string} does not exist.\n"
            f"This module was parsed from Address: {address}"
        ) from None
    assert "get_submission_client_cluster_info" in dir(info_module), (
        f"Module: {module_string} does "
        "not have `get_submission_client_cluster_info`."
    )
    return info_module.get_submission_client_cluster_info(
        inner_address,
        create_cluster_if_needed=create_cluster_if_needed,
        cookies=cookies,
        metadata=metadata,
        headers=headers,
    )
class SubmissionClient:
    """Base client for submitting requests to a Ray cluster over HTTP.

    Resolves `address` into a dashboard URL (plus cookies/metadata/headers)
    via `parse_cluster_info`, and provides shared helpers for version
    checking, issuing requests, and uploading runtime_env packages.
    Subclassed by concrete clients (e.g. job / Serve submission clients).
    """

    def __init__(
        self,
        address: str,
        create_cluster_if_needed=False,
        cookies: Optional[Dict[str, Any]] = None,
        metadata: Optional[Dict[str, Any]] = None,
        headers: Optional[Dict[str, Any]] = None,
    ):
        cluster_info = parse_cluster_info(
            address, create_cluster_if_needed, cookies, metadata, headers
        )
        self._address = cluster_info.address
        self._cookies = cluster_info.cookies
        # Default to {} so subclasses never need a None check.
        self._default_metadata = cluster_info.metadata or {}
        # Headers used for all requests sent to job server, optional and only
        # needed for cases like authentication to remote cluster.
        self._headers = cluster_info.headers

    def _check_connection_and_version(
        self, min_version: str = "1.9", version_error_message: Optional[str] = None
    ):
        """Verify the server is reachable and runs Ray >= `min_version`.

        Raises:
            RuntimeError: If the version endpoint is missing (404) or the
                reported ray_version is older than `min_version`.
            ConnectionError: If the server cannot be reached at all.
        """
        if version_error_message is None:
            version_error_message = (
                f"Please ensure the cluster is running Ray {min_version} or higher."
            )
        try:
            r = self._do_request("GET", "/api/version")
            # A 404 means the server predates the version endpoint entirely.
            if r.status_code == 404:
                raise RuntimeError(version_error_message)
            r.raise_for_status()
            running_ray_version = r.json()["ray_version"]
            if packaging.version.parse(running_ray_version) < packaging.version.parse(
                min_version
            ):
                raise RuntimeError(version_error_message)
            # TODO(edoakes): check the version if/when we break compatibility.
        except requests.exceptions.ConnectionError:
            raise ConnectionError(
                f"Failed to connect to Ray at address: {self._address}."
            )

    def _raise_error(self, r: "requests.Response"):
        # Uniform error surface for unexpected HTTP responses.
        raise RuntimeError(
            f"Request failed with status code {r.status_code}: {r.text}."
        )

    def _do_request(
        self,
        method: str,
        endpoint: str,
        *,
        data: Optional[bytes] = None,
        json_data: Optional[dict] = None,
    ) -> "requests.Response":
        """Send an HTTP request to `endpoint` on the resolved dashboard URL."""
        url = self._address + endpoint
        logger.debug(f"Sending request to {url} with json data: {json_data or {}}.")
        return requests.request(
            method,
            url,
            cookies=self._cookies,
            data=data,
            json=json_data,
            headers=self._headers,
        )

    def _package_exists(
        self,
        package_uri: str,
    ) -> bool:
        """Return whether `package_uri` has already been uploaded."""
        protocol, package_name = uri_to_http_components(package_uri)
        r = self._do_request("GET", f"/api/packages/{protocol}/{package_name}")
        if r.status_code == 200:
            logger.debug(f"Package {package_uri} already exists.")
            return True
        elif r.status_code == 404:
            logger.debug(f"Package {package_uri} does not exist.")
            return False
        else:
            # Any other status is unexpected; _raise_error always raises.
            self._raise_error(r)

    def _upload_package(
        self,
        package_uri: str,
        package_path: str,
        include_parent_dir: Optional[bool] = False,
        excludes: Optional[List[str]] = None,
    ) -> None:
        """Zip `package_path` and PUT it to the cluster as `package_uri`.

        The archive is built in a temporary directory and removed after the
        upload attempt, whether or not it succeeds. (Annotation fixed: this
        method returns nothing; it was previously annotated `-> bool`.)
        """
        logger.info(f"Uploading package {package_uri}.")
        with tempfile.TemporaryDirectory() as tmp_dir:
            protocol, package_name = uri_to_http_components(package_uri)
            package_file = Path(tmp_dir) / package_name
            create_package(
                package_path,
                package_file,
                include_parent_dir=include_parent_dir,
                excludes=excludes,
            )
            try:
                r = self._do_request(
                    "PUT",
                    f"/api/packages/{protocol}/{package_name}",
                    data=package_file.read_bytes(),
                )
                if r.status_code != 200:
                    self._raise_error(r)
            finally:
                # Remove the local archive even if the upload failed.
                package_file.unlink()

    def _upload_package_if_needed(
        self, package_path: str, excludes: Optional[List[str]] = None
    ) -> str:
        """Upload `package_path` unless the cluster already has it.

        Returns:
            The content-addressed package URI for the directory.
        """
        package_uri = get_uri_for_directory(package_path, excludes=excludes)
        if not self._package_exists(package_uri):
            self._upload_package(package_uri, package_path, excludes=excludes)
        else:
            logger.info(f"Package {package_uri} already exists, skipping upload.")
        return package_uri

    def _upload_working_dir_if_needed(self, runtime_env: Dict[str, Any]):
        """Replace a local `working_dir` in `runtime_env` with an uploaded URI.

        Mutates `runtime_env` in place. If `working_dir` already parses as a
        remote URI it is left untouched.
        """
        if "working_dir" in runtime_env:
            working_dir = runtime_env["working_dir"]
            try:
                # parse_uri raises ValueError for non-URI (local path) inputs.
                parse_uri(working_dir)
                is_uri = True
                logger.debug("working_dir is already a valid URI.")
            except ValueError:
                is_uri = False
            if not is_uri:
                logger.debug("working_dir is not a URI, attempting to upload.")
                package_uri = self._upload_package_if_needed(
                    working_dir, excludes=runtime_env.get("excludes", None)
                )
                runtime_env["working_dir"] = package_uri

    @PublicAPI(stability="beta")
    def get_version(self) -> str:
        """Return the API version string reported by the cluster."""
        r = self._do_request("GET", "/api/version")
        if r.status_code == 200:
            return r.json().get("version")
        else:
            self._raise_error(r)

View file

@ -1,17 +1,16 @@
import asyncio
import json
import os
import pprint
from subprocess import list2cmdline
import time
from typing import Optional, Tuple
import yaml
import click
from ray.autoscaler._private.cli_logger import add_click_logging_options, cli_logger, cf
from ray.job_submission import JobStatus, JobSubmissionClient
from ray.util.annotations import PublicAPI
from ray.dashboard.modules.dashboard_sdk import parse_runtime_env_args
def _get_sdk_client(
@ -144,25 +143,11 @@ def submit(
"""
client = _get_sdk_client(address, create_cluster_if_needed=True)
final_runtime_env = {}
if runtime_env is not None:
if runtime_env_json is not None:
raise ValueError(
"Only one of --runtime_env and " "--runtime-env-json can be provided."
)
with open(runtime_env, "r") as f:
final_runtime_env = yaml.safe_load(f)
elif runtime_env_json is not None:
final_runtime_env = json.loads(runtime_env_json)
if working_dir is not None:
if "working_dir" in final_runtime_env:
cli_logger.warning(
"Overriding runtime_env working_dir with --working-dir option"
)
final_runtime_env["working_dir"] = working_dir
final_runtime_env = parse_runtime_env_args(
runtime_env=runtime_env,
runtime_env_json=runtime_env_json,
working_dir=working_dir,
)
job_id = client.submit_job(
entrypoint=list2cmdline(entrypoint),

View file

@ -1,9 +1,6 @@
import dataclasses
import importlib
import logging
from pathlib import Path
import tempfile
from typing import Any, Dict, Iterator, List, Optional
from typing import Any, Dict, Iterator, Optional
try:
import aiohttp
@ -12,11 +9,6 @@ except ImportError:
aiohttp = None
requests = None
from ray._private.runtime_env.packaging import (
create_package,
get_uri_for_directory,
parse_uri,
)
from ray.dashboard.modules.job.common import (
JobStatus,
JobSubmitRequest,
@ -24,117 +16,16 @@ from ray.dashboard.modules.job.common import (
JobStopResponse,
JobInfo,
JobLogsResponse,
uri_to_http_components,
)
from ray.dashboard.modules.dashboard_sdk import SubmissionClient
from ray.ray_constants import DEFAULT_DASHBOARD_PORT
from ray.util.annotations import PublicAPI
from ray.client_builder import _split_address
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
@dataclasses.dataclass
class ClusterInfo:
address: str
cookies: Optional[Dict[str, Any]] = None
metadata: Optional[Dict[str, Any]] = None
headers: Optional[Dict[str, Any]] = None
def get_job_submission_client_cluster_info(
address: str,
# For backwards compatibility
*,
# only used in importlib case in parse_cluster_info, but needed
# in function signature.
create_cluster_if_needed: Optional[bool] = False,
cookies: Optional[Dict[str, Any]] = None,
metadata: Optional[Dict[str, Any]] = None,
headers: Optional[Dict[str, Any]] = None,
_use_tls: Optional[bool] = False,
) -> ClusterInfo:
"""Get address, cookies, and metadata used for JobSubmissionClient.
If no port is specified in `address`, the Ray dashboard default will be
inserted.
Args:
address (str): Address without the module prefix that is passed
to JobSubmissionClient.
create_cluster_if_needed (bool): Indicates whether the cluster
of the address returned needs to be running. Ray doesn't
start a cluster before interacting with jobs, but other
implementations may do so.
Returns:
ClusterInfo object consisting of address, cookies, and metadata
for JobSubmissionClient to use.
"""
scheme = "https" if _use_tls else "http"
split = address.split(":")
host = split[0]
if len(split) == 1:
port = DEFAULT_DASHBOARD_PORT
elif len(split) == 2:
port = int(split[1])
else:
raise ValueError(f"Invalid address: {address}.")
return ClusterInfo(
address=f"{scheme}://{host}:{port}",
cookies=cookies,
metadata=metadata,
headers=headers,
)
def parse_cluster_info(
address: str,
create_cluster_if_needed: bool = False,
cookies: Optional[Dict[str, Any]] = None,
metadata: Optional[Dict[str, Any]] = None,
headers: Optional[Dict[str, Any]] = None,
) -> ClusterInfo:
module_string, inner_address = _split_address(address)
# If user passes http(s):// or ray://, go through normal parsing.
if module_string in {"http", "https", "ray"}:
return get_job_submission_client_cluster_info(
inner_address,
create_cluster_if_needed=create_cluster_if_needed,
cookies=cookies,
metadata=metadata,
headers=headers,
_use_tls=module_string == "https",
)
# Try to dynamically import the function to get cluster info.
else:
try:
module = importlib.import_module(module_string)
except Exception:
raise RuntimeError(
f"Module: {module_string} does not exist.\n"
f"This module was parsed from Address: {address}"
) from None
assert "get_job_submission_client_cluster_info" in dir(module), (
f"Module: {module_string} does "
"not have `get_job_submission_client_cluster_info`."
)
return module.get_job_submission_client_cluster_info(
inner_address,
create_cluster_if_needed=create_cluster_if_needed,
cookies=cookies,
metadata=metadata,
headers=headers,
)
class JobSubmissionClient:
class JobSubmissionClient(SubmissionClient):
def __init__(
self,
address: str,
@ -148,140 +39,20 @@ class JobSubmissionClient:
"The Ray jobs CLI & SDK require the ray[default] "
"installation: `pip install 'ray[default']``"
)
cluster_info = parse_cluster_info(
address, create_cluster_if_needed, cookies, metadata, headers
super().__init__(
address=address,
create_cluster_if_needed=create_cluster_if_needed,
cookies=cookies,
metadata=metadata,
headers=headers,
)
self._address = cluster_info.address
self._cookies = cluster_info.cookies
self._default_metadata = cluster_info.metadata or {}
# Headers used for all requests sent to job server, optional and only
# needed for cases like authentication to remote cluster.
self._headers = cluster_info.headers
self._check_connection_and_version()
def _check_connection_and_version(self):
try:
r = self._do_request("GET", "/api/version")
if r.status_code == 404:
raise RuntimeError(
"Jobs API not supported on the Ray cluster. "
"Please ensure the cluster is running "
"Ray 1.9 or higher."
)
r.raise_for_status()
# TODO(edoakes): check the version if/when we break compatibility.
except requests.exceptions.ConnectionError:
raise ConnectionError(
f"Failed to connect to Ray at address: {self._address}."
)
def _raise_error(self, r: "requests.Response"):
raise RuntimeError(
f"Request failed with status code {r.status_code}: {r.text}."
self._check_connection_and_version(
min_version="1.9",
version_error_message="Jobs API is not supported on the Ray "
"cluster. Please ensure the cluster is "
"running Ray 1.9 or higher.",
)
def _do_request(
self,
method: str,
endpoint: str,
*,
data: Optional[bytes] = None,
json_data: Optional[dict] = None,
) -> "requests.Response":
url = self._address + endpoint
logger.debug(f"Sending request to {url} with json data: {json_data or {}}.")
return requests.request(
method,
url,
cookies=self._cookies,
data=data,
json=json_data,
headers=self._headers,
)
def _package_exists(
self,
package_uri: str,
) -> bool:
protocol, package_name = uri_to_http_components(package_uri)
r = self._do_request("GET", f"/api/packages/{protocol}/{package_name}")
if r.status_code == 200:
logger.debug(f"Package {package_uri} already exists.")
return True
elif r.status_code == 404:
logger.debug(f"Package {package_uri} does not exist.")
return False
else:
self._raise_error(r)
def _upload_package(
self,
package_uri: str,
package_path: str,
include_parent_dir: Optional[bool] = False,
excludes: Optional[List[str]] = None,
) -> bool:
logger.info(f"Uploading package {package_uri}.")
with tempfile.TemporaryDirectory() as tmp_dir:
protocol, package_name = uri_to_http_components(package_uri)
package_file = Path(tmp_dir) / package_name
create_package(
package_path,
package_file,
include_parent_dir=include_parent_dir,
excludes=excludes,
)
try:
r = self._do_request(
"PUT",
f"/api/packages/{protocol}/{package_name}",
data=package_file.read_bytes(),
)
if r.status_code != 200:
self._raise_error(r)
finally:
package_file.unlink()
def _upload_package_if_needed(
self, package_path: str, excludes: Optional[List[str]] = None
) -> str:
package_uri = get_uri_for_directory(package_path, excludes=excludes)
if not self._package_exists(package_uri):
self._upload_package(package_uri, package_path, excludes=excludes)
else:
logger.info(f"Package {package_uri} already exists, skipping upload.")
return package_uri
def _upload_working_dir_if_needed(self, runtime_env: Dict[str, Any]):
if "working_dir" in runtime_env:
working_dir = runtime_env["working_dir"]
try:
parse_uri(working_dir)
is_uri = True
logger.debug("working_dir is already a valid URI.")
except ValueError:
is_uri = False
if not is_uri:
logger.debug("working_dir is not a URI, attempting to upload.")
package_uri = self._upload_package_if_needed(
working_dir, excludes=runtime_env.get("excludes", None)
)
runtime_env["working_dir"] = package_uri
@PublicAPI(stability="beta")
def get_version(self) -> str:
r = self._do_request("GET", "/api/version")
if r.status_code == 200:
return r.json().get("version")
else:
self._raise_error(r)
@PublicAPI(stability="beta")
def submit_job(
self,

View file

@ -11,7 +11,7 @@ from unittest.mock import patch
import ray
from ray.job_submission import JobSubmissionClient, JobStatus
from ray.dashboard.modules.job.common import CURRENT_VERSION, JobInfo
from ray.dashboard.modules.job.sdk import (
from ray.dashboard.modules.dashboard_sdk import (
ClusterInfo,
parse_cluster_info,
)

View file

@ -2,7 +2,7 @@ import pytest
from typing import Dict, Optional, Tuple
from unittest.mock import Mock, patch
from ray.dashboard.modules.job.sdk import parse_cluster_info
from ray.dashboard.modules.dashboard_sdk import parse_cluster_info
@pytest.mark.parametrize(
@ -25,12 +25,12 @@ def test_parse_cluster_info(
headers: Optional[Dict[str, str]],
):
"""
Test ray.dashboard.modules.job.sdk.parse_cluster_info for different
Test ray.dashboard.modules.dashboard_sdk.parse_cluster_info for different
format of addresses.
"""
mock_get_job_submission_client_cluster = Mock(return_value="Ray ClusterInfo")
mock_get_submission_client_cluster = Mock(return_value="Ray ClusterInfo")
mock_module = Mock()
mock_module.get_job_submission_client_cluster_info = Mock(
mock_module.get_submission_client_cluster_info = Mock(
return_value="Other module ClusterInfo"
)
mock_import_module = Mock(return_value=mock_module)
@ -38,8 +38,8 @@ def test_parse_cluster_info(
address, module_string, inner_address = address_param
with patch.multiple(
"ray.dashboard.modules.job.sdk",
get_job_submission_client_cluster_info=mock_get_job_submission_client_cluster,
"ray.dashboard.modules.dashboard_sdk",
get_submission_client_cluster_info=mock_get_submission_client_cluster,
), patch.multiple("importlib", import_module=mock_import_module):
if module_string == "ray":
assert (
@ -52,7 +52,7 @@ def test_parse_cluster_info(
)
== "Ray ClusterInfo"
)
mock_get_job_submission_client_cluster.assert_called_once_with(
mock_get_submission_client_cluster.assert_called_once_with(
inner_address,
create_cluster_if_needed=create_cluster_if_needed,
cookies=cookies,
@ -71,7 +71,7 @@ def test_parse_cluster_info(
== "Other module ClusterInfo"
)
mock_import_module.assert_called_once_with(module_string)
mock_module.get_job_submission_client_cluster_info.assert_called_once_with(
mock_module.get_submission_client_cluster_info.assert_called_once_with(
inner_address,
create_cluster_if_needed=create_cluster_if_needed,
cookies=cookies,

View file

@ -0,0 +1,70 @@
from typing import Any, Dict, Optional, Union
try:
import aiohttp
import requests
except ImportError:
aiohttp = None
requests = None
from ray.dashboard.modules.dashboard_sdk import SubmissionClient
# REST API routes on the Ray dashboard for Serve application management.
# NOTE(review): DEPLOY_PATH, INFO_PATH, and DELETE_PATH are the same route —
# presumably the HTTP method (PUT/GET/DELETE) distinguishes the operations;
# confirm against the dashboard's Serve endpoint handlers.
DEPLOY_PATH = "/api/serve/deployments/"
INFO_PATH = "/api/serve/deployments/"
STATUS_PATH = "/api/serve/deployments/status"
DELETE_PATH = "/api/serve/deployments/"
class ServeSubmissionClient(SubmissionClient):
    """Client for managing Serve applications via the dashboard REST API.

    Requires `requests` (shipped with the ray[default] installation) and a
    cluster running Ray 1.12 or higher.
    """

    def __init__(
        self,
        address: str,
        create_cluster_if_needed=False,
        cookies: Optional[Dict[str, Any]] = None,
        metadata: Optional[Dict[str, Any]] = None,
        headers: Optional[Dict[str, Any]] = None,
    ):
        if requests is None:
            # Fixed quoting of the install hint (was `'ray[default']``).
            raise RuntimeError(
                "The Serve CLI requires the ray[default] "
                "installation: `pip install 'ray[default]'`"
            )
        super().__init__(
            address=address,
            create_cluster_if_needed=create_cluster_if_needed,
            cookies=cookies,
            metadata=metadata,
            headers=headers,
        )
        self._check_connection_and_version(
            min_version="1.12",
            version_error_message="Serve CLI is not supported on the Ray "
            "cluster. Please ensure the cluster is "
            "running Ray 1.12 or higher.",
        )

    def deploy_application(self, app_config: Dict) -> None:
        """PUT `app_config` to the cluster; raises on any non-200 response."""
        response = self._do_request("PUT", DEPLOY_PATH, json_data=app_config)
        if response.status_code != 200:
            self._raise_error(response)

    def get_info(self) -> Union[Dict, None]:
        """Return the running Serve application's configuration as a dict."""
        response = self._do_request("GET", INFO_PATH)
        if response.status_code == 200:
            return response.json()
        else:
            self._raise_error(response)

    def get_status(self) -> Union[Dict, None]:
        """Return the statuses of the Serve application's deployments."""
        response = self._do_request("GET", STATUS_PATH)
        if response.status_code == 200:
            return response.json()
        else:
            self._raise_error(response)

    def delete_application(self) -> None:
        """Delete the running Serve application; raises on non-200."""
        response = self._do_request("DELETE", DELETE_PATH)
        if response.status_code != 200:
            self._raise_error(response)

View file

@ -0,0 +1,5 @@
py_modules:
- "pm1"
- "pm2"
working_dir: "wd"

View file

@ -0,0 +1,64 @@
import pytest
import sys
import os
from ray.dashboard.modules.dashboard_sdk import parse_runtime_env_args
class TestParseRuntimeEnvArgs:
    """Unit tests for `parse_runtime_env_args` option-combination handling."""

    @pytest.mark.skipif(
        sys.platform == "win32", reason="File path incorrect on Windows."
    )
    def test_runtime_env_valid(self):
        """A YAML config file alone is loaded verbatim."""
        config_file_name = os.path.join(
            os.path.dirname(__file__), "test_config_files", "basic_runtime_env.yaml"
        )
        assert parse_runtime_env_args(runtime_env=config_file_name) == {
            "py_modules": ["pm1", "pm2"],
            "working_dir": "wd",
        }

    def test_runtime_env_json_valid(self):
        """A JSON string alone is parsed verbatim."""
        runtime_env = '{"py_modules": ["pm1", "pm2"], "working_dir": "wd"}'
        assert parse_runtime_env_args(runtime_env_json=runtime_env) == {
            "py_modules": ["pm1", "pm2"],
            "working_dir": "wd",
        }

    @pytest.mark.skipif(
        sys.platform == "win32", reason="File path incorrect on Windows."
    )
    def test_runtime_env_and_json(self):
        """Passing both --runtime-env and --runtime-env-json is rejected."""
        config_file_name = os.path.join(
            os.path.dirname(__file__), "test_config_files", "basic_runtime_env.yaml"
        )
        runtime_env_json = '{"py_modules": ["pm1", "pm2"], "working_dir": "wd"}'
        with pytest.raises(ValueError):
            parse_runtime_env_args(
                runtime_env=config_file_name, runtime_env_json=runtime_env_json
            )

    def test_working_dir_valid(self):
        """--working-dir alone produces a single-key runtime_env."""
        assert parse_runtime_env_args(working_dir="wd") == {"working_dir": "wd"}

    @pytest.mark.skipif(
        sys.platform == "win32", reason="File path incorrect on Windows."
    )
    def test_working_dir_override(self):
        """--working-dir overrides working_dir from either other option."""
        config_file_name = os.path.join(
            os.path.dirname(__file__), "test_config_files", "basic_runtime_env.yaml"
        )
        assert parse_runtime_env_args(
            runtime_env=config_file_name, working_dir="wd2"
        ) == {"py_modules": ["pm1", "pm2"], "working_dir": "wd2"}
        runtime_env = '{"py_modules": ["pm1", "pm2"], "working_dir": "wd2"}'
        assert parse_runtime_env_args(
            runtime_env_json=runtime_env, working_dir="wd2"
        ) == {"py_modules": ["pm1", "pm2"], "working_dir": "wd2"}

    def test_all_none(self):
        """No options at all yields an empty runtime_env."""
        assert parse_runtime_env_args() == {}
if __name__ == "__main__":
    # Allow running this test module directly with `python <file>`.
    sys.exit(pytest.main(["-v", __file__]))

View file

@ -269,7 +269,7 @@ py_test(
py_test(
name = "test_cli",
size = "medium",
size = "large",
srcs = serve_tests_srcs,
tags = ["exclusive", "team:serve"],
deps = [":serve_lib"],

View file

@ -4,7 +4,6 @@ import yaml
import os
import sys
import pathlib
import requests
import click
import time
from typing import Tuple, List, Dict
@ -25,20 +24,11 @@ from ray.dashboard.modules.serve.schema import (
schema_to_serve_application,
serve_application_status_to_schema,
)
from ray.dashboard.modules.dashboard_sdk import parse_runtime_env_args
from ray.dashboard.modules.serve.sdk import ServeSubmissionClient
from ray.autoscaler._private.cli_logger import cli_logger
def log_failed_request(response: requests.models.Response, address: str):
error_message = (
f"\nRequest to address {address} failed. Got response status code "
f"{response.status_code} with the following message:"
f"\n\n{response.text}"
)
cli_logger.newline()
cli_logger.error(error_message)
cli_logger.newline()
def process_args_and_kwargs(
args_and_kwargs: Tuple[str],
) -> Tuple[List[str], Dict[str, str]]:
@ -99,14 +89,33 @@ def process_args_and_kwargs(
return args, args_and_kwargs
def configure_runtime_env(deployment: Deployment, updates: Dict):
    """
    Overwrites deployment's runtime_env with fields in updates. Any fields in
    deployment's runtime_env that aren't in updates stay the same.
    """
    # NOTE(review): this branch writes the private `_ray_actor_options`
    # attribute directly — presumably `ray_actor_options` is a read-only
    # property; confirm against the Deployment class.
    if deployment.ray_actor_options is None:
        deployment._ray_actor_options = {"runtime_env": updates}
    elif "runtime_env" in deployment.ray_actor_options:
        # Merge: dict.update keeps existing keys that `updates` doesn't set.
        deployment.ray_actor_options["runtime_env"].update(updates)
    else:
        deployment.ray_actor_options["runtime_env"] = updates
@click.group(help="[EXPERIMENTAL] CLI for managing Serve instances on a Ray cluster.")
def cli():
pass
@cli.command(help="Start a detached Serve instance on the Ray cluster.")
@click.option(
"--address",
"-a",
default=os.environ.get("RAY_ADDRESS", "auto"),
required=False,
type=str,
help="Address of the running Ray cluster to connect to. " 'Defaults to "auto".',
help='Address of the running Ray cluster to connect to. Defaults to "auto".',
)
@click.option(
"--namespace",
@ -123,15 +132,6 @@ def process_args_and_kwargs(
type=str,
help=("Runtime environment dictionary to pass into ray.init. Defaults to empty."),
)
def cli(address, namespace, runtime_env_json):
ray.init(
address=address,
namespace=namespace,
runtime_env=json.loads(runtime_env_json),
)
@cli.command(help="Start a detached Serve instance on the Ray cluster.")
@click.option(
"--http-host",
default=DEFAULT_HTTP_HOST,
@ -160,7 +160,20 @@ def cli(address, namespace, runtime_env_json):
type=str,
hidden=True,
)
def start(http_host, http_port, http_location, checkpoint_path):
def start(
address,
namespace,
runtime_env_json,
http_host,
http_port,
http_location,
checkpoint_path,
):
ray.init(
address=address,
namespace=namespace,
runtime_env=json.loads(runtime_env_json),
)
serve.start(
detached=True,
http_options=dict(
@ -173,7 +186,27 @@ def start(http_host, http_port, http_location, checkpoint_path):
@cli.command(help="Shutdown the running Serve instance on the Ray cluster.")
def shutdown():
@click.option(
"--address",
"-a",
default=os.environ.get("RAY_ADDRESS", "auto"),
required=False,
type=str,
help='Address of the running Ray cluster to connect to. Defaults to "auto".',
)
@click.option(
"--namespace",
"-n",
default="serve",
required=False,
type=str,
help='Ray namespace to connect to. Defaults to "serve".',
)
def shutdown(address: str, namespace: str):
ray.init(
address=address,
namespace=namespace,
)
serve.api._connect()
serve.shutdown()
@ -188,6 +221,29 @@ class may or may not be decorated with ``@serve.deployment``.
hidden=True,
)
@click.argument("deployment")
@click.option(
"--address",
"-a",
default=os.environ.get("RAY_ADDRESS", "auto"),
required=False,
type=str,
help='Address of the running Ray cluster to connect to. Defaults to "auto".',
)
@click.option(
"--namespace",
"-n",
default="serve",
required=False,
type=str,
help='Ray namespace to connect to. Defaults to "serve".',
)
@click.option(
"--runtime-env-json",
default=r"{}",
required=False,
type=str,
help=("Runtime environment dictionary to pass into ray.init. Defaults to empty."),
)
@click.option(
"--options-json",
default=r"{}",
@ -195,7 +251,18 @@ class may or may not be decorated with ``@serve.deployment``.
type=str,
help="JSON string for the deployments options",
)
def create_deployment(deployment: str, options_json: str):
def create_deployment(
address: str,
namespace: str,
runtime_env_json: str,
deployment: str,
options_json: str,
):
ray.init(
address=address,
namespace=namespace,
runtime_env=json.loads(runtime_env_json),
)
deployment_cls = import_attr(deployment)
if not isinstance(deployment_cls, Deployment):
deployment_cls = serve.deployment(deployment_cls)
@ -204,11 +271,17 @@ def create_deployment(deployment: str, options_json: str):
@cli.command(
help="""
[Experimental] Deploy a YAML configuration file via REST API to
your Serve cluster.
""",
hidden=True,
short_help="[Experimental] Deploy deployments from a YAML config file.",
help=(
"Deploys deployment(s) from CONFIG_OR_IMPORT_PATH, which must be either a "
"Serve YAML configuration file path or an import path to "
"a class or function to deploy.\n\n"
"Import paths must be of the form "
'"module.submodule_1...submodule_n.MyClassOrFunction".\n\n'
"Sends a nonblocking request. A successful response only indicates that the "
"request was received successfully. It does not mean the deployments are "
"live. Use `serve info` and `serve status` to check on them. "
),
)
@click.argument("config_file_name")
@click.option(
@ -220,51 +293,111 @@ def create_deployment(deployment: str, options_json: str):
help='Address of the Ray dashboard to query. For example, "http://localhost:8265".',
)
def deploy(config_file_name: str, address: str):
full_address_path = f"{address}/api/serve/deployments/"
with open(config_file_name, "r") as config_file:
config = yaml.safe_load(config_file)
# Generate a schema using the config to ensure its format is valid
# Schematize config to validate format
ServeApplicationSchema.parse_obj(config)
response = requests.put(full_address_path, json=config)
ServeSubmissionClient(address).deploy_application(config)
if response.status_code == 200:
cli_logger.newline()
cli_logger.success(
"\nSent deploy request successfully!\n "
"* Use `serve status` to check your deployments' statuses.\n "
"* Use `serve info` to see your running Serve "
"application's configuration.\n"
)
cli_logger.newline()
else:
log_failed_request(response, address)
cli_logger.newline()
cli_logger.success(
"\nSent deploy request successfully!\n "
"* Use `serve status` to check your deployments' statuses.\n "
"* Use `serve info` to see your running Serve "
"application's configuration.\n"
)
cli_logger.newline()
@cli.command(
help="[Experimental] Run deployments via Serve's Python API.",
hidden=True,
short_help="[Experimental] Run deployments via Serve's Python API.",
help=(
"Deploys deployment(s) from CONFIG_OR_IMPORT_PATH, which must be either a "
"Serve YAML configuration file path or an import path to "
"a class or function to deploy.\n\n"
"The full command must be of the form:\n"
'"serve run [import path] [optional parameters] -- [arg-1] ... [arg-n] '
'[kwarg-1]=[kwval-1] ... [kwarg-n]=[kwval-n]"\n\n'
"Deployments via import path may also take in init_args and "
"init_kwargs from any ARGS_AND_KWARGS passed in. Import paths must be "
"of the form:\n"
'"module.submodule_1...submodule_n.MyClassOrFunction".\n\n'
"Blocks after deploying, and logs status periodically. After being killed, "
"this command tears down all deployments it deployed. If there are no "
"deployments left, it also tears down the Serve application."
),
)
@click.argument("config_or_import_path")
@click.argument("args_and_kwargs", required=False, nargs=-1)
@click.option(
"--runtime-env",
type=str,
default=None,
required=False,
help="Path to a local YAML file containing a runtime_env definition. "
"Overrides all runtime_envs specified in a config file.",
)
@click.option(
"--runtime-env-json",
type=str,
default=None,
required=False,
help="JSON-serialized runtime_env dictionary. Overrides all runtime_envs "
"specified in a config file.",
)
@click.option(
"--working-dir",
type=str,
default=None,
required=False,
help=(
"Directory containing files that your job will run in. Can be a "
"local directory or a remote URI to a .zip file (S3, GS, HTTP). "
"This overrides the working_dir in --runtime-env if both are "
"specified. Overrides all working_dirs specified in a config file."
),
)
@click.option(
"--cluster-address",
"-c",
default="auto",
required=False,
type=str,
help=('Address of the Ray cluster to query. Defaults to "auto".'),
)
@click.option(
"--dashboard-address",
"-d",
default="http://localhost:8265",
required=False,
type=str,
help=(
'Address of the Ray dashboard to query. Defaults to "http://localhost:8265".'
),
)
def run(
config_or_import_path: str,
args_and_kwargs: Tuple[str],
runtime_env: str,
runtime_env_json: str,
working_dir: str,
cluster_address: str,
dashboard_address: str,
):
"""
Deploys deployment(s) from CONFIG_OR_IMPORT_PATH, which must be either a
Serve YAML configuration file path or an import path to
a class or function to deploy. Import paths must be of the form
"module.submodule_1...submodule_n.MyClassOrFunction".
"""
try:
# Check if path provided is for config or import
deployments = []
is_config = pathlib.Path(config_or_import_path).is_file()
args, kwargs = process_args_and_kwargs(args_and_kwargs)
runtime_env_updates = parse_runtime_env_args(
runtime_env=runtime_env,
runtime_env_json=runtime_env_json,
working_dir=working_dir,
)
if is_config:
config_path = config_or_import_path
@ -285,7 +418,14 @@ def run(
schematized_config = ServeApplicationSchema.parse_obj(config)
deployments = schema_to_serve_application(schematized_config)
ray.init(address=cluster_address, namespace="serve")
serve.start(detached=True)
ServeSubmissionClient(dashboard_address)._upload_working_dir_if_needed(
runtime_env_updates
)
for deployment in deployments:
configure_runtime_env(deployment, runtime_env_updates)
deploy_group(deployments)
cli_logger.newline()
@ -310,7 +450,13 @@ def run(
deployment = serve.deployment(name=deployment_name)(import_path)
deployments = [deployment]
ray.init(address=cluster_address, namespace="serve")
serve.start(detached=True)
ServeSubmissionClient(dashboard_address)._upload_working_dir_if_needed(
runtime_env_updates
)
configure_runtime_env(deployment, runtime_env_updates)
deployment.options(
init_args=args,
init_kwargs=kwargs,
@ -340,8 +486,11 @@ def run(
@cli.command(
help="[Experimental] Get info about your Serve application's config.",
hidden=True,
short_help="[Experimental] Get info about your Serve application's config.",
help=(
"Prints the configurations of all running deployments in the Serve "
"application."
),
)
@click.option(
"--address",
@ -351,18 +500,32 @@ def run(
type=str,
help='Address of the Ray dashboard to query. For example, "http://localhost:8265".',
)
def info(address: str):
    """Fetch the Serve application's deployment configs and print them as JSON.

    Args:
        address: Address of the Ray dashboard to query.
    """
    # Query the dashboard's Serve deployments endpoint directly over HTTP.
    url = f"{address}/api/serve/deployments/"
    resp = requests.get(url)
    if resp.status_code != 200:
        # Surface the failing request's details to the user.
        log_failed_request(resp, address)
    else:
        print(json.dumps(resp.json(), indent=4))
@click.option(
"--json_format",
"-j",
is_flag=True,
help="Print info as json. If omitted, info is printed as YAML.",
)
def info(address: str, json_format: bool = False):
    """Print the running Serve application's config as JSON or YAML.

    Fixes the original signature, which mistakenly used ``json_format=bool``
    (defaulting the parameter to the ``bool`` type object — which is truthy —
    instead of annotating it). Click always passes the flag explicitly, so the
    CLI behavior is unchanged, but direct callers now get a sane default.

    Args:
        address: Address of the Ray dashboard to query.
        json_format: If True, print the config as indented JSON; otherwise
            print it as YAML (the default).
    """
    app_info = ServeSubmissionClient(address).get_info()
    # get_info() may return None (e.g. nothing deployed); print nothing then.
    if app_info is not None:
        if json_format:
            print(json.dumps(app_info, indent=4))
        else:
            print(yaml.dump(app_info))
@cli.command(
help="[Experimental] Get your Serve application's status.",
hidden=True,
short_help="[Experimental] Get your Serve application's status.",
help=(
"Prints status information about all deployments in the Serve application.\n\n"
"Deployments may be:\n\n"
"- HEALTHY: all replicas are acting normally and passing their health checks.\n"
"- UNHEALTHY: at least one replica is not acting normally and may not be "
"passing its health check.\n"
"- UPDATING: the deployment is updating."
),
)
@click.option(
"--address",
@ -373,17 +536,17 @@ def info(address: str):
help='Address of the Ray dashboard to query. For example, "http://localhost:8265".',
)
def status(address: str):
    # Prints status info for all deployments in the Serve application.
    # NOTE(review): this body appears to contain BOTH the old HTTP-based
    # implementation (direct requests.get against the dashboard endpoint)
    # AND the new ServeSubmissionClient-based one, back to back — it looks
    # like merged diff residue. As written, the status endpoint is queried
    # twice and may be printed twice; confirm only the ServeSubmissionClient
    # path should remain.
    full_address_path = f"{address}/api/serve/deployments/status"
    response = requests.get(full_address_path)
    if response.status_code == 200:
        print(json.dumps(response.json(), indent=4))
    else:
        log_failed_request(response, address)
    # New-style path: fetch status via the submission client and print it.
    app_status = ServeSubmissionClient(address).get_status()
    if app_status is not None:
        print(json.dumps(app_status, indent=4))
@cli.command(
help="[Experimental] Get info about your Serve application's config.",
hidden=True,
short_help=(
"[EXPERIMENTAL] Deletes all running deployments in the Serve application."
),
help="Deletes all running deployments in the Serve application.",
)
@click.option(
"--address",
@ -395,6 +558,7 @@ def status(address: str):
)
@click.option("--yes", "-y", is_flag=True, help="Bypass confirmation prompt.")
def delete(address: str, yes: bool):
if not yes:
click.confirm(
f"\nThis will shutdown the Serve application at address "
@ -403,11 +567,8 @@ def delete(address: str, yes: bool):
abort=True,
)
full_address_path = f"{address}/api/serve/deployments/"
response = requests.delete(full_address_path)
if response.status_code == 200:
cli_logger.newline()
cli_logger.success("\nSent delete request successfully!\n")
cli_logger.newline()
else:
log_failed_request(response, address)
ServeSubmissionClient(address).delete_application()
cli_logger.newline()
cli_logger.success("\nSent delete request successfully!\n")
cli_logger.newline()

View file

@ -12,7 +12,7 @@ from ray import serve
from ray.tests.conftest import tmp_working_dir # noqa: F401, E501
from ray._private.test_utils import wait_for_condition
from ray.dashboard.optional_utils import RAY_INTERNAL_DASHBOARD_NAMESPACE
from ray.serve.scripts import process_args_and_kwargs
from ray.serve.scripts import process_args_and_kwargs, configure_runtime_env
def ping_endpoint(endpoint: str, params: str = ""):
@ -149,6 +149,72 @@ class TestProcessArgsAndKwargs:
assert kwargs == {}
class TestConfigureRuntimeEnv:
    """Unit tests for configure_runtime_env's merging of runtime_env dicts
    into a deployment's ray_actor_options."""

    @serve.deployment
    def f():
        pass

    @pytest.mark.parametrize("ray_actor_options", [None, {}])
    def test_empty_ray_actor_options(self, ray_actor_options):
        # With no pre-existing runtime_env, the updates are taken verbatim.
        env_updates = {
            "working_dir": "http://test.com",
            "pip": ["requests", "pendulum==2.1.2"],
        }
        d = TestConfigureRuntimeEnv.f.options(ray_actor_options=ray_actor_options)
        configure_runtime_env(d, env_updates)
        assert d.ray_actor_options["runtime_env"] == env_updates

    def test_overwrite_all_options(self):
        # Every key present in the update replaces the existing value.
        existing = {
            "working_dir": "http://test.com",
            "pip": ["requests", "pendulum==2.1.2"],
        }
        updates = {
            "working_dir": "http://new.com",
            "pip": [],
            "env_vars": {"test_var": "test"},
        }
        d = TestConfigureRuntimeEnv.f.options(
            ray_actor_options={"runtime_env": existing}
        )
        configure_runtime_env(d, updates)
        assert d.ray_actor_options["runtime_env"] == updates

    def test_overwrite_some_options(self):
        # Keys absent from the update (env_vars here) survive the merge.
        existing = {
            "working_dir": "http://new.com",
            "pip": [],
            "env_vars": {"test_var": "test"},
        }
        updates = {
            "working_dir": "http://test.com",
            "pip": ["requests", "pendulum==2.1.2"],
        }
        expected = {
            "working_dir": "http://test.com",
            "pip": ["requests", "pendulum==2.1.2"],
            "env_vars": {"test_var": "test"},
        }
        d = TestConfigureRuntimeEnv.f.options(
            ray_actor_options={"runtime_env": existing}
        )
        configure_runtime_env(d, updates)
        assert d.ray_actor_options["runtime_env"] == expected

    def test_overwrite_no_options(self):
        # An empty update dict leaves the existing runtime_env untouched.
        existing = {
            "working_dir": "http://test.com",
            "pip": ["requests", "pendulum==2.1.2"],
        }
        d = TestConfigureRuntimeEnv.f.options(
            ray_actor_options={"runtime_env": existing}
        )
        configure_runtime_env(d, {})
        assert d.ray_actor_options["runtime_env"] == existing
def test_start_shutdown(ray_start_stop):
with pytest.raises(subprocess.CalledProcessError):
subprocess.check_output(["serve", "shutdown"])
@ -159,10 +225,10 @@ def test_start_shutdown(ray_start_stop):
def test_start_shutdown_in_namespace(ray_start_stop):
with pytest.raises(subprocess.CalledProcessError):
subprocess.check_output(["serve", "-n", "test", "shutdown"])
subprocess.check_output(["serve", "shutdown", "-n", "test"])
subprocess.check_output(["serve", "-n", "test", "start"])
subprocess.check_output(["serve", "-n", "test", "shutdown"])
subprocess.check_output(["serve", "start", "-n", "test"])
subprocess.check_output(["serve", "shutdown", "-n", "test"])
class A:
@ -195,14 +261,14 @@ def test_create_deployment(ray_start_stop, tmp_working_dir, class_name): # noqa
subprocess.check_output(
[
"serve",
"create-deployment",
f"ray.serve.tests.test_cli.{class_name}",
"--runtime-env-json",
json.dumps(
{
"working_dir": tmp_working_dir,
}
),
"create-deployment",
f"ray.serve.tests.test_cli.{class_name}",
"--options-json",
json.dumps(
{
@ -334,7 +400,7 @@ def test_info(ray_start_stop):
deploy_response = subprocess.check_output(["serve", "deploy", config_file_name])
assert success_message_fragment in deploy_response
info_response = subprocess.check_output(["serve", "info"]).decode("utf-8")
info_response = subprocess.check_output(["serve", "info", "-j"]).decode("utf-8")
info = json.loads(info_response)
assert "deployments" in info
@ -404,7 +470,7 @@ def test_delete(ray_start_stop):
# Deploys a config file and deletes it
def get_num_deployments():
info_response = subprocess.check_output(["serve", "info"])
info_response = subprocess.check_output(["serve", "info", "-j"])
info = json.loads(info_response)
return len(info["deployments"])
@ -561,5 +627,81 @@ def test_run_simultaneous(ray_start_stop):
assert ping_endpoint("Macaw") == "connection error"
@pytest.mark.skipif(sys.platform == "win32", reason="File path incorrect on Windows.")
def test_run_runtime_env(ray_start_stop):
    """Tests `serve run` with runtime_envs specified via CLI options:
    local working_dir (import path and config file), remote working_dir,
    and --runtime-env-json overridden by --working-dir."""

    def _serve_run_and_check(command, endpoint, expected):
        # Launch `serve run`, wait until the endpoint responds as expected,
        # then tear the process down with SIGINT.
        proc = subprocess.Popen(command)
        wait_for_condition(lambda: ping_endpoint(endpoint) == expected, timeout=10)
        proc.send_signal(signal.SIGINT)
        proc.wait()

    # Use local working_dir with import path
    _serve_run_and_check(
        [
            "serve",
            "run",
            "test_cli.Macaw",
            "--working-dir",
            os.path.dirname(__file__),
            "--",
            "green",
            "--name=Molly",
        ],
        "Macaw",
        "Molly is green!",
    )

    # Use local working_dir with config file
    _serve_run_and_check(
        [
            "serve",
            "run",
            os.path.join(
                os.path.dirname(__file__), "test_config_files", "scarlet.yaml"
            ),
            "--working-dir",
            os.path.dirname(__file__),
        ],
        "Scarlet",
        "Scarlet is red!",
    )

    # Use remote working_dir
    _serve_run_and_check(
        [
            "serve",
            "run",
            "test_module.test.one",
            "--working-dir",
            "https://github.com/shrekris-anyscale/test_module/archive/HEAD.zip",
        ],
        "one",
        "2",
    )

    # Use runtime env; --working-dir overrides the bogus working_dir in the JSON
    _serve_run_and_check(
        [
            "serve",
            "run",
            os.path.join(
                os.path.dirname(__file__), "test_config_files", "fake_runtime_env.yaml"
            ),
            "--runtime-env-json",
            (
                '{"py_modules": ["https://github.com/shrekris-anyscale/'
                'test_deploy_group/archive/HEAD.zip"],'
                '"working_dir": "http://nonexistentlink-q490123950ni34t"}'
            ),
            "--working-dir",
            "https://github.com/shrekris-anyscale/test_module/archive/HEAD.zip",
        ],
        "one",
        "2",
    )
if __name__ == "__main__":
sys.exit(pytest.main(["-v", "-s", __file__]))

View file

@ -0,0 +1,21 @@
deployments:
- name: one
init_args: null
init_kwargs: null
import_path: "test_module.test.one"
num_replicas: 2
route_prefix: "/one"
max_concurrent_queries: null
user_config: null
autoscaling_config: null
graceful_shutdown_wait_loop_s: null
graceful_shutdown_timeout_s: null
health_check_period_s: null
health_check_timeout_s: null
ray_actor_options:
runtime_env:
py_modules:
- "https://fakemodule1.com"
- "https://fakemodule2.com"
working_dir: "https://fakewd1.com"

View file

@ -0,0 +1,18 @@
deployments:
- name: Macaw
init_args:
- "red"
init_kwargs:
name: "Scarlet"
import_path: "test_cli.Macaw"
num_replicas: 1
route_prefix: "/Scarlet"
max_concurrent_queries: null
user_config: null
autoscaling_config: null
graceful_shutdown_wait_loop_s: null
graceful_shutdown_timeout_s: null
health_check_period_s: null
health_check_timeout_s: null
ray_actor_options: null