By default, AIR loads all Dataset blocks into the object store at the start of training. This provides the best performance if the
cluster has enough aggregate memory to fit all the data blocks in object store memory. Note that data often requires more space
when loaded uncompressed in memory than when resident in storage.
If there is insufficient object store memory, blocks may be spilled to disk during reads or preprocessing. Ray will print log messages
if spilling is occurring, and you can also check for it with the ``ray memory --stats-only`` utility. If spilling is happening, make sure
the cluster has enough disk space to handle the spilled blocks. Alternatively, consider using machines with more memory, or adding more
machines, to avoid spilling.
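As a quick back-of-the-envelope check before committing to bulk ingest, you can compare the dataset's estimated in-memory size against the
cluster's aggregate object store capacity. This is a rough sketch; the data path below is a placeholder.

.. code:: python

    import ray

    ray.init()

    # Placeholder path; point this at your real data source.
    ds = ray.data.read_parquet("s3://your-bucket/training-data")

    dataset_bytes = ds.size_bytes()  # estimated in-memory size of the data blocks
    object_store_bytes = ray.cluster_resources().get("object_store_memory", 0)

    print(f"Dataset size:       {dataset_bytes / 1024**3:.1f} GiB")
    print(f"Object store total: {object_store_bytes / 1024**3:.1f} GiB")
    if dataset_bytes > object_store_bytes:
        print("Expect spilling; consider adding memory or using streaming ingest.")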
Streaming Ingest
~~~~~~~~~~~~~~~~
AIR also supports streaming ingest via the DatasetPipeline feature. Streaming ingest is preferable when you are working with large datasets
that don't fit into memory and you would rather read *windows* of data from storage to minimize the active memory required for data ingest.
To enable streaming ingest, set ``use_stream_api=True`` in the dataset config. By default, this will configure streaming ingest with a window
size of 1GiB, which means AIR will load ~1 GiB of data at a time from the datasource.
Performance can be increased with larger window sizes, which can be adjusted using the ``stream_window_size`` config.
A reasonable stream window size is something like 20% of available object store memory. Note that the actual memory used per window
may exceed the configured size if the data is larger once deserialized in memory, or if individual files are larger than the window size.
If the window size is set to -1, then an infinite window size (equivalent to bulk loading) will be used.
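As a minimal sketch of what this configuration might look like (assuming a ``TorchTrainer`` and the ``ray.air.config.DatasetConfig`` API;
the trainer, training loop, and dataset below are placeholders):

.. code:: python

    import ray
    from ray.air.config import DatasetConfig, ScalingConfig
    from ray.train.torch import TorchTrainer

    # Enable streaming ingest for the "train" dataset with a ~2 GiB window.
    trainer = TorchTrainer(
        train_loop_per_worker=lambda config: None,  # placeholder loop
        scaling_config=ScalingConfig(num_workers=1),
        datasets={"train": ray.data.range_tensor(1000)},
        dataset_config={
            "train": DatasetConfig(
                use_stream_api=True,
                stream_window_size=2 * 1024**3,  # 2 GiB window
            ),
        },
    )
    result = trainer.fit()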
.. warning::

    In AIR alpha, streaming ingest only applies to the preprocessor transform step, not to
    preprocessor fitting. This means that the preprocessor will initially be fit in bulk, after
    which data will be transformed as it is loaded in a streaming manner.
Reading Data
~~~~~~~~~~~~
The ``get_dataset_shard`` method returns a reader object that is either a ``Dataset`` or ``DatasetPipeline``, depending on whether the ``use_stream_api``
option is set. The former is a finite set of records, and the latter represents an infinite stream of records.
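As a minimal sketch (assuming the ``ray.air.session`` API; the batch size and epoch count are arbitrary), a training loop might consume
the shard in each case as follows:

.. code:: python

    from ray.air import session

    def train_loop_per_worker(config):
        shard = session.get_dataset_shard("train")

        # Bulk ingest (default): ``shard`` is a Dataset, so re-iterate over it
        # once per epoch.
        for _ in range(2):
            for batch in shard.iter_batches(batch_size=128):
                pass  # replace with your training step

        # Streaming ingest (``use_stream_api=True``): ``shard`` is a
        # DatasetPipeline, so pull one epoch at a time from the pipeline instead:
        #
        # for epoch_data in shard.iter_epochs():
        #     for batch in epoch_data.iter_batches(batch_size=128):
        #         pass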
See the following examples for clarification:
.. tabbed:: Bulk Ingest

    This example shows bulk ingest (the default). Data is bulk loaded and made available
    directly via a ``Dataset`` object that can be looped over manually.

    .. literalinclude:: doc_code/air_ingest.py
        :language: python
        :start-after: __config_4__
        :end-before: __config_4_end__
.. tabbed:: Streaming Ingest

    This example shows enabling streaming ingest for the "train" dataset with an *N-byte* window.
    This means that AIR will only load *N* bytes of data from the datasource at a time (the data
    may be larger once deserialized in memory or if individual files are larger than the window).
Train always uses Tune as the execution backend under the hood, even when running just ``Trainer.fit()`` directly (this is treated as a single-trial experiment). This ensures consistency of execution.
When using Ray Tune with AIR training jobs, you can choose either to provide an experiment-wide Dataset that is shared among all trials,
or to configure per-trial datasets. Each trial always runs its preprocessing separately, so Dataset sharing only deduplicates
the initial data reads from external storage. In case (a), the initial blocks are shared between trials; in case (b), each trial loads its own data blocks and the trials are fully independent.
Let's look at code examples for both cases. Generally, you'd prefer to use "Experiment-Wide Datasets" if possible since this reduces memory usage and can speed up trial starts. However, in some use cases you may want to do a hyperparameter sweep over the datasets themselves, which would require configuring "Per-Trial Datasets".
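As a rough sketch of the two setups (the trainer, data paths, and search values below are placeholders, and overriding ``datasets``
through the Tuner's ``param_space`` is assumed to be supported):

.. code:: python

    import ray
    from ray import tune
    from ray.air.config import ScalingConfig
    from ray.train.torch import TorchTrainer
    from ray.tune import Tuner

    def train_loop_per_worker(config):
        pass  # placeholder training loop

    # Case (a): experiment-wide Dataset -- every trial shares the initial data blocks.
    shared_ds = ray.data.read_parquet("s3://your-bucket/train")  # placeholder path
    trainer = TorchTrainer(
        train_loop_per_worker=train_loop_per_worker,
        scaling_config=ScalingConfig(num_workers=1),
        datasets={"train": shared_ds},
    )
    Tuner(
        trainer,
        param_space={"train_loop_config": {"lr": tune.grid_search([1e-3, 1e-4])}},
    ).fit()

    # Case (b): per-trial Datasets -- sweep over the datasets themselves.
    ds_v1 = ray.data.read_parquet("s3://your-bucket/train_v1")  # placeholder paths
    ds_v2 = ray.data.read_parquet("s3://your-bucket/train_v2")
    Tuner(
        trainer,
        param_space={"datasets": {"train": tune.grid_search([ds_v1, ds_v2])}},
    ).fit()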
Tune typically creates a placement group reserving resources for each of its trials. These placement groups only reserve resources for the Train actors, however. By default, Dataset preprocessing tasks run using "spare" CPU resources in the cluster, which enables better autoscaling and utilization of resources. It is also possible to configure the Dataset tasks to run within the Tune trial placement groups.
If trial placement groups reserve all the CPUs in the cluster, then no CPUs may be left for Datasets to use, and trials can hang. This can easily happen when using CPU-only trainers. For example, if you change the above ingest example to use ``ray.init(num_cpus=2)``, such a hang will occur.
Refer to the :ref:`Datasets in Tune Example <datasets_tune>` to understand each of these options and how to configure them. We recommend starting with the default of allowing tasks to run using spare cluster resources, and only changing this if you encounter resource contention or want more performance predictability.
So why was the data ingest only 116 MiB/s above? That's sufficient for many models, but one would expect
it to be faster given that the trainer is doing nothing except reading the data. Based on the stats above, there was no object
spilling, but there was a high batch delay.
We can guess that perhaps AIR was spending too much time loading blocks from other machines, since
we were using a multi-node cluster. We can test this by setting ``prefetch_blocks=10`` to prefetch
blocks more aggressively and rerunning training.
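As a sketch of what that change can look like inside a training loop (``prefetch_blocks`` is an argument of ``iter_batches``; the batch
size here is arbitrary):

.. code:: python

    from ray.air import session

    def train_loop_per_worker(config):
        shard = session.get_dataset_shard("train")
        for _ in range(config.get("num_epochs", 1)):
            # Prefetch up to 10 blocks ahead of the consumer to overlap fetching
            # of remote blocks with batch iteration.
            for batch in shard.iter_batches(batch_size=4096, prefetch_blocks=10):
                pass  # the dummy loop only reads the data

With prefetching enabled, the benchmark reports: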
.. code::

    P50/P95/Max batch delay (s)   0.0006792084998323844 0.0009853049503362856 0.12657493300002898
    Num epochs read               47
    Num batches read              4700
    Num bytes read                458984.95 MiB
    Mean throughput               15136.18 MiB/s
That's much better! Now we can see that our DummyTrainer is ingesting data at a rate of ~15000 MiB/s,
and was able to read through many more epochs of training. This high throughput suggests that all the data
was able to fit in memory on a single node.
Going from DummyTrainer to your real Trainer
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Once you're happy with the ingest performance of the DummyTrainer with synthetic data, the next step is to adapt it to your real workload scenario. This involves:
* **Scaling the DummyTrainer**: Change the scaling config of the DummyTrainer and cluster configuration to reflect your target workload.
* **Switching the Dataset**: Change the dataset from synthetic tensor data to reading your real dataset.
* **Switching the Trainer**: Swap the DummyTrainer with your real trainer.
Switching these components one by one allows performance problems to be easily isolated and reproduced.
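For example, the second of these steps (switching the Dataset) might look roughly like the sketch below. The ``DummyTrainer`` import path,
its accepted arguments, and the data paths are assumptions here:

.. code:: python

    import ray
    from ray.air.config import ScalingConfig
    from ray.air.util.check_ingest import DummyTrainer  # assumed import path

    # Synthetic tensor data used while benchmarking ingest.
    synthetic_ds = ray.data.range_tensor(50_000, shape=(128,))

    # Step 2: swap in your real dataset while keeping the same trainer and scaling
    # config, so changes in ingest throughput can be attributed to the data source.
    real_ds = ray.data.read_parquet("s3://your-bucket/training-data")  # placeholder

    trainer = DummyTrainer(
        scaling_config=ScalingConfig(num_workers=4),
        datasets={"train": real_ds},
    )
    trainer.fit()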
Performance Tips
----------------
**Memory availability**: To maximize ingest performance, consider using machines with sufficient memory to fit the dataset entirely in memory. This avoids the need for disk spilling, streamed ingest, or fetching data across the network. As a rule of thumb, a Ray cluster with fewer but bigger nodes will generally outperform a Ray cluster with a larger number of smaller nodes, due to better memory locality.
**Autoscaling**: We generally recommend first trying out AIR training with a fixed-size cluster. This makes it easier to understand and debug issues. Once you are happy with performance, you can enable autoscaling, e.g., to scale out experiment sweeps with Tune. We also recommend starting with a single node type when autoscaling. Autoscaling with heterogeneous clusters can optimize costs, but may complicate performance debugging.
**Partitioning**: By default, Datasets will create up to 200 blocks per Dataset, or fewer if there are fewer base files for the Dataset. If you run into out-of-memory errors during preprocessing, consider increasing the number of blocks to reduce their size. To increase the max number of partitions, set the ``parallelism`` option when calling ``ray.data.read_*()``. To change the number of partitions at runtime, use ``ds.repartition(N)``. As a rule of thumb, blocks should be no more than 1-2 GiB each.
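For example (the file path and the exact counts below are placeholders):

.. code:: python

    import ray

    # Ask for up to 1000 read tasks / blocks at read time. The effective
    # parallelism may be lower if there are fewer input files.
    ds = ray.data.read_parquet("s3://your-bucket/training-data", parallelism=1000)

    # Or change the block count after the read; aim for blocks of no more than 1-2 GiB.
    ds = ds.repartition(500)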