# ray/rllib/models/tf/fcnet.py
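# Default fully-connected (MLP) models for RLlib's TensorFlow stack: a
# ModelV2-based network plus an equivalent native Keras model.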

import numpy as np
import gym
from typing import Dict, Optional, Sequence

from ray.rllib.models.tf.misc import normc_initializer
from ray.rllib.models.tf.tf_modelv2 import TFModelV2
from ray.rllib.models.utils import get_activation_fn
from ray.rllib.policy.sample_batch import SampleBatch
from ray.rllib.utils.framework import try_import_tf
from ray.rllib.utils.typing import TensorType, List, ModelConfigDict
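
# Note (added for clarity): `try_import_tf()` returns a v1-compat module, the
# TensorFlow module itself, and the installed major version (1 or 2); it does
# not raise when TensorFlow is missing, which is why
# `Keras_FullyConnectedNetwork` below guards on `tf`.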
tf1, tf, tfv = try_import_tf()


# TODO: (sven) obsolete this class once we only support native keras models.
class FullyConnectedNetwork(TFModelV2):
    """Generic fully connected network implemented in ModelV2 API."""

    def __init__(
        self,
        obs_space: gym.spaces.Space,
        action_space: gym.spaces.Space,
        num_outputs: int,
        model_config: ModelConfigDict,
        name: str,
    ):
        super(FullyConnectedNetwork, self).__init__(
            obs_space, action_space, num_outputs, model_config, name
        )

        hiddens = list(model_config.get("fcnet_hiddens", [])) + list(
            model_config.get("post_fcnet_hiddens", [])
        )
        activation = model_config.get("fcnet_activation")
        if not model_config.get("fcnet_hiddens", []):
            activation = model_config.get("post_fcnet_activation")
        activation = get_activation_fn(activation)
        no_final_linear = model_config.get("no_final_linear")
        vf_share_layers = model_config.get("vf_share_layers")
        free_log_std = model_config.get("free_log_std")

        # Generate free-floating bias variables for the second half of
        # the outputs.
        if free_log_std:
            assert num_outputs % 2 == 0, (
                "num_outputs must be divisible by two",
                num_outputs,
            )
            num_outputs = num_outputs // 2
            self.log_std_var = tf.Variable(
                [0.0] * num_outputs, dtype=tf.float32, name="log_std"
            )

        # We are using obs_flat, so take the flattened shape as input.
        inputs = tf.keras.layers.Input(
            shape=(int(np.product(obs_space.shape)),), name="observations"
        )
        # Last hidden layer output (before logits outputs).
        last_layer = inputs
        # The action distribution outputs.
        logits_out = None
        i = 1

        # Create layers 0 to second-last.
        for size in hiddens[:-1]:
            last_layer = tf.keras.layers.Dense(
                size,
                name="fc_{}".format(i),
                activation=activation,
                kernel_initializer=normc_initializer(1.0),
            )(last_layer)
            i += 1

        # The last layer is adjusted to be of size num_outputs, but it's a
        # layer with activation.
        if no_final_linear and num_outputs:
            logits_out = tf.keras.layers.Dense(
                num_outputs,
                name="fc_out",
                activation=activation,
                kernel_initializer=normc_initializer(1.0),
            )(last_layer)
        # Finish the layers with the provided sizes (`hiddens`), plus -
        # iff num_outputs > 0 - a last linear layer of size num_outputs.
        else:
            if len(hiddens) > 0:
                last_layer = tf.keras.layers.Dense(
                    hiddens[-1],
                    name="fc_{}".format(i),
                    activation=activation,
                    kernel_initializer=normc_initializer(1.0),
                )(last_layer)
            if num_outputs:
                logits_out = tf.keras.layers.Dense(
                    num_outputs,
                    name="fc_out",
                    activation=None,
                    kernel_initializer=normc_initializer(0.01),
                )(last_layer)
            # Adjust num_outputs to be the number of nodes in the last layer.
            else:
                self.num_outputs = ([int(np.product(obs_space.shape))] + hiddens[-1:])[
                    -1
                ]

        # Concat the log std vars to the end of the state-dependent means.
        if free_log_std and logits_out is not None:

            def tiled_log_std(x):
                return tf.tile(tf.expand_dims(self.log_std_var, 0), [tf.shape(x)[0], 1])

            log_std_out = tf.keras.layers.Lambda(tiled_log_std)(inputs)
            logits_out = tf.keras.layers.Concatenate(axis=1)([logits_out, log_std_out])

        last_vf_layer = None
        if not vf_share_layers:
            # Build a parallel set of hidden layers for the value net.
            last_vf_layer = inputs
            i = 1
            for size in hiddens:
                last_vf_layer = tf.keras.layers.Dense(
                    size,
                    name="fc_value_{}".format(i),
                    activation=activation,
                    kernel_initializer=normc_initializer(1.0),
                )(last_vf_layer)
                i += 1

        value_out = tf.keras.layers.Dense(
            1,
            name="value_out",
            activation=None,
            kernel_initializer=normc_initializer(0.01),
        )(last_vf_layer if last_vf_layer is not None else last_layer)
        self.base_model = tf.keras.Model(
            inputs, [(logits_out if logits_out is not None else last_layer), value_out]
        )

    def forward(
        self,
        input_dict: Dict[str, TensorType],
        state: List[TensorType],
        seq_lens: TensorType,
    ) -> (TensorType, List[TensorType]):
        model_out, self._value_out = self.base_model(input_dict["obs_flat"])
        return model_out, state

    def value_function(self) -> TensorType:
        return tf.reshape(self._value_out, [-1])
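

# Example (illustrative only, not part of this module): a ModelV2 such as
# `FullyConnectedNetwork` is normally not instantiated directly. RLlib builds
# it through the ModelCatalog, either as the built-in default or registered as
# a custom model, e.g.:
#
#   from ray.rllib.models import ModelCatalog
#   ModelCatalog.register_custom_model("my_fcnet", FullyConnectedNetwork)
#   config = {"model": {"custom_model": "my_fcnet", "fcnet_hiddens": [64, 64]}}
#
# The policy then calls `forward()` with an input dict containing "obs_flat"
# and afterwards `value_function()` for the value branch.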


class Keras_FullyConnectedNetwork(tf.keras.Model if tf else object):
    """Generic fully connected network implemented in tf Keras."""

    def __init__(
        self,
        input_space: gym.spaces.Space,
        action_space: gym.spaces.Space,
        num_outputs: Optional[int] = None,
        *,
        name: str = "",
        fcnet_hiddens: Optional[Sequence[int]] = (),
        fcnet_activation: Optional[str] = None,
        post_fcnet_hiddens: Optional[Sequence[int]] = (),
        post_fcnet_activation: Optional[str] = None,
        no_final_linear: bool = False,
        vf_share_layers: bool = False,
        free_log_std: bool = False,
        **kwargs,
    ):
        super().__init__(name=name)

        hiddens = list(fcnet_hiddens or ()) + list(post_fcnet_hiddens or ())
        activation = fcnet_activation
        if not fcnet_hiddens:
            activation = post_fcnet_activation
        activation = get_activation_fn(activation)

        # Generate free-floating bias variables for the second half of
        # the outputs.
        if free_log_std:
            assert num_outputs % 2 == 0, (
                "num_outputs must be divisible by two",
                num_outputs,
            )
            num_outputs = num_outputs // 2
            self.log_std_var = tf.Variable(
                [0.0] * num_outputs, dtype=tf.float32, name="log_std"
            )

        # We are using obs_flat, so take the flattened shape as input.
        inputs = tf.keras.layers.Input(
            shape=(int(np.product(input_space.shape)),), name="observations"
        )
        # Last hidden layer output (before logits outputs).
        last_layer = inputs
        # The action distribution outputs.
        logits_out = None
        i = 1

        # Create layers 0 to second-last.
        for size in hiddens[:-1]:
            last_layer = tf.keras.layers.Dense(
                size,
                name="fc_{}".format(i),
                activation=activation,
                kernel_initializer=normc_initializer(1.0),
            )(last_layer)
            i += 1

        # The last layer is adjusted to be of size num_outputs, but it's a
        # layer with activation.
        if no_final_linear and num_outputs:
            logits_out = tf.keras.layers.Dense(
                num_outputs,
                name="fc_out",
                activation=activation,
                kernel_initializer=normc_initializer(1.0),
            )(last_layer)
        # Finish the layers with the provided sizes (`hiddens`), plus -
        # iff num_outputs > 0 - a last linear layer of size num_outputs.
        else:
            if len(hiddens) > 0:
                last_layer = tf.keras.layers.Dense(
                    hiddens[-1],
                    name="fc_{}".format(i),
                    activation=activation,
                    kernel_initializer=normc_initializer(1.0),
                )(last_layer)
            if num_outputs:
                logits_out = tf.keras.layers.Dense(
                    num_outputs,
                    name="fc_out",
                    activation=None,
                    kernel_initializer=normc_initializer(0.01),
                )(last_layer)

        # Concat the log std vars to the end of the state-dependent means.
        if free_log_std and logits_out is not None:

            def tiled_log_std(x):
                return tf.tile(tf.expand_dims(self.log_std_var, 0), [tf.shape(x)[0], 1])

            log_std_out = tf.keras.layers.Lambda(tiled_log_std)(inputs)
            logits_out = tf.keras.layers.Concatenate(axis=1)([logits_out, log_std_out])

        last_vf_layer = None
        if not vf_share_layers:
            # Build a parallel set of hidden layers for the value net.
            last_vf_layer = inputs
            i = 1
            for size in hiddens:
                last_vf_layer = tf.keras.layers.Dense(
                    size,
                    name="fc_value_{}".format(i),
                    activation=activation,
                    kernel_initializer=normc_initializer(1.0),
                )(last_vf_layer)
                i += 1

        value_out = tf.keras.layers.Dense(
            1,
            name="value_out",
            activation=None,
            kernel_initializer=normc_initializer(0.01),
        )(last_vf_layer if last_vf_layer is not None else last_layer)
        self.base_model = tf.keras.Model(
            inputs, [(logits_out if logits_out is not None else last_layer), value_out]
        )

    def call(
        self, input_dict: SampleBatch
    ) -> (TensorType, List[TensorType], Dict[str, TensorType]):
        model_out, value_out = self.base_model(input_dict[SampleBatch.OBS])
        extra_outs = {SampleBatch.VF_PREDS: tf.reshape(value_out, [-1])}
        return model_out, [], extra_outs
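

if __name__ == "__main__":
    # Minimal sketch (not part of the library): build a small Keras FC net on a
    # hypothetical 4-dim Box observation space and push a dummy batch through
    # its `call()` method. All spaces, sizes, and names below are example
    # assumptions chosen for illustration only.
    example_obs_space = gym.spaces.Box(-1.0, 1.0, (4,))
    example_act_space = gym.spaces.Discrete(2)
    model = Keras_FullyConnectedNetwork(
        input_space=example_obs_space,
        action_space=example_act_space,
        num_outputs=2,
        name="example_fcnet",
        fcnet_hiddens=(16, 16),
        fcnet_activation="relu",
    )
    # Invoke the forward pass directly on a SampleBatch of dummy observations.
    dummy_batch = SampleBatch({SampleBatch.OBS: np.zeros((8, 4), dtype=np.float32)})
    logits, _, extra = model.call(dummy_batch)
    # logits: [8, 2] action logits; extra[SampleBatch.VF_PREDS]: [8] value preds.
    print(logits.shape, extra[SampleBatch.VF_PREDS].shape)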