[tune] Add warnings if tune event loop gets clogged (#4353)

* add guards

* comments
Eric Liang 2019-03-14 19:44:01 -07:00 committed by GitHub
parent 1a1027b3ab
commit 2c1131e8b2
2 changed files with 29 additions and 8 deletions

python/ray/tune/ray_trial_executor.py

@@ -18,6 +18,9 @@ from ray.tune.util import warn_if_slow
 logger = logging.getLogger(__name__)
+BOTTLENECK_WARN_PERIOD_S = 60
+NONTRIVIAL_WAIT_TIME_THRESHOLD_S = 1e-3
 class _LocalWrapper(object):
     def __init__(self, result):
@@ -43,6 +46,7 @@ class RayTrialExecutor(TrialExecutor):
         self._resources_initialized = False
         self._reuse_actors = reuse_actors
         self._cached_actor = None
+        self._last_nontrivial_wait = time.time()
         if ray.is_initialized():
             self._update_avail_resources()
@@ -275,7 +279,19 @@ class RayTrialExecutor(TrialExecutor):
         # the first available result, and we want to guarantee that slower
         # trials (i.e. trials that run remotely) also get fairly reported.
         # See https://github.com/ray-project/ray/issues/4211 for details.
+        start = time.time()
         [result_id], _ = ray.wait(shuffled_results)
+        wait_time = time.time() - start
+        if wait_time > NONTRIVIAL_WAIT_TIME_THRESHOLD_S:
+            self._last_nontrivial_wait = time.time()
+        if time.time() - self._last_nontrivial_wait > BOTTLENECK_WARN_PERIOD_S:
+            logger.warn(
+                "Over the last {} seconds, the Tune event loop has been "
+                "backlogged processing new results. Consider increasing your "
+                "period of result reporting to improve performance.".format(
+                    BOTTLENECK_WARN_PERIOD_S))
+            self._last_nontrivial_wait = time.time()
         return self._running[result_id]

     def fetch_result(self, trial):
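To make the new backlog check easier to follow outside the diff, here is a minimal self-contained sketch of the same pattern. The ResultLoop class and the wait_for_result / wait_fn names are hypothetical stand-ins for illustration; only the two thresholds and the _last_nontrivial_wait bookkeeping mirror the change above.

import logging
import time

logger = logging.getLogger(__name__)

BOTTLENECK_WARN_PERIOD_S = 60
NONTRIVIAL_WAIT_TIME_THRESHOLD_S = 1e-3


class ResultLoop(object):
    """Hypothetical stand-in for the Tune result-fetching loop."""

    def __init__(self):
        self._last_nontrivial_wait = time.time()

    def wait_for_result(self, wait_fn):
        # Time the blocking wait; `wait_fn` stands in for the ray.wait() call.
        start = time.time()
        result = wait_fn()
        wait_time = time.time() - start

        # A wait longer than the threshold means the loop had idle capacity,
        # so it is not backlogged.
        if wait_time > NONTRIVIAL_WAIT_TIME_THRESHOLD_S:
            self._last_nontrivial_wait = time.time()

        # If every wait has returned near-instantly for a full warn period,
        # results are arriving faster than the loop can process them.
        if time.time() - self._last_nontrivial_wait > BOTTLENECK_WARN_PERIOD_S:
            logger.warning(
                "Over the last {} seconds, the event loop has been "
                "backlogged processing new results.".format(
                    BOTTLENECK_WARN_PERIOD_S))
            self._last_nontrivial_wait = time.time()
        return result

Resetting _last_nontrivial_wait after logging also rate-limits the warning to at most one message per warn period.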

python/ray/tune/trial_runner.py

@@ -229,13 +229,14 @@ class TrialRunner(object):
         """
         if self.is_finished():
             raise TuneError("Called step when all trials finished?")
-        self.trial_executor.on_step_begin()
-        next_trial = self._get_next_trial()
+        with warn_if_slow("on_step_begin"):
+            self.trial_executor.on_step_begin()
+        next_trial = self._get_next_trial()  # blocking
         if next_trial is not None:
             with warn_if_slow("start_trial"):
                 self.trial_executor.start_trial(next_trial)
         elif self.trial_executor.get_running_trials():
-            self._process_events()
+            self._process_events()  # blocking
         else:
             for trial in self._trials:
                 if trial.status == Trial.PENDING:
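The warn_if_slow guard added around these calls is imported from ray.tune.util (see the first hunk's context line). As a rough idea of what such a guard can look like, here is a hedged sketch of a timing context manager; the 0.5-second threshold and the message wording are assumptions for illustration, not the library's actual implementation.

import contextlib
import logging
import time

logger = logging.getLogger(__name__)


@contextlib.contextmanager
def warn_if_slow(name, threshold=0.5):
    # Assumed threshold; the real ray.tune.util.warn_if_slow may differ.
    start = time.time()
    yield
    elapsed = time.time() - start
    if elapsed > threshold:
        logger.warning(
            "The `%s` operation took %.3f seconds, which may indicate a "
            "performance bottleneck.", name, elapsed)

Wrapping each potentially blocking call this way pinpoints which stage of the step (checkpointing, scheduling, serving requests) is slowing the loop down, while adding no overhead on the fast path.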
@@ -257,17 +258,20 @@ class TrialRunner(object):
                     "trials with sufficient resources.")

         try:
-            self.checkpoint()
+            with warn_if_slow("experiment_checkpoint"):
+                self.checkpoint()
         except Exception:
             logger.exception("Trial Runner checkpointing failed.")
         self._iteration += 1

         if self._server:
-            self._process_requests()
+            with warn_if_slow("server"):
+                self._process_requests()

             if self.is_finished():
                 self._server.shutdown()
-        self.trial_executor.on_step_end()
+        with warn_if_slow("on_step_end"):
+            self.trial_executor.on_step_end()

     def get_trial(self, tid):
         trial = [t for t in self._trials if t.trial_id == tid]
@@ -391,11 +395,12 @@ class TrialRunner(object):
         trials_done = all(trial.is_finished() for trial in self._trials)
         wait_for_trial = trials_done and not self._search_alg.is_finished()
         self._update_trial_queue(blocking=wait_for_trial)
-        trial = self._scheduler_alg.choose_trial_to_run(self)
+        with warn_if_slow("choose_trial_to_run"):
+            trial = self._scheduler_alg.choose_trial_to_run(self)
         return trial

     def _process_events(self):
-        trial = self.trial_executor.get_next_available_trial()
+        trial = self.trial_executor.get_next_available_trial()  # blocking
         with warn_if_slow("process_trial"):
             self._process_trial(trial)
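The new warning suggests reporting results less often. As a usage note, here is a hedged example of what that can look like with a function-based trainable of this era, where results are sent through a reporter callback; the exact reporting API and metric names depend on the Ray version, and my_trainable / my_metric are illustrative names only.

def my_trainable(config, reporter):
    total = 0
    for step in range(1000):
        total += step  # stand-in for real training work
        # Reporting on every step can flood the Tune event loop; reporting
        # once every 100 steps keeps the loop from getting backlogged.
        if step % 100 == 0:
            reporter(timesteps_total=step, my_metric=total)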