diff --git a/doc/source/tune/api_docs/schedulers.rst b/doc/source/tune/api_docs/schedulers.rst
index d04a54301..5b5876318 100644
--- a/doc/source/tune/api_docs/schedulers.rst
+++ b/doc/source/tune/api_docs/schedulers.rst
@@ -246,6 +246,11 @@ evenly_distribute_cpus_gpus
 
 .. autofunction:: ray.tune.schedulers.resource_changing_scheduler.evenly_distribute_cpus_gpus
 
+evenly_distribute_cpus_gpus_distributed
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+.. autofunction:: ray.tune.schedulers.resource_changing_scheduler.evenly_distribute_cpus_gpus_distributed
+
 FIFOScheduler
 -------------
 
diff --git a/python/ray/tune/examples/xgboost_dynamic_resources_example.py b/python/ray/tune/examples/xgboost_dynamic_resources_example.py
index b19cb734d..98174102a 100644
--- a/python/ray/tune/examples/xgboost_dynamic_resources_example.py
+++ b/python/ray/tune/examples/xgboost_dynamic_resources_example.py
@@ -24,6 +24,7 @@ class BreastCancerTrainable(Trainable):
     def setup(self, config):
         self.config = config
         self.nthread = config.pop("nthread", 1)
+        self.new_nthread = None
         self.model: xgb.Booster = None
         # Load dataset
         data, labels = sklearn.datasets.load_breast_cancer(return_X_y=True)
@@ -62,6 +63,9 @@ class BreastCancerTrainable(Trainable):
     def load_checkpoint(self, checkpoint_path):
         with open(checkpoint_path, "rb") as inputFile:
             self.config, self.nthread, raw_model = pickle.load(inputFile)
+        if self.new_nthread:
+            self.nthread = self.new_nthread
+            self.new_nthread = None
         self.model = Booster()
         self.model.load_model(bytearray(raw_model))
         data, labels = sklearn.datasets.load_breast_cancer(return_X_y=True)
@@ -74,10 +78,11 @@ class BreastCancerTrainable(Trainable):
 
     def update_resources(
             self, new_resources: Union[PlacementGroupFactory, Resources]):
+        # this is called before `load_checkpoint`
         if isinstance(new_resources, PlacementGroupFactory):
-            self.nthread = new_resources.head_cpus
+            self.new_nthread = new_resources.head_cpus
         else:
-            self.nthread = new_resources.cpu
+            self.new_nthread = new_resources.cpu
 
 
 def get_best_model_checkpoint(analysis):
@@ -112,8 +117,7 @@ def tune_xgboost():
 
     def example_resources_allocation_function(
             trial_runner: "trial_runner.TrialRunner", trial: Trial,
-            result: Dict[str, Any],
-            base_trial_resource: Union[PlacementGroupFactory, Resources]
+            result: Dict[str, Any], scheduler: "ResourceChangingScheduler"
     ) -> Union[None, PlacementGroupFactory, Resources]:
         """This is a basic example of a resource allocating function.
 
@@ -133,11 +137,14 @@ def tune_xgboost():
             Can be used to obtain information about other trials.
             trial (Trial): The trial to allocate new resources to.
             result (Dict[str, Any]): The latest results of trial.
-            base_trial_resource (Union[PlacementGroupFactory, Resources]):
-                Base trial resources as defined in
-                ``tune.run(resources_per_trial)``
+            scheduler (ResourceChangingScheduler): The scheduler calling
+                the function.
""" + # Get base trial resources as defined in + # ``tune.run(resources_per_trial)`` + base_trial_resource = scheduler._base_trial_resources + # Don't bother if this is just the first iteration if result["training_iteration"] < 1: return None diff --git a/python/ray/tune/ray_trial_executor.py b/python/ray/tune/ray_trial_executor.py index d5f0c44e1..35d669ab5 100644 --- a/python/ray/tune/ray_trial_executor.py +++ b/python/ray/tune/ray_trial_executor.py @@ -451,8 +451,8 @@ class RayTrialExecutor(TrialExecutor): if not runner: return False trial.set_runner(runner) - self.restore(trial, checkpoint) self._notify_trainable_of_new_resources_if_needed(trial) + self.restore(trial, checkpoint) self.set_status(trial, Trial.RUNNING) if trial in self._staged_trials: diff --git a/python/ray/tune/schedulers/resource_changing_scheduler.py b/python/ray/tune/schedulers/resource_changing_scheduler.py index e81ab085a..1741ec601 100644 --- a/python/ray/tune/schedulers/resource_changing_scheduler.py +++ b/python/ray/tune/schedulers/resource_changing_scheduler.py @@ -14,20 +14,136 @@ from ray.tune.utils.placement_groups import PlacementGroupFactory logger = logging.getLogger(__name__) -def evenly_distribute_cpus_gpus( - trial_runner: "trial_runner.TrialRunner", trial: Trial, - result: Dict[str, Any], - base_trial_resource: Union[PlacementGroupFactory, Resources] -) -> Union[None, PlacementGroupFactory, Resources]: +class _DistributeResources: + """Generic functionality for resource allocation functions""" + + def __init__(self, add_bundles: bool = False): + """If add_bundles is True, create new bundles from free resources. + Otherwise, spread them among base_trial_resource bundles.""" + self.add_bundles = add_bundles + + def __call__(self, trial_runner: "trial_runner.TrialRunner", trial: Trial, + result: Dict[str, Any], scheduler: "ResourceChangingScheduler" + ) -> Union[None, PlacementGroupFactory]: + # Get base trial resources as defined in + # ``tune.run(resources_per_trial)`` + base_trial_resource = scheduler.base_trial_resources + + if not isinstance(base_trial_resource, PlacementGroupFactory): + raise ValueError("evenly_distribute_cpus_gpus only supports" + " PlacementGroupFactories.") + + # Don't bother if this is just the first iteration + if result["training_iteration"] < 1: + return None + + # default values if resources_per_trial is unspecified + if base_trial_resource is None: + base_trial_resource = PlacementGroupFactory([{"CPU": 1, "GPU": 0}]) + + # Assume that the number of CPUs and GPUs can't go below + # what was specified in tune.run + min_cpu = base_trial_resource.required_resources.get("CPU", 0) + min_gpu = base_trial_resource.required_resources.get("GPU", 0) + + min_cpu_bundle = base_trial_resource._bundles[0].get("CPU", 0) + min_gpu_bundle = base_trial_resource._bundles[0].get("GPU", 0) + + # Get the number of CPUs and GPUs avaialble in total (not just free) + total_available_cpus = ( + trial_runner.trial_executor._avail_resources.cpu) + total_available_gpus = ( + trial_runner.trial_executor._avail_resources.gpu) + + # Set upper limits for resources based on number of live trials + # to ensure that the trial cannot get more resources that it's + # possible to run + num_running_trials = len(trial_runner.get_live_trials()) + if min_cpu == 0: + upper_cpu_limit = 0 + else: + upper_cpu_limit = math.ceil( + total_available_cpus / num_running_trials) + # Round to nearest bundle minimum + # eg. 
+            # -> 4, 2, 2
+            if self.add_bundles:
+                upper_cpu_limit = math.ceil(
+                    upper_cpu_limit / min_cpu_bundle) * min_cpu_bundle
+            upper_cpu_limit = max(min_cpu, upper_cpu_limit)
+
+        if min_gpu == 0:
+            upper_gpu_limit = 0
+        else:
+            upper_gpu_limit = math.ceil(
+                total_available_gpus / num_running_trials)
+            # Round to nearest bundle minimum
+            if self.add_bundles:
+                upper_gpu_limit = math.ceil(
+                    upper_gpu_limit / min_gpu_bundle) * min_gpu_bundle
+            upper_gpu_limit = max(min_gpu, upper_gpu_limit)
+
+        # Function to check how many CPUs and GPUs a trial is using currently
+        def get_used_cpus_and_gpus(t: Trial):
+            return (t.placement_group_factory.required_resources.get("CPU", 0),
+                    t.placement_group_factory.required_resources.get("GPU", 0))
+
+        # Check how many CPUs and GPUs are currently being used by this trial
+        trial_used_cpus, trial_used_gpus = get_used_cpus_and_gpus(trial)
+
+        # Check how many CPUs and GPUs are currently being used by live trials
+        used_cpus_and_gpus = [
+            get_used_cpus_and_gpus(t) for t in trial_runner.get_live_trials()
+        ]
+        used_cpus, used_gpus = zip(*used_cpus_and_gpus)
+        used_cpus = sum(used_cpus)
+        used_gpus = sum(used_gpus)
+
+        # Calculate how many free CPUs and GPUs there are
+        free_cpus = total_available_cpus - used_cpus
+        free_gpus = total_available_gpus - used_gpus
+
+        # Add free CPUs and GPUs enforcing upper and lower limits
+        new_cpu = min(upper_cpu_limit, max(trial_used_cpus + free_cpus,
+                                           min_cpu))
+        new_gpu = min(upper_gpu_limit, max(trial_used_gpus + free_gpus,
+                                           min_gpu))
+
+        # Assign new CPUs and GPUs to the trial in a PlacementGroupFactory
+
+        # If self.add_bundles, make new bundles out of the resources
+        if self.add_bundles:
+            if min_cpu_bundle and min_gpu_bundle:
+                multiplier = min(new_cpu // min_cpu_bundle,
+                                 new_gpu // min_gpu_bundle)
+            elif min_gpu_bundle:
+                multiplier = new_gpu // min_gpu_bundle
+            else:
+                multiplier = new_cpu // min_cpu_bundle
+            new_bundles = [{
+                "CPU": min_cpu_bundle,
+                "GPU": min_gpu_bundle
+            }] * int(multiplier)
+        # Otherwise, just put them all in one bundle
+        else:
+            new_bundles = [{"CPU": new_cpu, "GPU": new_gpu}]
+        return PlacementGroupFactory(new_bundles)
+
+
+def evenly_distribute_cpus_gpus(trial_runner: "trial_runner.TrialRunner",
+                                trial: Trial, result: Dict[str, Any],
+                                scheduler: "ResourceChangingScheduler"
+                                ) -> Union[None, PlacementGroupFactory]:
     """This is a basic resource allocating function.
 
     This function is used by default in ``ResourceChangingScheduler``.
 
     The function naively balances free resources (CPUs and GPUs) between
     trials, giving them all equal priority, ensuring that all resources
-    are always being used. If for some reason a trial ends up with
-    more resources than there are free ones, it will adjust downwards.
+    are always being used. All of the resources will be placed in one bundle.
+    If for some reason a trial ends up with more resources than there
+    are free ones, it will adjust downwards.
     It will also ensure that trial as at least as many resources as
     it started with (``base_trial_resource``).
 
@@ -42,73 +158,48 @@ def evenly_distribute_cpus_gpus(
     Args:
         trial_runner (TrialRunner): Trial runner for this Tune run.
            Can be used to obtain information about other trials.
        trial (Trial): The trial to allocate new resources to.
        result (Dict[str, Any]): The latest results of trial.
-        base_trial_resource (Union[PlacementGroupFactory, Resources]):
-            Base trial resources as defined in
-            ``tune.run(resources_per_trial)``
+        scheduler (ResourceChangingScheduler): The scheduler calling
+            the function.
""" - if not isinstance(base_trial_resource, PlacementGroupFactory): - raise ValueError("evenly_distribute_cpus_gpus only supports" - " PlacementGroupFactories.") + return _DistributeResources(add_bundles=False)(trial_runner, trial, result, + scheduler) - # Don't bother if this is just the first iteration - if result["training_iteration"] < 1: - return None - # default values if resources_per_trial is unspecified - if base_trial_resource is None: - base_trial_resource = PlacementGroupFactory([{"CPU": 1, "GPU": 0}]) +def evenly_distribute_cpus_gpus_distributed( + trial_runner: "trial_runner.TrialRunner", trial: Trial, + result: Dict[str, Any], scheduler: "ResourceChangingScheduler" +) -> Union[None, PlacementGroupFactory]: + """This is a basic resource allocating function. - # Assume that the number of CPUs and GPUs can't go below - # what was specified in tune.run - min_cpu = base_trial_resource.required_resources.get("CPU", 0) - min_gpu = base_trial_resource.required_resources.get("GPU", 0) + The function naively balances free resources (CPUs and GPUs) between + trials, giving them all equal priority, ensuring that all resources + are always being used. The free resources will be placed in new bundles. + This function assumes that all bundles are equal (there is no "head" + bundle). - # Get the number of CPUs and GPUs avaialble in total (not just free) - total_available_cpus = (trial_runner.trial_executor._avail_resources.cpu) - total_available_gpus = (trial_runner.trial_executor._avail_resources.gpu) + If for some reason a trial ends up with + more resources than there are free ones, it will adjust downwards. + It will also ensure that trial as at least as many resources as + it started with (``base_trial_resource``). - # Set upper limits for resources based on number of live trials - # to ensure that the trial cannot get more resources that it's - # possible to run - if min_cpu == 0: - upper_cpu_limit = 0 - else: - upper_cpu_limit = math.ceil(total_available_cpus / len( - trial_runner.get_live_trials()) / min_cpu) + This function returns a new ``PlacementGroupFactory`` with updated + resource requirements, or None. If the returned + ``PlacementGroupFactory`` is equal by value to the one the + trial has currently, the scheduler will skip the update process + internally (same with None). - if min_gpu == 0: - upper_gpu_limit = 0 - else: - upper_gpu_limit = math.ceil(total_available_gpus / len( - trial_runner.get_live_trials()) / min_gpu) + Args: + trial_runner (TrialRunner): Trial runner for this Tune run. + Can be used to obtain information about other trials. + trial (Trial): The trial to allocate new resources to. + result (Dict[str, Any]): The latest results of trial. + scheduler (ResourceChangingScheduler): The scheduler calling + the function. 
+ """ - # Function to check how many CPUs and GPUs a trial is using currently - def get_used_cpus_and_gpus(t: Trial): - return (t.placement_group_factory.required_resources.get("CPU", 0), - t.placement_group_factory.required_resources.get("GPU", 0)) - - # Check how many CPUs and GPUs are currently being used by this trial - trial_used_cpus, trial_used_gpus = get_used_cpus_and_gpus(trial) - - # Check how many CPUs and GPUs are currently being used by live trials - used_cpus_and_gpus = [ - get_used_cpus_and_gpus(t) for t in trial_runner.get_live_trials() - ] - used_cpus, used_gpus = zip(*used_cpus_and_gpus) - used_cpus = sum(used_cpus) - used_gpus = sum(used_gpus) - - # Calculate how many free CPUs and GPUs there are - free_cpus = total_available_cpus - used_cpus - free_gpus = total_available_gpus - used_gpus - - # Add free CPUs and GPUs enforcing upper and lower limits - new_cpu = min(upper_cpu_limit, max(trial_used_cpus + free_cpus, min_cpu)) - new_gpu = min(upper_gpu_limit, max(trial_used_gpus + free_gpus, min_gpu)) - - # Assign new CPUs and GPUs to the trial in a PlacementGroupFactory - return PlacementGroupFactory([{"CPU": new_cpu, "GPU": new_gpu}]) + return _DistributeResources(add_bundles=True)(trial_runner, trial, result, + scheduler) class ResourceChangingScheduler(TrialScheduler): @@ -142,11 +233,10 @@ class ResourceChangingScheduler(TrialScheduler): live trial resource requiements during tuning. This function will be called on each trial as it finishes one step of training. The function must take four arguments: ``TrialRunner``, current - ``Trial``, current result :class:`dict` and the base trial - resource ``PlacementGroupFactory`` or ``Resource`` (depending on - whether placement groups are used). The function must return a - ``PlacementGroupFactory``, ``Resources``, :class:`dict` or None - (signifying no need for an update). If + ``Trial``, current result :class:`dict` and the + ``ResourceChangingScheduler`` calling it. The function must + return a ``PlacementGroupFactory``, ``Resources``, :class:`dict` + or None (signifying no need for an update). If ``resources_allocation_function`` is None, no resource requirements will be changed at any time. 
             By default, :func:`evenly_distribute_cpus_gpus` will be used,
@@ -168,7 +258,7 @@ class ResourceChangingScheduler(TrialScheduler):
                         trial_runner: "trial_runner.TrialRunner",
                         trial: Trial,
                         result: Dict[str, Any],
-                        base_trial_resource: Union[PlacementGroupFactory, Resources]
+                        scheduler: "ResourceChangingScheduler"
                 ) -> Union[None, PlacementGroupFactory, Resource]:
                     # logic here
                     # usage of PlacementGroupFactory is strongly preferred
@@ -186,8 +276,8 @@ class ResourceChangingScheduler(TrialScheduler):
             self,
             base_scheduler: Optional[TrialScheduler] = None,
             resources_allocation_function: Optional[Callable[[
-                "trial_runner.TrialRunner", Trial, Dict[str, Any], Union[
-                    PlacementGroupFactory, Resources]
+                "trial_runner.TrialRunner", Trial, Dict[str, Any],
+                "ResourceChangingScheduler"
             ], Union[None, PlacementGroupFactory,
                      Resources]]] = evenly_distribute_cpus_gpus,
     ) -> None:
@@ -209,6 +299,11 @@ class ResourceChangingScheduler(TrialScheduler):
     def metric(self):
         return self._base_scheduler._metric
 
+    @property
+    def base_trial_resources(
+            self) -> Optional[Union[Resources, PlacementGroupFactory]]:
+        return self._base_trial_resources
+
     def set_search_properties(self, metric: Optional[str],
                               mode: Optional[str]) -> bool:
         return self._base_scheduler.set_search_properties(metric, mode)
@@ -275,11 +370,16 @@ class ResourceChangingScheduler(TrialScheduler):
 
         any_resources_changed = False
 
+        new_trials_to_reallocate = {}
         for trial, new_resources in self._trials_to_reallocate.items():
+            if trial.status == Trial.RUNNING:
+                new_trials_to_reallocate[trial] = new_resources
+                logger.debug(f"{trial} is still running, skipping for now")
+                continue
             any_resources_changed = (any_resources_changed
                                      or self.set_trial_resources(
                                          trial, new_resources))
-        self._trials_to_reallocate.clear()
+        self._trials_to_reallocate = new_trials_to_reallocate
 
         if any_resources_changed:
             # force reconcilation to ensure resource changes
@@ -348,7 +448,7 @@ class ResourceChangingScheduler(TrialScheduler):
             return None
 
         new_resources = self._resources_allocation_function(
-            trial_runner, trial, result, self._base_trial_resources)
+            trial_runner, trial, result, self)
 
         # if we can check if the new resources are the same,
         # we do that here and skip resource allocation
diff --git a/python/ray/tune/trainable.py b/python/ray/tune/trainable.py
index 13cf9e2dd..70fb8945b 100644
--- a/python/ray/tune/trainable.py
+++ b/python/ray/tune/trainable.py
@@ -499,6 +499,8 @@ class Trainable:
             self, new_resources: Union[PlacementGroupFactory, Resources]):
         """Fires whenever Trainable resources are changed.
 
+        This method will be called before the checkpoint is loaded.
+
        Args:
             new_resources (PlacementGroupFactory|Resources): Updated
                 resources. Will be a PlacementGroupFactory if
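
The following is a brief usage sketch of the new API surface, not part of the patch itself. ``MyTrainable`` and the resource values are placeholders; everything else refers to functions and properties changed above. Note how the allocation function now receives the ``ResourceChangingScheduler`` as its fourth argument and reads the base resources off the new ``base_trial_resources`` property::

    from typing import Any, Dict, Union

    from ray import tune
    from ray.tune import trial_runner
    from ray.tune.trial import Trial
    from ray.tune.schedulers.resource_changing_scheduler import (
        ResourceChangingScheduler, evenly_distribute_cpus_gpus_distributed)
    from ray.tune.utils.placement_groups import PlacementGroupFactory


    def custom_allocation_function(
            trial_runner: "trial_runner.TrialRunner", trial: Trial,
            result: Dict[str, Any], scheduler: "ResourceChangingScheduler"
    ) -> Union[None, PlacementGroupFactory]:
        # Base trial resources are read off the scheduler through the new
        # ``base_trial_resources`` property instead of being passed in as
        # the fourth positional argument.
        base_trial_resource = scheduler.base_trial_resources
        if base_trial_resource is None or result["training_iteration"] < 1:
            # Nothing to compare against yet - skip the update.
            return None
        # Delegate to the new bundle-aware allocator; a real function would
        # compute and return its own PlacementGroupFactory here.
        return evenly_distribute_cpus_gpus_distributed(trial_runner, trial,
                                                       result, scheduler)


    # MyTrainable is a placeholder for a Trainable implementing
    # ``update_resources`` (which, after this patch, fires before
    # ``load_checkpoint``).
    tune.run(
        MyTrainable,
        scheduler=ResourceChangingScheduler(
            resources_allocation_function=custom_allocation_function),
        resources_per_trial=PlacementGroupFactory([{"CPU": 2, "GPU": 0}]),
        num_samples=4)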