ray/release/xgboost_tests/workloads/ft_small_elastic.py
2021-06-01 11:19:15 -07:00

89 lines
3 KiB
Python

"""Fault tolerance test (small cluster, elastic training)
In this run, two training actors will die after some time. It is expected that
in both cases xgboost_ray stops training, but continues right away with the
remaining three actors. Shortly after, the actors will be restarted and
re-integrated into the training loop. Training should finish with all four
actors.
Test owner: krfricke
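Acceptance criteria: Should run through and report final results. Intermediate
output should show that training continues with fewer actors when an
actor died. The test will fail if elastic training didn't work. Note that a
run in which no training phase with only three actors is observed is
reported by this script as a warning, not as a failure.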
Notes: This test seems to be somewhat flaky. This might be due to
race conditions in handling dead actors. This is likely a problem of
the xgboost_ray implementation and not of this test.
"""
import warnings
from unittest.mock import patch
import ray
from xgboost_ray import RayParams
from xgboost_ray.main import _train as unmocked_train
from ray.util.xgboost.release_test_util import train_ray, \
FailureState, FailureInjection, TrackingCallback
if __name__ == "__main__":
    # Connect to the cluster this script is launched on.
    ray.init(address="auto")

    # Raise xgboost_ray's logger verbosity; 10 is the numeric value of
    # logging.DEBUG.
    from xgboost_ray.main import logger
    logger.setLevel(10)

    # Handle passed to both FailureInjection callbacks below.
    failure_state = FailureState.remote()

    ray_params = RayParams(
        elastic_training=True,
        max_failed_actors=2,
        max_actor_restarts=3,
        num_actors=4,
        cpus_per_actor=4,
        gpus_per_actor=0,
    )

    # Filled in by _mock_train, one entry per call to the patched _train.
    world_sizes = []
    # NOTE(review): start_actors is recorded but never read afterwards; the
    # "1 in world_sizes" test further down may have been meant to look at
    # this list instead - confirm with the test owner.
    start_actors = []

    def _mock_train(*args, _training_state, **kwargs):
        """Record actor bookkeeping, then delegate to the real ``_train``.

        Appends the number of truthy entries in ``_training_state.actors`` to
        ``world_sizes`` and the number of ``failed_actor_ranks`` to
        ``start_actors`` before forwarding all arguments unchanged.
        """
        truthy_actor_count = sum(
            1 for actor in _training_state.actors if actor)
        world_sizes.append(truthy_actor_count)
        start_actors.append(len(_training_state.failed_actor_ranks))
        return unmocked_train(
            *args, _training_state=_training_state, **kwargs)

    # Route every call to xgboost_ray.main._train through the recorder above
    # for the duration of the training run.
    with patch("xgboost_ray.main._train", side_effect=_mock_train):
        # Two injected failures: rank 2 at iteration 14, rank 0 at
        # iteration 34.
        run_callbacks = [
            TrackingCallback(),
            FailureInjection(
                id="first_fail",
                state=failure_state,
                ranks=[2],
                iteration=14),
            FailureInjection(
                id="second_fail",
                state=failure_state,
                ranks=[0],
                iteration=34),
        ]
        _, additional_results, _ = train_ray(
            path="/data/classification.parquet",
            num_workers=4,
            num_boost_rounds=100,
            num_files=200,
            regression=False,
            use_gpu=False,
            ray_params=ray_params,
            xgboost_params=None,
            callbacks=run_callbacks)

    # Distinct values found in the callback returns at index 1.
    actor_1_world_size = set(additional_results["callback_returns"][1])

    # Any one of these indicators counts as evidence of a reduced-size
    # training phase; only warn when none of them is present.
    reduced_phase_observed = (
        3 in actor_1_world_size
        or 3 in world_sizes
        or 1 in world_sizes
    )
    if not reduced_phase_observed:
        warnings.warn(
            "No training with only 3 actors observed, but this was elastic "
            "training. Please check the output to see if data loading was "
            "too fast so that the training actors were re-integrated directly "
            "after restarting.")

    print("PASSED.")