From 4584193308a749bdea6dd757fc0148c2950d0b30 Mon Sep 17 00:00:00 2001 From: Kunal Gosar Date: Tue, 22 May 2018 16:34:07 -0700 Subject: [PATCH] [DataFrame] Refactor GroupBy Methods and Implement Reindex (#2101) * fix 1D blocks case * Add groupby test code * begin writing tests * Fix bug on groupby(axis=1, ...) * implement reindex * fix index misalignment after groupby * fix df.apply bug * fix groupby.apply * fix agg funcs * finish groupby tests * finish test suite for groupby * fixing lint * undo new line * fix python2 index copy bug * Concat Series into ray.df * fixing python2 issues * resolving all python 2 tests * handle multiindex on apply * resolve comments * updating docstring * fix lint * fix lint again * address comments --- .travis.yml | 2 + python/ray/dataframe/concat.py | 6 +- python/ray/dataframe/dataframe.py | 168 ++++-- python/ray/dataframe/groupby.py | 415 +++++++++----- python/ray/dataframe/test/test_dataframe.py | 43 +- python/ray/dataframe/test/test_groupby.py | 589 ++++++++++++++++++++ python/ray/dataframe/utils.py | 22 +- 7 files changed, 1054 insertions(+), 191 deletions(-) create mode 100644 python/ray/dataframe/test/test_groupby.py diff --git a/.travis.yml b/.travis.yml index cb87bf07f..e781f3f64 100644 --- a/.travis.yml +++ b/.travis.yml @@ -135,6 +135,7 @@ matrix: # - python -m pytest python/ray/dataframe/test/test_dataframe.py - python -m pytest python/ray/dataframe/test/test_concat.py - python -m pytest python/ray/dataframe/test/test_io.py + # - python -m pytest python/ray/dataframe/test/test_groupby.py # ray tune tests # - python python/ray/tune/test/dependency_test.py @@ -199,6 +200,7 @@ script: - python -m pytest python/ray/dataframe/test/test_dataframe.py - python -m pytest python/ray/dataframe/test/test_concat.py - python -m pytest python/ray/dataframe/test/test_io.py + - python -m pytest python/ray/dataframe/test/test_groupby.py # ray tune tests - python python/ray/tune/test/dependency_test.py diff --git a/python/ray/dataframe/concat.py b/python/ray/dataframe/concat.py index 0206be643..ef3686284 100644 --- a/python/ray/dataframe/concat.py +++ b/python/ray/dataframe/concat.py @@ -41,9 +41,9 @@ def concat(objs, axis=0, join='outer', join_axes=None, ignore_index=False, all_series = all(isinstance(obj, pandas.Series) for obj in objs) if all_series: - return pandas.concat(objs, axis, join, join_axes, - ignore_index, keys, levels, names, - verify_integrity, copy) + return DataFrame(pandas.concat(objs, axis, join, join_axes, + ignore_index, keys, levels, names, + verify_integrity, copy)) if isinstance(objs, dict): raise NotImplementedError( diff --git a/python/ray/dataframe/dataframe.py b/python/ray/dataframe/dataframe.py index e9fcac295..37cb90bc2 100644 --- a/python/ray/dataframe/dataframe.py +++ b/python/ray/dataframe/dataframe.py @@ -29,12 +29,12 @@ import io import sys import re -from .groupby import DataFrameGroupBy from .utils import ( _deploy_func, _map_partitions, _partition_pandas_dataframe, to_pandas, + create_blocks_helper, _blocks_to_col, _blocks_to_row, _create_block_partitions, @@ -43,7 +43,8 @@ from .utils import ( _co_op_helper, _match_partitioning, _concat_index, - _correct_column_dtypes) + _correct_column_dtypes, + fix_blocks_dimensions) from . import get_npartitions from .index_metadata import _IndexMetadata from .iterator import PartitionIterator @@ -119,9 +120,12 @@ class DataFrame(object): "for internal DataFrame creations" if block_partitions is not None: + axis = 0 # put in numpy array here to make accesses easier since it's 2D self._block_partitions = np.array(block_partitions) - axis = 0 + self._block_partitions = \ + fix_blocks_dimensions(self._block_partitions, axis) + else: if row_partitions is not None: axis = 0 @@ -144,13 +148,6 @@ class DataFrame(object): _create_block_partitions(partitions, axis=axis, length=axis_length) - # Sometimes we only get a single column or row, which is - # problematic for building blocks from the partitions, so we - # add whatever dimension we're missing from the input. - if self._block_partitions.ndim < 2: - self._block_partitions = np.expand_dims(self._block_partitions, - axis=axis ^ 1) - assert self._block_partitions.ndim == 2, "Block Partitions must be 2D." # Create the row and column index objects for using our partitioning. @@ -674,6 +671,7 @@ class DataFrame(object): elif mismatch: raise KeyError(next(x for x in by if x not in self)) + from .groupby import DataFrameGroupBy return DataFrameGroupBy(self, by, axis, level, as_index, sort, group_keys, squeeze, **kwargs) @@ -984,36 +982,23 @@ class DataFrame(object): raise ValueError("{} is an unknown string function".format(func)) def _callable_function(self, func, axis, *args, **kwargs): - if axis == 0: - partitions = self._col_partitions - else: - partitions = self._row_partitions + kwargs['axis'] = axis - if axis == 1: - kwargs['axis'] = axis - kwargs['temp_columns'] = self.columns - else: - kwargs['temp_index'] = self.index - - def agg_helper(df, arg, *args, **kwargs): - if 'temp_index' in kwargs: - df.index = kwargs.pop('temp_index', None) - else: - df.columns = kwargs.pop('temp_columns', None) + def agg_helper(df, arg, index, columns, *args, **kwargs): + df.index = index + df.columns = columns is_transform = kwargs.pop('is_transform', False) new_df = df.agg(arg, *args, **kwargs) is_series = False + index = None + columns = None if isinstance(new_df, pd.Series): is_series = True - index = None - columns = None else: - index = new_df.index \ - if not isinstance(new_df.index, pd.RangeIndex) \ - else None columns = new_df.columns + index = new_df.index new_df.columns = pd.RangeIndex(0, len(new_df.columns)) new_df.reset_index(drop=True, inplace=True) @@ -1024,13 +1009,37 @@ class DataFrame(object): return is_series, new_df, index, columns - remote_result = \ - [_deploy_func._submit(args=(lambda df: agg_helper(df, - func, - *args, - **kwargs), - part), num_return_vals=4) - for part in partitions] + if axis == 0: + index = self.index + columns = [self._col_metadata.partition_series(i).index + for i in range(len(self._col_partitions))] + + remote_result = \ + [_deploy_func._submit(args=( + lambda df: agg_helper(df, + func, + index, + cols, + *args, + **kwargs), + part), num_return_vals=4) + for cols, part in zip(columns, self._col_partitions)] + + if axis == 1: + indexes = [self._row_metadata.partition_series(i).index + for i in range(len(self._row_partitions))] + columns = self.columns + + remote_result = \ + [_deploy_func._submit(args=( + lambda df: agg_helper(df, + func, + index, + columns, + *args, + **kwargs), + part), num_return_vals=4) + for index, part in zip(indexes, self._row_partitions)] # This magic transposes the list comprehension returned from remote is_series, new_parts, index, columns = \ @@ -1053,21 +1062,22 @@ class DataFrame(object): # remote objects. We build a Ray DataFrame from the Pandas partitions. elif axis == 0: new_index = ray.get(index[0]) - columns = ray.get(columns) - columns = columns[0].append(columns[1:]) + # This does not handle the Multi Index case + new_columns = ray.get(columns) + new_columns = new_columns[0].append(new_columns[1:]) return DataFrame(col_partitions=new_parts, - columns=columns, - index=self.index if new_index is None - else new_index) + columns=new_columns, + index=new_index) else: - new_index = ray.get(index[0]) - columns = ray.get(columns) - columns = columns[0].append(columns[1:]) + new_columns = ray.get(columns[0]) + # This does not handle the Multi Index case + new_index = ray.get(index) + new_index = new_index[0].append(new_index[1:]) + return DataFrame(row_partitions=new_parts, - columns=columns, - index=self.index if new_index is None - else new_index) + columns=new_columns, + index=new_index) def align(self, other, join='outer', axis=None, level=None, copy=True, fill_value=None, method=None, limit=None, fill_axis=0, @@ -1234,7 +1244,7 @@ class DataFrame(object): Returns: values: ndarray """ - # TODO this is very inneficient, also see __array__ + # TODO this is very inefficient, also see __array__ return to_pandas(self).as_matrix(columns) def asfreq(self, freq, method=None, how=None, normalize=False, @@ -3323,9 +3333,48 @@ class DataFrame(object): def reindex(self, labels=None, index=None, columns=None, axis=None, method=None, copy=True, level=None, fill_value=np.nan, limit=None, tolerance=None): - raise NotImplementedError( - "To contribute to Pandas on Ray, please visit " - "github.com/ray-project/ray.") + if level is not None: + raise NotImplementedError( + "Multilevel Index not Implemented. " + "To contribute to Pandas on Ray, please visit " + "github.com/ray-project/ray.") + + axis = pd.DataFrame()._get_axis_number(axis) if axis is not None \ + else 0 + if axis == 0 and labels is not None: + index = labels + elif labels is not None: + columns = labels + + new_blocks = self._block_partitions + if index is not None: + old_index = self.index + new_blocks = np.array([reindex_helper._submit( + args=(old_index, index, 1, len(new_blocks), method, + fill_value, limit, tolerance) + tuple(block.tolist()), + num_return_vals=len(new_blocks)) + for block in new_blocks.T]).T + else: + index = self.index + + if columns is not None: + old_columns = self.columns + new_blocks = np.array([reindex_helper._submit( + args=(old_columns, columns, 0, new_blocks.shape[1], method, + fill_value, limit, tolerance) + tuple(block.tolist()), + num_return_vals=new_blocks.shape[1]) + for block in new_blocks]) + else: + columns = self.columns + + if copy: + return DataFrame(block_partitions=new_blocks, + index=index, + columns=columns) + + self._update_inplace(block_partitions=new_blocks, + index=index, + columns=columns) def reindex_axis(self, labels, axis=0, method=None, level=None, copy=True, limit=None, fill_value=np.nan): @@ -5324,3 +5373,18 @@ def _where_helper(left, cond, other, left_columns, cond_columns, other.columns = other_columns return left.where(cond, other, *args) + + +@ray.remote +def reindex_helper(old_index, new_index, axis, npartitions, method, fill_value, + limit, tolerance, *df): + df = pd.concat(df, axis=axis ^ 1) + if axis == 1: + df.index = old_index + else: + df.columns = old_index + + df = df.reindex(new_index, copy=False, axis=axis ^ 1, + method=method, fill_value=fill_value, + limit=limit, tolerance=tolerance) + return create_blocks_helper(df, npartitions, axis) diff --git a/python/ray/dataframe/groupby.py b/python/ray/dataframe/groupby.py index 5925ca849..b2c2b9fdc 100644 --- a/python/ray/dataframe/groupby.py +++ b/python/ray/dataframe/groupby.py @@ -2,13 +2,16 @@ from __future__ import absolute_import from __future__ import division from __future__ import print_function -import pandas.core.groupby import pandas as pd +import numpy as np +import pandas.core.groupby from pandas.core.dtypes.common import is_list_like +import pandas.core.common as com + import ray -from .utils import _map_partitions -from .utils import _inherit_docstrings +from .utils import _inherit_docstrings, _reindex_helper +from .concat import concat @_inherit_docstrings(pandas.core.groupby.DataFrameGroupBy, @@ -32,22 +35,30 @@ class DataFrameGroupBy(object): .groupby(by=by, sort=sort) else: partitions = [row for row in df._block_partitions] - self._index_grouped = pd.Series(self._columns, index=self._index)\ + self._index_grouped = \ + pd.Series(self._columns, index=self._columns) \ .groupby(by=by, sort=sort) self._keys_and_values = [(k, v) for k, v in self._index_grouped] - self._grouped_partitions = \ - list(zip(*(groupby._submit(args=(by, - axis, - level, - as_index, - sort, - group_keys, - squeeze) + tuple(part.tolist()), - num_return_vals=len(self)) - for part in partitions))) + if len(self) > 1: + self._grouped_partitions = \ + list(zip(*(groupby._submit(args=(by, + axis, + level, + as_index, + sort, + group_keys, + squeeze) + + tuple(part.tolist()), + num_return_vals=len(self)) + for part in partitions))) + else: + if axis == 0: + self._grouped_partitions = [df._col_partitions] + else: + self._grouped_partitions = [df._row_partitions] @property def _iter(self): @@ -58,8 +69,6 @@ class DataFrameGroupBy(object): DataFrame(col_partitions=part, columns=self._columns, index=self._keys_and_values[i][1].index, - row_metadata=self._row_metadata[ - self._keys_and_values[i][1].index], col_metadata=self._col_metadata)) for i, part in enumerate(self._grouped_partitions)] else: @@ -67,9 +76,7 @@ class DataFrameGroupBy(object): DataFrame(row_partitions=part, columns=self._keys_and_values[i][1].index, index=self._index, - row_metadata=self._row_metadata, - col_metadata=self._col_metadata[ - self._keys_and_values[i][1].index])) + row_metadata=self._row_metadata)) for i, part in enumerate(self._grouped_partitions)] @property @@ -77,84 +84,151 @@ class DataFrameGroupBy(object): return len(self) def skew(self, **kwargs): - return self._apply_agg_function(lambda df: df.skew(**kwargs)) + return self._apply_agg_function(lambda df: df.skew(axis=self._axis, + **kwargs)) def ffill(self, limit=None): - return self._apply_agg_function(lambda df: df.ffill(limit=limit)) + return self._apply_df_function(lambda df: df.ffill(axis=self._axis, + limit=limit)) def sem(self, ddof=1): - return self._apply_agg_function(lambda df: df.sem(ddof=ddof)) + return self._apply_agg_function(lambda df: df.sem(axis=self._axis, + ddof=ddof)) def mean(self, *args, **kwargs): - return self._apply_agg_function(lambda df: df.mean(*args, **kwargs)) + return self._apply_agg_function(lambda df: df.mean(axis=self._axis, + *args, + **kwargs)) def any(self): - return self._apply_agg_function(lambda df: df.any()) + return self._apply_agg_function(lambda df: df.any(axis=self._axis)) @property def plot(self): - raise NotImplementedError("Not Yet implemented.") + raise NotImplementedError( + "To contribute to Pandas on Ray, please visit " + "github.com/ray-project/ray.") def ohlc(self): - raise NotImplementedError("Not Yet implemented.") + raise NotImplementedError( + "To contribute to Pandas on Ray, please visit " + "github.com/ray-project/ray.") def __bytes__(self): - raise NotImplementedError("Not Yet implemented.") + raise NotImplementedError( + "To contribute to Pandas on Ray, please visit " + "github.com/ray-project/ray.") @property def tshift(self): - raise NotImplementedError("Not Yet implemented.") + raise NotImplementedError( + "To contribute to Pandas on Ray, please visit " + "github.com/ray-project/ray.") @property def groups(self): return {k: pd.Index(v) for k, v in self._keys_and_values} def min(self, **kwargs): - return self._apply_agg_function(lambda df: df.min(**kwargs)) + return self._apply_agg_function(lambda df: df.min(axis=self._axis, + **kwargs)) def idxmax(self): - return self._apply_agg_function(lambda df: df.idxmax()) + def idxmax_helper(df, index): + result = df.idxmax(axis=self._axis) + result = result.apply(lambda v: index[v]) + return result + + results = [idxmax_helper(g[1], i[1]) + for g, i in zip(self._iter, self._index_grouped)] + + new_df = concat(results, axis=1) + if self._axis == 0: + new_df = new_df.T + new_df.columns = self._columns + new_df.index = [k for k, v in self._iter] + else: + new_df.columns = [k for k, v in self._iter] + new_df.index = self._index + return new_df @property def ndim(self): - return self._index_grouped.ndim + return 2 # ndim is always 2 for DataFrames def shift(self, periods=1, freq=None, axis=0): - raise NotImplementedError("Not Yet implemented.") + raise NotImplementedError( + "To contribute to Pandas on Ray, please visit " + "github.com/ray-project/ray.") def nth(self, n, dropna=None): - raise NotImplementedError("Not Yet implemented.") + raise NotImplementedError( + "To contribute to Pandas on Ray, please visit " + "github.com/ray-project/ray.") def cumsum(self, axis=0, *args, **kwargs): - return self._apply_agg_function(lambda df: df.cumsum(axis, - *args, - **kwargs)) + return self._apply_df_function(lambda df: df.cumsum(axis, + *args, + **kwargs)) @property def indices(self): return dict(self._keys_and_values) def pct_change(self): - return self._apply_agg_function(lambda df: df.pct_change()) + return self._apply_agg_function( + lambda df: df.pct_change(axis=self._axis)) def filter(self, func, dropna=True, *args, **kwargs): - raise NotImplementedError("Not Yet implemented.") + raise NotImplementedError( + "To contribute to Pandas on Ray, please visit " + "github.com/ray-project/ray.") def cummax(self, axis=0, **kwargs): - return self._apply_agg_function(lambda df: df.cummax(axis=axis, - **kwargs)) + return self._apply_df_function(lambda df: df.cummax(axis, + **kwargs)) def apply(self, func, *args, **kwargs): - return self._apply_df_function(lambda df: df.apply(func, - *args, - **kwargs)) \ - if is_list_like(func) \ - else self._apply_agg_function(lambda df: df.apply(func, - *args, - **kwargs)) + def apply_helper(df): + return df.apply(func, axis=self._axis, *args, **kwargs) + + result = [func(v) for k, v in self._iter] + if self._axis == 0: + if isinstance(result[0], pd.Series): + # Applied an aggregation function + new_df = concat(result, axis=1).T + new_df.columns = self._columns + new_df.index = [k for k, v in self._iter] + else: + new_df = concat(result, axis=self._axis) + new_df._block_partitions = np.array([_reindex_helper._submit( + args=tuple([new_df.index, self._index, self._axis ^ 1, + len(new_df._block_partitions)] + + block.tolist()), + num_return_vals=len(new_df._block_partitions)) + for block in new_df._block_partitions.T]).T + new_df.index = self._index + else: + if isinstance(result[0], pd.Series): + # Applied an aggregation function + new_df = concat(result, axis=1) + new_df.columns = [k for k, v in self._iter] + new_df.index = self._index + else: + new_df = concat(result, axis=self._axis) + new_df._block_partitions = np.array([_reindex_helper._submit( + args=tuple([new_df.columns, self._columns, self._axis ^ 1, + new_df._block_partitions.shape[1]] + + block.tolist()), + num_return_vals=new_df._block_partitions.shape[1]) + for block in new_df._block_partitions]) + new_df.columns = self._columns + return new_df @property def dtypes(self): + if self._axis == 1: + raise ValueError("Cannot call dtypes on groupby with axis=1") return self._apply_agg_function(lambda df: df.dtypes) def first(self, **kwargs): @@ -166,60 +240,98 @@ class DataFrameGroupBy(object): def __getitem__(self, key): # This operation requires a SeriesGroupBy Object - raise NotImplementedError("Not Yet implemented.") + raise NotImplementedError( + "To contribute to Pandas on Ray, please visit " + "github.com/ray-project/ray.") def cummin(self, axis=0, **kwargs): - return self._apply_agg_function(lambda df: df.cummin(axis=axis, - **kwargs)) - - def bfill(self, limit=None): - return self._apply_agg_function(lambda df: df.bfill(limit=limit)) - - def idxmin(self): - return self._apply_agg_function(lambda df: df.idxmin()) - - def prod(self, **kwargs): - return self._apply_agg_function(lambda df: df.prod(**kwargs)) - - def std(self, ddof=1, *args, **kwargs): - return self._apply_agg_function(lambda df: df.std(ddof=ddof, - *args, **kwargs)) - - def aggregate(self, arg, *args, **kwargs): - return self._apply_df_function(lambda df: df.agg(arg, - *args, - **kwargs)) \ - if is_list_like(arg) \ - else self._apply_agg_function(lambda df: df.agg(arg, - *args, + return self._apply_df_function(lambda df: df.cummin(axis=axis, **kwargs)) + def bfill(self, limit=None): + return self._apply_df_function(lambda df: df.bfill(axis=self._axis, + limit=limit)) + + def idxmin(self): + def idxmin_helper(df, index): + result = df.idxmin(axis=self._axis) + result = result.apply(lambda v: index[v]) + return result + + results = [idxmin_helper(g[1], i[1]) + for g, i in zip(self._iter, self._index_grouped)] + + new_df = concat(results, axis=1) + if self._axis == 0: + new_df = new_df.T + new_df.columns = self._columns + new_df.index = [k for k, v in self._iter] + else: + new_df.columns = [k for k, v in self._iter] + new_df.index = self._index + return new_df + + def prod(self, **kwargs): + return self._apply_agg_function(lambda df: df.prod(axis=self._axis, + **kwargs)) + + def std(self, ddof=1, *args, **kwargs): + return self._apply_agg_function(lambda df: df.std(axis=self._axis, + ddof=ddof, + *args, + **kwargs)) + + def aggregate(self, arg, *args, **kwargs): + if self._axis != 0: + # This is not implemented in pandas, + # so we throw a different message + raise NotImplementedError("axis other than 0 is not supported") + + if is_list_like(arg): + raise NotImplementedError( + "This requires Multi-level index to be implemented. " + "To contribute to Pandas on Ray, please visit " + "github.com/ray-project/ray.") + return self._apply_agg_function(lambda df: df.agg(arg, + axis=self._axis, + *args, + **kwargs)) + def last(self, **kwargs): - return self._apply_df_function(lambda df: df.last(**kwargs)) + return self._apply_df_function(lambda df: df.last(offset=0, + **kwargs)) def mad(self): return self._apply_agg_function(lambda df: df.mad()) def rank(self): - return self._apply_df_function(lambda df: df.rank()) + return self._apply_df_function(lambda df: df.rank(axis=self._axis)) @property def corrwith(self): - raise NotImplementedError("Not Yet implemented.") + raise NotImplementedError( + "To contribute to Pandas on Ray, please visit " + "github.com/ray-project/ray.") def pad(self, limit=None): - raise NotImplementedError("Not Yet implemented.") + raise NotImplementedError( + "To contribute to Pandas on Ray, please visit " + "github.com/ray-project/ray.") def max(self, **kwargs): - return self._apply_agg_function(lambda df: df.max(**kwargs)) + return self._apply_agg_function(lambda df: df.max(axis=self._axis, + **kwargs)) def var(self, ddof=1, *args, **kwargs): - return self._apply_agg_function(lambda df: df.var(ddof, + return self._apply_agg_function(lambda df: df.var(ddof=ddof, + axis=self._axis, *args, **kwargs)) def get_group(self, name, obj=None): - raise NotImplementedError("Not Yet implemented.") + raise NotImplementedError( + "To contribute to Pandas on Ray, please visit " + "github.com/ray-project/ray.") def __len__(self): return len(self._keys_and_values) @@ -235,29 +347,53 @@ class DataFrameGroupBy(object): df.sum(axis=self._axis, **kwargs)) def __unicode__(self): - raise NotImplementedError("Not Yet implemented.") + raise NotImplementedError( + "To contribute to Pandas on Ray, please visit " + "github.com/ray-project/ray.") def describe(self, **kwargs): - raise NotImplementedError("Not Yet implemented.") + raise NotImplementedError( + "To contribute to Pandas on Ray, please visit " + "github.com/ray-project/ray.") def boxplot(self, grouped, subplots=True, column=None, fontsize=None, rot=0, grid=True, ax=None, figsize=None, layout=None, **kwds): - raise NotImplementedError("Not Yet implemented.") + raise NotImplementedError( + "To contribute to Pandas on Ray, please visit " + "github.com/ray-project/ray.") def ngroup(self, ascending=True): return self._index_grouped.ngroup(ascending) def nunique(self, dropna=True): - return self._apply_agg_function(lambda df: df.nunique(dropna)) + return self._apply_agg_function(lambda df: df.nunique(dropna=dropna, + axis=self._axis)) def resample(self, rule, *args, **kwargs): - raise NotImplementedError("Not Yet implemented.") + raise NotImplementedError( + "To contribute to Pandas on Ray, please visit " + "github.com/ray-project/ray.") def median(self, **kwargs): - return self._apply_agg_function(lambda df: df.median(**kwargs)) + return self._apply_agg_function(lambda df: df.median(axis=self._axis, + **kwargs)) def head(self, n=5): - return self._apply_df_function(lambda df: df.head(n)) + result = [v.head(n) for k, v in self._iter] + new_df = concat(result, axis=self._axis) + + if self._axis == 0: + index_head = [v[:n] for k, v in self._keys_and_values] + flattened_index = {i for j in index_head for i in j} + sorted_index = [i for i in self._index if i in flattened_index] + new_df._block_partitions = np.array([_reindex_helper._submit( + args=tuple([new_df.index, sorted_index, 1, + len(new_df._block_partitions)] + block.tolist()), + num_return_vals=len(new_df._block_partitions)) + for block in new_df._block_partitions.T]).T + new_df.index = sorted_index + + return new_df def cumprod(self, axis=0, *args, **kwargs): return self._apply_df_function(lambda df: df.cumprod(axis, @@ -268,69 +404,84 @@ class DataFrameGroupBy(object): return self._iter.__iter__() def agg(self, arg, *args, **kwargs): - def agg_help(df): - if isinstance(df, pd.Series): - return pd.DataFrame(df).T - else: - return df - x = [v.agg(arg, axis=self._axis, *args, **kwargs) - for k, v in self._iter] - - new_parts = _map_partitions(lambda df: agg_help(df), x) - - from .concat import concat - result = concat(new_parts) - - return result + return self.aggregate(arg, *args, **kwargs) def cov(self): return self._apply_agg_function(lambda df: df.cov()) def transform(self, func, *args, **kwargs): - from .concat import concat - - new_parts = concat([v.transform(func, *args, **kwargs) - for k, v in self._iter]) - return new_parts + return self._apply_df_function(lambda df: df.transform(func, + *args, + **kwargs)) def corr(self, **kwargs): return self._apply_agg_function(lambda df: df.corr(**kwargs)) def fillna(self, **kwargs): - return self._apply_df_function(lambda df: df.fillna(**kwargs)) + return self._apply_df_function(lambda df: df.fillna(axis=self._axis, + **kwargs)) def count(self, **kwargs): - return self._apply_agg_function(lambda df: df.count(**kwargs)) + return self._apply_agg_function(lambda df: df.count(self._axis, + **kwargs)) def pipe(self, func, *args, **kwargs): - return self._apply_df_function(lambda df: df.pipe(func, - *args, - **kwargs)) + return com._pipe(self, func, *args, **kwargs) def cumcount(self, ascending=True): - raise NotImplementedError("Not Yet implemented.") + raise NotImplementedError( + "To contribute to Pandas on Ray, please visit " + "github.com/ray-project/ray.") def tail(self, n=5): - return self._apply_df_function(lambda df: df.tail(n)) + result = [v.tail(n) for k, v in self._iter] + new_df = concat(result, axis=self._axis) + + if self._axis == 0: + index_tail = [v[-n:] for k, v in self._keys_and_values] + flattened_index = {i for j in index_tail for i in j} + sorted_index = [i for i in self._index if i in flattened_index] + new_df._block_partitions = np.array([_reindex_helper._submit( + args=tuple([new_df.index, sorted_index, 1, + len(new_df._block_partitions)] + block.tolist()), + num_return_vals=len(new_df._block_partitions)) + for block in new_df._block_partitions.T]).T + new_df.index = sorted_index + + return new_df # expanding and rolling are unique cases and need to likely be handled # separately. They do not appear to be commonly used. def expanding(self, *args, **kwargs): - raise NotImplementedError("Not Yet implemented.") + raise NotImplementedError( + "To contribute to Pandas on Ray, please visit " + "github.com/ray-project/ray.") def rolling(self, *args, **kwargs): - raise NotImplementedError("Not Yet implemented.") + raise NotImplementedError( + "To contribute to Pandas on Ray, please visit " + "github.com/ray-project/ray.") def hist(self): - raise NotImplementedError("Not Yet implemented.") + raise NotImplementedError( + "To contribute to Pandas on Ray, please visit " + "github.com/ray-project/ray.") def quantile(self, q=0.5, **kwargs): - return self._apply_df_function(lambda df: df.quantile(q, **kwargs)) \ - if is_list_like(q) \ - else self._apply_agg_function(lambda df: df.quantile(q, **kwargs)) + if is_list_like(q): + raise NotImplementedError( + "This requires Multi-level index to be implemented. " + "To contribute to Pandas on Ray, please visit " + "github.com/ray-project/ray.") + + return self._apply_agg_function(lambda df: df.quantile(q=q, + axis=self._axis, + **kwargs)) def diff(self): - raise NotImplementedError("Not Yet implemented.") + raise NotImplementedError( + "To contribute to Pandas on Ray, please visit " + "github.com/ray-project/ray.") def take(self, **kwargs): return self._apply_df_function(lambda df: df.take(**kwargs)) @@ -338,26 +489,42 @@ class DataFrameGroupBy(object): def _apply_agg_function(self, f): assert callable(f), "\'{0}\' object is not callable".format(type(f)) - result = [pd.DataFrame(f(v)).T for k, v in self._iter] + result = [f(v) for k, v in self._iter] + new_df = concat(result, axis=1) - new_df = pd.concat(result) if self._axis == 0: + new_df = new_df.T new_df.columns = self._columns new_df.index = [k for k, v in self._iter] else: - new_df = new_df.T new_df.columns = [k for k, v in self._iter] new_df.index = self._index return new_df - def _apply_df_function(self, f): + def _apply_df_function(self, f, concat_axis=None): assert callable(f), "\'{0}\' object is not callable".format(type(f)) result = [f(v) for k, v in self._iter] + concat_axis = self._axis if concat_axis is None else concat_axis - from .concat import concat + new_df = concat(result, axis=concat_axis) + + if self._axis == 0: + new_df._block_partitions = np.array([_reindex_helper._submit( + args=tuple([new_df.index, self._index, 1, + len(new_df._block_partitions)] + block.tolist()), + num_return_vals=len(new_df._block_partitions)) + for block in new_df._block_partitions.T]).T + new_df.index = self._index + else: + new_df._block_partitions = np.array([_reindex_helper._submit( + args=tuple([new_df.columns, self._columns, 0, + new_df._block_partitions.shape[1]] + + block.tolist()), + num_return_vals=new_df._block_partitions.shape[1]) + for block in new_df._block_partitions]) + new_df.columns = self._columns - new_df = concat(result) return new_df diff --git a/python/ray/dataframe/test/test_dataframe.py b/python/ray/dataframe/test/test_dataframe.py index cc20aa101..537110b6b 100644 --- a/python/ray/dataframe/test/test_dataframe.py +++ b/python/ray/dataframe/test/test_dataframe.py @@ -330,7 +330,9 @@ def test_int_dataframe(): with pytest.raises(TypeError): test_agg(ray_df, pandas_df, func, 1) - test_transform(ray_df, pandas_df) + test_apply(ray_df, pandas_df, lambda df: df.drop('col1'), 1) + test_apply(ray_df, pandas_df, lambda df: -df, 0) + test_transform(ray_df, pandas_df) def test_float_dataframe(): @@ -499,7 +501,9 @@ def test_float_dataframe(): with pytest.raises(TypeError): test_agg(ray_df, pandas_df, func, 1) - test_transform(ray_df, pandas_df) + test_apply(ray_df, pandas_df, lambda df: df.drop('col1'), 1) + test_apply(ray_df, pandas_df, lambda df: -df, 0) + test_transform(ray_df, pandas_df) def test_mixed_dtype_dataframe(): @@ -665,7 +669,8 @@ def test_mixed_dtype_dataframe(): with pytest.raises(TypeError): test_agg(ray_df, pandas_df, func, 1) - test_transform(ray_df, pandas_df) + test_transform(ray_df, pandas_df) + test_apply(ray_df, pandas_df, lambda df: df.drop('col1'), 1) def test_nan_dataframe(): @@ -828,7 +833,9 @@ def test_nan_dataframe(): with pytest.raises(TypeError): test_agg(ray_df, pandas_df, func, 1) - test_transform(ray_df, pandas_df) + test_apply(ray_df, pandas_df, lambda df: df.drop('col1'), 1) + test_apply(ray_df, pandas_df, lambda df: -df, 0) + test_transform(ray_df, pandas_df) def test_dense_nan_df(): @@ -2452,10 +2459,32 @@ def test_rdiv(): def test_reindex(): - ray_df = create_test_dataframe() + pandas_df = pd.DataFrame({'col1': [0, 1, 2, 3], + 'col2': [4, 5, 6, 7], + 'col3': [8, 9, 10, 11], + 'col4': [12, 13, 14, 15], + 'col5': [0, 0, 0, 0]}) + ray_df = from_pandas(pandas_df, 2) - with pytest.raises(NotImplementedError): - ray_df.reindex() + assert ray_df_equals_pandas( + ray_df.reindex([0, 3, 2, 1]), pandas_df.reindex([0, 3, 2, 1])) + + assert ray_df_equals_pandas( + ray_df.reindex([0, 6, 2]), pandas_df.reindex([0, 6, 2])) + + assert ray_df_equals_pandas( + ray_df.reindex(['col1', 'col3', 'col4', 'col2'], axis=1), + pandas_df.reindex(['col1', 'col3', 'col4', 'col2'], axis=1)) + + assert ray_df_equals_pandas( + ray_df.reindex(['col1', 'col7', 'col4', 'col8'], axis=1), + pandas_df.reindex(['col1', 'col7', 'col4', 'col8'], axis=1)) + + assert ray_df_equals_pandas( + ray_df.reindex(index=[0, 1, 5], + columns=['col1', 'col7', 'col4', 'col8']), + pandas_df.reindex(index=[0, 1, 5], + columns=['col1', 'col7', 'col4', 'col8'])) def test_reindex_axis(): diff --git a/python/ray/dataframe/test/test_groupby.py b/python/ray/dataframe/test/test_groupby.py new file mode 100644 index 000000000..47945eb37 --- /dev/null +++ b/python/ray/dataframe/test/test_groupby.py @@ -0,0 +1,589 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import pytest +import sys +import pandas +import numpy as np +import ray.dataframe as pd +from ray.dataframe.utils import ( + from_pandas, + to_pandas) + +PY2 = False +if sys.version_info.major < 3: + PY2 = True + + +@pytest.fixture +def ray_df_equals_pandas(ray_df, pandas_df): + assert isinstance(ray_df, pd.DataFrame) + assert to_pandas(ray_df).equals(pandas_df) + + +@pytest.fixture +def ray_df_almost_equals_pandas(ray_df, pandas_df): + assert isinstance(ray_df, pd.DataFrame) + difference = to_pandas(ray_df) - pandas_df + diff_max = difference.max().max() + assert to_pandas(ray_df).equals(pandas_df) or diff_max < 0.0001 + + +@pytest.fixture +def ray_series_equals_pandas(ray_df, pandas_df): + assert ray_df.equals(pandas_df) + + +@pytest.fixture +def ray_df_equals(ray_df1, ray_df2): + assert to_pandas(ray_df1).equals(to_pandas(ray_df2)) + + +@pytest.fixture +def ray_groupby_equals_pandas(ray_groupby, pandas_groupby): + for g1, g2 in zip(ray_groupby, pandas_groupby): + assert g1[0] == g2[0] + ray_df_equals_pandas(g1[1], g2[1]) + + +def test_simple_row_groupby(): + pandas_df = pandas.DataFrame({'col1': [0, 1, 2, 3], + 'col2': [4, 5, 6, 7], + 'col3': [3, 8, 12, 10], + 'col4': [17, 13, 16, 15], + 'col5': [-4, -5, -6, -7]}) + + ray_df = from_pandas(pandas_df, 2) + + by = [1, 2, 1, 2] + n = 1 + + ray_groupby = ray_df.groupby(by=by) + pandas_groupby = pandas_df.groupby(by=by) + + ray_groupby_equals_pandas(ray_groupby, pandas_groupby) + test_ngroups(ray_groupby, pandas_groupby) + test_skew(ray_groupby, pandas_groupby) + test_ffill(ray_groupby, pandas_groupby) + test_sem(ray_groupby, pandas_groupby) + test_mean(ray_groupby, pandas_groupby) + test_any(ray_groupby, pandas_groupby) + test_min(ray_groupby, pandas_groupby) + test_idxmax(ray_groupby, pandas_groupby) + test_ndim(ray_groupby, pandas_groupby) + test_cumsum(ray_groupby, pandas_groupby) + test_pct_change(ray_groupby, pandas_groupby) + test_cummax(ray_groupby, pandas_groupby) + + apply_functions = [lambda df: df.sum(), lambda df: -df] + for func in apply_functions: + test_apply(ray_groupby, pandas_groupby, func) + + test_dtypes(ray_groupby, pandas_groupby) + test_first(ray_groupby, pandas_groupby) + test_backfill(ray_groupby, pandas_groupby) + test_cummin(ray_groupby, pandas_groupby) + test_bfill(ray_groupby, pandas_groupby) + test_idxmin(ray_groupby, pandas_groupby) + test_prod(ray_groupby, pandas_groupby) + test_std(ray_groupby, pandas_groupby) + + agg_functions = ['min', 'max'] + for func in agg_functions: + test_agg(ray_groupby, pandas_groupby, func) + test_aggregate(ray_groupby, pandas_groupby, func) + + test_last(ray_groupby, pandas_groupby) + test_mad(ray_groupby, pandas_groupby) + test_rank(ray_groupby, pandas_groupby) + test_max(ray_groupby, pandas_groupby) + test_var(ray_groupby, pandas_groupby) + test_len(ray_groupby, pandas_groupby) + test_sum(ray_groupby, pandas_groupby) + test_ngroup(ray_groupby, pandas_groupby) + test_nunique(ray_groupby, pandas_groupby) + test_median(ray_groupby, pandas_groupby) + test_head(ray_groupby, pandas_groupby, n) + test_cumprod(ray_groupby, pandas_groupby) + test_cov(ray_groupby, pandas_groupby) + + transform_functions = [lambda df: df + 4, lambda df: -df - 10] + for func in transform_functions: + test_transform(ray_groupby, pandas_groupby, func) + + pipe_functions = [lambda dfgb: dfgb.sum()] + for func in pipe_functions: + test_pipe(ray_groupby, pandas_groupby, func) + + test_corr(ray_groupby, pandas_groupby) + test_fillna(ray_groupby, pandas_groupby) + test_count(ray_groupby, pandas_groupby) + test_tail(ray_groupby, pandas_groupby, n) + test_quantile(ray_groupby, pandas_groupby) + test_take(ray_groupby, pandas_groupby) + + +def test_single_group_row_groupby(): + pandas_df = pandas.DataFrame({'col1': [0, 1, 2, 3], + 'col2': [4, 5, 36, 7], + 'col3': [3, 8, 12, 10], + 'col4': [17, 3, 16, 15], + 'col5': [-4, 5, -6, -7]}) + + ray_df = from_pandas(pandas_df, 2) + + by = [1, 1, 1, 1] + n = 6 + + ray_groupby = ray_df.groupby(by=by) + pandas_groupby = pandas_df.groupby(by=by) + + ray_groupby_equals_pandas(ray_groupby, pandas_groupby) + test_ngroups(ray_groupby, pandas_groupby) + test_skew(ray_groupby, pandas_groupby) + test_ffill(ray_groupby, pandas_groupby) + test_sem(ray_groupby, pandas_groupby) + test_mean(ray_groupby, pandas_groupby) + test_any(ray_groupby, pandas_groupby) + test_min(ray_groupby, pandas_groupby) + test_idxmax(ray_groupby, pandas_groupby) + test_ndim(ray_groupby, pandas_groupby) + test_cumsum(ray_groupby, pandas_groupby) + test_pct_change(ray_groupby, pandas_groupby) + test_cummax(ray_groupby, pandas_groupby) + + apply_functions = [lambda df: df.sum(), lambda df: -df] + for func in apply_functions: + test_apply(ray_groupby, pandas_groupby, func) + + test_dtypes(ray_groupby, pandas_groupby) + test_first(ray_groupby, pandas_groupby) + test_backfill(ray_groupby, pandas_groupby) + test_cummin(ray_groupby, pandas_groupby) + test_bfill(ray_groupby, pandas_groupby) + test_idxmin(ray_groupby, pandas_groupby) + test_prod(ray_groupby, pandas_groupby) + test_std(ray_groupby, pandas_groupby) + + agg_functions = ['min', 'max'] + for func in agg_functions: + test_agg(ray_groupby, pandas_groupby, func) + test_aggregate(ray_groupby, pandas_groupby, func) + + test_last(ray_groupby, pandas_groupby) + test_mad(ray_groupby, pandas_groupby) + test_rank(ray_groupby, pandas_groupby) + test_max(ray_groupby, pandas_groupby) + test_var(ray_groupby, pandas_groupby) + test_len(ray_groupby, pandas_groupby) + test_sum(ray_groupby, pandas_groupby) + test_ngroup(ray_groupby, pandas_groupby) + test_nunique(ray_groupby, pandas_groupby) + test_median(ray_groupby, pandas_groupby) + test_head(ray_groupby, pandas_groupby, n) + test_cumprod(ray_groupby, pandas_groupby) + test_cov(ray_groupby, pandas_groupby) + + transform_functions = [lambda df: df + 4, lambda df: -df - 10] + for func in transform_functions: + test_transform(ray_groupby, pandas_groupby, func) + + pipe_functions = [lambda dfgb: dfgb.sum()] + for func in pipe_functions: + test_pipe(ray_groupby, pandas_groupby, func) + + test_corr(ray_groupby, pandas_groupby) + test_fillna(ray_groupby, pandas_groupby) + test_count(ray_groupby, pandas_groupby) + test_tail(ray_groupby, pandas_groupby, n) + test_quantile(ray_groupby, pandas_groupby) + test_take(ray_groupby, pandas_groupby) + + +def test_large_row_groupby(): + pandas_df = pandas.DataFrame(np.random.randint(0, 8, size=(100, 4)), + columns=list('ABCD')) + + ray_df = from_pandas(pandas_df, 2) + + by = pandas_df['A'].tolist() + n = 4 + + ray_groupby = ray_df.groupby(by=by) + pandas_groupby = pandas_df.groupby(by=by) + + ray_groupby_equals_pandas(ray_groupby, pandas_groupby) + test_ngroups(ray_groupby, pandas_groupby) + test_skew(ray_groupby, pandas_groupby) + test_ffill(ray_groupby, pandas_groupby) + test_sem(ray_groupby, pandas_groupby) + test_mean(ray_groupby, pandas_groupby) + test_any(ray_groupby, pandas_groupby) + test_min(ray_groupby, pandas_groupby) + test_idxmax(ray_groupby, pandas_groupby) + test_ndim(ray_groupby, pandas_groupby) + test_cumsum(ray_groupby, pandas_groupby) + test_pct_change(ray_groupby, pandas_groupby) + test_cummax(ray_groupby, pandas_groupby) + + apply_functions = [lambda df: df.sum(), lambda df: -df] + for func in apply_functions: + test_apply(ray_groupby, pandas_groupby, func) + + test_dtypes(ray_groupby, pandas_groupby) + test_first(ray_groupby, pandas_groupby) + test_backfill(ray_groupby, pandas_groupby) + test_cummin(ray_groupby, pandas_groupby) + test_bfill(ray_groupby, pandas_groupby) + test_idxmin(ray_groupby, pandas_groupby) + # test_prod(ray_groupby, pandas_groupby) causes overflows + test_std(ray_groupby, pandas_groupby) + + agg_functions = ['min', 'max'] + for func in agg_functions: + test_agg(ray_groupby, pandas_groupby, func) + test_aggregate(ray_groupby, pandas_groupby, func) + + test_last(ray_groupby, pandas_groupby) + test_mad(ray_groupby, pandas_groupby) + test_rank(ray_groupby, pandas_groupby) + test_max(ray_groupby, pandas_groupby) + test_var(ray_groupby, pandas_groupby) + test_len(ray_groupby, pandas_groupby) + test_sum(ray_groupby, pandas_groupby) + test_ngroup(ray_groupby, pandas_groupby) + test_nunique(ray_groupby, pandas_groupby) + test_median(ray_groupby, pandas_groupby) + test_head(ray_groupby, pandas_groupby, n) + # test_cumprod(ray_groupby, pandas_groupby) causes overflows + test_cov(ray_groupby, pandas_groupby) + + transform_functions = [lambda df: df + 4, lambda df: -df - 10] + for func in transform_functions: + test_transform(ray_groupby, pandas_groupby, func) + + pipe_functions = [lambda dfgb: dfgb.sum()] + for func in pipe_functions: + test_pipe(ray_groupby, pandas_groupby, func) + + test_corr(ray_groupby, pandas_groupby) + test_fillna(ray_groupby, pandas_groupby) + test_count(ray_groupby, pandas_groupby) + test_tail(ray_groupby, pandas_groupby, n) + test_quantile(ray_groupby, pandas_groupby) + test_take(ray_groupby, pandas_groupby) + + +def test_simple_col_groupby(): + pandas_df = pandas.DataFrame({'col1': [0, 3, 2, 3], + 'col2': [4, 1, 6, 7], + 'col3': [3, 8, 2, 10], + 'col4': [1, 13, 6, 15], + 'col5': [-4, 5, 6, -7]}) + + ray_df = from_pandas(pandas_df, 2) + + by = [1, 2, 3, 2, 1] + + ray_groupby = ray_df.groupby(axis=1, by=by) + pandas_groupby = pandas_df.groupby(axis=1, by=by) + + ray_groupby_equals_pandas(ray_groupby, pandas_groupby) + test_ngroups(ray_groupby, pandas_groupby) + test_skew(ray_groupby, pandas_groupby) + test_ffill(ray_groupby, pandas_groupby) + test_sem(ray_groupby, pandas_groupby) + test_mean(ray_groupby, pandas_groupby) + test_any(ray_groupby, pandas_groupby) + test_min(ray_groupby, pandas_groupby) + test_ndim(ray_groupby, pandas_groupby) + + if not PY2: + # idxmax and idxmin fail on column groupby in pandas with python2 + test_idxmax(ray_groupby, pandas_groupby) + test_idxmin(ray_groupby, pandas_groupby) + test_rank(ray_groupby, pandas_groupby) + test_quantile(ray_groupby, pandas_groupby) + + # https://github.com/pandas-dev/pandas/issues/21127 + # test_cumsum(ray_groupby, pandas_groupby) + # test_cummax(ray_groupby, pandas_groupby) + # test_cummin(ray_groupby, pandas_groupby) + # test_cumprod(ray_groupby, pandas_groupby) + + test_pct_change(ray_groupby, pandas_groupby) + apply_functions = [lambda df: -df, lambda df: df.sum(axis=1)] + for func in apply_functions: + test_apply(ray_groupby, pandas_groupby, func) + + test_first(ray_groupby, pandas_groupby) + test_backfill(ray_groupby, pandas_groupby) + test_bfill(ray_groupby, pandas_groupby) + test_prod(ray_groupby, pandas_groupby) + test_std(ray_groupby, pandas_groupby) + test_last(ray_groupby, pandas_groupby) + test_mad(ray_groupby, pandas_groupby) + test_max(ray_groupby, pandas_groupby) + test_var(ray_groupby, pandas_groupby) + test_len(ray_groupby, pandas_groupby) + test_sum(ray_groupby, pandas_groupby) + + # Pandas fails on this case with ValueError + # test_ngroup(ray_groupby, pandas_groupby) + # test_nunique(ray_groupby, pandas_groupby) + test_median(ray_groupby, pandas_groupby) + test_cov(ray_groupby, pandas_groupby) + + transform_functions = [lambda df: df + 4, lambda df: -df - 10] + for func in transform_functions: + test_transform(ray_groupby, pandas_groupby, func) + + pipe_functions = [lambda dfgb: dfgb.sum()] + for func in pipe_functions: + test_pipe(ray_groupby, pandas_groupby, func) + + test_corr(ray_groupby, pandas_groupby) + test_fillna(ray_groupby, pandas_groupby) + test_count(ray_groupby, pandas_groupby) + test_take(ray_groupby, pandas_groupby) + + +@pytest.fixture +def test_ngroups(ray_groupby, pandas_groupby): + assert ray_groupby.ngroups == pandas_groupby.ngroups + + +@pytest.fixture +def test_skew(ray_groupby, pandas_groupby): + ray_df_almost_equals_pandas(ray_groupby.skew(), pandas_groupby.skew()) + + +@pytest.fixture +def test_ffill(ray_groupby, pandas_groupby): + ray_df_equals_pandas(ray_groupby.ffill(), pandas_groupby.ffill()) + + +@pytest.fixture +def test_sem(ray_groupby, pandas_groupby): + with pytest.raises(NotImplementedError): + ray_groupby.sem() + + +@pytest.fixture +def test_mean(ray_groupby, pandas_groupby): + ray_df_almost_equals_pandas(ray_groupby.mean(), pandas_groupby.mean()) + + +@pytest.fixture +def test_any(ray_groupby, pandas_groupby): + ray_df_equals_pandas(ray_groupby.any(), pandas_groupby.any()) + + +@pytest.fixture +def test_min(ray_groupby, pandas_groupby): + ray_df_equals_pandas(ray_groupby.min(), pandas_groupby.min()) + + +@pytest.fixture +def test_idxmax(ray_groupby, pandas_groupby): + ray_df_equals_pandas(ray_groupby.idxmax(), pandas_groupby.idxmax()) + + +@pytest.fixture +def test_ndim(ray_groupby, pandas_groupby): + assert ray_groupby.ndim == pandas_groupby.ndim + + +@pytest.fixture +def test_cumsum(ray_groupby, pandas_groupby): + ray_df_equals_pandas(ray_groupby.cumsum(), pandas_groupby.cumsum()) + ray_df_equals_pandas(ray_groupby.cumsum(axis=1), + pandas_groupby.cumsum(axis=1)) + + +@pytest.fixture +def test_pct_change(ray_groupby, pandas_groupby): + with pytest.raises(NotImplementedError): + ray_groupby.pct_change() + + +@pytest.fixture +def test_cummax(ray_groupby, pandas_groupby): + ray_df_equals_pandas(ray_groupby.cummax(), pandas_groupby.cummax()) + ray_df_equals_pandas(ray_groupby.cummax(axis=1), + pandas_groupby.cummax(axis=1)) + + +@pytest.fixture +def test_apply(ray_groupby, pandas_groupby, func): + ray_df_equals_pandas(ray_groupby.apply(func), pandas_groupby.apply(func)) + + +@pytest.fixture +def test_dtypes(ray_groupby, pandas_groupby): + ray_df_equals_pandas(ray_groupby.dtypes, pandas_groupby.dtypes) + + +@pytest.fixture +def test_first(ray_groupby, pandas_groupby): + with pytest.raises(NotImplementedError): + ray_groupby.first() + + +@pytest.fixture +def test_backfill(ray_groupby, pandas_groupby): + ray_df_equals_pandas(ray_groupby.backfill(), pandas_groupby.backfill()) + + +@pytest.fixture +def test_cummin(ray_groupby, pandas_groupby): + ray_df_equals_pandas(ray_groupby.cummin(), pandas_groupby.cummin()) + ray_df_equals_pandas(ray_groupby.cummin(axis=1), + pandas_groupby.cummin(axis=1)) + + +@pytest.fixture +def test_bfill(ray_groupby, pandas_groupby): + ray_df_equals_pandas(ray_groupby.bfill(), pandas_groupby.bfill()) + + +@pytest.fixture +def test_idxmin(ray_groupby, pandas_groupby): + ray_df_equals_pandas(ray_groupby.idxmin(), pandas_groupby.idxmin()) + + +@pytest.fixture +def test_prod(ray_groupby, pandas_groupby): + ray_df_equals_pandas(ray_groupby.prod(), pandas_groupby.prod()) + + +@pytest.fixture +def test_std(ray_groupby, pandas_groupby): + ray_df_almost_equals_pandas(ray_groupby.std(), pandas_groupby.std()) + + +@pytest.fixture +def test_aggregate(ray_groupby, pandas_groupby, func): + ray_df_equals_pandas(ray_groupby.aggregate(func), + pandas_groupby.aggregate(func)) + + +@pytest.fixture +def test_agg(ray_groupby, pandas_groupby, func): + ray_df_equals_pandas(ray_groupby.agg(func), pandas_groupby.agg(func)) + + +@pytest.fixture +def test_last(ray_groupby, pandas_groupby): + with pytest.raises(NotImplementedError): + ray_groupby.last() + + +@pytest.fixture +def test_mad(ray_groupby, pandas_groupby): + with pytest.raises(NotImplementedError): + ray_groupby.mad() + + +@pytest.fixture +def test_rank(ray_groupby, pandas_groupby): + ray_df_equals_pandas(ray_groupby.rank(), pandas_groupby.rank()) + + +@pytest.fixture +def test_max(ray_groupby, pandas_groupby): + ray_df_equals_pandas(ray_groupby.max(), pandas_groupby.max()) + + +@pytest.fixture +def test_var(ray_groupby, pandas_groupby): + ray_df_almost_equals_pandas(ray_groupby.var(), pandas_groupby.var()) + + +@pytest.fixture +def test_len(ray_groupby, pandas_groupby): + assert len(ray_groupby) == len(pandas_groupby) + + +@pytest.fixture +def test_sum(ray_groupby, pandas_groupby): + ray_df_equals_pandas(ray_groupby.sum(), pandas_groupby.sum()) + + +@pytest.fixture +def test_ngroup(ray_groupby, pandas_groupby): + ray_series_equals_pandas(ray_groupby.ngroup(), pandas_groupby.ngroup()) + + +@pytest.fixture +def test_nunique(ray_groupby, pandas_groupby): + ray_df_equals_pandas(ray_groupby.nunique(), pandas_groupby.nunique()) + + +@pytest.fixture +def test_median(ray_groupby, pandas_groupby): + ray_df_equals_pandas(ray_groupby.median(), pandas_groupby.median()) + + +@pytest.fixture +def test_head(ray_groupby, pandas_groupby, n): + ray_df_equals_pandas(ray_groupby.head(n=n), pandas_groupby.head(n=n)) + + +@pytest.fixture +def test_cumprod(ray_groupby, pandas_groupby): + ray_df_equals_pandas(ray_groupby.cumprod(), pandas_groupby.cumprod()) + ray_df_equals_pandas(ray_groupby.cumprod(axis=1), + pandas_groupby.cumprod(axis=1)) + + +@pytest.fixture +def test_cov(ray_groupby, pandas_groupby): + with pytest.raises(NotImplementedError): + ray_groupby.cov() + + +@pytest.fixture +def test_transform(ray_groupby, pandas_groupby, func): + ray_df_equals_pandas(ray_groupby.transform(func), + pandas_groupby.transform(func)) + + +@pytest.fixture +def test_corr(ray_groupby, pandas_groupby): + with pytest.raises(NotImplementedError): + ray_groupby.corr() + + +@pytest.fixture +def test_fillna(ray_groupby, pandas_groupby): + ray_df_equals_pandas(ray_groupby.fillna(method="ffill"), + pandas_groupby.fillna(method="ffill")) + + +@pytest.fixture +def test_count(ray_groupby, pandas_groupby): + ray_df_equals_pandas(ray_groupby.count(), pandas_groupby.count()) + + +@pytest.fixture +def test_pipe(ray_groupby, pandas_groupby, func): + ray_df_equals_pandas(ray_groupby.pipe(func), pandas_groupby.pipe(func)) + + +@pytest.fixture +def test_tail(ray_groupby, pandas_groupby, n): + ray_df_equals_pandas(ray_groupby.tail(n=n), pandas_groupby.tail(n=n)) + + +@pytest.fixture +def test_quantile(ray_groupby, pandas_groupby): + ray_df_equals_pandas(ray_groupby.quantile(q=0.4), + pandas_groupby.quantile(q=0.4)) + + +@pytest.fixture +def test_take(ray_groupby, pandas_groupby): + with pytest.raises(NotImplementedError): + ray_groupby.take(indices=[1]) diff --git a/python/ray/dataframe/utils.py b/python/ray/dataframe/utils.py index f19dd497f..911ae911b 100644 --- a/python/ray/dataframe/utils.py +++ b/python/ray/dataframe/utils.py @@ -271,7 +271,12 @@ def _create_block_partitions(partitions, axis=0, length=None): # In the case that axis is 1 we have to transpose because we build the # columns into rows. Fortunately numpy is efficient at this. - return np.array(x) if axis == 0 else np.array(x).T + blocks = np.array(x) if axis == 0 else np.array(x).T + + # Sometimes we only get a single column or row, which is + # problematic for building blocks from the partitions, so we + # add whatever dimension we're missing from the input. + return fix_blocks_dimensions(blocks, axis) @ray.remote @@ -299,6 +304,7 @@ def create_blocks_helper(df, npartitions, axis): for block in blocks: block.columns = pd.RangeIndex(0, len(block.columns)) + block.reset_index(inplace=True, drop=True) return blocks @@ -372,12 +378,10 @@ def _reindex_helper(old_index, new_index, axis, npartitions, *df): df = pd.concat(df, axis=axis ^ 1) if axis == 1: df.index = old_index - df = df.reindex(new_index, copy=False) - df.reset_index(inplace=True, drop=True) elif axis == 0: df.columns = old_index - df = df.reindex(columns=new_index, copy=False) - df.columns = pd.RangeIndex(len(df.columns)) + + df = df.reindex(new_index, copy=False, axis=axis ^ 1) return create_blocks_helper(df, npartitions, axis) @@ -461,3 +465,11 @@ def _correct_column_dtypes(*column): """ concat_column = pd.concat(column, copy=False) return create_blocks_helper(concat_column, len(column), 1) + + +def fix_blocks_dimensions(blocks, axis): + """Checks that blocks is 2D, and adds a dimension if not. + """ + if blocks.ndim < 2: + return np.expand_dims(blocks, axis=axis ^ 1) + return blocks