[train] add simple Ray Train release tests (#19817)

* [train] add simple Ray Train release tests

* simplify tests

* update

* driver requirements

* move to test

* remove connect

* fix

* fix

* fix torch

* gpu

* add assert

* remove assert

* use gloo backend

* fix

* finish

Co-authored-by: Amog Kamsetty <amogkamsetty@yahoo.com>
matthewdeng 2021-11-01 18:25:19 -07:00 committed by GitHub
parent 05c63f0208
commit e1e4a45b8d
9 changed files with 165 additions and 14 deletions

python/ray/train/examples/tensorflow_mnist_example.py

@@ -72,7 +72,7 @@ def train_func(config):
     return results


-def train_tensorflow_mnist(num_workers=2, use_gpu=False):
+def train_tensorflow_mnist(num_workers=2, use_gpu=False, epochs=4):
     trainer = Trainer(
         backend="tensorflow", num_workers=num_workers, use_gpu=use_gpu)
     trainer.start()
@@ -81,7 +81,7 @@ def train_tensorflow_mnist(num_workers=2, use_gpu=False):
         config={
             "lr": 1e-3,
             "batch_size": 64,
-            "epochs": 4
+            "epochs": epochs
         })
     trainer.shutdown()
     print(f"Results: {results[0]}")
@@ -105,6 +105,8 @@ if __name__ == "__main__":
         action="store_true",
         default=False,
         help="Enables GPU training")
+    parser.add_argument(
+        "--epochs", type=int, default=3, help="Number of epochs to train for.")
     parser.add_argument(
         "--smoke-test",
         action="store_true",
@@ -117,6 +119,10 @@ if __name__ == "__main__":
     if args.smoke_test:
         ray.init(num_cpus=2)
         train_tensorflow_mnist()
     else:
         ray.init(address=args.address)
-        train_tensorflow_mnist(num_workers=args.num_workers, use_gpu=args.use_gpu)
+        train_tensorflow_mnist(
+            num_workers=args.num_workers,
+            use_gpu=args.use_gpu,
+            epochs=args.epochs)
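The updated example can also be exercised directly from a Python shell; a minimal sketch, assuming a local Ray installation (the 2-CPU sizing mirrors the --smoke-test branch above, and the short epoch count is only for illustration):

    import ray
    from ray.train.examples.tensorflow_mnist_example import train_tensorflow_mnist

    # Small local cluster, mirroring the --smoke-test branch.
    ray.init(num_cpus=2)
    # The new epochs parameter keeps the run short.
    train_tensorflow_mnist(num_workers=2, use_gpu=False, epochs=1)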

python/ray/train/examples/train_linear_example.py

@@ -25,8 +25,9 @@ class LinearDataset(torch.utils.data.Dataset):
         return len(self.x)


-def train_epoch(dataloader, model, loss_fn, optimizer):
+def train_epoch(dataloader, model, loss_fn, optimizer, device):
     for X, y in dataloader:
+        X, y = X.to(device), y.to(device)
         # Compute prediction error
         pred = model(X)
         loss = loss_fn(pred, y)
@@ -37,12 +38,13 @@ def train_epoch(dataloader, model, loss_fn, optimizer):
         optimizer.step()


-def validate_epoch(dataloader, model, loss_fn):
+def validate_epoch(dataloader, model, loss_fn, device):
     num_batches = len(dataloader)
     model.eval()
     loss = 0
     with torch.no_grad():
         for X, y in dataloader:
+            X, y = X.to(device), y.to(device)
             pred = model(X)
             loss += loss_fn(pred, y).item()
     loss /= num_batches
@@ -58,6 +60,9 @@ def train_func(config):
     lr = config.get("lr", 1e-2)
     epochs = config.get("epochs", 3)

+    device = torch.device(f"cuda:{train.local_rank()}"
+                          if torch.cuda.is_available() else "cpu")
+
     train_dataset = LinearDataset(2, 5, size=data_size)
     val_dataset = LinearDataset(2, 5, size=val_size)
     train_loader = torch.utils.data.DataLoader(
@@ -70,7 +75,10 @@ def train_func(config):
         sampler=DistributedSampler(val_dataset))

     model = nn.Linear(1, hidden_size)
-    model = DistributedDataParallel(model)
+    model.to(device)
+    model = DistributedDataParallel(
+        model,
+        device_ids=[device.index] if torch.cuda.is_available() else None)

     loss_fn = nn.MSELoss()
@@ -79,17 +87,20 @@ def train_func(config):

     results = []
     for _ in range(epochs):
-        train_epoch(train_loader, model, loss_fn, optimizer)
-        result = validate_epoch(validation_loader, model, loss_fn)
+        train_epoch(train_loader, model, loss_fn, optimizer, device)
+        result = validate_epoch(validation_loader, model, loss_fn, device)
         train.report(**result)
         results.append(result)

     return results


-def train_linear(num_workers=2):
-    trainer = Trainer(TorchConfig(backend="gloo"), num_workers=num_workers)
-    config = {"lr": 1e-2, "hidden_size": 1, "batch_size": 4, "epochs": 3}
+def train_linear(num_workers=2, use_gpu=False, epochs=3):
+    trainer = Trainer(
+        backend=TorchConfig(backend="gloo"),
+        num_workers=num_workers,
+        use_gpu=use_gpu)
+    config = {"lr": 1e-2, "hidden_size": 1, "batch_size": 4, "epochs": epochs}
     trainer.start()
     results = trainer.run(
         train_func,
@@ -115,6 +126,12 @@ if __name__ == "__main__":
         type=int,
         default=2,
         help="Sets number of workers for training.")
+    parser.add_argument(
+        "--use-gpu",
+        action="store_true",
+        help="Whether to use GPU for training.")
+    parser.add_argument(
+        "--epochs", type=int, default=3, help="Number of epochs to train for.")
     parser.add_argument(
         "--smoke-test",
         action="store_true",
@@ -127,7 +144,10 @@ if __name__ == "__main__":
     if args.smoke_test:
         ray.init(num_cpus=2)
         train_linear()
     else:
         ray.init(address=args.address)
-        train_linear(num_workers=args.num_workers)
+        train_linear(
+            num_workers=args.num_workers,
+            use_gpu=args.use_gpu,
+            epochs=args.epochs)
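The device handling added here follows the usual one-GPU-per-worker pattern: each worker binds to the CUDA device matching its local rank and falls back to CPU. A standalone sketch of that pattern, with a stand-in for train.local_rank(), which is only valid inside a Ray Train worker:

    import torch

    # One-GPU-per-worker placement: bind to the device for this worker's
    # local rank, or fall back to CPU when CUDA is unavailable.
    local_rank = 0  # stand-in for train.local_rank() outside a worker process
    device = torch.device(f"cuda:{local_rank}"
                          if torch.cuda.is_available() else "cpu")

    model = torch.nn.Linear(1, 1).to(device)  # move parameters to the device
    batch = torch.randn(4, 1).to(device)      # inputs must live on the same device
    pred = model(batch)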

release/.buildkite/build_pipeline.py

@@ -250,7 +250,18 @@ MANUAL_TESTS = {
 # 2. Use autoscaling/scale up (no wait_cluster.py)
 # 3. Use GPUs if applicable
 # 4. Have the `use_connect` flag set.
-USER_TESTS = {}
+USER_TESTS = {
+    "~/ray/release/train_tests/train_tests.yaml": [
+        ConnectTest(
+            "train_tensorflow_mnist_test",
+            requirements_file="release/train_tests"
+            "/driver_requirements.txt"),
+        ConnectTest(
+            "train_torch_linear_test",
+            requirements_file="release/train_tests"
+            "/driver_requirements.txt")
+    ]
+}

 SUITES = {
     "core-nightly": CORE_NIGHTLY_TESTS,

release/train_tests/app_config.yaml

@@ -0,0 +1,13 @@
+base_image: "anyscale/ray-ml:pinned-nightly-py37-gpu"
+env_vars: { }
+debian_packages:
+  - curl
+
+python:
+  pip_packages: [ ]
+  conda_packages: [ ]
+
+post_build_cmds:
+  - pip3 uninstall -y ray || true
+  - pip3 install -U {{ env["RAY_WHEELS"] | default("ray") }}
+  - {{ env["RAY_WHEELS_SANITY_CHECK"] | default("echo No Ray wheels sanity check") }}
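The {{ env[...] | default(...) }} expressions in post_build_cmds are Jinja-style substitutions filled in by the release tooling. A minimal sketch of the rendering semantics, assuming jinja2 (the templating engine the actual release scripts use may differ):

    from jinja2 import Template

    # Render the wheel-install command the way the release tooling would.
    cmd = Template('pip3 install -U {{ env["RAY_WHEELS"] | default("ray") }}')
    print(cmd.render(env={}))  # falls back to: pip3 install -U ray
    print(cmd.render(env={"RAY_WHEELS": "https://example.com/ray-wheel.whl"}))  # hypothetical URL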

release/train_tests/compute_tpl.yaml

@@ -0,0 +1,15 @@
+cloud_id: {{env["ANYSCALE_CLOUD_ID"]}}
+region: us-west-2
+
+max_workers: 2
+
+head_node_type:
+  name: head_node
+  instance_type: g3.8xlarge
+
+worker_node_types:
+  - name: worker_node
+    instance_type: g3.8xlarge
+    min_workers: 0
+    max_workers: 2
+    use_spot: false

release/train_tests/driver_requirements.txt

@@ -0,0 +1,8 @@
+# Make sure the driver versions are the same as the cluster versions.
+# The cluster uses the ray-ml Docker image.
+# The ray-ml Docker image installs dependencies from the ray/python/requirements/ml/ directory.
+# We constrain on this requirements file so that the same versions are installed.
+-c ../../python/requirements/ml/requirements_dl.txt
+
+torch
+tensorflow

release/train_tests/train_tests.yaml

@@ -0,0 +1,17 @@
+- name: train_tensorflow_mnist_test
+  cluster:
+    app_config: app_config.yaml
+    compute_template: compute_tpl.yaml
+
+  run:
+    timeout: 36000
+    script: python workloads/train_tensorflow_mnist_test.py
+
+- name: train_torch_linear_test
+  cluster:
+    app_config: app_config.yaml
+    compute_template: compute_tpl.yaml
+
+  run:
+    timeout: 36000
+    script: python workloads/train_torch_linear_test.py
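Each entry names a test, the cluster configs it runs on, and the workload script to execute. A minimal sketch of consuming this file (hypothetical; the release harness has its own loader), assuming PyYAML:

    import yaml

    # Load the suite definition and list each test's workload script.
    with open("release/train_tests/train_tests.yaml") as f:
        suite = yaml.safe_load(f)

    for test in suite:
        print(test["name"], "->", test["run"]["script"])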

release/train_tests/workloads/train_tensorflow_mnist_test.py

@@ -0,0 +1,31 @@
+import json
+import os
+import time
+
+import ray
+from ray.train.examples.tensorflow_mnist_example import train_tensorflow_mnist
+
+if __name__ == "__main__":
+    start = time.time()
+
+    addr = os.environ.get("RAY_ADDRESS")
+    job_name = os.environ.get("RAY_JOB_NAME", "train_tensorflow_mnist_test")
+    if addr is not None and addr.startswith("anyscale://"):
+        ray.init(address=addr, job_name=job_name)
+    else:
+        ray.init(address="auto")
+
+    train_tensorflow_mnist(num_workers=6, use_gpu=True, epochs=20)
+
+    taken = time.time() - start
+    result = {
+        "time_taken": taken,
+    }
+
+    test_output_json = os.environ.get("TEST_OUTPUT_JSON",
+                                      "/tmp/train_tensorflow_mnist_test.json")
+    with open(test_output_json, "wt") as f:
+        json.dump(result, f)
+
+    print("Test Successful!")

release/train_tests/workloads/train_torch_linear_test.py

@@ -0,0 +1,30 @@
+import json
+import os
+import time
+
+import ray
+from ray.train.examples.train_linear_example import train_linear
+
+if __name__ == "__main__":
+    start = time.time()
+
+    addr = os.environ.get("RAY_ADDRESS")
+    job_name = os.environ.get("RAY_JOB_NAME", "train_torch_linear_test")
+    if addr is not None and addr.startswith("anyscale://"):
+        ray.init(address=addr, job_name=job_name)
+    else:
+        ray.init(address="auto")
+
+    results = train_linear(num_workers=6, use_gpu=True, epochs=20)
+
+    taken = time.time() - start
+    result = {"time_taken": taken}
+
+    test_output_json = os.environ.get("TEST_OUTPUT_JSON",
+                                      "/tmp/train_torch_linear_test.json")
+    with open(test_output_json, "wt") as f:
+        json.dump(result, f)
+
+    print("Test Successful!")
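Both workloads follow the same reporting contract: write a JSON result to the path in TEST_OUTPUT_JSON so the release harness can collect it. A minimal sketch of the consuming side (hypothetical; the real harness does more than read one field):

    import json
    import os

    # Read back the result file written by either workload above.
    path = os.environ.get("TEST_OUTPUT_JSON", "/tmp/train_torch_linear_test.json")
    with open(path) as f:
        print("time_taken:", json.load(f)["time_taken"])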