Revamp the Transforming Datasets user guide (#25033)

This commit is contained in:
Jian Xiao 2022-05-20 19:25:06 -07:00 committed by GitHub
parent e5838c4700
commit ad842ec9ab
3 changed files with 181 additions and 48 deletions

doc/source/data/doc_code/transforming_datasets.py

@@ -0,0 +1,112 @@
# flake8: noqa
# fmt: off
# __dataset_transformation_begin__
import ray
import pandas
# Create a dataset from file with Iris data.
# Tip: "example://" is a convenient protocol to access the
# python/ray/data/examples/data directory.
ds = ray.data.read_csv("example://iris.csv")
# Dataset(num_blocks=1, num_rows=150,
# schema={sepal.length: float64, sepal.width: float64,
# petal.length: float64, petal.width: float64, variety: object})
ds.show(3)
# -> {'sepal.length': 5.1, 'sepal.width': 3.5,
# 'petal.length': 1.4, 'petal.width': 0.2, 'variety': 'Setosa'}
# -> {'sepal.length': 4.9, 'sepal.width': 3.0,
# 'petal.length': 1.4, 'petal.width': 0.2, 'variety': 'Setosa'}
# -> {'sepal.length': 4.7, 'sepal.width': 3.2,
# 'petal.length': 1.3, 'petal.width': 0.2, 'variety': 'Setosa'}
# Repartition the dataset to 5 blocks.
ds = ds.repartition(5)
# Dataset(num_blocks=5, num_rows=150,
# schema={sepal.length: double, sepal.width: double,
# petal.length: double, petal.width: double, variety: string})
# Find rows with sepal.length < 5.5 and petal.length > 3.5.
def transform_batch(df: pandas.DataFrame) -> pandas.DataFrame:
    return df[(df["sepal.length"] < 5.5) & (df["petal.length"] > 3.5)]
# Apply the transformation to batches of the dataset.
ds.map_batches(transform_batch).show()
# -> {'sepal.length': 5.2, 'sepal.width': 2.7,
# 'petal.length': 3.9, 'petal.width': 1.4, 'variety': 'Versicolor'}
# -> {'sepal.length': 5.4, 'sepal.width': 3.0,
# 'petal.length': 4.5, 'petal.width': 1.5, 'variety': 'Versicolor'}
# -> {'sepal.length': 4.9, 'sepal.width': 2.5,
# 'petal.length': 4.5, 'petal.width': 1.7, 'variety': 'Virginica'}
# Split the dataset into 2 datasets
ds.split(2)
# -> [Dataset(num_blocks=3, num_rows=90,
# schema={sepal.length: double, sepal.width: double,
# petal.length: double, petal.width: double, variety: string}),
# Dataset(num_blocks=2, num_rows=60,
# schema={sepal.length: double, sepal.width: double,
# petal.length: double, petal.width: double, variety: string})]
# Sort the dataset by sepal.length.
ds = ds.sort("sepal.length")
ds.show(3)
# -> {'sepal.length': 4.3, 'sepal.width': 3.0,
# 'petal.length': 1.1, 'petal.width': 0.1, 'variety': 'Setosa'}
# -> {'sepal.length': 4.4, 'sepal.width': 2.9,
# 'petal.length': 1.4, 'petal.width': 0.2, 'variety': 'Setosa'}
# -> {'sepal.length': 4.4, 'sepal.width': 3.0,
# 'petal.length': 1.3, 'petal.width': 0.2, 'variety': 'Setosa'}
# Shuffle the dataset.
ds = ds.random_shuffle()
ds.show(3)
# -> {'sepal.length': 6.7, 'sepal.width': 3.1,
# 'petal.length': 4.4, 'petal.width': 1.4, 'variety': 'Versicolor'}
# -> {'sepal.length': 6.7, 'sepal.width': 3.3,
# 'petal.length': 5.7, 'petal.width': 2.1, 'variety': 'Virginica'}
# -> {'sepal.length': 4.5, 'sepal.width': 2.3,
# 'petal.length': 1.3, 'petal.width': 0.3, 'variety': 'Setosa'}
# Group by the variety.
ds.groupby("variety").count().show()
# -> {'variety': 'Setosa', 'count()': 50}
# -> {'variety': 'Versicolor', 'count()': 50}
# -> {'variety': 'Virginica', 'count()': 50}
# __dataset_transformation_end__
# fmt: on
# fmt: off
# __dataset_compute_strategy_begin__
import ray
import pandas
import numpy
from ray.data import ActorPoolStrategy
# Dummy model to predict Iris variety.
def predict_iris(df: pandas.DataFrame) -> pandas.DataFrame:
    conditions = [
        (df["sepal.length"] < 5.0),
        (df["sepal.length"] >= 5.0) & (df["sepal.length"] < 6.0),
        (df["sepal.length"] >= 6.0)
    ]
    values = ["Setosa", "Versicolor", "Virginica"]
    return pandas.DataFrame({"predicted_variety": numpy.select(conditions, values)})
class IrisInferModel:
    def __init__(self):
        self._model = predict_iris

    def __call__(self, batch: pandas.DataFrame) -> pandas.DataFrame:
        return self._model(batch)
ds = ray.data.read_csv("example://iris.csv").repartition(10)
# Batch inference processing with Ray tasks (the default compute strategy).
predicted = ds.map_batches(predict_iris)
# Batch inference processing with Ray actors. Autoscale the actors between 3 and 10.
predicted = ds.map_batches(
    IrisInferModel, compute=ActorPoolStrategy(3, 10), batch_size=256)
# __dataset_compute_strategy_end__
# fmt: on

doc/source/data/saving_datasets.rst

@@ -6,7 +6,7 @@ Saving Datasets
Datasets can be written to local or remote storage in the desired data format.
The supported formats include Parquet, CSV, JSON, NumPy. To control the number
of output files, you may use :meth:`ds.repartition() <ray.data.Dataset.repartition>>`
of output files, you may use :meth:`ds.repartition() <ray.data.Dataset.repartition>`
to repartition the Dataset before writing out.
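For example, here is a minimal sketch (the output path is only illustrative) of
repartitioning a dataset down to a single block so that the write produces a single
output file:

.. code-block:: python

    import ray

    ds = ray.data.range_table(1000)
    # Collapse the dataset into one block; the write below then produces
    # a single output file (the directory path here is hypothetical).
    ds.repartition(1).write_csv("/tmp/single_file_output")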
.. tabbed:: Parquet

doc/source/data/transforming_datasets.rst

@@ -4,64 +4,85 @@
Transforming Datasets
=====================
Datasets can be transformed in parallel using ``.map_batches()``. Ray will transform
batches of records in the Dataset using the given function. The function must return
a batch of records. You are allowed to filter or add additional records to the batch,
which will change the size of the Dataset.
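For example, a minimal sketch (using a synthetic range dataset for illustration) of a
batch UDF that filters rows and therefore shrinks the Dataset:

.. code-block:: python

    import pandas
    import ray

    ds = ray.data.range_table(1000)

    # Keep only the even values; the returned batch is smaller than the input batch.
    def keep_even(df: pandas.DataFrame) -> pandas.DataFrame:
        return df[df["value"] % 2 == 0]

    ds = ds.map_batches(keep_even, batch_format="pandas")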
Ray Datasets transformations take in datasets and produce new datasets.
For example, *map* is a transformation that applies a user-defined function (UDF)
to each row of the input dataset and returns a new dataset as the result. Datasets
transformations are **composable**: operations can be further applied to the result
dataset, forming a chain of transformations that expresses more complex computations.
Transformations are the core means of expressing business logic in Datasets.
Transformations are executed *eagerly* and block until the operation is finished.
Transformations
---------------
.. code-block:: python
In general, we have two types of transformations (see the short sketch after this list):
def transform_batch(df: pandas.DataFrame) -> pandas.DataFrame:
    return df.applymap(lambda x: x * 2)
* **One-to-one transformations:** each input block contributes to only one output
  block, such as :meth:`ds.map_batches() <ray.data.Dataset.map_batches>`. In other
  systems these may be called narrow transformations.
* **All-to-all transformations:** input blocks can contribute to multiple output blocks,
  such as :meth:`ds.random_shuffle() <ray.data.Dataset.random_shuffle>`. In other
  systems these may be called wide transformations.
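As a rough sketch of the difference (the dataset and functions here are purely
illustrative):

.. code-block:: python

    import ray

    ds = ray.data.range(1000)
    # One-to-one: each input block yields exactly one output block.
    doubled = ds.map_batches(lambda batch: [x * 2 for x in batch])
    # All-to-all: each output block may draw records from many input blocks.
    shuffled = ds.random_shuffle()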
ds = ray.data.range_table(10000)
ds = ds.map_batches(transform_batch, batch_format="pandas")
# -> Map Progress: 100%|████████████████████| 200/200 [00:00<00:00, 1927.62it/s]
ds.take(5)
# -> [{'value': 0}, {'value': 2}, ...]
Here is a table listing some common transformations supported by Ray Datasets.
The batch format can be specified using the ``batch_format`` option, which defaults to "native",
meaning pandas format for Arrow-compatible batches and Python lists for other types. You
can also explicitly specify "arrow" or "pandas" to force a conversion to that batch format.
The batch size can also be chosen; if not given, it defaults to entire blocks.
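For example, a minimal sketch (the multiplier and sizes are only illustrative) of
requesting pandas-formatted batches with a fixed batch size:

.. code-block:: python

    import ray

    ds = ray.data.range_table(10000)
    # Process the dataset in pandas DataFrame batches of at most 1024 rows each.
    ds = ds.map_batches(lambda df: df * 2, batch_format="pandas", batch_size=1024)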
.. list-table:: Common Ray Datasets transformations.
:header-rows: 1
* - Transformation
- Type
- Description
* - :meth:`ds.map_batches() <ray.data.Dataset.map_batches>`
- One-to-one
- Apply a given function to batches of records of this dataset.
* - :meth:`ds.split() <ray.data.Dataset.split>`
- One-to-one
- | Split the dataset into N disjoint pieces.
* - :meth:`ds.repartition(shuffle=False) <ray.data.Dataset.repartition>`
- One-to-one
- | Repartition the dataset into N blocks, without shuffling the data.
* - :meth:`ds.repartition(shuffle=True) <ray.data.Dataset.repartition>`
- All-to-all
- | Repartition the dataset into N blocks, shuffling the data during repartition.
* - :meth:`ds.random_shuffle() <ray.data.Dataset.random_shuffle>`
- All-to-all
- | Randomly shuffle the elements of this dataset.
* - :meth:`ds.sort() <ray.data.Dataset.sort>`
- All-to-all
- | Sort the dataset by a sort key.
* - :meth:`ds.groupby() <ray.data.Dataset.groupby>`
- All-to-all
- | Group the dataset by a group key.
.. tip::
Datasets also provides the convenience methods ``map``, ``flat_map``, and ``filter``, which are not vectorized (slower than ``map_batches``), but may be useful for development.
Datasets also provides the convenience transformation methods :meth:`ds.map() <ray.data.Dataset.map>`,
:meth:`ds.flat_map() <ray.data.Dataset.flat_map>`, and :meth:`ds.filter() <ray.data.Dataset.filter>`,
which are not vectorized (slower than :meth:`ds.map_batches() <ray.data.Dataset.map_batches>`), but
may be useful for development; see the brief sketch after this tip.
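As a rough illustration of these per-record methods (the dataset and lambdas are only
examples):

.. code-block:: python

    import ray

    ds = ray.data.range(100)
    doubled = ds.map(lambda x: x * 2)          # transform one record at a time
    expanded = ds.flat_map(lambda x: [x, -x])  # emit zero or more records per input
    evens = ds.filter(lambda x: x % 2 == 0)    # keep records where the predicate holds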
By default, transformations are executed using Ray tasks.
For transformations that require setup, specify ``compute=ray.data.ActorPoolStrategy(min, max)`` and Ray will use an autoscaling actor pool of ``min`` to ``max`` actors to execute your transforms.
For a fixed-size actor pool, specify ``ActorPoolStrategy(n, n)``.
The following is an end-to-end example of reading, transforming, and saving batch inference results using Ray Data:
The following is an example of using these transformation APIs to process the
Iris dataset.
.. code-block:: python
.. literalinclude:: ./doc_code/transforming_datasets.py
:language: python
:start-after: __dataset_transformation_begin__
:end-before: __dataset_transformation_end__
from ray.data import ActorPoolStrategy
Compute Strategy
----------------
# Example of GPU batch inference on an ImageNet model.
def preprocess(image: bytes) -> bytes:
return image
Datasets transformations are executed by either :ref:`Ray tasks <ray-remote-functions>`
or :ref:`Ray actors <actor-guide>` across a Ray cluster. By default, Ray tasks are
used (with ``compute="tasks"``). For transformations that require expensive setup,
it's preferable to use Ray actors, which are stateful and allow setup to be reused
for efficiency. You can specify ``compute=ray.data.ActorPoolStrategy(min, max)`` and
Ray will use an autoscaling actor pool of ``min`` to ``max`` actors to execute your
transforms. For a fixed-size actor pool, just specify ``ActorPoolStrategy(n, n)``.
class BatchInferModel:
    def __init__(self):
        self.model = ImageNetModel()

    def __call__(self, batch: pd.DataFrame) -> pd.DataFrame:
        return self.model(batch)
The following is an example of using the Ray tasks and actors compute strategies
for batch inference:
ds = ray.data.read_binary_files("s3://bucket/image-dir")
# Preprocess the data.
ds = ds.map(preprocess)
# -> Map Progress: 100%|████████████████████| 200/200 [00:00<00:00, 1123.54it/s]
# Apply GPU batch inference with actors, and assign each actor a GPU using
# ``num_gpus=1`` (any Ray remote decorator argument can be used here).
ds = ds.map_batches(
    BatchInferModel, compute=ActorPoolStrategy(10, 20),
    batch_size=256, num_gpus=1)
# -> Map Progress (16 actors 4 pending): 100%|██████| 200/200 [00:07, 27.60it/s]
# Save the results.
ds.repartition(1).write_json("s3://bucket/inference-results")
.. literalinclude:: ./doc_code/transforming_datasets.py
:language: python
:start-after: __dataset_compute_strategy_begin__
:end-before: __dataset_compute_strategy_end__