.. _exchanging_datasets:

===================
Exchanging Datasets
===================

Datasets can be passed to Ray tasks or actors and read with ``.iter_batches()`` or ``.iter_rows()``.
This does not incur a copy, since the blocks of the Dataset are passed by reference as Ray objects:

.. code-block:: python

    import ray
    from ray.data import Dataset

    @ray.remote
    def consume(data: Dataset[int]) -> int:
        num_batches = 0
        for batch in data.iter_batches():
            num_batches += 1
        return num_batches

    ds = ray.data.range(10000)
    ray.get(consume.remote(ds))
    # -> 200
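
Because only block references are shipped, the same Dataset can be handed to several tasks or actors without duplicating the underlying data. Below is a minimal sketch of the same pattern using an actor and ``.iter_rows()``; the ``RowCounter`` actor is illustrative, not part of the Ray API:

.. code-block:: python

    import ray
    from ray.data import Dataset

    # Hypothetical actor, used only to illustrate reading a Dataset
    # that was passed in by reference.
    @ray.remote
    class RowCounter:
        def count_rows(self, data: Dataset[int]) -> int:
            total = 0
            for _row in data.iter_rows():
                total += 1
            return total

    ds = ray.data.range(1000)
    # Passing ``ds`` to both actors ships block references, not copies.
    counters = [RowCounter.remote() for _ in range(2)]
    ray.get([c.count_rows.remote(ds) for c in counters])
    # -> [1000, 1000]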

Datasets can be split into disjoint sub-datasets.
Locality-aware splitting is supported if you pass a list of actor handles to ``split()`` along with the number of desired splits.
This is a common pattern for loading and splitting data between distributed training actors:

.. code-block:: python

    import ray

    @ray.remote(num_gpus=1)
    class Worker:
        def __init__(self, rank: int):
            pass

        def train(self, shard: ray.data.Dataset[int]) -> int:
            for batch in shard.iter_batches(batch_size=256):
                pass
            return shard.count()

    workers = [Worker.remote(i) for i in range(16)]
    # -> [Actor(Worker, ...), Actor(Worker, ...), ...]

    ds = ray.data.range(10000)
    # -> Dataset(num_blocks=200, num_rows=10000, schema=<class 'int'>)

    shards = ds.split(n=16, locality_hints=workers)
    # -> [Dataset(num_blocks=13, num_rows=650, schema=<class 'int'>),
    #     Dataset(num_blocks=13, num_rows=650, schema=<class 'int'>), ...]

    ray.get([w.train.remote(s) for w, s in zip(workers, shards)])
    # -> [650, 650, ...]
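
Note that locality-aware splitting divides at block granularity, so shard row counts can differ slightly (650 vs. 600 rows above). If exactly even shards matter more than locality, ``split()`` can instead equalize the splits. This sketch assumes the ``equal=True`` flag, which balances row counts across shards (rows may be rebalanced, or dropped if the total does not divide evenly):

.. code-block:: python

    import ray

    ds = ray.data.range(10000)
    # Assumed flag: ``equal=True`` makes each shard hold the same number
    # of rows (10000 / 16 = 625), trading block locality for balance.
    shards = ds.split(n=16, equal=True)
    [s.count() for s in shards]
    # -> [625, 625, ...]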