[workflow] Virtual actor writer - Part I (#17256)

* update readonly virtual actor
  - use the signature module
  - refactor workflow
  - new execution interface
  - advance the progress of a workflow
  - update storage
  - add last_step_of_workflow
  - prevent setting the dynamic output ("output.json") in the workflow directory
  - use an alternative exception

* fix

* fix comments

* better step names

* add TODO

* fix comments

* log errors when retrying

* fix storage test
Commit 57b2328e7b (parent daf37b7621) by Siyuan (Ryans) Zhuang, 2021-07-22 22:53:04 -07:00, committed via GitHub.
14 changed files with 434 additions and 234 deletions

View file

@ -1,7 +1,7 @@
from enum import Enum, unique
from collections import deque
import re
from typing import Tuple, Dict, List, Optional, Callable, Set, Iterator, Any
from typing import Dict, List, Optional, Callable, Set, Iterator, Any
import unicodedata
import uuid
@ -13,10 +13,6 @@ from ray import ObjectRef
# Alias types
StepID = str
WorkflowOutputType = ObjectRef
WorkflowInputTuple = Tuple[ObjectRef, List["Workflow"], List[ObjectRef]]
StepExecutionFunction = Callable[
[StepID, WorkflowInputTuple, Optional[StepID]], WorkflowOutputType]
SerializedStepFunction = str
@unique
@ -35,14 +31,21 @@ class WorkflowStatus(str, Enum):
@dataclass
class WorkflowInputs:
# The workflow step function body.
func_body: Callable
# The object ref of the input arguments.
args: ObjectRef
# The hex string of object refs in the arguments.
object_refs: List[str]
# The ID of workflows in the arguments.
workflows: List[str]
# The object refs in the arguments.
object_refs: List[ObjectRef]
# TODO(suquark): maybe later we can replace it with WorkflowData.
# The workflows in the arguments.
workflows: "List[Workflow]"
@dataclass
class WorkflowData:
# The workflow step function body.
func_body: Callable
# The arguments of a workflow.
inputs: WorkflowInputs
# The number of retries for application-level exceptions
step_max_retries: int
# Whether the user wants to handle the exception manually
@ -50,6 +53,17 @@ class WorkflowInputs:
# ray_remote options
ray_options: Dict[str, Any]
def to_metadata(self) -> Dict[str, Any]:
f = self.func_body
return {
"name": f.__module__ + "." + f.__qualname__,
"object_refs": [r.hex() for r in self.inputs.object_refs],
"workflows": [w.id for w in self.inputs.workflows],
"step_max_retries": self.step_max_retries,
"catch_exceptions": self.catch_exceptions,
"ray_options": self.ray_options,
}
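For illustration (not part of the diff): a minimal standalone sketch of the new split, using simplified stand-in classes rather than the real Ray ones, showing how WorkflowData.to_metadata keeps only JSON-friendly values while the function body and argument payload stay out of the metadata.

    from dataclasses import dataclass, field
    from typing import Any, Callable, Dict, List

    @dataclass
    class FakeInputs:                      # stand-in for WorkflowInputs
        args: Any                          # opaque handle to the argument payload
        object_refs: List[Any]             # runtime handles, not JSON-serializable
        workflows: List["FakeWorkflow"]

    @dataclass
    class FakeWorkflow:                    # stand-in for a sub-workflow reference
        id: str

    @dataclass
    class FakeWorkflowData:                # stand-in for WorkflowData
        func_body: Callable
        inputs: FakeInputs
        step_max_retries: int
        catch_exceptions: bool
        ray_options: Dict[str, Any] = field(default_factory=dict)

        def to_metadata(self) -> Dict[str, Any]:
            # Only JSON-friendly values go into the metadata; the function
            # body and the argument payload are checkpointed separately.
            f = self.func_body
            return {
                "name": f.__module__ + "." + f.__qualname__,
                "workflows": [w.id for w in self.inputs.workflows],
                "step_max_retries": self.step_max_retries,
                "catch_exceptions": self.catch_exceptions,
                "ray_options": self.ray_options,
            }

    def add(a, b):
        return a + b

    data = FakeWorkflowData(add, FakeInputs(None, [], [FakeWorkflow("child.1234")]),
                            step_max_retries=3, catch_exceptions=False)
    print(data.to_metadata())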
@dataclass
class WorkflowMetaData:
@ -75,26 +89,14 @@ def slugify(value: str, allow_unicode=False) -> str:
class Workflow:
def __init__(self, original_function: Callable,
step_execution_function: StepExecutionFunction,
input_placeholder: ObjectRef,
input_workflows: List["Workflow"],
input_object_refs: List[ObjectRef], step_max_retries: int,
catch_exceptions: bool, ray_options: Dict[str, Any]):
self._input_placeholder: ObjectRef = input_placeholder
self._input_workflows: List[Workflow] = input_workflows
self._input_object_refs: List[ObjectRef] = input_object_refs
# we need the original function for checkpointing
self._original_function: Callable = original_function
self._step_execution_function: StepExecutionFunction = (
step_execution_function)
def __init__(self, workflow_data: WorkflowData):
if workflow_data.ray_options.get("num_returns", 1) > 1:
raise ValueError("Workflow should have one return value.")
self._data = workflow_data
self._executed: bool = False
self._output: Optional[WorkflowOutputType] = None
self._step_id: StepID = slugify(
original_function.__qualname__) + "." + uuid.uuid4().hex
self._step_max_retries: int = step_max_retries
self._catch_exceptions: bool = catch_exceptions
self._ray_options: Dict[str, Any] = ray_options
self._data.func_body.__qualname__) + "." + uuid.uuid4().hex
@property
def executed(self) -> bool:
@ -111,28 +113,23 @@ class Workflow:
return self._step_id
def execute(self,
outer_most_step_id: Optional[StepID] = None) -> ObjectRef:
outer_most_step_id: Optional[StepID] = None,
last_step_of_workflow: bool = False) -> ObjectRef:
"""Trigger workflow execution recursively.
Args:
outer_most_step_id: See
"step_executor.execute_workflow" for explanation.
last_step_of_workflow: Whether this step generates the output of the
workflow (including nested steps).
"""
if self.executed:
return self._output
workflow_outputs = [w.execute() for w in self._input_workflows]
# NOTE: Input placeholder is only a placeholder. It only can be
# deserialized under a proper serialization context. Directly
# deserialize the placeholder without a context would raise
# an exception. If we pass the placeholder to _step_execution_function
# as a direct argument, it would be deserialized by Ray without a
# proper context. To prevent it, we put it inside a tuple.
step_inputs = (self._input_placeholder, workflow_outputs,
self._input_object_refs)
output = self._step_execution_function(
self._step_id, step_inputs, self._catch_exceptions,
self._step_max_retries, self._ray_options, outer_most_step_id)
from ray.experimental.workflow import step_executor
output = step_executor.execute_workflow_step(self._step_id, self._data,
outer_most_step_id,
last_step_of_workflow)
if not isinstance(output, WorkflowOutputType):
raise TypeError("Unexpected return type of the workflow.")
self._output = output
@ -148,23 +145,16 @@ class Workflow:
q = deque([self])
while q: # deque's pythonic way to check emptiness
w: Workflow = q.popleft()
for p in w._input_workflows:
for p in w._data.inputs.workflows:
if p not in visited_workflows:
visited_workflows.add(p)
q.append(p)
yield w
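As a side note (not part of the diff), iter_workflows_in_dag is a plain breadth-first walk over the sub-workflows referenced by each step's inputs, with a visited set so a shared sub-workflow is yielded only once. A self-contained sketch with made-up node objects:

    from collections import deque
    from dataclasses import dataclass, field
    from typing import Iterator, List

    @dataclass(eq=False)                   # identity-based hashing so nodes fit in a set
    class Node:
        id: str
        children: List["Node"] = field(default_factory=list)

    def iter_dag(root: Node) -> Iterator[Node]:
        visited = {root}
        q = deque([root])
        while q:                           # deque's pythonic way to check emptiness
            node = q.popleft()
            for child in node.children:
                if child not in visited:
                    visited.add(child)
                    q.append(child)
            yield node

    shared = Node("shared")
    root = Node("root", [Node("a", [shared]), Node("b", [shared])])
    print([n.id for n in iter_dag(root)])  # ['root', 'a', 'b', 'shared']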
def get_inputs(self) -> WorkflowInputs:
"""Get the inputs of the workflow."""
return WorkflowInputs(
func_body=self._original_function,
args=self._input_placeholder,
object_refs=[r.hex() for r in self._input_object_refs],
workflows=[w.id for w in self._input_workflows],
step_max_retries=self._step_max_retries,
catch_exceptions=self._catch_exceptions,
ray_options=self._ray_options,
)
@property
def data(self) -> WorkflowData:
"""Get the workflow data."""
return self._data
def __reduce__(self):
raise ValueError(

View file

@ -128,7 +128,7 @@ def resume_workflow_job(workflow_id: str, store_url: str) -> ray.ObjectRef:
store.storage_url):
from ray.experimental.workflow.step_executor import (
execute_workflow)
return execute_workflow(r)
return execute_workflow(r, last_step_of_workflow=True)
return wf_store.load_step_output(r)
@ -146,7 +146,7 @@ def get_latest_output(workflow_id: str, store: storage.Storage) -> Any:
"""
reader = workflow_storage.WorkflowStorage(workflow_id, store)
try:
step_id: StepID = reader.get_entrypoint_step_id()
step_id: StepID = reader.get_latest_progress()
while True:
result: workflow_storage.StepInspectResult = reader.inspect_step(
step_id)
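Conceptually (a simplified sketch under assumed semantics, not the actual recovery code), get_latest_output now starts from the step recorded in the progress file and follows output forwarding pointers until it reaches a step whose output object is checkpointed:

    from dataclasses import dataclass
    from typing import Any, Dict, Optional

    @dataclass
    class InspectResult:                        # stand-in for StepInspectResult
        output_object_valid: bool = False
        output_step_id: Optional[str] = None    # forwarding pointer to another step

    def latest_output(progress_step: str, steps: Dict[str, InspectResult],
                      outputs: Dict[str, Any]) -> Any:
        step_id = progress_step                 # was get_entrypoint_step_id() before
        while True:
            result = steps[step_id]
            if result.output_object_valid:
                return outputs[step_id]         # found a checkpointed output
            if result.output_step_id is None:
                raise ValueError(f"Step '{step_id}' has no usable output.")
            step_id = result.output_step_id     # follow the pointer

    steps = {"root": InspectResult(output_step_id="inner"),
             "inner": InspectResult(output_object_valid=True)}
    print(latest_output("root", steps, {"inner": "the_answer"}))  # the_answer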

View file

@ -5,7 +5,7 @@ import ray
import ray.cloudpickle
from ray.util.serialization import register_serializer, deregister_serializer
from ray.experimental.workflow.common import Workflow
from ray.experimental.workflow.common import Workflow, WorkflowInputs
def _resolve_workflow_outputs(index: int) -> Any:
@ -157,3 +157,24 @@ def workflow_args_keeping_context() -> None:
finally:
_resolve_workflow_outputs = _resolve_workflow_outputs_bak
_resolve_objectrefs = _resolve_objectrefs_bak
def make_workflow_inputs(args_list: List[Any]) -> WorkflowInputs:
workflows: List[Workflow] = []
object_refs: List[ray.ObjectRef] = []
with workflow_args_serialization_context(workflows, object_refs):
# NOTE: When calling 'ray.put', we trigger python object
# serialization. Under our serialization context,
# Workflows and ObjectRefs are separated from the arguments,
# leaving a placeholder object with all other python objects.
# Then we put the placeholder object to object store,
# so it won't be mutated later. This guarantees correct
# semantics. See "tests/test_variable_mutable.py" as
# an example.
input_placeholder: ray.ObjectRef = ray.put(args_list)
if object_refs:
raise ValueError(
"There are ObjectRefs in workflow inputs. However "
"workflow currently does not support checkpointing "
"ObjectRefs.")
return WorkflowInputs(input_placeholder, object_refs, workflows)
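The reason for ray.put-ing the flattened arguments (see the NOTE above) can be shown with plain public Ray APIs, assuming a local Ray runtime can start; the stored copy is immune to later mutation of the original objects:

    import ray

    ray.init(ignore_reinit_error=True)

    args = [{"counter": 0}]
    ref = ray.put(args)         # snapshot the arguments in the object store
    args[0]["counter"] = 42     # later mutation of the original object...

    print(ray.get(ref))         # ...does not affect the stored copy: [{'counter': 0}]
    ray.shutdown()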

View file

@ -1,19 +1,25 @@
import enum
import logging
from typing import (List, Tuple, Any, Dict, Callable, Optional, TYPE_CHECKING,
Union)
import ray
from ray import ObjectRef
from ray._private import signature
from ray.experimental.workflow import workflow_context
from ray.experimental.workflow import serialization_context
from ray.experimental.workflow import workflow_storage
from ray.experimental.workflow.workflow_access import MANAGEMENT_ACTOR_NAME
from ray.experimental.workflow.common import Workflow, WorkflowStatus
if TYPE_CHECKING:
from ray.experimental.workflow.common import (StepID, WorkflowOutputType,
Workflow, WorkflowInputTuple,
WorkflowStatus)
WorkflowData)
StepInputTupleToResolve = Tuple[ObjectRef, List[ObjectRef], List[ObjectRef]]
logger = logging.getLogger(__name__)
def _resolve_object_ref(ref: ObjectRef) -> Tuple[Any, ObjectRef]:
"""
@ -22,7 +28,6 @@ def _resolve_object_ref(ref: ObjectRef) -> Tuple[Any, ObjectRef]:
Returns:
The object instance and the direct ObjectRef to the instance.
"""
assert ray.is_initialized()
last_ref = ref
while True:
if isinstance(ref, ObjectRef):
@ -33,26 +38,6 @@ def _resolve_object_ref(ref: ObjectRef) -> Tuple[Any, ObjectRef]:
return ref, last_ref
def _deref_arguments(args: List, kwargs: Dict) -> Tuple[List, Dict]:
"""
This function decides how the ObjectRefs in the argument will be presented
to the user. Currently we dereference arguments like Ray remote functions.
Args:
args: Positional arguments.
kwargs: Keywords arguments.
Returns:
Post processed arguments.
"""
_args = [ray.get(a) if isinstance(a, ObjectRef) else a for a in args]
_kwargs = {
k: ray.get(v) if isinstance(v, ObjectRef) else v
for k, v in kwargs.items()
}
return _args, _kwargs
def _resolve_step_inputs(
step_inputs: StepInputTupleToResolve) -> Tuple[List, Dict]:
"""
@ -82,14 +67,17 @@ def _resolve_step_inputs(
with serialization_context.workflow_args_resolving_context(
objects_mapping, input_object_refs):
# reconstruct input arguments under correct serialization context
args, kwargs = ray.get(input_placeholder)
_args, _kwargs = _deref_arguments(args, kwargs)
return _args, _kwargs
flattened_args: List[Any] = ray.get(input_placeholder)
# dereference arguments like Ray remote functions
flattened_args = [
ray.get(a) if isinstance(a, ObjectRef) else a for a in flattened_args
]
return signature.recover_args(flattened_args)
def execute_workflow(workflow: "Workflow",
outer_most_step_id: Optional[str] = None
) -> ray.ObjectRef:
outer_most_step_id: Optional[str] = None,
last_step_of_workflow: bool = False) -> ray.ObjectRef:
"""Execute workflow.
To fully explain what we are doing, we need to introduce some syntax first.
@ -112,10 +100,12 @@ def execute_workflow(workflow: "Workflow",
outer_most_step_id: The ID of the outermost workflow step. None if it
does not exist. See "step_executor.execute_workflow" for a detailed
explanation.
last_step_of_workflow: Whether this step generates the output of the
workflow (including nested steps).
Returns:
An object ref that represent the result.
"""
if outer_most_step_id is None:
if outer_most_step_id is None or outer_most_step_id == "":
# The current workflow step returns a nested workflow, and
# there is no outer step for the current step. So the current
# step is the outer most step for the inner nested workflow
@ -123,7 +113,7 @@ def execute_workflow(workflow: "Workflow",
outer_most_step_id = workflow_context.get_current_step_id()
# Passing down outer most step so inner nested steps would
# access the same outer most step.
return workflow.execute(outer_most_step_id)
return workflow.execute(outer_most_step_id, last_step_of_workflow)
def commit_step(store: workflow_storage.WorkflowStorage,
@ -145,15 +135,102 @@ def commit_step(store: workflow_storage.WorkflowStorage,
store.save_step_output(step_id, ret, outer_most_step_id)
@ray.remote
class StepType(enum.Enum):
"""All step types."""
FUNCTION = 1
ACTOR_METHOD = 2
READONLY_ACTOR_METHOD = 3
def _wrap_run(func: Callable, step_type: StepType, step_id: "StepID",
catch_exceptions: bool, step_max_retries: int, *args,
**kwargs) -> Tuple[Any, Any]:
"""Wrap the function and execute it.
It returns two parts: state and output. State is the part of the result
that is persisted to storage and passed to the next step. Output is the part
of the result returned to the user; it does not require persistence.
This table describes their relationship:
+-----------------------------+-------+--------+----------------------+
| Step Type | state | output | catch exception into |
+-----------------------------+-------+--------+----------------------+
| Function Step | Y | N | state |
+-----------------------------+-------+--------+----------------------+
| Virtual Actor Step | Y | Y | output |
+-----------------------------+-------+--------+----------------------+
| Readonly Virtual Actor Step | N | Y | output |
+-----------------------------+-------+--------+----------------------+
Args:
step_type: The type of the step producing the result.
catch_exceptions: True if we would like to catch the exception.
step_max_retries: Max number of retries on application-level failure.
Returns:
State and output.
"""
exception = None
result = None
# step_max_retries is for application-level failures.
# For Ray system failures, use max_retries instead.
for i in range(step_max_retries):
if exception is not None:
logger.error(f"Step '{step_id}' raises an exception. Retrying "
f"[{i}/{step_max_retries-1}]. Exception: {exception}")
try:
result = func(*args, **kwargs)
exception = None
break
except BaseException as e:
exception = e
if catch_exceptions:
if step_type == StepType.FUNCTION:
state, output = (result, exception), None
elif step_type == StepType.ACTOR_METHOD:
# virtual actors do not persist exceptions
state, output = result[0], (result[1], exception)
elif step_type == StepType.READONLY_ACTOR_METHOD:
state, output = None, (result, exception)
else:
raise ValueError(f"Unknown StepType '{step_type}'")
else:
if exception is not None:
if step_type != StepType.READONLY_ACTOR_METHOD:
_record_step_status(step_id, WorkflowStatus.RESUMABLE)
raise exception
if step_type == StepType.FUNCTION:
state, output = result, None
elif step_type == StepType.ACTOR_METHOD:
state, output = result
elif step_type == StepType.READONLY_ACTOR_METHOD:
state, output = None, result
else:
raise ValueError(f"Unknown StepType '{step_type}'")
is_nested = isinstance(state, Workflow)
if step_type != StepType.FUNCTION and is_nested:
# TODO(suquark): Support returning a workflow inside
# a virtual actor.
raise TypeError("Only a workflow step function "
"can return a workflow.")
return state, output
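To make the table concrete, here is a tiny standalone sketch (hypothetical helper, covering only the Function Step row) of how catch_exceptions turns the state into a (result, exception) pair instead of raising:

    from typing import Any, Callable, Optional, Tuple

    def run_function_step(func: Callable, *args, catch_exceptions: bool = False,
                          step_max_retries: int = 1
                          ) -> Tuple[Any, Optional[BaseException]]:
        exception: Optional[BaseException] = None
        result = None
        for _ in range(step_max_retries):   # application-level retries only
            try:
                result = func(*args)
                exception = None
                break
            except BaseException as e:
                exception = e
        if catch_exceptions:
            return result, exception        # state = (result, exception), never raises
        if exception is not None:
            raise exception                 # the step is left resumable instead
        return result, None

    print(run_function_step(lambda: 1 / 0, catch_exceptions=True))  # (None, ZeroDivisionError(...))
    print(run_function_step(int, "41", step_max_retries=3))         # (41, None)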
@ray.remote(num_returns=2)
def _workflow_step_executor(
func: Callable, context: workflow_context.WorkflowStepContext,
step_id: "StepID", step_inputs: "StepInputTupleToResolve",
outer_most_step_id: "StepID", catch_exceptions: bool,
step_max_retries: int) -> Any:
step_type: StepType, func: Callable,
context: workflow_context.WorkflowStepContext, step_id: "StepID",
step_inputs: "StepInputTupleToResolve", outer_most_step_id: "StepID",
catch_exceptions: bool, step_max_retries: int,
last_step_of_workflow: bool) -> Any:
"""Executor function for workflow step.
Args:
step_type: The type of workflow step.
func: The workflow step function.
context: Workflow step context. Used to access correct storage etc.
step_id: The ID of the step.
@ -163,53 +240,87 @@ def _workflow_step_executor(
catch_exceptions: If set to be true, return
(Optional[Result], Optional[Error]) instead of Result.
step_max_retries: Max number of retries when encountering a failure.
last_step_of_workflow: Whether this step generates the output of the
workflow (including nested steps).
Returns:
Workflow step output.
"""
ret = None
err = None
# step_max_retries are for application level failure.
# For ray failure, we should use max_retries.
from ray.experimental.workflow.common import WorkflowStatus
from ray.experimental.workflow.common import Workflow
for _ in range(step_max_retries):
try:
workflow_context.update_workflow_step_context(context, step_id)
args, kwargs = _resolve_step_inputs(step_inputs)
# Running the actual step function
ret = func(*args, **kwargs)
# Save workflow output
store = workflow_storage.get_workflow_storage()
commit_step(store, step_id, ret, outer_most_step_id)
if isinstance(ret, Workflow):
workflow_context.update_workflow_step_context(context, step_id)
args, kwargs = _resolve_step_inputs(step_inputs)
state, output = _wrap_run(func, step_type, step_id, catch_exceptions,
step_max_retries, *args, **kwargs)
if step_type != StepType.READONLY_ACTOR_METHOD:
store = workflow_storage.get_workflow_storage()
# Save workflow output
commit_step(store, step_id, state, outer_most_step_id)
# We MUST execute the workflow after saving the output.
if isinstance(state, Workflow):
if step_type == StepType.FUNCTION:
# execute sub-workflow
ret = execute_workflow(ret, outer_most_step_id)
err = None
break
except BaseException as e:
err = e
if catch_exceptions:
state = execute_workflow(state, outer_most_step_id,
last_step_of_workflow)
else:
# TODO(suquark): Support returning a workflow inside
# a virtual actor.
raise TypeError("Only a workflow step function "
"can return a workflow.")
elif last_step_of_workflow:
# advance the progress of the workflow
store.advance_progress(step_id)
_record_step_status(step_id, WorkflowStatus.FINISHED)
return (ret, err)
else:
if err is not None:
_record_step_status(step_id, WorkflowStatus.RESUMABLE)
raise err
_record_step_status(step_id, WorkflowStatus.FINISHED)
return ret
return state, output
def execute_workflow_step(
step_func: Callable, step_id: "StepID",
step_inputs: "WorkflowInputTuple", catch_exceptions: bool,
step_max_retries: int, ray_options: Dict[str, Any],
outer_most_step_id: "StepID") -> "WorkflowOutputType":
from ray.experimental.workflow.common import WorkflowStatus
def execute_workflow_step(step_id: "StepID", workflow_data: "WorkflowData",
outer_most_step_id: "StepID",
last_step_of_workflow: bool) -> "WorkflowOutputType":
_record_step_status(step_id, WorkflowStatus.RUNNING)
return _workflow_step_executor.options(**ray_options).remote(
step_func, workflow_context.get_workflow_step_context(), step_id,
step_inputs, outer_most_step_id, catch_exceptions, step_max_retries)
workflow_outputs = [w.execute() for w in workflow_data.inputs.workflows]
# NOTE: The input placeholder is only a placeholder. It can only be
# deserialized under a proper serialization context. Directly
# deserializing the placeholder without a context would raise
# an exception. If we passed the placeholder to _workflow_step_executor
# as a direct argument, Ray would deserialize it without a
# proper context. To prevent this, we put it inside a tuple.
step_inputs = (workflow_data.inputs.args, workflow_outputs,
workflow_data.inputs.object_refs)
return _workflow_step_executor.options(**workflow_data.ray_options).remote(
StepType.FUNCTION, workflow_data.func_body,
workflow_context.get_workflow_step_context(), step_id, step_inputs,
outer_most_step_id, workflow_data.catch_exceptions,
workflow_data.step_max_retries, last_step_of_workflow)[0]
def execute_virtual_actor_step(step_id: "StepID",
workflow_data: "WorkflowData",
readonly: bool) -> "WorkflowOutputType":
from ray.experimental.workflow.common import WorkflowStatus
if not readonly:
_record_step_status(step_id, WorkflowStatus.RUNNING)
workflow_outputs = [w.execute() for w in workflow_data.inputs.workflows]
step_inputs = (workflow_data.inputs.args, workflow_outputs,
workflow_data.inputs.object_refs)
outer_most_step_id = ""
if readonly:
step_type = StepType.READONLY_ACTOR_METHOD
else:
step_type = StepType.ACTOR_METHOD
ret = _workflow_step_executor.options(**workflow_data.ray_options).remote(
step_type,
workflow_data.func_body,
workflow_context.get_workflow_step_context(),
step_id,
step_inputs,
outer_most_step_id,
workflow_data.catch_exceptions,
workflow_data.step_max_retries,
last_step_of_workflow=True)
if readonly:
return ret[1] # only return output. skip state
return ret
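The executor is now a two-return-value remote function, so every step hands back a (state, output) pair as two object refs; with Ray's public API the pattern looks like this (a sketch assuming a local Ray runtime, not the workflow executor itself):

    import ray

    ray.init(ignore_reinit_error=True)

    @ray.remote(num_returns=2)
    def step():
        state = {"persisted": True}     # checkpointed and passed to the next step
        output = "for the caller"       # returned to the user, not persisted
        return state, output

    state_ref, output_ref = step.remote()  # two ObjectRefs, one per return value
    print(ray.get(output_ref))             # a readonly actor method returns only this ref
    ray.shutdown()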
def _record_step_status(step_id: "StepID", status: "WorkflowStatus") -> None:

View file

@ -1,15 +1,9 @@
import functools
import inspect
from typing import Any, Dict, List, Tuple, Callable, Optional
from typing import Callable
import ray
from ray import ObjectRef
from ray._private import signature
from ray.experimental.workflow import serialization_context
from ray.experimental.workflow.common import (
Workflow, StepID, WorkflowOutputType, WorkflowInputTuple)
StepInputTupleToResolve = Tuple[ObjectRef, List[ObjectRef], List[ObjectRef]]
from ray.experimental.workflow.common import Workflow, WorkflowData
class WorkflowStepFunction:
@ -30,58 +24,26 @@ class WorkflowStepFunction:
self._step_max_retries = step_max_retries
self._catch_exceptions = catch_exceptions
self._ray_options = ray_options or {}
self._func_signature = list(
inspect.signature(func).parameters.values())
self._func_signature = signature.extract_signature(func)
# Override signature and docstring
@functools.wraps(func)
def _build_workflow(*args, **kwargs) -> Workflow:
# validate if the input arguments match the signature of the
# original function.
reconstructed_signature = inspect.Signature(
parameters=self._func_signature)
try:
reconstructed_signature.bind(*args, **kwargs)
except TypeError as exc: # capture a friendlier stacktrace
raise TypeError(str(exc)) from None
workflows: List[Workflow] = []
object_refs: List[ObjectRef] = []
with serialization_context.workflow_args_serialization_context(
workflows, object_refs):
# NOTE: When calling 'ray.put', we trigger python object
# serialization. Under our serialization context,
# Workflows and ObjectRefs are separated from the arguments,
# leaving a placeholder object with all other python objects.
# Then we put the placeholder object to object store,
# so it won't be mutated later. This guarantees correct
# semantics. See "tests/test_variable_mutable.py" as
# an example.
input_placeholder: ObjectRef = ray.put((args, kwargs))
if object_refs:
raise ValueError(
"There are ObjectRefs in workflow inputs. However "
"workflow currently does not support checkpointing "
"ObjectRefs.")
return Workflow(self._func, self._run_step, input_placeholder,
workflows, object_refs, self._step_max_retries,
self._catch_exceptions, self._ray_options)
flattened_args = signature.flatten_args(self._func_signature, args,
kwargs)
workflow_inputs = serialization_context.make_workflow_inputs(
flattened_args)
workflow_data = WorkflowData(
func_body=self._func,
inputs=workflow_inputs,
step_max_retries=self._step_max_retries,
catch_exceptions=self._catch_exceptions,
ray_options=self._ray_options,
)
return Workflow(workflow_data)
self.step = _build_workflow
def _run_step(
self,
step_id: StepID,
step_inputs: WorkflowInputTuple,
catch_exceptions: bool,
step_max_retries: int,
ray_options: Dict[str, Any],
outer_most_step_id: Optional[StepID] = None) -> WorkflowOutputType:
from ray.experimental.workflow.step_executor import (
execute_workflow_step)
return execute_workflow_step(self._func, step_id, step_inputs,
catch_exceptions, step_max_retries,
ray_options, outer_most_step_id)
def __call__(self, *args, **kwargs):
raise TypeError("Workflow steps cannot be called directly. Instead "
f"of running '{self.step.__name__}()', "
@ -102,8 +64,8 @@ class WorkflowStepFunction:
If it's set to be true, (Optional[R], Optional[E]) will be
returned.
If it's false, the normal result will be returned.
**kwargs(dict): All parameters in this fields will be passed to
ray remote function options.
**ray_options(dict): All parameters in this fields will be passed
to ray remote function options.
Returns:
The step function itself.
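The step function now flattens call arguments with the signature module before handing them to make_workflow_inputs. The flattening convention (visible in the storage test below) interleaves a dummy marker before each positional argument and the keyword name before each keyword value; a simplified stand-in (not the ray._private.signature implementation) behaves roughly like this:

    from typing import Any, Dict, List, Tuple

    DUMMY = object()   # stand-in for signature.DUMMY_TYPE

    def flatten_args(args: Tuple, kwargs: Dict[str, Any]) -> List[Any]:
        flat: List[Any] = []
        for a in args:
            flat += [DUMMY, a]          # positional args are marked with the dummy
        for k, v in kwargs.items():
            flat += [k, v]              # keyword args keep their key
        return flat

    def recover_args(flat: List[Any]) -> Tuple[List[Any], Dict[str, Any]]:
        args, kwargs = [], {}
        for key, value in zip(flat[::2], flat[1::2]):
            if key is DUMMY:
                args.append(value)
            else:
                kwargs[key] = value
        return args, kwargs

    flat = flatten_args((1, "2"), {"k": b"543"})  # [DUMMY, 1, DUMMY, "2", "k", b"543"]
    print(recover_args(flat))                     # ([1, "2"], {"k": b"543"})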

View file

@ -297,6 +297,35 @@ class Storage(metaclass=abc.ABCMeta):
A list of workflow ids
"""
@abstractmethod
async def load_workflow_progress(self, workflow_id: str) -> Dict[str, Any]:
"""Load the latest progress of a workflow. This is used by a
virtual actor.
Args:
workflow_id: ID of the workflow job.
Raises:
DataLoadError: if we fail to load the progress.
Returns:
Metadata about the workflow progress.
"""
@abstractmethod
async def save_workflow_progress(self, workflow_id: str,
metadata: Dict[str, Any]) -> None:
"""Save the latest progress of a workflow. This is used by a
virtual actor.
Args:
workflow_id: ID of the workflow job.
metadata: Metadata about the workflow progress.
Raises:
DataSaveError: if we fail to save the progress.
"""
@abstractmethod
def __reduce__(self):
"""Reduce the storage to a serializable object."""

View file

@ -20,6 +20,7 @@ STEP_OUTPUT = "output.pkl"
STEP_FUNC_BODY = "func_body.pkl"
CLASS_BODY = "class_body.pkl"
WORKFLOW_META = "workflow_meta.json"
WORKFLOW_PROGRESS = "progress.json"
@contextlib.contextmanager
@ -276,6 +277,25 @@ class FilesystemStorageImpl(Storage):
except Exception as e:
raise DataSaveError from e
async def load_workflow_progress(self, workflow_id: str) -> Dict[str, Any]:
path = (self._workflow_root_dir / workflow_id / STEPS_DIR /
WORKFLOW_PROGRESS)
try:
with _open_atomic(path) as f:
return json.load(f)
except Exception as e:
raise DataLoadError from e
async def save_workflow_progress(self, workflow_id: str,
metadata: Dict[str, Any]) -> None:
path = (self._workflow_root_dir / workflow_id / STEPS_DIR /
WORKFLOW_PROGRESS)
try:
with _open_atomic(path, "w") as f:
json.dump(metadata, f)
except Exception as e:
raise DataSaveError from e
async def save_workflow_meta(self, workflow_id: str,
metadata: Dict[str, Any]) -> None:
file_path = self._workflow_root_dir / workflow_id / WORKFLOW_META

View file

@ -21,6 +21,7 @@ STEP_OUTPUT = "output.pkl"
STEP_FUNC_BODY = "func_body.pkl"
CLASS_BODY = "class_body.pkl"
WORKFLOW_META = "workflow_meta.json"
WORKFLOW_PROGRESS = "progress.json"
MAX_RECEIVED_DATA_MEMORY_SIZE = 25 * 1024 * 1024 # 25MB
@ -291,6 +292,17 @@ class S3StorageImpl(Storage):
objs = await self._list_objects()
return objs
@data_load_error
async def load_workflow_progress(self, workflow_id: str) -> Dict[str, Any]:
path = self._get_s3_path(workflow_id, STEPS_DIR, WORKFLOW_PROGRESS)
return await self._get_object(path, True)
@data_save_error
async def save_workflow_progress(self, workflow_id: str,
metadata: Dict[str, Any]) -> None:
path = self._get_s3_path(workflow_id, STEPS_DIR, WORKFLOW_PROGRESS)
await self._put_object(path, metadata, True)
def __reduce__(self):
return S3StorageImpl, (self._bucket, self._s3_path, self._region_name,
self._endpoint_url, self._aws_access_key_id,

View file

@ -151,9 +151,9 @@ def test_workflow_output_resolving(workflow_start_regular_shared):
def test_run_or_resume_during_running(workflow_start_regular_shared):
output = simple_sequential.step().run_async(workflow_id="running_workflow")
with pytest.raises(ValueError):
with pytest.raises(RuntimeError):
simple_sequential.step().run_async(workflow_id="running_workflow")
with pytest.raises(ValueError):
with pytest.raises(RuntimeError):
workflow.resume(workflow_id="running_workflow")
assert ray.get(output) == "[source1][append1][append2]"

View file

@ -144,7 +144,8 @@ def test_shortcut(workflow_start_regular):
# the shortcut points to the step with output checkpoint
store = workflow_storage.get_workflow_storage("shortcut")
step_id = store.get_entrypoint_step_id()
assert store.inspect_step(step_id).output_object_valid
output_step_id = store.inspect_step(step_id).output_step_id
assert store.inspect_step(output_step_id).output_object_valid
@workflow.step

View file

@ -1,6 +1,7 @@
import pytest
import asyncio
import ray
from ray._private import signature
from ray.tests.conftest import * # noqa
from ray.experimental.workflow import workflow_storage
from ray.experimental.workflow import storage
@ -108,7 +109,10 @@ def test_workflow_storage(workflow_start_regular):
"output_step_id": "a12423",
"dynamic_output_step_id": "b1234"
}
args = ([1, "2"], {"k": b"543"})
flattened_args = [
signature.DUMMY_TYPE, 1, signature.DUMMY_TYPE, "2", "k", b"543"
]
args = signature.recover_args(flattened_args)
output = ["the_answer"]
object_resolved = 42
obj_ref = ray.put(object_resolved)
@ -119,7 +123,8 @@ def test_workflow_storage(workflow_start_regular):
input_metadata))
asyncio_run(
raw_storage.save_step_func_body(workflow_id, step_id, some_func))
asyncio_run(raw_storage.save_step_args(workflow_id, step_id, args))
asyncio_run(
raw_storage.save_step_args(workflow_id, step_id, flattened_args))
asyncio_run(raw_storage.save_object_ref(workflow_id, obj_ref))
asyncio_run(
raw_storage.save_step_output_metadata(workflow_id, step_id,

View file

@ -1,4 +1,5 @@
import abc
import functools
import inspect
import logging
from typing import List, TYPE_CHECKING, Any, Tuple, Dict
@ -9,13 +10,17 @@ import ray
from ray.util.inspect import (is_function_or_method, is_class_method,
is_static_method)
from ray._private import signature
from ray.experimental.workflow.common import slugify
from ray.experimental.workflow.common import slugify, WorkflowData, Workflow
from ray.experimental.workflow import serialization_context
from ray.experimental.workflow.storage import Storage, get_global_storage
from ray.experimental.workflow.workflow_storage import WorkflowStorage
from ray.experimental.workflow.recovery import get_latest_output
from ray.experimental.workflow.workflow_access import (
get_or_create_management_actor)
from ray.experimental.workflow import workflow_context
from ray.experimental.workflow.step_function import WorkflowStepFunction
from ray.experimental.workflow.step_executor import execute_virtual_actor_step
if TYPE_CHECKING:
from ray import ObjectRef
@ -29,23 +34,6 @@ class VirtualActorNotInitializedError(Exception):
super().__init__(message)
@ray.remote
def _readonly_method_executor(actor_id: str, storage: Storage, cls: type,
method_name: str, flattened_args: List):
instance = cls.__new__(cls)
try:
state = get_latest_output(actor_id, storage)
except Exception as e:
raise VirtualActorNotInitializedError(
f"Virtual actor '{actor_id}' has not been initialized. "
"We cannot get the latest state for the "
"readonly virtual actor.") from e
instance.__setstate__(state)
args, kwargs = signature.recover_args(flattened_args)
method = getattr(instance, method_name)
return method(*args, **kwargs)
# TODO(suquark): This is just a temporary solution. A virtual actor writer
# should take place of this solution later.
@WorkflowStepFunction
@ -326,6 +314,26 @@ class VirtualActorClass(VirtualActorClassBase):
return VirtualActor(self._metadata, actor_id, storage)
def _wrap_readonly_actor_method(actor_id: str, cls: type, method_name: str):
# generate better step names
@functools.wraps(getattr(cls, method_name))
def _readonly_actor_method(*args, **kwargs):
storage = get_global_storage()
instance = cls.__new__(cls)
try:
state = get_latest_output(actor_id, storage)
except Exception as e:
raise VirtualActorNotInitializedError(
f"Virtual actor '{actor_id}' has not been initialized. "
"We cannot get the latest state for the "
"readonly virtual actor.") from e
instance.__setstate__(state)
method = getattr(instance, method_name)
return method(*args, **kwargs)
return _readonly_actor_method
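The functools.wraps call is what produces the "better step names" from the commit message: the wrapper inherits the method's __qualname__, which is then slugified into the step ID. A small plain-Python sketch of that effect (the slugify helper here is a rough stand-in, not the workflow one):

    import functools
    import re
    import uuid

    def slugify(value: str) -> str:
        return re.sub(r"[^\w.\-]", "-", value)   # rough stand-in

    class Counter:
        def get(self):
            return 0

    def wrap_readonly(cls: type, method_name: str):
        @functools.wraps(getattr(cls, method_name))   # copies __qualname__, docstring, etc.
        def _readonly(*args, **kwargs):
            instance = cls.__new__(cls)               # actor state would be restored here
            return getattr(instance, method_name)(*args, **kwargs)
        return _readonly

    wrapped = wrap_readonly(Counter, "get")
    print(wrapped.__qualname__)                                     # Counter.get
    print(slugify(wrapped.__qualname__) + "." + uuid.uuid4().hex)   # readable step ID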
class VirtualActor:
"""The instance of a virtual actor class."""
@ -372,17 +380,32 @@ class VirtualActor:
def _actor_method_call(self, method_name: str, args,
kwargs) -> "ObjectRef":
flatten_args = self._metadata.flatten_args(method_name, args, kwargs)
flattened_args = self._metadata.flatten_args(method_name, args, kwargs)
cls = self._metadata.cls
method = getattr(cls, method_name, None)
if method is None:
raise AttributeError(f"Method '{method}' does not exist.")
readonly = getattr(method, "__virtual_actor_readonly__", None)
if readonly:
return _readonly_method_executor.remote(
self._actor_id, self._storage, cls, method_name, flatten_args)
raise NotImplementedError("Virtual actor writer mode has not been "
"supported yet.")
raise AttributeError(f"Method '{method_name}' does not exist.")
workflow_inputs = serialization_context.make_workflow_inputs(
flattened_args)
readonly = getattr(method, "__virtual_actor_readonly__", False)
with workflow_context.workflow_step_context(self._actor_id,
self._storage.storage_url):
if readonly:
_readonly_actor_method = _wrap_readonly_actor_method(
self._actor_id, cls, method_name)
# TODO(suquark): Support actor options.
workflow_data = WorkflowData(
func_body=_readonly_actor_method,
inputs=workflow_inputs,
step_max_retries=1,
catch_exceptions=False,
ray_options={},
)
wf = Workflow(workflow_data)
return execute_virtual_actor_step(wf.id, workflow_data, True)
else:
raise NotImplementedError(
"Virtual actor writer mode is not supported yet.")
def decorate_actor(cls: type):

View file

@ -126,8 +126,8 @@ class WorkflowManagementActor:
workflow result.
"""
if workflow_id in self._workflow_outputs:
raise ValueError(f"The output of workflow[id={workflow_id}] "
"already exists.")
raise RuntimeError(f"The output of workflow[id={workflow_id}] "
"already exists.")
output = recovery.resume_workflow_job.remote(workflow_id,
self._store.storage_url)
self._workflow_outputs[workflow_id] = output

View file

@ -8,8 +8,9 @@ from typing import Dict, List, Optional, Any, Callable, Tuple, Union
from dataclasses import dataclass
import ray
from ray._private import signature
from ray.experimental.workflow import storage
from ray.experimental.workflow.common import (Workflow, WorkflowInputs, StepID,
from ray.experimental.workflow.common import (Workflow, WorkflowData, StepID,
WorkflowMetaData, WorkflowStatus)
from ray.experimental.workflow import workflow_context
from ray.experimental.workflow import serialization_context
@ -99,7 +100,10 @@ class WorkflowStorage:
self._storage.save_step_output(self._workflow_id, step_id,
ret))
dynamic_output_id = step_id
if outer_most_step_id is not None:
# outer_most_step_id == "" indicates the root step of a workflow.
# This would directly update "outputs.json" in the workflow dir,
# and we want to avoid it.
if outer_most_step_id is not None and outer_most_step_id != "":
tasks.append(
self._update_dynamic_output(outer_most_step_id,
dynamic_output_id))
@ -135,8 +139,9 @@ class WorkflowStorage:
"""
with serialization_context.workflow_args_resolving_context(
workflows, object_refs):
return asyncio_run(
flattened_args = asyncio_run(
self._storage.load_step_args(self._workflow_id, step_id))
return signature.recover_args(flattened_args)
def load_object_ref(self, object_id: str) -> ray.ObjectRef:
"""Load the input object ref.
@ -249,25 +254,18 @@ class WorkflowStorage:
)
async def _write_step_inputs(self, step_id: StepID,
inputs: WorkflowInputs) -> None:
inputs: WorkflowData) -> None:
"""Save workflow inputs."""
f = inputs.func_body
metadata = {
"name": f.__module__ + "." + f.__qualname__,
"object_refs": inputs.object_refs,
"workflows": inputs.workflows,
"step_max_retries": inputs.step_max_retries,
"catch_exceptions": inputs.catch_exceptions,
"ray_options": inputs.ray_options,
}
metadata = inputs.to_metadata()
with serialization_context.workflow_args_keeping_context():
# TODO(suquark): in the future we should write to storage directly
# with plasma store object in memory.
args_obj = ray.get(inputs.args)
args_obj = ray.get(inputs.inputs.args)
save_tasks = [
self._storage.save_step_input_metadata(self._workflow_id, step_id,
metadata),
self._storage.save_step_func_body(self._workflow_id, step_id, f),
self._storage.save_step_func_body(self._workflow_id, step_id,
inputs.func_body),
self._storage.save_step_args(self._workflow_id, step_id, args_obj)
]
await asyncio.gather(*save_tasks)
@ -281,7 +279,7 @@ class WorkflowStorage:
"""
assert not workflow.executed
tasks = [
self._write_step_inputs(w.id, w.get_inputs())
self._write_step_inputs(w.id, w.data)
for w in workflow.iter_workflows_in_dag()
]
asyncio_run(asyncio.gather(*tasks))
@ -341,6 +339,34 @@ class WorkflowStorage:
def list_workflow(self) -> List[Tuple[str, WorkflowStatus]]:
return asyncio_run(self._list_workflow())
def advance_progress(self, finished_step_id: "StepID") -> None:
"""Save the latest progress of a workflow. This is used by a
virtual actor.
Args:
finished_step_id: The step that contains the latest output.
Raises:
DataSaveError: if we fail to save the progress.
"""
asyncio_run(
self._storage.save_workflow_progress(self._workflow_id, {
"step_id": finished_step_id,
}))
def get_latest_progress(self) -> "StepID":
"""Load the latest progress of a workflow. This is used by a
virtual actor.
Raises:
DataLoadError: if we fail to load the progress.
Returns:
The step that contains the latest output.
"""
return asyncio_run(
self._storage.load_workflow_progress(self._workflow_id))["step_id"]
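Under the filesystem backend shown earlier, the progress record is just a small JSON file next to the step data; a sketch of the round trip (the directory layout is illustrative, based on the constants in the diff):

    import json
    from pathlib import Path
    from tempfile import TemporaryDirectory

    with TemporaryDirectory() as root:
        # <workflow_root>/<workflow_id>/steps/progress.json (illustrative layout)
        steps_dir = Path(root) / "actor_1" / "steps"
        steps_dir.mkdir(parents=True)
        progress = steps_dir / "progress.json"

        # advance_progress("method.abc123") boils down to writing this record
        progress.write_text(json.dumps({"step_id": "method.abc123"}))

        # get_latest_progress() reads it back and returns the step ID
        print(json.loads(progress.read_text())["step_id"])  # method.abc123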
def get_workflow_storage(workflow_id: Optional[str] = None) -> WorkflowStorage:
"""Get the storage for the workflow.