From e1e4a45b8df8c898963faab3ce40f591d1ac1271 Mon Sep 17 00:00:00 2001 From: matthewdeng Date: Mon, 1 Nov 2021 18:25:19 -0700 Subject: [PATCH] [train] add simple Ray Train release tests (#19817) * [train] add simple Ray Train release tests * simplify tests * update * driver requirements * move to test * remove connect * fix * fix * fix torch * gpu * add assert * remove assert * use gloo backend * fix * finish Co-authored-by: Amog Kamsetty --- .../examples/tensorflow_mnist_example.py | 12 ++++-- .../train/examples/train_linear_example.py | 40 ++++++++++++++----- release/.buildkite/build_pipeline.py | 13 +++++- release/train_tests/app_config.yaml | 13 ++++++ release/train_tests/compute_tpl.yaml | 15 +++++++ release/train_tests/driver_requirements.txt | 8 ++++ release/train_tests/train_tests.yaml | 17 ++++++++ .../workloads/train_tensorflow_mnist_test.py | 31 ++++++++++++++ .../workloads/train_torch_linear_test.py | 30 ++++++++++++++ 9 files changed, 165 insertions(+), 14 deletions(-) create mode 100644 release/train_tests/app_config.yaml create mode 100644 release/train_tests/compute_tpl.yaml create mode 100644 release/train_tests/driver_requirements.txt create mode 100644 release/train_tests/train_tests.yaml create mode 100644 release/train_tests/workloads/train_tensorflow_mnist_test.py create mode 100644 release/train_tests/workloads/train_torch_linear_test.py diff --git a/python/ray/train/examples/tensorflow_mnist_example.py b/python/ray/train/examples/tensorflow_mnist_example.py index 5f71842bf..0880f3347 100644 --- a/python/ray/train/examples/tensorflow_mnist_example.py +++ b/python/ray/train/examples/tensorflow_mnist_example.py @@ -72,7 +72,7 @@ def train_func(config): return results -def train_tensorflow_mnist(num_workers=2, use_gpu=False): +def train_tensorflow_mnist(num_workers=2, use_gpu=False, epochs=4): trainer = Trainer( backend="tensorflow", num_workers=num_workers, use_gpu=use_gpu) trainer.start() @@ -81,7 +81,7 @@ def 
train_tensorflow_mnist(num_workers=2, use_gpu=False): config={ "lr": 1e-3, "batch_size": 64, - "epochs": 4 + "epochs": epochs }) trainer.shutdown() print(f"Results: {results[0]}") @@ -105,6 +105,8 @@ if __name__ == "__main__": action="store_true", default=False, help="Enables GPU training") + parser.add_argument( + "--epochs", type=int, default=3, help="Number of epochs to train for.") parser.add_argument( "--smoke-test", action="store_true", @@ -117,6 +119,10 @@ if __name__ == "__main__": if args.smoke_test: ray.init(num_cpus=2) + train_tensorflow_mnist() else: ray.init(address=args.address) - train_tensorflow_mnist(num_workers=args.num_workers, use_gpu=args.use_gpu) + train_tensorflow_mnist( + num_workers=args.num_workers, + use_gpu=args.use_gpu, + epochs=args.epochs) diff --git a/python/ray/train/examples/train_linear_example.py b/python/ray/train/examples/train_linear_example.py index 50bfbd0fe..2512f022a 100644 --- a/python/ray/train/examples/train_linear_example.py +++ b/python/ray/train/examples/train_linear_example.py @@ -25,8 +25,9 @@ class LinearDataset(torch.utils.data.Dataset): return len(self.x) -def train_epoch(dataloader, model, loss_fn, optimizer): +def train_epoch(dataloader, model, loss_fn, optimizer, device): for X, y in dataloader: + X, y = X.to(device), y.to(device) # Compute prediction error pred = model(X) loss = loss_fn(pred, y) @@ -37,12 +38,13 @@ def train_epoch(dataloader, model, loss_fn, optimizer): optimizer.step() -def validate_epoch(dataloader, model, loss_fn): +def validate_epoch(dataloader, model, loss_fn, device): num_batches = len(dataloader) model.eval() loss = 0 with torch.no_grad(): for X, y in dataloader: + X, y = X.to(device), y.to(device) pred = model(X) loss += loss_fn(pred, y).item() loss /= num_batches @@ -58,6 +60,9 @@ def train_func(config): lr = config.get("lr", 1e-2) epochs = config.get("epochs", 3) + device = torch.device(f"cuda:{train.local_rank()}" + if torch.cuda.is_available() else "cpu") + train_dataset = 
LinearDataset(2, 5, size=data_size) val_dataset = LinearDataset(2, 5, size=val_size) train_loader = torch.utils.data.DataLoader( @@ -70,7 +75,10 @@ def train_func(config): sampler=DistributedSampler(val_dataset)) model = nn.Linear(1, hidden_size) - model = DistributedDataParallel(model) + model.to(device) + model = DistributedDataParallel( + model, + device_ids=[device.index] if torch.cuda.is_available() else None) loss_fn = nn.MSELoss() @@ -79,17 +87,20 @@ def train_func(config): results = [] for _ in range(epochs): - train_epoch(train_loader, model, loss_fn, optimizer) - result = validate_epoch(validation_loader, model, loss_fn) + train_epoch(train_loader, model, loss_fn, optimizer, device) + result = validate_epoch(validation_loader, model, loss_fn, device) train.report(**result) results.append(result) return results -def train_linear(num_workers=2): - trainer = Trainer(TorchConfig(backend="gloo"), num_workers=num_workers) - config = {"lr": 1e-2, "hidden_size": 1, "batch_size": 4, "epochs": 3} +def train_linear(num_workers=2, use_gpu=False, epochs=3): + trainer = Trainer( + backend=TorchConfig(backend="gloo"), + num_workers=num_workers, + use_gpu=use_gpu) + config = {"lr": 1e-2, "hidden_size": 1, "batch_size": 4, "epochs": epochs} trainer.start() results = trainer.run( train_func, @@ -115,6 +126,12 @@ if __name__ == "__main__": type=int, default=2, help="Sets number of workers for training.") + parser.add_argument( + "--use-gpu", + action="store_true", + help="Whether to use GPU for training.") + parser.add_argument( + "--epochs", type=int, default=3, help="Number of epochs to train for.") parser.add_argument( "--smoke-test", action="store_true", @@ -127,7 +144,10 @@ if __name__ == "__main__": if args.smoke_test: ray.init(num_cpus=2) + train_linear() else: ray.init(address=args.address) - - train_linear(num_workers=args.num_workers) + train_linear( + num_workers=args.num_workers, + use_gpu=args.use_gpu, + epochs=args.epochs) diff --git 
a/release/.buildkite/build_pipeline.py b/release/.buildkite/build_pipeline.py index 5faa766d8..c2de1fe25 100644 --- a/release/.buildkite/build_pipeline.py +++ b/release/.buildkite/build_pipeline.py @@ -250,7 +250,18 @@ MANUAL_TESTS = { # 2. Use autoscaling/scale up (no wait_cluster.py) # 3. Use GPUs if applicable # 4. Have the `use_connect` flag set. -USER_TESTS = {} +USER_TESTS = { + "~/ray/release/train_tests/train_tests.yaml": [ + ConnectTest( + "train_tensorflow_mnist_test", + requirements_file="release/train_tests" + "/driver_requirements.txt"), + ConnectTest( + "train_torch_linear_test", + requirements_file="release/train_tests" + "/driver_requirements.txt") + ] +} SUITES = { "core-nightly": CORE_NIGHTLY_TESTS, diff --git a/release/train_tests/app_config.yaml b/release/train_tests/app_config.yaml new file mode 100644 index 000000000..446b53847 --- /dev/null +++ b/release/train_tests/app_config.yaml @@ -0,0 +1,13 @@ +base_image: "anyscale/ray-ml:pinned-nightly-py37-gpu" +env_vars: { } +debian_packages: + - curl + +python: + pip_packages: [ ] + conda_packages: [ ] + +post_build_cmds: + - pip3 uninstall -y ray || true + - pip3 install -U {{ env["RAY_WHEELS"] | default("ray") }} + - {{ env["RAY_WHEELS_SANITY_CHECK"] | default("echo No Ray wheels sanity check") }} diff --git a/release/train_tests/compute_tpl.yaml b/release/train_tests/compute_tpl.yaml new file mode 100644 index 000000000..221bb8f66 --- /dev/null +++ b/release/train_tests/compute_tpl.yaml @@ -0,0 +1,15 @@ +cloud_id: {{env["ANYSCALE_CLOUD_ID"]}} +region: us-west-2 + +max_workers: 2 + +head_node_type: + name: head_node + instance_type: g3.8xlarge + +worker_node_types: + - name: worker_node + instance_type: g3.8xlarge + min_workers: 0 + max_workers: 2 + use_spot: false diff --git a/release/train_tests/driver_requirements.txt b/release/train_tests/driver_requirements.txt new file mode 100644 index 000000000..b779be3a3 --- /dev/null +++ b/release/train_tests/driver_requirements.txt @@ -0,0 +1,8 @@ +# 
Make sure the driver versions are the same as cluster versions. +# The cluster uses ray-ml Docker image. +# ray-ml Docker image installs dependencies from ray/python/requirements/ml/ directory. +# We constrain on these requirements file so that the same versions are installed. +-c ../../python/requirements/ml/requirements_dl.txt + +torch +tensorflow \ No newline at end of file diff --git a/release/train_tests/train_tests.yaml b/release/train_tests/train_tests.yaml new file mode 100644 index 000000000..c19493f85 --- /dev/null +++ b/release/train_tests/train_tests.yaml @@ -0,0 +1,17 @@ +- name: train_tensorflow_mnist_test + cluster: + app_config: app_config.yaml + compute_template: compute_tpl.yaml + + run: + timeout: 36000 + script: python workloads/train_tensorflow_mnist_test.py + +- name: train_torch_linear_test + cluster: + app_config: app_config.yaml + compute_template: compute_tpl.yaml + + run: + timeout: 36000 + script: python workloads/train_torch_linear_test.py diff --git a/release/train_tests/workloads/train_tensorflow_mnist_test.py b/release/train_tests/workloads/train_tensorflow_mnist_test.py new file mode 100644 index 000000000..376979d93 --- /dev/null +++ b/release/train_tests/workloads/train_tensorflow_mnist_test.py @@ -0,0 +1,31 @@ +import json +import os +import time + +import ray +from ray.train.examples.tensorflow_mnist_example import train_tensorflow_mnist + +if __name__ == "__main__": + start = time.time() + + addr = os.environ.get("RAY_ADDRESS") + job_name = os.environ.get("RAY_JOB_NAME", "train_tensorflow_mnist_test") + + if addr is not None and addr.startswith("anyscale://"): + ray.init(address=addr, job_name=job_name) + else: + ray.init(address="auto") + + train_tensorflow_mnist(num_workers=6, use_gpu=True, epochs=20) + + taken = time.time() - start + result = { + "time_taken": taken, + } + test_output_json = os.environ.get("TEST_OUTPUT_JSON", + "/tmp/train_tensorflow_mnist_test.json") + + with open(test_output_json, "wt") as f: + json.dump(result, f) + + 
print("Test Successful!") diff --git a/release/train_tests/workloads/train_torch_linear_test.py b/release/train_tests/workloads/train_torch_linear_test.py new file mode 100644 index 000000000..fe013a8ef --- /dev/null +++ b/release/train_tests/workloads/train_torch_linear_test.py @@ -0,0 +1,30 @@ +import json +import os +import time + +import ray + +from ray.train.examples.train_linear_example import train_linear + +if __name__ == "__main__": + start = time.time() + + addr = os.environ.get("RAY_ADDRESS") + job_name = os.environ.get("RAY_JOB_NAME", "train_torch_linear_test") + + if addr is not None and addr.startswith("anyscale://"): + ray.init(address=addr, job_name=job_name) + else: + ray.init(address="auto") + + results = train_linear(num_workers=6, use_gpu=True, epochs=20) + + taken = time.time() - start + result = {"time_taken": taken} + test_output_json = os.environ.get("TEST_OUTPUT_JSON", + "/tmp/train_torch_linear_test.json") + + with open(test_output_json, "wt") as f: + json.dump(result, f) + + print("Test Successful!")