From 4645893a5f65fb2f79e1cd0fe94c55ef81a8a26f Mon Sep 17 00:00:00 2001 From: Edward Oakes Date: Tue, 19 Oct 2021 13:36:49 -0500 Subject: [PATCH] Add prototype of ray.serve.pipeline (#19278) --- python/ray/serve/BUILD | 26 +++ python/ray/serve/pipeline/__init__.py | 3 + python/ray/serve/pipeline/common.py | 27 ++++ python/ray/serve/pipeline/conftest.py | 14 ++ python/ray/serve/pipeline/executor.py | 101 ++++++++++++ python/ray/serve/pipeline/node.py | 99 ++++++++++++ python/ray/serve/pipeline/step.py | 133 ++++++++++++++++ python/ray/serve/pipeline/test_utils.py | 28 ++++ .../pipeline/tests/test_actor_executor.py | 18 +++ .../ray/serve/pipeline/tests/test_pipeline.py | 149 ++++++++++++++++++ python/ray/serve/pipeline/tests/test_step.py | 95 +++++++++++ 11 files changed, 693 insertions(+) create mode 100644 python/ray/serve/pipeline/__init__.py create mode 100644 python/ray/serve/pipeline/common.py create mode 100644 python/ray/serve/pipeline/conftest.py create mode 100644 python/ray/serve/pipeline/executor.py create mode 100644 python/ray/serve/pipeline/node.py create mode 100644 python/ray/serve/pipeline/step.py create mode 100644 python/ray/serve/pipeline/test_utils.py create mode 100644 python/ray/serve/pipeline/tests/test_actor_executor.py create mode 100644 python/ray/serve/pipeline/tests/test_pipeline.py create mode 100644 python/ray/serve/pipeline/tests/test_step.py diff --git a/python/ray/serve/BUILD b/python/ray/serve/BUILD index d6a136c28..87d33b5ed 100644 --- a/python/ray/serve/BUILD +++ b/python/ray/serve/BUILD @@ -354,3 +354,29 @@ py_test( tags = ["exclusive", "post_wheel_build", "team:serve"], deps = [":serve_lib"] ) + +pipeline_tests_srcs = glob(["pipeline/tests/**/*.py"]) + +py_test( + name = "test_step", + size = "small", + srcs = pipeline_tests_srcs, + tags = ["exclusive", "team:serve"], + deps = [":serve_lib"], +) + +py_test( + name = "test_pipeline", + size = "small", + srcs = pipeline_tests_srcs, + tags = ["exclusive", "team:serve"], + deps = [":serve_lib"], +) + +py_test( + name = "test_actor_executor", + size = "small", + srcs = pipeline_tests_srcs, + tags = ["exclusive", "team:serve"], + deps = [":serve_lib"], +) diff --git a/python/ray/serve/pipeline/__init__.py b/python/ray/serve/pipeline/__init__.py new file mode 100644 index 000000000..486a52b30 --- /dev/null +++ b/python/ray/serve/pipeline/__init__.py @@ -0,0 +1,3 @@ +from ray.serve.pipeline.common import ExecutionMode # noqa:F401 +from ray.serve.pipeline.node import INPUT # noqa:F401 +from ray.serve.pipeline.step import step # noqa:F401 diff --git a/python/ray/serve/pipeline/common.py b/python/ray/serve/pipeline/common.py new file mode 100644 index 000000000..5df6bc30a --- /dev/null +++ b/python/ray/serve/pipeline/common.py @@ -0,0 +1,27 @@ +from enum import Enum + +from pydantic import BaseModel, PositiveInt + + +class ExecutionMode(Enum): + LOCAL = 1 + TASKS = 2 + ACTORS = 3 + + +def str_to_execution_mode(s: str) -> ExecutionMode: + if s in ["local", "LOCAL"]: + return ExecutionMode.LOCAL + elif s in ["TASKS", "tasks"]: + return ExecutionMode.TASKS + elif s in ["ACTORS", "actors"]: + return ExecutionMode.ACTORS + else: + valid_strs = [str(mode).split(".")[1] for mode in ExecutionMode] + raise ValueError(f"Unknown ExecutionMode str: '{s}'. " + f"Valid options are: {valid_strs}.") + + +class StepConfig(BaseModel): + execution_mode: ExecutionMode + num_replicas: PositiveInt diff --git a/python/ray/serve/pipeline/conftest.py b/python/ray/serve/pipeline/conftest.py new file mode 100644 index 000000000..6f8756797 --- /dev/null +++ b/python/ray/serve/pipeline/conftest.py @@ -0,0 +1,14 @@ +import pytest + +import ray +from ray.serve.pipeline.test_utils import LOCAL_EXECUTION_ONLY + + +@pytest.fixture(scope="session") +def shared_ray_instance(): + if LOCAL_EXECUTION_ONLY: + # Don't ray.init() if only testing local execution. + yield + else: + # Overriding task_retry_delay_ms to relaunch actors more quickly. + yield ray.init(num_cpus=36, _system_config={"task_retry_delay_ms": 50}) diff --git a/python/ray/serve/pipeline/executor.py b/python/ray/serve/pipeline/executor.py new file mode 100644 index 000000000..7b5b7a4a7 --- /dev/null +++ b/python/ray/serve/pipeline/executor.py @@ -0,0 +1,101 @@ +from abc import ABC +import random +from typing import Any, Callable, List, Tuple, Union + +import ray +from ray import cloudpickle, ObjectRef +from ray.actor import ActorHandle +from ray.remote_function import RemoteFunction + +from ray.serve.pipeline.common import ExecutionMode, StepConfig + + +class Executor(ABC): + def __init__(self, serialized_callable_factory: bytes, config: StepConfig): + pass + + def call(self, input_arg: Tuple[Any]) -> Union[Any, ObjectRef]: + pass + + async def call_async(self, input_arg: Tuple[Any]) -> Union[Any, ObjectRef]: + pass + + +class LocalExecutor(Executor): + """Executor that runs code in-process.""" + + def __init__(self, serialized_callable_factory: bytes, config: StepConfig): + self._callable: Callable = cloudpickle.loads( + serialized_callable_factory)() + + def call(self, *args: Tuple[Any]) -> Any: + args = tuple( + ray.get(arg) if isinstance(arg, ObjectRef) else arg + for arg in args) + return self._callable(*args) + + async def call_async(self, input_arg: Tuple[Any]) -> Any: + raise NotImplementedError("No async support yet.") + + +class TasksExecutor(Executor): + """Executor that wraps code in Ray tasks.""" + + def __init__(self, serialized_callable_factory: bytes, config: StepConfig): + @ray.remote + def callable_wrapper(*args): + return cloudpickle.loads(serialized_callable_factory)()(*args) + + self._remote_function: RemoteFunction = callable_wrapper + + def call(self, *args: Tuple[Any]) -> ObjectRef: + return self._remote_function.remote(*args) + + async def call_async(self) -> ObjectRef: + raise NotImplementedError("No async support yet.") + + +@ray.remote +class ExecutorActor: + """Actor that executes the serialized code for a given step.""" + + def __init__(self, serialize_callable_factory: bytes): + self._callable = cloudpickle.loads(serialize_callable_factory)() + + def call(self, *args, **kwargs): + """Call the underlying callable.""" + return self._callable(*args, **kwargs) + + def ready(self): + """Used to wait for the actor to become available.""" + pass + + +class ActorsExecutor(Executor): + """Executor that wraps a pool of Ray actors.""" + + def __init__(self, serialized_callable_factory: bytes, config: StepConfig): + self._actors: List[ActorHandle] = [ + ExecutorActor.remote(serialized_callable_factory) + for _ in range(config.num_replicas) + ] + # Wait for the actors to be ready. + ray.get([actor.ready.remote() for actor in self._actors]) + + def call(self, *args: Tuple[Any]) -> ObjectRef: + return random.choice(self._actors).call.remote(*args) + + async def call_async(self) -> ObjectRef: + raise NotImplementedError("No async support yet.") + + +def create_executor_from_step_config(serialized_callable_factory: bytes, + config: StepConfig) -> Executor: + if config.execution_mode == ExecutionMode.LOCAL: + return LocalExecutor(serialized_callable_factory, config) + elif config.execution_mode == ExecutionMode.TASKS: + return TasksExecutor(serialized_callable_factory, config) + elif config.execution_mode == ExecutionMode.ACTORS: + return ActorsExecutor(serialized_callable_factory, config) + else: + assert False, f"Unknown execution mode: {config.execution_mode}" diff --git a/python/ray/serve/pipeline/node.py b/python/ray/serve/pipeline/node.py new file mode 100644 index 000000000..30572884c --- /dev/null +++ b/python/ray/serve/pipeline/node.py @@ -0,0 +1,99 @@ +from abc import ABC +from typing import Any, Callable, Tuple + +import ray +from ray import cloudpickle, ObjectRef + +from ray.serve.pipeline.common import StepConfig +from ray.serve.pipeline.executor import (create_executor_from_step_config, + Executor) + + +class PipelineNode(ABC): + def deploy(self): + pass + + def call(self, input_arg: Tuple[Any]) -> Any: + pass + + async def call_async(self, input_arg: Tuple[Any]) -> Any: + pass + + +class Pipeline: + """A deployed pipeline that can be called by the user.""" + + def __init__(self, entry_node: PipelineNode): + self._entry_node = entry_node + + def deploy(self): + raise RuntimeError("A pipeline can only be deployed once.") + + def call(self, input_arg: Tuple[Any]) -> Any: + result = self._entry_node.call(input_arg) + if isinstance(result, ObjectRef): + result = ray.get(result) + + return result + + async def call_async(self, input_arg: Tuple[Any]) -> Any: + raise NotImplementedError("No async support yet.") + + +class ExecutorPipelineNode(PipelineNode): + """Result of constructing a pipeline from user-defined steps. + + Call .deploy() on this to instantiate the pipeline. + """ + + def __init__(self, callable_factory: Callable[[], Callable], + config: StepConfig, incoming_edges: Tuple[PipelineNode]): + # Serialize to make this class environment-independent. + self._serialized_callable_factory: bytes = cloudpickle.dumps( + callable_factory) + self._config: StepConfig = config + self._incoming_edges: PipelineNode = incoming_edges + + # Populated in .deploy(). + self._executor: Executor = None + + assert len(self._incoming_edges) > 0 + + def deploy(self) -> Pipeline: + """Instantiates executors for this and all dependent nodes. + + After the pipeline is deployed, .call() and .call_async() can be used. + """ + [node.deploy() for node in self._incoming_edges] + self._executor = create_executor_from_step_config( + self._serialized_callable_factory, self._config) + + return Pipeline(self) + + def call(self, input_arg: Tuple[Any]) -> Any: + if self._executor is None: + raise RuntimeError( + "Pipeline hasn't been deployed, call .deploy() first.") + args = tuple(node.call(input_arg) for node in self._incoming_edges) + return self._executor.call(*args) + + async def call_async(self): + if self._executor is None: + raise RuntimeError( + "Pipeline hasn't been deployed, call .deploy() first.") + raise NotImplementedError("No async support yet.") + + +class InputPipelineNode(PipelineNode): + def deploy(self) -> PipelineNode: + pass + + def call(self, input_arg: Tuple[Any]) -> Any: + return input_arg + + async def call_async(self, input_arg: Tuple[Any]) -> Any: + return input_arg + + +# Special node that's used to designate the input of a pipeline. +INPUT = InputPipelineNode() diff --git a/python/ray/serve/pipeline/step.py b/python/ray/serve/pipeline/step.py new file mode 100644 index 000000000..4127014d5 --- /dev/null +++ b/python/ray/serve/pipeline/step.py @@ -0,0 +1,133 @@ +from types import FunctionType +from typing import Callable, Optional, Union + +from ray.serve.pipeline.node import ExecutorPipelineNode, INPUT, PipelineNode +from ray.serve.pipeline.common import (ExecutionMode, str_to_execution_mode, + StepConfig) + + +def _validate_step_args(*args, **kwargs): + """Validate arguments passed into a step. + + Currently, these must only consist of other steps (including INPUT), and + kwargs are not supported. + """ + if len(kwargs): + raise NotImplementedError("No kwargs support yet!") + + # Check arguments for incoming edges. + for arg in args: + if isinstance(arg, PipelineStep): + raise TypeError("PipelineSteps cannot be passed in directly, " + "you need to call them with an input first. For " + "example: instead of `my_step_2(my_step_1)`, try " + "`my_step_2(my_step_1(pipeline.INPUT))`.") + + elif arg is INPUT: + if len(args) > 1: + raise ValueError( + "INPUT steps can only take a single argument.") + + elif not isinstance(arg, PipelineNode): + raise TypeError( + f"Only PipelineNodes supported as arguments, got {type(arg)}") + + +class PipelineStep: + def __init__(self, config: StepConfig): + self._config = config + + @property + def num_replicas(self) -> int: + return self._config.num_replicas + + def options(self, *args, **kwargs): + raise NotImplementedError(".options() not supported yet.") + + +class CallablePipelineStep(PipelineStep): + """A step that is ready to be used in a pipeline. + + Wraps either a function or a class and its constructor args & kwargs. + + This should be used by calling it with a number of other pipeline steps + as its arguments (or the INPUT step). + """ + + def __init__(self, callable_factory: Callable[[], Callable], + config: StepConfig): + super().__init__(config) + assert callable(callable_factory) + self._callable_factory = callable_factory + + def options(self, *args, **kwargs): + raise NotImplementedError("No options yet!") + + def __call__(self, *args, **kwargs): + _validate_step_args(*args, **kwargs) + return ExecutorPipelineNode( + self._callable_factory, self._config, incoming_edges=args) + + +class UninstantiatedClassPipelineStep(PipelineStep): + """Represents a class step whose constructor has not been initialized. + + This must be called with constructor args & kwargs to return an + InstantiatedClassPipelineStep before it can actually be used. + """ + + def __init__(self, _class: Callable, config: StepConfig): + super().__init__(config) + self._class = _class + + def options(self, *args, **kwargs): + raise NotImplementedError("No options yet!") + + def __call__(self, *args, **kwargs): + return CallablePipelineStep(lambda: self._class(*args, **kwargs), + self._config) + + +def step(_func_or_class: Optional[Callable] = None, + execution_mode: Union[ExecutionMode, str] = ExecutionMode.LOCAL, + num_replicas: int = 1) -> Callable[[Callable], PipelineStep]: + """Decorator used to define a pipeline step. + + Args: + execution_mode(ExecutionMode): The execution mode for this step. + Supported modes: + - ExecutionMode.LOCAL (default): executes this step inline in + the calling process. + - ExecutionMode.TASKS: executes this step in Ray tasks. + - ExecutionMode.ACTORS: executes this step in Ray actors. + num_replicas (int): The number of Ray actors to start that + will run this step (efaults to 1). Only valid when using + ExecutionMode.ACTORS. + + Example: + + >>> @pipeline.step(num_replicas=10) + def my_step(*args): + pass + + Returns: + PipelineStep + """ + + if isinstance(execution_mode, str): + execution_mode = str_to_execution_mode(execution_mode) + elif not isinstance(execution_mode, ExecutionMode): + raise TypeError("execution_mode must be an ExecutionMode or str.") + + config = StepConfig( + execution_mode=execution_mode, num_replicas=num_replicas) + + def decorator(_func_or_class): + if isinstance(_func_or_class, FunctionType): + return CallablePipelineStep(lambda: _func_or_class, config) + else: + return UninstantiatedClassPipelineStep(_func_or_class, config) + + # This handles both parametrized and non-parametrized usage of the + # decorator. See the @serve.batch code for more details. + return decorator(_func_or_class) if callable(_func_or_class) else decorator diff --git a/python/ray/serve/pipeline/test_utils.py b/python/ray/serve/pipeline/test_utils.py new file mode 100644 index 000000000..9af5e8e7e --- /dev/null +++ b/python/ray/serve/pipeline/test_utils.py @@ -0,0 +1,28 @@ +import os +from functools import wraps + +import pytest + +from ray.serve.pipeline import ExecutionMode + +LOCAL_EXECUTION_ONLY = os.environ.get("LOCAL_EXECUTION_ONLY") not in [ + None, "0" +] + + +def enable_local_execution_mode_only(f): + """Convenience decorator to enable local execution-only testing. + + This should be used to wrap pytest functions that take an "execution_mode" + parameter as their *first* argument. If local-only testing is enabled, + other cases will be skipped. + """ + + @wraps(f) + def wrapper(execution_mode: ExecutionMode, *args, **kwargs): + if execution_mode != ExecutionMode.LOCAL and LOCAL_EXECUTION_ONLY: + pytest.skip("local execution-only testing enabled") + else: + return f(execution_mode, *args, **kwargs) + + return wrapper diff --git a/python/ray/serve/pipeline/tests/test_actor_executor.py b/python/ray/serve/pipeline/tests/test_actor_executor.py new file mode 100644 index 000000000..bba7f87d3 --- /dev/null +++ b/python/ray/serve/pipeline/tests/test_actor_executor.py @@ -0,0 +1,18 @@ +import os + +from ray.serve import pipeline + + +def test_num_replicas(shared_ray_instance): + @pipeline.step(execution_mode="ACTORS", num_replicas=3) + def get_pid(arg): + return os.getpid() + + get_pid = get_pid(pipeline.INPUT).deploy() + assert len({get_pid.call("") for _ in range(100)}) == 3 + + +if __name__ == "__main__": + import sys + import pytest + sys.exit(pytest.main(["-v", "-s", __file__])) diff --git a/python/ray/serve/pipeline/tests/test_pipeline.py b/python/ray/serve/pipeline/tests/test_pipeline.py new file mode 100644 index 000000000..f337e9028 --- /dev/null +++ b/python/ray/serve/pipeline/tests/test_pipeline.py @@ -0,0 +1,149 @@ +import sys +import tempfile + +import pytest + +from ray.serve import pipeline +from ray.serve.pipeline.test_utils import enable_local_execution_mode_only + +ALL_EXECUTION_MODES = list(pipeline.ExecutionMode) + + +@pytest.mark.parametrize("execution_mode", ALL_EXECUTION_MODES) +@enable_local_execution_mode_only +def test_basic_sequential(execution_mode, shared_ray_instance): + @pipeline.step(execution_mode=execution_mode) + def step1(input_arg: str): + assert isinstance(input_arg, str) + return input_arg + "|step1" + + @pipeline.step(execution_mode=execution_mode) + def step2(input_arg: str): + assert isinstance(input_arg, str) + return input_arg + "|step2" + + sequential = step2(step1(pipeline.INPUT)).deploy() + assert sequential.call("HELLO") == "HELLO|step1|step2" + + +@pytest.mark.parametrize("execution_mode", ALL_EXECUTION_MODES) +@enable_local_execution_mode_only +def test_basic_parallel(execution_mode, shared_ray_instance): + @pipeline.step(execution_mode=execution_mode) + def step1(input_arg: str): + return input_arg + + @pipeline.step(execution_mode=execution_mode) + def step2_1(input_arg: str): + return f"step2_1_{input_arg}" + + @pipeline.step(execution_mode=execution_mode) + def step2_2(input_arg: str): + return f"step2_2_{input_arg}" + + @pipeline.step(execution_mode=execution_mode) + def step3(step2_1_output: str, step2_2_output: str): + return f"{step2_1_output}|{step2_2_output}" + + step1_output = step1(pipeline.INPUT) + parallel = step3(step2_1(step1_output), step2_2(step1_output)).deploy() + assert parallel.call("HELLO") == "step2_1_HELLO|step2_2_HELLO" + + +@pytest.mark.parametrize("execution_mode", ALL_EXECUTION_MODES) +@enable_local_execution_mode_only +def test_multiple_inputs(execution_mode, shared_ray_instance): + @pipeline.step(execution_mode=execution_mode) + def step1(input_arg: str): + return f"step1_{input_arg}" + + @pipeline.step(execution_mode=execution_mode) + def step2(input_arg: str): + return f"step2_{input_arg}" + + @pipeline.step(execution_mode=execution_mode) + def step3(step1_output: str, step2_output: str): + return f"{step1_output}|{step2_output}" + + multiple_inputs = step3(step1(pipeline.INPUT), + step2(pipeline.INPUT)).deploy() + assert multiple_inputs.call("HELLO") == "step1_HELLO|step2_HELLO" + + +@pytest.mark.parametrize("execution_mode", ALL_EXECUTION_MODES) +@enable_local_execution_mode_only +def test_basic_class(execution_mode, shared_ray_instance): + @pipeline.step(execution_mode=execution_mode) + class GreeterStep: + def __init__(self, greeting: str): + self._greeting = greeting + + def __call__(self, name: str): + return f"{self._greeting} {name}!" + + greeter = GreeterStep("Top of the morning")(pipeline.INPUT).deploy() + assert greeter.call("Theodore") == "Top of the morning Theodore!" + + +@pytest.mark.skipif(sys.platform == "win32", reason="File handling.") +@pytest.mark.parametrize("execution_mode", ALL_EXECUTION_MODES) +@enable_local_execution_mode_only +def test_class_constructor_not_called_until_deployed(execution_mode, + shared_ray_instance): + """Constructor should only be called after .deploy().""" + + with tempfile.NamedTemporaryFile("w") as tmp: + + @pipeline.step(execution_mode=execution_mode) + class FileWriter: + def __init__(self, tmpfile: str, msg: str): + with open(tmpfile, "w") as f: + f.write(msg) + f.flush() + + def __call__(self, arg: str): + return arg + + msg = "hello" + + def constructor_called(): + with open(tmp.name, "r") as f: + return f.read() == msg + + file_writer = FileWriter(tmp.name, msg) + assert not constructor_called() + + writer_pipeline = file_writer(pipeline.INPUT) + assert not constructor_called() + + assert writer_pipeline.deploy().call("hi") == "hi" + assert constructor_called() + + +@pytest.mark.parametrize("execution_mode", ALL_EXECUTION_MODES) +@enable_local_execution_mode_only +def test_mix_classes_and_functions(execution_mode, shared_ray_instance): + @pipeline.step(execution_mode=execution_mode) + class GreeterStep1: + def __init__(self, greeting: str): + self._greeting = greeting + + def __call__(self, name: str): + return f"{self._greeting} {name}!" + + @pipeline.step(execution_mode=execution_mode) + def greeter_step_2(name: str): + return f"How's it hanging, {name}?" + + @pipeline.step(execution_mode=execution_mode) + def combiner(greeting1: str, greeting2: str): + return f"{greeting1}|{greeting2}" + + greeter = combiner( + GreeterStep1("Howdy")(pipeline.INPUT), + greeter_step_2(pipeline.INPUT)).deploy() + assert greeter.call("Teddy") == "Howdy Teddy!|How's it hanging, Teddy?" + + +if __name__ == "__main__": + sys.exit(pytest.main(["-v", "-s", __file__])) diff --git a/python/ray/serve/pipeline/tests/test_step.py b/python/ray/serve/pipeline/tests/test_step.py new file mode 100644 index 000000000..34462c823 --- /dev/null +++ b/python/ray/serve/pipeline/tests/test_step.py @@ -0,0 +1,95 @@ +import pytest + +from ray.serve import pipeline +from ray.serve.pipeline import ExecutionMode +from ray.serve.pipeline.step import PipelineStep + + +def test_decorator_no_args(): + @pipeline.step + def f(): + pass + + assert isinstance(f, PipelineStep) + assert f.num_replicas == 1 + + @pipeline.step + class A: + pass + + assert isinstance(A, PipelineStep) + assert A.num_replicas == 1 + + +def test_decorator_with_arg(): + @pipeline.step(num_replicas=2) + def f(): + pass + + assert isinstance(f, PipelineStep) + assert f.num_replicas == 2 + + @pipeline.step(num_replicas=5) + class A: + pass + + assert isinstance(A, PipelineStep) + assert A.num_replicas == 5 + + +def test_pass_step_without_calling(): + @pipeline.step + def step1(): + pass + + @pipeline.step + def step2(): + pass + + step2(step1(pipeline.INPUT)) + with pytest.raises(TypeError): + step2(step1) + + +def test_input_step_multiple_args_rejected(): + @pipeline.step + def step1(): + pass + + @pipeline.step + def step2(): + pass + + step1(pipeline.INPUT) + with pytest.raises(ValueError): + step1(pipeline.INPUT, step2(pipeline.INPUT)) + + +@pytest.mark.parametrize("execution_mode", [(ExecutionMode.LOCAL, "LOCAL"), + (ExecutionMode.TASKS, "TASKS"), + (ExecutionMode.ACTORS, "ACTORS")]) +def test_execution_mode_validation(execution_mode): + mode_enum, mode_str = execution_mode + + @pipeline.step(execution_mode=mode_enum) + def f1(): + pass + + @pipeline.step(execution_mode=mode_str) + def f2(): + pass + + @pipeline.step(execution_mode=mode_str.lower()) + def f3(): + pass + + with pytest.raises(TypeError): + + @pipeline.step(execution_mode=123) + def f4(): + pass + + +if __name__ == "__main__": + import sys + sys.exit(pytest.main(["-v", "-s", __file__]))