diff --git a/python/ray/experimental/workflow/api.py b/python/ray/experimental/workflow/api.py
index eb61dba0b..353a18189 100644
--- a/python/ray/experimental/workflow/api.py
+++ b/python/ray/experimental/workflow/api.py
@@ -16,6 +16,9 @@ if TYPE_CHECKING:
 
 logger = logging.getLogger(__name__)
 
+# TODO(suquark): Update the storage interface (e.g., passing it through
+# context instead of argument).
+
 
 def step(func: types.FunctionType) -> WorkflowStepFunction:
     """A decorator used for creating workflow steps.
diff --git a/python/ray/experimental/workflow/common.py b/python/ray/experimental/workflow/common.py
index 54bf18fa8..c5e9a2e2e 100644
--- a/python/ray/experimental/workflow/common.py
+++ b/python/ray/experimental/workflow/common.py
@@ -205,6 +205,6 @@ class Workflow:
             storage: The external storage URL or a custom storage class. If
                 not specified, ``/tmp/ray/workflow_data`` will be used.
         """
-        # avoid cyclic importing
+        # TODO(suquark): avoid cyclic importing
         from ray.experimental.workflow.execution import run
         return run(self, storage, workflow_id)
diff --git a/python/ray/experimental/workflow/execution.py b/python/ray/experimental/workflow/execution.py
index 2b4b9dd11..d91f25e26 100644
--- a/python/ray/experimental/workflow/execution.py
+++ b/python/ray/experimental/workflow/execution.py
@@ -5,14 +5,14 @@ from typing import Union, Optional
 
 import ray
 
-from ray.experimental.workflow import workflow_context
+from ray.experimental.workflow import workflow_storage
 from ray.experimental.workflow.common import Workflow
-from ray.experimental.workflow.step_executor import commit_step
-from ray.experimental.workflow.storage import (
-    Storage, create_storage, get_global_storage, set_global_storage)
+from ray.experimental.workflow.storage import (Storage, create_storage,
+                                               get_global_storage)
 from ray.experimental.workflow.workflow_access import (
     MANAGEMENT_ACTOR_NAME, flatten_workflow_output,
     get_or_create_management_actor)
+from ray.experimental.workflow.step_executor import commit_step
 
 logger = logging.getLogger(__name__)
 
@@ -26,6 +26,28 @@ def _is_anonymous_namespace():
     return bool(match)
 
 
+def _get_storage(storage: Optional[Union[str, Storage]]) -> Storage:
+    if storage is None:
+        return get_global_storage()
+    elif isinstance(storage, str):
+        return create_storage(storage)
+    elif isinstance(storage, Storage):
+        return storage
+    else:
+        raise TypeError("'storage' should be None, str, or Storage type.")
+
+
+def _get_storage_url(storage: Optional[Union[str, Storage]]) -> str:
+    if storage is None:
+        return get_global_storage().storage_url
+    elif isinstance(storage, str):
+        return storage
+    elif isinstance(storage, Storage):
+        return storage.storage_url
+    else:
+        raise TypeError("'storage' should be None, str, or Storage type.")
+
+
 def run(entry_workflow: Workflow,
         storage: Optional[Union[str, Storage]] = None,
         workflow_id: Optional[str] = None) -> ray.ObjectRef:
@@ -38,28 +60,21 @@ def run(entry_workflow: Workflow,
     if workflow_id is None:
         # Workflow ID format: {Entry workflow UUID}.{Unix time to nanoseconds}
         workflow_id = f"{entry_workflow.id}.{time.time():.9f}"
-    if isinstance(storage, str):
-        set_global_storage(create_storage(storage))
-    elif isinstance(storage, Storage):
-        set_global_storage(storage)
-    elif storage is not None:
-        raise TypeError("'storage' should be None, str, or Storage type.")
-    storage_url = get_global_storage().storage_url
+    store = _get_storage(storage)
[id=\"{workflow_id}\", storage_url=" - f"\"{storage_url}\"].") - try: - workflow_context.init_workflow_step_context(workflow_id, storage_url) - commit_step(entry_workflow) - actor = get_or_create_management_actor() - # NOTE: It is important to 'ray.get' the returned output. This - # ensures caller of 'run()' holds the reference to the workflow - # result. Otherwise if the actor removes the reference of the - # workflow output, the caller may fail to resolve the result. - output = ray.get(actor.run_or_resume.remote(workflow_id, storage_url)) - direct_output = flatten_workflow_output(workflow_id, output) - finally: - workflow_context.set_workflow_step_context(None) - return direct_output + f"\"{store.storage_url}\"].") + + # checkpoint the workflow + ws = workflow_storage.WorkflowStorage(workflow_id, store) + commit_step(ws, "", entry_workflow) + workflow_manager = get_or_create_management_actor() + # NOTE: It is important to 'ray.get' the returned output. This + # ensures caller of 'run()' holds the reference to the workflow + # result. Otherwise if the actor removes the reference of the + # workflow output, the caller may fail to resolve the result. + output = ray.get( + workflow_manager.run_or_resume.remote(workflow_id, store.storage_url)) + return flatten_workflow_output(workflow_id, output) # TODO(suquark): support recovery with ObjectRef inputs. @@ -74,23 +89,16 @@ def resume(workflow_id: str, raise ValueError("Must use a namespace in 'ray.init()' to access " "workflows properly. Current namespace seems to " "be anonymous.") - if isinstance(storage, str): - store = create_storage(storage) - elif isinstance(storage, Storage): - store = storage - elif storage is None: - store = get_global_storage() - else: - raise TypeError("'storage' should be None, str, or Storage type.") + storage_url = _get_storage_url(storage) logger.info(f"Resuming workflow [id=\"{workflow_id}\", storage_url=" - f"\"{store.storage_url}\"].") - actor = get_or_create_management_actor() + f"\"{storage_url}\"].") + workflow_manager = get_or_create_management_actor() # NOTE: It is important to 'ray.get' the returned output. This # ensures caller of 'run()' holds the reference to the workflow # result. Otherwise if the actor removes the reference of the # workflow output, the caller may fail to resolve the result. 
     output = ray.get(
-        actor.run_or_resume.remote(workflow_id, store.storage_url))
+        workflow_manager.run_or_resume.remote(workflow_id, storage_url))
     direct_output = flatten_workflow_output(workflow_id, output)
     logger.info(f"Workflow job {workflow_id} resumed.")
     return direct_output
diff --git a/python/ray/experimental/workflow/step_executor.py b/python/ray/experimental/workflow/step_executor.py
index 68bcd0f13..0664e44d0 100644
--- a/python/ray/experimental/workflow/step_executor.py
+++ b/python/ray/experimental/workflow/step_executor.py
@@ -1,13 +1,17 @@
-from typing import List, Tuple, Union, Any, Dict, Callable, Optional
+from typing import (List, Tuple, Any, Dict, Callable, Optional, TYPE_CHECKING,
+                    Union)
 
 import ray
 from ray import ObjectRef
 
 from ray.experimental.workflow import workflow_context
 from ray.experimental.workflow import serialization_context
-from ray.experimental.workflow.common import (
-    Workflow, StepID, WorkflowOutputType, WorkflowInputTuple)
+from ray.experimental.workflow.common import Workflow
 from ray.experimental.workflow import workflow_storage
 
+if TYPE_CHECKING:
+    from ray.experimental.workflow.common import (StepID, WorkflowOutputType,
+                                                  WorkflowInputTuple)
+
 
 StepInputTupleToResolve = Tuple[ObjectRef, List[ObjectRef], List[ObjectRef]]
@@ -122,28 +126,29 @@ def execute_workflow(workflow: Workflow,
     return workflow.execute(outer_most_step_id)
 
 
-def commit_step(ret: Union[Workflow, Any],
+def commit_step(store: workflow_storage.WorkflowStorage,
+                step_id: "StepID",
+                ret: Union[Workflow, Any],
                 outer_most_step_id: Optional[str] = None):
     """Checkpoint the step output.
-
     Args:
-        The returned object of the workflow step.
+        store: The storage the current workflow is using.
+        step_id: The ID of the step.
+        ret: The returned object of the workflow step.
         outer_most_step_id: The ID of the outer most workflow. None if it
             does not exists. See "step_executor.execute_workflow" for detailed
            explanation.
     """
-    store = workflow_storage.WorkflowStorage()
     if isinstance(ret, Workflow):
         store.save_subworkflow(ret)
-    step_id = workflow_context.get_current_step_id()
     store.save_step_output(step_id, ret, outer_most_step_id)
 
 
 @ray.remote
 def _workflow_step_executor(
         func: Callable, context: workflow_context.WorkflowStepContext,
-        step_id: StepID, step_inputs: StepInputTupleToResolve,
-        outer_most_step_id: StepID) -> Any:
+        step_id: "StepID", step_inputs: "StepInputTupleToResolve",
+        outer_most_step_id: "StepID") -> Any:
     """Executor function for workflow step.
 
     Args:
@@ -166,16 +171,18 @@ def _workflow_step_executor(
     # Running the actual step function
     ret = func(*args, **kwargs)
     # Save workflow output
-    commit_step(ret, outer_most_step_id)
+    store = workflow_storage.WorkflowStorage()
+    commit_step(store, step_id, ret, outer_most_step_id)
     if isinstance(ret, Workflow):
         # execute sub-workflow
         return execute_workflow(ret, outer_most_step_id)
     return ret
 
 
-def execute_workflow_step(step_func: Callable, step_id: StepID,
-                          step_inputs: WorkflowInputTuple,
-                          outer_most_step_id: StepID) -> WorkflowOutputType:
+def execute_workflow_step(
+        step_func: Callable, step_id: "StepID",
+        step_inputs: "WorkflowInputTuple",
+        outer_most_step_id: "StepID") -> "WorkflowOutputType":
     return _workflow_step_executor.remote(
         step_func, workflow_context.get_workflow_step_context(), step_id,
         step_inputs, outer_most_step_id)
diff --git a/python/ray/experimental/workflow/tests/test_recovery.py b/python/ray/experimental/workflow/tests/test_recovery.py
index 3f1b18efc..4f71bcd89 100644
--- a/python/ray/experimental/workflow/tests/test_recovery.py
+++ b/python/ray/experimental/workflow/tests/test_recovery.py
@@ -1,4 +1,6 @@
 import subprocess
+import shutil
+import tempfile
 import time
 
 from ray.tests.conftest import *  # noqa
@@ -161,3 +163,26 @@ def test_shortcut(ray_start_regular):
     store = workflow_storage.WorkflowStorage("shortcut")
     step_id = store.get_entrypoint_step_id()
     assert store.inspect_step(step_id).output_object_valid
+
+
+@workflow.step
+def constant_1():
+    return 271828
+
+
+@workflow.step
+def constant_2():
+    return 31416
+
+
+@pytest.mark.parametrize(
+    "ray_start_regular", [{
+        "namespace": "workflow"
+    }], indirect=True)
+def test_resume_different_storage(ray_start_regular):
+    constant_1.step().run(workflow_id="const")
+    tmp_dir = tempfile.mkdtemp()
+    constant_2.step().run(workflow_id="const", storage=tmp_dir)
+    assert ray.get(workflow.resume(workflow_id="const",
+                                   storage=tmp_dir)) == 31416
+    shutil.rmtree(tmp_dir)
diff --git a/python/ray/experimental/workflow/virtual_actor_class.py b/python/ray/experimental/workflow/virtual_actor_class.py
index 1c45b36f4..544c784fa 100644
--- a/python/ray/experimental/workflow/virtual_actor_class.py
+++ b/python/ray/experimental/workflow/virtual_actor_class.py
@@ -366,10 +366,10 @@ class VirtualActor:
         """Return a future. If 'ray.get()' it successfully, then the actor
         is fully initialized."""
         # TODO(suquark): should ray.get(xxx.ready()) always be true?
-        actor = get_or_create_management_actor()
+        workflow_manager = get_or_create_management_actor()
         return ray.get(
-            actor.actor_ready.remote(self._actor_id,
-                                     self._storage.storage_url))
+            workflow_manager.actor_ready.remote(self._actor_id,
+                                                self._storage.storage_url))
 
     def __getattr__(self, item):
         if item in self._metadata.signatures:
diff --git a/python/ray/experimental/workflow/workflow_context.py b/python/ray/experimental/workflow/workflow_context.py
index a870259e7..8ec6de57e 100644
--- a/python/ray/experimental/workflow/workflow_context.py
+++ b/python/ray/experimental/workflow/workflow_context.py
@@ -55,6 +55,11 @@ def update_workflow_step_context(context: Optional[WorkflowStepContext],
     global _context
     _context = context
     _context.workflow_scope.append(step_id)
+    # avoid cyclic import
+    from ray.experimental.workflow import storage
+    # TODO(suquark): [optimization] if the original storage has the same URL,
+    # skip creating the new one
+    storage.set_global_storage(storage.create_storage(context.storage_url))
 
 
 def get_current_step_id() -> str:
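
A minimal end-to-end sketch of the behavior this patch enables (checkpointing a workflow under an explicitly supplied storage and resuming it from that same storage later), modeled on the new test_resume_different_storage test. The ray.init(namespace=...) call and the `from ray.experimental import workflow` import path are assumptions based on the surrounding test fixtures, not something introduced by this change:

import tempfile

import ray
from ray.experimental import workflow  # assumed import path, as in the tests


@workflow.step
def greet():
    return "hello"


if __name__ == "__main__":
    # The workflow API rejects anonymous namespaces, so pick one explicitly.
    ray.init(namespace="workflow")

    storage_dir = tempfile.mkdtemp()

    # Checkpoint and start the workflow under a non-default storage directory.
    output_ref = greet.step().run(workflow_id="greet", storage=storage_dir)
    assert ray.get(output_ref) == "hello"

    # Later (e.g., from a fresh driver), resume by passing the same storage.
    resumed_ref = workflow.resume(workflow_id="greet", storage=storage_dir)
    assert ray.get(resumed_ref) == "hello"

Resuming against a storage other than the driver's global one works because update_workflow_step_context() now rebuilds the global storage from the storage URL carried in the step context, and run()/resume() hand that URL to the management actor's run_or_resume() instead of relying on the driver-side global storage.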