[Dataset] Fix reading parquet from gcs (#17528)


Co-authored-by: Alex Wu <alex@anyscale.com>
Alex Wu 2021-08-03 10:10:42 -07:00 committed by GitHub
parent 3a90804713
commit 8efa6be913
2 changed files with 49 additions and 6 deletions
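
With this change, an fsspec-compatible filesystem can be passed straight to read_parquet, which is what enables reading from GCS. A minimal usage sketch, assuming gcsfs is installed; the project and bucket names are placeholders, not from this commit:

    import gcsfs
    import ray

    # gcsfs.GCSFileSystem subclasses fsspec.spec.AbstractFileSystem, so the
    # resolution logic in this diff wraps it in PyFileSystem(FSSpecHandler(...)).
    fs = gcsfs.GCSFileSystem(project="my-project")  # placeholder project
    ds = ray.experimental.data.read_parquet(
        "gs://my-bucket/data.parquet",  # placeholder bucket/path
        filesystem=fs)
    print(ds.count())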


@@ -43,8 +43,7 @@ class FileBasedDatasource(Datasource[Union[ArrowRow, Any]]):
         read_file = self._read_file
 
-        if isinstance(filesystem, pa.fs.S3FileSystem):
-            filesystem = _S3FileSystemWrapper(filesystem)
+        filesystem = _wrap_s3_serialization_workaround(filesystem)
 
         def read_files(
                 read_paths: List[str],
@@ -158,7 +157,10 @@ def _resolve_paths_and_filesystem(
             filesystems inferred from the provided paths to ensure
             compatibility.
     """
-    from pyarrow.fs import FileType, _resolve_filesystem_and_path
+    from pyarrow.fs import FileSystem, FileType, \
+        PyFileSystem, FSSpecHandler, \
+        _resolve_filesystem_and_path
+    import fsspec
 
     if isinstance(paths, str):
         paths = [paths]
@@ -169,6 +171,14 @@ def _resolve_paths_and_filesystem(
     elif len(paths) == 0:
         raise ValueError("Must provide at least one path.")
 
+    if filesystem and not isinstance(filesystem, FileSystem):
+        if not isinstance(filesystem, fsspec.spec.AbstractFileSystem):
+            raise TypeError(f"The filesystem passed must either conform to "
+                            f"pyarrow.fs.FileSystem, or "
+                            f"fsspec.spec.AbstractFileSystem. The provided "
+                            f"filesystem was: {filesystem}")
+        filesystem = PyFileSystem(FSSpecHandler(filesystem))
+
     resolved_paths = []
     for path in paths:
         if filesystem is not None:
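
For reference, the wrapping performed by the new branch above can be reproduced standalone. A minimal sketch using fsspec's local filesystem; any AbstractFileSystem behaves the same way:

    import pyarrow.fs as pafs
    from fsspec.implementations.local import LocalFileSystem

    # Adapt an fsspec filesystem to the pyarrow FileSystem interface, as the
    # added branch does for user-provided fsspec filesystems.
    fs = pafs.PyFileSystem(pafs.FSSpecHandler(LocalFileSystem()))
    print(fs.get_file_info("/tmp").type)  # behaves like a native pyarrow fs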
@@ -179,8 +189,6 @@ def _resolve_paths_and_filesystem(
             resolved_filesystem, resolved_path = _resolve_filesystem_and_path(
                 path, filesystem)
             if filesystem is None:
                 filesystem = resolved_filesystem
-            elif type(resolved_filesystem) != type(filesystem):
-                raise ValueError("All paths must use same filesystem.")
             resolved_path = filesystem.normalize_path(resolved_path)
             resolved_paths.append(resolved_path)
@@ -208,6 +216,15 @@ def _unwrap_protocol(path):
     return parsed.netloc + parsed.path
 
 
+def _wrap_s3_serialization_workaround(filesystem: "pyarrow.fs.FileSystem"):
+    # This is needed because pa.fs.S3FileSystem assumes pa.fs is already
+    # imported before deserialization. See #17085.
+    import pyarrow as pa
+    if isinstance(filesystem, pa.fs.S3FileSystem):
+        return _S3FileSystemWrapper(filesystem)
+    return filesystem
+
+
 class _S3FileSystemWrapper:
     def __init__(self, fs: "pyarrow.fs.S3FileSystem"):
         self._fs = fs
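
The wrapper's job is to force pyarrow.fs to be imported on the deserializing worker before the wrapped S3FileSystem is restored. The hunk above only shows __init__; one way to implement the rest via __reduce__, as an illustrative sketch rather than the exact class body from this commit:

    class _S3FileSystemWrapperSketch:
        def __init__(self, fs):
            self._fs = fs

        @classmethod
        def _reconstruct(cls, fs_reconstruct, fs_args):
            # Runs on the deserializing side: importing pyarrow.fs here
            # initializes the S3 subsystem before the filesystem is rebuilt.
            import pyarrow.fs  # noqa: F401
            return cls(fs_reconstruct(*fs_args))

        def __reduce__(self):
            # Delegate to the wrapped filesystem's own __reduce__, routing
            # reconstruction through _reconstruct above.
            return type(self)._reconstruct, self._fs.__reduce__()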


@@ -11,6 +11,7 @@ import pandas as pd
 import pyarrow as pa
 import pyarrow.parquet as pq
 import pytest
+from fsspec.implementations.local import LocalFileSystem
 
 import ray
@@ -414,7 +415,7 @@ def test_get_blocks(ray_start_regular_shared):
     assert out == list(range(10)), out
 
 
-def test_pandas_roundtrip(ray_start_regular_shared):
+def test_pandas_roundtrip(ray_start_regular_shared, tmp_path):
     df1 = pd.DataFrame({"one": [1, 2, 3], "two": ["a", "b", "c"]})
     df2 = pd.DataFrame({"one": [4, 5, 6], "two": ["e", "f", "g"]})
     ds = ray.experimental.data.from_pandas([ray.put(df1), ray.put(df2)])
@@ -422,6 +423,31 @@ def test_pandas_roundtrip(ray_start_regular_shared):
     assert pd.concat([df1, df2]).equals(dfds)
 
 
+def test_fsspec_filesystem(ray_start_regular_shared, tmp_path):
+    """Same as `test_parquet_read` but using a custom, fsspec filesystem.
+
+    TODO (Alex): We should write a similar test with a mock PyArrow fs, but
+    unfortunately pa.fs._MockFileSystem isn't serializable, so this may require
+    some effort.
+    """
+    df1 = pd.DataFrame({"one": [1, 2, 3], "two": ["a", "b", "c"]})
+    table = pa.Table.from_pandas(df1)
+    path1 = os.path.join(str(tmp_path), "test1.parquet")
+    path2 = os.path.join(str(tmp_path), "test2.parquet")
+    pq.write_table(table, path1)
+    df2 = pd.DataFrame({"one": [4, 5, 6], "two": ["e", "f", "g"]})
+    table = pa.Table.from_pandas(df2)
+    pq.write_table(table, path2)
+
+    fs = LocalFileSystem()
+
+    ds = ray.experimental.data.read_parquet([path1, path2], filesystem=fs)
+
+    # Test metadata-only parquet ops.
+    assert len(ds._blocks._blocks) == 1
+    assert ds.count() == 6
+
+
 def test_parquet_read(ray_start_regular_shared, tmp_path):
     df1 = pd.DataFrame({"one": [1, 2, 3], "two": ["a", "b", "c"]})
     table = pa.Table.from_pandas(df1)