[DataFrame] Implemented cummax, cummin, cumsum, cumprod (#1705)

* cummax, cummin, cumsum, cumprod

* added remote function

* Fix lint

* Fixing tests and linting

* Fix lint
This commit is contained in:
Devin Petersohn 2018-03-13 10:06:34 -07:00 committed by GitHub
parent 737120952e
commit 8c1066cdba
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 165 additions and 46 deletions

View file

@ -24,7 +24,8 @@ from .utils import (
_shuffle,
_local_groupby,
_deploy_func,
_compute_length_and_index)
_compute_length_and_index,
_prepend_partitions)
class DataFrame(object):
@ -698,24 +699,127 @@ class DataFrame(object):
"github.com/ray-project/ray.")
def cummax(self, axis=None, skipna=True, *args, **kwargs):
raise NotImplementedError(
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")
"""Perform a cumulative maximum across the DataFrame.
Args:
axis (int): The axis to take maximum on.
skipna (bool): True to skip NA values, false otherwise.
Returns:
The cumulative maximum of the DataFrame.
"""
if axis == 1:
return self._map_partitions(
lambda df: df.cummax(axis=axis, skipna=skipna,
*args, **kwargs))
else:
local_max = [_deploy_func.remote(
lambda df: pd.DataFrame(df.max()).T, self._df[i])
for i in range(len(self._df))]
new_df = DataFrame(local_max, self.columns)
last_row_df = pd.DataFrame([df.iloc[-1, :]
for df in ray.get(new_df._df)])
cum_df = [_prepend_partitions.remote(last_row_df, i, self._df[i],
lambda df:
df.cummax(axis=axis,
skipna=skipna,
*args, **kwargs))
for i in range(len(self._df))]
final_df = DataFrame(cum_df, self.columns)
return final_df
def cummin(self, axis=None, skipna=True, *args, **kwargs):
raise NotImplementedError(
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")
"""Perform a cumulative minimum across the DataFrame.
Args:
axis (int): The axis to cummin on.
skipna (bool): True to skip NA values, false otherwise.
Returns:
The cumulative minimum of the DataFrame.
"""
if axis == 1:
return self._map_partitions(
lambda df: df.cummin(axis=axis, skipna=skipna,
*args, **kwargs))
else:
local_min = [_deploy_func.remote(
lambda df: pd.DataFrame(df.min()).T, self._df[i])
for i in range(len(self._df))]
new_df = DataFrame(local_min, self.columns)
last_row_df = pd.DataFrame([df.iloc[-1, :]
for df in ray.get(new_df._df)])
cum_df = [_prepend_partitions.remote(last_row_df, i, self._df[i],
lambda df:
df.cummin(axis=axis,
skipna=skipna,
*args, **kwargs))
for i in range(len(self._df))]
final_df = DataFrame(cum_df, self.columns)
return final_df
def cumprod(self, axis=None, skipna=True, *args, **kwargs):
raise NotImplementedError(
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")
"""Perform a cumulative product across the DataFrame.
Args:
axis (int): The axis to take product on.
skipna (bool): True to skip NA values, false otherwise.
Returns:
The cumulative product of the DataFrame.
"""
if axis == 1:
return self._map_partitions(
lambda df: df.cumprod(axis=axis, skipna=skipna,
*args, **kwargs))
else:
local_prod = [_deploy_func.remote(
lambda df: pd.DataFrame(df.prod()).T, self._df[i])
for i in range(len(self._df))]
new_df = DataFrame(local_prod, self.columns)
last_row_df = pd.DataFrame([df.iloc[-1, :]
for df in ray.get(new_df._df)])
cum_df = [_prepend_partitions.remote(last_row_df, i, self._df[i],
lambda df:
df.cumprod(axis=axis,
skipna=skipna,
*args, **kwargs))
for i in range(len(self._df))]
final_df = DataFrame(cum_df, self.columns)
return final_df
def cumsum(self, axis=None, skipna=True, *args, **kwargs):
raise NotImplementedError(
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")
"""Perform a cumulative sum across the DataFrame.
Args:
axis (int): The axis to take sum on.
skipna (bool): True to skip NA values, false otherwise.
Returns:
The cumulative sum of the DataFrame.
"""
if axis == 1:
return self._map_partitions(
lambda df: df.cumsum(axis=axis, skipna=skipna,
*args, **kwargs))
else:
# first take the sum of each partition,
# append the sums of all previous partitions to current partition
# take cumsum and remove the appended rows
local_sum = [_deploy_func.remote(
lambda df: pd.DataFrame(df.sum()).T, self._df[i])
for i in range(len(self._df))]
new_df = DataFrame(local_sum, self.columns)
last_row_df = pd.DataFrame([df.iloc[-1, :]
for df in ray.get(new_df._df)])
cum_df = [_prepend_partitions.remote(last_row_df, i, self._df[i],
lambda df:
df.cumsum(axis=axis,
skipna=skipna,
*args, **kwargs))
for i in range(len(self._df))]
final_df = DataFrame(cum_df, self.columns)
return final_df
def describe(self, percentiles=None, include=None, exclude=None):
raise NotImplementedError(
@ -1503,7 +1607,7 @@ class DataFrame(object):
Returns:
The max of the DataFrame.
"""
if(axis == 1):
if axis == 1:
return self._map_partitions(
lambda df: df.max(axis=axis, skipna=skipna, level=level,
numeric_only=numeric_only, **kwargs))
@ -1553,7 +1657,7 @@ class DataFrame(object):
Returns:
The min of the DataFrame.
"""
if(axis == 1):
if axis == 1:
return self._map_partitions(
lambda df: df.min(axis=axis, skipna=skipna, level=level,
numeric_only=numeric_only, **kwargs))

View file

@ -238,6 +238,10 @@ def test_int_dataframe():
test_min(ray_df, pandas_df)
test_notna(ray_df, pandas_df)
test_notnull(ray_df, pandas_df)
test_cummax(ray_df, pandas_df)
test_cummin(ray_df, pandas_df)
test_cumprod(ray_df, pandas_df)
test_cumsum(ray_df, pandas_df)
test_loc(ray_df, pandas_df)
test_iloc(ray_df, pandas_df)
@ -333,6 +337,10 @@ def test_float_dataframe():
test_min(ray_df, pandas_df)
test_notna(ray_df, pandas_df)
test_notnull(ray_df, pandas_df)
test_cummax(ray_df, pandas_df)
test_cummin(ray_df, pandas_df)
test_cumprod(ray_df, pandas_df)
test_cumsum(ray_df, pandas_df)
test___len__(ray_df, pandas_df)
test_first_valid_index(ray_df, pandas_df)
@ -451,6 +459,10 @@ def test_mixed_dtype_dataframe():
test_min(ray_df, pandas_df)
test_notna(ray_df, pandas_df)
test_notnull(ray_df, pandas_df)
test_cummax(ray_df, pandas_df)
test_cummin(ray_df, pandas_df)
# test_cumprod(ray_df, pandas_df)
test_cumsum(ray_df, pandas_df)
test___len__(ray_df, pandas_df)
test_first_valid_index(ray_df, pandas_df)
@ -558,6 +570,10 @@ def test_nan_dataframe():
test_min(ray_df, pandas_df)
test_notna(ray_df, pandas_df)
test_notnull(ray_df, pandas_df)
test_cummax(ray_df, pandas_df)
test_cummin(ray_df, pandas_df)
test_cumprod(ray_df, pandas_df)
test_cumsum(ray_df, pandas_df)
test___len__(ray_df, pandas_df)
test_first_valid_index(ray_df, pandas_df)
@ -824,32 +840,24 @@ def test_cov():
ray_df.cov()
def test_cummax():
ray_df = create_test_dataframe()
with pytest.raises(NotImplementedError):
ray_df.cummax()
@pytest.fixture
def test_cummax(ray_df, pandas_df):
assert(ray_df_equals_pandas(ray_df.cummax(), pandas_df.cummax()))
def test_cummin():
ray_df = create_test_dataframe()
with pytest.raises(NotImplementedError):
ray_df.cummin()
@pytest.fixture
def test_cummin(ray_df, pandas_df):
assert(ray_df_equals_pandas(ray_df.cummin(), pandas_df.cummin()))
def test_cumprod():
ray_df = create_test_dataframe()
with pytest.raises(NotImplementedError):
ray_df.cumprod()
@pytest.fixture
def test_cumprod(ray_df, pandas_df):
assert(ray_df_equals_pandas(ray_df.cumprod(), pandas_df.cumprod()))
def test_cumsum():
ray_df = create_test_dataframe()
with pytest.raises(NotImplementedError):
ray_df.cumsum()
@pytest.fixture
def test_cumsum(ray_df, pandas_df):
assert(ray_df_equals_pandas(ray_df.cumsum(), pandas_df.cumsum()))
def test_describe():
@ -1897,7 +1905,7 @@ def test_prod():
ray_df = create_test_dataframe()
with pytest.raises(NotImplementedError):
ray_df.prod()
ray_df.prod(None)
def test_product():
@ -1985,7 +1993,7 @@ def test_rename_sanity(num_partitions=2):
'D': 'd'
}
ray_df = rdf.from_pandas(test_data.frame, num_partitions)
ray_df = from_pandas(test_data.frame, num_partitions)
assert ray_df_equals_pandas(
ray_df.rename(columns=mapping),
test_data.frame.rename(columns=mapping)
@ -1997,7 +2005,7 @@ def test_rename_sanity(num_partitions=2):
renamed2
)
ray_df = rdf.from_pandas(renamed2, num_partitions)
ray_df = from_pandas(renamed2, num_partitions)
assert ray_df_equals_pandas(
ray_df.rename(columns=str.upper),
renamed2.rename(columns=str.upper)
@ -2010,7 +2018,7 @@ def test_rename_sanity(num_partitions=2):
# gets sorted alphabetical
df = pd.DataFrame(data)
ray_df = rdf.from_pandas(df, num_partitions)
ray_df = from_pandas(df, num_partitions)
tm.assert_index_equal(
ray_df.rename(index={'foo': 'bar', 'bar': 'foo'}).index,
df.rename(index={'foo': 'bar', 'bar': 'foo'}).index
@ -2026,7 +2034,7 @@ def test_rename_sanity(num_partitions=2):
# partial columns
renamed = test_data.frame.rename(columns={'C': 'foo', 'D': 'bar'})
ray_df = rdf.from_pandas(test_data.frame, num_partitions)
ray_df = from_pandas(test_data.frame, num_partitions)
tm.assert_index_equal(
ray_df.rename(columns={'C': 'foo', 'D': 'bar'}).index,
test_data.frame.rename(columns={'C': 'foo', 'D': 'bar'}).index
@ -2044,7 +2052,7 @@ def test_rename_sanity(num_partitions=2):
index = pd.Index(['foo', 'bar'], name='name')
renamer = pd.DataFrame(data, index=index)
ray_df = rdf.from_pandas(renamer, num_partitions)
ray_df = from_pandas(renamer, num_partitions)
renamed = renamer.rename(index={'foo': 'bar', 'bar': 'foo'})
ray_renamed = ray_df.rename(index={'foo': 'bar', 'bar': 'foo'})
tm.assert_index_equal(
@ -2062,7 +2070,7 @@ def test_rename_multiindex(num_partitions=2):
columns = pd.MultiIndex.from_tuples(
tuples_columns, names=['fizz', 'buzz'])
df = pd.DataFrame([(0, 0), (1, 1)], index=index, columns=columns)
ray_df = rdf.from_pandas(df, num_partitions)
ray_df = from_pandas(df, num_partitions)
#
# without specifying level -> accross all levels
@ -2133,7 +2141,7 @@ def test_rename_multiindex(num_partitions=2):
@pytest.fixture
def test_rename_nocopy(num_partitions=2):
test_data = TestData().frame
ray_df = rdf.from_pandas(test_data, num_partitions)
ray_df = from_pandas(test_data, num_partitions)
ray_renamed = ray_df.rename(columns={'C': 'foo'}, copy=False)
ray_renamed['foo'] = 1
assert (ray_df['C'] == 1).all()
@ -2142,7 +2150,7 @@ def test_rename_nocopy(num_partitions=2):
@pytest.fixture
def test_rename_inplace(num_partitions=2):
test_data = TestData().frame
ray_df = rdf.from_pandas(test_data, num_partitions)
ray_df = from_pandas(test_data, num_partitions)
assert ray_df_equals_pandas(
ray_df.rename(columns={'C': 'foo'}),
@ -2165,7 +2173,7 @@ def test_rename_bug(num_partitions=2):
# GH 5344
# rename set ref_locs, and set_index was not resetting
df = pd.DataFrame({0: ['foo', 'bar'], 1: ['bah', 'bas'], 2: [1, 2]})
ray_df = rdf.from_pandas(df, num_partitions)
ray_df = from_pandas(df, num_partitions)
df = df.rename(columns={0: 'a'})
df = df.rename(columns={1: 'b'})
# TODO: Uncomment when set_index is implemented
@ -2191,7 +2199,7 @@ def test_rename_axis():
@pytest.fixture
def test_rename_axis_inplace(num_partitions=2):
test_frame = TestData().frame
ray_df = rdf.from_pandas(test_frame, num_partitions)
ray_df = from_pandas(test_frame, num_partitions)
# GH 15704
result = test_frame.copy()

View file

@ -155,3 +155,10 @@ def _compute_length_and_index(dfs):
for j in range(lengths[i])]}
return lengths, pd.DataFrame(dest_indices)
@ray.remote
def _prepend_partitions(last_vals, index, partition, func):
appended_df = last_vals[:index].append(partition)
cum_df = func(appended_df)
return cum_df[index:]