Add prototype of ray.serve.pipeline (#19278)

This commit is contained in:
Edward Oakes 2021-10-19 13:36:49 -05:00 committed by GitHub
parent a6f9c93db0
commit 4645893a5f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 693 additions and 0 deletions

View file

@ -354,3 +354,29 @@ py_test(
tags = ["exclusive", "post_wheel_build", "team:serve"],
deps = [":serve_lib"]
)
# All Python files under pipeline/tests, collected once and reused as the
# `srcs` of every pipeline test target below.
pipeline_tests_srcs = glob(["pipeline/tests/**/*.py"])
# Tests for the step decorator and step argument validation.
# NOTE(review): no `main` is set, so the entry point is presumably derived
# from `name` (test_step.py) -- confirm against the py_test rule in use.
py_test(
name = "test_step",
size = "small",
srcs = pipeline_tests_srcs,
tags = ["exclusive", "team:serve"],
deps = [":serve_lib"],
)
# End-to-end pipeline tests; same sources, tags and deps as test_step.
py_test(
name = "test_pipeline",
size = "small",
srcs = pipeline_tests_srcs,
tags = ["exclusive", "team:serve"],
deps = [":serve_lib"],
)
# Tests specific to the actor-based executor; same sources, tags and deps.
py_test(
name = "test_actor_executor",
size = "small",
srcs = pipeline_tests_srcs,
tags = ["exclusive", "team:serve"],
deps = [":serve_lib"],
)

View file

@ -0,0 +1,3 @@
from ray.serve.pipeline.common import ExecutionMode # noqa:F401
from ray.serve.pipeline.node import INPUT # noqa:F401
from ray.serve.pipeline.step import step # noqa:F401

View file

@ -0,0 +1,27 @@
from enum import Enum
from pydantic import BaseModel, PositiveInt
class ExecutionMode(Enum):
    """Selects where the code of a pipeline step is run."""

    # Run the step inline, inside the calling process.
    LOCAL = 1
    # Run the step in Ray tasks.
    TASKS = 2
    # Run the step in Ray actors.
    ACTORS = 3
def str_to_execution_mode(s: str) -> ExecutionMode:
    """Convert a string into the ExecutionMode member with that name.

    Matching is done on the member name and ignores case, so "local",
    "LOCAL" and "Local" all map to ExecutionMode.LOCAL.

    Args:
        s: Name of the execution mode.

    Returns:
        The ExecutionMode member whose name matches ``s``.

    Raises:
        ValueError: If ``s`` is not a string or does not name a member of
            ExecutionMode.
    """
    # Non-string input can never match; route it to the ValueError below
    # instead of failing on the missing .upper() method.
    name = s.upper() if isinstance(s, str) else None
    if name in ExecutionMode.__members__:
        return ExecutionMode[name]

    valid_strs = [mode.name for mode in ExecutionMode]
    raise ValueError(f"Unknown ExecutionMode str: '{s}'. "
                     f"Valid options are: {valid_strs}.")
class StepConfig(BaseModel):
    """Configuration carried by a pipeline step (validated by pydantic)."""

    # How the step's code is run; see ExecutionMode.
    execution_mode: ExecutionMode
    # Number of replicas for the step; PositiveInt only admits values > 0.
    num_replicas: PositiveInt

View file

@ -0,0 +1,14 @@
import pytest
import ray
from ray.serve.pipeline.test_utils import LOCAL_EXECUTION_ONLY
@pytest.fixture(scope="session")
def shared_ray_instance():
    """Provide one Ray instance shared by every test in the session.

    Yields:
        None when LOCAL_EXECUTION_ONLY is set (Ray is never started),
        otherwise the value returned by ray.init().

    When Ray was started here it is shut down again once the session ends,
    so the instance does not outlive the tests that requested it.
    """
    if LOCAL_EXECUTION_ONLY:
        # Don't ray.init() if only testing local execution.
        yield
        return

    # Overriding task_retry_delay_ms to relaunch actors more quickly.
    ray_context = ray.init(
        num_cpus=36, _system_config={"task_retry_delay_ms": 50})
    yield ray_context
    # Teardown: runs after the last test that used the fixture.
    ray.shutdown()

View file

@ -0,0 +1,101 @@
from abc import ABC
import random
from typing import Any, Callable, List, Tuple, Union
import ray
from ray import cloudpickle, ObjectRef
from ray.actor import ActorHandle
from ray.remote_function import RemoteFunction
from ray.serve.pipeline.common import ExecutionMode, StepConfig
class Executor(ABC):
    """Interface shared by the objects that run a step's callable.

    The base class keeps no state and every method is a no-op placeholder;
    LocalExecutor, TasksExecutor and ActorsExecutor supply the behavior.
    """

    def __init__(self, serialized_callable_factory: bytes, config: StepConfig):
        """Accept the pickled callable factory and the step's config."""

    def call(self, input_arg: Tuple[Any]) -> Union[Any, ObjectRef]:
        """Run the step on its input; the base version returns None."""

    async def call_async(self, input_arg: Tuple[Any]) -> Union[Any, ObjectRef]:
        """Async counterpart of call(); the base version returns None."""
class LocalExecutor(Executor):
    """Executor that runs code in-process."""

    def __init__(self, serialized_callable_factory: bytes, config: StepConfig):
        """Unpickle the factory and build the callable immediately.

        ``config`` is accepted for interface parity and is not used here.
        """
        factory = cloudpickle.loads(serialized_callable_factory)
        self._callable: Callable = factory()

    def call(self, *args: Tuple[Any]) -> Any:
        """Invoke the callable in this process and return its result.

        Any argument that is an ObjectRef is resolved with ray.get() first,
        so the callable only ever receives concrete values.
        """
        resolved = []
        for value in args:
            if isinstance(value, ObjectRef):
                value = ray.get(value)
            resolved.append(value)
        return self._callable(*resolved)

    async def call_async(self, input_arg: Tuple[Any]) -> Any:
        """Not implemented yet; always raises NotImplementedError."""
        raise NotImplementedError("No async support yet.")
class TasksExecutor(Executor):
    """Executor that wraps code in Ray tasks."""

    def __init__(self, serialized_callable_factory: bytes, config: StepConfig):
        """Define the Ray remote function that runs the step.

        ``config`` is accepted for interface parity and is not used here.
        """

        @ray.remote
        def callable_wrapper(*args):
            # Every task invocation unpickles the factory and builds a new
            # callable before calling it with the task's arguments.
            return cloudpickle.loads(serialized_callable_factory)()(*args)

        self._remote_function: RemoteFunction = callable_wrapper

    def call(self, *args: Tuple[Any]) -> ObjectRef:
        """Submit one task for the given arguments and return its ObjectRef."""
        return self._remote_function.remote(*args)

    async def call_async(self, *args: Tuple[Any]) -> ObjectRef:
        """Not implemented yet; always raises NotImplementedError.

        Accepts the same arguments as call() so that invoking it through the
        Executor interface reports the missing feature rather than failing
        with a TypeError about unexpected arguments.
        """
        raise NotImplementedError("No async support yet.")
@ray.remote
class ExecutorActor:
    """Actor that executes the serialized code for a given step."""

    def __init__(self, serialize_callable_factory: bytes):
        # Unpickle the factory inside the actor, then call it once so the
        # resulting callable lives for the actor's lifetime.
        factory = cloudpickle.loads(serialize_callable_factory)
        self._callable = factory()

    def call(self, *args, **kwargs):
        """Call the underlying callable."""
        result = self._callable(*args, **kwargs)
        return result

    def ready(self):
        """Used to wait for the actor to become available."""
        return None
class ActorsExecutor(Executor):
    """Executor that wraps a pool of Ray actors."""

    def __init__(self, serialized_callable_factory: bytes, config: StepConfig):
        """Start ``config.num_replicas`` actors and wait until all respond."""
        self._actors: List[ActorHandle] = [
            ExecutorActor.remote(serialized_callable_factory)
            for _ in range(config.num_replicas)
        ]
        # Wait for the actors to be ready.
        ray.get([actor.ready.remote() for actor in self._actors])

    def call(self, *args: Tuple[Any]) -> ObjectRef:
        """Send the call to one randomly chosen actor; return its ObjectRef."""
        return random.choice(self._actors).call.remote(*args)

    async def call_async(self, *args: Tuple[Any]) -> ObjectRef:
        """Not implemented yet; always raises NotImplementedError.

        Accepts the same arguments as call() so that invoking it through the
        Executor interface reports the missing feature rather than failing
        with a TypeError about unexpected arguments.
        """
        raise NotImplementedError("No async support yet.")
def create_executor_from_step_config(serialized_callable_factory: bytes,
                                     config: StepConfig) -> Executor:
    """Build the executor that matches ``config.execution_mode``.

    Args:
        serialized_callable_factory: Pickled factory, passed through to the
            executor's constructor.
        config: Step configuration; its execution_mode picks the executor
            class and the whole config is forwarded to the constructor.

    Returns:
        A LocalExecutor, TasksExecutor or ActorsExecutor.

    Raises:
        AssertionError: If the execution mode is none of the known members.
    """
    if config.execution_mode == ExecutionMode.LOCAL:
        executor_cls = LocalExecutor
    elif config.execution_mode == ExecutionMode.TASKS:
        executor_cls = TasksExecutor
    elif config.execution_mode == ExecutionMode.ACTORS:
        executor_cls = ActorsExecutor
    else:
        # Raised explicitly: an `assert False` statement is removed when
        # Python runs with -O, which would make this branch return None.
        raise AssertionError(
            f"Unknown execution mode: {config.execution_mode}")

    return executor_cls(serialized_callable_factory, config)

View file

@ -0,0 +1,99 @@
from abc import ABC
from typing import Any, Callable, Tuple
import ray
from ray import cloudpickle, ObjectRef
from ray.serve.pipeline.common import StepConfig
from ray.serve.pipeline.executor import (create_executor_from_step_config,
Executor)
class PipelineNode(ABC):
    """Base type for the nodes that make up a pipeline graph.

    All methods are no-op placeholders that return None; subclasses provide
    the real behavior.
    """

    def deploy(self):
        """Prepare the node for execution."""

    def call(self, input_arg: Tuple[Any]) -> Any:
        """Evaluate the node for the given pipeline input."""

    async def call_async(self, input_arg: Tuple[Any]) -> Any:
        """Async counterpart of call()."""
class Pipeline:
    """A deployed pipeline that can be called by the user."""

    def __init__(self, entry_node: PipelineNode):
        # Node whose call() produces the pipeline's final output.
        self._entry_node = entry_node

    def deploy(self):
        """Always raises: a Pipeline is already the result of deploying."""
        raise RuntimeError("A pipeline can only be deployed once.")

    def call(self, input_arg: Tuple[Any]) -> Any:
        """Run the pipeline on ``input_arg`` and return a concrete value.

        If the entry node hands back an ObjectRef, it is resolved with
        ray.get() before returning.
        """
        output = self._entry_node.call(input_arg)
        return ray.get(output) if isinstance(output, ObjectRef) else output

    async def call_async(self, input_arg: Tuple[Any]) -> Any:
        """Not implemented yet; always raises NotImplementedError."""
        raise NotImplementedError("No async support yet.")
class ExecutorPipelineNode(PipelineNode):
    """Result of constructing a pipeline from user-defined steps.

    Call .deploy() on this to instantiate the pipeline.
    """

    def __init__(self, callable_factory: Callable[[], Callable],
                 config: StepConfig, incoming_edges: Tuple[PipelineNode]):
        """Record the step's code, config and upstream nodes.

        Args:
            callable_factory: Zero-argument factory returning the callable
                this node runs; it is pickled immediately.
            config: Step configuration used to pick the executor on deploy.
            incoming_edges: Upstream nodes whose outputs become this node's
                arguments. Must not be empty.
        """
        # Serialize to make this class environment-independent.
        self._serialized_callable_factory: bytes = cloudpickle.dumps(
            callable_factory)
        self._config: StepConfig = config
        self._incoming_edges: Tuple[PipelineNode] = incoming_edges
        # Populated in .deploy().
        self._executor: Executor = None

        assert len(self._incoming_edges) > 0

    def deploy(self) -> Pipeline:
        """Instantiates executors for this and all dependent nodes.

        After the pipeline is deployed, .call() and .call_async() can be used.
        """
        # Upstream nodes are deployed first, then this node's own executor.
        for node in self._incoming_edges:
            node.deploy()
        self._executor = create_executor_from_step_config(
            self._serialized_callable_factory, self._config)

        return Pipeline(self)

    def call(self, input_arg: Tuple[Any]) -> Any:
        """Evaluate upstream nodes, then run this node's executor on them.

        Raises:
            RuntimeError: If .deploy() has not been called on this node.
        """
        if self._executor is None:
            raise RuntimeError(
                "Pipeline hasn't been deployed, call .deploy() first.")
        args = tuple(node.call(input_arg) for node in self._incoming_edges)
        return self._executor.call(*args)

    async def call_async(self, input_arg: Tuple[Any] = None) -> Any:
        """Not implemented yet.

        ``input_arg`` is accepted (and optional) so the signature agrees with
        PipelineNode.call_async while zero-argument calls keep working.

        Raises:
            RuntimeError: If .deploy() has not been called on this node.
            NotImplementedError: Otherwise.
        """
        if self._executor is None:
            raise RuntimeError(
                "Pipeline hasn't been deployed, call .deploy() first.")
        raise NotImplementedError("No async support yet.")
class InputPipelineNode(PipelineNode):
    """Leaf node that stands for the value handed to the pipeline."""

    def deploy(self) -> PipelineNode:
        """Nothing to set up for the input node; returns None."""
        return None

    def call(self, input_arg: Tuple[Any]) -> Any:
        """Return the pipeline input unchanged."""
        passthrough = input_arg
        return passthrough

    async def call_async(self, input_arg: Tuple[Any]) -> Any:
        """Return the pipeline input unchanged (async variant)."""
        passthrough = input_arg
        return passthrough


# Special node that's used to designate the input of a pipeline.
INPUT = InputPipelineNode()

View file

@ -0,0 +1,133 @@
from types import FunctionType
from typing import Callable, Optional, Union
from ray.serve.pipeline.node import ExecutorPipelineNode, INPUT, PipelineNode
from ray.serve.pipeline.common import (ExecutionMode, str_to_execution_mode,
StepConfig)
def _validate_step_args(*args, **kwargs):
    """Check the arguments a step was called with.

    Only pipeline nodes (the outputs of other steps, or INPUT) are accepted
    as positional arguments; keyword arguments are rejected outright.

    Raises:
        NotImplementedError: If any keyword argument is given.
        TypeError: If an argument is a PipelineStep that was not called, or
            is not a PipelineNode at all.
        ValueError: If INPUT is passed together with other arguments.
    """
    if kwargs:
        raise NotImplementedError("No kwargs support yet!")

    has_several_args = len(args) > 1

    # Check arguments for incoming edges.
    for candidate in args:
        if isinstance(candidate, PipelineStep):
            raise TypeError("PipelineSteps cannot be passed in directly, "
                            "you need to call them with an input first. For "
                            "example: instead of `my_step_2(my_step_1)`, try "
                            "`my_step_2(my_step_1(pipeline.INPUT))`.")

        if candidate is INPUT:
            if has_several_args:
                raise ValueError(
                    "INPUT steps can only take a single argument.")
            continue

        if not isinstance(candidate, PipelineNode):
            raise TypeError(
                "Only PipelineNodes supported as arguments, "
                f"got {type(candidate)}")
class PipelineStep:
    """Common base of the objects produced by the step decorator."""

    def __init__(self, config: StepConfig):
        # Configuration shared with every node created from this step.
        self._config = config

    @property
    def num_replicas(self) -> int:
        """Replica count taken from the step's config."""
        replicas = self._config.num_replicas
        return replicas

    def options(self, *args, **kwargs):
        """Not implemented yet; always raises NotImplementedError."""
        raise NotImplementedError(".options() not supported yet.")
class CallablePipelineStep(PipelineStep):
    """A step that can be wired into a pipeline.

    Holds a factory for the step's callable (a function, or a class plus its
    constructor arguments). Calling the step with other pipeline nodes, or
    with INPUT, yields the node that represents it in the graph.
    """

    def __init__(self, callable_factory: Callable[[], Callable],
                 config: StepConfig):
        super().__init__(config)
        assert callable(callable_factory)
        # Kept unevaluated; the node pickles it and executors invoke it.
        self._callable_factory = callable_factory

    def options(self, *args, **kwargs):
        """Not implemented yet; always raises NotImplementedError."""
        raise NotImplementedError("No options yet!")

    def __call__(self, *args, **kwargs):
        """Validate the upstream arguments and build the pipeline node."""
        _validate_step_args(*args, **kwargs)
        node = ExecutorPipelineNode(
            self._callable_factory, self._config, incoming_edges=args)
        return node
class UninstantiatedClassPipelineStep(PipelineStep):
    """A class-based step whose constructor arguments are not yet known.

    Calling it with constructor args & kwargs returns a CallablePipelineStep,
    which is what can then be used inside a pipeline.
    """

    def __init__(self, _class: Callable, config: StepConfig):
        super().__init__(config)
        self._class = _class

    def options(self, *args, **kwargs):
        """Not implemented yet; always raises NotImplementedError."""
        raise NotImplementedError("No options yet!")

    def __call__(self, *args, **kwargs):
        """Bind constructor arguments without running the constructor."""

        def build_instance():
            # Deferred: the class is only instantiated when this is invoked.
            return self._class(*args, **kwargs)

        return CallablePipelineStep(build_instance, self._config)
def step(_func_or_class: Optional[Callable] = None,
         execution_mode: Union[ExecutionMode, str] = ExecutionMode.LOCAL,
         num_replicas: int = 1) -> Callable[[Callable], PipelineStep]:
    """Decorator used to define a pipeline step.

    Works both bare (``@pipeline.step``) and with arguments
    (``@pipeline.step(num_replicas=2)``).

    Args:
        execution_mode (ExecutionMode or str): The execution mode for this
            step. Supported modes:
                - ExecutionMode.LOCAL (default): executes this step inline
                  in the calling process.
                - ExecutionMode.TASKS: executes this step in Ray tasks.
                - ExecutionMode.ACTORS: executes this step in Ray actors.
        num_replicas (int): The number of Ray actors to start that will run
            this step (defaults to 1). Only used by ExecutionMode.ACTORS.

    Example:
        >>> @pipeline.step(num_replicas=10)
        ... def my_step(*args):
        ...     pass

    Returns:
        A PipelineStep when applied directly to a function or class,
        otherwise a decorator that produces one.

    Raises:
        TypeError: If execution_mode is neither an ExecutionMode nor a str.
    """
    if isinstance(execution_mode, str):
        resolved_mode = str_to_execution_mode(execution_mode)
    elif isinstance(execution_mode, ExecutionMode):
        resolved_mode = execution_mode
    else:
        raise TypeError("execution_mode must be an ExecutionMode or str.")

    config = StepConfig(
        execution_mode=resolved_mode, num_replicas=num_replicas)

    def decorator(target):
        # Plain functions are usable as-is; anything else is treated as a
        # class that still needs its constructor arguments.
        if isinstance(target, FunctionType):
            return CallablePipelineStep(lambda: target, config)
        return UninstantiatedClassPipelineStep(target, config)

    # This handles both parametrized and non-parametrized usage of the
    # decorator. See the @serve.batch code for more details.
    if callable(_func_or_class):
        return decorator(_func_or_class)
    return decorator

View file

@ -0,0 +1,28 @@
import os
from functools import wraps
import pytest
from ray.serve.pipeline import ExecutionMode
# True when the LOCAL_EXECUTION_ONLY environment variable is set to anything
# other than "0"; an unset variable counts as disabled.
LOCAL_EXECUTION_ONLY = os.environ.get("LOCAL_EXECUTION_ONLY", "0") != "0"
def enable_local_execution_mode_only(f):
    """Decorator that honors local-only testing for parametrized tests.

    Wrap pytest functions whose *first* argument is "execution_mode". When
    local-only testing is enabled, every case whose mode is not
    ExecutionMode.LOCAL is skipped; all other cases run unchanged.
    """

    @wraps(f)
    def wrapper(execution_mode: ExecutionMode, *args, **kwargs):
        is_local = execution_mode == ExecutionMode.LOCAL
        if LOCAL_EXECUTION_ONLY and not is_local:
            # pytest.skip() raises, so the test body below is never reached.
            pytest.skip("local execution-only testing enabled")
        return f(execution_mode, *args, **kwargs)

    return wrapper

View file

@ -0,0 +1,18 @@
import os
from ray.serve import pipeline
def test_num_replicas(shared_ray_instance):
    """An ACTORS step with three replicas answers from three processes."""

    @pipeline.step(execution_mode="ACTORS", num_replicas=3)
    def get_pid(arg):
        return os.getpid()

    deployed = get_pid(pipeline.INPUT).deploy()

    # Calls go to randomly chosen actors, so sample many times and count
    # the distinct process ids that answered.
    seen_pids = set()
    for _ in range(100):
        seen_pids.add(deployed.call(""))
    assert len(seen_pids) == 3
if __name__ == "__main__":
    # Running the file directly hands it to pytest and exits with its code.
    import sys

    import pytest

    exit_code = pytest.main(["-v", "-s", __file__])
    sys.exit(exit_code)

View file

@ -0,0 +1,149 @@
import sys
import tempfile
import pytest
from ray.serve import pipeline
from ray.serve.pipeline.test_utils import enable_local_execution_mode_only
# Every member of pipeline.ExecutionMode, used as the parameter list for the
# "execution_mode" parametrization of the tests in this file.
ALL_EXECUTION_MODES = list(pipeline.ExecutionMode)
@pytest.mark.parametrize("execution_mode", ALL_EXECUTION_MODES)
@enable_local_execution_mode_only
def test_basic_sequential(execution_mode, shared_ray_instance):
    """Two chained steps each append their own tag to the input."""

    @pipeline.step(execution_mode=execution_mode)
    def step1(input_arg: str):
        assert isinstance(input_arg, str)
        return input_arg + "|step1"

    @pipeline.step(execution_mode=execution_mode)
    def step2(input_arg: str):
        assert isinstance(input_arg, str)
        return input_arg + "|step2"

    first_stage = step1(pipeline.INPUT)
    sequential = step2(first_stage).deploy()

    result = sequential.call("HELLO")
    assert result == "HELLO|step1|step2"
@pytest.mark.parametrize("execution_mode", ALL_EXECUTION_MODES)
@enable_local_execution_mode_only
def test_basic_parallel(execution_mode, shared_ray_instance):
    """One step fans out to two branches that a final step joins."""

    @pipeline.step(execution_mode=execution_mode)
    def step1(input_arg: str):
        return input_arg

    @pipeline.step(execution_mode=execution_mode)
    def step2_1(input_arg: str):
        return f"step2_1_{input_arg}"

    @pipeline.step(execution_mode=execution_mode)
    def step2_2(input_arg: str):
        return f"step2_2_{input_arg}"

    @pipeline.step(execution_mode=execution_mode)
    def step3(step2_1_output: str, step2_2_output: str):
        return f"{step2_1_output}|{step2_2_output}"

    # Both branches consume the same upstream node.
    step1_output = step1(pipeline.INPUT)
    left_branch = step2_1(step1_output)
    right_branch = step2_2(step1_output)
    parallel = step3(left_branch, right_branch).deploy()

    result = parallel.call("HELLO")
    assert result == "step2_1_HELLO|step2_2_HELLO"
@pytest.mark.parametrize("execution_mode", ALL_EXECUTION_MODES)
@enable_local_execution_mode_only
def test_multiple_inputs(execution_mode, shared_ray_instance):
    """Two steps read INPUT independently and a third combines them."""

    @pipeline.step(execution_mode=execution_mode)
    def step1(input_arg: str):
        return f"step1_{input_arg}"

    @pipeline.step(execution_mode=execution_mode)
    def step2(input_arg: str):
        return f"step2_{input_arg}"

    @pipeline.step(execution_mode=execution_mode)
    def step3(step1_output: str, step2_output: str):
        return f"{step1_output}|{step2_output}"

    from_step1 = step1(pipeline.INPUT)
    from_step2 = step2(pipeline.INPUT)
    multiple_inputs = step3(from_step1, from_step2).deploy()

    result = multiple_inputs.call("HELLO")
    assert result == "step1_HELLO|step2_HELLO"
@pytest.mark.parametrize("execution_mode", ALL_EXECUTION_MODES)
@enable_local_execution_mode_only
def test_basic_class(execution_mode, shared_ray_instance):
    """A class step keeps its constructor argument and uses it per call."""

    @pipeline.step(execution_mode=execution_mode)
    class GreeterStep:
        def __init__(self, greeting: str):
            self._greeting = greeting

        def __call__(self, name: str):
            return f"{self._greeting} {name}!"

    # First call binds constructor args, second wires the step to INPUT.
    configured_step = GreeterStep("Top of the morning")
    greeter = configured_step(pipeline.INPUT).deploy()

    result = greeter.call("Theodore")
    assert result == "Top of the morning Theodore!"
@pytest.mark.skipif(sys.platform == "win32", reason="File handling.")
@pytest.mark.parametrize("execution_mode", ALL_EXECUTION_MODES)
@enable_local_execution_mode_only
def test_class_constructor_not_called_until_deployed(execution_mode,
                                                     shared_ray_instance):
    """Constructor should only be called after .deploy()."""
    msg = "hello"

    with tempfile.NamedTemporaryFile("w") as tmp:

        @pipeline.step(execution_mode=execution_mode)
        class FileWriter:
            def __init__(self, tmpfile: str, msg: str):
                # Writing the message is the observable side effect that
                # tells the test the constructor has run.
                with open(tmpfile, "w") as f:
                    f.write(msg)
                    f.flush()

            def __call__(self, arg: str):
                return arg

        def constructor_called():
            with open(tmp.name, "r") as f:
                contents = f.read()
            return contents == msg

        # Binding constructor arguments must not run the constructor.
        file_writer = FileWriter(tmp.name, msg)
        assert not constructor_called()

        # Neither must wiring the step into a pipeline.
        writer_pipeline = file_writer(pipeline.INPUT)
        assert not constructor_called()

        deployed = writer_pipeline.deploy()
        assert deployed.call("hi") == "hi"
        assert constructor_called()
@pytest.mark.parametrize("execution_mode", ALL_EXECUTION_MODES)
@enable_local_execution_mode_only
def test_mix_classes_and_functions(execution_mode, shared_ray_instance):
    """Class steps and function steps can feed the same downstream step."""

    @pipeline.step(execution_mode=execution_mode)
    class GreeterStep1:
        def __init__(self, greeting: str):
            self._greeting = greeting

        def __call__(self, name: str):
            return f"{self._greeting} {name}!"

    @pipeline.step(execution_mode=execution_mode)
    def greeter_step_2(name: str):
        return f"How's it hanging, {name}?"

    @pipeline.step(execution_mode=execution_mode)
    def combiner(greeting1: str, greeting2: str):
        return f"{greeting1}|{greeting2}"

    class_branch = GreeterStep1("Howdy")(pipeline.INPUT)
    function_branch = greeter_step_2(pipeline.INPUT)
    greeter = combiner(class_branch, function_branch).deploy()

    result = greeter.call("Teddy")
    assert result == "Howdy Teddy!|How's it hanging, Teddy?"
if __name__ == "__main__":
    # Running the file directly hands it to pytest and exits with its code.
    exit_code = pytest.main(["-v", "-s", __file__])
    sys.exit(exit_code)

View file

@ -0,0 +1,95 @@
import pytest
from ray.serve import pipeline
from ray.serve.pipeline import ExecutionMode
from ray.serve.pipeline.step import PipelineStep
def test_decorator_no_args():
    """The bare decorator wraps functions and classes with one replica."""

    @pipeline.step
    def f():
        pass

    @pipeline.step
    class A:
        pass

    for decorated in (f, A):
        assert isinstance(decorated, PipelineStep)
        assert decorated.num_replicas == 1
def test_decorator_with_arg():
    """The parametrized decorator records the requested replica count."""

    @pipeline.step(num_replicas=2)
    def f():
        pass

    @pipeline.step(num_replicas=5)
    class A:
        pass

    for decorated, expected_replicas in ((f, 2), (A, 5)):
        assert isinstance(decorated, PipelineStep)
        assert decorated.num_replicas == expected_replicas
def test_pass_step_without_calling():
    """Passing an uncalled step as an argument is a TypeError."""

    @pipeline.step
    def step1():
        pass

    @pipeline.step
    def step2():
        pass

    # Correct usage: the upstream step has been called with an input.
    upstream_node = step1(pipeline.INPUT)
    step2(upstream_node)

    # Incorrect usage: the step object itself is passed.
    with pytest.raises(TypeError):
        step2(step1)
def test_input_step_multiple_args_rejected():
    """INPUT cannot be combined with other arguments in one call."""

    @pipeline.step
    def step1():
        pass

    @pipeline.step
    def step2():
        pass

    # INPUT on its own is accepted.
    step1(pipeline.INPUT)

    # INPUT alongside another node is rejected.
    other_node = step2(pipeline.INPUT)
    with pytest.raises(ValueError):
        step1(pipeline.INPUT, other_node)
@pytest.mark.parametrize(
    "execution_mode",
    [
        (ExecutionMode.LOCAL, "LOCAL"),
        (ExecutionMode.TASKS, "TASKS"),
        (ExecutionMode.ACTORS, "ACTORS"),
    ],
)
def test_execution_mode_validation(execution_mode):
    """Enum members and their names are accepted; other types are not."""
    mode_enum, mode_str = execution_mode

    # The enum member, its upper-case name and its lower-case name must all
    # be accepted by the decorator.
    for accepted_mode in (mode_enum, mode_str, mode_str.lower()):

        @pipeline.step(execution_mode=accepted_mode)
        def accepted_step():
            pass

    with pytest.raises(TypeError):

        @pipeline.step(execution_mode=123)
        def rejected_step():
            pass
if __name__ == "__main__":
    # Running the file directly hands it to pytest and exits with its code.
    import sys

    exit_code = pytest.main(["-v", "-s", __file__])
    sys.exit(exit_code)