[tune] ResourceChangingScheduler improvements (#17082)

Antoni Baum 2021-07-15 16:03:27 +02:00 committed by GitHub
parent 649580d735
commit f20311f194
5 changed files with 197 additions and 83 deletions


@@ -246,6 +246,11 @@ evenly_distribute_cpus_gpus
.. autofunction:: ray.tune.schedulers.resource_changing_scheduler.evenly_distribute_cpus_gpus
evenly_distribute_cpus_gpus_distributed
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
.. autofunction:: ray.tune.schedulers.resource_changing_scheduler.evenly_distribute_cpus_gpus_distributed
FIFOScheduler
-------------


@@ -24,6 +24,7 @@ class BreastCancerTrainable(Trainable):
def setup(self, config):
self.config = config
self.nthread = config.pop("nthread", 1)
self.new_nthread = None
self.model: xgb.Booster = None
# Load dataset
data, labels = sklearn.datasets.load_breast_cancer(return_X_y=True)
@@ -62,6 +63,9 @@ class BreastCancerTrainable(Trainable):
def load_checkpoint(self, checkpoint_path):
with open(checkpoint_path, "rb") as inputFile:
self.config, self.nthread, raw_model = pickle.load(inputFile)
if self.new_nthread:
self.nthread = self.new_nthread
self.new_nthread = None
self.model = Booster()
self.model.load_model(bytearray(raw_model))
data, labels = sklearn.datasets.load_breast_cancer(return_X_y=True)
@@ -74,10 +78,11 @@ class BreastCancerTrainable(Trainable):
def update_resources(
self, new_resources: Union[PlacementGroupFactory, Resources]):
# this is called before `load_checkpoint`
if isinstance(new_resources, PlacementGroupFactory):
self.nthread = new_resources.head_cpus
self.new_nthread = new_resources.head_cpus
else:
self.nthread = new_resources.cpu
self.new_nthread = new_resources.cpu
def get_best_model_checkpoint(analysis):
@@ -112,8 +117,7 @@ def tune_xgboost():
def example_resources_allocation_function(
trial_runner: "trial_runner.TrialRunner", trial: Trial,
result: Dict[str, Any],
base_trial_resource: Union[PlacementGroupFactory, Resources]
result: Dict[str, Any], scheduler: "ResourceChangingScheduler"
) -> Union[None, PlacementGroupFactory, Resources]:
"""This is a basic example of a resource allocating function.
@@ -133,11 +137,14 @@ def tune_xgboost():
Can be used to obtain information about other trials.
trial (Trial): The trial to allocate new resources to.
result (Dict[str, Any]): The latest results of trial.
base_trial_resource (Union[PlacementGroupFactory, Resources]):
Base trial resources as defined in
``tune.run(resources_per_trial)``
scheduler (ResourceChangingScheduler): The scheduler calling
the function.
"""
# Get base trial resources as defined in
# ``tune.run(resources_per_trial)``
base_trial_resource = scheduler.base_trial_resources
# Don't bother if this is just the first iteration
if result["training_iteration"] < 1:
return None
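For orientation, here is a minimal wiring sketch for the updated example: the custom allocation function now receives the ResourceChangingScheduler itself instead of the base resources. Everything below that is not shown in the diff (metric name, sample count, that the trainable and function are in scope at module level) is an assumption for illustration only.

# Hypothetical wiring sketch -- not part of this commit. Assumes
# BreastCancerTrainable and example_resources_allocation_function from the
# example above are in scope and that the trainable reports "eval-logloss".
from ray import tune
from ray.tune.schedulers.resource_changing_scheduler import (
    ResourceChangingScheduler)
from ray.tune.utils.placement_groups import PlacementGroupFactory

scheduler = ResourceChangingScheduler(
    resources_allocation_function=example_resources_allocation_function)

analysis = tune.run(
    BreastCancerTrainable,
    metric="eval-logloss",  # assumed metric name
    mode="min",
    scheduler=scheduler,
    # Base resources; the allocation function can read these back via
    # scheduler.base_trial_resources.
    resources_per_trial=PlacementGroupFactory([{"CPU": 1, "GPU": 0}]),
    num_samples=4,
    checkpoint_at_end=True)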


@@ -451,8 +451,8 @@ class RayTrialExecutor(TrialExecutor):
if not runner:
return False
trial.set_runner(runner)
self.restore(trial, checkpoint)
self._notify_trainable_of_new_resources_if_needed(trial)
self.restore(trial, checkpoint)
self.set_status(trial, Trial.RUNNING)
if trial in self._staged_trials:


@@ -14,20 +14,136 @@ from ray.tune.utils.placement_groups import PlacementGroupFactory
logger = logging.getLogger(__name__)
def evenly_distribute_cpus_gpus(
trial_runner: "trial_runner.TrialRunner", trial: Trial,
result: Dict[str, Any],
base_trial_resource: Union[PlacementGroupFactory, Resources]
) -> Union[None, PlacementGroupFactory, Resources]:
class _DistributeResources:
"""Generic functionality for resource allocation functions"""
def __init__(self, add_bundles: bool = False):
"""If add_bundles is True, create new bundles from free resources.
Otherwise, spread them among base_trial_resource bundles."""
self.add_bundles = add_bundles
def __call__(self, trial_runner: "trial_runner.TrialRunner", trial: Trial,
result: Dict[str, Any], scheduler: "ResourceChangingScheduler"
) -> Union[None, PlacementGroupFactory]:
# Get base trial resources as defined in
# ``tune.run(resources_per_trial)``
base_trial_resource = scheduler.base_trial_resources
if base_trial_resource is not None and not isinstance(base_trial_resource, PlacementGroupFactory):
raise ValueError("evenly_distribute_cpus_gpus only supports"
" PlacementGroupFactories.")
# Don't bother if this is just the first iteration
if result["training_iteration"] < 1:
return None
# default values if resources_per_trial is unspecified
if base_trial_resource is None:
base_trial_resource = PlacementGroupFactory([{"CPU": 1, "GPU": 0}])
# Assume that the number of CPUs and GPUs can't go below
# what was specified in tune.run
min_cpu = base_trial_resource.required_resources.get("CPU", 0)
min_gpu = base_trial_resource.required_resources.get("GPU", 0)
min_cpu_bundle = base_trial_resource._bundles[0].get("CPU", 0)
min_gpu_bundle = base_trial_resource._bundles[0].get("GPU", 0)
# Get the number of CPUs and GPUs available in total (not just free)
total_available_cpus = (
trial_runner.trial_executor._avail_resources.cpu)
total_available_gpus = (
trial_runner.trial_executor._avail_resources.gpu)
# Set upper limits for resources based on number of live trials
# to ensure that the trial cannot get more resources than it's
# possible to run
num_running_trials = len(trial_runner.get_live_trials())
if min_cpu == 0:
upper_cpu_limit = 0
else:
upper_cpu_limit = math.ceil(
total_available_cpus / num_running_trials)
# Round to nearest bundle minimum
# eg. 8 CPUs between 3 trials with min 2 CPUs per bundle
# -> 4, 2, 2
if self.add_bundles:
upper_cpu_limit = math.ceil(
upper_cpu_limit / min_cpu_bundle) * min_cpu_bundle
upper_cpu_limit = max(min_cpu, upper_cpu_limit)
if min_gpu == 0:
upper_gpu_limit = 0
else:
upper_gpu_limit = math.ceil(
total_available_gpus / num_running_trials)
# Ensure we don't go below per-bundle minimum
if self.add_bundles:
upper_gpu_limit = math.ceil(
upper_gpu_limit / min_gpu_bundle) * min_gpu_bundle
upper_gpu_limit = max(min_gpu, upper_gpu_limit)
# Function to check how many CPUs and GPUs a trial is using currently
def get_used_cpus_and_gpus(t: Trial):
return (t.placement_group_factory.required_resources.get("CPU", 0),
t.placement_group_factory.required_resources.get("GPU", 0))
# Check how many CPUs and GPUs are currently being used by this trial
trial_used_cpus, trial_used_gpus = get_used_cpus_and_gpus(trial)
# Check how many CPUs and GPUs are currently being used by live trials
used_cpus_and_gpus = [
get_used_cpus_and_gpus(t) for t in trial_runner.get_live_trials()
]
used_cpus, used_gpus = zip(*used_cpus_and_gpus)
used_cpus = sum(used_cpus)
used_gpus = sum(used_gpus)
# Calculate how many free CPUs and GPUs there are
free_cpus = total_available_cpus - used_cpus
free_gpus = total_available_gpus - used_gpus
# Add free CPUs and GPUs enforcing upper and lower limits
new_cpu = min(upper_cpu_limit, max(trial_used_cpus + free_cpus,
min_cpu))
new_gpu = min(upper_gpu_limit, max(trial_used_gpus + free_gpus,
min_gpu))
# Assign new CPUs and GPUs to the trial in a PlacementGroupFactory
# If self.add_bundles, make new bundles out of the resources
if self.add_bundles:
if min_cpu_bundle and min_gpu_bundle:
multiplier = min(new_cpu // min_cpu_bundle,
new_gpu // min_gpu_bundle)
elif min_gpu_bundle:
multiplier = new_gpu // min_gpu_bundle
else:
multiplier = new_cpu // min_cpu_bundle
new_bundles = [{
"CPU": min_cpu_bundle,
"GPU": min_gpu_bundle
}] * int(multiplier)
# Otherwise, just put them all in one bundle
else:
new_bundles = [{"CPU": new_cpu, "GPU": new_gpu}]
return PlacementGroupFactory(new_bundles)
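The bundle rounding above can be sanity-checked in isolation. This standalone snippet just reproduces the arithmetic from the comment's example (8 CPUs, 3 running trials, 2-CPU bundle minimum) and is not part of the scheduler code:

import math

# Assumed example values from the comment above.
total_available_cpus = 8
num_running_trials = 3
min_cpu_bundle = 2

upper_cpu_limit = math.ceil(total_available_cpus / num_running_trials)  # 3
# Round up to the nearest multiple of the per-bundle minimum.
upper_cpu_limit = math.ceil(
    upper_cpu_limit / min_cpu_bundle) * min_cpu_bundle
print(upper_cpu_limit)  # 4 -> the three trials end up with 4, 2 and 2 CPUs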
def evenly_distribute_cpus_gpus(trial_runner: "trial_runner.TrialRunner",
trial: Trial, result: Dict[str, Any],
scheduler: "ResourceChangingScheduler"
) -> Union[None, PlacementGroupFactory]:
"""This is a basic resource allocating function.
This function is used by default in ``ResourceChangingScheduler``.
The function naively balances free resources (CPUs and GPUs) between
trials, giving them all equal priority, ensuring that all resources
are always being used. If for some reason a trial ends up with
more resources than there are free ones, it will adjust downwards.
are always being used. All of the resources will be placed in one bundle.
If for some reason a trial ends up with
more resources than there are free ones, it will adjust downwards.
It will also ensure that the trial has at least as many resources as
it started with (``base_trial_resource``).
@@ -42,73 +158,48 @@ def evenly_distribute_cpus_gpus(
Can be used to obtain information about other trials.
trial (Trial): The trial to allocate new resources to.
result (Dict[str, Any]): The latest results of trial.
base_trial_resource (Union[PlacementGroupFactory, Resources]):
Base trial resources as defined in
``tune.run(resources_per_trial)``
scheduler (ResourceChangingScheduler): The scheduler calling
the function.
"""
if not isinstance(base_trial_resource, PlacementGroupFactory):
raise ValueError("evenly_distribute_cpus_gpus only supports"
" PlacementGroupFactories.")
return _DistributeResources(add_bundles=False)(trial_runner, trial, result,
scheduler)
# Don't bother if this is just the first iteration
if result["training_iteration"] < 1:
return None
# default values if resources_per_trial is unspecified
if base_trial_resource is None:
base_trial_resource = PlacementGroupFactory([{"CPU": 1, "GPU": 0}])
def evenly_distribute_cpus_gpus_distributed(
trial_runner: "trial_runner.TrialRunner", trial: Trial,
result: Dict[str, Any], scheduler: "ResourceChangingScheduler"
) -> Union[None, PlacementGroupFactory]:
"""This is a basic resource allocating function.
# Assume that the number of CPUs and GPUs can't go below
# what was specified in tune.run
min_cpu = base_trial_resource.required_resources.get("CPU", 0)
min_gpu = base_trial_resource.required_resources.get("GPU", 0)
The function naively balances free resources (CPUs and GPUs) between
trials, giving them all equal priority, ensuring that all resources
are always being used. The free resources will be placed in new bundles.
This function assumes that all bundles are equal (there is no "head"
bundle).
# Get the number of CPUs and GPUs available in total (not just free)
total_available_cpus = (trial_runner.trial_executor._avail_resources.cpu)
total_available_gpus = (trial_runner.trial_executor._avail_resources.gpu)
If for some reason a trial ends up with
more resources than there are free ones, it will adjust downwards.
It will also ensure that the trial has at least as many resources as
it started with (``base_trial_resource``).
# Set upper limits for resources based on number of live trials
# to ensure that the trial cannot get more resources than it's
# possible to run
if min_cpu == 0:
upper_cpu_limit = 0
else:
upper_cpu_limit = math.ceil(total_available_cpus / len(
trial_runner.get_live_trials()) / min_cpu)
This function returns a new ``PlacementGroupFactory`` with updated
resource requirements, or None. If the returned
``PlacementGroupFactory`` is equal by value to the one the
trial has currently, the scheduler will skip the update process
internally (same with None).
if min_gpu == 0:
upper_gpu_limit = 0
else:
upper_gpu_limit = math.ceil(total_available_gpus / len(
trial_runner.get_live_trials()) / min_gpu)
Args:
trial_runner (TrialRunner): Trial runner for this Tune run.
Can be used to obtain information about other trials.
trial (Trial): The trial to allocate new resources to.
result (Dict[str, Any]): The latest results of trial.
scheduler (ResourceChangingScheduler): The scheduler calling
the function.
"""
# Function to check how many CPUs and GPUs a trial is using currently
def get_used_cpus_and_gpus(t: Trial):
return (t.placement_group_factory.required_resources.get("CPU", 0),
t.placement_group_factory.required_resources.get("GPU", 0))
# Check how many CPUs and GPUs are currently being used by this trial
trial_used_cpus, trial_used_gpus = get_used_cpus_and_gpus(trial)
# Check how many CPUs and GPUs are currently being used by live trials
used_cpus_and_gpus = [
get_used_cpus_and_gpus(t) for t in trial_runner.get_live_trials()
]
used_cpus, used_gpus = zip(*used_cpus_and_gpus)
used_cpus = sum(used_cpus)
used_gpus = sum(used_gpus)
# Calculate how many free CPUs and GPUs there are
free_cpus = total_available_cpus - used_cpus
free_gpus = total_available_gpus - used_gpus
# Add free CPUs and GPUs enforcing upper and lower limits
new_cpu = min(upper_cpu_limit, max(trial_used_cpus + free_cpus, min_cpu))
new_gpu = min(upper_gpu_limit, max(trial_used_gpus + free_gpus, min_gpu))
# Assign new CPUs and GPUs to the trial in a PlacementGroupFactory
return PlacementGroupFactory([{"CPU": new_cpu, "GPU": new_gpu}])
return _DistributeResources(add_bundles=True)(trial_runner, trial, result,
scheduler)
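Taken together, the two public helpers are thin wrappers over _DistributeResources with different bundle behaviour. A minimal sketch of picking between them when building the scheduler (both constructions are assumptions about typical use, not prescribed by this commit):

# Hypothetical selection sketch -- not part of this diff.
from ray.tune.schedulers.resource_changing_scheduler import (
    ResourceChangingScheduler, evenly_distribute_cpus_gpus,
    evenly_distribute_cpus_gpus_distributed)

# Single-process trainables: fold all free resources into one bundle.
single_bundle_scheduler = ResourceChangingScheduler(
    resources_allocation_function=evenly_distribute_cpus_gpus)

# Distributed trainables with equal bundles and no "head" bundle:
# append free resources as additional bundles instead.
multi_bundle_scheduler = ResourceChangingScheduler(
    resources_allocation_function=evenly_distribute_cpus_gpus_distributed)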
class ResourceChangingScheduler(TrialScheduler):
@@ -142,11 +233,10 @@ class ResourceChangingScheduler(TrialScheduler):
live trial resource requirements during tuning. This function
will be called on each trial as it finishes one step of training.
The function must take four arguments: ``TrialRunner``, current
``Trial``, current result :class:`dict` and the base trial
resource ``PlacementGroupFactory`` or ``Resource`` (depending on
whether placement groups are used). The function must return a
``PlacementGroupFactory``, ``Resources``, :class:`dict` or None
(signifying no need for an update). If
``Trial``, current result :class:`dict` and the
``ResourceChangingScheduler`` calling it. The function must
return a ``PlacementGroupFactory``, ``Resources``, :class:`dict`
or None (signifying no need for an update). If
``resources_allocation_function`` is None, no resource
requirements will be changed at any time.
By default, :func:`evenly_distribute_cpus_gpus` will be used,
@@ -168,7 +258,7 @@ class ResourceChangingScheduler(TrialScheduler):
trial_runner: "trial_runner.TrialRunner",
trial: Trial,
result: Dict[str, Any],
base_trial_resource: Union[PlacementGroupFactory, Resources]
scheduler: "ResourceChangingScheduler"
) -> Union[None, PlacementGroupFactory, Resource]:
# logic here
# usage of PlacementGroupFactory is strongly preferred
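To make the skeleton above concrete, here is one hedged way such a function could be written against the new signature; the warm-up threshold, CPU doubling, and fallback are arbitrary choices for illustration, not recommendations from this commit:

# Hypothetical allocation function sketch under the new signature.
from typing import Any, Dict, Union

from ray.tune.utils.placement_groups import PlacementGroupFactory


def grow_cpus_after_warmup(trial_runner, trial, result: Dict[str, Any],
                           scheduler) -> Union[None, PlacementGroupFactory]:
    # Leave the allocation untouched for the first few iterations.
    if result["training_iteration"] < 4:
        return None
    # Base resources as passed to tune.run(resources_per_trial=...),
    # now available through the scheduler argument.
    base = scheduler.base_trial_resources
    if isinstance(base, PlacementGroupFactory):
        base_cpus = base.required_resources.get("CPU", 1)
    else:
        base_cpus = 1  # fallback if placement groups are not in use
    # Request double the base CPUs in a single bundle; returning an
    # unchanged or None value makes the scheduler skip the update.
    return PlacementGroupFactory([{"CPU": base_cpus * 2}])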
@@ -186,8 +276,8 @@ class ResourceChangingScheduler(TrialScheduler):
self,
base_scheduler: Optional[TrialScheduler] = None,
resources_allocation_function: Optional[Callable[[
"trial_runner.TrialRunner", Trial, Dict[str, Any], Union[
PlacementGroupFactory, Resources]
"trial_runner.TrialRunner", Trial, Dict[str, Any],
"ResourceChangingScheduler"
], Union[None, PlacementGroupFactory,
Resources]]] = evenly_distribute_cpus_gpus,
) -> None:
@@ -209,6 +299,11 @@ class ResourceChangingScheduler(TrialScheduler):
def metric(self):
return self._base_scheduler._metric
@property
def base_trial_resources(
self) -> Optional[Union[Resources, PlacementGroupFactory]]:
return self._base_trial_resources
def set_search_properties(self, metric: Optional[str],
mode: Optional[str]) -> bool:
return self._base_scheduler.set_search_properties(metric, mode)
@@ -275,11 +370,16 @@ class ResourceChangingScheduler(TrialScheduler):
any_resources_changed = False
new_trials_to_reallocate = {}
for trial, new_resources in self._trials_to_reallocate.items():
if trial.status == Trial.RUNNING:
new_trials_to_reallocate[trial] = new_resources
logger.debug(f"{trial} is still running, skipping for now")
continue
any_resources_changed = (any_resources_changed
or self.set_trial_resources(
trial, new_resources))
self._trials_to_reallocate.clear()
self._trials_to_reallocate = new_trials_to_reallocate
if any_resources_changed:
# force reconciliation to ensure resource changes
@@ -348,7 +448,7 @@ class ResourceChangingScheduler(TrialScheduler):
return None
new_resources = self._resources_allocation_function(
trial_runner, trial, result, self._base_trial_resources)
trial_runner, trial, result, self)
# if we can check if the new resources are the same,
# we do that here and skip resource allocation


@@ -499,6 +499,8 @@ class Trainable:
self, new_resources: Union[PlacementGroupFactory, Resources]):
"""Fires whenever Trainable resources are changed.
This method will be called before the checkpoint is loaded.
Args:
new_resources (PlacementGroupFactory|Resources):
Updated resources. Will be a PlacementGroupFactory if