.. _transforming_datasets:

=====================
Transforming Datasets
=====================

Datasets can be transformed in parallel using ``.map()``. Transformations are executed *eagerly* and block until the operation is finished. Datasets also support ``.filter()`` and ``.flat_map()``.

.. code-block:: python

    import ray

    ds = ray.data.range(10000)
    ds = ds.map(lambda x: x * 2)
    # -> Map Progress: 100%|████████████████████| 200/200 [00:00<00:00, 1123.54it/s]
    # -> Dataset(num_blocks=200, num_rows=10000, schema=<class 'int'>)
    ds.take(5)
    # -> [0, 2, 4, 6, 8]

    ds.filter(lambda x: x > 5).take(5)
    # -> Map Progress: 100%|████████████████████| 200/200 [00:00<00:00, 1859.63it/s]
    # -> [6, 8, 10, 12, 14]

    ds.flat_map(lambda x: [x, -x]).take(5)
    # -> Map Progress: 100%|████████████████████| 200/200 [00:00<00:00, 1568.10it/s]
    # -> [0, 0, 2, -2, 4]

To take advantage of vectorized functions, use ``.map_batches()``. You can also implement ``filter`` and ``flat_map`` using ``.map_batches()``, since your map function can return an output batch of any size.

.. code-block:: python

    import ray

    ds = ray.data.range_arrow(10000)
    # Double every value with one vectorized pandas operation per batch.
    ds = ds.map_batches(lambda df: df * 2, batch_format="pandas")
    # -> Map Progress: 100%|████████████████████| 200/200 [00:00<00:00, 1927.62it/s]
    ds.take(5)
    # -> [{'value': 0}, {'value': 2}, ...]

By default, transformations are executed using Ray tasks. For transformations that require expensive setup, such as loading a model, pass ``compute=ray.data.ActorPoolStrategy(min, max)`` and Ray will use an autoscaling pool of ``min`` to ``max`` actors to execute your transforms. When the transform is a callable class, each actor runs its ``__init__`` once and reuses the result for every batch it processes. For a fixed-size actor pool, specify ``ActorPoolStrategy(n, n)``.

The following end-to-end example reads, transforms, and saves batch inference results using Ray Data:

.. code-block:: python

    import pandas as pd

    import ray
    from ray.data import ActorPoolStrategy

    # Example of GPU batch inference on an ImageNet model.

    # Placeholder preprocessing step: replace with your own decoding/resizing.
    def preprocess(image: bytes) -> bytes:
        return image


    class BatchInferModel:
        def __init__(self):
            # Runs once per actor, so the model is loaded once per actor.
            # ``ImageNetModel`` is a placeholder for your own model class.
            self.model = ImageNetModel()

        def __call__(self, batch: pd.DataFrame) -> pd.DataFrame:
            return self.model(batch)


    ds = ray.data.read_binary_files("s3://bucket/image-dir")

    # Preprocess the data.
    ds = ds.map(preprocess)
    # -> Map Progress: 100%|████████████████████| 200/200 [00:00<00:00, 1123.54it/s]

    # Apply GPU batch inference with actors, and assign each actor a GPU using
    # num_gpus=1 (any Ray remote decorator argument can be used here).
    ds = ds.map_batches(
        BatchInferModel, compute=ActorPoolStrategy(10, 20),
        batch_size=256, num_gpus=1)
    # -> Map Progress (16 actors 4 pending): 100%|██████| 200/200 [00:07, 27.60it/s]

    # Save the results.
    ds.repartition(1).write_json("s3://bucket/inference-results")
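
``filter`` and ``flat_map`` can be built from ``.map_batches()`` because a batch function receives one batch and may return a batch of any length: fewer rows for a filter, more rows for a flat map. The following is a minimal sketch, assuming pandas-format batches with a single ``value`` column like those produced by ``ray.data.range_arrow()`` above. The function names ``filter_batch`` and ``flat_map_batch`` are hypothetical, and the sketch runs on a small in-memory DataFrame, so Ray is not needed to try it.

.. code-block:: python

    import pandas as pd


    def filter_batch(df: pd.DataFrame) -> pd.DataFrame:
        """Hypothetical filter: keep rows whose "value" exceeds 5."""
        # The output batch may be smaller than the input batch.
        return df[df["value"] > 5]


    def flat_map_batch(df: pd.DataFrame) -> pd.DataFrame:
        """Hypothetical flat map: emit x and -x for every input row."""
        # Interleave x and -x so the order matches flat_map(lambda x: [x, -x]).
        values = df["value"].tolist()
        out = [v for x in values for v in (x, -x)]
        # The output batch is twice as long as the input batch.
        return pd.DataFrame({"value": out})


    # Try both functions on a small in-memory batch (no Ray needed).
    batch = pd.DataFrame({"value": [0, 2, 4, 6, 8]})
    filtered = filter_batch(batch)
    expanded = flat_map_batch(batch)
    print(filtered["value"].tolist())  # [6, 8]
    print(expanded["value"].tolist())  # [0, 0, 2, -2, 4, -4, 6, -6, 8, -8]

With a Dataset that has a ``value`` column, either function could then be passed in the same way as the lambda above, for example ``ds.map_batches(filter_batch, batch_format="pandas")``.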