[Tune] Do not crash when resources are insufficient. (#18611)

This commit is contained in:
xwjiang2010 2021-09-15 23:00:53 -07:00 committed by GitHub
parent be7cb70c30
commit ea48b1227f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 35 additions and 12 deletions

View file

@@ -7,7 +7,8 @@ import unittest
import ray
from ray import tune
from ray.rllib import _register_all
from ray.tune import Trainable, TuneError
from ray.tune import Trainable
from ray.tune.callback import Callback
from ray.tune.ray_trial_executor import RayTrialExecutor
from ray.tune.registry import _global_registry, TRAINABLE_CLASS
from ray.tune.result import TRAINING_ITERATION
@@ -16,11 +17,12 @@ from ray.tune.trial import Trial, Checkpoint
from ray.tune.resources import Resources
from ray.cluster_utils import Cluster
from ray.tune.utils.placement_groups import PlacementGroupFactory
from unittest.mock import patch
class TrialExecutorInsufficientResourcesTest(unittest.TestCase):
def setUp(self):
os.environ["TUNE_WARN_INSUFFICENT_RESOURCE_THRESHOLD_S"] = "1"
os.environ["TUNE_WARN_INSUFFICENT_RESOURCE_THRESHOLD_S"] = "0"
self.cluster = Cluster(
initialize_head=True,
connect=True,
@@ -33,26 +35,45 @@ class TrialExecutorInsufficientResourcesTest(unittest.TestCase):
ray.shutdown()
self.cluster.shutdown()
# no autoscaler case, resource is not sufficient. Raise error.
def testRaiseErrorNoAutoscaler(self):
# no autoscaler case, resource is not sufficient. Log warning for now.
@patch.object(ray.tune.trial_executor.logger, "warning")
def testRaiseErrorNoAutoscaler(self, mocked_warn):
class FailureInjectorCallback(Callback):
"""Adds random failure injection to the TrialExecutor."""
def __init__(self, steps=5):
self._step = 0
self.steps = steps
def on_step_begin(self, iteration, trials, **info):
self._step += 1
if self._step >= self.steps:
raise RuntimeError
def train(config):
pass
with pytest.raises(TuneError) as cm:
with self.assertRaises(RuntimeError):
tune.run(
train,
callbacks=[
# Make sure that the test is not stuck forever,
# as what would happen for the users now, unfortunately.
FailureInjectorCallback(),
],
resources_per_trial={
"cpu": 5, # more than what the cluster can offer.
"gpu": 3,
})
msg = ("You asked for 5.0 cpu and 3.0 gpu per trial, "
msg = ("Ignore this message if the cluster is autoscaling. "
"You asked for 5.0 cpu and 3.0 gpu per trial, "
"but the cluster only has 4.0 cpu and 2.0 gpu. "
"Stop the tuning job and "
"adjust the resources requested per trial "
"(possibly via `resources_per_trial` "
"or via `num_workers` for rllib) "
"and/or add more resources to your Ray runtime.")
assert str(cm._excinfo[1]) == msg
mocked_warn.assert_called_with(msg)
class RayTrialExecutorTest(unittest.TestCase):

View file

@@ -72,7 +72,7 @@ def _get_insufficient_resources_warning_msg() -> str:
f"(possibly via `resources_per_trial` or via `num_workers` for rllib) "
f"and/or add more resources to your Ray runtime.")
if is_ray_cluster():
return "If autoscaler is still scaling up, ignore this message. " + msg
return "Ignore this message if the cluster is autoscaling. " + msg
else:
return msg
@ -81,6 +81,7 @@ def _get_insufficient_resources_warning_msg() -> str:
def _get_insufficient_resources_error_msg(trial: Trial) -> str:
trial_cpu_gpu = _get_trial_cpu_and_gpu(trial)
return (
f"Ignore this message if the cluster is autoscaling. "
f"You asked for {trial_cpu_gpu['CPU']} cpu and "
f"{trial_cpu_gpu['GPU']} gpu per trial, but the cluster only has "
f"{_get_cluster_resources_no_autoscaler().get('CPU', 0)} cpu and "
@@ -298,11 +299,13 @@ class TrialExecutor(metaclass=ABCMeta):
for trial in all_trials:
if (trial.status is Trial.PENDING
and not _can_fulfill_no_autoscaler(trial)):
raise TuneError(
# TODO(xwjiang):
# Raise an Error once #18608 is resolved.
logger.warning(
_get_insufficient_resources_error_msg(trial))
else:
# TODO(xwjiang): Output a more helpful msg for autoscaler.
# https://github.com/ray-project/ray/issues/17799
# TODO(xwjiang): #17799.
# Output a more helpful msg for autoscaler.
logger.warning(_get_insufficient_resources_warning_msg())
self._no_running_trials_since = time.monotonic()
else:
@@ -315,7 +318,6 @@ class TrialExecutor(metaclass=ABCMeta):
trials (List[Trial]): The list of trials. Note, refrain from
providing TrialRunner directly here.
"""
if self._queue_trials:
return
self._may_warn_insufficient_resources(trials)