.. _exchanging_datasets:

===================
Exchanging Datasets
===================

Datasets can be passed to Ray tasks or actors and read with
``.iter_batches()`` or ``.iter_rows()``. This does not incur a copy, since
the blocks of the Dataset are passed by reference as Ray objects:

.. code-block:: python

    import ray
    from ray.data import Dataset

    @ray.remote
    def consume(data: Dataset[int]) -> int:
        # Count the batches in the dataset; blocks are fetched by reference.
        num_batches = 0
        for batch in data.iter_batches():
            num_batches += 1
        return num_batches

    ds = ray.data.range(10000)
    ray.get(consume.remote(ds))
    # -> 200

Datasets can be split up into disjoint sub-datasets. Locality-aware splitting
is supported if you pass a list of actor handles to the ``split()`` function
along with the number of desired splits. This is a common pattern for loading
and splitting data between distributed training actors:

.. code-block:: python

    @ray.remote(num_gpus=1)
    class Worker:
        def __init__(self, rank: int):
            pass

        def train(self, shard: ray.data.Dataset[int]) -> int:
            for batch in shard.iter_batches(batch_size=256):
                pass
            return shard.count()

    workers = [Worker.remote(i) for i in range(16)]
    # -> [Actor(Worker, ...), Actor(Worker, ...), ...]

    ds = ray.data.range(10000)
    # -> Dataset(num_blocks=200, num_rows=10000, schema=<class 'int'>)

    shards = ds.split(n=16, locality_hints=workers)
    # -> [Dataset(num_blocks=13, num_rows=650, schema=<class 'int'>),
    #     Dataset(num_blocks=13, num_rows=650, schema=<class 'int'>), ...]

    ray.get([w.train.remote(s) for w, s in zip(workers, shards)])
    # -> [650, 650, ...]
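
Row-based iteration works the same way. Here is a minimal sketch (the
``count_rows`` task is illustrative, not part of the Ray API) showing a
remote task walking a dataset row by row with ``.iter_rows()``, again without
copying the underlying blocks:

.. code-block:: python

    @ray.remote
    def count_rows(data: Dataset[int]) -> int:
        # Iterate one row at a time instead of in batches.
        num_rows = 0
        for row in data.iter_rows():
            num_rows += 1
        return num_rows

    ds = ray.data.range(10000)
    ray.get(count_rows.remote(ds))
    # -> 10000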