mirror of
https://github.com/vale981/ray
synced 2025-03-06 02:21:39 -05:00
Polish workflows doc, add semantics and best practices for sub-workflows (#18525)
This commit is contained in:
parent
ea4a22249c
commit
53a2a47655
3 changed files with 48 additions and 15 deletions
|
@ -56,7 +56,7 @@ Readonly methods are not only lower overhead since they skip action logging, but
|
|||
Launching sub-workflows from actor methods
|
||||
------------------------------------------
|
||||
|
||||
In side virtual actor method, a workflow can be launched. Besides this, a workflow can also be passed to actor method:
|
||||
Inside virtual actor methods, sub-workflow involving other methods of the virtual actor can be launched. These sub-workflows can also include workflow steps defined outside the actor class, for example:
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
|
@ -88,7 +88,46 @@ In side virtual actor method, a workflow can be launched. Besides this, a workfl
|
|||
assert handler.double.run(False) == 2
|
||||
assert handler.double.run(True) == 2
|
||||
assert handler.double.run(True) == 4
|
||||
|
||||
|
||||
Actor method ordering
|
||||
---------------------
|
||||
|
||||
Workflow virtual actors provide similar ordering guarantees as Ray actors: the methods will be executed in the same order as they are submitted, provided they are submitted from the same thread. This applies both to ``.run()`` (trivially true) and ``.run_async()```, and is also guaranteed to hold under cluster failures. Hence, you can use actor methods as a short-lived queue of work to process for the actor.
|
||||
|
||||
When an actor method launches a sub-workflow, that entire sub-workflow will be run as part of the actor method step. This means all steps of the sub-workflow will be guaranteed to complete before any other queued actor method calls are run. However, note that the sub-workflow is not transactional, that is, read-only methods can read intermediate actor state written by steps of the sub-workflow.
|
||||
|
||||
Long-lived sub-workflows
|
||||
------------------------
|
||||
|
||||
We do not recommend running long-lived workflows as sub-workflows of a virtual actor. This is because sub-workflows block future actor methods calls from executing while they are running. Instead, you can launch a *separate* workflow and track its execution using workflow API methods. By generating the workflow id deterministically (ensuring idempotency), no duplicate workflows will be launched even if there is a failure.
|
||||
|
||||
.. code-block:: python
|
||||
:caption: Long-lived sub-workflow (bad).
|
||||
|
||||
@workflow.virtual_actor
|
||||
class ShoppingCart:
|
||||
...
|
||||
# BAD: blocks until shipping completes, which could be
|
||||
# slow. Until that workflow finishes, no mutating methods
|
||||
# can be called on this actor.
|
||||
def do_checkout():
|
||||
# Run shipping workflow as sub-workflow of this method.
|
||||
return ship_items.step(self.items)
|
||||
|
||||
.. code-block:: python
|
||||
:caption: Launching separate workflows (good).
|
||||
|
||||
@workflow.virtual_actor
|
||||
class ShoppingCart:
|
||||
...
|
||||
# GOOD: the checkout method is non-blocking, and the shipment
|
||||
# status can be monitored via ``self.shipment_workflow_id``.
|
||||
def do_checkout():
|
||||
# Deterministically generate a workflow id for idempotency.
|
||||
self.shipment_workflow_id = "ship_{}".format(self.order_id)
|
||||
# Run shipping workflow as a separate async workflow.
|
||||
ship_items.step(self.items).run_async(
|
||||
workflow_id=self.shipment_workflow_id)
|
||||
|
||||
Receiving external events
|
||||
-------------------------
|
||||
|
|
|
@ -64,21 +64,21 @@ As seen in the example above, workflow steps can be composed by passing ``Workfl
|
|||
|
||||
Here we can see though ``get_val1.step()`` returns a ``Workflow[int]``, when passed to the ``add`` step, the ``add`` function will see its resolved value.
|
||||
|
||||
Retrieve results
|
||||
-------------------------
|
||||
Retrieving results
|
||||
------------------
|
||||
|
||||
Workflow result can be retrieved, by ``workflow.get_output(workflow_id)`` and the return value will be ``ObjectRef[T]``. The function will return even the underline workflow is still execution. For example:
|
||||
Workflow results can be retrieved with ``workflow.get_output(workflow_id) -> ObjectRef[T]``. If the workflow has not yet completed, calling ``ray.get()`` on the returned reference will block until the result is computed. For example:
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
assert ray.get(workflow.get_output("add_example")) == 30
|
||||
|
||||
We can retrieve the results for steps too with named steps. A step can be named in two ways,
|
||||
We can retrieve the results for individual workflow steps too with *named steps*. A step can be named in two ways:
|
||||
|
||||
1) via ``.options(name="step_name")``
|
||||
2) via decorator ``@workflow.step(name="step_name"``.
|
||||
2) via decorator ``@workflow.step(name="step_name"``)
|
||||
|
||||
Once a step is given a name, the result of the step will be able to be retrived via ``workflow.get_output(workflow_id, name="step_name")``. The return value will be either an ``ObjectRef[T]`` or throw an exception if the step hasn't been executed. Here are some examples:
|
||||
Once a step is given a name, the result of the step will be retrievable via ``workflow.get_output(workflow_id, name="step_name")``. If the step with the given name hasn't been executed yet, an exception will be thrown. Here are some examples:
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
|
@ -96,7 +96,7 @@ Once a step is given a name, the result of the step will be able to be retrived
|
|||
assert ray.get(outer) == 4
|
||||
assert ray.get(result) == 4
|
||||
|
||||
If there are multiple steps with the same name, the suffix with a counter ``_n`` will be added automatically. For example,
|
||||
If there are multiple steps with the same name, a suffix with a counter ``_n`` will be added automatically. For example,
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
|
|
|
@ -117,12 +117,6 @@ Virtual actors have their state durably logged to workflow storage. This enables
|
|||
self.count += 1
|
||||
return self.count
|
||||
|
||||
def __getstate__(self):
|
||||
return self.count
|
||||
|
||||
def __setstate__(self, state):
|
||||
self.count = state
|
||||
|
||||
workflow.init(storage="/tmp/data")
|
||||
c1 = Counter.get_or_create("counter_1")
|
||||
assert c1.incr.run() == 1
|
||||
|
|
Loading…
Add table
Reference in a new issue