ray/rllib/models/torch/misc.py

195 lines
6.8 KiB
Python

""" Code adapted from https://github.com/ikostrikov/pytorch-a3c"""
import numpy as np
from typing import Union, Tuple, Any, List
from ray.rllib.models.utils import get_activation_fn
from ray.rllib.utils.annotations import DeveloperAPI
from ray.rllib.utils.framework import try_import_torch
from ray.rllib.utils.typing import TensorType
torch, nn = try_import_torch()
@DeveloperAPI
def normc_initializer(std: float = 1.0) -> Any:
    """Build an in-place initializer that fixes the L2 norm along dim 1.

    The returned callable overwrites a tensor with standard-normal samples
    and then rescales it so that the L2 norm computed over dimension 1
    equals ``std``.

    Args:
        std: Target L2 norm along dimension 1 after rescaling.

    Returns:
        A function that takes a tensor, initializes it in place and returns
        nothing.
    """

    def initializer(tensor):
        weights = tensor.data
        # Sample N(0, 1) directly into the tensor's storage.
        weights.normal_(0, 1)
        # Norm over dim 1; keepdim=True so it broadcasts against `weights`.
        norm = weights.pow(2).sum(1, keepdim=True).sqrt()
        weights.mul_(std / norm)

    return initializer
@DeveloperAPI
def same_padding(
    in_size: Tuple[int, int],
    filter_size: Union[int, Tuple[int, int]],
    stride_size: Union[int, Tuple[int, int]],
) -> Tuple[Tuple[int, int, int, int], Tuple[int, int]]:
    """Compute the padding that mimics TF conv2d `same` padding.

    See www.tensorflow.org/versions/r0.12/api_docs/python/nn/convolution

    Args:
        in_size: Rows (Height), Column (Width) for input.
        filter_size: Rows (Height), column (Width) for filter.
            If int, height == width.
        stride_size: Rows (Height), column (Width) for stride.
            If int, height == width.

    Returns:
        padding: `(left, right, top, bottom)` for input into
            torch.nn.ZeroPad2d. When the total padding along an axis is
            odd, the extra unit goes to the bottom/right side.
        output: `(height, width)` output shape after padding and
            convolution, as Python ints.
    """
    in_height, in_width = in_size
    if isinstance(filter_size, int):
        filter_height = filter_width = filter_size
    else:
        filter_height, filter_width = filter_size
    if isinstance(stride_size, (int, float)):
        stride_height = stride_width = int(stride_size)
    else:
        stride_height, stride_width = int(stride_size[0]), int(stride_size[1])

    # Cast to int so callers get a usable shape instead of np.float64 values.
    out_height = int(np.ceil(float(in_height) / float(stride_height)))
    out_width = int(np.ceil(float(in_width) / float(stride_width)))

    # Clamp at 0: when the stride skips past the last filter position the
    # raw value is negative, and a negative pad would make ZeroPad2d crop
    # the input, which is not what TF `same` padding does.
    pad_along_height = max(
        int((out_height - 1) * stride_height + filter_height - in_height), 0
    )
    pad_along_width = max(
        int((out_width - 1) * stride_width + filter_width - in_width), 0
    )

    pad_top = pad_along_height // 2
    pad_bottom = pad_along_height - pad_top
    pad_left = pad_along_width // 2
    pad_right = pad_along_width - pad_left

    padding = (pad_left, pad_right, pad_top, pad_bottom)
    output = (out_height, out_width)
    return padding, output
@DeveloperAPI
class SlimConv2d(nn.Module):
    """Conv2d building block in the spirit of tf.slim's conv2d.

    Chains an optional zero-padding layer, an `nn.Conv2d` and an optional
    activation into one `nn.Sequential`.
    """

    def __init__(
        self,
        in_channels: int,
        out_channels: int,
        kernel: Union[int, Tuple[int, int]],
        stride: Union[int, Tuple[int, int]],
        padding: Union[int, Tuple[int, int]],
        # Defaulting these to nn.[..] will break soft torch import.
        initializer: Any = "default",
        activation_fn: Any = "default",
        bias_init: float = 0,
    ):
        """Assemble the padding -> conv -> activation pipeline.

        Args:
            in_channels: Number of input channels.
            out_channels: Number of output channels.
            kernel: Kernel size handed to `nn.Conv2d` (int or tuple).
            stride: Stride handed to `nn.Conv2d` (int or tuple).
            padding: Argument for `nn.ZeroPad2d`. Any falsy value
                (None, 0, empty tuple) means no padding layer is added.
            initializer: Callable applied to the conv weight. The string
                "default" selects `nn.init.xavier_uniform_`; a falsy value
                leaves the weight as `nn.Conv2d` created it.
            activation_fn: Module class (or zero-arg factory) appended after
                the conv. The string "default" selects `nn.ReLU`; any other
                string is resolved through `get_activation_fn`; None means
                no activation.
            bias_init: Constant the conv bias is filled with.
        """
        super().__init__()

        # Optional explicit zero padding in front of the convolution.
        modules = [nn.ZeroPad2d(padding)] if padding else []

        # Convolution itself, followed by weight/bias initialization.
        conv_layer = nn.Conv2d(in_channels, out_channels, kernel, stride)
        if initializer:
            weight_init = (
                nn.init.xavier_uniform_ if initializer == "default" else initializer
            )
            weight_init(conv_layer.weight)
        nn.init.constant_(conv_layer.bias, bias_init)
        modules.append(conv_layer)

        # Resolve string activations; "default" stands for ReLU.
        if isinstance(activation_fn, str):
            activation_fn = (
                nn.ReLU
                if activation_fn == "default"
                else get_activation_fn(activation_fn, "torch")
            )
        if activation_fn is not None:
            modules.append(activation_fn())

        self._model = nn.Sequential(*modules)

    def forward(self, x: TensorType) -> TensorType:
        """Run `x` through the padding/conv/activation stack."""
        return self._model(x)
@DeveloperAPI
class SlimFC(nn.Module):
    """Simple PyTorch version of `linear` function.

    Wraps an `nn.Linear` and an optional activation in an `nn.Sequential`.
    """

    def __init__(
        self,
        in_size: int,
        out_size: int,
        initializer: Any = None,
        activation_fn: Any = None,
        use_bias: bool = True,
        bias_init: float = 0.0,
    ):
        """Creates a standard FC layer, similar to torch.nn.Linear

        Args:
            in_size: Input size for FC Layer
            out_size: Output size for FC Layer
            initializer: Initializer function for FC layer weights. None
                selects `nn.init.xavier_uniform_`.
            activation_fn: Activation function at the end of layer. A
                string is resolved through `get_activation_fn`; None means
                no activation (linear output).
            use_bias: Whether to add bias weights or not
            bias_init: Initialize bias weights to bias_init const
        """
        super().__init__()
        layers = []
        # Actual nn.Linear layer (including correct initialization logic).
        linear = nn.Linear(in_size, out_size, bias=use_bias)
        if initializer is None:
            initializer = nn.init.xavier_uniform_
        initializer(linear.weight)
        # Key on the bias actually existing rather than `use_bias is True`:
        # nn.Linear also creates a bias for truthy non-bool flags (1,
        # np.bool_(True)), which would otherwise never receive `bias_init`.
        if linear.bias is not None:
            nn.init.constant_(linear.bias, bias_init)
        layers.append(linear)
        # Activation function (if any; default=None (linear)).
        if isinstance(activation_fn, str):
            activation_fn = get_activation_fn(activation_fn, "torch")
        if activation_fn is not None:
            layers.append(activation_fn())
        # Put everything in sequence.
        self._model = nn.Sequential(*layers)

    def forward(self, x: TensorType) -> TensorType:
        """Apply the linear layer and the optional activation to `x`."""
        return self._model(x)
@DeveloperAPI
class AppendBiasLayer(nn.Module):
    """Appends a learnable, input-independent vector (used for free_log_std).

    The vector lives in the `log_std` parameter, starts at zero, and is
    concatenated to every row of the input along dimension 1.
    """

    def __init__(self, num_bias_vars: int):
        """Create the zero-initialized `log_std` parameter.

        Args:
            num_bias_vars: Length of the `log_std` vector, i.e. how many
                columns get appended to the input.
        """
        super().__init__()
        initial_log_std = torch.as_tensor([0.0] * num_bias_vars)
        # Registering the parameter also exposes it as `self.log_std`.
        self.register_parameter("log_std", torch.nn.Parameter(initial_log_std))

    def forward(self, x: TensorType) -> TensorType:
        """Return `x` with `log_std` appended to each row (dim 1)."""
        num_rows = len(x)
        # One copy of log_std per input row.
        tiled_log_std = self.log_std.unsqueeze(0).repeat(num_rows, 1)
        return torch.cat([x, tiled_log_std], dim=1)
@DeveloperAPI
class Reshape(nn.Module):
    """Module wrapper around `Tensor.view` with a fixed target shape."""

    def __init__(self, shape: List):
        """Remember the target shape.

        Args:
            shape: Sizes that are unpacked into `Tensor.view` on every
                forward call.
        """
        super().__init__()
        # Kept as a plain attribute; it is neither a parameter nor a buffer.
        self.shape = shape

    def forward(self, x):
        """Return `x` viewed with the stored shape."""
        target_shape = self.shape
        return x.view(*target_shape)