From 8efa6be9137550eb42847fa1aec050700cf771da Mon Sep 17 00:00:00 2001
From: Alex Wu
Date: Tue, 3 Aug 2021 10:10:42 -0700
Subject: [PATCH] [Dataset] Fix reading parquet from gcs (#17528)

* .

* .

* comments

Co-authored-by: Alex Wu
---
 .../data/datasource/file_based_datasource.py | 27 ++++++++++++++++++-----
 .../experimental/data/tests/test_dataset.py  | 28 +++++++++++++++++++++++-
 2 files changed, 49 insertions(+), 6 deletions(-)

diff --git a/python/ray/experimental/data/datasource/file_based_datasource.py b/python/ray/experimental/data/datasource/file_based_datasource.py
index ceb1ff89f..2a2f387c5 100644
--- a/python/ray/experimental/data/datasource/file_based_datasource.py
+++ b/python/ray/experimental/data/datasource/file_based_datasource.py
@@ -43,8 +43,7 @@ class FileBasedDatasource(Datasource[Union[ArrowRow, Any]]):
 
         read_file = self._read_file
 
-        if isinstance(filesystem, pa.fs.S3FileSystem):
-            filesystem = _S3FileSystemWrapper(filesystem)
+        filesystem = _wrap_s3_serialization_workaround(filesystem)
 
         def read_files(
                 read_paths: List[str],
@@ -158,7 +157,10 @@ def _resolve_paths_and_filesystem(
         filesystems inferred from the provided paths to ensure compatibility.
     """
-    from pyarrow.fs import FileType, _resolve_filesystem_and_path
+    from pyarrow.fs import FileSystem, FileType, \
+        PyFileSystem, FSSpecHandler, \
+        _resolve_filesystem_and_path
+    import fsspec
 
     if isinstance(paths, str):
         paths = [paths]
@@ -169,6 +171,14 @@ def _resolve_paths_and_filesystem(
     elif len(paths) == 0:
         raise ValueError("Must provide at least one path.")
 
+    if filesystem and not isinstance(filesystem, FileSystem):
+        if not isinstance(filesystem, fsspec.spec.AbstractFileSystem):
+            raise TypeError(f"The filesystem passed must either conform to "
+                            f"pyarrow.fs.FileSystem, or "
+                            f"fsspec.spec.AbstractFileSystem. The provided "
+                            f"filesystem was: {filesystem}")
+        filesystem = PyFileSystem(FSSpecHandler(filesystem))
+
     resolved_paths = []
     for path in paths:
         if filesystem is not None:
@@ -179,8 +189,6 @@ def _resolve_paths_and_filesystem(
                 path, filesystem)
             if filesystem is None:
                 filesystem = resolved_filesystem
-            elif type(resolved_filesystem) != type(filesystem):
-                raise ValueError("All paths must use same filesystem.")
         resolved_path = filesystem.normalize_path(resolved_path)
         resolved_paths.append(resolved_path)
 
@@ -208,6 +216,15 @@ def _unwrap_protocol(path):
     return parsed.netloc + parsed.path
 
 
+def _wrap_s3_serialization_workaround(filesystem: "pyarrow.fs.FileSystem"):
+    # This is needed because pa.fs.S3FileSystem assumes pa.fs is already
+    # imported before deserialization. See #17085.
+    import pyarrow as pa
+    if isinstance(filesystem, pa.fs.S3FileSystem):
+        return _S3FileSystemWrapper(filesystem)
+    return filesystem
+
+
 class _S3FileSystemWrapper:
     def __init__(self, fs: "pyarrow.fs.S3FileSystem"):
         self._fs = fs
diff --git a/python/ray/experimental/data/tests/test_dataset.py b/python/ray/experimental/data/tests/test_dataset.py
index ffab82e5d..eb604f2ba 100644
--- a/python/ray/experimental/data/tests/test_dataset.py
+++ b/python/ray/experimental/data/tests/test_dataset.py
@@ -11,6 +11,7 @@ import pandas as pd
 import pyarrow as pa
 import pyarrow.parquet as pq
 import pytest
+from fsspec.implementations.local import LocalFileSystem
 
 import ray
 
@@ -414,7 +415,7 @@ def test_get_blocks(ray_start_regular_shared):
     assert out == list(range(10)), out
 
 
-def test_pandas_roundtrip(ray_start_regular_shared):
+def test_pandas_roundtrip(ray_start_regular_shared, tmp_path):
     df1 = pd.DataFrame({"one": [1, 2, 3], "two": ["a", "b", "c"]})
     df2 = pd.DataFrame({"one": [4, 5, 6], "two": ["e", "f", "g"]})
     ds = ray.experimental.data.from_pandas([ray.put(df1), ray.put(df2)])
@@ -422,6 +423,31 @@ def test_pandas_roundtrip(ray_start_regular_shared):
     assert pd.concat([df1, df2]).equals(dfds)
 
 
+def test_fsspec_filesystem(ray_start_regular_shared, tmp_path):
+    """Same as `test_parquet_read` but using a custom fsspec filesystem.
+
+    TODO (Alex): We should write a similar test with a mock PyArrow fs, but
+    unfortunately pa.fs._MockFileSystem isn't serializable, so this may require
+    some effort.
+    """
+    df1 = pd.DataFrame({"one": [1, 2, 3], "two": ["a", "b", "c"]})
+    table = pa.Table.from_pandas(df1)
+    path1 = os.path.join(str(tmp_path), "test1.parquet")
+    path2 = os.path.join(str(tmp_path), "test2.parquet")
+    pq.write_table(table, path1)
+    df2 = pd.DataFrame({"one": [4, 5, 6], "two": ["e", "f", "g"]})
+    table = pa.Table.from_pandas(df2)
+    pq.write_table(table, path2)
+
+    fs = LocalFileSystem()
+
+    ds = ray.experimental.data.read_parquet([path1, path2], filesystem=fs)
+
+    # Test metadata-only parquet ops.
+    assert len(ds._blocks._blocks) == 1
+    assert ds.count() == 6
+
+
 def test_parquet_read(ray_start_regular_shared, tmp_path):
     df1 = pd.DataFrame({"one": [1, 2, 3], "two": ["a", "b", "c"]})
     table = pa.Table.from_pandas(df1)
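
For context, the end-to-end usage this patch enables looks roughly like the
sketch below. This is not part of the patch: the GCS project and bucket names
are hypothetical, and the `gcsfs` package is assumed to be installed. Any
fsspec.spec.AbstractFileSystem implementation should behave the same way,
since _resolve_paths_and_filesystem now wraps such filesystems in
pyarrow.fs.PyFileSystem(FSSpecHandler(...)) instead of failing:

import gcsfs

import ray

ray.init()

# gcsfs.GCSFileSystem is an fsspec AbstractFileSystem, not a
# pyarrow.fs.FileSystem; after this patch it is accepted and wrapped in
# pyarrow.fs.PyFileSystem(FSSpecHandler(fs)) during path resolution.
fs = gcsfs.GCSFileSystem(project="my-project")  # hypothetical project name

# Hypothetical bucket/prefix; before this patch, passing an fsspec
# filesystem here failed during path resolution.
ds = ray.experimental.data.read_parquet(
    "gs://my-bucket/parquet_dir", filesystem=fs)
print(ds.count())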