ray/rllib/offline/off_policy_estimator.py
Sven Mika 0db2046b0a
[RLlib] Policy.compute_log_likelihoods() and SAC refactor. (issue #7107) (#7124)
* Exploration API (+EpsilonGreedy sub-class).

* Exploration API (+EpsilonGreedy sub-class).

* Cleanup/LINT.

* Add `deterministic` to generic Trainer config (NOTE: this is still ignored by most Agents).

* Add `error` option to deprecation_warning().

* WIP.

* Bug fix: Get exploration-info for tf framework.
Bug fix: Properly deprecate some DQN config keys.

* WIP.

* LINT.

* WIP.

* Split PerWorkerEpsilonGreedy out of EpsilonGreedy.
Docstrings.

* Fix bug in sampler.py in case Policy has self.exploration = None

* Update rllib/agents/dqn/dqn.py

Co-Authored-By: Eric Liang <ekhliang@gmail.com>

* WIP.

* Update rllib/agents/trainer.py

Co-Authored-By: Eric Liang <ekhliang@gmail.com>

* WIP.

* Change requests.

* LINT

* In tune/utils/util.py::deep_update() Only keep deep_updat'ing if both original and value are dicts. If value is not a dict, set

* Completely obsolete syn_replay_optimizer.py's parameters schedule_max_timesteps AND beta_annealing_fraction (replaced with prioritized_replay_beta_annealing_timesteps).

* Update rllib/evaluation/worker_set.py

Co-Authored-By: Eric Liang <ekhliang@gmail.com>

* Review fixes.

* Fix default value for DQN's exploration spec.

* LINT

* Fix recursion bug (wrong parent c'tor).

* Do not pass timestep to get_exploration_info.

* Update tf_policy.py

* Fix some remaining issues with test cases and remove more deprecated DQN/APEX exploration configs.

* Bug fix tf-action-dist

* DDPG incompatibility bug fix with new DQN exploration handling (which is imported by DDPG).

* Switch off exploration when getting action probs from off-policy-estimator's policy.

* LINT

* Fix test_checkpoint_restore.py.

* Deprecate all SAC exploration (unused) configs.

* Properly use `model.last_output()` everywhere. Instead of `model._last_output`.

* WIP.

* Take out set_epsilon from multi-agent-env test (not needed, decays anyway).

* WIP.

* Trigger re-test (flaky checkpoint-restore test).

* WIP.

* WIP.

* Add test case for deterministic action sampling in PPO.

* bug fix.

* Added deterministic test cases for different Agents.

* Fix problem with TupleActions in dynamic-tf-policy.

* Separate supported_spaces tests so they can be run separately for easier debugging.

* LINT.

* Fix autoregressive_action_dist.py test case.

* Re-test.

* Fix.

* Remove duplicate py_test rule from bazel.

* LINT.

* WIP.

* WIP.

* SAC fix.

* SAC fix.

* WIP.

* WIP.

* WIP.

* FIX 2 examples tests.

* WIP.

* WIP.

* WIP.

* WIP.

* WIP.

* Fix.

* LINT.

* Renamed test file.

* WIP.

* Add unittest.main.

* Make action_dist_class mandatory.

* fix

* FIX.

* WIP.

* WIP.

* Fix.

* Fix.

* Fix explorations test case (contextlib cannot find its own nullcontext??).

* Force torch to be installed for QMIX.

* LINT.

* Fix determine_tests_to_run.py.

* Fix determine_tests_to_run.py.

* WIP

* Add Random exploration component to tests (fixed issue with "static-graph randomness" via py_function).

* Add Random exploration component to tests (fixed issue with "static-graph randomness" via py_function).

* Rename some stuff.

* Rename some stuff.

* WIP.

* WIP.

* Fix SAC.

* Fix SAC.

* Fix strange tf-error in ray core tests.

* Fix strange ray-core tf-error in test_memory_scheduling test case.

* Fix test_io.py.

* LINT.

* Update SAC yaml files' config.

Co-authored-by: Eric Liang <ekhliang@gmail.com>
2020-02-22 14:19:49 -08:00

98 lines
3.4 KiB
Python

from collections import namedtuple
import logging
from ray.rllib.policy.sample_batch import MultiAgentBatch, SampleBatch
from ray.rllib.utils.annotations import DeveloperAPI
logger = logging.getLogger(__name__)
OffPolicyEstimate = namedtuple("OffPolicyEstimate",
["estimator_name", "metrics"])
@DeveloperAPI
class OffPolicyEstimator:
"""Interface for an off policy reward estimator."""
@DeveloperAPI
def __init__(self, policy, gamma):
"""Creates an off-policy estimator.
Arguments:
policy (Policy): Policy to evaluate.
gamma (float): Discount of the MDP.
"""
self.policy = policy
self.gamma = gamma
self.new_estimates = []
@classmethod
def create(cls, ioctx):
"""Create an off-policy estimator from a IOContext."""
gamma = ioctx.worker.policy_config["gamma"]
# Grab a reference to the current model
keys = list(ioctx.worker.policy_map.keys())
if len(keys) > 1:
raise NotImplementedError(
"Off-policy estimation is not implemented for multi-agent. "
"You can set `input_evaluation: []` to resolve this.")
policy = ioctx.worker.get_policy(keys[0])
return cls(policy, gamma)
@DeveloperAPI
def estimate(self, batch):
"""Returns an estimate for the given batch of experiences.
The batch will only contain data from one episode, but it may only be
a fragment of an episode.
"""
raise NotImplementedError
@DeveloperAPI
def action_prob(self, batch):
"""Returns the probs for the batch actions for the current policy."""
num_state_inputs = 0
for k in batch.keys():
if k.startswith("state_in_"):
num_state_inputs += 1
state_keys = ["state_in_{}".format(i) for i in range(num_state_inputs)]
log_likelihoods = self.policy.compute_log_likelihoods(
actions=batch[SampleBatch.ACTIONS],
obs_batch=batch[SampleBatch.CUR_OBS],
state_batches=[batch[k] for k in state_keys],
prev_action_batch=batch.data.get(SampleBatch.PREV_ACTIONS),
prev_reward_batch=batch.data.get(SampleBatch.PREV_REWARDS))
return log_likelihoods
@DeveloperAPI
def process(self, batch):
self.new_estimates.append(self.estimate(batch))
@DeveloperAPI
def check_can_estimate_for(self, batch):
"""Returns whether we can support OPE for this batch."""
if isinstance(batch, MultiAgentBatch):
raise ValueError(
"IS-estimation is not implemented for multi-agent batches. "
"You can set `input_evaluation: []` to resolve this.")
if "action_prob" not in batch:
raise ValueError(
"Off-policy estimation is not possible unless the inputs "
"include action probabilities (i.e., the policy is stochastic "
"and emits the 'action_prob' key). For DQN this means using "
"`soft_q: True`. You can also set `input_evaluation: []` to "
"disable estimation.")
@DeveloperAPI
def get_metrics(self):
"""Return a list of new episode metric estimates since the last call.
Returns:
list of OffPolicyEstimate objects.
"""
out = self.new_estimates
self.new_estimates = []
return out