diff --git a/python/ray/dataframe/dataframe.py b/python/ray/dataframe/dataframe.py index 33093ece6..3089081ec 100644 --- a/python/ray/dataframe/dataframe.py +++ b/python/ray/dataframe/dataframe.py @@ -15,7 +15,8 @@ class DataFrame(object): Args: df ([ObjectID]): The list of ObjectIDs that contain the dataframe partitions. - columns ([str]): The list of column names for this dataframe. + columns (pandas.Index): The column names for this dataframe, in + pandas Index object. """ assert(len(df) > 0) @@ -36,7 +37,13 @@ class DataFrame(object): The union of all indexes across the partitions. """ indices = ray.get(self._map_partitions(lambda df: df.index)._df) - return indices[0].append(indices[1:]) + if isinstance(indices[0], pd.RangeIndex): + merged = indices[0] + for index in indices[1:]: + merged = merged.union(index) + return merged + else: + return indices[0].append(indices[1:]) @property def size(self): @@ -346,11 +353,49 @@ class DataFrame(object): def all(self, axis=None, bool_only=None, skipna=None, level=None, **kwargs): - raise NotImplementedError("Not Yet implemented.") + """Return whether all elements are True over requested axis + + Note: + If axis=None or axis=0, this call applies df.all(axis=1) + to the transpose of df. + """ + if axis is None or axis == 0: + df = self.T + axis = 1 + ordered_index = df.columns + else: + df = self + ordered_index = df.index + + mapped = df._map_partitions(lambda df: df.all(axis, + bool_only, + skipna, + level, + **kwargs)) + return to_pandas(mapped)[ordered_index] def any(self, axis=None, bool_only=None, skipna=None, level=None, **kwargs): - raise NotImplementedError("Not Yet implemented.") + """Return whether all elements are True over requested axis + + Note: + If axis=None or axis=0, this call applies df.all(axis=1) + to the transpose of df. + """ + if axis is None or axis == 0: + df = self.T + axis = 1 + ordered_index = df.columns + else: + df = self + ordered_index = df.index + + mapped = df._map_partitions(lambda df: df.any(axis, + bool_only, + skipna, + level, + **kwargs)) + return to_pandas(mapped)[ordered_index] def append(self, other, ignore_index=False, verify_integrity=False): raise NotImplementedError("Not Yet implemented.") @@ -389,7 +434,21 @@ class DataFrame(object): raise NotImplementedError("Not Yet implemented.") def bool(self): - raise NotImplementedError("Not Yet implemented.") + """Return the bool of a single element PandasObject. + + This must be a boolean scalar value, either True or False. Raise a + ValueError if the PandasObject does not have exactly 1 element, or that + element is not boolean + """ + shape = self.shape + if shape != (1,) and shape != (1, 1): + raise ValueError("""The PandasObject does not have exactly + 1 element. Return the bool of a single + element PandasObject. The truth value is + ambiguous. Use a.empty, a.item(), a.any() + or a.all().""") + else: + return to_pandas(self).bool() def boxplot(self, column=None, by=None, ax=None, fontsize=None, rot=0, grid=True, figsize=None, layout=None, return_type=None, @@ -429,7 +488,15 @@ class DataFrame(object): raise NotImplementedError("Not Yet implemented.") def count(self, axis=0, level=None, numeric_only=False): - raise NotImplementedError("Not Yet implemented.") + if axis == 1: + original_index = self.index + return self.T.count(axis=0, + level=level, + numeric_only=numeric_only)[original_index] + else: + return sum(ray.get(self._map_partitions(lambda df: df.count( + axis=axis, level=level, numeric_only=numeric_only + ))._df)) def cov(self, min_periods=None): raise NotImplementedError("Not Yet implemented.") @@ -784,7 +851,9 @@ class DataFrame(object): raise NotImplementedError("Not Yet implemented.") def round(self, decimals=0, *args, **kwargs): - raise NotImplementedError("Not Yet implemented.") + return self._map_partitions(lambda df: df.round(decimals=decimals, + *args, + **kwargs)) def rpow(self, other, axis='columns', level=None, fill_value=None): raise NotImplementedError("Not Yet implemented.") @@ -1008,7 +1077,17 @@ class DataFrame(object): raise NotImplementedError("Not Yet implemented.") def __getitem__(self, key): - raise NotImplementedError("Not Yet implemented.") + """Get the column specified by key for this DataFrame. + + Args: + key : The column name. + + Returns: + A Pandas Series representing the value fo the column. + """ + result_column_chunks = self._map_partitions( + lambda df: df.__getitem__(key)) + return to_pandas(result_column_chunks) def __setitem__(self, key, value): raise NotImplementedError("Not Yet implemented.") @@ -1056,16 +1135,44 @@ class DataFrame(object): raise NotImplementedError("Not Yet implemented.") def __delitem__(self, key): - raise NotImplementedError("Not Yet implemented.") + """Delete an item by key. `del a[key]` for example. + Operation happnes in place. + + Args: + key: key to delete + """ + def del_helper(df): + df.__delitem__(key) + return df + self._df = self._map_partitions(del_helper)._df + self.columns = self.columns.drop(key) def __finalize__(self, other, method=None, **kwargs): raise NotImplementedError("Not Yet implemented.") def __copy__(self, deep=True): - raise NotImplementedError("Not Yet implemented.") + """Make a copy using Ray.DataFrame.copy method + + Args: + deep: Boolean, deep copy or not. + Currently we do not support deep copy. + + Returns: + A Ray DataFrame object. + """ + return self.copy(deep=deep) def __deepcopy__(self, memo=None): - raise NotImplementedError("Not Yet implemented.") + """Make a -deep- copy using Ray.DataFrame.copy method + This is equivalent to copy(deep=True). + + Args: + memo: No effect. Just to comply with Pandas API. + + Returns: + A Ray DataFrame object. + """ + return self.copy(deep=True) def __and__(self, other): raise NotImplementedError("Not Yet implemented.") @@ -1266,7 +1373,7 @@ def from_pandas(df, npartitions=None, chunksize=None, sort=True): def to_pandas(df): - """Converts a Ray DataFrame to a pandas DataFrame. + """Converts a Ray DataFrame to a pandas DataFrame/Series. Args: df (ray.DataFrame): The Ray DataFrame to convert. diff --git a/python/ray/dataframe/test/test_dataframe.py b/python/ray/dataframe/test/test_dataframe.py index 9b4caf3d5..d0c168a1e 100644 --- a/python/ray/dataframe/test/test_dataframe.py +++ b/python/ray/dataframe/test/test_dataframe.py @@ -46,7 +46,8 @@ def test_values(ray_df, pandas_df): @pytest.fixture def test_axes(ray_df, pandas_df): - assert(np.array_equal(ray_df.axes, pandas_df.axes)) + for ray_axis, pd_axis in zip(ray_df.axes, pandas_df.axes): + assert (np.array_equal(ray_axis, pd_axis)) @pytest.fixture @@ -113,7 +114,8 @@ def create_test_dataframe(): df = pd.DataFrame({'col1': [0, 1, 2, 3], 'col2': [4, 5, 6, 7], 'col3': [8, 9, 10, 11], - 'col4': [12, 13, 14, 15]}) + 'col4': [12, 13, 14, 15], + 'col5': [0, 0, 0, 0]}) return rdf.from_pandas(df, 2) @@ -124,8 +126,8 @@ def test_int_dataframe(): pandas_df = pd.DataFrame({'col1': [0, 1, 2, 3], 'col2': [4, 5, 6, 7], 'col3': [8, 9, 10, 11], - 'col4': [12, 13, 14, 15]}) - + 'col4': [12, 13, 14, 15], + 'col5': [0, 0, 0, 0]}) ray_df = rdf.from_pandas(pandas_df, 2) testfuncs = [lambda x: x + 1, @@ -153,6 +155,16 @@ def test_int_dataframe(): test_abs(ray_df, pandas_df) test_keys(ray_df, pandas_df) test_transpose(ray_df, pandas_df) + test_round(ray_df, pandas_df) + + test_all(ray_df, pandas_df) + test_any(ray_df, pandas_df) + test___getitem__(ray_df, pandas_df) + test___delitem__(ray_df, pandas_df) + test___copy__(ray_df, pandas_df) + test___deepcopy__(ray_df, pandas_df) + test_bool(ray_df, pandas_df) + test_count(ray_df, pandas_df) def test_float_dataframe(): @@ -160,7 +172,8 @@ def test_float_dataframe(): pandas_df = pd.DataFrame({'col1': [0.0, 1.0, 2.0, 3.0], 'col2': [4.0, 5.0, 6.0, 7.0], 'col3': [8.0, 9.0, 10.0, 11.0], - 'col4': [12.0, 13.0, 14.0, 15.0]}) + 'col4': [12.0, 13.0, 14.0, 15.0], + 'col5': [0.0, 0.0, 0.0, 0.0]}) ray_df = rdf.from_pandas(pandas_df, 2) @@ -189,6 +202,16 @@ def test_float_dataframe(): test_abs(ray_df, pandas_df) test_keys(ray_df, pandas_df) test_transpose(ray_df, pandas_df) + test_round(ray_df, pandas_df) + + test_all(ray_df, pandas_df) + test_any(ray_df, pandas_df) + test___getitem__(ray_df, pandas_df) + test___delitem__(ray_df, pandas_df) + test___copy__(ray_df, pandas_df) + test___deepcopy__(ray_df, pandas_df) + test_bool(ray_df, pandas_df) + test_count(ray_df, pandas_df) def test_add(): @@ -219,18 +242,16 @@ def test_align(): ray_df.align(None) -def test_all(): - ray_df = create_test_dataframe() - - with pytest.raises(NotImplementedError): - ray_df.all() +@pytest.fixture +def test_all(ray_df, pd_df): + assert pd_df.all().equals(ray_df.all()) + assert pd_df.all(axis=1).equals(ray_df.all(axis=1)) -def test_any(): - ray_df = create_test_dataframe() - - with pytest.raises(NotImplementedError): - ray_df.any() +@pytest.fixture +def test_any(ray_df, pd_df): + assert pd_df.any().equals(ray_df.any()) + assert pd_df.any(axis=1).equals(ray_df.any(axis=1)) def test_append(): @@ -307,14 +328,19 @@ def test_bfill(): ray_df = create_test_dataframe() with pytest.raises(NotImplementedError): - ray_df.bfill() + ray_df.between_time(None, None) -def test_bool(): - ray_df = create_test_dataframe() - - with pytest.raises(NotImplementedError): +@pytest.fixture +def test_bool(ray_df, pd_df): + with pytest.raises(ValueError): ray_df.bool() + pd_df.bool() + + single_bool_pd_df = pd.DataFrame([True]) + single_bool_ray_df = rdf.from_pandas(single_bool_pd_df, 1) + + assert single_bool_pd_df.bool() == single_bool_ray_df.bool() def test_boxplot(): @@ -394,11 +420,10 @@ def test_corrwith(): ray_df.corrwith(None) -def test_count(): - ray_df = create_test_dataframe() - - with pytest.raises(NotImplementedError): - ray_df.count() +@pytest.fixture +def test_count(ray_df, pd_df): + assert ray_df.count().equals(pd_df.count()) + assert ray_df.count(axis=1).equals(pd_df.count(axis=1)) def test_cov(): @@ -1100,11 +1125,10 @@ def test_rolling(): ray_df.rolling(None) -def test_round(): - ray_df = create_test_dataframe() - - with pytest.raises(NotImplementedError): - ray_df.round() +@pytest.fixture +def test_round(ray_df, pd_df): + assert ray_df_equals_pandas(ray_df.round(), pd_df.round()) + assert ray_df_equals_pandas(ray_df.round(1), pd_df.round(1)) def test_rpow(): @@ -1520,11 +1544,13 @@ def test_xs(): ray_df.xs(None) -def test___getitem__(): - ray_df = create_test_dataframe() +@pytest.fixture +def test___getitem__(ray_df, pd_df): + ray_col = ray_df.__getitem__('col1') + assert isinstance(ray_col, pd.Series) - with pytest.raises(NotImplementedError): - ray_df.__getitem__(None) + pd_col = pd_df['col1'] + assert pd_col.equals(ray_col) def test___setitem__(): @@ -1639,11 +1665,13 @@ def test___setstate__(): ray_df.__setstate__(None) -def test___delitem__(): - ray_df = create_test_dataframe() - - with pytest.raises(NotImplementedError): - ray_df.__delitem__(None) +@pytest.fixture +def test___delitem__(ray_df, pd_df): + ray_df = ray_df.copy() + pd_df = pd_df.copy() + ray_df.__delitem__('col1') + pd_df.__delitem__('col1') + ray_df_equals_pandas(ray_df, pd_df) def test___finalize__(): @@ -1653,18 +1681,16 @@ def test___finalize__(): ray_df.__finalize__(None) -def test___copy__(): - ray_df = create_test_dataframe() - - with pytest.raises(NotImplementedError): - ray_df.__copy__() +@pytest.fixture +def test___copy__(ray_df, pd_df): + ray_df_copy, pd_df_copy = ray_df.__copy__(), pd_df.__copy__() + assert ray_df_equals_pandas(ray_df_copy, pd_df_copy) -def test___deepcopy__(): - ray_df = create_test_dataframe() - - with pytest.raises(NotImplementedError): - ray_df.__deepcopy__() +@pytest.fixture +def test___deepcopy__(ray_df, pd_df): + ray_df_copy, pd_df_copy = ray_df.__deepcopy__(), pd_df.__deepcopy__() + assert ray_df_equals_pandas(ray_df_copy, pd_df_copy) def test_blocks():