From ea48b1227f5f021f978cf81e465c89562d46835b Mon Sep 17 00:00:00 2001
From: xwjiang2010 <87673679+xwjiang2010@users.noreply.github.com>
Date: Wed, 15 Sep 2021 23:00:53 -0700
Subject: [PATCH] [Tune] Do not crash when resources are insufficient. (#18611)

---
 .../ray/tune/tests/test_ray_trial_executor.py | 35 +++++++++++++++----
 python/ray/tune/trial_executor.py             | 12 ++++---
 2 files changed, 35 insertions(+), 12 deletions(-)

diff --git a/python/ray/tune/tests/test_ray_trial_executor.py b/python/ray/tune/tests/test_ray_trial_executor.py
index 1a5bd93ae..2f0dd9fc5 100644
--- a/python/ray/tune/tests/test_ray_trial_executor.py
+++ b/python/ray/tune/tests/test_ray_trial_executor.py
@@ -7,7 +7,8 @@ import unittest
 import ray
 from ray import tune
 from ray.rllib import _register_all
-from ray.tune import Trainable, TuneError
+from ray.tune import Trainable
+from ray.tune.callback import Callback
 from ray.tune.ray_trial_executor import RayTrialExecutor
 from ray.tune.registry import _global_registry, TRAINABLE_CLASS
 from ray.tune.result import TRAINING_ITERATION
@@ -16,11 +17,12 @@ from ray.tune.trial import Trial, Checkpoint
 from ray.tune.resources import Resources
 from ray.cluster_utils import Cluster
 from ray.tune.utils.placement_groups import PlacementGroupFactory
+from unittest.mock import patch
 
 
 class TrialExecutorInsufficientResourcesTest(unittest.TestCase):
     def setUp(self):
-        os.environ["TUNE_WARN_INSUFFICENT_RESOURCE_THRESHOLD_S"] = "1"
+        os.environ["TUNE_WARN_INSUFFICENT_RESOURCE_THRESHOLD_S"] = "0"
         self.cluster = Cluster(
             initialize_head=True,
             connect=True,
@@ -33,26 +35,45 @@ class TrialExecutorInsufficientResourcesTest(unittest.TestCase):
         ray.shutdown()
         self.cluster.shutdown()
 
-    # no autoscaler case, resource is not sufficient. Raise error.
-    def testRaiseErrorNoAutoscaler(self):
+    # no autoscaler case, resource is not sufficient. Log warning for now.
+    @patch.object(ray.tune.trial_executor.logger, "warning")
+    def testRaiseErrorNoAutoscaler(self, mocked_warn):
+        class FailureInjectorCallback(Callback):
+            """Adds random failure injection to the TrialExecutor."""
+
+            def __init__(self, steps=5):
+                self._step = 0
+                self.steps = steps
+
+            def on_step_begin(self, iteration, trials, **info):
+                self._step += 1
+                if self._step >= self.steps:
+                    raise RuntimeError
+
         def train(config):
             pass
 
-        with pytest.raises(TuneError) as cm:
+        with self.assertRaises(RuntimeError):
             tune.run(
                 train,
+                callbacks=[
+                    # Make sure that the test is not stuck forever,
+                    # as what would happen for the users now, unfortunately.
+                    FailureInjectorCallback(),
+                ],
                 resources_per_trial={
                     "cpu": 5,  # more than what the cluster can offer.
                     "gpu": 3,
                 })
-        msg = ("You asked for 5.0 cpu and 3.0 gpu per trial, "
+        msg = ("Ignore this message if the cluster is autoscaling. "
+               "You asked for 5.0 cpu and 3.0 gpu per trial, "
               "but the cluster only has 4.0 cpu and 2.0 gpu. "
               "Stop the tuning job and "
               "adjust the resources requested per trial "
               "(possibly via `resources_per_trial` "
               "or via `num_workers` for rllib) "
               "and/or add more resources to your Ray runtime.")
-        assert str(cm._excinfo[1]) == msg
+        mocked_warn.assert_called_with(msg)
 
 
 class RayTrialExecutorTest(unittest.TestCase):
diff --git a/python/ray/tune/trial_executor.py b/python/ray/tune/trial_executor.py
index cbe3187df..183a50103 100644
--- a/python/ray/tune/trial_executor.py
+++ b/python/ray/tune/trial_executor.py
@@ -72,7 +72,7 @@ def _get_insufficient_resources_warning_msg() -> str:
         f"(possibly via `resources_per_trial` or via `num_workers` for rllib) "
         f"and/or add more resources to your Ray runtime.")
     if is_ray_cluster():
-        return "If autoscaler is still scaling up, ignore this message. " + msg
+        return "Ignore this message if the cluster is autoscaling. " + msg
     else:
         return msg
 
@@ -81,6 +81,7 @@ def _get_insufficient_resources_warning_msg() -> str:
 def _get_insufficient_resources_error_msg(trial: Trial) -> str:
     trial_cpu_gpu = _get_trial_cpu_and_gpu(trial)
     return (
+        f"Ignore this message if the cluster is autoscaling. "
         f"You asked for {trial_cpu_gpu['CPU']} cpu and "
         f"{trial_cpu_gpu['GPU']} gpu per trial, but the cluster only has "
         f"{_get_cluster_resources_no_autoscaler().get('CPU', 0)} cpu and "
@@ -298,11 +299,13 @@ class TrialExecutor(metaclass=ABCMeta):
             for trial in all_trials:
                 if (trial.status is Trial.PENDING
                         and not _can_fulfill_no_autoscaler(trial)):
-                    raise TuneError(
+                    # TODO(xwjiang):
+                    #  Raise an Error once #18608 is resolved.
+                    logger.warning(
                         _get_insufficient_resources_error_msg(trial))
                 else:
-                    # TODO(xwjiang): Output a more helpful msg for autoscaler.
-                    # https://github.com/ray-project/ray/issues/17799
+                    # TODO(xwjiang): #17799.
+                    #  Output a more helpful msg for autoscaler.
                     logger.warning(_get_insufficient_resources_warning_msg())
                 self._no_running_trials_since = time.monotonic()
             else:
@@ -315,7 +318,6 @@ class TrialExecutor(metaclass=ABCMeta):
             trials (List[Trial]): The list of trials.
                 Note, refrain from providing TrialRunner directly here.
         """
-
         if self._queue_trials:
             return
         self._may_warn_insufficient_resources(trials)