[Serve] Clean up deployment suffixes between pipeline build() calls (#23984)

This commit is contained in:
Jiao 2022-04-19 15:59:42 -07:00 committed by GitHub
parent 3af7fb6490
commit 5ba29f040f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 97 additions and 39 deletions

View file

@ -4,6 +4,7 @@ from ray.serve.pipeline.generate import (
transform_ray_dag_to_serve_dag,
extract_deployments_from_serve_dag,
process_ingress_deployment_in_serve_dag,
DeploymentNameGenerator,
)
from ray.serve.deployment import Deployment
@ -60,14 +61,19 @@ def build(ray_dag_root_node: DAGNode) -> List[Deployment]:
>>> deployments = build_app(ray_dag) # it can be method node
>>> deployments = build_app(m1) # or just a regular node.
"""
serve_root_dag = ray_dag_root_node.apply_recursive(transform_ray_dag_to_serve_dag)
with DeploymentNameGenerator() as deployment_name_generator:
serve_root_dag = ray_dag_root_node.apply_recursive(
lambda node: transform_ray_dag_to_serve_dag(node, deployment_name_generator)
)
deployments = extract_deployments_from_serve_dag(serve_root_dag)
deployments_with_http = process_ingress_deployment_in_serve_dag(deployments)
return deployments_with_http
def get_and_validate_ingress_deployment(deployments: List[Deployment]) -> Deployment:
def get_and_validate_ingress_deployment(
deployments: List[Deployment],
) -> Deployment:
"""Validation for http route prefixes for a list of deployments in pipeline.
Ensures:

View file

@ -1,5 +1,4 @@
from typing import Any, Dict, List, Union
import threading
from collections import OrderedDict
from ray.experimental.dag import (
@ -23,32 +22,35 @@ class DeploymentNameGenerator(object):
monotonic increasing id to it.
"""
__lock = threading.Lock()
__shared_state = dict()
def __init__(self):
self.name_to_suffix: Dict[str, int] = dict()
@classmethod
def get_deployment_name(cls, dag_node: Union[ClassNode, FunctionNode]):
def get_deployment_name(self, dag_node: Union[ClassNode, FunctionNode]):
assert isinstance(dag_node, (ClassNode, FunctionNode)), (
"get_deployment_name() should only be called on ClassNode or "
"FunctionNode instances."
)
with cls.__lock:
deployment_name = (
dag_node.get_options().get("name", None) or dag_node._body.__name__
)
if deployment_name not in cls.__shared_state:
cls.__shared_state[deployment_name] = 0
return deployment_name
else:
cls.__shared_state[deployment_name] += 1
suffix_num = cls.__shared_state[deployment_name]
return f"{deployment_name}_{suffix_num}"
deployment_name = (
dag_node.get_options().get("name", None) or dag_node._body.__name__
)
if deployment_name not in self.name_to_suffix:
self.name_to_suffix[deployment_name] = 0
return deployment_name
else:
self.name_to_suffix[deployment_name] += 1
suffix_num = self.name_to_suffix[deployment_name]
@classmethod
def reset(cls):
with cls.__lock:
cls.__shared_state = dict()
return f"{deployment_name}_{suffix_num}"
def reset(self):
self.name_to_suffix = dict()
def __enter__(self):
return self
def __exit__(self, *args):
self.reset()
def _remove_non_default_ray_actor_options(ray_actor_options: Dict[str, Any]):
@ -70,13 +72,15 @@ def _remove_non_default_ray_actor_options(ray_actor_options: Dict[str, Any]):
return ray_actor_options
def transform_ray_dag_to_serve_dag(dag_node):
def transform_ray_dag_to_serve_dag(
dag_node: DAGNode, deployment_name_generator: DeploymentNameGenerator
):
"""
Transform a Ray DAG to a Serve DAG. Map ClassNode to DeploymentNode with
ray decorated body passed in, ans ClassMethodNode to DeploymentMethodNode.
ray decorated body passed in, and ClassMethodNode to DeploymentMethodNode.
"""
if isinstance(dag_node, ClassNode):
deployment_name = DeploymentNameGenerator.get_deployment_name(dag_node)
deployment_name = deployment_name_generator.get_deployment_name(dag_node)
ray_actor_options = _remove_non_default_ray_actor_options(
dag_node.get_options()
)
@ -109,7 +113,7 @@ def transform_ray_dag_to_serve_dag(dag_node):
# TODO (jiaodong): We do not convert ray function to deployment function
# yet, revisit this later
) and dag_node.get_other_args_to_resolve().get("is_from_serve_deployment"):
deployment_name = DeploymentNameGenerator.get_deployment_name(dag_node)
deployment_name = deployment_name_generator.get_deployment_name(dag_node)
return DeploymentFunctionNode(
dag_node._body,
deployment_name,

View file

@ -8,7 +8,9 @@ from ray.serve.pipeline.generate import (
transform_ray_dag_to_serve_dag,
extract_deployments_from_serve_dag,
get_pipeline_input_node,
DeploymentNameGenerator,
)
from ray.serve.pipeline.api import build
from ray.serve.pipeline.tests.resources.test_modules import (
Model,
NESTED_HANDLE_KEY,
@ -44,6 +46,11 @@ def _validate_consistent_python_output(
)
def test_build_simple_func_dag(serve_instance):
ray_dag, _ = get_simple_func_dag()
with DeploymentNameGenerator() as deployment_name_generator:
serve_root_dag = ray_dag.apply_recursive(
lambda node: transform_ray_dag_to_serve_dag(node, deployment_name_generator)
)
serve_root_dag = ray_dag.apply_recursive(transform_ray_dag_to_serve_dag)
deployments = extract_deployments_from_serve_dag(serve_root_dag)
assert len(deployments) == 1
@ -58,7 +65,10 @@ def test_build_simple_func_dag(serve_instance):
def test_simple_single_class(serve_instance):
ray_dag, _ = get_simple_class_with_class_method_dag()
serve_root_dag = ray_dag.apply_recursive(transform_ray_dag_to_serve_dag)
with DeploymentNameGenerator() as deployment_name_generator:
serve_root_dag = ray_dag.apply_recursive(
lambda node: transform_ray_dag_to_serve_dag(node, deployment_name_generator)
)
deployments = extract_deployments_from_serve_dag(serve_root_dag)
assert len(deployments) == 1
deployments[0].deploy()
@ -72,7 +82,10 @@ def test_single_class_with_valid_ray_options(serve_instance):
model = Model.options(num_cpus=1, memory=1000).bind(2, ratio=0.3)
ray_dag = model.forward.bind(dag_input)
serve_root_dag = ray_dag.apply_recursive(transform_ray_dag_to_serve_dag)
with DeploymentNameGenerator() as deployment_name_generator:
serve_root_dag = ray_dag.apply_recursive(
lambda node: transform_ray_dag_to_serve_dag(node, deployment_name_generator)
)
deployments = extract_deployments_from_serve_dag(serve_root_dag)
assert len(deployments) == 1
deployments[0].deploy()
@ -91,7 +104,10 @@ def test_single_class_with_invalid_deployment_options(serve_instance):
model = Model.options(name="my_deployment").bind(2, ratio=0.3)
ray_dag = model.forward.bind(dag_input)
serve_root_dag = ray_dag.apply_recursive(transform_ray_dag_to_serve_dag)
with DeploymentNameGenerator() as deployment_name_generator:
serve_root_dag = ray_dag.apply_recursive(
lambda node: transform_ray_dag_to_serve_dag(node, deployment_name_generator)
)
deployments = extract_deployments_from_serve_dag(serve_root_dag)
assert len(deployments) == 1
with pytest.raises(
@ -103,7 +119,10 @@ def test_single_class_with_invalid_deployment_options(serve_instance):
def test_func_class_with_class_method_dag(serve_instance):
ray_dag, _ = get_func_class_with_class_method_dag()
serve_root_dag = ray_dag.apply_recursive(transform_ray_dag_to_serve_dag)
with DeploymentNameGenerator() as deployment_name_generator:
serve_root_dag = ray_dag.apply_recursive(
lambda node: transform_ray_dag_to_serve_dag(node, deployment_name_generator)
)
deployments = extract_deployments_from_serve_dag(serve_root_dag)
assert len(deployments) == 2
for deployment in deployments:
@ -121,7 +140,10 @@ def test_multi_instantiation_class_deployment_in_init_args(serve_instance):
"""
ray_dag, _ = get_multi_instantiation_class_deployment_in_init_args_dag()
serve_root_dag = ray_dag.apply_recursive(transform_ray_dag_to_serve_dag)
with DeploymentNameGenerator() as deployment_name_generator:
serve_root_dag = ray_dag.apply_recursive(
lambda node: transform_ray_dag_to_serve_dag(node, deployment_name_generator)
)
print(f"Serve DAG: \n{serve_root_dag}")
deployments = extract_deployments_from_serve_dag(serve_root_dag)
assert len(deployments) == 3
@ -140,7 +162,10 @@ def test_shared_deployment_handle(serve_instance):
"""
ray_dag, _ = get_shared_deployment_handle_dag()
serve_root_dag = ray_dag.apply_recursive(transform_ray_dag_to_serve_dag)
with DeploymentNameGenerator() as deployment_name_generator:
serve_root_dag = ray_dag.apply_recursive(
lambda node: transform_ray_dag_to_serve_dag(node, deployment_name_generator)
)
print(f"Serve DAG: \n{serve_root_dag}")
deployments = extract_deployments_from_serve_dag(serve_root_dag)
assert len(deployments) == 2
@ -160,7 +185,10 @@ def test_multi_instantiation_class_nested_deployment_arg(serve_instance):
"""
ray_dag, _ = get_multi_instantiation_class_nested_deployment_arg_dag()
serve_root_dag = ray_dag.apply_recursive(transform_ray_dag_to_serve_dag)
with DeploymentNameGenerator() as deployment_name_generator:
serve_root_dag = ray_dag.apply_recursive(
lambda node: transform_ray_dag_to_serve_dag(node, deployment_name_generator)
)
print(f"Serve DAG: \n{serve_root_dag}")
deployments = extract_deployments_from_serve_dag(serve_root_dag)
assert len(deployments) == 3
@ -185,7 +213,10 @@ def test_multi_instantiation_class_nested_deployment_arg(serve_instance):
def test_get_pipeline_input_node():
# 1) No InputNode found
ray_dag = combine.bind(1, 2)
serve_dag = ray_dag.apply_recursive(transform_ray_dag_to_serve_dag)
with DeploymentNameGenerator() as deployment_name_generator:
serve_dag = ray_dag.apply_recursive(
lambda node: transform_ray_dag_to_serve_dag(node, deployment_name_generator)
)
with pytest.raises(
AssertionError, match="There should be one and only one InputNode"
):
@ -200,10 +231,26 @@ def test_get_pipeline_input_node():
with pytest.raises(
AssertionError, match="Each DAG should only have one unique InputNode"
):
serve_dag = ray_dag.apply_recursive(transform_ray_dag_to_serve_dag)
with DeploymentNameGenerator() as deployment_name_generator:
serve_dag = ray_dag.apply_recursive(
lambda node: transform_ray_dag_to_serve_dag(
node, deployment_name_generator
)
)
get_pipeline_input_node(serve_dag)
def test_unique_name_reset_upon_build():
ray_dag, _ = get_multi_instantiation_class_deployment_in_init_args_dag()
deployments = build(ray_dag)
assert deployments[0].name == "Model"
assert deployments[1].name == "Model_1"
deployments = build(ray_dag)
# Assert we don't keep increasing suffix id between build() calls
assert deployments[0].name == "Model"
assert deployments[1].name == "Model_1"
if __name__ == "__main__":
import sys

View file

@ -28,6 +28,7 @@ from ray.serve.pipeline.tests.resources.test_modules import (
from ray.serve.pipeline.generate import (
transform_ray_dag_to_serve_dag,
extract_deployments_from_serve_dag,
DeploymentNameGenerator,
)
RayHandleLike = TypeVar("RayHandleLike")
@ -233,7 +234,10 @@ def _test_deployment_json_serde_helper(
3) Deserialized serve dag can extract correct number and definition of
serve deployments.
"""
serve_root_dag = ray_dag.apply_recursive(transform_ray_dag_to_serve_dag)
with DeploymentNameGenerator() as deployment_name_generator:
serve_root_dag = ray_dag.apply_recursive(
lambda node: transform_ray_dag_to_serve_dag(node, deployment_name_generator)
)
json_serialized = json.dumps(serve_root_dag, cls=DAGNodeEncoder)
deserialized_serve_root_dag_node = json.loads(
json_serialized, object_hook=dagnode_from_json

View file

@ -7,7 +7,6 @@ import random
import ray
from ray import serve
from ray.serve.pipeline.generate import DeploymentNameGenerator
# https://tools.ietf.org/html/rfc6335#section-6
MIN_DYNAMIC_PORT = 49152
@ -79,5 +78,3 @@ def serve_instance(_shared_serve_instance):
_shared_serve_instance.delete_deployments(serve.list_deployments().keys())
# Clear the ServeHandle cache between tests to avoid them piling up.
_shared_serve_instance.handle_cache.clear()
# Clear deployment generation shared state between tests
DeploymentNameGenerator.reset()