To allow an event to trigger a workflow, workflows support pluggable event systems. Using the event framework provides a few key properties:
1. Waits for events efficiently (without requiring a running workflow step while waiting).
2. Supports exactly-once event delivery semantics while providing fault tolerance.
Like other workflow steps, events support fault tolerance via checkpointing. When an event occurs, the event is checkpointed, then optionally committed.
Using events
------------
Workflow events are a special type of workflow step. They "finish" when the event occurs. `workflow.wait_for_event(EventListenerType)` can be used to create an event step.
.. code-block:: python

    from ray import workflow
    import time

    # Create an event which finishes after 60 seconds.
    event_step = workflow.wait_for_event(workflow.event_listener.TimerListener, time.time() + 60)

Custom event listeners can be written by subclassing the `EventListener` interface.

.. code-block:: python

    class EventListener:
        def __init__(self):
            """Optional constructor. Only the constructor with no arguments
            will be called."""
            pass

        async def poll_for_event(self, *args, **kwargs) -> Event:
            """Should return only when the event is received."""
            raise NotImplementedError

        async def event_checkpointed(self, event: Event) -> None:
            """Optional. Called after an event has been checkpointed and a transaction can
            be safely committed."""
            pass
The `listener.poll_for_event()` coroutine should finish when the event is done. Arguments to `workflow.wait_for_event` are passed to `poll_for_event()`. For example, an event listener which sleeps until a timestamp can be written as:
.. code-block:: python

    import asyncio
    import time

    class TimerListener(EventListener):
        async def poll_for_event(self, timestamp):
            await asyncio.sleep(timestamp - time.time())
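This listener can then be passed to `workflow.wait_for_event`, which forwards the timestamp argument to `poll_for_event`. Below is a minimal sketch; the ten-second deadline and the `sleep_step` name are illustrative, and the `workflow` and `time` imports from the first snippet are assumed.

.. code-block:: python

    # Creates an event step that finishes roughly ten seconds from now.
    sleep_step = workflow.wait_for_event(TimerListener, time.time() + 10)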
The `event_checkpointed` routine can be overridden to support systems with exactly-once delivery semantics, which typically follow a pattern of:
1. Wait for event.
2. Process event.
3. Commit event.
After the workflow finishes checkpointing the event, the event listener will be invoked and can free the event. For example, to guarantee that events are consumed exactly once from a `kafkaesque <https://docs.confluent.io/clients-confluent-kafka-python/current/overview.html#synchronous-commits>`_ queue, the listener should commit the message only after the event has been checkpointed.
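The listener below is a minimal sketch of that pattern, assuming the synchronous `confluent_kafka` `Consumer` API; the broker address, group id, and topic handling are illustrative and not part of the workflow API.

.. code-block:: python

    import asyncio

    from confluent_kafka import Consumer


    class QueueEventListener(EventListener):
        def __init__(self):
            # Disable auto-commit so a message is only committed after the
            # workflow has checkpointed the corresponding event.
            self.consumer = Consumer({
                "bootstrap.servers": "localhost:9092",  # illustrative
                "group.id": "workflow-events",          # illustrative
                "enable.auto.commit": False,
            })

        async def poll_for_event(self, topic):
            self.consumer.subscribe([topic])
            loop = asyncio.get_running_loop()
            while True:
                # Consumer.poll() blocks, so run it in a worker thread to
                # avoid stalling the event loop.
                message = await loop.run_in_executor(None, self.consumer.poll, 1.0)
                if message is not None and message.error() is None:
                    return message

        async def event_checkpointed(self, event) -> None:
            # Commit the offset only after the event has been durably
            # checkpointed, so each queue message is consumed exactly once.
            self.consumer.commit(message=event, asynchronous=False)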