# ray/rllib/execution/metric_ops.py

from typing import Any, Dict, List, Optional
import time
from ray.actor import ActorHandle
from ray.util.iter import LocalIterator
from ray.rllib.evaluation.metrics import collect_episodes, summarize_episodes
from ray.rllib.execution.common import (
    AGENT_STEPS_SAMPLED_COUNTER,
    STEPS_SAMPLED_COUNTER,
    STEPS_TRAINED_COUNTER,
    STEPS_TRAINED_THIS_ITER_COUNTER,
    _get_shared_metrics,
)
from ray.rllib.evaluation.worker_set import WorkerSet


def StandardMetricsReporting(
    train_op: LocalIterator[Any],
    workers: WorkerSet,
    config: dict,
    selected_workers: Optional[List[ActorHandle]] = None,
    by_steps_trained: bool = False,
) -> LocalIterator[dict]:
    """Operator to periodically collect and report metrics.

    Args:
        train_op: Operator for executing training steps.
            We ignore the output values.
        workers: Rollout workers to collect metrics from.
        config: Algorithm configuration, used to determine the frequency
            of stats reporting.
        selected_workers: Override the list of remote workers
            to collect metrics from.
        by_steps_trained: If True, uses the `STEPS_TRAINED_COUNTER`
            instead of the `STEPS_SAMPLED_COUNTER` in metrics.

    Returns:
        LocalIterator[dict]: A local iterator over training results.

    Examples:
        >>> from ray.rllib.execution import ParallelRollouts, TrainOneStep
        >>> train_op = ParallelRollouts(...) # doctest: +SKIP
        ...     .for_each(TrainOneStep(...))
        >>> metrics_op = StandardMetricsReporting( # doctest: +SKIP
        ...     train_op, workers, config)
        >>> next(metrics_op) # doctest: +SKIP
        {"episode_reward_max": ..., "episode_reward_mean": ..., ...}
    """
    output_op = (
        train_op.filter(
            OncePerTimestepsElapsed(
                config["min_train_timesteps_per_iteration"] or 0
                if by_steps_trained
                else config["min_sample_timesteps_per_iteration"] or 0,
                by_steps_trained=by_steps_trained,
            )
        )
        .filter(OncePerTimeInterval(config["min_time_s_per_iteration"]))
        .for_each(
            CollectMetrics(
                workers,
                min_history=config["metrics_num_episodes_for_smoothing"],
                timeout_seconds=config["metrics_episode_collection_timeout_s"],
                keep_per_episode_custom_metrics=config[
                    "keep_per_episode_custom_metrics"
                ],
                selected_workers=selected_workers,
                by_steps_trained=by_steps_trained,
            )
        )
    )
    return output_op
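

# Illustrative sketch of the config keys StandardMetricsReporting reads above.
# The values are hypothetical placeholders; `train_op` and `workers` are
# assumed to come from an existing rollout/training pipeline.
#
#     config = {
#         "min_sample_timesteps_per_iteration": 1000,
#         "min_train_timesteps_per_iteration": 0,
#         "min_time_s_per_iteration": 1,
#         "metrics_num_episodes_for_smoothing": 100,
#         "metrics_episode_collection_timeout_s": 60,
#         "keep_per_episode_custom_metrics": False,
#     }
#     metrics_op = StandardMetricsReporting(train_op, workers, config)
#     result = next(metrics_op)  # dict with episode stats, timers, info, ...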


class CollectMetrics:
    """Callable that collects metrics from workers.

    The metrics are smoothed over a given history window.

    This should be used with the .for_each() operator. For a higher level
    API, consider using StandardMetricsReporting instead.

    Examples:
        >>> from ray.rllib.execution.metric_ops import CollectMetrics
        >>> train_op, workers = ... # doctest: +SKIP
        >>> output_op = train_op.for_each(CollectMetrics(workers)) # doctest: +SKIP
        >>> print(next(output_op)) # doctest: +SKIP
        {"episode_reward_max": ..., "episode_reward_mean": ..., ...}
    """

    def __init__(
        self,
        workers: WorkerSet,
        min_history: int = 100,
        timeout_seconds: int = 180,
        keep_per_episode_custom_metrics: bool = False,
        selected_workers: Optional[List[ActorHandle]] = None,
        by_steps_trained: bool = False,
    ):
        self.workers = workers
        self.episode_history = []
        self.to_be_collected = []
        self.min_history = min_history
        self.timeout_seconds = timeout_seconds
        self.keep_custom_metrics = keep_per_episode_custom_metrics
        self.selected_workers = selected_workers
        self.by_steps_trained = by_steps_trained

    def __call__(self, _: Any) -> Dict:
        # Collect worker metrics.
        episodes, self.to_be_collected = collect_episodes(
            self.workers.local_worker(),
            self.selected_workers or self.workers.remote_workers(),
            self.to_be_collected,
            timeout_seconds=self.timeout_seconds,
        )
        orig_episodes = list(episodes)
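        # If fewer than `min_history` fresh episodes came back from the
        # workers, backfill with the most recent previously collected episodes
        # so the summary below stays smoothed over up to `min_history`
        # episodes.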
        missing = self.min_history - len(episodes)
        if missing > 0:
            episodes = self.episode_history[-missing:] + episodes
            assert len(episodes) <= self.min_history
        self.episode_history.extend(orig_episodes)
        self.episode_history = self.episode_history[-self.min_history :]
        res = summarize_episodes(episodes, orig_episodes, self.keep_custom_metrics)

        # Add in iterator metrics.
        metrics = _get_shared_metrics()
        custom_metrics_from_info = metrics.info.pop("custom_metrics", {})
        timers = {}
        counters = {}
        info = {}
        info.update(metrics.info)
        for k, counter in metrics.counters.items():
            counters[k] = counter
        for k, timer in metrics.timers.items():
            timers["{}_time_ms".format(k)] = round(timer.mean * 1000, 3)
            if timer.has_units_processed():
                timers["{}_throughput".format(k)] = round(timer.mean_throughput, 3)
        res.update(
            {
                "num_healthy_workers": len(self.workers.remote_workers()),
                "timesteps_total": (
                    metrics.counters[STEPS_TRAINED_COUNTER]
                    if self.by_steps_trained
                    else metrics.counters[STEPS_SAMPLED_COUNTER]
                ),
                # tune.Trainable uses timesteps_this_iter for tracking
                # total timesteps.
                "timesteps_this_iter": metrics.counters[
                    STEPS_TRAINED_THIS_ITER_COUNTER
                ],
                "agent_timesteps_total": metrics.counters.get(
                    AGENT_STEPS_SAMPLED_COUNTER, 0
                ),
            }
        )
        res["timers"] = timers
        res["info"] = info
        res["info"].update(counters)
        res["custom_metrics"] = res.get("custom_metrics", {})
        res["episode_media"] = res.get("episode_media", {})
        res["custom_metrics"].update(custom_metrics_from_info)
        return res
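

# Illustrative sketch of using CollectMetrics directly (outside
# StandardMetricsReporting). Assumes `workers` is a live WorkerSet; the item
# passed to the callable is ignored.
#
#     collector = CollectMetrics(workers, min_history=100, timeout_seconds=180)
#     result = collector(None)
#     print(result["episode_reward_mean"], result["num_healthy_workers"])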


class OncePerTimeInterval:
    """Callable that returns True once per given interval.

    This should be used with the .filter() operator to throttle / rate-limit
    metrics reporting. For a higher-level API, consider using
    StandardMetricsReporting instead.

    Examples:
        >>> import time
        >>> from ray.rllib.execution.metric_ops import OncePerTimeInterval
        >>> train_op = ... # doctest: +SKIP
        >>> throttled_op = train_op.filter(OncePerTimeInterval(5)) # doctest: +SKIP
        >>> start = time.time() # doctest: +SKIP
        >>> next(throttled_op) # doctest: +SKIP
        >>> print(time.time() - start) # doctest: +SKIP
        5.00001 # will be greater than 5 seconds
    """

    def __init__(self, delay: Optional[float] = None):
        self.delay = delay or 0.0
        self.last_returned_true = 0

    def __call__(self, item: Any) -> bool:
        # No minimum time to wait for -> Return True.
        if self.delay <= 0.0:
            return True
        # Return True if the time since we last returned True is larger than
        # `self.delay`.
        now = time.time()
        if now - self.last_returned_true > self.delay:
            self.last_returned_true = now
            return True
        return False
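

# Illustrative sketch: unlike OncePerTimestepsElapsed below, OncePerTimeInterval
# only looks at wall-clock time, so it also works as a plain rate limiter
# outside a LocalIterator. The loop is a hypothetical stand-in for items
# flowing through `.filter()`.
#
#     should_report = OncePerTimeInterval(5.0)
#     for item in some_stream:
#         if should_report(item):
#             ...  # reached at most once every ~5 seconds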


class OncePerTimestepsElapsed:
    """Callable that returns True once per given number of timesteps.

    This should be used with the .filter() operator to throttle / rate-limit
    metrics reporting. For a higher-level API, consider using
    StandardMetricsReporting instead.

    Examples:
        >>> from ray.rllib.execution.metric_ops import OncePerTimestepsElapsed
        >>> train_op = ... # doctest: +SKIP
        >>> throttled_op = train_op.filter( # doctest: +SKIP
        ...     OncePerTimestepsElapsed(1000))
        >>> next(throttled_op) # doctest: +SKIP
        # will only return after 1000 steps have elapsed
    """

    def __init__(self, delay_steps: int, by_steps_trained: bool = False):
        """
        Args:
            delay_steps: The number of (sampled or trained) steps that must
                elapse before this op returns True again.
            by_steps_trained: If True, uses the `STEPS_TRAINED_COUNTER`
                instead of the `STEPS_SAMPLED_COUNTER` in metrics.
        """
        self.delay_steps = delay_steps
        self.by_steps_trained = by_steps_trained
        self.last_called = 0

    def __call__(self, item: Any) -> bool:
        if self.delay_steps <= 0:
            return True
        metrics = _get_shared_metrics()
        if self.by_steps_trained:
            now = metrics.counters[STEPS_TRAINED_COUNTER]
        else:
            now = metrics.counters[STEPS_SAMPLED_COUNTER]
        if now - self.last_called >= self.delay_steps:
            self.last_called = now
            return True
        return False
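

# Illustrative sketch: OncePerTimestepsElapsed reads the shared iterator
# counters via _get_shared_metrics(), so it is assumed to run inside a
# LocalIterator pipeline that maintains STEPS_SAMPLED_COUNTER (or
# STEPS_TRAINED_COUNTER when by_steps_trained=True), e.g.:
#
#     throttled_op = train_op.filter(OncePerTimestepsElapsed(1000))
#     result = next(throttled_op)  # first item after >= 1000 new sampled steps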