[serve] Add run, delete, and status to Serve CLI (#22714)

This change adds `run`, `delete`, and `status` commands to the CLI introduced in #22648.
* `serve run`: Blocking command that allows users to deploy a YAML configuration or a class/function via import path. When terminated, the deployment(s) is torn down. Prints status info while running. Supports interactive development.
* `serve delete`: Shuts down a Serve application and deletes all its running deployments.
* `serve status`: Displays the status of a Serve application's deployments.
This commit is contained in:
shrekris-anyscale 2022-03-03 07:50:36 -08:00 committed by GitHub
parent 76dc4ccbfd
commit 71a493cf1f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 235 additions and 6 deletions

View file

@ -2,11 +2,14 @@
import json
import yaml
import os
import sys
import pathlib
import requests
import click
import time
import ray
from ray.serve.api import Deployment
from ray.serve.api import Deployment, deploy_group, get_deployment_statuses
from ray.serve.config import DeploymentMode
from ray._private.utils import import_attr
from ray import serve
@ -15,7 +18,11 @@ from ray.serve.constants import (
DEFAULT_HTTP_HOST,
DEFAULT_HTTP_PORT,
)
from ray.dashboard.modules.serve.schema import ServeApplicationSchema
from ray.dashboard.modules.serve.schema import (
ServeApplicationSchema,
schema_to_serve_application,
serve_application_status_to_schema,
)
from ray.autoscaler._private.cli_logger import cli_logger
@ -52,9 +59,7 @@ def log_failed_request(response: requests.models.Response, address: str):
default=r"{}",
required=False,
type=str,
help=(
"Runtime environment dictionary to pass into ray.init. " "Defaults to empty."
),
help=("Runtime environment dictionary to pass into ray.init. Defaults to empty."),
)
def cli(address, namespace, runtime_env_json):
ray.init(
@ -176,6 +181,91 @@ def deploy(config_file_name: str, address: str):
log_failed_request(response, address)
@cli.command(
help="[Experimental] Run deployments via Serve's Python API.",
hidden=True,
)
@click.argument("config_or_import_path")
@click.option(
"--config_or_import_path",
default=None,
required=False,
type=str,
help="Either a Serve YAML configuration file path or an import path to "
"a class or function to deploy. Import paths must be of the form "
'"module.submodule_1...submodule_n.MyClassOrFunction".',
)
@click.option(
"--address",
"-a",
default=None,
required=False,
type=str,
help="Address of the running Ray cluster to connect to. " 'Defaults to "auto".',
)
def run(config_or_import_path: str, address: str):
"""
Deploys deployment(s) from CONFIG_OR_IMPORT_PATH, which must be either a
Serve YAML configuration file path or an import path to
a class or function to deploy. Import paths must be of the form
"module.submodule_1...submodule_n.MyClassOrFunction".
"""
try:
# Check if path provided is for config or import
is_config = pathlib.Path(config_or_import_path).is_file()
if address is not None:
ray.init(address=address, namespace="serve")
serve.start()
if is_config:
cli_logger.print(
"Deploying application in config file at " f"{config_or_import_path}."
)
with open(config_or_import_path, "r") as config_file:
config = yaml.safe_load(config_file)
schematized_config = ServeApplicationSchema.parse_obj(config)
deployments = schema_to_serve_application(schematized_config)
deploy_group(deployments)
cli_logger.newline()
cli_logger.success(
f'\nDeployments from config file at "{config_or_import_path}" '
"deployed successfully!\n"
)
cli_logger.newline()
if not is_config:
cli_logger.print(
"Deploying function or class imported from " f"{config_or_import_path}."
)
func_or_class = import_attr(config_or_import_path)
if not isinstance(func_or_class, Deployment):
func_or_class = serve.deployment(func_or_class)
func_or_class.deploy()
cli_logger.newline()
cli_logger.print(
f"\nDeployed import at {config_or_import_path} successfully!\n"
)
cli_logger.newline()
while True:
statuses = serve_application_status_to_schema(
get_deployment_statuses()
).json(indent=4)
cli_logger.newline()
cli_logger.print(f"\n{statuses}", no_format=True)
cli_logger.newline()
time.sleep(10)
except KeyboardInterrupt:
cli_logger.print("Got SIGINT (KeyboardInterrupt). Shutting down Serve.")
sys.exit()
@cli.command(
help="[Experimental] Get info about your Serve application's config.",
hidden=True,
@ -195,3 +285,56 @@ def info(address: str):
print(json.dumps(response.json(), indent=4))
else:
log_failed_request(response, address)
@cli.command(
help="[Experimental] Get your Serve application's status.",
hidden=True,
)
@click.option(
"--address",
"-a",
default=os.environ.get("RAY_ADDRESS", "http://localhost:8265"),
required=False,
type=str,
help='Address of the Ray dashboard to query. For example, "http://localhost:8265".',
)
def status(address: str):
full_address_path = f"{address}/api/serve/deployments/status"
response = requests.get(full_address_path)
if response.status_code == 200:
print(json.dumps(response.json(), indent=4))
else:
log_failed_request(response, address)
@cli.command(
help="[Experimental] Get info about your Serve application's config.",
hidden=True,
)
@click.option(
"--address",
"-a",
default=os.environ.get("RAY_ADDRESS", "http://localhost:8265"),
required=False,
type=str,
help='Address of the Ray dashboard to query. For example, "http://localhost:8265".',
)
@click.option("--yes", "-y", is_flag=True, help="Bypass confirmation prompt.")
def delete(address: str, yes: bool):
if not yes:
click.confirm(
f"\nThis will shutdown the Serve application at address "
f'"{address}" and delete all deployments there. Do you '
"want to continue?",
abort=True,
)
full_address_path = f"{address}/api/serve/deployments/"
response = requests.delete(full_address_path)
if response.status_code == 200:
cli_logger.newline()
cli_logger.success("\nSent delete request successfully!\n")
cli_logger.newline()
else:
log_failed_request(response, address)

View file

@ -3,13 +3,14 @@ import os
from pathlib import Path
import subprocess
import sys
import signal
import pytest
import requests
import ray
from ray import serve
from ray.tests.conftest import tmp_working_dir # noqa: F401, E501
from ray._private.test_utils import wait_for_condition
from ray.dashboard.optional_utils import RAY_INTERNAL_DASHBOARD_NAMESPACE
@ -164,6 +165,8 @@ def test_deploy(ray_start_stop):
== deployment_config["response"]
)
ray.shutdown()
@pytest.mark.skipif(sys.platform == "win32", reason="File path incorrect on Windows.")
def test_info(ray_start_stop):
@ -222,5 +225,88 @@ def test_info(ray_start_stop):
)
@pytest.mark.skipif(sys.platform == "win32", reason="File path incorrect on Windows.")
def test_status(ray_start_stop):
# Deploys a config file and checks its status
config_file_name = os.path.join(
os.path.dirname(__file__), "test_config_files", "three_deployments.yaml"
)
subprocess.check_output(["serve", "deploy", config_file_name])
status_response = subprocess.check_output(["serve", "status"])
statuses = json.loads(status_response)["statuses"]
expected_deployments = {"shallow", "deep", "one"}
for status in statuses:
expected_deployments.remove(status["name"])
assert status["status"] in {"HEALTHY", "UPDATING"}
assert "message" in status
assert len(expected_deployments) == 0
@pytest.mark.skipif(sys.platform == "win32", reason="File path incorrect on Windows.")
def test_delete(ray_start_stop):
# Deploys a config file and deletes it
def get_num_deployments():
info_response = subprocess.check_output(["serve", "info"])
info = json.loads(info_response)
return len(info["deployments"])
config_file_name = os.path.join(
os.path.dirname(__file__), "test_config_files", "two_deployments.yaml"
)
# Check idempotence
for _ in range(2):
subprocess.check_output(["serve", "deploy", config_file_name])
wait_for_condition(lambda: get_num_deployments() == 2, timeout=35)
subprocess.check_output(["serve", "delete", "-y"])
wait_for_condition(lambda: get_num_deployments() == 0, timeout=35)
def parrot(request):
return request.query_params["sound"]
@pytest.mark.skipif(sys.platform == "win32", reason="File path incorrect on Windows.")
def test_run(ray_start_stop):
# Deploys valid config file and import path via serve run
def ping_endpoint(endpoint: str, params: str = ""):
try:
return requests.get(f"http://localhost:8000/{endpoint}{params}").text
except requests.exceptions.ConnectionError:
return "connection error"
# Deploy via config file
config_file_name = os.path.join(
os.path.dirname(__file__), "test_config_files", "two_deployments.yaml"
)
p = subprocess.Popen(["serve", "run", config_file_name])
wait_for_condition(lambda: ping_endpoint("one") == "2", timeout=10)
wait_for_condition(
lambda: ping_endpoint("shallow") == "Hello shallow world!", timeout=10
)
p.send_signal(signal.SIGINT) # Equivalent to ctrl-C
p.wait()
assert ping_endpoint("one") == "connection error"
assert ping_endpoint("shallow") == "connection error"
# Deploy via import path
p = subprocess.Popen(["serve", "run", "ray.serve.tests.test_cli.parrot"])
wait_for_condition(
lambda: ping_endpoint("parrot", params="?sound=squawk") == "squawk", timeout=10
)
p.send_signal(signal.SIGINT) # Equivalent to ctrl-C
p.wait()
assert ping_endpoint("parrot", params="?sound=squawk") == "connection error"
if __name__ == "__main__":
sys.exit(pytest.main(["-v", "-s", __file__]))