[tune] stdout/stderr logging redirection (#9817)

* Add `log_to_file` parameter, pass to Trainable config, redirect stdout/stderr.

* Add logging handler to root ray logger

* Added test for `log_to_file` parameter

* Added logs, reuse test

* Revert debug change

* Update logdir on reset, flush streams after each train() step

* Remove magic keys from visible config

Co-authored-by: Kai Fricke <kai@anyscale.com>
krfricke 2020-08-03 20:18:34 +02:00 committed by GitHub
parent 9089fab0ef
commit c741d1cf9c
9 changed files with 346 additions and 29 deletions

View file

@ -431,6 +431,58 @@ If a string is provided, then it must include replacement fields ``{source}`` an
By default, syncing occurs every 300 seconds. To change the frequency of syncing, set the ``TUNE_CLOUD_SYNC_S`` environment variable in the driver to the desired syncing period. Note that uploading only happens when global experiment state is collected, and the frequency of this is determined by the ``global_checkpoint_period`` argument. So the true upload period is given by ``max(TUNE_CLOUD_SYNC_S, global_checkpoint_period)``.
.. _tune-log_to_file:
Redirecting stdout and stderr to files
--------------------------------------
The stdout and stderr streams are usually printed to the console. For remote actors,
Ray collects these logs and prints them to the head process, as long as it
has been initialized with ``log_to_driver=True``, which is the default.
However, if you would like to collect the stream outputs in files for later
analysis or troubleshooting, Tune offers a utility parameter, ``log_to_file``,
for this purpose.
By passing ``log_to_file=True`` to ``tune.run()``, stdout and stderr will be logged
to ``trial_logdir/stdout`` and ``trial_logdir/stderr``, respectively:
.. code-block:: python

    tune.run(
        trainable,
        log_to_file=True)
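For example, with a simple function trainable (a minimal sketch; the ``train``
function and its messages are illustrative), everything printed inside the
trainable ends up in the trial's log files:

.. code-block:: python

    import sys

    def train(config, reporter):
        for i in range(3):
            print("this line is written to trial_logdir/stdout")
            print("this line is written to trial_logdir/stderr", file=sys.stderr)
            reporter(timesteps_total=i)

    tune.run(train, log_to_file=True)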
If you would like to specify the output files, you can either pass one filename,
where the combined output will be stored, or two filenames, for stdout and stderr,
respectively:
.. code-block:: python

    tune.run(
        trainable,
        log_to_file="std_combined.log")

    tune.run(
        trainable,
        log_to_file=("my_stdout.log", "my_stderr.log"))
The file names are relative to the trial's logdir. You can pass absolute paths,
too.
If ``log_to_file`` is set, Tune automatically registers a new logging handler
for Ray's base logger and writes its output to the specified stderr output file.
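This means that output produced via Python's ``logging`` module through Ray's
logger is captured as well (a minimal sketch; the logged message is
illustrative):

.. code-block:: python

    from ray import logger

    def train(config, reporter):
        for i in range(3):
            # Written to the stderr log file via the handler Tune registers.
            logger.info("LOG_STDERR: iteration %d", i)
            reporter(timesteps_total=i)

    tune.run(train, log_to_file=True)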
Setting ``log_to_file`` does not disable logging to the driver. If you would
like to prevent the logs from showing up in the driver output (i.e. have them
appear only in the log files), initialize Ray accordingly:
.. code-block:: python

    ray.init(log_to_driver=False)
    tune.run(
        trainable,
        log_to_file=True)
.. _tune-debugging:
Debugging

View file

@ -14,7 +14,7 @@ from ray.resource_spec import ResourceSpec
from ray.tune.durable_trainable import DurableTrainable
from ray.tune.error import AbortTrialExecution, TuneError
from ray.tune.logger import NoopLogger
from ray.tune.result import TRIAL_INFO
from ray.tune.result import TRIAL_INFO, LOGDIR_PATH
from ray.tune.resources import Resources
from ray.tune.trainable import TrainableUtil
from ray.tune.trial import Trial, Checkpoint, Location, TrialInfo
@ -161,10 +161,11 @@ class RayTrialExecutor(TrialExecutor):
def logger_creator(config):
# Set the working dir in the remote process, for user file writes
os.makedirs(remote_logdir, exist_ok=True)
logdir = config.pop(LOGDIR_PATH, remote_logdir)
os.makedirs(logdir, exist_ok=True)
if not ray.worker._mode() == ray.worker.LOCAL_MODE:
os.chdir(remote_logdir)
return NoopLogger(config, remote_logdir)
os.chdir(logdir)
return NoopLogger(config, logdir)
# Clear the Trial's location (to be updated later on result)
# since we don't know where the remote runner is placed.
@ -339,7 +340,7 @@ class RayTrialExecutor(TrialExecutor):
super(RayTrialExecutor, self).pause_trial(trial)
def reset_trial(self, trial, new_config, new_experiment_tag):
"""Tries to invoke `Trainable.reset_config()` to reset trial.
"""Tries to invoke `Trainable.reset()` to reset trial.
Args:
trial (Trial): Trial to be reset.
@ -353,14 +354,13 @@ class RayTrialExecutor(TrialExecutor):
trial.config = new_config
trainable = trial.runner
with self._change_working_directory(trial):
with warn_if_slow("reset_config"):
with warn_if_slow("reset"):
try:
reset_val = ray.get(
trainable.reset_config.remote(new_config),
trainable.reset.remote(new_config, trial.logdir),
DEFAULT_GET_TIMEOUT)
except RayTimeoutError:
logger.exception("Trial %s: reset_config timed out.",
trial)
logger.exception("Trial %s: reset timed out.", trial)
return False
return reset_val

View file

@ -69,6 +69,15 @@ RESULT_DUPLICATE = "__duplicate__"
# to the Trainable via the constructor.
TRIAL_INFO = "__trial_info__"
# __stdout_file__/__stderr_file__ are magic keywords used internally
# to pass log file locations to the Trainable via the constructor.
STDOUT_FILE = "__stdout_file__"
STDERR_FILE = "__stderr_file__"
# __logdir_path__ is a magic keyword used internally to pass a new
# logdir to existing loggers.
LOGDIR_PATH = "__logdir_path__"
# Where Tune writes result files by default
DEFAULT_RESULTS_DIR = (os.environ.get("TEST_TMPDIR")
or os.environ.get("TUNE_RESULT_DIR")

View file

@ -1,7 +1,10 @@
import os
import unittest
import sys
import ray
from ray.tune import Trainable, run_experiments
from ray import tune, logger
from ray.tune import Trainable, run_experiments, register_trainable
from ray.tune.error import TuneError
from ray.tune.schedulers.trial_scheduler import FIFOScheduler, TrialScheduler
@ -17,9 +20,15 @@ def create_resettable_class():
self.config = config
self.num_resets = 0
self.iter = 0
self.msg = config.get("message", "No message")
def step(self):
self.iter += 1
print("PRINT_STDOUT: {}".format(self.msg))
print("PRINT_STDERR: {}".format(self.msg), file=sys.stderr)
logger.info("LOG_STDERR: {}".format(self.msg))
return {"num_resets": self.num_resets, "done": self.iter > 1}
def save_checkpoint(self, chkpt_dir):
@ -32,6 +41,7 @@ def create_resettable_class():
if "fake_reset_not_supported" in self.config:
return False
self.num_resets += 1
self.msg = new_config.get("message", "No message")
return True
return MyResettableClass
@ -90,8 +100,50 @@ class ActorReuseTest(unittest.TestCase):
self.assertRaises(TuneError, lambda: run())
def testTrialReuseLogToFile(self):
register_trainable("foo2", create_resettable_class())
# Log to default files
[trial1, trial2] = tune.run(
"foo2",
config={
"message": tune.grid_search(["First", "Second"])
},
log_to_file=True,
scheduler=FrequentPausesScheduler(),
reuse_actors=True).trials
# Check trial 1
self.assertEqual(trial1.last_result["num_resets"], 1)
self.assertTrue(os.path.exists(os.path.join(trial1.logdir, "stdout")))
self.assertTrue(os.path.exists(os.path.join(trial1.logdir, "stderr")))
with open(os.path.join(trial1.logdir, "stdout"), "rt") as fp:
content = fp.read()
self.assertIn("PRINT_STDOUT: First", content)
self.assertNotIn("PRINT_STDOUT: Second", content)
with open(os.path.join(trial1.logdir, "stderr"), "rt") as fp:
content = fp.read()
self.assertIn("PRINT_STDERR: First", content)
self.assertIn("LOG_STDERR: First", content)
self.assertNotIn("PRINT_STDERR: Second", content)
self.assertNotIn("LOG_STDERR: Second", content)
# Check trial 2
self.assertEqual(trial2.last_result["num_resets"], 2)
self.assertTrue(os.path.exists(os.path.join(trial2.logdir, "stdout")))
self.assertTrue(os.path.exists(os.path.join(trial2.logdir, "stderr")))
with open(os.path.join(trial2.logdir, "stdout"), "rt") as fp:
content = fp.read()
self.assertIn("PRINT_STDOUT: Second", content)
self.assertNotIn("PRINT_STDOUT: First", content)
with open(os.path.join(trial2.logdir, "stderr"), "rt") as fp:
content = fp.read()
self.assertIn("PRINT_STDERR: Second", content)
self.assertIn("LOG_STDERR: Second", content)
self.assertNotIn("PRINT_STDERR: First", content)
self.assertNotIn("LOG_STDERR: First", content)
if __name__ == "__main__":
import pytest
import sys
sys.exit(pytest.main(["-v", __file__]))

View file

@ -973,6 +973,64 @@ class TrainableFunctionApiTest(unittest.TestCase):
self.assertEqual(trial.status, Trial.TERMINATED)
self.assertTrue(trial.has_checkpoint())
def testLogToFile(self):
def train(config, reporter):
import sys
from ray import logger
for i in range(10):
reporter(timesteps_total=i)
print("PRINT_STDOUT")
print("PRINT_STDERR", file=sys.stderr)
logger.info("LOG_STDERR")
register_trainable("f1", train)
# Do not log to file
[trial] = tune.run("f1", log_to_file=False).trials
self.assertFalse(os.path.exists(os.path.join(trial.logdir, "stdout")))
self.assertFalse(os.path.exists(os.path.join(trial.logdir, "stderr")))
# Log to default files
[trial] = tune.run("f1", log_to_file=True).trials
self.assertTrue(os.path.exists(os.path.join(trial.logdir, "stdout")))
self.assertTrue(os.path.exists(os.path.join(trial.logdir, "stderr")))
with open(os.path.join(trial.logdir, "stdout"), "rt") as fp:
content = fp.read()
self.assertIn("PRINT_STDOUT", content)
with open(os.path.join(trial.logdir, "stderr"), "rt") as fp:
content = fp.read()
self.assertIn("PRINT_STDERR", content)
self.assertIn("LOG_STDERR", content)
# Log to one file
[trial] = tune.run("f1", log_to_file="combined").trials
self.assertFalse(os.path.exists(os.path.join(trial.logdir, "stdout")))
self.assertFalse(os.path.exists(os.path.join(trial.logdir, "stderr")))
self.assertTrue(os.path.exists(os.path.join(trial.logdir, "combined")))
with open(os.path.join(trial.logdir, "combined"), "rt") as fp:
content = fp.read()
self.assertIn("PRINT_STDOUT", content)
self.assertIn("PRINT_STDERR", content)
self.assertIn("LOG_STDERR", content)
# Log to two files
[trial] = tune.run(
"f1", log_to_file=("alt.stdout", "alt.stderr")).trials
self.assertFalse(os.path.exists(os.path.join(trial.logdir, "stdout")))
self.assertFalse(os.path.exists(os.path.join(trial.logdir, "stderr")))
self.assertTrue(
os.path.exists(os.path.join(trial.logdir, "alt.stdout")))
self.assertTrue(
os.path.exists(os.path.join(trial.logdir, "alt.stderr")))
with open(os.path.join(trial.logdir, "alt.stdout"), "rt") as fp:
content = fp.read()
self.assertIn("PRINT_STDOUT", content)
with open(os.path.join(trial.logdir, "alt.stderr"), "rt") as fp:
content = fp.read()
self.assertIn("PRINT_STDERR", content)
self.assertIn("LOG_STDERR", content)
if __name__ == "__main__":
import pytest

View file

@ -1,3 +1,5 @@
import sys
from contextlib import redirect_stdout, redirect_stderr
from datetime import datetime
import copy
@ -7,7 +9,9 @@ import glob
import os
import pickle
import platform
import pandas as pd
from ray.tune.utils.util import Tee
from six import string_types
import shutil
import tempfile
@ -17,10 +21,10 @@ import uuid
import ray
from ray.util.debug import log_once
from ray.tune.logger import UnifiedLogger
from ray.tune.result import (DEFAULT_RESULTS_DIR, TIME_THIS_ITER_S,
TIMESTEPS_THIS_ITER, DONE, TIMESTEPS_TOTAL,
EPISODES_THIS_ITER, EPISODES_TOTAL,
TRAINING_ITERATION, RESULT_DUPLICATE, TRIAL_INFO)
from ray.tune.result import (
DEFAULT_RESULTS_DIR, TIME_THIS_ITER_S, TIMESTEPS_THIS_ITER, DONE,
TIMESTEPS_TOTAL, EPISODES_THIS_ITER, EPISODES_TOTAL, TRAINING_ITERATION,
RESULT_DUPLICATE, TRIAL_INFO, STDOUT_FILE, STDERR_FILE, LOGDIR_PATH)
from ray.tune.utils import UtilMonitor
logger = logging.getLogger(__name__)
@ -216,16 +220,17 @@ class Trainable:
self.config = config or {}
trial_info = self.config.pop(TRIAL_INFO, None)
if logger_creator:
self._result_logger = logger_creator(self.config)
self._logdir = self._result_logger.logdir
else:
logdir_prefix = datetime.today().strftime("%Y-%m-%d_%H-%M-%S")
ray.utils.try_to_create_directory(DEFAULT_RESULTS_DIR)
self._logdir = tempfile.mkdtemp(
prefix=logdir_prefix, dir=DEFAULT_RESULTS_DIR)
self._result_logger = UnifiedLogger(
self.config, self._logdir, loggers=None)
self._logger_creator = logger_creator
self._result_logger = self._logdir = None
self._create_logger(self.config)
self._stdout_context = self._stdout_fp = self._stdout_stream = None
self._stderr_context = self._stderr_fp = self._stderr_stream = None
self._stderr_logging_handler = None
stdout_file = self.config.pop(STDOUT_FILE, None)
stderr_file = self.config.pop(STDERR_FILE, None)
self._open_logfiles(stdout_file, stderr_file)
self._iteration = 0
self._time_total = 0.0
@ -389,6 +394,11 @@ class Trainable:
self.log_result(result)
if self._stdout_stream:
self._stdout_stream.flush()
if self._stderr_stream:
self._stderr_stream.flush()
return result
def get_state(self):
@ -521,13 +531,37 @@ class Trainable:
export_dir = export_dir or self.logdir
return self._export_model(export_formats, export_dir)
def reset(self, new_config, new_logdir):
"""Resets trial for use with new config.
Subclasses should override reset_config() to actually
reset actor behavior for the new config."""
self.config = new_config
logger_config = new_config.copy()
logger_config[LOGDIR_PATH] = new_logdir
self._logdir = new_logdir
self._result_logger.flush()
self._result_logger.close()
self._create_logger(logger_config)
stdout_file = new_config.pop(STDOUT_FILE, None)
stderr_file = new_config.pop(STDERR_FILE, None)
self._close_logfiles()
self._open_logfiles(stdout_file, stderr_file)
return self.reset_config(new_config)
def reset_config(self, new_config):
"""Resets configuration without restarting the trial.
This method is optional, but can be implemented to speed up algorithms
such as PBT, and to allow performance optimizations such as running
experiments with reuse_actors=True. Note that self.config need to
be updated to reflect the latest parameter information in Ray logs.
experiments with reuse_actors=True.
Args:
new_config (dict): Updated hyperparameter configuration
@ -538,6 +572,60 @@ class Trainable:
"""
return False
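For context, a minimal sketch (not part of this diff) of a Trainable that supports in-place reset for ``reuse_actors=True`` by overriding ``reset_config()``; the class name and config keys are hypothetical, modeled on the resettable test class above:

from ray.tune import Trainable

class MyResettableTrainable(Trainable):
    def setup(self, config):
        # Store config values that can later be swapped in-place on reset.
        self.msg = config.get("message", "No message")

    def step(self):
        print(self.msg)
        return {"done": True}

    def reset_config(self, new_config):
        # Returning True signals that this actor can be reused with new_config.
        self.msg = new_config.get("message", "No message")
        return True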
def _create_logger(self, config):
"""Create logger from logger creator"""
if self._logger_creator:
self._result_logger = self._logger_creator(config)
self._logdir = self._result_logger.logdir
else:
logdir_prefix = datetime.today().strftime("%Y-%m-%d_%H-%M-%S")
ray.utils.try_to_create_directory(DEFAULT_RESULTS_DIR)
self._logdir = tempfile.mkdtemp(
prefix=logdir_prefix, dir=DEFAULT_RESULTS_DIR)
self._result_logger = UnifiedLogger(
config, self._logdir, loggers=None)
def _open_logfiles(self, stdout_file, stderr_file):
"""Create loggers. Open stdout and stderr logfiles."""
if stdout_file:
stdout_path = os.path.expanduser(
os.path.join(self._logdir, stdout_file))
self._stdout_fp = open(stdout_path, "a+")
self._stdout_stream = Tee(sys.stdout, self._stdout_fp)
self._stdout_context = redirect_stdout(self._stdout_stream)
self._stdout_context.__enter__()
if stderr_file:
stderr_path = os.path.expanduser(
os.path.join(self._logdir, stderr_file))
self._stderr_fp = open(stderr_path, "a+")
self._stderr_stream = Tee(sys.stderr, self._stderr_fp)
self._stderr_context = redirect_stderr(self._stderr_stream)
self._stderr_context.__enter__()
# Add logging handler to root ray logger
formatter = logging.Formatter("[%(levelname)s %(asctime)s] "
"%(filename)s: %(lineno)d "
"%(message)s")
self._stderr_logging_handler = logging.StreamHandler(
self._stderr_fp)
self._stderr_logging_handler.setFormatter(formatter)
ray.logger.addHandler(self._stderr_logging_handler)
def _close_logfiles(self):
"""Close stdout and stderr logfiles."""
if self._stderr_logging_handler:
ray.logger.removeHandler(self._stderr_logging_handler)
if self._stdout_context:
self._stdout_stream.flush()
self._stdout_context.__exit__(None, None, None)
self._stdout_fp.close()
if self._stderr_context:
self._stderr_stream.flush()
self._stderr_context.__exit__(None, None, None)
self._stderr_fp.close()
def stop(self):
"""Releases all resources used by this trainable.
@ -548,6 +636,8 @@ class Trainable:
self._result_logger.close()
self.cleanup()
self._close_logfiles()
@property
def logdir(self):
"""Directory of the results and checkpoints for this Trainable.

View file

@ -129,7 +129,7 @@ class TrialExecutor:
self.start_trial(trial)
def reset_trial(self, trial, new_config, new_experiment_tag):
"""Tries to invoke `Trainable.reset_config()` to reset trial.
"""Tries to invoke `Trainable.reset()` to reset trial.
Args:
trial (Trial): Trial to be reset.
@ -139,7 +139,7 @@ class TrialExecutor:
for trial.
Returns:
True if `reset_config` is successful else False.
True if `reset` is successful else False.
"""
raise NotImplementedError

View file

@ -1,8 +1,10 @@
import logging
from typing import Sequence
from ray.tune.error import TuneError
from ray.tune.experiment import convert_to_experiment_list, Experiment
from ray.tune.analysis import ExperimentAnalysis
from ray.tune.result import STDOUT_FILE, STDERR_FILE
from ray.tune.suggest import BasicVariantGenerator
from ray.tune.suggest.suggestion import Searcher, SearchGenerator
from ray.tune.trial import Trial
@ -65,6 +67,31 @@ def _report_progress(runner, reporter, done=False):
reporter.report(trials, done, sched_debug_str, executor_debug_str)
def _validate_log_to_file(log_to_file):
"""Validate ``tune.run``'s ``log_to_file`` parameter. Return
validated relative stdout and stderr filenames."""
if not log_to_file:
stdout_file = stderr_file = None
elif isinstance(log_to_file, bool) and log_to_file:
stdout_file = "stdout"
stderr_file = "stderr"
elif isinstance(log_to_file, str):
stdout_file = stderr_file = log_to_file
elif isinstance(log_to_file, Sequence):
if len(log_to_file) != 2:
raise ValueError(
"If you pass a Sequence to `log_to_file` it has to have "
"a length of 2 (for stdout and stderr, respectively). The "
"Sequence you passed has length {}.".format(len(log_to_file)))
stdout_file, stderr_file = log_to_file
else:
raise ValueError(
"You can pass a boolean, a string, or a Sequence of length 2 to "
"`log_to_file`, but you passed something else ({}).".format(
type(log_to_file)))
return stdout_file, stderr_file
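A quick illustration (not part of this diff) of how the helper above maps its input to ``(stdout_file, stderr_file)``, assuming it remains importable as a private helper from ``ray.tune.tune``:

from ray.tune.tune import _validate_log_to_file

assert _validate_log_to_file(False) == (None, None)
assert _validate_log_to_file(True) == ("stdout", "stderr")
assert _validate_log_to_file("combined.log") == ("combined.log", "combined.log")
assert _validate_log_to_file(("out.log", "err.log")) == ("out.log", "err.log")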
def run(run_or_experiment,
name=None,
stop=None,
@ -75,6 +102,7 @@ def run(run_or_experiment,
upload_dir=None,
trial_name_creator=None,
loggers=None,
log_to_file=False,
sync_to_cloud=None,
sync_to_driver=None,
checkpoint_freq=0,
@ -143,6 +171,14 @@ def run(run_or_experiment,
loggers (list): List of logger creators to be used with
each Trial. If None, defaults to ray.tune.logger.DEFAULT_LOGGERS.
See `ray/tune/logger.py`.
log_to_file (bool|str|Sequence): Log stdout and stderr to files in
Tune's trial directories. If this is `False` (default), no files
are written. If `True`, outputs are written to `trialdir/stdout`
and `trialdir/stderr`, respectively. If this is a single string,
this is interpreted as a file relative to the trialdir, to which
both streams are written. If this is a Sequence (e.g. a Tuple),
it has to have length 2 and the elements indicate the files to
which stdout and stderr are written, respectively.
sync_to_cloud (func|str): Function for syncing the local_dir to and
from upload_dir. If string, then it must be a string template that
includes `{source}` and `{target}` for the syncer to run. If not
@ -242,6 +278,8 @@ def run(run_or_experiment,
space = {"lr": tune.uniform(0, 1), "momentum": tune.uniform(0, 1)}
tune.run(my_trainable, config=space, stop={"training_iteration": 10})
"""
config = config or {}
trial_executor = trial_executor or RayTrialExecutor(
queue_trials=queue_trials,
reuse_actors=reuse_actors,
@ -251,6 +289,10 @@ def run(run_or_experiment,
else:
experiments = [run_or_experiment]
stdout_file, stderr_file = _validate_log_to_file(log_to_file)
config[STDOUT_FILE] = stdout_file
config[STDERR_FILE] = stderr_file
for i, exp in enumerate(experiments):
if not isinstance(exp, Experiment):
run_identifier = Experiment.register_if_needed(exp)

View file

@ -137,6 +137,20 @@ class warn_if_slow:
now - self.start)
class Tee(object):
def __init__(self, stream1, stream2):
self.stream1 = stream1
self.stream2 = stream2
def write(self, *args, **kwargs):
self.stream1.write(*args, **kwargs)
self.stream2.write(*args, **kwargs)
def flush(self, *args, **kwargs):
self.stream1.flush(*args, **kwargs)
self.stream2.flush(*args, **kwargs)
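A quick sketch (not part of this diff) of how ``Tee`` is used together with ``redirect_stdout``, mirroring ``Trainable._open_logfiles`` earlier in this commit:

import sys
from contextlib import redirect_stdout
from ray.tune.utils.util import Tee

with open("stdout.log", "a+") as fp:
    # Everything printed inside this block goes to both the console and the file.
    with redirect_stdout(Tee(sys.stdout, fp)):
        print("written to both the console and stdout.log")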
def merge_dicts(d1, d2):
"""
Args: