[workflow] Deprecate workflow.create (#26106)

Siyuan (Ryans) Zhuang 2022-07-02 21:24:05 -07:00 committed by GitHub
parent 7d3ceb222c
commit 5a094f1d18
48 changed files with 395 additions and 349 deletions

View file

@ -43,22 +43,21 @@ The Ray DAG will not be executed until further actions are taken on it.
Your first workflow
-------------------
A single line is all you need to turn the previous DAG into a workflow:
A single line is all you need to run the workflow DAG:
.. code-block:: python
# <follow the previous code>
from ray import workflow
# Create the workflow from the DAG.
wf = workflow.create(output)
# Execute the workflow and print the result.
print(wf.run())
print(workflow.run(output))
# The workflow can also be executed asynchronously.
# print(ray.get(output.run_async()))
# You can also run the workflow asynchronously and fetch the output via 'ray.get'
output_ref = workflow.run_async(output)
print(ray.get(output_ref))
Here is the workflow we created:
The following figure visualizes the workflow we created:
.. image:: basic.png
:width: 500px
@ -115,7 +114,7 @@ To retrieve a workflow result, you can assign ``workflow_id`` when running a wor
ret = add.bind(get_val.bind(), 20)
assert workflow.create(ret).run(workflow_id="add_example") == 30
assert workflow.run(ret, workflow_id="add_example") == 30
Then workflow results can be retrieved with ``workflow.get_output(workflow_id) -> ObjectRef[T]``.
If a workflow is not given a ``workflow_id``, a random string is generated and used as the ``workflow_id``. In that case, you can look up the ``workflow_id`` by calling ``ray.workflow.list_all()``.
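For example, a minimal sketch (reusing the ``add_example`` workflow above):

.. code-block:: python

    # Fetch the stored result of the finished "add_example" workflow.
    result_ref = workflow.get_output("add_example")
    assert ray.get(result_ref) == 30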
@ -153,14 +152,14 @@ Once a task is given a name, the result of the task will be retrievable via ``wo
inner_task = double.options(**workflow.options(name="inner")).bind(1)
outer_task = double.options(**workflow.options(name="outer")).bind(inner_task)
result = workflow.create(outer_task).run_async("double")
result_ref = workflow.run_async(outer_task, workflow_id="double")
inner = workflow.get_output(workflow_id, name="inner")
outer = workflow.get_output(workflow_id, name="outer")
assert ray.get(inner) == 2
assert ray.get(outer) == 4
assert ray.get(result) == 4
assert ray.get(result_ref) == 4
If there are multiple tasks with the same name, a suffix with a counter ``_n`` will be added automatically.
@ -193,7 +192,7 @@ For example,
for i in range(1, n):
x = simple.options(**workflow.options(name="step")).bind(x)
ret = workflow.create(x).run_async(workflow_id=workflow_id)
ret = workflow.run_async(x, workflow_id=workflow_id)
outputs = [workflow.get_output(workflow_id, name="step")]
for i in range(1, n):
outputs.append(workflow.get_output(workflow_id, name=f"step_{i}"))
@ -229,7 +228,7 @@ The following error handling flags can be either set in the task decorator or vi
# Tries up to five times before giving up.
r1 = faulty_function.options(**workflow.options(max_retries=5)).bind()
workflow.create(r1).run()
workflow.run(r1)
@ray.remote
def handle_errors(result: Tuple[str, Exception]):
@ -242,7 +241,7 @@ The following error handling flags can be either set in the task decorator or vi
# `handle_errors` receives a tuple of (result, exception).
r2 = faulty_function.options(**workflow.options(catch_exceptions=True)).bind()
workflow.create(handle_errors.bind(r2)).run()
workflow.run(handle_errors.bind(r2))
- If ``max_retries`` is given, the task will be retried up to that many times if an exception is raised. Retries apply only to application-level errors; system errors are handled by Ray itself. By default, ``max_retries`` is 3.
- If ``catch_exceptions`` is True, the return value of the function will be converted to ``Tuple[Optional[T], Optional[Exception]]``. This can be combined with ``max_retries`` to retry a given number of times before returning the result tuple, as shown in the sketch below.
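Both flags can be combined; a minimal sketch (reusing ``faulty_function`` from the examples above):

.. code-block:: python

    # Retry up to five times; if it still fails, return (None, exception)
    # instead of raising.
    r3 = faulty_function.options(
        **workflow.options(max_retries=5, catch_exceptions=True)
    ).bind()
    result, error = workflow.run(r3)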
@ -278,7 +277,7 @@ Note that tasks that have side-effects still need to be idempotent. This is beca
return ticket
# UNSAFE: we could book multiple flight tickets
workflow.create(book_flight_unsafe.bind()).run()
workflow.run(book_flight_unsafe.bind())
.. code-block:: python
:caption: Idempotent workflow:
@ -297,7 +296,7 @@ Note that tasks that have side-effects still need to be idempotent. This is beca
# SAFE: book_flight is written to be idempotent
request_id = generate_id.bind()
workflow.create(book_flight_idempotent.bind(request_id)).run()
workflow.run(book_flight_idempotent.bind(request_id))
Dynamic workflows
-----------------
@ -305,7 +304,41 @@ Dynamic workflows
Additional tasks can be dynamically created and inserted into the workflow DAG during execution.
This is achieved by returning a continuation of a DAG.
A continuation is something returned by a function and executed after it returns.
In our context, a continuation is basically a tail function call returned by a function. For example:
.. code-block:: python
def bar(): ...
def foo_1():
# Here we say 'foo_1()' returns a continuation.
# The continuation is made of 'bar()'
return bar()
def foo_2():
# This is NOT a continuation because we do not return it.
bar()
def foo_3():
# This is NOT a continuation because it is not a function call.
return 42
Continuations can be used to implement more complex control flow, for example recursion:
.. code-block:: python
def factorial(n: int) -> int:
if n == 1:
return 1
else:
return multiply(n, factorial(n - 1))
def multiply(a: int, b: int) -> int:
return a * b
assert factorial(10) == 3628800
The continuation feature enables nesting, looping, and recursion within workflows.
The following example shows how to implement the recursive ``factorial`` program using dynamically generated tasks:
@ -323,10 +356,11 @@ The following example shows how to implement the recursive ``factorial`` program
def multiply(a: int, b: int) -> int:
return a * b
ret = workflow.create(factorial.bind(10))
assert ret.run() == 3628800
assert workflow.run(factorial.bind(10)) == 3628800
The key behavior to note is that when a task returns a ``Workflow`` output instead of a concrete value, that workflow's output will be substituted for the task's return. To better understand dynamic workflows, let's look at a more realistic example of booking a trip:
The key behavior to note is that when a task returns a continuation instead of a concrete value,
that continuation will be substituted for the task's return.
To better understand dynamic workflows, let's look at a more realistic example of booking a trip:
.. code-block:: python
@ -342,18 +376,16 @@ The key behavior to note is that when a task returns a ``Workflow`` output inste
hotels: List[Hotel]) -> Receipt: ...
@ray.remote
def book_trip(origin: str, dest: str, dates) ->
"Workflow[Receipt]":
def book_trip(origin: str, dest: str, dates) -> Receipt:
# Note that the workflow engine will not begin executing
# child workflows until the parent task returns.
# This avoids task overlap and ensures recoverability.
f1: Workflow = book_flight.bind(origin, dest, dates[0])
f2: Workflow = book_flight.bind(dest, origin, dates[1])
hotel: Workflow = book_hotel.bind(dest, dates)
f1 = book_flight.bind(origin, dest, dates[0])
f2 = book_flight.bind(dest, origin, dates[1])
hotel = book_hotel.bind(dest, dates)
return workflow.continuation(finalize_or_cancel.bind([f1, f2], [hotel]))
fut = workflow.create(book_trip.bind("OAK", "SAN", ["6/12", "7/5"]))
fut.run() # returns Receipt(...)
receipt: Receipt = workflow.run(book_trip.bind("OAK", "SAN", ["6/12", "7/5"]))
Here the workflow initially consists of just the ``book_trip`` task. Once executed, ``book_trip`` generates tasks to book flights and hotels in parallel, which feed into a task that decides whether to cancel the trip or finalize it. The DAG can be visualized as follows (note the dynamically generated nested workflows within ``book_trip``):
@ -379,7 +411,8 @@ Workflows are compatible with Ray tasks and actors. There are two methods of usi
Passing nested arguments
~~~~~~~~~~~~~~~~~~~~~~~~
Like Ray tasks, when you pass a list of ``Workflow`` outputs to a task, the values are not resolved. But we ensure that all ancestors of a task are fully executed prior to the task starting:
Like Ray tasks, when you pass a list of task outputs to a task, the values are not resolved.
But we ensure that all ancestors of a task are fully executed prior to the task starting:
.. code-block:: python
@ -395,7 +428,7 @@ Like Ray tasks, when you pass a list of ``Workflow`` outputs to a task, the valu
return 10
ret = add.bind([get_val.bind() for _ in range(3)])
assert workflow.create(ret).run() == 30
assert workflow.run(ret) == 30
Passing object references between tasks
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
@ -412,7 +445,7 @@ Ray object references and data structures composed of them (e.g., ``ray.Dataset`
def add(a, b):
return do_add.remote(a, b)
workflow.create(add.bind(ray.put(10), ray.put(20))).run() == 30
workflow.run(add.bind(ray.put(10), ray.put(20))) == 30
Ray actor handles are not allowed to be passed between tasks.
@ -428,4 +461,4 @@ You can assign resources (e.g., CPUs, GPUs to tasks via the same ``num_cpus``, `
def train_model() -> Model:
pass # This task is assigned a GPU by Ray.
workflow.create(train_model.bind()).run()
workflow.run(train_model.bind())

View file

@ -7,7 +7,11 @@ Workflows is built on top of Ray, and offers a mostly consistent subset of its A
``func.remote`` vs ``func.bind``
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
With Ray tasks, ``func.remote`` will submit a remote task to run eagerly. In Ray workflows, ``func.bind`` is used to create a DAG, and the DAG is converted into a workflow. Execution of the workflow is deferred until ``.run(workflow_id="id")`` or ``.run_async(workflow_id="id")`` is called on the ``Workflow``. Specifying the workflow id allows for resuming of the workflow by its id in case of cluster failure.
With Ray tasks, ``func.remote`` submits a remote task that runs eagerly. In Ray Workflows, ``func.bind`` only generates a node in a DAG, and that node is not executed when it is created.
Execution of the DAG is deferred until ``workflow.run(dag, workflow_id=...)`` or ``workflow.run_async(dag, workflow_id=...)`` is called on it.
Specifying the workflow id allows the workflow to be resumed by that id in case of cluster failure.
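A minimal sketch of the difference (the ``double`` task and the ``"double_5"`` workflow id below are illustrative):

.. code-block:: python

    import ray
    from ray import workflow

    @ray.remote
    def double(x: int) -> int:
        return 2 * x

    # Ray task: submitted eagerly, returns an ObjectRef right away.
    assert ray.get(double.remote(5)) == 10

    # Workflow: bind() only builds a DAG node; nothing runs until
    # workflow.run() (or workflow.run_async()) is called.
    dag = double.bind(5)
    assert workflow.run(dag, workflow_id="double_5") == 10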
Other Workflow Engines
----------------------

View file

@ -61,18 +61,24 @@ Unlike Ray tasks, you are not allowed to call ``ray.get()`` or ``ray.wait()`` on
Workflows
~~~~~~~~~
It takes a single line of code to turn a DAG into a workflow DAG:
It takes a single line of code to run a workflow DAG:
.. code-block:: python
:caption: Turning the DAG into a workflow DAG:
:caption: Run a workflow DAG:
from ray import workflow
output: "Workflow[int]" = workflow.create(dag)
# Run the workflow until it completes and returns the output
assert workflow.run(dag) == 101
Execute the workflow DAG by ``<workflow>.run()`` or ``<workflow>.run_async()``. Once started, a workflow's execution is durably logged to storage. On system failure, workflows can be resumed on any Ray cluster with access to the storage.
# Or you can run it asynchronously and fetch the output via 'ray.get'
output_ref = workflow.run_async(dag)
assert ray.get(output_ref) == 101
When executing the workflow DAG, remote functions are retried on failure, but once they finish successfully and the results are persisted by the workflow engine, they will never be run again.
Once started, a workflow's execution is durably logged to storage. On system failure, workflows can be resumed on any Ray cluster with access to the storage.
When executing the workflow DAG, workflow tasks are retried on failure, but once they finish successfully and the results are persisted by the workflow engine, they will never be run again.
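A previously started workflow can be picked up again by id after a restart; a minimal sketch (``"my_workflow_id"`` is an illustrative id, not defined above):

.. code-block:: python

    # Resume a durably logged workflow and fetch its result.
    output_ref = workflow.resume(workflow_id="my_workflow_id")
    print(ray.get(output_ref))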
.. code-block:: python
:caption: Run the workflow:
@ -108,7 +114,7 @@ Large data objects can be stored in the Ray object store. References to these ob
def concat(words: List[ray.ObjectRef]) -> str:
return " ".join([ray.get(w) for w in words])
assert workflow.create(concat.bind(words.bind())).run() == "hello world"
assert workflow.run(concat.bind(words.bind())) == "hello world"
Dynamic Workflows
~~~~~~~~~~~~~~~~~
@ -130,7 +136,7 @@ The continuation feature enables nesting, looping, and recursion within workflow
# return a continuation of a DAG
return workflow.continuation(add.bind(fib.bind(n - 1), fib.bind(n - 2)))
assert workflow.create(fib.bind(10)).run() == 55
assert workflow.run(fib.bind(10)) == 55
Events
@ -151,4 +157,4 @@ Workflows can be efficiently triggered by timers or external events using the ev
return args
# If a task's arguments include events, the task won't be executed until all of the events have occurred.
workflow.create(gather.bind(sleep_task, event_task, "hello world")).run()
workflow.run(gather.bind(sleep_task, event_task, "hello world"))

View file

@ -34,7 +34,7 @@ Workflow events are a special type of workflow task. They "finish" when the even
return args
# Gather will run after 60 seconds, when both event1 and event2 are done.
workflow.create(gather.bind(event1_task, event_2_task)).run()
workflow.run(gather.bind(event1_task, event_2_task))
Custom event listeners

View file

@ -19,7 +19,7 @@ For example:
def add(left: int, right: int) -> int:
return left + right
workflow.create(add.bind(10, 20)).run("add_example")
workflow.run(add.bind(10, 20), workflow_id="add_example")
workflow_metadata = workflow.get_metadata("add_example")
@ -32,7 +32,7 @@ providing the task name:
.. code-block:: python
workflow.create(add.options(**workflow.options(name="add_task")).bind(10, 20)).run("add_example_2")
workflow.run(add.options(**workflow.options(name="add_task")).bind(10, 20), workflow_id="add_example_2")
task_metadata = workflow.get_metadata("add_example_2", name="add_task")
@ -51,8 +51,8 @@ workflow or workflow task.
.. code-block:: python
workflow.create(add.options(**workflow.options(name="add_task", metadata={"task_k": "task_v"})).bind(10, 20))\
.run("add_example_3", metadata={"workflow_k": "workflow_v"})
workflow.run(add.options(**workflow.options(name="add_task", metadata={"task_k": "task_v"})).bind(10, 20),
workflow_id="add_example_3", metadata={"workflow_k": "workflow_v"})
assert workflow.get_metadata("add_example_3")["user_metadata"] == {"workflow_k": "workflow_v"}
assert workflow.get_metadata("add_example_3", name="add_task")["user_metadata"] == {"task_k": "task_v"}
@ -92,7 +92,7 @@ is completed).
time.sleep(1000)
return 0
workflow.create(simple.bind()).run_async(workflow_id)
workflow.run_async(simple.bind(), workflow_id=workflow_id)
# make sure workflow task starts running
while not flag.exists():
@ -126,7 +126,7 @@ be updated whenever a workflow is resumed.
return 0
with pytest.raises(ray.exceptions.RaySystemError):
workflow.create(simple.bind()).run(workflow_id)
workflow.run(simple.bind(), workflow_id=workflow_id)
workflow_metadata_failed = workflow.get_metadata(workflow_id)
assert workflow_metadata_failed["status"] == "FAILED"

View file

@ -1,6 +1,13 @@
Ray Workflows API
=================
Workflow Execution API
----------------------
.. autofunction:: ray.workflow.run
.. autofunction:: ray.workflow.run_async
Management API
--------------
.. autofunction:: ray.workflow.resume_all

View file

@ -1,17 +1,18 @@
from ray.workflow.api import (
init,
run,
run_async,
continuation,
resume,
resume_all,
cancel,
list_all,
delete,
get_output,
get_status,
get_metadata,
resume,
cancel,
list_all,
resume_all,
wait_for_event,
sleep,
delete,
create,
continuation,
wait_for_event,
options,
)
from ray.workflow.exceptions import (
@ -23,6 +24,8 @@ from ray.workflow.common import WorkflowStatus
from ray.workflow.event_listener import EventListener
__all__ = [
"run",
"run_async",
"resume",
"get_output",
"WorkflowError",
@ -38,7 +41,6 @@ __all__ = [
"sleep",
"EventListener",
"delete",
"create",
"continuation",
"options",
]

View file

@ -58,6 +58,81 @@ def _ensure_workflow_initialized() -> None:
init()
@PublicAPI(stability="beta")
def run(
dag_node: DAGNode,
*args,
workflow_id: Optional[str] = None,
metadata: Optional[Dict[str, Any]] = None,
**kwargs,
) -> Any:
"""Run a workflow.
If the workflow with the given id already exists, it will be resumed.
Examples:
>>> import ray
>>> from ray import workflow
>>> Flight, Reservation, Trip = ... # doctest: +SKIP
>>> @ray.remote # doctest: +SKIP
... def book_flight(origin: str, dest: str) -> Flight: # doctest: +SKIP
... return Flight(...) # doctest: +SKIP
>>> @ray.remote # doctest: +SKIP
... def book_hotel(location: str) -> Reservation: # doctest: +SKIP
... return Reservation(...) # doctest: +SKIP
>>> @ray.remote # doctest: +SKIP
... def finalize_trip(bookings: List[Any]) -> Trip: # doctest: +SKIP
... return Trip(...) # doctest: +SKIP
>>> flight1 = book_flight.bind("OAK", "SAN") # doctest: +SKIP
>>> flight2 = book_flight.bind("SAN", "OAK") # doctest: +SKIP
>>> hotel = book_hotel.bind("SAN") # doctest: +SKIP
>>> trip = finalize_trip.bind([flight1, flight2, hotel]) # doctest: +SKIP
>>> result = workflow.run(trip) # doctest: +SKIP
Args:
workflow_id: A unique identifier that can be used to resume the
workflow. If not specified, a random id will be generated.
metadata: The metadata to add to the workflow. It must be JSON-serializable.
Returns:
The running result.
"""
return ray.get(
run_async(dag_node, *args, workflow_id=workflow_id, metadata=metadata, **kwargs)
)
@PublicAPI(stability="beta")
def run_async(
dag_node: DAGNode,
*args,
workflow_id: Optional[str] = None,
metadata: Optional[Dict[str, Any]] = None,
**kwargs,
) -> ray.ObjectRef:
"""Run a workflow asynchronously.
If the workflow with the given id already exists, it will be resumed.
Args:
workflow_id: A unique identifier that can be used to resume the
workflow. If not specified, a random id will be generated.
metadata: The metadata to add to the workflow. It must be JSON-serializable.
Returns:
The running result as ray.ObjectRef.
"""
_ensure_workflow_initialized()
if not isinstance(dag_node, DAGNode):
raise TypeError("Input should be a DAG.")
input_data = DAGInputData(*args, **kwargs)
return execution.run(dag_node, input_data, workflow_id, metadata)
@PublicAPI(stability="beta")
def resume(workflow_id: str) -> ray.ObjectRef:
"""Resume a workflow.
@ -174,7 +249,7 @@ def resume_all(include_failed: bool = False) -> Dict[str, ray.ObjectRef]:
This can be used after cluster restart to resume all tasks.
Args:
with_failed: Whether to resume FAILED workflows.
include_failed: Whether to resume FAILED workflows.
Examples:
>>> from ray import workflow
@ -378,87 +453,6 @@ def delete(workflow_id: str) -> None:
wf_storage.delete_workflow()
@PublicAPI(stability="beta")
def create(dag_node: "DAGNode", *args, **kwargs) -> "DAGNode":
"""Converts a DAG into a workflow.
TODO(suquark): deprecate this API.
Examples:
>>> import ray
>>> from ray import workflow
>>> Flight, Reservation, Trip = ... # doctest: +SKIP
>>> @ray.remote # doctest: +SKIP
... def book_flight(origin: str, dest: str) -> Flight: # doctest: +SKIP
... return Flight(...) # doctest: +SKIP
>>> @ray.remote # doctest: +SKIP
... def book_hotel(location: str) -> Reservation: # doctest: +SKIP
... return Reservation(...) # doctest: +SKIP
>>> @ray.remote # doctest: +SKIP
... def finalize_trip(bookings: List[Any]) -> Trip: # doctest: +SKIP
... return Trip(...) # doctest: +SKIP
>>> flight1 = book_flight.bind("OAK", "SAN") # doctest: +SKIP
>>> flight2 = book_flight.bind("SAN", "OAK") # doctest: +SKIP
>>> hotel = book_hotel.bind("SAN") # doctest: +SKIP
>>> trip = finalize_trip.bind([flight1, flight2, hotel]) # doctest: +SKIP
>>> result = workflow.create(trip).run() # doctest: +SKIP
Args:
dag_node: The DAG to be converted.
args: Positional arguments of the DAG input node.
kwargs: Keyword arguments of the DAG input node.
"""
if not isinstance(dag_node, DAGNode):
raise TypeError("Input should be a DAG.")
input_data = DAGInputData(*args, **kwargs)
def run_async(
workflow_id: Optional[str] = None, metadata: Optional[Dict[str, Any]] = None
):
"""Run a workflow asynchronously.
If the workflow with the given id already exists, it will be resumed.
Args:
workflow_id: A unique identifier that can be used to resume the
workflow. If not specified, a random id will be generated.
metadata: The metadata to add to the workflow. It has to be able
to serialize to json.
Returns:
The running result as ray.ObjectRef.
"""
_ensure_workflow_initialized()
return execution.run(dag_node, input_data, workflow_id, metadata)
def run(
workflow_id: Optional[str] = None,
metadata: Optional[Dict[str, Any]] = None,
) -> Any:
"""Run a workflow.
If the workflow with the given id already exists, it will be resumed.
Args:
workflow_id: A unique identifier that can be used to resume the
workflow. If not specified, a random id will be generated.
metadata: The metadata to add to the workflow. It has to be able
to serialize to json.
Returns:
The running result.
"""
return ray.get(run_async(workflow_id, metadata))
dag_node.run_async = run_async
dag_node.run = run
return dag_node
@PublicAPI(stability="beta")
def continuation(dag_node: "DAGNode") -> Union["DAGNode", Any]:
"""Converts a DAG into a continuation.
@ -476,7 +470,7 @@ def continuation(dag_node: "DAGNode") -> Union["DAGNode", Any]:
raise TypeError("Input should be a DAG.")
if in_workflow_execution():
return create(dag_node)
return dag_node
return ray.get(dag_node.execute())
@ -535,11 +529,19 @@ class options:
__all__ = (
"init",
"run",
"run_async",
"continuation",
"resume",
"get_output",
"resume_all",
"cancel",
"list_all",
"delete",
"get_output",
"get_status",
"get_metadata",
"cancel",
"sleep",
"wait_for_event",
"options",
)

View file

@ -29,4 +29,4 @@ if __name__ == "__main__":
order_data = extract.bind()
order_summary = transform.bind(order_data)
etl = load.bind(order_summary)
print(workflow.create(etl).run())
print(workflow.run(etl))

View file

@ -26,5 +26,4 @@ def flip_coin() -> str:
if __name__ == "__main__":
workflow.init()
print(workflow.create(flip_coin.bind()).run())
print(workflow.run(flip_coin.bind()))

View file

@ -12,4 +12,4 @@ if __name__ == "__main__":
B = echo.options(**workflow.options(name="B")).bind("B", A)
C = echo.options(**workflow.options(name="C")).bind("C", A)
D = echo.options(**workflow.options(name="D")).bind("D", A, B)
workflow.create(D).run()
workflow.run(D)

View file

@ -42,4 +42,4 @@ def wait_all(*deps):
if __name__ == "__main__":
res = intentional_fail.options(**workflow.options(catch_exceptions=True)).bind()
print(workflow.create(exit_handler.bind(res)).run())
print(workflow.run(exit_handler.bind(res)))

View file

@ -9,4 +9,4 @@ def hello(msg: str) -> None:
if __name__ == "__main__":
workflow.create(hello.bind("hello world")).run()
workflow.run(hello.bind("hello world"))

View file

@ -16,4 +16,4 @@ if __name__ == "__main__":
children = []
for msg in ["hello world", "goodbye world"]:
children.append(hello.bind(msg))
workflow.create(wait_all.bind(*children)).run()
workflow.run(wait_all.bind(*children))

View file

@ -16,4 +16,4 @@ if __name__ == "__main__":
h1 = hello.options(**workflow.options(name="hello1")).bind("hello1")
h2a = hello.options(**workflow.options(name="hello2a")).bind("hello2a")
h2b = hello.options(**workflow.options(name="hello2b")).bind("hello2b", h2a)
workflow.create(wait_all.bind(h1, h2b)).run()
workflow.run(wait_all.bind(h1, h2b))

View file

@ -28,4 +28,4 @@ def flip_coin() -> str:
if __name__ == "__main__":
print(workflow.create(flip_coin.bind()).run())
print(workflow.run(flip_coin.bind()))

View file

@ -37,12 +37,7 @@ def custom_retry_strategy(func: Any, num_retries: int, delay_s: int) -> str:
if __name__ == "__main__":
workflow.init()
# Default retry strategy.
print(
workflow.create(
flaky_step.options(**workflow.options(max_retries=10)).bind()
).run()
)
print(workflow.run(flaky_step.options(**workflow.options(max_retries=10)).bind()))
# Custom strategy.
print(workflow.create(custom_retry_strategy.bind(flaky_step, 10, 1)).run())
print(workflow.run(custom_retry_strategy.bind(flaky_step, 10, 1)))

View file

@ -58,4 +58,4 @@ def download_all(urls: List[str]) -> None:
if __name__ == "__main__":
res = download_all.bind(FILES_TO_PROCESS)
workflow.create(res).run()
workflow.run(res)

View file

@ -13,5 +13,4 @@ def main_workflow(name: str) -> str:
if __name__ == "__main__":
wf = workflow.create(main_workflow.bind("Alice"))
print(wf.run())
print(workflow.run(main_workflow.bind("Alice")))

View file

@ -89,4 +89,4 @@ if __name__ == "__main__":
final_result = handle_errors.bind(
car_req_id, hotel_req_id, flight_req_id, saga_result
)
print(workflow.create(final_result).run())
print(workflow.run(final_result))

View file

@ -12,4 +12,4 @@ def iterate(array: List[str], result: str, i: int) -> str:
if __name__ == "__main__":
print(workflow.create(iterate.bind(["foo", "ba", "r"], "", 0)).run())
print(workflow.run(iterate.bind(["foo", "ba", "r"], "", 0)))

View file

@ -38,4 +38,4 @@ def decide(result: int) -> str:
if __name__ == "__main__":
print(workflow.create(decide.bind(get_size.bind())).run())
print(workflow.run(decide.bind(get_size.bind())))

View file

@ -20,4 +20,4 @@ def report(msg: str) -> None:
if __name__ == "__main__":
r1 = hello.bind("Kristof")
r2 = report.bind(r1)
workflow.create(r2).run()
workflow.run(r2)

View file

@ -22,4 +22,4 @@ def end(results: "List[ray.ObjectRef[str]]") -> str:
if __name__ == "__main__":
workflow.create(start.bind()).run()
workflow.run(start.bind())

View file

@ -15,4 +15,4 @@ def compute_large_fib(M: int, n: int = 1, fib: int = 1):
if __name__ == "__main__":
assert workflow.create(compute_large_fib.bind(100)).run() == 89
assert workflow.run(compute_large_fib.bind(100)) == 89

View file

@ -79,7 +79,7 @@ if __name__ == "__main__":
ray.workflow.delete(workflow_id)
except Exception:
pass
assert ray.workflow.create(pipeline(10)).run(workflow_id=workflow_id) == 20
assert ray.workflow.run(pipeline(10), workflow_id=workflow_id) == 20
pipeline = function_compose(
[
@ -99,5 +99,4 @@ if __name__ == "__main__":
ray.workflow.delete(workflow_id)
except Exception:
pass
wf = ray.workflow.create(pipeline(10))
assert wf.run(workflow_id=workflow_id) == (14, 15, 15, 16)
assert ray.workflow.run(pipeline(10), workflow_id=workflow_id) == (14, 15, 15, 16)

View file

@ -75,23 +75,21 @@ def test_basic_workflows(workflow_start_regular_shared):
return workflow.continuation(mul.bind(n, factorial.bind(n - 1)))
# This test also shows different "style" of running workflows.
assert (
workflow.create(simple_sequential.bind()).run() == "[source1][append1][append2]"
)
assert workflow.run(simple_sequential.bind()) == "[source1][append1][append2]"
wf = simple_sequential_with_input.bind("start:")
assert workflow.create(wf).run() == "start:[append1][append2]"
assert workflow.run(wf) == "start:[append1][append2]"
wf = loop_sequential.bind(3)
assert workflow.create(wf).run() == "[source1]" + "[append1]" * 3 + "[append2]"
assert workflow.run(wf) == "[source1]" + "[append1]" * 3 + "[append2]"
wf = nested.bind("nested:")
assert workflow.create(wf).run() == "nested:~[nested]~[append1][append2]"
assert workflow.run(wf) == "nested:~[nested]~[append1][append2]"
wf = fork_join.bind()
assert workflow.create(wf).run() == "join([source1][append1], [source1][append2])"
assert workflow.run(wf) == "join([source1][append1], [source1][append2])"
assert workflow.create(factorial.bind(10)).run() == 3628800
assert workflow.run(factorial.bind(10)) == 3628800
def test_async_execution(workflow_start_regular_shared):
@ -101,7 +99,7 @@ def test_async_execution(workflow_start_regular_shared):
return 314
start = time.time()
output = workflow.create(blocking.bind()).run_async()
output = workflow.run_async(blocking.bind())
duration = time.time() - start
assert duration < 5 # workflow.run is not blocked
assert ray.get(output) == 314
@ -134,7 +132,7 @@ def test_partial(workflow_start_regular_shared):
wf_step = workflow.step(fs[i]).step(wf_step)
return wf_step
assert workflow.create(chain_func.bind(1)).run() == 7
assert workflow.run(chain_func.bind(1)) == 7
def test_run_or_resume_during_running(workflow_start_regular_shared):
@ -156,13 +154,11 @@ def test_run_or_resume_during_running(workflow_start_regular_shared):
y = append1.bind(x)
return workflow.continuation(append2.bind(y))
output = workflow.create(simple_sequential.bind()).run_async(
workflow_id="running_workflow"
output = workflow.run_async(
simple_sequential.bind(), workflow_id="running_workflow"
)
with pytest.raises(RuntimeError):
workflow.create(simple_sequential.bind()).run_async(
workflow_id="running_workflow"
)
workflow.run_async(simple_sequential.bind(), workflow_id="running_workflow")
with pytest.raises(RuntimeError):
workflow.resume(workflow_id="running_workflow")
assert ray.get(output) == "[source1][append1][append2]"
@ -180,33 +176,28 @@ def test_step_failure(workflow_start_regular_shared, tmp_path):
return v
with pytest.raises(Exception):
workflow.create(
workflow.run_async(
unstable_step.options(**workflow.options(max_retries=-2)).bind()
)
with pytest.raises(Exception):
workflow.create(
unstable_step.options(**workflow.options(max_retries=2)).bind()
).run()
assert (
10
== workflow.create(
workflow.run(unstable_step.options(**workflow.options(max_retries=2)).bind())
assert 10 == workflow.run(
unstable_step.options(**workflow.options(max_retries=7)).bind()
).run()
)
(tmp_path / "test").write_text("0")
(ret, err) = workflow.create(
(ret, err) = workflow.run(
unstable_step.options(
**workflow.options(max_retries=2, catch_exceptions=True)
).bind()
).run()
)
assert ret is None
assert isinstance(err, ValueError)
(ret, err) = workflow.create(
(ret, err) = workflow.run(
unstable_step.options(
**workflow.options(max_retries=7, catch_exceptions=True)
).bind()
).run()
)
assert ret == 10
assert err is None
@ -223,7 +214,7 @@ def test_step_failure_decorator(workflow_start_regular_shared, tmp_path):
raise ValueError("Invalid")
return v
assert workflow.create(unstable_step.bind()).run() == 10
assert workflow.run(unstable_step.bind()) == 10
(tmp_path / "test").write_text("0")
@ -236,7 +227,7 @@ def test_step_failure_decorator(workflow_start_regular_shared, tmp_path):
raise ValueError("Invalid")
return v
(ret, err) = workflow.create(unstable_step_exception.bind()).run()
(ret, err) = workflow.run(unstable_step_exception.bind())
assert ret is None
assert err is not None
@ -251,7 +242,7 @@ def test_step_failure_decorator(workflow_start_regular_shared, tmp_path):
raise ValueError("Invalid")
return v
(ret, err) = workflow.create(unstable_step_exception.bind()).run()
(ret, err) = workflow.run(unstable_step_exception.bind())
assert ret is None
assert err is not None
assert (tmp_path / "test").read_text() == "4"
@ -266,9 +257,9 @@ def test_nested_catch_exception(workflow_start_regular_shared):
def f1():
return workflow.continuation(f2.bind())
assert (10, None) == workflow.create(
assert (10, None) == workflow.run(
f1.options(**workflow.options(catch_exceptions=True)).bind()
).run()
)
def test_nested_catch_exception_2(workflow_start_regular_shared):
@ -279,9 +270,9 @@ def test_nested_catch_exception_2(workflow_start_regular_shared):
else:
return workflow.continuation(f1.bind(n - 1))
ret, err = workflow.create(
ret, err = workflow.run(
f1.options(**workflow.options(catch_exceptions=True)).bind(5)
).run()
)
assert ret is None
assert isinstance(err, ValueError)
@ -309,15 +300,15 @@ def test_nested_catch_exception_3(workflow_start_regular_shared, tmp_path):
else:
return workflow.continuation(f2.bind(f3.bind()))
ret, err = workflow.create(
ret, err = workflow.run(
f1.options(**workflow.options(catch_exceptions=True)).bind(True)
).run()
)
assert ret is None
assert isinstance(err, ValueError)
assert (10, None) == workflow.create(
assert (10, None) == workflow.run(
f1.options(**workflow.options(catch_exceptions=True)).bind(False)
).run()
)
def test_dynamic_output(workflow_start_regular_shared):
@ -336,9 +327,10 @@ def test_dynamic_output(workflow_start_regular_shared):
# When the workflow fails, the dynamic output should point to the
# latest successful step.
try:
workflow.create(
exponential_fail.options(**workflow.options(name="step_0")).bind(3, 10)
).run(workflow_id="dynamic_output")
workflow.run(
exponential_fail.options(**workflow.options(name="step_0")).bind(3, 10),
workflow_id="dynamic_output",
)
except Exception:
pass
from ray.workflow.workflow_storage import get_workflow_storage

View file

@ -33,7 +33,7 @@ def test_step_resources(workflow_start_regular, tmp_path):
lock = FileLock(lock_path)
lock.acquire()
ret = workflow.create(step_run.options(num_cpus=2).bind()).run_async()
ret = workflow.run_async(step_run.options(num_cpus=2).bind())
ray.wait([signal_actor.wait.remote()])
obj = remote_run.remote()
with pytest.raises(ray.exceptions.GetTimeoutError):
@ -48,7 +48,7 @@ def test_get_output_1(workflow_start_regular, tmp_path):
def simple(v):
return v
assert 0 == workflow.create(simple.bind(0)).run("simple")
assert 0 == workflow.run(simple.bind(0), workflow_id="simple")
assert 0 == ray.get(workflow.get_output("simple"))
@ -62,7 +62,7 @@ def test_get_output_2(workflow_start_regular, tmp_path):
return v
lock.acquire()
obj = workflow.create(simple.bind(0)).run_async("simple")
obj = workflow.run_async(simple.bind(0), workflow_id="simple")
obj2 = workflow.get_output("simple")
lock.release()
assert ray.get([obj, obj2]) == [0, 0]
@ -83,8 +83,8 @@ def test_get_output_3(workflow_start_regular, tmp_path):
return 10
with pytest.raises(workflow.WorkflowExecutionError):
workflow.create(incr.options(**workflow.options(max_retries=0)).bind()).run(
"incr"
workflow.run(
incr.options(**workflow.options(max_retries=0)).bind(), workflow_id="incr"
)
assert cnt_file.read_text() == "1"
@ -121,9 +121,10 @@ def test_get_output_4(workflow_start_regular, tmp_path):
workflow_id = "test_get_output_4"
lock.acquire()
obj = workflow.create(
recursive.options(**workflow.options(name="10")).bind(10)
).run_async(workflow_id)
obj = workflow.run_async(
recursive.options(**workflow.options(name="10")).bind(10),
workflow_id=workflow_id,
)
outputs = [workflow.get_output(workflow_id, name=str(i)) for i in range(11)]
outputs.append(obj)
@ -148,7 +149,7 @@ def test_get_output_5(workflow_start_regular, tmp_path):
outputs = []
for i in range(20):
workflow.create(simple.bind()).run_async(workflow_id.format(i))
workflow.run_async(simple.bind(), workflow_id=workflow_id.format(i))
outputs.append(workflow.get_output(workflow_id.format(i)))
assert ray.get(outputs) == [314] * len(outputs)
@ -161,7 +162,7 @@ def test_output_with_name(workflow_start_regular):
inner_task = double.options(**workflow.options(name="inner")).bind(1)
outer_task = double.options(**workflow.options(name="outer")).bind(inner_task)
result = workflow.create(outer_task).run_async("double")
result = workflow.run_async(outer_task, workflow_id="double")
inner = workflow.get_output("double", name="inner")
outer = workflow.get_output("double", name="outer")
@ -177,7 +178,7 @@ def test_output_with_name(workflow_start_regular):
inner_task = double_2.bind(1)
outer_task = double_2.bind(inner_task)
workflow_id = "double_2"
result = workflow.create(outer_task).run_async(workflow_id)
result = workflow.run_async(outer_task, workflow_id=workflow_id)
inner = workflow.get_output(workflow_id, name="double")
outer = workflow.get_output(workflow_id, name="double_1")
@ -199,7 +200,7 @@ def test_get_non_exist_output(workflow_start_regular, tmp_path):
with FileLock(lock_path):
dag = simple.options(**workflow.options(name="simple")).bind()
ret = workflow.create(dag).run_async(workflow_id=workflow_id)
ret = workflow.run_async(dag, workflow_id=workflow_id)
exist = workflow.get_output(workflow_id, name="simple")
non_exist = workflow.get_output(workflow_id, name="non_exist")
@ -215,11 +216,12 @@ def test_get_named_step_output_finished(workflow_start_regular, tmp_path):
return 2 * v
# Get the result from named step after workflow finished
assert 4 == workflow.create(
assert 4 == workflow.run(
double.options(**workflow.options(name="outer")).bind(
double.options(**workflow.options(name="inner")).bind(1)
),
workflow_id="double",
)
).run("double")
assert ray.get(workflow.get_output("double", name="inner")) == 2
assert ray.get(workflow.get_output("double", name="outer")) == 4
@ -237,12 +239,13 @@ def test_get_named_step_output_running(workflow_start_regular, tmp_path):
lock_path = str(tmp_path / "lock")
lock = FileLock(lock_path)
lock.acquire()
output = workflow.create(
output = workflow.run_async(
double.options(**workflow.options(name="outer")).bind(
double.options(**workflow.options(name="inner")).bind(1, lock_path),
lock_path,
),
workflow_id="double-2",
)
).run_async("double-2")
inner = workflow.get_output("double-2", name="inner")
outer = workflow.get_output("double-2", name="outer")
@ -276,11 +279,12 @@ def test_get_named_step_output_error(workflow_start_regular, tmp_path):
# Force it to fail for the outer step
with pytest.raises(Exception):
workflow.create(
workflow.run(
double.options(**workflow.options(name="outer")).bind(
double.options(**workflow.options(name="inner")).bind(1, False), True
),
workflow_id="double",
)
).run("double")
# For the inner step, it should have already been executed.
assert 2 == ray.get(workflow.get_output("double", name="inner"))
@ -298,7 +302,7 @@ def test_get_named_step_default(workflow_start_regular, tmp_path):
import math
assert math.factorial(5) == workflow.create(factorial.bind(5)).run("factorial")
assert math.factorial(5) == workflow.run(factorial.bind(5), workflow_id="factorial")
for i in range(5):
step_name = (
"test_basic_workflows_2.test_get_named_step_default.locals.factorial"
@ -319,7 +323,7 @@ def test_get_named_step_duplicate(workflow_start_regular):
inner = f.bind(10, None)
outer = f.bind(20, inner)
assert 20 == workflow.create(outer).run("duplicate")
assert 20 == workflow.run(outer, workflow_id="duplicate")
# The outer will be checkpointed first. So there is no suffix for the name
assert ray.get(workflow.get_output("duplicate", name="f")) == 10
# The inner will be checkpointed after the outer. And there is a duplicate
@ -332,7 +336,7 @@ def test_no_init_run(shutdown_only):
def f():
pass
workflow.create(f.bind()).run()
workflow.run(f.bind())
def test_no_init_api(shutdown_only):

View file

@ -16,10 +16,10 @@ def test_wf_run(workflow_start_regular_shared, tmp_path):
v = int(counter.read_text()) + 1
counter.write_text(str(v))
workflow.create(f.bind()).run("abc")
workflow.run(f.bind(), workflow_id="abc")
assert counter.read_text() == "1"
# This will not rerun the job from beginning
workflow.create(f.bind()).run("abc")
workflow.run(f.bind(), workflow_id="abc")
assert counter.read_text() == "1"
@ -37,8 +37,7 @@ def test_wf_no_run(shutdown_only):
def f2(*w):
pass
f = workflow.create(f2.bind(*[f1.bind() for _ in range(10)]))
f.run()
workflow.run(f2.bind(*[f1.bind() for _ in range(10)]))
def test_dedupe_indirect(workflow_start_regular_shared, tmp_path):
@ -66,11 +65,11 @@ def test_dedupe_indirect(workflow_start_regular_shared, tmp_path):
a = incr.bind()
i1 = identity.bind(a)
i2 = identity.bind(a)
assert "1" == workflow.create(join.bind(i1, i2)).run()
assert "2" == workflow.create(join.bind(i1, i2)).run()
assert "1" == workflow.run(join.bind(i1, i2))
assert "2" == workflow.run(join.bind(i1, i2))
# pass `a` multiple times
assert "3" == workflow.create(join.bind(a, a, a, a)).run()
assert "4" == workflow.create(join.bind(a, a, a, a)).run()
assert "3" == workflow.run(join.bind(a, a, a, a))
assert "4" == workflow.run(join.bind(a, a, a, a))
def test_run_off_main_thread(workflow_start_regular_shared):
@ -84,8 +83,7 @@ def test_run_off_main_thread(workflow_start_regular_shared):
def run():
global succ
# Setup the workflow.
data = workflow.create(fake_data.bind(10))
assert data.run(workflow_id="run") == list(range(10))
assert workflow.run(fake_data.bind(10), workflow_id="run") == list(range(10))
import threading
@ -106,7 +104,7 @@ def test_task_id_generation(workflow_start_regular_shared, request):
x = simple.options(**workflow.options(name="simple")).bind(x)
workflow_id = "test_task_id_generation"
ret = workflow.create(x).run_async(workflow_id=workflow_id)
ret = workflow.run_async(x, workflow_id=workflow_id)
outputs = [workflow.get_output(workflow_id, name="simple")]
for i in range(1, n):
outputs.append(workflow.get_output(workflow_id, name=f"simple_{i}"))

View file

@ -18,10 +18,9 @@ def test_cancellation(tmp_path, workflow_start_regular):
pass
workflow_id = "test_cancellation"
wf = workflow.create(simple.bind())
with filelock.FileLock(lock_b):
r = wf.run_async(workflow_id=workflow_id)
r = workflow.run_async(simple.bind(), workflow_id=workflow_id)
try:
ray.get(r, timeout=5)
except GetTimeoutError:

View file

@ -53,11 +53,12 @@ def _assert_step_checkpoints(wf_storage, task_id, mode):
def test_checkpoint_dag_skip_all(workflow_start_regular_shared):
outputs = workflow.create(
outputs = workflow.run(
checkpoint_dag.options(
**workflow.options(name="checkpoint_dag", checkpoint=False)
).bind(False)
).run(workflow_id="checkpoint_skip")
).bind(False),
workflow_id="checkpoint_skip",
)
assert np.isclose(outputs, 8388607.5)
recovered = ray.get(workflow.resume("checkpoint_skip"))
assert np.isclose(recovered, 8388607.5)
@ -70,9 +71,10 @@ def test_checkpoint_dag_skip_all(workflow_start_regular_shared):
def test_checkpoint_dag_skip_partial(workflow_start_regular_shared):
outputs = workflow.create(
checkpoint_dag.options(**workflow.options(name="checkpoint_dag")).bind(False)
).run(workflow_id="checkpoint_partial")
outputs = workflow.run(
checkpoint_dag.options(**workflow.options(name="checkpoint_dag")).bind(False),
workflow_id="checkpoint_partial",
)
assert np.isclose(outputs, 8388607.5)
recovered = ray.get(workflow.resume("checkpoint_partial"))
assert np.isclose(recovered, 8388607.5)
@ -85,9 +87,10 @@ def test_checkpoint_dag_skip_partial(workflow_start_regular_shared):
def test_checkpoint_dag_full(workflow_start_regular_shared):
outputs = workflow.create(
checkpoint_dag.options(**workflow.options(name="checkpoint_dag")).bind(True)
).run(workflow_id="checkpoint_whole")
outputs = workflow.run(
checkpoint_dag.options(**workflow.options(name="checkpoint_dag")).bind(True),
workflow_id="checkpoint_whole",
)
assert np.isclose(outputs, 8388607.5)
recovered = ray.get(workflow.resume("checkpoint_whole"))
assert np.isclose(recovered, 8388607.5)

View file

@ -39,9 +39,10 @@ def test_checkpoint_dag_recovery_skip(workflow_start_regular_shared):
start = time.time()
with pytest.raises(workflow.WorkflowExecutionError):
workflow.create(
checkpoint_dag.options(**workflow.options(checkpoint=False)).bind(False)
).run(workflow_id="checkpoint_skip_recovery")
workflow.run(
checkpoint_dag.options(**workflow.options(checkpoint=False)).bind(False),
workflow_id="checkpoint_skip_recovery",
)
run_duration_skipped = time.time() - start
utils.set_global_mark()
@ -62,8 +63,8 @@ def test_checkpoint_dag_recovery_partial(workflow_start_regular_shared):
start = time.time()
with pytest.raises(workflow.WorkflowExecutionError):
workflow.create(checkpoint_dag.bind(False)).run(
workflow_id="checkpoint_partial_recovery"
workflow.run(
checkpoint_dag.bind(False), workflow_id="checkpoint_partial_recovery"
)
run_duration_partial = time.time() - start
@ -84,9 +85,7 @@ def test_checkpoint_dag_recovery_whole(workflow_start_regular_shared):
start = time.time()
with pytest.raises(workflow.WorkflowExecutionError):
workflow.create(checkpoint_dag.bind(True)).run(
workflow_id="checkpoint_whole_recovery"
)
workflow.run(checkpoint_dag.bind(True), workflow_id="checkpoint_whole_recovery")
run_duration_whole = time.time() - start
utils.set_global_mark()

View file

@ -73,11 +73,10 @@ def test_workflow_with_pressure(workflow_start_regular_shared):
]
ans = ray.get([d.execute() for d in dags])
workflows = [workflow.create(d) for d in dags]
outputs = []
for _ in range(pressure_level):
for w in workflows:
outputs.append(w.run_async())
for w in dags:
outputs.append(workflow.run_async(w))
assert ray.get(outputs) == ans * pressure_level

View file

@ -28,7 +28,7 @@ def test_dag_to_workflow_execution(workflow_start_regular_shared):
return f"{lf},{rt};{b}"
with pytest.raises(TypeError):
workflow.create(begin.remote(1, 2, 3))
workflow.run_async(begin.remote(1, 2, 3))
with InputNode() as dag_input:
f = begin.bind(2, dag_input[1], a=dag_input.a)
@ -36,8 +36,10 @@ def test_dag_to_workflow_execution(workflow_start_regular_shared):
rt = right.bind(f, b=dag_input.b, pos=dag_input[0])
b = end.bind(lf, rt, b=dag_input.b)
wf = workflow.create(b, 2, 3.14, a=10, b="ok")
assert wf.run() == "left(23.14, hello, 10),right(23.14, ok, 2);ok"
assert (
workflow.run(b, 2, 3.14, a=10, b="ok")
== "left(23.14, hello, 10),right(23.14, ok, 2);ok"
)
def test_dedupe_serialization_dag(workflow_start_regular_shared):
@ -64,7 +66,7 @@ def test_dedupe_serialization_dag(workflow_start_regular_shared):
single = identity.bind((ref,))
double = identity.bind(list_of_refs)
result_ref, result_list = workflow.create(gather.bind(single, double)).run()
result_ref, result_list = workflow.run(gather.bind(single, double))
for result in result_list:
assert ray.get(*result_ref) == ray.get(result)
@ -84,10 +86,10 @@ def test_same_object_many_dags(workflow_start_regular_shared):
x = {0: ray.put(10)}
result1 = workflow.create(f.bind(x)).run()
result2 = workflow.create(f.bind(x)).run()
result1 = workflow.run(f.bind(x))
result2 = workflow.run(f.bind(x))
with InputNode() as dag_input:
result3 = workflow.create(f.bind(dag_input.x), x=x).run()
result3 = workflow.run(f.bind(dag_input.x), x=x)
assert ray.get(*result1) == 10
assert ray.get(*result2) == 10
@ -116,7 +118,7 @@ def test_dereference_object_refs(workflow_start_regular_shared):
dag = f.bind(g.bind(x=ray.put(314), y=[ray.put(2022)]))
# Run with workflow and normal Ray engine.
workflow.create(dag).run()
workflow.run(dag)
ray.get(dag.execute())
@ -164,7 +166,7 @@ def test_dereference_dags(workflow_start_regular_shared):
)
# Run with workflow and normal Ray engine.
assert workflow.create(dag).run() == "ok"
assert workflow.run(dag) == "ok"
assert ray.get(dag.execute()) == "ok"
@ -189,7 +191,7 @@ def test_workflow_continuation(workflow_start_regular_shared):
dag = f.bind()
assert ray.get(dag.execute()) == 43
assert workflow.create(dag).run() == 43
assert workflow.run(dag) == 43
if __name__ == "__main__":

View file

@ -41,7 +41,7 @@ def test_dataset(workflow_start_regular_shared):
transformed_ref = transform_dataset.bind(ds_ref)
output_ref = sum_dataset.bind(transformed_ref)
result = workflow.create(output_ref).run()
result = workflow.run(output_ref)
assert result == 2 * sum(range(1000))
@ -50,7 +50,7 @@ def test_dataset_1(workflow_start_regular_shared):
transformed_ref = transform_dataset.bind(ds_ref)
output_ref = sum_dataset.bind(transformed_ref)
result = workflow.create(output_ref).run()
result = workflow.run(output_ref)
assert result == 2 * sum(range(1000))
@ -59,7 +59,7 @@ def test_dataset_2(workflow_start_regular_shared):
transformed_ref = transform_dataset_1.bind(ds_ref)
output_ref = sum_dataset.bind(transformed_ref)
result = workflow.create(output_ref).run()
result = workflow.run(output_ref)
assert result == 2 * sum(range(1000))

View file

@ -13,11 +13,14 @@ def test_dynamic_workflow_ref(workflow_start_regular_shared):
return x + 1
# This test also shows different "style" of running workflows.
first_step = workflow.create(incr.bind(0))
assert first_step.run("test_dynamic_workflow_ref") == 1
second_step = workflow.create(incr.bind(WorkflowRef("incr")))
assert workflow.run(incr.bind(0), workflow_id="test_dynamic_workflow_ref") == 1
# Without rerun, it'll just return the previous result
assert second_step.run("test_dynamic_workflow_ref") == 1
assert (
workflow.run(
incr.bind(WorkflowRef("incr")), workflow_id="test_dynamic_workflow_ref"
)
== 1
)
# TODO (yic) We need re-run to make this test work
# assert second_step.run("test_dynamic_workflow_ref") == 2

View file

@ -19,7 +19,7 @@ def test_sleep(workflow_start_regular_shared):
def sleep_helper():
return workflow.continuation(after_sleep.bind(time.time(), workflow.sleep(2)))
start, end = workflow.create(sleep_helper.bind()).run()
start, end = workflow.run(sleep_helper.bind())
duration = end - start
assert 1 < duration
@ -31,7 +31,7 @@ def test_sleep_checkpointing(workflow_start_regular_shared):
sleep_step = workflow.sleep(2)
time.sleep(2)
start_time = time.time()
workflow.create(sleep_step).run()
workflow.run(sleep_step)
end_time = time.time()
duration = end_time - start_time
assert 1 < duration
@ -73,9 +73,7 @@ def test_wait_for_multiple_events(workflow_start_regular_shared):
event1_promise = workflow.wait_for_event(EventListener1)
event2_promise = workflow.wait_for_event(EventListener2)
promise = workflow.create(
trivial_step.bind(event1_promise, event2_promise)
).run_async()
promise = workflow.run_async(trivial_step.bind(event1_promise, event2_promise))
while not (
utils.check_global_mark("listener1") and utils.check_global_mark("listener2")
@ -118,7 +116,7 @@ def test_event_after_arg_resolution(workflow_start_regular_shared):
event_promise = workflow.wait_for_event(MyEventListener)
assert workflow.create(gather.bind(event_promise, triggers_event.bind())).run() == (
assert workflow.run(gather.bind(event_promise, triggers_event.bind())) == (
None,
None,
)
@ -156,7 +154,7 @@ def test_event_during_arg_resolution(workflow_start_regular_shared):
return args
event_promise = workflow.wait_for_event(MyEventListener)
assert workflow.create(gather.bind(event_promise, triggers_event.bind())).run() == (
assert workflow.run(gather.bind(event_promise, triggers_event.bind())) == (
None,
None,
)
@ -192,7 +190,7 @@ def test_crash_during_event_checkpointing(workflow_start_regular_shared):
pass
event_promise = workflow.wait_for_event(MyEventListener)
workflow.create(wait_then_finish.bind(event_promise)).run_async("workflow")
workflow.run_async(wait_then_finish.bind(event_promise), workflow_id="workflow")
while not utils.check_global_mark("time_to_die"):
time.sleep(0.1)
@ -247,7 +245,7 @@ def test_crash_after_commit(workflow_start_regular_shared):
await asyncio.sleep(1000000)
event_promise = workflow.wait_for_event(MyEventListener)
workflow.create(event_promise).run_async("workflow")
workflow.run_async(event_promise, workflow_id="workflow")
while not utils.check_global_mark("first"):
time.sleep(0.1)
@ -280,7 +278,9 @@ def test_event_as_workflow(workflow_start_regular_shared):
await asyncio.sleep(1)
utils.unset_global_mark()
promise = workflow.create(workflow.wait_for_event(MyEventListener)).run_async("wf")
promise = workflow.run_async(
workflow.wait_for_event(MyEventListener), workflow_id="wf"
)
assert workflow.get_status("wf") == workflow.WorkflowStatus.RUNNING

View file

@ -27,7 +27,7 @@ def test_simple_large_intermediate(workflow_start_regular_shared):
return workflow.continuation(average.bind(y))
start = time.time()
outputs = workflow.create(simple_large_intermediate.bind()).run()
outputs = workflow.run(simple_large_intermediate.bind())
print(f"duration = {time.time() - start}")
assert np.isclose(outputs, 8388607.5)

View file

@ -27,7 +27,7 @@ def foo(x):
if __name__ == "__main__":
ray.init()
output = workflow.create(foo.bind(0)).run_async(workflow_id="driver_terminated")
output = workflow.run_async(foo.bind(0), workflow_id="driver_terminated")
time.sleep({})
"""

View file

@ -19,7 +19,7 @@ def test_user_metadata(workflow_start_regular):
def simple():
return 0
workflow.create(simple.bind()).run(workflow_id, metadata=user_run_metadata)
workflow.run(simple.bind(), workflow_id=workflow_id, metadata=user_run_metadata)
assert workflow.get_metadata("simple")["user_metadata"] == user_run_metadata
assert (
@ -38,7 +38,7 @@ def test_user_metadata_empty(workflow_start_regular):
def simple():
return 0
workflow.create(simple.bind()).run(workflow_id)
workflow.run(simple.bind(), workflow_id=workflow_id)
assert workflow.get_metadata("simple")["user_metadata"] == {}
assert workflow.get_metadata("simple", "simple_step")["user_metadata"] == {}
@ -50,10 +50,10 @@ def test_user_metadata_not_dict(workflow_start_regular):
return 0
with pytest.raises(ValueError):
workflow.create(simple.options(**workflow.options(metadata="x")).bind())
workflow.run_async(simple.options(**workflow.options(metadata="x")).bind())
with pytest.raises(ValueError):
workflow.create(simple.bind()).run(metadata="x")
workflow.run(simple.bind(), metadata="x")
def test_user_metadata_not_json_serializable(workflow_start_regular):
@ -65,10 +65,12 @@ def test_user_metadata_not_json_serializable(workflow_start_regular):
pass
with pytest.raises(ValueError):
workflow.create(simple.options(**workflow.options(metadata={"x": X()})).bind())
workflow.run_async(
simple.options(**workflow.options(metadata={"x": X()})).bind()
)
with pytest.raises(ValueError):
workflow.create(simple.bind()).run(metadata={"x": X()})
workflow.run(simple.bind(), metadata={"x": X()})
def test_runtime_metadata(workflow_start_regular):
@ -82,7 +84,7 @@ def test_runtime_metadata(workflow_start_regular):
time.sleep(2)
return 0
workflow.create(simple.bind()).run(workflow_id)
workflow.run(simple.bind(), workflow_id=workflow_id)
workflow_metadata = workflow.get_metadata("simple")
assert "start_time" in workflow_metadata["stats"]
@ -113,7 +115,7 @@ def test_successful_workflow(workflow_start_regular):
time.sleep(2)
return 0
workflow.create(simple.bind()).run(workflow_id, metadata=user_run_metadata)
workflow.run(simple.bind(), workflow_id=workflow_id, metadata=user_run_metadata)
workflow_metadata = workflow.get_metadata("simple")
assert workflow_metadata["status"] == "SUCCESSFUL"
@ -145,7 +147,7 @@ def test_running_and_canceled_workflow(workflow_start_regular, tmp_path):
time.sleep(1000)
return 0
workflow.create(simple.bind()).run_async(workflow_id)
workflow.run_async(simple.bind(), workflow_id=workflow_id)
# Wait until step runs to make sure pre-run metadata is written
while not flag.exists():
@ -177,7 +179,7 @@ def test_failed_and_resumed_workflow(workflow_start_regular, tmp_path):
return 0
with pytest.raises(workflow.WorkflowExecutionError):
workflow.create(simple.bind()).run(workflow_id)
workflow.run(simple.bind(), workflow_id=workflow_id)
workflow_metadata_failed = workflow.get_metadata(workflow_id)
assert workflow_metadata_failed["status"] == "FAILED"
@ -213,7 +215,9 @@ def test_nested_workflow(workflow_start_regular):
time.sleep(2)
return workflow.continuation(inner.bind())
workflow.create(outer.bind()).run("nested", metadata={"workflow_k": "workflow_v"})
workflow.run(
outer.bind(), workflow_id="nested", metadata={"workflow_k": "workflow_v"}
)
workflow_metadata = workflow.get_metadata("nested")
outer_step_metadata = workflow.get_metadata("nested", "outer")
@ -251,7 +255,7 @@ def test_no_workflow_found(workflow_start_regular):
def simple():
return 0
workflow.create(simple.bind()).run(workflow_id)
workflow.run(simple.bind(), workflow_id=workflow_id)
with pytest.raises(ValueError) as excinfo:
workflow.get_metadata("simple1")

View file

@ -33,14 +33,14 @@ def test_objectref_inputs(workflow_start_regular_shared):
except Exception as e:
return False, str(e)
output, s = workflow.create(
output, s = workflow.run(
deref_check.bind(
ray.put(42),
nested_workflow.bind(10),
[nested_workflow.bind(9)],
[{"output": nested_workflow.bind(7)}],
)
).run()
)
assert output is True, s
@ -57,10 +57,10 @@ def test_objectref_outputs(workflow_start_regular_shared):
def return_objectrefs() -> List[ObjectRef]:
return [ray.put(x) for x in range(5)]
single = workflow.create(nested_ref_workflow.bind()).run()
single = workflow.run(nested_ref_workflow.bind())
assert ray.get(ray.get(single)) == 42
multi = workflow.create(return_objectrefs.bind()).run()
multi = workflow.run(return_objectrefs.bind())
assert ray.get(multi) == list(range(5))
@ -77,7 +77,7 @@ def test_object_deref(workflow_start_regular_shared):
@ray.remote
def return_workflow():
return workflow.create(empty_list.bind())
return empty_list.bind()
@ray.remote
def return_data() -> ray.ObjectRef:
@ -88,7 +88,7 @@ def test_object_deref(workflow_start_regular_shared):
return ray.get(data)
# test we are forbidden from directly passing workflow to Ray.
x = workflow.create(empty_list.bind())
x = empty_list.bind()
with pytest.raises(ValueError):
ray.put(x)
with pytest.raises(ValueError):
@ -98,7 +98,7 @@ def test_object_deref(workflow_start_regular_shared):
# test return object ref
obj = return_data.bind()
arr: np.ndarray = workflow.create(receive_data.bind(obj)).run()
arr: np.ndarray = workflow.run(receive_data.bind(obj))
assert np.array_equal(arr, np.ones(4096))

View file

@ -42,7 +42,7 @@ def test_dedupe_downloads_list(workflow_start_regular):
numbers = [ray.put(i) for i in range(5)]
workflows = [identity.bind(numbers) for _ in range(100)]
workflow.create(gather.bind(*workflows)).run()
workflow.run(gather.bind(*workflows))
ops = debug_store._logged_storage.get_op_counter()
get_objects_count = 0
@ -70,7 +70,7 @@ def test_dedupe_download_raw_ref(workflow_start_regular):
ref = ray.put("hello")
workflows = [identity.bind(ref) for _ in range(100)]
workflow.create(gather.bind(*workflows)).run()
workflow.run(gather.bind(*workflows))
ops = debug_store._logged_storage.get_op_counter()
get_objects_count = 0
@ -107,7 +107,7 @@ def test_nested_workflow_no_download(workflow_start_regular):
utils._alter_storage(debug_store)
ref = ray.put("hello")
result = workflow.create(recursive.bind([ref], 10)).run()
result = workflow.run(recursive.bind([ref], 10))
ops = debug_store._logged_storage.get_op_counter()
get_objects_count = 0
@ -140,7 +140,7 @@ def test_recovery_simple_1(workflow_start_regular):
workflow_id = "test_recovery_simple_1"
with pytest.raises(workflow.WorkflowExecutionError):
# internally we get WorkerCrashedError
workflow.create(the_failed_step.bind("x")).run(workflow_id=workflow_id)
workflow.run(the_failed_step.bind("x"), workflow_id=workflow_id)
assert workflow.get_status(workflow_id) == workflow.WorkflowStatus.FAILED
@ -162,7 +162,7 @@ def test_recovery_simple_2(workflow_start_regular):
workflow_id = "test_recovery_simple_2"
with pytest.raises(workflow.WorkflowExecutionError):
# internally we get WorkerCrashedError
workflow.create(simple.bind("x")).run(workflow_id=workflow_id)
workflow.run(simple.bind("x"), workflow_id=workflow_id)
assert workflow.get_status(workflow_id) == workflow.WorkflowStatus.FAILED
@ -196,7 +196,7 @@ def test_recovery_simple_3(workflow_start_regular):
workflow_id = "test_recovery_simple_3"
with pytest.raises(workflow.WorkflowExecutionError):
# internally we get WorkerCrashedError
workflow.create(simple.bind("x")).run(workflow_id=workflow_id)
workflow.run(simple.bind("x"), workflow_id=workflow_id)
assert workflow.get_status(workflow_id) == workflow.WorkflowStatus.FAILED
@ -240,7 +240,7 @@ def test_recovery_complex(workflow_start_regular):
workflow_id = "test_recovery_complex"
with pytest.raises(workflow.WorkflowExecutionError):
# internally we get WorkerCrashedError
workflow.create(complex.bind("x")).run(workflow_id=workflow_id)
workflow.run(complex.bind("x"), workflow_id=workflow_id)
assert workflow.get_status(workflow_id) == workflow.WorkflowStatus.FAILED
@ -281,8 +281,7 @@ def foo(x):
if __name__ == "__main__":
ray.init(storage="{tmp_path}")
workflow.init()
assert workflow.create(foo.bind(0)).run(workflow_id="cluster_failure") == 20
assert workflow.run(foo.bind(0), workflow_id="cluster_failure") == 20
"""
)
time.sleep(10)
@ -320,8 +319,7 @@ def foo(x):
if __name__ == "__main__":
ray.init(storage="{str(workflow_dir)}")
workflow.init()
assert workflow.create(foo.bind(0)).run(workflow_id="cluster_failure") == 20
assert workflow.run(foo.bind(0), workflow_id="cluster_failure") == 20
"""
)
time.sleep(10)
@ -346,7 +344,7 @@ def test_shortcut(workflow_start_regular):
else:
return 100
assert workflow.create(recursive_chain.bind(0)).run(workflow_id="shortcut") == 100
assert workflow.run(recursive_chain.bind(0), workflow_id="shortcut") == 100
# the shortcut points to the step with output checkpoint
store = workflow_storage.get_workflow_storage("shortcut")
task_id = store.get_entrypoint_step_id()
@ -361,7 +359,7 @@ def test_resume_different_storage(shutdown_only, tmp_path):
ray.init(storage=str(tmp_path))
workflow.init()
workflow.create(constant.bind()).run(workflow_id="const")
workflow.run(constant.bind(), workflow_id="const")
assert ray.get(workflow.resume(workflow_id="const")) == 31416
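
A condensed sketch of the recovery flow these tests rely on, with the worker-crash injection replaced by a simple flag file; all names and paths are illustrative. A workflow whose task keeps failing is reported as FAILED and can be picked up again with ``workflow.resume`` under the same id.

.. code-block:: python

    import os
    import ray
    from ray import workflow

    FLAG = "/tmp/recovery_demo_ready"  # illustrative stand-in for the crash injection

    @ray.remote
    def flaky():
        if not os.path.exists(FLAG):
            raise RuntimeError("simulated failure")
        return 20

    ray.init(storage="/tmp/workflow_recovery_demo")
    workflow.init()

    try:
        # Fails (even after retries) because the flag file does not exist yet.
        workflow.run(flaky.bind(), workflow_id="recovery_demo")
    except workflow.WorkflowExecutionError:
        assert workflow.get_status("recovery_demo") == workflow.WorkflowStatus.FAILED

    # Remove the failure condition, then resume from the stored workflow state.
    open(FLAG, "w").close()
    assert ray.get(workflow.resume(workflow_id="recovery_demo")) == 20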

View file

@ -60,7 +60,7 @@ def test_dedupe_serialization(workflow_start_regular_shared):
single = identity.bind((ref,))
double = identity.bind(list_of_refs)
workflow.create(gather.bind(single, double)).run()
workflow.run(gather.bind(single, double))
# One more for hashing the ref, and for uploading.
assert ray.get(counter.get_count.remote()) == 3
@ -75,7 +75,7 @@ def test_dedupe_serialization_2(workflow_start_regular_shared):
single = identity.bind((ref,))
double = identity.bind(list_of_refs)
result_ref, result_list = workflow.create(gather.bind(single, double)).run()
result_ref, result_list = workflow.run(gather.bind(single, double))
for result in result_list:
assert ray.get(*result_ref) == ray.get(result)
@ -96,8 +96,8 @@ def test_same_object_many_workflows(workflow_start_regular_shared):
x = {0: ray.put(10)}
result1 = workflow.create(f.bind(x)).run()
result2 = workflow.create(f.bind(x)).run()
result1 = workflow.run(f.bind(x))
result2 = workflow.run(f.bind(x))
print(result1)
print(result2)
@ -140,7 +140,7 @@ if __name__ == "__main__":
workflow.init()
arg = ray.put("hello world")
workflow.create(foo.bind([arg, arg])).run()
workflow.run(foo.bind([arg, arg]))
assert False
"""

View file

@ -16,21 +16,21 @@ def test_signature_check(workflow_start_regular):
# TODO(suquark): Ray DAG does not check the inputs. Fix it in Ray DAG.
with pytest.raises(TypeError):
workflow.create(signature_check.bind(1)).run()
workflow.run(signature_check.bind(1))
with pytest.raises(TypeError):
workflow.create(signature_check.bind(1, c=2)).run()
workflow.run(signature_check.bind(1, c=2))
with pytest.raises(TypeError):
workflow.create(signature_check.bind(1, 2, d=3)).run()
workflow.run(signature_check.bind(1, 2, d=3))
with pytest.raises(TypeError):
workflow.create(signature_check.bind(1, 2, 3, 4)).run()
workflow.run(signature_check.bind(1, 2, 3, 4))
workflow.create(signature_check.bind(1, 2, 3)).run()
workflow.create(signature_check.bind(1, 2, c=3)).run()
workflow.create(signature_check.bind(1, b=2, c=3)).run()
workflow.create(signature_check.bind(a=1, b=2, c=3)).run()
workflow.run(signature_check.bind(1, 2, 3))
workflow.run(signature_check.bind(1, 2, c=3))
workflow.run(signature_check.bind(1, b=2, c=3))
workflow.run(signature_check.bind(a=1, b=2, c=3))
if __name__ == "__main__":

View file

@ -39,7 +39,7 @@ def test_delete(workflow_start_regular):
time.sleep(1000000)
return x
workflow.create(never_ends.bind("hello world")).run_async("never_finishes")
workflow.run_async(never_ends.bind("hello world"), workflow_id="never_finishes")
# Make sure the step is actually executing before killing the cluster
while not utils.check_global_mark():
@ -75,7 +75,7 @@ def test_delete(workflow_start_regular):
def basic_step(arg):
return arg
result = workflow.create(basic_step.bind("hello world")).run(workflow_id="finishes")
result = workflow.run(basic_step.bind("hello world"), workflow_id="finishes")
assert result == "hello world"
output = workflow.get_output("finishes")
assert ray.get(output) == "hello world"
@ -98,7 +98,7 @@ def test_delete(workflow_start_regular):
assert workflow.list_all() == []
# The workflow can be re-run as if it was never run before.
assert workflow.create(basic_step.bind("123")).run(workflow_id="finishes") == "123"
assert workflow.run(basic_step.bind("123"), workflow_id="finishes") == "123"
# utils.unset_global_mark()
# never_ends.step("123").run_async(workflow_id="never_finishes")
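
A compact sketch of the delete-and-rerun behavior covered above. The workflow id is illustrative, and ``workflow.delete`` is assumed to be the deletion call elided between the hunks shown here: once a finished workflow is deleted, its id can be reused as if it had never run.

.. code-block:: python

    import ray
    from ray import workflow

    @ray.remote
    def basic_step(arg):
        return arg

    assert workflow.run(basic_step.bind("hello world"), workflow_id="finishes") == "hello world"
    assert ray.get(workflow.get_output("finishes")) == "hello world"

    # Assumption: deleting removes the stored result and frees the id.
    workflow.delete("finishes")
    assert workflow.list_all() == []

    # The id is free again; the workflow runs from scratch as if it never existed.
    assert workflow.run(basic_step.bind("123"), workflow_id="finishes") == "123"
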
@ -252,7 +252,7 @@ def test_cluster_storage_init(workflow_start_cluster, tmp_path):
def f():
return 10
assert workflow.create(f.bind()).run() == 10
assert workflow.run(f.bind()) == 10
if __name__ == "__main__":

View file

@ -49,7 +49,7 @@ def construct_workflow(length: int):
for i in range(length):
x0, x1, x2 = results[-2], results[-1], str(i)
results.append(scan.bind(x0, x1, x2))
return workflow.create(results[-1])
return results[-1]
def _locate_initial_commit(debug_store: DebugStorage) -> int:

View file

@ -19,7 +19,7 @@ def test_variable_mutable(workflow_start_regular):
a = identity.bind(x)
x.append(1)
b = identity.bind(x)
assert workflow.create(projection.bind(a, b)).run() == []
assert workflow.run(projection.bind(a, b)) == []
if __name__ == "__main__":

View file

@ -32,8 +32,7 @@ def test_workflow_manager(workflow_start_regular, tmp_path):
return 100
outputs = [
workflow.create(long_running.bind(i)).run_async(workflow_id=str(i))
for i in range(100)
workflow.run_async(long_running.bind(i), workflow_id=str(i)) for i in range(100)
]
# Test list_all; it should list all running workflows
all_tasks = workflow.list_all()