[core][tests] Add nightly test for datasets random_shuffle and sort (#23784)

Adds a large-scale nightly test for Datasets random_shuffle and sort. The test script generates blocks of random int64 rows through a custom datasource, runs either a shuffle or a sort over the resulting dataset, and reports total run time and peak driver memory.
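For reference, a minimal local sketch of the flow the nightly test exercises is shown below: read random int64 rows through the datasource added in this commit, shuffle them, and print dataset stats. The small parameter values and the dataset.sort import path are illustrative assumptions, not the nightly configuration.

# Minimal local sketch (illustrative parameters, not the nightly setup).
# Assumes the script below is importable as dataset.sort; adjust as needed.
import ray
from dataset.sort import RandomIntRowDatasource

ray.init()
ds = ray.data.read_datasource(
    RandomIntRowDatasource(),
    parallelism=10,  # the nightly test uses 1000 partitions
    n=100_000,  # total rows across all blocks
    num_columns=1,
)
ds = ds.random_shuffle()  # or ds.sort(key="c_0")
print(ds.stats())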
Stephanie Wang 2022-04-08 11:31:10 -07:00 committed by GitHub
parent c82f6c62c8
commit ba484feac0
2 changed files with 220 additions and 0 deletions

@@ -0,0 +1,168 @@
import json
import os
import resource
import time
from typing import List

import numpy as np
import psutil

import ray
from ray.data.block import Block, BlockMetadata
from ray.data.datasource import Datasource, ReadTask
from ray.data.impl.arrow_block import ArrowRow
from ray.data.impl.util import _check_pyarrow_version
from ray.internal.internal_api import memory_summary


class RandomIntRowDatasource(Datasource[ArrowRow]):
"""An example datasource that generates rows with random int64 columns.
Examples:
>>> source = RandomIntRowDatasource()
>>> ray.data.read_datasource(source, n=10, num_columns=2).take()
... {'c_0': 1717767200176864416, 'c_1': 999657309586757214}
... {'c_0': 4983608804013926748, 'c_1': 1160140066899844087}
"""
def prepare_read(
self, parallelism: int, n: int, num_columns: int
) -> List[ReadTask]:
_check_pyarrow_version()
import pyarrow
        read_tasks: List[ReadTask] = []
        # Each read task produces one block of at most block_size rows.
        block_size = max(1, n // parallelism)

        def make_block(count: int, num_columns: int) -> Block:
return pyarrow.Table.from_arrays(
np.random.randint(
np.iinfo(np.int64).max, size=(num_columns, count), dtype=np.int64
),
names=[f"c_{i}" for i in range(num_columns)],
)
schema = pyarrow.Table.from_pydict(
{f"c_{i}": [0] for i in range(num_columns)}
).schema
i = 0
while i < n:
count = min(block_size, n - i)
meta = BlockMetadata(
num_rows=count,
size_bytes=8 * count * num_columns,
schema=schema,
input_files=None,
exec_stats=None,
)
read_tasks.append(
ReadTask(
lambda count=count, num_columns=num_columns: [
make_block(count, num_columns)
],
meta,
)
)
i += block_size
return read_tasks


if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser()
parser.add_argument(
"--num-partitions", help="number of partitions", default="50", type=str
)
parser.add_argument(
"--partition-size",
help="partition size (bytes)",
default="200e6",
type=str,
)
parser.add_argument(
"--shuffle", help="shuffle instead of sort", action="store_true"
)
args = parser.parse_args()
num_partitions = int(args.num_partitions)
partition_size = int(float(args.partition_size))
print(f"Dataset size: {num_partitions} partitions, {partition_size / 1e9}GB partition size, {num_partitions * partition_size / 1e9}GB total")
start_time = time.time()
source = RandomIntRowDatasource()
    # Each row is a single int64 column, i.e. 8 bytes per row.
    num_rows_per_partition = partition_size // 8
    ds = ray.data.read_datasource(
        source,
        parallelism=num_partitions,
        n=num_rows_per_partition * num_partitions,
        num_columns=1,
    )
    exc = None
    try:
        if args.shuffle:
            ds = ds.random_shuffle()
        else:
            ds = ds.sort(key="c_0")
    except Exception as e:
        # Record the failure but keep going so that timing and memory
        # stats are still reported; the exception is re-raised at the end.
        exc = e
end_time = time.time()
duration = end_time - start_time
print("Finished in", duration)
print("")
print("==== Driver memory summary ====")
    # ru_maxrss is reported in kilobytes on Linux; convert to bytes.
    maxrss = int(resource.getrusage(resource.RUSAGE_SELF).ru_maxrss * 1e3)
    print(f"max: {maxrss / 1e9} GB")
    process = psutil.Process(os.getpid())
    rss = int(process.memory_info().rss)
    print(f"rss: {rss / 1e9} GB")
print(memory_summary(stats_only=True))
print("")
print(ds.stats())
if "TEST_OUTPUT_JSON" in os.environ:
out_file = open(os.environ["TEST_OUTPUT_JSON"], "w")
results = {
"time": duration,
"success": "1" if exc is None else "0",
"num_partitions": num_partitions,
"partition_size": partition_size,
"perf_metrics": [
{
"perf_metric_name": "peak_driver_memory",
"perf_metric_value": maxrss,
"perf_metric_type": "MEMORY",
},
{
"perf_metric_name": "runtime",
"perf_metric_value": duration,
"perf_metric_type": "LATENCY",
},
]
}
json.dump(results, out_file)
if exc:
raise exc

@@ -3411,6 +3411,58 @@
type: sdk_command
file_manager: sdk
- name: dataset_shuffle_random_shuffle_1tb
group: core-multi-test
working_dir: nightly_tests
legacy:
test_name: dataset_shuffle_random_shuffle_1tb
test_suite: dataset_test
stable: false
frequency: nightly
team: core
cluster:
cluster_env: shuffle/shuffle_app_config.yaml
cluster_compute: shuffle/shuffle_compute_large_scale.yaml
run:
timeout: 7200
script: python dataset/sort.py --num-partitions=1000 --partition-size=1e9 --shuffle
wait_for_nodes:
num_nodes: 20
timeout: 900
type: sdk_command
file_manager: sdk
- name: dataset_shuffle_sort_1tb
group: core-multi-test
working_dir: nightly_tests
legacy:
test_name: dataset_shuffle_sort_1tb
test_suite: dataset_test
stable: false
frequency: nightly
team: core
cluster:
cluster_env: shuffle/shuffle_app_config.yaml
cluster_compute: shuffle/shuffle_compute_large_scale.yaml
run:
timeout: 7200
script: python dataset/sort.py --num-partitions=1000 --partition-size=1e9
wait_for_nodes:
num_nodes: 20
timeout: 900
type: sdk_command
file_manager: sdk
################
# Core K8s tests
################