mirror of
https://github.com/vale981/ray
synced 2025-03-05 10:01:43 -05:00
[workflow][doc] First pass of workflow doc. (#27331)
Signed-off-by: Yi Cheng 74173148+iycheng@users.noreply.github.com Why are these changes needed? This PR update workflow doc to reflect the recent change. Focusing on position change and others.
This commit is contained in:
parent
61880591e9
commit
2262ac02f3
13 changed files with 350 additions and 344 deletions
|
@ -219,9 +219,10 @@ parts:
|
|||
- file: ray-more-libs/multiprocessing
|
||||
- file: ray-more-libs/ray-collective
|
||||
- file: ray-core/examples/using-ray-with-pytorch-lightning
|
||||
- file: workflows/concepts
|
||||
title: Ray Workflows
|
||||
- file: workflows/index
|
||||
title: Ray Workflows (Alpha)
|
||||
sections:
|
||||
- file: workflows/key-concepts
|
||||
- file: workflows/basics
|
||||
- file: workflows/management
|
||||
- file: workflows/metadata
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
.. _ray-dag-guide:
|
||||
|
||||
Building Computation Graphs with Ray DAG API
|
||||
Lazy Computation Graphs with the Ray DAG API
|
||||
============================================
|
||||
|
||||
With ``ray.remote`` you have the flexibility of running an application where
|
||||
|
@ -90,4 +90,4 @@ from other Ray libraries built on top of Ray DAG API with the same mechanism.
|
|||
|
||||
| `Visualization of DAGs <https://docs.ray.io/en/master/serve/model_composition.html#visualizing-the-graph>`_
|
||||
| `DAG Cookbook and patterns <https://docs.ray.io/en/master/serve/tutorials/deployment-graph-patterns.html>`_
|
||||
| `Serve Deployment Graph's original REP <https://github.com/ray-project/enhancements/blob/main/reps/2022-03-08-serve_pipeline.md>`_
|
||||
| `Serve Deployment Graph's original REP <https://github.com/ray-project/enhancements/blob/main/reps/2022-03-08-serve_pipeline.md>`_
|
||||
|
|
|
@ -1,13 +1,12 @@
|
|||
Advanced Topics
|
||||
===============
|
||||
|
||||
Workflow task Checkpointing
|
||||
---------------------------
|
||||
Skipping Checkpoints
|
||||
--------------------
|
||||
|
||||
Ray Workflow provides strong fault tolerance and exactly-once execution semantics by checkpointing. However, checkpointing could be time consuming, especially when you have large inputs and outputs for workflow tasks. When exactly-once execution semantics is not required, you can skip some checkpoints to speed up your workflow.
|
||||
Ray Workflows provides strong fault tolerance and exactly-once execution semantics by checkpointing. However, checkpointing could be time consuming, especially when you have large inputs and outputs for workflow tasks. When exactly-once execution semantics is not required, you can skip some checkpoints to speed up your workflow.
|
||||
|
||||
|
||||
We control the checkpoints by specify the checkpoint options like this:
|
||||
Checkpoints can be skipped by specifying ``checkpoint=False``:
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
|
@ -15,19 +14,17 @@ We control the checkpoints by specify the checkpoint options like this:
|
|||
|
||||
This example skips checkpointing the output of ``read_data``. During recovery, ``read_data`` would be executed again if recovery requires its output.
|
||||
|
||||
By default, we have ``checkpoint=True`` if not specified.
|
||||
If the output of a task is another task (i.e., for dynamic workflows), we skip checkpointing the entire task.
|
||||
|
||||
If the output of a task is another task (i.e. dynamic workflows), we skips checkpointing the entire task.
|
||||
Use Workflows with Ray Client
|
||||
-----------------------------
|
||||
|
||||
Use Workflow with Ray Client
|
||||
----------------------------
|
||||
|
||||
Ray Workflow supports :ref:`Ray Client API <ray-client-ref>`, so you can submit workflows to a remote
|
||||
Ray Workflows supports :ref:`Ray Client API <ray-client-ref>`, so you can submit workflows to a remote
|
||||
Ray cluster. This requires starting the Ray cluster with the ``--storage=<storage_uri>`` option
|
||||
for specifying the workflow storage.
|
||||
|
||||
To submit a workflow to a remote cluster, All you need is connecting Ray to the cluster before
|
||||
submitting a workflow. No code changes are required for Ray Workflow afterwards. For example:
|
||||
To submit a workflow to a remote cluster, all you need is connect Ray to the cluster before
|
||||
submitting a workflow. No code changes are required. For example:
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
|
@ -51,5 +48,5 @@ submitting a workflow. No code changes are required for Ray Workflow afterwards.
|
|||
.. warning::
|
||||
|
||||
Ray client support is still experimental and has some limitations. One known limitation is that
|
||||
Ray Workflow would not work properly with ObjjectRefs as workflow task inputs. For example,
|
||||
workflows will not work properly with ObjectRefs as workflow task inputs. For example,
|
||||
``workflow.run(task.bind(ray.put(123)))``.
|
||||
|
|
|
@ -1,18 +1,17 @@
|
|||
Workflow Basics
|
||||
Getting Started
|
||||
===============
|
||||
|
||||
If you’re brand new to Ray, we recommend starting with the :ref:`walkthrough <core-walkthrough>`.
|
||||
.. note::
|
||||
Workflows is a library that provides strong durability for Ray task graphs.
|
||||
If you’re brand new to Ray, we recommend starting with the :ref:`core walkthrough <core-walkthrough>` instead.
|
||||
|
||||
Ray DAG
|
||||
-------
|
||||
Your first workflow
|
||||
-------------------
|
||||
|
||||
Normally, Ray tasks are executed eagerly.
|
||||
Ray DAG provides a way to build the DAG without execution, and Ray Workflow is based on Ray DAGs.
|
||||
|
||||
It is simple to build a Ray DAG: you just replace all ``.remote(...)`` with ``.bind(...)`` in a Ray application.
|
||||
Ray DAGs can be composed in arbitrarily like normal Ray tasks.
|
||||
|
||||
Here is a single three-node DAG:
|
||||
Let's start by defining a simple workflow DAG, which we'll use for the below example.
|
||||
Here is a single three-node DAG (note the use of ``.bind(...)`` instead of
|
||||
``.remote(...)``). The DAG will not be executed until further actions are
|
||||
taken on it:
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
|
@ -32,18 +31,20 @@ Here is a single three-node DAG:
|
|||
def aggregate(data: List[float]) -> float:
|
||||
return sum(data)
|
||||
|
||||
# Build the DAG.
|
||||
# Build the DAG:
|
||||
# data -> preprocessed_data -> aggregate
|
||||
data = read_data.bind(10)
|
||||
preprocessed_data = preprocessing.bind(data)
|
||||
output = aggregate.bind(preprocessed_data)
|
||||
|
||||
|
||||
The Ray DAG will not be executed until further actions are taken on it.
|
||||
We can plot this DAG by using ``ray.dag.vis_utils.plot(output, "output.jpg")``:
|
||||
|
||||
Your first workflow
|
||||
-------------------
|
||||
.. image:: basic.png
|
||||
:width: 500px
|
||||
:align: center
|
||||
|
||||
A single line is all you need to run the workflow DAG:
|
||||
Next, let's execute the DAG we defined and inspect the result:
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
|
@ -53,33 +54,30 @@ A single line is all you need to run the workflow DAG:
|
|||
# Execute the workflow and print the result.
|
||||
print(workflow.run(output))
|
||||
|
||||
# You can also run the workflow asynchronously and fetching the output via 'ray.get'
|
||||
# You can also run the workflow asynchronously and fetch the output via
|
||||
# 'ray.get'
|
||||
output_ref = workflow.run_async(dag)
|
||||
print(ray.get(output_ref))
|
||||
|
||||
Here this figure visualizes the workflow we created:
|
||||
|
||||
.. image:: basic.png
|
||||
:width: 500px
|
||||
:align: center
|
||||
|
||||
Each node in the original DAG becomes a workflow task.
|
||||
Workflow tasks behave similarly to Ray tasks. They are executed in a parallel and distributed way.
|
||||
|
||||
Each node in the original DAG becomes a workflow task. You can think of workflow
|
||||
tasks as wrappers around Ray tasks that insert *checkpointing logic* to
|
||||
ensure intermediate results are durably persisted. This enables workflow DAGs to
|
||||
always resume from the last successful task on failure.
|
||||
|
||||
Setting workflow options
|
||||
------------------------
|
||||
|
||||
You can directly set Ray options to a workflow task just like to a normal
|
||||
Ray remote function. To set workflow-specific options, you can use ``workflow.options``
|
||||
either as a decorator or as a option feeding dictionary:
|
||||
You can directly set Ray options to a workflow task just like a normal
|
||||
Ray remote function. To set workflow-specific options, use ``workflow.options``
|
||||
either as a decorator or as kwargs to ``<task>.options``:
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
import ray
|
||||
from ray import workflow
|
||||
|
||||
@workflow.options(checkpoint=False)
|
||||
@workflow.options(max_retries=5)
|
||||
@ray.remote(num_cpus=2, num_gpus=3)
|
||||
def read_data(num: int):
|
||||
return [i for i in range(num)]
|
||||
|
@ -88,10 +86,10 @@ either as a decorator or as a option feeding dictionary:
|
|||
num_cpus=1, num_gpus=1, **workflow.options(checkpoint=True))
|
||||
|
||||
|
||||
Retrieving results
|
||||
------------------
|
||||
Retrieving Workflow Results
|
||||
---------------------------
|
||||
|
||||
To retrieve a workflow result, you can assign ``workflow_id`` when running a workflow:
|
||||
To retrieve a workflow result, assign ``workflow_id`` when running a workflow:
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
|
@ -99,9 +97,10 @@ To retrieve a workflow result, you can assign ``workflow_id`` when running a wor
|
|||
from ray import workflow
|
||||
|
||||
try:
|
||||
# cleanup previous workflows
|
||||
# Cleanup previous workflows
|
||||
# An exception will be raised if it doesn't exist.
|
||||
workflow.delete("add_example")
|
||||
except Exception:
|
||||
except workflow.WorkflowNotFoundError:
|
||||
pass
|
||||
|
||||
@ray.remote
|
||||
|
@ -116,24 +115,27 @@ To retrieve a workflow result, you can assign ``workflow_id`` when running a wor
|
|||
|
||||
assert workflow.run(ret, workflow_id="add_example") == 30
|
||||
|
||||
Then workflow results can be retrieved with ``workflow.get_output(workflow_id)``.
|
||||
If a workflow is not given ``workflow_id``, a random string is set as the ``workflow_id``. To confirm ``workflow_id`` in the situation, call ``ray.workflow.list_all()``.
|
||||
The workflow results can be retrieved with
|
||||
``workflow.get_output(workflow_id)``. If a workflow is not given a
|
||||
``workflow_id``, a random string is set as the ``workflow_id``. To list all
|
||||
workflow ids, call ``ray.workflow.list_all()``.
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
assert workflow.get_output("add_example") == 30
|
||||
# "workflow.get_output_async" is an asynchronous version
|
||||
|
||||
Sub-Task Results
|
||||
~~~~~~~~~~~~~~~~
|
||||
|
||||
We can retrieve the results for individual workflow tasks too with *named tasks*. A task can be named in two ways:
|
||||
|
||||
1) via ``.options(**workflow.options(name="task_name"))``
|
||||
2) via decorator ``@workflow.options(name="task_name")``
|
||||
|
||||
If tasks are not given ``task_name``, the function name of the steps is set as the ``task_name``.
|
||||
The ID of the task would be same as the name. If there are multiple tasks with the same name, a suffix with a counter ``_n`` will be added automatically.
|
||||
|
||||
The suffix with a counter ``_n`` is a sequential number (1,2,3,...) of the tasks to be executed.
|
||||
(Note that the first task does not have the suffix.)
|
||||
The ID of the task would be the same as the name. If there are multiple tasks with the same name,
|
||||
a suffix with a counter ``_n`` will be added.
|
||||
|
||||
Once a task is given a name, the result of the task will be retrievable via ``workflow.get_output(workflow_id, task_id="task_name")``.
|
||||
If the task with the given ``task_id`` hasn't been executed before the workflow completes, an exception will be thrown. Here are some examples:
|
||||
|
@ -147,7 +149,7 @@ If the task with the given ``task_id`` hasn't been executed before the workflow
|
|||
try:
|
||||
# cleanup previous workflows
|
||||
workflow.delete(workflow_id)
|
||||
except Exception:
|
||||
except workflow.WorkflowNotFoundError:
|
||||
pass
|
||||
|
||||
@ray.remote
|
||||
|
@ -165,50 +167,10 @@ If the task with the given ``task_id`` hasn't been executed before the workflow
|
|||
assert ray.get(outer) == 4
|
||||
assert ray.get(result_ref) == 4
|
||||
|
||||
|
||||
# TODO(suquark): make sure Ray DAG does not depend on Ray Serve and PyArrow.
|
||||
|
||||
For example,
|
||||
(Note that before trying the following, install Ray Serve and PyArrow ``pip install "ray[serve]" pyarrow``.)
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
import ray
|
||||
from ray import workflow
|
||||
|
||||
workflow_id = "_test_task_id_generation"
|
||||
try:
|
||||
# cleanup previous workflows
|
||||
workflow.delete(workflow_id)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
@ray.remote
|
||||
def simple(x):
|
||||
return x + 1
|
||||
|
||||
x = simple.options(**workflow.options(name="step")).bind(-1)
|
||||
n = 20
|
||||
for i in range(1, n):
|
||||
x = simple.options(**workflow.options(name="step")).bind(x)
|
||||
|
||||
ret = workflow.run_async(x, workflow_id=workflow_id)
|
||||
outputs = [workflow.get_output_async(workflow_id, task_id="step")]
|
||||
for i in range(1, n):
|
||||
outputs.append(workflow.get_output_async(workflow_id, task_id=f"step_{i}"))
|
||||
assert ray.get(ret) == n - 1
|
||||
assert ray.get(outputs) == list(range(n))
|
||||
|
||||
ray.workflow.list_all() == [(workflow_id, workflow.WorkflowStatus.SUCCESSFUL)]
|
||||
|
||||
By default, each task will be given a name generated by the library, ``<MODULE_NAME>.<FUNC_NAME>``. In the example above, ``step`` is given as the task name for function ``simple``.
|
||||
|
||||
When the task name duplicates, we append ``_n`` to the name by the order of execution as it task ID. So the initial task get the ID ``step``, the second one get ``step_1``, and this goes on for all later tasks.
|
||||
|
||||
Error handling
|
||||
--------------
|
||||
|
||||
Workflows provides two ways to handle application-level exceptions: (1) automatic retry (as in normal Ray tasks), and (2) the ability to catch and handle exceptions.
|
||||
Workflow provides two ways to handle application-level exceptions: (1) automatic retry (as in normal Ray tasks), and (2) the ability to catch and handle exceptions.
|
||||
|
||||
- If ``max_retries`` is given, the task will be retried for the given number of times if the workflow task failed.
|
||||
- If ``retry_exceptions`` is True, then the workflow task retries both task crashes and application-level errors;
|
||||
|
@ -231,7 +193,7 @@ so they should be used inside the Ray remote decorator. Here is how you could us
|
|||
faulty_function.options(max_retries=3, retry_exceptions=False,
|
||||
**workflow.options(catch_exceptions=False))
|
||||
|
||||
.. note:: By default ``retry_exceptions`` is ``False``, ``max_retries`` is ``3``.
|
||||
.. note:: By default ``retry_exceptions`` is ``False``, and ``max_retries`` is ``3``.
|
||||
|
||||
Here is one example:
|
||||
|
||||
|
@ -270,14 +232,18 @@ Here is one example:
|
|||
Durability guarantees
|
||||
---------------------
|
||||
|
||||
Workflow tasks provide *exactly-once* execution semantics. What this means is that once the result of a workflow task is logged to durable storage, Ray guarantees the task will never be re-executed. A task that receives the output of another workflow task can be assured that its inputs tasks will never be re-executed.
|
||||
Workflow tasks provide *exactly-once* execution semantics. What this means is
|
||||
that **once the result of a workflow task is logged to durable storage, Ray
|
||||
guarantees the task will never be re-executed**. A task that receives the output
|
||||
of another workflow task can be assured that its inputs tasks will never be
|
||||
re-executed.
|
||||
|
||||
Failure model
|
||||
~~~~~~~~~~~~~
|
||||
- If the cluster fails, any workflows running on the cluster enter RESUMABLE state. The workflows can be resumed on another cluster (see the management API section).
|
||||
- If the cluster fails, any workflows running on the cluster enter ``RESUMABLE`` state. The workflows can be resumed on another cluster (see the management API section).
|
||||
- The lifetime of the workflow is not coupled with the driver. If the driver exits, the workflow will continue running in the background of the cluster.
|
||||
|
||||
Note that tasks that have side-effects still need to be idempotent. This is because the task could always fail prior to its result being logged.
|
||||
Note that tasks that have side effects still need to be idempotent. This is because the task could always fail before its result is logged.
|
||||
|
||||
.. code-block:: python
|
||||
:caption: Non-idempotent workflow:
|
||||
|
@ -313,47 +279,44 @@ Note that tasks that have side-effects still need to be idempotent. This is beca
|
|||
Dynamic workflows
|
||||
-----------------
|
||||
|
||||
Additional tasks can be dynamically created and inserted into the workflow DAG during execution.
|
||||
|
||||
This is achieved by returning a continuation of a DAG.
|
||||
|
||||
In our context, a continuation is basically a tail function call returned by a function. For example:
|
||||
Workflow tasks can be dynamically created in the runtime. In theory, Ray DAG is
|
||||
static which means a DAG node can't be returned in a DAG node. For example, the
|
||||
following code is invalid:
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
@ray.remote
|
||||
def bar(): ...
|
||||
|
||||
def foo_1():
|
||||
# Here we say 'foo_1()' returns a continuation.
|
||||
# The continuation is made of 'bar()'
|
||||
return bar()
|
||||
@ray.remote
|
||||
def foo():
|
||||
return bar.bind() # This is invalid since Ray DAG is static
|
||||
|
||||
def foo_2():
|
||||
# This is NOT a continuation because we do not return it.
|
||||
bar()
|
||||
ray.get(foo.bind().execute()) # This will error
|
||||
|
||||
def foo_3():
|
||||
# This is NOT a continuation because it is not a function call.
|
||||
return 42
|
||||
|
||||
Continuations can be used to implement something more complex, for example, recursions:
|
||||
Workflow introduces a utility function called ``workflow.continuation`` which
|
||||
makes Ray DAG node can return a DAG in the runtime:
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
def factorial(n: int) -> int:
|
||||
if n == 1:
|
||||
return 1
|
||||
else:
|
||||
return multiply(n, factorial(n - 1))
|
||||
@ray.remote
|
||||
def bar():
|
||||
return 10
|
||||
|
||||
def multiply(a: int, b: int) -> int:
|
||||
return a * b
|
||||
@ray.remote
|
||||
def foo():
|
||||
# This will return a DAG to be executed
|
||||
# after this function is finished.
|
||||
return workflow.continuation(bar.bind())
|
||||
|
||||
assert factorial(10) == 3628800
|
||||
assert ray.get(foo.bind().execute()) == 10
|
||||
assert workflow.run(foo.bind()) == 10
|
||||
|
||||
The continuation feature enables nesting, looping, and recursion within workflows.
|
||||
|
||||
The following example shows how to implement the recursive ``factorial`` program using dynamically generated tasks:
|
||||
The dynamic workflow enables nesting, looping, and recursion within workflows.
|
||||
|
||||
The following example shows how to implement the recursive ``factorial`` program
|
||||
using dynamically workflow:
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
|
@ -362,6 +325,8 @@ The following example shows how to implement the recursive ``factorial`` program
|
|||
if n == 1:
|
||||
return 1
|
||||
else:
|
||||
# Here a DAG is passed to the continuation.
|
||||
# The DAG will continue to be executed after this task.
|
||||
return workflow.continuation(multiply.bind(n, factorial.bind(n - 1)))
|
||||
|
||||
@ray.remote
|
||||
|
@ -369,9 +334,14 @@ The following example shows how to implement the recursive ``factorial`` program
|
|||
return a * b
|
||||
|
||||
assert workflow.run(factorial.bind(10)) == 3628800
|
||||
# You can also execute the code with Ray DAG engine.
|
||||
assert ray.get(factorial.bind(10).execute()) == 3628800
|
||||
|
||||
|
||||
The key behavior to note is that when a task returns a DAG wrapped by
|
||||
``workflow.continuation`` instead of a concrete value, that wrapped DAG will be
|
||||
substituted for the task's return.
|
||||
|
||||
The key behavior to note is that when a task returns a continuation instead of a concrete value,
|
||||
that continuation will be substituted for the task's return.
|
||||
To better understand dynamic workflows, let's look at a more realistic example of booking a trip:
|
||||
|
||||
.. code-block:: python
|
||||
|
@ -399,7 +369,11 @@ To better understand dynamic workflows, let's look at a more realistic example o
|
|||
|
||||
receipt: Receipt = workflow.run(book_trip.bind("OAK", "SAN", ["6/12", "7/5"]))
|
||||
|
||||
Here the workflow initially just consists of the ``book_trip`` task. Once executed, ``book_trip`` generates tasks to book flights and hotels in parallel, which feeds into a task to decide whether to cancel the trip or finalize it. The DAG can be visualized as follows (note the dynamically generated nested workflows within ``book_trip``):
|
||||
Here the workflow initially just consists of the ``book_trip`` task. Once
|
||||
executed, ``book_trip`` generates tasks to book flights and hotels in parallel,
|
||||
which feeds into a task to decide whether to cancel the trip or finalize it. The
|
||||
DAG can be visualized as follows (note the dynamically generated nested
|
||||
workflows within ``book_trip``):
|
||||
|
||||
.. image:: trip.png
|
||||
:width: 500px
|
||||
|
@ -423,15 +397,17 @@ Workflows are compatible with Ray tasks and actors. There are two methods of usi
|
|||
|
||||
Passing nested arguments
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
Like Ray tasks, when you pass a list of task outputs to a task, the values are not resolved.
|
||||
But we ensure that all ancestors of a task are fully executed prior to the task starting:
|
||||
Like Ray tasks, when you pass a list of task outputs to a task, the values are
|
||||
not resolved. But we ensure that all ancestors of a task are fully executed
|
||||
before the task starts which is different from passing them into a Ray remote
|
||||
function whether they have been executed or not is not defined.
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
@ray.remote
|
||||
def add(values: List[ray.ObjectRef[int]]) -> int:
|
||||
# although those value are not resolved, they have been
|
||||
# fully executed and checkpointed. This guarantees exactly-once
|
||||
# although those values are not resolved, they have been
|
||||
# *fully executed and checkpointed*. This guarantees exactly-once
|
||||
# execution semantics.
|
||||
return sum(ray.get(values))
|
||||
|
||||
|
@ -445,7 +421,11 @@ But we ensure that all ancestors of a task are fully executed prior to the task
|
|||
Passing object references between tasks
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
Ray object references and data structures composed of them (e.g., ``ray.Dataset``) can be passed into and returned from workflow tasks. To ensure recoverability, their contents will be logged to durable storage. However, an object will not be checkpointed more than once, even if it is passed to many different tasks.
|
||||
Ray object references and data structures composed of them (e.g.,
|
||||
``ray.Dataset``) can be passed into and returned from workflow tasks. To ensure
|
||||
recoverability, their contents will be logged to durable storage before
|
||||
executing. However, an object will not be checkpointed more than once, even if
|
||||
it is passed to many different tasks.
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
|
@ -471,6 +451,6 @@ You can assign resources (e.g., CPUs, GPUs to tasks via the same ``num_cpus``, `
|
|||
|
||||
@ray.remote(num_gpus=1)
|
||||
def train_model() -> Model:
|
||||
pass # This task is assigned a GPU by Ray.
|
||||
pass # This task is assigned to a GPU by Ray.
|
||||
|
||||
workflow.run(train_model.bind())
|
||||
|
|
|
@ -3,7 +3,7 @@ API Comparisons
|
|||
|
||||
Comparison between Ray Core APIs and Workflows
|
||||
----------------------------------------------
|
||||
Ray Workflow is built on top of Ray, and offers a mostly consistent subset of its API while providing durability. This section highlights some of the differences:
|
||||
Ray Workflows is built on top of Ray, and offers a mostly consistent subset of its API while providing durability. This section highlights some of the differences:
|
||||
|
||||
``func.remote`` vs ``func.bind``
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
|
|
|
@ -1,160 +0,0 @@
|
|||
.. _workflows:
|
||||
|
||||
Ray Workflow: Fast, Durable Application Flows
|
||||
=============================================
|
||||
|
||||
.. warning::
|
||||
|
||||
Ray Workflow is available as **alpha** in Ray 1.7+. Expect rough corners and for its APIs and storage format to change. Please file feature requests and bug reports on GitHub Issues or join the discussion on the `Ray Slack <https://forms.gle/9TSdDYUgxYs8SA9e8>`__.
|
||||
|
||||
Ray Workflow provides high-performance, *durable* application workflows using Ray tasks as the underlying execution engine. It is intended to support both large-scale workflows (e.g., ML and data pipelines) and long-running business workflows (when used together with Ray Serve).
|
||||
|
||||
.. image:: workflows.svg
|
||||
|
||||
..
|
||||
https://docs.google.com/drawings/d/113uAs-i4YjGBNxonQBC89ns5VqL3WeQHkUOWPSpeiXk/edit
|
||||
|
||||
Why Ray Workflow?
|
||||
-----------------
|
||||
|
||||
**Flexibility:** Combine the flexibility of Ray's dynamic task graphs with strong durability guarantees. Branch or loop conditionally based on runtime data. Use Ray distributed libraries seamlessly within workflow tasks.
|
||||
|
||||
**Performance:** Ray Workflow offers sub-second overheads for task launch and supports workflows with hundreds of thousands of tasks. Take advantage of the Ray object store to pass distributed datasets between tasks with zero-copy overhead.
|
||||
|
||||
**Dependency management:** Ray Workflow leverages Ray's runtime environment feature to snapshot the code dependencies of a workflow. This enables management of workflows as code is upgraded over time.
|
||||
|
||||
You might find that Ray Workflow is *lower level* compared to engines such as `AirFlow <https://www.astronomer.io/blog/airflow-ray-data-science-story>`__ (which can also run on Ray). This is because Ray Workflow focuses more on core workflow primitives as opposed to tools and integrations.
|
||||
|
||||
Concepts
|
||||
--------
|
||||
Ray Workflow provides the durable *task* primitives, which are analogous to Ray's non-durable tasks.
|
||||
|
||||
Ray DAG
|
||||
~~~~~~~
|
||||
|
||||
If you’re brand new to Ray, we recommend starting with the :ref:`walkthrough <core-walkthrough>`.
|
||||
|
||||
Normally, Ray tasks are executed eagerly.
|
||||
Ray DAG provides a way to build the DAG without execution, and Ray Workflow is based on Ray DAGs.
|
||||
|
||||
It is simple to build a Ray DAG: you just replace all ``.remote(...)`` with ``.bind(...)`` in a Ray application.
|
||||
Ray DAGs can be composed in arbitrarily like normal Ray tasks.
|
||||
|
||||
Unlike Ray tasks, you are not allowed to call ``ray.get()`` or ``ray.wait()`` on DAGs.
|
||||
|
||||
.. code-block:: python
|
||||
:caption: Composing functions together into a DAG:
|
||||
|
||||
import ray
|
||||
|
||||
@ray.remote
|
||||
def one() -> int:
|
||||
return 1
|
||||
|
||||
@ray.remote
|
||||
def add(a: int, b: int) -> int:
|
||||
return a + b
|
||||
|
||||
dag = add.bind(100, one.bind())
|
||||
|
||||
|
||||
Workflows
|
||||
~~~~~~~~~
|
||||
|
||||
It takes a single line of code to run a workflow DAG:
|
||||
|
||||
.. code-block:: python
|
||||
:caption: Run a workflow DAG:
|
||||
|
||||
from ray import workflow
|
||||
|
||||
# Run the workflow until it completes and returns the output
|
||||
assert workflow.run(dag) == 101
|
||||
|
||||
# Or you can run it asynchronously and fetching the output via 'ray.get'
|
||||
output_ref = workflow.run_async(dag)
|
||||
assert ray.get(output_ref) == 101
|
||||
|
||||
|
||||
Once started, a workflow's execution is durably logged to storage. On system failure, workflows can be resumed on any Ray cluster with access to the storage.
|
||||
|
||||
When executing the workflow DAG, workflow tasks are retried on failure, but once they finish successfully and the results are persisted by the workflow engine, they will never be run again.
|
||||
|
||||
.. code-block:: python
|
||||
:caption: Run the workflow:
|
||||
|
||||
# configure the storage with "ray.init". A default temporary storage is used by
|
||||
# by the workflow if starting without Ray init.
|
||||
ray.init(storage="/tmp/data")
|
||||
assert output.run(workflow_id="run_1") == 101
|
||||
assert workflow.get_status("run_1") == workflow.WorkflowStatus.SUCCESSFUL
|
||||
assert workflow.get_output("run_1") == 101
|
||||
|
||||
Objects
|
||||
~~~~~~~
|
||||
Large data objects can be stored in the Ray object store. References to these objects can be passed into and returned from tasks. Objects are checkpointed when initially returned from a task. After checkpointing, the object can be shared among any number of workflow tasks at memory-speed via the Ray object store.
|
||||
|
||||
.. code-block:: python
|
||||
:caption: Using Ray objects in a workflow:
|
||||
|
||||
import ray
|
||||
from typing import List
|
||||
|
||||
@ray.remote
|
||||
def hello():
|
||||
return "hello"
|
||||
|
||||
@ray.remote
|
||||
def words() -> List[ray.ObjectRef]:
|
||||
# NOTE: Here it is ".remote()" instead of ".bind()", so
|
||||
# it creates an ObjectRef instead of a DAG.
|
||||
return [hello.remote(), ray.put("world")]
|
||||
|
||||
@ray.remote
|
||||
def concat(words: List[ray.ObjectRef]) -> str:
|
||||
return " ".join([ray.get(w) for w in words])
|
||||
|
||||
assert workflow.run(concat.bind(words.bind())) == "hello world"
|
||||
|
||||
Dynamic Workflows
|
||||
~~~~~~~~~~~~~~~~~
|
||||
Workflows can generate new tasks at runtime. This is achieved by returning a continuation of a DAG.
|
||||
A continuation is something returned by a function and executed after it returns.
|
||||
The continuation feature enables nesting, looping, and recursion within workflows.
|
||||
|
||||
.. code-block:: python
|
||||
:caption: The Fibonacci recursive workflow:
|
||||
|
||||
@ray.remote
|
||||
def add(a: int, b: int) -> int:
|
||||
return a + b
|
||||
|
||||
@ray.remote
|
||||
def fib(n: int) -> int:
|
||||
if n <= 1:
|
||||
return n
|
||||
# return a continuation of a DAG
|
||||
return workflow.continuation(add.bind(fib.bind(n - 1), fib.bind(n - 2)))
|
||||
|
||||
assert workflow.run(fib.bind(10)) == 55
|
||||
|
||||
|
||||
Events
|
||||
~~~~~~
|
||||
Workflows can be efficiently triggered by timers or external events using the event system.
|
||||
|
||||
.. code-block:: python
|
||||
:caption: Using events.
|
||||
|
||||
# Sleep is a special type of event.
|
||||
sleep_task = workflow.sleep(100)
|
||||
|
||||
# `wait_for_events` allows for pluggable event listeners.
|
||||
event_task = workflow.wait_for_event(MyEventListener)
|
||||
|
||||
@ray.remote
|
||||
def gather(*args):
|
||||
return args
|
||||
|
||||
# If a task's arguments include events, the task won't be executed until all of the events have occured.
|
||||
workflow.run(gather.bind(sleep_task, event_task, "hello world"))
|
|
@ -1,11 +1,7 @@
|
|||
|
||||
Events
|
||||
======
|
||||
|
||||
Introduction
|
||||
------------
|
||||
|
||||
In order to allow an event to trigger a workflow, Ray Workflow supports pluggable event systems. Using the event framework provides a few properties.
|
||||
To allow an event to trigger a workflow, Ray Workflows supports pluggable event systems. Using the event framework provides a few properties.
|
||||
|
||||
1. Waits for events efficiently (without requiring a running workflow task while waiting).
|
||||
2. Supports exactly-once event delivery semantics while providing fault tolerance.
|
||||
|
@ -16,7 +12,7 @@ Like other workflow tasks, events support fault tolerance via checkpointing. Whe
|
|||
Using events
|
||||
------------
|
||||
|
||||
Workflow events are a special type of workflow task. They "finish" when the event occurs. ``workflow.wait_for_event(EventListenerType)`` can be used to create an event task.
|
||||
Workflow events are a special type of workflow task. They "finish" when the event occurs. `workflow.wait_for_event(EventListenerType)` can be used to create an event task.
|
||||
|
||||
|
||||
.. code-block:: python
|
||||
|
@ -35,7 +31,7 @@ Workflow events are a special type of workflow task. They "finish" when the even
|
|||
def gather(*args):
|
||||
return args
|
||||
|
||||
# Gather will run after 60 seconds, when both event1 and event2 are done.
|
||||
# Gather will run after 60 seconds when both event1 and event2 are done.
|
||||
workflow.run(gather.bind(event1_task, event2_task))
|
||||
|
||||
|
||||
|
@ -89,7 +85,7 @@ Custom event listeners can be written by subclassing the EventListener interface
|
|||
|
||||
class EventListener:
|
||||
def __init__(self):
|
||||
"""Optional constructor. Only the constructor with now arguments will be
|
||||
"""Optional constructor. Only the constructor with no arguments will be
|
||||
called."""
|
||||
pass
|
||||
|
||||
|
@ -111,11 +107,11 @@ The `listener.poll_for_events()` coroutine should finish when the event is done.
|
|||
await asyncio.sleep(timestamp - time.time())
|
||||
|
||||
|
||||
The `event_checkpointed` routine can be overridden to support systems with exactly once delivery semantics which typically follow a pattern of:
|
||||
The `event_checkpointed` routine can be overridden to support systems with exactly-once delivery semantics which typically follows a pattern of:
|
||||
|
||||
1. Wait for event.
|
||||
2. Process event.
|
||||
3. Commit event.
|
||||
1. Wait for an event.
|
||||
2. Process the event.
|
||||
3. Commit the event.
|
||||
|
||||
After the workflow finishes checkpointing the event, the event listener will be invoked and can free the event. For example, to guarantee that events are consumed from a `kafkaesque<https://docs.confluent.io/clients-confluent-kafka-python/current/overview.html#synchronous-commits>` queue:
|
||||
|
||||
|
|
32
doc/source/workflows/index.rst
Normal file
32
doc/source/workflows/index.rst
Normal file
|
@ -0,0 +1,32 @@
|
|||
.. _workflows:
|
||||
|
||||
Ray Workflows: Durable Ray Task Graphs
|
||||
======================================
|
||||
|
||||
.. warning::
|
||||
|
||||
Ray Workflows is available as **alpha** in Ray 2.0+. Expect rough corners and
|
||||
for its APIs and storage format to change. Please file feature requests and
|
||||
bug reports on GitHub Issues or join the discussion on the
|
||||
`Ray Slack <https://forms.gle/9TSdDYUgxYs8SA9e8>`__.
|
||||
|
||||
Ray Workflows implements high-performance, *durable* application workflows using
|
||||
Ray tasks as the underlying execution engine. It enables task-based Ray jobs
|
||||
to seamlessly resume execution even in the case of entire-cluster failure.
|
||||
|
||||
Why Ray Workflows?
|
||||
------------------
|
||||
|
||||
**Flexibility:** Combine the flexibility of Ray's dynamic task graphs with
|
||||
strong durability guarantees. Branch or loop conditionally based on runtime
|
||||
data. Use Ray distributed libraries seamlessly within workflow tasks.
|
||||
|
||||
**Performance:** Ray Workflows offers sub-second overheads for task launch and
|
||||
supports workflows with hundreds of thousands of tasks. Take advantage of the
|
||||
Ray object store to pass distributed datasets between tasks with zero-copy
|
||||
overhead.
|
||||
|
||||
You might find that Ray Workflows is *lower level* compared to engines such as
|
||||
`AirFlow <https://www.astronomer.io/blog/airflow-ray-data-science-story>`__
|
||||
(which can also run on Ray). This is because Ray Workflows focuses more on core
|
||||
durability primitives as opposed to tools and integrations.
|
151
doc/source/workflows/key-concepts.rst
Normal file
151
doc/source/workflows/key-concepts.rst
Normal file
|
@ -0,0 +1,151 @@
|
|||
Key Concepts
|
||||
------------
|
||||
|
||||
.. note::
|
||||
Workflows is a library that provides strong durability for Ray task graphs.
|
||||
If you’re brand new to Ray, we recommend starting with the :ref:`core walkthrough <core-walkthrough>` instead.
|
||||
|
||||
DAG API
|
||||
~~~~~~~
|
||||
|
||||
Normally, Ray tasks are executed eagerly.
|
||||
In order to provide durability, Ray Workflows uses the lazy :ref:`Ray DAG API <ray-dag-guide>`
|
||||
to separate the definition and execution of task DAGs.
|
||||
|
||||
Switching from Ray tasks to the DAG API is simple: just replace all calls to ``.remote(...)``
|
||||
(which return object references), to calls to ``.bind(...)`` (which return DAG nodes).
|
||||
Ray DAG nodes can otherwise be composed like normal Ray tasks.
|
||||
|
||||
However, unlike Ray tasks, you are not allowed to call ``ray.get()`` or ``ray.wait()`` on
|
||||
DAG nodes. Instead, the DAG needs to be *executed* in order to compute a result.
|
||||
|
||||
.. code-block:: python
|
||||
:caption: Composing functions together into a DAG:
|
||||
|
||||
import ray
|
||||
|
||||
@ray.remote
|
||||
def one() -> int:
|
||||
return 1
|
||||
|
||||
@ray.remote
|
||||
def add(a: int, b: int) -> int:
|
||||
return a + b
|
||||
|
||||
dag = add.bind(100, one.bind())
|
||||
|
||||
|
||||
Workflow Execution
|
||||
~~~~~~~~~~~~~~~~~~
|
||||
|
||||
To execute a DAG with workflows, use `workflow.run`:
|
||||
|
||||
.. code-block:: python
|
||||
:caption: Executing a DAG with Ray Workflows.
|
||||
|
||||
from ray import workflow
|
||||
|
||||
# Run the workflow until it completes and returns the output
|
||||
assert workflow.run(dag) == 101
|
||||
|
||||
# Or you can run it asynchronously and fetch the output via 'ray.get'
|
||||
output_ref = workflow.run_async(dag)
|
||||
assert ray.get(output_ref) == 101
|
||||
|
||||
|
||||
Once started, a workflow's execution is durably logged to storage. On system
|
||||
failure, the workflow can be resumed on any Ray cluster with access to the
|
||||
storage.
|
||||
|
||||
When executing the workflow DAG, workflow tasks are retried on failure, but once
|
||||
they finish successfully and the results are persisted by the workflow engine,
|
||||
they will never be run again.
|
||||
|
||||
.. code-block:: python
|
||||
:caption: Getting the result of a workflow.
|
||||
|
||||
# configure the storage with "ray.init" or "ray start --head --storage=<STORAGE_URI>"
|
||||
# A default temporary storage is used by by the workflow if starting without
|
||||
# Ray init.
|
||||
ray.init(storage="/tmp/data")
|
||||
assert output.run(workflow_id="run_1") == 101
|
||||
assert workflow.get_status("run_1") == workflow.WorkflowStatus.SUCCESSFUL
|
||||
assert workflow.get_output("run_1") == 101
|
||||
# workflow.get_output_async returns an ObjectRef.
|
||||
assert ray.get(workflow.get_output_async("run_1")) == 101
|
||||
|
||||
Objects
|
||||
~~~~~~~
|
||||
Workflows integrates seamlessly with Ray objects, by allowing Ray object
|
||||
references to be passed into and returned from tasks. Objects are checkpointed
|
||||
when initially returned from a task. After checkpointing, the object can be
|
||||
shared among any number of workflow tasks at memory-speed via the Ray object
|
||||
store.
|
||||
|
||||
.. code-block:: python
|
||||
:caption: Using Ray objects in a workflow:
|
||||
|
||||
import ray
|
||||
from typing import List
|
||||
|
||||
@ray.remote
|
||||
def hello():
|
||||
return "hello"
|
||||
|
||||
@ray.remote
|
||||
def words() -> List[ray.ObjectRef]:
|
||||
# NOTE: Here it is ".remote()" instead of ".bind()", so
|
||||
# it creates an ObjectRef instead of a DAG.
|
||||
return [hello.remote(), ray.put("world")]
|
||||
|
||||
@ray.remote
|
||||
def concat(words: List[ray.ObjectRef]) -> str:
|
||||
return " ".join([ray.get(w) for w in words])
|
||||
|
||||
assert workflow.run(concat.bind(words.bind())) == "hello world"
|
||||
|
||||
Dynamic Workflows
|
||||
~~~~~~~~~~~~~~~~~
|
||||
Workflows can generate new tasks at runtime. This is achieved by returning a
|
||||
continuation of a DAG. A continuation is something returned by a function and
|
||||
executed after it returns. The continuation feature enables nesting, looping,
|
||||
and recursion within workflows:
|
||||
|
||||
.. code-block:: python
|
||||
:caption: The Fibonacci recursive workflow:
|
||||
|
||||
@ray.remote
|
||||
def add(a: int, b: int) -> int:
|
||||
return a + b
|
||||
|
||||
@ray.remote
|
||||
def fib(n: int) -> int:
|
||||
if n <= 1:
|
||||
return n
|
||||
# return a continuation of a DAG
|
||||
return workflow.continuation(add.bind(fib.bind(n - 1), fib.bind(n - 2)))
|
||||
|
||||
assert workflow.run(fib.bind(10)) == 55
|
||||
|
||||
|
||||
Events
|
||||
~~~~~~
|
||||
Events are external signals sent to the workflow. Workflows can be efficiently
|
||||
triggered by timers or external events using the event system.
|
||||
|
||||
.. code-block:: python
|
||||
:caption: Using events.
|
||||
|
||||
# Sleep is a special type of event.
|
||||
sleep_task = workflow.sleep(100)
|
||||
|
||||
# `wait_for_events` allows for pluggable event listeners.
|
||||
event_task = workflow.wait_for_event(MyEventListener)
|
||||
|
||||
@ray.remote
|
||||
def gather(*args):
|
||||
return args
|
||||
|
||||
# If a task's arguments include events, the task won't be executed until all
|
||||
# of the events have occurred.
|
||||
workflow.run(gather.bind(sleep_task, event_task, "hello world"))
|
|
@ -3,19 +3,21 @@ Workflow Management
|
|||
|
||||
Workflow IDs
|
||||
------------
|
||||
Each workflow has a unique ``workflow_id``. By default, when you call ``.run()`` or ``.run_async()``, a random id is generated. It is recommended you explicitly assign each workflow an id via ``.run(workflow_id="id")``.
|
||||
Each workflow has a unique ``workflow_id``. By default, when you call ``.run()``
|
||||
or ``.run_async()``, a random id is generated. It is recommended that you
|
||||
explicitly assign each workflow an id via ``.run(workflow_id="id")``.
|
||||
|
||||
If ``.run()`` is called with a previously used workflow id, the workflow will be resumed from the previous execution.
|
||||
If ``.run()`` is called with a previously created workflow id, the workflow will be resumed from the previous execution.
|
||||
|
||||
Workflow Status
|
||||
---------------
|
||||
A workflow can be in one of several status:
|
||||
A workflow can be in one of several statuses:
|
||||
|
||||
=================== =======================================================================================
|
||||
Status Description
|
||||
=================== =======================================================================================
|
||||
RUNNING The workflow is currently running in the cluster.
|
||||
PENDING The workflow is queued and waited to be executed.
|
||||
PENDING The workflow is queued and waiting to be executed.
|
||||
FAILED This workflow failed with an application error. It can be resumed from the failed task.
|
||||
RESUMABLE This workflow failed with a system error. It can be resumed from the failed task.
|
||||
CANCELED The workflow was canceled. Its result is unavailable, and it cannot be resumed.
|
||||
|
@ -35,7 +37,7 @@ Single workflow management APIs
|
|||
assert status in {
|
||||
"RUNNING", "RESUMABLE", "FAILED",
|
||||
"CANCELED", "SUCCESSFUL"}
|
||||
except ValueError:
|
||||
except workflow.WorkflowNotFoundError:
|
||||
print("Workflow doesn't exist.")
|
||||
|
||||
# Resume a workflow.
|
||||
|
@ -76,39 +78,44 @@ Bulk workflow management APIs
|
|||
Recurring workflows
|
||||
-------------------
|
||||
|
||||
Ray Workflow currently has no built-in job scheduler. You can however easily use any external job scheduler to interact with your Ray cluster (via :ref:`job submission <jobs-overview>` or :ref:`client connection <ray-client-ref>`) trigger workflow runs.
|
||||
Ray Workflows currently has no built-in job scheduler. You can however easily use
|
||||
any external job scheduler to interact with your Ray cluster
|
||||
(via :ref:`job submission <jobs-overview>` or :ref:`client connection
|
||||
<ray-client-ref>`)
|
||||
to trigger workflow runs.
|
||||
|
||||
Storage Configuration
|
||||
---------------------
|
||||
Ray Workflow supports two types of storage backends out of the box:
|
||||
Ray Workflows supports two types of storage backends out of the box:
|
||||
|
||||
* Local file system: the data is stored locally. This is only for single node testing. It needs to be a NFS to work with multi-node clusters. To use local storage, specify ``ray.init(storage="/path/to/storage_dir")``.
|
||||
* S3: Production users should use S3 as the storage backend. Enable S3 storage with ``ray.init(storage="s3://bucket/path")``.
|
||||
* Local file system: the data is stored locally. This is only for single node
|
||||
testing. It needs to be an NFS to work with multi-node clusters. To use local
|
||||
storage, specify ``ray.init(storage="/path/to/storage_dir")`` or
|
||||
``ray start --head --storage="/path/to/storage_dir"``.
|
||||
* S3: Production users should use S3 as the storage backend. Enable S3 storage
|
||||
with ``ray.init(storage="s3://bucket/path")`` or ``ray start --head --storage="s3://bucket/path"```
|
||||
|
||||
Additional storage backends can be written by subclassing the ``Storage`` class and passing a storage instance to ``ray.init()`` [TODO: note that the Storage API is not currently stable].
|
||||
Additional storage backends can be written by subclassing the ``Storage`` class and passing a storage instance to ``ray.init()``.
|
||||
|
||||
If left unspecified, ``/tmp/ray/workflow_data`` will be used for temporary storage. This default setting *will only work for single-node Ray clusters*.
|
||||
|
||||
Concurrency Control
|
||||
-------------------
|
||||
Ray Workflow supports concurrency control. You can support the maximum running workflows and maximum pending workflows via ``workflow.init()``
|
||||
before executing any workflow. ``workflow.init()`` again with a different configuration would raise an error.
|
||||
Ray Workflows supports concurrency control. You can support the maximum running
|
||||
workflows and maximum pending workflows via ``workflow.init()`` before executing
|
||||
any workflow. ``workflow.init()`` again with a different configuration would
|
||||
raise an error except ``None`` is given.
|
||||
|
||||
For example, ``workflow.init(max_running_workflows=10, max_pending_workflows=50)`` means there will be at most 10 workflows running, 50 workflows pending.
|
||||
Submitting workflows when the number of pending workflows are at maximum would raise ``queue.Full("Workflow queue has been full")``. Getting the output of a pending workflow would be blocking until the workflow finishes running later.
|
||||
For example, ``workflow.init(max_running_workflows=10, max_pending_workflows=50)``
|
||||
means there will be at most 10 workflows running, and 50 workflows pending. And
|
||||
calling with different values on another driver will raise an exception. If
|
||||
they are set to be ``None``, it'll use the previous value set.
|
||||
|
||||
A pending workflows has the ``PENDING`` status. After the pending workflow get interrupted (e.g., a cluster failure), it can be resumed.
|
||||
Submitting workflows when the number of pending workflows is at maximum would raise ``queue.Full("Workflow queue has been full")``. Getting the output of a pending workflow would be blocked until the workflow finishes running later.
|
||||
|
||||
A pending workflow has the ``PENDING`` status. After the pending workflow gets interrupted (e.g., a cluster failure), it can be resumed.
|
||||
When resuming interrupted workflows that were running and pending with ``workflow.resume_all()``, running workflows have higher priority than pending workflows (i.e. the pending workflows would still likely be pending).
|
||||
|
||||
.. note::
|
||||
|
||||
We currently does not guarantee that resumed pending workflows are running in the same order as they originally did.
|
||||
|
||||
Handling Dependencies
|
||||
---------------------
|
||||
|
||||
**Note: This feature is not yet implemented.**
|
||||
|
||||
Ray logs the runtime environment (code and dependencies) of the workflow to storage at submission time. This ensures that the workflow can be resumed at a future time on a different Ray cluster.
|
||||
|
||||
You can also explicitly set the runtime environment for a particular task (e.g., specify conda environment, container image, etc.).
|
||||
Workflows does not guarantee that resumed workflows are run in the same order .
|
||||
|
|
|
@ -32,11 +32,13 @@ providing the task name:
|
|||
|
||||
.. code-block:: python
|
||||
|
||||
workflow.run(add.options(**workflow.options(name="add_task")).bind(10, 20), workflow_id="add_example_2")
|
||||
workflow.run(
|
||||
add.options(
|
||||
**workflow.options(name="add_task")
|
||||
).bind(10, 20), workflow_id="add_example_2")
|
||||
|
||||
task_metadata = workflow.get_metadata("add_example_2", name="add_task")
|
||||
|
||||
assert "task_options" in task_metadata
|
||||
assert "start_time" in workflow_metadata["stats"]
|
||||
assert "end_time" in workflow_metadata["stats"]
|
||||
|
||||
|
@ -51,7 +53,7 @@ workflow or workflow task.
|
|||
|
||||
.. code-block:: python
|
||||
|
||||
workflow.run(add.options(**workflow.options(name="add_task", metadata={"task_k": "task_v"})).bind(10, 20)
|
||||
workflow.run(add.options(**workflow.options(name="add_task", metadata={"task_k": "task_v"})).bind(10, 20),
|
||||
workflow_id="add_example_3", metadata={"workflow_k": "workflow_v"})
|
||||
|
||||
assert workflow.get_metadata("add_example_3")["user_metadata"] == {"workflow_k": "workflow_v"}
|
||||
|
@ -107,6 +109,7 @@ is completed).
|
|||
|
||||
workflow_metadata = workflow.get_metadata(workflow_id)
|
||||
assert workflow_metadata["status"] == "CANCELED"
|
||||
assert "task_options" in workflow_metadata
|
||||
assert "start_time" in workflow_metadata["stats"]
|
||||
assert "end_time" not in workflow_metadata["stats"]
|
||||
|
||||
|
@ -140,6 +143,6 @@ be updated whenever a workflow is resumed.
|
|||
assert workflow_metadata_resumed["status"] == "SUCCESSFUL"
|
||||
|
||||
# make sure resume updated running metrics
|
||||
assert workflow_metadata_resumed["stats"]["start_time"] > workflow_metadata_failed["stats"]["start_time"]
|
||||
assert workflow_metadata_resumed["stats"]["start_time"] > workflow_metadata_failed["stats"]["start_time"]
|
||||
assert workflow_metadata_resumed["stats"]["end_time"] > workflow_metadata_failed["stats"]["end_time"]
|
||||
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
Ray Workflow API
|
||||
Ray Workflows API
|
||||
=================
|
||||
|
||||
Workflow Execution API
|
||||
|
|
File diff suppressed because one or more lines are too long
Before Width: | Height: | Size: 248 KiB |
Loading…
Add table
Reference in a new issue