diff --git a/python/ray/dataframe/__init__.py b/python/ray/dataframe/__init__.py index c81f40085..d71ac1e3d 100644 --- a/python/ray/dataframe/__init__.py +++ b/python/ray/dataframe/__init__.py @@ -2,7 +2,7 @@ from __future__ import absolute_import from __future__ import division from __future__ import print_function -import pandas as pd +import pandas # TODO: In the future `set_option` or similar needs to run on every node # in order to keep all pandas instances across nodes consistent from pandas import (eval, unique, value_counts, cut, to_numeric, factorize, @@ -12,11 +12,11 @@ from pandas import (eval, unique, value_counts, cut, to_numeric, factorize, set_option, NaT, PeriodIndex, Categorical) import threading -pd_version = pd.__version__ -pd_major = int(pd_version.split(".")[0]) -pd_minor = int(pd_version.split(".")[1]) +pandas_version = pandas.__version__ +pandas_major = int(pandas_version.split(".")[0]) +pandas_minor = int(pandas_version.split(".")[1]) -if pd_major == 0 and pd_minor != 22: +if pandas_major == 0 and pandas_minor != 22: raise Exception("In order to use Pandas on Ray, your pandas version must " "be 0.22. You can run 'pip install pandas==0.22'") diff --git a/python/ray/dataframe/dataframe.py b/python/ray/dataframe/dataframe.py index c4ef03599..dc9435f67 100644 --- a/python/ray/dataframe/dataframe.py +++ b/python/ray/dataframe/dataframe.py @@ -2,7 +2,7 @@ from __future__ import absolute_import from __future__ import division from __future__ import print_function -import pandas as pd +import pandas import functools from pandas.api.types import is_scalar from pandas.util._validators import validate_bool_kwarg @@ -51,8 +51,8 @@ from .index_metadata import _IndexMetadata from .iterator import PartitionIterator -@_inherit_docstrings(pd.DataFrame, - excluded=[pd.DataFrame, pd.DataFrame.__init__]) +@_inherit_docstrings(pandas.DataFrame, + excluded=[pandas.DataFrame, pandas.DataFrame.__init__]) class DataFrame(object): def __init__(self, data=None, index=None, columns=None, dtype=None, @@ -94,25 +94,26 @@ class DataFrame(object): row_partitions is None and block_partitions is None): - pd_df = pd.DataFrame(data=data, index=index, columns=columns, - dtype=dtype, copy=copy) + pandas_df = pandas.DataFrame(data=data, index=index, + columns=columns, dtype=dtype, + copy=copy) # Cache dtypes - self._dtypes_cache = pd_df.dtypes + self._dtypes_cache = pandas_df.dtypes # TODO convert _partition_pandas_dataframe to block partitioning. row_partitions = \ - _partition_pandas_dataframe(pd_df, + _partition_pandas_dataframe(pandas_df, num_partitions=get_npartitions()) self._block_partitions = \ _create_block_partitions(row_partitions, axis=0, - length=len(pd_df.columns)) + length=len(pandas_df.columns)) # Set in case we were only given a single row/column for below. 
axis = 0 - columns = pd_df.columns - index = pd_df.index + columns = pandas_df.columns + index = pandas_df.index else: # created this invariant to make sure we never have to go into the # partitions to get the columns @@ -136,11 +137,12 @@ class DataFrame(object): elif col_partitions is not None: axis = 1 partitions = col_partitions - axis_length = None + axis_length = len(index) if index is not None else \ + len(row_metadata) # All partitions will already have correct dtypes self._dtypes_cache = [ - _deploy_func.remote(lambda df: df.dtypes, pd_df) - for pd_df in col_partitions + _deploy_func.remote(lambda df: df.dtypes, pandas_df) + for pandas_df in col_partitions ] # TODO: write explicit tests for "short and wide" @@ -194,8 +196,13 @@ class DataFrame(object): _frame_data = property(_get_frame_data, _set_frame_data) def _get_row_partitions(self): + empty_rows_mask = self._row_metadata._lengths > 0 + if any(empty_rows_mask): + self._row_metadata._lengths = \ + self._row_metadata._lengths[empty_rows_mask] + self._block_partitions = self._block_partitions[empty_rows_mask, :] return [_blocks_to_row.remote(*part) - for part in self._block_partitions] + for i, part in enumerate(self._block_partitions)] def _set_row_partitions(self, new_row_partitions): self._block_partitions = \ @@ -205,6 +212,11 @@ class DataFrame(object): _row_partitions = property(_get_row_partitions, _set_row_partitions) def _get_col_partitions(self): + empty_cols_mask = self._col_metadata._lengths > 0 + if any(empty_cols_mask): + self._col_metadata._lengths = \ + self._col_metadata._lengths[empty_cols_mask] + self._block_partitions = self._block_partitions[:, empty_cols_mask] return [_blocks_to_col.remote(*self._block_partitions[:, i]) for i in range(self._block_partitions.shape[1])] @@ -232,10 +244,10 @@ class DataFrame(object): df) index = self.index[:n] - pd_head = pd.concat(ray.get(new_dfs), axis=1, copy=False) - pd_head.index = index - pd_head.columns = self.columns - return pd_head + pandas_head = pandas.concat(ray.get(new_dfs), axis=1, copy=False) + pandas_head.index = index + pandas_head.columns = self.columns + return pandas_head def tail(df, n, get_local_tail=False): """Compute the tail for this without creating a new DataFrame""" @@ -246,21 +258,22 @@ class DataFrame(object): df) index = self.index[-n:] - pd_tail = pd.concat(ray.get(new_dfs), axis=1, copy=False) - pd_tail.index = index - pd_tail.columns = self.columns - return pd_tail + pandas_tail = pandas.concat(ray.get(new_dfs), axis=1, copy=False) + pandas_tail.index = index + pandas_tail.columns = self.columns + return pandas_tail def front(df, n): """Get first n columns without creating a new Dataframe""" cum_col_lengths = self._col_metadata._lengths.cumsum() index = np.argmax(cum_col_lengths >= 10) - pd_front = pd.concat(ray.get(x[:index+1]), axis=1, copy=False) - pd_front = pd_front.iloc[:, :n] - pd_front.index = self.index - pd_front.columns = self.columns[:n] - return pd_front + pandas_front = pandas.concat(ray.get(x[:index + 1]), + axis=1, copy=False) + pandas_front = pandas_front.iloc[:, :n] + pandas_front.index = self.index + pandas_front.columns = self.columns[:n] + return pandas_front def back(df, n): """Get last n columns without creating a new Dataframe""" @@ -268,11 +281,12 @@ class DataFrame(object): cum_col_lengths = np.flip(self._col_metadata._lengths, axis=0).cumsum() index = np.argmax(cum_col_lengths >= 10) - pd_back = pd.concat(ray.get(x[-(index+1):]), axis=1, copy=False) - pd_back = pd_back.iloc[:, -n:] - pd_back.index = self.index - 
pd_back.columns = self.columns[-n:]
-            return pd_back
+            pandas_back = pandas.concat(ray.get(x[-(index + 1):]),
+                                        axis=1, copy=False)
+            pandas_back = pandas_back.iloc[:, -n:]
+            pandas_back.index = self.index
+            pandas_back.columns = self.columns[-n:]
+            return pandas_back

         x = self._col_partitions
         get_local_head = False

@@ -283,11 +297,10 @@ class DataFrame(object):
             front = front(x, 10)
             back = back(x, 10)

-            col_dots = pd.Series(["..."
-                                  for _ in range(len(self.index))])
+            col_dots = pandas.Series(["..." for _ in range(len(self.index))])
             col_dots.index = self.index
             col_dots.name = "..."
-            x = pd.concat([front, col_dots, back], axis=1, copy=False)
+            x = pandas.concat([front, col_dots, back], axis=1, copy=False)

         # If less than 60 rows, x is already in the correct format.
         if len(self._row_metadata) < 60:
@@ -297,8 +310,7 @@ class DataFrame(object):
         tail = tail(x, 30, get_local_head)

         # Make the dots in between the head and tail
-        row_dots = pd.Series(["..."
-                              for _ in range(len(head.columns))])
+        row_dots = pandas.Series(["..." for _ in range(len(head.columns))])
         row_dots.index = head.columns
         row_dots.name = "..."

@@ -378,7 +390,7 @@ class DataFrame(object):
         if level is not None:
             raise NotImplementedError("Level not yet supported.")

-        axis = pd.DataFrame()._get_axis_number(axis) if axis is not None \
+        axis = pandas.DataFrame()._get_axis_number(axis) if axis is not None \
             else 0

         oid_series = ray.get(_map_partitions(remote_func,
@@ -396,10 +408,10 @@ class DataFrame(object):
                 df.index = \
                     this_partition[this_partition.isin(df.index)].index

-            result_series = pd.concat([obj[0] for obj in oid_series],
-                                      axis=0, copy=False)
+            result_series = pandas.concat([obj[0] for obj in oid_series],
+                                          axis=0, copy=False)
         else:
-            result_series = pd.concat(oid_series, axis=0, copy=False)
+            result_series = pandas.concat(oid_series, axis=0, copy=False)
         result_series.index = self.index
         return result_series

@@ -474,8 +486,8 @@ class DataFrame(object):
         if isinstance(self._dtypes_cache, list) and \
                 isinstance(self._dtypes_cache[0], ray.ObjectID):
-            self._dtypes_cache = pd.concat(ray.get(self._dtypes_cache),
-                                           copy=False)
+            self._dtypes_cache = pandas.concat(ray.get(self._dtypes_cache),
+                                               copy=False)
             self._dtypes_cache.index = self.columns

         return self._dtypes_cache
@@ -488,9 +500,7 @@ class DataFrame(object):
             True if the DataFrame is empty.
             False otherwise.
         """
-        all_empty = ray.get(_map_partitions(
-            lambda df: df.empty, self._row_partitions))
-        return False not in all_empty
+        return self._row_metadata._empty

     @property
     def values(self):
@@ -523,7 +533,7 @@ class DataFrame(object):
     def _update_inplace(self, row_partitions=None, col_partitions=None,
                         block_partitions=None, columns=None, index=None,
                         col_metadata=None, row_metadata=None):
-        """Updates the current DataFrame inplace.
+        """Updates the current DataFrame inplace.

        Behavior should be similar to the constructor, given the
        corresponding arguments. Note that len(columns) and len(index) should
        match the
@@ -535,9 +545,9 @@ class DataFrame(object):
                The new partitions to replace self._row_partitions directly
            col_partitions ([ObjectID]):
                The new partitions to replace self._col_partitions directly
-            columns (pd.Index):
+            columns (pandas.Index):
                Index of the column dimension to replace existing columns
-            index (pd.Index):
+            index (pandas.Index):
                Index of the row dimension to replace existing index

        Note:
@@ -645,13 +655,13 @@ class DataFrame(object):
        Returns:
            A new DataFrame resulting from the groupby.
""" - axis = pd.DataFrame()._get_axis_number(axis) + axis = pandas.DataFrame()._get_axis_number(axis) if callable(by): by = by(self.index) elif isinstance(by, compat.string_types): by = self.__getitem__(by).values.tolist() elif is_list_like(by): - if isinstance(by, pd.Series): + if isinstance(by, pandas.Series): by = by.values.tolist() mismatch = len(by) != len(self) if axis == 0 \ @@ -737,8 +747,8 @@ class DataFrame(object): new_block_partitions = np.array([_map_partitions( lambda df: df.isna(), block) for block in self._block_partitions]) - new_dtypes = pd.Series([np.dtype("bool")] * len(self.columns), - index=self.columns) + new_dtypes = pandas.Series([np.dtype("bool")] * len(self.columns), + index=self.columns) return DataFrame(block_partitions=new_block_partitions, row_metadata=self._row_metadata, @@ -758,8 +768,8 @@ class DataFrame(object): lambda df: df.isnull(), block) for block in self._block_partitions]) - new_dtypes = pd.Series([np.dtype("bool")] * len(self.columns), - index=self.columns) + new_dtypes = pandas.Series([np.dtype("bool")] * len(self.columns), + index=self.columns) return DataFrame(block_partitions=new_block_partitions, row_metadata=self._row_metadata, @@ -812,7 +822,7 @@ class DataFrame(object): inplace = validate_bool_kwarg(inplace, "inplace") if is_list_like(axis): - axis = [pd.DataFrame()._get_axis_number(ax) for ax in axis] + axis = [pandas.DataFrame()._get_axis_number(ax) for ax in axis] result = self # TODO(kunalgosar): this builds an intermediate dataframe, @@ -829,7 +839,7 @@ class DataFrame(object): return None - axis = pd.DataFrame()._get_axis_number(axis) + axis = pandas.DataFrame()._get_axis_number(axis) if how is not None and how not in ['any', 'all']: raise ValueError('invalid how option: %s' % how) @@ -855,7 +865,7 @@ class DataFrame(object): if axis == 1: new_index = new_df.columns - new_df.columns = pd.RangeIndex(0, len(new_df.columns)) + new_df.columns = pandas.RangeIndex(0, len(new_df.columns)) else: new_index = new_df.index new_df.reset_index(drop=True, inplace=True) @@ -916,14 +926,14 @@ class DataFrame(object): Returns: A new DataFrame with the applied addition. """ - return self._operator_helper(pd.DataFrame.add, other, axis, level, + return self._operator_helper(pandas.DataFrame.add, other, axis, level, fill_value) def agg(self, func, axis=0, *args, **kwargs): return self.aggregate(func, axis, *args, **kwargs) def aggregate(self, func, axis=0, *args, **kwargs): - axis = pd.DataFrame()._get_axis_number(axis) + axis = pandas.DataFrame()._get_axis_number(axis) result = None @@ -995,12 +1005,12 @@ class DataFrame(object): index = None columns = None - if isinstance(new_df, pd.Series): + if isinstance(new_df, pandas.Series): is_series = True else: columns = new_df.columns index = new_df.index - new_df.columns = pd.RangeIndex(0, len(new_df.columns)) + new_df.columns = pandas.RangeIndex(0, len(new_df.columns)) new_df.reset_index(drop=True, inplace=True) if is_transform: @@ -1052,7 +1062,7 @@ class DataFrame(object): # be returned immediately is_series = ray.get(is_series) if all(is_series): - new_series = pd.concat(ray.get(new_parts), copy=False) + new_series = pandas.concat(ray.get(new_parts), copy=False) new_series.index = self.columns if axis == 0 else self.index return new_series # This error is thrown when some of the partitions return Series and @@ -1126,9 +1136,9 @@ class DataFrame(object): Returns: A new DataFrame containing the concatenated values. 
""" - if isinstance(other, (pd.Series, dict)): + if isinstance(other, (pandas.Series, dict)): if isinstance(other, dict): - other = pd.Series(other) + other = pandas.Series(other) if other.name is None and not ignore_index: raise TypeError('Can only append a Series if ignore_index=True' ' or if the Series has a name') @@ -1138,17 +1148,17 @@ class DataFrame(object): else: # other must have the same index name as self, otherwise # index name will be reset - index = pd.Index([other.name], name=self.index.name) + index = pandas.Index([other.name], name=self.index.name) combined_columns = self.columns.tolist() + self.columns.union( other.index).difference(self.columns).tolist() other = other.reindex(combined_columns, copy=False) - other = pd.DataFrame(other.values.reshape((1, len(other))), - index=index, - columns=combined_columns) + other = pandas.DataFrame(other.values.reshape((1, len(other))), + index=index, + columns=combined_columns) other = other._convert(datetime=True, timedelta=True) elif isinstance(other, list) and not isinstance(other[0], DataFrame): - other = pd.DataFrame(other) + other = pandas.DataFrame(other) if (self.columns.get_indexer(other.columns) >= 0).all(): other = other.loc[:, self.columns] @@ -1175,7 +1185,7 @@ class DataFrame(object): Returns: Series or DataFrame, depending on func. """ - axis = pd.DataFrame()._get_axis_number(axis) + axis = pandas.DataFrame()._get_axis_number(axis) if isinstance(func, compat.string_types): if axis == 1: @@ -1203,13 +1213,13 @@ class DataFrame(object): else [func[key]]), self._col_partitions[part]) for (part, ind), key in part_ind_tuples] - return pd.concat(ray.get(result), axis=1, copy=False) + return pandas.concat(ray.get(result), axis=1, copy=False) else: result = [_deploy_func.remote( lambda df: df.iloc[:, ind].apply(func[key]), self._col_partitions[part]) for (part, ind), key in part_ind_tuples] - return pd.Series(ray.get(result), index=func.keys()) + return pandas.Series(ray.get(result), index=func.keys()) elif is_list_like(func): if axis == 1: @@ -1423,7 +1433,7 @@ class DataFrame(object): "github.com/ray-project/ray.") def _cumulative_helper(self, func, axis): - axis = pd.DataFrame()._get_axis_number(axis) if axis is not None \ + axis = pandas.DataFrame()._get_axis_number(axis) if axis is not None \ else 0 if axis == 0: @@ -1520,7 +1530,7 @@ class DataFrame(object): # This exception is thrown when there are only non-numeric columns # in this partition except ValueError: - return pd.DataFrame() + return pandas.DataFrame() # Begin fixing index based on the columns inside. parts = ray.get(_map_partitions(describe_helper, self._col_partitions)) @@ -1532,7 +1542,7 @@ class DataFrame(object): df.columns = this_partition[this_partition.isin(df.columns)].index # Remove index from tuple - result = pd.concat([obj[0] for obj in parts], axis=1, copy=False) + result = pandas.concat([obj[0] for obj in parts], axis=1, copy=False) return result def diff(self, periods=1, axis=0): @@ -1545,7 +1555,7 @@ class DataFrame(object): Returns: DataFrame with the diff applied """ - axis = pd.DataFrame()._get_axis_number(axis) + axis = pandas.DataFrame()._get_axis_number(axis) partitions = (self._col_partitions if axis == 0 else self._row_partitions) @@ -1574,7 +1584,7 @@ class DataFrame(object): Returns: A new DataFrame with the Divide applied. 
""" - return self._operator_helper(pd.DataFrame.div, other, axis, level, + return self._operator_helper(pandas.DataFrame.div, other, axis, level, fill_value) def divide(self, other, axis='columns', level=None, fill_value=None): @@ -1621,12 +1631,11 @@ class DataFrame(object): if index is not None or columns is not None: raise ValueError("Cannot specify both 'labels' and " "'index'/'columns'") - axis = pd.DataFrame()._get_axis_name(axis) + axis = pandas.DataFrame()._get_axis_name(axis) axes = {axis: labels} elif index is not None or columns is not None: - axes, _ = pd.DataFrame()._construct_axes_from_arguments((index, - columns), - {}) + axes, _ = pandas.DataFrame() \ + ._construct_axes_from_arguments((index, columns), {}) else: raise ValueError("Need to specify at least one of 'labels', " "'index' or 'columns'") @@ -1639,7 +1648,7 @@ class DataFrame(object): if axis == 'index': try: coords = obj._row_metadata[label] - if isinstance(coords, pd.DataFrame): + if isinstance(coords, pandas.DataFrame): partitions = list(coords['partition']) indexes = list(coords['index_within_partition']) else: @@ -1668,7 +1677,7 @@ class DataFrame(object): else: try: coords = obj._col_metadata[label] - if isinstance(coords, pd.DataFrame): + if isinstance(coords, pandas.DataFrame): partitions = list(coords['partition']) indexes = list(coords['index_within_partition']) else: @@ -1745,7 +1754,7 @@ class DataFrame(object): Returns: A new DataFrame filled with Booleans. """ - return self._operator_helper(pd.DataFrame.eq, other, axis, level) + return self._operator_helper(pandas.DataFrame.eq, other, axis, level) def equals(self, other): """ @@ -1824,8 +1833,8 @@ class DataFrame(object): df.columns = columns result = df.eval(expr, inplace=False, **kwargs) # If result is a series, expr was not an assignment expression. 
- if not isinstance(result, pd.Series): - result.columns = pd.RangeIndex(0, len(result.columns)) + if not isinstance(result, pandas.Series): + result.columns = pandas.RangeIndex(0, len(result.columns)) return result inplace = validate_bool_kwarg(inplace, "inplace") @@ -1833,8 +1842,8 @@ class DataFrame(object): result_type = ray.get(_deploy_func.remote(lambda df: type(df), new_rows[0])) - if result_type is pd.Series: - new_series = pd.concat(ray.get(new_rows), axis=0, copy=False) + if result_type is pandas.Series: + new_series = pandas.concat(ray.get(new_rows), axis=0, copy=False) new_series.index = self.index return new_series @@ -1903,13 +1912,13 @@ class DataFrame(object): filled: DataFrame """ # TODO implement value passed as DataFrame - if isinstance(value, pd.DataFrame): + if isinstance(value, pandas.DataFrame): raise NotImplementedError("Passing a DataFrame as the value for " "fillna is not yet supported.") inplace = validate_bool_kwarg(inplace, 'inplace') - axis = pd.DataFrame()._get_axis_number(axis) \ + axis = pandas.DataFrame()._get_axis_number(axis) \ if axis is not None \ else 0 @@ -1937,7 +1946,7 @@ class DataFrame(object): (new_obj._row_partitions, new_obj._row_metadata) - if isinstance(value, (pd.Series, dict)): + if isinstance(value, (pandas.Series, dict)): new_vals = {} value = dict(value) for val in value: @@ -2005,7 +2014,7 @@ class DataFrame(object): if axis is None: axis = 'columns' # This is the default info axis for dataframes - axis = pd.DataFrame()._get_axis_number(axis) + axis = pandas.DataFrame()._get_axis_number(axis) labels = self.columns if axis else self.index if items is not None: @@ -2049,8 +2058,8 @@ class DataFrame(object): Returns: A new DataFrame with the Divide applied. """ - return self._operator_helper(pd.DataFrame.floordiv, other, axis, level, - fill_value) + return self._operator_helper(pandas.DataFrame.floordiv, other, axis, + level, fill_value) @classmethod def from_csv(self, path, header=0, sep=', ', index_col=0, @@ -2090,7 +2099,7 @@ class DataFrame(object): Returns: A new DataFrame filled with Booleans. """ - return self._operator_helper(pd.DataFrame.ge, other, axis, level) + return self._operator_helper(pandas.DataFrame.ge, other, axis, level) def get(self, key, default=None): """Get item from object for given key (DataFrame column, Panel @@ -2148,7 +2157,7 @@ class DataFrame(object): Returns: A new DataFrame filled with Booleans. """ - return self._operator_helper(pd.DataFrame.gt, other, axis, level) + return self._operator_helper(pandas.DataFrame.gt, other, axis, level) def head(self, n=5): """Get the first n rows of the dataframe. 
@@ -2252,8 +2261,8 @@ class DataFrame(object): index_string = self.index.summary() + '\n' # A column header is needed in the inf() output - col_header = 'Data columns (total {0} columns):\n'.format( - len(self.columns)) + col_header = 'Data columns (total {0} columns):\n' \ + .format(len(self.columns)) # Parse the per-partition values to get the per-column details # Find all the lines in the output that start with integers @@ -2284,8 +2293,8 @@ class DataFrame(object): if len(mem_vals) != 0: # Sum memory usage from each partition if memory_usage != 'deep': - memory_string = 'memory usage: {0}+ bytes'.format( - sum(mem_vals)) + memory_string = 'memory usage: {0}+ bytes' \ + .format(sum(mem_vals)) else: memory_string = 'memory usage: {0} bytes'.format(sum(mem_vals)) @@ -2330,15 +2339,15 @@ class DataFrame(object): # Deploy insert function to specific column partition, and replace that # column def insert_col_part(df): - if isinstance(value, pd.Series) and \ + if isinstance(value, pandas.Series) and \ isinstance(value.dtype, - pd.core.dtypes.dtypes.DatetimeTZDtype): + pandas.core.dtypes.dtypes.DatetimeTZDtype): # Need to set index to index of this dtype or inserted values # become NaT df.index = value df.insert(index_within_partition, column, value, allow_duplicates) - df.index = pd.RangeIndex(0, len(df)) + df.index = pandas.RangeIndex(0, len(df)) else: df.insert(index_within_partition, column, value, allow_duplicates) @@ -2475,7 +2484,7 @@ class DataFrame(object): if on is not None: raise NotImplementedError("Not yet.") - if isinstance(other, pd.Series): + if isinstance(other, pandas.Series): if other.name is None: raise ValueError("Other Series must have a name") other = DataFrame({other.name: other}) @@ -2489,8 +2498,8 @@ class DataFrame(object): new_index = index.join(other.index, how=how, sort=sort) # Joining two empty DataFrames is fast, and error checks for us. - new_column_labels = pd.DataFrame(columns=self.columns) \ - .join(pd.DataFrame(columns=other.columns), + new_column_labels = pandas.DataFrame(columns=self.columns) \ + .join(pandas.DataFrame(columns=other.columns), lsuffix=lsuffix, rsuffix=rsuffix).columns new_partition_num = max(len(self._block_partitions.T), @@ -2533,12 +2542,12 @@ class DataFrame(object): # Joining the empty DataFrames with either index or columns is # fast. It gives us proper error checking for the edge cases that # would otherwise require a lot more logic. - new_index = pd.DataFrame(index=self.index).join( - [pd.DataFrame(index=obj.index) for obj in other], + new_index = pandas.DataFrame(index=self.index).join( + [pandas.DataFrame(index=obj.index) for obj in other], how=how, sort=sort).index - new_column_labels = pd.DataFrame(columns=self.columns).join( - [pd.DataFrame(columns=obj.columns) for obj in other], + new_column_labels = pandas.DataFrame(columns=self.columns).join( + [pandas.DataFrame(columns=obj.columns) for obj in other], lsuffix=lsuffix, rsuffix=rsuffix).columns new_partition_num = max([len(self._block_partitions.T)] + @@ -2602,7 +2611,7 @@ class DataFrame(object): Returns: A new DataFrame filled with Booleans. """ - return self._operator_helper(pd.DataFrame.le, other, axis, level) + return self._operator_helper(pandas.DataFrame.le, other, axis, level) def lookup(self, row_labels, col_labels): raise NotImplementedError( @@ -2620,7 +2629,7 @@ class DataFrame(object): Returns: A new DataFrame filled with Booleans. 
""" - return self._operator_helper(pd.DataFrame.lt, other, axis, level) + return self._operator_helper(pandas.DataFrame.lt, other, axis, level) def mad(self, axis=None, skipna=None, level=None): raise NotImplementedError( @@ -2700,7 +2709,7 @@ class DataFrame(object): result.index = self.columns if index: index_value = self._row_metadata.index.memory_usage(deep=deep) - return pd.Series(index_value, index=['Index']).append(result) + return pandas.Series(index_value, index=['Index']).append(result) return result @@ -2877,7 +2886,7 @@ class DataFrame(object): Returns: A new DataFrame with the Mod applied. """ - return self._operator_helper(pd.DataFrame.mod, other, axis, level, + return self._operator_helper(pandas.DataFrame.mod, other, axis, level, fill_value) def mode(self, axis=0, numeric_only=False): @@ -2890,7 +2899,7 @@ class DataFrame(object): Returns: DataFrame: The mode of the DataFrame. """ - axis = pd.DataFrame()._get_axis_number(axis) + axis = pandas.DataFrame()._get_axis_number(axis) def mode_helper(df): mode_df = df.mode(axis=axis, numeric_only=numeric_only) @@ -2898,7 +2907,7 @@ class DataFrame(object): def fix_length(df, *lengths): max_len = max(lengths[0]) - df = df.reindex(pd.RangeIndex(max_len), axis=axis) + df = df.reindex(pandas.RangeIndex(max_len), axis=axis) return df parts = self._col_partitions if axis == 0 else self._row_partitions @@ -2932,7 +2941,7 @@ class DataFrame(object): Returns: A new DataFrame with the Multiply applied. """ - return self._operator_helper(pd.DataFrame.mul, other, axis, level, + return self._operator_helper(pandas.DataFrame.mul, other, axis, level, fill_value) def multiply(self, other, axis='columns', level=None, fill_value=None): @@ -2960,7 +2969,7 @@ class DataFrame(object): Returns: A new DataFrame filled with Booleans. """ - return self._operator_helper(pd.DataFrame.ne, other, axis, level) + return self._operator_helper(pandas.DataFrame.ne, other, axis, level) def nlargest(self, n, columns, keep='first'): raise NotImplementedError( @@ -2980,8 +2989,8 @@ class DataFrame(object): new_block_partitions = np.array([_map_partitions( lambda df: df.notna(), block) for block in self._block_partitions]) - new_dtypes = pd.Series([np.dtype("bool")] * len(self.columns), - index=self.columns) + new_dtypes = pandas.Series([np.dtype("bool")] * len(self.columns), + index=self.columns) return DataFrame(block_partitions=new_block_partitions, row_metadata=self._row_metadata, @@ -3002,8 +3011,8 @@ class DataFrame(object): lambda df: df.notnull(), block) for block in self._block_partitions]) - new_dtypes = pd.Series([np.dtype("bool")] * len(self.columns), - index=self.columns) + new_dtypes = pandas.Series([np.dtype("bool")] * len(self.columns), + index=self.columns) return DataFrame(block_partitions=new_block_partitions, row_metadata=self._row_metadata, @@ -3099,7 +3108,7 @@ class DataFrame(object): Returns: A new DataFrame with the Pow applied. """ - return self._operator_helper(pd.DataFrame.pow, other, axis, level, + return self._operator_helper(pandas.DataFrame.pow, other, axis, level, fill_value) def prod(self, axis=None, skipna=None, level=None, numeric_only=None, @@ -3186,14 +3195,15 @@ class DataFrame(object): raise ValueError("need at least one array to concatenate") # check that all qs are between 0 and 1 - pd.DataFrame()._check_percentile(q) + pandas.DataFrame()._check_percentile(q) def quantile_helper(df, base_object): """Quantile to be run inside each partitoin. Args: df: The DataFrame composing the partition. 
- base_object: An empty pd.Series or pd.DataFrame depending on q. + base_object: An empty pandas.Series or pandas.DataFrame + depending on q. Returns: A new Series or DataFrame depending on q. @@ -3208,15 +3218,15 @@ class DataFrame(object): return df.quantile(q=q, axis=axis, numeric_only=numeric_only, interpolation=interpolation) - axis = pd.DataFrame()._get_axis_number(axis) + axis = pandas.DataFrame()._get_axis_number(axis) - if isinstance(q, (pd.Series, np.ndarray, pd.Index, list)): + if isinstance(q, (pandas.Series, np.ndarray, pandas.Index, list)): - q_index = pd.Float64Index(q) + q_index = pandas.Float64Index(q) if axis == 0: new_partitions = _map_partitions( - lambda df: quantile_helper(df, pd.DataFrame()), + lambda df: quantile_helper(df, pandas.DataFrame()), self._col_partitions) # select only correct dtype columns @@ -3225,7 +3235,7 @@ class DataFrame(object): else: new_partitions = _map_partitions( - lambda df: quantile_helper(df, pd.DataFrame()), + lambda df: quantile_helper(df, pandas.DataFrame()), self._row_partitions) new_columns = self.index @@ -3237,7 +3247,7 @@ class DataFrame(object): # When q is a single float, we return a Series, so using # arithmetic_helper works well here. result = self._arithmetic_helper( - lambda df: quantile_helper(df, pd.Series()), axis) + lambda df: quantile_helper(df, pandas.Series()), axis) result.name = q return result @@ -3255,7 +3265,7 @@ class DataFrame(object): df = df.copy() df.columns = columns df.query(expr, inplace=True, **kwargs) - df.columns = pd.RangeIndex(0, len(df.columns)) + df.columns = pandas.RangeIndex(0, len(df.columns)) return df new_rows = _map_partitions(query_helper, @@ -3301,7 +3311,7 @@ class DataFrame(object): na_option=na_option, ascending=ascending, pct=pct) - axis = pd.DataFrame()._get_axis_number(axis) + axis = pandas.DataFrame()._get_axis_number(axis) if (axis == 1): new_cols = self.dtypes[self.dtypes.apply( @@ -3333,7 +3343,7 @@ class DataFrame(object): "To contribute to Pandas on Ray, please visit " "github.com/ray-project/ray.") - axis = pd.DataFrame()._get_axis_number(axis) if axis is not None \ + axis = pandas.DataFrame()._get_axis_number(axis) if axis is not None \ else 0 if axis == 0 and labels is not None: index = labels @@ -3409,7 +3419,7 @@ class DataFrame(object): # will use the results after. kwargs['inplace'] = True - df_to_rename = pd.DataFrame(index=self.index, columns=self.columns) + df_to_rename = pandas.DataFrame(index=self.index, columns=self.columns) df_to_rename.rename(**kwargs) if inplace: @@ -3499,9 +3509,10 @@ class DataFrame(object): new_obj = self.copy() def _maybe_casted_values(index, labels=None): - if isinstance(index, pd.PeriodIndex): + if isinstance(index, pandas.PeriodIndex): values = index.asobject.values - elif isinstance(index, pd.DatetimeIndex) and index.tz is not None: + elif isinstance(index, pandas.DatetimeIndex) \ + and index.tz is not None: values = index else: values = index.values @@ -3525,17 +3536,17 @@ class DataFrame(object): return values # We're building a new default index dataframe for use later. 
- new_index = pd.RangeIndex(len(self)) + new_index = pandas.RangeIndex(len(self)) if level is not None: if not isinstance(level, (tuple, list)): level = [level] level = [self.index._get_level_number(lev) for lev in level] - if isinstance(self.index, pd.MultiIndex): + if isinstance(self.index, pandas.MultiIndex): if len(level) < self.index.nlevels: new_index = self.index.droplevel(level) if not drop: - if isinstance(self.index, pd.MultiIndex): + if isinstance(self.index, pandas.MultiIndex): names = [n if n is not None else ('level_%d' % i) for (i, n) in enumerate(self.index.names)] to_insert = lzip(self.index.levels, self.index.labels) @@ -3550,7 +3561,7 @@ class DataFrame(object): else [self.index.name]) to_insert = ((self.index, None),) - multi_col = isinstance(self.columns, pd.MultiIndex) + multi_col = isinstance(self.columns, pandas.MultiIndex) for i, (lev, lab) in reversed(list(enumerate(to_insert))): if not (level is None or i in level): continue @@ -3649,7 +3660,7 @@ class DataFrame(object): A new Dataframe """ - axis = pd.DataFrame()._get_axis_number(axis) if axis is not None \ + axis = pandas.DataFrame()._get_axis_number(axis) if axis is not None \ else 0 if axis == 0: @@ -3661,7 +3672,7 @@ class DataFrame(object): # Index of the weights Series should correspond to the index of the # Dataframe in order to sample - if isinstance(weights, pd.Series): + if isinstance(weights, pandas.Series): weights = weights.reindex(self.axes[axis]) # If weights arg is a string, the weights used for sampling will @@ -3678,7 +3689,7 @@ class DataFrame(object): "weights when sampling from rows on " "a DataFrame") - weights = pd.Series(weights, dtype='float64') + weights = pandas.Series(weights, dtype='float64') if len(weights) != axis_length: raise ValueError("Weights and axis to be sampled must be of " @@ -3793,7 +3804,7 @@ class DataFrame(object): def select_dtypes(self, include=None, exclude=None): # Validates arguments for whether both include and exclude are None or # if they are disjoint. Also invalidates string dtypes. - pd.DataFrame().select_dtypes(include, exclude) + pandas.DataFrame().select_dtypes(include, exclude) if include and not is_list_like(include): include = [include] @@ -3810,8 +3821,8 @@ class DataFrame(object): include, exclude = map( lambda x: set(map(_get_dtype_from_object, x)), sel) - include_these = pd.Series(not bool(include), index=self.columns) - exclude_these = pd.Series(not bool(exclude), index=self.columns) + include_these = pandas.Series(not bool(include), index=self.columns) + exclude_these = pandas.Series(not bool(exclude), index=self.columns) def is_dtype_instance_mapper(column, dtype): return column, functools.partial(issubclass, dtype.type) @@ -3838,7 +3849,7 @@ class DataFrame(object): """Assign desired index to given axis. Args: - labels (pd.Index or list-like): The Index to assign. + labels (pandas.Index or list-like): The Index to assign. axis (string or int): The axis to reassign. inplace (bool): Whether to make these modifications inplace. 
@@ -3862,7 +3873,7 @@ class DataFrame(object): FutureWarning, stacklevel=2) inplace = True if inplace: - setattr(self, pd.DataFrame()._get_axis_name(axis), labels) + setattr(self, pandas.DataFrame()._get_axis_name(axis), labels) else: obj = self.copy() obj.set_axis(labels, axis=axis, inplace=True) @@ -3897,7 +3908,7 @@ class DataFrame(object): names = [] if append: names = [x for x in self.index.names] - if isinstance(self.index, pd.MultiIndex): + if isinstance(self.index, pandas.MultiIndex): for i in range(self.index.nlevels): arrays.append(self.index._get_level_values(i)) else: @@ -3905,7 +3916,7 @@ class DataFrame(object): to_remove = [] for col in keys: - if isinstance(col, pd.MultiIndex): + if isinstance(col, pandas.MultiIndex): # append all but the last column so we don't have to modify # the end of this loop for n in range(col.nlevels - 1): @@ -3913,13 +3924,13 @@ class DataFrame(object): level = col._get_level_values(col.nlevels - 1) names.extend(col.names) - elif isinstance(col, pd.Series): + elif isinstance(col, pandas.Series): level = col._values names.append(col.name) - elif isinstance(col, pd.Index): + elif isinstance(col, pandas.Index): level = col names.append(col.name) - elif isinstance(col, (list, np.ndarray, pd.Index)): + elif isinstance(col, (list, np.ndarray, pandas.Index)): level = col names.append(None) else: @@ -4011,7 +4022,7 @@ class DataFrame(object): return self.sort_values(by, axis=axis, ascending=ascending, inplace=inplace) - axis = pd.DataFrame()._get_axis_number(axis) + axis = pandas.DataFrame()._get_axis_number(axis) args = (axis, level, ascending, False, kind, na_position, sort_remaining) @@ -4024,7 +4035,7 @@ class DataFrame(object): result = df.sort_index(*args) df.reset_index(drop=True, inplace=True) - df.columns = pd.RangeIndex(len(df.columns)) + df.columns = pandas.RangeIndex(len(df.columns)) return result if axis == 0: @@ -4073,14 +4084,14 @@ class DataFrame(object): A sorted DataFrame. """ - axis = pd.DataFrame()._get_axis_number(axis) + axis = pandas.DataFrame()._get_axis_number(axis) if not is_list_like(by): by = [by] if axis == 0: broadcast_value_dict = {str(col): self[col] for col in by} - broadcast_values = pd.DataFrame(broadcast_value_dict) + broadcast_values = pandas.DataFrame(broadcast_value_dict) else: broadcast_value_list = [to_pandas(self[row::len(self.index)]) for row in by] @@ -4090,8 +4101,8 @@ class DataFrame(object): for row, idx in index_builder: row.index = [str(idx)] - broadcast_values = pd.concat([row for row, idx in index_builder], - copy=False) + broadcast_values = \ + pandas.concat([row for row, idx in index_builder], copy=False) # We are converting the by to string here so that we don't have a # collision with the RangeIndex on the inner frame. It is cheap and @@ -4119,8 +4130,8 @@ class DataFrame(object): broadcast_values.columns = df.columns names = broadcast_values.index - return pd.concat([df, broadcast_values], axis=axis ^ 1, - copy=False).sort_values(*args)\ + return pandas.concat([df, broadcast_values], axis=axis ^ 1, + copy=False).sort_values(*args) \ .drop(names, axis=axis ^ 1) if axis == 0: @@ -4202,7 +4213,7 @@ class DataFrame(object): Returns: A new DataFrame with the subtraciont applied. 
""" - return self._operator_helper(pd.DataFrame.sub, other, axis, level, + return self._operator_helper(pandas.DataFrame.sub, other, axis, level, fill_value) def subtract(self, other, axis='columns', level=None, fill_value=None): @@ -4315,13 +4326,13 @@ class DataFrame(object): return df.to_csv(**kwargs) idxs = [0] + np.cumsum(self._row_metadata._lengths).tolist() - idx_args = [self.index[idxs[i]:idxs[i+1]] + idx_args = [self.index[idxs[i]:idxs[i + 1]] for i in range(len(self._row_partitions))] csv_str_ids = _map_partitions( - get_csv_str, self._row_partitions, idx_args, - [columns_id] * len(self._row_partitions), - [header] + [False] * (len(self._row_partitions) - 1), - [remote_kwargs_id] * len(self._row_partitions)) + get_csv_str, self._row_partitions, idx_args, + [columns_id] * len(self._row_partitions), + [header] + [False] * (len(self._row_partitions) - 1), + [remote_kwargs_id] * len(self._row_partitions)) if path_or_buf is None: buf = io.StringIO() @@ -4540,8 +4551,8 @@ class DataFrame(object): Returns: A new DataFrame with the Divide applied. """ - return self._operator_helper(pd.DataFrame.truediv, other, axis, level, - fill_value) + return self._operator_helper(pandas.DataFrame.truediv, other, axis, + level, fill_value) def truncate(self, before=None, after=None, axis=None, copy=True): raise NotImplementedError( @@ -4640,14 +4651,15 @@ class DataFrame(object): inplace = validate_bool_kwarg(inplace, 'inplace') - if isinstance(other, pd.Series) and axis is None: + if isinstance(other, pandas.Series) and axis is None: raise ValueError("Must specify axis=0 or 1") if level is not None: raise NotImplementedError("Multilevel Index not yet supported on " "Pandas on Ray.") - axis = pd.DataFrame()._get_axis_number(axis) if axis is not None else 0 + axis = pandas.DataFrame()._get_axis_number(axis) if axis is not None \ + else 0 cond = cond(self) if callable(cond) else cond @@ -4673,11 +4685,11 @@ class DataFrame(object): # Series has to be treated specially because we're operating on row # partitions from here on. - elif isinstance(other, pd.Series): + elif isinstance(other, pandas.Series): if axis == 0: # Pandas determines which index to use based on axis. other = other.reindex(self.index) - other.index = pd.RangeIndex(len(other)) + other.index = pandas.RangeIndex(len(other)) # Since we're working on row partitions, we have to partition # the Series based on the partitioning of self (since both @@ -4688,12 +4700,13 @@ class DataFrame(object): other = other[length:] # Resetting the index here ensures that we apply each part # to the correct row within the partitions. 
- other.index = pd.RangeIndex(len(other)) + other.index = pandas.RangeIndex(len(other)) other = (obj for obj in other_builder) - new_partitions = [_where_helper.remote(k, v, next(other, - pd.Series()), + new_partitions = [_where_helper.remote(k, v, + next(other, + pandas.Series()), self.columns, cond.columns, None, *args) @@ -4737,7 +4750,7 @@ class DataFrame(object): key = com._apply_if_callable(key, self) # shortcut if we are an actual column - is_mi_columns = isinstance(self.columns, pd.MultiIndex) + is_mi_columns = isinstance(self.columns, pandas.MultiIndex) try: if key in self.columns and not is_mi_columns: return self._getitem_column(key) @@ -4749,7 +4762,7 @@ class DataFrame(object): if indexer is not None: return self._getitem_slice(indexer) - if isinstance(key, (pd.Series, np.ndarray, pd.Index, list)): + if isinstance(key, (pandas.Series, np.ndarray, pandas.Index, list)): return self._getitem_array(key) elif isinstance(key, DataFrame): raise NotImplementedError("To contribute to Pandas on Ray, please" @@ -4772,7 +4785,7 @@ class DataFrame(object): def _getitem_array(self, key): if com.is_bool_indexer(key): - if isinstance(key, pd.Series) and \ + if isinstance(key, pandas.Series) and \ not key.index.equals(self.index): warnings.warn("Boolean Series key will be reindexed to match " "DataFrame index.", UserWarning, stacklevel=3) @@ -4796,7 +4809,7 @@ class DataFrame(object): def get_columns_partition(df): result = df.__getitem__(indices_for_rows), - result.columns = pd.RangeIndex(0, len(result.columns)) + result.columns = pandas.RangeIndex(0, len(result.columns)) return result new_parts = [_deploy_func.remote( @@ -4811,7 +4824,7 @@ class DataFrame(object): def _getitem_indiv_col(self, key, part): loc = self._col_metadata[key] - if isinstance(loc, pd.Series): + if isinstance(loc, pandas.Series): index = loc[loc['partition'] == part] else: index = loc[loc['partition'] == part]['index_within_partition'] @@ -4959,22 +4972,22 @@ class DataFrame(object): df.__delitem__(col) # Reset the column index to conserve space - df.columns = pd.RangeIndex(0, len(df.columns)) + df.columns = pandas.RangeIndex(0, len(df.columns)) return df # This structure is used to get the correct index inside the partition. del_df = self._col_metadata[key] # We need to standardize between multiple and single occurrences in the - # columns. Putting single occurrences in a pd.DataFrame and transposing - # results in the same structure as multiple with 'loc'. - if isinstance(del_df, pd.Series): - del_df = pd.DataFrame(del_df).T + # columns. Putting single occurrences in a pandas.DataFrame and + # transposing results in the same structure as multiple with 'loc'. 
+ if isinstance(del_df, pandas.Series): + del_df = pandas.DataFrame(del_df).T - # Cast cols as pd.Series as duplicate columns mean result may be - # np.int64 or pd.Series + # Cast cols as pandas.Series as duplicate columns mean result may be + # np.int64 or pandas.Series col_parts_to_del = \ - pd.Series(del_df['partition'].copy()).unique() + pandas.Series(del_df['partition'].copy()).unique() self._col_metadata.drop(key) for i in col_parts_to_del: @@ -5259,7 +5272,7 @@ class DataFrame(object): if level is not None: raise NotImplementedError("Mutlilevel index not yet supported " "in Pandas on Ray") - axis = pd.DataFrame()._get_axis_number(axis) + axis = pandas.DataFrame()._get_axis_number(axis) new_column_index = self.columns.join(other.columns, how=how) new_index = self.index.join(other.index, how=how) @@ -5287,7 +5300,7 @@ class DataFrame(object): if level is not None: raise NotImplementedError("Multilevel index not yet supported " "in Pandas on Ray") - axis = pd.DataFrame()._get_axis_number(axis) + axis = pandas.DataFrame()._get_axis_number(axis) if is_list_like(other): new_index = self.index @@ -5342,16 +5355,16 @@ def _merge_columns(left_columns, right_columns, *args): Returns: The columns for the merge operation. """ - return pd.DataFrame(columns=left_columns, index=[0], dtype='uint8').merge( - pd.DataFrame(columns=right_columns, index=[0], dtype='uint8'), - *args).columns + return pandas.DataFrame(columns=left_columns, index=[0], dtype='uint8') \ + .merge(pandas.DataFrame(columns=right_columns, index=[0], + dtype='uint8'), *args).columns @ray.remote def _where_helper(left, cond, other, left_columns, cond_columns, other_columns, *args): - left = pd.concat(ray.get(left.tolist()), axis=1, copy=False) + left = pandas.concat(ray.get(left.tolist()), axis=1, copy=False) # We have to reset the index and columns here because we are coming # from blocks and the axes are set according to the blocks. We have # already correctly copartitioned everything, so there's no @@ -5359,12 +5372,12 @@ def _where_helper(left, cond, other, left_columns, cond_columns, left.reset_index(inplace=True, drop=True) left.columns = left_columns - cond = pd.concat(ray.get(cond.tolist()), axis=1, copy=False) + cond = pandas.concat(ray.get(cond.tolist()), axis=1, copy=False) cond.reset_index(inplace=True, drop=True) cond.columns = cond_columns if isinstance(other, np.ndarray): - other = pd.concat(ray.get(other.tolist()), axis=1, copy=False) + other = pandas.concat(ray.get(other.tolist()), axis=1, copy=False) other.reset_index(inplace=True, drop=True) other.columns = other_columns @@ -5374,7 +5387,7 @@ def _where_helper(left, cond, other, left_columns, cond_columns, @ray.remote def reindex_helper(old_index, new_index, axis, npartitions, method, fill_value, limit, tolerance, *df): - df = pd.concat(df, axis=axis ^ 1, copy=False) + df = pandas.concat(df, axis=axis ^ 1, copy=False) if axis == 1: df.index = old_index else: @@ -5388,8 +5401,8 @@ def reindex_helper(old_index, new_index, axis, npartitions, method, fill_value, @ray.remote def _equals_helper(left, right): - right = pd.concat(ray.get(right.tolist()), axis=1, copy=False) - left = pd.concat(ray.get(left.tolist()), axis=1, copy=False) + right = pandas.concat(ray.get(right.tolist()), axis=1, copy=False) + left = pandas.concat(ray.get(left.tolist()), axis=1, copy=False) # Since we know that the index and columns match, we can just check the # values. 
We can't use np.array_equal here because it doesn't recognize # np.nan as equal to another np.nan diff --git a/python/ray/dataframe/groupby.py b/python/ray/dataframe/groupby.py index b2c2b9fdc..2a3100075 100644 --- a/python/ray/dataframe/groupby.py +++ b/python/ray/dataframe/groupby.py @@ -2,7 +2,7 @@ from __future__ import absolute_import from __future__ import division from __future__ import print_function -import pandas as pd +import pandas import numpy as np import pandas.core.groupby from pandas.core.dtypes.common import is_list_like @@ -12,6 +12,7 @@ import ray from .utils import _inherit_docstrings, _reindex_helper from .concat import concat +from .index_metadata import _IndexMetadata @_inherit_docstrings(pandas.core.groupby.DataFrameGroupBy, @@ -31,12 +32,13 @@ class DataFrameGroupBy(object): if axis == 0: partitions = [column for column in df._block_partitions.T] - self._index_grouped = pd.Series(self._index, index=self._index)\ + self._index_grouped = \ + pandas.Series(self._index, index=self._index) \ .groupby(by=by, sort=sort) else: partitions = [row for row in df._block_partitions] self._index_grouped = \ - pd.Series(self._columns, index=self._columns) \ + pandas.Series(self._columns, index=self._columns) \ .groupby(by=by, sort=sort) self._keys_and_values = [(k, v) @@ -127,7 +129,7 @@ class DataFrameGroupBy(object): @property def groups(self): - return {k: pd.Index(v) for k, v in self._keys_and_values} + return {k: pandas.Index(v) for k, v in self._keys_and_values} def min(self, **kwargs): return self._apply_agg_function(lambda df: df.min(axis=self._axis, @@ -194,7 +196,7 @@ class DataFrameGroupBy(object): result = [func(v) for k, v in self._iter] if self._axis == 0: - if isinstance(result[0], pd.Series): + if isinstance(result[0], pandas.Series): # Applied an aggregation function new_df = concat(result, axis=1).T new_df.columns = self._columns @@ -208,8 +210,11 @@ class DataFrameGroupBy(object): num_return_vals=len(new_df._block_partitions)) for block in new_df._block_partitions.T]).T new_df.index = self._index + new_df._row_metadata = \ + _IndexMetadata(new_df._block_partitions[:, 0], + index=new_df.index, axis=0) else: - if isinstance(result[0], pd.Series): + if isinstance(result[0], pandas.Series): # Applied an aggregation function new_df = concat(result, axis=1) new_df.columns = [k for k, v in self._iter] @@ -223,6 +228,9 @@ class DataFrameGroupBy(object): num_return_vals=new_df._block_partitions.shape[1]) for block in new_df._block_partitions]) new_df.columns = self._columns + new_df._col_metadata = \ + _IndexMetadata(new_df._block_partitions[0, :], + index=new_df.columns, axis=1) return new_df @property @@ -392,6 +400,9 @@ class DataFrameGroupBy(object): num_return_vals=len(new_df._block_partitions)) for block in new_df._block_partitions.T]).T new_df.index = sorted_index + new_df._row_metadata = \ + _IndexMetadata(new_df._block_partitions[:, 0], + index=new_df.index, axis=0) return new_df @@ -447,6 +458,9 @@ class DataFrameGroupBy(object): num_return_vals=len(new_df._block_partitions)) for block in new_df._block_partitions.T]).T new_df.index = sorted_index + new_df._row_metadata = \ + _IndexMetadata(new_df._block_partitions[:, 0], + index=new_df.index, axis=0) return new_df @@ -516,6 +530,9 @@ class DataFrameGroupBy(object): num_return_vals=len(new_df._block_partitions)) for block in new_df._block_partitions.T]).T new_df.index = self._index + new_df._row_metadata = \ + _IndexMetadata(new_df._block_partitions[:, 0], + index=new_df.index, axis=0) else: 
new_df._block_partitions = np.array([_reindex_helper._submit( args=tuple([new_df.columns, self._columns, 0, @@ -524,6 +541,9 @@ class DataFrameGroupBy(object): num_return_vals=new_df._block_partitions.shape[1]) for block in new_df._block_partitions]) new_df.columns = self._columns + new_df._col_metadata = \ + _IndexMetadata(new_df._block_partitions[0, :], + index=new_df.columns, axis=1) return new_df @@ -531,7 +551,7 @@ class DataFrameGroupBy(object): @ray.remote def groupby(by, axis, level, as_index, sort, group_keys, squeeze, *df): - df = pd.concat(df, axis=axis) + df = pandas.concat(df, axis=axis) return [v for k, v in df.groupby(by=by, axis=axis, diff --git a/python/ray/dataframe/index_metadata.py b/python/ray/dataframe/index_metadata.py index 6777a6b57..358347ecd 100644 --- a/python/ray/dataframe/index_metadata.py +++ b/python/ray/dataframe/index_metadata.py @@ -1,11 +1,12 @@ -import pandas as pd +import pandas import numpy as np import ray from .utils import ( _build_row_lengths, _build_col_widths, - _build_coord_df) + _build_coord_df, + _check_empty) from pandas.core.indexing import convert_to_index_sliceable @@ -32,12 +33,12 @@ class _IndexMetadata(object): Args: dfs ([ObjectID]): ObjectIDs of dataframe partitions - index (pd.Index): Index of the Ray DataFrame. + index (pandas.Index): Index of the Ray DataFrame. axis: Axis of partition (0=row partitions, 1=column partitions) Returns: - A IndexMetadata backed by the specified pd.Index, partitioned off - specified partitions + A IndexMetadata backed by the specified pandas.Index, partitioned + off specified partitions """ assert (lengths_oid is None) == (coord_df_oid is None), \ "Must pass both or neither of lengths_oid and coord_df_oid" @@ -48,6 +49,9 @@ class _IndexMetadata(object): else: lengths_oid = _build_col_widths.remote(dfs) coord_df_oid = _build_coord_df.remote(lengths_oid, index) + self._empty = _check_empty.remote(dfs) + else: + self._empty = True self._lengths = lengths_oid self._coord_df = coord_df_oid @@ -115,7 +119,7 @@ class _IndexMetadata(object): This design is more straightforward than caching indexes on setting the coord_df to an OID due to the possibility of an OID-to-OID change. """ - new_index = pd.DataFrame(index=new_index).index + new_index = pandas.DataFrame(index=new_index).index assert len(new_index) == len(self) self._index_cache = new_index @@ -138,7 +142,7 @@ class _IndexMetadata(object): The Index object in _index_cache. """ if self._index_cache_validator is None: - self._index_cache_validator = pd.RangeIndex(len(self)) + self._index_cache_validator = pandas.RangeIndex(len(self)) elif isinstance(self._index_cache_validator, ray.ObjectID): self._index_cache_validator = ray.get(self._index_cache_validator) @@ -157,6 +161,16 @@ class _IndexMetadata(object): # cache to accept ObjectIDs and ray.get them when needed. _index_cache = property(_get_index_cache, _set_index_cache) + def _get_empty(self): + if isinstance(self._empty_cache, ray.ObjectID): + self._empty_cache = ray.get(self._empty_cache) + return self._empty_cache + + def _set_empty(self, empty): + self._empty_cache = empty + + _empty = property(_get_empty, _set_empty) + def coords_of(self, key): """Returns the coordinates (partition, index_within_partition) of the provided key in the index. Can be called on its own or implicitly @@ -170,9 +184,9 @@ class _IndexMetadata(object): Returns: Pandas object with the keys specified. 
If key is a single object
-        it will be a pd.Series with items `partition` and
+        it will be a pandas.Series with items `partition` and
         `index_within_partition`, and if key is a slice or if the key is
-        duplicate it will be a pd.DataFrame with said items as columns.
+        duplicate it will be a pandas.DataFrame with said items as columns.
         """
         return self._coord_df.loc[key]

@@ -191,7 +205,7 @@
                 'index_within_partition']

     def __len__(self):
-        return sum(self._lengths)
+        return int(sum(self._lengths))

     def reset_partition_coords(self, partitions=None):
         partitions = np.array(partitions)
@@ -200,7 +214,7 @@
             partition_mask = (self._coord_df['partition'] == partition)
             # Since we are replacing columns with RangeIndex inside the
             # partition, we have to make sure that our reference to it is
-            # updated as well.
+            # updated as well.
             try:
                 self._coord_df.loc[partition_mask,
                                    'index_within_partition'] = np.arange(
@@ -263,7 +277,7 @@
         # TODO: Determine if there's a better way to do a row-index insert in
         # pandas, because this is very annoying/unsure of efficiency
         # Create new coord entry to insert
-        coord_to_insert = pd.DataFrame(
+        coord_to_insert = pandas.DataFrame(
             {'partition': partition,
              'index_within_partition': index_within_partition},
             index=[key])

@@ -329,9 +343,9 @@
        Returns:
            Pandas object with the keys specified. If key is a single object
-            it will be a pd.Series with items `partition` and
+            it will be a pandas.Series with items `partition` and
            `index_within_partition`, and if key is a slice or if the key is
-            duplicate it will be a pd.DataFrame with said items as columns.
+            duplicate it will be a pandas.DataFrame with said items as columns.
        """
        return self.coords_of(key)

@@ -355,26 +369,37 @@
        """
        dropped = self.coords_of(labels)

-        # Update first lengths to prevent possible length inconsistencies
-        if isinstance(dropped, pd.DataFrame):
+        # Update first lengths to prevent possible length inconsistencies
+        if isinstance(dropped, pandas.DataFrame):
            try:
                drop_per_part = dropped.groupby(["partition"]).size()\
-                    .reindex(index=pd.RangeIndex(len(self._lengths)),
+                    .reindex(index=pandas.RangeIndex(len(self._lengths)),
                             fill_value=0)
            except ValueError:
                # Copy the arrow sealed dataframe so we can mutate it.
dropped = dropped.copy() drop_per_part = dropped.groupby(["partition"]).size()\ - .reindex(index=pd.RangeIndex(len(self._lengths)), + .reindex(index=pandas.RangeIndex(len(self._lengths)), fill_value=0) - elif isinstance(dropped, pd.Series): + elif isinstance(dropped, pandas.Series): drop_per_part = np.zeros_like(self._lengths) drop_per_part[dropped["partition"]] = 1 else: raise AssertionError("Unrecognized result from `coords_of`") - self._lengths = self._lengths - drop_per_part - self._coord_df = self._coord_df.drop(labels, errors=errors) + self._lengths = self._lengths - np.array(drop_per_part) + + new_coord_df = self._coord_df.drop(labels, errors=errors) + + num_dropped = 0 + for i, length in enumerate(self._lengths): + if length == 0: + num_dropped += 1 + if num_dropped > 0: + new_coord_df['partition'][new_coord_df['partition'] == i] \ + -= num_dropped + + self._coord_df = new_coord_df return dropped def rename_index(self, mapper): diff --git a/python/ray/dataframe/indexing.py b/python/ray/dataframe/indexing.py index 2ae1809fd..5f99ce877 100644 --- a/python/ray/dataframe/indexing.py +++ b/python/ray/dataframe/indexing.py @@ -13,7 +13,7 @@ _LocIndexer and _iLocIndexer is responsible for indexer specific logic and An illustration is available at https://github.com/ray-project/ray/pull/1955#issuecomment-386781826 """ -import pandas as pd +import pandas import numpy as np import ray from warnings import warn @@ -96,7 +96,7 @@ def _is_enlargement(locator, coord_df): """ if is_list_like(locator) and not is_slice( locator) and len(locator) > 0 and not is_boolean_array(locator): - n_diff_elems = len(pd.Index(locator).difference(coord_df.index)) + n_diff_elems = len(pandas.Index(locator).difference(coord_df.index)) is_enlargement_boolean = n_diff_elems > 0 return is_enlargement_boolean return False @@ -140,8 +140,8 @@ class _Location_Indexer_Base(): def __getitem__(self, row_lookup, col_lookup, ndim): """ Args: - row_lookup: A pd dataframe, a partial view from row_coord_df - col_lookup: A pd dataframe, a partial view from col_coord_df + row_lookup: A pandas dataframe, a partial view from row_coord_df + col_lookup: A pandas dataframe, a partial view from col_coord_df ndim: the dimension of returned data """ if ndim == 2: @@ -152,7 +152,7 @@ class _Location_Indexer_Base(): result = ray.get(_blocks_to_col.remote(*extracted)).squeeze() if is_scalar(result): - result = pd.Series(result) + result = pandas.Series(result) scaler_axis = row_lookup if len(row_lookup) == 1 else col_lookup series_name = scaler_axis.iloc[0].name @@ -213,8 +213,8 @@ class _Location_Indexer_Base(): def __setitem__(self, row_lookup, col_lookup, item): """ Args: - row_lookup: A pd dataframe, a partial view from row_coord_df - col_lookup: A pd dataframe, a partial view from col_coord_df + row_lookup: A pandas dataframe, a partial view from row_coord_df + col_lookup: A pandas dataframe, a partial view from col_coord_df item: The new item needs to be set. It can be any shape that's broadcastable to the product of the lookup tables. """ @@ -348,14 +348,14 @@ class _Loc_Indexer(_Location_Indexer_Base): [self.block_oids, nan_blks], axis=0 if row_based_bool else 1) # 3. 
diff --git a/python/ray/dataframe/io.py b/python/ray/dataframe/io.py
index 5ce1edde6..8b1bdbb4e 100644
--- a/python/ray/dataframe/io.py
+++ b/python/ray/dataframe/io.py
@@ -7,9 +7,9 @@ from io import BytesIO
 import os
 import re
 import warnings
+import pandas

 from pyarrow.parquet import ParquetFile
-import pandas as pd
 from pandas.io.common import _infer_compression  # don't depend on internal API
@@ -62,23 +62,23 @@ def _read_parquet_row_group(path, columns, row_group_id, kwargs={}):

 @ray.remote
-def _split_df(pd_df, chunksize):
-    """Split a pd_df into partitions.
+def _split_df(pandas_df, chunksize):
+    """Split a pandas_df into partitions.

     Returns:
         remote_df_ids ([ObjectID])
     """
     dataframes = []

-    while len(pd_df) > chunksize:
-        t_df = pd_df[:chunksize]
+    while len(pandas_df) > chunksize:
+        t_df = pandas_df[:chunksize]
         t_df.reset_index(drop=True)
         top = ray.put(t_df)
         dataframes.append(top)
-        pd_df = pd_df[chunksize:]
+        pandas_df = pandas_df[chunksize:]
     else:
-        pd_df = pd_df.reset_index(drop=True)
-        dataframes.append(ray.put(pd_df))
+        pandas_df = pandas_df.reset_index(drop=True)
+        dataframes.append(ray.put(pandas_df))

     return dataframes
@@ -122,7 +122,7 @@ def _get_firstline(file_path):

 def _infer_column(first_line, kwargs={}):
-    return pd.read_csv(BytesIO(first_line), **kwargs).columns
+    return pandas.read_csv(BytesIO(first_line), **kwargs).columns

 @ray.remote
@@ -133,11 +133,11 @@ def _read_csv_with_offset(fn, start, end, kwargs={}, header=b''):
     bio.seek(start)
     to_read = header + bio.read(end - start)
     bio.close()
-    pd_df = pd.read_csv(BytesIO(to_read), **kwargs)
-    index = pd_df.index
+    pandas_df = pandas.read_csv(BytesIO(to_read), **kwargs)
+    index = pandas_df.index
     # Partitions must have RangeIndex
-    pd_df.index = pd.RangeIndex(0, len(pd_df))
-    return pd_df, index
+    pandas_df.index = pandas.RangeIndex(0, len(pandas_df))
+    return pandas_df, index

 @ray.remote
@@ -271,11 +271,11 @@ def read_csv(filepath_or_buffer,
         warnings.warn("Defaulting to Pandas implementation",
                       PendingDeprecationWarning)

-        pd_obj = pd.read_csv(filepath_or_buffer, **kwargs)
-        if isinstance(pd_obj, pd.DataFrame):
-            return from_pandas(pd_obj, get_npartitions())
+        pandas_obj = pandas.read_csv(filepath_or_buffer, **kwargs)
+        if isinstance(pandas_obj, pandas.DataFrame):
+            return from_pandas(pandas_obj, get_npartitions())

-        return pd_obj
+        return pandas_obj

     filepath = filepath_or_buffer
@@ -332,10 +332,11 @@ def read_json(path_or_buf=None,
     warnings.warn("Defaulting to Pandas implementation",
                   PendingDeprecationWarning)

-    port_frame = pd.read_json(path_or_buf, orient, typ, dtype,
-                              convert_axes, convert_dates, keep_default_dates,
-                              numpy, precise_float, date_unit, encoding,
-                              lines, chunksize, compression)
+    port_frame = pandas.read_json(path_or_buf, orient, typ, dtype,
+                                  convert_axes, convert_dates,
+                                  keep_default_dates, numpy, precise_float,
+                                  date_unit, encoding, lines, chunksize,
+                                  compression)
     ray_frame = from_pandas(port_frame, get_npartitions())
     return ray_frame
@@ -360,10 +361,10 @@ def read_html(io,
     warnings.warn("Defaulting to Pandas implementation",
                   PendingDeprecationWarning)

-    port_frame = pd.read_html(io, match, flavor, header, index_col,
-                              skiprows, attrs, parse_dates, tupleize_cols,
-                              thousands, encoding, decimal, converters,
-                              na_values, keep_default_na)
+    port_frame = pandas.read_html(io, match, flavor, header, index_col,
+                                  skiprows, attrs, parse_dates, tupleize_cols,
+                                  thousands, encoding, decimal, converters,
+                                  na_values, keep_default_na)
     ray_frame = from_pandas(port_frame[0], get_npartitions())
     return ray_frame
@@ -374,7 +375,7 @@ def read_clipboard(sep=r'\s+'):
     warnings.warn("Defaulting to Pandas implementation",
                   PendingDeprecationWarning)

-    port_frame = pd.read_clipboard(sep)
+    port_frame = pandas.read_clipboard(sep)
     ray_frame = from_pandas(port_frame, get_npartitions())
     return ray_frame
@@ -403,11 +404,11 @@ def read_excel(io,
     warnings.warn("Defaulting to Pandas implementation",
                   PendingDeprecationWarning)

-    port_frame = pd.read_excel(io, sheet_name, header, skiprows, skip_footer,
-                               index_col, names, usecols, parse_dates,
-                               date_parser, na_values, thousands,
-                               convert_float, converters, dtype, true_values,
-                               false_values, engine, squeeze)
+    port_frame = pandas.read_excel(io, sheet_name, header, skiprows,
+                                   skip_footer, index_col, names, usecols,
+                                   parse_dates, date_parser, na_values,
+                                   thousands, convert_float, converters, dtype,
+                                   true_values, false_values, engine, squeeze)
     ray_frame = from_pandas(port_frame, get_npartitions())
     return ray_frame
@@ -420,7 +421,7 @@ def read_hdf(path_or_buf,
     warnings.warn("Defaulting to Pandas implementation",
                   PendingDeprecationWarning)

-    port_frame = pd.read_hdf(path_or_buf, key, mode)
+    port_frame = pandas.read_hdf(path_or_buf, key, mode)
     ray_frame = from_pandas(port_frame, get_npartitions())
     return ray_frame
@@ -432,7 +433,7 @@ def read_feather(path,
     warnings.warn("Defaulting to Pandas implementation",
                   PendingDeprecationWarning)

-    port_frame = pd.read_feather(path)
+    port_frame = pandas.read_feather(path)
     ray_frame = from_pandas(port_frame, get_npartitions())
     return ray_frame
@@ -445,7 +446,7 @@ def read_msgpack(path_or_buf,
     warnings.warn("Defaulting to Pandas implementation",
                   PendingDeprecationWarning)

-    port_frame = pd.read_msgpack(path_or_buf, encoding, iterator)
+    port_frame = pandas.read_msgpack(path_or_buf, encoding, iterator)
     ray_frame = from_pandas(port_frame, get_npartitions())
     return ray_frame
@@ -466,10 +467,10 @@ def read_stata(filepath_or_buffer,
     warnings.warn("Defaulting to Pandas implementation",
                   PendingDeprecationWarning)

-    port_frame = pd.read_stata(filepath_or_buffer, convert_dates,
-                               convert_categoricals, encoding, index_col,
-                               convert_missing, preserve_dtypes, columns,
-                               order_categoricals, chunksize, iterator)
+    port_frame = pandas.read_stata(filepath_or_buffer, convert_dates,
+                                   convert_categoricals, encoding, index_col,
+                                   convert_missing, preserve_dtypes, columns,
+                                   order_categoricals, chunksize, iterator)
     ray_frame = from_pandas(port_frame, get_npartitions())
     return ray_frame
@@ -485,8 +486,8 @@ def read_sas(filepath_or_buffer,
     warnings.warn("Defaulting to Pandas implementation",
                   PendingDeprecationWarning)

-    port_frame = pd.read_sas(filepath_or_buffer, format, index, encoding,
-                             chunksize, iterator)
+    port_frame = pandas.read_sas(filepath_or_buffer, format, index, encoding,
+                                 chunksize, iterator)
     ray_frame = from_pandas(port_frame, get_npartitions())
     return ray_frame
@@ -498,7 +499,7 @@ def read_pickle(path,
     warnings.warn("Defaulting to Pandas implementation",
                   PendingDeprecationWarning)

-    port_frame = pd.read_pickle(path, compression)
+    port_frame = pandas.read_pickle(path, compression)
     ray_frame = from_pandas(port_frame, get_npartitions())
     return ray_frame
@@ -516,8 +517,8 @@ def read_sql(sql,
     warnings.warn("Defaulting to Pandas implementation",
                   PendingDeprecationWarning)

-    port_frame = pd.read_sql(sql, con, index_col, coerce_float, params,
-                             parse_dates, columns, chunksize)
+    port_frame = pandas.read_sql(sql, con, index_col, coerce_float, params,
+                                 parse_dates, columns, chunksize)
     ray_frame = from_pandas(port_frame, get_npartitions())
     return ray_frame
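The io.py renames above leave `_read_csv_with_offset` logically unchanged; its trick, re-prepending the header bytes so each byte-range partition parses with the original column names, can be sanity-checked without Ray. A minimal sketch (the sample bytes and offsets below are made up for illustration):

    import pandas
    from io import BytesIO

    data = b"col_a,col_b\n1,2\n3,4\n5,6\n"
    header = data[:data.index(b"\n") + 1]  # b"col_a,col_b\n"
    start, end = len(header), len(data)    # byte range of one "partition"

    # Seek into the raw bytes, then glue the header back on before parsing.
    bio = BytesIO(data)
    bio.seek(start)
    to_read = header + bio.read(end - start)
    bio.close()

    pandas_df = pandas.read_csv(BytesIO(to_read))
    # Partitions must have a RangeIndex, as the function above notes.
    pandas_df.index = pandas.RangeIndex(0, len(pandas_df))
    print(pandas_df)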
diff --git a/python/ray/dataframe/pandas_code_gen.py b/python/ray/dataframe/pandas_code_gen.py
index b5969281c..8a122e530 100644
--- a/python/ray/dataframe/pandas_code_gen.py
+++ b/python/ray/dataframe/pandas_code_gen.py
@@ -5,17 +5,17 @@ from __future__ import print_function
 import inspect

-def code_gen(pd_obj, ray_obj, path):
+def code_gen(pandas_obj, ray_obj, path):
     """Generate code skeleton for methods not in Ray

     Args:
-        pd_obj: The pandas object to generate code from.
+        pandas_obj: The pandas object to generate code from.
         ray_obj: The ray object to diff against.
         path: Path to output the file to.
     """

     with open(path, "w") as outfile:
-        funcs = pandas_ray_diff(pd_obj, ray_obj)
+        funcs = pandas_ray_diff(pandas_obj, ray_obj)

         for func in funcs:
             if func[0] == "_" and func[1] != "_":
@@ -24,9 +24,10 @@ def code_gen(pd_obj, ray_obj, path):
                 # let's not mess with these
                 continue
             try:
-                outfile.write("\ndef " + func +
-                              str(inspect.signature(getattr(pd_obj, func))) +
-                              ":\n")
+                outfile.write(
+                    "\ndef " + func +
+                    str(inspect.signature(getattr(pandas_obj, func))) + ":\n")
+
             except TypeError:
                 outfile.write("\n@property")
                 outfile.write("\ndef " + func + "(self):\n")
@@ -73,21 +74,21 @@ def code_gen_test(ray_obj, path, name):
         outfile.write(")\n")

-def pandas_ray_diff(pd_obj, ray_obj):
+def pandas_ray_diff(pandas_obj, ray_obj):
     """Gets the diff of the methods in the Pandas and Ray objects.

     Args:
-        pd_obj: The Pandas object to diff.
+        pandas_obj: The Pandas object to diff.
         ray_obj: The Ray object to diff.

     Returns:
         A list of method names that are different between the two.
     """
-    pd_funcs = dir(pd_obj)
+    pandas_funcs = dir(pandas_obj)
     ray_funcs = dir(ray_obj)

-    pd_funcs = set(filter(lambda f: f[0] != "_" or f[1] == "_",
-                          pd_funcs))
+    pandas_funcs = set(filter(lambda f: f[0] != "_" or f[1] == "_",
+                              pandas_funcs))

-    diff = [x for x in pd_funcs if x not in set(ray_funcs)]
+    diff = [x for x in pandas_funcs if x not in set(ray_funcs)]
     return diff
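`pandas_ray_diff` is a development helper; its core is just a `dir()` set difference with a visibility filter. A rough standalone equivalent (the `Stub` class is hypothetical, and the exact output depends on the installed pandas version):

    import pandas

    class Stub(object):
        def head(self, n=5):
            pass

    # Public and dunder attributes, same filter as pandas_ray_diff.
    pandas_funcs = set(f for f in dir(pandas.DataFrame)
                       if f[0] != "_" or f[1] == "_")
    ray_funcs = set(dir(Stub))

    # Methods pandas.DataFrame has that Stub lacks ('head' is excluded).
    missing = sorted(x for x in pandas_funcs if x not in ray_funcs)
    print(len(missing), missing[:5])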
""" - pd_funcs = dir(pd_obj) + pandas_funcs = dir(pandas_obj) ray_funcs = dir(ray_obj) - pd_funcs = set(filter(lambda f: f[0] != "_" or f[1] == "_", - pd_funcs)) + pandas_funcs = set(filter(lambda f: f[0] != "_" or f[1] == "_", + pandas_funcs)) - diff = [x for x in pd_funcs if x not in set(ray_funcs)] + diff = [x for x in pandas_funcs if x not in set(ray_funcs)] return diff diff --git a/python/ray/dataframe/series.py b/python/ray/dataframe/series.py index 0d50970ea..aa102363d 100644 --- a/python/ray/dataframe/series.py +++ b/python/ray/dataframe/series.py @@ -3,7 +3,7 @@ from __future__ import division from __future__ import print_function import numpy as np -import pandas as pd +import pandas from .utils import _inherit_docstrings @@ -14,7 +14,8 @@ def na_op(): raise NotImplementedError("Not Yet implemented.") -@_inherit_docstrings(pd.Series, excluded=[pd.Series, pd.Series.__init__]) +@_inherit_docstrings(pandas.Series, excluded=[pandas.Series, + pandas.Series.__init__]) class Series(object): def __init__(self, series_oids): @@ -832,7 +833,7 @@ class Series(object): def unstack(self, level=-1, fill_value=None): raise NotImplementedError("Not Yet implemented.") - def update(self, other): + def upandasate(self, other): raise NotImplementedError("Not Yet implemented.") def valid(self, inplace=False, **kwargs): diff --git a/python/ray/dataframe/test/test_series.py b/python/ray/dataframe/test/test_series.py index ba8b50061..bf26e32e8 100644 --- a/python/ray/dataframe/test/test_series.py +++ b/python/ray/dataframe/test/test_series.py @@ -3,12 +3,12 @@ from __future__ import division from __future__ import print_function import pytest -import ray.dataframe as rdf +import ray.dataframe as pd @pytest.fixture def create_test_series(): - return rdf.Series(None) + return pd.Series(None) def test_T(): diff --git a/python/ray/dataframe/utils.py b/python/ray/dataframe/utils.py index 466061c56..4f4ea0b64 100644 --- a/python/ray/dataframe/utils.py +++ b/python/ray/dataframe/utils.py @@ -3,7 +3,7 @@ from __future__ import division from __future__ import print_function import collections -import pandas as pd +import pandas import numpy as np import ray @@ -119,14 +119,14 @@ def _get_nan_block_id(n_row=1, n_col=1, transpose=False): shape = (n_row, n_col) if shape not in _NAN_BLOCKS: arr = np.tile(np.array(np.NaN), shape) - _NAN_BLOCKS[shape] = ray.put(pd.DataFrame(data=arr)) + _NAN_BLOCKS[shape] = ray.put(pandas.DataFrame(data=arr)) return _NAN_BLOCKS[shape] def _get_lengths(df): """Gets the length of the dataframe. Args: - df: A remote pd.DataFrame object. + df: A remote pandas.DataFrame object. Returns: Returns an integer length of the dataframe object. If the attempt fails, returns 0 as the length. @@ -142,7 +142,7 @@ def _get_lengths(df): def _get_widths(df): """Gets the width (number of columns) of the dataframe. Args: - df: A remote pd.DataFrame object. + df: A remote pandas.DataFrame object. Returns: Returns an integer width of the dataframe object. If the attempt fails, returns 0 as the length. @@ -155,6 +155,11 @@ def _get_widths(df): return 0 +def _get_empty(df): + """Return True if the DataFrame is empty""" + return df.empty + + def _partition_pandas_dataframe(df, num_partitions=None, row_chunksize=None): """Partitions a Pandas DataFrame object. 
    Args:
@@ -178,10 +183,10 @@ def _partition_pandas_dataframe(df, num_partitions=None, row_chunksize=None):
     row_partitions = []
     while len(temp_df) > row_chunksize:
         t_df = temp_df[:row_chunksize]
-        # reset_index here because we want a pd.RangeIndex
+        # reset_index here because we want a pandas.RangeIndex
         # within the partitions. It is smaller and sometimes faster.
         t_df.reset_index(drop=True, inplace=True)
-        t_df.columns = pd.RangeIndex(0, len(t_df.columns))
+        t_df.columns = pandas.RangeIndex(0, len(t_df.columns))
         top = ray.put(t_df)
         row_partitions.append(top)
         temp_df = temp_df[row_chunksize:]
@@ -190,7 +195,7 @@ def _partition_pandas_dataframe(df, num_partitions=None, row_chunksize=None):
         # This call is necessary to prevent modifying original df
         temp_df = temp_df[:]
         temp_df.reset_index(drop=True, inplace=True)
-        temp_df.columns = pd.RangeIndex(0, len(temp_df.columns))
+        temp_df.columns = pandas.RangeIndex(0, len(temp_df.columns))
         row_partitions.append(ray.put(temp_df))

     return row_partitions
@@ -223,10 +228,10 @@ def to_pandas(df):
     Returns:
         A new pandas DataFrame.
     """
-    pd_df = pd.concat(ray.get(df._row_partitions), copy=False)
-    pd_df.index = df.index
-    pd_df.columns = df.columns
-    return pd_df
+    pandas_df = pandas.concat(ray.get(df._row_partitions), copy=False)
+    pandas_df.index = df.index
+    pandas_df.columns = df.columns
+    return pandas_df

 @ray.remote
@@ -342,17 +347,27 @@ def _build_row_lengths(df_row):

 @ray.remote
 def _build_coord_df(lengths, index):
     """Build the coordinate dataframe over all partitions."""
-    coords = np.vstack([np.column_stack((np.full(l, i), np.arange(l)))
-                        for i, l in enumerate(lengths)])
-
+    filtered_lengths = [x for x in lengths if x > 0]
+    coords = None
+    if len(filtered_lengths) > 0:
+        coords = np.vstack([np.column_stack((np.full(l, i), np.arange(l)))
+                            for i, l in enumerate(filtered_lengths)])
     col_names = ("partition", "index_within_partition")
-    return pd.DataFrame(coords, index=index, columns=col_names)
+    return pandas.DataFrame(coords, index=index, columns=col_names)
+
+
+@ray.remote
+def _check_empty(dfs):
+    """Check if all partitions are empty"""
+    return all(ray.get([_deploy_func.remote(_get_empty, d) for d in dfs]))

 def _create_block_partitions(partitions, axis=0, length=None):

     if length is not None and length != 0 and get_npartitions() > length:
         npartitions = length
+    elif length == 0:
+        npartitions = 1
     else:
         npartitions = get_npartitions()
@@ -385,8 +400,8 @@ def create_blocks_helper(df, npartitions, axis):
         if df.shape[axis ^ 1] % npartitions == 0 \
         else df.shape[axis ^ 1] // npartitions + 1

-    # if not isinstance(df.columns, pd.RangeIndex):
-    #     df.columns = pd.RangeIndex(0, len(df.columns))
+    # if not isinstance(df.columns, pandas.RangeIndex):
+    #     df.columns = pandas.RangeIndex(0, len(df.columns))

     blocks = [df.iloc[:, i * block_size: (i + 1) * block_size]
               if axis == 0
@@ -394,7 +409,7 @@ def create_blocks_helper(df, npartitions, axis):
               for i in range(npartitions)]

     for block in blocks:
-        block.columns = pd.RangeIndex(0, len(block.columns))
+        block.columns = pandas.RangeIndex(0, len(block.columns))
         block.reset_index(inplace=True, drop=True)

     return blocks
@@ -403,20 +418,20 @@ def create_blocks_helper(df, npartitions, axis):
 @ray.remote
 def _blocks_to_col(*partition):
     if len(partition):
-        return pd.concat(partition, axis=0, copy=False)\
+        return pandas.concat(partition, axis=0, copy=False)\
             .reset_index(drop=True)
     else:
-        return pd.Series()
+        return pandas.Series()
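Context for the `_blocks_to_col` hunk above and the `_blocks_to_row` hunk below: stripped of `@ray.remote`, both reduce to `pandas.concat` plus index normalization. A small local sketch with toy blocks (hypothetical data):

    import pandas

    block_a = pandas.DataFrame({0: [1, 2], 1: [3, 4]})
    block_b = pandas.DataFrame({0: [5], 1: [6]})
    block_c = pandas.DataFrame({0: [7, 8]})

    # Column partition: stack blocks vertically, restore a fresh RangeIndex.
    col_part = pandas.concat([block_a, block_b], axis=0,
                             copy=False).reset_index(drop=True)

    # Row partition: stitch blocks side by side; every block carries its own
    # RangeIndex columns, so the labels collide and must be renumbered.
    row_part = pandas.concat([block_a, block_c], axis=1, copy=False)
    row_part.columns = pandas.RangeIndex(0, len(row_part.columns))

    print(col_part.shape, row_part.shape)  # (3, 2) and (2, 3)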

 @memoize
 @ray.remote
 def _blocks_to_row(*partition):
-    row_part = pd.concat(partition, axis=1, copy=False)\
+    row_part = pandas.concat(partition, axis=1, copy=False)\
         .reset_index(drop=True)
     # Because our block partitions contain different indices (for the
     # columns), this change is needed to ensure correctness.
-    row_part.columns = pd.RangeIndex(0, len(row_part.columns))
+    row_part.columns = pandas.RangeIndex(0, len(row_part.columns))
     return row_part
@@ -468,7 +483,7 @@ def _reindex_helper(old_index, new_index, axis, npartitions, *df):
     Returns:
         A new set of blocks made up of DataFrames.
     """
-    df = pd.concat(df, axis=axis ^ 1)
+    df = pandas.concat(df, axis=axis ^ 1)
     if axis == 1:
         df.index = old_index
     elif axis == 0:
@@ -497,12 +512,12 @@ def _co_op_helper(func, left_columns, right_columns, left_df_len, left_idx,
     Returns:
         A new set of blocks for the partitioned DataFrame.
     """
-    left = pd.concat(zipped[:left_df_len], axis=1, copy=False).copy()
+    left = pandas.concat(zipped[:left_df_len], axis=1, copy=False).copy()
     left.columns = left_columns
     if left_idx is not None:
         left.index = left_idx

-    right = pd.concat(zipped[left_df_len:], axis=1, copy=False).copy()
+    right = pandas.concat(zipped[left_df_len:], axis=1, copy=False).copy()
     right.columns = right_columns

     new_rows = func(left, right)
@@ -546,7 +561,7 @@ def _match_partitioning(column_partition, lengths, index):
     column_partition.index = index
     for length in lengths:
         if len(column_partition) == 0:
-            partitioned_list.append(pd.DataFrame(columns=columns))
+            partitioned_list.append(pandas.DataFrame(columns=columns))
             continue

         partitioned_list.append(column_partition.iloc[:length, :])
@@ -570,4 +585,4 @@ def fix_blocks_dimensions(blocks, axis):
 @ray.remote
 def _compile_remote_dtypes(*column_of_blocks):
     small_dfs = [df.loc[0:0] for df in column_of_blocks]
-    return pd.concat(small_dfs, copy=False).dtypes
+    return pandas.concat(small_dfs, copy=False).dtypes
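One last note, on `_compile_remote_dtypes`: dtype promotion only needs a single row from each block, so concatenating `df.loc[0:0]` slices yields the same dtypes as concatenating the full partitions at a fraction of the cost. A quick local check (toy blocks, hypothetical):

    import pandas

    blocks = [pandas.DataFrame({0: [1, 2], 1: ['x', 'y']}),
              pandas.DataFrame({0: [3.5], 1: ['z']})]

    # Same trick as above: one row per block is enough for promotion.
    small_dfs = [df.loc[0:0] for df in blocks]
    print(pandas.concat(small_dfs, copy=False).dtypes)
    # column 0 promotes to float64 (int mixed with float); column 1 is object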