diff --git a/release/nightly_tests/dataset/sort.py b/release/nightly_tests/dataset/sort.py
new file mode 100644
index 000000000..e25e6fe03
--- /dev/null
+++ b/release/nightly_tests/dataset/sort.py
@@ -0,0 +1,168 @@
+import json
+import os
+import resource
+import time
+from typing import List
+
+import numpy as np
+import psutil
+
+import ray
+from ray.data.block import Block, BlockMetadata
+from ray.data.datasource import Datasource, ReadTask
+from ray.data.impl.arrow_block import ArrowRow
+from ray.data.impl.util import _check_pyarrow_version
+from ray.internal.internal_api import memory_summary
+
+
+class RandomIntRowDatasource(Datasource[ArrowRow]):
+    """An example datasource that generates rows with random int64 columns.
+
+    Examples:
+        >>> source = RandomIntRowDatasource()
+        >>> ray.data.read_datasource(source, n=10, num_columns=2).take()
+        ... {'c_0': 1717767200176864416, 'c_1': 999657309586757214}
+        ... {'c_0': 4983608804013926748, 'c_1': 1160140066899844087}
+    """
+
+    def prepare_read(
+        self, parallelism: int, n: int, num_columns: int
+    ) -> List[ReadTask]:
+        _check_pyarrow_version()
+        import pyarrow
+
+        read_tasks: List[ReadTask] = []
+        block_size = max(1, n // parallelism)
+
+        def make_block(count: int, num_columns: int) -> Block:
+            return pyarrow.Table.from_arrays(
+                np.random.randint(
+                    np.iinfo(np.int64).max, size=(num_columns, count), dtype=np.int64
+                ),
+                names=[f"c_{i}" for i in range(num_columns)],
+            )
+
+        schema = pyarrow.Table.from_pydict(
+            {f"c_{i}": [0] for i in range(num_columns)}
+        ).schema
+
+        i = 0
+        while i < n:
+            count = min(block_size, n - i)
+            meta = BlockMetadata(
+                num_rows=count,
+                size_bytes=8 * count * num_columns,
+                schema=schema,
+                input_files=None,
+                exec_stats=None,
+            )
+            # Bind count/num_columns via default arguments so each ReadTask
+            # captures the values for its own block rather than the final
+            # loop values.
+            read_tasks.append(
+                ReadTask(
+                    lambda count=count, num_columns=num_columns: [
+                        make_block(count, num_columns)
+                    ],
+                    meta,
+                )
+            )
+            i += block_size
+
+        return read_tasks
+
+
+if __name__ == "__main__":
+    import argparse
+
+    parser = argparse.ArgumentParser()
+    parser.add_argument(
+        "--num-partitions", help="number of partitions", default="50", type=str
+    )
+    parser.add_argument(
+        "--partition-size",
+        help="partition size (bytes)",
+        default="200e6",
+        type=str,
+    )
+    parser.add_argument(
+        "--shuffle", help="shuffle instead of sort", action="store_true"
+    )
+
+    args = parser.parse_args()
+
+    num_partitions = int(args.num_partitions)
+    partition_size = int(float(args.partition_size))
+    print(
+        f"Dataset size: {num_partitions} partitions, "
+        f"{partition_size / 1e9}GB partition size, "
+        f"{num_partitions * partition_size / 1e9}GB total"
+    )
+    start_time = time.time()
+    source = RandomIntRowDatasource()
+    # Each row is a single int64 column, i.e. 8 bytes per row.
+    num_rows_per_partition = partition_size // 8
+    ds = ray.data.read_datasource(
+        source,
+        parallelism=num_partitions,
+        n=num_rows_per_partition * num_partitions,
+        num_columns=1,
+    )
+    exc = None
+    try:
+        if args.shuffle:
+            ds = ds.random_shuffle()
+        else:
+            ds = ds.sort(key="c_0")
+    except Exception as e:
+        exc = e
+
+    end_time = time.time()
+
+    duration = end_time - start_time
+    print("Finished in", duration)
+    print("")
+
+    print("==== Driver memory summary ====")
+    # ru_maxrss is reported in kilobytes on Linux; scale to (roughly) bytes.
+    maxrss = int(resource.getrusage(resource.RUSAGE_SELF).ru_maxrss * 1e3)
+    print(f"max: {maxrss / 1e9} GB")
+    process = psutil.Process(os.getpid())
+    rss = int(process.memory_info().rss)
+    print(f"rss: {rss / 1e9} GB")
+
+    print(memory_summary(stats_only=True))
+    print("")
+
+    print(ds.stats())
+
+    if "TEST_OUTPUT_JSON" in os.environ:
+        results = {
+            "time": duration,
+            "success": "1" if exc is None else "0",
+            "num_partitions": num_partitions,
+            "partition_size": partition_size,
+            "perf_metrics": [
+                {
+                    "perf_metric_name": "peak_driver_memory",
+                    "perf_metric_value": maxrss,
+                    "perf_metric_type": "MEMORY",
+                },
+                {
+                    "perf_metric_name": "runtime",
+                    "perf_metric_value": duration,
+                    "perf_metric_type": "LATENCY",
+                },
+            ],
+        }
+        with open(os.environ["TEST_OUTPUT_JSON"], "w") as out_file:
+            json.dump(results, out_file)
+
+    if exc:
+        raise exc
diff --git a/release/release_tests.yaml b/release/release_tests.yaml
index 4ebd27da2..ab6b463fd 100644
--- a/release/release_tests.yaml
+++ b/release/release_tests.yaml
@@ -3411,6 +3411,58 @@
   type: sdk_command
   file_manager: sdk

+- name: dataset_shuffle_random_shuffle_1tb
+  group: core-multi-test
+  working_dir: nightly_tests
+  legacy:
+    test_name: dataset_shuffle_random_shuffle_1tb
+    test_suite: dataset_test
+
+  stable: false
+
+  frequency: nightly
+  team: core
+  cluster:
+    cluster_env: shuffle/shuffle_app_config.yaml
+    cluster_compute: shuffle/shuffle_compute_large_scale.yaml
+
+  run:
+    timeout: 7200
+    script: python dataset/sort.py --num-partitions=1000 --partition-size=1e9 --shuffle
+
+    wait_for_nodes:
+      num_nodes: 20
+      timeout: 900
+
+  type: sdk_command
+  file_manager: sdk
+
+- name: dataset_shuffle_sort_1tb
+  group: core-multi-test
+  working_dir: nightly_tests
+  legacy:
+    test_name: dataset_shuffle_sort_1tb
+    test_suite: dataset_test
+
+  stable: false
+
+  frequency: nightly
+  team: core
+  cluster:
+    cluster_env: shuffle/shuffle_app_config.yaml
+    cluster_compute: shuffle/shuffle_compute_large_scale.yaml
+
+  run:
+    timeout: 7200
+    script: python dataset/sort.py --num-partitions=1000 --partition-size=1e9
+
+    wait_for_nodes:
+      num_nodes: 20
+      timeout: 900
+
+  type: sdk_command
+  file_manager: sdk
+
 ################
 # Core K8s tests
 ################
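The release tests above drive the script on a 20-node cluster, but the same datasource and sort path can be smoke-tested locally first. The sketch below is illustrative only: it assumes sort.py is importable as a module named `sort` (e.g. run from release/nightly_tests/dataset/), that a local ray.init() is sufficient, and it uses sizes far smaller than the 1 TB release configuration.

    # Hypothetical local smoke test; not part of this change.
    import ray
    from sort import RandomIntRowDatasource

    ray.init()
    ds = ray.data.read_datasource(
        RandomIntRowDatasource(),
        parallelism=10,
        n=10 * 100_000,  # 10 blocks x 100k rows of int64 -> ~8 MB total
        num_columns=1,
    )
    print(ds.sort(key="c_0").take(5))

This exercises the same RandomIntRowDatasource read and Dataset.sort code path that dataset/sort.py runs at scale, without the cluster.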