"""
XGBoost-Ray with Modin
======================
This notebook includes an example workflow using
`XGBoost-Ray <https://docs.ray.io/en/latest/xgboost-ray.html>`_ and
`Modin <https://modin.readthedocs.io/en/latest/>`_ for distributed model
training and prediction.
"""
###############################################################################
# Cluster Setup
# -------------
#
# First, we'll set up our Ray Cluster. The provided ``modin_xgboost.yaml``
# cluster config can be used to set up an AWS cluster with 64 CPUs.
#
# The following steps assume you are in a directory with both
# ``modin_xgboost.yaml`` and this file saved as ``modin_xgboost.ipynb``.
#
# **Step 1:** Bring up the Ray cluster.
#
# .. code-block:: bash
#
#     $ pip install ray boto3
#     $ ray up modin_xgboost.yaml
#
# **Step 2:** Move ``modin_xgboost.ipynb`` to the cluster and start Jupyter.
#
# .. code-block:: bash
#
#     $ ray rsync_up modin_xgboost.yaml "./modin_xgboost.ipynb" \
#       "~/modin_xgboost.ipynb"
#     $ ray exec modin_xgboost.yaml --port-forward=9999 "jupyter notebook \
#       --port=9999"
#
# You can then access this notebook at the URL that is output:
# ``http://localhost:9999/?token=<token>``
###############################################################################
# Python Setup
# ------------
#
# First, we'll import all the libraries we'll be using. This step also helps us
# verify that the environment is configured correctly. If any of the imports
# are missing, an exception will be raised.
import argparse
import time
import modin.pandas as pd
from modin.experimental.sklearn.model_selection import train_test_split
from xgboost_ray import RayDMatrix, RayParams, train, predict
import ray
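
# Optional environment check (an illustrative addition, not part of the
# original script): confirm which Ray version the imports resolved to.
print(f"Ray version: {ray.__version__}")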
###############################################################################
#
# Next, let's parse some arguments. This will be used for executing the ``.py``
# file, but not for the ``.ipynb``. If you are using the interactive notebook,
# you can directly override the arguments manually.
parser = argparse.ArgumentParser()
parser.add_argument(
    "--address", type=str, default="auto", help="The address to use for Ray.")
parser.add_argument(
    "--smoke-test",
    action="store_true",
    help="Read a smaller dataset for quick testing purposes.")
parser.add_argument(
    "--num-actors",
    type=int,
    default=4,
    help="Sets number of actors for training.")
parser.add_argument(
    "--cpus-per-actor",
    type=int,
    default=8,
    help="The number of CPUs per actor for training.")
parser.add_argument(
    "--num-actors-inference",
    type=int,
    default=16,
    help="Sets number of actors for inference.")
parser.add_argument(
    "--cpus-per-actor-inference",
    type=int,
    default=2,
    help="The number of CPUs per actor for inference.")
# Ignore -f from ipykernel_launcher
args, _ = parser.parse_known_args()
###############################################################################
# Override these arguments as needed:
address = args.address
smoke_test = args.smoke_test
num_actors = args.num_actors
cpus_per_actor = args.cpus_per_actor
num_actors_inference = args.num_actors_inference
cpus_per_actor_inference = args.cpus_per_actor_inference
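
# For example, when running interactively in the notebook you might force the
# smaller dataset and a modest actor count (illustrative values, commented out
# by default):
# smoke_test = True
# num_actors = 2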
###############################################################################
# Connecting to the Ray cluster
# -----------------------------
# Now, let's connect our Python script to this newly deployed Ray cluster!
if not ray.is_initialized():
    ray.init(address=address)
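
# A small optional check (an addition for illustration, not in the original
# script): ``ray.cluster_resources()`` reports the total resources the
# connected cluster exposes, which should reflect the CPUs requested in
# ``modin_xgboost.yaml``.
print("Cluster resources:", ray.cluster_resources())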
###############################################################################
# Data Preparation
# -----------------
# We will use the `HIGGS dataset from the UCI Machine Learning dataset
# repository <https://archive.ics.uci.edu/ml/datasets/HIGGS>`_. The HIGGS
# dataset consists of 11,000,000 samples and 28 attributes, which is large
# enough to show the benefits of distributed computation.
LABEL_COLUMN = "label"
if smoke_test:
    # Test dataset with only 10,000 records.
    FILE_URL = "https://ray-ci-higgs.s3.us-west-2.amazonaws.com/simpleHIGGS" \
               ".csv"
else:
    # Full dataset. This may take a couple of minutes to load.
    FILE_URL = "https://archive.ics.uci.edu/ml/machine-learning-databases" \
               "/00280/HIGGS.csv.gz"
colnames = [LABEL_COLUMN] + ["feature-%02d" % i for i in range(1, 29)]
###############################################################################
load_data_start_time = time.time()
df = pd.read_csv(FILE_URL, names=colnames)
load_data_end_time = time.time()
load_data_duration = load_data_end_time - load_data_start_time
print(f"Dataset loaded in {load_data_duration} seconds.")
###############################################################################
# Split data into training and validation.
df_train, df_validation = train_test_split(df)
print(df_train, df_validation)
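
# Optional (an illustrative addition): report the resulting split sizes.
print(f"Training rows: {len(df_train)}, validation rows: {len(df_validation)}")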
###############################################################################
# Distributed Training
# --------------------
# The ``train_xgboost`` function contains all of the logic necessary for
# training using XGBoost-Ray.
#
# Distributed training can not only speed up the process, but also allow you
# to use datasets that are too large to fit in the memory of a single node.
# distributed training, the dataset is sharded across different actors
# running on separate nodes. Those actors communicate with each other to
# create the final model.
#
# First, the dataframes are wrapped in ``RayDMatrix`` objects, which handle
# data sharding across the cluster. Then, the ``train`` function is called.
# The evaluation scores will be saved to the ``evals_result`` dictionary. The
# function returns a tuple of the trained model (booster) and the evaluation
# scores.
#
# The ``ray_params`` variable expects a ``RayParams`` object that contains
# Ray-specific settings, such as the number of workers.
def train_xgboost(config, train_df, test_df, target_column, ray_params):
    train_set = RayDMatrix(train_df, target_column)
    test_set = RayDMatrix(test_df, target_column)

    evals_result = {}

    train_start_time = time.time()

    # Train the classifier
    bst = train(
        params=config,
        dtrain=train_set,
        evals=[(test_set, "eval")],
        evals_result=evals_result,
        verbose_eval=False,
        num_boost_round=100,
        ray_params=ray_params)

    train_end_time = time.time()
    train_duration = train_end_time - train_start_time
    print(f"Total time taken: {train_duration} seconds.")

    model_path = "model.xgb"
    bst.save_model(model_path)

    print("Final validation error: {:.4f}".format(
        evals_result["eval"]["error"][-1]))

    return bst, evals_result
###############################################################################
# We can now pass our Modin dataframes and run the function. We will use
# ``RayParams`` to specify the number of actors and CPUs to train with.
# Standard XGBoost config for classification
config = {
    "tree_method": "approx",
    "objective": "binary:logistic",
    "eval_metric": ["logloss", "error"],
}

bst, evals_result = train_xgboost(
    config, df_train, df_validation, LABEL_COLUMN,
    RayParams(cpus_per_actor=cpus_per_actor, num_actors=num_actors))
print(f"Results: {evals_result}")
###############################################################################
# Prediction
# ----------
# With the model trained, we can now predict on unseen data. For the
# purposes of this example, we will use the same dataset for prediction as
# for training.
#
# Since prediction is naively parallelizable, distributing it over multiple
# actors can measurably reduce the amount of time needed.
inference_df = RayDMatrix(df, ignore=[LABEL_COLUMN, "partition"])
results = predict(
    bst,
    inference_df,
    ray_params=RayParams(
        cpus_per_actor=cpus_per_actor_inference,
        num_actors=num_actors_inference))

print(results)
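
# Optional sanity check (a sketch, not part of the original example): since
# ``binary:logistic`` outputs probabilities, thresholding at 0.5 yields class
# predictions that can be compared against the known labels. Note that this
# evaluates on the same data used for training, so it only confirms the
# pipeline runs end to end.
import numpy as np

predicted_labels = (np.asarray(results) > 0.5).astype(int)
actual_labels = df[LABEL_COLUMN].to_numpy().astype(int)
accuracy = float((predicted_labels == actual_labels).mean())
print(f"Accuracy on the training data: {accuracy:.4f}")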