[tune/structure] Introduce execution package (#26015)

Execution-specific modules are moved to the new ray.tune.execution package. The old import locations are kept as shim modules that re-export from the new locations and emit a DeprecationWarning.

Co-authored-by: Xiaowei Jiang <xwjiang2010@gmail.com>
Kai Fricke 2022-06-23 11:13:19 +01:00 committed by GitHub
parent caa3868570
commit 0959f44b6f
59 changed files with 3777 additions and 3668 deletions
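For orientation, every import change in this diff follows the same pattern; a minimal sketch of the old and new paths (module names taken from the hunks below, the surrounding snippet is illustrative):

# Old locations (kept as deprecated shims by this PR):
#   from ray.tune.utils.placement_groups import PlacementGroupFactory
#   from ray.tune.trial_runner import TrialRunner
#   from ray.tune.ray_trial_executor import RayTrialExecutor

# New locations under the execution package:
from ray.tune.execution.placement_groups import PlacementGroupFactory
from ray.tune.execution.trial_runner import TrialRunner
from ray.tune.execution.ray_trial_executor import RayTrialExecutor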


@ -37,7 +37,7 @@ Callbacks
PlacementGroupFactory
---------------------
.. autoclass:: ray.tune.utils.placement_groups.PlacementGroupFactory
.. autoclass:: ray.tune.execution.placement_groups.PlacementGroupFactory


@ -234,7 +234,7 @@ How do I set resources?
~~~~~~~~~~~~~~~~~~~~~~~
If you want to allocate specific resources to a trial, you can use the
``resources_per_trial`` parameter of ``tune.run()``, to which you can pass
a dict or a :class:`PlacementGroupFactory <ray.tune.utils.placement_groups.PlacementGroupFactory>` object:
a dict or a :class:`PlacementGroupFactory <ray.tune.execution.placement_groups.PlacementGroupFactory>` object:
.. literalinclude:: doc_code/faq.py
:dedent:
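A hedged sketch of the two accepted forms; the trivial train_fn is a placeholder, and the PlacementGroupFactory import uses the new path introduced in this diff:

from ray import tune
from ray.tune.execution.placement_groups import PlacementGroupFactory


def train_fn(config):
    # Placeholder trainable for illustration.
    tune.report(score=0)


# Dict form:
tune.run(train_fn, resources_per_trial={"cpu": 1, "gpu": 0})

# Equivalent placement group form:
tune.run(train_fn, resources_per_trial=PlacementGroupFactory([{"CPU": 1}]))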


@ -15,7 +15,7 @@ By default, Tune automatically runs N concurrent trials, where N is the number o
You can override this parallelism with ``resources_per_trial``. Here you can
specify your resource requests using either a dictionary or a
:class:`PlacementGroupFactory <ray.tune.utils.placement_groups.PlacementGroupFactory>`
:class:`PlacementGroupFactory <ray.tune.execution.placement_groups.PlacementGroupFactory>`
object. In any case, Ray Tune will try to start a placement group for each trial.
.. code-block:: python
@ -39,7 +39,7 @@ It is also possible to specify memory (``"memory"``, in bytes) and custom resour
If your trainable function starts more remote workers, you will need to pass so-called placement group
factory objects to request these resources.
See the :class:`PlacementGroupFactory documentation <ray.tune.utils.placement_groups.PlacementGroupFactory>`
See the :class:`PlacementGroupFactory documentation <ray.tune.execution.placement_groups.PlacementGroupFactory>`
for further information.
This also applies if you are using other libraries making use of Ray, such as Modin.
Failure to set resources correctly may result in a deadlock, "hanging" the cluster.
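A hedged sketch of such a multi-bundle request, modeled on the PlacementGroupFactory docstring added later in this diff; train_with_workers and the two 2-CPU worker bundles are illustrative assumptions:

from ray import tune
from ray.tune.execution.placement_groups import PlacementGroupFactory


def train_with_workers(config):
    # Placeholder trainable: imagine it launches two 2-CPU remote workers.
    tune.report(score=0)


# First bundle: the trainable itself; remaining bundles: its remote workers.
tune.run(
    train_with_workers,
    resources_per_trial=PlacementGroupFactory(
        [{"CPU": 1}, {"CPU": 2}, {"CPU": 2}],
        strategy="PACK",
    ),
)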


@ -128,6 +128,15 @@ py_test(
tags = ["team:ml", "exclusive", "tests_dir_F"],
)
py_test(
name = "test_legacy_import",
size = "small",
srcs = ["tests/test_legacy_import.py"],
deps = [":tune_lib"],
tags = ["team:ml", "exclusive", "tests_dir_L"],
)
py_test(
name = "test_integration_comet",
size = "small",


@ -38,7 +38,7 @@ from ray.tune.sample import (
)
from ray.tune.suggest import create_searcher
from ray.tune.schedulers import create_scheduler
from ray.tune.utils.placement_groups import PlacementGroupFactory
from ray.tune.execution.placement_groups import PlacementGroupFactory
from ray.tune.utils.trainable import with_parameters
from ray._private.usage import usage_lib


@ -0,0 +1,27 @@
import warnings
from ray.util import log_once
def warn_structure_refactor(old_module: str, new_module: str, direct: bool = True):
old_module = old_module.replace(".py", "")
if log_once(f"tune:structure:refactor:{old_module}"):
warning = (
f"The module `{old_module}` has been moved to `{new_module}` and the old "
f"location will be deprecated soon. Please adjust your imports to point "
f"to the new location."
)
if direct:
warning += (
f"Example: Do a global search and "
f"replace `{old_module}` with `{new_module}`."
)
else:
warning += (
f"ATTENTION: This module may have been split or refactored. Please "
f"check the contents of `{new_module}` before making changes."
)
warnings.warn(warning, DeprecationWarning)
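The legacy modules elsewhere in this diff use this helper in a small shim pattern; a minimal sketch, using the checkpoint_manager pair shown further below as the example:

# ray/tune/checkpoint_manager.py -- legacy location after this PR
from ray.tune._structure_refactor import warn_structure_refactor
from ray.tune.execution.checkpoint_manager import *  # noqa: F401, F403

warn_structure_refactor(__name__, "ray.tune.execution.checkpoint_manager")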


@ -30,7 +30,7 @@ from ray.tune.result import (
TRAINING_ITERATION,
)
from ray.tune.trial import Trial
from ray.tune.trial_runner import (
from ray.tune.execution.trial_runner import (
find_newest_experiment_checkpoint,
load_trial_from_checkpoint,
)


@ -1,136 +1,4 @@
# coding: utf-8
import logging
from typing import Callable, Optional
from ray.tune._structure_refactor import warn_structure_refactor
from ray.tune.execution.checkpoint_manager import * # noqa: F401, F403
from ray.tune.result import TRAINING_ITERATION
from ray.util.ml_utils.checkpoint_manager import (
CheckpointStrategy,
MIN,
MAX,
_CheckpointManager as CommonCheckpointManager,
_TrackedCheckpoint,
CheckpointStorage,
)
logger = logging.getLogger(__name__)
class _CheckpointManager(CommonCheckpointManager):
"""Initializes a new CheckpointManager.
`newest_persistent_checkpoint` and `newest_memory_checkpoint` are
initialized to Checkpoint objects with values of None.
Args:
keep_checkpoints_num: Keep at least this many checkpoints.
checkpoint_score_attr: Attribute to use to determine which
checkpoints to keep.
delete_fn: Function that deletes checkpoints. Must be
idempotent.
"""
_persist_memory_checkpoints = False
def __init__(
self,
keep_checkpoints_num: int,
checkpoint_score_attr: Optional[str],
delete_fn: Optional[Callable[["_TrackedCheckpoint"], None]] = None,
):
if keep_checkpoints_num == 0:
raise RuntimeError(
"If checkpointing is enabled, Ray Tune requires `keep_checkpoints_num` "
"to be None or a number greater than 0"
)
checkpoint_score_attr = checkpoint_score_attr or TRAINING_ITERATION
checkpoint_score_desc = checkpoint_score_attr.startswith("min-")
if checkpoint_score_desc:
checkpoint_score_attr = checkpoint_score_attr[4:]
else:
checkpoint_score_attr = checkpoint_score_attr
checkpoint_strategy = CheckpointStrategy(
num_to_keep=keep_checkpoints_num,
checkpoint_score_attribute=checkpoint_score_attr,
checkpoint_score_order=MIN if checkpoint_score_desc else MAX,
)
super().__init__(checkpoint_strategy=checkpoint_strategy, delete_fn=delete_fn)
def handle_checkpoint(self, checkpoint: _TrackedCheckpoint):
# Set checkpoint ID
checkpoint.id = checkpoint.id or self._latest_checkpoint_id
self._latest_checkpoint_id += 1
if checkpoint.storage_mode == CheckpointStorage.MEMORY:
self._replace_latest_memory_checkpoint(checkpoint)
else:
assert checkpoint.storage_mode == CheckpointStorage.PERSISTENT
assert (
self._checkpoint_strategy.num_to_keep is None
or self._checkpoint_strategy.num_to_keep > 0
)
self._process_persistent_checkpoint(checkpoint)
def on_checkpoint(self, checkpoint: _TrackedCheckpoint):
"""Ray Tune's entrypoint"""
# Todo (krfricke): Replace with handle_checkpoint.
self.handle_checkpoint(checkpoint)
def _skip_persisted_checkpoint(self, persisted_checkpoint: _TrackedCheckpoint):
assert persisted_checkpoint.storage_mode == CheckpointStorage.PERSISTENT
super()._skip_persisted_checkpoint(persisted_checkpoint=persisted_checkpoint)
# Ray Tune always keeps track of the latest persisted checkpoint.
# Note that this checkpoint will be deleted once it is not the
# latest checkpoint anymore
self._replace_latest_persisted_checkpoint(
persisted_checkpoint=persisted_checkpoint
)
# Tune-specific properties
@property
def newest_persistent_checkpoint(self):
return self._latest_persisted_checkpoint or _TrackedCheckpoint(
dir_or_data=None,
checkpoint_id=-1,
storage_mode=CheckpointStorage.PERSISTENT,
)
@property
def newest_checkpoint(self):
"""Returns the newest checkpoint (based on training iteration)."""
newest_checkpoint = max(
[self.newest_persistent_checkpoint, self.newest_memory_checkpoint],
key=lambda c: c.id,
)
return newest_checkpoint
@property
def newest_memory_checkpoint(self):
return self._latest_memory_checkpoint or _TrackedCheckpoint(
dir_or_data=None,
checkpoint_id=-1,
storage_mode=CheckpointStorage.MEMORY,
)
def best_checkpoints(self):
"""Returns best PERSISTENT checkpoints, sorted by score."""
checkpoints = sorted(self._top_persisted_checkpoints, key=lambda c: c.priority)
return [wrapped.tracked_checkpoint for wrapped in checkpoints]
def __getstate__(self):
state = self.__dict__.copy()
# Avoid serializing the memory checkpoint.
state["_newest_memory_checkpoint"] = _TrackedCheckpoint(
CheckpointStorage.MEMORY, None
)
# Avoid serializing lambda since it may capture cyclical dependencies.
state.pop("_delete_fn")
return state
def __setstate__(self, state):
self.__dict__.update(state)
self._delete_fn = None
warn_structure_refactor(__name__, "ray.tune.execution.checkpoint_manager")


@ -1,33 +1,4 @@
from functools import lru_cache
import getpass
import os
from ray.tune._structure_refactor import warn_structure_refactor
from ray.tune.execution.cluster_info import * # noqa: F401, F403
@lru_cache()
def is_ray_cluster():
"""Checks if the bootstrap config file exists.
This will always exist if using an autoscaling cluster/started
with the ray cluster launcher.
"""
return os.path.exists(os.path.expanduser("~/ray_bootstrap_config.yaml"))
def get_ssh_user():
"""Returns ssh username for connecting to cluster workers."""
return getpass.getuser()
def get_ssh_key():
"""Returns ssh key to connecting to cluster workers.
If the env var TUNE_CLUSTER_SSH_KEY is provided, then this key
will be used for syncing across different nodes.
"""
path = os.environ.get(
"TUNE_CLUSTER_SSH_KEY", os.path.expanduser("~/ray_bootstrap_key.pem")
)
if os.path.exists(path):
return path
return None
warn_structure_refactor(__name__, "ray.tune.execution.cluster_info")


@ -10,7 +10,7 @@ from ray.tune import TuneError
from ray.tune.trial import Trial
from ray.tune.resources import json_to_resources
from ray.tune.syncer import SyncConfig, Syncer
from ray.tune.utils.placement_groups import PlacementGroupFactory
from ray.tune.execution.placement_groups import PlacementGroupFactory
from ray.tune.utils.util import SafeFallbackEncoder


@ -12,9 +12,9 @@ from ray import tune
from ray.tune.schedulers import ResourceChangingScheduler, ASHAScheduler
from ray.tune import Trainable
from ray.tune.resources import Resources
from ray.tune.utils.placement_groups import PlacementGroupFactory
from ray.tune.execution.placement_groups import PlacementGroupFactory
from ray.tune.trial import Trial
from ray.tune import trial_runner
from ray.tune.execution import trial_runner
from ray.tune.integration.xgboost import TuneReportCheckpointCallback
CHECKPOINT_FILENAME = "model.xgb"


@ -0,0 +1,136 @@
# coding: utf-8
import logging
from typing import Callable, Optional
from ray.tune.result import TRAINING_ITERATION
from ray.util.ml_utils.checkpoint_manager import (
CheckpointStrategy,
MIN,
MAX,
_CheckpointManager as CommonCheckpointManager,
_TrackedCheckpoint,
CheckpointStorage,
)
logger = logging.getLogger(__name__)
class _CheckpointManager(CommonCheckpointManager):
"""Initializes a new CheckpointManager.
`newest_persistent_checkpoint` and `newest_memory_checkpoint` are
initialized to Checkpoint objects with values of None.
Args:
keep_checkpoints_num: Keep at least this many checkpoints.
checkpoint_score_attr: Attribute to use to determine which
checkpoints to keep.
delete_fn: Function that deletes checkpoints. Must be
idempotent.
"""
_persist_memory_checkpoints = False
def __init__(
self,
keep_checkpoints_num: int,
checkpoint_score_attr: Optional[str],
delete_fn: Optional[Callable[["_TrackedCheckpoint"], None]] = None,
):
if keep_checkpoints_num == 0:
raise RuntimeError(
"If checkpointing is enabled, Ray Tune requires `keep_checkpoints_num` "
"to be None or a number greater than 0"
)
checkpoint_score_attr = checkpoint_score_attr or TRAINING_ITERATION
checkpoint_score_desc = checkpoint_score_attr.startswith("min-")
if checkpoint_score_desc:
checkpoint_score_attr = checkpoint_score_attr[4:]
else:
checkpoint_score_attr = checkpoint_score_attr
checkpoint_strategy = CheckpointStrategy(
num_to_keep=keep_checkpoints_num,
checkpoint_score_attribute=checkpoint_score_attr,
checkpoint_score_order=MIN if checkpoint_score_desc else MAX,
)
super().__init__(checkpoint_strategy=checkpoint_strategy, delete_fn=delete_fn)
def handle_checkpoint(self, checkpoint: _TrackedCheckpoint):
# Set checkpoint ID
checkpoint.id = checkpoint.id or self._latest_checkpoint_id
self._latest_checkpoint_id += 1
if checkpoint.storage_mode == CheckpointStorage.MEMORY:
self._replace_latest_memory_checkpoint(checkpoint)
else:
assert checkpoint.storage_mode == CheckpointStorage.PERSISTENT
assert (
self._checkpoint_strategy.num_to_keep is None
or self._checkpoint_strategy.num_to_keep > 0
)
self._process_persistent_checkpoint(checkpoint)
def on_checkpoint(self, checkpoint: _TrackedCheckpoint):
"""Ray Tune's entrypoint"""
# Todo (krfricke): Replace with handle_checkpoint.
self.handle_checkpoint(checkpoint)
def _skip_persisted_checkpoint(self, persisted_checkpoint: _TrackedCheckpoint):
assert persisted_checkpoint.storage_mode == CheckpointStorage.PERSISTENT
super()._skip_persisted_checkpoint(persisted_checkpoint=persisted_checkpoint)
# Ray Tune always keeps track of the latest persisted checkpoint.
# Note that this checkpoint will be deleted once it is not the
# latest checkpoint anymore
self._replace_latest_persisted_checkpoint(
persisted_checkpoint=persisted_checkpoint
)
# Tune-specific properties
@property
def newest_persistent_checkpoint(self):
return self._latest_persisted_checkpoint or _TrackedCheckpoint(
dir_or_data=None,
checkpoint_id=-1,
storage_mode=CheckpointStorage.PERSISTENT,
)
@property
def newest_checkpoint(self):
"""Returns the newest checkpoint (based on training iteration)."""
newest_checkpoint = max(
[self.newest_persistent_checkpoint, self.newest_memory_checkpoint],
key=lambda c: c.id,
)
return newest_checkpoint
@property
def newest_memory_checkpoint(self):
return self._latest_memory_checkpoint or _TrackedCheckpoint(
dir_or_data=None,
checkpoint_id=-1,
storage_mode=CheckpointStorage.MEMORY,
)
def best_checkpoints(self):
"""Returns best PERSISTENT checkpoints, sorted by score."""
checkpoints = sorted(self._top_persisted_checkpoints, key=lambda c: c.priority)
return [wrapped.tracked_checkpoint for wrapped in checkpoints]
def __getstate__(self):
state = self.__dict__.copy()
# Avoid serializing the memory checkpoint.
state["_newest_memory_checkpoint"] = _TrackedCheckpoint(
CheckpointStorage.MEMORY, None
)
# Avoid serializing lambda since it may capture cyclical dependencies.
state.pop("_delete_fn")
return state
def __setstate__(self, state):
self.__dict__.update(state)
self._delete_fn = None
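A hedged sketch of how the "min-" prefix handling above is exercised; note that _CheckpointManager is an internal class, and the min-validation_loss attribute is an illustrative assumption:

from ray.tune.execution.checkpoint_manager import _CheckpointManager

# Keep the three checkpoints with the lowest validation loss: the "min-"
# prefix is stripped and the score order flips to MIN.
manager = _CheckpointManager(
    keep_checkpoints_num=3,
    checkpoint_score_attr="min-validation_loss",
)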


@ -0,0 +1,33 @@
from functools import lru_cache
import getpass
import os
@lru_cache()
def is_ray_cluster():
"""Checks if the bootstrap config file exists.
This file will always exist when using an autoscaling cluster or a cluster
started with the Ray cluster launcher.
"""
return os.path.exists(os.path.expanduser("~/ray_bootstrap_config.yaml"))
def get_ssh_user():
"""Returns ssh username for connecting to cluster workers."""
return getpass.getuser()
def get_ssh_key():
"""Returns ssh key to connecting to cluster workers.
If the env var TUNE_CLUSTER_SSH_KEY is provided, then this key
will be used for syncing across different nodes.
"""
path = os.environ.get(
"TUNE_CLUSTER_SSH_KEY", os.path.expanduser("~/ray_bootstrap_key.pem")
)
if os.path.exists(path):
return path
return None
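A hedged usage sketch of the helper above; the key path is an illustrative assumption, and TUNE_CLUSTER_SSH_KEY is the environment variable read by get_ssh_key:

import os
from ray.tune.execution.cluster_info import get_ssh_key

# Point node-to-node syncing at a custom key (path is illustrative).
os.environ["TUNE_CLUSTER_SSH_KEY"] = os.path.expanduser("~/.ssh/my_cluster_key.pem")
# Returns the path only if the file actually exists, otherwise None.
print(get_ssh_key())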


@ -0,0 +1,137 @@
import logging
from functools import lru_cache
import os
import ray
import time
from typing import Dict
from ray.tune.execution.cluster_info import is_ray_cluster
from ray.tune.trial import Trial
logger = logging.getLogger(__name__)
# Ideally we want to use @cache; but it's only available for python 3.9.
# Caching is only helpful/correct for no autoscaler case.
@lru_cache()
def _get_cluster_resources_no_autoscaler() -> Dict:
return ray.cluster_resources()
def _get_trial_cpu_and_gpu(trial: Trial) -> Dict:
cpu = trial.placement_group_factory.required_resources.get("CPU", 0)
gpu = trial.placement_group_factory.required_resources.get("GPU", 0)
return {"CPU": cpu, "GPU": gpu}
def _can_fulfill_no_autoscaler(trial: Trial) -> bool:
"""Calculates if there is enough resources for a PENDING trial.
For no autoscaler case.
"""
assert trial.status == Trial.PENDING
trial_cpu_gpu = _get_trial_cpu_and_gpu(trial)
return trial_cpu_gpu["CPU"] <= _get_cluster_resources_no_autoscaler().get(
"CPU", 0
) and trial_cpu_gpu["GPU"] <= _get_cluster_resources_no_autoscaler().get("GPU", 0)
@lru_cache()
def _get_insufficient_resources_warning_threshold() -> float:
if is_ray_cluster():
return float(
os.environ.get(
"TUNE_WARN_INSUFFICENT_RESOURCE_THRESHOLD_S_AUTOSCALER", "60"
)
)
else:
# Set the default to 10s so that we don't prematurely determine that
# a cluster cannot fulfill the resources requirements.
# TODO(xwjiang): Change it back once #18608 is resolved.
return float(os.environ.get("TUNE_WARN_INSUFFICENT_RESOURCE_THRESHOLD_S", "60"))
# TODO(xwjiang): Consider having a help page with more detailed instructions.
@lru_cache()
def _get_insufficient_resources_warning_msg() -> str:
msg = (
f"No trial is running and no new trial has been started within"
f" at least the last "
f"{_get_insufficient_resources_warning_threshold()} seconds. "
f"This could be due to the cluster not having enough "
f"resources available to start the next trial. "
f"Stop the tuning job and adjust the resources requested per trial "
f"(possibly via `resources_per_trial` or via `num_workers` for rllib) "
f"and/or add more resources to your Ray runtime."
)
if is_ray_cluster():
return "Ignore this message if the cluster is autoscaling. " + msg
else:
return msg
# A beefed up version when Tune Error is raised.
def _get_insufficient_resources_error_msg(trial: Trial) -> str:
trial_cpu_gpu = _get_trial_cpu_and_gpu(trial)
return (
f"Ignore this message if the cluster is autoscaling. "
f"You asked for {trial_cpu_gpu['CPU']} cpu and "
f"{trial_cpu_gpu['GPU']} gpu per trial, but the cluster only has "
f"{_get_cluster_resources_no_autoscaler().get('CPU', 0)} cpu and "
f"{_get_cluster_resources_no_autoscaler().get('GPU', 0)} gpu. "
f"Stop the tuning job and adjust the resources requested per trial "
f"(possibly via `resources_per_trial` or via `num_workers` for rllib) "
f"and/or add more resources to your Ray runtime."
)
class _InsufficientResourcesManager:
"""Insufficient resources manager.
Makes best-effort, conservative guesses about whether the Tune loop is stuck due to
infeasible resources. If so, outputs usability messages for users to
act upon.
"""
def __init__(self):
# The information tracked across the life time of Tune loop.
self._no_running_trials_since = -1
self._last_trial_num = -1
def on_no_available_trials(self, all_trials):
"""Tracks information across the life of Tune loop and makes guesses
about if Tune loop is stuck due to infeasible resources.
If so, outputs certain warning messages.
The logic should be conservative, non-intrusive and informative.
For example, rate limiting is applied so that the message is not
spammy.
"""
# This is approximately saying we are not making progress.
if len(all_trials) == self._last_trial_num:
if self._no_running_trials_since == -1:
self._no_running_trials_since = time.monotonic()
elif (
time.monotonic() - self._no_running_trials_since
> _get_insufficient_resources_warning_threshold()
):
if not is_ray_cluster(): # autoscaler not enabled
# If any of the pending trials cannot be fulfilled,
# that's a good enough hint that the trial resources are insufficient.
for trial in all_trials:
if (
trial.status is Trial.PENDING
and not _can_fulfill_no_autoscaler(trial)
):
# TODO(xwjiang):
# Raise an Error once #18608 is resolved.
logger.warning(_get_insufficient_resources_error_msg(trial))
break
else:
# TODO(xwjiang): #17799.
# Output a more helpful msg for autoscaler.
logger.warning(_get_insufficient_resources_warning_msg())
self._no_running_trials_since = time.monotonic()
else:
self._no_running_trials_since = -1
self._last_trial_num = len(all_trials)
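The warning threshold above is read from environment variables and cached with lru_cache, so it has to be set before the Tune loop first queries it; a hedged sketch with an arbitrary 300-second value:

import os

# Raise the "no trial is running" warning threshold to five minutes.
# Note the variable names really are spelled "INSUFFICENT" in the code above.
os.environ["TUNE_WARN_INSUFFICENT_RESOURCE_THRESHOLD_S"] = "300"
os.environ["TUNE_WARN_INSUFFICENT_RESOURCE_THRESHOLD_S_AUTOSCALER"] = "300"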


@ -0,0 +1,766 @@
from typing import Dict, List, Optional, Set, TYPE_CHECKING, Tuple, Union
from collections import defaultdict
from inspect import signature
from copy import deepcopy
import json
import os
import time
import uuid
import ray
from ray import ObjectRef, logger
from ray.actor import ActorClass
from ray.tune.resources import Resources
from ray.util.annotations import PublicAPI, DeveloperAPI
from ray.util.placement_group import (
PlacementGroup,
get_placement_group,
placement_group,
placement_group_table,
remove_placement_group,
)
if TYPE_CHECKING:
from ray.tune.trial import Trial
TUNE_PLACEMENT_GROUP_REMOVAL_DELAY = 2.0
_tune_pg_prefix = None
def get_tune_pg_prefix():
"""Get the tune placement group name prefix.
This will store the prefix in a global variable so that subsequent runs
can use this identifier to clean up placement groups before starting their
run.
Can be overwritten with the ``TUNE_PLACEMENT_GROUP_PREFIX`` env variable.
"""
global _tune_pg_prefix
if _tune_pg_prefix:
return _tune_pg_prefix
# Else: check env variable
env_prefix = os.getenv("TUNE_PLACEMENT_GROUP_PREFIX", "")
if env_prefix:
_tune_pg_prefix = env_prefix
return _tune_pg_prefix
# Else: create and store unique prefix
_tune_pg_prefix = f"__tune_{uuid.uuid4().hex[:8]}__"
return _tune_pg_prefix
@PublicAPI(stability="beta")
class PlacementGroupFactory:
"""Wrapper class that creates placement groups for trials.
This class should be used to define resource requests for Ray Tune
trials. It holds the parameters to create placement groups.
At a minimum, this will hold at least one bundle specifying the
resource requirements for each trial:
.. code-block:: python
from ray import tune
tune.run(
train,
tune.PlacementGroupFactory([
{"CPU": 1, "GPU": 0.5, "custom_resource": 2}
]))
If the trial itself schedules further remote workers, the resource
requirements should be specified in additional bundles. You can also
pass the placement strategy for these bundles, e.g. to enforce
co-located placement:
.. code-block:: python
from ray import tune
tune.run(
train,
resources_per_trial=tune.PlacementGroupFactory([
{"CPU": 1, "GPU": 0.5, "custom_resource": 2},
{"CPU": 2},
{"CPU": 2},
], strategy="PACK"))
The example above will reserve 1 CPU, 0.5 GPUs and 2 custom_resources
for the trainable itself, and reserve another 2 bundles of 2 CPUs each.
The trial will only start when all these resources are available. This
could be used e.g. if you had one learner running in the main trainable
that schedules two remote workers that need access to 2 CPUs each.
If the trainable itself doesn't require resources, you can specify it as:
.. code-block:: python
from ray import tune
tune.run(
train,
resources_per_trial=tune.PlacementGroupFactory([
{},
{"CPU": 2},
{"CPU": 2},
], strategy="PACK"))
Args:
bundles(List[Dict]): A list of bundles which
represent the resource requirements.
strategy(str): The strategy to create the placement group.
- "PACK": Packs Bundles into as few nodes as possible.
- "SPREAD": Places Bundles across distinct nodes as even as possible.
- "STRICT_PACK": Packs Bundles into one node. The group is
not allowed to span multiple nodes.
- "STRICT_SPREAD": Packs Bundles across distinct nodes.
*args: Passed to the call of ``placement_group()``
**kwargs: Passed to the call of ``placement_group()``
"""
def __init__(
self,
bundles: List[Dict[str, Union[int, float]]],
strategy: str = "PACK",
*args,
**kwargs,
):
assert (
len(bundles) > 0
), "Cannot initialize a PlacementGroupFactory with zero bundles."
self._bundles = [
{k: float(v) for k, v in bundle.items() if v != 0} for bundle in bundles
]
if not self._bundles[0]:
# This is when trainable itself doesn't need resources.
self._head_bundle_is_empty = True
self._bundles.pop(0)
else:
self._head_bundle_is_empty = False
self._strategy = strategy
self._args = args
self._kwargs = kwargs
self._hash = None
self._bound = None
self._bind()
@property
def head_bundle_is_empty(self):
"""Returns True if head bundle is empty while child bundles
need resources.
This is considered an internal API within Tune.
"""
return self._head_bundle_is_empty
@property
@DeveloperAPI
def head_cpus(self) -> float:
return 0.0 if self._head_bundle_is_empty else self._bundles[0].get("CPU", 0.0)
@property
@DeveloperAPI
def bundles(self) -> List[Dict[str, float]]:
"""Returns a deep copy of resource bundles"""
return deepcopy(self._bundles)
@property
def required_resources(self) -> Dict[str, float]:
"""Returns a dict containing the sums of all resources"""
resources = {}
for bundle in self._bundles:
for k, v in bundle.items():
resources[k] = resources.get(k, 0) + v
return resources
def _bind(self):
sig = signature(placement_group)
try:
self._bound = sig.bind(
self._bundles, self._strategy, *self._args, **self._kwargs
)
except Exception as exc:
raise RuntimeError(
"Invalid definition for placement group factory. Please check "
"that you passed valid arguments to the PlacementGroupFactory "
"object."
) from exc
def __call__(self, *args, **kwargs):
kwargs.update(self._bound.kwargs)
# Call with bounded *args and **kwargs
return placement_group(*self._bound.args, **kwargs)
def __eq__(self, other: "PlacementGroupFactory"):
return (
self._bound == other._bound
and self.head_bundle_is_empty == other.head_bundle_is_empty
)
def __hash__(self):
if not self._hash:
# Cache hash
self._hash = hash(
json.dumps(
{"args": self._bound.args, "kwargs": self._bound.kwargs},
sort_keys=True,
indent=0,
ensure_ascii=True,
)
)
return self._hash
def __getstate__(self):
state = self.__dict__.copy()
state.pop("_hash", None)
state.pop("_bound", None)
return state
def __setstate__(self, state):
self.__dict__.update(state)
self._hash = None
self._bound = None
self._bind()
def __repr__(self) -> str:
return (
f"<PlacementGroupFactory (_bound={self._bound}, "
f"head_bundle_is_empty={self.head_bundle_is_empty})>"
)
def resource_dict_to_pg_factory(spec: Optional[Dict[str, float]]):
spec = spec or {"cpu": 1}
if isinstance(spec, Resources):
spec = spec._asdict()
spec = spec.copy()
cpus = spec.pop("cpu", 0.0)
gpus = spec.pop("gpu", 0.0)
memory = spec.pop("memory", 0.0)
object_store_memory = spec.pop("object_store_memory", 0.0)
bundle = {k: v for k, v in spec.pop("custom_resources", {}).items()}
bundle.update(
{
"CPU": cpus,
"GPU": gpus,
"memory": memory,
"object_store_memory": object_store_memory,
}
)
return PlacementGroupFactory([bundle])
class _PlacementGroupManager:
"""PlacementGroupManager to stage and manage placement groups.
.. versionadded:: 1.3.0
This class schedules placement groups for trials, keeps track of
their state, and can return a fully configured actor class using
this placement group.
If two trials share the same placement group factory, both could use
resulting placement groups from it. Thus this manager associates
placement groups with their factory methods.
Args:
prefix: Prefix for the placement group names that are created.
"""
def __init__(self, prefix: str = "__tune__", max_staging: int = 1000):
self._prefix = prefix
# Sets of staged placement groups by factory
self._staging: Dict[PlacementGroupFactory, Set[PlacementGroup]] = defaultdict(
set
)
# Sets of ready and unused placement groups by factory
self._ready: Dict[PlacementGroupFactory, Set[PlacementGroup]] = defaultdict(set)
# Ray futures to check if a placement group is ready
self._staging_futures: Dict[
ObjectRef, Tuple[PlacementGroupFactory, PlacementGroup]
] = {}
# Cache of unstaged PGs (cleaned after full PG removal)
self._unstaged_pg_pgf: Dict[PlacementGroup, PlacementGroupFactory] = {}
self._unstaged_pgf_pg: Dict[
PlacementGroupFactory, Set[PlacementGroup]
] = defaultdict(set)
# Placement groups used by trials
self._in_use_pgs: Dict[PlacementGroup, "Trial"] = {}
self._in_use_trials: Dict["Trial", PlacementGroup] = {}
# Placement groups used by remote actors but not trials
# (e.g. for reuse_actors=True)
self._cached_pgs: Dict[PlacementGroup, PlacementGroupFactory] = {}
# Placement groups scheduled for delayed removal.
# This is used as a damper to filter out some high frequency change
# in resources request.
# Only PGs that have never been used go here.
# TODO(xwjiang): `self._pgs_for_removal` and `self._unstaged_xxx`
# are really the same now. We should consolidate to using one.
# Also `remove_placement_group` method should just be combined with
# `unstage_unused_xxx`.
self._pgs_for_removal: Dict[PlacementGroup, float] = {}
self._removal_delay = TUNE_PLACEMENT_GROUP_REMOVAL_DELAY
self._max_staging = max_staging
def set_max_staging(self, max_staging: int):
self._max_staging = max_staging
def remove_pg(self, pg: PlacementGroup):
"""Schedule placement group for (delayed) removal.
Args:
pg: Placement group object.
"""
self._pgs_for_removal[pg] = time.time()
def cleanup(self, force: bool = False):
"""Remove placement groups that are scheduled for removal.
Currently, this will remove placement groups after they've been
marked for removal for ``self._removal_delay`` seconds.
If ``force=True``, this condition is disregarded and all placement
groups are removed instead.
Args:
force: If True, all placement groups scheduled for removal
will be removed, disregarding any removal conditions.
"""
# Wrap in list so we can modify the dict
for pg in list(self._pgs_for_removal):
if (
force
or (time.time() - self._removal_delay) >= self._pgs_for_removal[pg]
):
self._pgs_for_removal.pop(pg)
remove_placement_group(pg)
# Remove from unstaged cache
if pg in self._unstaged_pg_pgf:
pgf = self._unstaged_pg_pgf.pop(pg)
self._unstaged_pgf_pg[pgf].discard(pg)
def cleanup_existing_pg(self, block: bool = False):
"""Clean up (remove) all existing placement groups.
This scans through the placement_group_table to discover existing
placement groups and calls remove_placement_group on all that
match the ``_tune__`` prefix. This method is called at the beginning
of the tuning run to clean up existing placement groups should the
experiment be interrupted by a driver failure and resumed in the
same driver script.
Args:
block: If True, will wait until all placement groups are
shut down.
"""
should_cleanup = not int(
os.getenv("TUNE_PLACEMENT_GROUP_CLEANUP_DISABLED", "0")
)
if should_cleanup:
has_non_removed_pg_left = True
while has_non_removed_pg_left:
has_non_removed_pg_left = False
for pid, info in placement_group_table().items():
if not info["name"].startswith(self._prefix):
continue
if info["state"] == "REMOVED":
continue
# If block=False, only run once
has_non_removed_pg_left = block
pg = get_placement_group(info["name"])
remove_placement_group(pg)
# Remove from unstaged cache
if pg in self._unstaged_pg_pgf:
pgf = self._unstaged_pg_pgf.pop(pg)
self._unstaged_pgf_pg[pgf].discard(pg)
time.sleep(0.1)
def stage_trial_pg(self, trial: "Trial"):
"""Stage a trial placement group.
Create the trial placement group if the maximum number of pending
placement groups has not been reached.
Args:
trial: Trial whose placement group to stage.
Returns:
False if placement group has not been staged, True otherwise.
Creates placement group and moves it to `self._staging`.
"""
if not self.can_stage():
return False
pgf = trial.placement_group_factory
return self._stage_pgf_pg(pgf)
def _stage_pgf_pg(self, pgf: PlacementGroupFactory):
"""Create placement group for factory"""
if len(self._unstaged_pgf_pg[pgf]) > 0:
# This re-uses a previously unstaged placement group
pg = self._unstaged_pgf_pg[pgf].pop()
del self._unstaged_pg_pgf[pg]
self._pgs_for_removal.pop(pg, None)
else:
# This creates the placement group
pg = pgf(name=f"{self._prefix}{uuid.uuid4().hex[:8]}")
self._staging[pgf].add(pg)
self._staging_futures[pg.ready()] = (pgf, pg)
return True
def can_stage(self):
"""Return True if we can stage another placement group."""
return len(self._staging_futures) < self._max_staging
def update_status(self):
"""Update placement group status.
Moves ready placement groups from `self._staging` to
`self._ready`.
"""
self.cleanup()
ready = True
while ready:
# Use a loop as `ready` might return futures one by one
ready, _ = ray.wait(list(self._staging_futures.keys()), timeout=0)
for ready_fut in ready:
self.handle_ready_future(ready_fut)
def handle_ready_future(self, ready_fut):
ready_pgf, ready_pg = self._staging_futures.pop(ready_fut)
self._staging[ready_pgf].remove(ready_pg)
self._ready[ready_pgf].add(ready_pg)
def get_staging_future_list(self):
return list(self._staging_futures.keys())
def get_full_actor_cls(
self, trial: "Trial", actor_cls: ActorClass
) -> Optional[ActorClass]:
"""Get a fully configured actor class.
Returns the configured actor class if the placement group is ready. In this case,
the placement group is moved to `self._in_use_pgs` and removed from
`self._ready`.
Args:
trial: "Trial" object to start
actor_cls: Ray actor class.
Returns:
Configured ActorClass or None
"""
pgf = trial.placement_group_factory
if not self._ready[pgf]:
return None
pg = self._ready[pgf].pop()
self._in_use_pgs[pg] = trial
self._in_use_trials[trial] = pg
logger.debug(f"For trial {trial} use pg {pg.id}")
# We still have to pass resource specs
if not pgf.head_bundle_is_empty:
# Pass the full resource specs of the first bundle per default
head_bundle = pg.bundle_specs[0].copy()
num_cpus = head_bundle.pop("CPU", 0)
num_gpus = head_bundle.pop("GPU", 0)
memory = head_bundle.pop("memory", None)
object_store_memory = head_bundle.pop("object_store_memory", None)
# Only custom resources remain in `head_bundle`
resources = head_bundle
return actor_cls.options(
placement_group=pg,
placement_group_bundle_index=0,
placement_group_capture_child_tasks=True,
num_cpus=num_cpus,
num_gpus=num_gpus,
memory=memory,
object_store_memory=object_store_memory,
resources=resources,
)
else:
return actor_cls.options(
placement_group=pg,
placement_group_capture_child_tasks=True,
num_cpus=0,
num_gpus=0,
resources={},
)
def has_ready(self, trial: "Trial", update: bool = False) -> bool:
"""Return True if placement group for trial is ready.
Args:
trial: :obj:`Trial` object.
update: Update status first.
Returns:
Boolean.
"""
if update:
self.update_status()
return bool(self._ready[trial.placement_group_factory])
def has_staging(self, trial: "Trial", update: bool = False) -> bool:
"""Return True if placement group for trial is staging.
Args:
trial: :obj:`Trial` object.
update: Update status first.
Returns:
Boolean.
"""
if update:
self.update_status()
return bool(self._staging[trial.placement_group_factory])
def trial_in_use(self, trial: "Trial"):
return trial in self._in_use_trials
def cache_trial_pg(self, trial: "Trial") -> Optional[PlacementGroup]:
"""Disassociated placement group from trial object.
This can be used to move placement groups into a cache so that
they can be reused by other trials. The difference from just making
them broadly available again is that they have to be specifically
re-assigned to a trial via :meth:`assign_cached_pg`. The reason
for this is that remote actors might already be scheduled on this
placement group, so it should only be associated to the trial that
actually re-uses the remote actor (e.g. when using ``reuse_trials``).
This will replace (unstage) an existing placement group with the same
factory object. If this is unsuccessful (e.g. because no such
pending placement group exists), the placement group will *not* be
cached and None will be returned.
Args:
trial: Trial object with the (currently in use) placement
group that should be cached.
Returns:
PlacementGroup object that was cached or None if
no placement group was replaced.
"""
pgf = trial.placement_group_factory
staged_pg = self._unstage_unused_pg(pgf)
if not staged_pg and not self._unstaged_pgf_pg[pgf]:
# If we have an unstaged placement group for this factory,
# this might be the same one we unstaged previously. If so,
# we should continue with the caching. If not, this will be
# reconciled later.
return None
if staged_pg:
self.remove_pg(staged_pg)
pg = self._in_use_trials.pop(trial)
self._in_use_pgs.pop(pg)
self._cached_pgs[pg] = trial.placement_group_factory
return pg
def assign_cached_pg(self, pg: PlacementGroup, trial: "Trial") -> bool:
"""Assign a cached pg to a trial."""
pgf = self._cached_pgs.pop(pg)
trial_pgf = trial.placement_group_factory
assert pgf == trial_pgf, (
f"Cannot assign placement group with a "
f"non-matching factory to trial {trial}"
)
logger.debug(f"For trial {trial} RE-use pg {pg.id}")
self._in_use_pgs[pg] = trial
self._in_use_trials[trial] = pg
return True
def clean_cached_pg(self, pg: PlacementGroup):
self._cached_pgs.pop(pg)
def has_cached_pg(self, pgf: PlacementGroupFactory):
"""Check if a placement group for given factory has been cached"""
return any(cached_pgf == pgf for cached_pgf in self._cached_pgs.values())
def remove_from_in_use(self, trial: "Trial") -> PlacementGroup:
"""Return pg back to Core scheduling.
Args:
trial: Return placement group of this trial.
"""
pg = self._in_use_trials.pop(trial)
self._in_use_pgs.pop(pg)
return pg
def _unstage_unused_pg(
self, pgf: PlacementGroupFactory
) -> Optional[PlacementGroup]:
"""Unstage an unsued (i.e. staging or ready) placement group.
This method will find an unused placement group and remove it from
the tracked pool of placement groups (including e.g. the
staging futures). It will *not* call ``remove_placement_group()``
on the placement group - that is up to the calling method to do.
(The reason for this is that sometimes we would remove the placement
group directly, but sometimes we would like to enqueue removal.)
Args:
pgf: Placement group factory object.
This method will try to remove a staged PG of this factory
first, then settle for a ready but unused one. If none exist,
no placement group will be removed and None will be returned.
Returns:
Removed placement group object or None.
"""
trial_pg = None
# If there are pending placement groups
# in staging, pop a random one.
if self._staging[pgf]:
trial_pg = self._staging[pgf].pop()
# For staging placement groups, we will also need to
# remove the future.
trial_future = None
for future, (pgf, pg) in self._staging_futures.items():
if pg == trial_pg:
trial_future = future
break
# Track unstaged placement groups for potential reuse
self._unstaged_pg_pgf[trial_pg] = pgf
self._unstaged_pgf_pg[pgf].add(trial_pg)
del self._staging_futures[trial_future]
elif self._ready[pgf]:
# Otherwise, return an unused ready placement group.
trial_pg = self._ready[pgf].pop()
return trial_pg
def reconcile_placement_groups(self, trials: List["Trial"]):
"""Reconcile placement groups to match requirements.
This will loop through all trials and count their statuses by
placement group factory. This makes sure that only as many placement
groups are kept (staged, in use, or cached) as there are trials left to run.
E.g. if PGF_A has 2 terminated, 1 errored, 2 paused, 1 running,
and 3 pending trials, a total of 6 placement groups
(paused+running+pending) should be in staging, use, or the cache.
Args:
trials: List of trials.
"""
# Keep track of the currently tracked placement groups
current_counts: Dict[PlacementGroupFactory, int] = defaultdict(int)
# Count number of expected placement groups
pgf_expected: Dict[PlacementGroupFactory, int] = defaultdict(int)
for trial in trials:
# Count in-use placement groups
if trial in self._in_use_trials:
current_counts[trial.placement_group_factory] += 1
pgf_expected[trial.placement_group_factory] += (
1 if trial.status in ["PAUSED", "PENDING", "RUNNING"] else 0
)
# Ensure that unexpected placement groups are accounted for
for pgf in self._staging:
if pgf not in pgf_expected:
pgf_expected[pgf] = 0
for pgf in self._ready:
if pgf not in pgf_expected:
pgf_expected[pgf] = 0
# Count cached placement groups
for pg, pgf in self._cached_pgs.items():
current_counts[pgf] += 1
# Compare current with expected
for pgf, expected in pgf_expected.items():
# Add staging and ready pgs
current_counts[pgf] += len(self._staging[pgf])
current_counts[pgf] += len(self._ready[pgf])
while current_counts[pgf] > expected:
pg = self._unstage_unused_pg(pgf)
if not pg:
break
logger.debug(f"Removing unneeded placement group {pg.id}")
self.remove_pg(pg)
current_counts[pgf] -= 1
while expected > current_counts[pgf]:
self._stage_pgf_pg(pgf)
current_counts[pgf] += 1
logger.debug(
f"Adding an expected but previously unstaged "
f"placement group for factory {pgf}"
)
def occupied_resources(self):
"""Return a dictionary of currently in-use resources."""
resources = {"CPU": 0, "GPU": 0}
for pg in self._in_use_pgs:
for bundle_resources in pg.bundle_specs:
for key, val in bundle_resources.items():
resources[key] = resources.get(key, 0) + val
return resources
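A hedged usage sketch of the public pieces above; the values in the comments follow from the implementation, and nothing here actually creates a placement group:

from ray.tune.execution.placement_groups import (
    PlacementGroupFactory,
    resource_dict_to_pg_factory,
)

pgf = PlacementGroupFactory([{"CPU": 1, "GPU": 1}, {"CPU": 2}])
print(pgf.required_resources)    # {"CPU": 3.0, "GPU": 1.0}
print(pgf.head_bundle_is_empty)  # False

# Convert a legacy lowercase resource dict into a single-bundle factory;
# zero-valued entries are dropped by the constructor.
legacy_pgf = resource_dict_to_pg_factory({"cpu": 2, "gpu": 0})
print(legacy_pgf.required_resources)  # {"CPU": 2.0}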

File diff suppressed because it is too large.

File diff suppressed because it is too large.


@ -1,137 +1,4 @@
import logging
from functools import lru_cache
import os
import ray
import time
from typing import Dict
from ray.tune._structure_refactor import warn_structure_refactor
from ray.tune.execution.insufficient_resources_manager import * # noqa: F401, F403
from ray.tune.cluster_info import is_ray_cluster
from ray.tune.trial import Trial
logger = logging.getLogger(__name__)
# Ideally we want to use @cache; but it's only available for python 3.9.
# Caching is only helpful/correct for no autoscaler case.
@lru_cache()
def _get_cluster_resources_no_autoscaler() -> Dict:
return ray.cluster_resources()
def _get_trial_cpu_and_gpu(trial: Trial) -> Dict:
cpu = trial.placement_group_factory.required_resources.get("CPU", 0)
gpu = trial.placement_group_factory.required_resources.get("GPU", 0)
return {"CPU": cpu, "GPU": gpu}
def _can_fulfill_no_autoscaler(trial: Trial) -> bool:
"""Calculates if there is enough resources for a PENDING trial.
For no autoscaler case.
"""
assert trial.status == Trial.PENDING
trial_cpu_gpu = _get_trial_cpu_and_gpu(trial)
return trial_cpu_gpu["CPU"] <= _get_cluster_resources_no_autoscaler().get(
"CPU", 0
) and trial_cpu_gpu["GPU"] <= _get_cluster_resources_no_autoscaler().get("GPU", 0)
@lru_cache()
def _get_insufficient_resources_warning_threshold() -> float:
if is_ray_cluster():
return float(
os.environ.get(
"TUNE_WARN_INSUFFICENT_RESOURCE_THRESHOLD_S_AUTOSCALER", "60"
)
)
else:
# Set the default to 10s so that we don't prematurely determine that
# a cluster cannot fulfill the resources requirements.
# TODO(xwjiang): Change it back once #18608 is resolved.
return float(os.environ.get("TUNE_WARN_INSUFFICENT_RESOURCE_THRESHOLD_S", "60"))
# TODO(xwjiang): Consider having a help page with more detailed instructions.
@lru_cache()
def _get_insufficient_resources_warning_msg() -> str:
msg = (
f"No trial is running and no new trial has been started within"
f" at least the last "
f"{_get_insufficient_resources_warning_threshold()} seconds. "
f"This could be due to the cluster not having enough "
f"resources available to start the next trial. "
f"Stop the tuning job and adjust the resources requested per trial "
f"(possibly via `resources_per_trial` or via `num_workers` for rllib) "
f"and/or add more resources to your Ray runtime."
)
if is_ray_cluster():
return "Ignore this message if the cluster is autoscaling. " + msg
else:
return msg
# A beefed up version when Tune Error is raised.
def _get_insufficient_resources_error_msg(trial: Trial) -> str:
trial_cpu_gpu = _get_trial_cpu_and_gpu(trial)
return (
f"Ignore this message if the cluster is autoscaling. "
f"You asked for {trial_cpu_gpu['CPU']} cpu and "
f"{trial_cpu_gpu['GPU']} gpu per trial, but the cluster only has "
f"{_get_cluster_resources_no_autoscaler().get('CPU', 0)} cpu and "
f"{_get_cluster_resources_no_autoscaler().get('GPU', 0)} gpu. "
f"Stop the tuning job and adjust the resources requested per trial "
f"(possibly via `resources_per_trial` or via `num_workers` for rllib) "
f"and/or add more resources to your Ray runtime."
)
class _InsufficientResourcesManager:
"""Insufficient resources manager.
Makes best effort, conservative guesses about if Tune loop is stuck due to
infeasible resources. If so, outputs usability messages for users to
act upon.
"""
def __init__(self):
# The information tracked across the life time of Tune loop.
self._no_running_trials_since = -1
self._last_trial_num = -1
def on_no_available_trials(self, all_trials):
"""Tracks information across the life of Tune loop and makes guesses
about if Tune loop is stuck due to infeasible resources.
If so, outputs certain warning messages.
The logic should be conservative, non-intrusive and informative.
For example, rate limiting is applied so that the message is not
spammy.
"""
# This is approximately saying we are not making progress.
if len(all_trials) == self._last_trial_num:
if self._no_running_trials_since == -1:
self._no_running_trials_since = time.monotonic()
elif (
time.monotonic() - self._no_running_trials_since
> _get_insufficient_resources_warning_threshold()
):
if not is_ray_cluster(): # autoscaler not enabled
# If any of the pending trial cannot be fulfilled,
# that's a good enough hint of trial resources not enough.
for trial in all_trials:
if (
trial.status is Trial.PENDING
and not _can_fulfill_no_autoscaler(trial)
):
# TODO(xwjiang):
# Raise an Error once #18608 is resolved.
logger.warning(_get_insufficient_resources_error_msg(trial))
break
else:
# TODO(xwjiang): #17799.
# Output a more helpful msg for autoscaler.
logger.warning(_get_insufficient_resources_warning_msg())
self._no_running_trials_since = time.monotonic()
else:
self._no_running_trials_since = -1
self._last_trial_num = len(all_trials)
warn_structure_refactor(__name__, "ray.tune.execution.insufficient_resources_manager")

File diff suppressed because it is too large.


@ -4,7 +4,7 @@ from typing import Dict, Optional, Union
import numpy as np
import pickle
from ray.tune import trial_runner
from ray.tune.execution import trial_runner
from ray.tune.result import DEFAULT_METRIC
from ray.tune.schedulers.trial_scheduler import FIFOScheduler, TrialScheduler
from ray.tune.trial import Trial


@ -1,7 +1,7 @@
import logging
from typing import Dict, Optional
from ray.tune import trial_runner
from ray.tune.execution import trial_runner
from ray.tune.schedulers.trial_scheduler import TrialScheduler
from ray.tune.schedulers.hyperband import HyperBandScheduler
from ray.tune.trial import Trial


@ -4,7 +4,7 @@ from typing import Dict, List, Optional, Tuple
import numpy as np
import logging
from ray.tune import trial_runner
from ray.tune.execution import trial_runner
from ray.tune.result import DEFAULT_METRIC
from ray.tune.schedulers.trial_scheduler import FIFOScheduler, TrialScheduler
from ray.tune.trial import Trial


@ -4,7 +4,7 @@ from typing import Dict, List, Optional
import numpy as np
from ray.tune import trial_runner
from ray.tune.execution import trial_runner
from ray.tune.result import DEFAULT_METRIC
from ray.tune.trial import Trial
from ray.tune.schedulers.trial_scheduler import FIFOScheduler, TrialScheduler


@ -7,7 +7,7 @@ import random
import shutil
from typing import Callable, Dict, List, Optional, Tuple, Union
from ray.tune import trial_runner
from ray.tune.execution import trial_runner
from ray.tune.error import TuneError
from ray.tune.result import DEFAULT_METRIC, TRAINING_ITERATION
from ray.tune.suggest import SearchGenerator


@ -7,11 +7,11 @@ import pickle
import warnings
from ray.util.annotations import PublicAPI
from ray.tune import trial_runner
from ray.tune.execution import trial_runner
from ray.tune.resources import Resources
from ray.tune.schedulers.trial_scheduler import FIFOScheduler, TrialScheduler
from ray.tune.trial import Trial
from ray.tune.utils.placement_groups import PlacementGroupFactory
from ray.tune.execution.placement_groups import PlacementGroupFactory
logger = logging.getLogger(__name__)


@ -1,6 +1,6 @@
from typing import Dict, Optional
from ray.tune import trial_runner
from ray.tune.execution import trial_runner
from ray.tune.result import DEFAULT_METRIC
from ray.tune.trial import Trial
from ray.util.annotations import DeveloperAPI, PublicAPI


@ -29,7 +29,7 @@ from ray.tune.callback import Callback
from ray.tune.experiment import Experiment
from ray.tune.function_runner import wrap_function
from ray.tune.logger import Logger, LegacyLoggerCallback
from ray.tune.ray_trial_executor import noop_logger_creator
from ray.tune.execution.ray_trial_executor import noop_logger_creator
from ray.tune.resources import Resources
from ray.tune.result import (
TIMESTEPS_TOTAL,
@ -63,9 +63,9 @@ from ray.tune.suggest.hyperopt import HyperOptSearch
from ray.tune.suggest.suggestion import ConcurrencyLimiter
from ray.tune.syncer import Syncer
from ray.tune.trial import Trial
from ray.tune.trial_runner import TrialRunner
from ray.tune.execution.trial_runner import TrialRunner
from ray.tune.utils import flatten_dict
from ray.tune.utils.placement_groups import PlacementGroupFactory
from ray.tune.execution.placement_groups import PlacementGroupFactory
class TrainableFunctionApiTest(unittest.TestCase):
@ -315,19 +315,19 @@ class TrainableFunctionApiTest(unittest.TestCase):
os.environ["TUNE_WARN_INSUFFICENT_RESOURCE_THRESHOLD_S"] = "0"
with self.assertRaises(RuntimeError), patch.object(
ray.tune.ray_trial_executor.logger, "warning"
ray.tune.execution.ray_trial_executor.logger, "warning"
) as warn_mock:
self.assertRaises(TuneError, lambda: g(100, 100))
assert warn_mock.assert_called_once()
with self.assertRaises(RuntimeError), patch.object(
ray.tune.ray_trial_executor.logger, "warning"
ray.tune.execution.ray_trial_executor.logger, "warning"
) as warn_mock:
self.assertRaises(TuneError, lambda: g(0, 100))
assert warn_mock.assert_called_once()
with self.assertRaises(RuntimeError), patch.object(
ray.tune.ray_trial_executor.logger, "warning"
ray.tune.execution.ray_trial_executor.logger, "warning"
) as warn_mock:
self.assertRaises(TuneError, lambda: g(100, 0))
assert warn_mock.assert_called_once()


@ -7,7 +7,7 @@ import unittest
from unittest.mock import patch
from ray.tune.result import TRAINING_ITERATION
from ray.tune.checkpoint_manager import _CheckpointManager
from ray.tune.execution.checkpoint_manager import _CheckpointManager
from ray.util.ml_utils.checkpoint_manager import (
_TrackedCheckpoint,
logger,


@ -16,7 +16,7 @@ from ray.tune.error import TuneError
from ray.tune.suggest import BasicVariantGenerator
from ray.tune.syncer import SyncerCallback
from ray.tune.trial import Trial
from ray.tune.trial_runner import TrialRunner
from ray.tune.execution.trial_runner import TrialRunner
def _check_trial_running(trial):


@ -8,7 +8,7 @@ import sys
import ray
from ray.cluster_utils import Cluster
from ray.tune.trial import Trial
from ray.tune.trial_runner import TrialRunner
from ray.tune.execution.trial_runner import TrialRunner
from ray.tune.utils.mock_trainable import MyTrainableClass


@ -10,7 +10,7 @@ from ray.rllib import _register_all
from ray import tune
from ray.tune.logger import NoopLogger
from ray.tune.utils.placement_groups import PlacementGroupFactory
from ray.tune.execution.placement_groups import PlacementGroupFactory
from ray.tune.utils.trainable import TrainableUtil
from ray.tune.function_runner import with_parameters, wrap_function, FuncCheckpointUtil
from ray.tune.result import DEFAULT_METRIC, TRAINING_ITERATION


@ -18,7 +18,7 @@ from ray.tune.integration.wandb import (
)
from ray.tune.result import TRIAL_INFO
from ray.tune.trial import _TrialInfo
from ray.tune.utils.placement_groups import PlacementGroupFactory
from ray.tune.execution.placement_groups import PlacementGroupFactory
class Trial(


@ -0,0 +1,44 @@
import warnings
import pytest
@pytest.fixture
def logging_setup():
warnings.filterwarnings("always")
def test_import_execution_checkpoint_manager(logging_setup):
with pytest.warns(DeprecationWarning):
import ray.tune.checkpoint_manager # noqa: F401
def test_import_execution_cluster_info(logging_setup):
with pytest.warns(DeprecationWarning):
import ray.tune.cluster_info # noqa: F401
def test_import_execution_insufficient_resources_manager(logging_setup):
with pytest.warns(DeprecationWarning):
import ray.tune.insufficient_resources_manager # noqa: F401
def test_import_execution_placement_groups(logging_setup):
with pytest.warns(DeprecationWarning):
import ray.tune.utils.placement_groups # noqa: F401
def test_import_execution_ray_trial_executor(logging_setup):
with pytest.warns(DeprecationWarning):
import ray.tune.ray_trial_executor # noqa: F401
def test_import_execution_trial_runner(logging_setup):
with pytest.warns(DeprecationWarning):
import ray.tune.trial_runner # noqa: F401
if __name__ == "__main__":
import sys
sys.exit(pytest.main(["-v", __file__]))


@ -10,7 +10,7 @@ from ray import tune
from ray.rllib import _register_all
from ray.tune import Trainable
from ray.tune.callback import Callback
from ray.tune.ray_trial_executor import (
from ray.tune.execution.ray_trial_executor import (
_ExecutorEvent,
_ExecutorEventType,
RayTrialExecutor,
@ -21,7 +21,7 @@ from ray.tune.suggest import BasicVariantGenerator
from ray.tune.trial import Trial
from ray.tune.resources import Resources
from ray.cluster_utils import Cluster
from ray.tune.utils.placement_groups import (
from ray.tune.execution.placement_groups import (
PlacementGroupFactory,
_PlacementGroupManager,
)
@ -47,7 +47,7 @@ class TrialExecutorInsufficientResourcesTest(unittest.TestCase):
self.cluster.shutdown()
# no autoscaler case, resource is not sufficient. Log warning for now.
@patch.object(ray.tune.insufficient_resources_manager.logger, "warning")
@patch.object(ray.tune.execution.insufficient_resources_manager.logger, "warning")
def testRaiseErrorNoAutoscaler(self, mocked_warn):
class FailureInjectorCallback(Callback):
"""Adds random failure injection to the TrialExecutor."""


@ -7,14 +7,14 @@ from ray.rllib import _register_all
from ray import tune
from ray.tune import TuneError, register_trainable
from ray.tune.ray_trial_executor import RayTrialExecutor
from ray.tune.execution.ray_trial_executor import RayTrialExecutor
from ray.tune.resources import Resources
from ray.tune.schedulers import TrialScheduler, FIFOScheduler
from ray.tune.suggest import BasicVariantGenerator
from ray.tune.trial import Trial
from ray.tune.trial_runner import TrialRunner
from ray.tune.execution.trial_runner import TrialRunner
from ray.tune.utils.mock import TrialStatusSnapshotTaker, TrialStatusSnapshot
from ray.tune.utils.placement_groups import PlacementGroupFactory
from ray.tune.execution.placement_groups import PlacementGroupFactory
class TrialRunnerTest(unittest.TestCase):


@ -13,7 +13,7 @@ from ray.tune.schedulers import FIFOScheduler
from ray.tune.result import DONE
from ray.tune.registry import _global_registry, TRAINABLE_CLASS
from ray.tune.trial import Trial
from ray.tune.trial_runner import TrialRunner
from ray.tune.execution.trial_runner import TrialRunner
from ray.tune.resources import Resources
from ray.tune.suggest import BasicVariantGenerator
from ray.tune.tests.utils_for_test_trial_runner import TrialResultObserver


@ -11,13 +11,13 @@ import ray
from ray.rllib import _register_all
from ray.tune import TuneError
from ray.tune.ray_trial_executor import RayTrialExecutor
from ray.tune.execution.ray_trial_executor import RayTrialExecutor
from ray.tune.result import TRAINING_ITERATION
from ray.tune.schedulers import TrialScheduler, FIFOScheduler
from ray.tune.experiment import Experiment
from ray.tune.suggest import BasicVariantGenerator
from ray.tune.trial import Trial
from ray.tune.trial_runner import TrialRunner
from ray.tune.execution.trial_runner import TrialRunner
from ray.tune.resources import Resources, json_to_resources, resources_to_json
from ray.tune.suggest.repeater import Repeater
from ray.tune.suggest._mock import _MockSuggestionAlgorithm


@ -11,7 +11,7 @@ import ray
from ray import tune
from ray.rllib import _register_all
from ray.tune.logger import DEFAULT_LOGGERS, LoggerCallback, LegacyLoggerCallback
from ray.tune.ray_trial_executor import (
from ray.tune.execution.ray_trial_executor import (
_ExecutorEvent,
_ExecutorEventType,
RayTrialExecutor,
@ -21,7 +21,7 @@ from ray.tune.syncer import SyncConfig, SyncerCallback
from ray.tune.callback import warnings
from ray.tune.trial import Trial
from ray.tune.trial_runner import TrialRunner
from ray.tune.execution.trial_runner import TrialRunner
from ray.tune import Callback
from ray.tune.utils.callback import create_default_callbacks
from ray.tune.experiment import Experiment


@ -6,11 +6,11 @@ import unittest
import ray
from ray import tune
from ray.tune.ray_trial_executor import RayTrialExecutor
from ray.tune.execution.ray_trial_executor import RayTrialExecutor
from ray.tune.trial import Trial
from ray.tune import Callback
from ray.tune.trial_runner import TrialRunner
from ray.tune.utils.placement_groups import PlacementGroupFactory
from ray.tune.execution.trial_runner import TrialRunner
from ray.tune.execution.placement_groups import PlacementGroupFactory
from ray.util import placement_group_table
from ray.cluster_utils import Cluster
from ray.rllib import _register_all


@ -14,7 +14,7 @@ from unittest.mock import MagicMock
import ray
from ray import tune
from ray.tune import Trainable
from ray.tune.ray_trial_executor import RayTrialExecutor
from ray.tune.execution.ray_trial_executor import RayTrialExecutor
from ray.tune.result import TRAINING_ITERATION
from ray.tune.schedulers import (
FIFOScheduler,

View file

@ -12,8 +12,8 @@ import ray
from ray import tune
from ray.tune import Trainable
from ray.tune.trial import Trial
from ray.tune.trial_runner import TrialRunner
from ray.tune.ray_trial_executor import RayTrialExecutor
from ray.tune.execution.trial_runner import TrialRunner
from ray.tune.execution.ray_trial_executor import RayTrialExecutor
from ray.tune.schedulers import PopulationBasedTraining
from ray._private.test_utils import object_memory_usage

View file

@ -20,7 +20,7 @@ from ray.tune.callback import Callback
from ray.tune.suggest.basic_variant import BasicVariantGenerator
from ray.tune.suggest import Searcher
from ray.tune.trial import Trial
from ray.tune.trial_runner import TrialRunner
from ray.tune.execution.trial_runner import TrialRunner
from ray.tune.utils import validate_save_restore
from ray.tune.utils.mock_trainable import MyTrainableClass

View file

@ -8,7 +8,7 @@ import ray
from ray.rllib import _register_all
from ray.tune.trial import Trial, Resources
from ray.tune.web_server import TuneClient
from ray.tune.trial_runner import TrialRunner
from ray.tune.execution.trial_runner import TrialRunner
def get_valid_port():

View file

@ -42,7 +42,7 @@ from ray.tune.result import (
from ray.tune.syncer import Syncer
from ray.tune.utils import UtilMonitor
from ray.tune.utils.log import disable_ipython
from ray.tune.utils.placement_groups import PlacementGroupFactory
from ray.tune.execution.placement_groups import PlacementGroupFactory
from ray.tune.utils.trainable import TrainableUtil
from ray.tune.utils.util import (
Tee,

View file

@ -16,7 +16,7 @@ import ray
import ray.cloudpickle as cloudpickle
from ray.exceptions import RayActorError, RayTaskError
from ray.tune import TuneError
from ray.tune.checkpoint_manager import _CheckpointManager
from ray.tune.execution.checkpoint_manager import _CheckpointManager
# NOTE(rkn): We import ray.tune.registry here instead of importing the names we
# need because there are cyclic imports that may cause specific names to not
@ -33,7 +33,7 @@ from ray.tune.result import (
)
from ray.tune.resources import Resources
from ray.tune.syncer import Syncer
from ray.tune.utils.placement_groups import (
from ray.tune.execution.placement_groups import (
PlacementGroupFactory,
resource_dict_to_pg_factory,
)
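
``trial.py`` now pulls ``resource_dict_to_pg_factory`` from the new package. A minimal sketch of what that helper does with a legacy resource dict, assuming it keeps the behaviour of the function body removed further down in this diff (the printed output is inferred, not taken from the source):

.. code-block:: python

    from ray.tune.execution.placement_groups import resource_dict_to_pg_factory

    # Lower-case resource keys are translated into a single placement
    # group bundle; zero-valued entries are dropped.
    pgf = resource_dict_to_pg_factory({"cpu": 2, "gpu": 1})
    print(pgf.required_resources)  # expected: {'CPU': 2.0, 'GPU': 1.0}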

File diff suppressed because it is too large.

View file

@ -18,7 +18,7 @@ from ray.tune.progress_reporter import (
RemoteReporterMixin,
detect_reporter,
)
from ray.tune.ray_trial_executor import RayTrialExecutor
from ray.tune.execution.ray_trial_executor import RayTrialExecutor
from ray.tune.registry import get_trainable_cls, is_function_trainable
# Must come last to avoid circular imports
@ -42,10 +42,10 @@ from ray.tune.suggest.variant_generator import has_unresolved_values
from ray.tune.syncer import SyncConfig, SyncerCallback, _validate_upload_dir
from ray.tune.trainable import Trainable
from ray.tune.trial import Trial
from ray.tune.trial_runner import TrialRunner
from ray.tune.execution.trial_runner import TrialRunner
from ray.tune.utils.callback import create_default_callbacks
from ray.tune.utils.log import Verbosity, has_verbosity, set_verbosity
from ray.tune.utils.placement_groups import PlacementGroupFactory
from ray.tune.execution.placement_groups import PlacementGroupFactory
from ray.util.annotations import PublicAPI
from ray.util.ml_utils.node import force_on_current_node
from ray.util.queue import Empty, Queue

View file

@ -1,766 +1,4 @@
from typing import Dict, List, Optional, Set, TYPE_CHECKING, Tuple, Union
from collections import defaultdict
from inspect import signature
from copy import deepcopy
import json
import os
import time
import uuid
from ray.tune._structure_refactor import warn_structure_refactor
from ray.tune.execution.placement_groups import * # noqa: F401, F403
import ray
from ray import ObjectRef, logger
from ray.actor import ActorClass
from ray.tune.resources import Resources
from ray.util.annotations import PublicAPI, DeveloperAPI
from ray.util.placement_group import (
PlacementGroup,
get_placement_group,
placement_group,
placement_group_table,
remove_placement_group,
)
if TYPE_CHECKING:
from ray.tune.trial import Trial
TUNE_PLACEMENT_GROUP_REMOVAL_DELAY = 2.0
_tune_pg_prefix = None
def get_tune_pg_prefix():
"""Get the tune placement group name prefix.
This will store the prefix in a global variable so that subsequent runs
can use this identifier to clean up placement groups before starting their
run.
Can be overwritten with the ``TUNE_PLACEMENT_GROUP_PREFIX`` env variable.
"""
global _tune_pg_prefix
if _tune_pg_prefix:
return _tune_pg_prefix
# Else: check env variable
env_prefix = os.getenv("TUNE_PLACEMENT_GROUP_PREFIX", "")
if env_prefix:
_tune_pg_prefix = env_prefix
return _tune_pg_prefix
# Else: create and store unique prefix
_tune_pg_prefix = f"__tune_{uuid.uuid4().hex[:8]}__"
return _tune_pg_prefix
@PublicAPI(stability="beta")
class PlacementGroupFactory:
"""Wrapper class that creates placement groups for trials.
This class should be used to define resource requests for Ray Tune
trials. It holds the parameters to create placement groups.
At a minimum, it holds one bundle specifying the
resource requirements for each trial:
.. code-block:: python
from ray import tune
tune.run(
train,
tune.PlacementGroupFactory([
{"CPU": 1, "GPU": 0.5, "custom_resource": 2}
]))
If the trial itself schedules further remote workers, the resource
requirements should be specified in additional bundles. You can also
pass the placement strategy for these bundles, e.g. to enforce
co-located placement:
.. code-block:: python
from ray import tune
tune.run(
train,
resources_per_trial=tune.PlacementGroupFactory([
{"CPU": 1, "GPU": 0.5, "custom_resource": 2},
{"CPU": 2},
{"CPU": 2},
], strategy="PACK"))
The example above will reserve 1 CPU, 0.5 GPUs and 2 custom_resources
for the trainable itself, and reserve another 2 bundles of 2 CPUs each.
The trial will only start when all these resources are available. This
could be used e.g. if you had one learner running in the main trainable
that schedules two remote workers that need access to 2 CPUs each.
If the trainable itself doesn't require resources,
you can specify it as:
.. code-block:: python
from ray import tune
tune.run(
train,
resources_per_trial=tune.PlacementGroupFactory([
{},
{"CPU": 2},
{"CPU": 2},
], strategy="PACK"))
Args:
bundles (List[Dict]): A list of bundles which
represent the resource requirements.
strategy (str): The strategy to create the placement group.
- "PACK": Packs Bundles into as few nodes as possible.
- "SPREAD": Places Bundles across distinct nodes as evenly as possible.
- "STRICT_PACK": Packs Bundles into one node. The group is
not allowed to span multiple nodes.
- "STRICT_SPREAD": Packs Bundles across distinct nodes.
*args: Passed to the call of ``placement_group()``
**kwargs: Passed to the call of ``placement_group()``
"""
def __init__(
self,
bundles: List[Dict[str, Union[int, float]]],
strategy: str = "PACK",
*args,
**kwargs,
):
assert (
len(bundles) > 0
), "Cannot initialize a PlacementGroupFactory with zero bundles."
self._bundles = [
{k: float(v) for k, v in bundle.items() if v != 0} for bundle in bundles
]
if not self._bundles[0]:
# This is when trainable itself doesn't need resources.
self._head_bundle_is_empty = True
self._bundles.pop(0)
else:
self._head_bundle_is_empty = False
self._strategy = strategy
self._args = args
self._kwargs = kwargs
self._hash = None
self._bound = None
self._bind()
@property
def head_bundle_is_empty(self):
"""Returns True if head bundle is empty while child bundles
need resources.
This is considered an internal API within Tune.
"""
return self._head_bundle_is_empty
@property
@DeveloperAPI
def head_cpus(self) -> float:
return 0.0 if self._head_bundle_is_empty else self._bundles[0].get("CPU", 0.0)
@property
@DeveloperAPI
def bundles(self) -> List[Dict[str, float]]:
"""Returns a deep copy of resource bundles"""
return deepcopy(self._bundles)
@property
def required_resources(self) -> Dict[str, float]:
"""Returns a dict containing the sums of all resources"""
resources = {}
for bundle in self._bundles:
for k, v in bundle.items():
resources[k] = resources.get(k, 0) + v
return resources
def _bind(self):
sig = signature(placement_group)
try:
self._bound = sig.bind(
self._bundles, self._strategy, *self._args, **self._kwargs
)
except Exception as exc:
raise RuntimeError(
"Invalid definition for placement group factory. Please check "
"that you passed valid arguments to the PlacementGroupFactory "
"object."
) from exc
def __call__(self, *args, **kwargs):
kwargs.update(self._bound.kwargs)
# Call with bounded *args and **kwargs
return placement_group(*self._bound.args, **kwargs)
def __eq__(self, other: "PlacementGroupFactory"):
return (
self._bound == other._bound
and self.head_bundle_is_empty == other.head_bundle_is_empty
)
def __hash__(self):
if not self._hash:
# Cache hash
self._hash = hash(
json.dumps(
{"args": self._bound.args, "kwargs": self._bound.kwargs},
sort_keys=True,
indent=0,
ensure_ascii=True,
)
)
return self._hash
def __getstate__(self):
state = self.__dict__.copy()
state.pop("_hash", None)
state.pop("_bound", None)
return state
def __setstate__(self, state):
self.__dict__.update(state)
self._hash = None
self._bound = None
self._bind()
def __repr__(self) -> str:
return (
f"<PlacementGroupFactory (_bound={self._bound}, "
f"head_bundle_is_empty={self.head_bundle_is_empty})>"
)
def resource_dict_to_pg_factory(spec: Optional[Dict[str, float]]):
spec = spec or {"cpu": 1}
if isinstance(spec, Resources):
spec = spec._asdict()
spec = spec.copy()
cpus = spec.pop("cpu", 0.0)
gpus = spec.pop("gpu", 0.0)
memory = spec.pop("memory", 0.0)
object_store_memory = spec.pop("object_store_memory", 0.0)
bundle = {k: v for k, v in spec.pop("custom_resources", {}).items()}
bundle.update(
{
"CPU": cpus,
"GPU": gpus,
"memory": memory,
"object_store_memory": object_store_memory,
}
)
return PlacementGroupFactory([bundle])
class _PlacementGroupManager:
"""PlacementGroupManager to stage and manage placement groups.
.. versionadded:: 1.3.0
This class schedules placement groups for trials, keeps track of
their state, and can return a fully configured actor class using
this placement group.
If two trials share the same placement group factory, both could use
placement groups created from it. Thus, this manager associates
placement groups with their factory methods.
Args:
prefix: Prefix for the placement group names that are created.
"""
def __init__(self, prefix: str = "__tune__", max_staging: int = 1000):
self._prefix = prefix
# Sets of staged placement groups by factory
self._staging: Dict[PlacementGroupFactory, Set[PlacementGroup]] = defaultdict(
set
)
# Sets of ready and unused placement groups by factory
self._ready: Dict[PlacementGroupFactory, Set[PlacementGroup]] = defaultdict(set)
# Ray futures to check if a placement group is ready
self._staging_futures: Dict[
ObjectRef, Tuple[PlacementGroupFactory, PlacementGroup]
] = {}
# Cache of unstaged PGs (cleaned after full PG removal)
self._unstaged_pg_pgf: Dict[PlacementGroup, PlacementGroupFactory] = {}
self._unstaged_pgf_pg: Dict[
PlacementGroupFactory, Set[PlacementGroup]
] = defaultdict(set)
# Placement groups used by trials
self._in_use_pgs: Dict[PlacementGroup, "Trial"] = {}
self._in_use_trials: Dict["Trial", PlacementGroup] = {}
# Placement groups used by remote actors but not trials
# (e.g. for reuse_actors=True)
self._cached_pgs: Dict[PlacementGroup, PlacementGroupFactory] = {}
# Placement groups scheduled for delayed removal.
# This is used as a damper to filter out some high frequency change
# in resources request.
# Only PGs that have never been used go here.
# TODO(xwjiang): `self._pgs_for_removal` and `self._unstaged_xxx`
# are really the same now. We should consolidate to using one.
# Also `remove_placement_group` method should just be combined with
# `unstage_unused_xxx`.
self._pgs_for_removal: Dict[PlacementGroup, float] = {}
self._removal_delay = TUNE_PLACEMENT_GROUP_REMOVAL_DELAY
self._max_staging = max_staging
def set_max_staging(self, max_staging: int):
self._max_staging = max_staging
def remove_pg(self, pg: PlacementGroup):
"""Schedule placement group for (delayed) removal.
Args:
pg: Placement group object.
"""
self._pgs_for_removal[pg] = time.time()
def cleanup(self, force: bool = False):
"""Remove placement groups that are scheduled for removal.
Currently, this will remove placement groups after they've been
marked for removal for ``self._removal_delay`` seconds.
If ``force=True``, this condition is disregarded and all placement
groups are removed instead.
Args:
force: If True, all placement groups scheduled for removal
will be removed, disregarding any removal conditions.
"""
# Wrap in list so we can modify the dict
for pg in list(self._pgs_for_removal):
if (
force
or (time.time() - self._removal_delay) >= self._pgs_for_removal[pg]
):
self._pgs_for_removal.pop(pg)
remove_placement_group(pg)
# Remove from unstaged cache
if pg in self._unstaged_pg_pgf:
pgf = self._unstaged_pg_pgf.pop(pg)
self._unstaged_pgf_pg[pgf].discard(pg)
def cleanup_existing_pg(self, block: bool = False):
"""Clean up (remove) all existing placement groups.
This scans through the placement_group_table to discover existing
placement groups and calls remove_placement_group on all that
match the ``__tune__`` prefix. This method is called at the beginning
of the tuning run to clean up existing placement groups should the
experiment be interrupted by a driver failure and resumed in the
same driver script.
Args:
block: If True, will wait until all placement groups are
shut down.
"""
should_cleanup = not int(
os.getenv("TUNE_PLACEMENT_GROUP_CLEANUP_DISABLED", "0")
)
if should_cleanup:
has_non_removed_pg_left = True
while has_non_removed_pg_left:
has_non_removed_pg_left = False
for pid, info in placement_group_table().items():
if not info["name"].startswith(self._prefix):
continue
if info["state"] == "REMOVED":
continue
# If block=False, only run once
has_non_removed_pg_left = block
pg = get_placement_group(info["name"])
remove_placement_group(pg)
# Remove from unstaged cache
if pg in self._unstaged_pg_pgf:
pgf = self._unstaged_pg_pgf.pop(pg)
self._unstaged_pgf_pg[pgf].discard(pg)
time.sleep(0.1)
def stage_trial_pg(self, trial: "Trial"):
"""Stage a trial placement group.
Create the trial placement group if the maximum number of pending
placement groups is not exhausted.
Args:
trial: Trial whose placement group to stage.
Returns:
False if placement group has not been staged, True otherwise.
Creates placement group and moves it to `self._staging`.
"""
if not self.can_stage():
return False
pgf = trial.placement_group_factory
return self._stage_pgf_pg(pgf)
def _stage_pgf_pg(self, pgf: PlacementGroupFactory):
"""Create placement group for factory"""
if len(self._unstaged_pgf_pg[pgf]) > 0:
# This re-uses a previously unstaged placement group
pg = self._unstaged_pgf_pg[pgf].pop()
del self._unstaged_pg_pgf[pg]
self._pgs_for_removal.pop(pg, None)
else:
# This creates the placement group
pg = pgf(name=f"{self._prefix}{uuid.uuid4().hex[:8]}")
self._staging[pgf].add(pg)
self._staging_futures[pg.ready()] = (pgf, pg)
return True
def can_stage(self):
"""Return True if we can stage another placement group."""
return len(self._staging_futures) < self._max_staging
def update_status(self):
"""Update placement group status.
Moves ready placement groups from `self._staging` to
`self._ready`.
"""
self.cleanup()
ready = True
while ready:
# Use a loop as `ready` might return futures one by one
ready, _ = ray.wait(list(self._staging_futures.keys()), timeout=0)
for ready_fut in ready:
self.handle_ready_future(ready_fut)
def handle_ready_future(self, ready_fut):
ready_pgf, ready_pg = self._staging_futures.pop(ready_fut)
self._staging[ready_pgf].remove(ready_pg)
self._ready[ready_pgf].add(ready_pg)
def get_staging_future_list(self):
return list(self._staging_futures.keys())
def get_full_actor_cls(
self, trial: "Trial", actor_cls: ActorClass
) -> Optional[ActorClass]:
"""Get a fully configured actor class.
Returns the configured actor class if the placement group is ready. In this case,
the placement group is moved to `self._in_use_pgs` and removed from
`self._ready`.
Args:
trial: "Trial" object to start
actor_cls: Ray actor class.
Returns:
Configured ActorClass or None
"""
pgf = trial.placement_group_factory
if not self._ready[pgf]:
return None
pg = self._ready[pgf].pop()
self._in_use_pgs[pg] = trial
self._in_use_trials[trial] = pg
logger.debug(f"For trial {trial} use pg {pg.id}")
# We still have to pass resource specs
if not pgf.head_bundle_is_empty:
# Pass the full resource specs of the first bundle by default
head_bundle = pg.bundle_specs[0].copy()
num_cpus = head_bundle.pop("CPU", 0)
num_gpus = head_bundle.pop("GPU", 0)
memory = head_bundle.pop("memory", None)
object_store_memory = head_bundle.pop("object_store_memory", None)
# Only custom resources remain in `head_bundle`
resources = head_bundle
return actor_cls.options(
placement_group=pg,
placement_group_bundle_index=0,
placement_group_capture_child_tasks=True,
num_cpus=num_cpus,
num_gpus=num_gpus,
memory=memory,
object_store_memory=object_store_memory,
resources=resources,
)
else:
return actor_cls.options(
placement_group=pg,
placement_group_capture_child_tasks=True,
num_cpus=0,
num_gpus=0,
resources={},
)
def has_ready(self, trial: "Trial", update: bool = False) -> bool:
"""Return True if placement group for trial is ready.
Args:
trial: :obj:`Trial` object.
update: Update status first.
Returns:
Boolean.
"""
if update:
self.update_status()
return bool(self._ready[trial.placement_group_factory])
def has_staging(self, trial: "Trial", update: bool = False) -> bool:
"""Return True if placement group for trial is staging.
Args:
trial: :obj:`Trial` object.
update: Update status first.
Returns:
Boolean.
"""
if update:
self.update_status()
return bool(self._staging[trial.placement_group_factory])
def trial_in_use(self, trial: "Trial"):
return trial in self._in_use_trials
def cache_trial_pg(self, trial: "Trial") -> Optional[PlacementGroup]:
"""Disassociated placement group from trial object.
This can be used to move placement groups into a cache so that
they can be reused by other trials. The difference from just making
them broadly available again is that they have to be specifically
re-assigned to a trial via :meth:`assign_cached_pg`. The reason
for this is that remote actors might already be scheduled on this
placement group, so it should only be associated with the trial that
actually re-uses the remote actor (e.g. when using ``reuse_actors``).
This will replace (unstage) an existing placement group with the same
factory object. If this is unsuccessful (e.g. because no such
pending placement group exists), the placement group will *not* be
cached and None will be returned.
Args:
trial: Trial object with the (currently in use) placement
group that should be cached.
Returns:
PlacementGroup object that was cached or None if
no placement group was replaced.
"""
pgf = trial.placement_group_factory
staged_pg = self._unstage_unused_pg(pgf)
if not staged_pg and not self._unstaged_pgf_pg[pgf]:
# If we have an unstaged placement group for this factory,
# this might be the same one we unstaged previously. If so,
# we should continue with the caching. If not, this will be
# reconciled later.
return None
if staged_pg:
self.remove_pg(staged_pg)
pg = self._in_use_trials.pop(trial)
self._in_use_pgs.pop(pg)
self._cached_pgs[pg] = trial.placement_group_factory
return pg
def assign_cached_pg(self, pg: PlacementGroup, trial: "Trial") -> bool:
"""Assign a cached pg to a trial."""
pgf = self._cached_pgs.pop(pg)
trial_pgf = trial.placement_group_factory
assert pgf == trial_pgf, (
f"Cannot assign placement group with a "
f"non-matching factory to trial {trial}"
)
logger.debug(f"For trial {trial} RE-use pg {pg.id}")
self._in_use_pgs[pg] = trial
self._in_use_trials[trial] = pg
return True
def clean_cached_pg(self, pg: PlacementGroup):
self._cached_pgs.pop(pg)
def has_cached_pg(self, pgf: PlacementGroupFactory):
"""Check if a placement group for given factory has been cached"""
return any(cached_pgf == pgf for cached_pgf in self._cached_pgs.values())
def remove_from_in_use(self, trial: "Trial") -> PlacementGroup:
"""Return pg back to Core scheduling.
Args:
trial: Return placement group of this trial.
"""
pg = self._in_use_trials.pop(trial)
self._in_use_pgs.pop(pg)
return pg
def _unstage_unused_pg(
self, pgf: PlacementGroupFactory
) -> Optional[PlacementGroup]:
"""Unstage an unsued (i.e. staging or ready) placement group.
This method will find an unused placement group and remove it from
the tracked pool of placement groups (including e.g. the
staging futures). It will *not* call ``remove_placement_group()``
on the placement group - that is up to the calling method to do.
(The reason for this is that sometimes we would remove the placement
group directly, but sometimes we would like to enqueue removal.)
Args:
pgf: Placement group factory object.
This method will try to remove a staged PG of this factory
first, then settle for a ready but unused one. If none exist,
no placement group will be removed and None will be returned.
Returns:
Removed placement group object or None.
"""
trial_pg = None
# If there are pending placement groups
# in staging, pop a random one.
if self._staging[pgf]:
trial_pg = self._staging[pgf].pop()
# For staging placement groups, we will also need to
# remove the future.
trial_future = None
for future, (pgf, pg) in self._staging_futures.items():
if pg == trial_pg:
trial_future = future
break
# Track unstaged placement groups for potential reuse
self._unstaged_pg_pgf[trial_pg] = pgf
self._unstaged_pgf_pg[pgf].add(trial_pg)
del self._staging_futures[trial_future]
elif self._ready[pgf]:
# Otherwise, return an unused ready placement group.
trial_pg = self._ready[pgf].pop()
return trial_pg
def reconcile_placement_groups(self, trials: List["Trial"]):
"""Reconcile placement groups to match requirements.
This will loop through all trials and count their statuses by
placement group factory. This makes sure that exactly as many
placement groups are kept as there are trials left to run.
E.g. if PGF_A has 2 terminated, 1 errored, 2 paused, 1 running,
and 3 pending trials, a total of 6 placement groups
(paused+running+pending) should be in staging, use, or the cache.
Args:
trials: List of trials.
"""
# Keep track of the currently tracked placement groups
current_counts: Dict[PlacementGroupFactory, int] = defaultdict(int)
# Count number of expected placement groups
pgf_expected: Dict[PlacementGroupFactory, int] = defaultdict(int)
for trial in trials:
# Count in-use placement groups
if trial in self._in_use_trials:
current_counts[trial.placement_group_factory] += 1
pgf_expected[trial.placement_group_factory] += (
1 if trial.status in ["PAUSED", "PENDING", "RUNNING"] else 0
)
# Ensure that unexpected placement groups are accounted for
for pgf in self._staging:
if pgf not in pgf_expected:
pgf_expected[pgf] = 0
for pgf in self._ready:
if pgf not in pgf_expected:
pgf_expected[pgf] = 0
# Count cached placement groups
for pg, pgf in self._cached_pgs.items():
current_counts[pgf] += 1
# Compare current with expected
for pgf, expected in pgf_expected.items():
# Add staging and ready pgs
current_counts[pgf] += len(self._staging[pgf])
current_counts[pgf] += len(self._ready[pgf])
while current_counts[pgf] > expected:
pg = self._unstage_unused_pg(pgf)
if not pg:
break
logger.debug(f"Removing unneeded placement group {pg.id}")
self.remove_pg(pg)
current_counts[pgf] -= 1
while expected > current_counts[pgf]:
self._stage_pgf_pg(pgf)
current_counts[pgf] += 1
logger.debug(
f"Adding an expected but previously unstaged "
f"placement group for factory {pgf}"
)
def occupied_resources(self):
"""Return a dictionary of currently in-use resources."""
resources = {"CPU": 0, "GPU": 0}
for pg in self._in_use_pgs:
for bundle_resources in pg.bundle_specs:
for key, val in bundle_resources.items():
resources[key] = resources.get(key, 0) + val
return resources
warn_structure_refactor(__name__, "ray.tune.execution.placement_groups")
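
For existing user code, the shim above means the old import path keeps resolving through the wildcard re-export, while ``warn_structure_refactor`` points users at the new location. A hedged sketch of the expected behaviour (assuming the warning surfaces as a one-time module-moved notice):

.. code-block:: python

    # Old path: still importable, but assumed to trigger the refactor warning.
    from ray.tune.utils.placement_groups import PlacementGroupFactory as OldPGF
    # New path: the canonical location going forward.
    from ray.tune.execution.placement_groups import PlacementGroupFactory as NewPGF

    assert OldPGF is NewPGF  # the shim re-exports the same class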

View file

@ -13,7 +13,7 @@ from ray._private.utils import binary_to_hex, hex_to_binary
from ray.util.annotations import DeveloperAPI
if TYPE_CHECKING:
from ray.tune.trial_runner import TrialRunner
from ray.tune.execution.trial_runner import TrialRunner
logger = logging.getLogger(__name__)

View file

@ -1,7 +1,7 @@
from sklearn import datasets
from sklearn.model_selection import train_test_split
from ray.tune.utils.placement_groups import PlacementGroupFactory
from ray.tune.execution.placement_groups import PlacementGroupFactory
from ray.util.xgboost import RayDMatrix, RayParams, train
# __train_begin__

View file

@ -45,7 +45,7 @@ from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple
import ray
import ray.cloudpickle as pickle
from ray.tune.trial_runner import find_newest_experiment_checkpoint
from ray.tune.execution.trial_runner import find_newest_experiment_checkpoint
from ray.tune.utils.serialization import TuneFunctionDecoder
TUNE_SCRIPT = os.path.join(os.path.dirname(__file__), "_tune_script.py")

View file

@ -16,7 +16,7 @@ import os
import ray
from ray import tune
from ray.tune.cluster_info import is_ray_cluster
from ray.tune.execution.cluster_info import is_ray_cluster
from ray.tune.utils.release_test_util import timed_tune_run

View file

@ -100,7 +100,7 @@ from ray.tune.resources import Resources
from ray.tune.result import DEFAULT_RESULTS_DIR
from ray.tune.trainable import Trainable
from ray.tune.trial import ExportFormat
from ray.tune.utils.placement_groups import PlacementGroupFactory
from ray.tune.execution.placement_groups import PlacementGroupFactory
from ray.util import log_once
from ray.util.timer import _Timer

View file

@ -42,7 +42,7 @@ from ray.rllib.utils.typing import (
AlgorithmConfigDict,
ResultDict,
)
from ray.tune.utils.placement_groups import PlacementGroupFactory
from ray.tune.execution.placement_groups import PlacementGroupFactory
from ray.util.timer import _Timer

View file

@ -51,7 +51,7 @@ from ray.rllib.utils.typing import (
ResultDict,
)
from ray.tune.trainable import Trainable
from ray.tune.utils.placement_groups import PlacementGroupFactory
from ray.tune.execution.placement_groups import PlacementGroupFactory
from ray.util.ml_utils.dict import merge_dicts

View file

@ -49,7 +49,7 @@ from ray.rllib.utils.typing import (
SampleBatchType,
T,
)
from ray.tune.utils.placement_groups import PlacementGroupFactory
from ray.tune.execution.placement_groups import PlacementGroupFactory
from ray.types import ObjectRef
logger = logging.getLogger(__name__)

View file

@ -15,7 +15,7 @@ from ray.rllib.evaluation import RolloutWorker
from ray.rllib.evaluation.metrics import collect_metrics
from ray.rllib.policy.policy import Policy
from ray.rllib.policy.sample_batch import DEFAULT_POLICY_ID, SampleBatch
from ray.tune.utils.placement_groups import PlacementGroupFactory
from ray.tune.execution.placement_groups import PlacementGroupFactory
parser = argparse.ArgumentParser()
parser.add_argument("--gpu", action="store_true")

View file

@ -5,9 +5,9 @@ import ray
from ray import tune
from ray.tune import Callback
from ray.rllib.algorithms.pg import PG, DEFAULT_CONFIG
from ray.tune.ray_trial_executor import RayTrialExecutor
from ray.tune.execution.ray_trial_executor import RayTrialExecutor
from ray.tune.trial import Trial
from ray.tune.utils.placement_groups import PlacementGroupFactory
from ray.tune.execution.placement_groups import PlacementGroupFactory
trial_executor = None