From 5e95abe63e56587ead9d361c5c64e8ea69482018 Mon Sep 17 00:00:00 2001
From: Richard Liaw
Date: Sat, 16 Mar 2019 13:52:02 -0700
Subject: [PATCH] [tune] Fix performance issue and fix reuse tests (#4379)

* fix tests
* better name
* reduce warnings
* better resource tracking
* oops
* revertmessage
* fix_executor
---
Reviewer note: this patch was restored from a whitespace-collapsed copy.
Blank context lines were placed so that every hunk matches its header line
counts; Python indentation was inferred from syntax. Verify against the
upstream commit before applying.

 python/ray/tune/ray_trial_executor.py     | 26 ++++++++++----
 python/ray/tune/tests/test_actor_reuse.py | 43 ++++++++++++-----------
 2 files changed, 43 insertions(+), 26 deletions(-)

diff --git a/python/ray/tune/ray_trial_executor.py b/python/ray/tune/ray_trial_executor.py
index 90fd9e0e7..f3af5f38d 100644
--- a/python/ray/tune/ray_trial_executor.py
+++ b/python/ray/tune/ray_trial_executor.py
@@ -18,6 +18,7 @@ from ray.tune.util import warn_if_slow
 
 logger = logging.getLogger(__name__)
 
+RESOURCE_REFRESH_PERIOD = 0.5  # Refresh resources every 500 ms
 BOTTLENECK_WARN_PERIOD_S = 60
 NONTRIVIAL_WAIT_TIME_THRESHOLD_S = 1e-3
 
@@ -34,18 +35,24 @@ class _LocalWrapper(object):
 class RayTrialExecutor(TrialExecutor):
     """An implemention of TrialExecutor based on Ray."""
 
-    def __init__(self, queue_trials=False, reuse_actors=False):
+    def __init__(self,
+                 queue_trials=False,
+                 reuse_actors=False,
+                 refresh_period=RESOURCE_REFRESH_PERIOD):
         super(RayTrialExecutor, self).__init__(queue_trials)
         self._running = {}
         # Since trial resume after paused should not run
         # trial.train.remote(), thus no more new remote object id generated.
         # We use self._paused to store paused trials here.
         self._paused = {}
+        self._reuse_actors = reuse_actors
+        self._cached_actor = None
+
         self._avail_resources = Resources(cpu=0, gpu=0)
         self._committed_resources = Resources(cpu=0, gpu=0)
         self._resources_initialized = False
-        self._reuse_actors = reuse_actors
-        self._cached_actor = None
+        self._refresh_period = refresh_period
+        self._last_resource_refresh = float("-inf")
         self._last_nontrivial_wait = time.time()
         if ray.is_initialized():
             self._update_avail_resources()
@@ -370,11 +377,19 @@ class RayTrialExecutor(TrialExecutor):
 
         self._avail_resources = Resources(
             int(num_cpus), int(num_gpus), custom_resources=custom_resources)
+        self._last_resource_refresh = time.time()
         self._resources_initialized = True
 
     def has_resources(self, resources):
-        """Returns whether this runner has at least the specified resources."""
-        self._update_avail_resources()
+        """Returns whether this runner has at least the specified resources.
+
+        This refreshes the Ray cluster resources if the time since last update
+        has exceeded self._refresh_period. This also assumes that the
+        cluster is not resizing very frequently.
+        """
+        if time.time() - self._last_resource_refresh > self._refresh_period:
+            self._update_avail_resources()
+
         currently_available = Resources.subtract(self._avail_resources,
                                                  self._committed_resources)
 
@@ -445,7 +460,6 @@ class RayTrialExecutor(TrialExecutor):
 
     def on_step_begin(self):
         """Before step() called, update the available resources."""
-
         self._update_avail_resources()
 
     def save(self, trial, storage=Checkpoint.DISK):
diff --git a/python/ray/tune/tests/test_actor_reuse.py b/python/ray/tune/tests/test_actor_reuse.py
index 9fc8579b7..227f81515 100644
--- a/python/ray/tune/tests/test_actor_reuse.py
+++ b/python/ray/tune/tests/test_actor_reuse.py
@@ -15,27 +15,30 @@ class FrequentPausesScheduler(FIFOScheduler):
         return TrialScheduler.PAUSE
 
 
-class MyResettableClass(Trainable):
-    def _setup(self, config):
-        self.config = config
-        self.num_resets = 0
-        self.iter = 0
+def create_resettable_class():
+    class MyResettableClass(Trainable):
+        def _setup(self, config):
+            self.config = config
+            self.num_resets = 0
+            self.iter = 0
 
-    def _train(self):
-        self.iter += 1
-        return {"num_resets": self.num_resets, "done": self.iter > 1}
+        def _train(self):
+            self.iter += 1
+            return {"num_resets": self.num_resets, "done": self.iter > 1}
 
-    def _save(self, chkpt_dir):
-        return {"iter": self.iter}
+        def _save(self, chkpt_dir):
+            return {"iter": self.iter}
 
-    def _restore(self, item):
-        self.iter = item["iter"]
+        def _restore(self, item):
+            self.iter = item["iter"]
 
-    def reset_config(self, new_config):
-        if "fake_reset_not_supported" in self.config:
-            return False
-        self.num_resets += 1
-        return True
+        def reset_config(self, new_config):
+            if "fake_reset_not_supported" in self.config:
+                return False
+            self.num_resets += 1
+            return True
+
+    return MyResettableClass
 
 
 class ActorReuseTest(unittest.TestCase):
@@ -49,7 +52,7 @@ class ActorReuseTest(unittest.TestCase):
         trials = run_experiments(
             {
                 "foo": {
-                    "run": MyResettableClass,
+                    "run": create_resettable_class(),
                     "num_samples": 4,
                     "config": {},
                 }
@@ -63,7 +66,7 @@ class ActorReuseTest(unittest.TestCase):
         trials = run_experiments(
             {
                 "foo": {
-                    "run": MyResettableClass,
+                    "run": create_resettable_class(),
                     "num_samples": 4,
                     "config": {},
                 }
@@ -78,7 +81,7 @@ class ActorReuseTest(unittest.TestCase):
             run_experiments(
                 {
                     "foo": {
-                        "run": MyResettableClass,
+                        "run": create_resettable_class(),
                         "max_failures": 1,
                         "num_samples": 4,
                         "config": {