[Horovod] Add Horovod example (#16742)

* wip

* updates

* updates

* update

* formatting

* updates

* updates

* update

* fix

* add timeout
This commit is contained in:
Amog Kamsetty 2021-06-29 19:15:15 -07:00 committed by GitHub
parent d89f148fbf
commit 69507f53db
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 278 additions and 22 deletions

View file

@ -310,6 +310,7 @@
- rm -rf ./python/ray/thirdparty_files; ./ci/travis/ci.sh build - rm -rf ./python/ray/thirdparty_files; ./ci/travis/ci.sh build
- bazel test --config=ci $(./scripts/bazel_export_options) --build_tests_only --test_tag_filters=py37,-flaky,-client python/ray/tune/... - bazel test --config=ci $(./scripts/bazel_export_options) --build_tests_only --test_tag_filters=py37,-flaky,-client python/ray/tune/...
- bazel test --config=ci $(./scripts/bazel_export_options) --build_tests_only --test_tag_filters=-client python/ray/util/xgboost/... - bazel test --config=ci $(./scripts/bazel_export_options) --build_tests_only --test_tag_filters=-client python/ray/util/xgboost/...
- bazel test --config=ci $(./scripts/bazel_export_options) --build_tests_only python/ray/util/horovod/...
- bazel test --config=ci $(./scripts/bazel_export_options) --build_tests_only python/ray/tests/modin/... - bazel test --config=ci $(./scripts/bazel_export_options) --build_tests_only python/ray/tests/modin/...
# Dask tests and examples. # Dask tests and examples.
- bazel test --config=ci $(./scripts/bazel_export_options) --build_tests_only --test_tag_filters=-client python/ray/util/dask/... - bazel test --config=ci $(./scripts/bazel_export_options) --build_tests_only --test_tag_filters=-client python/ray/util/dask/...

View file

@ -441,14 +441,6 @@ py_test(
deps = [":tune_lib"], deps = [":tune_lib"],
) )
py_test(
name = "test_horovod_client",
size = "medium",
srcs = ["tests/test_horovod_client.py"],
tags = ["exclusive", "client", "py37"],
deps = [":tune_lib"],
)
py_test( py_test(
name = "ddp_mnist_torch", name = "ddp_mnist_torch",
size = "small", size = "small",

View file

@ -0,0 +1,21 @@
# --------------------------------------------------------------------
# Tests from the python/ray/util/horovod directory.
# Please keep these sorted alphabetically.
# --------------------------------------------------------------------
py_test(
name = "test_horovod",
size = "medium",
srcs = ["tests/test_horovod.py"],
deps = [":horovod_lib"],
tags = ["exclusive"]
)
# This is a dummy test dependency that causes the above tests to be
# re-run if any of these files changes.
py_library(
name = "horovod_lib",
srcs = glob(["**/*.py"]),
)

View file

View file

@ -0,0 +1,224 @@
import argparse
import os
from filelock import FileLock
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, transforms
import torch.utils.data.distributed
import horovod.torch as hvd
from horovod.ray import RayExecutor
def metric_average(val, name):
tensor = torch.tensor(val)
avg_tensor = hvd.allreduce(tensor, name=name)
return avg_tensor.item()
class Net(nn.Module):
def __init__(self):
super(Net, self).__init__()
self.conv1 = nn.Conv2d(1, 10, kernel_size=5)
self.conv2 = nn.Conv2d(10, 20, kernel_size=5)
self.conv2_drop = nn.Dropout2d()
self.fc1 = nn.Linear(320, 50)
self.fc2 = nn.Linear(50, 10)
def forward(self, x):
x = F.relu(F.max_pool2d(self.conv1(x), 2))
x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2))
x = x.view(-1, 320)
x = F.relu(self.fc1(x))
x = F.dropout(x, training=self.training)
x = self.fc2(x)
return F.log_softmax(x)
def train_fn(data_dir=None,
seed=42,
use_cuda=False,
batch_size=64,
use_adasum=False,
lr=0.01,
momentum=0.5,
num_epochs=10,
log_interval=10):
# Horovod: initialize library.
hvd.init()
torch.manual_seed(seed)
if use_cuda:
# Horovod: pin GPU to local rank.
torch.cuda.set_device(hvd.local_rank())
torch.cuda.manual_seed(seed)
# Horovod: limit # of CPU threads to be used per worker.
torch.set_num_threads(1)
kwargs = {"num_workers": 1, "pin_memory": True} if use_cuda else {}
data_dir = data_dir or "./data"
with FileLock(os.path.expanduser("~/.horovod_lock")):
train_dataset = \
datasets.MNIST(data_dir, train=True, download=True,
transform=transforms.Compose([
transforms.ToTensor(),
transforms.Normalize((0.1307,), (0.3081,))
]))
# Horovod: use DistributedSampler to partition the training data.
train_sampler = torch.utils.data.distributed.DistributedSampler(
train_dataset, num_replicas=hvd.size(), rank=hvd.rank())
train_loader = torch.utils.data.DataLoader(
train_dataset, batch_size=batch_size, sampler=train_sampler, **kwargs)
model = Net()
# By default, Adasum doesn't need scaling up learning rate.
lr_scaler = hvd.size() if not use_adasum else 1
if use_cuda:
# Move model to GPU.
model.cuda()
# If using GPU Adasum allreduce, scale learning rate by local_size.
if use_adasum and hvd.nccl_built():
lr_scaler = hvd.local_size()
# Horovod: scale learning rate by lr_scaler.
optimizer = optim.SGD(
model.parameters(), lr=lr * lr_scaler, momentum=momentum)
# Horovod: wrap optimizer with DistributedOptimizer.
optimizer = hvd.DistributedOptimizer(
optimizer,
named_parameters=model.named_parameters(),
op=hvd.Adasum if use_adasum else hvd.Average)
for epoch in range(1, num_epochs + 1):
model.train()
# Horovod: set epoch to sampler for shuffling.
train_sampler.set_epoch(epoch)
for batch_idx, (data, target) in enumerate(train_loader):
if use_cuda:
data, target = data.cuda(), target.cuda()
optimizer.zero_grad()
output = model(data)
loss = F.nll_loss(output, target)
loss.backward()
optimizer.step()
if batch_idx % log_interval == 0:
# Horovod: use train_sampler to determine the number of
# examples in this worker's partition.
print("Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}".format(
epoch, batch_idx * len(data), len(train_sampler),
100. * batch_idx / len(train_loader), loss.item()))
def main(num_workers, use_gpu, **kwargs):
settings = RayExecutor.create_settings(timeout_s=30)
executor = RayExecutor(settings, use_gpu=use_gpu, num_workers=num_workers)
executor.run(train_fn, kwargs=kwargs)
if __name__ == "__main__":
# Training settings
parser = argparse.ArgumentParser(
description="PyTorch MNIST Example",
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument(
"--batch-size",
type=int,
default=64,
metavar="N",
help="input batch size for training (default: 64)")
parser.add_argument(
"--epochs",
type=int,
default=5,
metavar="N",
help="number of epochs to train (default: 10)")
parser.add_argument(
"--lr",
type=float,
default=0.01,
metavar="LR",
help="learning rate (default: 0.01)")
parser.add_argument(
"--momentum",
type=float,
default=0.5,
metavar="M",
help="SGD momentum (default: 0.5)")
parser.add_argument(
"--no-cuda",
action="store_true",
default=False,
help="disables CUDA training")
parser.add_argument(
"--seed",
type=int,
default=42,
metavar="S",
help="random seed (default: 42)")
parser.add_argument(
"--log-interval",
type=int,
default=10,
metavar="N",
help="how many batches to wait before logging training status")
parser.add_argument(
"--use-adasum",
action="store_true",
default=False,
help="use adasum algorithm to do reduction")
parser.add_argument(
"--num-workers",
type=int,
default=4,
help="Number of Ray workers to use for training.")
parser.add_argument(
"--data-dir",
help="location of the training dataset in the local filesystem ("
"will be downloaded if needed)")
parser.add_argument(
"--address",
require=False,
types=str,
default=None,
help="Address of Ray cluster.")
parser.add_argument(
"--server-address",
type=str,
default=None,
required=False,
help="The address of server to connect to if using "
"Ray Client.")
args = parser.parse_args()
import ray
if args.address:
ray.init(args.address)
elif args.server_address:
ray.util.connect(args.server_address)
else:
ray.init()
kwargs = {
"data_dir": args.data_dir,
"seed": args.seed,
"use_cuda": args.use_cuda if args.use_cuda else False,
"batch_size": args.batch_size,
"use_adasum": args.use_adasum if args.use_adasum else False,
"lr": args.lr,
"momentum": args.momentum,
"num_epochs": args.num_epochs,
"log_interval": args.log_interval
}
main(
num_workers=args.num_workers,
use_gpu=args.use_cuda if args.use_cuda else False,
kwargs=kwargs)

View file

@ -12,19 +12,27 @@ except ImportError:
pass # This shouldn't be reached - the test should be skipped. pass # This shouldn't be reached - the test should be skipped.
@pytest.fixture # For each test, run it once with ray.init() and again with ray.util.connect().
def ray_start_client(): @pytest.fixture(params=[False, True])
def ray_connect_handler(job_config=None): def ray_start_4_cpus(request):
# Ray client will disconnect from ray when if request.param:
# num_clients == 0.
if ray.is_initialized():
return
else:
return ray.init(job_config=job_config, num_cpus=4)
assert not ray.util.client.ray.is_connected() def ray_connect_handler(job_config=None):
with ray_start_client_server(ray_connect_handler=ray_connect_handler): # Ray client will disconnect from ray when
# num_clients == 0.
if ray.is_initialized():
return
else:
return ray.init(job_config=job_config, num_cpus=4)
assert not ray.util.client.ray.is_connected()
with ray_start_client_server(ray_connect_handler=ray_connect_handler):
assert ray.util.client.ray.is_connected()
yield
else:
ray.init(num_cpus=4)
yield yield
ray.shutdown()
def _train(batch_size=32, batch_per_iter=10): def _train(batch_size=32, batch_per_iter=10):
@ -64,13 +72,11 @@ def _train(batch_size=32, batch_per_iter=10):
@pytest.mark.skipif( @pytest.mark.skipif(
not gloo_built(), reason="Gloo is required for Ray integration") not gloo_built(), reason="Gloo is required for Ray integration")
def test_remote_client_train(ray_start_client): def test_train(ray_start_4_cpus):
def simple_fn(worker): def simple_fn(worker):
local_rank = _train() local_rank = _train()
return local_rank return local_rank
assert ray.util.client.ray.is_connected()
setting = RayExecutor.create_settings(timeout_s=30) setting = RayExecutor.create_settings(timeout_s=30)
hjob = RayExecutor( hjob = RayExecutor(
setting, num_workers=3, use_gpu=torch.cuda.is_available()) setting, num_workers=3, use_gpu=torch.cuda.is_available())
@ -82,6 +88,18 @@ def test_remote_client_train(ray_start_client):
hjob.shutdown() hjob.shutdown()
@pytest.mark.skipif(
not gloo_built(), reason="Gloo is required for Ray integration")
def test_horovod_example(ray_start_4_cpus):
from ray.util.horovod.horovod_example import main
kwargs = {
"data_dir": "./data",
"num_epochs": 1,
}
main(num_workers=1, use_gpu=False, kwargs=kwargs)
if __name__ == "__main__": if __name__ == "__main__":
import pytest import pytest
import sys import sys