diff --git a/python/ray/dataframe/dataframe.py b/python/ray/dataframe/dataframe.py index 84303dea8..10260431a 100644 --- a/python/ray/dataframe/dataframe.py +++ b/python/ray/dataframe/dataframe.py @@ -23,6 +23,7 @@ from pandas.errors import MergeError import warnings import numpy as np +from numpy.testing import assert_equal import ray import itertools import io @@ -286,7 +287,7 @@ class DataFrame(object): for _ in range(len(self.index))]) col_dots.index = self.index col_dots.name = "..." - x = pd.concat([front, col_dots, back], axis=1) + x = pd.concat([front, col_dots, back], axis=1, copy=False) # If less than 60 rows, x is already in the correct format. if len(self._row_metadata) < 60: @@ -473,7 +474,8 @@ class DataFrame(object): if isinstance(self._dtypes_cache, list) and \ isinstance(self._dtypes_cache[0], ray.ObjectID): - self._dtypes_cache = pd.concat(ray.get(self._dtypes_cache)) + self._dtypes_cache = pd.concat(ray.get(self._dtypes_cache), + copy=False) self._dtypes_cache.index = self.columns return self._dtypes_cache @@ -1050,7 +1052,7 @@ class DataFrame(object): # be returned immediately is_series = ray.get(is_series) if all(is_series): - new_series = pd.concat(ray.get(new_parts)) + new_series = pd.concat(ray.get(new_parts), copy=False) new_series.index = self.columns if axis == 0 else self.index return new_series # This error is thrown when some of the partitions return Series and @@ -1201,7 +1203,7 @@ class DataFrame(object): else [func[key]]), self._col_partitions[part]) for (part, ind), key in part_ind_tuples] - return pd.concat(ray.get(result), axis=1) + return pd.concat(ray.get(result), axis=1, copy=False) else: result = [_deploy_func.remote( lambda df: df.iloc[:, ind].apply(func[key]), @@ -1599,17 +1601,12 @@ class DataFrame(object): """Return new object with labels in requested axis removed. Args: labels: Index or column labels to drop. - axis: Whether to drop labels from the index (0 / 'index') or columns (1 / 'columns'). - index, columns: Alternative to specifying axis (labels, axis=1 is equivalent to columns=labels). - level: For MultiIndex - inplace: If True, do operation inplace and return None. - errors: If 'ignore', suppress error and existing labels are dropped. Returns: @@ -1757,33 +1754,22 @@ class DataFrame(object): Returns: Boolean: True if equal, otherwise False """ - # TODO(kunalgosar): Implement Copartition and use to implement equals - def helper(df, index, other_series): - return df.iloc[index['index_within_partition']] \ - .equals(other_series) - results = [] - other_partition = None - other_df = None - # TODO: Make the appropriate coord df accessor methods for this fxn - for i, idx in other._row_metadata._coord_df.iterrows(): - if idx['partition'] != other_partition: - other_df = ray.get(other._row_partitions[idx['partition']]) - other_partition = idx['partition'] - # TODO: group series here into full df partitions to reduce - # the number of remote calls to helper - other_series = other_df.iloc[idx['index_within_partition']] - curr_index = self._row_metadata._coord_df.loc[i] - curr_df = self._row_partitions[int(curr_index['partition'])] - results.append(_deploy_func.remote(helper, - curr_df, - curr_index, - other_series)) + if not self.index.equals(other.index) or not \ + self.columns.equals(other.columns): + return False - for r in results: - if not ray.get(r): - return False - return True + # We copartition because we don't know what the DataFrames look like + # before this. Empty partitions can give problems with + # _match_partitioning (See _match_partitioning) + new_zipped_parts = self._copartition(other, self.index) + + equals_partitions = [_equals_helper.remote(left, right) + for left, right in new_zipped_parts] + + # To avoid getting all we use next notation. + return next((False for eq in equals_partitions if not ray.get(eq)), + True) def eval(self, expr, inplace=False, **kwargs): """Evaluate a Python expression as a string using various backends. @@ -1848,7 +1834,7 @@ class DataFrame(object): result_type = ray.get(_deploy_func.remote(lambda df: type(df), new_rows[0])) if result_type is pd.Series: - new_series = pd.concat(ray.get(new_rows), axis=0) + new_series = pd.concat(ray.get(new_rows), axis=0, copy=False) new_series.index = self.index return new_series @@ -4104,7 +4090,8 @@ class DataFrame(object): for row, idx in index_builder: row.index = [str(idx)] - broadcast_values = pd.concat([row for row, idx in index_builder]) + broadcast_values = pd.concat([row for row, idx in index_builder], + copy=False) # We are converting the by to string here so that we don't have a # collision with the RangeIndex on the inner frame. It is cheap and @@ -5365,7 +5352,7 @@ def _merge_columns(left_columns, right_columns, *args): def _where_helper(left, cond, other, left_columns, cond_columns, other_columns, *args): - left = pd.concat(ray.get(left.tolist()), axis=1) + left = pd.concat(ray.get(left.tolist()), axis=1, copy=False) # We have to reset the index and columns here because we are coming # from blocks and the axes are set according to the blocks. We have # already correctly copartitioned everything, so there's no @@ -5373,12 +5360,12 @@ def _where_helper(left, cond, other, left_columns, cond_columns, left.reset_index(inplace=True, drop=True) left.columns = left_columns - cond = pd.concat(ray.get(cond.tolist()), axis=1) + cond = pd.concat(ray.get(cond.tolist()), axis=1, copy=False) cond.reset_index(inplace=True, drop=True) cond.columns = cond_columns if isinstance(other, np.ndarray): - other = pd.concat(ray.get(other.tolist()), axis=1) + other = pd.concat(ray.get(other.tolist()), axis=1, copy=False) other.reset_index(inplace=True, drop=True) other.columns = other_columns @@ -5388,7 +5375,7 @@ def _where_helper(left, cond, other, left_columns, cond_columns, @ray.remote def reindex_helper(old_index, new_index, axis, npartitions, method, fill_value, limit, tolerance, *df): - df = pd.concat(df, axis=axis ^ 1) + df = pd.concat(df, axis=axis ^ 1, copy=False) if axis == 1: df.index = old_index else: @@ -5398,3 +5385,17 @@ def reindex_helper(old_index, new_index, axis, npartitions, method, fill_value, method=method, fill_value=fill_value, limit=limit, tolerance=tolerance) return create_blocks_helper(df, npartitions, axis) + + +@ray.remote +def _equals_helper(left, right): + right = pd.concat(ray.get(right.tolist()), axis=1, copy=False) + left = pd.concat(ray.get(left.tolist()), axis=1, copy=False) + # Since we know that the index and columns match, we can just check the + # values. We can't use np.array_equal here because it doesn't recognize + # np.nan as equal to another np.nan + try: + assert_equal(left.values, right.values) + except AssertionError: + return False + return True diff --git a/python/ray/dataframe/test/test_dataframe.py b/python/ray/dataframe/test/test_dataframe.py index 59f9bbeb4..81cf3b420 100644 --- a/python/ray/dataframe/test/test_dataframe.py +++ b/python/ray/dataframe/test/test_dataframe.py @@ -17,13 +17,13 @@ def ray_df_equals_pandas(ray_df, pandas_df): @pytest.fixture -def ray_series_equals_pandas(ray_df, pandas_df): - return ray_df.equals(pandas_df) +def ray_series_equals_pandas(ray_series, pandas_series): + return ray_series.equals(pandas_series) @pytest.fixture def ray_df_equals(ray_df1, ray_df2): - return to_pandas(ray_df1).equals(to_pandas(ray_df2)) + return ray_df1.equals(ray_df2) @pytest.fixture diff --git a/python/ray/dataframe/utils.py b/python/ray/dataframe/utils.py index 0da3e0ff8..e576cf860 100644 --- a/python/ray/dataframe/utils.py +++ b/python/ray/dataframe/utils.py @@ -426,6 +426,16 @@ def _co_op_helper(func, left_columns, right_columns, left_df_len, left_idx, def _match_partitioning(column_partition, lengths, index): """Match the number of rows on each partition. Used in df.merge(). + NOTE: This function can cause problems when there are empty column + partitions. + + The way this function is intended to be used is as follows: Align the + right partitioning with the left. The left will remain unchanged. Then, + you are free to perform actions on a per-partition basis with the + partitioning. + + The index objects must already be identical for this to work correctly. + Args: column_partition: The column partition to change. lengths: The lengths of each row partition to match to.