Advanced Topics
===============

Workflow task Checkpointing
---------------------------

Ray Workflows provides strong fault tolerance and exactly-once execution semantics through checkpointing. However, checkpointing can be time-consuming, especially when workflow tasks have large inputs and outputs. When exactly-once execution semantics are not required, you can skip some checkpoints to speed up your workflow.

Checkpoints are controlled by specifying checkpoint options, like this:

.. code-block:: python

    data = read_data.options(**workflow.options(checkpoint=False)).bind(10)

This example skips checkpointing the output of ``read_data``. During recovery, ``read_data`` is executed again if recovery requires its output.

If not specified, ``checkpoint=True`` is the default.

If the output of a task is another task (i.e., a dynamic workflow), checkpointing is skipped for the entire task.

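The trade-off described above can be illustrated with a toy model written in plain Python. This is only a sketch of the idea, not Ray's implementation: ``ToyWorkflow``, ``run_task``, and ``run_with_one_crash`` are hypothetical names invented for this example, and an in-memory dictionary stands in for durable workflow storage. It shows that a task whose checkpoint is skipped has to run again when a later task fails and the workflow is recovered.

```python
from typing import Any, Callable, Dict, List


class ToyWorkflow:
    """Hypothetical stand-in for a workflow engine (not Ray's API)."""

    def __init__(self) -> None:
        self.storage: Dict[str, Any] = {}  # stands in for durable storage
        self.calls: List[str] = []         # every real task execution

    def run_task(self, name: str, fn: Callable[[], Any],
                 checkpoint: bool = True) -> Any:
        # Reuse a checkpointed output instead of re-executing the task.
        if name in self.storage:
            return self.storage[name]
        self.calls.append(name)
        result = fn()
        if checkpoint:
            self.storage[name] = result
        return result


def run_with_one_crash(checkpoint_read: bool) -> List[str]:
    """Run a two-task pipeline that crashes once, then recover it."""
    wf = ToyWorkflow()
    attempts = {"n": 0}

    def read_data() -> List[int]:
        return list(range(10))

    def flaky_sum(data: List[int]) -> int:
        attempts["n"] += 1
        if attempts["n"] == 1:
            raise RuntimeError("simulated crash")
        return sum(data)

    def pipeline() -> int:
        data = wf.run_task("read_data", read_data, checkpoint=checkpoint_read)
        return wf.run_task("total", lambda: flaky_sum(data))

    try:
        pipeline()           # first attempt fails in the second task
    except RuntimeError:
        pass
    result = pipeline()      # "recovery": rerun against the same storage
    assert result == sum(range(10))
    return wf.calls


with_checkpoint = run_with_one_crash(checkpoint_read=True)
without_checkpoint = run_with_one_crash(checkpoint_read=False)
```

In this model, ``read_data`` appears once in ``with_checkpoint`` and twice in ``without_checkpoint``: skipping the checkpoint saves the cost of storing the output but pays for it with a repeated execution during recovery.
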
Use Workflow with Ray Client
----------------------------

Ray Workflow supports the :ref:`Ray Client API <ray-client>`, so you can submit workflows to a remote
Ray cluster. This requires starting the Ray cluster with the ``--storage=<storage_uri>`` option
to specify the workflow storage.

To submit a workflow to a remote cluster, all you need to do is connect Ray to the cluster before
submitting the workflow. No other code changes are required for Ray Workflow. For example:

.. code-block:: python

    import subprocess
    import ray
    from ray import workflow

    @ray.remote
    def hello(count):
        return ["hello world"] * count

    try:
        # Start a local head node with a Ray client server and workflow storage.
        subprocess.check_call(
            ["ray", "start", "--head", "--ray-client-server-port=10001", "--storage=file:///tmp/ray/workflow_data"])
        # Connect through Ray client, then run the workflow as usual.
        ray.init("ray://127.0.0.1:10001")
        assert workflow.run(hello.bind(3)) == ["hello world"] * 3
    finally:
        # Always shut the cluster down, even if the workflow fails.
        subprocess.check_call(["ray", "stop"])

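If the port or storage location varies between environments, the start-up command and the client address from the example can be built by small helpers. The following is a minimal sketch: ``head_start_command`` and ``client_address`` are hypothetical helper names, not part of Ray, and they use only the flags and address format that appear in the example.

```python
from typing import List


def head_start_command(client_port: int, storage_uri: str) -> List[str]:
    """Build the ``ray start`` argument list for a head node (hypothetical helper)."""
    return [
        "ray", "start", "--head",
        f"--ray-client-server-port={client_port}",
        f"--storage={storage_uri}",
    ]


def client_address(host: str, client_port: int) -> str:
    """Build the address string passed to ``ray.init`` for Ray client."""
    return f"ray://{host}:{client_port}"


cmd = head_start_command(10001, "file:///tmp/ray/workflow_data")
addr = client_address("127.0.0.1", 10001)
```

The resulting ``cmd`` list can be passed to ``subprocess.check_call`` and ``addr`` to ``ray.init``, which keeps the port in a single place.
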
.. warning::

  Ray client support is still experimental and has some limitations. One known limitation is that
  Ray Workflow does not work properly with ObjectRefs as workflow task inputs, for example
  ``workflow.run(task.bind(ray.put(123)))``.