[workflow] Fast workflow indexing (#24767)

* workflow indexing

* simplify workflow storage API

* Only fix workflow status when updating the status.

* support status filter
Siyuan (Ryans) Zhuang, 2022-05-24 20:21:08 -07:00, committed by GitHub
parent fa32cb7c40
commit f67871c1f7

6 changed files with 292 additions and 70 deletions
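
A minimal usage sketch of the status filter this commit adds to the public listing API. This is hedged: the exact entry points (`ray.init(storage=...)`, `workflow.init()`) follow the Ray APIs of this era and may differ across versions; the storage path is made up.

```python
import ray
from ray import workflow
from ray.workflow.common import WorkflowStatus

ray.init(storage="/tmp/workflow_data")
workflow.init()

# With the new status index, this reads only the relevant status index
# directories instead of scanning every workflow's metadata file.
print(workflow.list_all({WorkflowStatus.RESUMABLE}))
```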

File 1 of 6:

@@ -286,7 +286,8 @@ def list_all(
             f" {status_filter}"
         )
     elif status_filter is None:
-        status_filter = set(WorkflowStatus.__members__.keys())
+        status_filter = set(WorkflowStatus)
+        status_filter.discard(WorkflowStatus.NONE)
     else:
         raise TypeError(
             "status_filter must be WorkflowStatus or a set of WorkflowStatus."

File 2 of 6:

@@ -140,6 +140,8 @@ class WorkflowStaticRef:
 @PublicAPI(stability="beta")
 @unique
 class WorkflowStatus(str, Enum):
+    # No status is set for this workflow.
+    NONE = "NONE"
     # There is at least a remote task running in ray cluster
     RUNNING = "RUNNING"
     # It got canceled and can't be resumed later.
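
Because `WorkflowStatus` is a `str`-based Enum, `set(WorkflowStatus)` now contains the new `NONE` member, which is why the default filter in `list_all` above discards it explicitly. A self-contained sketch (trimmed stand-in enum, not the real class):

```python
from enum import Enum, unique

@unique
class WorkflowStatus(str, Enum):  # stand-in for ray.workflow.common
    NONE = "NONE"
    RUNNING = "RUNNING"
    RESUMABLE = "RESUMABLE"

default_filter = set(WorkflowStatus)         # includes NONE as well
default_filter.discard(WorkflowStatus.NONE)  # NONE is never a valid filter
assert WorkflowStatus.NONE not in default_filter
```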

File 3 of 6:

@@ -10,7 +10,6 @@ from ray.workflow import workflow_storage
 from ray.workflow.common import (
     Workflow,
     WorkflowStatus,
-    WorkflowMetaData,
     StepType,
     WorkflowNotFoundError,
     validate_user_metadata,
@@ -128,10 +127,15 @@ def get_output(workflow_id: str, name: Optional[str]) -> ray.ObjectRef:
 def cancel(workflow_id: str) -> None:
     try:
         workflow_manager = get_management_actor()
-        ray.get(workflow_manager.cancel_workflow.remote(workflow_id))
     except ValueError:
         wf_store = workflow_storage.get_workflow_storage(workflow_id)
-        wf_store.save_workflow_meta(WorkflowMetaData(WorkflowStatus.CANCELED))
+        # TODO(suquark): Here we update the workflow status "offline", which is
+        # likely thread-safe because there is no workflow management actor
+        # updating the workflow concurrently. But we should be careful if we
+        # are going to update more workflow status offline in the future.
+        wf_store.update_workflow_status(WorkflowStatus.CANCELED)
+        return
+    ray.get(workflow_manager.cancel_workflow.remote(workflow_id))


 def get_status(workflow_id: str) -> Optional[WorkflowStatus]:
@@ -143,12 +147,12 @@ def get_status(workflow_id: str) -> Optional[WorkflowStatus]:
     if running:
         return WorkflowStatus.RUNNING
     store = workflow_storage.get_workflow_storage(workflow_id)
-    meta = store.load_workflow_meta()
-    if meta is None:
+    status = store.load_workflow_status()
+    if status == WorkflowStatus.NONE:
         raise WorkflowNotFoundError(workflow_id)
-    if meta.status == WorkflowStatus.RUNNING:
+    if status == WorkflowStatus.RUNNING:
         return WorkflowStatus.RESUMABLE
-    return meta.status
+    return status


 def get_metadata(workflow_id: str, name: Optional[str]) -> Dict[str, Any]:
@@ -178,10 +182,24 @@ def list_all(status_filter: Set[WorkflowStatus]) -> List[Tuple[str, WorkflowStatus]]:
     runnings = set(runnings)
     # Here we don't have workflow id, so use empty one instead
     store = workflow_storage.get_workflow_storage("")
+    exclude_running = False
+    if (
+        WorkflowStatus.RESUMABLE in status_filter
+        and WorkflowStatus.RUNNING not in status_filter
+    ):
+        # Here we have to add "RUNNING" to the status filter, because some
+        # "RESUMABLE" workflows are converted from "RUNNING" workflows below.
+        exclude_running = True
+        status_filter.add(WorkflowStatus.RUNNING)
+    status_from_storage = store.list_workflow(status_filter)
     ret = []
-    for (k, s) in store.list_workflow():
-        if s == WorkflowStatus.RUNNING and k not in runnings:
-            s = WorkflowStatus.RESUMABLE
+    for (k, s) in status_from_storage:
+        if s == WorkflowStatus.RUNNING:
+            if k not in runnings:
+                s = WorkflowStatus.RESUMABLE
+            elif exclude_running:
+                continue
         if s in status_filter:
             ret.append((k, s))
     return ret
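
To see why "RUNNING" is temporarily added to the filter, here is a self-contained sketch of the loop above (stand-in enum and made-up workflow IDs, not Ray code):

```python
from enum import Enum

class WorkflowStatus(str, Enum):  # stand-in for ray.workflow.common
    RUNNING = "RUNNING"
    RESUMABLE = "RESUMABLE"

# Storage records both as RUNNING; only the management actor knows which
# drivers are actually alive, so RESUMABLE entries must be derived here.
status_from_storage = [
    ("wf_a", WorkflowStatus.RUNNING),  # still running in the cluster
    ("wf_b", WorkflowStatus.RUNNING),  # its driver died; really resumable
]
runnings = {"wf_a"}
status_filter = {WorkflowStatus.RESUMABLE, WorkflowStatus.RUNNING}
exclude_running = True  # the caller asked only for RESUMABLE

ret = []
for k, s in status_from_storage:
    if s == WorkflowStatus.RUNNING:
        if k not in runnings:
            s = WorkflowStatus.RESUMABLE  # promote dead RUNNING entries
        elif exclude_running:
            continue  # truly RUNNING, but the caller did not ask for it
    if s in status_filter:
        ret.append((k, s))

assert ret == [("wf_b", WorkflowStatus.RESUMABLE)]
```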

File 4 of 6:

@@ -0,0 +1,83 @@
+import pytest
+
+from ray.workflow.common import WorkflowStatus
+from ray.workflow.workflow_storage import WorkflowIndexingStorage
+
+
+def test_workflow_status_update(workflow_start_regular):
+    # Test that workflow status updates work.
+    store = WorkflowIndexingStorage()
+    assert not store.list_workflow()
+    for i in range(100):
+        assert store.load_workflow_status(workflow_id=str(i)) == WorkflowStatus.NONE
+    for i in range(100):
+        store.update_workflow_status(str(i), WorkflowStatus.RUNNING)
+    assert sorted(store.list_workflow()) == sorted(
+        [(str(i), WorkflowStatus.RUNNING) for i in range(100)]
+    )
+    assert sorted(store.list_workflow({WorkflowStatus.RUNNING})) == sorted(
+        [(str(i), WorkflowStatus.RUNNING) for i in range(100)]
+    )
+    assert sorted(store.list_workflow({WorkflowStatus.RESUMABLE})) == []
+    for i in range(100):
+        store.update_workflow_status(str(i), WorkflowStatus.RESUMABLE)
+    assert sorted(store.list_workflow({WorkflowStatus.RESUMABLE})) == sorted(
+        [(str(i), WorkflowStatus.RESUMABLE) for i in range(100)]
+    )
+    assert sorted(store.list_workflow({WorkflowStatus.FAILED})) == []
+    for i in range(100):
+        store.update_workflow_status(str(i), WorkflowStatus.FAILED)
+    assert sorted(store.list_workflow()) == sorted(
+        [(str(i), WorkflowStatus.FAILED) for i in range(100)]
+    )
+    assert sorted(store.list_workflow({WorkflowStatus.FAILED})) == sorted(
+        [(str(i), WorkflowStatus.FAILED) for i in range(100)]
+    )
+    assert sorted(store.list_workflow({WorkflowStatus.RUNNING})) == []
+
+
+def test_workflow_auto_fix_status(workflow_start_regular):
+    # Test that a workflow can recover from a crashed status update.
+    store = WorkflowIndexingStorage()
+    assert not store.list_workflow()
+    # This is a hack that makes status updating crash partway through.
+    _key_workflow_with_status = store._key_workflow_with_status
+    store._key_workflow_with_status = None
+    for i in range(100):
+        try:
+            store.update_workflow_status(str(i), WorkflowStatus.RUNNING)
+        except TypeError:
+            pass
+    store._key_workflow_with_status = _key_workflow_with_status
+    assert sorted(store.list_workflow()) == sorted(
+        [(str(i), WorkflowStatus.RUNNING) for i in range(100)]
+    )
+    for i in range(100):
+        try:
+            # Updating the workflow fixes the previously failed update.
+            store.update_workflow_status(str(i), WorkflowStatus.RESUMABLE)
+        except TypeError:
+            pass
+    for i in range(100):
+        assert store.load_workflow_status(str(i)) == WorkflowStatus.RESUMABLE
+
+
+if __name__ == "__main__":
+    import sys
+
+    sys.exit(pytest.main(["-v", __file__]))

File 5 of 6:

@@ -135,6 +135,7 @@ class WorkflowManagementActor:
         self._step_output_cache: Dict[Tuple[str, str], LatestWorkflowOutput] = {}
         self._actor_initialized: Dict[str, ray.ObjectRef] = {}
         self._step_status: Dict[str, Dict[str, common.WorkflowStatus]] = {}
+        self._workflow_status: Dict[str, common.WorkflowStatus] = {}

     def get_cached_step_output(
         self, workflow_id: str, step_id: "StepID"
@@ -194,9 +195,7 @@
             )
             self._step_output_cache[(workflow_id, step_id)] = latest_output
-        wf_store.save_workflow_meta(
-            common.WorkflowMetaData(common.WorkflowStatus.RUNNING)
-        )
+        self._update_workflow_status(workflow_id, common.WorkflowStatus.RUNNING)

         if workflow_id not in self._step_status:
             self._step_status[workflow_id] = {}
@@ -211,6 +210,11 @@
         else:
             return f"{step_name}_{idx}"

+    def _update_workflow_status(self, workflow_id: str, status: common.WorkflowStatus):
+        wf_store = workflow_storage.WorkflowStorage(workflow_id)
+        wf_store.update_workflow_status(status)
+        self._workflow_status[workflow_id] = status
+
     def update_step_status(
         self,
         workflow_id: str,
@@ -233,30 +237,21 @@
         if status != common.WorkflowStatus.FAILED and remaining != 0:
             return

-        wf_store = workflow_storage.WorkflowStorage(workflow_id)
         if status == common.WorkflowStatus.FAILED:
             if workflow_id in self._workflow_outputs:
                 cancel_job(self._workflow_outputs.pop(workflow_id).output)
-            wf_store.save_workflow_meta(
-                common.WorkflowMetaData(common.WorkflowStatus.FAILED)
-            )
+            self._update_workflow_status(workflow_id, common.WorkflowStatus.FAILED)
             self._step_status.pop(workflow_id)
         else:
-            wf_store.save_workflow_meta(
-                common.WorkflowMetaData(common.WorkflowStatus.SUCCESSFUL)
-            )
+            self._update_workflow_status(workflow_id, common.WorkflowStatus.SUCCESSFUL)
             self._step_status.pop(workflow_id)
-        workflow_postrun_metadata = {"end_time": time.time()}
-        wf_store.save_workflow_postrun_metadata(workflow_postrun_metadata)
+        wf_store = workflow_storage.WorkflowStorage(workflow_id)
+        wf_store.save_workflow_postrun_metadata({"end_time": time.time()})

     def cancel_workflow(self, workflow_id: str) -> None:
         self._step_status.pop(workflow_id)
         cancel_job(self._workflow_outputs.pop(workflow_id).output)
-        wf_store = workflow_storage.WorkflowStorage(workflow_id)
-        wf_store.save_workflow_meta(
-            common.WorkflowMetaData(common.WorkflowStatus.CANCELED)
-        )
+        self._update_workflow_status(workflow_id, common.WorkflowStatus.CANCELED)

     def is_workflow_running(self, workflow_id: str) -> bool:
         return (
@@ -318,15 +313,15 @@ class WorkflowManagementActor:
         if workflow_id in self._workflow_outputs and name is None:
             return self._workflow_outputs[workflow_id].output
         wf_store = workflow_storage.WorkflowStorage(workflow_id)
-        meta = wf_store.load_workflow_meta()
-        if meta is None:
+        status = wf_store.load_workflow_status()
+        if status == common.WorkflowStatus.NONE:
             raise ValueError(f"No such workflow {workflow_id}")
-        if meta == common.WorkflowStatus.CANCELED:
+        if status == common.WorkflowStatus.CANCELED:
             raise ValueError(f"Workflow {workflow_id} is canceled")
         if name is None:
             # For resumable workflow, the workflow result is not ready.
             # It has to be resumed first.
-            if meta == common.WorkflowStatus.RESUMABLE:
+            if status == common.WorkflowStatus.RESUMABLE:
                 raise ValueError(
                     f"Workflow {workflow_id} is in resumable status, "
                     "please resume it"

File 6 of 6:

@@ -5,7 +5,7 @@ workflows.
 import json
 import os
-from typing import Dict, List, Optional, Any, Callable, Tuple, Union
+from typing import Dict, List, Optional, Any, Callable, Tuple, Union, Set
 from dataclasses import dataclass
 import logging
@@ -16,7 +16,6 @@ from ray.internal import storage
 from ray.workflow.common import (
     Workflow,
     StepID,
-    WorkflowMetaData,
     WorkflowStatus,
     WorkflowRef,
     WorkflowNotFoundError,
@@ -51,6 +50,8 @@ WORKFLOW_USER_METADATA = "user_run_metadata.json"
 WORKFLOW_PRERUN_METADATA = "pre_run_metadata.json"
 WORKFLOW_POSTRUN_METADATA = "post_run_metadata.json"
 WORKFLOW_PROGRESS = "progress.json"
+WORKFLOW_STATUS_DIR = "__status__"
+WORKFLOW_STATUS_DIRTY_DIR = "dirty"
 # Without this counter, we're going to scan all steps to get the number of
 # steps with a given name. This can be very expensive if there are too
 # many duplicates.
@@ -86,6 +87,144 @@ class StepInspectResult:
     )


+class WorkflowIndexingStorage:
+    """Access and maintain the indexing of workflow status.
+
+    It runs a protocol that guarantees we can recover from any interrupted
+    status update. This protocol is **not thread-safe** for updating the
+    status of the same workflow; currently it is executed by the workflow
+    management actor with a single thread.
+
+    Here is how the protocol works:
+
+    Update the status of a workflow
+    1. Load the workflow status from the workflow data. If it is the same as
+       the new status, return.
+    2. Check if the workflow status update is dirty. If it is, fix the
+       workflow status; otherwise, mark the workflow status update dirty.
+    3. Update the status in the workflow metadata.
+    4. Insert the workflow ID key in the status indexing directory of the
+       new status.
+    5. Delete the workflow ID key from the status indexing directory of the
+       previous status.
+    6. Remove the workflow status update dirty mark.
+
+    Load the status of a workflow
+    1. Read the status of the workflow from the workflow metadata.
+    2. Return the status.
+
+    List the status of all workflows
+    1. Get the status of all workflows by listing workflow ID keys in each
+       workflow status indexing directory.
+    2. List all workflows whose status update is dirty. Get their status
+       from the workflow data and override the status of the corresponding
+       workflows.
+    3. Return all the statuses.
+    """
+
+    def __init__(self):
+        self._storage = storage.get_client(WORKFLOW_ROOT)
+
+    def update_workflow_status(self, workflow_id: str, status: WorkflowStatus):
+        """Update the status of the workflow.
+
+        Try to fix the indexing if the previous status update was marked
+        dirty. This method is NOT thread-safe; it is called only by the
+        workflow management actor.
+        """
+        prev_status = self.load_workflow_status(workflow_id)
+        if prev_status != status:
+            # Try fixing the indexing if the status update was marked dirty.
+            if (
+                self._storage.get_info(self._key_workflow_status_dirty(workflow_id))
+                is not None
+            ):
+                # The previous status update failed. Fix the indexing: restore
+                # the key of the committed status and drop all the others.
+                self._storage.put(
+                    self._key_workflow_with_status(workflow_id, prev_status), b""
+                )
+                for s in WorkflowStatus:
+                    if s != prev_status:
+                        self._storage.delete(
+                            self._key_workflow_with_status(workflow_id, s)
+                        )
+            else:
+                self._storage.put(self._key_workflow_status_dirty(workflow_id), b"")
+            # Transactional update of the workflow status
+            self._storage.put(
+                self._key_workflow_metadata(workflow_id),
+                json.dumps({"status": status.value}).encode(),
+            )
+            self._storage.put(self._key_workflow_with_status(workflow_id, status), b"")
+            if prev_status is not WorkflowStatus.NONE:
+                self._storage.delete(
+                    self._key_workflow_with_status(workflow_id, prev_status)
+                )
+            self._storage.delete(self._key_workflow_status_dirty(workflow_id))
+
+    def load_workflow_status(self, workflow_id: str):
+        """Load the committed workflow status."""
+        raw_data = self._storage.get(self._key_workflow_metadata(workflow_id))
+        if raw_data is not None:
+            metadata = json.loads(raw_data)
+            return WorkflowStatus(metadata["status"])
+        return WorkflowStatus.NONE
+
+    def list_workflow(
+        self, status_filter: Optional[Set[WorkflowStatus]] = None
+    ) -> List[Tuple[str, WorkflowStatus]]:
+        """List workflow statuses. For workflows whose status update was
+        marked dirty, the status is overridden with the one read from the
+        workflow metadata.
+
+        Args:
+            status_filter: If given, only return workflows with a status in
+                this set.
+        """
+        if status_filter is None:
+            status_filter = set(WorkflowStatus)
+            status_filter.discard(WorkflowStatus.NONE)
+        elif not isinstance(status_filter, set):
+            raise TypeError("'status_filter' should either be 'None' or a set.")
+        elif WorkflowStatus.NONE in status_filter:
+            raise ValueError("'WorkflowStatus.NONE' is not a valid filter value.")
+        results = {}
+        for status in status_filter:
+            try:
+                # An empty workflow ID points the key at the status directory.
+                for p in self._storage.list(
+                    self._key_workflow_with_status("", status)
+                ):
+                    workflow_id = p.base_name
+                    results[workflow_id] = status
+            except FileNotFoundError:
+                pass
+        # Get the "correct" status of workflows with dirty updates
+        try:
+            for p in self._storage.list(self._key_workflow_status_dirty("")):
+                workflow_id = p.base_name
+                # Overwrite the status with the committed one
+                results.pop(workflow_id, None)
+                status = self.load_workflow_status(workflow_id)
+                if status in status_filter:
+                    results[workflow_id] = status
+        except FileNotFoundError:
+            pass
+        return list(results.items())
+
+    def delete_workflow_status(self, workflow_id: str):
+        """Delete status indexing for the workflow."""
+        for status in WorkflowStatus:
+            self._storage.delete(self._key_workflow_with_status(workflow_id, status))
+        self._storage.delete(self._key_workflow_status_dirty(workflow_id))
+
+    def _key_workflow_with_status(self, workflow_id: str, status: WorkflowStatus):
+        """A key whose existence marks the status of the workflow."""
+        return os.path.join(WORKFLOW_STATUS_DIR, status.value, workflow_id)
+
+    def _key_workflow_status_dirty(self, workflow_id: str):
+        """A key that marks the workflow status as dirty because it is being
+        changed."""
+        return os.path.join(WORKFLOW_STATUS_DIR, WORKFLOW_STATUS_DIRTY_DIR, workflow_id)
+
+    def _key_workflow_metadata(self, workflow_id: str):
+        return os.path.join(workflow_id, WORKFLOW_META)
+
+
 class WorkflowStorage:
     """Access workflow in storage. This is a higher-level abstraction,
     which does not care about the underlying storage implementation."""
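
For intuition, here is a minimal sketch of the crash-recovery argument behind the dirty-mark protocol above. A plain dict stands in for the storage client; the key layout follows `WORKFLOW_STATUS_DIR` / `WORKFLOW_STATUS_DIRTY_DIR`, while the metadata key name is illustrative only.

```python
kv = {}  # key -> bytes, stand-in for the storage client

def update_status(workflow_id: str, prev: str, new: str) -> None:
    kv[f"__status__/dirty/{workflow_id}"] = b""       # step 2: mark dirty
    kv[f"{workflow_id}/meta"] = new.encode()          # step 3: commit point
    kv[f"__status__/{new}/{workflow_id}"] = b""       # step 4: index new status
    kv.pop(f"__status__/{prev}/{workflow_id}", None)  # step 5: drop old index
    kv.pop(f"__status__/dirty/{workflow_id}", None)   # step 6: clear dirty mark

# If the process crashes after step 3, the dirty mark survives, so readers
# ignore the (possibly stale) index entries and trust the committed metadata;
# if it crashes before step 3, the old index entry is still the truth.
```
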
@@ -96,6 +235,7 @@ class WorkflowStorage:
         _ensure_workflow_initialized()
         self._storage = storage.get_client(os.path.join(WORKFLOW_ROOT, workflow_id))
+        self._status_storage = WorkflowIndexingStorage()
         self._workflow_id = workflow_id

     def load_step_output(self, step_id: StepID) -> Any:
@@ -545,45 +685,16 @@
         return _load_workflow_metadata()

-    def save_workflow_meta(self, metadata: WorkflowMetaData) -> None:
-        """Save the metadata of the current workflow.
-
-        Args:
-            metadata: WorkflowMetaData of the current workflow.
-
-        Raises:
-            DataSaveError: if we fail to save the class body.
-        """
-        metadata = {"status": metadata.status.value}
-        self._put(self._key_workflow_metadata(), metadata, True)
-
-    def load_workflow_meta(self) -> Optional[WorkflowMetaData]:
-        """Load the metadata of the current workflow.
-
-        Returns:
-            The metadata of the current workflow. If it doesn't exist,
-            return None.
-        """
-        try:
-            metadata = self._get(self._key_workflow_metadata(), True)
-            return WorkflowMetaData(status=WorkflowStatus(metadata["status"]))
-        except KeyNotFoundError:
-            return None
-
-    def _list_workflow(self) -> List[Tuple[str, WorkflowStatus]]:
-        results = []
-        for workflow_id in self._scan("", ignore_errors=True):
-            try:
-                metadata = self._get(os.path.join(workflow_id, WORKFLOW_META), True)
-                results.append((workflow_id, WorkflowStatus(metadata["status"])))
-            except KeyNotFoundError:
-                pass
-        return results
-
-    def list_workflow(self) -> List[Tuple[str, WorkflowStatus]]:
-        return self._list_workflow()
+    def list_workflow(
+        self, status_filter: Optional[Set[WorkflowStatus]] = None
+    ) -> List[Tuple[str, WorkflowStatus]]:
+        """List all workflows matching a given status filter.
+
+        Args:
+            status_filter: If given, only return workflows with a status in
+                this set.
+        """
+        return self._status_storage.list_workflow(status_filter)

     def advance_progress(self, finished_step_id: "StepID") -> None:
         """Save the latest progress of a workflow. This is used by a
@@ -613,6 +724,7 @@
     def delete_workflow(self) -> None:
         # TODO (Alex): There's a race condition here if someone tries to
         # start the workflow between these ops.
+        self._status_storage.delete_workflow_status(self._workflow_id)
         found = self._storage.delete_dir("")
         # TODO (Alex): Different file systems seem to have different
         # behavior when deleting a prefix that doesn't exist, so we may
@@ -621,6 +733,17 @@
         if not found:
             raise WorkflowNotFoundError(self._workflow_id)

+    def update_workflow_status(self, status: WorkflowStatus):
+        """Update the status of the workflow.
+
+        This method is NOT thread-safe. It is called by the workflow management actor.
+        """
+        self._status_storage.update_workflow_status(self._workflow_id, status)
+
+    def load_workflow_status(self):
+        """Load the workflow status. If the previous status update failed,
+        it is fixed with redo-log style transaction recovery."""
+        return self._status_storage.load_workflow_status(self._workflow_id)
+
     def _put(self, key: str, data: Any, is_json: bool = False) -> str:
         """Serialize and put an object in the object store.