[air/wip] Add batch predictor class (#23808)

What: This class adds a generic BatchPredictor class that offers an interface to run batch inference on Ray datasets. It takes a Predictor class and checkpoint as an input, and provides a predict(dataset) method to run scalable scoring inference.

Why: Currently users have to implement scorers themselves. This is mostly boilerplate and prone to errors, so we should provide a simple solution instead.

Note that this predictor also implements the Predictor interface.
This commit is contained in:
Kai Fricke 2022-04-13 08:58:08 +01:00 committed by GitHub
parent a85baac3b4
commit 40d3a62aa1
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 156 additions and 71 deletions

View file

@ -89,6 +89,9 @@ Predictors
.. autoclass:: ray.ml.predictor.Predictor
:members:
.. autoclass:: ray.ml.batch_predictor.BatchPredictor
:members:
.. automodule:: ray.ml.predictors.integrations.xgboost
:members:
:show-inheritance:

View file

@ -0,0 +1,99 @@
from typing import Type, Optional, Dict, Any
import ray
from ray.ml import Checkpoint
from ray.ml.predictor import Predictor
class BatchPredictor(Predictor):
"""Batch predictor class.
Takes a predictor class and a checkpoint and provides an interface to run
batch scoring on Ray datasets.
This batch predictor wraps around a predictor class and executes it
in a distributed way when calling ``predict()``.
Attributes:
checkpoint: Checkpoint loaded by the distributed predictor objects.
predictor_cls: Predictor class reference. When scoring, each scoring worker
will create an instance of this class and call ``predict(batch)`` on it.
**predictor_kwargs: Keyword arguments passed to the predictor on
initialization.
"""
def __init__(
self, checkpoint: Checkpoint, predictor_cls: Type[Predictor], **predictor_kwargs
):
# Store as object ref so we only serialize it once for all map workers
self.checkpoint_ref = checkpoint.to_object_ref()
self.predictor_cls = predictor_cls
self.predictor_kwargs = predictor_kwargs
@classmethod
def from_checkpoint(
cls, checkpoint: Checkpoint, predictor_cls: Type[Predictor], **kwargs
) -> "BatchPredictor":
return BatchPredictor(
checkpoint=checkpoint, predictor_cls=predictor_cls, **kwargs
)
def predict(
self,
data: ray.data.Dataset,
*,
batch_size: int = 4096,
min_scoring_workers: int = 1,
max_scoring_workers: Optional[int] = None,
num_cpus_per_worker: int = 1,
num_gpus_per_worker: int = 0,
ray_remote_args: Optional[Dict[str, Any]] = None,
**predict_kwargs
) -> ray.data.Dataset:
"""Run batch scoring on dataset.
Args:
data: Ray dataset to run batch prediction on.
batch_size: Split dataset into batches of this size for prediction.
min_scoring_workers: Minimum number of scoring actors.
max_scoring_workers: If set, specify the maximum number of scoring actors.
num_cpus_per_worker: Number of CPUs to allocate per scoring worker.
num_gpus_per_worker: Number of GPUs to allocate per scoring worker.
ray_remote_args: Additional resource requirements to request from
ray.
predict_kwargs: Keyword arguments passed to the predictor's
``predict()`` method.
Returns:
Dataset containing scoring results.
"""
predictor_cls = self.predictor_cls
checkpoint_ref = self.checkpoint_ref
predictor_kwargs = self.predictor_kwargs
class ScoringWrapper:
def __init__(self):
self.predictor = predictor_cls.from_checkpoint(
Checkpoint.from_object_ref(checkpoint_ref), **predictor_kwargs
)
def __call__(self, batch):
return self.predictor.predict(batch, **predict_kwargs)
compute = ray.data.ActorPoolStrategy(
min_size=min_scoring_workers, max_size=max_scoring_workers
)
ray_remote_args = ray_remote_args or {}
ray_remote_args["num_cpus"] = num_cpus_per_worker
ray_remote_args["num_gpus"] = num_gpus_per_worker
return data.map_batches(
ScoringWrapper,
compute=compute,
batch_format="pandas",
batch_size=batch_size,
**ray_remote_args
)

View file

@ -4,7 +4,7 @@ from typing import Tuple
import pandas as pd
import ray
from ray.ml.checkpoint import Checkpoint
from ray.ml.batch_predictor import BatchPredictor
from ray.ml.predictors.integrations.lightgbm import LightGBMPredictor
from ray.ml.train.integrations.lightgbm import LightGBMTrainer
from ray.data.dataset import Dataset
@ -57,33 +57,20 @@ def train_lightgbm(num_workers: int, use_gpu: bool = False) -> Result:
def predict_lightgbm(result: Result):
_, _, test_dataset = prepare_data()
checkpoint_object_ref = result.checkpoint.to_object_ref()
class LightGBMScorer:
def __init__(self):
self.predictor = LightGBMPredictor.from_checkpoint(
Checkpoint.from_object_ref(checkpoint_object_ref)
)
def __call__(self, batch) -> pd.DataFrame:
return self.predictor.predict(batch)
batch_predictor = BatchPredictor.from_checkpoint(
result.checkpoint, LightGBMPredictor
)
predicted_labels = (
test_dataset.map_batches(
LightGBMScorer, compute="actors", batch_format="pandas"
)
batch_predictor.predict(test_dataset)
.map_batches(lambda df: (df > 0.5).astype(int), batch_format="pandas")
.to_pandas(limit=float("inf"))
)
print(f"PREDICTED LABELS\n{predicted_labels}")
class LightGBMScorerSHAP(LightGBMScorer):
def __call__(self, batch) -> pd.DataFrame:
return self.predictor.predict(batch, pred_contrib=True)
shap_values = test_dataset.map_batches(
LightGBMScorerSHAP, compute="actors", batch_format="pandas"
).to_pandas(limit=float("inf"))
shap_values = batch_predictor.predict(test_dataset, pred_contrib=True).to_pandas(
limit=float("inf")
)
print(f"SHAP VALUES\n{shap_values}")

View file

@ -8,7 +8,7 @@ import torch.nn as nn
import ray
import ray.train as train
from ray.data import Dataset
from ray.ml.checkpoint import Checkpoint
from ray.ml.batch_predictor import BatchPredictor
from ray.ml.predictors.integrations.torch import TorchPredictor
from ray.ml.result import Result
from ray.ml.train.integrations.torch import TorchTrainer
@ -130,20 +130,12 @@ def train_linear(num_workers=2, use_gpu=False):
def predict_linear(result: Result):
checkpoint_object_ref = result.checkpoint.to_object_ref()
class TorchScorer:
def __init__(self):
checkpoint = Checkpoint.from_object_ref(checkpoint_object_ref)
self.predictor = TorchPredictor.from_checkpoint(checkpoint)
def __call__(self, batch):
return self.predictor.predict(batch, dtype=torch.float)
batch_predictor = BatchPredictor.from_checkpoint(result.checkpoint, TorchPredictor)
items = [{"x": random.uniform(0, 1) for _ in range(10)}]
prediction_dataset = ray.data.from_items(items)
predictions = prediction_dataset.map_batches(TorchScorer, compute="actors")
predictions = batch_predictor.predict(prediction_dataset, dtype=torch.float)
return predictions

View file

@ -1,16 +1,15 @@
import argparse
import numpy as np
import pandas as pd
import tensorflow as tf
from ray.ml.batch_predictor import BatchPredictor
from tensorflow.keras.callbacks import Callback
import ray
import ray.train as train
from ray.data import Dataset
from ray.train.tensorflow import prepare_dataset_shard
from ray.ml.checkpoint import Checkpoint
from ray.ml.train.integrations.tensorflow import TensorflowTrainer
from ray.ml.predictors.integrations.tensorflow import TensorflowPredictor
from ray.ml.result import Result
@ -90,24 +89,14 @@ def train_tensorflow_linear(num_workers: int = 2, use_gpu: bool = False) -> Resu
def predict_linear(result: Result) -> Dataset:
items = [{"x": np.random.uniform(0, 1)}] * 10
batch_predictor = BatchPredictor.from_checkpoint(
result.checkpoint, TensorflowPredictor, model_definition=build_model
)
items = [{"x": np.random.uniform(0, 1)} for _ in range(10)]
prediction_dataset = ray.data.from_items(items)
checkpoint_object_ref = result.checkpoint.to_object_ref()
class TFScorer:
def __init__(self):
self.predictor = TensorflowPredictor.from_checkpoint(
Checkpoint.from_object_ref(checkpoint_object_ref),
model_definition=build_model,
)
def __call__(self, batch) -> pd.DataFrame:
return self.predictor.predict(batch, dtype=tf.float32)
predictions = prediction_dataset.map_batches(
TFScorer, compute="actors", batch_format="pandas"
)
predictions = batch_predictor.predict(prediction_dataset, dtype=tf.float32)
pandas_predictions = predictions.to_pandas(float("inf"))

View file

@ -4,7 +4,7 @@ from typing import Tuple
import pandas as pd
import ray
from ray.ml.checkpoint import Checkpoint
from ray.ml.batch_predictor import BatchPredictor
from ray.ml.predictors.integrations.xgboost import XGBoostPredictor
from ray.ml.train.integrations.xgboost import XGBoostTrainer
from ray.data.dataset import Dataset
@ -58,31 +58,21 @@ def train_xgboost(num_workers: int, use_gpu: bool = False) -> Result:
def predict_xgboost(result: Result):
_, _, test_dataset = prepare_data()
checkpoint_object_ref = result.checkpoint.to_object_ref()
class XGBoostScorer:
def __init__(self):
self.predictor = XGBoostPredictor.from_checkpoint(
Checkpoint.from_object_ref(checkpoint_object_ref)
)
def __call__(self, batch) -> pd.DataFrame:
return self.predictor.predict(batch)
batch_predictor = BatchPredictor.from_checkpoint(
result.checkpoint, XGBoostPredictor
)
predicted_labels = (
test_dataset.map_batches(XGBoostScorer, compute="actors", batch_format="pandas")
batch_predictor.predict(test_dataset)
.map_batches(lambda df: (df > 0.5).astype(int), batch_format="pandas")
.to_pandas(limit=float("inf"))
)
print(f"PREDICTED LABELS\n{predicted_labels}")
class XGBoostScorerSHAP(XGBoostScorer):
def __call__(self, batch) -> pd.DataFrame:
return self.predictor.predict(batch, pred_contribs=True)
shap_values = test_dataset.map_batches(
XGBoostScorerSHAP, compute="actors", batch_format="pandas"
).to_pandas(limit=float("inf"))
shap_values = batch_predictor.predict(test_dataset, pred_contribs=True).to_pandas(
limit=float("inf")
)
print(f"SHAP VALUES\n{shap_values}")

View file

@ -2,16 +2,25 @@ import pytest
import ray
from ray.ml.checkpoint import Checkpoint
from ray.ml.predictor import Predictor, DataBatchType, PredictorNotSerializableException
from ray.ml.predictor import (
Predictor,
DataBatchType,
PredictorNotSerializableException,
)
from ray.ml.batch_predictor import BatchPredictor
class DummyPredictor(Predictor):
def __init__(self, factor: float = 1.0):
self.factor = factor
@classmethod
def from_checkpoint(cls, checkpoint: Checkpoint, **kwargs) -> "DummyPredictor":
return DummyPredictor()
checkpoint_data = checkpoint.to_dict()
return DummyPredictor(**checkpoint_data)
def predict(self, data: DataBatchType, **kwargs) -> DataBatchType:
return data
return data * self.factor
def test_serialization():
@ -26,6 +35,22 @@ def test_serialization():
ray.put(predictor)
def test_batch_prediction():
batch_predictor = BatchPredictor.from_checkpoint(
Checkpoint.from_dict({"factor": 2.0}), DummyPredictor
)
test_dataset = ray.data.from_items([1.0, 2.0, 3.0, 4.0])
assert batch_predictor.predict(
test_dataset
).to_pandas().to_numpy().squeeze().tolist() == [
2.0,
4.0,
6.0,
8.0,
]
if __name__ == "__main__":
import sys