Added example to user guide for cloud checkpointing (#20045)
Co-authored-by: will <will@anyscale.com>
Co-authored-by: Antoni Baum <antoni.baum@protonmail.com>
Co-authored-by: Kai Fricke <kai@anyscale.com>
This commit is contained in:
parent 6ff4061f3a
commit fa878e2d4d
16 changed files with 401 additions and 182 deletions

@@ -6,6 +6,7 @@
- Get rid of the badges in the top
- Get rid of the references section at the bottom
- Be sure not to delete the API reference section in the bottom of this file.
- add `.. _lightgbm-ray-tuning:` before the "Hyperparameter Tuning" section
- Adjust some link targets (e.g. for "Ray Tune") to anonymous references
  by adding a second underscore (use `target <link>`__)
- Search for `\ **` and delete this from the links (bold links are not supported)

@@ -217,6 +218,8 @@ Example loading multiple parquet files:
        columns=columns,
        filetype=RayFileType.PARQUET)

.. _lightgbm-ray-tuning:

Hyperparameter Tuning
---------------------

@@ -6,6 +6,7 @@
- remove the table of contents
- remove the PyTorch Lightning Compatibility section
- Be sure not to delete the API reference section in the bottom of this file.
- add `.. _ray-lightning-tuning:` before the "Hyperparameter Tuning with Ray Tune" section
- Adjust some link targets (e.g. for "Ray Tune") to anonymous references
  by adding a second underscore (use `target <link>`__)
- Search for `\ **` and delete this from the links (bold links are not supported)

@@ -131,6 +132,8 @@ With sharded training, leverage the scalability of data parallel training while

See the `Pytorch Lightning docs <https://pytorch-lightning.readthedocs.io/en/stable/advanced/multi_gpu.html#sharded-training>`__ for more information on sharded training.

.. _ray-lightning-tuning:

Hyperparameter Tuning with Ray Tune
-----------------------------------

@@ -69,6 +69,7 @@ In your training program, insert the following, and **customize** for each worke

And on each machine, launch a separate process that contains the index of the worker and information about all other nodes of the cluster.

.. _ray-train-tftrainer-example:

TFTrainer Example
-----------------

@@ -127,55 +127,13 @@ If you used a cluster configuration (starting a cluster with ``ray up`` or ``ray
1. In the examples, the Ray redis address commonly used is ``localhost:6379``.
2. If the Ray cluster is already started, you should not need to run anything on the worker nodes.


Syncing
-------

Tune stores checkpoints on the node where the trials are executed. If you are training on more than one node,
this means that some trial checkpoints may be on the head node and others are not.

When trials are restored (e.g. after a failure or when the experiment was paused), they may be scheduled on
different nodes, but still would need access to the latest checkpoint. To make sure this works, Ray Tune
comes with facilities to synchronize trial checkpoints between nodes.

Generally we consider three cases:

1. When using a shared directory (e.g. via NFS)
2. When using cloud storage (e.g. S3 or GS)
3. When using neither

The default option here is 3, which will be automatically used if nothing else is configured.

Using a shared directory
~~~~~~~~~~~~~~~~~~~~~~~~
If all Ray nodes have access to a shared filesystem, e.g. via NFS, they can all write to this directory.
In this case, we don't need any synchronization at all, as it is implicitly done by the operating system.

For this case, we only need to tell Ray Tune not to do any syncing at all (as syncing is the default):

.. code-block:: python

    from ray import tune

    tune.run(
        trainable,
        name="experiment_name",
        local_dir="/path/to/shared/storage/",
        sync_config=tune.SyncConfig(
            syncer=None  # Disable syncing
        )
    )

Note that the driver (on the head node) will have access to all checkpoints locally (in the
shared directory) for further processing.


Using cloud storage
~~~~~~~~~~~~~~~~~~~
If all nodes have access to cloud storage, e.g. S3 or GS, we end up with a similar situation as in the first case,
only that the consolidated directory including all logs and checkpoints lives on cloud storage.

For this case, we tell Ray Tune to use an ``upload_dir`` to store checkpoints at.
This will automatically store both the experiment state and the trial checkpoints at that directory:
In a distributed experiment, you should try to use :ref:`cloud checkpointing <tune-cloud-checkpointing>` to
reduce synchronization overhead. For this, you just have to specify an ``upload_dir`` in the
:class:`tune.SyncConfig <ray.tune.SyncConfig>`:

.. code-block:: python

@@ -189,89 +147,10 @@ This will automatically store both the experiment state and the trial checkpoint
        )
    )

We don't have to provide a ``syncer`` here as it will be automatically detected. However, you can provide
a string if you want to use a custom command:

.. code-block:: python
For more details or customization, see our
:ref:`guide on checkpointing <tune-checkpoint-syncing>`.

    from ray import tune

    tune.run(
        trainable,
        name="experiment_name",
        sync_config=tune.SyncConfig(
            upload_dir="s3://bucket-name/sub-path/",
            syncer="aws s3 sync {source} {target}",  # Custom sync command
        )
    )

If a string is provided, then it must include replacement fields ``{source}`` and ``{target}``,
as demonstrated in the example above.

The consolidated data will be available in the cloud bucket. This means that the driver
(on the head node) will not have access to all checkpoints locally. If you want to process
e.g. the best checkpoint further, you will first have to fetch it from the cloud storage.


Default syncing (no shared/cloud storage)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
If you're using neither a shared filesystem nor cloud storage, Ray Tune will resort to the
default syncing mechanism, which utilizes ``rsync`` (via SSH) to synchronize checkpoints across
nodes.

Please note that this approach is likely the least efficient one - you should always try to use
shared or cloud storage if possible when training on a multi-node cluster.

For the syncing to work, the head node must be able to SSH into the worker nodes. If you are using
the Ray cluster launcher this is usually the case (note that Kubernetes is an exception, but
:ref:`see here for more details <tune-kubernetes>`).

If you don't provide a ``tune.SyncConfig`` at all, rsync-based syncing will be used.

If you want to customize syncing behavior, you can again specify a custom sync template:

.. code-block:: python

    from ray import tune

    tune.run(
        trainable,
        name="experiment_name",
        sync_config=tune.SyncConfig(
            # Do not specify an upload dir here
            syncer="rsync -savz -e 'ssh -i ssh_key.pem' {source} {target}",  # Custom sync command
        )
    )


Alternatively, a function can be provided with the following signature:

.. code-block:: python

    import subprocess

    def custom_sync_func(source, target):
        sync_cmd = "rsync {source} {target}".format(
            source=source,
            target=target)
        sync_process = subprocess.Popen(sync_cmd, shell=True)
        sync_process.wait()

    tune.run(
        trainable,
        name="experiment_name",
        sync_config=tune.SyncConfig(
            syncer=custom_sync_func,
            sync_period=60  # Synchronize more often
        )
    )

When syncing results back to the driver, the source would be a path similar to ``ubuntu@192.0.0.1:/home/ubuntu/ray_results/trial1``, and the target would be a local path.

Note that we adjusted the sync period in the example above. Setting this to a lower number will pull
checkpoints from remote nodes more often. This will lead to more robust trial recovery,
but it will also lead to more synchronization overhead (as SSH is usually slow).

As in the first case, the driver (on the head node) will have access to all checkpoints locally
for further processing.


.. _tune-distributed-spot:

@@ -148,7 +148,7 @@ to decide which hyperparameter configuration lead to the best results. These met
can also be used to stop bad performing trials early in order to avoid wasting
resources on those trials.

The :ref:`checkpoint saving <tune-checkpoint>` is optional. However, it is necessary if we wanted to use advanced
The :ref:`checkpoint saving <tune-checkpoint-syncing>` is optional. However, it is necessary if we wanted to use advanced
schedulers like `Population Based Training <https://docs.ray.io/en/master/tune/tutorials/tune-advanced-tutorial.html>`_.
In this case, the created checkpoint directory will be passed as the ``checkpoint_dir`` parameter
to the training function.
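
For orientation, a condensed sketch of such a training function, closely mirroring the
``custom_func_checkpointing.py`` example added elsewhere in this commit (the step count and
metric name are placeholders):

.. code-block:: python

    import json
    import os

    from ray import tune

    def train_func(config, checkpoint_dir=None):
        start = 0
        if checkpoint_dir:  # restore from the checkpoint directory passed in by Tune, if any
            with open(os.path.join(checkpoint_dir, "checkpoint")) as f:
                start = json.load(f)["step"] + 1
        for step in range(start, 10):
            # write a new checkpoint every step
            with tune.checkpoint_dir(step=step) as new_checkpoint_dir:
                with open(os.path.join(new_checkpoint_dir, "checkpoint"), "w") as f:
                    json.dump({"step": step}, f)
            tune.report(mean_loss=1.0 / (step + 1))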

@@ -20,7 +20,7 @@ Tune includes distributed implementations of early stopping algorithms such as `

.. tip:: The easiest scheduler to start with is the ``ASHAScheduler`` which will aggressively terminate low-performing trials.

When using schedulers, you may face compatibility issues, as shown in the below compatibility matrix. Certain schedulers cannot be used with Search Algorithms, and certain schedulers require :ref:`checkpointing to be implemented <tune-checkpoint>`.
When using schedulers, you may face compatibility issues, as shown in the below compatibility matrix. Certain schedulers cannot be used with Search Algorithms, and certain schedulers require :ref:`checkpointing to be implemented <tune-checkpoint-syncing>`.

Schedulers can dynamically change trial resource requirements during tuning. This is currently implemented in ``ResourceChangingScheduler``, which can wrap around any other scheduler.
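
As a hedged illustration of the ``ASHAScheduler`` tip above (``my_trainable``, the metric name,
and the budget values are placeholders, not part of the original docs):

.. code-block:: python

    from ray import tune
    from ray.tune.schedulers import ASHAScheduler

    tune.run(
        my_trainable,  # placeholder trainable
        metric="mean_loss",
        mode="min",
        num_samples=20,
        scheduler=ASHAScheduler(max_t=100, grace_period=10),
    )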

@@ -145,7 +145,7 @@ Tune includes a distributed implementation of `Population Based Training (PBT) <
        })
    tune.run( ... , scheduler=pbt_scheduler)

When the PBT scheduler is enabled, each trial variant is treated as a member of the population. Periodically, top-performing trials are checkpointed (this requires your Trainable to support :ref:`save and restore <tune-checkpoint>`). Low-performing trials clone the checkpoints of top performers and perturb the configurations in the hope of discovering an even better variation.
When the PBT scheduler is enabled, each trial variant is treated as a member of the population. Periodically, top-performing trials are checkpointed (this requires your Trainable to support :ref:`save and restore <tune-checkpoint-syncing>`). Low-performing trials clone the checkpoints of top performers and perturb the configurations in the hope of discovering an even better variation.

You can run this :doc:`toy PBT example </tune/examples/pbt_function>` to get an idea of how PBT operates. When training in PBT mode, a single trial may see many different hyperparameters over its lifetime, which is recorded in its ``result.json`` file. The following figure generated by the example shows PBT with optimizing a LR schedule over the course of a single experiment:
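
For illustration, a sketch of how a ``pbt_scheduler`` like the one above might be constructed;
the mutated hyperparameters and intervals are arbitrary placeholders, and ``my_trainable`` is
assumed to support checkpointing (save and restore) as described above:

.. code-block:: python

    from ray import tune
    from ray.tune.schedulers import PopulationBasedTraining

    pbt_scheduler = PopulationBasedTraining(
        time_attr="training_iteration",
        metric="mean_accuracy",
        mode="max",
        perturbation_interval=10,  # checkpoint and perturb every 10 iterations
        hyperparam_mutations={
            "lr": tune.loguniform(1e-4, 1e-1),
        },
    )

    tune.run(my_trainable, scheduler=pbt_scheduler, num_samples=8)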

@@ -212,7 +212,7 @@ PB2 can be enabled by setting the ``scheduler`` parameter of ``tune.run``, e.g.:
    tune.run( ... , scheduler=pb2_scheduler)


When the PB2 scheduler is enabled, each trial variant is treated as a member of the population. Periodically, top-performing trials are checkpointed (this requires your Trainable to support :ref:`save and restore <tune-checkpoint>`). Low-performing trials clone the checkpoints of top performers and perturb the configurations in the hope of discovering an even better variation.
When the PB2 scheduler is enabled, each trial variant is treated as a member of the population. Periodically, top-performing trials are checkpointed (this requires your Trainable to support :ref:`save and restore <tune-checkpoint-syncing>`). Low-performing trials clone the checkpoints of top performers and perturb the configurations in the hope of discovering an even better variation.

The primary motivation for PB2 is the ability to find promising hyperparameters with only a small population size. With that in mind, you can run this :doc:`PB2 PPO example </tune/examples/pb2_ppo_example>` to compare PB2 vs. PBT, with a population size of ``4`` (as in the paper). The example uses the ``BipedalWalker`` environment so does not require any additional licenses.

@@ -239,7 +239,7 @@ This class is a utility scheduler, allowing for trial resource requirements to b

* If you are using the Trainable (class) API for tuning, your Trainable must implement ``Trainable.update_resources``, which will let your model know about the new resources assigned. You can also obtain the current trial resources by calling ``Trainable.trial_resources``.

* If you are using the functional API for tuning, the current trial resources can be obtained by calling `tune.get_trial_resources()` inside the training function. The function should be able to :ref:`load and save checkpoints <tune-checkpoint>` (the latter preferably every iteration).
* If you are using the functional API for tuning, the current trial resources can be obtained by calling `tune.get_trial_resources()` inside the training function. The function should be able to :ref:`load and save checkpoints <tune-checkpoint-syncing>` (the latter preferably every iteration).

An example of this in use can be found here: :doc:`/tune/examples/xgboost_dynamic_resources_example`.
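
A minimal sketch of the functional API case from the list above; how you act on the returned
resource description is up to you and depends on the Ray version:

.. code-block:: python

    from ray import tune

    def train_func(config, checkpoint_dir=None):
        # Ask Tune for the resources currently assigned to this trial,
        # e.g. to size thread pools or data loaders accordingly.
        trial_resources = tune.get_trial_resources()
        tune.report(resources=str(trial_resources))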

@@ -256,17 +256,6 @@ Use ``validate_save_restore`` to catch ``save_checkpoint``/``load_checkpoint`` e
    validate_save_restore(MyTrainableClass, use_object_store=True)


.. _tune-cloud-checkpointing:

Storing checkpoints on cloud storage
------------------------------------

Ray Tune trainables can sync trial logs and checkpoints to cloud storage (via the `upload_dir`). This is especially
useful when training a large number of distributed trials, as logs and checkpoints are otherwise synchronized
via SSH, which quickly can become a performance bottleneck.

To make use of cloud checkpointing, just specify an ``upload_dir`` in the
:ref:`tune.SyncConfig <tune-sync-config>`.
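
A minimal sketch of what that looks like (the trainable and bucket path are placeholders):

.. code-block:: python

    from ray import tune

    tune.run(
        my_trainable,  # placeholder trainable
        sync_config=tune.SyncConfig(
            upload_dir="s3://your-bucket/your-path/",  # logs and checkpoints are synced here
        ),
    )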

Advanced: Reusing Actors
~~~~~~~~~~~~~~~~~~~~~~~~

doc/source/tune/examples/custom_func_checkpointing.rst (Normal file, 6 additions)

@@ -0,0 +1,6 @@
:orphan:

custom_func_checkpointing
~~~~~~~~~~~~~~~~~~~~~~~~~

.. literalinclude:: /../../python/ray/tune/examples/custom_func_checkpointing.py

@@ -13,7 +13,7 @@ Tune is a Python library for experiment execution and hyperparameter tuning at a

* Launch a multi-node :ref:`distributed hyperparameter sweep <tune-distributed>` in less than 10 lines of code.
* Supports any machine learning framework, :ref:`including PyTorch, XGBoost, MXNet, and Keras <tune-guides>`.
* Automatically manages :ref:`checkpoints <tune-checkpoint>` and logging to :ref:`TensorBoard <tune-logging>`.
* Automatically manages :ref:`checkpoints <tune-checkpoint-syncing>` and logging to :ref:`TensorBoard <tune-logging>`.
* Choose among state of the art algorithms such as :ref:`Population Based Training (PBT) <tune-scheduler-pbt>`, :ref:`BayesOptSearch <bayesopt>`, :ref:`HyperBand/ASHA <tune-scheduler-hyperband>`.
* Move your models from training to serving on the same infrastructure with `Ray Serve`_.

@@ -70,7 +70,7 @@ A key problem with machine learning frameworks is the need to restructure all of

With Tune, you can optimize your model just by :ref:`adding a few code snippets <tune-tutorial>`.

Further, Tune actually removes boilerplate from your code training workflow, automatically :ref:`managing checkpoints <tune-checkpoint>` and :ref:`logging results to tools <tune-logging>` such as MLflow and TensorBoard.
Further, Tune actually removes boilerplate from your code training workflow, automatically :ref:`managing checkpoints <tune-checkpoint-syncing>` and :ref:`logging results to tools <tune-logging>` such as MLflow and TensorBoard.


Multi-GPU & distributed training out of the box

@@ -66,7 +66,7 @@ See the documentation: :ref:`trainable-docs` and :ref:`examples <tune-general-ex

tune.run and Trials
-------------------

Use :ref:`tune.run <tune-run-ref>` to execute hyperparameter tuning. This function manages your experiment and provides many features such as :ref:`logging <tune-logging>`, :ref:`checkpointing <tune-checkpoint>`, and :ref:`early stopping <tune-stopping>`.
Use :ref:`tune.run <tune-run-ref>` to execute hyperparameter tuning. This function manages your experiment and provides many features such as :ref:`logging <tune-logging>`, :ref:`checkpointing <tune-checkpoint-syncing>`, and :ref:`early stopping <tune-stopping>`.

.. code-block:: python

@@ -284,60 +284,310 @@ globally could have side effects. For instance, it could influence the
way your dataset is split. Thus, we leave it up to the user to make
these global configuration changes.

.. _tune-checkpoint:
.. _tune-checkpoint-syncing:

Checkpointing
-------------
Checkpointing and synchronization
---------------------------------

When running a hyperparameter search, Tune can automatically and periodically save/checkpoint your model. This allows you to:

* save intermediate models throughout training
* use pre-emptible machines (by automatically restoring from last checkpoint)
* Pausing trials when using Trial Schedulers such as HyperBand and PBT.
* save intermediate models throughout training
* use pre-emptible machines (by automatically restoring from last checkpoint)
* Pausing trials when using Trial Schedulers such as HyperBand and PBT.

To use Tune's checkpointing features, you must expose a ``checkpoint_dir`` argument in the function signature, and call ``tune.checkpoint_dir``:
Tune stores checkpoints on the node where the trials are executed. If you are training on more than one node,
this means that some trial checkpoints may be on the head node and others are not.

When trials are restored (e.g. after a failure or when the experiment was paused), they may be scheduled on
different nodes, but still would need access to the latest checkpoint. To make sure this works, Ray Tune
comes with facilities to synchronize trial checkpoints between nodes.

Generally we consider three cases:

1. When using a shared directory (e.g. via NFS)
2. When using cloud storage (e.g. S3 or GS)
3. When using neither

The default option here is 3, which will be automatically used if nothing else is configured.

Using a shared directory
~~~~~~~~~~~~~~~~~~~~~~~~
If all Ray nodes have access to a shared filesystem, e.g. via NFS, they can all write to this directory.
In this case, we don't need any synchronization at all, as it is implicitly done by the operating system.

For this case, we only need to tell Ray Tune not to do any syncing at all (as syncing is the default):

.. code-block:: python

    import os
    import time
    from ray import tune

    def train_func(config, checkpoint_dir=None):
        start = 0
        if checkpoint_dir:
            with open(os.path.join(checkpoint_dir, "checkpoint")) as f:
                state = json.loads(f.read())
                start = state["step"] + 1
    tune.run(
        trainable,
        name="experiment_name",
        local_dir="/path/to/shared/storage/",
        sync_config=tune.SyncConfig(
            syncer=None  # Disable syncing
        )
    )

        for step in range(start, 100):
            time.sleep(1)
Note that the driver (on the head node) will have access to all checkpoints locally (in the
shared directory) for further processing.

            # Obtain a checkpoint directory
            with tune.checkpoint_dir(step=step) as checkpoint_dir:
                path = os.path.join(checkpoint_dir, "checkpoint")
                with open(path, "w") as f:
                    f.write(json.dumps({"step": step}))

            tune.report(hello="world", ray="tune")
.. _tune-cloud-checkpointing:

    tune.run(train_func)

Using cloud storage
~~~~~~~~~~~~~~~~~~~
If all nodes have access to cloud storage, e.g. S3 or GS, the remote trials can automatically synchronize their
checkpoints. For the filesystem, we end up with a similar situation as in the first case,
only that the consolidated directory including all logs and checkpoints lives on cloud storage.

In this example, checkpoints will be saved by training iteration to ``local_dir/exp_name/trial_name/checkpoint_<step>``.
This approach is especially useful when training a large number of distributed trials,
as logs and checkpoints are otherwise synchronized via SSH, which quickly can become a performance bottleneck.

You can restore a single trial checkpoint by using ``tune.run(restore=<checkpoint_dir>)``. By doing this, you can change the experiment's configuration, such as its name:
For this case, we tell Ray Tune to use an ``upload_dir`` to store checkpoints at.
This will automatically store both the experiment state and the trial checkpoints at that directory:

.. code-block:: python

    from ray import tune

    tune.run(
        trainable,
        name="experiment_name",
        sync_config=tune.SyncConfig(
            upload_dir="s3://bucket-name/sub-path/"
        )
    )

We don't have to provide a ``syncer`` here as it will be automatically detected. However, you can provide
a string if you want to use a custom command:

.. code-block:: python

    from ray import tune

    tune.run(
        trainable,
        name="experiment_name",
        sync_config=tune.SyncConfig(
            upload_dir="s3://bucket-name/sub-path/",
            syncer="aws s3 sync {source} {target}",  # Custom sync command
        )
    )

If a string is provided, then it must include replacement fields ``{source}`` and ``{target}``,
as demonstrated in the example above.

The consolidated data will be available in the cloud bucket. This means that the driver
(on the head node) will not have access to all checkpoints locally. If you want to process
e.g. the best checkpoint further, you will first have to fetch it from the cloud storage.


Default syncing (no shared/cloud storage)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
If you're using neither a shared filesystem nor cloud storage, Ray Tune will resort to the
default syncing mechanism, which utilizes ``rsync`` (via SSH) to synchronize checkpoints across
nodes.

Please note that this approach is likely the least efficient one - you should always try to use
shared or cloud storage if possible when training on a multi-node cluster.

For the syncing to work, the head node must be able to SSH into the worker nodes. If you are using
the Ray cluster launcher this is usually the case (note that Kubernetes is an exception, but
:ref:`see here for more details <tune-kubernetes>`).

If you don't provide a ``tune.SyncConfig`` at all, rsync-based syncing will be used.

If you want to customize syncing behavior, you can again specify a custom sync template:

.. code-block:: python

    from ray import tune

    tune.run(
        trainable,
        name="experiment_name",
        sync_config=tune.SyncConfig(
            # Do not specify an upload dir here
            syncer="rsync -savz -e 'ssh -i ssh_key.pem' {source} {target}",  # Custom sync command
        )
    )


Alternatively, a function can be provided with the following signature:

.. code-block:: python

    import subprocess

    def custom_sync_func(source, target):
        sync_cmd = "rsync {source} {target}".format(
            source=source,
            target=target)
        sync_process = subprocess.Popen(sync_cmd, shell=True)
        sync_process.wait()

    tune.run(
        trainable,
        name="experiment_name",
        sync_config=tune.SyncConfig(
            syncer=custom_sync_func,
            sync_period=60  # Synchronize more often
        )
    )

When syncing results back to the driver, the source would be a path similar to ``ubuntu@192.0.0.1:/home/ubuntu/ray_results/trial1``, and the target would be a local path.

Note that we adjusted the sync period in the example above. Setting this to a lower number will pull
checkpoints from remote nodes more often. This will lead to more robust trial recovery,
but it will also lead to more synchronization overhead (as SSH is usually slow).

As in the first case, the driver (on the head node) will have access to all checkpoints locally
for further processing.


Checkpointing examples
----------------------

Let's cover how to configure your checkpoint storage location, checkpointing frequency, and how to resume from a previous run.

A simple (cloud) checkpointing example
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Cloud storage-backed Tune checkpointing is the recommended best practice for both performance and reliability reasons.
It also enables checkpointing if using Ray on Kubernetes, which does not work out of the box with rsync-based sync,
which relies on SSH. If you'd rather checkpoint locally or use rsync-based checkpointing, see :ref:`here <rsync-checkpointing>`.

Prerequisites to use cloud checkpointing in Ray Tune for the example below:

Your ``my_trainable`` is either a:

1. **Model with an existing Ray integration**

   * XGBoost (:ref:`example <xgboost-ray-tuning>`)
   * Pytorch (:ref:`example <tune-pytorch-lightning>`)
   * Pytorch Lightning (:ref:`example <ray-lightning-tuning>`)
   * Keras (:doc:`example </tune/examples/tune_mnist_keras>`)
   * Tensorflow (:ref:`example <ray-train-tftrainer-example>`)
   * LightGBM (:ref:`example <lightgbm-ray-tuning>`)

2. **Custom training function**

   * All this means is that your function has to expose a ``checkpoint_dir`` argument in the function signature, and call ``tune.checkpoint_dir``. See :doc:`this example </tune/examples/custom_func_checkpointing>`; it's quite simple to do.

Let's assume for this example you're running this script from your laptop, and connecting to your remote Ray cluster via ``ray.init()``, making your script on your laptop the "driver".

.. code-block:: python

    import ray
    from ray import tune
    from your_module import my_trainable

    ray.init(address="<cluster-IP>:<port>")  # set `address=None` to train on laptop

    # configure how checkpoints are sync'd to the scheduler/sampler
    # we recommend cloud storage checkpointing as it survives the cluster when
    # instances are terminated, and has better performance
    sync_config = tune.SyncConfig(
        upload_dir="s3://my-checkpoints-bucket/path/",  # requires AWS credentials
    )

    # this starts the run!
    tune.run(
        my_trainable,

        # name of your experiment
        name="my-tune-exp",

        # a directory where results are stored before being
        # sync'd to head node/cloud storage
        local_dir="/tmp/mypath",

        # see above! we will sync our checkpoints to S3 directory
        sync_config=sync_config,

        # we'll keep the best five checkpoints at all times
        # checkpoints (by AUC score, reported by the trainable, descending)
        checkpoint_score_attr="max-auc",
        keep_checkpoints_num=5,

        # a very useful trick! this will resume from the last run specified by
        # sync_config (if one exists), otherwise it will start a new tuning run
        resume="AUTO",
    )

In this example, checkpoints will be saved:

* **Locally**: not saved! Nothing will be sync'd to the driver (your laptop) automatically (because cloud syncing is enabled)
* **S3**: ``s3://my-checkpoints-bucket/path/my-tune-exp/<trial_name>/checkpoint_<step>``
* **On head node**: ``~/ray-results/my-tune-exp/<trial_name>/checkpoint_<step>`` (but only for trials done on that node)
* **On worker nodes**: ``~/ray-results/my-tune-exp/<trial_name>/checkpoint_<step>`` (but only for trials done on that node)

If your run stopped for any reason (finished, errored, user CTRL+C), you can restart it any time by running the script above again -- note with ``resume="AUTO"``, it will detect the previous run so long as the ``sync_config`` points to the same location.

If, however, you prefer not to use ``resume="AUTO"`` (or are on an older version of Ray) you can resume manually:

.. code-block:: python

    # Restored previous trial from the given checkpoint
    tune.run(
        "PG",
        name="RestoredExp",  # The name can be different.
        stop={"training_iteration": 10},  # train 5 more iterations than previous
        restore="~/ray_results/Original/PG_<xxx>/checkpoint_5/checkpoint-5",
        config={"env": "CartPole-v0"},
        # our same trainable as before
        my_trainable,

        # The name can be different from your original name
        name="my-tune-exp-restart",

        # our same config as above!
        sync_config=sync_config,
    )

.. _rsync-checkpointing:

A simple local/rsync checkpointing example
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Local or rsync checkpointing can be a good option if:

1. You want to tune on a single laptop Ray cluster
2. You aren't using Ray on Kubernetes (rsync doesn't work with Ray on Kubernetes)
3. You don't want to use S3

Let's take a look at an example:

.. code-block:: python

    import ray
    from ray import tune
    from your_module import my_trainable

    ray.init(address="<cluster-IP>:<port>")  # set `address=None` to train on laptop

    # configure how checkpoints are sync'd to the scheduler/sampler
    sync_config = tune.SyncConfig()  # the default mode is to use rsync

    # this starts the run!
    tune.run(
        my_trainable,

        # name of your experiment
        name="my-tune-exp",

        # a directory where results are stored before being
        # sync'd to head node/cloud storage
        local_dir="/tmp/mypath",

        # sync our checkpoints via rsync
        # you don't have to pass an empty sync config - but we
        # do it here for clarity and comparison
        sync_config=sync_config,

        # we'll keep the best five checkpoints at all times
        # checkpoints (by AUC score, reported by the trainable, descending)
        checkpoint_score_attr="max-auc",
        keep_checkpoints_num=5,

        # a very useful trick! this will resume from the last run specified by
        # sync_config (if one exists), otherwise it will start a new tuning run
        resume="AUTO",
    )

.. _tune-distributed-checkpointing:

@@ -346,7 +596,7 @@ Distributed Checkpointing

On a multinode cluster, Tune automatically creates a copy of all trial checkpoints on the head node. This requires the Ray cluster to be started with the :ref:`cluster launcher <cluster-cloud>` and also requires rsync to be installed.

Note that you must use the ``tune.checkpoint_dir`` API to trigger syncing.
Note that you must use the ``tune.checkpoint_dir`` API to trigger syncing (or use a model type with a built-in Ray Tune integration as described here). See :doc:`/tune/examples/custom_func_checkpointing` for an example.

If you are running Ray Tune on Kubernetes, you should usually use
:ref:`cloud checkpointing <tune-sync-config>` or a shared filesystem for checkpoint sharing.

@@ -466,7 +716,6 @@ For more flexibility, you can pass in a function instead. If a function is passe

.. code-block:: python


    def stopper(trial_id, result):
        return result["mean_accuracy"] / result["training_iteration"] > 5

@@ -650,9 +899,9 @@ If a string is provided, then it must include replacement fields ``{source}`` an
        sync_process = subprocess.Popen(sync_cmd, shell=True)
        sync_process.wait()

By default, syncing occurs every 300 seconds. To change the frequency of syncing, set the ``TUNE_CLOUD_SYNC_S`` environment variable in the driver to the desired syncing period.
By default, syncing occurs every 300 seconds. To change the frequency of syncing, set the ``sync_period`` attribute of the sync config to the desired syncing period.

Note that uploading only happens when global experiment state is collected, and the frequency of this is determined by the ``TUNE_GLOBAL_CHECKPOINT_S`` environment variable. So the true upload period is given by ``max(TUNE_CLOUD_SYNC_S, TUNE_GLOBAL_CHECKPOINT_S)``.
Note that uploading only happens when global experiment state is collected, and the frequency of this is determined by the sync period. So the true upload period is given by ``max(sync period, TUNE_GLOBAL_CHECKPOINT_S)``.

Make sure that worker nodes have write access to the cloud storage. Failing to do so would cause error messages like ``Error message (1): fatal error: Unable to locate credentials``.
For AWS setups, this involves adding an IamInstanceProfile configuration for worker nodes. Please :ref:`see here for more tips <aws-cluster-s3>`.
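
A minimal sketch of adjusting the ``sync_period`` attribute mentioned above (the trainable
and bucket path are placeholders):

.. code-block:: python

    from ray import tune

    tune.run(
        my_trainable,  # placeholder trainable
        sync_config=tune.SyncConfig(
            upload_dir="s3://bucket-name/sub-path/",
            sync_period=600,  # sync every 10 minutes instead of the 300 second default
        ),
    )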

@@ -6,6 +6,7 @@
- Get rid of the badges in the top
- Get rid of the references section at the bottom
- Be sure not to delete the API reference section in the bottom of this file.
- add `.. _xgboost-ray-tuning:` before the "Hyperparameter Tuning" section
- Adjust some link targets (e.g. for "Ray Tune") to anonymous references
  by adding a second underscore (use `target <link>`__)
- Search for `\ **` and delete this from the links (bold links are not supported)

@@ -220,6 +221,9 @@ Example loading multiple parquet files:
        columns=columns,
        filetype=RayFileType.PARQUET)

.. _xgboost-ray-tuning:

Hyperparameter Tuning
---------------------

@@ -430,6 +430,15 @@ py_test(
    args = ["--smoke-test"]
)

py_test(
    name = "custom_func_checkpointing",
    size = "small",
    srcs = ["examples/custom_func_checkpointing.py"],
    deps = [":tune_lib"],
    tags = ["team:ml", "exclusive", "example"],
    args = ["--smoke-test"]
)

py_test(
    name = "test_torch_trainable",
    size = "medium",

@@ -17,6 +17,7 @@ General Examples
- `PBT with Function API <https://github.com/ray-project/ray/blob/master/python/ray/tune/examples/pbt_function.py>`__: Example of using the function API with a PopulationBasedTraining scheduler.
- `pbt_ppo_example <https://github.com/ray-project/ray/blob/master/python/ray/tune/examples/pbt_ppo_example.py>`__: Example of optimizing a distributed RLlib algorithm (PPO) with the PopulationBasedTraining scheduler.
- `logging_example <https://github.com/ray-project/ray/blob/master/python/ray/tune/examples/logging_example.py>`__: Example of custom loggers and custom trial directory naming.
- `custom_func_checkpointing <https://github.com/ray-project/ray/blob/master/python/ray/tune/examples/custom_func_checkpointing.py>`__: Example of custom checkpointing logic using the function API.

Search Algorithm Examples
-------------------------

python/ray/tune/examples/custom_func_checkpointing.py (Normal file, 75 additions)

@@ -0,0 +1,75 @@
# If you want to use checkpointing with a custom training function (not a Ray
# integration like PyTorch or Tensorflow), you must expose a
# ``checkpoint_dir`` argument in the function signature, and call
# ``tune.checkpoint_dir``:
import os
import time
import json
import argparse

from ray import tune


def evaluation_fn(step, width, height):
    time.sleep(0.1)
    return (0.1 + width * step / 100)**(-1) + height * 0.1


def train_func(config, checkpoint_dir=None):
    start = 0
    width, height = config["width"], config["height"]

    if checkpoint_dir:
        with open(os.path.join(checkpoint_dir, "checkpoint")) as f:
            state = json.loads(f.read())
            start = state["step"] + 1

    for step in range(start, 100):
        intermediate_score = evaluation_fn(step, width, height)

        # Obtain a checkpoint directory
        with tune.checkpoint_dir(step=step) as checkpoint_dir:
            path = os.path.join(checkpoint_dir, "checkpoint")
            with open(path, "w") as f:
                f.write(json.dumps({"step": step}))

        tune.report(iterations=step, mean_loss=intermediate_score)


# You can restore a single trial checkpoint by using
# ``tune.run(restore=<checkpoint_dir>)``. By doing this, you can change
# the experiment's configuration, such as its name.

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--smoke-test", action="store_true", help="Finish quickly for testing")
    parser.add_argument(
        "--server-address",
        type=str,
        default=None,
        required=False,
        help="The address of server to connect to if using "
        "Ray Client.")
    args, _ = parser.parse_known_args()

    if args.server_address:
        import ray
        ray.init(f"ray://{args.server_address}")

    analysis = tune.run(
        train_func,
        name="hyperband_test",
        metric="mean_loss",
        mode="min",
        num_samples=5,
        stop={"training_iteration": 1 if args.smoke_test else 10},
        config={
            "steps": 10,
            "width": tune.randint(10, 100),
            "height": tune.loguniform(10, 100)
        })
    print("Best hyperparameters: ", analysis.best_config)
    print("Best checkpoint directory: ", analysis.best_checkpoint)
    with open(os.path.join(analysis.best_checkpoint, "checkpoint"), "r") as f:
        print("Best checkpoint: ", json.load(f))

@@ -220,7 +220,7 @@ class ResourceChangingScheduler(TrialScheduler):
    If the functional API is used, the current trial resources can be obtained
    by calling `tune.get_trial_resources()` inside the training function.
    The function should be able to
    :ref:`load and save checkpoints <tune-checkpoint>`
    :ref:`load and save checkpoints <tune-checkpoint-syncing>`
    (the latter preferably every iteration).

    If the Trainable (class) API is used, when the resources of a