[serve] Add properties + docstring + test for Deployment class (#15917)

This commit is contained in:
Edward Oakes 2021-05-19 14:44:00 -05:00 committed by GitHub
parent 836c739fe5
commit a116875abc
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 135 additions and 55 deletions

View file

@ -1056,7 +1056,7 @@ def ingress(app: Union["FastAPI", "APIRouter"]):
class Deployment:
def __init__(self,
backend_def: Callable,
func_or_class: Callable,
name: str,
config: BackendConfig,
version: Optional[str] = None,
@ -1075,7 +1075,7 @@ class Deployment:
raise RuntimeError(
"The Deployment constructor should not be called "
"directly. Use `@serve.deployment` instead.")
if not callable(backend_def):
if not callable(func_or_class):
raise TypeError(
"@serve.deployment must be called on a class or function.")
if not isinstance(name, str):
@ -1101,43 +1101,92 @@ class Deployment:
if init_args is None:
init_args = ()
self.backend_def = backend_def
self.name = name
self.version = version
self.config = config
self.init_args = init_args
self.route_prefix = route_prefix
self.ray_actor_options = ray_actor_options
self._func_or_class = func_or_class
self._name = name
self._version = version
self._config = config
self._init_args = init_args
self._route_prefix = route_prefix
self._ray_actor_options = ray_actor_options
@property
def name(self) -> str:
"""Unique name of this deployment."""
return self._name
@property
def version(self) -> Optional[str]:
"""Version of this deployment.
If None, will be redeployed every time `.deploy()` is called.
"""
return self._version
@property
def func_or_class(self) -> Callable:
"""Underlying class or function that this deployment wraps."""
return self._func_or_class
@property
def num_replicas(self) -> int:
"""Current target number of replicas."""
return self._config.num_replicas
@property
def user_config(self) -> Any:
"""Current dynamic user-provided config options."""
return self._config.user_config
@property
def max_concurrent_queries(self) -> int:
"""Current max outstanding queries from each handle."""
return self._config.max_concurrent_queries
@property
def route_prefix(self) -> Optional[str]:
"""HTTP route prefix that this deployment is exposed under."""
return self._route_prefix
@property
def ray_actor_options(self) -> Optional[Dict]:
"""Actor options such as resources required for each replica."""
return self._ray_actor_options
@property
def init_args(self) -> Tuple[Any, ...]:
"""Positional arguments passed to the underlying class' constructor."""
return self._init_args
def __call__(self):
raise RuntimeError("Deployments cannot be constructed directly. "
"Use `deployment.deploy() instead.`")
def deploy(self, *init_args, _blocking=True):
"""Deploy this deployment.
"""Deploy or update this deployment.
Args:
*init_args (optional): args to pass to the class __init__
method. Not valid if this deployment wraps a function.
"""
if len(init_args) == 0 and self.init_args is not None:
init_args = self.init_args
if len(init_args) == 0 and self._init_args is not None:
init_args = self._init_args
return _get_global_client().deploy(
self.name,
self.backend_def,
self._name,
self._func_or_class,
*init_args,
ray_actor_options=self.ray_actor_options,
config=self.config,
version=self.version,
route_prefix=self.route_prefix,
ray_actor_options=self._ray_actor_options,
config=self._config,
version=self._version,
route_prefix=self._route_prefix,
_blocking=_blocking,
_internal=True)
def delete(self):
"""Delete this deployment."""
return _get_global_client().delete_deployment(
self.name, _internal=True)
self._name, _internal=True)
def get_handle(self, sync: Optional[bool] = True
) -> Union[RayServeHandle, RayServeSyncHandle]:
@ -1153,7 +1202,7 @@ class Deployment:
ServeHandle
"""
return _get_global_client().get_handle(
self.name,
self._name,
missing_ok=True,
sync=sync,
_internal_use_serve_request=False,
@ -1161,7 +1210,7 @@ class Deployment:
def options(
self,
backend_def: Optional[Callable] = None,
func_or_class: Optional[Callable] = None,
name: Optional[str] = None,
version: Optional[str] = None,
init_args: Optional[Tuple[Any]] = None,
@ -1176,7 +1225,7 @@ class Deployment:
Only those options passed in will be updated, all others will remain
unchanged from the existing deployment.
"""
new_config = self.config.copy()
new_config = self._config.copy()
if num_replicas is not None:
new_config.num_replicas = num_replicas
if user_config is not None:
@ -1184,29 +1233,29 @@ class Deployment:
if max_concurrent_queries is not None:
new_config.max_concurrent_queries = max_concurrent_queries
if backend_def is None:
backend_def = self.backend_def
if func_or_class is None:
func_or_class = self._func_or_class
if name is None:
name = self.name
name = self._name
if version is None:
version = self.version
version = self._version
if init_args is None:
init_args = self.init_args
init_args = self._init_args
if route_prefix is None:
if self.route_prefix == f"/{self.name}":
if self._route_prefix == f"/{self._name}":
route_prefix = None
else:
route_prefix = self.route_prefix
route_prefix = self._route_prefix
if ray_actor_options is None:
ray_actor_options = self.ray_actor_options
ray_actor_options = self._ray_actor_options
return Deployment(
backend_def,
func_or_class,
name,
new_config,
version=version,
@ -1218,21 +1267,21 @@ class Deployment:
def __eq__(self, other):
return all([
self.name == other.name,
self.version == other.version,
self.config == other.config,
self.init_args == other.init_args,
self.route_prefix == other.route_prefix,
self.ray_actor_options == self.ray_actor_options,
self._name == other._name,
self._version == other._version,
self._config == other._config,
self._init_args == other._init_args,
self._route_prefix == other._route_prefix,
self._ray_actor_options == self._ray_actor_options,
])
def __str__(self):
if self.route_prefix is None:
route_prefix = f"/{self.name}"
if self._route_prefix is None:
route_prefix = f"/{self._name}"
else:
route_prefix = self.route_prefix
return (f"Deployment(name={self.name},"
f"version={self.version},"
route_prefix = self._route_prefix
return (f"Deployment(name={self._name},"
f"version={self._version},"
f"route_prefix={route_prefix})")
def __repr__(self):
@ -1240,7 +1289,7 @@ class Deployment:
@overload
def deployment(backend_def: Callable) -> Deployment:
def deployment(func_or_class: Callable) -> Deployment:
pass
@ -1257,7 +1306,7 @@ def deployment(name: Optional[str] = None,
def deployment(
_backend_def: Optional[Callable] = None,
_func_or_class: Optional[Callable] = None,
name: Optional[str] = None,
version: Optional[str] = None,
num_replicas: Optional[int] = None,
@ -1322,10 +1371,10 @@ def deployment(
if max_concurrent_queries is not None:
config.max_concurrent_queries = max_concurrent_queries
def decorator(_backend_def):
def decorator(_func_or_class):
return Deployment(
_backend_def,
name if name is not None else _backend_def.__name__,
_func_or_class,
name if name is not None else _func_or_class.__name__,
config,
version=version,
init_args=init_args,
@ -1336,7 +1385,7 @@ def deployment(
# This handles both parametrized and non-parametrized usage of the
# decorator. See the @serve.batch code for more details.
return decorator(_backend_def) if callable(_backend_def) else decorator
return decorator(_func_or_class) if callable(_func_or_class) else decorator
def get_deployment(name: str) -> Deployment:

View file

@ -14,7 +14,7 @@ def test_serve_forceful_shutdown(serve_instance):
while True:
time.sleep(1000)
sleeper.config.experimental_graceful_shutdown_timeout_s = 0.1
sleeper._config.experimental_graceful_shutdown_timeout_s = 0.1
sleeper.deploy()
handle = sleeper.get_handle()
@ -34,8 +34,8 @@ def test_serve_graceful_shutdown(serve_instance):
await signal_actor.wait.remote()
return ""
Wait.config.experimental_graceful_shutdown_wait_loop_s = 0.5
Wait.config.experimental_graceful_shutdown_timeout_s = 1000
Wait._config.experimental_graceful_shutdown_wait_loop_s = 0.5
Wait._config.experimental_graceful_shutdown_timeout_s = 1000
Wait.deploy()
handle = Wait.get_handle()
refs = [handle.remote(signal) for _ in range(10)]

View file

@ -234,7 +234,7 @@ def test_redeploy_single_replica(serve_instance, use_handle):
# Redeploy new version. This should not go through until the old version
# replica completely stops.
V2 = V1.options(backend_def=V2, version="2")
V2 = V1.options(func_or_class=V2, version="2")
goal_ref = V2.deploy(_blocking=False)
assert not client._wait_for_goal(goal_ref, timeout=0.1)
@ -353,7 +353,7 @@ def test_redeploy_multiple_replicas(serve_instance, use_handle):
# Redeploy new version. Since there is one replica blocking, only one new
# replica should be started up.
V2 = V1.options(backend_def=V2, version="2")
V2 = V1.options(func_or_class=V2, version="2")
goal_ref = V2.deploy(_blocking=False)
assert not client._wait_for_goal(goal_ref, timeout=0.1)
responses3, blocking3 = make_nonblocking_calls(
@ -652,6 +652,37 @@ def test_input_validation():
Base.options(max_concurrent_queries=-1)
def test_deployment_properties():
    """Check that each read-only Deployment property reflects its option."""

    class DClass():
        pass

    D = serve.deployment(
        name="name",
        init_args=("hello", 123),
        version="version",
        num_replicas=2,
        user_config="hi",
        max_concurrent_queries=100,
        route_prefix="/hello",
        ray_actor_options={"num_cpus": 2})(DClass)

    assert D.name == "name"
    # The wrapped class must be handed back unchanged.
    assert D.func_or_class is DClass
    assert D.init_args == ("hello", 123)
    assert D.version == "version"
    assert D.num_replicas == 2
    assert D.user_config == "hi"
    assert D.max_concurrent_queries == 100
    assert D.route_prefix == "/hello"
    assert D.ray_actor_options == {"num_cpus": 2}

    D = serve.deployment(
        version=None,
        route_prefix=None,
    )(DClass)
    # With no explicit name, the decorator falls back to the wrapped
    # object's __name__.
    assert D.name == "DClass"
    assert D.func_or_class is DClass
    assert D.version is None
    assert D.route_prefix is None
class TestGetDeployment:
def get_deployment(self, name, use_list_api):
if use_list_api:

View file

@ -47,7 +47,7 @@ def test_controller_failure(serve_instance):
ray.kill(serve.api._global_client._controller, no_restart=False)
function.options(backend_def=function2).deploy()
function.options(func_or_class=function2).deploy()
def check_controller_failure():
response = request_with_retries("/controller_failure/", timeout=30)
@ -96,7 +96,7 @@ def test_http_proxy_failure(serve_instance):
def function2(_):
return "hello2"
function.options(backend_def=function2).deploy()
function.options(func_or_class=function2).deploy()
def check_new():
for _ in range(10):