[Tune] Raise Error when there are insufficient resources. (#17957)

xwjiang2010 2021-09-03 10:49:54 -07:00 committed by GitHub
parent ac5d255c9c
commit 01adf030ec
3 changed files with 106 additions and 63 deletions


@@ -850,7 +850,7 @@ These are the environment variables Ray Tune currently considers:
* **TUNE_WARN_THRESHOLD_S**: Threshold for logging if a Tune event loop operation takes too long. Defaults to 0.5 (seconds).
* **TUNE_WARN_INSUFFICENT_RESOURCE_THRESHOLD_S**: Threshold for throwing a warning if no active trials are in ``RUNNING`` state
for this many seconds. If the Ray Tune job is stuck in this state (most likely due to insufficient resources),
the warning message is printed repeatedly at this interval. Defaults to 1 (seconds).
the warning message is printed repeatedly at this interval. Defaults to 10 (seconds).
* **TUNE_WARN_INSUFFICENT_RESOURCE_THRESHOLD_S_AUTOSCALER**: Threshold for throwing a warning, when the autoscaler is enabled,
if no active trials are in ``RUNNING`` state for this many seconds.
If the Ray Tune job is stuck in this state (most likely due to insufficient resources), the warning message is printed
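As a quick, hedged illustration of how these thresholds are typically used (a minimal sketch, not part of this commit; the trainable, search space, and values below are made up), they are ordinary environment variables read at runtime and can be set before the Tune job starts:

import os
from ray import tune

# Illustrative values only; the variable names are the ones documented above.
os.environ["TUNE_WARN_INSUFFICENT_RESOURCE_THRESHOLD_S"] = "30"
os.environ["TUNE_WARN_INSUFFICENT_RESOURCE_THRESHOLD_S_AUTOSCALER"] = "120"

def train(config):
    tune.report(score=config["x"])  # dummy trainable used only for this sketch

tune.run(
    train,
    config={"x": tune.grid_search([1, 2, 3])},
    resources_per_trial={"cpu": 1})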


@@ -1,13 +1,12 @@
# coding: utf-8
from freezegun import freeze_time
from mock import patch
import os
import pytest
import unittest
import ray
from ray import tune
from ray.rllib import _register_all
from ray.tune import Trainable
from ray.tune import Trainable, TuneError
from ray.tune.ray_trial_executor import RayTrialExecutor
from ray.tune.registry import _global_registry, TRAINABLE_CLASS
from ray.tune.result import TRAINING_ITERATION
@@ -20,54 +19,39 @@ from ray.tune.utils.placement_groups import PlacementGroupFactory
class TrialExecutorInsufficientResourcesTest(unittest.TestCase):
def setUp(self):
os.environ["TUNE_INSUFFICENT_RESOURCE_WARN_THRESHOLD_S"] = "1"
os.environ["TUNE_WARN_INSUFFICENT_RESOURCE_THRESHOLD_S"] = "1"
self.cluster = Cluster(
initialize_head=True,
connect=True,
head_node_args={
"num_cpus": 4,
"num_gpus": 2,
"_system_config": {
"num_heartbeats_timeout": 10
}
})
def tearDown(self):
ray.shutdown()
self.cluster.shutdown()
@freeze_time("2021-08-03", auto_tick_seconds=15)
@patch.object(ray.tune.trial_executor.logger, "warning")
def testOutputWarningMessage(self, mocked_warn):
# No autoscaler case, resources are not sufficient. Raise error.
def testRaiseErrorNoAutoscaler(self):
def train(config):
pass
tune.run(
train, resources_per_trial={
"cpu": 1,
"gpu": 1,
})
msg = (
"No trial is running and no new trial has been started within at"
" least the last 1.0 seconds. This could be due to the cluster "
"not having enough resources available to start the next trial. "
"Please stop the tuning job and readjust resources_per_trial "
"argument passed into tune.run() and/or start a cluster with more "
"resources.")
mocked_warn.assert_called_with(msg)
@freeze_time("2021-08-03")
@patch.object(ray.tune.trial_executor.logger, "warning")
def testNotOutputWarningMessage(self, mocked_warn):
def train(config):
pass
tune.run(
train, resources_per_trial={
"cpu": 1,
"gpu": 1,
})
mocked_warn.assert_not_called()
with pytest.raises(TuneError) as cm:
tune.run(
train,
resources_per_trial={
"cpu": 5, # more than what the cluster can offer.
"gpu": 3,
})
msg = ("You asked for 5.0 cpu and 3.0 gpu per trial, "
"but the cluster only has 4.0 cpu and 2.0 gpu. "
"Stop the tuning job and "
"adjust the resources requested per trial "
"(possibly via `resources_per_trial` "
"or via `num_workers` for rllib) "
"and/or add more resources to your Ray runtime.")
assert str(cm._excinfo[1]) == msg
class RayTrialExecutorTest(unittest.TestCase):
@@ -475,6 +459,5 @@ class LocalModeExecutorTest(RayTrialExecutorTest):
if __name__ == "__main__":
import pytest
import sys
sys.exit(pytest.main(["-v", __file__]))
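For context, here is a hedged sketch (not part of the commit) of the user-facing behavior this test locks in: on a cluster without an autoscaler, requesting more CPUs or GPUs per trial than the cluster exposes now raises a TuneError instead of only logging a repeated warning. The local cluster size and resource request below are illustrative.

import ray
from ray import tune
from ray.tune import TuneError

ray.init(num_cpus=2)  # a small local "cluster", sized only for illustration

def train(config):
    pass

try:
    # Asks for 4 CPUs per trial while only 2 exist, so Tune should error out.
    tune.run(train, resources_per_trial={"cpu": 4})
except TuneError as e:
    print(e)  # the message lists requested vs. available CPU/GPU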


@@ -6,6 +6,7 @@ import os
import time
from typing import Dict, List, Optional
import ray
from ray.tune.resources import Resources
from ray.util.annotations import DeveloperAPI
from ray.tune.trial import Trial, Checkpoint
@@ -15,37 +16,78 @@ from ray.tune.cluster_info import is_ray_cluster
logger = logging.getLogger(__name__)
# Ideally we want to use @cache; but it's only available for python 3.9.
# Caching is only helpful/correct for no autoscaler case.
@lru_cache()
def _get_warning_threshold() -> float:
def _get_cluster_resources_no_autoscaler() -> Dict:
return ray.cluster_resources()
def _get_trial_cpu_and_gpu(trial: Trial) -> Dict:
cpu = trial.resources.cpu + trial.resources.extra_cpu
gpu = trial.resources.gpu + trial.resources.extra_gpu
if trial.placement_group_factory is not None:
cpu = trial.placement_group_factory.required_resources.get("CPU", 0)
gpu = trial.placement_group_factory.required_resources.get("GPU", 0)
return {"CPU": cpu, "GPU": gpu}
def _can_fulfill_no_autoscaler(trial: Trial) -> bool:
"""Calculates if there is enough resources for a PENDING trial.
For no autoscaler case.
"""
assert trial.status == Trial.PENDING
trial_cpu_gpu = _get_trial_cpu_and_gpu(trial)
return trial_cpu_gpu["CPU"] <= _get_cluster_resources_no_autoscaler().get(
"CPU", 0
) and trial_cpu_gpu["GPU"] <= _get_cluster_resources_no_autoscaler().get(
"GPU", 0)
@lru_cache()
def _get_insufficient_resources_warning_threshold() -> float:
if is_ray_cluster():
return float(
os.environ.get(
"TUNE_WARN_INSUFFICENT_RESOURCE_THRESHOLD_S_AUTOSCALER", "60"))
else:
# Set the default to 10s so that we don't prematurely determine that
# a cluster cannot fulfill the resource requirements.
return float(
os.environ.get("TUNE_WARN_INSUFFICENT_RESOURCE_THRESHOLD_S", "1"))
os.environ.get("TUNE_WARN_INSUFFICENT_RESOURCE_THRESHOLD_S", "10"))
# TODO(xwjiang): Consider having a help page with more detailed instructions.
@lru_cache()
def _get_warning_msg() -> str:
def _get_insufficient_resources_warning_msg() -> str:
msg = (
f"No trial is running and no new trial has been started within"
f" at least the last "
f"{_get_insufficient_resources_warning_threshold()} seconds. "
f"This could be due to the cluster not having enough "
f"resources available to start the next trial. "
f"Stop the tuning job and adjust the resources requested per trial "
f"(possibly via `resources_per_trial` or via `num_workers` for rllib) "
f"and/or add more resources to your Ray runtime.")
if is_ray_cluster():
return (
f"If autoscaler is still scaling up, ignore this message. No "
f"trial is running and no new trial has been started within at "
f"least the last {_get_warning_threshold()} seconds. "
f"This could be due to the cluster not having enough "
f"resources available to start the next trial. Please stop the "
f"tuning job and readjust resources_per_trial argument passed "
f"into tune.run() as well as max_workers and worker_nodes "
f"InstanceType specified in cluster.yaml.")
return "If autoscaler is still scaling up, ignore this message. " + msg
else:
return (f"No trial is running and no new trial has been started within"
f" at least the last {_get_warning_threshold()} seconds. "
f"This could be due to the cluster not having enough "
f"resources available to start the next trial. Please stop "
f"the tuning job and readjust resources_per_trial argument "
f"passed into tune.run() and/or start a cluster with more "
f"resources.")
return msg
# A beefed up version when Tune Error is raised.
def _get_insufficient_resources_error_msg(trial: Trial) -> str:
trial_cpu_gpu = _get_trial_cpu_and_gpu(trial)
return (
f"You asked for {trial_cpu_gpu['CPU']} cpu and "
f"{trial_cpu_gpu['GPU']} gpu per trial, but the cluster only has "
f"{_get_cluster_resources_no_autoscaler().get('CPU', 0)} cpu and "
f"{_get_cluster_resources_no_autoscaler().get('GPU', 0)} gpu. "
f"Stop the tuning job and adjust the resources requested per trial "
f"(possibly via `resources_per_trial` or via `num_workers` for rllib) "
f"and/or add more resources to your Ray runtime.")
@DeveloperAPI
@@ -68,12 +110,19 @@ class TrialExecutor(metaclass=ABCMeta):
self._queue_trials = queue_trials
self._cached_trial_state = {}
self._trials_to_cache = set()
# The next two variables are used to keep track of whether any
# "progress" is made between subsequent calls to `on_no_available_trials`.
# TODO(xwjiang): Clean this up once figuring out who should have a
# holistic view of trials - runner or executor.
# Also iterating over list of trials every time is very inefficient.
# Need better visibility APIs into trials.
# The start time since when all active trials have been in PENDING
# state, or since the last time we output an insufficient resource
# warning message, whichever comes later.
# -1 means either the TrialExecutor is just initialized without any
# trials yet, or there are some trials in RUNNING state.
self._no_running_trials_since = -1
self._all_trials_size = -1
def set_status(self, trial: Trial, status: str) -> None:
"""Sets status and checkpoints metadata if needed.
@@ -237,17 +286,28 @@
pass
def _may_warn_insufficient_resources(self, all_trials):
if not any(trial.status == Trial.RUNNING for trial in all_trials):
# This is approximately saying we are not making progress.
if len(all_trials) == self._all_trials_size:
if self._no_running_trials_since == -1:
self._no_running_trials_since = time.monotonic()
elif time.monotonic(
) - self._no_running_trials_since > _get_warning_threshold():
# TODO(xwjiang): We should ideally output a more helpful msg.
# https://github.com/ray-project/ray/issues/17799
logger.warning(_get_warning_msg())
elif (time.monotonic() - self._no_running_trials_since >
_get_insufficient_resources_warning_threshold()):
if not is_ray_cluster(): # autoscaler not enabled
# If any pending trial cannot be fulfilled, that is a good
# enough hint that the requested trial resources cannot be met.
for trial in all_trials:
if (trial.status is Trial.PENDING
and not _can_fulfill_no_autoscaler(trial)):
raise TuneError(
_get_insufficient_resources_error_msg(trial))
else:
# TODO(xwjiang): Output a more helpful msg for autoscaler.
# https://github.com/ray-project/ray/issues/17799
logger.warning(_get_insufficient_resources_warning_msg())
self._no_running_trials_since = time.monotonic()
else:
self._no_running_trials_since = -1
self._all_trials_size = len(all_trials)
def on_no_available_trials(self, trials: List[Trial]) -> None:
"""