.. _transforming_datasets:
=====================
Transforming Datasets
=====================

Datasets transformations take in datasets and produce new datasets. For example,
*map* is a transformation that applies a user-defined function (UDF) to each dataset
record and returns a new dataset as the result. Datasets transformations can be
composed to express a chain of computations.

.. _transform_datasets_transformations:

---------------
Transformations
---------------
There are two main types of transformations:

* One-to-one: each input block will contribute to only one output
  block, such as :meth:`ds.map_batches() <ray.data.Dataset.map_batches>`.
* All-to-all: input blocks can contribute to multiple output blocks,
  such as :meth:`ds.random_shuffle() <ray.data.Dataset.random_shuffle>`.
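
For instance, the two types compose naturally. Below is a minimal sketch (using a
toy integer dataset) that chains a one-to-one transformation with an all-to-all one:

.. code-block:: python

    import ray

    ds = ray.data.range(1000)  # A simple dataset of the integers 0..999.

    # One-to-one: each input block contributes to exactly one output block.
    # With the default "native" batch format, each batch of a simple dataset
    # is a Python list.
    ds = ds.map_batches(lambda batch: [x * 2 for x in batch])

    # All-to-all: every input block can contribute to every output block.
    ds = ds.random_shuffle()
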
Here is a table listing some common transformations supported by Ray Datasets.

.. list-table:: Common Ray Datasets transformations.
   :header-rows: 1

   * - Transformation
     - Type
     - Description
   * - :meth:`ds.map_batches() <ray.data.Dataset.map_batches>`
     - One-to-one
     - Apply a given function to batches of records of this dataset.
   * - :meth:`ds.add_column() <ray.data.Dataset.add_column>`
     - One-to-one
     - Apply a given function to batches of records to create a new column.
   * - :meth:`ds.drop_columns() <ray.data.Dataset.drop_columns>`
     - One-to-one
     - Drop the given columns from the dataset.
   * - :meth:`ds.split() <ray.data.Dataset.split>`
     - One-to-one
     - | Split the dataset into N disjoint pieces.
   * - :meth:`ds.repartition(shuffle=False) <ray.data.Dataset.repartition>`
     - One-to-one
     - | Repartition the dataset into N blocks, without shuffling the data.
   * - :meth:`ds.repartition(shuffle=True) <ray.data.Dataset.repartition>`
     - All-to-all
     - | Repartition the dataset into N blocks, shuffling the data during repartitioning.
   * - :meth:`ds.random_shuffle() <ray.data.Dataset.random_shuffle>`
     - All-to-all
     - | Randomly shuffle the elements of this dataset.
   * - :meth:`ds.sort() <ray.data.Dataset.sort>`
     - All-to-all
     - | Sort the dataset by a sort key.
   * - :meth:`ds.groupby() <ray.data.Dataset.groupby>`
     - All-to-all
     - | Group the dataset by a group key.
.. tip::

    Datasets also provides the convenience transformation methods
    :meth:`ds.map() <ray.data.Dataset.map>`,
    :meth:`ds.flat_map() <ray.data.Dataset.flat_map>`, and
    :meth:`ds.filter() <ray.data.Dataset.filter>`, which are not vectorized
    (and so slower than :meth:`ds.map_batches() <ray.data.Dataset.map_batches>`),
    but may be useful for development.

The following is an example of using these transformation APIs to process
the Iris dataset.

.. literalinclude:: ./doc_code/transforming_datasets.py
  :language: python
  :start-after: __dataset_transformation_begin__
  :end-before: __dataset_transformation_end__

.. _transform_datasets_writing_udfs:

------------
Writing UDFs
------------
User-defined functions (UDFs) are routines that apply to one row (e.g.
:meth:`.map() <ray.data.Dataset.map>`) or a batch of rows (e.g.
:meth:`.map_batches() <ray.data.Dataset.map_batches>`) of a dataset. UDFs let you
express your customized business logic in transformations. Here we will focus on
:meth:`.map_batches() <ray.data.Dataset.map_batches>` as it's the primary mapping
API in Datasets.
Here are the basics that you need to know about UDFs:

* A UDF can be either a function, or, if using the
  :ref:`actor compute strategy <transform_datasets_compute_strategy>`, a
  :ref:`callable class <transform_datasets_callable_classes>`.
* Select the UDF input :ref:`batch format <transform_datasets_batch_formats>`
  using the ``batch_format`` argument.
* The UDF output type determines the Dataset schema of the transformation result.
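
For example, here is a minimal sketch of a function UDF applied with
:meth:`ds.map_batches() <ray.data.Dataset.map_batches>`; the ``age`` column is a
made-up example field:

.. code-block:: python

    import pandas as pd
    import ray

    # A small tabular dataset with a hypothetical "age" column.
    ds = ray.data.from_pandas(pd.DataFrame({"age": [21, 35, 47]}))

    def add_one_year(batch: pd.DataFrame) -> pd.DataFrame:
        batch["age"] = batch["age"] + 1
        return batch

    # batch_format="pandas" selects the UDF input batch format; returning a
    # pandas.DataFrame makes the result a Tabular dataset.
    ds = ds.map_batches(add_one_year, batch_format="pandas")
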
.. _transform_datasets_callable_classes:

Callable Class UDFs
===================

When using the actor compute strategy, per-row and per-batch UDFs can also be
*callable classes*, i.e. classes that implement the ``__call__`` magic method. The
constructor of the class can be used for stateful setup, and it will only be invoked
once per worker actor.

.. note::
  These transformation APIs take the uninstantiated callable class as an argument,
  not an instance of the class.

.. literalinclude:: ./doc_code/transforming_datasets.py
  :language: python
  :start-after: __writing_callable_classes_udfs_begin__
  :end-before: __writing_callable_classes_udfs_end__

.. _transform_datasets_batch_formats:

UDF Input Batch Format
======================

Choose the *batch format* of the data given to UDFs by setting the
``batch_format`` option of :meth:`.map_batches() <ray.data.Dataset.map_batches>`.
Here is an overview of the available batch formats:
.. tabbed:: "native" (default)

    The "native" batch format presents data as follows for each Dataset type:

    * **Tabular Datasets**: Each batch will be a
      `pandas.DataFrame <https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html>`__.
      This may incur a conversion cost if the underlying Dataset block is not
      zero-copy convertible from an Arrow table.

    * **Tensor Datasets** (single-column): Each batch will be a single
      `numpy.ndarray <https://numpy.org/doc/stable/reference/generated/numpy.ndarray.html>`__
      containing the single tensor column for this batch.

    * **Simple Datasets**: Each batch will be a Python list.

    .. literalinclude:: ./doc_code/transforming_datasets.py
      :language: python
      :start-after: __writing_native_udfs_begin__
      :end-before: __writing_native_udfs_end__

.. tabbed:: "pandas"

    The ``"pandas"`` batch format presents batches in
    `pandas.DataFrame <https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html>`__
    format. If converting a simple dataset to Pandas DataFrame batches, a single-column
    dataframe with the column ``"__value__"`` will be created.

    .. literalinclude:: ./doc_code/transforming_datasets.py
      :language: python
      :start-after: __writing_pandas_udfs_begin__
      :end-before: __writing_pandas_udfs_end__

.. tabbed:: "pyarrow"

    The ``"pyarrow"`` batch format presents batches in
    `pyarrow.Table <https://arrow.apache.org/docs/python/generated/pyarrow.Table.html>`__
    format. If converting a simple dataset to Arrow Table batches, a single-column table
    with the column ``"__value__"`` will be created.

    .. literalinclude:: ./doc_code/transforming_datasets.py
      :language: python
      :start-after: __writing_arrow_udfs_begin__
      :end-before: __writing_arrow_udfs_end__

.. tabbed:: "numpy"

    The ``"numpy"`` batch format presents batches in
    `numpy.ndarray <https://numpy.org/doc/stable/reference/generated/numpy.ndarray.html>`__
    format as follows:

    * **Tabular Datasets**: Each batch will be a dictionary of NumPy
      ndarrays (``Dict[str, np.ndarray]``), with each key-value pair representing a
      column in the table.

    * **Tensor Datasets** (single-column): Each batch will be a single
      `numpy.ndarray <https://numpy.org/doc/stable/reference/generated/numpy.ndarray.html>`__
      containing the single tensor column for this batch.

    * **Simple Datasets**: Each batch will be a single NumPy ndarray, where Datasets will
      attempt to convert each list-batch to an ndarray.

    .. literalinclude:: ./doc_code/transforming_datasets.py
      :language: python
      :start-after: __writing_numpy_udfs_begin__
      :end-before: __writing_numpy_udfs_end__

.. tip::

    Prefer using vectorized operations on the ``pandas.DataFrame``,
    ``pyarrow.Table``, and ``numpy.ndarray`` types for better performance. For
    example, suppose you want to compute the sum of a column in a ``pandas.DataFrame``:
    instead of iterating over each row of a batch and summing up the values of that
    column, use ``df_batch["col_foo"].sum()``.
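
As a minimal sketch of this advice (the ``col_foo`` column and its values are made
up for illustration):

.. code-block:: python

    import pandas as pd
    import ray

    ds = ray.data.from_pandas(pd.DataFrame({"col_foo": [1, 2, 3, 4]}))

    def with_foo_sum(batch: pd.DataFrame) -> pd.DataFrame:
        # Vectorized: sums the whole column in one call, rather than
        # iterating over the batch row by row.
        batch["foo_sum"] = batch["col_foo"].sum()
        return batch

    ds = ds.map_batches(with_foo_sum, batch_format="pandas")
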
.. _transform_datasets_batch_output_types:

Batch UDF Output Types
======================

The following output types are allowed for batch UDFs (e.g.,
:meth:`ds.map_batches() <ray.data.Dataset.map_batches>`). Here is how each output
type is interpreted to create the transformation result:
.. tabbed:: pd.DataFrame

    Returning ``pd.DataFrame`` creates a Tabular dataset as the transformation result:

    .. literalinclude:: ./doc_code/transforming_datasets.py
      :language: python
      :start-after: __writing_pandas_out_udfs_begin__
      :end-before: __writing_pandas_out_udfs_end__

.. tabbed:: pa.Table

    Returning ``pa.Table`` creates a Tabular dataset as the transformation result:

    .. literalinclude:: ./doc_code/transforming_datasets.py
      :language: python
      :start-after: __writing_arrow_out_udfs_begin__
      :end-before: __writing_arrow_out_udfs_end__

.. tabbed:: np.ndarray

    Returning ``np.ndarray`` creates a single-column Tensor dataset as the
    transformation result:

    .. literalinclude:: ./doc_code/transforming_datasets.py
      :language: python
      :start-after: __writing_numpy_out_udfs_begin__
      :end-before: __writing_numpy_out_udfs_end__

.. tabbed:: Dict[str, np.ndarray]

    Returning ``Dict[str, np.ndarray]`` creates a multi-column Tensor dataset as the
    transformation result. If a column tensor is 1-dimensional, then the native Arrow
    1D list type is used; if a column tensor has 2 or more dimensions, then the Dataset
    :ref:`tensor extension type <dataset-tensor-extension-api>` is used to embed these
    n-dimensional tensors in the Arrow table.

    .. literalinclude:: ./doc_code/transforming_datasets.py
      :language: python
      :start-after: __writing_numpy_dict_out_udfs_begin__
      :end-before: __writing_numpy_dict_out_udfs_end__

.. tabbed:: list

    Returning ``list`` creates a simple Python object dataset as the transformation
    result:

    .. literalinclude:: ./doc_code/transforming_datasets.py
      :language: python
      :start-after: __writing_simple_out_udfs_begin__
      :end-before: __writing_simple_out_udfs_end__

.. _transform_datasets_row_output_types:

Row UDF Output Types
====================

The following output types are allowed for per-row UDFs (e.g.,
:meth:`ds.map() <ray.data.Dataset.map>`):
.. tabbed:: dict

    Returning a ``dict`` of Arrow-compatible data types creates a Tabular dataset
    as the transformation result. If any dict values are not Arrow-compatible, then
    a simple Python object dataset will be created:

    .. literalinclude:: ./doc_code/transforming_datasets.py
      :language: python
      :start-after: __writing_dict_out_row_udfs_begin__
      :end-before: __writing_dict_out_row_udfs_end__

.. tabbed:: np.ndarray

    Returning ``np.ndarray`` creates a single-column Tensor dataset as the
    transformation result:

    .. literalinclude:: ./doc_code/transforming_datasets.py
      :language: python
      :start-after: __writing_numpy_out_row_udfs_begin__
      :end-before: __writing_numpy_out_row_udfs_end__

.. tabbed:: object

    Other return row types will create a simple Python object dataset as the
    transformation result:

    .. literalinclude:: ./doc_code/transforming_datasets.py
      :language: python
      :start-after: __writing_simple_out_row_udfs_begin__
      :end-before: __writing_simple_out_row_udfs_end__

.. _transform_datasets_compute_strategy:

----------------
Compute Strategy
----------------
Datasets transformations are executed by either :ref:`Ray tasks <ray-remote-functions>`
or :ref:`Ray actors <actor-guide>` across a Ray cluster. By default, Ray tasks are
used (with ``compute="tasks"``). For transformations that require expensive setup,
it's preferable to use Ray actors, which are stateful and allow setup to be reused
for efficiency. You can specify ``compute=ray.data.ActorPoolStrategy(min, max)``, and
Ray will use an autoscaling actor pool of ``min`` to ``max`` actors to execute your
transforms. For a fixed-size actor pool, specify ``ActorPoolStrategy(n, n)``.
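
As a minimal sketch (the setup cost here is a hypothetical stand-in), a callable
class UDF run on an autoscaling actor pool might look like this:

.. code-block:: python

    import ray
    from ray.data import ActorPoolStrategy

    class AddOffsetUDF:
        def __init__(self):
            # Stand-in for expensive setup (e.g., loading a model); this
            # runs once per worker actor, not once per batch.
            self.offset = 100

        def __call__(self, batch):
            # Default "native" batch format on a simple dataset: a list.
            return [x + self.offset for x in batch]

    ds = ray.data.range(1000)
    # An autoscaling pool of 2 to 8 actors executes the transformation.
    # Note: the uninstantiated class is passed, not an instance.
    ds = ds.map_batches(AddOffsetUDF, compute=ActorPoolStrategy(2, 8))
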
The following is a fuller example of using the tasks and actors compute strategies
for batch inference:

.. literalinclude:: ./doc_code/transforming_datasets.py
  :language: python
  :start-after: __dataset_compute_strategy_begin__
  :end-before: __dataset_compute_strategy_end__