First cut at dataset documentation (#16956)

Eric Liang 2021-07-14 23:27:13 -07:00 committed by GitHub
parent 42e6c9b020
commit 38bddc3f2b
20 changed files with 536 additions and 340 deletions

.gitignore (vendored): +1

@ -6,6 +6,7 @@
/python/ray/thirdparty_files/
/python/ray/pyarrow_files/
/python/ray/jars/
/python/ray/cpp/
/python/build
/python/dist
/python/python-driver-*


@ -34,7 +34,7 @@ any Dask `.compute() <https://docs.dask.org/en/latest/api.html#dask.compute>`__
call.
Here's an example:
.. literalinclude:: ../../python/ray/util/dask/examples/dask_ray_scheduler_example.py
.. literalinclude:: ../../../python/ray/util/dask/examples/dask_ray_scheduler_example.py
:language: python
.. note::
@ -99,7 +99,7 @@ aggregations): those downstream computations will be faster since that base coll
computation was kicked off early and referenced by all downstream computations, often
via shared memory.
.. literalinclude:: ../../python/ray/util/dask/examples/dask_ray_persist_example.py
.. literalinclude:: ../../../python/ray/util/dask/examples/dask_ray_persist_example.py
:language: python
@ -112,7 +112,7 @@ Dask on Ray provides a Dask DataFrame optimizer that leverages Ray's ability to
execute multiple-return tasks in order to speed up shuffling by as much as 4x on Ray.
Simply set the `dataframe_optimize` configuration option to our optimizer function, similar to how you specify the Dask-on-Ray scheduler:
.. literalinclude:: ../../python/ray/util/dask/examples/dask_ray_shuffle_optimization.py
.. literalinclude:: ../../../python/ray/util/dask/examples/dask_ray_shuffle_optimization.py
:language: python
Callbacks

(image file added, 142 KiB; text diff suppressed)

doc/source/data/dataset.rst (new file, +332)

@ -0,0 +1,332 @@
Datasets: Distributed Arrow on Ray
==================================
.. tip::
Ray Datasets is available in early preview at ``ray.experimental.data``.
Ray Datasets are the standard way to load and exchange data in Ray libraries and applications. Datasets provide basic distributed data transformations such as ``map``, ``filter``, and ``repartition``, and are compatible with a variety of file formats, datasources, and distributed frameworks.
.. image:: dataset.svg
..
https://docs.google.com/drawings/d/16AwJeBNR46_TsrkOmMbGaBK7u-OPsf_V8fHjU-d2PPQ/edit
Concepts
--------
Ray Datasets implement `"Distributed Arrow" <https://arrow.apache.org/>`__. A Dataset consists of a list of Ray object references to *blocks*. Each block holds a set of items in either `Arrow table <https://arrow.apache.org/docs/python/data.html#tables>`__ format or in a Python list (for Arrow incompatible objects). Having multiple blocks in a dataset allows for parallel transformation and ingest of the data.
The following figure visualizes a Dataset that has three Arrow table blocks, each holding 1000 rows:
.. image:: dataset-arch.svg
..
https://docs.google.com/drawings/d/1PmbDvHRfVthme9XD7EYM-LIHPXtHdOfjCbc1SCsM64k/edit
Since a Ray Dataset is just a list of Ray object references, it can be freely passed between Ray tasks, actors, and libraries like any other object reference. This flexibility is a unique characteristic of Ray Datasets.
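Because blocks are just Ray objects, you can get zero-copy access to them. A minimal sketch (outputs in the comments are illustrative):

.. code-block:: python

    import ray

    ray.init()

    # A small Dataset; its items are spread across multiple blocks.
    ds = ray.data.range(10)

    # get_blocks() returns the underlying object references without copying data.
    blocks = ds.get_blocks()
    # -> [ObjectRef(...), ObjectRef(...), ...]

    # Each block can be fetched like any other Ray object and iterated over.
    first_block = ray.get(blocks[0])
    list(first_block.iter_rows())
    # -> [0]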
Compared to `Spark RDDs <https://spark.apache.org/docs/latest/rdd-programming-guide.html>`__ and `Dask Bags <https://docs.dask.org/en/latest/bag.html>`__, Datasets offer a more basic set of features and execute operations eagerly for simplicity. Users are expected to cast Datasets into more feature-rich dataframe types (e.g., via ``ds.to_dask()``) for advanced operations.
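For example, a grouped aggregation is not part of the Dataset API itself, but it can be done by handing the data off to Dask-on-Ray. A hedged sketch (assumes Dask is installed; note that ``to_dask()`` sets the Dask scheduler to Dask-on-Ray globally):

.. code-block:: python

    # An Arrow-record dataset of {"value": N} rows.
    ds = ray.data.range_arrow(10000)

    # Convert to a Dask DataFrame for operations Datasets do not provide natively.
    ddf = ds.to_dask()
    ddf.groupby("value").size().compute()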
Datasource Compatibility Matrices
---------------------------------
.. list-table:: Input compatibility matrix
:header-rows: 1
* - Input Type
- Read API
- Status
* - CSV File Format
- ``ray.data.read_csv()``
- ✅
* - JSON File Format
- ``ray.data.read_json()``
- ✅
* - Parquet File Format
- ``ray.data.read_parquet()``
- ✅
* - Binary Files
- ``ray.data.read_binary_files()``
- ✅
* - Spark Dataframe
- ``ray.data.from_spark()``
- (todo)
* - Dask Dataframe
- ``ray.data.from_dask()``
- ✅
* - Modin Dataframe
- ``ray.data.from_modin()``
- (todo)
* - MARS Dataframe
- ``ray.data.from_mars()``
- (todo)
* - Pandas Dataframe Objects
- ``ray.data.from_pandas()``
- ✅
* - Arrow Table Objects
- ``ray.data.from_arrow()``
- ✅
* - Custom Datasource
- ``ray.data.read_datasource()``
- ✅
.. list-table:: Output compatibility matrix
:header-rows: 1
* - Output Type
- Dataset API
- Status
* - CSV File Format
- ``ds.write_csv()``
- ✅
* - JSON File Format
- ``ds.write_json()``
- ✅
* - Parquet File Format
- ``ds.write_parquet()``
- ✅
* - Spark Dataframe
- ``ds.to_spark()``
- (todo)
* - Dask Dataframe
- ``ds.to_dask()``
- ✅
* - Modin Dataframe
- ``ds.to_modin()``
- (todo)
* - MARS Dataframe
- ``ds.to_mars()``
- (todo)
* - Arrow Table Objects
- ``ds.to_arrow()``
- ✅
* - Arrow Table Iterator
- ``ds.iter_batches(batch_format="pyarrow")``
- ✅
* - Pandas Dataframe Objects
- ``ds.to_pandas()``
- ✅
* - Pandas Dataframe Iterator
- ``ds.iter_batches(batch_format="pandas")``
- ✅
* - PyTorch Iterable Dataset
- ``ds.to_torch()``
- ✅
* - TensorFlow Iterable Dataset
- ``ds.to_tf()``
- ✅
* - Custom Datasource
- ``ds.write_datasource()``
- ✅
Creating Datasets
-----------------
Get started by creating Datasets from synthetic data using ``ray.data.range()`` and ``ray.data.from_items()``. Datasets can hold either plain Python objects (schema is a Python type), or Arrow records (schema is Arrow).
.. code-block:: python
# Create a Dataset of Python objects.
ds = ray.data.range(10000)
# -> Dataset(num_rows=10000, num_blocks=200, schema=<class 'int'>)
ds.take(5)
# -> [0, 1, 2, 3, 4]
ds.count()
# -> 10000
# Create a Dataset of Arrow records.
ds = ray.data.from_items([{"col1": i, "col2": str(i)} for i in range(10000)])
# -> Dataset(num_rows=10000, num_blocks=200, schema={col1: int64, col2: string})
ds.show(5)
# -> ArrowRow({'col1': 0, 'col2': '0'})
# -> ArrowRow({'col1': 1, 'col2': '1'})
# -> ArrowRow({'col1': 2, 'col2': '2'})
# -> ArrowRow({'col1': 3, 'col2': '3'})
# -> ArrowRow({'col1': 4, 'col2': '4'})
ds.schema()
# -> col1: int64
# -> col2: string
Datasets can be created from files on local disk or remote datasources such as S3. Any filesystem `supported by pyarrow <http://arrow.apache.org/docs/python/generated/pyarrow.fs.FileSystem.html>`__ can be used to specify file locations:
.. code-block:: python
# Read a directory of files in remote storage.
ds = ray.data.read_csv("s3://bucket/path")
# Read multiple local files.
ds = ray.data.read_csv(["/path/to/file1", "/path/to/file2"])
# Read multiple directories.
ds = ray.data.read_csv(["s3://bucket/path1", "s3://bucket/path2"])
Finally, you can create a Dataset from existing data in the Ray object store or from Ray-compatible distributed DataFrames:
.. code-block:: python
# Create a Dataset from a list of Pandas DataFrame objects.
pdf = pd.DataFrame({"one": [1, 2, 3], "two": ["a", "b", "c"]})
ds = ray.data.from_pandas([ray.put(pdf)])
# Create a Dataset from a Dask-on-Ray DataFrame.
dask_df = dd.from_pandas(pdf, npartitions=10)
ds = ray.data.from_dask(dask_df)
Saving Datasets
---------------
Datasets can be written to local or remote storage using ``.write_csv()``, ``.write_json()``, and ``.write_parquet()``.
.. code-block:: python
# Write to csv files in /tmp/output.
ds = ray.data.range(10000).write_csv("/tmp/output")
# -> /tmp/output/data0.csv, /tmp/output/data1.csv, ...
# Use repartition to control the number of output files:
ds = ray.data.range(10000).repartition(1).write_csv("/tmp/output2")
# -> /tmp/output2/data0.csv
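The other writers follow the same pattern. For instance, a sketch of writing Parquet files to remote storage (the bucket path is a placeholder, and the output file names are illustrative):

.. code-block:: python

    # Write an Arrow-record dataset out as Parquet files.
    ds = ray.data.range_arrow(10000)
    ds.write_parquet("s3://bucket/output")
    # -> s3://bucket/output/data0.parquet, s3://bucket/output/data1.parquet, ...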
Transforming Datasets
---------------------
Datasets can be transformed in parallel using ``.map()``. Transformations are executed *eagerly* and block until the operation is finished. Datasets also support ``.filter()`` and ``.flat_map()``.
.. code-block:: python
ds = ray.data.range(10000)
ds = ds.map(lambda x: x * 2)
# -> Map Progress: 100%|█████████████████████████| 200/200 [00:00<00:00, 1123.54it/s]
# -> Dataset(num_rows=10000, num_blocks=200, schema=<class 'int'>)
ds.take(5)
# -> [0, 2, 4, 6, 8]
ds.filter(lambda x: x > 5).take(5)
# -> Map Progress: 100%|█████████████████████████| 200/200 [00:00<00:00, 1859.63it/s]
# -> [6, 8, 10, 12, 14]
ds.flat_map(lambda x: [x, -x]).take(5)
# -> Map Progress: 100%|█████████████████████████| 200/200 [00:00<00:00, 1568.10it/s]
# -> [0, 0, 2, -2, 4]
To take advantage of vectorized functions, use ``.map_batches()``. Note that you can also implement ``filter`` and ``flat_map`` using ``.map_batches()``, since your map function can return an output batch of any size.
.. code-block:: python
ds = ray.data.range(10000)
ds = ds.map_batches(lambda df: df.applymap(lambda x: x * 2), batch_format="pandas")
# -> Map Progress: 100%|█████████████████████████| 200/200 [00:00<00:00, 1927.62it/s]
ds.take(5)
# -> [ArrowRow({'value': 0}), ArrowRow({'value': 2}), ...]
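As noted above, a filter can also be expressed as a batch transform, since the returned batch may be smaller than the input batch. A minimal sketch using pandas batches (outputs are illustrative):

.. code-block:: python

    ds = ray.data.range(10000)

    # Keep only the even values; the output batch is allowed to shrink.
    evens = ds.map_batches(
        lambda df: df[df["value"] % 2 == 0], batch_format="pandas")
    evens.take(3)
    # -> [ArrowRow({'value': 0}), ArrowRow({'value': 2}), ArrowRow({'value': 4})]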
By default, transformations are executed using Ray tasks. For transformations that require expensive setup (e.g., loading a model), specify ``compute="actors"`` and Ray will use an autoscaling actor pool to execute your transforms instead. The following is an end-to-end example of reading, transforming, and saving batch inference results using Datasets:
.. code-block:: python
# Example of GPU batch inference on an ImageNet model.
model = None
def preprocess(image: bytes) -> bytes:
return image
def batch_infer(batch: pandas.DataFrame) -> pandas.DataFrame:
global model
if model is None:
model = ImageNetModel()
return model(batch)
ds = ray.data.read_binary_files("s3://bucket/image-dir")
# Preprocess the data.
ds = ds.map(preprocess)
# -> Map Progress: 100%|█████████████████████████| 200/200 [00:00<00:00, 1123.54it/s]
# Apply GPU batch inference with actors, and assign each actor a GPU using
# ``num_gpus=1`` (any Ray remote decorator argument can be used here).
ds = ds.map_batches(batch_infer, compute="actors", batch_size=256, num_gpus=1)
# -> Map Progress (16 actors 4 pending): 100%|█████| 200/200 [00:07<00:00, 27.60it/s]
# Save the results.
ds.repartition(1).write_json("s3://bucket/inference-results")
Exchanging Datasets
-------------------
Datasets can be passed to Ray tasks or actors and read with ``.iter_batches()`` or ``.iter_rows()``. This does not incur a copy, since the blocks of the Dataset are passed by reference as Ray objects:
.. code-block:: python
@ray.remote
def consume(data: Dataset[int]) -> int:
num_batches = 0
for batch in data.iter_batches():
num_batches += 1
return num_batches
ds = ray.data.range(10000)
ray.get(consume.remote(ds))
# -> 200
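``iter_batches()`` can also yield framework-friendly batch formats at a chosen batch size, which is convenient for vectorized consumption inside a task or actor. A minimal sketch:

.. code-block:: python

    ds = ray.data.range_arrow(10000)

    # Iterate over the dataset as pandas DataFrames (batch_size controls the
    # number of rows handed to each iteration).
    for batch in ds.iter_batches(batch_size=1024, batch_format="pandas"):
        batch["value"].mean()  # do vectorized work on the pandas batch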
Datasets can be split up into disjoint sub-datasets. Locality-aware splitting is supported if you pass in a list of actor handles to the ``split()`` function along with the number of desired splits. This is a common pattern for loading and splitting data across distributed training actors:
.. code-block:: python
@ray.remote(num_gpus=1)
class Worker:
def __init__(self, rank: int):
...
def train(self, shard: Dataset[int]) -> int:
for batch in shard.iter_batches(batch_size=256):
...
return shard.count()
workers = [Worker.remote(i) for i in range(16)]
# -> [Actor(Worker, ...), Actor(Worker, ...), ...]
ds = ray.data.range(10000)
# -> Dataset(num_rows=10000, num_blocks=200, schema=<class 'int'>)
shards = ds.split(n=16, locality_hints=workers)
# -> [Dataset(num_rows=650, num_blocks=13, schema=<class 'int'>),
# Dataset(num_rows=650, num_blocks=13, schema=<class 'int'>), ...]
ray.get([w.train.remote(s) for s in shards])
# -> [650, 650, ...]
Custom Datasources
------------------
Datasets can read and write in parallel to `custom datasources <package-ref.html#custom-datasource-api>`__ defined in Python.
.. code-block:: python
# Read from a custom datasource.
ds = ray.data.read_datasource(YourCustomDatasource(), **read_args)
# Write to a custom datasource.
ds.write_datasource(YourCustomDatasource(), **write_args)
Pipelining data processing and ML computations
----------------------------------------------
This feature is planned for development. Please provide your input on this `GitHub RFC <https://github.com/ray-project/ray/issues/16852>`__.
Contributing
------------
Contributions to Datasets are welcome! There are many potential improvements, including:
- Supporting more datasources and transforms.
- Integration with more ecosystem libraries.
- Adding features that require partitioning, such as ``groupby()`` and ``join()``.
- Performance optimizations.
Get started with Ray Python development `here <https://docs.ray.io/en/master/development.html#python-develop>`__.

(image file added, 660 KiB; text diff suppressed)


@ -0,0 +1,35 @@
Datasets API Reference
======================
Creating a Dataset
------------------
.. autofunction:: ray.experimental.data.range
.. autofunction:: ray.experimental.data.range_arrow
.. autofunction:: ray.experimental.data.read_csv
.. autofunction:: ray.experimental.data.read_json
.. autofunction:: ray.experimental.data.read_parquet
.. autofunction:: ray.experimental.data.read_binary_files
.. autofunction:: ray.experimental.data.read_datasource
.. autofunction:: ray.experimental.data.from_spark
.. autofunction:: ray.experimental.data.from_dask
.. autofunction:: ray.experimental.data.from_modin
.. autofunction:: ray.experimental.data.from_mars
.. autofunction:: ray.experimental.data.from_pandas
Dataset API
-----------
.. autoclass:: ray.experimental.data.Dataset
:members:
Custom Datasource API
---------------------
.. autoclass:: ray.experimental.data.Datasource
:members:
.. autoclass:: ray.experimental.data.ReadTask
:members:
.. autoclass:: ray.experimental.data.WriteTask
:members:


@ -276,6 +276,18 @@ Papers
serve/faq.rst
serve/package-ref.rst
.. toctree::
:hidden:
:maxdepth: -1
:caption: Ray Data Processing
data/dataset.rst
data/package-ref.rst
data/dask-on-ray.rst
data/mars-on-ray.rst
data/modin/index.rst
data/raydp.rst
.. toctree::
:hidden:
:maxdepth: -1
@ -315,21 +327,10 @@ Papers
raysgd/raysgd.rst
raysgd/raysgd_pytorch.rst
raysgd/raysgd_tensorflow.rst
raysgd/raysgd_dataset.rst
raysgd/raysgd_ptl.rst
raysgd/raysgd_tune.rst
raysgd/raysgd_ref.rst
.. toctree::
:hidden:
:maxdepth: -1
:caption: Data Processing
modin/index.rst
dask-on-ray.rst
mars-on-ray.rst
raydp.rst
.. toctree::
:hidden:
:maxdepth: -1
@ -337,7 +338,6 @@ Papers
multiprocessing.rst
joblib.rst
iter.rst
xgboost-ray.rst
ray-collective.rst


@ -1,211 +0,0 @@
Parallel Iterators
=====================
.. _`issue on GitHub`: https://github.com/ray-project/ray/issues
``ray.util.iter`` provides a parallel iterator API for simple data ingest and processing. It can be thought of as syntactic sugar around Ray actors and ``ray.wait`` loops.
Parallel iterators are lazy and can operate over infinite sequences of items. Iterator
transformations are only executed when the user calls ``next()`` to fetch the next output
item from the iterator.
.. note::
This API is new and may be revised in future Ray releases. If you encounter
any bugs, please file an `issue on GitHub`_.
Concepts
--------
**Parallel Iterators**: You can create a ``ParallelIterator`` object from an existing
set of items, range of numbers, set of iterators, or set of worker actors. Ray will
create a worker actor that produces the data for each shard of the iterator:
.. code-block:: python
# Create an iterator with 2 worker actors over the list [1, 2, 3, 4].
>>> it = ray.util.iter.from_items([1, 2, 3, 4], num_shards=2)
ParallelIterator[from_items[int, 4, shards=2]]
# Create an iterator with 32 worker actors over range(1000000).
>>> it = ray.util.iter.from_range(1000000, num_shards=32)
ParallelIterator[from_range[1000000, shards=32]]
# Create an iterator over two range(10) generators.
>>> it = ray.util.iter.from_iterators([range(10), range(10)])
ParallelIterator[from_iterators[shards=2]]
# Create an iterator from existing worker actors. These actors must
# implement the ParallelIteratorWorker interface.
>>> it = ray.util.iter.from_actors([a1, a2, a3, a4])
ParallelIterator[from_actors[shards=4]]
Simple transformations can be chained on the iterator, such as mapping,
filtering, and batching. These will be executed in parallel on the workers:
.. code-block:: python
# Apply a transformation to each element of the iterator.
>>> it = it.for_each(lambda x: x ** 2)
ParallelIterator[...].for_each()
# Batch together items into lists of 32 elements.
>>> it = it.batch(32)
ParallelIterator[...].for_each().batch(32)
# Filter out items with odd values.
>>> it = it.filter(lambda x: x % 2 == 0)
ParallelIterator[...].for_each().batch(32).filter()
**Local Iterators**: To read elements from a parallel iterator, it has to be converted
to a ``LocalIterator`` by calling ``gather_sync()`` or ``gather_async()``. These
correspond to ``ray.get`` and ``ray.wait`` loops over the actors respectively:
.. code-block:: python
# Gather items synchronously (deterministic round robin across shards):
>>> it = ray.util.iter.from_range(1000000, 1)
>>> it = it.gather_sync()
LocalIterator[ParallelIterator[from_range[1000000, shards=1]].gather_sync()]
# Local iterators can be used as any other Python iterator.
>>> it.take(5)
[0, 1, 2, 3, 4]
# They also support chaining of transformations. Unlike transformations
# applied on a ParallelIterator, they will be executed in the current process.
>>> it.filter(lambda x: x % 2 == 0).take(5)
[0, 2, 4, 6, 8]
# Async gather can be used for better performance, but it is non-deterministic.
>>> it = ray.util.iter.from_range(1000, 4).gather_async()
>>> it.take(5)
[0, 250, 500, 750, 1]
**Passing iterators to remote functions**: Both ``ParallelIterator`` and ``LocalIterator``
are serializable. They can be passed to any Ray remote function. However, note that
each shard should only be read by one process at a time:
.. code-block:: python
# Get local iterators representing the shards of this ParallelIterator:
>>> it = ray.util.iter.from_range(10000, 3)
>>> [s0, s1, s2] = it.shards()
[LocalIterator[from_range[10000, shards=3].shard[0]],
LocalIterator[from_range[10000, shards=3].shard[1]],
LocalIterator[from_range[10000, shards=3].shard[2]]]
# Iterator shards can be passed to remote functions.
>>> @ray.remote
... def do_sum(it):
... return sum(it)
...
>>> ray.get([do_sum.remote(s) for s in it.shards()])
[5552778, 16661667, 27780555]
Semantic Guarantees
~~~~~~~~~~~~~~~~~~~
The parallel iterator API guarantees the following semantics:
**Fetch ordering**: When using ``it.gather_sync().for_each(fn)`` or
``it.gather_async().for_each(fn)`` (or any other transformation after a gather),
``fn(x_i)`` will be called on the element ``x_i`` before the next
element ``x_{i+1}`` is fetched from the source actor. This is useful if you need to
update the source actor between iterator steps. Note that for async gather, this
ordering only applies per shard.
**Operator state**: Operator state is preserved for each shard.
This means that you can pass a stateful callable to ``.for_each()``:
.. code-block:: python
class CumulativeSum:
def __init__(self):
self.total = 0
def __call__(self, x):
self.total += x
return (self.total, x)
it = ray.util.iter.from_range(5, 1)
for x in it.for_each(CumulativeSum()).gather_sync():
print(x)
## This prints:
#(0, 0)
#(1, 1)
#(3, 2)
#(6, 3)
#(10, 4)
Example: Streaming word frequency count
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Parallel iterators can be used for simple data processing use cases such as
streaming grep:
.. code-block:: python
import ray
import glob
import gzip
import numpy as np
ray.init()
file_list = glob.glob("/var/log/syslog*.gz")
it = (
ray.util.iter.from_items(file_list, num_shards=4)
.for_each(lambda f: gzip.open(f).readlines())
.flatten()
.for_each(lambda line: line.decode("utf-8"))
.for_each(lambda line: 1 if "cron" in line else 0)
.batch(1024)
.for_each(np.mean)
)
# Show the probability of a log line containing "cron", with a
# sliding window of 1024 lines.
for freq in it.gather_async():
print(freq)
Example: Passing iterator shards to remote functions
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Both parallel iterators and local iterators are fully serializable, so once
created you can pass them to Ray tasks and actors. This can be useful for
distributed training:
.. code-block:: python
import ray
import numpy as np
ray.init()
@ray.remote
def train(data_shard):
for batch in data_shard:
print("train on", batch) # perform model update with batch
it = (
ray.util.iter.from_range(1000000, num_shards=4, repeat=True)
.batch(1024)
.for_each(np.array)
)
work = [train.remote(shard) for shard in it.shards()]
ray.get(work)
.. tip:: Using ParallelIterator built-in functions is typically most efficient.
For example, if you find yourself using list comprehensions like
``[foo(x) for x in iter.gather_async()]``, consider using
``iter.for_each(foo)`` instead!
API Reference
-------------
.. automodule:: ray.util.iter
:members:
:show-inheritance:


@ -1,48 +0,0 @@
Distributed Dataset
===================
The RaySGD ``Dataset`` provides a simple abstraction for training with
distributed data.
.. tip:: Get in touch with us if you're using or considering using `RaySGD <https://forms.gle/26EMwdahdgm7Lscy9>`_!
Setting up a dataset
--------------------
A dataset can be constructed via any python iterable, or a ``ParallelIterator``. Optionally, a batch size, download function, concurrency, and a transformation can also be specified.
When constructing a dataset, a download function can be specified. For example, if a dataset is initialized with a set of paths, a download function can be specified which converts those paths to ``(input, label)`` tuples. The download function can be executed in parallel via ``max_concurrency``. This may be useful if the backing datastore has rate limits, there is high overhead associated with a download, or downloading is computationally expensive. Downloaded data is stored as objects in the plasma store.
An additional, final transformation can be specified via ``Dataset::transform``. This function is guaranteed to run on the same worker that training takes place on. It is good practice to perform operations that produce large outputs, such as converting images to tensors, as transformations.
Finally, the batch size can be specified. The batch size is the number of data points used per training step per worker.
.. note:: Batch size should be specified via the dataset's constructor, __not__ the ``config["batch_size"]`` passed into the Trainer constructor. In general, datasets are configured via their own constructor, not the Trainer config, wherever possible.
Using a dataset
---------------
To use a dataset, pass it in as an argument to ``trainer.train()``. A dataset passed in to ``trainer.train`` will take precedence over the trainer's data creator during that training run.
.. code-block:: python
trainer.train(dataset=dataset, num_steps=10) # Trains using a dataset
trainer.train() # Trains with the original data creator
trainer.train(dataset=dataset2, num_steps=20) # Trains using a different dataset
Sharding and Sampling
---------------------
.. note:: These details may change in the future.
Datasets use ParallelIterator actors for sharding. In order to handle datasets which do not shard evenly, and streaming datasets (which may not have a defined size), shards are represented as repeated sequences of data. As a result, num_steps should always be specified when training and some data may be oversampled if the data cannot be evenly sharded.
If the dataset is of a known length (and can be evenly sharded), training for an epoch is equivalent to setting ``num_steps = len(data) / (num_workers * batch_size)``.
Complete dataset example
------------------------
Below is an example of training a network with a single hidden layer to learn the identity function.
.. literalinclude:: ../../../python/ray/util/sgd/data/examples/mlp_identity.py
:language: python


@ -151,9 +151,10 @@ __all__ += [
"PlacementGroupID",
]
if "RAY_EXPERIMENTAL_DATA_API" in os.environ:
from ray.experimental import data
__all__.append(data)
# Add an alias so we can point to the final location in docs.
# TODO(ekl) remove this once datasets is out of alpha.
from ray.experimental import data # noqa
__all__.append(data)
# Remove modules from top-level ray


@ -1,2 +0,0 @@
raise DeprecationWarning("Pandas on Ray has moved to Modin: "
"github.com/modin-project/modin")


@ -1,14 +1,17 @@
from ray.experimental.data.read_api import from_items, range, range_arrow, \
read_parquet, read_json, read_csv, read_binary_files, from_dask, \
from_modin, from_pandas, from_spark, read_datasource
from_modin, from_mars, from_pandas, from_arrow, from_spark, read_datasource
from ray.experimental.data.datasource import Datasource, ReadTask, WriteTask
from ray.experimental.data.dataset import Dataset
__all__ = [
"Dataset",
"Datasource",
"ReadTask",
"WriteTask",
"from_dask",
"from_items",
"from_arrow",
"from_mars",
"from_modin",
"from_pandas",


@ -70,10 +70,10 @@ class Dataset(Generic[T]):
can be quite slow. Consider using `.map_batches()` for performance.
Examples:
# Transform python objects.
>>> # Transform python objects.
>>> ds.map(lambda x: x * 2)
# Transform Arrow records.
>>> # Transform Arrow records.
>>> ds.map(lambda record: {"v2": record["value"] * 2})
Time complexity: O(dataset size / parallelism)
@ -107,20 +107,20 @@ class Dataset(Generic[T]):
This is a blocking operation.
Examples:
# Transform batches in parallel.
>>> # Transform batches in parallel.
>>> ds.map_batches(lambda batch: [v * 2 for v in batch])
# Define a batch transform function that persists state across
# function invocations for efficiency with compute="actors".
>>> # Define a batch transform function that persists state across
>>> # function invocations for efficiency with compute="actors".
>>> def batch_infer_fn(batch):
... global model
... if model is None:
... model = init_model()
... return model(batch)
# Apply the transform in parallel on GPUs. Since compute="actors",
# the transform will be applied on an autoscaling pool of Ray
# actors, each allocated 1 GPU by Ray.
>>> # Apply the transform in parallel on GPUs. Since
>>> # compute="actors", the transform will be applied on an
>>> # autoscaling pool of Ray actors, each allocated 1 GPU by Ray.
>>> ds.map_batches(
... batch_infer_fn,
... batch_size=256, compute="actors", num_gpus=1)
@ -260,7 +260,7 @@ class Dataset(Generic[T]):
This is a blocking operation.
Examples:
# Set the number of output partitions to write to disk.
>>> # Set the number of output partitions to write to disk.
>>> ds.repartition(100).write_parquet(...)
Time complexity: O(dataset size / parallelism)
@ -428,16 +428,16 @@ class Dataset(Generic[T]):
This is a blocking operation.
Examples:
# Sort using the entire record as the key.
>>> # Sort using the entire record as the key.
>>> ds.sort()
# Sort by a single column.
>>> # Sort by a single column.
>>> ds.sort("field1")
# Sort by multiple columns.
>>> # Sort by multiple columns.
>>> ds.sort(["field1", "field2"])
# Sort by a key function.
>>> # Sort by a key function.
>>> ds.sort(lambda record: record["field1"] % 100)
Time complexity: O(dataset size / parallelism)
@ -838,7 +838,7 @@ class Dataset(Generic[T]):
if batch_format == "pandas":
return batch.to_pandas()
elif batch_format == "pyarrow":
return batch._table
return batch.to_arrow_table()
elif batch_format == "_blocks":
return batch
else:
@ -939,7 +939,7 @@ class Dataset(Generic[T]):
Note that this function will set the Dask scheduler to Dask-on-Ray
globally, via the config.
Time complexity: O(1)
Time complexity: O(dataset size / parallelism)
Returns:
A Dask DataFrame created from this dataset.
@ -968,7 +968,7 @@ class Dataset(Generic[T]):
def to_mars(self) -> "mars.DataFrame":
"""Convert this dataset into a MARS dataframe.
Time complexity: O(1)
Time complexity: O(dataset size / parallelism)
Returns:
A MARS dataframe created from this dataset.
@ -978,19 +978,31 @@ class Dataset(Generic[T]):
def to_modin(self) -> "modin.DataFrame":
"""Convert this dataset into a Modin dataframe.
Time complexity: O(1)
Time complexity: O(dataset size / parallelism)
Returns:
A Modin dataframe created from this dataset.
"""
raise NotImplementedError # P1
def to_spark(self) -> "pyspark.sql.DataFrame":
"""Convert this dataset into a Spark dataframe.
Time complexity: O(dataset size / parallelism)
Returns:
A Spark dataframe created from this dataset.
"""
raise NotImplementedError # P2
def to_pandas(self) -> List[ObjectRef["pandas.DataFrame"]]:
"""Convert this dataset into a set of Pandas dataframes.
"""Convert this dataset into a distributed set of Pandas dataframes.
This is only supported for datasets convertible to Arrow records.
This function induces a copy of the data. For zero-copy access to the
underlying data, consider using ``.get_blocks()`` instead.
Time complexity: O(1)
Time complexity: O(dataset size / parallelism)
Returns:
A list of remote Pandas dataframes created from this dataset.
@ -1002,15 +1014,36 @@ class Dataset(Generic[T]):
return [block_to_df.remote(block) for block in self._blocks]
def to_spark(self) -> "pyspark.sql.DataFrame":
"""Convert this dataset into a Spark dataframe.
def to_arrow(self) -> List[ObjectRef["pyarrow.Table"]]:
"""Convert this dataset into a distributed set of Arrow tables.
This is only supported for datasets convertible to Arrow records.
This function induces a copy of the data. For zero-copy access to the
underlying data, consider using ``.get_blocks()`` instead.
Time complexity: O(dataset size / parallelism)
Returns:
A list of remote Arrow tables created from this dataset.
"""
@ray.remote
def block_to_df(block: ArrowBlock):
return block.to_arrow_table()
return [block_to_df.remote(block) for block in self._blocks]
def get_blocks(self) -> List[ObjectRef["Block"]]:
"""Get a list of references to the underlying blocks of this dataset.
This function can be used for zero-copy access to the data.
Time complexity: O(1)
Returns:
A Spark dataframe created from this dataset.
A list of references to this dataset's blocks.
"""
raise NotImplementedError # P2
return list(self._blocks)
def __repr__(self) -> str:
schema = self.schema()


@ -17,7 +17,7 @@ class Datasource(Generic[T]):
To read a datasource into a dataset, use ``ray.data.read_datasource()``.
To write to a writable datasource, use ``Dataset.write_datasource()``.
See ``RangeDatasource`` and ``DummyOutputDatasource`` below for examples
See ``RangeDatasource`` and ``DummyOutputDatasource`` for examples
of how to implement readable and writable datasources.
"""


@ -18,8 +18,8 @@ from ray.experimental.data.dataset import Dataset
from ray.experimental.data.datasource import Datasource, RangeDatasource, \
JSONDatasource, CSVDatasource, ReadTask
from ray.experimental.data.impl import reader as _reader
from ray.experimental.data.impl.arrow_block import ArrowBlock, ArrowRow
from ray.experimental.data.impl.block_builder import SimpleBlock
from ray.experimental.data.impl.arrow_block import ArrowBlock, ArrowRow, \
DelegatingArrowBlockBuilder
from ray.experimental.data.impl.block_list import BlockList
from ray.experimental.data.impl.lazy_block_list import LazyBlockList
@ -63,7 +63,7 @@ def from_items(items: List[Any], parallelism: int = 200) -> Dataset[Any]:
"""Create a dataset from a list of local Python objects.
Examples:
>>> ds.from_items([1, 2, 3, 4, 5])
>>> ray.data.from_items([1, 2, 3, 4, 5])
Args:
items: List of local Python objects.
@ -78,17 +78,12 @@ def from_items(items: List[Any], parallelism: int = 200) -> Dataset[Any]:
metadata: List[BlockMetadata] = []
i = 0
while i < len(items):
builder = SimpleBlock.builder()
builder = DelegatingArrowBlockBuilder()
for item in items[i:i + block_size]:
builder.add(item)
block = builder.build()
blocks.append(ray.put(block))
metadata.append(
BlockMetadata(
num_rows=block.num_rows(),
size_bytes=block.size_bytes(),
schema=type(items[0]),
input_files=None))
metadata.append(block.get_metadata(input_files=None))
i += block_size
return Dataset(BlockList(blocks, metadata))
@ -98,7 +93,7 @@ def range(n: int, parallelism: int = 200) -> Dataset[int]:
"""Create a dataset from a range of integers [0..n).
Examples:
>>> ds.range(10000).map(lambda x: x * 2).show()
>>> ray.data.range(10000).map(lambda x: x * 2).show()
Args:
n: The upper bound of the range of integers.
@ -115,7 +110,8 @@ def range_arrow(n: int, parallelism: int = 200) -> Dataset[ArrowRow]:
"""Create an Arrow dataset from a range of integers [0..n).
Examples:
>>> ds.range_arrow(1000).map(lambda r: {"v2": r["value"] * 2}).show()
>>> ds = ray.data.range_arrow(1000)
>>> ds.map(lambda r: {"v2": r["value"] * 2}).show()
This is similar to range(), but uses Arrow tables to hold the integers
in Arrow records. The dataset elements take the form {"value": N}.
@ -169,11 +165,11 @@ def read_parquet(paths: Union[str, List[str]],
"""Create an Arrow dataset from parquet files.
Examples:
# Read a directory of files in remote storage.
>>> ds.read_parquet("s3://bucket/path")
>>> # Read a directory of files in remote storage.
>>> ray.data.read_parquet("s3://bucket/path")
# Read multiple local files.
>>> ds.read_parquet(["/path/to/file1", "/path/to/file2"])
>>> # Read multiple local files.
>>> ray.data.read_parquet(["/path/to/file1", "/path/to/file2"])
Args:
paths: A single file path or a list of file paths (or directories).
@ -236,14 +232,14 @@ def read_json(paths: Union[str, List[str]],
"""Create an Arrow dataset from json files.
Examples:
# Read a directory of files in remote storage.
>>> ds.read_json("s3://bucket/path")
>>> # Read a directory of files in remote storage.
>>> ray.data.read_json("s3://bucket/path")
# Read multiple local files.
>>> ds.read_json(["/path/to/file1", "/path/to/file2"])
>>> # Read multiple local files.
>>> ray.data.read_json(["/path/to/file1", "/path/to/file2"])
# Read multiple directories.
>>> ds.read_json(["s3://bucket/path1", "s3://bucket/path2"])
>>> # Read multiple directories.
>>> ray.data.read_json(["s3://bucket/path1", "s3://bucket/path2"])
Args:
paths: A single file/directory path or a list of file/directory paths.
@ -270,14 +266,14 @@ def read_csv(paths: Union[str, List[str]],
"""Create an Arrow dataset from csv files.
Examples:
# Read a directory of files in remote storage.
>>> ds.read_csv("s3://bucket/path")
>>> # Read a directory of files in remote storage.
>>> ray.data.read_csv("s3://bucket/path")
# Read multiple local files.
>>> ds.read_csv(["/path/to/file1", "/path/to/file2"])
>>> # Read multiple local files.
>>> ray.data.read_csv(["/path/to/file1", "/path/to/file2"])
# Read multiple directories.
>>> ds.read_csv(["s3://bucket/path1", "s3://bucket/path2"])
>>> # Read multiple directories.
>>> ray.data.read_csv(["s3://bucket/path1", "s3://bucket/path2"])
Args:
paths: A single file/directory path or a list of file/directory paths.
@ -305,11 +301,11 @@ def read_binary_files(
"""Create a dataset from binary files of arbitrary contents.
Examples:
# Read a directory of files in remote storage.
>>> ds.read_binary_files("s3://bucket/path")
>>> # Read a directory of files in remote storage.
>>> ray.data.read_binary_files("s3://bucket/path")
# Read multiple local files.
>>> ds.read_binary_files(["/path/to/file1", "/path/to/file2"])
>>> # Read multiple local files.
>>> ray.data.read_binary_files(["/path/to/file1", "/path/to/file2"])
Args:
paths: A single file path or a list of file paths (or directories).
@ -390,7 +386,7 @@ def from_pandas(dfs: List[ObjectRef["pandas.DataFrame"]],
import pyarrow as pa
@ray.remote(num_returns=2)
def df_to_block(df: "pandas.DataFrame"):
def df_to_block(df: "pandas.DataFrame") -> ArrowBlock:
block = ArrowBlock(pa.table(df))
return block, block.get_metadata(input_files=None)
@ -399,6 +395,28 @@ def from_pandas(dfs: List[ObjectRef["pandas.DataFrame"]],
return Dataset(BlockList(blocks, ray.get(list(metadata))))
def from_arrow(tables: List[ObjectRef["pyarrow.Table"]],
parallelism: int = 200) -> Dataset[ArrowRow]:
"""Create a dataset from a set of Arrow tables.
Args:
tables: A list of Ray object references to Arrow tables.
parallelism: The amount of parallelism to use for the dataset.
Returns:
Dataset holding Arrow records from the tables.
"""
@ray.remote(num_returns=2)
def to_block(table: "pyarrow.Table") -> ArrowBlock:
block = ArrowBlock(table)
return block, block.get_metadata(input_files=None)
res = [to_block.remote(t) for t in tables]
blocks, metadata = zip(*res)
return Dataset(BlockList(blocks, ray.get(list(metadata))))
def from_spark(df: "pyspark.sql.DataFrame",
parallelism: int = 200) -> Dataset[ArrowRow]:
"""Create a dataset from a Spark dataframe.


@ -4,7 +4,6 @@ import shutil
import time
from unittest.mock import patch
import dask.dataframe as dd
import math
import numpy as np
import pandas as pd
@ -169,6 +168,18 @@ def test_from_pandas(ray_start_regular_shared):
assert values == rows
def test_from_arrow(ray_start_regular_shared):
df1 = pd.DataFrame({"one": [1, 2, 3], "two": ["a", "b", "c"]})
df2 = pd.DataFrame({"one": [4, 5, 6], "two": ["e", "f", "g"]})
ds = ray.experimental.data.from_arrow([
ray.put(pa.Table.from_pandas(df1)),
ray.put(pa.Table.from_pandas(df2))
])
values = [(r["one"], r["two"]) for r in ds.take(6)]
rows = [(r.one, r.two) for _, r in pd.concat([df1, df2]).iterrows()]
assert values == rows
def test_to_pandas(ray_start_regular_shared):
n = 5
df = pd.DataFrame({"value": list(range(n))})
@ -177,6 +188,25 @@ def test_to_pandas(ray_start_regular_shared):
assert df.equals(dfds)
def test_to_arrow(ray_start_regular_shared):
n = 5
df = pd.DataFrame({"value": list(range(n))})
ds = ray.experimental.data.range_arrow(n)
dfds = pd.concat(
[t.to_pandas() for t in ray.get(ds.to_arrow())], ignore_index=True)
assert df.equals(dfds)
def test_get_blocks(ray_start_regular_shared):
blocks = ray.experimental.data.range(10).get_blocks()
assert len(blocks) == 10
out = []
for b in ray.get(blocks):
out.extend(list(b.iter_rows()))
out = sorted(out)
assert out == list(range(10)), out
def test_pandas_roundtrip(ray_start_regular_shared):
df1 = pd.DataFrame({"one": [1, 2, 3], "two": ["a", "b", "c"]})
df2 = pd.DataFrame({"one": [4, 5, 6], "two": ["e", "f", "g"]})
@ -650,6 +680,7 @@ def test_split_hints(ray_start_regular_shared):
def test_from_dask(ray_start_regular_shared):
import dask.dataframe as dd
df = pd.DataFrame({"one": list(range(100)), "two": list(range(100))})
ddf = dd.from_pandas(df, npartitions=10)
ds = ray.experimental.data.from_dask(ddf)


@ -412,8 +412,9 @@ class TorchTrainer:
in case of shared cluster usage. Defaults to 3.
info (dict): Optional dictionary passed to the training
operator for ``train_epoch`` and ``train_batch``.
dataset (Dataset): Optional dataset to train with. If specified,
the dataloader passed in via data_creator will be ignored.
dataset (sgd.Dataset): Optional dataset to train with. If
specified, the dataloader passed in via data_creator will be
ignored.
Returns:
(dict | list) A dictionary of metrics for training.