From a0841106ffdac57980479c90b1e5910d166b797e Mon Sep 17 00:00:00 2001
From: Clark Zinzow
Date: Wed, 3 Nov 2021 13:56:34 -0700
Subject: [PATCH] [Datasets] Follow-up to groupby standard deviation PR (#20035)

---
 python/ray/data/aggregate.py          |  21 ++++--
 python/ray/data/dataset.py            |   8 ++
 python/ray/data/grouped_dataset.py    |   8 ++
 python/ray/data/tests/test_dataset.py | 102 +++++++++++++++++++++++++-
 4 files changed, 129 insertions(+), 10 deletions(-)

diff --git a/python/ray/data/aggregate.py b/python/ray/data/aggregate.py
index d19772f8a..275098469 100644
--- a/python/ray/data/aggregate.py
+++ b/python/ray/data/aggregate.py
@@ -106,12 +106,16 @@ class Mean(AggregateFn):
 
 
 class Std(AggregateFn):
-    """Defines standard deviation aggregation."""
+    """Defines standard deviation aggregation.
 
-    # Uses Welford's online method for an accumulator-style computation of the
-    # standard deviation. This method was chosen due to it's numerical
-    # stability, and it being computable in a single pass. See
-    # https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Welford's_online_algorithm
+    Uses Welford's online method for an accumulator-style computation of the
+    standard deviation. This method was chosen for its numerical
+    stability and its single-pass computability.
+    This may give different (but more accurate) results than NumPy, Pandas,
+    and sklearn, which use a less numerically stable two-pass algorithm.
+    See
+    https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Welford's_online_algorithm
+    """
 
     def __init__(self, on: AggregateOnT = None, ddof: int = 1):
         on_fn = _to_on_fn(on)
@@ -120,7 +124,8 @@ class Std(AggregateFn):
             # Accumulates the current count, the current mean, and the sum of
             # squared differences from the current mean (M2).
             M2, mean, count = a
-            # Select the data on which we want to calculate the mean.
+            # Select the data on which we want to calculate the standard
+            # deviation.
             val = on_fn(r)
 
             count += 1
@@ -132,6 +137,8 @@ class Std(AggregateFn):
 
         def merge(a: List[float], b: List[float]):
             # Merges two accumulations into one.
+            # See
+            # https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Parallel_algorithm
            M2_a, mean_a, count_a = a
             M2_b, mean_b, count_b = b
             delta = mean_b - mean_a
@@ -146,6 +153,8 @@ class Std(AggregateFn):
             return [M2, mean, count]
 
         def finalize(a: List[float]):
+            # Compute the final standard deviation from the accumulated
+            # sum of squared differences from the current mean and the count.
             M2, mean, count = a
             if count < 2:
                 return 0.0
diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py
index 1cef3ce2f..d624311ea 100644
--- a/python/ray/data/dataset.py
+++ b/python/ray/data/dataset.py
@@ -932,6 +932,14 @@ class Dataset(Generic[T]):
             >>> ray.data.range(100).std()
             >>> ray.data.range_arrow(100).std("value")
 
+        NOTE: This uses Welford's online method for an accumulator-style
+        computation of the standard deviation. This method was chosen for
+        its numerical stability and its single-pass computability.
+        This may give different (but more accurate) results than NumPy, Pandas,
+        and sklearn, which use a less numerically stable two-pass algorithm.
+        See
+        https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Welford's_online_algorithm
+
         Args:
             on: The data on which to compute the standard deviation.
                 It can be the column name for Arrow dataset.
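For reviewers, here is a minimal, self-contained sketch of the accumulate/merge/finalize structure that Std implements, outside the patch itself. The Welford update and the Chan et al. merge rule mirror the hunks above; the division by (count - ddof) in the finalize step is an assumption inferred from the ddof semantics asserted in the tests (the diff truncates before showing it). The names welford_update, welford_merge, and welford_finalize are illustrative only and are not part of Ray's API.

import math
import statistics
from typing import List


def welford_update(a: List[float], val: float) -> List[float]:
    # Fold one value into the (M2, mean, count) accumulation in a single pass.
    M2, mean, count = a
    count += 1
    delta = val - mean
    mean += delta / count
    M2 += delta * (val - mean)
    return [M2, mean, count]


def welford_merge(a: List[float], b: List[float]) -> List[float]:
    # Combine two partial accumulations (Chan et al.'s parallel rule); the
    # same shape of computation as the merge() closure in the diff above.
    M2_a, mean_a, count_a = a
    M2_b, mean_b, count_b = b
    count = count_a + count_b
    delta = mean_b - mean_a
    mean = mean_a + delta * count_b / count
    M2 = M2_a + M2_b + delta**2 * count_a * count_b / count
    return [M2, mean, count]


def welford_finalize(a: List[float], ddof: int = 1) -> float:
    # Standard deviation from the accumulated M2 and count; dividing M2 by
    # (count - ddof) is an assumption matching the ddof behavior in the tests.
    M2, mean, count = a
    if count < 2:
        return 0.0
    return math.sqrt(M2 / (count - ddof))


# Sanity check: accumulate two shards independently, merge them, and compare
# against a two-pass reference (statistics.stdev).
xs = [2.0, 4.0, 4.0, 4.0, 5.0, 5.0, 7.0, 9.0]
left = [0.0, 0.0, 0]
right = [0.0, 0.0, 0]
for x in xs[:4]:
    left = welford_update(left, x)
for x in xs[4:]:
    right = welford_update(right, x)
assert math.isclose(
    welford_finalize(welford_merge(left, right)), statistics.stdev(xs))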
diff --git a/python/ray/data/grouped_dataset.py b/python/ray/data/grouped_dataset.py
index ec670ad2b..d04140c71 100644
--- a/python/ray/data/grouped_dataset.py
+++ b/python/ray/data/grouped_dataset.py
@@ -243,6 +243,14 @@ class GroupedDataset(Generic[T]):
             ...     {"A": x % 3, "B": x} for x in range(100)]).groupby(
             ...     "A").std("B")
 
+        NOTE: This uses Welford's online method for an accumulator-style
+        computation of the standard deviation. This method was chosen for
+        its numerical stability and its single-pass computability.
+        This may give different (but more accurate) results than NumPy, Pandas,
+        and sklearn, which use a less numerically stable two-pass algorithm.
+        See
+        https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Welford's_online_algorithm
+
         Args:
             on: The data on which to compute the standard deviation.
                 It can be the column name for Arrow dataset.
diff --git a/python/ray/data/tests/test_dataset.py b/python/ray/data/tests/test_dataset.py
index 0386dc8b9..51a99e103 100644
--- a/python/ray/data/tests/test_dataset.py
+++ b/python/ray/data/tests/test_dataset.py
@@ -1,3 +1,4 @@
+import math
 import os
 import random
 import requests
@@ -5,7 +6,6 @@ import shutil
 import time
 from unittest.mock import patch
 
-import math
 import numpy as np
 import pandas as pd
 import pyarrow as pa
@@ -2886,6 +2886,9 @@ def test_groupby_arrow(ray_start_regular_shared):
 
 def test_groupby_arrow_count(ray_start_regular_shared):
     # Test built-in count aggregation
+    seed = int(time.time())
+    print(f"Seeding RNG for test_groupby_arrow_count with: {seed}")
+    random.seed(seed)
     xs = list(range(100))
     random.shuffle(xs)
     agg_ds = ray.data.from_items([{
@@ -2900,6 +2903,9 @@
 
 def test_groupby_arrow_sum(ray_start_regular_shared):
     # Test built-in sum aggregation
+    seed = int(time.time())
+    print(f"Seeding RNG for test_groupby_arrow_sum with: {seed}")
+    random.seed(seed)
     xs = list(range(100))
     random.shuffle(xs)
     agg_ds = ray.data.from_items([{
@@ -2918,6 +2924,9 @@
 
 def test_groupby_arrow_min(ray_start_regular_shared):
     # Test built-in min aggregation
+    seed = int(time.time())
+    print(f"Seeding RNG for test_groupby_arrow_min with: {seed}")
+    random.seed(seed)
     xs = list(range(100))
     random.shuffle(xs)
     agg_ds = ray.data.from_items([{
@@ -2936,6 +2945,9 @@
 
 def test_groupby_arrow_max(ray_start_regular_shared):
     # Test built-in max aggregation
+    seed = int(time.time())
+    print(f"Seeding RNG for test_groupby_arrow_max with: {seed}")
+    random.seed(seed)
     xs = list(range(100))
     random.shuffle(xs)
     agg_ds = ray.data.from_items([{
@@ -2954,6 +2966,9 @@
 
 def test_groupby_arrow_mean(ray_start_regular_shared):
     # Test built-in mean aggregation
+    seed = int(time.time())
+    print(f"Seeding RNG for test_groupby_arrow_mean with: {seed}")
+    random.seed(seed)
     xs = list(range(100))
     random.shuffle(xs)
     agg_ds = ray.data.from_items([{
@@ -2973,16 +2988,29 @@
 
 def test_groupby_arrow_std(ray_start_regular_shared):
     # Test built-in std aggregation
+    seed = int(time.time())
+    print(f"Seeding RNG for test_groupby_arrow_std with: {seed}")
+    random.seed(seed)
     xs = list(range(100))
     random.shuffle(xs)
     df = pd.DataFrame({"A": [x % 3 for x in xs], "B": xs})
     agg_ds = ray.data.from_pandas(df).groupby("A").std("B")
     assert agg_ds.count() == 3
-    expected = df.groupby("A")["B"].std()
-    assert agg_ds.to_pandas()["std(B)"].equals(expected)
+    result = agg_ds.to_pandas()["std(B)"].to_numpy()
+    expected = df.groupby("A")["B"].std().to_numpy()
+    np.testing.assert_array_equal(result, expected)
+    # ddof of 0
+    agg_ds = ray.data.from_pandas(df).groupby("A").std("B", ddof=0)
+    assert agg_ds.count() == 3
+    result = agg_ds.to_pandas()["std(B)"].to_numpy()
+    expected = df.groupby("A")["B"].std(ddof=0).to_numpy()
+    np.testing.assert_array_equal(result, expected)
     # Test built-in global std aggregation
     df = pd.DataFrame({"A": xs})
-    ray.data.from_pandas(df).std("A") == df["A"].std()
+    assert math.isclose(ray.data.from_pandas(df).std("A"), df["A"].std())
+    # ddof of 0
+    assert math.isclose(
+        ray.data.from_pandas(df).std("A", ddof=0), df["A"].std(ddof=0))
     with pytest.raises(ValueError):
         ray.data.from_pandas(pd.DataFrame({"A": []})).std("A")
     # Test edge cases
@@ -2990,6 +3018,9 @@
 
 
 def test_groupby_simple(ray_start_regular_shared):
+    seed = int(time.time())
+    print(f"Seeding RNG for test_groupby_simple with: {seed}")
+    random.seed(seed)
     parallelism = 3
     xs = [("A", 2), ("A", 4), ("A", 9), ("B", 10), ("B", 20), ("C", 3),
           ("C", 5), ("C", 8), ("C", 12)]
@@ -3035,14 +3066,25 @@ def test_groupby_simple(ray_start_regular_shared):
         lambda r: r).count()
     assert agg_ds.count() == 0
 
+
+def test_groupby_simple_count(ray_start_regular_shared):
     # Test built-in count aggregation
+    seed = int(time.time())
+    print(f"Seeding RNG for test_groupby_simple_count with: {seed}")
+    random.seed(seed)
     xs = list(range(100))
     random.shuffle(xs)
     agg_ds = ray.data.from_items(xs).groupby(lambda x: x % 3).count()
     assert agg_ds.count() == 3
     assert agg_ds.sort(key=lambda r: r[0]).take(3) == [(0, 34), (1, 33),
                                                        (2, 33)]
 
+
+def test_groupby_simple_sum(ray_start_regular_shared):
     # Test built-in sum aggregation
+    seed = int(time.time())
+    print(f"Seeding RNG for test_groupby_simple_sum with: {seed}")
+    random.seed(seed)
     xs = list(range(100))
     random.shuffle(xs)
     agg_ds = ray.data.from_items(xs).groupby(lambda x: x % 3).sum()
@@ -3053,7 +3095,12 @@ def test_groupby_simple(ray_start_regular_shared):
     assert ray.data.from_items(xs).sum() == 4950
     assert ray.data.range(10).filter(lambda r: r > 10).sum() == 0
 
+
+def test_groupby_simple_min(ray_start_regular_shared):
     # Test built-in min aggregation
+    seed = int(time.time())
+    print(f"Seeding RNG for test_groupby_simple_min with: {seed}")
+    random.seed(seed)
     xs = list(range(100))
     random.shuffle(xs)
     agg_ds = ray.data.from_items(xs).groupby(lambda x: x % 3).min()
@@ -3064,7 +3111,12 @@ def test_groupby_simple(ray_start_regular_shared):
     with pytest.raises(ValueError):
         ray.data.range(10).filter(lambda r: r > 10).min()
 
+
+def test_groupby_simple_max(ray_start_regular_shared):
     # Test built-in max aggregation
+    seed = int(time.time())
+    print(f"Seeding RNG for test_groupby_simple_max with: {seed}")
+    random.seed(seed)
     xs = list(range(100))
     random.shuffle(xs)
     agg_ds = ray.data.from_items(xs).groupby(lambda x: x % 3).max()
@@ -3076,7 +3128,12 @@ def test_groupby_simple(ray_start_regular_shared):
     with pytest.raises(ValueError):
         ray.data.range(10).filter(lambda r: r > 10).max()
 
+
+def test_groupby_simple_mean(ray_start_regular_shared):
     # Test built-in mean aggregation
+    seed = int(time.time())
+    print(f"Seeding RNG for test_groupby_simple_mean with: {seed}")
+    random.seed(seed)
     xs = list(range(100))
     random.shuffle(xs)
     agg_ds = ray.data.from_items(xs).groupby(lambda x: x % 3).mean()
@@ -3089,6 +3146,43 @@ def test_groupby_simple(ray_start_regular_shared):
         ray.data.range(10).filter(lambda r: r > 10).mean()
 
 
+def test_groupby_simple_std(ray_start_regular_shared):
+    # Test built-in std aggregation
+    seed = int(time.time())
+    print(f"Seeding RNG for test_groupby_simple_std with: {seed}")
+    random.seed(seed)
+    xs = list(range(100))
+    random.shuffle(xs)
+    agg_ds = ray.data.from_items(xs).groupby(lambda x: x % 3).std()
+    assert agg_ds.count() == 3
+    df = pd.DataFrame({"A": [x % 3 for x in xs], "B": xs})
+    expected = df.groupby("A")["B"].std()
+    result = agg_ds.sort(key=lambda r: r[0]).take(3)
+    groups, stds = zip(*result)
+    result_df = pd.DataFrame({"A": list(groups), "B": list(stds)})
+    result_df = result_df.set_index("A")
+    pd.testing.assert_series_equal(result_df["B"], expected)
+    # ddof of 0
+    agg_ds = ray.data.from_items(xs).groupby(lambda x: x % 3).std(ddof=0)
+    assert agg_ds.count() == 3
+    df = pd.DataFrame({"A": [x % 3 for x in xs], "B": xs})
+    expected = df.groupby("A")["B"].std(ddof=0)
+    result = agg_ds.sort(key=lambda r: r[0]).take(3)
+    groups, stds = zip(*result)
+    result_df = pd.DataFrame({"A": list(groups), "B": list(stds)})
+    result_df = result_df.set_index("A")
+    pd.testing.assert_series_equal(result_df["B"], expected)
+    # Test built-in global std aggregation
+    assert math.isclose(ray.data.from_items(xs).std(), pd.Series(xs).std())
+    # ddof of 0
+    assert math.isclose(
+        ray.data.from_items(xs).std(ddof=0), pd.Series(xs).std(ddof=0))
+    with pytest.raises(ValueError):
+        ray.data.from_items([]).std()
+    # Test edge cases
+    assert ray.data.from_items([3]).std() == 0
+
+
 def test_sort_simple(ray_start_regular_shared):
     num_items = 100
     parallelism = 4
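The tests above exercise the new ddof ("delta degrees of freedom") parameter. As a quick reference for the semantics they assert, here is a standalone sanity check (not part of the patch): ddof=1, the default, gives the sample standard deviation (divide by n - 1), while ddof=0 gives the population standard deviation (divide by n), matching pandas.

import math

import pandas as pd

xs = list(range(100))
n = len(xs)
mean = sum(xs) / n
# M2: the sum of squared differences from the mean, as accumulated by Std.
M2 = sum((x - mean) ** 2 for x in xs)

# ddof=1 (default): sample standard deviation, dividing by n - 1.
assert math.isclose(math.sqrt(M2 / (n - 1)), pd.Series(xs).std())
# ddof=0: population standard deviation, dividing by n.
assert math.isclose(math.sqrt(M2 / n), pd.Series(xs).std(ddof=0))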