diff --git a/doc/source/data/package-ref.rst b/doc/source/data/package-ref.rst
index 6c8e83125..600b58052 100644
--- a/doc/source/data/package-ref.rst
+++ b/doc/source/data/package-ref.rst
@@ -12,6 +12,7 @@ Creating Datasets
 .. autofunction:: ray.data.read_csv
 .. autofunction:: ray.data.read_json
 .. autofunction:: ray.data.read_parquet
+.. autofunction:: ray.data.read_parquet_bulk
 .. autofunction:: ray.data.read_numpy
 .. autofunction:: ray.data.read_text
 .. autofunction:: ray.data.read_binary_files
diff --git a/python/ray/data/read_api.py b/python/ray/data/read_api.py
index 3d1d1cea1..c238b9d1a 100644
--- a/python/ray/data/read_api.py
+++ b/python/ray/data/read_api.py
@@ -1,67 +1,54 @@
-import os
 import logging
-from typing import (
-    List,
-    Any,
-    Dict,
-    Union,
-    Optional,
-    Tuple,
-    TypeVar,
-    TYPE_CHECKING,
-)
+import os
+from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, TypeVar, Union
 
 import numpy as np
 
-if TYPE_CHECKING:
-    import pyarrow
-    import pandas
-    import dask
-    import mars
-    import modin
-    import pyspark
-    import datasets
-
 import ray
-from ray.types import ObjectRef
-from ray.util.annotations import PublicAPI, DeveloperAPI, Deprecated
-from ray.data.block import (
-    Block,
-    BlockAccessor,
-    BlockMetadata,
-    BlockExecStats,
-)
-from ray.data.context import DatasetContext, DEFAULT_SCHEDULING_STRATEGY
-from ray.data.dataset import Dataset
-from ray.data.datasource import (
-    Datasource,
-    RangeDatasource,
-    JSONDatasource,
-    CSVDatasource,
-    ParquetDatasource,
-    BinaryDatasource,
-    NumpyDatasource,
-    ReadTask,
-    BaseFileMetadataProvider,
-    DefaultFileMetadataProvider,
-    FastFileMetadataProvider,
-    ParquetMetadataProvider,
-    DefaultParquetMetadataProvider,
-    PathPartitionFilter,
-    ParquetBaseDatasource,
-)
-from ray.data.datasource.file_based_datasource import (
-    _wrap_arrow_serialization_workaround,
-    _unwrap_arrow_serialization_workaround,
-)
-from ray.data._internal.delegating_block_builder import DelegatingBlockBuilder
 from ray.data._internal.arrow_block import ArrowRow
 from ray.data._internal.block_list import BlockList
+from ray.data._internal.delegating_block_builder import DelegatingBlockBuilder
 from ray.data._internal.lazy_block_list import LazyBlockList
 from ray.data._internal.plan import ExecutionPlan
 from ray.data._internal.remote_fn import cached_remote_fn
 from ray.data._internal.stats import DatasetStats
 from ray.data._internal.util import _lazy_import_pyarrow_dataset
+from ray.data.block import Block, BlockAccessor, BlockExecStats, BlockMetadata
+from ray.data.context import DEFAULT_SCHEDULING_STRATEGY, DatasetContext
+from ray.data.dataset import Dataset
+from ray.data.datasource import (
+    BaseFileMetadataProvider,
+    BinaryDatasource,
+    CSVDatasource,
+    Datasource,
+    DefaultFileMetadataProvider,
+    DefaultParquetMetadataProvider,
+    FastFileMetadataProvider,
+    JSONDatasource,
+    NumpyDatasource,
+    ParquetBaseDatasource,
+    ParquetDatasource,
+    ParquetMetadataProvider,
+    PathPartitionFilter,
+    RangeDatasource,
+    ReadTask,
+)
+from ray.data.datasource.file_based_datasource import (
+    _unwrap_arrow_serialization_workaround,
+    _wrap_arrow_serialization_workaround,
+)
+from ray.types import ObjectRef
+from ray.util.annotations import Deprecated, DeveloperAPI, PublicAPI
+
+if TYPE_CHECKING:
+    import dask
+    import datasets
+    import mars
+    import modin
+    import pandas
+    import pyarrow
+    import pyspark
+
 
 
 T = TypeVar("T")
@@ -319,7 +306,8 @@ def read_parquet(
         >>> ray.data.read_parquet(["/path/to/file1", "/path/to/file2"]) # doctest: +SKIP
 
     Args:
-        paths: A single file path or a list of file paths (or directories).
+        paths: A single file path or directory, or a list of file paths. Multiple
+            directories are not supported.
         filesystem: The filesystem implementation to read from.
         columns: A list of column names to read.
         parallelism: The requested parallelism of the read. Parallelism may be
@@ -373,54 +361,54 @@ def read_parquet_bulk(
     By default, ONLY file paths should be provided as input (i.e. no directory paths),
     and an OSError will be raised if one or more paths point to directories. If your
     use-case requires directory paths, then the metadata provider should be changed to
-    one that supports directory expansion (e.g. DefaultFileMetadataProvider).
+    one that supports directory expansion (e.g. ``DefaultFileMetadataProvider``).
 
-    Offers improved performance vs. `read_parquet` due to not using PyArrow's
-    `ParquetDataset` abstraction, whose latency scales linearly with the number of
+    Offers improved performance vs. :func:`read_parquet` due to not using PyArrow's
+    ``ParquetDataset`` abstraction, whose latency scales linearly with the number of
     input files due to collecting all file metadata on a single node.
 
-    Also supports a wider variety of input Parquet file types than `read_parquet` due
-    to not trying to merge and resolve a unified schema for all files.
+    Also supports a wider variety of input Parquet file types than :func:`read_parquet`
+    due to not trying to merge and resolve a unified schema for all files.
 
-    However, unlike `read_parquet`, this does not offer file metadata resolution by
-    default, so a custom metadata provider should be provided if your use-case requires
-    a unified dataset schema, block sizes, row counts, etc.
+    However, unlike :func:`read_parquet`, this does not offer file metadata resolution
+    by default, so a custom metadata provider should be provided if your use-case
+    requires a unified dataset schema, block sizes, row counts, etc.
 
     Examples:
         >>> # Read multiple local files. You should always provide only input file
        >>> # paths (i.e. no directory paths) when known to minimize read latency.
-        >>> ray.data.read_parquet_bulk(["/path/to/file1", "/path/to/file2"])
+        >>> ray.data.read_parquet_bulk( # doctest: +SKIP
+        ...     ["/path/to/file1", "/path/to/file2"])
 
         >>> # Read a directory of files in remote storage. Caution should be taken
         >>> # when providing directory paths, since the time to both check each path
         >>> # type and expand its contents may result in greatly increased latency
         >>> # and/or request rate throttling from cloud storage service providers.
-        >>> ray.data.read_parquet_bulk(
-        >>>     "s3://bucket/path",
-        >>>     meta_provider=DefaultFileMetadataProvider(),
-        >>> )
+        >>> ray.data.read_parquet_bulk( # doctest: +SKIP
+        ...     "s3://bucket/path",
+        ...     meta_provider=DefaultFileMetadataProvider())
 
     Args:
         paths: A single file path or a list of file paths. If one or more directories
-            are provided, then `meta_provider` should also be set to an implementation
-            that supports directory expansion (e.g. DefaultFileMetadataProvider).
+            are provided, then ``meta_provider`` should also be set to an implementation
+            that supports directory expansion (e.g. ``DefaultFileMetadataProvider``).
         filesystem: The filesystem implementation to read from.
         columns: A list of column names to read.
         parallelism: The requested parallelism of the read. Parallelism may be
            limited by the number of files of the dataset.
         ray_remote_args: kwargs passed to ray.remote in the read tasks.
         arrow_open_file_args: kwargs passed to
-            pyarrow.fs.FileSystem.open_input_file
+            ``pyarrow.fs.FileSystem.open_input_file``.
         tensor_column_schema: A dict of column name --> tensor dtype and shape
            mappings for converting a Parquet column containing serialized
            tensors (ndarrays) as their elements to our tensor column extension
            type. This assumes that the tensors were serialized in the raw
            NumPy array format in C-contiguous order (e.g. via
-            `arr.tobytes()`).
+            ``arr.tobytes()``).
         meta_provider: File metadata provider. Defaults to a fast file metadata
            provider that skips file size collection and requires all input paths to be
-            files. Change to DefaultFileMetadataProvider or a custom metadata provider
-            if directory expansion and/or file metadata resolution is required.
+            files. Change to ``DefaultFileMetadataProvider`` or a custom metadata
+            provider if directory expansion and/or file metadata resolution is required.
         partition_filter: Path-based partition filter, if any. Can be used
            with a custom callback to read only selected partitions of a dataset.
         arrow_parquet_args: Other parquet read options to pass to pyarrow.
@@ -738,6 +726,7 @@ def from_dask(df: "dask.DataFrame") -> Dataset[ArrowRow]:
         Dataset holding Arrow records read from the DataFrame.
     """
     import dask
+
     from ray.util.dask import ray_dask_get
 
     partitions = df.to_delayed()
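
For reference, a minimal usage sketch of the ray.data.read_parquet_bulk API documented in this diff. It only restates the two patterns from the docstring examples; the file paths and the s3 bucket are the same placeholders used there, not real data:

    import ray
    from ray.data.datasource import DefaultFileMetadataProvider

    # Fast path: pass explicit file paths, so no directory expansion or
    # per-file metadata resolution is performed.
    ds = ray.data.read_parquet_bulk(["/path/to/file1", "/path/to/file2"])

    # Directory path: opt in to directory expansion by supplying a metadata
    # provider that supports it, as the docstring describes.
    ds_dir = ray.data.read_parquet_bulk(
        "s3://bucket/path",
        meta_provider=DefaultFileMetadataProvider(),
    )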