ray/doc/source/data/pipelining-compute.rst

204 lines
8 KiB
ReStructuredText

.. _pipelining_datasets:
==================
Pipelining Compute
==================
Creating a DatasetPipeline
==========================
A `DatasetPipeline <package-ref.html#datasetpipeline-api>`__ can be constructed in two ways: either by pipelining the execution of an existing Dataset (via ``Dataset.window``), or generating repeats of an existing Dataset (via ``Dataset.repeat``). Similar to Datasets, you can freely pass DatasetPipelines between Ray tasks, actors, and libraries. Get started with this synthetic data example:
.. code-block:: python
import ray
def func1(i: int) -> int:
return i + 1
def func2(i: int) -> int:
return i * 2
def func3(i: int) -> int:
return i % 3
# Create a dataset and then create a pipeline from it.
base = ray.data.range(1000000)
print(base)
# -> Dataset(num_blocks=200, num_rows=1000000, schema=<class 'int'>)
pipe = base.window(blocks_per_window=10)
print(pipe)
# -> DatasetPipeline(num_windows=20, num_stages=1)
# Applying transforms to pipelines adds more pipeline stages.
pipe = pipe.map(func1)
pipe = pipe.map(func2)
pipe = pipe.map(func3)
print(pipe)
# -> DatasetPipeline(num_windows=20, num_stages=4)
# Output can be pulled from the pipeline concurrently with its execution.
num_rows = 0
for row in pipe.iter_rows():
num_rows += 1
# ->
# Stage 0: 55%|█████████████████████████ |11/20 [00:02<00:00, 9.86it/s]
# Stage 1: 50%|██████████████████████ |10/20 [00:02<00:01, 9.45it/s]
# Stage 2: 45%|███████████████████ | 9/20 [00:02<00:01, 8.27it/s]
# Stage 3: 35%|████████████████ | 8/20 [00:02<00:02, 5.33it/s]
print("Total num rows", num_rows)
# -> Total num rows 1000000
You can also create a DatasetPipeline from a custom iterator over dataset creators using ``DatasetPipeline.from_iterable``. For example, this is how you would implement ``Dataset.repeat`` and ``Dataset.window`` using ``from_iterable``:
.. code-block:: python
import ray
from ray.data.dataset_pipeline import DatasetPipeline
# Equivalent to ray.data.range(1000).repeat(times=4)
source = ray.data.range(1000)
pipe = DatasetPipeline.from_iterable(
[lambda: source, lambda: source, lambda: source, lambda: source])
# Equivalent to ray.data.range(1000).window(blocks_per_window=10)
splits = ray.data.range(1000, parallelism=200).split(20)
pipe = DatasetPipeline.from_iterable([lambda s=s: s for s in splits])
Per-Window Transformations
==========================
While most Dataset operations are per-row (e.g., map, filter), some operations apply to the Dataset as a whole (e.g., sort, shuffle). When applied to a pipeline, holistic transforms like shuffle are applied separately to each window in the pipeline:
.. important::
Windowed shuffle or global shuffle are expensive operations. Use only if you really need them.
Alternatively, you may consider local shuffle after converting to_tf() or to_torch(), if simple shuffle is sufficient.
.. code-block:: python
# Example of randomly shuffling each window of a pipeline.
ray.data.from_items([0, 1, 2, 3, 4]) \
.repeat(2) \
.random_shuffle_each_window() \
.show_windows()
# ->
# ----- Epoch 0 ------
# === Window 0 ===
# 4
# 3
# 1
# 0
# 2
# ----- Epoch 1 ------
# === Window 1 ===
# 2
# 1
# 4
# 0
# 3
You can also apply arbitrary transformations to each window using ``DatasetPipeline.foreach_window()``:
.. code-block:: python
# Equivalent transformation using .foreach_window()
ray.data.from_items([0, 1, 2, 3, 4]) \
.repeat(2) \
.foreach_window(lambda w: w.random_shuffle()) \
.show_windows()
# ->
# ----- Epoch 0 ------
# === Window 0 ===
# 1
# 0
# 4
# 2
# 3
# ----- Epoch 1 ------
# === Window 1 ===
# 4
# 2
# 0
# 3
# 1
Example: Pipelined Batch Inference
==================================
In this example, we pipeline the execution of a three-stage Dataset application to minimize GPU idle time. Let's take a look at a simple batch inference example:
.. code-block:: python
def preprocess(image: bytes) -> bytes:
return image
class BatchInferModel:
def __init__(self):
self.model = ImageNetModel()
def __call__(self, batch: pd.DataFrame) -> pd.DataFrame:
return self.model(batch)
# Load data from storage.
ds: Dataset = ray.data.read_binary_files("s3://bucket/image-dir")
# Preprocess the data.
ds = ds.map(preprocess)
# Apply GPU batch inference to the data.
ds = ds.map_batches(BatchInferModel, compute="actors", batch_size=256, num_gpus=1)
# Save the output.
ds.write_json("/tmp/results")
Ignoring the output, the above script has three separate stages: loading, preprocessing, and inference. Assuming we have a fixed-sized cluster, and that each stage takes 100 seconds each, the cluster GPUs will be idle for the first 200 seconds of execution:
..
https://docs.google.com/drawings/d/1UMRcpbxIsBRwD8G7hR3IW6DPa9rRSkd05isg9pAEx0I/edit
.. image:: images/dataset-pipeline-1.svg
Enabling Pipelining
===================
We can optimize this by *pipelining* the execution of the dataset with the ``.window()`` call, which returns a DatasetPipeline instead of a Dataset object. The pipeline supports similar transformations to the original Dataset:
.. code-block:: python
# Convert the Dataset into a DatasetPipeline.
pipe: DatasetPipeline = ray.data \
.read_binary_files("s3://bucket/image-dir") \
.window(blocks_per_window=2)
# The remainder of the steps do not change.
pipe = pipe.map(preprocess)
pipe = pipe.map_batches(BatchInferModel, compute="actors", batch_size=256, num_gpus=1)
pipe.write_json("/tmp/results")
Here we specified ``blocks_per_window=2``, which means that the Dataset is split into smaller sub-Datasets of two blocks each. Each transformation or *stage* of the pipeline is operating over these two-block Datasets in parallel. This means batch inference processing can start as soon as two blocks are read and preprocessed, greatly reducing the GPU idle time:
.. image:: images/dataset-pipeline-2.svg
Pipelined Writes
~~~~~~~~~~~~~~~~
When calling ``write_<datasource>()`` on a pipeline, data is written separately for each window. This means that in the above example, JSON files will start being written as soon as the first window is finished, in a incremental / pipelined way.
Tuning Parallelism
==================
Tune the throughput vs latency of your pipeline with the ``blocks_per_window`` setting. As a rule of thumb, higher parallelism settings perform better, however ``blocks_per_window == num_blocks`` effectively disables pipelining, since the DatasetPipeline will only contain a single Dataset. The other extreme is setting ``blocks_per_window=1``, which minimizes the latency to initial output but only allows one concurrent transformation task per stage:
.. image:: images/dataset-pipeline-3.svg
You can also specify the size of each window using ``bytes_per_window``. In this mode, Datasets will determine the size of each window based on the target byte size, giving each window at least 1 block but not otherwise exceeding the target bytes per window. This mode can be useful to limit the memory usage of a pipeline. As a rule of thumb, the cluster memory should be at least 2-5x the window size to avoid spilling.
.. code-block:: python
# Create a DatasetPipeline with up to 10GB of data per window.
pipe: DatasetPipeline = ray.data \
.read_binary_files("s3://bucket/image-dir") \
.window(bytes_per_window=10e9)
# -> INFO -- Created DatasetPipeline with 73 windows: 9120MiB min, 9431MiB max, 9287MiB mean