[workflow] Change name in step to task_id (#28151)

We've deprecated the ``name`` option in favor of ``task_id``. This commit is the cleanup that fixes everything left over.
Yi Cheng 2022-09-01 03:27:32 +00:00 committed by GitHub
parent f747415d80
commit d0b879cdb1
16 changed files with 110 additions and 103 deletions

@@ -128,16 +128,15 @@ workflow ids, call ``ray.workflow.list_all()``.
Sub-Task Results
~~~~~~~~~~~~~~~~
- We can retrieve the results for individual workflow tasks too with *named tasks*. A task can be named in two ways:
+ We can also retrieve the results of individual workflow tasks by *task id*. A task id can be assigned in two ways:
- 1) via ``.options(**workflow.options(name="task_name"))``
- 2) via decorator ``@workflow.options(name="task_name")``
+ 1) via ``.options(**workflow.options(task_id="task_name"))``
+ 2) via decorator ``@workflow.options(task_id="task_name")``
- If tasks are not given ``task_name``, the function name of the steps is set as the ``task_name``.
- The ID of the task would be the same as the name. If there are multiple tasks with the same name,
- a suffix with a counter ``_n`` will be added.
+ If a task is not given a ``task_id``, the function name of the task is used as its ``task_id``.
+ If multiple tasks share the same id, a counter suffix ``_n`` is appended.
- Once a task is given a name, the result of the task will be retrievable via ``workflow.get_output(workflow_id, task_id="task_name")``.
+ Once a task id is assigned, the task's result can be retrieved via ``workflow.get_output(workflow_id, task_id="task_id")``.
If the task with the given ``task_id`` hasn't been executed before the workflow completes, an exception will be thrown. Here are some examples:
.. code-block:: python
@@ -156,8 +155,8 @@ If the task with the given ``task_id`` hasn't been executed before the workflow
def double(v):
return 2 * v
- inner_task = double.options(**workflow.options(name="inner")).bind(1)
- outer_task = double.options(**workflow.options(name="outer")).bind(inner_task)
+ inner_task = double.options(**workflow.options(task_id="inner")).bind(1)
+ outer_task = double.options(**workflow.options(task_id="outer")).bind(inner_task)
result_ref = workflow.run_async(outer_task, workflow_id="double")
inner = workflow.get_output_async("double", task_id="inner")
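For reference, a minimal self-contained sketch of the renamed API, consolidating the doc example above (the workflow id ``double_example`` is illustrative):

import ray
from ray import workflow

@ray.remote
def double(v: int) -> int:
    return 2 * v

# Assign explicit task ids via the renamed ``task_id`` option.
inner_task = double.options(**workflow.options(task_id="inner")).bind(1)
outer_task = double.options(**workflow.options(task_id="outer")).bind(inner_task)
workflow.run(outer_task, workflow_id="double_example")

# Per-task results are addressed by task id after the run completes.
assert workflow.get_output("double_example", task_id="inner") == 2
assert workflow.get_output("double_example", task_id="outer") == 4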

@@ -34,10 +34,10 @@ providing the task name:
workflow.run(
add.options(
- **workflow.options(name="add_task")
+ **workflow.options(task_id="add_task")
).bind(10, 20), workflow_id="add_example_2")
- task_metadata = workflow.get_metadata("add_example_2", name="add_task")
+ task_metadata = workflow.get_metadata("add_example_2", task_id="add_task")
assert "start_time" in workflow_metadata["stats"]
assert "end_time" in workflow_metadata["stats"]
@@ -53,11 +53,11 @@ workflow or workflow task.
.. code-block:: python
- workflow.run(add.options(**workflow.options(name="add_task", metadata={"task_k": "task_v"})).bind(10, 20),
+ workflow.run(add.options(**workflow.options(task_id="add_task", metadata={"task_k": "task_v"})).bind(10, 20),
workflow_id="add_example_3", metadata={"workflow_k": "workflow_v"})
assert workflow.get_metadata("add_example_3")["user_metadata"] == {"workflow_k": "workflow_v"}
- assert workflow.get_metadata("add_example_3", name="add_task")["user_metadata"] == {"task_k": "task_v"}
+ assert workflow.get_metadata("add_example_3", task_id="add_task")["user_metadata"] == {"task_k": "task_v"}
**Note: user-defined metadata must be a python dictionary with values that are
JSON serializable.**
@@ -72,7 +72,7 @@ Available Metrics
**Task level**
- - name: name of the task, either provided by the user via ``task.options(**workflow.options(name=xxx))`` or generated by the system.
+ - name: name of the task, either provided by the user via ``task.options(**workflow.options(task_id=xxx))`` or generated by the system.
- task_options: options of the task, either provided by the user via ``task.options()`` or default by system.
- user_metadata: a python dictionary of custom metadata by the user via ``task.options()``.
- stats: task running stats, including task start time and end time.
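Putting the metadata pieces above together, a minimal sketch of the renamed metadata API (the workflow id and metadata values are illustrative):

import ray
from ray import workflow

@ray.remote
def add(a: int, b: int) -> int:
    return a + b

workflow.run(
    add.options(
        **workflow.options(task_id="add_task", metadata={"task_k": "task_v"})
    ).bind(10, 20),
    workflow_id="add_example",
    metadata={"workflow_k": "workflow_v"},
)

# Workflow-level metadata vs. task-level metadata, both keyed by ``task_id`` now.
wf_meta = workflow.get_metadata("add_example")
task_meta = workflow.get_metadata("add_example", task_id="add_task")
assert wf_meta["user_metadata"] == {"workflow_k": "workflow_v"}
assert task_meta["user_metadata"] == {"task_k": "task_v"}
assert "start_time" in task_meta["stats"]  # running stats are recorded too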

@@ -294,18 +294,18 @@ def resume_async(workflow_id: str) -> ray.ObjectRef:
@PublicAPI(stability="alpha")
- def get_output(workflow_id: str, *, name: Optional[str] = None) -> Any:
+ def get_output(workflow_id: str, *, task_id: Optional[str] = None) -> Any:
"""Get the output of a running workflow.
Args:
workflow_id: The workflow to get the output of.
- name: If set, fetch the specific task instead of the output of the
+ task_id: If set, fetch the specific task instead of the output of the
workflow.
Examples:
>>> from ray import workflow
>>> start_trip = ... # doctest: +SKIP
- >>> trip = start_trip.options(name="trip").bind() # doctest: +SKIP
+ >>> trip = start_trip.options(task_id="trip").bind() # doctest: +SKIP
>>> res1 = workflow.run_async(trip, workflow_id="trip1") # doctest: +SKIP
>>> # you could "get_output()" in another machine
>>> res2 = workflow.get_output_async("trip1") # doctest: +SKIP
@@ -316,7 +316,7 @@ def get_output(workflow_id: str, *, name: Optional[str] = None) -> Any:
Returns:
The output of the workflow task.
"""
- return ray.get(get_output_async(workflow_id, task_id=name))
+ return ray.get(get_output_async(workflow_id, task_id=task_id))
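A usage sketch of the renamed signature, adapted from the doctest above (the ``start_trip`` task body is made up so the snippet runs end to end):

import ray
from ray import workflow

@ray.remote
def start_trip() -> str:
    return "booked"

trip = start_trip.options(**workflow.options(task_id="trip")).bind()
res1 = workflow.run_async(trip, workflow_id="trip1")

# The whole workflow's output, possibly fetched from another process...
res2 = workflow.get_output_async("trip1")
assert ray.get(res1) == ray.get(res2) == "booked"

# ...or a single task's output, addressed by the new ``task_id`` keyword.
assert workflow.get_output("trip1", task_id="trip") == "booked"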
@PublicAPI(stability="alpha")
@@ -596,20 +596,20 @@ def sleep(duration: float) -> "DAGNode[Event]":
@PublicAPI(stability="alpha")
@client_mode_wrap
- def get_metadata(workflow_id: str, name: Optional[str] = None) -> Dict[str, Any]:
+ def get_metadata(workflow_id: str, task_id: Optional[str] = None) -> Dict[str, Any]:
"""Get the metadata of the workflow.
This will return a dict of metadata of either the workflow (
if only workflow_id is given) or a specific workflow task (if
- both workflow_id and task name are given). Exception will be
- raised if the given workflow id or task name does not exist.
+ both workflow_id and task id are given). An exception will be
+ raised if the given workflow id or task id does not exist.
If only workflow id is given, this will return metadata on
workflow level, which includes running status, workflow-level
user metadata and workflow-level running stats (e.g. the
start time and end time of the workflow).
- If both workflow id and task name are given, this will return
+ If both workflow id and task id are given, this will return
metadata on workflow task level, which includes task inputs,
task-level user metadata and task-level running stats (e.g.
the start time and end time of the task).
metadata on workflow task level, which includes task inputs,
task-level user metadata and task-level running stats (e.g.
the start time and end time of the task).
@@ -617,14 +617,14 @@ def get_metadata(workflow_id: str, name: Optional[str] = None) -> Dict[str, Any]
Args:
workflow_id: The workflow to get the metadata of.
- name: If set, fetch the metadata of the specific task instead of
+ task_id: If set, fetch the metadata of the specific task instead of
the metadata of the workflow.
Examples:
>>> from ray import workflow
>>> trip = ... # doctest: +SKIP
>>> workflow_task = trip.options( # doctest: +SKIP
- ... **workflow.options(name="trip", metadata={"k1": "v1"})).bind()
+ ... **workflow.options(task_id="trip", metadata={"k1": "v1"})).bind()
>>> workflow.run(workflow_task, # doctest: +SKIP
... workflow_id="trip1", metadata={"k2": "v2"})
>>> workflow_metadata = workflow.get_metadata("trip1") # doctest: +SKIP
@@ -646,10 +646,10 @@ def get_metadata(workflow_id: str, name: Optional[str] = None) -> Dict[str, Any]
"""
_ensure_workflow_initialized()
store = WorkflowStorage(workflow_id)
- if name is None:
+ if task_id is None:
return store.load_workflow_metadata()
else:
- return store.load_task_metadata(name)
+ return store.load_task_metadata(task_id)
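Continuing the sketch above, the dispatch here means callers select workflow-level or task-level metadata with the same keyword, and unknown ids raise ``ValueError`` (with the quoted messages updated in the storage layer below):

import pytest
from ray import workflow

wf_meta = workflow.get_metadata("trip1")                    # workflow-level
task_meta = workflow.get_metadata("trip1", task_id="trip")  # task-level
assert "user_metadata" in wf_meta and "stats" in task_meta

# Unknown ids raise ValueError with the id quoted in the message.
with pytest.raises(ValueError, match="No such workflow_id 'nope'"):
    workflow.get_metadata("nope")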
@PublicAPI(stability="alpha")
@@ -751,7 +751,7 @@ class options:
# TODO(suquark): More rigid arguments check like @ray.remote arguments. This is
# fairly complex, but we should enable it later.
valid_options = {
- "name",
+ "task_id",
"metadata",
"catch_exceptions",
"checkpoint",

@@ -8,8 +8,8 @@ def echo(msg: str, *deps) -> None:
if __name__ == "__main__":
- A = echo.options(**workflow.options(name="A")).bind("A")
- B = echo.options(**workflow.options(name="B")).bind("B", A)
- C = echo.options(**workflow.options(name="C")).bind("C", A)
- D = echo.options(**workflow.options(name="D")).bind("D", A, B)
+ A = echo.options(**workflow.options(task_id="A")).bind("A")
+ B = echo.options(**workflow.options(task_id="B")).bind("B", A)
+ C = echo.options(**workflow.options(task_id="C")).bind("C", A)
+ D = echo.options(**workflow.options(task_id="D")).bind("D", A, B)
workflow.run(D)

@@ -13,7 +13,7 @@ def wait_all(*args) -> None:
if __name__ == "__main__":
- h1 = hello.options(**workflow.options(name="hello1")).bind("hello1")
- h2a = hello.options(**workflow.options(name="hello2a")).bind("hello2a")
- h2b = hello.options(**workflow.options(name="hello2b")).bind("hello2b", h2a)
+ h1 = hello.options(**workflow.options(task_id="hello1")).bind("hello1")
+ h2a = hello.options(**workflow.options(task_id="hello2a")).bind("hello2a")
+ h2b = hello.options(**workflow.options(task_id="hello2b")).bind("hello2b", h2a)
workflow.run(wait_all.bind(h1, h2b))

@@ -173,7 +173,7 @@ def test_dynamic_output(workflow_start_regular_shared):
if n < 3:
raise Exception("Failed intentionally")
return workflow.continuation(
- exponential_fail.options(**workflow.options(name=f"task_{n}")).bind(
+ exponential_fail.options(**workflow.options(task_id=f"task_{n}")).bind(
k * 2, n - 1
)
)
@@ -183,7 +183,7 @@ def test_dynamic_output(workflow_start_regular_shared):
# latest successful task.
try:
workflow.run(
- exponential_fail.options(**workflow.options(name="task_0")).bind(3, 10),
+ exponential_fail.options(**workflow.options(task_id="task_0")).bind(3, 10),
workflow_id="dynamic_output",
)
except Exception:

@@ -114,13 +114,13 @@ def test_get_output_4(workflow_start_regular, tmp_path):
with FileLock(lock_path):
return 42
return workflow.continuation(
- recursive.options(**workflow.options(name=str(n - 1))).bind(n - 1)
+ recursive.options(**workflow.options(task_id=str(n - 1))).bind(n - 1)
)
workflow_id = "test_get_output_4"
lock.acquire()
obj = workflow.run_async(
- recursive.options(**workflow.options(name="10")).bind(10),
+ recursive.options(**workflow.options(task_id="10")).bind(10),
workflow_id=workflow_id,
)
@@ -160,8 +160,8 @@ def test_output_with_name(workflow_start_regular):
def double(v):
return 2 * v
- inner_task = double.options(**workflow.options(name="inner")).bind(1)
- outer_task = double.options(**workflow.options(name="outer")).bind(inner_task)
+ inner_task = double.options(**workflow.options(task_id="inner")).bind(1)
+ outer_task = double.options(**workflow.options(task_id="outer")).bind(inner_task)
result = workflow.run_async(outer_task, workflow_id="double")
inner = workflow.get_output_async("double", task_id="inner")
outer = workflow.get_output_async("double", task_id="outer")
@@ -170,7 +170,7 @@ def test_output_with_name(workflow_start_regular):
assert ray.get(outer) == 4
assert ray.get(result) == 4
- @workflow.options(name="double")
+ @workflow.options(task_id="double")
@ray.remote
def double_2(s):
return s * 2
@@ -199,7 +199,7 @@ def test_get_non_exist_output(workflow_start_regular, tmp_path):
workflow_id = "test_get_non_exist_output"
with FileLock(lock_path):
- dag = simple.options(**workflow.options(name="simple")).bind()
+ dag = simple.options(**workflow.options(task_id="simple")).bind()
ret = workflow.run_async(dag, workflow_id=workflow_id)
exist = workflow.get_output_async(workflow_id, task_id="simple")
non_exist = workflow.get_output_async(workflow_id, task_id="non_exist")
@@ -217,13 +217,13 @@ def test_get_named_task_output_finished(workflow_start_regular, tmp_path):
# Get the result from named task after workflow finished
assert 4 == workflow.run(
- double.options(**workflow.options(name="outer")).bind(
- double.options(**workflow.options(name="inner")).bind(1)
+ double.options(**workflow.options(task_id="outer")).bind(
+ double.options(**workflow.options(task_id="inner")).bind(1)
),
workflow_id="double",
)
- assert workflow.get_output("double", name="inner") == 2
- assert workflow.get_output("double", name="outer") == 4
+ assert workflow.get_output("double", task_id="inner") == 2
+ assert workflow.get_output("double", task_id="outer") == 4
def test_get_named_task_output_running(workflow_start_regular, tmp_path):
@@ -240,8 +240,8 @@ def test_get_named_task_output_running(workflow_start_regular, tmp_path):
lock = FileLock(lock_path)
lock.acquire()
output = workflow.run_async(
- double.options(**workflow.options(name="outer")).bind(
- double.options(**workflow.options(name="inner")).bind(1, lock_path),
+ double.options(**workflow.options(task_id="outer")).bind(
+ double.options(**workflow.options(task_id="inner")).bind(1, lock_path),
lock_path,
),
workflow_id="double-2",
@@ -280,16 +280,16 @@ def test_get_named_task_output_error(workflow_start_regular, tmp_path):
# Force it to fail for the outer task
with pytest.raises(Exception):
workflow.run(
- double.options(**workflow.options(name="outer")).bind(
- double.options(**workflow.options(name="inner")).bind(1, False), True
+ double.options(**workflow.options(task_id="outer")).bind(
+ double.options(**workflow.options(task_id="inner")).bind(1, False), True
),
workflow_id="double",
)
# For the inner task, it should have already been executed.
- assert 2 == workflow.get_output("double", name="inner")
+ assert 2 == workflow.get_output("double", task_id="inner")
with pytest.raises(Exception):
- workflow.get_output("double", name="outer")
+ workflow.get_output("double", task_id="outer")
def test_get_named_task_default(workflow_start_regular, tmp_path):
@@ -311,11 +311,11 @@ def test_get_named_task_default(workflow_start_regular, tmp_path):
if i != 0:
task_name += "_" + str(i)
# All outputs will be 120
- assert math.factorial(5) == workflow.get_output("factorial", name=task_name)
+ assert math.factorial(5) == workflow.get_output("factorial", task_id=task_name)
def test_get_named_task_duplicate(workflow_start_regular):
- @workflow.options(name="f")
+ @workflow.options(task_id="f")
@ray.remote
def f(n, dep):
return n
@@ -324,10 +324,10 @@ def test_get_named_task_duplicate(workflow_start_regular):
outer = f.bind(20, inner)
assert 20 == workflow.run(outer, workflow_id="duplicate")
# The outer will be checkpointed first. So there is no suffix for the name
- assert workflow.get_output("duplicate", name="f") == 10
+ assert workflow.get_output("duplicate", task_id="f") == 10
# The inner will be checkpointed after the outer. And there is a duplicate
# for the name. suffix _1 is added automatically
- assert workflow.get_output("duplicate", name="f_1") == 20
+ assert workflow.get_output("duplicate", task_id="f_1") == 20
if __name__ == "__main__":

@@ -81,10 +81,10 @@ def test_task_id_generation(workflow_start_regular_shared, request):
def simple(x):
return x + 1
- x = simple.options(**workflow.options(name="simple")).bind(-1)
+ x = simple.options(**workflow.options(task_id="simple")).bind(-1)
n = 20
for i in range(1, n):
- x = simple.options(**workflow.options(name="simple")).bind(x)
+ x = simple.options(**workflow.options(task_id="simple")).bind(x)
workflow_id = "test_task_id_generation"
ret = workflow.run_async(x, workflow_id=workflow_id)
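This test exercises duplicate-id handling: repeated ids get a ``_n`` counter suffix, as documented earlier and checked in ``test_get_named_task_duplicate`` above. A standalone sketch of that scheme, not Ray's actual ``gen_task_id`` implementation:

from collections import defaultdict

class TaskIdGenerator:
    """Appends ``_n`` to repeated task ids, mirroring the documented behavior."""

    def __init__(self) -> None:
        self._counts = defaultdict(int)

    def gen_task_id(self, task_id: str) -> str:
        n = self._counts[task_id]
        self._counts[task_id] += 1
        return task_id if n == 0 else f"{task_id}_{n}"

gen = TaskIdGenerator()
assert gen.gen_task_id("simple") == "simple"
assert gen.gen_task_id("simple") == "simple_1"
assert gen.gen_task_id("simple") == "simple_2"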

@@ -28,7 +28,7 @@ def test_options_update(shutdown_only):
# Options are given in decorator first, then in the first .options()
# and finally in the second .options()
- @workflow.options(name="old_name", metadata={"k": "v"})
+ @workflow.options(task_id="old_name", metadata={"k": "v"})
@ray.remote(num_cpus=2, max_retries=1)
def f():
return
@@ -39,7 +39,7 @@ def test_options_update(shutdown_only):
# max_retries only defined in the decorator and it got preserved all the way
new_f = f.options(
num_returns=2,
- **workflow.options(name="new_name", metadata={"extra_k2": "extra_v2"}),
+ **workflow.options(task_id="new_name", metadata={"extra_k2": "extra_v2"}),
)
options = new_f.bind().get_options()
assert options == {
@@ -48,7 +48,7 @@ def test_options_update(shutdown_only):
"max_retries": 1,
"_metadata": {
WORKFLOW_OPTIONS: {
- "name": "new_name",
+ "task_id": "new_name",
"metadata": {"extra_k2": "extra_v2"},
}
},

@@ -24,20 +24,20 @@ def checkpoint_dag(checkpoint):
return np.mean(x)
x = large_input.options(
- **workflow.options(name="large_input", checkpoint=checkpoint)
+ **workflow.options(task_id="large_input", checkpoint=checkpoint)
).bind()
y = identity.options(
- **workflow.options(name="identity", checkpoint=checkpoint)
+ **workflow.options(task_id="identity", checkpoint=checkpoint)
).bind(x)
return workflow.continuation(
- average.options(**workflow.options(name="average")).bind(y)
+ average.options(**workflow.options(task_id="average")).bind(y)
)
def test_checkpoint_dag_skip_all(workflow_start_regular_shared):
outputs = workflow.run(
checkpoint_dag.options(
- **workflow.options(name="checkpoint_dag", checkpoint=False)
+ **workflow.options(task_id="checkpoint_dag", checkpoint=False)
).bind(False),
workflow_id="checkpoint_skip",
)
@@ -58,7 +58,9 @@ def test_checkpoint_dag_skip_all(workflow_start_regular_shared):
def test_checkpoint_dag_skip_partial(workflow_start_regular_shared):
outputs = workflow.run(
- checkpoint_dag.options(**workflow.options(name="checkpoint_dag")).bind(False),
+ checkpoint_dag.options(**workflow.options(task_id="checkpoint_dag")).bind(
+ False
+ ),
workflow_id="checkpoint_partial",
)
assert np.isclose(outputs, 8388607.5)
@@ -78,7 +80,7 @@ def test_checkpoint_dag_skip_partial(workflow_start_regular_shared):
def test_checkpoint_dag_full(workflow_start_regular_shared):
outputs = workflow.run(
- checkpoint_dag.options(**workflow.options(name="checkpoint_dag")).bind(True),
+ checkpoint_dag.options(**workflow.options(task_id="checkpoint_dag")).bind(True),
workflow_id="checkpoint_whole",
)
assert np.isclose(outputs, 8388607.5)

@@ -9,7 +9,7 @@ from ray import workflow
ray.init(address='auto')
- @ray.remote(**workflow.options(name="f"))
+ @ray.remote(**workflow.options(task_id="f"))
def f():
return 10
@@ -34,11 +34,11 @@ from ray import workflow
ray.init(address='auto')
- @ray.remote(**workflow.options(name="f1"))
+ @ray.remote(**workflow.options(task_id="f1"))
def f1():
return 10
- @ray.remote(**workflow.options(name="f2"))
+ @ray.remote(**workflow.options(task_id="f2"))
def f2(x):
return x+1
@@ -65,11 +65,11 @@ from ray import workflow
ray.init(address='auto')
- @ray.remote(**workflow.options(name="f3"))
+ @ray.remote(**workflow.options(task_id="f3"))
def f3(x):
return x+1
- @ray.remote(**workflow.options(name="f4"))
+ @ray.remote(**workflow.options(task_id="f4"))
def f4(x):
return f3.bind(x*2)

@@ -11,10 +11,10 @@ def test_user_metadata(workflow_start_regular):
user_task_metadata = {"k1": "v1"}
user_run_metadata = {"k2": "v2"}
- task_name = "simple_task"
+ task_id = "simple_task"
workflow_id = "simple"
- @workflow.options(name=task_name, metadata=user_task_metadata)
+ @workflow.options(task_id=task_id, metadata=user_task_metadata)
@ray.remote
def simple():
return 0
@@ -30,10 +30,10 @@ def test_user_metadata(workflow_start_regular):
def test_user_metadata_empty(workflow_start_regular):
- task_name = "simple_task"
+ task_id = "simple_task"
workflow_id = "simple"
- @workflow.options(name=task_name)
+ @workflow.options(task_id=task_id)
@ray.remote
def simple():
return 0
@@ -75,10 +75,10 @@ def test_user_metadata_not_json_serializable(workflow_start_regular):
def test_runtime_metadata(workflow_start_regular):
- task_name = "simple_task"
+ task_id = "simple_task"
workflow_id = "simple"
- @workflow.options(name=task_name)
+ @workflow.options(task_id=task_id)
@ray.remote
def simple():
time.sleep(2)
@@ -106,10 +106,10 @@ def test_successful_workflow(workflow_start_regular):
user_task_metadata = {"k1": "v1"}
user_run_metadata = {"k2": "v2"}
- task_name = "simple_task"
+ task_id = "simple_task"
workflow_id = "simple"
- @workflow.options(name=task_name, metadata=user_task_metadata)
+ @workflow.options(task_id=task_id, metadata=user_task_metadata)
@ray.remote
def simple():
time.sleep(2)
@@ -202,13 +202,13 @@ def test_failed_and_resumed_workflow(workflow_start_regular, tmp_path):
def test_nested_workflow(workflow_start_regular):
- @workflow.options(name="inner", metadata={"inner_k": "inner_v"})
+ @workflow.options(task_id="inner", metadata={"inner_k": "inner_v"})
@ray.remote
def inner():
time.sleep(2)
return 10
- @workflow.options(name="outer", metadata={"outer_k": "outer_v"})
+ @workflow.options(task_id="outer", metadata={"outer_k": "outer_v"})
@ray.remote
def outer():
time.sleep(2)
@@ -246,24 +246,24 @@ def test_nested_workflow(workflow_start_regular):
def test_no_workflow_found(workflow_start_regular):
- task_name = "simple_task"
+ task_id = "simple_task"
workflow_id = "simple"
- @workflow.options(name=task_name)
+ @workflow.options(task_id=task_id)
@ray.remote
def simple():
return 0
workflow.run(simple.bind(), workflow_id=workflow_id)
- with pytest.raises(ValueError, match="No such workflow_id simple1"):
+ with pytest.raises(ValueError, match="No such workflow_id 'simple1'"):
workflow.get_metadata("simple1")
- with pytest.raises(ValueError, match="No such workflow_id simple1"):
+ with pytest.raises(ValueError, match="No such workflow_id 'simple1'"):
workflow.get_metadata("simple1", "simple_task")
with pytest.raises(
- ValueError, match="No such task_id simple_task1 in workflow simple"
+ ValueError, match="No such task_id 'simple_task1' in workflow 'simple'"
):
workflow.get_metadata("simple", "simple_task1")
):
workflow.get_metadata("simple", "simple_task1")

@@ -32,14 +32,14 @@ class TaskExecutionMetadata:
class Task:
"""Data class for a workflow task."""
- name: str
+ task_id: str
options: WorkflowTaskRuntimeOptions
user_metadata: Dict
func_body: Optional[Callable]
def to_dict(self) -> Dict:
return {
- "name": self.name,
+ "task_id": self.task_id,
"task_options": self.options.to_dict(),
"user_metadata": self.user_metadata,
}
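After this change, serialized task metadata carries ``task_id`` in place of ``name``. A simplified mirror of the dataclass above (``options`` elided, since ``WorkflowTaskRuntimeOptions`` is not shown here):

from dataclasses import dataclass, field
from typing import Callable, Dict, Optional

@dataclass
class TaskSketch:
    task_id: str
    user_metadata: Dict = field(default_factory=dict)
    func_body: Optional[Callable] = None

    def to_dict(self) -> Dict:
        # "name" is gone from the serialized form; "task_id" replaces it.
        return {"task_id": self.task_id, "user_metadata": self.user_metadata}

t = TaskSketch(task_id="inner", user_metadata={"k": "v"})
assert t.to_dict() == {"task_id": "inner", "user_metadata": {"k": "v"}}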

@@ -103,7 +103,7 @@ def workflow_state_from_dag(
if num_returns > 1:
raise ValueError("Workflow task can only have one return.")
- workflow_options = bound_options.pop("_metadata", {}).get(
+ workflow_options = bound_options.get("_metadata", {}).get(
WORKFLOW_OPTIONS, {}
)
@@ -170,17 +170,21 @@ def workflow_state_from_dag(
)
input_placeholder: ray.ObjectRef = ray.put(flattened_args)
- name = workflow_options.get("name")
- if name is None:
- name = f"{get_module(node._body)}.{slugify(get_qualname(node._body))}"
- task_id = ray.get(mgr.gen_task_id.remote(workflow_id, name))
+ orig_task_id = workflow_options.get("task_id", None)
+ if orig_task_id is None:
+ orig_task_id = (
+ f"{get_module(node._body)}.{slugify(get_qualname(node._body))}"
+ )
+ task_id = ray.get(mgr.gen_task_id.remote(workflow_id, orig_task_id))
state.add_dependencies(task_id, [s.task_id for s in workflow_refs])
state.task_input_args[task_id] = input_placeholder
- user_metadata = workflow_options.pop("metadata", {})
+ user_metadata = workflow_options.get("metadata", {})
validate_user_metadata(user_metadata)
state.tasks[task_id] = Task(
- name=name,
+ task_id=task_id,
options=task_options,
options=task_options,
user_metadata=user_metadata,
func_body=node._body,
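Note the ``pop`` to ``get`` switches above: ``dict.pop`` removed ``_metadata`` (and later ``metadata``) from the options dict as a side effect, while ``dict.get`` reads without mutating, so the options survive for later use. A tiny illustration (the nested key name below is a stand-in for the ``WORKFLOW_OPTIONS`` constant):

bound_options = {"_metadata": {"workflow_options": {"task_id": "t"}}}

# ``pop`` returns the value and deletes the key...
meta = bound_options.pop("_metadata", {})
assert "_metadata" not in bound_options

# ...while ``get`` reads it non-destructively, as the new code does.
bound_options = {"_metadata": {"workflow_options": {"task_id": "t"}}}
meta = bound_options.get("_metadata", {})
assert "_metadata" in bound_options
assert meta["workflow_options"]["task_id"] == "t"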

@@ -60,7 +60,7 @@ def workflow_state_from_storage(
# TODO(suquark): although not necessary, but for completeness,
# we may also load name and metadata.
state.tasks[task_id] = Task(
- name="",
+ task_id="",
options=r.task_options,
user_metadata={},
func_body=reader.load_task_func_body(task_id),

@@ -495,7 +495,7 @@ class WorkflowStorage:
"""
status = self.load_workflow_status()
if status == WorkflowStatus.NONE:
- raise ValueError(f"No such workflow {self._workflow_id}")
+ raise ValueError(f"No such workflow '{self._workflow_id}'")
if status == WorkflowStatus.CANCELED:
raise ValueError(f"Workflow {self._workflow_id} is canceled")
# For resumable workflow, the workflow result is not ready.
@@ -622,10 +622,12 @@ class WorkflowStorage:
def _load_task_metadata():
if not self._scan(self._key_task_prefix(task_id), ignore_errors=True):
if not self._scan("", ignore_errors=True):
- raise ValueError("No such workflow_id {}".format(self._workflow_id))
+ raise ValueError(
+ "No such workflow_id '{}'".format(self._workflow_id)
+ )
else:
raise ValueError(
- "No such task_id {} in workflow {}".format(
+ "No such task_id '{}' in workflow '{}'".format(
task_id, self._workflow_id
)
)
@@ -662,7 +664,7 @@ class WorkflowStorage:
def _load_workflow_metadata():
if not self._scan("", ignore_errors=True):
- raise ValueError("No such workflow_id {}".format(self._workflow_id))
+ raise ValueError("No such workflow_id '{}'".format(self._workflow_id))
tasks = [
self._get(self._key_workflow_metadata(), True, True),
@@ -842,8 +844,8 @@ class WorkflowStorage:
def _key_workflow_postrun_metadata(self):
return os.path.join(WORKFLOW_POSTRUN_METADATA)
- def _key_num_tasks_with_name(self, name):
- return os.path.join(DUPLICATE_NAME_COUNTER, name)
+ def _key_num_tasks_with_name(self, task_name):
+ return os.path.join(DUPLICATE_NAME_COUNTER, task_name)
def get_workflow_storage(workflow_id: Optional[str] = None) -> WorkflowStorage: