2021-11-12 09:13:40 -08:00
Advanced Topics
===============
2022-05-05 22:22:51 -07:00
Workflow task Checkpointing
2022-03-27 12:18:14 -07:00
---------------------------
2022-05-05 22:22:51 -07:00
Ray Workflows provides strong fault tolerance and exactly-once execution semantics by checkpointing. However, checkpointing could be time consuming, especially when you have large inputs and outputs for workflow tasks. When exactly-once execution semantics is not required, you can skip some checkpoints to speed up your workflow.
2022-03-27 12:18:14 -07:00
We control the checkpoints by specify the checkpoint options like this:
.. code-block :: python
2022-05-16 15:41:14 -07:00
data = read_data.options(**workflow.options(checkpoint=False)).bind(10)
2022-03-27 12:18:14 -07:00
This example skips checkpointing the output of `` read_data `` . During recovery, `` read_data `` would be executed again if recovery requires its output.
By default, we have `` checkpoint=True `` if not specified.
2022-05-05 22:22:51 -07:00
If the output of a task is another task (i.e. dynamic workflows), we skips checkpointing the entire task.
2022-03-27 12:18:14 -07:00
2022-07-26 11:15:47 -07:00
Use Workflow with Ray Client
----------------------------
Ray Workflow supports :ref: `Ray Client API<ray-client>` , so you can submit workflows to a remote
Ray cluster. This requires starting the Ray cluster with the `` --storage=<storage_uri> `` option
for specifying the workflow storage.
To submit a workflow to a remote cluster, All you need is connecting Ray to the cluster before
submitting a workflow. No code changes are required for Ray Workflow afterwards. For example:
.. code-block :: python
import subprocess
import ray
from ray import workflow
@ray.remote
def hello(count):
return ["hello world"] * count
try:
subprocess.check_call(
["ray", "start", "--head", "--ray-client-server-port=10001", "--storage=file:///tmp/ray/workflow_data"])
ray.init("ray://127.0.0.1:10001")
assert workflow.run(hello.bind(3)) == ["hello world"] * 3
finally:
subprocess.check_call(["ray", "stop"])
.. warning ::
Ray client support is still experimental and has some limitations. One known limitation is that
Ray Workflow would not work properly with ObjjectRefs as workflow task inputs. For example,
`` workflow.run(task.bind(ray.put(123))) `` .