[Datasets] Make it clear that read_parquet() does not support multiple directories. (#25747)

Unfortunately, ray.data.read_parquet() doesn't work with multiple directories, since it uses Arrow's Dataset abstraction under the hood, which doesn't accept multiple directories as a source: https://arrow.apache.org/docs/python/generated/pyarrow.dataset.dataset.html
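For illustration, a minimal sketch of the call patterns this implies (all paths here are hypothetical):

import ray

# Supported: a single file, a single directory, or a list of file paths.
ds = ray.data.read_parquet("s3://bucket/dir")
ds = ray.data.read_parquet(["/path/to/file1", "/path/to/file2"])

# Not supported: a list of directories, because pyarrow.dataset.dataset()
# (used under the hood) does not accept multiple directory sources.
# ds = ray.data.read_parquet(["s3://bucket/dir1", "s3://bucket/dir2"])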

This PR makes this clear in the docs and, as a drive-by, adds ray.data.read_parquet_bulk() to the API docs.
Clark Zinzow, 2022-06-15 13:19:39 -07:00 (committed by GitHub)
parent 7800172041
commit 526e12074a
2 changed files with 62 additions and 72 deletions

@@ -12,6 +12,7 @@ Creating Datasets
.. autofunction:: ray.data.read_csv
.. autofunction:: ray.data.read_json
.. autofunction:: ray.data.read_parquet
.. autofunction:: ray.data.read_parquet_bulk
.. autofunction:: ray.data.read_numpy
.. autofunction:: ray.data.read_text
.. autofunction:: ray.data.read_binary_files

@@ -1,67 +1,54 @@
import os
import logging
from typing import (
    List,
    Any,
    Dict,
    Union,
    Optional,
    Tuple,
    TypeVar,
    TYPE_CHECKING,
)
import os
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, TypeVar, Union
import numpy as np
if TYPE_CHECKING:
    import pyarrow
    import pandas
    import dask
    import mars
    import modin
    import pyspark
    import datasets
import ray
from ray.types import ObjectRef
from ray.util.annotations import PublicAPI, DeveloperAPI, Deprecated
from ray.data.block import (
    Block,
    BlockAccessor,
    BlockMetadata,
    BlockExecStats,
)
from ray.data.context import DatasetContext, DEFAULT_SCHEDULING_STRATEGY
from ray.data.dataset import Dataset
from ray.data.datasource import (
    Datasource,
    RangeDatasource,
    JSONDatasource,
    CSVDatasource,
    ParquetDatasource,
    BinaryDatasource,
    NumpyDatasource,
    ReadTask,
    BaseFileMetadataProvider,
    DefaultFileMetadataProvider,
    FastFileMetadataProvider,
    ParquetMetadataProvider,
    DefaultParquetMetadataProvider,
    PathPartitionFilter,
    ParquetBaseDatasource,
)
from ray.data.datasource.file_based_datasource import (
    _wrap_arrow_serialization_workaround,
    _unwrap_arrow_serialization_workaround,
)
from ray.data._internal.delegating_block_builder import DelegatingBlockBuilder
from ray.data._internal.arrow_block import ArrowRow
from ray.data._internal.block_list import BlockList
from ray.data._internal.delegating_block_builder import DelegatingBlockBuilder
from ray.data._internal.lazy_block_list import LazyBlockList
from ray.data._internal.plan import ExecutionPlan
from ray.data._internal.remote_fn import cached_remote_fn
from ray.data._internal.stats import DatasetStats
from ray.data._internal.util import _lazy_import_pyarrow_dataset
from ray.data.block import Block, BlockAccessor, BlockExecStats, BlockMetadata
from ray.data.context import DEFAULT_SCHEDULING_STRATEGY, DatasetContext
from ray.data.dataset import Dataset
from ray.data.datasource import (
    BaseFileMetadataProvider,
    BinaryDatasource,
    CSVDatasource,
    Datasource,
    DefaultFileMetadataProvider,
    DefaultParquetMetadataProvider,
    FastFileMetadataProvider,
    JSONDatasource,
    NumpyDatasource,
    ParquetBaseDatasource,
    ParquetDatasource,
    ParquetMetadataProvider,
    PathPartitionFilter,
    RangeDatasource,
    ReadTask,
)
from ray.data.datasource.file_based_datasource import (
    _unwrap_arrow_serialization_workaround,
    _wrap_arrow_serialization_workaround,
)
from ray.types import ObjectRef
from ray.util.annotations import Deprecated, DeveloperAPI, PublicAPI
if TYPE_CHECKING:
    import dask
    import datasets
    import mars
    import modin
    import pandas
    import pyarrow
    import pyspark
T = TypeVar("T")
@@ -319,7 +306,8 @@ def read_parquet(
>>> ray.data.read_parquet(["/path/to/file1", "/path/to/file2"]) # doctest: +SKIP
Args:
paths: A single file path or a list of file paths (or directories).
paths: A single file path or directory, or a list of file paths. Multiple
directories are not supported.
filesystem: The filesystem implementation to read from.
columns: A list of column names to read.
parallelism: The requested parallelism of the read. Parallelism may be
@@ -373,54 +361,54 @@ def read_parquet_bulk(
By default, ONLY file paths should be provided as input (i.e. no directory paths),
and an OSError will be raised if one or more paths point to directories. If your
use-case requires directory paths, then the metadata provider should be changed to
one that supports directory expansion (e.g. DefaultFileMetadataProvider).
one that supports directory expansion (e.g. ``DefaultFileMetadataProvider``).
Offers improved performance vs. `read_parquet` due to not using PyArrow's
`ParquetDataset` abstraction, whose latency scales linearly with the number of
Offers improved performance vs. :func:`read_parquet` due to not using PyArrow's
``ParquetDataset`` abstraction, whose latency scales linearly with the number of
input files due to collecting all file metadata on a single node.
Also supports a wider variety of input Parquet file types than `read_parquet` due
to not trying to merge and resolve a unified schema for all files.
Also supports a wider variety of input Parquet file types than :func:`read_parquet`
due to not trying to merge and resolve a unified schema for all files.
However, unlike `read_parquet`, this does not offer file metadata resolution by
default, so a custom metadata provider should be provided if your use-case requires
a unified dataset schema, block sizes, row counts, etc.
However, unlike :func:`read_parquet`, this does not offer file metadata resolution
by default, so a custom metadata provider should be provided if your use-case
requires a unified dataset schema, block sizes, row counts, etc.
Examples:
>>> # Read multiple local files. You should always provide only input file
>>> # paths (i.e. no directory paths) when known to minimize read latency.
>>> ray.data.read_parquet_bulk(["/path/to/file1", "/path/to/file2"])
>>> ray.data.read_parquet_bulk( # doctest: +SKIP
... ["/path/to/file1", "/path/to/file2"])
>>> # Read a directory of files in remote storage. Caution should be taken
>>> # when providing directory paths, since the time to both check each path
>>> # type and expand its contents may result in greatly increased latency
>>> # and/or request rate throttling from cloud storage service providers.
>>> ray.data.read_parquet_bulk(
>>> "s3://bucket/path",
>>> meta_provider=DefaultFileMetadataProvider(),
>>> )
>>> ray.data.read_parquet_bulk( # doctest: +SKIP
... "s3://bucket/path",
... meta_provider=DefaultFileMetadataProvider())
Args:
paths: A single file path or a list of file paths. If one or more directories
are provided, then `meta_provider` should also be set to an implementation
that supports directory expansion (e.g. DefaultFileMetadataProvider).
are provided, then ``meta_provider`` should also be set to an implementation
that supports directory expansion (e.g. ``DefaultFileMetadataProvider``).
filesystem: The filesystem implementation to read from.
columns: A list of column names to read.
parallelism: The requested parallelism of the read. Parallelism may be
limited by the number of files of the dataset.
ray_remote_args: kwargs passed to ray.remote in the read tasks.
arrow_open_file_args: kwargs passed to
pyarrow.fs.FileSystem.open_input_file
``pyarrow.fs.FileSystem.open_input_file``.
tensor_column_schema: A dict of column name --> tensor dtype and shape
mappings for converting a Parquet column containing serialized
tensors (ndarrays) as their elements to our tensor column extension
type. This assumes that the tensors were serialized in the raw
NumPy array format in C-contiguous order (e.g. via
`arr.tobytes()`).
``arr.tobytes()``).
meta_provider: File metadata provider. Defaults to a fast file metadata
provider that skips file size collection and requires all input paths to be
files. Change to DefaultFileMetadataProvider or a custom metadata provider
if directory expansion and/or file metadata resolution is required.
files. Change to ``DefaultFileMetadataProvider`` or a custom metadata
provider if directory expansion and/or file metadata resolution is required.
partition_filter: Path-based partition filter, if any. Can be used
with a custom callback to read only selected partitions of a dataset.
arrow_parquet_args: Other parquet read options to pass to pyarrow.
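As an aside, a hedged sketch of the tensor_column_schema parameter described above; the column name, dtype, shape, and paths are all assumptions for illustration:

import numpy as np
import ray

# Assume each Parquet file stores an "image" column whose values are raw bytes
# produced by arr.tobytes() from float32 arrays of shape (28, 28), C-contiguous.
ds = ray.data.read_parquet_bulk(
    ["/data/images_000.parquet", "/data/images_001.parquet"],
    tensor_column_schema={"image": (np.float32, (28, 28))},
)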
@@ -738,6 +726,7 @@ def from_dask(df: "dask.DataFrame") -> Dataset[ArrowRow]:
        Dataset holding Arrow records read from the DataFrame.
    """
    import dask
    from ray.util.dask import ray_dask_get
    partitions = df.to_delayed()
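And a brief usage sketch of from_dask, which the hunk above touches; the DataFrame contents are made up:

import dask.dataframe as dd
import pandas as pd
import ray

# Each Dask partition (obtained via df.to_delayed()) becomes a block of the Dataset.
ddf = dd.from_pandas(pd.DataFrame({"a": range(100)}), npartitions=4)
ds = ray.data.from_dask(ddf)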