Revert "[Datasets] Explicitly define Dataset-like APIs in DatasetPipeline class (#26394)" (#26625)

matthewdeng 2022-07-15 21:10:59 -07:00 committed by GitHub
parent cf980c3020
commit 9256668b90
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 134 additions and 392 deletions


@@ -1,3 +1,4 @@
import inspect
import itertools
import logging
import time
@@ -5,13 +6,11 @@ from typing import (
TYPE_CHECKING,
Any,
Callable,
Dict,
Generic,
Iterable,
Iterator,
List,
Optional,
Tuple,
Union,
)
@@ -19,34 +18,53 @@ import ray
from ray.data._internal import progress_bar
from ray.data._internal.block_batching import BatchType, batch_blocks
from ray.data._internal.block_list import BlockList
from ray.data._internal.compute import ComputeStrategy
from ray.data._internal.pipeline_executor import (
PipelineExecutor,
PipelineSplitExecutorCoordinator,
)
from ray.data._internal.plan import ExecutionPlan
from ray.data._internal.stats import DatasetPipelineStats, DatasetStats
from ray.data.block import BatchUDF, Block, KeyFn, RowUDF
from ray.data.block import Block
from ray.data.context import DatasetContext
from ray.data.dataset import Dataset, T, U, TensorflowFeatureTypeSpec
from ray.data.datasource import Datasource
from ray.data.datasource.file_based_datasource import (
BlockWritePathProvider,
DefaultBlockWritePathProvider,
)
from ray.data.dataset import Dataset, T, U
from ray.data.row import TableRow
from ray.types import ObjectRef
from ray.util.annotations import DeveloperAPI, PublicAPI
if TYPE_CHECKING:
import pandas
import pyarrow
import tensorflow as tf
import torch
logger = logging.getLogger(__name__)
# Operations that can be naively applied per dataset row in the pipeline.
_PER_DATASET_OPS = [
"map",
"map_batches",
"add_column",
"drop_columns",
"flat_map",
"filter",
]
# Operations that apply to each dataset holistically in the pipeline.
_HOLISTIC_PER_DATASET_OPS = [
"repartition",
"random_shuffle",
"sort",
"randomize_block_order",
]
# Similar to above but we should force evaluation immediately.
_PER_DATASET_OUTPUT_OPS = [
"write_json",
"write_csv",
"write_parquet",
"write_datasource",
]
# Operations that operate over the stream of output batches from the pipeline.
_OUTPUT_ITER_OPS = ["take", "take_all", "show", "to_tf", "to_torch"]
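Taken together, the four lists above determine how each Dataset API surfaces on the pipeline: per-dataset ops keep their Dataset names, holistic ops are exposed under *_each_window names, output ops run eagerly per window, and output-iterator ops consume the whole stream. A usage sketch touching one operation from each category, assuming the Ray Datasets API of this era (ray.data.range, Dataset.window) and an illustrative /tmp/pipe_out output directory:

import ray

ray.init(ignore_reinit_error=True)

# One window per block over a 10-row range dataset.
pipe = ray.data.range(10).window(blocks_per_window=1)

# _PER_DATASET_OPS: applied row-wise to every window, under the Dataset names.
pipe = pipe.map(lambda x: x * 2).filter(lambda x: x % 4 == 0)

# _HOLISTIC_PER_DATASET_OPS: exposed as *_each_window.
pipe = pipe.repartition_each_window(1)

# _OUTPUT_ITER_OPS: consume the stream of output batches.
print(pipe.take(5))

# _PER_DATASET_OUTPUT_OPS: force evaluation and write each window as it is produced.
ray.data.range(10).window(blocks_per_window=5).write_parquet("/tmp/pipe_out")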
@PublicAPI
class DatasetPipeline(Generic[T]):
@@ -682,308 +700,6 @@ class DatasetPipeline(Generic[T]):
return EpochDelimitedIterator(self, max_epoch)
def map(
self,
fn: RowUDF,
*,
compute: Union[str, ComputeStrategy] = None,
**ray_remote_args,
) -> "DatasetPipeline[U]":
"""Apply :py:meth:`Dataset.map <ray.data.Dataset.map>` to each dataset/window
in this pipeline."""
return self.foreach_window(
lambda ds: ds.map(fn, compute=compute, **ray_remote_args)
)
def map_batches(
self,
fn: BatchUDF,
*,
batch_size: Optional[int] = 4096,
compute: Union[str, ComputeStrategy] = None,
batch_format: str = "native",
fn_args: Optional[Iterable[Any]] = None,
fn_kwargs: Optional[Dict[str, Any]] = None,
fn_constructor_args: Optional[Iterable[Any]] = None,
fn_constructor_kwargs: Optional[Dict[str, Any]] = None,
**ray_remote_args,
) -> "DatasetPipeline[U]":
"""Apply :py:meth:`Dataset.map_batches <ray.data.Dataset.map_batches>` to each
dataset/window in this pipeline."""
return self.foreach_window(
lambda ds: ds.map_batches(
fn,
batch_size=batch_size,
compute=compute,
batch_format=batch_format,
fn_args=fn_args,
fn_kwargs=fn_kwargs,
fn_constructor_args=fn_constructor_args,
fn_constructor_kwargs=fn_constructor_kwargs,
**ray_remote_args,
)
)
def flat_map(
self,
fn: RowUDF,
*,
compute: Union[str, ComputeStrategy] = None,
**ray_remote_args,
) -> "DatasetPipeline[U]":
"""Apply :py:meth:`Dataset.flat_map <ray.data.Dataset.flat_map>` to each
dataset/window in this pipeline."""
return self.foreach_window(
lambda ds: ds.flat_map(fn, compute=compute, **ray_remote_args)
)
def filter(
self,
fn: RowUDF,
*,
compute: Union[str, ComputeStrategy] = None,
**ray_remote_args,
) -> "DatasetPipeline[T]":
"""Apply :py:meth:`Dataset.filter <ray.data.Dataset.filter>` to each
dataset/window in this pipeline."""
return self.foreach_window(
lambda ds: ds.filter(fn, compute=compute, **ray_remote_args)
)
def add_column(
self,
col: str,
fn: Callable[["pandas.DataFrame"], "pandas.Series"],
*,
compute: Optional[str] = None,
**ray_remote_args,
) -> "DatasetPipeline[U]":
"""Apply :py:meth:`Dataset.add_column <ray.data.Dataset.add_column>` to each
dataset/window in this pipeline."""
return self.foreach_window(
lambda ds: ds.add_column(col, fn, compute=compute, **ray_remote_args)
)
def drop_columns(
self,
cols: List[str],
*,
compute: Optional[str] = None,
**ray_remote_args,
) -> "DatasetPipeline[U]":
"""Apply :py:meth:`Dataset.drop_columns <ray.data.Dataset.drop_columns>` to
each dataset/window in this pipeline."""
return self.foreach_window(
lambda ds: ds.drop_columns(cols, compute=compute, **ray_remote_args)
)
def repartition_each_window(
self, num_blocks: int, *, shuffle: bool = False
) -> "DatasetPipeline[U]":
"""Apply :py:meth:`Dataset.repartition <ray.data.Dataset.repartition>` to each
dataset/window in this pipeline."""
return self.foreach_window(
lambda ds: ds.repartition(num_blocks, shuffle=shuffle)
)
def random_shuffle_each_window(
self,
*,
seed: Optional[int] = None,
num_blocks: Optional[int] = None,
) -> "DatasetPipeline[U]":
"""Apply :py:meth:`Dataset.random_shuffle <ray.data.Dataset.random_shuffle>` to
each dataset/window in this pipeline."""
return self.foreach_window(
lambda ds: ds.random_shuffle(seed=seed, num_blocks=num_blocks)
)
def sort_each_window(
self, key: Optional[KeyFn] = None, descending: bool = False
) -> "DatasetPipeline[U]":
"""Apply :py:meth:`Dataset.sort <ray.data.Dataset.sort>` to each dataset/window
in this pipeline."""
return self.foreach_window(lambda ds: ds.sort(key, descending))
def randomize_block_order_each_window(
self, *, seed: Optional[int] = None
) -> "DatasetPipeline[U]":
"""Apply :py:meth:`Dataset.randomize_block_order
<ray.data.Dataset.randomize_block_order>` to each dataset/window in this
pipeline."""
return self.foreach_window(lambda ds: ds.randomize_block_order(seed=seed))
def write_json(
self,
path: str,
*,
filesystem: Optional["pyarrow.fs.FileSystem"] = None,
try_create_dir: bool = True,
arrow_open_stream_args: Optional[Dict[str, Any]] = None,
block_path_provider: BlockWritePathProvider = DefaultBlockWritePathProvider(),
pandas_json_args_fn: Callable[[], Dict[str, Any]] = lambda: {},
ray_remote_args: Dict[str, Any] = None,
**pandas_json_args,
) -> None:
"""Call :py:meth:`Dataset.write_json <ray.data.Dataset.write_json>` on each
output dataset of this pipeline."""
self._write_each_dataset(
lambda ds: ds.write_json(
path,
filesystem=filesystem,
try_create_dir=try_create_dir,
arrow_open_stream_args=arrow_open_stream_args,
block_path_provider=block_path_provider,
pandas_json_args_fn=pandas_json_args_fn,
ray_remote_args=ray_remote_args,
**pandas_json_args,
)
)
def write_csv(
self,
path: str,
*,
filesystem: Optional["pyarrow.fs.FileSystem"] = None,
try_create_dir: bool = True,
arrow_open_stream_args: Optional[Dict[str, Any]] = None,
block_path_provider: BlockWritePathProvider = DefaultBlockWritePathProvider(),
arrow_csv_args_fn: Callable[[], Dict[str, Any]] = lambda: {},
ray_remote_args: Dict[str, Any] = None,
**arrow_csv_args,
) -> None:
"""Call :py:meth:`Dataset.write_csv <ray.data.Dataset.write_csv>` on each
output dataset of this pipeline."""
self._write_each_dataset(
lambda ds: ds.write_csv(
path,
filesystem=filesystem,
try_create_dir=try_create_dir,
arrow_open_stream_args=arrow_open_stream_args,
block_path_provider=block_path_provider,
arrow_csv_args_fn=arrow_csv_args_fn,
ray_remote_args=ray_remote_args,
**arrow_csv_args,
)
)
def write_parquet(
self,
path: str,
*,
filesystem: Optional["pyarrow.fs.FileSystem"] = None,
try_create_dir: bool = True,
arrow_open_stream_args: Optional[Dict[str, Any]] = None,
block_path_provider: BlockWritePathProvider = DefaultBlockWritePathProvider(),
arrow_parquet_args_fn: Callable[[], Dict[str, Any]] = lambda: {},
ray_remote_args: Dict[str, Any] = None,
**arrow_parquet_args,
) -> None:
"""Call :py:meth:`Dataset.write_parquet <ray.data.Dataset.write_parquet>` on
each output dataset of this pipeline."""
self._write_each_dataset(
lambda ds: ds.write_parquet(
path,
filesystem=filesystem,
try_create_dir=try_create_dir,
arrow_open_stream_args=arrow_open_stream_args,
block_path_provider=block_path_provider,
arrow_parquet_args_fn=arrow_parquet_args_fn,
ray_remote_args=ray_remote_args,
**arrow_parquet_args,
)
)
def write_datasource(
self,
datasource: Datasource[T],
*,
ray_remote_args: Dict[str, Any] = None,
**write_args,
) -> None:
"""Call :py:meth:`Dataset.write_datasource <ray.data.Dataset.write_datasource>`
on each output dataset of this pipeline."""
self._write_each_dataset(
lambda ds: ds.write_datasource(
datasource,
ray_remote_args=ray_remote_args,
**write_args,
)
)
def take(self, limit: int = 20) -> List[T]:
"""Call :py:meth:`Dataset.take <ray.data.Dataset.take>` over the stream of
output batches from the pipeline"""
return Dataset.take(self, limit)
def take_all(self, limit: int = 100000) -> List[T]:
"""Call :py:meth:`Dataset.take_all <ray.data.Dataset.take_all>` over the stream
of output batches from the pipeline"""
return Dataset.take_all(self, limit)
def show(self, limit: int = 20) -> None:
"""Call :py:meth:`Dataset.show <ray.data.Dataset.show>` over the stream of
output batches from the pipeline"""
return Dataset.show(self, limit)
def to_tf(
self,
*,
output_signature: Union[
TensorflowFeatureTypeSpec, Tuple[TensorflowFeatureTypeSpec, "tf.TypeSpec"]
],
label_column: Optional[str] = None,
feature_columns: Optional[
Union[List[str], List[List[str]], Dict[str, List[str]]]
] = None,
prefetch_blocks: int = 0,
batch_size: int = 1,
drop_last: bool = False,
) -> "tf.data.Dataset":
"""Call :py:meth:`Dataset.to_tf <ray.data.Dataset.to_tf>` over the stream of
output batches from the pipeline"""
return Dataset.to_tf(
self,
output_signature=output_signature,
label_column=label_column,
feature_columns=feature_columns,
prefetch_blocks=prefetch_blocks,
batch_size=batch_size,
drop_last=drop_last,
)
def to_torch(
self,
*,
label_column: Optional[str] = None,
feature_columns: Optional[
Union[List[str], List[List[str]], Dict[str, List[str]]]
] = None,
label_column_dtype: Optional["torch.dtype"] = None,
feature_column_dtypes: Optional[
Union["torch.dtype", List["torch.dtype"], Dict[str, "torch.dtype"]]
] = None,
batch_size: int = 1,
prefetch_blocks: int = 0,
drop_last: bool = False,
unsqueeze_label_tensor: bool = True,
unsqueeze_feature_tensors: bool = True,
) -> "torch.utils.data.IterableDataset":
"""Call :py:meth:`Dataset.to_torch <ray.data.Dataset.to_torch>` over the stream
of output batches from the pipeline"""
return Dataset.to_torch(
self,
label_column=label_column,
feature_columns=feature_columns,
label_column_dtype=label_column_dtype,
feature_column_dtypes=feature_column_dtypes,
batch_size=batch_size,
prefetch_blocks=prefetch_blocks,
drop_last=drop_last,
unsqueeze_label_tensor=unsqueeze_label_tensor,
unsqueeze_feature_tensors=unsqueeze_feature_tensors,
)
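One detail worth noting in the explicit wrappers above: the output-iterator methods call the unbound Dataset function with the pipeline itself as self (for example Dataset.take(self, limit)), which works because DatasetPipeline provides the same row/batch iteration methods those Dataset implementations rely on. A toy illustration of that unbound-method reuse, with hypothetical Source/Stream classes rather than Ray's:

class Source:
    """Stands in for Dataset: take() only relies on iter_rows()."""

    def iter_rows(self):
        yield from range(100)

    def take(self, limit=20):
        out = []
        for row in self.iter_rows():
            out.append(row)
            if len(out) >= limit:
                break
        return out


class Stream:
    """Stands in for DatasetPipeline: a different class, same iteration protocol."""

    def iter_rows(self):
        for window in ([1, 2, 3], [4, 5, 6]):
            yield from window

    def take(self, limit=20):
        # Reuse Source.take unbound, passing a Stream as `self` -- the same
        # trick as `return Dataset.take(self, limit)` in the wrapper above.
        return Source.take(self, limit)


print(Stream().take(4))  # [1, 2, 3, 4]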
@DeveloperAPI
def iter_datasets(self) -> Iterator[Dataset[T]]:
"""Iterate over the output datasets of this pipeline.
@@ -1113,15 +829,101 @@ class DatasetPipeline(Generic[T]):
# self._base_datasets_can_be_cleared.
return len(self._stages) > 0 or self._base_datasets_can_be_cleared
def _write_each_dataset(self, write_fn: Callable[[Dataset[T]], None]) -> None:
"""Write output for each dataset.
This is a utility method used for write_json,
write_csv, write_parquet, write_datasource, etc.
"""
for method in _PER_DATASET_OPS:
def _make_impl(method):
delegate = getattr(Dataset, method)
def impl(self, *args, **kwargs) -> "DatasetPipeline[U]":
return self.foreach_window(lambda ds: getattr(ds, method)(*args, **kwargs))
impl.__name__ = delegate.__name__
impl.__doc__ = """
Apply ``Dataset.{method}`` to each dataset/window in this pipeline.
""".format(
method=method
)
setattr(
impl,
"__signature__",
inspect.signature(delegate).replace(return_annotation="DatasetPipeline[U]"),
)
return impl
setattr(DatasetPipeline, method, _make_impl(method))
for method in _HOLISTIC_PER_DATASET_OPS:
def _make_impl(method):
delegate = getattr(Dataset, method)
def impl(self, *args, **kwargs) -> "DatasetPipeline[U]":
return self.foreach_window(lambda ds: getattr(ds, method)(*args, **kwargs))
impl.__name__ = delegate.__name__
impl.__doc__ = """
Apply ``Dataset.{method}`` to each dataset/window in this pipeline.
""".format(
method=method
)
setattr(
impl,
"__signature__",
inspect.signature(delegate).replace(return_annotation="DatasetPipeline[U]"),
)
return impl
def _deprecation_warning(method: str):
def impl(*a, **kw):
raise DeprecationWarning(
"`{}` has been renamed to `{}_each_window`.".format(method, method)
)
return impl
setattr(DatasetPipeline, method, _deprecation_warning(method))
setattr(DatasetPipeline, method + "_each_window", _make_impl(method))
for method in _PER_DATASET_OUTPUT_OPS:
def _make_impl(method):
delegate = getattr(Dataset, method)
def impl(self, *args, **kwargs):
uuid = None
for i, ds in enumerate(self.iter_datasets()):
if uuid is None:
uuid = self._get_uuid() or ds._get_uuid()
ds._set_uuid(f"{uuid}_{i:06}")
write_fn(ds)
getattr(ds, method)(*args, **kwargs)
impl.__name__ = delegate.__name__
impl.__doc__ = """
Call ``Dataset.{method}`` on each output dataset of this pipeline.
""".format(
method=method
)
setattr(impl, "__signature__", inspect.signature(delegate))
return impl
setattr(DatasetPipeline, method, _make_impl(method))
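The uuid bookkeeping in the loop above is what produces the window-then-block indexed file names the tests in the second file expect (for example data_000000_000000.json): the pipeline's uuid gets a zero-padded window index appended before each window is written, and the default block write path provider then appends the block index. A quick sanity check of that naming as plain string formatting:

uuid = "data"
for window_index in range(2):
    window_uuid = f"{uuid}_{window_index:06}"   # mirrors ds._set_uuid(f"{uuid}_{i:06}") above
    for block_index in range(1):                # one block per window, as in the tests below
        print(f"{window_uuid}_{block_index:06}.json")
# data_000000_000000.json
# data_000001_000000.json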
for method in _OUTPUT_ITER_OPS:
def _make_impl(method):
delegate = getattr(Dataset, method)
def impl(self, *args, **kwargs):
return delegate(self, *args, **kwargs)
impl.__name__ = delegate.__name__
impl.__doc__ = """
Call ``Dataset.{method}`` over the stream of output batches from the pipeline.
""".format(
method=method
)
setattr(impl, "__signature__", inspect.signature(delegate))
return impl
setattr(DatasetPipeline, method, _make_impl(method))
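A behavioral consequence of the restored holistic-op loop is that the plain Dataset names become stubs that raise immediately and point at the *_each_window variant, per _deprecation_warning above. A short usage sketch, again assuming the Ray Datasets API of this era:

import ray

pipe = ray.data.range(12).window(blocks_per_window=3)

try:
    pipe.sort()  # plain holistic name is a stub after this revert
except DeprecationWarning as err:
    print(err)   # `sort` has been renamed to `sort_each_window`.

# The supported spelling forwards Dataset.sort to every window.
assert pipe.sort_each_window().take() == list(range(12))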


@@ -1,15 +1,12 @@
import os
import time
from typing import Tuple
import pytest
import pandas as pd
import numpy as np
import ray
from ray.data._internal.arrow_block import ArrowRow
from ray.data.context import DatasetContext
from ray.data.dataset import Dataset
from ray.data.dataset_pipeline import DatasetPipeline
from ray.tests.conftest import * # noqa
@@ -186,23 +183,11 @@ def test_basic_pipeline(ray_start_regular_shared):
pipe = ds.window(blocks_per_window=1)
assert str(pipe) == "DatasetPipeline(num_windows=10, num_stages=2)"
assert pipe.count() == 10
pipe = ds.window(blocks_per_window=1)
pipe.show()
pipe = ds.window(blocks_per_window=1).map(lambda x: x).map(lambda x: x)
assert str(pipe) == "DatasetPipeline(num_windows=10, num_stages=4)"
assert pipe.take() == list(range(10))
pipe = (
ds.window(blocks_per_window=1).map(lambda x: x).flat_map(lambda x: [x, x + 1])
)
assert str(pipe) == "DatasetPipeline(num_windows=10, num_stages=4)"
assert pipe.count() == 20
pipe = ds.window(blocks_per_window=1).filter(lambda x: x % 2 == 0)
assert str(pipe) == "DatasetPipeline(num_windows=10, num_stages=3)"
assert pipe.count() == 5
pipe = ds.window(blocks_per_window=999)
assert str(pipe) == "DatasetPipeline(num_windows=1, num_stages=2)"
assert pipe.count() == 10
@@ -212,8 +197,6 @@ def test_basic_pipeline(ray_start_regular_shared):
assert pipe.count() == 100
pipe = ds.repeat(10)
assert pipe.sum() == 450
pipe = ds.repeat(10)
assert len(pipe.take_all()) == 100
def test_window(ray_start_regular_shared):
@@ -411,40 +394,15 @@ def test_split_at_indices(ray_start_regular_shared):
)
def _prepare_dataset_to_write(tmp_dir: str) -> Tuple[Dataset[ArrowRow], pd.DataFrame]:
def test_parquet_write(ray_start_regular_shared, tmp_path):
df1 = pd.DataFrame({"one": [1, 2, 3], "two": ["a", "b", "c"]})
df2 = pd.DataFrame({"one": [4, 5, 6], "two": ["e", "f", "g"]})
df = pd.concat([df1, df2])
ds = ray.data.from_pandas([df1, df2])
ds = ds.window(blocks_per_window=1)
os.mkdir(tmp_dir)
ds._set_uuid("data")
return (ds, df)
def test_json_write(ray_start_regular_shared, tmp_path):
path = os.path.join(tmp_path, "test_json_dir")
ds, df = _prepare_dataset_to_write(path)
ds.write_json(path)
path1 = os.path.join(path, "data_000000_000000.json")
path2 = os.path.join(path, "data_000001_000000.json")
dfds = pd.concat([pd.read_json(path1, lines=True), pd.read_json(path2, lines=True)])
assert df.equals(dfds)
def test_csv_write(ray_start_regular_shared, tmp_path):
path = os.path.join(tmp_path, "test_csv_dir")
ds, df = _prepare_dataset_to_write(path)
ds.write_csv(path)
path1 = os.path.join(path, "data_000000_000000.csv")
path2 = os.path.join(path, "data_000001_000000.csv")
dfds = pd.concat([pd.read_csv(path1), pd.read_csv(path2)])
assert df.equals(dfds)
def test_parquet_write(ray_start_regular_shared, tmp_path):
path = os.path.join(tmp_path, "test_parquet_dir")
ds, df = _prepare_dataset_to_write(path)
os.mkdir(path)
ds._set_uuid("data")
ds.write_parquet(path)
path1 = os.path.join(path, "data_000000_000000.parquet")
path2 = os.path.join(path, "data_000001_000000.parquet")
@@ -491,23 +449,6 @@ def test_count_sum_on_infinite_pipeline(ray_start_regular_shared):
assert 9 == pipe.sum()
def test_sort_each_window(ray_start_regular_shared):
pipe = ray.data.range(12).window(blocks_per_window=3).sort_each_window()
assert pipe.take() == list(range(12))
pipe = (
ray.data.range(12).window(blocks_per_window=3).sort_each_window(descending=True)
)
assert pipe.take() == [2, 1, 0, 5, 4, 3, 8, 7, 6, 11, 10, 9]
pipe = (
ray.data.range(12)
.window(blocks_per_window=3)
.sort_each_window(key=lambda x: -x, descending=True)
)
assert pipe.take() == list(range(12))
def test_randomize_block_order_each_window(ray_start_regular_shared):
pipe = ray.data.range(12).repartition(6).window(blocks_per_window=3)
pipe = pipe.randomize_block_order_each_window(seed=0)
@@ -570,12 +511,11 @@ def test_preserve_whether_base_datasets_can_be_cleared(ray_start_regular_shared)
assert not p2._base_datasets_can_be_cleared
def test_add_and_drop_columns(ray_start_regular_shared):
def test_drop_columns(ray_start_regular_shared):
df = pd.DataFrame({"col1": [1, 2, 3], "col2": [2, 3, 4], "col3": [3, 4, 5]})
ds = ray.data.from_pandas(df)
pipe = ds.repeat()
pipe = pipe.add_column("col4", lambda _: 1)
assert pipe.drop_columns(["col2"]).take(1) == [{"col1": 1, "col3": 3, "col4": 1}]
assert pipe.drop_columns(["col2"]).take(1) == [{"col1": 1, "col3": 3}]
if __name__ == "__main__":