mirror of
https://github.com/vale981/ray
synced 2025-03-08 19:41:38 -05:00
Editing pass over the tensor support docs for clarity:
- Make heavy use of tabbed guides to condense the content
- Rewrite examples to be more organized around creating vs. reading tensors
- Use doc_code for testing
This commit is contained in:
parent
0fa2806554
commit
b1f933a17d
23 changed files with 673 additions and 412 deletions
|
@ -80,7 +80,6 @@ This is a common pattern useful for loading and sharding data between distribute
|
|||
|
||||
.. _saving_datasets:
|
||||
|
||||
===============
|
||||
Saving Datasets
|
||||
===============
|
||||
|
||||
|
|
|
@ -388,11 +388,8 @@ futures.
|
|||
``Dataset`` backed by the distributed Pandas DataFrame partitions that underlie the
|
||||
Dask DataFrame.
|
||||
|
||||
.. note::
|
||||
|
||||
This conversion should have near-zero overhead: it involves zero data copying and
|
||||
zero data movement. Datasets simply reinterprets the existing Dask DataFrame partitions
|
||||
as Ray Datasets partitions without touching the underlying data.
|
||||
This conversion has near-zero overhead, since Datasets simply reinterprets existing
|
||||
Dask-in-Ray partition objects as Dataset blocks.
|
||||
|
||||
.. literalinclude:: ./doc_code/creating_datasets.py
|
||||
:language: python
|
||||
|
@ -418,11 +415,8 @@ futures.
|
|||
Create a ``Dataset`` from a Modin DataFrame. This constructs a ``Dataset``
|
||||
backed by the distributed Pandas DataFrame partitions that underlie the Modin DataFrame.
|
||||
|
||||
.. note::
|
||||
|
||||
This conversion should have near-zero overhead: it involves zero data copying and
|
||||
zero data movement. Datasets simply reinterprets the existing Modin DataFrame partitions
|
||||
as Ray Datasets partitions without touching the underlying data.
|
||||
This conversion has near-zero overhead, since Datasets simply reinterprets existing
|
||||
Modin partition objects as Dataset blocks.
|
||||
|
||||
.. literalinclude:: ./doc_code/creating_datasets.py
|
||||
:language: python
|
||||
|
@ -434,11 +428,8 @@ futures.
|
|||
Create a ``Dataset`` from a Mars DataFrame. This constructs a ``Dataset``
|
||||
backed by the distributed Pandas DataFrame partitions that underlie the Mars DataFrame.
|
||||
|
||||
.. note::
|
||||
|
||||
This conversion should have near-zero overhead: it involves zero data copying and
|
||||
zero data movement. Datasets simply reinterprets the existing Mars DataFrame partitions
|
||||
as Ray Datasets partitions without touching the underlying data.
|
||||
This conversion has near-zero overhead, since Datasets simply reinterprets existing
|
||||
Mars partition objects as Dataset blocks.
|
||||
|
||||
.. literalinclude:: ./doc_code/creating_datasets.py
|
||||
:language: python
|
||||
|
@ -527,6 +518,19 @@ converts it into a Ray Dataset directly.
|
|||
ray_datasets["train"].take(2)
|
||||
# [{'text': ''}, {'text': ' = Valkyria Chronicles III = \n'}]
|
||||
|
||||
.. _datasets_from_images:
|
||||
|
||||
-------------------------------
|
||||
From Image Files (experimental)
|
||||
-------------------------------
|
||||
|
||||
Load image data stored as individual files using :py:class:`~ray.data.datasource.ImageFolderDatasource`:
|
||||
|
||||
.. literalinclude:: ./doc_code/tensor.py
|
||||
:language: python
|
||||
:start-after: __create_images_begin__
|
||||
:end-before: __create_images_end__
|
||||
|
||||
.. _datasets_custom_datasource:
|
||||
|
||||
------------------
|
||||
|
|
|
@ -135,9 +135,10 @@ as either `Arrow Tables <https://arrow.apache.org/docs/python/generated/pyarrow.
|
|||
or `Pandas DataFrames <https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html>`__.
|
||||
|
||||
Different ways of creating Datasets lead to different starting internal formats:
|
||||
|
||||
* Reading tabular files (Parquet, CSV, JSON) creates Arrow blocks initially.
|
||||
* Converting from Pandas, Dask, Modin, and Mars creates Pandas blocks initially.
|
||||
* Reading NumPy files or converting from NumPy ndarrays creaates Arrow blocks.
|
||||
* Reading NumPy files or converting from NumPy ndarrays creates Arrow blocks.
|
||||
|
||||
However, this internal format is not exposed to the user. Datasets converts between formats
|
||||
as needed internally depending on the specified ``batch_format`` of transformations.
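For illustration, here is a minimal sketch of requesting different batch formats from the same dataset; it uses the synthetic ``range_table`` source for brevity, and the format conversions happen transparently:

.. code-block:: python

    import ray

    # This read creates Arrow blocks internally.
    ds = ray.data.range_table(1000)

    # Request Pandas batches; blocks are converted to DataFrames as needed.
    ds = ds.map_batches(lambda df: df + 1, batch_format="pandas")

    # Request an Arrow batch from the same dataset.
    batch = next(ds.iter_batches(batch_format="pyarrow"))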
|
||||
|
|
|
@ -3,446 +3,197 @@
|
|||
ML Tensor Support
|
||||
=================
|
||||
|
||||
Tables with tensor columns
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
Tensor (multi-dimensional array) data is ubiquitous in ML workloads. However, popular data formats such as Pandas, Parquet, and Arrow don't natively support tensor data types. To bridge this gap, Datasets provides a unified tensor data type that can be used to represent and store tensor data:
|
||||
|
||||
Datasets supports tables with fixed-shape tensor columns, where each element in the column is a tensor (n-dimensional array) with the same shape. As an example, this allows you to use Pandas and Ray Datasets to read, write, and manipulate e.g., images. All conversions between Pandas, Arrow, and Parquet, and all application of aggregations/operations to the underlying image ndarrays are taken care of by Ray Datasets.
|
||||
* For Pandas, Datasets will transparently convert ``List[np.ndarray]`` columns to and from the :class:`TensorDtype <ray.data.extensions.tensor_extension.TensorDtype>` extension type.
|
||||
* For Parquet, the Datasets Arrow extension :class:`ArrowTensorType <ray.data.extensions.tensor_extension.ArrowTensorType>` allows Tensors to be loaded and stored in Parquet format.
|
||||
* In addition, single-column Tensor datasets can be created from NumPy (.npy) files.
|
||||
|
||||
With our Pandas extension type, :class:`TensorDtype <ray.data.extensions.tensor_extension.TensorDtype>`, and extension array, :class:`TensorArray <ray.data.extensions.tensor_extension.TensorArray>`, you can do familiar aggregations and arithmetic, comparison, and logical operations on a DataFrame containing a tensor column and the operations will be applied to the underlying tensors as expected. With our Arrow extension type, :class:`ArrowTensorType <ray.data.extensions.tensor_extension.ArrowTensorType>`, and extension array, :class:`ArrowTensorArray <ray.data.extensions.tensor_extension.ArrowTensorArray>`, you'll be able to import that DataFrame into Ray Datasets and read/write the data from/to the Parquet format.
|
||||
Datasets automatically converts between the extension types/arrays above. This means you can just think of "Tensors" as a single first-class data type in Datasets.
|
||||
|
||||
Automatic conversion between the Pandas and Arrow extension types/arrays keeps the details under the hood: you only need to cast the column to a tensor column using our Pandas extension type when first ingesting the table into a ``Dataset``, whether from storage or in-memory. All table operations downstream from that cast work automatically.
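As a quick sketch of that ingest path, assuming an in-memory DataFrame with an ndarray column:

.. code-block:: python

    import numpy as np
    import pandas as pd
    import ray
    from ray.data.extensions import TensorDtype

    # Cast an ndarray column to the tensor extension type, then ingest.
    df = pd.DataFrame({
        "one": [1, 2, 3],
        "two": list(np.arange(24).reshape((3, 2, 2, 2))),
    })
    df["two"] = df["two"].astype(TensorDtype())

    ds = ray.data.from_pandas([df])
    print(ds.schema())
    # -> one: int64
    #    two: extension<arrow.py_extension_type<ArrowTensorType>>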
|
||||
Creating Tensor Datasets
|
||||
------------------------
|
||||
|
||||
Single-column tensor datasets
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
This section shows how to create single and multi-column Tensor datasets.
|
||||
|
||||
The most basic case is when a dataset only has a single column, which is of tensor
|
||||
type. This kind of dataset can be:
|
||||
.. tabbed:: Synthetic Data
|
||||
|
||||
* created with :func:`range_tensor() <ray.data.range_tensor>`
|
||||
or :func:`from_numpy() <ray.data.from_numpy>`,
|
||||
* transformed with NumPy UDFs via
|
||||
:meth:`ds.map_batches() <ray.data.Dataset.map_batches>`,
|
||||
* consumed with :meth:`ds.iter_rows() <ray.data.Dataset.iter_rows>` and
|
||||
:meth:`ds.iter_batches() <ray.data.Dataset.iter_batches>`, and
|
||||
* can be read from and written to ``.npy`` files.
|
||||
Create a synthetic tensor dataset from a range of integers.
|
||||
|
||||
Here is an end-to-end example:
|
||||
**Single-column only**:
|
||||
|
||||
.. code-block:: python
|
||||
.. literalinclude:: ./doc_code/tensor.py
|
||||
:language: python
|
||||
:start-after: __create_range_begin__
|
||||
:end-before: __create_range_end__
|
||||
|
||||
# Create a synthetic pure-tensor Dataset.
|
||||
ds = ray.data.range_tensor(10, shape=(3, 5))
|
||||
# -> Dataset(num_blocks=10, num_rows=10,
|
||||
# schema={__value__: <ArrowTensorType: shape=(3, 5), dtype=int64>})
|
||||
.. tabbed:: Pandas UDF
|
||||
|
||||
# Create a pure-tensor Dataset from an existing NumPy ndarray.
|
||||
arr = np.arange(10 * 3 * 5).reshape((10, 3, 5))
|
||||
ds = ray.data.from_numpy(arr)
|
||||
# -> Dataset(num_blocks=1, num_rows=10,
|
||||
# schema={__value__: <ArrowTensorType: shape=(3, 5), dtype=int64>})
|
||||
Create tensor datasets by returning ``List[np.ndarray]`` columns from a Pandas UDF.
|
||||
|
||||
# Transform the tensors. Datasets will automatically unpack the single-column Arrow
|
||||
# table into a NumPy ndarray, provide that ndarray to your UDF, and then repack it
|
||||
# into a single-column Arrow table; this will be a zero-copy conversion in both
|
||||
# cases.
|
||||
ds = ds.map_batches(lambda arr: arr / arr.max())
|
||||
# -> Dataset(num_blocks=1, num_rows=10,
|
||||
# schema={__value__: <ArrowTensorType: shape=(3, 5), dtype=double>})
|
||||
**Single-column**:
|
||||
|
||||
# Consume the tensor. This will yield the underlying (3, 5) ndarrays.
|
||||
for arr in ds.iter_rows():
|
||||
assert isinstance(arr, np.ndarray)
|
||||
assert arr.shape == (3, 5)
|
||||
.. literalinclude:: ./doc_code/tensor.py
|
||||
:language: python
|
||||
:start-after: __create_pandas_begin__
|
||||
:end-before: __create_pandas_end__
|
||||
|
||||
# Consume the tensor in batches.
|
||||
for arr in ds.iter_batches(batch_size=2):
|
||||
assert isinstance(arr, np.ndarray)
|
||||
assert arr.shape == (2, 3, 5)
|
||||
**Multi-column**:
|
||||
|
||||
# Save to storage. This will write out the blocks of the tensor column as NPY files.
|
||||
ds.write_numpy("/tmp/tensor_out")
|
||||
.. literalinclude:: ./doc_code/tensor.py
|
||||
:language: python
|
||||
:start-after: __create_pandas_2_begin__
|
||||
:end-before: __create_pandas_2_end__
|
||||
|
||||
# Read back from storage.
|
||||
ray.data.read_numpy("/tmp/tensor_out")
|
||||
# -> Dataset(num_blocks=1, num_rows=?,
|
||||
# schema={__value__: <ArrowTensorType: shape=(3, 5), dtype=double>})
|
||||
.. tabbed:: NumPy
|
||||
|
||||
Reading existing serialized tensor columns
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
Create from in-memory numpy data or from previously saved NumPy (.npy) files.
|
||||
|
||||
If you already have a Parquet dataset with columns containing serialized tensors, you can have these tensor columns cast to our tensor extension type at read-time by giving a simple schema for the tensor columns. Note that these tensors must have been serialized as their raw NumPy ndarray bytes in C-contiguous order (e.g. serialized via ``ndarray.tobytes()``).
|
||||
**Single-column only**:
|
||||
|
||||
.. code-block:: python
|
||||
.. literalinclude:: ./doc_code/tensor.py
|
||||
:language: python
|
||||
:start-after: __create_numpy_begin__
|
||||
:end-before: __create_numpy_end__
|
||||
|
||||
import ray
|
||||
import numpy as np
|
||||
import pandas as pd
|
||||
.. tabbed:: Parquet
|
||||
|
||||
path = "/tmp/some_path"
|
||||
There are two ways to construct a Parquet tensor dataset: (1) loading a previously saved tensor
dataset, or (2) casting non-tensor Parquet columns to the tensor type. When casting, you must
provide a tensor schema or a deserialization UDF. The following are examples of each method.
|
||||
|
||||
# Create a DataFrame with a list of serialized ndarrays as a column.
|
||||
# Note that we do not cast it to a tensor array, so each element in the
|
||||
# column is an opaque blob of bytes.
|
||||
arr = np.arange(24).reshape((3, 2, 2, 2))
|
||||
df = pd.DataFrame({
|
||||
"one": [1, 2, 3],
|
||||
"two": [tensor.tobytes() for tensor in arr]})
|
||||
**Previously-saved Tensor datasets**:
|
||||
|
||||
# Write the dataset to Parquet. The tensor column will be written as an
|
||||
# array of opaque byte blobs.
|
||||
ds = ray.data.from_pandas([df])
|
||||
ds.write_parquet(path)
|
||||
.. literalinclude:: ./doc_code/tensor.py
|
||||
:language: python
|
||||
:start-after: __create_parquet_1_begin__
|
||||
:end-before: __create_parquet_1_end__
|
||||
|
||||
# Read the Parquet files into a new Dataset, with the serialized tensors
|
||||
# automatically cast to our tensor column extension type.
|
||||
ds = ray.data.read_parquet(
|
||||
path, tensor_column_schema={"two": (np.int, (2, 2, 2))})
|
||||
**Cast from data stored in C-contiguous format**:
|
||||
|
||||
# Internally, this column is represented with our Arrow tensor extension
|
||||
# type.
|
||||
print(ds.schema())
|
||||
# -> one: int64
|
||||
# two: extension<arrow.py_extension_type<ArrowTensorType>>
|
||||
For tensors stored as raw NumPy ndarray bytes in C-contiguous order (e.g., via ``ndarray.tobytes()``), all you need to specify is the tensor column schema. The following is an end-to-end example:
|
||||
|
||||
If your serialized tensors don't fit the above constraints (e.g. they're stored in Fortran-contiguous order, or they're pickled), you can manually cast this tensor column to our tensor extension type via a read-time user-defined function. This UDF will be pushed down to Ray Datasets' IO layer and executed on each block in parallel, as it's read from storage.
|
||||
.. literalinclude:: ./doc_code/tensor.py
|
||||
:language: python
|
||||
:start-after: __create_parquet_2_begin__
|
||||
:end-before: __create_parquet_2_end__
|
||||
|
||||
.. code-block:: python
|
||||
**Cast from data stored in custom formats**:
|
||||
|
||||
import pickle
|
||||
import pyarrow as pa
|
||||
from ray.data.extensions import TensorArray
|
||||
For tensors stored in other formats (e.g., pickled), you can specify a deserializer UDF that returns TensorArray columns:
|
||||
|
||||
# Create a DataFrame with a list of pickled ndarrays as a column.
|
||||
arr = np.arange(24).reshape((3, 2, 2, 2))
|
||||
df = pd.DataFrame({
|
||||
"one": [1, 2, 3],
|
||||
"two": [pickle.dumps(tensor) for tensor in arr]})
|
||||
.. literalinclude:: ./doc_code/tensor.py
|
||||
:language: python
|
||||
:start-after: __create_parquet_3_begin__
|
||||
:end-before: __create_parquet_3_end__
|
||||
|
||||
# Write the dataset to Parquet. The tensor column will be written as an
|
||||
# array of opaque byte blobs.
|
||||
ds = ray.data.from_pandas([df])
|
||||
ds.write_parquet(path)
|
||||
.. tabbed:: Images (experimental)
|
||||
|
||||
# Manually deserialize the tensor pickle bytes and cast to our tensor
|
||||
# extension type. For the sake of efficiency, we directly construct a
|
||||
# TensorArray rather than .astype() casting on the mutated column with
|
||||
# TensorDtype.
|
||||
def cast_udf(block: pa.Table) -> pa.Table:
|
||||
block = block.to_pandas()
|
||||
block["two"] = TensorArray([pickle.loads(a) for a in block["two"]])
|
||||
return pa.Table.from_pandas(block)
|
||||
Load image data stored as individual files using :py:class:`~ray.data.datasource.ImageFolderDatasource`:
|
||||
|
||||
# Read the Parquet files into a new Dataset, applying the casting UDF
|
||||
# on-the-fly within the underlying read tasks.
|
||||
ds = ray.data.read_parquet(path, _block_udf=cast_udf)
|
||||
**Image and label columns**:
|
||||
|
||||
# Internally, this column is represented with our Arrow tensor extension
|
||||
# type.
|
||||
print(ds.schema())
|
||||
# -> one: int64
|
||||
# two: extension<arrow.py_extension_type<ArrowTensorType>>
|
||||
.. literalinclude:: ./doc_code/tensor.py
|
||||
:language: python
|
||||
:start-after: __create_images_begin__
|
||||
:end-before: __create_images_end__
|
||||
|
||||
.. note::
|
||||
|
||||
The ``tensor_column_schema`` and ``_block_udf`` parameters are both experimental developer APIs and may break in future versions.
|
||||
By convention, single-column Tensor datasets are represented with a single ``__value__`` column.
|
||||
This kind of dataset will be converted automatically to/from NumPy array format in all transformation and consumption APIs.
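For example, a minimal sketch of the ``__value__`` convention using a synthetic tensor dataset:

.. code-block:: python

    import numpy as np
    import ray

    # A single-column tensor dataset; the column is named ``__value__``.
    ds = ray.data.range_tensor(10, shape=(3, 5))

    # Rows are yielded as plain NumPy ndarrays.
    for arr in ds.iter_rows():
        assert isinstance(arr, np.ndarray)
        assert arr.shape == (3, 5)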
|
||||
|
||||
Working with tensor column datasets
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
Transforming / Consuming Tensor Data
|
||||
------------------------------------
|
||||
|
||||
Now that the tensor column is properly typed and in a ``Dataset``, we can perform operations on the dataset as if it was a normal table:
|
||||
Like any other Dataset, Datasets with tensor columns can be consumed / transformed in batches via the :meth:`ds.iter_batches(batch_format=\<format\>) <ray.data.Dataset.iter_batches>` and :meth:`ds.map_batches(fn, batch_format=\<format\>) <ray.data.Dataset.map_batches>` APIs. This section shows the available batch formats and their behavior:
|
||||
|
||||
.. code-block:: python
|
||||
.. tabbed:: "native" (default)
|
||||
|
||||
# Arrow and Pandas is now aware of this tensor column, so we can do the
|
||||
# typical DataFrame operations on this column.
|
||||
ds = ds.map_batches(lambda x: 2 * (x + 1), batch_format="pandas")
|
||||
# -> Map Progress: 100%|████████████████████| 200/200 [00:00<00:00, 1123.54it/s]
|
||||
print(ds)
|
||||
# -> Dataset(
|
||||
# num_blocks=1, num_rows=3,
|
||||
# schema=<class 'int',
|
||||
# class ray.data.extensions.tensor_extension.ArrowTensorType>)
|
||||
print([row["two"] for row in ds.take(5)])
|
||||
# -> [2, 4, 6, 8, 10]
|
||||
**Single-column**:
|
||||
|
||||
Writing and reading tensor columns
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
.. literalinclude:: ./doc_code/tensor.py
|
||||
:language: python
|
||||
:start-after: __consume_native_begin__
|
||||
:end-before: __consume_native_end__
|
||||
|
||||
This dataset can then be written to Parquet files. The tensor column schema will be preserved via the Pandas and Arrow extension types and associated metadata, allowing us to later read the Parquet files into a Dataset without needing to specify a column casting schema. This Pandas --> Arrow --> Parquet --> Arrow --> Pandas conversion support makes working with tensor columns extremely easy when using Ray Datasets to both write and read data.
|
||||
**Multi-column**:
|
||||
|
||||
.. code-block:: python
|
||||
.. literalinclude:: ./doc_code/tensor.py
|
||||
:language: python
|
||||
:start-after: __consume_native_2_begin__
|
||||
:end-before: __consume_native_2_end__
|
||||
|
||||
# You can write the dataset to Parquet.
|
||||
ds.write_parquet("/some/path")
|
||||
# And you can read it back.
|
||||
read_ds = ray.data.read_parquet("/some/path")
|
||||
print(read_ds.schema())
|
||||
# -> one: int64
|
||||
# two: extension<arrow.py_extension_type<ArrowTensorType>>
|
||||
.. tabbed:: "pandas"
|
||||
|
||||
.. _datasets_tensor_ml_exchange:
|
||||
**Single-column**:
|
||||
|
||||
Converting to a Torch/TensorFlow Dataset
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
.. literalinclude:: ./doc_code/tensor.py
|
||||
:language: python
|
||||
:start-after: __consume_pandas_begin__
|
||||
:end-before: __consume_pandas_end__
|
||||
|
||||
This dataset can also be converted to a Torch or TensorFlow dataset via the standard
|
||||
:meth:`ds.to_torch() <ray.data.Dataset.to_torch>` and
|
||||
:meth:`ds.to_tf() <ray.data.Dataset.to_tf>` APIs for ingestion into those respective ML
|
||||
training frameworks. The tensor column will be automatically converted to a
|
||||
Torch/TensorFlow tensor without incurring any copies.
|
||||
**Multi-column**:
|
||||
|
||||
.. note::
|
||||
.. literalinclude:: ./doc_code/tensor.py
|
||||
:language: python
|
||||
:start-after: __consume_pandas_2_begin__
|
||||
:end-before: __consume_pandas_2_end__
|
||||
|
||||
When converting to a TensorFlow Dataset, you will need to give the full tensor spec
|
||||
for the tensor columns, including the shape of each underlying tensor element in said
|
||||
column.
|
||||
.. tabbed:: "pyarrow"
|
||||
|
||||
**Single-column**:
|
||||
|
||||
.. tabbed:: Torch
|
||||
.. literalinclude:: ./doc_code/tensor.py
|
||||
:language: python
|
||||
:start-after: __consume_pyarrow_begin__
|
||||
:end-before: __consume_pyarrow_end__
|
||||
|
||||
Convert a ``Dataset`` containing a single tensor feature column to a Torch ``IterableDataset``.
|
||||
**Multi-column**:
|
||||
|
||||
.. code-block:: python
|
||||
.. literalinclude:: ./doc_code/tensor.py
|
||||
:language: python
|
||||
:start-after: __consume_pyarrow_2_begin__
|
||||
:end-before: __consume_pyarrow_2_end__
|
||||
|
||||
import ray
|
||||
import numpy as np
|
||||
import pandas as pd
|
||||
import torch
|
||||
.. tabbed:: "numpy"
|
||||
|
||||
df = pd.DataFrame({
|
||||
"feature": TensorArray(np.arange(4096).reshape((4, 32, 32))),
|
||||
"label": [1, 2, 3, 4],
|
||||
})
|
||||
ds = ray.data.from_pandas(df)
|
||||
**Single-column**:
|
||||
|
||||
# Convert the dataset to a Torch IterableDataset.
|
||||
torch_ds = ds.to_torch(
|
||||
label_column="label",
|
||||
batch_size=2,
|
||||
unsqueeze_label_tensor=False,
|
||||
unsqueeze_feature_tensors=False,
|
||||
)
|
||||
.. literalinclude:: ./doc_code/tensor.py
|
||||
:language: python
|
||||
:start-after: __consume_numpy_begin__
|
||||
:end-before: __consume_numpy_end__
|
||||
|
||||
# A feature tensor and label tensor is yielded per batch.
|
||||
for X, y in torch_ds:
|
||||
# Train model(X, y)
|
||||
**Multi-column**:
|
||||
|
||||
.. tabbed:: TensorFlow
|
||||
.. literalinclude:: ./doc_code/tensor.py
|
||||
:language: python
|
||||
:start-after: __consume_numpy_2_begin__
|
||||
:end-before: __consume_numpy_2_end__
|
||||
|
||||
Convert a ``Dataset`` containing a single tensor feature column to a TensorFlow ``tf.data.Dataset``.
|
||||
Saving Tensor Datasets
|
||||
----------------------
|
||||
|
||||
.. code-block:: python
|
||||
Because Tensor datasets rely on Datasets-specific extension types, they can only be saved in formats that preserve Arrow metadata (currently only Parquet). In addition, single-column Tensor datasets can be saved in NumPy format.
|
||||
|
||||
import ray
|
||||
import numpy as np
|
||||
import pandas as pd
|
||||
import tensorflow as tf
|
||||
.. tabbed:: Parquet
|
||||
|
||||
tensor_element_shape = (32, 32)
|
||||
.. literalinclude:: ./doc_code/tensor.py
|
||||
:language: python
|
||||
:start-after: __write_1_begin__
|
||||
:end-before: __write_1_end__
|
||||
|
||||
df = pd.DataFrame({
|
||||
"feature": TensorArray(np.arange(4096).reshape((4,) + tensor_element_shape)),
|
||||
"label": [1, 2, 3, 4],
|
||||
})
|
||||
ds = ray.data.from_pandas(df)
|
||||
.. tabbed:: NumPy
|
||||
|
||||
# Convert the dataset to a TensorFlow Dataset.
|
||||
tf_ds = ds.to_tf(
|
||||
label_column="label",
|
||||
output_signature=(
|
||||
tf.TensorSpec(shape=(None, 1) + tensor_element_shape, dtype=tf.float32),
|
||||
tf.TensorSpec(shape=(None,), dtype=tf.float32),
|
||||
),
|
||||
batch_size=2,
|
||||
)
|
||||
|
||||
# A feature tensor and label tensor is yielded per batch.
|
||||
for X, y in tf_ds:
|
||||
# Train model(X, y)
|
||||
|
||||
If your (tensor) columns have different shapes,
|
||||
these columns are incompatible and you will not be able to stack the column tensors
|
||||
into a single tensor. Instead, you will need to group the columns by compatibility in
|
||||
the ``feature_columns`` argument.
|
||||
|
||||
E.g., if columns ``"feature_1"`` and ``"feature_2"`` are incompatible, you should give
|
||||
``to_torch()`` a ``feature_columns=[["feature_1"], ["feature_2"]]`` argument in order to
|
||||
instruct it to return separate tensors for ``"feature_1"`` and ``"feature_2"``. For
|
||||
``to_torch()``, if isolating single columns as in the ``"feature_1"`` + ``"feature_2"``
|
||||
example, you may also want to provide ``unsqueeze_feature_tensors=False`` in order to
|
||||
remove the redundant column dimension for each of the unit column tensors.
|
||||
|
||||
.. tabbed:: Torch
|
||||
|
||||
Convert a ``Dataset`` containing a tensor feature column and a scalar feature column
|
||||
to a Torch ``IterableDataset``.
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
import ray
|
||||
import numpy as np
|
||||
import pandas as pd
|
||||
import torch
|
||||
|
||||
df = pd.DataFrame({
|
||||
"feature_1": TensorArray(np.arange(4096).reshape((4, 32, 32))),
|
||||
"feature_2": [5, 6, 7, 8],
|
||||
"label": [1, 2, 3, 4],
|
||||
})
|
||||
ds = ray.data.from_pandas(df)
|
||||
|
||||
# Convert the dataset to a Torch IterableDataset.
|
||||
torch_ds = ds.to_torch(
|
||||
label_column="label",
|
||||
feature_columns=[["feature_1"], ["feature_2"]],
|
||||
batch_size=2,
|
||||
unsqueeze_label_tensor=False,
|
||||
unsqueeze_feature_tensors=False,
|
||||
)
|
||||
|
||||
# Two feature tensors and one label tensor is yielded per batch.
|
||||
for (feature_1, feature_2), y in torch_ds:
|
||||
# Train model((feature_1, feature_2), y)
|
||||
|
||||
.. tabbed:: TensorFlow
|
||||
|
||||
Convert a ``Dataset`` containing a tensor feature column and a scalar feature column
|
||||
to a TensorFlow ``tf.data.Dataset``.
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
import ray
|
||||
import numpy as np
|
||||
import pandas as pd
|
||||
import torch
|
||||
|
||||
tensor_element_shape = (32, 32)
|
||||
|
||||
df = pd.DataFrame({
|
||||
"feature_1": TensorArray(np.arange(4096).reshape((4,) + tensor_element_shape)),
|
||||
"feature_2": [5, 6, 7, 8],
|
||||
"label": [1, 2, 3, 4],
|
||||
})
|
||||
ds = ray.data.from_pandas(df)
|
||||
|
||||
# Convert the dataset to a TensorFlow Dataset.
|
||||
tf_ds = ds.to_tf(
|
||||
label_column="label",
|
||||
feature_columns=[["feature_1"], ["feature_2"]],
|
||||
output_signature=(
|
||||
(
|
||||
tf.TensorSpec(shape=(None, 1) + tensor_element_shape, dtype=tf.float32),
|
||||
tf.TensorSpec(shape=(None, 1), dtype=tf.int64),
|
||||
),
|
||||
tf.TensorSpec(shape=(None,), dtype=tf.float32),
|
||||
),
|
||||
batch_size=2,
|
||||
)
|
||||
|
||||
# Two feature tensors and one label tensor is yielded per batch.
|
||||
for (feature_1, feature_2), y in tf_ds:
|
||||
# Train model((feature_1, feature_2), y)
|
||||
|
||||
End-to-end workflow with our Pandas extension type
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
If working with in-memory Pandas DataFrames that you want to analyze, manipulate, store, and eventually read, the Pandas/Arrow extension types/arrays make it easy to extend this end-to-end workflow to tensor columns.
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
from ray.data.extensions import TensorDtype
|
||||
|
||||
# Create a DataFrame with a list of ndarrays as a column.
|
||||
df = pd.DataFrame({
|
||||
"one": [1, 2, 3],
|
||||
"two": list(np.arange(24).reshape((3, 2, 2, 2)))})
|
||||
# Note the opaque np.object dtype for this column.
|
||||
print(df.dtypes)
|
||||
# -> one int64
|
||||
# two object
|
||||
# dtype: object
|
||||
|
||||
# Cast column to our TensorDtype Pandas extension type.
|
||||
df["two"] = df["two"].astype(TensorDtype())
|
||||
|
||||
# Note that the column dtype is now TensorDtype instead of
|
||||
# np.object.
|
||||
print(df.dtypes)
|
||||
# -> one int64
|
||||
# two TensorDtype
|
||||
# dtype: object
|
||||
|
||||
# Pandas is now aware of this tensor column, and we can do the
|
||||
# typical DataFrame operations on this column.
|
||||
col = 2 * df["two"]
|
||||
# The ndarrays underlying the tensor column will be manipulated,
|
||||
# but the column itself will continue to be a Pandas type.
|
||||
print(type(col))
|
||||
# -> pandas.core.series.Series
|
||||
print(col)
|
||||
# -> 0 [[[ 2 4]
|
||||
# [ 6 8]]
|
||||
# [[10 12]
|
||||
# [14 16]]]
|
||||
# 1 [[[18 20]
|
||||
# [22 24]]
|
||||
# [[26 28]
|
||||
# [30 32]]]
|
||||
# 2 [[[34 36]
|
||||
# [38 40]]
|
||||
# [[42 44]
|
||||
# [46 48]]]
|
||||
# Name: two, dtype: TensorDtype
|
||||
|
||||
# Once you do an aggregation on that column that returns a single
|
||||
# row's value, you get back our TensorArrayElement type.
|
||||
tensor = col.mean()
|
||||
print(type(tensor))
|
||||
# -> ray.data.extensions.tensor_extension.TensorArrayElement
|
||||
print(tensor)
|
||||
# -> array([[[18., 20.],
|
||||
# [22., 24.]],
|
||||
# [[26., 28.],
|
||||
# [30., 32.]]])
|
||||
|
||||
# This is a light wrapper around a NumPy ndarray, and can easily
|
||||
# be converted to an ndarray.
|
||||
type(tensor.to_numpy())
|
||||
# -> numpy.ndarray
|
||||
|
||||
# In addition to doing Pandas operations on the tensor column,
|
||||
# you can now put the DataFrame directly into a Dataset.
|
||||
ds = ray.data.from_pandas([df])
|
||||
# Internally, this column is represented with the corresponding
|
||||
# Arrow tensor extension type.
|
||||
print(ds.schema())
|
||||
# -> one: int64
|
||||
# two: extension<arrow.py_extension_type<ArrowTensorType>>
|
||||
|
||||
# You can write the dataset to Parquet.
|
||||
ds.write_parquet("/some/path")
|
||||
# And you can read it back.
|
||||
read_ds = ray.data.read_parquet("/some/path")
|
||||
print(read_ds.schema())
|
||||
# -> one: int64
|
||||
# two: extension<arrow.py_extension_type<ArrowTensorType>>
|
||||
|
||||
read_df = read_ds.to_pandas()
|
||||
print(read_df.dtypes)
|
||||
# -> one int64
|
||||
# two TensorDtype
|
||||
# dtype: object
|
||||
|
||||
# The tensor extension type is preserved along the
|
||||
# Pandas --> Arrow --> Parquet --> Arrow --> Pandas
|
||||
# conversion chain.
|
||||
print(read_df.equals(df))
|
||||
# -> True
|
||||
.. literalinclude:: ./doc_code/tensor.py
|
||||
:language: python
|
||||
:start-after: __write_2_begin__
|
||||
:end-before: __write_2_end__
|
||||
|
||||
Limitations
|
||||
~~~~~~~~~~~
|
||||
-----------
|
||||
|
||||
This feature currently comes with a few known limitations that we are either actively working on addressing or have already implemented workarounds for.
|
||||
The following are current limitations of Tensor datasets.
|
||||
|
||||
* All tensors in a tensor column currently must be the same shape. Please let us know if you require heterogeneous tensor shape for your tensor column! Tracking issue is `here <https://github.com/ray-project/ray/issues/18316>`__.
|
||||
* Automatic casting via specifying an override Arrow schema when reading Parquet is blocked by Arrow supporting custom ExtensionType casting kernels. See `issue <https://issues.apache.org/jira/browse/ARROW-5890>`__. An explicit ``tensor_column_schema`` parameter has been added for :func:`read_parquet() <ray.data.read_api.read_parquet>` as a stopgap solution.
|
||||
* All tensors in a tensor column must have the same shape; see GitHub issue `#18316 <https://github.com/ray-project/ray/issues/18316>`__. An error is raised for ragged tensors. In that case, you can disable automatic casting with ``ray.data.context.DatasetContext.get_current().enable_tensor_extension_cast = False``.
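For example, a minimal sketch of disabling the automatic cast:

.. code-block:: python

    import ray

    # Opt out of automatic tensor extension casting (e.g., for ragged tensors).
    ctx = ray.data.context.DatasetContext.get_current()
    ctx.enable_tensor_extension_cast = False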
|
||||
|
|
498
doc/source/data/doc_code/tensor.py
Normal file
|
@ -0,0 +1,498 @@
|
|||
# flake8: noqa
|
||||
|
||||
from typing import Dict, Any
|
||||
|
||||
# fmt: off
|
||||
# __create_range_begin__
|
||||
import ray
|
||||
|
||||
# Create a Dataset of tensors.
|
||||
ds = ray.data.range_tensor(10000, shape=(64, 64))
|
||||
# -> Dataset(num_blocks=200, num_rows=10000,
|
||||
# schema={__value__: ArrowTensorType(shape=(64, 64), dtype=int64)})
|
||||
|
||||
ds.take(2)
|
||||
# -> [array([[0, 0, 0, ..., 0, 0, 0],
|
||||
# [0, 0, 0, ..., 0, 0, 0],
|
||||
# [0, 0, 0, ..., 0, 0, 0],
|
||||
# ...,
|
||||
# [0, 0, 0, ..., 0, 0, 0],
|
||||
# [0, 0, 0, ..., 0, 0, 0],
|
||||
# [0, 0, 0, ..., 0, 0, 0]]),
|
||||
# array([[1, 1, 1, ..., 1, 1, 1],
|
||||
# [1, 1, 1, ..., 1, 1, 1],
|
||||
# [1, 1, 1, ..., 1, 1, 1],
|
||||
# ...,
|
||||
# [1, 1, 1, ..., 1, 1, 1],
|
||||
# [1, 1, 1, ..., 1, 1, 1],
|
||||
# [1, 1, 1, ..., 1, 1, 1]])]
|
||||
# __create_range_end__
|
||||
|
||||
# __create_pandas_begin__
|
||||
import ray
|
||||
|
||||
import pandas as pd
|
||||
import numpy as np
|
||||
|
||||
# Start with a tabular base dataset.
|
||||
ds = ray.data.range_table(1000)
|
||||
|
||||
# Create a single TensorArray column.
|
||||
def single_col_udf(batch: pd.DataFrame) -> pd.DataFrame:
|
||||
bs = len(batch)
|
||||
|
||||
# Lists of ndarrays are automatically cast to TensorArray.
|
||||
arr = [np.zeros((128, 128, 3), dtype=np.int64) for _ in range(bs)]
|
||||
return pd.DataFrame({"__value__": arr})
|
||||
|
||||
## Alternatively, manually construct a TensorArray from a single ndarray.
|
||||
# from ray.data.extensions.tensor_extension import TensorArray
|
||||
# arr = TensorArray(np.zeros((bs, 128, 128, 3), dtype=np.int64))
|
||||
# return pd.DataFrame({"__value__": arr})
|
||||
|
||||
|
||||
ds.map_batches(single_col_udf)
|
||||
# -> Dataset(num_blocks=17, num_rows=1000,
|
||||
# schema={__value__: TensorDtype(shape=(128, 128, 3), dtype=int64)})
|
||||
# __create_pandas_end__
|
||||
|
||||
# __create_pandas_2_begin__
|
||||
# Create multiple TensorArray columns.
|
||||
def multi_col_udf(batch: pd.DataFrame) -> pd.DataFrame:
|
||||
bs = len(batch)
|
||||
|
||||
# Lists of ndarrays are automatically cast to TensorArray.
|
||||
image = [np.zeros((128, 128, 3), dtype=np.int64) for _ in range(bs)]
|
||||
embed = [np.zeros((256,), dtype=np.uint8) for _ in range(bs)]
|
||||
return pd.DataFrame({"image": image, "embed": embed})
|
||||
|
||||
## Alternatively, manually construct TensorArrays from ndarray batches.
|
||||
# image = TensorArray(np.zeros((bs, 128, 128, 3), dtype=np.int64))
|
||||
# embed = TensorArray(np.zeros((bs, 256,), dtype=np.uint8))
|
||||
# return pd.DataFrame({"image": image, "embed": embed})
|
||||
|
||||
|
||||
ds.map_batches(multi_col_udf)
|
||||
# -> Dataset(num_blocks=17, num_rows=1000,
|
||||
# schema={image: TensorDtype(shape=(128, 128, 3), dtype=int64),
|
||||
# embed: TensorDtype(shape=(256,), dtype=uint8)})
|
||||
# __create_pandas_2_end__
|
||||
|
||||
# __create_numpy_begin__
|
||||
import ray
|
||||
|
||||
# From in-memory numpy data.
|
||||
ray.data.from_numpy(np.zeros((1000, 128, 128, 3), dtype=np.int64))
|
||||
# -> Dataset(num_blocks=1, num_rows=1000,
|
||||
# schema={__value__: ArrowTensorType(shape=(128, 128, 3), dtype=int64)})
|
||||
|
||||
# From saved numpy files.
|
||||
ray.data.read_numpy("example://mnist_subset.npy")
|
||||
# -> Dataset(num_blocks=1, num_rows=3,
|
||||
# schema={__value__: ArrowTensorType(shape=(28, 28), dtype=uint8)})
|
||||
# __create_numpy_end__
|
||||
|
||||
# __create_parquet_1_begin__
|
||||
import ray
|
||||
|
||||
# Reading previously saved Tensor data works out of the box.
|
||||
ds = ray.data.read_parquet("example://parquet_images_mini")
|
||||
# -> Dataset(num_blocks=3, num_rows=3,
|
||||
# schema={image: ArrowTensorType(shape=(128, 128, 3), dtype=uint8),
|
||||
# label: string})
|
||||
|
||||
ds.take(1)
|
||||
# -> [{'image':
|
||||
# array([[[ 92, 71, 57],
|
||||
# [107, 87, 72],
|
||||
# ...,
|
||||
# [141, 161, 185],
|
||||
# [139, 158, 184]],
|
||||
#
|
||||
# ...,
|
||||
#
|
||||
# [[135, 135, 109],
|
||||
# [135, 135, 108],
|
||||
# ...,
|
||||
# [167, 150, 89],
|
||||
# [165, 146, 90]]], dtype=uint8),
|
||||
# 'label': 'cat',
|
||||
# }]
|
||||
# __create_parquet_1_end__
|
||||
|
||||
import shutil
|
||||
shutil.rmtree("/tmp/some_path", ignore_errors=True)
|
||||
|
||||
# __create_parquet_2_begin__
|
||||
import ray
|
||||
import numpy as np
|
||||
import pandas as pd
|
||||
|
||||
path = "/tmp/some_path"
|
||||
|
||||
# Create a DataFrame with a list of serialized ndarrays as a column.
|
||||
# Note that we do not cast it to a tensor array, so each element in the
|
||||
# column is an opaque blob of bytes.
|
||||
arr = np.arange(24).reshape((3, 2, 2, 2))
|
||||
df = pd.DataFrame({
|
||||
"one": [1, 2, 3],
|
||||
"two": [tensor.tobytes() for tensor in arr]})
|
||||
|
||||
# Write the dataset to Parquet. The tensor column will be written as an
|
||||
# array of opaque byte blobs.
|
||||
ds = ray.data.from_pandas([df])
|
||||
ds.write_parquet(path)
|
||||
|
||||
# Read the Parquet files into a new Dataset, with the serialized tensors
|
||||
# automatically cast to our tensor column extension type.
|
||||
ds = ray.data.read_parquet(
|
||||
path, tensor_column_schema={"two": (np.int, (2, 2, 2))})
|
||||
|
||||
# The new column is represented as a Tensor extension type.
|
||||
print(ds.schema())
|
||||
# -> one: int64
|
||||
# two: extension<arrow.py_extension_type<ArrowTensorType>>
|
||||
# __create_parquet_2_end__
|
||||
|
||||
ds.fully_executed()
|
||||
shutil.rmtree(path)
|
||||
|
||||
# __create_parquet_3_begin__
|
||||
import pickle
|
||||
import pyarrow as pa
|
||||
from ray.data.extensions import TensorArray
|
||||
|
||||
path = "/tmp/some_path"
|
||||
|
||||
# Create a DataFrame with a list of pickled ndarrays as a column.
|
||||
arr = np.arange(24).reshape((3, 2, 2, 2))
|
||||
df = pd.DataFrame({
|
||||
"one": [1, 2, 3],
|
||||
"two": [pickle.dumps(tensor) for tensor in arr]})
|
||||
|
||||
# Write the dataset to Parquet. The tensor column will be written as an
|
||||
# array of opaque byte blobs.
|
||||
ds = ray.data.from_pandas([df])
|
||||
ds.write_parquet(path)
|
||||
|
||||
# Manually deserialize the tensor pickle bytes and cast to our tensor
|
||||
# extension type.
|
||||
def cast_udf(block: pa.Table) -> pa.Table:
|
||||
block = block.to_pandas()
|
||||
block["two"] = TensorArray([pickle.loads(a) for a in block["two"]])
|
||||
return pa.Table.from_pandas(block)
|
||||
|
||||
# Read the Parquet files into a new Dataset, applying the casting UDF
|
||||
# on-the-fly within the underlying read tasks.
|
||||
ds = ray.data.read_parquet(path, _block_udf=cast_udf)
|
||||
|
||||
# The new column is represented as a Tensor extension type.
|
||||
print(ds.schema())
|
||||
# -> one: int64
|
||||
# two: extension<arrow.py_extension_type<ArrowTensorType>>
|
||||
# __create_parquet_3_end__
|
||||
ds.fully_executed()
|
||||
|
||||
# __create_images_begin__
|
||||
from ray.data.datasource import ImageFolderDatasource
|
||||
|
||||
ds = ray.data.read_datasource(
|
||||
ImageFolderDatasource(), root="example://image-folder", size=(128, 128))
|
||||
# -> Dataset(num_blocks=3, num_rows=3,
|
||||
# schema={image: TensorDtype(shape=(128, 128, 3), dtype=uint8),
|
||||
# label: object})
|
||||
|
||||
ds.take(1)
|
||||
# -> [{'image':
|
||||
# array([[[ 92, 71, 57],
|
||||
# [107, 87, 72],
|
||||
# ...,
|
||||
# [141, 161, 185],
|
||||
# [139, 158, 184]],
|
||||
#
|
||||
# ...,
|
||||
#
|
||||
# [[135, 135, 109],
|
||||
# [135, 135, 108],
|
||||
# ...,
|
||||
# [167, 150, 89],
|
||||
# [165, 146, 90]]], dtype=uint8),
|
||||
# 'label': 'cat',
|
||||
# }]
|
||||
# __create_images_end__
|
||||
|
||||
|
||||
# __consume_native_begin__
|
||||
import ray
|
||||
|
||||
# Read a single-column example dataset.
|
||||
ds = ray.data.read_numpy("example://mnist_subset.npy")
|
||||
# -> Dataset(num_blocks=1, num_rows=3,
|
||||
# schema={__value__: ArrowTensorType(shape=(28, 28), dtype=uint8)})
|
||||
|
||||
def add_one(batch: np.ndarray) -> np.ndarray:
|
||||
return batch + 1
|
||||
|
||||
# This processes batches in numpy.ndarray format.
|
||||
ds = ds.map_batches(add_one)
|
||||
|
||||
# This returns batches in numpy.ndarray format.
|
||||
next(ds.iter_batches())
|
||||
# -> array([[[1, 1, 1, ..., 1, 1, 1],
|
||||
# [1, 1, 1, ..., 1, 1, 1],
|
||||
# ...,
|
||||
# [1, 1, 1, ..., 1, 1, 1],
|
||||
# [1, 1, 1, ..., 1, 1, 1]],
|
||||
#
|
||||
# ...,
|
||||
#
|
||||
# [[1, 1, 1, ..., 1, 1, 1],
|
||||
# [1, 1, 1, ..., 1, 1, 1],
|
||||
# ...,
|
||||
# [1, 1, 1, ..., 1, 1, 1],
|
||||
# [1, 1, 1, ..., 1, 1, 1]]], dtype=uint8)
|
||||
# __consume_native_end__
|
||||
|
||||
# __consume_native_2_begin__
|
||||
import ray
|
||||
|
||||
# Read a multi-column example dataset.
|
||||
ds = ray.data.read_parquet("example://parquet_images_mini")
|
||||
# -> Dataset(num_blocks=3, num_rows=3,
|
||||
# schema={image: ArrowTensorType(shape=(128, 128, 3), dtype=uint8),
|
||||
# label: string})
|
||||
|
||||
def add_one(batch: pd.DataFrame) -> pd.DataFrame:
|
||||
batch["image"] += 1
|
||||
return batch
|
||||
|
||||
# This processes batches in pd.DataFrame format.
|
||||
ds = ds.map_batches(add_one)
|
||||
|
||||
# This returns pandas batches with List[np.ndarray] columns.
|
||||
next(ds.iter_batches())
|
||||
# -> image label
|
||||
# 0 [[[ 96, 76, 61], [ 92, 72, 57], [ 92, 72,... cat
|
||||
# 1 [[[ 38, 38, 39], [ 39, 39, 40], [ 39, 39,... cat
|
||||
# 2 [[[ 47, 39, 33], [ 43, 36, 29], [ 43, 36,... dog
|
||||
# __consume_native_2_end__
|
||||
|
||||
# __consume_pandas_begin__
|
||||
import ray
|
||||
|
||||
# Read a single-column example dataset.
|
||||
ds = ray.data.read_numpy("example://mnist_subset.npy")
|
||||
# -> Dataset(num_blocks=1, num_rows=3,
|
||||
# schema={__value__: ArrowTensorType(shape=(28, 28), dtype=uint8)})
|
||||
|
||||
def add_one(batch: pd.DataFrame) -> pd.DataFrame:
|
||||
batch["__value__"] += 1
|
||||
return batch
|
||||
|
||||
# This processes batches in pd.DataFrame format.
|
||||
ds = ds.map_batches(add_one, batch_format="pandas")
|
||||
|
||||
# This returns pandas batches with List[np.ndarray] columns.
|
||||
next(ds.iter_batches(batch_format="pandas"))
|
||||
# -> __value__
|
||||
# 0 [[ 1, 1, 1, 1, 1, 1, 1, 1, 1,...
|
||||
# 1 [[ 1, 1, 1, 1, 1, 1, 1, 1, 1,...
|
||||
# 2 [[ 1, 1, 1, 1, 1, 1, 1, 1, 1,...
|
||||
# __consume_pandas_end__
|
||||
|
||||
# __consume_pandas_2_begin__
|
||||
import ray
|
||||
|
||||
# Read a multi-column example dataset.
|
||||
ds = ray.data.read_parquet("example://parquet_images_mini")
|
||||
# -> Dataset(num_blocks=3, num_rows=3,
|
||||
# schema={image: ArrowTensorType(shape=(128, 128, 3), dtype=uint8),
|
||||
# label: string})
|
||||
|
||||
def add_one(batch: pd.DataFrame) -> pd.DataFrame:
|
||||
batch["image"] += 1
|
||||
return batch
|
||||
|
||||
# This processes batches in pd.DataFrame format.
|
||||
ds = ds.map_batches(add_one, batch_format="pandas")
|
||||
|
||||
# This returns pandas batches with List[np.ndarray] columns.
|
||||
next(ds.iter_batches(batch_format="pandas"))
|
||||
# -> image label
|
||||
# 0 [[[ 96, 76, 61], [ 92, 72, 57], [ 92, 72,... cat
|
||||
# 1 [[[ 38, 38, 39], [ 39, 39, 40], [ 39, 39,... cat
|
||||
# 2 [[[ 47, 39, 33], [ 43, 36, 29], [ 43, 36,... dog
|
||||
# __consume_pandas_2_end__
|
||||
|
||||
# __consume_pyarrow_begin__
|
||||
import ray
|
||||
from ray.data.extensions.tensor_extension import ArrowTensorArray
|
||||
|
||||
import pyarrow
|
||||
|
||||
# Read a single-column example dataset.
|
||||
ds = ray.data.read_numpy("example://mnist_subset.npy")
|
||||
# -> Dataset(num_blocks=1, num_rows=3,
|
||||
# schema={__value__: ArrowTensorType(shape=(28, 28), dtype=uint8)})
|
||||
|
||||
def add_one(batch: pyarrow.Table) -> pyarrow.Table:
|
||||
np_col = np.array(
|
||||
[
|
||||
np.ndarray((28, 28), buffer=buf, dtype=np.uint8)
|
||||
for buf in batch.column("__value__")
|
||||
]
|
||||
)
|
||||
np_col += 1
|
||||
|
||||
return batch.set_column(
|
||||
batch._ensure_integer_index("__value__"),
|
||||
"__value__",
|
||||
ArrowTensorArray.from_numpy(np_col),
|
||||
)
|
||||
|
||||
# This processes batches in pyarrow.Table format.
|
||||
ds = ds.map_batches(add_one, batch_format="pyarrow")
|
||||
|
||||
# This returns batches in pyarrow.Table format.
|
||||
next(ds.iter_batches(batch_format="pyarrow"))
|
||||
# pyarrow.Table
|
||||
# __value__: extension<arrow.py_extension_type<ArrowTensorType>>
|
||||
# ----
|
||||
# __value__: [[[1,1,1,1,1,1,1,1,1,1,...],...,[1,1,1,1,1,1,1,1,1,1,...]]]
|
||||
# __consume_pyarrow_end__
|
||||
|
||||
# __consume_pyarrow_2_begin__
|
||||
# Read a multi-column example dataset.
|
||||
ds = ray.data.read_parquet("example://parquet_images_mini")
|
||||
# -> Dataset(num_blocks=3, num_rows=3,
|
||||
# schema={image: TensorDtype(shape=(128, 128, 3), dtype=uint8), label: object})
|
||||
|
||||
def add_one(batch: pyarrow.Table) -> pyarrow.Table:
|
||||
np_col = np.array(
|
||||
[
|
||||
np.ndarray((128, 128, 3), buffer=buf, dtype=np.uint8)
|
||||
for buf in batch.column("image")
|
||||
]
|
||||
)
|
||||
np_col += 1
|
||||
|
||||
return batch.set_column(
|
||||
batch._ensure_integer_index("image"),
|
||||
"image",
|
||||
ArrowTensorArray.from_numpy(np_col),
|
||||
)
|
||||
|
||||
# This processes batches in pyarrow.Table format.
|
||||
ds = ds.map_batches(add_one, batch_format="pyarrow")
|
||||
|
||||
# This returns batches in pyarrow.Table format.
|
||||
next(ds.iter_batches(batch_format="pyarrow"))
|
||||
# pyarrow.Table
|
||||
# image: extension<arrow.py_extension_type<ArrowTensorType>>
|
||||
# label: string
|
||||
# ----
|
||||
# image: [[[92,71,57,107,87,72,113,97,85,122,...,85,170,152,88,167,150,89,165,146,90]]]
|
||||
# label: [["cat"]]
|
||||
# __consume_pyarrow_2_end__
|
||||
|
||||
# __consume_numpy_begin__
|
||||
import ray
|
||||
|
||||
# Read a single-column example dataset.
|
||||
ds = ray.data.read_numpy("example://mnist_subset.npy")
|
||||
# -> Dataset(num_blocks=1, num_rows=3,
|
||||
# schema={__value__: ArrowTensorType(shape=(28, 28), dtype=uint8)})
|
||||
|
||||
def add_one(batch: np.ndarray) -> np.ndarray:
|
||||
batch += 1
|
||||
return batch
|
||||
|
||||
# This processes batches in np.ndarray format.
|
||||
ds = ds.map_batches(add_one, batch_format="numpy")
|
||||
|
||||
# This returns batches in np.ndarray format.
|
||||
next(ds.iter_batches(batch_format="numpy"))
|
||||
# -> array([[[1, 1, 1, ..., 1, 1, 1],
|
||||
# [1, 1, 1, ..., 1, 1, 1],
|
||||
# ...,
|
||||
# [1, 1, 1, ..., 1, 1, 1],
|
||||
# [1, 1, 1, ..., 1, 1, 1]],
|
||||
#
|
||||
# ...,
|
||||
#
|
||||
# [[1, 1, 1, ..., 1, 1, 1],
|
||||
# [1, 1, 1, ..., 1, 1, 1],
|
||||
# ...,
|
||||
# [1, 1, 1, ..., 1, 1, 1],
|
||||
# [1, 1, 1, ..., 1, 1, 1]]], dtype=uint8)
|
||||
# __consume_numpy_end__
|
||||
|
||||
# __consume_numpy_2_begin__
|
||||
# Read a multi-column example dataset.
|
||||
ds = ray.data.read_parquet("example://parquet_images_mini")
|
||||
# -> Dataset(num_blocks=3, num_rows=3,
|
||||
# schema={image: TensorDtype(shape=(128, 128, 3), dtype=uint8), label: object})
|
||||
|
||||
def add_one(batch: Dict[str, Any]) -> Dict[str, Any]:
|
||||
assert isinstance(batch, dict)
|
||||
batch["image"] += 1
|
||||
return batch
|
||||
|
||||
# This processes batches in np.ndarray format.
|
||||
ds = ds.map_batches(add_one, batch_format="numpy")
|
||||
|
||||
# This returns batches in Dict[str, np.ndarray] format.
|
||||
next(ds.iter_batches(batch_format="numpy"))
|
||||
# -> {'image': array([[[[ 92, 71, 57],
|
||||
# [107, 87, 72],
|
||||
# ...,
|
||||
# [141, 161, 185],
|
||||
# [139, 158, 184]],
|
||||
#
|
||||
# ...,
|
||||
#
|
||||
# [[135, 135, 109],
|
||||
# [135, 135, 108],
|
||||
# ...,
|
||||
# [167, 150, 89],
|
||||
# [165, 146, 90]]]], dtype=uint8),
|
||||
# 'label': array(['cat'], dtype=object)}
|
||||
# __consume_numpy_2_end__
|
||||
|
||||
|
||||
ds.fully_executed()
|
||||
shutil.rmtree("/tmp/some_path")
|
||||
|
||||
# __write_1_begin__
|
||||
# Read a multi-column example dataset.
|
||||
ds = ray.data.read_parquet("example://parquet_images_mini")
|
||||
# -> Dataset(num_blocks=3, num_rows=3,
|
||||
# schema={image: TensorDtype(shape=(128, 128, 3), dtype=uint8), label: object})
|
||||
|
||||
# You can write the dataset to Parquet.
|
||||
ds.write_parquet("/tmp/some_path")
|
||||
|
||||
# And you can read it back.
|
||||
read_ds = ray.data.read_parquet("/tmp/some_path")
|
||||
print(read_ds.schema())
|
||||
# -> image: extension<arrow.py_extension_type<ArrowTensorType>>
|
||||
# label: string
|
||||
# __write_1_end__
|
||||
|
||||
read_ds.fully_executed()
|
||||
shutil.rmtree("/tmp/some_path")
|
||||
|
||||
# __write_2_begin__
|
||||
# Read a single-column example dataset.
|
||||
ds = ray.data.read_numpy("example://mnist_subset.npy")
|
||||
# -> Dataset(num_blocks=1, num_rows=3,
|
||||
# schema={__value__: ArrowTensorType(shape=(28, 28), dtype=uint8)})
|
||||
|
||||
# You can write the dataset to Parquet.
|
||||
ds.write_numpy("/tmp/some_path")
|
||||
|
||||
# And you can read it back.
|
||||
read_ds = ray.data.read_numpy("/tmp/some_path")
|
||||
print(read_ds.schema())
|
||||
# -> __value__: extension<arrow.py_extension_type<ArrowTensorType>>
|
||||
# __write_2_end__
|
|
@ -13,10 +13,16 @@ Datasets
|
|||
A Dataset consists of a list of Ray object references to *blocks*.
|
||||
Each block holds a set of items in either `Arrow table format <https://arrow.apache.org/docs/python/data.html#tables>`__
|
||||
or a Python list (for non-tabular data).
|
||||
For ML use cases, Datasets also natively supports :ref:`Tensor data <datasets_tensor_support>`.
|
||||
For ML use cases, Datasets also natively supports mixing :ref:`Tensor <datasets_tensor_support>` and tabular data.
|
||||
Having multiple blocks in a dataset allows for parallel transformation and ingest.
|
||||
|
||||
The following figure visualizes a Dataset that has three Arrow table blocks, each block holding 1000 rows each:
|
||||
Informally, we refer to:
|
||||
|
||||
* A Dataset with Arrow blocks as a *Tabular Dataset*,
|
||||
* A Dataset with Python list blocks as a *Simple Dataset*, and
|
||||
* A Tabular Dataset with one or more tensor-type columns as a *Tensor Dataset*.
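For illustration, a minimal sketch of creating each kind with the synthetic ``range*`` sources:

.. code-block:: python

    import ray

    # Tabular Dataset: Arrow-backed blocks with named columns.
    tabular_ds = ray.data.range_table(1000)

    # Simple Dataset: blocks are plain Python lists.
    simple_ds = ray.data.range(1000)

    # Tensor Dataset: a tabular dataset with a tensor-typed column.
    tensor_ds = ray.data.range_tensor(1000, shape=(2, 2))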
|
||||
|
||||
The following figure visualizes a tabular dataset with three blocks, each holding 1000 rows:
|
||||
|
||||
.. image:: images/dataset-arch.svg
|
||||
|
||||
|
|
|
@ -119,8 +119,8 @@ Some Dataset operations require a *shuffle* operation, meaning that data is shuf
|
|||
These operations include ``Dataset.random_shuffle``, ``Dataset.sort`` and ``Dataset.groupby``.
|
||||
Shuffle can be challenging to scale to large data sizes and clusters, especially when the total dataset size cannot fit into memory.
|
||||
|
||||
Starting in Ray v1.13, Datasets provides an alternative shuffle implementation known as push-based shuffle for improving large-scale performance.
|
||||
We recommend trying this out if your dataset has more than 1k partitions (input files) or 1TB of data.
|
||||
Datasets provides an alternative shuffle implementation known as push-based shuffle for improving large-scale performance.
|
||||
We recommend trying this out if your dataset has more than 1000 blocks or is larger than 1 TB in size.
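As a quick sketch, push-based shuffle can be enabled via the ``DatasetContext.use_push_based_shuffle`` flag (also shown further below):

.. code-block:: python

    import ray

    # Opt in to push-based shuffle for this session.
    ctx = ray.data.context.DatasetContext.get_current()
    ctx.use_push_based_shuffle = True

    ds = ray.data.range(10**6)
    ds = ds.random_shuffle()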
|
||||
|
||||
To try this out locally or on a cluster, you can start with the `nightly release test <https://github.com/ray-project/ray/blob/master/release/nightly_tests/dataset/sort.py>`_ that Ray runs for ``Dataset.random_shuffle`` and ``Dataset.sort``.
|
||||
To get an idea of the performance you can expect, here are some run time results for ``Dataset.random_shuffle`` on 1-10TB of data on 20 machines (m5.4xlarge instances on AWS EC2, each with 16 vCPUs, 64GB RAM).
|
||||
|
@ -155,5 +155,3 @@ setting the ``DatasetContext.use_push_based_shuffle`` flag:
|
|||
ds = ray.data.range(n, parallelism=parallelism)
|
||||
print(ds.random_shuffle().take(10))
|
||||
# [954, 405, 434, 501, 956, 762, 488, 920, 657, 834]
|
||||
|
||||
Push-based shuffle is available as **alpha** in Ray 1.13+. Expect some rough edges, and please file any feature requests and bug reports on GitHub Issues.
|
||||
|
|
|
@ -203,7 +203,7 @@ You can also specify the size of each window using ``bytes_per_window``. In this
|
|||
# -> INFO -- Created DatasetPipeline with 73 windows: 9120MiB min, 9431MiB max, 9287MiB mean
|
||||
# -> INFO -- Blocks per window: 10 min, 16 max, 14 mean
|
||||
# -> INFO -- ✔️ This pipeline's per-window parallelism is high enough to fully utilize the cluster.
|
||||
# -> INFO -- ✔️ This pipeline's windows can each fit in object store memory without spilling.
|
||||
# -> INFO -- ✔️ This pipeline's windows likely fit in object store memory without spilling.
|
||||
|
||||
Datasets will warn you if the windows are too large or each window has insufficient parallelism (too few blocks). Check out the reported statistics for window size and blocks per window to ensure efficient pipeline execution.
|
||||
|
||||
|
|
|
@ -176,7 +176,7 @@ Here is an overview of the available batch formats:
|
|||
|
||||
* **Tensor Datasets** (single-column): Each batch will be a single
|
||||
`numpy.ndarray <https://numpy.org/doc/stable/reference/generated/numpy.ndarray.html>`__
|
||||
containing the single-tensor-column for this batch.
|
||||
containing the single tensor column for this batch.
|
||||
|
||||
* **Simple Datasets**: Each batch will be a single NumPy ndarray, where Datasets will
|
||||
attempt to convert each list-batch to an ndarray.
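For example, a minimal sketch of the single-tensor-column case described above:

.. code-block:: python

    import ray

    # Single-column tensor dataset: each NumPy batch is one ndarray.
    ds = ray.data.range_tensor(8, shape=(2, 2))
    batch = next(ds.iter_batches(batch_size=4, batch_format="numpy"))
    # batch is a numpy.ndarray of shape (4, 2, 2)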
|
||||
|
|
|
@ -35,7 +35,7 @@ Predictors expose a ``predict`` method that accepts an input batch of type ``Dat
|
|||
|
||||
**Life of a prediction:** Underneath the hood, when the ``Predictor.predict`` method is called the following occurs:
|
||||
|
||||
- The input batch is converted into a Pandas DataFrame. Tensor input (like a ``np.ndarray``) will be converted into a single column Pandas Dataframe.
|
||||
- The input batch is converted into a Pandas DataFrame. Tensor input (like a ``np.ndarray``) will be converted into a single-column Pandas DataFrame.
|
||||
- If there is a :ref:`Preprocessor <air-preprocessor-ref>` saved in the provided :ref:`Checkpoint <air-checkpoint-ref>`, the preprocessor will be used to transform the DataFrame.
|
||||
- The transformed DataFrame will be passed to the model for inference.
|
||||
- The predictions will be returned by ``predict`` in the same type as the original input.
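As a rough sketch of the first step only (the wrapping column name here is purely illustrative, not the actual internal name):

.. code-block:: python

    import numpy as np
    import pandas as pd

    # Tensor input is wrapped into a single-column DataFrame before the
    # preprocessor and model see it; predictions mirror the input type.
    batch = np.random.rand(4, 3)
    df_batch = pd.DataFrame({"features": list(batch)})  # column name is illustrative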
|
||||
|
|
|
@ -435,7 +435,7 @@ class TensorDtype(pd.api.extensions.ExtensionDtype):
|
|||
return is_bool_dtype(self._dtype)
|
||||
|
||||
|
||||
class TensorOpsMixin(pd.api.extensions.ExtensionScalarOpsMixin):
|
||||
class _TensorOpsMixin(pd.api.extensions.ExtensionScalarOpsMixin):
|
||||
"""
|
||||
Mixin for TensorArray operator support, applying operations on the
|
||||
underlying ndarrays.
|
||||
|
@ -489,7 +489,7 @@ class TensorOpsMixin(pd.api.extensions.ExtensionScalarOpsMixin):
|
|||
return cls._create_method(op)
|
||||
|
||||
|
||||
class TensorScalarCastMixin:
|
||||
class _TensorScalarCastMixin:
|
||||
"""
|
||||
Mixin for casting scalar tensors to a particular numeric type.
|
||||
"""
|
||||
|
@ -513,7 +513,8 @@ class TensorScalarCastMixin:
|
|||
return self._scalarfunc(oct)
|
||||
|
||||
|
||||
class TensorArrayElement(TensorOpsMixin, TensorScalarCastMixin):
|
||||
@PublicAPI(stability="beta")
|
||||
class TensorArrayElement(_TensorOpsMixin, _TensorScalarCastMixin):
|
||||
"""
|
||||
Single element of a TensorArray, wrapping an underlying ndarray.
|
||||
"""
|
||||
|
@ -578,8 +579,8 @@ class TensorArrayElement(TensorOpsMixin, TensorScalarCastMixin):
|
|||
@PublicAPI(stability="beta")
|
||||
class TensorArray(
|
||||
pd.api.extensions.ExtensionArray,
|
||||
TensorOpsMixin,
|
||||
TensorScalarCastMixin,
|
||||
_TensorOpsMixin,
|
||||
_TensorScalarCastMixin,
|
||||
):
|
||||
"""
|
||||
Pandas `ExtensionArray` representing a tensor column, i.e. a column
|
||||
|
|
|
@ -41,6 +41,9 @@ from ray.data.row import TableRow
|
|||
|
||||
try:
|
||||
import pyarrow
|
||||
|
||||
# This import is necessary to load the tensor extension type.
|
||||
from ray.data.extensions.tensor_extension import ArrowTensorType # noqa
|
||||
except ImportError:
|
||||
pyarrow = None
|
||||
|
||||
|
|
|
@ -3280,7 +3280,7 @@ class Dataset(Generic[T]):
|
|||
)
|
||||
else:
|
||||
logger.info(
|
||||
f"{OK_PREFIX} This pipeline's windows can each fit in "
|
||||
f"{OK_PREFIX} This pipeline's windows likely fit in "
|
||||
"object store memory without spilling."
|
||||
)
|
||||
except Exception as e:
|
||||
|
|
Binary file not shown.
Binary file not shown.
Binary file not shown.
|
@ -2825,7 +2825,7 @@ def test_torch_datasource_value_error(ray_start_regular_shared, local_path):
|
|||
def test_image_folder_datasource(
|
||||
ray_start_regular_shared, enable_automatic_tensor_extension_cast
|
||||
):
|
||||
root = os.path.join(os.path.dirname(__file__), "image-folder")
|
||||
root = "example://image-folder"
|
||||
ds = ray.data.read_datasource(ImageFolderDatasource(), root=root, size=(64, 64))
|
||||
|
||||
assert ds.count() == 3
|
||||
|
@ -2851,7 +2851,7 @@ def test_image_folder_datasource_e2e(ray_start_regular_shared):
|
|||
from torchvision import transforms
|
||||
from torchvision.models import resnet18
|
||||
|
||||
root = os.path.join(os.path.dirname(__file__), "image-folder")
|
||||
root = "example://image-folder"
|
||||
dataset = ray.data.read_datasource(
|
||||
ImageFolderDatasource(), root=root, size=(32, 32)
|
||||
)
|
||||
|
|
|
@ -52,7 +52,7 @@ def test_warnings(shutdown_only):
|
|||
assert dataset.logger.infos == [
|
||||
"Created DatasetPipeline with 10 windows: 8b min, 8b max, 8b mean",
|
||||
"Blocks per window: 1 min, 1 max, 1 mean",
|
||||
f"{OK_PREFIX} This pipeline's windows can each fit in object store memory "
|
||||
f"{OK_PREFIX} This pipeline's windows likely fit in object store memory "
|
||||
"without spilling.",
|
||||
]
|
||||
|
||||
|
@ -117,7 +117,7 @@ def test_warnings(shutdown_only):
|
|||
"Blocks per window: 10 min, 10 max, 10 mean",
|
||||
f"{OK_PREFIX} This pipeline's per-window parallelism is high enough to fully "
|
||||
"utilize the cluster.",
|
||||
f"{OK_PREFIX} This pipeline's windows can each fit in object store memory "
|
||||
f"{OK_PREFIX} This pipeline's windows likely fit in object store memory "
|
||||
"without spilling.",
|
||||
]
|
||||
|
||||
|
|