# Mirror of https://github.com/vale981/ray (synced 2025-03-04 17:41:43 -05:00)
import copy
from typing import TYPE_CHECKING, Any, Callable, Dict, Optional, Type, Union

import gym

from ray.rllib.algorithms.callbacks import DefaultCallbacks
from ray.rllib.env.env_context import EnvContext
from ray.rllib.evaluation.collectors.sample_collector import SampleCollector
from ray.rllib.evaluation.collectors.simple_list_collector import SimpleListCollector
from ray.rllib.models import MODEL_DEFAULTS
from ray.rllib.utils import deep_update, merge_dicts
from ray.rllib.utils.deprecation import DEPRECATED_VALUE, deprecation_warning
from ray.rllib.utils.typing import (
    AlgorithmConfigDict,
    EnvConfigDict,
    EnvType,
    PartialAlgorithmConfigDict,
    ResultDict,
)
from ray.tune.logger import Logger

if TYPE_CHECKING:
    from ray.rllib.algorithms.algorithm import Algorithm


class AlgorithmConfig:
    """An RLlib AlgorithmConfig builds an RLlib Algorithm from a given configuration.

    Example:
        >>> from ray.rllib.algorithms.callbacks import MemoryTrackingCallbacks
        >>> # Construct a generic config object, specifying values within different
        >>> # sub-categories, e.g. "training".
        >>> config = (
        ...     AlgorithmConfig()
        ...     .training(gamma=0.9, lr=0.01)
        ...     .environment(env="CartPole-v1")
        ...     .resources(num_gpus=0)
        ...     .rollouts(num_rollout_workers=4)
        ...     .callbacks(MemoryTrackingCallbacks)
        ... )
        >>> # A config object can be used to construct the respective Algorithm.
        >>> rllib_algo = config.build()

    Example:
        >>> from ray import tune
        >>> # In combination with a tune.grid_search:
        >>> config = AlgorithmConfig()
        >>> config.training(lr=tune.grid_search([0.01, 0.001]))
        >>> # Use the `to_dict()` method to get the legacy plain python config dict
        >>> # for usage with `tune.run()`.
        >>> tune.run("[registered trainer class]", config=config.to_dict())
    """

    def __init__(self, algo_class=None):
        # Define all settings and their default values.

        # Define the default RLlib Algorithm class that this AlgorithmConfig will be
        # applied to.
        self.algo_class = algo_class

        # `self.python_environment()`
        self.extra_python_environs_for_driver = {}
        self.extra_python_environs_for_worker = {}

        # `self.resources()`
        self.num_gpus = 0
        self.num_cpus_per_worker = 1
        self.num_gpus_per_worker = 0
        self._fake_gpus = False
        self.num_cpus_for_local_worker = 1
        self.custom_resources_per_worker = {}
        self.placement_strategy = "PACK"

        # `self.framework()`
        self.framework_str = "tf"
        self.eager_tracing = False
        self.eager_max_retraces = 20
        self.tf_session_args = {
            # note: overridden by `local_tf_session_args`
            "intra_op_parallelism_threads": 2,
            "inter_op_parallelism_threads": 2,
            "gpu_options": {
                "allow_growth": True,
            },
            "log_device_placement": False,
            "device_count": {"CPU": 1},
            # Required by multi-GPU (num_gpus > 1).
            "allow_soft_placement": True,
        }
        self.local_tf_session_args = {
            # Allow a higher level of parallelism by default, but not unlimited
            # since that can cause crashes with many concurrent drivers.
            "intra_op_parallelism_threads": 8,
            "inter_op_parallelism_threads": 8,
        }

        # `self.environment()`
        self.env = None
        self.env_config = {}
        self.observation_space = None
        self.action_space = None
        self.env_task_fn = None
        self.render_env = False
        self.clip_rewards = None
        self.normalize_actions = True
        self.clip_actions = False
        self.disable_env_checking = False

        # `self.rollouts()`
        self.num_workers = 2
        self.num_envs_per_worker = 1
        self.sample_collector = SimpleListCollector
        self.create_env_on_local_worker = False
        self.sample_async = False
        self.enable_connectors = False
        self.rollout_fragment_length = 200
        self.batch_mode = "truncate_episodes"
        self.remote_worker_envs = False
        self.remote_env_batch_wait_ms = 0
        self.validate_workers_after_construction = True
        self.ignore_worker_failures = False
        self.recreate_failed_workers = False
        self.restart_failed_sub_environments = False
        self.num_consecutive_worker_failures_tolerance = 100
        self.horizon = None
        self.soft_horizon = False
        self.no_done_at_end = False
        self.preprocessor_pref = "deepmind"
        self.observation_filter = "NoFilter"
        self.synchronize_filters = True
        self.compress_observations = False
        self.enable_tf1_exec_eagerly = False
        self.sampler_perf_stats_ema_coef = None

        # `self.training()`
        self.gamma = 0.99
        self.lr = 0.001
        self.train_batch_size = 32
        self.model = copy.deepcopy(MODEL_DEFAULTS)
        self.optimizer = {}

        # `self.callbacks()`
        self.callbacks_class = DefaultCallbacks

        # `self.exploration()`
        self.explore = True
        self.exploration_config = {
            # The Exploration class to use. In the simplest case, this is the name
            # (str) of any class present in the `rllib.utils.exploration` package.
            # You can also provide the python class directly or the full location
            # of your class (e.g. "ray.rllib.utils.exploration.epsilon_greedy.
            # EpsilonGreedy").
            "type": "StochasticSampling",
            # Add constructor kwargs here (if any).
        }

        # `self.multi_agent()`
        self.policies = {}
        self.policy_map_capacity = 100
        self.policy_map_cache = None
        self.policy_mapping_fn = None
        self.policies_to_train = None
        self.observation_fn = None
        self.replay_mode = "independent"
        self.count_steps_by = "env_steps"

        # `self.offline_data()`
        self.input_ = "sampler"
        self.input_config = {}
        self.actions_in_input_normalized = False
        self.postprocess_inputs = False
        self.shuffle_buffer_size = 0
        self.output = None
        self.output_config = {}
        self.output_compress_columns = ["obs", "new_obs"]
        self.output_max_file_size = 64 * 1024 * 1024

        # `self.evaluation()`
        self.evaluation_interval = None
        self.evaluation_duration = 10
        self.evaluation_duration_unit = "episodes"
        self.evaluation_sample_timeout_s = 180.0
        self.evaluation_parallel_to_training = False
        self.evaluation_config = {}
        self.off_policy_estimation_methods = {}
        self.evaluation_num_workers = 0
        self.custom_evaluation_function = None
        self.always_attach_evaluation_results = False
        self.enable_async_evaluation = False
        # TODO: Set this flag still in the config or - much better - in the
        # RolloutWorker as a property.
        self.in_evaluation = False
        self.sync_filters_on_rollout_workers_timeout_s = 60.0

        # `self.reporting()`
        self.keep_per_episode_custom_metrics = False
        self.metrics_episode_collection_timeout_s = 60.0
        self.metrics_num_episodes_for_smoothing = 100
        self.min_time_s_per_iteration = None
        self.min_train_timesteps_per_iteration = 0
        self.min_sample_timesteps_per_iteration = 0

        # `self.debugging()`
        self.logger_creator = None
        self.logger_config = None
        self.log_level = "WARN"
        self.log_sys_usage = True
        self.fake_sampler = False
        self.seed = None

        # `self.experimental()`
        self._tf_policy_handles_more_than_one_loss = False
        self._disable_preprocessor_api = False
        self._disable_action_flattening = False
        self._disable_execution_plan_api = True

        # TODO: Remove, once all deprecation_warning calls upon using these keys
        # have been removed.
        # === Deprecated keys ===
        self.simple_optimizer = DEPRECATED_VALUE
        self.monitor = DEPRECATED_VALUE
        self.evaluation_num_episodes = DEPRECATED_VALUE
        self.metrics_smoothing_episodes = DEPRECATED_VALUE
        self.timesteps_per_iteration = DEPRECATED_VALUE
        self.min_iter_time_s = DEPRECATED_VALUE
        self.collect_metrics_timeout = DEPRECATED_VALUE
        # The following values have moved because of the new ReplayBuffer API.
        self.buffer_size = DEPRECATED_VALUE
        self.prioritized_replay = DEPRECATED_VALUE
        self.learning_starts = DEPRECATED_VALUE
        self.replay_batch_size = DEPRECATED_VALUE
        # -1 = DEPRECATED_VALUE is a valid value for replay_sequence_length.
        self.replay_sequence_length = None
        self.prioritized_replay_alpha = DEPRECATED_VALUE
        self.prioritized_replay_beta = DEPRECATED_VALUE
        self.prioritized_replay_eps = DEPRECATED_VALUE
        self.min_time_s_per_reporting = DEPRECATED_VALUE
        self.min_train_timesteps_per_reporting = DEPRECATED_VALUE
        self.min_sample_timesteps_per_reporting = DEPRECATED_VALUE
        self.input_evaluation = DEPRECATED_VALUE

    def to_dict(self) -> AlgorithmConfigDict:
        """Converts all settings into a legacy config dict for backward compatibility.

        Returns:
            A complete AlgorithmConfigDict, usable in backward-compatible Tune/RLlib
            use cases, e.g. w/ `tune.run()`.
        """
        config = copy.deepcopy(vars(self))
        config.pop("algo_class")

        # Worst naming convention ever: NEVER EVER use reserved key-words...
        if "lambda_" in config:
            assert hasattr(self, "lambda_")
            config["lambda"] = getattr(self, "lambda_")
            config.pop("lambda_")
        if "input_" in config:
            assert hasattr(self, "input_")
            config["input"] = getattr(self, "input_")
            config.pop("input_")

        # Setup legacy multiagent sub-dict:
        config["multiagent"] = {}
        for k in [
            "policies",
            "policy_map_capacity",
            "policy_map_cache",
            "policy_mapping_fn",
            "policies_to_train",
            "observation_fn",
            "replay_mode",
            "count_steps_by",
        ]:
            config["multiagent"][k] = config.pop(k)

        # Switch out deprecated vs new config keys.
        config["callbacks"] = config.pop("callbacks_class", DefaultCallbacks)
        config["create_env_on_driver"] = config.pop("create_env_on_local_worker", 1)
        config["custom_eval_function"] = config.pop("custom_evaluation_function", None)
        config["framework"] = config.pop("framework_str", None)
        config["num_cpus_for_driver"] = config.pop("num_cpus_for_local_worker", 1)

        return config

    def build(
        self,
        env: Optional[Union[str, EnvType]] = None,
        logger_creator: Optional[Callable[[], Logger]] = None,
    ) -> "Algorithm":
        """Builds an Algorithm from the AlgorithmConfig.

        Args:
            env: Name of the environment to use (e.g. a gym-registered str),
                a full class path (e.g.
                "ray.rllib.examples.env.random_env.RandomEnv"), or an Env
                class directly. Note that this arg can also be specified via
                the "env" key in `config`.
            logger_creator: Callable that creates a ray.tune.Logger
                object. If unspecified, a default logger is created.

        Returns:
            A ray.rllib.algorithms.algorithm.Algorithm object.
        """
        if env is not None:
            self.env = env
            if self.evaluation_config is not None:
                self.evaluation_config["env"] = env
        if logger_creator is not None:
            self.logger_creator = logger_creator

        return self.algo_class(
            config=self.to_dict(),
            env=self.env,
            logger_creator=self.logger_creator,
        )

    def python_environment(
        self,
        *,
        extra_python_environs_for_driver: Optional[dict] = None,
        extra_python_environs_for_worker: Optional[dict] = None,
    ) -> "AlgorithmConfig":
        """Sets the config's python environment settings.

        Args:
            extra_python_environs_for_driver: Any extra python env vars to set in the
                algorithm's process, e.g., {"OMP_NUM_THREADS": "16"}.
            extra_python_environs_for_worker: Any extra python env vars to set for
                worker processes.
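
        Example:
            A minimal usage sketch (the env var values below are illustrative only,
            not recommended settings):

            >>> config = AlgorithmConfig().python_environment(
            ...     extra_python_environs_for_driver={"OMP_NUM_THREADS": "16"},
            ...     extra_python_environs_for_worker={"OMP_NUM_THREADS": "1"},
            ... )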

        Returns:
            This updated AlgorithmConfig object.
        """
        if extra_python_environs_for_driver is not None:
            self.extra_python_environs_for_driver = extra_python_environs_for_driver
        if extra_python_environs_for_worker is not None:
            self.extra_python_environs_for_worker = extra_python_environs_for_worker
        return self

    def resources(
        self,
        *,
        num_gpus: Optional[Union[float, int]] = None,
        _fake_gpus: Optional[bool] = None,
        num_cpus_per_worker: Optional[int] = None,
        num_gpus_per_worker: Optional[Union[float, int]] = None,
        num_cpus_for_local_worker: Optional[int] = None,
        custom_resources_per_worker: Optional[dict] = None,
        placement_strategy: Optional[str] = None,
    ) -> "AlgorithmConfig":
        """Specifies resources allocated for an Algorithm and its ray actors/workers.

        Args:
            num_gpus: Number of GPUs to allocate to the algorithm process.
                Note that not all algorithms can take advantage of GPUs.
                Support for multi-GPU is currently only available for
                tf-[PPO/IMPALA/DQN/PG]. This can be fractional (e.g., 0.3 GPUs).
            _fake_gpus: Set to True for debugging (multi-)GPU functionality on a
                CPU machine. GPU towers will be simulated by graphs located on
                CPUs in this case. Use `num_gpus` to test for different numbers of
                fake GPUs.
            num_cpus_per_worker: Number of CPUs to allocate per worker.
            num_gpus_per_worker: Number of GPUs to allocate per worker. This can be
                fractional. This is usually needed only if your env itself requires a
                GPU (i.e., it is a GPU-intensive video game), or model inference is
                unusually expensive.
            num_cpus_for_local_worker: Number of CPUs to allocate for the algorithm.
                Note: this only takes effect when running in Tune. Otherwise,
                the algorithm runs in the main program (driver).
            custom_resources_per_worker: Any custom Ray resources to allocate per
                worker.
            placement_strategy: The strategy for the placement group factory returned
                by `Algorithm.default_resource_request()`. A PlacementGroup defines
                which devices (resources) should always be co-located on the same node.
                For example, an Algorithm with 2 rollout workers, running with
                num_gpus=1, will request a placement group with the bundles:
                [{"gpu": 1, "cpu": 1}, {"cpu": 1}, {"cpu": 1}], where the first bundle
                is for the driver and the other 2 bundles are for the two workers.
                These bundles can now be "placed" on the same or different
                nodes depending on the value of `placement_strategy`:
                "PACK": Packs bundles into as few nodes as possible.
                "SPREAD": Places bundles across distinct nodes as evenly as possible.
                "STRICT_PACK": Packs bundles into one node. The group is not allowed
                to span multiple nodes.
                "STRICT_SPREAD": Packs bundles across distinct nodes.

        Returns:
            This updated AlgorithmConfig object.
        """
        if num_gpus is not None:
            self.num_gpus = num_gpus
        if _fake_gpus is not None:
            self._fake_gpus = _fake_gpus
        if num_cpus_per_worker is not None:
            self.num_cpus_per_worker = num_cpus_per_worker
        if num_gpus_per_worker is not None:
            self.num_gpus_per_worker = num_gpus_per_worker
        if num_cpus_for_local_worker is not None:
            self.num_cpus_for_local_worker = num_cpus_for_local_worker
        if custom_resources_per_worker is not None:
            self.custom_resources_per_worker = custom_resources_per_worker
        if placement_strategy is not None:
            self.placement_strategy = placement_strategy

        return self

    def framework(
        self,
        framework: Optional[str] = None,
        *,
        eager_tracing: Optional[bool] = None,
        eager_max_retraces: Optional[int] = None,
        tf_session_args: Optional[Dict[str, Any]] = None,
        local_tf_session_args: Optional[Dict[str, Any]] = None,
    ) -> "AlgorithmConfig":
        """Sets the config's DL framework settings.

        Args:
            framework: tf: TensorFlow (static-graph); tf2: TensorFlow 2.x
                (eager or traced, if eager_tracing=True); torch: PyTorch.
            eager_tracing: Enable tracing in eager mode. This greatly improves
                performance (speedup ~2x), but makes it slightly harder to debug
                since Python code won't be evaluated after the initial eager pass.
                Only possible if framework=tf2.
            eager_max_retraces: Maximum number of tf.function re-traces before a
                runtime error is raised. This is to prevent unnoticed retraces of
                methods inside the `..._eager_traced` Policy, which could slow down
                execution by a factor of 4, without the user noticing what the root
                cause for this slowdown could be.
                Only necessary for framework=[tf2|tfe].
                Set to None to ignore the re-trace count and never throw an error.
            tf_session_args: Configures TF for single-process operation by default.
            local_tf_session_args: Override the `tf_session_args` above on the local
                worker.
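
        Example:
            A minimal usage sketch (illustrative only):

            >>> config = AlgorithmConfig().framework("tf2", eager_tracing=True)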

        Returns:
            This updated AlgorithmConfig object.
        """
        if framework is not None:
            self.framework_str = framework
        if eager_tracing is not None:
            self.eager_tracing = eager_tracing
        if eager_max_retraces is not None:
            self.eager_max_retraces = eager_max_retraces
        if tf_session_args is not None:
            self.tf_session_args = tf_session_args
        if local_tf_session_args is not None:
            self.local_tf_session_args = local_tf_session_args

        return self

    def environment(
        self,
        *,
        env: Optional[Union[str, EnvType]] = None,
        env_config: Optional[EnvConfigDict] = None,
        observation_space: Optional[gym.spaces.Space] = None,
        action_space: Optional[gym.spaces.Space] = None,
        env_task_fn: Optional[Callable[[ResultDict, EnvType, EnvContext], Any]] = None,
        render_env: Optional[bool] = None,
        clip_rewards: Optional[Union[bool, float]] = None,
        normalize_actions: Optional[bool] = None,
        clip_actions: Optional[bool] = None,
        disable_env_checking: Optional[bool] = None,
    ) -> "AlgorithmConfig":
        """Sets the config's RL-environment settings.

        Args:
            env: The environment specifier. This can either be a tune-registered env,
                via `tune.register_env([name], lambda env_ctx: [env object])`,
                or a string specifier of an RLlib supported type. In the latter case,
                RLlib will try to interpret the specifier as either an OpenAI Gym env,
                a PyBullet env, a ViZDoomGym env, or a fully qualified classpath to an
                Env class, e.g. "ray.rllib.examples.env.random_env.RandomEnv".
            env_config: Arguments dict passed to the env creator as an EnvContext
                object (which is a dict plus the properties: num_workers, worker_index,
                vector_index, and remote).
            observation_space: The observation space for the Policies of this
                Algorithm.
            action_space: The action space for the Policies of this Algorithm.
            env_task_fn: A callable taking the last train results, the base env and the
                env context as args and returning a new task to set the env to.
                The env must be a `TaskSettableEnv` sub-class for this to work.
                See `examples/curriculum_learning.py` for an example.
            render_env: If True, try to render the environment on the local worker or
                on worker 1 (if num_workers > 0). For vectorized envs, this usually
                means that only the first sub-environment will be rendered.
                In order for this to work, your env will have to implement the
                `render()` method which either:
                a) handles window generation and rendering itself (returning True), or
                b) returns a numpy uint8 image of shape [height x width x 3 (RGB)].
            clip_rewards: Whether to clip rewards during Policy's postprocessing.
                None (default): Clip for Atari only (r=sign(r)).
                True: r=sign(r): Fixed rewards -1.0, 1.0, or 0.0.
                False: Never clip.
                [float value]: Clip at -value and +value.
                Tuple[value1, value2]: Clip at value1 and value2.
            normalize_actions: If True, RLlib will learn entirely inside a normalized
                action space (0.0 centered with small stddev; only affecting Box
                components). We will unsquash actions (and clip, just in case) to the
                bounds of the env's action space before sending actions back to the
                env.
            clip_actions: If True, RLlib will clip actions according to the env's
                bounds before sending them back to the env.
                TODO: (sven) This option should be deprecated and always be False.
            disable_env_checking: If True, disable the environment pre-checking module.
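
        Example:
            A minimal usage sketch (the values below are illustrative only):

            >>> config = AlgorithmConfig().environment(
            ...     env="CartPole-v1",
            ...     clip_rewards=True,
            ...     disable_env_checking=False,
            ... )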

        Returns:
            This updated AlgorithmConfig object.
        """
        if env is not None:
            self.env = env
        if env_config is not None:
            self.env_config = env_config
        if observation_space is not None:
            self.observation_space = observation_space
        if action_space is not None:
            self.action_space = action_space
        if env_task_fn is not None:
            self.env_task_fn = env_task_fn
        if render_env is not None:
            self.render_env = render_env
        if clip_rewards is not None:
            self.clip_rewards = clip_rewards
        if normalize_actions is not None:
            self.normalize_actions = normalize_actions
        if clip_actions is not None:
            self.clip_actions = clip_actions
        if disable_env_checking is not None:
            self.disable_env_checking = disable_env_checking

        return self

    def rollouts(
        self,
        *,
        num_rollout_workers: Optional[int] = None,
        num_envs_per_worker: Optional[int] = None,
        create_env_on_local_worker: Optional[bool] = None,
        sample_collector: Optional[Type[SampleCollector]] = None,
        sample_async: Optional[bool] = None,
        enable_connectors: Optional[bool] = None,
        rollout_fragment_length: Optional[int] = None,
        batch_mode: Optional[str] = None,
        remote_worker_envs: Optional[bool] = None,
        remote_env_batch_wait_ms: Optional[float] = None,
        validate_workers_after_construction: Optional[bool] = None,
        ignore_worker_failures: Optional[bool] = None,
        recreate_failed_workers: Optional[bool] = None,
        restart_failed_sub_environments: Optional[bool] = None,
        num_consecutive_worker_failures_tolerance: Optional[int] = None,
        horizon: Optional[int] = None,
        soft_horizon: Optional[bool] = None,
        no_done_at_end: Optional[bool] = None,
        preprocessor_pref: Optional[str] = None,
        observation_filter: Optional[str] = None,
        synchronize_filter: Optional[bool] = None,
        compress_observations: Optional[bool] = None,
        enable_tf1_exec_eagerly: Optional[bool] = None,
        sampler_perf_stats_ema_coef: Optional[float] = None,
    ) -> "AlgorithmConfig":
        """Sets the rollout worker configuration.

        Args:
            num_rollout_workers: Number of rollout worker actors to create for
                parallel sampling. Setting this to 0 will force rollouts to be done in
                the local worker (driver process or the Algorithm's actor when using
                Tune).
            num_envs_per_worker: Number of environments to evaluate vector-wise per
                worker. This enables model inference batching, which can improve
                performance for inference-bottlenecked workloads.
            sample_collector: The SampleCollector class to be used to collect and
                retrieve environment-, model-, and sampler data. Override the
                SampleCollector base class to implement your own
                collection/buffering/retrieval logic.
            create_env_on_local_worker: When `num_workers` > 0, the driver
                (local_worker; worker-idx=0) does not need an environment. This is
                because it doesn't have to sample (done by remote_workers;
                worker_indices > 0) nor evaluate (done by evaluation workers;
                see below). Set this to True to create an environment on the local
                worker anyway (e.g., for debugging purposes).
            sample_async: Use a background thread for sampling (slightly off-policy,
                usually not advisable to turn on unless your env specifically requires
                it).
            enable_connectors: Use connector-based environment runners, so that all
                preprocessing of obs and postprocessing of actions are done in agent
                and action connectors.
            rollout_fragment_length: Divide episodes into fragments of this many steps
                each during rollouts. Sample batches of this size are collected from
                rollout workers and combined into a larger batch of `train_batch_size`
                for learning.
                For example, given rollout_fragment_length=100 and
                train_batch_size=1000:
                1. RLlib collects 10 fragments of 100 steps each from rollout workers.
                2. These fragments are concatenated and we perform an epoch of SGD.
                When using multiple envs per worker, the fragment size is multiplied by
                `num_envs_per_worker`. This is because we are collecting steps from
                multiple envs in parallel. For example, if num_envs_per_worker=5, then
                rollout workers will return experiences in chunks of 5*100 = 500 steps.
                The dataflow here can vary per algorithm. For example, PPO further
                divides the train batch into minibatches for multi-epoch SGD.
            batch_mode: How to build per-Sampler (RolloutWorker) batches, which are
                then usually concat'd to form the train batch. Note that "steps" below
                can mean different things (either env- or agent-steps) and depends on
                the `count_steps_by` (multiagent) setting below.
                "truncate_episodes": Each produced batch (when calling
                RolloutWorker.sample()) will contain exactly `rollout_fragment_length`
                steps. This mode guarantees evenly sized batches, but increases
                variance as the future return must now be estimated at truncation
                boundaries.
                "complete_episodes": Each unroll happens exactly over one episode, from
                beginning to end. Data collection will not stop unless the episode
                terminates or a configured horizon (hard or soft) is hit.
            remote_worker_envs: If using num_envs_per_worker > 1, whether to create
                those new envs in remote processes instead of in the same worker.
                This adds overheads, but can make sense if your envs can take much
                time to step / reset (e.g., for StarCraft). Use this cautiously;
                overheads are significant.
            remote_env_batch_wait_ms: Timeout that remote workers are waiting when
                polling environments. 0 (continue when at least one env is ready) is
                a reasonable default, but the optimal value could be obtained by
                measuring your environment's step/reset and model inference
                performance.
            validate_workers_after_construction: Whether to validate that each created
                remote worker is healthy after its construction process.
            ignore_worker_failures: Whether to attempt to continue training if a worker
                crashes. The number of currently healthy workers is reported as the
                "num_healthy_workers" metric.
            recreate_failed_workers: Whether, upon a worker failure, RLlib will try to
                recreate the lost worker as an identical copy of the failed one. The
                new worker will only differ from the failed one in its
                `self.recreated_worker=True` property value. It will have the same
                `worker_index` as the original one. If True, the
                `ignore_worker_failures` setting will be ignored.
            restart_failed_sub_environments: If True and any sub-environment (within
                a vectorized env) throws any error during env stepping, the
                Sampler will try to restart the faulty sub-environment. This is done
                without disturbing the other (still intact) sub-environments and
                without the RolloutWorker crashing.
            num_consecutive_worker_failures_tolerance: The number of consecutive times
                a rollout worker (or evaluation worker) failure is tolerated before
                finally crashing the Algorithm. Only useful if either
                `ignore_worker_failures` or `recreate_failed_workers` is True.
                Note that for `restart_failed_sub_environments` and sub-environment
                failures, the worker itself is NOT affected and won't throw any errors
                as the flawed sub-environment is silently restarted under the hood.
            horizon: Number of steps after which the episode is forced to terminate.
                Defaults to `env.spec.max_episode_steps` (if present) for Gym envs.
            soft_horizon: Calculate rewards but don't reset the environment when the
                horizon is hit. This allows value estimation and RNN state to span
                across logical episodes denoted by horizon. This only has an effect
                if horizon != inf.
            no_done_at_end: Don't set 'done' at the end of the episode.
                In combination with `soft_horizon`, this works as follows:
                - no_done_at_end=False soft_horizon=False:
                Reset env and add `done=True` at end of each episode.
                - no_done_at_end=True soft_horizon=False:
                Reset env, but do NOT add `done=True` at end of the episode.
                - no_done_at_end=False soft_horizon=True:
                Do NOT reset env at horizon, but add `done=True` at the horizon
                (pretending the episode has terminated).
                - no_done_at_end=True soft_horizon=True:
                Do NOT reset env at horizon and do NOT add `done=True` at the horizon.
            preprocessor_pref: Whether to use "rllib" or "deepmind" preprocessors by
                default. Set to None to use no preprocessor. In this case, the
                model will have to handle possibly complex observations from the
                environment.
            observation_filter: Element-wise observation filter, either "NoFilter"
                or "MeanStdFilter".
            synchronize_filter: Whether to synchronize the statistics of remote
                filters.
            compress_observations: Whether to LZ4 compress individual observations
                in the SampleBatches collected during rollouts.
            enable_tf1_exec_eagerly: Explicitly tells the rollout worker to enable
                TF eager execution. This is useful, for example, when framework is
                "torch", but a TF2 policy needs to be restored for evaluation or
                league-based purposes.
            sampler_perf_stats_ema_coef: If specified, perf stats are exponentially
                moving averages (EMAs); this is the coefficient determining how much
                new data points contribute to the average. Default is None, which uses
                a simple global average instead.
                The EMA update rule is: updated = (1 - ema_coef) * old + ema_coef * new
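
        Example:
            A minimal usage sketch (the values below are illustrative only, not
            recommended settings):

            >>> config = AlgorithmConfig().rollouts(
            ...     num_rollout_workers=4,
            ...     num_envs_per_worker=2,
            ...     rollout_fragment_length=100,
            ...     batch_mode="truncate_episodes",
            ... )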

        Returns:
            This updated AlgorithmConfig object.
        """
        if num_rollout_workers is not None:
            self.num_workers = num_rollout_workers
        if num_envs_per_worker is not None:
            self.num_envs_per_worker = num_envs_per_worker
        if sample_collector is not None:
            self.sample_collector = sample_collector
        if create_env_on_local_worker is not None:
            self.create_env_on_local_worker = create_env_on_local_worker
        if sample_async is not None:
            self.sample_async = sample_async
        if enable_connectors is not None:
            self.enable_connectors = enable_connectors
        if rollout_fragment_length is not None:
            self.rollout_fragment_length = rollout_fragment_length
        if batch_mode is not None:
            self.batch_mode = batch_mode
        if remote_worker_envs is not None:
            self.remote_worker_envs = remote_worker_envs
        if remote_env_batch_wait_ms is not None:
            self.remote_env_batch_wait_ms = remote_env_batch_wait_ms
        if validate_workers_after_construction is not None:
            self.validate_workers_after_construction = (
                validate_workers_after_construction
            )
        if ignore_worker_failures is not None:
            self.ignore_worker_failures = ignore_worker_failures
        if recreate_failed_workers is not None:
            self.recreate_failed_workers = recreate_failed_workers
        if restart_failed_sub_environments is not None:
            self.restart_failed_sub_environments = restart_failed_sub_environments
        if num_consecutive_worker_failures_tolerance is not None:
            self.num_consecutive_worker_failures_tolerance = (
                num_consecutive_worker_failures_tolerance
            )
        if horizon is not None:
            self.horizon = horizon
        if soft_horizon is not None:
            self.soft_horizon = soft_horizon
        if no_done_at_end is not None:
            self.no_done_at_end = no_done_at_end
        if preprocessor_pref is not None:
            self.preprocessor_pref = preprocessor_pref
        if observation_filter is not None:
            self.observation_filter = observation_filter
        if synchronize_filter is not None:
            self.synchronize_filters = synchronize_filter
        if compress_observations is not None:
            self.compress_observations = compress_observations
        if enable_tf1_exec_eagerly is not None:
            self.enable_tf1_exec_eagerly = enable_tf1_exec_eagerly
        if sampler_perf_stats_ema_coef is not None:
            self.sampler_perf_stats_ema_coef = sampler_perf_stats_ema_coef

        return self

    def training(
        self,
        gamma: Optional[float] = None,
        lr: Optional[float] = None,
        train_batch_size: Optional[int] = None,
        model: Optional[dict] = None,
        optimizer: Optional[dict] = None,
    ) -> "AlgorithmConfig":
        """Sets the training related configuration.

        Args:
            gamma: Float specifying the discount factor of the Markov Decision process.
            lr: The default learning rate.
            train_batch_size: Training batch size, if applicable.
            model: Arguments passed into the policy model. See models/catalog.py for a
                full list of the available model options.
            optimizer: Arguments to pass to the policy optimizer.
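
        Example:
            A minimal usage sketch (the values are illustrative only; "fcnet_hiddens"
            is assumed to be one of the default model keys from models/catalog.py):

            >>> config = AlgorithmConfig().training(
            ...     gamma=0.99,
            ...     lr=0.0005,
            ...     train_batch_size=4000,
            ...     model={"fcnet_hiddens": [64, 64]},
            ... )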

        Returns:
            This updated AlgorithmConfig object.
        """
        if gamma is not None:
            self.gamma = gamma
        if lr is not None:
            self.lr = lr
        if train_batch_size is not None:
            self.train_batch_size = train_batch_size
        if model is not None:
            self.model = model
        if optimizer is not None:
            self.optimizer = merge_dicts(self.optimizer, optimizer)

        return self

    def callbacks(self, callbacks_class) -> "AlgorithmConfig":
        """Sets the callbacks configuration.

        Args:
            callbacks_class: Callbacks class, whose methods will be run during
                various phases of training and environment sample collection.
                See the `DefaultCallbacks` class and
                `examples/custom_metrics_and_callbacks.py` for more usage information.
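
        Example:
            A minimal sketch of plugging in a custom callbacks class (the subclass,
            the overridden hook, and the metric name are illustrative assumptions,
            not part of this config API):

            >>> from ray.rllib.algorithms.callbacks import DefaultCallbacks
            >>> class MyCallbacks(DefaultCallbacks):
            ...     def on_episode_end(self, *, episode, **kwargs):
            ...         # Log a custom per-episode metric.
            ...         episode.custom_metrics["my_metric"] = 1.0
            >>> config = AlgorithmConfig().callbacks(MyCallbacks)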

        Returns:
            This updated AlgorithmConfig object.
        """
        self.callbacks_class = callbacks_class

        return self

    def exploration(
        self,
        *,
        explore: Optional[bool] = None,
        exploration_config: Optional[dict] = None,
    ) -> "AlgorithmConfig":
        """Sets the config's exploration settings.

        Args:
            explore: Default exploration behavior, iff `explore`=None is passed into
                compute_action(s). Set to False for no exploration behavior (e.g.,
                for evaluation).
            exploration_config: A dict specifying the Exploration object's config.
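
        Example:
            A minimal usage sketch (the EpsilonGreedy settings shown are illustrative
            assumptions):

            >>> config = AlgorithmConfig().exploration(
            ...     explore=True,
            ...     exploration_config={
            ...         "type": "EpsilonGreedy",
            ...         "initial_epsilon": 1.0,
            ...         "final_epsilon": 0.02,
            ...     },
            ... )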

        Returns:
            This updated AlgorithmConfig object.
        """
        if explore is not None:
            self.explore = explore
        if exploration_config is not None:
            # Override entire `exploration_config` if `type` key changes.
            # Update, if `type` key remains the same or is not specified.
            new_exploration_config = deep_update(
                {"exploration_config": self.exploration_config},
                {"exploration_config": exploration_config},
                False,
                ["exploration_config"],
                ["exploration_config"],
            )
            self.exploration_config = new_exploration_config["exploration_config"]

        return self

    def evaluation(
        self,
        *,
        evaluation_interval: Optional[int] = None,
        evaluation_duration: Optional[Union[int, str]] = None,
        evaluation_duration_unit: Optional[str] = None,
        evaluation_sample_timeout_s: Optional[float] = None,
        evaluation_parallel_to_training: Optional[bool] = None,
        evaluation_config: Optional[
            Union["AlgorithmConfig", PartialAlgorithmConfigDict]
        ] = None,
        off_policy_estimation_methods: Optional[Dict] = None,
        evaluation_num_workers: Optional[int] = None,
        custom_evaluation_function: Optional[Callable] = None,
        always_attach_evaluation_results: Optional[bool] = None,
        enable_async_evaluation: Optional[bool] = None,
    ) -> "AlgorithmConfig":
        """Sets the config's evaluation settings.

        Args:
            evaluation_interval: Evaluate every `evaluation_interval` training
                iterations. The evaluation stats will be reported under the
                "evaluation" metric key. Note that, for Ape-X, metrics are already
                only reported for the lowest-epsilon (least random) workers.
                Set to None (or 0) for no evaluation.
            evaluation_duration: Duration for which to run evaluation each
                `evaluation_interval`. The unit for the duration can be set via
                `evaluation_duration_unit` to either "episodes" (default) or
                "timesteps". If using multiple evaluation workers
                (evaluation_num_workers > 1), the load to run will be split amongst
                these.
                If the value is "auto":
                - For `evaluation_parallel_to_training=True`: Will run as many
                episodes/timesteps as fit into the (parallel) training step.
                - For `evaluation_parallel_to_training=False`: Error.
            evaluation_duration_unit: The unit, with which to count the evaluation
                duration. Either "episodes" (default) or "timesteps".
            evaluation_sample_timeout_s: The timeout (in seconds) for the ray.get call
                to the remote evaluation worker(s) `sample()` method. After this time,
                the user will receive a warning and instructions on how to fix the
                issue. This could be either to make sure the episode ends, increasing
                the timeout, or switching to `evaluation_duration_unit=timesteps`.
            evaluation_parallel_to_training: Whether to run evaluation in parallel to
                an Algorithm.train() call, using threading. Default=False.
                E.g. evaluation_interval=2 -> For every other training iteration,
                the Algorithm.train() and Algorithm.evaluate() calls run in parallel.
                Note: This is experimental. Possible pitfalls could be race conditions
                for weight syncing at the beginning of the evaluation loop.
            evaluation_config: Typical usage is to pass extra args to the evaluation
                env creator and to disable exploration by computing deterministic
                actions. IMPORTANT NOTE: Policy gradient algorithms are able to find
                the optimal policy, even if this is a stochastic one. Setting
                "explore=False" here will result in the evaluation workers not using
                this optimal policy!
            off_policy_estimation_methods: Specify how to evaluate the current policy,
                along with any optional config parameters. This only has an effect when
                reading offline experiences ("input" is not "sampler").
                Available keys:
                {ope_method_name: {"type": ope_type, ...}} where `ope_method_name`
                is a user-defined string to save the OPE results under, and
                `ope_type` can be any subclass of OffPolicyEstimator, e.g.
                ray.rllib.offline.estimators.is::ImportanceSampling
                or your own custom subclass, or the full class path to the subclass.
                You can also add additional config arguments to be passed to the
                OffPolicyEstimator in the dict, e.g.
                {"qreg_dr": {"type": DoublyRobust, "q_model_type": "qreg", "k": 5}}
            evaluation_num_workers: Number of parallel workers to use for evaluation.
                Note that this is set to zero by default, which means evaluation will
                be run in the algorithm process (only if evaluation_interval is not
                None). If you increase this, it will increase the Ray resource usage of
                the algorithm since evaluation workers are created separately from
                rollout workers (used to sample data for training).
            custom_evaluation_function: Customize the evaluation method. This must be a
                function of signature (algo: Algorithm, eval_workers: WorkerSet) ->
                metrics: dict. See the Algorithm.evaluate() method to see the default
                implementation. The Algorithm guarantees all eval workers have the
                latest policy state before this function is called.
            always_attach_evaluation_results: Make sure the latest available evaluation
                results are always attached to a step result dict. This may be useful
                if Tune or some other meta controller needs access to evaluation
                metrics all the time.
            enable_async_evaluation: If True, use an AsyncRequestsManager for
                the evaluation workers and use this manager to send `sample()` requests
                to the evaluation workers. This way, the Algorithm becomes more robust
                against long-running episodes and/or failing (and restarting) workers.
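
        Example:
            A minimal usage sketch (the values below are illustrative only):

            >>> config = AlgorithmConfig().evaluation(
            ...     evaluation_interval=10,
            ...     evaluation_duration=10,
            ...     evaluation_duration_unit="episodes",
            ...     evaluation_num_workers=1,
            ...     evaluation_config={"explore": False},
            ... )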

        Returns:
            This updated AlgorithmConfig object.
        """
        if evaluation_interval is not None:
            self.evaluation_interval = evaluation_interval
        if evaluation_duration is not None:
            self.evaluation_duration = evaluation_duration
        if evaluation_duration_unit is not None:
            self.evaluation_duration_unit = evaluation_duration_unit
        if evaluation_sample_timeout_s is not None:
            self.evaluation_sample_timeout_s = evaluation_sample_timeout_s
        if evaluation_parallel_to_training is not None:
            self.evaluation_parallel_to_training = evaluation_parallel_to_training
        if evaluation_config is not None:
            # Convert another AlgorithmConfig into dict.
            if isinstance(evaluation_config, AlgorithmConfig):
                self.evaluation_config = evaluation_config.to_dict()
            else:
                self.evaluation_config = evaluation_config
        if off_policy_estimation_methods is not None:
            self.off_policy_estimation_methods = off_policy_estimation_methods
        if evaluation_num_workers is not None:
            self.evaluation_num_workers = evaluation_num_workers
        if custom_evaluation_function is not None:
            self.custom_evaluation_function = custom_evaluation_function
        if always_attach_evaluation_results is not None:
            self.always_attach_evaluation_results = always_attach_evaluation_results
        if enable_async_evaluation is not None:
            self.enable_async_evaluation = enable_async_evaluation

        return self

    def offline_data(
        self,
        *,
        input_=None,
        input_config=None,
        actions_in_input_normalized=None,
        input_evaluation=None,
        postprocess_inputs=None,
        shuffle_buffer_size=None,
        output=None,
        output_config=None,
        output_compress_columns=None,
        output_max_file_size=None,
    ) -> "AlgorithmConfig":
        """Sets the config's offline data settings.

        TODO(jungong, sven): we can potentially unify all input types
        under input and input_config keys. E.g.
        input: sample
        input_config {
            env: CartPole-v0
        }
        or:
        input: json_reader
        input_config {
            path: /tmp/
        }
        or:
        input: dataset
        input_config {
            format: parquet
            path: /tmp/
        }

        Args:
            input_: Specify how to generate experiences:
                - "sampler": Generate experiences via online (env) simulation (default).
                - A local directory or file glob expression (e.g., "/tmp/*.json").
                - A list of individual file paths/URIs (e.g., ["/tmp/1.json",
                "s3://bucket/2.json"]).
                - A dict with string keys and sampling probabilities as values (e.g.,
                {"sampler": 0.4, "/tmp/*.json": 0.4, "s3://bucket/expert.json": 0.2}).
                - A callable that takes an `IOContext` object as its only arg and
                returns a ray.rllib.offline.InputReader.
                - A string key that indexes a callable with
                tune.registry.register_input.
            input_config: Arguments accessible from the IOContext for configuring
                custom input.
            actions_in_input_normalized: True if the actions in a given offline "input"
                are already normalized (between -1.0 and 1.0). This is usually the case
                when the offline file has been generated by another RLlib algorithm
                (e.g. PPO or SAC), while "normalize_actions" was set to True.
            postprocess_inputs: Whether to run postprocess_trajectory() on the
                trajectory fragments from offline inputs. Note that postprocessing will
                be done using the *current* policy, not the *behavior* policy, which
                is typically undesirable for on-policy algorithms.
            shuffle_buffer_size: If positive, input batches will be shuffled via a
                sliding window buffer of this number of batches. Use this if the input
                data is not in random enough order. Input is delayed until the shuffle
                buffer is filled.
            output: Specify where experiences should be saved:
                - None: don't save any experiences
                - "logdir" to save to the agent log dir
                - a path/URI to save to a custom output directory (e.g., "s3://bckt/")
                - a function that returns an rllib.offline.OutputWriter
            output_config: Arguments accessible from the IOContext for configuring
                custom output.
            output_compress_columns: What sample batch columns to LZ4 compress in the
                output data.
            output_max_file_size: Max output file size before rolling over to a
                new file.
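
        Example:
            A minimal usage sketch (the paths shown are illustrative only):

            >>> config = AlgorithmConfig().offline_data(
            ...     input_="/tmp/*.json",
            ...     actions_in_input_normalized=True,
            ...     output="logdir",
            ... )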

        Returns:
            This updated AlgorithmConfig object.
        """
        if input_ is not None:
            self.input_ = input_
        if input_config is not None:
            self.input_config = input_config
        if actions_in_input_normalized is not None:
            self.actions_in_input_normalized = actions_in_input_normalized
        if input_evaluation is not None:
            deprecation_warning(
                old="offline_data(input_evaluation={})".format(input_evaluation),
                new="evaluation(off_policy_estimation_methods={})".format(
                    input_evaluation
                ),
                error=True,
                help="Running OPE during training is not recommended.",
            )
        if postprocess_inputs is not None:
            self.postprocess_inputs = postprocess_inputs
        if shuffle_buffer_size is not None:
            self.shuffle_buffer_size = shuffle_buffer_size
        if output is not None:
            self.output = output
        if output_config is not None:
            self.output_config = output_config
        if output_compress_columns is not None:
            self.output_compress_columns = output_compress_columns
        if output_max_file_size is not None:
            self.output_max_file_size = output_max_file_size

        return self

    def multi_agent(
        self,
        *,
        policies=None,
        policy_map_capacity=None,
        policy_map_cache=None,
        policy_mapping_fn=None,
        policies_to_train=None,
        observation_fn=None,
        replay_mode=None,
        count_steps_by=None,
    ) -> "AlgorithmConfig":
        """Sets the config's multi-agent settings.

        Args:
            policies: Map of type MultiAgentPolicyConfigDict from policy ids to tuples
                of (policy_cls, obs_space, act_space, config). This defines the
                observation and action spaces of the policies and any extra config.
            policy_map_capacity: Keep this many policies in the "policy_map" (before
                writing least-recently used ones to disk/S3).
            policy_map_cache: Where to store overflowing (least-recently used) policies?
                Could be a directory (str) or an S3 location. None for using the
                default output dir.
            policy_mapping_fn: Function mapping agent ids to policy ids.
            policies_to_train: Determines those policies that should be updated.
                Options are:
                - None, for all policies.
                - An iterable of PolicyIDs that should be updated.
                - A callable, taking a PolicyID and a SampleBatch or MultiAgentBatch
                and returning a bool (indicating whether the given policy is trainable
                or not, given the particular batch). This allows you to have a policy
                trained only on certain data (e.g. when playing against a certain
                opponent).
            observation_fn: Optional function that can be used to enhance the local
                agent observations to include more state. See
                rllib/evaluation/observation_function.py for more info.
            replay_mode: When replay_mode=lockstep, RLlib will replay all the agent
                transitions at a particular timestep together in a batch. This allows
                the policy to implement differentiable shared computations between
                agents it controls at that timestep. When replay_mode=independent,
                transitions are replayed independently per policy.
            count_steps_by: Which metric to use as the "batch size" when building a
                MultiAgentBatch. The two supported values are:
                "env_steps": Count each time the env is "stepped" (no matter how many
                multi-agent actions are passed/how many multi-agent observations
                have been returned in the previous step).
                "agent_steps": Count each individual agent step as one step.

        Returns:
            This updated AlgorithmConfig object.
        """
        if policies is not None:
            self.policies = policies
        if policy_map_capacity is not None:
            self.policy_map_capacity = policy_map_capacity
        if policy_map_cache is not None:
            self.policy_map_cache = policy_map_cache
        if policy_mapping_fn is not None:
            self.policy_mapping_fn = policy_mapping_fn
        if policies_to_train is not None:
            self.policies_to_train = policies_to_train
        if observation_fn is not None:
            self.observation_fn = observation_fn
        if replay_mode is not None:
            self.replay_mode = replay_mode
        if count_steps_by is not None:
            self.count_steps_by = count_steps_by

        return self

    def reporting(
        self,
        *,
        keep_per_episode_custom_metrics: Optional[bool] = None,
        metrics_episode_collection_timeout_s: Optional[float] = None,
        metrics_num_episodes_for_smoothing: Optional[int] = None,
        min_time_s_per_iteration: Optional[int] = None,
        min_train_timesteps_per_iteration: Optional[int] = None,
        min_sample_timesteps_per_iteration: Optional[int] = None,
    ) -> "AlgorithmConfig":
        """Sets the config's reporting settings.

        Args:
            keep_per_episode_custom_metrics: Store raw custom metrics without
                calculating max, min, mean.
            metrics_episode_collection_timeout_s: Wait for metric batches for at most
                this many seconds. Those that have not returned in time will be
                collected in the next train iteration.
            metrics_num_episodes_for_smoothing: Smooth rollout metrics over this many
                episodes, if possible.
                In case rollouts (sample collection) just started, there may be fewer
                than this many episodes in the buffer and we'll compute metrics
                over this smaller number of available episodes.
                In case there are more than this many episodes collected in a single
                training iteration, use all of these episodes for metrics computation,
                meaning don't ever cut any "excess" episodes.
            min_time_s_per_iteration: Minimum time to accumulate within a single
                `train()` call. This value does not affect learning,
                only the number of times `Algorithm.training_step()` is called by
                `Algorithm.train()`. If, after one such step attempt, the time taken
                has not reached `min_time_s_per_iteration`, n more `training_step()`
                calls will be performed until the minimum time has been
                consumed. Set to 0 or None for no minimum time.
            min_train_timesteps_per_iteration: Minimum training timesteps to accumulate
                within a single `train()` call. This value does not affect learning,
                only the number of times `Algorithm.training_step()` is called by
                `Algorithm.train()`. If, after one such step attempt, the training
                timestep count has not been reached, n more `training_step()` calls
                will be performed until the minimum timesteps have been
                executed. Set to 0 or None for no minimum timesteps.
            min_sample_timesteps_per_iteration: Minimum env sampling timesteps to
                accumulate within a single `train()` call. This value does not affect
                learning, only the number of times `Algorithm.training_step()` is
                called by `Algorithm.train()`. If, after one such step attempt, the env
                sampling timestep count has not been reached, n more `training_step()`
                calls will be performed until the minimum timesteps have been
                executed. Set to 0 or None for no minimum timesteps.
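
        Example:
            A minimal usage sketch (the values below are illustrative only):

            >>> config = AlgorithmConfig().reporting(
            ...     metrics_num_episodes_for_smoothing=200,
            ...     min_time_s_per_iteration=30,
            ... )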

        Returns:
            This updated AlgorithmConfig object.
        """
        if keep_per_episode_custom_metrics is not None:
            self.keep_per_episode_custom_metrics = keep_per_episode_custom_metrics
        if metrics_episode_collection_timeout_s is not None:
            self.metrics_episode_collection_timeout_s = (
                metrics_episode_collection_timeout_s
            )
        if metrics_num_episodes_for_smoothing is not None:
            self.metrics_num_episodes_for_smoothing = metrics_num_episodes_for_smoothing
        if min_time_s_per_iteration is not None:
            self.min_time_s_per_iteration = min_time_s_per_iteration
        if min_train_timesteps_per_iteration is not None:
            self.min_train_timesteps_per_iteration = min_train_timesteps_per_iteration
        if min_sample_timesteps_per_iteration is not None:
            self.min_sample_timesteps_per_iteration = min_sample_timesteps_per_iteration

        return self

    def debugging(
        self,
        *,
        logger_creator: Optional[Callable[[], Logger]] = None,
        logger_config: Optional[dict] = None,
        log_level: Optional[str] = None,
        log_sys_usage: Optional[bool] = None,
        fake_sampler: Optional[bool] = None,
        seed: Optional[int] = None,
    ) -> "AlgorithmConfig":
        """Sets the config's debugging settings.

        Args:
            logger_creator: Callable that creates a ray.tune.Logger
                object. If unspecified, a default logger is created.
            logger_config: Define a logger-specific configuration to be used inside
                the Logger. The default value None allows overwriting this with
                nested dicts.
            log_level: Set the ray.rllib.* log level for the agent process and its
                workers. Should be one of DEBUG, INFO, WARN, or ERROR. The DEBUG level
                will also periodically print out summaries of relevant internal
                dataflow (this is also printed out once at startup at the INFO level).
                When using the `rllib train` command, you can also use the `-v` and
                `-vv` flags as shorthand for INFO and DEBUG.
            log_sys_usage: Log system resource metrics to results. This requires
                `psutil` to be installed for sys stats, and `gputil` for GPU metrics.
            fake_sampler: Use fake (infinite speed) sampler. For testing only.
            seed: This argument, in conjunction with worker_index, sets the random
                seed of each worker, so that identically configured trials will have
                identical results. This makes experiments reproducible.
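
        Example:
            A minimal usage sketch (the values below are illustrative only):

            >>> config = AlgorithmConfig().debugging(log_level="INFO", seed=42)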

        Returns:
            This updated AlgorithmConfig object.
        """
        if logger_creator is not None:
            self.logger_creator = logger_creator
        if logger_config is not None:
            self.logger_config = logger_config
        if log_level is not None:
            self.log_level = log_level
        if log_sys_usage is not None:
            self.log_sys_usage = log_sys_usage
        if fake_sampler is not None:
            self.fake_sampler = fake_sampler
        if seed is not None:
            self.seed = seed

        return self

    def experimental(
        self,
        *,
        _tf_policy_handles_more_than_one_loss=None,
        _disable_preprocessor_api=None,
        _disable_action_flattening=None,
        _disable_execution_plan_api=None,
    ) -> "AlgorithmConfig":
        """Sets the config's experimental settings.

        Args:
            _tf_policy_handles_more_than_one_loss: Experimental flag.
                If True, TFPolicy will handle more than one loss/optimizer.
                Set this to True, if you would like to return more than
                one loss term from your `loss_fn` and an equal number of optimizers
                from your `optimizer_fn`. In the future, the default for this will be
                True.
            _disable_preprocessor_api: Experimental flag.
                If True, no (observation) preprocessor will be created and
                observations will arrive in the model as they are returned by the env.
                In the future, the default for this will be True.
            _disable_action_flattening: Experimental flag.
                If True, RLlib will no longer flatten the policy-computed actions into
                a single tensor (for storage in SampleCollectors/output files/etc.),
                but leave (possibly nested) actions as-is. Disabling flattening affects:
                - SampleCollectors: Have to store possibly nested action structs.
                - Models that have the previous action(s) as part of their input.
                - Algorithms reading from offline files (incl. action information).
            _disable_execution_plan_api: Experimental flag.
                If True, the execution plan API will not be used. Instead,
                an Algorithm's `training_iteration` method will be called as-is each
                training iteration.

        Returns:
            This updated AlgorithmConfig object.
        """
        if _tf_policy_handles_more_than_one_loss is not None:
            self._tf_policy_handles_more_than_one_loss = (
                _tf_policy_handles_more_than_one_loss
            )
        if _disable_preprocessor_api is not None:
            self._disable_preprocessor_api = _disable_preprocessor_api
        if _disable_action_flattening is not None:
            self._disable_action_flattening = _disable_action_flattening
        if _disable_execution_plan_api is not None:
            self._disable_execution_plan_api = _disable_execution_plan_api

        return self