[Tune] Remove PG caching. (#19515)

Co-authored-by: Antoni Baum <antoni.baum@protonmail.com>
This commit is contained in:
xwjiang2010 2021-11-12 06:36:04 -08:00 committed by GitHub
parent 38c456b6f4
commit 5f14eb3ee4
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 38 additions and 68 deletions

View file

@@ -304,7 +304,7 @@ py_test(
 py_test(
     name = "test_trial_scheduler_pbt",
-    size = "medium",
+    size = "large",
     srcs = ["tests/test_trial_scheduler_pbt.py"],
     deps = [":tune_lib"],
     tags = ["team:ml", "exclusive", "flaky", "tests_dir_T"],
@@ -397,7 +397,7 @@ py_test(
 py_test(
     name = "blendsearch_example",
-    size = "small",
+    size = "medium",
     srcs = ["examples/blendsearch_example.py"],
     deps = [":tune_lib"],
     tags = ["team:ml", "exclusive", "example"],

View file

@@ -220,7 +220,7 @@ def tune_xgboost(use_class_trainable=True):
             total_available_cpus // len(trial_runner.get_live_trials()))
         # Assign new CPUs to the trial in a PlacementGroupFactory
-        return PlacementGroupFactory([{"CPU": cpu_to_use}])
+        return PlacementGroupFactory([{"CPU": cpu_to_use, "GPU": 0}])
     # You can either define your own resources_allocation_function, or
     # use the default one - evenly_distribute_cpus_gpus

View file

@@ -516,21 +516,13 @@ class RayTrialExecutor(TrialExecutor):
             logger.exception(
                 "Trial %s: updating resources timed out.", trial)
-    def _stop_trial(self,
-                    trial: Trial,
-                    error=False,
-                    error_msg=None,
-                    destroy_pg_if_cannot_replace=True):
+    def _stop_trial(self, trial: Trial, error=False, error_msg=None):
         """Stops this trial.
         Stops this trial, releasing all allocating resources. If stopping the
         trial fails, the run will be marked as terminated in error, but no
         exception will be thrown.
-        If the placement group will be used right away
-        (destroy_pg_if_cannot_replace=False), we do not remove its placement
-        group (or a surrogate placement group).
         Args:
             error (bool): Whether to mark this trial as terminated in error.
             error_msg (str): Optional error message.
@@ -569,8 +561,7 @@ class RayTrialExecutor(TrialExecutor):
             logger.debug("Trial %s: Destroying actor.", trial)
             # Try to return the placement group for other trials to use
-            self._pg_manager.return_pg(trial,
-                                       destroy_pg_if_cannot_replace)
+            self._pg_manager.return_pg(trial)
             with self._change_working_directory(trial):
                 self._trial_cleanup.add(trial, actor=trial.runner)
@@ -630,18 +621,9 @@ class RayTrialExecutor(TrialExecutor):
     def stop_trial(self,
                    trial: Trial,
                    error: bool = False,
-                   error_msg: Optional[str] = None,
-                   destroy_pg_if_cannot_replace: bool = True) -> None:
-        """Only returns resources if resources allocated.
-        If destroy_pg_if_cannot_replace is False, the Trial placement group
-        will not be removed if it can't replace any staging ones."""
+                   error_msg: Optional[str] = None) -> None:
         prior_status = trial.status
-        self._stop_trial(
-            trial,
-            error=error,
-            error_msg=error_msg,
-            destroy_pg_if_cannot_replace=destroy_pg_if_cannot_replace)
+        self._stop_trial(trial, error=error, error_msg=error_msg)
         if prior_status == Trial.RUNNING:
             logger.debug("Trial %s: Returning resources.", trial)
             if not trial.uses_placement_groups:

View file

@@ -590,8 +590,7 @@ class PopulationBasedTraining(FIFOScheduler):
         else:
             # Stop trial, but do not free resources (so we can use them
             # again right away)
-            trial_executor.stop_trial(
-                trial, destroy_pg_if_cannot_replace=False)
+            trial_executor.stop_trial(trial)
         trial.set_experiment_tag(new_tag)
         trial.set_config(new_config)
@@ -821,8 +820,7 @@ class PopulationBasedTrainingReplay(FIFOScheduler):
         if reset_successful:
             trial_executor.restore(trial, checkpoint, block=True)
         else:
-            trial_executor.stop_trial(
-                trial, destroy_pg_if_cannot_replace=False)
+            trial_executor.stop_trial(trial)
         trial.set_experiment_tag(new_tag)
         trial.set_config(new_config)
         trial_executor.start_trial(trial, checkpoint, train=False)

View file

@@ -225,11 +225,7 @@ class _MockTrialExecutor(TrialExecutor):
         trial.status = Trial.RUNNING
         return True
-    def stop_trial(self,
-                   trial,
-                   error=False,
-                   error_msg=None,
-                   destroy_pg_if_cannot_replace=True):
+    def stop_trial(self, trial, error=False, error_msg=None):
         trial.status = Trial.ERROR if error else Trial.TERMINATED
     def restore(self, trial, checkpoint=None, block=False):
@@ -1888,6 +1884,9 @@ class E2EPopulationBasedTestingSuite(unittest.TestCase):
                 f.write("OK")
                 return checkpoint
+            def reset_config(self, config):
+                return True
         trial_hyperparams = {
             "float_factor": 2.0,
             "const_factor": 3,
@@ -1923,6 +1922,9 @@ class E2EPopulationBasedTestingSuite(unittest.TestCase):
             def load_checkpoint(self, state):
                 self.state = state
+            def reset_config(self, config):
+                return True
         trial_hyperparams = {
             "float_factor": 2.0,
             "const_factor": 3,

View file

@@ -64,6 +64,10 @@ class PopulationBasedTrainingMemoryTest(unittest.TestCase):
                 with open(path, "rb") as fp:
                     self.large_object, self.iter, self.a = pickle.load(fp)
+            def reset_config(self, new_config):
+                self.a = new_config["a"]
+                return True
         class CustomExecutor(RayTrialExecutor):
             def save(self, *args, **kwargs):
                 checkpoint = super(CustomExecutor, self).save(*args, **kwargs)
@@ -122,6 +126,10 @@ class PopulationBasedTrainingFileDescriptorTest(unittest.TestCase):
                 with open(path, "rb") as fp:
                     self.iter, self.a = pickle.load(fp)
+            def reset_config(self, new_config):
+                self.a = new_config["a"]
+                return True
         from ray.tune.callback import Callback
         class FileCheck(Callback):
@ -174,7 +182,6 @@ class PopulationBasedTrainingFileDescriptorTest(unittest.TestCase):
class PopulationBasedTrainingSynchTest(unittest.TestCase):
def setUp(self):
os.environ["TUNE_TRIAL_STARTUP_GRACE_PERIOD"] = "0"
os.environ["TUNE_TRIAL_RESULT_WAIT_TIME_S"] = "99999"
ray.init(num_cpus=2)
def MockTrainingFuncSync(config, checkpoint_dir=None):
@@ -210,7 +217,7 @@ class PopulationBasedTrainingSynchTest(unittest.TestCase):
     def synchSetup(self, synch, param=None):
         if param is None:
-            param = [10, 20, 30]
+            param = [10, 20, 40]
         scheduler = PopulationBasedTraining(
             time_attr="training_iteration",
@@ -244,14 +251,14 @@ class PopulationBasedTrainingSynchTest(unittest.TestCase):
         self.assertTrue(
             any(
                 analysis.dataframe(metric="mean_accuracy", mode="max")
-                ["mean_accuracy"] != 33))
+                ["mean_accuracy"] != 43))
     def testSynchPass(self):
         analysis = self.synchSetup(True)
         self.assertTrue(
             all(
                 analysis.dataframe(metric="mean_accuracy", mode="max")[
-                    "mean_accuracy"] == 33))
+                    "mean_accuracy"] == 43))
     def testSynchPassLast(self):
         analysis = self.synchSetup(True, param=[30, 20, 10])
@@ -342,6 +349,12 @@ class PopulationBasedTrainingResumeTest(unittest.TestCase):
                 with open(checkpoint_path, "rb") as fp:
                     self.a, self.b, self.iter = pickle.load(fp)
+            def reset_config(self, new_config):
+                self.a = new_config["a"]
+                self.b = new_config["b"]
+                self.c = new_config["c"]
+                return True
         scheduler = PopulationBasedTraining(
             time_attr="training_iteration",
             metric="mean_accuracy",

View file

@@ -209,8 +209,7 @@ class TrialExecutor(metaclass=_WarnOnDirectInheritanceMeta):
     def stop_trial(self,
                    trial: Trial,
                    error: bool = False,
-                   error_msg: Optional[str] = None,
-                   destroy_pg_if_cannot_replace: bool = True) -> None:
+                   error_msg: Optional[str] = None) -> None:
         """Stops the trial.
         Stops this trial, releasing all allocating resources.
@@ -220,8 +219,6 @@ class TrialExecutor(metaclass=_WarnOnDirectInheritanceMeta):
         Args:
             error (bool): Whether to mark this trial as terminated in error.
             error_msg (str): Optional error message.
-            destroy_pg_if_cannot_replace (bool): Whether the trial's placement
-                group should be destroyed if it cannot replace any staged ones.
         """
         pass

View file

@@ -563,41 +563,19 @@ class PlacementGroupManager:
             self._ready[pgf].add(pg)
             return True
-    def return_pg(self,
-                  trial: "Trial",
-                  destroy_pg_if_cannot_replace: bool = True):
-        """Return pg, making it available for other trials to use.
-        If destroy_pg_if_cannot_replace is True, this will only return
-        a placement group if a staged placement group can be replaced
-        by it. If not, it will destroy the placement group.
+    def return_pg(self, trial: "Trial"):
+        """Return pg back to Core scheduling.
         Args:
             trial (Trial): Return placement group of this trial.
-        Returns:
-            Boolean indicating if the placement group was returned.
         """
         if not trial.uses_placement_groups:
-            return True
-        pgf = trial.placement_group_factory
+            return
         pg = self._in_use_trials.pop(trial)
         self._in_use_pgs.pop(pg)
-        if destroy_pg_if_cannot_replace:
-            staged_pg = self._unstage_unused_pg(pgf)
-            # Could not replace
-            if not staged_pg:
-                self.remove_pg(pg)
-                return False
-            self.remove_pg(staged_pg)
-        self._ready[pgf].add(pg)
-        return True
+        self.remove_pg(pg)
def _unstage_unused_pg(
self, pgf: PlacementGroupFactory) -> Optional[PlacementGroup]: