Revert "Restore "[Serve] Deploy Serve deployment graphs via REST API" (#25073) (#25333)"

This reverts commit 16bdfe6a39.
This commit is contained in:
shrekris-anyscale 2022-06-03 12:03:39 -07:00 committed by GitHub
parent 2e05b62236
commit d6db873da1
14 changed files with 91 additions and 549 deletions

View file

@ -56,19 +56,10 @@ class ServeHead(dashboard_utils.DashboardHeadModule):
@optional_utils.init_ray_and_catch_exceptions(connect_to_serve=True)
async def put_all_deployments(self, req: Request) -> Response:
from ray import serve
from ray.serve.context import get_global_client
from ray.serve.schema import ServeApplicationSchema
from ray.serve.application import Application
config = ServeApplicationSchema.parse_obj(await req.json())
if config.import_path is not None:
client = get_global_client(_override_controller_namespace="serve")
client.deploy_app(config)
else:
# TODO (shrekris-anyscale): Remove this conditional path
app = Application.from_dict(await req.json())
serve.run(app, _blocking=False)
app = Application.from_dict(await req.json())
serve.run(app, _blocking=False)
return Response()

View file

@ -153,39 +153,6 @@ def test_put_get_success(ray_start_stop):
)
def test_put_new_rest_api(ray_start_stop):
config = {
"import_path": "ray.serve.tests.test_config_files.pizza.serve_dag",
"deployments": [
{
"name": "Multiplier",
"user_config": {
"factor": 1,
},
},
{
"name": "Adder",
"user_config": {
"increment": 1,
},
},
],
}
put_response = requests.put(GET_OR_PUT_URL, json=config, timeout=30)
assert put_response.status_code == 200
wait_for_condition(
lambda: requests.post("http://localhost:8000/", json=["ADD", 2]).json()
== "3 pizzas please!",
timeout=30,
)
wait_for_condition(
lambda: requests.post("http://localhost:8000/", json=["MUL", 2]).json()
== "2 pizzas please!",
timeout=30,
)
def test_delete_success(ray_start_stop):
ray_actor_options = {
"runtime_env": {

View file

@ -399,7 +399,7 @@ def test_ordinal_encoder_no_encode_list():
"unique_values(B)": {"cold": 0, "hot": 1, "warm": 2},
"unique_values(C)": {1: 0, 5: 1, 10: 2},
"unique_values(D)": {
tuple(): 0,
(): 0,
("cold", "cold"): 1,
("hot", "warm", "cold"): 2,
("warm",): 3,
@ -475,7 +475,7 @@ def test_one_hot_encoder():
"unique_values(B)": {"cold": 0, "hot": 1, "warm": 2},
"unique_values(C)": {1: 0, 5: 1, 10: 2},
"unique_values(D)": {
tuple(): 0,
(): 0,
("cold", "cold"): 1,
("hot", "warm", "cold"): 2,
("warm",): 3,

View file

@ -28,7 +28,6 @@ from ray.serve.config import (
HTTPOptions,
ReplicaConfig,
)
from ray.serve.schema import ServeApplicationSchema
from ray.serve.constants import (
MAX_CACHED_HANDLES,
CLIENT_POLLING_INTERVAL_S,
@ -326,16 +325,6 @@ class ServeControllerClient:
)
self.delete_deployments(deployment_names_to_delete, blocking=_blocking)
@_ensure_connected
def deploy_app(self, config: ServeApplicationSchema) -> None:
ray.get(
self._controller.deploy_app.remote(
config.import_path,
config.runtime_env,
config.dict(by_alias=True, exclude_unset=True).get("deployments", []),
)
)
@_ensure_connected
def delete_deployments(self, names: Iterable[str], blocking: bool = True) -> None:
ray.get(self._controller.delete_deployments.remote(names))

View file

@ -136,7 +136,7 @@ class StatusOverview:
)
@classmethod
def from_proto(cls, proto: StatusOverviewProto) -> "StatusOverview":
def from_proto(cls, proto: StatusOverviewProto):
# Recreate Serve Application info
app_status = ApplicationStatusInfo.from_proto(proto.app_status)

View file

@ -3,16 +3,12 @@ from collections import defaultdict
from copy import copy
import json
import logging
import traceback
import time
import os
from typing import Dict, Iterable, List, Optional, Tuple, Any
import ray
from ray.types import ObjectRef
from ray.actor import ActorHandle
from ray._private.utils import import_attr
from ray.exceptions import RayTaskError
from ray.serve.autoscaling_metrics import InMemoryMetricsStore
from ray.serve.autoscaling_policy import BasicAutoscalingPolicy
@ -123,12 +119,6 @@ class ServeController:
_override_controller_namespace=_override_controller_namespace,
)
# Reference to Ray task executing most recent deployment request
self.config_deployment_request_ref: ObjectRef = None
# Unix timestamp of latest config deployment request. Defaults to 0.
self.deployment_timestamp = 0
# TODO(simon): move autoscaling related stuff into a manager.
self.autoscaling_metrics_store = InMemoryMetricsStore()
self.handle_metrics_store = InMemoryMetricsStore()
@ -418,36 +408,6 @@ class ServeController:
return [self.deploy(**args) for args in deployment_args_list]
def deploy_app(
self,
import_path: str,
runtime_env: str,
deployment_override_options: List[Dict],
) -> None:
"""Kicks off a task that deploys a Serve application.
Cancels any previous in-progress task that is deploying a Serve
application.
Args:
import_path: Serve deployment graph's import path
runtime_env: runtime_env to run the deployment graph in
deployment_override_options: All dictionaries should
contain argument-value options that can be passed directly
into a set_options() call. Overrides deployment options set
in the graph itself.
"""
if self.config_deployment_request_ref is not None:
ray.cancel(self.config_deployment_request_ref)
logger.debug("Canceled existing config deployment request.")
self.config_deployment_request_ref = run_graph.options(
runtime_env=runtime_env
).remote(import_path, deployment_override_options)
self.deployment_timestamp = time.time()
def delete_deployment(self, name: str):
self.endpoint_state.delete_endpoint(name)
return self.deployment_state_manager.delete_deployment(name)
@ -534,25 +494,12 @@ class ServeController:
)
return deployment_route_list.SerializeToString()
async def get_serve_status(self) -> bytes:
def get_serve_status(self) -> bytes:
# TODO (shrekris-anyscale): Replace defaults with actual REST API status
serve_app_status = ApplicationStatus.RUNNING
serve_app_message = ""
deployment_timestamp = self.deployment_timestamp
if self.config_deployment_request_ref:
finished, pending = ray.wait(
[self.config_deployment_request_ref], timeout=0
)
if pending:
serve_app_status = ApplicationStatus.DEPLOYING
else:
try:
await finished[0]
except RayTaskError:
serve_app_status = ApplicationStatus.DEPLOY_FAILED
serve_app_message = f"Deployment failed:\n{traceback.format_exc()}"
deployment_timestamp = time.time()
app_status = ApplicationStatusInfo(
serve_app_status, serve_app_message, deployment_timestamp
@ -565,23 +512,3 @@ class ServeController:
)
return status_info.to_proto().SerializeToString()
@ray.remote(max_calls=1)
def run_graph(import_path: str, deployment_override_options: List[Dict]):
"""Deploys a Serve application to the controller's Ray cluster."""
from ray import serve
from ray.serve.api import build
# Import and build the graph
graph = import_attr(import_path)
app = build(graph)
# Override options for each deployment
for options_dict in deployment_override_options:
name = options_dict["name"]
app.deployments[name].set_options(**options_dict)
# Run the graph locally on the cluster
serve.start(_override_controller_namespace="serve")
serve.run(app)

View file

@ -1,17 +1,13 @@
import logging
from typing import Dict, Any, Optional
from ray import cloudpickle
from ray.serve.common import EndpointInfo, EndpointTag
from ray.serve.constants import SERVE_LOGGER_NAME
from ray.serve.long_poll import LongPollNamespace
from ray.serve.storage.kv_store import KVStoreBase
from ray.serve.long_poll import LongPollHost
CHECKPOINT_KEY = "serve-endpoint-state-checkpoint"
logger = logging.getLogger(SERVE_LOGGER_NAME)
class EndpointState:
"""Manages all state for endpoints in the system.
@ -58,19 +54,15 @@ class EndpointState:
updated to match the given parameters. Calling this twice with the same
arguments is a no-op.
"""
if self._endpoints.get(endpoint) == endpoint_info:
return
existing_route_endpoint = self._get_endpoint_for_route(endpoint_info.route)
if existing_route_endpoint is not None and existing_route_endpoint != endpoint:
logger.warn(
f'route_prefix "{endpoint_info.route}" is currently '
f'registered to deployment "{existing_route_endpoint}". '
f'Re-registering route_prefix "{endpoint_info.route}" to '
f'deployment "{endpoint}".'
raise ValueError(
f"route_prefix '{endpoint_info.route}' is already registered."
)
del self._endpoints[existing_route_endpoint]
if endpoint in self._endpoints:
if self._endpoints[endpoint] == endpoint_info:
return
self._endpoints[endpoint] = endpoint_info

View file

@ -78,9 +78,7 @@ class RayActorOptionsSchema(BaseModel, extra=Extra.forbid):
return v
class DeploymentSchema(
BaseModel, extra=Extra.forbid, allow_population_by_field_name=True
):
class DeploymentSchema(BaseModel, extra=Extra.forbid):
name: str = Field(
..., description=("Globally-unique name identifying this deployment.")
)
@ -155,7 +153,6 @@ class DeploymentSchema(
"replicas; the number of replicas will be fixed at "
"num_replicas."
),
alias="_autoscaling_config",
)
graceful_shutdown_wait_loop_s: float = Field(
default=None,
@ -165,7 +162,6 @@ class DeploymentSchema(
"default if null."
),
ge=0,
alias="_graceful_shutdown_wait_loop_s",
)
graceful_shutdown_timeout_s: float = Field(
default=None,
@ -175,7 +171,6 @@ class DeploymentSchema(
"default if null."
),
ge=0,
alias="_graceful_shutdown_timeout_s",
)
health_check_period_s: float = Field(
default=None,
@ -184,7 +179,6 @@ class DeploymentSchema(
"replicas. Uses a default if null."
),
gt=0,
alias="_health_check_period_s",
)
health_check_timeout_s: float = Field(
default=None,
@ -194,12 +188,66 @@ class DeploymentSchema(
"unhealthy. Uses a default if null."
),
gt=0,
alias="_health_check_timeout_s",
)
ray_actor_options: RayActorOptionsSchema = Field(
default=None, description="Options set for each replica actor."
)
@root_validator
def application_sufficiently_specified(cls, values):
"""
Some application information, such as the path to the function or class
must be specified. Additionally, some attributes only work in specific
languages (e.g. init_args and init_kwargs make sense in Python but not
Java). Specifying attributes that belong to different languages is
invalid.
"""
# Ensure that an application path is set
application_paths = {"import_path"}
specified_path = None
for path in application_paths:
if path in values and values[path] is not None:
specified_path = path
if specified_path is None:
raise ValueError(
"A path to the application's class or function must be specified."
)
# Ensure that only attributes belonging to the application path's
# language are specified.
# language_attributes contains all attributes in this schema related to
# the application's language
language_attributes = {"import_path", "init_args", "init_kwargs"}
# corresponding_attributes maps application_path attributes to all the
# attributes that may be set in that path's language
corresponding_attributes = {
# Python
"import_path": {"import_path", "init_args", "init_kwargs"}
}
possible_attributes = corresponding_attributes[specified_path]
for attribute in values:
if (
attribute not in possible_attributes
and attribute in language_attributes
):
raise ValueError(
f'Got "{values[specified_path]}" for '
f"{specified_path} and {values[attribute]} "
f"for {attribute}. {specified_path} and "
f"{attribute} do not belong to the same "
f"language and cannot be specified at the "
f"same time. Expected one of these to be "
f"null."
)
return values
@root_validator
def num_replicas_and_autoscaling_config_mutually_exclusive(cls, values):
if (
@ -297,10 +345,7 @@ class ServeApplicationSchema(BaseModel, extra=Extra.forbid):
"and py_modules may contain only remote URIs."
),
)
deployments: List[DeploymentSchema] = Field(
default=[],
description=("Deployment options that override options specified in the code."),
)
deployments: List[DeploymentSchema] = Field(...)
@validator("runtime_env")
def runtime_env_contains_remote_uris(cls, v):
@ -351,8 +396,6 @@ class ServeApplicationSchema(BaseModel, extra=Extra.forbid):
"import path may not start or end with a dot."
)
return v
class ServeStatusSchema(BaseModel, extra=Extra.forbid):
app_status: ApplicationStatusInfo = Field(

View file

@ -212,6 +212,16 @@ def test_user_config(serve_instance):
wait_for_condition(lambda: check("456", 3))
def test_reject_duplicate_route(serve_instance):
@serve.deployment(name="A", route_prefix="/api")
class A:
pass
A.deploy()
with pytest.raises(ValueError):
A.options(name="B").deploy()
def test_scaling_replicas(serve_instance):
@serve.deployment(name="counter", num_replicas=2)
class Counter:

View file

@ -1,87 +0,0 @@
from enum import Enum
from typing import List, Dict, TypeVar
import ray
from ray import serve
import starlette.requests
from ray.serve.drivers import DAGDriver
from ray.serve.deployment_graph import InputNode
RayHandleLike = TypeVar("RayHandleLike")
class Operation(str, Enum):
ADDITION = "ADD"
MULTIPLICATION = "MUL"
@serve.deployment(ray_actor_options={"num_cpus": 0.15})
class Router:
def __init__(self, multiplier: RayHandleLike, adder: RayHandleLike):
self.adder = adder
self.multiplier = multiplier
def route(self, op: Operation, input: int) -> int:
if op == Operation.ADDITION:
return ray.get(self.adder.add.remote(input))
elif op == Operation.MULTIPLICATION:
return ray.get(self.multiplier.multiply.remote(input))
@serve.deployment(
user_config={
"factor": 3,
},
ray_actor_options={"num_cpus": 0.15},
)
class Multiplier:
def __init__(self, factor: int):
self.factor = factor
def reconfigure(self, config: Dict):
self.factor = config.get("factor", -1)
def multiply(self, input_factor: int) -> int:
return input_factor * self.factor
@serve.deployment(
user_config={
"increment": 2,
},
ray_actor_options={"num_cpus": 0.15},
)
class Adder:
def __init__(self, increment: int):
self.increment = increment
def reconfigure(self, config: Dict):
self.increment = config.get("increment", -1)
def add(self, input: int) -> int:
return input + self.increment
@serve.deployment(ray_actor_options={"num_cpus": 0.15})
def create_order(amount: int) -> str:
return f"{amount} pizzas please!"
async def json_resolver(request: starlette.requests.Request) -> List:
return await request.json()
# Overwritten by user_config
ORIGINAL_INCREMENT = 1
ORIGINAL_FACTOR = 1
with InputNode() as inp:
operation, amount_input = inp[0], inp[1]
multiplier = Multiplier.bind(ORIGINAL_FACTOR)
adder = Adder.bind(ORIGINAL_INCREMENT)
router = Router.bind(multiplier, adder)
amount = router.route.bind(operation, amount_input)
order = create_order.bind(amount)
serve_dag = DAGDriver.bind(order, http_adapter=json_resolver)

View file

@ -1,20 +0,0 @@
from ray import serve
from ray.serve.deployment_graph import RayServeDAGHandle
@serve.deployment(ray_actor_options={"num_cpus": 0.1})
def f(*args):
return "wonderful world"
@serve.deployment(ray_actor_options={"num_cpus": 0.1})
class BasicDriver:
def __init__(self, dag: RayServeDAGHandle):
self.dag = dag
async def __call__(self):
return await self.dag.remote()
FNode = f.bind()
DagNode = BasicDriver.bind(FNode)

View file

@ -37,8 +37,9 @@ def test_path_validation(serve_instance):
D4.deploy()
# Allow duplicate route.
D4.options(name="test2").deploy()
# Reject duplicate route.
with pytest.raises(ValueError):
D4.options(name="test2").deploy()
def test_routes_endpoint(serve_instance):

View file

@ -272,8 +272,12 @@ class TestDeploymentSchema:
# Python requires an import path
deployment_schema = self.get_minimal_deployment_schema()
del deployment_schema["import_path"]
# DeploymentSchema should be generated with valid import_paths
with pytest.raises(ValueError, match="must be specified"):
DeploymentSchema.parse_obj(deployment_schema)
# DeploymentSchema should be generated once import_path is set
for path in get_valid_import_paths():
deployment_schema["import_path"] = path
DeploymentSchema.parse_obj(deployment_schema)
@ -500,67 +504,6 @@ class TestServeApplicationSchema:
with pytest.raises(ValidationError):
ServeApplicationSchema.parse_obj(serve_application_schema)
def test_serve_application_aliasing(self):
"""Check aliasing behavior for schemas."""
# Check that private options can optionally include underscore
app_dict = {
"import_path": "module.graph",
"runtime_env": {},
"deployments": [
{
"name": "d1",
"max_concurrent_queries": 3,
"autoscaling_config": {},
"_graceful_shutdown_wait_loop_s": 30,
"graceful_shutdown_timeout_s": 10,
"_health_check_period_s": 5,
"health_check_timeout_s": 7,
},
{
"name": "d2",
"max_concurrent_queries": 6,
"_autoscaling_config": {},
"graceful_shutdown_wait_loop_s": 50,
"_graceful_shutdown_timeout_s": 15,
"health_check_period_s": 53,
"_health_check_timeout_s": 73,
},
],
}
schema = ServeApplicationSchema.parse_obj(app_dict)
# Check that schema dictionary can include private options with an
# underscore (using the aliases)
private_options = {
"_autoscaling_config",
"_graceful_shutdown_wait_loop_s",
"_graceful_shutdown_timeout_s",
"_health_check_period_s",
"_health_check_timeout_s",
}
for deployment in schema.dict(by_alias=True)["deployments"]:
for option in private_options:
# Option with leading underscore
assert option in deployment
# Option without leading underscore
assert option[1:] not in deployment
# Check that schema dictionary can include private options without an
# underscore (using the field names)
for deployment in schema.dict()["deployments"]:
for option in private_options:
# Option without leading underscore
assert option[1:] in deployment
# Option with leading underscore
assert option not in deployment
class TestServeStatusSchema:
def get_valid_serve_status_schema(self):

View file

@ -1,10 +1,8 @@
from contextlib import contextmanager
import sys
import os
import subprocess
from tempfile import NamedTemporaryFile
import requests
from typing import Dict
import pytest
from ray.cluster_utils import AutoscalingCluster
@ -14,9 +12,6 @@ import ray
import ray.state
from ray import serve
from ray.serve.context import get_global_client
from ray.serve.schema import ServeApplicationSchema
from ray.serve.client import ServeControllerClient
from ray.serve.common import ApplicationStatus
from ray._private.test_utils import wait_for_condition
from ray.tests.conftest import call_ray_stop_only # noqa: F401
@ -30,25 +25,13 @@ def shutdown_ray():
ray.shutdown()
@contextmanager
@pytest.fixture
def start_and_shutdown_ray_cli():
subprocess.check_output(["ray", "start", "--head"])
yield
subprocess.check_output(["ray", "stop", "--force"])
@pytest.fixture(scope="function")
def start_and_shutdown_ray_cli_function():
with start_and_shutdown_ray_cli():
yield
@pytest.fixture(scope="class")
def start_and_shutdown_ray_cli_class():
with start_and_shutdown_ray_cli():
yield
def test_standalone_actor_outside_serve():
# https://github.com/ray-project/ray/issues/20066
@ -232,204 +215,7 @@ def test_get_serve_status(shutdown_ray):
ray.shutdown()
@pytest.mark.usefixtures("start_and_shutdown_ray_cli_class")
class TestDeployApp:
@pytest.fixture()
def client(self):
ray.init(address="auto", namespace="serve")
client = serve.start(detached=True)
yield client
serve.shutdown()
ray.shutdown()
def get_test_config(self) -> Dict:
return {"import_path": "ray.serve.tests.test_config_files.pizza.serve_dag"}
def test_deploy_app_basic(self, client: ServeControllerClient):
config = ServeApplicationSchema.parse_obj(self.get_test_config())
client.deploy_app(config)
wait_for_condition(
lambda: requests.post("http://localhost:8000/", json=["ADD", 2]).json()
== "4 pizzas please!"
)
wait_for_condition(
lambda: requests.post("http://localhost:8000/", json=["MUL", 3]).json()
== "9 pizzas please!"
)
def test_deploy_app_with_overriden_config(self, client: ServeControllerClient):
config = self.get_test_config()
config["deployments"] = [
{
"name": "Multiplier",
"user_config": {
"factor": 4,
},
},
{
"name": "Adder",
"user_config": {
"increment": 5,
},
},
]
client.deploy_app(ServeApplicationSchema.parse_obj(config))
wait_for_condition(
lambda: requests.post("http://localhost:8000/", json=["ADD", 0]).json()
== "5 pizzas please!"
)
wait_for_condition(
lambda: requests.post("http://localhost:8000/", json=["MUL", 2]).json()
== "8 pizzas please!"
)
def test_deploy_app_update_config(self, client: ServeControllerClient):
config = ServeApplicationSchema.parse_obj(self.get_test_config())
client.deploy_app(config)
wait_for_condition(
lambda: requests.post("http://localhost:8000/", json=["ADD", 2]).json()
== "4 pizzas please!"
)
config = self.get_test_config()
config["deployments"] = [
{
"name": "Adder",
"user_config": {
"increment": -1,
},
},
]
client.deploy_app(ServeApplicationSchema.parse_obj(config))
wait_for_condition(
lambda: requests.post("http://localhost:8000/", json=["ADD", 2]).json()
== "1 pizzas please!"
)
def test_deploy_app_update_num_replicas(self, client: ServeControllerClient):
config = ServeApplicationSchema.parse_obj(self.get_test_config())
client.deploy_app(config)
wait_for_condition(
lambda: requests.post("http://localhost:8000/", json=["ADD", 2]).json()
== "4 pizzas please!"
)
wait_for_condition(
lambda: requests.post("http://localhost:8000/", json=["MUL", 3]).json()
== "9 pizzas please!"
)
actors = ray.util.list_named_actors(all_namespaces=True)
config = self.get_test_config()
config["deployments"] = [
{
"name": "Adder",
"num_replicas": 2,
"user_config": {
"increment": 0,
},
"ray_actor_options": {"num_cpus": 0.1},
},
{
"name": "Multiplier",
"num_replicas": 3,
"user_config": {
"factor": 0,
},
"ray_actor_options": {"num_cpus": 0.1},
},
]
client.deploy_app(ServeApplicationSchema.parse_obj(config))
wait_for_condition(
lambda: requests.post("http://localhost:8000/", json=["ADD", 2]).json()
== "2 pizzas please!"
)
wait_for_condition(
lambda: requests.post("http://localhost:8000/", json=["MUL", 3]).json()
== "0 pizzas please!"
)
wait_for_condition(
lambda: client.get_serve_status().app_status.status
== ApplicationStatus.RUNNING,
timeout=15,
)
updated_actors = ray.util.list_named_actors(all_namespaces=True)
assert len(updated_actors) == len(actors) + 3
def test_deploy_app_update_timestamp(self, client: ServeControllerClient):
assert client.get_serve_status().app_status.deployment_timestamp == 0
config = ServeApplicationSchema.parse_obj(self.get_test_config())
client.deploy_app(config)
wait_for_condition(
lambda: client.get_serve_status().app_status.deployment_timestamp > 0
)
first_deploy_time = client.get_serve_status().app_status.deployment_timestamp
config = self.get_test_config()
config["deployments"] = [
{
"name": "Adder",
"num_replicas": 2,
},
]
client.deploy_app(ServeApplicationSchema.parse_obj(config))
wait_for_condition(
lambda: client.get_serve_status().app_status.deployment_timestamp
> first_deploy_time
)
assert client.get_serve_status().app_status.status in {
ApplicationStatus.DEPLOYING,
ApplicationStatus.RUNNING,
}
def test_deploy_app_overwrite_apps(self, client: ServeControllerClient):
"""Check that overwriting a live app with a new one works."""
# Launch first graph. Its driver's route_prefix should be "/".
test_config_1 = ServeApplicationSchema.parse_obj(
{
"import_path": "ray.serve.tests.test_config_files.world.DagNode",
}
)
client.deploy_app(test_config_1)
wait_for_condition(
lambda: requests.get("http://localhost:8000/").text == "wonderful world"
)
# Launch second graph. Its driver's route_prefix should also be "/".
# "/" should lead to the new driver.
test_config_2 = ServeApplicationSchema.parse_obj(
{
"import_path": "ray.serve.tests.test_config_files.pizza.serve_dag",
}
)
client.deploy_app(test_config_2)
wait_for_condition(
lambda: requests.post("http://localhost:8000/", json=["ADD", 2]).json()
== "4 pizzas please!"
)
def test_shutdown_remote(start_and_shutdown_ray_cli_function):
def test_shutdown_remote(start_and_shutdown_ray_cli):
"""Check that serve.shutdown() works on a remote Ray cluster."""
deploy_serve_script = (