From 0c477fbbca90e4945201e16dc066ca4f27aa42d4 Mon Sep 17 00:00:00 2001 From: Devin Petersohn Date: Mon, 30 Apr 2018 06:42:07 -0700 Subject: [PATCH] [DataFrame] Implement Inter-DataFrame operations (#1937) --- python/ray/dataframe/concat.py | 35 +- python/ray/dataframe/dataframe.py | 658 ++++++++++++++------ python/ray/dataframe/test/test_dataframe.py | 209 +++---- python/ray/dataframe/utils.py | 41 +- 4 files changed, 626 insertions(+), 317 deletions(-) diff --git a/python/ray/dataframe/concat.py b/python/ray/dataframe/concat.py index 6d1664ca5..952e326ed 100644 --- a/python/ray/dataframe/concat.py +++ b/python/ray/dataframe/concat.py @@ -3,6 +3,7 @@ from __future__ import division from __future__ import print_function import pandas +import numpy as np from .dataframe import DataFrame from .utils import _reindex_helper @@ -110,21 +111,23 @@ def concat(objs, axis=0, join='outer', join_axes=None, ignore_index=False, # from remote memory built in the previous line. In the future, we won't be # building new DataFrames, rather just partitioning the DataFrames. if axis == 0: - new_rows = [_reindex_helper.remote(part, all_columns[i], - final_columns, axis) - for i in range(len(objs)) - for part in objs[i]._row_partitions] - - return DataFrame(row_partitions=new_rows, - columns=final_columns, - index=final_index) - + new_blocks = np.array([_reindex_helper._submit( + args=tuple([all_columns[i], final_columns, axis, + len(objs[0]._block_partitions)] + part.tolist()), + num_return_vals=len(objs[0]._block_partitions)) + for i in range(len(objs)) + for part in objs[i]._block_partitions]) else: - new_columns = [_reindex_helper.remote(part, all_index[i], - final_index, axis) - for i in range(len(objs)) - for part in objs[i]._col_partitions] + # Transposing the columns is necessary because the remote task treats + # everything like rows and returns in row-major format. Luckily, this + # operation is cheap in numpy. + new_blocks = np.array([_reindex_helper._submit( + args=tuple([all_index[i], final_index, axis, + len(objs[0]._block_partitions.T)] + part.tolist()), + num_return_vals=len(objs[0]._block_partitions.T)) + for i in range(len(objs)) + for part in objs[i]._block_partitions.T]).T - return DataFrame(col_partitions=new_columns, - columns=final_columns, - index=final_index) + return DataFrame(block_partitions=new_blocks, + columns=final_columns, + index=final_index) diff --git a/python/ray/dataframe/dataframe.py b/python/ray/dataframe/dataframe.py index df7da328f..4b67e5266 100644 --- a/python/ray/dataframe/dataframe.py +++ b/python/ray/dataframe/dataframe.py @@ -36,7 +36,8 @@ from .utils import ( _blocks_to_row, _create_block_partitions, _inherit_docstrings, - _reindex_helper) + _reindex_helper, + _co_op_helper) from . import get_npartitions from .index_metadata import _IndexMetadata @@ -103,8 +104,7 @@ class DataFrame(object): if block_partitions is not None: # put in numpy array here to make accesses easier since it's 2D self._block_partitions = np.array(block_partitions) - assert self._block_partitions.ndim == 2, \ - "Block Partitions must be 2D." + axis = 0 else: if row_partitions is not None: axis = 0 @@ -125,10 +125,12 @@ class DataFrame(object): # Sometimes we only get a single column or row, which is # problematic for building blocks from the partitions, so we # add whatever dimension we're missing from the input. 
-        if self._block_partitions.ndim != 2:
+        if self._block_partitions.ndim < 2:
             self._block_partitions = np.expand_dims(self._block_partitions,
                                                     axis=axis ^ 1)
 
+        assert self._block_partitions.ndim == 2, "Block Partitions must be 2D."
+
         # Create the row and column index objects for using our partitioning.
         # If the objects haven't been inherited, then generate them
         if self._row_metadata is None:
@@ -278,8 +280,8 @@ class DataFrame(object):
             return self._repr_helper_()._repr_html_()
         # We split so that we insert our correct dataframe dimensions.
         result = self._repr_helper_()._repr_html_()
-        return result.split('<div>')[0] + \
-            '<div>{0} rows × {1} columns</div>\n'.format(len(self.index),
+        return result.split("<div>")[0] + \
+            "<div>{0} rows x {1} columns</div>
\n".format(len(self.index), len(self.columns)) def _get_index(self): @@ -456,7 +458,8 @@ class DataFrame(object): return len(self.index), len(self.columns) def _update_inplace(self, row_partitions=None, col_partitions=None, - columns=None, index=None): + block_partitions=None, columns=None, index=None, + col_metadata=None, row_metadata=None): """Updates the current DataFrame inplace. Behavior should be similar to the constructor, given the corresponding @@ -480,22 +483,31 @@ class DataFrame(object): not have enough contextual info to rebuild the indexes correctly based on the addition/subtraction of rows/columns. """ - assert row_partitions is not None or col_partitions is not None, \ + assert row_partitions is not None or col_partitions is not None\ + or block_partitions is not None, \ "To update inplace, new column or row partitions must be set." - if row_partitions is not None: + if block_partitions is not None: + self._block_partitions = block_partitions + elif row_partitions is not None: self._row_partitions = row_partitions elif col_partitions is not None: self._col_partitions = col_partitions - if row_partitions is not None or col_partitions is not None: - # At least one partition list is being updated, so recompute - # lengths and indices - self._row_metadata = _IndexMetadata(self._block_partitions[:, 0], - index=index, axis=0) - self._col_metadata = _IndexMetadata(self._block_partitions[0, :], - index=columns, axis=1) + if col_metadata is not None: + self._col_metadata = col_metadata + else: + assert columns is not None, \ + "Columns must be passed without col_metadata" + self._col_metadata = _IndexMetadata( + self._block_partitions[0, :], index=columns, axis=1) + if row_metadata is not None: + self._row_metadata = row_metadata + else: + # Index can be None for default index, so we don't check + self._row_metadata = _IndexMetadata( + self._block_partitions[:, 0], index=index, axis=0) def add_prefix(self, prefix): """Add a prefix to each of the column names. @@ -714,9 +726,20 @@ class DataFrame(object): raise NotImplementedError("Not yet") def add(self, other, axis='columns', level=None, fill_value=None): - raise NotImplementedError( - "To contribute to Pandas on Ray, please visit " - "github.com/ray-project/ray.") + """Add this DataFrame to another or a scalar/list. + + Args: + other: What to add this this DataFrame. + axis: The axis to apply addition over. Only applicaable to Series + or list 'other'. + level: A level in the multilevel axis to add over. + fill_value: The value to fill NaN. + + Returns: + A new DataFrame with the applied addition. + """ + return self._operator_helper(pd.DataFrame.add, other, axis, level, + fill_value) def agg(self, func, axis=0, *args, **kwargs): return self.aggregate(func, axis, *args, **kwargs) @@ -953,6 +976,7 @@ class DataFrame(object): to_concat = [self] + other else: to_concat = [self, other] + return concat(to_concat, ignore_index=ignore_index, verify_integrity=verify_integrity) @@ -1261,14 +1285,33 @@ class DataFrame(object): "github.com/ray-project/ray.") def div(self, other, axis='columns', level=None, fill_value=None): - raise NotImplementedError( - "To contribute to Pandas on Ray, please visit " - "github.com/ray-project/ray.") + """Divides this DataFrame against another DataFrame/Series/scalar. + + Args: + other: The object to use to apply the divide against this. + axis: The axis to divide over. + level: The Multilevel index level to apply divide over. + fill_value: The value to fill NaNs with. 
+ + Returns: + A new DataFrame with the Divide applied. + """ + return self._operator_helper(pd.DataFrame.add, other, axis, level, + fill_value) def divide(self, other, axis='columns', level=None, fill_value=None): - raise NotImplementedError( - "To contribute to Pandas on Ray, please visit " - "github.com/ray-project/ray.") + """Synonym for div. + + Args: + other: The object to use to apply the divide against this. + axis: The axis to divide over. + level: The Multilevel index level to apply divide over. + fill_value: The value to fill NaNs with. + + Returns: + A new DataFrame with the Divide applied. + """ + return self.div(other, axis, level, fill_value) def dot(self, other): raise NotImplementedError( @@ -1419,9 +1462,17 @@ class DataFrame(object): "github.com/ray-project/ray.") def eq(self, other, axis='columns', level=None): - raise NotImplementedError( - "To contribute to Pandas on Ray, please visit " - "github.com/ray-project/ray.") + """Checks element-wise that this is equal to other. + + Args: + other: A DataFrame or Series or scalar to compare to. + axis: The axis to perform the eq over. + level: The Multilevel index level to apply eq over. + + Returns: + A new DataFrame filled with Booleans. + """ + return self._operator_helper(pd.DataFrame.eq, other, axis, level) def equals(self, other): """ @@ -1688,9 +1739,19 @@ class DataFrame(object): return self._row_metadata.first_valid_index() def floordiv(self, other, axis='columns', level=None, fill_value=None): - raise NotImplementedError( - "To contribute to Pandas on Ray, please visit " - "github.com/ray-project/ray.") + """Divides this DataFrame against another DataFrame/Series/scalar. + + Args: + other: The object to use to apply the divide against this. + axis: The axis to divide over. + level: The Multilevel index level to apply divide over. + fill_value: The value to fill NaNs with. + + Returns: + A new DataFrame with the Divide applied. + """ + return self._operator_helper(pd.DataFrame.floordiv, other, axis, level, + fill_value) @classmethod def from_csv(self, path, header=0, sep=', ', index_col=0, @@ -1720,9 +1781,17 @@ class DataFrame(object): "github.com/ray-project/ray.") def ge(self, other, axis='columns', level=None): - raise NotImplementedError( - "To contribute to Pandas on Ray, please visit " - "github.com/ray-project/ray.") + """Checks element-wise that this is greater than or equal to other. + + Args: + other: A DataFrame or Series or scalar to compare to. + axis: The axis to perform the gt over. + level: The Multilevel index level to apply gt over. + + Returns: + A new DataFrame filled with Booleans. + """ + return self._operator_helper(pd.DataFrame.ge, other, axis, level) def get(self, key, default=None): """Get item from object for given key (DataFrame column, Panel @@ -1770,9 +1839,17 @@ class DataFrame(object): "github.com/ray-project/ray.") def gt(self, other, axis='columns', level=None): - raise NotImplementedError( - "To contribute to Pandas on Ray, please visit " - "github.com/ray-project/ray.") + """Checks element-wise that this is greater than other. + + Args: + other: A DataFrame or Series or scalar to compare to. + axis: The axis to perform the gt over. + level: The Multilevel index level to apply gt over. + + Returns: + A new DataFrame filled with Booleans. + """ + return self._operator_helper(pd.DataFrame.gt, other, axis, level) def head(self, n=5): """Get the first n rows of the dataframe. 
@@ -2104,24 +2181,35 @@ class DataFrame(object): .join(pd.DataFrame(columns=other.columns), lsuffix=lsuffix, rsuffix=rsuffix).columns + new_partition_num = max(len(self._block_partitions.T), + len(other._block_partitions.T)) + # Join is a concat once we have shuffled the data internally. # We shuffle the data by computing the correct order. # Another important thing to note: We set the current self index # to the index variable which may be 'on'. - new_self = [_reindex_helper.remote(col, index, new_index, 1) - for col in self._col_partitions] - new_other = [_reindex_helper.remote(col, other.index, new_index, 1) - for col in other._col_partitions] + new_self = np.array([ + _reindex_helper._submit(args=tuple([index, new_index, 1, + new_partition_num] + + block.tolist()), + num_return_vals=new_partition_num) + for block in self._block_partitions.T]) + new_other = np.array([ + _reindex_helper._submit(args=tuple([other.index, new_index, 1, + new_partition_num] + + block.tolist()), + num_return_vals=new_partition_num) + for block in other._block_partitions.T]) - # Append the columns together (i.e. concat) - new_column_parts = new_self + new_other + # Append the blocks together (i.e. concat) + new_block_parts = np.concatenate((new_self, new_other)).T # Default index in the case that on is set. if on is not None: new_index = None # TODO join the two metadata tables for performance. - return DataFrame(col_partitions=new_column_parts, + return DataFrame(block_partitions=new_block_parts, index=new_index, columns=new_column_labels) else: @@ -2141,17 +2229,28 @@ class DataFrame(object): [pd.DataFrame(columns=obj.columns) for obj in other], lsuffix=lsuffix, rsuffix=rsuffix).columns - new_self = [_reindex_helper.remote(col, self.index, new_index, 1) - for col in self._col_partitions] + new_partition_num = max([len(self._block_partitions.T)] + + [len(obj._block_partitions.T) + for obj in other]) - new_others = [_reindex_helper.remote(col, obj.index, new_index, 1) - for obj in other for col in obj._col_partitions] + new_self = np.array([ + _reindex_helper._submit(args=tuple([self.index, new_index, 1, + new_partition_num] + + block.tolist()), + num_return_vals=new_partition_num) + for block in self._block_partitions.T]) + + new_others = np.array([_reindex_helper._submit( + args=tuple([obj.index, new_index, 1, new_partition_num] + + block.tolist()), + num_return_vals=new_partition_num + ) for obj in other for block in obj._block_partitions.T]) # Append the columns together (i.e. concat) - new_column_parts = new_self + new_others + new_block_parts = np.concatenate((new_self, new_others)).T # TODO join the two metadata tables for performance. - return DataFrame(col_partitions=new_column_parts, + return DataFrame(block_partitions=new_block_parts, index=new_index, columns=new_column_labels) @@ -2181,9 +2280,17 @@ class DataFrame(object): return self._row_metadata.last_valid_index() def le(self, other, axis='columns', level=None): - raise NotImplementedError( - "To contribute to Pandas on Ray, please visit " - "github.com/ray-project/ray.") + """Checks element-wise that this is less than or equal to other. + + Args: + other: A DataFrame or Series or scalar to compare to. + axis: The axis to perform the le over. + level: The Multilevel index level to apply le over. + + Returns: + A new DataFrame filled with Booleans. 
+ """ + return self._operator_helper(pd.DataFrame.le, other, axis, level) def lookup(self, row_labels, col_labels): raise NotImplementedError( @@ -2191,9 +2298,17 @@ class DataFrame(object): "github.com/ray-project/ray.") def lt(self, other, axis='columns', level=None): - raise NotImplementedError( - "To contribute to Pandas on Ray, please visit " - "github.com/ray-project/ray.") + """Checks element-wise that this is less than other. + + Args: + other: A DataFrame or Series or scalar to compare to. + axis: The axis to perform the lt over. + level: The Multilevel index level to apply lt over. + + Returns: + A new DataFrame filled with Booleans. + """ + return self._operator_helper(pd.DataFrame.lt, other, axis, level) def mad(self, axis=None, skipna=None, level=None): raise NotImplementedError( @@ -2303,9 +2418,19 @@ class DataFrame(object): return self._arithmetic_helper(remote_func, axis, level) def mod(self, other, axis='columns', level=None, fill_value=None): - raise NotImplementedError( - "To contribute to Pandas on Ray, please visit " - "github.com/ray-project/ray.") + """Mods this DataFrame against another DataFrame/Series/scalar. + + Args: + other: The object to use to apply the mod against this. + axis: The axis to mod over. + level: The Multilevel index level to apply mod over. + fill_value: The value to fill NaNs with. + + Returns: + A new DataFrame with the Mod applied. + """ + return self._operator_helper(pd.DataFrame.mod, other, axis, level, + fill_value) def mode(self, axis=0, numeric_only=False): raise NotImplementedError( @@ -2313,19 +2438,46 @@ class DataFrame(object): "github.com/ray-project/ray.") def mul(self, other, axis='columns', level=None, fill_value=None): - raise NotImplementedError( - "To contribute to Pandas on Ray, please visit " - "github.com/ray-project/ray.") + """Multiplies this DataFrame against another DataFrame/Series/scalar. + + Args: + other: The object to use to apply the multiply against this. + axis: The axis to multiply over. + level: The Multilevel index level to apply multiply over. + fill_value: The value to fill NaNs with. + + Returns: + A new DataFrame with the Multiply applied. + """ + return self._operator_helper(pd.DataFrame.mul, other, axis, level, + fill_value) def multiply(self, other, axis='columns', level=None, fill_value=None): - raise NotImplementedError( - "To contribute to Pandas on Ray, please visit " - "github.com/ray-project/ray.") + """Synonym for mul. + + Args: + other: The object to use to apply the multiply against this. + axis: The axis to multiply over. + level: The Multilevel index level to apply multiply over. + fill_value: The value to fill NaNs with. + + Returns: + A new DataFrame with the Multiply applied. + """ + return self.mul(other, axis, level, fill_value) def ne(self, other, axis='columns', level=None): - raise NotImplementedError( - "To contribute to Pandas on Ray, please visit " - "github.com/ray-project/ray.") + """Checks element-wise that this is not equal to other. + + Args: + other: A DataFrame or Series or scalar to compare to. + axis: The axis to perform the ne over. + level: The Multilevel index level to apply ne over. + + Returns: + A new DataFrame filled with Booleans. 
+ """ + return self._operator_helper(pd.DataFrame.ne, other, axis, level) def nlargest(self, n, columns, keep='first'): raise NotImplementedError( @@ -2426,9 +2578,19 @@ class DataFrame(object): return result def pow(self, other, axis='columns', level=None, fill_value=None): - raise NotImplementedError( - "To contribute to Pandas on Ray, please visit " - "github.com/ray-project/ray.") + """Pow this DataFrame against another DataFrame/Series/scalar. + + Args: + other: The object to use to apply the pow against this. + axis: The axis to pow over. + level: The Multilevel index level to apply pow over. + fill_value: The value to fill NaNs with. + + Returns: + A new DataFrame with the Pow applied. + """ + return self._operator_helper(pd.DataFrame.pow, other, axis, level, + fill_value) def prod(self, axis=None, skipna=None, level=None, numeric_only=None, min_count=0, **kwargs): @@ -2523,9 +2685,7 @@ class DataFrame(object): return DataFrame(row_partitions=new_rows, columns=self.columns) def radd(self, other, axis='columns', level=None, fill_value=None): - raise NotImplementedError( - "To contribute to Pandas on Ray, please visit " - "github.com/ray-project/ray.") + return self.add(other, axis, level, fill_value) def rank(self, axis=0, method='average', numeric_only=None, na_option='keep', ascending=True, pct=False): @@ -2534,9 +2694,9 @@ class DataFrame(object): "github.com/ray-project/ray.") def rdiv(self, other, axis='columns', level=None, fill_value=None): - raise NotImplementedError( - "To contribute to Pandas on Ray, please visit " - "github.com/ray-project/ray.") + return self._single_df_op_helper( + lambda df: df.rdiv(other, axis, level, fill_value), + other, axis, level) def reindex(self, labels=None, index=None, columns=None, axis=None, method=None, copy=True, level=None, fill_value=np.nan, @@ -2755,19 +2915,17 @@ class DataFrame(object): return new_obj def rfloordiv(self, other, axis='columns', level=None, fill_value=None): - raise NotImplementedError( - "To contribute to Pandas on Ray, please visit " - "github.com/ray-project/ray.") + return self._single_df_op_helper( + lambda df: df.rfloordiv(other, axis, level, fill_value), + other, axis, level) def rmod(self, other, axis='columns', level=None, fill_value=None): - raise NotImplementedError( - "To contribute to Pandas on Ray, please visit " - "github.com/ray-project/ray.") + return self._single_df_op_helper( + lambda df: df.rmod(other, axis, level, fill_value), + other, axis, level) def rmul(self, other, axis='columns', level=None, fill_value=None): - raise NotImplementedError( - "To contribute to Pandas on Ray, please visit " - "github.com/ray-project/ray.") + return self.mul(other, axis, level, fill_value) def rolling(self, window, min_periods=None, freq=None, center=False, win_type=None, on=None, axis=0, closed=None): @@ -2785,19 +2943,19 @@ class DataFrame(object): index=self.index) def rpow(self, other, axis='columns', level=None, fill_value=None): - raise NotImplementedError( - "To contribute to Pandas on Ray, please visit " - "github.com/ray-project/ray.") + return self._single_df_op_helper( + lambda df: df.rpow(other, axis, level, fill_value), + other, axis, level) def rsub(self, other, axis='columns', level=None, fill_value=None): - raise NotImplementedError( - "To contribute to Pandas on Ray, please visit " - "github.com/ray-project/ray.") + return self._single_df_op_helper( + lambda df: df.rsub(other, axis, level, fill_value), + other, axis, level) def rtruediv(self, other, axis='columns', level=None, fill_value=None): - 
raise NotImplementedError( - "To contribute to Pandas on Ray, please visit " - "github.com/ray-project/ray.") + return self._single_df_op_helper( + lambda df: df.rtruediv(other, axis, level, fill_value), + other, axis, level) def sample(self, n=None, frac=None, replace=False, weights=None, random_state=None, axis=None): @@ -3002,14 +3160,33 @@ class DataFrame(object): return self._arithmetic_helper(remote_func, axis, level) def sub(self, other, axis='columns', level=None, fill_value=None): - raise NotImplementedError( - "To contribute to Pandas on Ray, please visit " - "github.com/ray-project/ray.") + """Subtract a DataFrame/Series/scalar from this DataFrame. + + Args: + other: The object to use to apply the subtraction to this. + axis: THe axis to apply the subtraction over. + level: Mutlilevel index level to subtract over. + fill_value: The value to fill NaNs with. + + Returns: + A new DataFrame with the subtraciont applied. + """ + return self._operator_helper(pd.DataFrame.sub, other, axis, level, + fill_value) def subtract(self, other, axis='columns', level=None, fill_value=None): - raise NotImplementedError( - "To contribute to Pandas on Ray, please visit " - "github.com/ray-project/ray.") + """Alias for sub. + + Args: + other: The object to use to apply the subtraction to this. + axis: THe axis to apply the subtraction over. + level: Mutlilevel index level to subtract over. + fill_value: The value to fill NaNs with. + + Returns: + A new DataFrame with the subtraciont applied. + """ + return self.sub(other, axis, level, fill_value) def swapaxes(self, axis1, axis2, copy=True): raise NotImplementedError( @@ -3259,9 +3436,19 @@ class DataFrame(object): return result def truediv(self, other, axis='columns', level=None, fill_value=None): - raise NotImplementedError( - "To contribute to Pandas on Ray, please visit " - "github.com/ray-project/ray.") + """Divides this DataFrame against another DataFrame/Series/scalar. + + Args: + other: The object to use to apply the divide against this. + axis: The axis to divide over. + level: The Multilevel index level to apply divide over. + fill_value: The value to fill NaNs with. + + Returns: + A new DataFrame with the Divide applied. + """ + return self._operator_helper(pd.DataFrame.truediv, other, axis, level, + fill_value) def truncate(self, before=None, after=None, axis=None, copy=True): raise NotImplementedError( @@ -3624,80 +3811,92 @@ class DataFrame(object): "github.com/ray-project/ray.") def __lt__(self, other): - raise NotImplementedError( - "To contribute to Pandas on Ray, please visit " - "github.com/ray-project/ray.") + return self.lt(other) def __le__(self, other): - raise NotImplementedError( - "To contribute to Pandas on Ray, please visit " - "github.com/ray-project/ray.") + return self.le(other) def __gt__(self, other): - raise NotImplementedError( - "To contribute to Pandas on Ray, please visit " - "github.com/ray-project/ray.") + return self.gt(other) def __ge__(self, other): - raise NotImplementedError( - "To contribute to Pandas on Ray, please visit " - "github.com/ray-project/ray.") + return self.ge(other) def __eq__(self, other): - """Computes the equality of this DataFrame with another - - Returns: - True, if the DataFrames are equal. False otherwise. - """ - return self.equals(other) + return self.eq(other) def __ne__(self, other): - """Checks that this DataFrame is not equal to another - - Returns: - True, if the DataFrames are not equal. False otherwise. 
- """ - return not self.equals(other) + return self.ne(other) def __add__(self, other): - raise NotImplementedError( - "To contribute to Pandas on Ray, please visit " - "github.com/ray-project/ray.") + return self.add(other) def __iadd__(self, other): - raise NotImplementedError( - "To contribute to Pandas on Ray, please visit " - "github.com/ray-project/ray.") + return self.add(other) + + def __radd__(self, other, axis="columns", level=None, fill_value=None): + return self.radd(other, axis, level, fill_value) def __mul__(self, other): - raise NotImplementedError( - "To contribute to Pandas on Ray, please visit " - "github.com/ray-project/ray.") + return self.mul(other) def __imul__(self, other): - raise NotImplementedError( - "To contribute to Pandas on Ray, please visit " - "github.com/ray-project/ray.") + return self.mul(other) + + def __rmul__(self, other, axis="columns", level=None, fill_value=None): + return self.rmul(other, axis, level, fill_value) def __pow__(self, other): - raise NotImplementedError( - "To contribute to Pandas on Ray, please visit " - "github.com/ray-project/ray.") + return self.pow(other) def __ipow__(self, other): - raise NotImplementedError( - "To contribute to Pandas on Ray, please visit " - "github.com/ray-project/ray.") + return self.pow(other) + + def __rpow__(self, other, axis="columns", level=None, fill_value=None): + return self.rpow(other, axis, level, fill_value) def __sub__(self, other): - raise NotImplementedError( - "To contribute to Pandas on Ray, please visit " - "github.com/ray-project/ray.") + return self.sub(other) def __isub__(self, other): - raise NotImplementedError( - "To contribute to Pandas on Ray, please visit " - "github.com/ray-project/ray.") + return self.sub(other) + + def __rsub__(self, other, axis="columns", level=None, fill_value=None): + return self.rsub(other, axis, level, fill_value) + + def __floordiv__(self, other): + return self.floordiv(other) + + def __ifloordiv__(self, other): + return self.floordiv(other) + + def __rfloordiv__(self, other, axis="columns", level=None, + fill_value=None): + return self.rfloordiv(other, axis, level, fill_value) + + def __truediv__(self, other): + return self.truediv(other) + + def __itruediv__(self, other): + return self.truediv(other) + + def __rtruediv__(self, other, axis="columns", level=None, fill_value=None): + return self.rtruediv(other, axis, level, fill_value) + + def __mod__(self, other): + return self.mod(other) + + def __imod__(self, other): + return self.mod(other) + + def __rmod__(self, other, axis="columns", level=None, fill_value=None): + return self.rmod(other, axis, level, fill_value) + + def __div__(self, other, axis="columns", level=None, fill_value=None): + return self.div(other, axis, level, fill_value) + + def __rdiv__(self, other, axis="columns", level=None, fill_value=None): + return self.rdiv(other, axis, level, fill_value) def __neg__(self): """Computes an element wise negative DataFrame @@ -3720,21 +3919,6 @@ class DataFrame(object): columns=self.columns, index=self.index) - def __floordiv__(self, other): - raise NotImplementedError( - "To contribute to Pandas on Ray, please visit " - "github.com/ray-project/ray.") - - def __truediv__(self, other): - raise NotImplementedError( - "To contribute to Pandas on Ray, please visit " - "github.com/ray-project/ray.") - - def __mod__(self, other): - raise NotImplementedError( - "To contribute to Pandas on Ray, please visit " - "github.com/ray-project/ray.") - def __sizeof__(self): raise NotImplementedError( "To contribute 
to Pandas on Ray, please visit " @@ -3758,12 +3942,7 @@ class DataFrame(object): "To contribute to Pandas on Ray, please visit " "github.com/ray-project/ray.") - def iat(axis=None): - raise NotImplementedError( - "To contribute to Pandas on Ray, please visit " - "github.com/ray-project/ray.") - - def __rsub__(other, axis=None, level=None, fill_value=None): + def iat(self, axis=None): raise NotImplementedError( "To contribute to Pandas on Ray, please visit " "github.com/ray-project/ray.") @@ -3785,22 +3964,12 @@ class DataFrame(object): "To contribute to Pandas on Ray, please visit " "github.com/ray-project/ray.") - def __itruediv__(other): + def at(self, axis=None): raise NotImplementedError( "To contribute to Pandas on Ray, please visit " "github.com/ray-project/ray.") - def __div__(other, axis=None, level=None, fill_value=None): - raise NotImplementedError( - "To contribute to Pandas on Ray, please visit " - "github.com/ray-project/ray.") - - def at(axis=None): - raise NotImplementedError( - "To contribute to Pandas on Ray, please visit " - "github.com/ray-project/ray.") - - def ix(axis=None): + def ix(self, axis=None): raise NotImplementedError( "To contribute to Pandas on Ray, please visit " "github.com/ray-project/ray.") @@ -3815,3 +3984,122 @@ class DataFrame(object): raise NotImplementedError( "To contribute to Pandas on Ray, please visit " "github.com/ray-project/ray.") + + def _copartition(self, other, new_index): + """Colocates the values of other with this for certain operations. + + NOTE: This method uses the indexes of each DataFrame to order them the + same. This operation does an implicit shuffling of data and zips + the two DataFrames together to be operated on. + + Args: + other: The other DataFrame to copartition with. + + Returns: + Two new sets of partitions, copartitioned and zipped. + """ + # Put in the object store so they aren't serialized each iteration. + old_self_index = ray.put(self.index) + new_index = ray.put(new_index) + old_other_index = ray.put(other.index) + + new_num_partitions = max(len(self._block_partitions.T), + len(other._block_partitions.T)) + + new_partitions_self = \ + np.array([_reindex_helper._submit( + args=tuple([old_self_index, new_index, 1, + new_num_partitions] + block.tolist()), + num_return_vals=new_num_partitions) + for block in self._block_partitions.T]).T + + new_partitions_other = \ + np.array([_reindex_helper._submit( + args=tuple([old_other_index, new_index, 1, + new_num_partitions] + block.tolist()), + num_return_vals=new_num_partitions) + for block in other._block_partitions.T]).T + + return zip(new_partitions_self, new_partitions_other) + + def _operator_helper(self, func, other, axis, level, *args): + """Helper method for inter-dataframe and scalar operations""" + if isinstance(other, DataFrame): + return self._inter_df_op_helper( + lambda x, y: func(x, y, axis, level, *args), + other, axis, level) + else: + return self._single_df_op_helper( + lambda df: func(df, other, axis, level, *args), + other, axis, level) + + def _inter_df_op_helper(self, func, other, axis, level): + if level is not None: + raise NotImplementedError("Mutlilevel index not yet supported " + "in Pandas on Ray") + axis = pd.DataFrame()._get_axis_number(axis) + + # Adding two DataFrames causes an outer join. 
+ if isinstance(other, DataFrame): + new_column_index = self.columns.join(other.columns, how="outer") + new_index = self.index.join(other.index, how="outer") + copartitions = self._copartition(other, new_index) + + new_blocks = \ + np.array([_co_op_helper._submit( + args=tuple([func, self.columns, other.columns, + len(part[0])] + + np.concatenate(part).tolist()), + num_return_vals=len(part[0])) + for part in copartitions]) + + # TODO join the Index Metadata objects together for performance. + return DataFrame(block_partitions=new_blocks, + columns=new_column_index, + index=new_index) + + def _single_df_op_helper(self, func, other, axis, level): + if level is not None: + raise NotImplementedError("Multilevel index not yet supported " + "in Pandas on Ray") + axis = pd.DataFrame()._get_axis_number(axis) + + if is_list_like(other): + new_index = self.index + new_column_index = self.columns + new_col_metadata = self._col_metadata + new_row_metadata = self._row_metadata + new_blocks = None + + if axis == 0: + if len(other) != len(self.index): + raise ValueError( + "Unable to coerce to Series, length must be {0}: " + "given {1}".format(len(self.index), len(other))) + new_columns = _map_partitions(func, self._col_partitions) + new_rows = None + else: + if len(other) != len(self.columns): + raise ValueError( + "Unable to coerce to Series, length must be {0}: " + "given {1}".format(len(self.columns), len(other))) + new_rows = _map_partitions(func, self._row_partitions) + new_columns = None + + else: + new_blocks = np.array([_map_partitions(func, block) + for block in self._block_partitions]) + new_columns = None + new_rows = None + new_index = self.index + new_column_index = self.columns + new_col_metadata = self._col_metadata + new_row_metadata = self._row_metadata + + return DataFrame(col_partitions=new_columns, + row_partitions=new_rows, + block_partitions=new_blocks, + index=new_index, + columns=new_column_index, + col_metadata=new_col_metadata, + row_metadata=new_row_metadata) diff --git a/python/ray/dataframe/test/test_dataframe.py b/python/ray/dataframe/test/test_dataframe.py index 96c8c75ca..e84ddea41 100644 --- a/python/ray/dataframe/test/test_dataframe.py +++ b/python/ray/dataframe/test/test_dataframe.py @@ -806,11 +806,75 @@ def test_nan_dataframe(): test_transform(ray_df, pandas_df) -def test_add(): - ray_df = create_test_dataframe() +@pytest.fixture +def test_inter_df_math(op, simple=False): + ray_df = rdf.DataFrame({"col1": [0, 1, 2, 3], "col2": [4, 5, 6, 7], + "col3": [8, 9, 0, 1], "col4": [2, 4, 5, 6]}) - with pytest.raises(NotImplementedError): - ray_df.add(None) + pandas_df = pd.DataFrame({"col1": [0, 1, 2, 3], "col2": [4, 5, 6, 7], + "col3": [8, 9, 0, 1], "col4": [2, 4, 5, 6]}) + + ray_df_equals_pandas(getattr(ray_df, op)(ray_df), + getattr(pandas_df, op)(pandas_df)) + ray_df_equals_pandas(getattr(ray_df, op)(4), + getattr(pandas_df, op)(4)) + ray_df_equals_pandas(getattr(ray_df, op)(4.0), + getattr(pandas_df, op)(4.0)) + + ray_df2 = rdf.DataFrame({"A": [0, 2], "col1": [0, 19], "col2": [1, 1]}) + pandas_df2 = pd.DataFrame({"A": [0, 2], "col1": [0, 19], "col2": [1, 1]}) + + ray_df_equals_pandas(getattr(ray_df, op)(ray_df2), + getattr(pandas_df, op)(pandas_df2)) + + list_test = [0, 1, 2, 4] + + if not simple: + ray_df_equals_pandas(getattr(ray_df, op)(list_test, axis=1), + getattr(pandas_df, op)(list_test, axis=1)) + + ray_df_equals_pandas(getattr(ray_df, op)(list_test, axis=0), + getattr(pandas_df, op)(list_test, axis=0)) + + +@pytest.fixture +def test_comparison_inter_ops(op): + 
ray_df = rdf.DataFrame({"col1": [0, 1, 2, 3], "col2": [4, 5, 6, 7], + "col3": [8, 9, 0, 1], "col4": [2, 4, 5, 6]}) + + pandas_df = pd.DataFrame({"col1": [0, 1, 2, 3], "col2": [4, 5, 6, 7], + "col3": [8, 9, 0, 1], "col4": [2, 4, 5, 6]}) + + ray_df_equals_pandas(getattr(ray_df, op)(ray_df), + getattr(pandas_df, op)(pandas_df)) + ray_df_equals_pandas(getattr(ray_df, op)(4), + getattr(pandas_df, op)(4)) + ray_df_equals_pandas(getattr(ray_df, op)(4.0), + getattr(pandas_df, op)(4.0)) + + ray_df2 = rdf.DataFrame({"A": [0, 2], "col1": [0, 19], "col2": [1, 1]}) + pandas_df2 = pd.DataFrame({"A": [0, 2], "col1": [0, 19], "col2": [1, 1]}) + + ray_df_equals_pandas(getattr(ray_df, op)(ray_df2), + getattr(pandas_df, op)(pandas_df2)) + + +@pytest.fixture +def test_inter_df_math_right_ops(op): + ray_df = rdf.DataFrame({"col1": [0, 1, 2, 3], "col2": [4, 5, 6, 7], + "col3": [8, 9, 0, 1], "col4": [2, 4, 5, 6]}) + + pandas_df = pd.DataFrame({"col1": [0, 1, 2, 3], "col2": [4, 5, 6, 7], + "col3": [8, 9, 0, 1], "col4": [2, 4, 5, 6]}) + + ray_df_equals_pandas(getattr(ray_df, op)(4), + getattr(pandas_df, op)(4)) + ray_df_equals_pandas(getattr(ray_df, op)(4.0), + getattr(pandas_df, op)(4.0)) + + +def test_add(): + test_inter_df_math("add", simple=False) @pytest.fixture @@ -863,6 +927,8 @@ def test_append(): pandas_df2 = pd.DataFrame({"col5": [0], "col6": [1]}) + print(ray_df.append(ray_df2)) + assert ray_df_equals_pandas(ray_df.append(ray_df2), pandas_df.append(pandas_df2)) @@ -1083,17 +1149,11 @@ def test_diff(): def test_div(): - ray_df = create_test_dataframe() - - with pytest.raises(NotImplementedError): - ray_df.div(None) + test_inter_df_math("div", simple=False) def test_divide(): - ray_df = create_test_dataframe() - - with pytest.raises(NotImplementedError): - ray_df.divide(None) + test_inter_df_math("divide", simple=False) def test_dot(): @@ -1208,10 +1268,7 @@ def test_duplicated(): def test_eq(): - ray_df = create_test_dataframe() - - with pytest.raises(NotImplementedError): - ray_df.eq(None) + test_comparison_inter_ops("eq") def test_equals(): @@ -1718,10 +1775,7 @@ def test_first_valid_index(ray_df, pandas_df): def test_floordiv(): - ray_df = create_test_dataframe() - - with pytest.raises(NotImplementedError): - ray_df.floordiv(None) + test_inter_df_math("floordiv", simple=False) def test_from_csv(): @@ -1745,10 +1799,7 @@ def test_from_records(): def test_ge(): - ray_df = create_test_dataframe() - - with pytest.raises(NotImplementedError): - ray_df.ge(None) + test_comparison_inter_ops("ge") def test_get_value(): @@ -1766,10 +1817,7 @@ def test_get_values(): def test_gt(): - ray_df = create_test_dataframe() - - with pytest.raises(NotImplementedError): - ray_df.gt(None) + test_comparison_inter_ops("gt") @pytest.fixture @@ -1935,10 +1983,7 @@ def test_last_valid_index(ray_df, pandas_df): def test_le(): - ray_df = create_test_dataframe() - - with pytest.raises(NotImplementedError): - ray_df.le(None) + test_comparison_inter_ops("le") def test_lookup(): @@ -1949,10 +1994,7 @@ def test_lookup(): def test_lt(): - ray_df = create_test_dataframe() - - with pytest.raises(NotImplementedError): - ray_df.lt(None) + test_comparison_inter_ops("lt") def test_mad(): @@ -2014,10 +2056,7 @@ def test_min(ray_df, pandas_df): def test_mod(): - ray_df = create_test_dataframe() - - with pytest.raises(NotImplementedError): - ray_df.mod(None) + test_inter_df_math("mod", simple=False) def test_mode(): @@ -2028,24 +2067,15 @@ def test_mode(): def test_mul(): - ray_df = create_test_dataframe() - - with 
pytest.raises(NotImplementedError): - ray_df.mul(None) + test_inter_df_math("mul", simple=False) def test_multiply(): - ray_df = create_test_dataframe() - - with pytest.raises(NotImplementedError): - ray_df.multiply(None) + test_inter_df_math("multiply", simple=False) def test_ne(): - ray_df = create_test_dataframe() - - with pytest.raises(NotImplementedError): - ray_df.ne(None) + test_comparison_inter_ops("ne") def test_nlargest(): @@ -2125,10 +2155,7 @@ def test_pop(ray_df, pandas_df): def test_pow(): - ray_df = create_test_dataframe() - - with pytest.raises(NotImplementedError): - ray_df.pow(None) + test_inter_df_math("pow", simple=False) def test_prod(): @@ -2158,10 +2185,7 @@ def test_query(ray_df, pandas_df, funcs): def test_radd(): - ray_df = create_test_dataframe() - - with pytest.raises(NotImplementedError): - ray_df.radd(None) + test_inter_df_math_right_ops("radd") def test_rank(): @@ -2172,10 +2196,7 @@ def test_rank(): def test_rdiv(): - ray_df = create_test_dataframe() - - with pytest.raises(NotImplementedError): - ray_df.rdiv(None) + test_inter_df_math_right_ops("rdiv") def test_reindex(): @@ -2487,24 +2508,15 @@ def test_reset_index(ray_df, pandas_df, inplace=False): def test_rfloordiv(): - ray_df = create_test_dataframe() - - with pytest.raises(NotImplementedError): - ray_df.rfloordiv(None) + test_inter_df_math_right_ops("rfloordiv") def test_rmod(): - ray_df = create_test_dataframe() - - with pytest.raises(NotImplementedError): - ray_df.rmod(None) + test_inter_df_math_right_ops("rmod") def test_rmul(): - ray_df = create_test_dataframe() - - with pytest.raises(NotImplementedError): - ray_df.rmul(None) + test_inter_df_math_right_ops("rmul") def test_rolling(): @@ -2521,24 +2533,15 @@ def test_round(ray_df, pd_df): def test_rpow(): - ray_df = create_test_dataframe() - - with pytest.raises(NotImplementedError): - ray_df.rpow(None) + test_inter_df_math_right_ops("rpow") def test_rsub(): - ray_df = create_test_dataframe() - - with pytest.raises(NotImplementedError): - ray_df.rsub(None) + test_inter_df_math_right_ops("rsub") def test_rtruediv(): - ray_df = create_test_dataframe() - - with pytest.raises(NotImplementedError): - ray_df.rtruediv(None) + test_inter_df_math_right_ops("rtruediv") def test_sample(): @@ -2657,17 +2660,11 @@ def test_std(ray_df, pandas_df): def test_sub(): - ray_df = create_test_dataframe() - - with pytest.raises(NotImplementedError): - ray_df.sub(None) + test_inter_df_math("sub", simple=False) def test_subtract(): - ray_df = create_test_dataframe() - - with pytest.raises(NotImplementedError): - ray_df.subtract(None) + test_inter_df_math("subtract", simple=False) def test_swapaxes(): @@ -2740,10 +2737,7 @@ def test_transform(ray_df, pandas_df): def test_truediv(): - ray_df = create_test_dataframe() - - with pytest.raises(NotImplementedError): - ray_df.truediv(None) + test_inter_df_math("truediv", simple=False) def test_truncate(): @@ -2992,10 +2986,7 @@ def test_iat(): def test___rsub__(): - ray_df = create_test_dataframe() - - with pytest.raises(NotImplementedError): - ray_df.__rsub__(None, None, None) + test_inter_df_math_right_ops("__rsub__") @pytest.fixture @@ -3022,17 +3013,11 @@ def test_is_copy(): def test___itruediv__(): - ray_df = create_test_dataframe() - - with pytest.raises(NotImplementedError): - ray_df.__itruediv__() + test_inter_df_math("__itruediv__", simple=True) def test___div__(): - ray_df = create_test_dataframe() - - with pytest.raises(NotImplementedError): - ray_df.__div__(None) + test_inter_df_math("__div__", simple=True) def 
test_at(): diff --git a/python/ray/dataframe/utils.py b/python/ray/dataframe/utils.py index 3aee593bd..22cf7eae4 100644 --- a/python/ray/dataframe/utils.py +++ b/python/ray/dataframe/utils.py @@ -200,12 +200,16 @@ def _create_block_partitions(partitions, axis=0, length=None): for partition in partitions] # In the case that axis is 1 we have to transpose because we build the - # columns into rows. Fortunately numpy is efficent at this. + # columns into rows. Fortunately numpy is efficient at this. return np.array(x) if axis == 0 else np.array(x).T @ray.remote def create_blocks(df, npartitions, axis): + return create_blocks_helper(df, npartitions, axis) + + +def create_blocks_helper(df, npartitions, axis): # Single partition dataframes don't need to be repartitioned if npartitions == 1: return df @@ -285,7 +289,7 @@ def _inherit_docstrings(parent): @ray.remote -def _reindex_helper(df, old_index, new_index, axis): +def _reindex_helper(old_index, new_index, axis, npartitions, *df): """Reindexes a dataframe to prepare for join/concat. Args: @@ -295,8 +299,9 @@ def _reindex_helper(df, old_index, new_index, axis): axis: Which axis to reindex over. Returns: - A new reindexed DataFrame. + A new set of blocks made up of DataFrames. """ + df = pd.concat(df, axis=axis ^ 1) if axis == 1: df.index = old_index df = df.reindex(new_index, copy=False) @@ -305,4 +310,32 @@ def _reindex_helper(df, old_index, new_index, axis): df.columns = old_index df = df.reindex(columns=new_index, copy=False) df.columns = pd.RangeIndex(len(df.columns)) - return df + return create_blocks_helper(df, npartitions, axis) + + +@ray.remote +def _co_op_helper(func, left_columns, right_columns, left_df_len, *zipped): + """Copartition operation where two DataFrames must have aligned indexes. + + NOTE: This function assumes things are already copartitioned. Requires that + row partitions are passed in as blocks. + + Args: + func: The operation to conduct between two DataFrames. + left_columns: The column names for the left DataFrame. + right_columns: The column names for the right DataFrame. + left_df_len: The length of the left. This is used so we can split up + the zipped partitions. + zipped: The DataFrame partitions (in blocks). + + Returns: + A new set of blocks for the partitioned DataFrame. + """ + left = pd.concat(zipped[:left_df_len], axis=1, copy=False) + left.columns = left_columns + + right = pd.concat(zipped[left_df_len:], axis=1, copy=False) + right.columns = right_columns + + new_rows = func(left, right) + return create_blocks_helper(new_rows, left_df_len, 0)
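
The inter-DataFrame path above (`_operator_helper` -> `_copartition` -> `_co_op_helper`) can be summarized outside of Ray with a short pandas-only sketch. `simulate_co_op` below is a hypothetical stand-in written for this review, not a helper from the patch: it reproduces the alignment the patch performs, where the row indexes are outer-joined (`_copartition` reindexes every block of both frames to that joined index via `_reindex_helper`) and the binary operation is then applied to the relabeled, copartitioned blocks (`_co_op_helper`).

```python
import pandas as pd


def simulate_co_op(left, right, func):
    """Pandas-only approximation of DataFrame._copartition + _co_op_helper."""
    # _copartition: outer-join the row indexes and reindex both frames to it,
    # which is what _reindex_helper does to each block partition remotely.
    new_index = left.index.join(right.index, how="outer")
    left_aligned = left.reindex(new_index)
    right_aligned = right.reindex(new_index)
    # _co_op_helper: with rows already aligned, apply the pandas operation;
    # column alignment (outer join of the column labels) is left to pandas.
    return func(left_aligned, right_aligned)


if __name__ == "__main__":
    a = pd.DataFrame({"col1": [0, 1, 2, 3], "col2": [4, 5, 6, 7]})
    b = pd.DataFrame({"A": [0, 2], "col1": [0, 19], "col2": [1, 1]})
    # The result should match pandas' own outer-joining DataFrame.add, which
    # is the equivalence test_inter_df_math asserts for the Ray DataFrame.
    assert simulate_co_op(a, b, pd.DataFrame.add).equals(a.add(b))
```

The same flow backs every method routed through `_operator_helper` when `other` is a DataFrame (add, sub, mul, div, mod, pow, and the comparison methods).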
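
For the scalar and list branch (`_single_df_op_helper`), the intended behaviour is plain pandas broadcasting. The snippet below is only an illustration of the semantics the new `test_inter_df_math` fixture checks, using the same frame and list it builds; it is not code from the patch.

```python
import pandas as pd

df = pd.DataFrame({"col1": [0, 1, 2, 3], "col2": [4, 5, 6, 7],
                   "col3": [8, 9, 0, 1], "col4": [2, 4, 5, 6]})

print(df.add(4))                      # scalar broadcasts to every element
print(df.add([0, 1, 2, 4], axis=1))   # list matched against the 4 columns
print(df.add([0, 1, 2, 4], axis=0))   # list matched against the 4 rows
# A list whose length matches neither axis raises ValueError, mirroring the
# length check added in _single_df_op_helper.
```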