From d042aa6d739f6e6b2fc04452e63237737e123abf Mon Sep 17 00:00:00 2001 From: Jae Sim Date: Thu, 20 May 2021 10:43:22 -0400 Subject: [PATCH] [serve] Add optional `prev_version` check to `.deploy()` for users to avoid race conditions (#15821) --- python/ray/serve/BUILD | 2 +- python/ray/serve/api.py | 32 ++++++++++-- python/ray/serve/controller.py | 30 ++++++++--- python/ray/serve/tests/test_deploy.py | 71 +++++++++++++++++++++++++++ 4 files changed, 122 insertions(+), 13 deletions(-) diff --git a/python/ray/serve/BUILD b/python/ray/serve/BUILD index 1626ef20d..359f4d89f 100644 --- a/python/ray/serve/BUILD +++ b/python/ray/serve/BUILD @@ -27,7 +27,7 @@ py_test( py_test( name = "test_deploy", - size = "medium", + size = "large", srcs = serve_tests_srcs, tags = ["exclusive"], deps = [":serve_lib"], diff --git a/python/ray/serve/api.py b/python/ray/serve/api.py index 72aaf8c84..dc464309a 100644 --- a/python/ray/serve/api.py +++ b/python/ray/serve/api.py @@ -360,6 +360,7 @@ class Client: ray_actor_options: Optional[Dict] = None, config: Optional[Union[BackendConfig, Dict[str, Any]]] = None, version: Optional[str] = None, + prev_version: Optional[str] = None, route_prefix: Optional[str] = None, _blocking: Optional[bool] = True) -> Optional[GoalId]: if config is None: @@ -399,9 +400,9 @@ class Client: python_methods.append(method_name) goal_id, updating = ray.get( - self._controller.deploy.remote(name, backend_config, - replica_config, python_methods, - version, route_prefix)) + self._controller.deploy.remote( + name, backend_config, replica_config, python_methods, version, + prev_version, route_prefix)) if updating: msg = f"Updating deployment '{name}'" @@ -1060,6 +1061,7 @@ class Deployment: name: str, config: BackendConfig, version: Optional[str] = None, + prev_version: Optional[str] = None, init_args: Optional[Tuple[Any]] = None, route_prefix: Optional[str] = None, ray_actor_options: Optional[Dict] = None, @@ -1082,6 +1084,8 @@ class Deployment: raise TypeError("name must be a string.") if not (version is None or isinstance(version, str)): raise TypeError("version must be a string.") + if not (prev_version is None or isinstance(prev_version, str)): + raise TypeError("prev_version must be a string.") if not (init_args is None or isinstance(init_args, tuple)): raise TypeError("init_args must be a tuple.") if route_prefix is not None: @@ -1104,6 +1108,7 @@ class Deployment: self._func_or_class = func_or_class self._name = name self._version = version + self._prev_version = prev_version self._config = config self._init_args = init_args self._route_prefix = route_prefix @@ -1120,9 +1125,17 @@ class Deployment: If None, will be redeployed every time `.deploy()` is called. """ - return self._version + @property + def prev_version(self) -> Optional[str]: + """Existing version of deployment to target. + + If prev_version does not match with existing deployment + version, the deployment will fail to be deployed. + """ + return self._prev_version + @property def func_or_class(self) -> Callable: """Underlying class or function that this deployment wraps.""" @@ -1179,6 +1192,7 @@ class Deployment: ray_actor_options=self._ray_actor_options, config=self._config, version=self._version, + prev_version=self._prev_version, route_prefix=self._route_prefix, _blocking=_blocking, _internal=True) @@ -1213,6 +1227,7 @@ class Deployment: func_or_class: Optional[Callable] = None, name: Optional[str] = None, version: Optional[str] = None, + prev_version: Optional[str] = None, init_args: Optional[Tuple[Any]] = None, route_prefix: Optional[str] = None, num_replicas: Optional[int] = None, @@ -1259,6 +1274,7 @@ class Deployment: name, new_config, version=version, + prev_version=prev_version, init_args=init_args, route_prefix=route_prefix, ray_actor_options=ray_actor_options, @@ -1296,6 +1312,7 @@ def deployment(func_or_class: Callable) -> Deployment: @overload def deployment(name: Optional[str] = None, version: Optional[str] = None, + prev_version: Optional[str] = None, num_replicas: Optional[int] = None, init_args: Optional[Tuple[Any]] = None, ray_actor_options: Optional[Dict] = None, @@ -1309,6 +1326,7 @@ def deployment( _func_or_class: Optional[Callable] = None, name: Optional[str] = None, version: Optional[str] = None, + prev_version: Optional[str] = None, num_replicas: Optional[int] = None, init_args: Optional[Tuple[Any]] = None, route_prefix: Optional[str] = None, @@ -1326,6 +1344,11 @@ def deployment( with a version change, a rolling update of the replicas will be performed. If not provided, every deployment will be treated as a new version. + prev_version (Optional[str]): Version of the existing deployment which + is used as a precondition for the next deployment. If prev_version + does not match with the existing deployment's version, the + deployment will fail. If not provided, deployment procedure will + not check the existing deployment's version. num_replicas (Optional[int]): The number of processes to start up that will handle requests to this backend. Defaults to 1. init_args (Optional[Tuple]): Arguments to be passed to the class @@ -1377,6 +1400,7 @@ def deployment( name if name is not None else _func_or_class.__name__, config, version=version, + prev_version=prev_version, init_args=init_args, route_prefix=route_prefix, ray_actor_options=ray_actor_options, diff --git a/python/ray/serve/controller.py b/python/ray/serve/controller.py index 4fed74e01..d1f2de749 100644 --- a/python/ray/serve/controller.py +++ b/python/ray/serve/controller.py @@ -230,10 +230,10 @@ class ServeController: for endpoint, info in self.endpoint_state.get_endpoints().items(): if (backend_tag in info["traffic"] or backend_tag in info["shadows"]): - raise ValueError("Backend '{}' is used by endpoint '{}' " - "and cannot be deleted. Please remove " - "the backend from all endpoints and try " - "again.".format(backend_tag, endpoint)) + raise ValueError(f"Backend '{backend_tag}' is used by " + f"endpoint '{endpoint}' and cannot be " + "deleted. Please remove the backend " + "from all endpoints and try again.") return self.backend_state.delete_backend(backend_tag, force_kill) async def update_backend_config(self, backend_tag: BackendTag, @@ -271,14 +271,28 @@ class ServeController: self.endpoint_state.shutdown() self.http_state.shutdown() - async def deploy(self, name: str, backend_config: BackendConfig, - replica_config: ReplicaConfig, python_methods: List[str], - version: Optional[str], route_prefix: Optional[str] - ) -> Tuple[Optional[GoalId], bool]: + async def deploy( + self, name: str, backend_config: BackendConfig, + replica_config: ReplicaConfig, python_methods: List[str], + version: Optional[str], prev_version: Optional[str], + route_prefix: Optional[str]) -> Tuple[Optional[GoalId], bool]: if route_prefix is not None: assert route_prefix.startswith("/") async with self.write_lock: + if prev_version is not None: + existing_backend_info = self.backend_state.get_backend(name) + if (existing_backend_info is None + or not existing_backend_info.version): + raise ValueError( + f"prev_version '{prev_version}' is specified but " + "there is no existing deployment.") + if existing_backend_info.version != prev_version: + raise ValueError( + f"prev_version '{prev_version}' " + "does not match with the existing " + f"version '{existing_backend_info.version}'.") + backend_info = BackendInfo( actor_def=ray.remote( create_backend_replica( diff --git a/python/ray/serve/tests/test_deploy.py b/python/ray/serve/tests/test_deploy.py index 932fe49b3..2eaa74542 100644 --- a/python/ray/serve/tests/test_deploy.py +++ b/python/ray/serve/tests/test_deploy.py @@ -127,6 +127,77 @@ def test_deploy_no_version(serve_instance, use_handle): assert pid5 == pid4 +@pytest.mark.parametrize("use_handle", [True, False]) +def test_deploy_prev_version(serve_instance, use_handle): + name = "test" + + @serve.deployment(name=name) + def v1(*args): + return f"1|{os.getpid()}" + + def call(): + if use_handle: + ret = ray.get(v1.get_handle().remote()) + else: + ret = requests.get(f"http://localhost:8000/{name}").text + + return ret.split("|")[0], ret.split("|")[1] + + # Deploy with prev_version specified, where there is no existing deployment + with pytest.raises(ValueError): + v1.options(version="1", prev_version="0").deploy() + + v1.deploy() + val1, pid1 = call() + assert val1 == "1" + + @serve.deployment(name=name) + def v2(*args): + return f"2|{os.getpid()}" + + # Deploying without specifying prev_version should still be possible. + v2.deploy() + val2, pid2 = call() + assert val2 == "2" + assert pid2 != pid1 + + v2.options(version="1").deploy() + val3, pid3 = call() + assert val3 == "2" + assert pid3 != pid2 + + @serve.deployment(name=name) + def v3(*args): + return f"3|{os.getpid()}" + + # If prev_version does not match with the existing version, it should fail. + with pytest.raises(ValueError): + v3.options(version="2", prev_version="0").deploy() + + # If prev_version matches with the existing version, it should succeed. + v3.options(version="2", prev_version="1").deploy() + val4, pid4 = call() + assert val4 == "3" + assert pid4 != pid3 + + # Specifying the version should stop updates from happening. + v3.options(version="2").deploy() + val5, pid5 = call() + assert val5 == "3" + assert pid5 == pid4 + + v2.options(version="3", prev_version="2").deploy() + val6, pid6 = call() + assert val6 == "2" + assert pid6 != pid5 + + # Deploying without specifying prev_version should still be possible. + v1.deploy() + val7, pid7 = call() + assert val7 == "1" + assert pid7 != pid6 + + @pytest.mark.parametrize("use_handle", [True, False]) def test_config_change(serve_instance, use_handle): @serve.deployment(version="1")