[Workflow] Fix recovery storage mismatch issue (#17166)

* fix recovery path issue and add test

* add TODOs
This commit is contained in:
Siyuan (Ryans) Zhuang 2021-07-19 21:49:12 -07:00 committed by GitHub
parent 2de7b8f084
commit 9ca6bda3a1
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 102 additions and 54 deletions

View file

@ -16,6 +16,9 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
# TODO(suquark): Update the storage interface (e.g., passing it through
# context instead of argument).
def step(func: types.FunctionType) -> WorkflowStepFunction:
"""A decorator used for creating workflow steps.

View file

@ -205,6 +205,6 @@ class Workflow:
storage: The external storage URL or a custom storage class. If not
specified, ``/tmp/ray/workflow_data`` will be used.
"""
# avoid cyclic importing
# TODO(suquark): avoid cyclic importing
from ray.experimental.workflow.execution import run
return run(self, storage, workflow_id)

View file

@ -5,14 +5,14 @@ from typing import Union, Optional
import ray
from ray.experimental.workflow import workflow_context
from ray.experimental.workflow import workflow_storage
from ray.experimental.workflow.common import Workflow
from ray.experimental.workflow.step_executor import commit_step
from ray.experimental.workflow.storage import (
Storage, create_storage, get_global_storage, set_global_storage)
from ray.experimental.workflow.storage import (Storage, create_storage,
get_global_storage)
from ray.experimental.workflow.workflow_access import (
MANAGEMENT_ACTOR_NAME, flatten_workflow_output,
get_or_create_management_actor)
from ray.experimental.workflow.step_executor import commit_step
logger = logging.getLogger(__name__)
@ -26,6 +26,28 @@ def _is_anonymous_namespace():
return bool(match)
def _get_storage(storage: Optional[Union[str, Storage]]) -> Storage:
if storage is None:
return get_global_storage()
elif isinstance(storage, str):
return create_storage(storage)
elif isinstance(storage, Storage):
return storage
else:
raise TypeError("'storage' should be None, str, or Storage type.")
def _get_storage_url(storage: Optional[Union[str, Storage]]) -> str:
if storage is None:
return get_global_storage().storage_url
elif isinstance(storage, str):
return storage
elif isinstance(storage, Storage):
return storage.storage_url
else:
raise TypeError("'storage' should be None, str, or Storage type.")
def run(entry_workflow: Workflow,
storage: Optional[Union[str, Storage]] = None,
workflow_id: Optional[str] = None) -> ray.ObjectRef:
@ -38,28 +60,21 @@ def run(entry_workflow: Workflow,
if workflow_id is None:
# Workflow ID format: {Entry workflow UUID}.{Unix time to nanoseconds}
workflow_id = f"{entry_workflow.id}.{time.time():.9f}"
if isinstance(storage, str):
set_global_storage(create_storage(storage))
elif isinstance(storage, Storage):
set_global_storage(storage)
elif storage is not None:
raise TypeError("'storage' should be None, str, or Storage type.")
storage_url = get_global_storage().storage_url
store = _get_storage(storage)
logger.info(f"Workflow job created. [id=\"{workflow_id}\", storage_url="
f"\"{storage_url}\"].")
try:
workflow_context.init_workflow_step_context(workflow_id, storage_url)
commit_step(entry_workflow)
actor = get_or_create_management_actor()
f"\"{store.storage_url}\"].")
# checkpoint the workflow
ws = workflow_storage.WorkflowStorage(workflow_id, store)
commit_step(ws, "", entry_workflow)
workflow_manager = get_or_create_management_actor()
# NOTE: It is important to 'ray.get' the returned output. This
# ensures caller of 'run()' holds the reference to the workflow
# result. Otherwise if the actor removes the reference of the
# workflow output, the caller may fail to resolve the result.
output = ray.get(actor.run_or_resume.remote(workflow_id, storage_url))
direct_output = flatten_workflow_output(workflow_id, output)
finally:
workflow_context.set_workflow_step_context(None)
return direct_output
output = ray.get(
workflow_manager.run_or_resume.remote(workflow_id, store.storage_url))
return flatten_workflow_output(workflow_id, output)
# TODO(suquark): support recovery with ObjectRef inputs.
@ -74,23 +89,16 @@ def resume(workflow_id: str,
raise ValueError("Must use a namespace in 'ray.init()' to access "
"workflows properly. Current namespace seems to "
"be anonymous.")
if isinstance(storage, str):
store = create_storage(storage)
elif isinstance(storage, Storage):
store = storage
elif storage is None:
store = get_global_storage()
else:
raise TypeError("'storage' should be None, str, or Storage type.")
storage_url = _get_storage_url(storage)
logger.info(f"Resuming workflow [id=\"{workflow_id}\", storage_url="
f"\"{store.storage_url}\"].")
actor = get_or_create_management_actor()
f"\"{storage_url}\"].")
workflow_manager = get_or_create_management_actor()
# NOTE: It is important to 'ray.get' the returned output. This
# ensures caller of 'run()' holds the reference to the workflow
# result. Otherwise if the actor removes the reference of the
# workflow output, the caller may fail to resolve the result.
output = ray.get(
actor.run_or_resume.remote(workflow_id, store.storage_url))
workflow_manager.run_or_resume.remote(workflow_id, storage_url))
direct_output = flatten_workflow_output(workflow_id, output)
logger.info(f"Workflow job {workflow_id} resumed.")
return direct_output

View file

@ -1,13 +1,17 @@
from typing import List, Tuple, Union, Any, Dict, Callable, Optional
from typing import (List, Tuple, Any, Dict, Callable, Optional, TYPE_CHECKING,
Union)
import ray
from ray import ObjectRef
from ray.experimental.workflow import workflow_context
from ray.experimental.workflow import serialization_context
from ray.experimental.workflow.common import (
Workflow, StepID, WorkflowOutputType, WorkflowInputTuple)
from ray.experimental.workflow.common import Workflow
from ray.experimental.workflow import workflow_storage
if TYPE_CHECKING:
from ray.experimental.workflow.common import (StepID, WorkflowOutputType,
WorkflowInputTuple)
StepInputTupleToResolve = Tuple[ObjectRef, List[ObjectRef], List[ObjectRef]]
@ -122,28 +126,29 @@ def execute_workflow(workflow: Workflow,
return workflow.execute(outer_most_step_id)
def commit_step(ret: Union[Workflow, Any],
def commit_step(store: workflow_storage.WorkflowStorage,
step_id: "StepID",
ret: Union[Workflow, Any],
outer_most_step_id: Optional[str] = None):
"""Checkpoint the step output.
Args:
The returned object of the workflow step.
store: The storage the current workflow is using.
step_id: The ID of the step.
ret: The returned object of the workflow step.
outer_most_step_id: The ID of the outer most workflow. None if it
does not exists. See "step_executor.execute_workflow" for detailed
explanation.
"""
store = workflow_storage.WorkflowStorage()
if isinstance(ret, Workflow):
store.save_subworkflow(ret)
step_id = workflow_context.get_current_step_id()
store.save_step_output(step_id, ret, outer_most_step_id)
@ray.remote
def _workflow_step_executor(
func: Callable, context: workflow_context.WorkflowStepContext,
step_id: StepID, step_inputs: StepInputTupleToResolve,
outer_most_step_id: StepID) -> Any:
step_id: "StepID", step_inputs: "StepInputTupleToResolve",
outer_most_step_id: "StepID") -> Any:
"""Executor function for workflow step.
Args:
@ -166,16 +171,18 @@ def _workflow_step_executor(
# Running the actual step function
ret = func(*args, **kwargs)
# Save workflow output
commit_step(ret, outer_most_step_id)
store = workflow_storage.WorkflowStorage()
commit_step(store, step_id, ret, outer_most_step_id)
if isinstance(ret, Workflow):
# execute sub-workflow
return execute_workflow(ret, outer_most_step_id)
return ret
def execute_workflow_step(step_func: Callable, step_id: StepID,
step_inputs: WorkflowInputTuple,
outer_most_step_id: StepID) -> WorkflowOutputType:
def execute_workflow_step(
step_func: Callable, step_id: "StepID",
step_inputs: "WorkflowInputTuple",
outer_most_step_id: "StepID") -> "WorkflowOutputType":
return _workflow_step_executor.remote(
step_func, workflow_context.get_workflow_step_context(), step_id,
step_inputs, outer_most_step_id)

View file

@ -1,4 +1,6 @@
import subprocess
import shutil
import tempfile
import time
from ray.tests.conftest import * # noqa
@ -161,3 +163,26 @@ def test_shortcut(ray_start_regular):
store = workflow_storage.WorkflowStorage("shortcut")
step_id = store.get_entrypoint_step_id()
assert store.inspect_step(step_id).output_object_valid
@workflow.step
def constant_1():
return 271828
@workflow.step
def constant_2():
return 31416
@pytest.mark.parametrize(
"ray_start_regular", [{
"namespace": "workflow"
}], indirect=True)
def test_resume_different_storage(ray_start_regular):
constant_1.step().run(workflow_id="const")
tmp_dir = tempfile.mkdtemp()
constant_2.step().run(workflow_id="const", storage=tmp_dir)
assert ray.get(workflow.resume(workflow_id="const",
storage=tmp_dir)) == 31416
shutil.rmtree(tmp_dir)

View file

@ -366,9 +366,9 @@ class VirtualActor:
"""Return a future. If 'ray.get()' it successfully, then the actor
is fully initialized."""
# TODO(suquark): should ray.get(xxx.ready()) always be true?
actor = get_or_create_management_actor()
workflow_manager = get_or_create_management_actor()
return ray.get(
actor.actor_ready.remote(self._actor_id,
workflow_manager.actor_ready.remote(self._actor_id,
self._storage.storage_url))
def __getattr__(self, item):

View file

@ -55,6 +55,11 @@ def update_workflow_step_context(context: Optional[WorkflowStepContext],
global _context
_context = context
_context.workflow_scope.append(step_id)
# avoid cyclic import
from ray.experimental.workflow import storage
# TODO(suquark): [optimization] if the original storage has the same URL,
# skip creating the new one
storage.set_global_storage(storage.create_storage(context.storage_url))
def get_current_step_id() -> str: