[DataFrame] MVP (1/4) (#1495)

* Implement __{getitem, delitem, copy, deepcopy}__

* Implement all(), any()

* Revert "Implement all(), any()"

This reverts commit 784052414f063662cdb30943297dc9ddfd3ca300.

* Address Comments + Fix axis indexing

* Update syntax for test_axes

* Implement bfill, bool, count

* Implement round

* Resolve bfill inplace issue

* Deimplement bfill; wait for the distributed version

* Fix format

* Copy df for __delitem__
This commit is contained in:
Simon Mo 2018-02-03 09:26:18 -08:00 committed by Robert Nishihara
parent b948405532
commit 0a79442954
2 changed files with 194 additions and 61 deletions

View file

@ -15,7 +15,8 @@ class DataFrame(object):
Args: Args:
df ([ObjectID]): The list of ObjectIDs that contain the dataframe df ([ObjectID]): The list of ObjectIDs that contain the dataframe
partitions. partitions.
columns ([str]): The list of column names for this dataframe. columns (pandas.Index): The column names for this dataframe, in
pandas Index object.
""" """
assert(len(df) > 0) assert(len(df) > 0)
@ -36,7 +37,13 @@ class DataFrame(object):
The union of all indexes across the partitions. The union of all indexes across the partitions.
""" """
indices = ray.get(self._map_partitions(lambda df: df.index)._df) indices = ray.get(self._map_partitions(lambda df: df.index)._df)
return indices[0].append(indices[1:]) if isinstance(indices[0], pd.RangeIndex):
merged = indices[0]
for index in indices[1:]:
merged = merged.union(index)
return merged
else:
return indices[0].append(indices[1:])
@property @property
def size(self): def size(self):
@ -346,11 +353,49 @@ class DataFrame(object):
def all(self, axis=None, bool_only=None, skipna=None, level=None, def all(self, axis=None, bool_only=None, skipna=None, level=None,
**kwargs): **kwargs):
raise NotImplementedError("Not Yet implemented.") """Return whether all elements are True over requested axis
Note:
If axis=None or axis=0, this call applies df.all(axis=1)
to the transpose of df.
"""
if axis is None or axis == 0:
df = self.T
axis = 1
ordered_index = df.columns
else:
df = self
ordered_index = df.index
mapped = df._map_partitions(lambda df: df.all(axis,
bool_only,
skipna,
level,
**kwargs))
return to_pandas(mapped)[ordered_index]
def any(self, axis=None, bool_only=None, skipna=None, level=None, def any(self, axis=None, bool_only=None, skipna=None, level=None,
**kwargs): **kwargs):
raise NotImplementedError("Not Yet implemented.") """Return whether all elements are True over requested axis
Note:
If axis=None or axis=0, this call applies df.all(axis=1)
to the transpose of df.
"""
if axis is None or axis == 0:
df = self.T
axis = 1
ordered_index = df.columns
else:
df = self
ordered_index = df.index
mapped = df._map_partitions(lambda df: df.any(axis,
bool_only,
skipna,
level,
**kwargs))
return to_pandas(mapped)[ordered_index]
def append(self, other, ignore_index=False, verify_integrity=False): def append(self, other, ignore_index=False, verify_integrity=False):
raise NotImplementedError("Not Yet implemented.") raise NotImplementedError("Not Yet implemented.")
@ -389,7 +434,21 @@ class DataFrame(object):
raise NotImplementedError("Not Yet implemented.") raise NotImplementedError("Not Yet implemented.")
def bool(self): def bool(self):
raise NotImplementedError("Not Yet implemented.") """Return the bool of a single element PandasObject.
This must be a boolean scalar value, either True or False. Raise a
ValueError if the PandasObject does not have exactly 1 element, or that
element is not boolean
"""
shape = self.shape
if shape != (1,) and shape != (1, 1):
raise ValueError("""The PandasObject does not have exactly
1 element. Return the bool of a single
element PandasObject. The truth value is
ambiguous. Use a.empty, a.item(), a.any()
or a.all().""")
else:
return to_pandas(self).bool()
def boxplot(self, column=None, by=None, ax=None, fontsize=None, rot=0, def boxplot(self, column=None, by=None, ax=None, fontsize=None, rot=0,
grid=True, figsize=None, layout=None, return_type=None, grid=True, figsize=None, layout=None, return_type=None,
@ -429,7 +488,15 @@ class DataFrame(object):
raise NotImplementedError("Not Yet implemented.") raise NotImplementedError("Not Yet implemented.")
def count(self, axis=0, level=None, numeric_only=False): def count(self, axis=0, level=None, numeric_only=False):
raise NotImplementedError("Not Yet implemented.") if axis == 1:
original_index = self.index
return self.T.count(axis=0,
level=level,
numeric_only=numeric_only)[original_index]
else:
return sum(ray.get(self._map_partitions(lambda df: df.count(
axis=axis, level=level, numeric_only=numeric_only
))._df))
def cov(self, min_periods=None): def cov(self, min_periods=None):
raise NotImplementedError("Not Yet implemented.") raise NotImplementedError("Not Yet implemented.")
@ -784,7 +851,9 @@ class DataFrame(object):
raise NotImplementedError("Not Yet implemented.") raise NotImplementedError("Not Yet implemented.")
def round(self, decimals=0, *args, **kwargs): def round(self, decimals=0, *args, **kwargs):
raise NotImplementedError("Not Yet implemented.") return self._map_partitions(lambda df: df.round(decimals=decimals,
*args,
**kwargs))
def rpow(self, other, axis='columns', level=None, fill_value=None): def rpow(self, other, axis='columns', level=None, fill_value=None):
raise NotImplementedError("Not Yet implemented.") raise NotImplementedError("Not Yet implemented.")
@ -1008,7 +1077,17 @@ class DataFrame(object):
raise NotImplementedError("Not Yet implemented.") raise NotImplementedError("Not Yet implemented.")
def __getitem__(self, key): def __getitem__(self, key):
raise NotImplementedError("Not Yet implemented.") """Get the column specified by key for this DataFrame.
Args:
key : The column name.
Returns:
A Pandas Series representing the value fo the column.
"""
result_column_chunks = self._map_partitions(
lambda df: df.__getitem__(key))
return to_pandas(result_column_chunks)
def __setitem__(self, key, value): def __setitem__(self, key, value):
raise NotImplementedError("Not Yet implemented.") raise NotImplementedError("Not Yet implemented.")
@ -1056,16 +1135,44 @@ class DataFrame(object):
raise NotImplementedError("Not Yet implemented.") raise NotImplementedError("Not Yet implemented.")
def __delitem__(self, key): def __delitem__(self, key):
raise NotImplementedError("Not Yet implemented.") """Delete an item by key. `del a[key]` for example.
Operation happnes in place.
Args:
key: key to delete
"""
def del_helper(df):
df.__delitem__(key)
return df
self._df = self._map_partitions(del_helper)._df
self.columns = self.columns.drop(key)
def __finalize__(self, other, method=None, **kwargs): def __finalize__(self, other, method=None, **kwargs):
raise NotImplementedError("Not Yet implemented.") raise NotImplementedError("Not Yet implemented.")
def __copy__(self, deep=True): def __copy__(self, deep=True):
raise NotImplementedError("Not Yet implemented.") """Make a copy using Ray.DataFrame.copy method
Args:
deep: Boolean, deep copy or not.
Currently we do not support deep copy.
Returns:
A Ray DataFrame object.
"""
return self.copy(deep=deep)
def __deepcopy__(self, memo=None): def __deepcopy__(self, memo=None):
raise NotImplementedError("Not Yet implemented.") """Make a -deep- copy using Ray.DataFrame.copy method
This is equivalent to copy(deep=True).
Args:
memo: No effect. Just to comply with Pandas API.
Returns:
A Ray DataFrame object.
"""
return self.copy(deep=True)
def __and__(self, other): def __and__(self, other):
raise NotImplementedError("Not Yet implemented.") raise NotImplementedError("Not Yet implemented.")
@ -1266,7 +1373,7 @@ def from_pandas(df, npartitions=None, chunksize=None, sort=True):
def to_pandas(df): def to_pandas(df):
"""Converts a Ray DataFrame to a pandas DataFrame. """Converts a Ray DataFrame to a pandas DataFrame/Series.
Args: Args:
df (ray.DataFrame): The Ray DataFrame to convert. df (ray.DataFrame): The Ray DataFrame to convert.

View file

@ -46,7 +46,8 @@ def test_values(ray_df, pandas_df):
@pytest.fixture @pytest.fixture
def test_axes(ray_df, pandas_df): def test_axes(ray_df, pandas_df):
assert(np.array_equal(ray_df.axes, pandas_df.axes)) for ray_axis, pd_axis in zip(ray_df.axes, pandas_df.axes):
assert (np.array_equal(ray_axis, pd_axis))
@pytest.fixture @pytest.fixture
@ -113,7 +114,8 @@ def create_test_dataframe():
df = pd.DataFrame({'col1': [0, 1, 2, 3], df = pd.DataFrame({'col1': [0, 1, 2, 3],
'col2': [4, 5, 6, 7], 'col2': [4, 5, 6, 7],
'col3': [8, 9, 10, 11], 'col3': [8, 9, 10, 11],
'col4': [12, 13, 14, 15]}) 'col4': [12, 13, 14, 15],
'col5': [0, 0, 0, 0]})
return rdf.from_pandas(df, 2) return rdf.from_pandas(df, 2)
@ -124,8 +126,8 @@ def test_int_dataframe():
pandas_df = pd.DataFrame({'col1': [0, 1, 2, 3], pandas_df = pd.DataFrame({'col1': [0, 1, 2, 3],
'col2': [4, 5, 6, 7], 'col2': [4, 5, 6, 7],
'col3': [8, 9, 10, 11], 'col3': [8, 9, 10, 11],
'col4': [12, 13, 14, 15]}) 'col4': [12, 13, 14, 15],
'col5': [0, 0, 0, 0]})
ray_df = rdf.from_pandas(pandas_df, 2) ray_df = rdf.from_pandas(pandas_df, 2)
testfuncs = [lambda x: x + 1, testfuncs = [lambda x: x + 1,
@ -153,6 +155,16 @@ def test_int_dataframe():
test_abs(ray_df, pandas_df) test_abs(ray_df, pandas_df)
test_keys(ray_df, pandas_df) test_keys(ray_df, pandas_df)
test_transpose(ray_df, pandas_df) test_transpose(ray_df, pandas_df)
test_round(ray_df, pandas_df)
test_all(ray_df, pandas_df)
test_any(ray_df, pandas_df)
test___getitem__(ray_df, pandas_df)
test___delitem__(ray_df, pandas_df)
test___copy__(ray_df, pandas_df)
test___deepcopy__(ray_df, pandas_df)
test_bool(ray_df, pandas_df)
test_count(ray_df, pandas_df)
def test_float_dataframe(): def test_float_dataframe():
@ -160,7 +172,8 @@ def test_float_dataframe():
pandas_df = pd.DataFrame({'col1': [0.0, 1.0, 2.0, 3.0], pandas_df = pd.DataFrame({'col1': [0.0, 1.0, 2.0, 3.0],
'col2': [4.0, 5.0, 6.0, 7.0], 'col2': [4.0, 5.0, 6.0, 7.0],
'col3': [8.0, 9.0, 10.0, 11.0], 'col3': [8.0, 9.0, 10.0, 11.0],
'col4': [12.0, 13.0, 14.0, 15.0]}) 'col4': [12.0, 13.0, 14.0, 15.0],
'col5': [0.0, 0.0, 0.0, 0.0]})
ray_df = rdf.from_pandas(pandas_df, 2) ray_df = rdf.from_pandas(pandas_df, 2)
@ -189,6 +202,16 @@ def test_float_dataframe():
test_abs(ray_df, pandas_df) test_abs(ray_df, pandas_df)
test_keys(ray_df, pandas_df) test_keys(ray_df, pandas_df)
test_transpose(ray_df, pandas_df) test_transpose(ray_df, pandas_df)
test_round(ray_df, pandas_df)
test_all(ray_df, pandas_df)
test_any(ray_df, pandas_df)
test___getitem__(ray_df, pandas_df)
test___delitem__(ray_df, pandas_df)
test___copy__(ray_df, pandas_df)
test___deepcopy__(ray_df, pandas_df)
test_bool(ray_df, pandas_df)
test_count(ray_df, pandas_df)
def test_add(): def test_add():
@ -219,18 +242,16 @@ def test_align():
ray_df.align(None) ray_df.align(None)
def test_all(): @pytest.fixture
ray_df = create_test_dataframe() def test_all(ray_df, pd_df):
assert pd_df.all().equals(ray_df.all())
with pytest.raises(NotImplementedError): assert pd_df.all(axis=1).equals(ray_df.all(axis=1))
ray_df.all()
def test_any(): @pytest.fixture
ray_df = create_test_dataframe() def test_any(ray_df, pd_df):
assert pd_df.any().equals(ray_df.any())
with pytest.raises(NotImplementedError): assert pd_df.any(axis=1).equals(ray_df.any(axis=1))
ray_df.any()
def test_append(): def test_append():
@ -307,14 +328,19 @@ def test_bfill():
ray_df = create_test_dataframe() ray_df = create_test_dataframe()
with pytest.raises(NotImplementedError): with pytest.raises(NotImplementedError):
ray_df.bfill() ray_df.between_time(None, None)
def test_bool(): @pytest.fixture
ray_df = create_test_dataframe() def test_bool(ray_df, pd_df):
with pytest.raises(ValueError):
with pytest.raises(NotImplementedError):
ray_df.bool() ray_df.bool()
pd_df.bool()
single_bool_pd_df = pd.DataFrame([True])
single_bool_ray_df = rdf.from_pandas(single_bool_pd_df, 1)
assert single_bool_pd_df.bool() == single_bool_ray_df.bool()
def test_boxplot(): def test_boxplot():
@ -394,11 +420,10 @@ def test_corrwith():
ray_df.corrwith(None) ray_df.corrwith(None)
def test_count(): @pytest.fixture
ray_df = create_test_dataframe() def test_count(ray_df, pd_df):
assert ray_df.count().equals(pd_df.count())
with pytest.raises(NotImplementedError): assert ray_df.count(axis=1).equals(pd_df.count(axis=1))
ray_df.count()
def test_cov(): def test_cov():
@ -1100,11 +1125,10 @@ def test_rolling():
ray_df.rolling(None) ray_df.rolling(None)
def test_round(): @pytest.fixture
ray_df = create_test_dataframe() def test_round(ray_df, pd_df):
assert ray_df_equals_pandas(ray_df.round(), pd_df.round())
with pytest.raises(NotImplementedError): assert ray_df_equals_pandas(ray_df.round(1), pd_df.round(1))
ray_df.round()
def test_rpow(): def test_rpow():
@ -1520,11 +1544,13 @@ def test_xs():
ray_df.xs(None) ray_df.xs(None)
def test___getitem__(): @pytest.fixture
ray_df = create_test_dataframe() def test___getitem__(ray_df, pd_df):
ray_col = ray_df.__getitem__('col1')
assert isinstance(ray_col, pd.Series)
with pytest.raises(NotImplementedError): pd_col = pd_df['col1']
ray_df.__getitem__(None) assert pd_col.equals(ray_col)
def test___setitem__(): def test___setitem__():
@ -1639,11 +1665,13 @@ def test___setstate__():
ray_df.__setstate__(None) ray_df.__setstate__(None)
def test___delitem__(): @pytest.fixture
ray_df = create_test_dataframe() def test___delitem__(ray_df, pd_df):
ray_df = ray_df.copy()
with pytest.raises(NotImplementedError): pd_df = pd_df.copy()
ray_df.__delitem__(None) ray_df.__delitem__('col1')
pd_df.__delitem__('col1')
ray_df_equals_pandas(ray_df, pd_df)
def test___finalize__(): def test___finalize__():
@ -1653,18 +1681,16 @@ def test___finalize__():
ray_df.__finalize__(None) ray_df.__finalize__(None)
def test___copy__(): @pytest.fixture
ray_df = create_test_dataframe() def test___copy__(ray_df, pd_df):
ray_df_copy, pd_df_copy = ray_df.__copy__(), pd_df.__copy__()
with pytest.raises(NotImplementedError): assert ray_df_equals_pandas(ray_df_copy, pd_df_copy)
ray_df.__copy__()
def test___deepcopy__(): @pytest.fixture
ray_df = create_test_dataframe() def test___deepcopy__(ray_df, pd_df):
ray_df_copy, pd_df_copy = ray_df.__deepcopy__(), pd_df.__deepcopy__()
with pytest.raises(NotImplementedError): assert ray_df_equals_pandas(ray_df_copy, pd_df_copy)
ray_df.__deepcopy__()
def test_blocks(): def test_blocks():