[Dataset] read binary files (#16717)

* done

* done

* done

* .

* .

* lint

* .

Co-authored-by: Alex Wu <alex@anyscale.com>
Alex Wu, 2021-07-01 13:00:10 -07:00, committed by GitHub
parent d69e51f769
commit c27a02a807
4 changed files with 80 additions and 2 deletions

python/ray/experimental/data/impl/reader.py (new file)

@@ -0,0 +1,25 @@
+from typing import Any, Union, Optional, Tuple, TYPE_CHECKING
+
+if TYPE_CHECKING:
+    import pyarrow
+
+
+def read_file(path: str,
+              include_paths: bool = False,
+              filesystem: Optional["pyarrow.fs.FileSystem"] = None
+              ) -> Union[Any, Tuple[str, Any]]:
+    """A helper function which takes in the path to a single file, and the
+    params from `dataset.read_binary_files`.
+
+    Returns: The contents of the file. If `include_paths` is True, a tuple of
+        the path and the contents of the file.
+    """
+    if filesystem:
+        contents = filesystem.open_input_stream(path).readall()
+    else:
+        contents = open(path, "rb").read()
+
+    if include_paths:
+        return path, contents
+    else:
+        return contents
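For reference, a minimal usage sketch of this helper, separate from the diff. The path is hypothetical, and pyarrow is assumed to be installed; the import path for read_file matches the module added above.

    import pyarrow.fs

    from ray.experimental.data.impl.reader import read_file

    # Default: read through the builtin open(); returns the file's raw bytes.
    contents = read_file("/tmp/example.bin")  # hypothetical path

    # include_paths=True returns a (path, bytes) tuple instead of bare bytes.
    path, contents = read_file("/tmp/example.bin", include_paths=True)

    # An explicit pyarrow filesystem routes the read through
    # filesystem.open_input_stream(path) instead of open().
    fs, _ = pyarrow.fs.FileSystem.from_uri("/")
    contents = read_file("/tmp/example.bin", filesystem=fs)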

python/ray/experimental/data/read_api.py

@@ -11,8 +11,9 @@ if TYPE_CHECKING:
 import ray
 from ray.experimental.data.dataset import Dataset
-from ray.experimental.data.impl.block import ObjectRef, ListBlock, Block
 from ray.experimental.data.impl.arrow_block import ArrowBlock, ArrowRow
+from ray.experimental.data.impl.block import ObjectRef, ListBlock, Block
+from ray.experimental.data.impl import reader as _reader


 logger = logging.getLogger(__name__)
@@ -398,7 +399,12 @@ def read_binary_files(
     Returns:
         Dataset holding Arrow records read from the specified paths.
     """
-    raise NotImplementedError  # P0
+    dataset = from_items(paths, parallelism=parallelism)
+    return dataset.map(
+        lambda path: _reader.read_file(
+            path,
+            include_paths=include_paths,
+            filesystem=filesystem))


 def from_dask(df: "dask.DataFrame") -> Dataset[ArrowRow]:
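Taken together, a small end-to-end sketch of the new API as wired up here. The paths are hypothetical, and a local Ray instance with those files on disk is assumed; read_binary_files and to_local_iterator are the calls exercised by the tests below.

    import ray
    import ray.experimental.data

    ray.init()

    # Each input path becomes one item of the dataset; read_file() then maps
    # each item to its raw bytes (or to a (path, bytes) tuple when
    # include_paths=True).
    ds = ray.experimental.data.read_binary_files(
        ["/tmp/a.bin", "/tmp/b.bin"], include_paths=True, parallelism=2)
    for path, contents in ds.to_local_iterator():
        print(path, len(contents))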

python/ray/experimental/data/tests/test_dataset.py

@@ -11,6 +11,7 @@
 import ray
 from ray.util.dask import ray_dask_get
 from ray.tests.conftest import *  # noqa
+import ray.experimental.data.tests.util as util


 def test_basic(ray_start_regular_shared):
@@ -147,6 +148,35 @@ def test_pyarrow(ray_start_regular_shared):
         .take() == [{"b": 2}, {"b": 20}]


+def test_read_binary_files(ray_start_regular_shared):
+    with util.gen_bin_files(10) as (_, paths):
+        ds = ray.experimental.data.read_binary_files(paths, parallelism=10)
+        for i, item in enumerate(ds.to_local_iterator()):
+            expected = open(paths[i], "rb").read()
+            assert expected == item
+
+
+def test_read_binary_files_with_paths(ray_start_regular_shared):
+    with util.gen_bin_files(10) as (_, paths):
+        ds = ray.experimental.data.read_binary_files(
+            paths, include_paths=True, parallelism=10)
+        for i, (path, item) in enumerate(ds.to_local_iterator()):
+            assert path == paths[i]
+            expected = open(paths[i], "rb").read()
+            assert expected == item
+
+
+def test_read_binary_files_with_fs(ray_start_regular_shared):
+    with util.gen_bin_files(10) as (tempdir, paths):
+        # All the paths are absolute, so we want the root file system.
+        fs, _ = pa.fs.FileSystem.from_uri("/")
+        ds = ray.experimental.data.read_binary_files(
+            paths, filesystem=fs, parallelism=10)
+        for i, item in enumerate(ds.to_local_iterator()):
+            expected = open(paths[i], "rb").read()
+            assert expected == item
+
+
 def test_map_batch(ray_start_regular_shared, tmp_path):
     # Test input validation
     ds = ray.experimental.data.range(5)
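A note on test_read_binary_files_with_fs: FileSystem.from_uri("/") resolves an absolute local path to a LocalFileSystem, so the generated temp-file paths can be passed through unchanged. A quick standalone check, assuming pyarrow is installed:

    import pyarrow.fs

    # from_uri() also accepts an absolute local path; it returns the
    # filesystem plus the path component of the input.
    fs, root = pyarrow.fs.FileSystem.from_uri("/")
    print(type(fs).__name__)  # LocalFileSystem
    print(root)               # /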

python/ray/experimental/data/tests/util.py (new file)

@@ -0,0 +1,17 @@
+from contextlib import contextmanager
+
+import os
+import tempfile
+
+
+@contextmanager
+def gen_bin_files(n):
+    with tempfile.TemporaryDirectory() as temp_dir:
+        paths = []
+        for i in range(n):
+            path = os.path.join(temp_dir, f"{i}.bin")
+            paths.append(path)
+            # Close (and flush) each file before yielding so readers see it.
+            with open(path, "wb") as fp:
+                fp.write((str(i) * 500).encode())
+        yield (temp_dir, paths)
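As a quick check of what this fixture yields: file i holds the digit str(i) repeated 500 times, and all files live in one temporary directory that is removed on exit. A sketch, assuming the module is importable as in the tests above:

    from ray.experimental.data.tests.util import gen_bin_files

    with gen_bin_files(3) as (temp_dir, paths):
        for i, path in enumerate(paths):
            data = open(path, "rb").read()
            # Each file contains its index repeated 500 times, e.g. b"111...".
            assert data == (str(i) * 500).encode()
    # temp_dir and its files are deleted once the with-block exits.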