diff --git a/doc/source/tune/user-guide.rst b/doc/source/tune/user-guide.rst
index 72a19f44d..078a0f85e 100644
--- a/doc/source/tune/user-guide.rst
+++ b/doc/source/tune/user-guide.rst
@@ -850,7 +850,7 @@ These are the environment variables Ray Tune currently considers:
 * **TUNE_WARN_THRESHOLD_S**: Threshold for logging if an Tune event loop operation takes too long. Defaults to 0.5 (seconds).
 * **TUNE_WARN_INSUFFICENT_RESOURCE_THRESHOLD_S**: Threshold for throwing a warning if no active trials are in ``RUNNING`` state
   for this amount of seconds. If the Ray Tune job is stuck in this state (most likely due to insufficient resources),
-  the warning message is printed repeatedly every this amount of seconds. Defaults to 1 (seconds).
+  the warning message is printed repeatedly every this amount of seconds. Defaults to 10 (seconds).
 * **TUNE_WARN_INSUFFICENT_RESOURCE_THRESHOLD_S_AUTOSCALER**: Threshold for throwing a warning, when the autoscaler is enabled,
   if no active trials are in ``RUNNING`` state for this amount of seconds. If the Ray Tune job is stuck in this state
   (most likely due to insufficient resources), the warning message is printed
diff --git a/python/ray/tune/tests/test_ray_trial_executor.py b/python/ray/tune/tests/test_ray_trial_executor.py
index 56b5befd3..87071a9e0 100644
--- a/python/ray/tune/tests/test_ray_trial_executor.py
+++ b/python/ray/tune/tests/test_ray_trial_executor.py
@@ -1,13 +1,12 @@
 # coding: utf-8
-from freezegun import freeze_time
-from mock import patch
 import os
+import pytest
 import unittest
 
 import ray
 from ray import tune
 from ray.rllib import _register_all
-from ray.tune import Trainable
+from ray.tune import Trainable, TuneError
 from ray.tune.ray_trial_executor import RayTrialExecutor
 from ray.tune.registry import _global_registry, TRAINABLE_CLASS
 from ray.tune.result import TRAINING_ITERATION
@@ -20,54 +19,39 @@ from ray.tune.utils.placement_groups import PlacementGroupFactory
 
 class TrialExecutorInsufficientResourcesTest(unittest.TestCase):
     def setUp(self):
-        os.environ["TUNE_INSUFFICENT_RESOURCE_WARN_THRESHOLD_S"] = "1"
+        os.environ["TUNE_WARN_INSUFFICENT_RESOURCE_THRESHOLD_S"] = "1"
         self.cluster = Cluster(
             initialize_head=True,
             connect=True,
             head_node_args={
                 "num_cpus": 4,
                 "num_gpus": 2,
-                "_system_config": {
-                    "num_heartbeats_timeout": 10
-                }
             })
 
     def tearDown(self):
         ray.shutdown()
         self.cluster.shutdown()
 
-    @freeze_time("2021-08-03", auto_tick_seconds=15)
-    @patch.object(ray.tune.trial_executor.logger, "warning")
-    def testOutputWarningMessage(self, mocked_warn):
+    # no autoscaler case, resource is not sufficient. Raise error.
+    def testRaiseErrorNoAutoscaler(self):
         def train(config):
             pass
 
-        tune.run(
-            train, resources_per_trial={
-                "cpu": 1,
-                "gpu": 1,
-            })
-        msg = (
-            "No trial is running and no new trial has been started within at"
-            " least the last 1.0 seconds. This could be due to the cluster "
-            "not having enough resources available to start the next trial. "
" - "Please stop the tuning job and readjust resources_per_trial " - "argument passed into tune.run() and/or start a cluster with more " - "resources.") - mocked_warn.assert_called_with(msg) - - @freeze_time("2021-08-03") - @patch.object(ray.tune.trial_executor.logger, "warning") - def testNotOutputWarningMessage(self, mocked_warn): - def train(config): - pass - - tune.run( - train, resources_per_trial={ - "cpu": 1, - "gpu": 1, - }) - mocked_warn.assert_not_called() + with pytest.raises(TuneError) as cm: + tune.run( + train, + resources_per_trial={ + "cpu": 5, # more than what the cluster can offer. + "gpu": 3, + }) + msg = ("You asked for 5.0 cpu and 3.0 gpu per trial, " + "but the cluster only has 4.0 cpu and 2.0 gpu. " + "Stop the tuning job and " + "adjust the resources requested per trial " + "(possibly via `resources_per_trial` " + "or via `num_workers` for rllib) " + "and/or add more resources to your Ray runtime.") + assert str(cm._excinfo[1]) == msg class RayTrialExecutorTest(unittest.TestCase): @@ -475,6 +459,5 @@ class LocalModeExecutorTest(RayTrialExecutorTest): if __name__ == "__main__": - import pytest import sys sys.exit(pytest.main(["-v", __file__])) diff --git a/python/ray/tune/trial_executor.py b/python/ray/tune/trial_executor.py index 5af548823..cbe3187df 100644 --- a/python/ray/tune/trial_executor.py +++ b/python/ray/tune/trial_executor.py @@ -6,6 +6,7 @@ import os import time from typing import Dict, List, Optional +import ray from ray.tune.resources import Resources from ray.util.annotations import DeveloperAPI from ray.tune.trial import Trial, Checkpoint @@ -15,37 +16,78 @@ from ray.tune.cluster_info import is_ray_cluster logger = logging.getLogger(__name__) +# Ideally we want to use @cache; but it's only available for python 3.9. +# Caching is only helpful/correct for no autoscaler case. @lru_cache() -def _get_warning_threshold() -> float: +def _get_cluster_resources_no_autoscaler() -> Dict: + return ray.cluster_resources() + + +def _get_trial_cpu_and_gpu(trial: Trial) -> Dict: + cpu = trial.resources.cpu + trial.resources.extra_cpu + gpu = trial.resources.gpu + trial.resources.extra_gpu + if trial.placement_group_factory is not None: + cpu = trial.placement_group_factory.required_resources.get("CPU", 0) + gpu = trial.placement_group_factory.required_resources.get("GPU", 0) + return {"CPU": cpu, "GPU": gpu} + + +def _can_fulfill_no_autoscaler(trial: Trial) -> bool: + """Calculates if there is enough resources for a PENDING trial. + + For no autoscaler case. + """ + assert trial.status == Trial.PENDING + trial_cpu_gpu = _get_trial_cpu_and_gpu(trial) + + return trial_cpu_gpu["CPU"] <= _get_cluster_resources_no_autoscaler().get( + "CPU", 0 + ) and trial_cpu_gpu["GPU"] <= _get_cluster_resources_no_autoscaler().get( + "GPU", 0) + + +@lru_cache() +def _get_insufficient_resources_warning_threshold() -> float: if is_ray_cluster(): return float( os.environ.get( "TUNE_WARN_INSUFFICENT_RESOURCE_THRESHOLD_S_AUTOSCALER", "60")) else: + # Set the default to 10s so that we don't prematurely determine that + # a cluster cannot fulfill the resources requirements. return float( - os.environ.get("TUNE_WARN_INSUFFICENT_RESOURCE_THRESHOLD_S", "1")) + os.environ.get("TUNE_WARN_INSUFFICENT_RESOURCE_THRESHOLD_S", "10")) +# TODO(xwjiang): Consider having a help page with more detailed instructions. 
 @lru_cache()
-def _get_warning_msg() -> str:
+def _get_insufficient_resources_warning_msg() -> str:
+    msg = (
+        f"No trial is running and no new trial has been started within"
+        f" at least the last "
+        f"{_get_insufficient_resources_warning_threshold()} seconds. "
+        f"This could be due to the cluster not having enough "
+        f"resources available to start the next trial. "
+        f"Stop the tuning job and adjust the resources requested per trial "
+        f"(possibly via `resources_per_trial` or via `num_workers` for rllib) "
+        f"and/or add more resources to your Ray runtime.")
     if is_ray_cluster():
-        return (
-            f"If autoscaler is still scaling up, ignore this message. No "
-            f"trial is running and no new trial has been started within at "
-            f"least the last {_get_warning_threshold()} seconds. "
-            f"This could be due to the cluster not having enough "
-            f"resources available to start the next trial. Please stop the "
-            f"tuning job and readjust resources_per_trial argument passed "
-            f"into tune.run() as well as max_workers and worker_nodes "
-            f"InstanceType specified in cluster.yaml.")
+        return "If autoscaler is still scaling up, ignore this message. " + msg
     else:
-        return (f"No trial is running and no new trial has been started within"
-                f" at least the last {_get_warning_threshold()} seconds. "
-                f"This could be due to the cluster not having enough "
-                f"resources available to start the next trial. Please stop "
-                f"the tuning job and readjust resources_per_trial argument "
-                f"passed into tune.run() and/or start a cluster with more "
-                f"resources.")
+        return msg
+
+
+# A beefed up version when Tune Error is raised.
+def _get_insufficient_resources_error_msg(trial: Trial) -> str:
+    trial_cpu_gpu = _get_trial_cpu_and_gpu(trial)
+    return (
+        f"You asked for {trial_cpu_gpu['CPU']} cpu and "
+        f"{trial_cpu_gpu['GPU']} gpu per trial, but the cluster only has "
+        f"{_get_cluster_resources_no_autoscaler().get('CPU', 0)} cpu and "
+        f"{_get_cluster_resources_no_autoscaler().get('GPU', 0)} gpu. "
+        f"Stop the tuning job and adjust the resources requested per trial "
+        f"(possibly via `resources_per_trial` or via `num_workers` for rllib) "
+        f"and/or add more resources to your Ray runtime.")
 
 
 @DeveloperAPI
@@ -68,12 +110,19 @@ class TrialExecutor(metaclass=ABCMeta):
         self._queue_trials = queue_trials
         self._cached_trial_state = {}
         self._trials_to_cache = set()
+        # The next two variables are used to keep track of if there is any
+        # "progress" made between subsequent calls to `on_no_available_trials`.
+        # TODO(xwjiang): Clean this up once figuring out who should have a
+        # holistic view of trials - runner or executor.
+        # Also iterating over list of trials every time is very inefficient.
+        # Need better visibility APIs into trials.
         # The start time since when all active trials have been in PENDING
         # state, or since last time we output a resource insufficent
         # warning message, whichever comes later.
         # -1 means either the TrialExecutor is just initialized without any
         # trials yet, or there are some trials in RUNNING state.
         self._no_running_trials_since = -1
+        self._all_trials_size = -1
 
     def set_status(self, trial: Trial, status: str) -> None:
         """Sets status and checkpoints metadata if needed.
@@ -237,17 +286,28 @@ class TrialExecutor(metaclass=ABCMeta):
         pass
 
     def _may_warn_insufficient_resources(self, all_trials):
-        if not any(trial.status == Trial.RUNNING for trial in all_trials):
+        # This is approximately saying we are not making progress.
+        if len(all_trials) == self._all_trials_size:
             if self._no_running_trials_since == -1:
                 self._no_running_trials_since = time.monotonic()
-            elif time.monotonic(
-            ) - self._no_running_trials_since > _get_warning_threshold():
-                # TODO(xwjiang): We should ideally output a more helpful msg.
-                # https://github.com/ray-project/ray/issues/17799
-                logger.warning(_get_warning_msg())
+            elif (time.monotonic() - self._no_running_trials_since >
+                  _get_insufficient_resources_warning_threshold()):
+                if not is_ray_cluster():  # autoscaler not enabled
+                    # If any of the pending trial cannot be fulfilled,
+                    # that's a good enough hint of trial resources not enough.
+                    for trial in all_trials:
+                        if (trial.status is Trial.PENDING
+                                and not _can_fulfill_no_autoscaler(trial)):
+                            raise TuneError(
+                                _get_insufficient_resources_error_msg(trial))
+                else:
+                    # TODO(xwjiang): Output a more helpful msg for autoscaler.
+                    # https://github.com/ray-project/ray/issues/17799
+                    logger.warning(_get_insufficient_resources_warning_msg())
                 self._no_running_trials_since = time.monotonic()
         else:
             self._no_running_trials_since = -1
+        self._all_trials_size = len(all_trials)
 
     def on_no_available_trials(self, trials: List[Trial]) -> None:
         """