Jian Xiao 2878119ece
Optimize groupby/mapgroups performance (#27805)
For the following script, the groupby().map_groups() previously took 75-90 minutes to finish; with this PR it finishes in less than 10 seconds.

The slowness came from the `get_boundaries` routine, which looped linearly over every row of the pandas DataFrame (note: the script below produces just one block, which had several million rows). We make it 1) operate on NumPy arrays, 2) use binary search, and 3) use NumPy's native binary-search implementation (see the sketch after the script below).

```
import time

import numpy as np
import pandas as pd
from sklearn.linear_model import LinearRegression
from sklearn.model_selection import train_test_split

import ray

def transform_batch(df: pd.DataFrame):
    # Parse timestamps, derive trip duration, and fill missing location IDs.
    df['pickup_at'] = pd.to_datetime(df['pickup_at'], format='%Y-%m-%d %H:%M:%S')
    df['dropoff_at'] = pd.to_datetime(df['dropoff_at'], format='%Y-%m-%d %H:%M:%S')
    df['trip_duration'] = (df['dropoff_at'] - df['pickup_at']).dt.seconds
    df['pickup_location_id'] = df['pickup_location_id'].fillna(-1)
    df['dropoff_location_id'] = df['dropoff_location_id'].fillna(-1)
    return df

def train_test(rows):
    # If the group is too small, it cannot be split for train/test;
    # return an empty frame with the expected schema so results concatenate.
    if len(rows.index) < 4:
        print(f"Dataframe for LocID: {rows.index} is too small to split")
        return pd.DataFrame(columns=["coef", "intercept", "error"])
    train, test = train_test_split(rows)
    train_X = train[["dropoff_location_id"]]
    train_y = train[["trip_duration"]]
    test_X = test[["dropoff_location_id"]]
    test_y = test[["trip_duration"]]
    reg = LinearRegression().fit(train_X, train_y)
    reg.score(train_X, train_y)
    pred_y = reg.predict(test_X)
    reg.score(test_X, test_y)
    error = np.mean(pred_y - test_y.to_numpy())
    # Format output as a dataframe (the same format as the input).
    data = [[reg.coef_, reg.intercept_, error]]
    return pd.DataFrame(data, columns=["coef", "intercept", "error"])

start = time.time()
rds = ray.data.read_parquet(
    "s3://ursa-labs-taxi-data/2019/01/",
    columns=['pickup_at', 'dropoff_at', 'pickup_location_id', 'dropoff_location_id'],
)
rds = rds.map_batches(transform_batch, batch_format="pandas")
grouped_ds = rds.groupby("pickup_location_id")
results = grouped_ds.map_groups(train_test)
taken = time.time() - start
```
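A minimal sketch of the idea (the names here are hypothetical, not the PR's actual code; the real `get_boundaries` lives in Ray Datasets' groupby internals): assuming the block is already sorted by the groupby key, the per-row Python loop can be replaced by a vectorized binary search over the key column's NumPy array:

```
import numpy as np
import pandas as pd

def get_boundaries_sketch(sorted_df: pd.DataFrame, key: str) -> np.ndarray:
    # 1) Operate on the raw NumPy array instead of iterating DataFrame rows.
    keys = sorted_df[key].to_numpy()
    # 2) + 3) Find each group's end offset with NumPy's native binary search
    # (np.searchsorted) over the sorted key column.
    return np.searchsorted(keys, np.unique(keys), side="right")
```

Each returned offset marks where one group ends and the next begins, so the block can be sliced per group in O(G log N) native code for G groups over N rows, instead of an O(N) Python-level row scan.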
2022-08-17 11:08:18 -07:00