[DataFrame] Implement Inter-DataFrame operations (#1937)

This commit is contained in:
Devin Petersohn 2018-04-30 06:42:07 -07:00 committed by Robert Nishihara
parent 34bc6ce6ea
commit 0c477fbbca
4 changed files with 626 additions and 317 deletions

@ -3,6 +3,7 @@ from __future__ import division
from __future__ import print_function
import pandas
import numpy as np
from .dataframe import DataFrame
from .utils import _reindex_helper
@ -110,21 +111,23 @@ def concat(objs, axis=0, join='outer', join_axes=None, ignore_index=False,
# from remote memory built in the previous line. In the future, we won't be
# building new DataFrames, rather just partitioning the DataFrames.
if axis == 0:
new_rows = [_reindex_helper.remote(part, all_columns[i],
final_columns, axis)
for i in range(len(objs))
for part in objs[i]._row_partitions]
return DataFrame(row_partitions=new_rows,
columns=final_columns,
index=final_index)
new_blocks = np.array([_reindex_helper._submit(
args=tuple([all_columns[i], final_columns, axis,
len(objs[0]._block_partitions)] + part.tolist()),
num_return_vals=len(objs[0]._block_partitions))
for i in range(len(objs))
for part in objs[i]._block_partitions])
else:
new_columns = [_reindex_helper.remote(part, all_index[i],
final_index, axis)
for i in range(len(objs))
for part in objs[i]._col_partitions]
# Transposing the columns is necessary because the remote task treats
# everything like rows and returns in row-major format. Luckily, this
# operation is cheap in numpy.
new_blocks = np.array([_reindex_helper._submit(
args=tuple([all_index[i], final_index, axis,
len(objs[0]._block_partitions.T)] + part.tolist()),
num_return_vals=len(objs[0]._block_partitions.T))
for i in range(len(objs))
for part in objs[i]._block_partitions.T]).T
return DataFrame(col_partitions=new_columns,
columns=final_columns,
index=final_index)
return DataFrame(block_partitions=new_blocks,
columns=final_columns,
index=final_index)
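For reference, a small pandas-only example (not part of this diff) of the semantics the partitioned path reproduces: with axis=0 every input is reindexed to the union of columns before its rows are stacked, and the axis=1 case mirrors this on the index, with the extra transpose noted above because the remote tasks return blocks row-major.

import pandas as pd

a = pd.DataFrame({"x": [1, 2], "y": [3, 4]})
b = pd.DataFrame({"y": [5], "z": [6]})

# Columns are outer-joined; rows are stacked. Missing cells become NaN.
print(pd.concat([a, b], axis=0, join="outer"))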

@ -36,7 +36,8 @@ from .utils import (
_blocks_to_row,
_create_block_partitions,
_inherit_docstrings,
_reindex_helper)
_reindex_helper,
_co_op_helper)
from . import get_npartitions
from .index_metadata import _IndexMetadata
@ -103,8 +104,7 @@ class DataFrame(object):
if block_partitions is not None:
# put in numpy array here to make accesses easier since it's 2D
self._block_partitions = np.array(block_partitions)
assert self._block_partitions.ndim == 2, \
"Block Partitions must be 2D."
axis = 0
else:
if row_partitions is not None:
axis = 0
@ -125,10 +125,12 @@ class DataFrame(object):
# Sometimes we only get a single column or row, which is
# problematic for building blocks from the partitions, so we
# add whatever dimension we're missing from the input.
if self._block_partitions.ndim != 2:
if self._block_partitions.ndim < 2:
self._block_partitions = np.expand_dims(self._block_partitions,
axis=axis ^ 1)
assert self._block_partitions.ndim == 2, "Block Partitions must be 2D."
# Create the row and column index objects for using our partitioning.
# If the objects haven't been inherited, then generate them
if self._row_metadata is None:
@ -278,8 +280,8 @@ class DataFrame(object):
return self._repr_helper_()._repr_html_()
# We split so that we insert our correct dataframe dimensions.
result = self._repr_helper_()._repr_html_()
return result.split('<p>')[0] + \
'<p>{0} rows × {1} columns</p>\n</div>'.format(len(self.index),
return result.split("<p>")[0] + \
"<p>{0} rows x {1} columns</p>\n</div>".format(len(self.index),
len(self.columns))
def _get_index(self):
@ -456,7 +458,8 @@ class DataFrame(object):
return len(self.index), len(self.columns)
def _update_inplace(self, row_partitions=None, col_partitions=None,
columns=None, index=None):
block_partitions=None, columns=None, index=None,
col_metadata=None, row_metadata=None):
"""Updates the current DataFrame inplace.
Behavior should be similar to the constructor, given the corresponding
@ -480,22 +483,31 @@ class DataFrame(object):
not have enough contextual info to rebuild the indexes
correctly based on the addition/subtraction of rows/columns.
"""
assert row_partitions is not None or col_partitions is not None, \
assert row_partitions is not None or col_partitions is not None\
or block_partitions is not None, \
"To update inplace, new column or row partitions must be set."
if row_partitions is not None:
if block_partitions is not None:
self._block_partitions = block_partitions
elif row_partitions is not None:
self._row_partitions = row_partitions
elif col_partitions is not None:
self._col_partitions = col_partitions
if row_partitions is not None or col_partitions is not None:
# At least one partition list is being updated, so recompute
# lengths and indices
self._row_metadata = _IndexMetadata(self._block_partitions[:, 0],
index=index, axis=0)
self._col_metadata = _IndexMetadata(self._block_partitions[0, :],
index=columns, axis=1)
if col_metadata is not None:
self._col_metadata = col_metadata
else:
assert columns is not None, \
"Columns must be passed without col_metadata"
self._col_metadata = _IndexMetadata(
self._block_partitions[0, :], index=columns, axis=1)
if row_metadata is not None:
self._row_metadata = row_metadata
else:
# Index can be None for default index, so we don't check
self._row_metadata = _IndexMetadata(
self._block_partitions[:, 0], index=index, axis=0)
def add_prefix(self, prefix):
"""Add a prefix to each of the column names.
@ -714,9 +726,20 @@ class DataFrame(object):
raise NotImplementedError("Not yet")
def add(self, other, axis='columns', level=None, fill_value=None):
raise NotImplementedError(
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")
"""Add this DataFrame to another or a scalar/list.
Args:
other: What to add to this DataFrame.
axis: The axis to apply addition over. Only applicable to Series
or list 'other'.
level: A level in the multilevel axis to add over.
fill_value: The value to fill NaN.
Returns:
A new DataFrame with the applied addition.
"""
return self._operator_helper(pd.DataFrame.add, other, axis, level,
fill_value)
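A short usage sketch of the newly wired-up operators; the rdf alias and import path are assumptions based on the tests below and the python/ray/dataframe layout, not part of the diff.

import ray.dataframe as rdf  # assumed import path; ray.init() may be needed first

df = rdf.DataFrame({"a": [1, 2], "b": [3, 4]})
other = rdf.DataFrame({"a": [10, 20], "c": [7, 8]})

df.add(5)               # scalar broadcast, mirrors pandas
df.add(other)           # inter-DataFrame add: labels outer-joined, NaN where unmatched
df.add([1, 2], axis=0)  # list-like treated as a Series along the given axis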
def agg(self, func, axis=0, *args, **kwargs):
return self.aggregate(func, axis, *args, **kwargs)
@ -953,6 +976,7 @@ class DataFrame(object):
to_concat = [self] + other
else:
to_concat = [self, other]
return concat(to_concat, ignore_index=ignore_index,
verify_integrity=verify_integrity)
@ -1261,14 +1285,33 @@ class DataFrame(object):
"github.com/ray-project/ray.")
def div(self, other, axis='columns', level=None, fill_value=None):
raise NotImplementedError(
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")
"""Divides this DataFrame against another DataFrame/Series/scalar.
Args:
other: The object to use to apply the divide against this.
axis: The axis to divide over.
level: The Multilevel index level to apply divide over.
fill_value: The value to fill NaNs with.
Returns:
A new DataFrame with the Divide applied.
"""
return self._operator_helper(pd.DataFrame.div, other, axis, level,
fill_value)
def divide(self, other, axis='columns', level=None, fill_value=None):
raise NotImplementedError(
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")
"""Synonym for div.
Args:
other: The object to use to apply the divide against this.
axis: The axis to divide over.
level: The Multilevel index level to apply divide over.
fill_value: The value to fill NaNs with.
Returns:
A new DataFrame with the Divide applied.
"""
return self.div(other, axis, level, fill_value)
def dot(self, other):
raise NotImplementedError(
@ -1419,9 +1462,17 @@ class DataFrame(object):
"github.com/ray-project/ray.")
def eq(self, other, axis='columns', level=None):
raise NotImplementedError(
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")
"""Checks element-wise that this is equal to other.
Args:
other: A DataFrame or Series or scalar to compare to.
axis: The axis to perform the eq over.
level: The Multilevel index level to apply eq over.
Returns:
A new DataFrame filled with Booleans.
"""
return self._operator_helper(pd.DataFrame.eq, other, axis, level)
def equals(self, other):
"""
@ -1688,9 +1739,19 @@ class DataFrame(object):
return self._row_metadata.first_valid_index()
def floordiv(self, other, axis='columns', level=None, fill_value=None):
raise NotImplementedError(
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")
"""Divides this DataFrame against another DataFrame/Series/scalar.
Args:
other: The object to use to apply the divide against this.
axis: The axis to divide over.
level: The Multilevel index level to apply divide over.
fill_value: The value to fill NaNs with.
Returns:
A new DataFrame with the Divide applied.
"""
return self._operator_helper(pd.DataFrame.floordiv, other, axis, level,
fill_value)
@classmethod
def from_csv(self, path, header=0, sep=', ', index_col=0,
@ -1720,9 +1781,17 @@ class DataFrame(object):
"github.com/ray-project/ray.")
def ge(self, other, axis='columns', level=None):
raise NotImplementedError(
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")
"""Checks element-wise that this is greater than or equal to other.
Args:
other: A DataFrame or Series or scalar to compare to.
axis: The axis to perform the ge over.
level: The Multilevel index level to apply ge over.
Returns:
A new DataFrame filled with Booleans.
"""
return self._operator_helper(pd.DataFrame.ge, other, axis, level)
def get(self, key, default=None):
"""Get item from object for given key (DataFrame column, Panel
@ -1770,9 +1839,17 @@ class DataFrame(object):
"github.com/ray-project/ray.")
def gt(self, other, axis='columns', level=None):
raise NotImplementedError(
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")
"""Checks element-wise that this is greater than other.
Args:
other: A DataFrame or Series or scalar to compare to.
axis: The axis to perform the gt over.
level: The Multilevel index level to apply gt over.
Returns:
A new DataFrame filled with Booleans.
"""
return self._operator_helper(pd.DataFrame.gt, other, axis, level)
def head(self, n=5):
"""Get the first n rows of the dataframe.
@ -2104,24 +2181,35 @@ class DataFrame(object):
.join(pd.DataFrame(columns=other.columns),
lsuffix=lsuffix, rsuffix=rsuffix).columns
new_partition_num = max(len(self._block_partitions.T),
len(other._block_partitions.T))
# Join is a concat once we have shuffled the data internally.
# We shuffle the data by computing the correct order.
# Another important thing to note: We set the current self index
# to the index variable which may be 'on'.
new_self = [_reindex_helper.remote(col, index, new_index, 1)
for col in self._col_partitions]
new_other = [_reindex_helper.remote(col, other.index, new_index, 1)
for col in other._col_partitions]
new_self = np.array([
_reindex_helper._submit(args=tuple([index, new_index, 1,
new_partition_num] +
block.tolist()),
num_return_vals=new_partition_num)
for block in self._block_partitions.T])
new_other = np.array([
_reindex_helper._submit(args=tuple([other.index, new_index, 1,
new_partition_num] +
block.tolist()),
num_return_vals=new_partition_num)
for block in other._block_partitions.T])
# Append the columns together (i.e. concat)
new_column_parts = new_self + new_other
# Append the blocks together (i.e. concat)
new_block_parts = np.concatenate((new_self, new_other)).T
# Default index in the case that on is set.
if on is not None:
new_index = None
# TODO join the two metadata tables for performance.
return DataFrame(col_partitions=new_column_parts,
return DataFrame(block_partitions=new_block_parts,
index=new_index,
columns=new_column_labels)
else:
@ -2141,17 +2229,28 @@ class DataFrame(object):
[pd.DataFrame(columns=obj.columns) for obj in other],
lsuffix=lsuffix, rsuffix=rsuffix).columns
new_self = [_reindex_helper.remote(col, self.index, new_index, 1)
for col in self._col_partitions]
new_partition_num = max([len(self._block_partitions.T)] +
[len(obj._block_partitions.T)
for obj in other])
new_others = [_reindex_helper.remote(col, obj.index, new_index, 1)
for obj in other for col in obj._col_partitions]
new_self = np.array([
_reindex_helper._submit(args=tuple([self.index, new_index, 1,
new_partition_num] +
block.tolist()),
num_return_vals=new_partition_num)
for block in self._block_partitions.T])
new_others = np.array([_reindex_helper._submit(
args=tuple([obj.index, new_index, 1, new_partition_num] +
block.tolist()),
num_return_vals=new_partition_num
) for obj in other for block in obj._block_partitions.T])
# Append the columns together (i.e. concat)
new_column_parts = new_self + new_others
new_block_parts = np.concatenate((new_self, new_others)).T
# TODO join the two metadata tables for performance.
return DataFrame(col_partitions=new_column_parts,
return DataFrame(block_partitions=new_block_parts,
index=new_index,
columns=new_column_labels)
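For intuition, a pandas-only sketch (illustrative, not part of the diff) of what the block-level join above computes, assuming the default how='left':

import pandas as pd

left = pd.DataFrame({"a": [1, 2]}, index=["x", "y"])
right = pd.DataFrame({"b": [3, 4]}, index=["y", "z"])

new_index = left.index.join(right.index, how="left")
aligned = [left.reindex(new_index), right.reindex(new_index)]
# Reindex both sides to the joined index, then concatenate the column blocks side by side.
print(pd.concat(aligned, axis=1))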
@ -2181,9 +2280,17 @@ class DataFrame(object):
return self._row_metadata.last_valid_index()
def le(self, other, axis='columns', level=None):
raise NotImplementedError(
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")
"""Checks element-wise that this is less than or equal to other.
Args:
other: A DataFrame or Series or scalar to compare to.
axis: The axis to perform the le over.
level: The Multilevel index level to apply le over.
Returns:
A new DataFrame filled with Booleans.
"""
return self._operator_helper(pd.DataFrame.le, other, axis, level)
def lookup(self, row_labels, col_labels):
raise NotImplementedError(
@ -2191,9 +2298,17 @@ class DataFrame(object):
"github.com/ray-project/ray.")
def lt(self, other, axis='columns', level=None):
raise NotImplementedError(
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")
"""Checks element-wise that this is less than other.
Args:
other: A DataFrame or Series or scalar to compare to.
axis: The axis to perform the lt over.
level: The Multilevel index level to apply lt over.
Returns:
A new DataFrame filled with Booleans.
"""
return self._operator_helper(pd.DataFrame.lt, other, axis, level)
def mad(self, axis=None, skipna=None, level=None):
raise NotImplementedError(
@ -2303,9 +2418,19 @@ class DataFrame(object):
return self._arithmetic_helper(remote_func, axis, level)
def mod(self, other, axis='columns', level=None, fill_value=None):
raise NotImplementedError(
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")
"""Mods this DataFrame against another DataFrame/Series/scalar.
Args:
other: The object to use to apply the mod against this.
axis: The axis to mod over.
level: The Multilevel index level to apply mod over.
fill_value: The value to fill NaNs with.
Returns:
A new DataFrame with the Mod applied.
"""
return self._operator_helper(pd.DataFrame.mod, other, axis, level,
fill_value)
def mode(self, axis=0, numeric_only=False):
raise NotImplementedError(
@ -2313,19 +2438,46 @@ class DataFrame(object):
"github.com/ray-project/ray.")
def mul(self, other, axis='columns', level=None, fill_value=None):
raise NotImplementedError(
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")
"""Multiplies this DataFrame against another DataFrame/Series/scalar.
Args:
other: The object to use to apply the multiply against this.
axis: The axis to multiply over.
level: The Multilevel index level to apply multiply over.
fill_value: The value to fill NaNs with.
Returns:
A new DataFrame with the Multiply applied.
"""
return self._operator_helper(pd.DataFrame.mul, other, axis, level,
fill_value)
def multiply(self, other, axis='columns', level=None, fill_value=None):
raise NotImplementedError(
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")
"""Synonym for mul.
Args:
other: The object to use to apply the multiply against this.
axis: The axis to multiply over.
level: The Multilevel index level to apply multiply over.
fill_value: The value to fill NaNs with.
Returns:
A new DataFrame with the Multiply applied.
"""
return self.mul(other, axis, level, fill_value)
def ne(self, other, axis='columns', level=None):
raise NotImplementedError(
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")
"""Checks element-wise that this is not equal to other.
Args:
other: A DataFrame or Series or scalar to compare to.
axis: The axis to perform the ne over.
level: The Multilevel index level to apply ne over.
Returns:
A new DataFrame filled with Booleans.
"""
return self._operator_helper(pd.DataFrame.ne, other, axis, level)
def nlargest(self, n, columns, keep='first'):
raise NotImplementedError(
@ -2426,9 +2578,19 @@ class DataFrame(object):
return result
def pow(self, other, axis='columns', level=None, fill_value=None):
raise NotImplementedError(
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")
"""Pow this DataFrame against another DataFrame/Series/scalar.
Args:
other: The object to use to apply the pow against this.
axis: The axis to pow over.
level: The Multilevel index level to apply pow over.
fill_value: The value to fill NaNs with.
Returns:
A new DataFrame with the Pow applied.
"""
return self._operator_helper(pd.DataFrame.pow, other, axis, level,
fill_value)
def prod(self, axis=None, skipna=None, level=None, numeric_only=None,
min_count=0, **kwargs):
@ -2523,9 +2685,7 @@ class DataFrame(object):
return DataFrame(row_partitions=new_rows, columns=self.columns)
def radd(self, other, axis='columns', level=None, fill_value=None):
raise NotImplementedError(
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")
return self.add(other, axis, level, fill_value)
def rank(self, axis=0, method='average', numeric_only=None,
na_option='keep', ascending=True, pct=False):
@ -2534,9 +2694,9 @@ class DataFrame(object):
"github.com/ray-project/ray.")
def rdiv(self, other, axis='columns', level=None, fill_value=None):
raise NotImplementedError(
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")
return self._single_df_op_helper(
lambda df: df.rdiv(other, axis, level, fill_value),
other, axis, level)
def reindex(self, labels=None, index=None, columns=None, axis=None,
method=None, copy=True, level=None, fill_value=np.nan,
@ -2755,19 +2915,17 @@ class DataFrame(object):
return new_obj
def rfloordiv(self, other, axis='columns', level=None, fill_value=None):
raise NotImplementedError(
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")
return self._single_df_op_helper(
lambda df: df.rfloordiv(other, axis, level, fill_value),
other, axis, level)
def rmod(self, other, axis='columns', level=None, fill_value=None):
raise NotImplementedError(
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")
return self._single_df_op_helper(
lambda df: df.rmod(other, axis, level, fill_value),
other, axis, level)
def rmul(self, other, axis='columns', level=None, fill_value=None):
raise NotImplementedError(
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")
return self.mul(other, axis, level, fill_value)
def rolling(self, window, min_periods=None, freq=None, center=False,
win_type=None, on=None, axis=0, closed=None):
@ -2785,19 +2943,19 @@ class DataFrame(object):
index=self.index)
def rpow(self, other, axis='columns', level=None, fill_value=None):
raise NotImplementedError(
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")
return self._single_df_op_helper(
lambda df: df.rpow(other, axis, level, fill_value),
other, axis, level)
def rsub(self, other, axis='columns', level=None, fill_value=None):
raise NotImplementedError(
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")
return self._single_df_op_helper(
lambda df: df.rsub(other, axis, level, fill_value),
other, axis, level)
def rtruediv(self, other, axis='columns', level=None, fill_value=None):
raise NotImplementedError(
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")
return self._single_df_op_helper(
lambda df: df.rtruediv(other, axis, level, fill_value),
other, axis, level)
def sample(self, n=None, frac=None, replace=False, weights=None,
random_state=None, axis=None):
@ -3002,14 +3160,33 @@ class DataFrame(object):
return self._arithmetic_helper(remote_func, axis, level)
def sub(self, other, axis='columns', level=None, fill_value=None):
raise NotImplementedError(
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")
"""Subtract a DataFrame/Series/scalar from this DataFrame.
Args:
other: The object to use to apply the subtraction to this.
axis: The axis to apply the subtraction over.
level: Multilevel index level to subtract over.
fill_value: The value to fill NaNs with.
Returns:
A new DataFrame with the subtraction applied.
"""
return self._operator_helper(pd.DataFrame.sub, other, axis, level,
fill_value)
def subtract(self, other, axis='columns', level=None, fill_value=None):
raise NotImplementedError(
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")
"""Alias for sub.
Args:
other: The object to use to apply the subtraction to this.
axis: The axis to apply the subtraction over.
level: Multilevel index level to subtract over.
fill_value: The value to fill NaNs with.
Returns:
A new DataFrame with the subtraction applied.
"""
return self.sub(other, axis, level, fill_value)
def swapaxes(self, axis1, axis2, copy=True):
raise NotImplementedError(
@ -3259,9 +3436,19 @@ class DataFrame(object):
return result
def truediv(self, other, axis='columns', level=None, fill_value=None):
raise NotImplementedError(
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")
"""Divides this DataFrame against another DataFrame/Series/scalar.
Args:
other: The object to use to apply the divide against this.
axis: The axis to divide over.
level: The Multilevel index level to apply divide over.
fill_value: The value to fill NaNs with.
Returns:
A new DataFrame with the Divide applied.
"""
return self._operator_helper(pd.DataFrame.truediv, other, axis, level,
fill_value)
def truncate(self, before=None, after=None, axis=None, copy=True):
raise NotImplementedError(
@ -3624,80 +3811,92 @@ class DataFrame(object):
"github.com/ray-project/ray.")
def __lt__(self, other):
raise NotImplementedError(
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")
return self.lt(other)
def __le__(self, other):
raise NotImplementedError(
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")
return self.le(other)
def __gt__(self, other):
raise NotImplementedError(
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")
return self.gt(other)
def __ge__(self, other):
raise NotImplementedError(
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")
return self.ge(other)
def __eq__(self, other):
"""Computes the equality of this DataFrame with another
Returns:
True, if the DataFrames are equal. False otherwise.
"""
return self.equals(other)
return self.eq(other)
def __ne__(self, other):
"""Checks that this DataFrame is not equal to another
Returns:
True, if the DataFrames are not equal. False otherwise.
"""
return not self.equals(other)
return self.ne(other)
def __add__(self, other):
raise NotImplementedError(
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")
return self.add(other)
def __iadd__(self, other):
raise NotImplementedError(
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")
return self.add(other)
def __radd__(self, other, axis="columns", level=None, fill_value=None):
return self.radd(other, axis, level, fill_value)
def __mul__(self, other):
raise NotImplementedError(
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")
return self.mul(other)
def __imul__(self, other):
raise NotImplementedError(
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")
return self.mul(other)
def __rmul__(self, other, axis="columns", level=None, fill_value=None):
return self.rmul(other, axis, level, fill_value)
def __pow__(self, other):
raise NotImplementedError(
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")
return self.pow(other)
def __ipow__(self, other):
raise NotImplementedError(
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")
return self.pow(other)
def __rpow__(self, other, axis="columns", level=None, fill_value=None):
return self.rpow(other, axis, level, fill_value)
def __sub__(self, other):
raise NotImplementedError(
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")
return self.sub(other)
def __isub__(self, other):
raise NotImplementedError(
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")
return self.sub(other)
def __rsub__(self, other, axis="columns", level=None, fill_value=None):
return self.rsub(other, axis, level, fill_value)
def __floordiv__(self, other):
return self.floordiv(other)
def __ifloordiv__(self, other):
return self.floordiv(other)
def __rfloordiv__(self, other, axis="columns", level=None,
fill_value=None):
return self.rfloordiv(other, axis, level, fill_value)
def __truediv__(self, other):
return self.truediv(other)
def __itruediv__(self, other):
return self.truediv(other)
def __rtruediv__(self, other, axis="columns", level=None, fill_value=None):
return self.rtruediv(other, axis, level, fill_value)
def __mod__(self, other):
return self.mod(other)
def __imod__(self, other):
return self.mod(other)
def __rmod__(self, other, axis="columns", level=None, fill_value=None):
return self.rmod(other, axis, level, fill_value)
def __div__(self, other, axis="columns", level=None, fill_value=None):
return self.div(other, axis, level, fill_value)
def __rdiv__(self, other, axis="columns", level=None, fill_value=None):
return self.rdiv(other, axis, level, fill_value)
def __neg__(self):
"""Computes an element wise negative DataFrame
@ -3720,21 +3919,6 @@ class DataFrame(object):
columns=self.columns,
index=self.index)
def __floordiv__(self, other):
raise NotImplementedError(
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")
def __truediv__(self, other):
raise NotImplementedError(
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")
def __mod__(self, other):
raise NotImplementedError(
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")
def __sizeof__(self):
raise NotImplementedError(
"To contribute to Pandas on Ray, please visit "
@ -3758,12 +3942,7 @@ class DataFrame(object):
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")
def iat(axis=None):
raise NotImplementedError(
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")
def __rsub__(other, axis=None, level=None, fill_value=None):
def iat(self, axis=None):
raise NotImplementedError(
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")
@ -3785,22 +3964,12 @@ class DataFrame(object):
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")
def __itruediv__(other):
def at(self, axis=None):
raise NotImplementedError(
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")
def __div__(other, axis=None, level=None, fill_value=None):
raise NotImplementedError(
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")
def at(axis=None):
raise NotImplementedError(
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")
def ix(axis=None):
def ix(self, axis=None):
raise NotImplementedError(
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")
@ -3815,3 +3984,122 @@ class DataFrame(object):
raise NotImplementedError(
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")
def _copartition(self, other, new_index):
"""Colocates the values of other with this for certain operations.
NOTE: This method uses the indexes of each DataFrame to order them the
same. This operation does an implicit shuffling of data and zips
the two DataFrames together to be operated on.
Args:
other: The other DataFrame to copartition with.
Returns:
Two new sets of partitions, copartitioned and zipped.
"""
# Put in the object store so they aren't serialized each iteration.
old_self_index = ray.put(self.index)
new_index = ray.put(new_index)
old_other_index = ray.put(other.index)
new_num_partitions = max(len(self._block_partitions.T),
len(other._block_partitions.T))
new_partitions_self = \
np.array([_reindex_helper._submit(
args=tuple([old_self_index, new_index, 1,
new_num_partitions] + block.tolist()),
num_return_vals=new_num_partitions)
for block in self._block_partitions.T]).T
new_partitions_other = \
np.array([_reindex_helper._submit(
args=tuple([old_other_index, new_index, 1,
new_num_partitions] + block.tolist()),
num_return_vals=new_num_partitions)
for block in other._block_partitions.T]).T
return zip(new_partitions_self, new_partitions_other)
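A minimal local sketch (hypothetical helper name, no Ray) of the contract _copartition provides: both frames reindexed to a shared index, partitions paired up for the per-block tasks.

import pandas as pd

def copartition_sketch(left, right):
    # One block per frame for brevity; the real method does this per block column
    # and re-splits the result across Ray tasks.
    new_index = left.index.join(right.index, how="outer")
    return zip([left.reindex(new_index)], [right.reindex(new_index)])

left = pd.DataFrame({"a": [1]}, index=["x"])
right = pd.DataFrame({"a": [2]}, index=["y"])
for l_part, r_part in copartition_sketch(left, right):
    print(l_part.index.equals(r_part.index))  # True: the pair is row-aligned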
def _operator_helper(self, func, other, axis, level, *args):
"""Helper method for inter-dataframe and scalar operations"""
if isinstance(other, DataFrame):
return self._inter_df_op_helper(
lambda x, y: func(x, y, axis, level, *args),
other, axis, level)
else:
return self._single_df_op_helper(
lambda df: func(df, other, axis, level, *args),
other, axis, level)
def _inter_df_op_helper(self, func, other, axis, level):
if level is not None:
raise NotImplementedError("Mutlilevel index not yet supported "
"in Pandas on Ray")
axis = pd.DataFrame()._get_axis_number(axis)
# Adding two DataFrames causes an outer join.
if isinstance(other, DataFrame):
new_column_index = self.columns.join(other.columns, how="outer")
new_index = self.index.join(other.index, how="outer")
copartitions = self._copartition(other, new_index)
new_blocks = \
np.array([_co_op_helper._submit(
args=tuple([func, self.columns, other.columns,
len(part[0])] +
np.concatenate(part).tolist()),
num_return_vals=len(part[0]))
for part in copartitions])
# TODO join the Index Metadata objects together for performance.
return DataFrame(block_partitions=new_blocks,
columns=new_column_index,
index=new_index)
def _single_df_op_helper(self, func, other, axis, level):
if level is not None:
raise NotImplementedError("Multilevel index not yet supported "
"in Pandas on Ray")
axis = pd.DataFrame()._get_axis_number(axis)
if is_list_like(other):
new_index = self.index
new_column_index = self.columns
new_col_metadata = self._col_metadata
new_row_metadata = self._row_metadata
new_blocks = None
if axis == 0:
if len(other) != len(self.index):
raise ValueError(
"Unable to coerce to Series, length must be {0}: "
"given {1}".format(len(self.index), len(other)))
new_columns = _map_partitions(func, self._col_partitions)
new_rows = None
else:
if len(other) != len(self.columns):
raise ValueError(
"Unable to coerce to Series, length must be {0}: "
"given {1}".format(len(self.columns), len(other)))
new_rows = _map_partitions(func, self._row_partitions)
new_columns = None
else:
new_blocks = np.array([_map_partitions(func, block)
for block in self._block_partitions])
new_columns = None
new_rows = None
new_index = self.index
new_column_index = self.columns
new_col_metadata = self._col_metadata
new_row_metadata = self._row_metadata
return DataFrame(col_partitions=new_columns,
row_partitions=new_rows,
block_partitions=new_blocks,
index=new_index,
columns=new_column_index,
col_metadata=new_col_metadata,
row_metadata=new_row_metadata)
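For reference, the pandas behavior the list-like branch mirrors (plain pandas, illustrative values):

import pandas as pd

df = pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]})
print(df.add([10, 20], axis=1))      # length matches the number of columns
print(df.add([10, 20, 30], axis=0))  # length matches the number of rows
# A mismatched length raises the "Unable to coerce to Series..." ValueError
# reproduced in the branch above.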

@ -806,11 +806,75 @@ def test_nan_dataframe():
test_transform(ray_df, pandas_df)
def test_add():
ray_df = create_test_dataframe()
@pytest.fixture
def test_inter_df_math(op, simple=False):
ray_df = rdf.DataFrame({"col1": [0, 1, 2, 3], "col2": [4, 5, 6, 7],
"col3": [8, 9, 0, 1], "col4": [2, 4, 5, 6]})
with pytest.raises(NotImplementedError):
ray_df.add(None)
pandas_df = pd.DataFrame({"col1": [0, 1, 2, 3], "col2": [4, 5, 6, 7],
"col3": [8, 9, 0, 1], "col4": [2, 4, 5, 6]})
ray_df_equals_pandas(getattr(ray_df, op)(ray_df),
getattr(pandas_df, op)(pandas_df))
ray_df_equals_pandas(getattr(ray_df, op)(4),
getattr(pandas_df, op)(4))
ray_df_equals_pandas(getattr(ray_df, op)(4.0),
getattr(pandas_df, op)(4.0))
ray_df2 = rdf.DataFrame({"A": [0, 2], "col1": [0, 19], "col2": [1, 1]})
pandas_df2 = pd.DataFrame({"A": [0, 2], "col1": [0, 19], "col2": [1, 1]})
ray_df_equals_pandas(getattr(ray_df, op)(ray_df2),
getattr(pandas_df, op)(pandas_df2))
list_test = [0, 1, 2, 4]
if not simple:
ray_df_equals_pandas(getattr(ray_df, op)(list_test, axis=1),
getattr(pandas_df, op)(list_test, axis=1))
ray_df_equals_pandas(getattr(ray_df, op)(list_test, axis=0),
getattr(pandas_df, op)(list_test, axis=0))
@pytest.fixture
def test_comparison_inter_ops(op):
ray_df = rdf.DataFrame({"col1": [0, 1, 2, 3], "col2": [4, 5, 6, 7],
"col3": [8, 9, 0, 1], "col4": [2, 4, 5, 6]})
pandas_df = pd.DataFrame({"col1": [0, 1, 2, 3], "col2": [4, 5, 6, 7],
"col3": [8, 9, 0, 1], "col4": [2, 4, 5, 6]})
ray_df_equals_pandas(getattr(ray_df, op)(ray_df),
getattr(pandas_df, op)(pandas_df))
ray_df_equals_pandas(getattr(ray_df, op)(4),
getattr(pandas_df, op)(4))
ray_df_equals_pandas(getattr(ray_df, op)(4.0),
getattr(pandas_df, op)(4.0))
ray_df2 = rdf.DataFrame({"A": [0, 2], "col1": [0, 19], "col2": [1, 1]})
pandas_df2 = pd.DataFrame({"A": [0, 2], "col1": [0, 19], "col2": [1, 1]})
ray_df_equals_pandas(getattr(ray_df, op)(ray_df2),
getattr(pandas_df, op)(pandas_df2))
@pytest.fixture
def test_inter_df_math_right_ops(op):
ray_df = rdf.DataFrame({"col1": [0, 1, 2, 3], "col2": [4, 5, 6, 7],
"col3": [8, 9, 0, 1], "col4": [2, 4, 5, 6]})
pandas_df = pd.DataFrame({"col1": [0, 1, 2, 3], "col2": [4, 5, 6, 7],
"col3": [8, 9, 0, 1], "col4": [2, 4, 5, 6]})
ray_df_equals_pandas(getattr(ray_df, op)(4),
getattr(pandas_df, op)(4))
ray_df_equals_pandas(getattr(ray_df, op)(4.0),
getattr(pandas_df, op)(4.0))
def test_add():
test_inter_df_math("add", simple=False)
@pytest.fixture
@ -863,6 +927,8 @@ def test_append():
pandas_df2 = pd.DataFrame({"col5": [0], "col6": [1]})
print(ray_df.append(ray_df2))
assert ray_df_equals_pandas(ray_df.append(ray_df2),
pandas_df.append(pandas_df2))
@ -1083,17 +1149,11 @@ def test_diff():
def test_div():
ray_df = create_test_dataframe()
with pytest.raises(NotImplementedError):
ray_df.div(None)
test_inter_df_math("div", simple=False)
def test_divide():
ray_df = create_test_dataframe()
with pytest.raises(NotImplementedError):
ray_df.divide(None)
test_inter_df_math("divide", simple=False)
def test_dot():
@ -1208,10 +1268,7 @@ def test_duplicated():
def test_eq():
ray_df = create_test_dataframe()
with pytest.raises(NotImplementedError):
ray_df.eq(None)
test_comparison_inter_ops("eq")
def test_equals():
@ -1718,10 +1775,7 @@ def test_first_valid_index(ray_df, pandas_df):
def test_floordiv():
ray_df = create_test_dataframe()
with pytest.raises(NotImplementedError):
ray_df.floordiv(None)
test_inter_df_math("floordiv", simple=False)
def test_from_csv():
@ -1745,10 +1799,7 @@ def test_from_records():
def test_ge():
ray_df = create_test_dataframe()
with pytest.raises(NotImplementedError):
ray_df.ge(None)
test_comparison_inter_ops("ge")
def test_get_value():
@ -1766,10 +1817,7 @@ def test_get_values():
def test_gt():
ray_df = create_test_dataframe()
with pytest.raises(NotImplementedError):
ray_df.gt(None)
test_comparison_inter_ops("gt")
@pytest.fixture
@ -1935,10 +1983,7 @@ def test_last_valid_index(ray_df, pandas_df):
def test_le():
ray_df = create_test_dataframe()
with pytest.raises(NotImplementedError):
ray_df.le(None)
test_comparison_inter_ops("le")
def test_lookup():
@ -1949,10 +1994,7 @@ def test_lookup():
def test_lt():
ray_df = create_test_dataframe()
with pytest.raises(NotImplementedError):
ray_df.lt(None)
test_comparison_inter_ops("lt")
def test_mad():
@ -2014,10 +2056,7 @@ def test_min(ray_df, pandas_df):
def test_mod():
ray_df = create_test_dataframe()
with pytest.raises(NotImplementedError):
ray_df.mod(None)
test_inter_df_math("mod", simple=False)
def test_mode():
@ -2028,24 +2067,15 @@ def test_mode():
def test_mul():
ray_df = create_test_dataframe()
with pytest.raises(NotImplementedError):
ray_df.mul(None)
test_inter_df_math("mul", simple=False)
def test_multiply():
ray_df = create_test_dataframe()
with pytest.raises(NotImplementedError):
ray_df.multiply(None)
test_inter_df_math("multiply", simple=False)
def test_ne():
ray_df = create_test_dataframe()
with pytest.raises(NotImplementedError):
ray_df.ne(None)
test_comparison_inter_ops("ne")
def test_nlargest():
@ -2125,10 +2155,7 @@ def test_pop(ray_df, pandas_df):
def test_pow():
ray_df = create_test_dataframe()
with pytest.raises(NotImplementedError):
ray_df.pow(None)
test_inter_df_math("pow", simple=False)
def test_prod():
@ -2158,10 +2185,7 @@ def test_query(ray_df, pandas_df, funcs):
def test_radd():
ray_df = create_test_dataframe()
with pytest.raises(NotImplementedError):
ray_df.radd(None)
test_inter_df_math_right_ops("radd")
def test_rank():
@ -2172,10 +2196,7 @@ def test_rank():
def test_rdiv():
ray_df = create_test_dataframe()
with pytest.raises(NotImplementedError):
ray_df.rdiv(None)
test_inter_df_math_right_ops("rdiv")
def test_reindex():
@ -2487,24 +2508,15 @@ def test_reset_index(ray_df, pandas_df, inplace=False):
def test_rfloordiv():
ray_df = create_test_dataframe()
with pytest.raises(NotImplementedError):
ray_df.rfloordiv(None)
test_inter_df_math_right_ops("rfloordiv")
def test_rmod():
ray_df = create_test_dataframe()
with pytest.raises(NotImplementedError):
ray_df.rmod(None)
test_inter_df_math_right_ops("rmod")
def test_rmul():
ray_df = create_test_dataframe()
with pytest.raises(NotImplementedError):
ray_df.rmul(None)
test_inter_df_math_right_ops("rmul")
def test_rolling():
@ -2521,24 +2533,15 @@ def test_round(ray_df, pd_df):
def test_rpow():
ray_df = create_test_dataframe()
with pytest.raises(NotImplementedError):
ray_df.rpow(None)
test_inter_df_math_right_ops("rpow")
def test_rsub():
ray_df = create_test_dataframe()
with pytest.raises(NotImplementedError):
ray_df.rsub(None)
test_inter_df_math_right_ops("rsub")
def test_rtruediv():
ray_df = create_test_dataframe()
with pytest.raises(NotImplementedError):
ray_df.rtruediv(None)
test_inter_df_math_right_ops("rtruediv")
def test_sample():
@ -2657,17 +2660,11 @@ def test_std(ray_df, pandas_df):
def test_sub():
ray_df = create_test_dataframe()
with pytest.raises(NotImplementedError):
ray_df.sub(None)
test_inter_df_math("sub", simple=False)
def test_subtract():
ray_df = create_test_dataframe()
with pytest.raises(NotImplementedError):
ray_df.subtract(None)
test_inter_df_math("subtract", simple=False)
def test_swapaxes():
@ -2740,10 +2737,7 @@ def test_transform(ray_df, pandas_df):
def test_truediv():
ray_df = create_test_dataframe()
with pytest.raises(NotImplementedError):
ray_df.truediv(None)
test_inter_df_math("truediv", simple=False)
def test_truncate():
@ -2992,10 +2986,7 @@ def test_iat():
def test___rsub__():
ray_df = create_test_dataframe()
with pytest.raises(NotImplementedError):
ray_df.__rsub__(None, None, None)
test_inter_df_math_right_ops("__rsub__")
@pytest.fixture
@ -3022,17 +3013,11 @@ def test_is_copy():
def test___itruediv__():
ray_df = create_test_dataframe()
with pytest.raises(NotImplementedError):
ray_df.__itruediv__()
test_inter_df_math("__itruediv__", simple=True)
def test___div__():
ray_df = create_test_dataframe()
with pytest.raises(NotImplementedError):
ray_df.__div__(None)
test_inter_df_math("__div__", simple=True)
def test_at():

@ -200,12 +200,16 @@ def _create_block_partitions(partitions, axis=0, length=None):
for partition in partitions]
# In the case that axis is 1 we have to transpose because we build the
# columns into rows. Fortunately numpy is efficent at this.
# columns into rows. Fortunately numpy is efficient at this.
return np.array(x) if axis == 0 else np.array(x).T
@ray.remote
def create_blocks(df, npartitions, axis):
return create_blocks_helper(df, npartitions, axis)
def create_blocks_helper(df, npartitions, axis):
# Single partition dataframes don't need to be repartitioned
if npartitions == 1:
return df
@ -285,7 +289,7 @@ def _inherit_docstrings(parent):
@ray.remote
def _reindex_helper(df, old_index, new_index, axis):
def _reindex_helper(old_index, new_index, axis, npartitions, *df):
"""Reindexes a dataframe to prepare for join/concat.
Args:
@ -295,8 +299,9 @@ def _reindex_helper(df, old_index, new_index, axis):
axis: Which axis to reindex over.
Returns:
A new reindexed DataFrame.
A new set of blocks made up of DataFrames.
"""
df = pd.concat(df, axis=axis ^ 1)
if axis == 1:
df.index = old_index
df = df.reindex(new_index, copy=False)
@ -305,4 +310,32 @@ def _reindex_helper(df, old_index, new_index, axis):
df.columns = old_index
df = df.reindex(columns=new_index, copy=False)
df.columns = pd.RangeIndex(len(df.columns))
return df
return create_blocks_helper(df, npartitions, axis)
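A local walk-through (plain pandas, illustrative values) of the refactored helper for the axis == 1 case:

import pandas as pd

old_index = ["x", "y"]
new_index = ["x", "y", "z"]
blocks = [pd.DataFrame({0: [1.0]}), pd.DataFrame({0: [2.0]})]  # row-blocks of one column partition

part = pd.concat(blocks, axis=0)  # axis ^ 1 with axis == 1: stitch the blocks back together
part.index = old_index
part = part.reindex(new_index)    # rows this frame lacks become NaN
print(part)
# The result would then be re-split into npartitions blocks via create_blocks_helper.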
@ray.remote
def _co_op_helper(func, left_columns, right_columns, left_df_len, *zipped):
"""Copartition operation where two DataFrames must have aligned indexes.
NOTE: This function assumes things are already copartitioned. Requires that
row partitions are passed in as blocks.
Args:
func: The operation to conduct between two DataFrames.
left_columns: The column names for the left DataFrame.
right_columns: The column names for the right DataFrame.
left_df_len: The length of the left. This is used so we can split up
the zipped partitions.
zipped: The DataFrame partitions (in blocks).
Returns:
A new set of blocks for the partitioned DataFrame.
"""
left = pd.concat(zipped[:left_df_len], axis=1, copy=False)
left.columns = left_columns
right = pd.concat(zipped[left_df_len:], axis=1, copy=False)
right.columns = right_columns
new_rows = func(left, right)
return create_blocks_helper(new_rows, left_df_len, 0)
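A local, Ray-free illustration of one _co_op_helper call, to make the block layout concrete (values are illustrative):

import pandas as pd

# Left blocks come first, then right blocks, matching the order the helper receives.
zipped = (pd.DataFrame([[1, 2]]), pd.DataFrame([[10]]))
left_df_len = 1

left = pd.concat(zipped[:left_df_len], axis=1, copy=False)
left.columns = ["a", "b"]
right = pd.concat(zipped[left_df_len:], axis=1, copy=False)
right.columns = ["a"]

print(pd.DataFrame.add(left, right))  # columns align on labels: a == 11, b == NaN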