[Deployment Graph] Move files out of pipeline folder (#25630)

This commit is contained in:
Jiao 2022-06-10 10:39:03 -07:00 committed by GitHub
parent 2546fbf99d
commit 6b9b1f135b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
25 changed files with 142 additions and 169 deletions

View file

@ -46,9 +46,9 @@ class _PyObjScanner(ray.cloudpickle.CloudPickler):
from ray.experimental.dag.function_node import FunctionNode
from ray.experimental.dag.class_node import ClassNode, ClassMethodNode
from ray.experimental.dag.input_node import InputNode, InputAttributeNode
from ray.serve.pipeline.deployment_node import DeploymentNode
from ray.serve.pipeline.deployment_method_node import DeploymentMethodNode
from ray.serve.pipeline.deployment_function_node import DeploymentFunctionNode
from ray.serve.deployment_node import DeploymentNode
from ray.serve.deployment_method_node import DeploymentMethodNode
from ray.serve.deployment_function_node import DeploymentFunctionNode
from ray.serve.deployment_executor_node import DeploymentExecutorNode
from ray.serve.deployment_method_executor_node import (
DeploymentMethodExecutorNode,

View file

@ -489,20 +489,18 @@ py_test(
deps = [":serve_lib"]
)
pipeline_tests_srcs = glob(["pipeline/tests/**/*.py"])
py_test(
name = "test_deployment_node",
size = "medium",
srcs = pipeline_tests_srcs,
srcs = serve_tests_srcs,
tags = ["exclusive", "team:serve"],
deps = [":serve_lib"],
)
py_test(
name = "test_generate",
name = "test_deployment_graph_build",
size = "medium",
srcs = pipeline_tests_srcs,
srcs = serve_tests_srcs,
tags = ["exclusive", "team:serve"],
deps = [":serve_lib"],
)
@ -518,7 +516,7 @@ py_test(
py_test(
name = "test_json_serde",
size = "medium",
srcs = pipeline_tests_srcs,
srcs = serve_tests_srcs,
tags = ["exclusive", "team:serve"],
deps = [":serve_lib"],
)

View file

@ -50,7 +50,7 @@ from ray.serve.exceptions import RayServeException
from ray.serve.handle import RayServeHandle
from ray.serve.http_util import ASGIHTTPSender, make_fastapi_class_based_view
from ray.serve.logging_utils import LoggingContext
from ray.serve.pipeline.api import (
from ray.serve.deployment_graph_build import (
build as pipeline_build,
get_and_validate_ingress_deployment,
)

View file

@ -44,7 +44,7 @@ class RayServeDAGHandle:
# NOTE: There's nothing user can do about these warnings, we should hide it.
with _mute_sync_handle_warnings():
if self.dag_node is None:
from ray.serve.pipeline.json_serde import dagnode_from_json
from ray.serve.json_serde import dagnode_from_json
self.dag_node = json.loads(
self.dag_node_json, object_hook=dagnode_from_json

View file

@ -2,6 +2,16 @@ import json
from typing import List
from collections import OrderedDict
from ray.serve.deployment import Deployment
from ray.serve.deployment_graph import RayServeDAGHandle
from ray.serve.deployment_method_node import DeploymentMethodNode
from ray.serve.deployment_node import DeploymentNode
from ray.serve.deployment_function_node import DeploymentFunctionNode
from ray.serve.deployment_executor_node import DeploymentExecutorNode
from ray.serve.deployment_method_executor_node import DeploymentMethodExecutorNode
from ray.serve.deployment_function_executor_node import DeploymentFunctionExecutorNode
from ray.serve.json_serde import DAGNodeEncoder
from ray.experimental.dag import (
DAGNode,
ClassNode,
@ -11,15 +21,106 @@ from ray.experimental.dag import (
from ray.experimental.dag.function_node import FunctionNode
from ray.experimental.dag.input_node import InputNode
from ray.experimental.dag.utils import DAGNodeNameGenerator
from ray.serve.deployment import Deployment
from ray.serve.deployment_graph import RayServeDAGHandle
from ray.serve.pipeline.deployment_method_node import DeploymentMethodNode
from ray.serve.pipeline.deployment_node import DeploymentNode
from ray.serve.pipeline.deployment_function_node import DeploymentFunctionNode
from ray.serve.deployment_executor_node import DeploymentExecutorNode
from ray.serve.deployment_method_executor_node import DeploymentMethodExecutorNode
from ray.serve.deployment_function_executor_node import DeploymentFunctionExecutorNode
from ray.serve.pipeline.json_serde import DAGNodeEncoder
def build(ray_dag_root_node: DAGNode) -> List[Deployment]:
    """Do all the DAG transformation, extraction and generation needed to
    produce a runnable and deployable serve pipeline application from a valid
    DAG authored with the Ray DAG API.

    This should be the only user facing API that the user interacts with.

    Assumptions:
        The following enforcements are only applied at generating and applying
        the pipeline artifact, but are not blockers for local development and
        testing.

        - ALL args and kwargs used in DAG building should be JSON serializable.
          This means in order to ensure your pipeline application can run on
          a remote cluster potentially with a different runtime environment,
          among all options listed:
              1) binding in-memory objects
              2) relying on pickling
              3) enforcing JSON serializability on all args used
          we believe both 1) and 2) rely on unstable in-memory objects or
          cross-version pickling / closure capture, where JSON serialization
          provides the right contract needed for proper deployment.

        - ALL classes and methods used should be visible at the top of the file
          and importable via a fully qualified name. Thus no inline class or
          function definitions should be used.

    Args:
        ray_dag_root_node: DAGNode acting as the root of a Ray authored DAG.
            It should be executable via `ray_dag_root_node.execute(user_input)`
            and should have an `InputNode` in it.

    Returns:
        deployments: All deployments needed for an e2e runnable serve pipeline,
            accessible via a python .remote() call.

    Examples:
        >>> with InputNode() as dag_input:
        ...     m1 = Model.bind(1)
        ...     m2 = Model.bind(2)
        ...     m1_output = m1.forward.bind(dag_input[0])
        ...     m2_output = m2.forward.bind(dag_input[1])
        ...     ray_dag = ensemble.bind(m1_output, m2_output)

        Assuming we have non-JSON serializable or inline defined classes or
        functions in local pipeline development:

        >>> from ray.serve.api import build as build_app
        >>> deployments = build_app(ray_dag) # it can be method node
        >>> deployments = build_app(m1) # or just a regular node.
    """
    # The name generator is a context manager so generated deployment names
    # are unique within this single build call.
    with DAGNodeNameGenerator() as node_name_generator:
        serve_root_dag = ray_dag_root_node.apply_recursive(
            lambda node: transform_ray_dag_to_serve_dag(node, node_name_generator)
        )
    deployments = extract_deployments_from_serve_dag(serve_root_dag)
    # After Ray DAG is transformed to Serve DAG with deployments and their init
    # args filled, generate a minimal weight executor serve dag for perf
    serve_executor_root_dag = serve_root_dag.apply_recursive(
        transform_serve_dag_to_serve_executor_dag
    )
    # By construction the driver deployment is the last element extracted
    # from the serve DAG.
    root_driver_deployment = deployments[-1]
    new_driver_deployment = generate_executor_dag_driver_deployment(
        serve_executor_root_dag, root_driver_deployment
    )
    # Replace DAGDriver deployment with executor DAGDriver deployment
    deployments[-1] = new_driver_deployment
    # Validate and only expose HTTP for the endpoint
    deployments_with_http = process_ingress_deployment_in_serve_dag(deployments)
    return deployments_with_http
def get_and_validate_ingress_deployment(
    deployments: List[Deployment],
) -> Deployment:
    """Validate HTTP route prefixes for a list of deployments in a pipeline.

    Ensures:
        1) There is one and only one ingress deployment with a non-None
           route prefix.
        2) All other, non-ingress deployments have a route prefix of None.

    Args:
        deployments: Deployments making up the Serve application or DAG. Only
            the attribute ``route_prefix`` of each element is inspected.

    Returns:
        The single ingress deployment (the one with a non-None route prefix).

    Raises:
        ValueError: If zero, or more than one, deployment has a non-None
            route prefix.
    """
    # An "ingress" deployment is one exposed over HTTP, i.e. with a route
    # prefix configured.
    ingress_deployments = [
        deployment
        for deployment in deployments
        if deployment.route_prefix is not None
    ]
    if len(ingress_deployments) != 1:
        # Bug fix: error message previously read "an Serve Application".
        raise ValueError(
            "Only one deployment in a Serve Application or DAG can have "
            f"non-None route prefix. {len(ingress_deployments)} ingress "
            f"deployments found: {ingress_deployments}"
        )
    return ingress_deployments[0]
def transform_ray_dag_to_serve_dag(

View file

@ -11,8 +11,8 @@ from ray.serve.deployment_method_executor_node import (
)
from ray.serve.handle import RayServeLazySyncHandle
from ray.serve.pipeline.deployment_method_node import DeploymentMethodNode
from ray.serve.pipeline.deployment_function_node import DeploymentFunctionNode
from ray.serve.deployment_method_node import DeploymentMethodNode
from ray.serve.deployment_function_node import DeploymentFunctionNode
from ray.experimental.dag.constants import PARENT_CLASS_NODE_KEY
from ray.experimental.dag.format_utils import get_dag_node_str
from ray.serve.deployment import Deployment, schema_to_deployment

View file

@ -1,112 +0,0 @@
from typing import List
from ray.experimental.dag.dag_node import DAGNode
from ray.serve.pipeline.generate import (
transform_ray_dag_to_serve_dag,
extract_deployments_from_serve_dag,
transform_serve_dag_to_serve_executor_dag,
process_ingress_deployment_in_serve_dag,
generate_executor_dag_driver_deployment,
)
from ray.serve.deployment import Deployment
from ray.experimental.dag.utils import DAGNodeNameGenerator
def build(ray_dag_root_node: DAGNode) -> List[Deployment]:
    """Do all the DAG transformation, extraction and generation needed to
    produce a runnable and deployable serve pipeline application from a valid
    DAG authored with the Ray DAG API.

    This should be the only user facing API that the user interacts with.

    Assumptions:
        The following enforcements are only applied at generating and applying
        the pipeline artifact, but are not blockers for local development and
        testing.

        - ALL args and kwargs used in DAG building should be JSON serializable.
          This means in order to ensure your pipeline application can run on
          a remote cluster potentially with a different runtime environment,
          among all options listed:
              1) binding in-memory objects
              2) relying on pickling
              3) enforcing JSON serializability on all args used
          we believe both 1) and 2) rely on unstable in-memory objects or
          cross-version pickling / closure capture, where JSON serialization
          provides the right contract needed for proper deployment.

        - ALL classes and methods used should be visible at the top of the file
          and importable via a fully qualified name. Thus no inline class or
          function definitions should be used.

    Args:
        ray_dag_root_node: DAGNode acting as the root of a Ray authored DAG.
            It should be executable via `ray_dag_root_node.execute(user_input)`
            and should have an `InputNode` in it.

    Returns:
        deployments: All deployments needed for an e2e runnable serve pipeline,
            accessible via a python .remote() call.

    Examples:
        >>> with InputNode() as dag_input:
        ...     m1 = Model.bind(1)
        ...     m2 = Model.bind(2)
        ...     m1_output = m1.forward.bind(dag_input[0])
        ...     m2_output = m2.forward.bind(dag_input[1])
        ...     ray_dag = ensemble.bind(m1_output, m2_output)

        Assuming we have non-JSON serializable or inline defined classes or
        functions in local pipeline development:

        >>> from ray.serve.api import build as build_app
        >>> deployments = build_app(ray_dag) # it can be method node
        >>> deployments = build_app(m1) # or just a regular node.
    """
    # The name generator is a context manager so generated deployment names
    # are unique within this single build call.
    with DAGNodeNameGenerator() as node_name_generator:
        serve_root_dag = ray_dag_root_node.apply_recursive(
            lambda node: transform_ray_dag_to_serve_dag(node, node_name_generator)
        )
    deployments = extract_deployments_from_serve_dag(serve_root_dag)
    # After Ray DAG is transformed to Serve DAG with deployments and their init
    # args filled, generate a minimal weight executor serve dag for perf
    serve_executor_root_dag = serve_root_dag.apply_recursive(
        transform_serve_dag_to_serve_executor_dag
    )
    # By construction the driver deployment is the last element extracted
    # from the serve DAG.
    root_driver_deployment = deployments[-1]
    new_driver_deployment = generate_executor_dag_driver_deployment(
        serve_executor_root_dag, root_driver_deployment
    )
    # Replace DAGDriver deployment with executor DAGDriver deployment
    deployments[-1] = new_driver_deployment
    # Validate and only expose HTTP for the endpoint
    deployments_with_http = process_ingress_deployment_in_serve_dag(deployments)
    return deployments_with_http
def get_and_validate_ingress_deployment(
    deployments: List[Deployment],
) -> Deployment:
    """Validate HTTP route prefixes for a list of deployments in a pipeline.

    Ensures:
        1) There is one and only one ingress deployment with a non-None
           route prefix.
        2) All other, non-ingress deployments have a route prefix of None.

    Args:
        deployments: Deployments making up the Serve application or DAG. Only
            the attribute ``route_prefix`` of each element is inspected.

    Returns:
        The single ingress deployment (the one with a non-None route prefix).

    Raises:
        ValueError: If zero, or more than one, deployment has a non-None
            route prefix.
    """
    # An "ingress" deployment is one exposed over HTTP, i.e. with a route
    # prefix configured.
    ingress_deployments = [
        deployment
        for deployment in deployments
        if deployment.route_prefix is not None
    ]
    if len(ingress_deployments) != 1:
        # Bug fix: error message previously read "an Serve Application".
        raise ValueError(
            "Only one deployment in a Serve Application or DAG can have "
            f"non-None route prefix. {len(ingress_deployments)} ingress "
            f"deployments found: {ingress_deployments}"
        )
    return ingress_deployments[0]

View file

@ -1,9 +0,0 @@
import pytest
import ray
from ray.serve.tests.conftest import _shared_serve_instance, serve_instance # noqa
@pytest.fixture(scope="session")
def shared_ray_instance():
    """Session-scoped Ray instance shared by every test in this suite."""
    # 36 CPUs so many deployment replicas can be scheduled concurrently;
    # a short task retry delay speeds up tests that exercise failure paths.
    yield ray.init(num_cpus=36, _system_config={"task_retry_delay_ms": 50})

View file

@ -1,5 +0,0 @@
# Reserved constant used as a key in `other_args_to_resolve` to configure
# whether a sync or an async handle of a deployment is returned:
#   True  -> RayServeSyncHandle
#   False -> RayServeHandle
USE_SYNC_HANDLE_KEY = "__use_sync_handle__"

View file

@ -1,4 +1,4 @@
from ray.serve.pipeline.tests.resources.test_modules import (
from ray.serve.tests.resources.test_modules import (
Model,
Combine,
combine,

View file

@ -4,18 +4,18 @@ import ray
from ray import serve
from ray.experimental.dag import InputNode
from ray.serve.handle import RayServeLazySyncHandle
from ray.serve.pipeline.generate import (
from ray.serve.deployment_graph_build import (
transform_ray_dag_to_serve_dag,
extract_deployments_from_serve_dag,
transform_serve_dag_to_serve_executor_dag,
get_pipeline_input_node,
)
from ray.serve.pipeline.tests.resources.test_modules import (
from ray.serve.tests.resources.test_modules import (
Model,
NESTED_HANDLE_KEY,
combine,
)
from ray.serve.pipeline.tests.resources.test_dags import (
from ray.serve.tests.resources.test_dags import (
get_simple_class_with_class_method_dag,
get_func_class_with_class_method_dag,
get_multi_instantiation_class_deployment_in_init_args_dag,

View file

@ -2,7 +2,7 @@ import pytest
import ray
from ray import serve
from ray.serve.pipeline.deployment_node import (
from ray.serve.deployment_node import (
DeploymentNode,
)

View file

@ -12,12 +12,12 @@ from ray.serve.handle import (
serve_handle_to_json_dict,
serve_handle_from_json_dict,
)
from ray.serve.pipeline.json_serde import (
from ray.serve.json_serde import (
DAGNodeEncoder,
dagnode_from_json,
DAGNODE_TYPE_KEY,
)
from ray.serve.pipeline.tests.resources.test_modules import (
from ray.serve.tests.resources.test_modules import (
Model,
combine,
Counter,
@ -26,7 +26,7 @@ from ray.serve.pipeline.tests.resources.test_modules import (
Combine,
NESTED_HANDLE_KEY,
)
from ray.serve.pipeline.generate import (
from ray.serve.deployment_graph_build import (
transform_ray_dag_to_serve_dag,
extract_deployments_from_serve_dag,
transform_serve_dag_to_serve_executor_dag,
@ -124,7 +124,7 @@ def test_simple_function_node_json_serde(serve_instance):
executor_fn=_test_execution_function_node,
expected_json_dict={
DAGNODE_TYPE_KEY: "FunctionNode",
"import_path": "ray.serve.pipeline.tests.resources.test_modules.combine",
"import_path": "ray.serve.tests.resources.test_modules.combine",
"args": [1, 2],
"kwargs": {},
"options": {},
@ -139,7 +139,7 @@ def test_simple_function_node_json_serde(serve_instance):
executor_fn=_test_execution_function_node,
expected_json_dict={
DAGNODE_TYPE_KEY: "FunctionNode",
"import_path": "ray.serve.pipeline.tests.resources.test_modules.combine",
"import_path": "ray.serve.tests.resources.test_modules.combine",
"args": [1, 2],
"kwargs": {"kwargs_output": 3},
"options": {},
@ -154,7 +154,7 @@ def test_simple_function_node_json_serde(serve_instance):
executor_fn=_test_execution_function_node,
expected_json_dict={
DAGNODE_TYPE_KEY: "FunctionNode",
"import_path": "ray.serve.pipeline.tests.resources.test_modules.fn_hello",
"import_path": "ray.serve.tests.resources.test_modules.fn_hello",
"args": [],
"kwargs": {},
"options": {},
@ -184,7 +184,7 @@ def test_simple_class_node_json_serde(serve_instance):
executor_fn=_test_execution_class_node_ClassHello,
expected_json_dict={
DAGNODE_TYPE_KEY: "ClassNode",
"import_path": "ray.serve.pipeline.tests.resources.test_modules.ClassHello",
"import_path": "ray.serve.tests.resources.test_modules.ClassHello",
"args": [],
"kwargs": {},
"options": {},
@ -199,7 +199,7 @@ def test_simple_class_node_json_serde(serve_instance):
executor_fn=_test_execution_class_node_Model,
expected_json_dict={
DAGNODE_TYPE_KEY: "ClassNode",
"import_path": "ray.serve.pipeline.tests.resources.test_modules.Model",
"import_path": "ray.serve.tests.resources.test_modules.Model",
"args": [1],
"kwargs": {},
"options": {},
@ -214,7 +214,7 @@ def test_simple_class_node_json_serde(serve_instance):
executor_fn=_test_execution_class_node_Model,
expected_json_dict={
DAGNODE_TYPE_KEY: "ClassNode",
"import_path": "ray.serve.pipeline.tests.resources.test_modules.Model",
"import_path": "ray.serve.tests.resources.test_modules.Model",
"args": [1],
"kwargs": {"ratio": 0.5},
"options": {},

View file

@ -14,7 +14,7 @@ from ray.serve.model_wrappers import (
)
from ray.air.checkpoint import Checkpoint
from ray.air.predictor import DataBatchType, Predictor
from ray.serve.pipeline.api import build
from ray.serve.deployment_graph_build import build
from ray.serve.dag import InputNode
from ray.serve.deployment_graph import RayServeDAGHandle
from ray.serve.http_adapters import json_to_ndarray

View file

@ -11,7 +11,7 @@ from ray import serve
from ray.serve.application import Application
from ray.serve.api import build as build_app
from ray.serve.deployment_graph import RayServeDAGHandle
from ray.serve.pipeline.api import build as pipeline_build
from ray.serve.deployment_graph_build import build as pipeline_build
from ray.serve.deployment_graph import ClassNode, InputNode
from ray.serve.drivers import DAGDriver
import starlette.requests

View file

@ -12,7 +12,7 @@ from ray.serve.deployment_graph import InputNode
from ray.serve.drivers import DAGDriver
import starlette.requests
from ray.serve.pipeline.generate import transform_ray_dag_to_serve_dag
from ray.serve.generate import transform_ray_dag_to_serve_dag
NESTED_HANDLE_KEY = "nested_handle"

View file

@ -6,7 +6,7 @@ from typing import Any
from ray import serve
from ray.serve.deployment_graph import RayServeDAGHandle
from ray.serve.dag import InputNode
from ray.serve.pipeline.api import build as pipeline_build
from ray.serve.deployment_graph_build import build as pipeline_build
@serve.deployment(name="counter", num_replicas=2, user_config={"count": 123, "b": 2})

View file

@ -2,7 +2,7 @@ import sys
import pytest
from ray.serve.dag import InputNode
from ray.serve.pipeline.api import build as pipeline_build
from ray.serve.deployment_graph_build import build as pipeline_build
import ray
from ray import serve

View file

@ -1,8 +1,8 @@
import pytest
import sys
from ray.serve.pipeline.api import get_and_validate_ingress_deployment
from ray.serve.pipeline.api import build as pipeline_build
from ray.serve.deployment_graph_build import get_and_validate_ingress_deployment
from ray.serve.deployment_graph_build import build as pipeline_build
from ray.serve.dag import InputNode
from ray import serve