From 2038cc96c6833d296ffbeb8a4473bbe73391776c Mon Sep 17 00:00:00 2001
From: Kai Yang
Date: Tue, 1 Feb 2022 04:09:51 +0800
Subject: [PATCH] Revert "Revert "[Dataset] [DataFrame 2/n] Add pandas block format implementation (partial) (#20988)" (#21661)" (#21894)

This PR adds pandas block format support by implementing `PandasRow`,
`PandasBlockBuilder`, and `PandasBlockAccessor`.

Note that `sort_and_partition`, `combine`, `merge_sorted_blocks`, and
`aggregate_combined_blocks` in `PandasBlockAccessor` redirect to the Arrow
block format implementation for now. They'll be implemented in a later PR.
---
 .../datasets_train/datasets_train.py          |  11 +-
 python/ray/data/block.py                      |   7 +-
 python/ray/data/context.py                    |   7 +
 python/ray/data/dataset.py                    |  58 +++---
 python/ray/data/impl/arrow_block.py           |   2 +-
 .../ray/data/impl/delegating_block_builder.py |   4 +
 python/ray/data/impl/pandas_block.py          | 195 ++++++++++++++++++
 python/ray/data/impl/simple_block.py          |   2 +-
 python/ray/data/read_api.py                   |  34 ++-
 python/ray/data/tests/test_dataset.py         | 118 +++++++----
 10 files changed, 363 insertions(+), 75 deletions(-)
 create mode 100644 python/ray/data/impl/pandas_block.py

diff --git a/doc/source/ray-core/_examples/datasets_train/datasets_train.py b/doc/source/ray-core/_examples/datasets_train/datasets_train.py
index f668f82f0..712b07114 100644
--- a/doc/source/ray-core/_examples/datasets_train/datasets_train.py
+++ b/doc/source/ray-core/_examples/datasets_train/datasets_train.py
@@ -271,6 +271,7 @@ def inference(
         model_cls,
         compute="actors",
         batch_size=batch_size,
+        batch_format="pandas",
         num_gpus=num_gpus,
         num_cpus=0,
     ).write_parquet(result_path)
@@ -584,8 +585,8 @@ if __name__ == "__main__":
     )

     num_columns = len(train_dataset.schema().names)
-    # remove label column and internal Arrow column.
-    num_features = num_columns - 2
+    # remove label column.
+    num_features = num_columns - 1

     NUM_EPOCHS = 2
     BATCH_SIZE = 512
@@ -688,8 +689,10 @@ if __name__ == "__main__":
            self.model = load_model_func().to(self.device)

        def __call__(self, batch) -> "pd.DataFrame":
-            tensor = torch.FloatTensor(batch.to_pandas().values).to(self.device)
-            return pd.DataFrame(self.model(tensor).cpu().detach().numpy())
+            tensor = torch.FloatTensor(batch.values).to(self.device)
+            return pd.DataFrame(
+                self.model(tensor).cpu().detach().numpy(), columns=["value"]
+            )

     inference_dataset = preprocessor.preprocess_inference_data(
         read_dataset(inference_path)
diff --git a/python/ray/data/block.py b/python/ray/data/block.py
index fb9968b77..7cf54d421 100644
--- a/python/ray/data/block.py
+++ b/python/ray/data/block.py
@@ -79,7 +79,7 @@ def _validate_key_fn(ds: "Dataset", key: KeyFn) -> None:
 #
 # Block data can be accessed in a uniform way via ``BlockAccessors`` such as
 # ``SimpleBlockAccessor`` and ``ArrowBlockAccessor``.
-Block = Union[List[T], "pyarrow.Table", bytes]
+Block = Union[List[T], "pyarrow.Table", "pandas.DataFrame", bytes]

 # A list of block references pending computation by a single task. For example,
 # this may be the output of a task reading a file.
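A minimal sketch of how the widened `Block` union behaves once this change is in place (not part of the diff; assumes pandas and pyarrow are installed): `BlockAccessor.for_block` picks the accessor from the concrete block type, with `pandas.DataFrame` now mapping to the new `PandasBlockAccessor`.

    import pandas as pd
    import pyarrow as pa

    from ray.data.block import BlockAccessor

    df = pd.DataFrame({"one": [1, 2, 3], "two": ["a", "b", "c"]})

    # Dispatch is keyed on the concrete block type:
    #   list -> SimpleBlockAccessor, pyarrow.Table -> ArrowBlockAccessor,
    #   pandas.DataFrame -> PandasBlockAccessor (added by this patch).
    print(type(BlockAccessor.for_block([1, 2, 3])).__name__)
    print(type(BlockAccessor.for_block(pa.Table.from_pandas(df))).__name__)
    print(type(BlockAccessor.for_block(df)).__name__)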
@@ -260,11 +260,16 @@ class BlockAccessor(Generic[T]): """Create a block accessor for the given block.""" _check_pyarrow_version() import pyarrow + import pandas if isinstance(block, pyarrow.Table): from ray.data.impl.arrow_block import ArrowBlockAccessor return ArrowBlockAccessor(block) + elif isinstance(block, pandas.DataFrame): + from ray.data.impl.pandas_block import PandasBlockAccessor + + return PandasBlockAccessor(block) elif isinstance(block, bytes): from ray.data.impl.arrow_block import ArrowBlockAccessor diff --git a/python/ray/data/context.py b/python/ray/data/context.py index beafd3dc8..ebc532803 100644 --- a/python/ray/data/context.py +++ b/python/ray/data/context.py @@ -14,6 +14,10 @@ DEFAULT_TARGET_MAX_BLOCK_SIZE = 2048 * 1024 * 1024 # Whether block splitting is on by default DEFAULT_BLOCK_SPLITTING_ENABLED = False +# Whether pandas block format is enabled. +# TODO (kfstorm): Remove this once stable. +DEFAULT_ENABLE_PANDAS_BLOCK = True + @DeveloperAPI class DatasetContext: @@ -28,11 +32,13 @@ class DatasetContext: block_owner: ray.actor.ActorHandle, block_splitting_enabled: bool, target_max_block_size: int, + enable_pandas_block: bool, ): """Private constructor (use get_current() instead).""" self.block_owner = block_owner self.block_splitting_enabled = block_splitting_enabled self.target_max_block_size = target_max_block_size + self.enable_pandas_block = enable_pandas_block @staticmethod def get_current() -> "DatasetContext": @@ -50,6 +56,7 @@ class DatasetContext: block_owner=None, block_splitting_enabled=DEFAULT_BLOCK_SPLITTING_ENABLED, target_max_block_size=DEFAULT_TARGET_MAX_BLOCK_SIZE, + enable_pandas_block=DEFAULT_ENABLE_PANDAS_BLOCK, ) if _default_context.block_owner is None: diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py index 0d46616c5..07bfabb34 100644 --- a/python/ray/data/dataset.py +++ b/python/ray/data/dataset.py @@ -74,6 +74,7 @@ from ray.data.impl.shuffle import simple_shuffle, _shuffle_reduce from ray.data.impl.sort import sort_impl from ray.data.impl.block_list import BlockList from ray.data.impl.lazy_block_list import LazyBlockList +from ray.data.impl.table_block import TableRow from ray.data.impl.delegating_block_builder import DelegatingBlockBuilder # An output type of iter_batches() determined by the batch_format parameter. @@ -263,11 +264,11 @@ class Dataset(Generic[T]): ) applied = fn(view) - if isinstance(applied, list) or isinstance(applied, pa.Table): - applied = applied - elif isinstance(applied, pd.core.frame.DataFrame): - applied = pa.Table.from_pandas(applied) - else: + if not ( + isinstance(applied, list) + or isinstance(applied, pa.Table) + or isinstance(applied, pd.core.frame.DataFrame) + ): raise ValueError( "The map batches UDF returned the value " f"{applied}, which is not allowed. " @@ -440,12 +441,15 @@ class Dataset(Generic[T]): # Handle empty blocks. 
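A rough illustration of the new `enable_pandas_block` flag (a sketch, not code from the diff): the pandas block path is on by default and can be switched off per process through `DatasetContext`, which is the same pattern the updated tests further down use.

    import pandas as pd
    import ray
    from ray.data.context import DatasetContext

    ctx = DatasetContext.get_current()
    assert ctx.enable_pandas_block  # DEFAULT_ENABLE_PANDAS_BLOCK is True

    ctx.enable_pandas_block = False  # fall back to Arrow blocks for pandas inputs
    try:
        ds = ray.data.from_pandas(pd.DataFrame({"a": [1, 2, 3]}))
        assert ds._dataset_format() == "arrow"
    finally:
        ctx.enable_pandas_block = True  # restore the default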
if len(new_blocks) < num_blocks: from ray.data.impl.arrow_block import ArrowBlockBuilder + from ray.data.impl.pandas_block import PandasBlockBuilder from ray.data.impl.simple_block import SimpleBlockBuilder num_empties = num_blocks - len(new_blocks) dataset_format = self._dataset_format() if dataset_format == "arrow": builder = ArrowBlockBuilder() + elif dataset_format == "pandas": + builder = PandasBlockBuilder() else: builder = SimpleBlockBuilder() empty_block = builder.build() @@ -1030,10 +1034,8 @@ class Dataset(Generic[T]): ret = self._aggregate_on(Sum, on) if ret is None: return 0 - elif len(ret) == 1: - return ret[0] else: - return ret + return self._aggregate_result(ret) def min(self, on: Union[KeyFn, List[KeyFn]] = None) -> U: """Compute minimum over entire dataset. @@ -1085,10 +1087,8 @@ class Dataset(Generic[T]): ret = self._aggregate_on(Min, on) if ret is None: raise ValueError("Cannot compute min on an empty dataset") - elif len(ret) == 1: - return ret[0] else: - return ret + return self._aggregate_result(ret) def max(self, on: Union[KeyFn, List[KeyFn]] = None) -> U: """Compute maximum over entire dataset. @@ -1140,10 +1140,8 @@ class Dataset(Generic[T]): ret = self._aggregate_on(Max, on) if ret is None: raise ValueError("Cannot compute max on an empty dataset") - elif len(ret) == 1: - return ret[0] else: - return ret + return self._aggregate_result(ret) def mean(self, on: Union[KeyFn, List[KeyFn]] = None) -> U: """Compute mean over entire dataset. @@ -1195,10 +1193,8 @@ class Dataset(Generic[T]): ret = self._aggregate_on(Mean, on) if ret is None: raise ValueError("Cannot compute mean on an empty dataset") - elif len(ret) == 1: - return ret[0] else: - return ret + return self._aggregate_result(ret) def std(self, on: Union[KeyFn, List[KeyFn]] = None, ddof: int = 1) -> U: """Compute standard deviation over entire dataset. @@ -1260,10 +1256,8 @@ class Dataset(Generic[T]): ret = self._aggregate_on(Std, on, ddof=ddof) if ret is None: raise ValueError("Cannot compute std on an empty dataset") - elif len(ret) == 1: - return ret[0] else: - return ret + return self._aggregate_result(ret) def sort(self, key: KeyFn = None, descending: bool = False) -> "Dataset[T]": """Sort the dataset by the specified key column or key function. @@ -2261,10 +2255,10 @@ Dict[str, List[str]]]): The names of the columns def to_pandas(self, limit: int = 100000) -> "pandas.DataFrame": """Convert this dataset into a single Pandas DataFrame. - This is only supported for datasets convertible to Arrow records. An - error is raised if the number of records exceeds the provided limit. - Note that you can use ``.limit()`` on the dataset beforehand to - truncate the dataset manually. + This is only supported for datasets convertible to Arrow or Pandas + records. An error is raised if the number of records exceeds the + provided limit. Note that you can use ``.limit()`` on the dataset + beforehand to truncate the dataset manually. 
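A short sketch of the user-facing behavior these aggregation and conversion changes preserve (illustrative only; the DataFrame contents are made up):

    import pandas as pd
    import ray

    ds = ray.data.from_pandas(pd.DataFrame({"A": [1, 2, 3], "B": [4.0, 5.0, 6.0]}))

    # Single-column aggregation unwraps to a scalar via _aggregate_result().
    assert ds.sum("A") == 6

    # Multi-column aggregation keeps one entry per requested column
    # (a row-like or tuple result, depending on the block format).
    print(ds.mean(["A", "B"]))

    # to_pandas() raises once the record count exceeds `limit`; truncate first if needed.
    small = ds.limit(2).to_pandas()
    print(len(small))  # 2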
Time complexity: O(dataset size) @@ -2606,6 +2600,10 @@ Dict[str, List[str]]]): The names of the columns return "arrow" except ModuleNotFoundError: pass + from ray.data.impl.pandas_block import PandasBlockSchema + + if isinstance(schema, PandasBlockSchema): + return "pandas" return "simple" def _aggregate_on( @@ -2652,6 +2650,18 @@ Dict[str, List[str]]]): The names of the columns on = [on] return [agg_cls(on_, *args, **kwargs) for on_ in on] + def _aggregate_result(self, result: Union[Tuple, TableRow]) -> U: + if len(result) == 1: + if isinstance(result, tuple): + return result[0] + else: + # NOTE (kfstorm): We cannot call `result[0]` directly on + # `PandasRow` because indexing a column with position is not + # supported by pandas. + return list(result.values())[0] + else: + return result + def __repr__(self) -> str: schema = self.schema() if schema is None: diff --git a/python/ray/data/impl/arrow_block.py b/python/ray/data/impl/arrow_block.py index 42e700181..6a4a4e46e 100644 --- a/python/ray/data/impl/arrow_block.py +++ b/python/ray/data/impl/arrow_block.py @@ -79,7 +79,7 @@ class ArrowBlockAccessor(TableBlockAccessor): view = _copy_table(view) return view - def random_shuffle(self, random_seed: Optional[int]) -> List[T]: + def random_shuffle(self, random_seed: Optional[int]) -> "pyarrow.Table": random = np.random.RandomState(random_seed) return self._table.take(random.permutation(self.num_rows())) diff --git a/python/ray/data/impl/delegating_block_builder.py b/python/ray/data/impl/delegating_block_builder.py index 7cf65b9c1..50c4a084b 100644 --- a/python/ray/data/impl/delegating_block_builder.py +++ b/python/ray/data/impl/delegating_block_builder.py @@ -4,6 +4,7 @@ from ray.data.block import Block, T, BlockAccessor from ray.data.impl.block_builder import BlockBuilder from ray.data.impl.simple_block import SimpleBlockBuilder from ray.data.impl.arrow_block import ArrowRow, ArrowBlockBuilder +from ray.data.impl.pandas_block import PandasRow, PandasBlockBuilder class DelegatingBlockBuilder(BlockBuilder[T]): @@ -13,6 +14,7 @@ class DelegatingBlockBuilder(BlockBuilder[T]): def add(self, item: Any) -> None: if self._builder is None: + # TODO (kfstorm): Maybe we can use Pandas block format for dict. 
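To make the `_dataset_format()` probe above concrete (sketch only; assumes the pandas block path is enabled): a pandas-backed dataset reports its schema as the new `PandasBlockSchema` namedtuple rather than a `pyarrow.lib.Schema`, which is exactly what the format check keys on.

    import pandas as pd
    import ray
    from ray.data.impl.pandas_block import PandasBlockSchema

    ds = ray.data.from_pandas(pd.DataFrame({"x": [1, 2, 3]}))

    schema = ds.schema()
    # e.g. PandasBlockSchema(names=['x'], types=[dtype('int64')])
    assert isinstance(schema, PandasBlockSchema)
    assert ds._dataset_format() == "pandas"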
if isinstance(item, dict) or isinstance(item, ArrowRow): import pyarrow @@ -23,6 +25,8 @@ class DelegatingBlockBuilder(BlockBuilder[T]): self._builder = ArrowBlockBuilder() except (TypeError, pyarrow.lib.ArrowInvalid): self._builder = SimpleBlockBuilder() + elif isinstance(item, PandasRow): + self._builder = PandasBlockBuilder() else: self._builder = SimpleBlockBuilder() self._builder.add(item) diff --git a/python/ray/data/impl/pandas_block.py b/python/ray/data/impl/pandas_block.py new file mode 100644 index 000000000..915dd7fd0 --- /dev/null +++ b/python/ray/data/impl/pandas_block.py @@ -0,0 +1,195 @@ +from typing import Dict, List, Tuple, Any, TypeVar, Optional, TYPE_CHECKING + +import collections +import numpy as np + +from ray.data.block import BlockAccessor, BlockMetadata, KeyFn +from ray.data.impl.table_block import TableBlockAccessor, TableRow, TableBlockBuilder +from ray.data.impl.arrow_block import ArrowBlockAccessor +from ray.data.aggregate import AggregateFn + +if TYPE_CHECKING: + import pyarrow + import pandas + from ray.data.impl.sort import SortKeyT + +T = TypeVar("T") + +_pandas = None + + +def lazy_import_pandas(): + global _pandas + if _pandas is None: + import pandas + + _pandas = pandas + return _pandas + + +class PandasRow(TableRow): + def as_pydict(self) -> dict: + return {k: v[0] for k, v in self._row.to_dict("list").items()} + + def __getitem__(self, key: str) -> Any: + assert isinstance(key, str) + col = self._row[key] + if len(col) == 0: + return None + item = col.iloc[0] + try: + # Try to interpret this as a numpy-type value. + # See https://stackoverflow.com/questions/9452775/converting-numpy-dtypes-to-native-python-types. # noqa: E501 + return item.item() + except AttributeError: + # Fallback to the original form. + return item + + def __len__(self): + return self._row.shape[1] + + +class PandasBlockBuilder(TableBlockBuilder[T]): + def __init__(self): + pandas = lazy_import_pandas() + super().__init__(pandas.DataFrame) + + def _table_from_pydict(self, columns: Dict[str, List[Any]]) -> "pandas.DataFrame": + pandas = lazy_import_pandas() + return pandas.DataFrame(columns) + + def _concat_tables(self, tables: List["pandas.DataFrame"]) -> "pandas.DataFrame": + pandas = lazy_import_pandas() + return pandas.concat(tables, ignore_index=True) + + @staticmethod + def _empty_table() -> "pandas.DataFrame": + pandas = lazy_import_pandas() + return pandas.DataFrame() + + +# This is to be compatible with pyarrow.lib.schema +# TODO (kfstorm): We need a format-independent way to represent schema. +PandasBlockSchema = collections.namedtuple("PandasBlockSchema", ["names", "types"]) + + +class PandasBlockAccessor(TableBlockAccessor): + def __init__(self, table: "pandas.DataFrame"): + super().__init__(table) + + def _create_table_row(self, row: "pandas.DataFrame") -> PandasRow: + return PandasRow(row) + + def slice(self, start: int, end: int, copy: bool) -> "pandas.DataFrame": + view = self._table[start:end] + if copy: + view = view.copy(deep=True) + return view + + def random_shuffle(self, random_seed: Optional[int]) -> "pandas.DataFrame": + return self._table.sample(frac=1, random_state=random_seed) + + def schema(self) -> PandasBlockSchema: + dtypes = self._table.dtypes + schema = PandasBlockSchema( + names=dtypes.index.tolist(), types=dtypes.values.tolist() + ) + # Column names with non-str types of a pandas DataFrame is not + # supported by Ray Dataset. 
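A sketch of how the new builder and row type are meant to work together (not from the diff; it assumes the shared `TableBlockBuilder.add` accepts plain dict rows, as it does on the Arrow path):

    from ray.data.block import BlockAccessor
    from ray.data.impl.pandas_block import PandasBlockBuilder

    builder = PandasBlockBuilder()
    builder.add({"one": 1, "two": "a"})  # dict rows, assuming the Arrow-style add() contract
    builder.add({"one": 2, "two": "b"})
    block = builder.build()  # a pandas.DataFrame with two rows

    acc = BlockAccessor.for_block(block)
    for row in acc.iter_rows():     # each row is a PandasRow over a one-row slice
        print(row["one"])           # numpy scalars are unboxed via .item()
        print(row.as_pydict())      # e.g. {'one': 1, 'two': 'a'}
        print(len(row))             # 2 columns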
+ if any(not isinstance(name, str) for name in schema.names): + raise ValueError( + "A Pandas DataFrame with column names of non-str types" + " is not supported by Ray Dataset. Column names of this" + f" DataFrame: {schema.names!r}." + ) + return schema + + def to_pandas(self) -> "pandas.DataFrame": + return self._table + + def to_numpy(self, column: str = None) -> np.ndarray: + if not column: + raise ValueError( + "`column` must be specified when calling .to_numpy() " + "on Pandas blocks." + ) + if column not in self._table.columns: + raise ValueError( + "Cannot find column {}, available columns: {}".format( + column, self._table.columns.tolist() + ) + ) + return self._table[column].to_numpy() + + def to_arrow(self) -> "pyarrow.Table": + import pyarrow + + return pyarrow.table(self._table) + + def num_rows(self) -> int: + return self._table.shape[0] + + def size_bytes(self) -> int: + return self._table.memory_usage(index=True, deep=True).sum() + + def _zip(self, acc: BlockAccessor) -> "pandas.DataFrame": + r = self.to_pandas().copy(deep=False) + s = acc.to_pandas() + for col_name in s.columns: + col = s[col_name] + # Ensure the column names are unique after zip. + if col_name in r.column_names: + i = 1 + new_name = col_name + while new_name in r.column_names: + new_name = "{}_{}".format(col_name, i) + i += 1 + col_name = new_name + r[col_name] = col + return r + + @staticmethod + def builder() -> PandasBlockBuilder[T]: + return PandasBlockBuilder() + + @staticmethod + def _empty_table() -> "pandas.DataFrame": + return PandasBlockBuilder._empty_table() + + def _sample(self, n_samples: int, key: "SortKeyT") -> "pandas.DataFrame": + return self._table[[k[0] for k in key]].sample(n_samples, ignore_index=True) + + def sort_and_partition( + self, boundaries: List[T], key: "SortKeyT", descending: bool + ) -> List["pandas.DataFrame"]: + # TODO (kfstorm): A workaround to pass tests. Not efficient. + delegated_result = BlockAccessor.for_block(self.to_arrow()).sort_and_partition( + boundaries, key, descending + ) + return [BlockAccessor.for_block(_).to_pandas() for _ in delegated_result] + + def combine(self, key: KeyFn, aggs: Tuple[AggregateFn]) -> "pandas.DataFrame": + # TODO (kfstorm): A workaround to pass tests. Not efficient. + return BlockAccessor.for_block(self.to_arrow()).combine(key, aggs).to_pandas() + + @staticmethod + def merge_sorted_blocks( + blocks: List["pandas.DataFrame"], key: "SortKeyT", _descending: bool + ) -> Tuple["pandas.DataFrame", BlockMetadata]: + # TODO (kfstorm): A workaround to pass tests. Not efficient. + block, metadata = ArrowBlockAccessor.merge_sorted_blocks( + [BlockAccessor.for_block(block).to_arrow() for block in blocks], + key, + _descending, + ) + return BlockAccessor.for_block(block).to_pandas(), metadata + + @staticmethod + def aggregate_combined_blocks( + blocks: List["pandas.DataFrame"], key: KeyFn, aggs: Tuple[AggregateFn] + ) -> Tuple["pandas.DataFrame", BlockMetadata]: + # TODO (kfstorm): A workaround to pass tests. Not efficient. 
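A sketch of the accessor-level conversions the methods above rely on (illustrative, not from the diff): `to_numpy` needs an explicit column for pandas blocks, and `to_arrow` is the hook the sort/combine/merge workarounds use to delegate to the Arrow implementation until native pandas versions land.

    import pandas as pd
    from ray.data.block import BlockAccessor

    acc = BlockAccessor.for_block(pd.DataFrame({"a": [3, 1, 2], "b": [1.0, 2.0, 3.0]}))

    print(acc.num_rows())      # 3
    print(acc.size_bytes())    # deep pandas memory usage, including the index
    print(acc.to_numpy("a"))   # [3 1 2]; omitting the column raises ValueError

    arrow_table = acc.to_arrow()  # pyarrow.table(df), used by the delegation workarounds
    print(arrow_table.schema)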
+ block, metadata = ArrowBlockAccessor.aggregate_combined_blocks( + [BlockAccessor.for_block(block).to_arrow() for block in blocks], key, aggs + ) + return BlockAccessor.for_block(block).to_pandas(), metadata diff --git a/python/ray/data/impl/simple_block.py b/python/ray/data/impl/simple_block.py index 638b73de0..e1d07a43e 100644 --- a/python/ray/data/impl/simple_block.py +++ b/python/ray/data/impl/simple_block.py @@ -76,7 +76,7 @@ class SimpleBlockAccessor(BlockAccessor): def to_pandas(self) -> "pandas.DataFrame": import pandas - return pandas.DataFrame(self._items) + return pandas.DataFrame({"value": self._items}) def to_numpy(self, column: str = None) -> np.ndarray: if column: diff --git a/python/ray/data/read_api.py b/python/ray/data/read_api.py index 744ae907b..6face8241 100644 --- a/python/ray/data/read_api.py +++ b/python/ray/data/read_api.py @@ -606,8 +606,21 @@ def from_dask(df: "dask.DataFrame") -> Dataset[ArrowRow]: partitions = df.to_delayed() persisted_partitions = dask.persist(*partitions, scheduler=ray_dask_get) + + import pandas + + def to_ref(df): + if isinstance(df, pandas.DataFrame): + return ray.put(df) + elif isinstance(df, ray.ObjectRef): + return df + else: + raise ValueError( + "Expected a Ray object ref or a Pandas DataFrame, " f"got {type(df)}" + ) + return from_pandas_refs( - [next(iter(part.dask.values())) for part in persisted_partitions] + [to_ref(next(iter(part.dask.values()))) for part in persisted_partitions] ) @@ -675,6 +688,23 @@ def from_pandas_refs( """ if isinstance(dfs, ray.ObjectRef): dfs = [dfs] + elif isinstance(dfs, list): + for df in dfs: + if not isinstance(df, ray.ObjectRef): + raise ValueError( + "Expected list of Ray object refs, " + f"got list containing {type(df)}" + ) + else: + raise ValueError( + "Expected Ray object ref or list of Ray object refs, " f"got {type(df)}" + ) + + context = DatasetContext.get_current() + if context.enable_pandas_block: + get_metadata = cached_remote_fn(_get_metadata) + metadata = [get_metadata.remote(df) for df in dfs] + return Dataset(BlockList(dfs, ray.get(metadata)), 0, DatasetStats.TODO()) df_to_block = cached_remote_fn(_df_to_block, num_returns=2) @@ -803,7 +833,7 @@ def _ndarray_to_block(ndarray: np.ndarray) -> Block[np.ndarray]: ) -def _get_metadata(table: "pyarrow.Table") -> BlockMetadata: +def _get_metadata(table: Union["pyarrow.Table", "pandas.DataFrame"]) -> BlockMetadata: stats = BlockExecStats.builder() return BlockAccessor.for_block(table).get_metadata( input_files=None, exec_stats=stats.build() diff --git a/python/ray/data/tests/test_dataset.py b/python/ray/data/tests/test_dataset.py index 83fcaac58..420be3766 100644 --- a/python/ray/data/tests/test_dataset.py +++ b/python/ray/data/tests/test_dataset.py @@ -327,7 +327,7 @@ def test_batch_tensors(ray_start_regular_shared): with pytest.raises(pa.lib.ArrowInvalid): next(ds.iter_batches(batch_format="pyarrow")) df = next(ds.iter_batches(batch_format="pandas")) - assert df.to_dict().keys() == {0, 1} + assert df.to_dict().keys() == {"value"} def test_arrow_block_slice_copy(): @@ -1178,34 +1178,52 @@ def test_repartition_shuffle_arrow(ray_start_regular_shared): assert large._block_num_rows() == [500] * 20 -def test_from_pandas(ray_start_regular_shared): - df1 = pd.DataFrame({"one": [1, 2, 3], "two": ["a", "b", "c"]}) - df2 = pd.DataFrame({"one": [4, 5, 6], "two": ["e", "f", "g"]}) - ds = ray.data.from_pandas([df1, df2]) - values = [(r["one"], r["two"]) for r in ds.take(6)] - rows = [(r.one, r.two) for _, r in pd.concat([df1, df2]).iterrows()] - assert 
values == rows +@pytest.mark.parametrize("enable_pandas_block", [False, True]) +def test_from_pandas(ray_start_regular_shared, enable_pandas_block): + ctx = ray.data.context.DatasetContext.get_current() + old_enable_pandas_block = ctx.enable_pandas_block + ctx.enable_pandas_block = enable_pandas_block + try: + df1 = pd.DataFrame({"one": [1, 2, 3], "two": ["a", "b", "c"]}) + df2 = pd.DataFrame({"one": [4, 5, 6], "two": ["e", "f", "g"]}) + ds = ray.data.from_pandas([df1, df2]) + assert ds._dataset_format() == "pandas" if enable_pandas_block else "arrow" + values = [(r["one"], r["two"]) for r in ds.take(6)] + rows = [(r.one, r.two) for _, r in pd.concat([df1, df2]).iterrows()] + assert values == rows - # test from single pandas dataframe - ds = ray.data.from_pandas(df1) - values = [(r["one"], r["two"]) for r in ds.take(3)] - rows = [(r.one, r.two) for _, r in df1.iterrows()] - assert values == rows + # test from single pandas dataframe + ds = ray.data.from_pandas(df1) + assert ds._dataset_format() == "pandas" if enable_pandas_block else "arrow" + values = [(r["one"], r["two"]) for r in ds.take(3)] + rows = [(r.one, r.two) for _, r in df1.iterrows()] + assert values == rows + finally: + ctx.enable_pandas_block = old_enable_pandas_block -def test_from_pandas_refs(ray_start_regular_shared): - df1 = pd.DataFrame({"one": [1, 2, 3], "two": ["a", "b", "c"]}) - df2 = pd.DataFrame({"one": [4, 5, 6], "two": ["e", "f", "g"]}) - ds = ray.data.from_pandas_refs([ray.put(df1), ray.put(df2)]) - values = [(r["one"], r["two"]) for r in ds.take(6)] - rows = [(r.one, r.two) for _, r in pd.concat([df1, df2]).iterrows()] - assert values == rows +@pytest.mark.parametrize("enable_pandas_block", [False, True]) +def test_from_pandas_refs(ray_start_regular_shared, enable_pandas_block): + ctx = ray.data.context.DatasetContext.get_current() + old_enable_pandas_block = ctx.enable_pandas_block + ctx.enable_pandas_block = enable_pandas_block + try: + df1 = pd.DataFrame({"one": [1, 2, 3], "two": ["a", "b", "c"]}) + df2 = pd.DataFrame({"one": [4, 5, 6], "two": ["e", "f", "g"]}) + ds = ray.data.from_pandas_refs([ray.put(df1), ray.put(df2)]) + assert ds._dataset_format() == "pandas" if enable_pandas_block else "arrow" + values = [(r["one"], r["two"]) for r in ds.take(6)] + rows = [(r.one, r.two) for _, r in pd.concat([df1, df2]).iterrows()] + assert values == rows - # test from single pandas dataframe ref - ds = ray.data.from_pandas_refs(ray.put(df1)) - values = [(r["one"], r["two"]) for r in ds.take(3)] - rows = [(r.one, r.two) for _, r in df1.iterrows()] - assert values == rows + # test from single pandas dataframe ref + ds = ray.data.from_pandas_refs(ray.put(df1)) + assert ds._dataset_format() == "pandas" if enable_pandas_block else "arrow" + values = [(r["one"], r["two"]) for r in ds.take(3)] + rows = [(r.one, r.two) for _, r in df1.iterrows()] + assert values == rows + finally: + ctx.enable_pandas_block = old_enable_pandas_block def test_from_numpy(ray_start_regular_shared): @@ -1310,7 +1328,7 @@ def test_to_arrow_refs(ray_start_regular_shared): assert df.equals(dfds) # Conversion. 
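To spell out the stricter `from_pandas_refs` input handling these tests exercise (a sketch; error text paraphrased): a single object ref is wrapped into a list, lists must contain only object refs, and plain DataFrames still go through `from_pandas`, which `ray.put`s them.

    import pandas as pd
    import ray

    df = pd.DataFrame({"one": [1, 2, 3]})

    ds = ray.data.from_pandas_refs(ray.put(df))    # a single object ref is accepted
    ds = ray.data.from_pandas_refs([ray.put(df)])  # so is a list of object refs

    try:
        ray.data.from_pandas_refs([df])            # raw DataFrames in the list are rejected
    except ValueError as err:
        print(err)

    ds = ray.data.from_pandas(df)                  # plain DataFrames go through from_pandas()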
- df = pd.DataFrame({0: list(range(n))}) + df = pd.DataFrame({"value": list(range(n))}) ds = ray.data.range(n) dfds = pd.concat( [t.to_pandas() for t in ray.get(ds.to_arrow_refs())], ignore_index=True @@ -1721,8 +1739,8 @@ def test_parquet_write_with_udf(ray_start_regular_shared, tmp_path): df = pd.concat([df1, df2]) ds = ray.data.from_pandas([df1, df2]) - def _block_udf(block: pa.Table): - df = block.to_pandas() + def _block_udf(block): + df = BlockAccessor.for_block(block).to_pandas().copy() df["one"] += 1 return pa.Table.from_pandas(df) @@ -1925,7 +1943,7 @@ def test_iter_batches_basic(ray_start_regular_shared): # blocks format. for batch, df in zip(ds.iter_batches(batch_format="native"), dfs): - assert batch.to_pandas().equals(df) + assert BlockAccessor.for_block(batch).to_pandas().equals(df) # Batch size. batch_size = 2 @@ -2088,9 +2106,9 @@ def test_map_batch(ray_start_regular_shared, tmp_path): table = pa.Table.from_pandas(df) pq.write_table(table, os.path.join(tmp_path, "test1.parquet")) ds = ray.data.read_parquet(str(tmp_path)) - ds_list = ds.map_batches( - lambda df: df + 1, batch_size=1, batch_format="pandas" - ).take() + ds2 = ds.map_batches(lambda df: df + 1, batch_size=1, batch_format="pandas") + assert ds2._dataset_format() == "pandas" + ds_list = ds2.take() values = [s["one"] for s in ds_list] assert values == [2, 3, 4] values = [s["two"] for s in ds_list] @@ -2098,7 +2116,9 @@ def test_map_batch(ray_start_regular_shared, tmp_path): # Test Pyarrow ds = ray.data.read_parquet(str(tmp_path)) - ds_list = ds.map_batches(lambda pa: pa, batch_size=1, batch_format="pyarrow").take() + ds2 = ds.map_batches(lambda pa: pa, batch_size=1, batch_format="pyarrow") + assert ds2._dataset_format() == "arrow" + ds_list = ds2.take() values = [s["one"] for s in ds_list] assert values == [1, 2, 3] values = [s["two"] for s in ds_list] @@ -2107,28 +2127,30 @@ def test_map_batch(ray_start_regular_shared, tmp_path): # Test batch size = 300 ds = ray.data.range(size) - ds_list = ds.map_batches( - lambda df: df + 1, batch_size=17, batch_format="pandas" - ).take(limit=size) + ds2 = ds.map_batches(lambda df: df + 1, batch_size=17, batch_format="pandas") + assert ds2._dataset_format() == "pandas" + ds_list = ds2.take(limit=size) for i in range(size): - # The pandas column is "0", and it originally has rows from 0~299. + # The pandas column is "value", and it originally has rows from 0~299. # After the map batch, it should have 1~300. 
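As the `batch_format="native"` changes above suggest (sketch only, assuming pandas blocks are enabled), native batches come back as whatever the underlying block is, so format-agnostic code wraps each batch in a `BlockAccessor`; requesting `"pandas"` or `"pyarrow"` still converts explicitly.

    import pandas as pd
    import ray
    from ray.data.block import BlockAccessor

    ds = ray.data.from_pandas(pd.DataFrame({"value": list(range(5))}))

    for batch in ds.iter_batches(batch_format="native"):
        # Here each batch is a pandas.DataFrame; for Arrow-backed data it would
        # be a pyarrow.Table, so go through BlockAccessor to stay format-agnostic.
        print(BlockAccessor.for_block(batch).to_pandas()["value"].sum())

    for batch in ds.iter_batches(batch_format="pandas"):
        assert isinstance(batch, pd.DataFrame)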
row = ds_list[i] - assert row["0"] == i + 1 + assert row["value"] == i + 1 assert ds.count() == 300 # Test the lambda returns different types than the batch_format # pandas => list block ds = ray.data.read_parquet(str(tmp_path)) - ds_list = ds.map_batches(lambda df: [1], batch_size=1).take() + ds2 = ds.map_batches(lambda df: [1], batch_size=1) + assert ds2._dataset_format() == "simple" + ds_list = ds2.take() assert ds_list == [1, 1, 1] assert ds.count() == 3 # pyarrow => list block ds = ray.data.read_parquet(str(tmp_path)) - ds_list = ds.map_batches( - lambda df: [1], batch_size=1, batch_format="pyarrow" - ).take() + ds2 = ds.map_batches(lambda df: [1], batch_size=1, batch_format="pyarrow") + assert ds2._dataset_format() == "simple" + ds_list = ds2.take() assert ds_list == [1, 1, 1] assert ds.count() == 3 @@ -3812,6 +3834,18 @@ def test_sort_simple(ray_start_regular_shared): assert ds.count() == 0 +def test_column_name_type_check(ray_start_regular_shared): + df = pd.DataFrame({"1": np.random.rand(10), "a": np.random.rand(10)}) + ds = ray.data.from_pandas(df) + expected_str = ( + "Dataset(num_blocks=1, num_rows=10, " "schema={1: float64, a: float64})" + ) + assert str(ds) == expected_str, str(ds) + df = pd.DataFrame({1: np.random.rand(10), "a": np.random.rand(10)}) + with pytest.raises(ValueError): + ray.data.from_pandas(df) + + @pytest.mark.parametrize("pipelined", [False, True]) def test_random_shuffle(shutdown_only, pipelined): def range(n, parallelism=200):