Ray Datasets is available in early preview at ``ray.experimental.data``.
Ray Datasets are the standard way to load and exchange data in Ray libraries and applications. Datasets provide basic distributed data transformations such as ``map``, ``filter``, and ``repartition``, and are compatible with a variety of file formats, datasources, and distributed frameworks.
Ray Datasets implement `"Distributed Arrow" <https://arrow.apache.org/>`__. A Dataset consists of a list of Ray object references to *blocks*. Each block holds a set of items in either `Arrow table <https://arrow.apache.org/docs/python/data.html#tables>`__ format or in a Python list (for Arrow incompatible objects). Having multiple blocks in a dataset allows for parallel transformation and ingest of the data.
The following figure visualizes a Dataset with three Arrow table blocks, each holding 1000 rows:
Since a Ray Dataset is just a list of Ray object references, it can be freely passed between Ray tasks, actors, and libraries like any other object reference. This flexibility is a unique characteristic of Ray Datasets.
Compared to `Spark RDDs <https://spark.apache.org/docs/latest/rdd-programming-guide.html>`__ and `Dask Bags <https://docs.dask.org/en/latest/bag.html>`__, Datasets offers a narrower set of features and executes operations eagerly for simplicity. For advanced operations, users are expected to convert a Dataset into a more fully featured dataframe type (e.g., via ``ds.to_dask()``).
Datasource Compatibility Matrices
---------------------------------
.. list-table:: Input compatibility matrix
   :header-rows: 1

   * - Input Type
     - Read API
     - Status
   * - CSV File Format
     - ``ray.data.read_csv()``
     - ✅
   * - JSON File Format
     - ``ray.data.read_json()``
     - ✅
   * - Parquet File Format
     - ``ray.data.read_parquet()``
     - ✅
   * - Binary Files
     - ``ray.data.read_binary_files()``
     - ✅
   * - Spark Dataframe
     - ``ray.data.from_spark()``
     - (todo)
   * - Dask Dataframe
     - ``ray.data.from_dask()``
     - ✅
   * - Modin Dataframe
     - ``ray.data.from_modin()``
     - (todo)
   * - MARS Dataframe
     - ``ray.data.from_mars()``
     - (todo)
   * - Pandas Dataframe Objects
     - ``ray.data.from_pandas()``
     - ✅
   * - Arrow Table Objects
     - ``ray.data.from_arrow()``
     - ✅
   * - Custom Datasource
     - ``ray.data.read_datasource()``
     - ✅
.. list-table:: Output compatibility matrix
   :header-rows: 1

   * - Output Type
     - Dataset API
     - Status
   * - CSV File Format
     - ``ds.write_csv()``
     - ✅
   * - JSON File Format
     - ``ds.write_json()``
     - ✅
   * - Parquet File Format
     - ``ds.write_parquet()``
     - ✅
   * - Spark Dataframe
     - ``ds.to_spark()``
     - (todo)
   * - Dask Dataframe
     - ``ds.to_dask()``
     - ✅
   * - Modin Dataframe
     - ``ds.to_modin()``
     - (todo)
   * - MARS Dataframe
     - ``ds.to_mars()``
     - (todo)
   * - Arrow Table Objects
     - ``ds.to_arrow()``
     - ✅
   * - Arrow Table Iterator
     - ``ds.iter_batches(batch_format="pyarrow")``
     - ✅
   * - Pandas Dataframe Objects
     - ``ds.to_pandas()``
     - ✅
   * - Pandas Dataframe Iterator
     - ``ds.iter_batches(batch_format="pandas")``
     - ✅
   * - PyTorch Iterable Dataset
     - ``ds.to_torch()``
     - ✅
   * - TensorFlow Iterable Dataset
     - ``ds.to_tf()``
     - ✅
   * - Custom Datasource
     - ``ds.write_datasource()``
     - ✅
Creating Datasets
-----------------
Get started by creating Datasets from synthetic data using ``ray.data.range()`` and ``ray.data.from_items()``. Datasets can hold either plain Python objects (the schema is a Python type) or Arrow records (the schema is an Arrow schema).
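For example, a minimal sketch (the sizes and column names are arbitrary, and the outputs shown in the comments are indicative):

.. code-block:: python

    import ray

    # A Dataset of plain Python objects; the schema is the Python type.
    ds = ray.data.range(10000)
    ds.take(5)   # -> [0, 1, 2, 3, 4]
    ds.count()   # -> 10000

    # A Dataset of Arrow records; the schema is an Arrow schema.
    ds = ray.data.from_items([{"col1": i, "col2": str(i)} for i in range(10000)])
    ds.show(3)
    ds.schema()  # -> col1: int64, col2: string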
Datasets can be created from files on local disk or remote datasources such as S3. Any filesystem `supported by pyarrow <http://arrow.apache.org/docs/python/generated/pyarrow.fs.FileSystem.html>`__ can be used to specify file locations:
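For example, a sketch of a few read calls (the local path, S3 bucket, and region below are placeholders, and the S3 reads assume credentials are already configured):

.. code-block:: python

    import ray
    import pyarrow.fs

    # Read a Parquet file from local disk (path is a placeholder).
    ds = ray.data.read_parquet("/tmp/data/example.parquet")

    # Read a directory of CSV files from S3 (bucket is a placeholder).
    ds = ray.data.read_csv("s3://example-bucket/csv-dir/")

    # An explicit pyarrow filesystem can also be supplied.
    ds = ray.data.read_parquet(
        "example-bucket/parquet-dir/",
        filesystem=pyarrow.fs.S3FileSystem(region="us-west-2"))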
Datasets can be transformed in parallel using ``.map()``. Transformations are executed *eagerly* and block until the operation is finished. Datasets also supports ``.filter()`` and ``.flat_map()``.
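For example, a minimal sketch using synthetic data:

.. code-block:: python

    import ray

    ds = ray.data.range(10000)

    # Apply a function to each record in parallel; this executes eagerly.
    ds2 = ds.map(lambda x: x * 2)

    # Keep only the records that satisfy a predicate.
    ds3 = ds2.filter(lambda x: x > 100)

    # Emit zero or more output records per input record.
    ds4 = ds3.flat_map(lambda x: [x, -x])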
To take advantage of vectorized functions, use ``.map_batches()``. Note that you can also implement ``filter`` and ``flat_map`` using ``.map_batches()``, since your map function can return an output batch of any size.
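A minimal sketch (the ``batch_format="pandas"`` and ``batch_size`` arguments are illustrative; with Arrow-format data each batch then arrives as a pandas DataFrame, so the function can use vectorized operations):

.. code-block:: python

    import ray

    ds = ray.data.from_items([{"value": i} for i in range(10000)])

    # Each call receives a whole batch (a pandas DataFrame here), so the
    # transformation can be vectorized instead of running once per record.
    def double(batch):
        batch["value"] = batch["value"] * 2
        return batch

    ds2 = ds.map_batches(double, batch_format="pandas", batch_size=1024)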
By default, transformations are executed using Ray tasks. For transformations that require setup, specify ``compute="actors"`` and Ray will use an autoscaling actor pool to execute your transforms instead. The following is an end-to-end example of reading, transforming, and saving batch inference results using Datasets:
.. code-block:: python

    # Example of GPU batch inference on an ImageNet model.
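    # NOTE: a minimal sketch only -- BatchInferModel, load_imagenet_model, and
    # the S3 paths below are hypothetical placeholders, not part of the API.
    import ray

    ds = ray.data.read_binary_files("s3://example-bucket/image-dir")

    class BatchInferModel:
        def __init__(self):
            # Load the model once per actor (hypothetical helper).
            self.model = load_imagenet_model()

        def __call__(self, batch):
            return self.model(batch)

    # compute="actors" runs the transform on an autoscaling actor pool, so the
    # model is loaded once per actor rather than once per task.
    ds = ds.map_batches(
        BatchInferModel, compute="actors", batch_size=256, num_gpus=1)

    # Save the inference results.
    ds.write_json("s3://example-bucket/inference-results")
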
Datasets can be passed to Ray tasks or actors and read with ``.iter_batches()`` or ``.iter_rows()``. This does not incur a copy, since the blocks of the Dataset are passed by reference as Ray objects:
.. code-block:: python

    import ray
    from ray.data import Dataset

    @ray.remote
    def consume(data: Dataset[int]) -> int:
        num_batches = 0
        for batch in data.iter_batches():
            num_batches += 1
        return num_batches

    ds = ray.data.range(10000)
    ray.get(consume.remote(ds))
    # -> 200
Datasets can be split into disjoint sub-datasets. Locality-aware splitting is supported by passing a list of actor handles to ``split()`` along with the desired number of splits. This is a common pattern for loading and splitting data between distributed training actors.
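The following is a minimal sketch of this pattern (the ``Worker`` actor, the number of shards, and the batch size are illustrative and not part of the Datasets API):

.. code-block:: python

    import ray

    @ray.remote
    class Worker:
        def __init__(self, rank: int):
            self.rank = rank

        def train(self, shard) -> int:
            # Iterate over this worker's shard of the data.
            num_rows = 0
            for batch in shard.iter_batches(batch_size=256):
                num_rows += len(batch)
            return num_rows

    workers = [Worker.remote(i) for i in range(4)]

    ds = ray.data.range(10000)
    # Locality-aware split: one shard per worker actor.
    shards = ds.split(n=4, locality_hints=workers)
    ray.get([w.train.remote(s) for w, s in zip(workers, shards)])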
Datasets does not currently have native support for tensor-typed values in records (e.g., TFRecord / Petastorm formats or multi-dimensional arrays); support is planned for future development.
Contributions to Datasets are `welcome <https://docs.ray.io/en/master/development.html#python-develop>`__! There are many potential improvements, including: