diff --git a/.buildkite/pipeline.ml.yml b/.buildkite/pipeline.ml.yml index ea7217b40..ca3aba368 100644 --- a/.buildkite/pipeline.ml.yml +++ b/.buildkite/pipeline.ml.yml @@ -5,7 +5,7 @@ - DATA_PROCESSING_TESTING=1 INSTALL_HOROVOD=1 ./ci/travis/install-dependencies.sh - bazel test --config=ci $(./scripts/bazel_export_options) --build_tests_only --test_tag_filters=-gpu python/ray/ml/... -- label: ":brain: RLlib: Learning discr. actions TF2-static-graph (from rllib/tuned_examples/*.yaml)" +- label: ":brain: RLlib: Learning discr. actions TF2-static-graph" conditions: ["RAY_CI_RLLIB_AFFECTED"] commands: - cleanup() { if [ "${BUILDKITE_PULL_REQUEST}" = "false" ]; then ./ci/travis/upload_build_info.sh; fi }; trap cleanup EXIT @@ -15,7 +15,7 @@ --test_tag_filters=learning_tests_discrete,-fake_gpus,-torch_only,-tf2_only,-no_tf_static_graph --test_arg=--framework=tf rllib/... -- label: ":brain: RLlib: Learning cont. actions TF2-static-graph (from rllib/tuned_examples/*.yaml)" +- label: ":brain: RLlib: Learning cont. actions TF2-static-graph" conditions: ["RAY_CI_RLLIB_AFFECTED"] commands: - cleanup() { if [ "${BUILDKITE_PULL_REQUEST}" = "false" ]; then ./ci/travis/upload_build_info.sh; fi }; trap cleanup EXIT @@ -25,7 +25,7 @@ --test_tag_filters=learning_tests_continuous,-fake_gpus,-torch_only,-tf2_only,-no_tf_static_graph --test_arg=--framework=tf rllib/... -- label: ":brain: RLlib: Learning discr. actions TF2-eager-tracing (from rllib/tuned_examples/*.yaml)" +- label: ":brain: RLlib: Learning discr. actions TF2-eager-tracing" conditions: ["RAY_CI_RLLIB_AFFECTED"] commands: - cleanup() { if [ "${BUILDKITE_PULL_REQUEST}" = "false" ]; then ./ci/travis/upload_build_info.sh; fi }; trap cleanup EXIT @@ -35,7 +35,7 @@ --test_tag_filters=learning_tests_discrete,-fake_gpus,-torch_only,-multi_gpu,-no_tf_eager_tracing --test_arg=--framework=tf2 rllib/... -- label: ":brain: RLlib: Learning cont. actions TF2-eager-tracing (from rllib/tuned_examples/*.yaml)" +- label: ":brain: RLlib: Learning cont. actions TF2-eager-tracing" conditions: ["RAY_CI_RLLIB_AFFECTED"] commands: - cleanup() { if [ "${BUILDKITE_PULL_REQUEST}" = "false" ]; then ./ci/travis/upload_build_info.sh; fi }; trap cleanup EXIT @@ -46,27 +46,7 @@ --test_arg=--framework=tf2 rllib/... -- label: ":brain: RLlib: Learning discr. actions TF1-static-graph (from rllib/tuned_examples/*.yaml)" - conditions: ["RAY_CI_RLLIB_DIRECTLY_AFFECTED"] - commands: - - cleanup() { if [ "${BUILDKITE_PULL_REQUEST}" = "false" ]; then ./ci/travis/upload_build_info.sh; fi }; trap cleanup EXIT - - RLLIB_TESTING=1 PYTHON=3.7 TF_VERSION=1.14.0 TFP_VERSION=0.7 ./ci/travis/install-dependencies.sh - - bazel test --config=ci $(./scripts/bazel_export_options) - --build_tests_only - --test_tag_filters=learning_tests_discrete,-fake_gpus,-torch_only,-tf2_only,-no_tf_static_graph,-multi_gpu - --test_arg=--framework=tf - rllib/... -- label: ":brain: RLlib: Learning cont. actions TF1-static-graph (from rllib/tuned_examples/*.yaml)" - conditions: ["RAY_CI_RLLIB_DIRECTLY_AFFECTED"] - commands: - - cleanup() { if [ "${BUILDKITE_PULL_REQUEST}" = "false" ]; then ./ci/travis/upload_build_info.sh; fi }; trap cleanup EXIT - - RLLIB_TESTING=1 PYTHON=3.7 TF_VERSION=1.14.0 TFP_VERSION=0.7 ./ci/travis/install-dependencies.sh - - bazel test --config=ci $(./scripts/bazel_export_options) - --build_tests_only - --test_tag_filters=learning_tests_continuous,-fake_gpus,-torch_only,-tf2_only,-no_tf_static_graph,-multi_gpu - --test_arg=--framework=tf - rllib/... -- label: ":brain: RLlib: Learning discr. 
actions PyTorch (from rllib/tuned_examples/*.yaml)" +- label: ":brain: RLlib: Learning discr. actions PyTorch" conditions: ["RAY_CI_RLLIB_AFFECTED"] commands: - cleanup() { if [ "${BUILDKITE_PULL_REQUEST}" = "false" ]; then ./ci/travis/upload_build_info.sh; fi }; trap cleanup EXIT @@ -76,7 +56,7 @@ --test_tag_filters=learning_tests_discrete,-fake_gpus,-tf_only,-tf2_only,-multi_gpu --test_arg=--framework=torch rllib/... -- label: ":brain: RLlib: Learning cont. actions PyTorch (from rllib/tuned_examples/*.yaml)" +- label: ":brain: RLlib: Learning cont. actions PyTorch" conditions: ["RAY_CI_RLLIB_AFFECTED"] commands: - cleanup() { if [ "${BUILDKITE_PULL_REQUEST}" = "false" ]; then ./ci/travis/upload_build_info.sh; fi }; trap cleanup EXIT @@ -86,7 +66,7 @@ --test_tag_filters=learning_tests_continuous,-fake_gpus,-tf_only,-tf2_only,-multi_gpu --test_arg=--framework=torch rllib/... -- label: ":brain: RLlib: Learning tests w/ 2 fake GPUs TF2-static-graph (from rllib/tuned_examples/*.yaml)" +- label: ":brain: RLlib: Learning tests w/ 2 fake GPUs TF2-static-graph" conditions: ["RAY_CI_RLLIB_DIRECTLY_AFFECTED"] commands: - cleanup() { if [ "${BUILDKITE_PULL_REQUEST}" = "false" ]; then ./ci/travis/upload_build_info.sh; fi }; trap cleanup EXIT @@ -97,7 +77,7 @@ --test_arg=--framework=tf rllib/... # TODO: (sven) tf2 (eager) multi-GPU -- label: ":brain: RLlib: Learning tests w/ 2 fake GPUs PyTorch (from rllib/tuned_examples/*.yaml)" +- label: ":brain: RLlib: Learning tests w/ 2 fake GPUs PyTorch" conditions: ["RAY_CI_RLLIB_DIRECTLY_AFFECTED"] commands: - cleanup() { if [ "${BUILDKITE_PULL_REQUEST}" = "false" ]; then ./ci/travis/upload_build_info.sh; fi }; trap cleanup EXIT @@ -108,6 +88,28 @@ --test_arg=--framework=torch rllib/... +- label: ":brain: RLlib: Memory leak tests TF2-eager-tracing" + conditions: ["RAY_CI_RLLIB_AFFECTED"] + commands: + - cleanup() { if [ "${BUILDKITE_PULL_REQUEST}" = "false" ]; then ./ci/travis/upload_build_info.sh; fi }; trap cleanup EXIT + - RLLIB_TESTING=1 PYTHON=3.7 ./ci/travis/install-dependencies.sh + - bazel test --config=ci $(./scripts/bazel_export_options) + --build_tests_only + --test_tag_filters=memory_leak_tests,-flaky + --test_arg=--framework=tf2 + rllib/... + +- label: ":brain: RLlib: Memory leak tests PyTorch" + conditions: ["RAY_CI_RLLIB_AFFECTED"] + commands: + - cleanup() { if [ "${BUILDKITE_PULL_REQUEST}" = "false" ]; then ./ci/travis/upload_build_info.sh; fi }; trap cleanup EXIT + - RLLIB_TESTING=1 PYTHON=3.7 ./ci/travis/install-dependencies.sh + - bazel test --config=ci $(./scripts/bazel_export_options) + --build_tests_only + --test_tag_filters=memory_leak_tests,-flaky + --test_arg=--framework=torch + rllib/... + - label: ":brain: RLlib: Quick Agent train.py runs (TODO: obsolete)" conditions: ["RAY_CI_RLLIB_DIRECTLY_AFFECTED"] commands: @@ -152,7 +154,7 @@ # "learning_tests|quick_train|examples|tests_dir". - bazel test --config=ci $(./scripts/bazel_export_options) --build_tests_only - --test_tag_filters=-learning_tests,-quick_train,-examples,-tests_dir,-trainers_dir,-documentation,-multi_gpu + --test_tag_filters=-learning_tests,-quick_train,-memory_leak_tests,-examples,-tests_dir,-trainers_dir,-documentation,-multi_gpu --test_env=RAY_USE_MULTIPROCESSING_CPU_COUNT=1 rllib/... @@ -164,14 +166,14 @@ - bazel test --config=ci $(./scripts/bazel_export_options) --build_tests_only --test_tag_filters=examples_A,examples_B,-multi_gpu --test_env=RAY_USE_MULTIPROCESSING_CPU_COUNT=1 rllib/... 
-- label: ":brain: RLlib: Examples {Ca..t}" +- label: ":brain: RLlib: Examples {Ca..Ct}" conditions: ["RAY_CI_RLLIB_AFFECTED"] commands: - cleanup() { if [ "${BUILDKITE_PULL_REQUEST}" = "false" ]; then ./ci/travis/upload_build_info.sh; fi }; trap cleanup EXIT - RLLIB_TESTING=1 PYTHON=3.7 ./ci/travis/install-dependencies.sh - bazel test --config=ci $(./scripts/bazel_export_options) --build_tests_only --test_tag_filters=examples_C_AtoT,-multi_gpu --test_env=RAY_USE_MULTIPROCESSING_CPU_COUNT=1 rllib/... -- label: ":brain: RLlib: Examples {Cu..z}" +- label: ":brain: RLlib: Examples {Cu..Cz}" conditions: ["RAY_CI_RLLIB_AFFECTED"] commands: - cleanup() { if [ "${BUILDKITE_PULL_REQUEST}" = "false" ]; then ./ci/travis/upload_build_info.sh; fi }; trap cleanup EXIT diff --git a/rllib/BUILD b/rllib/BUILD index 101c2596c..8f0d996cf 100644 --- a/rllib/BUILD +++ b/rllib/BUILD @@ -35,6 +35,8 @@ # - Examples directory (everything in rllib/examples/...), tagged: "examples" and # "examples_[A-Z]" +# - Memory leak tests tagged "memory_leak_tests". + # Note: The "examples" and "tests_dir" tags have further sub-tags going by the # starting letter of the test name (e.g. "examples_A", or "tests_dir_F") for # split-up purposes in buildkite. @@ -582,6 +584,14 @@ py_test( srcs = ["agents/tests/test_callbacks.py"] ) +py_test( + name = "test_memory_leaks_generic", + main = "agents/tests/test_memory_leaks.py", + tags = ["team:ml", "trainers_dir"], + size = "large", + srcs = ["agents/tests/test_memory_leaks.py"] +) + py_test( name = "test_trainer", tags = ["team:ml", "trainers_dir", "trainers_dir_generic"], @@ -852,6 +862,82 @@ py_test( ) +# -------------------------------------------------------------------- +# Memory leak tests +# +# Tag: memory_leak_tests +# -------------------------------------------------------------------- + +py_test( + name = "test_memory_leak_a3c", + tags = ["team:ml", "memory_leak_tests"], + main = "utils/tests/run_memory_leak_tests.py", + size = "large", + srcs = ["utils/tests/run_memory_leak_tests.py"], + data = ["tuned_examples/a3c/memory-leak-test-a3c.yaml"], + args = ["--yaml-dir=tuned_examples/a3c"] +) + +py_test( + name = "test_memory_leak_appo", + tags = ["team:ml", "memory_leak_tests"], + main = "utils/tests/run_memory_leak_tests.py", + size = "large", + srcs = ["utils/tests/run_memory_leak_tests.py"], + data = ["tuned_examples/ppo/memory-leak-test-appo.yaml"], + args = ["--yaml-dir=tuned_examples/ppo"] +) + +py_test( + name = "test_memory_leak_ddpg", + tags = ["team:ml", "memory_leak_tests"], + main = "utils/tests/run_memory_leak_tests.py", + size = "large", + srcs = ["utils/tests/run_memory_leak_tests.py"], + data = ["tuned_examples/ddpg/memory-leak-test-ddpg.yaml"], + args = ["--yaml-dir=tuned_examples/ddpg"] +) + +py_test( + name = "test_memory_leak_dqn", + tags = ["team:ml", "memory_leak_tests"], + main = "utils/tests/run_memory_leak_tests.py", + size = "large", + srcs = ["utils/tests/run_memory_leak_tests.py"], + data = ["tuned_examples/dqn/memory-leak-test-dqn.yaml"], + args = ["--yaml-dir=tuned_examples/dqn"] +) + +py_test( + name = "test_memory_leak_impala", + tags = ["team:ml", "memory_leak_tests"], + main = "utils/tests/run_memory_leak_tests.py", + size = "large", + srcs = ["utils/tests/run_memory_leak_tests.py"], + data = ["tuned_examples/impala/memory-leak-test-impala.yaml"], + args = ["--yaml-dir=tuned_examples/impala"] +) + +py_test( + name = "test_memory_leak_ppo", + tags = ["team:ml", "memory_leak_tests"], + main = "utils/tests/run_memory_leak_tests.py", + size = 
"large", + srcs = ["utils/tests/run_memory_leak_tests.py"], + data = ["tuned_examples/ppo/memory-leak-test-ppo.yaml"], + args = ["--yaml-dir=tuned_examples/ppo"] +) + +py_test( + name = "test_memory_leak_sac", + tags = ["team:ml", "memory_leak_tests"], + main = "utils/tests/run_memory_leak_tests.py", + size = "large", + srcs = ["utils/tests/run_memory_leak_tests.py"], + data = ["tuned_examples/sac/memory-leak-test-sac.yaml"], + args = ["--yaml-dir=tuned_examples/sac"] +) + # -------------------------------------------------------------------- # Agents (quick training test iterations via `rllib train`) # diff --git a/rllib/agents/ddpg/tests/test_ddpg.py b/rllib/agents/ddpg/tests/test_ddpg.py index 7268487d7..8fe793ea7 100644 --- a/rllib/agents/ddpg/tests/test_ddpg.py +++ b/rllib/agents/ddpg/tests/test_ddpg.py @@ -92,16 +92,16 @@ class TestDDPG(unittest.TestCase): trainer = ddpg.DDPGTrainer(config=config, env="Pendulum-v1") # Setting explore=False should always return the same action. a_ = trainer.compute_single_action(obs, explore=False) - self.assertEqual(trainer.get_policy().global_timestep, 1) + check(trainer.get_policy().global_timestep, 1) for i in range(50): a = trainer.compute_single_action(obs, explore=False) - self.assertEqual(trainer.get_policy().global_timestep, i + 2) + check(trainer.get_policy().global_timestep, i + 2) check(a, a_) # explore=None (default: explore) should return different actions. actions = [] for i in range(50): actions.append(trainer.compute_single_action(obs)) - self.assertEqual(trainer.get_policy().global_timestep, i + 52) + check(trainer.get_policy().global_timestep, i + 52) check(np.std(actions), 0.0, false=True) trainer.stop() @@ -117,25 +117,25 @@ class TestDDPG(unittest.TestCase): trainer = ddpg.DDPGTrainer(config=config, env="Pendulum-v1") # ts=0 (get a deterministic action as per explore=False). deterministic_action = trainer.compute_single_action(obs, explore=False) - self.assertEqual(trainer.get_policy().global_timestep, 1) + check(trainer.get_policy().global_timestep, 1) # ts=1-49 (in random window). random_a = [] for i in range(1, 50): random_a.append(trainer.compute_single_action(obs, explore=True)) - self.assertEqual(trainer.get_policy().global_timestep, i + 1) + check(trainer.get_policy().global_timestep, i + 1) check(random_a[-1], deterministic_action, false=True) self.assertTrue(np.std(random_a) > 0.5) # ts > 50 (a=deterministic_action + scale * N[0,1]) for i in range(50): a = trainer.compute_single_action(obs, explore=True) - self.assertEqual(trainer.get_policy().global_timestep, i + 51) + check(trainer.get_policy().global_timestep, i + 51) check(a, deterministic_action, rtol=0.1) # ts >> 50 (BUT: explore=False -> expect deterministic action). for i in range(50): a = trainer.compute_single_action(obs, explore=False) - self.assertEqual(trainer.get_policy().global_timestep, i + 101) + check(trainer.get_policy().global_timestep, i + 101) check(a, deterministic_action) trainer.stop() diff --git a/rllib/agents/ddpg/tests/test_td3.py b/rllib/agents/ddpg/tests/test_td3.py index 7b01f7aa2..c63ecdbc5 100644 --- a/rllib/agents/ddpg/tests/test_td3.py +++ b/rllib/agents/ddpg/tests/test_td3.py @@ -52,16 +52,16 @@ class TestTD3(unittest.TestCase): trainer = td3.TD3Trainer(config=lcl_config, env="Pendulum-v1") # Setting explore=False should always return the same action. 
a_ = trainer.compute_single_action(obs, explore=False) - self.assertEqual(trainer.get_policy().global_timestep, 1) + check(trainer.get_policy().global_timestep, 1) for i in range(50): a = trainer.compute_single_action(obs, explore=False) - self.assertEqual(trainer.get_policy().global_timestep, i + 2) + check(trainer.get_policy().global_timestep, i + 2) check(a, a_) # explore=None (default: explore) should return different actions. actions = [] for i in range(50): actions.append(trainer.compute_single_action(obs)) - self.assertEqual(trainer.get_policy().global_timestep, i + 52) + check(trainer.get_policy().global_timestep, i + 52) check(np.std(actions), 0.0, false=True) trainer.stop() @@ -77,25 +77,25 @@ class TestTD3(unittest.TestCase): trainer = td3.TD3Trainer(config=lcl_config, env="Pendulum-v1") # ts=0 (get a deterministic action as per explore=False). deterministic_action = trainer.compute_single_action(obs, explore=False) - self.assertEqual(trainer.get_policy().global_timestep, 1) + check(trainer.get_policy().global_timestep, 1) # ts=1-29 (in random window). random_a = [] for i in range(1, 30): random_a.append(trainer.compute_single_action(obs, explore=True)) - self.assertEqual(trainer.get_policy().global_timestep, i + 1) + check(trainer.get_policy().global_timestep, i + 1) check(random_a[-1], deterministic_action, false=True) self.assertTrue(np.std(random_a) > 0.3) # ts > 30 (a=deterministic_action + scale * N[0,1]) for i in range(50): a = trainer.compute_single_action(obs, explore=True) - self.assertEqual(trainer.get_policy().global_timestep, i + 31) + check(trainer.get_policy().global_timestep, i + 31) check(a, deterministic_action, rtol=0.1) # ts >> 30 (BUT: explore=False -> expect deterministic action). for i in range(50): a = trainer.compute_single_action(obs, explore=False) - self.assertEqual(trainer.get_policy().global_timestep, i + 81) + check(trainer.get_policy().global_timestep, i + 81) check(a, deterministic_action) trainer.stop() diff --git a/rllib/agents/impala/vtrace_tf.py b/rllib/agents/impala/vtrace_tf.py index 068abeaf9..61c6a7e36 100644 --- a/rllib/agents/impala/vtrace_tf.py +++ b/rllib/agents/impala/vtrace_tf.py @@ -360,15 +360,6 @@ def from_importance_weights( rhos = tf.math.exp(log_rhos) if clip_rho_threshold is not None: clipped_rhos = tf.minimum(clip_rho_threshold, rhos, name="clipped_rhos") - - tf1.summary.histogram("clipped_rhos_1000", tf.minimum(1000.0, rhos)) - tf1.summary.scalar( - "num_of_clipped_rhos", - tf.reduce_sum( - tf.cast(tf.equal(clipped_rhos, clip_rho_threshold), tf.int32) - ), - ) - tf1.summary.scalar("size_of_clipped_rhos", tf.size(clipped_rhos)) else: clipped_rhos = rhos diff --git a/rllib/agents/tests/test_memory_leaks.py b/rllib/agents/tests/test_memory_leaks.py new file mode 100644 index 000000000..2fac96dea --- /dev/null +++ b/rllib/agents/tests/test_memory_leaks.py @@ -0,0 +1,58 @@ +import unittest + +import ray +import ray.rllib.agents.dqn as dqn +import ray.rllib.agents.ppo as ppo +from ray.rllib.examples.env.memory_leaking_env import MemoryLeakingEnv +from ray.rllib.examples.policy.memory_leaking_policy import MemoryLeakingPolicy +from ray.rllib.policy.policy import PolicySpec +from ray.rllib.utils.debug.memory import check_memory_leaks + + +class TestMemoryLeaks(unittest.TestCase): + """Generically tests our memory leak diagnostics tools.""" + + @classmethod + def setUpClass(cls): + ray.init() + + @classmethod + def tearDownClass(cls): + ray.shutdown() + + def test_leaky_env(self): + """Tests, whether our diagnostics tools can detect 
leaks in an env.""" + config = ppo.DEFAULT_CONFIG.copy() + # Make sure we have an env to test on the local worker. + # Otherwise, `check_memory_leaks` will complain. + config["create_env_on_driver"] = True + config["env"] = MemoryLeakingEnv + config["env_config"] = { + "static_samples": True, + } + trainer = ppo.PPOTrainer(config=config) + results = check_memory_leaks(trainer, to_check={"env"}, repeats=150) + assert results["env"] + trainer.stop() + + def test_leaky_policy(self): + """Tests, whether our diagnostics tools can detect leaks in a policy.""" + config = dqn.DEFAULT_CONFIG.copy() + # Make sure we have an env to test on the local worker. + # Otherwise, `check_memory_leaks` will complain. + config["create_env_on_driver"] = True + config["env"] = "CartPole-v0" + config["multiagent"]["policies"] = { + "default_policy": PolicySpec(policy_class=MemoryLeakingPolicy), + } + trainer = dqn.DQNTrainer(config=config) + results = check_memory_leaks(trainer, to_check={"policy"}, repeats=300) + assert results["policy"] + trainer.stop() + + +if __name__ == "__main__": + import pytest + import sys + + sys.exit(pytest.main(["-v", __file__])) diff --git a/rllib/env/multi_agent_env.py b/rllib/env/multi_agent_env.py index e50029df0..5de90741b 100644 --- a/rllib/env/multi_agent_env.py +++ b/rllib/env/multi_agent_env.py @@ -41,7 +41,7 @@ class MultiAgentEnv(gym.Env): if not hasattr(self, "_agent_ids"): self._agent_ids = set() - # do the action and observation spaces map from agent ids to spaces + # Do the action and observation spaces map from agent ids to spaces # for the individual agents? if not hasattr(self, "_spaces_in_preferred_format"): self._spaces_in_preferred_format = None @@ -189,7 +189,6 @@ class MultiAgentEnv(gym.Env): if agent_id != "__all__" } logger.warning("action_space_sample() has not been implemented") - del agent_ids return {} @ExperimentalAPI @@ -221,7 +220,6 @@ class MultiAgentEnv(gym.Env): samples = {agent_id: samples[agent_id] for agent_id in agent_ids} return samples logger.warning("observation_space_sample() has not been implemented") - del agent_ids return {} @PublicAPI diff --git a/rllib/env/vector_env.py b/rllib/env/vector_env.py index d039102b8..68b9fd44a 100644 --- a/rllib/env/vector_env.py +++ b/rllib/env/vector_env.py @@ -237,11 +237,6 @@ class _VectorizedGymEnv(VectorEnv): obs_batch, rew_batch, done_batch, info_batch = [], [], [], [] for i in range(self.num_envs): obs, r, done, info = self.envs[i].step(actions[i]) - if not np.isscalar(r) or not np.isreal(r) or not np.isfinite(r): - raise ValueError( - "Reward should be finite scalar, got {} ({}). 
" - "Actions={}.".format(r, type(r), actions[i]) - ) if not isinstance(info, dict): raise ValueError( "Info should be a dict, got {} ({})".format(info, type(info)) diff --git a/rllib/evaluation/rollout_worker.py b/rllib/evaluation/rollout_worker.py index 4ed07926a..03b5bed5a 100644 --- a/rllib/evaluation/rollout_worker.py +++ b/rllib/evaluation/rollout_worker.py @@ -42,9 +42,9 @@ from ray.rllib.policy.policy import Policy, PolicySpec from ray.rllib.policy.policy_map import PolicyMap from ray.rllib.policy.torch_policy import TorchPolicy from ray.rllib.utils import force_list, merge_dicts, check_env -from ray.rllib.utils.annotations import Deprecated, DeveloperAPI, ExperimentalAPI +from ray.rllib.utils.annotations import DeveloperAPI, ExperimentalAPI from ray.rllib.utils.debug import summarize, update_global_seed_if_necessary -from ray.rllib.utils.deprecation import deprecation_warning +from ray.rllib.utils.deprecation import Deprecated, deprecation_warning from ray.rllib.utils.error import ERR_MSG_NO_GPUS, HOWTO_CHANGE_CONFIG from ray.rllib.utils.filter import get_filter, Filter from ray.rllib.utils.framework import try_import_tf, try_import_torch @@ -508,15 +508,6 @@ class RolloutWorker(ParallelIteratorWorker): if self.env is not None: # Validate environment (general validation function). if not self._disable_env_checking: - logger.warning( - "We've added a module for checking environments that " - "are used in experiments. It will cause your " - "environment to fail if your environment is not set up" - "correctly. You can disable check env by setting " - "`disable_env_checking` to True in your experiment config " - "dictionary. You can run the environment checking module " - "standalone by calling ray.rllib.utils.check_env(env)." - ) check_env(self.env) # Custom validation function given, typically a function attribute of the # algorithm trainer. diff --git a/rllib/evaluation/tests/test_rollout_worker.py b/rllib/evaluation/tests/test_rollout_worker.py index a3798d9c4..d63b0857f 100644 --- a/rllib/evaluation/tests/test_rollout_worker.py +++ b/rllib/evaluation/tests/test_rollout_worker.py @@ -167,7 +167,11 @@ class TestRolloutWorker(unittest.TestCase): STEPS_SAMPLED_COUNTER, result["info"][STEPS_SAMPLED_COUNTER] ) ) - global_timesteps = policy.global_timestep + global_timesteps = ( + policy.global_timestep + if fw == "tf" + else policy.global_timestep.numpy() + ) print("global_timesteps={}".format(global_timesteps)) expected_lr = 0.1 - ((0.1 - 0.000001) / 100000) * global_timesteps lr = policy.cur_lr diff --git a/rllib/examples/eager_execution.py b/rllib/examples/eager_execution.py index cf01a0428..3f7324459 100644 --- a/rllib/examples/eager_execution.py +++ b/rllib/examples/eager_execution.py @@ -109,8 +109,7 @@ if __name__ == "__main__": "num_gpus": int(os.environ.get("RLLIB_NUM_GPUS", "0")), "num_workers": 0, "model": {"custom_model": "eager_model"}, - # Alternatively, use "tf2" here for enforcing TF version 2.x. - "framework": "tfe", + "framework": "tf2", } stop = { "timesteps_total": args.stop_timesteps, diff --git a/rllib/examples/env/memory_leaking_env.py b/rllib/examples/env/memory_leaking_env.py new file mode 100644 index 000000000..db22ab1e1 --- /dev/null +++ b/rllib/examples/env/memory_leaking_env.py @@ -0,0 +1,35 @@ +import logging +import uuid + +from ray.rllib.examples.env.random_env import RandomEnv +from ray.rllib.utils.annotations import override + +logger = logging.getLogger(__name__) + + +class MemoryLeakingEnv(RandomEnv): + """An env that leaks very little memory. 
+ + Useful for proving that our memory-leak tests can catch the + slightest leaks. + """ + + def __init__(self, config=None): + super().__init__(config) + self._leak = {} + self._steps_after_reset = 0 + + @override(RandomEnv) + def reset(self): + self._steps_after_reset = 0 + return super().reset() + + @override(RandomEnv) + def step(self, action): + self._steps_after_reset += 1 + + # Only leak once an episode. + if self._steps_after_reset == 2: + self._leak[uuid.uuid4().hex.upper()] = 1 + + return super().step(action) diff --git a/rllib/examples/env/multi_agent.py b/rllib/examples/env/multi_agent.py index 644e85e3a..0ba5ebb98 100644 --- a/rllib/examples/env/multi_agent.py +++ b/rllib/examples/env/multi_agent.py @@ -149,7 +149,7 @@ class FlexAgentsMultiAgent(MultiAgentEnv): self.dones.add(i) # Sometimes, add a new agent to the episode. - if random.random() > 0.75: + if random.random() > 0.75 and len(action_dict) > 0: i = self.spawn() obs[i], rew[i], done[i], info[i] = self.agents[i].step(action) if done[i]: diff --git a/rllib/examples/env/random_env.py b/rllib/examples/env/random_env.py index b941a748e..69dc2b7fd 100644 --- a/rllib/examples/env/random_env.py +++ b/rllib/examples/env/random_env.py @@ -1,3 +1,4 @@ +import copy import gym from gym.spaces import Discrete, Tuple import numpy as np @@ -26,6 +27,11 @@ class RandomEnv(gym.Env): "reward_space", gym.spaces.Box(low=-1.0, high=1.0, shape=(), dtype=np.float32), ) + self.static_samples = config.get("static_samples", False) + if self.static_samples: + self.observation_sample = self.observation_space.sample() + self.reward_sample = self.reward_space.sample() + # Chance that an episode ends at any step. # Note that a max episode length can be specified via # `max_episode_len`. @@ -42,7 +48,10 @@ class RandomEnv(gym.Env): def reset(self): self.steps = 0 - return self.observation_space.sample() + if not self.static_samples: + return self.observation_space.sample() + else: + return copy.deepcopy(self.observation_sample) def step(self, action): if self.check_action_bounds and not self.action_space.contains(action): @@ -67,12 +76,20 @@ class RandomEnv(gym.Env): np.random.choice([True, False], p=[self.p_done, 1.0 - self.p_done]) ) - return ( - self.observation_space.sample(), - float(self.reward_space.sample()), - done, - {}, - ) + if not self.static_samples: + return ( + self.observation_space.sample(), + self.reward_space.sample(), + done, + {}, + ) + else: + return ( + copy.deepcopy(self.observation_sample), + copy.deepcopy(self.reward_sample), + done, + {}, + ) # Multi-agent version of the RandomEnv. diff --git a/rllib/examples/policy/memory_leaking_policy.py b/rllib/examples/policy/memory_leaking_policy.py new file mode 100644 index 000000000..736f26fba --- /dev/null +++ b/rllib/examples/policy/memory_leaking_policy.py @@ -0,0 +1,38 @@ +from ray.rllib.examples.policy.random_policy import RandomPolicy +from ray.rllib.utils.annotations import override + + +class MemoryLeakingPolicy(RandomPolicy): + """A Policy that leaks very little memory. + + Useful for proving that our memory-leak tests can catch the + slightest leaks. + """ + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self._leak = [] + + @override(RandomPolicy) + def compute_actions(self, *args, **kwargs): + # Leak. + self._leak.append(1.5) + return super().compute_actions(*args, **kwargs) + + @override(RandomPolicy) + def compute_actions_from_input_dict(self, *args, **kwargs): + # Leak. 
+ self._leak.append(1) + return super().compute_actions_from_input_dict(*args, **kwargs) + + @override(RandomPolicy) + def learn_on_batch(self, samples): + # Leak. + self._leak.append(False) + return super().learn_on_batch(samples) + + @override(RandomPolicy) + def compute_log_likelihoods(self, *args, **kwargs): + # Leak. + self._leak.append("test") + return super().compute_log_likelihoods(*args, **kwargs) diff --git a/rllib/examples/policy/random_policy.py b/rllib/examples/policy/random_policy.py index 5df2deb45..c121533ac 100644 --- a/rllib/examples/policy/random_policy.py +++ b/rllib/examples/policy/random_policy.py @@ -1,9 +1,10 @@ -import random - -import numpy as np from gym.spaces import Box +import numpy as np +import random +import tree # pip install dm_tree from ray.rllib.policy.policy import Policy +from ray.rllib.policy.sample_batch import SampleBatch from ray.rllib.utils.annotations import override from ray.rllib.utils.typing import ModelWeights @@ -67,3 +68,13 @@ class RandomPolicy(Policy): def set_weights(self, weights: ModelWeights) -> None: """No weights to set.""" pass + + @override(Policy) + def _get_dummy_batch_from_view_requirements(self, batch_size: int = 1): + return SampleBatch( + { + SampleBatch.OBS: tree.map_structure( + lambda s: s[None], self.observation_space.sample() + ), + } + ) diff --git a/rllib/policy/eager_tf_policy.py b/rllib/policy/eager_tf_policy.py index 7e31c73ba..41c4696e7 100644 --- a/rllib/policy/eager_tf_policy.py +++ b/rllib/policy/eager_tf_policy.py @@ -34,18 +34,18 @@ logger = logging.getLogger(__name__) def _convert_to_tf(x, dtype=None): if isinstance(x, SampleBatch): dict_ = {k: v for k, v in x.items() if k != SampleBatch.INFOS} - return tf.nest.map_structure(_convert_to_tf, dict_) + return tree.map_structure(_convert_to_tf, dict_) elif isinstance(x, Policy): return x # Special handling of "Repeated" values. elif isinstance(x, RepeatedValues): return RepeatedValues( - tf.nest.map_structure(_convert_to_tf, x.values), x.lengths, x.max_len + tree.map_structure(_convert_to_tf, x.values), x.lengths, x.max_len ) if x is not None: d = dtype - return tf.nest.map_structure( + return tree.map_structure( lambda f: _convert_to_tf(f, d) if isinstance(f, RepeatedValues) else tf.convert_to_tensor(f, d) @@ -311,6 +311,12 @@ def build_eager_tf_policy( self.framework = config.get("framework", "tfe") Policy.__init__(self, observation_space, action_space, config) + # Global timestep should be a tensor. + self.global_timestep = tf.Variable(0, trainable=False, dtype=tf.int64) + self.explore = tf.Variable( + self.config["explore"], trainable=False, dtype=tf.bool + ) + # Log device and worker index. from ray.rllib.evaluation.rollout_worker import get_global_worker @@ -438,7 +444,7 @@ def build_eager_tf_policy( after_init(self, observation_space, action_space, config) # Got to reset global_timestep again after fake run-throughs. - self.global_timestep = 0 + self.global_timestep.assign(0) @override(Policy) def compute_actions_from_input_dict( @@ -455,7 +461,7 @@ def build_eager_tf_policy( self._is_training = False - explore = explore if explore is not None else self.config["explore"] + explore = explore if explore is not None else self.explore timestep = timestep if timestep is not None else self.global_timestep if isinstance(timestep, tf.Tensor): timestep = int(timestep.numpy()) @@ -485,7 +491,7 @@ def build_eager_tf_policy( timestep, ) # Update our global timestep by the batch size. 
- self.global_timestep += int(tree.flatten(ret[0])[0].shape[0]) + self.global_timestep.assign_add(tree.flatten(ret[0])[0].shape.as_list()[0]) return convert_to_numpy(ret) @override(Policy) @@ -501,7 +507,6 @@ def build_eager_tf_policy( timestep=None, **kwargs, ): - # Create input dict to simply pass the entire call to # self.compute_actions_from_input_dict(). input_dict = SampleBatch( @@ -693,6 +698,7 @@ def build_eager_tf_policy( @override(Policy) def get_state(self): state = super().get_state() + state["global_timestep"] = state["global_timestep"].numpy() if self._optimizer and len(self._optimizer.variables()) > 0: state["_optimizer_variables"] = self._optimizer.variables() # Add exploration state. @@ -701,7 +707,6 @@ def build_eager_tf_policy( @override(Policy) def set_state(self, state): - state = state.copy() # shallow copy # Set optimizer vars first. optimizer_vars = state.get("_optimizer_variables", None) if optimizer_vars and self._optimizer.variables(): @@ -716,8 +721,9 @@ def build_eager_tf_policy( # Set exploration's state. if hasattr(self, "exploration") and "_exploration_state" in state: self.exploration.set_state(state=state["_exploration_state"]) - # Then the Policy's (NN) weights. - super().set_state(state) + # Weights and global_timestep (tf vars). + self.set_weights(state["weights"]) + self.global_timestep.assign(state["global_timestep"]) @override(Policy) def export_checkpoint(self, export_dir): diff --git a/rllib/policy/policy.py b/rllib/policy/policy.py index 774ab0fc4..b0c6385a9 100644 --- a/rllib/policy/policy.py +++ b/rllib/policy/policy.py @@ -737,7 +737,11 @@ class Policy(metaclass=ABCMeta): """ # Store the current global time step (sum over all policies' sample # steps). - self.global_timestep = global_vars["timestep"] + # Make sure, we keep global_timestep as a Tensor. + if self.framework in ["tf2", "tfe"]: + self.global_timestep.assign(global_vars["timestep"]) + else: + self.global_timestep = global_vars["timestep"] @DeveloperAPI def export_checkpoint(self, export_dir: str) -> None: diff --git a/rllib/policy/sample_batch.py b/rllib/policy/sample_batch.py index 098ff4606..add274ac6 100644 --- a/rllib/policy/sample_batch.py +++ b/rllib/policy/sample_batch.py @@ -138,13 +138,12 @@ class SampleBatch(dict): # value that is actually a ndarray/tensor. This would fail if # all values are nested dicts/tuples of more complex underlying # structures. - len_ = ( - len(v) - if isinstance(v, (list, np.ndarray)) or (torch and torch.is_tensor(v)) - else None - ) - if len_: - lengths.append(len_) + try: + len_ = len(v) if not isinstance(v, (dict, tuple)) else None + if len_: + lengths.append(len_) + except Exception: + pass if ( self.get(SampleBatch.SEQ_LENS) is not None diff --git a/rllib/policy/tf_policy.py b/rllib/policy/tf_policy.py index 34f44a0f8..8f40e7036 100644 --- a/rllib/policy/tf_policy.py +++ b/rllib/policy/tf_policy.py @@ -1187,11 +1187,12 @@ class TFPolicy(Policy): # Build the feed dict from the batch. 
feed_dict = {} for key, placeholders in self._loss_input_dict.items(): - tree.map_structure( + a = tree.map_structure( lambda ph, v: feed_dict.__setitem__(ph, v), placeholders, train_batch[key], ) + del a state_keys = ["state_in_{}".format(i) for i in range(len(self._state_inputs))] for key in state_keys: diff --git a/rllib/tests/test_timesteps.py b/rllib/tests/test_timesteps.py index 3fd304a20..a8ad61953 100644 --- a/rllib/tests/test_timesteps.py +++ b/rllib/tests/test_timesteps.py @@ -4,7 +4,7 @@ import unittest import ray import ray.rllib.agents.pg as pg from ray.rllib.examples.env.random_env import RandomEnv -from ray.rllib.utils.test_utils import framework_iterator +from ray.rllib.utils.test_utils import check, framework_iterator class TestTimeSteps(unittest.TestCase): @@ -32,19 +32,19 @@ class TestTimeSteps(unittest.TestCase): for i in range(1, 21): trainer.compute_single_action(obs) - self.assertEqual(policy.global_timestep, i) + check(policy.global_timestep, i) for i in range(1, 21): policy.compute_actions(obs_batch) - self.assertEqual(policy.global_timestep, i + 20) + check(policy.global_timestep, i + 20) # Artificially set ts to 100Bio, then keep computing actions and # train. crazy_timesteps = int(1e11) - policy.global_timestep = crazy_timesteps + policy.on_global_var_update({"timestep": crazy_timesteps}) # Run for 10 more ts. for i in range(1, 11): policy.compute_actions(obs_batch) - self.assertEqual(policy.global_timestep, i + crazy_timesteps) + check(policy.global_timestep, i + crazy_timesteps) trainer.train() diff --git a/rllib/tuned_examples/a3c/memory-leak-test-a3c.yaml b/rllib/tuned_examples/a3c/memory-leak-test-a3c.yaml new file mode 100644 index 000000000..7b9c0b79d --- /dev/null +++ b/rllib/tuned_examples/a3c/memory-leak-test-a3c.yaml @@ -0,0 +1,15 @@ +memory-leak-test-a3c: + stop: + timesteps_total: 150000 + env: + ray.rllib.examples.env.random_env.RandomLargeObsSpaceEnv + run: A3C + config: + # Works for both torch and tf. + framework: tf + # Switch off np.random, which is known to have memory leaks. + env_config: + config: + static_samples: true + num_workers: 4 + num_envs_per_worker: 5 diff --git a/rllib/tuned_examples/ddpg/memory-leak-test-ddpg.yaml b/rllib/tuned_examples/ddpg/memory-leak-test-ddpg.yaml new file mode 100644 index 000000000..1eae80e07 --- /dev/null +++ b/rllib/tuned_examples/ddpg/memory-leak-test-ddpg.yaml @@ -0,0 +1,12 @@ +memory-leak-test-ddpg: + env: + ray.rllib.examples.env.random_env.RandomLargeObsSpaceEnvContActions + run: DDPG + config: + # Works for both torch and tf. + framework: tf + # Switch off np.random, which is known to have memory leaks. + env_config: + config: + static_samples: true + buffer_size: 500 # use small buffer to catch memory leaks diff --git a/rllib/tuned_examples/dqn/memory-leak-test-dqn.yaml b/rllib/tuned_examples/dqn/memory-leak-test-dqn.yaml new file mode 100644 index 000000000..62fea118d --- /dev/null +++ b/rllib/tuned_examples/dqn/memory-leak-test-dqn.yaml @@ -0,0 +1,12 @@ +memory-leak-test-dqn: + env: + ray.rllib.examples.env.random_env.RandomLargeObsSpaceEnv + run: DQN + config: + # Works for both torch and tf. + framework: tf + # Switch off np.random, which is known to have memory leaks. 
+ env_config: + config: + static_samples: true + buffer_size: 500 # use small buffer to catch memory leaks diff --git a/rllib/tuned_examples/impala/memory-leak-test-impala.yaml b/rllib/tuned_examples/impala/memory-leak-test-impala.yaml new file mode 100644 index 000000000..b61c2c186 --- /dev/null +++ b/rllib/tuned_examples/impala/memory-leak-test-impala.yaml @@ -0,0 +1,17 @@ +memory-leak-test-impala: + stop: + timesteps_total: 150000 + env: + ray.rllib.examples.env.random_env.RandomLargeObsSpaceEnv + run: IMPALA + config: + # Works for both torch and tf. + framework: tf + # Switch off np.random, which is known to have memory leaks. + env_config: + config: + static_samples: true + num_gpus: 0 + num_workers: 3 + num_aggregation_workers: 1 + num_envs_per_worker: 5 diff --git a/rllib/tuned_examples/ppo/memory-leak-test-appo.yaml b/rllib/tuned_examples/ppo/memory-leak-test-appo.yaml new file mode 100644 index 000000000..8673aead6 --- /dev/null +++ b/rllib/tuned_examples/ppo/memory-leak-test-appo.yaml @@ -0,0 +1,14 @@ +memory-leak-test-appo: + env: + ray.rllib.examples.env.random_env.RandomLargeObsSpaceEnv + run: APPO + config: + # Works for both torch and tf. + framework: tf + # Switch off np.random, which is known to have memory leaks. + env_config: + config: + static_samples: true + num_workers: 4 + num_envs_per_worker: 5 + rollout_fragment_length: 20 diff --git a/rllib/tuned_examples/ppo/memory-leak-test-ppo.yaml b/rllib/tuned_examples/ppo/memory-leak-test-ppo.yaml new file mode 100644 index 000000000..e6fd5ba4f --- /dev/null +++ b/rllib/tuned_examples/ppo/memory-leak-test-ppo.yaml @@ -0,0 +1,17 @@ +memory-leak-test-ppo: + env: + ray.rllib.examples.env.random_env.RandomLargeObsSpaceEnv + run: PPO + config: + # Works for both torch and tf. + framework: tf + # Switch off np.random, which is known to have memory leaks. + env_config: + config: + static_samples: true + rollout_fragment_length: 20 + num_workers: 4 + num_envs_per_worker: 5 + train_batch_size: 1000 + sgd_minibatch_size: 256 + num_sgd_iter: 5 diff --git a/rllib/tuned_examples/sac/memory-leak-test-sac.yaml b/rllib/tuned_examples/sac/memory-leak-test-sac.yaml new file mode 100644 index 000000000..39a9097ce --- /dev/null +++ b/rllib/tuned_examples/sac/memory-leak-test-sac.yaml @@ -0,0 +1,12 @@ +memory-leak-test-sac: + env: + ray.rllib.examples.env.random_env.RandomLargeObsSpaceEnvContActions + run: SAC + config: + # Works for both torch and tf. + framework: tf + # Switch off np.random, which is known to have memory leaks. 
+ env_config: + config: + static_samples: true + buffer_size: 500 # use small buffer to catch memory leaks diff --git a/rllib/utils/debug/__init__.py b/rllib/utils/debug/__init__.py new file mode 100644 index 000000000..140323eef --- /dev/null +++ b/rllib/utils/debug/__init__.py @@ -0,0 +1,10 @@ +from ray.rllib.utils.debug.deterministic import update_global_seed_if_necessary +from ray.rllib.utils.debug.memory import check_memory_leaks +from ray.rllib.utils.debug.summary import summarize + + +__all__ = [ + "check_memory_leaks", + "summarize", + "update_global_seed_if_necessary", +] diff --git a/rllib/utils/debug/deterministic.py b/rllib/utils/debug/deterministic.py new file mode 100644 index 000000000..15cefc645 --- /dev/null +++ b/rllib/utils/debug/deterministic.py @@ -0,0 +1,54 @@ +import numpy as np +import os +import random +from typing import Optional + +from ray.rllib.utils.framework import try_import_tf, try_import_torch + + +def update_global_seed_if_necessary( + framework: Optional[str] = None, seed: Optional[int] = None +) -> None: + """Seed global modules such as random, numpy, torch, or tf. + + This is useful for debugging and testing. + + Args: + framework: The framework specifier (may be None). + seed: An optional int seed. If None, will not do + anything. + """ + if seed is None: + return + + # Python random module. + random.seed(seed) + # Numpy. + np.random.seed(seed) + + # Torch. + if framework == "torch": + torch, _ = try_import_torch() + torch.manual_seed(seed) + # See https://github.com/pytorch/pytorch/issues/47672. + cuda_version = torch.version.cuda + if cuda_version is not None and float(torch.version.cuda) >= 10.2: + os.environ["CUBLAS_WORKSPACE_CONFIG"] = "4096:8" + else: + from distutils.version import LooseVersion + + if LooseVersion(torch.__version__) >= LooseVersion("1.8.0"): + # Not all Operations support this. + torch.use_deterministic_algorithms(True) + else: + torch.set_deterministic(True) + # This is only for Convolution no problem. + torch.backends.cudnn.deterministic = True + elif framework == "tf2" or framework == "tfe": + tf1, tf, _ = try_import_tf() + # Tf2.x. + if framework == "tf2": + tf.random.set_seed(seed) + # Tf-eager. + elif framework == "tfe": + tf1.set_random_seed(seed) diff --git a/rllib/utils/debug/memory.py b/rllib/utils/debug/memory.py new file mode 100644 index 000000000..a33eb17f0 --- /dev/null +++ b/rllib/utils/debug/memory.py @@ -0,0 +1,362 @@ +from collections import defaultdict, namedtuple +import numpy as np +import os +import re +import scipy +import tracemalloc +import tree # pip install dm_tree +from typing import Callable, DefaultDict, List, Optional, Set + +from ray.rllib.policy.sample_batch import DEFAULT_POLICY_ID, SampleBatch + + +# A suspicious memory-allocating stack-trace that we should re-test +# to make sure it's not a false positive. +Suspect = namedtuple( + "Suspect", + [ + # The stack trace of the allocation, going back n frames, depending + # on the tracemalloc.start(n) call. + "traceback", + # The amount of memory taken by this particular stack trace + # over the course of the experiment. + "memory_increase", + # The slope of the scipy linear regression (x=iteration; y=memory size). + "slope", + # The rvalue of the scipy linear regression. + "rvalue", + # The memory size history (list of all memory sizes over all iterations). 
+ "hist", + ], +) + + +def check_memory_leaks( + trainer, + to_check: Optional[Set[str]] = None, + repeats: Optional[int] = None, + max_num_trials: int = 3, +) -> DefaultDict[str, List[Suspect]]: + """Diagnoses the given trainer for possible memory leaks. + + Isolates single components inside the trainer's local worker, e.g. the env, + policy, etc.. and calls some of their methods repeatedly, while checking + the memory footprints and keeping track of which lines in the code add + un-GC'd items to memory. + + Args: + trainer: The Trainer instance to test. + to_check: Set of strings to indentify components to test. Allowed strings + are: "env", "policy", "model", "rollout_worker". By default, check all + of these. + repeats: Number of times the test code block should get executed (per trial). + If a trial fails, a new trial may get started with a larger number of + repeats: actual_repeats = `repeats` * (trial + 1) (1st trial == 0). + max_num_trials: The maximum number of trials to run each check for. + + Raises: + A defaultdict(list) with keys being the `to_check` strings and values being + lists of Suspect instances that were found. + """ + local_worker = trainer.workers.local_worker() + + # Which components should we test? + to_check = to_check or {"env", "model", "policy", "rollout_worker"} + + results_per_category = defaultdict(list) + + # Test a single sub-env (first in the VectorEnv)? + if "env" in to_check: + assert local_worker.async_env is not None, ( + "ERROR: Cannot test 'env' since given trainer does not have one " + "in its local worker. Try setting `create_env_on_driver=True`." + ) + + # Isolate the first sub-env in the vectorized setup and test it. + env = local_worker.async_env.get_sub_environments()[0] + action_space = env.action_space + # Always use same action to avoid numpy random caused memory leaks. + action_sample = action_space.sample() + + def code(): + env.reset() + while True: + # If masking is used, try something like this: + # np.random.choice( + # action_space.n, p=(obs["action_mask"] / sum(obs["action_mask"]))) + _, _, done, _ = env.step(action_sample) + if done: + break + + test = _test_some_code_for_memory_leaks( + desc="Looking for leaks in env, running through episodes.", + init=None, + code=code, + # How many times to repeat the function call? + repeats=repeats or 200, + max_num_trials=max_num_trials, + ) + if test: + results_per_category["env"].extend(test) + + # Test the policy (single-agent case only so far). + if "policy" in to_check: + policy = local_worker.policy_map[DEFAULT_POLICY_ID] + + # Get a fixed obs (B=10). + obs = tree.map_structure( + lambda s: np.stack([s] * 10, axis=0), policy.observation_space.sample() + ) + + print("Looking for leaks in Policy") + + def code(): + policy.compute_actions_from_input_dict( + { + "obs": obs, + } + ) + + # Call `compute_actions_from_input_dict()` n times. + test = _test_some_code_for_memory_leaks( + desc="Calling `compute_actions_from_input_dict()`.", + init=None, + code=code, + # How many times to repeat the function call? + repeats=repeats or 400, + # How many times to re-try if we find a suspicious memory + # allocation? + max_num_trials=max_num_trials, + ) + if test: + results_per_category["policy"].extend(test) + + # Call `learn_on_batch()` n times. 
+ dummy_batch = policy._get_dummy_batch_from_view_requirements(batch_size=16) + + test = _test_some_code_for_memory_leaks( + desc="Calling `learn_on_batch()`.", + init=None, + code=lambda: policy.learn_on_batch(dummy_batch), + # How many times to repeat the function call? + repeats=repeats or 100, + max_num_trials=max_num_trials, + ) + if test: + results_per_category["policy"].extend(test) + + # Test only the model. + if "model" in to_check: + policy = local_worker.policy_map[DEFAULT_POLICY_ID] + + # Get a fixed obs. + obs = tree.map_structure(lambda s: s[None], policy.observation_space.sample()) + + print("Looking for leaks in Model") + + # Call `compute_actions_from_input_dict()` n times. + test = _test_some_code_for_memory_leaks( + desc="Calling `[model]()`.", + init=None, + code=lambda: policy.model({SampleBatch.OBS: obs}), + # How many times to repeat the function call? + repeats=repeats or 400, + # How many times to re-try if we find a suspicious memory + # allocation? + max_num_trials=max_num_trials, + ) + if test: + results_per_category["model"].extend(test) + + # Test the RolloutWorker. + if "rollout_worker" in to_check: + print("Looking for leaks in local RolloutWorker") + + def code(): + local_worker.sample() + local_worker.get_metrics() + + # Call `compute_actions_from_input_dict()` n times. + test = _test_some_code_for_memory_leaks( + desc="Calling `sample()` and `get_metrics()`.", + init=None, + code=code, + # How many times to repeat the function call? + repeats=repeats or 200, + # How many times to re-try if we find a suspicious memory + # allocation? + max_num_trials=max_num_trials, + ) + if test: + results_per_category["rollout_worker"].extend(test) + + return results_per_category + + +def _test_some_code_for_memory_leaks( + desc: str, + init: Optional[Callable[[], None]], + code: Callable[[], None], + repeats: int, + max_num_trials: int = 1, +) -> List[Suspect]: + """Runs given code (and init code) n times and checks for memory leaks. + + Args: + desc: A descriptor of the test. + init: Optional code to be executed initially. + code: The actual code to be checked for producing memory leaks. + repeats: How many times to repeatedly execute `code`. + max_num_trials: The maximum number of trials to run. A new trial is only + run, if the previous one produced a memory leak. For all non-1st trials, + `repeats` calculates as: actual_repeats = `repeats` * (trial + 1), where + the first trial is 0. + + Returns: + A list of Suspect objects, describing possible memory leaks. If list + is empty, no leaks have been found. + """ + + def _i_print(i): + if (i + 1) % 10 == 0: + print(".", end="" if (i + 1) % 100 else f" {i + 1}\n", flush=True) + + # Do n trials to make sure a found leak is really one. + suspicious = set() + suspicious_stats = [] + for trial in range(max_num_trials): + # Store up to n frames of each call stack. + tracemalloc.start(20) + + table = defaultdict(list) + + # Repeat running code for n times. + # Increase repeat value with each trial to make sure stats are more + # solid each time (avoiding false positives). + actual_repeats = repeats * (trial + 1) + + print(f"{desc} {actual_repeats} times.") + + # Initialize if necessary. + if init is not None: + init() + # Run `code` n times, each time taking a memory snapshot. + for i in range(actual_repeats): + _i_print(i) + code() + _take_snapshot(table, suspicious) + print("\n") + + # Check, which traces have moved up in their memory consumption + # constantly over time. 
+ suspicious.clear() + suspicious_stats.clear() + # Suspicious memory allocation found? + suspects = _find_memory_leaks_in_table(table) + for suspect in sorted(suspects, key=lambda s: s.memory_increase, reverse=True): + # Only print out the biggest offender: + if len(suspicious) == 0: + _pprint_suspect(suspect) + print("-> added to retry list") + suspicious.add(suspect.traceback) + suspicious_stats.append(suspect) + + tracemalloc.stop() + + # Some suspicious memory allocations found. + if len(suspicious) > 0: + print(f"{len(suspicious)} suspects found. Top-ten:") + for i, s in enumerate(suspicious_stats): + if i > 10: + break + print( + f"{i}) line={s.traceback[-1]} mem-increase={s.memory_increase}B " + f"slope={s.slope}B/detection rval={s.rvalue}" + ) + # Nothing suspicious found -> Exit trial loop and return. + else: + print("No remaining suspects found -> returning") + break + + # Print out final top offender. + if len(suspicious_stats) > 0: + _pprint_suspect(suspicious_stats[0]) + + return suspicious_stats + + +def _take_snapshot(table, suspicious=None): + # Take a memory snapshot. + snapshot = tracemalloc.take_snapshot() + # Group all memory allocations by their stacktrace (going n frames + # deep as defined above in tracemalloc.start(n)). + # Then sort groups by size, then count, then trace. + top_stats = snapshot.statistics("traceback") + + # For the first m largest increases, keep only, if a) first trial or b) those + # that are already in the `suspicious` set. + for stat in top_stats[:100]: + if not suspicious or stat.traceback in suspicious: + table[stat.traceback].append(stat.size) + + +def _find_memory_leaks_in_table(table): + suspects = [] + + for traceback, hist in table.items(): + # Do a quick mem increase check. + memory_increase = hist[-1] - hist[0] + + # Only if memory increased, do we check further. + if memory_increase <= 0.0: + continue + + # Ignore this very module here (we are collecting lots of data + # so an increase is expected). + top_stack = str(traceback[-1]) + drive_separator = "\\\\" if os.name == "nt" else "/" + if any( + s in top_stack + for s in [ + "tracemalloc", + "pycharm", + "thirdparty_files/psutil", + re.sub("\\.", drive_separator, __name__) + ".py", + ] + ): + continue + + # Do a linear regression to get the slope and R-value. + line = scipy.stats.linregress(x=np.arange(len(hist)), y=np.array(hist)) + + # - If weak positive slope and some confidence and + # increase > n bytes -> error. + # - If stronger positive slope -> error. 
+ if memory_increase > 1000 and ( + (line.slope > 40.0 and line.rvalue > 0.85) + or (line.slope > 20.0 and line.rvalue > 0.9) + or (line.slope > 10.0 and line.rvalue > 0.95) + ): + suspects.append( + Suspect( + traceback=traceback, + memory_increase=memory_increase, + slope=line.slope, + rvalue=line.rvalue, + hist=hist, + ) + ) + + return suspects + + +def _pprint_suspect(suspect): + print( + "Most suspicious memory allocation in traceback " + "(only printing out this one, but all (less suspicious)" + " suspects will be investigated as well):" + ) + print("\n".join(suspect.traceback.format())) + print(f"Increase total={suspect.memory_increase}B") + print(f"Slope={suspect.slope} B/detection") + print(f"Rval={suspect.rvalue}") diff --git a/rllib/utils/debug.py b/rllib/utils/debug/summary.py similarity index 57% rename from rllib/utils/debug.py rename to rllib/utils/debug/summary.py index 00e3e3dd6..58370f2df 100644 --- a/rllib/utils/debug.py +++ b/rllib/utils/debug/summary.py @@ -1,11 +1,8 @@ import numpy as np -import os import pprint -import random -from typing import Any, Mapping, Optional +from typing import Any, Mapping from ray.rllib.policy.sample_batch import SampleBatch, MultiAgentBatch -from ray.rllib.utils.framework import try_import_tf, try_import_torch _printer = pprint.PrettyPrinter(indent=2, width=60) @@ -78,51 +75,3 @@ class _StringValue: def __repr__(self): return self.value - - -def update_global_seed_if_necessary( - framework: Optional[str] = None, seed: Optional[int] = None -) -> None: - """Seed global modules such as random, numpy, torch, or tf. - - This is useful for debugging and testing. - - Args: - framework: The framework specifier (may be None). - seed: An optional int seed. If None, will not do - anything. - """ - if seed is None: - return - - # Python random module. - random.seed(seed) - # Numpy. - np.random.seed(seed) - - # Torch. - if framework == "torch": - torch, _ = try_import_torch() - torch.manual_seed(seed) - # See https://github.com/pytorch/pytorch/issues/47672. - cuda_version = torch.version.cuda - if cuda_version is not None and float(torch.version.cuda) >= 10.2: - os.environ["CUBLAS_WORKSPACE_CONFIG"] = "4096:8" - else: - from distutils.version import LooseVersion - - if LooseVersion(torch.__version__) >= LooseVersion("1.8.0"): - # Not all Operations support this. - torch.use_deterministic_algorithms(True) - else: - torch.set_deterministic(True) - # This is only for Convolution no problem. - torch.backends.cudnn.deterministic = True - elif framework == "tf2" or framework == "tfe": - tf1, tf, _ = try_import_tf() - # Tf2.x. - if framework == "tf2": - tf.random.set_seed(seed) - # Tf-eager. 
- elif framework == "tfe": - tf1.set_random_seed(seed) diff --git a/rllib/utils/pre_checks/env.py b/rllib/utils/pre_checks/env.py index 93f07e9d1..18dad2882 100644 --- a/rllib/utils/pre_checks/env.py +++ b/rllib/utils/pre_checks/env.py @@ -1,13 +1,14 @@ """Common pre-checks for all RLlib experiments.""" import logging -import numpy as np -from typing import TYPE_CHECKING, Set - import gym +import numpy as np +import traceback +from typing import TYPE_CHECKING, Set from ray.actor import ActorHandle from ray.rllib.utils.spaces.space_utils import convert_element_to_space_type from ray.rllib.utils.typing import EnvType +from ray.util import log_once if TYPE_CHECKING: from ray.rllib.env import BaseEnv, MultiAgentEnv @@ -40,35 +41,48 @@ def check_env(env: EnvType) -> None: logger.warning("Skipping env checking for this experiment") return - if not isinstance( - env, - ( - BaseEnv, - gym.Env, - MultiAgentEnv, - RemoteBaseEnv, - VectorEnv, - ExternalMultiAgentEnv, - ExternalEnv, - ActorHandle, - ), - ): + try: + if not isinstance( + env, + ( + BaseEnv, + gym.Env, + MultiAgentEnv, + RemoteBaseEnv, + VectorEnv, + ExternalMultiAgentEnv, + ExternalEnv, + ActorHandle, + ), + ): + raise ValueError( + "Env must be one of the supported types: BaseEnv, gym.Env, " + "MultiAgentEnv, VectorEnv, RemoteBaseEnv, ExternalMultiAgentEnv, " + f"ExternalEnv, but instead was a {type(env)}" + ) + if isinstance(env, MultiAgentEnv): + check_multiagent_environments(env) + elif isinstance(env, gym.Env): + check_gym_environments(env) + elif isinstance(env, BaseEnv): + check_base_env(env) + else: + logger.warning( + "Env checking isn't implemented for VectorEnvs, RemoteBaseEnvs, " + "ExternalMultiAgentEnv, or ExternalEnvs or Environments that are " + "Ray actors" + ) + except Exception: + actual_error = traceback.format_exc() raise ValueError( - "Env must be one of the supported types: BaseEnv, gym.Env, " - "MultiAgentEnv, VectorEnv, RemoteBaseEnv, ExternalMultiAgentEnv, " - f"ExternalEnv, but instead was a {type(env)}" - ) - - if isinstance(env, MultiAgentEnv): - check_multiagent_environments(env) - elif isinstance(env, gym.Env): - check_gym_environments(env) - elif isinstance(env, BaseEnv): - check_base_env(env) - else: - logger.warning( - "Env checking isn't implemented for VectorEnvs, RemoteBaseEnvs, " - "ExternalMultiAgentEnv,or ExternalEnvs or Environments that are Ray actors" + f"{actual_error}\n" + "The above error has been found in your environment! " + "We've added a module for checking your custom environments. It " + "may cause your experiment to fail if your environment is not set up " + "correctly. You can disable this behavior by setting " + "`disable_env_checking=True` in your config " + "dictionary. You can run the environment checking module " + "standalone by calling ray.rllib.utils.check_env([env])." ) @@ -191,6 +205,23 @@ def check_multiagent_environments(env: "MultiAgentEnv") -> None: if not isinstance(env, MultiAgentEnv): raise ValueError("The passed env is not a MultiAgentEnv.") + elif not ( + hasattr(env, "observation_space") + and hasattr(env, "action_space") + and hasattr(env, "_agent_ids") + and hasattr(env, "_spaces_in_preferred_format") + ): + if log_once("ma_env_super_ctor_called"): + logger.warning( + f"Your MultiAgentEnv {env} does not have some or all of the needed " + "base-class attributes! Make sure you call `super().__init__` from " + "within your MultiAgentEnv's constructor. " + "This will raise an error in the future."
+            )
+        env.observation_space = (
+            env.action_space
+        ) = env._spaces_in_preferred_format = None
+        env._agent_ids = set()
 
     reset_obs = env.reset()
     sampled_obs = env.observation_space_sample()
@@ -331,12 +362,17 @@ def _check_reward(reward, base_env=False, agent_ids=None):
         for _, multi_agent_dict in reward.items():
             for agent_id, rew in multi_agent_dict.items():
                 if not (
-                    np.isreal(rew) and not isinstance(rew, bool) and np.isscalar(rew)
+                    np.isreal(rew)
+                    and not isinstance(rew, bool)
+                    and (
+                        np.isscalar(rew)
+                        or (isinstance(rew, np.ndarray) and rew.shape == ())
+                    )
                 ):
                     error = (
                         "Your step function must return rewards that are"
                         f" integer or float. reward: {rew}. Instead it was a "
-                        f"{type(reward)}"
+                        f"{type(rew)}"
                     )
                     raise ValueError(error)
                 if not (agent_id in agent_ids or agent_id == "__all__"):
@@ -347,7 +383,12 @@ def _check_reward(reward, base_env=False, agent_ids=None):
                     )
                     raise ValueError(error)
     elif not (
-        np.isreal(reward) and not isinstance(reward, bool) and np.isscalar(reward)
+        np.isreal(reward)
+        and not isinstance(reward, bool)
+        and (
+            np.isscalar(reward)
+            or (isinstance(reward, np.ndarray) and reward.shape == ())
+        )
     ):
         error = (
             "Your step function must return a reward that is integer or float. "
diff --git a/rllib/utils/test_utils.py b/rllib/utils/test_utils.py
index 6cea5aebd..00da8fea1 100644
--- a/rllib/utils/test_utils.py
+++ b/rllib/utils/test_utils.py
@@ -177,7 +177,7 @@ def check(x, y, decimals=5, atol=None, rtol=None, false=False):
         assert isinstance(y, dict), "ERROR: If x is dict, y needs to be a dict as well!"
         y_keys = set(x.keys())
         for key, value in x.items():
-            assert key in y, "ERROR: y does not have x's key='{}'! y={}".format(key, y)
+            assert key in y, f"ERROR: y does not have x's key='{key}'! y={y}"
             check(value, y[key], decimals=decimals, atol=atol, rtol=rtol, false=false)
             y_keys.remove(key)
         assert not y_keys, "ERROR: y contains keys ({}) that are not in x! y={}".format(
@@ -198,21 +198,21 @@ def check(x, y, decimals=5, atol=None, rtol=None, false=False):
     # Boolean comparison.
     elif isinstance(x, (np.bool_, bool)):
         if false is True:
-            assert bool(x) is not bool(y), "ERROR: x ({}) is y ({})!".format(x, y)
+            assert bool(x) is not bool(y), f"ERROR: x ({x}) is y ({y})!"
         else:
-            assert bool(x) is bool(y), "ERROR: x ({}) is not y ({})!".format(x, y)
+            assert bool(x) is bool(y), f"ERROR: x ({x}) is not y ({y})!"
     # Nones or primitives.
     elif x is None or y is None or isinstance(x, (str, int)):
         if false is True:
-            assert x != y, "ERROR: x ({}) is the same as y ({})!".format(x, y)
+            assert x != y, f"ERROR: x ({x}) is the same as y ({y})!"
         else:
-            assert x == y, "ERROR: x ({}) is not the same as y ({})!".format(x, y)
+            assert x == y, f"ERROR: x ({x}) is not the same as y ({y})!"
     # String/byte comparisons.
     elif hasattr(x, "dtype") and (x.dtype == object or str(x.dtype).startswith("
                 raise error (not expected to be equal).
                 if false is True:
-                    assert False, "ERROR: x ({}) is the same as y ({})!".format(x, y)
+                    assert False, f"ERROR: x ({x}) is the same as y ({y})!"
         # Using atol/rtol.
         else:
@@ -276,7 +276,7 @@
                     raise e
             else:
                 if false is True:
-                    assert False, "ERROR: x ({}) is the same as y ({})!".format(x, y)
+                    assert False, f"ERROR: x ({x}) is the same as y ({y})!"
 
 
 def check_compute_single_action(
diff --git a/rllib/utils/tests/run_memory_leak_tests.py b/rllib/utils/tests/run_memory_leak_tests.py
new file mode 100644
index 000000000..4800463dc
--- /dev/null
+++ b/rllib/utils/tests/run_memory_leak_tests.py
@@ -0,0 +1,129 @@
+#!/usr/bin/env python
+# Runs one or more memory leak tests.
+#
+# Example usage:
+# $ python run_memory_leak_tests.py --yaml-dir=tuned_examples/ppo/memory-leak-test-ppo.yaml
+#
+# When using in BAZEL (with py_test), e.g. see in ray/rllib/BUILD:
+# py_test(
+#     name = "memory_leak_ppo",
+#     main = "tests/test_memory_leak.py",
+#     tags = ["memory_leak_tests"],
+#     size = "medium", # 5min timeout
+#     srcs = ["tests/test_memory_leak.py"],
+#     data = glob(["tuned_examples/ppo/*.yaml"]),
+#     # Pass `BAZEL` option and the path to look for yaml files.
+#     args = ["BAZEL", "tuned_examples/ppo/memory-leak-test-ppo.yaml"]
+# )
+
+import argparse
+import os
+from pathlib import Path
+import sys
+import yaml
+
+import ray
+from ray.rllib.agents.registry import get_trainer_class
+from ray.rllib.utils.debug.memory import check_memory_leaks
+from ray.rllib import _register_all
+
+parser = argparse.ArgumentParser()
+parser.add_argument(
+    "--framework",
+    required=False,
+    choices=["jax", "tf2", "tf", "tfe", "torch", None],
+    default=None,
+    help="The deep learning framework to use.",
+)
+parser.add_argument(
+    "--yaml-dir",
+    required=True,
+    type=str,
+    help="The directory in which to find all yamls to test.",
+)
+parser.add_argument(
+    "--local-mode",
+    action="store_true",
+    help="Run ray in local mode for easier debugging.",
+)
+parser.add_argument(
+    "--to-check",
+    nargs="+",
+    default=["env", "policy", "rollout_worker"],
+    help="List of 'env', 'policy', 'rollout_worker', 'model'.",
+)
+
+
+if __name__ == "__main__":
+    args = parser.parse_args()
+
+    # Bazel regression test mode: Get path to look for yaml files.
+    # Get the path or single file to use.
+    rllib_dir = Path(__file__).parent.parent.parent
+    print("rllib dir={}".format(rllib_dir))
+
+    abs_yaml_path = os.path.join(rllib_dir, args.yaml_dir)
+    # Single file given.
+    if os.path.isfile(abs_yaml_path):
+        yaml_files = [abs_yaml_path]
+    # Given path/file does not exist.
+    elif not os.path.isdir(abs_yaml_path):
+        raise ValueError("yaml-dir ({}) not found!".format(args.yaml_dir))
+    # Path given -> Get all yaml files in there via rglob.
+    else:
+        yaml_files = rllib_dir.rglob(args.yaml_dir + "/*.yaml")
+        yaml_files = sorted(
+            map(lambda path: str(path.absolute()), yaml_files), reverse=True
+        )
+
+    print("Will run the following memory-leak tests:")
+    for yaml_file in yaml_files:
+        print("->", yaml_file)
+
+    # Loop through all collected files.
+    for yaml_file in yaml_files:
+        experiments = yaml.safe_load(open(yaml_file).read())
+        assert (
+            len(experiments) == 1
+        ), "Error, can only run a single experiment per yaml file!"
+
+        experiment = list(experiments.values())[0]
+
+        # Add framework option to exp configs.
+        if args.framework:
+            experiment["config"]["framework"] = args.framework
+        # Create env on local_worker for memory leak testing just the env.
+        experiment["config"]["create_env_on_driver"] = True
+        # Always run with eager-tracing when framework=tf2 if not in local-mode.
+        if args.framework in ["tf2", "tfe"] and not args.local_mode:
+            experiment["config"]["eager_tracing"] = True
+        # experiment["config"]["callbacks"] = MemoryTrackingCallbacks
+
+        # Move "env" specifier into config.
+        experiment["config"]["env"] = experiment["env"]
+        experiment.pop("env", None)
+
+        # Print out the actual config.
+ print("== Test config ==") + print(yaml.dump(experiment)) + + # Construct the trainer instance based on the given config. + leaking = True + try: + ray.init(num_cpus=5, local_mode=args.local_mode) + trainer = get_trainer_class(experiment["run"])(experiment["config"]) + results = check_memory_leaks( + trainer, + to_check=set(args.to_check), + ) + if not results: + leaking = False + finally: + ray.shutdown() + _register_all() + + if not leaking: + print("Memory leak test PASSED") + else: + print("Memory leak test FAILED. Exiting with Error.") + sys.exit(1)