{ "cells": [ { "cell_type": "markdown", "id": "c656b46d", "metadata": {}, "source": [ "# Training a Torch Classifier\n", "\n", "This tutorial demonstrates how to train an image classifier using the [Ray AI Runtime](air) (AIR).\n", "\n", "You should be familiar with [PyTorch](https://pytorch.org/) before starting the tutorial. If you need a refresher, read PyTorch's [training a classifier](https://pytorch.org/tutorials/beginner/blitz/cifar10_tutorial.html) tutorial.\n", "\n", "## Before you begin\n", "\n", "* Install the [Ray AI Runtime](air). You'll need Ray 1.13 later to run this example." ] }, { "cell_type": "code", "execution_count": 1, "id": "20a51fae", "metadata": {}, "outputs": [], "source": [ "!pip install 'ray[air]'" ] }, { "cell_type": "markdown", "id": "4d6a1fbd", "metadata": {}, "source": [ "* Install `requests`, `torch`, and `torchvision`" ] }, { "cell_type": "code", "execution_count": 2, "id": "2860f0d8", "metadata": {}, "outputs": [], "source": [ "!pip install requests torch torchvision" ] }, { "cell_type": "markdown", "id": "b2e47e6b", "metadata": {}, "source": [ "## Load and normalize CIFAR-10\n", "\n", "We'll train our classifier on a popular image dataset called [CIFAR-10](https://www.cs.toronto.edu/~kriz/cifar.html).\n", "\n", "First, let's load CIFAR-10 into a Ray Dataset." ] }, { "cell_type": "code", "execution_count": 9, "id": "39170e60", "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "2022-07-11 14:46:56,693\tWARNING read_api.py:264 -- The number of blocks in this dataset (1) limits its parallelism to 1 concurrent tasks. This is much less than the number of available CPU slots in the cluster. Use `.repartition(n)` to increase the number of dataset blocks.\n", "\u001b[2m\u001b[36m(_prepare_read pid=54936)\u001b[0m 2022-07-11 14:46:56,687\tWARNING torch_datasource.py:56 -- `SimpleTorchDatasource` doesn't support parallel reads. The `parallelism` argument will be ignored.\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "\u001b[2m\u001b[36m(_execute_read_task pid=54936)\u001b[0m Files already downloaded and verified\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ "Map_Batches: 0%| | 0/1 [01:45` loads all data into memory, so you shouldn't use it with larger datasets.\n", "\n", "Next, let's represent our data using pandas dataframes instead of tuples. This lets us call methods like {py:meth}`Dataset.iter_torch_batches ` later in the tutorial." 
] }, { "cell_type": "code", "execution_count": 10, "id": "8f3bc4fc", "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "Read->Map_Batches: 0%| | 0/1 [00:00Map_Batches: 100%|██████████| 1/1 [00:10<00:00, 10.44s/it]\n", "Read->Map_Batches: 0%| | 0/1 [00:00Map_Batches: 100%|██████████| 1/1 [00:02<00:00, 2.56s/it]\n" ] } ], "source": [ "from typing import Tuple\n", "import pandas as pd\n", "from ray.data.extensions import TensorArray\n", "import torch\n", "\n", "\n", "def convert_batch_to_pandas(batch: Tuple[torch.Tensor, int]) -> pd.DataFrame:\n", " images = TensorArray([image.numpy() for image, _ in batch])\n", " labels = [label for _, label in batch]\n", "\n", " df = pd.DataFrame({\"image\": images, \"label\": labels})\n", "\n", " return df\n", "\n", "\n", "train_dataset = train_dataset.map_batches(convert_batch_to_pandas)\n", "test_dataset = test_dataset.map_batches(convert_batch_to_pandas)" ] }, { "cell_type": "code", "execution_count": 11, "id": "4aa50f2e", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Dataset(num_blocks=1, num_rows=50000, schema={image: TensorDtype, label: int64})" ] }, "execution_count": 11, "metadata": {}, "output_type": "execute_result" } ], "source": [ "train_dataset" ] }, { "cell_type": "markdown", "id": "454bd960", "metadata": {}, "source": [ "## Train a convolutional neural network\n", "\n", "Now that we've created our datasets, let's define the training logic." ] }, { "cell_type": "code", "execution_count": 12, "id": "19046672", "metadata": {}, "outputs": [], "source": [ "import torch\n", "import torch.nn as nn\n", "import torch.nn.functional as F\n", "\n", "\n", "class Net(nn.Module):\n", " def __init__(self):\n", " super().__init__()\n", " self.conv1 = nn.Conv2d(3, 6, 5)\n", " self.pool = nn.MaxPool2d(2, 2)\n", " self.conv2 = nn.Conv2d(6, 16, 5)\n", " self.fc1 = nn.Linear(16 * 5 * 5, 120)\n", " self.fc2 = nn.Linear(120, 84)\n", " self.fc3 = nn.Linear(84, 10)\n", "\n", " def forward(self, x):\n", " x = self.pool(F.relu(self.conv1(x)))\n", " x = self.pool(F.relu(self.conv2(x)))\n", " x = torch.flatten(x, 1) # flatten all dimensions except batch\n", " x = F.relu(self.fc1(x))\n", " x = F.relu(self.fc2(x))\n", " x = self.fc3(x)\n", " return x" ] }, { "cell_type": "markdown", "id": "c8e84b09", "metadata": {}, "source": [ "We define our training logic in a function called `train_loop_per_worker`.\n", "\n", "`train_loop_per_worker` contains regular PyTorch code with a few notable exceptions:\n", "* We wrap our model with {py:func}`train.torch.prepare_model `.\n", "* We call {py:func}`session.get_dataset_shard ` and {py:meth}`Dataset.iter_torch_batches ` to convert a subset of our training data to a Torch dataset.\n", "* We save model state using {py:func}`session.report `." 
] }, { "cell_type": "code", "execution_count": 24, "id": "3e212725", "metadata": {}, "outputs": [], "source": [ "from ray import train\n", "from ray.air import session, Checkpoint\n", "import torch.optim as optim\n", "\n", "\n", "def train_loop_per_worker(config):\n", " model = train.torch.prepare_model(Net())\n", "\n", " criterion = nn.CrossEntropyLoss()\n", " optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9)\n", "\n", " train_dataset_shard = session.get_dataset_shard(\"train\").iter_torch_batches(\n", " batch_size=config[\"batch_size\"],\n", " )\n", "\n", " for epoch in range(2):\n", " running_loss = 0.0\n", " for i, data in enumerate(train_dataset_shard):\n", " # get the inputs and labels\n", " inputs, labels = data[\"image\"], data[\"label\"]\n", " print(inputs)\n", " print(labels)\n", "\n", " # zero the parameter gradients\n", " optimizer.zero_grad()\n", "\n", " # forward + backward + optimize\n", " outputs = model(inputs)\n", " loss = criterion(outputs, labels)\n", " loss.backward()\n", " optimizer.step()\n", "\n", " # print statistics\n", " running_loss += loss.item()\n", " if i % 2000 == 1999: # print every 2000 mini-batches\n", " print(f\"[{epoch + 1}, {i + 1:5d}] loss: {running_loss / 2000:.3f}\")\n", " running_loss = 0.0\n", "\n", " session.report(\n", " dict(running_loss=running_loss),\n", " checkpoint=Checkpoint.from_dict(dict(model=model.module.state_dict())),\n", " )" ] }, { "cell_type": "markdown", "id": "5f55484e", "metadata": {}, "source": [ "Finally, we can train our model. This should take a few minutes to run." ] }, { "cell_type": "code", "execution_count": 25, "id": "46c48f35", "metadata": {}, "outputs": [ { "data": { "text/html": [ "== Status ==
"== Status ==<br>Current time: 2022-07-11 15:02:20 (running for 00:00:02.72)<br>Memory usage on this node: 30.2/64.0 GiB<br>Using FIFO scheduling algorithm.<br>Resources requested: 3.0/16 CPUs, 0/0 GPUs, 0.0/33.52 GiB heap, 0.0/2.0 GiB objects<br>Result logdir: /Users/jiaodong/ray_results/TorchTrainer_2022-07-11_15-02-17<br>Number of trials: 1/1 (1 RUNNING)<br><table><thead><tr><th>Trial name</th><th>status</th><th>loc</th></tr></thead><tbody><tr><td>TorchTrainer_2134d_00000</td><td>RUNNING</td><td>127.0.0.1:61819</td></tr></tbody></table>"
" ], "text/plain": [ "" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "from ray.train.torch import TorchTrainer\n", "from ray.air.config import ScalingConfig\n", "\n", "trainer = TorchTrainer(\n", " train_loop_per_worker=train_loop_per_worker,\n", " train_loop_config={\"batch_size\": 2},\n", " datasets={\"train\": train_dataset},\n", " scaling_config=ScalingConfig(num_workers=2)\n", ")\n", "result = trainer.fit()\n", "latest_checkpoint = result.checkpoint" ] }, { "cell_type": "markdown", "id": "b0e5c7d2", "metadata": {}, "source": [ "To scale your training script, create a [Ray Cluster](deployment-guide) and increase the number of workers. If your cluster contains GPUs, add `\"use_gpu\": True` to your scaling config.\n", "\n", "```{code-block} python\n", "scaling_config=ScalingConfig(num_workers=8, \"use_gpu=True)\n", "```\n", "\n", "## Test the network on the test data\n", "\n", "Let's see how our model performs.\n", "\n", "To classify images in the test dataset, we'll need to create a {py:class}`Predictor `.\n", "\n", "{py:class}`Predictors ` load data from checkpoints and efficiently perform inference. In contrast to {py:class}`TorchPredictor `, which performs inference on a single batch, {py:class}`BatchPredictor ` performs inference on an entire dataset. Because we want to classify all of the images in the test dataset, we'll use a {py:class}`BatchPredictor `." ] }, { "cell_type": "code", "execution_count": 16, "id": "751b0b2a", "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "Map_Batches: 100%|██████████| 1/1 [00:00<00:00, 9.24it/s]\n", "Map Progress (1 actors 1 pending): 0%| | 0/1 [00:01 pd.DataFrame:\n", " \"\"\"Accepts an NdArray JSON from an HTTP body and converts it to a Numpy Array.\"\"\"\n", " # Have to explicitly convert to float since np.array reads as a double.\n", " arr = np.array(payload.array, dtype=np.float32)\n", " return arr\n", "\n", "serve.start(detached=True)\n", "deployment = PredictorDeployment.options(name=\"my-deployment\")\n", "deployment.deploy(TorchPredictor, latest_checkpoint, batching_params=False, model=Net(), http_adapter=json_to_numpy)" ] }, { "cell_type": "markdown", "id": "0944ddbd", "metadata": {}, "source": [ "Let's classify a test image." ] }, { "cell_type": "code", "execution_count": 21, "id": "19f43c62", "metadata": {}, "outputs": [], "source": [ "batch = test_dataset.take(1)\n", "array = np.expand_dims(np.array(batch[0][\"image\"]), axis=0)" ] }, { "cell_type": "code", "execution_count": 22, "id": "1576f7cc", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "(1, 3, 32, 32)" ] }, "execution_count": 22, "metadata": {}, "output_type": "execute_result" } ], "source": [ "array.shape" ] }, { "cell_type": "markdown", "id": "f63e995d", "metadata": {}, "source": [ "You can perform inference against a deployed model by posting a dictionary with an `\"array\"` key. To learn more about the default input schema, read the {py:class}`NdArray ` documentation." 
] }, { "cell_type": "code", "execution_count": 23, "id": "6b5d452f", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[[-1.159639835357666,\n", " -1.4475929737091064,\n", " -0.06824108958244324,\n", " 1.7863765954971313,\n", " 0.19239971041679382,\n", " 0.8146302700042725,\n", " 0.6199826598167419,\n", " -0.4597688317298889,\n", " 0.7662580013275146,\n", " -1.104752779006958]]" ] }, "execution_count": 23, "metadata": {}, "output_type": "execute_result" } ], "source": [ "import requests\n", "\n", "payload = {\"array\": array.tolist()}\n", "response = requests.post(deployment.url, json=payload)\n", "response.json()" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3.6.13 ('ray')", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.13" }, "vscode": { "interpreter": { "hash": "270f2d85f4357bdfaf8bd07e0bf53472a3d1c8c66509b0720fe6bb0939d92e56" } } }, "nbformat": 4, "nbformat_minor": 5 }