diff --git a/doc/source/data/creating-datasets.rst b/doc/source/data/creating-datasets.rst index 1e8a3a6a1..9f774b3fd 100644 --- a/doc/source/data/creating-datasets.rst +++ b/doc/source/data/creating-datasets.rst @@ -4,71 +4,539 @@ Creating Datasets ================= -You can get started by creating Datasets from synthetic data using ``ray.data.range()`` and ``ray.data.from_items()``. -Datasets can hold either plain Python objects (i.e. their schema is a Python type), or Arrow records -(in which case their schema is Arrow). +A :class:`Dataset ` can be created from: -.. code-block:: python +* generated synthetic data, +* local and distributed in-memory data, and +* local and external storage systems (local disk, cloud storage, HDFS, etc.). - import ray +Creation from existing in-memory data is enabled via Datasets' integrations +with familiar single-node data libraries (`Pandas `__, +`NumPy `__, `Arrow `__) and distributed +data processing frameworks (:ref:`Dask `, :ref:`Spark `, +:ref:`Modin `, :ref:`Mars `). Creating datasets from +persistent storage is enabled by Datasets' support for reading many common file +formats (Parquet, CSV, JSON, NPY, text, binary). - # Create a Dataset of Python objects. - ds = ray.data.range(10000) - # -> Dataset(num_blocks=200, num_rows=10000, schema=) +A :class:`Dataset ` can hold plain Python objects (simple datasets), +Arrow records (Arrow datasets), or Pandas records (Pandas datasets). These records are +grouped into one or more data **blocks**, and these blocks can be spread across +multiple Ray nodes. Simple datasets are represented by simple blocks (lists of Python +objects), Arrow datasets are represented by Arrow blocks ( +`Arrow Tables `__ +), and Pandas +datasets are represented by Pandas blocks ( +`Pandas DataFrames `__ +). - ds.take(5) - # -> [0, 1, 2, 3, 4] +The method of +creating the dataset will determine the format of its internal block representation. - ds.count() - # -> 10000 +.. dropdown:: See more about Datasets' internal block representation - # Create a Dataset of Arrow records. - ds = ray.data.from_items([{"col1": i, "col2": str(i)} for i in range(10000)]) - # -> Dataset(num_blocks=200, num_rows=10000, schema={col1: int64, col2: string}) + The following details the block representation for each creation method: - ds.show(5) - # -> {'col1': 0, 'col2': '0'} - # -> {'col1': 1, 'col2': '1'} - # -> {'col1': 2, 'col2': '2'} - # -> {'col1': 3, 'col2': '3'} - # -> {'col1': 4, 'col2': '4'} + * Reading tabular files (Parquet, CSV, JSON) and converting directly from Arrow produces Arrow datasets. + * Converting from Pandas, Dask, Modin, and Mars produces Pandas datasets. + * Reading NumPy files or converting from NumPy ndarrays produces Arrow datasets. + * Reading text and raw binary files produces simple datasets. - ds.schema() - # -> col1: int64 - # -> col2: string + The following figure visualizes a ``Dataset`` that has three + `Arrow Table `__ + blocks, each block holding 1000 rows: -Datasets can be created from files on local disk or remote datasources such as S3. -Any filesystem `supported by pyarrow `__ -can be used to specify file locations: + .. image:: images/dataset-arch.svg -.. code-block:: python +This guide surveys the many ways to create a ``Dataset``. If none of these meet your +needs, please reach out on `Discourse `__ or open a feature +request on the `Ray GitHub repo `__, and check out +our :ref:`guide for implementing a custom Datasets datasource ` +if you're interested in rolling your own integration! 
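+
+For example, here is a minimal sketch (a quick illustration; each of these creation
+methods is covered in detail below) of how the creation method determines the block
+format, with abridged example outputs:
+
+.. code-block:: python
+
+    import pandas as pd
+    import ray
+
+    # Simple dataset: a range of Python ints, held in simple (list) blocks.
+    ray.data.range(10000)
+    # -> Dataset(num_blocks=200, num_rows=10000, schema=<class 'int'>)
+
+    # Arrow dataset: a list of dicts is treated as tabular records.
+    ray.data.from_items([{"col1": i, "col2": str(i)} for i in range(10000)])
+    # -> Dataset(num_blocks=200, num_rows=10000, schema={col1: int64, col2: string})
+
+    # Pandas dataset: the DataFrame is kept as a Pandas DataFrame block.
+    df = pd.DataFrame({"col1": [1, 2, 3], "col2": ["a", "b", "c"]})
+    ray.data.from_pandas(df)
+    # -> Dataset(num_blocks=1, num_rows=3, schema={col1: int64, col2: object})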
- # Read a directory of files in remote storage. - ds = ray.data.read_csv("s3://bucket/path") +.. _dataset_generate_data: - # Read multiple local files. - ds = ray.data.read_csv(["/path/to/file1", "/path/to/file2"]) +------------------------- +Generating Synthetic Data +------------------------- - # Read multiple directories. - ds = ray.data.read_csv(["s3://bucket/path1", "s3://bucket/path2"]) +Using Datasets with small, generated data is a great way to test out some of Datasets' +features. Each of these synthetic data generators will generate data in Ray tasks that +will execute in parallel and will be load-balanced across your Ray cluster; the +``Dataset`` will hold a set of futures representing the return values of those tasks, +serving as pointers to a collection of distributed data blocks. -Finally, you can create a ``Dataset`` from existing data in the Ray object store or Ray-compatible distributed DataFrames: +.. tabbed:: Int Range -.. code-block:: python + Create a ``Dataset`` from a range of integers. - import pandas as pd - import dask.dataframe as dd + .. literalinclude:: ./doc_code/creating_datasets.py + :language: python + :start-after: __gen_synth_int_range_begin__ + :end-before: __gen_synth_int_range_end__ - # Create a Dataset from a list of Pandas DataFrame objects. - pdf = pd.DataFrame({"one": [1, 2, 3], "two": ["a", "b", "c"]}) - ds = ray.data.from_pandas([pdf]) +.. tabbed:: Arrow Range - # Create a Dataset from a Dask-on-Ray DataFrame. - dask_df = dd.from_pandas(pdf, npartitions=10) - ds = ray.data.from_dask(dask_df) + Create an Arrow (tabular) ``Dataset`` from a range of integers, + with a single column containing this integer range. -From Torch/TensorFlow ---------------------- + .. literalinclude:: ./doc_code/creating_datasets.py + :language: python + :start-after: __gen_synth_arrow_range_begin__ + :end-before: __gen_synth_arrow_range_end__ + +.. tabbed:: Tensor Range + + Create a tensor dataset from a range of integers, packing this integer range into + tensors of the provided shape. + + .. literalinclude:: ./doc_code/creating_datasets.py + :language: python + :start-after: __gen_synth_tensor_range_begin__ + :end-before: __gen_synth_tensor_range_end__ + +.. _dataset_from_in_memory_data: + +----------------------------------------- +From Local and Distributed In-Memory Data +----------------------------------------- + +Datasets can be constructed from existing in-memory data. In addition to being able to +construct a ``Dataset`` from plain Python objects, Datasets also interoperates with popular +single-node libraries (`Pandas `__, +`NumPy `__, `Arrow `__) as well as +distributed frameworks (:ref:`Dask `, :ref:`Spark `, +:ref:`Modin `, :ref:`Mars `). + +.. _dataset_from_in_memory_data_single_node: + +Integration with Single-Node Data Libraries +=========================================== + +In this section, we demonstrate creating a ``Dataset`` from single-node in-memory data. + +.. tabbed:: Pandas + + Create a ``Dataset`` from a Pandas DataFrame. This constructs a ``Dataset`` + backed by a single Pandas DataFrame block. + + .. literalinclude:: ./doc_code/creating_datasets.py + :language: python + :start-after: __from_pandas_begin__ + :end-before: __from_pandas_end__ + + We can also build a ``Dataset`` from more than one Pandas DataFrame, where each said + DataFrame will become a block in the ``Dataset``. + + .. literalinclude:: ./doc_code/creating_datasets.py + :language: python + :start-after: __from_pandas_mult_begin__ + :end-before: __from_pandas_mult_end__ + +.. 
tabbed:: NumPy
+
+    Create a ``Dataset`` from a NumPy ndarray. This constructs a ``Dataset``
+    backed by a single-column Arrow table block; the outer dimension of the ndarray
+    will be treated as the row dimension, and the column will have the name ``"value"``.
+
+    .. literalinclude:: ./doc_code/creating_datasets.py
+      :language: python
+      :start-after: __from_numpy_begin__
+      :end-before: __from_numpy_end__
+
+    We can also build a ``Dataset`` from more than one NumPy ndarray, where each said
+    ndarray will become a single-column Arrow table block in the ``Dataset``.
+
+    .. literalinclude:: ./doc_code/creating_datasets.py
+      :language: python
+      :start-after: __from_numpy_mult_begin__
+      :end-before: __from_numpy_mult_end__
+
+.. tabbed:: Arrow
+
+    Create a ``Dataset`` from an
+    `Arrow Table `__.
+    This constructs a ``Dataset`` backed by a single Arrow ``Table`` block.
+
+    .. literalinclude:: ./doc_code/creating_datasets.py
+      :language: python
+      :start-after: __from_arrow_begin__
+      :end-before: __from_arrow_end__
+
+    We can also build a ``Dataset`` from more than one Arrow Table, where each said
+    ``Table`` will become a block in the ``Dataset``.
+
+    .. literalinclude:: ./doc_code/creating_datasets.py
+      :language: python
+      :start-after: __from_arrow_mult_begin__
+      :end-before: __from_arrow_mult_end__
+
+.. tabbed:: Python Objects
+
+    Create a ``Dataset`` from a list of Python objects; since each object in this
+    particular list is a dictionary, Datasets will treat this list as a list of tabular
+    records, and will construct an Arrow ``Dataset``.
+
+    .. literalinclude:: ./doc_code/creating_datasets.py
+      :language: python
+      :start-after: __from_items_begin__
+      :end-before: __from_items_end__
+
+.. _dataset_from_in_memory_data_distributed:
+
+Integration with Distributed Data Processing Frameworks
+=======================================================
+
+In addition to working with single-node in-memory data, Datasets can be constructed from
+distributed (multi-node) in-memory data, interoperating with popular distributed
+data processing frameworks such as :ref:`Dask `, :ref:`Spark `,
+:ref:`Modin `, and :ref:`Mars `.
+
+The common paradigm used by these conversions is to launch Ray tasks that convert each
+Dask/Spark/Modin/Mars data partition to a format that Datasets can understand (if
+needed), and to use the futures representing the return values of those conversion
+tasks as the ``Dataset`` block futures. If the upstream framework's data partitions are
+already in a format that Datasets understands (e.g. Arrow or Pandas), Datasets will
+elide the conversion task and will instead reinterpret those data partitions directly
+as its blocks.
+
+.. note::
+
+  These data processing frameworks must be running on Ray in order for these Datasets
+  integrations to work. See how these frameworks can be run on Ray in our
+  :ref:`data processing integrations docs `.
+
+.. tabbed:: Dask
+
+    Create a ``Dataset`` from a
+    `Dask DataFrame `__. This constructs a
+    ``Dataset`` backed by the distributed Pandas DataFrame partitions that underlie the
+    Dask DataFrame.
+
+    .. note::
+
+      This conversion should have near-zero overhead: it involves zero data copying and
+      zero data movement. Datasets simply reinterprets the existing Dask DataFrame partitions
+      as Ray Datasets partitions without touching the underlying data.
+
+    .. literalinclude:: ./doc_code/creating_datasets.py
+      :language: python
+      :start-after: __from_dask_begin__
+      :end-before: __from_dask_end__
+
+.. tabbed:: Spark
+
+    Create a ``Dataset`` from a `Spark DataFrame
+    `__.
+    This constructs a ``Dataset`` backed by the distributed Spark DataFrame partitions
+    that underlie the Spark DataFrame. When this conversion happens, Spark-on-Ray (RayDP)
+    will save the Spark DataFrame partitions to Ray's object store in the Arrow format,
+    which Datasets will then interpret as its blocks.
+
+    .. literalinclude:: ./doc_code/creating_datasets.py
+      :language: python
+      :start-after: __from_spark_begin__
+      :end-before: __from_spark_end__
+
+.. tabbed:: Modin
+
+    Create a ``Dataset`` from a Modin DataFrame. This constructs a ``Dataset``
+    backed by the distributed Pandas DataFrame partitions that underlie the Modin DataFrame.
+
+    .. note::
+
+      This conversion should have near-zero overhead: it involves zero data copying and
+      zero data movement. Datasets simply reinterprets the existing Modin DataFrame partitions
+      as Ray Datasets partitions without touching the underlying data.
+
+    .. literalinclude:: ./doc_code/creating_datasets.py
+      :language: python
+      :start-after: __from_modin_begin__
+      :end-before: __from_modin_end__
+
+.. tabbed:: Mars
+
+    Create a ``Dataset`` from a Mars DataFrame. This constructs a ``Dataset``
+    backed by the distributed Pandas DataFrame partitions that underlie the Mars DataFrame.
+
+    .. note::
+
+      This conversion should have near-zero overhead: it involves zero data copying and
+      zero data movement. Datasets simply reinterprets the existing Mars DataFrame partitions
+      as Ray Datasets partitions without touching the underlying data.
+
+    .. literalinclude:: ./doc_code/creating_datasets.py
+      :language: python
+      :start-after: __from_mars_begin__
+      :end-before: __from_mars_end__
+
+.. _dataset_reading_from_storage:
+
+--------------------------
+Reading Files From Storage
+--------------------------
+
+Using the ``ray.data.read_*()`` APIs, Datasets can be created from files on local disk
+or remote storage systems such as S3, GCS, Azure Blob Storage, or HDFS. Any filesystem
+`supported by pyarrow `__
+can be used to specify file locations, and many common file formats are supported:
+Parquet, CSV, JSON, NPY, text, binary.
+
+Parallel + Distributed Reading
+==============================
+
+Each of these APIs takes a path or list of paths to files or directories. Any
+directories provided will be walked in order to obtain concrete file paths, at which
+point all files will be read in parallel.
+
+Datasets uses a default parallelism of 200, capped at the number of files being read:
+``parallelism = min(num_files, 200)``. ``parallelism`` parallel read tasks will be
+launched, each reading one or more files and each creating a single block of data.
+When reading from remote datasources, these parallel read tasks will be spread across
+the nodes in your Ray cluster, creating the distributed collection of blocks that makes
+up a distributed Ray Dataset.
+
+.. image:: images/dataset-read.svg
+   :width: 650px
+   :align: center
+
+This default parallelism can be overridden via the ``parallelism`` argument; see the
+:ref:`performance guide ` for tips on how to tune this read
+parallelism.
+
+Deferred Read Task Execution
+============================
+
+Datasets created via the ``ray.data.read_*()`` APIs are semi-lazy: initially, only the
+first read task will be executed. This avoids blocking Dataset creation on the reading
+of all data files, enabling inspection functions like
+:meth:`ds.schema() ` and
+:meth:`ds.show() ` to be used right away. Executing further
+transformations on the Dataset will trigger execution of all read tasks; execution of
+all read tasks can also be triggered manually using the
+:meth:`ds.fully_executed() ` API.
+
+
+.. _dataset_supported_file_formats:
+
+Supported File Formats
+======================
+
+.. tabbed:: Parquet
+
+    Read Parquet files into a tabular ``Dataset``. The Parquet data will be read into
+    `Arrow Table `__
+    blocks. Although this simple example demonstrates reading a single file, note that
+    Datasets can also read directories of Parquet files, with one tabular block created
+    per file. For Parquet in particular, we also support reading partitioned Parquet
+    datasets with partition column values pulled from the file paths.
+
+    .. literalinclude:: ./doc_code/creating_datasets.py
+      :language: python
+      :start-after: __read_parquet_begin__
+      :end-before: __read_parquet_end__
+
+    Datasets' Parquet reader also supports projection and filter pushdown, allowing column
+    selection and row filtering to be pushed down to the file scan. For column selection,
+    unselected columns will never be read from the file.
+
+    .. literalinclude:: ./doc_code/creating_datasets.py
+      :language: python
+      :start-after: __read_parquet_pushdown_begin__
+      :end-before: __read_parquet_pushdown_end__
+
+    See the API docs for :func:`read_parquet() `.
+
+.. tabbed:: CSV
+
+    Read CSV files into a tabular ``Dataset``. The CSV data will be read into
+    `Arrow Table `__
+    blocks. Although this simple example demonstrates reading a single file, note that
+    Datasets can also read directories of CSV files, with one tabular block created
+    per file.
+
+    .. literalinclude:: ./doc_code/creating_datasets.py
+      :language: python
+      :start-after: __read_csv_begin__
+      :end-before: __read_csv_end__
+
+    See the API docs for :func:`read_csv() `.
+
+.. tabbed:: JSON
+
+    Read JSON files into a tabular ``Dataset``. The JSON data will be read into
+    `Arrow Table `__
+    blocks. Although this simple example demonstrates reading a single file, note that
+    Datasets can also read directories of JSON files, with one tabular block created
+    per file.
+
+    Currently, only newline-delimited JSON (NDJSON) is supported.
+
+    .. literalinclude:: ./doc_code/creating_datasets.py
+      :language: python
+      :start-after: __read_json_begin__
+      :end-before: __read_json_end__
+
+    See the API docs for :func:`read_json() `.
+
+.. tabbed:: NumPy
+
+    Read NumPy files into a tensor ``Dataset``. The NumPy ndarray data will be read into
+    single-column
+    `Arrow Table `__
+    blocks using our
+    :class:`tensor extension type `,
+    treating the outermost ndarray dimension as the row dimension. See our
+    :ref:`tensor data guide ` for more information on working
+    with tensors in Datasets. Although this simple example demonstrates reading a single
+    file, note that Datasets can also read directories of NumPy files, with one tensor
+    block created per file.
+
+    .. literalinclude:: ./doc_code/creating_datasets.py
+      :language: python
+      :start-after: __read_numpy_begin__
+      :end-before: __read_numpy_end__
+
+    See the API docs for :func:`read_numpy() `.
+
+.. tabbed:: Text
+
+    Read text files into a ``Dataset``. Each line in each text file will be treated as a
+    row in the dataset, resulting in a list-of-strings block being created for each text
+    file.
+
+    .. literalinclude:: ./doc_code/creating_datasets.py
+      :language: python
+      :start-after: __read_text_begin__
+      :end-before: __read_text_end__
+
+    See the API docs for :func:`read_text() `.
+
+.. tabbed:: Binary
+
+    Read binary files into a ``Dataset``. Each binary file will be treated as a single row
+    of opaque bytes. These bytes can be decoded into tensor, tabular, text, or any other
+    kind of data using ``ds.map()`` to apply a per-row decoding UDF.
+
+    Although this simple example demonstrates reading a single file, note that Datasets
+    can also read directories of binary files, with one bytes block created per file.
+
+    .. literalinclude:: ./doc_code/creating_datasets.py
+      :language: python
+      :start-after: __read_binary_begin__
+      :end-before: __read_binary_end__
+
+    See the API docs for :func:`read_binary_files() `.
+
+.. _dataset_reading_remote_storage:
+
+Reading from Remote Storage
+===========================
+
+All of the file formats mentioned above can be read from remote storage, such as S3,
+GCS, Azure Blob Storage, and HDFS. These storage systems are supported natively via
+Arrow's filesystem APIs for S3 and HDFS, and via fsspec wrappers for GCS and Azure Blob
+Storage. All ``ray.data.read_*()`` APIs expose a ``filesystem`` argument that accepts both
+`Arrow FileSystem `__ instances
+and `fsspec FileSystem `__ instances,
+allowing you to configure the connection to the remote storage system, such as
+authn/authz and buffer/block size.
+
+For S3 and HDFS, the underlying `FileSystem
+`__
+implementation will be inferred from the URL scheme (``"s3://"`` and ``"hdfs://"``); if
+the default connection configuration suffices for your workload, you won't need to
+specify a ``filesystem`` argument.
+
+We use Parquet files for the below examples, but all of the aforementioned file formats
+are supported for each of these storage systems.
+
+.. tabbed:: S3
+
+    The AWS S3 storage system is inferred from the URI scheme (``s3://``), with required
+    connection configuration such as S3 credentials being pulled from the machine's
+    environment (e.g. the ``AWS_ACCESS_KEY_ID`` and ``AWS_SECRET_ACCESS_KEY`` environment
+    variables).
+
+    .. literalinclude:: ./doc_code/creating_datasets.py
+      :language: python
+      :start-after: __read_parquet_s3_begin__
+      :end-before: __read_parquet_s3_end__
+
+    If you need to customize the S3 storage system connection (credentials, region,
+    endpoint override, etc.), you can pass in an
+    `S3FileSystem `__ instance
+    to :func:`read_parquet() `.
+
+    .. note::
+
+      This example is not runnable as-is; to run it on your own private S3 data, add in a
+      path to your private bucket and specify your S3 credentials.
+
+    .. literalinclude:: ./doc_code/creating_datasets.py
+      :language: python
+      :start-after: __read_parquet_s3_with_fs_begin__
+      :end-before: __read_parquet_s3_with_fs_end__
+
+.. tabbed:: HDFS
+
+    The HDFS storage system is inferred from the URI scheme (``hdfs://``), with required
+    connection configuration such as the host and the port being derived from the URI.
+
+    .. note::
+
+      This example is not runnable as-is; you'll need to point it at your HDFS
+      cluster/data.
+
+    .. literalinclude:: ./doc_code/creating_datasets.py
+      :language: python
+      :start-after: __read_parquet_hdfs_begin__
+      :end-before: __read_parquet_hdfs_end__
+
+    If you need to customize the HDFS storage system connection (host, port, user, kerb
+    ticket, etc.), you can pass in a `HadoopFileSystem
+    `__
+    instance to :func:`read_parquet() `.
+
+    .. note::
+
+      This example is not runnable as-is; you'll need to point it at your HDFS
+      cluster/data.
+
+    .. 
literalinclude:: ./doc_code/creating_datasets.py + :language: python + :start-after: __read_parquet_hdfs_with_fs_begin__ + :end-before: __read_parquet_hdfs_with_fs_end__ + +.. tabbed:: GCS + + Data can be read from Google Cloud Storage by providing a configured + `gcsfs GCSFileSystem `__, where the + appropriate Google Cloud project and credentials can be specified. + + .. note:: + This example is not runnable as-is; you'll need to point it at your GCS bucket and + configure your GCP project and credentials. + + .. literalinclude:: ./doc_code/creating_datasets.py + :language: python + :start-after: __read_parquet_gcs_begin__ + :end-before: __read_parquet_gcs_end__ + +.. tabbed:: ADL/ABS (Azure) + + Data can be read from Azure Blob Storage by providing a configured + `adlfs AzureBlobFileSystem `__, where the appropriate + account name and account key can be specified. + + .. literalinclude:: ./doc_code/creating_datasets.py + :language: python + :start-after: __read_parquet_az_begin__ + :end-before: __read_parquet_az_end__ + +.. _dataset_from_torch_tf: + +------------------------- +From Torch and TensorFlow +------------------------- .. tabbed:: PyTorch @@ -120,7 +588,9 @@ From Torch/TensorFlow features.shape # TensorShape([32, 32, 3]) label # +.. _dataset_from_huggingface: +------------------------------- From 🤗 (Hugging Face) Datasets ------------------------------- @@ -142,4 +612,4 @@ converts it into a Ray Dataset directly. hf_datasets = load_dataset("wikitext", "wikitext-2-raw-v1") ray_datasets = ray.data.from_huggingface(hf_datasets) ray_datasets["train"].take(2) - # [{'text': ''}, {'text': ' = Valkyria Chronicles III = \n'}] \ No newline at end of file + # [{'text': ''}, {'text': ' = Valkyria Chronicles III = \n'}] diff --git a/doc/source/data/custom-data.rst b/doc/source/data/custom-data.rst index a7610fe21..094185294 100644 --- a/doc/source/data/custom-data.rst +++ b/doc/source/data/custom-data.rst @@ -1,3 +1,5 @@ +.. _datasets_custom_datasource: + ------------------------ Using Custom Datasources ------------------------ diff --git a/doc/source/data/doc_code/creating_datasets.py b/doc/source/data/doc_code/creating_datasets.py new file mode 100644 index 000000000..f20ddbd5d --- /dev/null +++ b/doc/source/data/doc_code/creating_datasets.py @@ -0,0 +1,592 @@ +# flake8: noqa + +# fmt: off +# __creating_datasets_import_begin__ +import ray +# __creating_datasets_import_end__ +# fmt: on + +# fmt: off +# __gen_synth_int_range_begin__ +# Create a Dataset of Python objects. +ds = ray.data.range(10000) +# -> Dataset(num_blocks=200, num_rows=10000, schema=) + +ds.take(5) +# -> [0, 1, 2, 3, 4] +# __gen_synth_int_range_end__ +# fmt: on + +# fmt: off +# __gen_synth_arrow_range_begin__ +# Create a Dataset of Arrow records. +ds = ray.data.range_arrow(10000) +# -> Dataset(num_blocks=200, num_rows=10000, schema={value: int64}) + +ds.take(5) +# -> [{'value': 0}, {'value': 1}, {'value': 2}, {'value': 3}, {'value': 4}] +# __gen_synth_arrow_range_end__ +# fmt: on + +# fmt: off +# __gen_synth_tensor_range_begin__ +# Create a Dataset of tensors. 
+ds = ray.data.range_tensor(100 * 64 * 64, shape=(64, 64)) +# -> Dataset( +# num_blocks=200, +# num_rows=409600, +# schema={value: } +# ) + +ds.take(2) +# -> [array([[0, 0, 0, ..., 0, 0, 0], +# [0, 0, 0, ..., 0, 0, 0], +# [0, 0, 0, ..., 0, 0, 0], +# ..., +# [0, 0, 0, ..., 0, 0, 0], +# [0, 0, 0, ..., 0, 0, 0], +# [0, 0, 0, ..., 0, 0, 0]]), +# array([[1, 1, 1, ..., 1, 1, 1], +# [1, 1, 1, ..., 1, 1, 1], +# [1, 1, 1, ..., 1, 1, 1], +# ..., +# [1, 1, 1, ..., 1, 1, 1], +# [1, 1, 1, ..., 1, 1, 1], +# [1, 1, 1, ..., 1, 1, 1]])] +# __gen_synth_tensor_range_end__ +# fmt: on + +# fmt: off +# __from_items_begin__ +# Create a Dataset of tabular (Arrow) records. +ds = ray.data.from_items([{"col1": i, "col2": str(i)} for i in range(10000)]) +# -> Dataset(num_blocks=200, num_rows=10000, schema={col1: int64, col2: string}) + +ds.show(3) +# -> {'col1': 0, 'col2': '0'} +# -> {'col1': 1, 'col2': '1'} +# -> {'col1': 2, 'col2': '2'} +# __from_items_end__ +# fmt: on + +# fmt: off +# __from_pandas_begin__ +import pandas as pd + +# Create a tabular Dataset from a Pandas DataFrame. +df = pd.DataFrame({"col1": list(range(10000)), "col2": list(map(str, range(10000)))}) +ds = ray.data.from_pandas(df) +# -> Dataset(num_blocks=1, num_rows=10000, schema={col1: int64, col2: object}) + +ds.show(3) +# -> {'col1': 0, 'col2': '0'} +# -> {'col1': 1, 'col2': '1'} +# -> {'col1': 2, 'col2': '2'} +# __from_pandas_end__ +# fmt: on + +# fmt: off +# __from_pandas_mult_begin__ +import pandas as pd + +data = list(range(10000)) +num_chunks = 10 +chunk_size = len(data) // num_chunks +chunks = [data[i : i + chunk_size] for i in range(0, len(data), chunk_size)] +dfs = [ + pd.DataFrame({"col1": list(chunk), "col2": list(map(str, chunk))}) + for chunk in chunks +] +# Create a tabular Dataset from multiple Pandas DataFrames. +ds = ray.data.from_pandas(dfs) +# -> Dataset(num_blocks=10, num_rows=10000, schema={col1: int64, col2: object}) + +ds.show(3) +# -> {'col1': 0, 'col2': '0'} +# -> {'col1': 1, 'col2': '1'} +# -> {'col1': 2, 'col2': '2'} +# __from_pandas_mult_end__ +# fmt: on + +# fmt: off +# __from_numpy_begin__ +import numpy as np + +# Create a tensor Dataset from a 1D NumPy ndarray. +arr = np.arange(100) +ds = ray.data.from_numpy(arr) +# -> Dataset( +# num_blocks=1, +# num_rows=100, +# schema={value: }, +# ) + +# Each element is a scalar ndarray. +ds.show(3) +# -> {'value': array(0)} +# -> {'value': array(1)} +# -> {'value': array(2)} + +# Create a tensor Dataset from a 3D NumPy ndarray. +arr = np.ones((3, 4, 4)) +# The outer dimension is treated as the row dimension. +ds = ray.data.from_numpy(arr) +# -> Dataset( +# num_blocks=1, +# num_rows=3, +# schema={value: }, +# ) + +ds.show(2) +# -> {'value': array([[1., 1., 1., 1.], +# [1., 1., 1., 1.], +# [1., 1., 1., 1.], +# [1., 1., 1., 1.]])} +# -> {'value': array([[1., 1., 1., 1.], +# [1., 1., 1., 1.], +# [1., 1., 1., 1.], +# [1., 1., 1., 1.]])} +# __from_numpy_end__ +# fmt: on + +# fmt: off +# __from_numpy_mult_begin__ +import numpy as np + +# Create a tensor Dataset from multiple 3D NumPy ndarray. +arrs = [np.random.rand(2, 4, 4) for _ in range(4)] +# The outer dimension is treated as the row dimension. 
+ds = ray.data.from_numpy(arrs)
+# -> Dataset(
+#        num_blocks=4,
+#        num_rows=8,
+#        schema={value: },
+#    )
+
+ds.show(2)
+# -> {'value': array([[0.06587483, 0.67808656, 0.76461924, 0.83428549],
+#        [0.04932103, 0.25112165, 0.26476714, 0.24599738],
+#        [0.67624391, 0.58689537, 0.12594709, 0.94663371],
+#        [0.32435665, 0.97719096, 0.03234169, 0.71563231]])}
+# -> {'value': array([[0.98570318, 0.65956399, 0.82168898, 0.09798336],
+#        [0.22426704, 0.34209978, 0.02605247, 0.48200137],
+#        [0.17312096, 0.38789983, 0.42663678, 0.92652456],
+#        [0.80787394, 0.92437162, 0.11185822, 0.3319638 ]])}
+# __from_numpy_mult_end__
+# fmt: on
+
+# fmt: off
+# __from_arrow_begin__
+import pyarrow as pa
+
+# Create a tabular Dataset from an Arrow Table.
+t = pa.table({"col1": list(range(10000)), "col2": list(map(str, range(10000)))})
+ds = ray.data.from_arrow(t)
+# -> Dataset(num_blocks=1, num_rows=10000, schema={col1: int64, col2: string})
+
+ds.show(3)
+# -> {'col1': 0, 'col2': '0'}
+# -> {'col1': 1, 'col2': '1'}
+# -> {'col1': 2, 'col2': '2'}
+# __from_arrow_end__
+# fmt: on
+
+# fmt: off
+# __from_arrow_mult_begin__
+import pyarrow as pa
+
+data = list(range(10000))
+num_chunks = 10
+chunk_size = len(data) // num_chunks
+chunks = [data[i : i + chunk_size] for i in range(0, len(data), chunk_size)]
+ts = [
+    pa.table({"col1": list(chunk), "col2": list(map(str, chunk))})
+    for chunk in chunks
+]
+# Create a tabular Dataset from multiple Arrow Tables.
+ds = ray.data.from_arrow(ts)
+# -> Dataset(num_blocks=10, num_rows=10000, schema={col1: int64, col2: string})
+
+ds.show(3)
+# -> {'col1': 0, 'col2': '0'}
+# -> {'col1': 1, 'col2': '1'}
+# -> {'col1': 2, 'col2': '2'}
+# __from_arrow_mult_end__
+# fmt: on
+
+# fmt: off
+# __from_dask_begin__
+import pandas as pd
+import dask.dataframe as dd
+
+df = pd.DataFrame({"col1": list(range(10000)), "col2": list(map(str, range(10000)))})
+ddf = dd.from_pandas(df, npartitions=4)
+# Create a tabular Dataset from a Dask DataFrame.
+ds = ray.data.from_dask(ddf)
+# -> Dataset(num_blocks=4, num_rows=10000, schema={col1: int64, col2: object})
+
+ds.show(3)
+# -> {'col1': 0, 'col2': '0'}
+# -> {'col1': 1, 'col2': '1'}
+# -> {'col1': 2, 'col2': '2'}
+# __from_dask_end__
+# fmt: on
+
+# fmt: off
+# __from_spark_begin__
+import raydp
+
+spark = raydp.init_spark(app_name="Spark -> Datasets Example",
+                         num_executors=2,
+                         executor_cores=2,
+                         executor_memory="500MB")
+df = spark.createDataFrame([(i, str(i)) for i in range(10000)], ["col1", "col2"])
+# Create a tabular Dataset from a Spark DataFrame.
+ds = ray.data.from_spark(df)
+# -> Dataset(num_blocks=10, num_rows=10000, schema={col1: int64, col2: string})
+
+ds.show(3)
+# -> {'col1': 0, 'col2': '0'}
+# -> {'col1': 1, 'col2': '1'}
+# -> {'col1': 2, 'col2': '2'}
+# __from_spark_end__
+# fmt: on
+
+# fmt: off
+# __from_modin_begin__
+import modin.pandas as md
+import pandas as pd
+
+df = pd.DataFrame({"col1": list(range(10000)), "col2": list(map(str, range(10000)))})
+mdf = md.DataFrame(df)
+# Create a tabular Dataset from a Modin DataFrame.
+ds = ray.data.from_modin(mdf) +# -> Dataset(num_blocks=8, num_rows=10000, schema={col1: int64, col2: object}) + +ds.show(3) +# -> {'col1': 0, 'col2': '0'} +# -> {'col1': 1, 'col2': '1'} +# -> {'col1': 2, 'col2': '2'} +# __from_modin_end__ +# fmt: on + +# fmt: off +# __from_mars_begin__ +import mars +import mars.dataframe as md + +cluster = mars.new_cluster_in_ray(worker_num=2, worker_cpu=1) + +df = pd.DataFrame({"col1": list(range(10000)), "col2": list(map(str, range(10000)))}) +mdf = md.DataFrame(df, num_partitions=8) +# Create a tabular Dataset from a Mars DataFrame. +ds = ray.data.from_mars(mdf) +# -> Dataset(num_blocks=8, num_rows=10000, schema={col1: int64, col2: object}) + +ds.show(3) +# -> {'col1': 0, 'col2': '0'} +# -> {'col1': 1, 'col2': '1'} +# -> {'col1': 2, 'col2': '2'} +# __from_mars_end__ +# fmt: on + +# fmt: off +# __read_parquet_begin__ +# Create a tabular Dataset by reading a Parquet file. +ds = ray.data.read_parquet("example://iris.parquet") +# -> Dataset( +# num_blocks=1, +# num_rows=150, +# schema={ +# sepal.length: double, +# sepal.width: double, +# petal.length: double, +# petal.width: double, +# variety: string, +# } +# ) + +ds.show(2) +# -> { +# 'sepal.length': 5.1, +# 'sepal.width': 3.5, +# 'petal.length': 1.4, +# 'petal.width': 0.2, +# 'variety': 'Setosa', +# } +# -> { +# 'sepal.length': 4.9, +# 'sepal.width': 3.0, +# 'petal.length': 1.4, +# 'petal.width': 0.2, +# 'variety': 'Setosa', +# } +# __read_parquet_end__ +# fmt: on + +# fmt: off +# __read_parquet_pushdown_begin__ +import pyarrow as pa + +# Create a tabular Dataset by reading a Parquet file, pushing column selection and row +# filtering down to the file scan. +ds = ray.data.read_parquet( + "example://iris.parquet", + columns=["sepal.length", "variety"], + filter=pa.dataset.field("sepal.length") > 5.0, +).fully_executed() # Force a full read of the file. +# -> Dataset(num_blocks=1, num_rows=118, schema={sepal.length: double, variety: string}) + +ds.show(2) +# -> {'sepal.length': 5.1, 'variety': 'Setosa'} +# {'sepal.length': 5.4, 'variety': 'Setosa'} +# __read_parquet_pushdown_end__ +# fmt: on + +# fmt: off +# __read_csv_begin__ +# Create a tabular Dataset by reading a CSV file. +ds = ray.data.read_csv("example://iris.csv") +# -> Dataset( +# num_blocks=1, +# num_rows=150, +# schema={ +# sepal.length: double, +# sepal.width: double, +# petal.length: double, +# petal.width: double, +# variety: string, +# } +# ) + +ds.show(2) +# -> { +# 'sepal.length': 5.1, +# 'sepal.width': 3.5, +# 'petal.length': 1.4, +# 'petal.width': 0.2, +# 'variety': 'Setosa', +# } +# -> { +# 'sepal.length': 4.9, +# 'sepal.width': 3.0, +# 'petal.length': 1.4, +# 'petal.width': 0.2, +# 'variety': 'Setosa', +# } +# __read_csv_end__ +# fmt: on + +# fmt: off +# __read_json_begin__ +# Create a tabular Dataset by reading a JSON file. +ds = ray.data.read_json("example://iris.json") +# -> Dataset( +# num_blocks=1, +# num_rows=150, +# schema={ +# sepal.length: double, +# sepal.width: double, +# petal.length: double, +# petal.width: double, +# variety: string, +# } +# ) + +ds.show(2) +# -> { +# 'sepal.length': 5.1, +# 'sepal.width': 3.5, +# 'petal.length': 1.4, +# 'petal.width': 0.2, +# 'variety': 'Setosa', +# } +# -> { +# 'sepal.length': 4.9, +# 'sepal.width': 3.0, +# 'petal.length': 1.4, +# 'petal.width': 0.2, +# 'variety': 'Setosa', +# } +# __read_json_end__ +# fmt: on + +# fmt: off +# __read_numpy_begin__ +# Create a tensor Dataset by reading a NumPy file. 
+ds = ray.data.read_numpy("example://mnist_subset.npy")
+# -> Dataset(
+#        num_blocks=1,
+#        num_rows=3,
+#        schema={__RAY_TC__: },
+#    )
+
+ds.show(2)
+# [array([[0, ...]]), array([[0, ...]])]
+# __read_numpy_end__
+# fmt: on
+
+# fmt: off
+# __read_text_begin__
+# Create a Dataset of text lines by reading a text file.
+ds = ray.data.read_text("example://sms_spam_collection_subset.txt")
+# -> Dataset(num_blocks=1, num_rows=10, schema=)
+
+ds.show(3)
+# -> ham   Go until jurong point, crazy.. Available only in bugis n great world la e
+#          buffet... Cine there got amore wat...
+# ham      Ok lar... Joking wif u oni...
+# spam     Free entry in 2 a wkly comp to win FA Cup final tkts 21st May 2005. Text FA
+#          to 87121 to receive entry question(std txt rate)T&C's apply
+#          08452810075over18's
+# __read_text_end__
+# fmt: on
+
+# fmt: off
+# __read_binary_begin__
+from io import BytesIO
+
+import numpy as np
+from PIL import Image
+
+# Create a Dataset of opaque bytes by reading a binary file.
+ds = ray.data.read_binary_files("example://mnist_subset_partitioned/0/1.png")
+# -> Dataset(num_blocks=1, num_rows=1, schema=)
+
+ds = ds.map(lambda bytes_: np.asarray(Image.open(BytesIO(bytes_)).convert("L")))
+# -> Dataset(
+#        num_blocks=1,
+#        num_rows=1,
+#        schema={__RAY_TC__: },
+#    )
+
+ds.show(1)
+# -> array([[0, 0, 0, ..., 0, 0, 0],
+#           ...,
+#           [0, 0, 0, ..., 0, 0, 0]], dtype=uint8)
+# __read_binary_end__
+# fmt: on
+
+# fmt: off
+# __read_parquet_s3_begin__
+# Create a tabular Dataset by reading a Parquet file from S3.
+ds = ray.data.read_parquet("s3://ursa-labs-taxi-data/2009/01/data.parquet")
+# -> Dataset(
+#        num_blocks=1,
+#        num_rows=14092413,
+#        schema={
+#            vendor_id: string,
+#            pickup_at: timestamp[us],
+#            dropoff_at: timestamp[us],
+#            passenger_count: int8,
+#            trip_distance: float,
+#            pickup_longitude: float,
+#            pickup_latitude: float,
+#            ...,
+#        },
+#    )
+
+ds.show(2)
+# -> {
+#        'vendor_id': 'VTS',
+#        'pickup_at': datetime.datetime(2009, 1, 4, 2, 52),
+#        'dropoff_at': datetime.datetime(2009, 1, 4, 3, 2),
+#        'passenger_count': 1,
+#        'trip_distance': 2.630000114440918,
+#        'pickup_longitude': -73.99195861816406,
+#        'pickup_latitude': 40.72156524658203,
+#        ...,
+#    }
+#    {
+#        'vendor_id': 'VTS',
+#        'pickup_at': datetime.datetime(2009, 1, 4, 3, 31),
+#        'dropoff_at': datetime.datetime(2009, 1, 4, 3, 38),
+#        'passenger_count': 3,
+#        'trip_distance': 4.550000190734863,
+#        'pickup_longitude': -73.98210144042969,
+#        'pickup_latitude': 40.736289978027344,
+#        ...,
+#    }
+# __read_parquet_s3_end__
+# fmt: on
+
+# fmt: off
+# __read_parquet_s3_with_fs_begin__
+import pyarrow as pa
+
+# Create a tabular Dataset by reading a Parquet file from a private S3 bucket.
+# NOTE: This example is not runnable as-is; add in a path to your private bucket and the
+# required S3 credentials!
+ds = ray.data.read_parquet(
+    "s3://some/private/bucket",
+    filesystem=pa.fs.S3FileSystem(
+        region="us-west-2",
+        access_key="XXXX",
+        secret_key="XXXX",
+    ),
+)
+# __read_parquet_s3_with_fs_end__
+# fmt: on
+
+# fmt: off
+# __read_parquet_hdfs_begin__
+# Create a tabular Dataset by reading a Parquet file from HDFS using HDFS connection
+# automatically constructed based on the URI.
+# NOTE: This example is not runnable as-is; you'll need to point it at your HDFS
+# cluster/data.
+ds = ray.data.read_parquet("hdfs:///path/to/file.parquet")
+# __read_parquet_hdfs_end__
+# fmt: on
+
+# TODO(Clark): Find clean way to start local HDFS cluster in the below example (that
+# works in CI).
+
+# fmt: off
+# __read_parquet_hdfs_with_fs_begin__
+import pyarrow as pa
+
+# Create a tabular Dataset by reading a Parquet file from HDFS, manually specifying a
+# configured HDFS connection via a PyArrow HadoopFileSystem instance.
+# NOTE: This example is not runnable as-is; you'll need to point it at your HDFS
+# cluster/data.
+ds = ray.data.read_parquet(
+    "hdfs://path/to/file.parquet",
+    filesystem=pa.fs.HadoopFileSystem(host="localhost", port=9000, user="bob"),
+)
+# __read_parquet_hdfs_with_fs_end__
+# fmt: on
+
+# TODO(Clark): Find open data for below GCS example.
+
+# fmt: off
+# __read_parquet_gcs_begin__
+import gcsfs
+
+# Create a tabular Dataset by reading a Parquet file from GCS, passing the configured
+# GCSFileSystem.
+# NOTE: This example is not runnable as-is; you'll need to point it at your GCS bucket
+# and configure your GCP project and credentials.
+ds = ray.data.read_parquet(
+    "gs://path/to/file.parquet",
+    filesystem=gcsfs.GCSFileSystem(project="my-google-project"),
+)
+# __read_parquet_gcs_end__
+# fmt: on
+
+# fmt: off
+# __read_parquet_az_begin__
+import adlfs
+
+# Create a tabular Dataset by reading a Parquet file from Azure Blob Storage, passing
+# the configured AzureBlobFileSystem.
+path = (
+    "az://nyctlc/yellow/puYear=2009/puMonth=1/"
+    "part-00019-tid-8898858832658823408-a1de80bd-eed3-4d11-b9d4-fa74bfbd47bc-426333-4"
+    ".c000.snappy.parquet"
+)
+ds = ray.data.read_parquet(
+    path,
+    filesystem=adlfs.AzureBlobFileSystem(account_name="azureopendatastorage")
+)
+# __read_parquet_az_end__
+# fmt: on
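+
+# fmt: off
+# A small supplementary sketch (not referenced by a literalinclude tag in the guide) of
+# the read parallelism described in the "Parallel + Distributed Reading" section: the
+# ``parallelism`` argument overrides the default of ``min(num_files, 200)``.
+ds = ray.data.read_csv("example://iris.csv", parallelism=2)
+# The effective parallelism is still capped at the number of files being read (here, 1).
+# -> Dataset(num_blocks=1, num_rows=150, schema={...})
+
+# A list of files and/or directories can also be passed; directories are walked to find
+# their files, which are then read in parallel read tasks.
+# NOTE: This example is not runnable as-is; the paths below are placeholders.
+# ds = ray.data.read_csv(["/path/to/file1.csv", "/path/to/dir"])
+# fmt: on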
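+
+# fmt: off
+# A small supplementary sketch (not referenced by a literalinclude tag in the guide) of
+# the behavior described in the "Deferred Read Task Execution" section: reads are
+# semi-lazy, and fully_executed() forces all read tasks to run.
+ds = ray.data.read_parquet("example://iris.parquet")
+# Only the first read task has run at this point, which is enough for cheap inspection.
+ds.schema()
+ds.show(2)
+
+# Force execution of all read tasks.
+ds = ds.fully_executed()
+# fmt: on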