mirror of
https://github.com/vale981/ray
synced 2025-03-06 18:41:40 -05:00
[serve] Add optional prev_version
check to .deploy()
for users to avoid race conditions (#15821)
This commit is contained in:
parent
03c7c530a9
commit
d042aa6d73
4 changed files with 122 additions and 13 deletions
|
@ -27,7 +27,7 @@ py_test(
|
|||
|
||||
py_test(
|
||||
name = "test_deploy",
|
||||
size = "medium",
|
||||
size = "large",
|
||||
srcs = serve_tests_srcs,
|
||||
tags = ["exclusive"],
|
||||
deps = [":serve_lib"],
|
||||
|
|
|
@ -360,6 +360,7 @@ class Client:
|
|||
ray_actor_options: Optional[Dict] = None,
|
||||
config: Optional[Union[BackendConfig, Dict[str, Any]]] = None,
|
||||
version: Optional[str] = None,
|
||||
prev_version: Optional[str] = None,
|
||||
route_prefix: Optional[str] = None,
|
||||
_blocking: Optional[bool] = True) -> Optional[GoalId]:
|
||||
if config is None:
|
||||
|
@ -399,9 +400,9 @@ class Client:
|
|||
python_methods.append(method_name)
|
||||
|
||||
goal_id, updating = ray.get(
|
||||
self._controller.deploy.remote(name, backend_config,
|
||||
replica_config, python_methods,
|
||||
version, route_prefix))
|
||||
self._controller.deploy.remote(
|
||||
name, backend_config, replica_config, python_methods, version,
|
||||
prev_version, route_prefix))
|
||||
|
||||
if updating:
|
||||
msg = f"Updating deployment '{name}'"
|
||||
|
@ -1060,6 +1061,7 @@ class Deployment:
|
|||
name: str,
|
||||
config: BackendConfig,
|
||||
version: Optional[str] = None,
|
||||
prev_version: Optional[str] = None,
|
||||
init_args: Optional[Tuple[Any]] = None,
|
||||
route_prefix: Optional[str] = None,
|
||||
ray_actor_options: Optional[Dict] = None,
|
||||
|
@ -1082,6 +1084,8 @@ class Deployment:
|
|||
raise TypeError("name must be a string.")
|
||||
if not (version is None or isinstance(version, str)):
|
||||
raise TypeError("version must be a string.")
|
||||
if not (prev_version is None or isinstance(prev_version, str)):
|
||||
raise TypeError("prev_version must be a string.")
|
||||
if not (init_args is None or isinstance(init_args, tuple)):
|
||||
raise TypeError("init_args must be a tuple.")
|
||||
if route_prefix is not None:
|
||||
|
@ -1104,6 +1108,7 @@ class Deployment:
|
|||
self._func_or_class = func_or_class
|
||||
self._name = name
|
||||
self._version = version
|
||||
self._prev_version = prev_version
|
||||
self._config = config
|
||||
self._init_args = init_args
|
||||
self._route_prefix = route_prefix
|
||||
|
@ -1120,9 +1125,17 @@ class Deployment:
|
|||
|
||||
If None, will be redeployed every time `.deploy()` is called.
|
||||
"""
|
||||
|
||||
return self._version
|
||||
|
||||
@property
|
||||
def prev_version(self) -> Optional[str]:
|
||||
"""Existing version of deployment to target.
|
||||
|
||||
If prev_version does not match with existing deployment
|
||||
version, the deployment will fail to be deployed.
|
||||
"""
|
||||
return self._prev_version
|
||||
|
||||
@property
|
||||
def func_or_class(self) -> Callable:
|
||||
"""Underlying class or function that this deployment wraps."""
|
||||
|
@ -1179,6 +1192,7 @@ class Deployment:
|
|||
ray_actor_options=self._ray_actor_options,
|
||||
config=self._config,
|
||||
version=self._version,
|
||||
prev_version=self._prev_version,
|
||||
route_prefix=self._route_prefix,
|
||||
_blocking=_blocking,
|
||||
_internal=True)
|
||||
|
@ -1213,6 +1227,7 @@ class Deployment:
|
|||
func_or_class: Optional[Callable] = None,
|
||||
name: Optional[str] = None,
|
||||
version: Optional[str] = None,
|
||||
prev_version: Optional[str] = None,
|
||||
init_args: Optional[Tuple[Any]] = None,
|
||||
route_prefix: Optional[str] = None,
|
||||
num_replicas: Optional[int] = None,
|
||||
|
@ -1259,6 +1274,7 @@ class Deployment:
|
|||
name,
|
||||
new_config,
|
||||
version=version,
|
||||
prev_version=prev_version,
|
||||
init_args=init_args,
|
||||
route_prefix=route_prefix,
|
||||
ray_actor_options=ray_actor_options,
|
||||
|
@ -1296,6 +1312,7 @@ def deployment(func_or_class: Callable) -> Deployment:
|
|||
@overload
|
||||
def deployment(name: Optional[str] = None,
|
||||
version: Optional[str] = None,
|
||||
prev_version: Optional[str] = None,
|
||||
num_replicas: Optional[int] = None,
|
||||
init_args: Optional[Tuple[Any]] = None,
|
||||
ray_actor_options: Optional[Dict] = None,
|
||||
|
@ -1309,6 +1326,7 @@ def deployment(
|
|||
_func_or_class: Optional[Callable] = None,
|
||||
name: Optional[str] = None,
|
||||
version: Optional[str] = None,
|
||||
prev_version: Optional[str] = None,
|
||||
num_replicas: Optional[int] = None,
|
||||
init_args: Optional[Tuple[Any]] = None,
|
||||
route_prefix: Optional[str] = None,
|
||||
|
@ -1326,6 +1344,11 @@ def deployment(
|
|||
with a version change, a rolling update of the replicas will be
|
||||
performed. If not provided, every deployment will be treated as a
|
||||
new version.
|
||||
prev_version (Optional[str]): Version of the existing deployment which
|
||||
is used as a precondition for the next deployment. If prev_version
|
||||
does not match with the existing deployment's version, the
|
||||
deployment will fail. If not provided, deployment procedure will
|
||||
not check the existing deployment's version.
|
||||
num_replicas (Optional[int]): The number of processes to start up that
|
||||
will handle requests to this backend. Defaults to 1.
|
||||
init_args (Optional[Tuple]): Arguments to be passed to the class
|
||||
|
@ -1377,6 +1400,7 @@ def deployment(
|
|||
name if name is not None else _func_or_class.__name__,
|
||||
config,
|
||||
version=version,
|
||||
prev_version=prev_version,
|
||||
init_args=init_args,
|
||||
route_prefix=route_prefix,
|
||||
ray_actor_options=ray_actor_options,
|
||||
|
|
|
@ -230,10 +230,10 @@ class ServeController:
|
|||
for endpoint, info in self.endpoint_state.get_endpoints().items():
|
||||
if (backend_tag in info["traffic"]
|
||||
or backend_tag in info["shadows"]):
|
||||
raise ValueError("Backend '{}' is used by endpoint '{}' "
|
||||
"and cannot be deleted. Please remove "
|
||||
"the backend from all endpoints and try "
|
||||
"again.".format(backend_tag, endpoint))
|
||||
raise ValueError(f"Backend '{backend_tag}' is used by "
|
||||
f"endpoint '{endpoint}' and cannot be "
|
||||
"deleted. Please remove the backend "
|
||||
"from all endpoints and try again.")
|
||||
return self.backend_state.delete_backend(backend_tag, force_kill)
|
||||
|
||||
async def update_backend_config(self, backend_tag: BackendTag,
|
||||
|
@ -271,14 +271,28 @@ class ServeController:
|
|||
self.endpoint_state.shutdown()
|
||||
self.http_state.shutdown()
|
||||
|
||||
async def deploy(self, name: str, backend_config: BackendConfig,
|
||||
replica_config: ReplicaConfig, python_methods: List[str],
|
||||
version: Optional[str], route_prefix: Optional[str]
|
||||
) -> Tuple[Optional[GoalId], bool]:
|
||||
async def deploy(
|
||||
self, name: str, backend_config: BackendConfig,
|
||||
replica_config: ReplicaConfig, python_methods: List[str],
|
||||
version: Optional[str], prev_version: Optional[str],
|
||||
route_prefix: Optional[str]) -> Tuple[Optional[GoalId], bool]:
|
||||
if route_prefix is not None:
|
||||
assert route_prefix.startswith("/")
|
||||
|
||||
async with self.write_lock:
|
||||
if prev_version is not None:
|
||||
existing_backend_info = self.backend_state.get_backend(name)
|
||||
if (existing_backend_info is None
|
||||
or not existing_backend_info.version):
|
||||
raise ValueError(
|
||||
f"prev_version '{prev_version}' is specified but "
|
||||
"there is no existing deployment.")
|
||||
if existing_backend_info.version != prev_version:
|
||||
raise ValueError(
|
||||
f"prev_version '{prev_version}' "
|
||||
"does not match with the existing "
|
||||
f"version '{existing_backend_info.version}'.")
|
||||
|
||||
backend_info = BackendInfo(
|
||||
actor_def=ray.remote(
|
||||
create_backend_replica(
|
||||
|
|
|
@ -127,6 +127,77 @@ def test_deploy_no_version(serve_instance, use_handle):
|
|||
assert pid5 == pid4
|
||||
|
||||
|
||||
@pytest.mark.parametrize("use_handle", [True, False])
|
||||
def test_deploy_prev_version(serve_instance, use_handle):
|
||||
name = "test"
|
||||
|
||||
@serve.deployment(name=name)
|
||||
def v1(*args):
|
||||
return f"1|{os.getpid()}"
|
||||
|
||||
def call():
|
||||
if use_handle:
|
||||
ret = ray.get(v1.get_handle().remote())
|
||||
else:
|
||||
ret = requests.get(f"http://localhost:8000/{name}").text
|
||||
|
||||
return ret.split("|")[0], ret.split("|")[1]
|
||||
|
||||
# Deploy with prev_version specified, where there is no existing deployment
|
||||
with pytest.raises(ValueError):
|
||||
v1.options(version="1", prev_version="0").deploy()
|
||||
|
||||
v1.deploy()
|
||||
val1, pid1 = call()
|
||||
assert val1 == "1"
|
||||
|
||||
@serve.deployment(name=name)
|
||||
def v2(*args):
|
||||
return f"2|{os.getpid()}"
|
||||
|
||||
# Deploying without specifying prev_version should still be possible.
|
||||
v2.deploy()
|
||||
val2, pid2 = call()
|
||||
assert val2 == "2"
|
||||
assert pid2 != pid1
|
||||
|
||||
v2.options(version="1").deploy()
|
||||
val3, pid3 = call()
|
||||
assert val3 == "2"
|
||||
assert pid3 != pid2
|
||||
|
||||
@serve.deployment(name=name)
|
||||
def v3(*args):
|
||||
return f"3|{os.getpid()}"
|
||||
|
||||
# If prev_version does not match with the existing version, it should fail.
|
||||
with pytest.raises(ValueError):
|
||||
v3.options(version="2", prev_version="0").deploy()
|
||||
|
||||
# If prev_version matches with the existing version, it should succeed.
|
||||
v3.options(version="2", prev_version="1").deploy()
|
||||
val4, pid4 = call()
|
||||
assert val4 == "3"
|
||||
assert pid4 != pid3
|
||||
|
||||
# Specifying the version should stop updates from happening.
|
||||
v3.options(version="2").deploy()
|
||||
val5, pid5 = call()
|
||||
assert val5 == "3"
|
||||
assert pid5 == pid4
|
||||
|
||||
v2.options(version="3", prev_version="2").deploy()
|
||||
val6, pid6 = call()
|
||||
assert val6 == "2"
|
||||
assert pid6 != pid5
|
||||
|
||||
# Deploying without specifying prev_version should still be possible.
|
||||
v1.deploy()
|
||||
val7, pid7 = call()
|
||||
assert val7 == "1"
|
||||
assert pid7 != pid6
|
||||
|
||||
|
||||
@pytest.mark.parametrize("use_handle", [True, False])
|
||||
def test_config_change(serve_instance, use_handle):
|
||||
@serve.deployment(version="1")
|
||||
|
|
Loading…
Add table
Reference in a new issue