[DataFrame] Refactor GroupBy Methods and Implement Reindex (#2101)

* fix 1D blocks case

* Add groupby test code

* begin writing tests

* Fix bug on groupby(axis=1, ...)

* implement reindex

* fix index misalignment after groupby

* fix df.apply bug

* fix groupby.apply

* fix agg funcs

* finish groupby tests

* finish test suite for groupby

* fixing lint

* undo new line

* fix python2 index copy bug

* Concat Series into ray.df

* fixing python2 issues

* resolving all python 2 tests

* handle multiindex on apply

* resolve comments

* updating docstring

* fix lint

* fix lint again

* address comments
This commit is contained in:
Kunal Gosar 2018-05-22 16:34:07 -07:00 committed by Devin Petersohn
parent 01f69689ed
commit 4584193308
7 changed files with 1054 additions and 191 deletions

View file

@ -135,6 +135,7 @@ matrix:
# - python -m pytest python/ray/dataframe/test/test_dataframe.py
- python -m pytest python/ray/dataframe/test/test_concat.py
- python -m pytest python/ray/dataframe/test/test_io.py
# - python -m pytest python/ray/dataframe/test/test_groupby.py
# ray tune tests
# - python python/ray/tune/test/dependency_test.py
@ -199,6 +200,7 @@ script:
- python -m pytest python/ray/dataframe/test/test_dataframe.py
- python -m pytest python/ray/dataframe/test/test_concat.py
- python -m pytest python/ray/dataframe/test/test_io.py
- python -m pytest python/ray/dataframe/test/test_groupby.py
# ray tune tests
- python python/ray/tune/test/dependency_test.py

View file

@ -41,9 +41,9 @@ def concat(objs, axis=0, join='outer', join_axes=None, ignore_index=False,
all_series = all(isinstance(obj, pandas.Series)
for obj in objs)
if all_series:
return pandas.concat(objs, axis, join, join_axes,
ignore_index, keys, levels, names,
verify_integrity, copy)
return DataFrame(pandas.concat(objs, axis, join, join_axes,
ignore_index, keys, levels, names,
verify_integrity, copy))
if isinstance(objs, dict):
raise NotImplementedError(

View file

@ -29,12 +29,12 @@ import io
import sys
import re
from .groupby import DataFrameGroupBy
from .utils import (
_deploy_func,
_map_partitions,
_partition_pandas_dataframe,
to_pandas,
create_blocks_helper,
_blocks_to_col,
_blocks_to_row,
_create_block_partitions,
@ -43,7 +43,8 @@ from .utils import (
_co_op_helper,
_match_partitioning,
_concat_index,
_correct_column_dtypes)
_correct_column_dtypes,
fix_blocks_dimensions)
from . import get_npartitions
from .index_metadata import _IndexMetadata
from .iterator import PartitionIterator
@ -119,9 +120,12 @@ class DataFrame(object):
"for internal DataFrame creations"
if block_partitions is not None:
axis = 0
# put in numpy array here to make accesses easier since it's 2D
self._block_partitions = np.array(block_partitions)
axis = 0
self._block_partitions = \
fix_blocks_dimensions(self._block_partitions, axis)
else:
if row_partitions is not None:
axis = 0
@ -144,13 +148,6 @@ class DataFrame(object):
_create_block_partitions(partitions, axis=axis,
length=axis_length)
# Sometimes we only get a single column or row, which is
# problematic for building blocks from the partitions, so we
# add whatever dimension we're missing from the input.
if self._block_partitions.ndim < 2:
self._block_partitions = np.expand_dims(self._block_partitions,
axis=axis ^ 1)
assert self._block_partitions.ndim == 2, "Block Partitions must be 2D."
# Create the row and column index objects for using our partitioning.
@ -674,6 +671,7 @@ class DataFrame(object):
elif mismatch:
raise KeyError(next(x for x in by if x not in self))
from .groupby import DataFrameGroupBy
return DataFrameGroupBy(self, by, axis, level, as_index, sort,
group_keys, squeeze, **kwargs)
@ -984,36 +982,23 @@ class DataFrame(object):
raise ValueError("{} is an unknown string function".format(func))
def _callable_function(self, func, axis, *args, **kwargs):
if axis == 0:
partitions = self._col_partitions
else:
partitions = self._row_partitions
kwargs['axis'] = axis
if axis == 1:
kwargs['axis'] = axis
kwargs['temp_columns'] = self.columns
else:
kwargs['temp_index'] = self.index
def agg_helper(df, arg, *args, **kwargs):
if 'temp_index' in kwargs:
df.index = kwargs.pop('temp_index', None)
else:
df.columns = kwargs.pop('temp_columns', None)
def agg_helper(df, arg, index, columns, *args, **kwargs):
df.index = index
df.columns = columns
is_transform = kwargs.pop('is_transform', False)
new_df = df.agg(arg, *args, **kwargs)
is_series = False
index = None
columns = None
if isinstance(new_df, pd.Series):
is_series = True
index = None
columns = None
else:
index = new_df.index \
if not isinstance(new_df.index, pd.RangeIndex) \
else None
columns = new_df.columns
index = new_df.index
new_df.columns = pd.RangeIndex(0, len(new_df.columns))
new_df.reset_index(drop=True, inplace=True)
@ -1024,13 +1009,37 @@ class DataFrame(object):
return is_series, new_df, index, columns
remote_result = \
[_deploy_func._submit(args=(lambda df: agg_helper(df,
func,
*args,
**kwargs),
part), num_return_vals=4)
for part in partitions]
if axis == 0:
index = self.index
columns = [self._col_metadata.partition_series(i).index
for i in range(len(self._col_partitions))]
remote_result = \
[_deploy_func._submit(args=(
lambda df: agg_helper(df,
func,
index,
cols,
*args,
**kwargs),
part), num_return_vals=4)
for cols, part in zip(columns, self._col_partitions)]
if axis == 1:
indexes = [self._row_metadata.partition_series(i).index
for i in range(len(self._row_partitions))]
columns = self.columns
remote_result = \
[_deploy_func._submit(args=(
lambda df: agg_helper(df,
func,
index,
columns,
*args,
**kwargs),
part), num_return_vals=4)
for index, part in zip(indexes, self._row_partitions)]
# This magic transposes the list comprehension returned from remote
is_series, new_parts, index, columns = \
@ -1053,21 +1062,22 @@ class DataFrame(object):
# remote objects. We build a Ray DataFrame from the Pandas partitions.
elif axis == 0:
new_index = ray.get(index[0])
columns = ray.get(columns)
columns = columns[0].append(columns[1:])
# This does not handle the Multi Index case
new_columns = ray.get(columns)
new_columns = new_columns[0].append(new_columns[1:])
return DataFrame(col_partitions=new_parts,
columns=columns,
index=self.index if new_index is None
else new_index)
columns=new_columns,
index=new_index)
else:
new_index = ray.get(index[0])
columns = ray.get(columns)
columns = columns[0].append(columns[1:])
new_columns = ray.get(columns[0])
# This does not handle the Multi Index case
new_index = ray.get(index)
new_index = new_index[0].append(new_index[1:])
return DataFrame(row_partitions=new_parts,
columns=columns,
index=self.index if new_index is None
else new_index)
columns=new_columns,
index=new_index)
def align(self, other, join='outer', axis=None, level=None, copy=True,
fill_value=None, method=None, limit=None, fill_axis=0,
@ -1234,7 +1244,7 @@ class DataFrame(object):
Returns:
values: ndarray
"""
# TODO this is very inneficient, also see __array__
# TODO this is very inefficient, also see __array__
return to_pandas(self).as_matrix(columns)
def asfreq(self, freq, method=None, how=None, normalize=False,
@ -3323,9 +3333,48 @@ class DataFrame(object):
def reindex(self, labels=None, index=None, columns=None, axis=None,
method=None, copy=True, level=None, fill_value=np.nan,
limit=None, tolerance=None):
raise NotImplementedError(
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")
if level is not None:
raise NotImplementedError(
"Multilevel Index not Implemented. "
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")
axis = pd.DataFrame()._get_axis_number(axis) if axis is not None \
else 0
if axis == 0 and labels is not None:
index = labels
elif labels is not None:
columns = labels
new_blocks = self._block_partitions
if index is not None:
old_index = self.index
new_blocks = np.array([reindex_helper._submit(
args=(old_index, index, 1, len(new_blocks), method,
fill_value, limit, tolerance) + tuple(block.tolist()),
num_return_vals=len(new_blocks))
for block in new_blocks.T]).T
else:
index = self.index
if columns is not None:
old_columns = self.columns
new_blocks = np.array([reindex_helper._submit(
args=(old_columns, columns, 0, new_blocks.shape[1], method,
fill_value, limit, tolerance) + tuple(block.tolist()),
num_return_vals=new_blocks.shape[1])
for block in new_blocks])
else:
columns = self.columns
if copy:
return DataFrame(block_partitions=new_blocks,
index=index,
columns=columns)
self._update_inplace(block_partitions=new_blocks,
index=index,
columns=columns)
def reindex_axis(self, labels, axis=0, method=None, level=None, copy=True,
limit=None, fill_value=np.nan):
@ -5324,3 +5373,18 @@ def _where_helper(left, cond, other, left_columns, cond_columns,
other.columns = other_columns
return left.where(cond, other, *args)
@ray.remote
def reindex_helper(old_index, new_index, axis, npartitions, method, fill_value,
limit, tolerance, *df):
df = pd.concat(df, axis=axis ^ 1)
if axis == 1:
df.index = old_index
else:
df.columns = old_index
df = df.reindex(new_index, copy=False, axis=axis ^ 1,
method=method, fill_value=fill_value,
limit=limit, tolerance=tolerance)
return create_blocks_helper(df, npartitions, axis)

View file

@ -2,13 +2,16 @@ from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import pandas.core.groupby
import pandas as pd
import numpy as np
import pandas.core.groupby
from pandas.core.dtypes.common import is_list_like
import pandas.core.common as com
import ray
from .utils import _map_partitions
from .utils import _inherit_docstrings
from .utils import _inherit_docstrings, _reindex_helper
from .concat import concat
@_inherit_docstrings(pandas.core.groupby.DataFrameGroupBy,
@ -32,22 +35,30 @@ class DataFrameGroupBy(object):
.groupby(by=by, sort=sort)
else:
partitions = [row for row in df._block_partitions]
self._index_grouped = pd.Series(self._columns, index=self._index)\
self._index_grouped = \
pd.Series(self._columns, index=self._columns) \
.groupby(by=by, sort=sort)
self._keys_and_values = [(k, v)
for k, v in self._index_grouped]
self._grouped_partitions = \
list(zip(*(groupby._submit(args=(by,
axis,
level,
as_index,
sort,
group_keys,
squeeze) + tuple(part.tolist()),
num_return_vals=len(self))
for part in partitions)))
if len(self) > 1:
self._grouped_partitions = \
list(zip(*(groupby._submit(args=(by,
axis,
level,
as_index,
sort,
group_keys,
squeeze)
+ tuple(part.tolist()),
num_return_vals=len(self))
for part in partitions)))
else:
if axis == 0:
self._grouped_partitions = [df._col_partitions]
else:
self._grouped_partitions = [df._row_partitions]
@property
def _iter(self):
@ -58,8 +69,6 @@ class DataFrameGroupBy(object):
DataFrame(col_partitions=part,
columns=self._columns,
index=self._keys_and_values[i][1].index,
row_metadata=self._row_metadata[
self._keys_and_values[i][1].index],
col_metadata=self._col_metadata))
for i, part in enumerate(self._grouped_partitions)]
else:
@ -67,9 +76,7 @@ class DataFrameGroupBy(object):
DataFrame(row_partitions=part,
columns=self._keys_and_values[i][1].index,
index=self._index,
row_metadata=self._row_metadata,
col_metadata=self._col_metadata[
self._keys_and_values[i][1].index]))
row_metadata=self._row_metadata))
for i, part in enumerate(self._grouped_partitions)]
@property
@ -77,84 +84,151 @@ class DataFrameGroupBy(object):
return len(self)
def skew(self, **kwargs):
return self._apply_agg_function(lambda df: df.skew(**kwargs))
return self._apply_agg_function(lambda df: df.skew(axis=self._axis,
**kwargs))
def ffill(self, limit=None):
return self._apply_agg_function(lambda df: df.ffill(limit=limit))
return self._apply_df_function(lambda df: df.ffill(axis=self._axis,
limit=limit))
def sem(self, ddof=1):
return self._apply_agg_function(lambda df: df.sem(ddof=ddof))
return self._apply_agg_function(lambda df: df.sem(axis=self._axis,
ddof=ddof))
def mean(self, *args, **kwargs):
return self._apply_agg_function(lambda df: df.mean(*args, **kwargs))
return self._apply_agg_function(lambda df: df.mean(axis=self._axis,
*args,
**kwargs))
def any(self):
return self._apply_agg_function(lambda df: df.any())
return self._apply_agg_function(lambda df: df.any(axis=self._axis))
@property
def plot(self):
raise NotImplementedError("Not Yet implemented.")
raise NotImplementedError(
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")
def ohlc(self):
raise NotImplementedError("Not Yet implemented.")
raise NotImplementedError(
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")
def __bytes__(self):
raise NotImplementedError("Not Yet implemented.")
raise NotImplementedError(
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")
@property
def tshift(self):
raise NotImplementedError("Not Yet implemented.")
raise NotImplementedError(
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")
@property
def groups(self):
return {k: pd.Index(v) for k, v in self._keys_and_values}
def min(self, **kwargs):
return self._apply_agg_function(lambda df: df.min(**kwargs))
return self._apply_agg_function(lambda df: df.min(axis=self._axis,
**kwargs))
def idxmax(self):
return self._apply_agg_function(lambda df: df.idxmax())
def idxmax_helper(df, index):
result = df.idxmax(axis=self._axis)
result = result.apply(lambda v: index[v])
return result
results = [idxmax_helper(g[1], i[1])
for g, i in zip(self._iter, self._index_grouped)]
new_df = concat(results, axis=1)
if self._axis == 0:
new_df = new_df.T
new_df.columns = self._columns
new_df.index = [k for k, v in self._iter]
else:
new_df.columns = [k for k, v in self._iter]
new_df.index = self._index
return new_df
@property
def ndim(self):
return self._index_grouped.ndim
return 2 # ndim is always 2 for DataFrames
def shift(self, periods=1, freq=None, axis=0):
raise NotImplementedError("Not Yet implemented.")
raise NotImplementedError(
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")
def nth(self, n, dropna=None):
raise NotImplementedError("Not Yet implemented.")
raise NotImplementedError(
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")
def cumsum(self, axis=0, *args, **kwargs):
return self._apply_agg_function(lambda df: df.cumsum(axis,
*args,
**kwargs))
return self._apply_df_function(lambda df: df.cumsum(axis,
*args,
**kwargs))
@property
def indices(self):
return dict(self._keys_and_values)
def pct_change(self):
return self._apply_agg_function(lambda df: df.pct_change())
return self._apply_agg_function(
lambda df: df.pct_change(axis=self._axis))
def filter(self, func, dropna=True, *args, **kwargs):
raise NotImplementedError("Not Yet implemented.")
raise NotImplementedError(
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")
def cummax(self, axis=0, **kwargs):
return self._apply_agg_function(lambda df: df.cummax(axis=axis,
**kwargs))
return self._apply_df_function(lambda df: df.cummax(axis,
**kwargs))
def apply(self, func, *args, **kwargs):
return self._apply_df_function(lambda df: df.apply(func,
*args,
**kwargs)) \
if is_list_like(func) \
else self._apply_agg_function(lambda df: df.apply(func,
*args,
**kwargs))
def apply_helper(df):
return df.apply(func, axis=self._axis, *args, **kwargs)
result = [func(v) for k, v in self._iter]
if self._axis == 0:
if isinstance(result[0], pd.Series):
# Applied an aggregation function
new_df = concat(result, axis=1).T
new_df.columns = self._columns
new_df.index = [k for k, v in self._iter]
else:
new_df = concat(result, axis=self._axis)
new_df._block_partitions = np.array([_reindex_helper._submit(
args=tuple([new_df.index, self._index, self._axis ^ 1,
len(new_df._block_partitions)]
+ block.tolist()),
num_return_vals=len(new_df._block_partitions))
for block in new_df._block_partitions.T]).T
new_df.index = self._index
else:
if isinstance(result[0], pd.Series):
# Applied an aggregation function
new_df = concat(result, axis=1)
new_df.columns = [k for k, v in self._iter]
new_df.index = self._index
else:
new_df = concat(result, axis=self._axis)
new_df._block_partitions = np.array([_reindex_helper._submit(
args=tuple([new_df.columns, self._columns, self._axis ^ 1,
new_df._block_partitions.shape[1]]
+ block.tolist()),
num_return_vals=new_df._block_partitions.shape[1])
for block in new_df._block_partitions])
new_df.columns = self._columns
return new_df
@property
def dtypes(self):
if self._axis == 1:
raise ValueError("Cannot call dtypes on groupby with axis=1")
return self._apply_agg_function(lambda df: df.dtypes)
def first(self, **kwargs):
@ -166,60 +240,98 @@ class DataFrameGroupBy(object):
def __getitem__(self, key):
# This operation requires a SeriesGroupBy Object
raise NotImplementedError("Not Yet implemented.")
raise NotImplementedError(
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")
def cummin(self, axis=0, **kwargs):
return self._apply_agg_function(lambda df: df.cummin(axis=axis,
**kwargs))
def bfill(self, limit=None):
return self._apply_agg_function(lambda df: df.bfill(limit=limit))
def idxmin(self):
return self._apply_agg_function(lambda df: df.idxmin())
def prod(self, **kwargs):
return self._apply_agg_function(lambda df: df.prod(**kwargs))
def std(self, ddof=1, *args, **kwargs):
return self._apply_agg_function(lambda df: df.std(ddof=ddof,
*args, **kwargs))
def aggregate(self, arg, *args, **kwargs):
return self._apply_df_function(lambda df: df.agg(arg,
*args,
**kwargs)) \
if is_list_like(arg) \
else self._apply_agg_function(lambda df: df.agg(arg,
*args,
return self._apply_df_function(lambda df: df.cummin(axis=axis,
**kwargs))
def bfill(self, limit=None):
return self._apply_df_function(lambda df: df.bfill(axis=self._axis,
limit=limit))
def idxmin(self):
def idxmin_helper(df, index):
result = df.idxmin(axis=self._axis)
result = result.apply(lambda v: index[v])
return result
results = [idxmin_helper(g[1], i[1])
for g, i in zip(self._iter, self._index_grouped)]
new_df = concat(results, axis=1)
if self._axis == 0:
new_df = new_df.T
new_df.columns = self._columns
new_df.index = [k for k, v in self._iter]
else:
new_df.columns = [k for k, v in self._iter]
new_df.index = self._index
return new_df
def prod(self, **kwargs):
return self._apply_agg_function(lambda df: df.prod(axis=self._axis,
**kwargs))
def std(self, ddof=1, *args, **kwargs):
return self._apply_agg_function(lambda df: df.std(axis=self._axis,
ddof=ddof,
*args,
**kwargs))
def aggregate(self, arg, *args, **kwargs):
if self._axis != 0:
# This is not implemented in pandas,
# so we throw a different message
raise NotImplementedError("axis other than 0 is not supported")
if is_list_like(arg):
raise NotImplementedError(
"This requires Multi-level index to be implemented. "
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")
return self._apply_agg_function(lambda df: df.agg(arg,
axis=self._axis,
*args,
**kwargs))
def last(self, **kwargs):
return self._apply_df_function(lambda df: df.last(**kwargs))
return self._apply_df_function(lambda df: df.last(offset=0,
**kwargs))
def mad(self):
return self._apply_agg_function(lambda df: df.mad())
def rank(self):
return self._apply_df_function(lambda df: df.rank())
return self._apply_df_function(lambda df: df.rank(axis=self._axis))
@property
def corrwith(self):
raise NotImplementedError("Not Yet implemented.")
raise NotImplementedError(
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")
def pad(self, limit=None):
raise NotImplementedError("Not Yet implemented.")
raise NotImplementedError(
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")
def max(self, **kwargs):
return self._apply_agg_function(lambda df: df.max(**kwargs))
return self._apply_agg_function(lambda df: df.max(axis=self._axis,
**kwargs))
def var(self, ddof=1, *args, **kwargs):
return self._apply_agg_function(lambda df: df.var(ddof,
return self._apply_agg_function(lambda df: df.var(ddof=ddof,
axis=self._axis,
*args,
**kwargs))
def get_group(self, name, obj=None):
raise NotImplementedError("Not Yet implemented.")
raise NotImplementedError(
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")
def __len__(self):
return len(self._keys_and_values)
@ -235,29 +347,53 @@ class DataFrameGroupBy(object):
df.sum(axis=self._axis, **kwargs))
def __unicode__(self):
raise NotImplementedError("Not Yet implemented.")
raise NotImplementedError(
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")
def describe(self, **kwargs):
raise NotImplementedError("Not Yet implemented.")
raise NotImplementedError(
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")
def boxplot(self, grouped, subplots=True, column=None, fontsize=None,
rot=0, grid=True, ax=None, figsize=None, layout=None, **kwds):
raise NotImplementedError("Not Yet implemented.")
raise NotImplementedError(
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")
def ngroup(self, ascending=True):
return self._index_grouped.ngroup(ascending)
def nunique(self, dropna=True):
return self._apply_agg_function(lambda df: df.nunique(dropna))
return self._apply_agg_function(lambda df: df.nunique(dropna=dropna,
axis=self._axis))
def resample(self, rule, *args, **kwargs):
raise NotImplementedError("Not Yet implemented.")
raise NotImplementedError(
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")
def median(self, **kwargs):
return self._apply_agg_function(lambda df: df.median(**kwargs))
return self._apply_agg_function(lambda df: df.median(axis=self._axis,
**kwargs))
def head(self, n=5):
return self._apply_df_function(lambda df: df.head(n))
result = [v.head(n) for k, v in self._iter]
new_df = concat(result, axis=self._axis)
if self._axis == 0:
index_head = [v[:n] for k, v in self._keys_and_values]
flattened_index = {i for j in index_head for i in j}
sorted_index = [i for i in self._index if i in flattened_index]
new_df._block_partitions = np.array([_reindex_helper._submit(
args=tuple([new_df.index, sorted_index, 1,
len(new_df._block_partitions)] + block.tolist()),
num_return_vals=len(new_df._block_partitions))
for block in new_df._block_partitions.T]).T
new_df.index = sorted_index
return new_df
def cumprod(self, axis=0, *args, **kwargs):
return self._apply_df_function(lambda df: df.cumprod(axis,
@ -268,69 +404,84 @@ class DataFrameGroupBy(object):
return self._iter.__iter__()
def agg(self, arg, *args, **kwargs):
def agg_help(df):
if isinstance(df, pd.Series):
return pd.DataFrame(df).T
else:
return df
x = [v.agg(arg, axis=self._axis, *args, **kwargs)
for k, v in self._iter]
new_parts = _map_partitions(lambda df: agg_help(df), x)
from .concat import concat
result = concat(new_parts)
return result
return self.aggregate(arg, *args, **kwargs)
def cov(self):
return self._apply_agg_function(lambda df: df.cov())
def transform(self, func, *args, **kwargs):
from .concat import concat
new_parts = concat([v.transform(func, *args, **kwargs)
for k, v in self._iter])
return new_parts
return self._apply_df_function(lambda df: df.transform(func,
*args,
**kwargs))
def corr(self, **kwargs):
return self._apply_agg_function(lambda df: df.corr(**kwargs))
def fillna(self, **kwargs):
return self._apply_df_function(lambda df: df.fillna(**kwargs))
return self._apply_df_function(lambda df: df.fillna(axis=self._axis,
**kwargs))
def count(self, **kwargs):
return self._apply_agg_function(lambda df: df.count(**kwargs))
return self._apply_agg_function(lambda df: df.count(self._axis,
**kwargs))
def pipe(self, func, *args, **kwargs):
return self._apply_df_function(lambda df: df.pipe(func,
*args,
**kwargs))
return com._pipe(self, func, *args, **kwargs)
def cumcount(self, ascending=True):
raise NotImplementedError("Not Yet implemented.")
raise NotImplementedError(
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")
def tail(self, n=5):
return self._apply_df_function(lambda df: df.tail(n))
result = [v.tail(n) for k, v in self._iter]
new_df = concat(result, axis=self._axis)
if self._axis == 0:
index_tail = [v[-n:] for k, v in self._keys_and_values]
flattened_index = {i for j in index_tail for i in j}
sorted_index = [i for i in self._index if i in flattened_index]
new_df._block_partitions = np.array([_reindex_helper._submit(
args=tuple([new_df.index, sorted_index, 1,
len(new_df._block_partitions)] + block.tolist()),
num_return_vals=len(new_df._block_partitions))
for block in new_df._block_partitions.T]).T
new_df.index = sorted_index
return new_df
# expanding and rolling are unique cases and need to likely be handled
# separately. They do not appear to be commonly used.
def expanding(self, *args, **kwargs):
raise NotImplementedError("Not Yet implemented.")
raise NotImplementedError(
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")
def rolling(self, *args, **kwargs):
raise NotImplementedError("Not Yet implemented.")
raise NotImplementedError(
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")
def hist(self):
raise NotImplementedError("Not Yet implemented.")
raise NotImplementedError(
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")
def quantile(self, q=0.5, **kwargs):
return self._apply_df_function(lambda df: df.quantile(q, **kwargs)) \
if is_list_like(q) \
else self._apply_agg_function(lambda df: df.quantile(q, **kwargs))
if is_list_like(q):
raise NotImplementedError(
"This requires Multi-level index to be implemented. "
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")
return self._apply_agg_function(lambda df: df.quantile(q=q,
axis=self._axis,
**kwargs))
def diff(self):
raise NotImplementedError("Not Yet implemented.")
raise NotImplementedError(
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")
def take(self, **kwargs):
return self._apply_df_function(lambda df: df.take(**kwargs))
@ -338,26 +489,42 @@ class DataFrameGroupBy(object):
def _apply_agg_function(self, f):
assert callable(f), "\'{0}\' object is not callable".format(type(f))
result = [pd.DataFrame(f(v)).T for k, v in self._iter]
result = [f(v) for k, v in self._iter]
new_df = concat(result, axis=1)
new_df = pd.concat(result)
if self._axis == 0:
new_df = new_df.T
new_df.columns = self._columns
new_df.index = [k for k, v in self._iter]
else:
new_df = new_df.T
new_df.columns = [k for k, v in self._iter]
new_df.index = self._index
return new_df
def _apply_df_function(self, f):
def _apply_df_function(self, f, concat_axis=None):
assert callable(f), "\'{0}\' object is not callable".format(type(f))
result = [f(v) for k, v in self._iter]
concat_axis = self._axis if concat_axis is None else concat_axis
from .concat import concat
new_df = concat(result, axis=concat_axis)
if self._axis == 0:
new_df._block_partitions = np.array([_reindex_helper._submit(
args=tuple([new_df.index, self._index, 1,
len(new_df._block_partitions)] + block.tolist()),
num_return_vals=len(new_df._block_partitions))
for block in new_df._block_partitions.T]).T
new_df.index = self._index
else:
new_df._block_partitions = np.array([_reindex_helper._submit(
args=tuple([new_df.columns, self._columns, 0,
new_df._block_partitions.shape[1]]
+ block.tolist()),
num_return_vals=new_df._block_partitions.shape[1])
for block in new_df._block_partitions])
new_df.columns = self._columns
new_df = concat(result)
return new_df

View file

@ -330,7 +330,9 @@ def test_int_dataframe():
with pytest.raises(TypeError):
test_agg(ray_df, pandas_df, func, 1)
test_transform(ray_df, pandas_df)
test_apply(ray_df, pandas_df, lambda df: df.drop('col1'), 1)
test_apply(ray_df, pandas_df, lambda df: -df, 0)
test_transform(ray_df, pandas_df)
def test_float_dataframe():
@ -499,7 +501,9 @@ def test_float_dataframe():
with pytest.raises(TypeError):
test_agg(ray_df, pandas_df, func, 1)
test_transform(ray_df, pandas_df)
test_apply(ray_df, pandas_df, lambda df: df.drop('col1'), 1)
test_apply(ray_df, pandas_df, lambda df: -df, 0)
test_transform(ray_df, pandas_df)
def test_mixed_dtype_dataframe():
@ -665,7 +669,8 @@ def test_mixed_dtype_dataframe():
with pytest.raises(TypeError):
test_agg(ray_df, pandas_df, func, 1)
test_transform(ray_df, pandas_df)
test_transform(ray_df, pandas_df)
test_apply(ray_df, pandas_df, lambda df: df.drop('col1'), 1)
def test_nan_dataframe():
@ -828,7 +833,9 @@ def test_nan_dataframe():
with pytest.raises(TypeError):
test_agg(ray_df, pandas_df, func, 1)
test_transform(ray_df, pandas_df)
test_apply(ray_df, pandas_df, lambda df: df.drop('col1'), 1)
test_apply(ray_df, pandas_df, lambda df: -df, 0)
test_transform(ray_df, pandas_df)
def test_dense_nan_df():
@ -2452,10 +2459,32 @@ def test_rdiv():
def test_reindex():
ray_df = create_test_dataframe()
pandas_df = pd.DataFrame({'col1': [0, 1, 2, 3],
'col2': [4, 5, 6, 7],
'col3': [8, 9, 10, 11],
'col4': [12, 13, 14, 15],
'col5': [0, 0, 0, 0]})
ray_df = from_pandas(pandas_df, 2)
with pytest.raises(NotImplementedError):
ray_df.reindex()
assert ray_df_equals_pandas(
ray_df.reindex([0, 3, 2, 1]), pandas_df.reindex([0, 3, 2, 1]))
assert ray_df_equals_pandas(
ray_df.reindex([0, 6, 2]), pandas_df.reindex([0, 6, 2]))
assert ray_df_equals_pandas(
ray_df.reindex(['col1', 'col3', 'col4', 'col2'], axis=1),
pandas_df.reindex(['col1', 'col3', 'col4', 'col2'], axis=1))
assert ray_df_equals_pandas(
ray_df.reindex(['col1', 'col7', 'col4', 'col8'], axis=1),
pandas_df.reindex(['col1', 'col7', 'col4', 'col8'], axis=1))
assert ray_df_equals_pandas(
ray_df.reindex(index=[0, 1, 5],
columns=['col1', 'col7', 'col4', 'col8']),
pandas_df.reindex(index=[0, 1, 5],
columns=['col1', 'col7', 'col4', 'col8']))
def test_reindex_axis():

View file

@ -0,0 +1,589 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import pytest
import sys
import pandas
import numpy as np
import ray.dataframe as pd
from ray.dataframe.utils import (
from_pandas,
to_pandas)
PY2 = False
if sys.version_info.major < 3:
PY2 = True
@pytest.fixture
def ray_df_equals_pandas(ray_df, pandas_df):
assert isinstance(ray_df, pd.DataFrame)
assert to_pandas(ray_df).equals(pandas_df)
@pytest.fixture
def ray_df_almost_equals_pandas(ray_df, pandas_df):
assert isinstance(ray_df, pd.DataFrame)
difference = to_pandas(ray_df) - pandas_df
diff_max = difference.max().max()
assert to_pandas(ray_df).equals(pandas_df) or diff_max < 0.0001
@pytest.fixture
def ray_series_equals_pandas(ray_df, pandas_df):
assert ray_df.equals(pandas_df)
@pytest.fixture
def ray_df_equals(ray_df1, ray_df2):
assert to_pandas(ray_df1).equals(to_pandas(ray_df2))
@pytest.fixture
def ray_groupby_equals_pandas(ray_groupby, pandas_groupby):
for g1, g2 in zip(ray_groupby, pandas_groupby):
assert g1[0] == g2[0]
ray_df_equals_pandas(g1[1], g2[1])
def test_simple_row_groupby():
pandas_df = pandas.DataFrame({'col1': [0, 1, 2, 3],
'col2': [4, 5, 6, 7],
'col3': [3, 8, 12, 10],
'col4': [17, 13, 16, 15],
'col5': [-4, -5, -6, -7]})
ray_df = from_pandas(pandas_df, 2)
by = [1, 2, 1, 2]
n = 1
ray_groupby = ray_df.groupby(by=by)
pandas_groupby = pandas_df.groupby(by=by)
ray_groupby_equals_pandas(ray_groupby, pandas_groupby)
test_ngroups(ray_groupby, pandas_groupby)
test_skew(ray_groupby, pandas_groupby)
test_ffill(ray_groupby, pandas_groupby)
test_sem(ray_groupby, pandas_groupby)
test_mean(ray_groupby, pandas_groupby)
test_any(ray_groupby, pandas_groupby)
test_min(ray_groupby, pandas_groupby)
test_idxmax(ray_groupby, pandas_groupby)
test_ndim(ray_groupby, pandas_groupby)
test_cumsum(ray_groupby, pandas_groupby)
test_pct_change(ray_groupby, pandas_groupby)
test_cummax(ray_groupby, pandas_groupby)
apply_functions = [lambda df: df.sum(), lambda df: -df]
for func in apply_functions:
test_apply(ray_groupby, pandas_groupby, func)
test_dtypes(ray_groupby, pandas_groupby)
test_first(ray_groupby, pandas_groupby)
test_backfill(ray_groupby, pandas_groupby)
test_cummin(ray_groupby, pandas_groupby)
test_bfill(ray_groupby, pandas_groupby)
test_idxmin(ray_groupby, pandas_groupby)
test_prod(ray_groupby, pandas_groupby)
test_std(ray_groupby, pandas_groupby)
agg_functions = ['min', 'max']
for func in agg_functions:
test_agg(ray_groupby, pandas_groupby, func)
test_aggregate(ray_groupby, pandas_groupby, func)
test_last(ray_groupby, pandas_groupby)
test_mad(ray_groupby, pandas_groupby)
test_rank(ray_groupby, pandas_groupby)
test_max(ray_groupby, pandas_groupby)
test_var(ray_groupby, pandas_groupby)
test_len(ray_groupby, pandas_groupby)
test_sum(ray_groupby, pandas_groupby)
test_ngroup(ray_groupby, pandas_groupby)
test_nunique(ray_groupby, pandas_groupby)
test_median(ray_groupby, pandas_groupby)
test_head(ray_groupby, pandas_groupby, n)
test_cumprod(ray_groupby, pandas_groupby)
test_cov(ray_groupby, pandas_groupby)
transform_functions = [lambda df: df + 4, lambda df: -df - 10]
for func in transform_functions:
test_transform(ray_groupby, pandas_groupby, func)
pipe_functions = [lambda dfgb: dfgb.sum()]
for func in pipe_functions:
test_pipe(ray_groupby, pandas_groupby, func)
test_corr(ray_groupby, pandas_groupby)
test_fillna(ray_groupby, pandas_groupby)
test_count(ray_groupby, pandas_groupby)
test_tail(ray_groupby, pandas_groupby, n)
test_quantile(ray_groupby, pandas_groupby)
test_take(ray_groupby, pandas_groupby)
def test_single_group_row_groupby():
pandas_df = pandas.DataFrame({'col1': [0, 1, 2, 3],
'col2': [4, 5, 36, 7],
'col3': [3, 8, 12, 10],
'col4': [17, 3, 16, 15],
'col5': [-4, 5, -6, -7]})
ray_df = from_pandas(pandas_df, 2)
by = [1, 1, 1, 1]
n = 6
ray_groupby = ray_df.groupby(by=by)
pandas_groupby = pandas_df.groupby(by=by)
ray_groupby_equals_pandas(ray_groupby, pandas_groupby)
test_ngroups(ray_groupby, pandas_groupby)
test_skew(ray_groupby, pandas_groupby)
test_ffill(ray_groupby, pandas_groupby)
test_sem(ray_groupby, pandas_groupby)
test_mean(ray_groupby, pandas_groupby)
test_any(ray_groupby, pandas_groupby)
test_min(ray_groupby, pandas_groupby)
test_idxmax(ray_groupby, pandas_groupby)
test_ndim(ray_groupby, pandas_groupby)
test_cumsum(ray_groupby, pandas_groupby)
test_pct_change(ray_groupby, pandas_groupby)
test_cummax(ray_groupby, pandas_groupby)
apply_functions = [lambda df: df.sum(), lambda df: -df]
for func in apply_functions:
test_apply(ray_groupby, pandas_groupby, func)
test_dtypes(ray_groupby, pandas_groupby)
test_first(ray_groupby, pandas_groupby)
test_backfill(ray_groupby, pandas_groupby)
test_cummin(ray_groupby, pandas_groupby)
test_bfill(ray_groupby, pandas_groupby)
test_idxmin(ray_groupby, pandas_groupby)
test_prod(ray_groupby, pandas_groupby)
test_std(ray_groupby, pandas_groupby)
agg_functions = ['min', 'max']
for func in agg_functions:
test_agg(ray_groupby, pandas_groupby, func)
test_aggregate(ray_groupby, pandas_groupby, func)
test_last(ray_groupby, pandas_groupby)
test_mad(ray_groupby, pandas_groupby)
test_rank(ray_groupby, pandas_groupby)
test_max(ray_groupby, pandas_groupby)
test_var(ray_groupby, pandas_groupby)
test_len(ray_groupby, pandas_groupby)
test_sum(ray_groupby, pandas_groupby)
test_ngroup(ray_groupby, pandas_groupby)
test_nunique(ray_groupby, pandas_groupby)
test_median(ray_groupby, pandas_groupby)
test_head(ray_groupby, pandas_groupby, n)
test_cumprod(ray_groupby, pandas_groupby)
test_cov(ray_groupby, pandas_groupby)
transform_functions = [lambda df: df + 4, lambda df: -df - 10]
for func in transform_functions:
test_transform(ray_groupby, pandas_groupby, func)
pipe_functions = [lambda dfgb: dfgb.sum()]
for func in pipe_functions:
test_pipe(ray_groupby, pandas_groupby, func)
test_corr(ray_groupby, pandas_groupby)
test_fillna(ray_groupby, pandas_groupby)
test_count(ray_groupby, pandas_groupby)
test_tail(ray_groupby, pandas_groupby, n)
test_quantile(ray_groupby, pandas_groupby)
test_take(ray_groupby, pandas_groupby)
def test_large_row_groupby():
pandas_df = pandas.DataFrame(np.random.randint(0, 8, size=(100, 4)),
columns=list('ABCD'))
ray_df = from_pandas(pandas_df, 2)
by = pandas_df['A'].tolist()
n = 4
ray_groupby = ray_df.groupby(by=by)
pandas_groupby = pandas_df.groupby(by=by)
ray_groupby_equals_pandas(ray_groupby, pandas_groupby)
test_ngroups(ray_groupby, pandas_groupby)
test_skew(ray_groupby, pandas_groupby)
test_ffill(ray_groupby, pandas_groupby)
test_sem(ray_groupby, pandas_groupby)
test_mean(ray_groupby, pandas_groupby)
test_any(ray_groupby, pandas_groupby)
test_min(ray_groupby, pandas_groupby)
test_idxmax(ray_groupby, pandas_groupby)
test_ndim(ray_groupby, pandas_groupby)
test_cumsum(ray_groupby, pandas_groupby)
test_pct_change(ray_groupby, pandas_groupby)
test_cummax(ray_groupby, pandas_groupby)
apply_functions = [lambda df: df.sum(), lambda df: -df]
for func in apply_functions:
test_apply(ray_groupby, pandas_groupby, func)
test_dtypes(ray_groupby, pandas_groupby)
test_first(ray_groupby, pandas_groupby)
test_backfill(ray_groupby, pandas_groupby)
test_cummin(ray_groupby, pandas_groupby)
test_bfill(ray_groupby, pandas_groupby)
test_idxmin(ray_groupby, pandas_groupby)
# test_prod(ray_groupby, pandas_groupby) causes overflows
test_std(ray_groupby, pandas_groupby)
agg_functions = ['min', 'max']
for func in agg_functions:
test_agg(ray_groupby, pandas_groupby, func)
test_aggregate(ray_groupby, pandas_groupby, func)
test_last(ray_groupby, pandas_groupby)
test_mad(ray_groupby, pandas_groupby)
test_rank(ray_groupby, pandas_groupby)
test_max(ray_groupby, pandas_groupby)
test_var(ray_groupby, pandas_groupby)
test_len(ray_groupby, pandas_groupby)
test_sum(ray_groupby, pandas_groupby)
test_ngroup(ray_groupby, pandas_groupby)
test_nunique(ray_groupby, pandas_groupby)
test_median(ray_groupby, pandas_groupby)
test_head(ray_groupby, pandas_groupby, n)
# test_cumprod(ray_groupby, pandas_groupby) causes overflows
test_cov(ray_groupby, pandas_groupby)
transform_functions = [lambda df: df + 4, lambda df: -df - 10]
for func in transform_functions:
test_transform(ray_groupby, pandas_groupby, func)
pipe_functions = [lambda dfgb: dfgb.sum()]
for func in pipe_functions:
test_pipe(ray_groupby, pandas_groupby, func)
test_corr(ray_groupby, pandas_groupby)
test_fillna(ray_groupby, pandas_groupby)
test_count(ray_groupby, pandas_groupby)
test_tail(ray_groupby, pandas_groupby, n)
test_quantile(ray_groupby, pandas_groupby)
test_take(ray_groupby, pandas_groupby)
def test_simple_col_groupby():
pandas_df = pandas.DataFrame({'col1': [0, 3, 2, 3],
'col2': [4, 1, 6, 7],
'col3': [3, 8, 2, 10],
'col4': [1, 13, 6, 15],
'col5': [-4, 5, 6, -7]})
ray_df = from_pandas(pandas_df, 2)
by = [1, 2, 3, 2, 1]
ray_groupby = ray_df.groupby(axis=1, by=by)
pandas_groupby = pandas_df.groupby(axis=1, by=by)
ray_groupby_equals_pandas(ray_groupby, pandas_groupby)
test_ngroups(ray_groupby, pandas_groupby)
test_skew(ray_groupby, pandas_groupby)
test_ffill(ray_groupby, pandas_groupby)
test_sem(ray_groupby, pandas_groupby)
test_mean(ray_groupby, pandas_groupby)
test_any(ray_groupby, pandas_groupby)
test_min(ray_groupby, pandas_groupby)
test_ndim(ray_groupby, pandas_groupby)
if not PY2:
# idxmax and idxmin fail on column groupby in pandas with python2
test_idxmax(ray_groupby, pandas_groupby)
test_idxmin(ray_groupby, pandas_groupby)
test_rank(ray_groupby, pandas_groupby)
test_quantile(ray_groupby, pandas_groupby)
# https://github.com/pandas-dev/pandas/issues/21127
# test_cumsum(ray_groupby, pandas_groupby)
# test_cummax(ray_groupby, pandas_groupby)
# test_cummin(ray_groupby, pandas_groupby)
# test_cumprod(ray_groupby, pandas_groupby)
test_pct_change(ray_groupby, pandas_groupby)
apply_functions = [lambda df: -df, lambda df: df.sum(axis=1)]
for func in apply_functions:
test_apply(ray_groupby, pandas_groupby, func)
test_first(ray_groupby, pandas_groupby)
test_backfill(ray_groupby, pandas_groupby)
test_bfill(ray_groupby, pandas_groupby)
test_prod(ray_groupby, pandas_groupby)
test_std(ray_groupby, pandas_groupby)
test_last(ray_groupby, pandas_groupby)
test_mad(ray_groupby, pandas_groupby)
test_max(ray_groupby, pandas_groupby)
test_var(ray_groupby, pandas_groupby)
test_len(ray_groupby, pandas_groupby)
test_sum(ray_groupby, pandas_groupby)
# Pandas fails on this case with ValueError
# test_ngroup(ray_groupby, pandas_groupby)
# test_nunique(ray_groupby, pandas_groupby)
test_median(ray_groupby, pandas_groupby)
test_cov(ray_groupby, pandas_groupby)
transform_functions = [lambda df: df + 4, lambda df: -df - 10]
for func in transform_functions:
test_transform(ray_groupby, pandas_groupby, func)
pipe_functions = [lambda dfgb: dfgb.sum()]
for func in pipe_functions:
test_pipe(ray_groupby, pandas_groupby, func)
test_corr(ray_groupby, pandas_groupby)
test_fillna(ray_groupby, pandas_groupby)
test_count(ray_groupby, pandas_groupby)
test_take(ray_groupby, pandas_groupby)
@pytest.fixture
def test_ngroups(ray_groupby, pandas_groupby):
assert ray_groupby.ngroups == pandas_groupby.ngroups
@pytest.fixture
def test_skew(ray_groupby, pandas_groupby):
ray_df_almost_equals_pandas(ray_groupby.skew(), pandas_groupby.skew())
@pytest.fixture
def test_ffill(ray_groupby, pandas_groupby):
ray_df_equals_pandas(ray_groupby.ffill(), pandas_groupby.ffill())
@pytest.fixture
def test_sem(ray_groupby, pandas_groupby):
with pytest.raises(NotImplementedError):
ray_groupby.sem()
@pytest.fixture
def test_mean(ray_groupby, pandas_groupby):
ray_df_almost_equals_pandas(ray_groupby.mean(), pandas_groupby.mean())
@pytest.fixture
def test_any(ray_groupby, pandas_groupby):
ray_df_equals_pandas(ray_groupby.any(), pandas_groupby.any())
@pytest.fixture
def test_min(ray_groupby, pandas_groupby):
ray_df_equals_pandas(ray_groupby.min(), pandas_groupby.min())
@pytest.fixture
def test_idxmax(ray_groupby, pandas_groupby):
ray_df_equals_pandas(ray_groupby.idxmax(), pandas_groupby.idxmax())
@pytest.fixture
def test_ndim(ray_groupby, pandas_groupby):
assert ray_groupby.ndim == pandas_groupby.ndim
@pytest.fixture
def test_cumsum(ray_groupby, pandas_groupby):
ray_df_equals_pandas(ray_groupby.cumsum(), pandas_groupby.cumsum())
ray_df_equals_pandas(ray_groupby.cumsum(axis=1),
pandas_groupby.cumsum(axis=1))
@pytest.fixture
def test_pct_change(ray_groupby, pandas_groupby):
with pytest.raises(NotImplementedError):
ray_groupby.pct_change()
@pytest.fixture
def test_cummax(ray_groupby, pandas_groupby):
ray_df_equals_pandas(ray_groupby.cummax(), pandas_groupby.cummax())
ray_df_equals_pandas(ray_groupby.cummax(axis=1),
pandas_groupby.cummax(axis=1))
@pytest.fixture
def test_apply(ray_groupby, pandas_groupby, func):
ray_df_equals_pandas(ray_groupby.apply(func), pandas_groupby.apply(func))
@pytest.fixture
def test_dtypes(ray_groupby, pandas_groupby):
ray_df_equals_pandas(ray_groupby.dtypes, pandas_groupby.dtypes)
@pytest.fixture
def test_first(ray_groupby, pandas_groupby):
with pytest.raises(NotImplementedError):
ray_groupby.first()
@pytest.fixture
def test_backfill(ray_groupby, pandas_groupby):
ray_df_equals_pandas(ray_groupby.backfill(), pandas_groupby.backfill())
@pytest.fixture
def test_cummin(ray_groupby, pandas_groupby):
ray_df_equals_pandas(ray_groupby.cummin(), pandas_groupby.cummin())
ray_df_equals_pandas(ray_groupby.cummin(axis=1),
pandas_groupby.cummin(axis=1))
@pytest.fixture
def test_bfill(ray_groupby, pandas_groupby):
ray_df_equals_pandas(ray_groupby.bfill(), pandas_groupby.bfill())
@pytest.fixture
def test_idxmin(ray_groupby, pandas_groupby):
ray_df_equals_pandas(ray_groupby.idxmin(), pandas_groupby.idxmin())
@pytest.fixture
def test_prod(ray_groupby, pandas_groupby):
ray_df_equals_pandas(ray_groupby.prod(), pandas_groupby.prod())
@pytest.fixture
def test_std(ray_groupby, pandas_groupby):
ray_df_almost_equals_pandas(ray_groupby.std(), pandas_groupby.std())
@pytest.fixture
def test_aggregate(ray_groupby, pandas_groupby, func):
ray_df_equals_pandas(ray_groupby.aggregate(func),
pandas_groupby.aggregate(func))
@pytest.fixture
def test_agg(ray_groupby, pandas_groupby, func):
ray_df_equals_pandas(ray_groupby.agg(func), pandas_groupby.agg(func))
@pytest.fixture
def test_last(ray_groupby, pandas_groupby):
with pytest.raises(NotImplementedError):
ray_groupby.last()
@pytest.fixture
def test_mad(ray_groupby, pandas_groupby):
with pytest.raises(NotImplementedError):
ray_groupby.mad()
@pytest.fixture
def test_rank(ray_groupby, pandas_groupby):
ray_df_equals_pandas(ray_groupby.rank(), pandas_groupby.rank())
@pytest.fixture
def test_max(ray_groupby, pandas_groupby):
ray_df_equals_pandas(ray_groupby.max(), pandas_groupby.max())
@pytest.fixture
def test_var(ray_groupby, pandas_groupby):
ray_df_almost_equals_pandas(ray_groupby.var(), pandas_groupby.var())
@pytest.fixture
def test_len(ray_groupby, pandas_groupby):
assert len(ray_groupby) == len(pandas_groupby)
@pytest.fixture
def test_sum(ray_groupby, pandas_groupby):
ray_df_equals_pandas(ray_groupby.sum(), pandas_groupby.sum())
@pytest.fixture
def test_ngroup(ray_groupby, pandas_groupby):
ray_series_equals_pandas(ray_groupby.ngroup(), pandas_groupby.ngroup())
@pytest.fixture
def test_nunique(ray_groupby, pandas_groupby):
ray_df_equals_pandas(ray_groupby.nunique(), pandas_groupby.nunique())
@pytest.fixture
def test_median(ray_groupby, pandas_groupby):
ray_df_equals_pandas(ray_groupby.median(), pandas_groupby.median())
@pytest.fixture
def test_head(ray_groupby, pandas_groupby, n):
ray_df_equals_pandas(ray_groupby.head(n=n), pandas_groupby.head(n=n))
@pytest.fixture
def test_cumprod(ray_groupby, pandas_groupby):
ray_df_equals_pandas(ray_groupby.cumprod(), pandas_groupby.cumprod())
ray_df_equals_pandas(ray_groupby.cumprod(axis=1),
pandas_groupby.cumprod(axis=1))
@pytest.fixture
def test_cov(ray_groupby, pandas_groupby):
with pytest.raises(NotImplementedError):
ray_groupby.cov()
@pytest.fixture
def test_transform(ray_groupby, pandas_groupby, func):
ray_df_equals_pandas(ray_groupby.transform(func),
pandas_groupby.transform(func))
@pytest.fixture
def test_corr(ray_groupby, pandas_groupby):
with pytest.raises(NotImplementedError):
ray_groupby.corr()
@pytest.fixture
def test_fillna(ray_groupby, pandas_groupby):
ray_df_equals_pandas(ray_groupby.fillna(method="ffill"),
pandas_groupby.fillna(method="ffill"))
@pytest.fixture
def test_count(ray_groupby, pandas_groupby):
ray_df_equals_pandas(ray_groupby.count(), pandas_groupby.count())
@pytest.fixture
def test_pipe(ray_groupby, pandas_groupby, func):
ray_df_equals_pandas(ray_groupby.pipe(func), pandas_groupby.pipe(func))
@pytest.fixture
def test_tail(ray_groupby, pandas_groupby, n):
ray_df_equals_pandas(ray_groupby.tail(n=n), pandas_groupby.tail(n=n))
@pytest.fixture
def test_quantile(ray_groupby, pandas_groupby):
ray_df_equals_pandas(ray_groupby.quantile(q=0.4),
pandas_groupby.quantile(q=0.4))
@pytest.fixture
def test_take(ray_groupby, pandas_groupby):
with pytest.raises(NotImplementedError):
ray_groupby.take(indices=[1])

View file

@ -271,7 +271,12 @@ def _create_block_partitions(partitions, axis=0, length=None):
# In the case that axis is 1 we have to transpose because we build the
# columns into rows. Fortunately numpy is efficient at this.
return np.array(x) if axis == 0 else np.array(x).T
blocks = np.array(x) if axis == 0 else np.array(x).T
# Sometimes we only get a single column or row, which is
# problematic for building blocks from the partitions, so we
# add whatever dimension we're missing from the input.
return fix_blocks_dimensions(blocks, axis)
@ray.remote
@ -299,6 +304,7 @@ def create_blocks_helper(df, npartitions, axis):
for block in blocks:
block.columns = pd.RangeIndex(0, len(block.columns))
block.reset_index(inplace=True, drop=True)
return blocks
@ -372,12 +378,10 @@ def _reindex_helper(old_index, new_index, axis, npartitions, *df):
df = pd.concat(df, axis=axis ^ 1)
if axis == 1:
df.index = old_index
df = df.reindex(new_index, copy=False)
df.reset_index(inplace=True, drop=True)
elif axis == 0:
df.columns = old_index
df = df.reindex(columns=new_index, copy=False)
df.columns = pd.RangeIndex(len(df.columns))
df = df.reindex(new_index, copy=False, axis=axis ^ 1)
return create_blocks_helper(df, npartitions, axis)
@ -461,3 +465,11 @@ def _correct_column_dtypes(*column):
"""
concat_column = pd.concat(column, copy=False)
return create_blocks_helper(concat_column, len(column), 1)
def fix_blocks_dimensions(blocks, axis):
"""Checks that blocks is 2D, and adds a dimension if not.
"""
if blocks.ndim < 2:
return np.expand_dims(blocks, axis=axis ^ 1)
return blocks