[release] LightGBM release tests (#17043)

Antoni Baum 2021-07-14 09:38:55 +02:00 committed by GitHub
parent 0f79ebbd75
commit cfc5806c2d
26 changed files with 1199 additions and 0 deletions

@@ -0,0 +1,36 @@
# --------------------------------------------------------------------
# Tests from the python/ray/util/lightgbm directory.
# Please keep these sorted alphabetically.
# --------------------------------------------------------------------
py_test(
name = "simple_example",
size = "small",
srcs = ["simple_example.py"],
deps = [":lgbm_lib"],
tags = ["exclusive"],
)
py_test(
name = "simple_tune",
size="small",
srcs = ["simple_tune.py"],
deps = [":lgbm_lib"],
tags = ["exclusive"]
)
py_test(
name = "test_client",
size = "small",
srcs = ["tests/test_client.py"],
deps = [":lgbm_lib"],
tags = ["exclusive", "client"]
)
# This is a dummy test dependency that causes the above tests to be
# re-run if any of these files changes.
py_library(
name = "lgbm_lib",
srcs = glob(["**/*.py"]),
)

@@ -0,0 +1,18 @@
import logging
logger = logging.getLogger(__name__)
train = None
predict = None
RayParams = None
RayDMatrix = None
RayFileType = None
try:
from lightgbm_ray import train, predict, RayParams, RayDMatrix, RayFileType
except ImportError:
logger.info(
"lightgbm_ray is not installed. Please run "
"`pip install git+https://github.com/ray-project/lightgbm_ray`.")
__all__ = ["train", "predict", "RayParams", "RayDMatrix", "RayFileType"]

@@ -0,0 +1,149 @@
import glob
import os
import time
import ray
from lightgbm_ray import train, RayDMatrix, RayFileType, \
RayParams, RayDeviceQuantileDMatrix
from lightgbm_ray.tune import _TuneLGBMRank0Mixin
from lightgbm.callback import CallbackEnv
if "OMP_NUM_THREADS" in os.environ:
del os.environ["OMP_NUM_THREADS"]
@ray.remote
class FailureState:
def __init__(self):
self._failed_ids = set()
def set_failed(self, id):
if id in self._failed_ids:
return False
self._failed_ids.add(id)
return True
def has_failed(self, id):
return id in self._failed_ids
class FailureInjection(_TuneLGBMRank0Mixin):
def __init__(self, id, state, ranks, iteration):
self._id = id
self._state = state
self._ranks = ranks or []
self._iteration = iteration
def __call__(self, env: CallbackEnv):
if env.iteration == self._iteration:
rank = 0 if self.is_rank_0 else 1
if rank in self._ranks:
if not ray.get(self._state.has_failed.remote(self._id)):
success = ray.get(self._state.set_failed.remote(self._id))
if not success:
# Another rank is already about to fail
return
pid = os.getpid()
print(f"Killing process: {pid} for actor rank {rank}")
time.sleep(1)
os.kill(pid, 9)
order = 2
class TrackingCallback(_TuneLGBMRank0Mixin):
def __call__(self, env: CallbackEnv):
if self.is_rank_0:
print(f"[Rank 0] I am at iteration {env.iteration}")
order = 1
def train_ray(path,
num_workers,
num_boost_rounds,
num_files=0,
regression=False,
use_gpu=False,
ray_params=None,
lightgbm_params=None,
**kwargs):
path = os.path.expanduser(path)
if not os.path.exists(path):
raise ValueError(f"Path does not exist: {path}")
if num_files:
files = sorted(glob.glob(f"{path}/**/*.parquet"))
while num_files > len(files):
files = files + files
path = files[0:num_files]
use_device_matrix = False
if use_gpu:
try:
import cupy # noqa: F401
use_device_matrix = True
except ImportError:
use_device_matrix = False
if use_device_matrix:
dtrain = RayDeviceQuantileDMatrix(
path,
num_actors=num_workers,
label="labels",
ignore=["partition"],
filetype=RayFileType.PARQUET)
else:
dtrain = RayDMatrix(
path,
num_actors=num_workers,
label="labels",
ignore=["partition"],
filetype=RayFileType.PARQUET)
config = {"device": "cpu" if not use_gpu else "gpu"}
if not regression:
# Classification
config.update({
"objective": "binary",
"metric": ["binary_logloss", "binary_error"],
})
else:
# Regression
config.update({
"objective": "regression",
"metric": ["l2", "rmse"],
})
if lightgbm_params:
config.update(lightgbm_params)
start = time.time()
evals_result = {}
additional_results = {}
bst = train(
config,
dtrain,
evals_result=evals_result,
additional_results=additional_results,
num_boost_round=num_boost_rounds,
ray_params=ray_params or RayParams(
max_actor_restarts=2,
num_actors=num_workers,
cpus_per_actor=2,
gpus_per_actor=0 if not use_gpu else 1),
evals=[(dtrain, "train")],
**kwargs)
taken = time.time() - start
print(f"TRAIN TIME TAKEN: {taken:.2f} seconds")
out_file = os.path.expanduser(
"~/benchmark_{}.lgbm".format("cpu" if not use_gpu else "gpu"))
bst.booster_.save_model(out_file)
print("Final training error: {:.4f}".format(evals_result["train"][
"binary_error" if not regression else "rmse"][-1]))
return bst, additional_results, taken

@@ -0,0 +1,44 @@
from sklearn import datasets
from sklearn.model_selection import train_test_split
from ray.util.lightgbm import RayDMatrix, RayParams, train
# __lightgbm_begin__
def main():
# Load dataset
data, labels = datasets.load_breast_cancer(return_X_y=True)
# Split into train and test set
train_x, test_x, train_y, test_y = train_test_split(
data, labels, test_size=0.25)
train_set = RayDMatrix(train_x, train_y)
test_set = RayDMatrix(test_x, test_y)
# Set config
config = {
"objective": "binary",
"metric": ["binary_logloss", "binary_error"],
"max_depth": 3,
}
evals_result = {}
# Train the classifier
bst = train(
config,
train_set,
evals=[(test_set, "eval")],
evals_result=evals_result,
ray_params=RayParams(max_actor_restarts=1, num_actors=1),
verbose_eval=False)
bst.booster_.save_model("simple.lgbm")
print("Final validation error: {:.4f}".format(
evals_result["eval"]["binary_error"][-1]))
# __lightgbm_end__
if __name__ == "__main__":
main()
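The example above saves the trained model to "simple.lgbm" but stops short of inference. As a hedged illustration of the predict helper re-exported by ray.util.lightgbm earlier in this diff, a minimal sketch follows; it assumes predict mirrors lightgbm_ray's predict(model, data, ray_params=...) signature and accepts a lightgbm.Booster, neither of which this diff shows.

# Hypothetical inference sketch (not part of simple_example.py); assumes
# predict(model, data, ray_params=...) matches the lightgbm_ray signature
# and that a raw lightgbm.Booster is accepted as the model argument.
import lightgbm as lgbm
from sklearn import datasets

from ray.util.lightgbm import RayDMatrix, RayParams, predict


def infer():
    # Load the booster written by main() above.
    booster = lgbm.Booster(model_file="simple.lgbm")
    data, _ = datasets.load_breast_cancer(return_X_y=True)
    pred_set = RayDMatrix(data)
    # Distributed prediction with a single actor.
    scores = predict(booster, pred_set, ray_params=RayParams(num_actors=1))
    print("First five predictions:", scores[:5])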

@@ -0,0 +1,95 @@
from sklearn import datasets
from sklearn.model_selection import train_test_split
from ray.util.lightgbm import RayDMatrix, RayParams, train
# __train_begin__
num_cpus_per_actor = 2
num_actors = 1
def train_model(config):
# Load dataset
data, labels = datasets.load_breast_cancer(return_X_y=True)
# Split into train and test set
train_x, test_x, train_y, test_y = train_test_split(
data, labels, test_size=0.25)
train_set = RayDMatrix(train_x, train_y)
test_set = RayDMatrix(test_x, test_y)
evals_result = {}
bst = train(
params=config,
dtrain=train_set,
evals=[(test_set, "eval")],
evals_result=evals_result,
verbose_eval=False,
ray_params=RayParams(
num_actors=num_actors, cpus_per_actor=num_cpus_per_actor))
bst.booster_.save_model("model.lgbm")
# __train_end__
# __load_begin__
def load_best_model(best_logdir):
import lightgbm as lgbm
import os
best_bst = lgbm.Booster(model_file=os.path.join(best_logdir, "model.lgbm"))
return best_bst
# __load_end__
def main():
# __tune_begin__
from ray import tune
# Set config
config = {
"objective": "binary",
"metric": ["binary_logloss", "binary_error"],
"eta": tune.loguniform(1e-4, 1e-1),
"subsample": tune.uniform(0.5, 1.0),
"max_depth": tune.randint(1, 9)
}
# __tune_end__
# __tune_run_begin__
analysis = tune.run(
train_model,
config=config,
metric="eval-binary_error",
mode="min",
num_samples=4,
resources_per_trial={
"cpu": 1,
"extra_cpu": num_actors * num_cpus_per_actor
})
# Load in the best performing model.
best_bst = load_best_model(analysis.best_logdir)
# Use the following code block instead if using Ray Client.
# import ray
# if ray.util.client.ray.is_connected():
# # If using Ray Client best_logdir is a directory on the server.
# # So we want to make sure we wrap model loading in a task.
# remote_load_fn = ray.remote(load_best_model)
# best_bst = ray.get(remote_load_fn.remote(analysis.best_logdir))
# Do something with the best model.
_ = best_bst
accuracy = 1. - analysis.best_result["eval-binary_error"]
print(f"Best model parameters: {analysis.best_config}")
print(f"Best model total accuracy: {accuracy:.4f}")
# __tune_run_end__
if __name__ == "__main__":
main()

@@ -0,0 +1,28 @@
import pytest
import sys
import ray
from ray.util.client.ray_client_helpers import ray_start_client_server
@pytest.fixture
def start_client_server():
with ray_start_client_server() as client:
yield client
def test_simple_example(start_client_server):
assert ray.util.client.ray.is_connected()
from ray.util.lightgbm.simple_example import main
main()
def test_simple_tune(start_client_server):
assert ray.util.client.ray.is_connected()
from ray.util.lightgbm.simple_tune import main
main()
if __name__ == "__main__":
import pytest
sys.exit(pytest.main(["-v", __file__]))

@@ -0,0 +1,24 @@
LightGBM on Ray tests
=====================
This directory contains various LightGBM on Ray release tests.

You should run these tests with the `releaser <https://github.com/ray-project/releaser>`_ tool.

Overview
--------

There are four kinds of tests:

1. ``distributed_api_test`` - checks general API functionality and should finish very quickly (< 1 minute).
2. ``train_*`` - checks single-trial training on different setups.
3. ``tune_*`` - checks multi-trial training via Ray Tune.
4. ``ft_*`` - checks fault tolerance.

Generally the releaser tool will run all tests in parallel, but if you run
them sequentially, be sure to follow the order above: if ``train_*`` fails,
``tune_*`` will fail, too. A sequential run is sketched below.
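The following is only a rough sketch of such a sequential run; it assumes a
cluster satisfying each workload's node requirements is already up (the
releaser normally handles this via ``wait_cluster.py``) and that it is run
from this directory:

.. code-block:: python

    # Hypothetical sequential driver, shown only to illustrate the ordering
    # above (API test first, then train_*, then tune_*, then ft_*).
    # The GPU workload is omitted here.
    import subprocess

    ORDERED_WORKLOADS = [
        "workloads/distributed_api_test.py",
        "workloads/train_small.py",
        "workloads/train_moderate.py",
        "workloads/tune_small.py",
        "workloads/tune_32x4.py",
        "workloads/tune_4x32.py",
        "workloads/ft_small_non_elastic.py",
    ]

    for workload in ORDERED_WORKLOADS:
        # Each workload connects to the running cluster itself and prints
        # "PASSED." on success; a non-zero exit code aborts the sequence.
        subprocess.run(["python", workload], check=True)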

Acceptance criteria
-------------------

These tests are considered passing when they throw no error at the end of
the output log.

@@ -0,0 +1,24 @@
base_image: "anyscale/ray-ml:pinned-nightly-py37"
env_vars: {}
debian_packages:
- curl
python:
pip_packages:
- pytest
- lightgbm_ray
- petastorm
- tblib
conda_packages: []
post_build_cmds:
- pip uninstall -y numpy ray || true
- sudo rm -rf /home/ray/anaconda3/lib/python3.7/site-packages/numpy
- pip3 install numpy || true
- pip3 install -U {{ env["RAY_WHEELS"] | default("ray") }}
- pip3 install -U lightgbm_ray petastorm # Install latest releases
- sudo mkdir -p /data || true
- sudo chown ray:1000 /data || true
- rm -rf /data/classification.parquet || true
  - curl -o create_test_data.py https://raw.githubusercontent.com/ray-project/ray/releases/1.3.0/release/xgboost_tests/create_test_data.py # reusing the xgboost_tests data script is intentional
- python ./create_test_data.py /data/classification.parquet --seed 1234 --num-rows 1000000 --num-cols 40 --num-partitions 100 --num-classes 2

@@ -0,0 +1,38 @@
cluster_name: ray-lightgbm-release-cpu-moderate
max_workers: 32
upscaling_speed: 32
idle_timeout_minutes: 15
docker:
image: anyscale/ray:latest
container_name: ray_container
pull_before_run: true
provider:
type: aws
region: us-west-2
availability_zone: us-west-2a
cache_stopped_nodes: false
available_node_types:
cpu_4_ondemand:
node_config:
InstanceType: m5.xlarge
resources: {"CPU": 4}
min_workers: 31
max_workers: 31
auth:
ssh_user: ubuntu
head_node_type: cpu_4_ondemand
worker_default_node_type: cpu_4_ondemand
file_mounts: {
"~/lightgbm_tests": "."
}
file_mounts_sync_continuously: false

@@ -0,0 +1,38 @@
cluster_name: ray-lightgbm-release-cpu-small
max_workers: 4
upscaling_speed: 32
idle_timeout_minutes: 15
docker:
image: anyscale/ray:latest
container_name: ray_container
pull_before_run: true
provider:
type: aws
region: us-west-2
availability_zone: us-west-2a
cache_stopped_nodes: false
available_node_types:
cpu_4_ondemand:
node_config:
InstanceType: m5.xlarge
resources: {"CPU": 4}
min_workers: 3
max_workers: 3
auth:
ssh_user: ubuntu
head_node_type: cpu_4_ondemand
worker_default_node_type: cpu_4_ondemand
file_mounts: {
"~/lightgbm_tests": "."
}
file_mounts_sync_continuously: false

@@ -0,0 +1,58 @@
import argparse
import numpy as np
import os
from xgboost_ray.tests.utils import create_parquet
if __name__ == "__main__":
if "OMP_NUM_THREADS" in os.environ:
del os.environ["OMP_NUM_THREADS"]
parser = argparse.ArgumentParser(description="Create fake data.")
parser.add_argument(
"filename", type=str, default="/data/parted.parquet/", help="ray/dask")
parser.add_argument(
"-r",
"--num-rows",
required=False,
type=int,
default=1e8,
help="num rows")
parser.add_argument(
"-p",
"--num-partitions",
required=False,
type=int,
default=100,
help="num partitions")
parser.add_argument(
"-c",
"--num-cols",
required=False,
type=int,
default=4,
help="num columns (features)")
parser.add_argument(
"-C",
"--num-classes",
required=False,
type=int,
default=2,
help="num classes")
parser.add_argument(
"-s",
"--seed",
required=False,
type=int,
default=1234,
help="random seed")
args = parser.parse_args()
np.random.seed(args.seed)
create_parquet(
args.filename,
num_rows=int(args.num_rows),
num_partitions=int(args.num_partitions),
num_features=int(args.num_cols),
num_classes=int(args.num_classes))

@@ -0,0 +1,83 @@
- name: train_small
cluster:
app_config: app_config.yaml
compute_template: tpl_cpu_small.yaml
run:
use_connect: True
timeout: 600
prepare: python wait_cluster.py 4 600
script: python workloads/train_small.py
- name: train_moderate
cluster:
app_config: app_config.yaml
compute_template: tpl_cpu_moderate.yaml
run:
timeout: 600
prepare: python wait_cluster.py 32 600
script: python workloads/train_moderate.py
- name: train_gpu
cluster:
app_config: app_config_gpu.yaml
compute_template: tpl_gpu_small.yaml
run:
timeout: 600
prepare: python wait_cluster.py 5 600
script: python workloads/train_gpu.py
- name: distributed_api_test
cluster:
app_config: app_config.yaml
compute_template: tpl_cpu_small.yaml
run:
timeout: 600
prepare: python wait_cluster.py 4 600
script: python workloads/distributed_api_test.py
results: ""
- name: ft_small_non_elastic
cluster:
app_config: app_config.yaml
compute_template: tpl_cpu_small.yaml
run:
timeout: 900
prepare: python wait_cluster.py 4 600
script: python workloads/ft_small_non_elastic.py
results: ""
- name: tune_small
cluster:
app_config: app_config.yaml
compute_template: tpl_cpu_small.yaml
run:
timeout: 600
prepare: python wait_cluster.py 4 600
script: python workloads/tune_small.py
- name: tune_32x4
cluster:
app_config: app_config.yaml
compute_template: tpl_cpu_moderate.yaml
run:
timeout: 900
prepare: python wait_cluster.py 32 600
script: python workloads/tune_32x4.py
- name: tune_4x32
cluster:
app_config: app_config.yaml
compute_template: tpl_cpu_moderate.yaml
run:
timeout: 900
prepare: python wait_cluster.py 32 600
script: python workloads/tune_4x32.py

@@ -0,0 +1,3 @@
ray[tune]
lightgbm_ray
xgboost_ray

@@ -0,0 +1,15 @@
#!/bin/bash
pip install pytest
# Uninstall any existing lightgbm_ray package
pip uninstall -y lightgbm_ray || true
# Install lightgbm_ray package
pip install -U "${LIGHTGBM_RAY_PACKAGE:-lightgbm_ray}"
# Create test dataset
sudo mkdir -p /data || true
sudo chown ray:1000 /data || true
rm -rf /data/classification.parquet || true
cp -R /tmp/ray_tmp_mount/lightgbm_tests ~/lightgbm_tests || echo "Copy failed"
python ~/lightgbm_tests/create_test_data.py /data/classification.parquet --seed 1234 --num-rows 1000000 --num-cols 40 --num-partitions 100 --num-classes 2

@@ -0,0 +1,15 @@
cloud_id: {{env["ANYSCALE_CLOUD_ID"]}}
region: us-west-2
max_workers: 31
head_node_type:
name: head_node
instance_type: m5.xlarge
worker_node_types:
- name: worker_node
instance_type: m5.xlarge
min_workers: 31
max_workers: 31
use_spot: false

@@ -0,0 +1,15 @@
cloud_id: {{env["ANYSCALE_CLOUD_ID"]}}
region: us-west-2
max_workers: 3
head_node_type:
name: head_node
instance_type: m5.xlarge
worker_node_types:
- name: worker_node
instance_type: m5.xlarge
min_workers: 3
max_workers: 3
use_spot: false

@@ -0,0 +1,49 @@
import argparse
import time
import ray
ray.init(address="auto")
parser = argparse.ArgumentParser()
parser.add_argument(
"num_nodes",
type=int,
help="Wait for this number of nodes (includes head)")
parser.add_argument(
"max_time_s", type=int, help="Wait for this number of seconds")
parser.add_argument(
"--feedback_interval_s",
type=int,
default=10,
help="Wait for this number of seconds")
args = parser.parse_args()
curr_nodes = 0
start = time.time()
next_feedback = start
max_time = start + args.max_time_s
while curr_nodes < args.num_nodes:
now = time.time()
if now >= max_time:
raise RuntimeError(
f"Maximum wait time reached, but only "
f"{curr_nodes}/{args.num_nodes} nodes came up. Aborting.")
if now >= next_feedback:
passed = now - start
print(f"Waiting for more nodes to come up: "
f"{curr_nodes}/{args.num_nodes} "
f"({passed:.0f} seconds passed)")
next_feedback = now + args.feedback_interval_s
time.sleep(5)
curr_nodes = len(ray.nodes())
passed = time.time() - start
print(f"Cluster is up: {curr_nodes}/{args.num_nodes} nodes online after "
f"{passed:.0f} seconds")

@@ -0,0 +1,26 @@
"""Distributed LightGBM API test
This test runs unit tests on a distributed cluster. This will confirm that
LightGBM API features like custom metrics/objectives work with remote
trainables.
Test owner: Yard1 (primary), krfricke
Acceptance criteria: Unit tests should pass (requires pytest).
"""
import ray
from lightgbm_ray.tests.test_lightgbm_api import LightGBMAPITest
class LightGBMDistributedAPITest(LightGBMAPITest):
def _init_ray(self):
if not ray.is_initialized():
ray.init(address="auto")
if __name__ == "__main__":
import pytest
import sys
sys.exit(pytest.main(["-v", f"{__file__}::LightGBMDistributedAPITest"]))
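The API features referenced in the docstring above are exercised by the upstream lightgbm_ray test suite; for illustration only, here is a minimal sketch of the kind of custom metric such tests cover, written against plain LightGBM's scikit-learn interface. How lightgbm_ray forwards such callables to the remote trainables is an assumption here, not something this file shows.

# Illustrative only: a custom evaluation metric in LightGBM's sklearn API.
# The distributed test above checks that features like this keep working
# when training runs on remote actors via lightgbm_ray.
import numpy as np
import lightgbm as lgb
from sklearn.datasets import load_breast_cancer
from sklearn.model_selection import train_test_split


def error_at_50(y_true, y_pred):
    # Custom metric: classification error at a fixed 0.5 threshold.
    # Must return (name, value, is_higher_better).
    return "error_at_50", float(np.mean((y_pred > 0.5) != y_true)), False


X, y = load_breast_cancer(return_X_y=True)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.25)

clf = lgb.LGBMClassifier(objective="binary", n_estimators=10)
clf.fit(X_train, y_train, eval_set=[(X_test, y_test)], eval_metric=error_at_50)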

@@ -0,0 +1,51 @@
"""Fault tolerance test (small cluster, non-elastic training)
In this run, two training actors will die after some time. It is expected that
in both cases lightgbm_ray stops training, restarts the dead actors, and
continues training with all four actors.
Test owner: Yard1 (primary), krfricke
Acceptance criteria: Should run through and report final results. Intermediate
output should show that training halts when an actor dies and continues only
when all four actors are available again. The test will fail if fault
tolerance did not work correctly.
Notes: This test seems to be somewhat flaky. This might be due to
race conditions in handling dead actors. This is likely a problem of
the lightgbm_ray implementation and not of this test.
"""
import ray
from lightgbm_ray import RayParams
from ray.util.lightgbm.release_test_util import train_ray, \
FailureState, FailureInjection, TrackingCallback
if __name__ == "__main__":
ray.init(address="auto")
failure_state = FailureState.remote()
ray_params = RayParams(
max_actor_restarts=2, num_actors=4, cpus_per_actor=4, gpus_per_actor=0)
_, additional_results, _ = train_ray(
path="/data/classification.parquet",
num_workers=4,
num_boost_rounds=100,
num_files=200,
regression=False,
use_gpu=False,
ray_params=ray_params,
lightgbm_params=None,
callbacks=[
TrackingCallback(),
FailureInjection(
id="first_fail", state=failure_state, ranks=[1], iteration=14),
FailureInjection(
id="second_fail", state=failure_state, ranks=[0], iteration=34)
])
print("PASSED.")

@@ -0,0 +1,49 @@
"""Moderate cluster training
This training run will start 32 workers on 32 nodes (including head node).
Test owner: Yard1 (primary), krfricke
Acceptance criteria: Should run through and report final results.
"""
import json
import os
import time
import ray
from lightgbm_ray import RayParams
from ray.util.lightgbm.release_test_util import train_ray
if __name__ == "__main__":
ray.init(address="auto")
ray_params = RayParams(
elastic_training=False,
max_actor_restarts=2,
num_actors=32,
cpus_per_actor=4,
gpus_per_actor=0)
start = time.time()
train_ray(
path="/data/classification.parquet",
num_workers=32,
num_boost_rounds=100,
num_files=128,
regression=False,
use_gpu=False,
ray_params=ray_params,
lightgbm_params=None,
)
taken = time.time() - start
result = {
"time_taken": taken,
}
test_output_json = os.environ.get("TEST_OUTPUT_JSON",
"/tmp/train_moderate.json")
with open(test_output_json, "wt") as f:
json.dump(result, f)
print("PASSED.")

@@ -0,0 +1,67 @@
"""Small cluster training
This training run will start 4 workers on 4 nodes (including head node).
Test owner: Yard1 (primary), krfricke
Acceptance criteria: Should run through and report final results.
"""
import json
import os
import time
import ray
from ray.test_utils import wait_for_num_nodes
from lightgbm_ray import RayParams
from ray.util.lightgbm.release_test_util import train_ray
if __name__ == "__main__":
addr = os.environ.get("RAY_ADDRESS")
job_name = os.environ.get("RAY_JOB_NAME", "train_small")
    if addr is not None and addr.startswith("anyscale://"):
ray.client(address=addr).job_name(job_name).connect()
else:
ray.init(address="auto")
wait_for_num_nodes(
int(os.environ.get("RAY_RELEASE_MIN_WORKERS", 0)) + 1, 600)
output = os.environ["TEST_OUTPUT_JSON"]
state = os.environ["TEST_STATE_JSON"]
ray_params = RayParams(
elastic_training=False,
max_actor_restarts=2,
num_actors=4,
cpus_per_actor=4,
gpus_per_actor=0)
start = time.time()
@ray.remote
def train():
os.environ["TEST_OUTPUT_JSON"] = output
os.environ["TEST_STATE_JSON"] = state
train_ray(
path="/data/classification.parquet",
num_workers=4,
num_boost_rounds=100,
num_files=25,
regression=False,
use_gpu=False,
ray_params=ray_params,
lightgbm_params=None,
)
ray.get(train.remote())
taken = time.time() - start
result = {
"time_taken": taken,
}
test_output_json = os.environ.get("TEST_OUTPUT_JSON",
"/tmp/train_small.json")
with open(test_output_json, "wt") as f:
json.dump(result, f)
print("PASSED.")

@@ -0,0 +1,58 @@
"""Small cluster training
This training run will start 4 workers on 4 nodes (including head node).
Test owner: Yard1 (primary), krfricke
Acceptance criteria: Should run through and report final results.
"""
import json
import os
import time
import ray
from lightgbm_ray import RayParams
from ray.util.lightgbm.release_test_util import train_ray
if __name__ == "__main__":
addr = os.environ.get("RAY_ADDRESS")
job_name = os.environ.get("RAY_JOB_NAME", "train_small")
    if addr is not None and addr.startswith("anyscale://"):
ray.client(address=addr).job_name(job_name).connect()
else:
ray.init(address="auto")
ray_params = RayParams(
elastic_training=False,
max_actor_restarts=2,
num_actors=4,
cpus_per_actor=4,
gpus_per_actor=0)
@ray.remote
def train():
train_ray(
path="/data/classification.parquet",
num_workers=4,
num_boost_rounds=100,
num_files=25,
regression=False,
use_gpu=False,
ray_params=ray_params,
lightgbm_params=None,
)
start = time.time()
ray.get(train.remote())
taken = time.time() - start
result = {
"time_taken": taken,
}
test_output_json = os.environ.get("TEST_OUTPUT_JSON",
"/tmp/train_small.json")
with open(test_output_json, "wt") as f:
json.dump(result, f)
print("PASSED.")

@@ -0,0 +1,72 @@
"""Moderate Ray Tune run (32 trials, 4 actors).
This training run will start 32 Ray Tune trials, each starting 4 actors.
The cluster comprises 32 nodes.
Test owner: Yard1 (primary), krfricke
Acceptance criteria: Should run through and report final results, as well
as the Ray Tune results table. No trials should error. All trials should
run in parallel.
"""
from collections import Counter
import json
import os
import time
import ray
from ray import tune
from lightgbm_ray import RayParams
from ray.util.lightgbm.release_test_util import train_ray
def train_wrapper(config, ray_params):
train_ray(
path="/data/classification.parquet",
num_workers=4,
num_boost_rounds=100,
num_files=64,
regression=False,
use_gpu=False,
ray_params=ray_params,
lightgbm_params=config,
)
if __name__ == "__main__":
search_space = {
"eta": tune.loguniform(1e-4, 1e-1),
"subsample": tune.uniform(0.5, 1.0),
"max_depth": tune.randint(1, 9)
}
ray.init(address="auto")
ray_params = RayParams(
elastic_training=False,
max_actor_restarts=2,
num_actors=4,
cpus_per_actor=1,
gpus_per_actor=0)
start = time.time()
analysis = tune.run(
tune.with_parameters(train_wrapper, ray_params=ray_params),
config=search_space,
num_samples=32,
resources_per_trial=ray_params.get_tune_resources())
taken = time.time() - start
result = {
"time_taken": taken,
"trial_states": dict(
Counter([trial.status for trial in analysis.trials]))
}
test_output_json = os.environ.get("TEST_OUTPUT_JSON",
"/tmp/tune_32x4.json")
with open(test_output_json, "wt") as f:
json.dump(result, f)
print("PASSED.")

@@ -0,0 +1,72 @@
"""Moderate Ray Tune run (4 trials, 32 actors).
This training run will start 4 Ray Tune trials, each starting 32 actors.
The cluster comprises 32 nodes.
Test owner: Yard1 (primary), krfricke
Acceptance criteria: Should run through and report final results, as well
as the Ray Tune results table. No trials should error. All trials should
run in parallel.
"""
from collections import Counter
import json
import os
import time
import ray
from ray import tune
from lightgbm_ray import RayParams
from ray.util.lightgbm.release_test_util import train_ray
def train_wrapper(config, ray_params):
train_ray(
path="/data/classification.parquet",
num_workers=32,
num_boost_rounds=100,
num_files=128,
regression=False,
use_gpu=False,
ray_params=ray_params,
lightgbm_params=config,
)
if __name__ == "__main__":
search_space = {
"eta": tune.loguniform(1e-4, 1e-1),
"subsample": tune.uniform(0.5, 1.0),
"max_depth": tune.randint(1, 9)
}
ray.init(address="auto")
ray_params = RayParams(
elastic_training=False,
max_actor_restarts=2,
num_actors=32,
cpus_per_actor=1,
gpus_per_actor=0)
start = time.time()
analysis = tune.run(
tune.with_parameters(train_wrapper, ray_params=ray_params),
config=search_space,
num_samples=4,
resources_per_trial=ray_params.get_tune_resources())
taken = time.time() - start
result = {
"time_taken": taken,
"trial_states": dict(
Counter([trial.status for trial in analysis.trials]))
}
test_output_json = os.environ.get("TEST_OUTPUT_JSON",
"/tmp/tune_4x32.json")
with open(test_output_json, "wt") as f:
json.dump(result, f)
print("PASSED.")

@@ -0,0 +1,72 @@
"""Small Ray Tune run (4 trials, 4 actors).
This training run will start 4 Ray Tune trials, each starting 4 actors.
The cluster comprises 4 nodes.
Test owner: Yard1 (primary), krfricke
Acceptance criteria: Should run through and report final results, as well
as the Ray Tune results table. No trials should error. All trials should
run in parallel.
"""
from collections import Counter
import json
import os
import time
import ray
from ray import tune
from lightgbm_ray import RayParams
from ray.util.lightgbm.release_test_util import train_ray
def train_wrapper(config, ray_params):
train_ray(
path="/data/classification.parquet",
num_workers=4,
num_boost_rounds=100,
num_files=25,
regression=False,
use_gpu=False,
ray_params=ray_params,
lightgbm_params=config,
)
if __name__ == "__main__":
search_space = {
"eta": tune.loguniform(1e-4, 1e-1),
"subsample": tune.uniform(0.5, 1.0),
"max_depth": tune.randint(1, 9)
}
ray.init(address="auto")
ray_params = RayParams(
elastic_training=False,
max_actor_restarts=2,
num_actors=4,
cpus_per_actor=1,
gpus_per_actor=0)
start = time.time()
analysis = tune.run(
tune.with_parameters(train_wrapper, ray_params=ray_params),
config=search_space,
num_samples=4,
resources_per_trial=ray_params.get_tune_resources())
taken = time.time() - start
result = {
"time_taken": taken,
"trial_states": dict(
Counter([trial.status for trial in analysis.trials]))
}
test_output_json = os.environ.get("TEST_OUTPUT_JSON",
"/tmp/tune_small.json")
with open(test_output_json, "wt") as f:
json.dump(result, f)
print("PASSED.")