[tune] Fix performance issue and fix reuse tests (#4379)

* fix tests

* better name

* reduce warnings

* better resource tracking

* oops

* revert message

* fix_executor
This commit is contained in:
Richard Liaw 2019-03-16 13:52:02 -07:00 committed by GitHub
parent a45019d98c
commit 5e95abe63e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 43 additions and 26 deletions

View file

@ -18,6 +18,7 @@ from ray.tune.util import warn_if_slow
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
RESOURCE_REFRESH_PERIOD = 0.5 # Refresh resources every 500 ms
BOTTLENECK_WARN_PERIOD_S = 60 BOTTLENECK_WARN_PERIOD_S = 60
NONTRIVIAL_WAIT_TIME_THRESHOLD_S = 1e-3 NONTRIVIAL_WAIT_TIME_THRESHOLD_S = 1e-3
@ -34,18 +35,24 @@ class _LocalWrapper(object):
class RayTrialExecutor(TrialExecutor): class RayTrialExecutor(TrialExecutor):
"""An implemention of TrialExecutor based on Ray.""" """An implemention of TrialExecutor based on Ray."""
def __init__(self, queue_trials=False, reuse_actors=False): def __init__(self,
queue_trials=False,
reuse_actors=False,
refresh_period=RESOURCE_REFRESH_PERIOD):
super(RayTrialExecutor, self).__init__(queue_trials) super(RayTrialExecutor, self).__init__(queue_trials)
self._running = {} self._running = {}
# Since trial resume after paused should not run # Since trial resume after paused should not run
# trial.train.remote(), thus no more new remote object id generated. # trial.train.remote(), thus no more new remote object id generated.
# We use self._paused to store paused trials here. # We use self._paused to store paused trials here.
self._paused = {} self._paused = {}
self._reuse_actors = reuse_actors
self._cached_actor = None
self._avail_resources = Resources(cpu=0, gpu=0) self._avail_resources = Resources(cpu=0, gpu=0)
self._committed_resources = Resources(cpu=0, gpu=0) self._committed_resources = Resources(cpu=0, gpu=0)
self._resources_initialized = False self._resources_initialized = False
self._reuse_actors = reuse_actors self._refresh_period = refresh_period
self._cached_actor = None self._last_resource_refresh = float("-inf")
self._last_nontrivial_wait = time.time() self._last_nontrivial_wait = time.time()
if ray.is_initialized(): if ray.is_initialized():
self._update_avail_resources() self._update_avail_resources()
@ -370,11 +377,19 @@ class RayTrialExecutor(TrialExecutor):
self._avail_resources = Resources( self._avail_resources = Resources(
int(num_cpus), int(num_gpus), custom_resources=custom_resources) int(num_cpus), int(num_gpus), custom_resources=custom_resources)
self._last_resource_refresh = time.time()
self._resources_initialized = True self._resources_initialized = True
def has_resources(self, resources): def has_resources(self, resources):
"""Returns whether this runner has at least the specified resources.""" """Returns whether this runner has at least the specified resources.
This refreshes the Ray cluster resources if the time since last update
has exceeded self._refresh_period. This also assumes that the
cluster is not resizing very frequently.
"""
if time.time() - self._last_resource_refresh > self._refresh_period:
self._update_avail_resources() self._update_avail_resources()
currently_available = Resources.subtract(self._avail_resources, currently_available = Resources.subtract(self._avail_resources,
self._committed_resources) self._committed_resources)
@ -445,7 +460,6 @@ class RayTrialExecutor(TrialExecutor):
def on_step_begin(self): def on_step_begin(self):
"""Before step() called, update the available resources.""" """Before step() called, update the available resources."""
self._update_avail_resources() self._update_avail_resources()
def save(self, trial, storage=Checkpoint.DISK): def save(self, trial, storage=Checkpoint.DISK):

View file

@ -15,7 +15,8 @@ class FrequentPausesScheduler(FIFOScheduler):
return TrialScheduler.PAUSE return TrialScheduler.PAUSE
class MyResettableClass(Trainable): def create_resettable_class():
class MyResettableClass(Trainable):
def _setup(self, config): def _setup(self, config):
self.config = config self.config = config
self.num_resets = 0 self.num_resets = 0
@ -37,6 +38,8 @@ class MyResettableClass(Trainable):
self.num_resets += 1 self.num_resets += 1
return True return True
return MyResettableClass
class ActorReuseTest(unittest.TestCase): class ActorReuseTest(unittest.TestCase):
def setUp(self): def setUp(self):
@ -49,7 +52,7 @@ class ActorReuseTest(unittest.TestCase):
trials = run_experiments( trials = run_experiments(
{ {
"foo": { "foo": {
"run": MyResettableClass, "run": create_resettable_class(),
"num_samples": 4, "num_samples": 4,
"config": {}, "config": {},
} }
@ -63,7 +66,7 @@ class ActorReuseTest(unittest.TestCase):
trials = run_experiments( trials = run_experiments(
{ {
"foo": { "foo": {
"run": MyResettableClass, "run": create_resettable_class(),
"num_samples": 4, "num_samples": 4,
"config": {}, "config": {},
} }
@ -78,7 +81,7 @@ class ActorReuseTest(unittest.TestCase):
run_experiments( run_experiments(
{ {
"foo": { "foo": {
"run": MyResettableClass, "run": create_resettable_class(),
"max_failures": 1, "max_failures": 1,
"num_samples": 4, "num_samples": 4,
"config": { "config": {