[Deployment Graph] Move Deployment creation outside to build function (#26129)

This commit is contained in:
Jiao 2022-07-05 16:38:02 -07:00 committed by GitHub
parent 34f1b32861
commit 89b0b82c13
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
12 changed files with 120 additions and 277 deletions

View file

@ -348,7 +348,7 @@ py_test(
) )
py_test( py_test(
name = "test_pipeline_driver", name = "test_deployment_graph_driver",
size = "medium", size = "medium",
srcs = serve_tests_srcs, srcs = serve_tests_srcs,
tags = ["exclusive", "team:serve"], tags = ["exclusive", "team:serve"],
@ -356,7 +356,7 @@ py_test(
) )
py_test( py_test(
name = "test_pipeline_dag", name = "test_deployment_graph",
size = "medium", size = "medium",
srcs = serve_tests_srcs, srcs = serve_tests_srcs,
tags = ["exclusive", "team:serve"], tags = ["exclusive", "team:serve"],
@ -380,7 +380,7 @@ py_test(
) )
py_test( py_test(
name = "test_pipeline_ingress_deployment", name = "test_deployment_graph_ingress_deployment",
size = "small", size = "small",
srcs = serve_tests_srcs, srcs = serve_tests_srcs,
tags = ["exclusive", "team:serve"], tags = ["exclusive", "team:serve"],
@ -388,7 +388,7 @@ py_test(
) )
py_test( py_test(
name = "test_pipeline_deployment_config", name = "test_deployment_graph_config",
size = "small", size = "small",
srcs = serve_tests_srcs, srcs = serve_tests_srcs,
tags = ["exclusive", "team:serve"], tags = ["exclusive", "team:serve"],
@ -396,7 +396,7 @@ py_test(
) )
py_test( py_test(
name = "test_pipeline_handle_serde", name = "test_deployment_graph_handle_serde",
size = "small", size = "small",
srcs = serve_tests_srcs, srcs = serve_tests_srcs,
tags = ["exclusive", "team:serve"], tags = ["exclusive", "team:serve"],

View file

@ -1,8 +1,9 @@
import inspect
import json import json
from typing import List from typing import List
from collections import OrderedDict from collections import OrderedDict
from ray.serve.deployment import Deployment from ray.serve.deployment import Deployment, schema_to_deployment
from ray.serve.deployment_graph import RayServeDAGHandle from ray.serve.deployment_graph import RayServeDAGHandle
from ray.serve.deployment_method_node import DeploymentMethodNode from ray.serve.deployment_method_node import DeploymentMethodNode
from ray.serve.deployment_node import DeploymentNode from ray.serve.deployment_node import DeploymentNode
@ -11,6 +12,8 @@ from ray.serve.deployment_executor_node import DeploymentExecutorNode
from ray.serve.deployment_method_executor_node import DeploymentMethodExecutorNode from ray.serve.deployment_method_executor_node import DeploymentMethodExecutorNode
from ray.serve.deployment_function_executor_node import DeploymentFunctionExecutorNode from ray.serve.deployment_function_executor_node import DeploymentFunctionExecutorNode
from ray.serve.json_serde import DAGNodeEncoder from ray.serve.json_serde import DAGNodeEncoder
from ray.serve.handle import RayServeLazySyncHandle
from ray.serve.schema import DeploymentSchema
from ray.dag import ( from ray.dag import (
@ -133,13 +136,82 @@ def transform_ray_dag_to_serve_dag(
""" """
if isinstance(dag_node, ClassNode): if isinstance(dag_node, ClassNode):
deployment_name = node_name_generator.get_node_name(dag_node) deployment_name = node_name_generator.get_node_name(dag_node)
# Deployment can be passed into other DAGNodes as init args. This is
# supported pattern in ray DAG that user can instantiate and pass class
# instances as init args to others.
# However in ray serve we send init args via .remote() that requires
# pickling, and all DAGNode types are not picklable by design.
# Thus we need convert all DeploymentNode used in init args into
# deployment handles (executable and picklable) in ray serve DAG to make
# serve DAG end to end executable.
def replace_with_handle(node):
if isinstance(node, DeploymentNode):
return RayServeLazySyncHandle(node._deployment.name)
elif isinstance(node, DeploymentExecutorNode):
return node._deployment_handle
(
replaced_deployment_init_args,
replaced_deployment_init_kwargs,
) = dag_node.apply_functional(
[dag_node.get_args(), dag_node.get_kwargs()],
predictate_fn=lambda node: isinstance(
node,
# We need to match and replace all DAGNodes even though they
# could be None, because no DAGNode replacement should run into
# re-resolved child DAGNodes, otherwise with KeyError
(
DeploymentNode,
DeploymentMethodNode,
DeploymentFunctionNode,
DeploymentExecutorNode,
DeploymentFunctionExecutorNode,
DeploymentMethodExecutorNode,
),
),
apply_fn=replace_with_handle,
)
# ClassNode is created via bind on serve.deployment decorated class
# with no serve specific configs.
deployment_schema: DeploymentSchema = dag_node._bound_other_args_to_resolve[
"deployment_schema"
]
deployment_shell: Deployment = schema_to_deployment(deployment_schema)
# Prefer user specified name to override the generated one.
if (
inspect.isclass(dag_node._body)
and deployment_shell.name != dag_node._body.__name__
):
deployment_name = deployment_shell.name
# Set the route prefix, prefer the one user supplied,
# otherwise set it to /deployment_name
if (
deployment_shell.route_prefix is None
or deployment_shell.route_prefix != f"/{deployment_shell.name}"
):
route_prefix = deployment_shell.route_prefix
else:
route_prefix = f"/{deployment_name}"
deployment = deployment_shell.options(
func_or_class=dag_node._body,
name=deployment_name,
init_args=replaced_deployment_init_args,
init_kwargs=replaced_deployment_init_kwargs,
route_prefix=route_prefix,
)
return DeploymentNode( return DeploymentNode(
dag_node._body, deployment,
deployment_name,
dag_node.get_args(), dag_node.get_args(),
dag_node.get_kwargs(), dag_node.get_kwargs(),
dag_node.get_options(), dag_node.get_options(),
# TODO: (jiaodong) Support .options(metadata=xxx) for deployment
other_args_to_resolve=dag_node.get_other_args_to_resolve(), other_args_to_resolve=dag_node.get_other_args_to_resolve(),
) )

View file

@ -1,23 +1,12 @@
import inspect from typing import Any, Dict, Optional, List, Tuple
from typing import Any, Callable, Dict, Optional, List, Tuple, Union
from ray.dag import DAGNode from ray.dag import DAGNode
from ray.serve.deployment_executor_node import DeploymentExecutorNode
from ray.serve.deployment_function_executor_node import (
DeploymentFunctionExecutorNode,
)
from ray.serve.deployment_method_executor_node import (
DeploymentMethodExecutorNode,
)
from ray.serve.handle import RayServeLazySyncHandle from ray.serve.handle import RayServeLazySyncHandle
from ray.dag.constants import PARENT_CLASS_NODE_KEY from ray.dag.constants import PARENT_CLASS_NODE_KEY
from ray.dag.format_utils import get_dag_node_str from ray.dag.format_utils import get_dag_node_str
from ray.serve.deployment_method_node import DeploymentMethodNode from ray.serve.deployment_method_node import DeploymentMethodNode
from ray.serve.deployment_function_node import DeploymentFunctionNode from ray.serve.deployment import Deployment
from ray.serve.deployment import Deployment, schema_to_deployment
from ray.serve.config import DeploymentConfig
from ray.serve.schema import DeploymentSchema
class DeploymentNode(DAGNode): class DeploymentNode(DAGNode):
@ -27,8 +16,7 @@ class DeploymentNode(DAGNode):
self, self,
# For serve structured deployment, deployment body can be import path # For serve structured deployment, deployment body can be import path
# to the class or function instead. # to the class or function instead.
func_or_class: Union[Callable, str], deployment: Deployment,
deployment_name: str,
deployment_init_args: Tuple[Any], deployment_init_args: Tuple[Any],
deployment_init_kwargs: Dict[str, Any], deployment_init_kwargs: Dict[str, Any],
ray_actor_options: Dict[str, Any], ray_actor_options: Dict[str, Any],
@ -41,87 +29,7 @@ class DeploymentNode(DAGNode):
ray_actor_options, ray_actor_options,
other_args_to_resolve=other_args_to_resolve, other_args_to_resolve=other_args_to_resolve,
) )
# Deployment can be passed into other DAGNodes as init args. This is self._deployment = deployment
# supported pattern in ray DAG that user can instantiate and pass class
# instances as init args to others.
# However in ray serve we send init args via .remote() that requires
# pickling, and all DAGNode types are not picklable by design.
# Thus we need convert all DeploymentNode used in init args into
# deployment handles (executable and picklable) in ray serve DAG to make
# serve DAG end to end executable.
# TODO(jiaodong): This part does some magic for DAGDriver and will throw
# error with weird pickle replace table error. Move this out.
def replace_with_handle(node):
if isinstance(node, DeploymentNode):
return RayServeLazySyncHandle(node._deployment.name)
elif isinstance(node, DeploymentExecutorNode):
return node._deployment_handle
(
replaced_deployment_init_args,
replaced_deployment_init_kwargs,
) = self.apply_functional(
[deployment_init_args, deployment_init_kwargs],
predictate_fn=lambda node: isinstance(
node,
# We need to match and replace all DAGNodes even though they
# could be None, because no DAGNode replacement should run into
# re-resolved child DAGNodes, otherwise with KeyError
(
DeploymentNode,
DeploymentMethodNode,
DeploymentFunctionNode,
DeploymentExecutorNode,
DeploymentFunctionExecutorNode,
DeploymentMethodExecutorNode,
),
),
apply_fn=replace_with_handle,
)
if "deployment_schema" in self._bound_other_args_to_resolve:
deployment_schema: DeploymentSchema = self._bound_other_args_to_resolve[
"deployment_schema"
]
deployment_shell = schema_to_deployment(deployment_schema)
# Prefer user specified name to override the generated one.
if (
inspect.isclass(func_or_class)
and deployment_shell.name != func_or_class.__name__
):
deployment_name = deployment_shell.name
# Set the route prefix, prefer the one user supplied,
# otherwise set it to /deployment_name
if (
deployment_shell.route_prefix is None
or deployment_shell.route_prefix != f"/{deployment_shell.name}"
):
route_prefix = deployment_shell.route_prefix
else:
route_prefix = f"/{deployment_name}"
self._deployment = deployment_shell.options(
func_or_class=func_or_class,
name=deployment_name,
init_args=replaced_deployment_init_args,
init_kwargs=replaced_deployment_init_kwargs,
route_prefix=route_prefix,
)
else:
self._deployment: Deployment = Deployment(
func_or_class,
deployment_name,
# TODO: (jiaodong) Support deployment config from user input
DeploymentConfig(),
init_args=replaced_deployment_init_args,
init_kwargs=replaced_deployment_init_kwargs,
ray_actor_options=ray_actor_options,
_internal=True,
)
self._deployment_handle = RayServeLazySyncHandle(self._deployment.name) self._deployment_handle = RayServeLazySyncHandle(self._deployment.name)
def _copy_impl( def _copy_impl(
@ -132,8 +40,7 @@ class DeploymentNode(DAGNode):
new_other_args_to_resolve: Dict[str, Any], new_other_args_to_resolve: Dict[str, Any],
): ):
return DeploymentNode( return DeploymentNode(
self._deployment.func_or_class, self._deployment,
self._deployment.name,
new_args, new_args,
new_kwargs, new_kwargs,
new_options, new_options,

View file

@ -6,12 +6,13 @@ and structured deployment.
from typing import TypeVar from typing import TypeVar
import ray import ray
from ray import serve
RayHandleLike = TypeVar("RayHandleLike") RayHandleLike = TypeVar("RayHandleLike")
NESTED_HANDLE_KEY = "nested_handle" NESTED_HANDLE_KEY = "nested_handle"
@ray.remote @serve.deployment
class ClassHello: class ClassHello:
def __init__(self): def __init__(self):
pass pass
@ -20,7 +21,7 @@ class ClassHello:
return "hello" return "hello"
@ray.remote @serve.deployment
class Model: class Model:
def __init__(self, weight: int, ratio: float = None): def __init__(self, weight: int, ratio: float = None):
self.weight = weight self.weight = weight
@ -34,7 +35,7 @@ class Model:
return self.ratio * self.weight * input_data return self.ratio * self.weight * input_data
@ray.remote @serve.deployment
class Combine: class Combine:
def __init__( def __init__(
self, self,
@ -51,7 +52,7 @@ class Combine:
return sum(ray.get([r1_ref, r2_ref])) return sum(ray.get([r1_ref, r2_ref]))
@ray.remote @serve.deployment
class Counter: class Counter:
def __init__(self, val): def __init__(self, val):
self.val = val self.val = val
@ -63,17 +64,17 @@ class Counter:
self.val += inc self.val += inc
@ray.remote @serve.deployment
def fn_hello(): def fn_hello():
return "hello" return "hello"
@ray.remote @serve.deployment
def fn(val, incr=0): def fn(val, incr=0):
return val + incr return val + incr
@ray.remote @serve.deployment
def combine(m1_output, m2_output, kwargs_output=0): def combine(m1_output, m2_output, kwargs_output=0):
return m1_output + m2_output + kwargs_output return m1_output + m2_output + kwargs_output

View file

@ -77,43 +77,11 @@ def test_simple_single_class(serve_instance):
) )
def test_single_class_with_valid_ray_options(serve_instance):
with InputNode() as dag_input:
model = Model.options(num_cpus=1, memory=1000).bind(2, ratio=0.3)
ray_dag = model.forward.bind(dag_input)
with _DAGNodeNameGenerator() as node_name_generator:
serve_root_dag = ray_dag.apply_recursive(
lambda node: transform_ray_dag_to_serve_dag(node, node_name_generator)
)
deployments = extract_deployments_from_serve_dag(serve_root_dag)
assert len(deployments) == 1
deployments[0].deploy()
_validate_consistent_python_output(
deployments[0], ray_dag, deployments[0].name, input=1, output=0.6
)
deployment = serve.get_deployment(deployments[0].name)
assert deployment.ray_actor_options.get("num_cpus") == 1
assert deployment.ray_actor_options.get("memory") == 1000
assert deployment.ray_actor_options.get("runtime_env") == {}
def test_single_class_with_invalid_deployment_options(serve_instance): def test_single_class_with_invalid_deployment_options(serve_instance):
with InputNode() as dag_input: with pytest.raises(TypeError, match="name must be a string"):
model = Model.options(name="my_deployment").bind(2, ratio=0.3) with InputNode() as dag_input:
ray_dag = model.forward.bind(dag_input) model = Model.options(name=123).bind(2, ratio=0.3)
_ = model.forward.bind(dag_input)
with _DAGNodeNameGenerator() as node_name_generator:
serve_root_dag = ray_dag.apply_recursive(
lambda node: transform_ray_dag_to_serve_dag(node, node_name_generator)
)
deployments = extract_deployments_from_serve_dag(serve_root_dag)
assert len(deployments) == 1
with pytest.raises(
ValueError, match="Specifying 'name' in ray_actor_options is not allowed"
):
deployments[0].deploy()
def test_func_class_with_class_method_dag(serve_instance): def test_func_class_with_class_method_dag(serve_instance):
@ -127,7 +95,7 @@ def test_func_class_with_class_method_dag(serve_instance):
serve_executor_root_dag = serve_root_dag.apply_recursive( serve_executor_root_dag = serve_root_dag.apply_recursive(
transform_serve_dag_to_serve_executor_dag transform_serve_dag_to_serve_executor_dag
) )
assert len(deployments) == 2 assert len(deployments) == 3
for deployment in deployments: for deployment in deployments:
deployment.deploy() deployment.deploy()

View file

@ -32,7 +32,7 @@ def test_loading_check():
load_http_adapter(func) load_http_adapter(func)
loaded_my_resolver = load_http_adapter( loaded_my_resolver = load_http_adapter(
"ray.serve.tests.test_pipeline_driver.my_resolver" "ray.serve.tests.test_deployment_graph_driver.my_resolver"
) )
assert (loaded_my_resolver == my_resolver) or ( assert (loaded_my_resolver == my_resolver) or (
loaded_my_resolver.__code__.co_code == my_resolver.__code__.co_code loaded_my_resolver.__code__.co_code == my_resolver.__code__.co_code

View file

@ -15,7 +15,6 @@ from ray.serve.handle import (
from ray.serve.json_serde import ( from ray.serve.json_serde import (
DAGNodeEncoder, DAGNodeEncoder,
dagnode_from_json, dagnode_from_json,
DAGNODE_TYPE_KEY,
) )
from ray.serve.tests.resources.test_modules import ( from ray.serve.tests.resources.test_modules import (
Model, Model,
@ -35,59 +34,6 @@ from ray.serve.deployment_graph_build import (
RayHandleLike = TypeVar("RayHandleLike") RayHandleLike = TypeVar("RayHandleLike")
def _test_execution_class_node_ClassHello(original_dag_node, deserialized_dag_node):
# Creates actor of ClassHello
original_actor = original_dag_node.execute()
deserialized_actor = deserialized_dag_node.execute()
assert ray.get(original_actor.hello.remote()) == ray.get(
deserialized_actor.hello.remote()
)
def _test_execution_class_node_Model(original_dag_node, deserialized_dag_node):
# Creates actor of Model
original_actor = original_dag_node.execute()
deserialized_actor = deserialized_dag_node.execute()
assert ray.get(original_actor.forward.remote(2)) == ray.get(
deserialized_actor.forward.remote(2)
)
def _test_execution_function_node(original_dag_node, deserialized_dag_node):
assert ray.get(deserialized_dag_node.execute()) == ray.get(
original_dag_node.execute()
)
def _test_json_serde_helper(
original_dag_node, executor_fn=None, expected_json_dict=None
):
"""Helpful function to test full round the the following behavior:
1) Ray DAG node can go through full JSON serde cycle
2) Ray DAG node and deserialized DAG node produces same output
3) Ray DAG node can go through multiple rounds of JSON serde and still
provides the same value as if it's only JSON serialized once
"""
json_serialized = json.dumps(original_dag_node, cls=DAGNodeEncoder)
assert json_serialized == json.dumps(expected_json_dict)
deserialized_dag_node = json.loads(json_serialized, object_hook=dagnode_from_json)
executor_fn(original_dag_node, deserialized_dag_node)
# Ensure same node can produce exactly the same results on FunctionNode
# from one or multiple rounds of serialization
multiple_json_serialized = json.dumps(
json.loads(
json.dumps(original_dag_node, cls=DAGNodeEncoder),
object_hook=dagnode_from_json,
),
cls=DAGNodeEncoder,
)
assert multiple_json_serialized == json_serialized
def test_non_json_serializable_args(): def test_non_json_serializable_args():
"""Use non-JSON serializable object in Ray DAG and ensure we throw exception """Use non-JSON serializable object in Ray DAG and ensure we throw exception
with reasonable error messages. with reasonable error messages.
@ -119,48 +65,21 @@ def test_simple_function_node_json_serde(serve_instance):
- Simple function with args + kwargs, all primitive types - Simple function with args + kwargs, all primitive types
""" """
original_dag_node = combine.bind(1, 2) original_dag_node = combine.bind(1, 2)
_test_json_serde_helper( _test_deployment_json_serde_helper(
original_dag_node, original_dag_node,
executor_fn=_test_execution_function_node, expected_num_deployments=1,
expected_json_dict={
DAGNODE_TYPE_KEY: "FunctionNode",
"import_path": "ray.serve.tests.resources.test_modules.combine",
"args": [1, 2],
"kwargs": {},
"options": {},
"other_args_to_resolve": {},
"uuid": original_dag_node.get_stable_uuid(),
},
) )
original_dag_node = combine.bind(1, 2, kwargs_output=3) original_dag_node = combine.bind(1, 2, kwargs_output=3)
_test_json_serde_helper( _test_deployment_json_serde_helper(
original_dag_node, original_dag_node,
executor_fn=_test_execution_function_node, expected_num_deployments=1,
expected_json_dict={
DAGNODE_TYPE_KEY: "FunctionNode",
"import_path": "ray.serve.tests.resources.test_modules.combine",
"args": [1, 2],
"kwargs": {"kwargs_output": 3},
"options": {},
"other_args_to_resolve": {},
"uuid": original_dag_node.get_stable_uuid(),
},
) )
original_dag_node = fn_hello.bind() original_dag_node = fn_hello.bind()
_test_json_serde_helper( _test_deployment_json_serde_helper(
original_dag_node, original_dag_node,
executor_fn=_test_execution_function_node, expected_num_deployments=1,
expected_json_dict={
DAGNODE_TYPE_KEY: "FunctionNode",
"import_path": "ray.serve.tests.resources.test_modules.fn_hello",
"args": [],
"kwargs": {},
"options": {},
"other_args_to_resolve": {},
"uuid": original_dag_node.get_stable_uuid(),
},
) )
@ -178,49 +97,25 @@ def test_simple_class_node_json_serde(serve_instance):
- Simple class with args + kwargs, all primitive types - Simple class with args + kwargs, all primitive types
- Simple chain of class method calls, all primitive types - Simple chain of class method calls, all primitive types
""" """
original_dag_node = ClassHello.bind() hello_actor = ClassHello.bind()
_test_json_serde_helper( original_dag_node = hello_actor.hello.bind()
_test_deployment_json_serde_helper(
original_dag_node, original_dag_node,
executor_fn=_test_execution_class_node_ClassHello, expected_num_deployments=1,
expected_json_dict={
DAGNODE_TYPE_KEY: "ClassNode",
"import_path": "ray.serve.tests.resources.test_modules.ClassHello",
"args": [],
"kwargs": {},
"options": {},
"other_args_to_resolve": {},
"uuid": original_dag_node.get_stable_uuid(),
},
) )
original_dag_node = Model.bind(1) model_actor = Model.bind(1)
_test_json_serde_helper( original_dag_node = model_actor.forward.bind(1)
_test_deployment_json_serde_helper(
original_dag_node, original_dag_node,
executor_fn=_test_execution_class_node_Model, expected_num_deployments=1,
expected_json_dict={
DAGNODE_TYPE_KEY: "ClassNode",
"import_path": "ray.serve.tests.resources.test_modules.Model",
"args": [1],
"kwargs": {},
"options": {},
"other_args_to_resolve": {},
"uuid": original_dag_node.get_stable_uuid(),
},
) )
original_dag_node = Model.bind(1, ratio=0.5) model_actor = Model.bind(1, ratio=0.5)
_test_json_serde_helper( original_dag_node = model_actor.forward.bind(1)
_test_deployment_json_serde_helper(
original_dag_node, original_dag_node,
executor_fn=_test_execution_class_node_Model, expected_num_deployments=1,
expected_json_dict={
DAGNODE_TYPE_KEY: "ClassNode",
"import_path": "ray.serve.tests.resources.test_modules.Model",
"args": [1],
"kwargs": {"ratio": 0.5},
"options": {},
"other_args_to_resolve": {},
"uuid": original_dag_node.get_stable_uuid(),
},
) )
@ -312,7 +207,7 @@ def test_nested_deployment_node_json_serde(serve_instance):
( (
serve_root_dag, serve_root_dag,
deserialized_serve_root_dag_node, deserialized_serve_root_dag_node,
) = _test_deployment_json_serde_helper(ray_dag, input=1, expected_num_deployments=2) ) = _test_deployment_json_serde_helper(ray_dag, input=1, expected_num_deployments=3)
assert ray.get(serve_root_dag.execute(1)) == ray.get( assert ray.get(serve_root_dag.execute(1)) == ray.get(
deserialized_serve_root_dag_node.execute(1) deserialized_serve_root_dag_node.execute(1)
) )

View file

@ -12,7 +12,7 @@ from ray.serve.deployment_graph import InputNode
from ray.serve.drivers import DAGDriver from ray.serve.drivers import DAGDriver
import starlette.requests import starlette.requests
from ray.serve.generate import transform_ray_dag_to_serve_dag from ray.serve.deployment_graph_build import transform_ray_dag_to_serve_dag
NESTED_HANDLE_KEY = "nested_handle" NESTED_HANDLE_KEY = "nested_handle"