ray/release/nightly_tests/dataset/ray_sgd_training.py
# flake8: noqa
import argparse
import os
import json
import sys
import time
from typing import Tuple
import boto3
import dask
import dask.dataframe as dd
import mlflow
import pandas as pd
import ray
import torch
import torch.nn as nn
import torch.optim as optim
from dask_ml.compose import ColumnTransformer
from dask_ml.model_selection import train_test_split
from dask_ml.preprocessing import OneHotEncoder
from dask_ml.preprocessing import StandardScaler
from ray import train
from ray.train import Trainer
from ray.train import TrainingCallback
from ray.train.callbacks import TBXLoggerCallback
from ray.util.dask import ray_dask_get
from torch.nn.parallel import DistributedDataParallel
from torch.utils.tensorboard import SummaryWriter
# TODO(amogkam): Upstream this into Ray Train.
class MLflowCallback(TrainingCallback):
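    """Ray Train callback that mirrors reported results to MLflow: logs the
    rank 0 worker's metrics from each ``train.report()`` call, records the
    training config as MLflow params, and uploads the Trainer checkpoints as
    artifacts when training finishes."""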
def __init__(self, config):
self.config = config
def handle_result(self, results, **info):
# For each result that's being reported by ``train.report()``,
# we get the result from the rank 0 worker (i.e. first worker) and
# report it to MLflow.
rank_zero_results = results[0]
mlflow.log_metrics(rank_zero_results)
# TODO: fix type hint for logdir
def start_training(self, logdir, **info):
mlflow.start_run(run_name=str(logdir.name))
        mlflow.log_params(self.config)
# TODO: Update TrainCallback to provide logdir in finish_training.
self.logdir = logdir
def finish_training(self, error: bool = False, **info):
# Save the Trainer checkpoints as artifacts to mlflow.
mlflow.log_artifacts(self.logdir)
def read_dask_dataframe(path: str) -> "dd.DataFrame":
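    """Read Parquet files into a Ray Dataset, globally shuffle the rows, and
    return the result as a Dask DataFrame."""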
print(f"reading data from {path}")
dataset = ray.data.read_parquet(path, _spread_resource_prefix="node:")
print(f"total data size: {dataset.size_bytes()}")
return dataset.random_shuffle(
_spread_resource_prefix="node:").to_dask().reset_index()
class DataPreprocessor:
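    """Fits preprocessing transformers on the training data and reuses them
    for inference data: a one-hot ``ColumnTransformer`` for the ``fruit``
    column and a ``StandardScaler`` for the feature columns."""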
def __init__(self):
self.column_transformer = None
self.scaler = None
    def preprocess_train_data(self, df: "dd.DataFrame"
                              ) -> Tuple["dd.DataFrame", "dd.DataFrame"]:
        return self._preprocess(df, False)
    def preprocess_inference_data(self,
                                  df: "dd.DataFrame") -> "dd.DataFrame":
        return self._preprocess(df, True)[0]
    def _preprocess(self, df: "dd.DataFrame", inferencing: bool
                    ) -> Tuple["dd.DataFrame", "dd.DataFrame"]:
df = df.loc[:, df.columns != "index"]
# remove nulls and/or NaNs scalably with dask
print(f"step1: drop nulls from rows")
df = df.dropna(subset=["nullable_feature"])
print(f"step2: creating new_col and updatingfeature_1")
df["new_col"] = (
df["feature_1"] - 2 * df["feature_2"] + df["feature_3"]) / 3.
df["feature_1"] = 2. * df["feature_1"] + 0.1
# TODO: this doesn't work with more than 1 parquet file
# df['mean_by_fruit'] = df.groupby('fruit')['feature_1'].transform('mean')
print(f"step3: one-hot encoding fruit")
df = df.astype({"fruit": "category"})
df = df.categorize()
        df = df.persist()  # persist() returns a new collection, so reassign.
if inferencing:
assert self.column_transformer is not None
df_fruits = self.column_transformer.transform(df)
else:
assert self.column_transformer is None
self.column_transformer = ColumnTransformer(
[("one-hot", OneHotEncoder(sparse=False), ["fruit"])])
df_fruits = self.column_transformer.fit_transform(df)
df_data = df.loc[:, (df.columns != "label") & (df.columns != "fruit")]
df_data = dd.concat([df_data, df_fruits], axis=1)
        assert df_data.isnull().sum().sum().compute(
        ) == 0, "There are nulls or NaNs in the data!"
if inferencing:
print(f"step4: standardrize inference dataset")
assert self.scaler is not None
df_data_inference = self.scaler.transform(df_data)
return df_data_inference, None
else:
print(f"step4: standardrize train dataset")
df_labels = df.loc[:, df.columns == "label"]
df_data_train, df_data_test, df_label_train, df_label_test = train_test_split(
df_data, df_labels)
            df_data_train = df_data_train.persist()
assert self.scaler is None
self.scaler = StandardScaler(
) # this just turns col values to z-scores
df_data_train = self.scaler.fit_transform(df_data_train)
df_data_test = self.scaler.transform(df_data_test)
df_train = dd.concat([df_data_train, df_label_train], axis=1)
df_test = dd.concat([df_data_test, df_label_test], axis=1)
return df_train, df_test
def inference(dataset, model_cls: type, batch_size: int, result_path: str,
use_gpu: bool):
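    """Run batch inference over ``dataset`` with ``map_batches`` on actor
    workers and write the predictions to ``result_path`` as Parquet.
    ``model_cls`` is the callable applied to each batch."""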
print("inferencing...")
num_gpus = 1 if use_gpu else 0
    dataset.map_batches(
        model_cls,
        compute="actors",
        batch_size=batch_size,
        num_gpus=num_gpus,
        num_cpus=0).write_parquet(result_path)
"""
TODO: Define neural network code in pytorch
P0:
1) can take arguments to change size of net arbitrarily so we can stress test against distributed training on cluster
2) has a network (nn.module?), optimizer, and loss function for binary classification
3) has some semblence of regularization (ie: via dropout) so that this artificially gigantic net doesn't just overfit horrendously
4) works well with pytorch dataset we'll create from Ray data .to_torch_dataset()
P1:
1) also tracks AUC for training, testing sets and records to tensorboard to
"""
class Net(nn.Module):
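    """Fully connected binary classifier with a configurable number of hidden
    layers; dropout is inserted every ``dropout_every`` layers for
    regularization."""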
def __init__(self, n_layers, n_features, num_hidden, dropout_every,
drop_prob):
super().__init__()
self.n_layers = n_layers
self.dropout_every = dropout_every
self.drop_prob = drop_prob
self.fc_input = nn.Linear(n_features, num_hidden)
self.relu_input = nn.ReLU()
for i in range(self.n_layers):
layer = nn.Linear(num_hidden, num_hidden)
relu = nn.ReLU()
dropout = nn.Dropout(p=self.drop_prob)
setattr(self, f"fc_{i}", layer)
setattr(self, f"relu_{i}", relu)
if i % self.dropout_every == 0:
# only apply every few layers
setattr(self, f"drop_{i}", dropout)
self.add_module(f"drop_{i}", dropout)
self.add_module(f"fc_{i}", layer)
self.add_module(f"relu_{i}", relu)
# self.register_parameter(name=f"fc_{i}", param=getattr(self, f"fc_{i}"))
# self.register_parameter(name=f"relu_{i}", param=getattr(self, f"relu_{i}"))
self.fc_output = nn.Linear(num_hidden, 1)
def forward(self, x):
x = self.fc_input(x)
x = self.relu_input(x)
for i in range(self.n_layers):
x = getattr(self, f"fc_{i}")(x)
x = getattr(self, f"relu_{i}")(x)
if i % self.dropout_every == 0:
x = getattr(self, f"drop_{i}")(x)
x = self.fc_output(x)
return x
"""
TODO: training loop for NN
P0 Requirements:
1) should iterate through epochs, inner loop per batch
2) should keep running total of accuracy, loss (training & test) and record those to tensorboard
3) should perform windowing / shuffling per epoch
P1:
1) use Ray Tune for tuning / checkpointing
"""
def train_epoch(dataset, model, device, criterion, optimizer):
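    """Run one training pass over ``dataset`` and return
    ``(running_loss, num_correct, num_total)``."""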
num_correct = 0
num_total = 0
running_loss = 0.0
for i, (inputs, labels) in enumerate(dataset):
inputs = inputs.to(device)
labels = labels.to(device)
# Zero the parameter gradients
optimizer.zero_grad()
# Forward + backward + optimize
outputs = model(inputs.float())
loss = criterion(outputs, labels.float())
loss.backward()
optimizer.step()
# how are we doing?
predictions = (torch.sigmoid(outputs) > 0.5).int()
num_correct += (predictions == labels).sum().item()
num_total += len(outputs)
# Save loss to plot
running_loss += loss.item()
if i % 100 == 0:
print(f"training batch [{i}] loss: {loss.item()}")
return (running_loss, num_correct, num_total)
def test_epoch(dataset, model, device, criterion):
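    """Evaluate ``model`` on ``dataset`` without gradient updates and return
    ``(running_loss, num_correct, num_total)``."""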
num_correct = 0
num_total = 0
running_loss = 0.0
with torch.no_grad():
for i, (inputs, labels) in enumerate(dataset):
inputs = inputs.to(device)
labels = labels.to(device)
# Forward + backward + optimize
outputs = model(inputs.float())
loss = criterion(outputs, labels.float())
# how are we doing?
predictions = (torch.sigmoid(outputs) > 0.5).int()
num_correct += (predictions == labels).sum().item()
num_total += len(outputs)
# Save loss to plot
running_loss += loss.item()
if i % 100 == 0:
print(f"testing batch [{i}] loss: {loss.item()}")
return (running_loss, num_correct, num_total)
def train_func(config):
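    """Per-worker training loop, used both directly (single worker) and
    through Ray Train when ``is_distributed`` is set in ``config``.
    Expects ``use_gpu``, ``num_epochs``, ``batch_size``, ``num_layers``,
    ``num_hidden``, ``dropout_every``, ``dropout_prob`` and ``num_features``
    in ``config``; in the non-distributed case ``train_dataset`` and
    ``test_dataset`` are passed through ``config`` as well. Returns the
    trained model (rank 0 only when distributed)."""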
is_distributed = config.get("is_distributed", False)
use_gpu = config["use_gpu"]
num_epochs = config["num_epochs"]
batch_size = config["batch_size"]
num_layers = config["num_layers"]
num_hidden = config["num_hidden"]
dropout_every = config["dropout_every"]
dropout_prob = config["dropout_prob"]
num_features = config["num_features"]
print("Defining model, loss, and optimizer...")
# Setup device.
if is_distributed:
device = torch.device(f"cuda:{train.local_rank()}" if use_gpu
and torch.cuda.is_available() else "cpu")
else:
device = torch.device("cuda:0" if use_gpu
and torch.cuda.is_available() else "cpu")
print(f"Device: {device}")
# Setup data.
if is_distributed:
train_dataset_pipeline = train.get_dataset_shard("train_dataset")
train_dataset_epoch_iterator = train_dataset_pipeline.iter_epochs()
test_dataset = train.get_dataset_shard("test_dataset")
else:
train_dataset_epoch_iterator = config["train_dataset"].iter_epochs()
test_dataset = config["test_dataset"]
test_torch_dataset = test_dataset.to_torch(
label_column="label", batch_size=batch_size)
# Setup Tensorboard and MLflow.
if is_distributed:
# Setup is done through Callback.
pass
else:
writer = SummaryWriter()
mlflow.start_run()
mlflow_config = config.copy()
mlflow_config.pop("test_dataset")
mlflow_config.pop("train_dataset")
mlflow.log_params(mlflow_config)
net = Net(
n_layers=num_layers,
n_features=num_features,
num_hidden=num_hidden,
dropout_every=dropout_every,
drop_prob=dropout_prob,
).to(device)
print(net.parameters)
if is_distributed:
net = DistributedDataParallel(net)
criterion = nn.BCEWithLogitsLoss()
optimizer = optim.Adam(net.parameters(), weight_decay=0.0001)
print("Starting training...")
for epoch in range(num_epochs):
train_dataset = next(train_dataset_epoch_iterator)
train_torch_dataset = train_dataset.to_torch(
label_column="label", batch_size=batch_size)
train_running_loss, train_num_correct, train_num_total = train_epoch(
train_torch_dataset, net, device, criterion, optimizer)
train_acc = train_num_correct / train_num_total
print(
f"epoch [{epoch + 1}]: training accuracy: {train_num_correct} / {train_num_total} = {train_acc:.4f}"
)
test_running_loss, test_num_correct, test_num_total = test_epoch(
test_torch_dataset, net, device, criterion)
test_acc = test_num_correct / test_num_total
print(
f"epoch [{epoch + 1}]: testing accuracy: {test_num_correct} / {test_num_total} = {test_acc:.4f}"
)
# Record and log stats.
if is_distributed:
train.report(
train_acc=train_acc,
train_loss=train_running_loss,
test_acc=test_acc,
test_loss=test_running_loss)
else:
writer.add_scalar("Accuracy/train", train_acc, epoch)
writer.add_scalar("Loss/train", train_running_loss, epoch)
writer.add_scalar("Accuracy/test", test_acc, epoch)
writer.add_scalar("Loss/test", test_running_loss, epoch)
writer.flush()
mlflow.log_metrics({
"train_acc": train_acc,
"train_loss": train_running_loss,
"test_acc": test_acc,
"test_loss": test_running_loss
})
# Checkpoint model.
if is_distributed:
import copy
model_copy = copy.deepcopy(net.module)
train.save_checkpoint(
model_state_dict=model_copy.cpu().state_dict())
else:
            # Make sure the local checkpoint directory exists.
            os.makedirs("models", exist_ok=True)
            torch.save(net.state_dict(), f"models/model-epoch-{epoch}.torch")
# Shutdown Tensorboard and MLflow.
if is_distributed:
pass
else:
writer.close()
# mlflow.end_run()
if is_distributed:
if train.world_rank() == 0:
return net.module.cpu()
else:
return None
else:
return net
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument(
"--use-s3",
action="store_true",
default=False,
help="Use data from s3 for testing.")
parser.add_argument(
"--smoke-test",
action="store_true",
default=False,
help="Finish quickly for testing.")
parser.add_argument(
"--address",
required=False,
type=str,
help="The address to use for Ray. `auto` if running through `ray submit"
)
parser.add_argument(
"--num-workers",
default=1,
type=int,
help="If > 1, number of Ray workers to use for distributed training")
parser.add_argument(
"--large-dataset",
action="store_true",
default=False,
help="Use 500GB dataset")
parser.add_argument(
"--use-gpu",
action="store_true",
default=False,
help="Use GPU for training.")
parser.add_argument(
"--mlflow-register-model",
action="store_true",
help="Whether to use mlflow model registry. If set, a local MLflow "
"tracking server is expected to have already been started.")
args = parser.parse_args()
smoke_test = args.smoke_test
address = args.address
num_workers = args.num_workers
use_gpu = args.use_gpu
use_s3 = args.use_s3
large_dataset = args.large_dataset
if large_dataset:
assert use_s3, "--large-dataset requires --use-s3 to be set."
start_time = time.time()
ray.init(address=address)
dask.config.set(scheduler=ray_dask_get)
# Setup MLflow.
    # By default, all metrics & artifacts for each run are saved to disk in
    # the ./mlruns directory. Change the tracking URI below if you want to
    # log runs to a different location.
# TODO: Use S3 backed tracking server for golden notebook.
if args.mlflow_register_model:
        # The MLflow model registry does not work with a local file system
        # backend, so an MLflow tracking server has to be running on
        # localhost.
mlflow.set_tracking_uri("http://127.0.0.1:5000")
    # Set the experiment. This will create the experiment if it does not
    # already exist.
mlflow.set_experiment("cuj-big-data-training")
if use_s3:
# Check if s3 data is populated.
BUCKET_NAME = 'cuj-big-data'
FOLDER_NAME = 'big-data/' if large_dataset else 'data/'
s3_resource = boto3.resource('s3')
bucket = s3_resource.Bucket(BUCKET_NAME)
count = bucket.objects.filter(Prefix=FOLDER_NAME)
if len(list(count)) == 0:
print("please run `python make_and_upload_dataset.py` first")
sys.exit(1)
data_path = "s3://cuj-big-data/big-data/" if large_dataset else "s3://cuj-big-data/data/"
inference_path = "s3://cuj-big-data/inference/"
inference_output_path = "s3://cuj-big-data/output/"
else:
dir_path = os.path.dirname(os.path.realpath(__file__))
data_path = os.path.join(dir_path, "data")
inference_path = os.path.join(dir_path, "inference")
inference_output_path = "/tmp"
if len(os.listdir(data_path)) <= 1 or len(
os.listdir(inference_path)) <= 1:
print("please run `python make_and_upload_dataset.py` first")
sys.exit(1)
if smoke_test:
# Only read a single file.
data_path = os.path.join(data_path, "data_00000.parquet.snappy")
inference_path = os.path.join(inference_path,
"data_00000.parquet.snappy")
preprocessor = DataPreprocessor()
train_df, test_df = preprocessor.preprocess_train_data(
read_dask_dataframe(data_path))
num_columns = len(train_df.columns)
num_features = num_columns - 1 # remove label column
train_dataset = ray.data.from_dask(train_df)
test_dataset = ray.data.from_dask(test_df)
del train_df
del test_df
NUM_EPOCHS = 2
BATCH_SIZE = 512
NUM_HIDDEN = 50 # 200
NUM_LAYERS = 3 # 15
DROPOUT_EVERY = 5
DROPOUT_PROB = 0.2
# Random global shuffle
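    # repeat() turns the Dataset into a DatasetPipeline that yields one window
    # per epoch, and random_shuffle_each_window() reshuffles each window so
    # every epoch sees a different ordering.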
train_dataset_pipeline = train_dataset.repeat().random_shuffle_each_window(
_spread_resource_prefix="node:")
del train_dataset
datasets = {
"train_dataset": train_dataset_pipeline,
"test_dataset": test_dataset
}
if num_workers <= 1:
config = {
"use_gpu": use_gpu,
"num_epochs": NUM_EPOCHS,
"batch_size": BATCH_SIZE,
"num_hidden": NUM_HIDDEN,
"num_layers": NUM_LAYERS,
"dropout_every": DROPOUT_EVERY,
"dropout_prob": DROPOUT_PROB,
"num_features": num_features,
}
config.update(datasets)
model = train_func(config=config)
else:
config = {
"is_distributed": True,
"use_gpu": use_gpu,
"num_epochs": NUM_EPOCHS,
"batch_size": BATCH_SIZE,
"num_hidden": NUM_HIDDEN,
"num_layers": NUM_LAYERS,
"dropout_every": DROPOUT_EVERY,
"dropout_prob": DROPOUT_PROB,
"num_features": num_features
}
# Create 2 callbacks: one for Tensorboard Logging and one for MLflow
# logging. Pass these into Trainer, and all results that are
# reported by ``train.report()`` will be logged to these 2 places.
# TODO: TBXLoggerCallback should create nonexistent logdir
# and should also create 1 directory per file.
callbacks = [TBXLoggerCallback(logdir="/tmp"), MLflowCallback(config)]
trainer = Trainer(
backend="torch",
num_workers=num_workers,
use_gpu=use_gpu,
            resources_per_worker={
                "CPU": 0,
                # Only request a GPU per worker when --use-gpu is set.
                "GPU": int(use_gpu)
            })
trainer.start()
results = trainer.run(
train_func=train_func,
config=config,
callbacks=callbacks,
dataset=datasets)
model = results[0]
trainer.shutdown()
if args.mlflow_register_model:
mlflow.pytorch.log_model(
model, artifact_path="models", registered_model_name="torch_model")
# Get the latest model from mlflow model registry.
client = mlflow.tracking.MlflowClient()
registered_model_name = "torch_model"
# Get the info for the latest model.
# By default, registered models are in stage "None".
latest_model_info = client.get_latest_versions(
registered_model_name, stages=["None"])[0]
latest_version = latest_model_info.version
def load_model_func():
model_uri = f"models:/torch_model/{latest_version}"
return mlflow.pytorch.load_model(model_uri)
else:
state_dict = model.state_dict()
def load_model_func():
num_layers = config["num_layers"]
num_hidden = config["num_hidden"]
dropout_every = config["dropout_every"]
dropout_prob = config["dropout_prob"]
num_features = config["num_features"]
model = Net(
n_layers=num_layers,
n_features=num_features,
num_hidden=num_hidden,
dropout_every=dropout_every,
drop_prob=dropout_prob)
model.load_state_dict(state_dict)
return model
class BatchInferModel:
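        """Loads the trained model via ``load_model_func`` and maps each input
        batch to a pandas DataFrame of raw model outputs."""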
def __init__(self, load_model_func):
self.device = torch.device("cuda:0"
if torch.cuda.is_available() else "cpu")
self.model = load_model_func().to(self.device)
def __call__(self, batch) -> "pd.DataFrame":
tensor = torch.FloatTensor(batch.to_pandas().values).to(
self.device)
return pd.DataFrame(self.model(tensor).cpu().detach().numpy())
inference_df = preprocessor.preprocess_inference_data(
read_dask_dataframe(inference_path))
inference_dataset = ray.data.from_dask(inference_df)
inference(inference_dataset, BatchInferModel(load_model_func), 100,
inference_output_path, use_gpu)
end_time = time.time()
total_time = end_time - start_time
print(f"Job finished in {total_time} seconds.")
with open(os.environ["TEST_OUTPUT_JSON"], "w") as f:
f.write(json.dumps({"time": total_time, "success": 1}))