[Datasets] Follow-up to groupby standard deviation PR (#20035)

Clark Zinzow 2021-11-03 13:56:34 -07:00 committed by GitHub
parent eacaff5d8d
commit a0841106ff
4 changed files with 129 additions and 10 deletions

View file

@@ -106,12 +106,16 @@ class Mean(AggregateFn):
class Std(AggregateFn):
"""Defines standard deviation aggregation."""
"""Defines standard deviation aggregation.
# Uses Welford's online method for an accumulator-style computation of the
# standard deviation. This method was chosen for its numerical stability
# and because it can be computed in a single pass. See
# https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Welford's_online_algorithm
Uses Welford's online method for an accumulator-style computation of the
standard deviation. This method was chosen for its numerical stability
and because it can be computed in a single pass.
This may give different (but more accurate) results than NumPy, Pandas,
and sklearn, which use a less numerically stable two-pass algorithm.
See
https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Welford's_online_algorithm
"""
def __init__(self, on: AggregateOnT = None, ddof: int = 1):
on_fn = _to_on_fn(on)
@@ -120,7 +124,8 @@ class Std(AggregateFn):
# Accumulates the current count, the current mean, and the sum of
# squared differences from the current mean (M2).
M2, mean, count = a
# Select the data on which we want to calculate the mean.
# Select the data on which we want to calculate the standard
# deviation.
val = on_fn(r)
count += 1
@@ -132,6 +137,8 @@ class Std(AggregateFn):
def merge(a: List[float], b: List[float]):
# Merges two accumulations into one.
# See
# https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Parallel_algorithm
M2_a, mean_a, count_a = a
M2_b, mean_b, count_b = b
delta = mean_b - mean_a
@@ -146,6 +153,8 @@ class Std(AggregateFn):
return [M2, mean, count]
def finalize(a: List[float]):
# Compute the final standard deviation from the accumulated
# sum of squared differences from the current mean and the count.
M2, mean, count = a
if count < 2:
return 0.0
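
Taken together, accumulate, merge, and finalize implement Welford's algorithm end to end. Below is a minimal self-contained sketch of the scheme (hypothetical helper names, not the PR's exact code) using the same [M2, mean, count] accumulator layout:

import math
from typing import List

def accumulate(a: List[float], val: float) -> List[float]:
    # Welford's online update: fold one new value into [M2, mean, count].
    M2, mean, count = a
    count += 1
    delta = val - mean
    mean += delta / count
    M2 += delta * (val - mean)  # second factor uses the *updated* mean
    return [M2, mean, count]

def merge(a: List[float], b: List[float]) -> List[float]:
    # Chan et al.'s parallel rule for combining two partial accumulators.
    M2_a, mean_a, count_a = a
    M2_b, mean_b, count_b = b
    count = count_a + count_b
    if count == 0:
        return [0.0, 0.0, 0]
    delta = mean_b - mean_a
    mean = mean_a + delta * count_b / count
    M2 = M2_a + M2_b + delta ** 2 * count_a * count_b / count
    return [M2, mean, count]

def finalize(a: List[float], ddof: int = 1) -> float:
    # std = sqrt(M2 / (count - ddof)); degenerate groups yield 0.0.
    M2, mean, count = a
    if count < 2:
        return 0.0
    return math.sqrt(M2 / (count - ddof))

# Two blocks processed independently, then merged:
left, right = [0.0, 0.0, 0], [0.0, 0.0, 0]
for v in (1.0, 2.0):
    left = accumulate(left, v)
for v in (3.0, 4.0):
    right = accumulate(right, v)
print(finalize(merge(left, right)))  # 1.2909..., same as statistics.stdev([1, 2, 3, 4])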

View file

@@ -932,6 +932,14 @@ class Dataset(Generic[T]):
>>> ray.data.range(100).std()
>>> ray.data.range_arrow(100).std("value")
NOTE: This uses Welford's online method for an accumulator-style
computation of the standard deviation. This method was chosen for its
numerical stability and because it can be computed in a single pass.
This may give different (but more accurate) results than NumPy, Pandas,
and sklearn, which use a less numerically stable two-pass algorithm.
See
https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Welford's_online_algorithm
Args:
on: The data on which to compute the standard deviation.
It can be the column name for an Arrow dataset.
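
To make the stability note concrete, here is a toy comparison (illustrative only, not from this PR). With a large common offset in the data, a naive single-pass sum-of-squares formula loses the variance to catastrophic cancellation, while Welford's method stays accurate because it only ever squares small deltas; the two-pass algorithm in NumPy and Pandas sits between these extremes.

import math

def welford_std(xs, ddof=1):
    # Single-pass Welford update, as in the Std aggregation above.
    M2 = mean = 0.0
    for i, x in enumerate(xs, 1):
        delta = x - mean
        mean += delta / i
        M2 += delta * (x - mean)
    return math.sqrt(M2 / (len(xs) - ddof))

# True std (ddof=1) of [4, 7, 13, 16] is sqrt(30) ~= 5.477; adding a
# constant offset of 1e9 should not change it.
xs = [1e9 + d for d in (4.0, 7.0, 13.0, 16.0)]

n, mean = len(xs), sum(xs) / len(xs)
naive_var = (sum(x * x for x in xs) - n * mean * mean) / (n - 1)
print(naive_var)        # nowhere near the true variance of 30.0
print(welford_std(xs))  # ~= 5.477225575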

View file

@@ -243,6 +243,14 @@ class GroupedDataset(Generic[T]):
... {"A": x % 3, "B": x} for x in range(100)]).groupby(
... "A").std("B")
NOTE: This uses Welford's online method for an accumulator-style
computation of the standard deviation. This method was chosen for its
numerical stability and because it can be computed in a single pass.
This may give different (but more accurate) results than NumPy, Pandas,
and sklearn, which use a less numerically stable two-pass algorithm.
See
https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Welford's_online_algorithm
Args:
on: The data on which to compute the standard deviation.
It can be the column name for an Arrow dataset.
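
As a quick worked check of the ddof (delta degrees of freedom) parameter: the finalizer divides M2 by (count - ddof) before taking the square root, the standard convention and the one Pandas uses in the tests' reference results.

import pandas as pd

s = pd.Series([2.0, 4.0])  # mean = 3.0, so M2 = (-1)**2 + 1**2 = 2.0

print(s.std())        # sample std (ddof=1):     sqrt(2 / 1) ~= 1.4142
print(s.std(ddof=0))  # population std (ddof=0): sqrt(2 / 2) = 1.0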

View file

@@ -1,3 +1,4 @@
import math
import os
import random
import requests
@@ -5,7 +6,6 @@ import shutil
import time
from unittest.mock import patch
import math
import numpy as np
import pandas as pd
import pyarrow as pa
@@ -2886,6 +2886,9 @@ def test_groupby_arrow(ray_start_regular_shared):
def test_groupby_arrow_count(ray_start_regular_shared):
# Test built-in count aggregation
seed = int(time.time())
print(f"Seeding RNG for test_groupby_arrow_count with: {seed}")
random.seed(seed)
xs = list(range(100))
random.shuffle(xs)
agg_ds = ray.data.from_items([{
@@ -2900,6 +2903,9 @@ def test_groupby_arrow_count(ray_start_regular_shared):
def test_groupby_arrow_sum(ray_start_regular_shared):
# Test built-in sum aggregation
seed = int(time.time())
print(f"Seeding RNG for test_groupby_arrow_sum with: {seed}")
random.seed(seed)
xs = list(range(100))
random.shuffle(xs)
agg_ds = ray.data.from_items([{
@@ -2918,6 +2924,9 @@ def test_groupby_arrow_sum(ray_start_regular_shared):
def test_groupby_arrow_min(ray_start_regular_shared):
# Test built-in min aggregation
seed = int(time.time())
print(f"Seeding RNG for test_groupby_arrow_min with: {seed}")
random.seed(seed)
xs = list(range(100))
random.shuffle(xs)
agg_ds = ray.data.from_items([{
@@ -2936,6 +2945,9 @@ def test_groupby_arrow_min(ray_start_regular_shared):
def test_groupby_arrow_max(ray_start_regular_shared):
# Test built-in max aggregation
seed = int(time.time())
print(f"Seeding RNG for test_groupby_arrow_max with: {seed}")
random.seed(seed)
xs = list(range(100))
random.shuffle(xs)
agg_ds = ray.data.from_items([{
@@ -2954,6 +2966,9 @@ def test_groupby_arrow_max(ray_start_regular_shared):
def test_groupby_arrow_mean(ray_start_regular_shared):
# Test built-in mean aggregation
seed = int(time.time())
print(f"Seeding RNG for test_groupby_arrow_mean with: {seed}")
random.seed(seed)
xs = list(range(100))
random.shuffle(xs)
agg_ds = ray.data.from_items([{
@@ -2973,16 +2988,29 @@ def test_groupby_arrow_mean(ray_start_regular_shared):
def test_groupby_arrow_std(ray_start_regular_shared):
# Test built-in std aggregation
seed = int(time.time())
print(f"Seeding RNG for test_groupby_arrow_std with: {seed}")
random.seed(seed)
xs = list(range(100))
random.shuffle(xs)
df = pd.DataFrame({"A": [x % 3 for x in xs], "B": xs})
agg_ds = ray.data.from_pandas(df).groupby("A").std("B")
assert agg_ds.count() == 3
expected = df.groupby("A")["B"].std()
assert agg_ds.to_pandas()["std(B)"].equals(expected)
result = agg_ds.to_pandas()["std(B)"].to_numpy()
expected = df.groupby("A")["B"].std().to_numpy()
np.testing.assert_array_equal(result, expected)
# ddof of 0
agg_ds = ray.data.from_pandas(df).groupby("A").std("B", ddof=0)
assert agg_ds.count() == 3
result = agg_ds.to_pandas()["std(B)"].to_numpy()
expected = df.groupby("A")["B"].std(ddof=0).to_numpy()
np.testing.assert_array_equal(result, expected)
# Test built-in global std aggregation
df = pd.DataFrame({"A": xs})
ray.data.from_pandas(df).std("A") == df["A"].std()
assert math.isclose(ray.data.from_pandas(df).std("A"), df["A"].std())
# ddof of 0
assert math.isclose(
ray.data.from_pandas(df).std("A", ddof=0), df["A"].std(ddof=0))
with pytest.raises(ValueError):
ray.data.from_pandas(pd.DataFrame({"A": []})).std("A")
# Test edge cases
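
Two things changed in the global-std check above: the old line was a bare comparison whose boolean result was discarded (so it asserted nothing), and the new assertion uses math.isclose rather than ==, since Welford's single-pass result can differ from Pandas' in the last few floating-point bits. A tiny illustration of why approximate equality is the right tool here:

import math

a = 0.1 + 0.2  # 0.30000000000000004 after one extra rounding step
b = 0.3
assert a != b              # exact equality is too strict for floats
assert math.isclose(a, b)  # relative tolerance absorbs the last-bit gap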
@@ -2990,6 +3018,9 @@ def test_groupby_arrow_std(ray_start_regular_shared):
def test_groupby_simple(ray_start_regular_shared):
seed = int(time.time())
print(f"Seeding RNG for test_groupby_simple with: {seed}")
random.seed(seed)
parallelism = 3
xs = [("A", 2), ("A", 4), ("A", 9), ("B", 10), ("B", 20), ("C", 3),
("C", 5), ("C", 8), ("C", 12)]
@@ -3035,14 +3066,25 @@ def test_groupby_simple(ray_start_regular_shared):
lambda r: r).count()
assert agg_ds.count() == 0
def test_groupby_simple_count(ray_start_regular_shared):
# Test built-in count aggregation
seed = int(time.time())
print(f"Seeding RNG for test_groupby_simple_count with: {seed}")
random.seed(seed)
xs = list(range(100))
random.shuffle(xs)
agg_ds = ray.data.from_items(xs).groupby(lambda x: x % 3).count()
assert agg_ds.count() == 3
assert agg_ds.sort(key=lambda r: r[0]).take(3) == [(0, 34), (1, 33), (2, 33)]
def test_groupby_simple_sum(ray_start_regular_shared):
# Test built-in sum aggregation
seed = int(time.time())
print(f"Seeding RNG for test_groupby_simple_sum with: {seed}")
random.seed(seed)
xs = list(range(100))
random.shuffle(xs)
agg_ds = ray.data.from_items(xs).groupby(lambda x: x % 3).sum()
@@ -3053,7 +3095,12 @@ def test_groupby_simple(ray_start_regular_shared):
assert ray.data.from_items(xs).sum() == 4950
assert ray.data.range(10).filter(lambda r: r > 10).sum() == 0
def test_groupby_simple_min(ray_start_regular_shared):
# Test built-in min aggregation
seed = int(time.time())
print(f"Seeding RNG for test_groupby_simple_min with: {seed}")
random.seed(seed)
xs = list(range(100))
random.shuffle(xs)
agg_ds = ray.data.from_items(xs).groupby(lambda x: x % 3).min()
@@ -3064,7 +3111,12 @@ def test_groupby_simple(ray_start_regular_shared):
with pytest.raises(ValueError):
ray.data.range(10).filter(lambda r: r > 10).min()
def test_groupby_simple_max(ray_start_regular_shared):
# Test built-in max aggregation
seed = int(time.time())
print(f"Seeding RNG for test_groupby_simple_max with: {seed}")
random.seed(seed)
xs = list(range(100))
random.shuffle(xs)
agg_ds = ray.data.from_items(xs).groupby(lambda x: x % 3).max()
@@ -3076,7 +3128,12 @@ def test_groupby_simple(ray_start_regular_shared):
with pytest.raises(ValueError):
ray.data.range(10).filter(lambda r: r > 10).max()
def test_groupby_simple_mean(ray_start_regular_shared):
# Test built-in mean aggregation
seed = int(time.time())
print(f"Seeding RNG for test_groupby_simple_mean with: {seed}")
random.seed(seed)
xs = list(range(100))
random.shuffle(xs)
agg_ds = ray.data.from_items(xs).groupby(lambda x: x % 3).mean()
@@ -3089,6 +3146,43 @@ def test_groupby_simple(ray_start_regular_shared):
ray.data.range(10).filter(lambda r: r > 10).mean()
def test_groupby_simple_std(ray_start_regular_shared):
# Test built-in std aggregation
seed = int(time.time())
print(f"Seeding RNG for test_groupby_simple_std with: {seed}")
random.seed(seed)
xs = list(range(100))
random.shuffle(xs)
agg_ds = ray.data.from_items(xs).groupby(lambda x: x % 3).std()
assert agg_ds.count() == 3
df = pd.DataFrame({"A": [x % 3 for x in xs], "B": xs})
expected = df.groupby("A")["B"].std()
result = agg_ds.sort(key=lambda r: r[0]).take(3)
groups, stds = zip(*result)
result_df = pd.DataFrame({"A": list(groups), "B": list(stds)})
result_df = result_df.set_index("A")
pd.testing.assert_series_equal(result_df["B"], expected)
# ddof of 0
agg_ds = ray.data.from_items(xs).groupby(lambda x: x % 3).std(ddof=0)
assert agg_ds.count() == 3
df = pd.DataFrame({"A": [x % 3 for x in xs], "B": xs})
expected = df.groupby("A")["B"].std(ddof=0)
result = agg_ds.sort(key=lambda r: r[0]).take(3)
groups, stds = zip(*result)
result_df = pd.DataFrame({"A": list(groups), "B": list(stds)})
result_df = result_df.set_index("A")
pd.testing.assert_series_equal(result_df["B"], expected)
# Test built-in global std aggregation
assert math.isclose(ray.data.from_items(xs).std(), pd.Series(xs).std())
# ddof of 0
assert math.isclose(
ray.data.from_items(xs).std(ddof=0), pd.Series(xs).std(ddof=0))
with pytest.raises(ValueError):
ray.data.from_items([]).std()
# Test edge cases
assert ray.data.from_items([3]).std() == 0
def test_sort_simple(ray_start_regular_shared):
num_items = 100
parallelism = 4