mirror of
https://github.com/vale981/ray
synced 2025-03-06 02:21:39 -05:00
[Serve][Deployment Graph][Perf] Add minimal executor DAGNode (#24754)
closes #24475 Current deployment graph has big perf issues compare with using plain deployment handle, mostly because overhead of DAGNode traversal mechanism. We need this mechanism to empower DAG API, specially deeply nested objects in args where we rely on pickling; But meanwhile the nature of each execution becomes re-creating and replacing every `DAGNode` instances involved upon each execution, that incurs overhead. Some overhead is inevitable due to pickling and executing DAGNode python code, but they could be quite minimal. As I profiled earlier, pickling itself is quite fast for our benchmarks at magnitude of microseconds. Meanwhile the elephant in the room is DeploymentNode and its relatives are doing too much work in constructor that's beyond necessary, thus slowing everything down. So the fix is as simple as 1) Introduce a new set of executor dag node types that contains absolute minimal information that only preserves the DAG structure with traversal mechanism, and ability to call relevant deployment handles. 2) Add a simple new pass in our build() that generates and replaces nodes with executor dag to produce a final executor dag to run the graph. Current ray dag -> serve dag mixed a lot of stuff related to deployment generation and init args, in longer term we should remove them but our correctness depends on it so i rather leave it as separate PR. ### Current 10 node chain with deployment graph `.bind()` ``` chain_length: 10, num_clients: 1 latency_mean_ms: 41.05, latency_std_ms: 15.18 throughput_mean_tps: 27.5, throughput_std_tps: 3.2 ``` ### Using raw deployment handle without dag overhead ``` chain_length: 10, num_clients: 1 latency_mean_ms: 20.39, latency_std_ms: 4.57 throughput_mean_tps: 51.9, throughput_std_tps: 1.04 ``` ### After this PR: ``` chain_length: 10, num_clients: 1 latency_mean_ms: 20.35, latency_std_ms: 0.87 throughput_mean_tps: 48.4, throughput_std_tps: 1.43 ```
This commit is contained in:
parent
8b3451318c
commit
f27e85cd7d
13 changed files with 454 additions and 68 deletions
|
@ -49,6 +49,13 @@ class _PyObjScanner(ray.cloudpickle.CloudPickler):
|
|||
from ray.serve.pipeline.deployment_node import DeploymentNode
|
||||
from ray.serve.pipeline.deployment_method_node import DeploymentMethodNode
|
||||
from ray.serve.pipeline.deployment_function_node import DeploymentFunctionNode
|
||||
from ray.serve.deployment_executor_node import DeploymentExecutorNode
|
||||
from ray.serve.deployment_method_executor_node import (
|
||||
DeploymentMethodExecutorNode,
|
||||
)
|
||||
from ray.serve.deployment_function_executor_node import (
|
||||
DeploymentFunctionExecutorNode,
|
||||
)
|
||||
|
||||
self.dispatch_table[FunctionNode] = self._reduce_dag_node
|
||||
self.dispatch_table[ClassNode] = self._reduce_dag_node
|
||||
|
@ -58,6 +65,11 @@ class _PyObjScanner(ray.cloudpickle.CloudPickler):
|
|||
self.dispatch_table[DeploymentNode] = self._reduce_dag_node
|
||||
self.dispatch_table[DeploymentMethodNode] = self._reduce_dag_node
|
||||
self.dispatch_table[DeploymentFunctionNode] = self._reduce_dag_node
|
||||
|
||||
self.dispatch_table[DeploymentExecutorNode] = self._reduce_dag_node
|
||||
self.dispatch_table[DeploymentMethodExecutorNode] = self._reduce_dag_node
|
||||
self.dispatch_table[DeploymentFunctionExecutorNode] = self._reduce_dag_node
|
||||
|
||||
super().__init__(self._buf)
|
||||
|
||||
def find_nodes(self, obj: Any) -> List["DAGNode"]:
|
||||
|
|
79
python/ray/serve/deployment_executor_node.py
Normal file
79
python/ray/serve/deployment_executor_node.py
Normal file
|
@ -0,0 +1,79 @@
|
|||
from typing import Any, Dict, List
|
||||
|
||||
from ray.experimental.dag import DAGNode
|
||||
from ray.serve.deployment_method_executor_node import DeploymentMethodExecutorNode
|
||||
from ray.experimental.dag.constants import DAGNODE_TYPE_KEY, PARENT_CLASS_NODE_KEY
|
||||
from ray.experimental.dag.format_utils import get_dag_node_str
|
||||
from ray.serve.handle import RayServeHandle
|
||||
|
||||
|
||||
class DeploymentExecutorNode(DAGNode):
|
||||
"""The lightweight executor DAGNode of DeploymentNode that optimizes for
|
||||
efficiency.
|
||||
|
||||
- We need Ray DAGNode's traversal and replacement mechanism to deal
|
||||
with deeply nested nodes as args in the DAG
|
||||
- Meanwhile, __init__, _copy_impl and _execute_impl are on the critical
|
||||
pass of execution for every request.
|
||||
|
||||
Therefore for serve we introduce a minimal weight node as the final product
|
||||
of DAG transformation, and will be used in actual execution as well as
|
||||
deployment.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
deployment_handle,
|
||||
dag_args, # Not deployment init args
|
||||
dag_kwargs, # Not deployment init kwargs
|
||||
):
|
||||
self._deployment_handle = deployment_handle
|
||||
super().__init__(dag_args, dag_kwargs, {}, {})
|
||||
|
||||
def _copy_impl(
|
||||
self,
|
||||
new_args: List[Any],
|
||||
new_kwargs: Dict[str, Any],
|
||||
new_options: Dict[str, Any],
|
||||
new_other_args_to_resolve: Dict[str, Any],
|
||||
) -> "DeploymentExecutorNode":
|
||||
return DeploymentExecutorNode(
|
||||
self._deployment_handle,
|
||||
new_args,
|
||||
new_kwargs,
|
||||
)
|
||||
|
||||
def _execute_impl(self, *args, **kwargs) -> RayServeHandle:
|
||||
"""Does not call into anything or produce a new value, as the time
|
||||
this function gets called, all child nodes are already resolved to
|
||||
ObjectRefs.
|
||||
"""
|
||||
return self._deployment_handle
|
||||
|
||||
def __getattr__(self, method_name: str):
|
||||
return DeploymentMethodExecutorNode(
|
||||
method_name,
|
||||
(),
|
||||
{},
|
||||
other_args_to_resolve={
|
||||
PARENT_CLASS_NODE_KEY: self,
|
||||
},
|
||||
)
|
||||
|
||||
def __str__(self) -> str:
|
||||
return get_dag_node_str(self, str(self._deployment_handle))
|
||||
|
||||
def to_json(self) -> Dict[str, Any]:
|
||||
return {
|
||||
DAGNODE_TYPE_KEY: DeploymentExecutorNode.__name__,
|
||||
"deployment_handle": self._deployment_handle,
|
||||
"args": self.get_args(),
|
||||
"kwargs": self.get_kwargs(),
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def from_json(cls, input_json):
|
||||
assert input_json[DAGNODE_TYPE_KEY] == DeploymentExecutorNode.__name__
|
||||
return cls(
|
||||
input_json["deployment_handle"], input_json["args"], input_json["kwargs"]
|
||||
)
|
78
python/ray/serve/deployment_function_executor_node.py
Normal file
78
python/ray/serve/deployment_function_executor_node.py
Normal file
|
@ -0,0 +1,78 @@
|
|||
from typing import Any, Dict, List, Union
|
||||
|
||||
from ray import ObjectRef
|
||||
from ray.experimental.dag import DAGNode
|
||||
from ray.serve.handle import RayServeSyncHandle, RayServeHandle
|
||||
from ray.experimental.dag.constants import DAGNODE_TYPE_KEY
|
||||
from ray.experimental.dag.format_utils import get_dag_node_str
|
||||
|
||||
|
||||
class DeploymentFunctionExecutorNode(DAGNode):
|
||||
"""The lightweight executor DAGNode of DeploymentFunctionNode that optimizes
|
||||
for efficiency.
|
||||
|
||||
- We need Ray DAGNode's traversal and replacement mechanism to deal
|
||||
with deeply nested nodes as args in the DAG
|
||||
- Meanwhile, __init__, _copy_impl and _execute_impl are on the critical
|
||||
pass of execution for every request.
|
||||
|
||||
Therefore for serve we introduce a minimal weight node as the final product
|
||||
of DAG transformation, and will be used in actual execution as well as
|
||||
deployment.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
deployment_function_handle: Union[RayServeSyncHandle, RayServeHandle],
|
||||
func_args,
|
||||
func_kwargs,
|
||||
):
|
||||
super().__init__(
|
||||
func_args,
|
||||
func_kwargs,
|
||||
{},
|
||||
{},
|
||||
)
|
||||
self._deployment_function_handle = deployment_function_handle
|
||||
|
||||
def _copy_impl(
|
||||
self,
|
||||
new_args: List[Any],
|
||||
new_kwargs: Dict[str, Any],
|
||||
new_options: Dict[str, Any],
|
||||
new_other_args_to_resolve: Dict[str, Any],
|
||||
) -> "DeploymentFunctionExecutorNode":
|
||||
return DeploymentFunctionExecutorNode(
|
||||
self._deployment_function_handle, new_args, new_kwargs
|
||||
)
|
||||
|
||||
def _execute_impl(self, *args, **kwargs) -> ObjectRef:
|
||||
"""Executor of DeploymentNode getting called each time on dag.execute.
|
||||
|
||||
The execute implementation is recursive, that is, the method nodes will
|
||||
receive whatever this method returns. We return a handle here so method
|
||||
node can directly call upon.
|
||||
"""
|
||||
return self._deployment_function_handle.remote(
|
||||
*self._bound_args, **self._bound_kwargs
|
||||
)
|
||||
|
||||
def __str__(self) -> str:
|
||||
return get_dag_node_str(self, str(self._deployment_function_handle))
|
||||
|
||||
def to_json(self) -> Dict[str, Any]:
|
||||
return {
|
||||
DAGNODE_TYPE_KEY: DeploymentFunctionExecutorNode.__name__,
|
||||
"deployment_function_handle": self._deployment_function_handle,
|
||||
"args": self.get_args(),
|
||||
"kwargs": self.get_kwargs(),
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def from_json(cls, input_json):
|
||||
assert input_json[DAGNODE_TYPE_KEY] == DeploymentFunctionExecutorNode.__name__
|
||||
return cls(
|
||||
input_json["deployment_function_handle"],
|
||||
input_json["args"],
|
||||
input_json["kwargs"],
|
||||
)
|
84
python/ray/serve/deployment_method_executor_node.py
Normal file
84
python/ray/serve/deployment_method_executor_node.py
Normal file
|
@ -0,0 +1,84 @@
|
|||
from typing import Any, Dict, List
|
||||
|
||||
from ray import ObjectRef
|
||||
from ray.experimental.dag import DAGNode
|
||||
from ray.experimental.dag.constants import DAGNODE_TYPE_KEY, PARENT_CLASS_NODE_KEY
|
||||
from ray.experimental.dag.format_utils import get_dag_node_str
|
||||
|
||||
|
||||
class DeploymentMethodExecutorNode(DAGNode):
|
||||
"""The lightweight executor DAGNode of DeploymentMethodNode that optimizes
|
||||
for efficiency.
|
||||
|
||||
- We need Ray DAGNode's traversal and replacement mechanism to deal
|
||||
with deeply nested nodes as args in the DAG
|
||||
- Meanwhile, __init__, _copy_impl and _execute_impl are on the critical
|
||||
pass of execution for every request.
|
||||
|
||||
Therefore for serve we introduce a minimal weight node as the final product
|
||||
of DAG transformation, and will be used in actual execution as well as
|
||||
deployment.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
deployment_method_name: str,
|
||||
dag_args,
|
||||
dag_kwargs,
|
||||
other_args_to_resolve=None,
|
||||
):
|
||||
super().__init__(
|
||||
dag_args, dag_kwargs, {}, other_args_to_resolve=other_args_to_resolve
|
||||
)
|
||||
self._deployment_node_replaced_by_handle = other_args_to_resolve[
|
||||
PARENT_CLASS_NODE_KEY
|
||||
]
|
||||
self._deployment_method_name = deployment_method_name
|
||||
|
||||
def _copy_impl(
|
||||
self,
|
||||
new_args: List[Any],
|
||||
new_kwargs: Dict[str, Any],
|
||||
new_options: Dict[str, Any],
|
||||
new_other_args_to_resolve: Dict[str, Any],
|
||||
) -> "DeploymentMethodExecutorNode":
|
||||
return DeploymentMethodExecutorNode(
|
||||
self._deployment_method_name,
|
||||
new_args,
|
||||
new_kwargs,
|
||||
other_args_to_resolve=new_other_args_to_resolve,
|
||||
)
|
||||
|
||||
def _execute_impl(self, *args, **kwargs) -> ObjectRef:
|
||||
"""Executor of DeploymentNode getting called each time on dag.execute.
|
||||
|
||||
The execute implementation is recursive, that is, the method nodes will
|
||||
receive whatever this method returns. We return a handle here so method
|
||||
node can directly call upon.
|
||||
"""
|
||||
method_body = getattr(
|
||||
self._deployment_node_replaced_by_handle, self._deployment_method_name
|
||||
)
|
||||
return method_body.remote(*self._bound_args, **self._bound_kwargs)
|
||||
|
||||
def __str__(self) -> str:
|
||||
return get_dag_node_str(self, str(self._deployment_method_name))
|
||||
|
||||
def to_json(self) -> Dict[str, Any]:
|
||||
return {
|
||||
DAGNODE_TYPE_KEY: DeploymentMethodExecutorNode.__name__,
|
||||
"deployment_method_name": self._deployment_method_name,
|
||||
"args": self.get_args(),
|
||||
"kwargs": self.get_kwargs(),
|
||||
"other_args_to_resolve": self.get_other_args_to_resolve(),
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def from_json(cls, input_json):
|
||||
assert input_json[DAGNODE_TYPE_KEY] == DeploymentMethodExecutorNode.__name__
|
||||
return cls(
|
||||
input_json["deployment_method_name"],
|
||||
input_json["args"],
|
||||
input_json["kwargs"],
|
||||
other_args_to_resolve=input_json["other_args_to_resolve"],
|
||||
)
|
|
@ -4,7 +4,9 @@ from ray.experimental.dag.dag_node import DAGNode
|
|||
from ray.serve.pipeline.generate import (
|
||||
transform_ray_dag_to_serve_dag,
|
||||
extract_deployments_from_serve_dag,
|
||||
transform_serve_dag_to_serve_executor_dag,
|
||||
process_ingress_deployment_in_serve_dag,
|
||||
generate_executor_dag_driver_deployment,
|
||||
)
|
||||
from ray.serve.deployment import Deployment
|
||||
from ray.experimental.dag.utils import DAGNodeNameGenerator
|
||||
|
@ -67,6 +69,19 @@ def build(ray_dag_root_node: DAGNode) -> List[Deployment]:
|
|||
lambda node: transform_ray_dag_to_serve_dag(node, node_name_generator)
|
||||
)
|
||||
deployments = extract_deployments_from_serve_dag(serve_root_dag)
|
||||
|
||||
# After Ray DAG is transformed to Serve DAG with deployments and their init
|
||||
# args filled, generate a minimal weight executor serve dag for perf
|
||||
serve_executor_root_dag = serve_root_dag.apply_recursive(
|
||||
transform_serve_dag_to_serve_executor_dag
|
||||
)
|
||||
root_driver_deployment = deployments[-1]
|
||||
new_driver_deployment = generate_executor_dag_driver_deployment(
|
||||
serve_executor_root_dag, root_driver_deployment
|
||||
)
|
||||
# Replace DAGDriver deployment with executor DAGDriver deployment
|
||||
deployments[-1] = new_driver_deployment
|
||||
# Validate and only expose HTTP for the endpoint
|
||||
deployments_with_http = process_ingress_deployment_in_serve_dag(deployments)
|
||||
|
||||
return deployments_with_http
|
||||
|
|
|
@ -7,8 +7,8 @@ from ray.experimental.dag.format_utils import get_dag_node_str
|
|||
from ray.experimental.dag.constants import DAGNODE_TYPE_KEY
|
||||
from ray.serve.deployment import Deployment, schema_to_deployment
|
||||
from ray.serve.config import DeploymentConfig
|
||||
from ray.serve.schema import DeploymentSchema
|
||||
from ray.serve.handle import RayServeLazySyncHandle
|
||||
from ray.serve.schema import DeploymentSchema
|
||||
from ray.serve.utils import get_deployment_import_path
|
||||
|
||||
|
||||
|
@ -74,7 +74,7 @@ class DeploymentFunctionNode(DAGNode):
|
|||
_internal=True,
|
||||
)
|
||||
# TODO (jiaodong): Polish with async handle support later
|
||||
self._deployment_handle = RayServeLazySyncHandle(deployment_name)
|
||||
self._deployment_handle = RayServeLazySyncHandle(self._deployment.name)
|
||||
|
||||
def _copy_impl(
|
||||
self,
|
||||
|
@ -130,7 +130,7 @@ class DeploymentFunctionNode(DAGNode):
|
|||
@classmethod
|
||||
def from_json(cls, input_json):
|
||||
assert input_json[DAGNODE_TYPE_KEY] == DeploymentFunctionNode.__name__
|
||||
node = cls(
|
||||
return cls(
|
||||
input_json["import_path"],
|
||||
input_json["deployment_name"],
|
||||
input_json["args"],
|
||||
|
@ -138,5 +138,3 @@ class DeploymentFunctionNode(DAGNode):
|
|||
input_json["options"],
|
||||
other_args_to_resolve=input_json["other_args_to_resolve"],
|
||||
)
|
||||
node._stable_uuid = input_json["uuid"]
|
||||
return node
|
||||
|
|
|
@ -21,7 +21,7 @@ class DeploymentMethodNode(DAGNode):
|
|||
):
|
||||
self._deployment = deployment
|
||||
self._deployment_method_name: str = deployment_method_name
|
||||
self._deployment_node = other_args_to_resolve[PARENT_CLASS_NODE_KEY]
|
||||
self._deployment_handle = other_args_to_resolve[PARENT_CLASS_NODE_KEY]
|
||||
super().__init__(
|
||||
method_args,
|
||||
method_kwargs,
|
||||
|
@ -48,7 +48,7 @@ class DeploymentMethodNode(DAGNode):
|
|||
def _execute_impl(self, *args, **kwargs):
|
||||
"""Executor of DeploymentMethodNode by ray.remote()"""
|
||||
# Execute with bound args.
|
||||
method_body = getattr(self._deployment_node, self._deployment_method_name)
|
||||
method_body = getattr(self._deployment_handle, self._deployment_method_name)
|
||||
return method_body.remote(
|
||||
*self._bound_args,
|
||||
**self._bound_kwargs,
|
||||
|
|
|
@ -3,11 +3,24 @@ import json
|
|||
from typing import Any, Callable, Dict, Optional, List, Tuple, Union
|
||||
|
||||
from ray.experimental.dag import DAGNode, InputNode
|
||||
from ray.serve.handle import RayServeLazySyncHandle, RayServeSyncHandle, RayServeHandle
|
||||
from ray.serve.deployment_executor_node import DeploymentExecutorNode
|
||||
from ray.serve.deployment_function_executor_node import (
|
||||
DeploymentFunctionExecutorNode,
|
||||
)
|
||||
from ray.serve.deployment_method_executor_node import (
|
||||
DeploymentMethodExecutorNode,
|
||||
)
|
||||
from ray.serve.handle import (
|
||||
RayServeLazySyncHandle,
|
||||
RayServeSyncHandle,
|
||||
RayServeHandle,
|
||||
)
|
||||
from ray.serve.pipeline.deployment_method_node import DeploymentMethodNode
|
||||
from ray.serve.pipeline.deployment_function_node import DeploymentFunctionNode
|
||||
from ray.serve.pipeline.constants import USE_SYNC_HANDLE_KEY
|
||||
from ray.experimental.dag.constants import DAGNODE_TYPE_KEY, PARENT_CLASS_NODE_KEY
|
||||
from ray.experimental.dag.constants import (
|
||||
DAGNODE_TYPE_KEY,
|
||||
PARENT_CLASS_NODE_KEY,
|
||||
)
|
||||
from ray.experimental.dag.format_utils import get_dag_node_str
|
||||
from ray.serve.deployment import Deployment, schema_to_deployment
|
||||
from ray.serve.deployment_graph import RayServeDAGHandle
|
||||
|
@ -54,12 +67,24 @@ class DeploymentNode(DAGNode):
|
|||
# Thus we need convert all DeploymentNode used in init args into
|
||||
# deployment handles (executable and picklable) in ray serve DAG to make
|
||||
# serve DAG end to end executable.
|
||||
# TODO(jiaodong): This part does some magic for DAGDriver and will throw
|
||||
# error with weird pickle replace table error. Move this out.
|
||||
def replace_with_handle(node):
|
||||
if isinstance(node, DeploymentNode):
|
||||
return node._get_serve_deployment_handle(
|
||||
node._deployment, node._bound_other_args_to_resolve
|
||||
)
|
||||
elif isinstance(node, (DeploymentMethodNode, DeploymentFunctionNode)):
|
||||
elif isinstance(node, DeploymentExecutorNode):
|
||||
return node._deployment_handle
|
||||
elif isinstance(
|
||||
node,
|
||||
(
|
||||
DeploymentMethodNode,
|
||||
DeploymentMethodExecutorNode,
|
||||
DeploymentFunctionNode,
|
||||
DeploymentFunctionExecutorNode,
|
||||
),
|
||||
):
|
||||
from ray.serve.pipeline.json_serde import DAGNodeEncoder
|
||||
|
||||
serve_dag_root_json = json.dumps(node, cls=DAGNodeEncoder)
|
||||
|
@ -71,7 +96,15 @@ class DeploymentNode(DAGNode):
|
|||
) = self.apply_functional(
|
||||
[deployment_init_args, deployment_init_kwargs],
|
||||
predictate_fn=lambda node: isinstance(
|
||||
node, (DeploymentNode, DeploymentMethodNode, DeploymentFunctionNode)
|
||||
node,
|
||||
(
|
||||
DeploymentNode,
|
||||
DeploymentMethodNode,
|
||||
DeploymentFunctionNode,
|
||||
DeploymentExecutorNode,
|
||||
DeploymentFunctionExecutorNode,
|
||||
DeploymentMethodExecutorNode,
|
||||
),
|
||||
),
|
||||
apply_fn=replace_with_handle,
|
||||
)
|
||||
|
@ -117,9 +150,9 @@ class DeploymentNode(DAGNode):
|
|||
ray_actor_options=ray_actor_options,
|
||||
_internal=True,
|
||||
)
|
||||
self._deployment_handle: Union[
|
||||
RayServeLazySyncHandle, RayServeHandle, RayServeSyncHandle
|
||||
] = self._get_serve_deployment_handle(self._deployment, other_args_to_resolve)
|
||||
self._deployment_handle: RayServeLazySyncHandle = (
|
||||
self._get_serve_deployment_handle(self._deployment, other_args_to_resolve)
|
||||
)
|
||||
|
||||
def _copy_impl(
|
||||
self,
|
||||
|
@ -165,20 +198,8 @@ class DeploymentNode(DAGNode):
|
|||
return async handle only if user explicitly set
|
||||
USE_SYNC_HANDLE_KEY with value of False.
|
||||
"""
|
||||
# TODO (jiaodong): Support configurable async handle
|
||||
if USE_SYNC_HANDLE_KEY not in bound_other_args_to_resolve:
|
||||
# Return sync RayServeLazySyncHandle
|
||||
return RayServeLazySyncHandle(deployment.name)
|
||||
elif bound_other_args_to_resolve.get(USE_SYNC_HANDLE_KEY) is True:
|
||||
# Return sync RayServeSyncHandle
|
||||
return deployment.get_handle(sync=True)
|
||||
elif bound_other_args_to_resolve.get(USE_SYNC_HANDLE_KEY) is False:
|
||||
# Return async RayServeHandle
|
||||
return deployment.get_handle(sync=False)
|
||||
else:
|
||||
raise ValueError(
|
||||
f"{USE_SYNC_HANDLE_KEY} should only be set with a boolean value."
|
||||
)
|
||||
# TODO: (jiaodong) Support async handle
|
||||
return RayServeLazySyncHandle(deployment.name)
|
||||
|
||||
def _contains_input_node(self) -> bool:
|
||||
"""Check if InputNode is used in children DAGNodes with current node
|
||||
|
@ -234,7 +255,7 @@ class DeploymentNode(DAGNode):
|
|||
}
|
||||
|
||||
@classmethod
|
||||
def from_json(cls, input_json, object_hook=None):
|
||||
def from_json(cls, input_json):
|
||||
assert input_json[DAGNODE_TYPE_KEY] == DeploymentNode.__name__
|
||||
return cls(
|
||||
input_json["import_path"],
|
||||
|
|
|
@ -1,3 +1,4 @@
|
|||
import json
|
||||
from typing import List
|
||||
from collections import OrderedDict
|
||||
|
||||
|
@ -11,9 +12,14 @@ from ray.experimental.dag.function_node import FunctionNode
|
|||
from ray.experimental.dag.input_node import InputNode
|
||||
from ray.experimental.dag.utils import DAGNodeNameGenerator
|
||||
from ray.serve.deployment import Deployment
|
||||
from ray.serve.deployment_graph import RayServeDAGHandle
|
||||
from ray.serve.pipeline.deployment_method_node import DeploymentMethodNode
|
||||
from ray.serve.pipeline.deployment_node import DeploymentNode
|
||||
from ray.serve.pipeline.deployment_function_node import DeploymentFunctionNode
|
||||
from ray.serve.deployment_executor_node import DeploymentExecutorNode
|
||||
from ray.serve.deployment_method_executor_node import DeploymentMethodExecutorNode
|
||||
from ray.serve.deployment_function_executor_node import DeploymentFunctionExecutorNode
|
||||
from ray.serve.pipeline.json_serde import DAGNodeEncoder
|
||||
|
||||
|
||||
def transform_ray_dag_to_serve_dag(
|
||||
|
@ -95,6 +101,90 @@ def extract_deployments_from_serve_dag(
|
|||
return list(deployments.values())
|
||||
|
||||
|
||||
def transform_serve_dag_to_serve_executor_dag(serve_dag_root_node: DAGNode):
|
||||
"""Given a runnable serve dag with deployment init args and options
|
||||
processed, transform into an equivalent, but minimal dag optimized for
|
||||
execution.
|
||||
"""
|
||||
if isinstance(serve_dag_root_node, DeploymentNode):
|
||||
return DeploymentExecutorNode(
|
||||
serve_dag_root_node._deployment_handle,
|
||||
serve_dag_root_node.get_args(),
|
||||
serve_dag_root_node.get_kwargs(),
|
||||
)
|
||||
elif isinstance(serve_dag_root_node, DeploymentMethodNode):
|
||||
return DeploymentMethodExecutorNode(
|
||||
# Deployment method handle
|
||||
serve_dag_root_node._deployment_method_name,
|
||||
serve_dag_root_node.get_args(),
|
||||
serve_dag_root_node.get_kwargs(),
|
||||
other_args_to_resolve=serve_dag_root_node.get_other_args_to_resolve(),
|
||||
)
|
||||
elif isinstance(serve_dag_root_node, DeploymentFunctionNode):
|
||||
return DeploymentFunctionExecutorNode(
|
||||
serve_dag_root_node._deployment_handle,
|
||||
serve_dag_root_node.get_args(),
|
||||
serve_dag_root_node.get_kwargs(),
|
||||
)
|
||||
else:
|
||||
return serve_dag_root_node
|
||||
|
||||
|
||||
def generate_executor_dag_driver_deployment(
|
||||
serve_executor_dag_root_node: DAGNode, original_driver_deployment: Deployment
|
||||
):
|
||||
"""Given a transformed minimal execution serve dag, and original DAGDriver
|
||||
deployment, generate new DAGDriver deployment that uses new serve executor
|
||||
dag as init_args.
|
||||
|
||||
Args:
|
||||
serve_executor_dag_root_node (DeploymentExecutorNode): Transformed
|
||||
executor serve dag with only barebone deployment handles.
|
||||
original_driver_deployment (Deployment): User's original DAGDriver
|
||||
deployment that wrapped Ray DAG as init args.
|
||||
Returns:
|
||||
executor_dag_driver_deployment (Deployment): New DAGDriver deployment
|
||||
with executor serve dag as init args.
|
||||
"""
|
||||
|
||||
def replace_with_handle(node):
|
||||
if isinstance(node, DeploymentExecutorNode):
|
||||
return node._deployment_handle
|
||||
elif isinstance(
|
||||
node,
|
||||
(
|
||||
DeploymentMethodExecutorNode,
|
||||
DeploymentFunctionExecutorNode,
|
||||
),
|
||||
):
|
||||
serve_dag_root_json = json.dumps(node, cls=DAGNodeEncoder)
|
||||
return RayServeDAGHandle(serve_dag_root_json)
|
||||
|
||||
(
|
||||
replaced_deployment_init_args,
|
||||
replaced_deployment_init_kwargs,
|
||||
) = serve_executor_dag_root_node.apply_functional(
|
||||
[
|
||||
serve_executor_dag_root_node.get_args(),
|
||||
serve_executor_dag_root_node.get_kwargs(),
|
||||
],
|
||||
predictate_fn=lambda node: isinstance(
|
||||
node,
|
||||
(
|
||||
DeploymentExecutorNode,
|
||||
DeploymentFunctionExecutorNode,
|
||||
DeploymentMethodExecutorNode,
|
||||
),
|
||||
),
|
||||
apply_fn=replace_with_handle,
|
||||
)
|
||||
|
||||
return original_driver_deployment.options(
|
||||
init_args=replaced_deployment_init_args,
|
||||
init_kwargs=replaced_deployment_init_kwargs,
|
||||
)
|
||||
|
||||
|
||||
def get_pipeline_input_node(serve_dag_root_node: DAGNode):
|
||||
"""Return the InputNode singleton node from serve dag, and throw
|
||||
exceptions if we didn't find any, or found more than one.
|
||||
|
|
|
@ -15,6 +15,10 @@ from ray.experimental.dag import (
|
|||
from ray.serve.pipeline.deployment_node import DeploymentNode
|
||||
from ray.serve.pipeline.deployment_method_node import DeploymentMethodNode
|
||||
from ray.serve.pipeline.deployment_function_node import DeploymentFunctionNode
|
||||
from ray.serve.deployment_executor_node import DeploymentExecutorNode
|
||||
from ray.serve.deployment_method_executor_node import DeploymentMethodExecutorNode
|
||||
from ray.serve.deployment_function_executor_node import DeploymentFunctionExecutorNode
|
||||
|
||||
from ray.serve.schema import (
|
||||
DeploymentSchema,
|
||||
)
|
||||
|
@ -125,6 +129,21 @@ def dagnode_from_json(input_json: Any) -> Union[DAGNode, RayServeHandle, Any]:
|
|||
that we perserve the same parent node.
|
||||
- .options() does not contain any DAGNode type
|
||||
"""
|
||||
node_type_to_cls = {
|
||||
# Ray DAG Inputs
|
||||
InputNode.__name__: InputNode,
|
||||
InputAttributeNode.__name__: InputAttributeNode,
|
||||
# Ray DAG Nodes
|
||||
ClassMethodNode.__name__: ClassMethodNode,
|
||||
# Deployment transformation nodes
|
||||
DeploymentNode.__name__: DeploymentNode,
|
||||
DeploymentMethodNode.__name__: DeploymentMethodNode,
|
||||
DeploymentFunctionNode.__name__: DeploymentFunctionNode,
|
||||
# Deployment graph execution nodes
|
||||
DeploymentExecutorNode.__name__: DeploymentExecutorNode,
|
||||
DeploymentMethodExecutorNode.__name__: DeploymentMethodExecutorNode,
|
||||
DeploymentFunctionExecutorNode.__name__: DeploymentFunctionExecutorNode,
|
||||
}
|
||||
# Deserialize RayServeHandle type
|
||||
if SERVE_HANDLE_JSON_KEY in input_json:
|
||||
return serve_handle_from_json_dict(input_json)
|
||||
|
@ -141,18 +160,8 @@ def dagnode_from_json(input_json: Any) -> Union[DAGNode, RayServeHandle, Any]:
|
|||
HandleOptions(input_json["handle_options_method_name"]),
|
||||
)
|
||||
# Deserialize DAGNode type
|
||||
elif input_json[DAGNODE_TYPE_KEY] == InputNode.__name__:
|
||||
return InputNode.from_json(input_json)
|
||||
elif input_json[DAGNODE_TYPE_KEY] == InputAttributeNode.__name__:
|
||||
return InputAttributeNode.from_json(input_json)
|
||||
elif input_json[DAGNODE_TYPE_KEY] == ClassMethodNode.__name__:
|
||||
return ClassMethodNode.from_json(input_json)
|
||||
elif input_json[DAGNODE_TYPE_KEY] == DeploymentNode.__name__:
|
||||
return DeploymentNode.from_json(input_json)
|
||||
elif input_json[DAGNODE_TYPE_KEY] == DeploymentMethodNode.__name__:
|
||||
return DeploymentMethodNode.from_json(input_json)
|
||||
elif input_json[DAGNODE_TYPE_KEY] == DeploymentFunctionNode.__name__:
|
||||
return DeploymentFunctionNode.from_json(input_json)
|
||||
elif input_json[DAGNODE_TYPE_KEY] in node_type_to_cls:
|
||||
return node_type_to_cls[input_json[DAGNODE_TYPE_KEY]].from_json(input_json)
|
||||
else:
|
||||
# Class and Function nodes require original module as body.
|
||||
module_name, attr_name = parse_import_path(input_json["import_path"])
|
||||
|
|
|
@ -43,7 +43,7 @@ class Actor:
|
|||
return self.i
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.skip(reason="async handle not enabled yet")
|
||||
async def test_simple_deployment_async(serve_instance):
|
||||
"""Internal testing only for simple creation and execution.
|
||||
|
||||
|
@ -131,21 +131,6 @@ def test_no_input_node_as_init_args():
|
|||
)
|
||||
|
||||
|
||||
def test_invalid_use_sync_handle():
|
||||
with pytest.raises(
|
||||
ValueError,
|
||||
match=f"{USE_SYNC_HANDLE_KEY} should only be set with a boolean value",
|
||||
):
|
||||
_ = DeploymentNode(
|
||||
Actor,
|
||||
"test",
|
||||
[],
|
||||
{},
|
||||
{},
|
||||
other_args_to_resolve={USE_SYNC_HANDLE_KEY: {"options_a": "hii"}},
|
||||
)
|
||||
|
||||
|
||||
def test_mix_sync_async_handle(serve_instance):
|
||||
# TODO: (jiaodong) Add complex multi-deployment tests from ray DAG.
|
||||
pass
|
||||
|
|
|
@ -2,14 +2,13 @@ import pytest
|
|||
|
||||
import ray
|
||||
from ray import serve
|
||||
from ray.serve.handle import RayServeLazySyncHandle
|
||||
from ray.experimental.dag import InputNode
|
||||
from ray.serve.handle import RayServeLazySyncHandle
|
||||
from ray.serve.pipeline.generate import (
|
||||
transform_ray_dag_to_serve_dag,
|
||||
extract_deployments_from_serve_dag,
|
||||
get_pipeline_input_node,
|
||||
)
|
||||
from ray.serve.pipeline.api import build
|
||||
from ray.serve.pipeline.tests.resources.test_modules import (
|
||||
Model,
|
||||
NESTED_HANDLE_KEY,
|
||||
|
@ -238,12 +237,21 @@ def test_get_pipeline_input_node():
|
|||
get_pipeline_input_node(serve_dag)
|
||||
|
||||
|
||||
def test_unique_name_reset_upon_build():
|
||||
def test_unique_name_reset_upon_build(serve_instance):
|
||||
ray_dag, _ = get_multi_instantiation_class_deployment_in_init_args_dag()
|
||||
deployments = build(ray_dag)
|
||||
with DAGNodeNameGenerator() as node_name_generator:
|
||||
serve_root_dag = ray_dag.apply_recursive(
|
||||
lambda node: transform_ray_dag_to_serve_dag(node, node_name_generator)
|
||||
)
|
||||
deployments = extract_deployments_from_serve_dag(serve_root_dag)
|
||||
assert deployments[0].name == "Model"
|
||||
assert deployments[1].name == "Model_1"
|
||||
deployments = build(ray_dag)
|
||||
|
||||
with DAGNodeNameGenerator() as node_name_generator:
|
||||
serve_root_dag = ray_dag.apply_recursive(
|
||||
lambda node: transform_ray_dag_to_serve_dag(node, node_name_generator)
|
||||
)
|
||||
deployments = extract_deployments_from_serve_dag(serve_root_dag)
|
||||
# Assert we don't keep increasing suffix id between build() calls
|
||||
assert deployments[0].name == "Model"
|
||||
assert deployments[1].name == "Model_1"
|
||||
|
|
|
@ -156,16 +156,23 @@ def test_chained_function(serve_instance, use_build):
|
|||
def func_2(input):
|
||||
return input * 2
|
||||
|
||||
@serve.deployment
|
||||
def func_3(input):
|
||||
return input * 3
|
||||
|
||||
with InputNode() as dag_input:
|
||||
output_1 = func_1.bind(dag_input)
|
||||
output_2 = func_2.bind(dag_input)
|
||||
serve_dag = combine.bind(output_1, output_2)
|
||||
output_3 = func_3.bind(output_2)
|
||||
ray_dag = combine.bind(output_1, output_2, kwargs_output=output_3)
|
||||
with pytest.raises(ValueError, match="Please provide a driver class"):
|
||||
_ = serve.run(serve_dag)
|
||||
_ = serve.run(ray_dag)
|
||||
|
||||
handle = serve.run(DAGDriver.bind(serve_dag, http_adapter=json_resolver))
|
||||
assert ray.get(handle.predict.remote(2)) == 6 # 2 + 2*2
|
||||
assert requests.post("http://127.0.0.1:8000/", json=2).json() == 6
|
||||
serve_dag = DAGDriver.bind(ray_dag, http_adapter=json_resolver)
|
||||
|
||||
handle = serve.run(serve_dag)
|
||||
assert ray.get(handle.predict.remote(2)) == 18 # 2 + 2*2 + (2*2) * 3
|
||||
assert requests.post("http://127.0.0.1:8000/", json=2).json() == 18
|
||||
|
||||
|
||||
@pytest.mark.parametrize("use_build", [False, True])
|
||||
|
|
Loading…
Add table
Reference in a new issue