[AIR] SklearnTrainer & Predictor interfaces (#23803)

Co-authored-by: Amog Kamsetty <amogkam@users.noreply.github.com>
Antoni Baum 2022-04-12 00:11:42 +02:00 committed by GitHub
parent 984e704207
commit 40646eecd4
6 changed files with 265 additions and 6 deletions

View file

@@ -60,6 +60,10 @@ Trainer
    :members:
    :show-inheritance:

.. automodule:: ray.ml.train.integrations.sklearn
    :members:
    :show-inheritance:

.. autoclass:: ray.ml.train.data_parallel_trainer.DataParallelTrainer
    :members:
    :show-inheritance:
@@ -101,6 +105,10 @@ Predictors
    :members:
    :show-inheritance:

.. automodule:: ray.ml.predictors.integrations.sklearn
    :members:
    :show-inheritance:

.. _air-serve-integration:

Serving

View file

@@ -0,0 +1,5 @@
from ray.ml.predictors.integrations.sklearn.sklearn_predictor import (
    SklearnPredictor,
)

__all__ = ["SklearnPredictor"]

View file

@@ -0,0 +1,103 @@
from typing import Optional, List, Union

import pandas as pd

from ray.ml.checkpoint import Checkpoint
from ray.ml.predictor import Predictor, DataBatchType
from ray.ml.preprocessor import Preprocessor

from sklearn.base import BaseEstimator


class SklearnPredictor(Predictor):
    """A predictor for scikit-learn compatible estimators.

    Args:
        estimator: The fitted scikit-learn compatible estimator to use for
            predictions.
        preprocessor: A preprocessor used to transform data batches prior
            to prediction.
    """

    def __init__(
        self, estimator: BaseEstimator, preprocessor: Optional[Preprocessor] = None
    ):
        self.estimator = estimator
        self.preprocessor = preprocessor

    @classmethod
    def from_checkpoint(cls, checkpoint: Checkpoint) -> "SklearnPredictor":
        """Instantiate the predictor from a Checkpoint.

        The checkpoint is expected to be a result of ``SklearnTrainer``.

        Args:
            checkpoint (Checkpoint): The checkpoint to load the model and
                preprocessor from. It is expected to be from the result of a
                ``SklearnTrainer`` run.
        """
        raise NotImplementedError

    def predict(
        self,
        data: DataBatchType,
        feature_columns: Optional[Union[List[str], List[int]]] = None,
        **predict_kwargs,
    ) -> pd.DataFrame:
        """Run inference on a data batch.

        Args:
            data: A batch of input data. Either a pandas DataFrame or numpy
                array.
            feature_columns: The names or indices of the columns in the
                data to use as features to predict on. If None, then use
                all columns in ``data``.
            **predict_kwargs: Keyword arguments passed to ``estimator.predict``.

        Examples:

        .. code-block:: python

            import numpy as np
            from sklearn.ensemble import RandomForestClassifier
            from ray.ml.predictors.integrations.sklearn import SklearnPredictor

            train_X = np.array([[1, 2], [3, 4]])
            train_y = np.array([0, 1])

            model = RandomForestClassifier().fit(train_X, train_y)
            predictor = SklearnPredictor(estimator=model)

            data = np.array([[1, 2], [3, 4]])
            predictions = predictor.predict(data)

            # Only use the first and second columns as features.
            data = np.array([[1, 2, 8], [3, 4, 9]])
            predictions = predictor.predict(data, feature_columns=[0, 1])

        .. code-block:: python

            import pandas as pd
            from sklearn.ensemble import RandomForestClassifier
            from ray.ml.predictors.integrations.sklearn import SklearnPredictor

            train_X = pd.DataFrame([[1, 2], [3, 4]], columns=["A", "B"])
            train_y = pd.Series([0, 1])

            model = RandomForestClassifier().fit(train_X, train_y)
            predictor = SklearnPredictor(estimator=model)

            # Pandas dataframe.
            data = pd.DataFrame([[1, 2], [3, 4]], columns=["A", "B"])
            predictions = predictor.predict(data)

            # Only use the first and second columns as features.
            data = pd.DataFrame([[1, 2, 8], [3, 4, 9]], columns=["A", "B", "C"])
            predictions = predictor.predict(data, feature_columns=["A", "B"])

        Returns:
            pd.DataFrame: Prediction result.
        """
        raise NotImplementedError
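
Both ``from_checkpoint`` and ``predict`` are left as stubs in this interfaces-only commit. For illustration, here is a minimal sketch of a ``predict`` body matching the documented contract (transform the batch, subset ``feature_columns``, delegate to ``estimator.predict``), assuming the ``Preprocessor`` exposes a ``transform_batch`` method; this is not the implementation that later landed:

import pandas as pd

def predict_sketch(predictor, data, feature_columns=None, **predict_kwargs):
    # Hypothetical helper mirroring the documented behavior of
    # SklearnPredictor.predict; not part of this commit.
    if predictor.preprocessor is not None:
        # Assumed Preprocessor API: transform a single in-memory batch.
        data = predictor.preprocessor.transform_batch(data)
    if feature_columns is not None:
        if isinstance(data, pd.DataFrame):
            # Column names select features in a DataFrame.
            data = data[feature_columns]
        else:
            # Positional indices select features in a numpy array.
            data = data[:, feature_columns]
    predictions = predictor.estimator.predict(data, **predict_kwargs)
    return pd.DataFrame(predictions, columns=["predictions"])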

View file

@@ -1,4 +1,4 @@
-from typing import Dict, Type, Any, Optional
+from typing import TYPE_CHECKING, Dict, Type, Any, Optional
 
 import warnings
 import os
@@ -12,14 +12,15 @@ from ray.ml.checkpoint import Checkpoint
 from ray.tune import Trainable
 from ray.ml.constants import MODEL_KEY, PREPROCESSOR_KEY, TRAIN_DATASET_KEY
 
-import xgboost_ray
+if TYPE_CHECKING:
+    import xgboost_ray
 
 
 def _convert_scaling_config_to_ray_params(
     scaling_config: ScalingConfig,
-    ray_params_cls: Type[xgboost_ray.RayParams],
+    ray_params_cls: Type["xgboost_ray.RayParams"],
     default_ray_params: Optional[Dict[str, Any]] = None,
-) -> xgboost_ray.RayParams:
+) -> "xgboost_ray.RayParams":
     default_ray_params = default_ray_params or {}
     scaling_config_dataclass = ScalingConfigDataClass(**scaling_config)
     resources_per_worker = scaling_config_dataclass.additional_resources_per_worker
@@ -115,7 +116,7 @@ class GBDTTrainer(Trainer):
 
     def _get_dmatrices(
         self, dmatrix_params: Dict[str, Any]
-    ) -> Dict[str, xgboost_ray.RayDMatrix]:
+    ) -> Dict[str, "xgboost_ray.RayDMatrix"]:
         return {
             k: self._dmatrix_cls(
                 v, label=self.label_column, **dmatrix_params.get(k, {})
@@ -130,7 +131,7 @@ class GBDTTrainer(Trainer):
         raise NotImplementedError
 
     @property
-    def _ray_params(self) -> xgboost_ray.RayParams:
+    def _ray_params(self) -> "xgboost_ray.RayParams":
         return _convert_scaling_config_to_ray_params(
             self.scaling_config, self._ray_params_cls, self._default_ray_params
         )
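
This change defers the ``xgboost_ray`` import behind ``typing.TYPE_CHECKING`` and quotes the annotations: type checkers still resolve the real ``RayParams``/``RayDMatrix`` types, while at runtime the module is never imported, so the heavy dependency stays optional until actually used. The same pattern in a self-contained sketch:

from typing import TYPE_CHECKING

if TYPE_CHECKING:
    # Evaluated only by static type checkers (mypy, pyright);
    # never executed at runtime, so importing here costs nothing.
    import xgboost_ray


def get_num_actors(params: "xgboost_ray.RayParams") -> int:
    # The quoted (forward) annotation is resolved lazily, so this
    # module imports cleanly even if xgboost_ray is not installed.
    return params.num_actors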

View file

@@ -0,0 +1,5 @@
from ray.ml.train.integrations.sklearn.sklearn_trainer import (
    SklearnTrainer,
)

__all__ = ["SklearnTrainer"]

View file

@@ -0,0 +1,137 @@
from typing import Union, Callable, Dict, Iterable, Optional

import numpy as np
import pandas as pd

from ray.ml.trainer import GenDataset
from ray.ml.config import ScalingConfig, RunConfig
from ray.ml.preprocessor import Preprocessor
from ray.util import PublicAPI
from ray.ml.trainer import Trainer
from ray.ml.checkpoint import Checkpoint

from sklearn.base import BaseEstimator
from sklearn.model_selection import BaseCrossValidator

# some nice properties of sklearn models:
# - they are always pickleable
# - no logic is present in __init__

ArrayType = Union[pd.DataFrame, np.ndarray]
MetricType = Union[str, Callable[[BaseEstimator, ArrayType, ArrayType], float]]
ScoringType = Union[MetricType, Iterable[MetricType], Dict[str, MetricType]]
CVType = Union[int, Iterable, BaseCrossValidator]


@PublicAPI(stability="alpha")
class SklearnTrainer(Trainer):
    """A Trainer for scikit-learn estimator training.

    This Trainer runs the ``fit`` method of the given estimator in a
    non-distributed manner on a single Ray Actor.

    The ``n_jobs`` (or ``thread_count``) parameter will be set to match
    the number of CPUs assigned to the Ray Actor. Please note that
    parallelism for joblib-based estimators is not yet implemented
    (though technically possible).

    Example:
        .. code-block:: python

            import ray

            from ray.ml.train.integrations.sklearn import SklearnTrainer
            from sklearn.ensemble import RandomForestRegressor

            train_dataset = ray.data.from_items(
                [{"x": x, "y": x + 1} for x in range(32)])
            trainer = SklearnTrainer(
                estimator=RandomForestRegressor(),
                label_column="y",
                scaling_config={
                    "trainer_resources": {"CPU": 4}
                },
                datasets={"train": train_dataset}
            )
            result = trainer.fit()

    Args:
        estimator: A scikit-learn compatible estimator to use.
        datasets: Ray Datasets to use for training and validation. Must include a
            "train" key denoting the training dataset. If a ``preprocessor``
            is provided and has not already been fit, it will be fit on the training
            dataset. All datasets will be transformed by the ``preprocessor`` if
            one is provided. All non-training datasets will be used as separate
            validation sets, each reporting separate metrics.
        label_column: Name of the label column. A column with this name
            must be present in the training dataset.
        scoring: Strategy to evaluate the performance of the model on
            the validation sets and for cross-validation. Same as in
            ``sklearn.model_selection.cross_validate``.

            If ``scoring`` represents a single score, one can use:

            - a single string;
            - a callable that returns a single value.

            If ``scoring`` represents multiple scores, one can use:

            - a list or tuple of unique strings;
            - a callable returning a dictionary where the keys are the metric
              names and the values are the metric scores;
            - a dictionary with metric names as keys and callables as values.
        cv: Determines the cross-validation splitting strategy. If specified,
            cross-validation will be run on the train dataset, in addition to
            computing metrics for validation datasets. Same as in
            ``sklearn.model_selection.cross_validate``, with the exception of
            None.

            Possible inputs for ``cv`` are:

            - None, to skip cross-validation.
            - int, to specify the number of folds in a ``(Stratified)KFold``,
            - ``CV splitter``,
            - An iterable yielding (train, test) splits as arrays of indices.

            For int/None inputs, if the estimator is a classifier and ``y`` is
            either binary or multiclass, ``StratifiedKFold`` is used. In all
            other cases, ``KFold`` is used. These splitters are instantiated
            with ``shuffle=False`` so the splits will be the same across calls.
        scaling_config: Configuration for how to scale training.
            Only the ``trainer_resources`` key can be provided,
            as the training is not distributed.
        run_config: Configuration for the execution of the training run.
        preprocessor: A ray.ml.preprocessor.Preprocessor to preprocess the
            provided datasets.
        resume_from_checkpoint: A checkpoint to resume training from.
        **fit_params: Additional kwargs passed to the ``estimator.fit()``
            method.
    """

    def __init__(
        self,
        *,
        estimator: BaseEstimator,
        datasets: Dict[str, GenDataset],
        label_column: Optional[str] = None,
        scoring: Optional[ScoringType] = None,
        cv: Optional[CVType] = None,
        scaling_config: Optional[ScalingConfig] = None,
        run_config: Optional[RunConfig] = None,
        preprocessor: Optional[Preprocessor] = None,
        resume_from_checkpoint: Optional[Checkpoint] = None,
        **fit_params
    ):
        self.estimator = estimator
        self.label_column = label_column
        self.fit_params = fit_params
        self.scoring = scoring
        self.cv = cv
        super().__init__(
            scaling_config=scaling_config,
            run_config=run_config,
            datasets=datasets,
            preprocessor=preprocessor,
            resume_from_checkpoint=resume_from_checkpoint,
        )

    def training_loop(self) -> None:
        return super().training_loop()
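
``training_loop`` is intentionally a stub; this commit only establishes the interface. A rough, hypothetical sketch of the loop the class docstring describes (fit on the "train" dataset, score every other dataset, optionally cross-validate), assuming the stored datasets are materialized Ray Datasets with a ``to_pandas`` method and that the estimator exposes the usual sklearn ``score``:

from sklearn.base import clone
from sklearn.model_selection import cross_validate

def training_loop_sketch(trainer) -> None:
    # Hypothetical body inferred from the class docstring above;
    # not the implementation that ships in this commit.
    train_df = trainer.datasets["train"].to_pandas()
    X_train = train_df.drop(columns=[trainer.label_column])
    y_train = train_df[trainer.label_column]

    # Per the docstring, n_jobs would be matched to the actor's CPU
    # allocation before fitting; omitted here for brevity.
    trainer.estimator.fit(X_train, y_train, **trainer.fit_params)

    results = {}
    for name, dataset in trainer.datasets.items():
        if name == "train":
            continue
        # Every non-train dataset acts as a separate validation set,
        # reporting its own metric.
        df = dataset.to_pandas()
        X, y = df.drop(columns=[trainer.label_column]), df[trainer.label_column]
        results[f"{name}_score"] = trainer.estimator.score(X, y)

    if trainer.cv is not None:
        # Cross-validate on a clone so the fitted estimator stays intact.
        results["cv"] = cross_validate(
            clone(trainer.estimator), X_train, y_train,
            scoring=trainer.scoring, cv=trainer.cv,
        )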