
1248 lines
44 KiB

.. _train-user-guide:
Ray Train Deep Learning User Guide
.. tip:: Get in touch with us if you're using or considering using `Ray Train <https://forms.gle/PXFcJmHwszCwQhqX7>`_!
Ray Train provides solutions for training machine learning models in a distributed manner on Ray.
This guide focuses on deep learning with PyTorch, TensorFlow and Horovod.
For other model types, distributed training support is available through other Trainers & libraries:
* **Reinforcement Learning:** :ref:`RLlib <rllib-index>`
* **XGBoost:** :doc:`/ray-air/examples/xgboost_example`
* **LightGBM:** :doc:`/ray-air/examples/lightgbm_example`
* **Scikit-Learn** :doc:`/ray-air/examples/sklearn_example`
* **Hugging Face** :doc:`/ray-air/examples/huggingface_text_classification`
* **PyTorch Lightning:** :ref:`ray-lightning`
In this guide, we cover examples for the following use cases:
* How do I :ref:`port my code <train-porting-code>` to using Ray Train?
* How do I use Ray Train to :ref:`train with a large dataset <train-datasets>`?
* How do I :ref:`monitor <train-monitoring>` my training?
* How do I run my training on pre-emptible instances
(:ref:`fault tolerance <train-fault-tolerance>`)?
* How do I :ref:`tune <train-tune>` my Ray Train model?
.. _train-backends:
Ray Train provides a thin API around different backend frameworks for
distributed deep learning. At the moment, Ray Train allows you to perform
training with:
* **PyTorch:** Ray Train initializes your distributed process group, allowing
you to run your ``DistributedDataParallel`` training script. See `PyTorch
Distributed Overview <https://pytorch.org/tutorials/beginner/dist_overview.html>`_
for more information.
* **TensorFlow:** Ray Train configures ``TF_CONFIG`` for you, allowing you to run
your ``MultiWorkerMirroredStrategy`` training script. See `Distributed
training with TensorFlow <https://www.tensorflow.org/guide/distributed_training>`_
for more information.
* **Horovod:** Ray Train configures the Horovod environment and Rendezvous
server for you, allowing you to run your ``DistributedOptimizer`` training
script. See `Horovod documentation <https://horovod.readthedocs.io/en/stable/index.html>`_
for more information.
.. _train-porting-code:
Porting code to Ray Train
The following instructions assume you have a training function
that can already be run on a single worker for one of the supported
:ref:`backend <train-backends>` frameworks.
Update training function
First, you'll want to update your training function to support distributed
.. tabbed:: PyTorch
Ray Train will set up your distributed process group for you and also provides utility methods
to automatically prepare your model and data for distributed training.
.. note::
Ray Train will still work even if you don't use the ``prepare_model`` and ``prepare_data_loader`` utilities below,
and instead handle the logic directly inside your training function.
First, use the ``prepare_model`` function to automatically move your model to the right device and wrap it in
.. code-block:: diff
import torch
from torch.nn.parallel import DistributedDataParallel
+from ray import train
+import ray.train.torch
def train_func():
- device = torch.device(f"cuda:{train.local_rank()}" if
- torch.cuda.is_available() else "cpu")
- torch.cuda.set_device(device)
# Create model.
model = NeuralNetwork()
- model = model.to(device)
- model = DistributedDataParallel(model,
- device_ids=[train.local_rank()] if torch.cuda.is_available() else None)
+ model = train.torch.prepare_model(model)
Then, use the ``prepare_data_loader`` function to automatically add a ``DistributedSampler`` to your ``DataLoader``
and move the batches to the right device.
.. code-block:: diff
import torch
from torch.utils.data import DataLoader, DistributedSampler
+from ray import train
+import ray.train.torch
def train_func():
- device = torch.device(f"cuda:{train.local_rank()}" if
- torch.cuda.is_available() else "cpu")
- torch.cuda.set_device(device)
- data_loader = DataLoader(my_dataset, batch_size=worker_batch_size, sampler=DistributedSampler(dataset))
+ data_loader = DataLoader(my_dataset, batch_size=worker_batch_size)
+ data_loader = train.torch.prepare_data_loader(data_loader)
for X, y in data_loader:
- X = X.to_device(device)
- y = y.to_device(device)
.. tip::
Keep in mind that ``DataLoader`` takes in a ``batch_size`` which is the batch size for each worker.
The global batch size can be calculated from the worker batch size (and vice-versa) with the following equation:
.. code-block::
global_batch_size = worker_batch_size * train.world_size()
.. tabbed:: TensorFlow
.. note::
The current TensorFlow implementation supports
``MultiWorkerMirroredStrategy`` (and ``MirroredStrategy``). If there are
other strategies you wish to see supported by Ray Train, please let us know
by submitting a `feature request on GitHub <https://github.com/ray-project/ray/issues>`_.
These instructions closely follow TensorFlow's `Multi-worker training
with Keras <https://www.tensorflow.org/tutorials/distribute/multi_worker_with_keras>`_
tutorial. One key difference is that Ray Train will handle the environment
variable set up for you.
**Step 1:** Wrap your model in ``MultiWorkerMirroredStrategy``.
The `MultiWorkerMirroredStrategy <https://www.tensorflow.org/api_docs/python/tf/distribute/experimental/MultiWorkerMirroredStrategy>`_
enables synchronous distributed training. The ``Model`` *must* be built and
compiled within the scope of the strategy.
.. code-block:: python
with tf.distribute.MultiWorkerMirroredStrategy().scope():
model = ... # build model
**Step 2:** Update your ``Dataset`` batch size to the *global* batch
The `batch <https://www.tensorflow.org/api_docs/python/tf/data/Dataset#batch>`_
will be split evenly across worker processes, so ``batch_size`` should be
set appropriately.
.. code-block:: diff
-batch_size = worker_batch_size
+batch_size = worker_batch_size * train.world_size()
.. tabbed:: Horovod
If you have a training function that already runs with the `Horovod Ray
Executor <https://horovod.readthedocs.io/en/stable/ray_include.html#horovod-ray-executor>`_,
you should not need to make any additional changes!
To onboard onto Horovod, please visit the `Horovod guide
Create Ray Train Trainer
``Trainer``\s are the primary Ray Train classes that are used to manage state and
execute training. You can create a simple ``Trainer`` for the backend of choice
with one of the following:
.. tabbed:: PyTorch
.. code-block:: python
from ray.air import ScalingConfig
from ray.train.torch import TorchTrainer
# For GPU Training, set `use_gpu` to True.
use_gpu = False
trainer = TorchTrainer(
scaling_config=ScalingConfig(use_gpu=use_gpu, num_workers=2)
.. tabbed:: TensorFlow
.. code-block:: python
from ray.air import ScalingConfig
from ray.train.tensorflow import TensorflowTrainer
# For GPU Training, set `use_gpu` to True.
use_gpu = False
trainer = TensorflowTrainer(
scaling_config=ScalingConfig(use_gpu=use_gpu, num_workers=2)
.. tabbed:: Horovod
.. code-block:: python
from ray.air import ScalingConfig
from ray.train.horovod import HorovodTrainer
# For GPU Training, set `use_gpu` to True.
use_gpu = False
trainer = HorovodTrainer(
scaling_config=ScalingConfig(use_gpu=use_gpu, num_workers=2)
To customize the backend setup, you can use a :ref:`train-api-backend-config` object.
.. tabbed:: PyTorch
.. code-block:: python
from ray.air import ScalingConfig
from ray.train.torch import TorchTrainer, TorchConfig
trainer = TorchTrainer(
.. tabbed:: TensorFlow
.. code-block:: python
from ray.air import ScalingConfig
from ray.train.tensorflow import TensorflowTrainer, TensorflowConfig
trainer = TensorflowTrainer(
.. tabbed:: Horovod
.. code-block:: python
from ray.air import ScalingConfig
from ray.train.horovod import HorovodTrainer, HorovodConfig
trainer = HorovodTrainer(
For more configurability, please reference the :class:`BaseTrainer` API.
Run training function
With a distributed training function and a Ray Train ``Trainer``, you are now
ready to start training!
.. code-block:: python
Configuring Training
With Ray Train, you can execute a training function (``train_func``) in a
distributed manner by calling ``Trainer.fit``. To pass arguments
into the training function, you can expose a single ``config`` dictionary parameter:
.. code-block:: diff
-def train_func():
+def train_func(config):
Then, you can pass in the config dictionary as an argument to ``Trainer``:
.. code-block:: diff
+config = {} # This should be populated.
trainer = TorchTrainer(
+ train_loop_config=config,
Putting this all together, you can run your training function with different
configurations. As an example:
.. code-block:: python
from ray.air import session, ScalingConfig
from ray.train.torch import TorchTrainer
def train_func(config):
for i in range(config["num_epochs"]):
session.report({"epoch": i})
trainer = TorchTrainer(
train_loop_config={"num_epochs": 2},
result = trainer.fit()
# 1
A primary use-case for ``config`` is to try different hyperparameters. To
perform hyperparameter tuning with Ray Train, please refer to the
:ref:`Ray Tune integration <train-tune>`.
.. TODO add support for with_parameters
.. _train-result-object:
Accessing Training Results
.. TODO(ml-team) Flesh this section out.
The return of a ``Trainer.fit`` is a :class:`Result` object, containing
information about the training run. You can access it to obtain saved checkpoints,
metrics and other relevant data.
For example, you can:
* Print the metrics for the last training iteration:
.. code-block:: python
from pprint import pprint
# {'_time_this_iter_s': 0.001016855239868164,
# '_timestamp': 1657829125,
# '_training_iteration': 2,
# 'config': {},
# 'date': '2022-07-14_20-05-25',
# 'done': True,
# 'episodes_total': None,
# 'epoch': 1,
# 'experiment_id': '5a3f8b9bf875437881a8ddc7e4dd3340',
# 'experiment_tag': '0',
# 'hostname': 'ip-172-31-43-110',
# 'iterations_since_restore': 2,
# 'node_ip': '',
# 'pid': 654068,
# 'time_since_restore': 3.4353830814361572,
# 'time_this_iter_s': 0.00809168815612793,
# 'time_total_s': 3.4353830814361572,
# 'timestamp': 1657829125,
# 'timesteps_since_restore': 0,
# 'timesteps_total': None,
# 'training_iteration': 2,
# 'trial_id': '4913f_00000',
# 'warmup_time': 0.003167867660522461}
* View the dataframe containing the metrics from all iterations:
.. code-block:: python
* Obtain the :class:`Checkpoint`, used for resuming training, prediction and serving.
.. code-block:: python
result.checkpoint # last saved checkpoint
result.best_checkpoints # N best saved checkpoints, as configured in run_config
.. _train-log-dir:
Log Directory Structure
Each ``Trainer`` will have a local directory created for logs and checkpoints.
You can obtain the path to the directory by accessing the ``log_dir`` attribute
of the :class:`Result` object returned by ``Trainer.fit``.
.. code-block:: python
# '/home/ubuntu/ray_results/TorchTrainer_2022-06-13_20-31-06/checkpoint_000003'
.. _train-datasets:
Distributed Data Ingest with Ray Datasets
Ray Train provides native support for :ref:`Ray Datasets <datasets>` to support the following use cases:
1. **Large Datasets**: With Ray Datasets, you can easily work with datasets that are too big to fit on a single node.
Ray Datasets will distribute the dataset across the Ray Cluster and allow you to perform dataset operations (map, filter, etc.)
on the distributed dataset.
2. **Automatic locality-aware sharding**: If provided a Ray Dataset, Ray Train will automatically shard the dataset and assign each shard
to a training worker while minimizing cross-node data transfer. Unlike with standard Torch or TensorFlow datasets, each training
worker will only load its assigned shard into memory rather than the entire ``Dataset``.
3. **Pipelined Execution**: Ray Datasets also supports pipelining, meaning that data processing operations
can be run concurrently with training. Training is no longer blocked on expensive data processing operations (such as global shuffling)
and this minimizes the amount of time your GPUs are idle. See :ref:`dataset-pipeline-api` for more information.
To get started, pass in a Ray Dataset (or multiple) into ``Trainer``. Underneath the hood, Ray Train will automatically shard the given dataset.
Using Ray Datasets is the recommended way for ingesting data into ``Trainer``\s and can be used with any ``Trainer`` in Ray AIR.
.. warning::
If you are doing distributed training with TensorFlow, you will need to
disable TensorFlow's built-in autosharding as the data on each worker is
already sharded.
.. code-block:: python
:emphasize-lines: 1, 6
from ray.train.tensorflow import prepare_dataset_shard
def train_func():
tf_dataset = ray.train.get_dataset_shard().to_tf(...)
tf_dataset = prepare_dataset_shard(tf_dataset)
**Simple Dataset Example**
.. code-block:: python
import ray
from ray import train
from ray.air import ScalingConfig
from ray.train.torch import TorchTrainer
def train_func(config):
# Create your model here.
model = NeuralNetwork()
batch_size = config["worker_batch_size"]
train_data_shard = train.get_dataset_shard("train")
train_torch_dataset = train_data_shard.to_torch(
label_column="label", batch_size=batch_size
validation_data_shard = train.get_dataset_shard("validation")
validation_torch_dataset = validation_data_shard.to_torch(
label_column="label", batch_size=batch_size
for epoch in config["num_epochs"]:
for X, y in train_torch_dataset:
output = model(X)
# Train on one batch.
for X, y in validation_torch_dataset:
output = model(X)
# Validate one batch.
return model
# Random split dataset into 80% training data and 20% validation data.
train_dataset, validation_dataset = dataset.train_test_split(
test_size=0.2, shuffle=True
trainer = TorchTrainer(
train_loop_config={"worker_batch_size": 64, "num_epochs": 2},
datasets={"train": train_dataset, "validation": validation_dataset},
dataset = ray.data.read_csv("...")
result = trainer.fit()
.. _train-dataset-pipeline:
Pipelined Execution
For details on how to enable pipelined execution, please refer to :ref:`air-ingest`.
.. TODO link to Training Run Iterator API as a 3rd option for logging.
.. _train-monitoring:
Logging, Checkpointing and Callbacks
Ray Train has mechanisms to easily collect intermediate results from the training workers during the training run
and also has a :ref:`Callback interface <train-callbacks>` to perform actions on these intermediate results (such as logging, aggregations, etc.).
You can use either the :ref:`built-in callbacks <air-builtin-callbacks>` that Ray AIR provides,
or implement a :ref:`custom callback <train-custom-callbacks>` for your use case. The callback API
is shared with Ray Tune.
.. _train-checkpointing:
Ray Train also provides a way to save :ref:`Checkpoints <air-checkpoints-doc>` during the training process. This is
useful for:
1. :ref:`Integration with Ray Tune <train-tune>` to use certain Ray Tune
2. Running a long-running training job on a cluster of pre-emptible machines/pods.
3. Persisting trained model state to later use for serving/inference.
4. In general, storing any model artifacts.
Reporting intermediate results and handling checkpoints
Ray AIR provides a *Session* API for reporting intermediate
results and checkpoints from the training function (run on distributed workers) up to the
``Trainer`` (where your python script is executed) by calling ``session.report(metrics)``.
The results will be collected from the distributed workers and passed to the driver to
be logged and displayed.
.. warning::
Only the results from rank 0 worker will be used. However, in order to ensure
consistency, ``session.report()`` has to be called on each worker.
The primary use-case for reporting is for metrics (accuracy, loss, etc.) at
the end of each training epoch.
.. code-block:: python
from ray.air import session
def train_func():
for i in range(num_epochs):
result = model.train(...)
session.report({"result": result})
The session concept exists on several levels: The execution layer (called `Tune Session`) and the Data Parallel training layer
(called `Train Session`).
The following figure shows how these two sessions look like in a Data Parallel training scenario.
.. image:: ../ray-air/images/session.svg
:width: 650px
:align: center
Saving checkpoints
:ref:`Checkpoints <air-checkpoints-doc>` can be saved by calling ``session.report(metrics, checkpoint=Checkpoint(...))`` in the
training function. This will cause the checkpoint state from the distributed
workers to be saved on the ``Trainer`` (where your python script is executed).
The latest saved checkpoint can be accessed through the ``checkpoint`` attribute of
the :class:`Result`, and the best saved checkpoints can be accessed by the ``best_checkpoints``
Concrete examples are provided to demonstrate how checkpoints (model weights but not models) are saved
appropriately in distributed training.
.. tabbed:: PyTorch
.. code-block:: python
:emphasize-lines: 36, 37, 38, 39, 40, 41
import ray.train.torch
from ray.air import session, Checkpoint, ScalingConfig
from ray.train.torch import TorchTrainer
import torch
import torch.nn as nn
from torch.nn.modules.utils import consume_prefix_in_state_dict_if_present
from torch.optim import Adam
import numpy as np
def train_func(config):
n = 100
# create a toy dataset
# data : X - dim = (n, 4)
# target : Y - dim = (n, 1)
X = torch.Tensor(np.random.normal(0, 1, size=(n, 4)))
Y = torch.Tensor(np.random.uniform(0, 1, size=(n, 1)))
# toy neural network : 1-layer
# wrap the model in DDP
model = ray.train.torch.prepare_model(nn.Linear(4, 1))
criterion = nn.MSELoss()
optimizer = Adam(model.parameters(), lr=3e-4)
for epoch in range(config["num_epochs"]):
y = model.forward(X)
# compute loss
loss = criterion(y, Y)
# back-propagate loss
# To fetch non-DDP state_dict
# w/o DDP: model.state_dict()
# w/ DDP: model.module.state_dict()
# See: https://github.com/ray-project/ray/issues/20915
state_dict = model.state_dict()
consume_prefix_in_state_dict_if_present(state_dict, "module.")
checkpoint = Checkpoint.from_dict(
dict(epoch=epoch, model_weights=state_dict)
session.report({}, checkpoint=checkpoint)
trainer = TorchTrainer(
train_loop_config={"num_epochs": 5},
result = trainer.fit()
# {'epoch': 4, 'model_weights': OrderedDict([('bias', tensor([-0.1215])), ('weight', tensor([[0.3253, 0.1979, 0.4525, 0.2850]]))]), '_timestamp': 1656107095, '_preprocessor': None, '_current_checkpoint_id': 4}
.. tabbed:: TensorFlow
.. code-block:: python
:emphasize-lines: 23
from ray.air import session, Checkpoint, ScalingConfig
from ray.train.tensorflow import TensorflowTrainer
import numpy as np
def train_func(config):
import tensorflow as tf
n = 100
# create a toy dataset
# data : X - dim = (n, 4)
# target : Y - dim = (n, 1)
X = np.random.normal(0, 1, size=(n, 4))
Y = np.random.uniform(0, 1, size=(n, 1))
strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()
with strategy.scope():
# toy neural network : 1-layer
model = tf.keras.Sequential([tf.keras.layers.Dense(1, activation="linear", input_shape=(4,))])
model.compile(optimizer="Adam", loss="mean_squared_error", metrics=["mse"])
for epoch in range(config["num_epochs"]):
model.fit(X, Y, batch_size=20)
checkpoint = Checkpoint.from_dict(
dict(epoch=epoch, model_weights=model.get_weights())
session.report({}, checkpoint=checkpoint)
trainer = TensorflowTrainer(
train_loop_config={"num_epochs": 5},
result = trainer.fit()
# {'epoch': 4, 'model_weights': [array([[-0.31858477],
# [ 0.03747174],
# [ 0.28266194],
# [ 0.8626015 ]], dtype=float32), array([0.02230084], dtype=float32)], '_timestamp': 1656107383, '_preprocessor': None, '_current_checkpoint_id': 4}
By default, checkpoints will be persisted to local disk in the :ref:`log
directory <train-log-dir>` of each run.
.. code-block:: python
# ('local_path', '/home/ubuntu/ray_results/TorchTrainer_2022-06-24_21-34-49/TorchTrainer_7988b_00000_0_2022-06-24_21-34-49/checkpoint_000003')
Configuring checkpoints
For more configurability of checkpointing behavior (specifically saving
checkpoints to disk), a :class:`CheckpointConfig` can be passed into
As an example, to completely disable writing checkpoints to disk:
.. code-block:: python
:emphasize-lines: 9,14
from ray import train
from ray.air import RunConfig, CheckpointConfig, ScalingConfig
from ray.train.torch import TorchTrainer
def train_func():
for epoch in range(3):
checkpoint_config = CheckpointConfig(num_to_keep=0)
trainer = TorchTrainer(
You may also config ``CheckpointConfig`` to keep the "N best" checkpoints persisted to disk. The following example shows how you could keep the 2 checkpoints with the lowest "loss" value:
.. code-block:: python
from ray.air import session, Checkpoint, RunConfig, CheckpointConfig, ScalingConfig
from ray.train.torch import TorchTrainer
def train_func():
# first checkpoint
session.report(dict(loss=2), checkpoint=Checkpoint.from_dict(dict(loss=2)))
# second checkpoint
session.report(dict(loss=2), checkpoint=Checkpoint.from_dict(dict(loss=4)))
# third checkpoint
session.report(dict(loss=2), checkpoint=Checkpoint.from_dict(dict(loss=1)))
# fourth checkpoint
session.report(dict(loss=2), checkpoint=Checkpoint.from_dict(dict(loss=3)))
# Keep the 2 checkpoints with the smallest "loss" value.
checkpoint_config = CheckpointConfig(
num_to_keep=2, checkpoint_score_attribute="loss", checkpoint_score_order="min"
trainer = TorchTrainer(
result = trainer.fit()
# ('local_path', '/home/ubuntu/ray_results/TorchTrainer_2022-06-24_21-34-49/TorchTrainer_7988b_00000_0_2022-06-24_21-34-49/checkpoint_000000')
# ('local_path', '/home/ubuntu/ray_results/TorchTrainer_2022-06-24_21-34-49/TorchTrainer_7988b_00000_0_2022-06-24_21-34-49/checkpoint_000002')
Loading checkpoints
Checkpoints can be loaded into the training function in 2 steps:
1. From the training function, ``session.get_checkpoint`` can be used to access
the most recently saved :class:`Checkpoint`. This is useful to continue training even
if there's a worker failure.
2. The checkpoint to start training with can be bootstrapped by passing in a
:class:`Checkpoint` to ``Trainer`` as the ``resume_from_checkpoint`` argument.
.. tabbed:: PyTorch
.. code-block:: python
:emphasize-lines: 23, 25, 26, 29, 30, 31, 35
import ray.train.torch
from ray.air import session, Checkpoint, ScalingConfig
from ray.train.torch import TorchTrainer
import torch
import torch.nn as nn
from torch.nn.modules.utils import consume_prefix_in_state_dict_if_present
from torch.optim import Adam
import numpy as np
def train_func(config):
n = 100
# create a toy dataset
# data : X - dim = (n, 4)
# target : Y - dim = (n, 1)
X = torch.Tensor(np.random.normal(0, 1, size=(n, 4)))
Y = torch.Tensor(np.random.uniform(0, 1, size=(n, 1)))
# toy neural network : 1-layer
model = nn.Linear(4, 1)
criterion = nn.MSELoss()
optimizer = Adam(model.parameters(), lr=3e-4)
start_epoch = 0
checkpoint = session.get_checkpoint()
if checkpoint:
# assume that we have run the session.report() example
# and successfully save some model weights
checkpoint_dict = checkpoint.to_dict()
start_epoch = checkpoint_dict.get("epoch", -1) + 1
# wrap the model in DDP
model = ray.train.torch.prepare_model(model)
for epoch in range(start_epoch, config["num_epochs"]):
y = model.forward(X)
# compute loss
loss = criterion(y, Y)
# back-propagate loss
state_dict = model.state_dict()
consume_prefix_in_state_dict_if_present(state_dict, "module.")
checkpoint = Checkpoint.from_dict(
dict(epoch=epoch, model_weights=state_dict)
session.report({}, checkpoint=checkpoint)
trainer = TorchTrainer(
train_loop_config={"num_epochs": 2},
# save a checkpoint
result = trainer.fit()
# load checkpoint
trainer = TorchTrainer(
train_loop_config={"num_epochs": 4},
result = trainer.fit()
# {'epoch': 3, 'model_weights': OrderedDict([('bias', tensor([0.0902])), ('weight', tensor([[-0.1549, -0.0861, 0.4353, -0.4116]]))]), '_timestamp': 1656108265, '_preprocessor': None, '_current_checkpoint_id': 2}
.. tabbed:: TensorFlow
.. code-block:: python
:emphasize-lines: 15, 21, 22, 25, 26, 27, 30
from ray.air import session, Checkpoint, ScalingConfig
from ray.train.tensorflow import TensorflowTrainer
import numpy as np
def train_func(config):
import tensorflow as tf
n = 100
# create a toy dataset
# data : X - dim = (n, 4)
# target : Y - dim = (n, 1)
X = np.random.normal(0, 1, size=(n, 4))
Y = np.random.uniform(0, 1, size=(n, 1))
start_epoch = 0
strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()
with strategy.scope():
# toy neural network : 1-layer
model = tf.keras.Sequential([tf.keras.layers.Dense(1, activation="linear", input_shape=(4,))])
checkpoint = session.get_checkpoint()
if checkpoint:
# assume that we have run the session.report() example
# and successfully save some model weights
checkpoint_dict = checkpoint.to_dict()
start_epoch = checkpoint_dict.get("epoch", -1) + 1
model.compile(optimizer="Adam", loss="mean_squared_error", metrics=["mse"])
for epoch in range(start_epoch, config["num_epochs"]):
model.fit(X, Y, batch_size=20)
checkpoint = Checkpoint.from_dict(
dict(epoch=epoch, model_weights=model.get_weights())
session.report({}, checkpoint=checkpoint)
trainer = TensorflowTrainer(
train_loop_config={"num_epochs": 2},
# save a checkpoint
result = trainer.fit()
# load a checkpoint
trainer = TensorflowTrainer(
train_loop_config={"num_epochs": 5},
result = trainer.fit()
# {'epoch': 4, 'model_weights': [array([[-0.70056134],
# [-0.8839263 ],
# [-1.0043601 ],
# [-0.61634773]], dtype=float32), array([0.01889327], dtype=float32)], '_timestamp': 1656108446, '_preprocessor': None, '_current_checkpoint_id': 3}
.. _train-callbacks:
You may want to plug in your training code with your favorite experiment management framework.
Ray AIR provides an interface to fetch intermediate results and callbacks to process/log your intermediate results
(the values passed into ``session.report(...)``).
Ray AIR contains :ref:`built-in callbacks <air-builtin-callbacks>` for popular tracking frameworks, or you can implement your own callback via the :ref:`Callback <tune-callbacks-docs>` interface.
Example: Logging to MLflow and TensorBoard
**Step 1: Install the necessary packages**
.. code-block:: bash
$ pip install mlflow
$ pip install tensorboardX
**Step 2: Run the following training script**
.. literalinclude:: /../../python/ray/train/examples/mlflow_simple_example.py
:language: python
.. _train-custom-callbacks:
Custom Callbacks
If the provided callbacks do not cover your desired integrations or use-cases,
you may always implement a custom callback by subclassing ``Callback``. If
the callback is general enough, please feel welcome to :ref:`add it <getting-involved>`
to the ``ray`` `repository <https://github.com/ray-project/ray>`_.
A simple example for creating a callback that will print out results:
.. code-block:: python
from typing import List, Dict
from ray.air import session, RunConfig, ScalingConfig
from ray.train.torch import TorchTrainer
from ray.tune.logger import LoggerCallback
# LoggerCallback is a higher level API of Callback.
class LoggingCallback(LoggerCallback):
def __init__(self) -> None:
self.results = []
def log_trial_result(self, iteration: int, trial: "Trial", result: Dict):
def train_func():
for i in range(3):
session.report({"epoch": i})
callback = LoggingCallback()
trainer = TorchTrainer(
print("\n".join([str(x) for x in callback.results]))
# {'trial_id': '0f1d0_00000', 'experiment_id': '494a1d050b4a4d11aeabd87ba475fcd3', 'date': '2022-06-27_17-03-28', 'timestamp': 1656349408, 'pid': 23018, 'hostname': 'ip-172-31-43-110', 'node_ip': '', 'config': {}}
# {'epoch': 0, '_timestamp': 1656349412, '_time_this_iter_s': 0.0026497840881347656, '_training_iteration': 1, 'time_this_iter_s': 3.433483362197876, 'done': False, 'timesteps_total': None, 'episodes_total': None, 'training_iteration': 1, 'trial_id': '0f1d0_00000', 'experiment_id': '494a1d050b4a4d11aeabd87ba475fcd3', 'date': '2022-06-27_17-03-32', 'timestamp': 1656349412, 'time_total_s': 3.433483362197876, 'pid': 23018, 'hostname': 'ip-172-31-43-110', 'node_ip': '', 'config': {}, 'time_since_restore': 3.433483362197876, 'timesteps_since_restore': 0, 'iterations_since_restore': 1, 'warmup_time': 0.003779172897338867, 'experiment_tag': '0'}
# {'epoch': 1, '_timestamp': 1656349412, '_time_this_iter_s': 0.0013833045959472656, '_training_iteration': 2, 'time_this_iter_s': 0.016670703887939453, 'done': False, 'timesteps_total': None, 'episodes_total': None, 'training_iteration': 2, 'trial_id': '0f1d0_00000', 'experiment_id': '494a1d050b4a4d11aeabd87ba475fcd3', 'date': '2022-06-27_17-03-32', 'timestamp': 1656349412, 'time_total_s': 3.4501540660858154, 'pid': 23018, 'hostname': 'ip-172-31-43-110', 'node_ip': '', 'config': {}, 'time_since_restore': 3.4501540660858154, 'timesteps_since_restore': 0, 'iterations_since_restore': 2, 'warmup_time': 0.003779172897338867, 'experiment_tag': '0'}
Example: PyTorch Distributed metrics
In real applications, you may want to calculate optimization metrics besides
accuracy and loss: recall, precision, Fbeta, etc.
Ray Train natively supports `TorchMetrics <https://torchmetrics.readthedocs.io/en/latest/>`_, which provides a collection of machine learning metrics for distributed, scalable PyTorch models.
Here is an example:
.. code-block:: python
from typing import List, Dict
from ray.air import session, ScalingConfig
from ray.train.torch import TorchTrainer
import torch
import torchmetrics
def train_func(config):
preds = torch.randn(10, 5).softmax(dim=-1)
target = torch.randint(5, (10,))
accuracy = torchmetrics.functional.accuracy(preds, target).item()
session.report({"accuracy": accuracy})
trainer = TorchTrainer(train_func, scaling_config=ScalingConfig(num_workers=2))
result = trainer.fit()
# 0.20000000298023224
.. Running on the cloud
.. --------------------
.. Use Ray Train with the Ray cluster launcher by changing the following:
.. .. code-block:: bash
.. ray up cluster.yaml
.. TODO.
.. _train-fault-tolerance:
Fault Tolerance & Elastic Training
Ray Train has built-in fault tolerance to recover from worker failures (i.e.
``RayActorError``\s). When a failure is detected, the workers will be shut
down and new workers will be added in. The training function will be
restarted, but progress from the previous execution can be resumed through
.. warning:: In order to retain progress when recovery, your training function
**must** implement logic for both saving *and* loading :ref:`checkpoints
Each instance of recovery from a worker failure is considered a retry. The
number of retries is configurable through the ``max_failures`` attribute of the
``failure_config`` argument set in the ``run_config`` argument passed to the
.. note:: Elastic Training is not yet supported.
.. Running on pre-emptible machines
.. --------------------------------
.. You may want to
.. TODO.
.. We do not have a profiling callback in AIR as the execution engine has changed to Tune. The behavior of the callback can be replicated with checkpoints (do a trace, save it to checkpoint, it gets downloaded to driver every iteration).
.. .. _train-profiling:
.. Profiling
.. ---------
.. Ray Train comes with an integration with `PyTorch Profiler <https://pytorch.org/blog/introducing-pytorch-profiler-the-new-and-improved-performance-tool/>`_.
.. Specifically, it comes with a :ref:`TorchWorkerProfiler <train-api-torch-worker-profiler>` utility class and :ref:`train-api-torch-tensorboard-profiler-callback` callback
.. that allow you to use the PyTorch Profiler as you would in a non-distributed PyTorch script, and synchronize the generated Tensorboard traces onto
.. the disk that from which your script was executed from.
.. **Step 1: Update training function with** ``TorchWorkerProfiler``
.. .. code-block:: bash
.. from ray.train.torch import TorchWorkerProfiler
.. def train_func():
.. twp = TorchWorkerProfiler()
.. with profile(..., on_trace_ready=twp.trace_handler) as p:
.. ...
.. profile_results = twp.get_and_clear_profile_traces()
.. train.report(..., **profile_results)
.. ...
.. **Step 2: Run training function with** ``TorchTensorboardProfilerCallback``
.. .. code-block:: python
.. from ray.train import Trainer
.. from ray.train.callbacks import TorchTensorboardProfilerCallback
.. trainer = Trainer(backend="torch", num_workers=2)
.. trainer.start()
.. trainer.run(train_func, callbacks=[TorchTensorboardProfilerCallback()])
.. trainer.shutdown()
.. **Step 3: Visualize the logs**
.. .. code-block:: bash
.. # Navigate to the run directory of the trainer.
.. # For example `cd /home/ray_results/train_2021-09-01_12-00-00/run_001/pytorch_profiler`
.. $ cd <TRAINER_RUN_DIR>/pytorch_profiler
.. # Install the PyTorch Profiler TensorBoard Plugin.
.. $ pip install torch_tb_profiler
.. # Star the TensorBoard UI.
.. $ tensorboard --logdir .
.. # View the PyTorch Profiler traces.
.. $ open http://localhost:6006/#pytorch_profiler
.. _train-tune:
Hyperparameter tuning (Ray Tune)
Hyperparameter tuning with :ref:`Ray Tune <tune-main>` is natively supported
with Ray Train. Specifically, you can take an existing ``Trainer`` and simply
pass it into a :class:`Tuner`.
.. code-block:: python
from ray import tune
from ray.air import session, ScalingConfig
from ray.train.torch import TorchTrainer
from ray.tune.tuner import Tuner, TuneConfig
def train_func(config):
# In this example, nothing is expected to change over epochs,
# and the output metric is equivalent to the input value.
for _ in range(config["num_epochs"]):
trainer = TorchTrainer(train_func, scaling_config=ScalingConfig(num_workers=2))
tuner = Tuner(
"train_loop_config": {
"num_epochs": 2,
"input": tune.grid_search([1, 2, 3]),
tune_config=TuneConfig(num_samples=5, metric="output", mode="max"),
result_grid = tuner.fit()
# 3
.. _torch-amp:
Automatic Mixed Precision
Automatic mixed precision (AMP) lets you train your models faster by using a lower
precision datatype for operations like linear layers and convolutions.
.. tabbed:: PyTorch
You can train your Torch model with AMP by:
1. Adding ``train.torch.accelerate(amp=True)`` to the top of your training function.
2. Wrapping your optimizer with ``train.torch.prepare_optimizer``.
3. Replacing your backward call with ``train.torch.backward``.
.. code-block:: diff
def train_func():
+ train.torch.accelerate(amp=True)
model = NeuralNetwork()
model = train.torch.prepare_model(model)
data_loader = DataLoader(my_dataset, batch_size=worker_batch_size)
data_loader = train.torch.prepare_data_loader(data_loader)
optimizer = torch.optim.SGD(model.parameters(), lr=0.001)
+ optimizer = train.torch.prepare_optimizer(optimizer)
for epoch in range(90):
for images, targets in dataloader:
outputs = model(images)
loss = torch.nn.functional.cross_entropy(outputs, targets)
- loss.backward()
+ train.torch.backward(loss)
.. note:: The performance of AMP varies based on GPU architecture, model type,
and data shape. For certain workflows, AMP may perform worse than
full-precision training.
.. _train-reproducibility:
.. tabbed:: PyTorch
To limit sources of nondeterministic behavior, add
``train.torch.enable_reproducibility()`` to the top of your training
.. code-block:: diff
def train_func():
+ train.torch.enable_reproducibility()
model = NeuralNetwork()
model = train.torch.prepare_model(model)
.. warning:: ``train.torch.enable_reproducibility`` can't guarantee
completely reproducible results across executions. To learn more, read
the `PyTorch notes on randomness <https://pytorch.org/docs/stable/notes/randomness.html>`_.
import ray
from ray import tune
def training_func(config):
dataloader = ray.train.get_dataset()\
for i in config["epochs"]:
ray.train.report(...) # use same intermediate reporting API
# Declare the specification for training.
trainer = Trainer(backend="torch", num_workers=12, use_gpu=True)
dataset = ray.dataset.window()
# Convert this to a trainable.
trainable = trainer.to_tune_trainable(training_func, dataset=dataset)
tuner = tune.Tuner(trainable,
param_space={"lr": tune.uniform(), "batch_size": tune.randint(1, 2, 3)},
results = tuner.fit()
Advanced APIs
Training Run Iterator API
Stateful Class API