# flake8: noqa
import argparse
import copy
import json
import os
import sys
import time
from typing import Tuple

import boto3
import dask
import dask.dataframe as dd
import mlflow
import pandas as pd
import ray
import torch
import torch.nn as nn
import torch.optim as optim
from dask_ml.compose import ColumnTransformer
from dask_ml.model_selection import train_test_split
from dask_ml.preprocessing import OneHotEncoder
from dask_ml.preprocessing import StandardScaler
from ray import train
from ray.train import Trainer
from ray.train import TrainingCallback
from ray.train.callbacks import TBXLoggerCallback
from ray.util.dask import ray_dask_get
from torch.nn.parallel import DistributedDataParallel
from torch.utils.tensorboard import SummaryWriter


# TODO(amogkam): Upstream this into Ray Train.
class MLflowCallback(TrainingCallback):
    def __init__(self, config):
        self.config = config

    def handle_result(self, results, **info):
        # For each result that's being reported by ``train.report()``,
        # we get the result from the rank 0 worker (i.e. the first worker)
        # and report it to MLflow.
        rank_zero_results = results[0]
        mlflow.log_metrics(rank_zero_results)

    # TODO: fix type hint for logdir
    def start_training(self, logdir, **info):
        mlflow.start_run(run_name=str(logdir.name))
        mlflow.log_params(self.config)

        # TODO: Update TrainCallback to provide logdir in finish_training.
        self.logdir = logdir

    def finish_training(self, error: bool = False, **info):
        # Save the Trainer checkpoints as artifacts to mlflow.
        mlflow.log_artifacts(self.logdir)


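# A minimal usage sketch (illustrative only, never called): MLflowCallback is
# wired into ``Trainer.run`` together with any other callbacks, mirroring the
# distributed branch in ``__main__`` below. The ``trainer`` argument is
# assumed to be an already-started ``ray.train.Trainer``.
def _example_mlflow_callback_usage(trainer, config):
    return trainer.run(
        train_func=train_func,
        config=config,
        callbacks=[MLflowCallback(config)])

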
def read_dask_dataframe(path: str) -> "dask.DataFrame":
    print(f"reading data from {path}")
    dataset = ray.data.read_parquet(path, _spread_resource_prefix="node:")
    print(f"total data size: {dataset.size_bytes()}")
    # Note: .to_dask() hands back a Dask DataFrame whose partitions live in
    # the Ray object store; it executes on the Dask-on-Ray scheduler set up
    # in __main__ via dask.config.set(scheduler=ray_dask_get).
    return dataset.random_shuffle(
        _spread_resource_prefix="node:").to_dask().reset_index()


class DataPreprocessor:
    def __init__(self):
        self.column_transformer = None
        self.scaler = None

    def preprocess_train_data(self, df: "dask.DataFrame"
                              ) -> Tuple["dask.DataFrame", "dask.DataFrame"]:
        return self._preprocess(df, False)

    def preprocess_inference_data(self,
                                  df: "dask.DataFrame") -> "dask.DataFrame":
        return self._preprocess(df, True)[0]

    def _preprocess(self, df: "dask.DataFrame", inferencing: bool
                    ) -> Tuple["dask.DataFrame", "dask.DataFrame"]:
        df = df.loc[:, df.columns != "index"]

        # Remove nulls and/or NaNs scalably with Dask.
        print("step 1: dropping rows with nulls")
        df = df.dropna(subset=["nullable_feature"])

        print("step 2: creating new_col and updating feature_1")
        df["new_col"] = (
            df["feature_1"] - 2 * df["feature_2"] + df["feature_3"]) / 3.
        df["feature_1"] = 2. * df["feature_1"] + 0.1
        # TODO: this doesn't work with more than 1 parquet file
        # df['mean_by_fruit'] = df.groupby('fruit')['feature_1'].transform('mean')

        print("step 3: one-hot encoding fruit")
        df = df.astype({"fruit": "category"})
        df = df.categorize()
        # Dask collections are immutable; persist() returns the persisted
        # collection rather than modifying df in place.
        df = df.persist()

        if inferencing:
            assert self.column_transformer is not None
            df_fruits = self.column_transformer.transform(df)
        else:
            assert self.column_transformer is None
            self.column_transformer = ColumnTransformer(
                [("one-hot", OneHotEncoder(sparse=False), ["fruit"])])
            df_fruits = self.column_transformer.fit_transform(df)

        df_data = df.loc[:, (df.columns != "label") & (df.columns != "fruit")]
        df_data = dd.concat([df_data, df_fruits], axis=1)

        assert df_data.isnull().sum().sum().compute(
        ) == 0, "There are nulls or NaNs in the data!"

        if inferencing:
            print("step 4: standardizing the inference dataset")
            assert self.scaler is not None
            df_data_inference = self.scaler.transform(df_data)
            return df_data_inference, None
        else:
            print("step 4: standardizing the train dataset")
            df_labels = df.loc[:, df.columns == "label"]
            df_data_train, df_data_test, df_label_train, df_label_test = train_test_split(
                df_data, df_labels)
            df_data_train = df_data_train.persist()
            assert self.scaler is None
            # StandardScaler just turns column values into z-scores.
            self.scaler = StandardScaler()
            df_data_train = self.scaler.fit_transform(df_data_train)
            df_data_test = self.scaler.transform(df_data_test)
            df_train = dd.concat([df_data_train, df_label_train], axis=1)
            df_test = dd.concat([df_data_test, df_label_test], axis=1)
            return df_train, df_test


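# An illustrative sketch (not called anywhere) of the fit/transform contract
# of DataPreprocessor: training data must be preprocessed first so that the
# one-hot encoder and scaler are fitted before inference data is transformed
# with them; the asserts in _preprocess enforce this ordering.
def _example_preprocessor_usage(train_path, inference_path):
    preprocessor = DataPreprocessor()
    df_train, df_test = preprocessor.preprocess_train_data(
        read_dask_dataframe(train_path))
    df_inference = preprocessor.preprocess_inference_data(
        read_dask_dataframe(inference_path))
    return df_train, df_test, df_inference

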
def inference(dataset, model_cls: type, batch_size: int, result_path: str,
              use_gpu: bool):
    print("inferencing...")
    num_gpus = 1 if use_gpu else 0
    dataset.map_batches(
        model_cls,
        compute="actors",
        batch_size=batch_size,
        num_gpus=num_gpus,
        num_cpus=0).write_parquet(result_path)


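# Note on the callable-class contract used above (a sketch of how the
# "actors" compute strategy behaves): model_cls is instantiated once per
# worker actor, so the model is loaded a single time, and the instance is
# then invoked once per batch. ``load_model`` below is a hypothetical loader,
# named only for illustration:
#
#     class MyBatchModel:
#         def __init__(self):
#             self.model = load_model()   # once per actor
#         def __call__(self, batch):
#             return self.model(batch)    # once per batch
#
# BatchInferModel in __main__ below follows this pattern.

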
"""
|
|
TODO: Define neural network code in pytorch
|
|
P0:
|
|
1) can take arguments to change size of net arbitrarily so we can stress test against distributed training on cluster
|
|
2) has a network (nn.module?), optimizer, and loss function for binary classification
|
|
3) has some semblence of regularization (ie: via dropout) so that this artificially gigantic net doesn't just overfit horrendously
|
|
4) works well with pytorch dataset we'll create from Ray data .to_torch_dataset()
|
|
P1:
|
|
1) also tracks AUC for training, testing sets and records to tensorboard to
|
|
"""
|
|
|
|
|
|
class Net(nn.Module):
    def __init__(self, n_layers, n_features, num_hidden, dropout_every,
                 drop_prob):
        super().__init__()
        self.n_layers = n_layers
        self.dropout_every = dropout_every
        self.drop_prob = drop_prob

        self.fc_input = nn.Linear(n_features, num_hidden)
        self.relu_input = nn.ReLU()

        for i in range(self.n_layers):
            layer = nn.Linear(num_hidden, num_hidden)
            relu = nn.ReLU()
            dropout = nn.Dropout(p=self.drop_prob)

            # setattr on an nn.Module registers submodules automatically,
            # so no separate add_module calls are needed.
            setattr(self, f"fc_{i}", layer)
            setattr(self, f"relu_{i}", relu)
            if i % self.dropout_every == 0:
                # Only apply dropout every few layers.
                setattr(self, f"drop_{i}", dropout)

        self.fc_output = nn.Linear(num_hidden, 1)

    def forward(self, x):
        x = self.fc_input(x)
        x = self.relu_input(x)

        for i in range(self.n_layers):
            x = getattr(self, f"fc_{i}")(x)
            x = getattr(self, f"relu_{i}")(x)
            if i % self.dropout_every == 0:
                x = getattr(self, f"drop_{i}")(x)

        x = self.fc_output(x)
        return x


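# An illustrative shape check (not called anywhere); the sizes here are
# arbitrary assumptions for demonstration, not the script's defaults.
def _example_net_shapes():
    net = Net(n_layers=3, n_features=6, num_hidden=50, dropout_every=5,
              drop_prob=0.2)
    batch = torch.randn(4, 6)  # 4 samples, 6 features
    logits = net(batch)  # raw logits for BCEWithLogitsLoss, shape (4, 1)
    assert logits.shape == (4, 1)
    return logits

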
"""
|
|
TODO: training loop for NN
|
|
P0 Requirements:
|
|
1) should iterate through epochs, inner loop per batch
|
|
2) should keep running total of accuracy, loss (training & test) and record those to tensorboard
|
|
3) should perform windowing / shuffling per epoch
|
|
P1:
|
|
1) use Ray Tune for tuning / checkpointing
|
|
"""
|
|
|
|
|
|
def train_epoch(dataset, model, device, criterion, optimizer):
    num_correct = 0
    num_total = 0
    running_loss = 0.0

    for i, (inputs, labels) in enumerate(dataset):
        inputs = inputs.to(device)
        labels = labels.to(device)

        # Zero the parameter gradients.
        optimizer.zero_grad()

        # Forward + backward + optimize.
        outputs = model(inputs.float())
        loss = criterion(outputs, labels.float())
        loss.backward()
        optimizer.step()

        # How are we doing?
        predictions = (torch.sigmoid(outputs) > 0.5).int()
        num_correct += (predictions == labels).sum().item()
        num_total += len(outputs)

        # Save loss to plot.
        running_loss += loss.item()
        if i % 100 == 0:
            print(f"training batch [{i}] loss: {loss.item()}")

    return (running_loss, num_correct, num_total)


def test_epoch(dataset, model, device, criterion):
    num_correct = 0
    num_total = 0
    running_loss = 0.0

    with torch.no_grad():
        for i, (inputs, labels) in enumerate(dataset):
            inputs = inputs.to(device)
            labels = labels.to(device)

            # Forward pass only; no backward or optimizer step during eval.
            outputs = model(inputs.float())
            loss = criterion(outputs, labels.float())

            # How are we doing?
            predictions = (torch.sigmoid(outputs) > 0.5).int()
            num_correct += (predictions == labels).sum().item()
            num_total += len(outputs)

            # Save loss to plot.
            running_loss += loss.item()
            if i % 100 == 0:
                print(f"testing batch [{i}] loss: {loss.item()}")

    return (running_loss, num_correct, num_total)


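# A sketch (not called anywhere) of how the (running_loss, num_correct,
# num_total) tuples returned above reduce to reportable metrics; train_func
# below does this inline per epoch.
def _example_epoch_metrics(running_loss, num_correct, num_total):
    return {"loss": running_loss, "acc": num_correct / num_total}

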
def train_func(config):
    is_distributed = config.get("is_distributed", False)
    use_gpu = config["use_gpu"]
    num_epochs = config["num_epochs"]
    batch_size = config["batch_size"]
    num_layers = config["num_layers"]
    num_hidden = config["num_hidden"]
    dropout_every = config["dropout_every"]
    dropout_prob = config["dropout_prob"]
    num_features = config["num_features"]

    print("Defining model, loss, and optimizer...")

    # Setup device.
    if is_distributed:
        device = torch.device(f"cuda:{train.local_rank()}" if use_gpu
                              and torch.cuda.is_available() else "cpu")
    else:
        device = torch.device("cuda:0" if use_gpu
                              and torch.cuda.is_available() else "cpu")
    print(f"Device: {device}")

    # Setup data.
    if is_distributed:
        train_dataset_pipeline = train.get_dataset_shard("train_dataset")
        train_dataset_epoch_iterator = train_dataset_pipeline.iter_epochs()
        test_dataset = train.get_dataset_shard("test_dataset")
    else:
        train_dataset_epoch_iterator = config["train_dataset"].iter_epochs()
        test_dataset = config["test_dataset"]
    test_torch_dataset = test_dataset.to_torch(
        label_column="label", batch_size=batch_size)

    # Setup TensorBoard and MLflow.
    if is_distributed:
        # Setup is done through the TBXLoggerCallback and MLflowCallback.
        pass
    else:
        writer = SummaryWriter()
        mlflow.start_run()
        mlflow_config = config.copy()
        mlflow_config.pop("test_dataset")
        mlflow_config.pop("train_dataset")
        mlflow.log_params(mlflow_config)

    net = Net(
        n_layers=num_layers,
        n_features=num_features,
        num_hidden=num_hidden,
        dropout_every=dropout_every,
        drop_prob=dropout_prob,
    ).to(device)
    print(net)

    if is_distributed:
        net = DistributedDataParallel(net)

    criterion = nn.BCEWithLogitsLoss()
    optimizer = optim.Adam(net.parameters(), weight_decay=0.0001)

print("Starting training...")
|
|
for epoch in range(num_epochs):
|
|
train_dataset = next(train_dataset_epoch_iterator)
|
|
|
|
train_torch_dataset = train_dataset.to_torch(
|
|
label_column="label", batch_size=batch_size)
|
|
|
|
train_running_loss, train_num_correct, train_num_total = train_epoch(
|
|
train_torch_dataset, net, device, criterion, optimizer)
|
|
train_acc = train_num_correct / train_num_total
|
|
print(
|
|
f"epoch [{epoch + 1}]: training accuracy: {train_num_correct} / {train_num_total} = {train_acc:.4f}"
|
|
)
|
|
|
|
test_running_loss, test_num_correct, test_num_total = test_epoch(
|
|
test_torch_dataset, net, device, criterion)
|
|
test_acc = test_num_correct / test_num_total
|
|
print(
|
|
f"epoch [{epoch + 1}]: testing accuracy: {test_num_correct} / {test_num_total} = {test_acc:.4f}"
|
|
)
|
|
|
|
# Record and log stats.
|
|
if is_distributed:
|
|
train.report(
|
|
train_acc=train_acc,
|
|
train_loss=train_running_loss,
|
|
test_acc=test_acc,
|
|
test_loss=test_running_loss)
|
|
else:
|
|
writer.add_scalar("Accuracy/train", train_acc, epoch)
|
|
writer.add_scalar("Loss/train", train_running_loss, epoch)
|
|
writer.add_scalar("Accuracy/test", test_acc, epoch)
|
|
writer.add_scalar("Loss/test", test_running_loss, epoch)
|
|
writer.flush()
|
|
|
|
mlflow.log_metrics({
|
|
"train_acc": train_acc,
|
|
"train_loss": train_running_loss,
|
|
"test_acc": test_acc,
|
|
"test_loss": test_running_loss
|
|
})
|
|
|
|
        # Checkpoint model.
        if is_distributed:
            model_copy = copy.deepcopy(net.module)
            train.save_checkpoint(
                model_state_dict=model_copy.cpu().state_dict())
        else:
            os.makedirs("models", exist_ok=True)
            torch.save(net.state_dict(), f"models/model-epoch-{epoch}.torch")

    # Shutdown TensorBoard and MLflow.
    if is_distributed:
        pass
    else:
        writer.close()
        # mlflow.end_run()

    if is_distributed:
        if train.world_rank() == 0:
            return net.module.cpu()
        else:
            return None
    else:
        return net


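# The two entry points used below (a sketch): in the single-process case,
# train_func is called directly and returns the trained Net; in the
# distributed case it runs through Trainer, and only the rank-0 worker's
# return value is the model.
#
#     model = train_func(config)                      # single process
#     results = trainer.run(train_func, config, ...)  # distributed
#     model = results[0]                              # rank-0 worker's model

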
if __name__ == "__main__":
|
|
parser = argparse.ArgumentParser()
|
|
parser.add_argument(
|
|
"--use-s3",
|
|
action="store_true",
|
|
default=False,
|
|
help="Use data from s3 for testing.")
|
|
parser.add_argument(
|
|
"--smoke-test",
|
|
action="store_true",
|
|
default=False,
|
|
help="Finish quickly for testing.")
|
|
parser.add_argument(
|
|
"--address",
|
|
required=False,
|
|
type=str,
|
|
help="The address to use for Ray. `auto` if running through `ray submit"
|
|
)
|
|
parser.add_argument(
|
|
"--num-workers",
|
|
default=1,
|
|
type=int,
|
|
help="If > 1, number of Ray workers to use for distributed training")
|
|
parser.add_argument(
|
|
"--large-dataset",
|
|
action="store_true",
|
|
default=False,
|
|
help="Use 500GB dataset")
|
|
parser.add_argument(
|
|
"--use-gpu",
|
|
action="store_true",
|
|
default=False,
|
|
help="Use GPU for training.")
|
|
parser.add_argument(
|
|
"--mlflow-register-model",
|
|
action="store_true",
|
|
help="Whether to use mlflow model registry. If set, a local MLflow "
|
|
"tracking server is expected to have already been started.")
|
|
|
|
    args = parser.parse_args()
    smoke_test = args.smoke_test
    address = args.address
    num_workers = args.num_workers
    use_gpu = args.use_gpu
    use_s3 = args.use_s3
    large_dataset = args.large_dataset

    if large_dataset:
        assert use_s3, "--large-dataset requires --use-s3 to be set."

    start_time = time.time()

    ray.init(address=address)
    dask.config.set(scheduler=ray_dask_get)

    # Setup MLflow.

    # By default, all metrics & artifacts for each run will be saved to disk
    # in the ./mlruns directory. Add an mlflow.set_tracking_uri(...) call
    # here if you want to log to a different tracking server.
    # TODO: Use S3 backed tracking server for golden notebook.
    if args.mlflow_register_model:
        # The MLflow model registry does not work with a local file system
        # backend, so a local MLflow tracking server has to be started first.
        mlflow.set_tracking_uri("http://127.0.0.1:5000")

    # Set the experiment. This will create the experiment if it does not
    # already exist.
    mlflow.set_experiment("cuj-big-data-training")

    if use_s3:
        # Check that the s3 data is populated.
        BUCKET_NAME = "cuj-big-data"
        FOLDER_NAME = "big-data/" if large_dataset else "data/"
        s3_resource = boto3.resource("s3")
        bucket = s3_resource.Bucket(BUCKET_NAME)
        count = bucket.objects.filter(Prefix=FOLDER_NAME)
        if len(list(count)) == 0:
            print("please run `python make_and_upload_dataset.py` first")
            sys.exit(1)
        data_path = ("s3://cuj-big-data/big-data/"
                     if large_dataset else "s3://cuj-big-data/data/")
        inference_path = "s3://cuj-big-data/inference/"
        inference_output_path = "s3://cuj-big-data/output/"
    else:
        dir_path = os.path.dirname(os.path.realpath(__file__))

        data_path = os.path.join(dir_path, "data")
        inference_path = os.path.join(dir_path, "inference")
        inference_output_path = "/tmp"

        if len(os.listdir(data_path)) <= 1 or len(
                os.listdir(inference_path)) <= 1:
            print("please run `python make_and_upload_dataset.py` first")
            sys.exit(1)

    if smoke_test:
        # Only read a single file.
        data_path = os.path.join(data_path, "data_00000.parquet.snappy")
        inference_path = os.path.join(inference_path,
                                      "data_00000.parquet.snappy")

    preprocessor = DataPreprocessor()
    train_df, test_df = preprocessor.preprocess_train_data(
        read_dask_dataframe(data_path))

    num_columns = len(train_df.columns)
    num_features = num_columns - 1  # exclude the label column

    train_dataset = ray.data.from_dask(train_df)
    test_dataset = ray.data.from_dask(test_df)
    del train_df
    del test_df

    NUM_EPOCHS = 2
    BATCH_SIZE = 512
    NUM_HIDDEN = 50  # 200
    NUM_LAYERS = 3  # 15
    DROPOUT_EVERY = 5
    DROPOUT_PROB = 0.2

    # Random global shuffle: repeat() turns the dataset into a pipeline that
    # yields one window per epoch, and each window is reshuffled across nodes.
    train_dataset_pipeline = train_dataset.repeat().random_shuffle_each_window(
        _spread_resource_prefix="node:")
    del train_dataset

    datasets = {
        "train_dataset": train_dataset_pipeline,
        "test_dataset": test_dataset
    }

    if num_workers <= 1:
        config = {
            "use_gpu": use_gpu,
            "num_epochs": NUM_EPOCHS,
            "batch_size": BATCH_SIZE,
            "num_hidden": NUM_HIDDEN,
            "num_layers": NUM_LAYERS,
            "dropout_every": DROPOUT_EVERY,
            "dropout_prob": DROPOUT_PROB,
            "num_features": num_features,
        }

        config.update(datasets)

        model = train_func(config=config)

    else:
        config = {
            "is_distributed": True,
            "use_gpu": use_gpu,
            "num_epochs": NUM_EPOCHS,
            "batch_size": BATCH_SIZE,
            "num_hidden": NUM_HIDDEN,
            "num_layers": NUM_LAYERS,
            "dropout_every": DROPOUT_EVERY,
            "dropout_prob": DROPOUT_PROB,
            "num_features": num_features
        }

        # Create 2 callbacks: one for TensorBoard logging and one for MLflow
        # logging. Pass these into Trainer, and all results that are
        # reported by ``train.report()`` will be logged to these 2 places.
        # TODO: TBXLoggerCallback should create the logdir if it does not
        # exist, and should also create 1 directory per file.
        callbacks = [TBXLoggerCallback(logdir="/tmp"), MLflowCallback(config)]

        # Assumes GPU workers: each worker reserves 1 GPU and no CPU slot.
        trainer = Trainer(
            backend="torch",
            num_workers=num_workers,
            use_gpu=use_gpu,
            resources_per_worker={
                "CPU": 0,
                "GPU": 1
            })
        trainer.start()
        results = trainer.run(
            train_func=train_func,
            config=config,
            callbacks=callbacks,
            dataset=datasets)
        model = results[0]
        trainer.shutdown()

    if args.mlflow_register_model:
        mlflow.pytorch.log_model(
            model, artifact_path="models", registered_model_name="torch_model")

        # Get the latest model from the MLflow model registry.
        client = mlflow.tracking.MlflowClient()
        registered_model_name = "torch_model"
        # Get the info for the latest model.
        # By default, registered models are in stage "None".
        latest_model_info = client.get_latest_versions(
            registered_model_name, stages=["None"])[0]
        latest_version = latest_model_info.version

        def load_model_func():
            model_uri = f"models:/torch_model/{latest_version}"
            return mlflow.pytorch.load_model(model_uri)
    else:
        state_dict = model.state_dict()

        def load_model_func():
            num_layers = config["num_layers"]
            num_hidden = config["num_hidden"]
            dropout_every = config["dropout_every"]
            dropout_prob = config["dropout_prob"]
            num_features = config["num_features"]

            model = Net(
                n_layers=num_layers,
                n_features=num_features,
                num_hidden=num_hidden,
                dropout_every=dropout_every,
                drop_prob=dropout_prob)
            model.load_state_dict(state_dict)
            return model

    class BatchInferModel:
        def __init__(self, load_model_func):
            self.device = torch.device("cuda:0"
                                       if torch.cuda.is_available() else "cpu")
            self.model = load_model_func().to(self.device)

        def __call__(self, batch) -> "pd.DataFrame":
            tensor = torch.FloatTensor(batch.to_pandas().values).to(
                self.device)
            return pd.DataFrame(self.model(tensor).cpu().detach().numpy())

    inference_df = preprocessor.preprocess_inference_data(
        read_dask_dataframe(inference_path))
    inference_dataset = ray.data.from_dask(inference_df)
    inference(inference_dataset, BatchInferModel(load_model_func), 100,
              inference_output_path, use_gpu)

    end_time = time.time()

    total_time = end_time - start_time
    print(f"Job finished in {total_time} seconds.")

    with open(os.environ["TEST_OUTPUT_JSON"], "w") as f:
        f.write(json.dumps({"time": total_time, "success": 1}))