[tune] TrialExecutor should not take in Runner interface. (#20655)

This commit is contained in:
xwjiang2010 2021-11-23 08:54:47 -08:00 committed by GitHub
parent e24c1dccb0
commit 2b5ad4a3c0
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -423,43 +423,6 @@ class TrialRunner:
def scheduler_alg(self):
"""Return the object stored in `self._scheduler_alg`."""
return self._scheduler_alg
def _run_and_catch(self, func):
"""Call `func` with the trial list, retrying with the runner if needed.
`func` is first called with the result of `self.get_trials()`. If that
call raises an AttributeError whose message is exactly
"'list' object has no attribute 'get_trials'", a migration warning is
logged and `func` is called a second time with this TrialRunner as the
argument. Any other AttributeError is re-raised unchanged.
Note, this is the best we can do to urge people to migrate while
trying not to break the API. However, since none of TrialExecutor's
methods guarantee transactional semantics or idempotency, the executor
may behave strangely in the following scenario:
def func(trial_runner):
non_idempotent_blob
f(trial_runner) # throws AttributeError
With _run_and_catch, non_idempotent_blob is executed twice, which may
lead to weird behaviors.
Args:
func: Callable invoked with a single positional argument.
"""
try:
func(self.get_trials())
except AttributeError as e:
# The fallback is keyed on the exact exception message text, so only
# the "list used as a runner" failure triggers the retry below.
if str(e) != "'list' object has no attribute 'get_trials'":
raise
else:
logger.warning(
"TrialExecutor is migrating off of TrialRunner "
"interface. Expect List[Trial] to be passed in directly. "
"See details at "
"https://github.com/ray-project/ray/issues/17665. "
"Please finish the migration ASAP. "
"Although we try to catch exception and recover from "
"caller side, but as there is no atomicity nor "
"idempotency guaranteed by TrialExecutor API contract, "
"weird behaviors may happen.")
# Second invocation: anything `func` did before raising runs again.
func(self)
def _validate_resume(self, resume_type,
driver_sync_trial_checkpoints=True):
"""Checks whether to resume experiment.
@ -687,7 +650,7 @@ class TrialRunner:
if self.is_finished():
raise TuneError("Called step when all trials finished?")
with warn_if_slow("on_step_begin"):
self._run_and_catch(self.trial_executor.on_step_begin)
self.trial_executor.on_step_begin(self.get_trials())
with warn_if_slow("callbacks.on_step_begin"):
self._callbacks.on_step_begin(
iteration=self._iteration, trials=self._trials)
@ -738,7 +701,7 @@ class TrialRunner:
timeout = 0.1
self._process_events(timeout=timeout)
else:
self._run_and_catch(self.trial_executor.on_no_available_trials)
self.trial_executor.on_no_available_trials(self.get_trials())
self._stop_experiment_if_needed()
@ -755,7 +718,7 @@ class TrialRunner:
if self.is_finished():
self._server.shutdown()
with warn_if_slow("on_step_end"):
self._run_and_catch(self.trial_executor.on_step_end)
self.trial_executor.on_step_end(self.get_trials())
with warn_if_slow("callbacks.on_step_end"):
self._callbacks.on_step_end(
iteration=self._iteration, trials=self._trials)
@ -1378,7 +1341,7 @@ class TrialRunner:
self._live_trials.discard(trial)
def cleanup_trials(self):
"""Invoke the trial executor's `cleanup` hook for the current trials."""
# NOTE(review): this listing looks like a diff whose +/- markers were
# stripped; the two calls below are presumably the old and new form of
# one call rather than two sequential calls -- confirm against the file.
self._run_and_catch(self.trial_executor.cleanup)
self.trial_executor.cleanup(self.get_trials())
def cleanup(self):
"""Cleanup trials and callbacks."""