ray/rllib/policy/torch_policy.py
Sven Mika 0db2046b0a
[RLlib] Policy.compute_log_likelihoods() and SAC refactor. (issue #7107) (#7124)
* Exploration API (+EpsilonGreedy sub-class).

* Exploration API (+EpsilonGreedy sub-class).

* Cleanup/LINT.

* Add `deterministic` to generic Trainer config (NOTE: this is still ignored by most Agents).

* Add `error` option to deprecation_warning().

* WIP.

* Bug fix: Get exploration-info for tf framework.
Bug fix: Properly deprecate some DQN config keys.

* WIP.

* LINT.

* WIP.

* Split PerWorkerEpsilonGreedy out of EpsilonGreedy.
Docstrings.

* Fix bug in sampler.py in case Policy has self.exploration = None

* Update rllib/agents/dqn/dqn.py

Co-Authored-By: Eric Liang <ekhliang@gmail.com>

* WIP.

* Update rllib/agents/trainer.py

Co-Authored-By: Eric Liang <ekhliang@gmail.com>

* WIP.

* Change requests.

* LINT

* In tune/utils/util.py::deep_update() Only keep deep_updat'ing if both original and value are dicts. If value is not a dict, set

* Completely obsolete syn_replay_optimizer.py's parameters schedule_max_timesteps AND beta_annealing_fraction (replaced with prioritized_replay_beta_annealing_timesteps).

* Update rllib/evaluation/worker_set.py

Co-Authored-By: Eric Liang <ekhliang@gmail.com>

* Review fixes.

* Fix default value for DQN's exploration spec.

* LINT

* Fix recursion bug (wrong parent c'tor).

* Do not pass timestep to get_exploration_info.

* Update tf_policy.py

* Fix some remaining issues with test cases and remove more deprecated DQN/APEX exploration configs.

* Bug fix tf-action-dist

* DDPG incompatibility bug fix with new DQN exploration handling (which is imported by DDPG).

* Switch off exploration when getting action probs from off-policy-estimator's policy.

* LINT

* Fix test_checkpoint_restore.py.

* Deprecate all SAC exploration (unused) configs.

* Properly use `model.last_output()` everywhere. Instead of `model._last_output`.

* WIP.

* Take out set_epsilon from multi-agent-env test (not needed, decays anyway).

* WIP.

* Trigger re-test (flaky checkpoint-restore test).

* WIP.

* WIP.

* Add test case for deterministic action sampling in PPO.

* bug fix.

* Added deterministic test cases for different Agents.

* Fix problem with TupleActions in dynamic-tf-policy.

* Separate supported_spaces tests so they can be run separately for easier debugging.

* LINT.

* Fix autoregressive_action_dist.py test case.

* Re-test.

* Fix.

* Remove duplicate py_test rule from bazel.

* LINT.

* WIP.

* WIP.

* SAC fix.

* SAC fix.

* WIP.

* WIP.

* WIP.

* FIX 2 examples tests.

* WIP.

* WIP.

* WIP.

* WIP.

* WIP.

* Fix.

* LINT.

* Renamed test file.

* WIP.

* Add unittest.main.

* Make action_dist_class mandatory.

* fix

* FIX.

* WIP.

* WIP.

* Fix.

* Fix.

* Fix explorations test case (contextlib cannot find its own nullcontext??).

* Force torch to be installed for QMIX.

* LINT.

* Fix determine_tests_to_run.py.

* Fix determine_tests_to_run.py.

* WIP

* Add Random exploration component to tests (fixed issue with "static-graph randomness" via py_function).

* Add Random exploration component to tests (fixed issue with "static-graph randomness" via py_function).

* Rename some stuff.

* Rename some stuff.

* WIP.

* WIP.

* Fix SAC.

* Fix SAC.

* Fix strange tf-error in ray core tests.

* Fix strange ray-core tf-error in test_memory_scheduling test case.

* Fix test_io.py.

* LINT.

* Update SAC yaml files' config.

Co-authored-by: Eric Liang <ekhliang@gmail.com>
2020-02-22 14:19:49 -08:00

322 lines
12 KiB
Python

import numpy as np
import time
from ray.rllib.policy.policy import Policy, LEARNER_STATS_KEY, ACTION_PROB, \
ACTION_LOGP
from ray.rllib.policy.sample_batch import SampleBatch
from ray.rllib.utils.annotations import override, DeveloperAPI
from ray.rllib.utils.framework import try_import_torch
from ray.rllib.utils.schedules import ConstantSchedule, PiecewiseSchedule
from ray.rllib.utils.tracking_dict import UsageTrackingDict
torch, _ = try_import_torch()
class TorchPolicy(Policy):
"""Template for a PyTorch policy and loss to use with RLlib.
This is similar to TFPolicy, but for PyTorch.
Attributes:
observation_space (gym.Space): observation space of the policy.
action_space (gym.Space): action space of the policy.
config (dict): config of the policy.
model (TorchModel): Torch model instance.
dist_class (type): Torch action distribution class.
"""
def __init__(self, observation_space, action_space, config, model, loss,
action_distribution_class):
"""Build a policy from policy and loss torch modules.
Note that model will be placed on GPU device if CUDA_VISIBLE_DEVICES
is set. Only single GPU is supported for now.
Arguments:
observation_space (gym.Space): observation space of the policy.
action_space (gym.Space): action space of the policy.
config (dict): The Policy config dict.
model (nn.Module): PyTorch policy module. Given observations as
input, this module must return a list of outputs where the
first item is action logits, and the rest can be any value.
loss (func): Function that takes (policy, model, dist_class,
train_batch) and returns a single scalar loss.
action_distribution_class (ActionDistribution): Class for action
distribution.
"""
self.framework = "torch"
super().__init__(observation_space, action_space, config)
self.device = (torch.device("cuda")
if torch.cuda.is_available() else torch.device("cpu"))
self.model = model.to(self.device)
self.unwrapped_model = model # used to support DistributedDataParallel
self._loss = loss
self._optimizer = self.optimizer()
self.dist_class = action_distribution_class
# If set, means we are using distributed allreduce during learning.
self.distributed_world_size = None
@override(Policy)
def compute_actions(self,
obs_batch,
state_batches=None,
prev_action_batch=None,
prev_reward_batch=None,
info_batch=None,
episodes=None,
explore=None,
timestep=None,
**kwargs):
explore = explore if explore is not None else self.config["explore"]
with torch.no_grad():
input_dict = self._lazy_tensor_dict({
SampleBatch.CUR_OBS: obs_batch,
})
if prev_action_batch:
input_dict[SampleBatch.PREV_ACTIONS] = prev_action_batch
if prev_reward_batch:
input_dict[SampleBatch.PREV_REWARDS] = prev_reward_batch
state_batches = [self._convert_to_tensor(s) for s in state_batches]
model_out = self.model(input_dict, state_batches,
self._convert_to_tensor([1]))
logits, state = model_out
action_dist = None
actions, logp = \
self.exploration.get_exploration_action(
logits, self.dist_class, self.model, explore,
timestep if timestep is not None else
self.global_timestep)
input_dict[SampleBatch.ACTIONS] = actions
extra_action_out = self.extra_action_out(input_dict, state_batches,
self.model, action_dist)
if logp is not None:
extra_action_out.update({
ACTION_PROB: torch.exp(logp),
ACTION_LOGP: logp
})
return (actions.cpu().numpy(), [h.cpu().numpy() for h in state],
extra_action_out)
@override(Policy)
def compute_log_likelihoods(self,
actions,
obs_batch,
state_batches=None,
prev_action_batch=None,
prev_reward_batch=None):
with torch.no_grad():
input_dict = self._lazy_tensor_dict({
SampleBatch.CUR_OBS: obs_batch,
SampleBatch.ACTIONS: actions
})
if prev_action_batch:
input_dict[SampleBatch.PREV_ACTIONS] = prev_action_batch
if prev_reward_batch:
input_dict[SampleBatch.PREV_REWARDS] = prev_reward_batch
parameters, _ = self.model(input_dict, state_batches, [1])
action_dist = self.dist_class(parameters, self.model)
log_likelihoods = action_dist.logp(input_dict[SampleBatch.ACTIONS])
return log_likelihoods
@override(Policy)
def learn_on_batch(self, postprocessed_batch):
train_batch = self._lazy_tensor_dict(postprocessed_batch)
loss_out = self._loss(self, self.model, self.dist_class, train_batch)
self._optimizer.zero_grad()
loss_out.backward()
info = {}
info.update(self.extra_grad_process())
if self.distributed_world_size:
grads = []
for p in self.model.parameters():
if p.grad is not None:
grads.append(p.grad)
start = time.time()
if torch.cuda.is_available():
# Sadly, allreduce_coalesced does not work with CUDA yet.
for g in grads:
torch.distributed.all_reduce(
g, op=torch.distributed.ReduceOp.SUM)
else:
torch.distributed.all_reduce_coalesced(
grads, op=torch.distributed.ReduceOp.SUM)
for p in self.model.parameters():
if p.grad is not None:
p.grad /= self.distributed_world_size
info["allreduce_latency"] = time.time() - start
self._optimizer.step()
info.update(self.extra_grad_info(train_batch))
return {LEARNER_STATS_KEY: info}
@override(Policy)
def compute_gradients(self, postprocessed_batch):
train_batch = self._lazy_tensor_dict(postprocessed_batch)
loss_out = self._loss(self, self.model, self.dist_class, train_batch)
self._optimizer.zero_grad()
loss_out.backward()
grad_process_info = self.extra_grad_process()
# Note that return values are just references;
# calling zero_grad will modify the values
grads = []
for p in self.model.parameters():
if p.grad is not None:
grads.append(p.grad.data.cpu().numpy())
else:
grads.append(None)
grad_info = self.extra_grad_info(train_batch)
grad_info.update(grad_process_info)
return grads, {LEARNER_STATS_KEY: grad_info}
@override(Policy)
def apply_gradients(self, gradients):
for g, p in zip(gradients, self.model.parameters()):
if g is not None:
p.grad = torch.from_numpy(g).to(self.device)
self._optimizer.step()
@override(Policy)
def get_weights(self):
return {k: v.cpu() for k, v in self.model.state_dict().items()}
@override(Policy)
def set_weights(self, weights):
self.model.load_state_dict(weights)
@override(Policy)
def is_recurrent(self):
return len(self.model.get_initial_state()) > 0
@override(Policy)
def num_state_tensors(self):
return len(self.model.get_initial_state())
@override(Policy)
def get_initial_state(self):
return [s.numpy() for s in self.model.get_initial_state()]
def extra_grad_process(self):
"""Allow subclass to do extra processing on gradients and
return processing info."""
return {}
def extra_action_out(self,
input_dict,
state_batches,
model,
action_dist=None):
"""Returns dict of extra info to include in experience batch.
Arguments:
input_dict (dict): Dict of model input tensors.
state_batches (list): List of state tensors.
model (TorchModelV2): Reference to the model.
action_dist (Distribution): Torch Distribution object to get
log-probs (e.g. for already sampled actions).
"""
return {}
def extra_grad_info(self, train_batch):
"""Return dict of extra grad info."""
return {}
def optimizer(self):
"""Custom PyTorch optimizer to use."""
if hasattr(self, "config"):
return torch.optim.Adam(
self.model.parameters(), lr=self.config["lr"])
else:
return torch.optim.Adam(self.model.parameters())
def _lazy_tensor_dict(self, postprocessed_batch):
train_batch = UsageTrackingDict(postprocessed_batch)
train_batch.set_get_interceptor(self._convert_to_tensor)
return train_batch
def _convert_to_tensor(self, arr):
if torch.is_tensor(arr):
return arr.to(self.device)
tensor = torch.from_numpy(np.asarray(arr))
if tensor.dtype == torch.double:
tensor = tensor.float()
return tensor.to(self.device)
@override(Policy)
def export_model(self, export_dir):
"""TODO: implement for torch.
"""
raise NotImplementedError
@override(Policy)
def export_checkpoint(self, export_dir):
"""TODO: implement for torch.
"""
raise NotImplementedError
@DeveloperAPI
class LearningRateSchedule:
"""Mixin for TFPolicy that adds a learning rate schedule."""
@DeveloperAPI
def __init__(self, lr, lr_schedule):
self.cur_lr = lr
if lr_schedule is None:
self.lr_schedule = ConstantSchedule(lr)
else:
self.lr_schedule = PiecewiseSchedule(
lr_schedule, outside_value=lr_schedule[-1][-1])
@override(Policy)
def on_global_var_update(self, global_vars):
super(LearningRateSchedule, self).on_global_var_update(global_vars)
self.cur_lr = self.lr_schedule.value(global_vars["timestep"])
@override(TorchPolicy)
def optimizer(self):
for p in self._optimizer.param_groups:
p["lr"] = self.cur_lr
return self._optimizer
@DeveloperAPI
class EntropyCoeffSchedule:
"""Mixin for TorchPolicy that adds entropy coeff decay."""
@DeveloperAPI
def __init__(self, entropy_coeff, entropy_coeff_schedule):
self.entropy_coeff = entropy_coeff
if entropy_coeff_schedule is None:
self.entropy_coeff_schedule = ConstantSchedule(entropy_coeff)
else:
# Allows for custom schedule similar to lr_schedule format
if isinstance(entropy_coeff_schedule, list):
self.entropy_coeff_schedule = PiecewiseSchedule(
entropy_coeff_schedule,
outside_value=entropy_coeff_schedule[-1][-1])
else:
# Implements previous version but enforces outside_value
self.entropy_coeff_schedule = PiecewiseSchedule(
[[0, entropy_coeff], [entropy_coeff_schedule, 0.0]],
outside_value=0.0)
@override(Policy)
def on_global_var_update(self, global_vars):
super(EntropyCoeffSchedule, self).on_global_var_update(global_vars)
self.entropy_coeff = self.entropy_coeff_schedule.value(
global_vars["timestep"])