From 5f14eb3ee4a7b0620ac0a728006ae4b680b3fa21 Mon Sep 17 00:00:00 2001
From: xwjiang2010 <87673679+xwjiang2010@users.noreply.github.com>
Date: Fri, 12 Nov 2021 06:36:04 -0800
Subject: [PATCH] [Tune] Remove PG caching. (#19515)

Co-authored-by: Antoni Baum
---
 python/ray/tune/BUILD                         |  4 +--
 .../xgboost_dynamic_resources_example.py      |  2 +-
 python/ray/tune/ray_trial_executor.py         | 26 +++-------------
 python/ray/tune/schedulers/pbt.py             |  6 ++--
 python/ray/tune/tests/test_trial_scheduler.py | 12 ++++----
 .../tune/tests/test_trial_scheduler_pbt.py    | 21 ++++++++++---
 python/ray/tune/trial_executor.py             |  5 +---
 python/ray/tune/utils/placement_groups.py     | 30 +++----------------
 8 files changed, 38 insertions(+), 68 deletions(-)

diff --git a/python/ray/tune/BUILD b/python/ray/tune/BUILD
index ee54f86f1..726d6297d 100644
--- a/python/ray/tune/BUILD
+++ b/python/ray/tune/BUILD
@@ -304,7 +304,7 @@ py_test(
 
 py_test(
     name = "test_trial_scheduler_pbt",
-    size = "medium",
+    size = "large",
     srcs = ["tests/test_trial_scheduler_pbt.py"],
     deps = [":tune_lib"],
     tags = ["team:ml", "exclusive", "flaky", "tests_dir_T"],
@@ -397,7 +397,7 @@ py_test(
 
 py_test(
     name = "blendsearch_example",
-    size = "small",
+    size = "medium",
     srcs = ["examples/blendsearch_example.py"],
     deps = [":tune_lib"],
     tags = ["team:ml", "exclusive", "example"],
diff --git a/python/ray/tune/examples/xgboost_dynamic_resources_example.py b/python/ray/tune/examples/xgboost_dynamic_resources_example.py
index 8fd7f671f..5992a6f2f 100644
--- a/python/ray/tune/examples/xgboost_dynamic_resources_example.py
+++ b/python/ray/tune/examples/xgboost_dynamic_resources_example.py
@@ -220,7 +220,7 @@ def tune_xgboost(use_class_trainable=True):
             total_available_cpus // len(trial_runner.get_live_trials()))
 
         # Assign new CPUs to the trial in a PlacementGroupFactory
-        return PlacementGroupFactory([{"CPU": cpu_to_use}])
+        return PlacementGroupFactory([{"CPU": cpu_to_use, "GPU": 0}])
 
     # You can either define your own resources_allocation_function, or
     # use the default one - evenly_distribute_cpus_gpus
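Note: the example change above spells out "GPU": 0 in the bundle instead of leaving it implicit. For readers unfamiliar with the API, a minimal sketch of how such a factory is passed to a trial; the function name and resource numbers are illustrative, and tune.run accepted a PlacementGroupFactory for resources_per_trial in this Ray release:

    from ray import tune
    from ray.tune.utils.placement_groups import PlacementGroupFactory

    def train_fn(config):
        tune.report(score=0.0)

    # A single bundle covers the trainable itself; spelling out "GPU": 0
    # keeps the bundle shape stable when resources are changed dynamically.
    pgf = PlacementGroupFactory([{"CPU": 2, "GPU": 0}])
    tune.run(train_fn, resources_per_trial=pgf)
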
diff --git a/python/ray/tune/ray_trial_executor.py b/python/ray/tune/ray_trial_executor.py
index de604e494..93f1f6df9 100644
--- a/python/ray/tune/ray_trial_executor.py
+++ b/python/ray/tune/ray_trial_executor.py
@@ -516,21 +516,13 @@ class RayTrialExecutor(TrialExecutor):
                 logger.exception(
                     "Trial %s: updating resources timed out.", trial)
 
-    def _stop_trial(self,
-                    trial: Trial,
-                    error=False,
-                    error_msg=None,
-                    destroy_pg_if_cannot_replace=True):
+    def _stop_trial(self, trial: Trial, error=False, error_msg=None):
         """Stops this trial.
 
         Stops this trial, releasing all allocated resources. If stopping
         the trial fails, the run will be marked as terminated in error, but
         no exception will be thrown.
 
-        If the placement group will be used right away
-        (destroy_pg_if_cannot_replace=False), we do not remove its placement
-        group (or a surrogate placement group).
-
         Args:
             error (bool): Whether to mark this trial as terminated in error.
             error_msg (str): Optional error message.
@@ -569,8 +561,7 @@ class RayTrialExecutor(TrialExecutor):
                     logger.debug("Trial %s: Destroying actor.", trial)
 
                     # Try to return the placement group for other trials to use
-                    self._pg_manager.return_pg(trial,
-                                               destroy_pg_if_cannot_replace)
+                    self._pg_manager.return_pg(trial)
 
                     with self._change_working_directory(trial):
                         self._trial_cleanup.add(trial, actor=trial.runner)
@@ -630,18 +621,9 @@ class RayTrialExecutor(TrialExecutor):
     def stop_trial(self,
                    trial: Trial,
                    error: bool = False,
-                   error_msg: Optional[str] = None,
-                   destroy_pg_if_cannot_replace: bool = True) -> None:
-        """Only returns resources if resources allocated.
-
-        If destroy_pg_if_cannot_replace is False, the Trial placement group
-        will not be removed if it can't replace any staging ones."""
+                   error_msg: Optional[str] = None) -> None:
         prior_status = trial.status
-        self._stop_trial(
-            trial,
-            error=error,
-            error_msg=error_msg,
-            destroy_pg_if_cannot_replace=destroy_pg_if_cannot_replace)
+        self._stop_trial(trial, error=error, error_msg=error_msg)
         if prior_status == Trial.RUNNING:
             logger.debug("Trial %s: Returning resources.", trial)
             if not trial.uses_placement_groups:
diff --git a/python/ray/tune/schedulers/pbt.py b/python/ray/tune/schedulers/pbt.py
index 31632e1bb..75b7ba5cc 100644
--- a/python/ray/tune/schedulers/pbt.py
+++ b/python/ray/tune/schedulers/pbt.py
@@ -590,8 +590,7 @@ class PopulationBasedTraining(FIFOScheduler):
         else:
-            # Stop trial, but do not free resources (so we can use them
-            # again right away)
-            trial_executor.stop_trial(
-                trial, destroy_pg_if_cannot_replace=False)
+            # Stop the trial; its placement group is returned to the core
+            # scheduler, and a fresh one is requested when it restarts.
+            trial_executor.stop_trial(trial)
             trial.set_experiment_tag(new_tag)
             trial.set_config(new_config)
 
@@ -821,8 +820,7 @@ class PopulationBasedTrainingReplay(FIFOScheduler):
         if reset_successful:
             trial_executor.restore(trial, checkpoint, block=True)
         else:
-            trial_executor.stop_trial(
-                trial, destroy_pg_if_cannot_replace=False)
+            trial_executor.stop_trial(trial)
             trial.set_experiment_tag(new_tag)
             trial.set_config(new_config)
             trial_executor.start_trial(trial, checkpoint, train=False)
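With caching gone, PBT's exploit step releases the old placement group and requests a new one on restart, unless the actor itself can be reused in place. A hedged sketch of a trainable that opts into reuse via reset_config; the class name and hyperparameter are illustrative and not part of this patch:

    from ray import tune

    class MyTrainable(tune.Trainable):
        def setup(self, config):
            self.a = config["a"]

        def step(self):
            return {"mean_accuracy": self.a + self.iteration}

        def save_checkpoint(self, checkpoint_dir):
            return checkpoint_dir

        def load_checkpoint(self, checkpoint):
            pass

        def reset_config(self, new_config):
            # Returning True signals that this actor (and its placement
            # group) can be reused for the new config instead of going
            # through a stop_trial()/start_trial() cycle.
            self.a = new_config["a"]
            return True

Actor reuse additionally requires passing reuse_actors=True to tune.run; that is presumably why the tests below grow reset_config implementations.
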
diff --git a/python/ray/tune/tests/test_trial_scheduler.py b/python/ray/tune/tests/test_trial_scheduler.py
index 798c08192..6954c5b1c 100644
--- a/python/ray/tune/tests/test_trial_scheduler.py
+++ b/python/ray/tune/tests/test_trial_scheduler.py
@@ -225,11 +225,7 @@ class _MockTrialExecutor(TrialExecutor):
         trial.status = Trial.RUNNING
         return True
 
-    def stop_trial(self,
-                   trial,
-                   error=False,
-                   error_msg=None,
-                   destroy_pg_if_cannot_replace=True):
+    def stop_trial(self, trial, error=False, error_msg=None):
         trial.status = Trial.ERROR if error else Trial.TERMINATED
 
     def restore(self, trial, checkpoint=None, block=False):
@@ -1888,6 +1884,9 @@ class E2EPopulationBasedTestingSuite(unittest.TestCase):
                     f.write("OK")
                 return checkpoint
 
+            def reset_config(self, config):
+                return True
+
         trial_hyperparams = {
             "float_factor": 2.0,
             "const_factor": 3,
@@ -1923,6 +1922,9 @@ class E2EPopulationBasedTestingSuite(unittest.TestCase):
             def load_checkpoint(self, state):
                 self.state = state
 
+            def reset_config(self, config):
+                return True
+
         trial_hyperparams = {
             "float_factor": 2.0,
             "const_factor": 3,
diff --git a/python/ray/tune/tests/test_trial_scheduler_pbt.py b/python/ray/tune/tests/test_trial_scheduler_pbt.py
index 731f2805a..d18fbbe5b 100644
--- a/python/ray/tune/tests/test_trial_scheduler_pbt.py
+++ b/python/ray/tune/tests/test_trial_scheduler_pbt.py
@@ -64,6 +64,10 @@ class PopulationBasedTrainingMemoryTest(unittest.TestCase):
                 with open(path, "rb") as fp:
                     self.large_object, self.iter, self.a = pickle.load(fp)
 
+            def reset_config(self, new_config):
+                self.a = new_config["a"]
+                return True
+
         class CustomExecutor(RayTrialExecutor):
             def save(self, *args, **kwargs):
                 checkpoint = super(CustomExecutor, self).save(*args, **kwargs)
@@ -122,6 +126,10 @@ class PopulationBasedTrainingFileDescriptorTest(unittest.TestCase):
                 with open(path, "rb") as fp:
                     self.iter, self.a = pickle.load(fp)
 
+            def reset_config(self, new_config):
+                self.a = new_config["a"]
+                return True
+
         from ray.tune.callback import Callback
 
         class FileCheck(Callback):
@@ -174,7 +182,6 @@ class PopulationBasedTrainingSynchTest(unittest.TestCase):
     def setUp(self):
         os.environ["TUNE_TRIAL_STARTUP_GRACE_PERIOD"] = "0"
-        os.environ["TUNE_TRIAL_RESULT_WAIT_TIME_S"] = "99999"
         ray.init(num_cpus=2)
 
         def MockTrainingFuncSync(config, checkpoint_dir=None):
@@ -210,7 +217,7 @@ class PopulationBasedTrainingSynchTest(unittest.TestCase):
     def synchSetup(self, synch, param=None):
         if param is None:
-            param = [10, 20, 30]
+            param = [10, 20, 40]
 
         scheduler = PopulationBasedTraining(
             time_attr="training_iteration",
@@ -244,14 +251,14 @@ class PopulationBasedTrainingSynchTest(unittest.TestCase):
         self.assertTrue(
             any(
                 analysis.dataframe(metric="mean_accuracy", mode="max")
-                ["mean_accuracy"] != 33))
+                ["mean_accuracy"] != 43))
 
     def testSynchPass(self):
         analysis = self.synchSetup(True)
         self.assertTrue(
             all(
                 analysis.dataframe(metric="mean_accuracy", mode="max")[
-                    "mean_accuracy"] == 33))
+                    "mean_accuracy"] == 43))
 
     def testSynchPassLast(self):
         analysis = self.synchSetup(True, param=[30, 20, 10])
@@ -342,6 +349,12 @@ class PopulationBasedTrainingResumeTest(unittest.TestCase):
             with open(checkpoint_path, "rb") as fp:
                 self.a, self.b, self.iter = pickle.load(fp)
 
+            def reset_config(self, new_config):
+                self.a = new_config["a"]
+                self.b = new_config["b"]
+                self.c = new_config["c"]
+                return True
+
         scheduler = PopulationBasedTraining(
             time_attr="training_iteration",
             metric="mean_accuracy",
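The synch tests also bump the best param from 30 to 40, with the asserted converged score moving from 33 to 43 to match. A minimal sketch of the function-API pattern those tests rely on; the function name, checkpoint file name "ckpt", and iteration count are illustrative:

    import os
    import pickle

    from ray import tune

    def mock_train_sync(config, checkpoint_dir=None):
        start = 0
        if checkpoint_dir:
            with open(os.path.join(checkpoint_dir, "ckpt"), "rb") as fp:
                start = pickle.load(fp)
        for i in range(start, 10):
            with tune.checkpoint_dir(step=i) as d:
                with open(os.path.join(d, "ckpt"), "wb") as fp:
                    pickle.dump(i + 1, fp)
            # The reported score tracks the config param, so with synch=True
            # every trial eventually exploits the best param and converges
            # to the same final accuracy.
            tune.report(mean_accuracy=config["a"] + i)
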
diff --git a/python/ray/tune/trial_executor.py b/python/ray/tune/trial_executor.py
index 26411a136..c159a07b9 100644
--- a/python/ray/tune/trial_executor.py
+++ b/python/ray/tune/trial_executor.py
@@ -209,8 +209,7 @@ class TrialExecutor(metaclass=_WarnOnDirectInheritanceMeta):
     def stop_trial(self,
                    trial: Trial,
                    error: bool = False,
-                   error_msg: Optional[str] = None,
-                   destroy_pg_if_cannot_replace: bool = True) -> None:
+                   error_msg: Optional[str] = None) -> None:
         """Stops the trial.
 
         Stops this trial, releasing all allocated resources.
@@ -220,8 +219,6 @@ class TrialExecutor(metaclass=_WarnOnDirectInheritanceMeta):
 
         Args:
             error (bool): Whether to mark this trial as terminated in error.
             error_msg (str): Optional error message.
-            destroy_pg_if_cannot_replace (bool): Whether the trial's placement
-            group should be destroyed if it cannot replace any staged ones.
         """
         pass
diff --git a/python/ray/tune/utils/placement_groups.py b/python/ray/tune/utils/placement_groups.py
index b2caca7d0..5b0ff0e90 100644
--- a/python/ray/tune/utils/placement_groups.py
+++ b/python/ray/tune/utils/placement_groups.py
@@ -563,41 +563,19 @@ class PlacementGroupManager:
             self._ready[pgf].add(pg)
         return True
 
-    def return_pg(self,
-                  trial: "Trial",
-                  destroy_pg_if_cannot_replace: bool = True):
-        """Return pg, making it available for other trials to use.
-
-        If destroy_pg_if_cannot_replace is True, this will only return
-        a placement group if a staged placement group can be replaced
-        by it. If not, it will destroy the placement group.
+    def return_pg(self, trial: "Trial"):
+        """Return the trial's placement group back to the core scheduler.
 
         Args:
             trial (Trial): Return placement group of this trial.
-
-        Returns:
-            Boolean indicating if the placement group was returned.
         """
         if not trial.uses_placement_groups:
-            return True
-
-        pgf = trial.placement_group_factory
+            return
 
         pg = self._in_use_trials.pop(trial)
         self._in_use_pgs.pop(pg)
 
-        if destroy_pg_if_cannot_replace:
-            staged_pg = self._unstage_unused_pg(pgf)
-
-            # Could not replace
-            if not staged_pg:
-                self.remove_pg(pg)
-                return False
-
-            self.remove_pg(staged_pg)
-            self._ready[pgf].add(pg)
-
-        return True
+        self.remove_pg(pg)
 
     def _unstage_unused_pg(
             self, pgf: PlacementGroupFactory) -> Optional[PlacementGroup]:
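After this change, a returned placement group is simply removed rather than parked in the manager's ready cache for reuse. One hedged way to observe the difference from a driver script; placement_group_table is public Ray API, though its exact fields and state names can vary across Ray versions:

    import ray
    from ray.util.placement_group import placement_group_table

    ray.init()
    # Placement groups of stopped trials should now show up as REMOVED
    # here rather than lingering in a ready state held by Tune.
    for pg_id, info in placement_group_table().items():
        print(pg_id, info.get("name"), info.get("state"))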