[air] update documentation to use session.report (#26051)

Update documentation to use `session.report`.
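
For reference, the reporting pattern the docs move to looks roughly like this (a minimal sketch; the metric name and config are illustrative):

```python
from ray import tune
from ray.air import session


def trainable(config):
    for step in range(10):
        score = config["a"] * step  # stand-in for a real training step
        # Previously: tune.report(score=score)
        # Now: report a metrics dict through the AIR session.
        session.report({"score": score})


tune.run(trainable, config={"a": 2})
```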

Next steps:
1. Update our internal caller to use `session.report`. Most importantly, CheckpointManager and DataParallelTrainer.
2. Update `get_trial_resources` to use PGF notions to incorporate the requirement of ResourceChangingScheduler. @Yard1 
3. After 2 is done, change all `tune.get_trial_resources` to `session.get_trial_resources`
4. [internal implementation] remove special checkpoint handling logic from huggingface trainer. Optimize the flow for checkpoint conversion with `session.report`.

Co-authored-by: Antoni Baum <antoni.baum@protonmail.com>
xwjiang2010 2022-06-30 10:37:31 -07:00 committed by GitHub
parent 20c6c0725a
commit ac831fded4
79 changed files with 623 additions and 386 deletions

View file

@ -118,7 +118,7 @@ This example runs a parallel grid search to optimize an example objective functi
.. code-block:: python
from ray import tune
from ray.air import session
def objective(step, alpha, beta):
@ -132,7 +132,7 @@ This example runs a parallel grid search to optimize an example objective functi
# Iterative training function - can be any arbitrary training procedure.
intermediate_score = objective(step, alpha, beta)
# Feed the score back to Tune.
tune.report(mean_loss=intermediate_score)
session.report({"mean_loss": intermediate_score})
analysis = tune.run(

File diff suppressed because one or more lines are too long

Image file changed (185 KiB); preview omitted.

View file

@ -47,6 +47,29 @@ Trainer objects will produce a :ref:`Results <air-results-ref>` object after cal
:end-before: __air_trainer_output_end__
Session
-------
Ray AIR exposes a functional API for users to define training behavior, or for developers to create their own ``Trainer``\s.
In both cases, there is a need for the following interactions:
1. To disseminate information downstream, including ``trial_name``, ``trial_id``, ``trial_resources``, rank information, etc.
2. To report information upstream, including metrics and checkpoints.
To facilitate such interactions, we introduce the :ref:`Session <air-session-ref>` concept.
The session concept exists at two levels: the execution layer (called `Tune Session`) and the Data Parallel training layer
(called `Train Session`).
The following figure shows what these two sessions look like in a Data Parallel training scenario.
.. image:: images/session.svg
:width: 650px
:align: center
..
https://docs.google.com/drawings/d/1g0pv8gqgG29aPEPTcd4BC0LaRNbW1sAkv3H6W1TCp0c/edit
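
As a rough sketch (assuming the function API used throughout these docs), a trainable can use the session both to read trial information and to report results:

.. code-block:: python

    from ray import tune
    from ray.air import session

    def trainable(config):
        # Information disseminated downstream by the session.
        print(session.get_trial_name(), session.get_trial_id())
        for step in range(3):
            # Information reported upstream: metrics (and optionally a checkpoint).
            session.report({"step": step, "score": config["a"] * step})

    tune.run(trainable, config={"a": 1})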
Tuner
-----

View file

@ -124,4 +124,12 @@ Configs
.. automodule:: ray.air.config
:members:
.. autoclass:: ray.air.config.CheckpointConfig
.. autoclass:: ray.air.config.CheckpointConfig
.. _air-session-ref:
Session
~~~~~~~
.. automodule:: ray.air.session
:members:

View file

@ -189,7 +189,7 @@
"outputs": [],
"source": [
"# __function_api_start__\n",
"from ray import tune\n",
"from ray.air import session\n",
"\n",
"\n",
"def objective(x, a, b): # Define an objective function.\n",
@ -201,7 +201,7 @@
" for x in range(20): # \"Train\" for 20 iterations and compute intermediate scores.\n",
" score = objective(x, config[\"a\"], config[\"b\"])\n",
"\n",
" tune.report(score=score) # Send the score to Tune.\n",
" session.report({\"score\": score}) # Send the score to Tune.\n",
"\n",
"\n",
"# __function_api_end__"

View file

@ -1,4 +1,5 @@
from ray import tune
from ray.air import session
def objective(step, alpha, beta):
@ -12,7 +13,7 @@ def training_function(config):
# Iterative training function - can be any arbitrary training procedure.
intermediate_score = objective(step, alpha, beta)
# Feed the score back to Tune.
tune.report(mean_loss=intermediate_score)
session.report({"mean_loss": intermediate_score})
analysis = tune.run(

View file

@ -24,7 +24,7 @@ These are the environment variables Ray Tune currently considers:
directories when the name is not specified explicitly or the trainable isn't passed
as a string. Setting this environment variable to ``1`` disables adding these date strings.
* **TUNE_DISABLE_STRICT_METRIC_CHECKING**: When you report metrics to Tune via
``tune.report()`` and passed a ``metric`` parameter to ``tune.run()``, a scheduler,
``session.report()`` and pass a ``metric`` parameter to ``tune.run()``, a scheduler,
or a search algorithm, Tune will error
if the metric was not reported in the result. Setting this environment variable
to ``1`` will disable this check.

View file

@ -4,10 +4,10 @@
API does not really have a signature to just describe.
.. TODO: Reusing actors and advanced resources allocation seem ill-placed.
Training (tune.Trainable, tune.report)
======================================
Training (tune.Trainable, session.report)
==========================================
Training can be done with either a **Class API** (``tune.Trainable``) or **function API** (``tune.report``).
Training can be done with either a **Class API** (``tune.Trainable``) or **function API** (``session.report``).
For the sake of example, let's maximize this objective function:
@ -21,7 +21,7 @@ For the sake of example, let's maximize this objective function:
Function API
------------
With the Function API, you can report intermediate metrics by simply calling ``tune.report`` within the provided function.
With the Function API, you can report intermediate metrics by simply calling ``session.report`` within the provided function.
.. code-block:: python
@ -31,7 +31,7 @@ With the Function API, you can report intermediate metrics by simply calling ``t
for x in range(20):
intermediate_score = objective(x, config["a"], config["b"])
tune.report(score=intermediate_score) # This sends the score to Tune.
session.report({"score": intermediate_score}) # This sends the score to Tune.
analysis = tune.run(
trainable,
@ -40,43 +40,13 @@ With the Function API, you can report intermediate metrics by simply calling ``t
print("best config: ", analysis.get_best_config(metric="score", mode="max"))
.. tip:: Do not use ``tune.report`` within a ``Trainable`` class.
.. tip:: Do not use ``session.report`` within a ``Trainable`` class.
Tune will run this function on a separate thread in a Ray actor process.
You'll notice that Ray Tune will output extra values in addition to the user reported metrics,
such as ``iterations_since_restore``. See :ref:`tune-autofilled-metrics` for an explanation/glossary of these values.
Function API return and yield values
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Instead of using ``tune.report()``, you can also use Python's ``yield``
statement to report metrics to Ray Tune:
.. code-block:: python
def trainable(config):
# config (dict): A dict of hyperparameters.
for x in range(20):
intermediate_score = objective(x, config["a"], config["b"])
yield {"score": intermediate_score} # This sends the score to Tune.
analysis = tune.run(
trainable,
config={"a": 2, "b": 4}
)
print("best config: ", analysis.get_best_config(metric="score", mode="max"))
If you yield a dictionary object, this will work just as ``tune.report()``.
If you yield a number, it will be reported to Ray Tune with the key ``_metric``, i.e.
as if you had called ``tune.report(_metric=value)``.
Ray Tune supports the same functionality for return values if you only
report metrics at the end of each run:
.. code-block:: python
def trainable(config):
@ -102,30 +72,27 @@ Function API Checkpointing
~~~~~~~~~~~~~~~~~~~~~~~~~~
Many Tune features rely on checkpointing, including the usage of certain Trial Schedulers and fault tolerance.
To use Tune's checkpointing features, you must expose a ``checkpoint_dir`` argument in the function signature,
and call ``tune.checkpoint_dir`` :
You can save and load checkpoints in Ray Tune in the following manner:
.. code-block:: python
import time
from ray import tune
from ray.air import session
from ray.air.checkpoint import Checkpoint
def train_func(config, checkpoint_dir=None):
start = 0
if checkpoint_dir:
with open(os.path.join(checkpoint_dir, "checkpoint")) as f:
state = json.loads(f.read())
start = state["step"] + 1
def train_func(config):
step = 0
loaded_checkpoint = session.get_checkpoint()
if loaded_checkpoint:
last_step = loaded_checkpoint.to_dict()["step"]
step = last_step + 1
for iter in range(start, 100):
for iter in range(step, 100):
time.sleep(1)
with tune.checkpoint_dir(step=step) as checkpoint_dir:
path = os.path.join(checkpoint_dir, "checkpoint")
with open(path, "w") as f:
f.write(json.dumps({"step": start}))
tune.report(hello="world", ray="tune")
checkpoint = Checkpoint.from_dict({"step": iter})
session.report({"message": "Hello world Ray Tune!"}, checkpoint=checkpoint)
tune.run(train_func)
@ -153,7 +120,7 @@ it is important not to depend on absolute paths in the implementation of ``save`
Trainable Class API
-------------------
.. caution:: Do not use ``tune.report`` within a ``Trainable`` class.
.. caution:: Do not use ``session.report`` within a ``Trainable`` class.
The Trainable **class API** will require users to subclass ``ray.tune.Trainable``. Here's a naive example of this API:
@ -343,18 +310,23 @@ It is also possible to specify memory (``"memory"``, in bytes) and custom resour
.. _tune-function-docstring:
tune.report / tune.checkpoint (Function API)
--------------------------------------------
session (Function API)
----------------------
.. autofunction:: ray.tune.report
.. autofunction:: ray.air.session.report
:noindex:
.. autofunction:: ray.tune.checkpoint_dir
.. autofunction:: ray.air.session.get_checkpoint
:noindex:
.. autofunction:: ray.tune.get_trial_dir
.. autofunction:: ray.air.session.get_trial_name
:noindex:
.. autofunction:: ray.tune.get_trial_name
.. autofunction:: ray.air.session.get_trial_id
:noindex:
.. autofunction:: ray.tune.get_trial_id
.. autofunction:: ray.air.session.get_trial_resources
:noindex:
tune.Trainable (Class API)
--------------------------

View file

@ -3,6 +3,7 @@
# __reproducible_start__
import numpy as np
from ray import tune
from ray.air import session
def train(config):
@ -12,7 +13,7 @@ def train(config):
# is the same.
np.random.seed(config["seed"])
random_result = np.random.uniform(0, 100, size=1).item()
tune.report(result=random_result)
session.report({"result": random_result})
# Set seed for Ray Tune's random search.
@ -54,7 +55,7 @@ config = {
def train(config):
random_result = np.random.uniform(0, 100, size=1).item()
tune.report(result=random_result)
session.report({"result": random_result})
train_fn = train
@ -90,7 +91,7 @@ if not MOCK:
def train_fn(config, checkpoint_dir=None):
# some Modin operations here
# import modin.pandas as pd
tune.report(metric=metric)
session.report({"metric": metric})
tune.run(
train_fn,

View file

@ -1,6 +1,7 @@
# flake8: noqa
# __function_api_start__
from ray.air import session
def objective(x, a, b): # Define an objective function.
@ -12,7 +13,7 @@ def trainable(config): # Pass a "config" dictionary into your trainable.
for x in range(20): # "Train" for 20 iterations and compute intermediate scores.
score = objective(x, config["a"], config["b"])
tune.report(score=score) # Send the score to Tune.
session.report({"score": score}) # Send the score to Tune.
# __function_api_end__

View file

@ -81,6 +81,7 @@ class ConvNet(nn.Module):
# 1. Wrap your PyTorch model in an objective function.
import torch
from ray import tune
from ray.air import session
from ray.tune.search.optuna import OptunaSearch
@ -95,7 +96,7 @@ def objective(config):
while True:
train(model, optimizer, train_loader) # Train the model
acc = test(model, test_loader) # Compute test accuracy
tune.report(mean_accuracy=acc) # Report to Tune
session.report({"mean_accuracy": acc}) # Report to Tune
# 2. Define a search space and initialize the search algorithm.

View file

@ -54,6 +54,7 @@
"\n",
"import ray\n",
"from ray import tune\n",
"from ray.air import session\n",
"from ray.tune.search.ax import AxSearch"
]
},
@ -112,7 +113,7 @@
"metadata": {},
"source": [
"Next, our `objective` function takes a Tune `config`, evaluates the `landscape` of our experiment in a training loop,\n",
"and uses `tune.report` to report the `landscape` back to Tune."
"and uses `session.report` to report the `landscape` back to Tune."
]
},
{
@ -125,8 +126,8 @@
"def objective(config):\n",
" for i in range(config[\"iterations\"]):\n",
" x = np.array([config.get(\"x{}\".format(i + 1)) for i in range(6)])\n",
" tune.report(\n",
" timesteps_total=i, landscape=landscape(x), l2norm=np.sqrt((x ** 2).sum())\n",
" session.report(\n",
" {\"timesteps_total\": i, \"landscape\": landscape(x), \"l2norm\": np.sqrt((x ** 2).sum()})\n",
" )\n",
" time.sleep(0.02)"
]
@ -250,7 +251,7 @@
"id": "91076c5a",
"metadata": {},
"source": [
"Finally, we run the experiment to find the global minimum of the provided landscape (which contains 5 false minima). The argument to metric, `\"landscape\"`, is provided via the `objective` function's `tune.report`. The experiment `\"min\"`imizes the \"mean_loss\" of the `landscape` by searching within `search_space` via `algo`, `num_samples` times or when `\"timesteps_total\": stop_timesteps`. This previous sentence is fully characterizes the search problem we aim to solve. With this in mind, notice how efficient it is to execute `tune.run()`."
"Finally, we run the experiment to find the global minimum of the provided landscape (which contains 5 false minima). The argument to metric, `\"landscape\"`, is provided via the `objective` function's `session.report`. The experiment `\"min\"`imizes the \"mean_loss\" of the `landscape` by searching within `search_space` via `algo`, `num_samples` times or when `\"timesteps_total\": stop_timesteps`. This previous sentence is fully characterizes the search problem we aim to solve. With this in mind, notice how efficient it is to execute `tune.run()`."
]
},
{

View file

@ -52,6 +52,7 @@
"\n",
"import ray\n",
"from ray import tune\n",
"from ray.air import session\n",
"from ray.tune.search import ConcurrencyLimiter\n",
"from ray.tune.search.bayesopt import BayesOptSearch"
]
@ -85,7 +86,7 @@
"metadata": {},
"source": [
"Next, our ``objective`` function takes a Tune ``config``, evaluates the `score` of your experiment in a training loop,\n",
"and uses `tune.report` to report the `score` back to Tune."
"and uses `session.report` to report the `score` back to Tune."
]
},
{
@ -98,7 +99,7 @@
"def objective(config):\n",
" for step in range(config[\"steps\"]):\n",
" score = evaluate(step, config[\"width\"], config[\"height\"])\n",
" tune.report(iterations=step, mean_loss=score)"
" session.report({\"iterations\": step, \"mean_loss\": score})"
]
},
{

View file

@ -70,6 +70,7 @@
"\n",
"import ray\n",
"from ray import tune\n",
"from ray.air import session\n",
"from ray.tune.search import ConcurrencyLimiter\n",
"from ray.tune.schedulers.hb_bohb import HyperBandForBOHB\n",
"from ray.tune.search.bohb import TuneBOHB\n",
@ -106,7 +107,7 @@
"metadata": {},
"source": [
"Next, our `objective` function takes a Tune `config`, evaluates the `score` of your\n",
"experiment in a training loop, and uses `tune.report` to report the `score` back to Tune."
"experiment in a training loop, and uses `session.report` to report the `score` back to Tune."
]
},
{
@ -119,7 +120,7 @@
"def objective(config):\n",
" for step in range(config[\"steps\"]):\n",
" score = evaluate(step, config[\"width\"], config[\"height\"], config[\"activation\"])\n",
" tune.report(iterations=step, mean_loss=score)"
" session.report({\"iterations\": step, \"mean_loss\": score})"
]
},
{

View file

@ -76,6 +76,7 @@
"\n",
"import ray\n",
"from ray import tune\n",
"from ray.air import session\n",
"from ray.tune.search import ConcurrencyLimiter\n",
"from ray.tune.search.dragonfly import DragonflySearch"
]
@ -96,7 +97,7 @@
"4 ingredients-- these proportions should be considered as hyperparameters.\n",
"Our `objective` function will take a Tune `config`, evaluates the `conductivity` of\n",
"our experiment in a training loop,\n",
"and uses `tune.report` to report the `conductivity` back to Tune."
"and uses `session.report` to report the `conductivity` back to Tune."
]
},
{
@ -118,7 +119,7 @@
" vol4 = 10 - (vol1 + vol2 + vol3) # Water\n",
" conductivity = vol1 + 0.1 * (vol2 + vol3) ** 2 + 2.3 * vol4 * (vol1 ** 1.5)\n",
" conductivity += np.random.normal() * 0.01\n",
" tune.report(timesteps_total=i, objective=conductivity)\n",
" session.report({\"timesteps_total\": i, \"objective\": conductivity})\n",
" time.sleep(0.02)"
]
},

View file

@ -91,7 +91,7 @@
"metadata": {},
"source": [
"Next, our `objective` function takes a Tune `config`, evaluates the `score` of your\n",
"experiment in a training loop, and uses `tune.report` to report the `score` back to Tune."
"experiment in a training loop, and uses `session.report` to report the `score` back to Tune."
]
},
{
@ -104,7 +104,7 @@
"def objective(config):\n",
" for step in range(config[\"steps\"]):\n",
" score = evaluate(step, config[\"width\"], config[\"height\"], config[\"activation\"])\n",
" tune.report(iterations=step, mean_loss=score)"
" session.report({\"iterations\": step, \"mean_loss\": score})"
]
},
{

View file

@ -54,6 +54,7 @@
"\n",
"import ray\n",
"from ray import tune\n",
"from ray.air import session\n",
"from ray.tune.search.hebo import HEBOSearch"
]
},
@ -87,7 +88,7 @@
"metadata": {},
"source": [
"Next, our ``objective`` function takes a Tune ``config``, evaluates the `score` of your experiment in a training loop,\n",
"and uses `tune.report` to report the `score` back to Tune."
"and uses `session.report` to report the `score` back to Tune."
]
},
{
@ -100,7 +101,7 @@
"def objective(config):\n",
" for step in range(config[\"steps\"]):\n",
" score = evaluate(step, config[\"width\"], config[\"height\"], config[\"activation\"])\n",
" tune.report(iterations=step, mean_loss=score)"
" session.report({\"iterations\": step, \"mean_loss\": score})"
]
},
{

View file

@ -69,6 +69,7 @@
"\n",
"import ray\n",
"from ray import tune\n",
"from ray.air import session\n",
"from ray.tune.search import ConcurrencyLimiter\n",
"from ray.tune.search.hyperopt import HyperOptSearch\n",
"from hyperopt import hp"
@ -103,7 +104,7 @@
"metadata": {},
"source": [
"Next, our ``objective`` function takes a Tune ``config``, evaluates the `score` of your experiment in a training loop,\n",
"and uses `tune.report` to report the `score` back to Tune."
"and uses `session.report` to report the `score` back to Tune."
]
},
{
@ -116,7 +117,7 @@
"def objective(config):\n",
" for step in range(config[\"steps\"]):\n",
" score = evaluate(step, config[\"width\"], config[\"height\"])\n",
" tune.report(iterations=step, mean_loss=score)"
" session.report({\"iterations\": step, \"mean_loss\": score})"
]
},
{
@ -298,7 +299,7 @@
" \n",
" for step in range(config[\"steps\"]):\n",
" intermediate_score = evaluation_fn(step, width, height, mult)\n",
" tune.report(iterations=step, mean_loss=intermediate_score)\n",
" session.report({\"iterations\": step, \"mean_loss\": intermediate_score})\n",
" time.sleep(0.1)"
]
},

View file

@ -37,6 +37,7 @@
"from sklearn.model_selection import train_test_split\n",
"\n",
"from ray import tune\n",
"from ray.air import session\n",
"from ray.tune.schedulers import ASHAScheduler\n",
"from ray.tune.integration.lightgbm import TuneReportCheckpointCallback\n",
"\n",
@ -64,9 +65,9 @@
" )\n",
" preds = gbm.predict(test_x)\n",
" pred_labels = np.rint(preds)\n",
" tune.report(\n",
" mean_accuracy=sklearn.metrics.accuracy_score(test_y, pred_labels), done=True\n",
" )\n",
" session.report({\n",
" \"mean_accuracy\": sklearn.metrics.accuracy_score(test_y, pred_labels), \"done\": True\n",
" })\n",
"\n",
"\n",
"if __name__ == \"__main__\":\n",

View file

@ -55,6 +55,7 @@
"import ray\n",
"import nevergrad as ng\n",
"from ray import tune\n",
"from ray.air import session\n",
"from ray.tune.search import ConcurrencyLimiter\n",
"from ray.tune.search.nevergrad import NevergradSearch"
]
@ -89,7 +90,7 @@
"metadata": {},
"source": [
"Next, our `objective` function takes a Tune `config`, evaluates the `score` of your experiment in a training loop,\n",
"and uses `tune.report` to report the `score` back to Tune."
"and uses `session.report` to report the `score` back to Tune."
]
},
{
@ -102,7 +103,7 @@
"def objective(config):\n",
" for step in range(config[\"steps\"]):\n",
" score = evaluate(step, config[\"width\"], config[\"height\"], config[\"activation\"])\n",
" tune.report(iterations=step, mean_loss=score)"
" session.report({\"iterations\": step, \"mean_loss\": score})"
]
},
{

View file

@ -57,6 +57,7 @@
"\n",
"import ray\n",
"from ray import tune\n",
"from ray.air import session\n",
"from ray.tune.search import ConcurrencyLimiter\n",
"from ray.tune.search.optuna import OptunaSearch"
]
@ -106,7 +107,7 @@
"metadata": {},
"source": [
"Next, our `objective` function to be optimized takes a Tune `config`, evaluates the `score` of your experiment in a training loop,\n",
"and uses `tune.report` to report the `score` back to Tune."
"and uses `session.report` to report the `score` back to Tune."
]
},
{
@ -121,8 +122,8 @@
"def objective(config):\n",
" for step in range(config[\"steps\"]):\n",
" score = evaluate(step, config[\"width\"], config[\"height\"], config[\"activation\"])\n",
" tune.report(iterations=step, mean_loss=score)\n",
" "
" session.report({\"iterations\": step, \"mean_loss\": score})\n",
" "
]
},
{
@ -483,9 +484,9 @@
" # Iterative training function - can be any arbitrary training procedure\n",
" intermediate_score = evaluate(step, config[\"width\"], config[\"height\"], config[\"activation\"])\n",
" # Feed the score back back to Tune.\n",
" tune.report(\n",
" iterations=step, loss=intermediate_score, gain=intermediate_score * width\n",
" )"
" session.report({\n",
" \"iterations\": step, \"loss\": intermediate_score, \"gain\": intermediate_score * width\n",
" })"
]
},
{

View file

@ -54,6 +54,7 @@
"import ray\n",
"import numpy as np\n",
"from ray import tune\n",
"from ray.air import session\n",
"from ray.tune.search.sigopt import SigOptSearch\n",
"\n",
"if \"SIGOPT_KEY\" not in os.environ:\n",
@ -93,7 +94,7 @@
"metadata": {},
"source": [
"Next, our ``objective`` function takes a Tune ``config``, evaluates the `score` of your experiment in a training loop,\n",
"and uses `tune.report` to report the `score` back to Tune."
"and uses `session.report` to report the `score` back to Tune."
]
},
{
@ -106,7 +107,7 @@
"def objective(config):\n",
" for step in range(config[\"steps\"]):\n",
" score = evaluate(step, config[\"width\"], config[\"height\"], config[\"activation\"])\n",
" tune.report(iterations=step, mean_loss=score)"
" session.report({\"iterations\": step, \"mean_loss\": score})"
]
},
{
@ -306,7 +307,7 @@
" w2 = config[\"total_weight\"] - w1\n",
" \n",
" average, std = evaluate(w1, w2)\n",
" tune.report(average=average, std=std, sharpe=average / std)\n",
" session.report({\"average\": average, \"std\": std, \"sharpe\": average / std})\n",
" time.sleep(0.1)"
]
},
@ -425,7 +426,7 @@
" w3 = 1 - total\n",
" \n",
" average, std = evaluate(w1, w2, w3)\n",
" tune.report(average=average, std=std)"
" session.report({\"average\": average, \"std\": std})"
]
},
{

View file

@ -57,6 +57,7 @@
"\n",
"import skopt\n",
"from ray import tune\n",
"from ray.air import session\n",
"from ray.tune.search import ConcurrencyLimiter\n",
"from ray.tune.search.skopt import SkOptSearch"
]
@ -102,7 +103,7 @@
"metadata": {},
"source": [
"Next, our `objective` function to be optimized takes a Tune `config`, evaluates the `score` of your experiment in a training loop,\n",
"and uses `tune.report` to report the `score` back to Tune."
"and uses `session.report` to report the `score` back to Tune."
]
},
{
@ -115,7 +116,7 @@
"def objective(config):\n",
" for step in range(config[\"steps\"]):\n",
" score = evaluate(step, config[\"width\"], config[\"height\"], config[\"activation\"])\n",
" tune.report(iterations=step, mean_loss=score)"
" session.report({\"iterations\": step, \"mean_loss\": score})"
]
},
{

View file

@ -40,12 +40,13 @@
"source": [
"import numpy as np\n",
"from ray import tune\n",
"from ray.air import session\n",
"\n",
"\n",
"def train_function(config, checkpoint_dir=None):\n",
" for i in range(30):\n",
" loss = config[\"mean\"] + config[\"sd\"] * np.random.randn()\n",
" tune.report(loss=loss)"
" session.report({\"loss\": loss})"
]
},
{

View file

@ -66,6 +66,7 @@
"import mlflow\n",
"\n",
"from ray import tune\n",
"from ray.air import session\n",
"from ray.air.callbacks.mlflow import MLflowLoggerCallback\n",
"from ray.tune.integration.mlflow import mlflow_mixin"
]
@ -108,7 +109,7 @@
" # Iterative training function - can be any arbitrary training procedure\n",
" intermediate_score = evaluation_fn(step, width, height)\n",
" # Feed the score back to Tune.\n",
" tune.report(iterations=step, mean_loss=intermediate_score)\n",
" session.report({\"iterations\": step, \"mean_loss\": intermediate_score})\n",
" time.sleep(0.1)"
]
},
@ -195,7 +196,7 @@
" # Log the metrics to mlflow\n",
" mlflow.log_metrics(dict(mean_loss=intermediate_score), step=step)\n",
" # Feed the score back to Tune.\n",
" tune.report(iterations=step, mean_loss=intermediate_score)\n",
" session.report({\"iterations\": step, \"mean_loss\": intermediate_score})\n",
" time.sleep(0.1)"
]
},

View file

@ -74,6 +74,8 @@
"import torchvision.transforms as transforms\n",
"import ray\n",
"from ray import tune\n",
"from ray.air import session\n",
"from ray.air.checkpoint import Checkpoint\n",
"from ray.tune.schedulers import ASHAScheduler"
]
},
@ -177,7 +179,7 @@
"metadata": {},
"outputs": [],
"source": [
"def train_cifar(config, checkpoint_dir=None):\n",
"def train_cifar(config):\n",
" net = Net(config[\"l1\"], config[\"l2\"])\n",
"\n",
" device = \"cpu\"\n",
@ -190,11 +192,11 @@
" criterion = nn.CrossEntropyLoss()\n",
" optimizer = optim.SGD(net.parameters(), lr=config[\"lr\"], momentum=0.9)\n",
"\n",
" # The `checkpoint_dir` parameter gets passed by Ray Tune when a checkpoint\n",
" # should be restored.\n",
" if checkpoint_dir:\n",
" checkpoint = os.path.join(checkpoint_dir, \"checkpoint\")\n",
" model_state, optimizer_state = torch.load(checkpoint)\n",
" # To restore a checkpoint, use `session.get_checkpoint()`.\n",
" loaded_checkpoint = session.get_checkpoint()\n",
" if loaded_checkpoint:\n",
" with loaded_checkpoint.as_directory() as loaded_checkpoint_dir:\n",
" model_state, optimizer_state = torch.load(os.path.join(loaded_checkpoint_dir, \"checkpoint.pt\"))\n",
" net.load_state_dict(model_state)\n",
" optimizer.load_state_dict(optimizer_state)\n",
"\n",
@ -261,14 +263,13 @@
" val_steps += 1\n",
"\n",
" # Here we save a checkpoint. It is automatically registered with\n",
" # Ray Tune and will potentially be passed as the `checkpoint_dir`\n",
" # parameter in future iterations.\n",
" with tune.checkpoint_dir(step=epoch) as checkpoint_dir:\n",
" path = os.path.join(checkpoint_dir, \"checkpoint\")\n",
" torch.save(\n",
" (net.state_dict(), optimizer.state_dict()), path)\n",
"\n",
" tune.report(loss=(val_loss / val_steps), accuracy=correct / total)\n",
" # Ray Tune and can be accessed through `session.get_checkpoint()`\n",
" # API in future iterations.\n",
" os.makedirs(\"my_model\", exist_ok=True)\n",
" torch.save(\n",
" (net.state_dict(), optimizer.state_dict()), \"my_model/checkpoint.pt\")\n",
" checkpoint = Checkpoint.from_directory(\"my_model\")\n",
" session.report({\"loss\": (val_loss / val_steps), \"accuracy\": correct / total}, checkpoint=checkpoint)\n",
" print(\"Finished Training\")"
]
},

View file

@ -59,7 +59,11 @@
"cell_type": "code",
"execution_count": null,
"id": "e6e77570",
"metadata": {},
"metadata": {
"vscode": {
"languageId": "python"
}
},
"outputs": [],
"source": [
"import math\n",
@ -92,7 +96,11 @@
"cell_type": "code",
"execution_count": null,
"id": "48b20f48",
"metadata": {},
"metadata": {
"vscode": {
"languageId": "python"
}
},
"outputs": [],
"source": [
"class LightningMNISTClassifier(pl.LightningModule):\n",
@ -209,7 +217,11 @@
"cell_type": "code",
"execution_count": null,
"id": "86df3d39",
"metadata": {},
"metadata": {
"vscode": {
"languageId": "python"
}
},
"outputs": [],
"source": [
"def train_mnist_no_tune():\n",
@ -243,11 +255,16 @@
"cell_type": "code",
"execution_count": null,
"id": "34faeb3b",
"metadata": {},
"metadata": {
"vscode": {
"languageId": "python"
}
},
"outputs": [],
"source": [
"from pytorch_lightning.loggers import TensorBoardLogger\n",
"from ray import tune\n",
"from ray.air import session\n",
"from ray.tune import CLIReporter\n",
"from ray.tune.schedulers import ASHAScheduler, PopulationBasedTraining\n",
"from ray.tune.integration.pytorch_lightning import TuneReportCallback, \\\n",
@ -274,7 +291,11 @@
"cell_type": "code",
"execution_count": null,
"id": "4bab80bc",
"metadata": {},
"metadata": {
"vscode": {
"languageId": "python"
}
},
"outputs": [],
"source": [
"TuneReportCallback(\n",
@ -312,7 +333,11 @@
"cell_type": "code",
"execution_count": null,
"id": "74e7d1c2",
"metadata": {},
"metadata": {
"vscode": {
"languageId": "python"
}
},
"outputs": [],
"source": [
"def train_mnist_tune(config, num_epochs=10, num_gpus=0, data_dir=\"~/data\"):\n",
@ -323,7 +348,7 @@
" # If fractional GPUs passed in, convert to int.\n",
" gpus=math.ceil(num_gpus),\n",
" logger=TensorBoardLogger(\n",
" save_dir=tune.get_trial_dir(), name=\"\", version=\".\"),\n",
" save_dir=os.getcwd(), name=\"\", version=\".\"),\n",
" enable_progress_bar=False,\n",
" callbacks=[\n",
" TuneReportCallback(\n",
@ -354,7 +379,11 @@
"cell_type": "code",
"execution_count": null,
"id": "a50645e9",
"metadata": {},
"metadata": {
"vscode": {
"languageId": "python"
}
},
"outputs": [],
"source": [
"config = {\n",
@ -382,7 +411,11 @@
"cell_type": "code",
"execution_count": null,
"id": "a2596b01",
"metadata": {},
"metadata": {
"vscode": {
"languageId": "python"
}
},
"outputs": [],
"source": [
"num_epochs = 10\n",
@ -409,7 +442,11 @@
"cell_type": "code",
"execution_count": null,
"id": "cd605a16",
"metadata": {},
"metadata": {
"vscode": {
"languageId": "python"
}
},
"outputs": [],
"source": [
"reporter = CLIReporter(\n",
@ -433,7 +470,11 @@
"cell_type": "code",
"execution_count": null,
"id": "332668dc",
"metadata": {},
"metadata": {
"vscode": {
"languageId": "python"
}
},
"outputs": [],
"source": [
"gpus_per_trial = 0\n",
@ -465,7 +506,11 @@
"cell_type": "code",
"execution_count": null,
"id": "dc402716",
"metadata": {},
"metadata": {
"vscode": {
"languageId": "python"
}
},
"outputs": [],
"source": [
"resources_per_trial = {\"cpu\": 1, \"gpu\": gpus_per_trial}"
@ -500,7 +545,11 @@
"cell_type": "code",
"execution_count": null,
"id": "ea182330",
"metadata": {},
"metadata": {
"vscode": {
"languageId": "python"
}
},
"outputs": [],
"source": [
"def tune_mnist_asha(num_samples=10, num_epochs=10, gpus_per_trial=0, data_dir=\"~/data\"):\n",
@ -586,7 +635,7 @@
"\n",
"First, we need to introduce\n",
"another callback to save model checkpoints. Since Tune requires a call to\n",
"`tune.report()` after creating a new checkpoint to register it, we will use\n",
"`session.report()` after creating a new checkpoint to register it, we will use\n",
"a combined reporting and checkpointing callback:"
]
},
@ -594,7 +643,11 @@
"cell_type": "code",
"execution_count": null,
"id": "7f86e4d8",
"metadata": {},
"metadata": {
"vscode": {
"languageId": "python"
}
},
"outputs": [],
"source": [
"TuneReportCheckpointCallback(\n",
@ -621,7 +674,11 @@
"cell_type": "code",
"execution_count": null,
"id": "746e962a",
"metadata": {},
"metadata": {
"vscode": {
"languageId": "python"
}
},
"outputs": [],
"source": [
"def train_mnist_tune_checkpoint(config,\n",
@ -635,7 +692,7 @@
" # If fractional GPUs passed in, convert to int.\n",
" \"gpus\": math.ceil(num_gpus),\n",
" \"logger\": TensorBoardLogger(\n",
" save_dir=tune.get_trial_dir(), name=\"\", version=\".\"),\n",
" save_dir=os.getcwd(), name=\"\", version=\".\"),\n",
" \"enable_progress_bar\": False,\n",
" \"callbacks\": [\n",
" TuneReportCheckpointCallback(\n",
@ -672,7 +729,11 @@
"cell_type": "code",
"execution_count": null,
"id": "e12a1bd5",
"metadata": {},
"metadata": {
"vscode": {
"languageId": "python"
}
},
"outputs": [],
"source": [
"def tune_mnist_pbt(num_samples=10, num_epochs=10, gpus_per_trial=0, data_dir=\"~/data\"):\n",
@ -733,22 +794,31 @@
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false,
"pycharm": {
"name": "#%%\n"
},
"vscode": {
"languageId": "python"
}
},
"outputs": [],
"source": [
"data_dir = \"~/data/\"\n",
"\n",
"tune_mnist_asha(num_samples=1, num_epochs=6, gpus_per_trial=0, data_dir=data_dir)\n",
"tune_mnist_pbt(num_samples=1, num_epochs=6, gpus_per_trial=0, data_dir=data_dir)"
],
"metadata": {
"collapsed": false,
"pycharm": {
"name": "#%%\n"
}
}
]
},
{
"cell_type": "markdown",
"metadata": {
"collapsed": false,
"pycharm": {
"name": "#%% md\n"
}
},
"source": [
"If you have more resources available (e.g. a GPU), you can modify the above parameters accordingly.\n",
"\n",
@ -790,13 +860,7 @@
"- {ref}`A walkthrough tutorial for using Ray Tune with Pytorch-Lightning <tune-pytorch-lightning-ref>`.\n",
"- {doc}`/tune/examples/includes/mlflow_ptl_example`: Example for using [MLflow](https://github.com/mlflow/mlflow/)\n",
" and [Pytorch Lightning](https://github.com/PyTorchLightning/pytorch-lightning) with Ray Tune."
],
"metadata": {
"collapsed": false,
"pycharm": {
"name": "#%% md\n"
}
}
]
}
],
"metadata": {

View file

@ -150,6 +150,8 @@
"import torch.optim as optim\n",
"import ray\n",
"from ray import tune, serve\n",
"from ray.air import session\n",
"from ray.air.checkpoint import Checkpoint\n",
"from ray.serve.exceptions import RayServeException\n",
"from ray.tune import CLIReporter\n",
"from ray.tune.schedulers import ASHAScheduler\n",
@ -315,7 +317,6 @@
"def train_mnist(\n",
" config,\n",
" start_model=None,\n",
" checkpoint_dir=None,\n",
" num_epochs=10,\n",
" use_gpus=False,\n",
" data_fn=None,\n",
@ -333,17 +334,21 @@
"\n",
" # Load checkpoint, or load start model if no checkpoint has been\n",
" # passed and a start model is specified\n",
" load_dir = None\n",
" if checkpoint_dir:\n",
" load_dir = checkpoint_dir\n",
" elif start_model:\n",
" load_dir = start_model\n",
" \n",
" model_state = None\n",
" optimizer_state = None\n",
"\n",
" if load_dir:\n",
" model_state, optimizer_state = torch.load(os.path.join(load_dir, \"checkpoint\"))\n",
" if start_model: # load start model if provided\n",
" model_state, optimizer_state = torch.load(os.path.join(load_dir, \"checkpoint.pt\"))\n",
" elif session.get_checkpoint(): # load from previous checkpoint\n",
" with session.get_checkpoint().as_directory() as loaded_checkpoint_dir:\n",
" model_state, optimizer_state = torch.load(os.path.join(loaded_checkpoint_dir, \"checkpoint.pt\"))\n",
"\n",
" if model_state: \n",
" model.load_state_dict(model_state)\n",
" optimizer.load_state_dict(optimizer_state)\n",
"\n",
"\n",
" # Get full training datasets\n",
" train_dataset, validation_dataset = data_fn(day=day)\n",
"\n",
@ -359,14 +364,15 @@
" train(model, optimizer, train_loader, device)\n",
" acc = test(model, validation_loader, device)\n",
" if i == num_epochs - 1:\n",
" with tune.checkpoint_dir(step=i) as checkpoint_dir:\n",
" torch.save(\n",
" (model.state_dict(), optimizer.state_dict()),\n",
" os.path.join(checkpoint_dir, \"checkpoint\"),\n",
" )\n",
" tune.report(mean_accuracy=acc, done=True)\n",
" os.makedirs(\"my_model\", exist_ok=True)\n",
" torch.save(\n",
" (model.state_dict(), optimizer.state_dict()),\n",
" os.path.join(\"my_model/checkpoint.pt\"),\n",
" )\n",
" checkpoint = Checkpoint.from_directory(\"my_model\")\n",
" session.report({\"mean_accuracy\": acc, \"done\": True}, checkpoint=checkpoint)\n",
" else:\n",
" tune.report(mean_accuracy=acc)"
" session.report({\"mean_accuracy\": acc})"
]
},
{
@ -559,7 +565,7 @@
" model = ConvNet(layer_size=self.config[\"layer_size\"]).to(self.device)\n",
"\n",
" model_state, optimizer_state = torch.load(\n",
" os.path.join(self.checkpoint_dir, \"checkpoint\"), map_location=self.device\n",
" os.path.join(self.checkpoint_dir, \"checkpoint.pt\"), map_location=self.device\n",
" )\n",
" model.load_state_dict(model_state)\n",
"\n",
@ -608,7 +614,7 @@
" move the checkpoint to a shared storage, like Amazon S3, instead.\"\"\"\n",
" os.makedirs(model_dir, 0o755, exist_ok=True)\n",
"\n",
" checkpoint_path = os.path.join(model_dir, \"checkpoint\")\n",
" checkpoint_path = os.path.join(model_dir, \"checkpoint.pt\")\n",
" meta_path = os.path.join(model_dir, \"meta.json\")\n",
"\n",
" if os.path.exists(checkpoint_path):\n",
@ -641,7 +647,7 @@
"outputs": [],
"source": [
"def get_current_model(model_dir):\n",
" checkpoint_path = os.path.join(model_dir, \"checkpoint\")\n",
" checkpoint_path = os.path.join(model_dir, \"checkpoint.pt\")\n",
" meta_path = os.path.join(model_dir, \"meta.json\")\n",
"\n",
" if not os.path.exists(checkpoint_path) or not os.path.exists(meta_path):\n",
@ -874,4 +880,4 @@
},
"nbformat": 4,
"nbformat_minor": 5
}
}

View file

@ -58,6 +58,7 @@
"import wandb\n",
"\n",
"from ray import tune\n",
"from ray.air import session\n",
"from ray.tune import Trainable\n",
"from ray.air.callbacks.wandb import WandbLoggerCallback\n",
"from ray.tune.integration.wandb import (\n",
@ -94,7 +95,7 @@
"def objective(config, checkpoint_dir=None):\n",
" for i in range(30):\n",
" loss = config[\"mean\"] + config[\"sd\"] * np.random.randn()\n",
" tune.report(loss=loss)"
" session.report({\"loss\": loss})"
]
},
{
@ -167,7 +168,7 @@
"def decorated_objective(config, checkpoint_dir=None):\n",
" for i in range(30):\n",
" loss = config[\"mean\"] + config[\"sd\"] * np.random.randn()\n",
" tune.report(loss=loss)\n",
" session.report({\"loss\": loss})\n",
" wandb.log(dict(loss=loss))"
]
},

View file

@ -345,7 +345,7 @@
" verbose_eval=False)\n",
" # Return prediction accuracy\n",
" accuracy = 1. - results[\"eval\"][\"error\"][-1]\n",
" tune.report(mean_accuracy=accuracy, done=True)\n",
" session.report({\"mean_accuracy\": accuracy, \"done\": True})\n",
"\n",
"\n",
"if __name__ == \"__main__\":\n",
@ -374,7 +374,7 @@
"cell_type": "markdown",
"source": [
"As you can see, the changes in the actual training function are minimal. Instead of\n",
"returning the accuracy value, we report it back to Tune using `tune.report()`.\n",
"returning the accuracy value, we report it back to Tune using `session.report()`.\n",
"Our `config` dictionary only changed slightly. Instead of passing hard-coded\n",
"parameters, we tell Tune to choose values from a range of valid options. There are\n",
"a number of options we have here, all of which are explained in\n",
@ -485,6 +485,7 @@
"import xgboost as xgb\n",
"\n",
"from ray import tune\n",
"from ray.air import session\n",
"from ray.tune.integration.xgboost import TuneReportCheckpointCallback\n",
"\n",
"\n",

View file

@ -52,6 +52,7 @@
"\n",
"import ray\n",
"from ray import tune\n",
"from ray.air import session\n",
"from ray.tune.search.zoopt import ZOOptSearch\n",
"from zoopt import ValueType"
]
@ -85,7 +86,7 @@
"metadata": {},
"source": [
"Next, our ``objective`` function takes a Tune ``config``, evaluates the `score` of your experiment in a training loop,\n",
"and uses `tune.report` to report the `score` back to Tune."
"and uses `session.report` to report the `score` back to Tune."
]
},
{
@ -98,7 +99,7 @@
"def objective(config):\n",
" for step in range(config[\"steps\"]):\n",
" score = evaluate(step, config[\"width\"], config[\"height\"])\n",
" tune.report(iterations=step, mean_loss=score)"
" session.report({\"iterations\": step, \"mean_loss\": score})"
]
},
{

View file

@ -56,7 +56,7 @@ results per each added tree in GBDTs, etc.) using early stopping usually allows
more configurations, as unpromising trials are pruned before they run their full course.
Please note that not all search algorithms can use information from pruned trials.
Early stopping cannot be used without incremental results - in case of the functional API,
that means that ``tune.report()`` has to be called more than once - usually in a loop.
that means that ``session.report()`` has to be called more than once - usually in a loop.
**If your model is small**, you can usually try to run many different configurations.
A **random search** can be used to generate configurations. You can also grid search
@ -171,7 +171,7 @@ the a and b variables and use them afterwards.
How does early termination (e.g. Hyperband/ASHA) work?
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Early termination algorithms look at the intermediately reported values,
e.g. what is reported to them via ``tune.report()`` after each training
e.g. what is reported to them via ``session.report()`` after each training
epoch. After a certain number of steps, they then remove the worst
performing trials and keep only the best performing trials. Goodness of a trial
is determined by ordering them by the objective metric, for instance accuracy
@ -188,8 +188,8 @@ Why are all my trials returning "1" iteration?
**This is most likely applicable for the Tune function API.**
Ray Tune counts iterations internally every time ``tune.report()`` is
called. If you only call ``tune.report()`` once at the end of the training,
Ray Tune counts iterations internally every time ``session.report()`` is
called. If you only call ``session.report()`` once at the end of the training,
the counter has only been incremented once. If you're using the class API,
the counter is increased after calling ``step()``.
@ -203,7 +203,7 @@ What are all these extra outputs?
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
You'll notice that Ray Tune not only reports hyperparameters (from the
``config``) or metrics (passed to ``tune.report()``), but also some other
``config``) or metrics (passed to ``session.report()``), but also some other
outputs.
.. code-block:: bash
@ -436,15 +436,11 @@ dictionary should only contain primitive types, like numbers or strings.
**The Trial result is very large**
This is the case if you return objects, data, or other large objects via the return value of ``step()`` in
your class trainable or to ``tune.report()`` in your function trainable. The effect is the same as above:
your class trainable or to ``session.report()`` in your function trainable. The effect is the same as above:
The results are repeatedly serialized and written to disk, and this can take a long time.
**Solution**: Usually you should be able to write data to the trial directory instead. You can then pass a
filename back (or assume it is a known location). The trial dir is usually the current working directory. Class
trainables have the ``Trainable.logdir`` property and function trainables the :func:`ray.tune.get_trial_dir`
function to retrieve the logdir. If you really have to, you can also ``ray.put()`` an object to the Ray
object store and retrieve it with ``ray.get()`` on the other side. Generally, your result dictionary
should only contain primitive types, like numbers or strings.
**Solution**: Use checkpoints: write the data to the trainable's current working directory instead. There are various ways
to do this, depending on whether you are using the class or the functional Trainable API.
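
For the function API, a rough sketch of this pattern (the file name and metric are illustrative):

.. code-block:: python

    import os

    from ray.air import session
    from ray.air.checkpoint import Checkpoint

    def trainable(config):
        # Write the large artifact into the trial's working directory...
        os.makedirs("artifacts", exist_ok=True)
        with open("artifacts/large_artifact.bin", "wb") as f:
            f.write(b"...")  # placeholder for the real data
        # ...and attach it as a checkpoint instead of putting it in the result dict.
        session.report(
            {"score": 0.9},  # keep the reported dict small and primitive
            checkpoint=Checkpoint.from_directory("artifacts"),
        )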
**You are training a large number of trials on a cluster, or you are saving huge checkpoints**

View file

@ -63,7 +63,7 @@ Below, we define a function that trains the Pytorch model for multiple epochs.
This function will be executed on a separate :ref:`Ray Actor (process) <actor-guide>` underneath the hood,
so we need to communicate the performance of the model back to Tune (which is on the main Python process).
To do this, we call :ref:`tune.report <tune-function-docstring>` in our training function,
To do this, we call :ref:`session.report <tune-function-docstring>` in our training function,
which sends the performance value back to Tune. Since the function is executed on the separate process,
make sure that the function is :ref:`serializable by Ray <serialization-guide>`.

View file

@ -47,11 +47,11 @@ Given concrete choices for ``a``, ``b`` and ``x`` we can evaluate the objective
:start-after: __function_api_start__
:end-before: __function_api_end__
Note that we use ``tune.report(...)`` to report the intermediate ``score`` in the training loop, which can be useful
Note that we use ``session.report(...)`` to report the intermediate ``score`` in the training loop, which can be useful
in many machine learning tasks.
If you just want to report the final ``score`` outside of this loop, you can simply return the score at the
end of the ``trainable`` function with ``return {"score": score}``.
You can also use ``yield {"score": score}`` instead of ``tune.report()``.
You can also use ``yield {"score": score}`` instead of ``session.report()``.
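
For instance, a final-report-only variant could look like this (a sketch reusing the ``objective`` function defined above):

.. code-block:: python

    def trainable(config):
        score = 0
        # Run the whole loop first, then report only once at the end.
        for x in range(20):
            score = objective(x, config["a"], config["b"])
        return {"score": score}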
.. tabbed:: Class API
@ -62,7 +62,7 @@ Given concrete choices for ``a``, ``b`` and ``x`` we can evaluate the objective
:start-after: __class_api_start__
:end-before: __class_api_end__
.. tip:: ``tune.report`` can't be used within a ``Trainable`` class.
.. tip:: ``session.report`` can't be used within a ``Trainable`` class.
Learn more about the details of :ref:`Trainables here<trainable-docs>`
and :ref:`have a look at our examples <tune-general-examples>`.
@ -266,7 +266,7 @@ Schedulers
----------
To make your training process more efficient, you can use a :ref:`Trial Scheduler <tune-schedulers>`.
For instance, in our ``trainable`` example minimizing a function in a training loop, we used ``tune.report()``.
For instance, in our ``trainable`` example minimizing a function in a training loop, we used ``session.report()``.
This reported `incremental` results, given a hyperparameter configuration selected by a search algorithm.
Based on these reported results, a Tune scheduler can decide whether to stop the trial early or not.
If you don't specify a scheduler, Tune will use a first-in-first-out (FIFO) scheduler by default, which simply

View file

@ -189,8 +189,10 @@ Your ``my_trainable`` is either a:
2. **Custom training function**
* All this means is that your function has to expose a ``checkpoint_dir`` argument in the function signature,
and call ``tune.checkpoint_dir``. See :doc:`this example </tune/examples/includes/custom_func_checkpointing>`,
* All this means is that your function needs to take care of saving and loading from checkpoints.
For saving, this is done through the ``session.report()`` API, which can take in a ``Checkpoint`` object.
For loading, your function can access an existing checkpoint through the ``session.get_checkpoint()`` API.
See :doc:`this example </tune/examples/includes/custom_func_checkpointing>`,
it's quite simple to do.
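
A minimal sketch of that save/restore flow (using a dict-based ``Checkpoint``; the linked example is more complete):

.. code-block:: python

    from ray.air import session
    from ray.air.checkpoint import Checkpoint

    def my_trainable(config):
        start = 0
        loaded_checkpoint = session.get_checkpoint()
        if loaded_checkpoint:  # resume from the last reported checkpoint, if any
            start = loaded_checkpoint.to_dict()["step"] + 1
        for step in range(start, 10):
            # ... one training step ...
            session.report(
                {"step": step},
                checkpoint=Checkpoint.from_dict({"step": step}),
            )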
Let's assume for this example you're running this script from your laptop, and connecting to your remote Ray cluster
@ -319,7 +321,7 @@ On a multinode cluster, Tune automatically creates a copy of all trial checkpoin
This requires the Ray cluster to be started with the :ref:`cluster launcher <cluster-cloud>` and also
requires rsync to be installed.
Note that you must use the ``tune.checkpoint_dir`` API to trigger syncing
Note that you must use the ``session.report`` API to trigger syncing
(or use a model type with a built-in Ray Tune integration as described here).
See :doc:`/tune/examples/includes/custom_func_checkpointing` for an example.

View file

@ -56,7 +56,7 @@ After each invocation, the driver is notified that a "result dict" is ready.
The driver will then pull the result via ``ray.get``.
If the trainable is a callable or a function, it will be executed on the Ray actor process on a separate execution thread.
Whenever ``tune.report`` is called, the execution thread is paused and waits for the driver to pull a
Whenever ``session.report`` is called, the execution thread is paused and waits for the driver to pull a
result (see `function_trainable.py <https://github.com/ray-project/ray/blob/master/python/ray/tune/trainable/function_trainable.py>`__).
After pulling, the actors execution thread will automatically resume.

View file

@ -15,6 +15,7 @@ This simple callback just prints a metric each time a result is received:
from ray import tune
from ray.tune import Callback
from ray.air import session
class MyCallback(Callback):
@ -24,7 +25,7 @@ This simple callback just prints a metric each time a result is received:
def train(config):
for i in range(10):
tune.report(metric=i)
session.report({"metric": i})
tune.run(
@ -46,7 +47,7 @@ You can log arbitrary values and metrics in both Function and Class training API
def trainable(config):
for i in range(num_epochs):
...
tune.report(acc=accuracy, metric_foo=random_metric_1, bar=metric_2)
session.report({"acc": accuracy, "metric_foo": random_metric_1, "bar": metric_2})
class Trainable(tune.Trainable):
def step(self):
@ -56,7 +57,7 @@ You can log arbitrary values and metrics in both Function and Class training API
.. tip::
Note that ``tune.report()`` is not meant to transfer large amounts of data, like models or datasets.
Note that ``session.report()`` is not meant to transfer large amounts of data, like models or datasets.
Doing so can incur large overheads and slow down your Tune run significantly.
Which metrics get automatically filled in?
@ -73,7 +74,7 @@ All of these can be used as stopping conditions or passed as a parameter to Tria
* ``experiment_id``: Unique experiment ID
* ``experiment_tag``: Unique experiment tag (includes parameter values)
* ``hostname``: Hostname of the worker
* ``iterations_since_restore``: The number of times ``tune.report()/trainable.train()`` has been
* ``iterations_since_restore``: The number of times ``session.report`` has been
called after restoring the worker from a checkpoint
* ``node_ip``: Host IP of the worker
* ``pid``: Process ID (PID) of the worker process
@ -84,7 +85,7 @@ All of these can be used as stopping conditions or passed as a parameter to Tria
* ``timestamp``: Timestamp when the result was processed
* ``timesteps_since_restore``: Number of timesteps since restoring from a checkpoint
* ``timesteps_total``: Total number of timesteps
* ``training_iteration``: The number of times ``tune.report()`` has been
* ``training_iteration``: The number of times ``session.report()`` has been
called
* ``trial_id``: Unique trial ID

View file

@ -184,6 +184,8 @@ You can do this in the trainable, as shown below:
.. code-block:: python
from ray.air import session
def trainable(config):
library.init(
name=trial_id,
@ -191,12 +193,12 @@ You can do this in the trainable, as shown below:
resume=trial_id,
reinit=True,
allow_val_change=True)
library.set_log_path(tune.get_trial_dir())
library.set_log_path(os.getcwd())
for step in range(100):
library.log_model(...)
library.log(results, step=step)
tune.report(results)
session.report(results)
.. tabbed:: Class API
@ -212,7 +214,7 @@ You can do this in the trainable, as shown below:
resume=trial_id,
reinit=True,
allow_val_change=True)
library.set_log_path(self.logdir)
library.set_log_path(os.getcwd())
def step(self):
library.log_model(...)
@ -227,7 +229,8 @@ You can do this in the trainable, as shown below:
library.log(res_dict, step=step)
Use ``self.logdir`` (only for Class API) or ``tune.get_trial_dir()`` (only for Function API) for the trial log directory.
Note: For both function and class trainables, the current working directory is changed to the trial's log directory
once the trainable is launched as a remote actor.
In the distributed case, these logs will be sync'ed back to the driver under your logger path.
This will allow you to visualize and analyze logs of all distributed training workers on a single machine.

View file

@ -62,7 +62,7 @@ We've just covered the case in which you manually interrupt a Tune run.
But you can also control when trials are stopped early by passing the ``stop`` argument to ``tune.run``.
This argument takes a dictionary, a function, or a :class:`Stopper <ray.tune.stopper.Stopper>` class.
If a dictionary is passed in, the keys may be any field in the return result of ``tune.report`` in the
If a dictionary is passed in, the keys may be any field in the return result of ``session.report`` in the
Function API or ``step()`` (including the results from ``step`` and auto-filled metrics).
Stopping with a dictionary

View file

@ -6,7 +6,7 @@
import torch
from ray.train.trainer import BaseTrainer
from ray import tune
from ray.air import session
class MyPytorchTrainer(BaseTrainer):
@ -42,7 +42,7 @@ class MyPytorchTrainer(BaseTrainer):
# Use Tune functions to report intermediate
# results.
tune.report(loss=loss, epoch=epoch_idx)
session.report({"loss": loss, "epoch": epoch_idx})
# __custom_trainer_end__

View file

@ -64,44 +64,45 @@ def get_checkpoint() -> Optional[Checkpoint]:
Returns:
Checkpoint object if the session is currently being resumed.
Otherwise, return None.
Otherwise, return None.
Example:
.. code-block: python
######## Using it in the *per worker* train loop (TrainSession) ######
from ray.air import session
from ray.air.checkpoint import Checkpoint
def train_func():
if session.get_checkpoint():
with session.get_checkpoint().as_directory() as
loaded_checkpoint_dir:
import tensorflow as tf
model = tf.keras.models.load_model(loaded_checkpoint_dir)
else:
model = build_model()
######## Using it in the *per worker* train loop (TrainSession) ######
from ray.air import session
from ray.air.checkpoint import Checkpoint
def train_func():
ckpt = session.get_checkpoint()
if ckpt:
with ckpt.as_directory() as loaded_checkpoint_dir:
import tensorflow as tf
model.save("my_model", overwrite=True)
session.report(
metrics={"iter": 1},
checkpoint=Checkpoint.from_directory("my_model")
model = tf.keras.models.load_model(loaded_checkpoint_dir)
else:
model = build_model()
model.save("my_model", overwrite=True)
session.report(
metrics={"iter": 1},
checkpoint=Checkpoint.from_directory("my_model")
)
scaling_config = {"num_workers": 2}
trainer = TensorflowTrainer(
train_loop_per_worker=train_func, scaling_config=scaling_config
)
result = trainer.fit()
scaling_config = {"num_workers": 2}
trainer = TensorflowTrainer(
train_loop_per_worker=train_func, scaling_config=scaling_config
)
result = trainer.fit()
# trainer2 will pick up from the checkpoint saved by trainer1.
trainer2 = TensorflowTrainer(
train_loop_per_worker=train_func,
scaling_config=scaling_config,
# this is ultimately what is accessed through
# ``Session.get_checkpoint()``
resume_from_checkpoint=result.checkpoint,
)
result2 = trainer2.fit()
# trainer2 will pick up from the checkpoint saved by trainer1.
trainer2 = TensorflowTrainer(
train_loop_per_worker=train_func,
scaling_config=scaling_config,
# this is ultimately what is accessed through
# ``Session.get_checkpoint()``
resume_from_checkpoint=result.checkpoint,
)
result2 = trainer2.fit()
"""
return _get_session().loaded_checkpoint

View file

@ -70,6 +70,7 @@ class BaseTrainer(abc.ABC):
from ray.train.trainer import BaseTrainer
from ray import tune
from ray.air import session
class MyPytorchTrainer(BaseTrainer):
@ -106,7 +107,7 @@ class BaseTrainer(abc.ABC):
# Use Tune functions to report intermediate
# results.
tune.report(loss=loss, epoch=epoch_idx)
session.report({"loss": loss, "epoch": epoch_idx})
**How do I use an existing Trainer or one of my custom Trainers?**
@ -283,7 +284,7 @@ class BaseTrainer(abc.ABC):
``self.datasets`` have already been preprocessed by ``self.preprocessor``.
You can use the :ref:`Tune Function API functions <tune-function-docstring>`
(``tune.report()`` and ``tune.save_checkpoint()``) inside
(``session.report()`` and ``session.get_checkpoint()``) inside
this training loop.
Example:
@ -295,7 +296,7 @@ class BaseTrainer(abc.ABC):
def training_loop(self):
for epoch_idx in range(5):
...
tune.report(epoch=epoch_idx)
session.report({"epoch": epoch_idx})
"""
raise NotImplementedError

View file

@ -128,6 +128,13 @@ py_test(
tags = ["team:ml", "exclusive", "tests_dir_F"],
)
py_test(
name = "test_function_api_legacy",
size = "medium",
srcs = ["tests/test_function_api_legacy.py"],
deps = [":tune_lib"],
tags = ["team:ml", "exclusive", "tests_dir_F"],
)
py_test(
name = "test_legacy_import",

View file

@ -5,6 +5,7 @@ import time
import ray
from ray import tune
from ray.air import session
from ray.tune.schedulers import AsyncHyperBandScheduler
@ -21,7 +22,7 @@ def easy_objective(config):
# Iterative training function - can be an arbitrary training procedure
intermediate_score = evaluation_fn(step, width, height)
# Feed the score back to Tune.
tune.report(iterations=step, mean_loss=intermediate_score)
session.report({"iterations": step, "mean_loss": intermediate_score})
if __name__ == "__main__":

View file

@ -8,6 +8,7 @@ import numpy as np
import time
from ray import tune
from ray.air import session
from ray.tune.schedulers import AsyncHyperBandScheduler
from ray.tune.search.ax import AxSearch
@ -42,8 +43,12 @@ def hartmann6(x):
def easy_objective(config):
for i in range(config["iterations"]):
x = np.array([config.get("x{}".format(i + 1)) for i in range(6)])
tune.report(
timesteps_total=i, hartmann6=hartmann6(x), l2norm=np.sqrt((x ** 2).sum())
session.report(
{
"timesteps_total": i,
"hartmann6": hartmann6(x),
"l2norm": np.sqrt((x ** 2).sum()),
}
)
time.sleep(0.02)

View file

@ -7,6 +7,7 @@ Requires the BayesOpt library to be installed (`pip install bayesian-optimizatio
import time
from ray import tune
from ray.air import session
from ray.tune.schedulers import AsyncHyperBandScheduler
from ray.tune.search import ConcurrencyLimiter
from ray.tune.search.bayesopt import BayesOptSearch
@ -24,7 +25,7 @@ def easy_objective(config):
# Iterative training function - can be any arbitrary training procedure
intermediate_score = evaluation_fn(step, width, height)
# Feed the score back to Tune.
tune.report(iterations=step, mean_loss=intermediate_score)
session.report({"iterations": step, "mean_loss": intermediate_score})
time.sleep(0.1)

View file

@ -8,6 +8,7 @@ import time
import ray
from ray import tune
from ray.air import session
from ray.tune.search import ConcurrencyLimiter
from ray.tune.schedulers import AsyncHyperBandScheduler
from ray.tune.search.flaml import BlendSearch
@ -25,7 +26,7 @@ def easy_objective(config):
# Iterative training function - can be any arbitrary training procedure
intermediate_score = evaluation_fn(step, width, height)
# Feed the score back to Tune.
tune.report(iterations=step, mean_loss=intermediate_score)
session.report({"iterations": step, "mean_loss": intermediate_score})
time.sleep(0.1)

View file

@ -8,6 +8,7 @@ import time
import ray
from ray import tune
from ray.air import session
from ray.tune.search import ConcurrencyLimiter
from ray.tune.schedulers import AsyncHyperBandScheduler
from ray.tune.search.flaml import CFO
@ -25,7 +26,7 @@ def easy_objective(config):
# Iterative training function - can be any arbitrary training procedure
intermediate_score = evaluation_fn(step, width, height)
# Feed the score back to Tune.
tune.report(iterations=step, mean_loss=intermediate_score)
session.report({"iterations": step, "mean_loss": intermediate_score})
time.sleep(0.1)

View file

@ -15,6 +15,8 @@ import torchvision
import torchvision.transforms as transforms
import ray
from ray import tune
from ray.air import session
from ray.air.checkpoint import Checkpoint
from ray.tune.schedulers import ASHAScheduler
# __import_end__
@ -63,7 +65,7 @@ class Net(nn.Module):
# __train_begin__
def train_cifar(config, checkpoint_dir=None):
def train_cifar(config):
net = Net(config["l1"], config["l2"])
device = "cpu"
@ -76,13 +78,13 @@ def train_cifar(config, checkpoint_dir=None):
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(net.parameters(), lr=config["lr"], momentum=0.9)
# The `checkpoint_dir` parameter gets passed by Ray Tune when a checkpoint
# should be restored.
if checkpoint_dir:
checkpoint = os.path.join(checkpoint_dir, "checkpoint")
model_state, optimizer_state = torch.load(checkpoint)
net.load_state_dict(model_state)
optimizer.load_state_dict(optimizer_state)
# Load an existing checkpoint through the `session.get_checkpoint()` API.
if session.get_checkpoint():
loaded_checkpoint = session.get_checkpoint()
with loaded_checkpoint.as_directory() as loaded_checkpoint_dir:
model_state, optimizer_state = torch.load(os.path.join(loaded_checkpoint_dir, "checkpoint.pt"))
net.load_state_dict(model_state)
optimizer.load_state_dict(optimizer_state)
data_dir = os.path.abspath("./data")
trainset, testset = load_data(data_dir)
@ -147,14 +149,16 @@ def train_cifar(config, checkpoint_dir=None):
val_steps += 1
# Here we save a checkpoint. It is automatically registered with
# Ray Tune and will potentially be passed as the `checkpoint_dir`
# parameter in future iterations.
with tune.checkpoint_dir(step=epoch) as checkpoint_dir:
path = os.path.join(checkpoint_dir, "checkpoint")
torch.save(
(net.state_dict(), optimizer.state_dict()), path)
tune.report(loss=(val_loss / val_steps), accuracy=correct / total)
# Ray Tune and will potentially be accessed through ``session.get_checkpoint()``
# in future iterations.
# Note that to save a file-based checkpoint, you still need to put it under a
# directory in order to construct an AIR checkpoint.
os.makedirs("my_model", exist_ok=True) # ok to overwrite the previous one.
path = os.path.join("my_model", "checkpoint.pt")
torch.save(
(net.state_dict(), optimizer.state_dict()), path)
checkpoint = Checkpoint.from_directory("my_model")
session.report({"loss": (val_loss / val_steps), "accuracy": correct / total}, checkpoint=checkpoint)
print("Finished Training")
# __train_end__
@ -165,7 +169,7 @@ def test_best_model(best_trial):
device = "cuda:0" if torch.cuda.is_available() else "cpu"
best_trained_model.to(device)
checkpoint_path = os.path.join(best_trial.checkpoint.dir_or_data, "checkpoint")
checkpoint_path = os.path.join(best_trial.checkpoint.dir_or_data, "checkpoint.pt")
model_state, optimizer_state = torch.load(checkpoint_path)
best_trained_model.load_state_dict(model_state)

View file

@ -1,13 +1,14 @@
# If you want to use checkpointing with a custom training function (not a Ray
# integration like PyTorch or Tensorflow), you must expose a
# ``checkpoint_dir`` argument in the function signature, and call
# ``tune.checkpoint_dir``:
# integration like PyTorch or Tensorflow), your function can read and write
# checkpoints through the ``ray.air.session`` APIs, as sketched below.
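#
# The core pattern (a condensed sketch of the full example in this file):
#
#     checkpoint = Checkpoint.from_dict({"step": step})
#     session.report({"mean_loss": score}, checkpoint=checkpoint)
#
# and, when resuming:
#
#     if session.get_checkpoint():
#         step = session.get_checkpoint().to_dict()["step"] + 1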
import os
import time
import json
import argparse
from ray import tune
from ray.air import session
from ray.air.checkpoint import Checkpoint, _DICT_CHECKPOINT_FILE_NAME
from ray import cloudpickle as pickle
def evaluation_fn(step, width, height):
@ -15,25 +16,20 @@ def evaluation_fn(step, width, height):
return (0.1 + width * step / 100) ** (-1) + height * 0.1
def train_func(config, checkpoint_dir=None):
start = 0
def train_func(config):
step = 0
width, height = config["width"], config["height"]
if checkpoint_dir:
with open(os.path.join(checkpoint_dir, "checkpoint")) as f:
state = json.loads(f.read())
start = state["step"] + 1
if session.get_checkpoint():
loaded_checkpoint = session.get_checkpoint()
step = loaded_checkpoint.to_dict()["step"] + 1
for step in range(start, 100):
for step in range(step, 100):
intermediate_score = evaluation_fn(step, width, height)
# Obtain a checkpoint directory
with tune.checkpoint_dir(step=step) as checkpoint_dir:
path = os.path.join(checkpoint_dir, "checkpoint")
with open(path, "w") as f:
f.write(json.dumps({"step": step}))
tune.report(iterations=step, mean_loss=intermediate_score)
checkpoint = Checkpoint.from_dict({"step": step})
session.report(
{"iterations": step, "mean_loss": intermediate_score}, checkpoint=checkpoint
)
# You can restore a single trial checkpoint by using
@ -73,6 +69,8 @@ if __name__ == "__main__":
},
)
print("Best hyperparameters: ", analysis.best_config)
print("Best checkpoint directory: ", analysis.best_checkpoint)
with open(os.path.join(analysis.best_checkpoint, "checkpoint"), "r") as f:
print("Best checkpoint: ", json.load(f))
best_checkpoint = analysis.best_checkpoint
with best_checkpoint.as_directory() as best_ckpt_dir:
print("Best checkpoint directory: ", best_ckpt_dir)
with open(os.path.join(best_ckpt_dir, _DICT_CHECKPOINT_FILE_NAME), "rb") as f:
print("Best checkpoint: ", pickle.load(f))

View file

@ -12,6 +12,7 @@ import numpy as np
import time
from ray import tune
from ray.air import session
from ray.tune.search import ConcurrencyLimiter
from ray.tune.schedulers import AsyncHyperBandScheduler
from ray.tune.search.dragonfly import DragonflySearch
@ -27,7 +28,7 @@ def objective(config):
conductivity = vol1 + 0.1 * (vol2 + vol3) ** 2 + 2.3 * vol4 * (vol1 ** 1.5)
# Add Gaussian noise to simulate experimental noise
conductivity += np.random.normal() * 0.01
tune.report(timesteps_total=i, objective=conductivity)
session.report({"timesteps_total": i, "objective": conductivity})
time.sleep(0.02)

View file

@ -3,6 +3,7 @@
It also checks that it is usable with a separate scheduler.
"""
from ray import tune
from ray.air import session
from ray.tune.schedulers import AsyncHyperBandScheduler
from ray.tune.automl import GeneticSearch
from ray.tune.automl import ContinuousSpace, DiscreteSpace, SearchSpace
@ -19,7 +20,7 @@ def michalewicz_function(config, reporter):
y = np.dot(sin_x, sin_z)
# Negate y since we want to minimize y value
tune.report(timesteps_total=1, neg_mean_loss=-y)
session.report({"timesteps_total": 1, "neg_mean_loss": -y})
if __name__ == "__main__":

View file

@ -7,6 +7,7 @@ Requires the HEBO library to be installed (`pip install 'HEBO>=0.2.0'`).
import time
from ray import tune
from ray.air import session
from ray.tune.schedulers import AsyncHyperBandScheduler
from ray.tune.search.hebo import HEBOSearch
@ -24,7 +25,7 @@ def easy_objective(config):
# Iterative training function - can be any arbitrary training procedure
intermediate_score = evaluation_fn(step, width, height)
# Feed the score back back to Tune.
tune.report(iterations=step, mean_loss=intermediate_score)
session.report({"iterations": step, "mean_loss": intermediate_score})
if __name__ == "__main__":

View file

@ -7,6 +7,8 @@ import os
import numpy as np
import ray
from ray.air import session
from ray.air.checkpoint import Checkpoint
from ray import tune
from ray.tune.schedulers import HyperBandScheduler
@ -23,15 +25,13 @@ def train(config, checkpoint_dir=None):
# Checkpoint the state of the training every 3 steps
# Note that this is only required for certain schedulers
checkpoint = None
if timestep % 3 == 0:
with tune.checkpoint_dir(step=timestep) as checkpoint_dir:
path = os.path.join(checkpoint_dir, "checkpoint")
with open(path, "w") as f:
f.write(json.dumps({"timestep": timestep}))
checkpoint = Checkpoint.from_dict({"timestep": timestep})
# Here we use `episode_reward_mean`, but you can also report other
# objectives such as loss or accuracy.
tune.report(episode_reward_mean=v)
session.report({"episode_reward_mean": v}, checkpoint=checkpoint)
if __name__ == "__main__":

View file

@ -11,6 +11,7 @@ import time
import ray
from ray import tune
from ray.air import session
from ray.tune.search import ConcurrencyLimiter
from ray.tune.schedulers import AsyncHyperBandScheduler
from ray.tune.search.hyperopt import HyperOptSearch
@ -58,7 +59,7 @@ def easy_objective(config_in):
# Iterative training function - can be any arbitrary training procedure
intermediate_score = evaluation_fn(step, width, height, mult)
# Feed the score back to Tune.
tune.report(iterations=step, mean_loss=intermediate_score)
session.report({"iterations": step, "mean_loss": intermediate_score})
time.sleep(0.1)

View file

@ -4,6 +4,7 @@ import argparse
import time
from ray import tune
from ray.air import session
from ray.tune.logger import LoggerCallback
@ -29,7 +30,7 @@ def easy_objective(config):
# Iterative training function - can be any arbitrary training procedure
intermediate_score = evaluation_fn(step, width, height)
# Feed the score back to Tune.
tune.report(iterations=step, mean_loss=intermediate_score)
session.report({"iterations": step, "mean_loss": intermediate_score})
if __name__ == "__main__":

View file

@ -8,6 +8,7 @@ import time
import mlflow
from ray import tune
from ray.air import session
from ray.air.callbacks.mlflow import MLflowLoggerCallback
from ray.tune.integration.mlflow import mlflow_mixin
@ -24,7 +25,7 @@ def easy_objective(config):
# Iterative training function - can be any arbitrary training procedure
intermediate_score = evaluation_fn(step, width, height)
# Feed the score back to Tune.
tune.report(iterations=step, mean_loss=intermediate_score)
session.report({"iterations": step, "mean_loss": intermediate_score})
time.sleep(0.1)
@ -59,7 +60,7 @@ def decorated_easy_objective(config):
# Log the metrics to mlflow
mlflow.log_metrics(dict(mean_loss=intermediate_score), step=step)
# Feed the score back to Tune.
tune.report(iterations=step, mean_loss=intermediate_score)
session.report({"iterations": step, "mean_loss": intermediate_score})
time.sleep(0.1)

View file

@ -11,6 +11,7 @@ from torchvision import datasets, transforms
import ray
from ray import tune
from ray.air import session
from ray.tune.schedulers import AsyncHyperBandScheduler
# Change these values if you want the training to run quicker or slower.
@ -103,7 +104,7 @@ def train_mnist(config):
train(model, optimizer, train_loader, device)
acc = test(model, test_loader, device)
# Set this to run Tune.
tune.report(mean_accuracy=acc)
session.report({"mean_accuracy": acc})
if __name__ == "__main__":

View file

@ -147,7 +147,7 @@ def train_mnist_tune(config, num_epochs=10, num_gpus=0, data_dir="~/data"):
# If fractional GPUs passed in, convert to int.
gpus=math.ceil(num_gpus),
logger=TensorBoardLogger(
save_dir=tune.get_trial_dir(), name="", version="."),
save_dir=os.getcwd(), name="", version="."),
enable_progress_bar=False,
callbacks=[
TuneReportCallback(
@ -173,7 +173,7 @@ def train_mnist_tune_checkpoint(config,
# If fractional GPUs passed in, convert to int.
"gpus": math.ceil(num_gpus),
"logger": TensorBoardLogger(
save_dir=tune.get_trial_dir(), name="", version="."),
save_dir=os.getcwd(), name="", version="."),
"enable_progress_bar": False,
"callbacks": [
TuneReportCheckpointCallback(

View file

@ -7,6 +7,7 @@ Requires the Nevergrad library to be installed (`pip install nevergrad`).
import time
from ray import tune
from ray.air import session
from ray.tune.search import ConcurrencyLimiter
from ray.tune.schedulers import AsyncHyperBandScheduler
from ray.tune.search.nevergrad import NevergradSearch
@ -24,7 +25,7 @@ def easy_objective(config):
# Iterative training function - can be any arbitrary training procedure
intermediate_score = evaluation_fn(step, width, height)
# Feed the score back to Tune.
tune.report(iterations=step, mean_loss=intermediate_score)
session.report({"iterations": step, "mean_loss": intermediate_score})
time.sleep(0.1)

View file

@ -12,6 +12,7 @@ from typing import Dict, Optional, Any
import ray
from ray import tune
from ray.air import session
from ray.tune.search import ConcurrencyLimiter
from ray.tune.schedulers import AsyncHyperBandScheduler
from ray.tune.search.optuna import OptunaSearch
@ -30,7 +31,7 @@ def easy_objective(config):
# Iterative training function - can be any arbitrary training procedure
intermediate_score = evaluation_fn(step, width, height, mult)
# Feed the score back to Tune.
tune.report(iterations=step, mean_loss=intermediate_score)
session.report({"iterations": step, "mean_loss": intermediate_score})
time.sleep(0.1)

View file

@ -11,6 +11,7 @@ import time
import ray
from ray import tune
from ray.air import session
from ray.tune.search import ConcurrencyLimiter
from ray.tune.schedulers import AsyncHyperBandScheduler
from ray.tune.search.optuna import OptunaSearch
@ -28,7 +29,7 @@ def easy_objective(config):
# Iterative training function - can be any arbitrary training procedure
intermediate_score = evaluation_fn(step, width, height)
# Feed the score back to Tune.
tune.report(iterations=step, mean_loss=intermediate_score)
session.report({"iterations": step, "mean_loss": intermediate_score})
time.sleep(0.1)

View file

@ -10,6 +10,7 @@ import time
import ray
from ray import tune
from ray.air import session
from ray.tune.search import ConcurrencyLimiter
from ray.tune.search.optuna import OptunaSearch
@ -26,8 +27,12 @@ def easy_objective(config):
# Iterative training function - can be any arbitrary training procedure
intermediate_score = evaluation_fn(step, width, height)
# Feed the score back to Tune.
tune.report(
iterations=step, loss=intermediate_score, gain=intermediate_score * width
session.report(
{
"iterations": step,
"loss": intermediate_score,
"gain": intermediate_score * width,
}
)
time.sleep(0.1)

View file

@ -9,6 +9,8 @@ import torch.optim as optim
from ray.tune.examples.mnist_pytorch import train, test, ConvNet, get_data_loaders
from ray import tune
from ray.air import session
from ray.air.checkpoint import Checkpoint
from ray.tune.schedulers import PopulationBasedTraining
from ray.tune.experiment.trial import ExportFormat
@ -16,7 +18,7 @@ from ray.tune.experiment.trial import ExportFormat
# __train_begin__
def train_convnet(config, checkpoint_dir=None):
def train_convnet(config):
# Create our data loaders, model, and optmizer.
step = 0
train_loader, test_loader = get_data_loaders()
@ -27,36 +29,38 @@ def train_convnet(config, checkpoint_dir=None):
momentum=config.get("momentum", 0.9),
)
# If checkpoint_dir is not None, then we are resuming from a checkpoint.
# If `session.get_checkpoint()` is not None, then we are resuming from a checkpoint.
# Load model state and iteration step from checkpoint.
if checkpoint_dir:
if session.get_checkpoint():
print("Loading from checkpoint.")
path = os.path.join(checkpoint_dir, "checkpoint")
checkpoint = torch.load(path)
model.load_state_dict(checkpoint["model_state_dict"])
step = checkpoint["step"]
loaded_checkpoint = session.get_checkpoint()
with loaded_checkpoint.as_directory() as loaded_checkpoint_dir:
path = os.path.join(loaded_checkpoint_dir, "checkpoint.pt")
checkpoint = torch.load(path)
model.load_state_dict(checkpoint["model_state_dict"])
step = checkpoint["step"]
while True:
train(model, optimizer, train_loader)
acc = test(model, test_loader)
checkpoint = None
if step % 5 == 0:
# Every 5 steps, checkpoint our current state.
# First get the checkpoint directory from tune.
with tune.checkpoint_dir(step=step) as checkpoint_dir:
# Then create a checkpoint file in this directory.
path = os.path.join(checkpoint_dir, "checkpoint")
# Save state to checkpoint file.
# No need to save optimizer for SGD.
torch.save(
{
"step": step,
"model_state_dict": model.state_dict(),
"mean_accuracy": acc,
},
path,
)
# We need to create a directory under the current working directory
# to construct an AIR Checkpoint object from it.
os.makedirs("my_model", exist_ok=True)
torch.save(
{
"step": step,
"model_state_dict": model.state_dict(),
},
"my_model/checkpoint.pt",
)
checkpoint = Checkpoint.from_directory("my_model")
step += 1
tune.report(mean_accuracy=acc)
session.report({"mean_accuracy": acc}, checkpoint=checkpoint)
# __train_end__
@ -66,7 +70,7 @@ def test_best_model(analysis):
"""Test the best model given output of tune.run"""
best_checkpoint_path = analysis.best_checkpoint
best_model = ConvNet()
best_checkpoint = torch.load(os.path.join(best_checkpoint_path, "checkpoint"))
best_checkpoint = torch.load(os.path.join(best_checkpoint_path, "checkpoint.pt"))
best_model.load_state_dict(best_checkpoint["model_state_dict"])
# Note that test only runs on a small random set of the test data, thus the
# accuracy may be different from metrics shown in tuning process.

View file

@ -4,6 +4,8 @@ Example of training DCGAN on MNIST using PBT with Tune's function API.
"""
import ray
from ray import tune
from ray.air import session
from ray.air.checkpoint import Checkpoint
from ray.tune.schedulers import PopulationBasedTraining
import argparse
@ -22,7 +24,7 @@ from common import Discriminator, Generator, Net
# __Train_begin__
def dcgan_train(config, checkpoint_dir=None):
def dcgan_train(config):
step = 0
use_cuda = config.get("use_gpu") and torch.cuda.is_available()
device = torch.device("cuda" if use_cuda else "cpu")
@ -40,14 +42,16 @@ def dcgan_train(config, checkpoint_dir=None):
with FileLock(os.path.expanduser("~/.data.lock")):
dataloader = get_data_loader()
if checkpoint_dir is not None:
path = os.path.join(checkpoint_dir, "checkpoint")
checkpoint = torch.load(path)
netD.load_state_dict(checkpoint["netDmodel"])
netG.load_state_dict(checkpoint["netGmodel"])
optimizerD.load_state_dict(checkpoint["optimD"])
optimizerG.load_state_dict(checkpoint["optimG"])
step = checkpoint["step"]
if session.get_checkpoint():
loaded_checkpoint = session.get_checkpoint()
with loaded_checkpoint.as_directory() as loaded_checkpoint_dir:
path = os.path.join(loaded_checkpoint_dir, "checkpoint.pt")
checkpoint = torch.load(path)
netD.load_state_dict(checkpoint["netDmodel"])
netG.load_state_dict(checkpoint["netGmodel"])
optimizerD.load_state_dict(checkpoint["optimD"])
optimizerG.load_state_dict(checkpoint["optimG"])
step = checkpoint["step"]
if "netD_lr" in config:
for param_group in optimizerD.param_groups:
@ -69,19 +73,22 @@ def dcgan_train(config, checkpoint_dir=None):
config["mnist_model_ref"],
)
step += 1
with tune.checkpoint_dir(step=step) as checkpoint_dir:
path = os.path.join(checkpoint_dir, "checkpoint")
torch.save(
{
"netDmodel": netD.state_dict(),
"netGmodel": netG.state_dict(),
"optimD": optimizerD.state_dict(),
"optimG": optimizerG.state_dict(),
"step": step,
},
path,
)
tune.report(lossg=lossG, lossd=lossD, is_score=is_score)
os.makedirs("my_model", exist_ok=True)
torch.save(
{
"netDmodel": netD.state_dict(),
"netGmodel": netG.state_dict(),
"optimD": optimizerD.state_dict(),
"optimG": optimizerG.state_dict(),
"step": step,
},
"my_model/checkpoint.pt",
)
session.report(
{"lossg": lossG, "lossd": lossD, "is_score": is_score},
checkpoint=Checkpoint.from_directory("my_model"),
)
# __Train_end__

View file

@ -2,16 +2,16 @@
import numpy as np
import argparse
import json
import os
import random
import ray
from ray import tune
from ray.air import session
from ray.air.checkpoint import Checkpoint
from ray.tune.schedulers import PopulationBasedTraining
def pbt_function(config, checkpoint_dir=None):
def pbt_function(config):
"""Toy PBT problem for benchmarking adaptive learning rate.
The goal is to optimize this trainable's accuracy. The accuracy increases
@ -35,11 +35,10 @@ def pbt_function(config, checkpoint_dir=None):
lr = config["lr"]
accuracy = 0.0 # end = 1000
start = 0
if checkpoint_dir:
with open(os.path.join(checkpoint_dir, "checkpoint")) as f:
state = json.loads(f.read())
accuracy = state["acc"]
start = state["step"]
if session.get_checkpoint():
state = session.get_checkpoint().to_dict()
accuracy = state["acc"]
start = state["step"]
midpoint = 100 # lr starts decreasing after acc > midpoint
q_tolerance = 3 # penalize exceeding lr by more than this multiple
@ -64,18 +63,19 @@ def pbt_function(config, checkpoint_dir=None):
accuracy += noise_level * np.random.normal()
accuracy = max(0, accuracy)
checkpoint = None
if step % 3 == 0:
with tune.checkpoint_dir(step=step) as checkpoint_dir:
path = os.path.join(checkpoint_dir, "checkpoint")
with open(path, "w") as f:
f.write(json.dumps({"acc": accuracy, "step": start}))
checkpoint = Checkpoint.from_dict({"acc": accuracy, "step": start})
tune.report(
mean_accuracy=accuracy,
cur_lr=lr,
optimal_lr=optimal_lr, # for debugging
q_err=q_err, # for debugging
done=accuracy > midpoint * 2, # this stops the training process
session.report(
{
"mean_accuracy": accuracy,
"cur_lr": lr,
"optimal_lr": optimal_lr, # for debugging
"q_err": q_err, # for debugging
"done": accuracy > midpoint * 2, # this stops the training process
},
checkpoint=checkpoint,
)

View file

@ -8,6 +8,7 @@ import sys
import time
from ray import tune
from ray.air import session
from ray.tune.schedulers import AsyncHyperBandScheduler
from ray.tune.search.sigopt import SigOptSearch
@ -24,7 +25,7 @@ def easy_objective(config):
# Iterative training function - can be any arbitrary training procedure
intermediate_score = evaluate(step, width, height)
# Feed the score back to Tune.
tune.report(iterations=step, mean_loss=intermediate_score)
session.report({"iterations": step, "mean_loss": intermediate_score})
time.sleep(0.1)

View file

@ -8,6 +8,7 @@ import time
import numpy as np
from ray import tune
from ray.air import session
from ray.tune.search.sigopt import SigOptSearch
np.random.seed(0)
@ -26,7 +27,7 @@ def easy_objective(config):
w2 = config["total_weight"] - w1
average, std = evaluate(w1, w2)
tune.report(average=average, std=std, sharpe=average / std)
session.report({"average": average, "std": std, "sharpe": average / std})
time.sleep(0.1)

View file

@ -7,6 +7,7 @@ import sys
import numpy as np
from ray import tune
from ray.air import session
from ray.tune.search.sigopt import SigOptSearch
@ -34,7 +35,7 @@ def easy_objective(config):
w3 = 1 - total
average, std = evaluate(w1, w2, w3)
tune.report(average=average, std=std)
session.report({"average": average, "std": std})
if __name__ == "__main__":

View file

@ -7,6 +7,7 @@ Requires the SkOpt library to be installed (`pip install scikit-optimize`).
import time
from ray import tune
from ray.air import session
from ray.tune.search import ConcurrencyLimiter
from ray.tune.schedulers import AsyncHyperBandScheduler
from ray.tune.search.skopt import SkOptSearch
@ -25,7 +26,7 @@ def easy_objective(config):
# Iterative training function - can be any arbitrary training procedure
intermediate_score = evaluation_fn(step, width, height)
# Feed the score back to Tune.
tune.report(iterations=step, mean_loss=intermediate_score)
session.report({"iterations": step, "mean_loss": intermediate_score})
if __name__ == "__main__":

View file

@ -2,6 +2,7 @@
import time
import ray
from ray.air import session
from ray import tune
@ -18,7 +19,7 @@ def easy_objective(config):
# Iterative training function - can be any arbitrary training procedure
intermediate_score = evaluation_fn(step, width, height)
# Feed the score back to Tune.
tune.report(iterations=step, mean_loss=intermediate_score)
session.report({"iterations": step, "mean_loss": intermediate_score})
if __name__ == "__main__":

View file

@ -15,6 +15,7 @@ from gluoncv.data import transforms as gcv_transforms
from ray.tune.schedulers import create_scheduler
from ray import tune
from ray.air import session
# Training settings
parser = argparse.ArgumentParser(description="CIFAR-10 Example")
@ -189,7 +190,7 @@ def train_cifar10(config):
for epoch in range(1, args.epochs + 1):
train(epoch)
test_loss, test_acc = test()
tune.report(mean_loss=test_loss, mean_accuracy=test_acc)
session.report({"mean_loss": test_loss, "mean_accuracy": test_acc})
if __name__ == "__main__":

View file

@ -6,6 +6,7 @@ import numpy as np
import wandb
from ray import tune
from ray.air import session
from ray.tune import Trainable
from ray.air.callbacks.wandb import WandbLoggerCallback
from ray.tune.integration.wandb import (
@ -17,7 +18,7 @@ from ray.tune.integration.wandb import (
def train_function(config, checkpoint_dir=None):
for i in range(30):
loss = config["mean"] + config["sd"] * np.random.randn()
tune.report(loss=loss)
session.report({"loss": loss})
def tune_function(api_key_file):
@ -41,7 +42,7 @@ def tune_function(api_key_file):
def decorated_train_function(config, checkpoint_dir=None):
for i in range(30):
loss = config["mean"] + config["sd"] * np.random.randn()
tune.report(loss=loss)
session.report({"loss": loss})
wandb.log(dict(loss=loss))

View file

@ -57,7 +57,7 @@ def train_breast_cancer(config: dict, checkpoint_dir=None):
xgb_model.load_model(os.path.join(checkpoint_dir, CHECKPOINT_FILENAME))
# we can obtain current trial resources through
# tune.get_trial_resources()
# `tune.get_trial_resources()`
config["nthread"] = int(tune.get_trial_resources().head_cpus)
print(f"nthreads: {config['nthread']} xgb_model: {xgb_model}")
# Train the classifier, using the Tune callback

View file

@ -7,6 +7,7 @@ Requires the ZOOpt library to be installed (`pip install zoopt`).
import time
from ray import tune
from ray.air import session
from ray.tune.search.zoopt import ZOOptSearch
from ray.tune.schedulers import AsyncHyperBandScheduler
@ -24,7 +25,7 @@ def easy_objective(config):
# Iterative training function - can be any arbitrary training procedure
intermediate_score = evaluation_fn(step, width, height)
# Feed the score back to Tune.
tune.report(iterations=step, mean_loss=intermediate_score)
session.report({"iterations": step, "mean_loss": intermediate_score})
if __name__ == "__main__":

View file

@ -0,0 +1,31 @@
import sys
import warnings
from ray import tune
def test_checkpoint_dir_deprecation():
warnings.filterwarnings("always")
def train(config, checkpoint_dir=None):
for i in range(10):
tune.report({"foo": "bar"})
with warnings.catch_warnings(record=True) as w:
tune.run(train, num_samples=1)
found_pattern = False
for _w in w:
if issubclass(
_w.category, DeprecationWarning
) and "To save and load checkpoint in trainable function" in str(
_w.message
):
found_pattern = True
break
assert found_pattern
if __name__ == "__main__":
import pytest
sys.exit(pytest.main(["-v", __file__]))

View file

@ -6,6 +6,7 @@ import sys
import threading
import time
import uuid
import warnings
from functools import partial
from numbers import Number
from typing import Any, Callable, Dict, Optional
@ -620,6 +621,27 @@ def wrap_function(
"arguments to be `func(config, checkpoint_dir=None)`."
)
if use_checkpoint:
if log_once("tune_checkpoint_dir_deprecation") and warn:
with warnings.catch_warnings():
warnings.simplefilter("always")
warning_msg = (
"`checkpoint_dir` in `func(config, checkpoint_dir)` is "
"being deprecated. "
"To save and load checkpoint in trainable functions, "
"please use the `ray.air.session` API:\n\n"
"from ray.air import session\n\n"
"def train(config):\n"
" # ...\n"
' session.report({"metric": metric}, checkpoint=checkpoint)\n\n'
"For more information please see "
"https://docs.ray.io/en/master/ray-air/key-concepts.html#session\n"
)
warnings.warn(
warning_msg,
DeprecationWarning,
)
class ImplicitFunc(*inherit_from):
_name = name or (
train_func.__name__ if hasattr(train_func, "__name__") else "func"

View file

@ -2,6 +2,7 @@ import inspect
import logging
import os
import traceback
import warnings
from contextlib import contextmanager
from typing import Dict, Optional, Set
@ -9,6 +10,7 @@ import ray
from ray.air._internal.session import Session
from ray.air.checkpoint import Checkpoint
from ray.tune.error import TuneError
from ray.tune.trainable.function_trainable import _StatusReporter
from ray.util.annotations import DeveloperAPI, PublicAPI
from ray.util.debug import log_once
from ray.util.placement_group import _valid_resource_shape
@ -16,8 +18,6 @@ from ray.util.scheduling_strategies import (
PlacementGroupSchedulingStrategy,
SchedulingStrategyT,
)
from ray.tune.trainable.function_trainable import _StatusReporter
logger = logging.getLogger(__name__)
@ -25,6 +25,13 @@ _session: Optional[_StatusReporter] = None
# V2 Session API.
_session_v2: Optional["_TuneSessionImpl"] = None
_deprecation_msg = (
"`tune.report` and `tune.checkpoint_dir` APIs are deprecated in Ray "
"2.0, and is replaced by `ray.air.session`. This will provide an easy-"
"to-use API across Tune session and Data parallel worker sessions."
"The old APIs will be removed in the future. "
)
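# Illustrative migration sketch (not part of this module): a function trainable
# that previously reported results with the deprecated API, e.g.
#
#     tune.report(mean_loss=loss)
#
# should now report a metrics dict through the session object instead:
#
#     from ray.air import session
#     session.report({"mean_loss": loss})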
class _TuneSessionImpl(Session):
"""Session client that function trainable can interact with."""
@ -215,6 +222,10 @@ def report(_metric=None, **kwargs):
**kwargs: Any key value pair to be logged by Tune. Any of these
metrics can be used for early stopping or optimization.
"""
warnings.warn(
_deprecation_msg,
DeprecationWarning,
)
_session = get_session()
if _session:
if _session._iter:
@ -275,6 +286,11 @@ def checkpoint_dir(step: int):
.. versionadded:: 0.8.7
"""
warnings.warn(
_deprecation_msg,
DeprecationWarning,
)
_session = get_session()
if step is None:
@ -302,6 +318,10 @@ def get_trial_dir():
For function API use only.
"""
warnings.warn(
_deprecation_msg,
DeprecationWarning,
)
_session = get_session()
if _session:
return _session.logdir
@ -313,6 +333,10 @@ def get_trial_name():
For function API use only.
"""
warnings.warn(
_deprecation_msg,
DeprecationWarning,
)
_session = get_session()
if _session:
return _session.trial_name
@ -324,6 +348,10 @@ def get_trial_id():
For function API use only.
"""
warnings.warn(
_deprecation_msg,
DeprecationWarning,
)
_session = get_session()
if _session:
return _session.trial_id
@ -338,6 +366,10 @@ def get_trial_resources():
For function API use only.
"""
warnings.warn(
_deprecation_msg,
DeprecationWarning,
)
_session = get_session()
if _session:
return _session.trial_resources

View file

@ -320,11 +320,12 @@ def with_parameters(trainable, **kwargs):
.. code-block:: python
from ray import tune
from ray.air import session
def train(config, data=None):
for sample in data:
loss = update_model(sample)
tune.report(loss=loss)
session.report({"loss": loss})
data = HugeDataset(download=True)