# ray/rllib/tests/test_placement_groups.py

import os
import unittest
import ray
from ray import tune
from ray.tune import Callback
from ray.rllib.algorithms.pg import PG, DEFAULT_CONFIG
from ray.tune.execution.ray_trial_executor import RayTrialExecutor
from ray.tune.experiment import Trial
from ray.tune.execution.placement_groups import PlacementGroupFactory
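
# Module-level handle to the executor under test. Each test case assigns a
# fresh RayTrialExecutor here so that `_TestCallback` below can inspect the
# executor's placement-group bookkeeping while the experiment is running.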
trial_executor = None


class _TestCallback(Callback):
    def on_step_end(self, iteration, trials, **info):
        num_finished = len(
            [t for t in trials if t.status in (Trial.TERMINATED, Trial.ERROR)]
        )
        num_running = len([t for t in trials if t.status == Trial.RUNNING])

        num_staging = sum(len(s) for s in trial_executor._pg_manager._staging.values())
        num_ready = sum(len(s) for s in trial_executor._pg_manager._ready.values())
        num_in_use = len(trial_executor._pg_manager._in_use_pgs)
        num_cached = len(trial_executor._pg_manager._cached_pgs)

        total_num_tracked = num_staging + num_ready + num_in_use + num_cached

        # All 3 trials (3 different learning rates) should be scheduled.
        assert len(trials) >= 3
        # Cannot run more than 2 at a time
        # (due to different resource restrictions in the test cases).
        assert num_running <= 2
        # The number of placement groups should decrease
        # when trials finish.
        assert max(3, len(trials)) - num_finished == total_num_tracked
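
# Note: `_staging`, `_ready`, `_in_use_pgs`, and `_cached_pgs` are private
# attributes of Tune's placement-group manager and roughly track a placement
# group's lifecycle: created but not yet scheduled (staging), resources
# reserved (ready), bound to a running trial (in use), and kept around for
# potential actor reuse (cached). Exact names may differ across Ray versions.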


class TestPlacementGroups(unittest.TestCase):
    def setUp(self) -> None:
        # Reconcile placement groups on every step so the callback above
        # always observes up-to-date staging/ready counts.
        os.environ["TUNE_PLACEMENT_GROUP_RECON_INTERVAL"] = "0"
        ray.init(num_cpus=6)

    def tearDown(self) -> None:
        ray.shutdown()
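
    # All three tests below grid-search over 3 learning rates on a 6-CPU
    # cluster; how many of the 3 trials can run concurrently is determined by
    # the per-trial placement-group size requested in each test.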
    def test_overriding_default_resource_request(self):
        config = DEFAULT_CONFIG.copy()
        config["model"]["fcnet_hiddens"] = [10]
        config["num_workers"] = 2
        # 3 Trials: Can only run 2 at a time (num_cpus=6; needed: 3).
        config["lr"] = tune.grid_search([0.1, 0.01, 0.001])
        config["env"] = "CartPole-v0"
        config["framework"] = "tf"

        # Create an Algorithm with an overridden default_resource_request
        # method that returns a PlacementGroupFactory.
        class MyAlgo(PG):
            @classmethod
            def default_resource_request(cls, config):
                # One bundle for the driver, one per rollout worker:
                # 3 CPUs per trial in total.
                head_bundle = {"CPU": 1, "GPU": 0}
                child_bundle = {"CPU": 1}
                return PlacementGroupFactory(
                    [head_bundle, child_bundle, child_bundle],
                    strategy=config["placement_strategy"],
                )

        tune.register_trainable("my_trainable", MyAlgo)

        global trial_executor
        trial_executor = RayTrialExecutor(reuse_actors=False)

        tune.run(
            "my_trainable",
            config=config,
            stop={"training_iteration": 2},
            trial_executor=trial_executor,
            callbacks=[_TestCallback()],
            verbose=2,
        )
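
    # Unlike the previous test, this one relies on the algorithm's built-in
    # default_resource_request(): presumably 1 CPU for the driver plus
    # num_cpus_per_worker for each of the num_workers rollout workers, i.e.
    # 1 + 2 * 2 = 5 CPUs per trial here, so only one trial fits on 6 CPUs.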
    def test_default_resource_request(self):
        config = DEFAULT_CONFIG.copy()
        config["model"]["fcnet_hiddens"] = [10]
        config["num_workers"] = 2
        config["num_cpus_per_worker"] = 2
        # 3 Trials: Can only run 1 at a time (num_cpus=6; needed: 5).
        config["lr"] = tune.grid_search([0.1, 0.01, 0.001])
        config["env"] = "CartPole-v0"
        config["framework"] = "torch"
        config["placement_strategy"] = "SPREAD"

        global trial_executor
        trial_executor = RayTrialExecutor(reuse_actors=False)

        tune.run(
            "PG",
            config=config,
            stop={"training_iteration": 2},
            trial_executor=trial_executor,
            callbacks=[_TestCallback()],
            verbose=2,
        )
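
    # If a trainable already provides a default resource request, also passing
    # `resources_per_trial` to tune.run() is ambiguous and must be rejected.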
    def test_default_resource_request_plus_manual_leads_to_error(self):
        config = DEFAULT_CONFIG.copy()
        config["model"]["fcnet_hiddens"] = [10]
        config["num_workers"] = 0
        config["env"] = "CartPole-v0"

        # The run must fail with a ValueError whose message mentions the
        # automatically set resources.
        with self.assertRaises(ValueError) as ctx:
            tune.run(
                "PG",
                config=config,
                stop={"training_iteration": 2},
                resources_per_trial=PlacementGroupFactory([{"CPU": 1}]),
                verbose=2,
            )
        assert "have been automatically set to" in ctx.exception.args[0]


if __name__ == "__main__":
    import pytest
    import sys

    sys.exit(pytest.main(["-v", __file__]))