[docs] Update the AIR data ingest guide (#26909)

This commit is contained in:
Eric Liang 2022-07-24 09:59:29 -07:00 committed by GitHub
parent e19cf164fd
commit 008eecfbff
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 221 additions and 164 deletions

View file

@ -3,19 +3,12 @@
Configuring Training Datasets
=============================
This page is a guide for setting up, understanding, and diagnosing data ingest problems in Ray AIR.
AIR builds its training data pipeline on :ref:`Ray Datasets <datasets>`, which is a scalable, framework-agnostic data loading and preprocessing library. Datasets enables AIR to seamlessly load data for local and distributed training with Train.
Data ingest is the process of loading data from storage, apply preprocessing steps, and feeding the data into Trainers in AIR.
For datasets that are small, ingest is usually not an issue. However, ingest can be tricky to set up when datasets grow
large enough so that they may not fit fully in memory on a single node, or even in aggregate cluster memory.
This page describes how to setup and configure these datasets in Train under different scenarios and scales.
AIR builds its ingest pipeline on :ref:`Ray Datasets <datasets>`, which is a framework-agnostic distributed data loading library. If you have
an existing ingest pipeline (e.g., based on TF data), there is some upfront effort porting your loading code to Datasets.
In return, AIR provides portability across ML frameworks as well as advanced capabilities such as global random shuffles,
which are not possible in less general ML data preprocessing libraries.
Ingest Basics
-------------
Overview
--------
.. _ingest_basics:
@ -29,175 +22,84 @@ user-defined function to preprocess batches of data, and (3) runs an AIR Trainer
Let's walk through the stages of what happens when ``Trainer.fit()`` is called.
**Read**: First, AIR will read the Dataset into the Ray object store by calling ``ds.fully_executed()`` on the datasets
that you pass to the Trainer. Dataset blocks that don't fit into memory will be spilled to disk. Note that when you create
the dataset initially, typically only the first block and block metadata is read into memory. The rest of the blocks are
not loaded until ``fit`` is called.
**Preprocessing**: Next, if a preprocessor is defined, AIR will by default ``fit`` the preprocessor (e.g., compute statistics) on the
**Preprocessing**: First, AIR will ``fit`` the preprocessor (e.g., compute statistics) on the
``"train"`` dataset, and then ``transform`` all given datasets with the fitted preprocessor. This is done by calling ``prep.fit_transform()``
on the train dataset passed to the Trainer, followed by ``prep.transform()`` on remaining datasets. Preprocessors use Dataset APIs to execute
preprocessing in a parallelized way across the cluster. Both read and preprocessing stages use Ray tasks under the hood.
on the train dataset passed to the Trainer, followed by ``prep.transform()`` on remaining datasets.
**Training**: Finally, AIR passes a reference to the preprocessed dataset to Train workers (Ray actors) launched by the Trainer. Each worker then
typically calls ``iter_batches``, ``to_tf``, or ``to_torch`` to iterate over the dataset reader retrieved by ``get_dataset_shard``.
These read methods load blocks of the dataset into the local worker's memory in a streaming fashion, only fetching / prefetching a
limited number of blocks at once. Workers loop over the dataset blocks repeatedly until training completes.
**Training**: Then, AIR passes the preprocessed dataset to Train workers (Ray actors) launched by the Trainer. Each worker calls ``get_dataset_shard`` to get a handle to its assigned data shard, and then calls one of ``iter_batches``, ``iter_torch_batches``, or ``iter_tf_batches`` to loop over the data.
Configuring Ingest Per-Dataset
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Getting Started
---------------
It is common to customize processing per-dataset. For example, you may want to enable sharding
on a validation dataset, disable preprocessing of an auxiliary dataset, or adjust ingest strategy per dataset.
Each DataParallelTrainer has a default per-dataset config given by a ``Trainer._dataset_config`` class field. It is a mapping
from dataset names to ``DatasetConfig`` objects, and implements the default behavior described in :ref:`Ingest Basics <ingest_basics>`:
.. code:: python
# The default DataParallelTrainer dataset config, which is inherited
# by sub-classes such as TorchTrainer, HorovodTrainer, etc.
_dataset_config = {
# Fit preprocessors on the train dataset only. Split the dataset
# across workers if scaling_config["num_workers"] > 1.
"train": DatasetConfig(fit=True, split=True),
# For all other datasets, use the defaults (don't fit, don't split).
# The datasets will be transformed by the fitted preprocessor.
"*": DatasetConfig(),
}
These configs can be overriden via the ``dataset_config`` kwarg, which is recursively merged with the Trainer defaults.
Here are some examples of configuring Dataset ingest options and what they do:
.. tabbed:: Split All
This example shows overriding the split config for the "valid" and "test" datasets. This means that
both the valid and test datasets here will be ``.split()`` across the training workers.
.. literalinclude:: doc_code/air_ingest.py
:language: python
:start-after: __config_1__
:end-before: __config_1_end__
.. tabbed:: Disable Transform
This example shows overriding the transform config for the "side" dataset. This means that
the original dataset will be returned by ``.get_dataset_shard("side")``.
.. literalinclude:: doc_code/air_ingest.py
:language: python
:start-after: __config_2__
:end-before: __config_2_end__
Bulk vs Streaming Reads
-----------------------
Bulk Ingest
~~~~~~~~~~~
By default, AIR loads all Dataset blocks into the object store at the start of training. This provides the best performance if the
cluster has enough aggregate memory to fit all the data blocks in object store memory, or if your preprocessing step is expensive
and you don't want it to be re-run on each epoch. Note that data often requires more space
when loaded uncompressed in memory than when resident in storage.
If there is insufficient object store memory, blocks may be spilled to disk during reads or preprocessing. Ray will print log messages
if spilling is occuring, and you can check this as well with the ``ray memory --stats-only`` utility. If spilling is happening, take
care to ensure the cluster has enough disk space to handle the spilled blocks. Alternatively, consider using machine with more memory /
more machines to avoid spilling.
In short, use bulk ingest when:
* you have enough memory to fit data blocks in cluster object store;
* your preprocessing step is expensive per each epoch; and
* you want best performance when both or either the above conditions are met.
Streaming Ingest (experimental)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
AIR also supports streaming ingest via the DatasetPipeline feature. Streaming ingest is preferable when you are using large datasets
that don't fit into memory, and prefer to read *windows* of data from storage to minimize the active memory required for data ingest.
Note that streaming ingest will re-execute preprocessing on each pass over the data. If preprocessing is a bottleneck, consider
using bulk ingest instead for better performance.
To enable streaming ingest, set ``use_stream_api=True`` in the dataset config. By default, this will configure streaming ingest with a window
size of 1GiB, which means AIR will load ~1 GiB of data at a time from the datasource.
Performance can be increased with larger window sizes, which can be adjusted using the ``stream_window_size`` config.
A reasonable stream window size is something like 20% of available object store memory. Note that the data may be larger
once deserialized in memory, or if individual files are larger than the window size.
If the window size is set to -1, then an infinite window size will be used. This case is equivalent to using bulk loading
(including the performance advantages of caching preprocessed blocks), but still exposing a DatasetPipeline reader.
In short, use streaming ingest when:
* you have large datasets that don't fit into memory;
* you want to process small chunks or blocks per window;
* you can use small windows with small data blocks minimizing or avoiding memory starvation or OOM errors; and
* your preprocessing step is not a bottleneck or not an expensive operation since it's re-executed on each pass over the data.
.. warning::
Streaming ingest only applies to preprocessor transform, not preprocessor fitting.
This means that the preprocessor will be initially fit in bulk, after which data will be transformed
as it is loaded in a streaming manner.
Reading Data
~~~~~~~~~~~~
The ``get_dataset_shard`` method returns a reader object that is either a ``Dataset`` or ``DatasetPipeline``, depending on whether the ``use_stream_api``
option is set. The former is a finite set of records, and the latter represents an infinite stream of records.
See the following examples for clarification:
The following is a simple example of how to configure ingest for a dummy ``TorchTrainer``. Below, we are passing a small tensor dataset to the Trainer via the ``datasets`` argument. In the Trainer's ``train_loop_per_worker``, we access the preprocessed dataset using ``get_dataset_shard()``.
.. tabbed:: Bulk Ingest
This example shows bulk ingest (the default). Data is bulk loaded and made available
directly via a ``Dataset`` object that can be looped over manually.
By default, AIR loads all datasets into the Ray object store at the start of training.
This provides the best performance if the cluster can fit the datasets
entirely in memory, or if the preprocessing step is expensive to run more than once.
.. literalinclude:: doc_code/air_ingest.py
:language: python
:start-after: __config_4__
:end-before: __config_4_end__
.. tabbed:: Streaming Ingest
You should use bulk ingest when:
This example shows enabling streaming ingest for the "train" dataset with a *N-byte* window.
This means that AIR will only load *N* bytes of data from the datasource at a time (the data
may be larger once deserialized in memory or if individual files are larger than the window).
* you have enough memory to fit data blocks in cluster object store;
* your preprocessing step is expensive per each epoch; and
* you want best performance when both or either the above conditions are met.
.. tabbed:: Streaming Ingest (experimental)
In streaming ingest mode, ``get_dataset_shard`` returns a ``DatasetPipeline`` pipeline that
can be used to read data in a streaming way.
To enable streaming ingest, set ``use_stream_api=True`` in the dataset config.
By default, this will tell AIR to load *windows* of 1GiB of data into memory at a time.
Performance can be increased with larger window sizes, which can be adjusted using the
``stream_window_size`` config.
A reasonable stream window size is something like 20% of available object store memory.
.. literalinclude:: doc_code/air_ingest.py
:language: python
:start-after: __config_5__
:end-before: __config_5_end__
Use streaming ingest when:
* you have large datasets that don't fit into memory;
* you want to process small chunks or blocks per window;
* you can use small windows with small data blocks minimizing or avoiding memory starvation or OOM errors; and
* your preprocessing step is not a bottleneck or not an expensive operation since it's re-executed on each pass over the data.
Shuffling Data
~~~~~~~~~~~~~~
AIR offers several options for per-epoch shuffling, including *local
(per-shard) shuffling* and *global (whole-dataset) shuffling*.
Shuffling or data randomization is important for training high-quality models. By default, AIR will randomize the order the data files (blocks) are read from. AIR also offers options for further randomizing data records within each file:
.. tabbed:: Local Shuffling
Local shuffling is the recommended approach for randomizing data order. To use local shuffle,
simply specify a non-zero ``local_shuffle_buffer_size`` as an argument to ``iter_batches()``.
The iterator will then use a local buffer of the given size to randomize record order. The
larger the buffer size, the more randomization will be applied, but it will also use more
memory.
Local shuffling is an in-iterator shuffle that fills a trainer-local in-memory shuffle
buffer with records and then pops random samples as batches, keeping the buffer above a
user-provided threshold to ensure samples are mixed throughout the entirety of the
trainer's shard. This local shuffle doesn't mix samples across trainer shards between
epochs as the global shuffle does, and will therefore be a lower-quality shuffle;
however, since this shuffle only involves a local in-memory buffer, it is much less
expensive.
For configuring the size of the in-memory shuffle buffer, it is recommended to
allocate as large of a buffer as the trainer's CPU memory constraints will allow;
note that the ceiling of the CPU memory usage on a given node will be
``(# of trainers on node) * max(memory used by prefetching, memory used by shuffle
buffer)``, where the aggressiveness of the prefetching is controlled by the
``prefetch_blocks`` argument. See
:meth:`ds.iter_batches() <ray.data.Dataset.iter_batches>` for details.
See :meth:`ds.iter_batches() <ray.data.Dataset.iter_batches>` for more details.
.. literalinclude:: doc_code/air_ingest.py
:language: python
:start-after: __local_shuffling_start__
:end-before: __local_shuffling_end__
.. tabbed:: Global Shuffling
You should use local shuffling when:
* a small in-memory buffer provides enough randomization; or
* you want the highest possible ingest performance; or
* your model is not overly sensitive to shuffle quality
.. tabbed:: Global Shuffling (slower)
Global shuffling provides more uniformly random (decorrelated) samples and is carried
out via a distributed map-reduce operation. This higher quality shuffle can often lead
@ -214,28 +116,95 @@ AIR offers several options for per-epoch shuffling, including *local
:start-after: __global_shuffling_start__
:end-before: __global_shuffling_end__
Ingest and Ray Tune
-------------------
You should use global shuffling when:
.. note::
* you suspect high-quality shuffles may significantly improve model quality; and
* absolute ingest performance is less of a concern
Train always uses Tune as the execution backend under the hood, even when running just ``Trainer.fit()`` directly (this is treated as a single-trial experiment). This ensures consistency of execution.
Configuring Ingest
~~~~~~~~~~~~~~~~~~
Placement Group Behavior
~~~~~~~~~~~~~~~~~~~~~~~~
You can use the ``DatasetConfig`` object to configure how Datasets are preprocessed and split across training workers. Each ``DataParallelTrainer`` has a default ``_dataset_config`` class field. It is a mapping
from dataset names to ``DatasetConfig`` objects, and implements the default behavior described in the :ref:`overview <ingest_basics>`:
Tune typically creates a placement group reserving resource for each of its trials. These placement groups only reserve resources for the Train actors, however. By default, Dataset preprocessing tasks run using "spare" CPU resources in the cluster, which enables better autoscaling and utilization of resources. It is also possible to set aside node CPUs for Dataset tasks using the ``_max_cpu_fraction_per_node`` option of DatasetConfig (Experimental).
.. code:: python
.. warning::
# The default DataParallelTrainer dataset config, which is inherited
# by sub-classes such as TorchTrainer, HorovodTrainer, etc.
_dataset_config = {
# Fit preprocessors on the train dataset only. Split the dataset
# across workers if scaling_config["num_workers"] > 1.
"train": DatasetConfig(fit=True, split=True),
# For all other datasets, use the defaults (don't fit, don't split).
# The datasets will be transformed by the fitted preprocessor.
"*": DatasetConfig(),
}
If trial placement groups reserve all the CPUs in the cluster, then it may be that no CPUs are left for Datasets to use, and trials can hang. This can easily happen when using CPU-only trainers. For example, if you can change the above ingest example to use ``ray.init(num_cpus=2)``, such a hang will happen.
These configs can be overriden via the ``dataset_config`` constructor argument.
Here are some examples of configuring Dataset ingest options and what they do:
Refer to the :ref:`Datasets in Tune Example <datasets_tune>` to understand how to configure your Trainer. We recommend starting with the default of allowing tasks to run using spare cluster resources, and only changing this if you encounter hangs or want more performance predictability.
.. tabbed:: Example: Split All Datasets
Dataset Sharing
~~~~~~~~~~~~~~~
This example shows overriding the split config for the "valid" and "test" datasets. This means that
both the valid and test datasets here will be ``.split()`` across the training workers.
When you pass Datasets to a Tuner, it is important to understand that the Datasets are executed independently per-trial. This could potentially duplicate data reads in the cluster. To share Dataset blocks between trials, call `ds = ds.fully_executed()` prior to passing the Dataset to the Tuner. This ensures that the initial read operation will not be repeated per trial.
.. literalinclude:: doc_code/air_ingest.py
:language: python
:start-after: __config_1__
:end-before: __config_1_end__
.. tabbed:: Example: Disable Transform on Aux Dataset
This example shows overriding the transform config for the "side" dataset. This means that
the original dataset will be returned by ``.get_dataset_shard("side")``.
.. literalinclude:: doc_code/air_ingest.py
:language: python
:start-after: __config_2__
:end-before: __config_2_end__
Dataset Resources
~~~~~~~~~~~~~~~~~
Datasets uses Ray tasks to execute data processing operations. These tasks use CPU resources in the cluster during execution, which may compete with resources needed for Training.
.. tabbed:: Unreserved CPUs
By default, Dataset tasks use cluster CPU resources for execution. This can sometimes
conflict with Trainer resource requests. For example, if Trainers allocate all CPU resources
in the cluster, then no Datasets tasks can run.
.. literalinclude:: ./doc_code/air_ingest.py
:language: python
:start-after: __resource_allocation_1_begin__
:end-before: __resource_allocation_1_end__
Unreserved CPUs work well when:
* you are running only one Trainer and the cluster has enough CPUs; or
* your Trainers are configured to use GPUs and not CPUs
.. tabbed:: Using Reserved CPUs (experimental)
The ``_max_cpu_fraction_per_node`` option can be used to exclude CPUs from placement
group scheduling. In the below example, setting this parameter to ``0.8`` enables Tune
trials to run smoothly without risk of deadlock by reserving 20% of node CPUs for
Dataset execution.
.. literalinclude:: ./doc_code/air_ingest.py
:language: python
:start-after: __resource_allocation_2_begin__
:end-before: __resource_allocation_2_end__
You should use reserved CPUs when:
* you are running multiple concurrent CPU Trainers using Tune; or
* you want to ensure predictable Datasets performance
.. warning::
``_max_cpu_fraction_per_node`` is experimental and not currently recommended for use with
autoscaling clusters (scale-up will not trigger properly).
Debugging Ingest with the ``DummyTrainer``
------------------------------------------
@ -370,3 +339,9 @@ Performance Tips
**Autoscaling**: We generally recommend first trying out AIR training with a fixed size cluster. This makes it easier to understand and debug issues. Autoscaling can be enabled after you are happy with performance to autoscale experiment sweeps with Tune, etc. We also recommend starting with autoscaling with a single node type. Autoscaling with hetereogeneous clusters can optimize costs, but may complicate performance debugging.
**Partitioning**: By default, Datasets will create up to 200 blocks per Dataset, or less if there are fewer base files for the Dataset. If you run into out-of-memory errors during preprocessing, consider increasing the number of blocks to reduce their size. To increase the max number of partitions, set the ``parallelism`` option when calling ``ray.data.read_*()``. To change the number of partitions at runtime, use ``ds.repartition(N)``. As a rule of thumb, blocks should be not more than 1-2GiB each.
Dataset Sharing
~~~~~~~~~~~~~~~
When you pass Datasets to a Tuner, Datasets are executed independently per-trial. This could potentially duplicate data reads in the cluster. To share Dataset blocks between trials, call ``ds = ds.fully_executed()`` prior to passing the Dataset to the Tuner. This ensures that the initial read operation will not be repeated per trial.

View file

@ -92,9 +92,12 @@ from ray.data import Dataset
from ray.train.torch import TorchTrainer
from ray.air.config import ScalingConfig
# A simple preprocessor that just scales all values by 2.0.
preprocessor = BatchMapper(lambda df: df * 2)
def train_loop_per_worker():
# By default, bulk loading is used and returns a Dataset object.
# Get a handle to the worker's assigned Dataset shard.
data_shard: Dataset = session.get_dataset_shard("train")
# Manually iterate over the data 10 times (10 epochs).
@ -102,7 +105,7 @@ def train_loop_per_worker():
for batch in data_shard.iter_batches():
print("Do some training on batch", batch)
# View the stats for performance debugging.
# Print the stats for performance debugging.
print(data_shard.stats())
@ -112,6 +115,7 @@ my_trainer = TorchTrainer(
datasets={
"train": ray.data.range_tensor(1000),
},
preprocessor=preprocessor,
)
my_trainer.fit()
# __config_4_end__
@ -123,6 +127,9 @@ from ray.data import DatasetPipeline
from ray.train.torch import TorchTrainer
from ray.air.config import ScalingConfig, DatasetConfig
# A simple preprocessor that just scales all values by 2.0.
preprocessor = BatchMapper(lambda df: df * 2)
def train_loop_per_worker():
# A DatasetPipeline object is returned when `use_stream_api` is set.
@ -149,6 +156,7 @@ my_trainer = TorchTrainer(
dataset_config={
"train": DatasetConfig(use_stream_api=True, stream_window_size=N),
},
preprocessor=preprocessor,
)
my_trainer.fit()
# __config_5_end__
@ -223,3 +231,77 @@ print(my_trainer.get_dataset_config())
# -> {'train': DatasetConfig(fit=True, split=True, global_shuffle=False, ...)}
my_trainer.fit()
# __local_shuffling_end__
ray.shutdown()
# __resource_allocation_1_begin__
import ray
from ray.air import session
from ray.data.preprocessors import BatchMapper
from ray.train.torch import TorchTrainer
from ray.air.config import ScalingConfig
# Create a cluster with 4 CPU slots available.
ray.init(num_cpus=4)
# A simple example training loop.
def train_loop_per_worker():
data_shard = session.get_dataset_shard("train")
for _ in range(10):
for batch in data_shard.iter_batches():
print("Do some training on batch", batch)
# A simple preprocessor that just scales all values by 2.0.
preprocessor = BatchMapper(lambda df: df * 2)
my_trainer = TorchTrainer(
train_loop_per_worker,
# This will hang if you set num_workers=4, since the
# Trainer will reserve all 4 CPUs for workers, leaving
# none left for Datasets execution.
scaling_config=ScalingConfig(num_workers=2),
datasets={
"train": ray.data.range_tensor(1000),
},
preprocessor=preprocessor,
)
my_trainer.fit()
# __resource_allocation_1_end__
ray.shutdown()
# __resource_allocation_2_begin__
import ray
from ray.air import session
from ray.data.preprocessors import BatchMapper
from ray.train.torch import TorchTrainer
from ray.air.config import ScalingConfig
# Create a cluster with 4 CPU slots available.
ray.init(num_cpus=4)
# A simple example training loop.
def train_loop_per_worker():
data_shard = session.get_dataset_shard("train")
for _ in range(10):
for batch in data_shard.iter_batches():
print("Do some training on batch", batch)
# A simple preprocessor that just scales all values by 2.0.
preprocessor = BatchMapper(lambda df: df * 2)
my_trainer = TorchTrainer(
train_loop_per_worker,
# This will hang if you set num_workers=4, since the
# Trainer will reserve all 4 CPUs for workers, leaving
# none left for Datasets execution.
scaling_config=ScalingConfig(num_workers=2),
datasets={
"train": ray.data.range_tensor(1000),
},
preprocessor=preprocessor,
)
my_trainer.fit()
# __resource_allocation_2_end__

File diff suppressed because one or more lines are too long

Before

Width:  |  Height:  |  Size: 229 KiB

After

Width:  |  Height:  |  Size: 215 KiB