[nightly-test] update cuj2 to reflect latest change #20889

We fixed the groupby issue in cuj2; this syncs the change into the nightly test. The test doesn't need to use a GPU at all, and it returns soon after data ingestion finishes.
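For reference, the fix builds the per-fruit means directly from a groupby().mean().take_all() chain, as the diff below shows. A minimal, self-contained sketch of that pattern (toy in-memory data via ray.data.from_pandas; the real test reads parquet from S3):

import pandas as pd
import ray

ray.init()

# Toy stand-in for the parquet data the nightly test reads from S3.
df = pd.DataFrame({
    "fruit": ["apple", "apple", "banana"],
    "feature_1": [1.0, 3.0, 2.0],
})
ds = ray.data.from_pandas(df)

# Per-fruit mean of feature_1; the aggregate column follows the
# "mean(feature_1)" naming the test relies on.
fruit_means = {
    r["fruit"]: r["mean(feature_1)"]
    for r in ds.groupby("fruit").mean("feature_1").take_all()
}
print(fruit_means)  # {"apple": 2.0, "banana": 2.0}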
Chen Shen 2021-12-06 09:59:21 -08:00 committed by GitHub
parent b8c64480d8
commit a628182cf5
3 changed files with 55 additions and 18 deletions

View file

@@ -86,11 +86,11 @@
cluster:
app_config: ray_sgd_training_app.yaml
compute_template: ray_sgd_training_compute.yaml
compute_template: ray_sgd_training_compute_no_gpu.yaml
run:
timeout: 14400
prepare: python wait_cluster.py 66 2400
timeout: 7200
prepare: python wait_cluster.py 21 2400
script: python ray_sgd_training.py --address auto --use-s3 --num-workers 16 --use-gpu --large-dataset --debug
stable: false
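The run config above now waits for 21 nodes (the head node plus the 20 workers defined in the new no-GPU compute template) for up to 2400 seconds before starting. The actual wait_cluster.py lives in the release test directory and is not part of this diff; a hypothetical sketch of what such a prepare step does:

import sys
import time

import ray


def wait_for_nodes(expected: int, timeout_s: float) -> None:
    # Block until `expected` Ray nodes are alive, or exit non-zero on timeout.
    ray.init(address="auto")
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        alive = sum(1 for node in ray.nodes() if node["Alive"])
        if alive >= expected:
            print(f"cluster ready: {alive}/{expected} nodes alive")
            return
        print(f"waiting for nodes: {alive}/{expected} alive")
        time.sleep(10)
    sys.exit(f"timed out waiting for {expected} nodes")


if __name__ == "__main__":
    # Mirrors `python wait_cluster.py 21 2400` from the run config above.
    wait_for_nodes(int(sys.argv[1]), float(sys.argv[2]))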

View file

@@ -3,7 +3,7 @@ import collections
import json
import os
import sys
import time
import timeit
from typing import Tuple
import boto3
@@ -50,6 +50,7 @@ class MLflowCallback(TrainingCallback):
def read_dataset(path: str) -> ray.data.Dataset:
print(f"reading data from {path}")
return ray.data.read_parquet(path, _spread_resource_prefix="node:") \
.repartition(400) \
.random_shuffle(_spread_resource_prefix="node:")
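The ingestion change above adds a repartition into 400 blocks before the global shuffle. A minimal local sketch of the same read-repartition-shuffle chain, with ray.data.range standing in for the S3 parquet read and a much smaller block count:

import ray

ray.init()

# Stand-in for read_parquet(...): repartition first, then do a global shuffle.
ds = ray.data.range(100_000) \
    .repartition(8) \
    .random_shuffle()
print(ds.count())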
@@ -101,13 +102,12 @@ class DataPreprocessor:
print("\nStep 2: Precalculating fruit-grouped mean for new column and "
"for one-hot encoding (latter only uses fruit groups)\n")
agg_ds = ds.groupby("fruit").mean("feature_1")
fruit_means = {
r["fruit"]: r["mean(feature_1)"]
for r in agg_ds.take_all()
for r in ds.groupby("fruit").mean("feature_1").take_all()
}
print("\nStep 3: create mean_by_fruit as mean of feature_1 groupby "
print("\nStep 3: Create mean_by_fruit as mean of feature_1 groupby "
"fruit; one-hot encode fruit column\n")
if inferencing:
@@ -407,7 +407,8 @@ class TrainingWorker:
training_dataset.to_torch(
batch_size=self.batch_size, label_column="label")):
if i % 10000 == 0:
print(f"epoch: {epoch}, worker: {self.rank}, batch: {i}")
print(f"epoch: {epoch}, worker: {self.rank},"
f" processing batch: {i}")
if __name__ == "__main__":
@@ -437,7 +438,7 @@ if __name__ == "__main__":
"--large-dataset",
action="store_true",
default=False,
help="Use 500GB dataset")
help="Use 100GB dataset")
parser.add_argument(
"--use-gpu",
action="store_true",
@@ -465,7 +466,7 @@ if __name__ == "__main__":
if large_dataset:
assert use_s3, "--large-dataset requires --use-s3 to be set."
start_time = time.time()
e2e_start_time = timeit.default_timer()
ray.init(address=address)
@@ -487,15 +488,18 @@ if __name__ == "__main__":
if use_s3:
# Check if s3 data is populated.
BUCKET_NAME = "cuj-big-data"
FOLDER_NAME = "big-data/" if large_dataset else "data/"
FOLDER_NAME = "100GB/" if large_dataset else "data/"
s3_resource = boto3.resource("s3")
bucket = s3_resource.Bucket(BUCKET_NAME)
count = bucket.objects.filter(Prefix=FOLDER_NAME)
if len(list(count)) == 0:
print("please run `python make_and_upload_dataset.py` first")
sys.exit(1)
# cuj-big-data/big-data stats
# 156 files, 3_120_000_000 rows and 501_748_803_387 bytes
data_path = ("s3://cuj-big-data/big-data/"
# cuj-big-data/100GB stats
# 33 files, 660_000_000 rows and 106_139_169_947 bytes
data_path = ("s3://cuj-big-data/100GB/"
if large_dataset else "s3://cuj-big-data/data/")
inference_path = "s3://cuj-big-data/inference/"
inference_output_path = "s3://cuj-big-data/output/"
@@ -521,6 +525,9 @@ if __name__ == "__main__":
train_dataset, test_dataset = preprocessor.preprocess_train_data(
read_dataset(data_path))
preprocessing_end_time = timeit.default_timer()
print("Preprocessing time (s): ", preprocessing_end_time - e2e_start_time)
num_columns = len(train_dataset.schema().names)
# remove label column and internal Arrow column.
num_features = num_columns - 2
@@ -533,22 +540,21 @@ if __name__ == "__main__":
DROPOUT_PROB = 0.2
if args.debug:
num_gpus = 1 if use_gpu else 0
shards = train_dataset.repeat(NUM_EPOCHS) \
.random_shuffle_each_window(_spread_resource_prefix="node:") \
.split(num_workers)
del train_dataset
num_gpus = 1 if use_gpu else 0
training_workers = [
TrainingWorker.options(num_gpus=num_gpus, num_cpus=0).remote(
rank, shard, BATCH_SIZE) for rank, shard in enumerate(shards)
]
ray.get([worker.train.remote() for worker in training_workers])
total_time = time.time() - start_time
e2e_end_time = timeit.default_timer()
total_time = e2e_end_time - e2e_start_time
print(f"Job finished in {total_time} seconds.")
with open(os.environ["TEST_OUTPUT_JSON"], "w") as f:
f.write(json.dumps({"time": total_time, "success": 1}))
exit()
# Random global shuffle
@@ -596,6 +602,9 @@ if __name__ == "__main__":
model = results[0]
trainer.shutdown()
training_end_time = timeit.default_timer()
print("Training time (s): ", training_end_time - preprocessing_end_time)
if args.mlflow_register_model:
mlflow.pytorch.log_model(
model, artifact_path="models", registered_model_name="torch_model")
@@ -648,9 +657,11 @@ if __name__ == "__main__":
inference(inference_dataset, BatchInferModel(load_model_func), 100,
inference_output_path, use_gpu)
end_time = time.time()
e2e_end_time = timeit.default_timer()
print("Inference time (s): ", e2e_end_time - training_end_time)
total_time = end_time - start_time
total_time = e2e_end_time - e2e_start_time
print("Total time (s): ", e2e_end_time - e2e_start_time)
print(f"Job finished in {total_time} seconds.")
with open(os.environ["TEST_OUTPUT_JSON"], "w") as f:
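The script changes above also replace the time.time() bookkeeping with timeit.default_timer() and report per-stage durations before writing the result JSON. A condensed sketch of that timing pattern, with the stage bodies stubbed out:

import timeit

e2e_start_time = timeit.default_timer()
# ... data ingestion and preprocessing run here ...
preprocessing_end_time = timeit.default_timer()
print("Preprocessing time (s): ", preprocessing_end_time - e2e_start_time)

# ... training runs here ...
training_end_time = timeit.default_timer()
print("Training time (s): ", training_end_time - preprocessing_end_time)

# ... inference runs here ...
e2e_end_time = timeit.default_timer()
print("Inference time (s): ", e2e_end_time - training_end_time)
print("Total time (s): ", e2e_end_time - e2e_start_time)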

View file

@@ -0,0 +1,26 @@
cloud_id: cld_17WvYIBBkdgLwEUNcLeRAE
region: us-west-2
allowed_azs:
- us-west-2a
max_workers: 20
aws:
IamInstanceProfile: {"Name": "ray-autoscaler-v1"}
BlockDeviceMappings:
- DeviceName: /dev/sda1
Ebs:
VolumeSize: 500
head_node_type:
name: head_node
instance_type: i3.8xlarge
worker_node_types:
- name: memory_node
instance_type: i3.8xlarge
min_workers: 20
max_workers: 20
use_spot: false
resources:
gpu: 1
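The new compute template provisions a head node and 20 i3.8xlarge workers, which have no physical GPUs. Assuming the resources entry above (gpu: 1) overrides each worker's advertised GPU count (an assumption about how this compute-template field is interpreted), actors that request a GPU, like the TrainingWorker in the script, can still be scheduled on these CPU-only nodes. A hedged sketch of that placement, with ProbeWorker as a hypothetical stand-in:

import ray

ray.init(address="auto")


# Assumption: the `gpu: 1` override above makes each CPU-only worker advertise
# one GPU to Ray's scheduler, so a num_gpus=1 actor can be placed there even
# though no CUDA device exists (fine for a test that stops after ingestion).
@ray.remote(num_gpus=1, num_cpus=0)
class ProbeWorker:
    def where(self) -> str:
        return ray.util.get_node_ip_address()


worker = ProbeWorker.remote()
print(ray.get(worker.where.remote()))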