[AIR/data] Move preprocessors to ray.data (#25599)

Moves ray.air.Preprocessor and ray.air.preprocessors to ray.data to converge on the agreed-upon package structure discussed internally.
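For downstream code the change is purely an import-path move; behavior is unchanged. A minimal before/after sketch (class names taken from this diff):

```python
# Before this commit:
#   from ray.air.preprocessor import Preprocessor
#   from ray.air.preprocessors import StandardScaler

# After this commit:
from ray.data.preprocessor import Preprocessor
from ray.data.preprocessors import StandardScaler

# Usage is unchanged, e.g.:
scaler = StandardScaler(columns=["x"])
```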
Antoni Baum, 2022-06-13 21:57:59 +02:00 (committed by GitHub)
parent 7727dcdac7
commit 5e9a8eb5f6
73 changed files with 186 additions and 176 deletions


@@ -5,6 +5,7 @@
- DATA_PROCESSING_TESTING=1 INSTALL_HOROVOD=1 ./ci/env/install-dependencies.sh
- bazel test --config=ci $(./ci/run/bazel_export_options) --build_tests_only --test_tag_filters=-gpu,-needs_credentials python/ray/air/...
- bazel test --config=ci $(./ci/run/bazel_export_options) --build_tests_only --test_tag_filters=ray_air,-gpu_only,-gpu,-needs_credentials python/ray/train/...
- bazel test --config=ci $(./ci/run/bazel_export_options) --build_tests_only --test_tag_filters=ray_air python/ray/data/...
- label: ":brain: RLlib: Learning discr. actions TF2-static-graph"
conditions: ["RAY_CI_RLLIB_AFFECTED"]
@@ -352,7 +353,7 @@
commands:
- cleanup() { if [ "${BUILDKITE_PULL_REQUEST}" = "false" ]; then ./ci/build/upload_build_info.sh; fi }; trap cleanup EXIT
- DATA_PROCESSING_TESTING=1 PYTHON=3.7 ./ci/env/install-dependencies.sh
- bazel test --config=ci $(./ci/run/bazel_export_options) --build_tests_only python/ray/data/...
- bazel test --config=ci $(./ci/run/bazel_export_options) --build_tests_only --test_tag_filters=-ray_air python/ray/data/...
- label: ":potable_water: Workflow tests (Python 3.7)"
conditions: ["RAY_CI_PYTHON_AFFECTED"]


@@ -60,7 +60,7 @@ tuner.fit()
# __check_ingest_1__
import ray
from ray.air.preprocessors import Chain, BatchMapper
from ray.data.preprocessors import Chain, BatchMapper
from ray.air.util.check_ingest import DummyTrainer
# Generate a synthetic dataset of ~10GiB of float64 data. The dataset is sharded


@@ -6,7 +6,7 @@ import pandas as pd
from sklearn.datasets import load_breast_cancer
from sklearn.model_selection import train_test_split
from ray.air.preprocessors import *
from ray.data.preprocessors import *
data_raw = load_breast_cancer()
dataset_df = pd.DataFrame(data_raw["data"], columns=data_raw["feature_names"])


@@ -4,7 +4,7 @@
# __preprocessor_setup_start__
import pandas as pd
import ray
from ray.air.preprocessors import MinMaxScaler
from ray.data.preprocessors import MinMaxScaler
# Generate two simple datasets.
dataset = ray.data.range_table(8)
@@ -47,8 +47,8 @@ print(batch_transformed)
# __trainer_start__
import ray
from ray.data.preprocessors import MinMaxScaler
from ray.train.xgboost import XGBoostTrainer
from ray.air.preprocessors import MinMaxScaler
train_dataset = ray.data.from_items([{"x": x, "y": 2 * x} for x in range(0, 32, 3)])
valid_dataset = ray.data.from_items([{"x": x, "y": 2 * x} for x in range(1, 32, 3)])
@@ -106,7 +106,7 @@ print(predicted_labels.to_pandas())
# __chain_start__
import ray
from ray.air.preprocessors import Chain, MinMaxScaler, SimpleImputer
from ray.data.preprocessors import Chain, MinMaxScaler, SimpleImputer
# Generate one simple dataset.
dataset = ray.data.from_items(
@@ -125,7 +125,7 @@ print(dataset_transformed.take())
# __custom_stateless_start__
import ray
from ray.air.preprocessors import BatchMapper
from ray.data.preprocessors import BatchMapper
# Generate a simple dataset.
dataset = ray.data.range_table(4)
@@ -144,7 +144,7 @@ print(dataset_transformed.take())
from typing import Dict
import ray
from pandas import DataFrame
from ray.air.preprocessors import CustomStatefulPreprocessor
from ray.data.preprocessors import CustomStatefulPreprocessor
from ray.data import Dataset
from ray.data.aggregate import Max


@@ -2,7 +2,7 @@
# __air_xgb_preprocess_start__
import ray
from ray.air.preprocessors import StandardScaler
from ray.data.preprocessors import StandardScaler
import pandas as pd


@@ -51,12 +51,12 @@
"import ray\n",
"from ray.air.batch_predictor import BatchPredictor\n",
"from ray.air.predictors.integrations.lightgbm import LightGBMPredictor\n",
"from ray.air.preprocessors.chain import Chain\n",
"from ray.air.preprocessors.encoder import Categorizer\n",
"from ray.data.preprocessors.chain import Chain\n",
"from ray.data.preprocessors.encoder import Categorizer\n",
"from ray.train.lightgbm import LightGBMTrainer\n",
"from ray.data.dataset import Dataset\n",
"from ray.air.result import Result\n",
"from ray.air.preprocessors import StandardScaler\n",
"from ray.data.preprocessors import StandardScaler\n",
"from sklearn.datasets import load_breast_cancer\n",
"from sklearn.model_selection import train_test_split"
]


@@ -165,7 +165,7 @@
"from sklearn.model_selection import train_test_split\n",
"\n",
"from ray.air.train.integrations.xgboost import XGBoostTrainer\n",
"from ray.air.preprocessors import StandardScaler\n",
"from ray.data.preprocessors import StandardScaler\n",
"\n",
"data_raw = load_breast_cancer()\n",
"dataset_df = pd.DataFrame(data_raw[\"data\"], columns=data_raw[\"feature_names\"])\n",


@@ -56,7 +56,7 @@
"from ray.data.dataset import Dataset\n",
"from ray.air.batch_predictor import BatchPredictor\n",
"from ray.air.predictors.integrations.sklearn import SklearnPredictor\n",
"from ray.air.preprocessors import Chain, OrdinalEncoder, StandardScaler\n",
"from ray.data.preprocessors import Chain, OrdinalEncoder, StandardScaler\n",
"from ray.air.result import Result\n",
"from ray.train.sklearn import SklearnTrainer\n",
"\n",


@@ -472,7 +472,7 @@
},
"outputs": [],
"source": [
"from ray.air.preprocessors import (\n",
"from ray.data.preprocessors import (\n",
" BatchMapper,\n",
" Chain,\n",
" OneHotEncoder,\n",


@@ -590,7 +590,7 @@
},
"outputs": [],
"source": [
"from ray.air.preprocessors import BatchMapper\n",
"from ray.data.preprocessors import BatchMapper\n",
"\n",
"from torchvision import transforms\n",
"\n",


@@ -69,7 +69,7 @@
"from ray.train.xgboost import XGBoostTrainer\n",
"from ray.data.dataset import Dataset\n",
"from ray.air.result import Result\n",
"from ray.air.preprocessors import StandardScaler\n",
"from ray.data.preprocessors import StandardScaler\n",
"from sklearn.datasets import load_breast_cancer\n",
"from sklearn.model_selection import train_test_split"
]


@@ -14,10 +14,10 @@ Components
Preprocessors
~~~~~~~~~~~~~
.. autoclass:: ray.air.preprocessor.Preprocessor
.. autoclass:: ray.data.preprocessor.Preprocessor
:members:
.. automodule:: ray.air.preprocessors
.. automodule:: ray.data.preprocessors
:members:
:show-inheritance:


@@ -127,32 +127,32 @@ Ray AIR provides a handful of ``Preprocessor``\s that you can use out of the box
.. tabbed:: Common APIs
#. :class:`Preprocessor <ray.air.preprocessor.Preprocessor>`
#. :class:`Chain <ray.air.preprocessors.Chain>`
#. :class:`BatchMapper <ray.air.preprocessors.BatchMapper>`
#. :class:`CustomStatefulPreprocessor <ray.air.preprocessors.CustomStatefulPreprocessor>`
#. :class:`Preprocessor <ray.data.preprocessor.Preprocessor>`
#. :class:`BatchMapper <ray.data.preprocessors.BatchMapper>`
#. :class:`Chain <ray.data.preprocessors.Chain>`
#. :class:`CustomStatefulPreprocessor <ray.data.preprocessors.CustomStatefulPreprocessor>`
.. tabbed:: Tabular
#. :class:`Categorizer <ray.air.preprocessors.Categorizer>`
#. :class:`FeatureHasher <ray.air.preprocessors.FeatureHasher>`
#. :class:`LabelEncoder <ray.air.preprocessors.LabelEncoder>`
#. :class:`MaxAbsScaler <ray.air.preprocessors.MaxAbsScaler>`
#. :class:`MinMaxScaler <ray.air.preprocessors.MinMaxScaler>`
#. :class:`Normalizer <ray.air.preprocessors.Normalizer>`
#. :class:`OneHotEncoder <ray.air.preprocessors.OneHotEncoder>`
#. :class:`OrdinalEncoder <ray.air.preprocessors.OrdinalEncoder>`
#. :class:`PowerTransformer <ray.air.preprocessors.PowerTransformer>`
#. :class:`RobustScaler <ray.air.preprocessors.RobustScaler>`
#. :class:`SimpleImputer <ray.air.preprocessors.SimpleImputer>`
#. :class:`StandardScaler <ray.air.preprocessors.StandardScaler>`
#. :class:`SimpleImputer <ray.air.preprocessors.SimpleImputer>`
#. :class:`Categorizer <ray.data.preprocessors.Categorizer>`
#. :class:`FeatureHasher <ray.data.preprocessors.FeatureHasher>`
#. :class:`LabelEncoder <ray.data.preprocessors.LabelEncoder>`
#. :class:`MaxAbsScaler <ray.data.preprocessors.MaxAbsScaler>`
#. :class:`MinMaxScaler <ray.data.preprocessors.MinMaxScaler>`
#. :class:`Normalizer <ray.data.preprocessors.Normalizer>`
#. :class:`OneHotEncoder <ray.data.preprocessors.OneHotEncoder>`
#. :class:`OrdinalEncoder <ray.data.preprocessors.OrdinalEncoder>`
#. :class:`PowerTransformer <ray.data.preprocessors.PowerTransformer>`
#. :class:`RobustScaler <ray.data.preprocessors.RobustScaler>`
#. :class:`SimpleImputer <ray.data.preprocessors.SimpleImputer>`
#. :class:`StandardScaler <ray.data.preprocessors.StandardScaler>`
#. :class:`SimpleImputer <ray.data.preprocessors.SimpleImputer>`
.. tabbed:: Text
#. :class:`CountVectorizer <ray.air.preprocessors.CountVectorizer>`
#. :class:`HashingVectorizer <ray.air.preprocessors.HashingVectorizer>`
#. :class:`Tokenizer <ray.air.preprocessors.Tokenizer>`
#. :class:`CountVectorizer <ray.data.preprocessors.CountVectorizer>`
#. :class:`HashingVectorizer <ray.data.preprocessors.HashingVectorizer>`
#. :class:`Tokenizer <ray.data.preprocessors.Tokenizer>`
.. tabbed:: Image


@@ -233,14 +233,6 @@ py_test(
deps = [":ml_lib"]
)
py_test(
name = "test_preprocessors",
size = "small",
srcs = ["tests/test_preprocessors.py"],
tags = ["team:ml", "exclusive"],
deps = [":ml_lib"]
)
py_test(
name = "test_remote_storage",
size = "small",


@@ -1,7 +1,7 @@
from ray.air.checkpoint import Checkpoint
from ray.air.data_batch_type import DataBatchType
from ray.air.config import RunConfig, ScalingConfig, DatasetConfig
from ray.air.preprocessor import Preprocessor
from ray.data.preprocessor import Preprocessor
from ray.air.predictor import Predictor
from ray.air.result import Result
from ray.air.batch_predictor import BatchPredictor


@@ -6,7 +6,7 @@ import ray.cloudpickle as cpickle
from ray.air.constants import PREPROCESSOR_KEY
if TYPE_CHECKING:
from ray.air.preprocessor import Preprocessor
from ray.data.preprocessor import Preprocessor
def save_preprocessor_to_dir(


@@ -13,7 +13,7 @@ from ray.air.checkpoint import Checkpoint
from ray.air._internal.checkpointing import load_preprocessor_from_dir
if TYPE_CHECKING:
from ray.air.preprocessor import Preprocessor
from ray.data.preprocessor import Preprocessor
class HuggingFacePredictor(Predictor):


@@ -9,7 +9,7 @@ from ray.air.predictor import Predictor, DataBatchType
from ray.train.lightgbm import load_checkpoint
if TYPE_CHECKING:
from ray.air.preprocessor import Preprocessor
from ray.data.preprocessor import Preprocessor
class LightGBMPredictor(Predictor):


@@ -10,7 +10,7 @@ from ray.air._internal.checkpointing import (
)
if TYPE_CHECKING:
from ray.air.preprocessor import Preprocessor
from ray.data.preprocessor import Preprocessor
def to_air_checkpoint(


@@ -9,7 +9,7 @@ from ray.rllib.policy.policy import Policy
from ray.rllib.utils.typing import EnvType
if TYPE_CHECKING:
from ray.air.preprocessor import Preprocessor
from ray.data.preprocessor import Preprocessor
class RLPredictor(Predictor):


@@ -13,7 +13,7 @@ from ray.util.joblib import register_ray
from sklearn.base import BaseEstimator
if TYPE_CHECKING:
from ray.air.preprocessor import Preprocessor
from ray.data.preprocessor import Preprocessor
class SklearnPredictor(Predictor):


@@ -11,7 +11,7 @@ from ray.air._internal.checkpointing import (
import ray.cloudpickle as cpickle
if TYPE_CHECKING:
from ray.air.preprocessor import Preprocessor
from ray.data.preprocessor import Preprocessor
def to_air_checkpoint(


@@ -9,7 +9,7 @@ from ray.train.data_parallel_trainer import _load_checkpoint
from ray.air._internal.tensorflow_utils import convert_pandas_to_tf_tensor
if TYPE_CHECKING:
from ray.air.preprocessor import Preprocessor
from ray.data.preprocessor import Preprocessor
class TensorflowPredictor(Predictor):


@@ -6,7 +6,7 @@ from ray.air.checkpoint import Checkpoint
from ray.air.constants import MODEL_KEY, PREPROCESSOR_KEY
if TYPE_CHECKING:
from ray.air.preprocessor import Preprocessor
from ray.data.preprocessor import Preprocessor
def to_air_checkpoint(


@@ -10,7 +10,7 @@ from ray.train.torch import load_checkpoint
from ray.air._internal.torch_utils import convert_pandas_to_torch_tensor
if TYPE_CHECKING:
from ray.air.preprocessor import Preprocessor
from ray.data.preprocessor import Preprocessor
class TorchPredictor(Predictor):


@@ -6,7 +6,7 @@ from ray.air.checkpoint import Checkpoint
from ray.air.constants import MODEL_KEY, PREPROCESSOR_KEY
if TYPE_CHECKING:
from ray.air.preprocessor import Preprocessor
from ray.data.preprocessor import Preprocessor
def to_air_checkpoint(


@@ -10,7 +10,7 @@ from ray.air._internal.checkpointing import (
)
if TYPE_CHECKING:
from ray.air.preprocessor import Preprocessor
from ray.data.preprocessor import Preprocessor
def to_air_checkpoint(


@@ -9,7 +9,7 @@ from ray.air.predictor import Predictor, DataBatchType
from ray.train.xgboost import load_checkpoint
if TYPE_CHECKING:
from ray.air.preprocessor import Preprocessor
from ray.data.preprocessor import Preprocessor
class XGBoostPredictor(Predictor):


@@ -1,44 +0,0 @@
from ray.air.preprocessors.batch_mapper import BatchMapper
from ray.air.preprocessors.chain import Chain
from ray.air.preprocessors.encoder import (
Categorizer,
LabelEncoder,
MultiHotEncoder,
OneHotEncoder,
OrdinalEncoder,
)
from ray.air.preprocessors.hasher import FeatureHasher
from ray.air.preprocessors.imputer import SimpleImputer
from ray.air.preprocessors.normalizer import Normalizer
from ray.air.preprocessors.scaler import (
StandardScaler,
MinMaxScaler,
MaxAbsScaler,
RobustScaler,
)
from ray.air.preprocessors.custom_stateful import CustomStatefulPreprocessor
from ray.air.preprocessors.tokenizer import Tokenizer
from ray.air.preprocessors.transformer import PowerTransformer
from ray.air.preprocessors.vectorizer import CountVectorizer, HashingVectorizer
__all__ = [
"BatchMapper",
"Categorizer",
"Chain",
"CountVectorizer",
"CustomStatefulPreprocessor",
"FeatureHasher",
"HashingVectorizer",
"LabelEncoder",
"MaxAbsScaler",
"MinMaxScaler",
"MultiHotEncoder",
"Normalizer",
"OneHotEncoder",
"OrdinalEncoder",
"PowerTransformer",
"RobustScaler",
"SimpleImputer",
"StandardScaler",
"Tokenizer",
]


@@ -4,7 +4,7 @@ import ray
from ray.air import Checkpoint
from ray.air.config import ScalingConfigDataClass
from ray.train import BaseTrainer
from ray.air.preprocessor import Preprocessor
from ray.data.preprocessor import Preprocessor
from ray.air._internal.config import ensure_only_allowed_dataclass_keys_updated


@@ -8,7 +8,7 @@ from ray.air.config import DatasetConfig
from ray import train
from ray.train.data_parallel_trainer import DataParallelTrainer
from ray.air.preprocessors import BatchMapper
from ray.data.preprocessors import BatchMapper
@pytest.fixture


@@ -10,7 +10,7 @@ from transformers import (
from transformers.pipelines import pipeline
import ray
from ray.air.preprocessor import Preprocessor
from ray.data.preprocessor import Preprocessor
from ray.air.predictors.integrations.huggingface import HuggingFacePredictor
prompts = pd.DataFrame(


@@ -5,7 +5,7 @@ from ray.air.predictors.integrations.lightgbm import (
LightGBMPredictor,
to_air_checkpoint,
)
from ray.air.preprocessor import Preprocessor
from ray.data.preprocessor import Preprocessor
from ray.air.checkpoint import Checkpoint
from ray.air.constants import MODEL_KEY
from ray.air._internal.checkpointing import save_preprocessor_to_dir


@@ -7,7 +7,7 @@ import pytest
import tempfile
from ray.air.predictors.integrations.rl.rl_predictor import RLPredictor
from ray.air.preprocessor import Preprocessor
from ray.data.preprocessor import Preprocessor
from ray.air.checkpoint import Checkpoint
from ray.train.rl import RLTrainer


@@ -9,7 +9,7 @@ from sklearn.ensemble import RandomForestClassifier
import ray
import ray.cloudpickle as cpickle
from ray.air.predictors.integrations.sklearn import SklearnPredictor, to_air_checkpoint
from ray.air.preprocessor import Preprocessor
from ray.data.preprocessor import Preprocessor
from ray.air.checkpoint import Checkpoint
from ray.air.constants import MODEL_KEY
from ray.air.batch_predictor import BatchPredictor


@@ -6,7 +6,7 @@ from ray.air.predictors.integrations.tensorflow import (
TensorflowPredictor,
to_air_checkpoint,
)
from ray.air.preprocessor import Preprocessor
from ray.data.preprocessor import Preprocessor
import numpy as np
import pandas as pd


@@ -5,7 +5,7 @@ import pandas as pd
import torch
from ray.air.predictors.integrations.torch import TorchPredictor, to_air_checkpoint
from ray.air.preprocessor import Preprocessor
from ray.data.preprocessor import Preprocessor
from ray.air.checkpoint import Checkpoint
from ray.air.constants import PREPROCESSOR_KEY, MODEL_KEY


@@ -1,6 +1,6 @@
import os
from ray.air.predictors.integrations.xgboost import XGBoostPredictor, to_air_checkpoint
from ray.air.preprocessor import Preprocessor
from ray.data.preprocessor import Preprocessor
from ray.air.checkpoint import Checkpoint
from ray.air.constants import MODEL_KEY
import json


@@ -7,7 +7,7 @@ import sys
import ray
from ray import train
from ray.air.preprocessors import Chain, BatchMapper
from ray.data.preprocessors import Chain, BatchMapper
from ray.air.config import DatasetConfig
from ray.train.data_parallel_trainer import DataParallelTrainer
from ray.util.annotations import DeveloperAPI


@@ -12,8 +12,19 @@ SRCS = [] + select({
"//conditions:default": [],
})
py_test(
name = "test_preprocessors",
size = "small",
srcs = ["tests/test_preprocessors.py"],
tags = ["team:ml", "exclusive", "ray_air"],
deps = ["//:ray_lib"],
)
py_test_module_list(
files = glob(["tests/test_*.py"]),
files = glob(
include=["tests/test_*.py"],
exclude=["tests/test_preprocessors.py"]
),
size = "large",
extra_srcs = SRCS,
tags = ["team:core", "exclusive"],


@@ -29,6 +29,7 @@ from ray.data.dataset import Dataset
from ray.data.dataset_pipeline import DatasetPipeline
from ray.data._internal.progress_bar import set_progress_bars
from ray.data._internal.compute import ActorPoolStrategy
from ray.data.preprocessor import Preprocessor
# Module-level cached global functions (for impl/compute). It cannot be defined
# in impl/compute since it has to be process-global across cloudpickled funcs.
@@ -65,4 +66,5 @@ __all__ = [
"read_parquet",
"read_parquet_bulk",
"set_progress_bars",
"Preprocessor",
]


@@ -6,9 +6,9 @@ from ray.util.annotations import PublicAPI
if TYPE_CHECKING:
import pandas as pd
from ray.air.data_batch_type import DataBatchType
from ray.data import Dataset
from ray.air.data_batch_type import DataBatchType
@PublicAPI(stability="alpha")
@@ -134,7 +134,7 @@ class Preprocessor(abc.ABC):
self._transform_stats = transformed_ds.stats()
return transformed_ds
def transform_batch(self, df: DataBatchType) -> DataBatchType:
def transform_batch(self, df: "DataBatchType") -> "DataBatchType":
"""Transform a single batch of data.
Args:
@@ -171,7 +171,7 @@
# The default may be too small for some datasets and too large for others.
return dataset.map_batches(self._transform_pandas, batch_format="pandas")
def _transform_batch(self, df: DataBatchType) -> DataBatchType:
def _transform_batch(self, df: "DataBatchType") -> "DataBatchType":
import pandas as pd
# TODO(matt): Add `_transform_arrow` to use based on input type.
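
Besides the move, this hunk quotes the `DataBatchType` annotations and keeps the import under `TYPE_CHECKING`, so `ray.data.preprocessor` does not import from `ray.air` at runtime (avoiding a circular dependency between the two packages). A minimal sketch of that pattern, with an illustrative function:

```python
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    # Evaluated only by type checkers, never at runtime, so the
    # ray.data -> ray.air import cycle is broken.
    from ray.air.data_batch_type import DataBatchType


def transform_batch(df: "DataBatchType") -> "DataBatchType":
    # String annotations defer name resolution to type-checking time.
    return df
```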


@@ -0,0 +1,44 @@
from ray.data.preprocessors.batch_mapper import BatchMapper
from ray.data.preprocessors.chain import Chain
from ray.data.preprocessors.custom_stateful import CustomStatefulPreprocessor
from ray.data.preprocessors.encoder import (
Categorizer,
LabelEncoder,
MultiHotEncoder,
OneHotEncoder,
OrdinalEncoder,
)
from ray.data.preprocessors.hasher import FeatureHasher
from ray.data.preprocessors.imputer import SimpleImputer
from ray.data.preprocessors.normalizer import Normalizer
from ray.data.preprocessors.scaler import (
StandardScaler,
MinMaxScaler,
MaxAbsScaler,
RobustScaler,
)
from ray.data.preprocessors.tokenizer import Tokenizer
from ray.data.preprocessors.transformer import PowerTransformer
from ray.data.preprocessors.vectorizer import CountVectorizer, HashingVectorizer
__all__ = [
"BatchMapper",
"Categorizer",
"CountVectorizer",
"Chain",
"CustomStatefulPreprocessor",
"FeatureHasher",
"HashingVectorizer",
"LabelEncoder",
"MaxAbsScaler",
"MinMaxScaler",
"MultiHotEncoder",
"Normalizer",
"OneHotEncoder",
"OrdinalEncoder",
"PowerTransformer",
"RobustScaler",
"SimpleImputer",
"StandardScaler",
"Tokenizer",
]


@@ -1,6 +1,6 @@
from typing import Callable, TYPE_CHECKING
from ray.air.preprocessor import Preprocessor
from ray.data.preprocessor import Preprocessor
if TYPE_CHECKING:
import pandas


@@ -1,5 +1,9 @@
from typing import TYPE_CHECKING
from ray.data import Dataset
from ray.air.preprocessor import Preprocessor, DataBatchType
from ray.data.preprocessor import Preprocessor
if TYPE_CHECKING:
from ray.air.data_batch_type import DataBatchType
class Chain(Preprocessor):
@@ -61,7 +65,7 @@ class Chain(Preprocessor):
self._transform_stats = preprocessor.transform_stats()
return ds
def _transform_batch(self, df: DataBatchType) -> DataBatchType:
def _transform_batch(self, df: "DataBatchType") -> "DataBatchType":
for preprocessor in self.preprocessors:
df = preprocessor.transform_batch(df)
return df


@@ -1,6 +1,6 @@
from typing import Callable, TYPE_CHECKING, Dict
from ray.air.preprocessor import Preprocessor
from ray.data.preprocessor import Preprocessor
from ray.data import Dataset
if TYPE_CHECKING:
@@ -22,7 +22,7 @@ class CustomStatefulPreprocessor(Preprocessor):
import pandas as pd
import ray.data
from pandas import DataFrame
from ray.air.preprocessors import CustomStatefulPreprocessor
from ray.data.preprocessors import CustomStatefulPreprocessor
from ray.data import Dataset
from ray.data.aggregate import Max


@@ -6,7 +6,7 @@ import pandas as pd
import pandas.api.types
from ray.data import Dataset
from ray.air.preprocessor import Preprocessor
from ray.data.preprocessor import Preprocessor
class OrdinalEncoder(Preprocessor):
@@ -25,7 +25,7 @@ class OrdinalEncoder(Preprocessor):
.. code-block:: python
import ray.data
from ray.air.preprocessors import OrdinalEncoder
from ray.data.preprocessors import OrdinalEncoder
import pandas as pd
batch = pd.DataFrame(
{
@@ -202,7 +202,7 @@ class MultiHotEncoder(Preprocessor):
.. code-block:: python
import ray.data
from ray.air.preprocessors import MultiHotEncoder
from ray.data.preprocessors import MultiHotEncoder
import pandas as pd
mhe = MultiHotEncoder(columns=["A", "B"])
batch = pd.DataFrame(


@@ -3,9 +3,9 @@ from typing import List
import pandas as pd
from ray.air.preprocessor import Preprocessor
from ray.data.preprocessor import Preprocessor
from ray.air.preprocessors.utils import simple_hash
from ray.data.preprocessors.utils import simple_hash
class FeatureHasher(Preprocessor):


@@ -6,7 +6,7 @@ import pandas as pd
from ray.data import Dataset
from ray.data.aggregate import Mean
from ray.air.preprocessor import Preprocessor
from ray.data.preprocessor import Preprocessor
class SimpleImputer(Preprocessor):


@@ -3,7 +3,7 @@ from typing import List
import numpy as np
import pandas as pd
from ray.air.preprocessor import Preprocessor
from ray.data.preprocessor import Preprocessor
class Normalizer(Preprocessor):


@@ -5,7 +5,7 @@ import pandas as pd
from ray.data import Dataset
from ray.data.aggregate import Mean, Std, Min, Max, AbsMax
from ray.air.preprocessor import Preprocessor
from ray.data.preprocessor import Preprocessor
class StandardScaler(Preprocessor):


@@ -2,8 +2,8 @@ from typing import List, Callable, Optional
import pandas as pd
from ray.air.preprocessor import Preprocessor
from ray.air.preprocessors.utils import simple_split_tokenizer
from ray.data.preprocessor import Preprocessor
from ray.data.preprocessors.utils import simple_split_tokenizer
class Tokenizer(Preprocessor):


@@ -3,7 +3,7 @@ from typing import List
import numpy as np
import pandas as pd
from ray.air.preprocessor import Preprocessor
from ray.data.preprocessor import Preprocessor
class PowerTransformer(Preprocessor):


@@ -4,8 +4,8 @@ from typing import List, Callable, Optional
import pandas as pd
from ray.data import Dataset
from ray.air.preprocessor import Preprocessor
from ray.air.preprocessors.utils import simple_split_tokenizer, simple_hash
from ray.data.preprocessor import Preprocessor
from ray.data.preprocessors.utils import simple_split_tokenizer, simple_hash
class HashingVectorizer(Preprocessor):


@@ -8,8 +8,8 @@ import pandas as pd
import pytest
import ray
from pandas import DataFrame
from ray.air.preprocessor import PreprocessorNotFittedException
from ray.air.preprocessors import (
from ray.data.preprocessor import PreprocessorNotFittedException
from ray.data.preprocessors import (
BatchMapper,
StandardScaler,
MinMaxScaler,
@@ -20,14 +20,14 @@ from ray.air.preprocessors import (
Chain,
CustomStatefulPreprocessor,
)
from ray.air.preprocessors.encoder import Categorizer, MultiHotEncoder
from ray.air.preprocessors.hasher import FeatureHasher
from ray.air.preprocessors.normalizer import Normalizer
from ray.air.preprocessors.scaler import MaxAbsScaler, RobustScaler
from ray.air.preprocessors.tokenizer import Tokenizer
from ray.air.preprocessors.transformer import PowerTransformer
from ray.air.preprocessors.utils import simple_split_tokenizer, simple_hash
from ray.air.preprocessors.vectorizer import CountVectorizer, HashingVectorizer
from ray.data.preprocessors.encoder import Categorizer, MultiHotEncoder
from ray.data.preprocessors.hasher import FeatureHasher
from ray.data.preprocessors.normalizer import Normalizer
from ray.data.preprocessors.scaler import MaxAbsScaler, RobustScaler
from ray.data.preprocessors.tokenizer import Tokenizer
from ray.data.preprocessors.transformer import PowerTransformer
from ray.data.preprocessors.utils import simple_split_tokenizer, simple_hash
from ray.data.preprocessors.vectorizer import CountVectorizer, HashingVectorizer
from ray.data import Dataset
from ray.data.aggregate import Max


@@ -7,7 +7,7 @@ from ray.air.config import DatasetConfig
if TYPE_CHECKING:
from ray.data import Dataset, DatasetPipeline
from ray.air.preprocessor import Preprocessor
from ray.data.preprocessor import Preprocessor
RayDataset = Union["Dataset", "DatasetPipeline"]


@@ -22,7 +22,7 @@ from ray.util.ml_utils.dict import merge_dicts
if TYPE_CHECKING:
from ray.data import Dataset
from ray.air.preprocessor import Preprocessor
from ray.data.preprocessor import Preprocessor
# A type representing either a ray.data.Dataset or a function that returns a
# ray.data.Dataset and accepts no arguments.
@@ -57,7 +57,7 @@ class BaseTrainer(abc.ABC):
specified here.
- ``trainer.preprocess_datasets()``: The provided
ray.data.Dataset are preprocessed with the provided
ray.air.preprocessor.
ray.data.Preprocessor.
- ``trainer.train_loop()``: Executes the main training logic.
- Calling ``trainer.fit()`` will return a ``ray.result.Result``
object where you can access metrics from your training run, as well
@@ -200,10 +200,10 @@ class BaseTrainer(abc.ABC):
)
# Preprocessor
if self.preprocessor is not None and not isinstance(
self.preprocessor, ray.air.preprocessor.Preprocessor
self.preprocessor, ray.data.Preprocessor
):
raise ValueError(
f"`preprocessor` should be an instance of `ray.air.Preprocessor`, "
f"`preprocessor` should be an instance of `ray.data.Preprocessor`, "
f"found {type(self.preprocessor)} with value `{self.preprocessor}`."
)


@@ -32,7 +32,7 @@ from ray.util.annotations import DeveloperAPI
from ray.util.ml_utils.checkpoint_manager import CheckpointStrategy, _TrackedCheckpoint
if TYPE_CHECKING:
from ray.air.preprocessor import Preprocessor
from ray.data.preprocessor import Preprocessor
logger = logging.getLogger(__name__)
@@ -217,7 +217,7 @@ class DataParallelTrainer(BaseTrainer):
dataset. If a ``preprocessor`` is provided and has not already been fit,
it will be fit on the training dataset. All datasets will be transformed
by the ``preprocessor`` if one is provided.
preprocessor: A ray.air.preprocessor.Preprocessor to preprocess the
preprocessor: A ray.data.Preprocessor to preprocess the
provided datasets.
resume_from_checkpoint: A checkpoint to resume training from.
"""


@@ -13,7 +13,7 @@ from ray.train.constants import MODEL_KEY, TRAIN_DATASET_KEY
if TYPE_CHECKING:
import xgboost_ray
from ray.air.preprocessor import Preprocessor
from ray.data.preprocessor import Preprocessor
def _convert_scaling_config_to_ray_params(
@@ -58,7 +58,7 @@ class GBDTTrainer(BaseTrainer):
:class:`xgboost_ray.RayDMatrix` initializations.
scaling_config: Configuration for how to scale data parallel training.
run_config: Configuration for the execution of the training run.
preprocessor: A ray.air.preprocessor.Preprocessor to preprocess the
preprocessor: A ray.data.Preprocessor to preprocess the
provided datasets.
resume_from_checkpoint: A checkpoint to resume training from.
**train_kwargs: Additional kwargs passed to framework ``train()`` function.


@@ -9,7 +9,7 @@ from ray.train.data_parallel_trainer import DataParallelTrainer
from ray.train.horovod.config import HorovodConfig
if TYPE_CHECKING:
from ray.air.preprocessor import Preprocessor
from ray.data.preprocessor import Preprocessor
class HorovodTrainer(DataParallelTrainer):
@@ -160,7 +160,7 @@ class HorovodTrainer(DataParallelTrainer):
dataset. If a ``preprocessor`` is provided and has not already been fit,
it will be fit on the training dataset. All datasets will be transformed
by the ``preprocessor`` if one is provided.
preprocessor: A ray.air.preprocessor.Preprocessor to preprocess the
preprocessor: A ray.data.Preprocessor to preprocess the
provided datasets.
resume_from_checkpoint: A checkpoint to resume training from.
"""


@@ -46,7 +46,7 @@ from ray.tune.trainable import Trainable
from ray.tune.utils.file_transfer import delete_on_node, sync_dir_between_nodes
if TYPE_CHECKING:
from ray.air.preprocessor import Preprocessor
from ray.data.preprocessor import Preprocessor
# This trainer uses a special checkpoint syncing logic.
# Because HF checkpoints are very large dirs (at least several GBs),
@@ -254,7 +254,7 @@ class HuggingFaceTrainer(TorchTrainer):
scaling_config: Configuration for how to scale data parallel training.
dataset_config: Configuration for dataset ingest.
run_config: Configuration for the execution of the training run.
preprocessor: A ray.air.preprocessor.Preprocessor to preprocess the
preprocessor: A ray.data.Preprocessor to preprocess the
provided datasets.
resume_from_checkpoint: A checkpoint to resume training from.
"""


@@ -12,7 +12,7 @@ import lightgbm_ray
from lightgbm_ray.tune import TuneReportCheckpointCallback
if TYPE_CHECKING:
from ray.air.preprocessor import Preprocessor
from ray.data.preprocessor import Preprocessor
@PublicAPI(stability="alpha")
@@ -61,7 +61,7 @@ class LightGBMTrainer(GBDTTrainer):
can be used to add sample weights with the ``weights`` parameter.
scaling_config: Configuration for how to scale data parallel training.
run_config: Configuration for the execution of the training run.
preprocessor: A ray.air.preprocessor.Preprocessor to preprocess the
preprocessor: A ray.data.Preprocessor to preprocess the
provided datasets.
resume_from_checkpoint: A checkpoint to resume training from.
**train_kwargs: Additional kwargs passed to ``lightgbm.train()`` function.


@@ -21,7 +21,7 @@ from ray.util.annotations import PublicAPI
from ray.util.ml_utils.dict import merge_dicts
if TYPE_CHECKING:
from ray.air.preprocessor import Preprocessor
from ray.data.preprocessor import Preprocessor
RL_TRAINER_CLASS_FILE = "trainer_class.pkl"
RL_CONFIG_FILE = "config.pkl"


@@ -32,7 +32,7 @@ from sklearn.model_selection import BaseCrossValidator, cross_validate
from sklearn.model_selection._validation import _check_multimetric_scoring, _score
if TYPE_CHECKING:
from ray.air.preprocessor import Preprocessor
from ray.data.preprocessor import Preprocessor
logger = logging.getLogger(__name__)
@@ -152,7 +152,7 @@ class SklearnTrainer(BaseTrainer):
as the training is not distributed.
dataset_config: Configuration for dataset ingest.
run_config: Configuration for the execution of the training run.
preprocessor: A ray.air.preprocessor.Preprocessor to preprocess the
preprocessor: A ray.data.Preprocessor to preprocess the
provided datasets.
**fit_params: Additional kwargs passed to ``estimator.fit()``
method.


@@ -9,7 +9,7 @@ from ray.air.checkpoint import Checkpoint
from ray.util import PublicAPI
if TYPE_CHECKING:
from ray.air.preprocessor import Preprocessor
from ray.data.preprocessor import Preprocessor
@PublicAPI(stability="alpha")
@@ -153,7 +153,7 @@ class TensorflowTrainer(DataParallelTrainer):
dataset. If a ``preprocessor`` is provided and has not already been fit,
it will be fit on the training dataset. All datasets will be transformed
by the ``preprocessor`` if one is provided.
preprocessor: A ray.air.preprocessor.Preprocessor to preprocess the
preprocessor: A ray.data.Preprocessor to preprocess the
provided datasets.
resume_from_checkpoint: A checkpoint to resume training from.
"""


@@ -3,7 +3,7 @@ import pytest
import ray
from ray import tune
from ray.air.preprocessor import Preprocessor
from ray.data.preprocessor import Preprocessor
from ray.train.trainer import BaseTrainer
from ray.util.placement_group import get_current_placement_group


@@ -5,8 +5,8 @@ from ray import train, tune
from ray.air.checkpoint import Checkpoint
from ray.train.constants import PREPROCESSOR_KEY
from ray.data.preprocessor import Preprocessor
from ray.train.data_parallel_trainer import DataParallelTrainer
from ray.air.preprocessor import Preprocessor
from ray.tune.tune_config import TuneConfig
from ray.tune.tuner import Tuner


@@ -9,8 +9,8 @@ from ray import tune
from ray.air.checkpoint import Checkpoint
from ray.train.constants import TRAIN_DATASET_KEY
from ray.data.preprocessor import Preprocessor
from ray.train.lightgbm import LightGBMTrainer, load_checkpoint
from ray.air.preprocessor import Preprocessor
from sklearn.datasets import load_breast_cancer
from sklearn.model_selection import train_test_split


@@ -7,7 +7,7 @@ from ray.air.checkpoint import Checkpoint
from ray.train.constants import TRAIN_DATASET_KEY
from ray.train.sklearn import SklearnTrainer, load_checkpoint
from ray.air.preprocessor import Preprocessor
from ray.data.preprocessor import Preprocessor
from sklearn.datasets import load_breast_cancer
from sklearn.model_selection import train_test_split


@@ -10,7 +10,7 @@ from ray.air.checkpoint import Checkpoint
from ray.train.constants import TRAIN_DATASET_KEY
from ray.train.xgboost import XGBoostTrainer, load_checkpoint
from ray.air.preprocessor import Preprocessor
from ray.data.preprocessor import Preprocessor
from sklearn.datasets import load_breast_cancer
from sklearn.model_selection import train_test_split


@@ -10,7 +10,7 @@ from ray.air._internal.torch_utils import load_torch_model
from ray.util import PublicAPI
if TYPE_CHECKING:
from ray.air.preprocessor import Preprocessor
from ray.data.preprocessor import Preprocessor
@PublicAPI(stability="alpha")
@@ -163,7 +163,7 @@ class TorchTrainer(DataParallelTrainer):
dataset. If a ``preprocessor`` is provided and has not already been fit,
it will be fit on the training dataset. All datasets will be transformed
by the ``preprocessor`` if one is provided.
preprocessor: A ``ray.air.preprocessor.Preprocessor`` to preprocess the
preprocessor: A ``ray.data.Preprocessor`` to preprocess the
provided datasets.
resume_from_checkpoint: A checkpoint to resume training from.
"""


@@ -12,7 +12,7 @@ import xgboost_ray
from xgboost_ray.tune import TuneReportCheckpointCallback
if TYPE_CHECKING:
from ray.air.preprocessor import Preprocessor
from ray.data.preprocessor import Preprocessor
@PublicAPI(stability="alpha")
@@ -57,7 +57,7 @@ class XGBoostTrainer(GBDTTrainer):
be used to add sample weights with the ``weights`` parameter.
scaling_config: Configuration for how to scale data parallel training.
run_config: Configuration for the execution of the training run.
preprocessor: A ray.air.preprocessor.Preprocessor to preprocess the
preprocessor: A ray.data.Preprocessor to preprocess the
provided datasets.
resume_from_checkpoint: A checkpoint to resume training from.
**train_kwargs: Additional kwargs passed to ``xgboost.train()`` function.