ray/release/golden_notebook_tests/workloads/modin_xgboost_test.py

141 lines
3.9 KiB
Python
Raw Normal View History

import argparse
import json
import os
import time
import modin.pandas as pd
import ray
from ray import tune
from xgboost_ray import RayDMatrix, RayParams, train, predict
from utils.utils import is_anyscale_connect
FILE_URL = "https://ray-ci-higgs.s3.us-west-2.amazonaws.com/" \
"simpleHIGGS.csv"
def train_xgboost(config, train_df, test_df, target_column, ray_params):
# distributed loading of a parquet dataset
train_set = RayDMatrix(train_df, target_column)
test_set = RayDMatrix(test_df, target_column)
evals_result = {}
start_time = time.time()
# Train the classifier
bst = train(
params=config,
dtrain=train_set,
evals=[(test_set, "eval")],
evals_result=evals_result,
verbose_eval=False,
num_boost_round=100,
ray_params=ray_params)
print(f"Total time taken: {time.time()-start_time}")
model_path = "model.xgb"
bst.save_model(model_path)
print("Final validation error: {:.4f}".format(
evals_result["eval"]["error"][-1]))
return bst
def tune_xgboost(train_df, test_df, target_column):
# Set XGBoost config.
config = {
"tree_method": "approx",
"objective": "binary:logistic",
"eval_metric": ["logloss", "error"],
"eta": tune.loguniform(1e-4, 1e-1),
"subsample": tune.uniform(0.5, 1.0),
"max_depth": tune.randint(1, 9)
}
ray_params = RayParams(
max_actor_restarts=1, gpus_per_actor=0, cpus_per_actor=1, num_actors=2)
analysis = tune.run(
tune.with_parameters(
train_xgboost,
train_df=train_df,
test_df=test_df,
target_column=target_column,
ray_params=ray_params),
# Use the `get_tune_resources` helper function to set the resources.
resources_per_trial=ray_params.get_tune_resources(),
config=config,
num_samples=1,
metric="eval-error",
mode="min",
verbose=1)
accuracy = 1. - analysis.best_result["eval-error"]
print(f"Best model parameters: {analysis.best_config}")
print(f"Best model total accuracy: {accuracy:.4f}")
return analysis.best_config
def main():
print("Loading HIGGS data.")
colnames = ["label"] + ["feature-%02d" % i for i in range(1, 29)]
data = pd.read_csv(FILE_URL, names=colnames)
print("Loaded HIGGS data.")
# partition on a column
df_train = data[(data["feature-01"] < 0.4)]
df_validation = data[(data["feature-01"] >= 0.4)
& (data["feature-01"] < 0.8)]
config = {
"tree_method": "approx",
"objective": "binary:logistic",
"eval_metric": ["logloss", "error"],
}
bst = train_xgboost(
config, df_train, df_validation, "label",
RayParams(max_actor_restarts=1, cpus_per_actor=4, num_actors=4))
# tune_xgboost(df_train, df_validation, "label") # broken atm
inference_df = RayDMatrix(
df_train[sorted(df_train.columns)], ignore=["label", "partition"])
predict(
bst,
inference_df,
ray_params=RayParams(cpus_per_actor=1, num_actors=16))
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument(
"--smoke-test",
action="store_true",
help="Finish quickly for testing.")
args = parser.parse_args()
start = time.time()
addr = os.environ.get("RAY_ADDRESS")
job_name = os.environ.get("RAY_JOB_NAME", "modin_xgboost_test")
if is_anyscale_connect(addr):
ray.init(address=addr, job_name=job_name)
else:
ray.init(address="auto")
main()
taken = time.time() - start
result = {
"time_taken": taken,
}
test_output_json = os.environ.get("TEST_OUTPUT_JSON",
"/tmp/modin_xgboost_test.json")
with open(test_output_json, "wt") as f:
json.dump(result, f)
print("Test Successful!")