[tune] temporary revert of verbosity changes (#12132)

commit 2bb6db5e64 (parent 4717fcd9c0)
7 changed files with 53 additions and 348 deletions
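Note: this revert swaps the recently added four-level Verbosity enum back to the plain integer `verbose` flag throughout Tune. A minimal sketch of the call surface after the revert, matching the docstring hunks below (the `train` function is a placeholder, not part of this commit):

from ray import tune

def train(config):
    tune.report(acc=7)  # report a dummy metric

# After the revert, `verbose` is an int again:
# 0 = silent, 1 = only status updates, 2 = status and trial results.
tune.run(train, verbose=2)  # 2 is the restored default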
python/ray/tune/progress_reporter.py
@@ -1,21 +1,16 @@
 from __future__ import print_function

 import collections
-import os
 import sys
-from typing import Dict, List, Optional

 import numpy as np
 import time

-from ray.tune.callback import Callback
-from ray.tune.logger import pretty_print
 from ray.tune.result import (EPISODE_REWARD_MEAN, MEAN_ACCURACY, MEAN_LOSS,
                              TRAINING_ITERATION, TIME_TOTAL_S, TIMESTEPS_TOTAL,
                              AUTO_RESULT_KEYS)
-from ray.tune.trial import DEBUG_PRINT_INTERVAL, Trial
+from ray.tune.trial import Trial
 from ray.tune.utils import unflattened_lookup
-from ray.tune.utils.log import Verbosity, has_verbosity

 try:
     from collections.abc import Mapping
@@ -229,19 +224,15 @@ class TuneReporterBase(ProgressReporter):
             best_trial_str(current_best_trial, metric,
                            self._parameter_columns))

-        if has_verbosity(Verbosity.V1_EXPERIMENT):
-            # Will filter the table in `trial_progress_str`
-            messages.append(
-                trial_progress_str(
-                    trials,
-                    metric_columns=self._metric_columns,
-                    parameter_columns=self._parameter_columns,
-                    total_samples=self._total_samples,
-                    fmt=fmt,
-                    max_rows=max_progress,
-                    done=done))
-            messages.append(
-                trial_errors_str(trials, fmt=fmt, max_rows=max_error))
+        messages.append(
+            trial_progress_str(
+                trials,
+                metric_columns=self._metric_columns,
+                parameter_columns=self._parameter_columns,
+                total_samples=self._total_samples,
+                fmt=fmt,
+                max_rows=max_progress))
+        messages.append(trial_errors_str(trials, fmt=fmt, max_rows=max_error))

         return delim.join(messages) + delim
@@ -408,20 +399,12 @@ def memory_debug_str():
                 "(or ray[debug]) to resolve)")


-def _get_trials_by_state(trials):
-    trials_by_state = collections.defaultdict(list)
-    for t in trials:
-        trials_by_state[t.status].append(t)
-    return trials_by_state
-
-
 def trial_progress_str(trials,
                        metric_columns,
                        parameter_columns=None,
                        total_samples=0,
                        fmt="psql",
-                       max_rows=None,
-                       done=False):
+                       max_rows=None):
     """Returns a human readable message for printing to the console.

     This contains a table where each row represents a trial, its parameters
@@ -449,7 +432,9 @@ def trial_progress_str(trials,
         return delim.join(messages)

     num_trials = len(trials)
-    trials_by_state = _get_trials_by_state(trials)
+    trials_by_state = collections.defaultdict(list)
+    for t in trials:
+        trials_by_state[t.status].append(t)

     for local_dir in sorted({t.local_dir for t in trials}):
         messages.append("Result logdir: {}".format(local_dir))
@@ -459,30 +444,6 @@ def trial_progress_str(trials,
         for state in sorted(trials_by_state)
     ]

-    if total_samples and total_samples >= sys.maxsize:
-        total_samples = "infinite"
-
-    messages.append("Number of trials: {}{} ({})".format(
-        num_trials, f"/{total_samples}"
-        if total_samples else "", ", ".join(num_trials_strs)))
-
-    if has_verbosity(Verbosity.V3_TRIAL_DETAILS) or (has_verbosity(
-            Verbosity.V2_TRIAL_NORM) and done):
-        messages += trial_progress_table(trials, metric_columns,
-                                         parameter_columns, fmt, max_rows)
-
-    return delim.join(messages)
-
-
-def trial_progress_table(trials,
-                         metric_columns,
-                         parameter_columns=None,
-                         fmt="psql",
-                         max_rows=None):
-    messages = []
-    num_trials = len(trials)
-    trials_by_state = _get_trials_by_state(trials)
-
     state_tbl_order = [
         Trial.RUNNING, Trial.PAUSED, Trial.PENDING, Trial.TERMINATED,
         Trial.ERROR
@@ -513,6 +474,13 @@ def trial_progress_table(trials,
             continue
         trials += trials_by_state[state]

+    if total_samples and total_samples >= sys.maxsize:
+        total_samples = "infinite"
+
+    messages.append("Number of trials: {}{} ({})".format(
+        num_trials, f"/{total_samples}"
+        if total_samples else "", ", ".join(num_trials_strs)))
+
     # Pre-process trials to figure out what columns to show.
     if isinstance(metric_columns, Mapping):
         metric_keys = list(metric_columns.keys())
@@ -554,7 +522,7 @@ def trial_progress_table(trials,
     if overflow:
         messages.append("... {} more trials not shown ({})".format(
             overflow, overflow_str))
-    return messages
+    return delim.join(messages)


 def trial_errors_str(trials, fmt="psql", max_rows=None):
@@ -653,108 +621,3 @@ def _get_trial_info(trial, parameters, metrics):
         unflattened_lookup(metric, result, default=None) for metric in metrics
     ]
     return trial_info
-
-
-class TrialProgressCallback(Callback):
-    """Reports (prints) intermediate trial progress.
-
-    This callback is automatically added to the callback stack. When a
-    result is obtained, this callback will print the results according to
-    the specified verbosity level.
-
-    For ``Verbosity.V3_TRIAL_DETAILS``, a full result list is printed.
-
-    For ``Verbosity.V2_TRIAL_NORM``, only one line is printed per received
-    result.
-
-    All other verbosity levels do not print intermediate trial progress.
-
-    Result printing is throttled on a per-trial basis. Per default, results are
-    printed only once every 30 seconds. Results are always printed when a trial
-    finished or errored.
-
-    """
-
-    def __init__(self, metric: Optional[str] = None):
-        self._last_print = collections.defaultdict(float)
-        self._completed_trials = set()
-        self._last_result_str = {}
-        self._metric = metric
-
-    def on_trial_result(self, iteration: int, trials: List["Trial"],
-                        trial: "Trial", result: Dict, **info):
-        self.log_result(trial, result, error=False)
-
-    def on_trial_error(self, iteration: int, trials: List["Trial"],
-                       trial: "Trial", **info):
-        self.log_result(trial, trial.last_result, error=True)
-
-    def on_trial_complete(self, iteration: int, trials: List["Trial"],
-                          trial: "Trial", **info):
-        # Only log when we never logged that a trial was completed
-        if trial not in self._completed_trials:
-            self._completed_trials.add(trial)
-
-            print_result_str = self._print_result(trial.last_result)
-            last_result_str = self._last_result_str.get(trial, "")
-            # If this is a new result, print full result string
-            if print_result_str != last_result_str:
-                self.log_result(trial, trial.last_result, error=False)
-            else:
-                print(f"Trial {trial} completed. "
-                      f"Last result: {print_result_str}")
-
-    def log_result(self, trial: "Trial", result: Dict, error: bool = False):
-        done = result.get("done", False) is True
-        last_print = self._last_print[trial]
-        if done and trial not in self._completed_trials:
-            self._completed_trials.add(trial)
-        if has_verbosity(Verbosity.V3_TRIAL_DETAILS) and \
-           (done or error or time.time() - last_print > DEBUG_PRINT_INTERVAL):
-            print("Result for {}:".format(trial))
-            print("  {}".format(pretty_print(result).replace("\n", "\n  ")))
-            self._last_print[trial] = time.time()
-        elif has_verbosity(Verbosity.V2_TRIAL_NORM) and (
-                done or error
-                or time.time() - last_print > DEBUG_PRINT_INTERVAL):
-            info = ""
-            if done:
-                info = " This trial completed."
-
-            metric_name = self._metric or "_metric"
-            metric_value = result.get(metric_name, -99.)
-
-            print_result_str = self._print_result(result)
-
-            self._last_result_str[trial] = print_result_str
-
-            error_file = os.path.join(trial.logdir, "error.txt")
-
-            if error:
-                message = f"The trial {trial} errored with " \
-                          f"parameters={trial.config}. " \
-                          f"Error file: {error_file}"
-            elif self._metric:
-                message = f"Trial {trial} reported " \
-                          f"{metric_name}={metric_value:.2f} " \
-                          f"with parameters={trial.config}.{info}"
-            else:
-                message = f"Trial {trial} reported " \
-                          f"{print_result_str} " \
-                          f"with parameters={trial.config}.{info}"
-
-            print(message)
-            self._last_print[trial] = time.time()
-
-    def _print_result(self, result: Dict):
-        print_result = result.copy()
-        print_result.pop("config", None)
-        print_result.pop("trial_id", None)
-        print_result.pop("experiment_tag", None)
-        print_result.pop("done", None)
-        for auto_result in AUTO_RESULT_KEYS:
-            print_result.pop(auto_result, None)
-
-        print_result_str = ",".join(
-            [f"{k}={v}" for k, v in print_result.items()])
-        return print_result_str

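The largest removal above is the TrialProgressCallback class. For reference, a self-contained sketch of the condensed one-line result format its deleted _print_result helper produced (the sample result dict is hypothetical, and the AUTO_RESULT_KEYS loop is elided):

result = {"acc": 5, "done": True, "trial_id": "00000",
          "config": {"do": "complete"}}

# Strip bookkeeping keys, then join what is left as k=v pairs.
print_result = result.copy()
for key in ("config", "trial_id", "experiment_tag", "done"):
    print_result.pop(key, None)
print(",".join(f"{k}={v}" for k, v in print_result.items()))  # prints: acc=5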
python/ray/tune/tests/test_progress_reporter.py
@@ -73,7 +73,7 @@ tune.run_experiments({
             "c": tune.grid_search(list(range(10))),
         },
     },
-}, verbose=3, progress_reporter=reporter)"""
+}, verbose=1, progress_reporter=reporter)"""

 EXPECTED_END_TO_END_START = """Number of trials: 1/30 (1 RUNNING)
 +---------------+----------+-------+-----+
@@ -160,48 +160,6 @@ EXPECTED_BEST_1 = "Current best trial: 00001 with metric_1=0.5 and " \
 EXPECTED_BEST_2 = "Current best trial: 00004 with metric_1=2.0 and " \
                   "parameters={'a': 4}"

-VERBOSE_EXP_OUT_1 = "Number of trials: 1/3 (1 RUNNING)"
-VERBOSE_EXP_OUT_2 = "Number of trials: 3/3 (3 TERMINATED)"
-
-VERBOSE_TRIAL_NORM = "Trial train_xxxxx_00000 reported acc=5 with " + \
-    """parameters={'do': 'complete'}. This trial completed.
-Trial train_xxxxx_00001 reported _metric=6 with parameters={'do': 'once'}.
-Trial train_xxxxx_00001 completed. Last result: _metric=6
-Trial train_xxxxx_00002 reported acc=7 with parameters={'do': 'twice'}.
-Trial train_xxxxx_00002 reported acc=8 with parameters={'do': 'twice'}. """ + \
-    "This trial completed."
-
-VERBOSE_TRIAL_DETAIL = """+-------------------+----------+-------+----------+
-| Trial name        | status   | loc   | do       |
-|-------------------+----------+-------+----------|
-| train_xxxxx_00000 | RUNNING  |       | complete |
-+-------------------+----------+-------+----------+"""
-
-VERBOSE_CMD = """from ray import tune
-import random
-import numpy as np
-
-
-def train(config):
-    if config["do"] == "complete":
-        tune.report(acc=5, done=True)
-    elif config["do"] == "once":
-        tune.report(6)
-    else:
-        tune.report(acc=7)
-        tune.report(acc=8)
-
-random.seed(1234)
-np.random.seed(1234)
-
-tune.run(
-    train,
-    config={
-        "do": tune.grid_search(["complete", "once", "twice"])
-    },"""
-
-# Add "verbose=3)" etc
-

 class ProgressReporterTest(unittest.TestCase):
     def mock_trial(self, status, i):
@@ -405,64 +363,6 @@ class ProgressReporterTest(unittest.TestCase):
         finally:
             del os.environ["_TEST_TUNE_TRIAL_UUID"]

-    def testVerboseReporting(self):
-        try:
-            os.environ["_TEST_TUNE_TRIAL_UUID"] = "xxxxx"
-
-            verbose_0_cmd = VERBOSE_CMD + "verbose=0)"
-            output = run_string_as_driver(verbose_0_cmd)
-            try:
-                assert VERBOSE_EXP_OUT_1 not in output
-                assert VERBOSE_EXP_OUT_2 not in output
-                assert VERBOSE_TRIAL_NORM not in output
-                assert VERBOSE_TRIAL_DETAIL not in output
-            except Exception:
-                print("*** BEGIN OUTPUT ***")
-                print(output)
-                print("*** END OUTPUT ***")
-                raise
-
-            verbose_1_cmd = VERBOSE_CMD + "verbose=1)"
-            output = run_string_as_driver(verbose_1_cmd)
-            try:
-                assert VERBOSE_EXP_OUT_1 in output
-                assert VERBOSE_EXP_OUT_2 in output
-                assert VERBOSE_TRIAL_NORM not in output
-                assert VERBOSE_TRIAL_DETAIL not in output
-            except Exception:
-                print("*** BEGIN OUTPUT ***")
-                print(output)
-                print("*** END OUTPUT ***")
-                raise
-
-            verbose_2_cmd = VERBOSE_CMD + "verbose=2)"
-            output = run_string_as_driver(verbose_2_cmd)
-            try:
-                assert VERBOSE_EXP_OUT_1 in output
-                assert VERBOSE_EXP_OUT_2 in output
-                assert VERBOSE_TRIAL_NORM in output
-                assert VERBOSE_TRIAL_DETAIL not in output
-            except Exception:
-                print("*** BEGIN OUTPUT ***")
-                print(output)
-                print("*** END OUTPUT ***")
-                raise
-
-            verbose_3_cmd = VERBOSE_CMD + "verbose=3)"
-            output = run_string_as_driver(verbose_3_cmd)
-            try:
-                assert VERBOSE_EXP_OUT_1 in output
-                assert VERBOSE_EXP_OUT_2 in output
-                assert VERBOSE_TRIAL_NORM not in output
-                assert VERBOSE_TRIAL_DETAIL in output
-            except Exception:
-                print("*** BEGIN OUTPUT ***")
-                print(output)
-                print("*** END OUTPUT ***")
-                raise
-        finally:
-            del os.environ["_TEST_TUNE_TRIAL_UUID"]
-

 if __name__ == "__main__":
     import sys

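The deleted testVerboseReporting encoded which output should appear at each pre-revert verbosity level. A condensed sketch of that expectation matrix, derived directly from the asserts above:

# Strings the deleted test asserted were present in the driver output,
# keyed by the pre-revert verbose level.
EXPECTED_VISIBLE = {
    0: set(),
    1: {"experiment_output"},
    2: {"experiment_output", "trial_norm"},
    3: {"experiment_output", "trial_detail"},
}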
python/ray/tune/trial.py
@@ -16,6 +16,7 @@ from ray.tune.checkpoint_manager import Checkpoint, CheckpointManager
 # NOTE(rkn): We import ray.tune.registry here instead of importing the names we
 # need because there are cyclic imports that may cause specific names to not
 # have been defined yet. See https://github.com/ray-project/ray/issues/1716.
+from ray.tune.logger import pretty_print
 from ray.tune.registry import get_trainable_cls, validate_trainable
 from ray.tune.result import DEFAULT_RESULTS_DIR, DONE, TRAINING_ITERATION
 from ray.tune.resources import Resources, json_to_resources, resources_to_json
@@ -229,6 +230,7 @@ class Trial:
                 or not len(self.log_to_file) == 2:
             self.log_to_file = (None, None)

+        self.verbose = True
         self.max_failures = max_failures

         # Local trial state that is updated during the run
@@ -478,7 +480,11 @@ class Trial:
     def update_last_result(self, result, terminate=False):
         if self.experiment_tag:
             result.update(experiment_tag=self.experiment_tag)
+        if self.verbose and (terminate or time.time() - self.last_debug >
+                             DEBUG_PRINT_INTERVAL):
+            print("Result for {}:".format(self))
+            print("  {}".format(pretty_print(result).replace("\n", "\n  ")))
+            self.last_debug = time.time()
         self.set_location(Location(result.get("node_ip"), result.get("pid")))
         self.last_result = result
         self.last_update_time = time.time()
@@ -521,6 +527,9 @@ class Trial:
     def get_trainable_cls(self):
         return get_trainable_cls(self.trainable_name)

+    def set_verbose(self, verbose):
+        self.verbose = verbose
+
     def is_finished(self):
         return self.status in [Trial.ERROR, Trial.TERMINATED]

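The hunks above restore per-trial result printing to Trial.update_last_result, throttled by DEBUG_PRINT_INTERVAL. A self-contained sketch of that throttling logic (the interval value here is a placeholder assumption; the real constant is defined in ray.tune.trial):

import time

DEBUG_PRINT_INTERVAL = 5  # assumed placeholder value, for illustration only

class ThrottledPrinter:
    """Print at most once per interval, but always when terminating."""

    def __init__(self):
        self.last_debug = 0.0

    def maybe_print(self, message, terminate=False):
        if terminate or time.time() - self.last_debug > DEBUG_PRINT_INTERVAL:
            print(message)
            self.last_debug = time.time()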
python/ray/tune/trial_runner.py
@@ -19,7 +19,6 @@ from ray.tune.trial import Checkpoint, Trial
 from ray.tune.schedulers import FIFOScheduler, TrialScheduler
 from ray.tune.suggest import BasicVariantGenerator
 from ray.tune.utils import warn_if_slow, flatten_dict, env_integer
-from ray.tune.utils.log import Verbosity, has_verbosity
 from ray.tune.utils.serialization import TuneFunctionDecoder, \
     TuneFunctionEncoder
 from ray.tune.web_server import TuneServer
@@ -79,6 +78,8 @@ class TrialRunner:
             If fail_fast='raise' provided, Tune will automatically
             raise the exception received by the Trainable. fail_fast='raise'
             can easily leak resources and should be used with caution.
+        verbose (bool): Flag for verbosity. If False, trial results
+            will not be output.
         checkpoint_period (int): Trial runner checkpoint periodicity in
             seconds. Defaults to 10.
         trial_executor (TrialExecutor): Defaults to RayTrialExecutor.
@@ -101,6 +102,7 @@ class TrialRunner:
                  resume=False,
                  server_port=None,
                  fail_fast=False,
+                 verbose=True,
                  checkpoint_period=None,
                  trial_executor=None,
                  callbacks=None,
@@ -133,6 +135,7 @@ class TrialRunner:
         else:
             raise ValueError("fail_fast must be one of {bool, RAISE}. "
                              f"Got {self._fail_fast}.")
+        self._verbose = verbose

         self._server = None
         self._server_port = server_port
@@ -162,7 +165,7 @@ class TrialRunner:
                 self.resume(run_errored_only=errored_only)
                 self._resumed = True
             except Exception as e:
-                if has_verbosity(Verbosity.V3_TRIAL_DETAILS):
+                if self._verbose:
                     logger.error(str(e))
                 logger.exception("Runner restore failed.")
                 if self._fail_fast:
@@ -401,6 +404,7 @@ class TrialRunner:
         Args:
             trial (Trial): Trial to queue.
         """
+        trial.set_verbose(self._verbose)
         self._trials.append(trial)
         with warn_if_slow("scheduler.on_trial_add"):
             self._scheduler_alg.on_trial_add(self, trial)
@@ -558,8 +562,6 @@ class TrialRunner:
         with warn_if_slow("scheduler.on_trial_result"):
             decision = self._scheduler_alg.on_trial_result(
                 self, trial, flat_result)
-        if decision == TrialScheduler.STOP:
-            result.update(done=True)
         with warn_if_slow("search_alg.on_trial_result"):
             self._search_alg.on_trial_result(trial.trial_id,
                                              flat_result)
@@ -578,6 +580,7 @@ class TrialRunner:
                     iteration=self._iteration,
                     trials=self._trials,
                     trial=trial)
+            result.update(done=True)

         if not is_duplicate:
             trial.update_last_result(

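Taken together, these hunks thread the boolean flag from the runner down to each trial. A condensed, runnable sketch of the restored flow (class bodies trimmed to the verbosity-related lines only):

class Trial:
    def __init__(self):
        self.verbose = True  # restored default

    def set_verbose(self, verbose):
        self.verbose = verbose

class TrialRunner:
    def __init__(self, verbose=True):
        self._verbose = verbose

    def add_trial(self, trial):
        trial.set_verbose(self._verbose)  # every queued trial inherits the flag

runner = TrialRunner(verbose=False)
trial = Trial()
runner.add_trial(trial)
assert trial.verbose is False  # trial results will not be printed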
python/ray/tune/tune.py
@@ -18,7 +18,6 @@ from ray.tune.syncer import wait_for_sync, set_sync_periods, \
 from ray.tune.trial_runner import TrialRunner
 from ray.tune.progress_reporter import CLIReporter, JupyterNotebookReporter
 from ray.tune.schedulers import FIFOScheduler
-from ray.tune.utils.log import Verbosity, has_verbosity, set_verbosity

 logger = logging.getLogger(__name__)

@@ -71,7 +70,7 @@ def run(
         checkpoint_score_attr=None,
         checkpoint_freq=0,
         checkpoint_at_end=False,
-        verbose=Verbosity.V3_TRIAL_DETAILS,
+        verbose=2,
         progress_reporter=None,
         log_to_file=False,
         trial_name_creator=None,
@@ -189,9 +188,8 @@ def run(
         checkpoint_at_end (bool): Whether to checkpoint at the end of the
             experiment regardless of the checkpoint_freq. Default is False.
             This has no effect when using the Functional Training API.
-        verbose (int): 0, 1, 2, or 3. Verbosity mode. 0 = silent,
-            1 = only status updates, 2 = status and brief trial results,
-            3 = status and detailed trial results. Defaults to 3.
+        verbose (int): 0, 1, or 2. Verbosity mode. 0 = silent,
+            1 = only status updates, 2 = status and trial results.
         progress_reporter (ProgressReporter): Progress reporter for reporting
             intermediate experiment progress. Defaults to CLIReporter if
             running in command-line, or JupyterNotebookReporter if running in
@@ -283,8 +281,6 @@ def run(
             "The `mode` parameter passed to `tune.run()` has to be one of "
             "['min', 'max']")

-    set_verbosity(verbose)
-
     config = config or {}
     sync_config = sync_config or SyncConfig()
     set_sync_periods(sync_config)
@@ -357,9 +353,9 @@ def run(
             "own `metric` and `mode` parameters. Either remove the arguments "
             "from your scheduler or from your call to `tune.run()`")

-    # Create syncer callbacks
+    # Create logger and syncer callbacks
     callbacks = create_default_callbacks(
-        callbacks, sync_config, metric=metric, loggers=loggers)
+        callbacks, sync_config, loggers=loggers)

     runner = TrialRunner(
         search_alg=search_alg,
@@ -370,6 +366,7 @@ def run(
         stopper=experiments[0].stopper,
         resume=resume,
         server_port=server_port,
+        verbose=bool(verbose > 1),
         fail_fast=fail_fast,
         trial_executor=trial_executor,
         callbacks=callbacks,
@@ -383,8 +380,7 @@ def run(

     if progress_reporter is None:
         if IS_NOTEBOOK:
-            progress_reporter = JupyterNotebookReporter(
-                overwrite=not has_verbosity(Verbosity.V2_TRIAL_NORM))
+            progress_reporter = JupyterNotebookReporter(overwrite=verbose < 2)
         else:
             progress_reporter = CLIReporter()

@@ -417,7 +413,7 @@ def run(
     tune_start = time.time()
     while not runner.is_finished():
         runner.step()
-        if has_verbosity(Verbosity.V1_EXPERIMENT):
+        if verbose:
             _report_progress(runner, progress_reporter)
     tune_taken = time.time() - tune_start

@@ -426,7 +422,7 @@ def run(
     except Exception as e:
         logger.warning(f"Trial Runner checkpointing failed: {str(e)}")

-    if has_verbosity(Verbosity.V1_EXPERIMENT):
+    if verbose:
         _report_progress(runner, progress_reporter, done=True)

     wait_for_sync()
@@ -444,9 +440,8 @@ def run(
         logger.error("Trials did not complete: %s", incomplete_trials)

     all_taken = time.time() - all_start
-    if has_verbosity(Verbosity.V1_EXPERIMENT):
-        logger.info(f"Total run time: {all_taken:.2f} seconds "
-                    f"({tune_taken:.2f} seconds for the tuning loop).")
+    logger.info(f"Total run time: {all_taken:.2f} seconds "
+                f"({tune_taken:.2f} seconds for the tuning loop).")

     trials = runner.get_trials()
     return ExperimentAnalysis(
@@ -459,7 +454,7 @@ def run(
 def run_experiments(experiments,
                     scheduler=None,
                     server_port=None,
-                    verbose=Verbosity.V3_TRIAL_DETAILS,
+                    verbose=2,
                     progress_reporter=None,
                     resume=False,
                     queue_trials=False,

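The tune.py hunks above map the public integer flag onto the internal booleans. A small sketch of that mapping in isolation (values per the docstring hunk; `verbose` here is a local stand-in):

for verbose in (0, 1, 2):
    runner_verbose = bool(verbose > 1)   # passed as TrialRunner(verbose=...)
    report_progress = bool(verbose)      # guards _report_progress(...)
    notebook_overwrite = verbose < 2     # JupyterNotebookReporter(overwrite=...)
    print(verbose, runner_verbose, report_progress, notebook_overwrite)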
python/ray/tune/utils/callback.py
@@ -3,7 +3,6 @@ import os
 from typing import List, Optional

 from ray.tune.callback import Callback
-from ray.tune.progress_reporter import TrialProgressCallback
 from ray.tune.syncer import SyncConfig
 from ray.tune.logger import CSVLoggerCallback, CSVLogger, \
     LoggerCallback, \
@@ -16,44 +15,14 @@ logger = logging.getLogger(__name__)

 def create_default_callbacks(callbacks: Optional[List[Callback]],
                              sync_config: SyncConfig,
-                             loggers: Optional[List[Logger]],
-                             metric: Optional[str] = None):
-    """Create default callbacks for `tune.run()`.
-
-    This function takes a list of existing callbacks and adds default
-    callbacks to it.
-
-    Specifically, three kinds of callbacks will be added:
-
-    1. Loggers. Ray Tune's experiment analysis relies on CSV and JSON logging.
-    2. Syncer. Ray Tune synchronizes logs and checkpoint between workers and
-       the head node.
-    2. Trial progress reporter. For reporting intermediate progress, like trial
-       results, Ray Tune uses a callback.
-
-    These callbacks will only be added if they don't already exist, i.e. if
-    they haven't been passed (and configured) by the user. A notable case
-    is when a Logger is passed, which is not a CSV or JSON logger - then
-    a CSV and JSON logger will still be created.
-
-    Lastly, this function will ensure that the Syncer callback comes after all
-    Logger callbacks, to ensure that the most up-to-date logs and checkpoints
-    are synced across nodes.
-
-    """
+                             loggers: Optional[List[Logger]]):
     callbacks = callbacks or []
     has_syncer_callback = False
     has_csv_logger = False
     has_json_logger = False
     has_tbx_logger = False

-    has_trial_progress_callback = any(
-        isinstance(c, TrialProgressCallback) for c in callbacks)
-
-    if not has_trial_progress_callback:
-        trial_progress_callback = TrialProgressCallback(metric=metric)
-        callbacks.append(trial_progress_callback)
-
     # Track syncer obj/index to move callback after loggers
     last_logger_index = None
     syncer_index = None

python/ray/tune/utils/log.py (deleted)
@@ -1,34 +0,0 @@
-from enum import Enum
-from typing import Union
-
-
-class Verbosity(Enum):
-    V0_MINIMAL = 0
-    V1_EXPERIMENT = 1
-    V2_TRIAL_NORM = 2
-    V3_TRIAL_DETAILS = 3
-
-    def __int__(self):
-        return self.value
-
-
-verbosity: Union[int, Verbosity] = Verbosity.V3_TRIAL_DETAILS
-
-
-def set_verbosity(level: Union[int, Verbosity]):
-    global verbosity
-
-    if isinstance(level, int):
-        verbosity = Verbosity(level)
-    else:
-        verbosity = verbosity
-
-
-def has_verbosity(level: Union[int, Verbosity]) -> bool:
-    """Return True if passed level exceeds global verbosity level."""
-    global verbosity
-
-    log_level = int(level)
-    verbosity_level = int(verbosity)
-
-    return verbosity_level >= log_level
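For reference, the comparison semantics of the module this commit deletes, as a self-contained sketch (the global `verbosity` value is chosen for illustration, as if set_verbosity(2) had been called):

from enum import Enum

class Verbosity(Enum):  # copied from the deleted module
    V0_MINIMAL = 0
    V1_EXPERIMENT = 1
    V2_TRIAL_NORM = 2
    V3_TRIAL_DETAILS = 3

    def __int__(self):
        return self.value

verbosity = Verbosity.V2_TRIAL_NORM

def has_verbosity(level) -> bool:
    # Mirrors the deleted helper: the global level must meet or exceed `level`.
    return int(verbosity) >= int(level)

assert has_verbosity(Verbosity.V1_EXPERIMENT)         # status updates shown
assert not has_verbosity(Verbosity.V3_TRIAL_DETAILS)  # detailed results hidden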