import logging

import numpy as np

from ray.rllib.models.torch.torch_modelv2 import TorchModelV2
from ray.rllib.models.torch.misc import SlimFC, AppendBiasLayer, \
    normc_initializer
from ray.rllib.utils.annotations import override
from ray.rllib.utils.framework import get_activation_fn
from ray.rllib.utils import try_import_torch

torch, nn = try_import_torch()

logger = logging.getLogger(__name__)


class FullyConnectedNetwork(TorchModelV2, nn.Module):
    """Generic fully connected network."""

    def __init__(self, obs_space, action_space, num_outputs, model_config,
                 name):
        TorchModelV2.__init__(self, obs_space, action_space, num_outputs,
                              model_config, name)
        nn.Module.__init__(self)

        activation = get_activation_fn(
            model_config.get("fcnet_activation"), framework="torch")
        hiddens = model_config.get("fcnet_hiddens")
        no_final_linear = model_config.get("no_final_linear")
        self.free_log_std = model_config.get("free_log_std")
        # TODO(sven): implement case: vf_shared_layers = False.
        # vf_share_layers = model_config.get("vf_share_layers")

        logger.debug("Constructing fcnet {} {}".format(hiddens, activation))

        layers = []
        prev_layer_size = int(np.product(obs_space.shape))
        self._logits = None

        # Maybe generate free-floating bias variables for the second half of
        # the outputs.
        if self.free_log_std:
            assert num_outputs % 2 == 0, (
                "num_outputs must be divisible by two", num_outputs)
            num_outputs = num_outputs // 2

        # Create layers 0 to second-last.
        for size in hiddens[:-1]:
            layers.append(
                SlimFC(
                    in_size=prev_layer_size,
                    out_size=size,
                    initializer=normc_initializer(1.0),
                    activation_fn=activation))
            prev_layer_size = size

        # The last layer is adjusted to be of size num_outputs, but it's a
        # layer with activation.
        if no_final_linear and num_outputs:
            layers.append(
                SlimFC(
                    in_size=prev_layer_size,
                    out_size=num_outputs,
                    initializer=normc_initializer(1.0),
                    activation_fn=activation))
            prev_layer_size = num_outputs
        # Finish the layers with the provided sizes (`hiddens`), plus -
        # iff num_outputs > 0 - a last linear layer of size num_outputs.
        else:
            if len(hiddens) > 0:
                layers.append(
                    SlimFC(
                        in_size=prev_layer_size,
                        out_size=hiddens[-1],
                        initializer=normc_initializer(1.0),
                        activation_fn=activation))
                prev_layer_size = hiddens[-1]
            if num_outputs:
                self._logits = SlimFC(
                    in_size=prev_layer_size,
                    out_size=num_outputs,
                    initializer=normc_initializer(0.01),
                    activation_fn=None)
            else:
                # No explicit num_outputs: expose the last hidden size (or
                # the flattened obs size if there are no hidden layers).
                self.num_outputs = (
                    [np.product(obs_space.shape)] + hiddens[-1:])[-1]

        # Layer to add the log std vars to the state-dependent means.
        if self.free_log_std:
            self._append_free_log_std = AppendBiasLayer(num_outputs)

        self._hidden_layers = nn.Sequential(*layers)

        # TODO(sven): Implement non-shared value branch.
        self._value_branch = SlimFC(
            in_size=prev_layer_size,
            out_size=1,
            initializer=normc_initializer(1.0),
            activation_fn=None)
        # Holds the current value output.
        self._cur_value = None

    @override(TorchModelV2)
    def forward(self, input_dict, state, seq_lens):
        # Flatten the observation batch and push it through the shared
        # hidden layers.
        obs = input_dict["obs_flat"].float()
        features = self._hidden_layers(obs.reshape(obs.shape[0], -1))
        logits = self._logits(features) if self._logits else features
        if self.free_log_std:
            logits = self._append_free_log_std(logits)
        # Cache the value output for value_function().
        self._cur_value = self._value_branch(features).squeeze(1)
        return logits, state

    @override(TorchModelV2)
    def value_function(self):
        assert self._cur_value is not None, "must call forward() first"
        return self._cur_value
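

# A minimal usage sketch, not part of the original module: it shows how a
# custom TorchModelV2 like the one above could be registered and selected via
# the "custom_model" key of an RLlib model config. The registration name
# "my_torch_fcnet" and the exact torch-enabling key ("framework": "torch" in
# newer RLlib versions vs. "use_pytorch": True in older ones) are assumptions
# that depend on the installed RLlib version.
if __name__ == "__main__":
    from ray.rllib.models import ModelCatalog

    # Make the model available under a custom name.
    ModelCatalog.register_custom_model("my_torch_fcnet",
                                       FullyConnectedNetwork)

    # Example trainer config fragment selecting the custom model.
    config = {
        "framework": "torch",  # older RLlib versions: "use_pytorch": True
        "model": {
            "custom_model": "my_torch_fcnet",
            "fcnet_hiddens": [256, 256],
            "fcnet_activation": "tanh",
        },
    }
    print(config)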