From fbfb134b8c1196511f6da5b73ba97de04efe7242 Mon Sep 17 00:00:00 2001 From: Kai Fricke Date: Fri, 20 May 2022 15:37:40 +0100 Subject: [PATCH] Revert "[Datasets] [Tensor Story - 1/2] Automatically provide tensor views to UDFs and infer tensor blocks for pure-tensor datasets. (#24812)" (#25017) This reverts commit 841f7c81ff3c44d23a7eaea6376b8a266467a0af. Reverts #24812 Broke e.g. ML tests: https://buildkite.com/ray-project/ray-builders-branch/builds/7667#55e7473e-f6a8-4d72-a875-cd68acf8b0c4 --- doc/source/data/dataset-tensor-support.rst | 55 ++---- python/ray/data/block.py | 26 +-- python/ray/data/dataset.py | 40 ++--- python/ray/data/datasource/datasource.py | 21 ++- .../ray/data/datasource/numpy_datasource.py | 7 +- python/ray/data/impl/arrow_block.py | 163 ++++-------------- python/ray/data/impl/block_batching.py | 18 +- .../ray/data/impl/delegating_block_builder.py | 18 +- python/ray/data/impl/output_buffer.py | 7 +- python/ray/data/impl/pandas_block.py | 58 ++----- python/ray/data/impl/simple_block.py | 8 +- python/ray/data/impl/table_block.py | 47 +---- python/ray/data/random_access_dataset.py | 6 +- python/ray/data/read_api.py | 17 +- python/ray/data/tests/test_dataset.py | 135 +-------------- python/ray/data/tests/test_dataset_formats.py | 42 ++--- 16 files changed, 165 insertions(+), 503 deletions(-) diff --git a/doc/source/data/dataset-tensor-support.rst b/doc/source/data/dataset-tensor-support.rst index cedad2bdf..70e4f4872 100644 --- a/doc/source/data/dataset-tensor-support.rst +++ b/doc/source/data/dataset-tensor-support.rst @@ -15,57 +15,22 @@ Automatic conversion between the Pandas and Arrow extension types/arrays keeps t Single-column tensor datasets ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -The most basic case is when a dataset only has a single column, which is of tensor -type. This kind of dataset can be: - -* created with :func:`range_tensor() ` - or :func:`from_numpy() `, -* transformed with NumPy UDFs via - :meth:`ds.map_batches() `, -* consumed with :meth:`ds.iter_rows() ` and - :meth:`ds.iter_batches() `, and -* can be read from and written to ``.npy`` files. - -Here is an end-to-end example: +The most basic case is when a dataset only has a single column, which is of tensor type. This kind of dataset can be created with ``.range_tensor()``, and can be read from and written to ``.npy`` files. Here are some examples: .. code-block:: python - # Create a synthetic pure-tensor Dataset. - ds = ray.data.range_tensor(10, shape=(3, 5)) - # -> Dataset(num_blocks=10, num_rows=10, - # schema={__value__: }) + # Create a Dataset of tensor-typed values. + ds = ray.data.range_tensor(10000, shape=(3, 5)) + # -> Dataset(num_blocks=200, num_rows=10000, + # schema={value: }) - # Create a pure-tensor Dataset from an existing NumPy ndarray. - arr = np.arange(10 * 3 * 5).reshape((10, 3, 5)) - ds = ray.data.from_numpy(arr) - # -> Dataset(num_blocks=1, num_rows=10, - # schema={__value__: }) + # Save to storage. + ds.write_numpy("/tmp/tensor_out", column="value") - # Transform the tensors. Datasets will automatically unpack the single-column Arrow - # table into a NumPy ndarray, provide that ndarray to your UDF, and then repack it - # into a single-column Arrow table; this will be a zero-copy conversion in both - # cases. - ds = ds.map_batches(lambda arr: arr / arr.max()) - # -> Dataset(num_blocks=1, num_rows=10, - # schema={__value__: }) - - # Consume the tensor. This will yield the underlying (3, 5) ndarrays. 
- for arr in ds.iter_rows(): - assert isinstance(arr, np.ndarray) - assert arr.shape == (3, 5) - - # Consume the tensor in batches. - for arr in ds.iter_batches(batch_size=2): - assert isinstance(arr, np.ndarray) - assert arr.shape == (2, 3, 5) - - # Save to storage. This will write out the blocks of the tensor column as NPY files. - ds.write_numpy("/tmp/tensor_out") - - # Read back from storage. + # Read from storage. ray.data.read_numpy("/tmp/tensor_out") - # -> Dataset(num_blocks=1, num_rows=?, - # schema={__value__: }) + # -> Dataset(num_blocks=200, num_rows=?, + # schema={value: }) Reading existing serialized tensor columns ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ diff --git a/python/ray/data/block.py b/python/ray/data/block.py index b396d17c2..225baf4cf 100644 --- a/python/ray/data/block.py +++ b/python/ray/data/block.py @@ -3,7 +3,6 @@ import time from typing import ( TypeVar, List, - Dict, Generic, Iterator, Tuple, @@ -83,10 +82,6 @@ def _validate_key_fn(ds: "Dataset", key: KeyFn) -> None: # ``SimpleBlockAccessor`` and ``ArrowBlockAccessor``. Block = Union[List[T], "pyarrow.Table", "pandas.DataFrame", bytes] -# User-facing data batch type. This is the data type for data that is supplied to and -# returned from batch UDFs. -DataBatch = Union[Block, np.ndarray] - # A list of block references pending computation by a single task. For example, # this may be the output of a task reading a file. BlockPartition = List[Tuple[ObjectRef[Block], "BlockMetadata"]] @@ -215,13 +210,11 @@ class BlockAccessor(Generic[T]): """Convert this block into a Pandas dataframe.""" raise NotImplementedError - def to_numpy( - self, columns: Optional[Union[str, List[str]]] = None - ) -> Union[np.ndarray, Dict[str, np.ndarray]]: - """Convert this block (or columns of block) into a NumPy ndarray. + def to_numpy(self, column: str = None) -> np.ndarray: + """Convert this block (or column of block) into a NumPy ndarray. Args: - columns: Name of columns to convert, or None if converting all columns. + column: Name of column to convert, or None. 
""" raise NotImplementedError @@ -233,10 +226,6 @@ class BlockAccessor(Generic[T]): """Return the base block that this accessor wraps.""" raise NotImplementedError - def to_native(self) -> Block: - """Return the native data format for this accessor.""" - return self.to_block() - def size_bytes(self) -> int: """Return the approximate size in bytes of this block.""" raise NotImplementedError @@ -266,15 +255,6 @@ class BlockAccessor(Generic[T]): """Create a builder for this block type.""" raise NotImplementedError - @staticmethod - def batch_to_block(batch: DataBatch) -> Block: - """Create a block from user-facing data formats.""" - if isinstance(batch, np.ndarray): - from ray.data.impl.arrow_block import ArrowBlockAccessor - - return ArrowBlockAccessor.numpy_to_block(batch) - return batch - @staticmethod def for_block(block: Block) -> "BlockAccessor[T]": """Create a block accessor for the given block.""" diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py index 5657b7186..77b25ea8c 100644 --- a/python/ray/data/dataset.py +++ b/python/ray/data/dataset.py @@ -68,7 +68,6 @@ from ray.data.datasource.file_based_datasource import ( from ray.data.row import TableRow from ray.data.aggregate import AggregateFn, Sum, Max, Min, Mean, Std from ray.data.random_access_dataset import RandomAccessDataset -from ray.data.impl.table_block import VALUE_COL_NAME from ray.data.impl.remote_fn import cached_remote_fn from ray.data.impl.block_batching import batch_blocks, BatchType from ray.data.impl.plan import ExecutionPlan, OneToOneStage, AllToAllStage @@ -235,8 +234,8 @@ class Dataset(Generic[T]): def transform(block: Block) -> Iterable[Block]: DatasetContext._set_current(context) - output_buffer = BlockOutputBuffer(None, context.target_max_block_size) block = BlockAccessor.for_block(block) + output_buffer = BlockOutputBuffer(None, context.target_max_block_size) for row in block.iter_rows(): output_buffer.add(fn(row)) if output_buffer.has_next(): @@ -261,9 +260,6 @@ class Dataset(Generic[T]): ) -> "Dataset[Any]": """Apply the given function to batches of records of this dataset. - The format of the data batch provided to ``fn`` can be controlled via the - ``batch_format`` argument, and the output of the UDF can be any batch type. - This is a blocking operation. Examples: @@ -310,9 +306,10 @@ class Dataset(Generic[T]): blocks as batches. Defaults to a system-chosen batch size. compute: The compute strategy, either "tasks" (default) to use Ray tasks, or ActorPoolStrategy(min, max) to use an autoscaling actor pool. - batch_format: Specify "native" to use the native block format (promotes - tables to Pandas and tensors to NumPy), "pandas" to select - ``pandas.DataFrame``, or "pyarrow" to select `pyarrow.Table``. + batch_format: Specify "native" to use the native block format + (promotes Arrow to pandas), "pandas" to select + ``pandas.DataFrame`` as the batch format, + or "pyarrow" to select ``pyarrow.Table``. ray_remote_args: Additional resource requirements to request from ray (e.g., num_gpus=1 to request GPUs for the map tasks). """ @@ -341,7 +338,9 @@ class Dataset(Generic[T]): # bug where we include the entire base view on serialization. view = block.slice(start, end, copy=batch_size is not None) if batch_format == "native": - view = BlockAccessor.for_block(view).to_native() + # Always promote Arrow blocks to pandas for consistency. 
+ if isinstance(view, pa.Table) or isinstance(view, bytes): + view = BlockAccessor.for_block(view).to_pandas() elif batch_format == "pandas": view = BlockAccessor.for_block(view).to_pandas() elif batch_format == "pyarrow": @@ -356,7 +355,6 @@ class Dataset(Generic[T]): if not ( isinstance(applied, list) or isinstance(applied, pa.Table) - or isinstance(applied, np.ndarray) or isinstance(applied, pd.core.frame.DataFrame) ): raise ValueError( @@ -366,7 +364,7 @@ class Dataset(Generic[T]): "The return type must be either list, " "pandas.DataFrame, or pyarrow.Table" ) - output_buffer.add_batch(applied) + output_buffer.add_block(applied) if output_buffer.has_next(): yield output_buffer.next() @@ -703,8 +701,6 @@ class Dataset(Generic[T]): ) if isinstance(batch, pd.DataFrame): return batch.sample(frac=fraction) - if isinstance(batch, np.ndarray): - return np.array([row for row in batch if random.random() <= fraction]) raise ValueError(f"Unsupported batch type: {type(batch)}") return self.map_batches(process_batch) @@ -2075,7 +2071,7 @@ class Dataset(Generic[T]): self, path: str, *, - column: str = VALUE_COL_NAME, + column: str = "value", filesystem: Optional["pyarrow.fs.FileSystem"] = None, try_create_dir: bool = True, arrow_open_stream_args: Optional[Dict[str, Any]] = None, @@ -2103,8 +2099,7 @@ class Dataset(Generic[T]): path: The path to the destination root directory, where npy files will be written to. column: The name of the table column that contains the tensor to - be written. The default is ``"__value__"``, the column name that - Datasets uses for storing tensors in single-column tables. + be written. This defaults to "value". filesystem: The filesystem implementation to write to. try_create_dir: Try to create all directories in destination path if True. Does nothing if all directories already exist. @@ -2251,10 +2246,10 @@ class Dataset(Generic[T]): current block during the scan. batch_size: Record batch size, or None to let the system pick. batch_format: The format in which to return each batch. - Specify "native" to use the native block format (promoting - tables to Pandas and tensors to NumPy), "pandas" to select - ``pandas.DataFrame``, or "pyarrow" to select ``pyarrow.Table``. Default - is "native". + Specify "native" to use the current block format (promoting + Arrow to pandas automatically), "pandas" to + select ``pandas.DataFrame`` or "pyarrow" to select + ``pyarrow.Table``. Default is "native". drop_last: Whether to drop the last batch if it's incomplete. Returns: @@ -2776,9 +2771,8 @@ List[str]]]): The names of the columns to use as the features. Can be a list of Time complexity: O(dataset size / parallelism) Args: - column: The name of the column to convert to numpy, or None to specify the - entire row. If not specified for Arrow or Pandas blocks, each returned - future will represent a dict of column ndarrays. + column: The name of the column to convert to numpy, or None to + specify the entire row. Required for Arrow tables. Returns: A list of remote NumPy ndarrays created from this dataset. 
diff --git a/python/ray/data/datasource/datasource.py b/python/ray/data/datasource/datasource.py index 5bb3ffab6..15b3471d3 100644 --- a/python/ray/data/datasource/datasource.py +++ b/python/ray/data/datasource/datasource.py @@ -192,11 +192,14 @@ class RangeDatasource(Datasource[Union[ArrowRow, int]]): elif block_format == "tensor": import pyarrow as pa - tensor = np.ones(tensor_shape, dtype=np.int64) * np.expand_dims( - np.arange(start, start + count), - tuple(range(1, 1 + len(tensor_shape))), + tensor = TensorArray( + np.ones(tensor_shape, dtype=np.int64) + * np.expand_dims( + np.arange(start, start + count), + tuple(range(1, 1 + len(tensor_shape))), + ) ) - return BlockAccessor.batch_to_block(tensor) + return pa.Table.from_pydict({"value": tensor}) else: return list(builtins.range(start, start + count)) @@ -210,12 +213,16 @@ class RangeDatasource(Datasource[Union[ArrowRow, int]]): schema = pa.Table.from_pydict({"value": [0]}).schema elif block_format == "tensor": _check_pyarrow_version() + from ray.data.extensions import TensorArray import pyarrow as pa - tensor = np.ones(tensor_shape, dtype=np.int64) * np.expand_dims( - np.arange(0, 10), tuple(range(1, 1 + len(tensor_shape))) + tensor = TensorArray( + np.ones(tensor_shape, dtype=np.int64) + * np.expand_dims( + np.arange(0, 10), tuple(range(1, 1 + len(tensor_shape))) + ) ) - schema = BlockAccessor.batch_to_block(tensor).schema + schema = pa.Table.from_pydict({"value": tensor}).schema elif block_format == "list": schema = int else: diff --git a/python/ray/data/datasource/numpy_datasource.py b/python/ray/data/datasource/numpy_datasource.py index 7b65d01e1..b4883246d 100644 --- a/python/ray/data/datasource/numpy_datasource.py +++ b/python/ray/data/datasource/numpy_datasource.py @@ -24,13 +24,18 @@ class NumpyDatasource(FileBasedDatasource): """ def _read_file(self, f: "pyarrow.NativeFile", path: str, **reader_args): + from ray.data.extensions import TensorArray + import pyarrow as pa + # TODO(ekl) Ideally numpy can read directly from the file, but it # seems like it requires the file to be seekable. 
buf = BytesIO() data = f.readall() buf.write(data) buf.seek(0) - return BlockAccessor.batch_to_block(np.load(buf, allow_pickle=True)) + return pa.Table.from_pydict( + {"value": TensorArray(np.load(buf, allow_pickle=True))} + ) def _write_block( self, diff --git a/python/ray/data/impl/arrow_block.py b/python/ray/data/impl/arrow_block.py index 726927235..76b986bcc 100644 --- a/python/ray/data/impl/arrow_block.py +++ b/python/ray/data/impl/arrow_block.py @@ -6,7 +6,6 @@ from typing import ( Dict, List, Tuple, - Union, Iterator, Any, TypeVar, @@ -31,11 +30,7 @@ from ray.data.block import ( KeyType, ) from ray.data.row import TableRow -from ray.data.impl.table_block import ( - TableBlockAccessor, - TableBlockBuilder, - VALUE_COL_NAME, -) +from ray.data.impl.table_block import TableBlockAccessor, TableBlockBuilder from ray.data.aggregate import AggregateFn if TYPE_CHECKING: @@ -78,13 +73,6 @@ class ArrowBlockBuilder(TableBlockBuilder[T]): super().__init__(pyarrow.Table) def _table_from_pydict(self, columns: Dict[str, List[Any]]) -> Block: - for col_name, col in columns.items(): - if col_name == VALUE_COL_NAME or isinstance( - next(iter(col), None), np.ndarray - ): - from ray.data.extensions.tensor_extension import ArrowTensorArray - - columns[col_name] = ArrowTensorArray.from_numpy(col) return pyarrow.Table.from_pydict(columns) def _concat_tables(self, tables: List[Block]) -> Block: @@ -96,35 +84,19 @@ class ArrowBlockBuilder(TableBlockBuilder[T]): class ArrowBlockAccessor(TableBlockAccessor): - ROW_TYPE = ArrowRow - def __init__(self, table: "pyarrow.Table"): if pyarrow is None: raise ImportError("Run `pip install pyarrow` for Arrow support") super().__init__(table) - def column_names(self) -> List[str]: - return self._table.column_names + def _create_table_row(self, row: "pyarrow.Table") -> ArrowRow: + return ArrowRow(row) @classmethod - def from_bytes(cls, data: bytes) -> "ArrowBlockAccessor": + def from_bytes(cls, data: bytes): reader = pyarrow.ipc.open_stream(data) return cls(reader.read_all()) - @staticmethod - def numpy_to_block(batch: np.ndarray) -> "pyarrow.Table": - import pyarrow as pa - from ray.data.extensions.tensor_extension import ArrowTensorArray - - return pa.Table.from_pydict( - {VALUE_COL_NAME: ArrowTensorArray.from_numpy(batch)} - ) - - @staticmethod - def _build_tensor_row(row: ArrowRow) -> np.ndarray: - # Getting an item in a tensor column automatically does a NumPy conversion. 
- return row[VALUE_COL_NAME][0] - def slice(self, start: int, end: int, copy: bool) -> "pyarrow.Table": view = self._table.slice(start, end - start) if copy: @@ -133,7 +105,7 @@ class ArrowBlockAccessor(TableBlockAccessor): def random_shuffle(self, random_seed: Optional[int]) -> "pyarrow.Table": random = np.random.RandomState(random_seed) - return self.take(random.permutation(self.num_rows())) + return self._table.take(random.permutation(self.num_rows())) def schema(self) -> "pyarrow.lib.Schema": return self._table.schema @@ -141,34 +113,26 @@ class ArrowBlockAccessor(TableBlockAccessor): def to_pandas(self) -> "pandas.DataFrame": return self._table.to_pandas() - def to_numpy( - self, columns: Optional[Union[str, List[str]]] = None - ) -> Union[np.ndarray, Dict[str, np.ndarray]]: - if columns is None: - columns = self._table.column_names - if not isinstance(columns, list): - columns = [columns] - for column in columns: - if column not in self._table.column_names: - raise ValueError( - f"Cannot find column {column}, available columns: " - f"{self._table.column_names}" - ) - arrays = [] - for column in columns: - array = self._table[column] - if array.num_chunks == 0: - array = pyarrow.array([], type=array.type) - elif _is_column_extension_type(array): - array = _concatenate_extension_column(array) - else: - array = array.combine_chunks() - arrays.append(array.to_numpy(zero_copy_only=False)) - if len(arrays) == 1: - arrays = arrays[0] + def to_numpy(self, column: str = None) -> np.ndarray: + if column is None: + raise ValueError( + "`column` must be specified when calling .to_numpy() " + "on Arrow blocks." + ) + if column not in self._table.column_names: + raise ValueError( + f"Cannot find column {column}, available columns: " + f"{self._table.column_names}" + ) + array = self._table[column] + if array.num_chunks > 1: + # TODO(ekl) combine fails since we can't concat + # ArrowTensorType? + array = array.combine_chunks() else: - arrays = dict(zip(columns, arrays)) - return arrays + assert array.num_chunks == 1, array + array = array.chunk(0) + return array.to_numpy(zero_copy_only=False) def to_arrow(self) -> "pyarrow.Table": return self._table @@ -205,45 +169,9 @@ class ArrowBlockAccessor(TableBlockAccessor): def _empty_table() -> "pyarrow.Table": return ArrowBlockBuilder._empty_table() - @staticmethod - def take_table( - table: "pyarrow.Table", - indices: Union[List[int], "pyarrow.Array", "pyarrow.ChunkedArray"], - ) -> "pyarrow.Table": - """Select rows from the table. - - This method is an alternative to pyarrow.Table.take(), which breaks for - extension arrays. This is exposed as a static method for easier use on - intermediate tables, not underlying an ArrowBlockAccessor. - """ - if any(_is_column_extension_type(col) for col in table.columns): - new_cols = [] - for col in table.columns: - if _is_column_extension_type(col): - # .take() will concatenate internally, which currently breaks for - # extension arrays. - col = _concatenate_extension_column(col) - new_cols.append(col.take(indices)) - table = pyarrow.Table.from_arrays(new_cols, schema=table.schema) - else: - table = table.take(indices) - return table - - def take( - self, - indices: Union[List[int], "pyarrow.Array", "pyarrow.ChunkedArray"], - ) -> "pyarrow.Table": - """Select rows from the underlying table. - - This method is an alternative to pyarrow.Table.take(), which breaks for - extension arrays. 
- """ - return self.take_table(self._table, indices) - def _sample(self, n_samples: int, key: "SortKeyT") -> "pyarrow.Table": indices = random.sample(range(self._table.num_rows), n_samples) - table = self._table.select([k[0] for k in key]) - return self.take_table(table, indices) + return self._table.select([k[0] for k in key]).take(indices) def count(self, on: KeyFn) -> Optional[U]: """Count the number of non-null values in the provided column.""" @@ -340,7 +268,7 @@ class ArrowBlockAccessor(TableBlockAccessor): import pyarrow.compute as pac indices = pac.sort_indices(self._table, sort_keys=key) - table = self.take(indices) + table = self._table.take(indices) if len(boundaries) == 0: return [table] @@ -465,7 +393,7 @@ class ArrowBlockAccessor(TableBlockAccessor): else: ret = pyarrow.concat_tables(blocks, promote=True) indices = pyarrow.compute.sort_indices(ret, sort_keys=key) - ret = ArrowBlockAccessor.take_table(ret, indices) + ret = ret.take(indices) return ret, ArrowBlockAccessor(ret).get_metadata(None, exec_stats=stats.build()) @staticmethod @@ -561,33 +489,6 @@ class ArrowBlockAccessor(TableBlockAccessor): return ret, ArrowBlockAccessor(ret).get_metadata(None, exec_stats=stats.build()) -def _is_column_extension_type(ca: "pyarrow.ChunkedArray") -> bool: - """Whether the provided Arrow Table column is an extension array, using an Arrow - extension type. - """ - return isinstance(ca.type, pyarrow.ExtensionType) - - -def _concatenate_extension_column(ca: "pyarrow.ChunkedArray") -> "pyarrow.Array": - """Concatenate chunks of an extension column into a contiguous array. - - This concatenation is required for creating copies and for .take() to work on - extension arrays. - See https://issues.apache.org/jira/browse/ARROW-16503. - """ - if not _is_column_extension_type(ca): - raise ValueError("Chunked array isn't an extension array: {ca}") - - if ca.num_chunks == 0: - # No-op for no-chunk chunked arrays, since there's nothing to concatenate. - return ca - - chunk = ca.chunk(0) - return type(chunk).from_storage( - chunk.type, pyarrow.concat_arrays([c.storage for c in ca.chunks]) - ) - - def _copy_table(table: "pyarrow.Table") -> "pyarrow.Table": """Copy the provided Arrow table.""" import pyarrow as pa @@ -597,10 +498,14 @@ def _copy_table(table: "pyarrow.Table") -> "pyarrow.Table": cols = table.columns new_cols = [] for col in cols: - if _is_column_extension_type(col): - # Extension arrays don't support concatenation. - arr = _concatenate_extension_column(col) + if col.num_chunks > 0 and isinstance(col.chunk(0), pa.ExtensionArray): + # If an extension array, we copy the underlying storage arrays. + chunk = col.chunk(0) + arr = type(chunk).from_storage( + chunk.type, pa.concat_arrays([c.storage for c in col.chunks]) + ) else: + # Otherwise, we copy the top-level chunk arrays. arr = col.combine_chunks() new_cols.append(arr) return pa.Table.from_arrays(new_cols, schema=table.schema) diff --git a/python/ray/data/impl/block_batching.py b/python/ray/data/impl/block_batching.py index 4cd3660c9..702e7927e 100644 --- a/python/ray/data/impl/block_batching.py +++ b/python/ray/data/impl/block_batching.py @@ -93,20 +93,26 @@ def batch_blocks( def _format_batch(batch: Block, batch_format: str) -> BatchType: + import pyarrow as pa + if batch_format == "native": - batch = BlockAccessor.for_block(batch).to_native() + # Always promote Arrow blocks to pandas for consistency, since + # we lazily convert pandas->Arrow internally for efficiency. 
+ if isinstance(batch, pa.Table) or isinstance(batch, bytes): + batch = BlockAccessor.for_block(batch) + batch = batch.to_pandas() + return batch elif batch_format == "pandas": - batch = BlockAccessor.for_block(batch).to_pandas() + batch = BlockAccessor.for_block(batch) + return batch.to_pandas() elif batch_format == "pyarrow": - batch = BlockAccessor.for_block(batch).to_arrow() - elif batch_format == "numpy": - batch = BlockAccessor.for_block(batch).to_numpy() + batch = BlockAccessor.for_block(batch) + return batch.to_arrow() else: raise ValueError( f"The given batch format: {batch_format} " f"is invalid. Supported batch type: {BatchType}" ) - return batch def _sliding_window(iterable: Iterable, n: int): diff --git a/python/ray/data/impl/delegating_block_builder.py b/python/ray/data/impl/delegating_block_builder.py index d88fac07f..b66598b60 100644 --- a/python/ray/data/impl/delegating_block_builder.py +++ b/python/ray/data/impl/delegating_block_builder.py @@ -1,8 +1,6 @@ from typing import Any -import numpy as np - -from ray.data.block import Block, DataBatch, T, BlockAccessor +from ray.data.block import Block, T, BlockAccessor from ray.data.impl.block_builder import BlockBuilder from ray.data.impl.simple_block import SimpleBlockBuilder from ray.data.impl.arrow_block import ArrowRow, ArrowBlockBuilder @@ -15,6 +13,7 @@ class DelegatingBlockBuilder(BlockBuilder[T]): self._empty_block = None def add(self, item: Any) -> None: + if self._builder is None: # TODO (kfstorm): Maybe we can use Pandas block format for dict. if isinstance(item, dict) or isinstance(item, ArrowRow): @@ -27,24 +26,13 @@ class DelegatingBlockBuilder(BlockBuilder[T]): self._builder = ArrowBlockBuilder() except (TypeError, pyarrow.lib.ArrowInvalid): self._builder = SimpleBlockBuilder() - elif isinstance(item, np.ndarray): - self._builder = ArrowBlockBuilder() elif isinstance(item, PandasRow): self._builder = PandasBlockBuilder() else: self._builder = SimpleBlockBuilder() self._builder.add(item) - def add_batch(self, batch: DataBatch): - """Add a user-facing data batch to the builder. - - This data batch will be converted to an internal block and then added to the - underlying builder. - """ - block = BlockAccessor.batch_to_block(batch) - return self.add_block(block) - - def add_block(self, block: Block): + def add_block(self, block: Block) -> None: accessor = BlockAccessor.for_block(block) if accessor.num_rows() == 0: # Don't infer types of empty lists. 
Store the block and use it if no diff --git a/python/ray/data/impl/output_buffer.py b/python/ray/data/impl/output_buffer.py index b5a07cbb6..bc7ea61cb 100644 --- a/python/ray/data/impl/output_buffer.py +++ b/python/ray/data/impl/output_buffer.py @@ -1,6 +1,6 @@ from typing import Callable, Any, Optional -from ray.data.block import Block, DataBatch, BlockAccessor +from ray.data.block import Block, BlockAccessor from ray.data.impl.delegating_block_builder import DelegatingBlockBuilder @@ -44,11 +44,6 @@ class BlockOutputBuffer(object): assert not self._finalized self._buffer.add(item) - def add_batch(self, batch: DataBatch) -> None: - """Add a data batch to this output buffer.""" - assert not self._finalized - self._buffer.add_batch(batch) - def add_block(self, block: Block) -> None: """Add a data block to this output buffer.""" assert not self._finalized diff --git a/python/ray/data/impl/pandas_block.py b/python/ray/data/impl/pandas_block.py index 70302d9f0..07bbdc1eb 100644 --- a/python/ray/data/impl/pandas_block.py +++ b/python/ray/data/impl/pandas_block.py @@ -3,7 +3,6 @@ from typing import ( Dict, List, Tuple, - Union, Iterator, Any, TypeVar, @@ -16,11 +15,7 @@ import numpy as np from ray.data.block import BlockAccessor, BlockMetadata, KeyFn, U from ray.data.row import TableRow -from ray.data.impl.table_block import ( - TableBlockAccessor, - TableBlockBuilder, - VALUE_COL_NAME, -) +from ray.data.impl.table_block import TableBlockAccessor, TableBlockBuilder from ray.data.impl.arrow_block import ArrowBlockAccessor from ray.data.aggregate import AggregateFn @@ -76,13 +71,6 @@ class PandasBlockBuilder(TableBlockBuilder[T]): def _table_from_pydict(self, columns: Dict[str, List[Any]]) -> "pandas.DataFrame": pandas = lazy_import_pandas() - for key, value in columns.items(): - if key == VALUE_COL_NAME or isinstance(next(iter(value), None), np.ndarray): - from ray.data.extensions.tensor_extension import TensorArray - - if len(value) == 1: - value = value[0] - columns[key] = TensorArray(value) return pandas.DataFrame(columns) def _concat_tables(self, tables: List["pandas.DataFrame"]) -> "pandas.DataFrame": @@ -101,19 +89,11 @@ PandasBlockSchema = collections.namedtuple("PandasBlockSchema", ["names", "types class PandasBlockAccessor(TableBlockAccessor): - ROW_TYPE = PandasRow - def __init__(self, table: "pandas.DataFrame"): super().__init__(table) - def column_names(self) -> List[str]: - return self._table.columns.tolist() - - @staticmethod - def _build_tensor_row(row: PandasRow) -> np.ndarray: - # Getting an item in a Pandas tensor column returns a TensorArrayElement, which - # we have to convert to an ndarray. 
- return row[VALUE_COL_NAME].iloc[0].to_numpy() + def _create_table_row(self, row: "pandas.DataFrame") -> PandasRow: + return PandasRow(row) def slice(self, start: int, end: int, copy: bool) -> "pandas.DataFrame": view = self._table[start:end] @@ -142,27 +122,19 @@ class PandasBlockAccessor(TableBlockAccessor): def to_pandas(self) -> "pandas.DataFrame": return self._table - def to_numpy( - self, columns: Optional[Union[str, List[str]]] = None - ) -> Union[np.ndarray, Dict[str, np.ndarray]]: - if columns is None: - columns = self._table.columns.tolist() - if not isinstance(columns, list): - columns = [columns] - for column in columns: - if column not in self._table.columns: - raise ValueError( - f"Cannot find column {column}, available columns: " - f"{self._table.columns.tolist()}" + def to_numpy(self, column: str = None) -> np.ndarray: + if not column: + raise ValueError( + "`column` must be specified when calling .to_numpy() " + "on Pandas blocks." + ) + if column not in self._table.columns: + raise ValueError( + "Cannot find column {}, available columns: {}".format( + column, self._table.columns.tolist() ) - arrays = [] - for column in columns: - arrays.append(self._table[column].to_numpy()) - if len(arrays) == 1: - arrays = arrays[0] - else: - arrays = dict(zip(columns, arrays)) - return arrays + ) + return self._table[column].to_numpy() def to_arrow(self) -> "pyarrow.Table": import pyarrow diff --git a/python/ray/data/impl/simple_block.py b/python/ray/data/impl/simple_block.py index 05acbf0fc..d952583a3 100644 --- a/python/ray/data/impl/simple_block.py +++ b/python/ray/data/impl/simple_block.py @@ -1,7 +1,7 @@ import random import sys import heapq -from typing import Union, Callable, Iterator, List, Tuple, Any, Optional, TYPE_CHECKING +from typing import Callable, Iterator, List, Tuple, Any, Optional, TYPE_CHECKING import numpy as np @@ -84,9 +84,9 @@ class SimpleBlockAccessor(BlockAccessor): return pandas.DataFrame({"value": self._items}) - def to_numpy(self, columns: Optional[Union[str, List[str]]] = None) -> np.ndarray: - if columns: - raise ValueError("`columns` arg is not supported for list block.") + def to_numpy(self, column: str = None) -> np.ndarray: + if column: + raise ValueError("`column` arg not supported for list block") return np.array(self._items) def to_arrow(self) -> "pyarrow.Table": diff --git a/python/ray/data/impl/table_block.py b/python/ray/data/impl/table_block.py index 8b9284390..ad760de85 100644 --- a/python/ray/data/impl/table_block.py +++ b/python/ray/data/impl/table_block.py @@ -1,7 +1,6 @@ import collections -from typing import Dict, Iterator, List, Union, Any, TypeVar, TYPE_CHECKING -import numpy as np +from typing import Dict, Iterator, List, Union, Any, TypeVar, TYPE_CHECKING from ray.data.block import Block, BlockAccessor from ray.data.row import TableRow @@ -11,11 +10,6 @@ from ray.data.impl.size_estimator import SizeEstimator if TYPE_CHECKING: from ray.data.impl.sort import SortKeyT - -# The internal column name used for pure-tensor datasets, represented as -# single-tensor-column tables. 
-VALUE_COL_NAME = "__value__" - T = TypeVar("T") # The max size of Python tuples to buffer before compacting them into a @@ -36,11 +30,9 @@ class TableBlockBuilder(BlockBuilder[T]): self._num_compactions = 0 self._block_type = block_type - def add(self, item: Union[dict, TableRow, np.ndarray]) -> None: + def add(self, item: Union[dict, TableRow]) -> None: if isinstance(item, TableRow): item = item.as_pydict() - elif isinstance(item, np.ndarray): - item = {VALUE_COL_NAME: item} if not isinstance(item, dict): raise ValueError( "Returned elements of an TableBlock must be of type `dict`, " @@ -108,42 +100,16 @@ class TableBlockBuilder(BlockBuilder[T]): class TableBlockAccessor(BlockAccessor): - ROW_TYPE: TableRow = TableRow - def __init__(self, table: Any): self._table = table - def _get_row(self, index: int, copy: bool = False) -> Union[TableRow, np.ndarray]: - row = self.slice(index, index + 1, copy=copy) - if self.is_tensor_wrapper(): - row = self._build_tensor_row(row) - else: - row = self.ROW_TYPE(row) - return row - - @staticmethod - def _build_tensor_row(row: TableRow) -> np.ndarray: - raise NotImplementedError - - def to_native(self) -> Block: - if self.is_tensor_wrapper(): - native = self.to_numpy() - else: - # Always promote Arrow blocks to pandas for consistency, since - # we lazily convert pandas->Arrow internally for efficiency. - native = self.to_pandas() - return native - - def column_names(self) -> List[str]: + def _create_table_row(self, row: Any) -> TableRow: raise NotImplementedError def to_block(self) -> Block: return self._table - def is_tensor_wrapper(self) -> bool: - return self.column_names() == [VALUE_COL_NAME] - - def iter_rows(self) -> Iterator[Union[TableRow, np.ndarray]]: + def iter_rows(self) -> Iterator[TableRow]: outer = self class Iter: @@ -156,7 +122,10 @@ class TableBlockAccessor(BlockAccessor): def __next__(self): self._cur += 1 if self._cur < outer.num_rows(): - return outer._get_row(self._cur) + row = outer._create_table_row( + outer.slice(self._cur, self._cur + 1, copy=False) + ) + return row raise StopIteration return Iter() diff --git a/python/ray/data/random_access_dataset.py b/python/ray/data/random_access_dataset.py index 54ae33993..018f69485 100644 --- a/python/ray/data/random_access_dataset.py +++ b/python/ray/data/random_access_dataset.py @@ -223,7 +223,9 @@ class _RandomAccessWorker: col = block[self.key_field] indices = np.searchsorted(col, keys) acc = BlockAccessor.for_block(block) - result = [acc._get_row(i, copy=True) for i in indices] + result = [ + acc._create_table_row(acc.slice(i, i + 1, copy=True)) for i in indices + ] # assert result == [self._get(i, k) for i, k in zip(block_indices, keys)] else: result = [self._get(i, k) for i, k in zip(block_indices, keys)] @@ -252,7 +254,7 @@ class _RandomAccessWorker: if i is None: return None acc = BlockAccessor.for_block(block) - return acc._get_row(i, copy=True) + return acc._create_table_row(acc.slice(i, i + 1, copy=True)) def _binary_search_find(column, x): diff --git a/python/ray/data/read_api.py b/python/ray/data/read_api.py index a30664dac..a2a67c493 100644 --- a/python/ray/data/read_api.py +++ b/python/ray/data/read_api.py @@ -173,10 +173,10 @@ def range_tensor( >>> import ray >>> ds = ray.data.range_tensor(1000, shape=(3, 10)) # doctest: +SKIP >>> ds.map_batches( # doctest: +SKIP - ... lambda arr: arr * 2).show() + ... lambda arr: arr * 2, batch_format="pandas").show() This is similar to range_table(), but uses the ArrowTensorArray extension - type. 
The dataset elements take the form {VALUE_COL_NAME: array(N, shape=shape)}. + type. The dataset elements take the form {"value": array(N, shape=shape)}. Args: n: The upper bound of the range of integer records. @@ -1019,11 +1019,16 @@ def _df_to_block(df: "pandas.DataFrame") -> Block[ArrowRow]: def _ndarray_to_block(ndarray: np.ndarray) -> Block[np.ndarray]: stats = BlockExecStats.builder() - block = BlockAccessor.batch_to_block(ndarray) - metadata = BlockAccessor.for_block(block).get_metadata( - input_files=None, exec_stats=stats.build() + import pyarrow as pa + from ray.data.extensions import TensorArray + + table = pa.Table.from_pydict({"value": TensorArray(ndarray)}) + return ( + table, + BlockAccessor.for_block(table).get_metadata( + input_files=None, exec_stats=stats.build() + ), ) - return block, metadata def _get_metadata(table: Union["pyarrow.Table", "pandas.DataFrame"]) -> BlockMetadata: diff --git a/python/ray/data/tests/test_dataset.py b/python/ray/data/tests/test_dataset.py index ffee72f24..78f5928e4 100644 --- a/python/ray/data/tests/test_dataset.py +++ b/python/ray/data/tests/test_dataset.py @@ -443,138 +443,21 @@ def test_range_table(ray_start_regular_shared): assert ds.take() == [{"value": i} for i in range(10)] -def test_tensors_basic(ray_start_regular_shared): +def test_tensors(ray_start_regular_shared): # Create directly. - tensor_shape = (3, 5) - ds = ray.data.range_tensor(6, shape=tensor_shape) + ds = ray.data.range_tensor(5, shape=(3, 5)) assert str(ds) == ( - "Dataset(num_blocks=6, num_rows=6, " - "schema={__value__: })" + "Dataset(num_blocks=5, num_rows=5, " + "schema={value: })" ) - # Test row iterator yields tensors. - for tensor in ds.iter_rows(): - assert isinstance(tensor, np.ndarray) - assert tensor.shape == tensor_shape - - # Test batch iterator yields tensors. - for tensor in ds.iter_batches(batch_size=2): - assert isinstance(tensor, np.ndarray) - assert tensor.shape == (2,) + tensor_shape - - # Native format. - def np_mapper(arr): - assert isinstance(arr, np.ndarray) - return arr + 1 - - res = ray.data.range_tensor(2, shape=(2, 2)).map(np_mapper).take() - np.testing.assert_equal(res, [np.ones((2, 2)), 2 * np.ones((2, 2))]) - # Pandas conversion. - def pd_mapper(df): - assert isinstance(df, pd.DataFrame) - return df + 2 - - res = ray.data.range_tensor(2).map_batches(pd_mapper, batch_format="pandas").take() - np.testing.assert_equal(res, [np.array([2]), np.array([3])]) - - -def test_tensors_shuffle(ray_start_regular_shared): - # Test Arrow table representation. - tensor_shape = (3, 5) - ds = ray.data.range_tensor(6, shape=tensor_shape) - shuffled_ds = ds.random_shuffle() - shuffled = shuffled_ds.take() - base = ds.take() - np.testing.assert_raises( - AssertionError, - np.testing.assert_equal, - shuffled, - base, - ) - np.testing.assert_equal( - sorted(shuffled, key=lambda arr: arr.min()), - sorted(base, key=lambda arr: arr.min()), - ) - - # Test Pandas table representation. - tensor_shape = (3, 5) - ds = ray.data.range_tensor(6, shape=tensor_shape) - ds = ds.map_batches(lambda df: df, batch_format="pandas") - shuffled_ds = ds.random_shuffle() - shuffled = shuffled_ds.take() - base = ds.take() - np.testing.assert_raises( - AssertionError, - np.testing.assert_equal, - shuffled, - base, - ) - np.testing.assert_equal( - sorted(shuffled, key=lambda arr: arr.min()), - sorted(base, key=lambda arr: arr.min()), - ) - - -def test_tensors_sort(ray_start_regular_shared): - # Test Arrow table representation. 
- t = pa.table({"a": TensorArray(np.arange(32).reshape((2, 4, 4))), "b": [1, 2]}) - ds = ray.data.from_arrow(t) - sorted_ds = ds.sort(key="b", descending=True) - sorted_arrs = [row["a"] for row in sorted_ds.take()] - base = [row["a"] for row in ds.take()] - np.testing.assert_raises( - AssertionError, - np.testing.assert_equal, - sorted_arrs, - base, - ) - np.testing.assert_equal( - sorted_arrs, - sorted(base, key=lambda arr: -arr.min()), - ) - - # Test Pandas table representation. - df = pd.DataFrame({"a": TensorArray(np.arange(32).reshape((2, 4, 4))), "b": [1, 2]}) - ds = ray.data.from_pandas(df) - sorted_ds = ds.sort(key="b", descending=True) - sorted_arrs = [np.asarray(row["a"]) for row in sorted_ds.take()] - base = [np.asarray(row["a"]) for row in ds.take()] - np.testing.assert_raises( - AssertionError, - np.testing.assert_equal, - sorted_arrs, - base, - ) - np.testing.assert_equal( - sorted_arrs, - sorted(base, key=lambda arr: -arr.min()), - ) - - -def test_tensors_inferred_from_map(ray_start_regular_shared): - # Test map. - ds = ray.data.range(10).map(lambda _: np.ones((4, 4))) - assert str(ds) == ( - "Dataset(num_blocks=10, num_rows=10, " - "schema={__value__: })" - ) - - # Test map_batches. - ds = ray.data.range(16, parallelism=4).map_batches( - lambda _: np.ones((3, 4, 4)), batch_size=2 - ) - assert str(ds) == ( - "Dataset(num_blocks=4, num_rows=24, " - "schema={__value__: })" - ) - - # Test flat_map. - ds = ray.data.range(10).flat_map(lambda _: [np.ones((4, 4)), np.ones((4, 4))]) - assert str(ds) == ( - "Dataset(num_blocks=10, num_rows=20, " - "schema={__value__: })" + res = ( + ray.data.range_tensor(10) + .map_batches(lambda t: t + 2, batch_format="pandas") + .take(2) ) + assert str(res) == "[{'value': array([2])}, {'value': array([3])}]" def test_tensor_array_ops(ray_start_regular_shared): diff --git a/python/ray/data/tests/test_dataset_formats.py b/python/ray/data/tests/test_dataset_formats.py index fe43a36ce..9d4097605 100644 --- a/python/ray/data/tests/test_dataset_formats.py +++ b/python/ray/data/tests/test_dataset_formats.py @@ -121,7 +121,7 @@ def test_from_numpy(ray_start_regular_shared, from_ref): ds = ray.data.from_numpy_refs([ray.put(arr) for arr in arrs]) else: ds = ray.data.from_numpy(arrs) - values = np.stack(ds.take(8)) + values = np.stack([x["value"] for x in ds.take(8)]) np.testing.assert_array_equal(values, np.concatenate((arr1, arr2))) # Test from single NumPy ndarray. @@ -129,7 +129,7 @@ def test_from_numpy(ray_start_regular_shared, from_ref): ds = ray.data.from_numpy_refs(ray.put(arr1)) else: ds = ray.data.from_numpy(arr1) - values = np.stack(ds.take(4)) + values = np.stack([x["value"] for x in ds.take(4)]) np.testing.assert_array_equal(values, arr1) @@ -197,28 +197,14 @@ def test_to_numpy_refs(ray_start_regular_shared): # Tensor Dataset ds = ray.data.range_tensor(10, parallelism=2) - arr = np.concatenate(ray.get(ds.to_numpy_refs())) + arr = np.concatenate(ray.get(ds.to_numpy_refs(column="value"))) np.testing.assert_equal(arr, np.expand_dims(np.arange(0, 10), 1)) # Table Dataset ds = ray.data.range_table(10) - arr = np.concatenate(ray.get(ds.to_numpy_refs())) + arr = np.concatenate(ray.get(ds.to_numpy_refs(column="value"))) np.testing.assert_equal(arr, np.arange(0, 10)) - # Test multi-column Arrow dataset. - ds = ray.data.from_arrow(pa.table({"a": [1, 2, 3], "b": [4, 5, 6]})) - arrs = ray.get(ds.to_numpy_refs()) - np.testing.assert_equal( - arrs, [{"a": np.array([1, 2, 3]), "b": np.array([4, 5, 6])}] - ) - - # Test multi-column Pandas dataset. 
- ds = ray.data.from_pandas(pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]})) - arrs = ray.get(ds.to_numpy_refs()) - np.testing.assert_equal( - arrs, [{"a": np.array([1, 2, 3]), "b": np.array([4, 5, 6])}] - ) - def test_to_arrow_refs(ray_start_regular_shared): n = 5 @@ -1012,9 +998,9 @@ def test_numpy_roundtrip(ray_start_regular_shared, fs, data_path): ds = ray.data.read_numpy(data_path, filesystem=fs) assert str(ds) == ( "Dataset(num_blocks=2, num_rows=None, " - "schema={__value__: })" + "schema={value: })" ) - np.testing.assert_equal(ds.take(2), [np.array([0]), np.array([1])]) + assert str(ds.take(2)) == "[{'value': array([0])}, {'value': array([1])}]" def test_numpy_read(ray_start_regular_shared, tmp_path): @@ -1024,9 +1010,9 @@ def test_numpy_read(ray_start_regular_shared, tmp_path): ds = ray.data.read_numpy(path) assert str(ds) == ( "Dataset(num_blocks=1, num_rows=10, " - "schema={__value__: })" + "schema={value: })" ) - np.testing.assert_equal(ds.take(2), [np.array([0]), np.array([1])]) + assert str(ds.take(2)) == "[{'value': array([0])}, {'value': array([1])}]" def test_numpy_read_meta_provider(ray_start_regular_shared, tmp_path): @@ -1037,9 +1023,9 @@ def test_numpy_read_meta_provider(ray_start_regular_shared, tmp_path): ds = ray.data.read_numpy(path, meta_provider=FastFileMetadataProvider()) assert str(ds) == ( "Dataset(num_blocks=1, num_rows=10, " - "schema={__value__: })" + "schema={value: })" ) - np.testing.assert_equal(ds.take(2), [np.array([0]), np.array([1])]) + assert str(ds.take(2)) == "[{'value': array([0])}, {'value': array([1])}]" with pytest.raises(NotImplementedError): ray.data.read_binary_files( @@ -1091,10 +1077,10 @@ def test_numpy_read_partitioned_with_filter( ds = ray.data.read_numpy(base_dir, partition_filter=partition_path_filter) vals = [[1, 0], [1, 1], [1, 2], [3, 3], [3, 4], [3, 5]] - val_str = "".join(f"array({v}, dtype=int8), " for v in vals)[:-2] + val_str = "".join([f"{{'value': array({v}, dtype=int8)}}, " for v in vals])[:-2] assert_base_partitioned_ds( ds, - schema="{__value__: }", + schema="{value: }", sorted_values=f"[[{val_str}]]", ds_take_transform_fn=lambda taken: [taken], sorted_values_transform_fn=lambda sorted_values: str(sorted_values), @@ -1133,7 +1119,7 @@ def test_numpy_write(ray_start_regular_shared, fs, data_path, endpoint_url): assert len(arr2) == 5 assert arr1.sum() == 10 assert arr2.sum() == 35 - np.testing.assert_equal(ds.take(1), [np.array([0])]) + assert str(ds.take(1)) == "[{'value': array([0])}]" @pytest.mark.parametrize( @@ -1172,7 +1158,7 @@ def test_numpy_write_block_path_provider( assert len(arr2) == 5 assert arr1.sum() == 10 assert arr2.sum() == 35 - np.testing.assert_equal(ds.take(1), [np.array([0])]) + assert str(ds.take(1)) == "[{'value': array([0])}]" def test_read_text(ray_start_regular_shared, tmp_path):