From fdf85ea403c9479be4c2e4e42381d22db1275c7e Mon Sep 17 00:00:00 2001 From: Kai Fricke Date: Wed, 15 Jun 2022 03:11:32 +0200 Subject: [PATCH] [air] Add tutorial to convert existing pytorch code to Ray AIR (#25723) --- doc/source/_toc.yml | 1 + ...ert_existing_pytorch_code_to_ray_air.ipynb | 1766 +++++++++++++++++ doc/source/ray-air/examples/index.rst | 1 + 3 files changed, 1768 insertions(+) create mode 100644 doc/source/ray-air/examples/convert_existing_pytorch_code_to_ray_air.ipynb diff --git a/doc/source/_toc.yml b/doc/source/_toc.yml index 2f4e43619..988196584 100644 --- a/doc/source/_toc.yml +++ b/doc/source/_toc.yml @@ -25,6 +25,7 @@ parts: - file: ray-air/examples/index sections: - file: ray-air/examples/torch_image_example + - file: ray-air/examples/convert_existing_pytorch_code_to_ray_air - file: ray-air/examples/tfx_tabular_train_to_serve - file: ray-air/examples/huggingface_text_classification - file: ray-air/examples/sklearn_example diff --git a/doc/source/ray-air/examples/convert_existing_pytorch_code_to_ray_air.ipynb b/doc/source/ray-air/examples/convert_existing_pytorch_code_to_ray_air.ipynb new file mode 100644 index 000000000..6e0379973 --- /dev/null +++ b/doc/source/ray-air/examples/convert_existing_pytorch_code_to_ray_air.ipynb @@ -0,0 +1,1766 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "16a75f79", + "metadata": {}, + "source": [ + "# Convert existing PyTorch code to Ray AIR\n", + "\n", + "If you already have working PyTorch code, you don't have to start from scratch to utilize the benefits of Ray AIR. 
Instead, you can continue to use your existing code and incrementally add Ray AIR components as needed.\n", + "\n", + "Some of the benefits you'll get by using Ray AIR with your existing PyTorch training code:\n", + "\n", + "- Easy distributed data-parallel training on a cluster\n", + "- Automatic checkpointing/fault tolerance and result tracking\n", + "- Parallel data preprocessing\n", + "- Seamless integration with hyperparameter tuning\n", + "- Scalable batch prediction\n", + "- Scalable model serving\n", + "\n", + "This tutorial will show you how to start with Ray AIR from your existing PyTorch training code. We will learn how to **distribute your training** and do **scalable batch prediction**.\n" + ] + }, + { + "cell_type": "markdown", + "id": "9a4855cf", + "metadata": {}, + "source": [ + "## The example code\n", + "\n", + "The example code we'll be using is that of the [PyTorch quickstart tutorial](https://pytorch.org/tutorials/beginner/basics/quickstart_tutorial.html). This code trains a neural network classifier on the FashionMNIST dataset.\n", + "\n", + "You can find the code we used for this tutorial [here on GitHub](https://github.com/pytorch/tutorials/blob/8dddccc4c69116ca724aa82bd5f4596ef7ad119c/beginner_source/basics/quickstart_tutorial.py)." + ] + }, + { + "cell_type": "markdown", + "id": "a42faedb", + "metadata": {}, + "source": [ + "## Unmodified\n", + "Let's start with the unmodified code from the example. 
A thorough explanation of the parts is given in the full tutorial - we'll just focus on the code here.\n", + "\n", + "We start with some imports:" + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "id": "01af2222", + "metadata": {}, + "outputs": [], + "source": [ + "import torch\n", + "from torch import nn\n", + "from torch.utils.data import DataLoader\n", + "from torchvision import datasets\n", + "from torchvision.transforms import ToTensor" + ] + }, + { + "cell_type": "markdown", + "id": "db36ae56", + "metadata": {}, + "source": [ + "Then we download the data:" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "id": "28126be5", + "metadata": {}, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "/Users/kai/.pyenv/versions/3.7.7/lib/python3.7/site-packages/torchvision/datasets/mnist.py:498: UserWarning: The given NumPy array is not writeable, and PyTorch does not support non-writeable tensors. This means you can write to the underlying (supposedly non-writeable) NumPy array using the tensor. You may want to copy the array to protect its data or make it writeable before converting it to a tensor. This type of warning will be suppressed for the rest of this program. 
(Triggered internally at ../torch/csrc/utils/tensor_numpy.cpp:180.)\n", + " return torch.from_numpy(parsed.astype(m[2], copy=False)).view(*s)\n" + ] + } + ], + "source": [ + "# Download training data from open datasets.\n", + "training_data = datasets.FashionMNIST(\n", + " root=\"data\",\n", + " train=True,\n", + " download=True,\n", + " transform=ToTensor(),\n", + ")\n", + "\n", + "# Download test data from open datasets.\n", + "test_data = datasets.FashionMNIST(\n", + " root=\"data\",\n", + " train=False,\n", + " download=True,\n", + " transform=ToTensor(),\n", + ")\n" + ] + }, + { + "cell_type": "markdown", + "id": "9795c146", + "metadata": {}, + "source": [ + "We can now define the dataloaders:" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "id": "b99cac23", + "metadata": {}, + "outputs": [], + "source": [ + "batch_size = 64\n", + "\n", + "# Create data loaders.\n", + "train_dataloader = DataLoader(training_data, batch_size=batch_size)\n", + "test_dataloader = DataLoader(test_data, batch_size=batch_size)" + ] + }, + { + "cell_type": "markdown", + "id": "ae11399e", + "metadata": {}, + "source": [ + "We can then define and instantiate the neural network:" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "id": "3b027562", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Using cpu device\n", + "NeuralNetwork(\n", + " (flatten): Flatten(start_dim=1, end_dim=-1)\n", + " (linear_relu_stack): Sequential(\n", + " (0): Linear(in_features=784, out_features=512, bias=True)\n", + " (1): ReLU()\n", + " (2): Linear(in_features=512, out_features=512, bias=True)\n", + " (3): ReLU()\n", + " (4): Linear(in_features=512, out_features=10, bias=True)\n", + " )\n", + ")\n" + ] + } + ], + "source": [ + "# Get cpu or gpu device for training.\n", + "device = \"cuda\" if torch.cuda.is_available() else \"cpu\"\n", + "print(f\"Using {device} device\")\n", + "\n", + "# Define model\n", + "class 
NeuralNetwork(nn.Module):\n", + " def __init__(self):\n", + " super(NeuralNetwork, self).__init__()\n", + " self.flatten = nn.Flatten()\n", + " self.linear_relu_stack = nn.Sequential(\n", + " nn.Linear(28*28, 512),\n", + " nn.ReLU(),\n", + " nn.Linear(512, 512),\n", + " nn.ReLU(),\n", + " nn.Linear(512, 10)\n", + " )\n", + "\n", + " def forward(self, x):\n", + " x = self.flatten(x)\n", + " logits = self.linear_relu_stack(x)\n", + " return logits\n", + "\n", + "model = NeuralNetwork().to(device)\n", + "print(model)" + ] + }, + { + "cell_type": "markdown", + "id": "b692d06a", + "metadata": {}, + "source": [ + "Define our optimizer and loss:" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "id": "efe92797", + "metadata": {}, + "outputs": [], + "source": [ + "loss_fn = nn.CrossEntropyLoss()\n", + "optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)" + ] + }, + { + "cell_type": "markdown", + "id": "681d5798", + "metadata": {}, + "source": [ + "And finally our training loop. 
Note that we renamed the function from `train` to `train_epoch` to avoid conflicts with the Ray Train module later (which is also called `train`):" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "id": "2ce258ed", + "metadata": {}, + "outputs": [], + "source": [ + "def train_epoch(dataloader, model, loss_fn, optimizer):\n", + " size = len(dataloader.dataset)\n", + " model.train()\n", + " for batch, (X, y) in enumerate(dataloader):\n", + " X, y = X.to(device), y.to(device)\n", + "\n", + " # Compute prediction error\n", + " pred = model(X)\n", + " loss = loss_fn(pred, y)\n", + "\n", + " # Backpropagation\n", + " optimizer.zero_grad()\n", + " loss.backward()\n", + " optimizer.step()\n", + "\n", + " if batch % 100 == 0:\n", + " loss, current = loss.item(), batch * len(X)\n", + " print(f\"loss: {loss:>7f} [{current:>5d}/{size:>5d}]\")" + ] + }, + { + "cell_type": "markdown", + "id": "6621cffa", + "metadata": {}, + "source": [ + "And while we're at it, here is our validation loop (note that we sneaked in a `return test_loss` statement and also renamed the function):" + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "id": "bbefec77", + "metadata": {}, + "outputs": [], + "source": [ + "def test_epoch(dataloader, model, loss_fn):\n", + " size = len(dataloader.dataset)\n", + " num_batches = len(dataloader)\n", + " model.eval()\n", + " test_loss, correct = 0, 0\n", + " with torch.no_grad():\n", + " for X, y in dataloader:\n", + " X, y = X.to(device), y.to(device)\n", + " pred = model(X)\n", + " test_loss += loss_fn(pred, y).item()\n", + " correct += (pred.argmax(1) == y).type(torch.float).sum().item()\n", + " test_loss /= num_batches\n", + " correct /= size\n", + " print(f\"Test Error: \\n Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:>8f} \\n\")\n", + " return test_loss" + ] + }, + { + "cell_type": "markdown", + "id": "d915d788", + "metadata": {}, + "source": [ + "Now we can trigger training and save a model:" + ] + }, + { + "cell_type": 
"code", + "execution_count": 8, + "id": "27f80fc7", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Epoch 1\n", + "-------------------------------\n", + "loss: 2.304186 [ 0/60000]\n", + "loss: 2.289454 [ 6400/60000]\n", + "loss: 2.275760 [12800/60000]\n", + "loss: 2.270670 [19200/60000]\n", + "loss: 2.240363 [25600/60000]\n", + "loss: 2.226019 [32000/60000]\n", + "loss: 2.230945 [38400/60000]\n", + "loss: 2.197411 [44800/60000]\n", + "loss: 2.197602 [51200/60000]\n", + "loss: 2.160038 [57600/60000]\n", + "Test Error: \n", + " Accuracy: 43.4%, Avg loss: 2.156346 \n", + "\n", + "Epoch 2\n", + "-------------------------------\n", + "loss: 2.171642 [ 0/60000]\n", + "loss: 2.156236 [ 6400/60000]\n", + "loss: 2.107473 [12800/60000]\n", + "loss: 2.118233 [19200/60000]\n", + "loss: 2.052204 [25600/60000]\n", + "loss: 2.006639 [32000/60000]\n", + "loss: 2.030226 [38400/60000]\n", + "loss: 1.953917 [44800/60000]\n", + "loss: 1.964219 [51200/60000]\n", + "loss: 1.873772 [57600/60000]\n", + "Test Error: \n", + " Accuracy: 52.9%, Avg loss: 1.883439 \n", + "\n", + "Epoch 3\n", + "-------------------------------\n", + "loss: 1.929272 [ 0/60000]\n", + "loss: 1.888294 [ 6400/60000]\n", + "loss: 1.783824 [12800/60000]\n", + "loss: 1.807983 [19200/60000]\n", + "loss: 1.694384 [25600/60000]\n", + "loss: 1.663575 [32000/60000]\n", + "loss: 1.671774 [38400/60000]\n", + "loss: 1.587723 [44800/60000]\n", + "loss: 1.612172 [51200/60000]\n", + "loss: 1.490934 [57600/60000]\n", + "Test Error: \n", + " Accuracy: 59.6%, Avg loss: 1.520538 \n", + "\n", + "Epoch 4\n", + "-------------------------------\n", + "loss: 1.602018 [ 0/60000]\n", + "loss: 1.556215 [ 6400/60000]\n", + "loss: 1.419982 [12800/60000]\n", + "loss: 1.469938 [19200/60000]\n", + "loss: 1.351850 [25600/60000]\n", + "loss: 1.363095 [32000/60000]\n", + "loss: 1.361990 [38400/60000]\n", + "loss: 1.299478 [44800/60000]\n", + "loss: 1.331679 [51200/60000]\n", + "loss: 1.225714 
[57600/60000]\n", + "Test Error: \n", + " Accuracy: 63.5%, Avg loss: 1.255372 \n", + "\n", + "Epoch 5\n", + "-------------------------------\n", + "loss: 1.341576 [ 0/60000]\n", + "loss: 1.316398 [ 6400/60000]\n", + "loss: 1.161010 [12800/60000]\n", + "loss: 1.250706 [19200/60000]\n", + "loss: 1.122512 [25600/60000]\n", + "loss: 1.159244 [32000/60000]\n", + "loss: 1.169590 [38400/60000]\n", + "loss: 1.114988 [44800/60000]\n", + "loss: 1.154847 [51200/60000]\n", + "loss: 1.066008 [57600/60000]\n", + "Test Error: \n", + " Accuracy: 64.9%, Avg loss: 1.089063 \n", + "\n", + "Done!\n" + ] + } + ], + "source": [ + "epochs = 5\n", + "for t in range(epochs):\n", + " print(f\"Epoch {t+1}\\n-------------------------------\")\n", + " train_epoch(train_dataloader, model, loss_fn, optimizer)\n", + " test_epoch(test_dataloader, model, loss_fn)\n", + "print(\"Done!\")" + ] + }, + { + "cell_type": "code", + "execution_count": 9, + "id": "e62fc82b", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Saved PyTorch Model State to model.pth\n" + ] + } + ], + "source": [ + "torch.save(model.state_dict(), \"model.pth\")\n", + "print(\"Saved PyTorch Model State to model.pth\")" + ] + }, + { + "cell_type": "markdown", + "id": "6655d903", + "metadata": {}, + "source": [ + "We'll cover the rest of the tutorial (loading the model and doing batch prediction) later!" + ] + }, + { + "cell_type": "markdown", + "id": "d0b98b1c", + "metadata": {}, + "source": [ + "## Introducing a wrapper function (no Ray AIR, yet!)\n", + "The notebook style of the original tutorial works well for teaching, but in your production code you have probably wrapped the actual training logic in a function. So let's do this here, too.\n", + "\n", + "Note that we do not add or alter any code here (apart from variable definitions) - we just take the loose bits of code in the current tutorial and put them into one function." 
+ ] + }, + { + "cell_type": "code", + "execution_count": 10, + "id": "aacdf4a6", + "metadata": {}, + "outputs": [], + "source": [ + "def train_func():\n", + " batch_size = 64\n", + " lr = 1e-3\n", + " epochs = 5\n", + " \n", + " # Create data loaders.\n", + " train_dataloader = DataLoader(training_data, batch_size=batch_size)\n", + " test_dataloader = DataLoader(test_data, batch_size=batch_size)\n", + " \n", + " # Get cpu or gpu device for training.\n", + " device = \"cuda\" if torch.cuda.is_available() else \"cpu\"\n", + " print(f\"Using {device} device\")\n", + " \n", + " model = NeuralNetwork().to(device)\n", + " print(model)\n", + " \n", + " loss_fn = nn.CrossEntropyLoss()\n", + " optimizer = torch.optim.SGD(model.parameters(), lr=lr)\n", + " \n", + " for t in range(epochs):\n", + " print(f\"Epoch {t+1}\\n-------------------------------\")\n", + " train_epoch(train_dataloader, model, loss_fn, optimizer)\n", + " test_epoch(test_dataloader, model, loss_fn)\n", + "\n", + " print(\"Done!\")" + ] + }, + { + "cell_type": "markdown", + "id": "60f7341a", + "metadata": {}, + "source": [ + "Let's see it in action again:" + ] + }, + { + "cell_type": "code", + "execution_count": 11, + "id": "7130c361", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Using cpu device\n", + "NeuralNetwork(\n", + " (flatten): Flatten(start_dim=1, end_dim=-1)\n", + " (linear_relu_stack): Sequential(\n", + " (0): Linear(in_features=784, out_features=512, bias=True)\n", + " (1): ReLU()\n", + " (2): Linear(in_features=512, out_features=512, bias=True)\n", + " (3): ReLU()\n", + " (4): Linear(in_features=512, out_features=10, bias=True)\n", + " )\n", + ")\n", + "Epoch 1\n", + "-------------------------------\n", + "loss: 2.300660 [ 0/60000]\n", + "loss: 2.296696 [ 6400/60000]\n", + "loss: 2.275931 [12800/60000]\n", + "loss: 2.269738 [19200/60000]\n", + "loss: 2.264045 [25600/60000]\n", + "loss: 2.234024 [32000/60000]\n", + "loss: 2.238547 
[38400/60000]\n", + "loss: 2.209435 [44800/60000]\n", + "loss: 2.214618 [51200/60000]\n", + "loss: 2.185344 [57600/60000]\n", + "Test Error: \n", + " Accuracy: 41.6%, Avg loss: 2.178503 \n", + "\n", + "Epoch 2\n", + "-------------------------------\n", + "loss: 2.183002 [ 0/60000]\n", + "loss: 2.178075 [ 6400/60000]\n", + "loss: 2.126900 [12800/60000]\n", + "loss: 2.142223 [19200/60000]\n", + "loss: 2.095714 [25600/60000]\n", + "loss: 2.040030 [32000/60000]\n", + "loss: 2.066279 [38400/60000]\n", + "loss: 1.993092 [44800/60000]\n", + "loss: 2.015497 [51200/60000]\n", + "loss: 1.938319 [57600/60000]\n", + "Test Error: \n", + " Accuracy: 53.5%, Avg loss: 1.936191 \n", + "\n", + "Epoch 3\n", + "-------------------------------\n", + "loss: 1.962513 [ 0/60000]\n", + "loss: 1.940845 [ 6400/60000]\n", + "loss: 1.830932 [12800/60000]\n", + "loss: 1.868069 [19200/60000]\n", + "loss: 1.754753 [25600/60000]\n", + "loss: 1.703867 [32000/60000]\n", + "loss: 1.726961 [38400/60000]\n", + "loss: 1.626829 [44800/60000]\n", + "loss: 1.669507 [51200/60000]\n", + "loss: 1.556128 [57600/60000]\n", + "Test Error: \n", + " Accuracy: 58.2%, Avg loss: 1.572178 \n", + "\n", + "Epoch 4\n", + "-------------------------------\n", + "loss: 1.636078 [ 0/60000]\n", + "loss: 1.604536 [ 6400/60000]\n", + "loss: 1.457413 [12800/60000]\n", + "loss: 1.516031 [19200/60000]\n", + "loss: 1.394795 [25600/60000]\n", + "loss: 1.390730 [32000/60000]\n", + "loss: 1.400060 [38400/60000]\n", + "loss: 1.321162 [44800/60000]\n", + "loss: 1.366145 [51200/60000]\n", + "loss: 1.265664 [57600/60000]\n", + "Test Error: \n", + " Accuracy: 62.5%, Avg loss: 1.288956 \n", + "\n", + "Epoch 5\n", + "-------------------------------\n", + "loss: 1.367953 [ 0/60000]\n", + "loss: 1.350934 [ 6400/60000]\n", + "loss: 1.184691 [12800/60000]\n", + "loss: 1.275289 [19200/60000]\n", + "loss: 1.155229 [25600/60000]\n", + "loss: 1.179248 [32000/60000]\n", + "loss: 1.196911 [38400/60000]\n", + "loss: 1.129392 [44800/60000]\n", + "loss: 
1.173347 [51200/60000]\n", + "loss: 1.095378 [57600/60000]\n", + "Test Error: \n", + " Accuracy: 64.4%, Avg loss: 1.112272 \n", + "\n", + "Done!\n" + ] + } + ], + "source": [ + "train_func()" + ] + }, + { + "cell_type": "markdown", + "id": "b3df2581", + "metadata": {}, + "source": [ + "The output should look very similar to the previous output." + ] + }, + { + "cell_type": "markdown", + "id": "abe8e708", + "metadata": {}, + "source": [ + "## Starting with Ray AIR: Distribute the training\n", + "\n", + "As a first step, we want to distribute the training across multiple workers. For this we want to\n", + "\n", + "1. Use data-parallel training by sharding the training data\n", + "2. Set up the model to communicate gradient updates across machines\n", + "3. Report the results back to Ray Train.\n", + "\n", + "\n", + "To facilitate this, we only need a few changes to the code:\n", + "\n", + "1. We import Ray Train:\n", + "\n", + "```python\n", + "import ray.train as train\n", + "```\n", + "\n", + "\n", + "2. We use a `config` dict to configure some hyperparameters (this is not strictly needed but good practice, especially if you want to do hyperparameter tuning later):\n", + "\n", + "```python\n", + "def train_func(config: dict):\n", + " batch_size = config[\"batch_size\"]\n", + " lr = config[\"lr\"]\n", + " epochs = config[\"epochs\"]\n", + "```\n", + "\n", + "3. We dynamically adjust the worker batch size according to the number of workers:\n", + "\n", + "```python\n", + " batch_size_per_worker = batch_size // train.world_size()\n", + "```\n", + "\n", + "4. We prepare the data loader for distributed data sharding:\n", + "\n", + "```python\n", + " train_dataloader = train.torch.prepare_data_loader(train_dataloader)\n", + " test_dataloader = train.torch.prepare_data_loader(test_dataloader)\n", + "```\n", + "\n", + "5. 
We prepare the model for distributed gradient updates:\n", + "\n", + "```python\n", + " model = train.torch.prepare_model(model)\n", + "```\n", + "\n", + "Note that `train.torch.prepare_model()` also automatically takes care of setting up devices (e.g. GPU training) - so we can get rid of those lines in our current code!\n", + "\n", + "\n", + "6. We capture the validation loss and report it to Ray Train:\n", + "\n", + "```python\n", + " test_loss = test_epoch(test_dataloader, model, loss_fn)\n", + " train.report(loss=test_loss)\n", + "```\n", + "\n", + "7. In the `train_epoch()` and `test_epoch()` functions we divide the `size` by the world size:\n", + "\n", + "```python\n", + " size = len(dataloader.dataset) // train.world_size() # Divide by world size\n", + "```\n", + "\n", + "8. In the `train_epoch()` function we can get rid of the device mapping. Ray Train does this for us:\n", + "\n", + "```python\n", + " # We don't need this anymore! Ray Train does this automatically:\n", + " # X, y = X.to(device), y.to(device) \n", + "```\n", + "\n", + "That's it - you need fewer than 10 lines of Ray Train-specific code and can otherwise continue to use your original code.\n", + "\n", + "Let's take a look at the resulting code. First the `train_epoch()` function (2 lines changed, and we also commented out the print statement):" + ] + }, + { + "cell_type": "code", + "execution_count": 12, + "id": "50b2f602", + "metadata": {}, + "outputs": [], + "source": [ + "def train_epoch(dataloader, model, loss_fn, optimizer):\n", + " size = len(dataloader.dataset) // train.world_size() # Divide by world size\n", + " model.train()\n", + " for batch, (X, y) in enumerate(dataloader):\n", + " # We don't need this anymore! 
Ray Train does this automatically:\n", + " # X, y = X.to(device), y.to(device) \n", + "\n", + " # Compute prediction error\n", + " pred = model(X)\n", + " loss = loss_fn(pred, y)\n", + "\n", + " # Backpropagation\n", + " optimizer.zero_grad()\n", + " loss.backward()\n", + " optimizer.step()\n", + "\n", + " if batch % 100 == 0:\n", + " loss, current = loss.item(), batch * len(X)\n", + " # print(f\"loss: {loss:>7f} [{current:>5d}/{size:>5d}]\")" + ] + }, + { + "cell_type": "markdown", + "id": "6e260f44", + "metadata": {}, + "source": [ + "Then the `test_epoch()` function (1 line changed, and we also commented out the print statement):" + ] + }, + { + "cell_type": "code", + "execution_count": 13, + "id": "72aa3e48", + "metadata": {}, + "outputs": [], + "source": [ + "def test_epoch(dataloader, model, loss_fn):\n", + " size = len(dataloader.dataset) // train.world_size() # Divide by world size\n", + " num_batches = len(dataloader)\n", + " model.eval()\n", + " test_loss, correct = 0, 0\n", + " with torch.no_grad():\n", + " for X, y in dataloader:\n", + " X, y = X.to(device), y.to(device)\n", + " pred = model(X)\n", + " test_loss += loss_fn(pred, y).item()\n", + " correct += (pred.argmax(1) == y).type(torch.float).sum().item()\n", + " test_loss /= num_batches\n", + " correct /= size\n", + " # print(f\"Test Error: \\n Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:>8f} \\n\")\n", + " return test_loss" + ] + }, + { + "cell_type": "markdown", + "id": "cf280e6a", + "metadata": {}, + "source": [ + "And lastly, the wrapping `train_func()` where we added 4 lines and modified 2 (apart from the config dict):" + ] + }, + { + "cell_type": "code", + "execution_count": 14, + "id": "3f79c731", + "metadata": {}, + "outputs": [], + "source": [ + "import ray.train as train\n", + "\n", + "\n", + "def train_func(config: dict):\n", + " batch_size = config[\"batch_size\"]\n", + " lr = config[\"lr\"]\n", + " epochs = config[\"epochs\"]\n", + " \n", + " batch_size_per_worker = 
batch_size // train.world_size()\n", + " \n", + " # Create data loaders.\n", + " train_dataloader = DataLoader(training_data, batch_size=batch_size_per_worker)\n", + " test_dataloader = DataLoader(test_data, batch_size=batch_size_per_worker)\n", + " \n", + " train_dataloader = train.torch.prepare_data_loader(train_dataloader)\n", + " test_dataloader = train.torch.prepare_data_loader(test_dataloader)\n", + " \n", + " model = NeuralNetwork()\n", + " model = train.torch.prepare_model(model)\n", + " \n", + " loss_fn = nn.CrossEntropyLoss()\n", + " optimizer = torch.optim.SGD(model.parameters(), lr=lr)\n", + " \n", + " for t in range(epochs):\n", + " train_epoch(train_dataloader, model, loss_fn, optimizer)\n", + " test_loss = test_epoch(test_dataloader, model, loss_fn)\n", + " train.report(loss=test_loss)\n", + "\n", + " print(\"Done!\")" + ] + }, + { + "cell_type": "markdown", + "id": "0fc52cc7", + "metadata": {}, + "source": [ + "Now we'll use Ray Train's TorchTrainer to kick off the training. Note that we can set the hyperparameters here! In the `scaling_config` we can also configure how many parallel workers to use and whether to enable GPU training." + ] + }, + { + "cell_type": "code", + "execution_count": 15, + "id": "939e767f", + "metadata": {}, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "2022-06-14 13:02:53,006\tINFO services.py:1483 -- View the Ray dashboard at \u001b[1m\u001b[32mhttp://127.0.0.1:8265\u001b[39m\u001b[22m\n" + ] + }, + { + "data": { + "text/html": [ + "== Status ==
Current time: 2022-06-14 13:03:47 (running for 00:00:49.63)
Memory usage on this node: 10.3/16.0 GiB
Using FIFO scheduling algorithm.
Resources requested: 0/16 CPUs, 0/0 GPUs, 0.0/3.63 GiB heap, 0.0/1.81 GiB objects
Result logdir: /Users/kai/ray_results/TorchTrainer_2022-06-14_13-02-55
Number of trials: 1/1 (1 TERMINATED)
\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "
Trial name                status      loc             iter  total time (s)  loss     _timestamp  _time_this_iter_s
TorchTrainer_8bcc7_00000  TERMINATED  127.0.0.1:7443  4     42.5615         1.24926  1655204626  9.67353


" + ], + "text/plain": [ + "" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "2022-06-14 13:03:00,221\tWARNING worker.py:1737 -- Warning: The actor TrainTrainable is very large (52 MiB). Check that its definition is not implicitly capturing a large array or other object in scope. Tip: use ray.put() to put large objects in the Ray object store.\n", + "\u001b[2m\u001b[36m(BaseWorkerMixin pid=7448)\u001b[0m 2022-06-14 13:03:06,880\tINFO config.py:71 -- Setting up process group for: env:// [rank=0, world_size=2]\n", + "\u001b[2m\u001b[36m(BaseWorkerMixin pid=7449)\u001b[0m 2022-06-14 13:03:06,879\tINFO config.py:71 -- Setting up process group for: env:// [rank=1, world_size=2]\n", + "\u001b[2m\u001b[36m(BaseWorkerMixin pid=7448)\u001b[0m 2022-06-14 13:03:08,303\tINFO train_loop_utils.py:293 -- Moving model to device: cpu\n", + "\u001b[2m\u001b[36m(BaseWorkerMixin pid=7448)\u001b[0m 2022-06-14 13:03:08,303\tINFO train_loop_utils.py:331 -- Wrapping provided model in DDP.\n", + "\u001b[2m\u001b[36m(BaseWorkerMixin pid=7449)\u001b[0m 2022-06-14 13:03:08,303\tINFO train_loop_utils.py:293 -- Moving model to device: cpu\n", + "\u001b[2m\u001b[36m(BaseWorkerMixin pid=7449)\u001b[0m 2022-06-14 13:03:08,303\tINFO train_loop_utils.py:331 -- Wrapping provided model in DDP.\n" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Result for TorchTrainer_8bcc7_00000:\n", + " _time_this_iter_s: 9.377830982208252\n", + " _timestamp: 1655204597\n", + " _training_iteration: 1\n", + " date: 2022-06-14_13-03-17\n", + " done: false\n", + " experiment_id: d262dbae86774c4fb809871401db393d\n", + " hostname: Kais-MacBook-Pro.fritz.box\n", + " iterations_since_restore: 1\n", + " loss: 2.1573975238071124\n", + " node_ip: 127.0.0.1\n", + " pid: 7443\n", + " time_since_restore: 13.348651885986328\n", + " time_this_iter_s: 13.348651885986328\n", + " time_total_s: 13.348651885986328\n", + " 
timestamp: 1655204597\n", + " timesteps_since_restore: 0\n", + " training_iteration: 1\n", + " trial_id: 8bcc7_00000\n", + " warmup_time: 0.0038008689880371094\n", + " \n", + "Result for TorchTrainer_8bcc7_00000:\n", + " _time_this_iter_s: 9.486207962036133\n", + " _timestamp: 1655204607\n", + " _training_iteration: 2\n", + " date: 2022-06-14_13-03-27\n", + " done: false\n", + " experiment_id: d262dbae86774c4fb809871401db393d\n", + " hostname: Kais-MacBook-Pro.fritz.box\n", + " iterations_since_restore: 2\n", + " loss: 1.88913657179304\n", + " node_ip: 127.0.0.1\n", + " pid: 7443\n", + " time_since_restore: 22.830953121185303\n", + " time_this_iter_s: 9.482301235198975\n", + " time_total_s: 22.830953121185303\n", + " timestamp: 1655204607\n", + " timesteps_since_restore: 0\n", + " training_iteration: 2\n", + " trial_id: 8bcc7_00000\n", + " warmup_time: 0.0038008689880371094\n", + " \n", + "Result for TorchTrainer_8bcc7_00000:\n", + " _time_this_iter_s: 10.05704402923584\n", + " _timestamp: 1655204617\n", + " _training_iteration: 3\n", + " date: 2022-06-14_13-03-37\n", + " done: false\n", + " experiment_id: d262dbae86774c4fb809871401db393d\n", + " hostname: Kais-MacBook-Pro.fritz.box\n", + " iterations_since_restore: 3\n", + " loss: 1.5208747804544533\n", + " node_ip: 127.0.0.1\n", + " pid: 7443\n", + " time_since_restore: 32.88802194595337\n", + " time_this_iter_s: 10.057068824768066\n", + " time_total_s: 32.88802194595337\n", + " timestamp: 1655204617\n", + " timesteps_since_restore: 0\n", + " training_iteration: 3\n", + " trial_id: 8bcc7_00000\n", + " warmup_time: 0.0038008689880371094\n", + " \n", + "Result for TorchTrainer_8bcc7_00000:\n", + " _time_this_iter_s: 9.673533201217651\n", + " _timestamp: 1655204626\n", + " _training_iteration: 4\n", + " date: 2022-06-14_13-03-46\n", + " done: false\n", + " experiment_id: d262dbae86774c4fb809871401db393d\n", + " hostname: Kais-MacBook-Pro.fritz.box\n", + " iterations_since_restore: 4\n", + " loss: 
1.2492616913121217\n", + " node_ip: 127.0.0.1\n", + " pid: 7443\n", + " time_since_restore: 42.56152319908142\n", + " time_this_iter_s: 9.673501253128052\n", + " time_total_s: 42.56152319908142\n", + " timestamp: 1655204626\n", + " timesteps_since_restore: 0\n", + " training_iteration: 4\n", + " trial_id: 8bcc7_00000\n", + " warmup_time: 0.0038008689880371094\n", + " \n", + "\u001b[2m\u001b[36m(BaseWorkerMixin pid=7449)\u001b[0m Done!\n", + "\u001b[2m\u001b[36m(BaseWorkerMixin pid=7448)\u001b[0m Done!\n", + "Result for TorchTrainer_8bcc7_00000:\n", + " _time_this_iter_s: 9.673533201217651\n", + " _timestamp: 1655204626\n", + " _training_iteration: 4\n", + " date: 2022-06-14_13-03-46\n", + " done: true\n", + " experiment_id: d262dbae86774c4fb809871401db393d\n", + " experiment_tag: '0'\n", + " hostname: Kais-MacBook-Pro.fritz.box\n", + " iterations_since_restore: 4\n", + " loss: 1.2492616913121217\n", + " node_ip: 127.0.0.1\n", + " pid: 7443\n", + " time_since_restore: 42.56152319908142\n", + " time_this_iter_s: 9.673501253128052\n", + " time_total_s: 42.56152319908142\n", + " timestamp: 1655204626\n", + " timesteps_since_restore: 0\n", + " training_iteration: 4\n", + " trial_id: 8bcc7_00000\n", + " warmup_time: 0.0038008689880371094\n", + " \n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "2022-06-14 13:03:47,154\tINFO tune.py:742 -- Total run time: 51.21 seconds (49.63 seconds for the tuning loop).\n" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Last result: {'loss': 1.2492616913121217, '_timestamp': 1655204626, '_time_this_iter_s': 9.673533201217651, '_training_iteration': 4, 'time_this_iter_s': 9.673501253128052, 'done': True, 'timesteps_total': None, 'episodes_total': None, 'training_iteration': 4, 'trial_id': '8bcc7_00000', 'experiment_id': 'd262dbae86774c4fb809871401db393d', 'date': '2022-06-14_13-03-46', 'timestamp': 1655204626, 'time_total_s': 42.56152319908142, 'pid': 7443, 'hostname': 
'Kais-MacBook-Pro.fritz.box', 'node_ip': '127.0.0.1', 'config': {}, 'time_since_restore': 42.56152319908142, 'timesteps_since_restore': 0, 'iterations_since_restore': 4, 'warmup_time': 0.0038008689880371094, 'experiment_tag': '0'}\n" + ] + } + ], + "source": [ + "from ray.train.torch import TorchTrainer\n", + "\n", + "\n", + "trainer = TorchTrainer(\n", + " train_loop_per_worker=train_func,\n", + " train_loop_config={\"lr\": 1e-3, \"batch_size\": 64, \"epochs\": 4},\n", + " scaling_config={\"num_workers\": 2, \"use_gpu\": False},\n", + ")\n", + "result = trainer.fit()\n", + "print(f\"Last result: {result.metrics}\")" + ] + }, + { + "cell_type": "markdown", + "id": "341f4fd8", + "metadata": {}, + "source": [ + "Great, this works! You're now training your model in parallel. You could now scale this up to more nodes and workers on your Ray cluster.\n", + "\n", + "But there are a few improvements we can make to the code in order to get the most out of the system. For one, we should enable **checkpointing** to get access to the trained model afterwards. Additionally, we should optimize the **data loading** to take place within the workers." + ] + }, + { + "cell_type": "markdown", + "id": "3bbe06f3", + "metadata": {}, + "source": [ + "### Enabling checkpointing to retrieve the model\n", + "Enabling checkpointing is pretty easy - we just need to call the `train.save_checkpoint()` API and pass the model state to it:\n", + "\n", + "```python\n", + " train.save_checkpoint(epoch=t, model=model.module.state_dict())\n", + "```\n", + "\n", + "Note that the `model.module` part is needed because the model gets wrapped in `torch.nn.parallel.DistributedDataParallel` by `train.torch.prepare_model`.\n", + "\n", + "### Move the data loader to the training function\n", + "\n", + "You may have noticed a warning: `Warning: The actor TrainTrainable is very large (52 MiB). Check that its definition is not implicitly capturing a large array or other object in scope. 
Tip: use ray.put() to put large objects in the Ray object store.`.\n", + "\n", + "This is because we load the data outside the training function. Ray then serializes it to make it accessible to the remote tasks (that may get executed on a remote node!). This is not too bad with just 52 MB of data, but imagine this were a full image dataset - you wouldn't want to ship this around the cluster unnecessarily. Instead, you should move the dataset loading part into the `train_func()`. This will then download the data *to disk* once per machine and result in much more efficient data loading.\n", + "\n", + "The result looks like this:" + ] + }, + { + "cell_type": "code", + "execution_count": 16, + "id": "059953f8", + "metadata": {}, + "outputs": [], + "source": [ + "def load_data():\n", + " # Download training data from open datasets.\n", + " training_data = datasets.FashionMNIST(\n", + " root=\"data\",\n", + " train=True,\n", + " download=True,\n", + " transform=ToTensor(),\n", + " )\n", + "\n", + " # Download test data from open datasets.\n", + " test_data = datasets.FashionMNIST(\n", + " root=\"data\",\n", + " train=False,\n", + " download=True,\n", + " transform=ToTensor(),\n", + " )\n", + " return training_data, test_data\n", + "\n", + "\n", + "def train_func(config: dict):\n", + " batch_size = config[\"batch_size\"]\n", + " lr = config[\"lr\"]\n", + " epochs = config[\"epochs\"]\n", + " \n", + " batch_size_per_worker = batch_size // train.world_size()\n", + " \n", + " training_data, test_data = load_data() # <- this is new!\n", + " \n", + " # Create data loaders.\n", + " train_dataloader = DataLoader(training_data, batch_size=batch_size_per_worker)\n", + " test_dataloader = DataLoader(test_data, batch_size=batch_size_per_worker)\n", + " \n", + " train_dataloader = train.torch.prepare_data_loader(train_dataloader)\n", + " test_dataloader = train.torch.prepare_data_loader(test_dataloader)\n", + " \n", + " model = NeuralNetwork()\n", + " model = 
train.torch.prepare_model(model)\n", + " \n", + " loss_fn = nn.CrossEntropyLoss()\n", + " optimizer = torch.optim.SGD(model.parameters(), lr=lr)\n", + " \n", + " for t in range(epochs):\n", + " train_epoch(train_dataloader, model, loss_fn, optimizer)\n", + " test_loss = test_epoch(test_dataloader, model, loss_fn)\n", + " train.save_checkpoint(epoch=t, model=model.module.state_dict()) # <- this is new!\n", + " train.report(loss=test_loss)\n", + "\n", + " print(\"Done!\")" + ] + }, + { + "cell_type": "markdown", + "id": "d2af219d", + "metadata": {}, + "source": [ + "Let's train again:" + ] + }, + { + "cell_type": "code", + "execution_count": 17, + "id": "de249d61", + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "== Status ==
Current time: 2022-06-14 13:04:34 (running for 00:00:47.20)
Memory usage on this node: 11.4/16.0 GiB
Using FIFO scheduling algorithm.
Resources requested: 0/16 CPUs, 0/0 GPUs, 0.0/3.63 GiB heap, 0.0/1.81 GiB objects
Result logdir: /Users/kai/ray_results/TorchTrainer_2022-06-14_13-03-47
Number of trials: 1/1 (1 TERMINATED)
\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "
Trial name status loc iter total time (s) loss _timestamp _time_this_iter_s
TorchTrainer_a9dda_00000TERMINATED127.0.0.1:7485 4 41.98631.22261 1655204673 9.94109


" + ], + "text/plain": [ + "" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "\u001b[2m\u001b[36m(BaseWorkerMixin pid=7491)\u001b[0m 2022-06-14 13:03:54,234\tINFO config.py:71 -- Setting up process group for: env:// [rank=0, world_size=2]\n", + "\u001b[2m\u001b[36m(BaseWorkerMixin pid=7492)\u001b[0m 2022-06-14 13:03:54,234\tINFO config.py:71 -- Setting up process group for: env:// [rank=1, world_size=2]\n", + "\u001b[2m\u001b[36m(BaseWorkerMixin pid=7491)\u001b[0m /Users/kai/.pyenv/versions/3.7.7/lib/python3.7/site-packages/torchvision/datasets/mnist.py:498: UserWarning: The given NumPy array is not writeable, and PyTorch does not support non-writeable tensors. This means you can write to the underlying (supposedly non-writeable) NumPy array using the tensor. You may want to copy the array to protect its data or make it writeable before converting it to a tensor. This type of warning will be suppressed for the rest of this program. (Triggered internally at ../torch/csrc/utils/tensor_numpy.cpp:180.)\n", + "\u001b[2m\u001b[36m(BaseWorkerMixin pid=7491)\u001b[0m return torch.from_numpy(parsed.astype(m[2], copy=False)).view(*s)\n", + "\u001b[2m\u001b[36m(BaseWorkerMixin pid=7491)\u001b[0m 2022-06-14 13:03:55,404\tINFO train_loop_utils.py:293 -- Moving model to device: cpu\n", + "\u001b[2m\u001b[36m(BaseWorkerMixin pid=7491)\u001b[0m 2022-06-14 13:03:55,404\tINFO train_loop_utils.py:331 -- Wrapping provided model in DDP.\n", + "\u001b[2m\u001b[36m(BaseWorkerMixin pid=7492)\u001b[0m /Users/kai/.pyenv/versions/3.7.7/lib/python3.7/site-packages/torchvision/datasets/mnist.py:498: UserWarning: The given NumPy array is not writeable, and PyTorch does not support non-writeable tensors. This means you can write to the underlying (supposedly non-writeable) NumPy array using the tensor. You may want to copy the array to protect its data or make it writeable before converting it to a tensor. 
This type of warning will be suppressed for the rest of this program. (Triggered internally at ../torch/csrc/utils/tensor_numpy.cpp:180.)\n", + "\u001b[2m\u001b[36m(BaseWorkerMixin pid=7492)\u001b[0m return torch.from_numpy(parsed.astype(m[2], copy=False)).view(*s)\n", + "\u001b[2m\u001b[36m(BaseWorkerMixin pid=7492)\u001b[0m 2022-06-14 13:03:55,404\tINFO train_loop_utils.py:293 -- Moving model to device: cpu\n", + "\u001b[2m\u001b[36m(BaseWorkerMixin pid=7492)\u001b[0m 2022-06-14 13:03:55,404\tINFO train_loop_utils.py:331 -- Wrapping provided model in DDP.\n" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Result for TorchTrainer_a9dda_00000:\n", + " _time_this_iter_s: 10.072710990905762\n", + " _timestamp: 1655204645\n", + " _training_iteration: 1\n", + " date: 2022-06-14_13-04-05\n", + " done: false\n", + " experiment_id: 1f001dea6bd948c39235e589f07fcd8f\n", + " hostname: Kais-MacBook-Pro.fritz.box\n", + " iterations_since_restore: 1\n", + " loss: 2.1288197860596285\n", + " node_ip: 127.0.0.1\n", + " pid: 7485\n", + " should_checkpoint: true\n", + " time_since_restore: 13.953598976135254\n", + " time_this_iter_s: 13.953598976135254\n", + " time_total_s: 13.953598976135254\n", + " timestamp: 1655204645\n", + " timesteps_since_restore: 0\n", + " training_iteration: 1\n", + " trial_id: a9dda_00000\n", + " warmup_time: 0.0025289058685302734\n", + " \n", + "Result for TorchTrainer_a9dda_00000:\n", + " _time_this_iter_s: 9.129379987716675\n", + " _timestamp: 1655204654\n", + " _training_iteration: 2\n", + " date: 2022-06-14_13-04-14\n", + " done: false\n", + " experiment_id: 1f001dea6bd948c39235e589f07fcd8f\n", + " hostname: Kais-MacBook-Pro.fritz.box\n", + " iterations_since_restore: 2\n", + " loss: 1.8414961441307311\n", + " node_ip: 127.0.0.1\n", + " pid: 7485\n", + " should_checkpoint: true\n", + " time_since_restore: 23.07395100593567\n", + " time_this_iter_s: 9.120352029800415\n", + " time_total_s: 23.07395100593567\n", + " timestamp: 
1655204654\n", + " timesteps_since_restore: 0\n", + " training_iteration: 2\n", + " trial_id: a9dda_00000\n", + " warmup_time: 0.0025289058685302734\n", + " \n", + "Result for TorchTrainer_a9dda_00000:\n", + " _time_this_iter_s: 8.954904079437256\n", + " _timestamp: 1655204663\n", + " _training_iteration: 3\n", + " date: 2022-06-14_13-04-23\n", + " done: false\n", + " experiment_id: 1f001dea6bd948c39235e589f07fcd8f\n", + " hostname: Kais-MacBook-Pro.fritz.box\n", + " iterations_since_restore: 3\n", + " loss: 1.4759019393070487\n", + " node_ip: 127.0.0.1\n", + " pid: 7485\n", + " should_checkpoint: true\n", + " time_since_restore: 32.03323698043823\n", + " time_this_iter_s: 8.959285974502563\n", + " time_total_s: 32.03323698043823\n", + " timestamp: 1655204663\n", + " timesteps_since_restore: 0\n", + " training_iteration: 3\n", + " trial_id: a9dda_00000\n", + " warmup_time: 0.0025289058685302734\n", + " \n", + "Result for TorchTrainer_a9dda_00000:\n", + " _time_this_iter_s: 9.941093921661377\n", + " _timestamp: 1655204673\n", + " _training_iteration: 4\n", + " date: 2022-06-14_13-04-33\n", + " done: false\n", + " experiment_id: 1f001dea6bd948c39235e589f07fcd8f\n", + " hostname: Kais-MacBook-Pro.fritz.box\n", + " iterations_since_restore: 4\n", + " loss: 1.2226136068629612\n", + " node_ip: 127.0.0.1\n", + " pid: 7485\n", + " should_checkpoint: true\n", + " time_since_restore: 41.98633909225464\n", + " time_this_iter_s: 9.953102111816406\n", + " time_total_s: 41.98633909225464\n", + " timestamp: 1655204673\n", + " timesteps_since_restore: 0\n", + " training_iteration: 4\n", + " trial_id: a9dda_00000\n", + " warmup_time: 0.0025289058685302734\n", + " \n", + "\u001b[2m\u001b[36m(BaseWorkerMixin pid=7492)\u001b[0m Done!\n", + "\u001b[2m\u001b[36m(BaseWorkerMixin pid=7491)\u001b[0m Done!\n", + "Result for TorchTrainer_a9dda_00000:\n", + " _time_this_iter_s: 9.941093921661377\n", + " _timestamp: 1655204673\n", + " _training_iteration: 4\n", + " date: 
2022-06-14_13-04-33\n", + " done: true\n", + " experiment_id: 1f001dea6bd948c39235e589f07fcd8f\n", + " experiment_tag: '0'\n", + " hostname: Kais-MacBook-Pro.fritz.box\n", + " iterations_since_restore: 4\n", + " loss: 1.2226136068629612\n", + " node_ip: 127.0.0.1\n", + " pid: 7485\n", + " should_checkpoint: true\n", + " time_since_restore: 41.98633909225464\n", + " time_this_iter_s: 9.953102111816406\n", + " time_total_s: 41.98633909225464\n", + " timestamp: 1655204673\n", + " timesteps_since_restore: 0\n", + " training_iteration: 4\n", + " trial_id: a9dda_00000\n", + " warmup_time: 0.0025289058685302734\n", + " \n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "2022-06-14 13:04:34,537\tINFO tune.py:742 -- Total run time: 47.32 seconds (47.19 seconds for the tuning loop).\n" + ] + } + ], + "source": [ + "trainer = TorchTrainer(\n", + " train_loop_per_worker=train_func,\n", + " train_loop_config={\"lr\": 1e-3, \"batch_size\": 64, \"epochs\": 4},\n", + " scaling_config={\"num_workers\": 2, \"use_gpu\": False},\n", + ")\n", + "result = trainer.fit()\n" + ] + }, + { + "cell_type": "markdown", + "id": "534ed4df", + "metadata": {}, + "source": [ + "We can see our results here:" + ] + }, + { + "cell_type": "code", + "execution_count": 18, + "id": "b81ca48f", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Last result: {'loss': 1.2226136068629612, '_timestamp': 1655204673, '_time_this_iter_s': 9.941093921661377, '_training_iteration': 4, 'time_this_iter_s': 9.953102111816406, 'should_checkpoint': True, 'done': True, 'timesteps_total': None, 'episodes_total': None, 'training_iteration': 4, 'trial_id': 'a9dda_00000', 'experiment_id': '1f001dea6bd948c39235e589f07fcd8f', 'date': '2022-06-14_13-04-33', 'timestamp': 1655204673, 'time_total_s': 41.98633909225464, 'pid': 7485, 'hostname': 'Kais-MacBook-Pro.fritz.box', 'node_ip': '127.0.0.1', 'config': {}, 'time_since_restore': 41.98633909225464, 
'timesteps_since_restore': 0, 'iterations_since_restore': 4, 'warmup_time': 0.0025289058685302734, 'experiment_tag': '0'}\n", + "Checkpoint: \n" + ] + } + ], + "source": [ + "print(f\"Last result: {result.metrics}\")\n", + "print(f\"Checkpoint: {result.checkpoint}\")" + ] + }, + { + "cell_type": "markdown", + "id": "b6b15d88", + "metadata": {}, + "source": [ + "## Loading the model for prediction\n", + "You may have noticed that we skipped one part of the original tutorial - loading the model and using it for inference. The original code looks like this (we've wrapped it in a function):" + ] + }, + { + "cell_type": "code", + "execution_count": 19, + "id": "68e664ff", + "metadata": {}, + "outputs": [], + "source": [ + "def predict_from_model(model):\n", + " classes = [\n", + " \"T-shirt/top\",\n", + " \"Trouser\",\n", + " \"Pullover\",\n", + " \"Dress\",\n", + " \"Coat\",\n", + " \"Sandal\",\n", + " \"Shirt\",\n", + " \"Sneaker\",\n", + " \"Bag\",\n", + " \"Ankle boot\",\n", + " ]\n", + "\n", + " model.eval()\n", + " x, y = test_data[0][0], test_data[0][1]\n", + " with torch.no_grad():\n", + " pred = model(x)\n", + " predicted, actual = classes[pred[0].argmax(0)], classes[y]\n", + " print(f'Predicted: \"{predicted}\", Actual: \"{actual}\"')\n" + ] + }, + { + "cell_type": "markdown", + "id": "1abf022a", + "metadata": {}, + "source": [ + "We can use our saved model with the existing code to do prediction:" + ] + }, + { + "cell_type": "code", + "execution_count": 20, + "id": "0c135a17", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Predicted: \"Ankle boot\", Actual: \"Ankle boot\"\n" + ] + } + ], + "source": [ + "from ray.train.torch import load_checkpoint\n", + "\n", + "model, _ = load_checkpoint(result.checkpoint, NeuralNetwork())\n", + "\n", + "predict_from_model(model)" + ] + }, + { + "cell_type": "markdown", + "id": "f6fc1441", + "metadata": {}, + "source": [ + "To predict more than one example, we can use a 
loop:" + ] + }, + { + "cell_type": "code", + "execution_count": 21, + "id": "17652fa1", + "metadata": {}, + "outputs": [], + "source": [ + "classes = [\n", + " \"T-shirt/top\",\n", + " \"Trouser\",\n", + " \"Pullover\",\n", + " \"Dress\",\n", + " \"Coat\",\n", + " \"Sandal\",\n", + " \"Shirt\",\n", + " \"Sneaker\",\n", + " \"Bag\",\n", + " \"Ankle boot\",\n", + "]\n", + "\n", + "def predict_from_model(model, data):\n", + " model.eval()\n", + " with torch.no_grad():\n", + " for x, y in data:\n", + " pred = model(x)\n", + " predicted, actual = classes[pred[0].argmax(0)], classes[y]\n", + " print(f'Predicted: \"{predicted}\", Actual: \"{actual}\"')\n" + ] + }, + { + "cell_type": "code", + "execution_count": 22, + "id": "3bc14ed6", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Predicted: \"Ankle boot\", Actual: \"Ankle boot\"\n", + "Predicted: \"Pullover\", Actual: \"Pullover\"\n", + "Predicted: \"Trouser\", Actual: \"Trouser\"\n", + "Predicted: \"Trouser\", Actual: \"Trouser\"\n", + "Predicted: \"Pullover\", Actual: \"Shirt\"\n", + "Predicted: \"Trouser\", Actual: \"Trouser\"\n", + "Predicted: \"Coat\", Actual: \"Coat\"\n", + "Predicted: \"Coat\", Actual: \"Shirt\"\n", + "Predicted: \"Sneaker\", Actual: \"Sandal\"\n", + "Predicted: \"Sneaker\", Actual: \"Sneaker\"\n" + ] + } + ], + "source": [ + "predict_from_model(model, [test_data[i] for i in range(10)])" + ] + }, + { + "cell_type": "markdown", + "id": "a0ce0733", + "metadata": {}, + "source": [ + "## Using Ray AIR for scalable batch prediction\n", + "However, we can also use Ray AIR's `BatchPredictor` class to do scalable prediction." 
+ ] + }, + { + "cell_type": "code", + "execution_count": 23, + "id": "4d8b0f50", + "metadata": {}, + "outputs": [], + "source": [ + "from ray.air import BatchPredictor\n", + "from ray.air.predictors.integrations.torch import TorchPredictor\n", + "\n", + "batch_predictor = BatchPredictor.from_checkpoint(result.checkpoint, TorchPredictor, model=NeuralNetwork())" + ] + }, + { + "cell_type": "markdown", + "id": "ad556eeb", + "metadata": {}, + "source": [ + "Batch predictors work with Ray Datasets. Here we convert our test dataset into a Ray Dataset - note that this is not very efficient, and you can look at our {ref}`other tutorials ` to see more efficient ways to generate a Ray Dataset." + ] + }, + { + "cell_type": "code", + "execution_count": 24, + "id": "8cb0556f", + "metadata": {}, + "outputs": [], + "source": [ + "import ray.data\n", + "\n", + "ds = ray.data.from_items([x for x, y in test_data])" + ] + }, + { + "cell_type": "markdown", + "id": "264dd2e4", + "metadata": {}, + "source": [ + "We can then trigger prediction with two workers:" + ] + }, + { + "cell_type": "code", + "execution_count": 25, + "id": "8a823f7a", + "metadata": {}, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "Map Progress (2 actors 1 pending): 100%|██████████████████████████████████████████████████████████████████████████| 200/200 [00:04<00:00, 44.83it/s]\n" + ] + } + ], + "source": [ + "results = batch_predictor.predict(ds, min_scoring_workers=2)" + ] + }, + { + "cell_type": "markdown", + "id": "41094a55", + "metadata": {}, + "source": [ + "`results` is another Ray Dataset. We can use `results.to_pandas()` to see our prediction results:" + ] + }, + { + "cell_type": "code", + "execution_count": 26, + "id": "d3dce40d", + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
predictions
0[-1.3625717, -1.7147198, -0.7944063, -1.516942...
1[0.7415036, -2.1898255, 2.9233487, -0.8718336,...
2[1.9445083, 4.0967875, -0.07387225, 2.8944397,...
3[1.460784, 3.2333734, -0.15551251, 2.3267126, ...
4[0.758382, -0.8887838, 1.1806433, -0.04382074,...
......
9995[-1.6255455, -2.7318435, -0.8888813, -2.205097...
9996[0.90374756, 2.051165, -0.046540327, 1.4930309...
9997[0.89324296, -0.14099044, -0.08300409, 0.74801...
9998[1.4642937, 3.2236817, -0.23001938, 2.5179548,...
9999[-1.0059572, -1.2176754, -0.36726016, -1.10825...
\n", + "

10000 rows × 1 columns

\n", + "
" + ], + "text/plain": [ + " predictions\n", + "0 [-1.3625717, -1.7147198, -0.7944063, -1.516942...\n", + "1 [0.7415036, -2.1898255, 2.9233487, -0.8718336,...\n", + "2 [1.9445083, 4.0967875, -0.07387225, 2.8944397,...\n", + "3 [1.460784, 3.2333734, -0.15551251, 2.3267126, ...\n", + "4 [0.758382, -0.8887838, 1.1806433, -0.04382074,...\n", + "... ...\n", + "9995 [-1.6255455, -2.7318435, -0.8888813, -2.205097...\n", + "9996 [0.90374756, 2.051165, -0.046540327, 1.4930309...\n", + "9997 [0.89324296, -0.14099044, -0.08300409, 0.74801...\n", + "9998 [1.4642937, 3.2236817, -0.23001938, 2.5179548,...\n", + "9999 [-1.0059572, -1.2176754, -0.36726016, -1.10825...\n", + "\n", + "[10000 rows x 1 columns]" + ] + }, + "execution_count": 26, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "results.to_pandas()" + ] + }, + { + "cell_type": "markdown", + "id": "427b68e8", + "metadata": {}, + "source": [ + "If we want to convert these predictions into class names (as in the original example), we can use `map_batches` to do this:" + ] + }, + { + "cell_type": "code", + "execution_count": 27, + "id": "f17b5c10", + "metadata": {}, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "Map_Batches: 100%|███████████████████████████████████████████████████████████████████████████████████████████████| 200/200 [00:01<00:00, 117.68it/s]\n" + ] + } + ], + "source": [ + "predicted_classes = results.map_batches(\n", + " lambda batch: [classes[pred.argmax(0)] for pred in batch[\"predictions\"]], \n", + " batch_format=\"pandas\")" + ] + }, + { + "cell_type": "markdown", + "id": "cb7040db", + "metadata": {}, + "source": [ + "To compare this with the actual labels, let's create a Ray Dataset for these and zip it together with the predicted classes:" + ] + }, + { + "cell_type": "code", + "execution_count": 28, + "id": "207e13b9", + "metadata": {}, + "outputs": [], + "source": [ + "real_classes = ray.data.from_items([classes[y] for x, y in 
test_data])\n", + "merged = predicted_classes.zip(real_classes)" + ] + }, + { + "cell_type": "markdown", + "id": "18b1012c", + "metadata": {}, + "source": [ + "Let's examine our results:" + ] + }, + { + "cell_type": "code", + "execution_count": 29, + "id": "2b2decc6", + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
value
0(Ankle boot, Ankle boot)
1(Pullover, Pullover)
2(Trouser, Trouser)
3(Trouser, Trouser)
4(Pullover, Shirt)
......
9995(Ankle boot, Ankle boot)
9996(Trouser, Trouser)
9997(T-shirt/top, Bag)
9998(Trouser, Trouser)
9999(Sneaker, Sandal)
\n", + "

10000 rows × 1 columns

\n", + "
" + ], + "text/plain": [ + " value\n", + "0 (Ankle boot, Ankle boot)\n", + "1 (Pullover, Pullover)\n", + "2 (Trouser, Trouser)\n", + "3 (Trouser, Trouser)\n", + "4 (Pullover, Shirt)\n", + "... ...\n", + "9995 (Ankle boot, Ankle boot)\n", + "9996 (Trouser, Trouser)\n", + "9997 (T-shirt/top, Bag)\n", + "9998 (Trouser, Trouser)\n", + "9999 (Sneaker, Sandal)\n", + "\n", + "[10000 rows x 1 columns]" + ] + }, + "execution_count": 29, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "merged.to_pandas()" + ] + }, + { + "cell_type": "markdown", + "id": "2963e1f7", + "metadata": {}, + "source": [ + "## Summary\n", + "\n", + "This tutorial demonstrated how to turn your existing PyTorch code into code you can use with Ray AIR.\n", + "\n", + "We learned how to\n", + "- enable distributed training using Ray Train abstractions\n", + "- save and retrieve model checkpoints via Ray AIR\n", + "- load a model for batch prediction\n", + "\n", + "In our {ref}`other examples ` you can learn how to do more things with the Ray AIR API, such as **serving your model with Ray Serve** or **tuning your hyperparameters with Ray Tune.** You can also learn how to **construct Ray Datasets** to leverage Ray AIR's **preprocessing** API.\n", + "\n", + "We hope this tutorial gave you a good starting point to leverage Ray AIR. If you have any questions, suggestions, or run into any problems, please reach out on [Discuss](https://discuss.ray.io/) or [GitHub](https://github.com/ray-project/ray)!" 
+ ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.7.7" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/doc/source/ray-air/examples/index.rst b/doc/source/ray-air/examples/index.rst index 3f8a1e8cb..3facf81e2 100644 --- a/doc/source/ray-air/examples/index.rst +++ b/doc/source/ray-air/examples/index.rst @@ -11,6 +11,7 @@ Framework-specific Examples - :doc:`/ray-air/examples/lightgbm_example`: Distributed training with LightGBM - :doc:`/ray-air/examples/xgboost_example`: Distributed training with LightGBM - :doc:`/ray-air/examples/sklearn_example`: Integrating with Scikit-Learn (non-distributed) +- :doc:`/ray-air/examples/convert_existing_pytorch_code_to_ray_air`: How to get started with Ray AIR from your code base Tabular Data