Merge branch 'master' of https://github.com/ray-project/ray into oomthreading

Signed-off-by: Clarence Ng <clarence.wyng@gmail.com>
Clarence Ng 2022-09-02 14:57:54 -07:00
commit 56746a4f97
26 changed files with 395 additions and 154 deletions

View file

@ -282,6 +282,7 @@
- cleanup() { if [ "${BUILDKITE_PULL_REQUEST}" = "false" ]; then ./ci/build/upload_build_info.sh; fi }; trap cleanup EXIT
- TUNE_TESTING=1 PYTHON=3.7 ./ci/env/install-dependencies.sh
- ./ci/env/env_info.sh
- python ./ci/env/setup_credentials.py sigopt
- bazel test --config=ci $(./ci/run/bazel_export_options) --build_tests_only
--test_tag_filters=tests_dir_S,tests_dir_T,tests_dir_U,tests_dir_V,tests_dir_W,tests_dir_X,tests_dir_Y,tests_dir_Z,-example,-py37,-soft_imports,-gpu_only,-rllib
python/ray/tune/...
@ -436,6 +437,9 @@
- cleanup() { if [ "${BUILDKITE_PULL_REQUEST}" = "false" ]; then ./ci/build/upload_build_info.sh; fi }; trap cleanup EXIT
- DOC_TESTING=1 PYTHON=3.7 ./ci/env/install-dependencies.sh
- ./ci/env/env_info.sh
- bazel test --config=ci $(./ci/run/bazel_export_options) --build_tests_only --test_tag_filters=ray_air,-needs_credentials,-gpu,-py37,-post_wheel_build doc/...
# Only run examples with credentials in non-PRs
- if [ "$BUILDKITE_PULL_REQUEST" != "false" ]; then exit 0; fi
- python ./ci/env/setup_credentials.py wandb comet_ml
- bazel test --config=ci $(./ci/run/bazel_export_options) --build_tests_only --test_tag_filters=ray_air,-gpu,-py37,-post_wheel_build doc/...
- bazel test --config=ci $(./ci/run/bazel_export_options) --build_tests_only --test_tag_filters=ray_air,needs_credentials,-gpu,-py37,-post_wheel_build doc/...
- python ./ci/env/cleanup_test_state.py wandb comet_ml

View file

@ -48,4 +48,7 @@ if __name__ == "__main__":
)
for service in services:
SERVICES[service]()
try:
SERVICES[service]()
except Exception as e:
print(f"Could not cleanup service test state for {service}: {e}")

View file

@ -3,8 +3,10 @@ This script is used to set up credentials for some services in the
CI environment. For instance, it can fetch WandB API tokens and write
the WandB configuration file so test scripts can use the service.
"""
import json
import os
import sys
from pathlib import Path
import boto3
@ -14,6 +16,9 @@ AWS_WANDB_SECRET_ARN = (
AWS_COMET_SECRET_ARN = (
"arn:aws:secretsmanager:us-west-2:029272617770:secret:oss-ci/comet-ml-token-vw81C3"
)
AWS_SIGOPT_SECRET_ARN = (
"arn:aws:secretsmanager:us-west-2:029272617770:secret:oss-ci/sigopt-key-qEqYrk"
)
def get_and_write_wandb_api_key(client):
@ -28,9 +33,26 @@ def get_and_write_comet_ml_api_key(client):
fp.write(f"[comet]\napi_key={api_key}\n")
def get_and_write_sigopt_api_key(client):
api_key = client.get_secret_value(SecretId=AWS_SIGOPT_SECRET_ARN)["SecretString"]
sigopt_config_file = Path("~/.sigopt/client/config.json").expanduser()
sigopt_config_file.parent.mkdir(parents=True, exist_ok=True)
with open(sigopt_config_file, "wt") as f:
json.dump(
{
"api_token": api_key,
"code_tracking_enabled": False,
"log_collection_enabled": False,
},
f,
)
SERVICES = {
"wandb": get_and_write_wandb_api_key,
"comet_ml": get_and_write_comet_ml_api_key,
"sigopt": get_and_write_sigopt_api_key,
}
@ -49,4 +71,7 @@ if __name__ == "__main__":
client = boto3.client("secretsmanager", region_name="us-west-2")
for service in services:
SERVICES[service](client)
try:
SERVICES[service](client)
except Exception as e:
print(f"Could not setup service credentials for {service}: {e}")

View file

@ -74,7 +74,7 @@
"metadata": {},
"source": [
"To run the example in this guide, make sure your Kubernetes cluster (or local Kind cluster) can accomodate\n",
"additional resource requests of 3 CPU and 2Gi memory. \n",
"additional resource requests of 3 CPU and 3Gi memory. \n",
"\n",
"(kuberay-operator-deploy)=\n",
"## Deploying the KubeRay operator\n",
@ -157,7 +157,7 @@
"outputs": [],
"source": [
"# Deploy a sample Ray Cluster CR from the KubeRay repo:\n",
"! kubectl apply -f https://raw.githubusercontent.com/ray-project/kuberay/release-0.3/ray-operator/config/samples/ray-cluster.autoscaler.yaml\n",
"! kubectl apply -f https://raw.githubusercontent.com/ray-project/kuberay/master/ray-operator/config/samples/ray-cluster.autoscaler.yaml\n",
"\n",
"# This Ray cluster is named `raycluster-autoscaler` because it has optional Ray Autoscaler support enabled."
]

View file

@ -49,7 +49,7 @@ First, follow the [quickstart guide](kuberay-quickstart) to create an autoscalin
# Create the KubeRay operator.
$ kubectl create -k "github.com/ray-project/kuberay/ray-operator/config/default?ref=v0.3.0&timeout=90s"
# Create an autoscaling Ray cluster.
$ kubectl apply -f https://raw.githubusercontent.com/ray-project/kuberay/release-0.3/ray-operator/config/samples/ray-cluster.autoscaler.yaml
$ kubectl apply -f https://raw.githubusercontent.com/ray-project/kuberay/master/ray-operator/config/samples/ray-cluster.autoscaler.yaml
```
Now, we can run a Ray program on the head pod that uses [``request_resources``](ref-autoscaler-sdk) to scale the cluster to a total of 3 CPUs. The head and worker pods in our [example cluster config](https://github.com/ray-project/kuberay/blob/master/ray-operator/config/samples/ray-cluster.autoscaler.yaml) each have a capacity of 1 CPU, and we specified a minimum of 1 worker pod. Thus, the request should trigger upscaling of one additional worker pod.
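A minimal sketch of such a program, assuming it is run from the head pod against the already-running cluster (the `num_cpus=3` value mirrors the request described above):

```python
# Hedged sketch: ask the autoscaler for a total of 3 CPUs; with 1-CPU head and
# worker pods and a minimum of 1 worker, this should upscale one extra worker pod.
import ray
from ray.autoscaler.sdk import request_resources

ray.init(address="auto")  # attach to the running Ray cluster on the head pod
request_resources(num_cpus=3)
```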

View file

@ -32,11 +32,21 @@ py_test_run_all_notebooks(
exclude = [
"huggingface_text_classification.ipynb",
"feast_example.ipynb", # REGRESSION
"upload_to_comet_ml.ipynb", # Needs credentials
"upload_to_wandb.ipynb", # Needs credentials
],
data = ["//doc/source/ray-air/examples:air_examples"],
tags = ["exclusive", "team:ml", "ray_air"],
)
py_test_run_all_notebooks(
size = "large",
include = ["upload_to_*.ipynb"],
exclude = [],
data = ["//doc/source/ray-air/examples:air_examples"],
tags = ["exclusive", "team:ml", "ray_air", "needs_credentials"],
)
# GPU Tests

View file

@ -80,8 +80,8 @@ RLlib's OPE estimators output six metrics:
- ``v_behavior_std``: The standard deviation corresponding to v_behavior.
- ``v_target``: The OPE's estimated discounted return for the target policy, averaged over episodes in the batch.
- ``v_target_std``: The standard deviation corresponding to v_target.
- ``v_gain``: ``v_target / max(v_behavior, 1e-8)``, averaged over episodes in the batch. ``v_gain > 1.0`` indicates that the policy is better than the policy that generated the behavior data.
- ``v_gain_std``: The standard deviation corresponding to v_gain.
- ``v_gain``: ``v_target / max(v_behavior, 1e-8)``. ``v_gain > 1.0`` indicates that the target policy is better than the policy that generated the behavior data. If ``v_behavior <= 0``, use ``v_delta`` for comparison instead, as sketched below.
- ``v_delta``: The difference between v_target and v_behavior.
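The two comparison metrics relate as follows; the numbers here are purely illustrative, not output from a real run:

```python
# Hedged sketch with made-up values: v_gain is a ratio, v_delta a difference.
v_behavior = 10.0  # estimated discounted return of the behavior policy
v_target = 12.5    # OPE estimate of the target policy's discounted return

v_gain = v_target / max(v_behavior, 1e-8)  # 1.25 > 1.0: target looks better
v_delta = v_target - v_behavior            # 2.5; preferred when v_behavior <= 0
```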
As an example, we generate an evaluation dataset for off-policy estimation:
@ -170,7 +170,7 @@ We can now train a DQN algorithm offline and evaluate it using OPE:
batch = reader.next()
print(estimator.estimate(batch))
# {'v_behavior': ..., 'v_target': ..., 'v_gain': ...,
# 'v_behavior_std': ..., 'v_target_std': ..., 'v_gain_std': ...}
# 'v_behavior_std': ..., 'v_target_std': ..., 'v_delta': ...}
Example: Converting external experiences to batch format
--------------------------------------------------------

View file

@ -99,6 +99,7 @@ if __name__ == "__main__":
do_link("experimental", force=args.yes, skip_list=args.skip)
do_link("util", force=args.yes, skip_list=args.skip)
do_link("workflow", force=args.yes, skip_list=args.skip)
do_link("widgets", force=args.yes, skip_list=args.skip)
do_link("cluster_utils.py", force=args.yes, skip_list=args.skip)
do_link("_private", force=args.yes, skip_list=args.skip)
# Link package's `dashboard` directly to local (repo's) dashboard.

View file

@ -11,6 +11,7 @@ from ray import air, tune
from ray.air import session
from ray.tune.schedulers import AsyncHyperBandScheduler
from ray.tune.search.sigopt import SigOptSearch
from ray.tune.search.sigopt.sigopt_search import load_sigopt_key
def evaluate(step, width, height):
@ -39,6 +40,8 @@ if __name__ == "__main__":
)
args, _ = parser.parse_known_args()
load_sigopt_key()
if "SIGOPT_KEY" not in os.environ:
if args.smoke_test:
print("SigOpt API Key not found. Skipping smoke test.")

View file

@ -217,7 +217,7 @@ class RayTrialExecutor:
self._has_cleaned_up_pgs = False
self._reuse_actors = reuse_actors
# The maxlen will be updated when `set_max_pending_trials()` is called
# The maxlen will be updated when `setup(max_pending_trials)` is called
self._cached_actor_pg = deque(maxlen=1)
self._pg_manager = _PlacementGroupManager(prefix=_get_tune_pg_prefix())
self._staged_trials = set()
@ -235,16 +235,20 @@ class RayTrialExecutor:
self._buffer_max_time_s = float(
os.getenv("TUNE_RESULT_BUFFER_MAX_TIME_S", 100.0)
)
self._trainable_kwargs = {}
def set_max_pending_trials(self, max_pending: int) -> None:
def setup(
self, max_pending_trials: int, trainable_kwargs: Optional[Dict] = None
) -> None:
if len(self._cached_actor_pg) > 0:
logger.warning(
"Cannot update maximum number of queued actors for reuse "
"during a run."
)
else:
self._cached_actor_pg = deque(maxlen=max_pending)
self._pg_manager.set_max_staging(max_pending)
self._cached_actor_pg = deque(maxlen=max_pending_trials)
self._pg_manager.set_max_staging(max_pending_trials)
self._trainable_kwargs = trainable_kwargs or {}
def set_status(self, trial: Trial, status: str) -> None:
"""Sets status and checkpoints metadata if needed.
@ -377,6 +381,9 @@ class RayTrialExecutor:
kwargs["remote_checkpoint_dir"] = trial.remote_checkpoint_dir
kwargs["custom_syncer"] = trial.custom_syncer
if self._trainable_kwargs:
kwargs.update(self._trainable_kwargs)
# Throw a meaningful error if trainable does not use the
# new API
sig = inspect.signature(trial.get_trainable_cls())
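A minimal sketch of the new call pattern, using hypothetical values (in practice the call is made by `TrialRunner`, as the next file's diff shows; the import path assumes the Ray 2.x layout):

```python
# Hedged sketch, hypothetical values: setup() now bundles the max-pending
# limit with extra kwargs that are forwarded into every Trainable it starts.
from ray.tune.execution.ray_trial_executor import RayTrialExecutor

executor = RayTrialExecutor(reuse_actors=True)
executor.setup(
    max_pending_trials=8,                     # formerly set_max_pending_trials(8)
    trainable_kwargs={"sync_timeout": 1800},  # seconds; see SyncConfig below
)
```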

View file

@ -198,6 +198,8 @@ class _ExperimentCheckpointManager:
exclude = ["*/checkpoint_*"]
if self._syncer:
# Todo: Implement sync_timeout for experiment-level syncing
# (it is currently only used for trainable-to-cloud syncing)
if force:
# Wait until previous sync command finished
self._syncer.wait()
@ -341,7 +343,13 @@ class TrialRunner:
else:
# Manual override
self._max_pending_trials = int(max_pending_trials)
self.trial_executor.set_max_pending_trials(self._max_pending_trials)
sync_config = sync_config or SyncConfig()
self.trial_executor.setup(
max_pending_trials=self._max_pending_trials,
trainable_kwargs={"sync_timeout": sync_config.sync_timeout},
)
self._metric = metric
@ -385,7 +393,6 @@ class TrialRunner:
if self._local_checkpoint_dir:
os.makedirs(self._local_checkpoint_dir, exist_ok=True)
sync_config = sync_config or SyncConfig()
self._remote_checkpoint_dir = remote_checkpoint_dir
self._syncer = get_node_to_storage_syncer(sync_config)

View file

@ -1,3 +1,3 @@
from ray.tune.search.sigopt.sigopt_search import SigOptSearch
from ray.tune.search.sigopt.sigopt_search import SigOptSearch, load_sigopt_key
__all__ = ["SigOptSearch"]
__all__ = ["SigOptSearch", "load_sigopt_key"]

View file

@ -1,7 +1,9 @@
import copy
import json
import os
import logging
import pickle
from pathlib import Path
from typing import Dict, List, Optional, Union
try:
@ -18,6 +20,31 @@ from ray.tune.search import Searcher
logger = logging.getLogger(__name__)
def load_sigopt_key(overwrite: bool = False) -> bool:
"""Load SigOpt key from config file and save in environment.
Args:
overwrite: If True, will overwrite an existing SIGOPT_KEY env variable.
Returns:
True if a key was loaded into the environment.
"""
if "SIGOPT_KEY" in os.environ and not overwrite:
return False
sigopt_key_file = Path("~/.sigopt/client/config.json").expanduser()
if not sigopt_key_file.exists():
return False
try:
with open(sigopt_key_file, "rt") as f:
config = json.load(f)
os.environ["SIGOPT_KEY"] = config["api_token"]
return True
except Exception:
return False
class SigOptSearch(Searcher):
"""A wrapper around SigOpt to provide trial suggestions.
@ -184,6 +211,7 @@ class SigOptSearch(Searcher):
), """SigOpt must be installed!
You can install SigOpt with the command:
`pip install -U sigopt`."""
load_sigopt_key()
assert (
"SIGOPT_KEY" in os.environ
), "SigOpt API key must be stored as environ variable at SIGOPT_KEY"

View file

@ -40,6 +40,9 @@ logger = logging.getLogger(__name__)
# Syncing period for syncing checkpoints between nodes or to cloud.
DEFAULT_SYNC_PERIOD = 300
# Default sync timeout after which syncing processes are aborted
DEFAULT_SYNC_TIMEOUT = 1800
_EXCLUDE_FROM_SYNC = [
"./checkpoint_-00001",
"./checkpoint_tmp*",
@ -85,6 +88,8 @@ class SyncConfig:
is asynchronous and best-effort. This does not affect persistent
storage syncing. Defaults to True.
sync_period: Syncing period for syncing between nodes.
sync_timeout: Timeout after which running sync processes are aborted.
Currently only affects trial-to-cloud syncing.
"""
@ -93,6 +98,7 @@ class SyncConfig:
sync_on_checkpoint: bool = True
sync_period: int = DEFAULT_SYNC_PERIOD
sync_timeout: int = DEFAULT_SYNC_TIMEOUT
def _repr_html_(self) -> str:
"""Generate an HTML representation of the SyncConfig.

View file

@ -499,7 +499,7 @@ class RayExecutorPlacementGroupTest(unittest.TestCase):
executor = RayTrialExecutor(reuse_actors=True)
executor._pg_manager = pgm
executor.set_max_pending_trials(1)
executor.setup(max_pending_trials=1)
def train(config):
yield 1

View file

@ -1,14 +1,16 @@
import json
import os
import tempfile
import time
from typing import Dict, Union
from unittest.mock import patch
import pytest
import ray
from ray import tune
from ray.air import session, Checkpoint
from ray.air._internal.remote_storage import download_from_uri
from ray.air._internal.remote_storage import download_from_uri, upload_to_uri
from ray.tune.trainable import wrap_function
@ -188,6 +190,42 @@ def test_checkpoint_object_no_sync(tmpdir):
trainable.restore_from_object(obj)
@pytest.mark.parametrize("hanging", [True, False])
def test_sync_timeout(tmpdir, hanging):
orig_upload_fn = upload_to_uri
def _hanging_upload(*args, **kwargs):
time.sleep(200 if hanging else 0)
orig_upload_fn(*args, **kwargs)
trainable = SavingTrainable(
"object",
remote_checkpoint_dir=f"memory:///test/location_hanging_{hanging}",
sync_timeout=0.5,
)
with patch("ray.air.checkpoint.upload_to_uri", _hanging_upload):
trainable.save()
check_dir = tmpdir / "check_save_obj"
try:
download_from_uri(
uri=f"memory:///test/location_hanging_{hanging}", local_path=str(check_dir)
)
except FileNotFoundError:
hung = True
else:
hung = False
assert hung == hanging
if hanging:
assert not check_dir.exists()
else:
assert check_dir.listdir()
if __name__ == "__main__":
import sys

View file

@ -22,7 +22,7 @@ from ray.tune.search.flaml import CFO, BlendSearch
from ray.tune.search.skopt import SkOptSearch
from ray.tune.search.nevergrad import NevergradSearch
from ray.tune.search.optuna import OptunaSearch
from ray.tune.search.sigopt import SigOptSearch
from ray.tune.search.sigopt import SigOptSearch, load_sigopt_key
from ray.tune.search.zoopt import ZOOptSearch
from ray.tune.search.hebo import HEBOSearch
from ray.tune.search.ax import AxSearch
@ -316,6 +316,10 @@ class DragonflyWarmStartTest(AbstractWarmStartTest, unittest.TestCase):
class SigOptWarmStartTest(AbstractWarmStartTest, unittest.TestCase):
def setUp(self):
AbstractWarmStartTest.setUp(self)
load_sigopt_key()
def set_basic_conf(self):
space = [
{

View file

@ -1,45 +1,94 @@
import unittest
import time
import pytest
from ray.tune.search.variant_generator import format_vars
from ray.tune.utils.util import retry_fn
class TuneUtilsTest(unittest.TestCase):
def testFormatVars(self):
# Format brackets correctly
self.assertTrue(
format_vars(
{
("a", "b", "c"): 8.1234567,
("a", "b", "d"): [7, 8],
("a", "b", "e"): [[[3, 4]]],
}
),
"c=8.12345,d=7_8,e=3_4",
def test_format_vars():
# Format brackets correctly
assert (
format_vars(
{
("a", "b", "c"): 8.1234567,
("a", "b", "d"): [7, 8],
("a", "b", "e"): [[[3, 4]]],
}
)
# Sorted by full keys, but only last key is reported
self.assertTrue(
format_vars(
{
("a", "c", "x"): [7, 8],
("a", "b", "x"): 8.1234567,
}
),
"x=8.12345,x=7_8",
== "c=8.1235,d=7_8,e=3_4"
)
# Sorted by full keys, but only last key is reported
assert (
format_vars(
{
("a", "c", "x"): [7, 8],
("a", "b", "x"): 8.1234567,
}
)
# Filter out invalid chars. It's ok to have empty keys or values.
self.assertTrue(
format_vars(
{
("a c?x"): " <;%$ok ",
("some"): " ",
}
),
"a_c_x=ok,some=",
== "x=8.1235,x=7_8"
)
# Filter out invalid chars. It's ok to have empty keys or values.
assert (
format_vars(
{
("a c?x",): " <;%$ok ",
("some",): " ",
}
)
== "a_c_x=ok,some="
)
def test_retry_fn_repeat(tmpdir):
success = tmpdir / "success"
marker = tmpdir / "marker"
def _fail_once():
if marker.exists():
success.write_text(".", encoding="utf-8")
return
marker.write_text(".", encoding="utf-8")
raise RuntimeError("Failing")
assert not success.exists()
assert not marker.exists()
assert retry_fn(
fn=_fail_once,
exception_type=RuntimeError,
sleep_time=0,
)
assert success.exists()
assert marker.exists()
def test_retry_fn_timeout(tmpdir):
success = tmpdir / "success"
marker = tmpdir / "marker"
def _fail_once():
if not marker.exists():
marker.write_text(".", encoding="utf-8")
raise RuntimeError("Failing")
time.sleep(5)
success.write_text(".", encoding="utf-8")
return
assert not success.exists()
assert not marker.exists()
assert not retry_fn(
fn=_fail_once, exception_type=RuntimeError, sleep_time=0, timeout=0.1
)
assert not success.exists()
assert marker.exists()
if __name__ == "__main__":
import pytest
import sys
sys.exit(pytest.main(["-v", __file__]))

View file

@ -101,8 +101,9 @@ class Trainable:
logger_creator: Callable[[Dict[str, Any]], "Logger"] = None,
remote_checkpoint_dir: Optional[str] = None,
custom_syncer: Optional[Syncer] = None,
sync_timeout: Optional[int] = None,
):
"""Initialize an Trainable.
"""Initialize a Trainable.
Sets up logging and points ``self.logdir`` to a directory in which
training outputs should be placed.
@ -120,6 +121,7 @@ class Trainable:
which is different from **per checkpoint** directory.
custom_syncer: Syncer used for synchronizing data from Ray nodes
to external storage.
sync_timeout: Timeout after which sync processes are aborted.
"""
self._experiment_id = uuid.uuid4().hex
@ -171,6 +173,7 @@ class Trainable:
self.remote_checkpoint_dir = remote_checkpoint_dir
self.custom_syncer = custom_syncer
self.sync_timeout = sync_timeout
@property
def uses_cloud_checkpointing(self):
@ -512,12 +515,22 @@ class Trainable:
return True
checkpoint = Checkpoint.from_directory(checkpoint_dir)
retry_fn(
lambda: checkpoint.to_uri(self._storage_path(checkpoint_dir)),
checkpoint_uri = self._storage_path(checkpoint_dir)
if not retry_fn(
lambda: checkpoint.to_uri(checkpoint_uri),
subprocess.CalledProcessError,
num_retries=3,
sleep_time=1,
)
timeout=self.sync_timeout,
):
logger.error(
f"Could not upload checkpoint even after 3 retries."
f"Please check if the credentials expired and that the remote "
f"filesystem is supported.. For large checkpoints, consider "
f"increasing `SyncConfig(sync_timeout)` "
f"(current value: {self.sync_timeout} seconds). Checkpoint URI: "
f"{checkpoint_uri}"
)
return True
def _maybe_load_from_cloud(self, checkpoint_path: str) -> bool:
@ -546,12 +559,17 @@ class Trainable:
return True
checkpoint = Checkpoint.from_uri(external_uri)
retry_fn(
if not retry_fn(
lambda: checkpoint.to_directory(local_dir),
subprocess.CalledProcessError,
num_retries=3,
sleep_time=1,
)
timeout=self.sync_timeout,
):
logger.error(
f"Could not download checkpoint even after 3 retries: "
f"{external_uri}"
)
return True
@ -719,12 +737,17 @@ class Trainable:
self.custom_syncer.wait_or_retry()
else:
checkpoint_uri = self._storage_path(checkpoint_dir)
retry_fn(
if not retry_fn(
lambda: _delete_external_checkpoint(checkpoint_uri),
subprocess.CalledProcessError,
num_retries=3,
sleep_time=1,
)
timeout=self.sync_timeout,
):
logger.error(
f"Could not delete checkpoint even after 3 retries: "
f"{checkpoint_uri}"
)
if os.path.exists(checkpoint_dir):
shutil.rmtree(checkpoint_dir)

View file

@ -7,6 +7,7 @@ import threading
import time
from collections import defaultdict
from datetime import datetime
from numbers import Number
from threading import Thread
from typing import Dict, List, Union, Type, Callable, Any, Optional
@ -124,18 +125,41 @@ class UtilMonitor(Thread):
@DeveloperAPI
def retry_fn(
fn: Callable[[], Any],
exception_type: Type[Exception],
exception_type: Type[Exception] = Exception,
num_retries: int = 3,
sleep_time: int = 1,
):
for i in range(num_retries):
timeout: Optional[Number] = None,
) -> bool:
errored = threading.Event()
def _try_fn():
try:
fn()
except exception_type as e:
logger.warning(e)
time.sleep(sleep_time)
else:
break
errored.set()
for i in range(num_retries):
errored.clear()
proc = threading.Thread(target=_try_fn)
proc.daemon = True
proc.start()
proc.join(timeout=timeout)
if proc.is_alive():
logger.debug(
f"Process timed out (try {i+1}/{num_retries}): "
f"{getattr(fn, '__name__', None)}"
)
elif not errored.is_set():
return True
# Timed out or errored; sleep before the next attempt
time.sleep(sleep_time)
# All retries timed out or errored, so return False
return False
@ray.remote
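A minimal sketch of the new calling convention, assuming the signature shown above (the upload callable is a placeholder):

```python
# Hedged sketch, placeholder callable: retry_fn now returns a bool instead of
# raising, and the optional timeout bounds each attempt via a daemon thread.
import subprocess

from ray.tune.utils.util import retry_fn


def _upload_checkpoint():
    pass  # placeholder for e.g. a checkpoint.to_uri(...) call


ok = retry_fn(
    fn=_upload_checkpoint,
    exception_type=subprocess.CalledProcessError,
    num_retries=3,
    sleep_time=1,
    timeout=30,  # seconds per attempt; None leaves attempts unbounded
)
if not ok:
    print("Upload did not finish within the retry/timeout budget")
```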

View file

@ -3527,7 +3527,6 @@
legacy:
test_name: stress_test_state_api_scale
test_suite: nightly_tests
stable: false
frequency: nightly
team: core
@ -3560,7 +3559,6 @@
legacy:
test_name: shuffle_20gb_with_state_api
test_suite: nightly_tests
stable: false
frequency: nightly
team: core

View file

@ -75,12 +75,12 @@ class DirectMethod(OffPolicyEstimator):
- v_target: The estimated discounted return for `self.policy`,
averaged over episodes in the batch
- v_target_std: The standard deviation corresponding to v_target
- v_gain: v_target / max(v_behavior, 1e-8), averaged over episodes
- v_gain_std: The standard deviation corresponding to v_gain
- v_gain: v_target / max(v_behavior, 1e-8)
- v_delta: The difference between v_target and v_behavior.
"""
batch = self.convert_ma_batch_to_sample_batch(batch)
self.check_action_prob_in_batch(batch)
estimates = {"v_behavior": [], "v_target": [], "v_gain": []}
estimates_per_epsiode = {"v_behavior": [], "v_target": []}
# Calculate Direct Method OPE estimates
for episode in batch.split_by_episode():
rewards = episode["rewards"]
@ -93,15 +93,18 @@ class DirectMethod(OffPolicyEstimator):
v_target = self.model.estimate_v(init_step)
v_target = convert_to_numpy(v_target).item()
estimates["v_behavior"].append(v_behavior)
estimates["v_target"].append(v_target)
estimates["v_gain"].append(v_target / max(v_behavior, 1e-8))
estimates["v_behavior_std"] = np.std(estimates["v_behavior"])
estimates["v_behavior"] = np.mean(estimates["v_behavior"])
estimates["v_target_std"] = np.std(estimates["v_target"])
estimates["v_target"] = np.mean(estimates["v_target"])
estimates["v_gain_std"] = np.std(estimates["v_gain"])
estimates["v_gain"] = np.mean(estimates["v_gain"])
estimates_per_epsiode["v_behavior"].append(v_behavior)
estimates_per_epsiode["v_target"].append(v_target)
estimates = {
"v_behavior": np.mean(estimates_per_epsiode["v_behavior"]),
"v_behavior_std": np.std(estimates_per_epsiode["v_behavior"]),
"v_target": np.mean(estimates_per_epsiode["v_target"]),
"v_target_std": np.std(estimates_per_epsiode["v_target"]),
}
estimates["v_gain"] = estimates["v_target"] / max(estimates["v_behavior"], 1e-8)
estimates["v_delta"] = estimates["v_target"] - estimates["v_behavior"]
return estimates
@override(OffPolicyEstimator)

View file

@ -90,12 +90,12 @@ class DoublyRobust(OffPolicyEstimator):
- v_target: The estimated discounted return for `self.policy`,
averaged over episodes in the batch
- v_target_std: The standard deviation corresponding to v_target
- v_gain: v_target / max(v_behavior, 1e-8), averaged over episodes
- v_gain_std: The standard deviation corresponding to v_gain
- v_gain: v_target / max(v_behavior, 1e-8)
- v_delta: The difference between v_target and v_behavior.
"""
batch = self.convert_ma_batch_to_sample_batch(batch)
self.check_action_prob_in_batch(batch)
estimates = {"v_behavior": [], "v_target": [], "v_gain": []}
estimates_per_epsiode = {"v_behavior": [], "v_target": []}
# Calculate doubly robust OPE estimates
for episode in batch.split_by_episode():
rewards, old_prob = episode["rewards"], episode["action_prob"]
@ -119,15 +119,18 @@ class DoublyRobust(OffPolicyEstimator):
)
v_target = v_target.item()
estimates["v_behavior"].append(v_behavior)
estimates["v_target"].append(v_target)
estimates["v_gain"].append(v_target / max(v_behavior, 1e-8))
estimates["v_behavior_std"] = np.std(estimates["v_behavior"])
estimates["v_behavior"] = np.mean(estimates["v_behavior"])
estimates["v_target_std"] = np.std(estimates["v_target"])
estimates["v_target"] = np.mean(estimates["v_target"])
estimates["v_gain_std"] = np.std(estimates["v_gain"])
estimates["v_gain"] = np.mean(estimates["v_gain"])
estimates_per_epsiode["v_behavior"].append(v_behavior)
estimates_per_epsiode["v_target"].append(v_target)
estimates = {
"v_behavior": np.mean(estimates_per_epsiode["v_behavior"]),
"v_behavior_std": np.std(estimates_per_epsiode["v_behavior"]),
"v_target": np.mean(estimates_per_epsiode["v_target"]),
"v_target_std": np.std(estimates_per_epsiode["v_target"]),
}
estimates["v_gain"] = estimates["v_target"] / max(estimates["v_behavior"], 1e-8)
estimates["v_delta"] = estimates["v_target"] - estimates["v_behavior"]
return estimates
@override(OffPolicyEstimator)

View file

@ -37,12 +37,12 @@ class ImportanceSampling(OffPolicyEstimator):
- v_target: The estimated discounted return for `self.policy`,
averaged over episodes in the batch
- v_target_std: The standard deviation corresponding to v_target
- v_gain: v_target / max(v_behavior, 1e-8), averaged over episodes
- v_gain_std: The standard deviation corresponding to v_gain
- v_gain: v_target / max(v_behavior, 1e-8)
- v_delta: The difference between v_target and v_behavior.
"""
batch = self.convert_ma_batch_to_sample_batch(batch)
self.check_action_prob_in_batch(batch)
estimates = {"v_behavior": [], "v_target": [], "v_gain": []}
estimates_per_epsiode = {"v_behavior": [], "v_target": []}
for episode in batch.split_by_episode():
rewards, old_prob = episode["rewards"], episode["action_prob"]
log_likelihoods = compute_log_likelihoods_from_input_dict(
@ -66,13 +66,16 @@ class ImportanceSampling(OffPolicyEstimator):
v_behavior += rewards[t] * self.gamma ** t
v_target += p[t] * rewards[t] * self.gamma ** t
estimates["v_behavior"].append(v_behavior)
estimates["v_target"].append(v_target)
estimates["v_gain"].append(v_target / max(v_behavior, 1e-8))
estimates["v_behavior_std"] = np.std(estimates["v_behavior"])
estimates["v_behavior"] = np.mean(estimates["v_behavior"])
estimates["v_target_std"] = np.std(estimates["v_target"])
estimates["v_target"] = np.mean(estimates["v_target"])
estimates["v_gain_std"] = np.std(estimates["v_gain"])
estimates["v_gain"] = np.mean(estimates["v_gain"])
estimates_per_epsiode["v_behavior"].append(v_behavior)
estimates_per_epsiode["v_target"].append(v_target)
estimates = {
"v_behavior": np.mean(estimates_per_epsiode["v_behavior"]),
"v_behavior_std": np.std(estimates_per_epsiode["v_behavior"]),
"v_target": np.mean(estimates_per_epsiode["v_target"]),
"v_target_std": np.std(estimates_per_epsiode["v_target"]),
}
estimates["v_gain"] = estimates["v_target"] / max(estimates["v_behavior"], 1e-8)
estimates["v_delta"] = estimates["v_target"] - estimates["v_behavior"]
return estimates

View file

@ -25,6 +25,15 @@ import ray
torch, _ = try_import_torch()
ESTIMATOR_OUTPUTS = {
"v_behavior",
"v_behavior_std",
"v_target",
"v_target_std",
"v_gain",
"v_delta",
}
class TestOPE(unittest.TestCase):
"""Compilation tests for using OPE both standalone and in an RLlib Algorithm"""
@ -75,49 +84,38 @@ class TestOPE(unittest.TestCase):
def tearDownClass(cls):
ray.shutdown()
def test_ope_standalone(self):
# Test all OPE methods standalone
estimator_outputs = {
"v_behavior",
"v_behavior_std",
"v_target",
"v_target_std",
"v_gain",
"v_gain_std",
}
estimator = ImportanceSampling(
policy=self.algo.get_policy(),
gamma=self.gamma,
)
estimates = estimator.estimate(self.batch)
self.assertEqual(estimates.keys(), estimator_outputs)
def test_is_and_wis_standalone(self):
ope_classes = [
ImportanceSampling,
WeightedImportanceSampling,
]
estimator = WeightedImportanceSampling(
policy=self.algo.get_policy(),
gamma=self.gamma,
)
estimates = estimator.estimate(self.batch)
self.assertEqual(estimates.keys(), estimator_outputs)
for class_module in ope_classes:
estimator = class_module(
policy=self.algo.get_policy(),
gamma=self.gamma,
)
estimates = estimator.estimate(self.batch)
self.assertEqual(set(estimates.keys()), ESTIMATOR_OUTPUTS)
check(estimates["v_gain"], estimates["v_target"] / estimates["v_behavior"])
estimator = DirectMethod(
policy=self.algo.get_policy(),
gamma=self.gamma,
q_model_config=self.q_model_config,
)
losses = estimator.train(self.batch)
assert losses, "DM estimator did not return mean loss"
estimates = estimator.estimate(self.batch)
self.assertEqual(estimates.keys(), estimator_outputs)
def test_dm_and_dr_standalone(self):
ope_classes = [
DirectMethod,
DoublyRobust,
]
estimator = DoublyRobust(
policy=self.algo.get_policy(),
gamma=self.gamma,
q_model_config=self.q_model_config,
)
losses = estimator.train(self.batch)
assert losses, "DM estimator did not return mean loss"
estimates = estimator.estimate(self.batch)
self.assertEqual(estimates.keys(), estimator_outputs)
for class_module in ope_classes:
estimator = class_module(
policy=self.algo.get_policy(),
gamma=self.gamma,
q_model_config=self.q_model_config,
)
losses = estimator.train(self.batch)
assert losses, f"{class_module.__name__} estimator did not return mean loss"
estimates = estimator.estimate(self.batch)
self.assertEqual(set(estimates.keys()), ESTIMATOR_OUTPUTS)
check(estimates["v_gain"], estimates["v_target"] / estimates["v_behavior"])
def test_ope_in_algo(self):
# Test OPE in DQN, during training as well as by calling evaluate()

View file

@ -49,10 +49,11 @@ class WeightedImportanceSampling(OffPolicyEstimator):
- v_target_std: The standard deviation corresponding to v_target
- v_gain: v_target / max(v_behavior, 1e-8), averaged over episodes
- v_gain_std: The standard deviation corresponding to v_gain
- v_delta: The difference between v_target and v_behavior.
"""
batch = self.convert_ma_batch_to_sample_batch(batch)
self.check_action_prob_in_batch(batch)
estimates = {"v_behavior": [], "v_target": [], "v_gain": []}
estimates_per_epsiode = {"v_behavior": [], "v_target": []}
for episode in batch.split_by_episode():
rewards, old_prob = episode["rewards"], episode["action_prob"]
log_likelihoods = compute_log_likelihoods_from_input_dict(
@ -84,13 +85,16 @@ class WeightedImportanceSampling(OffPolicyEstimator):
w_t = self.filter_values[t] / self.filter_counts[t]
v_target += p[t] / w_t * rewards[t] * self.gamma ** t
estimates["v_behavior"].append(v_behavior)
estimates["v_target"].append(v_target)
estimates["v_gain"].append(v_target / max(v_behavior, 1e-8))
estimates["v_behavior_std"] = np.std(estimates["v_behavior"])
estimates["v_behavior"] = np.mean(estimates["v_behavior"])
estimates["v_target_std"] = np.std(estimates["v_target"])
estimates["v_target"] = np.mean(estimates["v_target"])
estimates["v_gain_std"] = np.std(estimates["v_gain"])
estimates["v_gain"] = np.mean(estimates["v_gain"])
estimates_per_epsiode["v_behavior"].append(v_behavior)
estimates_per_epsiode["v_target"].append(v_target)
estimates = {
"v_behavior": np.mean(estimates_per_epsiode["v_behavior"]),
"v_behavior_std": np.std(estimates_per_epsiode["v_behavior"]),
"v_target": np.mean(estimates_per_epsiode["v_target"]),
"v_target_std": np.std(estimates_per_epsiode["v_target"]),
}
estimates["v_gain"] = estimates["v_target"] / max(estimates["v_behavior"], 1e-8)
estimates["v_delta"] = estimates["v_target"] - estimates["v_behavior"]
return estimates