[Datasets] Add fast file metadata provider and refactor Parquet datasource (#24094)

Adds a fast file metadata provider that trades comprehensive file metadata collection for faster metadata resolution, and also disables directory path expansion, which can be very slow on some cloud storage service providers. This PR also refactors the Parquet datasource to take advantage of both of these changes and of the content-type agnostic partitioning support from #23624.

This is the second PR of a series originally proposed in #23179.
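As a quick illustration (a minimal sketch based on the tests added in this PR, with illustrative local paths), the new provider is passed to the existing read APIs via the `meta_provider` argument:

import ray
from ray.data.datasource import FastFileMetadataProvider

# FastFileMetadataProvider skips directory expansion and file size collection,
# so every path must point directly to a file (no directories).
ds = ray.data.read_csv(
    ["/data/part-000.csv", "/data/part-001.csv"],  # illustrative file paths
    meta_provider=FastFileMetadataProvider(),
)

# Omitting meta_provider keeps the default behavior: directories are expanded
# and per-file sizes are collected, which can be slow on some cloud storage services.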
Patrick Ames 2022-04-29 09:39:13 -07:00 committed by GitHub
parent 539832f2c5
commit 4691d2d339
11 changed files with 856 additions and 261 deletions


@ -9,21 +9,23 @@ from ray.data.datasource.datasource import (
WriteResult,
)
from ray.data.datasource.file_based_datasource import (
BaseFileMetadataProvider,
BlockWritePathProvider,
DefaultBlockWritePathProvider,
DefaultFileMetadataProvider,
FileBasedDatasource,
_S3FileSystemWrapper,
)
from ray.data.datasource.file_meta_provider import FileMetadataProvider
from ray.data.datasource.json_datasource import JSONDatasource
from ray.data.datasource.numpy_datasource import NumpyDatasource
from ray.data.datasource.parquet_datasource import (
from ray.data.datasource.file_meta_provider import (
BaseFileMetadataProvider,
DefaultFileMetadataProvider,
DefaultParquetMetadataProvider,
ParquetDatasource,
FastFileMetadataProvider,
FileMetadataProvider,
ParquetMetadataProvider,
)
from ray.data.datasource.json_datasource import JSONDatasource
from ray.data.datasource.numpy_datasource import NumpyDatasource
from ray.data.datasource.parquet_base_datasource import ParquetBaseDatasource
from ray.data.datasource.parquet_datasource import ParquetDatasource
from ray.data.datasource.partitioning import (
PartitionStyle,
PathPartitionEncoder,
@ -42,10 +44,12 @@ __all__ = [
"DefaultFileMetadataProvider",
"DefaultParquetMetadataProvider",
"DummyOutputDatasource",
"FastFileMetadataProvider",
"FileBasedDatasource",
"FileMetadataProvider",
"JSONDatasource",
"NumpyDatasource",
"ParquetBaseDatasource",
"ParquetDatasource",
"ParquetMetadataProvider",
"PartitionStyle",


@ -42,5 +42,13 @@ class BinaryDatasource(FileBasedDatasource):
else:
return [data]
def _open_input_source(
self,
filesystem: "pyarrow.fs.FileSystem",
path: str,
**open_args,
) -> "pyarrow.NativeFile":
return filesystem.open_input_stream(path, **open_args)
def _rows_per_file(self):
return 1


@ -43,12 +43,20 @@ class CSVDatasource(FileBasedDatasource):
except StopIteration:
return
def _open_input_source(
self,
filesystem: "pyarrow.fs.FileSystem",
path: str,
**open_args,
) -> "pyarrow.NativeFile":
return filesystem.open_input_stream(path, **open_args)
def _write_block(
self,
f: "pyarrow.NativeFile",
block: BlockAccessor,
writer_args_fn: Callable[[], Dict[str, Any]] = lambda: {},
**writer_args
**writer_args,
):
from pyarrow import csv


@ -26,7 +26,10 @@ from ray.data.impl.arrow_block import ArrowRow
from ray.data.impl.block_list import BlockMetadata
from ray.data.impl.output_buffer import BlockOutputBuffer
from ray.data.datasource.datasource import Datasource, ReadTask, WriteResult
from ray.data.datasource.file_meta_provider import FileMetadataProvider
from ray.data.datasource.file_meta_provider import (
BaseFileMetadataProvider,
DefaultFileMetadataProvider,
)
from ray.util.annotations import DeveloperAPI
from ray.data.impl.util import _check_pyarrow_version
from ray.data.impl.remote_fn import cached_remote_fn
@ -120,123 +123,6 @@ class DefaultBlockWritePathProvider(BlockWritePathProvider):
return posixpath.join(base_path, suffix)
@DeveloperAPI
class BaseFileMetadataProvider(FileMetadataProvider):
"""Abstract callable that provides metadata for FileBasedDatasource
implementations that reuse the base `prepare_read` method.
Also supports file and file size discovery in input directory paths.
Current subclasses:
DefaultFileMetadataProvider
"""
def _get_block_metadata(
self,
paths: List[str],
schema: Optional[Union[type, "pyarrow.lib.Schema"]],
*,
rows_per_file: Optional[int],
file_sizes: List[Optional[int]],
) -> BlockMetadata:
"""Resolves and returns block metadata for the given file paths.
Args:
paths: The file paths to aggregate block metadata across. These
paths will always be a subset of those previously returned from
`expand_paths()`.
schema: The user-provided or inferred schema for the given file
paths, if any.
rows_per_file: The fixed number of rows per input file, or None.
file_sizes: Optional file size per input file previously returned
from `expand_paths()`, where `file_sizes[i]` holds the size of
the file at `paths[i]`.
Returns:
BlockMetadata aggregated across the given file paths.
"""
raise NotImplementedError
def expand_paths(
self,
paths: List[str],
filesystem: Optional["pyarrow.fs.FileSystem"],
) -> Tuple[List[str], List[Optional[int]]]:
"""Expands all paths into concrete file paths by walking directories.
Also returns a sidecar of file sizes.
The input paths will be normalized for compatibility with the input
filesystem prior to invocation.
Args:
paths: A list of file and/or directory paths compatible with the
given filesystem.
filesystem: The filesystem implementation that should be used for
expanding all paths and reading their files.
Returns:
A tuple whose first item contains the list of file paths discovered,
and whose second item contains the size of each file. `None` may be
returned if a file size is either unknown or will be fetched later
by `_get_block_metadata()`, but the length of both lists must be
equal.
"""
raise NotImplementedError
class DefaultFileMetadataProvider(BaseFileMetadataProvider):
"""Default metadata provider for FileBasedDatasource implementations that
reuse the base `prepare_read` method.
Calculates block size in bytes as the sum of its constituent file sizes,
and assumes a fixed number of rows per file.
"""
def _get_block_metadata(
self,
paths: List[str],
schema: Optional[Union[type, "pyarrow.lib.Schema"]],
*,
rows_per_file: Optional[int],
file_sizes: List[Optional[int]],
) -> BlockMetadata:
if rows_per_file is None:
num_rows = None
else:
num_rows = len(paths) * rows_per_file
return BlockMetadata(
num_rows=num_rows,
size_bytes=None if None in file_sizes else sum(file_sizes),
schema=schema,
input_files=paths,
exec_stats=None,
) # Exec stats filled in later.
def expand_paths(
self,
paths: List[str],
filesystem: "pyarrow.fs.FileSystem",
) -> Tuple[List[str], List[Optional[int]]]:
from pyarrow.fs import FileType
expanded_paths = []
file_infos = []
for path in paths:
file_info = filesystem.get_file_info(path)
if file_info.type == FileType.Directory:
paths, file_infos_ = _expand_directory(path, filesystem)
expanded_paths.extend(paths)
file_infos.extend(file_infos_)
elif file_info.type == FileType.File:
expanded_paths.append(path)
file_infos.append(file_info)
else:
raise FileNotFoundError(path)
file_sizes = [file_info.size for file_info in file_infos]
return expanded_paths, file_sizes
@DeveloperAPI
class FileBasedDatasource(Datasource[Union[ArrowRow, Any]]):
"""File-based datasource, for reading and writing files.
@ -318,7 +204,7 @@ class FileBasedDatasource(Datasource[Union[ArrowRow, Any]]):
# Non-Snappy compression, pass as open_input_stream() arg so Arrow
# can take care of streaming decompression for us.
open_stream_args["compression"] = compression
with fs.open_input_stream(read_path, **open_stream_args) as f:
with self._open_input_source(fs, read_path, **open_stream_args) as f:
for data in read_stream(f, read_path, **reader_args):
output_buffer.add_block(data)
if output_buffer.has_next():
@ -372,6 +258,21 @@ class FileBasedDatasource(Datasource[Union[ArrowRow, Any]]):
"Subclasses of FileBasedDatasource must implement _read_file()."
)
def _open_input_source(
self,
filesystem: "pyarrow.fs.FileSystem",
path: str,
**open_args,
) -> "pyarrow.NativeFile":
"""Opens a source path for reading and returns the associated Arrow
NativeFile.
This method should be implemented by subclasses.
"""
raise NotImplementedError(
"Subclasses of FileBasedDatasource must implement _open_input_source()."
)
def do_write(
self,
blocks: List[ObjectRef[Block]],


@ -1,8 +1,11 @@
import logging
from typing import (
List,
Optional,
Union,
TYPE_CHECKING,
Tuple,
Any,
)
if TYPE_CHECKING:
@ -11,10 +14,12 @@ if TYPE_CHECKING:
from ray.data.block import BlockMetadata
from ray.util.annotations import DeveloperAPI
logger = logging.getLogger(__name__)
@DeveloperAPI
class FileMetadataProvider:
"""Abstract callable that provides metadata for files in a list of paths.
"""Abstract callable that provides metadata for the files of a single dataset block.
Current subclasses:
BaseFileMetadataProvider
@ -29,8 +34,10 @@ class FileMetadataProvider:
) -> BlockMetadata:
"""Resolves and returns block metadata for files in the given paths.
All file paths provided should belong to a single dataset block.
Args:
paths: The paths to aggregate block metadata across.
paths: The file paths for a single dataset block.
schema: The user-provided or inferred schema for the given paths,
if any.
@ -46,3 +53,271 @@ class FileMetadataProvider:
**kwargs,
) -> BlockMetadata:
return self._get_block_metadata(paths, schema, **kwargs)
@DeveloperAPI
class BaseFileMetadataProvider(FileMetadataProvider):
"""Abstract callable that provides metadata for FileBasedDatasource
implementations that reuse the base `prepare_read` method.
Also supports file and file size discovery in input directory paths.
Current subclasses:
DefaultFileMetadataProvider
"""
def _get_block_metadata(
self,
paths: List[str],
schema: Optional[Union[type, "pyarrow.lib.Schema"]],
*,
rows_per_file: Optional[int],
file_sizes: List[Optional[int]],
) -> BlockMetadata:
"""Resolves and returns block metadata for files of a single dataset block.
Args:
paths: The file paths for a single dataset block. These
paths will always be a subset of those previously returned from
`expand_paths()`.
schema: The user-provided or inferred schema for the given file
paths, if any.
rows_per_file: The fixed number of rows per input file, or None.
file_sizes: Optional file size per input file previously returned
from `expand_paths()`, where `file_sizes[i]` holds the size of
the file at `paths[i]`.
Returns:
BlockMetadata aggregated across the given file paths.
"""
raise NotImplementedError
def expand_paths(
self,
paths: List[str],
filesystem: Optional["pyarrow.fs.FileSystem"],
) -> Tuple[List[str], List[Optional[int]]]:
"""Expands all paths into concrete file paths by walking directories.
Also returns a sidecar of file sizes.
The input paths must be normalized for compatibility with the input
filesystem prior to invocation.
Args:
paths: A list of file and/or directory paths compatible with the
given filesystem.
filesystem: The filesystem implementation that should be used for
expanding all paths and reading their files.
Returns:
A tuple whose first item contains the list of file paths discovered,
and whose second item contains the size of each file. `None` may be
returned if a file size is either unknown or will be fetched later
by `_get_block_metadata()`, but the length of both lists must be
equal.
"""
raise NotImplementedError
class DefaultFileMetadataProvider(BaseFileMetadataProvider):
"""Default metadata provider for FileBasedDatasource implementations that
reuse the base `prepare_read` method.
Calculates block size in bytes as the sum of its constituent file sizes,
and assumes a fixed number of rows per file.
"""
def _get_block_metadata(
self,
paths: List[str],
schema: Optional[Union[type, "pyarrow.lib.Schema"]],
*,
rows_per_file: Optional[int],
file_sizes: List[Optional[int]],
) -> BlockMetadata:
if rows_per_file is None:
num_rows = None
else:
num_rows = len(paths) * rows_per_file
return BlockMetadata(
num_rows=num_rows,
size_bytes=None if None in file_sizes else sum(file_sizes),
schema=schema,
input_files=paths,
exec_stats=None,
) # Exec stats filled in later.
def expand_paths(
self,
paths: List[str],
filesystem: "pyarrow.fs.FileSystem",
) -> Tuple[List[str], List[Optional[int]]]:
from pyarrow.fs import FileType
from ray.data.datasource.file_based_datasource import _expand_directory
if len(paths) > 1:
logger.warning(
f"Expanding {len(paths)} path(s). This may be a HIGH LATENCY "
f"operation on some cloud storage services. If the specified paths "
f"all point to files and never directories, try rerunning this read "
f"with `meta_provider=FastFileMetadataProvider()`."
)
expanded_paths = []
file_infos = []
for path in paths:
file_info = filesystem.get_file_info(path)
if file_info.type == FileType.Directory:
paths, file_infos_ = _expand_directory(path, filesystem)
expanded_paths.extend(paths)
file_infos.extend(file_infos_)
elif file_info.type == FileType.File:
expanded_paths.append(path)
file_infos.append(file_info)
else:
raise FileNotFoundError(path)
file_sizes = [file_info.size for file_info in file_infos]
return expanded_paths, file_sizes
class FastFileMetadataProvider(DefaultFileMetadataProvider):
"""Fast Metadata provider for FileBasedDatasource implementations.
Offers improved performance vs. DefaultFileMetadataProvider by skipping directory
path expansion and file size collection. While this performance improvement may be
negligible for local filesystems, it can be substantial for cloud storage service
providers.
This should only be used when all input paths are known to be files.
"""
def expand_paths(
self,
paths: List[str],
filesystem: "pyarrow.fs.FileSystem",
) -> Tuple[List[str], List[Optional[int]]]:
logger.warning(
f"Skipping expansion of {len(paths)} path(s). If your paths contain "
f"directories or if file size collection is required, try rerunning this "
f"read with `meta_provider=DefaultFileMetadataProvider()`."
)
import numpy as np
return paths, np.empty(len(paths), dtype=object)
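For reference, a minimal sketch of how the provider protocol above is exercised (mirroring test_fast_file_metadata_provider added later in this PR); the paths are illustrative and, since the fast provider never touches the filesystem, they do not need to exist:

from pyarrow.fs import LocalFileSystem
from ray.data.datasource import FastFileMetadataProvider

provider = FastFileMetadataProvider()

# expand_paths() returns the input paths unchanged, with all file sizes unknown.
paths, file_sizes = provider.expand_paths(
    ["/data/a.csv", "/data/b.csv"],  # illustrative; never read by the fast provider
    LocalFileSystem(),
)

# Invoking the provider aggregates BlockMetadata for the files of one block.
meta = provider(paths, None, rows_per_file=3, file_sizes=file_sizes)
assert meta.num_rows == 6       # len(paths) * rows_per_file
assert meta.size_bytes is None  # unknown, since file sizes were not collected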
@DeveloperAPI
class ParquetMetadataProvider(FileMetadataProvider):
"""Abstract callable that provides block metadata for Arrow Parquet file fragments.
All file fragments should belong to a single dataset block.
Supports optional pre-fetching of ordered metadata for all file fragments in
a single batch to help optimize metadata resolution.
Current subclasses:
DefaultParquetMetadataProvider
"""
def _get_block_metadata(
self,
paths: List[str],
schema: Optional[Union[type, "pyarrow.lib.Schema"]],
*,
pieces: List["pyarrow.dataset.ParquetFileFragment"],
prefetched_metadata: Optional[List[Any]],
) -> BlockMetadata:
"""Resolves and returns block metadata for files of a single dataset block.
Args:
paths: The file paths for a single dataset block.
schema: The user-provided or inferred schema for the given file
paths, if any.
pieces: The Parquet file fragments derived from the input file paths.
prefetched_metadata: Metadata previously returned from
`prefetch_file_metadata()` for each file fragment, where
`prefetched_metadata[i]` contains the metadata for `pieces[i]`.
Returns:
BlockMetadata aggregated across the given file paths.
"""
raise NotImplementedError
def prefetch_file_metadata(
self,
pieces: List["pyarrow.dataset.ParquetFileFragment"],
) -> Optional[List[Any]]:
"""Pre-fetches file metadata for all Parquet file fragments in a single batch.
Subsets of the metadata returned will be provided as input to
subsequent calls to _get_block_metadata() together with their
corresponding Parquet file fragments.
Implementations that don't support pre-fetching file metadata shouldn't
override this method.
Args:
pieces: The Parquet file fragments to fetch metadata for.
Returns:
Metadata resolved for each input file fragment, or `None`. Metadata
must be returned in the same order as all input file fragments, such
that `metadata[i]` always contains the metadata for `pieces[i]`.
"""
return None
class DefaultParquetMetadataProvider(ParquetMetadataProvider):
"""The default file metadata provider for ParquetDatasource.
Aggregates total block bytes and number of rows using the Parquet file metadata
associated with a list of Arrow Parquet dataset file fragments.
"""
def _get_block_metadata(
self,
paths: List[str],
schema: Optional[Union[type, "pyarrow.lib.Schema"]],
*,
pieces: List["pyarrow.dataset.ParquetFileFragment"],
prefetched_metadata: Optional[List["pyarrow.parquet.FileMetaData"]],
) -> BlockMetadata:
if prefetched_metadata is not None and len(prefetched_metadata) == len(pieces):
# Piece metadata was available, construct a normal
# BlockMetadata.
block_metadata = BlockMetadata(
num_rows=sum(m.num_rows for m in prefetched_metadata),
size_bytes=sum(
sum(m.row_group(i).total_byte_size for i in range(m.num_row_groups))
for m in prefetched_metadata
),
schema=schema,
input_files=paths,
exec_stats=None,
) # Exec stats filled in later.
else:
# Piece metadata was not available, construct an empty
# BlockMetadata.
block_metadata = BlockMetadata(
num_rows=None,
size_bytes=None,
schema=schema,
input_files=paths,
exec_stats=None,
)
return block_metadata
def prefetch_file_metadata(
self,
pieces: List["pyarrow.dataset.ParquetFileFragment"],
) -> Optional[List["pyarrow.parquet.FileMetaData"]]:
from ray.data.datasource.parquet_datasource import (
PARALLELIZE_META_FETCH_THRESHOLD,
_fetch_metadata_remotely,
_fetch_metadata,
)
if len(pieces) > PARALLELIZE_META_FETCH_THRESHOLD:
return _fetch_metadata_remotely(pieces)
else:
return _fetch_metadata(pieces)
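As a usage sketch (mirroring the TestMetadataProvider in the tests below), a custom ParquetMetadataProvider can opt out of the batch metadata prefetch; DefaultParquetMetadataProvider then falls back to a BlockMetadata with unknown row count and size, which the dataset resolves lazily on read. The path shown is illustrative:

import ray
from ray.data.datasource import DefaultParquetMetadataProvider

class NoPrefetchMetadataProvider(DefaultParquetMetadataProvider):
    def prefetch_file_metadata(self, pieces):
        # Skip the (potentially parallelized) per-fragment metadata fetch.
        return None

ds = ray.data.read_parquet(
    "/data/my_table.parquet",  # illustrative path
    meta_provider=NoPrefetchMetadataProvider(),
)
# Row counts and block sizes are not precomputed up front; they are resolved
# lazily when the dataset is read (see test_parquet_read_meta_provider below).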


@ -31,12 +31,20 @@ class JSONDatasource(FileBasedDatasource):
)
return json.read_json(f, read_options=read_options, **reader_args)
def _open_input_source(
self,
filesystem: "pyarrow.fs.FileSystem",
path: str,
**open_args,
) -> "pyarrow.NativeFile":
return filesystem.open_input_stream(path, **open_args)
def _write_block(
self,
f: "pyarrow.NativeFile",
block: BlockAccessor,
writer_args_fn: Callable[[], Dict[str, Any]] = lambda: {},
**writer_args
**writer_args,
):
writer_args = _resolve_kwargs(writer_args_fn, **writer_args)
orient = writer_args.pop("orient", "records")


@ -37,13 +37,21 @@ class NumpyDatasource(FileBasedDatasource):
{"value": TensorArray(np.load(buf, allow_pickle=True))}
)
def _open_input_source(
self,
filesystem: "pyarrow.fs.FileSystem",
path: str,
**open_args,
) -> "pyarrow.NativeFile":
return filesystem.open_input_stream(path, **open_args)
def _write_block(
self,
f: "pyarrow.NativeFile",
block: BlockAccessor,
column: str,
writer_args_fn: Callable[[], Dict[str, Any]] = lambda: {},
**writer_args
**writer_args,
):
value = block.to_numpy(column)
np.save(f, value)


@ -0,0 +1,52 @@
import logging
from typing import (
Dict,
Any,
Callable,
TYPE_CHECKING,
)
if TYPE_CHECKING:
import pyarrow
from ray.data.block import BlockAccessor
from ray.data.datasource.file_based_datasource import (
FileBasedDatasource,
_resolve_kwargs,
)
logger = logging.getLogger(__name__)
class ParquetBaseDatasource(FileBasedDatasource):
"""Minimal Parquet datasource, for reading and writing Parquet files."""
def _read_file(self, f: "pyarrow.NativeFile", path: str, **reader_args):
import pyarrow.parquet as pq
use_threads = reader_args.pop("use_threads", False)
return pq.read_table(f, use_threads=use_threads, **reader_args)
def _open_input_source(
self,
filesystem: "pyarrow.fs.FileSystem",
path: str,
**open_args,
) -> "pyarrow.NativeFile":
# Parquet requires `open_input_file` due to random access reads
return filesystem.open_input_file(path, **open_args)
def _write_block(
self,
f: "pyarrow.NativeFile",
block: BlockAccessor,
writer_args_fn: Callable[[], Dict[str, Any]] = lambda: {},
**writer_args,
):
import pyarrow.parquet as pq
writer_args = _resolve_kwargs(writer_args_fn, **writer_args)
pq.write_table(block.to_arrow(), f, **writer_args)
def _file_format(self) -> str:
return "parquet"


@ -1,30 +1,29 @@
import logging
import itertools
from typing import Any, Callable, Dict, Optional, List, Union, Iterator, TYPE_CHECKING
from typing import Callable, Optional, List, Union, Iterator, TYPE_CHECKING
import numpy as np
from ray.util.annotations import DeveloperAPI
if TYPE_CHECKING:
import pyarrow
import ray
from ray.types import ObjectRef
from ray.data.block import Block, BlockAccessor
from ray.data.block import Block
from ray.data.context import DatasetContext
from ray.data.datasource.datasource import ReadTask
from ray.data.datasource.file_based_datasource import (
FileBasedDatasource,
_resolve_paths_and_filesystem,
_resolve_kwargs,
from ray.data.datasource.file_based_datasource import _resolve_paths_and_filesystem
from ray.data.datasource.parquet_base_datasource import ParquetBaseDatasource
from ray.data.datasource.file_meta_provider import (
ParquetMetadataProvider,
DefaultParquetMetadataProvider,
)
from ray.data.datasource.file_meta_provider import FileMetadataProvider
from ray.data.impl.block_list import BlockMetadata
from ray.data.impl.output_buffer import BlockOutputBuffer
from ray.data.impl.progress_bar import ProgressBar
from ray.data.impl.remote_fn import cached_remote_fn
from ray.data.impl.util import _check_pyarrow_version
logger = logging.getLogger(__name__)
PIECES_PER_META_FETCH = 6
@ -56,114 +55,14 @@ def _deregister_parquet_file_fragment_serialization():
ray.util.deregister_serializer(ParquetFileFragment)
@DeveloperAPI
class ParquetMetadataProvider(FileMetadataProvider):
"""Abstract callable that provides metadata for Arrow Parquet file fragments.
Supports optional pre-fetching of ordered metadata for all file fragments in
a single batch to help optimize metadata resolution.
Current subclasses:
DefaultParquetMetadataProvider
"""
def _get_block_metadata(
self,
paths: List[str],
schema: Optional[Union[type, "pyarrow.lib.Schema"]],
*,
pieces: List["pyarrow.dataset.ParquetFileFragment"],
prefetched_metadata: Optional[List[Any]],
) -> BlockMetadata:
"""Resolves and returns block metadata for the given file paths.
Args:
paths: The file paths to aggregate block metadata across.
schema: The user-provided or inferred schema for the given file
paths, if any.
pieces: The Parquet file fragments derived from the input file paths.
prefetched_metadata: Metadata previously returned from
`prefetch_file_metadata()` for each file fragment, where
`prefetched_metadata[i]` contains the metadata for `pieces[i]`.
Returns:
BlockMetadata aggregated across the given file paths.
"""
raise NotImplementedError
def prefetch_file_metadata(
self,
pieces: List["pyarrow.dataset.ParquetFileFragment"],
) -> Optional[List[Any]]:
"""Pre-fetches file metadata for all Parquet file fragments in a single batch.
Subsets of the metadata returned will be provided as input to
subsequent calls to _get_block_metadata() together with their
corresponding Parquet file fragments.
Implementations that don't support pre-fetching file metadata shouldn't
override this method.
Args:
pieces: The Parquet file fragments to fetch metadata for.
Returns:
Metadata resolved for each input file fragment, or `None`. Metadata
must be returned in the same order as all input file fragments, such
that `metadata[i]` always contains the metadata for `pieces[i]`.
"""
return None
class DefaultParquetMetadataProvider(ParquetMetadataProvider):
"""The default file metadata provider for ParquetDatasource.
Aggregates total block bytes and number of rows using the Parquet file metadata
associated with a list of Arrow Parquet dataset file fragments.
"""
def _get_block_metadata(
self,
paths: List[str],
schema: Optional[Union[type, "pyarrow.lib.Schema"]],
*,
pieces: List["pyarrow.dataset.ParquetFileFragment"],
prefetched_metadata: Optional[List["pyarrow.parquet.FileMetaData"]],
) -> BlockMetadata:
if prefetched_metadata is not None and len(prefetched_metadata) == len(pieces):
# Piece metadata was available, construct a normal
# BlockMetadata.
block_metadata = BlockMetadata(
num_rows=sum(m.num_rows for m in prefetched_metadata),
size_bytes=sum(
sum(m.row_group(i).total_byte_size for i in range(m.num_row_groups))
for m in prefetched_metadata
),
schema=schema,
input_files=paths,
exec_stats=None,
) # Exec stats filled in later.
else:
# Piece metadata was not available, construct an empty
# BlockMetadata.
block_metadata = BlockMetadata(
num_rows=None, size_bytes=None, schema=schema, input_files=paths
)
return block_metadata
def prefetch_file_metadata(
self,
pieces: List["pyarrow.dataset.ParquetFileFragment"],
) -> Optional[List["pyarrow.parquet.FileMetaData"]]:
if len(pieces) > PARALLELIZE_META_FETCH_THRESHOLD:
return _fetch_metadata_remotely(pieces)
else:
return _fetch_metadata(pieces)
class ParquetDatasource(FileBasedDatasource):
class ParquetDatasource(ParquetBaseDatasource):
"""Parquet datasource, for reading and writing Parquet files.
The primary difference from ParquetBaseDatasource is that this uses
PyArrow's `ParquetDataset` abstraction for dataset reads, and thus offers
automatic Arrow dataset schema inference and row count collection at the
cost of some potential performance and/or compatibility penalties.
Examples:
>>> import ray
>>> from ray.data.datasource import ParquetDatasource
@ -304,21 +203,6 @@ class ParquetDatasource(FileBasedDatasource):
return read_tasks
def _write_block(
self,
f: "pyarrow.NativeFile",
block: BlockAccessor,
writer_args_fn: Callable[[], Dict[str, Any]] = lambda: {},
**writer_args,
):
import pyarrow.parquet as pq
writer_args = _resolve_kwargs(writer_args_fn, **writer_args)
pq.write_table(block.to_arrow(), f, **writer_args)
def _file_format(self) -> str:
return "parquet"
def _fetch_metadata_remotely(
pieces: List["pyarrow._dataset.ParquetFileFragment"],


@ -21,6 +21,9 @@ from ray.data.block import Block, BlockAccessor, BlockMetadata
from ray.data.datasource import (
Datasource,
DummyOutputDatasource,
BaseFileMetadataProvider,
DefaultParquetMetadataProvider,
FastFileMetadataProvider,
PathPartitionFilter,
PathPartitionEncoder,
PartitionStyle,
@ -339,6 +342,71 @@ def test_parquet_read_basic(ray_start_regular_shared, fs, data_path):
assert ds.schema().names == ["one"]
@pytest.mark.parametrize(
"fs,data_path",
[
(None, lazy_fixture("local_path")),
(lazy_fixture("local_fs"), lazy_fixture("local_path")),
(lazy_fixture("s3_fs"), lazy_fixture("s3_path")),
],
)
def test_parquet_read_meta_provider(ray_start_regular_shared, fs, data_path):
df1 = pd.DataFrame({"one": [1, 2, 3], "two": ["a", "b", "c"]})
table = pa.Table.from_pandas(df1)
setup_data_path = _unwrap_protocol(data_path)
path1 = os.path.join(setup_data_path, "test1.parquet")
pq.write_table(table, path1, filesystem=fs)
df2 = pd.DataFrame({"one": [4, 5, 6], "two": ["e", "f", "g"]})
table = pa.Table.from_pandas(df2)
path2 = os.path.join(setup_data_path, "test2.parquet")
pq.write_table(table, path2, filesystem=fs)
class TestMetadataProvider(DefaultParquetMetadataProvider):
def prefetch_file_metadata(self, pieces):
return None
ds = ray.data.read_parquet(
data_path,
filesystem=fs,
meta_provider=TestMetadataProvider(),
)
# Expect precomputed row counts and block sizes to be missing.
assert ds._meta_count() is None
assert ds._plan._snapshot_blocks.size_bytes() == -1
# Expect to lazily compute all metadata correctly.
assert ds._plan.execute()._num_computed() == 1
assert ds.count() == 6
assert ds.size_bytes() > 0
assert ds.schema() is not None
input_files = ds.input_files()
assert len(input_files) == 2, input_files
assert "test1.parquet" in str(input_files)
assert "test2.parquet" in str(input_files)
assert (
str(ds) == "Dataset(num_blocks=2, num_rows=6, "
"schema={one: int64, two: string})"
), ds
assert (
repr(ds) == "Dataset(num_blocks=2, num_rows=6, "
"schema={one: int64, two: string})"
), ds
assert ds._plan.execute()._num_computed() == 2
# Forces a data read.
values = [[s["one"], s["two"]] for s in ds.take()]
assert ds._plan.execute()._num_computed() == 2
assert sorted(values) == [
[1, "a"],
[2, "b"],
[3, "c"],
[4, "e"],
[5, "f"],
[6, "g"],
]
@pytest.mark.parametrize(
"fs,data_path",
[
@ -800,6 +868,25 @@ def test_numpy_read(ray_start_regular_shared, tmp_path):
assert str(ds.take(2)) == "[{'value': array([0])}, {'value': array([1])}]"
def test_numpy_read_meta_provider(ray_start_regular_shared, tmp_path):
path = os.path.join(tmp_path, "test_np_dir")
os.mkdir(path)
path = os.path.join(path, "test.npy")
np.save(path, np.expand_dims(np.arange(0, 10), 1))
ds = ray.data.read_numpy(path, meta_provider=FastFileMetadataProvider())
assert str(ds) == (
"Dataset(num_blocks=1, num_rows=10, "
"schema={value: <ArrowTensorType: shape=(1,), dtype=int64>})"
)
assert str(ds.take(2)) == "[{'value': array([0])}, {'value': array([1])}]"
with pytest.raises(NotImplementedError):
ray.data.read_binary_files(
path,
meta_provider=BaseFileMetadataProvider(),
)
def test_numpy_read_partitioned_with_filter(
ray_start_regular_shared,
tmp_path,
@ -943,6 +1030,30 @@ def test_read_text(ray_start_regular_shared, tmp_path):
assert ds.count() == 5
def test_read_text_meta_provider(
ray_start_regular_shared,
tmp_path,
):
path = os.path.join(tmp_path, "test_text")
os.mkdir(path)
path = os.path.join(path, "file1.txt")
with open(path, "w") as f:
f.write("hello\n")
f.write("world\n")
f.write("goodbye\n")
f.write("ray\n")
ds = ray.data.read_text(path, meta_provider=FastFileMetadataProvider())
assert sorted(ds.take()) == ["goodbye", "hello", "ray", "world"]
ds = ray.data.read_text(path, drop_empty_lines=False)
assert ds.count() == 5
with pytest.raises(NotImplementedError):
ray.data.read_text(
path,
meta_provider=BaseFileMetadataProvider(),
)
def test_read_text_partitioned_with_filter(
ray_start_regular_shared,
tmp_path,
@ -1020,6 +1131,31 @@ def test_read_binary_snappy_inferred(ray_start_regular_shared, tmp_path):
assert sorted(ds.take()) == [byte_str]
def test_read_binary_meta_provider(
ray_start_regular_shared,
tmp_path,
):
path = os.path.join(tmp_path, "test_binary_snappy")
os.mkdir(path)
path = os.path.join(path, "file")
with open(path, "wb") as f:
byte_str = "hello, world".encode()
bytes = BytesIO(byte_str)
snappy.stream_compress(bytes, f)
ds = ray.data.read_binary_files(
path,
arrow_open_stream_args=dict(compression="snappy"),
meta_provider=FastFileMetadataProvider(),
)
assert sorted(ds.take()) == [byte_str]
with pytest.raises(NotImplementedError):
ray.data.read_binary_files(
path,
meta_provider=BaseFileMetadataProvider(),
)
def test_read_binary_snappy_partitioned_with_filter(
ray_start_regular_shared,
tmp_path,
@ -1269,6 +1405,47 @@ def test_zipped_json_read(ray_start_regular_shared, tmp_path):
shutil.rmtree(dir_path)
@pytest.mark.parametrize(
"fs,data_path,endpoint_url",
[
(None, lazy_fixture("local_path"), None),
(lazy_fixture("local_fs"), lazy_fixture("local_path"), None),
(lazy_fixture("s3_fs"), lazy_fixture("s3_path"), lazy_fixture("s3_server")),
],
)
def test_json_read_meta_provider(
ray_start_regular_shared,
fs,
data_path,
endpoint_url,
):
if endpoint_url is None:
storage_options = {}
else:
storage_options = dict(client_kwargs=dict(endpoint_url=endpoint_url))
df1 = pd.DataFrame({"one": [1, 2, 3], "two": ["a", "b", "c"]})
path1 = os.path.join(data_path, "test1.json")
df1.to_json(path1, orient="records", lines=True, storage_options=storage_options)
ds = ray.data.read_json(
path1,
filesystem=fs,
meta_provider=FastFileMetadataProvider(),
)
# Expect to lazily compute all metadata correctly.
assert ds.count() == 3
assert ds.input_files() == [_unwrap_protocol(path1)]
assert "{one: int64, two: string}" in str(ds), ds
with pytest.raises(NotImplementedError):
ray.data.read_json(
path1,
filesystem=fs,
meta_provider=BaseFileMetadataProvider(),
)
@pytest.mark.parametrize(
"fs,data_path,endpoint_url",
[
@ -1617,6 +1794,50 @@ def test_csv_read(ray_start_regular_shared, fs, data_path, endpoint_url):
fs.delete_dir(_unwrap_protocol(dir_path))
@pytest.mark.parametrize(
"fs,data_path,endpoint_url",
[
(None, lazy_fixture("local_path"), None),
(lazy_fixture("local_fs"), lazy_fixture("local_path"), None),
(lazy_fixture("s3_fs"), lazy_fixture("s3_path"), lazy_fixture("s3_server")),
],
)
def test_csv_read_meta_provider(
ray_start_regular_shared,
fs,
data_path,
endpoint_url,
):
if endpoint_url is None:
storage_options = {}
else:
storage_options = dict(client_kwargs=dict(endpoint_url=endpoint_url))
df1 = pd.DataFrame({"one": [1, 2, 3], "two": ["a", "b", "c"]})
path1 = os.path.join(data_path, "test1.csv")
df1.to_csv(path1, index=False, storage_options=storage_options)
ds = ray.data.read_csv(
path1,
filesystem=fs,
meta_provider=FastFileMetadataProvider(),
)
dsdf = ds.to_pandas()
assert df1.equals(dsdf)
# Expect to lazily compute all metadata correctly.
assert ds.count() == 3
assert ds.input_files() == [_unwrap_protocol(path1)]
assert "{one: int64, two: string}" in str(ds), ds
with pytest.raises(NotImplementedError):
ray.data.read_csv(
path1,
filesystem=fs,
meta_provider=BaseFileMetadataProvider(),
)
@pytest.mark.parametrize(
"fs,data_path,endpoint_url",
[


@ -0,0 +1,226 @@
import pytest
import posixpath
import urllib.parse
import os
import logging
import pyarrow as pa
import pandas as pd
import pyarrow.parquet as pq
from pytest_lazyfixture import lazy_fixture
from ray.data.datasource.file_based_datasource import _resolve_paths_and_filesystem
from ray.tests.conftest import * # noqa
from ray.data.datasource import (
FileMetadataProvider,
BaseFileMetadataProvider,
ParquetMetadataProvider,
DefaultFileMetadataProvider,
DefaultParquetMetadataProvider,
FastFileMetadataProvider,
)
from ray.data.tests.conftest import * # noqa
def _get_parquet_file_meta_size_bytes(file_metas):
return sum(
sum(m.row_group(i).total_byte_size for i in range(m.num_row_groups))
for m in file_metas
)
def _get_file_sizes_bytes(paths, fs):
from pyarrow.fs import FileType
file_sizes = []
for path in paths:
file_info = fs.get_file_info(path)
if file_info.type == FileType.File:
file_sizes.append(file_info.size)
else:
raise FileNotFoundError(path)
return file_sizes
def test_file_metadata_providers_not_implemented():
meta_provider = FileMetadataProvider()
with pytest.raises(NotImplementedError):
meta_provider(["/foo/bar.csv"], None)
meta_provider = BaseFileMetadataProvider()
with pytest.raises(NotImplementedError):
meta_provider(["/foo/bar.csv"], None, rows_per_file=None, file_sizes=[None])
with pytest.raises(NotImplementedError):
meta_provider.expand_paths(["/foo/bar.csv"], None)
meta_provider = ParquetMetadataProvider()
with pytest.raises(NotImplementedError):
meta_provider(["/foo/bar.csv"], None, pieces=[], prefetched_metadata=None)
assert meta_provider.prefetch_file_metadata(["test"]) is None
@pytest.mark.parametrize(
"fs,data_path",
[
(None, lazy_fixture("local_path")),
(lazy_fixture("local_fs"), lazy_fixture("local_path")),
(lazy_fixture("s3_fs"), lazy_fixture("s3_path")),
(
lazy_fixture("s3_fs_with_space"),
lazy_fixture("s3_path_with_space"),
), # Path contains space.
(
lazy_fixture("s3_fs_with_special_chars"),
lazy_fixture("s3_path_with_special_chars"),
),
],
)
def test_default_parquet_metadata_provider(fs, data_path):
path_module = os.path if urllib.parse.urlparse(data_path).scheme else posixpath
paths = [
path_module.join(data_path, "test1.parquet"),
path_module.join(data_path, "test2.parquet"),
]
paths, fs = _resolve_paths_and_filesystem(paths, fs)
df1 = pd.DataFrame({"one": [1, 2, 3], "two": ["a", "b", "c"]})
table = pa.Table.from_pandas(df1)
pq.write_table(table, paths[0], filesystem=fs)
df2 = pd.DataFrame({"one": [4, 5, 6], "two": ["e", "f", "g"]})
table = pa.Table.from_pandas(df2)
pq.write_table(table, paths[1], filesystem=fs)
meta_provider = DefaultParquetMetadataProvider()
pq_ds = pq.ParquetDataset(paths, filesystem=fs, use_legacy_dataset=False)
file_metas = meta_provider.prefetch_file_metadata(pq_ds.pieces)
meta = meta_provider(
[p.path for p in pq_ds.pieces],
pq_ds.schema,
pieces=pq_ds.pieces,
prefetched_metadata=file_metas,
)
expected_meta_size_bytes = _get_parquet_file_meta_size_bytes(file_metas)
assert meta.size_bytes == expected_meta_size_bytes
assert meta.num_rows == 6
assert len(paths) == 2
assert all(path in meta.input_files for path in paths)
assert meta.schema.equals(pq_ds.schema)
@pytest.mark.parametrize(
"fs,data_path,endpoint_url",
[
(None, lazy_fixture("local_path"), None),
(lazy_fixture("local_fs"), lazy_fixture("local_path"), None),
(lazy_fixture("s3_fs"), lazy_fixture("s3_path"), lazy_fixture("s3_server")),
(
lazy_fixture("s3_fs_with_space"),
lazy_fixture("s3_path_with_space"),
lazy_fixture("s3_server"),
), # Path contains space.
(
lazy_fixture("s3_fs_with_special_chars"),
lazy_fixture("s3_path_with_special_chars"),
lazy_fixture("s3_server"),
),
],
)
def test_default_file_metadata_provider(caplog, fs, data_path, endpoint_url):
storage_options = (
{}
if endpoint_url is None
else dict(client_kwargs=dict(endpoint_url=endpoint_url))
)
path_module = os.path if urllib.parse.urlparse(data_path).scheme else posixpath
path1 = path_module.join(data_path, "test1.csv")
path2 = path_module.join(data_path, "test2.csv")
paths = [path1, path2]
paths, fs = _resolve_paths_and_filesystem(paths, fs)
df1 = pd.DataFrame({"one": [1, 2, 3], "two": ["a", "b", "c"]})
df1.to_csv(path1, index=False, storage_options=storage_options)
df2 = pd.DataFrame({"one": [4, 5, 6], "two": ["e", "f", "g"]})
df2.to_csv(path2, index=False, storage_options=storage_options)
meta_provider = DefaultFileMetadataProvider()
with caplog.at_level(logging.WARNING):
file_paths, file_sizes = meta_provider.expand_paths(paths, fs)
assert "meta_provider=FastFileMetadataProvider()" in caplog.text
assert file_paths == paths
expected_file_sizes = _get_file_sizes_bytes(paths, fs)
assert file_sizes == expected_file_sizes
meta = meta_provider(
paths,
None,
rows_per_file=3,
file_sizes=file_sizes,
)
assert meta.size_bytes == sum(expected_file_sizes)
assert meta.num_rows == 6
assert len(paths) == 2
assert all(path in meta.input_files for path in paths)
assert meta.schema is None
@pytest.mark.parametrize(
"fs,data_path,endpoint_url",
[
(None, lazy_fixture("local_path"), None),
(lazy_fixture("local_fs"), lazy_fixture("local_path"), None),
(lazy_fixture("s3_fs"), lazy_fixture("s3_path"), lazy_fixture("s3_server")),
(
lazy_fixture("s3_fs_with_space"),
lazy_fixture("s3_path_with_space"),
lazy_fixture("s3_server"),
), # Path contains space.
(
lazy_fixture("s3_fs_with_special_chars"),
lazy_fixture("s3_path_with_special_chars"),
lazy_fixture("s3_server"),
),
],
)
def test_fast_file_metadata_provider(caplog, fs, data_path, endpoint_url):
storage_options = (
{}
if endpoint_url is None
else dict(client_kwargs=dict(endpoint_url=endpoint_url))
)
path_module = os.path if urllib.parse.urlparse(data_path).scheme else posixpath
path1 = path_module.join(data_path, "test1.csv")
path2 = path_module.join(data_path, "test2.csv")
paths = [path1, path2]
paths, fs = _resolve_paths_and_filesystem(paths, fs)
df1 = pd.DataFrame({"one": [1, 2, 3], "two": ["a", "b", "c"]})
df1.to_csv(path1, index=False, storage_options=storage_options)
df2 = pd.DataFrame({"one": [4, 5, 6], "two": ["e", "f", "g"]})
df2.to_csv(path2, index=False, storage_options=storage_options)
meta_provider = FastFileMetadataProvider()
with caplog.at_level(logging.WARNING):
file_paths, file_sizes = meta_provider.expand_paths(paths, fs)
assert "meta_provider=DefaultFileMetadataProvider()" in caplog.text
assert file_paths == paths
assert len(file_sizes) == len(file_paths)
meta = meta_provider(
paths,
None,
rows_per_file=3,
file_sizes=file_sizes,
)
assert meta.size_bytes is None
assert meta.num_rows == 6
assert len(paths) == 2
assert all(path in meta.input_files for path in paths)
assert meta.schema is None
if __name__ == "__main__":
import sys
sys.exit(pytest.main(["-v", __file__]))