diff --git a/python/ray/experimental/dag/py_obj_scanner.py b/python/ray/experimental/dag/py_obj_scanner.py index 7ea21eb46..3ce3bf92a 100644 --- a/python/ray/experimental/dag/py_obj_scanner.py +++ b/python/ray/experimental/dag/py_obj_scanner.py @@ -49,6 +49,13 @@ class _PyObjScanner(ray.cloudpickle.CloudPickler): from ray.serve.pipeline.deployment_node import DeploymentNode from ray.serve.pipeline.deployment_method_node import DeploymentMethodNode from ray.serve.pipeline.deployment_function_node import DeploymentFunctionNode + from ray.serve.deployment_executor_node import DeploymentExecutorNode + from ray.serve.deployment_method_executor_node import ( + DeploymentMethodExecutorNode, + ) + from ray.serve.deployment_function_executor_node import ( + DeploymentFunctionExecutorNode, + ) self.dispatch_table[FunctionNode] = self._reduce_dag_node self.dispatch_table[ClassNode] = self._reduce_dag_node @@ -58,6 +65,11 @@ class _PyObjScanner(ray.cloudpickle.CloudPickler): self.dispatch_table[DeploymentNode] = self._reduce_dag_node self.dispatch_table[DeploymentMethodNode] = self._reduce_dag_node self.dispatch_table[DeploymentFunctionNode] = self._reduce_dag_node + + self.dispatch_table[DeploymentExecutorNode] = self._reduce_dag_node + self.dispatch_table[DeploymentMethodExecutorNode] = self._reduce_dag_node + self.dispatch_table[DeploymentFunctionExecutorNode] = self._reduce_dag_node + super().__init__(self._buf) def find_nodes(self, obj: Any) -> List["DAGNode"]: diff --git a/python/ray/serve/deployment_executor_node.py b/python/ray/serve/deployment_executor_node.py new file mode 100644 index 000000000..61794a13a --- /dev/null +++ b/python/ray/serve/deployment_executor_node.py @@ -0,0 +1,79 @@ +from typing import Any, Dict, List + +from ray.experimental.dag import DAGNode +from ray.serve.deployment_method_executor_node import DeploymentMethodExecutorNode +from ray.experimental.dag.constants import DAGNODE_TYPE_KEY, PARENT_CLASS_NODE_KEY +from ray.experimental.dag.format_utils import get_dag_node_str +from ray.serve.handle import RayServeHandle + + +class DeploymentExecutorNode(DAGNode): + """The lightweight executor DAGNode of DeploymentNode that optimizes for + efficiency. + + - We need Ray DAGNode's traversal and replacement mechanism to deal + with deeply nested nodes as args in the DAG + - Meanwhile, __init__, _copy_impl and _execute_impl are on the critical + pass of execution for every request. + + Therefore for serve we introduce a minimal weight node as the final product + of DAG transformation, and will be used in actual execution as well as + deployment. + """ + + def __init__( + self, + deployment_handle, + dag_args, # Not deployment init args + dag_kwargs, # Not deployment init kwargs + ): + self._deployment_handle = deployment_handle + super().__init__(dag_args, dag_kwargs, {}, {}) + + def _copy_impl( + self, + new_args: List[Any], + new_kwargs: Dict[str, Any], + new_options: Dict[str, Any], + new_other_args_to_resolve: Dict[str, Any], + ) -> "DeploymentExecutorNode": + return DeploymentExecutorNode( + self._deployment_handle, + new_args, + new_kwargs, + ) + + def _execute_impl(self, *args, **kwargs) -> RayServeHandle: + """Does not call into anything or produce a new value, as the time + this function gets called, all child nodes are already resolved to + ObjectRefs. + """ + return self._deployment_handle + + def __getattr__(self, method_name: str): + return DeploymentMethodExecutorNode( + method_name, + (), + {}, + other_args_to_resolve={ + PARENT_CLASS_NODE_KEY: self, + }, + ) + + def __str__(self) -> str: + return get_dag_node_str(self, str(self._deployment_handle)) + + def to_json(self) -> Dict[str, Any]: + return { + DAGNODE_TYPE_KEY: DeploymentExecutorNode.__name__, + "deployment_handle": self._deployment_handle, + "args": self.get_args(), + "kwargs": self.get_kwargs(), + } + + @classmethod + def from_json(cls, input_json): + assert input_json[DAGNODE_TYPE_KEY] == DeploymentExecutorNode.__name__ + return cls( + input_json["deployment_handle"], input_json["args"], input_json["kwargs"] + ) diff --git a/python/ray/serve/deployment_function_executor_node.py b/python/ray/serve/deployment_function_executor_node.py new file mode 100644 index 000000000..381f58bf7 --- /dev/null +++ b/python/ray/serve/deployment_function_executor_node.py @@ -0,0 +1,78 @@ +from typing import Any, Dict, List, Union + +from ray import ObjectRef +from ray.experimental.dag import DAGNode +from ray.serve.handle import RayServeSyncHandle, RayServeHandle +from ray.experimental.dag.constants import DAGNODE_TYPE_KEY +from ray.experimental.dag.format_utils import get_dag_node_str + + +class DeploymentFunctionExecutorNode(DAGNode): + """The lightweight executor DAGNode of DeploymentFunctionNode that optimizes + for efficiency. + + - We need Ray DAGNode's traversal and replacement mechanism to deal + with deeply nested nodes as args in the DAG + - Meanwhile, __init__, _copy_impl and _execute_impl are on the critical + pass of execution for every request. + + Therefore for serve we introduce a minimal weight node as the final product + of DAG transformation, and will be used in actual execution as well as + deployment. + """ + + def __init__( + self, + deployment_function_handle: Union[RayServeSyncHandle, RayServeHandle], + func_args, + func_kwargs, + ): + super().__init__( + func_args, + func_kwargs, + {}, + {}, + ) + self._deployment_function_handle = deployment_function_handle + + def _copy_impl( + self, + new_args: List[Any], + new_kwargs: Dict[str, Any], + new_options: Dict[str, Any], + new_other_args_to_resolve: Dict[str, Any], + ) -> "DeploymentFunctionExecutorNode": + return DeploymentFunctionExecutorNode( + self._deployment_function_handle, new_args, new_kwargs + ) + + def _execute_impl(self, *args, **kwargs) -> ObjectRef: + """Executor of DeploymentNode getting called each time on dag.execute. + + The execute implementation is recursive, that is, the method nodes will + receive whatever this method returns. We return a handle here so method + node can directly call upon. + """ + return self._deployment_function_handle.remote( + *self._bound_args, **self._bound_kwargs + ) + + def __str__(self) -> str: + return get_dag_node_str(self, str(self._deployment_function_handle)) + + def to_json(self) -> Dict[str, Any]: + return { + DAGNODE_TYPE_KEY: DeploymentFunctionExecutorNode.__name__, + "deployment_function_handle": self._deployment_function_handle, + "args": self.get_args(), + "kwargs": self.get_kwargs(), + } + + @classmethod + def from_json(cls, input_json): + assert input_json[DAGNODE_TYPE_KEY] == DeploymentFunctionExecutorNode.__name__ + return cls( + input_json["deployment_function_handle"], + input_json["args"], + input_json["kwargs"], + ) diff --git a/python/ray/serve/deployment_method_executor_node.py b/python/ray/serve/deployment_method_executor_node.py new file mode 100644 index 000000000..99d24600a --- /dev/null +++ b/python/ray/serve/deployment_method_executor_node.py @@ -0,0 +1,84 @@ +from typing import Any, Dict, List + +from ray import ObjectRef +from ray.experimental.dag import DAGNode +from ray.experimental.dag.constants import DAGNODE_TYPE_KEY, PARENT_CLASS_NODE_KEY +from ray.experimental.dag.format_utils import get_dag_node_str + + +class DeploymentMethodExecutorNode(DAGNode): + """The lightweight executor DAGNode of DeploymentMethodNode that optimizes + for efficiency. + + - We need Ray DAGNode's traversal and replacement mechanism to deal + with deeply nested nodes as args in the DAG + - Meanwhile, __init__, _copy_impl and _execute_impl are on the critical + pass of execution for every request. + + Therefore for serve we introduce a minimal weight node as the final product + of DAG transformation, and will be used in actual execution as well as + deployment. + """ + + def __init__( + self, + deployment_method_name: str, + dag_args, + dag_kwargs, + other_args_to_resolve=None, + ): + super().__init__( + dag_args, dag_kwargs, {}, other_args_to_resolve=other_args_to_resolve + ) + self._deployment_node_replaced_by_handle = other_args_to_resolve[ + PARENT_CLASS_NODE_KEY + ] + self._deployment_method_name = deployment_method_name + + def _copy_impl( + self, + new_args: List[Any], + new_kwargs: Dict[str, Any], + new_options: Dict[str, Any], + new_other_args_to_resolve: Dict[str, Any], + ) -> "DeploymentMethodExecutorNode": + return DeploymentMethodExecutorNode( + self._deployment_method_name, + new_args, + new_kwargs, + other_args_to_resolve=new_other_args_to_resolve, + ) + + def _execute_impl(self, *args, **kwargs) -> ObjectRef: + """Executor of DeploymentNode getting called each time on dag.execute. + + The execute implementation is recursive, that is, the method nodes will + receive whatever this method returns. We return a handle here so method + node can directly call upon. + """ + method_body = getattr( + self._deployment_node_replaced_by_handle, self._deployment_method_name + ) + return method_body.remote(*self._bound_args, **self._bound_kwargs) + + def __str__(self) -> str: + return get_dag_node_str(self, str(self._deployment_method_name)) + + def to_json(self) -> Dict[str, Any]: + return { + DAGNODE_TYPE_KEY: DeploymentMethodExecutorNode.__name__, + "deployment_method_name": self._deployment_method_name, + "args": self.get_args(), + "kwargs": self.get_kwargs(), + "other_args_to_resolve": self.get_other_args_to_resolve(), + } + + @classmethod + def from_json(cls, input_json): + assert input_json[DAGNODE_TYPE_KEY] == DeploymentMethodExecutorNode.__name__ + return cls( + input_json["deployment_method_name"], + input_json["args"], + input_json["kwargs"], + other_args_to_resolve=input_json["other_args_to_resolve"], + ) diff --git a/python/ray/serve/pipeline/api.py b/python/ray/serve/pipeline/api.py index afa96854d..00bb1e6ba 100644 --- a/python/ray/serve/pipeline/api.py +++ b/python/ray/serve/pipeline/api.py @@ -4,7 +4,9 @@ from ray.experimental.dag.dag_node import DAGNode from ray.serve.pipeline.generate import ( transform_ray_dag_to_serve_dag, extract_deployments_from_serve_dag, + transform_serve_dag_to_serve_executor_dag, process_ingress_deployment_in_serve_dag, + generate_executor_dag_driver_deployment, ) from ray.serve.deployment import Deployment from ray.experimental.dag.utils import DAGNodeNameGenerator @@ -67,6 +69,19 @@ def build(ray_dag_root_node: DAGNode) -> List[Deployment]: lambda node: transform_ray_dag_to_serve_dag(node, node_name_generator) ) deployments = extract_deployments_from_serve_dag(serve_root_dag) + + # After Ray DAG is transformed to Serve DAG with deployments and their init + # args filled, generate a minimal weight executor serve dag for perf + serve_executor_root_dag = serve_root_dag.apply_recursive( + transform_serve_dag_to_serve_executor_dag + ) + root_driver_deployment = deployments[-1] + new_driver_deployment = generate_executor_dag_driver_deployment( + serve_executor_root_dag, root_driver_deployment + ) + # Replace DAGDriver deployment with executor DAGDriver deployment + deployments[-1] = new_driver_deployment + # Validate and only expose HTTP for the endpoint deployments_with_http = process_ingress_deployment_in_serve_dag(deployments) return deployments_with_http diff --git a/python/ray/serve/pipeline/deployment_function_node.py b/python/ray/serve/pipeline/deployment_function_node.py index 9690fbec1..0b1df10f0 100644 --- a/python/ray/serve/pipeline/deployment_function_node.py +++ b/python/ray/serve/pipeline/deployment_function_node.py @@ -7,8 +7,8 @@ from ray.experimental.dag.format_utils import get_dag_node_str from ray.experimental.dag.constants import DAGNODE_TYPE_KEY from ray.serve.deployment import Deployment, schema_to_deployment from ray.serve.config import DeploymentConfig -from ray.serve.schema import DeploymentSchema from ray.serve.handle import RayServeLazySyncHandle +from ray.serve.schema import DeploymentSchema from ray.serve.utils import get_deployment_import_path @@ -74,7 +74,7 @@ class DeploymentFunctionNode(DAGNode): _internal=True, ) # TODO (jiaodong): Polish with async handle support later - self._deployment_handle = RayServeLazySyncHandle(deployment_name) + self._deployment_handle = RayServeLazySyncHandle(self._deployment.name) def _copy_impl( self, @@ -130,7 +130,7 @@ class DeploymentFunctionNode(DAGNode): @classmethod def from_json(cls, input_json): assert input_json[DAGNODE_TYPE_KEY] == DeploymentFunctionNode.__name__ - node = cls( + return cls( input_json["import_path"], input_json["deployment_name"], input_json["args"], @@ -138,5 +138,3 @@ class DeploymentFunctionNode(DAGNode): input_json["options"], other_args_to_resolve=input_json["other_args_to_resolve"], ) - node._stable_uuid = input_json["uuid"] - return node diff --git a/python/ray/serve/pipeline/deployment_method_node.py b/python/ray/serve/pipeline/deployment_method_node.py index 74072a26f..9b303a3e7 100644 --- a/python/ray/serve/pipeline/deployment_method_node.py +++ b/python/ray/serve/pipeline/deployment_method_node.py @@ -21,7 +21,7 @@ class DeploymentMethodNode(DAGNode): ): self._deployment = deployment self._deployment_method_name: str = deployment_method_name - self._deployment_node = other_args_to_resolve[PARENT_CLASS_NODE_KEY] + self._deployment_handle = other_args_to_resolve[PARENT_CLASS_NODE_KEY] super().__init__( method_args, method_kwargs, @@ -48,7 +48,7 @@ class DeploymentMethodNode(DAGNode): def _execute_impl(self, *args, **kwargs): """Executor of DeploymentMethodNode by ray.remote()""" # Execute with bound args. - method_body = getattr(self._deployment_node, self._deployment_method_name) + method_body = getattr(self._deployment_handle, self._deployment_method_name) return method_body.remote( *self._bound_args, **self._bound_kwargs, diff --git a/python/ray/serve/pipeline/deployment_node.py b/python/ray/serve/pipeline/deployment_node.py index dc6a254b2..64630fcbf 100644 --- a/python/ray/serve/pipeline/deployment_node.py +++ b/python/ray/serve/pipeline/deployment_node.py @@ -3,11 +3,24 @@ import json from typing import Any, Callable, Dict, Optional, List, Tuple, Union from ray.experimental.dag import DAGNode, InputNode -from ray.serve.handle import RayServeLazySyncHandle, RayServeSyncHandle, RayServeHandle +from ray.serve.deployment_executor_node import DeploymentExecutorNode +from ray.serve.deployment_function_executor_node import ( + DeploymentFunctionExecutorNode, +) +from ray.serve.deployment_method_executor_node import ( + DeploymentMethodExecutorNode, +) +from ray.serve.handle import ( + RayServeLazySyncHandle, + RayServeSyncHandle, + RayServeHandle, +) from ray.serve.pipeline.deployment_method_node import DeploymentMethodNode from ray.serve.pipeline.deployment_function_node import DeploymentFunctionNode -from ray.serve.pipeline.constants import USE_SYNC_HANDLE_KEY -from ray.experimental.dag.constants import DAGNODE_TYPE_KEY, PARENT_CLASS_NODE_KEY +from ray.experimental.dag.constants import ( + DAGNODE_TYPE_KEY, + PARENT_CLASS_NODE_KEY, +) from ray.experimental.dag.format_utils import get_dag_node_str from ray.serve.deployment import Deployment, schema_to_deployment from ray.serve.deployment_graph import RayServeDAGHandle @@ -54,12 +67,24 @@ class DeploymentNode(DAGNode): # Thus we need convert all DeploymentNode used in init args into # deployment handles (executable and picklable) in ray serve DAG to make # serve DAG end to end executable. + # TODO(jiaodong): This part does some magic for DAGDriver and will throw + # error with weird pickle replace table error. Move this out. def replace_with_handle(node): if isinstance(node, DeploymentNode): return node._get_serve_deployment_handle( node._deployment, node._bound_other_args_to_resolve ) - elif isinstance(node, (DeploymentMethodNode, DeploymentFunctionNode)): + elif isinstance(node, DeploymentExecutorNode): + return node._deployment_handle + elif isinstance( + node, + ( + DeploymentMethodNode, + DeploymentMethodExecutorNode, + DeploymentFunctionNode, + DeploymentFunctionExecutorNode, + ), + ): from ray.serve.pipeline.json_serde import DAGNodeEncoder serve_dag_root_json = json.dumps(node, cls=DAGNodeEncoder) @@ -71,7 +96,15 @@ class DeploymentNode(DAGNode): ) = self.apply_functional( [deployment_init_args, deployment_init_kwargs], predictate_fn=lambda node: isinstance( - node, (DeploymentNode, DeploymentMethodNode, DeploymentFunctionNode) + node, + ( + DeploymentNode, + DeploymentMethodNode, + DeploymentFunctionNode, + DeploymentExecutorNode, + DeploymentFunctionExecutorNode, + DeploymentMethodExecutorNode, + ), ), apply_fn=replace_with_handle, ) @@ -117,9 +150,9 @@ class DeploymentNode(DAGNode): ray_actor_options=ray_actor_options, _internal=True, ) - self._deployment_handle: Union[ - RayServeLazySyncHandle, RayServeHandle, RayServeSyncHandle - ] = self._get_serve_deployment_handle(self._deployment, other_args_to_resolve) + self._deployment_handle: RayServeLazySyncHandle = ( + self._get_serve_deployment_handle(self._deployment, other_args_to_resolve) + ) def _copy_impl( self, @@ -165,20 +198,8 @@ class DeploymentNode(DAGNode): return async handle only if user explicitly set USE_SYNC_HANDLE_KEY with value of False. """ - # TODO (jiaodong): Support configurable async handle - if USE_SYNC_HANDLE_KEY not in bound_other_args_to_resolve: - # Return sync RayServeLazySyncHandle - return RayServeLazySyncHandle(deployment.name) - elif bound_other_args_to_resolve.get(USE_SYNC_HANDLE_KEY) is True: - # Return sync RayServeSyncHandle - return deployment.get_handle(sync=True) - elif bound_other_args_to_resolve.get(USE_SYNC_HANDLE_KEY) is False: - # Return async RayServeHandle - return deployment.get_handle(sync=False) - else: - raise ValueError( - f"{USE_SYNC_HANDLE_KEY} should only be set with a boolean value." - ) + # TODO: (jiaodong) Support async handle + return RayServeLazySyncHandle(deployment.name) def _contains_input_node(self) -> bool: """Check if InputNode is used in children DAGNodes with current node @@ -234,7 +255,7 @@ class DeploymentNode(DAGNode): } @classmethod - def from_json(cls, input_json, object_hook=None): + def from_json(cls, input_json): assert input_json[DAGNODE_TYPE_KEY] == DeploymentNode.__name__ return cls( input_json["import_path"], diff --git a/python/ray/serve/pipeline/generate.py b/python/ray/serve/pipeline/generate.py index ec0a99c9d..e67115601 100644 --- a/python/ray/serve/pipeline/generate.py +++ b/python/ray/serve/pipeline/generate.py @@ -1,3 +1,4 @@ +import json from typing import List from collections import OrderedDict @@ -11,9 +12,14 @@ from ray.experimental.dag.function_node import FunctionNode from ray.experimental.dag.input_node import InputNode from ray.experimental.dag.utils import DAGNodeNameGenerator from ray.serve.deployment import Deployment +from ray.serve.deployment_graph import RayServeDAGHandle from ray.serve.pipeline.deployment_method_node import DeploymentMethodNode from ray.serve.pipeline.deployment_node import DeploymentNode from ray.serve.pipeline.deployment_function_node import DeploymentFunctionNode +from ray.serve.deployment_executor_node import DeploymentExecutorNode +from ray.serve.deployment_method_executor_node import DeploymentMethodExecutorNode +from ray.serve.deployment_function_executor_node import DeploymentFunctionExecutorNode +from ray.serve.pipeline.json_serde import DAGNodeEncoder def transform_ray_dag_to_serve_dag( @@ -95,6 +101,90 @@ def extract_deployments_from_serve_dag( return list(deployments.values()) +def transform_serve_dag_to_serve_executor_dag(serve_dag_root_node: DAGNode): + """Given a runnable serve dag with deployment init args and options + processed, transform into an equivalent, but minimal dag optimized for + execution. + """ + if isinstance(serve_dag_root_node, DeploymentNode): + return DeploymentExecutorNode( + serve_dag_root_node._deployment_handle, + serve_dag_root_node.get_args(), + serve_dag_root_node.get_kwargs(), + ) + elif isinstance(serve_dag_root_node, DeploymentMethodNode): + return DeploymentMethodExecutorNode( + # Deployment method handle + serve_dag_root_node._deployment_method_name, + serve_dag_root_node.get_args(), + serve_dag_root_node.get_kwargs(), + other_args_to_resolve=serve_dag_root_node.get_other_args_to_resolve(), + ) + elif isinstance(serve_dag_root_node, DeploymentFunctionNode): + return DeploymentFunctionExecutorNode( + serve_dag_root_node._deployment_handle, + serve_dag_root_node.get_args(), + serve_dag_root_node.get_kwargs(), + ) + else: + return serve_dag_root_node + + +def generate_executor_dag_driver_deployment( + serve_executor_dag_root_node: DAGNode, original_driver_deployment: Deployment +): + """Given a transformed minimal execution serve dag, and original DAGDriver + deployment, generate new DAGDriver deployment that uses new serve executor + dag as init_args. + + Args: + serve_executor_dag_root_node (DeploymentExecutorNode): Transformed + executor serve dag with only barebone deployment handles. + original_driver_deployment (Deployment): User's original DAGDriver + deployment that wrapped Ray DAG as init args. + Returns: + executor_dag_driver_deployment (Deployment): New DAGDriver deployment + with executor serve dag as init args. + """ + + def replace_with_handle(node): + if isinstance(node, DeploymentExecutorNode): + return node._deployment_handle + elif isinstance( + node, + ( + DeploymentMethodExecutorNode, + DeploymentFunctionExecutorNode, + ), + ): + serve_dag_root_json = json.dumps(node, cls=DAGNodeEncoder) + return RayServeDAGHandle(serve_dag_root_json) + + ( + replaced_deployment_init_args, + replaced_deployment_init_kwargs, + ) = serve_executor_dag_root_node.apply_functional( + [ + serve_executor_dag_root_node.get_args(), + serve_executor_dag_root_node.get_kwargs(), + ], + predictate_fn=lambda node: isinstance( + node, + ( + DeploymentExecutorNode, + DeploymentFunctionExecutorNode, + DeploymentMethodExecutorNode, + ), + ), + apply_fn=replace_with_handle, + ) + + return original_driver_deployment.options( + init_args=replaced_deployment_init_args, + init_kwargs=replaced_deployment_init_kwargs, + ) + + def get_pipeline_input_node(serve_dag_root_node: DAGNode): """Return the InputNode singleton node from serve dag, and throw exceptions if we didn't find any, or found more than one. diff --git a/python/ray/serve/pipeline/json_serde.py b/python/ray/serve/pipeline/json_serde.py index 392e4cdfb..5a4085ab8 100644 --- a/python/ray/serve/pipeline/json_serde.py +++ b/python/ray/serve/pipeline/json_serde.py @@ -15,6 +15,10 @@ from ray.experimental.dag import ( from ray.serve.pipeline.deployment_node import DeploymentNode from ray.serve.pipeline.deployment_method_node import DeploymentMethodNode from ray.serve.pipeline.deployment_function_node import DeploymentFunctionNode +from ray.serve.deployment_executor_node import DeploymentExecutorNode +from ray.serve.deployment_method_executor_node import DeploymentMethodExecutorNode +from ray.serve.deployment_function_executor_node import DeploymentFunctionExecutorNode + from ray.serve.schema import ( DeploymentSchema, ) @@ -125,6 +129,21 @@ def dagnode_from_json(input_json: Any) -> Union[DAGNode, RayServeHandle, Any]: that we perserve the same parent node. - .options() does not contain any DAGNode type """ + node_type_to_cls = { + # Ray DAG Inputs + InputNode.__name__: InputNode, + InputAttributeNode.__name__: InputAttributeNode, + # Ray DAG Nodes + ClassMethodNode.__name__: ClassMethodNode, + # Deployment transformation nodes + DeploymentNode.__name__: DeploymentNode, + DeploymentMethodNode.__name__: DeploymentMethodNode, + DeploymentFunctionNode.__name__: DeploymentFunctionNode, + # Deployment graph execution nodes + DeploymentExecutorNode.__name__: DeploymentExecutorNode, + DeploymentMethodExecutorNode.__name__: DeploymentMethodExecutorNode, + DeploymentFunctionExecutorNode.__name__: DeploymentFunctionExecutorNode, + } # Deserialize RayServeHandle type if SERVE_HANDLE_JSON_KEY in input_json: return serve_handle_from_json_dict(input_json) @@ -141,18 +160,8 @@ def dagnode_from_json(input_json: Any) -> Union[DAGNode, RayServeHandle, Any]: HandleOptions(input_json["handle_options_method_name"]), ) # Deserialize DAGNode type - elif input_json[DAGNODE_TYPE_KEY] == InputNode.__name__: - return InputNode.from_json(input_json) - elif input_json[DAGNODE_TYPE_KEY] == InputAttributeNode.__name__: - return InputAttributeNode.from_json(input_json) - elif input_json[DAGNODE_TYPE_KEY] == ClassMethodNode.__name__: - return ClassMethodNode.from_json(input_json) - elif input_json[DAGNODE_TYPE_KEY] == DeploymentNode.__name__: - return DeploymentNode.from_json(input_json) - elif input_json[DAGNODE_TYPE_KEY] == DeploymentMethodNode.__name__: - return DeploymentMethodNode.from_json(input_json) - elif input_json[DAGNODE_TYPE_KEY] == DeploymentFunctionNode.__name__: - return DeploymentFunctionNode.from_json(input_json) + elif input_json[DAGNODE_TYPE_KEY] in node_type_to_cls: + return node_type_to_cls[input_json[DAGNODE_TYPE_KEY]].from_json(input_json) else: # Class and Function nodes require original module as body. module_name, attr_name = parse_import_path(input_json["import_path"]) diff --git a/python/ray/serve/pipeline/tests/test_deployment_node.py b/python/ray/serve/pipeline/tests/test_deployment_node.py index 84e029f72..e80397e13 100644 --- a/python/ray/serve/pipeline/tests/test_deployment_node.py +++ b/python/ray/serve/pipeline/tests/test_deployment_node.py @@ -43,7 +43,7 @@ class Actor: return self.i -@pytest.mark.asyncio +@pytest.mark.skip(reason="async handle not enabled yet") async def test_simple_deployment_async(serve_instance): """Internal testing only for simple creation and execution. @@ -131,21 +131,6 @@ def test_no_input_node_as_init_args(): ) -def test_invalid_use_sync_handle(): - with pytest.raises( - ValueError, - match=f"{USE_SYNC_HANDLE_KEY} should only be set with a boolean value", - ): - _ = DeploymentNode( - Actor, - "test", - [], - {}, - {}, - other_args_to_resolve={USE_SYNC_HANDLE_KEY: {"options_a": "hii"}}, - ) - - def test_mix_sync_async_handle(serve_instance): # TODO: (jiaodong) Add complex multi-deployment tests from ray DAG. pass diff --git a/python/ray/serve/pipeline/tests/test_generate.py b/python/ray/serve/pipeline/tests/test_generate.py index f91629c22..60368026b 100644 --- a/python/ray/serve/pipeline/tests/test_generate.py +++ b/python/ray/serve/pipeline/tests/test_generate.py @@ -2,14 +2,13 @@ import pytest import ray from ray import serve -from ray.serve.handle import RayServeLazySyncHandle from ray.experimental.dag import InputNode +from ray.serve.handle import RayServeLazySyncHandle from ray.serve.pipeline.generate import ( transform_ray_dag_to_serve_dag, extract_deployments_from_serve_dag, get_pipeline_input_node, ) -from ray.serve.pipeline.api import build from ray.serve.pipeline.tests.resources.test_modules import ( Model, NESTED_HANDLE_KEY, @@ -238,12 +237,21 @@ def test_get_pipeline_input_node(): get_pipeline_input_node(serve_dag) -def test_unique_name_reset_upon_build(): +def test_unique_name_reset_upon_build(serve_instance): ray_dag, _ = get_multi_instantiation_class_deployment_in_init_args_dag() - deployments = build(ray_dag) + with DAGNodeNameGenerator() as node_name_generator: + serve_root_dag = ray_dag.apply_recursive( + lambda node: transform_ray_dag_to_serve_dag(node, node_name_generator) + ) + deployments = extract_deployments_from_serve_dag(serve_root_dag) assert deployments[0].name == "Model" assert deployments[1].name == "Model_1" - deployments = build(ray_dag) + + with DAGNodeNameGenerator() as node_name_generator: + serve_root_dag = ray_dag.apply_recursive( + lambda node: transform_ray_dag_to_serve_dag(node, node_name_generator) + ) + deployments = extract_deployments_from_serve_dag(serve_root_dag) # Assert we don't keep increasing suffix id between build() calls assert deployments[0].name == "Model" assert deployments[1].name == "Model_1" diff --git a/python/ray/serve/tests/test_pipeline_dag.py b/python/ray/serve/tests/test_pipeline_dag.py index 36d00ced6..4e8edff8e 100644 --- a/python/ray/serve/tests/test_pipeline_dag.py +++ b/python/ray/serve/tests/test_pipeline_dag.py @@ -156,16 +156,23 @@ def test_chained_function(serve_instance, use_build): def func_2(input): return input * 2 + @serve.deployment + def func_3(input): + return input * 3 + with InputNode() as dag_input: output_1 = func_1.bind(dag_input) output_2 = func_2.bind(dag_input) - serve_dag = combine.bind(output_1, output_2) + output_3 = func_3.bind(output_2) + ray_dag = combine.bind(output_1, output_2, kwargs_output=output_3) with pytest.raises(ValueError, match="Please provide a driver class"): - _ = serve.run(serve_dag) + _ = serve.run(ray_dag) - handle = serve.run(DAGDriver.bind(serve_dag, http_adapter=json_resolver)) - assert ray.get(handle.predict.remote(2)) == 6 # 2 + 2*2 - assert requests.post("http://127.0.0.1:8000/", json=2).json() == 6 + serve_dag = DAGDriver.bind(ray_dag, http_adapter=json_resolver) + + handle = serve.run(serve_dag) + assert ray.get(handle.predict.remote(2)) == 18 # 2 + 2*2 + (2*2) * 3 + assert requests.post("http://127.0.0.1:8000/", json=2).json() == 18 @pytest.mark.parametrize("use_build", [False, True])