diff --git a/doc/source/data/doc_code/transforming_datasets.py b/doc/source/data/doc_code/transforming_datasets.py
new file mode 100644
index 000000000..ba75d81a5
--- /dev/null
+++ b/doc/source/data/doc_code/transforming_datasets.py
@@ -0,0 +1,112 @@
+# flake8: noqa
+
+# fmt: off
+# __dataset_transformation_begin__
+import ray
+import pandas
+
+# Create a dataset from the Iris data file.
+# Tip: "example://" is a convenient protocol to access the
+# python/ray/data/examples/data directory.
+ds = ray.data.read_csv("example://iris.csv")
+# Dataset(num_blocks=1, num_rows=150,
+#         schema={sepal.length: float64, sepal.width: float64,
+#                 petal.length: float64, petal.width: float64, variety: object})
+ds.show(3)
+# -> {'sepal.length': 5.1, 'sepal.width': 3.5,
+#     'petal.length': 1.4, 'petal.width': 0.2, 'variety': 'Setosa'}
+# -> {'sepal.length': 4.9, 'sepal.width': 3.0,
+#     'petal.length': 1.4, 'petal.width': 0.2, 'variety': 'Setosa'}
+# -> {'sepal.length': 4.7, 'sepal.width': 3.2,
+#     'petal.length': 1.3, 'petal.width': 0.2, 'variety': 'Setosa'}
+
+# Repartition the dataset into 5 blocks.
+ds = ds.repartition(5)
+# Dataset(num_blocks=5, num_rows=150,
+#         schema={sepal.length: double, sepal.width: double,
+#                 petal.length: double, petal.width: double, variety: string})
+
+# Find rows with sepal.length < 5.5 and petal.length > 3.5.
+def transform_batch(df: pandas.DataFrame) -> pandas.DataFrame:
+    return df[(df["sepal.length"] < 5.5) & (df["petal.length"] > 3.5)]
+
+# Apply the transform to batches of the dataset.
+ds.map_batches(transform_batch).show()
+# -> {'sepal.length': 5.2, 'sepal.width': 2.7,
+#     'petal.length': 3.9, 'petal.width': 1.4, 'variety': 'Versicolor'}
+# -> {'sepal.length': 5.4, 'sepal.width': 3.0,
+#     'petal.length': 4.5, 'petal.width': 1.5, 'variety': 'Versicolor'}
+# -> {'sepal.length': 4.9, 'sepal.width': 2.5,
+#     'petal.length': 4.5, 'petal.width': 1.7, 'variety': 'Virginica'}
+
+# Split the dataset into 2 datasets.
+ds.split(2)
+# -> [Dataset(num_blocks=3, num_rows=90,
+#             schema={sepal.length: double, sepal.width: double,
+#                     petal.length: double, petal.width: double, variety: string}),
+#     Dataset(num_blocks=2, num_rows=60,
+#             schema={sepal.length: double, sepal.width: double,
+#                     petal.length: double, petal.width: double, variety: string})]
+
+# Sort the dataset by sepal.length.
+ds = ds.sort("sepal.length")
+ds.show(3)
+# -> {'sepal.length': 4.3, 'sepal.width': 3.0,
+#     'petal.length': 1.1, 'petal.width': 0.1, 'variety': 'Setosa'}
+# -> {'sepal.length': 4.4, 'sepal.width': 2.9,
+#     'petal.length': 1.4, 'petal.width': 0.2, 'variety': 'Setosa'}
+# -> {'sepal.length': 4.4, 'sepal.width': 3.0,
+#     'petal.length': 1.3, 'petal.width': 0.2, 'variety': 'Setosa'}
+
+# Shuffle the dataset.
+ds = ds.random_shuffle()
+ds.show(3)
+# -> {'sepal.length': 6.7, 'sepal.width': 3.1,
+#     'petal.length': 4.4, 'petal.width': 1.4, 'variety': 'Versicolor'}
+# -> {'sepal.length': 6.7, 'sepal.width': 3.3,
+#     'petal.length': 5.7, 'petal.width': 2.1, 'variety': 'Virginica'}
+# -> {'sepal.length': 4.5, 'sepal.width': 2.3,
+#     'petal.length': 1.3, 'petal.width': 0.3, 'variety': 'Setosa'}
+
+# Group the dataset by variety.
+ds.groupby("variety").count().show()
+# -> {'variety': 'Setosa', 'count()': 50}
+# -> {'variety': 'Versicolor', 'count()': 50}
+# -> {'variety': 'Virginica', 'count()': 50}
+# __dataset_transformation_end__
+# fmt: on
+
+# fmt: off
+# __dataset_compute_strategy_begin__
+import ray
+import pandas
+import numpy
+from ray.data import ActorPoolStrategy
+
+# Dummy model to predict the Iris variety.
+def predict_iris(df: pandas.DataFrame) -> pandas.DataFrame:
+    conditions = [
+        (df["sepal.length"] < 5.0),
+        (df["sepal.length"] >= 5.0) & (df["sepal.length"] < 6.0),
+        (df["sepal.length"] >= 6.0),
+    ]
+    values = ["Setosa", "Versicolor", "Virginica"]
+    return pandas.DataFrame({"predicted_variety": numpy.select(conditions, values)})
+
+class IrisInferModel:
+    def __init__(self):
+        self._model = predict_iris
+
+    def __call__(self, batch: pandas.DataFrame) -> pandas.DataFrame:
+        return self._model(batch)
+
+ds = ray.data.read_csv("example://iris.csv").repartition(10)
+
+# Batch inference processing with Ray tasks (the default compute strategy).
+predicted = ds.map_batches(predict_iris)
+
+# Batch inference processing with Ray actors. Autoscale the actors between 3 and 10.
+predicted = ds.map_batches(
+    IrisInferModel, compute=ActorPoolStrategy(3, 10), batch_size=256)
+# __dataset_compute_strategy_end__
+# fmt: on
diff --git a/doc/source/data/saving-datasets.rst b/doc/source/data/saving-datasets.rst
index 07a070db5..590971e4c 100644
--- a/doc/source/data/saving-datasets.rst
+++ b/doc/source/data/saving-datasets.rst
@@ -6,7 +6,7 @@ Saving Datasets
 
 Datasets can be written to local or remote storage in the desired data format.
 The supported formats include Parquet, CSV, JSON, NumPy. To control the number
-of output files, you may use :meth:`ds.repartition() >`
+of output files, you may use :meth:`ds.repartition() <ray.data.Dataset.repartition>`
 to repartition the Dataset before writing out.
 
 .. tabbed:: Parquet
diff --git a/doc/source/data/transforming-datasets.rst b/doc/source/data/transforming-datasets.rst
index 9cfb65981..6f4261880 100644
--- a/doc/source/data/transforming-datasets.rst
+++ b/doc/source/data/transforming-datasets.rst
@@ -4,64 +4,85 @@
 Transforming Datasets
 =====================
 
-Datasets can be transformed in parallel using ``.map_batches()``. Ray will transform
-batches of records in the Dataset using the given function. The function must return
-a batch of records. You are allowed to filter or add additional records to the batch,
-which will change the size of the Dataset.
+Ray Datasets transformations take in datasets and produce new datasets. For
+example, *map* is a transformation that applies a user-defined function (UDF)
+to each row of the input dataset and returns a new dataset as the result.
+Datasets transformations are **composable**: further operations can be applied
+to the result dataset, forming a chain of transformations that expresses more
+complex computations. Transformations are the core means of expressing business
+logic in Datasets.
 
-Transformations are executed *eagerly* and block until the operation is finished.
+Transformations
+---------------
 
-.. code-block:: python
+In general, there are two types of transformations:
 
-    def transform_batch(df: pandas.DataFrame) -> pandas.DataFrame:
-        return df.applymap(lambda x: x * 2)
+* **One-to-one transformations:** each input block contributes to only one output
+  block, such as :meth:`ds.map_batches() <ray.data.Dataset.map_batches>`. In other
+  systems these may be called narrow transformations.
+* **All-to-all transformations:** input blocks can contribute to multiple output blocks,
+  such as :meth:`ds.random_shuffle() <ray.data.Dataset.random_shuffle>`. In other
+  systems these may be called wide transformations.
 
-    ds = ray.data.range_table(10000)
-    ds = ds.map_batches(transform_batch, batch_format="pandas")
-    # -> Map Progress: 100%|████████████████████| 200/200 [00:00<00:00, 1927.62it/s]
-    ds.take(5)
-    # -> [{'value': 0}, {'value': 2}, ...]
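+For example, a one-to-one transformation can be chained with an all-to-all
+transformation. The short sketch below uses a hypothetical UDF ``double_batch``
+purely for illustration:
+
+.. code-block:: python
+
+    import pandas
+    import ray
+
+    # A one-to-one transformation: double every value in each batch.
+    def double_batch(df: pandas.DataFrame) -> pandas.DataFrame:
+        return df * 2
+
+    ds = ray.data.range_table(1000)
+    # Chain a one-to-one transformation with an all-to-all transformation.
+    ds = ds.map_batches(double_batch, batch_format="pandas").random_shuffle()
+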
+The following table lists some common transformations supported by Ray Datasets.
 
-The batch format can be specified using ``batch_format`` option, which defaults to "native",
-meaning pandas format for Arrow-compatible batches, and Python lists for other types. You
-can also specify explicitly "arrow" or "pandas" to force a conversion to that batch format.
-The batch size can also be chosen. If not given, the batch size will default to entire blocks.
+.. list-table:: Common Ray Datasets transformations.
+   :header-rows: 1
+
+   * - Transformation
+     - Type
+     - Description
+   * - :meth:`ds.map_batches() <ray.data.Dataset.map_batches>`
+     - One-to-one
+     - Apply a given function to batches of records of this dataset.
+   * - :meth:`ds.split() <ray.data.Dataset.split>`
+     - One-to-one
+     - Split the dataset into N disjoint pieces.
+   * - :meth:`ds.repartition(shuffle=False) <ray.data.Dataset.repartition>`
+     - One-to-one
+     - Repartition the dataset into N blocks, without shuffling the data.
+   * - :meth:`ds.repartition(shuffle=True) <ray.data.Dataset.repartition>`
+     - All-to-all
+     - Repartition the dataset into N blocks, shuffling the data during repartition.
+   * - :meth:`ds.random_shuffle() <ray.data.Dataset.random_shuffle>`
+     - All-to-all
+     - Randomly shuffle the elements of this dataset.
+   * - :meth:`ds.sort() <ray.data.Dataset.sort>`
+     - All-to-all
+     - Sort the dataset by a sort key.
+   * - :meth:`ds.groupby() <ray.data.Dataset.groupby>`
+     - All-to-all
+     - Group the dataset by a group key.
 
 .. tip::
 
-    Datasets also provides the convenience methods ``map``, ``flat_map``, and ``filter``, which are not vectorized (slower than ``map_batches``), but may be useful for development.
+    Datasets also provides the convenience transformation methods :meth:`ds.map() <ray.data.Dataset.map>`,
+    :meth:`ds.flat_map() <ray.data.Dataset.flat_map>`, and :meth:`ds.filter() <ray.data.Dataset.filter>`,
+    which are not vectorized (slower than :meth:`ds.map_batches() <ray.data.Dataset.map_batches>`), but
+    may be useful for development.
 
-By default, transformations are executed using Ray tasks.
-For transformations that require setup, specify ``compute=ray.data.ActorPoolStrategy(min, max)`` and Ray will use an autoscaling actor pool of ``min`` to ``max`` actors to execute your transforms.
-For a fixed-size actor pool, specify ``ActorPoolStrategy(n, n)``.
-The following is an end-to-end example of reading, transforming, and saving batch inference results using Ray Data:
+The following example uses these transformation APIs to process the Iris dataset.
 
-.. code-block:: python
+.. literalinclude:: ./doc_code/transforming_datasets.py
+  :language: python
+  :start-after: __dataset_transformation_begin__
+  :end-before: __dataset_transformation_end__
 
-    from ray.data import ActorPoolStrategy
+Compute Strategy
+----------------
 
-    # Example of GPU batch inference on an ImageNet model.
-    def preprocess(image: bytes) -> bytes:
-        return image
+Datasets transformations are executed by either :ref:`Ray tasks <ray-remote-functions>`
+or :ref:`Ray actors <ray-remote-classes>` across a Ray cluster. By default, Ray tasks are
+used (with ``compute="tasks"``). For transformations that require expensive setup,
+it is preferable to use Ray actors, which are stateful and allow the setup to be reused
+for efficiency. You can specify ``compute=ray.data.ActorPoolStrategy(min, max)`` and
+Ray will use an autoscaling actor pool of ``min`` to ``max`` actors to execute your
+transforms. For a fixed-size actor pool, just specify ``ActorPoolStrategy(n, n)``.
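+
+In short, the compute strategy is passed to the transformation call itself. The
+two-line sketch below assumes a hypothetical UDF ``my_udf`` and a hypothetical
+callable class ``MyCallableClass``:
+
+.. code-block:: python
+
+    # Stateless UDF executed by Ray tasks (the default compute strategy).
+    ds.map_batches(my_udf)
+
+    # Stateful callable class executed by an autoscaling pool of 2 to 8 Ray actors.
+    ds.map_batches(MyCallableClass, compute=ray.data.ActorPoolStrategy(2, 8))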
 
-    class BatchInferModel:
-        def __init__(self):
-            self.model = ImageNetModel()
-        def __call__(self, batch: pd.DataFrame) -> pd.DataFrame:
-            return self.model(batch)
+The following example shows batch inference with both the Ray tasks and Ray actors
+compute strategies:
 
-    ds = ray.data.read_binary_files("s3://bucket/image-dir")
-
-    # Preprocess the data.
-    ds = ds.map(preprocess)
-    # -> Map Progress: 100%|████████████████████| 200/200 [00:00<00:00, 1123.54it/s]
-
-    # Apply GPU batch inference with actors, and assign each actor a GPU using
-    # ``num_gpus=1`` (any Ray remote decorator argument can be used here).
-    ds = ds.map_batches(
-        BatchInferModel, compute=ActorPoolStrategy(10, 20),
-        batch_size=256, num_gpus=1)
-    # -> Map Progress (16 actors 4 pending): 100%|██████| 200/200 [00:07, 27.60it/s]
-
-    # Save the results.
-    ds.repartition(1).write_json("s3://bucket/inference-results")
+.. literalinclude:: ./doc_code/transforming_datasets.py
+  :language: python
+  :start-after: __dataset_compute_strategy_begin__
+  :end-before: __dataset_compute_strategy_end__
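+
+When running inference on a GPU cluster, Ray remote arguments such as ``num_gpus``
+can be passed to :meth:`ds.map_batches() <ray.data.Dataset.map_batches>` along with
+the actor compute strategy, so that each actor in the pool reserves a GPU. The
+sketch below assumes a hypothetical GPU-backed callable class ``GPUInferModel``:
+
+.. code-block:: python
+
+    # Reserve one GPU per actor in the autoscaling pool.
+    predicted = ds.map_batches(
+        GPUInferModel, compute=ActorPoolStrategy(3, 10),
+        batch_size=256, num_gpus=1)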