Add `serve run`, `serve status`, and `serve delete` CLI commands, with tests.

`serve run` deploys from a Serve YAML config file or an import path and then
prints the deployment statuses every 10 seconds until interrupted.
`serve status` and `serve delete` send GET / DELETE requests to the
dashboard's /api/serve/deployments endpoints.

diff --git a/python/ray/serve/scripts.py b/python/ray/serve/scripts.py
index 7fc56aa40..add9674c8 100644
--- a/python/ray/serve/scripts.py
+++ b/python/ray/serve/scripts.py
@@ -2,11 +2,14 @@
 import json
 import yaml
 import os
+import sys
+import pathlib
 import requests
 import click
+import time
 
 import ray
-from ray.serve.api import Deployment
+from ray.serve.api import Deployment, deploy_group, get_deployment_statuses
 from ray.serve.config import DeploymentMode
 from ray._private.utils import import_attr
 from ray import serve
@@ -15,7 +18,11 @@ from ray.serve.constants import (
     DEFAULT_HTTP_HOST,
     DEFAULT_HTTP_PORT,
 )
-from ray.dashboard.modules.serve.schema import ServeApplicationSchema
+from ray.dashboard.modules.serve.schema import (
+    ServeApplicationSchema,
+    schema_to_serve_application,
+    serve_application_status_to_schema,
+)
 from ray.autoscaler._private.cli_logger import cli_logger
 
 
@@ -52,9 +59,7 @@ def log_failed_request(response: requests.models.Response, address: str):
     default=r"{}",
     required=False,
     type=str,
-    help=(
-        "Runtime environment dictionary to pass into ray.init. " "Defaults to empty."
-    ),
+    help=("Runtime environment dictionary to pass into ray.init. Defaults to empty."),
 )
 def cli(address, namespace, runtime_env_json):
     ray.init(
@@ -176,6 +181,91 @@ def deploy(config_file_name: str, address: str):
         log_failed_request(response, address)
 
 
+@cli.command(
+    help="[Experimental] Run deployments via Serve's Python API.",
+    hidden=True,
+)
+@click.argument("config_or_import_path")
+@click.option(
+    "--config_or_import_path",
+    default=None,
+    required=False,
+    type=str,
+    help="Either a Serve YAML configuration file path or an import path to "
+    "a class or function to deploy. Import paths must be of the form "
+    '"module.submodule_1...submodule_n.MyClassOrFunction".',
+)
+@click.option(
+    "--address",
+    "-a",
+    default=None,
+    required=False,
+    type=str,
+    help="Address of the running Ray cluster to connect to. " 'Defaults to "auto".',
+)
+def run(config_or_import_path: str, address: str):
+    """
+    Deploys deployment(s) from CONFIG_OR_IMPORT_PATH, which must be either a
+    Serve YAML configuration file path or an import path to
+    a class or function to deploy. Import paths must be of the form
+    "module.submodule_1...submodule_n.MyClassOrFunction".
+    """
+
+    try:
+        # Check if path provided is for config or import
+        is_config = pathlib.Path(config_or_import_path).is_file()
+
+        if address is not None:
+            ray.init(address=address, namespace="serve")
+        serve.start()
+
+        if is_config:
+            cli_logger.print(
+                "Deploying application in config file at " f"{config_or_import_path}."
+            )
+            with open(config_or_import_path, "r") as config_file:
+                config = yaml.safe_load(config_file)
+
+            schematized_config = ServeApplicationSchema.parse_obj(config)
+            deployments = schema_to_serve_application(schematized_config)
+            deploy_group(deployments)
+
+            cli_logger.newline()
+            cli_logger.success(
+                f'\nDeployments from config file at "{config_or_import_path}" '
+                "deployed successfully!\n"
+            )
+            cli_logger.newline()
+
+        if not is_config:
+            cli_logger.print(
+                "Deploying function or class imported from " f"{config_or_import_path}."
+            )
+            func_or_class = import_attr(config_or_import_path)
+            if not isinstance(func_or_class, Deployment):
+                func_or_class = serve.deployment(func_or_class)
+            func_or_class.deploy()
+
+            cli_logger.newline()
+            cli_logger.print(
+                f"\nDeployed import at {config_or_import_path} successfully!\n"
+            )
+            cli_logger.newline()
+
+        while True:
+            statuses = serve_application_status_to_schema(
+                get_deployment_statuses()
+            ).json(indent=4)
+            cli_logger.newline()
+            cli_logger.print(f"\n{statuses}", no_format=True)
+            cli_logger.newline()
+            time.sleep(10)
+
+    except KeyboardInterrupt:
+        cli_logger.print("Got SIGINT (KeyboardInterrupt). Shutting down Serve.")
+        sys.exit()
+
+
 @cli.command(
     help="[Experimental] Get info about your Serve application's config.",
     hidden=True,
@@ -195,3 +285,58 @@ def info(address: str):
         print(json.dumps(response.json(), indent=4))
     else:
         log_failed_request(response, address)
+
+
+@cli.command(
+    help="[Experimental] Get your Serve application's status.",
+    hidden=True,
+)
+@click.option(
+    "--address",
+    "-a",
+    default=os.environ.get("RAY_ADDRESS", "http://localhost:8265"),
+    required=False,
+    type=str,
+    help='Address of the Ray dashboard to query. For example, "http://localhost:8265".',
+)
+def status(address: str):
+    """Print the deployment statuses reported by the dashboard at ADDRESS."""
+    full_address_path = f"{address}/api/serve/deployments/status"
+    response = requests.get(full_address_path)
+    if response.status_code == 200:
+        print(json.dumps(response.json(), indent=4))
+    else:
+        log_failed_request(response, address)
+
+
+@cli.command(
+    help="[Experimental] Shut down your Serve application and delete its deployments.",
+    hidden=True,
+)
+@click.option(
+    "--address",
+    "-a",
+    default=os.environ.get("RAY_ADDRESS", "http://localhost:8265"),
+    required=False,
+    type=str,
+    help='Address of the Ray dashboard to query. For example, "http://localhost:8265".',
+)
+@click.option("--yes", "-y", is_flag=True, help="Bypass confirmation prompt.")
+def delete(address: str, yes: bool):
+    """Delete all deployments via the dashboard at ADDRESS; prompts unless --yes."""
+    if not yes:
+        click.confirm(
+            f"\nThis will shutdown the Serve application at address "
+            f'"{address}" and delete all deployments there. Do you '
+            "want to continue?",
+            abort=True,
+        )
+
+    full_address_path = f"{address}/api/serve/deployments/"
+    response = requests.delete(full_address_path)
+    if response.status_code == 200:
+        cli_logger.newline()
+        cli_logger.success("\nSent delete request successfully!\n")
+        cli_logger.newline()
+    else:
+        log_failed_request(response, address)
diff --git a/python/ray/serve/tests/test_cli.py b/python/ray/serve/tests/test_cli.py
index 3891e1454..95d7418fe 100644
--- a/python/ray/serve/tests/test_cli.py
+++ b/python/ray/serve/tests/test_cli.py
@@ -3,13 +3,14 @@ import os
 from pathlib import Path
 import subprocess
 import sys
-
+import signal
 import pytest
 import requests
 
 import ray
 from ray import serve
 from ray.tests.conftest import tmp_working_dir  # noqa: F401, E501
+from ray._private.test_utils import wait_for_condition
 from ray.dashboard.optional_utils import RAY_INTERNAL_DASHBOARD_NAMESPACE
 
 
@@ -164,6 +165,8 @@ def test_deploy(ray_start_stop):
             == deployment_config["response"]
         )
 
+    ray.shutdown()
+
 
 @pytest.mark.skipif(sys.platform == "win32", reason="File path incorrect on Windows.")
 def test_info(ray_start_stop):
@@ -222,5 +225,88 @@ def test_info(ray_start_stop):
     )
 
 
+@pytest.mark.skipif(sys.platform == "win32", reason="File path incorrect on Windows.")
+def test_status(ray_start_stop):
+    # Deploys a config file and checks its status
+
+    config_file_name = os.path.join(
+        os.path.dirname(__file__), "test_config_files", "three_deployments.yaml"
+    )
+
+    subprocess.check_output(["serve", "deploy", config_file_name])
+    status_response = subprocess.check_output(["serve", "status"])
+    statuses = json.loads(status_response)["statuses"]
+
+    expected_deployments = {"shallow", "deep", "one"}
+    for status in statuses:
+        expected_deployments.remove(status["name"])
+        assert status["status"] in {"HEALTHY", "UPDATING"}
+        assert "message" in status
+    assert len(expected_deployments) == 0
+
+
+@pytest.mark.skipif(sys.platform == "win32", reason="File path incorrect on Windows.")
+def test_delete(ray_start_stop):
+    # Deploys a config file and deletes it
+
+    def get_num_deployments():
+        info_response = subprocess.check_output(["serve", "info"])
+        info = json.loads(info_response)
+        return len(info["deployments"])
+
+    config_file_name = os.path.join(
+        os.path.dirname(__file__), "test_config_files", "two_deployments.yaml"
+    )
+
+    # Check idempotence
+    for _ in range(2):
+        subprocess.check_output(["serve", "deploy", config_file_name])
+        wait_for_condition(lambda: get_num_deployments() == 2, timeout=35)
+
+        subprocess.check_output(["serve", "delete", "-y"])
+        wait_for_condition(lambda: get_num_deployments() == 0, timeout=35)
+
+
+def parrot(request):
+    return request.query_params["sound"]
+
+
+@pytest.mark.skipif(sys.platform == "win32", reason="File path incorrect on Windows.")
+def test_run(ray_start_stop):
+    # Deploys valid config file and import path via serve run
+
+    def ping_endpoint(endpoint: str, params: str = ""):
+        try:
+            return requests.get(f"http://localhost:8000/{endpoint}{params}").text
+        except requests.exceptions.ConnectionError:
+            return "connection error"
+
+    # Deploy via config file
+    config_file_name = os.path.join(
+        os.path.dirname(__file__), "test_config_files", "two_deployments.yaml"
+    )
+
+    p = subprocess.Popen(["serve", "run", config_file_name])
+    wait_for_condition(lambda: ping_endpoint("one") == "2", timeout=10)
+    wait_for_condition(
+        lambda: ping_endpoint("shallow") == "Hello shallow world!", timeout=10
+    )
+
+    p.send_signal(signal.SIGINT)  # Equivalent to ctrl-C
+    p.wait()
+    assert ping_endpoint("one") == "connection error"
+    assert ping_endpoint("shallow") == "connection error"
+
+    # Deploy via import path
+    p = subprocess.Popen(["serve", "run", "ray.serve.tests.test_cli.parrot"])
+    wait_for_condition(
+        lambda: ping_endpoint("parrot", params="?sound=squawk") == "squawk", timeout=10
+    )
+
+    p.send_signal(signal.SIGINT)  # Equivalent to ctrl-C
+    p.wait()
+    assert ping_endpoint("parrot", params="?sound=squawk") == "connection error"
+
+
 if __name__ == "__main__":
     sys.exit(pytest.main(["-v", "-s", __file__]))