Spread the actors in the data ingest benchmark, which 2x'd the throughput (#27620)

The consuming actors were not spread across nodes; this PR fixes that, improving throughput by 2x.
Jian Xiao authored on 2022-08-11 11:47:54 -07:00, committed by GitHub
commit 5a18b1fc45 (parent 5520a96ce0)


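The one-line fix works because of how Ray accounts for actor resources: by default an actor needs 1 CPU to be scheduled but holds 0 CPUs while running, so the scheduler is free to pack every consumer onto a single node. Reserving `num_cpus=0.5` for each actor's lifetime fills up a node's CPU budget after a few placements and pushes the remaining consumers onto other nodes. Below is a minimal sketch of how one might check the resulting placement; the actor count and the `get_node_id()` call (a Ray 2.x API) are illustrative assumptions, not part of this PR.

```python
from collections import Counter

import ray

ray.init()

# Each actor reserves 0.5 CPU for its lifetime, so consumers can no
# longer all pack onto whichever node happens to host the first one.
@ray.remote(num_cpus=0.5)
class ConsumingActor:
    def where(self):
        # Report which node this actor was placed on (Ray 2.x API).
        return ray.get_runtime_context().get_node_id()

actors = [ConsumingActor.remote() for _ in range(8)]
print(Counter(ray.get([a.where.remote() for a in actors])))
# On a multi-node cluster, expect the actors to land on more than one node.
```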
@@ -13,7 +13,7 @@ import pandas as pd
 GiB = 1024 * 1024 * 1024
 
 
-@ray.remote
+@ray.remote(num_cpus=0.5)
 class ConsumingActor:
     def __init__(self, rank):
         self._rank = rank
@@ -97,27 +97,19 @@ def run_ingest_bulk(dataset, num_workers):
     future = [consumers[i].consume.remote(s) for i, s in enumerate(splits)]
     ray.get(future)
-    # Example ballpark number for transformation (6s):
-    # Read->Map_Batches: 201/201 [00:06<00:00, 28.90it/s]
+    # Example ballpark number for transformation (5s):
+    # Read->Map_Batches: 201/201 Time to read all data 5.001230175999922 seconds
     # Example ballpark number for consumption i.e. at an actor (consumer):
-    # Fast ones:
-    # Time to read all data 6.060172239998792 seconds
-    # P50/P95/Max batch delay (s) 0.011000780499671237 0.013028981001298234 0.11437869699875591 # noqa: E501
+    # Time to read all data 5.275932452000006 seconds
+    # P50/P95/Max batch delay (s) 0.010558151499992618 0.010944704699983276 0.04179979600007755 # noqa: E501
     # Num epochs read 2
     # Num batches read 512
     # Num bytes read 20480.0 MiB
-    # Mean throughput 3379.44 MiB/s
-    # Slow ones:
-    # Time to read all data 39.7250169550025 seconds
-    # P50/P95/Max batch delay (s) 0.010788186998979654 0.027017505450021396 2.936176807997981 # noqa: E501
-    # Num epochs read 2
-    # Num batches read 512
-    # Num bytes read 20480.0 MiB
-    # Mean throughput 515.54 MiB/s
-    # Example ballpark number of total time:
-    # success! total time 62.37753415107727
+    # Mean throughput 3881.78 MiB/s
+    # Example total time:
+    # success! total time 13.813468217849731
 def run_ingest_streaming(dataset, num_workers):
@@ -134,26 +126,26 @@ def run_ingest_streaming(dataset, num_workers):
     future = [consumers[i].consume.remote(s) for i, s in enumerate(splits)]
     ray.get(future)
-    # Example ballpark number for a window:
-    # == Pipeline Window 12 ==
-    # Stage 1 read->map_batches: 40/40 blocks executed in 4.1s
-    # * Remote wall time: 1.42s min, 2.63s max, 1.57s mean, 62.7s total
-    # * Remote cpu time: 1.42s min, 2.59s max, 1.56s mean, 62.38s total
-    # * Peak heap memory usage (MiB): 3252116000.0 min, 12829140000.0 max, 10597707000 mean # noqa: E501
+    # Example ballpark numbers:
+    # == Pipeline Window 10 ==
+    # Stage 1 read->map_batches: 40/40 blocks executed in 1.98s
+    # * Remote wall time: 1.38s min, 1.66s max, 1.46s mean, 58.26s total
+    # * Remote cpu time: 1.38s min, 1.7s max, 1.46s mean, 58.33s total
+    # * Peak heap memory usage (MiB): 6533908000.0 min, 10731508000.0 max, 9710443300 mean # noqa: E501
     # * Output num rows: 104857 min, 104857 max, 104857 mean, 4194280 total
     # * Output size bytes: 1074155212 min, 1074155212 max, 1074155212 mean, 42966208480 total # noqa: E501
-    # * Tasks per node: 1 min, 3 max, 2 mean; 20 nodes used
-    # Example ballpark number for an actor (consumer):
-    # Time to read all data 42.57252279000022 seconds
-    # P50/P95/Max batch delay (s) 0.01082486700033769 0.012740581999969434 4.104724623000948 # noqa: E501
+    # * Tasks per node: 2 min, 2 max, 2 mean; 20 nodes used
+    # Example actor (consumer):
+    # Time to read all data 25.58030511100003 seconds
+    # P50/P95/Max batch delay (s) 0.010486626999977489 0.012674414999997904 2.0688196870000866 # noqa: E501
     # Num epochs read 2
     # Num batches read 512
     # Num bytes read 20480.0 MiB
-    # Mean throughput 481.06 MiB/s
-    # Example ballpark number of total time:
-    # success! total time 61.76846528053284
+    # Mean throughput 800.62 MiB/s
+    # Example total time:
+    # success! total time 27.822711944580078
 if __name__ == "__main__":
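The throughput lines in the logs above are just bytes over time: 20480 MiB in ~5.28 s is ~3882 MiB/s on the bulk path, and 20480 MiB in ~25.58 s is ~801 MiB/s on the streaming path. For reference, an alternative way to spread consumers is Ray's explicit "SPREAD" scheduling strategy, which requests best-effort placement across nodes without reserving CPU. A hedged sketch, assuming Ray 2.x; this PR does not use it:

```python
import ray

# Assumption: Ray 2.x accepts scheduling_strategy="SPREAD", which asks
# the scheduler to distribute actors across nodes on a best-effort
# basis, independent of CPU accounting.
@ray.remote(scheduling_strategy="SPREAD")
class ConsumingActor:
    def consume(self, split):
        # Placeholder loop; the real benchmark iterates batches from
        # its Dataset split here and records throughput.
        for _ in split.iter_batches():
            pass
```

Unlike the `num_cpus=0.5` reservation, SPREAD does not cap how many consumers can share a node; it only biases placement.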