mirror of
https://github.com/vale981/ray
synced 2025-03-06 02:21:39 -05:00
[Serve] Move deployment clean up under serve.run() api (#24306)
On the ServeHead level, it is talking to serve api and controller to do deployment and clean up now. With this pr, it hides the deployment clean up logic into server.run() for code cleanness and easy to refactor in the future.
This commit is contained in:
parent
2aee537f92
commit
59debac670
4 changed files with 42 additions and 14 deletions
|
@ -56,21 +56,11 @@ class ServeHead(dashboard_utils.DashboardHeadModule):
|
|||
@optional_utils.init_ray_and_catch_exceptions(connect_to_serve=True)
|
||||
async def put_all_deployments(self, req: Request) -> Response:
|
||||
from ray import serve
|
||||
from ray.serve.context import get_global_client
|
||||
from ray.serve.application import Application
|
||||
|
||||
app = Application.from_dict(await req.json())
|
||||
serve.run(app, _blocking=False)
|
||||
|
||||
new_names = set()
|
||||
for deployment in app.deployments.values():
|
||||
new_names.add(deployment.name)
|
||||
|
||||
all_deployments = serve.list_deployments()
|
||||
all_names = set(all_deployments.keys())
|
||||
names_to_delete = all_names.difference(new_names)
|
||||
get_global_client().delete_deployments(names_to_delete)
|
||||
|
||||
return Response()
|
||||
|
||||
async def run(self, server):
|
||||
|
|
|
@ -641,10 +641,10 @@ def run(
|
|||
"route_prefix": deployment.route_prefix,
|
||||
"url": deployment.url,
|
||||
}
|
||||
|
||||
parameter_group.append(deployment_parameters)
|
||||
|
||||
client.deploy_group(parameter_group, _blocking=_blocking)
|
||||
client.deploy_group(
|
||||
parameter_group, _blocking=_blocking, remove_past_deployments=True
|
||||
)
|
||||
|
||||
if ingress is not None:
|
||||
return ingress.get_handle()
|
||||
|
|
|
@ -259,7 +259,12 @@ class ServeControllerClient:
|
|||
self.log_deployment_ready(name, version, url, tag)
|
||||
|
||||
@_ensure_connected
|
||||
def deploy_group(self, deployments: List[Dict], _blocking: bool = True):
|
||||
def deploy_group(
|
||||
self,
|
||||
deployments: List[Dict],
|
||||
_blocking: bool = True,
|
||||
remove_past_deployments: bool = True,
|
||||
):
|
||||
deployment_args_list = []
|
||||
for deployment in deployments:
|
||||
deployment_args_list.append(
|
||||
|
@ -295,6 +300,18 @@ class ServeControllerClient:
|
|||
self._wait_for_deployment_healthy(name)
|
||||
self.log_deployment_ready(name, version, url, tags[i])
|
||||
|
||||
if remove_past_deployments:
|
||||
# clean up the old deployments
|
||||
new_deployments_names = set()
|
||||
for deployment in deployments:
|
||||
new_deployments_names.add(deployment["name"])
|
||||
|
||||
all_deployments_names = set(self.list_deployments().keys())
|
||||
deployment_names_to_delete = all_deployments_names.difference(
|
||||
new_deployments_names
|
||||
)
|
||||
self.delete_deployments(deployment_names_to_delete)
|
||||
|
||||
@_ensure_connected
|
||||
def delete_deployments(self, names: Iterable[str], blocking: bool = True) -> None:
|
||||
ray.get(self._controller.delete_deployments.remote(names))
|
||||
|
|
|
@ -409,6 +409,27 @@ def test_run_get_ingress_node(serve_instance):
|
|||
assert ray.get(ingress_handle.remote()) == "got f"
|
||||
|
||||
|
||||
def test_run_delete_old_deployments(serve_instance):
|
||||
"""Check that serve.run() can remove all old deployments"""
|
||||
|
||||
@serve.deployment(name="f", route_prefix="/test1")
|
||||
def f():
|
||||
return "got f"
|
||||
|
||||
@serve.deployment(name="g", route_prefix="/test2")
|
||||
def g():
|
||||
return "got g"
|
||||
|
||||
ingress_handle = serve.run(f.bind())
|
||||
assert ray.get(ingress_handle.remote()) == "got f"
|
||||
|
||||
ingress_handle = serve.run(g.bind())
|
||||
assert ray.get(ingress_handle.remote()) == "got g"
|
||||
|
||||
assert "g" in serve.list_deployments()
|
||||
assert "f" not in serve.list_deployments()
|
||||
|
||||
|
||||
class TestSetOptions:
|
||||
def test_set_options_basic(self):
|
||||
@serve.deployment(
|
||||
|
|
Loading…
Add table
Reference in a new issue