ray/release/golden_notebook_tests/workloads/dask_xgboost_test.py

69 lines
1.9 KiB
Python

import argparse
import json
import os
import time
import dask
import dask.dataframe as dd
import ray
from ray.util.dask import ray_dask_get
from xgboost_ray import RayDMatrix, RayParams, train
FILE_URL = "https://archive.ics.uci.edu/ml/machine-learning-databases/" \
"00280/HIGGS.csv.gz"
parser = argparse.ArgumentParser()
parser.add_argument(
"--smoke-test", action="store_true", help="Finish quickly for testing.")
args = parser.parse_args()
def main():
ray.client("anyscale://").connect()
print("Loading HIGGS data.")
dask.config.set(scheduler=ray_dask_get)
colnames = ["label"] + ["feature-%02d" % i for i in range(1, 29)]
data = dd.read_csv(FILE_URL, names=colnames)
if args.smoke_test:
data = data.head(n=1000)
print("Loaded HIGGS data.")
# partition on a column
df_train = data[(data["feature-01"] < 0.4)]
df_train = df_train.persist()
df_validation = data[(data["feature-01"] >= 0.4)
& (data["feature-01"] < 0.8)]
df_validation = df_validation.persist()
dtrain = RayDMatrix(df_train, label="label", columns=colnames)
dvalidation = RayDMatrix(df_validation, label="label")
evallist = [(dvalidation, "eval")]
evals_result = {}
config = {"tree_method": "hist", "eval_metric": ["logloss", "error"]}
train(
params=config,
dtrain=dtrain,
evals_result=evals_result,
ray_params=RayParams(
max_actor_restarts=1, num_actors=4, cpus_per_actor=2),
num_boost_round=100,
evals=evallist)
if __name__ == "__main__":
start = time.time()
main()
taken = time.time() - start
result = {
"time_taken": taken,
}
test_output_json = os.environ.get("TEST_OUTPUT_JSON",
"/tmp/dask_xgboost_test.json")
with open(test_output_json, "wt") as f:
json.dump(result, f)
print("Test Successful!")