diff --git a/doc/source/ray-air/getting-started.rst b/doc/source/ray-air/getting-started.rst index 951d2885a..020e956ba 100644 --- a/doc/source/ray-air/getting-started.rst +++ b/doc/source/ray-air/getting-started.rst @@ -89,6 +89,9 @@ Predictors .. autoclass:: ray.ml.predictor.Predictor :members: +.. autoclass:: ray.ml.batch_predictor.BatchPredictor + :members: + .. automodule:: ray.ml.predictors.integrations.xgboost :members: :show-inheritance: diff --git a/python/ray/ml/batch_predictor.py b/python/ray/ml/batch_predictor.py new file mode 100644 index 000000000..a0adf2b68 --- /dev/null +++ b/python/ray/ml/batch_predictor.py @@ -0,0 +1,99 @@ +from typing import Type, Optional, Dict, Any + +import ray +from ray.ml import Checkpoint +from ray.ml.predictor import Predictor + + +class BatchPredictor(Predictor): + """Batch predictor class. + + Takes a predictor class and a checkpoint and provides an interface to run + batch scoring on Ray datasets. + + This batch predictor wraps around a predictor class and executes it + in a distributed way when calling ``predict()``. + + Attributes: + checkpoint: Checkpoint loaded by the distributed predictor objects. + predictor_cls: Predictor class reference. When scoring, each scoring worker + will create an instance of this class and call ``predict(batch)`` on it. + **predictor_kwargs: Keyword arguments passed to the predictor on + initialization. + + """ + + def __init__( + self, checkpoint: Checkpoint, predictor_cls: Type[Predictor], **predictor_kwargs + ): + # Store as object ref so we only serialize it once for all map workers + self.checkpoint_ref = checkpoint.to_object_ref() + self.predictor_cls = predictor_cls + self.predictor_kwargs = predictor_kwargs + + @classmethod + def from_checkpoint( + cls, checkpoint: Checkpoint, predictor_cls: Type[Predictor], **kwargs + ) -> "BatchPredictor": + return BatchPredictor( + checkpoint=checkpoint, predictor_cls=predictor_cls, **kwargs + ) + + def predict( + self, + data: ray.data.Dataset, + *, + batch_size: int = 4096, + min_scoring_workers: int = 1, + max_scoring_workers: Optional[int] = None, + num_cpus_per_worker: int = 1, + num_gpus_per_worker: int = 0, + ray_remote_args: Optional[Dict[str, Any]] = None, + **predict_kwargs + ) -> ray.data.Dataset: + """Run batch scoring on dataset. + + Args: + data: Ray dataset to run batch prediction on. + batch_size: Split dataset into batches of this size for prediction. + min_scoring_workers: Minimum number of scoring actors. + max_scoring_workers: If set, specify the maximum number of scoring actors. + num_cpus_per_worker: Number of CPUs to allocate per scoring worker. + num_gpus_per_worker: Number of GPUs to allocate per scoring worker. + ray_remote_args: Additional resource requirements to request from + ray. + predict_kwargs: Keyword arguments passed to the predictor's + ``predict()`` method. + + Returns: + Dataset containing scoring results. + + """ + predictor_cls = self.predictor_cls + checkpoint_ref = self.checkpoint_ref + predictor_kwargs = self.predictor_kwargs + + class ScoringWrapper: + def __init__(self): + self.predictor = predictor_cls.from_checkpoint( + Checkpoint.from_object_ref(checkpoint_ref), **predictor_kwargs + ) + + def __call__(self, batch): + return self.predictor.predict(batch, **predict_kwargs) + + compute = ray.data.ActorPoolStrategy( + min_size=min_scoring_workers, max_size=max_scoring_workers + ) + + ray_remote_args = ray_remote_args or {} + ray_remote_args["num_cpus"] = num_cpus_per_worker + ray_remote_args["num_gpus"] = num_gpus_per_worker + + return data.map_batches( + ScoringWrapper, + compute=compute, + batch_format="pandas", + batch_size=batch_size, + **ray_remote_args + ) diff --git a/python/ray/ml/examples/lightgbm_example.py b/python/ray/ml/examples/lightgbm_example.py index de11d9320..1b4807511 100644 --- a/python/ray/ml/examples/lightgbm_example.py +++ b/python/ray/ml/examples/lightgbm_example.py @@ -4,7 +4,7 @@ from typing import Tuple import pandas as pd import ray -from ray.ml.checkpoint import Checkpoint +from ray.ml.batch_predictor import BatchPredictor from ray.ml.predictors.integrations.lightgbm import LightGBMPredictor from ray.ml.train.integrations.lightgbm import LightGBMTrainer from ray.data.dataset import Dataset @@ -57,33 +57,20 @@ def train_lightgbm(num_workers: int, use_gpu: bool = False) -> Result: def predict_lightgbm(result: Result): _, _, test_dataset = prepare_data() - checkpoint_object_ref = result.checkpoint.to_object_ref() - - class LightGBMScorer: - def __init__(self): - self.predictor = LightGBMPredictor.from_checkpoint( - Checkpoint.from_object_ref(checkpoint_object_ref) - ) - - def __call__(self, batch) -> pd.DataFrame: - return self.predictor.predict(batch) + batch_predictor = BatchPredictor.from_checkpoint( + result.checkpoint, LightGBMPredictor + ) predicted_labels = ( - test_dataset.map_batches( - LightGBMScorer, compute="actors", batch_format="pandas" - ) + batch_predictor.predict(test_dataset) .map_batches(lambda df: (df > 0.5).astype(int), batch_format="pandas") .to_pandas(limit=float("inf")) ) print(f"PREDICTED LABELS\n{predicted_labels}") - class LightGBMScorerSHAP(LightGBMScorer): - def __call__(self, batch) -> pd.DataFrame: - return self.predictor.predict(batch, pred_contrib=True) - - shap_values = test_dataset.map_batches( - LightGBMScorerSHAP, compute="actors", batch_format="pandas" - ).to_pandas(limit=float("inf")) + shap_values = batch_predictor.predict(test_dataset, pred_contrib=True).to_pandas( + limit=float("inf") + ) print(f"SHAP VALUES\n{shap_values}") diff --git a/python/ray/ml/examples/pytorch/torch_linear_dataset_example.py b/python/ray/ml/examples/pytorch/torch_linear_dataset_example.py index 80967f220..0763c2901 100644 --- a/python/ray/ml/examples/pytorch/torch_linear_dataset_example.py +++ b/python/ray/ml/examples/pytorch/torch_linear_dataset_example.py @@ -8,7 +8,7 @@ import torch.nn as nn import ray import ray.train as train from ray.data import Dataset -from ray.ml.checkpoint import Checkpoint +from ray.ml.batch_predictor import BatchPredictor from ray.ml.predictors.integrations.torch import TorchPredictor from ray.ml.result import Result from ray.ml.train.integrations.torch import TorchTrainer @@ -130,20 +130,12 @@ def train_linear(num_workers=2, use_gpu=False): def predict_linear(result: Result): - checkpoint_object_ref = result.checkpoint.to_object_ref() - - class TorchScorer: - def __init__(self): - checkpoint = Checkpoint.from_object_ref(checkpoint_object_ref) - self.predictor = TorchPredictor.from_checkpoint(checkpoint) - - def __call__(self, batch): - return self.predictor.predict(batch, dtype=torch.float) + batch_predictor = BatchPredictor.from_checkpoint(result.checkpoint, TorchPredictor) items = [{"x": random.uniform(0, 1) for _ in range(10)}] prediction_dataset = ray.data.from_items(items) - predictions = prediction_dataset.map_batches(TorchScorer, compute="actors") + predictions = batch_predictor.predict(prediction_dataset, dtype=torch.float) return predictions diff --git a/python/ray/ml/examples/tf/tensorflow_linear_dataset_example.py b/python/ray/ml/examples/tf/tensorflow_linear_dataset_example.py index 14d882c0e..4d58ff8d2 100644 --- a/python/ray/ml/examples/tf/tensorflow_linear_dataset_example.py +++ b/python/ray/ml/examples/tf/tensorflow_linear_dataset_example.py @@ -1,16 +1,15 @@ import argparse import numpy as np -import pandas as pd import tensorflow as tf +from ray.ml.batch_predictor import BatchPredictor from tensorflow.keras.callbacks import Callback import ray import ray.train as train from ray.data import Dataset from ray.train.tensorflow import prepare_dataset_shard -from ray.ml.checkpoint import Checkpoint from ray.ml.train.integrations.tensorflow import TensorflowTrainer from ray.ml.predictors.integrations.tensorflow import TensorflowPredictor from ray.ml.result import Result @@ -90,24 +89,14 @@ def train_tensorflow_linear(num_workers: int = 2, use_gpu: bool = False) -> Resu def predict_linear(result: Result) -> Dataset: - items = [{"x": np.random.uniform(0, 1)}] * 10 + batch_predictor = BatchPredictor.from_checkpoint( + result.checkpoint, TensorflowPredictor, model_definition=build_model + ) + + items = [{"x": np.random.uniform(0, 1)} for _ in range(10)] prediction_dataset = ray.data.from_items(items) - checkpoint_object_ref = result.checkpoint.to_object_ref() - - class TFScorer: - def __init__(self): - self.predictor = TensorflowPredictor.from_checkpoint( - Checkpoint.from_object_ref(checkpoint_object_ref), - model_definition=build_model, - ) - - def __call__(self, batch) -> pd.DataFrame: - return self.predictor.predict(batch, dtype=tf.float32) - - predictions = prediction_dataset.map_batches( - TFScorer, compute="actors", batch_format="pandas" - ) + predictions = batch_predictor.predict(prediction_dataset, dtype=tf.float32) pandas_predictions = predictions.to_pandas(float("inf")) diff --git a/python/ray/ml/examples/xgboost_example.py b/python/ray/ml/examples/xgboost_example.py index ddeac0c3e..e61a51cbf 100644 --- a/python/ray/ml/examples/xgboost_example.py +++ b/python/ray/ml/examples/xgboost_example.py @@ -4,7 +4,7 @@ from typing import Tuple import pandas as pd import ray -from ray.ml.checkpoint import Checkpoint +from ray.ml.batch_predictor import BatchPredictor from ray.ml.predictors.integrations.xgboost import XGBoostPredictor from ray.ml.train.integrations.xgboost import XGBoostTrainer from ray.data.dataset import Dataset @@ -58,31 +58,21 @@ def train_xgboost(num_workers: int, use_gpu: bool = False) -> Result: def predict_xgboost(result: Result): _, _, test_dataset = prepare_data() - checkpoint_object_ref = result.checkpoint.to_object_ref() - class XGBoostScorer: - def __init__(self): - self.predictor = XGBoostPredictor.from_checkpoint( - Checkpoint.from_object_ref(checkpoint_object_ref) - ) - - def __call__(self, batch) -> pd.DataFrame: - return self.predictor.predict(batch) + batch_predictor = BatchPredictor.from_checkpoint( + result.checkpoint, XGBoostPredictor + ) predicted_labels = ( - test_dataset.map_batches(XGBoostScorer, compute="actors", batch_format="pandas") + batch_predictor.predict(test_dataset) .map_batches(lambda df: (df > 0.5).astype(int), batch_format="pandas") .to_pandas(limit=float("inf")) ) print(f"PREDICTED LABELS\n{predicted_labels}") - class XGBoostScorerSHAP(XGBoostScorer): - def __call__(self, batch) -> pd.DataFrame: - return self.predictor.predict(batch, pred_contribs=True) - - shap_values = test_dataset.map_batches( - XGBoostScorerSHAP, compute="actors", batch_format="pandas" - ).to_pandas(limit=float("inf")) + shap_values = batch_predictor.predict(test_dataset, pred_contribs=True).to_pandas( + limit=float("inf") + ) print(f"SHAP VALUES\n{shap_values}") diff --git a/python/ray/ml/tests/test_predictor.py b/python/ray/ml/tests/test_predictor.py index d9b3b7e00..38a8c5905 100644 --- a/python/ray/ml/tests/test_predictor.py +++ b/python/ray/ml/tests/test_predictor.py @@ -2,16 +2,25 @@ import pytest import ray from ray.ml.checkpoint import Checkpoint -from ray.ml.predictor import Predictor, DataBatchType, PredictorNotSerializableException +from ray.ml.predictor import ( + Predictor, + DataBatchType, + PredictorNotSerializableException, +) +from ray.ml.batch_predictor import BatchPredictor class DummyPredictor(Predictor): + def __init__(self, factor: float = 1.0): + self.factor = factor + @classmethod def from_checkpoint(cls, checkpoint: Checkpoint, **kwargs) -> "DummyPredictor": - return DummyPredictor() + checkpoint_data = checkpoint.to_dict() + return DummyPredictor(**checkpoint_data) def predict(self, data: DataBatchType, **kwargs) -> DataBatchType: - return data + return data * self.factor def test_serialization(): @@ -26,6 +35,22 @@ def test_serialization(): ray.put(predictor) +def test_batch_prediction(): + batch_predictor = BatchPredictor.from_checkpoint( + Checkpoint.from_dict({"factor": 2.0}), DummyPredictor + ) + + test_dataset = ray.data.from_items([1.0, 2.0, 3.0, 4.0]) + assert batch_predictor.predict( + test_dataset + ).to_pandas().to_numpy().squeeze().tolist() == [ + 2.0, + 4.0, + 6.0, + 8.0, + ] + + if __name__ == "__main__": import sys