From 2c1131e8b26b7d4e545adf32b935e9ff83badd2b Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Thu, 14 Mar 2019 19:44:01 -0700 Subject: [PATCH] [tune] Add warnings if tune event loop gets clogged (#4353) * add guards * comments --- python/ray/tune/ray_trial_executor.py | 16 ++++++++++++++++ python/ray/tune/trial_runner.py | 21 +++++++++++++-------- 2 files changed, 29 insertions(+), 8 deletions(-) diff --git a/python/ray/tune/ray_trial_executor.py b/python/ray/tune/ray_trial_executor.py index 9af5e48d0..90fd9e0e7 100644 --- a/python/ray/tune/ray_trial_executor.py +++ b/python/ray/tune/ray_trial_executor.py @@ -18,6 +18,9 @@ from ray.tune.util import warn_if_slow logger = logging.getLogger(__name__) +BOTTLENECK_WARN_PERIOD_S = 60 +NONTRIVIAL_WAIT_TIME_THRESHOLD_S = 1e-3 + class _LocalWrapper(object): def __init__(self, result): @@ -43,6 +46,7 @@ class RayTrialExecutor(TrialExecutor): self._resources_initialized = False self._reuse_actors = reuse_actors self._cached_actor = None + self._last_nontrivial_wait = time.time() if ray.is_initialized(): self._update_avail_resources() @@ -275,7 +279,19 @@ class RayTrialExecutor(TrialExecutor): # the first available result, and we want to guarantee that slower # trials (i.e. trials that run remotely) also get fairly reported. # See https://github.com/ray-project/ray/issues/4211 for details. + start = time.time() [result_id], _ = ray.wait(shuffled_results) + wait_time = time.time() - start + if wait_time > NONTRIVIAL_WAIT_TIME_THRESHOLD_S: + self._last_nontrivial_wait = time.time() + if time.time() - self._last_nontrivial_wait > BOTTLENECK_WARN_PERIOD_S: + logger.warn( + "Over the last {} seconds, the Tune event loop has been " + "backlogged processing new results. 
Consider increasing your " + "period of result reporting to improve performance.".format( + BOTTLENECK_WARN_PERIOD_S)) + + self._last_nontrivial_wait = time.time() return self._running[result_id] def fetch_result(self, trial): diff --git a/python/ray/tune/trial_runner.py b/python/ray/tune/trial_runner.py index c89150541..c13acfc00 100644 --- a/python/ray/tune/trial_runner.py +++ b/python/ray/tune/trial_runner.py @@ -229,13 +229,14 @@ class TrialRunner(object): """ if self.is_finished(): raise TuneError("Called step when all trials finished?") - self.trial_executor.on_step_begin() - next_trial = self._get_next_trial() + with warn_if_slow("on_step_begin"): + self.trial_executor.on_step_begin() + next_trial = self._get_next_trial() # blocking if next_trial is not None: with warn_if_slow("start_trial"): self.trial_executor.start_trial(next_trial) elif self.trial_executor.get_running_trials(): - self._process_events() + self._process_events() # blocking else: for trial in self._trials: if trial.status == Trial.PENDING: @@ -257,17 +258,20 @@ class TrialRunner(object): "trials with sufficient resources.") try: - self.checkpoint() + with warn_if_slow("experiment_checkpoint"): + self.checkpoint() except Exception: logger.exception("Trial Runner checkpointing failed.") self._iteration += 1 if self._server: - self._process_requests() + with warn_if_slow("server"): + self._process_requests() if self.is_finished(): self._server.shutdown() - self.trial_executor.on_step_end() + with warn_if_slow("on_step_end"): + self.trial_executor.on_step_end() def get_trial(self, tid): trial = [t for t in self._trials if t.trial_id == tid] @@ -391,11 +395,12 @@ class TrialRunner(object): trials_done = all(trial.is_finished() for trial in self._trials) wait_for_trial = trials_done and not self._search_alg.is_finished() self._update_trial_queue(blocking=wait_for_trial) - trial = self._scheduler_alg.choose_trial_to_run(self) + with warn_if_slow("choose_trial_to_run"): + trial = 
self._scheduler_alg.choose_trial_to_run(self) return trial def _process_events(self): - trial = self.trial_executor.get_next_available_trial() + trial = self.trial_executor.get_next_available_trial() # blocking with warn_if_slow("process_trial"): self._process_trial(trial)