[Train] Rename Ray SGD v2 to Ray Train (#19436)

matthewdeng 2021-10-18 22:27:46 -07:00 committed by GitHub
parent 46b4c7464d
commit 4674c78050
98 changed files with 924 additions and 896 deletions

View file

@ -1,8 +1,8 @@
- label: ":tv: :octopus: SGD GPU tests "
conditions: ["RAY_CI_SGD_AFFECTED"]
- label: ":tv: :steam_locomotive: Train GPU tests "
conditions: ["RAY_CI_TRAIN_AFFECTED"]
commands:
- cleanup() { if [ "${BUILDKITE_PULL_REQUEST}" = "false" ]; then ./ci/travis/upload_build_info.sh; fi }; trap cleanup EXIT
- SGD_TESTING=1 INSTALL_HOROVOD=1 ./ci/travis/install-dependencies.sh
- pip install -Ur ./python/requirements_ml_docker.txt
- ./ci/travis/env_info.sh
- bazel test --config=ci $(./scripts/bazel_export_options) --build_tests_only --test_tag_filters=gpu,gpu_only python/ray/util/sgd/...
- bazel test --config=ci $(./scripts/bazel_export_options) --build_tests_only --test_tag_filters=gpu,gpu_only python/ray/train/...

View file

@ -483,6 +483,13 @@
- ./ci/travis/install-dependencies.sh
- bazel test --config=ci $(./scripts/bazel_export_options) --build_tests_only --test_tag_filters=soft_imports python/ray/tune/...
- label: ":steam_locomotive: Train tests and examples"
conditions: ["RAY_CI_TRAIN_AFFECTED"]
commands:
- cleanup() { if [ "${BUILDKITE_PULL_REQUEST}" = "false" ]; then ./ci/travis/upload_build_info.sh; fi }; trap cleanup EXIT
- SGD_TESTING=1 INSTALL_HOROVOD=1 ./ci/travis/install-dependencies.sh
- bazel test --config=ci $(./scripts/bazel_export_options) --build_tests_only --test_tag_filters=-gpu_only python/ray/train/...
- label: ":octopus: SGD tests and examples"
conditions: ["RAY_CI_SGD_AFFECTED"]
commands:
@ -491,7 +498,6 @@
- bazel test --config=ci $(./scripts/bazel_export_options) --build_tests_only --test_tag_filters=tf,-pytorch,-py37,-flaky,-client,-gpu_only python/ray/util/sgd/...
- bazel test --config=ci $(./scripts/bazel_export_options) --build_tests_only --test_tag_filters=-tf,pytorch,-py37,-flaky,-client,-gpu_only python/ray/util/sgd/...
- bazel test --config=ci $(./scripts/bazel_export_options) --build_tests_only --test_tag_filters=client_unit_tests,-gpu_only --test_env=RAY_CLIENT_MODE=1 python/ray/util/sgd/...
- bazel test --config=ci $(./scripts/bazel_export_options) --build_tests_only --test_tag_filters=-gpu_only python/ray/util/sgd/v2/...
- label: ":octopus: Tune/SGD/Modin/Dask tests and examples. Python 3.7"
conditions: ["RAY_CI_TUNE_AFFECTED", "RAY_CI_SGD_AFFECTED"]

View file

@ -33,11 +33,11 @@ body:
- "Ray Core"
- "Ray Tune"
- "Ray Serve"
- "Ray Train"
- "RLlib"
- "Ray Clusters"
- "Monitoring & Debugging"
- "Dashboard"
- "RaySGD"
- "Others"
validations:
required: true

View file

@ -21,7 +21,7 @@ Ray is packaged with the following libraries for accelerating machine learning w
- `Tune`_: Scalable Hyperparameter Tuning
- `RLlib`_: Scalable Reinforcement Learning
- `RaySGD <https://docs.ray.io/en/master/raysgd/raysgd.html>`__: Distributed Training Wrappers
- `Train`_: Distributed Deep Learning (alpha)
- `Datasets`_: Flexible Distributed Data Loading (beta)
As well as libraries for taking ML and distributed apps to production:
@ -43,6 +43,7 @@ Install Ray with: ``pip install ray``. For nightly wheels, see the
.. _`Serve`: https://docs.ray.io/en/master/serve/index.html
.. _`Datasets`: https://docs.ray.io/en/master/data/dataset.html
.. _`Workflows`: https://docs.ray.io/en/master/workflows/concepts.html
.. _`Train`: https://docs.ray.io/en/master/train/train.html
Quick Start

View file

@ -72,6 +72,7 @@ if __name__ == "__main__":
RAY_CI_TUNE_AFFECTED = 0
RAY_CI_SGD_AFFECTED = 0
RAY_CI_TRAIN_AFFECTED = 0
RAY_CI_ONLY_RLLIB_AFFECTED = 0 # Whether only RLlib is affected.
RAY_CI_RLLIB_AFFECTED = 0 # Whether RLlib minimal tests should be run.
RAY_CI_RLLIB_FULL_AFFECTED = 0 # Whether full RLlib tests should be run.
@ -110,6 +111,10 @@ if __name__ == "__main__":
RAY_CI_SGD_AFFECTED = 1
RAY_CI_LINUX_WHEELS_AFFECTED = 1
RAY_CI_MACOS_WHEELS_AFFECTED = 1
elif changed_file.startswith("python/ray/train"):
RAY_CI_TRAIN_AFFECTED = 1
RAY_CI_LINUX_WHEELS_AFFECTED = 1
RAY_CI_MACOS_WHEELS_AFFECTED = 1
elif re.match("^(python/ray/)?rllib/", changed_file):
RAY_CI_RLLIB_AFFECTED = 1
RAY_CI_RLLIB_FULL_AFFECTED = 1
@ -133,6 +138,7 @@ if __name__ == "__main__":
elif changed_file.startswith("python/"):
RAY_CI_TUNE_AFFECTED = 1
RAY_CI_SGD_AFFECTED = 1
RAY_CI_TRAIN_AFFECTED = 1
RAY_CI_RLLIB_AFFECTED = 1
RAY_CI_SERVE_AFFECTED = 1
RAY_CI_PYTHON_AFFECTED = 1
@ -167,6 +173,7 @@ if __name__ == "__main__":
elif changed_file.startswith("src/"):
RAY_CI_TUNE_AFFECTED = 1
RAY_CI_SGD_AFFECTED = 1
RAY_CI_TRAIN_AFFECTED = 1
RAY_CI_RLLIB_AFFECTED = 1
RAY_CI_SERVE_AFFECTED = 1
RAY_CI_JAVA_AFFECTED = 1
@ -189,6 +196,7 @@ if __name__ == "__main__":
else:
RAY_CI_TUNE_AFFECTED = 1
RAY_CI_SGD_AFFECTED = 1
RAY_CI_TRAIN_AFFECTED = 1
RAY_CI_RLLIB_AFFECTED = 1
RAY_CI_SERVE_AFFECTED = 1
RAY_CI_JAVA_AFFECTED = 1
@ -203,6 +211,7 @@ if __name__ == "__main__":
else:
RAY_CI_TUNE_AFFECTED = 1
RAY_CI_SGD_AFFECTED = 1
RAY_CI_TRAIN_AFFECTED = 1
RAY_CI_RLLIB_AFFECTED = 1
RAY_CI_RLLIB_FULL_AFFECTED = 1
RAY_CI_SERVE_AFFECTED = 1
@ -221,13 +230,15 @@ if __name__ == "__main__":
RAY_CI_STREAMING_CPP_AFFECTED and \
not RAY_CI_STREAMING_PYTHON_AFFECTED and \
not RAY_CI_STREAMING_JAVA_AFFECTED and \
not RAY_CI_SGD_AFFECTED:
not RAY_CI_SGD_AFFECTED and \
not RAY_CI_TRAIN_AFFECTED:
RAY_CI_ONLY_RLLIB_AFFECTED = 1
# Log the modified environment variables visible in console.
output_string = " ".join([
"RAY_CI_TUNE_AFFECTED={}".format(RAY_CI_TUNE_AFFECTED),
"RAY_CI_SGD_AFFECTED={}".format(RAY_CI_SGD_AFFECTED),
"RAY_CI_TRAIN_AFFECTED={}".format(RAY_CI_TRAIN_AFFECTED),
"RAY_CI_ONLY_RLLIB_AFFECTED={}".format(RAY_CI_ONLY_RLLIB_AFFECTED),
"RAY_CI_RLLIB_AFFECTED={}".format(RAY_CI_RLLIB_AFFECTED),
"RAY_CI_RLLIB_FULL_AFFECTED={}".format(RAY_CI_RLLIB_FULL_AFFECTED),

View file

@ -230,8 +230,8 @@ Example: Per-Epoch Shuffle Pipeline
.. tip::
If you are interested in distributed ingest for deep learning, it is
recommended to use Ray Datasets in conjunction with :ref:`Ray SGD <sgd-v2-docs>`.
See the :ref:`example below<dataset-pipeline-ray-sgd>` for more info.
recommended to use Ray Datasets in conjunction with :ref:`Ray Train <train-docs>`.
See the :ref:`example below<dataset-pipeline-ray-train>` for more info.
..
https://docs.google.com/drawings/d/1vWQ-Zfxy2_Gthq8l3KmNsJ7nOCuYUQS9QMZpj5GHYx0/edit
@ -300,13 +300,13 @@ Similar to how you can ``.split()`` a Dataset, you can also split a DatasetPipel
.. image:: dataset-repeat-2.svg
.. _dataset-pipeline-ray-sgd:
.. _dataset-pipeline-ray-train:
Distributed Ingest with Ray SGD
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Ray Datasets integrates with :ref:`Ray SGD <sgd-v2-docs>`, further simplifying your distributed ingest pipeline.
Distributed Ingest with Ray Train
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Ray Datasets integrates with :ref:`Ray Train <train-docs>`, further simplifying your distributed ingest pipeline.
Ray SGD is a lightweight library for scalable deep learning on Ray.
Ray Train is a lightweight library for scalable deep learning on Ray.
1. It allows you to focus on the training logic and automatically handles distributed setup for your framework of choice (PyTorch, Tensorflow, or Horovod).
2. It has out-of-the-box fault tolerance and elastic training.
@ -319,7 +319,7 @@ Ray SGD is a lightweight library for scalable deep learning on Ray.
def train_func():
# This is a dummy train function just iterating over the dataset shard.
# You should replace this with your training logic.
shard = ray.sgd.get_dataset_shard()
shard = ray.train.get_dataset_shard()
for row in shard.iter_rows():
print(row)
@ -338,8 +338,8 @@ Ray SGD is a lightweight library for scalable deep learning on Ray.
config={"worker_batch_size": 64, "num_epochs": 2},
dataset=pipe)
Ray SGD is responsible for the orchestration of the training workers and will automatically split the Dataset for you.
See :ref:`the SGD User Guide <sgd-dataset-pipeline>` for more details.
Ray Train is responsible for the orchestration of the training workers and will automatically split the Dataset for you.
See :ref:`the Train User Guide <train-dataset-pipeline>` for more details.
Changing Pipeline Structure
---------------------------

View file

@ -16,7 +16,7 @@ Ray Datasets are the standard way to load and exchange data in Ray libraries and
Concepts
--------
Ray Datasets implement `Distributed Arrow <https://arrow.apache.org/>`__. A Dataset consists of a list of Ray object references to *blocks*. Each block holds a set of items in either an `Arrow table <https://arrow.apache.org/docs/python/data.html#tables>`__ or a Python list (for Arrow incompatible objects). Having multiple blocks in a dataset allows for parallel transformation and ingest of the data (e.g., into :ref:`Ray SGD <sgd-v2-docs>` for ML training).
Ray Datasets implement `Distributed Arrow <https://arrow.apache.org/>`__. A Dataset consists of a list of Ray object references to *blocks*. Each block holds a set of items in either an `Arrow table <https://arrow.apache.org/docs/python/data.html#tables>`__ or a Python list (for Arrow incompatible objects). Having multiple blocks in a dataset allows for parallel transformation and ingest of the data (e.g., into :ref:`Ray Train <train-docs>` for ML training).
The following figure visualizes a Dataset with three Arrow table blocks, each holding 1000 rows:
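For reference, a Dataset with this block structure could be built as in the sketch below (not part of this diff; it assumes ``ray.data.range`` and ``Dataset.num_blocks()`` are available in this version of Ray):

.. code-block:: python

    import ray

    # 3000 rows split across 3 blocks, so transformation and ingest can run in parallel.
    ds = ray.data.range(3000, parallelism=3)
    print(ds.num_blocks())  # -> 3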

View file

@ -140,7 +140,7 @@ PyTorch.
return torch.optim.lr_scheduler.MultiStepLR(
optimizer, milestones=[150, 250, 350], gamma=0.1)
# You can use the RayDP Estimator API or libraries like RaySGD for distributed training.
# You can use the RayDP Estimator API or libraries like Ray Train for distributed training.
from raydp.torch import TorchEstimator
estimator = TorchEstimator(
num_workers = 2,

View file

@ -332,14 +332,14 @@ Papers
.. toctree::
:hidden:
:maxdepth: -1
:caption: Ray SGD
:caption: Ray Train
raysgd/v2/raysgd.rst
raysgd/v2/user_guide.rst
raysgd/v2/examples.rst
raysgd/v2/architecture.rst
raysgd/v2/api.rst
raysgd/v2/migration-guide.rst
train/train.rst
train/user_guide.rst
train/examples.rst
train/architecture.rst
train/api.rst
train/migration-guide.rst
RaySGD v1: Distributed Training Wrappers <raysgd/raysgd.rst>
.. toctree::

View file

@ -1,9 +1,7 @@
.. _lightgbm-ray:
..
This part of the docs is generated from the LightGBM-Ray readme using m2r
To update:
- run `m2r /path/to/_lightgbm_ray/README.md`
- run `m2r /path/to/lightgbm_ray/README.md`
- copy the contents of README.rst here
- Get rid of the badges in the top
- Get rid of the references section at the bottom
@ -12,6 +10,8 @@
by adding a second underscore (use `target <link>`__)
- Search for `\ **` and delete this from the links (bold links are not supported)
.. _lightgbm-ray:
Distributed LightGBM on Ray
===========================

View file

@ -1,9 +1,7 @@
.. _ray-lightning:
..
This part of the docs is generated from the Ray Lightning readme using m2r
To update:
- run `m2r /path/to/xgboost_ray/README.md`
- run `m2r /path/to/ray_lightning/README.md`
- copy the contents of README.rst here
- remove the table of contents
- remove the PyTorch Lightning Compatibility section
@ -12,6 +10,8 @@
by adding a second underscore (use `target <link>`__)
- Search for `\ **` and delete this from the links (bold links are not supported)
.. _ray-lightning:
Distributed PyTorch Lightning Training on Ray
=============================================

View file

@ -5,8 +5,8 @@ RaySGD: Distributed Training Wrappers
=====================================
.. warning:: This is an older version of Ray SGD. A newer, more light-weight version of Ray SGD is in alpha as of Ray 1.7.
See the documentation :ref:`here <sgd-v2-docs>`. To migrate from v1 to v2 you can follow the :ref:`migration guide <sgd-migration>`.
.. warning:: This is an older version of Ray SGD. A newer, more light-weight version of Ray SGD (named Ray Train) is in alpha as of Ray 1.7.
See the documentation :ref:`here <train-docs>`. To migrate from v1 to v2 you can follow the :ref:`migration guide <sgd-migration>`.
RaySGD is a lightweight library for distributed deep learning, providing thin wrappers around PyTorch and TensorFlow native modules for data parallel training.

View file

@ -3,8 +3,8 @@
Distributed PyTorch
===================
.. warning:: This is an older version of Ray SGD. A newer, more light-weight version of Ray SGD is in alpha as of Ray 1.7.
See the documentation :ref:`here <sgd-v2-docs>`. To migrate from v1 to v2 you can follow the :ref:`migration guide <sgd-migration>`.
.. warning:: This is an older version of Ray SGD. A newer, more light-weight version of Ray SGD (named Ray Train) is in alpha as of Ray 1.7.
See the documentation :ref:`here <train-docs>`. To migrate from v1 to v2 you can follow the :ref:`migration guide <sgd-migration>`.
The RaySGD ``TorchTrainer`` simplifies distributed model training for PyTorch.

View file

@ -1,8 +1,8 @@
Distributed TensorFlow
======================
.. warning:: This is an older version of Ray SGD. A newer, more light-weight version of Ray SGD is in alpha as of Ray 1.7.
See the documentation :ref:`here <sgd-v2-docs>`. To migrate from v1 to v2 you can follow the :ref:`migration guide <sgd-migration>`.
.. warning:: This is an older version of Ray SGD. A newer, more light-weight version of Ray SGD (named Ray Train) is in alpha as of Ray 1.7.
See the documentation :ref:`here <train-docs>`. To migrate from v1 to v2 you can follow the :ref:`migration guide <sgd-migration>`.
RaySGD's ``TFTrainer`` simplifies distributed model training for Tensorflow. The ``TFTrainer`` is a wrapper around ``MultiWorkerMirroredStrategy`` with a Python API to easily incorporate distributed training into a larger Python application, as opposed to writing custom logic for setting up environments and starting separate processes.

View file

@ -3,8 +3,8 @@
RaySGD Hyperparameter Tuning
============================
.. warning:: This is an older version of Ray SGD. A newer, more light-weight version of Ray SGD is in alpha as of Ray 1.7.
See the documentation :ref:`here <sgd-v2-docs>`. To migrate from v1 to v2 you can follow the :ref:`migration guide <sgd-migration>`.
.. warning:: This is an older version of Ray SGD. A newer, more light-weight version of Ray SGD (named Ray Train) is in alpha as of Ray 1.7.
See the documentation :ref:`here <train-docs>`. To migrate from v1 to v2 you can follow the :ref:`migration guide <sgd-migration>`.
RaySGD integrates with :ref:`Ray Tune <tune-60-seconds>` to easily run distributed hyperparameter tuning experiments with your RaySGD Trainer.

View file

@ -1,111 +0,0 @@
.. _sgd-api:
RaySGD API
==========
.. _sgd-api-trainer:
Trainer
-------
.. autoclass:: ray.sgd.Trainer
:members:
.. _sgd-api-iterator:
SGDIterator
~~~~~~~~~~~
.. autoclass:: ray.sgd.SGDIterator
:members:
.. _sgd-api-backend-config:
Backend Configurations
----------------------
.. _sgd-api-torch-config:
TorchConfig
~~~~~~~~~~~
.. autoclass:: ray.sgd.TorchConfig
.. _sgd-api-tensorflow-config:
TensorflowConfig
~~~~~~~~~~~~~~~~
.. autoclass:: ray.sgd.TensorflowConfig
.. _sgd-api-horovod-config:
HorovodConfig
~~~~~~~~~~~~~
.. autoclass:: ray.sgd.HorovodConfig
Callbacks
---------
.. _sgd-api-callback:
SGDCallback
~~~~~~~~~~~
.. autoclass:: ray.sgd.SGDCallback
:members:
.. _sgd-api-json-logger-callback:
JsonLoggerCallback
~~~~~~~~~~~~~~~~~~
.. autoclass:: ray.sgd.callbacks.JsonLoggerCallback
.. _sgd-api-tbx-logger-callback:
TBXLoggerCallback
~~~~~~~~~~~~~~~~~
.. autoclass:: ray.sgd.callbacks.TBXLoggerCallback
Checkpointing
-------------
.. _sgd-api-checkpoint-strategy:
CheckpointStrategy
~~~~~~~~~~~~~~~~~~
.. autoclass:: ray.sgd.CheckpointStrategy
Training Function Utilities
---------------------------
sgd.report
~~~~~~~~~~
.. autofunction:: ray.sgd.report
sgd.load_checkpoint
~~~~~~~~~~~~~~~~~~~
.. autofunction:: ray.sgd.load_checkpoint
sgd.save_checkpoint
~~~~~~~~~~~~~~~~~~~
.. autofunction:: ray.sgd.save_checkpoint
sgd.world_rank
~~~~~~~~~~~~~~
.. autofunction:: ray.sgd.world_rank
sgd.local_rank
~~~~~~~~~~~~~~
.. autofunction:: ray.sgd.local_rank

View file

@ -1,6 +0,0 @@
:orphan:
horovod_example
===============
.. literalinclude:: /../../python/ray/util/sgd/v2/examples/horovod/horovod_example.py

View file

@ -1,6 +0,0 @@
:orphan:
mlflow_fashion_mnist_example
============================
.. literalinclude:: /../../python/ray/util/sgd/v2/examples/mlflow_fashion_mnist_example.py

View file

@ -1,6 +0,0 @@
:orphan:
tensorflow_linear_dataset_example
=================================
.. literalinclude:: /../../python/ray/util/sgd/v2/examples/tensorflow_linear_dataset_example.py

View file

@ -1,6 +0,0 @@
:orphan:
tensorflow_mnist_example
========================
.. literalinclude:: /../../python/ray/util/sgd/v2/examples/tensorflow_mnist_example.py

View file

@ -1,6 +0,0 @@
:orphan:
train_fashion_mnist_example
===========================
.. literalinclude:: /../../python/ray/util/sgd/v2/examples/train_fashion_mnist_example.py

View file

@ -1,6 +0,0 @@
:orphan:
train_linear_dataset_example
============================
.. literalinclude:: /../../python/ray/util/sgd/v2/examples/train_linear_dataset_example.py

View file

@ -1,6 +0,0 @@
:orphan:
train_linear_example
====================
.. literalinclude:: /../../python/ray/util/sgd/v2/examples/train_linear_example.py

View file

@ -1,6 +0,0 @@
:orphan:
transformers_example
====================
.. literalinclude:: /../../python/ray/util/sgd/v2/examples/transformers/transformers_example.py

View file

@ -1,6 +0,0 @@
:orphan:
tune_cifar_pytorch_pbt_example
==============================
.. literalinclude:: /../../python/ray/util/sgd/v2/examples/tune_cifar_pytorch_pbt_example.py

View file

@ -1,6 +0,0 @@
:orphan:
tune_linear_dataset_example
===========================
.. literalinclude:: /../../python/ray/util/sgd/v2/examples/tune_linear_dataset_example.py

View file

@ -1,6 +0,0 @@
:orphan:
tune_linear_example
===================
.. literalinclude:: /../../python/ray/util/sgd/v2/examples/tune_linear_example.py

View file

@ -1,6 +0,0 @@
:orphan:
tune_tensorflow_mnist_example
=============================
.. literalinclude:: /../../python/ray/util/sgd/v2/examples/tune_tensorflow_mnist_example.py

doc/source/train/api.rst Normal file
View file

@ -0,0 +1,111 @@
.. _train-api:
Ray Train API
=============
.. _train-api-trainer:
Trainer
-------
.. autoclass:: ray.train.Trainer
:members:
.. _train-api-iterator:
TrainingIterator
~~~~~~~~~~~~~~~~
.. autoclass:: ray.train.TrainingIterator
:members:
.. _train-api-backend-config:
Backend Configurations
----------------------
.. _train-api-torch-config:
TorchConfig
~~~~~~~~~~~
.. autoclass:: ray.train.TorchConfig
.. _train-api-tensorflow-config:
TensorflowConfig
~~~~~~~~~~~~~~~~
.. autoclass:: ray.train.TensorflowConfig
.. _train-api-horovod-config:
HorovodConfig
~~~~~~~~~~~~~
.. autoclass:: ray.train.HorovodConfig
Callbacks
---------
.. _train-api-callback:
TrainingCallback
~~~~~~~~~~~~~~~~
.. autoclass:: ray.train.TrainingCallback
:members:
.. _train-api-json-logger-callback:
JsonLoggerCallback
~~~~~~~~~~~~~~~~~~
.. autoclass:: ray.train.callbacks.JsonLoggerCallback
.. _train-api-tbx-logger-callback:
TBXLoggerCallback
~~~~~~~~~~~~~~~~~
.. autoclass:: ray.train.callbacks.TBXLoggerCallback
Checkpointing
-------------
.. _train-api-checkpoint-strategy:
CheckpointStrategy
~~~~~~~~~~~~~~~~~~
.. autoclass:: ray.train.CheckpointStrategy
Training Function Utilities
---------------------------
train.report
~~~~~~~~~~~~
.. autofunction:: ray.train.report
train.load_checkpoint
~~~~~~~~~~~~~~~~~~~~~
.. autofunction:: ray.train.load_checkpoint
train.save_checkpoint
~~~~~~~~~~~~~~~~~~~~~
.. autofunction:: ray.train.save_checkpoint
train.world_rank
~~~~~~~~~~~~~~~~
.. autofunction:: ray.train.world_rank
train.local_rank
~~~~~~~~~~~~~~~~
.. autofunction:: ray.train.local_rank
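As a quick orientation (not part of the API file above), here is a minimal sketch of how several of these APIs are typically combined; it assumes ``trainer.run`` accepts a ``callbacks`` argument, as shown elsewhere in this diff:

.. code-block:: python

    from ray import train
    from ray.train import Trainer, TrainingCallback
    from ray.train.callbacks import JsonLoggerCallback

    class MyCallback(TrainingCallback):
        def handle_result(self, results, **info):
            # One dict per worker for each train.report() call.
            print(results)

    def train_func():
        if train.world_rank() == 0:
            print("running on the rank 0 worker")
        for epoch in range(2):
            train.report(epoch=epoch)           # processed by callbacks
            train.save_checkpoint(epoch=epoch)  # written to the run's log directory

    trainer = Trainer(backend="torch", num_workers=2)
    trainer.start()
    trainer.run(train_func, callbacks=[MyCallback(), JsonLoggerCallback()])
    trainer.shutdown()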

View file

@ -1,11 +1,11 @@
.. _sgd-arch:
.. _train-arch:
RaySGD Architecture
===================
Ray Train Architecture
======================
A diagram of the RaySGD architecture is provided below.
A diagram of the Ray Train architecture is provided below.
.. image:: sgd-arch.svg
.. image:: train-arch.svg
:width: 70%
:align: center
@ -13,14 +13,13 @@ A diagram of the RaySGD architecture is provided below.
Trainer
-------
The Trainer is the main class that is exposed in the RaySGD API that users will interact with.
The Trainer is the main class that is exposed in the Ray Train API that users will interact with.
* The user will pass in a *function* which defines the training logic.
* The Trainer will create an :ref:`Executor <sgd-arch-executor>` to run the distributed training.
* The Trainer will create an :ref:`Executor <train-arch-executor>` to run the distributed training.
* The Trainer will handle callbacks based on the results from the BackendExecutor.
.. _sgd-arch-executor:
.. _train-arch-executor:
Executor
--------

View file

@ -1,12 +1,13 @@
.. _sgd-v2-examples:
RaySGD Examples
===============
Ray Train Examples
==================
.. Example .rst files should be organized in the same manner as the
.py files in ray/python/ray/util/sgd/v2/examples.
.py files in ray/python/ray/train/examples.
Below are examples for using RaySGD with a variety of models, frameworks, and use cases.
Below are examples for using Ray Train with a variety of models, frameworks,
and use cases.
General Examples
----------------
@ -14,66 +15,66 @@ General Examples
PyTorch
~~~~~~~
* :doc:`/raysgd/v2/examples/train_linear_example`:
* :doc:`/train/examples/train_linear_example`:
Simple example for PyTorch.
* :doc:`/raysgd/v2/examples/train_fashion_mnist_example`:
* :doc:`/train/examples/train_fashion_mnist_example`:
End-to-end example for PyTorch.
* :doc:`/raysgd/v2/examples/transformers/transformers_example`:
* :doc:`/train/examples/transformers/transformers_example`:
End-to-end example for HuggingFace Transformers (PyTorch).
TensorFlow
~~~~~~~~~~
* :doc:`/raysgd/v2/examples/tensorflow_mnist_example`:
* :doc:`/train/examples/tensorflow_mnist_example`:
End-to-end example for TensorFlow
Horovod
~~~~~~~~~~
~~~~~~~
* :doc:`/raysgd/v2/examples/horovod/horovod_example`:
* :doc:`/train/examples/horovod/horovod_example`:
End-to-end example for Horovod (with PyTorch)
..
TODO
* :doc:`/raysgd/v2/examples/TODO`:
* :doc:`/train/examples/TODO`:
Simple example for TensorFlow
* :doc:`/raysgd/v2/examples/TODO`:
* :doc:`/train/examples/TODO`:
Simple example for Horovod (with TensorFlow)
Iterator API Examples
---------------------
* :doc:`/raysgd/v2/examples/mlflow_fashion_mnist_example`:
* :doc:`/train/examples/mlflow_fashion_mnist_example`:
Example for using the Iterator API for custom MLFlow integration.
Ray Datasets Integration Examples
---------------------------------
* :doc:`/raysgd/v2/examples/tensorflow_linear_dataset_example`:
* :doc:`/train/examples/tensorflow_linear_dataset_example`:
Simple example for training a linear TensorFlow model.
* :doc:`/raysgd/v2/examples/train_linear_dataset_example`:
* :doc:`/train/examples/train_linear_dataset_example`:
Simple example for training a linear PyTorch model.
* :doc:`/raysgd/v2/examples/tune_linear_dataset_example`:
* :doc:`/train/examples/tune_linear_dataset_example`:
Simple example for tuning a linear PyTorch model.
Ray Tune Integration Examples
-----------------------------
* :doc:`/raysgd/v2/examples/tune_linear_example`:
* :doc:`/train/examples/tune_linear_example`:
Simple example for tuning a PyTorch model.
* :doc:`/raysgd/v2/examples/tune_tensorflow_mnist_example`:
* :doc:`/train/examples/tune_tensorflow_mnist_example`:
End-to-end example for tuning a TensorFlow model.
* :doc:`/raysgd/v2/examples/tune_cifar_pytorch_pbt_example`:
* :doc:`/train/examples/tune_cifar_pytorch_pbt_example`:
End-to-end example for tuning a PyTorch model with PBT.
..

View file

@ -0,0 +1,6 @@
:orphan:
horovod_example
===============
.. literalinclude:: /../../python/ray/train/examples/horovod/horovod_example.py

View file

@ -0,0 +1,6 @@
:orphan:
mlflow_fashion_mnist_example
============================
.. literalinclude:: /../../python/ray/train/examples/mlflow_fashion_mnist_example.py

View file

@ -0,0 +1,6 @@
:orphan:
tensorflow_linear_dataset_example
=================================
.. literalinclude:: /../../python/ray/train/examples/tensorflow_linear_dataset_example.py

View file

@ -0,0 +1,6 @@
:orphan:
tensorflow_mnist_example
========================
.. literalinclude:: /../../python/ray/train/examples/tensorflow_mnist_example.py

View file

@ -0,0 +1,6 @@
:orphan:
train_fashion_mnist_example
===========================
.. literalinclude:: /../../python/ray/train/examples/train_fashion_mnist_example.py

View file

@ -0,0 +1,6 @@
:orphan:
train_linear_dataset_example
============================
.. literalinclude:: /../../python/ray/train/examples/train_linear_dataset_example.py

View file

@ -0,0 +1,6 @@
:orphan:
train_linear_example
====================
.. literalinclude:: /../../python/ray/train/examples/train_linear_example.py

View file

@ -0,0 +1,6 @@
:orphan:
transformers_example
====================
.. literalinclude:: /../../python/ray/train/examples/transformers/transformers_example.py

View file

@ -0,0 +1,6 @@
:orphan:
tune_cifar_pytorch_pbt_example
==============================
.. literalinclude:: /../../python/ray/train/examples/tune_cifar_pytorch_pbt_example.py

View file

@ -0,0 +1,6 @@
:orphan:
tune_linear_dataset_example
===========================
.. literalinclude:: /../../python/ray/train/examples/tune_linear_dataset_example.py

View file

@ -0,0 +1,6 @@
:orphan:
tune_linear_example
===================
.. literalinclude:: /../../python/ray/train/examples/tune_linear_example.py

View file

@ -0,0 +1,6 @@
:orphan:
tune_tensorflow_mnist_example
=============================
.. literalinclude:: /../../python/ray/train/examples/tune_tensorflow_mnist_example.py

View file

@ -1,23 +1,25 @@
.. _sgd-migration:
Migrating from Ray SGD v1
=========================
Migrating from Ray SGD to Ray Train
===================================
In Ray 1.7, we are rolling out a new and more streamlined version of Ray SGD. Ray SGD v2 focuses on usability and composability - it has a much simpler API, has support for more deep learning backends, integrates better with other libraries in the Ray ecosystem, and will continue to be actively developed with more features.
In Ray 1.7, we are rolling out a new and more streamlined version of Ray SGD.
As of Ray 1.8, the new version of Ray SGD has been rebranded as Ray Train.
Ray Train focuses on usability and composability - it has a much simpler API, has support for more deep learning backends, integrates better with other libraries in the Ray ecosystem, and will continue to be actively developed with more features.
This guide will help you easily migrate existing code from Ray SGD v1 to Ray SGD v2. If you are new to Ray SGD as a whole, you should get started with :ref:`Ray SGD v2 directly <sgd-v2-docs>`.
This guide will help you easily migrate existing code from Ray SGD v1 to Ray Train. If you are new to Ray SGD as a whole, you should get started with :ref:`Ray Train directly <train-docs>`.
For a full list of features that Ray SGD v2 provides, please check out the :ref:`user guide<sgd-user-guide>`.
For a full list of features that Ray Train provides, please check out the :ref:`user guide<train-user-guide>`.
.. note:: If there are any issues or anything missing with this guide or any feedback on Ray SGD v2 overall, please file a `Github issue on the Ray repo <https://github.com/ray-project/ray/issues>`_!
.. note:: If there are any issues or anything missing with this guide or any feedback on Ray Train overall, please file a `Github issue on the Ray repo <https://github.com/ray-project/ray/issues>`_!
What are the API differences?
-----------------------------
There are 3 primary API differences between Ray SGD v1 and v2.
There are 3 primary API differences between Ray SGD v1 and Ray Train.
1. There is a single ``Trainer`` interface for all backends (torch, tensorflow, horovod), and the backend is simply specified via an argument: ``Trainer(backend="torch")``\ , ``Trainer(backend="horovod")``\ , etc. Any features that we add to Ray SGD will be supported for all backends, and there won't be any API divergence like there was with a separate ``TorchTrainer`` and ``TFTrainer``.
2. The ``TrainingOperator`` and creator functions are replaced by a more natural user-defined training function. You no longer have to make your training logic fit into a restrictive interface. In Ray SGD v2, you simply have to provide a training function that describes the full logic for your training execution and this will be distributed by Ray SGD v2.
2. The ``TrainingOperator`` and creator functions are replaced by a more natural user-defined training function. You no longer have to make your training logic fit into a restrictive interface. In Ray Train, you simply have to provide a training function that describes the full logic for your training execution, and this will be distributed by Ray Train.
.. code-block:: python
@ -40,7 +42,7 @@ There are 3 primary API differences between Ray SGD v1 and v2.
optimizer.step()
print(f"epoch: {epoch}, loss: {loss.item()}")
from ray.sgd import Trainer
from ray.train import Trainer
trainer = Trainer(backend="torch", num_workers=4)
trainer.start()
@ -49,7 +51,7 @@ There are 3 primary API differences between Ray SGD v1 and v2.
Currently, this means that you are now responsible for modifying your code to support distributed training (specifying ``DistributedDataParallel`` for ``torch`` or ``MultiWorkerMirroredStrategy`` for ``tensorflow``) as opposed to having this handled automatically for you. However, we plan to provide utilities that handle these recipes for you automatically. A short sketch of the ``torch`` case follows this list.
3. Rather than iteratively calling ``trainer.train()`` or ``trainer.validate()`` for each epoch, in Ray SGD v2 the training function defines the full training execution and is run via ``trainer.run(train_func)``.
3. Rather than iteratively calling ``trainer.train()`` or ``trainer.validate()`` for each epoch, in Ray Train the training function defines the full training execution and is run via ``trainer.run(train_func)``.
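For the ``torch`` backend, the wrapping mentioned in point 2 looks roughly as follows (a minimal sketch, not from this diff; ``Net`` is a placeholder model):

.. code-block:: python

    import torch
    from torch.nn.parallel import DistributedDataParallel
    from ray import train

    def train_func():
        # Ray Train has already set up the process group for this worker.
        device = torch.device(f"cuda:{train.local_rank()}"
                              if torch.cuda.is_available() else "cpu")
        model = Net().to(device)  # Net is a placeholder for your model
        # You wrap the model in DistributedDataParallel yourself.
        model = DistributedDataParallel(
            model,
            device_ids=[train.local_rank()] if torch.cuda.is_available() else None)
        ...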
In the following sections, we will guide you through the steps to migrate:
@ -65,15 +67,15 @@ The main change you will have to make is how you define your training logic. In
PyTorch
~~~~~~~
In v1, the training logic is defined through the ``train_epoch`` and ``train_batch`` methods of a ``TrainingOperator`` class which is passed into the ``TorchTrainer``. To migrate to Ray SGD v2, there are 2 options:
In v1, the training logic is defined through the ``train_epoch`` and ``train_batch`` methods of a ``TrainingOperator`` class which is passed into the ``TorchTrainer``. To migrate to Ray Train, there are 2 options:
1. If you felt the ``TrainingOperator`` was unnecessary or too complex, or you had to customize it extensively, you can define your own training function.
2. If you liked having your training logic in the ``TrainingOperator``, you can continue to use the ``TrainingOperator`` with Ray SGD v2.
2. If you liked having your training logic in the ``TrainingOperator``, you can continue to use the ``TrainingOperator`` with Ray Train.
**Alternative 1: Custom Training Function**
You can define your own custom training function, and use only the parts from ``TrainingOperator.train_epoch``, ``TrainingOperator.setup``, and ``TrainingOperator.validate`` that are necessary for your application.
You can see a full example on how to :ref:`port over regular PyTorch DDP code to Ray SGD here <sgd-porting-code>`
You can see a full example of how to :ref:`port regular PyTorch DDP code to Ray Train here <train-porting-code>`.
**Alternative 2: Continue to use TrainingOperator**
Alternatively, if you liked having the ``TrainingOperator``, you can define a training function that instantiates your `TrainingOperator` and you can call methods directly on the operator object.
@ -102,14 +104,14 @@ you would do
.. code-block:: python
from ray.util.sgd import TrainingOperator
from ray.sgd import Trainer
from ray import sgd
from ray import train
from ray.train import Trainer
class MyTrainingOperator(TrainingOperator):
...
def train_func(config):
device = torch.device(f"cuda:{sgd.local_rank()}" if
device = torch.device(f"cuda:{train.local_rank()}" if
torch.cuda.is_available() else "cpu")
if torch.cuda.is_available():
torch.cuda.set_device(device)
@ -117,8 +119,8 @@ you would do
# Set the args to whatever values you want.
training_operator = MyTrainingOperator(
config=config,
world_rank=sgd.world_rank(),
local_rank=sgd.local_rank(),
world_rank=train.world_rank(),
local_rank=train.local_rank(),
is_distributed=True,
device=device,
use_gpu=True,
@ -136,7 +138,7 @@ you would do
validation_loader = training_operator._get_validation_loader()
training_operator.validate(iterator=iter(validation_loader))
if sgd.world_rank() == 0:
if train.world_rank() == 0:
return training_operator._get_original_models()
else:
return None
@ -149,14 +151,14 @@ you would do
Tensorflow
~~~~~~~~~~
The API for ``TFTrainer`` uses creator functions instead of a ``TrainingOperator`` to define the training logic. To port over Ray SGD v1 Tensorflow code to v2 you can do the following:
The API for ``TFTrainer`` uses creator functions instead of a ``TrainingOperator`` to define the training logic. To port over Ray SGD v1 Tensorflow code to Ray Train you can do the following:
.. code-block:: python
from tensorflow.distribute import MultiWorkerMirroredStrategy
from ray.sgd import Trainer
from ray import sgd
from ray import train
from ray.train import Trainer
def train_func(config):
train_dataset, val_dataset = data_creator(config)
@ -167,7 +169,7 @@ The API for ``TFTrainer`` uses creator functions instead of a ``TrainingOperator
for epoch_idx in range(config["num_epochs"]):
model.fit(train_dataset)
if sgd.world_rank() == 0:
if train.world_rank() == 0:
return model
else:
return None
@ -176,21 +178,21 @@ The API for ``TFTrainer`` uses creator functions instead of a ``TrainingOperator
trainer.start()
model = trainer.run(train_func)[0]
You can see a full example :ref:`here <sgd-porting-code>`.
You can see a full example :ref:`here <train-porting-code>`.
.. _sgd-migration-trainer:
Interacting with the ``Trainer``
--------------------------------
In Ray SGD v1, you can iteratively call ``trainer.train()`` or ``trainer.validate()`` for each epoch, and can then interact with the trainer to get certain state (model, checkpoints, results, etc.). In Ray SGD v2, this is replaced by a single training function that defines the full training & validation loop for all epochs.
In Ray SGD v1, you can iteratively call ``trainer.train()`` or ``trainer.validate()`` for each epoch, and can then interact with the trainer to get certain state (model, checkpoints, results, etc.). In Ray Train, this is replaced by a single training function that defines the full training & validation loop for all epochs.
There are 3 ways to get state during or after the training execution:
#. Return values from your training function
#. Intermediate results via ``sgd.report()``
#. Saving & loading checkpoints via ``sgd.save_checkpoint()`` and ``sgd.load_checkpoint()``
#. Intermediate results via ``train.report()``
#. Saving & loading checkpoints via ``train.save_checkpoint()`` and ``train.load_checkpoint()``
Return Values
~~~~~~~~~~~~~
@ -199,7 +201,7 @@ To get any state from training *after* training has completed, you can simply re
For example, to get the final model:
**SGD v1**
**Ray SGD v1**
.. code-block:: python
@ -214,11 +216,11 @@ For example, to get the final model:
trained_model = trainer.get_model()
**SGD v2**
**Ray Train**
.. code-block:: python
from ray.sgd import Trainer
from ray.train import Trainer
def train_func():
model = Net()
@ -237,9 +239,9 @@ For example, to get the final model:
Intermediate Reporting
~~~~~~~~~~~~~~~~~~~~~~
If you want to access any values *during* the training process, you can do so via ``sgd.report()``. You can pass in any values to ``sgd.report()`` and these values from all workers will be sent to any callbacks passed into your ``Trainer``.
If you want to access any values *during* the training process, you can do so via ``train.report()``. You can pass in any values to ``train.report()`` and these values from all workers will be sent to any callbacks passed into your ``Trainer``.
**SGD v1**
**Ray SGD v1**
.. code-block:: python
@ -254,22 +256,21 @@ If you want to access any values *during* the training process, you can do so vi
print(trainer.train(reduce_results=False))
**SGD v2**
**Ray Train**
.. code-block:: python
from ray import sgd
from ray.sgd Trainer
from ray.sgd.callbacks import SGDCallback
from ray import train
from ray.train import Trainer, TrainingCallback
from typing import List, Dict
class PrintingCallback(SGDCallback):
class PrintingCallback(TrainingCallback):
def handle_result(self, results: List[Dict], **info):
print(results)
def train_func():
for i in range(3):
sgd.report(epoch=i)
train.report(epoch=i)
trainer = Trainer(backend="torch", num_workers=2)
trainer.start()
@ -282,18 +283,18 @@ If you want to access any values *during* the training process, you can do so vi
# [{'epoch': 2, '_timestamp': 1630471763, '_time_this_iter_s': 0.0014500617980957031, '_training_iteration': 3}, {'epoch': 2, '_timestamp': 1630471763, '_time_this_iter_s': 0.0015292167663574219, '_training_iteration': 3}]
trainer.shutdown()
See the :ref:`v2 User Guide <sgd-user-guide>` for more details.
See the :ref:`Ray Train User Guide <train-user-guide>` for more details.
Checkpointing
~~~~~~~~~~~~~
Finally, you can also use ``sgd.save_checkpoint()`` and ``sgd.load_checkpoint()`` to write checkpoints to disk during the training process, and to load from the most recently saved checkpoint in the case of node failures.
Finally, you can also use ``train.save_checkpoint()`` and ``train.load_checkpoint()`` to write checkpoints to disk during the training process, and to load from the most recently saved checkpoint in the case of node failures.
See the :ref:`Checkpointing <sgd-checkpointing>` and :ref:`Fault Tolerance & Elastic Training <sgd-fault-tolerance>` sections on the user guide for more info.
See the :ref:`Checkpointing <train-checkpointing>` and :ref:`Fault Tolerance & Elastic Training <train-fault-tolerance>` sections on the user guide for more info.
For example, in order to save checkpoints after every epoch:
**SGD v1**
**Ray SGD v1**
.. code-block:: python
@ -309,12 +310,12 @@ For example, in order to save checkpoints after every epoch:
trainer.save_checkpoint(checkpoint_dir="~/ray_results")
**SGD v2**
**Ray Train**
.. code-block:: python
from ray.sgd import Trainer
from ray import sgd
from ray import train
from ray.train import Trainer
def train_func():
model = Net()
@ -322,7 +323,7 @@ For example, in order to save checkpoints after every epoch:
for i in range(3):
for batch in train_loader:
model.train(batch)
sgd.save_checkpoint(epoch=i, model=model.state_dict()))
train.save_checkpoint(epoch=i, model=model.state_dict())
trainer = Trainer(backend="torch")
trainer.start()
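To recover from the most recent checkpoint (for example, after a node failure), the training function can read it back with ``train.load_checkpoint()``. A minimal sketch, not from this diff, with ``Net`` again a placeholder:

.. code-block:: python

    from ray import train

    def train_func():
        checkpoint = train.load_checkpoint() or {}
        start_epoch = checkpoint.get("epoch", -1) + 1
        model = Net()  # placeholder model
        if "model" in checkpoint:
            model.load_state_dict(checkpoint["model"])
        for i in range(start_epoch, 3):
            # ... training logic ...
            train.save_checkpoint(epoch=i, model=model.state_dict())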
@ -334,11 +335,11 @@ For example, in order to save checkpoints after every epoch:
Hyperparameter Tuning with Ray Tune
-----------------------------------
Ray SGD v2 also comes with an easier to use interface for Hyperparameter Tuning with Ray Tune using Tune's function API instead of its Class API. In particular, it is much easier to define custom procedures because the logic is entirely defined by your training function.
Ray Train also comes with an easier to use interface for Hyperparameter Tuning with Ray Tune using Tune's function API instead of its Class API. In particular, it is much easier to define custom procedures because the logic is entirely defined by your training function.
There is a 1:1 mapping between rank 0 worker's ``sgd.report()``\ , ``sgd.save_checkpoint()``\ , and ``sgd.load_checkpoint()`` with ``tune.report()``\ , ``tune.save_checkpoint()``\ , and ``tune.load_checkpoint()``.
There is a 1:1 mapping between rank 0 worker's ``train.report()``\ , ``train.save_checkpoint()``\ , and ``train.load_checkpoint()`` with ``tune.report()``\ , ``tune.save_checkpoint()``\ , and ``tune.load_checkpoint()``.
**SGD v1**
**Ray SGD v1**
.. code-block:: python
@ -367,19 +368,18 @@ There is a 1:1 mapping between rank 0 worker's ``sgd.report()``\ , ``sgd.save_ch
**SGD v2**
**Ray Train**
.. code-block:: python
from ray import tune
from ray import sgd
from ray.sgd import Trainer
from ray import train, tune
from ray.train import Trainer
def train_func(config):
# In this example, nothing is expected to change over epochs,
# and the output metric is equivalent to the input value.
for _ in range(config["num_epochs"]):
sgd.report(output=config["input"])
train.report(output=config["input"])
trainer = Trainer(backend="torch", num_workers=2)
trainable = trainer.to_tune_trainable(train_func)
@ -390,4 +390,4 @@ There is a 1:1 mapping between rank 0 worker's ``sgd.report()``\ , ``sgd.save_ch
print(analysis.get_best_config(metric="output", mode="max"))
# {'num_epochs': 2, 'input': 3}
For more information see :ref:`sgd-tune`
For more information, see :ref:`train-tune`.

View file

(Binary image file: 32 KiB before, 32 KiB after.)

View file

@ -1,20 +1,20 @@
.. _sgd-v2-docs:
.. _train-docs:
RaySGD: Deep Learning on Ray
=============================
Ray Train: Distributed Deep Learning
====================================
.. _`issue on GitHub`: https://github.com/ray-project/ray/issues
.. tip:: Get in touch with us if you're using or considering using `RaySGD <https://forms.gle/PXFcJmHwszCwQhqX7>`_!
.. tip:: Get in touch with us if you're using or considering using `Ray Train <https://forms.gle/PXFcJmHwszCwQhqX7>`_!
RaySGD is a lightweight library for distributed deep learning, allowing you
Ray Train is a lightweight library for distributed deep learning, allowing you
to scale up and speed up training for your deep learning models.
The main features are:
- **Ease of use**: Scale your single process training code to a cluster in just a couple lines of code.
- **Composability**: RaySGD interoperates with :ref:`Ray Tune <tune-main>` to tune your distributed model and :ref:`Ray Datasets <datasets>` to train on large amounts of data.
- **Interactivity**: RaySGD fits in your workflow with support to run from any environment, including seamless Jupyter notebook support.
- **Composability**: Ray Train interoperates with :ref:`Ray Tune <tune-main>` to tune your distributed model and :ref:`Ray Datasets <datasets>` to train on large amounts of data.
- **Interactivity**: Ray Train fits in your workflow with support to run from any environment, including seamless Jupyter notebook support.
.. note::
@ -23,18 +23,18 @@ The main features are:
`issue on GitHub`_.
If you are looking for the previous API documentation, see :ref:`sgd-index`.
Intro to RaySGD
---------------
Intro to Ray Train
------------------
RaySGD is a library that aims to simplify distributed deep learning.
Ray Train is a library that aims to simplify distributed deep learning.
**Frameworks**: RaySGD is built to abstract away the coordination/configuration setup of distributed deep learning frameworks such as Pytorch Distributed and Tensorflow Distributed, allowing users to only focus on implementing training logic.
**Frameworks**: Ray Train is built to abstract away the coordination/configuration setup of distributed deep learning frameworks such as Pytorch Distributed and Tensorflow Distributed, allowing users to only focus on implementing training logic.
* For Pytorch, RaySGD automatically handles the construction of the distributed process group.
* For Tensorflow, RaySGD automatically handles the coordination of the ``TF_CONFIG``. The current implementation assumes that the user will use a MultiWorkerMirroredStrategy, but this will change in the near future.
* For Horovod, RaySGD automatically handles the construction of the Horovod runtime and Rendezvous server.
* For Pytorch, Ray Train automatically handles the construction of the distributed process group.
* For Tensorflow, Ray Train automatically handles the coordination of the ``TF_CONFIG``. The current implementation assumes that the user will use a MultiWorkerMirroredStrategy, but this will change in the near future.
* For Horovod, Ray Train automatically handles the construction of the Horovod runtime and Rendezvous server.
**Built for data scientists/ML practitioners**: RaySGD has support for standard ML tools and features that practitioners love:
**Built for data scientists/ML practitioners**: Ray Train has support for standard ML tools and features that practitioners love:
* Callbacks for early stopping
* Checkpointing
@ -44,22 +44,22 @@ RaySGD is a library that aims to simplify distributed deep learning.
**Integration with Ray Ecosystem**: Distributed deep learning often comes with a lot of complexity.
* Use :ref:`Ray Datasets <datasets>` with RaySGD to handle and train on large amounts of data.
* Use :ref:`Ray Tune <tune-main>` with RaySGD to leverage cutting edge hyperparameter techniques and distribute both your training and tuning.
* Use :ref:`Ray Datasets <datasets>` with Ray Train to handle and train on large amounts of data.
* Use :ref:`Ray Tune <tune-main>` with Ray Train to leverage cutting edge hyperparameter techniques and distribute both your training and tuning.
* You can leverage the :ref:`Ray cluster launcher <cluster-cloud>` to launch autoscaling or spot instance clusters to train your model at scale on any cloud.
Quick Start
-----------
RaySGD abstracts away the complexity of setting up a distributed training
Ray Train abstracts away the complexity of setting up a distributed training
system. Let's look at the following simple examples:
.. tabs::
.. group-tab:: PyTorch
This example shows how you can use RaySGD with PyTorch.
This example shows how you can use Ray Train with PyTorch.
First, set up your dataset and model.
@ -119,7 +119,7 @@ system. Let's take following simple examples:
Now let's convert this to a distributed multi-worker training function!
First, update the training function code to use PyTorch's
``DistributedDataParallel``. With RaySGD, you just pass in your distributed
``DistributedDataParallel``. With Ray Train, you just pass in your distributed
data parallel code just as you would normally run it with
``torch.distributed.launch``.
@ -147,7 +147,7 @@ system. Let's take following simple examples:
.. code-block:: python
from ray.sgd import Trainer
from ray.train import Trainer
trainer = Trainer(backend="torch", num_workers=4)
trainer.start()
@ -155,12 +155,12 @@ system. Let's take following simple examples:
trainer.shutdown()
See :ref:`sgd-porting-code` for a more comprehensive example.
See :ref:`train-porting-code` for a more comprehensive example.
.. group-tab:: TensorFlow
This example shows how you can use RaySGD to set up `Multi-worker training
This example shows how you can use Ray Train to set up `Multi-worker training
with Keras <https://www.tensorflow.org/tutorials/distribute/multi_worker_with_keras>`_.
First, set up your dataset and model.
@ -227,7 +227,7 @@ system. Let's take following simple examples:
def train_func_distributed():
per_worker_batch_size = 64
# This environment variable will be set by Ray SGD.
# This environment variable will be set by Ray Train.
tf_config = json.loads(os.environ['TF_CONFIG'])
num_workers = len(tf_config['cluster']['worker'])
@ -247,7 +247,7 @@ system. Let's take following simple examples:
.. code-block:: python
from ray.sgd import Trainer
from ray.train import Trainer
trainer = Trainer(backend="tensorflow", num_workers=4)
trainer.start()
@ -255,7 +255,7 @@ system. Let's take following simple examples:
trainer.shutdown()
See :ref:`sgd-porting-code` for a more comprehensive example.
See :ref:`train-porting-code` for a more comprehensive example.
**Next steps:** Check out the :ref:`User Guide <sgd-user-guide>`!
**Next steps:** Check out the :ref:`User Guide <train-user-guide>`!

View file

@ -1,49 +1,58 @@
.. _sgd-user-guide:
.. _train-user-guide:
RaySGD User Guide
=================
Ray Train User Guide
====================
.. tip:: Get in touch with us if you're using or considering using `RaySGD <https://forms.gle/PXFcJmHwszCwQhqX7>`_!
.. tip:: Get in touch with us if you're using or considering using `Ray Train <https://forms.gle/PXFcJmHwszCwQhqX7>`_!
Ray Train provides solutions for training machine learning models in a distributed manner on Ray.
As of Ray 1.8, support for Deep Learning is available in ``ray.train`` (formerly :ref:`Ray SGD <sgd-index>`).
For other model types, distributed training support is available through other libraries:
* **Reinforcement Learning:** :ref:`rllib-index`
* **XGBoost:** :ref:`xgboost-ray`
* **LightGBM:** :ref:`lightgbm-ray`
* **Pytorch Lightning:** :ref:`ray-lightning`
In this guide, we cover examples for the following use cases:
* How do I :ref:`port my code <sgd-porting-code>` to using RaySGD?
* How do I :ref:`monitor <sgd-monitoring>` my training?
* How do I :ref:`port my code <train-porting-code>` to using Ray Train?
* How do I :ref:`monitor <train-monitoring>` my training?
* How do I run my training on pre-emptible instances
(:ref:`fault tolerance <sgd-fault-tolerance>`)?
* How do I use RaySGD to :ref:`train with a large dataset <sgd-datasets>`?
* How do I :ref:`tune <sgd-tune>` my RaySGD model?
(:ref:`fault tolerance <train-fault-tolerance>`)?
* How do I use Ray Train to :ref:`train with a large dataset <train-datasets>`?
* How do I :ref:`tune <train-tune>` my Ray Train model?
.. _sgd-backends:
.. _train-backends:
Backends
--------
RaySGD provides a thin API around different backend frameworks for
distributed deep learning. At the moment, RaySGD allows you to perform
Ray Train provides a thin API around different backend frameworks for
distributed deep learning. At the moment, Ray Train allows you to perform
training with:
* **PyTorch:** RaySGD initializes your distributed process group, allowing
* **PyTorch:** Ray Train initializes your distributed process group, allowing
you to run your ``DistributedDataParallel`` training script. See `PyTorch
Distributed Overview <https://pytorch.org/tutorials/beginner/dist_overview.html>`_
for more information.
* **TensorFlow:** RaySGD configures ``TF_CONFIG`` for you, allowing you to run
* **TensorFlow:** Ray Train configures ``TF_CONFIG`` for you, allowing you to run
your ``MultiWorkerMirroredStrategy`` training script. See `Distributed
training with TensorFlow <https://www.tensorflow.org/guide/distributed_training>`_
for more information.
* **Horovod:** RaySGD configures the Horovod environment and Rendezvous
* **Horovod:** Ray Train configures the Horovod environment and Rendezvous
server for you, allowing you to run your ``DistributedOptimizer`` training
script. See `Horovod documentation <https://horovod.readthedocs.io/en/stable/index.html>`_
for more information.
.. _sgd-porting-code:
.. _train-porting-code:
Porting code to RaySGD
----------------------
Porting code to Ray Train
-------------------------
The following instructions assume you have a training function
that can already be run on a single worker for one of the supported
:ref:`backend <sgd-backends>` frameworks.
:ref:`backend <train-backends>` frameworks.
Update training function
~~~~~~~~~~~~~~~~~~~~~~~~
@ -55,7 +64,7 @@ training.
.. group-tab:: PyTorch
RaySGD will set up your distributed process group for you. You simply
Ray Train will set up your distributed process group for you. You simply
need to add in the proper PyTorch hooks in your training function to
utilize it.
@ -98,7 +107,7 @@ training.
.. code-block:: python
def train_func():
device = torch.device(f"cuda:{sgd.local_rank()}" if
device = torch.device(f"cuda:{train.local_rank()}" if
torch.cuda.is_available() else "cpu")
torch.cuda.set_device(device)
@ -107,7 +116,7 @@ training.
model = model.to(device)
model = DistributedDataParallel(
model,
device_ids=[sgd.local_rank()] if torch.cuda.is_available() else None)
device_ids=[train.local_rank()] if torch.cuda.is_available() else None)
.. group-tab:: TensorFlow
@ -115,12 +124,12 @@ training.
.. note::
The current TensorFlow implementation supports
``MultiWorkerMirroredStrategy`` (and ``MirroredStrategy``). If there are
other strategies you wish to see supported by RaySGD, please let us know
other strategies you wish to see supported by Ray Train, please let us know
by submitting a `feature request on GitHub`_.
These instructions closely follow TensorFlow's `Multi-worker training
with Keras <https://www.tensorflow.org/tutorials/distribute/multi_worker_with_keras>`_
tutorial. One key difference is that RaySGD will handle the environment
tutorial. One key difference is that Ray Train will handle the environment
variable setup for you.
**Step 1:** Wrap your model in ``MultiWorkerMirroredStrategy``.
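The wrapping follows the standard Keras multi-worker pattern; a minimal sketch (not part of this diff), with ``build_and_compile_model`` as a hypothetical helper:

.. code-block:: python

    import tensorflow as tf

    def train_func():
        strategy = tf.distribute.MultiWorkerMirroredStrategy()
        with strategy.scope():
            # Model building and compiling must happen inside strategy.scope().
            multi_worker_model = build_and_compile_model()  # hypothetical helper
        ...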
@ -156,10 +165,10 @@ training.
To onboard onto Horovod, please visit the `Horovod guide
<https://horovod.readthedocs.io/en/stable/index.html#get-started>`_.
Create RaySGD Trainer
~~~~~~~~~~~~~~~~~~~~~
Create Ray Train Trainer
~~~~~~~~~~~~~~~~~~~~~~~~
The ``Trainer`` is the primary RaySGD class that is used to manage state and
The ``Trainer`` is the primary Ray Train class that is used to manage state and
execute training. You can create a simple ``Trainer`` for the backend of choice
with one of the following:
@ -171,14 +180,14 @@ with one of the following:
horovod_trainer = Trainer(backend="horovod", num_workers=2)
For more configurability, please reference the :ref:`sgd-api-trainer` API.
For more configurability, please reference the :ref:`train-api-trainer` API.
To customize the ``backend`` setup, you can replace the string argument with a
:ref:`sgd-api-backend-config` object.
:ref:`train-api-backend-config` object.
Run training function
~~~~~~~~~~~~~~~~~~~~~
With a distributed training function and a RaySGD ``Trainer``, you are now
With a distributed training function and a Ray Train ``Trainer``, you are now
ready to start training!
.. code-block:: python
@ -187,14 +196,14 @@ ready to start training!
trainer.run(train_func)
trainer.shutdown() # clean up resources
.. To make existing code from the previous SGD API, see :ref:`Backwards Compatibility <sgd-backwards-compatibility>`.
.. To migrate existing code from the previous SGD API, see :ref:`Backwards Compatibility <train-backwards-compatibility>`.
.. _`feature request on GitHub`: https://github.com/ray-project/ray/issues
Configuring Training
--------------------
With RaySGD, you can execute a training function (``train_func``) in a
With Ray Train, you can execute a training function (``train_func``) in a
distributed manner by calling ``trainer.run(train_func)``. To pass arguments
into the training function, you can expose a single ``config`` parameter:
@ -216,7 +225,7 @@ configurations. As an example:
.. code-block:: python
from ray.sgd import Trainer
from ray.train import Trainer
def train_func(config):
results = []
@ -233,13 +242,13 @@ configurations. As an example:
trainer.shutdown()
A primary use-case for ``config`` is to try different hyperparameters. To
perform hyperparameter tuning with RaySGD, please refer to the
:ref:`Ray Tune integration <sgd-tune>`.
perform hyperparameter tuning with Ray Train, please refer to the
:ref:`Ray Tune integration <train-tune>`.
.. TODO add support for with_parameters
.. _sgd-log-dir:
.. _train-log-dir:
Log Directory Structure
-----------------------
@ -248,28 +257,28 @@ Each ``Trainer`` will have a local directory created for logs, and each call
to ``Trainer.run`` will create its own sub-directory of logs.
By default, the ``logdir`` will be created at
``~/ray_results/sgd_<datestring>``.
``~/ray_results/train_<datestring>``.
This can be overridden in the ``Trainer`` constructor to an absolute path or
a path relative to ``~/ray_results``.
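For example (a sketch, assuming the ``logdir`` constructor argument):

.. code-block:: python

    from ray.train import Trainer

    # Write logs under ./my_train_logs instead of ~/ray_results/train_<datestring>.
    trainer = Trainer(backend="torch", num_workers=2, logdir="./my_train_logs")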
Log directories are exposed through the following attributes:
+------------------------+---------------------------------------------------+
| Attribute | Example |
+========================+===================================================+
| trainer.logdir | /home/ray_results/sgd_2021-09-01_12-00-00 |
+------------------------+---------------------------------------------------+
| trainer.latest_run_dir | /home/ray_results/sgd_2021-09-01_12-00-00/run_001 |
+------------------------+---------------------------------------------------+
+------------------------+-----------------------------------------------------+
| Attribute | Example |
+========================+=====================================================+
| trainer.logdir | /home/ray_results/train_2021-09-01_12-00-00 |
+------------------------+-----------------------------------------------------+
| trainer.latest_run_dir | /home/ray_results/train_2021-09-01_12-00-00/run_001 |
+------------------------+-----------------------------------------------------+
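A minimal sketch of reading these attributes, assuming a ``train_func`` like
the ones above:

.. code-block:: python

    trainer = Trainer(backend="torch", num_workers=2)
    trainer.start()
    trainer.run(train_func)
    print(trainer.logdir)          # e.g. ~/ray_results/train_<datestring>
    print(trainer.latest_run_dir)  # e.g. <logdir>/run_001
    trainer.shutdown()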
Logs will be written by:
1. :ref:`Logging Callbacks <sgd-logging-callbacks>`
2. :ref:`Checkpoints <sgd-checkpointing>`
1. :ref:`Logging Callbacks <train-logging-callbacks>`
2. :ref:`Checkpoints <train-checkpointing>`
.. TODO link to Training Run Iterator API as a 3rd option for logging.
.. _sgd-monitoring:
.. _train-monitoring:
Logging, Monitoring, and Callbacks
----------------------------------
@ -277,14 +286,14 @@ Logging, Monitoring, and Callbacks
Reporting intermediate results
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
RaySGD provides an ``sgd.report(**kwargs)`` API for reporting intermediate
Ray Train provides a ``train.report(**kwargs)`` API for reporting intermediate
results from the training function up to the ``Trainer``.
Using ``Trainer.run``, these results can be processed through :ref:`Callbacks
<sgd-callbacks>` with a ``handle_result`` method defined.
<train-callbacks>` with a ``handle_result`` method defined.
For custom handling, the lower-level ``Trainer.run_iterator`` API produces an
:ref:`sgd-api-iterator` which will iterate over the reported results.
:ref:`train-api-iterator` which will iterate over the reported results.
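For example, a minimal sketch of consuming this iterator, assuming an
already-started ``Trainer`` and a ``train_func`` that calls ``train.report``:

.. code-block:: python

    iterator = trainer.run_iterator(train_func)
    for intermediate_results in iterator:
        # One dict per worker for each train.report() call.
        print(intermediate_results)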
The primary use-case for reporting is for metrics (accuracy, loss, etc.).
@ -294,7 +303,7 @@ The primary use-case for reporting is for metrics (accuracy, loss, etc.).
...
for i in range(num_epochs):
results = model.train(...)
sgd.report(results)
train.report(results)
return model
Autofilled metrics
@ -308,12 +317,12 @@ In addition to user defined metrics, a few fields are automatically populated:
_timestamp
# Time in seconds between iterations.
_time_this_iter_s
# The iteration ID, where each iteration is defined by one call to sgd.report().
# The iteration ID, where each iteration is defined by one call to train.report().
# This is a 1-indexed incrementing integer ID.
_training_iteration
For debugging purposes, a more extensive set of metrics can be included in
any run by setting the ``SGD_RESULT_ENABLE_DETAILED_AUTOFILLED_METRICS`` environment
any run by setting the ``TRAIN_RESULT_ENABLE_DETAILED_AUTOFILLED_METRICS`` environment
variable to ``1``.
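As a sketch, this can also be set from Python instead of the shell, provided
it is set before the ``Trainer`` is started:

.. code-block:: python

    import os

    os.environ["TRAIN_RESULT_ENABLE_DETAILED_AUTOFILLED_METRICS"] = "1"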
@ -331,30 +340,29 @@ variable to ``1``.
_time_total_s
.. _sgd-callbacks:
.. _train-callbacks:
Callbacks
~~~~~~~~~
You may want to integrate your training code with your favorite experiment management framework.
RaySGD provides an interface to fetch intermediate results and callbacks to process/log your intermediate results.
Ray Train provides an interface to fetch intermediate results and callbacks to process or log them.
You can plug all of these into RaySGD with the following interface:
You can plug all of these into Ray Train with the following interface:
.. code-block:: python
from ray import sgd
from ray.sgd Trainer
from ray.sgd.callbacks import SGDCallback
from ray import train
from ray.train import Trainer, TrainingCallback
from typing import List, Dict
class PrintingCallback(SGDCallback):
class PrintingCallback(TrainingCallback):
def handle_result(self, results: List[Dict], **info):
print(results)
def train_func():
for i in range(3):
sgd.report(epoch=i)
train.report(epoch=i)
trainer = Trainer(backend="torch", num_workers=2)
trainer.start()
@ -367,7 +375,7 @@ You can plug all of these into RaySGD with the following interface:
# [{'epoch': 2, '_timestamp': 1630471763, '_time_this_iter_s': 0.0014500617980957031, '_training_iteration': 3}, {'epoch': 2, '_timestamp': 1630471763, '_time_this_iter_s': 0.0015292167663574219, '_training_iteration': 3}]
trainer.shutdown()
.. Here is a list of callbacks that are supported by RaySGD:
.. Here is a list of callbacks that are supported by Ray Train:
.. * JsonLoggerCallback
.. * TBXLoggerCallback
@ -375,22 +383,22 @@ You can plug all of these into RaySGD with the following interface:
.. * MlflowCallback
.. * CSVCallback
.. _sgd-logging-callbacks:
.. _train-logging-callbacks:
Logging Callbacks
+++++++++++++++++
The following ``SGDCallback``\s are available and will write to a file within the
:ref:`log directory <sgd-log-dir>` of each training run.
The following ``TrainingCallback``\s are available and will write to a file within the
:ref:`log directory <train-log-dir>` of each training run.
1. :ref:`sgd-api-json-logger-callback`
2. :ref:`sgd-api-tbx-logger-callback`
1. :ref:`train-api-json-logger-callback`
2. :ref:`train-api-tbx-logger-callback`
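Both of the callbacks listed above can be passed directly to ``Trainer.run``.
A minimal sketch, using the default constructor arguments so that files are
written under the run's log directory:

.. code-block:: python

    from ray.train import Trainer
    from ray.train.callbacks import JsonLoggerCallback, TBXLoggerCallback

    trainer = Trainer(backend="torch", num_workers=2)
    trainer.start()
    trainer.run(train_func,
                callbacks=[JsonLoggerCallback(), TBXLoggerCallback()])
    trainer.shutdown()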
Custom Callbacks
++++++++++++++++
If the provided callbacks do not cover your desired integrations or use-cases,
you may always implement a custom callback by subclassing ``SGDCallback``. If
you may always implement a custom callback by subclassing ``TrainingCallback``. If
the callback is general enough, please feel welcome to `add it <https://docs
.ray.io/en/master/getting-involved.html>`_ to the ``ray``
`repository <https://github.com/ray-project/ray>`_.
@ -399,9 +407,9 @@ A simple example for creating a callback that will print out results:
.. code-block:: python
from ray.sgd.callbacks import SGDCallback
from ray.train import TrainingCallback
class PrintingCallback(SGDCallback):
class PrintingCallback(TrainingCallback):
def handle_result(self, results: List[Dict], **info):
print(results)
@ -420,20 +428,20 @@ Example: PyTorch Distributed metrics
In real applications, you may want to calculate optimization metrics besides
accuracy and loss: recall, precision, Fbeta, etc.
RaySGD natively supports `TorchMetrics <https://torchmetrics.readthedocs.io/en/latest/>`_, which provides a collection of machine learning metrics for distributed, scalable Pytorch models.
Ray Train natively supports `TorchMetrics <https://torchmetrics.readthedocs.io/en/latest/>`_, which provides a collection of machine learning metrics for distributed, scalable PyTorch models.
Here is an example:
.. code-block:: python
from ray import sgd
from ray.sgd import SGDCallback, Trainer
from ray import train
from ray.train import Trainer, TrainingCallback
from typing import List, Dict
import torch
import torchmetrics
class PrintingCallback(SGDCallback):
class PrintingCallback(TrainingCallback):
def handle_result(self, results: List[Dict], **info):
print(results)
@ -441,7 +449,7 @@ Here is an example:
preds = torch.randn(10, 5).softmax(dim=-1)
target = torch.randint(5, (10,))
accuracy = torchmetrics.functional.accuracy(preds, target).item()
sgd.report(accuracy=accuracy)
train.report(accuracy=accuracy)
trainer = Trainer(backend="torch", num_workers=2)
trainer.start()
@ -453,15 +461,15 @@ Here is an example:
# {'accuracy': 0.10000000149011612, '_timestamp': 1630716913, '_time_this_iter_s': 0.0030548572540283203, '_training_iteration': 1}]
trainer.shutdown()
.. _sgd-checkpointing:
.. _train-checkpointing:
Checkpointing
-------------
RaySGD provides a way to save state during the training process. This is
Ray Train provides a way to save state during the training process. This is
useful for:
1. :ref:`Integration with Ray Tune <sgd-tune>` to use certain Ray Tune
1. :ref:`Integration with Ray Tune <train-tune>` to use certain Ray Tune
schedulers.
2. Running a long-running training job on a cluster of pre-emptible machines/pods.
3. Persisting trained model state to later use for serving/inference.
@ -470,7 +478,7 @@ useful for:
Saving checkpoints
~~~~~~~~~~~~~~~~~~
Checkpoints can be saved by calling ``sgd.save_checkpoint(**kwargs)`` in the
Checkpoints can be saved by calling ``train.save_checkpoint(**kwargs)`` in the
training function.
.. note:: This must be called by all workers, but only data from the rank 0
@ -481,14 +489,14 @@ The latest saved checkpoint can be accessed through the ``Trainer``'s
.. code-block:: python
from ray import sgd
from ray.sgd import Trainer
from ray import train
from ray.train import Trainer
def train_func(config):
model = 0 # This should be replaced with a real model.
for epoch in range(config["num_epochs"]):
model += epoch
sgd.save_checkpoint(epoch=epoch, model=model)
train.save_checkpoint(epoch=epoch, model=model)
trainer = Trainer(backend="torch", num_workers=2)
trainer.start()
@ -499,14 +507,14 @@ The latest saved checkpoint can be accessed through the ``Trainer``'s
# {'epoch': 4, 'model': 10}
By default, checkpoints will be persisted to local disk in the :ref:`log
directory <sgd-log-dir>` of each run.
directory <train-log-dir>` of each run.
.. code-block:: python
print(trainer.latest_checkpoint_dir)
# /home/ray_results/sgd_2021-09-01_12-00-00/run_001/checkpoints
# /home/ray_results/train_2021-09-01_12-00-00/run_001/checkpoints
print(trainer.latest_checkpoint_path)
# /home/ray_results/sgd_2021-09-01_12-00-00/run_001/checkpoints/checkpoint_000005
# /home/ray_results/train_2021-09-01_12-00-00/run_001/checkpoints/checkpoint_000005
.. note:: Persisting checkpoints to durable storage (e.g. S3) is not yet supported.
@ -515,7 +523,7 @@ Configuring checkpoints
+++++++++++++++++++++++
For more configurability of checkpointing behavior (specifically saving
checkpoints to disk), a :ref:`sgd-api-checkpoint-strategy` can be passed into
checkpoints to disk), a :ref:`train-api-checkpoint-strategy` can be passed into
``Trainer.run``.
As an example, to disable writing checkpoints to disk:
@ -523,12 +531,12 @@ As an example, to disable writing checkpoints to disk:
.. code-block:: python
:emphasize-lines: 8,12
from ray import sgd
from ray.sgd import CheckpointStrategy, Trainer
from ray import train
from ray.train import CheckpointStrategy, Trainer
def train_func():
for epoch in range(3):
sgd.save_checkpoint(epoch=epoch)
train.save_checkpoint(epoch=epoch)
checkpoint_strategy = CheckpointStrategy(num_to_keep=0)
@ -546,7 +554,7 @@ Loading checkpoints
Checkpoints can be loaded into the training function in 2 steps:
1. From the training function, ``sgd.load_checkpoint()`` can be used to access
1. From the training function, ``train.load_checkpoint()`` can be used to access
the most recently saved checkpoint. This is useful to continue training even
if there's a worker failure.
2. The checkpoint to start training with can be bootstrapped by passing in a
@ -554,17 +562,17 @@ Checkpoints can be loaded into the training function in 2 steps:
.. code-block:: python
from ray import sgd
from ray.sgd import Trainer
from ray import train
from ray.train import Trainer
def train_func(config):
checkpoint = sgd.load_checkpoint() or {}
checkpoint = train.load_checkpoint() or {}
# This should be replaced with a real model.
model = checkpoint.get("model", 0)
start_epoch = checkpoint.get("epoch", -1) + 1
for epoch in range(start_epoch, config["num_epochs"]):
model += epoch
sgd.save_checkpoint(epoch=epoch, model=model)
train.save_checkpoint(epoch=epoch, model=model)
trainer = Trainer(backend="torch", num_workers=2)
trainer.start()
@ -578,7 +586,7 @@ Checkpoints can be loaded into the training function in 2 steps:
.. Running on the cloud
.. --------------------
.. Use RaySGD with the Ray cluster launcher by changing the following:
.. Use Ray Train with the Ray cluster launcher by changing the following:
.. .. code-block:: bash
@ -586,12 +594,12 @@ Checkpoints can be loaded into the training function in 2 steps:
.. TODO.
.. _sgd-fault-tolerance:
.. _train-fault-tolerance:
Fault Tolerance & Elastic Training
----------------------------------
RaySGD has built-in fault tolerance to recover from worker failures (i.e.
Ray Train has built-in fault tolerance to recover from worker failures (i.e.
``RayActorError``\s). When a failure is detected, the workers will be shut
down and new workers will be added in. The training function will be
restarted, but progress from the previous execution can be resumed through
@ -599,7 +607,7 @@ checkpointing.
.. warning:: In order to retain progress upon recovery, your training function
**must** implement logic for both saving *and* loading :ref:`checkpoints
<sgd-checkpointing>`.
<train-checkpointing>`.
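A minimal sketch of a recoverable training function, re-using the save/load
pattern from the :ref:`checkpointing <train-checkpointing>` section above:

.. code-block:: python

    from ray import train

    def train_func():
        # Resume from the most recent checkpoint (e.g. after a worker
        # failure); fall back to a fresh start if none exists.
        checkpoint = train.load_checkpoint() or {}
        start_epoch = checkpoint.get("epoch", -1) + 1
        for epoch in range(start_epoch, 5):
            train.save_checkpoint(epoch=epoch)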
Each instance of recovery from a worker failure is considered a retry. The
number of retries is configurable through the ``max_retries`` argument of the
@ -614,24 +622,24 @@ number of retries is configurable through the ``max_retries`` argument of the
.. TODO.
.. _sgd-datasets:
.. _train-datasets:
Distributed Data Ingest (Ray Datasets)
--------------------------------------
Ray SGD provides native support for :ref:`Ray Datasets <datasets>` to support the following use cases:
Ray Train provides native support for :ref:`Ray Datasets <datasets>`, enabling the following use cases:
1. **Large Datasets**: With Ray Datasets, you can easily work with datasets that are too big to fit on a single node.
Ray Datasets will distribute the dataset across the Ray Cluster and allow you to perform dataset operations (map, filter, etc.)
on the distributed dataset.
2. **Automatic locality-aware sharding**: If provided a Ray Dataset, Ray SGD will automatically shard the dataset and assign each shard
2. **Automatic locality-aware sharding**: If provided a Ray Dataset, Ray Train will automatically shard the dataset and assign each shard
to a training worker while minimizing cross-node data transfer. Unlike with standard Torch or Tensorflow datasets, each training
worker will only load its assigned shard into memory rather than the entire ``Dataset``.
3. **Pipelined Execution**: Ray Datasets also supports pipelining, meaning that data processing operations
can be run concurrently with training. Training is no longer blocked on expensive data processing operations (such as global shuffling)
and this minimizes the amount of time your GPUs are idle. See :ref:`dataset-pipeline` for more information.
To get started, pass in a Ray Dataset (or multiple) into ``Trainer.run``. Underneath the hood, Ray SGD will automatically shard the given dataset.
To get started, pass in a Ray Dataset (or multiple) into ``Trainer.run``. Under the hood, Ray Train will automatically shard the given dataset.
.. warning::
@ -643,7 +651,7 @@ To get started, pass in a Ray Dataset (or multiple) into ``Trainer.run``. Undern
def train_func():
...
tf_dataset = sgd.get_dataset_shard().to_tf()
tf_dataset = ray.train.get_dataset_shard().to_tf()
options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = \
tf.data.experimental.AutoShardPolicy.OFF
@ -660,11 +668,11 @@ To get started, pass in a Ray Dataset (or multiple) into ``Trainer.run``. Undern
batch_size = config["worker_batch_size"]
train_data_shard = ray.sgd.get_dataset_shard("train")
train_data_shard = ray.train.get_dataset_shard("train")
train_torch_dataset = train_data_shard.to_torch(label_column="label",
batch_size=batch_size)
validation_data_shard = ray.sgd.get_dataset_shard("validation")
validation_data_shard = ray.train.get_dataset_shard("validation")
validation_torch_dataset = validation_data_shard.to_torch(label_column="label",
batch_size=batch_size)
@ -695,7 +703,7 @@ To get started, pass in a Ray Dataset (or multiple) into ``Trainer.run``. Undern
"validation": validation_dataset
})
.. _sgd-dataset-pipeline:
.. _train-dataset-pipeline:
Pipelined Execution
~~~~~~~~~~~~~~~~~~~
@ -708,14 +716,14 @@ Example: Per-Epoch Shuffle Pipeline
+++++++++++++++++++++++++++++++++++
A common use case is to have a training pipeline that globally shuffles the dataset before every epoch.
This is very simple to do with Ray Datasets + Ray SGD.
This is very simple to do with Ray Datasets + Ray Train.
.. code-block:: python
def train_func():
# This is a dummy train function just iterating over the dataset.
# You should replace this with your training logic.
dataset_pipeline_shard = ray.sgd.get_dataset_shard()
dataset_pipeline_shard = ray.train.get_dataset_shard()
# Infinitely long iterator of randomly shuffled dataset shards.
dataset_iterator = dataset_pipeline_shard.iter_datasets()
for _ in range(config["num_epochs"]):
@ -757,13 +765,13 @@ You can easily set the working set size for the global shuffle by specifying the
See :ref:`dataset-pipeline-per-epoch-shuffle` for more info.
.. _sgd-tune:
.. _train-tune:
Hyperparameter tuning (Ray Tune)
--------------------------------
Hyperparameter tuning with :ref:`Ray Tune <tune-main>` is natively supported
with RaySGD. Specifically, you can take an existing training function and
with Ray Train. Specifically, you can take an existing training function and
follow these steps:
**Step 1: Convert to Tune Trainable**
@ -773,14 +781,14 @@ produce an object ("Trainable") that will be passed to Ray Tune.
.. code-block:: python
from ray import sgd
from ray.sgd import Trainer
from ray import train
from ray.train import Trainer
def train_func(config):
# In this example, nothing is expected to change over epochs,
# and the output metric is equivalent to the input value.
for _ in range(config["num_epochs"]):
sgd.report(output=config["input"])
train.report(output=config["input"])
trainer = Trainer(backend="torch", num_workers=2)
trainable = trainer.to_tune_trainable(train_func)
@ -788,7 +796,7 @@ produce an object ("Trainable") that will be passed to Ray Tune.
**Step 2: Call tune.run**
Call ``tune.run`` on the created ``Trainable`` to start multiple ``Tune``
"trials", each running a RaySGD job and each with a unique hyperparameter
"trials", each running a Ray Train job and each with a unique hyperparameter
configuration.
.. code-block:: python
@ -804,25 +812,24 @@ configuration.
A couple caveats:
* Tune will ignore the return value of ``train_func``. To save your best
trained model, you will need to use the ``sgd.save_checkpoint`` API.
trained model, you will need to use the ``train.save_checkpoint`` API.
* You should **not** call ``tune.report`` or ``tune.checkpoint_dir`` in your
training function. Functional parity is achieved through ``sgd.report``,
``sgd.save_checkpoint``, and ``sgd.load_checkpoint``. This allows you to go
from RaySGD to RaySGD+RayTune without changing any code in the training
training function. Functional parity is achieved through ``train.report``,
``train.save_checkpoint``, and ``train.load_checkpoint``. This allows you to go
from Ray Train to Ray Train + Ray Tune without changing any code in the training
function.
.. code-block:: python
from ray import tune
from ray import sgd
from ray.sgd import Trainer
from ray import train, tune
from ray.train import Trainer
def train_func(config):
# In this example, nothing is expected to change over epochs,
# and the output metric is equivalent to the input value.
for _ in range(config["num_epochs"]):
sgd.report(output=config["input"])
train.report(output=config["input"])
trainer = Trainer(backend="torch", num_workers=2)
trainable = trainer.to_tune_trainable(train_func)
@ -839,12 +846,12 @@ A couple caveats:
from ray import tune
def training_func(config):
dataloader = ray.sgd.get_dataset()\
dataloader = ray.train.get_dataset()\
.get_shard(torch.rank())\
.to_torch(batch_size=config["batch_size"])
for i in config["epochs"]:
ray.sgd.report(...) # use same intermediate reporting API
ray.train.report(...) # use same intermediate reporting API
# Declare the specification for training.
trainer = Trainer(backend="torch", num_workers=12, use_gpu=True)
@ -871,7 +878,7 @@ A couple caveats:
TODO
.. _sgd-backwards-compatibility:
.. _train-backwards-compatibility:
..
Backwards Compatibility

View file

@ -240,7 +240,7 @@ In other words, you will have to make sure that your Ray cluster
has machines that can actually fulfill your resource requests.
In some cases your trainable might want to start other remote actors, for instance if you're
leveraging distributed training via RaySGD. In these cases, you can use
leveraging distributed training via Ray Train. In these cases, you can use
:ref:`placement groups <ray-placement-group-doc-ref>` to request additional resources:
.. code-block:: python

View file

@ -274,7 +274,7 @@ Mixing steps with Ray tasks and actors
Workflows are compatible with Ray tasks and actors. There are two methods of using them together:
1. Workflows can be launched from within a Ray task or actor. For example, you can launch a long-running workflow from Ray serve in response to a user request. This is no different from launching a workflow from the driver program.
2. Workflow steps can use Ray tasks or actors within a single step. For example, a step could use RaySGD internally to train a model. No durability guarantees apply to the tasks or actors used within the step; if the step fails, it will be re-executed from scratch.
2. Workflow steps can use Ray tasks or actors within a single step. For example, a step could use Ray Train internally to train a model. No durability guarantees apply to the tasks or actors used within the step; if the step fails, it will be re-executed from scratch.
Passing nested arguments
~~~~~~~~~~~~~~~~~~~~~~~~

View file

@ -1,5 +1,3 @@
.. _xgboost-ray:
..
This part of the docs is generated from the XGBoost-Ray readme using m2r
To update:
@ -12,6 +10,8 @@
by adding a second underscore (use `target <link>`__)
- Search for `\ **` and delete this from the links (bold links are not supported)
.. _xgboost-ray:
Distributed XGBoost on Ray
==========================

View file

@ -1 +1 @@
from ray.util.sgd.v2 import * # noqa: F401, F403
raise DeprecationWarning("ray.sgd has been moved to ray.train.")

View file

@ -1 +0,0 @@
from ray.util.sgd.v2.callbacks import * # noqa: E501, F401, F403

View file

@ -1,5 +1,5 @@
# --------------------------------------------------------------------
# Tests from the python/ray/util/sgd/v2/examples directory.
# Tests from the python/ray/train/examples directory.
# Please keep these sorted alphabetically.
# --------------------------------------------------------------------
py_test(
@ -8,7 +8,7 @@ py_test(
main = "examples/mlflow_fashion_mnist_example.py",
srcs = ["examples/mlflow_fashion_mnist_example.py"],
tags = ["team:ml", "exclusive"],
deps = [":sgd_v2_lib"],
deps = [":train_lib"],
args = ["--smoke-test"]
)
@ -18,7 +18,7 @@ py_test(
main = "examples/transformers/transformers_example.py",
srcs = ["examples/transformers/transformers_example.py"],
tags = ["team:ml", "exclusive"],
deps = [":sgd_v2_lib"],
deps = [":train_lib"],
args = ["--model_name_or_path=bert-base-cased", "--task_name=mrpc",
"--max_length=32", "--per_device_train_batch_size=64",
"--max_train_steps=2", "--start_local", "--num_workers=2"]
@ -30,7 +30,7 @@ py_test(
main = "examples/tune_cifar_pytorch_pbt_example.py",
srcs = ["examples/tune_cifar_pytorch_pbt_example.py"],
tags = ["team:ml", "exclusive", "pytorch"],
deps = [":sgd_v2_lib"],
deps = [":train_lib"],
args = ["--smoke-test"]
)
@ -40,7 +40,7 @@ py_test(
main = "examples/tune_linear_dataset_example.py",
srcs = ["examples/tune_linear_dataset_example.py"],
tags = ["team:ml", "exclusive", "gpu_only"],
deps = [":sgd_v2_lib"],
deps = [":train_lib"],
args = ["--smoke-test", "--use-gpu"]
)
@ -50,12 +50,12 @@ py_test(
main = "examples/tune_linear_example.py",
srcs = ["examples/tune_linear_example.py"],
tags = ["team:ml", "exclusive"],
deps = [":sgd_v2_lib"],
deps = [":train_lib"],
args = ["--smoke-test"]
)
# --------------------------------------------------------------------
# Tests from the python/ray/util/sgd/v2/tests directory.
# Tests from the python/ray/train/tests directory.
# Please keep these sorted alphabetically.
# --------------------------------------------------------------------
@ -64,7 +64,7 @@ py_test(
size = "large",
srcs = ["tests/test_backend.py"],
tags = ["team:ml", "exclusive"],
deps = [":sgd_v2_lib"]
deps = [":train_lib"]
)
py_test(
@ -72,7 +72,7 @@ py_test(
size = "large",
srcs = ["tests/test_gpu.py"],
tags = ["team:ml", "exclusive", "gpu_only"],
deps = [":sgd_v2_lib"]
deps = [":train_lib"]
)
py_test(
@ -80,7 +80,7 @@ py_test(
size = "small",
srcs = ["tests/test_session.py"],
tags = ["team:ml", "exclusive"],
deps = [":sgd_v2_lib"]
deps = [":train_lib"]
)
py_test(
@ -88,7 +88,7 @@ py_test(
size = "large",
srcs = ["tests/test_trainer.py"],
tags = ["team:ml", "exclusive"],
deps = [":sgd_v2_lib"]
deps = [":train_lib"]
)
py_test(
@ -96,7 +96,7 @@ py_test(
size = "medium",
srcs = ["tests/test_tune.py"],
tags = ["team:ml", "exclusive"],
deps = [":sgd_v2_lib"]
deps = [":train_lib"]
)
py_test(
@ -104,7 +104,7 @@ py_test(
size = "small",
srcs = ["tests/test_utils.py"],
tags = ["team:ml", "exclusive"],
deps = [":sgd_v2_lib"]
deps = [":train_lib"]
)
@ -113,7 +113,7 @@ py_test(
size = "medium",
srcs = ["tests/test_worker_group.py"],
tags = ["team:ml", "exclusive"],
deps = [":sgd_v2_lib"]
deps = [":train_lib"]
)
@ -121,6 +121,6 @@ py_test(
# This is a dummy test dependency that causes the above tests to be
# re-run if any of these files changes.
py_library(
name = "sgd_v2_lib",
name = "train_lib",
srcs = glob(["**/*.py"], exclude=["tests/*.py"]),
)

View file

@ -0,0 +1,14 @@
from ray.train.backends import (BackendConfig, HorovodConfig, TensorflowConfig,
TorchConfig)
from ray.train.callbacks import TrainingCallback
from ray.train.checkpoint import CheckpointStrategy
from ray.train.session import (get_dataset_shard, local_rank, load_checkpoint,
report, save_checkpoint, world_rank)
from ray.train.trainer import Trainer, TrainingIterator
__all__ = [
"BackendConfig", "CheckpointStrategy", "get_dataset_shard",
"HorovodConfig", "load_checkpoint", "local_rank", "report",
"save_checkpoint", "TrainingIterator", "TensorflowConfig",
"TrainingCallback", "TorchConfig", "Trainer", "world_rank"
]

View file

@ -0,0 +1,11 @@
from ray.train.backends.backend import BackendConfig
from ray.train.backends.horovod import HorovodConfig
from ray.train.backends.tensorflow import TensorflowConfig
from ray.train.backends.torch import TorchConfig
__all__ = [
"BackendConfig",
"HorovodConfig",
"TensorflowConfig",
"TorchConfig",
]

View file

@ -9,15 +9,15 @@ import ray
from ray import cloudpickle
from ray.exceptions import RayActorError
from ray.ray_constants import env_integer
from ray.util.sgd.v2.checkpoint import CheckpointStrategy
from ray.util.sgd.v2.constants import ENABLE_DETAILED_AUTOFILLED_METRICS_ENV, \
from ray.train.checkpoint import CheckpointStrategy
from ray.train.constants import ENABLE_DETAILED_AUTOFILLED_METRICS_ENV, \
TUNE_INSTALLED, TUNE_CHECKPOINT_FILE_NAME, \
TUNE_CHECKPOINT_ID, ENABLE_SHARE_CUDA_VISIBLE_DEVICES_ENV
from ray.util.sgd.v2.session import TrainingResultType, TrainingResult
from ray.util.sgd.v2.session import init_session, get_session, shutdown_session
from ray.util.sgd.v2.utils import construct_path, check_for_failure
from ray.util.sgd.v2.worker_group import WorkerGroup
from ray.util.sgd.v2.utils import RayDataset
from ray.train.session import TrainingResultType, TrainingResult
from ray.train.session import init_session, get_session, shutdown_session
from ray.train.utils import construct_path, check_for_failure
from ray.train.worker_group import WorkerGroup
from ray.train.utils import RayDataset
if TUNE_INSTALLED:
from ray import tune
@ -37,7 +37,7 @@ class BackendConfig:
raise NotImplementedError
class SGDBackendError(Exception):
class TrainBackendError(Exception):
"""Errors with BackendExecutor that should not be exposed to user."""
@ -50,7 +50,7 @@ class CheckpointManager:
The full default path will be:
~/ray_results/sgd_<datestring>/run_<run_id>/checkpoints/
~/ray_results/train_<datestring>/run_<run_id>/checkpoints/
checkpoint_<checkpoint_id>
Attributes:
@ -203,7 +203,7 @@ class BackendExecutor:
This class holds a worker group and is responsible for executing the
training function on the workers, and collecting intermediate results
from ``sgd.report()`` and ``sgd.checkpoint()``.
from ``train.report()`` and ``train.save_checkpoint()``.
Args:
backend_config (BackendConfig): The configurations for this
@ -413,15 +413,15 @@ class BackendExecutor:
dataset (Optional[Union[Dataset, DatasetPipeline]])
Distributed Ray Dataset or DatasetPipeline to pass into
worker, which can be accessed from the training function via
``sgd.get_dataset_shard()``. Sharding will automatically be
``train.get_dataset_shard()``. Sharding will automatically be
handled by the Trainer. Multiple Datasets can be passed in as
a ``Dict`` that maps each name key to a Dataset value,
and each Dataset can be accessed from the training function
by passing in a `dataset_name` argument to
``sgd.get_dataset_shard()``.
``train.get_dataset_shard()``.
checkpoint (Optional[Dict|str|Path]): The checkpoint data that
should be loaded onto each worker and accessed by the
training function via ``sgd.load_checkpoint()``. If this is a
training function via ``train.load_checkpoint()``. If this is a
``str`` or ``Path`` then the value is expected to be a path
to a file that contains a serialized checkpoint dict. If this
is ``None`` then no checkpoint will be loaded.
@ -451,7 +451,7 @@ class BackendExecutor:
detailed_autofilled_metrics=use_detailed_autofilled_metrics
)
except ValueError:
raise SGDBackendError(
raise TrainBackendError(
"Attempting to start training but a "
"previous training run is still ongoing. "
"You must call `finish_training` before "
@ -489,8 +489,8 @@ class BackendExecutor:
"""Fetches the next ``TrainingResult`` from each worker.
Each ``TrainingResult`` is expected to correspond to the same step from
each worker (e.g. the same call to ``sgd.report()`` or
``sgd.checkpoint()``).
each worker (e.g. the same call to ``train.report()`` or
``train.save_checkpoint()``).
Returns:
A list of ``TrainingResult``s with the same
@ -503,19 +503,19 @@ class BackendExecutor:
session = get_session()
except ValueError:
# Session is not initialized yet.
raise SGDBackendError("`fetch_next_result` has been called "
"before `start_training`. Please call "
"`start_training` before "
"`fetch_next_result`.")
raise TrainBackendError("`fetch_next_result` has been called "
"before `start_training`. Please call "
"`start_training` before "
"`fetch_next_result`.")
try:
result = session.get_next()
except RuntimeError:
# Training thread has not been started yet.
raise SGDBackendError("`fetch_next_result` has been called "
"before `start_training`. Please call "
"`start_training` before "
"`fetch_next_result`.")
raise TrainBackendError("`fetch_next_result` has been called "
"before `start_training`. Please call "
"`start_training` before "
"`fetch_next_result`.")
return result
@ -527,11 +527,12 @@ class BackendExecutor:
if any(r is None for r in results):
# Either all workers have results or none of them do.
if not all(r is None for r in results):
raise RuntimeError("Some workers returned results while "
"others didn't. Make sure that "
"`sgd.report()` and `sgd.checkpoint()` are "
"called the same number of times on all "
"workers.")
raise RuntimeError(
"Some workers returned results while "
"others didn't. Make sure that "
"`train.report()` and `train.checkpoint()` "
"are called the same number of times on all "
"workers.")
else:
# Return None if all results are None.
return None
@ -539,19 +540,19 @@ class BackendExecutor:
result_type = first_result.type
if any(r.type != result_type for r in results):
raise RuntimeError("Some workers returned results with "
"different types. Make sure `sgd.report()` and "
"`sgd.save_checkpoint()` are called the same "
"number of times and in the same order on each "
"worker.")
"different types. Make sure `train.report()` "
"and `train.save_checkpoint()` are called the "
"same number of times and in the same order on "
"each worker.")
return results
def fetch_next_result(self) -> Optional[List[Dict]]:
"""Fetch next results produced by ``sgd.report()`` from each worker.
"""Fetch next results produced by ``train.report()`` from each worker.
Assumes ``start_training`` has already been called.
Returns:
A list of dictionaries of values passed to ``sgd.report()`` from
A list of dictionaries of values passed to ``train.report()`` from
each worker. Each item corresponds to an intermediate result from
a single worker. If there are no more items to fetch,
returns None.
@ -570,10 +571,10 @@ class BackendExecutor:
self.checkpoint_manager._process_checkpoint(results)
# Iterate until next REPORT call or training has finished.
else:
raise SGDBackendError(f"Unexpected result type: "
f"{result_type}. "
f"Expected one of "
f"{[type in TrainingResultType]}")
raise TrainBackendError(f"Unexpected result type: "
f"{result_type}. "
f"Expected one of "
f"{[type in TrainingResultType]}")
def finish_training(self) -> List[T]:
"""Finish training and return final results. Propagate any exceptions.
@ -593,10 +594,10 @@ class BackendExecutor:
session = get_session()
except ValueError:
# Session is not initialized yet.
raise SGDBackendError("`finish_training` has been called "
"before `start_training`. Please call "
"`start_training` before "
"`finish_training`.")
raise TrainBackendError("`finish_training` has been called "
"before `start_training`. Please call "
"`start_training` before "
"`finish_training`.")
return session.pause_reporting()
@ -606,10 +607,10 @@ class BackendExecutor:
session = get_session()
except ValueError:
# Session is not initialized yet.
raise SGDBackendError("`finish_training` has been called "
"before `start_training`. Please call "
"`start_training` before "
"`finish_training`.")
raise TrainBackendError("`finish_training` has been called "
"before `start_training`. Please call "
"`start_training` before "
"`finish_training`.")
try:
# session.finish raises any Exceptions from training.
@ -621,7 +622,7 @@ class BackendExecutor:
return output
# Disable workers from enqueuing results from `sgd.report()`.
# Disable workers from enqueuing results from `train.report()`.
# Results will not be processed during the execution of `finish`.
# Note: Reported results may still be enqueued at this point,
# and should be handled appropriately.
@ -654,7 +655,7 @@ class BackendExecutor:
Args:
remote_values (list): List of object refs representing functions
that may fail in the middle of execution. For example, running
a SGD training loop in multiple parallel actor calls.
a Train training loop in multiple parallel actor calls.
Returns:
The resolved objects represented by the passed in ObjectRefs.
"""

View file

@ -4,9 +4,9 @@ from dataclasses import dataclass
from typing import Optional, Set
import ray
from ray.util.sgd.v2.backends.backend import BackendConfig, Backend
from ray.util.sgd.v2.utils import update_env_vars
from ray.util.sgd.v2.worker_group import WorkerGroup
from ray.train.backends.backend import BackendConfig, Backend
from ray.train.utils import update_env_vars
from ray.train.worker_group import WorkerGroup
try:
from horovod.ray.runner import Coordinator

View file

@ -5,10 +5,10 @@ from dataclasses import dataclass
from typing import List
import ray
from ray.util.sgd.v2.backends.backend import BackendConfig, Backend
from ray.util.sgd.v2.session import shutdown_session
from ray.util.sgd.v2.utils import get_address_and_port
from ray.util.sgd.v2.worker_group import WorkerGroup
from ray.train.backends.backend import BackendConfig, Backend
from ray.train.session import shutdown_session
from ray.train.utils import get_address_and_port
from ray.train.worker_group import WorkerGroup
try:
import tensorflow

View file

@ -6,9 +6,9 @@ from datetime import timedelta
from typing import Optional
import ray
from ray.util.sgd.v2.backends.backend import BackendConfig, Backend
from ray.util.sgd.v2.worker_group import WorkerGroup
from ray.util.sgd.v2.utils import get_address_and_port
from ray.train.backends.backend import BackendConfig, Backend
from ray.train.worker_group import WorkerGroup
from ray.train.utils import get_address_and_port
try:
import torch

View file

@ -0,0 +1,4 @@
from ray.train.callbacks.callback import TrainingCallback
from ray.train.callbacks.logging import (JsonLoggerCallback, TBXLoggerCallback)
__all__ = ["TrainingCallback", "JsonLoggerCallback", "TBXLoggerCallback"]

View file

@ -2,11 +2,11 @@ import abc
from typing import List, Dict
class SGDCallback(metaclass=abc.ABCMeta):
"""Abstract SGD callback class."""
class TrainingCallback(metaclass=abc.ABCMeta):
"""Abstract Train callback class."""
def handle_result(self, results: List[Dict], **info):
"""Called every time sgd.report() is called.
"""Called every time train.report() is called.
Args:
results (List[Dict]): List of results from the training

View file

@ -9,14 +9,14 @@ from pathlib import Path
from ray.util.debug import log_once
from ray.util.ml_utils.dict import flatten_dict
from ray.util.ml_utils.json import SafeFallbackEncoder
from ray.util.sgd.v2.callbacks import SGDCallback
from ray.util.sgd.v2.constants import (RESULT_FILE_JSON, TRAINING_ITERATION,
TIME_TOTAL_S, TIMESTAMP, PID)
from ray.train.callbacks import TrainingCallback
from ray.train.constants import (RESULT_FILE_JSON, TRAINING_ITERATION,
TIME_TOTAL_S, TIMESTAMP, PID)
logger = logging.getLogger(__name__)
class SGDLogdirMixin:
class TrainingLogdirMixin:
def start_training(self, logdir: str, **info):
if self._logdir:
logdir_path = Path(self._logdir)
@ -36,9 +36,9 @@ class SGDLogdirMixin:
return Path(self._logdir_path)
class SGDSingleFileLoggingCallback(
SGDLogdirMixin, SGDCallback, metaclass=abc.ABCMeta):
"""Abstract SGD logging callback class.
class TrainingSingleFileLoggingCallback(
TrainingLogdirMixin, TrainingCallback, metaclass=abc.ABCMeta):
"""Abstract Train logging callback class.
Args:
logdir (Optional[str]): Path to directory where the results file
@ -102,8 +102,8 @@ class SGDSingleFileLoggingCallback(
return self._log_path
class JsonLoggerCallback(SGDSingleFileLoggingCallback):
"""Logs SGD results in json format.
class JsonLoggerCallback(TrainingSingleFileLoggingCallback):
"""Logs Train results in json format.
Args:
logdir (Optional[str]): Path to directory where the results file
@ -139,9 +139,9 @@ class JsonLoggerCallback(SGDSingleFileLoggingCallback):
loaded_results + [results_to_log], f, cls=SafeFallbackEncoder)
class SGDSingleWorkerLoggingCallback(
SGDLogdirMixin, SGDCallback, metaclass=abc.ABCMeta):
"""Abstract SGD logging callback class.
class TrainingSingleWorkerLoggingCallback(
TrainingLogdirMixin, TrainingCallback, metaclass=abc.ABCMeta):
"""Abstract Train logging callback class.
Allows only for single-worker logging.
@ -174,8 +174,8 @@ class SGDSingleWorkerLoggingCallback(
return worker_to_log
class TBXLoggerCallback(SGDSingleWorkerLoggingCallback):
"""Logs SGD results in TensorboardX format.
class TBXLoggerCallback(TrainingSingleWorkerLoggingCallback):
"""Logs Train results in TensorboardX format.
Args:
logdir (Optional[str]): Path to directory where the results file
@ -206,7 +206,7 @@ class TBXLoggerCallback(SGDSingleWorkerLoggingCallback):
step = result[TRAINING_ITERATION]
result = {k: v for k, v in result.items() if k not in self.IGNORE_KEYS}
flat_result = flatten_dict(result, delimiter="/")
path = ["ray", "sgd"]
path = ["ray", "train"]
# same logic as in ray.tune.logger.TBXLogger
for attr, value in flat_result.items():

View file

@ -1,7 +1,7 @@
from dataclasses import dataclass
from typing import Optional
from ray.util.sgd.v2.constants import TIMESTAMP
from ray.train.constants import TIMESTAMP
MAX = "max"
MIN = "min"
@ -9,7 +9,7 @@ MIN = "min"
@dataclass
class CheckpointStrategy:
"""Configurable parameters for defining the SGD checkpointing strategy.
"""Configurable parameters for defining the Train checkpointing strategy.
Default behavior is to persist all checkpoints to disk. If
``num_to_keep`` is set, the default retention policy is to keep the

View file

@ -6,7 +6,7 @@ try:
except ImportError:
TUNE_INSTALLED = False
# Autofilled sgd.report() metrics. Keys should be consistent with Tune.
# Autofilled train.report() metrics. Keys should be consistent with Tune.
TIMESTAMP = "_timestamp"
TIME_THIS_ITER_S = "_time_this_iter_s"
TRAINING_ITERATION = "_training_iteration"
@ -21,7 +21,7 @@ TIME_TOTAL_S = "_time_total_s"
# Env var name
ENABLE_DETAILED_AUTOFILLED_METRICS_ENV = (
"SGD_RESULT_ENABLE_DETAILED_AUTOFILLED_METRICS")
"TRAIN_RESULT_ENABLE_DETAILED_AUTOFILLED_METRICS")
# Only reported when the ENABLE_DETAILED_AUTOFILLED_METRICS_ENV
# env var is set to a non-zero value.
@ -34,7 +34,7 @@ RESULT_FETCH_TIMEOUT = 0.2
# Default filename for JSON logger
RESULT_FILE_JSON = "results.json"
# Default directory where all SGD logs, checkpoints, etc. will be stored.
# Default directory where all Train logs, checkpoints, etc. will be stored.
DEFAULT_RESULTS_DIR = Path("~/ray_results").expanduser()
# File name to use for checkpoints saved with Tune
@ -47,4 +47,5 @@ TUNE_CHECKPOINT_ID = "_current_checkpoint_id"
# Integer value which if set will override the value of
# Backend.share_cuda_visible_devices. 1 for True, 0 for False.
ENABLE_SHARE_CUDA_VISIBLE_DEVICES_ENV = "SGD_ENABLE_SHARE_CUDA_VISIBLE_DEVICES"
ENABLE_SHARE_CUDA_VISIBLE_DEVICES_ENV =\
"TRAIN_ENABLE_SHARE_CUDA_VISIBLE_DEVICES"

View file

@ -8,7 +8,7 @@ import torch.nn.functional as F
import torch.optim as optim
import torch.utils.data.distributed
from filelock import FileLock
from ray.util.sgd.v2 import Trainer
from ray.train import Trainer
from torchvision import datasets, transforms

View file

@ -2,12 +2,12 @@ import argparse
import mlflow
from ray.util.sgd.v2 import Trainer
from ray.util.sgd.v2.examples.train_fashion_mnist_example import train_func
from ray.train import Trainer
from ray.train.examples.train_fashion_mnist_example import train_func
def main(num_workers=1, use_gpu=False):
mlflow.set_experiment("sgd_torch_fashion_mnist")
mlflow.set_experiment("train_torch_fashion_mnist")
trainer = Trainer(
backend="torch", num_workers=num_workers, use_gpu=use_gpu)

View file

@ -4,15 +4,15 @@ import tensorflow as tf
from tensorflow.keras.callbacks import Callback
import ray
import ray.util.sgd.v2 as sgd
import ray.train as train
from ray.data import Dataset
from ray.data.dataset_pipeline import DatasetPipeline
from ray.util.sgd.v2 import Trainer
from ray.train import Trainer
class SGDReportCallback(Callback):
class TrainReportCallback(Callback):
def on_epoch_end(self, epoch, logs=None):
sgd.report(**logs)
train.report(**logs)
def get_dataset_pipeline(a=5, b=10, size=1000) -> DatasetPipeline:
@ -65,7 +65,7 @@ def train_func(config):
# Model building/compiling need to be within `strategy.scope()`.
multi_worker_model = build_and_compile_model(config)
dataset_pipeline = sgd.get_dataset_shard()
dataset_pipeline = train.get_dataset_shard()
dataset_iterator = dataset_pipeline.iter_datasets()
results = []
@ -80,7 +80,7 @@ def train_func(config):
shape=(None), dtype=tf.float32)),
batch_size=batch_size))
history = multi_worker_model.fit(
tf_dataset, callbacks=[SGDReportCallback()])
tf_dataset, callbacks=[TrainReportCallback()])
results.append(history.history)
return results

View file

@ -1,4 +1,4 @@
# This example showcases how to use Tensorflow with RaySGD.
# This example showcases how to use TensorFlow with Ray Train.
# Original code:
# https://www.tensorflow.org/tutorials/distribute/multi_worker_with_keras
import argparse
@ -9,13 +9,13 @@ import numpy as np
import tensorflow as tf
from tensorflow.keras.callbacks import Callback
import ray.util.sgd.v2 as sgd
from ray.util.sgd.v2 import Trainer
import ray.train as train
from ray.train import Trainer
class SGDReportCallback(Callback):
class TrainReportCallback(Callback):
def on_epoch_end(self, epoch, logs=None):
sgd.report(**logs)
train.report(**logs)
def mnist_dataset(batch_size):
@ -67,7 +67,7 @@ def train_func(config):
multi_worker_dataset,
epochs=epochs,
steps_per_epoch=steps_per_epoch,
callbacks=[SGDReportCallback()])
callbacks=[TrainReportCallback()])
results = history.history
return results

View file

@ -2,9 +2,9 @@ import argparse
from typing import Dict
import torch
import ray.util.sgd.v2 as sgd
from ray.util.sgd.v2.trainer import Trainer
from ray.util.sgd.v2.callbacks import JsonLoggerCallback
import ray.train as train
from ray.train.trainer import Trainer
from ray.train.callbacks import JsonLoggerCallback
from torch import nn
from torch.nn.parallel import DistributedDataParallel
from torch.utils.data import DataLoader, DistributedSampler
@ -43,7 +43,7 @@ class NeuralNetwork(nn.Module):
return logits
def train(dataloader, model, loss_fn, optimizer, device):
def train_epoch(dataloader, model, loss_fn, optimizer, device):
size = len(dataloader.dataset)
for batch, (X, y) in enumerate(dataloader):
X, y = X.to(device), y.to(device)
@ -62,7 +62,7 @@ def train(dataloader, model, loss_fn, optimizer, device):
print(f"loss: {loss:>7f} [{current:>5d}/{size:>5d}]")
def validate(dataloader, model, loss_fn, device):
def validate_epoch(dataloader, model, loss_fn, device):
size = len(dataloader.dataset)
num_batches = len(dataloader)
model.eval()
@ -86,7 +86,7 @@ def train_func(config: Dict):
lr = config["lr"]
epochs = config["epochs"]
device = torch.device(f"cuda:{sgd.local_rank()}"
device = torch.device(f"cuda:{train.local_rank()}"
if torch.cuda.is_available() else "cpu")
# Create data loaders.
@ -112,9 +112,9 @@ def train_func(config: Dict):
loss_results = []
for _ in range(epochs):
train(train_dataloader, model, loss_fn, optimizer, device)
loss = validate(test_dataloader, model, loss_fn, device)
sgd.report(loss=loss)
train_epoch(train_dataloader, model, loss_fn, optimizer, device)
loss = validate_epoch(test_dataloader, model, loss_fn, device)
train.report(loss=loss)
loss_results.append(loss)
return loss_results
@ -131,7 +131,7 @@ def train_fashion_mnist(num_workers=1, use_gpu=False):
"batch_size": 64,
"epochs": 4
},
callbacks=[JsonLoggerCallback("./sgd_results")])
callbacks=[JsonLoggerCallback("./train_results")])
trainer.shutdown()
print(f"Loss results: {result}")

View file

@ -6,11 +6,11 @@ import torch.nn as nn
from torch.nn.parallel import DistributedDataParallel
import ray
import ray.util.sgd.v2 as sgd
import ray.train as train
from ray.data import Dataset
from ray.data.dataset_pipeline import DatasetPipeline
from ray.util.sgd.v2 import Trainer
from ray.util.sgd.v2.callbacks import JsonLoggerCallback, TBXLoggerCallback
from ray.train import Trainer
from ray.train.callbacks import JsonLoggerCallback, TBXLoggerCallback
def get_datasets(a=5, b=10, size=1000,
@ -42,7 +42,7 @@ def get_datasets(a=5, b=10, size=1000,
return datasets
def train(iterable_dataset, model, loss_fn, optimizer, device):
def train_epoch(iterable_dataset, model, loss_fn, optimizer, device):
model.train()
for X, y in iterable_dataset:
X = X.to(device)
@ -58,7 +58,7 @@ def train(iterable_dataset, model, loss_fn, optimizer, device):
optimizer.step()
def validate(iterable_dataset, model, loss_fn, device):
def validate_epoch(iterable_dataset, model, loss_fn, device):
num_batches = 0
model.eval()
loss = 0
@ -80,10 +80,10 @@ def train_func(config):
lr = config.get("lr", 1e-2)
epochs = config.get("epochs", 3)
train_dataset_pipeline_shard = sgd.get_dataset_shard("train")
validation_dataset_pipeline_shard = sgd.get_dataset_shard("validation")
train_dataset_pipeline_shard = train.get_dataset_shard("train")
validation_dataset_pipeline_shard = train.get_dataset_shard("validation")
device = torch.device(f"cuda:{sgd.local_rank()}"
device = torch.device(f"cuda:{train.local_rank()}"
if torch.cuda.is_available() else "cpu")
if torch.cuda.is_available():
torch.cuda.set_device(device)
@ -92,7 +92,7 @@ def train_func(config):
model = model.to(device)
model = DistributedDataParallel(
model,
device_ids=[sgd.local_rank()] if torch.cuda.is_available() else None)
device_ids=[train.local_rank()] if torch.cuda.is_available() else None)
loss_fn = nn.MSELoss()
@ -122,9 +122,10 @@ def train_func(config):
feature_column_dtypes=[torch.float],
batch_size=batch_size)
train(train_torch_dataset, model, loss_fn, optimizer, device)
result = validate(validation_torch_dataset, model, loss_fn, device)
sgd.report(**result)
train_epoch(train_torch_dataset, model, loss_fn, optimizer, device)
result = validate_epoch(validation_torch_dataset, model, loss_fn,
device)
train.report(**result)
results.append(result)
return results

View file

@ -3,9 +3,9 @@ import argparse
import numpy as np
import torch
import torch.nn as nn
import ray.util.sgd.v2 as sgd
from ray.util.sgd.v2 import Trainer, TorchConfig
from ray.util.sgd.v2.callbacks import JsonLoggerCallback, TBXLoggerCallback
import ray.train as train
from ray.train import Trainer, TorchConfig
from ray.train.callbacks import JsonLoggerCallback, TBXLoggerCallback
from torch.nn.parallel import DistributedDataParallel
from torch.utils.data import DistributedSampler
@ -25,7 +25,7 @@ class LinearDataset(torch.utils.data.Dataset):
return len(self.x)
def train(dataloader, model, loss_fn, optimizer):
def train_epoch(dataloader, model, loss_fn, optimizer):
for X, y in dataloader:
# Compute prediction error
pred = model(X)
@ -37,7 +37,7 @@ def train(dataloader, model, loss_fn, optimizer):
optimizer.step()
def validate(dataloader, model, loss_fn):
def validate_epoch(dataloader, model, loss_fn):
num_batches = len(dataloader)
model.eval()
loss = 0
@ -79,9 +79,9 @@ def train_func(config):
results = []
for _ in range(epochs):
train(train_loader, model, loss_fn, optimizer)
result = validate(validation_loader, model, loss_fn)
sgd.report(**result)
train_epoch(train_loader, model, loss_fn, optimizer)
result = validate_epoch(validation_loader, model, loss_fn)
train.report(**result)
results.append(result)
return results

View file

@ -2,7 +2,7 @@ HuggingFace Transformers Glue Fine-tuning Example
=================================================
We've ported the ``huggingface/transformers/examples/pytorch/text-classification/run_glue_no_trainer.py`` example to
RaySGD. This example enables fine-tuning the library models for sequence classification on the GLUE benchmark: General Language Understanding Evaluation.
Ray Train. This example enables fine-tuning the library models for sequence classification on the GLUE benchmark: General Language Understanding Evaluation.
This script can fine-tune the following models:
CoLA, SST-2, MRPC, STS-B, QQP, MNLI, QNLI, RTE, WNLI.

View file

@ -27,7 +27,7 @@ import ray
import transformers
from accelerate import Accelerator
from datasets import load_dataset, load_metric
from ray.util.sgd.v2 import Trainer
from ray.train import Trainer
from torch.utils.data.dataloader import DataLoader
from tqdm.auto import tqdm
from transformers import (

View file

@ -1,26 +1,24 @@
import numpy as np
import argparse
from filelock import FileLock
import ray
from ray import tune
from ray.tune import CLIReporter
from ray.tune.schedulers import PopulationBasedTraining
import numpy as np
import torch
import torch.nn as nn
import torchvision.transforms as transforms
from filelock import FileLock
from torch.nn.parallel import DistributedDataParallel
from torch.utils.data import DataLoader, DistributedSampler, Subset
from torchvision.datasets import CIFAR10
import torchvision.transforms as transforms
from torch.nn.parallel import DistributedDataParallel
import ray
import ray.train as train
from ray import tune
from ray.train import Trainer
from ray.tune import CLIReporter
from ray.tune.schedulers import PopulationBasedTraining
from ray.util.sgd.torch.resnet import ResNet18
import ray.util.sgd.v2 as sgd
from ray.util.sgd.v2 import Trainer
def train(dataloader, model, loss_fn, optimizer, device):
def train_epoch(dataloader, model, loss_fn, optimizer, device):
size = len(dataloader.dataset)
for batch, (X, y) in enumerate(dataloader):
X, y = X.to(device), y.to(device)
@ -39,7 +37,7 @@ def train(dataloader, model, loss_fn, optimizer, device):
print(f"loss: {loss:>7f} [{current:>5d}/{size:>5d}]")
def validate(dataloader, model, loss_fn, device):
def validate_epoch(dataloader, model, loss_fn, device):
size = len(dataloader.dataset)
num_batches = len(dataloader)
model.eval()
@ -59,7 +57,7 @@ def validate(dataloader, model, loss_fn, device):
def train_func(config):
device = torch.device(f"cuda:{sgd.local_rank()}"
device = torch.device(f"cuda:{train.local_rank()}"
if torch.cuda.is_available() else "cpu")
epochs = config.pop("epochs", 3)
@ -121,9 +119,9 @@ def train_func(config):
results = []
for _ in range(epochs):
train(train_loader, model, criterion, optimizer, device)
result = validate(validation_loader, model, criterion, device)
sgd.report(**result)
train_epoch(train_loader, model, criterion, optimizer, device)
result = validate_epoch(validation_loader, model, criterion, device)
train.report(**result)
results.append(result)
return results

View file

@ -2,7 +2,7 @@ import argparse
import ray
from ray import tune
from ray.util.sgd.v2 import Trainer
from ray.train import Trainer
from train_linear_dataset_example import train_func, get_datasets

View file

@ -2,7 +2,7 @@ import argparse
import ray
from ray import tune
from ray.util.sgd.v2 import Trainer
from ray.train import Trainer
from train_linear_example import train_func

View file

@ -2,7 +2,7 @@ import argparse
import ray
from ray import tune
from ray.util.sgd.v2 import Trainer
from ray.train import Trainer
from tensorflow_mnist_example import train_func

View file

@ -10,10 +10,10 @@ from typing import Optional, Dict
import warnings
import ray
from ray.util.sgd.v2.constants import (
from ray.train.constants import (
DETAILED_AUTOFILLED_KEYS, TIME_THIS_ITER_S, PID, TIMESTAMP, TIME_TOTAL_S,
NODE_IP, TRAINING_ITERATION, HOSTNAME, DATE, RESULT_FETCH_TIMEOUT)
from ray.util.sgd.v2.utils import PropagatingThread, RayDataset
from ray.train.utils import PropagatingThread, RayDataset
class TrainingResultType(Enum):
@ -73,7 +73,7 @@ class Session:
self.training_thread.start()
def pause_reporting(self):
"""Ignore all future ``sgd.report()`` calls."""
"""Ignore all future ``train.report()`` calls."""
self.ignore_report = True
def finish(self):
@ -198,7 +198,7 @@ _session = None
def init_session(*args, **kwargs) -> None:
global _session
if _session:
raise ValueError("An SGD session is already in use. Do not call "
raise ValueError("A Train session is already in use. Do not call "
"`init_session()` manually.")
_session = Session(*args, **kwargs)
@ -206,10 +206,10 @@ def init_session(*args, **kwargs) -> None:
def get_session() -> Session:
global _session
if _session is None or not isinstance(_session, Session):
raise ValueError("Trying to access an SGD session that has not been "
"initialized yet. SGD functions like `sgd.report()` "
"should only be called from inside the training "
"function.")
raise ValueError("Trying to access a Train session that has not been "
"initialized yet. Train functions like "
"`train.report()` should only be called from inside "
"the training function.")
return _session
@ -229,12 +229,12 @@ def get_dataset_shard(
.. code-block:: python
import ray
from ray.util import sgd
from ray import train
def train_func():
model = Net()
for iter in range(100):
data_shard = sgd.get_dataset_shard().to_torch()
data_shard = train.get_dataset_shard().to_torch()
model.train(data_shard)
return model
@ -274,17 +274,17 @@ def get_dataset_shard(
def report(**kwargs) -> None:
"""Reports all keyword arguments to SGD as intermediate results.
"""Reports all keyword arguments to Train as intermediate results.
.. code-block:: python
import time
from ray.util import sgd
from ray import train
def train_func():
for iter in range(100):
time.sleep(1)
sgd.report(hello="world")
train.report(hello="world")
trainer = Trainer(backend="torch")
trainer.start()
@ -292,7 +292,7 @@ def report(**kwargs) -> None:
trainer.shutdown()
Args:
**kwargs: Any key value pair to be reported by SGD.
**kwargs: Any key value pair to be reported by Train.
If callbacks are provided, they are executed on these
intermediate results.
"""
@ -306,12 +306,12 @@ def world_rank() -> int:
.. code-block:: python
import time
from ray.util import sgd
from ray import train
def train_func():
for iter in range(100):
time.sleep(1)
if sgd.world_rank() == 0:
if train.world_rank() == 0:
print("Worker 0")
trainer = Trainer(backend="torch")
@ -330,11 +330,11 @@ def local_rank() -> int:
.. code-block:: python
import time
from ray.util import sgd
from ray import train
def train_func():
if torch.cuda.is_available():
torch.cuda.set_device(sgd.local_rank())
torch.cuda.set_device(train.local_rank())
...
trainer = Trainer(backend="torch", use_gpu=True)
@ -352,10 +352,10 @@ def load_checkpoint() -> Optional[Dict]:
.. code-block:: python
from ray.util import sgd
from ray import train
def train_func():
checkpoint = sgd.load_checkpoint()
checkpoint = train.load_checkpoint()
for iter in range(checkpoint["epoch"], 5):
print(iter)
@ -367,9 +367,9 @@ def load_checkpoint() -> Optional[Dict]:
trainer.shutdown()
Args:
**kwargs: Any key value pair to be checkpointed by SGD.
**kwargs: Any key value pair to be checkpointed by Train.
Returns:
The most recently saved checkpoint if ``sgd.save_checkpoint()``
The most recently saved checkpoint if ``train.save_checkpoint()``
has been called. Otherwise, the checkpoint that the session was
originally initialized with. ``None`` if neither exist.
"""
@ -378,17 +378,17 @@ def load_checkpoint() -> Optional[Dict]:
def save_checkpoint(**kwargs) -> None:
"""Checkpoints all keyword arguments to SGD as restorable state.
"""Checkpoints all keyword arguments to Train as restorable state.
.. code-block:: python
import time
from ray.util import sgd
from ray import train
def train_func():
for iter in range(100):
time.sleep(1)
sgd.save_checkpoint(epoch=iter)
train.save_checkpoint(epoch=iter)
trainer = Trainer(backend="torch")
trainer.start()
@ -396,7 +396,7 @@ def save_checkpoint(**kwargs) -> None:
trainer.shutdown()
Args:
**kwargs: Any key value pair to be checkpointed by SGD.
**kwargs: Any key value pair to be checkpointed by Train.
"""
session = get_session()
session.checkpoint(**kwargs)
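Taken together, the renamed helpers documented above read as follows from user code. A minimal sketch, assuming PyTorch is installed for the ``torch`` backend and at least two CPUs are available:

.. code-block:: python

    import time

    import ray.train as train
    from ray.train import Trainer


    def train_func():
        # Resume from the checkpoint passed to run(), if any.
        checkpoint = train.load_checkpoint() or {"epoch": 0}
        for epoch in range(checkpoint["epoch"], 5):
            time.sleep(1)  # stand-in for one real training epoch
            if train.world_rank() == 0:
                print(f"finished epoch {epoch}")
            train.save_checkpoint(epoch=epoch + 1)
            train.report(epoch=epoch, loss=1.0 / (epoch + 1))


    trainer = Trainer(backend="torch", num_workers=2)
    trainer.start()
    trainer.run(train_func)
    print(trainer.latest_checkpoint["epoch"])  # -> 5
    trainer.shutdown()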

View file

@ -5,15 +5,15 @@ from unittest.mock import patch
import ray
from ray.cluster_utils import Cluster
from ray.util.sgd import v2 as sgd
from ray.util.sgd.v2.backends.backend import BackendConfig, BackendExecutor
from ray.util.sgd.v2.backends.tensorflow import TensorflowConfig
from ray.util.sgd.v2.constants import ENABLE_SHARE_CUDA_VISIBLE_DEVICES_ENV
from ray.util.sgd.v2.worker_group import WorkerGroup
from ray.util.sgd.v2.backends.torch import TorchConfig
import ray.train as train
from ray.train.backends.backend import BackendConfig, BackendExecutor
from ray.train.backends.tensorflow import TensorflowConfig
from ray.train.constants import ENABLE_SHARE_CUDA_VISIBLE_DEVICES_ENV
from ray.train.worker_group import WorkerGroup
from ray.train.backends.torch import TorchConfig
from ray.util.sgd.v2.backends.backend import Backend, \
InactiveWorkerGroupError, SGDBackendError, TrainingWorkerError
from ray.train.backends.backend import Backend, \
InactiveWorkerGroupError, TrainBackendError, TrainingWorkerError
@pytest.fixture
@ -130,10 +130,10 @@ def test_local_ranks(ray_start_2_cpus, tmp_path):
e = BackendExecutor(config, num_workers=2)
e.start()
def train():
return sgd.local_rank()
def train_func():
return train.local_rank()
e.start_training(train, run_dir=tmp_path)
e.start_training(train_func, run_dir=tmp_path)
assert set(e.finish_training()) == {0, 1}
@ -142,15 +142,15 @@ def test_train_failure(ray_start_2_cpus, tmp_path):
e = BackendExecutor(config, num_workers=2)
e.start()
with pytest.raises(SGDBackendError):
with pytest.raises(TrainBackendError):
e.fetch_next_result()
with pytest.raises(SGDBackendError):
with pytest.raises(TrainBackendError):
e.finish_training()
e.start_training(lambda: 1, run_dir=tmp_path)
with pytest.raises(SGDBackendError):
with pytest.raises(TrainBackendError):
e.start_training(lambda: 2, run_dir=tmp_path)
assert e.finish_training() == [1, 1]
@ -174,31 +174,31 @@ def test_worker_failure(ray_start_2_cpus, tmp_path):
def test_no_exhaust(ray_start_2_cpus, tmp_path):
"""Tests if training can finish even if queue is not exhausted."""
def train():
def train_func():
for _ in range(2):
sgd.report(loss=1)
train.report(loss=1)
return 2
config = TestConfig()
e = BackendExecutor(config, num_workers=2)
e.start()
e.start_training(train, run_dir=tmp_path)
e.start_training(train_func, run_dir=tmp_path)
output = e.finish_training()
assert output == [2, 2]
def test_checkpoint(ray_start_2_cpus, tmp_path):
def train():
def train_func():
for i in range(2):
sgd.save_checkpoint(epoch=i)
train.save_checkpoint(epoch=i)
config = TestConfig()
e = BackendExecutor(config, num_workers=1)
e.start()
e.start_training(train, run_dir=tmp_path)
e.start_training(train_func, run_dir=tmp_path)
e.finish_training()
assert e.latest_checkpoint is not None
@ -206,14 +206,14 @@ def test_checkpoint(ray_start_2_cpus, tmp_path):
def test_persisted_checkpoint(ray_start_2_cpus, tmp_path):
def train():
def train_func():
for i in range(2):
sgd.save_checkpoint(epoch=i)
train.save_checkpoint(epoch=i)
config = TestConfig()
e = BackendExecutor(config)
e.start()
e.start_training(train, run_dir=tmp_path)
e.start_training(train_func, run_dir=tmp_path)
e.finish_training()
assert e.latest_checkpoint_id == 2
@ -224,7 +224,7 @@ def test_persisted_checkpoint(ray_start_2_cpus, tmp_path):
assert os.path.exists(e.latest_checkpoint_path)
def validate():
checkpoint = sgd.load_checkpoint()
checkpoint = train.load_checkpoint()
assert checkpoint is not None
assert checkpoint["epoch"] == 1
@ -236,14 +236,14 @@ def test_persisted_checkpoint(ray_start_2_cpus, tmp_path):
def test_persisted_checkpoint_id(ray_start_2_cpus, tmp_path):
def train():
def train_func():
for i in range(2):
sgd.save_checkpoint(epoch=i)
train.save_checkpoint(epoch=i)
config = TestConfig()
e = BackendExecutor(config)
e.start()
e.start_training(train, run_dir=tmp_path, latest_checkpoint_id=100)
e.start_training(train_func, run_dir=tmp_path, latest_checkpoint_id=100)
e.finish_training()
assert e.latest_checkpoint_id == 102
@ -255,16 +255,16 @@ def test_persisted_checkpoint_id(ray_start_2_cpus, tmp_path):
def test_mismatch_checkpoint_report(ray_start_2_cpus, tmp_path):
def train():
if (sgd.world_rank()) == 0:
sgd.save_checkpoint(epoch=0)
def train_func():
if (train.world_rank()) == 0:
train.save_checkpoint(epoch=0)
else:
sgd.report(iter=0)
train.report(iter=0)
config = TestConfig()
e = BackendExecutor(config, num_workers=2)
e.start()
e.start_training(train, run_dir=tmp_path)
e.start_training(train_func, run_dir=tmp_path)
with pytest.raises(RuntimeError):
e.finish_training()
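One non-obvious part of the changes above: the tests switch from ``import ray.util.sgd.v2 as sgd`` to ``import ray.train as train``, so an inner function named ``train`` would shadow the module, which is why ``train`` becomes ``train_func`` throughout. A small illustration of the pitfall:

.. code-block:: python

    import ray.train as train


    def train():  # rebinds the name ``train``, shadowing the ray.train module
        train.report(loss=1)  # AttributeError: 'function' object has no attribute 'report'


    def train_func():  # distinct name, so ``train`` still refers to the module
        train.report(loss=1)  # resolves correctly (valid inside a Trainer run)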

View file

@ -7,14 +7,14 @@ import glob
from collections import defaultdict
import ray
import ray.util.sgd.v2 as sgd
from ray.util.sgd.v2 import Trainer
from ray.util.sgd.v2.constants import (
TRAINING_ITERATION, DETAILED_AUTOFILLED_KEYS, BASIC_AUTOFILLED_KEYS,
ENABLE_DETAILED_AUTOFILLED_METRICS_ENV)
from ray.util.sgd.v2.callbacks import JsonLoggerCallback, TBXLoggerCallback
from ray.util.sgd.v2.backends.backend import BackendConfig, BackendInterface
from ray.util.sgd.v2.worker_group import WorkerGroup
import ray.train as train
from ray.train import Trainer
from ray.train.constants import (TRAINING_ITERATION, DETAILED_AUTOFILLED_KEYS,
BASIC_AUTOFILLED_KEYS,
ENABLE_DETAILED_AUTOFILLED_METRICS_ENV)
from ray.train.callbacks import JsonLoggerCallback, TBXLoggerCallback
from ray.train.backends.backend import BackendConfig, BackendInterface
from ray.train.worker_group import WorkerGroup
try:
from tensorflow.python.summary.summary_iterator \
@ -79,7 +79,7 @@ def test_json(ray_start_4_cpus, make_temp_dir, workers_to_log, detailed,
def train_func():
for i in range(num_iters):
sgd.report(index=i)
train.report(index=i)
return 1
if filename is None:
@ -127,7 +127,7 @@ def _validate_tbx_result(events_dir):
results = defaultdict(list)
for event in summary_iterator(events_file):
for v in event.summary.value:
assert v.tag.startswith("ray/sgd")
assert v.tag.startswith("ray/train")
results[v.tag[len("ray/train/"):]].append(v.simple_value)  # strip the "ray/train/" prefix
assert len(results["episode_reward_mean"]) == 3
@ -145,9 +145,10 @@ def test_TBX(ray_start_4_cpus, make_temp_dir):
num_workers = 4
def train_func():
sgd.report(episode_reward_mean=4)
sgd.report(episode_reward_mean=5)
sgd.report(episode_reward_mean=6, score=[1, 2, 3], hello={"world": 1})
train.report(episode_reward_mean=4)
train.report(episode_reward_mean=5)
train.report(
episode_reward_mean=6, score=[1, 2, 3], hello={"world": 1})
return 1
callback = TBXLoggerCallback(temp_dir)
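For context, the two built-in logging callbacks exercised above attach to a run like any other callback. A rough usage sketch: default-constructing ``JsonLoggerCallback`` and passing a log directory to ``TBXLoggerCallback`` as the test does, and assuming tensorboardX and PyTorch are installed:

.. code-block:: python

    import ray.train as train
    from ray.train import Trainer
    from ray.train.callbacks import JsonLoggerCallback, TBXLoggerCallback


    def train_func():
        for i in range(3):
            train.report(episode_reward_mean=i)


    trainer = Trainer(backend="torch", num_workers=2)
    trainer.start()
    trainer.run(
        train_func,
        callbacks=[
            JsonLoggerCallback(),           # results as JSON in the run directory
            TBXLoggerCallback("/tmp/tbx"),  # TensorBoard events tagged ray/train/<metric>
        ],
    )
    trainer.shutdown()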

View file

@ -1,12 +1,12 @@
import pytest
import ray
from ray.util.sgd.v2 import Trainer
from ray.util.sgd.v2.examples.horovod.horovod_example import train_func as \
from ray.train import Trainer
from ray.train.examples.horovod.horovod_example import train_func as \
horovod_torch_train_func
from ray.util.sgd.v2.examples.tensorflow_mnist_example import train_func as \
from ray.train.examples.tensorflow_mnist_example import train_func as \
tensorflow_mnist_train_func
from ray.util.sgd.v2.examples.train_fashion_mnist_example import train_func \
from ray.train.examples.train_fashion_mnist_example import train_func \
as fashion_mnist_train_func
from test_tune import torch_fashion_mnist, tune_tensorflow_mnist
@ -86,7 +86,7 @@ def test_tune_tensorflow_mnist_gpu(ray_start_4_cpus_2_gpus):
def test_train_linear_dataset_gpu(ray_start_4_cpus_2_gpus):
from ray.util.sgd.v2.examples.train_linear_dataset_example import \
from ray.train.examples.train_linear_dataset_example import \
train_linear
results = train_linear(num_workers=2, use_gpu=True)
@ -95,7 +95,7 @@ def test_train_linear_dataset_gpu(ray_start_4_cpus_2_gpus):
def test_tensorflow_linear_dataset_gpu(ray_start_4_cpus_2_gpus):
from ray.util.sgd.v2.examples.tensorflow_linear_dataset_example import \
from ray.train.examples.tensorflow_linear_dataset_example import \
train_tensorflow_linear
results = train_tensorflow_linear(num_workers=2, use_gpu=True)

View file

@ -3,7 +3,7 @@ import time
import pytest
import ray
from ray.util.sgd.v2.session import init_session, shutdown_session, \
from ray.train.session import init_session, shutdown_session, \
get_session, world_rank, local_rank, report, save_checkpoint, \
TrainingResultType, load_checkpoint, get_dataset_shard
@ -61,11 +61,11 @@ def test_get_dataset_shard():
def test_report():
def train():
def train_func():
for i in range(2):
report(loss=i)
init_session(training_func=train, world_rank=0, local_rank=0)
init_session(training_func=train_func, world_rank=0, local_rank=0)
session = get_session()
session.start()
assert session.get_next().data["loss"] == 0
@ -77,12 +77,12 @@ def test_report():
def test_report_fail():
def train():
def train_func():
for i in range(2):
report(i)
return 1
init_session(training_func=train, world_rank=0, local_rank=0)
init_session(training_func=train_func, world_rank=0, local_rank=0)
session = get_session()
session.start()
assert session.get_next() is None
@ -106,7 +106,7 @@ def test_no_start(session):
def test_checkpoint():
def train():
def train_func():
for i in range(2):
save_checkpoint(epoch=i)
@ -116,7 +116,7 @@ def test_checkpoint():
assert next.type == TrainingResultType.CHECKPOINT
assert next.data["epoch"] == expected
init_session(training_func=train, world_rank=0, local_rank=0)
init_session(training_func=train_func, world_rank=0, local_rank=0)
session = get_session()
session.start()
validate_zero(0)
@ -130,7 +130,7 @@ def test_checkpoint():
assert next.type == TrainingResultType.CHECKPOINT
assert next.data == {}
init_session(training_func=train, world_rank=1, local_rank=1)
init_session(training_func=train_func, world_rank=1, local_rank=1)
session = get_session()
session.start()
validate_nonzero()
@ -143,13 +143,13 @@ def test_checkpoint():
def test_load_checkpoint_after_save():
def train():
def train_func():
for i in range(2):
save_checkpoint(epoch=i)
checkpoint = load_checkpoint()
assert checkpoint["epoch"] == i
init_session(training_func=train, world_rank=0, local_rank=0)
init_session(training_func=train_func, world_rank=0, local_rank=0)
session = get_session()
session.start()
for i in range(2):
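The session module exercised here is internal plumbing, but the calls in these tests already form a complete round trip. A sketch assembled only from what the test imports and asserts:

.. code-block:: python

    from ray.train.session import (get_session, init_session, report,
                                   shutdown_session)


    def train_func():
        for i in range(2):
            report(loss=i)


    # Drive a single worker's session by hand, as these unit tests do.
    init_session(training_func=train_func, world_rank=0, local_rank=0)
    session = get_session()
    session.start()
    assert session.get_next().data["loss"] == 0  # first report()
    assert session.get_next().data["loss"] == 1  # second report()
    session.finish()
    shutdown_session()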

View file

@ -7,23 +7,23 @@ import horovod.torch as hvd_torch
import pytest
import ray
import ray.util.sgd.v2 as sgd
import ray.train as train
from ray._private.test_utils import wait_for_condition
from ray.util.sgd.v2 import Trainer, TorchConfig, TensorflowConfig, \
from ray.train import Trainer, TorchConfig, TensorflowConfig, \
HorovodConfig
from ray.util.sgd.v2.backends.backend import BackendConfig, Backend, \
from ray.train.backends.backend import BackendConfig, Backend, \
BackendExecutor
from ray.util.sgd.v2.callbacks.callback import SGDCallback
from ray.util.sgd.v2.constants import ENABLE_SHARE_CUDA_VISIBLE_DEVICES_ENV
from ray.util.sgd.v2.examples.horovod.horovod_example import train_func as \
from ray.train.callbacks.callback import TrainingCallback
from ray.train.constants import ENABLE_SHARE_CUDA_VISIBLE_DEVICES_ENV
from ray.train.examples.horovod.horovod_example import train_func as \
horovod_torch_train_func, HorovodTrainClass
from ray.util.sgd.v2.examples.tensorflow_mnist_example import train_func as \
from ray.train.examples.tensorflow_mnist_example import train_func as \
tensorflow_mnist_train_func
from ray.util.sgd.v2.examples.train_fashion_mnist_example import train_func \
from ray.train.examples.train_fashion_mnist_example import train_func \
as fashion_mnist_train_func
from ray.util.sgd.v2.examples.train_linear_example import train_func as \
from ray.train.examples.train_linear_example import train_func as \
linear_train_func
from ray.util.sgd.v2.worker_group import WorkerGroup
from ray.train.worker_group import WorkerGroup
@pytest.fixture
@ -81,7 +81,7 @@ class TestBackend(Backend):
pass
class TestCallback(SGDCallback):
class TestCallback(TrainingCallback):
def __init__(self):
self.result_list = []
@ -122,7 +122,7 @@ def gen_new_backend_executor(special_f):
return TestBackendExecutor
class KillCallback(SGDCallback):
class KillCallback(TrainingCallback):
def __init__(self, fail_on, worker_group):
self.counter = 0
self.fail_on = fail_on
@ -193,7 +193,7 @@ def test_report(ray_start_2_cpus):
def train_func():
for i in range(3):
sgd.report(index=i)
train.report(index=i)
return 1
callback = TestCallback()
@ -214,26 +214,26 @@ def test_report(ray_start_2_cpus):
def test_fast_slow(ray_start_2_cpus):
test_config = TestConfig()
def train():
def train_func():
for i in range(2):
sgd.save_checkpoint(epoch=i)
sgd.report(index=i)
train.save_checkpoint(epoch=i)
train.report(index=i)
def train_slow():
for i in range(2):
sgd.save_checkpoint(epoch=i)
train.save_checkpoint(epoch=i)
time.sleep(5)
sgd.report(index=i)
train.report(index=i)
time.sleep(5)
new_backend_executor_cls = gen_new_backend_executor(train_slow)
callback = TestCallback()
with patch.object(ray.util.sgd.v2.trainer, "BackendExecutor",
with patch.object(ray.train.trainer, "BackendExecutor",
new_backend_executor_cls):
trainer = Trainer(test_config, num_workers=2)
trainer.start()
trainer.run(train, callbacks=[callback])
trainer.run(train_func, callbacks=[callback])
assert trainer.latest_checkpoint["epoch"] == 1
@ -249,21 +249,21 @@ def test_fast_slow(ray_start_2_cpus):
def test_mismatch_report(ray_start_2_cpus):
test_config = TestConfig()
def train():
def train_func():
for _ in range(2):
sgd.report(loss=1)
train.report(loss=1)
def train_mismatch():
sgd.report(loss=1)
train.report(loss=1)
new_backend_executor_cls = gen_new_backend_executor(train_mismatch)
with patch.object(ray.util.sgd.v2.trainer, "BackendExecutor",
with patch.object(ray.train.trainer, "BackendExecutor",
new_backend_executor_cls):
trainer = Trainer(test_config, num_workers=2)
trainer.start()
with pytest.raises(RuntimeError):
trainer.run(train)
trainer.run(train_func)
def test_run_iterator(ray_start_2_cpus):
@ -271,7 +271,7 @@ def test_run_iterator(ray_start_2_cpus):
def train_func():
for i in range(3):
sgd.report(index=i)
train.report(index=i)
return 1
trainer = Trainer(config, num_workers=2)
@ -296,7 +296,7 @@ def test_run_iterator_returns(ray_start_2_cpus):
def train_func():
for i in range(3):
sgd.report(index=i)
train.report(index=i)
return 1
trainer = Trainer(config, num_workers=2)
@ -331,9 +331,9 @@ def test_checkpoint(ray_start_2_cpus):
config = TestConfig()
def train_func():
assert sgd.load_checkpoint() is None
assert train.load_checkpoint() is None
for i in range(3):
sgd.save_checkpoint(epoch=i)
train.save_checkpoint(epoch=i)
return 1
trainer = Trainer(config, num_workers=2)
@ -345,12 +345,12 @@ def test_checkpoint(ray_start_2_cpus):
assert checkpoint["epoch"] == 2
def train_func_checkpoint():
checkpoint = sgd.load_checkpoint()
checkpoint = train.load_checkpoint()
assert checkpoint is not None
assert checkpoint["epoch"] == 2
for i in range(checkpoint["epoch"], 5):
sgd.save_checkpoint(epoch=i)
train.save_checkpoint(epoch=i)
return 1
trainer.run(train_func_checkpoint, checkpoint=checkpoint)
@ -363,46 +363,46 @@ def test_checkpoint(ray_start_2_cpus):
def test_mismatch_checkpoint(ray_start_2_cpus):
test_config = TestConfig()
def train():
def train_func():
for i in range(2):
sgd.save_checkpoint(epoch=i)
train.save_checkpoint(epoch=i)
def train_mismatch():
sgd.save_checkpoint(epoch=0)
train.save_checkpoint(epoch=0)
new_backend_executor_cls = gen_new_backend_executor(train_mismatch)
with patch.object(ray.util.sgd.v2.trainer, "BackendExecutor",
with patch.object(ray.train.trainer, "BackendExecutor",
new_backend_executor_cls):
trainer = Trainer(test_config, num_workers=2)
trainer.start()
with pytest.raises(RuntimeError):
trainer.run(train)
trainer.run(train_func)
def test_mismatch_checkpoint_report(ray_start_2_cpus):
test_config = TestConfig()
def train():
def train_func():
for i in range(2):
sgd.save_checkpoint(epoch=i)
sgd.report(index=i)
train.save_checkpoint(epoch=i)
train.report(index=i)
def train_mismatch():
sgd.save_checkpoint(epoch=0)
sgd.report(index=0)
train.save_checkpoint(epoch=0)
train.report(index=0)
# skip checkpoint
sgd.report(index=1)
train.report(index=1)
new_backend_executor_cls = gen_new_backend_executor(train_mismatch)
callback = TestCallback()
with patch.object(ray.util.sgd.v2.trainer, "BackendExecutor",
with patch.object(ray.train.trainer, "BackendExecutor",
new_backend_executor_cls):
trainer = Trainer(test_config, num_workers=2)
trainer.start()
with pytest.raises(RuntimeError):
trainer.run(train, callbacks=[callback])
trainer.run(train_func, callbacks=[callback])
# validate checkpoint
assert trainer.latest_checkpoint["epoch"] == 0
# validate callback
@ -418,7 +418,7 @@ def test_load_checkpoint(ray_start_2_cpus):
config = TestConfig()
def train_func_checkpoint():
checkpoint = sgd.load_checkpoint()
checkpoint = train.load_checkpoint()
assert checkpoint is not None
assert checkpoint["epoch"] == 3
@ -444,13 +444,13 @@ def test_load_checkpoint(ray_start_2_cpus):
def test_persisted_checkpoint(ray_start_2_cpus, logdir):
config = TestConfig()
def train():
def train_func():
for i in range(2):
sgd.save_checkpoint(epoch=i)
train.save_checkpoint(epoch=i)
trainer = Trainer(config, num_workers=2, logdir=logdir)
trainer.start()
trainer.run(train)
trainer.run(train_func)
assert trainer.latest_checkpoint_path is not None
if logdir is not None:
@ -462,7 +462,7 @@ def test_persisted_checkpoint(ray_start_2_cpus, logdir):
latest_checkpoint = trainer.latest_checkpoint
def validate():
checkpoint = sgd.load_checkpoint()
checkpoint = train.load_checkpoint()
assert checkpoint is not None
assert checkpoint == latest_checkpoint
@ -473,7 +473,7 @@ def test_world_rank(ray_start_2_cpus):
config = TestConfig()
def train_func():
return sgd.world_rank()
return train.world_rank()
trainer = Trainer(config, num_workers=2)
trainer.start()
@ -640,7 +640,7 @@ def test_user_error(ray_start_2_cpus):
def fail_train_2():
for _ in range(2):
sgd.report(loss=1)
train.report(loss=1)
raise NotImplementedError
with pytest.raises(NotImplementedError):
@ -650,7 +650,7 @@ def test_user_error(ray_start_2_cpus):
def test_worker_failure_1(ray_start_2_cpus):
test_config = TestConfig()
def train():
def train_func():
return 1
def train_actor_failure():
@ -659,56 +659,56 @@ def test_worker_failure_1(ray_start_2_cpus):
new_backend_executor_cls = gen_new_backend_executor(train_actor_failure)
with patch.object(ray.util.sgd.v2.trainer, "BackendExecutor",
with patch.object(ray.train.trainer, "BackendExecutor",
new_backend_executor_cls):
trainer = Trainer(test_config, num_workers=2)
trainer.start()
results = trainer.run(train)
results = trainer.run(train_func)
assert results == [1, 1]
def test_worker_failure_2(ray_start_2_cpus):
test_config = TestConfig()
def train():
def train_func():
for _ in range(2):
sgd.report(loss=1)
train.report(loss=1)
return 1
def train_actor_failure():
for _ in range(2):
sgd.report(loss=1)
train.report(loss=1)
import sys
sys.exit(0)
new_backend_executor_cls = gen_new_backend_executor(train_actor_failure)
with patch.object(ray.util.sgd.v2.trainer, "BackendExecutor",
with patch.object(ray.train.trainer, "BackendExecutor",
new_backend_executor_cls):
trainer = Trainer(test_config, num_workers=2)
trainer.start()
results = trainer.run(train)
results = trainer.run(train_func)
assert results == [1, 1]
def test_worker_failure_local_rank(ray_start_2_cpus):
test_config = TestConfig()
def train():
return sgd.local_rank()
def train_func():
return train.local_rank()
def train_actor_failure():
import sys
sys.exit(0)
return sgd.local_rank()
return train.local_rank()
new_backend_executor_cls = gen_new_backend_executor(train_actor_failure)
with patch.object(ray.util.sgd.v2.trainer, "BackendExecutor",
with patch.object(ray.train.trainer, "BackendExecutor",
new_backend_executor_cls):
trainer = Trainer(test_config, num_workers=2)
trainer.start()
results = trainer.run(train)
results = trainer.run(train_func)
assert set(results) == {0, 1}
@ -737,13 +737,13 @@ def test_worker_start_failure(ray_start_2_cpus):
def test_max_failures(ray_start_2_cpus):
test_config = TestConfig()
def train():
def train_func():
import sys
sys.exit(0)
trainer = Trainer(test_config, num_workers=2)
trainer.start()
iterator = trainer.run_iterator(train)
iterator = trainer.run_iterator(train_func)
with pytest.raises(RuntimeError):
iterator.get_final_results(force=True)
assert iterator._executor._num_failures == 3
@ -777,7 +777,7 @@ def test_worker_kill(ray_start_2_cpus, backend):
def train_func():
for i in range(2):
sgd.report(loss=1, iter=i)
train.report(loss=1, iter=i)
trainer.start()
kill_callback = KillCallback(
@ -802,33 +802,33 @@ def test_worker_kill(ray_start_2_cpus, backend):
# Run 5: iter=1, counter=4, Successful
assert kill_callback.counter == 4
def train():
def train_func():
return 1
# Make sure Trainer is usable even after failure handling.
trainer.run(train)
trainer.run(train_func)
def test_worker_kill_checkpoint(ray_start_2_cpus):
test_config = TestConfig()
def train():
checkpoint = sgd.load_checkpoint()
def train_func():
checkpoint = train.load_checkpoint()
if checkpoint:
epoch = checkpoint["epoch"]
else:
epoch = 0
print("Epoch: ", epoch)
for i in range(epoch, 2):
sgd.report(loss=1, iter=i)
sgd.save_checkpoint(epoch=i + 1)
train.report(loss=1, iter=i)
train.save_checkpoint(epoch=i + 1)
trainer = Trainer(test_config, num_workers=2)
trainer.start()
kill_callback = KillCallback(
fail_on=0, worker_group=trainer._executor.worker_group)
trainer.run(train, callbacks=[kill_callback])
trainer.run(train_func, callbacks=[kill_callback])
# Run 1: epoch=0, counter=1, Successful
# *Checkpoint is saved.*
@ -844,7 +844,7 @@ def test_worker_kill_checkpoint(ray_start_2_cpus):
kill_callback = KillCallback(
fail_on=1, worker_group=trainer._executor.worker_group)
trainer.run(train, callbacks=[kill_callback])
trainer.run(train_func, callbacks=[kill_callback])
# Run 1: epoch=0, counter=1, Successful
# *Checkpoint saved*
# *Latest checkpoint updated, epoch=1
@ -856,11 +856,11 @@ def test_worker_kill_checkpoint(ray_start_2_cpus):
assert kill_callback.counter == 3
assert trainer.latest_checkpoint["epoch"] == 2
def train():
def train_func():
return 1
# Make sure Trainer is usable even after failure handling.
trainer.run(train)
trainer.run(train_func)
def test_multiple_run(ray_start_2_cpus):
@ -893,10 +893,10 @@ def test_run_after_user_error(ray_start_2_cpus):
with pytest.raises(NotImplementedError):
trainer.run(fail_train)
def train():
def train_func():
return 1
output = trainer.run(train)
output = trainer.run(train_func)
assert output == [1, 1]
@ -922,7 +922,7 @@ def test_dataset(ray_start_4_cpus):
data_all_epochs = []
for _ in range(2):
data_this_epoch = []
dataset = sgd.get_dataset_shard()
dataset = train.get_dataset_shard()
for batch in dataset.iter_batches():
data_this_epoch.extend(batch)
data_all_epochs.append(data_this_epoch)
@ -950,13 +950,13 @@ def test_multiple_datasets(ray_start_4_cpus):
data_val_all_epochs = []
for _ in range(2):
data_this_epoch_train = []
train_dataset = sgd.get_dataset_shard("train")
train_dataset = train.get_dataset_shard("train")
for batch in train_dataset.iter_batches():
data_this_epoch_train.extend(batch)
data_train_all_epochs.append(data_this_epoch_train)
data_this_epoch_val = []
val_dataset = sgd.get_dataset_shard("val")
val_dataset = train.get_dataset_shard("val")
for batch in val_dataset.iter_batches():
data_this_epoch_val.extend(batch)
data_val_all_epochs.append(data_this_epoch_val)
@ -987,7 +987,7 @@ def test_dataset_pipeline(ray_start_4_cpus):
dataset = ray.data.range(num_data).repeat()
def get_dataset():
pipeline_iterator = sgd.get_dataset_shard().iter_datasets()
pipeline_iterator = train.get_dataset_shard().iter_datasets()
data_all_epochs = []
for _ in range(num_epochs):
dataset_this_epoch = next(pipeline_iterator)
@ -1012,7 +1012,7 @@ def test_dataset_pipeline_shuffle(ray_start_4_cpus):
dataset = ray.data.range(num_data).repeat().random_shuffle_each_window()
def get_dataset():
pipeline_iterator = sgd.get_dataset_shard().iter_datasets()
pipeline_iterator = train.get_dataset_shard().iter_datasets()
data_all_epochs = []
for _ in range(2):
dataset_this_epoch = next(pipeline_iterator)
@ -1040,7 +1040,7 @@ def test_dataset_fault_tolerance(ray_start_4_cpus):
dataset_splits = dataset.split(n=2, equal=True)
test_config = TestConfig()
def train():
def train_func():
return 1
def train_actor_failure():
@ -1049,7 +1049,7 @@ def test_dataset_fault_tolerance(ray_start_4_cpus):
new_backend_executor_cls = gen_new_backend_executor(train_actor_failure)
with patch.object(ray.util.sgd.v2.trainer, "BackendExecutor",
with patch.object(ray.train.trainer, "BackendExecutor",
new_backend_executor_cls):
with patch.object(
new_backend_executor_cls,
@ -1057,7 +1057,7 @@ def test_dataset_fault_tolerance(ray_start_4_cpus):
return_value=dataset_splits) as mock_method:
trainer = Trainer(test_config, num_workers=2)
trainer.start()
trainer.run(train, dataset=dataset)
trainer.run(train_func, dataset=dataset)
mock_method.assert_called_once()
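The ``TestCallback`` and ``KillCallback`` classes above are ordinary ``TrainingCallback`` subclasses. A minimal user-facing sketch; the ``handle_result`` signature is assumed from this era of the API:

.. code-block:: python

    import ray.train as train
    from ray.train import Trainer
    from ray.train.callbacks.callback import TrainingCallback


    class PrintingCallback(TrainingCallback):
        def handle_result(self, results, **info):
            # ``results`` holds one dict per worker for each train.report() call.
            print(results)


    def train_func():
        for i in range(3):
            train.report(index=i)


    trainer = Trainer(backend="torch", num_workers=2)
    trainer.start()
    trainer.run(train_func, callbacks=[PrintingCallback()])
    trainer.shutdown()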

View file

@ -2,17 +2,17 @@ import os
import pytest
import ray
import ray.util.sgd.v2 as sgd
import ray.train as train
from ray import tune, cloudpickle
from ray.tune import TuneError
from ray.util.sgd.v2 import Trainer
from ray.util.sgd.v2.backends.backend import Backend, BackendConfig
from ray.util.sgd.v2.constants import TUNE_CHECKPOINT_FILE_NAME
from ray.util.sgd.v2.examples.tensorflow_mnist_example import train_func as \
from ray.train import Trainer
from ray.train.backends.backend import Backend, BackendConfig
from ray.train.constants import TUNE_CHECKPOINT_FILE_NAME
from ray.train.examples.tensorflow_mnist_example import train_func as \
tensorflow_mnist_train_func
from ray.util.sgd.v2.examples.train_fashion_mnist_example import train_func \
from ray.train.examples.train_fashion_mnist_example import train_func \
as fashion_mnist_train_func
from ray.util.sgd.v2.worker_group import WorkerGroup
from ray.train.worker_group import WorkerGroup
@pytest.fixture
@ -107,8 +107,8 @@ def test_tune_error(ray_start_2_cpus):
def test_tune_checkpoint(ray_start_2_cpus):
def train_func():
for i in range(10):
sgd.report(test=i)
sgd.save_checkpoint(hello="world")
train.report(test=i)
train.save_checkpoint(hello="world")
trainer = Trainer(TestConfig())
TestTrainable = trainer.to_tune_trainable(train_func)
@ -125,13 +125,13 @@ def test_tune_checkpoint(ray_start_2_cpus):
def test_reuse_checkpoint(ray_start_2_cpus):
def train_func(config):
itr = 0
ckpt = sgd.load_checkpoint()
ckpt = train.load_checkpoint()
if ckpt is not None:
itr = ckpt["iter"] + 1
for i in range(itr, config["max_iter"]):
sgd.save_checkpoint(iter=i)
sgd.report(test=i, training_iteration=i)
train.save_checkpoint(iter=i)
train.report(test=i, training_iteration=i)
trainer = Trainer(TestConfig())
TestTrainable = trainer.to_tune_trainable(train_func)
@ -151,7 +151,7 @@ def test_reuse_checkpoint(ray_start_2_cpus):
def test_retry(ray_start_2_cpus):
def train_func():
ckpt = sgd.load_checkpoint()
ckpt = train.load_checkpoint()
restored = bool(ckpt) # Does a previous checkpoint exist?
itr = 0
if ckpt:
@ -160,8 +160,8 @@ def test_retry(ray_start_2_cpus):
for i in range(itr, 4):
if i == 2 and not restored:
raise Exception("try to fail me")
sgd.save_checkpoint(iter=i)
sgd.report(test=i, training_iteration=i)
train.save_checkpoint(iter=i)
train.report(test=i, training_iteration=i)
trainer = Trainer(TestConfig())
TestTrainable = trainer.to_tune_trainable(train_func)
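For orientation, the Trainable produced by ``to_tune_trainable`` above is passed straight to ``tune.run``. A rough sketch with an illustrative search space and metric, assuming PyTorch for the backend:

.. code-block:: python

    import ray.train as train
    from ray import tune
    from ray.train import Trainer


    def train_func(config):
        for i in range(config["max_iter"]):
            train.save_checkpoint(iter=i)
            train.report(test=i, training_iteration=i)


    trainer = Trainer(backend="torch", num_workers=2)
    trainable = trainer.to_tune_trainable(train_func)

    analysis = tune.run(trainable, config={"max_iter": tune.grid_search([2, 4])})
    print(analysis.get_best_config(metric="test", mode="max"))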

View file

@ -1,6 +1,6 @@
from pathlib import Path
from ray.util.sgd.v2.utils import construct_path
from ray.train.utils import construct_path
def test_construct_path():

View file

@ -2,7 +2,7 @@ import pytest
import time
import ray
from ray.util.sgd.v2.worker_group import WorkerGroup
from ray.train.worker_group import WorkerGroup
@pytest.fixture

View file

@ -7,20 +7,20 @@ from typing import Union, Callable, List, TypeVar, Optional, Any, Dict, \
Type
from ray.actor import ActorHandle
from ray.util.sgd.v2.backends.backend import BackendConfig, BackendExecutor, \
InactiveWorkerGroupError, SGDBackendError, TrainingWorkerError
from ray.util.sgd.v2.backends.horovod import HorovodConfig
from ray.util.sgd.v2.backends.tensorflow import TensorflowConfig
from ray.util.sgd.v2.backends.torch import TorchConfig
from ray.util.sgd.v2.callbacks.callback import SGDCallback
from ray.util.sgd.v2.utils import RayDataset
from ray.util.sgd.v2.checkpoint import CheckpointStrategy
from ray.util.sgd.v2.constants import TUNE_INSTALLED, DEFAULT_RESULTS_DIR, \
from ray.train.backends.backend import BackendConfig, BackendExecutor, \
InactiveWorkerGroupError, TrainBackendError, TrainingWorkerError
from ray.train.backends.horovod import HorovodConfig
from ray.train.backends.tensorflow import TensorflowConfig
from ray.train.backends.torch import TorchConfig
from ray.train.callbacks.callback import TrainingCallback
from ray.train.utils import RayDataset
from ray.train.checkpoint import CheckpointStrategy
from ray.train.constants import TUNE_INSTALLED, DEFAULT_RESULTS_DIR, \
TUNE_CHECKPOINT_FILE_NAME
# Ray SGD should be usable even if Tune is not installed.
from ray.util.sgd.v2.utils import construct_path
from ray.util.sgd.v2.worker_group import WorkerGroup
# Ray Train should be usable even if Tune is not installed.
from ray.train.utils import construct_path
from ray.train.worker_group import WorkerGroup
if TUNE_INSTALLED:
from ray import tune
@ -53,7 +53,7 @@ class Trainer:
Directory structure:
- A logdir is created during instantiation. This will hold all the
results/checkpoints for the lifetime of the Trainer. By default, it will be
of the form ``~/ray_results/sgd_<datestring>``.
of the form ``~/ray_results/train_<datestring>``.
- A run_dir is created for each ``run`` call. This will
hold the checkpoints and results for a single ``trainer.run()`` or
``trainer.run_iterator()`` call. It will be of the form ``run_<run_id>``.
@ -137,9 +137,9 @@ class Trainer:
# Create directory for logs.
log_dir = Path(log_dir) if log_dir else None
if not log_dir:
# Initialize timestamp for identifying this SGD training execution.
# Initialize timestamp for identifying this Train execution.
timestr = datetime.today().strftime("%Y-%m-%d_%H-%M-%S")
log_dir = Path(f"sgd_{timestr}")
log_dir = Path(f"train_{timestr}")
log_dir = construct_path(log_dir, DEFAULT_RESULTS_DIR)
log_dir.mkdir(parents=True, exist_ok=True)
logger.info(f"Trainer logs will be logged in: {log_dir}")
@ -189,7 +189,7 @@ class Trainer:
def run(self,
train_func: Union[Callable[[], T], Callable[[Dict[str, Any]], T]],
config: Optional[Dict[str, Any]] = None,
callbacks: Optional[List[SGDCallback]] = None,
callbacks: Optional[List[TrainingCallback]] = None,
dataset: Optional[Union[RayDataset, Dict[str, RayDataset]]] = None,
checkpoint: Optional[Union[Dict, str, Path]] = None,
checkpoint_strategy: Optional[CheckpointStrategy] = None
@ -201,23 +201,23 @@ class Trainer:
This can either take in no arguments or a ``config`` dict.
config (Optional[Dict]): Configurations to pass into
``train_func``. If None then an empty Dict will be created.
callbacks (Optional[List[SGDCallback]]): A list of Callbacks which
will be executed during training. If this is not set,
callbacks (Optional[List[TrainingCallback]]): A list of Callbacks
which will be executed during training. If this is not set,
currently there are NO default Callbacks.
dataset (Optional[Union[RayDataset, Dict[str, RayDataset]]]):
Distributed Ray :ref:`Dataset <dataset-api>` or
:ref:`DatasetPipeline <dataset-pipeline-api>` to pass into the
workers, which can be accessed from the training function via
``sgd.get_dataset_shard()``. Sharding will automatically be
``train.get_dataset_shard()``. Sharding will automatically be
handled by the Trainer. Multiple Datasets can be passed in as
a ``Dict`` that maps each name key to a Dataset value,
and each Dataset can be accessed from the training function
by passing in a `dataset_name` argument to
``sgd.get_dataset_shard()``.
``train.get_dataset_shard()``.
checkpoint (Optional[Dict|str|Path]): The checkpoint data that
should be loaded onto each worker and accessed by the training
function via ``sgd.load_checkpoint()``. If this is a ``str`` or
``Path`` then the value is expected to be a path to a file
function via ``train.load_checkpoint()``. If this is a ``str``
or ``Path`` then the value is expected to be a path to a file
that contains a serialized checkpoint dict. If this is
``None`` then no checkpoint will be loaded.
checkpoint_strategy (Optional[CheckpointStrategy]): The
@ -242,7 +242,7 @@ class Trainer:
train_func = self._get_train_func(train_func, config)
try:
iterator = SGDIterator(
iterator = TrainingIterator(
backend_executor=self._executor,
train_func=train_func,
dataset=dataset,
@ -267,7 +267,7 @@ class Trainer:
dataset: Optional[Union[RayDataset, Dict[str, RayDataset]]] = None,
checkpoint: Optional[Union[Dict, str, Path]] = None,
checkpoint_strategy: Optional[CheckpointStrategy] = None
) -> "SGDIterator":
) -> "TrainingIterator":
"""Same as ``run`` except returns an iterator over the results.
This is useful if you want to have more customization of what to do
@ -281,7 +281,7 @@ class Trainer:
for _ in config["epochs"]:
metrics = train()
metrics = validate(...)
ray.sgd.report(**metrics)
ray.train.report(**metrics)
return model
iterator = trainer.run_iterator(train_func, config=config)
@ -300,7 +300,7 @@ class Trainer:
``train_func``. If None then an empty Dict will be created.
checkpoint (Optional[Dict|Path|str]): The checkpoint data that
should be loaded onto each worker and accessed by the
training function via ``sgd.load_checkpoint()``. If this is a
training function via ``train.load_checkpoint()``. If this is a
``str`` or ``Path`` then the value is expected to be a path
to a file that contains a serialized checkpoint dict. If this
is ``None`` then no checkpoint will be loaded.
@ -308,7 +308,7 @@ class Trainer:
configurations for saving checkpoints.
Returns:
An Iterator over the intermediate results from ``sgd.report()``.
An Iterator over the intermediate results from ``train.report()``.
"""
# Create new log directory for this run.
self._run_id += 1
@ -316,7 +316,7 @@ class Trainer:
train_func = self._get_train_func(train_func, config)
return SGDIterator(
return TrainingIterator(
backend_executor=self._executor,
train_func=train_func,
run_dir=self.latest_run_dir,
@ -369,7 +369,7 @@ class Trainer:
"""Path to the checkpoint directory.
Returns ``None`` if ``run()`` has not been called or if
``sgd.checkpoint()`` has not been called from ``train_func``within
``train.save_checkpoint()`` has not been called from ``train_func`` within
the most recent call to ``run``.
"""
return self._executor.latest_checkpoint_dir
@ -379,7 +379,7 @@ class Trainer:
"""Path to the latest persisted checkpoint from the latest run.
Returns ``None`` if ``run()`` has not been called or if
``sgd.checkpoint()`` has not been called from ``train_func`` within
``train.save_checkpoint()`` has not been called from ``train_func`` within
the most recent call to ``run``.
"""
return self._executor.latest_checkpoint_path
@ -391,7 +391,7 @@ class Trainer:
This checkpoint may not be saved to disk.
Returns ``None`` if ``run()`` has not been called or if
``sgd.checkpoint()`` has not been called from ``train_func``.
``train.save_checkpoint()`` has not been called from ``train_func``.
"""
return self._executor.latest_checkpoint
@ -413,12 +413,12 @@ class Trainer:
Distributed Ray :ref:`Dataset <dataset-api>` or
:ref:`DatasetPipeline <dataset-pipeline-api>` to pass into the
workers, which can be accessed from the training function via
``sgd.get_dataset_shard()``. Sharding will automatically be
``train.get_dataset_shard()``. Sharding will automatically be
handled by the Trainer. Multiple Datasets can be passed in as
a ``Dict`` that maps each name key to a Dataset value,
and each Dataset can be accessed from the training function
by passing in a `dataset_name` argument to
``sgd.get_dataset_shard()``.
``train.get_dataset_shard()``.
Returns:
A Trainable that can directly be passed into ``tune.run()``.
@ -437,11 +437,11 @@ class Trainer:
self._resources_per_worker)
def to_worker_group(self, train_cls: Type, *args,
**kwargs) -> "SGDWorkerGroup":
**kwargs) -> "TrainWorkerGroup":
"""Returns Ray actors with the provided class and the backend started.
This is useful if you want to provide your own class for training
and have more control over execution, but still want to use Ray SGD
and have more control over execution, but still want to use Ray Train
to set up the appropriate backend configurations (torch, tf, etc.).
.. code-block:: python
@ -474,10 +474,10 @@ class Trainer:
"Trainer or don't start it in the first place.")
self._executor.start(
train_cls=train_cls, train_cls_args=args, train_cls_kwargs=kwargs)
return SGDWorkerGroup(self._executor.worker_group)
return TrainWorkerGroup(self._executor.worker_group)
class SGDWorkerGroup:
class TrainWorkerGroup:
"""A container for a group of Ray actors.
You should not instantiate this directly and only use this as the output
@ -522,8 +522,8 @@ class SGDWorkerGroup:
self._worker_group.shutdown(patience_s=patience_s)
class SGDIterator:
"""An iterator over SGD results. Returned by ``trainer.run_iterator``."""
class TrainingIterator:
"""An iterator over Train results. Returned by ``trainer.run_iterator``."""
def __init__(
self, backend_executor: BackendExecutor,
@ -587,7 +587,7 @@ class SGDIterator:
"already or never started in the first place. "
"Either create a new Trainer or start this one.") \
from None
except SGDBackendError:
except TrainBackendError:
raise RuntimeError("Training failed. You should not be seeing "
"this error and this is a bug. Please create "
"a new issue at "
@ -643,7 +643,7 @@ class SGDIterator:
def _create_tune_trainable(train_func, dataset, backend, num_workers, use_gpu,
resources_per_worker):
"""Creates a Tune Trainable class for SGD training.
"""Creates a Tune Trainable class for Train training.
This function populates class attributes and methods.
"""
@ -676,7 +676,7 @@ def _create_tune_trainable(train_func, dataset, backend, num_workers, use_gpu,
trainable_cls = wrap_function(tune_function)
class SgdTrainable(trainable_cls):
class TrainTrainable(trainable_cls):
"""Add default resources to the Trainable."""
@classmethod
@ -693,4 +693,4 @@ def _create_tune_trainable(train_func, dataset, backend, num_workers, use_gpu,
bundles = head_bundle + worker_bundles
return PlacementGroupFactory(bundles, strategy="PACK")
return SgdTrainable
return TrainTrainable
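To make the renamed ``TrainingIterator`` concrete, here is a short consumption sketch for ``run_iterator``, mirroring the docstring example above; ``get_final_results`` appears in the tests earlier in this diff:

.. code-block:: python

    import ray.train as train
    from ray.train import Trainer


    def train_func():
        for i in range(3):
            train.report(loss=1.0 / (i + 1))
        return "done"


    trainer = Trainer(backend="torch", num_workers=2)
    trainer.start()

    iterator = trainer.run_iterator(train_func)
    for intermediate_results in iterator:
        # One dict per worker for every train.report() call.
        print(intermediate_results)

    print(iterator.get_final_results())  # ["done", "done"]
    trainer.shutdown()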

View file

@ -1,15 +1,4 @@
from ray.util.sgd.v2.backends import (BackendConfig, HorovodConfig,
TensorflowConfig, TorchConfig)
from ray.util.sgd.v2.callbacks import SGDCallback
from ray.util.sgd.v2.checkpoint import CheckpointStrategy
from ray.util.sgd.v2.session import (get_dataset_shard, local_rank,
load_checkpoint, report, save_checkpoint,
world_rank)
from ray.util.sgd.v2.trainer import Trainer, SGDIterator
from ray.train import * # noqa: F401, F403
import warnings
__all__ = [
"BackendConfig", "CheckpointStrategy", "get_dataset_shard",
"HorovodConfig", "load_checkpoint", "local_rank", "report",
"save_checkpoint", "SGDIterator", "TensorflowConfig", "SGDCallback",
"TorchConfig", "Trainer", "world_rank"
]
warnings.warn("ray.util.sgd.v2 has been moved to ray.train.")
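The practical effect of this shim on user code is a soft deprecation. A quick illustration, assuming the star re-export covers the public names such as ``Trainer``:

.. code-block:: python

    # Old import path still works, but now warns:
    #   "ray.util.sgd.v2 has been moved to ray.train."
    import ray.util.sgd.v2 as sgd  # noqa: F401

    # New, preferred import path introduced by this change:
    import ray.train as train

    # The shim star-imports ray.train, so the old names resolve to the same objects.
    assert sgd.Trainer is train.Trainer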

View file

@ -1,11 +0,0 @@
from ray.util.sgd.v2.backends.backend import BackendConfig
from ray.util.sgd.v2.backends.horovod import HorovodConfig
from ray.util.sgd.v2.backends.tensorflow import TensorflowConfig
from ray.util.sgd.v2.backends.torch import TorchConfig
__all__ = [
"BackendConfig",
"HorovodConfig",
"TensorflowConfig",
"TorchConfig",
]

View file

@ -1,5 +0,0 @@
from ray.util.sgd.v2.callbacks.callback import SGDCallback
from ray.util.sgd.v2.callbacks.logging import (JsonLoggerCallback,
TBXLoggerCallback)
__all__ = ["SGDCallback", "JsonLoggerCallback", "TBXLoggerCallback"]

View file

@ -41,7 +41,7 @@ xgboost==1.3.3
zoopt==0.4.1
# Dependencies for Hugging Face examples:
# `python/ray/util/sgd/v2/examples/transformers/transformers_example.py`
# `python/ray/train/examples/transformers/transformers_example.py`
accelerate==0.3.0
datasets==1.11.0
sentencepiece==0.1.96