from collections import namedtuple
import logging
from typing import List

import numpy as np

from ray.rllib.offline.io_context import IOContext
from ray.rllib.policy import Policy
from ray.rllib.policy.sample_batch import MultiAgentBatch, SampleBatch
from ray.rllib.utils.annotations import DeveloperAPI
from ray.rllib.utils.numpy import convert_to_numpy
from ray.rllib.utils.typing import TensorType, SampleBatchType

logger = logging.getLogger(__name__)

OffPolicyEstimate = namedtuple("OffPolicyEstimate",
                               ["estimator_name", "metrics"])


@DeveloperAPI
class OffPolicyEstimator:
    """Interface for an off-policy reward estimator.

    Concrete estimators implement `estimate()`; a minimal sketch of such a
    subclass can be found at the end of this module.
    """

    @DeveloperAPI
    def __init__(self, policy: Policy, gamma: float):
        """Creates an off-policy estimator.

        Args:
            policy (Policy): Policy to evaluate.
            gamma (float): Discount factor of the MDP.
        """
        self.policy = policy
        self.gamma = gamma
        # Filled by `process()`, drained by `get_metrics()`.
        self.new_estimates = []

    @classmethod
    def create(cls, ioctx: IOContext) -> "OffPolicyEstimator":
        """Creates an off-policy estimator from an IOContext."""
        gamma = ioctx.worker.policy_config["gamma"]
        # Grab a reference to the current model.
        keys = list(ioctx.worker.policy_map.keys())
        if len(keys) > 1:
            raise NotImplementedError(
                "Off-policy estimation is not implemented for multi-agent. "
                "You can set `input_evaluation: []` to resolve this.")
        policy = ioctx.worker.get_policy(keys[0])
        return cls(policy, gamma)
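    # Note: `create()` is the hook RLlib uses to build the configured
    # estimators for a rollout worker (e.g. when `input_evaluation` lists
    # "is" or "wis").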

    @DeveloperAPI
    def estimate(self, batch: SampleBatchType):
        """Returns an estimate for the given batch of experiences.

        The batch will only contain data from one episode, but it may be only
        a fragment of that episode.
        """
        raise NotImplementedError

    @DeveloperAPI
    def action_prob(self, batch: SampleBatchType) -> np.ndarray:
        """Returns probs for the batch actions under the current policy."""

        num_state_inputs = 0
        for k in batch.keys():
            if k.startswith("state_in_"):
                num_state_inputs += 1
        state_keys = ["state_in_{}".format(i) for i in range(num_state_inputs)]
        log_likelihoods: TensorType = self.policy.compute_log_likelihoods(
            actions=batch[SampleBatch.ACTIONS],
            obs_batch=batch[SampleBatch.CUR_OBS],
            state_batches=[batch[k] for k in state_keys],
            prev_action_batch=batch.get(SampleBatch.PREV_ACTIONS),
            prev_reward_batch=batch.get(SampleBatch.PREV_REWARDS))
        log_likelihoods = convert_to_numpy(log_likelihoods)
        return np.exp(log_likelihoods)
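    # Note on `action_prob()`: it returns exp(log pi(a|s)) of the current
    # policy for the logged actions; estimators typically divide these by
    # the behavior policy's probs in `batch["action_prob"]` to get per-step
    # importance ratios.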

    @DeveloperAPI
    def process(self, batch: SampleBatchType):
        """Computes and stores an estimate for the given batch."""
        self.new_estimates.append(self.estimate(batch))

    @DeveloperAPI
    def check_can_estimate_for(self, batch: SampleBatchType):
        """Checks whether off-policy estimation (OPE) is possible for `batch`.

        Raises:
            ValueError: If the batch is a MultiAgentBatch or is missing the
                `action_prob` column.
        """

        if isinstance(batch, MultiAgentBatch):
            raise ValueError(
                "IS-estimation is not implemented for multi-agent batches. "
                "You can set `input_evaluation: []` to resolve this.")

        if "action_prob" not in batch:
            raise ValueError(
                "Off-policy estimation is not possible unless the inputs "
                "include action probabilities (i.e., the policy is stochastic "
                "and emits the 'action_prob' key). For DQN this means using "
                "`exploration_config: {type: 'SoftQ'}`. You can also set "
                "`input_evaluation: []` to disable estimation.")

    @DeveloperAPI
    def get_metrics(self) -> List[OffPolicyEstimate]:
        """Returns list of new episode metric estimates since the last call.

        Returns:
            List of OffPolicyEstimate objects.
        """
        out = self.new_estimates
        self.new_estimates = []
        return out
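

# ---------------------------------------------------------------------------
# Minimal sketch of a concrete estimator, assuming a simple step-wise
# importance-sampling scheme. Class and metric names below are illustrative
# placeholders, not part of RLlib's public API; see the estimators shipped in
# `ray.rllib.offline` for the maintained implementations.
# ---------------------------------------------------------------------------
class _SketchImportanceSampling(OffPolicyEstimator):
    """Illustrative step-wise importance-sampling estimator."""

    def estimate(self, batch: SampleBatchType) -> OffPolicyEstimate:
        self.check_can_estimate_for(batch)

        rewards = batch["rewards"]
        old_prob = batch["action_prob"]  # Behavior policy's action probs.
        new_prob = self.action_prob(batch)  # Current policy's action probs.

        # Cumulative per-step importance ratios
        # p_t = prod_{t'<=t} pi_new(a_t'|s_t') / pi_behavior(a_t'|s_t').
        p = []
        for t in range(batch.count):
            p_prev = p[t - 1] if t > 0 else 1.0
            p.append(p_prev * new_prob[t] / old_prob[t])

        # Discounted return logged under the behavior policy (baseline) vs.
        # the step-wise IS estimate of the current policy's return.
        v_old, v_new = 0.0, 0.0
        for t in range(batch.count):
            v_old += rewards[t] * self.gamma**t
            v_new += p[t] * rewards[t] * self.gamma**t

        return OffPolicyEstimate(
            "sketch_importance_sampling", {
                "V_prev": v_old,
                "V_step_IS": v_new,
                "V_gain_est": v_new / max(1e-8, v_old),
            })


# Typical lifecycle: a rollout worker calls `process()` on each (possibly
# fragmented) episode batch it reads, then periodically drains the results
# via `get_metrics()`.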