[serve] Implement experimental deploy_group API (#22039)

When the declarative API applies a code change to a group of deployments at once, it needs to deploy the updated deployments atomically. This ensures that any deployment that uses another deployment's handle inside its own __init__() function can access that handle regardless of deployment order. This change adds deploy_group() to the ServeController class, allowing it to deploy a list of deployments atomically. It also adds a new public API, serve.deploy_group(), that exposes the controller's functionality so atomic deployments can also be triggered from the Python API.

Closes #21873.
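
For reference, a minimal usage sketch of the new public API (the Echo and Forwarder deployments below are illustrative, not part of this change). It mirrors the handle-inside-__init__ scenario described above, which only works reliably when the whole group is deployed atomically:

from ray import serve

@serve.deployment
class Echo:
    def __call__(self, msg: str) -> str:
        return msg

@serve.deployment
class Forwarder:
    async def __init__(self):
        # Fetch Echo's handle at construction time; this is safe here because
        # the atomic group deploy registers both deployments together.
        self.echo = serve.get_deployment("Echo").get_handle()

    async def __call__(self, msg: str):
        return await self.echo.remote(msg)

serve.start()
# Deploy both deployments as one atomic group; blocks until both are ready.
serve.deploy_group([Echo, Forwarder])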
Author: shrekris-anyscale, 2022-02-04 16:12:14 -08:00 (committed by GitHub)
Parent: a692e7d05e
Commit: a61d974dd5
3 changed files with 353 additions and 65 deletions

python/ray/serve/api.py

@@ -8,7 +8,7 @@ import re
import time
from dataclasses import dataclass
from functools import wraps
from typing import Any, Callable, Dict, Optional, Tuple, Type, Union, overload
from typing import Any, Callable, Dict, Optional, Tuple, Type, Union, List, overload
from fastapi import APIRouter, FastAPI
from starlette.requests import Request
@@ -116,7 +116,7 @@ class Client:
def __init__(
self, controller: ActorHandle, controller_name: str, detached: bool = False
):
self._controller = controller
self._controller: ServeController = controller
self._controller_name = controller_name
self._detached = detached
self._shutdown = False
@@ -227,83 +227,77 @@ class Client:
url: Optional[str] = None,
_blocking: Optional[bool] = True,
) -> Optional[GoalId]:
if config is None:
config = {}
if ray_actor_options is None:
ray_actor_options = {}
curr_job_env = ray.get_runtime_context().runtime_env
if "runtime_env" in ray_actor_options:
ray_actor_options["runtime_env"].setdefault(
"working_dir", curr_job_env.get("working_dir")
)
else:
ray_actor_options["runtime_env"] = curr_job_env
replica_config = ReplicaConfig(
deployment_def,
controller_deploy_args = self.get_deploy_args(
name=name,
deployment_def=deployment_def,
init_args=init_args,
init_kwargs=init_kwargs,
ray_actor_options=ray_actor_options,
config=config,
version=version,
prev_version=prev_version,
route_prefix=route_prefix,
)
if isinstance(config, dict):
deployment_config = DeploymentConfig.parse_obj(config)
elif isinstance(config, DeploymentConfig):
deployment_config = config
else:
raise TypeError("config must be a DeploymentConfig or a dictionary.")
if (
deployment_config.autoscaling_config is not None
and deployment_config.max_concurrent_queries
< deployment_config.autoscaling_config.target_num_ongoing_requests_per_replica # noqa: E501
):
logger.warning(
"Autoscaling will never happen, "
"because 'max_concurrent_queries' is less than "
"'target_num_ongoing_requests_per_replica' now."
)
goal_id, updating = ray.get(
self._controller.deploy.remote(
name,
deployment_config.to_proto_bytes(),
replica_config,
version,
prev_version,
route_prefix,
ray.get_runtime_context().job_id,
)
self._controller.deploy.remote(**controller_deploy_args)
)
tag = f"component=serve deployment={name}"
if updating:
msg = f"Updating deployment '{name}'"
if version is not None:
msg += f" to version '{version}'"
logger.info(f"{msg}. {tag}")
else:
logger.info(
f"Deployment '{name}' is already at version "
f"'{version}', not updating. {tag}"
)
tag = self.log_deployment_update_status(name, version, updating)
if _blocking:
self._wait_for_goal(goal_id)
if url is not None:
url_part = f" at `{url}`"
else:
url_part = ""
logger.info(
f"Deployment '{name}{':'+version if version else ''}' is ready"
f"{url_part}. {tag}"
)
self.log_deployment_ready(name, version, url, tag)
else:
return goal_id
@_ensure_connected
def deploy_group(
self, deployments: List[Dict], _blocking: bool = True
) -> List[GoalId]:
deployment_args_list = []
for deployment in deployments:
deployment_args_list.append(
self.get_deploy_args(
deployment["name"],
deployment["func_or_class"],
deployment["init_args"],
deployment["init_kwargs"],
ray_actor_options=deployment["ray_actor_options"],
config=deployment["config"],
version=deployment["version"],
prev_version=deployment["prev_version"],
route_prefix=deployment["route_prefix"],
)
)
update_goals = ray.get(
self._controller.deploy_group.remote(deployment_args_list)
)
tags = []
for i in range(len(deployments)):
deployment = deployments[i]
name, version = deployment["name"], deployment["version"]
updating = update_goals[i][1]
tags.append(self.log_deployment_update_status(name, version, updating))
nonblocking_goal_ids = []
for i in range(len(deployments)):
deployment = deployments[i]
# Look up this deployment's name and version for status logging.
name, version = deployment["name"], deployment["version"]
url = deployment["url"]
goal_id = update_goals[i][0]
if _blocking:
self._wait_for_goal(goal_id)
self.log_deployment_ready(name, version, url, tags[i])
else:
nonblocking_goal_ids.append(goal_id)
return nonblocking_goal_ids
@_ensure_connected
def delete_deployment(self, name: str) -> None:
self._wait_for_goal(ray.get(self._controller.delete_deployment.remote(name)))
@@ -406,6 +400,104 @@ class Client:
return handle
@_ensure_connected
def get_deploy_args(
self,
name: str,
deployment_def: Union[Callable, Type[Callable], str],
init_args: Tuple[Any],
init_kwargs: Dict[Any, Any],
ray_actor_options: Optional[Dict] = None,
config: Optional[Union[DeploymentConfig, Dict[str, Any]]] = None,
version: Optional[str] = None,
prev_version: Optional[str] = None,
route_prefix: Optional[str] = None,
) -> Dict:
"""
Takes a deployment's configuration, and returns the arguments needed
for the controller to deploy it.
"""
if config is None:
config = {}
if ray_actor_options is None:
ray_actor_options = {}
curr_job_env = ray.get_runtime_context().runtime_env
if "runtime_env" in ray_actor_options:
ray_actor_options["runtime_env"].setdefault(
"working_dir", curr_job_env.get("working_dir")
)
else:
ray_actor_options["runtime_env"] = curr_job_env
replica_config = ReplicaConfig(
deployment_def,
init_args=init_args,
init_kwargs=init_kwargs,
ray_actor_options=ray_actor_options,
)
if isinstance(config, dict):
deployment_config = DeploymentConfig.parse_obj(config)
elif isinstance(config, DeploymentConfig):
deployment_config = config
else:
raise TypeError("config must be a DeploymentConfig or a dictionary.")
if (
deployment_config.autoscaling_config is not None
and deployment_config.max_concurrent_queries
< deployment_config.autoscaling_config.target_num_ongoing_requests_per_replica # noqa: E501
):
logger.warning(
"Autoscaling will never happen, "
"because 'max_concurrent_queries' is less than "
"'target_num_ongoing_requests_per_replica' now."
)
controller_deploy_args = {
"name": name,
"deployment_config_proto_bytes": deployment_config.to_proto_bytes(),
"replica_config": replica_config,
"version": version,
"prev_version": prev_version,
"route_prefix": route_prefix,
"deployer_job_id": ray.get_runtime_context().job_id,
}
return controller_deploy_args
@_ensure_connected
def log_deployment_update_status(
self, name: str, version: str, updating: bool
) -> str:
tag = f"component=serve deployment={name}"
if updating:
msg = f"Updating deployment '{name}'"
if version is not None:
msg += f" to version '{version}'"
logger.info(f"{msg}. {tag}")
else:
logger.info(
f"Deployment '{name}' is already at version "
f"'{version}', not updating. {tag}"
)
return tag
@_ensure_connected
def log_deployment_ready(self, name: str, version: str, url: str, tag: str) -> None:
if url is not None:
url_part = f" at `{url}`"
else:
url_part = ""
logger.info(
f"Deployment '{name}{':'+version if version else ''}' is ready"
f"{url_part}. {tag}"
)
def _check_http_and_checkpoint_options(
client: Client,
@@ -865,7 +957,7 @@ class Deployment:
@property
def init_kwargs(self) -> Dict[Any, Any]:
"""Keyword args passed to the underlying class's constructor."""
return self._init_args
return self._init_kwargs
@property
def url(self) -> Optional[str]:
@@ -1236,3 +1328,45 @@ def list_deployments() -> Dict[str, Deployment]:
)
return deployments
def deploy_group(deployments: List[Deployment], _blocking: bool = True) -> List[GoalId]:
"""
EXPERIMENTAL API
Takes in a list of Deployment objects and deploys them atomically.
Args:
deployments(List[Deployment]): a list of deployments to deploy.
_blocking(bool): whether to wait for the deployments to finish
deploying or not.
"""
if len(deployments) == 0:
return []
parameter_group = []
for deployment in deployments:
if not isinstance(deployment, Deployment):
raise TypeError(
f"deploy_group only accepts Deployments, but got unexpected "
f"type {type(deployment)}."
)
deployment_parameters = {
"name": deployment._name,
"func_or_class": deployment._func_or_class,
"init_args": deployment.init_args,
"init_kwargs": deployment.init_kwargs,
"ray_actor_options": deployment._ray_actor_options,
"config": deployment._config,
"version": deployment._version,
"prev_version": deployment._prev_version,
"route_prefix": deployment.route_prefix,
"url": deployment.url,
}
parameter_group.append(deployment_parameters)
return _get_global_client().deploy_group(parameter_group, _blocking=_blocking)
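As a usage note, when _blocking=False the call returns one GoalId per deployment instead of waiting. The tests below wait on those goals through the (internal) Serve client, roughly as in this sketch; the function deployments f and g are illustrative:

from ray import serve
from ray.serve.api import deploy_group, _get_global_client

@serve.deployment
def f():
    return "f"

@serve.deployment
def g():
    return "g"

serve.start()
goal_ids = deploy_group([f, g], _blocking=False)

# Wait for each deployment's update goal to finish (internal API, as used in the tests).
client = _get_global_client()
for goal_id in goal_ids:
    client._wait_for_goal(goal_id)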

python/ray/serve/controller.py

@@ -358,6 +358,22 @@ class ServeController:
return goal_id, updating
def deploy_group(
self, deployment_args_list: List[Dict]
) -> List[Tuple[Optional[GoalId], bool]]:
"""
Takes in a list of dictionaries that contain keyword arguments for the
controller's deploy() function. Calls deploy on all the argument
dictionaries in the list. Effectively executes an atomic deploy on a
group of deployments.
"""
update_goals: List[Tuple[Optional[GoalId], bool]] = []
for deployment_args in deployment_args_list:
update_goals.append(self.deploy(**deployment_args))
return update_goals
def delete_deployment(self, name: str) -> Optional[GoalId]:
self.endpoint_state.delete_endpoint(name)
return self.deployment_state_manager.delete_deployment(name)
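
For illustration, each element of deployment_args_list is the dictionary produced by Client.get_deploy_args in api.py. A sketch of one entry, with placeholder values that are not taken from the commit:

# Placeholder values for illustration only; the keys mirror Client.get_deploy_args.
example_deploy_args = {
    "name": "Echo",
    "deployment_config_proto_bytes": b"<serialized DeploymentConfig>",
    "replica_config": None,  # a ReplicaConfig instance in practice
    "version": "v1",
    "prev_version": None,
    "route_prefix": "/echo",
    "deployer_job_id": None,  # ray.get_runtime_context().job_id in practice
}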

python/ray/serve/tests/test_deploy.py

@@ -16,6 +16,8 @@ from ray import serve
from ray.serve.exceptions import RayServeException
from ray.serve.utils import get_random_letters
from ray.serve.api import deploy_group
@pytest.mark.parametrize("use_handle", [True, False])
def test_deploy(serve_instance, use_handle):
@@ -1193,6 +1195,142 @@ def test_http_proxy_request_cancellation(serve_instance):
assert requests.get(url).text == "2"
class TestDeployGroup:
@serve.deployment
def f():
return "f reached"
@serve.deployment
def g():
return "g reached"
@serve.deployment
class C:
async def __call__(self):
return "C reached"
@serve.deployment
class D:
async def __call__(self):
return "D reached"
def deploy_and_check_responses(
self, deployments, responses, blocking=True, client=None
):
"""
Helper function that deploys the list of deployments, calls them with
their handles, and checks whether they return the objects in responses.
If blocking is False, this function uses a non-blocking deploy and uses
the client to wait until the deployments finish deploying.
"""
goal_ids = deploy_group(deployments, _blocking=blocking)
if blocking:
assert len(goal_ids) == 0
else:
assert len(goal_ids) == len(deployments)
if client:
for id in goal_ids:
client._wait_for_goal(id)
for deployment, response in zip(deployments, responses):
assert ray.get(deployment.get_handle().remote()) == response
def test_basic_deploy_group(self, serve_instance):
"""
Atomically deploys a group of deployments, including both functions and
classes. Checks whether they deploy correctly.
"""
deployments = [self.f, self.g, self.C, self.D]
responses = ["f reached", "g reached", "C reached", "D reached"]
self.deploy_and_check_responses(deployments, responses)
def test_non_blocking_deploy_group(self, serve_instance):
"""Checks deploy_group's behavior when _blocking=False."""
deployments = [self.f, self.g, self.C, self.D]
responses = ["f reached", "g reached", "C reached", "D reached"]
self.deploy_and_check_responses(
deployments, responses, blocking=False, client=serve_instance
)
def test_mutual_handles(self, serve_instance):
"""
Atomically deploys a group of deployments that get handles to other
deployments in the group inside their __init__ functions. The handle
references should fail in a non-atomic deployment. Checks whether the
deployments deploy correctly.
"""
@serve.deployment
class MutualHandles:
async def __init__(self, handle_name):
self.handle = serve.get_deployment(handle_name).get_handle()
async def __call__(self, echo: str):
return await self.handle.request_echo.remote(echo)
async def request_echo(self, echo: str):
return echo
names = []
for i in range(10):
names.append("a" * i)
deployments = []
for idx in range(len(names)):
# Each deployment will hold a ServeHandle with the next name in
# the list
deployment_name = names[idx]
handle_name = names[(idx + 1) % len(names)]
deployments.append(
MutualHandles.options(name=deployment_name, init_args=(handle_name,))
)
deploy_group(deployments)
for deployment in deployments:
assert (ray.get(deployment.get_handle().remote("hello"))) == "hello"
def test_decorated_deployments(self, serve_instance):
"""
Checks deploy_group's behavior when deployments have options set in
their @serve.deployment decorator.
"""
@serve.deployment(num_replicas=2, max_concurrent_queries=5)
class DecoratedClass1:
async def __call__(self):
return "DecoratedClass1 reached"
@serve.deployment(num_replicas=4, max_concurrent_queries=2)
class DecoratedClass2:
async def __call__(self):
return "DecoratedClass2 reached"
deployments = [DecoratedClass1, DecoratedClass2]
responses = ["DecoratedClass1 reached", "DecoratedClass2 reached"]
self.deploy_and_check_responses(deployments, responses)
def test_empty_list(self, serve_instance):
"""Checks deploy_group's behavior when deployment group is empty."""
self.deploy_and_check_responses([], [])
def test_invalid_input(self, serve_instance):
"""
Checks deploy_group's behavior when deployment group contains
non-Deployment objects.
"""
with pytest.raises(TypeError):
deploy_group([self.f, self.C, "not a Deployment object"])
if __name__ == "__main__":
import sys