mirror of
https://github.com/vale981/ray
synced 2025-03-05 10:01:43 -05:00
[Datasets] Overhaul of "Creating Datasets" feature guide. (#24831)
This PR is a general overhaul of the "Creating Datasets" feature guide, providing complete coverage of all (public) dataset creation APIs and highlighting features and quirks of the individual APIs, data modalities, storage backends, etc. In order to keep the page from getting too long and keeping it easy to navigate, tabbed views are used heavily.
This commit is contained in:
parent
68d4dd3a8b
commit
4444150c29
3 changed files with 1111 additions and 47 deletions
|
@ -4,71 +4,539 @@
|
|||
Creating Datasets
|
||||
=================
|
||||
|
||||
You can get started by creating Datasets from synthetic data using ``ray.data.range()`` and ``ray.data.from_items()``.
|
||||
Datasets can hold either plain Python objects (i.e. their schema is a Python type), or Arrow records
|
||||
(in which case their schema is Arrow).
|
||||
A :class:`Dataset <ray.data.Dataset>` can be created from:
|
||||
|
||||
.. code-block:: python
|
||||
* generated synthetic data,
|
||||
* local and distributed in-memory data, and
|
||||
* local and external storage systems (local disk, cloud storage, HDFS, etc.).
|
||||
|
||||
import ray
|
||||
Creation from existing in-memory data is enabled via Datasets' integrations
|
||||
with familiar single-node data libraries (`Pandas <https://pandas.pydata.org/>`__,
|
||||
`NumPy <https://numpy.org/>`__, `Arrow <https://arrow.apache.org/>`__) and distributed
|
||||
data processing frameworks (:ref:`Dask <dask-on-ray>`, :ref:`Spark <spark-on-ray>`,
|
||||
:ref:`Modin <modin-on-ray>`, :ref:`Mars <mars-on-ray>`). Creating datasets from
|
||||
persistent storage is enabled by Datasets' support for reading many common file
|
||||
formats (Parquet, CSV, JSON, NPY, text, binary).
|
||||
|
||||
# Create a Dataset of Python objects.
|
||||
ds = ray.data.range(10000)
|
||||
# -> Dataset(num_blocks=200, num_rows=10000, schema=<class 'int'>)
|
||||
A :class:`Dataset <ray.data.Dataset>` can hold plain Python objects (simple datasets),
|
||||
Arrow records (Arrow datasets), or Pandas records (Pandas datasets). These records are
|
||||
grouped into one or more data **blocks**, and these blocks can be spread across
|
||||
multiple Ray nodes. Simple datasets are represented by simple blocks (lists of Python
|
||||
objects), Arrow datasets are represented by Arrow blocks (
|
||||
`Arrow Tables <https://arrow.apache.org/docs/python/generated/pyarrow.Table.html>`__
|
||||
), and Pandas
|
||||
datasets are represented by Pandas blocks (
|
||||
`Pandas DataFrames <https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html>`__
|
||||
).
|
||||
|
||||
ds.take(5)
|
||||
# -> [0, 1, 2, 3, 4]
|
||||
The method of
|
||||
creating the dataset will determine the format of its internal block representation.
|
||||
|
||||
ds.count()
|
||||
# -> 10000
|
||||
.. dropdown:: See more about Datasets' internal block representation
|
||||
|
||||
# Create a Dataset of Arrow records.
|
||||
ds = ray.data.from_items([{"col1": i, "col2": str(i)} for i in range(10000)])
|
||||
# -> Dataset(num_blocks=200, num_rows=10000, schema={col1: int64, col2: string})
|
||||
The following details the block representation for each creation method:
|
||||
|
||||
ds.show(5)
|
||||
# -> {'col1': 0, 'col2': '0'}
|
||||
# -> {'col1': 1, 'col2': '1'}
|
||||
# -> {'col1': 2, 'col2': '2'}
|
||||
# -> {'col1': 3, 'col2': '3'}
|
||||
# -> {'col1': 4, 'col2': '4'}
|
||||
* Reading tabular files (Parquet, CSV, JSON) and converting directly from Arrow produces Arrow datasets.
|
||||
* Converting from Pandas, Dask, Modin, and Mars produces Pandas datasets.
|
||||
* Reading NumPy files or converting from NumPy ndarrays produces Arrow datasets.
|
||||
* Reading text and raw binary files produces simple datasets.
|
||||
|
||||
ds.schema()
|
||||
# -> col1: int64
|
||||
# -> col2: string
|
||||
The following figure visualizes a ``Dataset`` that has three
|
||||
`Arrow Table <https://arrow.apache.org/docs/python/generated/pyarrow.Table.html>`__
|
||||
blocks, each block holding 1000 rows:
|
||||
|
||||
Datasets can be created from files on local disk or remote datasources such as S3.
|
||||
Any filesystem `supported by pyarrow <http://arrow.apache.org/docs/python/generated/pyarrow.fs.FileSystem.html>`__
|
||||
can be used to specify file locations:
|
||||
.. image:: images/dataset-arch.svg
|
||||
|
||||
.. code-block:: python
|
||||
This guide surveys the many ways to create a ``Dataset``. If none of these meet your
|
||||
needs, please reach out on `Discourse <https://discuss.ray.io/>`__ or open a feature
|
||||
request on the `Ray GitHub repo <https://github.com/ray-project/ray>`__, and check out
|
||||
our :ref:`guide for implementing a custom Datasets datasource <datasets_custom_datasource>`
|
||||
if you're interested in rolling your own integration!
|
||||
|
||||
# Read a directory of files in remote storage.
|
||||
ds = ray.data.read_csv("s3://bucket/path")
|
||||
.. _dataset_generate_data:
|
||||
|
||||
# Read multiple local files.
|
||||
ds = ray.data.read_csv(["/path/to/file1", "/path/to/file2"])
|
||||
-------------------------
|
||||
Generating Synthetic Data
|
||||
-------------------------
|
||||
|
||||
# Read multiple directories.
|
||||
ds = ray.data.read_csv(["s3://bucket/path1", "s3://bucket/path2"])
|
||||
Using Datasets with small, generated data is a great way to test out some of Datasets'
|
||||
features. Each of these synthetic data generators will generate data in Ray tasks that
|
||||
will execute in parallel and will be load-balanced across your Ray cluster; the
|
||||
``Dataset`` will hold a set of futures representing the return values of those tasks,
|
||||
serving as pointers to a collection of distributed data blocks.
|
||||
|
||||
Finally, you can create a ``Dataset`` from existing data in the Ray object store or Ray-compatible distributed DataFrames:
|
||||
.. tabbed:: Int Range
|
||||
|
||||
.. code-block:: python
|
||||
Create a ``Dataset`` from a range of integers.
|
||||
|
||||
import pandas as pd
|
||||
import dask.dataframe as dd
|
||||
.. literalinclude:: ./doc_code/creating_datasets.py
|
||||
:language: python
|
||||
:start-after: __gen_synth_int_range_begin__
|
||||
:end-before: __gen_synth_int_range_end__
|
||||
|
||||
# Create a Dataset from a list of Pandas DataFrame objects.
|
||||
pdf = pd.DataFrame({"one": [1, 2, 3], "two": ["a", "b", "c"]})
|
||||
ds = ray.data.from_pandas([pdf])
|
||||
.. tabbed:: Arrow Range
|
||||
|
||||
# Create a Dataset from a Dask-on-Ray DataFrame.
|
||||
dask_df = dd.from_pandas(pdf, npartitions=10)
|
||||
ds = ray.data.from_dask(dask_df)
|
||||
Create an Arrow (tabular) ``Dataset`` from a range of integers,
|
||||
with a single column containing this integer range.
|
||||
|
||||
From Torch/TensorFlow
|
||||
---------------------
|
||||
.. literalinclude:: ./doc_code/creating_datasets.py
|
||||
:language: python
|
||||
:start-after: __gen_synth_arrow_range_begin__
|
||||
:end-before: __gen_synth_arrow_range_end__
|
||||
|
||||
.. tabbed:: Tensor Range
|
||||
|
||||
Create a tensor dataset from a range of integers, packing this integer range into
|
||||
tensors of the provided shape.
|
||||
|
||||
.. literalinclude:: ./doc_code/creating_datasets.py
|
||||
:language: python
|
||||
:start-after: __gen_synth_tensor_range_begin__
|
||||
:end-before: __gen_synth_tensor_range_end__
|
||||
|
||||
.. _dataset_from_in_memory_data:
|
||||
|
||||
-----------------------------------------
|
||||
From Local and Distributed In-Memory Data
|
||||
-----------------------------------------
|
||||
|
||||
Datasets can be constructed from existing in-memory data. In addition to being able to
|
||||
construct a ``Dataset`` from plain Python objects, Datasets also interoperates with popular
|
||||
single-node libraries (`Pandas <https://pandas.pydata.org/>`__,
|
||||
`NumPy <https://numpy.org/>`__, `Arrow <https://arrow.apache.org/>`__) as well as
|
||||
distributed frameworks (:ref:`Dask <dask-on-ray>`, :ref:`Spark <spark-on-ray>`,
|
||||
:ref:`Modin <modin-on-ray>`, :ref:`Mars <mars-on-ray>`).
|
||||
|
||||
.. _dataset_from_in_memory_data_single_node:
|
||||
|
||||
Integration with Single-Node Data Libraries
|
||||
===========================================
|
||||
|
||||
In this section, we demonstrate creating a ``Dataset`` from single-node in-memory data.
|
||||
|
||||
.. tabbed:: Pandas
|
||||
|
||||
Create a ``Dataset`` from a Pandas DataFrame. This constructs a ``Dataset``
|
||||
backed by a single Pandas DataFrame block.
|
||||
|
||||
.. literalinclude:: ./doc_code/creating_datasets.py
|
||||
:language: python
|
||||
:start-after: __from_pandas_begin__
|
||||
:end-before: __from_pandas_end__
|
||||
|
||||
We can also build a ``Dataset`` from more than one Pandas DataFrame, where each said
|
||||
DataFrame will become a block in the ``Dataset``.
|
||||
|
||||
.. literalinclude:: ./doc_code/creating_datasets.py
|
||||
:language: python
|
||||
:start-after: __from_pandas_mult_begin__
|
||||
:end-before: __from_pandas_mult_end__
|
||||
|
||||
.. tabbed:: NumPy
|
||||
|
||||
Create a ``Dataset`` from a NumPy ndarray. This constructs a ``Dataset``
|
||||
backed by a single-column Arrow table block; the outer dimension of the ndarray
|
||||
will be treated as the row dimension, and the column with have name ``"value"``.
|
||||
|
||||
.. literalinclude:: ./doc_code/creating_datasets.py
|
||||
:language: python
|
||||
:start-after: __from_numpy_begin__
|
||||
:end-before: __from_numpy_end__
|
||||
|
||||
We can also build a ``Dataset`` from more than one NumPy ndarray, where each said
|
||||
ndarray will become a single-column Arrow table block in the ``Dataset``.
|
||||
|
||||
.. literalinclude:: ./doc_code/creating_datasets.py
|
||||
:language: python
|
||||
:start-after: __from_numpy_mult_begin__
|
||||
:end-before: __from_numpy_mult_end__
|
||||
|
||||
.. tabbed:: Arrow
|
||||
|
||||
Create a ``Dataset`` from an
|
||||
`Arrow Table <https://arrow.apache.org/docs/python/generated/pyarrow.Table.html>`__.
|
||||
This constructs a ``Dataset`` backed by a single Arrow ``Table`` block.
|
||||
|
||||
.. literalinclude:: ./doc_code/creating_datasets.py
|
||||
:language: python
|
||||
:start-after: __from_arrow_begin__
|
||||
:end-before: __from_arrow_end__
|
||||
|
||||
We can also build a ``Dataset`` from more than one Arrow Table, where each said
|
||||
``Table`` will become a block in the ``Dataset``.
|
||||
|
||||
.. literalinclude:: ./doc_code/creating_datasets.py
|
||||
:language: python
|
||||
:start-after: __from_arrow_mult_begin__
|
||||
:end-before: __from_arrow_mult_end__
|
||||
|
||||
.. tabbed:: Python Objects
|
||||
|
||||
Create a ``Dataset`` from a list of Python objects; since each object in this
|
||||
particular list is a dictionary, Datasets will treat this list as a list of tabular
|
||||
records, and will construct an Arrow ``Dataset``.
|
||||
|
||||
.. literalinclude:: ./doc_code/creating_datasets.py
|
||||
:language: python
|
||||
:start-after: __from_items_begin__
|
||||
:end-before: __from_items_end__
|
||||
|
||||
.. _dataset_from_in_memory_data_distributed:
|
||||
|
||||
Integration with Distributed Data Processing Frameworks
|
||||
=======================================================
|
||||
|
||||
In addition to working with single-node in-memory data, Datasets can be constructed from
|
||||
distributed (multi-node) in-memory data, interoperating with popular distributed
|
||||
data processing frameworks such as :ref:`Dask <dask-on-ray>`, :ref:`Spark <spark-on-ray>`,
|
||||
:ref:`Modin <modin-on-ray>`, and :ref:`Mars <mars-on-ray>`.
|
||||
|
||||
The common paradigm used by
|
||||
these conversions is to send out Ray tasks converting each Dask/Spark/Modin/Mars
|
||||
data partition to a format that Datasets can understand (if needed), and using the
|
||||
futures representing the return value of those conversion tasks as the ``Dataset`` block
|
||||
futures. If the upstream framework's data partitions are already in a format that
|
||||
Datasets understands (e.g. Arrow or Pandas), Datasets will elide the conversion task and
|
||||
will instead reinterpret those data partitions directly as its blocks.
|
||||
|
||||
.. note::
|
||||
|
||||
These data processing frameworks must be running on Ray in order for these Datasets
|
||||
integrations to work. See how these frameworks can be run on Ray in our
|
||||
:ref:`data processing integrations docs <data_integrations>`.
|
||||
|
||||
.. tabbed:: Dask
|
||||
|
||||
Create a ``Dataset`` from a
|
||||
`Dask DataFrame <https://docs.dask.org/en/stable/dataframe.html>`__. This constructs a
|
||||
``Dataset`` backed by the distributed Pandas DataFrame partitions that underly the
|
||||
Dask DataFrame.
|
||||
|
||||
.. note::
|
||||
|
||||
This conversion should have near-zero overhead: it involves zero data copying and
|
||||
zero data movement. Datasets simply reinterprets the existing Dask DataFrame partitions
|
||||
as Ray Datasets partitions without touching the underlying data.
|
||||
|
||||
.. literalinclude:: ./doc_code/creating_datasets.py
|
||||
:language: python
|
||||
:start-after: __from_dask_begin__
|
||||
:end-before: __from_dask_end__
|
||||
|
||||
.. tabbed:: Spark
|
||||
|
||||
Create a ``Dataset`` from a `Spark DataFrame
|
||||
<https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.html>`__.
|
||||
This constructs a ``Dataset`` backed by the distributed Spark DataFrame partitions
|
||||
that underly the Spark DataFrame. When this conversion happens, Spark-on-Ray (RayDP)
|
||||
will save the Spark DataFrame partitions to Ray's object store in the Arrow format,
|
||||
which Datasets will then interpret as its blocks.
|
||||
|
||||
.. literalinclude:: ./doc_code/creating_datasets.py
|
||||
:language: python
|
||||
:start-after: __from_spark_begin__
|
||||
:end-before: __from_spark_end__
|
||||
|
||||
.. tabbed:: Modin
|
||||
|
||||
Create a ``Dataset`` from a Modin DataFrame. This constructs a ``Dataset``
|
||||
backed by the distributed Pandas DataFrame partitions that underly the Modin DataFrame.
|
||||
|
||||
.. note::
|
||||
|
||||
This conversion should have near-zero overhead: it involves zero data copying and
|
||||
zero data movement. Datasets simply reinterprets the existing Modin DataFrame partitions
|
||||
as Ray Datasets partitions without touching the underlying data.
|
||||
|
||||
.. literalinclude:: ./doc_code/creating_datasets.py
|
||||
:language: python
|
||||
:start-after: __from_modin_begin__
|
||||
:end-before: __from_modin_end__
|
||||
|
||||
.. tabbed:: Mars
|
||||
|
||||
Create a ``Dataset`` from a Mars DataFrame. This constructs a ``Dataset``
|
||||
backed by the distributed Pandas DataFrame partitions that underly the Mars DataFrame.
|
||||
|
||||
.. note::
|
||||
|
||||
This conversion should have near-zero overhead: it involves zero data copying and
|
||||
zero data movement. Datasets simply reinterprets the existing Mars DataFrame partitions
|
||||
as Ray Datasets partitions without touching the underlying data.
|
||||
|
||||
.. literalinclude:: ./doc_code/creating_datasets.py
|
||||
:language: python
|
||||
:start-after: __from_mars_begin__
|
||||
:end-before: __from_mars_end__
|
||||
|
||||
.. _dataset_reading_from_storage:
|
||||
|
||||
--------------------------
|
||||
Reading Files From Storage
|
||||
--------------------------
|
||||
|
||||
Using the ``ray.data.read_*()`` APIs, Datasets can be created from files on local disk
|
||||
or remote storage system such as S3, GCS, Azure Blob Storage, or HDFS. Any filesystem
|
||||
`supported by pyarrow <http://arrow.apache.org/docs/python/generated/pyarrow.fs.FileSystem.html>`__
|
||||
can be used to specify file locations, and many common file formats are supported:
|
||||
Parquet, CSV, JSON, NPY, text, binary.
|
||||
|
||||
Parallel + Distributed Reading
|
||||
==============================
|
||||
|
||||
Each of these APIs take a path or list of paths to files or directories. Any directories
|
||||
provided will be walked in order to obtain concrete file paths, at which point all files
|
||||
will be read in parallel.
|
||||
|
||||
Datasets uses a default parallelism of 200, truncated by the number of files being read:
|
||||
``parallelism = min(num_files, 200)``. ``parallelism`` parallel read tasks will be
|
||||
launched, each reading one or more files and each creating a single block of data.
|
||||
When reading from remote datasources, these parallel read tasks will be spread across
|
||||
the nodes in your Ray cluster, creating the distributed collection of blocks that makes
|
||||
up a distributed Ray Dataset.
|
||||
|
||||
.. image:: images/dataset-read.svg
|
||||
:width: 650px
|
||||
:align: center
|
||||
|
||||
This default parallelism can be overridden via the ``parallelism`` argument; see the
|
||||
:ref:`performance guide <data_performance_tips>` for tips on how to tune this read
|
||||
parallelism.
|
||||
|
||||
Deferred Read Task Execution
|
||||
============================
|
||||
|
||||
Datasets created via the ``ray.data.read_*()`` APIs are semi-lazy: initially, only the
|
||||
first read task will be executed. This avoids blocking Dataset creation on the reading
|
||||
of all data files, enabling inspection functions like
|
||||
:meth:`ds.schema() <ray.data.Dataset.schema>` and
|
||||
:meth:`ds.show() <ray.data.Dataset.show>` to be used right away. Executing further
|
||||
transformations on the Dataset will trigger execution of all read tasks, and execution
|
||||
of all read tasks can be triggered manually using the
|
||||
:meth:`ds.fully_executed() <ray.data.Dataset.fully_executed>` API.
|
||||
|
||||
|
||||
.. _dataset_supported_file_formats:
|
||||
|
||||
Supported File Formats
|
||||
======================
|
||||
|
||||
.. tabbed:: Parquet
|
||||
|
||||
Read Parquet files into a tabular ``Dataset``. The Parquet data will be read into
|
||||
`Arrow Table <https://arrow.apache.org/docs/python/generated/pyarrow.Table.html>`__
|
||||
blocks. Although this simple example demonstrates reading a single file, note that
|
||||
Datasets can also read directories of Parquet files, with one tabular block created
|
||||
per file. For Parquet in particular, we also support reading partitioned Parquet
|
||||
datasets with partition column values pulled from the file paths.
|
||||
|
||||
.. literalinclude:: ./doc_code/creating_datasets.py
|
||||
:language: python
|
||||
:start-after: __read_parquet_begin__
|
||||
:end-before: __read_parquet_end__
|
||||
|
||||
Datasets' Parquet reader also supports projection and filter pushdown, allowing column
|
||||
selection and row filtering to be pushed down to the file scan. For column selection,
|
||||
unselected columns will never be read from the file.
|
||||
|
||||
.. literalinclude:: ./doc_code/creating_datasets.py
|
||||
:language: python
|
||||
:start-after: __read_parquet_pushdown_begin__
|
||||
:end-before: __read_parquet_pushdown_end__
|
||||
|
||||
See the API docs for :func:`read_parquet() <ray.data.read_parquet>`.
|
||||
|
||||
.. tabbed:: CSV
|
||||
|
||||
Read CSV files into a tabular ``Dataset``. The CSV data will be read into
|
||||
`Arrow Table <https://arrow.apache.org/docs/python/generated/pyarrow.Table.html>`__
|
||||
blocks. Although this simple example demonstrates reading a single file, note that
|
||||
Datasets can also read directories of CSV files, with one tabular block created
|
||||
per file.
|
||||
|
||||
.. literalinclude:: ./doc_code/creating_datasets.py
|
||||
:language: python
|
||||
:start-after: __read_csv_begin__
|
||||
:end-before: __read_csv_end__
|
||||
|
||||
See the API docs for :func:`read_csv() <ray.data.read_csv>`.
|
||||
|
||||
.. tabbed:: JSON
|
||||
|
||||
Read JSON files into a tabular ``Dataset``. The JSON data will be read into
|
||||
`Arrow Table <https://arrow.apache.org/docs/python/generated/pyarrow.Table.html>`__
|
||||
blocks. Although this simple example demonstrates reading a single file, note that
|
||||
Datasets can also read directories of JSON files, with one tabular block created
|
||||
per file.
|
||||
|
||||
Currently, only newline-delimited JSON (NDJSON) is supported.
|
||||
|
||||
.. literalinclude:: ./doc_code/creating_datasets.py
|
||||
:language: python
|
||||
:start-after: __read_json_begin__
|
||||
:end-before: __read_json_end__
|
||||
|
||||
See the API docs for :func:`read_json() <ray.data.read_json>`.
|
||||
|
||||
.. tabbed:: NumPy
|
||||
|
||||
Read NumPy files into a tensor ``Dataset``. The NumPy ndarray data will be read into
|
||||
single-column
|
||||
`Arrow Table <https://arrow.apache.org/docs/python/generated/pyarrow.Table.html>`__
|
||||
blocks using our
|
||||
:class:`tensor extension type <ray.data.extensions.tensor_extension.ArrowTensorType>`,
|
||||
treating the outermost ndarray dimension as the row dimension. See our
|
||||
:ref:`tensor data guide <datasets_tensor_support>` for more information on working
|
||||
with tensors in Datasets. Although this simple example demonstrates reading a single
|
||||
file, note that Datasets can also read directories of JSON files, with one tensor
|
||||
block created per file.
|
||||
|
||||
.. literalinclude:: ./doc_code/creating_datasets.py
|
||||
:language: python
|
||||
:start-after: __read_numpy_begin__
|
||||
:end-before: __read_numpy_end__
|
||||
|
||||
See the API docs for :func:`read_numpy() <ray.data.read_numpy>`.
|
||||
|
||||
.. tabbed:: Text
|
||||
|
||||
Read text files into a ``Dataset``. Each line in each text file will be treated as a
|
||||
row in the dataset, resulting in a list-of-strings block being created for each text
|
||||
file.
|
||||
|
||||
.. literalinclude:: ./doc_code/creating_datasets.py
|
||||
:language: python
|
||||
:start-after: __read_text_begin__
|
||||
:end-before: __read_text_end__
|
||||
|
||||
See the API docs for :func:`read_text() <ray.data.read_text>`.
|
||||
|
||||
.. tabbed:: Binary
|
||||
|
||||
Read binary files into a ``Dataset``. Each binary file will be treated as a single row
|
||||
of opaque bytes. These bytes can be decoded into tensor, tabular, text, or any other
|
||||
kind of data using ``ds.map()`` to apply a per-row decoding UDF.
|
||||
|
||||
Although this simple example demonstrates reading a single file, note that Datasets
|
||||
can also read directories of binary files, with one bytes block created per file.
|
||||
|
||||
.. literalinclude:: ./doc_code/creating_datasets.py
|
||||
:language: python
|
||||
:start-after: __read_binary_begin__
|
||||
:end-before: __read_binary_end__
|
||||
|
||||
See the API docs for :func:`read_binary_files() <ray.data.read_binary_files>`.
|
||||
|
||||
.. _dataset_reading_remote_storage:
|
||||
|
||||
Reading from Remote Storage
|
||||
===========================
|
||||
|
||||
All of the file formats mentioned above can be read from remote storage, such as S3,
|
||||
GCS, Azure Blob Storage, and HDFS. These storage systems are supported via Arrow's
|
||||
filesystem APIs natively for S3 and HDFS, and as a wrapper around fsspec for GCS and
|
||||
HDFS. All ``ray.data.read_*()`` APIs expose a ``filesystem`` argument that accepts both
|
||||
`Arrow FileSystem <https://arrow.apache.org/docs/python/filesystems.html>`__ instances
|
||||
and `fsspec FileSystem <https://filesystem-spec.readthedocs.io/en/latest/>`__ instances,
|
||||
allowing you to configure this connection to the remote storage system, such as
|
||||
authn/authz and buffer/block size.
|
||||
|
||||
For S3 and HDFS, the underlying `FileSystem
|
||||
<https://arrow.apache.org/docs/python/generated/pyarrow.fs.FileSystem.html>`__
|
||||
implementation will be inferred from the URL scheme (``"s3://"`` and ``"hdfs://"``); if
|
||||
the default connection configuration suffices for your workload, you won't need to
|
||||
specify a ``filesystem`` argument.
|
||||
|
||||
We use Parquet files for the below examples, but all of the aforementioned file formats
|
||||
are supported for each of these storage systems.
|
||||
|
||||
.. tabbed:: S3
|
||||
|
||||
The AWS S3 storage system is inferred from the URI scheme (``s3://``), with required connection
|
||||
configuration such as S3 credentials being pulled from the machine's environment
|
||||
(e.g. the ``AWS_ACCESS_KEY_ID`` and ``AWS_SECRET_ACCESS_KEY`` environment variables).
|
||||
|
||||
.. literalinclude:: ./doc_code/creating_datasets.py
|
||||
:language: python
|
||||
:start-after: __read_parquet_s3_begin__
|
||||
:end-before: __read_parquet_s3_end__
|
||||
|
||||
If needing to customize this S3 storage system connection (credentials, region,
|
||||
endpoint override, etc.), you can pass in an
|
||||
`S3FileSystem <https://arrow.apache.org/docs/python/filesystems.html#s3>`__ instance
|
||||
to :func:`read_parquet() <ray.data.read_parquet>`.
|
||||
|
||||
.. note::
|
||||
|
||||
This example is not runnable as-is; to run it on your own private S3 data, add in a
|
||||
path to your private bucket and specify your S3 credentials.
|
||||
|
||||
.. literalinclude:: ./doc_code/creating_datasets.py
|
||||
:language: python
|
||||
:start-after: __read_parquet_s3_with_fs_begin__
|
||||
:end-before: __read_parquet_s3_with_fs_end__
|
||||
|
||||
.. tabbed:: HDFS
|
||||
|
||||
The HDFS storage system is inferred from the URI scheme (``hdfs://``), with required connection
|
||||
configuration such as the host and the port being derived from the URI.
|
||||
|
||||
.. note::
|
||||
|
||||
This example is not runnable as-is; you'll need to point it at your HDFS
|
||||
cluster/data.
|
||||
|
||||
.. literalinclude:: ./doc_code/creating_datasets.py
|
||||
:language: python
|
||||
:start-after: __read_parquet_hdfs_begin__
|
||||
:end-before: __read_parquet_hdfs_end__
|
||||
|
||||
If needing to customize this HDFS storage system connection (host, port, user, kerb
|
||||
ticket, etc.), you can pass in an `HDFSFileSystem
|
||||
<https://arrow.apache.org/docs/python/filesystems.html#hadoop-distributed-file-system-hdfs>`__
|
||||
instance to :func:`read_parquet() <ray.data.read_parquet>`.
|
||||
|
||||
.. note::
|
||||
|
||||
This example is not runnable as-is; you'll need to point it at your HDFS
|
||||
cluster/data.
|
||||
|
||||
.. literalinclude:: ./doc_code/creating_datasets.py
|
||||
:language: python
|
||||
:start-after: __read_parquet_hdfs_with_fs_begin__
|
||||
:end-before: __read_parquet_hdfs_with_fs_end__
|
||||
|
||||
.. tabbed:: GCS
|
||||
|
||||
Data can be read from Google Cloud Storage by providing a configured
|
||||
`gcsfs GCSFileSystem <https://gcsfs.readthedocs.io/en/latest/>`__, where the
|
||||
appropriate Google Cloud project and credentials can be specified.
|
||||
|
||||
.. note::
|
||||
This example is not runnable as-is; you'll need to point it at your GCS bucket and
|
||||
configure your GCP project and credentials.
|
||||
|
||||
.. literalinclude:: ./doc_code/creating_datasets.py
|
||||
:language: python
|
||||
:start-after: __read_parquet_gcs_begin__
|
||||
:end-before: __read_parquet_gcs_end__
|
||||
|
||||
.. tabbed:: ADL/ABS (Azure)
|
||||
|
||||
Data can be read from Azure Blob Storage by providing a configured
|
||||
`adlfs AzureBlobFileSystem <https://github.com/fsspec/adlfs>`__, where the appropriate
|
||||
account name and account key can be specified.
|
||||
|
||||
.. literalinclude:: ./doc_code/creating_datasets.py
|
||||
:language: python
|
||||
:start-after: __read_parquet_az_begin__
|
||||
:end-before: __read_parquet_az_end__
|
||||
|
||||
.. _dataset_from_torch_tf:
|
||||
|
||||
-------------------------
|
||||
From Torch and TensorFlow
|
||||
-------------------------
|
||||
|
||||
.. tabbed:: PyTorch
|
||||
|
||||
|
@ -120,7 +588,9 @@ From Torch/TensorFlow
|
|||
features.shape # TensorShape([32, 32, 3])
|
||||
label # <tf.Tensor: shape=(), dtype=int64, numpy=7>
|
||||
|
||||
.. _dataset_from_huggingface:
|
||||
|
||||
-------------------------------
|
||||
From 🤗 (Hugging Face) Datasets
|
||||
-------------------------------
|
||||
|
||||
|
|
|
@ -1,3 +1,5 @@
|
|||
.. _datasets_custom_datasource:
|
||||
|
||||
------------------------
|
||||
Using Custom Datasources
|
||||
------------------------
|
||||
|
|
592
doc/source/data/doc_code/creating_datasets.py
Normal file
592
doc/source/data/doc_code/creating_datasets.py
Normal file
|
@ -0,0 +1,592 @@
|
|||
# flake8: noqa
|
||||
|
||||
# fmt: off
|
||||
# __creating_datasets_import_begin__
|
||||
import ray
|
||||
# __creating_datasets_import_end__
|
||||
# fmt: on
|
||||
|
||||
# fmt: off
|
||||
# __gen_synth_int_range_begin__
|
||||
# Create a Dataset of Python objects.
|
||||
ds = ray.data.range(10000)
|
||||
# -> Dataset(num_blocks=200, num_rows=10000, schema=<class 'int'>)
|
||||
|
||||
ds.take(5)
|
||||
# -> [0, 1, 2, 3, 4]
|
||||
# __gen_synth_int_range_end__
|
||||
# fmt: on
|
||||
|
||||
# fmt: off
|
||||
# __gen_synth_arrow_range_begin__
|
||||
# Create a Dataset of Arrow records.
|
||||
ds = ray.data.range_arrow(10000)
|
||||
# -> Dataset(num_blocks=200, num_rows=10000, schema={value: int64})
|
||||
|
||||
ds.take(5)
|
||||
# -> [{'value': 0}, {'value': 1}, {'value': 2}, {'value': 3}, {'value': 4}]
|
||||
# __gen_synth_arrow_range_end__
|
||||
# fmt: on
|
||||
|
||||
# fmt: off
|
||||
# __gen_synth_tensor_range_begin__
|
||||
# Create a Dataset of tensors.
|
||||
ds = ray.data.range_tensor(100 * 64 * 64, shape=(64, 64))
|
||||
# -> Dataset(
|
||||
# num_blocks=200,
|
||||
# num_rows=409600,
|
||||
# schema={value: <ArrowTensorType: shape=(64, 64), dtype=int64>}
|
||||
# )
|
||||
|
||||
ds.take(2)
|
||||
# -> [array([[0, 0, 0, ..., 0, 0, 0],
|
||||
# [0, 0, 0, ..., 0, 0, 0],
|
||||
# [0, 0, 0, ..., 0, 0, 0],
|
||||
# ...,
|
||||
# [0, 0, 0, ..., 0, 0, 0],
|
||||
# [0, 0, 0, ..., 0, 0, 0],
|
||||
# [0, 0, 0, ..., 0, 0, 0]]),
|
||||
# array([[1, 1, 1, ..., 1, 1, 1],
|
||||
# [1, 1, 1, ..., 1, 1, 1],
|
||||
# [1, 1, 1, ..., 1, 1, 1],
|
||||
# ...,
|
||||
# [1, 1, 1, ..., 1, 1, 1],
|
||||
# [1, 1, 1, ..., 1, 1, 1],
|
||||
# [1, 1, 1, ..., 1, 1, 1]])]
|
||||
# __gen_synth_tensor_range_end__
|
||||
# fmt: on
|
||||
|
||||
# fmt: off
|
||||
# __from_items_begin__
|
||||
# Create a Dataset of tabular (Arrow) records.
|
||||
ds = ray.data.from_items([{"col1": i, "col2": str(i)} for i in range(10000)])
|
||||
# -> Dataset(num_blocks=200, num_rows=10000, schema={col1: int64, col2: string})
|
||||
|
||||
ds.show(3)
|
||||
# -> {'col1': 0, 'col2': '0'}
|
||||
# -> {'col1': 1, 'col2': '1'}
|
||||
# -> {'col1': 2, 'col2': '2'}
|
||||
# __from_items_end__
|
||||
# fmt: on
|
||||
|
||||
# fmt: off
|
||||
# __from_pandas_begin__
|
||||
import pandas as pd
|
||||
|
||||
# Create a tabular Dataset from a Pandas DataFrame.
|
||||
df = pd.DataFrame({"col1": list(range(10000)), "col2": list(map(str, range(10000)))})
|
||||
ds = ray.data.from_pandas(df)
|
||||
# -> Dataset(num_blocks=1, num_rows=10000, schema={col1: int64, col2: object})
|
||||
|
||||
ds.show(3)
|
||||
# -> {'col1': 0, 'col2': '0'}
|
||||
# -> {'col1': 1, 'col2': '1'}
|
||||
# -> {'col1': 2, 'col2': '2'}
|
||||
# __from_pandas_end__
|
||||
# fmt: on
|
||||
|
||||
# fmt: off
|
||||
# __from_pandas_mult_begin__
|
||||
import pandas as pd
|
||||
|
||||
data = list(range(10000))
|
||||
num_chunks = 10
|
||||
chunk_size = len(data) // num_chunks
|
||||
chunks = [data[i : i + chunk_size] for i in range(0, len(data), chunk_size)]
|
||||
dfs = [
|
||||
pd.DataFrame({"col1": list(chunk), "col2": list(map(str, chunk))})
|
||||
for chunk in chunks
|
||||
]
|
||||
# Create a tabular Dataset from multiple Pandas DataFrames.
|
||||
ds = ray.data.from_pandas(dfs)
|
||||
# -> Dataset(num_blocks=10, num_rows=10000, schema={col1: int64, col2: object})
|
||||
|
||||
ds.show(3)
|
||||
# -> {'col1': 0, 'col2': '0'}
|
||||
# -> {'col1': 1, 'col2': '1'}
|
||||
# -> {'col1': 2, 'col2': '2'}
|
||||
# __from_pandas_mult_end__
|
||||
# fmt: on
|
||||
|
||||
# fmt: off
|
||||
# __from_numpy_begin__
|
||||
import numpy as np
|
||||
|
||||
# Create a tensor Dataset from a 1D NumPy ndarray.
|
||||
arr = np.arange(100)
|
||||
ds = ray.data.from_numpy(arr)
|
||||
# -> Dataset(
|
||||
# num_blocks=1,
|
||||
# num_rows=100,
|
||||
# schema={value: <ArrowTensorType: shape=(), dtype=int64>},
|
||||
# )
|
||||
|
||||
# Each element is a scalar ndarray.
|
||||
ds.show(3)
|
||||
# -> {'value': array(0)}
|
||||
# -> {'value': array(1)}
|
||||
# -> {'value': array(2)}
|
||||
|
||||
# Create a tensor Dataset from a 3D NumPy ndarray.
|
||||
arr = np.ones((3, 4, 4))
|
||||
# The outer dimension is treated as the row dimension.
|
||||
ds = ray.data.from_numpy(arr)
|
||||
# -> Dataset(
|
||||
# num_blocks=1,
|
||||
# num_rows=3,
|
||||
# schema={value: <ArrowTensorType: shape=(4, 4), dtype=int64>},
|
||||
# )
|
||||
|
||||
ds.show(2)
|
||||
# -> {'value': array([[1., 1., 1., 1.],
|
||||
# [1., 1., 1., 1.],
|
||||
# [1., 1., 1., 1.],
|
||||
# [1., 1., 1., 1.]])}
|
||||
# -> {'value': array([[1., 1., 1., 1.],
|
||||
# [1., 1., 1., 1.],
|
||||
# [1., 1., 1., 1.],
|
||||
# [1., 1., 1., 1.]])}
|
||||
# __from_numpy_end__
|
||||
# fmt: on
|
||||
|
||||
# fmt: off
|
||||
# __from_numpy_mult_begin__
|
||||
import numpy as np
|
||||
|
||||
# Create a tensor Dataset from multiple 3D NumPy ndarray.
|
||||
arrs = [np.random.rand(2, 4, 4) for _ in range(4)]
|
||||
# The outer dimension is treated as the row dimension.
|
||||
ds = ray.data.from_numpy(arrs)
|
||||
# -> Dataset(
|
||||
# num_blocks=4,
|
||||
# num_rows=8,
|
||||
# schema={value: <ArrowTensorType: shape=(4, 4), dtype=int64>},
|
||||
# )
|
||||
|
||||
ds.show(2)
|
||||
# -> {'value': array([[0.06587483, 0.67808656, 0.76461924, 0.83428549],
|
||||
# [0.04932103, 0.25112165, 0.26476714, 0.24599738],
|
||||
# [0.67624391, 0.58689537, 0.12594709, 0.94663371],
|
||||
# [0.32435665, 0.97719096, 0.03234169, 0.71563231]])}
|
||||
# -> {'value': array([[0.98570318, 0.65956399, 0.82168898, 0.09798336],
|
||||
# [0.22426704, 0.34209978, 0.02605247, 0.48200137],
|
||||
# [0.17312096, 0.38789983, 0.42663678, 0.92652456],
|
||||
# [0.80787394, 0.92437162, 0.11185822, 0.3319638 ]])}
|
||||
# __from_numpy_mult_end__
|
||||
# fmt: on
|
||||
|
||||
# fmt: off
|
||||
# __from_arrow_begin__
|
||||
import pyarrow as pa
|
||||
|
||||
# Create a tabular Dataset from an Arrow Table.
|
||||
t = pa.table({"col1": list(range(10000)), "col2": list(map(str, range(10000)))})
|
||||
ds = ray.data.from_arrow(t)
|
||||
# -> Dataset(num_blocks=1, num_rows=10000, schema={col1: int64, col2: string})
|
||||
|
||||
ds.show(3)
|
||||
# -> {'col1': 0, 'col2': '0'}
|
||||
# -> {'col1': 1, 'col2': '1'}
|
||||
# -> {'col1': 2, 'col2': '2'}
|
||||
# __from_arrow_end__
|
||||
# fmt: on
|
||||
|
||||
# fmt: off
|
||||
# __from_arrow_mult_begin__
|
||||
import arrow as pd
|
||||
|
||||
data = list(range(10000))
|
||||
num_chunks = 10
|
||||
chunk_size = len(data) // num_chunks
|
||||
chunks = [data[i : i + chunk_size] for i in range(0, len(data), chunk_size)]
|
||||
ts = [
|
||||
pa.table({"col1": list(chunk), "col2": list(map(str, chunk))})
|
||||
for chunk in chunks
|
||||
]
|
||||
# Create a tabular Dataset from multiple Arrow Tables.
|
||||
ds = ray.data.from_arrow(ts)
|
||||
# -> Dataset(num_blocks=10, num_rows=10000, schema={col1: int64, col2: string})
|
||||
|
||||
ds.show(3)
|
||||
# -> {'col1': 0, 'col2': '0'}
|
||||
# -> {'col1': 1, 'col2': '1'}
|
||||
# -> {'col1': 2, 'col2': '2'}
|
||||
# __from_arrow_mult_end__
|
||||
# fmt: on
|
||||
|
||||
# fmt: off
|
||||
# __from_dask_begin__
|
||||
import pandas as pd
|
||||
import dask.dataframe as dd
|
||||
|
||||
df = pd.DataFrame({"col1": list(range(10000)), "col2": list(map(str, range(10000)))})
|
||||
ddf = dd.from_pandas(df, npartitions=4)
|
||||
# Create a tabular Dataset from a Dask DataFrame.
|
||||
ds = ray.data.from_dask(ddf)
|
||||
# -> Dataset(num_blocks=10, num_rows=10000, schema={col1: int64, col2: object})
|
||||
|
||||
ds.show(3)
|
||||
# -> {'col1': 0, 'col2': '0'}
|
||||
# -> {'col1': 1, 'col2': '1'}
|
||||
# -> {'col1': 2, 'col2': '2'}
|
||||
# __from_dask_end__
|
||||
# fmt: on
|
||||
|
||||
# fmt: off
|
||||
# __from_spark_begin__
|
||||
import raydp
|
||||
|
||||
spark = raydp.init_spark(app_name="Spark -> Datasets Example",
|
||||
num_executors=2,
|
||||
executor_cores=2,
|
||||
executor_memory="500MB")
|
||||
df = spark.createDataFrame([(i, str(i)) for i in range(10000)], ["col1", "col2"])
|
||||
# Create a tabular Dataset from a Spark DataFrame.
|
||||
ds = ray.data.from_dask(df)
|
||||
# -> Dataset(num_blocks=10, num_rows=10000, schema={col1: int64, col2: string})
|
||||
|
||||
ds.show(3)
|
||||
# -> {'col1': 0, 'col2': '0'}
|
||||
# -> {'col1': 1, 'col2': '1'}
|
||||
# -> {'col1': 2, 'col2': '2'}
|
||||
# __from_spark_end__
|
||||
# fmt: on
|
||||
|
||||
# fmt: off
|
||||
# __from_modin_begin__
|
||||
import modin.pandas as md
|
||||
|
||||
df = pd.DataFrame({"col1": list(range(10000)), "col2": list(map(str, range(10000)))})
|
||||
mdf = md.DataFrame(df)
|
||||
# Create a tabular Dataset from a Modin DataFrame.
|
||||
ds = ray.data.from_modin(mdf)
|
||||
# -> Dataset(num_blocks=8, num_rows=10000, schema={col1: int64, col2: object})
|
||||
|
||||
ds.show(3)
|
||||
# -> {'col1': 0, 'col2': '0'}
|
||||
# -> {'col1': 1, 'col2': '1'}
|
||||
# -> {'col1': 2, 'col2': '2'}
|
||||
# __from_modin_end__
|
||||
# fmt: on
|
||||
|
||||
# fmt: off
|
||||
# __from_mars_begin__
|
||||
import mars
|
||||
import mars.dataframe as md
|
||||
|
||||
cluster = mars.new_cluster_in_ray(worker_num=2, worker_cpu=1)
|
||||
|
||||
df = pd.DataFrame({"col1": list(range(10000)), "col2": list(map(str, range(10000)))})
|
||||
mdf = md.DataFrame(df, num_partitions=8)
|
||||
# Create a tabular Dataset from a Mars DataFrame.
|
||||
ds = ray.data.from_mars(mdf)
|
||||
# -> Dataset(num_blocks=8, num_rows=10000, schema={col1: int64, col2: object})
|
||||
|
||||
ds.show(3)
|
||||
# -> {'col1': 0, 'col2': '0'}
|
||||
# -> {'col1': 1, 'col2': '1'}
|
||||
# -> {'col1': 2, 'col2': '2'}
|
||||
# __from_mars_end__
|
||||
# fmt: on
|
||||
|
||||
# fmt: off
|
||||
# __read_parquet_begin__
|
||||
# Create a tabular Dataset by reading a Parquet file.
|
||||
ds = ray.data.read_parquet("example://iris.parquet")
|
||||
# -> Dataset(
|
||||
# num_blocks=1,
|
||||
# num_rows=150,
|
||||
# schema={
|
||||
# sepal.length: double,
|
||||
# sepal.width: double,
|
||||
# petal.length: double,
|
||||
# petal.width: double,
|
||||
# variety: string,
|
||||
# }
|
||||
# )
|
||||
|
||||
ds.show(2)
|
||||
# -> {
|
||||
# 'sepal.length': 5.1,
|
||||
# 'sepal.width': 3.5,
|
||||
# 'petal.length': 1.4,
|
||||
# 'petal.width': 0.2,
|
||||
# 'variety': 'Setosa',
|
||||
# }
|
||||
# -> {
|
||||
# 'sepal.length': 4.9,
|
||||
# 'sepal.width': 3.0,
|
||||
# 'petal.length': 1.4,
|
||||
# 'petal.width': 0.2,
|
||||
# 'variety': 'Setosa',
|
||||
# }
|
||||
# __read_parquet_end__
|
||||
# fmt: on
|
||||
|
||||
# fmt: off
|
||||
# __read_parquet_pushdown_begin__
|
||||
import pyarrow as pa
|
||||
|
||||
# Create a tabular Dataset by reading a Parquet file, pushing column selection and row
|
||||
# filtering down to the file scan.
|
||||
ds = ray.data.read_parquet(
|
||||
"example://iris.parquet",
|
||||
columns=["sepal.length", "variety"],
|
||||
filter=pa.dataset.field("sepal.length") > 5.0,
|
||||
).fully_executed() # Force a full read of the file.
|
||||
# -> Dataset(num_blocks=1, num_rows=118, schema={sepal.length: double, variety: string})
|
||||
|
||||
ds.show(2)
|
||||
# -> {'sepal.length': 5.1, 'variety': 'Setosa'}
|
||||
# {'sepal.length': 5.4, 'variety': 'Setosa'}
|
||||
# __read_parquet_pushdown_end__
|
||||
# fmt: on
|
||||
|
||||
# fmt: off
|
||||
# __read_csv_begin__
|
||||
# Create a tabular Dataset by reading a CSV file.
|
||||
ds = ray.data.read_csv("example://iris.csv")
|
||||
# -> Dataset(
|
||||
# num_blocks=1,
|
||||
# num_rows=150,
|
||||
# schema={
|
||||
# sepal.length: double,
|
||||
# sepal.width: double,
|
||||
# petal.length: double,
|
||||
# petal.width: double,
|
||||
# variety: string,
|
||||
# }
|
||||
# )
|
||||
|
||||
ds.show(2)
|
||||
# -> {
|
||||
# 'sepal.length': 5.1,
|
||||
# 'sepal.width': 3.5,
|
||||
# 'petal.length': 1.4,
|
||||
# 'petal.width': 0.2,
|
||||
# 'variety': 'Setosa',
|
||||
# }
|
||||
# -> {
|
||||
# 'sepal.length': 4.9,
|
||||
# 'sepal.width': 3.0,
|
||||
# 'petal.length': 1.4,
|
||||
# 'petal.width': 0.2,
|
||||
# 'variety': 'Setosa',
|
||||
# }
|
||||
# __read_csv_end__
|
||||
# fmt: on
|
||||
|
||||
# fmt: off
|
||||
# __read_json_begin__
|
||||
# Create a tabular Dataset by reading a JSON file.
|
||||
ds = ray.data.read_json("example://iris.json")
|
||||
# -> Dataset(
|
||||
# num_blocks=1,
|
||||
# num_rows=150,
|
||||
# schema={
|
||||
# sepal.length: double,
|
||||
# sepal.width: double,
|
||||
# petal.length: double,
|
||||
# petal.width: double,
|
||||
# variety: string,
|
||||
# }
|
||||
# )
|
||||
|
||||
ds.show(2)
|
||||
# -> {
|
||||
# 'sepal.length': 5.1,
|
||||
# 'sepal.width': 3.5,
|
||||
# 'petal.length': 1.4,
|
||||
# 'petal.width': 0.2,
|
||||
# 'variety': 'Setosa',
|
||||
# }
|
||||
# -> {
|
||||
# 'sepal.length': 4.9,
|
||||
# 'sepal.width': 3.0,
|
||||
# 'petal.length': 1.4,
|
||||
# 'petal.width': 0.2,
|
||||
# 'variety': 'Setosa',
|
||||
# }
|
||||
# __read_json_end__
|
||||
# fmt: on
|
||||
|
||||
# fmt: off
|
||||
# __read_numpy_begin__
|
||||
# Create a tensor Dataset by reading a NumPy file.
|
||||
ds = ray.data.read_numpy("example://mnist_subset.npy")
|
||||
# -> Dataset(
|
||||
# num_blocks=1,
|
||||
# num_rows=3,
|
||||
# schema={__RAY_TC__: <ArrowTensorType: shape=(28, 28), dtype=uint8>},
|
||||
# )
|
||||
|
||||
ds.show(2)
|
||||
# [array([[0, ...]]), array([[0, ...]])]
|
||||
# __read_numpy_end__
|
||||
# fmt: on
|
||||
|
||||
# fmt: off
|
||||
# __read_text_begin__
|
||||
# Create a tabular Dataset by reading a text file.
|
||||
ds = ray.data.read_text("example://sms_spam_collection_subset.txt")
|
||||
# -> Dataset(num_blocks=1, num_rows=10, schema=<class 'str'>)
|
||||
|
||||
ds.show(3)
|
||||
# -> ham Go until jurong point, crazy.. Available only in bugis n great world la e
|
||||
# buffet... Cine there got amore wat...
|
||||
# ham Ok lar... Joking wif u oni...
|
||||
# spam Free entry in 2 a wkly comp to win FA Cup final tkts 21st May 2005. Text FA
|
||||
# to 87121 to receive entry question(std txt rate)T&C's apply
|
||||
# 08452810075over18's
|
||||
# __read_text_end__
|
||||
# fmt: on
|
||||
|
||||
# fmt: off
|
||||
# __read_binary_begin__
|
||||
from io import BytesIO
|
||||
import PIL
|
||||
|
||||
# Create a tabular Dataset by reading a binary file.
|
||||
ds = ray.data.read_binary_files("example://mnist_subset_partitioned/0/1.png")
|
||||
# -> Dataset(num_blocks=1, num_rows=1, schema=<class 'bytes'>)
|
||||
|
||||
ds = ds.map(lambda bytes_: np.asarray(PIL.Image.open(BytesIO(bytes_)).convert("L")))
|
||||
# -> Dataset(
|
||||
# num_blocks=1,
|
||||
# num_rows=1,
|
||||
# schema={__RAY_TC__: <ArrowTensorType: shape=(28, 28), dtype=uint8>},
|
||||
# )
|
||||
|
||||
ds.show(3)
|
||||
# -> ham Go until jurong point, crazy.. Available only in bugis n great world la e
|
||||
# buffet... Cine there got amore wat...
|
||||
# ham Ok lar... Joking wif u oni...
|
||||
# spam Free entry in 2 a wkly comp to win FA Cup final tkts 21st May 2005. Text FA
|
||||
# to 87121 to receive entry question(std txt rate)T&C's apply
|
||||
# 08452810075over18's
|
||||
# __read_binary_end__
|
||||
# fmt: on
|
||||
|
||||
# fmt: off
|
||||
# __read_parquet_s3_begin__
|
||||
# Create a tabular Dataset by reading a Parquet file from S3.
|
||||
ds = ray.data.read_parquet("s3://ursa-labs-taxi-data/2009/01/data.parquet")
|
||||
# -> Dataset(
|
||||
# num_blocks=1,
|
||||
# num_rows=14092413,
|
||||
# schema={
|
||||
# vendor_id: string,
|
||||
# pickup_at: timestamp[us],
|
||||
# dropoff_at: timestamp[us],
|
||||
# passenger_count: int8,
|
||||
# trip_distance: float,
|
||||
# pickup_longitude: float,
|
||||
# pickup_latitude: float,
|
||||
# ...,
|
||||
# },
|
||||
# )
|
||||
|
||||
ds.show(2)
|
||||
# -> {
|
||||
# 'vendor_id': 'VTS',
|
||||
# 'pickup_at': datetime.datetime(2009, 1, 4, 2, 52),
|
||||
# 'dropoff_at': datetime.datetime(2009, 1, 4, 3, 2),
|
||||
# 'passenger_count': 1,
|
||||
# 'trip_distance': 2.630000114440918,
|
||||
# 'pickup_longitude': -73.99195861816406,
|
||||
# 'pickup_latitude': 40.72156524658203,
|
||||
# ...,
|
||||
# }
|
||||
# {
|
||||
# 'vendor_id': 'VTS',
|
||||
# 'pickup_at': datetime.datetime(2009, 1, 4, 3, 31),
|
||||
# 'dropoff_at': datetime.datetime(2009, 1, 4, 3, 38),
|
||||
# 'passenger_count': 3,
|
||||
# 'trip_distance': 4.550000190734863,
|
||||
# 'pickup_longitude': -73.98210144042969,
|
||||
# 'pickup_latitude': 40.736289978027344,
|
||||
# ...,
|
||||
# }
|
||||
# __read_parquet_s3_end__
|
||||
# fmt: on
|
||||
|
||||
# fmt: off
|
||||
# __read_parquet_s3_with_fs_begin__
|
||||
import pyarrow as pa
|
||||
|
||||
# Create a tabular Dataset by reading a Parquet file from a private S3 bucket.
|
||||
# NOTE: This example is not runnable as-is; add in a path to your private bucket and the
|
||||
# required S3 credentials!
|
||||
ds = ray.data.read_parquet(
|
||||
"s3://some/private/bucket",
|
||||
filesystem=pa.fs.S3FileSystem(
|
||||
region="us-west-2",
|
||||
access_key="XXXX",
|
||||
secret_key="XXXX",
|
||||
),
|
||||
)
|
||||
# __read_parquet_s3_with_fs_end__
|
||||
# fmt: on
|
||||
|
||||
# fmt: off
|
||||
# __read_parquet_hdfs_begin__
|
||||
# Create a tabular Dataset by reading a Parquet file from HDFS using HDFS connection
|
||||
# automatically constructed based on the URI.
|
||||
# NOTE: This example is not runnable as-is; you'll need to point it at your HDFS
|
||||
# cluster/data.
|
||||
ds = ray.data.read_parquet("hdfs://<host:port>/path/to/file.parquet")
|
||||
# __read_parquet_hdfs_end__
|
||||
# fmt: on
|
||||
|
||||
# TODO(Clark): Find clean way to start local HDFS cluster in the below example (that
|
||||
# works in CI).
|
||||
|
||||
# fmt: off
|
||||
# __read_parquet_hdfs_with_fs_begin__
|
||||
import pyarrow as pa
|
||||
|
||||
# Create a tabular Dataset by reading a Parquet file from HDFS, manually specifying a
|
||||
# configured HDFS connection via a Pyarrow HDFSFileSystem instance.
|
||||
# NOTE: This example is not runnable as-is; you'll need to point it at your HDFS
|
||||
# cluster/data.
|
||||
ds = ray.data.read_parquet(
|
||||
"hdfs://path/to/file.parquet",
|
||||
filesystem=pa.fs.HDFSFileSystem(host="localhost", port=9000, user="bob"),
|
||||
)
|
||||
# __read_parquet_hdfs_with_fs_end__
|
||||
# fmt: on
|
||||
|
||||
# TODO(Clark): Find open data for below GCS example.
|
||||
|
||||
# fmt: off
|
||||
# __read_parquet_gcs_begin__
|
||||
import gcsfs
|
||||
|
||||
# Create a tabular Dataset by reading a Parquet file from GCS, passing the configured
|
||||
# GCSFileSystem.
|
||||
# NOTE: This example is not runnable as-is; you'll need to point it at your GCS bucket
|
||||
# and configure your GCP project and credentials.
|
||||
ds = ray.data.read_parquet(
|
||||
"gs://path/to/file.parquet",
|
||||
filesystem=gcsfs.GCSFileSystem(project="my-google-project"),
|
||||
)
|
||||
# __read_parquet_gcs_end__
|
||||
# fmt: on
|
||||
|
||||
# fmt: off
|
||||
# __read_parquet_az_begin__
|
||||
import adlfs
|
||||
|
||||
# Create a tabular Dataset by reading a Parquet file from Azure Blob Storage, passing
|
||||
# the configured AzureBlobFileSystem.
|
||||
path = (
|
||||
"az://nyctlc/yellow/puYear=2009/puMonth=1/"
|
||||
"part-00019-tid-8898858832658823408-a1de80bd-eed3-4d11-b9d4-fa74bfbd47bc-426333-4"
|
||||
".c000.snappy.parquet"
|
||||
)
|
||||
ds = ray.data.read_parquet(
|
||||
path,
|
||||
filesystem=adlfs.AzureBlobFileSystem(account_name="azureopendatastorage")
|
||||
)
|
||||
# __read_parquet_az_end__
|
||||
# fmt: on
|
Loading…
Add table
Reference in a new issue