Mirror of https://github.com/vale981/ray, synced 2025-03-06 10:31:39 -05:00
[rllib] Switch DQN to using deepmind wrappers (#1655)
* deepmind wrap
* use 80x80
* respect custom prep
* fix replay size
* fix chekc
* batch idx
* Wed Mar 7 11:00:39 PST 2018
* random starts and reward clipping
* Fri Mar 9 17:27:17 PST 2018
* Fri Mar 9 17:36:15 PST 2018
* Sat Mar 10 19:47:10 PST 2018
* Sat Mar 10 19:47:37 PST 2018
* Sat Mar 10 20:05:12 PST 2018
* Sat Mar 10 20:54:21 PST 2018
* Sat Mar 10 21:03:52 PST 2018
Parent: 6114b6d20e
Commit: 076936a7f5

9 changed files with 244 additions and 242 deletions
python/ray/rllib/dqn/common/atari_wrappers.py (new file, +202 lines)

@@ -0,0 +1,202 @@
import numpy as np
from collections import deque
import gym
from gym import spaces
import cv2
cv2.ocl.setUseOpenCL(False)


class NoopResetEnv(gym.Wrapper):
    def __init__(self, env, noop_max=30, random_starts=False):
        """Sample initial states by taking random number of no-ops on reset.
        No-op is assumed to be action 0.
        """
        gym.Wrapper.__init__(self, env)
        self.noop_max = noop_max
        self.override_num_noops = None
        self.noop_action = 0
        self.random_starts = random_starts
        assert env.unwrapped.get_action_meanings()[0] == 'NOOP'

    def reset(self, **kwargs):
        """ Do no-op action for a number of steps in [1, noop_max]."""
        self.env.reset(**kwargs)
        if self.override_num_noops is not None:
            noops = self.override_num_noops
        else:
            noops = self.unwrapped.np_random.randint(
                1, self.noop_max + 1)
        assert noops > 0
        obs = None
        for _ in range(noops):
            if self.random_starts:
                action = np.random.randint(self.env.action_space.n)
            else:
                action = self.noop_action
            obs, _, done, _ = self.env.step(action)
            if done:
                obs = self.env.reset(**kwargs)
        return obs

    def step(self, ac):
        return self.env.step(ac)


class ClipRewardEnv(gym.RewardWrapper):
    def __init__(self, env):
        gym.RewardWrapper.__init__(self, env)

    def reward(self, reward):
        """Bin reward to {+1, 0, -1} by its sign."""
        return np.sign(reward)


class FireResetEnv(gym.Wrapper):
    def __init__(self, env):
        """Take action on reset.

        For environments that are fixed until firing."""
        gym.Wrapper.__init__(self, env)
        assert env.unwrapped.get_action_meanings()[1] == 'FIRE'
        assert len(env.unwrapped.get_action_meanings()) >= 3

    def reset(self, **kwargs):
        self.env.reset(**kwargs)
        obs, _, done, _ = self.env.step(1)
        if done:
            self.env.reset(**kwargs)
        obs, _, done, _ = self.env.step(2)
        if done:
            self.env.reset(**kwargs)
        return obs

    def step(self, ac):
        return self.env.step(ac)


class EpisodicLifeEnv(gym.Wrapper):
    def __init__(self, env):
        """Make end-of-life == end-of-episode, but only reset on true game over.
        Done by DeepMind for the DQN and co. since it helps value estimation.
        """
        gym.Wrapper.__init__(self, env)
        self.lives = 0
        self.was_real_done = True

    def step(self, action):
        obs, reward, done, info = self.env.step(action)
        self.was_real_done = done
        # check current lives, make loss of life terminal,
        # then update lives to handle bonus lives
        lives = self.env.unwrapped.ale.lives()
        if lives < self.lives and lives > 0:
            # for Qbert sometimes we stay in lives == 0 condition for a few
            # frames so its important to keep lives > 0, so that we only reset
            # once the environment advertises done.
            done = True
        self.lives = lives
        return obs, reward, done, info

    def reset(self, **kwargs):
        """Reset only when lives are exhausted.
        This way all states are still reachable even though lives are episodic,
        and the learner need not know about any of this behind-the-scenes.
        """
        if self.was_real_done:
            obs = self.env.reset(**kwargs)
        else:
            # no-op step to advance from terminal/lost life state
            obs, _, _, _ = self.env.step(0)
        self.lives = self.env.unwrapped.ale.lives()
        return obs


class MaxAndSkipEnv(gym.Wrapper):
    def __init__(self, env, skip=4):
        """Return only every `skip`-th frame"""
        gym.Wrapper.__init__(self, env)
        # most recent raw observations (for max pooling across time steps)
        self._obs_buffer = np.zeros(
            (2,)+env.observation_space.shape, dtype=np.uint8)
        self._skip = skip

    def step(self, action):
        """Repeat action, sum reward, and max over last observations."""
        total_reward = 0.0
        done = None
        for i in range(self._skip):
            obs, reward, done, info = self.env.step(action)
            if i == self._skip - 2:
                self._obs_buffer[0] = obs
            if i == self._skip - 1:
                self._obs_buffer[1] = obs
            total_reward += reward
            if done:
                break
        # Note that the observation on the done=True frame
        # doesn't matter
        max_frame = self._obs_buffer.max(axis=0)

        return max_frame, total_reward, done, info

    def reset(self, **kwargs):
        return self.env.reset(**kwargs)


class WarpFrame(gym.ObservationWrapper):
    def __init__(self, env):
        """Warp frames to 84x84 as done in the Nature paper and later work."""
        gym.ObservationWrapper.__init__(self, env)
        self.width = 80  # in rllib we use 80
        self.height = 80
        self.observation_space = spaces.Box(
            low=0, high=255, shape=(self.height, self.width, 1))

    def observation(self, frame):
        frame = cv2.cvtColor(frame, cv2.COLOR_RGB2GRAY)
        frame = cv2.resize(
            frame, (self.width, self.height), interpolation=cv2.INTER_AREA)
        return frame[:, :, None]


class FrameStack(gym.Wrapper):
    def __init__(self, env, k):
        """Stack k last frames."""
        gym.Wrapper.__init__(self, env)
        self.k = k
        self.frames = deque([], maxlen=k)
        shp = env.observation_space.shape
        self.observation_space = spaces.Box(
            low=0, high=255, shape=(shp[0], shp[1], shp[2] * k))

    def reset(self):
        ob = self.env.reset()
        for _ in range(self.k):
            self.frames.append(ob)
        return self._get_ob()

    def step(self, action):
        ob, reward, done, info = self.env.step(action)
        self.frames.append(ob)
        return self._get_ob(), reward, done, info

    def _get_ob(self):
        assert len(self.frames) == self.k
        return np.concatenate(self.frames, axis=2)


def wrap_deepmind(env, random_starts):
    """Configure environment for DeepMind-style Atari.

    Note that we assume reward clipping is done outside the wrapper.
    """
    env = NoopResetEnv(env, noop_max=30, random_starts=random_starts)
    if 'NoFrameskip' in env.spec.id:
        env = MaxAndSkipEnv(env, skip=4)
    env = EpisodicLifeEnv(env)
    if 'FIRE' in env.unwrapped.get_action_meanings():
        env = FireResetEnv(env)
    env = WarpFrame(env)
    # env = ClipRewardEnv(env)  # reward clipping is handled by DQN replay
    env = FrameStack(env, 4)
    return env
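For orientation, a minimal usage sketch of the wrapper chain defined above (not part of the commit; assumes gym with the Atari ROMs installed and the 4-tuple step API of this era):

import gym

from ray.rllib.dqn.common.atari_wrappers import wrap_deepmind

# Build the DeepMind-style preprocessing stack around a raw Atari env.
env = wrap_deepmind(gym.make("PongNoFrameskip-v4"), random_starts=True)
obs = env.reset()
print(obs.shape)                 # (80, 80, 4): four stacked 80x80 gray frames
obs, reward, done, _ = env.step(env.action_space.sample())
print(reward)                    # unclipped; clipping now happens in replay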
@@ -2,236 +2,18 @@ from __future__ import absolute_import
 from __future__ import division
 from __future__ import print_function

-import cv2
-import gym
-import numpy as np
-
-from collections import deque
-from gym import spaces
-
 from ray.rllib.models import ModelCatalog
+from ray.rllib.dqn.common.atari_wrappers import wrap_deepmind


-class NoopResetEnv(gym.Wrapper):
-    def __init__(self, env=None, noop_max=30):
-        """Sample initial states by taking random number of no-ops on reset.
-        No-op is assumed to be action 0.
-        """
-        super(NoopResetEnv, self).__init__(env)
-        self.noop_max = noop_max
-        self.override_num_noops = None
-        assert env.unwrapped.get_action_meanings()[0] == 'NOOP'
-
-    def reset(self):
-        """ Do no-op action for a number of steps in [1, noop_max]."""
-        self.env.reset()
-        if self.override_num_noops is not None:
-            noops = self.override_num_noops
-        else:
-            noops = np.random.randint(1, self.noop_max + 1)
-        assert noops > 0
-        obs = None
-        for _ in range(noops):
-            obs, _, done, _ = self.env.step(0)
-            if done:
-                obs = self.env.reset()
-        return obs
-
-
-class FireResetEnv(gym.Wrapper):
-    def __init__(self, env=None):
-        """For environments where the user need to press FIRE for the game to
-        start."""
-        super(FireResetEnv, self).__init__(env)
-        assert env.unwrapped.get_action_meanings()[1] == 'FIRE'
-        assert len(env.unwrapped.get_action_meanings()) >= 3
-
-    def reset(self):
-        self.env.reset()
-        obs, _, done, _ = self.env.step(1)
-        if done:
-            self.env.reset()
-        obs, _, done, _ = self.env.step(2)
-        if done:
-            self.env.reset()
-        return obs
-
-
-class EpisodicLifeEnv(gym.Wrapper):
-    def __init__(self, env=None):
-        """Make end-of-life == end-of-episode, but only reset on true game
-        over. Done by DeepMind for the DQN and co. since it helps value
-        estimation.
-        """
-        super(EpisodicLifeEnv, self).__init__(env)
-        self.lives = 0
-        self.was_real_done = True
-        self.was_real_reset = False
-
-    def step(self, action):
-        obs, reward, done, info = self.env.step(action)
-        self.was_real_done = done
-        # check current lives, make loss of life terminal,
-        # then update lives to handle bonus lives
-        lives = self.env.unwrapped.ale.lives()
-        if lives < self.lives and lives > 0:
-            # for Qbert somtimes we stay in lives == 0 condtion for a few
-            # frames so its important to keep lives > 0, so that we only reset
-            # once the environment advertises done.
-            done = True
-        self.lives = lives
-        return obs, reward, done, info
-
-    def reset(self):
-        """Reset only when lives are exhausted.
-        This way all states are still reachable even though lives are episodic,
-        and the learner need not know about any of this behind-the-scenes.
-        """
-        if self.was_real_done:
-            obs = self.env.reset()
-            self.was_real_reset = True
-        else:
-            # no-op step to advance from terminal/lost life state
-            obs, _, _, _ = self.env.step(0)
-            self.was_real_reset = False
-        self.lives = self.env.unwrapped.ale.lives()
-        return obs
-
-
-class MaxAndSkipEnv(gym.Wrapper):
-    def __init__(self, env=None, skip=4):
-        """Return only every `skip`-th frame"""
-        super(MaxAndSkipEnv, self).__init__(env)
-        # most recent raw observations (for max pooling across time steps)
-        self._obs_buffer = deque(maxlen=2)
-        self._skip = skip
-
-    def step(self, action):
-        total_reward = 0.0
-        done = None
-        for _ in range(self._skip):
-            obs, reward, done, info = self.env.step(action)
-            self._obs_buffer.append(obs)
-            total_reward += reward
-            if done:
-                break
-
-        max_frame = np.max(np.stack(self._obs_buffer), axis=0)
-
-        return max_frame, total_reward, done, info
-
-    def reset(self):
-        """Clear past frame buffer and init. to first obs. from inner env."""
-        self._obs_buffer.clear()
-        obs = self.env.reset()
-        self._obs_buffer.append(obs)
-        return obs
-
-
-# TODO(ekl): switch this to use a RLlib common preprocessor
-class ProcessFrame80(gym.ObservationWrapper):
-    def __init__(self, env=None):
-        super(ProcessFrame80, self).__init__(env)
-        self.observation_space = spaces.Box(
-            low=0, high=255, shape=(80, 80, 1), dtype=np.uint8)
-
-    def observation(self, obs):
-        return ProcessFrame80.process(obs)
-
-    @staticmethod
-    def process(frame):
-        if frame.size == 210 * 160 * 3:
-            img = np.reshape(frame, [210, 160, 3]).astype(np.float32)
-        elif frame.size == 250 * 160 * 3:
-            img = np.reshape(frame, [250, 160, 3]).astype(np.float32)
-        else:
-            assert False, "Unknown resolution."
-        img = (img[:, :, 0] * 0.299 + img[:, :, 1] * 0.587 +
-               img[:, :, 2] * 0.114)
-        resized_screen = cv2.resize(
-            img, (80, 110), interpolation=cv2.INTER_AREA)
-        x_t = resized_screen[20:100, :]
-        x_t = np.reshape(x_t, [80, 80, 1])
-        return x_t.astype(np.uint8)
-
-
-class ClippedRewardsWrapper(gym.RewardWrapper):
-    def reward(self, reward):
-        """Change all the positive rewards to 1, negative to -1 and keep
-        zero."""
-        return np.sign(reward)
-
-
-class LazyFrames(object):
-    def __init__(self, frames):
-        """This object ensures that common frames between the observations are
-        only stored once. It exists purely to optimize memory usage which can
-        be huge for DQN's 1M frames replay buffers.
-
-        This object should only be converted to numpy array before being passed
-        to the model.
-
-        You'd not belive how complex the previous solution was."""
-        self._frames = frames
-
-    def __array__(self, dtype=None):
-        out = np.concatenate(self._frames, axis=2)
-        if dtype is not None:
-            out = out.astype(dtype)
-        return out
-
-
-class FrameStack(gym.Wrapper):
-    def __init__(self, env, k):
-        """Stack k last frames.
-
-        Returns lazy array, which is much more memory efficient.
-
-        See Also
-        --------
-        LazyFrames
-        """
-        gym.Wrapper.__init__(self, env)
-        self.k = k
-        self.frames = deque([], maxlen=k)
-        shp = env.observation_space.shape
-        self.observation_space = spaces.Box(
-            low=0, high=255, shape=(shp[0], shp[1], shp[2] * k),
-            dtype=np.uint8)
-
-    def reset(self):
-        ob = self.env.reset()
-        for _ in range(self.k):
-            self.frames.append(ob)
-        return self._get_ob()
-
-    def step(self, action):
-        ob, reward, done, info = self.env.step(action)
-        self.frames.append(ob)
-        return self._get_ob(), reward, done, info
-
-    def _get_ob(self):
-        assert len(self.frames) == self.k
-        return LazyFrames(list(self.frames))
-
-
-def wrap_dqn(registry, env, options):
+def wrap_dqn(registry, env, options, random_starts):
     """Apply a common set of wrappers for DQN."""

     is_atari = hasattr(env.unwrapped, "ale")

-    if is_atari:
-        env = EpisodicLifeEnv(env)
-        env = NoopResetEnv(env, noop_max=30)
-        if 'NoFrameskip' in env.spec.id:
-            env = MaxAndSkipEnv(env, skip=4)
-        if 'FIRE' in env.unwrapped.get_action_meanings():
-            env = FireResetEnv(env)
+    # Override atari default to use the deepmind wrappers.
+    # TODO(ekl) this logic should be pushed to the catalog.
+    if is_atari and "custom_preprocessor" not in options:
+        return wrap_deepmind(env, random_starts=random_starts)

-    env = ModelCatalog.get_preprocessor_as_wrapper(registry, env, options)
-
-    if is_atari:
-        env = FrameStack(env, 4)
-        env = ClippedRewardsWrapper(env)
-
-    return env
+    return ModelCatalog.get_preprocessor_as_wrapper(registry, env, options)
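The net effect of this hunk: the hand-rolled wrapper chain in wrap_dqn collapses into a single wrap_deepmind call whenever the env is ALE-backed and no custom preprocessor is configured. A hedged sketch of just that dispatch rule (env and options here are placeholders, not values from the diff):

def uses_deepmind_wrappers(env, options):
    # Same predicate as the new wrap_dqn: an ALE-backed (Atari) env with no
    # "custom_preprocessor" entry in the model options.
    is_atari = hasattr(env.unwrapped, "ale")
    return is_atari and "custom_preprocessor" not in options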
@@ -52,6 +52,8 @@ DEFAULT_CONFIG = dict(
     exploration_final_eps=0.02,
     # Update the target network every `target_network_update_freq` steps.
     target_network_update_freq=500,
+    # Whether to start with random actions instead of noops.
+    random_starts=True,

     # === Replay buffer ===
     # Size of the replay buffer. Note that if async_updates is set, then
@@ -65,6 +67,8 @@ DEFAULT_CONFIG = dict(
     prioritized_replay_beta=0.4,
     # Epsilon to add to the TD errors when updating priorities.
     prioritized_replay_eps=1e-6,
+    # Whether to clip rewards to [-1, 1] prior to adding to the replay buffer.
+    clip_rewards=True,

     # === Optimization ===
     # Learning rate for adam optimizer
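A hedged sketch of how the two new config keys might be set from user code; the DQNAgent import path and constructor usage reflect rllib of this era and should be treated as assumptions rather than a verbatim recipe:

import ray
from ray.rllib.dqn import DQNAgent

ray.init()
agent = DQNAgent(env="PongNoFrameskip-v4", config={
    "random_starts": True,   # random actions rather than no-ops on reset
    "clip_rewards": True,    # clip to [-1, 1] when adding to the replay buffer
})
result = agent.train()       # one training iteration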
@@ -50,7 +50,7 @@ class DQNEvaluator(Evaluator):

     def __init__(self, registry, env_creator, config, logdir, worker_index):
         env = env_creator(config["env_config"])
-        env = wrap_dqn(registry, env, config["model"])
+        env = wrap_dqn(registry, env, config["model"], config["random_starts"])
         self.env = env
         self.config = config

@@ -28,7 +28,7 @@ class ReplayActor(object):
     def __init__(
             self, num_shards, learning_starts, buffer_size, train_batch_size,
             prioritized_replay_alpha, prioritized_replay_beta,
-            prioritized_replay_eps):
+            prioritized_replay_eps, clip_rewards):
         self.replay_starts = learning_starts // num_shards
         self.buffer_size = buffer_size // num_shards
         self.train_batch_size = train_batch_size
@@ -36,7 +36,8 @@ class ReplayActor(object):
         self.prioritized_replay_eps = prioritized_replay_eps

         self.replay_buffer = PrioritizedReplayBuffer(
-            buffer_size, alpha=prioritized_replay_alpha)
+            self.buffer_size, alpha=prioritized_replay_alpha,
+            clip_rewards=clip_rewards)

         # Metrics
         self.add_batch_timer = TimerStat()
@@ -98,6 +99,7 @@ class GenericLearner(threading.Thread):
         self.queue_timer = TimerStat()
         self.grad_timer = TimerStat()
         self.daemon = True
+        self.weights_updated = False

     def run(self):
         while True:
@@ -111,6 +113,7 @@ class GenericLearner(threading.Thread):
                 td_error = self.local_evaluator.compute_apply(replay)
                 self.outqueue.put((ra, replay, td_error))
                 self.learner_queue_size.push(self.inqueue.qsize())
+                self.weights_updated = True


 class ApexOptimizer(Optimizer):
@@ -121,7 +124,7 @@ class ApexOptimizer(Optimizer):
             prioritized_replay_beta=0.4, prioritized_replay_eps=1e-6,
             train_batch_size=512, sample_batch_size=50,
             num_replay_buffer_shards=1, max_weight_sync_delay=400,
-            debug=False):
+            clip_rewards=True, debug=False):

         self.debug = debug
         self.replay_starts = learning_starts
@@ -138,7 +141,7 @@ class ApexOptimizer(Optimizer):
             ReplayActor,
             [num_replay_buffer_shards, learning_starts, buffer_size,
              train_batch_size, prioritized_replay_alpha,
-             prioritized_replay_beta, prioritized_replay_eps],
+             prioritized_replay_beta, prioritized_replay_eps, clip_rewards],
             num_replay_buffer_shards)
         assert len(self.remote_evaluators) > 0

@@ -199,7 +202,10 @@ class ApexOptimizer(Optimizer):
                 # Update weights if needed
                 self.steps_since_update[ev] += self.sample_batch_size
                 if self.steps_since_update[ev] >= self.max_weight_sync_delay:
-                    if weights is None:
+                    # Note that it's important to pull new weights once
+                    # updated to avoid excessive correlation between actors
+                    if weights is None or self.learner.weights_updated:
+                        self.learner.weights_updated = False
                         with self.timers["put_weights"]:
                             weights = ray.put(
                                 self.local_evaluator.get_weights())
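The weights_updated flag exists so the optimizer only re-serializes weights after the learner thread has actually applied a gradient step. A self-contained, hedged sketch of that gating (function and parameter names are illustrative, not from the diff):

def maybe_refresh_weights(weights, learner, local_evaluator, put):
    """Return a possibly refreshed weights handle to broadcast to workers."""
    if weights is None or learner.weights_updated:
        learner.weights_updated = False
        weights = put(local_evaluator.get_weights())  # stands in for ray.put
    return weights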
@@ -20,7 +20,7 @@ class LocalSyncReplayOptimizer(Optimizer):
             self, learning_starts=1000, buffer_size=10000,
             prioritized_replay=True, prioritized_replay_alpha=0.6,
             prioritized_replay_beta=0.4, prioritized_replay_eps=1e-6,
-            train_batch_size=32, sample_batch_size=4):
+            train_batch_size=32, sample_batch_size=4, clip_rewards=True):

         self.replay_starts = learning_starts
         self.prioritized_replay_beta = prioritized_replay_beta
@@ -37,10 +37,10 @@ class LocalSyncReplayOptimizer(Optimizer):
         # Set up replay buffer
         if prioritized_replay:
             self.replay_buffer = PrioritizedReplayBuffer(
-                buffer_size,
-                alpha=prioritized_replay_alpha)
+                buffer_size, alpha=prioritized_replay_alpha,
+                clip_rewards=clip_rewards)
         else:
-            self.replay_buffer = ReplayBuffer(buffer_size)
+            self.replay_buffer = ReplayBuffer(buffer_size, clip_rewards)

         assert buffer_size >= self.replay_starts

@@ -12,7 +12,7 @@ from ray.rllib.utils.window_stat import WindowStat


 class ReplayBuffer(object):
-    def __init__(self, size):
+    def __init__(self, size, clip_rewards):
         """Create Prioritized Replay buffer.

         Parameters
@@ -30,11 +30,15 @@ class ReplayBuffer(object):
         self._num_sampled = 0
         self._evicted_hit_stats = WindowStat("evicted_hit", 1000)
         self._est_size_bytes = 0
+        self._clip_rewards = clip_rewards

     def __len__(self):
         return len(self._storage)

     def add(self, obs_t, action, reward, obs_tp1, done, weight):
+        if self._clip_rewards:
+            reward = np.sign(reward)
+
         data = (obs_t, action, reward, obs_tp1, done)
         self._num_added += 1

@@ -103,7 +107,7 @@ class ReplayBuffer(object):


 class PrioritizedReplayBuffer(ReplayBuffer):
-    def __init__(self, size, alpha):
+    def __init__(self, size, alpha, clip_rewards):
         """Create Prioritized Replay buffer.

         Parameters
@@ -119,7 +123,7 @@ class PrioritizedReplayBuffer(ReplayBuffer):
         --------
         ReplayBuffer.__init__
         """
-        super(PrioritizedReplayBuffer, self).__init__(size)
+        super(PrioritizedReplayBuffer, self).__init__(size, clip_rewards)
         assert alpha > 0
         self._alpha = alpha

@@ -134,6 +138,9 @@ class PrioritizedReplayBuffer(ReplayBuffer):

     def add(self, obs_t, action, reward, obs_tp1, done, weight):
         """See ReplayBuffer.store_effect"""
+        if self._clip_rewards:
+            reward = np.sign(reward)
+
         idx = self._next_idx
         super(PrioritizedReplayBuffer, self).add(
             obs_t, action, reward, obs_tp1, done, weight)
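Reward clipping thus moves from the env wrapper (the removed ClippedRewardsWrapper) into the replay buffer's add path. A tiny sketch of the transform applied when clip_rewards is set:

import numpy as np

for r in (-3.5, 0.0, 12.0):
    print(r, "->", np.sign(r))   # -3.5 -> -1.0, 0.0 -> 0.0, 12.0 -> 1.0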
@@ -1,5 +1,7 @@
+# This can be expected to reach 20.8 reward within an hour when using
+# a V100 GPU (e.g. p3.2xl instance on AWS, and m4.4xl workers).
 pong-apex:
-    env: Pong-v0
+    env: PongNoFrameskip-v4
     run: APEX
     resources:
         cpu:
@@ -7,8 +9,7 @@ pong-apex:
         gpu: 1
     config:
         force_evaluators_remote: True # requires cluster
+        target_network_update_freq: 50000
         num_workers: 32
         lr: .0001
         gamma: 0.99
-        model:
-            grayscale: True
@@ -187,4 +187,4 @@ def pretty_print(result):
             out[k] = v

     cleaned = json.dumps(out, cls=_CustomEncoder)
-    return yaml.dump(json.loads(cleaned), default_flow_style=False)
+    return yaml.safe_dump(json.loads(cleaned), default_flow_style=False)
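The switch from yaml.dump to yaml.safe_dump keeps pretty-printed results restricted to plain YAML types instead of allowing arbitrary Python object tags. A quick illustration (the values are made up):

import yaml

result = {"episode_reward_mean": 20.8, "timesteps_total": 1000000}
print(yaml.safe_dump(result, default_flow_style=False))
# episode_reward_mean: 20.8
# timesteps_total: 1000000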