Workflow Metadata
=================

Observability is important for workflows: sometimes we want not only
the output, but also insight into the internal state (e.g., to measure
performance or find bottlenecks). Workflow metadata provides several
stats that help you understand a workflow, from basic running status
and task options to performance and user-defined metadata.

Retrieving metadata
-------------------
Workflow metadata can be retrieved with ``workflow.get_metadata(workflow_id)``.
For example:

.. code-block:: python

    import ray
    from ray import workflow

    @ray.remote
    def add(left: int, right: int) -> int:
        return left + right

    workflow.run(add.bind(10, 20), workflow_id="add_example")

    workflow_metadata = workflow.get_metadata("add_example")

    assert workflow_metadata["status"] == "SUCCESSFUL"
    assert "start_time" in workflow_metadata["stats"]
    assert "end_time" in workflow_metadata["stats"]

You can also retrieve metadata for individual workflow tasks by
providing the task name:

.. code-block:: python

    workflow.run(add.options(**workflow.options(name="add_task")).bind(10, 20), workflow_id="add_example_2")

    task_metadata = workflow.get_metadata("add_example_2", name="add_task")

    # Task-level metadata carries its own options and stats.
    assert "task_options" in task_metadata
    assert "start_time" in task_metadata["stats"]
    assert "end_time" in task_metadata["stats"]

User-defined metadata
---------------------
You can add custom metadata to a workflow or a workflow task, which is
useful for attaching extra information to it.

- workflow-level metadata can be added via ``.run(metadata=metadata)``
- task-level metadata can be added via ``.options(**workflow.options(metadata=metadata))`` or in the decorator ``@workflow.options(metadata=metadata)``

.. code-block:: python

    workflow.run(add.options(**workflow.options(name="add_task", metadata={"task_k": "task_v"})).bind(10, 20),
        workflow_id="add_example_3", metadata={"workflow_k": "workflow_v"})

    assert workflow.get_metadata("add_example_3")["user_metadata"] == {"workflow_k": "workflow_v"}
    assert workflow.get_metadata("add_example_3", name="add_task")["user_metadata"] == {"task_k": "task_v"}

**Note: user-defined metadata must be a Python dictionary with values that are
JSON serializable.**

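The JSON-serializability requirement can be checked before a workflow is submitted. The following is a minimal sketch using only the standard library; ``check_user_metadata`` is a hypothetical helper name, not part of the workflow API, and it only approximates whatever validation the library itself performs.

```python
import json
from typing import Any


def check_user_metadata(metadata: Any) -> None:
    """Raise ValueError unless metadata is a dict whose values json.dumps accepts."""
    if not isinstance(metadata, dict):
        raise ValueError("metadata must be a dict")
    for key, value in metadata.items():
        try:
            json.dumps(value)
        except (TypeError, ValueError) as exc:
            raise ValueError(f"value for key {key!r} is not JSON serializable") from exc


# Strings, numbers, lists, and nested dicts pass silently.
check_user_metadata({"owner": "alice", "retries": 3, "tags": ["nightly"]})
```

A value such as a ``set`` or an arbitrary object would make the helper raise, which surfaces the problem before the metadata is passed to ``workflow.run()`` or ``workflow.options()``.
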
Available Metrics
-----------------
**Workflow level**

- status: workflow state, one of RUNNING, FAILED, RESUMABLE, CANCELED, or SUCCESSFUL.
- user_metadata: a Python dictionary of custom metadata provided by the user via ``workflow.run()``.
- stats: workflow running stats, including workflow start time and end time.

**Task level**

- name: name of the task, either provided by the user via ``task.options(**workflow.options(name=xxx))`` or generated by the system.
- task_options: options of the task, either provided by the user via ``task.options()`` or set to system defaults.
- user_metadata: a Python dictionary of custom metadata provided by the user via ``task.options()``.
- stats: task running stats, including task start time and end time.

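To illustrate how the workflow-level fields listed above fit together, here is a small sketch that renders a one-line summary from a metadata dictionary. ``summarize`` and ``KNOWN_STATUSES`` are hypothetical names introduced for this example, and the sample dictionary is hand-written rather than real output of ``workflow.get_metadata()``.

```python
from typing import Any, Dict

# Status values listed above for workflow-level metadata.
KNOWN_STATUSES = {"RUNNING", "FAILED", "RESUMABLE", "CANCELED", "SUCCESSFUL"}


def summarize(metadata: Dict[str, Any]) -> str:
    """Build a short summary; tolerate fields that are missing from the result."""
    status = metadata.get("status")
    if status not in KNOWN_STATUSES:
        status = f"UNRECOGNIZED({status!r})"
    user_keys = sorted(metadata.get("user_metadata", {}))
    stat_keys = sorted(metadata.get("stats", {}))
    return f"status={status} user_metadata_keys={user_keys} stats_keys={stat_keys}"


# Hand-written sample shaped like workflow-level metadata.
sample = {
    "status": "RUNNING",
    "user_metadata": {"workflow_k": "workflow_v"},
    "stats": {"start_time": 1646000000.0},
}
print(summarize(sample))
```
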
Notes
-----
1. Unlike ``get_output()``, ``get_metadata()`` returns immediately with
   the metadata available at the time of the call. This means that some
   fields may be missing from the result if the corresponding metadata is
   not yet available (e.g., ``metadata["stats"]["end_time"]`` is not
   available until the workflow has completed).

.. code-block:: python

    import tempfile
    import time
    from pathlib import Path

    # Illustrative setup: a flag file path and a workflow ID for this example.
    flag = Path(tempfile.mkdtemp()) / "flag"
    workflow_id = "simple"

    @ray.remote
    def simple():
        flag.touch()  # signal that the task has started
        time.sleep(1000)
        return 0

    workflow.run_async(simple.bind(), workflow_id=workflow_id)

    # make sure the workflow task has started running
    while not flag.exists():
        time.sleep(1)

    workflow_metadata = workflow.get_metadata(workflow_id)
    assert workflow_metadata["status"] == "RUNNING"
    assert "start_time" in workflow_metadata["stats"]
    assert "end_time" not in workflow_metadata["stats"]

    workflow.cancel(workflow_id)

    workflow_metadata = workflow.get_metadata(workflow_id)
    assert workflow_metadata["status"] == "CANCELED"
    assert "start_time" in workflow_metadata["stats"]
    assert "end_time" not in workflow_metadata["stats"]

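Because fields such as ``end_time`` may be absent, code that consumes metadata is safer when it reads them defensively. The sketch below computes elapsed time only when both timestamps are present. It assumes that ``start_time`` and ``end_time`` are numeric timestamps in seconds; ``elapsed_seconds`` is a hypothetical helper, not part of the workflow API, and the sample dictionaries are hand-written.

```python
from typing import Any, Dict, Optional


def elapsed_seconds(metadata: Dict[str, Any]) -> Optional[float]:
    """Return end_time - start_time, or None if either timestamp is missing."""
    stats = metadata.get("stats", {})
    start = stats.get("start_time")
    end = stats.get("end_time")
    if start is None or end is None:
        return None
    return end - start


# Hand-written samples shaped like the result of workflow.get_metadata().
finished = {"status": "SUCCESSFUL", "stats": {"start_time": 100.0, "end_time": 102.5}}
running = {"status": "RUNNING", "stats": {"start_time": 100.0}}

print(elapsed_seconds(finished))  # 2.5
print(elapsed_seconds(running))   # None
```
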
2. For resumed workflows, the current behavior is that ``stats`` is
   updated whenever a workflow is resumed.

.. code-block:: python

    import pytest

    workflow_id = "simple"
    # tmp_path is pytest's temporary-directory fixture (a pathlib.Path).
    error_flag = tmp_path / "error"
    error_flag.touch()

    @ray.remote
    def simple():
        if error_flag.exists():
            raise ValueError()
        return 0

    with pytest.raises(ray.exceptions.RaySystemError):
        workflow.run(simple.bind(), workflow_id=workflow_id)

    workflow_metadata_failed = workflow.get_metadata(workflow_id)
    assert workflow_metadata_failed["status"] == "FAILED"

    # remove the flag so that the task succeeds
    error_flag.unlink()
    ref = workflow.resume_async(workflow_id)
    assert ray.get(ref) == 0

    workflow_metadata_resumed = workflow.get_metadata(workflow_id)
    assert workflow_metadata_resumed["status"] == "SUCCESSFUL"

    # make sure resuming updated the running stats
    assert workflow_metadata_resumed["stats"]["start_time"] > workflow_metadata_failed["stats"]["start_time"]
    assert workflow_metadata_resumed["stats"]["end_time"] > workflow_metadata_failed["stats"]["end_time"]