[Tune] Remove legacy resources implementations in Runner and Executor. (#19773)

xwjiang2010 2021-11-12 12:33:39 -08:00 committed by GitHub
parent 73e570c426
commit cdf70c2900
29 changed files with 310 additions and 548 deletions


@ -78,7 +78,7 @@ Use :ref:`tune.run <tune-run-ref>` to execute hyperparameter tuning. This functi
Each trial has
- a hyperparameter configuration (``trial.config``), id (``trial.trial_id``)
- a resource specification (``resources_per_trial`` or ``trial.resources``)
- a resource specification (``resources_per_trial`` or ``trial.placement_group_factory``)
- And other configuration values.
Each trial is also associated with one instance of a :ref:`Trainable <trainable-docs>`. You can access trial objects through the :ref:`Analysis object <tune-concepts-analysis>` provided after ``tune.run`` finishes.
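For reference, a minimal sketch of the new-style resource specification; ``my_trainable`` and the config values are illustrative placeholders, not part of this change:

.. code-block:: python

    from ray import tune
    from ray.tune.utils.placement_groups import PlacementGroupFactory

    def my_trainable(config):  # placeholder trainable
        tune.report(score=config["x"])

    analysis = tune.run(
        my_trainable,
        config={"x": tune.grid_search([1, 2])},
        # One bundle for the trainable itself; add more bundles for workers.
        resources_per_trial=PlacementGroupFactory([{"CPU": 1}]),
    )

    for trial in analysis.trials:
        # Each trial now exposes its requirement as a placement group factory.
        print(trial.trial_id, trial.config, trial.placement_group_factory)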


@ -884,8 +884,6 @@ These are the environment variables Ray Tune currently considers:
with the parameter values in them)
* **TUNE_MAX_PENDING_TRIALS_PG**: Maximum number of pending trials when placement groups are used. Defaults
to ``auto``, which will be updated to ``max(16, cluster_cpus * 1.1)`` for random/grid search and ``1`` for any other search algorithms.
* **TUNE_PLACEMENT_GROUP_AUTO_DISABLED**: Ray Tune automatically uses placement groups
instead of the legacy resource requests. Setting this to 1 enables legacy placement.
* **TUNE_PLACEMENT_GROUP_CLEANUP_DISABLED**: Ray Tune cleans up existing placement groups
with the ``_tune__`` prefix in their name before starting a run. This is used to make sure
that scheduled placement groups are removed when multiple calls to ``tune.run()`` are


@ -10,13 +10,13 @@ from filelock import FileLock
import ray
from ray import tune
from ray.tune.resources import Resources
from ray.tune.utils.trainable import TrainableUtil
from ray.tune.result import RESULT_DUPLICATE
from ray.tune.logger import NoopLogger
from ray.tune.trainable import DistributedTrainable
from ray.tune.function_runner import wrap_function
from ray.tune.logger import NoopLogger
from ray.tune.result import RESULT_DUPLICATE
from ray.tune.trainable import DistributedTrainable
from ray.tune.utils.placement_groups import PlacementGroupFactory
from ray.tune.utils.trainable import TrainableUtil
from horovod.ray import RayExecutor
logger = logging.getLogger(__name__)
@ -250,15 +250,10 @@ def DistributedTrainableCreator(
@classmethod
def default_resource_request(cls, config: Dict):
extra_gpu = int(num_hosts * num_slots) * int(use_gpu)
extra_cpu = int(num_hosts * num_slots * num_cpus_per_slot)
return Resources(
cpu=0,
gpu=0,
extra_cpu=extra_cpu,
extra_gpu=extra_gpu,
)
return PlacementGroupFactory([{}] + [{
"CPU": cls._num_cpus_per_slot,
"GPU": int(use_gpu)
}] * (num_hosts * num_slots))
return WrappedHorovodTrainable
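The TensorFlow and PyTorch creators below follow the same pattern: an empty ``{}`` head bundle for the coordinating trainable plus one bundle per worker. For illustration, with assumed values of one host, two slots and two CPUs per slot, the factory above resolves to:

.. code-block:: python

    from ray.tune.utils.placement_groups import PlacementGroupFactory

    # Assumed example values; in the creator they come from its arguments.
    num_hosts, num_slots, num_cpus_per_slot, use_gpu = 1, 2, 2, True

    pgf = PlacementGroupFactory(
        # Empty head bundle: the wrapping trainable itself reserves nothing.
        [{}] + [{
            "CPU": num_cpus_per_slot,
            "GPU": int(use_gpu)
        }] * (num_hosts * num_slots))

    print(pgf.required_resources)  # expected: {"CPU": 4.0, "GPU": 2.0}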


@ -3,13 +3,14 @@ import logging
import ray
import os
from ray.tune.result import RESULT_DUPLICATE
from ray.tune.function_runner import wrap_function
from ray.tune.resources import Resources
from ray.tune.result import RESULT_DUPLICATE
from ray.tune.trainable import DistributedTrainable
from ray.util.placement_group import remove_placement_group
from ray.tune.utils.trainable import PlacementGroupUtil, TrainableUtil
from ray.tune.utils import detect_checkpoint_function, find_free_port
from ray.tune.utils.placement_groups import PlacementGroupFactory
from ray.tune.utils.trainable import PlacementGroupUtil, TrainableUtil
from ray.util.placement_group import remove_placement_group
from typing import Callable, Dict, Type, Optional
logger = logging.getLogger(__name__)
@ -182,11 +183,10 @@ def DistributedTrainableCreator(
@classmethod
def default_resource_request(cls, config: Dict) -> Resources:
return Resources(
cpu=0,
gpu=0,
extra_cpu=num_workers * num_cpus_per_worker,
extra_gpu=num_workers * num_gpus_per_worker)
return PlacementGroupFactory([{}] + [{
"CPU": cls._num_cpus_per_worker,
"GPU": cls._num_gpus_per_worker
}] * num_workers)
return WrappedDistributedTensorFlowTrainable


@ -15,8 +15,8 @@ from ray import tune
from ray.tune.result import RESULT_DUPLICATE
from ray.tune.logger import NoopLogger
from ray.tune.function_runner import wrap_function
from ray.tune.resources import Resources
from ray.tune.trainable import DistributedTrainable
from ray.tune.utils.placement_groups import PlacementGroupFactory
from ray.tune.utils.trainable import PlacementGroupUtil, TrainableUtil
from ray.tune.utils import detect_checkpoint_function
from ray.util.sgd.torch.utils import setup_process_group, setup_address
@ -210,13 +210,12 @@ def DistributedTrainableCreator(func: Callable,
return dict(timeout=timedelta(seconds=timeout_s), backend=backend)
@classmethod
def default_resource_request(cls, config: Dict) -> Resources:
return Resources(
cpu=0,
gpu=0,
extra_cpu=num_cpus_per_worker * num_workers,
extra_gpu=num_gpus_per_worker * num_workers)
def default_resource_request(cls,
config: Dict) -> PlacementGroupFactory:
return PlacementGroupFactory([{}] + [{
"CPU": cls._num_cpus_per_worker,
"GPU": cls._num_gpus_per_worker
}] * num_workers)
return WrappedDistributedTorchTrainable


@ -188,7 +188,6 @@ class RayTrialExecutor(TrialExecutor):
self._cached_actor_pg = deque(maxlen=1)
self._avail_resources = Resources(cpu=0, gpu=0)
self._committed_resources = Resources(cpu=0, gpu=0)
self._pg_manager = PlacementGroupManager(prefix=get_tune_pg_prefix())
self._staged_trials = set()
self._just_staged_trials = set()
@ -256,8 +255,6 @@ class RayTrialExecutor(TrialExecutor):
for trial in trials:
if trial.status != Trial.PENDING:
continue
if not trial.uses_placement_groups:
continue
if trial in self._staged_trials:
continue
if self._pg_manager.trial_in_use(trial):
@ -298,7 +295,7 @@ class RayTrialExecutor(TrialExecutor):
f"{existing_runner}")
trial.set_runner(existing_runner)
if pg and trial.uses_placement_groups:
if pg:
self._pg_manager.assign_cached_pg(pg, trial)
if not self.reset_trial(trial, trial.config, trial.experiment_tag,
@ -327,53 +324,44 @@ class RayTrialExecutor(TrialExecutor):
f"a string, make sure the trainable was registered before.")
_actor_cls = _class_cache.get(trainable_cls)
if trial.uses_placement_groups:
if not self._pg_manager.has_ready(trial, update=True):
if trial not in self._staged_trials:
if self._pg_manager.stage_trial_pg(trial):
self._staged_trials.add(trial)
self._just_staged_trials.add(trial)
if not self._pg_manager.has_ready(trial, update=True):
if trial not in self._staged_trials:
if self._pg_manager.stage_trial_pg(trial):
self._staged_trials.add(trial)
self._just_staged_trials.add(trial)
just_staged = trial in self._just_staged_trials
just_staged = trial in self._just_staged_trials
# This part of the code is mostly here for testing
# purposes. If self._wait_for_pg is set, we will wait here
# for that many seconds until the placement group is ready.
# This ensures that the trial can be started right away and
# not just in the next step() of the trial runner.
# We only do this if we have reason to believe that resources
# will be ready, soon, i.e. when a) we just staged the PG,
# b) another trial just exited, freeing resources, or c)
# when there are no currently running trials.
if self._wait_for_pg is not None and (
just_staged or self._trial_just_finished_before
or not self.get_running_trials()):
logger.debug(
f"Waiting up to {self._wait_for_pg} seconds for "
f"placement group of trial {trial} to become ready.")
wait_end = time.monotonic() + self._wait_for_pg
while time.monotonic() < wait_end:
self._pg_manager.update_status()
if self._pg_manager.has_ready(trial):
break
time.sleep(0.1)
else:
return None
if not self._pg_manager.has_ready(trial):
# PG may have become ready during waiting period
# This part of the code is mostly here for testing
# purposes. If self._wait_for_pg is set, we will wait here
# for that many seconds until the placement group is ready.
# This ensures that the trial can be started right away and
# not just in the next step() of the trial runner.
# We only do this if we have reason to believe that resources
# will be ready, soon, i.e. when a) we just staged the PG,
# b) another trial just exited, freeing resources, or c)
# when there are no currently running trials.
if self._wait_for_pg is not None and (
just_staged or self._trial_just_finished_before
or not self.get_running_trials()):
logger.debug(
f"Waiting up to {self._wait_for_pg} seconds for "
f"placement group of trial {trial} to become ready.")
wait_end = time.monotonic() + self._wait_for_pg
while time.monotonic() < wait_end:
self._pg_manager.update_status()
if self._pg_manager.has_ready(trial):
break
time.sleep(0.1)
else:
return None
full_actor_class = self._pg_manager.get_full_actor_cls(
trial, _actor_cls)
else:
full_actor_class = _actor_cls.options(
num_cpus=trial.resources.cpu,
num_gpus=trial.resources.gpu,
memory=trial.resources.memory or None,
object_store_memory=trial.resources.object_store_memory
or None,
resources=trial.resources.custom_resources)
if not self._pg_manager.has_ready(trial):
# PG may have become ready during waiting period
return None
full_actor_class = self._pg_manager.get_full_actor_cls(
trial, _actor_cls)
# Clear the Trial's location (to be updated later on result)
# since we don't know where the remote runner is placed.
trial.set_location(Location())
@ -509,8 +497,7 @@ class RayTrialExecutor(TrialExecutor):
try:
ray.get(
trainable._update_resources.remote(
trial.placement_group_factory if trial.
uses_placement_groups else trial.resources),
trial.placement_group_factory),
timeout=DEFAULT_GET_TIMEOUT)
except GetTimeoutError:
logger.exception(
@ -541,7 +528,7 @@ class RayTrialExecutor(TrialExecutor):
logger.debug("Reusing actor for %s", trial.runner)
# Move PG into cache (disassociate from trial)
pg = self._pg_manager.cache_trial_pg(trial)
if pg or not trial.uses_placement_groups:
if pg:
# True if a placement group was replaced
self._cached_actor_pg.append((trial.runner, pg))
should_destroy_actor = False
@ -593,8 +580,6 @@ class RayTrialExecutor(TrialExecutor):
True if the remote runner has been started. False if trial was
not started (e.g. because of lacking resources/pending PG).
"""
if not trial.uses_placement_groups:
self._commit_resources(trial.resources)
try:
return self._start_trial(trial, checkpoint, train=train)
except AbortTrialExecution:
@ -626,8 +611,6 @@ class RayTrialExecutor(TrialExecutor):
self._stop_trial(trial, error=error, error_msg=error_msg)
if prior_status == Trial.RUNNING:
logger.debug("Trial %s: Returning resources.", trial)
if not trial.uses_placement_groups:
self._return_resources(trial.resources)
out = self._find_item(self._running, trial)
for result_id in out:
self._running.pop(result_id)
@ -774,42 +757,6 @@ class RayTrialExecutor(TrialExecutor):
return [result]
return result
def _commit_resources(self, resources):
committed = self._committed_resources
all_keys = set(resources.custom_resources).union(
set(committed.custom_resources))
custom_resources = {
k: committed.get(k) + resources.get_res_total(k)
for k in all_keys
}
self._committed_resources = Resources(
committed.cpu + resources.cpu_total(),
committed.gpu + resources.gpu_total(),
committed.memory + resources.memory_total(),
committed.object_store_memory +
resources.object_store_memory_total(),
custom_resources=custom_resources)
def _return_resources(self, resources):
committed = self._committed_resources
all_keys = set(resources.custom_resources).union(
set(committed.custom_resources))
custom_resources = {
k: committed.get(k) - resources.get_res_total(k)
for k in all_keys
}
self._committed_resources = Resources(
committed.cpu - resources.cpu_total(),
committed.gpu - resources.gpu_total(),
custom_resources=custom_resources)
assert self._committed_resources.is_nonnegative(), (
"Resource invalid: {}".format(resources))
def _update_avail_resources(self, num_retries=5):
if time.time() - self._last_resource_refresh < self._refresh_period:
return
@ -852,7 +799,7 @@ class RayTrialExecutor(TrialExecutor):
self._resources_initialized = True
def has_resources_for_trial(self, trial: Trial) -> bool:
"""Returns whether this runner has resources available for this trial.
"""Returns whether there are resources available for this trial.
This will return True as long as we didn't reach the maximum number
of pending trials. It will also return True if the trial placement
@ -865,46 +812,13 @@ class RayTrialExecutor(TrialExecutor):
boolean
"""
if trial.uses_placement_groups:
return trial in self._staged_trials or self._pg_manager.can_stage(
) or self._pg_manager.has_ready(
trial, update=True)
return self.has_resources(trial.resources)
def has_resources(self, resources: Resources) -> bool:
"""Returns whether this runner has at least the specified resources.
This refreshes the Ray cluster resources if the time since last update
has exceeded self._refresh_period. This also assumes that the
cluster is not resizing very frequently.
"""
if resources.has_placement_group:
return self._pg_manager.can_stage()
self._update_avail_resources()
currently_available = Resources.subtract(self._avail_resources,
self._committed_resources)
have_space = (
resources.cpu_total() <= currently_available.cpu
and resources.gpu_total() <= currently_available.gpu
and resources.memory_total() <= currently_available.memory
and resources.object_store_memory_total() <=
currently_available.object_store_memory and all(
resources.get_res_total(res) <= currently_available.get(res)
for res in resources.custom_resources))
if have_space:
# The assumption right now is that we block all trials if one
# trial is queued.
return True
return False
return trial in self._staged_trials or self._pg_manager.can_stage(
) or self._pg_manager.has_ready(
trial, update=True)
def debug_string(self) -> str:
"""Returns a human readable message for printing to the console."""
total_resources = self._pg_manager.total_used_resources(
self._committed_resources)
total_resources = self._pg_manager.occupied_resources()
if self._resources_initialized:
status = ("Resources requested: {}/{} CPUs, {}/{} GPUs, "
@ -932,25 +846,6 @@ class RayTrialExecutor(TrialExecutor):
else:
return "Resources requested: ?"
def resource_string(self) -> str:
"""Returns a string describing the total resources available."""
if self._resources_initialized:
res_str = ("{} CPUs, {} GPUs, "
"{} GiB heap, {} GiB objects".format(
self._avail_resources.cpu,
self._avail_resources.gpu,
_to_gb(self._avail_resources.memory),
_to_gb(self._avail_resources.object_store_memory)))
if self._avail_resources.custom_resources:
custom = ", ".join(
"{} {}".format(
self._avail_resources.get_res_total(name), name)
for name in self._avail_resources.custom_resources)
res_str += " ({})".format(custom)
return res_str
else:
return "? CPUs, ? GPUs"
def on_step_begin(self, trials: List[Trial]) -> None:
"""Before step() is called, update the available resources."""
self._update_avail_resources()
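For context, trial actors are now scheduled exclusively through Ray placement groups. A standalone sketch of that scheduling pattern, using a placeholder ``Worker`` actor rather than Tune's internals:

.. code-block:: python

    import ray
    from ray.util.placement_group import placement_group

    ray.init(num_cpus=2)

    @ray.remote
    class Worker:  # placeholder for the trainable actor
        def ping(self):
            return "pong"

    # Reserve all bundles up front, as the PlacementGroupManager does per trial.
    pg = placement_group([{"CPU": 1}, {"CPU": 1}], strategy="PACK")
    ray.get(pg.ready())

    # Pin the actor to the first bundle; child tasks stay inside the group.
    actor = Worker.options(
        placement_group=pg,
        placement_group_bundle_index=0,
        placement_group_capture_child_tasks=True,
        num_cpus=1).remote()
    print(ray.get(actor.ping.remote()))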


@ -119,8 +119,8 @@ class HyperBandForBOHB(HyperBandScheduler):
scrubbed = [b for b in hyperband if b is not None]
for bracket in scrubbed:
for trial in bracket.current_trials():
if (trial.status == Trial.PENDING
and trial_runner.has_resources_for_trial(trial)):
if (trial.status == Trial.PENDING and trial_runner.
trial_executor.has_resources_for_trial(trial)):
return trial
# MAIN CHANGE HERE!
if not any(t.status == Trial.RUNNING


@ -304,8 +304,8 @@ class HyperBandScheduler(FIFOScheduler):
for bracket in sorted(
scrubbed, key=lambda b: b.completion_percentage()):
for trial in bracket.current_trials():
if (trial.status == Trial.PENDING
and trial_runner.has_resources_for_trial(trial)):
if (trial.status == Trial.PENDING and trial_runner.
trial_executor.has_resources_for_trial(trial)):
return trial
return None


@ -640,7 +640,7 @@ class PopulationBasedTraining(FIFOScheduler):
candidates = []
for trial in trial_runner.get_trials():
if trial.status in [Trial.PENDING, Trial.PAUSED] and \
trial_runner.has_resources_for_trial(trial):
trial_runner.trial_executor.has_resources_for_trial(trial):
if not self._synch:
candidates.append(trial)
elif self._trial_state[trial].last_train_time < \


@ -319,19 +319,13 @@ class ResourceChangingScheduler(TrialScheduler):
trial: Trial, **kwargs):
# use the first trial resources as the base
if self._base_trial_resources is None:
if trial.uses_placement_groups:
self._base_trial_resources = trial.placement_group_factory
else:
self._base_trial_resources = trial.resources
self._base_trial_resources = trial.placement_group_factory
# Raise error if the resources of a newly added trial don't match
# base resources, but allow trials that have already had their
# resources changed by ResourceChangingScheduler
# (those can be added again during loading from a checkpoint)
elif trial.trial_id not in self._reallocated_trial_ids:
if trial.uses_placement_groups:
trial_resources = trial.placement_group_factory
else:
trial_resources = trial.resources
trial_resources = trial.placement_group_factory
if trial_resources != self._base_trial_resources:
raise RuntimeError(
"ResourceChangingScheduler doesn't support trials with "
@ -413,8 +407,7 @@ class ResourceChangingScheduler(TrialScheduler):
def set_trial_resources(
self, trial: Trial,
new_resources: Union[Dict, Callable, PlacementGroupFactory]
) -> bool:
new_resources: Union[Dict, PlacementGroupFactory]) -> bool:
"""Returns True if new_resources were set."""
if new_resources:
logger.info(f"Setting trial {trial} resource to {new_resources}")
@ -434,16 +427,15 @@ class ResourceChangingScheduler(TrialScheduler):
Only checks for PlacementGroupFactories at this moment.
"""
if trial.uses_placement_groups:
if (isinstance(new_resources, PlacementGroupFactory)
and trial.placement_group_factory == new_resources):
logger.debug(
f"{trial} PGF "
f"{trial.placement_group_factory.required_resources}"
f" and {new_resources.required_resources}"
f" are the same, skipping")
return True
return False
if (isinstance(new_resources, PlacementGroupFactory)
and trial.placement_group_factory == new_resources):
logger.debug(f"{trial} PGF "
f"{trial.placement_group_factory.required_resources}"
f" and {new_resources.required_resources}"
f" are the same, skipping")
return True
else:
return False
def reallocate_trial_resources_if_needed(
self, trial_runner: "trial_runner.TrialRunner", trial: Trial,


@ -140,11 +140,13 @@ class FIFOScheduler(TrialScheduler):
self, trial_runner: "trial_runner.TrialRunner") -> Optional[Trial]:
for trial in trial_runner.get_trials():
if (trial.status == Trial.PENDING
and trial_runner.has_resources_for_trial(trial)):
and trial_runner.trial_executor.has_resources_for_trial(
trial)):
return trial
for trial in trial_runner.get_trials():
if (trial.status == Trial.PAUSED
and trial_runner.has_resources_for_trial(trial)):
and trial_runner.trial_executor.has_resources_for_trial(
trial)):
return trial
return None


@ -253,8 +253,6 @@ class TrainableFunctionApiTest(unittest.TestCase):
self.assertEqual(trial.last_result[TIMESTEPS_TOTAL], steps)
def testBuiltInTrainableResources(self):
os.environ["TUNE_PLACEMENT_GROUP_AUTO_DISABLED"] = "1"
class B(Trainable):
@classmethod
def default_resource_request(cls, config):
@ -274,15 +272,56 @@ class TrainableFunctionApiTest(unittest.TestCase):
"gpu": gpus,
},
}
})[0]
}, )[0]
# Should all succeed
self.assertEqual(f(0, 0).status, Trial.TERMINATED)
# TODO(xwjiang): https://github.com/ray-project/ray/issues/19959
# self.assertEqual(f(0, 0).status, Trial.TERMINATED)
# Too large resource request
self.assertRaises(TuneError, lambda: f(100, 100))
self.assertRaises(TuneError, lambda: f(0, 100))
self.assertRaises(TuneError, lambda: f(100, 0))
# TODO(xwjiang): Make FailureInjectorCallback a test util.
class FailureInjectorCallback(Callback):
"""Adds random failure injection to the TrialExecutor."""
def __init__(self, steps=4):
self._step = 0
self.steps = steps
def on_step_begin(self, iteration, trials, **info):
self._step += 1
if self._step >= self.steps:
raise RuntimeError
def g(cpus, gpus):
return run_experiments(
{
"foo": {
"run": "B",
"config": {
"cpu": cpus,
"gpu": gpus,
},
}
},
callbacks=[FailureInjectorCallback()],
)[0]
# Too large resource requests are infeasible
# TODO(xwjiang): Throw TuneError after https://github.com/ray-project/ray/issues/19985. # noqa
os.environ["TUNE_WARN_INSUFFICENT_RESOURCE_THRESHOLD_S"] = "0"
with self.assertRaises(RuntimeError), patch.object(
ray.tune.trial_executor.logger, "warning") as warn_mock:
self.assertRaises(TuneError, lambda: g(100, 100))
warn_mock.assert_called_once()
with self.assertRaises(RuntimeError), patch.object(
ray.tune.trial_executor.logger, "warning") as warn_mock:
self.assertRaises(TuneError, lambda: g(0, 100))
warn_mock.assert_called_once()
with self.assertRaises(RuntimeError), patch.object(
ray.tune.trial_executor.logger, "warning") as warn_mock:
self.assertRaises(TuneError, lambda: g(100, 0))
warn_mock.assert_called_once()
def testRewriteEnv(self):
def train(config, reporter):


@ -110,7 +110,6 @@ def start_connected_emptyhead_cluster():
def test_counting_resources(start_connected_cluster):
"""Tests that Tune accounting is consistent with actual cluster."""
os.environ["TUNE_PLACEMENT_GROUP_AUTO_DISABLED"] = "1"
cluster = start_connected_cluster
nodes = []
@ -122,7 +121,7 @@ def test_counting_resources(start_connected_cluster):
for t in trials:
runner.add_trial(t)
runner.step() # run 1
runner.step()
running_trials = _get_running_trials(runner)
assert len(running_trials) == 1
assert _check_trial_running(running_trials[0])
@ -133,7 +132,8 @@ def test_counting_resources(start_connected_cluster):
cluster.remove_node(nodes.pop())
cluster.wait_for_nodes()
assert ray.cluster_resources()["CPU"] == 1
runner.step() # run 2
runner.step()
# Only 1 trial can be running due to resource limitation.
assert sum(t.status == Trial.RUNNING for t in runner.get_trials()) == 1
for i in range(5):
@ -141,7 +141,11 @@ def test_counting_resources(start_connected_cluster):
cluster.wait_for_nodes()
assert ray.cluster_resources()["CPU"] == 6
runner.step() # 1 result
# This is to make sure that pg is ready for the previous pending trial,
# so that when runner.step() is called next, the trial can be started in
# the same event loop.
time.sleep(5)
runner.step()
assert sum(t.status == Trial.RUNNING for t in runner.get_trials()) == 2
@ -311,22 +315,18 @@ def test_trial_migration(start_connected_emptyhead_cluster, trainable_id):
@pytest.mark.parametrize("trainable_id", ["__fake", "__fake_durable"])
@pytest.mark.parametrize("with_pg", [True, False])
def test_trial_requeue(start_connected_emptyhead_cluster, trainable_id,
with_pg):
def test_trial_requeue(start_connected_emptyhead_cluster, trainable_id):
"""Removing a node in full cluster causes Trial to be requeued."""
os.environ["TUNE_MAX_PENDING_TRIALS_PG"] = "1"
if not with_pg:
os.environ["TUNE_PLACEMENT_GROUP_AUTO_DISABLED"] = "1"
cluster = start_connected_emptyhead_cluster
node = cluster.add_node(num_cpus=1)
cluster.wait_for_nodes()
syncer_callback = _PerTrialSyncerCallback(
lambda trial: trial.trainable_name == "__fake")
runner = TrialRunner(BasicVariantGenerator(), callbacks=[syncer_callback])
runner = TrialRunner(
BasicVariantGenerator(), callbacks=[syncer_callback]) # noqa
kwargs = {
"stopping_criterion": {
"training_iteration": 5
@ -357,11 +357,6 @@ def test_trial_requeue(start_connected_emptyhead_cluster, trainable_id,
assert all(
t.status == Trial.PENDING for t in trials), runner.debug_string()
if not with_pg:
# Only raises if placement groups are not used
with pytest.raises(TuneError):
runner.step()
@pytest.mark.parametrize("trainable_id", ["__fake_remote", "__fake_durable"])
def test_migration_checkpoint_removal(start_connected_emptyhead_cluster,


@ -20,7 +20,7 @@ from ray.tune.utils.placement_groups import PlacementGroupFactory
class Trial(
namedtuple("MockTrial", [
"config", "trial_id", "trial_name", "trainable_name",
"uses_placement_groups", "placement_group_factory"
"placement_group_factory"
])):
def __hash__(self):
return hash(self.trial_id)
@ -89,7 +89,7 @@ class WandbIntegrationTest(unittest.TestCase):
def testWandbLegacyLoggerConfig(self):
trial_config = {"par1": 4, "par2": 9.12345678}
trial = Trial(trial_config, 0, "trial_0", "trainable", True,
trial = Trial(trial_config, 0, "trial_0", "trainable",
PlacementGroupFactory([{
"CPU": 1
}]))
@ -175,7 +175,7 @@ class WandbIntegrationTest(unittest.TestCase):
def testWandbLegacyLoggerReporting(self):
trial_config = {"par1": 4, "par2": 9.12345678}
trial = Trial(trial_config, 0, "trial_0", "trainable", True,
trial = Trial(trial_config, 0, "trial_0", "trainable",
PlacementGroupFactory([{
"CPU": 1
}]))
@ -210,7 +210,7 @@ class WandbIntegrationTest(unittest.TestCase):
def testWandbLoggerConfig(self):
trial_config = {"par1": 4, "par2": 9.12345678}
trial = Trial(trial_config, 0, "trial_0", "trainable", True,
trial = Trial(trial_config, 0, "trial_0", "trainable",
PlacementGroupFactory([{
"CPU": 1
}]))
@ -288,7 +288,7 @@ class WandbIntegrationTest(unittest.TestCase):
def testWandbLoggerReporting(self):
trial_config = {"par1": 4, "par2": 9.12345678}
trial = Trial(trial_config, 0, "trial_0", "trainable", True,
trial = Trial(trial_config, 0, "trial_0", "trainable",
PlacementGroupFactory([{
"CPU": 1
}]))
@ -320,7 +320,7 @@ class WandbIntegrationTest(unittest.TestCase):
def testWandbMixinConfig(self):
config = {"par1": 4, "par2": 9.12345678}
trial = Trial(config, 0, "trial_0", "trainable", True,
trial = Trial(config, 0, "trial_0", "trainable",
PlacementGroupFactory([{
"CPU": 1
}]))
@ -381,7 +381,7 @@ class WandbIntegrationTest(unittest.TestCase):
def testWandbDecoratorConfig(self):
config = {"par1": 4, "par2": 9.12345678}
trial = Trial(config, 0, "trial_0", "trainable", True,
trial = Trial(config, 0, "trial_0", "trainable",
PlacementGroupFactory([{
"CPU": 1
}]))


@ -363,37 +363,6 @@ class RayExecutorPlacementGroupTest(unittest.TestCase):
self.cluster.shutdown()
_register_all() # re-register the evicted objects
def testResourcesAvailableNoPlacementGroup(self):
def train(config):
tune.report(metric=0, resources=ray.available_resources())
out = tune.run(
train,
resources_per_trial={
"cpu": 1,
"gpu": 1,
"custom_resources": {
"custom": 3
},
"extra_cpu": 3,
"extra_gpu": 1,
"extra_custom_resources": {
"custom": 4
},
})
# Only `cpu`, `gpu`, and `custom_resources` will be "really" reserved,
# the extra_* will just be internally reserved by Tune.
self.assertDictEqual({
key: val
for key, val in out.trials[0].last_result["resources"].items()
if key in ["CPU", "GPU", "custom"]
}, {
"CPU": self.head_cpus - 1.0,
"GPU": self.head_gpus - 1.0,
"custom": self.head_custom - 3.0
})
def testResourcesAvailableWithPlacementGroup(self):
def train(config):
tune.report(metric=0, resources=ray.available_resources())


@ -47,12 +47,6 @@ class TestTrialExecutorInheritance(unittest.TestCase):
def get_running_trials(self):
return []
def has_resources(self):
return False
def resource_string(self):
return "This is a resource string."
msg = ("_MyTrialExecutor inherits from TrialExecutor, which is being "
"deprecated. "
"RFC: https://github.com/ray-project/ray/issues/17593. "


@ -8,11 +8,12 @@ from ray.rllib import _register_all
from ray import tune
from ray.tune import TuneError, register_trainable
from ray.tune.ray_trial_executor import RayTrialExecutor
from ray.tune.resources import Resources
from ray.tune.schedulers import TrialScheduler, FIFOScheduler
from ray.tune.suggest import BasicVariantGenerator
from ray.tune.trial import Trial
from ray.tune.trial_runner import TrialRunner
from ray.tune.resources import Resources
from ray.tune.suggest import BasicVariantGenerator
from ray.tune.utils.placement_groups import PlacementGroupFactory
class TrialRunnerTest(unittest.TestCase):
@ -77,7 +78,12 @@ class TrialRunnerTest(unittest.TestCase):
"stopping_criterion": {
"training_iteration": 1
},
"resources": Resources(cpu=1, gpu=0, extra_cpu=3, extra_gpu=1),
"placement_group_factory": PlacementGroupFactory([{
"CPU": 1
}, {
"CPU": 3,
"GPU": 1
}]),
}
trials = [Trial("__fake", **kwargs), Trial("__fake", **kwargs)]
for t in trials:
@ -98,7 +104,10 @@ class TrialRunnerTest(unittest.TestCase):
"stopping_criterion": {
"training_iteration": 1
},
"resources": Resources(cpu=1, gpu=0, custom_resources={"a": 2}),
"placement_group_factory": PlacementGroupFactory([{
"CPU": 1,
"a": 2
}]),
}
trials = [Trial("__fake", **kwargs), Trial("__fake", **kwargs)]
for t in trials:
@ -118,8 +127,11 @@ class TrialRunnerTest(unittest.TestCase):
"stopping_criterion": {
"training_iteration": 1
},
"resources": Resources(
cpu=1, gpu=0, extra_custom_resources={"a": 2}),
"placement_group_factory": PlacementGroupFactory([{
"CPU": 1
}, {
"a": 2
}]),
}
trials = [Trial("__fake", **kwargs), Trial("__fake", **kwargs)]
for t in trials:
@ -134,18 +146,6 @@ class TrialRunnerTest(unittest.TestCase):
self.assertEqual(trials[0].status, Trial.TERMINATED)
self.assertEqual(trials[1].status, Trial.PENDING)
def testCustomResources2(self):
ray.init(num_cpus=4, num_gpus=2, resources={"a": 2})
runner = TrialRunner()
resource1 = Resources(cpu=1, gpu=0, extra_custom_resources={"a": 2})
self.assertTrue(runner.has_resources(resource1))
resource2 = Resources(cpu=1, gpu=0, custom_resources={"a": 2})
self.assertTrue(runner.has_resources(resource2))
resource3 = Resources(cpu=1, gpu=0, custom_resources={"a": 3})
self.assertFalse(runner.has_resources(resource3))
resource4 = Resources(cpu=1, gpu=0, extra_custom_resources={"a": 3})
self.assertFalse(runner.has_resources(resource4))
def testFractionalGpus(self):
ray.init(num_cpus=4, num_gpus=1)
runner = TrialRunner()
@ -261,7 +261,6 @@ class TrialRunnerTest(unittest.TestCase):
def testChangeResources(self):
"""Checks that resource requirements can be changed on fly."""
os.environ["TUNE_PLACEMENT_GROUP_AUTO_DISABLED"] = "1"
ray.init(num_cpus=2)
class ChangingScheduler(FIFOScheduler):
@ -286,13 +285,17 @@ class TrialRunnerTest(unittest.TestCase):
runner.step()
self.assertEqual(trials[0].status, Trial.RUNNING)
self.assertEqual(runner.trial_executor._committed_resources.cpu, 1)
self.assertEqual(
runner.trial_executor._pg_manager.occupied_resources().get("CPU"),
1)
self.assertRaises(
ValueError, lambda: trials[0].update_resources(dict(cpu=2, gpu=0)))
runner.step()
self.assertEqual(trials[0].status, Trial.RUNNING)
self.assertEqual(runner.trial_executor._committed_resources.cpu, 2)
self.assertEqual(
runner.trial_executor._pg_manager.occupied_resources().get("CPU"),
2)
def testQueueFilling(self):
os.environ["TUNE_MAX_PENDING_TRIALS_PG"] = "1"

View file

@ -1,7 +1,6 @@
import os
import sys
import unittest
from unittest.mock import patch
import ray
from ray.rllib import _register_all
@ -133,43 +132,6 @@ class TrialRunnerTest2(unittest.TestCase):
self.assertEqual(len(searchalg.errored_trials), 0)
self.assertEqual(len(scheduler.errored_trials), 0)
def testFailureRecoveryNodeRemoval(self):
# Node removal simulation only works with resource requests
os.environ["TUNE_PLACEMENT_GROUP_AUTO_DISABLED"] = "1"
ray.init(num_cpus=1, num_gpus=1)
searchalg, scheduler = create_mock_components()
runner = TrialRunner(searchalg, scheduler=scheduler)
kwargs = {
"resources": Resources(cpu=1, gpu=1),
"checkpoint_freq": 1,
"max_failures": 1,
"config": {
"mock_error": True,
},
}
runner.add_trial(Trial("__fake", **kwargs))
trials = runner.get_trials()
with patch("ray.cluster_resources") as resource_mock:
resource_mock.return_value = {"CPU": 1, "GPU": 1}
runner.step() # Start trial
self.assertEqual(trials[0].status, Trial.RUNNING)
runner.step() # Process result, dispatch save
runner.step() # Process save
self.assertEqual(trials[0].status, Trial.RUNNING)
# Mimic a node failure
resource_mock.return_value = {"CPU": 0, "GPU": 0}
runner.step() # Detect node failure
self.assertEqual(trials[0].status, Trial.PENDING)
self.assertEqual(trials[0].num_failures, 1)
self.assertEqual(len(searchalg.errored_trials), 0)
self.assertEqual(len(scheduler.errored_trials), 1)
def testFailureRecoveryMaxFailures(self):
ray.init(num_cpus=1, num_gpus=1)
runner = TrialRunner()


@ -244,18 +244,16 @@ class TrialRunnerPlacementGroupTest(unittest.TestCase):
self.testPlacementGroupDistributedTraining(reuse_actors=True)
class PlacementGroupNoAutoSetupTest(unittest.TestCase):
def testPlacementGroupNoCPUDriver(self):
"""Bundles with only GPU:1 but no CPU should work"""
ray.init(num_gpus=1, num_cpus=1)
def test_placement_group_no_cpu_trainer():
"""Bundles with only GPU:1 but no CPU should work"""
ray.init(num_gpus=1, num_cpus=1)
pgf = PlacementGroupFactory([{"GPU": 1, "CPU": 0}, {"CPU": 1}])
pgf = PlacementGroupFactory([{"GPU": 1, "CPU": 0}, {"CPU": 1}])
def train(config):
time.sleep(1)
return 5
def train(config):
time.sleep(1)
return 5
tune.run(train, resources_per_trial=pgf)
tune.run(train, resources_per_trial=pgf)
if __name__ == "__main__":


@ -255,11 +255,8 @@ class _MockTrialExecutor(TrialExecutor):
def get_running_trials(self):
return []
def has_resources(self):
return False
def resource_string(self):
return "This is a mock resource_string."
def has_resources_for_trial(self, trial: Trial):
return True
class _MockTrialRunner():
@ -292,12 +289,6 @@ class _MockTrialRunner():
def get_trials(self):
return self.trials
def has_resources_for_trial(self, trial):
return True
def has_resources(self, resources):
return True
def _pause_trial(self, trial):
self.trial_executor.save(trial, Checkpoint.MEMORY, None)
trial.status = Trial.PAUSED


@ -212,7 +212,9 @@ class TuneFailResumeGridTest(unittest.TestCase):
if not self._checked and iteration >= self._check_after:
for trial in trials:
if trial.status == Trial.PENDING:
assert trial.resources.cpu == self._expected_cpu
assert (trial.
placement_group_factory.required_resources.get(
"CPU", 0) == self._expected_cpu)
self._checked = True
def setUp(self):


@ -161,11 +161,8 @@ class Trainable:
@classmethod
def default_resource_request(cls, config):
return Resources(
cpu=0,
gpu=0,
extra_cpu=config["workers"],
extra_gpu=int(config["use_gpu"]) * config["workers"])
return PlacementGroupFactory([{"CPU": 1}, {"CPU": 1}])
Args:
config (Dict[str, Any]): The Trainable's config dict.
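A hedged sketch of a custom Trainable overriding ``default_resource_request`` in the new style; ``MyTrainable`` and its config keys are hypothetical:

.. code-block:: python

    from ray.tune import Trainable
    from ray.tune.utils.placement_groups import PlacementGroupFactory

    class MyTrainable(Trainable):  # hypothetical example class
        @classmethod
        def default_resource_request(cls, config):
            # One bundle for the trainable plus one bundle per worker,
            # replacing the legacy extra_cpu/extra_gpu request shown above.
            return PlacementGroupFactory(
                [{"CPU": 1}] + [{
                    "CPU": 1,
                    "GPU": int(config.get("use_gpu", False))
                }] * config.get("workers", 0))

        def step(self):
            return {"done": True}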


@ -8,7 +8,7 @@ import platform
import re
import shutil
import time
from typing import Callable, Dict, Optional, Sequence, Union
from typing import Dict, Optional, Sequence, Union
import uuid
import ray
@ -22,14 +22,12 @@ from ray.tune.checkpoint_manager import Checkpoint, CheckpointManager
from ray.tune.registry import get_trainable_cls, validate_trainable
from ray.tune.result import (DEFAULT_RESULTS_DIR, DONE, NODE_IP, PID,
TRAINING_ITERATION, TRIAL_ID, DEBUG_METRICS)
from ray.tune.resources import Resources, \
json_to_resources, resources_to_json
from ray.tune.resources import Resources
from ray.tune.utils.placement_groups import PlacementGroupFactory, \
resource_dict_to_pg_factory
from ray.tune.utils.serialization import TuneFunctionEncoder
from ray.tune.utils.trainable import TrainableUtil
from ray.tune.utils import date_str, flatten_dict
from ray.util import log_once
from ray.util.annotations import DeveloperAPI
from ray._private.utils import binary_to_hex, hex_to_binary
@ -133,10 +131,7 @@ class TrialInfo:
def __init__(self, trial: "Trial"):
self._trial_name = str(trial)
self._trial_id = trial.trial_id
if trial.uses_placement_groups:
self._trial_resources = trial.placement_group_factory
else:
self._trial_resources = trial.resources
self._trial_resources = trial.placement_group_factory
@property
def trial_name(self):
@ -169,6 +164,22 @@ def create_logdir(dirname, local_dir):
return logdir
def _to_pg_factory(resources: Optional[Resources],
placement_group_factory: Optional[PlacementGroupFactory]
) -> PlacementGroupFactory:
"""Outputs resources requirement in the form of PGF.
In case that `placement_group_factory` is None, `resources` will be
converted to PGF. If this is unsuccessful, an error will be raised.
"""
if not placement_group_factory:
if not resources:
resources = Resources(cpu=1, gpu=0)
placement_group_factory = resource_dict_to_pg_factory(resources)
return placement_group_factory
@DeveloperAPI
class Trial:
"""A trial object holds the state for one model training run.
@ -179,6 +190,10 @@ class Trial:
Trials start in the PENDING state, and transition to RUNNING once started.
On error it transitions to ERROR, otherwise TERMINATED on success.
There are resources allocated to each trial. It's preferred that resources
are specified using PlacementGroupFactory, rather than through Resources,
which is being deprecated.
Attributes:
trainable_name (str): Name of the trainable object to be executed.
config (dict): Provided configuration dictionary with evaluated params.
@ -186,8 +201,7 @@ class Trial:
local_dir (str): Local_dir as passed to tune.run.
logdir (str): Directory where the trial logs are saved.
evaluated_params (dict): Evaluated parameters by search algorithm,
experiment_tag (str): Identifying trial name to show in the console.
resources (Resources): Amount of resources that this trial will use.
experiment_tag (str): Identifying trial name to show in the console
status (str): One of PENDING, RUNNING, PAUSED, TERMINATED, ERROR.
error_file (str): Path to the errors that this trial has raised.
@ -198,6 +212,7 @@ class Trial:
"best_result",
"param_config",
"extra_arg",
"placement_group_factory",
]
PENDING = "PENDING"
@ -248,7 +263,7 @@ class Trial:
self.config = config or {}
self.local_dir = local_dir # This remains unexpanded for syncing.
#: Parameters that Tune varies across searches.
# Parameters that Tune varies across searches.
self.evaluated_params = evaluated_params or {}
self.experiment_tag = experiment_tag
trainable_cls = self.get_trainable_cls()
@ -266,20 +281,16 @@ class Trial:
"clear the `resources_per_trial` option.".format(
trainable_cls, default_resources))
# New way: Trainable returns a PlacementGroupFactory object.
if isinstance(default_resources, PlacementGroupFactory):
placement_group_factory = default_resources
resources = None
# Set placement group factory to None for backwards
# compatibility.
else:
placement_group_factory = None
resources = default_resources
self.location = Location()
self.resources = resources or Resources(cpu=1, gpu=0)
self.placement_group_factory = placement_group_factory
self._setup_resources()
self.placement_group_factory = _to_pg_factory(resources,
placement_group_factory)
self.stopping_criterion = stopping_criterion or {}
@ -362,37 +373,6 @@ class Trial:
self._state_json = None
self._state_valid = False
def _setup_resources(self, log_always: bool = False):
"""Set up resource and placement group requirements.
This will try to convert the resource request in ``self.resources``
to a placement group factory object. If this is unsuccessful,
placement groups will not be used.
Args:
log_always (bool): If True, this will always log a warning if
conversion from a resource dict to a placement group
definition was unsuccessful (e.g. when passing ``extra_``
requests).
"""
if not self.placement_group_factory and \
not int(os.getenv("TUNE_PLACEMENT_GROUP_AUTO_DISABLED", "0")):
try:
self.placement_group_factory = resource_dict_to_pg_factory(
self.resources)
except ValueError as exc:
if log_always or log_once("tune_pg_extra_resources"):
logger.warning(exc)
self.placement_group_factory = None
# Set placement group factory flag to True in Resources object.
if self.placement_group_factory:
resource_kwargs = self.resources._asdict()
resource_kwargs["has_placement_group"] = True
self.resources = Resources(**resource_kwargs)
def _get_default_result_or_future(self) -> Optional[dict]:
"""Calls ray.get on self._default_result_or_future and assigns back.
@ -470,10 +450,6 @@ class Trial:
logdir_name = os.path.basename(self.logdir)
return os.path.join(self.remote_checkpoint_dir_prefix, logdir_name)
@property
def uses_placement_groups(self):
return bool(self.placement_group_factory)
@property
def uses_cloud_checkpointing(self):
return bool(self.remote_checkpoint_dir)
@ -487,7 +463,6 @@ class Trial:
trainable_cls = self.get_trainable_cls()
clear_resources = (trainable_cls and
trainable_cls.default_resource_request(self.config))
resources = self.resources if not clear_resources else None
placement_group_factory = (self.placement_group_factory
if not clear_resources else None)
@ -498,7 +473,7 @@ class Trial:
local_dir=self.local_dir,
evaluated_params=self.evaluated_params,
experiment_tag=self.experiment_tag,
resources=resources,
resources=None,
placement_group_factory=placement_group_factory,
stopping_criterion=self.stopping_criterion,
remote_checkpoint_dir=self.remote_checkpoint_dir,
@ -523,8 +498,7 @@ class Trial:
os.makedirs(self.logdir, exist_ok=True)
self.invalidate_json_state()
def update_resources(
self, resources: Union[Dict, Callable, PlacementGroupFactory]):
def update_resources(self, resources: Union[Dict, PlacementGroupFactory]):
"""EXPERIMENTAL: Updates the resource requirements.
Should only be called when the trial is not running.
@ -535,12 +509,14 @@ class Trial:
if self.status is Trial.RUNNING:
raise ValueError("Cannot update resources while Trial is running.")
placement_group_factory = None
if isinstance(resources, PlacementGroupFactory):
self.placement_group_factory = resources
placement_group_factory = resources
else:
self.resources = Resources(**resources)
resources = Resources(**resources)
self._setup_resources()
self.placement_group_factory = _to_pg_factory(resources,
placement_group_factory)
self.invalidate_json_state()
@ -774,7 +750,6 @@ class Trial:
Note this can only occur if the trial holds a PERSISTENT checkpoint.
"""
state = self.__dict__.copy()
state["resources"] = resources_to_json(self.resources)
for key in self._nonjson_fields:
state[key] = binary_to_hex(cloudpickle.dumps(state.get(key)))
@ -792,7 +767,6 @@ class Trial:
return copy.deepcopy(state)
def __setstate__(self, state):
state["resources"] = json_to_resources(state["resources"])
if state["status"] == Trial.RUNNING:
state["status"] = Trial.PENDING


@ -11,7 +11,6 @@ import ray
from ray.tune.resources import Resources
from ray.util.annotations import DeveloperAPI
from ray.tune.trial import Trial, Checkpoint
from ray.tune.error import TuneError
from ray.tune.cluster_info import is_ray_cluster
logger = logging.getLogger(__name__)
@ -25,11 +24,8 @@ def _get_cluster_resources_no_autoscaler() -> Dict:
def _get_trial_cpu_and_gpu(trial: Trial) -> Dict:
cpu = trial.resources.cpu + trial.resources.extra_cpu
gpu = trial.resources.gpu + trial.resources.extra_gpu
if trial.placement_group_factory is not None:
cpu = trial.placement_group_factory.required_resources.get("CPU", 0)
gpu = trial.placement_group_factory.required_resources.get("GPU", 0)
cpu = trial.placement_group_factory.required_resources.get("CPU", 0)
gpu = trial.placement_group_factory.required_resources.get("GPU", 0)
return {"CPU": cpu, "GPU": gpu}
@ -329,24 +325,6 @@ class TrialExecutor(metaclass=_WarnOnDirectInheritanceMeta):
providing TrialRunner directly here.
"""
self._may_warn_insufficient_resources(trials)
for trial in trials:
if trial.uses_placement_groups:
return
# TODO(xwjiang): The rest should be gone in a follow up PR
# to remove non-pg case.
if trial.status == Trial.PENDING:
if not self.has_resources_for_trial(trial):
resource_string = trial.resources.summary_string()
trial_resource_help_msg = trial.get_trainable_cls(
).resource_help(trial.config)
raise TuneError(
"Insufficient cluster resources to launch trial: "
f"trial requested {resource_string}, but the cluster "
f"has only {self.resource_string()}. "
f"{trial_resource_help_msg} ")
elif trial.status == Trial.PAUSED:
raise TuneError("There are paused trials, but no more pending "
"trials with sufficient resources.")
@abstractmethod
def get_next_available_trial(self) -> Optional[Trial]:
@ -383,11 +361,6 @@ class TrialExecutor(metaclass=_WarnOnDirectInheritanceMeta):
"""Returns a human readable message for printing to the console."""
pass
@abstractmethod
def resource_string(self) -> str:
"""Returns a string describing the total resources available."""
pass
@abstractmethod
def restore(self,
trial: Trial,


@ -694,7 +694,6 @@ class TrialRunner:
# This will contain the next trial to start
next_trial = self._get_next_trial() # blocking
# Create pending trials. If the queue was updated before, only
# continue updating if this was successful (next_trial is not None)
if not self._updated_queue or (self._updated_queue and next_trial):
@ -807,14 +806,6 @@ class TrialRunner:
]
return delim.join(messages)
def has_resources_for_trial(self, trial: "Trial"):
"""Returns whether this runner has at least the specified resources."""
return self.trial_executor.has_resources_for_trial(trial)
def has_resources(self, resources):
"""Returns whether this runner has at least the specified resources."""
return self.trial_executor.has_resources(resources)
def _stop_experiment_if_needed(self):
"""Stops all trials."""
fail_fast = self._fail_fast and self._has_errored
@ -1118,7 +1109,6 @@ class TrialRunner:
"""
logger.debug("Trial %s: Processing trial save.", trial)
checkpoint_value = None
try:
results = self.trial_executor.fetch_result(trial)
checkpoint_value = results[-1]
@ -1245,7 +1235,6 @@ class TrialRunner:
trial.clear_checkpoint()
self.trial_executor.stop_trial(
trial, error=error_msg is not None, error_msg=error_msg)
if self.trial_executor.has_resources_for_trial(trial):
requeue_trial = False
logger.info(


@ -3,7 +3,7 @@ from inspect import signature
import json
import os
import time
from typing import Dict, List, Optional, Set, TYPE_CHECKING, Tuple
from typing import Dict, List, Optional, Set, TYPE_CHECKING, Tuple, Union
import uuid
import ray
@ -88,6 +88,21 @@ class PlacementGroupFactory:
could be used e.g. if you had one learner running in the main trainable
that schedules two remote workers that need access to 2 CPUs each.
If the trainable itself doesn't require resources, you can specify it as:
.. code-block:: python
from ray import tune
tune.run(
train,
resources_per_trial=tune.PlacementGroupFactory([
{},
{"CPU": 2},
{"CPU": 2},
], strategy="PACK"))
Args:
bundles(List[Dict]): A list of bundles which
represent the resources requirements.
@ -104,10 +119,19 @@ class PlacementGroupFactory:
"""
def __init__(self,
bundles: List[Dict[str, float]],
bundles: List[Dict[str, Union[int, float]]],
strategy: str = "PACK",
*args,
**kwargs):
assert len(bundles) > 0, (
"Cannot initialize a PlacementGroupFactory with zero bundles.")
if not bundles[0]:
# This is when trainable itself doesn't need resources.
self._head_bundle_is_empty = True
bundles.pop(0)
else:
self._head_bundle_is_empty = False
self._bundles = [{k: float(v)
for k, v in bundle.items()} for bundle in bundles]
self._strategy = strategy
@ -120,8 +144,18 @@ class PlacementGroupFactory:
self._bind()
@property
def head_cpus(self):
return self._bundles[0].get("CPU", None)
def head_bundle_is_empty(self):
"""Returns True if head bundle is empty while child bundles
need resources.
This is considered an internal API within Tune.
"""
return self._head_bundle_is_empty
@property
def head_cpus(self) -> float:
return 0.0 if self._head_bundle_is_empty else self._bundles[0].get(
"CPU", 0.0)
@property
def required_resources(self) -> Dict[str, float]:
@ -428,29 +462,31 @@ class PlacementGroupManager:
self._in_use_pgs[pg] = trial
self._in_use_trials[trial] = pg
# We still have to pass resource specs
# Pass the full resource specs of the first bundle per default
first_bundle = pg.bundle_specs[0].copy()
num_cpus = first_bundle.pop("CPU", None)
num_gpus = first_bundle.pop("GPU", None)
# Only custom resources remain in `first_bundle`
resources = first_bundle or None
if num_cpus is None:
# If the placement group specifically set the number
# of CPUs to 0, use this.
num_cpus = pgf.head_cpus
logger.debug(f"For trial {trial} use pg {pg.id}")
return actor_cls.options(
placement_group=pg,
placement_group_bundle_index=0,
placement_group_capture_child_tasks=True,
num_cpus=num_cpus,
num_gpus=num_gpus,
resources=resources)
# We still have to pass resource specs
if not pgf.head_bundle_is_empty:
# Pass the full resource specs of the first bundle per default
head_bundle = pg.bundle_specs[0].copy()
num_cpus = head_bundle.pop("CPU", 0)
num_gpus = head_bundle.pop("GPU", 0)
# Only custom resources remain in `head_bundle`
resources = head_bundle
return actor_cls.options(
placement_group=pg,
placement_group_bundle_index=0,
placement_group_capture_child_tasks=True,
num_cpus=num_cpus,
num_gpus=num_gpus,
resources=resources)
else:
return actor_cls.options(
placement_group=pg,
placement_group_capture_child_tasks=True,
num_cpus=0,
num_gpus=0,
resources={})
def has_ready(self, trial: "Trial", update: bool = False) -> bool:
"""Return True if placement group for trial is ready.
@ -495,9 +531,6 @@ class PlacementGroupManager:
no placement group was replaced.
"""
if not trial.uses_placement_groups:
return None
pgf = trial.placement_group_factory
staged_pg = self._unstage_unused_pg(pgf)
@ -569,8 +602,6 @@ class PlacementGroupManager:
Args:
trial (Trial): Return placement group of this trial.
"""
if not trial.uses_placement_groups:
return
pg = self._in_use_trials.pop(trial)
self._in_use_pgs.pop(pg)
@ -651,9 +682,6 @@ class PlacementGroupManager:
# Count number of expected placement groups
pgf_expected: Dict[PlacementGroupFactory, int] = defaultdict(int)
for trial in trials:
if not trial.uses_placement_groups:
continue
# Count in-use placement groups
if trial in self._in_use_trials:
current_counts[trial.placement_group_factory] += 1
@ -703,37 +731,3 @@ class PlacementGroupManager:
resources[key] = resources.get(key, 0) + val
return resources
def total_used_resources(self, committed_resources: Resources) -> dict:
"""Dict of total used resources incl. placement groups
Args:
committed_resources (Resources): Additional commited resources
from (legacy) Ray Tune resource management.
"""
committed = committed_resources._asdict()
# Make dict compatible with pg resource dict
committed.pop("has_placement_group", None)
committed["CPU"] = committed.pop("cpu", 0) + committed.pop(
"extra_cpu", 0)
committed["GPU"] = committed.pop("gpu", 0) + committed.pop(
"extra_gpu", 0)
committed["memory"] += committed.pop("extra_memory", 0.)
committed["object_store_memory"] += committed.pop(
"extra_object_store_memory", 0.)
custom = committed.pop("custom_resources", {})
extra_custom = committed.pop("extra_custom_resources", {})
for k, v in extra_custom.items():
custom[k] = custom.get(k, 0.) + v
committed.update(custom)
pg_resources = self.occupied_resources()
for k, v in committed.items():
pg_resources[k] = pg_resources.get(k, 0.) + v
return pg_resources
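The head-bundle handling added above can be summarized with a small example; the expected values follow from the ``__init__``, ``head_cpus`` and ``required_resources`` logic shown in this diff:

.. code-block:: python

    from ray.tune.utils.placement_groups import PlacementGroupFactory

    # Empty head bundle: the trainable itself reserves nothing.
    workers_only = PlacementGroupFactory([{}, {"CPU": 2}, {"CPU": 2}])
    print(workers_only.head_bundle_is_empty)  # True
    print(workers_only.head_cpus)             # 0.0
    print(workers_only.required_resources)    # {"CPU": 4.0}

    # Non-empty head bundle: the trainable keeps one CPU for itself.
    with_head = PlacementGroupFactory([{"CPU": 1}, {"CPU": 2}])
    print(with_head.head_bundle_is_empty)     # False
    print(with_head.head_cpus)                # 1.0
    print(with_head.required_resources)       # {"CPU": 3.0}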


@ -5,7 +5,7 @@ import pickle
import ray
from ray.tune import Trainable
from ray.tune.resources import Resources
from ray.tune.utils.placement_groups import PlacementGroupFactory
from ray.util.annotations import PublicAPI
from ray.util.sgd.tf.tf_runner import TFRunner
@ -164,11 +164,10 @@ class TFTrainer:
class TFTrainable(Trainable):
@classmethod
def default_resource_request(cls, config):
return Resources(
cpu=0,
gpu=0,
extra_cpu=config["num_replicas"],
extra_gpu=int(config["use_gpu"]) * config["num_replicas"])
return PlacementGroupFactory([{}] + [{
"CPU": 1,
"GPU": int(config["use_gpu"])
}] * config["num_replicas"])
def setup(self, config):
self._trainer = TFTrainer(


@ -1,6 +1,7 @@
from sklearn import datasets
from sklearn.model_selection import train_test_split
from ray.tune.utils.placement_groups import PlacementGroupFactory
from ray.util.xgboost import RayDMatrix, RayParams, train
# __train_begin__
@ -68,10 +69,11 @@ def main():
metric="eval-error",
mode="min",
num_samples=4,
resources_per_trial={
"cpu": 1,
"extra_cpu": num_actors * num_cpus_per_actor
})
resources_per_trial=PlacementGroupFactory([{
"CPU": 1.0
}] + [{
"CPU": float(num_cpus_per_actor)
}] * num_actors))
# Load in the best performing model.
best_bst = load_best_model(analysis.best_logdir)


@ -8,7 +8,6 @@ collection and policy optimization.
import argparse
import gym
import numpy as np
import os
import ray
from ray import tune
@ -16,6 +15,7 @@ from ray.rllib.evaluation import RolloutWorker
from ray.rllib.evaluation.metrics import collect_metrics
from ray.rllib.policy.policy import Policy
from ray.rllib.policy.sample_batch import DEFAULT_POLICY_ID, SampleBatch
from ray.tune.utils.placement_groups import PlacementGroupFactory
parser = argparse.ArgumentParser()
parser.add_argument("--gpu", action="store_true")
@ -108,12 +108,12 @@ if __name__ == "__main__":
tune.run(
training_workflow,
resources_per_trial={
"gpu": 1 if args.gpu
or int(os.environ.get("RLLIB_FORCE_NUM_GPUS", 0)) else 0,
"cpu": 1,
"extra_cpu": args.num_workers,
},
resources_per_trial=PlacementGroupFactory(([{
"CPU": 1,
"GPU": 1 if args.gpu else 0
}] + [{
"CPU": 1
}] * args.num_workers)),
config={
"num_workers": args.num_workers,
"num_iters": args.num_iters,