diff --git a/release/nightly_tests/dataset/dataset_test.yaml b/release/nightly_tests/dataset/dataset_test.yaml index 26865a6ce..b9dbc889a 100644 --- a/release/nightly_tests/dataset/dataset_test.yaml +++ b/release/nightly_tests/dataset/dataset_test.yaml @@ -86,11 +86,11 @@ cluster: app_config: ray_sgd_training_app.yaml - compute_template: ray_sgd_training_compute.yaml + compute_template: ray_sgd_training_compute_no_gpu.yaml run: - timeout: 14400 - prepare: python wait_cluster.py 66 2400 + timeout: 7200 + prepare: python wait_cluster.py 21 2400 script: python ray_sgd_training.py --address auto --use-s3 --num-workers 16 --use-gpu --large-dataset --debug stable: false diff --git a/release/nightly_tests/dataset/ray_sgd_training.py b/release/nightly_tests/dataset/ray_sgd_training.py index 87db26044..412d959e6 100644 --- a/release/nightly_tests/dataset/ray_sgd_training.py +++ b/release/nightly_tests/dataset/ray_sgd_training.py @@ -3,7 +3,7 @@ import collections import json import os import sys -import time +import timeit from typing import Tuple import boto3 @@ -50,6 +50,7 @@ class MLflowCallback(TrainingCallback): def read_dataset(path: str) -> ray.data.Dataset: print(f"reading data from {path}") return ray.data.read_parquet(path, _spread_resource_prefix="node:") \ + .repartition(400) \ .random_shuffle(_spread_resource_prefix="node:") @@ -101,13 +102,12 @@ class DataPreprocessor: print("\nStep 2: Precalculating fruit-grouped mean for new column and " "for one-hot encoding (latter only uses fruit groups)\n") - agg_ds = ds.groupby("fruit").mean("feature_1") fruit_means = { r["fruit"]: r["mean(feature_1)"] - for r in agg_ds.take_all() + for r in ds.groupby("fruit").mean("feature_1").take_all() } - print("\nStep 3: create mean_by_fruit as mean of feature_1 groupby " + print("\nStep 3: Create mean_by_fruit as mean of feature_1 groupby " "fruit; one-hot encode fruit column\n") if inferencing: @@ -407,7 +407,8 @@ class TrainingWorker: training_dataset.to_torch( batch_size=self.batch_size, label_column="label")): if i % 10000 == 0: - print(f"epoch: {epoch}, worker: {self.rank}, batch: {i}") + print(f"epoch: {epoch}, worker: {self.rank}," + f" processing batch: {i}") if __name__ == "__main__": @@ -437,7 +438,7 @@ if __name__ == "__main__": "--large-dataset", action="store_true", default=False, - help="Use 500GB dataset") + help="Use 100GB dataset") parser.add_argument( "--use-gpu", action="store_true", @@ -465,7 +466,7 @@ if __name__ == "__main__": if large_dataset: assert use_s3, "--large-dataset requires --use-s3 to be set." - start_time = time.time() + e2e_start_time = timeit.default_timer() ray.init(address=address) @@ -487,15 +488,18 @@ if __name__ == "__main__": if use_s3: # Check if s3 data is populated. BUCKET_NAME = "cuj-big-data" - FOLDER_NAME = "big-data/" if large_dataset else "data/" + FOLDER_NAME = "100GB/" if large_dataset else "data/" s3_resource = boto3.resource("s3") bucket = s3_resource.Bucket(BUCKET_NAME) count = bucket.objects.filter(Prefix=FOLDER_NAME) if len(list(count)) == 0: print("please run `python make_and_upload_dataset.py` first") sys.exit(1) + # cuj-big-data/big-data stats # 156 files, 3_120_000_000 rows and 501_748_803_387 bytes - data_path = ("s3://cuj-big-data/big-data/" + # cuj-big-data/100GB stats + # 33 files, 660_000_000 rows and 106_139_169_947 bytes + data_path = ("s3://cuj-big-data/100GB/" if large_dataset else "s3://cuj-big-data/data/") inference_path = "s3://cuj-big-data/inference/" inference_output_path = "s3://cuj-big-data/output/" @@ -521,6 +525,9 @@ if __name__ == "__main__": train_dataset, test_dataset = preprocessor.preprocess_train_data( read_dataset(data_path)) + preprocessing_end_time = timeit.default_timer() + print("Preprocessing time (s): ", preprocessing_end_time - e2e_start_time) + num_columns = len(train_dataset.schema().names) # remove label column and internal Arrow column. num_features = num_columns - 2 @@ -533,22 +540,21 @@ if __name__ == "__main__": DROPOUT_PROB = 0.2 if args.debug: + num_gpus = 1 if use_gpu else 0 shards = train_dataset.repeat(NUM_EPOCHS) \ .random_shuffle_each_window(_spread_resource_prefix="node:") \ .split(num_workers) del train_dataset - num_gpus = 1 if use_gpu else 0 training_workers = [ TrainingWorker.options(num_gpus=num_gpus, num_cpus=0).remote( rank, shard, BATCH_SIZE) for rank, shard in enumerate(shards) ] ray.get([worker.train.remote() for worker in training_workers]) - total_time = time.time() - start_time + e2e_end_time = timeit.default_timer() + total_time = e2e_end_time - e2e_start_time print(f"Job finished in {total_time} seconds.") - with open(os.environ["TEST_OUTPUT_JSON"], "w") as f: - f.write(json.dumps({"time": total_time, "success": 1})) exit() # Random global shuffle @@ -596,6 +602,9 @@ if __name__ == "__main__": model = results[0] trainer.shutdown() + training_end_time = timeit.default_timer() + print("Training time (s): ", training_end_time - preprocessing_end_time) + if args.mlflow_register_model: mlflow.pytorch.log_model( model, artifact_path="models", registered_model_name="torch_model") @@ -648,9 +657,11 @@ if __name__ == "__main__": inference(inference_dataset, BatchInferModel(load_model_func), 100, inference_output_path, use_gpu) - end_time = time.time() + e2e_end_time = timeit.default_timer() + print("Inference time (s): ", e2e_end_time - training_end_time) - total_time = end_time - start_time + total_time = e2e_end_time - e2e_start_time + print("Total time (s): ", e2e_end_time - e2e_start_time) print(f"Job finished in {total_time} seconds.") with open(os.environ["TEST_OUTPUT_JSON"], "w") as f: diff --git a/release/nightly_tests/dataset/ray_sgd_training_compute_no_gpu.yaml b/release/nightly_tests/dataset/ray_sgd_training_compute_no_gpu.yaml new file mode 100644 index 000000000..6a31548f7 --- /dev/null +++ b/release/nightly_tests/dataset/ray_sgd_training_compute_no_gpu.yaml @@ -0,0 +1,26 @@ +cloud_id: cld_17WvYIBBkdgLwEUNcLeRAE +region: us-west-2 +allowed_azs: + - us-west-2a + +max_workers: 20 + +aws: + IamInstanceProfile: {"Name": "ray-autoscaler-v1"} + BlockDeviceMappings: + - DeviceName: /dev/sda1 + Ebs: + VolumeSize: 500 + +head_node_type: + name: head_node + instance_type: i3.8xlarge + +worker_node_types: + - name: memory_node + instance_type: i3.8xlarge + min_workers: 20 + max_workers: 20 + use_spot: false + resources: + gpu: 1