[Workflow] Workflow tail recursion optimization (#19928)

* tail recursion optimization
This commit is contained in:
Siyuan (Ryans) Zhuang 2021-11-12 09:13:40 -08:00 committed by GitHub
parent b6bd4fd5f3
commit 3b62388a9a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 160 additions and 20 deletions

View file

@ -298,6 +298,7 @@ Papers
workflows/actors.rst
workflows/events.rst
workflows/comparison.rst
workflows/advanced.rst
workflows/package-ref.rst
.. toctree::

View file

@ -0,0 +1,40 @@
Advanced Topics
===============
Inplace Execution
-----------------
When executing a workflow step inside another workflow step, it is usually executed in another Ray worker process. This is good for resource and performance isolation, but at the cost of lower efficiency due to non-locality, scheduling and data transfer.
For example, this recursive workflow calculates the exponent. We write it with workflow so that we can recover from any step. However, it is really inefficient to scheduling each step in a different worker.
.. code-block:: python
:caption: Workflow without inplace execution:
@workflow.step
def exp_remote(k, n):
if n == 0:
return k
return exp_remote.step(2 * k, n - 1)
We could optimize it with inplace option:
.. code-block:: python
:caption: Workflow with inplace execution:
def exp_inplace(k, n, worker_id=None):
if n == 0:
return k
return exp_inplace.options(allow_inplace=True).step(
2 * k, n - 1, worker_id)
With ``allow_inplace=True``, the step that called ``.step()`` executes in the function. Ray options are ignored because they are used for remote execution. Also, you cannot retrieve the output of an inplace step using ``workflow.get_output()`` before it finishes execution.
Inplace is also useful when you need to pass something that is only valid in the current process/physical machine to another step. For example:
.. code-block:: python
@workflow.step
def Foo():
x = "<something that is only valid in the current process>"
return Bar.options(allow_inplace=True).step(x)

View file

@ -105,12 +105,14 @@ def step(*args, **kwargs):
catch_exceptions = kwargs.pop("catch_exceptions", None)
name = kwargs.pop("name", None)
metadata = kwargs.pop("metadata", None)
allow_inplace = kwargs.pop("allow_inplace", False)
ray_options = kwargs
options = WorkflowStepRuntimeOptions.make(
step_type=StepType.FUNCTION,
catch_exceptions=catch_exceptions,
max_retries=max_retries,
allow_inplace=allow_inplace,
ray_options=ray_options)
return make_step_decorator(options, name, metadata)

View file

@ -123,6 +123,8 @@ class WorkflowStepRuntimeOptions:
catch_exceptions: bool
# The num of retry for application exception.
max_retries: int
# Run the workflow step inplace.
allow_inplace: bool
# ray_remote options
ray_options: Dict[str, Any]
@ -132,6 +134,7 @@ class WorkflowStepRuntimeOptions:
step_type,
catch_exceptions=None,
max_retries=None,
allow_inplace=False,
ray_options=None):
if max_retries is None:
max_retries = 3
@ -149,6 +152,7 @@ class WorkflowStepRuntimeOptions:
step_type=step_type,
catch_exceptions=catch_exceptions,
max_retries=max_retries,
allow_inplace=allow_inplace,
ray_options=ray_options)
def to_dict(self) -> Dict[str, Any]:
@ -156,6 +160,7 @@ class WorkflowStepRuntimeOptions:
"step_type": self.step_type,
"max_retries": self.max_retries,
"catch_exceptions": self.catch_exceptions,
"allow_inplace": self.allow_inplace,
"ray_options": self.ray_options,
}
@ -165,6 +170,7 @@ class WorkflowStepRuntimeOptions:
step_type=StepType[value["step_type"]],
max_retries=value["max_retries"],
catch_exceptions=value["catch_exceptions"],
allow_inplace=value["allow_inplace"],
ray_options=value["ray_options"],
)

View file

@ -140,6 +140,9 @@ def _resolve_step_inputs(
def execute_workflow(workflow: "Workflow") -> "WorkflowExecutionResult":
"""Execute workflow.
Args:
workflow: The workflow to be executed.
Returns:
An object ref that represent the result.
"""
@ -149,16 +152,36 @@ def execute_workflow(workflow: "Workflow") -> "WorkflowExecutionResult":
baked_inputs = _BakedWorkflowInputs.from_workflow_inputs(
workflow_data.inputs)
step_options = workflow_data.step_options
persisted_output, volatile_output = _workflow_step_executor.options(
**step_options.ray_options).remote(
workflow_data.func_body,
workflow_context.get_workflow_step_context(), workflow.step_id,
baked_inputs, workflow_data.step_options)
if step_options.allow_inplace:
# TODO(suquark): For inplace execution, it is impossible
# to get the ObjectRef of the output before execution.
# Here we use a dummy ObjectRef, because _record_step_status does not
# even use it (?!).
_record_step_status(workflow.step_id, WorkflowStatus.RUNNING,
[ray.put(None)])
# Note: we need to be careful about workflow context when
# calling the executor directly.
# TODO(suquark): We still have recursive Python calls.
# This would cause stack overflow if we have a really
# deep recursive call. We should fix it later.
executor = _workflow_step_executor
else:
executor = _workflow_step_executor_remote.options(
**step_options.ray_options).remote
persisted_output, volatile_output = executor(
workflow_data.func_body, workflow_context.get_workflow_step_context(),
workflow.step_id, baked_inputs, workflow_data.step_options)
if not isinstance(persisted_output, WorkflowOutputType):
raise TypeError("Unexpected return type of the workflow.")
persisted_output = ray.put(persisted_output)
if not isinstance(persisted_output, WorkflowOutputType):
volatile_output = ray.put(volatile_output)
if step_options.step_type != StepType.READONLY_ACTOR_METHOD:
if not step_options.allow_inplace:
# TODO: [Possible flaky bug] Here the RUNNING state may
# be recorded earlier than SUCCESSFUL. This caused some
# confusion during development.
_record_step_status(workflow.step_id, WorkflowStatus.RUNNING,
[volatile_output])
@ -309,11 +332,10 @@ def _wrap_run(func: Callable, runtime_options: "WorkflowStepRuntimeOptions",
return persisted_output, volatile_output
@ray.remote(num_returns=2)
def _workflow_step_executor(
func: Callable, context: "WorkflowStepContext", step_id: "StepID",
baked_inputs: "_BakedWorkflowInputs",
runtime_options: "WorkflowStepRuntimeOptions") -> Any:
runtime_options: "WorkflowStepRuntimeOptions") -> Tuple[Any, Any]:
"""Executor function for workflow step.
Args:
@ -391,6 +413,16 @@ def _workflow_step_executor(
return persisted_output, volatile_output
@ray.remote(num_returns=2)
def _workflow_step_executor_remote(
func: Callable, context: "WorkflowStepContext", step_id: "StepID",
baked_inputs: "_BakedWorkflowInputs",
runtime_options: "WorkflowStepRuntimeOptions") -> Any:
"""The remote version of '_workflow_step_executor'."""
return _workflow_step_executor(func, context, step_id, baked_inputs,
runtime_options)
@dataclass
class _BakedWorkflowInputs:
"""This class stores pre-processed inputs for workflow step execution.

View file

@ -71,6 +71,7 @@ class WorkflowStepFunction:
catch_exceptions: bool = False,
name: str = None,
metadata: Dict[str, Any] = None,
allow_inplace: bool = False,
**ray_options) -> "WorkflowStepFunction":
"""This function set how the step function is going to be executed.
@ -87,6 +88,7 @@ class WorkflowStepFunction:
directly as the step id if possible, otherwise deduplicated by
appending .N suffixes.
metadata: metadata to add to the step.
allow_inplace: Execute the workflow step inplace.
**ray_options: All parameters in this fields will be passed
to ray remote function options.
@ -100,6 +102,7 @@ class WorkflowStepFunction:
step_type=StepType.FUNCTION,
catch_exceptions=catch_exceptions,
max_retries=max_retries,
allow_inplace=allow_inplace,
ray_options=ray_options,
)
return WorkflowStepFunction(

View file

@ -0,0 +1,65 @@
from ray.tests.conftest import * # noqa
import pytest
from ray import workflow
@workflow.step
def check_and_update(x, worker_id):
from ray.worker import global_worker
_worker_id = global_worker.worker_id
if worker_id == _worker_id:
return x + "0"
return x + "1"
@workflow.step
def inplace_test():
from ray.worker import global_worker
worker_id = global_worker.worker_id
x = check_and_update.options(allow_inplace=True).step("@", worker_id)
y = check_and_update.step(x, worker_id)
z = check_and_update.options(allow_inplace=True).step(y, worker_id)
return z
@workflow.step
def exp_inplace(k, n, worker_id=None):
from ray.worker import global_worker
_worker_id = global_worker.worker_id
if worker_id is not None:
# sub-workflows running inplace
assert _worker_id == worker_id
worker_id = _worker_id
if n == 0:
return k
return exp_inplace.options(allow_inplace=True).step(
2 * k, n - 1, worker_id)
@workflow.step
def exp_remote(k, n, worker_id=None):
from ray.worker import global_worker
_worker_id = global_worker.worker_id
if worker_id is not None:
# sub-workflows running in another worker
assert _worker_id != worker_id
worker_id = _worker_id
if n == 0:
return k
return exp_remote.step(2 * k, n - 1, worker_id)
def test_inplace_workflows(workflow_start_regular_shared):
assert inplace_test.step().run() == "@010"
k, n = 12, 10
assert exp_inplace.step(k, n).run() == k * 2**n
assert exp_remote.step(k, n).run() == k * 2**n
if __name__ == "__main__":
import sys
sys.exit(pytest.main(["-v", __file__]))

View file

@ -135,11 +135,7 @@ def test_workflow_storage(workflow_start_regular):
wf_storage = workflow_storage.WorkflowStorage(workflow_id,
storage.get_global_storage())
step_id = "some_step"
step_options = WorkflowStepRuntimeOptions(
step_type=StepType.FUNCTION,
catch_exceptions=False,
max_retries=1,
ray_options={})
step_options = WorkflowStepRuntimeOptions.make(step_type=StepType.FUNCTION)
input_metadata = {
"name": "test_basic_workflows.append1",
"workflows": ["def"],
@ -232,11 +228,6 @@ def test_workflow_storage(workflow_start_regular):
wf_storage._key_step_function_body(step_id), some_func))
asyncio_run(wf_storage._put(wf_storage._key_step_args(step_id), args))
inspect_result = wf_storage.inspect_step(step_id)
step_options = WorkflowStepRuntimeOptions(
step_type=StepType.FUNCTION,
catch_exceptions=False,
max_retries=1,
ray_options={})
assert inspect_result == workflow_storage.StepInspectResult(
args_valid=True,
func_body_valid=True,