[Workflow] Make workflow logs publish to the correct driver. (#24089)

All workflow tasks are executed as remote functions submitted from the WorkflowManagementActor. The WorkflowManagementActor is a detached, long-running actor whose owner is the first driver in the cluster to run a workflow. As a result, for any later driver that runs workflows, logs are not published back to that driver: logs are saved and published keyed on job_id, and that job_id is always the first driver's, because ownership flows first_driver -> WorkflowManagementActor -> workflow executions via remote functions.

To solve this, we pass the actual driver's job_id along with each workflow execution and re-configure the logging files on every worker that runs the remote functions. Note that this must happen in multiple places, because a single workflow task executes as more than one remote function, potentially running on different workers.
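Concretely, the submitting driver captures its own job ID from the runtime context and threads it through the management actor into every remote executor. A condensed sketch of the call chain (the identifiers match the diff below):

    import ray

    ray.init()

    # Each driver has its own job ID; run()/resume()/resume_all() now capture
    # it and pass it as the first argument of run_or_resume():
    job_id = ray.get_runtime_context().job_id.hex()
    print(job_id)  # e.g. "01000000" for the first driver in the cluster

    # Every remote executor then wraps its body in
    #     with workflow_context.workflow_logging_context(job_id): ...
    # so the step's stdout/stderr land in the submitting driver's log files.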
Linsong Chu, 2022-05-02 22:53:57 -04:00, committed by GitHub
parent 0b03e4f549
commit e8fc66af34
8 changed files with 240 additions and 50 deletions


@@ -177,7 +177,8 @@ class StandardFdRedirectionRotatingFileHandler(RotatingFileHandler):
         os.dup2(self.stream.fileno(), self.get_original_stream().fileno())


-def get_worker_log_file_name(worker_type):
-    job_id = os.environ.get("RAY_JOB_ID")
+def get_worker_log_file_name(worker_type, job_id=None):
+    if job_id is None:
+        job_id = os.environ.get("RAY_JOB_ID")
     if worker_type == "WORKER":
         assert job_id is not None, (
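The resolution order of the new optional parameter, as a standalone sketch (not part of the diff; the real function also embeds the worker ID and PID in the file name): an explicitly passed job_id wins, otherwise the function falls back to the RAY_JOB_ID environment variable set on each worker.

    import os

    def resolve_job_id(job_id=None):
        # Mirrors the fallback above: explicit argument first,
        # then the RAY_JOB_ID environment variable.
        if job_id is None:
            job_id = os.environ.get("RAY_JOB_ID")
        return job_id

    os.environ["RAY_JOB_ID"] = "01000000"
    assert resolve_job_id() == "01000000"            # old behavior (env only)
    assert resolve_job_id("02000000") == "02000000"  # new explicit override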


@@ -77,8 +77,9 @@ def run(
         # ensures caller of 'run()' holds the reference to the workflow
         # result. Otherwise if the actor removes the reference of the
         # workflow output, the caller may fail to resolve the result.
+        job_id = ray.get_runtime_context().job_id.hex()
         result: "WorkflowExecutionResult" = ray.get(
-            workflow_manager.run_or_resume.remote(workflow_id, ignore_existing)
+            workflow_manager.run_or_resume.remote(job_id, workflow_id, ignore_existing)
         )
         if not is_growing:
             return flatten_workflow_output(workflow_id, result.persisted_output)
@@ -95,8 +96,11 @@ def resume(workflow_id: str) -> ray.ObjectRef:
     # ensures caller of 'run()' holds the reference to the workflow
     # result. Otherwise if the actor removes the reference of the
     # workflow output, the caller may fail to resolve the result.
+    job_id = ray.get_runtime_context().job_id.hex()
     result: "WorkflowExecutionResult" = ray.get(
-        workflow_manager.run_or_resume.remote(workflow_id, ignore_existing=False)
+        workflow_manager.run_or_resume.remote(
+            job_id, workflow_id, ignore_existing=False
+        )
     )
     logger.info(f"Workflow job {workflow_id} resumed.")
     return flatten_workflow_output(workflow_id, result.persisted_output)
@@ -195,8 +199,9 @@ def resume_all(with_failed: bool) -> List[Tuple[str, ray.ObjectRef]]:
     async def _resume_one(wid: str) -> Tuple[str, Optional[ray.ObjectRef]]:
         try:
+            job_id = ray.get_runtime_context().job_id.hex()
             result: "WorkflowExecutionResult" = (
-                await workflow_manager.run_or_resume.remote(wid)
+                await workflow_manager.run_or_resume.remote(job_id, wid)
             )
             obj = flatten_workflow_output(wid, result.persisted_output)
             return wid, obj


@@ -166,8 +166,12 @@ def _construct_resume_workflow_from_step(

 @ray.remote(num_returns=2)
 def _resume_workflow_step_executor(
-    workflow_id: str, step_id: "StepID", current_output: [ray.ObjectRef]
+    job_id: str,
+    workflow_id: str,
+    step_id: "StepID",
+    current_output: [ray.ObjectRef],
 ) -> Tuple[ray.ObjectRef, ray.ObjectRef]:
-    # TODO (yic): We need better dependency management for virtual actor
-    # The current output will always be empty for normal workflow
-    # For virtual actor, if it's not empty, it means the previous job is
+    with workflow_context.workflow_logging_context(job_id):
+        # TODO (yic): We need better dependency management for virtual actor
+        # The current output will always be empty for normal workflow
+        # For virtual actor, if it's not empty, it means the previous job is
@@ -190,13 +194,14 @@ def _resume_workflow_step_executor(
             ):
                 from ray.workflow.step_executor import execute_workflow

-                result = execute_workflow(r)
+                result = execute_workflow(job_id, r)
                 return result.persisted_output, result.volatile_output
         assert isinstance(r, StepID)
         return wf_store.load_step_output(r), None


 def resume_workflow_step(
+    job_id: str,
     workflow_id: str,
     step_id: "StepID",
     current_output: Optional[ray.ObjectRef],
@@ -204,6 +209,8 @@ def resume_workflow_step(
     """Resume a step of a workflow.

     Args:
+        job_id: The ID of the job that submits the workflow execution. The ID
+            is used to identify the submitter of the workflow.
         workflow_id: The ID of the workflow job. The ID is used to identify
             the workflow.
         step_id: The step to resume in the workflow.
@@ -220,7 +227,7 @@ def resume_workflow_step(
         current_output = [current_output]

     persisted_output, volatile_output = _resume_workflow_step_executor.remote(
-        workflow_id, step_id, current_output
+        job_id, workflow_id, step_id, current_output
     )
     persisted_output = WorkflowStaticRef.from_output(step_id, persisted_output)
     volatile_output = WorkflowStaticRef.from_output(step_id, volatile_output)


@@ -47,7 +47,7 @@ def _resolve_static_workflow_ref(workflow_ref: WorkflowStaticRef):
     return workflow_ref


-def _resolve_dynamic_workflow_refs(workflow_refs: "List[WorkflowRef]"):
+def _resolve_dynamic_workflow_refs(job_id, workflow_refs: "List[WorkflowRef]"):
     """Get the output of a workflow step with the step ID at runtime.

     We lookup the output by the following order:
@@ -86,14 +86,14 @@ def _resolve_dynamic_workflow_refs(workflow_refs: "List[WorkflowRef]"):
                     f"Current step: '{current_step_id}'"
                 )
             step_ref = recovery.resume_workflow_step(
-                workflow_id, workflow_ref.step_id, None
+                job_id, workflow_id, workflow_ref.step_id, None
             ).persisted_output
             output = _resolve_static_workflow_ref(step_ref)
         workflow_ref_mapping.append(output)
     return workflow_ref_mapping


-def _execute_workflow(workflow: "Workflow") -> "WorkflowExecutionResult":
+def _execute_workflow(job_id, workflow: "Workflow") -> "WorkflowExecutionResult":
     """Internal function of workflow execution."""
     if workflow.executed:
         return workflow.result
@@ -138,7 +138,7 @@ def _execute_workflow(workflow: "Workflow") -> "WorkflowExecutionResult":
                 extra_options = w.data.step_options.ray_options
                 # The input workflow is not a reference to an executed
                 # workflow.
-                static_ref = execute_workflow(w).persisted_output
+                static_ref = execute_workflow(job_id, w).persisted_output
                 static_ref._resolve_like_object_ref_in_args = extra_options.get(
                     "_resolve_like_object_ref_in_args", False
                 )
@@ -148,6 +148,7 @@ def _execute_workflow(workflow: "Workflow") -> "WorkflowExecutionResult":
         args=inputs.args,
         workflow_outputs=workflow_outputs,
         workflow_refs=inputs.workflow_refs,
+        job_id=job_id,
     )

     # Stage 2: match executors
@@ -185,6 +186,7 @@ def _execute_workflow(workflow: "Workflow") -> "WorkflowExecutionResult":
     persisted_output, volatile_output = executor(
         workflow_data.func_body,
         step_context,
+        job_id,
         workflow.step_id,
         baked_inputs,
         workflow_data.step_options,
@@ -221,7 +223,7 @@ class InplaceReturnedWorkflow:
     context: Dict


-def execute_workflow(workflow: Workflow) -> "WorkflowExecutionResult":
+def execute_workflow(job_id, workflow: Workflow) -> "WorkflowExecutionResult":
     """Execute workflow.

     This function also performs tail-recursion optimization for inplace
@@ -236,7 +238,7 @@ def execute_workflow(workflow: Workflow) -> "WorkflowExecutionResult":
     context = {}
     while True:
         with workflow_context.fork_workflow_step_context(**context):
-            result = _execute_workflow(workflow)
+            result = _execute_workflow(job_id, workflow)
         if not isinstance(result.persisted_output, InplaceReturnedWorkflow):
             break
         workflow = result.persisted_output.workflow
@@ -414,6 +416,7 @@ def _wrap_run(
 def _workflow_step_executor(
     func: Callable,
     context: "WorkflowStepContext",
+    job_id: str,
     step_id: "StepID",
     baked_inputs: "_BakedWorkflowInputs",
     runtime_options: "WorkflowStepRuntimeOptions",
@@ -511,7 +514,7 @@ def _workflow_step_executor(
             with workflow_context.fork_workflow_step_context(
                 outer_most_step_id=outer_most_step_id
             ):
-                result = execute_workflow(sub_workflow)
+                result = execute_workflow(job_id, sub_workflow)
             # When virtual actor returns a workflow in the method,
             # the volatile_output and persisted_output will be put together
             persisted_output = result.persisted_output
@@ -536,19 +539,22 @@ def _workflow_step_executor(
 def _workflow_step_executor_remote(
     func: Callable,
     context: "WorkflowStepContext",
+    job_id: str,
     step_id: "StepID",
     baked_inputs: "_BakedWorkflowInputs",
     runtime_options: "WorkflowStepRuntimeOptions",
 ) -> Any:
     """The remote version of '_workflow_step_executor'."""
-    return _workflow_step_executor(
-        func, context, step_id, baked_inputs, runtime_options
-    )
+    with workflow_context.workflow_logging_context(job_id):
+        return _workflow_step_executor(
+            func, context, job_id, step_id, baked_inputs, runtime_options
+        )


 def _workflow_wait_executor(
     func: Callable,
     context: "WorkflowStepContext",
+    job_id: str,
     step_id: "StepID",
     baked_inputs: "_BakedWorkflowInputs",
     runtime_options: "WorkflowStepRuntimeOptions",
@@ -590,13 +596,15 @@ def _workflow_wait_executor(
 def _workflow_wait_executor_remote(
     func: Callable,
     context: "WorkflowStepContext",
+    job_id: str,
     step_id: "StepID",
     baked_inputs: "_BakedWorkflowInputs",
     runtime_options: "WorkflowStepRuntimeOptions",
 ) -> Any:
     """The remote version of '_workflow_wait_executor'"""
-    return _workflow_wait_executor(
-        func, context, step_id, baked_inputs, runtime_options
-    )
+    with workflow_context.workflow_logging_context(job_id):
+        return _workflow_wait_executor(
+            func, context, job_id, step_id, baked_inputs, runtime_options
+        )
@@ -619,6 +627,7 @@ class _BakedWorkflowInputs:
     args: "ObjectRef"
     workflow_outputs: "List[WorkflowStaticRef]"
     workflow_refs: "List[WorkflowRef]"
+    job_id: str

     def resolve(self) -> Tuple[List, Dict]:
         """
@@ -647,7 +656,9 @@ class _BakedWorkflowInputs:
             obj = _resolve_static_workflow_ref(static_workflow_ref)
             objects_mapping.append(obj)

-        workflow_ref_mapping = _resolve_dynamic_workflow_refs(self.workflow_refs)
+        workflow_ref_mapping = _resolve_dynamic_workflow_refs(
+            self.job_id, self.workflow_refs
+        )

         with serialization_context.workflow_args_resolving_context(
             objects_mapping, workflow_ref_mapping
@@ -695,6 +706,7 @@ class _BakedWorkflowInputs:
             self.args,
             self.workflow_outputs,
             self.workflow_refs,
+            self.job_id,
         )


@ -0,0 +1,128 @@
from ray._private.test_utils import run_string_as_driver_nonblocking
def test_basic_workflow_logs(workflow_start_regular):
from ray.internal.storage import _storage_uri
script = f"""
import ray
from ray import workflow
ray.init(address='auto', storage="{_storage_uri}")
@workflow.step(name="f")
def f():
return 10
f.step().run("wid")
"""
proc = run_string_as_driver_nonblocking(script)
logs = proc.stdout.read().decode("ascii") + proc.stderr.read().decode("ascii")
# on driver
assert 'Workflow job created. [id="wid"' in logs
# # in WorkflowManagementActor's run_or_resume.remote()
# assert "run_or_resume: wid" in logs
# assert "Workflow job [id=wid] started." in logs
# in _workflow_step_executor_remote
assert "Step status [RUNNING]\t[wid@f" in logs
assert "Step status [SUCCESSFUL]\t[wid@f" in logs
def test_chained_workflow_logs(workflow_start_regular):
from ray.internal.storage import _storage_uri
script = f"""
import ray
from ray import workflow
ray.init(address='auto', storage="{_storage_uri}")
@workflow.step(name="f1")
def f1():
return 10
@workflow.step(name="f2")
def f2(x):
return x+1
f2.step(f1.step()).run("wid1")
"""
proc = run_string_as_driver_nonblocking(script)
logs = proc.stdout.read().decode("ascii") + proc.stderr.read().decode("ascii")
# on driver
assert 'Workflow job created. [id="wid1"' in logs
# # in WorkflowManagementActor's run_or_resume.remote()
# assert "run_or_resume: wid1" in logs
# assert "Workflow job [id=wid1] started." in logs
# in _workflow_step_executor_remote
assert "Step status [RUNNING]\t[wid1@f1" in logs
assert "Step status [SUCCESSFUL]\t[wid1@f1" in logs
assert "Step status [RUNNING]\t[wid1@f2" in logs
assert "Step status [SUCCESSFUL]\t[wid1@f2" in logs
def test_dynamic_workflow_logs(workflow_start_regular):
from ray.internal.storage import _storage_uri
script = f"""
import ray
from ray import workflow
ray.init(address='auto', storage="{_storage_uri}")
@workflow.step(name="f3")
def f3(x):
return x+1
@workflow.step(name="f4")
def f4(x):
return f3.step(x*2)
f4.step(10).run("wid2")
"""
proc = run_string_as_driver_nonblocking(script)
logs = proc.stdout.read().decode("ascii") + proc.stderr.read().decode("ascii")
# on driver
assert 'Workflow job created. [id="wid2"' in logs
# # in WorkflowManagementActor's run_or_resume.remote()
# assert "run_or_resume: wid2" in logs
# assert "Workflow job [id=wid2] started." in logs
# in _workflow_step_executor_remote
assert "Step status [RUNNING]\t[wid2@f3" in logs
assert "Step status [SUCCESSFUL]\t[wid2@f3" in logs
assert "Step status [RUNNING]\t[wid2@f4" in logs
assert "Step status [SUCCESSFUL]\t[wid2@f4" in logs
def test_virtual_actor_logs(workflow_start_regular):
from ray.internal.storage import _storage_uri
script = f"""
import ray
from ray import workflow
ray.init(address='auto', storage="{_storage_uri}")
@workflow.virtual_actor
class Counter:
def __init__(self, x: int):
self.x = x
def add(self, y):
self.x += y
return self.x
couter = Counter.get_or_create("vid", 10)
couter.add.options(name="add").run(1)
"""
proc = run_string_as_driver_nonblocking(script)
logs = proc.stdout.read().decode("ascii") + proc.stderr.read().decode("ascii")
print(logs)
# on driver
assert 'Workflow job created. [id="vid"' in logs
# # in WorkflowManagementActor's run_or_resume.remote()
# assert "run_or_resume: vid" in logs
# assert "Workflow job [id=vid] started." in logs
# in _workflow_step_executor_remote
assert "Step status [RUNNING]\t[vid@add" in logs
assert "Step status [SUCCESSFUL]\t[vid@add" in logs


@@ -141,8 +141,9 @@ class ActorMethod(ActorMethodBase):
                 "actor.method.run()' instead."
             )
         try:
+            job_id = ray.get_runtime_context().job_id.hex()
             return actor._actor_method_call(
-                self._method_helper, args=args, kwargs=kwargs
+                job_id, self._method_helper, args=args, kwargs=kwargs
             )
         except TypeError as exc:  # capture a friendlier stacktrace
             raise TypeError(
@@ -518,7 +519,8 @@ class VirtualActor:
         workflow_storage = WorkflowStorage(self._actor_id)
         workflow_storage.save_actor_class_body(self._metadata.cls)
         method_helper = self._metadata.methods["__init__"]
-        ref = self._actor_method_call(method_helper, args, kwargs)
+        job_id = ray.get_runtime_context().job_id.hex()
+        ref = self._actor_method_call(job_id, method_helper, args, kwargs)
         workflow_manager = get_or_create_management_actor()
         # keep the ref in a list to prevent dereference
         ray.get(workflow_manager.init_actor.remote(self._actor_id, [ref]))
@@ -541,12 +543,12 @@ class VirtualActor:
             raise AttributeError(f"No method with name '{method_name}'")

     def _actor_method_call(
-        self, method_helper: _VirtualActorMethodHelper, args, kwargs
+        self, job_id, method_helper: _VirtualActorMethodHelper, args, kwargs
     ) -> "ObjectRef":
         with workflow_context.workflow_step_context(self._actor_id):
             wf = method_helper.step(*args, **kwargs)
             if method_helper.readonly:
-                return execute_workflow(wf).volatile_output.ref
+                return execute_workflow(job_id, wf).volatile_output.ref
             else:
                 return wf.run_async(self._actor_id)


@@ -156,11 +156,12 @@ class WorkflowManagementActor:
         return None

     def run_or_resume(
-        self, workflow_id: str, ignore_existing: bool = False
+        self, job_id: str, workflow_id: str, ignore_existing: bool = False
     ) -> "WorkflowExecutionResult":
         """Run or resume a workflow.

         Args:
+            job_id: The ID of the job that submits the workflow execution.
             workflow_id: The ID of the workflow.
             ignore_existing: Ignore we already have an existing output. When
                 set false, raise an exception if there has already been a workflow
@@ -181,7 +182,9 @@ class WorkflowManagementActor:
             current_output = self._workflow_outputs[workflow_id].output
         except KeyError:
             current_output = None
-        result = recovery.resume_workflow_step(workflow_id, step_id, current_output)
+        result = recovery.resume_workflow_step(
+            job_id, workflow_id, step_id, current_output
+        )
         latest_output = LatestWorkflowOutput(
             result.persisted_output, workflow_id, step_id
         )


@@ -3,7 +3,9 @@ from dataclasses import dataclass, field
 import logging
 from typing import Optional, List, TYPE_CHECKING
 from contextlib import contextmanager

+import ray
 from ray.workflow.common import WorkflowStatus
+from ray._private.ray_logging import get_worker_log_file_name, configure_log_file

 logger = logging.getLogger(__name__)
@@ -186,3 +188,33 @@ def in_workflow_execution() -> bool:
     """Whether we are in workflow step execution."""
     global _in_workflow_execution
     return _in_workflow_execution
+
+
+@contextmanager
+def workflow_logging_context(job_id) -> None:
+    """Initialize the workflow logging context.
+
+    Workflow executions are running as remote functions from
+    WorkflowManagementActor. Without logging redirection, workflow
+    inner execution logs will be pushed to the driver that initially
+    created WorkflowManagementActor rather than the driver that
+    actually submits the current workflow execution.
+    We use this context manager to re-configure the log files to send
+    the logs to the correct driver, and to restore the log files once
+    the execution is done.
+
+    Args:
+        job_id: The ID of the job that submits the workflow execution.
+    """
+    node = ray.worker._global_node
+    original_out_file, original_err_file = node.get_log_file_handles(
+        get_worker_log_file_name("WORKER")
+    )
+    out_file, err_file = node.get_log_file_handles(
+        get_worker_log_file_name("WORKER", job_id)
+    )
+    try:
+        configure_log_file(out_file, err_file)
+        yield
+    finally:
+        configure_log_file(original_out_file, original_err_file)
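For reference, the executors above use this context manager as follows: the step body runs with stdout/stderr redirected to the submitting job's log files, and the original handles are restored even if the step raises. A condensed usage sketch mirroring _workflow_step_executor_remote (the executor and step_fn names here are illustrative):

    @ray.remote
    def executor(job_id, step_fn, *args):
        # Redirect this worker's logs to the submitting driver's files
        # for the duration of the step, then restore the originals.
        with workflow_context.workflow_logging_context(job_id):
            return step_fn(*args)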