ray/rllib/utils/tf_utils.py

570 lines
20 KiB
Python

import gym
from gym.spaces import Discrete, MultiDiscrete
import logging
import numpy as np
import tree # pip install dm_tree
from typing import Any, Callable, List, Optional, Type, TYPE_CHECKING, Union
from ray.rllib.utils.annotations import PublicAPI, DeveloperAPI
from ray.rllib.utils.framework import try_import_tf
from ray.rllib.utils.spaces.space_utils import get_base_struct_from_space
from ray.rllib.utils.typing import (
LocalOptimizer,
ModelGradients,
PartialAlgorithmConfigDict,
SpaceStruct,
TensorStructType,
TensorType,
)
if TYPE_CHECKING:
from ray.rllib.policy.tf_policy import TFPolicy
logger = logging.getLogger(__name__)
tf1, tf, tfv = try_import_tf()
@PublicAPI
def explained_variance(y: TensorType, pred: TensorType) -> TensorType:
"""Computes the explained variance for a pair of labels and predictions.
The formula used is:
max(-1.0, 1.0 - (std(y - pred)^2 / std(y)^2))
Args:
y: The labels.
pred: The predictions.
Returns:
The explained variance given a pair of labels and predictions.
"""
_, y_var = tf.nn.moments(y, axes=[0])
_, diff_var = tf.nn.moments(y - pred, axes=[0])
return tf.maximum(-1.0, 1 - (diff_var / y_var))
@PublicAPI
def flatten_inputs_to_1d_tensor(
inputs: TensorStructType,
spaces_struct: Optional[SpaceStruct] = None,
time_axis: bool = False,
) -> TensorType:
"""Flattens arbitrary input structs according to the given spaces struct.
Returns a single 1D tensor resulting from the different input
components' values.
Thereby:
- Boxes (any shape) get flattened to (B, [T]?, -1). Note that image boxes
are not treated differently from other types of Boxes and get
flattened as well.
- Discrete (int) values are one-hot'd, e.g. a batch of [1, 0, 3] (B=3 with
Discrete(4) space) results in [[0, 1, 0, 0], [1, 0, 0, 0], [0, 0, 0, 1]].
- MultiDiscrete values are multi-one-hot'd, e.g. a batch of
[[0, 2], [1, 4]] (B=2 with MultiDiscrete([2, 5]) space) results in
[[1, 0, 0, 0, 1, 0, 0], [0, 1, 0, 0, 0, 0, 1]].
Args:
inputs: The inputs to be flattened.
spaces_struct: The structure of the spaces that behind the input
time_axis: Whether all inputs have a time-axis (after the batch axis).
If True, will keep not only the batch axis (0th), but the time axis
(1st) as-is and flatten everything from the 2nd axis up.
Returns:
A single 1D tensor resulting from concatenating all
flattened/one-hot'd input components. Depending on the time_axis flag,
the shape is (B, n) or (B, T, n).
Examples:
>>> # B=2
>>> from ray.rllib.utils.tf_utils import flatten_inputs_to_1d_tensor
>>> from gym.spaces import Discrete, Box
>>> out = flatten_inputs_to_1d_tensor( # doctest: +SKIP
... {"a": [1, 0], "b": [[[0.0], [0.1]], [1.0], [1.1]]},
... spaces_struct=dict(a=Discrete(2), b=Box(shape=(2, 1)))
... ) # doctest: +SKIP
>>> print(out) # doctest: +SKIP
[[0.0, 1.0, 0.0, 0.1], [1.0, 0.0, 1.0, 1.1]] # B=2 n=4
>>> # B=2; T=2
>>> out = flatten_inputs_to_1d_tensor( # doctest: +SKIP
... ([[1, 0], [0, 1]],
... [[[0.0, 0.1], [1.0, 1.1]], [[2.0, 2.1], [3.0, 3.1]]]),
... spaces_struct=tuple([Discrete(2), Box(shape=(2, ))]),
... time_axis=True
... ) # doctest: +SKIP
>>> print(out) # doctest: +SKIP
[[[0.0, 1.0, 0.0, 0.1], [1.0, 0.0, 1.0, 1.1]],\
[[1.0, 0.0, 2.0, 2.1], [0.0, 1.0, 3.0, 3.1]]] # B=2 T=2 n=4
"""
flat_inputs = tree.flatten(inputs)
flat_spaces = (
tree.flatten(spaces_struct)
if spaces_struct is not None
else [None] * len(flat_inputs)
)
B = None
T = None
out = []
for input_, space in zip(flat_inputs, flat_spaces):
input_ = tf.convert_to_tensor(input_)
shape = tf.shape(input_)
# Store batch and (if applicable) time dimension.
if B is None:
B = shape[0]
if time_axis:
T = shape[1]
# One-hot encoding.
if isinstance(space, Discrete):
if time_axis:
input_ = tf.reshape(input_, [B * T])
out.append(tf.cast(one_hot(input_, space), tf.float32))
elif isinstance(space, MultiDiscrete):
if time_axis:
input_ = tf.reshape(input_, [B * T, -1])
out.append(tf.cast(one_hot(input_, space), tf.float32))
# Flatten.
else:
if time_axis:
input_ = tf.reshape(input_, [B * T, -1])
else:
input_ = tf.reshape(input_, [B, -1])
out.append(tf.cast(input_, tf.float32))
merged = tf.concat(out, axis=-1)
# Restore the time-dimension, if applicable.
if time_axis:
merged = tf.reshape(merged, [B, T, -1])
return merged
@PublicAPI
def get_gpu_devices() -> List[str]:
"""Returns a list of GPU device names, e.g. ["/gpu:0", "/gpu:1"].
Supports both tf1.x and tf2.x.
Returns:
List of GPU device names (str).
"""
if tfv == 1:
from tensorflow.python.client import device_lib
devices = device_lib.list_local_devices()
else:
try:
devices = tf.config.list_physical_devices()
except Exception:
devices = tf.config.experimental.list_physical_devices()
# Expect "GPU", but also stuff like: "XLA_GPU".
return [d.name for d in devices if "GPU" in d.device_type]
@PublicAPI
def get_placeholder(
*,
space: Optional[gym.Space] = None,
value: Optional[Any] = None,
name: Optional[str] = None,
time_axis: bool = False,
flatten: bool = True
) -> "tf1.placeholder":
"""Returns a tf1.placeholder object given optional hints, such as a space.
Note that the returned placeholder will always have a leading batch
dimension (None).
Args:
space: An optional gym.Space to hint the shape and dtype of the
placeholder.
value: An optional value to hint the shape and dtype of the
placeholder.
name: An optional name for the placeholder.
time_axis: Whether the placeholder should also receive a time
dimension (None).
flatten: Whether to flatten the given space into a plain Box space
and then create the placeholder from the resulting space.
Returns:
The tf1 placeholder.
"""
from ray.rllib.models.catalog import ModelCatalog
if space is not None:
if isinstance(space, (gym.spaces.Dict, gym.spaces.Tuple)):
if flatten:
return ModelCatalog.get_action_placeholder(space, None)
else:
return tree.map_structure_with_path(
lambda path, component: get_placeholder(
space=component,
name=name + "." + ".".join([str(p) for p in path]),
),
get_base_struct_from_space(space),
)
return tf1.placeholder(
shape=(None,) + ((None,) if time_axis else ()) + space.shape,
dtype=tf.float32 if space.dtype == np.float64 else space.dtype,
name=name,
)
else:
assert value is not None
shape = value.shape[1:]
return tf1.placeholder(
shape=(None,)
+ ((None,) if time_axis else ())
+ (shape if isinstance(shape, tuple) else tuple(shape.as_list())),
dtype=tf.float32 if value.dtype == np.float64 else value.dtype,
name=name,
)
@PublicAPI
def get_tf_eager_cls_if_necessary(
orig_cls: Type["TFPolicy"], config: PartialAlgorithmConfigDict
) -> Type["TFPolicy"]:
"""Returns the corresponding tf-eager class for a given TFPolicy class.
Args:
orig_cls: The original TFPolicy class to get the corresponding tf-eager
class for.
config: The Algorithm config dict.
Returns:
The tf eager policy class corresponding to the given TFPolicy class.
"""
cls = orig_cls
framework = config.get("framework", "tf")
if framework in ["tf2", "tf", "tfe"] and not tf1:
raise ImportError("Could not import tensorflow!")
if framework in ["tf2", "tfe"]:
assert tf1.executing_eagerly()
from ray.rllib.policy.tf_policy import TFPolicy
from ray.rllib.policy.eager_tf_policy import EagerTFPolicy
from ray.rllib.policy.eager_tf_policy_v2 import EagerTFPolicyV2
# Create eager-class (if not already one).
if hasattr(orig_cls, "as_eager") and not issubclass(orig_cls, EagerTFPolicy):
cls = orig_cls.as_eager()
# Could be some other type of policy or already
# eager-ized.
elif not issubclass(orig_cls, TFPolicy):
pass
else:
raise ValueError(
"This policy does not support eager execution: {}".format(orig_cls)
)
# Now that we know, policy is an eager one, add tracing, if necessary.
if config.get("eager_tracing") and issubclass(
cls, (EagerTFPolicy, EagerTFPolicyV2)
):
cls = cls.with_tracing()
return cls
@PublicAPI
def huber_loss(x: TensorType, delta: float = 1.0) -> TensorType:
"""Computes the huber loss for a given term and delta parameter.
Reference: https://en.wikipedia.org/wiki/Huber_loss
Note that the factor of 0.5 is implicitly included in the calculation.
Formula:
L = 0.5 * x^2 for small abs x (delta threshold)
L = delta * (abs(x) - 0.5*delta) for larger abs x (delta threshold)
Args:
x: The input term, e.g. a TD error.
delta: The delta parmameter in the above formula.
Returns:
The Huber loss resulting from `x` and `delta`.
"""
return tf.where(
tf.abs(x) < delta, # for small x -> apply the Huber correction
tf.math.square(x) * 0.5,
delta * (tf.abs(x) - 0.5 * delta),
)
@PublicAPI
def make_tf_callable(
session_or_none: Optional["tf1.Session"], dynamic_shape: bool = False
) -> Callable:
"""Returns a function that can be executed in either graph or eager mode.
The function must take only positional args.
If eager is enabled, this will act as just a function. Otherwise, it
will build a function that executes a session run with placeholders
internally.
Args:
session_or_none: tf.Session if in graph mode, else None.
dynamic_shape: True if the placeholders should have a dynamic
batch dimension. Otherwise they will be fixed shape.
Returns:
A function that can be called in either eager or static-graph mode.
"""
if tf.executing_eagerly():
assert session_or_none is None
else:
assert session_or_none is not None
def make_wrapper(fn):
# Static-graph mode: Create placeholders and make a session call each
# time the wrapped function is called. Returns the output of this
# session call.
if session_or_none is not None:
args_placeholders = []
kwargs_placeholders = {}
symbolic_out = [None]
def call(*args, **kwargs):
args_flat = []
for a in args:
if type(a) is list:
args_flat.extend(a)
else:
args_flat.append(a)
args = args_flat
# We have not built any placeholders yet: Do this once here,
# then reuse the same placeholders each time we call this
# function again.
if symbolic_out[0] is None:
with session_or_none.graph.as_default():
def _create_placeholders(path, value):
if dynamic_shape:
if len(value.shape) > 0:
shape = (None,) + value.shape[1:]
else:
shape = ()
else:
shape = value.shape
return tf1.placeholder(
dtype=value.dtype,
shape=shape,
name=".".join([str(p) for p in path]),
)
placeholders = tree.map_structure_with_path(
_create_placeholders, args
)
for ph in tree.flatten(placeholders):
args_placeholders.append(ph)
placeholders = tree.map_structure_with_path(
_create_placeholders, kwargs
)
for k, ph in placeholders.items():
kwargs_placeholders[k] = ph
symbolic_out[0] = fn(*args_placeholders, **kwargs_placeholders)
feed_dict = dict(zip(args_placeholders, tree.flatten(args)))
tree.map_structure(
lambda ph, v: feed_dict.__setitem__(ph, v),
kwargs_placeholders,
kwargs,
)
ret = session_or_none.run(symbolic_out[0], feed_dict)
return ret
return call
# Eager mode (call function as is).
else:
return fn
return make_wrapper
@PublicAPI
def minimize_and_clip(
optimizer: LocalOptimizer,
objective: TensorType,
var_list: List["tf.Variable"],
clip_val: float = 10.0,
) -> ModelGradients:
"""Computes, then clips gradients using objective, optimizer and var list.
Ensures the norm of the gradients for each variable is clipped to
`clip_val`.
Args:
optimizer: Either a shim optimizer (tf eager) containing a
tf.GradientTape under `self.tape` or a tf1 local optimizer
object.
objective: The loss tensor to calculate gradients on.
var_list: The list of tf.Variables to compute gradients over.
clip_val: The global norm clip value. Will clip around -clip_val and
+clip_val.
Returns:
The resulting model gradients (list or tuples of grads + vars)
corresponding to the input `var_list`.
"""
# Accidentally passing values < 0.0 will break all gradients.
assert clip_val is None or clip_val > 0.0, clip_val
if tf.executing_eagerly():
tape = optimizer.tape
grads_and_vars = list(zip(list(tape.gradient(objective, var_list)), var_list))
else:
grads_and_vars = optimizer.compute_gradients(objective, var_list=var_list)
return [
(tf.clip_by_norm(g, clip_val) if clip_val is not None else g, v)
for (g, v) in grads_and_vars
if g is not None
]
@PublicAPI
def one_hot(x: TensorType, space: gym.Space) -> TensorType:
"""Returns a one-hot tensor, given and int tensor and a space.
Handles the MultiDiscrete case as well.
Args:
x: The input tensor.
space: The space to use for generating the one-hot tensor.
Returns:
The resulting one-hot tensor.
Raises:
ValueError: If the given space is not a discrete one.
Examples:
>>> import gym
>>> import tensorflow as tf
>>> from ray.rllib.utils.tf_utils import one_hot
>>> x = tf.Variable([0, 3], dtype=tf.int32) # batch-dim=2
>>> # Discrete space with 4 (one-hot) slots per batch item.
>>> s = gym.spaces.Discrete(4)
>>> one_hot(x, s) # doctest: +SKIP
<tf.Tensor 'one_hot:0' shape=(2, 4) dtype=float32>
>>> x = tf.Variable([[0, 1, 2, 3]], dtype=tf.int32) # batch-dim=1
>>> # MultiDiscrete space with 5 + 4 + 4 + 7 = 20 (one-hot) slots
>>> # per batch item.
>>> s = gym.spaces.MultiDiscrete([5, 4, 4, 7])
>>> one_hot(x, s) # doctest: +SKIP
<tf.Tensor 'concat:0' shape=(1, 20) dtype=float32>
"""
if isinstance(space, Discrete):
return tf.one_hot(x, space.n, dtype=tf.float32)
elif isinstance(space, MultiDiscrete):
if isinstance(space.nvec[0], np.ndarray):
nvec = np.ravel(space.nvec)
x = tf.reshape(x, (x.shape[0], -1))
else:
nvec = space.nvec
return tf.concat(
[tf.one_hot(x[:, i], n, dtype=tf.float32) for i, n in enumerate(nvec)],
axis=-1,
)
else:
raise ValueError("Unsupported space for `one_hot`: {}".format(space))
@PublicAPI
def reduce_mean_ignore_inf(x: TensorType, axis: Optional[int] = None) -> TensorType:
"""Same as tf.reduce_mean() but ignores -inf values.
Args:
x: The input tensor to reduce mean over.
axis: The axis over which to reduce. None for all axes.
Returns:
The mean reduced inputs, ignoring inf values.
"""
mask = tf.not_equal(x, tf.float32.min)
x_zeroed = tf.where(mask, x, tf.zeros_like(x))
return tf.math.reduce_sum(x_zeroed, axis) / tf.math.reduce_sum(
tf.cast(mask, tf.float32), axis
)
@PublicAPI
def scope_vars(
scope: Union[str, "tf1.VariableScope"], trainable_only: bool = False
) -> List["tf.Variable"]:
"""Get variables inside a given scope.
Args:
scope: Scope in which the variables reside.
trainable_only: Whether or not to return only the variables that were
marked as trainable.
Returns:
The list of variables in the given `scope`.
"""
return tf1.get_collection(
tf1.GraphKeys.TRAINABLE_VARIABLES
if trainable_only
else tf1.GraphKeys.VARIABLES,
scope=scope if isinstance(scope, str) else scope.name,
)
@PublicAPI
def zero_logps_from_actions(actions: TensorStructType) -> TensorType:
"""Helper function useful for returning dummy logp's (0) for some actions.
Args:
actions: The input actions. This can be any struct
of complex action components or a simple tensor of different
dimensions, e.g. [B], [B, 2], or {"a": [B, 4, 5], "b": [B]}.
Returns:
A 1D tensor of 0.0 (dummy logp's) matching the batch
dim of `actions` (shape=[B]).
"""
# Need to flatten `actions` in case we have a complex action space.
# Take the 0th component to extract the batch dim.
action_component = tree.flatten(actions)[0]
logp_ = tf.zeros_like(action_component, dtype=tf.float32)
# Logp's should be single values (but with the same batch dim as
# `deterministic_actions` or `stochastic_actions`). In case
# actions are just [B], zeros_like works just fine here, but if
# actions are [B, ...], we have to reduce logp back to just [B].
while len(logp_.shape) > 1:
logp_ = logp_[:, 0]
return logp_
@DeveloperAPI
def warn_if_infinite_kl_divergence(
policy: Type["TFPolicy"], mean_kl_loss: TensorType
) -> None:
def print_warning():
logger.warning(
"KL divergence is non-finite, this will likely destabilize your model and"
" the training process. Action(s) in a specific state have near-zero"
" probability. This can happen naturally in deterministic environments"
" where the optimal policy has zero mass for a specific action. To fix this"
" issue, consider setting the coefficient for the KL loss term to zero or"
" increasing policy entropy."
)
return tf.constant(0.0)
if policy.loss_initialized():
tf.cond(
tf.math.is_inf(mean_kl_loss),
false_fn=lambda: tf.constant(0.0),
true_fn=lambda: print_warning(),
)