[RLlib] Memory leak finding toolset using tracemalloc + CI memory leak tests. (#15412)

Sven Mika 2022-04-12 07:50:09 +02:00 committed by GitHub
parent d7ef546352
commit a8494742a3
35 changed files with 1084 additions and 205 deletions
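At the core of this change is the new `check_memory_leaks()` utility (added in rllib/utils/debug/memory.py below). It repeatedly exercises single components of a built Trainer under tracemalloc and reports allocation sites whose memory grows steadily. A minimal usage sketch, mirroring the generic test added in this commit (the CartPole-v0 env and PPO settings are only illustrative):

import ray
import ray.rllib.agents.ppo as ppo
from ray.rllib.utils.debug.memory import check_memory_leaks

ray.init()
config = ppo.DEFAULT_CONFIG.copy()
config["env"] = "CartPole-v0"  # illustrative env choice
# An env must exist on the local worker, otherwise `check_memory_leaks` complains.
config["create_env_on_driver"] = True
trainer = ppo.PPOTrainer(config=config)
# Returns a defaultdict(list): component name -> list of Suspect entries.
results = check_memory_leaks(trainer, to_check={"env", "policy"}, repeats=150)
assert not results, "Suspicious, steadily growing allocation sites found!"
trainer.stop()
ray.shutdown()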

View file

@ -5,7 +5,7 @@
- DATA_PROCESSING_TESTING=1 INSTALL_HOROVOD=1 ./ci/travis/install-dependencies.sh
- bazel test --config=ci $(./scripts/bazel_export_options) --build_tests_only --test_tag_filters=-gpu python/ray/ml/...
- label: ":brain: RLlib: Learning discr. actions TF2-static-graph (from rllib/tuned_examples/*.yaml)"
- label: ":brain: RLlib: Learning discr. actions TF2-static-graph"
conditions: ["RAY_CI_RLLIB_AFFECTED"]
commands:
- cleanup() { if [ "${BUILDKITE_PULL_REQUEST}" = "false" ]; then ./ci/travis/upload_build_info.sh; fi }; trap cleanup EXIT
@ -15,7 +15,7 @@
--test_tag_filters=learning_tests_discrete,-fake_gpus,-torch_only,-tf2_only,-no_tf_static_graph
--test_arg=--framework=tf
rllib/...
- label: ":brain: RLlib: Learning cont. actions TF2-static-graph (from rllib/tuned_examples/*.yaml)"
- label: ":brain: RLlib: Learning cont. actions TF2-static-graph"
conditions: ["RAY_CI_RLLIB_AFFECTED"]
commands:
- cleanup() { if [ "${BUILDKITE_PULL_REQUEST}" = "false" ]; then ./ci/travis/upload_build_info.sh; fi }; trap cleanup EXIT
@ -25,7 +25,7 @@
--test_tag_filters=learning_tests_continuous,-fake_gpus,-torch_only,-tf2_only,-no_tf_static_graph
--test_arg=--framework=tf
rllib/...
- label: ":brain: RLlib: Learning discr. actions TF2-eager-tracing (from rllib/tuned_examples/*.yaml)"
- label: ":brain: RLlib: Learning discr. actions TF2-eager-tracing"
conditions: ["RAY_CI_RLLIB_AFFECTED"]
commands:
- cleanup() { if [ "${BUILDKITE_PULL_REQUEST}" = "false" ]; then ./ci/travis/upload_build_info.sh; fi }; trap cleanup EXIT
@ -35,7 +35,7 @@
--test_tag_filters=learning_tests_discrete,-fake_gpus,-torch_only,-multi_gpu,-no_tf_eager_tracing
--test_arg=--framework=tf2
rllib/...
- label: ":brain: RLlib: Learning cont. actions TF2-eager-tracing (from rllib/tuned_examples/*.yaml)"
- label: ":brain: RLlib: Learning cont. actions TF2-eager-tracing"
conditions: ["RAY_CI_RLLIB_AFFECTED"]
commands:
- cleanup() { if [ "${BUILDKITE_PULL_REQUEST}" = "false" ]; then ./ci/travis/upload_build_info.sh; fi }; trap cleanup EXIT
@ -46,27 +46,7 @@
--test_arg=--framework=tf2
rllib/...
- label: ":brain: RLlib: Learning discr. actions TF1-static-graph (from rllib/tuned_examples/*.yaml)"
conditions: ["RAY_CI_RLLIB_DIRECTLY_AFFECTED"]
commands:
- cleanup() { if [ "${BUILDKITE_PULL_REQUEST}" = "false" ]; then ./ci/travis/upload_build_info.sh; fi }; trap cleanup EXIT
- RLLIB_TESTING=1 PYTHON=3.7 TF_VERSION=1.14.0 TFP_VERSION=0.7 ./ci/travis/install-dependencies.sh
- bazel test --config=ci $(./scripts/bazel_export_options)
--build_tests_only
--test_tag_filters=learning_tests_discrete,-fake_gpus,-torch_only,-tf2_only,-no_tf_static_graph,-multi_gpu
--test_arg=--framework=tf
rllib/...
- label: ":brain: RLlib: Learning cont. actions TF1-static-graph (from rllib/tuned_examples/*.yaml)"
conditions: ["RAY_CI_RLLIB_DIRECTLY_AFFECTED"]
commands:
- cleanup() { if [ "${BUILDKITE_PULL_REQUEST}" = "false" ]; then ./ci/travis/upload_build_info.sh; fi }; trap cleanup EXIT
- RLLIB_TESTING=1 PYTHON=3.7 TF_VERSION=1.14.0 TFP_VERSION=0.7 ./ci/travis/install-dependencies.sh
- bazel test --config=ci $(./scripts/bazel_export_options)
--build_tests_only
--test_tag_filters=learning_tests_continuous,-fake_gpus,-torch_only,-tf2_only,-no_tf_static_graph,-multi_gpu
--test_arg=--framework=tf
rllib/...
- label: ":brain: RLlib: Learning discr. actions PyTorch (from rllib/tuned_examples/*.yaml)"
- label: ":brain: RLlib: Learning discr. actions PyTorch"
conditions: ["RAY_CI_RLLIB_AFFECTED"]
commands:
- cleanup() { if [ "${BUILDKITE_PULL_REQUEST}" = "false" ]; then ./ci/travis/upload_build_info.sh; fi }; trap cleanup EXIT
@ -76,7 +56,7 @@
--test_tag_filters=learning_tests_discrete,-fake_gpus,-tf_only,-tf2_only,-multi_gpu
--test_arg=--framework=torch
rllib/...
- label: ":brain: RLlib: Learning cont. actions PyTorch (from rllib/tuned_examples/*.yaml)"
- label: ":brain: RLlib: Learning cont. actions PyTorch"
conditions: ["RAY_CI_RLLIB_AFFECTED"]
commands:
- cleanup() { if [ "${BUILDKITE_PULL_REQUEST}" = "false" ]; then ./ci/travis/upload_build_info.sh; fi }; trap cleanup EXIT
@ -86,7 +66,7 @@
--test_tag_filters=learning_tests_continuous,-fake_gpus,-tf_only,-tf2_only,-multi_gpu
--test_arg=--framework=torch
rllib/...
- label: ":brain: RLlib: Learning tests w/ 2 fake GPUs TF2-static-graph (from rllib/tuned_examples/*.yaml)"
- label: ":brain: RLlib: Learning tests w/ 2 fake GPUs TF2-static-graph"
conditions: ["RAY_CI_RLLIB_DIRECTLY_AFFECTED"]
commands:
- cleanup() { if [ "${BUILDKITE_PULL_REQUEST}" = "false" ]; then ./ci/travis/upload_build_info.sh; fi }; trap cleanup EXIT
@ -97,7 +77,7 @@
--test_arg=--framework=tf
rllib/...
# TODO: (sven) tf2 (eager) multi-GPU
- label: ":brain: RLlib: Learning tests w/ 2 fake GPUs PyTorch (from rllib/tuned_examples/*.yaml)"
- label: ":brain: RLlib: Learning tests w/ 2 fake GPUs PyTorch"
conditions: ["RAY_CI_RLLIB_DIRECTLY_AFFECTED"]
commands:
- cleanup() { if [ "${BUILDKITE_PULL_REQUEST}" = "false" ]; then ./ci/travis/upload_build_info.sh; fi }; trap cleanup EXIT
@ -108,6 +88,28 @@
--test_arg=--framework=torch
rllib/...
- label: ":brain: RLlib: Memory leak tests TF2-eager-tracing"
conditions: ["RAY_CI_RLLIB_AFFECTED"]
commands:
- cleanup() { if [ "${BUILDKITE_PULL_REQUEST}" = "false" ]; then ./ci/travis/upload_build_info.sh; fi }; trap cleanup EXIT
- RLLIB_TESTING=1 PYTHON=3.7 ./ci/travis/install-dependencies.sh
- bazel test --config=ci $(./scripts/bazel_export_options)
--build_tests_only
--test_tag_filters=memory_leak_tests,-flaky
--test_arg=--framework=tf2
rllib/...
- label: ":brain: RLlib: Memory leak tests PyTorch"
conditions: ["RAY_CI_RLLIB_AFFECTED"]
commands:
- cleanup() { if [ "${BUILDKITE_PULL_REQUEST}" = "false" ]; then ./ci/travis/upload_build_info.sh; fi }; trap cleanup EXIT
- RLLIB_TESTING=1 PYTHON=3.7 ./ci/travis/install-dependencies.sh
- bazel test --config=ci $(./scripts/bazel_export_options)
--build_tests_only
--test_tag_filters=memory_leak_tests,-flaky
--test_arg=--framework=torch
rllib/...
- label: ":brain: RLlib: Quick Agent train.py runs (TODO: obsolete)"
conditions: ["RAY_CI_RLLIB_DIRECTLY_AFFECTED"]
commands:
@ -152,7 +154,7 @@
# "learning_tests|quick_train|examples|tests_dir".
- bazel test --config=ci $(./scripts/bazel_export_options)
--build_tests_only
--test_tag_filters=-learning_tests,-quick_train,-examples,-tests_dir,-trainers_dir,-documentation,-multi_gpu
--test_tag_filters=-learning_tests,-quick_train,-memory_leak_tests,-examples,-tests_dir,-trainers_dir,-documentation,-multi_gpu
--test_env=RAY_USE_MULTIPROCESSING_CPU_COUNT=1
rllib/...
@ -164,14 +166,14 @@
- bazel test --config=ci $(./scripts/bazel_export_options) --build_tests_only
--test_tag_filters=examples_A,examples_B,-multi_gpu --test_env=RAY_USE_MULTIPROCESSING_CPU_COUNT=1 rllib/...
- label: ":brain: RLlib: Examples {Ca..t}"
- label: ":brain: RLlib: Examples {Ca..Ct}"
conditions: ["RAY_CI_RLLIB_AFFECTED"]
commands:
- cleanup() { if [ "${BUILDKITE_PULL_REQUEST}" = "false" ]; then ./ci/travis/upload_build_info.sh; fi }; trap cleanup EXIT
- RLLIB_TESTING=1 PYTHON=3.7 ./ci/travis/install-dependencies.sh
- bazel test --config=ci $(./scripts/bazel_export_options) --build_tests_only
--test_tag_filters=examples_C_AtoT,-multi_gpu --test_env=RAY_USE_MULTIPROCESSING_CPU_COUNT=1 rllib/...
- label: ":brain: RLlib: Examples {Cu..z}"
- label: ":brain: RLlib: Examples {Cu..Cz}"
conditions: ["RAY_CI_RLLIB_AFFECTED"]
commands:
- cleanup() { if [ "${BUILDKITE_PULL_REQUEST}" = "false" ]; then ./ci/travis/upload_build_info.sh; fi }; trap cleanup EXIT

View file

@ -35,6 +35,8 @@
# - Examples directory (everything in rllib/examples/...), tagged: "examples" and
# "examples_[A-Z]"
# - Memory leak tests tagged "memory_leak_tests".
# Note: The "examples" and "tests_dir" tags have further sub-tags going by the
# starting letter of the test name (e.g. "examples_A", or "tests_dir_F") for
# split-up purposes in buildkite.
@ -582,6 +584,14 @@ py_test(
srcs = ["agents/tests/test_callbacks.py"]
)
py_test(
name = "test_memory_leaks_generic",
main = "agents/tests/test_memory_leaks.py",
tags = ["team:ml", "trainers_dir"],
size = "large",
srcs = ["agents/tests/test_memory_leaks.py"]
)
py_test(
name = "test_trainer",
tags = ["team:ml", "trainers_dir", "trainers_dir_generic"],
@ -852,6 +862,82 @@ py_test(
)
# --------------------------------------------------------------------
# Memory leak tests
#
# Tag: memory_leak_tests
# --------------------------------------------------------------------
py_test(
name = "test_memory_leak_a3c",
tags = ["team:ml", "memory_leak_tests"],
main = "utils/tests/run_memory_leak_tests.py",
size = "large",
srcs = ["utils/tests/run_memory_leak_tests.py"],
data = ["tuned_examples/a3c/memory-leak-test-a3c.yaml"],
args = ["--yaml-dir=tuned_examples/a3c"]
)
py_test(
name = "test_memory_leak_appo",
tags = ["team:ml", "memory_leak_tests"],
main = "utils/tests/run_memory_leak_tests.py",
size = "large",
srcs = ["utils/tests/run_memory_leak_tests.py"],
data = ["tuned_examples/ppo/memory-leak-test-appo.yaml"],
args = ["--yaml-dir=tuned_examples/ppo"]
)
py_test(
name = "test_memory_leak_ddpg",
tags = ["team:ml", "memory_leak_tests"],
main = "utils/tests/run_memory_leak_tests.py",
size = "large",
srcs = ["utils/tests/run_memory_leak_tests.py"],
data = ["tuned_examples/ddpg/memory-leak-test-ddpg.yaml"],
args = ["--yaml-dir=tuned_examples/ddpg"]
)
py_test(
name = "test_memory_leak_dqn",
tags = ["team:ml", "memory_leak_tests"],
main = "utils/tests/run_memory_leak_tests.py",
size = "large",
srcs = ["utils/tests/run_memory_leak_tests.py"],
data = ["tuned_examples/dqn/memory-leak-test-dqn.yaml"],
args = ["--yaml-dir=tuned_examples/dqn"]
)
py_test(
name = "test_memory_leak_impala",
tags = ["team:ml", "memory_leak_tests"],
main = "utils/tests/run_memory_leak_tests.py",
size = "large",
srcs = ["utils/tests/run_memory_leak_tests.py"],
data = ["tuned_examples/impala/memory-leak-test-impala.yaml"],
args = ["--yaml-dir=tuned_examples/impala"]
)
py_test(
name = "test_memory_leak_ppo",
tags = ["team:ml", "memory_leak_tests"],
main = "utils/tests/run_memory_leak_tests.py",
size = "large",
srcs = ["utils/tests/run_memory_leak_tests.py"],
data = ["tuned_examples/ppo/memory-leak-test-ppo.yaml"],
args = ["--yaml-dir=tuned_examples/ppo"]
)
py_test(
name = "test_memory_leak_sac",
tags = ["team:ml", "memory_leak_tests"],
main = "utils/tests/run_memory_leak_tests.py",
size = "large",
srcs = ["utils/tests/run_memory_leak_tests.py"],
data = ["tuned_examples/sac/memory-leak-test-sac.yaml"],
args = ["--yaml-dir=tuned_examples/sac"]
)
# --------------------------------------------------------------------
# Agents (quick training test iterations via `rllib train`)
#

View file

@ -92,16 +92,16 @@ class TestDDPG(unittest.TestCase):
trainer = ddpg.DDPGTrainer(config=config, env="Pendulum-v1")
# Setting explore=False should always return the same action.
a_ = trainer.compute_single_action(obs, explore=False)
self.assertEqual(trainer.get_policy().global_timestep, 1)
check(trainer.get_policy().global_timestep, 1)
for i in range(50):
a = trainer.compute_single_action(obs, explore=False)
self.assertEqual(trainer.get_policy().global_timestep, i + 2)
check(trainer.get_policy().global_timestep, i + 2)
check(a, a_)
# explore=None (default: explore) should return different actions.
actions = []
for i in range(50):
actions.append(trainer.compute_single_action(obs))
self.assertEqual(trainer.get_policy().global_timestep, i + 52)
check(trainer.get_policy().global_timestep, i + 52)
check(np.std(actions), 0.0, false=True)
trainer.stop()
@ -117,25 +117,25 @@ class TestDDPG(unittest.TestCase):
trainer = ddpg.DDPGTrainer(config=config, env="Pendulum-v1")
# ts=0 (get a deterministic action as per explore=False).
deterministic_action = trainer.compute_single_action(obs, explore=False)
self.assertEqual(trainer.get_policy().global_timestep, 1)
check(trainer.get_policy().global_timestep, 1)
# ts=1-49 (in random window).
random_a = []
for i in range(1, 50):
random_a.append(trainer.compute_single_action(obs, explore=True))
self.assertEqual(trainer.get_policy().global_timestep, i + 1)
check(trainer.get_policy().global_timestep, i + 1)
check(random_a[-1], deterministic_action, false=True)
self.assertTrue(np.std(random_a) > 0.5)
# ts > 50 (a=deterministic_action + scale * N[0,1])
for i in range(50):
a = trainer.compute_single_action(obs, explore=True)
self.assertEqual(trainer.get_policy().global_timestep, i + 51)
check(trainer.get_policy().global_timestep, i + 51)
check(a, deterministic_action, rtol=0.1)
# ts >> 50 (BUT: explore=False -> expect deterministic action).
for i in range(50):
a = trainer.compute_single_action(obs, explore=False)
self.assertEqual(trainer.get_policy().global_timestep, i + 101)
check(trainer.get_policy().global_timestep, i + 101)
check(a, deterministic_action)
trainer.stop()

View file

@ -52,16 +52,16 @@ class TestTD3(unittest.TestCase):
trainer = td3.TD3Trainer(config=lcl_config, env="Pendulum-v1")
# Setting explore=False should always return the same action.
a_ = trainer.compute_single_action(obs, explore=False)
self.assertEqual(trainer.get_policy().global_timestep, 1)
check(trainer.get_policy().global_timestep, 1)
for i in range(50):
a = trainer.compute_single_action(obs, explore=False)
self.assertEqual(trainer.get_policy().global_timestep, i + 2)
check(trainer.get_policy().global_timestep, i + 2)
check(a, a_)
# explore=None (default: explore) should return different actions.
actions = []
for i in range(50):
actions.append(trainer.compute_single_action(obs))
self.assertEqual(trainer.get_policy().global_timestep, i + 52)
check(trainer.get_policy().global_timestep, i + 52)
check(np.std(actions), 0.0, false=True)
trainer.stop()
@ -77,25 +77,25 @@ class TestTD3(unittest.TestCase):
trainer = td3.TD3Trainer(config=lcl_config, env="Pendulum-v1")
# ts=0 (get a deterministic action as per explore=False).
deterministic_action = trainer.compute_single_action(obs, explore=False)
self.assertEqual(trainer.get_policy().global_timestep, 1)
check(trainer.get_policy().global_timestep, 1)
# ts=1-29 (in random window).
random_a = []
for i in range(1, 30):
random_a.append(trainer.compute_single_action(obs, explore=True))
self.assertEqual(trainer.get_policy().global_timestep, i + 1)
check(trainer.get_policy().global_timestep, i + 1)
check(random_a[-1], deterministic_action, false=True)
self.assertTrue(np.std(random_a) > 0.3)
# ts > 30 (a=deterministic_action + scale * N[0,1])
for i in range(50):
a = trainer.compute_single_action(obs, explore=True)
self.assertEqual(trainer.get_policy().global_timestep, i + 31)
check(trainer.get_policy().global_timestep, i + 31)
check(a, deterministic_action, rtol=0.1)
# ts >> 30 (BUT: explore=False -> expect deterministic action).
for i in range(50):
a = trainer.compute_single_action(obs, explore=False)
self.assertEqual(trainer.get_policy().global_timestep, i + 81)
check(trainer.get_policy().global_timestep, i + 81)
check(a, deterministic_action)
trainer.stop()

View file

@ -360,15 +360,6 @@ def from_importance_weights(
rhos = tf.math.exp(log_rhos)
if clip_rho_threshold is not None:
clipped_rhos = tf.minimum(clip_rho_threshold, rhos, name="clipped_rhos")
tf1.summary.histogram("clipped_rhos_1000", tf.minimum(1000.0, rhos))
tf1.summary.scalar(
"num_of_clipped_rhos",
tf.reduce_sum(
tf.cast(tf.equal(clipped_rhos, clip_rho_threshold), tf.int32)
),
)
tf1.summary.scalar("size_of_clipped_rhos", tf.size(clipped_rhos))
else:
clipped_rhos = rhos

View file

@ -0,0 +1,58 @@
import unittest
import ray
import ray.rllib.agents.dqn as dqn
import ray.rllib.agents.ppo as ppo
from ray.rllib.examples.env.memory_leaking_env import MemoryLeakingEnv
from ray.rllib.examples.policy.memory_leaking_policy import MemoryLeakingPolicy
from ray.rllib.policy.policy import PolicySpec
from ray.rllib.utils.debug.memory import check_memory_leaks
class TestMemoryLeaks(unittest.TestCase):
"""Generically tests our memory leak diagnostics tools."""
@classmethod
def setUpClass(cls):
ray.init()
@classmethod
def tearDownClass(cls):
ray.shutdown()
def test_leaky_env(self):
"""Tests, whether our diagnostics tools can detect leaks in an env."""
config = ppo.DEFAULT_CONFIG.copy()
# Make sure we have an env to test on the local worker.
# Otherwise, `check_memory_leaks` will complain.
config["create_env_on_driver"] = True
config["env"] = MemoryLeakingEnv
config["env_config"] = {
"static_samples": True,
}
trainer = ppo.PPOTrainer(config=config)
results = check_memory_leaks(trainer, to_check={"env"}, repeats=150)
assert results["env"]
trainer.stop()
def test_leaky_policy(self):
"""Tests, whether our diagnostics tools can detect leaks in a policy."""
config = dqn.DEFAULT_CONFIG.copy()
# Make sure we have an env to test on the local worker.
# Otherwise, `check_memory_leaks` will complain.
config["create_env_on_driver"] = True
config["env"] = "CartPole-v0"
config["multiagent"]["policies"] = {
"default_policy": PolicySpec(policy_class=MemoryLeakingPolicy),
}
trainer = dqn.DQNTrainer(config=config)
results = check_memory_leaks(trainer, to_check={"policy"}, repeats=300)
assert results["policy"]
trainer.stop()
if __name__ == "__main__":
import pytest
import sys
sys.exit(pytest.main(["-v", __file__]))

View file

@ -41,7 +41,7 @@ class MultiAgentEnv(gym.Env):
if not hasattr(self, "_agent_ids"):
self._agent_ids = set()
# do the action and observation spaces map from agent ids to spaces
# Do the action and observation spaces map from agent ids to spaces
# for the individual agents?
if not hasattr(self, "_spaces_in_preferred_format"):
self._spaces_in_preferred_format = None
@ -189,7 +189,6 @@ class MultiAgentEnv(gym.Env):
if agent_id != "__all__"
}
logger.warning("action_space_sample() has not been implemented")
del agent_ids
return {}
@ExperimentalAPI
@ -221,7 +220,6 @@ class MultiAgentEnv(gym.Env):
samples = {agent_id: samples[agent_id] for agent_id in agent_ids}
return samples
logger.warning("observation_space_sample() has not been implemented")
del agent_ids
return {}
@PublicAPI

View file

@ -237,11 +237,6 @@ class _VectorizedGymEnv(VectorEnv):
obs_batch, rew_batch, done_batch, info_batch = [], [], [], []
for i in range(self.num_envs):
obs, r, done, info = self.envs[i].step(actions[i])
if not np.isscalar(r) or not np.isreal(r) or not np.isfinite(r):
raise ValueError(
"Reward should be finite scalar, got {} ({}). "
"Actions={}.".format(r, type(r), actions[i])
)
if not isinstance(info, dict):
raise ValueError(
"Info should be a dict, got {} ({})".format(info, type(info))

View file

@ -42,9 +42,9 @@ from ray.rllib.policy.policy import Policy, PolicySpec
from ray.rllib.policy.policy_map import PolicyMap
from ray.rllib.policy.torch_policy import TorchPolicy
from ray.rllib.utils import force_list, merge_dicts, check_env
from ray.rllib.utils.annotations import Deprecated, DeveloperAPI, ExperimentalAPI
from ray.rllib.utils.annotations import DeveloperAPI, ExperimentalAPI
from ray.rllib.utils.debug import summarize, update_global_seed_if_necessary
from ray.rllib.utils.deprecation import deprecation_warning
from ray.rllib.utils.deprecation import Deprecated, deprecation_warning
from ray.rllib.utils.error import ERR_MSG_NO_GPUS, HOWTO_CHANGE_CONFIG
from ray.rllib.utils.filter import get_filter, Filter
from ray.rllib.utils.framework import try_import_tf, try_import_torch
@ -508,15 +508,6 @@ class RolloutWorker(ParallelIteratorWorker):
if self.env is not None:
# Validate environment (general validation function).
if not self._disable_env_checking:
logger.warning(
"We've added a module for checking environments that "
"are used in experiments. It will cause your "
"environment to fail if your environment is not set up"
"correctly. You can disable check env by setting "
"`disable_env_checking` to True in your experiment config "
"dictionary. You can run the environment checking module "
"standalone by calling ray.rllib.utils.check_env(env)."
)
check_env(self.env)
# Custom validation function given, typically a function attribute of the
# algorithm trainer.

View file

@ -167,7 +167,11 @@ class TestRolloutWorker(unittest.TestCase):
STEPS_SAMPLED_COUNTER, result["info"][STEPS_SAMPLED_COUNTER]
)
)
global_timesteps = policy.global_timestep
global_timesteps = (
policy.global_timestep
if fw == "tf"
else policy.global_timestep.numpy()
)
print("global_timesteps={}".format(global_timesteps))
expected_lr = 0.1 - ((0.1 - 0.000001) / 100000) * global_timesteps
lr = policy.cur_lr

View file

@ -109,8 +109,7 @@ if __name__ == "__main__":
"num_gpus": int(os.environ.get("RLLIB_NUM_GPUS", "0")),
"num_workers": 0,
"model": {"custom_model": "eager_model"},
# Alternatively, use "tf2" here for enforcing TF version 2.x.
"framework": "tfe",
"framework": "tf2",
}
stop = {
"timesteps_total": args.stop_timesteps,

View file

@ -0,0 +1,35 @@
import logging
import uuid
from ray.rllib.examples.env.random_env import RandomEnv
from ray.rllib.utils.annotations import override
logger = logging.getLogger(__name__)
class MemoryLeakingEnv(RandomEnv):
"""An env that leaks very little memory.
Useful for proving that our memory-leak tests can catch the
slightest leaks.
"""
def __init__(self, config=None):
super().__init__(config)
self._leak = {}
self._steps_after_reset = 0
@override(RandomEnv)
def reset(self):
self._steps_after_reset = 0
return super().reset()
@override(RandomEnv)
def step(self, action):
self._steps_after_reset += 1
# Only leak once an episode.
if self._steps_after_reset == 2:
self._leak[uuid.uuid4().hex.upper()] = 1
return super().step(action)

View file

@ -149,7 +149,7 @@ class FlexAgentsMultiAgent(MultiAgentEnv):
self.dones.add(i)
# Sometimes, add a new agent to the episode.
if random.random() > 0.75:
if random.random() > 0.75 and len(action_dict) > 0:
i = self.spawn()
obs[i], rew[i], done[i], info[i] = self.agents[i].step(action)
if done[i]:

View file

@ -1,3 +1,4 @@
import copy
import gym
from gym.spaces import Discrete, Tuple
import numpy as np
@ -26,6 +27,11 @@ class RandomEnv(gym.Env):
"reward_space",
gym.spaces.Box(low=-1.0, high=1.0, shape=(), dtype=np.float32),
)
self.static_samples = config.get("static_samples", False)
if self.static_samples:
self.observation_sample = self.observation_space.sample()
self.reward_sample = self.reward_space.sample()
# Chance that an episode ends at any step.
# Note that a max episode length can be specified via
# `max_episode_len`.
@ -42,7 +48,10 @@ class RandomEnv(gym.Env):
def reset(self):
self.steps = 0
return self.observation_space.sample()
if not self.static_samples:
return self.observation_space.sample()
else:
return copy.deepcopy(self.observation_sample)
def step(self, action):
if self.check_action_bounds and not self.action_space.contains(action):
@ -67,12 +76,20 @@ class RandomEnv(gym.Env):
np.random.choice([True, False], p=[self.p_done, 1.0 - self.p_done])
)
return (
self.observation_space.sample(),
float(self.reward_space.sample()),
done,
{},
)
if not self.static_samples:
return (
self.observation_space.sample(),
self.reward_space.sample(),
done,
{},
)
else:
return (
copy.deepcopy(self.observation_sample),
copy.deepcopy(self.reward_sample),
done,
{},
)
# Multi-agent version of the RandomEnv.
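The new `static_samples` switch exists so that the memory-leak yamls further below can run RandomEnv-based environments without fresh `space.sample()` calls on every step (the yamls explicitly call out np.random as leak-prone). A small, purely illustrative sketch of the flag:

from ray.rllib.examples.env.random_env import RandomEnv

# With static_samples=True, reset() and step() hand out deep copies of one
# pre-drawn observation/reward instead of new space.sample() results.
env = RandomEnv(config={"static_samples": True})
obs_a = env.reset()
obs_b, reward, done, info = env.step(env.action_space.sample())
# obs_a and obs_b are copies of the same pre-sampled observation.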

View file

@ -0,0 +1,38 @@
from ray.rllib.examples.policy.random_policy import RandomPolicy
from ray.rllib.utils.annotations import override
class MemoryLeakingPolicy(RandomPolicy):
"""A Policy that leaks very little memory.
Useful for proving that our memory-leak tests can catch the
slightest leaks.
"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._leak = []
@override(RandomPolicy)
def compute_actions(self, *args, **kwargs):
# Leak.
self._leak.append(1.5)
return super().compute_actions(*args, **kwargs)
@override(RandomPolicy)
def compute_actions_from_input_dict(self, *args, **kwargs):
# Leak.
self._leak.append(1)
return super().compute_actions_from_input_dict(*args, **kwargs)
@override(RandomPolicy)
def learn_on_batch(self, samples):
# Leak.
self._leak.append(False)
return super().learn_on_batch(samples)
@override(RandomPolicy)
def compute_log_likelihoods(self, *args, **kwargs):
# Leak.
self._leak.append("test")
return super().compute_log_likelihoods(*args, **kwargs)
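Like MemoryLeakingEnv, MemoryLeakingPolicy is a known-positive fixture: wired into a trainer, it should always trip the detector, which is how `test_memory_leaks_generic` validates the tooling. A sketch of that wiring, following the generic test earlier in this commit (CartPole-v0/DQN as chosen there):

import ray
import ray.rllib.agents.dqn as dqn
from ray.rllib.examples.policy.memory_leaking_policy import MemoryLeakingPolicy
from ray.rllib.policy.policy import PolicySpec
from ray.rllib.utils.debug.memory import check_memory_leaks

ray.init()
config = dqn.DEFAULT_CONFIG.copy()
config["env"] = "CartPole-v0"
config["create_env_on_driver"] = True
# Swap the default policy class for the deliberately leaking one.
config["multiagent"]["policies"] = {
    "default_policy": PolicySpec(policy_class=MemoryLeakingPolicy),
}
trainer = dqn.DQNTrainer(config=config)
# The "policy" check is expected to surface at least one Suspect here.
assert check_memory_leaks(trainer, to_check={"policy"}, repeats=300)["policy"]
trainer.stop()
ray.shutdown()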

View file

@ -1,9 +1,10 @@
import random
import numpy as np
from gym.spaces import Box
import numpy as np
import random
import tree # pip install dm_tree
from ray.rllib.policy.policy import Policy
from ray.rllib.policy.sample_batch import SampleBatch
from ray.rllib.utils.annotations import override
from ray.rllib.utils.typing import ModelWeights
@ -67,3 +68,13 @@ class RandomPolicy(Policy):
def set_weights(self, weights: ModelWeights) -> None:
"""No weights to set."""
pass
@override(Policy)
def _get_dummy_batch_from_view_requirements(self, batch_size: int = 1):
return SampleBatch(
{
SampleBatch.OBS: tree.map_structure(
lambda s: s[None], self.observation_space.sample()
),
}
)

View file

@ -34,18 +34,18 @@ logger = logging.getLogger(__name__)
def _convert_to_tf(x, dtype=None):
if isinstance(x, SampleBatch):
dict_ = {k: v for k, v in x.items() if k != SampleBatch.INFOS}
return tf.nest.map_structure(_convert_to_tf, dict_)
return tree.map_structure(_convert_to_tf, dict_)
elif isinstance(x, Policy):
return x
# Special handling of "Repeated" values.
elif isinstance(x, RepeatedValues):
return RepeatedValues(
tf.nest.map_structure(_convert_to_tf, x.values), x.lengths, x.max_len
tree.map_structure(_convert_to_tf, x.values), x.lengths, x.max_len
)
if x is not None:
d = dtype
return tf.nest.map_structure(
return tree.map_structure(
lambda f: _convert_to_tf(f, d)
if isinstance(f, RepeatedValues)
else tf.convert_to_tensor(f, d)
@ -311,6 +311,12 @@ def build_eager_tf_policy(
self.framework = config.get("framework", "tfe")
Policy.__init__(self, observation_space, action_space, config)
# Global timestep should be a tensor.
self.global_timestep = tf.Variable(0, trainable=False, dtype=tf.int64)
self.explore = tf.Variable(
self.config["explore"], trainable=False, dtype=tf.bool
)
# Log device and worker index.
from ray.rllib.evaluation.rollout_worker import get_global_worker
@ -438,7 +444,7 @@ def build_eager_tf_policy(
after_init(self, observation_space, action_space, config)
# Got to reset global_timestep again after fake run-throughs.
self.global_timestep = 0
self.global_timestep.assign(0)
@override(Policy)
def compute_actions_from_input_dict(
@ -455,7 +461,7 @@ def build_eager_tf_policy(
self._is_training = False
explore = explore if explore is not None else self.config["explore"]
explore = explore if explore is not None else self.explore
timestep = timestep if timestep is not None else self.global_timestep
if isinstance(timestep, tf.Tensor):
timestep = int(timestep.numpy())
@ -485,7 +491,7 @@ def build_eager_tf_policy(
timestep,
)
# Update our global timestep by the batch size.
self.global_timestep += int(tree.flatten(ret[0])[0].shape[0])
self.global_timestep.assign_add(tree.flatten(ret[0])[0].shape.as_list()[0])
return convert_to_numpy(ret)
@override(Policy)
@ -501,7 +507,6 @@ def build_eager_tf_policy(
timestep=None,
**kwargs,
):
# Create input dict to simply pass the entire call to
# self.compute_actions_from_input_dict().
input_dict = SampleBatch(
@ -693,6 +698,7 @@ def build_eager_tf_policy(
@override(Policy)
def get_state(self):
state = super().get_state()
state["global_timestep"] = state["global_timestep"].numpy()
if self._optimizer and len(self._optimizer.variables()) > 0:
state["_optimizer_variables"] = self._optimizer.variables()
# Add exploration state.
@ -701,7 +707,6 @@ def build_eager_tf_policy(
@override(Policy)
def set_state(self, state):
state = state.copy() # shallow copy
# Set optimizer vars first.
optimizer_vars = state.get("_optimizer_variables", None)
if optimizer_vars and self._optimizer.variables():
@ -716,8 +721,9 @@ def build_eager_tf_policy(
# Set exploration's state.
if hasattr(self, "exploration") and "_exploration_state" in state:
self.exploration.set_state(state=state["_exploration_state"])
# Then the Policy's (NN) weights.
super().set_state(state)
# Weights and global_timestep (tf vars).
self.set_weights(state["weights"])
self.global_timestep.assign(state["global_timestep"])
@override(Policy)
def export_checkpoint(self, export_dir):

View file

@ -737,7 +737,11 @@ class Policy(metaclass=ABCMeta):
"""
# Store the current global time step (sum over all policies' sample
# steps).
self.global_timestep = global_vars["timestep"]
# Make sure we keep global_timestep as a Tensor.
if self.framework in ["tf2", "tfe"]:
self.global_timestep.assign(global_vars["timestep"])
else:
self.global_timestep = global_vars["timestep"]
@DeveloperAPI
def export_checkpoint(self, export_dir: str) -> None:

View file

@ -138,13 +138,12 @@ class SampleBatch(dict):
# value that is actually a ndarray/tensor. This would fail if
# all values are nested dicts/tuples of more complex underlying
# structures.
len_ = (
len(v)
if isinstance(v, (list, np.ndarray)) or (torch and torch.is_tensor(v))
else None
)
if len_:
lengths.append(len_)
try:
len_ = len(v) if not isinstance(v, (dict, tuple)) else None
if len_:
lengths.append(len_)
except Exception:
pass
if (
self.get(SampleBatch.SEQ_LENS) is not None

View file

@ -1187,11 +1187,12 @@ class TFPolicy(Policy):
# Build the feed dict from the batch.
feed_dict = {}
for key, placeholders in self._loss_input_dict.items():
tree.map_structure(
a = tree.map_structure(
lambda ph, v: feed_dict.__setitem__(ph, v),
placeholders,
train_batch[key],
)
del a
state_keys = ["state_in_{}".format(i) for i in range(len(self._state_inputs))]
for key in state_keys:

View file

@ -4,7 +4,7 @@ import unittest
import ray
import ray.rllib.agents.pg as pg
from ray.rllib.examples.env.random_env import RandomEnv
from ray.rllib.utils.test_utils import framework_iterator
from ray.rllib.utils.test_utils import check, framework_iterator
class TestTimeSteps(unittest.TestCase):
@ -32,19 +32,19 @@ class TestTimeSteps(unittest.TestCase):
for i in range(1, 21):
trainer.compute_single_action(obs)
self.assertEqual(policy.global_timestep, i)
check(policy.global_timestep, i)
for i in range(1, 21):
policy.compute_actions(obs_batch)
self.assertEqual(policy.global_timestep, i + 20)
check(policy.global_timestep, i + 20)
# Artificially set ts to 100Bio, then keep computing actions and
# train.
crazy_timesteps = int(1e11)
policy.global_timestep = crazy_timesteps
policy.on_global_var_update({"timestep": crazy_timesteps})
# Run for 10 more ts.
for i in range(1, 11):
policy.compute_actions(obs_batch)
self.assertEqual(policy.global_timestep, i + crazy_timesteps)
check(policy.global_timestep, i + crazy_timesteps)
trainer.train()

View file

@ -0,0 +1,15 @@
memory-leak-test-a3c:
stop:
timesteps_total: 150000
env:
ray.rllib.examples.env.random_env.RandomLargeObsSpaceEnv
run: A3C
config:
# Works for both torch and tf.
framework: tf
# Switch off np.random, which is known to have memory leaks.
env_config:
config:
static_samples: true
num_workers: 4
num_envs_per_worker: 5

View file

@ -0,0 +1,12 @@
memory-leak-test-ddpg:
env:
ray.rllib.examples.env.random_env.RandomLargeObsSpaceEnvContActions
run: DDPG
config:
# Works for both torch and tf.
framework: tf
# Switch off np.random, which is known to have memory leaks.
env_config:
config:
static_samples: true
buffer_size: 500 # use small buffer to catch memory leaks

View file

@ -0,0 +1,12 @@
memory-leak-test-dqn:
env:
ray.rllib.examples.env.random_env.RandomLargeObsSpaceEnv
run: DQN
config:
# Works for both torch and tf.
framework: tf
# Switch off np.random, which is known to have memory leaks.
env_config:
config:
static_samples: true
buffer_size: 500 # use small buffer to catch memory leaks

View file

@ -0,0 +1,17 @@
memory-leak-test-impala:
stop:
timesteps_total: 150000
env:
ray.rllib.examples.env.random_env.RandomLargeObsSpaceEnv
run: IMPALA
config:
# Works for both torch and tf.
framework: tf
# Switch off np.random, which is known to have memory leaks.
env_config:
config:
static_samples: true
num_gpus: 0
num_workers: 3
num_aggregation_workers: 1
num_envs_per_worker: 5

View file

@ -0,0 +1,14 @@
memory-leak-test-appo:
env:
ray.rllib.examples.env.random_env.RandomLargeObsSpaceEnv
run: APPO
config:
# Works for both torch and tf.
framework: tf
# Switch off np.random, which is known to have memory leaks.
env_config:
config:
static_samples: true
num_workers: 4
num_envs_per_worker: 5
rollout_fragment_length: 20

View file

@ -0,0 +1,17 @@
memory-leak-test-ppo:
env:
ray.rllib.examples.env.random_env.RandomLargeObsSpaceEnv
run: PPO
config:
# Works for both torch and tf.
framework: tf
# Switch off np.random, which is known to have memory leaks.
env_config:
config:
static_samples: true
rollout_fragment_length: 20
num_workers: 4
num_envs_per_worker: 5
train_batch_size: 1000
sgd_minibatch_size: 256
num_sgd_iter: 5

View file

@ -0,0 +1,12 @@
memory-leak-test-sac:
env:
ray.rllib.examples.env.random_env.RandomLargeObsSpaceEnvContActions
run: SAC
config:
# Works for both torch and tf.
framework: tf
# Switch off np.random, which is known to have memory leaks.
env_config:
config:
static_samples: true
buffer_size: 500 # use small buffer to catch memory leaks

View file

@ -0,0 +1,10 @@
from ray.rllib.utils.debug.deterministic import update_global_seed_if_necessary
from ray.rllib.utils.debug.memory import check_memory_leaks
from ray.rllib.utils.debug.summary import summarize
__all__ = [
"check_memory_leaks",
"summarize",
"update_global_seed_if_necessary",
]

View file

@ -0,0 +1,54 @@
import numpy as np
import os
import random
from typing import Optional
from ray.rllib.utils.framework import try_import_tf, try_import_torch
def update_global_seed_if_necessary(
framework: Optional[str] = None, seed: Optional[int] = None
) -> None:
"""Seed global modules such as random, numpy, torch, or tf.
This is useful for debugging and testing.
Args:
framework: The framework specifier (may be None).
seed: An optional int seed. If None, will not do
anything.
"""
if seed is None:
return
# Python random module.
random.seed(seed)
# Numpy.
np.random.seed(seed)
# Torch.
if framework == "torch":
torch, _ = try_import_torch()
torch.manual_seed(seed)
# See https://github.com/pytorch/pytorch/issues/47672.
cuda_version = torch.version.cuda
if cuda_version is not None and float(torch.version.cuda) >= 10.2:
os.environ["CUBLAS_WORKSPACE_CONFIG"] = "4096:8"
else:
from distutils.version import LooseVersion
if LooseVersion(torch.__version__) >= LooseVersion("1.8.0"):
# Not all Operations support this.
torch.use_deterministic_algorithms(True)
else:
torch.set_deterministic(True)
# This only makes cuDNN convolution operations deterministic.
torch.backends.cudnn.deterministic = True
elif framework == "tf2" or framework == "tfe":
tf1, tf, _ = try_import_tf()
# Tf2.x.
if framework == "tf2":
tf.random.set_seed(seed)
# Tf-eager.
elif framework == "tfe":
tf1.set_random_seed(seed)
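`update_global_seed_if_necessary` is moved here unchanged from rllib/utils/debug.py (see the matching removal further below); it seeds Python's random module, numpy, and the chosen DL framework in one call. A quick sketch of calling it directly, e.g. from a test (framework/seed values are arbitrary):

from ray.rllib.utils.debug import update_global_seed_if_necessary

# Seeds random, numpy, and the framework's global RNGs; a None seed is a no-op.
update_global_seed_if_necessary(framework="torch", seed=42)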

rllib/utils/debug/memory.py (new file, 362 additions)
View file

@ -0,0 +1,362 @@
from collections import defaultdict, namedtuple
import numpy as np
import os
import re
import scipy.stats
import tracemalloc
import tree # pip install dm_tree
from typing import Callable, DefaultDict, List, Optional, Set
from ray.rllib.policy.sample_batch import DEFAULT_POLICY_ID, SampleBatch
# A suspicious memory-allocating stack-trace that we should re-test
# to make sure it's not a false positive.
Suspect = namedtuple(
"Suspect",
[
# The stack trace of the allocation, going back n frames, depending
# on the tracemalloc.start(n) call.
"traceback",
# The amount of memory taken by this particular stack trace
# over the course of the experiment.
"memory_increase",
# The slope of the scipy linear regression (x=iteration; y=memory size).
"slope",
# The rvalue of the scipy linear regression.
"rvalue",
# The memory size history (list of all memory sizes over all iterations).
"hist",
],
)
def check_memory_leaks(
trainer,
to_check: Optional[Set[str]] = None,
repeats: Optional[int] = None,
max_num_trials: int = 3,
) -> DefaultDict[str, List[Suspect]]:
"""Diagnoses the given trainer for possible memory leaks.
Isolates single components inside the trainer's local worker (e.g. the env,
the policy, etc.) and calls some of their methods repeatedly, while checking
the memory footprints and keeping track of which lines in the code add
un-GC'd items to memory.
Args:
trainer: The Trainer instance to test.
to_check: Set of strings to identify components to test. Allowed strings
are: "env", "policy", "model", "rollout_worker". By default, check all
of these.
repeats: Number of times the test code block should get executed (per trial).
If a trial fails, a new trial may get started with a larger number of
repeats: actual_repeats = `repeats` * (trial + 1) (1st trial == 0).
max_num_trials: The maximum number of trials to run each check for.
Returns:
A defaultdict(list) with keys being the `to_check` strings and values being
lists of Suspect instances that were found.
"""
local_worker = trainer.workers.local_worker()
# Which components should we test?
to_check = to_check or {"env", "model", "policy", "rollout_worker"}
results_per_category = defaultdict(list)
# Test a single sub-env (first in the VectorEnv)?
if "env" in to_check:
assert local_worker.async_env is not None, (
"ERROR: Cannot test 'env' since given trainer does not have one "
"in its local worker. Try setting `create_env_on_driver=True`."
)
# Isolate the first sub-env in the vectorized setup and test it.
env = local_worker.async_env.get_sub_environments()[0]
action_space = env.action_space
# Always use the same action to avoid memory leaks caused by numpy's random sampling.
action_sample = action_space.sample()
def code():
env.reset()
while True:
# If masking is used, try something like this:
# np.random.choice(
# action_space.n, p=(obs["action_mask"] / sum(obs["action_mask"])))
_, _, done, _ = env.step(action_sample)
if done:
break
test = _test_some_code_for_memory_leaks(
desc="Looking for leaks in env, running through episodes.",
init=None,
code=code,
# How many times to repeat the function call?
repeats=repeats or 200,
max_num_trials=max_num_trials,
)
if test:
results_per_category["env"].extend(test)
# Test the policy (single-agent case only so far).
if "policy" in to_check:
policy = local_worker.policy_map[DEFAULT_POLICY_ID]
# Get a fixed obs (B=10).
obs = tree.map_structure(
lambda s: np.stack([s] * 10, axis=0), policy.observation_space.sample()
)
print("Looking for leaks in Policy")
def code():
policy.compute_actions_from_input_dict(
{
"obs": obs,
}
)
# Call `compute_actions_from_input_dict()` n times.
test = _test_some_code_for_memory_leaks(
desc="Calling `compute_actions_from_input_dict()`.",
init=None,
code=code,
# How many times to repeat the function call?
repeats=repeats or 400,
# How many times to re-try if we find a suspicious memory
# allocation?
max_num_trials=max_num_trials,
)
if test:
results_per_category["policy"].extend(test)
# Call `learn_on_batch()` n times.
dummy_batch = policy._get_dummy_batch_from_view_requirements(batch_size=16)
test = _test_some_code_for_memory_leaks(
desc="Calling `learn_on_batch()`.",
init=None,
code=lambda: policy.learn_on_batch(dummy_batch),
# How many times to repeat the function call?
repeats=repeats or 100,
max_num_trials=max_num_trials,
)
if test:
results_per_category["policy"].extend(test)
# Test only the model.
if "model" in to_check:
policy = local_worker.policy_map[DEFAULT_POLICY_ID]
# Get a fixed obs.
obs = tree.map_structure(lambda s: s[None], policy.observation_space.sample())
print("Looking for leaks in Model")
# Call the model's forward pass n times.
test = _test_some_code_for_memory_leaks(
desc="Calling `[model]()`.",
init=None,
code=lambda: policy.model({SampleBatch.OBS: obs}),
# How many times to repeat the function call?
repeats=repeats or 400,
# How many times to re-try if we find a suspicious memory
# allocation?
max_num_trials=max_num_trials,
)
if test:
results_per_category["model"].extend(test)
# Test the RolloutWorker.
if "rollout_worker" in to_check:
print("Looking for leaks in local RolloutWorker")
def code():
local_worker.sample()
local_worker.get_metrics()
# Call `sample()` and `get_metrics()` n times.
test = _test_some_code_for_memory_leaks(
desc="Calling `sample()` and `get_metrics()`.",
init=None,
code=code,
# How many times to repeat the function call?
repeats=repeats or 200,
# How many times to re-try if we find a suspicious memory
# allocation?
max_num_trials=max_num_trials,
)
if test:
results_per_category["rollout_worker"].extend(test)
return results_per_category
def _test_some_code_for_memory_leaks(
desc: str,
init: Optional[Callable[[], None]],
code: Callable[[], None],
repeats: int,
max_num_trials: int = 1,
) -> List[Suspect]:
"""Runs given code (and init code) n times and checks for memory leaks.
Args:
desc: A descriptor of the test.
init: Optional code to be executed initially.
code: The actual code to be checked for producing memory leaks.
repeats: How many times to repeatedly execute `code`.
max_num_trials: The maximum number of trials to run. A new trial is only
run if the previous one produced a memory leak. For all non-1st trials,
`repeats` calculates as: actual_repeats = `repeats` * (trial + 1), where
the first trial is 0.
Returns:
A list of Suspect objects, describing possible memory leaks. If list
is empty, no leaks have been found.
"""
def _i_print(i):
if (i + 1) % 10 == 0:
print(".", end="" if (i + 1) % 100 else f" {i + 1}\n", flush=True)
# Do n trials to make sure a found leak is really one.
suspicious = set()
suspicious_stats = []
for trial in range(max_num_trials):
# Store up to n frames of each call stack.
tracemalloc.start(20)
table = defaultdict(list)
# Repeat running code for n times.
# Increase repeat value with each trial to make sure stats are more
# solid each time (avoiding false positives).
actual_repeats = repeats * (trial + 1)
print(f"{desc} {actual_repeats} times.")
# Initialize if necessary.
if init is not None:
init()
# Run `code` n times, each time taking a memory snapshot.
for i in range(actual_repeats):
_i_print(i)
code()
_take_snapshot(table, suspicious)
print("\n")
# Check which traces have constantly increased their memory consumption
# over time.
suspicious.clear()
suspicious_stats.clear()
# Suspicious memory allocation found?
suspects = _find_memory_leaks_in_table(table)
for suspect in sorted(suspects, key=lambda s: s.memory_increase, reverse=True):
# Only print out the biggest offender:
if len(suspicious) == 0:
_pprint_suspect(suspect)
print("-> added to retry list")
suspicious.add(suspect.traceback)
suspicious_stats.append(suspect)
tracemalloc.stop()
# Some suspicious memory allocations found.
if len(suspicious) > 0:
print(f"{len(suspicious)} suspects found. Top-ten:")
for i, s in enumerate(suspicious_stats):
if i > 10:
break
print(
f"{i}) line={s.traceback[-1]} mem-increase={s.memory_increase}B "
f"slope={s.slope}B/detection rval={s.rvalue}"
)
# Nothing suspicious found -> Exit trial loop and return.
else:
print("No remaining suspects found -> returning")
break
# Print out final top offender.
if len(suspicious_stats) > 0:
_pprint_suspect(suspicious_stats[0])
return suspicious_stats
def _take_snapshot(table, suspicious=None):
# Take a memory snapshot.
snapshot = tracemalloc.take_snapshot()
# Group all memory allocations by their stacktrace (going n frames
# deep as defined above in tracemalloc.start(n)).
# Then sort groups by size, then count, then trace.
top_stats = snapshot.statistics("traceback")
# For the first m largest increases, keep a trace only if a) this is the
# first trial or b) the trace is already in the `suspicious` set.
for stat in top_stats[:100]:
if not suspicious or stat.traceback in suspicious:
table[stat.traceback].append(stat.size)
def _find_memory_leaks_in_table(table):
suspects = []
for traceback, hist in table.items():
# Do a quick mem increase check.
memory_increase = hist[-1] - hist[0]
# Only if memory increased, do we check further.
if memory_increase <= 0.0:
continue
# Ignore this very module here (we are collecting lots of data
# so an increase is expected).
top_stack = str(traceback[-1])
drive_separator = "\\\\" if os.name == "nt" else "/"
if any(
s in top_stack
for s in [
"tracemalloc",
"pycharm",
"thirdparty_files/psutil",
re.sub("\\.", drive_separator, __name__) + ".py",
]
):
continue
# Do a linear regression to get the slope and R-value.
line = scipy.stats.linregress(x=np.arange(len(hist)), y=np.array(hist))
# - If weak positive slope and some confidence and
# increase > n bytes -> error.
# - If stronger positive slope -> error.
if memory_increase > 1000 and (
(line.slope > 40.0 and line.rvalue > 0.85)
or (line.slope > 20.0 and line.rvalue > 0.9)
or (line.slope > 10.0 and line.rvalue > 0.95)
):
suspects.append(
Suspect(
traceback=traceback,
memory_increase=memory_increase,
slope=line.slope,
rvalue=line.rvalue,
hist=hist,
)
)
return suspects
def _pprint_suspect(suspect):
print(
"Most suspicious memory allocation in traceback "
"(only printing out this one, but all (less suspicious)"
" suspects will be investigated as well):"
)
print("\n".join(suspect.traceback.format()))
print(f"Increase total={suspect.memory_increase}B")
print(f"Slope={suspect.slope} B/detection")
print(f"Rval={suspect.rvalue}")

View file

@ -1,11 +1,8 @@
import numpy as np
import os
import pprint
import random
from typing import Any, Mapping, Optional
from typing import Any, Mapping
from ray.rllib.policy.sample_batch import SampleBatch, MultiAgentBatch
from ray.rllib.utils.framework import try_import_tf, try_import_torch
_printer = pprint.PrettyPrinter(indent=2, width=60)
@ -78,51 +75,3 @@ class _StringValue:
def __repr__(self):
return self.value
def update_global_seed_if_necessary(
framework: Optional[str] = None, seed: Optional[int] = None
) -> None:
"""Seed global modules such as random, numpy, torch, or tf.
This is useful for debugging and testing.
Args:
framework: The framework specifier (may be None).
seed: An optional int seed. If None, will not do
anything.
"""
if seed is None:
return
# Python random module.
random.seed(seed)
# Numpy.
np.random.seed(seed)
# Torch.
if framework == "torch":
torch, _ = try_import_torch()
torch.manual_seed(seed)
# See https://github.com/pytorch/pytorch/issues/47672.
cuda_version = torch.version.cuda
if cuda_version is not None and float(torch.version.cuda) >= 10.2:
os.environ["CUBLAS_WORKSPACE_CONFIG"] = "4096:8"
else:
from distutils.version import LooseVersion
if LooseVersion(torch.__version__) >= LooseVersion("1.8.0"):
# Not all Operations support this.
torch.use_deterministic_algorithms(True)
else:
torch.set_deterministic(True)
# This is only for Convolution no problem.
torch.backends.cudnn.deterministic = True
elif framework == "tf2" or framework == "tfe":
tf1, tf, _ = try_import_tf()
# Tf2.x.
if framework == "tf2":
tf.random.set_seed(seed)
# Tf-eager.
elif framework == "tfe":
tf1.set_random_seed(seed)

View file

@ -1,13 +1,14 @@
"""Common pre-checks for all RLlib experiments."""
import logging
import numpy as np
from typing import TYPE_CHECKING, Set
import gym
import numpy as np
import traceback
from typing import TYPE_CHECKING, Set
from ray.actor import ActorHandle
from ray.rllib.utils.spaces.space_utils import convert_element_to_space_type
from ray.rllib.utils.typing import EnvType
from ray.util import log_once
if TYPE_CHECKING:
from ray.rllib.env import BaseEnv, MultiAgentEnv
@ -40,35 +41,48 @@ def check_env(env: EnvType) -> None:
logger.warning("Skipping env checking for this experiment")
return
if not isinstance(
env,
(
BaseEnv,
gym.Env,
MultiAgentEnv,
RemoteBaseEnv,
VectorEnv,
ExternalMultiAgentEnv,
ExternalEnv,
ActorHandle,
),
):
try:
if not isinstance(
env,
(
BaseEnv,
gym.Env,
MultiAgentEnv,
RemoteBaseEnv,
VectorEnv,
ExternalMultiAgentEnv,
ExternalEnv,
ActorHandle,
),
):
raise ValueError(
"Env must be one of the supported types: BaseEnv, gym.Env, "
"MultiAgentEnv, VectorEnv, RemoteBaseEnv, ExternalMultiAgentEnv, "
f"ExternalEnv, but instead was a {type(env)}"
)
if isinstance(env, MultiAgentEnv):
check_multiagent_environments(env)
elif isinstance(env, gym.Env):
check_gym_environments(env)
elif isinstance(env, BaseEnv):
check_base_env(env)
else:
logger.warning(
"Env checking isn't implemented for VectorEnvs, RemoteBaseEnvs, "
"ExternalMultiAgentEnv,or ExternalEnvs or Environments that are "
"Ray actors"
)
except Exception:
actual_error = traceback.format_exc()
raise ValueError(
"Env must be one of the supported types: BaseEnv, gym.Env, "
"MultiAgentEnv, VectorEnv, RemoteBaseEnv, ExternalMultiAgentEnv, "
f"ExternalEnv, but instead was a {type(env)}"
)
if isinstance(env, MultiAgentEnv):
check_multiagent_environments(env)
elif isinstance(env, gym.Env):
check_gym_environments(env)
elif isinstance(env, BaseEnv):
check_base_env(env)
else:
logger.warning(
"Env checking isn't implemented for VectorEnvs, RemoteBaseEnvs, "
"ExternalMultiAgentEnv,or ExternalEnvs or Environments that are Ray actors"
f"{actual_error}\n"
"The above error has been found in your environment! "
"We've added a module for checking your custom environments. It "
"may cause your experiment to fail if your environment is not set up"
"correctly. You can disable this behavior by setting "
"`disable_env_checking=True` in your config "
"dictionary. You can run the environment checking module "
"standalone by calling ray.rllib.utils.check_env([env])."
)
@ -191,6 +205,23 @@ def check_multiagent_environments(env: "MultiAgentEnv") -> None:
if not isinstance(env, MultiAgentEnv):
raise ValueError("The passed env is not a MultiAgentEnv.")
elif not (
hasattr(env, "observation_space")
and hasattr(env, "action_space")
and hasattr(env, "_agent_ids")
and hasattr(env, "_spaces_in_preferred_format")
):
if log_once("ma_env_super_ctor_called"):
logger.warning(
f"Your MultiAgentEnv {env} does not have some or all of the needed "
"base-class attributes! Make sure you call `super().__init__` from "
"within your MutiAgentEnv's constructor. "
"This will raise an error in the future."
)
env.observation_space = (
env.action_space
) = env._spaces_in_preferred_format = None
env._agent_ids = set()
reset_obs = env.reset()
sampled_obs = env.observation_space_sample()
@ -331,12 +362,17 @@ def _check_reward(reward, base_env=False, agent_ids=None):
for _, multi_agent_dict in reward.items():
for agent_id, rew in multi_agent_dict.items():
if not (
np.isreal(rew) and not isinstance(rew, bool) and np.isscalar(rew)
np.isreal(rew)
and not isinstance(rew, bool)
and (
np.isscalar(rew)
or (isinstance(rew, np.ndarray) and rew.shape == ())
)
):
error = (
"Your step function must return rewards that are"
f" integer or float. reward: {rew}. Instead it was a "
f"{type(reward)}"
f"{type(rew)}"
)
raise ValueError(error)
if not (agent_id in agent_ids or agent_id == "__all__"):
@ -347,7 +383,12 @@ def _check_reward(reward, base_env=False, agent_ids=None):
)
raise ValueError(error)
elif not (
np.isreal(reward) and not isinstance(reward, bool) and np.isscalar(reward)
np.isreal(reward)
and not isinstance(reward, bool)
and (
np.isscalar(reward)
or (isinstance(reward, np.ndarray) and reward.shape == ())
)
):
error = (
"Your step function must return a reward that is integer or float. "

View file

@ -177,7 +177,7 @@ def check(x, y, decimals=5, atol=None, rtol=None, false=False):
assert isinstance(y, dict), "ERROR: If x is dict, y needs to be a dict as well!"
y_keys = set(x.keys())
for key, value in x.items():
assert key in y, "ERROR: y does not have x's key='{}'! y={}".format(key, y)
assert key in y, f"ERROR: y does not have x's key='{key}'! y={y}"
check(value, y[key], decimals=decimals, atol=atol, rtol=rtol, false=false)
y_keys.remove(key)
assert not y_keys, "ERROR: y contains keys ({}) that are not in x! y={}".format(
@ -198,21 +198,21 @@ def check(x, y, decimals=5, atol=None, rtol=None, false=False):
# Boolean comparison.
elif isinstance(x, (np.bool_, bool)):
if false is True:
assert bool(x) is not bool(y), "ERROR: x ({}) is y ({})!".format(x, y)
assert bool(x) is not bool(y), f"ERROR: x ({x}) is y ({y})!"
else:
assert bool(x) is bool(y), "ERROR: x ({}) is not y ({})!".format(x, y)
assert bool(x) is bool(y), f"ERROR: x ({x}) is not y ({y})!"
# Nones or primitives.
elif x is None or y is None or isinstance(x, (str, int)):
if false is True:
assert x != y, "ERROR: x ({}) is the same as y ({})!".format(x, y)
assert x != y, f"ERROR: x ({x}) is the same as y ({y})!"
else:
assert x == y, "ERROR: x ({}) is not the same as y ({})!".format(x, y)
assert x == y, f"ERROR: x ({x}) is not the same as y ({y})!"
# String/byte comparisons.
elif hasattr(x, "dtype") and (x.dtype == object or str(x.dtype).startswith("<U")):
try:
np.testing.assert_array_equal(x, y)
if false is True:
assert False, "ERROR: x ({}) is the same as y ({})!".format(x, y)
assert False, f"ERROR: x ({x}) is the same as y ({y})!"
except AssertionError as e:
if false is False:
raise e
@ -260,7 +260,7 @@ def check(x, y, decimals=5, atol=None, rtol=None, false=False):
else:
# If false is set -> raise error (not expected to be equal).
if false is True:
assert False, "ERROR: x ({}) is the same as y ({})!".format(x, y)
assert False, f"ERROR: x ({x}) is the same as y ({y})!"
# Using atol/rtol.
else:
@ -276,7 +276,7 @@ def check(x, y, decimals=5, atol=None, rtol=None, false=False):
raise e
else:
if false is True:
assert False, "ERROR: x ({}) is the same as y ({})!".format(x, y)
assert False, f"ERROR: x ({x}) is the same as y ({y})!"
def check_compute_single_action(

View file

@ -0,0 +1,129 @@
#!/usr/bin/env python
# Runs one or more memory leak tests.
#
# Example usage:
# $ python run_memory_leak_tests.py memory-leak-test-ppo.yaml
#
# When using in BAZEL (with py_test), e.g. see in ray/rllib/BUILD:
# py_test(
# name = "memory_leak_ppo",
# main = "tests/test_memory_leak.py",
# tags = ["memory_leak_tests"],
# size = "medium", # 5min timeout
# srcs = ["tests/test_memory_leak.py"],
# data = glob(["tuned_examples/ppo/*.yaml"]),
# # Pass `BAZEL` option and the path to look for yaml files.
# args = ["BAZEL", "tuned_examples/ppo/memory-leak-test-ppo.yaml"]
# )
import argparse
import os
from pathlib import Path
import sys
import yaml
import ray
from ray.rllib.agents.registry import get_trainer_class
from ray.rllib.utils.debug.memory import check_memory_leaks
from ray.rllib import _register_all
parser = argparse.ArgumentParser()
parser.add_argument(
"--framework",
required=False,
choices=["jax", "tf2", "tf", "tfe", "torch", None],
default=None,
help="The deep learning framework to use.",
)
parser.add_argument(
"--yaml-dir",
required=True,
type=str,
help="The directory in which to find all yamls to test.",
)
parser.add_argument(
"--local-mode",
action="store_true",
help="Run ray in local mode for easier debugging.",
)
parser.add_argument(
"--to-check",
nargs="+",
default=["env", "policy", "rollout_worker"],
help="List of 'env', 'policy', 'rollout_worker', 'model'.",
)
if __name__ == "__main__":
args = parser.parse_args()
# Bazel regression test mode: Get path to look for yaml files.
# Get the path or single file to use.
rllib_dir = Path(__file__).parent.parent.parent
print("rllib dir={}".format(rllib_dir))
abs_yaml_path = os.path.join(rllib_dir, args.yaml_dir)
# Single file given.
if os.path.isfile(abs_yaml_path):
yaml_files = [abs_yaml_path]
# Given path/file does not exist.
elif not os.path.isdir(abs_yaml_path):
raise ValueError("yaml-dir ({}) not found!".format(args.yaml_dir))
# Path given -> Get all yaml files in there via rglob.
else:
yaml_files = rllib_dir.rglob(args.yaml_dir + "/*.yaml")
yaml_files = sorted(
map(lambda path: str(path.absolute()), yaml_files), reverse=True
)
print("Will run the following memory-leak tests:")
for yaml_file in yaml_files:
print("->", yaml_file)
# Loop through all collected files.
for yaml_file in yaml_files:
experiments = yaml.safe_load(open(yaml_file).read())
assert (
len(experiments) == 1
), "Error, can only run a single experiment per yaml file!"
experiment = list(experiments.values())[0]
# Add framework option to exp configs.
if args.framework:
experiment["config"]["framework"] = args.framework
# Create env on local_worker for memory leak testing just the env.
experiment["config"]["create_env_on_driver"] = True
# Always run with eager-tracing when framework=tf2 if not in local-mode.
if args.framework in ["tf2", "tfe"] and not args.local_mode:
experiment["config"]["eager_tracing"] = True
# experiment["config"]["callbacks"] = MemoryTrackingCallbacks
# Move "env" specifier into config.
experiment["config"]["env"] = experiment["env"]
experiment.pop("env", None)
# Print out the actual config.
print("== Test config ==")
print(yaml.dump(experiment))
# Construct the trainer instance based on the given config.
leaking = True
try:
ray.init(num_cpus=5, local_mode=args.local_mode)
trainer = get_trainer_class(experiment["run"])(experiment["config"])
results = check_memory_leaks(
trainer,
to_check=set(args.to_check),
)
if not results:
leaking = False
finally:
ray.shutdown()
_register_all()
if not leaking:
print("Memory leak test PASSED")
else:
print("Memory leak test FAILED. Exiting with Error.")
sys.exit(1)