ray/doc/source/ray-air/examples/serving_guide.ipynb
Simon Mo 7471b1fa41
[Serve] [AIR] ModelWrapper improvements and docs (#25003)
* batching collation code and tests

* wip notebook for np and dataframe

* finish content

* reset ray-more-libs changes

* add comments

* run through

* Apply suggestions from code review

Co-authored-by: shrekris-anyscale <92341594+shrekris-anyscale@users.noreply.github.com>

* rename package

* lint

* richard's comment

Co-authored-by: shrekris-anyscale <92341594+shrekris-anyscale@users.noreply.github.com>
2022-06-07 08:53:10 -07:00

926 lines
40 KiB
Text

{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"(air-serving-guide)=\n",
"\n",
"# Serve Ray AIR Predictors with `ModelWrapper`\n",
"\n",
"[Ray Serve](rayserve) is the recommended tool to deploy models trained with AIR.\n",
"\n",
"After training a model with Ray Train, you can serve a model using Ray Serve. In this guide, we will cover how to use Ray AIR's `ModelWrapper`, `Predictor`, and `Checkpoint` abstractions to quickly deploy a model for online inference.\n",
"\n",
"But before that, let's review the key concepts:\n",
"- [`Checkpoint`](ray.air.checkpoint) represents a trained model stored in memory, file, or remote uri.\n",
"- [`Predictor`](ray.air.predictor.Predictor)s understand how to perform a model inference given checkpoints and the model definition. Ray AIR comes with predictors for each supported frameworks. \n",
"- [`Deployment`](serve-key-concepts-deployment) is a Ray Serve construct that represent an HTTP endpoint along with scalable pool of models.\n",
"\n",
"The core concept for deployment is the `ModelWrapper`. The `ModelWrapper` takes a [predictor](ray.air.predictor.Predictor) class and a [checkpoint](ray.air.checkpoint) and transforms them into a live HTTP endpoint. \n",
"\n",
"We'll start with a simple quick-start demo showing how you can use the ModelWrapper to deploy your model for online inference."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Let's first make sure Ray AIR is installed. For the quick-start, we'll also use Ray AIR to train and serve a XGBoost model."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"!pip install \"ray[air]\" xgboost scikit-learn"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"You can find the preprocessor and trainer in the [key concepts walk-through](air-key-concepts)."
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"2022-06-02 19:31:31,356\tINFO services.py:1483 -- View the Ray dashboard at \u001b[1m\u001b[32mhttp://127.0.0.1:8265\u001b[39m\u001b[22m\n"
]
},
{
"data": {
"text/html": [
"== Status ==<br>Current time: 2022-06-02 19:31:48 (running for 00:00:13.38)<br>Memory usage on this node: 37.9/64.0 GiB<br>Using FIFO scheduling algorithm.<br>Resources requested: 0/16 CPUs, 0/0 GPUs, 0.0/25.71 GiB heap, 0.0/2.0 GiB objects<br>Result logdir: /Users/simonmo/ray_results/XGBoostTrainer_2022-06-02_19-31-34<br>Number of trials: 1/1 (1 TERMINATED)<br><table>\n",
"<thead>\n",
"<tr><th>Trial name </th><th>status </th><th>loc </th><th style=\"text-align: right;\"> iter</th><th style=\"text-align: right;\"> total time (s)</th><th style=\"text-align: right;\"> train-logloss</th><th style=\"text-align: right;\"> train-error</th><th style=\"text-align: right;\"> valid-logloss</th></tr>\n",
"</thead>\n",
"<tbody>\n",
"<tr><td>XGBoostTrainer_4930d_00000</td><td>TERMINATED</td><td>127.0.0.1:60303</td><td style=\"text-align: right;\"> 5</td><td style=\"text-align: right;\"> 8.72108</td><td style=\"text-align: right;\"> 0.190254</td><td style=\"text-align: right;\"> 0.035176</td><td style=\"text-align: right;\"> 0.20535</td></tr>\n",
"</tbody>\n",
"</table><br><br>"
],
"text/plain": [
"<IPython.core.display.HTML object>"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"\u001b[2m\u001b[36m(GBDTTrainable pid=60303)\u001b[0m UserWarning: `num_actors` in `ray_params` is smaller than 2 (1). XGBoost will NOT be distributed!\n",
"\u001b[2m\u001b[36m(GBDTTrainable pid=60303)\u001b[0m 2022-06-02 19:31:42,283\tINFO main.py:980 -- [RayXGBoost] Created 1 new actors (1 total actors). Waiting until actors are ready for training.\n",
"\u001b[2m\u001b[36m(GBDTTrainable pid=60303)\u001b[0m 2022-06-02 19:31:46,324\tINFO main.py:1025 -- [RayXGBoost] Starting XGBoost training.\n",
"\u001b[2m\u001b[36m(_RemoteRayXGBoostActor pid=60578)\u001b[0m [19:31:46] task [xgboost.ray]:140298197243216 got new rank 0\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"Result for XGBoostTrainer_4930d_00000:\n",
" date: 2022-06-02_19-31-47\n",
" done: false\n",
" experiment_id: 171c25bee8e7490f933cc082daf7e6e0\n",
" hostname: Simons-MacBook-Pro.local\n",
" iterations_since_restore: 1\n",
" node_ip: 127.0.0.1\n",
" pid: 60303\n",
" should_checkpoint: true\n",
" time_since_restore: 8.666727781295776\n",
" time_this_iter_s: 8.666727781295776\n",
" time_total_s: 8.666727781295776\n",
" timestamp: 1654223507\n",
" timesteps_since_restore: 0\n",
" train-error: 0.047739\n",
" train-logloss: 0.483805\n",
" training_iteration: 1\n",
" trial_id: 4930d_00000\n",
" valid-error: 0.05848\n",
" valid-logloss: 0.488357\n",
" warmup_time: 0.0035247802734375\n",
" \n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"\u001b[2m\u001b[36m(GBDTTrainable pid=60303)\u001b[0m 2022-06-02 19:31:47,421\tINFO main.py:1519 -- [RayXGBoost] Finished XGBoost training on training data with total N=398 in 5.16 seconds (1.09 pure XGBoost training time).\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"Result for XGBoostTrainer_4930d_00000:\n",
" date: 2022-06-02_19-31-47\n",
" done: true\n",
" experiment_id: 171c25bee8e7490f933cc082daf7e6e0\n",
" experiment_tag: '0'\n",
" hostname: Simons-MacBook-Pro.local\n",
" iterations_since_restore: 5\n",
" node_ip: 127.0.0.1\n",
" pid: 60303\n",
" should_checkpoint: true\n",
" time_since_restore: 8.72108268737793\n",
" time_this_iter_s: 0.011542558670043945\n",
" time_total_s: 8.72108268737793\n",
" timestamp: 1654223507\n",
" timesteps_since_restore: 0\n",
" train-error: 0.035176\n",
" train-logloss: 0.190254\n",
" training_iteration: 5\n",
" trial_id: 4930d_00000\n",
" valid-error: 0.046784\n",
" valid-logloss: 0.20535\n",
" warmup_time: 0.0035247802734375\n",
" \n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"2022-06-02 19:31:48,266\tINFO tune.py:753 -- Total run time: 13.77 seconds (13.38 seconds for the tuning loop).\n"
]
}
],
"source": [
"import ray\n",
"import pandas as pd\n",
"from sklearn.datasets import load_breast_cancer\n",
"from sklearn.model_selection import train_test_split\n",
"\n",
"from ray.air.train.integrations.xgboost import XGBoostTrainer\n",
"from ray.air.preprocessors import StandardScaler\n",
"\n",
"data_raw = load_breast_cancer()\n",
"dataset_df = pd.DataFrame(data_raw[\"data\"], columns=data_raw[\"feature_names\"])\n",
"dataset_df[\"target\"] = data_raw[\"target\"]\n",
"train_df, test_df = train_test_split(dataset_df, test_size=0.3)\n",
"train_dataset = ray.data.from_pandas(train_df)\n",
"valid_dataset = ray.data.from_pandas(test_df)\n",
"test_dataset = ray.data.from_pandas(test_df.drop(\"target\", axis=1))\n",
"\n",
"# Define preprocessor\n",
"columns_to_scale = [\"mean radius\", \"mean texture\"]\n",
"preprocessor = StandardScaler(columns=columns_to_scale)\n",
"\n",
"# Define trainer\n",
"trainer = XGBoostTrainer(\n",
" scaling_config={\n",
" \"num_workers\": 1\n",
" },\n",
" label_column=\"target\",\n",
" params={\n",
" \"tree_method\": \"approx\",\n",
" \"objective\": \"binary:logistic\",\n",
" \"eval_metric\": [\"logloss\", \"error\"],\n",
" \"max_depth\": 2,\n",
"},\n",
" datasets={\"train\": train_dataset, \"valid\": valid_dataset},\n",
" preprocessor=preprocessor,\n",
" num_boost_round=5,\n",
")\n",
"\n",
"result = trainer.fit()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The following block serves a Ray AIR model from a [checkpoint](ray.air.checkpoint), using the built-in [`XGBoostPredictor`](ray.air.predictors.integrations.xgboost.XGBoostPredictor)."
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"\u001b[2m\u001b[36m(ServeController pid=60981)\u001b[0m INFO 2022-06-02 19:31:52,825 controller 60981 checkpoint_path.py:17 - Using RayInternalKVStore for controller checkpoint and recovery.\n",
"\u001b[2m\u001b[36m(ServeController pid=60981)\u001b[0m INFO 2022-06-02 19:31:52,828 controller 60981 http_state.py:115 - Starting HTTP proxy with name 'SERVE_CONTROLLER_ACTOR:SERVE_PROXY_ACTOR-node:127.0.0.1-0' on node 'node:127.0.0.1-0' listening on '127.0.0.1:8000'\n",
"\u001b[2m\u001b[36m(HTTPProxyActor pid=60984)\u001b[0m INFO: Started server process [60984]\n",
"\u001b[2m\u001b[36m(ServeController pid=60981)\u001b[0m INFO 2022-06-02 19:31:55,191 controller 60981 deployment_state.py:1221 - Adding 1 replicas to deployment 'XGBoostService'.\n"
]
}
],
"source": [
"from ray.air.predictors.integrations.xgboost import XGBoostPredictor\n",
"from ray import serve\n",
"from ray.serve.model_wrappers import ModelWrapperDeployment\n",
"from ray.serve.http_adapters import pandas_read_json\n",
"\n",
"\n",
"serve.start(detached=True)\n",
"deployment = ModelWrapperDeployment.options(name=\"XGBoostService\")\n",
"\n",
"deployment.deploy(\n",
" XGBoostPredictor, result.checkpoint, http_adapter=pandas_read_json\n",
")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Let's send a request through HTTP."
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[{'predictions': 0.1142289936542511}]\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"\u001b[2m\u001b[36m(HTTPProxyActor pid=60984)\u001b[0m INFO 2022-06-02 19:32:00,604 http_proxy 127.0.0.1 http_proxy.py:320 - POST /XGBoostService 307 5.4ms\n",
"\u001b[2m\u001b[36m(XGBoostService pid=60988)\u001b[0m INFO 2022-06-02 19:32:00,603 XGBoostService XGBoostService#LOYoUm replica.py:484 - HANDLE __call__ OK 0.3ms\n",
"\u001b[2m\u001b[36m(HTTPProxyActor pid=60984)\u001b[0m INFO 2022-06-02 19:32:00,658 http_proxy 127.0.0.1 http_proxy.py:320 - POST /XGBoostService 200 49.8ms\n",
"\u001b[2m\u001b[36m(XGBoostService pid=60988)\u001b[0m INFO 2022-06-02 19:32:00,656 XGBoostService XGBoostService#LOYoUm replica.py:484 - HANDLE __call__ OK 46.8ms\n"
]
}
],
"source": [
"import requests\n",
"\n",
"sample_input = test_dataset.take(1)\n",
"sample_input = dict(sample_input[0])\n",
"\n",
"output = requests.post(deployment.url, json=[sample_input]).json()\n",
"print(output)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"It works! As you can see, you can use the `ModelWrapper` to deploy checkpoints trained in Ray AIR as live endpoints. You can find more end-to-end examples for your specific frameworks in the [examples](air-examples-ref) page.\n",
"\n",
"This tutorial aims to provide an in-depth understanding of `ModelWrappers`. In particular, it'll demonstrate:\n",
"- How to serve a predictor accepting array input.\n",
"- How to serve a predictor accepting dataframe input.\n",
"- How to serve a predictor accepting custom input that can be transformed to array or dataframe.\n",
"- How to configure micro-batching to enhance performance.\n",
"\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 1. Predictor accepting NumPy array\n",
"We'll use a simple predictor implementation that adds an increment to an input array."
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [],
"source": [
"import numpy as np\n",
"\n",
"from ray.air.predictor import Predictor\n",
"from ray.air.checkpoint import Checkpoint\n",
"\n",
"class AdderPredictor(Predictor):\n",
" \"\"\"Dummy predictor that increments input by a staic value.\"\"\"\n",
" def __init__(self, increment: int):\n",
" self.increment = increment\n",
" \n",
" @classmethod\n",
" def from_checkpoint(cls, ckpt: Checkpoint):\n",
" \"\"\"Create predictor from checkpoint.\n",
" \n",
" Args:\n",
" ckpt: The AIR checkpoint representing a single dictionary. The dictionary\n",
" should have key `increment` and an integer value.\n",
" \"\"\"\n",
" return cls(ckpt.to_dict()[\"increment\"])\n",
" \n",
" def predict(self, inp: np.ndarray) -> np.ndarray:\n",
" return inp + self.increment"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Let's first test it locally."
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [],
"source": [
"local_checkpoint = Checkpoint.from_dict({\"increment\": 2})\n",
"local_predictor = AdderPredictor.from_checkpoint(local_checkpoint)\n",
"assert local_predictor.predict(np.array([40])) == np.array([42])"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"It worked! Now let's serve it behind HTTP. In Ray Serve, the core unit of an HTTP service is called a [`Deployment`](serve-key-concepts-deployment). It turns a Python class into a queryable HTTP endpoint. For Ray AIR, Serve provides a `ModelWrapperDeployment` to simplify this transformation. You don't need to implement any Python classes. You just pass in your predictor and checkpoint instead.\n",
"\n",
"The deployment takes several arguments. It requires two arguments to start:\n",
"- `predictor_cls (Type[Predictor] | str)`: The predictor Python class. Typically you can use built-in integrations from Ray AIR like the `TorchPredictor`. Alternatively, you can specify the class path to import a predictor like `\"ray.air.integrations.torch.TorchPredictor\"`.\n",
"- `checkpoint (Checkpoint | str)`: A checkpoint instance, or uri to load the checkpoint from.\n",
"\n",
"The following cell showcases how to create a deployment with our `AdderPredictor`\n",
"\n",
"To learn more about Ray Serve, check out [its documentation](rayserve)."
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"\u001b[2m\u001b[36m(ServeController pid=60981)\u001b[0m INFO 2022-06-02 19:32:07,559 controller 60981 deployment_state.py:1221 - Adding 1 replicas to deployment 'Adder'.\n"
]
}
],
"source": [
"from ray import serve\n",
"from ray.serve.model_wrappers import ModelWrapperDeployment\n",
"\n",
"# Launch a Ray cluster running Ray Serve\n",
"serve.start()\n",
"\n",
"# Deploy the model behind HTTP endpoint\n",
"ModelWrapperDeployment.options(name=\"Adder\").deploy(\n",
" predictor_cls=AdderPredictor,\n",
" checkpoint=local_checkpoint\n",
")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"After the model has been deployed, let's send an HTTP request."
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"[42.0]"
]
},
"execution_count": 7,
"metadata": {},
"output_type": "execute_result"
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"\u001b[2m\u001b[36m(HTTPProxyActor pid=60984)\u001b[0m INFO 2022-06-02 19:32:18,864 http_proxy 127.0.0.1 http_proxy.py:320 - POST /Adder 200 18.0ms\n",
"\u001b[2m\u001b[36m(Adder pid=60999)\u001b[0m INFO 2022-06-02 19:32:18,863 Adder Adder#aqYgDS replica.py:484 - HANDLE __call__ OK 13.1ms\n"
]
}
],
"source": [
"import requests\n",
"resp = requests.post(\"http://localhost:8000/Adder/\", json={\"array\": [40]})\n",
"resp.raise_for_status()\n",
"resp.json()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Nice! We sent `[40]` as our input and got `[42]` as our output in JSON format.\n",
"\n",
"You can also specify multi-dimensional arrays in the JSON payload, as well as \"dtype\" and \"shape\" fields to process to array. For more information about the array input schema, see [Ndarray](serve-ndarray-schema).\n",
" \n",
"That's it for arrays! Let's take a look at tabular input."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 2. Predictor accepting Pandas DataFrame\n",
"Let's now take a look at a predictor accepting dataframe inputs. We'll perform some simple column-wise transformations on the input data."
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [],
"source": [
"import pandas as pd\n",
"\n",
"\n",
"class DataFramePredictor(Predictor):\n",
" \"\"\"Dummy predictor that first multiplies input then increment it.\"\"\"\n",
" def __init__(self, increment: int):\n",
" self.increment = increment\n",
"\n",
" @classmethod\n",
" def from_checkpoint(cls, ckpt: Checkpoint):\n",
" return cls(ckpt.to_dict()[\"increment\"])\n",
"\n",
" def predict(self, inp: pd.DataFrame) -> pd.DataFrame:\n",
" inp[\"prediction\"] = inp[\"base\"] * inp[\"multiplier\"] + self.increment\n",
" return inp\n",
"\n",
"local_df_predictor = DataFramePredictor.from_checkpoint(local_checkpoint)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Just like the `AdderPredictor`, we'll use the same `ModelWrapperDeployment` approach to make it queryable with HTTP. \n",
"\n",
"Note that we added `http_adapter=pandas_read_json` as the keyword argument. This tells Serve how to convert incoming JSON requests into a DataFrame. The `pandas_read_json` adapter accepts:\n",
"- [Pandas-parsable JSON](https://pandas.pydata.org/docs/reference/api/pandas.read_json.html) in HTTP body\n",
"- Optional keyword arguments to the [`pandas.read_json`](https://pandas.pydata.org/docs/reference/api/pandas.read_json.html) function via HTTP url parameters.\n",
"\n",
"To learn more, see [HTTP Adapters](serve-http-adapters).\n",
"\n",
"```{note}\n",
"You might wonder why the previous array predictor doesn't need to specify any http adapter. This is because Ray Serve defaults to a built-in adapter called `json_to_ndarray`(ray.serve.http_adapters.json_to_ndarray)!\n",
"```"
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"\u001b[2m\u001b[36m(ServeController pid=60981)\u001b[0m INFO 2022-06-02 19:32:24,396 controller 60981 deployment_state.py:1221 - Adding 1 replicas to deployment 'DataFramePredictor'.\n"
]
}
],
"source": [
"from ray.serve.http_adapters import pandas_read_json\n",
"\n",
"ModelWrapperDeployment.options(name=\"DataFramePredictor\").deploy(\n",
" predictor_cls=DataFramePredictor,\n",
" checkpoint=local_checkpoint,\n",
" http_adapter=pandas_read_json\n",
")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Let's send a request to our endpoint. "
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"'[{\"base\":1,\"multiplier\":2,\"prediction\":4},{\"base\":3,\"multiplier\":4,\"prediction\":14}]'"
]
},
"execution_count": 10,
"metadata": {},
"output_type": "execute_result"
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"\u001b[2m\u001b[36m(HTTPProxyActor pid=60984)\u001b[0m INFO 2022-06-02 19:32:28,751 http_proxy 127.0.0.1 http_proxy.py:320 - POST /DataFramePredictor 200 21.0ms\n",
"\u001b[2m\u001b[36m(DataFramePredictor pid=61006)\u001b[0m INFO 2022-06-02 19:32:28,750 DataFramePredictor DataFramePredictor#IJcHCI replica.py:484 - HANDLE __call__ OK 17.2ms\n"
]
}
],
"source": [
"resp = requests.post(\n",
" \"http://localhost:8000/DataFramePredictor/\",\n",
" json=[{\"base\": 1, \"multiplier\": 2}, {\"base\": 3, \"multiplier\": 4}],\n",
" params={\"orient\": \"records\"},\n",
")\n",
"resp.raise_for_status()\n",
"resp.text"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Great! You can see that the input JSON has been converted to a dataframe, so our predictor can work with pure dataframes instead of raw HTTP requests.\n",
"\n",
"But what if we need to configure the HTTP request? You can do that as well.\n",
"\n",
"## 3. Accepting custom inputs via `http_adapter`\n",
"\n",
"The `http_adapter` field accepts any callable function that's type annotated. You can also bring in additional types that are accepted by FastAPI's dependency injection framework. For more detail, see [HTTP Adapters](serve-http-adapters). In the following example, instead of using the pandas adapter Serve provides, we'll implement our own request adapter that works with just http parameters instead of JSON."
]
},
{
"cell_type": "code",
"execution_count": 19,
"metadata": {},
"outputs": [],
"source": [
"def our_own_http_adapter(base: int, multiplier: int):\n",
" return pd.DataFrame([{\"base\": base, \"multiplier\": multiplier}])"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Let's deploy it."
]
},
{
"cell_type": "code",
"execution_count": 20,
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"\u001b[2m\u001b[36m(ServeController pid=60981)\u001b[0m INFO 2022-06-02 19:33:31,010 controller 60981 deployment_state.py:1180 - Stopping 1 replicas of deployment 'DataFramePredictor' with outdated versions.\n",
"\u001b[2m\u001b[36m(ServeController pid=60981)\u001b[0m INFO 2022-06-02 19:33:33,165 controller 60981 deployment_state.py:1221 - Adding 1 replicas to deployment 'DataFramePredictor'.\n"
]
}
],
"source": [
"from ray.serve.http_adapters import pandas_read_json\n",
"\n",
"ModelWrapperDeployment.options(name=\"DataFramePredictor\").deploy(\n",
" predictor_cls=DataFramePredictor,\n",
" checkpoint=local_checkpoint,\n",
" http_adapter=our_own_http_adapter\n",
")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Let's now send a request. Note that the new predictor accepts our specified input via HTTP parameters. \n",
"\n",
"The equivalent curl request would be `curl -X POST http://localhost:8000/DataFramePredictor/?base=10&multiplier=4`."
]
},
{
"cell_type": "code",
"execution_count": 21,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"'[{\"base\":10,\"multiplier\":4,\"prediction\":42}]'"
]
},
"execution_count": 21,
"metadata": {},
"output_type": "execute_result"
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"\u001b[2m\u001b[36m(HTTPProxyActor pid=60984)\u001b[0m INFO 2022-06-02 19:33:36,070 http_proxy 127.0.0.1 http_proxy.py:320 - POST /DataFramePredictor 200 21.6ms\n",
"\u001b[2m\u001b[36m(DataFramePredictor pid=61037)\u001b[0m INFO 2022-06-02 19:33:36,069 DataFramePredictor DataFramePredictor#QzQiec replica.py:484 - HANDLE __call__ OK 17.5ms\n"
]
}
],
"source": [
"resp = requests.post(\n",
" \"http://localhost:8000/DataFramePredictor/\",\n",
" params={\"base\": 10, \"multiplier\": 4}\n",
")\n",
"resp.raise_for_status()\n",
"resp.text"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 4. `ModelWrapper` performs microbatching to improve performance\n",
"\n",
"Common machine learning models take a batch of inputs for prediction. Common ML Frameworks are optimized with vectorized instruction to make inference on batch requests almost as fast as single requests. \n",
"\n",
"In Serve's `ModelWrapperDeployment`, the incoming requests are automatically batched. \n",
"\n",
"When multiple clients send requests at the same time, Serve will combine the requests into a single batch (array or dataframe). Then, Serve calls predict on the entire batch. Let's take a look at a predictor that returns each row's content, batch_size, and batch group."
]
},
{
"cell_type": "code",
"execution_count": 22,
"metadata": {},
"outputs": [],
"source": [
"import time\n",
"class BatchSizePredictor(Predictor):\n",
" @classmethod\n",
" def from_checkpoint(cls, _: Checkpoint):\n",
" return cls()\n",
" \n",
" def predict(self, inp: np.ndarray):\n",
" time.sleep(0.5) # simulate model inference.\n",
" return [(i, len(inp), inp) for i in inp]"
]
},
{
"cell_type": "code",
"execution_count": 23,
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"\u001b[2m\u001b[36m(ServeController pid=60981)\u001b[0m INFO 2022-06-02 19:33:39,305 controller 60981 deployment_state.py:1221 - Adding 1 replicas to deployment 'BatchSizePredictor'.\n"
]
}
],
"source": [
"ModelWrapperDeployment.options(name=\"BatchSizePredictor\").deploy(\n",
" predictor_cls=BatchSizePredictor,\n",
" checkpoint=local_checkpoint,\n",
")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Let's use a threadpool executor to send ten requests at the same time to simulate multiple clients."
]
},
{
"cell_type": "code",
"execution_count": 24,
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"\u001b[2m\u001b[36m(HTTPProxyActor pid=60984)\u001b[0m INFO 2022-06-02 19:33:43,141 http_proxy 127.0.0.1 http_proxy.py:320 - POST /BatchSizePredictor 200 525.9ms\n",
"\u001b[2m\u001b[36m(BatchSizePredictor pid=61041)\u001b[0m INFO 2022-06-02 19:33:43,139 BatchSizePredictor BatchSizePredictor#QQPBXh replica.py:484 - HANDLE __call__ OK 519.1ms\n",
"\u001b[2m\u001b[36m(HTTPProxyActor pid=60984)\u001b[0m INFO 2022-06-02 19:33:43,647 http_proxy 127.0.0.1 http_proxy.py:320 - POST /BatchSizePredictor 200 1030.2ms\n",
"\u001b[2m\u001b[36m(BatchSizePredictor pid=61041)\u001b[0m INFO 2022-06-02 19:33:43,645 BatchSizePredictor BatchSizePredictor#QQPBXh replica.py:484 - HANDLE __call__ OK 1013.6ms\n",
"\u001b[2m\u001b[36m(BatchSizePredictor pid=61041)\u001b[0m INFO 2022-06-02 19:33:44,155 BatchSizePredictor BatchSizePredictor#QQPBXh replica.py:484 - HANDLE __call__ OK 1015.0ms\n",
"\u001b[2m\u001b[36m(BatchSizePredictor pid=61041)\u001b[0m INFO 2022-06-02 19:33:44,155 BatchSizePredictor BatchSizePredictor#QQPBXh replica.py:484 - HANDLE __call__ OK 511.8ms\n",
"\u001b[2m\u001b[36m(BatchSizePredictor pid=61041)\u001b[0m INFO 2022-06-02 19:33:44,155 BatchSizePredictor BatchSizePredictor#QQPBXh replica.py:484 - HANDLE __call__ OK 511.4ms\n",
"\u001b[2m\u001b[36m(BatchSizePredictor pid=61041)\u001b[0m INFO 2022-06-02 19:33:44,155 BatchSizePredictor BatchSizePredictor#QQPBXh replica.py:484 - HANDLE __call__ OK 511.0ms\n",
"\u001b[2m\u001b[36m(HTTPProxyActor pid=60984)\u001b[0m INFO 2022-06-02 19:33:44,661 http_proxy 127.0.0.1 http_proxy.py:320 - POST /BatchSizePredictor 200 2043.3ms\n",
"\u001b[2m\u001b[36m(HTTPProxyActor pid=60984)\u001b[0m INFO 2022-06-02 19:33:44,662 http_proxy 127.0.0.1 http_proxy.py:320 - POST /BatchSizePredictor 200 2042.9ms\n",
"\u001b[2m\u001b[36m(HTTPProxyActor pid=60984)\u001b[0m INFO 2022-06-02 19:33:44,662 http_proxy 127.0.0.1 http_proxy.py:320 - POST /BatchSizePredictor 200 2039.5ms\n",
"\u001b[2m\u001b[36m(HTTPProxyActor pid=60984)\u001b[0m INFO 2022-06-02 19:33:44,662 http_proxy 127.0.0.1 http_proxy.py:320 - POST /BatchSizePredictor 200 2038.1ms\n",
"\u001b[2m\u001b[36m(HTTPProxyActor pid=60984)\u001b[0m INFO 2022-06-02 19:33:44,663 http_proxy 127.0.0.1 http_proxy.py:320 - POST /BatchSizePredictor 200 2038.9ms\n",
"\u001b[2m\u001b[36m(HTTPProxyActor pid=60984)\u001b[0m INFO 2022-06-02 19:33:44,663 http_proxy 127.0.0.1 http_proxy.py:320 - POST /BatchSizePredictor 200 2036.8ms\n",
"\u001b[2m\u001b[36m(HTTPProxyActor pid=60984)\u001b[0m INFO 2022-06-02 19:33:44,664 http_proxy 127.0.0.1 http_proxy.py:320 - POST /BatchSizePredictor 200 2036.5ms\n",
"\u001b[2m\u001b[36m(BatchSizePredictor pid=61041)\u001b[0m INFO 2022-06-02 19:33:44,661 BatchSizePredictor BatchSizePredictor#QQPBXh replica.py:484 - HANDLE __call__ OK 1016.0ms\n",
"\u001b[2m\u001b[36m(BatchSizePredictor pid=61041)\u001b[0m INFO 2022-06-02 19:33:44,661 BatchSizePredictor BatchSizePredictor#QQPBXh replica.py:484 - HANDLE __call__ OK 1015.6ms\n",
"\u001b[2m\u001b[36m(BatchSizePredictor pid=61041)\u001b[0m INFO 2022-06-02 19:33:44,662 BatchSizePredictor BatchSizePredictor#QQPBXh replica.py:484 - HANDLE __call__ OK 1015.5ms\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"Request id: [0.0] is part of batch group: [[3.0], [0.0], [4.0], [7.0]], with batch size 4\n",
"Request id: [1.0] is part of batch group: [[1.0]], with batch size 1\n",
"Request id: [2.0] is part of batch group: [[2.0]], with batch size 1\n",
"Request id: [3.0] is part of batch group: [[3.0], [0.0], [4.0], [7.0]], with batch size 4\n",
"Request id: [4.0] is part of batch group: [[3.0], [0.0], [4.0], [7.0]], with batch size 4\n",
"Request id: [5.0] is part of batch group: [[6.0], [5.0], [9.0]], with batch size 3\n",
"Request id: [6.0] is part of batch group: [[6.0], [5.0], [9.0]], with batch size 3\n",
"Request id: [7.0] is part of batch group: [[3.0], [0.0], [4.0], [7.0]], with batch size 4\n",
"Request id: [8.0] is part of batch group: [[8.0]], with batch size 1\n",
"Request id: [9.0] is part of batch group: [[6.0], [5.0], [9.0]], with batch size 3\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"\u001b[2m\u001b[36m(HTTPProxyActor pid=60984)\u001b[0m INFO 2022-06-02 19:33:45,167 http_proxy 127.0.0.1 http_proxy.py:320 - POST /BatchSizePredictor 200 2539.1ms\n",
"\u001b[2m\u001b[36m(BatchSizePredictor pid=61041)\u001b[0m INFO 2022-06-02 19:33:45,165 BatchSizePredictor BatchSizePredictor#QQPBXh replica.py:484 - HANDLE __call__ OK 1516.7ms\n"
]
}
],
"source": [
"from concurrent.futures import ThreadPoolExecutor, wait\n",
"\n",
"with ThreadPoolExecutor() as pool:\n",
" futs = [\n",
" pool.submit(\n",
" requests.post,\n",
" \"http://localhost:8000/BatchSizePredictor/\",\n",
" json={\"array\": [i]},\n",
" )\n",
" for i in range(10)\n",
" ]\n",
" wait(futs)\n",
"for fut in futs:\n",
" i, batch_size, batch_group = fut.result().json()\n",
" print(f\"Request id: {i} is part of batch group: {batch_group}, with batch size {batch_size}\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"As you can see, some of the requests are part of a bigger group that's run together.\n",
"\n",
"You can also configure the exact details of batching parameters:\n",
"- `max_batch_size(int)`: the maximum batch size that will be executed in one call to predict.\n",
"- `batch_wait_timeout_s (float)`: the maximum duration to wait for `max_batch_size` elements before running the predict call.\n",
"\n",
"Let's set a `max_batch_size` of 10 to group our requests into the same batch."
]
},
{
"cell_type": "code",
"execution_count": 25,
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"\u001b[2m\u001b[36m(ServeController pid=60981)\u001b[0m INFO 2022-06-02 19:33:47,081 controller 60981 deployment_state.py:1180 - Stopping 1 replicas of deployment 'BatchSizePredictor' with outdated versions.\n",
"\u001b[2m\u001b[36m(ServeController pid=60981)\u001b[0m INFO 2022-06-02 19:33:49,234 controller 60981 deployment_state.py:1221 - Adding 1 replicas to deployment 'BatchSizePredictor'.\n"
]
}
],
"source": [
"ModelWrapperDeployment.options(name=\"BatchSizePredictor\").deploy(\n",
" predictor_cls=BatchSizePredictor,\n",
" checkpoint=local_checkpoint,\n",
" batching_params={\"max_batch_size\": 10, \"batch_wait_timeout_s\": 5}\n",
")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Let's call them again! You should see all ten requests executed as part of the same group."
]
},
{
"cell_type": "code",
"execution_count": 26,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Request id: [0.0] is part of batch group: [[0.0], [5.0], [1.0], [2.0], [3.0], [4.0], [7.0], [6.0], [8.0], [9.0]], with batch size 10\n",
"Request id: [1.0] is part of batch group: [[0.0], [5.0], [1.0], [2.0], [3.0], [4.0], [7.0], [6.0], [8.0], [9.0]], with batch size 10\n",
"Request id: [2.0] is part of batch group: [[0.0], [5.0], [1.0], [2.0], [3.0], [4.0], [7.0], [6.0], [8.0], [9.0]], with batch size 10\n",
"Request id: [3.0] is part of batch group: [[0.0], [5.0], [1.0], [2.0], [3.0], [4.0], [7.0], [6.0], [8.0], [9.0]], with batch size 10\n",
"Request id: [4.0] is part of batch group: [[0.0], [5.0], [1.0], [2.0], [3.0], [4.0], [7.0], [6.0], [8.0], [9.0]], with batch size 10\n",
"Request id: [5.0] is part of batch group: [[0.0], [5.0], [1.0], [2.0], [3.0], [4.0], [7.0], [6.0], [8.0], [9.0]], with batch size 10\n",
"Request id: [6.0] is part of batch group: [[0.0], [5.0], [1.0], [2.0], [3.0], [4.0], [7.0], [6.0], [8.0], [9.0]], with batch size 10\n",
"Request id: [7.0] is part of batch group: [[0.0], [5.0], [1.0], [2.0], [3.0], [4.0], [7.0], [6.0], [8.0], [9.0]], with batch size 10\n",
"Request id: [8.0] is part of batch group: [[0.0], [5.0], [1.0], [2.0], [3.0], [4.0], [7.0], [6.0], [8.0], [9.0]], with batch size 10\n",
"Request id: [9.0] is part of batch group: [[0.0], [5.0], [1.0], [2.0], [3.0], [4.0], [7.0], [6.0], [8.0], [9.0]], with batch size 10\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"\u001b[2m\u001b[36m(HTTPProxyActor pid=60984)\u001b[0m INFO 2022-06-02 19:33:52,751 http_proxy 127.0.0.1 http_proxy.py:320 - POST /BatchSizePredictor 200 538.8ms\n",
"\u001b[2m\u001b[36m(HTTPProxyActor pid=60984)\u001b[0m INFO 2022-06-02 19:33:52,752 http_proxy 127.0.0.1 http_proxy.py:320 - POST /BatchSizePredictor 200 526.8ms\n",
"\u001b[2m\u001b[36m(HTTPProxyActor pid=60984)\u001b[0m INFO 2022-06-02 19:33:52,753 http_proxy 127.0.0.1 http_proxy.py:320 - POST /BatchSizePredictor 200 535.1ms\n",
"\u001b[2m\u001b[36m(HTTPProxyActor pid=60984)\u001b[0m INFO 2022-06-02 19:33:52,753 http_proxy 127.0.0.1 http_proxy.py:320 - POST /BatchSizePredictor 200 528.0ms\n",
"\u001b[2m\u001b[36m(HTTPProxyActor pid=60984)\u001b[0m INFO 2022-06-02 19:33:52,754 http_proxy 127.0.0.1 http_proxy.py:320 - POST /BatchSizePredictor 200 533.4ms\n",
"\u001b[2m\u001b[36m(HTTPProxyActor pid=60984)\u001b[0m INFO 2022-06-02 19:33:52,754 http_proxy 127.0.0.1 http_proxy.py:320 - POST /BatchSizePredictor 200 528.0ms\n",
"\u001b[2m\u001b[36m(HTTPProxyActor pid=60984)\u001b[0m INFO 2022-06-02 19:33:52,754 http_proxy 127.0.0.1 http_proxy.py:320 - POST /BatchSizePredictor 200 526.3ms\n",
"\u001b[2m\u001b[36m(HTTPProxyActor pid=60984)\u001b[0m INFO 2022-06-02 19:33:52,754 http_proxy 127.0.0.1 http_proxy.py:320 - POST /BatchSizePredictor 200 525.0ms\n",
"\u001b[2m\u001b[36m(HTTPProxyActor pid=60984)\u001b[0m INFO 2022-06-02 19:33:52,755 http_proxy 127.0.0.1 http_proxy.py:320 - POST /BatchSizePredictor 200 524.5ms\n",
"\u001b[2m\u001b[36m(HTTPProxyActor pid=60984)\u001b[0m INFO 2022-06-02 19:33:52,755 http_proxy 127.0.0.1 http_proxy.py:320 - POST /BatchSizePredictor 200 524.0ms\n",
"\u001b[2m\u001b[36m(BatchSizePredictor pid=61046)\u001b[0m INFO 2022-06-02 19:33:52,746 BatchSizePredictor BatchSizePredictor#mlVwXr replica.py:484 - HANDLE __call__ OK 530.1ms\n",
"\u001b[2m\u001b[36m(BatchSizePredictor pid=61046)\u001b[0m INFO 2022-06-02 19:33:52,746 BatchSizePredictor BatchSizePredictor#mlVwXr replica.py:484 - HANDLE __call__ OK 514.7ms\n",
"\u001b[2m\u001b[36m(BatchSizePredictor pid=61046)\u001b[0m INFO 2022-06-02 19:33:52,747 BatchSizePredictor BatchSizePredictor#mlVwXr replica.py:484 - HANDLE __call__ OK 514.4ms\n",
"\u001b[2m\u001b[36m(BatchSizePredictor pid=61046)\u001b[0m INFO 2022-06-02 19:33:52,747 BatchSizePredictor BatchSizePredictor#mlVwXr replica.py:484 - HANDLE __call__ OK 513.6ms\n",
"\u001b[2m\u001b[36m(BatchSizePredictor pid=61046)\u001b[0m INFO 2022-06-02 19:33:52,747 BatchSizePredictor BatchSizePredictor#mlVwXr replica.py:484 - HANDLE __call__ OK 513.4ms\n",
"\u001b[2m\u001b[36m(BatchSizePredictor pid=61046)\u001b[0m INFO 2022-06-02 19:33:52,748 BatchSizePredictor BatchSizePredictor#mlVwXr replica.py:484 - HANDLE __call__ OK 511.6ms\n",
"\u001b[2m\u001b[36m(BatchSizePredictor pid=61046)\u001b[0m INFO 2022-06-02 19:33:52,748 BatchSizePredictor BatchSizePredictor#mlVwXr replica.py:484 - HANDLE __call__ OK 510.6ms\n",
"\u001b[2m\u001b[36m(BatchSizePredictor pid=61046)\u001b[0m INFO 2022-06-02 19:33:52,748 BatchSizePredictor BatchSizePredictor#mlVwXr replica.py:484 - HANDLE __call__ OK 510.4ms\n",
"\u001b[2m\u001b[36m(BatchSizePredictor pid=61046)\u001b[0m INFO 2022-06-02 19:33:52,749 BatchSizePredictor BatchSizePredictor#mlVwXr replica.py:484 - HANDLE __call__ OK 510.3ms\n",
"\u001b[2m\u001b[36m(BatchSizePredictor pid=61046)\u001b[0m INFO 2022-06-02 19:33:52,749 BatchSizePredictor BatchSizePredictor#mlVwXr replica.py:484 - HANDLE __call__ OK 509.9ms\n"
]
}
],
"source": [
"from concurrent.futures import ThreadPoolExecutor, wait\n",
"\n",
"with ThreadPoolExecutor() as pool:\n",
" futs = [\n",
" pool.submit(\n",
" requests.post,\n",
" \"http://localhost:8000/BatchSizePredictor/\",\n",
" json={\"array\": [i]},\n",
" )\n",
" for i in range(10)\n",
" ]\n",
" wait(futs)\n",
"for fut in futs:\n",
" i, batch_size, batch_group = fut.result().json()\n",
" print(f\"Request id: {i} is part of batch group: {batch_group}, with batch size {batch_size}\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The batching behavior is well-defined:\n",
"- When batching arrays, they are all concatenated into a new array with an added batch dimension.\n",
"- When batching dataframes, they are all concatenated row-wise.\n",
"\n",
"You can also turn off this behavior by setting `batching_params=False`."
]
}
],
"metadata": {
"interpreter": {
"hash": "3d292ac03404f7ef6351ef2ed79a7f4abdf879ab5af2497c4aeba036dba01cfa"
},
"kernelspec": {
"display_name": "Python 3.6.9 ('base')",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.9"
},
"orig_nbformat": 4
},
"nbformat": 4,
"nbformat_minor": 2
}