From 3590a86db0369ce8a8f9c3965cddc9e4c817c2b8 Mon Sep 17 00:00:00 2001 From: Kai Fricke Date: Fri, 2 Sep 2022 12:52:26 +0100 Subject: [PATCH 1/7] [tune] Add timeout ro retry_fn to catch hanging syncs (#28155) Syncing sometimes hangs in pyarrow for unknown reasons. We should introduce a timeout for these syncing operations. Signed-off-by: Kai Fricke --- .../ray/tune/execution/ray_trial_executor.py | 15 ++- python/ray/tune/execution/trial_runner.py | 11 +- python/ray/tune/syncer.py | 6 + .../ray/tune/tests/test_ray_trial_executor.py | 2 +- python/ray/tune/tests/test_trainable.py | 40 ++++++- python/ray/tune/tests/test_utils.py | 113 +++++++++++++----- python/ray/tune/trainable/trainable.py | 39 ++++-- python/ray/tune/utils/util.py | 36 +++++- 8 files changed, 208 insertions(+), 54 deletions(-) diff --git a/python/ray/tune/execution/ray_trial_executor.py b/python/ray/tune/execution/ray_trial_executor.py index fe6f886e3..a0eec9aee 100644 --- a/python/ray/tune/execution/ray_trial_executor.py +++ b/python/ray/tune/execution/ray_trial_executor.py @@ -217,7 +217,7 @@ class RayTrialExecutor: self._has_cleaned_up_pgs = False self._reuse_actors = reuse_actors - # The maxlen will be updated when `set_max_pending_trials()` is called + # The maxlen will be updated when `setup(max_pending_trials)` is called self._cached_actor_pg = deque(maxlen=1) self._pg_manager = _PlacementGroupManager(prefix=_get_tune_pg_prefix()) self._staged_trials = set() @@ -235,16 +235,20 @@ class RayTrialExecutor: self._buffer_max_time_s = float( os.getenv("TUNE_RESULT_BUFFER_MAX_TIME_S", 100.0) ) + self._trainable_kwargs = {} - def set_max_pending_trials(self, max_pending: int) -> None: + def setup( + self, max_pending_trials: int, trainable_kwargs: Optional[Dict] = None + ) -> None: if len(self._cached_actor_pg) > 0: logger.warning( "Cannot update maximum number of queued actors for reuse " "during a run." ) else: - self._cached_actor_pg = deque(maxlen=max_pending) - self._pg_manager.set_max_staging(max_pending) + self._cached_actor_pg = deque(maxlen=max_pending_trials) + self._pg_manager.set_max_staging(max_pending_trials) + self._trainable_kwargs = trainable_kwargs or {} def set_status(self, trial: Trial, status: str) -> None: """Sets status and checkpoints metadata if needed. @@ -377,6 +381,9 @@ class RayTrialExecutor: kwargs["remote_checkpoint_dir"] = trial.remote_checkpoint_dir kwargs["custom_syncer"] = trial.custom_syncer + if self._trainable_kwargs: + kwargs.update(self._trainable_kwargs) + # Throw a meaningful error if trainable does not use the # new API sig = inspect.signature(trial.get_trainable_cls()) diff --git a/python/ray/tune/execution/trial_runner.py b/python/ray/tune/execution/trial_runner.py index d810729f7..a4fd37b00 100644 --- a/python/ray/tune/execution/trial_runner.py +++ b/python/ray/tune/execution/trial_runner.py @@ -198,6 +198,8 @@ class _ExperimentCheckpointManager: exclude = ["*/checkpoint_*"] if self._syncer: + # Todo: Implement sync_timeout for experiment-level syncing + # (it is currently only used for trainable-to-cloud syncing) if force: # Wait until previous sync command finished self._syncer.wait() @@ -341,7 +343,13 @@ class TrialRunner: else: # Manual override self._max_pending_trials = int(max_pending_trials) - self.trial_executor.set_max_pending_trials(self._max_pending_trials) + + sync_config = sync_config or SyncConfig() + + self.trial_executor.setup( + max_pending_trials=self._max_pending_trials, + trainable_kwargs={"sync_timeout": sync_config.sync_timeout}, + ) self._metric = metric @@ -385,7 +393,6 @@ class TrialRunner: if self._local_checkpoint_dir: os.makedirs(self._local_checkpoint_dir, exist_ok=True) - sync_config = sync_config or SyncConfig() self._remote_checkpoint_dir = remote_checkpoint_dir self._syncer = get_node_to_storage_syncer(sync_config) diff --git a/python/ray/tune/syncer.py b/python/ray/tune/syncer.py index a5ade3938..3a18bdd34 100644 --- a/python/ray/tune/syncer.py +++ b/python/ray/tune/syncer.py @@ -40,6 +40,9 @@ logger = logging.getLogger(__name__) # Syncing period for syncing checkpoints between nodes or to cloud. DEFAULT_SYNC_PERIOD = 300 +# Default sync timeout after which syncing processes are aborted +DEFAULT_SYNC_TIMEOUT = 1800 + _EXCLUDE_FROM_SYNC = [ "./checkpoint_-00001", "./checkpoint_tmp*", @@ -85,6 +88,8 @@ class SyncConfig: is asynchronous and best-effort. This does not affect persistent storage syncing. Defaults to True. sync_period: Syncing period for syncing between nodes. + sync_timeout: Timeout after which running sync processes are aborted. + Currently only affects trial-to-cloud syncing. """ @@ -93,6 +98,7 @@ class SyncConfig: sync_on_checkpoint: bool = True sync_period: int = DEFAULT_SYNC_PERIOD + sync_timeout: int = DEFAULT_SYNC_TIMEOUT def _repr_html_(self) -> str: """Generate an HTML representation of the SyncConfig. diff --git a/python/ray/tune/tests/test_ray_trial_executor.py b/python/ray/tune/tests/test_ray_trial_executor.py index 4615f605b..8e18c5476 100644 --- a/python/ray/tune/tests/test_ray_trial_executor.py +++ b/python/ray/tune/tests/test_ray_trial_executor.py @@ -499,7 +499,7 @@ class RayExecutorPlacementGroupTest(unittest.TestCase): executor = RayTrialExecutor(reuse_actors=True) executor._pg_manager = pgm - executor.set_max_pending_trials(1) + executor.setup(max_pending_trials=1) def train(config): yield 1 diff --git a/python/ray/tune/tests/test_trainable.py b/python/ray/tune/tests/test_trainable.py index 248152285..254edb5c2 100644 --- a/python/ray/tune/tests/test_trainable.py +++ b/python/ray/tune/tests/test_trainable.py @@ -1,14 +1,16 @@ import json import os import tempfile +import time from typing import Dict, Union +from unittest.mock import patch import pytest import ray from ray import tune from ray.air import session, Checkpoint -from ray.air._internal.remote_storage import download_from_uri +from ray.air._internal.remote_storage import download_from_uri, upload_to_uri from ray.tune.trainable import wrap_function @@ -188,6 +190,42 @@ def test_checkpoint_object_no_sync(tmpdir): trainable.restore_from_object(obj) +@pytest.mark.parametrize("hanging", [True, False]) +def test_sync_timeout(tmpdir, hanging): + orig_upload_fn = upload_to_uri + + def _hanging_upload(*args, **kwargs): + time.sleep(200 if hanging else 0) + orig_upload_fn(*args, **kwargs) + + trainable = SavingTrainable( + "object", + remote_checkpoint_dir=f"memory:///test/location_hanging_{hanging}", + sync_timeout=0.5, + ) + + with patch("ray.air.checkpoint.upload_to_uri", _hanging_upload): + trainable.save() + + check_dir = tmpdir / "check_save_obj" + + try: + download_from_uri( + uri=f"memory:///test/location_hanging_{hanging}", local_path=str(check_dir) + ) + except FileNotFoundError: + hung = True + else: + hung = False + + assert hung == hanging + + if hanging: + assert not check_dir.exists() + else: + assert check_dir.listdir() + + if __name__ == "__main__": import sys diff --git a/python/ray/tune/tests/test_utils.py b/python/ray/tune/tests/test_utils.py index bca055f4c..21b99c687 100644 --- a/python/ray/tune/tests/test_utils.py +++ b/python/ray/tune/tests/test_utils.py @@ -1,45 +1,94 @@ -import unittest +import time + +import pytest from ray.tune.search.variant_generator import format_vars +from ray.tune.utils.util import retry_fn -class TuneUtilsTest(unittest.TestCase): - def testFormatVars(self): - # Format brackets correctly - self.assertTrue( - format_vars( - { - ("a", "b", "c"): 8.1234567, - ("a", "b", "d"): [7, 8], - ("a", "b", "e"): [[[3, 4]]], - } - ), - "c=8.12345,d=7_8,e=3_4", +def test_format_vars(): + + # Format brackets correctly + assert ( + format_vars( + { + ("a", "b", "c"): 8.1234567, + ("a", "b", "d"): [7, 8], + ("a", "b", "e"): [[[3, 4]]], + } ) - # Sorted by full keys, but only last key is reported - self.assertTrue( - format_vars( - { - ("a", "c", "x"): [7, 8], - ("a", "b", "x"): 8.1234567, - } - ), - "x=8.12345,x=7_8", + == "c=8.1235,d=7_8,e=3_4" + ) + # Sorted by full keys, but only last key is reported + assert ( + format_vars( + { + ("a", "c", "x"): [7, 8], + ("a", "b", "x"): 8.1234567, + } ) - # Filter out invalid chars. It's ok to have empty keys or values. - self.assertTrue( - format_vars( - { - ("a c?x"): " <;%$ok ", - ("some"): " ", - } - ), - "a_c_x=ok,some=", + == "x=8.1235,x=7_8" + ) + # Filter out invalid chars. It's ok to have empty keys or values. + assert ( + format_vars( + { + ("a c?x",): " <;%$ok ", + ("some",): " ", + } ) + == "a_c_x=ok,some=" + ) + + +def test_retry_fn_repeat(tmpdir): + success = tmpdir / "success" + marker = tmpdir / "marker" + + def _fail_once(): + if marker.exists(): + success.write_text(".", encoding="utf-8") + return + marker.write_text(".", encoding="utf-8") + raise RuntimeError("Failing") + + assert not success.exists() + assert not marker.exists() + + assert retry_fn( + fn=_fail_once, + exception_type=RuntimeError, + sleep_time=0, + ) + + assert success.exists() + assert marker.exists() + + +def test_retry_fn_timeout(tmpdir): + success = tmpdir / "success" + marker = tmpdir / "marker" + + def _fail_once(): + if not marker.exists(): + marker.write_text(".", encoding="utf-8") + raise RuntimeError("Failing") + time.sleep(5) + success.write_text(".", encoding="utf-8") + return + + assert not success.exists() + assert not marker.exists() + + assert not retry_fn( + fn=_fail_once, exception_type=RuntimeError, sleep_time=0, timeout=0.1 + ) + + assert not success.exists() + assert marker.exists() if __name__ == "__main__": - import pytest import sys sys.exit(pytest.main(["-v", __file__])) diff --git a/python/ray/tune/trainable/trainable.py b/python/ray/tune/trainable/trainable.py index a63b767c5..aedf56ba4 100644 --- a/python/ray/tune/trainable/trainable.py +++ b/python/ray/tune/trainable/trainable.py @@ -101,8 +101,9 @@ class Trainable: logger_creator: Callable[[Dict[str, Any]], "Logger"] = None, remote_checkpoint_dir: Optional[str] = None, custom_syncer: Optional[Syncer] = None, + sync_timeout: Optional[int] = None, ): - """Initialize an Trainable. + """Initialize a Trainable. Sets up logging and points ``self.logdir`` to a directory in which training outputs should be placed. @@ -120,6 +121,7 @@ class Trainable: which is different from **per checkpoint** directory. custom_syncer: Syncer used for synchronizing data from Ray nodes to external storage. + sync_timeout: Timeout after which sync processes are aborted. """ self._experiment_id = uuid.uuid4().hex @@ -171,6 +173,7 @@ class Trainable: self.remote_checkpoint_dir = remote_checkpoint_dir self.custom_syncer = custom_syncer + self.sync_timeout = sync_timeout @property def uses_cloud_checkpointing(self): @@ -512,12 +515,22 @@ class Trainable: return True checkpoint = Checkpoint.from_directory(checkpoint_dir) - retry_fn( - lambda: checkpoint.to_uri(self._storage_path(checkpoint_dir)), + checkpoint_uri = self._storage_path(checkpoint_dir) + if not retry_fn( + lambda: checkpoint.to_uri(checkpoint_uri), subprocess.CalledProcessError, num_retries=3, sleep_time=1, - ) + timeout=self.sync_timeout, + ): + logger.error( + f"Could not upload checkpoint even after 3 retries." + f"Please check if the credentials expired and that the remote " + f"filesystem is supported.. For large checkpoints, consider " + f"increasing `SyncConfig(sync_timeout)` " + f"(current value: {self.sync_timeout} seconds). Checkpoint URI: " + f"{checkpoint_uri}" + ) return True def _maybe_load_from_cloud(self, checkpoint_path: str) -> bool: @@ -546,12 +559,17 @@ class Trainable: return True checkpoint = Checkpoint.from_uri(external_uri) - retry_fn( + if not retry_fn( lambda: checkpoint.to_directory(local_dir), subprocess.CalledProcessError, num_retries=3, sleep_time=1, - ) + timeout=self.sync_timeout, + ): + logger.error( + f"Could not download checkpoint even after 3 retries: " + f"{external_uri}" + ) return True @@ -719,12 +737,17 @@ class Trainable: self.custom_syncer.wait_or_retry() else: checkpoint_uri = self._storage_path(checkpoint_dir) - retry_fn( + if not retry_fn( lambda: _delete_external_checkpoint(checkpoint_uri), subprocess.CalledProcessError, num_retries=3, sleep_time=1, - ) + timeout=self.sync_timeout, + ): + logger.error( + f"Could not delete checkpoint even after 3 retries: " + f"{checkpoint_uri}" + ) if os.path.exists(checkpoint_dir): shutil.rmtree(checkpoint_dir) diff --git a/python/ray/tune/utils/util.py b/python/ray/tune/utils/util.py index 6deaa3a57..66b581b33 100644 --- a/python/ray/tune/utils/util.py +++ b/python/ray/tune/utils/util.py @@ -7,6 +7,7 @@ import threading import time from collections import defaultdict from datetime import datetime +from numbers import Number from threading import Thread from typing import Dict, List, Union, Type, Callable, Any, Optional @@ -124,18 +125,41 @@ class UtilMonitor(Thread): @DeveloperAPI def retry_fn( fn: Callable[[], Any], - exception_type: Type[Exception], + exception_type: Type[Exception] = Exception, num_retries: int = 3, sleep_time: int = 1, -): - for i in range(num_retries): + timeout: Optional[Number] = None, +) -> bool: + errored = threading.Event() + + def _try_fn(): try: fn() except exception_type as e: logger.warning(e) - time.sleep(sleep_time) - else: - break + errored.set() + + for i in range(num_retries): + errored.clear() + + proc = threading.Thread(target=_try_fn) + proc.daemon = True + proc.start() + proc.join(timeout=timeout) + + if proc.is_alive(): + logger.debug( + f"Process timed out (try {i+1}/{num_retries}): " + f"{getattr(fn, '__name__', None)}" + ) + elif not errored.is_set(): + return True + + # Timed out, sleep and try again + time.sleep(sleep_time) + + # Timed out, so return False + return False @ray.remote From 5779ee764db18f3bb3fa6d60540b3ad9f8f776bb Mon Sep 17 00:00:00 2001 From: kourosh hakhamaneshi <31483498+kouroshHakha@users.noreply.github.com> Date: Fri, 2 Sep 2022 08:27:05 -0700 Subject: [PATCH 2/7] [RLlib] Fix ope v_gain (#28136) --- doc/source/rllib/rllib-offline.rst | 6 +- rllib/offline/estimators/direct_method.py | 27 ++++--- rllib/offline/estimators/doubly_robust.py | 27 ++++--- .../offline/estimators/importance_sampling.py | 27 ++++--- rllib/offline/estimators/tests/test_ope.py | 78 +++++++++---------- .../weighted_importance_sampling.py | 24 +++--- 6 files changed, 100 insertions(+), 89 deletions(-) diff --git a/doc/source/rllib/rllib-offline.rst b/doc/source/rllib/rllib-offline.rst index 8c2ea44a5..1e4a676c5 100644 --- a/doc/source/rllib/rllib-offline.rst +++ b/doc/source/rllib/rllib-offline.rst @@ -80,8 +80,8 @@ RLlib's OPE estimators output six metrics: - ``v_behavior_std``: The standard deviation corresponding to v_behavior. - ``v_target``: The OPE's estimated discounted return for the target policy, averaged over episodes in the batch. - ``v_target_std``: The standard deviation corresponding to v_target. -- ``v_gain``: ``v_target / max(v_behavior, 1e-8)``, averaged over episodes in the batch. ``v_gain > 1.0`` indicates that the policy is better than the policy that generated the behavior data. -- ``v_gain_std``: The standard deviation corresponding to v_gain. +- ``v_gain``: ``v_target / max(v_behavior, 1e-8)``. ``v_gain > 1.0`` indicates that the policy is better than the policy that generated the behavior data. In case, ``v_behavior <= 0``, ``v_delta`` should be used instead for comparison. +- ``v_delta``: The difference between v_target and v_behavior. As an example, we generate an evaluation dataset for off-policy estimation: @@ -170,7 +170,7 @@ We can now train a DQN algorithm offline and evaluate it using OPE: batch = reader.next() print(estimator.estimate(batch)) # {'v_behavior': ..., 'v_target': ..., 'v_gain': ..., - # 'v_behavior_std': ..., 'v_target_std': ..., 'v_gain_std': ...} + # 'v_behavior_std': ..., 'v_target_std': ..., 'v_delta': ...} Example: Converting external experiences to batch format -------------------------------------------------------- diff --git a/rllib/offline/estimators/direct_method.py b/rllib/offline/estimators/direct_method.py index 3329f64ec..7e1b376d2 100644 --- a/rllib/offline/estimators/direct_method.py +++ b/rllib/offline/estimators/direct_method.py @@ -75,12 +75,12 @@ class DirectMethod(OffPolicyEstimator): - v_target: The estimated discounted return for `self.policy`, averaged over episodes in the batch - v_target_std: The standard deviation corresponding to v_target - - v_gain: v_target / max(v_behavior, 1e-8), averaged over episodes - - v_gain_std: The standard deviation corresponding to v_gain + - v_gain: v_target / max(v_behavior, 1e-8) + - v_delta: The difference between v_target and v_behavior. """ batch = self.convert_ma_batch_to_sample_batch(batch) self.check_action_prob_in_batch(batch) - estimates = {"v_behavior": [], "v_target": [], "v_gain": []} + estimates_per_epsiode = {"v_behavior": [], "v_target": []} # Calculate Direct Method OPE estimates for episode in batch.split_by_episode(): rewards = episode["rewards"] @@ -93,15 +93,18 @@ class DirectMethod(OffPolicyEstimator): v_target = self.model.estimate_v(init_step) v_target = convert_to_numpy(v_target).item() - estimates["v_behavior"].append(v_behavior) - estimates["v_target"].append(v_target) - estimates["v_gain"].append(v_target / max(v_behavior, 1e-8)) - estimates["v_behavior_std"] = np.std(estimates["v_behavior"]) - estimates["v_behavior"] = np.mean(estimates["v_behavior"]) - estimates["v_target_std"] = np.std(estimates["v_target"]) - estimates["v_target"] = np.mean(estimates["v_target"]) - estimates["v_gain_std"] = np.std(estimates["v_gain"]) - estimates["v_gain"] = np.mean(estimates["v_gain"]) + estimates_per_epsiode["v_behavior"].append(v_behavior) + estimates_per_epsiode["v_target"].append(v_target) + + estimates = { + "v_behavior": np.mean(estimates_per_epsiode["v_behavior"]), + "v_behavior_std": np.std(estimates_per_epsiode["v_behavior"]), + "v_target": np.mean(estimates_per_epsiode["v_target"]), + "v_target_std": np.std(estimates_per_epsiode["v_target"]), + } + estimates["v_gain"] = estimates["v_target"] / max(estimates["v_behavior"], 1e-8) + estimates["v_delta"] = estimates["v_target"] - estimates["v_behavior"] + return estimates @override(OffPolicyEstimator) diff --git a/rllib/offline/estimators/doubly_robust.py b/rllib/offline/estimators/doubly_robust.py index 7ee4a052b..8f1b90c7d 100644 --- a/rllib/offline/estimators/doubly_robust.py +++ b/rllib/offline/estimators/doubly_robust.py @@ -90,12 +90,12 @@ class DoublyRobust(OffPolicyEstimator): - v_target: The estimated discounted return for `self.policy`, averaged over episodes in the batch - v_target_std: The standard deviation corresponding to v_target - - v_gain: v_target / max(v_behavior, 1e-8), averaged over episodes - - v_gain_std: The standard deviation corresponding to v_gain + - v_gain: v_target / max(v_behavior, 1e-8)' + - v_delta: The difference between v_target and v_behavior. """ batch = self.convert_ma_batch_to_sample_batch(batch) self.check_action_prob_in_batch(batch) - estimates = {"v_behavior": [], "v_target": [], "v_gain": []} + estimates_per_epsiode = {"v_behavior": [], "v_target": []} # Calculate doubly robust OPE estimates for episode in batch.split_by_episode(): rewards, old_prob = episode["rewards"], episode["action_prob"] @@ -119,15 +119,18 @@ class DoublyRobust(OffPolicyEstimator): ) v_target = v_target.item() - estimates["v_behavior"].append(v_behavior) - estimates["v_target"].append(v_target) - estimates["v_gain"].append(v_target / max(v_behavior, 1e-8)) - estimates["v_behavior_std"] = np.std(estimates["v_behavior"]) - estimates["v_behavior"] = np.mean(estimates["v_behavior"]) - estimates["v_target_std"] = np.std(estimates["v_target"]) - estimates["v_target"] = np.mean(estimates["v_target"]) - estimates["v_gain_std"] = np.std(estimates["v_gain"]) - estimates["v_gain"] = np.mean(estimates["v_gain"]) + estimates_per_epsiode["v_behavior"].append(v_behavior) + estimates_per_epsiode["v_target"].append(v_target) + + estimates = { + "v_behavior": np.mean(estimates_per_epsiode["v_behavior"]), + "v_behavior_std": np.std(estimates_per_epsiode["v_behavior"]), + "v_target": np.mean(estimates_per_epsiode["v_target"]), + "v_target_std": np.std(estimates_per_epsiode["v_target"]), + } + estimates["v_gain"] = estimates["v_target"] / max(estimates["v_behavior"], 1e-8) + estimates["v_delta"] = estimates["v_target"] - estimates["v_behavior"] + return estimates @override(OffPolicyEstimator) diff --git a/rllib/offline/estimators/importance_sampling.py b/rllib/offline/estimators/importance_sampling.py index 9a0d40d4a..256ca8102 100644 --- a/rllib/offline/estimators/importance_sampling.py +++ b/rllib/offline/estimators/importance_sampling.py @@ -37,12 +37,12 @@ class ImportanceSampling(OffPolicyEstimator): - v_target: The estimated discounted return for `self.policy`, averaged over episodes in the batch - v_target_std: The standard deviation corresponding to v_target - - v_gain: v_target / max(v_behavior, 1e-8), averaged over episodes - - v_gain_std: The standard deviation corresponding to v_gain + - v_gain: v_target / max(v_behavior, 1e-8) + - v_delta: The difference between v_target and v_behavior. """ batch = self.convert_ma_batch_to_sample_batch(batch) self.check_action_prob_in_batch(batch) - estimates = {"v_behavior": [], "v_target": [], "v_gain": []} + estimates_per_epsiode = {"v_behavior": [], "v_target": []} for episode in batch.split_by_episode(): rewards, old_prob = episode["rewards"], episode["action_prob"] log_likelihoods = compute_log_likelihoods_from_input_dict( @@ -66,13 +66,16 @@ class ImportanceSampling(OffPolicyEstimator): v_behavior += rewards[t] * self.gamma ** t v_target += p[t] * rewards[t] * self.gamma ** t - estimates["v_behavior"].append(v_behavior) - estimates["v_target"].append(v_target) - estimates["v_gain"].append(v_target / max(v_behavior, 1e-8)) - estimates["v_behavior_std"] = np.std(estimates["v_behavior"]) - estimates["v_behavior"] = np.mean(estimates["v_behavior"]) - estimates["v_target_std"] = np.std(estimates["v_target"]) - estimates["v_target"] = np.mean(estimates["v_target"]) - estimates["v_gain_std"] = np.std(estimates["v_gain"]) - estimates["v_gain"] = np.mean(estimates["v_gain"]) + estimates_per_epsiode["v_behavior"].append(v_behavior) + estimates_per_epsiode["v_target"].append(v_target) + + estimates = { + "v_behavior": np.mean(estimates_per_epsiode["v_behavior"]), + "v_behavior_std": np.std(estimates_per_epsiode["v_behavior"]), + "v_target": np.mean(estimates_per_epsiode["v_target"]), + "v_target_std": np.std(estimates_per_epsiode["v_target"]), + } + estimates["v_gain"] = estimates["v_target"] / max(estimates["v_behavior"], 1e-8) + estimates["v_delta"] = estimates["v_target"] - estimates["v_behavior"] + return estimates diff --git a/rllib/offline/estimators/tests/test_ope.py b/rllib/offline/estimators/tests/test_ope.py index 0535df5db..53c43b27e 100644 --- a/rllib/offline/estimators/tests/test_ope.py +++ b/rllib/offline/estimators/tests/test_ope.py @@ -25,6 +25,15 @@ import ray torch, _ = try_import_torch() +ESTIMATOR_OUTPUTS = { + "v_behavior", + "v_behavior_std", + "v_target", + "v_target_std", + "v_gain", + "v_delta", +} + class TestOPE(unittest.TestCase): """Compilation tests for using OPE both standalone and in an RLlib Algorithm""" @@ -75,49 +84,38 @@ class TestOPE(unittest.TestCase): def tearDownClass(cls): ray.shutdown() - def test_ope_standalone(self): - # Test all OPE methods standalone - estimator_outputs = { - "v_behavior", - "v_behavior_std", - "v_target", - "v_target_std", - "v_gain", - "v_gain_std", - } - estimator = ImportanceSampling( - policy=self.algo.get_policy(), - gamma=self.gamma, - ) - estimates = estimator.estimate(self.batch) - self.assertEqual(estimates.keys(), estimator_outputs) + def test_is_and_wis_standalone(self): + ope_classes = [ + ImportanceSampling, + WeightedImportanceSampling, + ] - estimator = WeightedImportanceSampling( - policy=self.algo.get_policy(), - gamma=self.gamma, - ) - estimates = estimator.estimate(self.batch) - self.assertEqual(estimates.keys(), estimator_outputs) + for class_module in ope_classes: + estimator = class_module( + policy=self.algo.get_policy(), + gamma=self.gamma, + ) + estimates = estimator.estimate(self.batch) + self.assertEqual(set(estimates.keys()), ESTIMATOR_OUTPUTS) + check(estimates["v_gain"], estimates["v_target"] / estimates["v_behavior"]) - estimator = DirectMethod( - policy=self.algo.get_policy(), - gamma=self.gamma, - q_model_config=self.q_model_config, - ) - losses = estimator.train(self.batch) - assert losses, "DM estimator did not return mean loss" - estimates = estimator.estimate(self.batch) - self.assertEqual(estimates.keys(), estimator_outputs) + def test_dm_and_dr_standalone(self): + ope_classes = [ + DirectMethod, + DoublyRobust, + ] - estimator = DoublyRobust( - policy=self.algo.get_policy(), - gamma=self.gamma, - q_model_config=self.q_model_config, - ) - losses = estimator.train(self.batch) - assert losses, "DM estimator did not return mean loss" - estimates = estimator.estimate(self.batch) - self.assertEqual(estimates.keys(), estimator_outputs) + for class_module in ope_classes: + estimator = class_module( + policy=self.algo.get_policy(), + gamma=self.gamma, + q_model_config=self.q_model_config, + ) + losses = estimator.train(self.batch) + assert losses, f"{class_module.__name__} estimator did not return mean loss" + estimates = estimator.estimate(self.batch) + self.assertEqual(set(estimates.keys()), ESTIMATOR_OUTPUTS) + check(estimates["v_gain"], estimates["v_target"] / estimates["v_behavior"]) def test_ope_in_algo(self): # Test OPE in DQN, during training as well as by calling evaluate() diff --git a/rllib/offline/estimators/weighted_importance_sampling.py b/rllib/offline/estimators/weighted_importance_sampling.py index 1354d1301..6b12cc718 100644 --- a/rllib/offline/estimators/weighted_importance_sampling.py +++ b/rllib/offline/estimators/weighted_importance_sampling.py @@ -49,10 +49,11 @@ class WeightedImportanceSampling(OffPolicyEstimator): - v_target_std: The standard deviation corresponding to v_target - v_gain: v_target / max(v_behavior, 1e-8), averaged over episodes - v_gain_std: The standard deviation corresponding to v_gain + - v_delta: The difference between v_target and v_behavior. """ batch = self.convert_ma_batch_to_sample_batch(batch) self.check_action_prob_in_batch(batch) - estimates = {"v_behavior": [], "v_target": [], "v_gain": []} + estimates_per_epsiode = {"v_behavior": [], "v_target": []} for episode in batch.split_by_episode(): rewards, old_prob = episode["rewards"], episode["action_prob"] log_likelihoods = compute_log_likelihoods_from_input_dict( @@ -84,13 +85,16 @@ class WeightedImportanceSampling(OffPolicyEstimator): w_t = self.filter_values[t] / self.filter_counts[t] v_target += p[t] / w_t * rewards[t] * self.gamma ** t - estimates["v_behavior"].append(v_behavior) - estimates["v_target"].append(v_target) - estimates["v_gain"].append(v_target / max(v_behavior, 1e-8)) - estimates["v_behavior_std"] = np.std(estimates["v_behavior"]) - estimates["v_behavior"] = np.mean(estimates["v_behavior"]) - estimates["v_target_std"] = np.std(estimates["v_target"]) - estimates["v_target"] = np.mean(estimates["v_target"]) - estimates["v_gain_std"] = np.std(estimates["v_gain"]) - estimates["v_gain"] = np.mean(estimates["v_gain"]) + estimates_per_epsiode["v_behavior"].append(v_behavior) + estimates_per_epsiode["v_target"].append(v_target) + + estimates = { + "v_behavior": np.mean(estimates_per_epsiode["v_behavior"]), + "v_behavior_std": np.std(estimates_per_epsiode["v_behavior"]), + "v_target": np.mean(estimates_per_epsiode["v_target"]), + "v_target_std": np.std(estimates_per_epsiode["v_target"]), + } + estimates["v_gain"] = estimates["v_target"] / max(estimates["v_behavior"], 1e-8) + estimates["v_delta"] = estimates["v_target"] - estimates["v_behavior"] + return estimates From 5d31f2d4bc10b38a1c15a0c9282daace92c3cefd Mon Sep 17 00:00:00 2001 From: Kai Fricke Date: Fri, 2 Sep 2022 17:10:01 +0100 Subject: [PATCH 3/7] [tune] Run SigOpt tests in CI (#28225) --- .buildkite/pipeline.ml.yml | 1 + ci/env/setup_credentials.py | 22 +++++++++++++++ python/ray/tune/examples/sigopt_example.py | 3 ++ python/ray/tune/search/sigopt/__init__.py | 4 +-- .../ray/tune/search/sigopt/sigopt_search.py | 28 +++++++++++++++++++ .../tests/test_tune_restore_warm_start.py | 6 +++- 6 files changed, 61 insertions(+), 3 deletions(-) diff --git a/.buildkite/pipeline.ml.yml b/.buildkite/pipeline.ml.yml index 7191f6e6c..3ba9477a1 100644 --- a/.buildkite/pipeline.ml.yml +++ b/.buildkite/pipeline.ml.yml @@ -282,6 +282,7 @@ - cleanup() { if [ "${BUILDKITE_PULL_REQUEST}" = "false" ]; then ./ci/build/upload_build_info.sh; fi }; trap cleanup EXIT - TUNE_TESTING=1 PYTHON=3.7 ./ci/env/install-dependencies.sh - ./ci/env/env_info.sh + - python ./ci/env/setup_credentials.py sigopt - bazel test --config=ci $(./ci/run/bazel_export_options) --build_tests_only --test_tag_filters=tests_dir_S,tests_dir_T,tests_dir_U,tests_dir_V,tests_dir_W,tests_dir_X,tests_dir_Y,tests_dir_Z,-example,-py37,-soft_imports,-gpu_only,-rllib python/ray/tune/... diff --git a/ci/env/setup_credentials.py b/ci/env/setup_credentials.py index 6116d9adb..4f62bce8f 100644 --- a/ci/env/setup_credentials.py +++ b/ci/env/setup_credentials.py @@ -3,8 +3,10 @@ This script is used to set up credentials for some services in the CI environment. For instance, it can fetch WandB API tokens and write the WandB configuration file so test scripts can use the service. """ +import json import os import sys +from pathlib import Path import boto3 @@ -14,6 +16,9 @@ AWS_WANDB_SECRET_ARN = ( AWS_COMET_SECRET_ARN = ( "arn:aws:secretsmanager:us-west-2:029272617770:secret:oss-ci/comet-ml-token-vw81C3" ) +AWS_SIGOPT_SECRET_ARN = ( + "arn:aws:secretsmanager:us-west-2:029272617770:secret:oss-ci/sigopt-key-qEqYrk" +) def get_and_write_wandb_api_key(client): @@ -28,9 +33,26 @@ def get_and_write_comet_ml_api_key(client): fp.write(f"[comet]\napi_key={api_key}\n") +def get_and_write_sigopt_api_key(client): + api_key = client.get_secret_value(SecretId=AWS_SIGOPT_SECRET_ARN)["SecretString"] + + sigopt_config_file = Path("~/.sigopt/client/config.json").expanduser() + sigopt_config_file.parent.mkdir(parents=True, exist_ok=True) + with open(sigopt_config_file, "wt") as f: + json.dump( + { + "api_token": api_key, + "code_tracking_enabled": False, + "log_collection_enabled": False, + }, + f, + ) + + SERVICES = { "wandb": get_and_write_wandb_api_key, "comet_ml": get_and_write_comet_ml_api_key, + "sigopt": get_and_write_sigopt_api_key, } diff --git a/python/ray/tune/examples/sigopt_example.py b/python/ray/tune/examples/sigopt_example.py index 0e1d6f3af..23f669fb2 100644 --- a/python/ray/tune/examples/sigopt_example.py +++ b/python/ray/tune/examples/sigopt_example.py @@ -11,6 +11,7 @@ from ray import air, tune from ray.air import session from ray.tune.schedulers import AsyncHyperBandScheduler from ray.tune.search.sigopt import SigOptSearch +from ray.tune.search.sigopt.sigopt_search import load_sigopt_key def evaluate(step, width, height): @@ -39,6 +40,8 @@ if __name__ == "__main__": ) args, _ = parser.parse_known_args() + load_sigopt_key() + if "SIGOPT_KEY" not in os.environ: if args.smoke_test: print("SigOpt API Key not found. Skipping smoke test.") diff --git a/python/ray/tune/search/sigopt/__init__.py b/python/ray/tune/search/sigopt/__init__.py index 9cec92cac..6060d4159 100644 --- a/python/ray/tune/search/sigopt/__init__.py +++ b/python/ray/tune/search/sigopt/__init__.py @@ -1,3 +1,3 @@ -from ray.tune.search.sigopt.sigopt_search import SigOptSearch +from ray.tune.search.sigopt.sigopt_search import SigOptSearch, load_sigopt_key -__all__ = ["SigOptSearch"] +__all__ = ["SigOptSearch", "load_sigopt_key"] diff --git a/python/ray/tune/search/sigopt/sigopt_search.py b/python/ray/tune/search/sigopt/sigopt_search.py index c5e3dc344..d11a90afc 100644 --- a/python/ray/tune/search/sigopt/sigopt_search.py +++ b/python/ray/tune/search/sigopt/sigopt_search.py @@ -1,7 +1,9 @@ import copy +import json import os import logging import pickle +from pathlib import Path from typing import Dict, List, Optional, Union try: @@ -18,6 +20,31 @@ from ray.tune.search import Searcher logger = logging.getLogger(__name__) +def load_sigopt_key(overwrite: bool = False) -> bool: + """Load SigOpt key from config file and save in environment. + + Args: + overwrite: If True, will overwrite an existing SIGOPT_KEY env variable. + + Returns: + True if a key was loaded into the environment. + """ + if "SIGOPT_KEY" in os.environ and not overwrite: + return False + + sigopt_key_file = Path("~/.sigopt/client/config.json").expanduser() + if not sigopt_key_file.exists(): + return False + + try: + with open(sigopt_key_file, "rt") as f: + config = json.load(f) + os.environ["SIGOPT_KEY"] = config["api_token"] + return True + except Exception: + return False + + class SigOptSearch(Searcher): """A wrapper around SigOpt to provide trial suggestions. @@ -184,6 +211,7 @@ class SigOptSearch(Searcher): ), """SigOpt must be installed! You can install SigOpt with the command: `pip install -U sigopt`.""" + load_sigopt_key() assert ( "SIGOPT_KEY" in os.environ ), "SigOpt API key must be stored as environ variable at SIGOPT_KEY" diff --git a/python/ray/tune/tests/test_tune_restore_warm_start.py b/python/ray/tune/tests/test_tune_restore_warm_start.py index 191afb1af..f54a3d2b0 100644 --- a/python/ray/tune/tests/test_tune_restore_warm_start.py +++ b/python/ray/tune/tests/test_tune_restore_warm_start.py @@ -22,7 +22,7 @@ from ray.tune.search.flaml import CFO, BlendSearch from ray.tune.search.skopt import SkOptSearch from ray.tune.search.nevergrad import NevergradSearch from ray.tune.search.optuna import OptunaSearch -from ray.tune.search.sigopt import SigOptSearch +from ray.tune.search.sigopt import SigOptSearch, load_sigopt_key from ray.tune.search.zoopt import ZOOptSearch from ray.tune.search.hebo import HEBOSearch from ray.tune.search.ax import AxSearch @@ -316,6 +316,10 @@ class DragonflyWarmStartTest(AbstractWarmStartTest, unittest.TestCase): class SigOptWarmStartTest(AbstractWarmStartTest, unittest.TestCase): + def setUp(self): + AbstractWarmStartTest.setUp(self) + load_sigopt_key() + def set_basic_conf(self): space = [ { From 57484b28cf2345f6f17d75928bfdbc097a4ce61d Mon Sep 17 00:00:00 2001 From: Kai Fricke Date: Fri, 2 Sep 2022 17:10:36 +0100 Subject: [PATCH 4/7] [ci/air] Only run examples that need credentials in branch builds (#28260) --- .buildkite/pipeline.ml.yml | 5 ++++- ci/env/cleanup_test_state.py | 5 ++++- ci/env/setup_credentials.py | 5 ++++- doc/source/ray-air/examples/BUILD | 10 ++++++++++ 4 files changed, 22 insertions(+), 3 deletions(-) diff --git a/.buildkite/pipeline.ml.yml b/.buildkite/pipeline.ml.yml index 3ba9477a1..05e491a3a 100644 --- a/.buildkite/pipeline.ml.yml +++ b/.buildkite/pipeline.ml.yml @@ -437,6 +437,9 @@ - cleanup() { if [ "${BUILDKITE_PULL_REQUEST}" = "false" ]; then ./ci/build/upload_build_info.sh; fi }; trap cleanup EXIT - DOC_TESTING=1 PYTHON=3.7 ./ci/env/install-dependencies.sh - ./ci/env/env_info.sh + - bazel test --config=ci $(./ci/run/bazel_export_options) --build_tests_only --test_tag_filters=ray_air,-needs_credentials,-gpu,-py37,-post_wheel_build doc/... + # Only run examples with credentials in non-PRs + - if [ "$BUILDKITE_PULL_REQUEST" != "false" ]; then exit 0; fi - python ./ci/env/setup_credentials.py wandb comet_ml - - bazel test --config=ci $(./ci/run/bazel_export_options) --build_tests_only --test_tag_filters=ray_air,-gpu,-py37,-post_wheel_build doc/... + - bazel test --config=ci $(./ci/run/bazel_export_options) --build_tests_only --test_tag_filters=ray_air,needs_credentials,-gpu,-py37,-post_wheel_build doc/... - python ./ci/env/cleanup_test_state.py wandb comet_ml diff --git a/ci/env/cleanup_test_state.py b/ci/env/cleanup_test_state.py index 9ae121492..57bab45d8 100644 --- a/ci/env/cleanup_test_state.py +++ b/ci/env/cleanup_test_state.py @@ -48,4 +48,7 @@ if __name__ == "__main__": ) for service in services: - SERVICES[service]() + try: + SERVICES[service]() + except Exception as e: + print(f"Could not cleanup service test state for {service}: {e}") diff --git a/ci/env/setup_credentials.py b/ci/env/setup_credentials.py index 4f62bce8f..de77bfbe2 100644 --- a/ci/env/setup_credentials.py +++ b/ci/env/setup_credentials.py @@ -71,4 +71,7 @@ if __name__ == "__main__": client = boto3.client("secretsmanager", region_name="us-west-2") for service in services: - SERVICES[service](client) + try: + SERVICES[service](client) + except Exception as e: + print(f"Could not setup service credentials for {service}: {e}") diff --git a/doc/source/ray-air/examples/BUILD b/doc/source/ray-air/examples/BUILD index 8eb24b291..067857c1e 100644 --- a/doc/source/ray-air/examples/BUILD +++ b/doc/source/ray-air/examples/BUILD @@ -32,11 +32,21 @@ py_test_run_all_notebooks( exclude = [ "huggingface_text_classification.ipynb", "feast_example.ipynb", # REGRESSION + "upload_to_comet_ml.ipynb", # Needs credentials + "upload_to_wandb.ipynb", # Needs credentials ], data = ["//doc/source/ray-air/examples:air_examples"], tags = ["exclusive", "team:ml", "ray_air"], ) +py_test_run_all_notebooks( + size = "large", + include = ["upload_to_*.ipynb"], + exclude = [], + data = ["//doc/source/ray-air/examples:air_examples"], + tags = ["exclusive", "team:ml", "ray_air", "needs_credentials"], +) + # GPU Tests From 9cf5df2c8102ce1614a8c4339834e1d90c01577c Mon Sep 17 00:00:00 2001 From: Justin Yu Date: Fri, 2 Sep 2022 11:44:06 -0700 Subject: [PATCH 5/7] Add ray.widgets to be linked in setup dev script (#27984) --- python/ray/setup-dev.py | 1 + 1 file changed, 1 insertion(+) diff --git a/python/ray/setup-dev.py b/python/ray/setup-dev.py index 49f6472c7..2d2fbe88e 100755 --- a/python/ray/setup-dev.py +++ b/python/ray/setup-dev.py @@ -99,6 +99,7 @@ if __name__ == "__main__": do_link("experimental", force=args.yes, skip_list=args.skip) do_link("util", force=args.yes, skip_list=args.skip) do_link("workflow", force=args.yes, skip_list=args.skip) + do_link("widgets", force=args.yes, skip_list=args.skip) do_link("cluster_utils.py", force=args.yes, skip_list=args.skip) do_link("_private", force=args.yes, skip_list=args.skip) # Link package's `dashboard` directly to local (repo's) dashboard. From 59be31d5583a3302e000feeba925e3eed1850eb2 Mon Sep 17 00:00:00 2001 From: Dmitri Gekhtman <62982571+DmitriGekhtman@users.noreply.github.com> Date: Fri, 2 Sep 2022 12:18:04 -0700 Subject: [PATCH 6/7] Update links. (#28269) Signed-off-by: Dmitri Gekhtman This PR updates the quickstart configuration in the Ray docs to reflect the fixes from ray-project/kuberay#529 To provide access to the fixed version, we update the link to point to KubeRay master rather than the 0.3.0 branch. After the next KubeRay release (0.4.0), we can update these links to point to a fixed release version again. --- doc/source/cluster/kubernetes/getting-started.ipynb | 4 ++-- .../cluster/kubernetes/user-guides/configuring-autoscaling.md | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/doc/source/cluster/kubernetes/getting-started.ipynb b/doc/source/cluster/kubernetes/getting-started.ipynb index 135295219..134fca627 100644 --- a/doc/source/cluster/kubernetes/getting-started.ipynb +++ b/doc/source/cluster/kubernetes/getting-started.ipynb @@ -74,7 +74,7 @@ "metadata": {}, "source": [ "To run the example in this guide, make sure your Kubernetes cluster (or local Kind cluster) can accomodate\n", - "additional resource requests of 3 CPU and 2Gi memory. \n", + "additional resource requests of 3 CPU and 3Gi memory. \n", "\n", "(kuberay-operator-deploy)=\n", "## Deploying the KubeRay operator\n", @@ -157,7 +157,7 @@ "outputs": [], "source": [ "# Deploy a sample Ray Cluster CR from the KubeRay repo:\n", - "! kubectl apply -f https://raw.githubusercontent.com/ray-project/kuberay/release-0.3/ray-operator/config/samples/ray-cluster.autoscaler.yaml\n", + "! kubectl apply -f https://raw.githubusercontent.com/ray-project/kuberay/master/ray-operator/config/samples/ray-cluster.autoscaler.yaml\n", "\n", "# This Ray cluster is named `raycluster-autoscaler` because it has optional Ray Autoscaler support enabled." ] diff --git a/doc/source/cluster/kubernetes/user-guides/configuring-autoscaling.md b/doc/source/cluster/kubernetes/user-guides/configuring-autoscaling.md index ea3ab6171..3c04365df 100644 --- a/doc/source/cluster/kubernetes/user-guides/configuring-autoscaling.md +++ b/doc/source/cluster/kubernetes/user-guides/configuring-autoscaling.md @@ -49,7 +49,7 @@ First, follow the [quickstart guide](kuberay-quickstart) to create an autoscalin # Create the KubeRay operator. $ kubectl create -k "github.com/ray-project/kuberay/ray-operator/config/default?ref=v0.3.0&timeout=90s" # Create an autoscaling Ray cluster. -$ kubectl apply -f https://raw.githubusercontent.com/ray-project/kuberay/release-0.3/ray-operator/config/samples/ray-cluster.autoscaler.yaml +$ kubectl apply -f https://raw.githubusercontent.com/ray-project/kuberay/master/ray-operator/config/samples/ray-cluster.autoscaler.yaml ``` Now, we can run a Ray program on the head pod that uses [``request_resources``](ref-autoscaler-sdk) to scale the cluster to a total of 3 CPUs. The head and worker pods in our [example cluster config](https://github.com/ray-project/kuberay/blob/master/ray-operator/config/samples/ray-cluster.autoscaler.yaml) each have a capacity of 1 CPU, and we specified a minimum of 1 worker pod. Thus, the request should trigger upscaling of one additional worker pod. From 8c0b0272cee095d3bd8f8c891d71e6a612539856 Mon Sep 17 00:00:00 2001 From: Ricky Xu Date: Fri, 2 Sep 2022 13:43:49 -0700 Subject: [PATCH 7/7] Make state API release tests stable (#28274) Make state API release tests stable - it has been passing in the last few days. Signed-off-by: rickyyx --- release/release_tests.yaml | 2 -- 1 file changed, 2 deletions(-) diff --git a/release/release_tests.yaml b/release/release_tests.yaml index 7b1cdbb9f..cd745757f 100644 --- a/release/release_tests.yaml +++ b/release/release_tests.yaml @@ -3527,7 +3527,6 @@ legacy: test_name: stress_test_state_api_scale test_suite: nightly_tests - stable: false frequency: nightly team: core @@ -3560,7 +3559,6 @@ legacy: test_name: shuffle_20gb_with_state_api test_suite: nightly_tests - stable: false frequency: nightly team: core