ray/rllib/utils/tf_utils.py

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

571 lines
20 KiB
Python
Raw Permalink Normal View History

import gym
from gym.spaces import Discrete, MultiDiscrete
import logging
import numpy as np
import tree # pip install dm_tree
from typing import Any, Callable, List, Optional, Type, TYPE_CHECKING, Union
from ray.rllib.utils.annotations import PublicAPI, DeveloperAPI
from ray.rllib.utils.framework import try_import_tf
from ray.rllib.utils.spaces.space_utils import get_base_struct_from_space
from ray.rllib.utils.typing import (
LocalOptimizer,
ModelGradients,
PartialAlgorithmConfigDict,
SpaceStruct,
TensorStructType,
TensorType,
)
if TYPE_CHECKING:
from ray.rllib.policy.tf_policy import TFPolicy
logger = logging.getLogger(__name__)
tf1, tf, tfv = try_import_tf()
@PublicAPI
def explained_variance(y: TensorType, pred: TensorType) -> TensorType:
    """Computes the explained variance for a pair of labels and predictions.

    The formula used is:
    max(-1.0, 1.0 - (std(y - pred)^2 / std(y)^2))

    Args:
        y: The labels.
        pred: The predictions.

    Returns:
        The explained variance given a pair of labels and predictions.
    """
    _, label_var = tf.nn.moments(y, axes=[0])
    _, residual_var = tf.nn.moments(y - pred, axes=[0])
    # Clip from below at -1.0 so that arbitrarily bad predictions all map
    # to the same worst-case score.
    return tf.maximum(-1.0, 1 - (residual_var / label_var))
@PublicAPI
def flatten_inputs_to_1d_tensor(
    inputs: TensorStructType,
    spaces_struct: Optional[SpaceStruct] = None,
    time_axis: bool = False,
) -> TensorType:
    """Flattens an arbitrary input struct into a single concatenated tensor.

    Per input component:
    - Box values (any shape, incl. image Boxes) get flattened to
      (B, [T,] -1).
    - Discrete (int) values are one-hot encoded, e.g. a batch of [1, 0, 3]
      (B=3 with Discrete(4) space) results in
      [[0, 1, 0, 0], [1, 0, 0, 0], [0, 0, 0, 1]].
    - MultiDiscrete values are multi-one-hot encoded, e.g. a batch of
      [[0, 2], [1, 4]] (B=2 with MultiDiscrete([2, 5]) space) results in
      [[1, 0, 0, 0, 1, 0, 0], [0, 1, 0, 0, 0, 0, 1]].

    Args:
        inputs: The (possibly nested) inputs to be flattened.
        spaces_struct: The struct of spaces matching `inputs`; decides the
            per-component treatment (one-hot vs plain flatten). If None,
            every component is treated as a plain (Box-like) tensor.
        time_axis: Whether all inputs carry a time axis (right after the
            batch axis). If True, both the batch (0th) and time (1st) axes
            are kept as-is and everything from the 2nd axis up is flattened.

    Returns:
        A single tensor resulting from concatenating all
        flattened/one-hot'd components; shape is (B, n), or (B, T, n) if
        `time_axis` is True.
    """
    components = tree.flatten(inputs)
    if spaces_struct is not None:
        spaces = tree.flatten(spaces_struct)
    else:
        spaces = [None] * len(components)

    batch_dim = None
    time_dim = None
    flat_outputs = []
    for component, space in zip(components, spaces):
        component = tf.convert_to_tensor(component)
        dyn_shape = tf.shape(component)
        # Capture batch (and, if applicable, time) dims from the first
        # component; all components are assumed to share them.
        if batch_dim is None:
            batch_dim = dyn_shape[0]
            if time_axis:
                time_dim = dyn_shape[1]
        if isinstance(space, Discrete):
            # One-hot: fold time into batch first, if applicable.
            if time_axis:
                component = tf.reshape(component, [batch_dim * time_dim])
            flat_outputs.append(tf.cast(one_hot(component, space), tf.float32))
        elif isinstance(space, MultiDiscrete):
            # Multi-one-hot: fold time into batch first, if applicable.
            if time_axis:
                component = tf.reshape(component, [batch_dim * time_dim, -1])
            flat_outputs.append(tf.cast(one_hot(component, space), tf.float32))
        else:
            # Plain flatten (Boxes, incl. images, and unknown spaces).
            target_shape = (
                [batch_dim * time_dim, -1] if time_axis else [batch_dim, -1]
            )
            component = tf.reshape(component, target_shape)
            flat_outputs.append(tf.cast(component, tf.float32))

    merged = tf.concat(flat_outputs, axis=-1)
    # Unfold the time dimension again, if applicable.
    if time_axis:
        merged = tf.reshape(merged, [batch_dim, time_dim, -1])
    return merged
@PublicAPI
def get_gpu_devices() -> List[str]:
    """Returns a list of GPU device names, e.g. ["/gpu:0", "/gpu:1"].

    Supports both tf1.x and tf2.x.

    Returns:
        List of GPU device names (str).
    """
    if tfv == 1:
        from tensorflow.python.client import device_lib

        local_devices = device_lib.list_local_devices()
    else:
        try:
            local_devices = tf.config.list_physical_devices()
        except Exception:
            # Older tf2 versions only expose the experimental API.
            local_devices = tf.config.experimental.list_physical_devices()

    gpu_names = []
    for device in local_devices:
        # Match "GPU", but also variants such as "XLA_GPU".
        if "GPU" in device.device_type:
            gpu_names.append(device.name)
    return gpu_names
@PublicAPI
def get_placeholder(
    *,
    space: Optional[gym.Space] = None,
    value: Optional[Any] = None,
    name: Optional[str] = None,
    time_axis: bool = False,
    flatten: bool = True
) -> "tf1.placeholder":
    """Returns a tf1.placeholder object given optional hints, such as a space.

    Note that the returned placeholder will always have a leading batch
    dimension (None).

    Args:
        space: An optional gym.Space to hint the shape and dtype of the
            placeholder.
        value: An optional value to hint the shape and dtype of the
            placeholder. Only used if `space` is None.
        name: An optional name for the placeholder.
        time_axis: Whether the placeholder should also receive a time
            dimension (None).
        flatten: Whether to flatten the given space into a plain Box space
            and then create the placeholder from the resulting space.

    Returns:
        The tf1 placeholder (or a struct of placeholders for a complex,
        non-flattened space).
    """
    from ray.rllib.models.catalog import ModelCatalog

    if space is not None:
        if isinstance(space, (gym.spaces.Dict, gym.spaces.Tuple)):
            if flatten:
                return ModelCatalog.get_action_placeholder(space, None)
            else:
                # Create one placeholder per primitive sub-space, naming each
                # by its path within the struct.
                # Fix: Guard against `name=None` (the default), which would
                # otherwise raise a TypeError on string concatenation.
                prefix = (name + ".") if name else ""
                return tree.map_structure_with_path(
                    lambda path, component: get_placeholder(
                        space=component,
                        name=prefix + ".".join(str(p) for p in path),
                    ),
                    get_base_struct_from_space(space),
                )
        return tf1.placeholder(
            shape=(None,) + ((None,) if time_axis else ()) + space.shape,
            # Downcast float64 spaces to float32 (tf's standard float dtype).
            dtype=tf.float32 if space.dtype == np.float64 else space.dtype,
            name=name,
        )
    else:
        assert value is not None
        # Strip the batch dim from the hint value; it is re-added as None.
        shape = value.shape[1:]
        return tf1.placeholder(
            shape=(None,)
            + ((None,) if time_axis else ())
            + (shape if isinstance(shape, tuple) else tuple(shape.as_list())),
            dtype=tf.float32 if value.dtype == np.float64 else value.dtype,
            name=name,
        )
@PublicAPI
def get_tf_eager_cls_if_necessary(
    orig_cls: Type["TFPolicy"], config: PartialAlgorithmConfigDict
) -> Type["TFPolicy"]:
    """Returns the corresponding tf-eager class for a given TFPolicy class.

    Args:
        orig_cls: The original TFPolicy class to get the corresponding tf-eager
            class for.
        config: The Algorithm config dict.

    Returns:
        The tf eager policy class corresponding to the given TFPolicy class.

    Raises:
        ImportError: If a tf framework is requested but tensorflow could not
            be imported.
        ValueError: If `orig_cls` is a TFPolicy subclass that cannot be
            eager-ized (has no `as_eager()`).
    """
    cls = orig_cls
    framework = config.get("framework", "tf")
    # Any tf framework setting requires tensorflow to be importable.
    if framework in ["tf2", "tf", "tfe"] and not tf1:
        raise ImportError("Could not import tensorflow!")
    # Only the eager frameworks ("tf2"/"tfe") need conversion; plain "tf"
    # (static graph) returns `orig_cls` unchanged.
    if framework in ["tf2", "tfe"]:
        assert tf1.executing_eagerly()
        # Imported here (not at module level) to avoid circular imports.
        from ray.rllib.policy.tf_policy import TFPolicy
        from ray.rllib.policy.eager_tf_policy import EagerTFPolicy
        from ray.rllib.policy.eager_tf_policy_v2 import EagerTFPolicyV2

        # Create eager-class (if not already one).
        if hasattr(orig_cls, "as_eager") and not issubclass(orig_cls, EagerTFPolicy):
            cls = orig_cls.as_eager()
        # Could be some other type of policy (non-TFPolicy) or already
        # eager-ized: Leave as-is. NOTE: This branch must come before the
        # `else: raise` below — only TFPolicy subclasses w/o `as_eager` err out.
        elif not issubclass(orig_cls, TFPolicy):
            pass
        else:
            raise ValueError(
                "This policy does not support eager execution: {}".format(orig_cls)
            )
        # Now that we know, policy is an eager one, add tracing, if necessary.
        if config.get("eager_tracing") and issubclass(
            cls, (EagerTFPolicy, EagerTFPolicyV2)
        ):
            cls = cls.with_tracing()
    return cls
@PublicAPI
def huber_loss(x: TensorType, delta: float = 1.0) -> TensorType:
    """Computes the Huber loss for a given term and delta parameter.

    Reference: https://en.wikipedia.org/wiki/Huber_loss
    Note that the factor of 0.5 is implicitly included in the calculation.

    Formula:
        L = 0.5 * x^2 for small abs x (below the delta threshold)
        L = delta * (abs(x) - 0.5*delta) for larger abs x

    Args:
        x: The input term, e.g. a TD error.
        delta: The delta parameter in the above formula.

    Returns:
        The Huber loss resulting from `x` and `delta`.
    """
    abs_x = tf.abs(x)
    quadratic = tf.math.square(x) * 0.5
    linear = delta * (abs_x - 0.5 * delta)
    # Quadratic inside the +/- delta band, linear outside of it.
    return tf.where(abs_x < delta, quadratic, linear)
@PublicAPI
def make_tf_callable(
    session_or_none: Optional["tf1.Session"], dynamic_shape: bool = False
) -> Callable:
    """Returns a function that can be executed in either graph or eager mode.

    The function must take only positional args.

    If eager is enabled, this will act as just a function. Otherwise, it
    will build a function that executes a session run with placeholders
    internally.

    Args:
        session_or_none: tf.Session if in graph mode, else None.
        dynamic_shape: True if the placeholders should have a dynamic
            batch dimension. Otherwise they will be fixed shape.

    Returns:
        A function that can be called in either eager or static-graph mode.
    """
    # Sanity-check: The presence of a session must match the execution mode.
    if tf.executing_eagerly():
        assert session_or_none is None
    else:
        assert session_or_none is not None

    def make_wrapper(fn):
        # Static-graph mode: Create placeholders and make a session call each
        # time the wrapped function is called. Returns the output of this
        # session call.
        if session_or_none is not None:
            args_placeholders = []
            kwargs_placeholders = {}
            # One-element list used as a mutable closure cell: holds the
            # symbolic output of `fn` after the first (tracing) call.
            symbolic_out = [None]

            def call(*args, **kwargs):
                # Flatten one level of lists in `args` so placeholders can be
                # zipped 1:1 with the flat values below.
                args_flat = []
                for a in args:
                    if type(a) is list:
                        args_flat.extend(a)
                    else:
                        args_flat.append(a)
                args = args_flat

                # We have not built any placeholders yet: Do this once here,
                # then reuse the same placeholders each time we call this
                # function again.
                if symbolic_out[0] is None:
                    with session_or_none.graph.as_default():

                        def _create_placeholders(path, value):
                            # Placeholder shape: Either fully dynamic batch
                            # dim (None) + the value's trailing dims, or the
                            # value's exact (fixed) shape.
                            if dynamic_shape:
                                if len(value.shape) > 0:
                                    shape = (None,) + value.shape[1:]
                                else:
                                    shape = ()
                            else:
                                shape = value.shape
                            return tf1.placeholder(
                                dtype=value.dtype,
                                shape=shape,
                                name=".".join([str(p) for p in path]),
                            )

                        # Build (and cache) placeholders for all positional
                        # args ...
                        placeholders = tree.map_structure_with_path(
                            _create_placeholders, args
                        )
                        for ph in tree.flatten(placeholders):
                            args_placeholders.append(ph)
                        # ... and for all keyword args.
                        placeholders = tree.map_structure_with_path(
                            _create_placeholders, kwargs
                        )
                        for k, ph in placeholders.items():
                            kwargs_placeholders[k] = ph
                        # Trace `fn` once symbolically; reuse the result on
                        # every subsequent call.
                        symbolic_out[0] = fn(*args_placeholders, **kwargs_placeholders)
                # Map the cached placeholders to this call's concrete values.
                feed_dict = dict(zip(args_placeholders, tree.flatten(args)))
                tree.map_structure(
                    lambda ph, v: feed_dict.__setitem__(ph, v),
                    kwargs_placeholders,
                    kwargs,
                )
                ret = session_or_none.run(symbolic_out[0], feed_dict)
                return ret

            return call
        # Eager mode (call function as is).
        else:
            return fn

    return make_wrapper
@PublicAPI
def minimize_and_clip(
    optimizer: LocalOptimizer,
    objective: TensorType,
    var_list: List["tf.Variable"],
    clip_val: float = 10.0,
) -> ModelGradients:
    """Computes, then clips gradients using objective, optimizer and var list.

    Ensures the L2-norm of the gradient for EACH variable is individually
    clipped to `clip_val` (via `tf.clip_by_norm`; note: this is NOT a global
    norm clip across all gradients).

    Args:
        optimizer: Either a shim optimizer (tf eager) containing a
            tf.GradientTape under `self.tape` or a tf1 local optimizer
            object.
        objective: The loss tensor to calculate gradients on.
        var_list: The list of tf.Variables to compute gradients over.
        clip_val: The per-gradient norm clip value. If None, no clipping is
            performed.

    Returns:
        The resulting model gradients (list of tuples of grads + vars)
        corresponding to the input `var_list`. Entries whose gradient is
        None are dropped.
    """
    # Accidentally passing values < 0.0 will break all gradients.
    assert clip_val is None or clip_val > 0.0, clip_val
    if tf.executing_eagerly():
        # Eager: Gradients come from the tape recorded during the fwd pass.
        tape = optimizer.tape
        grads_and_vars = list(zip(list(tape.gradient(objective, var_list)), var_list))
    else:
        # Graph mode: Use the tf1 optimizer's compute_gradients().
        grads_and_vars = optimizer.compute_gradients(objective, var_list=var_list)
    return [
        (tf.clip_by_norm(g, clip_val) if clip_val is not None else g, v)
        for (g, v) in grads_and_vars
        if g is not None
    ]
@PublicAPI
def one_hot(x: TensorType, space: gym.Space) -> TensorType:
    """Returns a one-hot tensor, given an int tensor and a space.

    Handles the MultiDiscrete case as well.

    Args:
        x: The input tensor.
        space: The space to use for generating the one-hot tensor.

    Returns:
        The resulting one-hot tensor.

    Raises:
        ValueError: If the given space is not a discrete one.

    Examples:
        >>> import gym
        >>> import tensorflow as tf
        >>> from ray.rllib.utils.tf_utils import one_hot
        >>> x = tf.Variable([0, 3], dtype=tf.int32)  # batch-dim=2
        >>> # Discrete space with 4 (one-hot) slots per batch item.
        >>> s = gym.spaces.Discrete(4)
        >>> one_hot(x, s) # doctest: +SKIP
        <tf.Tensor 'one_hot:0' shape=(2, 4) dtype=float32>
        >>> x = tf.Variable([[0, 1, 2, 3]], dtype=tf.int32)  # batch-dim=1
        >>> # MultiDiscrete space with 5 + 4 + 4 + 7 = 20 (one-hot) slots
        >>> # per batch item.
        >>> s = gym.spaces.MultiDiscrete([5, 4, 4, 7])
        >>> one_hot(x, s) # doctest: +SKIP
        <tf.Tensor 'concat:0' shape=(1, 20) dtype=float32>
    """
    if isinstance(space, Discrete):
        return tf.one_hot(x, space.n, dtype=tf.float32)
    if isinstance(space, MultiDiscrete):
        # A nested nvec (ndarray entries) means the space is multi-dim:
        # Flatten it and reshape the inputs to match.
        if isinstance(space.nvec[0], np.ndarray):
            nvec = np.ravel(space.nvec)
            x = tf.reshape(x, (x.shape[0], -1))
        else:
            nvec = space.nvec
        # One-hot each sub-dimension separately, then concat along last axis.
        per_dim_one_hots = [
            tf.one_hot(x[:, i], n, dtype=tf.float32) for i, n in enumerate(nvec)
        ]
        return tf.concat(per_dim_one_hots, axis=-1)
    raise ValueError("Unsupported space for `one_hot`: {}".format(space))
@PublicAPI
def reduce_mean_ignore_inf(x: TensorType, axis: Optional[int] = None) -> TensorType:
    """Same as tf.reduce_mean() but ignores -inf values.

    NOTE: "-inf" here means the finite stand-in `tf.float32.min` (the value
    RLlib uses for masking); actual IEEE -inf entries are NOT masked out.

    Args:
        x: The input tensor to reduce mean over.
        axis: The axis over which to reduce. None for all axes.

    Returns:
        The mean reduced inputs, ignoring inf (i.e. tf.float32.min) values.
    """
    valid_mask = tf.not_equal(x, tf.float32.min)
    # Zero out masked entries so they don't contribute to the sum ...
    masked_x = tf.where(valid_mask, x, tf.zeros_like(x))
    # ... then divide by the count of valid entries only.
    num_valid = tf.math.reduce_sum(tf.cast(valid_mask, tf.float32), axis)
    return tf.math.reduce_sum(masked_x, axis) / num_valid
@PublicAPI
def scope_vars(
    scope: Union[str, "tf1.VariableScope"], trainable_only: bool = False
) -> List["tf.Variable"]:
    """Get variables inside a given scope.

    Args:
        scope: Scope in which the variables reside.
        trainable_only: Whether or not to return only the variables that were
            marked as trainable.

    Returns:
        The list of variables in the given `scope`.
    """
    # Accept either a plain scope name or a VariableScope object.
    scope_name = scope if isinstance(scope, str) else scope.name
    if trainable_only:
        collection = tf1.GraphKeys.TRAINABLE_VARIABLES
    else:
        collection = tf1.GraphKeys.VARIABLES
    return tf1.get_collection(collection, scope=scope_name)
@PublicAPI
def zero_logps_from_actions(actions: TensorStructType) -> TensorType:
    """Helper function useful for returning dummy logp's (0) for some actions.

    Args:
        actions: The input actions. This can be any struct
            of complex action components or a simple tensor of different
            dimensions, e.g. [B], [B, 2], or {"a": [B, 4, 5], "b": [B]}.

    Returns:
        A 1D tensor of 0.0 (dummy logp's) matching the batch
        dim of `actions` (shape=[B]).
    """
    # Flatten the (possibly complex) action struct and use the 0th
    # component to derive the batch dimension.
    first_component = tree.flatten(actions)[0]
    logps = tf.zeros_like(first_component, dtype=tf.float32)
    # Logp's must be single values per batch item. If the component is
    # [B, ...], repeatedly slice off trailing dims until only [B] remains.
    while len(logps.shape) > 1:
        logps = logps[:, 0]
    return logps
@DeveloperAPI
def warn_if_infinite_kl_divergence(
    policy: Type["TFPolicy"], mean_kl_loss: TensorType
) -> None:
    """Logs a warning if the given mean KL-divergence is non-finite.

    Args:
        policy: The policy whose loss produced `mean_kl_loss`. Only checked
            once the policy's loss has been initialized.
        mean_kl_loss: The (scalar) mean KL-divergence loss term to check.
    """

    def print_warning():
        logger.warning(
            "KL divergence is non-finite, this will likely destabilize your model and"
            " the training process. Action(s) in a specific state have near-zero"
            " probability. This can happen naturally in deterministic environments"
            " where the optimal policy has zero mass for a specific action. To fix this"
            " issue, consider setting the coefficient for the KL loss term to zero or"
            " increasing policy entropy."
        )
        # Both tf.cond branches must return matching structures.
        return tf.constant(0.0)

    if policy.loss_initialized():
        # Fix: Use is_finite (negated) instead of is_inf so that NaN values
        # also trigger the warning, matching the "non-finite" wording of the
        # logged message above.
        tf.cond(
            tf.math.logical_not(tf.math.is_finite(mean_kl_loss)),
            true_fn=lambda: print_warning(),
            false_fn=lambda: tf.constant(0.0),
        )