Note that while epochs commonly consist of a single window, they can also contain multiple windows if ``.window()`` is used or there are multiple ``.repeat()`` calls.
In this example, we pipeline the execution of a three-stage Dataset application to minimize GPU idle time. Let's revisit the batch inference example from the previous page:
Ignoring the output, the above script has three separate stages: loading, preprocessing, and inference. Assuming we have a fixed-sized cluster, and that each stage takes 100 seconds each, the cluster GPUs will be idle for the first 200 seconds of execution:
We can optimize this by *pipelining* the execution of the dataset with the ``.window()`` call, which returns a DatasetPipeline instead of a Dataset object. The pipeline supports similar transformations to the original Dataset:
Here we specified ``blocks_per_window=2``, which means that the Dataset is split into smaller sub-Datasets of two blocks each. Each transformation or *stage* of the pipeline is operating over these two-block Datasets in parallel. This means batch inference processing can start as soon as two blocks are read and preprocessed, greatly reducing the GPU idle time:
Tune the throughput vs latency of your pipeline with the ``blocks_per_window`` setting. As a rule of thumb, higher parallelism settings perform better, however ``blocks_per_window == num_blocks`` effectively disables pipelining, since the DatasetPipeline will only contain a single Dataset. The other extreme is setting ``blocks_per_window=1``, which minimizes the latency to initial output but only allows one concurrent transformation task per stage:
You can also specify the size of each window using ``bytes_per_window``. In this mode, Datasets will determine the size of each window based on the target byte size, giving each window at least 1 block but not otherwise exceeding the target bytes per window. This mode can be useful to limit the memory usage of a pipeline. As a rule of thumb, the cluster memory should be at least 2-5x the window size to avoid spilling.
..code-block:: python
# Create a DatasetPipeline with up to 10GB of data per window.
pipe: DatasetPipeline = \
.read_binary_files("s3://bucket/image-dir") \
# -> INFO -- Created DatasetPipeline with 73 windows: 9120MiB min, 9431MiB max, 9287MiB mean
Transformations prior to the call to ``.repeat()`` will be cached. However, note that the initial read will not be cached unless there is a subsequent transformation or ``.fully_executed()`` call. Transformations made to the DatasetPipeline after the repeat will always be executed once for each repetition of the Dataset.
For example, in the following pipeline, the ``map(func)`` transformation only occurs once. However, the random shuffle is applied to each repetition in the pipeline. However, if we omitted the map transformation, then the pipeline would re-read from the base data on each repetition.
Result caching only applies if there are *transformation* stages prior to the pipelining operation. If you ``repeat()`` or ``window()`` a Dataset right after the read call (e.g., ````), then the read will still be re-executed on each repetition. This optimization saves memory, at the cost of repeated reads from the datasource. To force result caching in all cases, use ``.fully_executed().repeat()``.
Similar to how you can ``.split()`` a Dataset, you can also split a DatasetPipeline with the same method call. This returns a number of DatasetPipeline shards that share a common parent pipeline. Each shard can be passed to a remote task or actor.
..code-block:: python
# Create a pipeline that loops over its source dataset indefinitely.
1. It allows you to focus on the training logic and automatically handles distributed setup for your framework of choice (PyTorch, Tensorflow, or Horovod).
2. It has out of the box fault-tolerance and elastic training
3. And it comes with support for standard ML tools and features that practitioners love such as checkpointing and logging.
..code-block:: python
def train_func():
# This is a dummy train function just iterating over the dataset shard.
# You should replace this with your training logic.
Sometimes, you may want to change the structure of an existing pipeline. For example, after generating a pipeline with ``ds.window(k)``, you may want to repeat that windowed pipeline ``n`` times. This can be done with ``ds.window(k).repeat(n)``. As another example, suppose you have a repeating pipeline generated with ``ds.repeat(n)``. The windowing of that pipeline can be changed with ``ds.repeat(n).rewindow(k)``. Note the subtle difference in the two examples: the former is repeating a windowed pipeline that has a base window size of ``k``, while the latter is re-windowing a pipeline of initial window size of ``ds.num_blocks()``. The latter may produce windows that span multiple copies of the same original data if ``preserve_epoch=False`` is set: