From d0b879cdb1401192ebb5e6708ce2c705f1f348d0 Mon Sep 17 00:00:00 2001 From: Yi Cheng <74173148+iycheng@users.noreply.github.com> Date: Thu, 1 Sep 2022 03:27:32 +0000 Subject: [PATCH] [workflow] Change name in step to task_id (#28151) We've deprecated the name options and use task_id. This is the cleanup to fix everything left. --- doc/source/workflows/basics.rst | 17 ++++---- doc/source/workflows/metadata.rst | 10 ++--- python/ray/workflow/api.py | 26 ++++++------ .../examples/comparisons/argo/dag_workflow.py | 8 ++-- .../comparisons/argo/multi_step_workflow.py | 6 +-- .../workflow/tests/test_basic_workflows.py | 4 +- .../workflow/tests/test_basic_workflows_2.py | 40 +++++++++---------- .../workflow/tests/test_basic_workflows_3.py | 4 +- .../workflow/tests/test_basic_workflows_4.py | 6 +-- python/ray/workflow/tests/test_checkpoint.py | 14 ++++--- python/ray/workflow/tests/test_logging.py | 10 ++--- python/ray/workflow/tests/test_metadata.py | 30 +++++++------- python/ray/workflow/workflow_state.py | 4 +- .../ray/workflow/workflow_state_from_dag.py | 18 +++++---- .../workflow/workflow_state_from_storage.py | 2 +- python/ray/workflow/workflow_storage.py | 14 ++++--- 16 files changed, 110 insertions(+), 103 deletions(-) diff --git a/doc/source/workflows/basics.rst b/doc/source/workflows/basics.rst index aab311f98..d8ff1cc20 100644 --- a/doc/source/workflows/basics.rst +++ b/doc/source/workflows/basics.rst @@ -128,16 +128,15 @@ workflow ids, call ``ray.workflow.list_all()``. Sub-Task Results ~~~~~~~~~~~~~~~~ -We can retrieve the results for individual workflow tasks too with *named tasks*. A task can be named in two ways: +We can retrieve the results for individual workflow tasks too with *task id*. Task ID can be given with ``task_id``: - 1) via ``.options(**workflow.options(name="task_name"))`` - 2) via decorator ``@workflow.options(name="task_name")`` + 1) via ``.options(**workflow.options(task_id="task_name"))`` + 2) via decorator ``@workflow.options(task_id="task_name")`` -If tasks are not given ``task_name``, the function name of the steps is set as the ``task_name``. -The ID of the task would be the same as the name. If there are multiple tasks with the same name, -a suffix with a counter ``_n`` will be added. +If tasks are not given ``task_id``, the function name of the steps is set as the ``task_id``. +If there are multiple tasks with the same id, a suffix with a counter ``_n`` will be added. -Once a task is given a name, the result of the task will be retrievable via ``workflow.get_output(workflow_id, task_id="task_name")``. +Once a task id is given, the result of the task will be retrievable via ``workflow.get_output(workflow_id, task_id="task_id")``. If the task with the given ``task_id`` hasn't been executed before the workflow completes, an exception will be thrown. Here are some examples: .. code-block:: python @@ -156,8 +155,8 @@ If the task with the given ``task_id`` hasn't been executed before the workflow def double(v): return 2 * v - inner_task = double.options(**workflow.options(name="inner")).bind(1) - outer_task = double.options(**workflow.options(name="outer")).bind(inner_task) + inner_task = double.options(**workflow.options(task_id="inner")).bind(1) + outer_task = double.options(**workflow.options(task_id="outer")).bind(inner_task) result_ref = workflow.run_async(outer_task, workflow_id="double") inner = workflow.get_output_async(workflow_id, task_id="inner") diff --git a/doc/source/workflows/metadata.rst b/doc/source/workflows/metadata.rst index f42880d71..e7dfee9ac 100644 --- a/doc/source/workflows/metadata.rst +++ b/doc/source/workflows/metadata.rst @@ -34,10 +34,10 @@ providing the task name: workflow.run( add.options( - **workflow.options(name="add_task") + **workflow.options(task_id="add_task") ).bind(10, 20), workflow_id="add_example_2") - task_metadata = workflow.get_metadata("add_example_2", name="add_task") + task_metadata = workflow.get_metadata("add_example_2", task_id="add_task") assert "start_time" in workflow_metadata["stats"] assert "end_time" in workflow_metadata["stats"] @@ -53,11 +53,11 @@ workflow or workflow task. .. code-block:: python - workflow.run(add.options(**workflow.options(name="add_task", metadata={"task_k": "task_v"})).bind(10, 20), + workflow.run(add.options(**workflow.options(task_id="add_task", metadata={"task_k": "task_v"})).bind(10, 20), workflow_id="add_example_3", metadata={"workflow_k": "workflow_v"}) assert workflow.get_metadata("add_example_3")["user_metadata"] == {"workflow_k": "workflow_v"} - assert workflow.get_metadata("add_example_3", name="add_task")["user_metadata"] == {"task_k": "task_v"} + assert workflow.get_metadata("add_example_3", task_id="add_task")["user_metadata"] == {"task_k": "task_v"} **Note: user-defined metadata must be a python dictionary with values that are JSON serializable.** @@ -72,7 +72,7 @@ Available Metrics **Task level** -- name: name of the task, either provided by the user via ``task.options(**workflow.options(name=xxx))`` or generated by the system. +- name: name of the task, either provided by the user via ``task.options(**workflow.options(task_id=xxx))`` or generated by the system. - task_options: options of the task, either provided by the user via ``task.options()`` or default by system. - user_metadata: a python dictionary of custom metadata by the user via ``task.options()``. - stats: task running stats, including task start time and end time. diff --git a/python/ray/workflow/api.py b/python/ray/workflow/api.py index 83034e1c5..873806af9 100644 --- a/python/ray/workflow/api.py +++ b/python/ray/workflow/api.py @@ -294,18 +294,18 @@ def resume_async(workflow_id: str) -> ray.ObjectRef: @PublicAPI(stability="alpha") -def get_output(workflow_id: str, *, name: Optional[str] = None) -> Any: +def get_output(workflow_id: str, *, task_id: Optional[str] = None) -> Any: """Get the output of a running workflow. Args: workflow_id: The workflow to get the output of. - name: If set, fetch the specific task instead of the output of the + task_id: If set, fetch the specific task instead of the output of the workflow. Examples: >>> from ray import workflow >>> start_trip = ... # doctest: +SKIP - >>> trip = start_trip.options(name="trip").bind() # doctest: +SKIP + >>> trip = start_trip.options(task_id="trip").bind() # doctest: +SKIP >>> res1 = workflow.run_async(trip, workflow_id="trip1") # doctest: +SKIP >>> # you could "get_output()" in another machine >>> res2 = workflow.get_output_async("trip1") # doctest: +SKIP @@ -316,7 +316,7 @@ def get_output(workflow_id: str, *, name: Optional[str] = None) -> Any: Returns: The output of the workflow task. """ - return ray.get(get_output_async(workflow_id, task_id=name)) + return ray.get(get_output_async(workflow_id, task_id=task_id)) @PublicAPI(stability="alpha") @@ -596,20 +596,20 @@ def sleep(duration: float) -> "DAGNode[Event]": @PublicAPI(stability="alpha") @client_mode_wrap -def get_metadata(workflow_id: str, name: Optional[str] = None) -> Dict[str, Any]: +def get_metadata(workflow_id: str, task_id: Optional[str] = None) -> Dict[str, Any]: """Get the metadata of the workflow. This will return a dict of metadata of either the workflow ( if only workflow_id is given) or a specific workflow task (if - both workflow_id and task name are given). Exception will be - raised if the given workflow id or task name does not exist. + both workflow_id and task id are given). Exception will be + raised if the given workflow id or task id does not exist. If only workflow id is given, this will return metadata on workflow level, which includes running status, workflow-level user metadata and workflow-level running stats (e.g. the start time and end time of the workflow). - If both workflow id and task name are given, this will return + If both workflow id and task id are given, this will return metadata on workflow task level, which includes task inputs, task-level user metadata and task-level running stats (e.g. the start time and end time of the task). @@ -617,14 +617,14 @@ def get_metadata(workflow_id: str, name: Optional[str] = None) -> Dict[str, Any] Args: workflow_id: The workflow to get the metadata of. - name: If set, fetch the metadata of the specific task instead of + task_id: If set, fetch the metadata of the specific task instead of the metadata of the workflow. Examples: >>> from ray import workflow >>> trip = ... # doctest: +SKIP >>> workflow_task = trip.options( # doctest: +SKIP - ... **workflow.options(name="trip", metadata={"k1": "v1"})).bind() + ... **workflow.options(task_id="trip", metadata={"k1": "v1"})).bind() >>> workflow.run(workflow_task, # doctest: +SKIP ... workflow_id="trip1", metadata={"k2": "v2"}) >>> workflow_metadata = workflow.get_metadata("trip1") # doctest: +SKIP @@ -646,10 +646,10 @@ def get_metadata(workflow_id: str, name: Optional[str] = None) -> Dict[str, Any] """ _ensure_workflow_initialized() store = WorkflowStorage(workflow_id) - if name is None: + if task_id is None: return store.load_workflow_metadata() else: - return store.load_task_metadata(name) + return store.load_task_metadata(task_id) @PublicAPI(stability="alpha") @@ -751,7 +751,7 @@ class options: # TODO(suquark): More rigid arguments check like @ray.remote arguments. This is # fairly complex, but we should enable it later. valid_options = { - "name", + "task_id", "metadata", "catch_exceptions", "checkpoint", diff --git a/python/ray/workflow/examples/comparisons/argo/dag_workflow.py b/python/ray/workflow/examples/comparisons/argo/dag_workflow.py index a8e28634c..5bf245605 100644 --- a/python/ray/workflow/examples/comparisons/argo/dag_workflow.py +++ b/python/ray/workflow/examples/comparisons/argo/dag_workflow.py @@ -8,8 +8,8 @@ def echo(msg: str, *deps) -> None: if __name__ == "__main__": - A = echo.options(**workflow.options(name="A")).bind("A") - B = echo.options(**workflow.options(name="B")).bind("B", A) - C = echo.options(**workflow.options(name="C")).bind("C", A) - D = echo.options(**workflow.options(name="D")).bind("D", A, B) + A = echo.options(**workflow.options(task_id="A")).bind("A") + B = echo.options(**workflow.options(task_id="B")).bind("B", A) + C = echo.options(**workflow.options(task_id="C")).bind("C", A) + D = echo.options(**workflow.options(task_id="D")).bind("D", A, B) workflow.run(D) diff --git a/python/ray/workflow/examples/comparisons/argo/multi_step_workflow.py b/python/ray/workflow/examples/comparisons/argo/multi_step_workflow.py index b7d969a33..d4b4f0ae7 100644 --- a/python/ray/workflow/examples/comparisons/argo/multi_step_workflow.py +++ b/python/ray/workflow/examples/comparisons/argo/multi_step_workflow.py @@ -13,7 +13,7 @@ def wait_all(*args) -> None: if __name__ == "__main__": - h1 = hello.options(**workflow.options(name="hello1")).bind("hello1") - h2a = hello.options(**workflow.options(name="hello2a")).bind("hello2a") - h2b = hello.options(**workflow.options(name="hello2b")).bind("hello2b", h2a) + h1 = hello.options(**workflow.options(task_id="hello1")).bind("hello1") + h2a = hello.options(**workflow.options(task_id="hello2a")).bind("hello2a") + h2b = hello.options(**workflow.options(task_id="hello2b")).bind("hello2b", h2a) workflow.run(wait_all.bind(h1, h2b)) diff --git a/python/ray/workflow/tests/test_basic_workflows.py b/python/ray/workflow/tests/test_basic_workflows.py index a66bb9817..cab92675e 100644 --- a/python/ray/workflow/tests/test_basic_workflows.py +++ b/python/ray/workflow/tests/test_basic_workflows.py @@ -173,7 +173,7 @@ def test_dynamic_output(workflow_start_regular_shared): if n < 3: raise Exception("Failed intentionally") return workflow.continuation( - exponential_fail.options(**workflow.options(name=f"task_{n}")).bind( + exponential_fail.options(**workflow.options(task_id=f"task_{n}")).bind( k * 2, n - 1 ) ) @@ -183,7 +183,7 @@ def test_dynamic_output(workflow_start_regular_shared): # latest successful task. try: workflow.run( - exponential_fail.options(**workflow.options(name="task_0")).bind(3, 10), + exponential_fail.options(**workflow.options(task_id="task_0")).bind(3, 10), workflow_id="dynamic_output", ) except Exception: diff --git a/python/ray/workflow/tests/test_basic_workflows_2.py b/python/ray/workflow/tests/test_basic_workflows_2.py index 7a3230f51..6fd2a6be8 100644 --- a/python/ray/workflow/tests/test_basic_workflows_2.py +++ b/python/ray/workflow/tests/test_basic_workflows_2.py @@ -114,13 +114,13 @@ def test_get_output_4(workflow_start_regular, tmp_path): with FileLock(lock_path): return 42 return workflow.continuation( - recursive.options(**workflow.options(name=str(n - 1))).bind(n - 1) + recursive.options(**workflow.options(task_id=str(n - 1))).bind(n - 1) ) workflow_id = "test_get_output_4" lock.acquire() obj = workflow.run_async( - recursive.options(**workflow.options(name="10")).bind(10), + recursive.options(**workflow.options(task_id="10")).bind(10), workflow_id=workflow_id, ) @@ -160,8 +160,8 @@ def test_output_with_name(workflow_start_regular): def double(v): return 2 * v - inner_task = double.options(**workflow.options(name="inner")).bind(1) - outer_task = double.options(**workflow.options(name="outer")).bind(inner_task) + inner_task = double.options(**workflow.options(task_id="inner")).bind(1) + outer_task = double.options(**workflow.options(task_id="outer")).bind(inner_task) result = workflow.run_async(outer_task, workflow_id="double") inner = workflow.get_output_async("double", task_id="inner") outer = workflow.get_output_async("double", task_id="outer") @@ -170,7 +170,7 @@ def test_output_with_name(workflow_start_regular): assert ray.get(outer) == 4 assert ray.get(result) == 4 - @workflow.options(name="double") + @workflow.options(task_id="double") @ray.remote def double_2(s): return s * 2 @@ -199,7 +199,7 @@ def test_get_non_exist_output(workflow_start_regular, tmp_path): workflow_id = "test_get_non_exist_output" with FileLock(lock_path): - dag = simple.options(**workflow.options(name="simple")).bind() + dag = simple.options(**workflow.options(task_id="simple")).bind() ret = workflow.run_async(dag, workflow_id=workflow_id) exist = workflow.get_output_async(workflow_id, task_id="simple") non_exist = workflow.get_output_async(workflow_id, task_id="non_exist") @@ -217,13 +217,13 @@ def test_get_named_task_output_finished(workflow_start_regular, tmp_path): # Get the result from named task after workflow finished assert 4 == workflow.run( - double.options(**workflow.options(name="outer")).bind( - double.options(**workflow.options(name="inner")).bind(1) + double.options(**workflow.options(task_id="outer")).bind( + double.options(**workflow.options(task_id="inner")).bind(1) ), workflow_id="double", ) - assert workflow.get_output("double", name="inner") == 2 - assert workflow.get_output("double", name="outer") == 4 + assert workflow.get_output("double", task_id="inner") == 2 + assert workflow.get_output("double", task_id="outer") == 4 def test_get_named_task_output_running(workflow_start_regular, tmp_path): @@ -240,8 +240,8 @@ def test_get_named_task_output_running(workflow_start_regular, tmp_path): lock = FileLock(lock_path) lock.acquire() output = workflow.run_async( - double.options(**workflow.options(name="outer")).bind( - double.options(**workflow.options(name="inner")).bind(1, lock_path), + double.options(**workflow.options(task_id="outer")).bind( + double.options(**workflow.options(task_id="inner")).bind(1, lock_path), lock_path, ), workflow_id="double-2", @@ -280,16 +280,16 @@ def test_get_named_task_output_error(workflow_start_regular, tmp_path): # Force it to fail for the outer task with pytest.raises(Exception): workflow.run( - double.options(**workflow.options(name="outer")).bind( - double.options(**workflow.options(name="inner")).bind(1, False), True + double.options(**workflow.options(task_id="outer")).bind( + double.options(**workflow.options(task_id="inner")).bind(1, False), True ), workflow_id="double", ) # For the inner task, it should have already been executed. - assert 2 == workflow.get_output("double", name="inner") + assert 2 == workflow.get_output("double", task_id="inner") with pytest.raises(Exception): - workflow.get_output("double", name="outer") + workflow.get_output("double", task_id="outer") def test_get_named_task_default(workflow_start_regular, tmp_path): @@ -311,11 +311,11 @@ def test_get_named_task_default(workflow_start_regular, tmp_path): if i != 0: task_name += "_" + str(i) # All outputs will be 120 - assert math.factorial(5) == workflow.get_output("factorial", name=task_name) + assert math.factorial(5) == workflow.get_output("factorial", task_id=task_name) def test_get_named_task_duplicate(workflow_start_regular): - @workflow.options(name="f") + @workflow.options(task_id="f") @ray.remote def f(n, dep): return n @@ -324,10 +324,10 @@ def test_get_named_task_duplicate(workflow_start_regular): outer = f.bind(20, inner) assert 20 == workflow.run(outer, workflow_id="duplicate") # The outer will be checkpointed first. So there is no suffix for the name - assert workflow.get_output("duplicate", name="f") == 10 + assert workflow.get_output("duplicate", task_id="f") == 10 # The inner will be checkpointed after the outer. And there is a duplicate # for the name. suffix _1 is added automatically - assert workflow.get_output("duplicate", name="f_1") == 20 + assert workflow.get_output("duplicate", task_id="f_1") == 20 if __name__ == "__main__": diff --git a/python/ray/workflow/tests/test_basic_workflows_3.py b/python/ray/workflow/tests/test_basic_workflows_3.py index 48650e1a2..d0f02b3d8 100644 --- a/python/ray/workflow/tests/test_basic_workflows_3.py +++ b/python/ray/workflow/tests/test_basic_workflows_3.py @@ -81,10 +81,10 @@ def test_task_id_generation(workflow_start_regular_shared, request): def simple(x): return x + 1 - x = simple.options(**workflow.options(name="simple")).bind(-1) + x = simple.options(**workflow.options(task_id="simple")).bind(-1) n = 20 for i in range(1, n): - x = simple.options(**workflow.options(name="simple")).bind(x) + x = simple.options(**workflow.options(task_id="simple")).bind(x) workflow_id = "test_task_id_generation" ret = workflow.run_async(x, workflow_id=workflow_id) diff --git a/python/ray/workflow/tests/test_basic_workflows_4.py b/python/ray/workflow/tests/test_basic_workflows_4.py index 611a3d217..622420607 100644 --- a/python/ray/workflow/tests/test_basic_workflows_4.py +++ b/python/ray/workflow/tests/test_basic_workflows_4.py @@ -28,7 +28,7 @@ def test_options_update(shutdown_only): # Options are given in decorator first, then in the first .options() # and finally in the second .options() - @workflow.options(name="old_name", metadata={"k": "v"}) + @workflow.options(task_id="old_name", metadata={"k": "v"}) @ray.remote(num_cpus=2, max_retries=1) def f(): return @@ -39,7 +39,7 @@ def test_options_update(shutdown_only): # max_retries only defined in the decorator and it got preserved all the way new_f = f.options( num_returns=2, - **workflow.options(name="new_name", metadata={"extra_k2": "extra_v2"}), + **workflow.options(task_id="new_name", metadata={"extra_k2": "extra_v2"}), ) options = new_f.bind().get_options() assert options == { @@ -48,7 +48,7 @@ def test_options_update(shutdown_only): "max_retries": 1, "_metadata": { WORKFLOW_OPTIONS: { - "name": "new_name", + "task_id": "new_name", "metadata": {"extra_k2": "extra_v2"}, } }, diff --git a/python/ray/workflow/tests/test_checkpoint.py b/python/ray/workflow/tests/test_checkpoint.py index eab4d37ee..65b885eee 100644 --- a/python/ray/workflow/tests/test_checkpoint.py +++ b/python/ray/workflow/tests/test_checkpoint.py @@ -24,20 +24,20 @@ def checkpoint_dag(checkpoint): return np.mean(x) x = large_input.options( - **workflow.options(name="large_input", checkpoint=checkpoint) + **workflow.options(task_id="large_input", checkpoint=checkpoint) ).bind() y = identity.options( - **workflow.options(name="identity", checkpoint=checkpoint) + **workflow.options(task_id="identity", checkpoint=checkpoint) ).bind(x) return workflow.continuation( - average.options(**workflow.options(name="average")).bind(y) + average.options(**workflow.options(task_id="average")).bind(y) ) def test_checkpoint_dag_skip_all(workflow_start_regular_shared): outputs = workflow.run( checkpoint_dag.options( - **workflow.options(name="checkpoint_dag", checkpoint=False) + **workflow.options(task_id="checkpoint_dag", checkpoint=False) ).bind(False), workflow_id="checkpoint_skip", ) @@ -58,7 +58,9 @@ def test_checkpoint_dag_skip_all(workflow_start_regular_shared): def test_checkpoint_dag_skip_partial(workflow_start_regular_shared): outputs = workflow.run( - checkpoint_dag.options(**workflow.options(name="checkpoint_dag")).bind(False), + checkpoint_dag.options(**workflow.options(task_id="checkpoint_dag")).bind( + False + ), workflow_id="checkpoint_partial", ) assert np.isclose(outputs, 8388607.5) @@ -78,7 +80,7 @@ def test_checkpoint_dag_skip_partial(workflow_start_regular_shared): def test_checkpoint_dag_full(workflow_start_regular_shared): outputs = workflow.run( - checkpoint_dag.options(**workflow.options(name="checkpoint_dag")).bind(True), + checkpoint_dag.options(**workflow.options(task_id="checkpoint_dag")).bind(True), workflow_id="checkpoint_whole", ) assert np.isclose(outputs, 8388607.5) diff --git a/python/ray/workflow/tests/test_logging.py b/python/ray/workflow/tests/test_logging.py index 849d3c03e..2763ff726 100644 --- a/python/ray/workflow/tests/test_logging.py +++ b/python/ray/workflow/tests/test_logging.py @@ -9,7 +9,7 @@ from ray import workflow ray.init(address='auto') -@ray.remote(**workflow.options(name="f")) +@ray.remote(**workflow.options(task_id="f")) def f(): return 10 @@ -34,11 +34,11 @@ from ray import workflow ray.init(address='auto') -@ray.remote(**workflow.options(name="f1")) +@ray.remote(**workflow.options(task_id="f1")) def f1(): return 10 -@ray.remote(**workflow.options(name="f2")) +@ray.remote(**workflow.options(task_id="f2")) def f2(x): return x+1 @@ -65,11 +65,11 @@ from ray import workflow ray.init(address='auto') -@ray.remote(**workflow.options(name="f3")) +@ray.remote(**workflow.options(task_id="f3")) def f3(x): return x+1 -@ray.remote(**workflow.options(name="f4")) +@ray.remote(**workflow.options(task_id="f4")) def f4(x): return f3.bind(x*2) diff --git a/python/ray/workflow/tests/test_metadata.py b/python/ray/workflow/tests/test_metadata.py index 4b59ef4b6..d84b92f6d 100644 --- a/python/ray/workflow/tests/test_metadata.py +++ b/python/ray/workflow/tests/test_metadata.py @@ -11,10 +11,10 @@ def test_user_metadata(workflow_start_regular): user_task_metadata = {"k1": "v1"} user_run_metadata = {"k2": "v2"} - task_name = "simple_task" + task_id = "simple_task" workflow_id = "simple" - @workflow.options(name=task_name, metadata=user_task_metadata) + @workflow.options(task_id=task_id, metadata=user_task_metadata) @ray.remote def simple(): return 0 @@ -30,10 +30,10 @@ def test_user_metadata(workflow_start_regular): def test_user_metadata_empty(workflow_start_regular): - task_name = "simple_task" + task_id = "simple_task" workflow_id = "simple" - @workflow.options(name=task_name) + @workflow.options(task_id=task_id) @ray.remote def simple(): return 0 @@ -75,10 +75,10 @@ def test_user_metadata_not_json_serializable(workflow_start_regular): def test_runtime_metadata(workflow_start_regular): - task_name = "simple_task" + task_id = "simple_task" workflow_id = "simple" - @workflow.options(name=task_name) + @workflow.options(task_id=task_id) @ray.remote def simple(): time.sleep(2) @@ -106,10 +106,10 @@ def test_successful_workflow(workflow_start_regular): user_task_metadata = {"k1": "v1"} user_run_metadata = {"k2": "v2"} - task_name = "simple_task" + task_id = "simple_task" workflow_id = "simple" - @workflow.options(name=task_name, metadata=user_task_metadata) + @workflow.options(task_id=task_id, metadata=user_task_metadata) @ray.remote def simple(): time.sleep(2) @@ -202,13 +202,13 @@ def test_failed_and_resumed_workflow(workflow_start_regular, tmp_path): def test_nested_workflow(workflow_start_regular): - @workflow.options(name="inner", metadata={"inner_k": "inner_v"}) + @workflow.options(task_id="inner", metadata={"inner_k": "inner_v"}) @ray.remote def inner(): time.sleep(2) return 10 - @workflow.options(name="outer", metadata={"outer_k": "outer_v"}) + @workflow.options(task_id="outer", metadata={"outer_k": "outer_v"}) @ray.remote def outer(): time.sleep(2) @@ -246,24 +246,24 @@ def test_nested_workflow(workflow_start_regular): def test_no_workflow_found(workflow_start_regular): - task_name = "simple_task" + task_id = "simple_task" workflow_id = "simple" - @workflow.options(name=task_name) + @workflow.options(task_id=task_id) @ray.remote def simple(): return 0 workflow.run(simple.bind(), workflow_id=workflow_id) - with pytest.raises(ValueError, match="No such workflow_id simple1"): + with pytest.raises(ValueError, match="No such workflow_id 'simple1'"): workflow.get_metadata("simple1") - with pytest.raises(ValueError, match="No such workflow_id simple1"): + with pytest.raises(ValueError, match="No such workflow_id 'simple1'"): workflow.get_metadata("simple1", "simple_task") with pytest.raises( - ValueError, match="No such task_id simple_task1 in workflow simple" + ValueError, match="No such task_id 'simple_task1' in workflow 'simple'" ): workflow.get_metadata("simple", "simple_task1") diff --git a/python/ray/workflow/workflow_state.py b/python/ray/workflow/workflow_state.py index 7d9ee4873..19a7cfad3 100644 --- a/python/ray/workflow/workflow_state.py +++ b/python/ray/workflow/workflow_state.py @@ -32,14 +32,14 @@ class TaskExecutionMetadata: class Task: """Data class for a workflow task.""" - name: str + task_id: str options: WorkflowTaskRuntimeOptions user_metadata: Dict func_body: Optional[Callable] def to_dict(self) -> Dict: return { - "name": self.name, + "task_id": self.task_id, "task_options": self.options.to_dict(), "user_metadata": self.user_metadata, } diff --git a/python/ray/workflow/workflow_state_from_dag.py b/python/ray/workflow/workflow_state_from_dag.py index b9f1d4578..9eed25078 100644 --- a/python/ray/workflow/workflow_state_from_dag.py +++ b/python/ray/workflow/workflow_state_from_dag.py @@ -103,7 +103,7 @@ def workflow_state_from_dag( if num_returns > 1: raise ValueError("Workflow task can only have one return.") - workflow_options = bound_options.pop("_metadata", {}).get( + workflow_options = bound_options.get("_metadata", {}).get( WORKFLOW_OPTIONS, {} ) @@ -170,17 +170,21 @@ def workflow_state_from_dag( ) input_placeholder: ray.ObjectRef = ray.put(flattened_args) - name = workflow_options.get("name") - if name is None: - name = f"{get_module(node._body)}.{slugify(get_qualname(node._body))}" - task_id = ray.get(mgr.gen_task_id.remote(workflow_id, name)) + orig_task_id = workflow_options.get("task_id", None) + if orig_task_id is None: + orig_task_id = ( + f"{get_module(node._body)}.{slugify(get_qualname(node._body))}" + ) + + task_id = ray.get(mgr.gen_task_id.remote(workflow_id, orig_task_id)) state.add_dependencies(task_id, [s.task_id for s in workflow_refs]) state.task_input_args[task_id] = input_placeholder - user_metadata = workflow_options.pop("metadata", {}) + user_metadata = workflow_options.get("metadata", {}) + validate_user_metadata(user_metadata) state.tasks[task_id] = Task( - name=name, + task_id=task_id, options=task_options, user_metadata=user_metadata, func_body=node._body, diff --git a/python/ray/workflow/workflow_state_from_storage.py b/python/ray/workflow/workflow_state_from_storage.py index cf9394fcc..a13e31283 100644 --- a/python/ray/workflow/workflow_state_from_storage.py +++ b/python/ray/workflow/workflow_state_from_storage.py @@ -60,7 +60,7 @@ def workflow_state_from_storage( # TODO(suquark): although not necessary, but for completeness, # we may also load name and metadata. state.tasks[task_id] = Task( - name="", + task_id="", options=r.task_options, user_metadata={}, func_body=reader.load_task_func_body(task_id), diff --git a/python/ray/workflow/workflow_storage.py b/python/ray/workflow/workflow_storage.py index 51699109d..ff73d17c4 100644 --- a/python/ray/workflow/workflow_storage.py +++ b/python/ray/workflow/workflow_storage.py @@ -495,7 +495,7 @@ class WorkflowStorage: """ status = self.load_workflow_status() if status == WorkflowStatus.NONE: - raise ValueError(f"No such workflow {self._workflow_id}") + raise ValueError(f"No such workflow '{self._workflow_id}'") if status == WorkflowStatus.CANCELED: raise ValueError(f"Workflow {self._workflow_id} is canceled") # For resumable workflow, the workflow result is not ready. @@ -622,10 +622,12 @@ class WorkflowStorage: def _load_task_metadata(): if not self._scan(self._key_task_prefix(task_id), ignore_errors=True): if not self._scan("", ignore_errors=True): - raise ValueError("No such workflow_id {}".format(self._workflow_id)) + raise ValueError( + "No such workflow_id '{}'".format(self._workflow_id) + ) else: raise ValueError( - "No such task_id {} in workflow {}".format( + "No such task_id '{}' in workflow '{}'".format( task_id, self._workflow_id ) ) @@ -662,7 +664,7 @@ class WorkflowStorage: def _load_workflow_metadata(): if not self._scan("", ignore_errors=True): - raise ValueError("No such workflow_id {}".format(self._workflow_id)) + raise ValueError("No such workflow_id '{}'".format(self._workflow_id)) tasks = [ self._get(self._key_workflow_metadata(), True, True), @@ -842,8 +844,8 @@ class WorkflowStorage: def _key_workflow_postrun_metadata(self): return os.path.join(WORKFLOW_POSTRUN_METADATA) - def _key_num_tasks_with_name(self, name): - return os.path.join(DUPLICATE_NAME_COUNTER, name) + def _key_num_tasks_with_name(self, task_name): + return os.path.join(DUPLICATE_NAME_COUNTER, task_name) def get_workflow_storage(workflow_id: Optional[str] = None) -> WorkflowStorage: