diff --git a/python/ray/tune/progress_reporter.py b/python/ray/tune/progress_reporter.py
index 6f3261ce4..8ff397237 100644
--- a/python/ray/tune/progress_reporter.py
+++ b/python/ray/tune/progress_reporter.py
@@ -1,21 +1,16 @@
 from __future__ import print_function
 
 import collections
-import os
 import sys
-from typing import Dict, List, Optional
 
 import numpy as np
 import time
 
-from ray.tune.callback import Callback
-from ray.tune.logger import pretty_print
 from ray.tune.result import (EPISODE_REWARD_MEAN, MEAN_ACCURACY, MEAN_LOSS,
                              TRAINING_ITERATION, TIME_TOTAL_S,
                              TIMESTEPS_TOTAL, AUTO_RESULT_KEYS)
-from ray.tune.trial import DEBUG_PRINT_INTERVAL, Trial
+from ray.tune.trial import Trial
 from ray.tune.utils import unflattened_lookup
-from ray.tune.utils.log import Verbosity, has_verbosity
 
 try:
     from collections.abc import Mapping
@@ -229,19 +224,15 @@ class TuneReporterBase(ProgressReporter):
                 best_trial_str(current_best_trial, metric,
                                self._parameter_columns))
 
-        if has_verbosity(Verbosity.V1_EXPERIMENT):
-            # Will filter the table in `trial_progress_str`
-            messages.append(
-                trial_progress_str(
-                    trials,
-                    metric_columns=self._metric_columns,
-                    parameter_columns=self._parameter_columns,
-                    total_samples=self._total_samples,
-                    fmt=fmt,
-                    max_rows=max_progress,
-                    done=done))
-            messages.append(
-                trial_errors_str(trials, fmt=fmt, max_rows=max_error))
+        messages.append(
+            trial_progress_str(
+                trials,
+                metric_columns=self._metric_columns,
+                parameter_columns=self._parameter_columns,
+                total_samples=self._total_samples,
+                fmt=fmt,
+                max_rows=max_progress))
+        messages.append(trial_errors_str(trials, fmt=fmt, max_rows=max_error))
 
         return delim.join(messages) + delim
 
@@ -408,20 +399,12 @@ def memory_debug_str():
                 "(or ray[debug]) to resolve)")
 
 
-def _get_trials_by_state(trials):
-    trials_by_state = collections.defaultdict(list)
-    for t in trials:
-        trials_by_state[t.status].append(t)
-    return trials_by_state
-
-
 def trial_progress_str(trials,
                        metric_columns,
                        parameter_columns=None,
                        total_samples=0,
                        fmt="psql",
-                       max_rows=None,
-                       done=False):
+                       max_rows=None):
     """Returns a human readable message for printing to the console.
 
     This contains a table where each row represents a trial, its parameters
@@ -449,7 +432,9 @@ def trial_progress_str(trials,
         return delim.join(messages)
 
     num_trials = len(trials)
-    trials_by_state = _get_trials_by_state(trials)
+    trials_by_state = collections.defaultdict(list)
+    for t in trials:
+        trials_by_state[t.status].append(t)
 
     for local_dir in sorted({t.local_dir for t in trials}):
         messages.append("Result logdir: {}".format(local_dir))
@@ -459,30 +444,6 @@ def trial_progress_str(trials,
         for state in sorted(trials_by_state)
     ]
 
-    if total_samples and total_samples >= sys.maxsize:
-        total_samples = "infinite"
-
-    messages.append("Number of trials: {}{} ({})".format(
-        num_trials, f"/{total_samples}"
-        if total_samples else "", ", ".join(num_trials_strs)))
-
-    if has_verbosity(Verbosity.V3_TRIAL_DETAILS) or (has_verbosity(
-            Verbosity.V2_TRIAL_NORM) and done):
-        messages += trial_progress_table(trials, metric_columns,
-                                         parameter_columns, fmt, max_rows)
-
-    return delim.join(messages)
-
-
-def trial_progress_table(trials,
-                         metric_columns,
-                         parameter_columns=None,
-                         fmt="psql",
-                         max_rows=None):
-    messages = []
-    num_trials = len(trials)
-    trials_by_state = _get_trials_by_state(trials)
-
     state_tbl_order = [
         Trial.RUNNING, Trial.PAUSED, Trial.PENDING, Trial.TERMINATED,
         Trial.ERROR
@@ -513,6 +474,13 @@ def trial_progress_table(trials,
             continue
         trials += trials_by_state[state]
 
+    if total_samples and total_samples >= sys.maxsize:
+        total_samples = "infinite"
+
+    messages.append("Number of trials: {}{} ({})".format(
+        num_trials, f"/{total_samples}"
+        if total_samples else "", ", ".join(num_trials_strs)))
+
     # Pre-process trials to figure out what columns to show.
     if isinstance(metric_columns, Mapping):
         metric_keys = list(metric_columns.keys())
@@ -554,7 +522,7 @@ def trial_progress_table(trials,
     if overflow:
         messages.append("... {} more trials not shown ({})".format(
            overflow, overflow_str))
-    return messages
+    return delim.join(messages)
 
 
 def trial_errors_str(trials, fmt="psql", max_rows=None):
@@ -653,108 +621,3 @@ def _get_trial_info(trial, parameters, metrics):
         unflattened_lookup(metric, result, default=None) for metric in metrics
     ]
     return trial_info
-
-
-class TrialProgressCallback(Callback):
-    """Reports (prints) intermediate trial progress.
-
-    This callback is automatically added to the callback stack. When a
-    result is obtained, this callback will print the results according to
-    the specified verbosity level.
-
-    For ``Verbosity.V3_TRIAL_DETAILS``, a full result list is printed.
-
-    For ``Verbosity.V2_TRIAL_NORM``, only one line is printed per received
-    result.
-
-    All other verbosity levels do not print intermediate trial progress.
-
-    Result printing is throttled on a per-trial basis. Per default, results are
-    printed only once every 30 seconds. Results are always printed when a trial
-    finished or errored.
-
-    """
-
-    def __init__(self, metric: Optional[str] = None):
-        self._last_print = collections.defaultdict(float)
-        self._completed_trials = set()
-        self._last_result_str = {}
-        self._metric = metric
-
-    def on_trial_result(self, iteration: int, trials: List["Trial"],
-                        trial: "Trial", result: Dict, **info):
-        self.log_result(trial, result, error=False)
-
-    def on_trial_error(self, iteration: int, trials: List["Trial"],
-                       trial: "Trial", **info):
-        self.log_result(trial, trial.last_result, error=True)
-
-    def on_trial_complete(self, iteration: int, trials: List["Trial"],
-                          trial: "Trial", **info):
-        # Only log when we never logged that a trial was completed
-        if trial not in self._completed_trials:
-            self._completed_trials.add(trial)
-
-            print_result_str = self._print_result(trial.last_result)
-            last_result_str = self._last_result_str.get(trial, "")
-            # If this is a new result, print full result string
-            if print_result_str != last_result_str:
-                self.log_result(trial, trial.last_result, error=False)
-            else:
-                print(f"Trial {trial} completed. "
-                      f"Last result: {print_result_str}")
-
-    def log_result(self, trial: "Trial", result: Dict, error: bool = False):
-        done = result.get("done", False) is True
-        last_print = self._last_print[trial]
-        if done and trial not in self._completed_trials:
-            self._completed_trials.add(trial)
-        if has_verbosity(Verbosity.V3_TRIAL_DETAILS) and \
-           (done or error or time.time() - last_print > DEBUG_PRINT_INTERVAL):
-            print("Result for {}:".format(trial))
-            print("  {}".format(pretty_print(result).replace("\n", "\n  ")))
-            self._last_print[trial] = time.time()
-        elif has_verbosity(Verbosity.V2_TRIAL_NORM) and (
-                done or error
-                or time.time() - last_print > DEBUG_PRINT_INTERVAL):
-            info = ""
-            if done:
-                info = " This trial completed."
-
-            metric_name = self._metric or "_metric"
-            metric_value = result.get(metric_name, -99.)
-
-            print_result_str = self._print_result(result)
-
-            self._last_result_str[trial] = print_result_str
-
-            error_file = os.path.join(trial.logdir, "error.txt")
-
-            if error:
-                message = f"The trial {trial} errored with " \
-                          f"parameters={trial.config}. " \
-                          f"Error file: {error_file}"
-            elif self._metric:
-                message = f"Trial {trial} reported " \
-                          f"{metric_name}={metric_value:.2f} " \
-                          f"with parameters={trial.config}.{info}"
-            else:
-                message = f"Trial {trial} reported " \
-                          f"{print_result_str} " \
-                          f"with parameters={trial.config}.{info}"
-
-            print(message)
-            self._last_print[trial] = time.time()
-
-    def _print_result(self, result: Dict):
-        print_result = result.copy()
-        print_result.pop("config", None)
-        print_result.pop("trial_id", None)
-        print_result.pop("experiment_tag", None)
-        print_result.pop("done", None)
-        for auto_result in AUTO_RESULT_KEYS:
-            print_result.pop(auto_result, None)
-
-        print_result_str = ",".join(
-            [f"{k}={v}" for k, v in print_result.items()])
-        return print_result_str
diff --git a/python/ray/tune/tests/test_progress_reporter.py b/python/ray/tune/tests/test_progress_reporter.py
index 664b15941..c297b8cdd 100644
--- a/python/ray/tune/tests/test_progress_reporter.py
+++ b/python/ray/tune/tests/test_progress_reporter.py
@@ -73,7 +73,7 @@ tune.run_experiments({
             "c": tune.grid_search(list(range(10))),
         },
     },
-}, verbose=3, progress_reporter=reporter)"""
+}, verbose=1, progress_reporter=reporter)"""
 
 EXPECTED_END_TO_END_START = """Number of trials: 1/30 (1 RUNNING)
 +---------------+----------+-------+-----+
@@ -160,48 +160,6 @@ EXPECTED_BEST_1 = "Current best trial: 00001 with metric_1=0.5 and " \
 EXPECTED_BEST_2 = "Current best trial: 00004 with metric_1=2.0 and " \
     "parameters={'a': 4}"
 
-VERBOSE_EXP_OUT_1 = "Number of trials: 1/3 (1 RUNNING)"
-VERBOSE_EXP_OUT_2 = "Number of trials: 3/3 (3 TERMINATED)"
-
-VERBOSE_TRIAL_NORM = "Trial train_xxxxx_00000 reported acc=5 with " + \
-    """parameters={'do': 'complete'}. This trial completed.
-Trial train_xxxxx_00001 reported _metric=6 with parameters={'do': 'once'}.
-Trial train_xxxxx_00001 completed. Last result: _metric=6
-Trial train_xxxxx_00002 reported acc=7 with parameters={'do': 'twice'}.
-Trial train_xxxxx_00002 reported acc=8 with parameters={'do': 'twice'}. """ + \
-    "This trial completed."
-
-VERBOSE_TRIAL_DETAIL = """+-------------------+----------+-------+----------+
-| Trial name        | status   | loc   | do       |
-|-------------------+----------+-------+----------|
-| train_xxxxx_00000 | RUNNING  |       | complete |
-+-------------------+----------+-------+----------+"""
-
-VERBOSE_CMD = """from ray import tune
-import random
-import numpy as np
-
-
-def train(config):
-    if config["do"] == "complete":
-        tune.report(acc=5, done=True)
-    elif config["do"] == "once":
-        tune.report(6)
-    else:
-        tune.report(acc=7)
-        tune.report(acc=8)
-
-random.seed(1234)
-np.random.seed(1234)
-
-tune.run(
-    train,
-    config={
-        "do": tune.grid_search(["complete", "once", "twice"])
-    },"""
-
-# Add "verbose=3)" etc
-
 
 class ProgressReporterTest(unittest.TestCase):
     def mock_trial(self, status, i):
@@ -405,64 +363,6 @@ class ProgressReporterTest(unittest.TestCase):
         finally:
             del os.environ["_TEST_TUNE_TRIAL_UUID"]
 
-    def testVerboseReporting(self):
-        try:
-            os.environ["_TEST_TUNE_TRIAL_UUID"] = "xxxxx"
-
-            verbose_0_cmd = VERBOSE_CMD + "verbose=0)"
-            output = run_string_as_driver(verbose_0_cmd)
-            try:
-                assert VERBOSE_EXP_OUT_1 not in output
-                assert VERBOSE_EXP_OUT_2 not in output
-                assert VERBOSE_TRIAL_NORM not in output
-                assert VERBOSE_TRIAL_DETAIL not in output
-            except Exception:
-                print("*** BEGIN OUTPUT ***")
-                print(output)
-                print("*** END OUTPUT ***")
-                raise
-
-            verbose_1_cmd = VERBOSE_CMD + "verbose=1)"
-            output = run_string_as_driver(verbose_1_cmd)
-            try:
-                assert VERBOSE_EXP_OUT_1 in output
-                assert VERBOSE_EXP_OUT_2 in output
-                assert VERBOSE_TRIAL_NORM not in output
-                assert VERBOSE_TRIAL_DETAIL not in output
-            except Exception:
-                print("*** BEGIN OUTPUT ***")
-                print(output)
-                print("*** END OUTPUT ***")
-                raise
-
-            verbose_2_cmd = VERBOSE_CMD + "verbose=2)"
-            output = run_string_as_driver(verbose_2_cmd)
-            try:
-                assert VERBOSE_EXP_OUT_1 in output
-                assert VERBOSE_EXP_OUT_2 in output
-                assert VERBOSE_TRIAL_NORM in output
-                assert VERBOSE_TRIAL_DETAIL not in output
-            except Exception:
-                print("*** BEGIN OUTPUT ***")
-                print(output)
-                print("*** END OUTPUT ***")
-                raise
-
-            verbose_3_cmd = VERBOSE_CMD + "verbose=3)"
-            output = run_string_as_driver(verbose_3_cmd)
-            try:
-                assert VERBOSE_EXP_OUT_1 in output
-                assert VERBOSE_EXP_OUT_2 in output
-                assert VERBOSE_TRIAL_NORM not in output
-                assert VERBOSE_TRIAL_DETAIL in output
-            except Exception:
-                print("*** BEGIN OUTPUT ***")
-                print(output)
-                print("*** END OUTPUT ***")
-                raise
-        finally:
-            del os.environ["_TEST_TUNE_TRIAL_UUID"]
-
 
 if __name__ == "__main__":
     import sys
diff --git a/python/ray/tune/trial.py b/python/ray/tune/trial.py
index 2e9465c83..b775639d3 100644
--- a/python/ray/tune/trial.py
+++ b/python/ray/tune/trial.py
@@ -16,6 +16,7 @@ from ray.tune.checkpoint_manager import Checkpoint, CheckpointManager
 # NOTE(rkn): We import ray.tune.registry here instead of importing the names we
 # need because there are cyclic imports that may cause specific names to not
 # have been defined yet. See https://github.com/ray-project/ray/issues/1716.
+from ray.tune.logger import pretty_print
 from ray.tune.registry import get_trainable_cls, validate_trainable
 from ray.tune.result import DEFAULT_RESULTS_DIR, DONE, TRAINING_ITERATION
 from ray.tune.resources import Resources, json_to_resources, resources_to_json
@@ -229,6 +230,7 @@ class Trial:
            or not len(self.log_to_file) == 2:
             self.log_to_file = (None, None)
 
+        self.verbose = True
         self.max_failures = max_failures
 
         # Local trial state that is updated during the run
@@ -478,7 +480,11 @@ class Trial:
     def update_last_result(self, result, terminate=False):
         if self.experiment_tag:
             result.update(experiment_tag=self.experiment_tag)
-
+        if self.verbose and (terminate or time.time() - self.last_debug >
+                             DEBUG_PRINT_INTERVAL):
+            print("Result for {}:".format(self))
+            print("  {}".format(pretty_print(result).replace("\n", "\n  ")))
+            self.last_debug = time.time()
         self.set_location(Location(result.get("node_ip"), result.get("pid")))
         self.last_result = result
         self.last_update_time = time.time()
@@ -521,6 +527,9 @@ class Trial:
     def get_trainable_cls(self):
         return get_trainable_cls(self.trainable_name)
 
+    def set_verbose(self, verbose):
+        self.verbose = verbose
+
     def is_finished(self):
         return self.status in [Trial.ERROR, Trial.TERMINATED]
 
diff --git a/python/ray/tune/trial_runner.py b/python/ray/tune/trial_runner.py
index 0fee16613..8fab9450c 100644
--- a/python/ray/tune/trial_runner.py
+++ b/python/ray/tune/trial_runner.py
@@ -19,7 +19,6 @@ from ray.tune.trial import Checkpoint, Trial
 from ray.tune.schedulers import FIFOScheduler, TrialScheduler
 from ray.tune.suggest import BasicVariantGenerator
 from ray.tune.utils import warn_if_slow, flatten_dict, env_integer
-from ray.tune.utils.log import Verbosity, has_verbosity
 from ray.tune.utils.serialization import TuneFunctionDecoder, \
     TuneFunctionEncoder
 from ray.tune.web_server import TuneServer
@@ -79,6 +78,8 @@ class TrialRunner:
             If fail_fast='raise' provided, Tune will automatically
             raise the exception received by the Trainable. fail_fast='raise'
             can easily leak resources and should be used with caution.
+        verbose (bool): Flag for verbosity. If False, trial results
+            will not be output.
         checkpoint_period (int): Trial runner checkpoint periodicity in
             seconds. Defaults to 10.
         trial_executor (TrialExecutor): Defaults to RayTrialExecutor.
@@ -101,6 +102,7 @@ class TrialRunner:
                  resume=False,
                  server_port=None,
                  fail_fast=False,
+                 verbose=True,
                  checkpoint_period=None,
                  trial_executor=None,
                  callbacks=None,
@@ -133,6 +135,7 @@ class TrialRunner:
             else:
                 raise ValueError("fail_fast must be one of {bool, RAISE}. "
                                  f"Got {self._fail_fast}.")
+        self._verbose = verbose
 
         self._server = None
         self._server_port = server_port
@@ -162,7 +165,7 @@ class TrialRunner:
                 self.resume(run_errored_only=errored_only)
                 self._resumed = True
             except Exception as e:
-                if has_verbosity(Verbosity.V3_TRIAL_DETAILS):
+                if self._verbose:
                     logger.error(str(e))
                 logger.exception("Runner restore failed.")
                 if self._fail_fast:
@@ -401,6 +404,7 @@ class TrialRunner:
         Args:
             trial (Trial): Trial to queue.
""" + trial.set_verbose(self._verbose) self._trials.append(trial) with warn_if_slow("scheduler.on_trial_add"): self._scheduler_alg.on_trial_add(self, trial) @@ -558,8 +562,6 @@ class TrialRunner: with warn_if_slow("scheduler.on_trial_result"): decision = self._scheduler_alg.on_trial_result( self, trial, flat_result) - if decision == TrialScheduler.STOP: - result.update(done=True) with warn_if_slow("search_alg.on_trial_result"): self._search_alg.on_trial_result(trial.trial_id, flat_result) @@ -578,6 +580,7 @@ class TrialRunner: iteration=self._iteration, trials=self._trials, trial=trial) + result.update(done=True) if not is_duplicate: trial.update_last_result( diff --git a/python/ray/tune/tune.py b/python/ray/tune/tune.py index b7ce7eb79..3566b95ff 100644 --- a/python/ray/tune/tune.py +++ b/python/ray/tune/tune.py @@ -18,7 +18,6 @@ from ray.tune.syncer import wait_for_sync, set_sync_periods, \ from ray.tune.trial_runner import TrialRunner from ray.tune.progress_reporter import CLIReporter, JupyterNotebookReporter from ray.tune.schedulers import FIFOScheduler -from ray.tune.utils.log import Verbosity, has_verbosity, set_verbosity logger = logging.getLogger(__name__) @@ -71,7 +70,7 @@ def run( checkpoint_score_attr=None, checkpoint_freq=0, checkpoint_at_end=False, - verbose=Verbosity.V3_TRIAL_DETAILS, + verbose=2, progress_reporter=None, log_to_file=False, trial_name_creator=None, @@ -189,9 +188,8 @@ def run( checkpoint_at_end (bool): Whether to checkpoint at the end of the experiment regardless of the checkpoint_freq. Default is False. This has no effect when using the Functional Training API. - verbose (int): 0, 1, 2, or 3. Verbosity mode. 0 = silent, - 1 = only status updates, 2 = status and brief trial results, - 3 = status and detailed trial results. Defaults to 3. + verbose (int): 0, 1, or 2. Verbosity mode. 0 = silent, + 1 = only status updates, 2 = status and trial results. progress_reporter (ProgressReporter): Progress reporter for reporting intermediate experiment progress. Defaults to CLIReporter if running in command-line, or JupyterNotebookReporter if running in @@ -283,8 +281,6 @@ def run( "The `mode` parameter passed to `tune.run()` has to be one of " "['min', 'max']") - set_verbosity(verbose) - config = config or {} sync_config = sync_config or SyncConfig() set_sync_periods(sync_config) @@ -357,9 +353,9 @@ def run( "own `metric` and `mode` parameters. 
Either remove the arguments " "from your scheduler or from your call to `tune.run()`") - # Create syncer callbacks + # Create logger and syncer callbacks callbacks = create_default_callbacks( - callbacks, sync_config, metric=metric, loggers=loggers) + callbacks, sync_config, loggers=loggers) runner = TrialRunner( search_alg=search_alg, @@ -370,6 +366,7 @@ def run( stopper=experiments[0].stopper, resume=resume, server_port=server_port, + verbose=bool(verbose > 1), fail_fast=fail_fast, trial_executor=trial_executor, callbacks=callbacks, @@ -383,8 +380,7 @@ def run( if progress_reporter is None: if IS_NOTEBOOK: - progress_reporter = JupyterNotebookReporter( - overwrite=not has_verbosity(Verbosity.V2_TRIAL_NORM)) + progress_reporter = JupyterNotebookReporter(overwrite=verbose < 2) else: progress_reporter = CLIReporter() @@ -417,7 +413,7 @@ def run( tune_start = time.time() while not runner.is_finished(): runner.step() - if has_verbosity(Verbosity.V1_EXPERIMENT): + if verbose: _report_progress(runner, progress_reporter) tune_taken = time.time() - tune_start @@ -426,7 +422,7 @@ def run( except Exception as e: logger.warning(f"Trial Runner checkpointing failed: {str(e)}") - if has_verbosity(Verbosity.V1_EXPERIMENT): + if verbose: _report_progress(runner, progress_reporter, done=True) wait_for_sync() @@ -444,9 +440,8 @@ def run( logger.error("Trials did not complete: %s", incomplete_trials) all_taken = time.time() - all_start - if has_verbosity(Verbosity.V1_EXPERIMENT): - logger.info(f"Total run time: {all_taken:.2f} seconds " - f"({tune_taken:.2f} seconds for the tuning loop).") + logger.info(f"Total run time: {all_taken:.2f} seconds " + f"({tune_taken:.2f} seconds for the tuning loop).") trials = runner.get_trials() return ExperimentAnalysis( @@ -459,7 +454,7 @@ def run( def run_experiments(experiments, scheduler=None, server_port=None, - verbose=Verbosity.V3_TRIAL_DETAILS, + verbose=2, progress_reporter=None, resume=False, queue_trials=False, diff --git a/python/ray/tune/utils/callback.py b/python/ray/tune/utils/callback.py index 88e3e4cbe..d74b316a2 100644 --- a/python/ray/tune/utils/callback.py +++ b/python/ray/tune/utils/callback.py @@ -3,7 +3,6 @@ import os from typing import List, Optional from ray.tune.callback import Callback -from ray.tune.progress_reporter import TrialProgressCallback from ray.tune.syncer import SyncConfig from ray.tune.logger import CSVLoggerCallback, CSVLogger, \ LoggerCallback, \ @@ -16,44 +15,14 @@ logger = logging.getLogger(__name__) def create_default_callbacks(callbacks: Optional[List[Callback]], sync_config: SyncConfig, - loggers: Optional[List[Logger]], - metric: Optional[str] = None): - """Create default callbacks for `tune.run()`. + loggers: Optional[List[Logger]]): - This function takes a list of existing callbacks and adds default - callbacks to it. - - Specifically, three kinds of callbacks will be added: - - 1. Loggers. Ray Tune's experiment analysis relies on CSV and JSON logging. - 2. Syncer. Ray Tune synchronizes logs and checkpoint between workers and - the head node. - 2. Trial progress reporter. For reporting intermediate progress, like trial - results, Ray Tune uses a callback. - - These callbacks will only be added if they don't already exist, i.e. if - they haven't been passed (and configured) by the user. A notable case - is when a Logger is passed, which is not a CSV or JSON logger - then - a CSV and JSON logger will still be created. 
-
-    Lastly, this function will ensure that the Syncer callback comes after all
-    Logger callbacks, to ensure that the most up-to-date logs and checkpoints
-    are synced across nodes.
-
-    """
     callbacks = callbacks or []
     has_syncer_callback = False
     has_csv_logger = False
     has_json_logger = False
     has_tbx_logger = False
 
-    has_trial_progress_callback = any(
-        isinstance(c, TrialProgressCallback) for c in callbacks)
-
-    if not has_trial_progress_callback:
-        trial_progress_callback = TrialProgressCallback(metric=metric)
-        callbacks.append(trial_progress_callback)
-
     # Track syncer obj/index to move callback after loggers
     last_logger_index = None
     syncer_index = None
diff --git a/python/ray/tune/utils/log.py b/python/ray/tune/utils/log.py
deleted file mode 100644
index a62eb2b66..000000000
--- a/python/ray/tune/utils/log.py
+++ /dev/null
@@ -1,34 +0,0 @@
-from enum import Enum
-from typing import Union
-
-
-class Verbosity(Enum):
-    V0_MINIMAL = 0
-    V1_EXPERIMENT = 1
-    V2_TRIAL_NORM = 2
-    V3_TRIAL_DETAILS = 3
-
-    def __int__(self):
-        return self.value
-
-
-verbosity: Union[int, Verbosity] = Verbosity.V3_TRIAL_DETAILS
-
-
-def set_verbosity(level: Union[int, Verbosity]):
-    global verbosity
-
-    if isinstance(level, int):
-        verbosity = Verbosity(level)
-    else:
-        verbosity = level
-
-
-def has_verbosity(level: Union[int, Verbosity]) -> bool:
-    """Return True if passed level exceeds global verbosity level."""
-    global verbosity
-
-    log_level = int(level)
-    verbosity_level = int(verbosity)
-
-    return verbosity_level >= log_level