[Serve][Doc] Update docs about input schema, and json_request adapter (#24191)

This commit is contained in:
Simon Mo 2022-04-27 14:51:07 -07:00 committed by GitHub
parent f3857b7aa1
commit ee528957c7
5 changed files with 147 additions and 78 deletions

View file

@ -16,7 +16,7 @@ kernelspec:
# Deployment Graph
```{note}
This feature is in Alpha, so APIs are subject to change.
```
@ -24,7 +24,7 @@ Note: This feature is in Alpha, so APIs are subject to change.
Machine learning serving systems are getting longer and wider. They often consist of many models to make a single prediction. This is common in use cases like image / video content classification and tagging, fraud detection pipeline with multiple policies, multi-stage ranking and recommendation, etc.
Meanwhile, the size of a model is also growing beyond the memory limit of a single machine due to the exponentially growing number of parameters. GPT-3 and sparse feature embeddings in large recommendation models are two prime examples.
Ray has unique strengths suited to distributed inference pipelines: flexible scheduling, efficient communication, and shared memory. Ray Serve leverages these strengths to build inference graphs, enabling users to develop complex ML applications locally and then deploy them to production with dynamic scaling and lightweight updates (e.g., for model weights).
@ -58,9 +58,9 @@ At the end of this document we have the full and end to end executable implement
```
- Building a graph:
- A deployment node is created from an `@serve.deployment`-decorated function or class.
- You can construct different deployment nodes from the same class or function.
- Deployment nodes can be used as input arguments in `.bind()` calls on other nodes in the DAG.
- Multiple nodes in the deployment graph naturally form a DAG structure.
- Accessing input:
- Same input or output can be used in multiple nodes in the DAG.
@ -77,7 +77,7 @@ At the end of this document we have the full and end to end executable implement
+++
### Step 1: User InputNode and preprocessor
Let's start with the first layer of the DAG: routing user input to two preprocessor functions, where each function receives part of the input data. For simplicity, we use the existing `@serve.deployment` decorator on an async function body.
@ -86,7 +86,7 @@ Let's start with the first layer of DAG: Building user input to two preprocessor
```python
import asyncio
from ray import serve
# We will move Ray DAG related components
# out of experimental in a later stable release
from ray.experimental.dag.input_node import InputNode
@ -105,7 +105,7 @@ async def avg_preprocessor(input_data):
# DAG building
with InputNode() as dag_input:
# Partial access of user input by index
preprocessed_1 = preprocessor.bind(dag_input[0])
preprocessed_2 = avg_preprocessor.bind(dag_input[1])
```
@ -145,7 +145,7 @@ This means we're creating a DeploymentNode called `preprocessed_1` in graph buil
Each deployment node used in the graph is individually scalable and configurable by default. In a real production workload, where compute requirements and latency differ across nodes, you can fine-tune each node's `num_replicas` and `num_cpus` so that no single node becomes the bottleneck for your deployment graph's latency or throughput (see the sketch after this note).
```
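For illustration, a node's scaling and resources can be tuned where the deployment is defined, or later through `.options()`. Here is a minimal sketch; the deployment name, replica count, and CPU value below are hypothetical, not recommendations:

```python
# Hypothetical tuning sketch: give a heavier node more replicas and CPUs so it
# does not bottleneck the rest of the graph. Values are illustrative only.
@serve.deployment(num_replicas=3, ray_actor_options={"num_cpus": 2})
async def heavy_preprocessor(input_data):
    ...
```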
#### bind() on class constructor
**`Class.bind(*args, **kwargs)`** constructs and returns a DeploymentNode that acts as an instantiated instance of `Class`, where `*args` and `**kwargs` are passed to the constructor. In our implementation, we have
@ -183,7 +183,7 @@ class Model:
self.weight = weight
async def forward(self, input: int):
await asyncio.sleep(0.3) # Manual delay for blocking computation
return f"({self.weight} * {input})"
@ -194,13 +194,13 @@ class Combiner:
self.m2 = m2
async def run(self, req_part_1, req_part_2, operation):
# Merge model input from two preprocessors
req = f"({req_part_1} + {req_part_2})"
# Submit to both m1 and m2 with same req data in parallel
r1_ref = self.m1.forward.remote(req)
r2_ref = self.m2.forward.remote(req)
# Async gathering of model forward results for same request data
rst = await asyncio.gather(*[r1_ref, r2_ref])
@ -208,12 +208,12 @@ class Combiner:
# DAG building
with InputNode() as dag_input:
# Partial access of user input by index
preprocessed_1 = preprocessor.bind(dag_input[0])
preprocessed_2 = avg_preprocessor.bind(dag_input[1])
m1 = Model.bind(1)
m2 = Model.bind(2)
combiner = Combiner.bind(m1, m2)
dag = combiner.run.bind(preprocessed_1, preprocessed_2, dag_input[2])
```
+++
@ -264,16 +264,16 @@ class Combiner:
...
async def run(self, req_part_1, req_part_2, operation):
# Merge model input from two preprocessors
req = f"({req_part_1} + {req_part_2})"
# Submit to both m1 and m2 with same req data in parallel
r1_ref = self.m1.forward.remote(req)
r2_ref = self.m2.forward.remote(req)
# Async gathering of model forward results for same request data
rst = await asyncio.gather(*[r1_ref, r2_ref])
# Control flow that determines runtime behavior based on user input
if operation == "sum":
return f"sum({rst})"
@ -293,15 +293,44 @@ Support control flow in plain python code can be very useful to build dynamic di
Now we've built the entire Serve DAG with its topology, argument binding, and user input. It's time to add the last piece for Serve: a driver deployment to expose and configure HTTP. We can configure it to start with two replicas in case the ingress deployment becomes the bottleneck of the DAG.
```{note}
Serve provides a default `DAGDriver` implementation that accepts an HTTP request and orchestrates the deployment graph execution. You can import it with `from ray.serve.drivers import DAGDriver`.

You can configure how the `DAGDriver` converts HTTP request types. By default, we directly pass in a [`starlette.requests.Request`](https://www.starlette.io/requests/) object to represent the whole request. You can also specify a built-in adapter. In this example, we will use the `json_request` adapter, which parses the HTTP body with a JSON parser.
```
```{tip}
There are several useful adapters, such as ndarray JSON and image object adapters. You can check out {ref}`the list of adapters here <serve-http-adapters>`. You can also easily plug in your own `input_schema`, as sketched below.
```
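For illustration, here is a hedged sketch of a custom adapter; the function name and query-parameter convention are hypothetical, but the shape mirrors the built-in `json_request` adapter (an async callable that receives the raw request and returns the parsed input):

```python
import starlette.requests


async def query_string_adapter(request: starlette.requests.Request) -> str:
    # Hypothetical custom input_schema: read the payload from a ?payload=...
    # query parameter instead of the JSON body.
    return request.query_params.get("payload", "")
```

It could then be passed as `input_schema=query_string_adapter` when binding the driver, in the same way `json_request` is used below.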
+++
```python
from ray.serve.drivers import DAGDriver
from ray.serve.http_adapters import json_request
# DAG building
with InputNode() as dag_input:
...
dag = combiner.run.bind(
preprocessed_1, preprocessed_2, dag_input[2] # Partial access of user input by index
)
# Each Serve DAG has a driver deployment as ingress, which can be user provided.
serve_dag = DAGDriver.options(route_prefix="/my-dag", num_replicas=2).bind(
dag, input_schema=json_request
)
```
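To make the adapter's behavior concrete, here is a sketch of what a client call could look like for this graph. With `json_request`, the parsed JSON body becomes the graph input, so each indexed access above receives one element of the list; the payload mirrors the full example at the end of this document:

```python
import requests

# json_request parses the HTTP body as JSON, so for this payload:
#   dag_input[0] == "5", dag_input[1] == [1, 2], dag_input[2] == "sum"
resp = requests.post("http://127.0.0.1:8000/my-dag", json=["5", [1, 2], "sum"])
print(resp.text)
```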
+++
````{note}
Even though we provide a default `DAGDriver`, you don't have to use it in your Serve DAG. You can easily bring your own Serve deployment to act as the root node. You can find more information about configuring HTTP deployments at {ref}`serve-http`. For example, a simple DAG driver that parses JSON input can be implemented as follows:
```python
@serve.deployment(route_prefix="/my-dag")
class MyDAGDriver:
def __init__(self, dag_handle):
self.dag_handle = dag_handle
@ -317,15 +346,12 @@ class DAGDriver:
# DAG building
with InputNode() as dag_input:
...
dag = combiner.run.bind(...)
serve_dag = MyDAGDriver.bind(dag)
```
````
### Step 5: Test the full DAG in both Python and HTTP
@ -343,7 +369,7 @@ root DeploymentNode into ```serve.run()```:
```python
with InputNode() as dag_input:
serve_dag = ...
dag_handle = serve.run(serve_dag)
```
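For a quick sanity check, the handle returned by `serve.run()` can also be called directly from Python without going through HTTP; a small sketch based on the full example at the end of this document:

```python
# Direct Python invocation of the deployed graph via the driver's predict method,
# assuming dag_handle comes from the serve.run() call above.
print(ray.get(dag_handle.predict.remote(["5", [1, 2], "sum"])))
```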
@ -394,23 +420,6 @@ more info on these options.
Now we're done! The complete, end-to-end example below is ready for you to try out.
```{code-cell} ipython3
import time
@ -418,7 +427,14 @@ import asyncio
import requests
import starlette
import ray
from ray import serve
from ray.experimental.dag.input_node import InputNode
from ray.serve.drivers import DAGDriver
from ray.serve.http_adapters import json_request
ray.init(num_cpus=16)
serve.start()
@serve.deployment
async def preprocessor(input_data: str):
@ -438,7 +454,7 @@ class Model:
self.weight = weight
async def forward(self, input: int):
await asyncio.sleep(0.3) # Manual delay for blocking computation
return f"({self.weight} * {input})"
@ -449,35 +465,21 @@ class Combiner:
self.m2 = m2
async def run(self, req_part_1, req_part_2, operation):
# Merge model input from two preprocessors
req = f"({req_part_1} + {req_part_2})"
# Submit to both m1 and m2 with same req data in parallel
r1_ref = self.m1.forward.remote(req)
r2_ref = self.m2.forward.remote(req)
# Async gathering of model forward results for same request data
rst = await asyncio.gather(r1_ref, r2_ref)
# Control flow that determines runtime behavior based on user input
if operation == "sum":
return f"sum({rst})"
else:
return f"max({rst})"
# DAG building
with InputNode() as dag_input:
@ -492,10 +494,12 @@ with InputNode() as dag_input:
# Use output of function DeploymentNode in bind()
dag = combiner.run.bind(
preprocessed_1, preprocessed_2, dag_input[2]
)
# Each Serve DAG has a driver deployment as ingress, which can be user provided.
serve_dag = DAGDriver.options(route_prefix="/my-dag", num_replicas=2).bind(
dag, input_schema=json_request
)
dag_handle = serve.run(serve_dag)
@ -503,7 +507,7 @@ dag_handle = serve.run(serve_dag)
# Warm up
ray.get(dag_handle.predict.remote(["0", [0, 0], "sum"]))
# Python handle
cur = time.time()
print(ray.get(dag_handle.predict.remote(["5", [1, 2], "sum"])))
print(f"Time spent: {round(time.time() - cur, 2)} secs.")
@ -512,7 +516,7 @@ cur = time.time()
print(requests.post("http://127.0.0.1:8000/my-dag", json=["5", [1, 2], "sum"]).text)
print(f"Time spent: {round(time.time() - cur, 2)} secs.")
# Python handle
cur = time.time()
print(ray.get(dag_handle.predict.remote(["1", [0, 2], "max"])))
print(f"Time spent: {round(time.time() - cur, 2)} secs.")
@ -539,7 +543,7 @@ Time spent: 0.48 secs.
```
The critical path for each request in the DAG is
preprocessing: ```max(preprocessor, avg_preprocessor) = 0.15 secs```
<br>
@ -557,7 +561,7 @@ We've walked through key concepts and a simple representative example that cover
- __[Ray Serve forum](https://discuss.ray.io/c/ray-serve/6)__
- __[Github issues / feature request](https://github.com/ray-project/ray/issues)__ (tag `serve`)
Potential future improvements:
- `serve.build()` to fulfill the Ops API, so a user's deployment graph can generate a YAML file for deployment, scaling, and reconfiguration.
- Performance optimizations:
- Tuning guide for deployment graphs to avoid a single node becoming the bottleneck

View file

@ -9,6 +9,8 @@ This section should help you:
.. contents:: Calling Deployments via HTTP and Python
.. _serve-http:
Calling Deployments via HTTP
============================
@ -157,10 +159,22 @@ To try it out, save a code snippet in a local python file (i.e. main.py) and in
HTTP Adapters
^^^^^^^^^^^^^
HTTP adapters are functions that convert a raw HTTP request to Python types that you know and recognize.
You can use them in three different scenarios:
- Ray AIR ``ModelWrapper``
- Serve Deployment Graph ``DAGDriver``
- Embedded in Bring Your Own ``FastAPI`` Application
Let's go over them one by one.
Ray AIR ``ModelWrapper``
""""""""""""""""""""""""
Ray Serve provides a suite of adapters that convert HTTP requests to ML inputs like ``numpy`` arrays.
You can use them with the :ref:`Ray AI Runtime (AIR) model wrapper<air-serve-integration>` feature
to deploy pre-trained models with one click.
Alternatively, you can directly import them and plug them into your FastAPI app.
For example, we provide a simple adapter for n-dimensional arrays.
@ -178,6 +192,27 @@ With :ref:`model wrappers<air-serve-integration>`, you can specify it via the ``
input_schema=json_to_ndarray
)
Serve Deployment Graph ``DAGDriver``
""""""""""""""""""""""""""""""""""""
In a :ref:`Serve Deployment Graph <serve-deployment-graph>`, you can configure
``ray.serve.drivers.DAGDriver`` to accept an HTTP adapter via its ``input_schema`` field.
For example, the JSON request adapter parses the JSON in the HTTP body:
.. code-block:: python
from ray.serve.drivers import DAGDriver
from ray.serve.http_adapters import json_request
from ray.experimental.dag.input_node import InputNode
with InputNode() as input_node:
...
dag = DAGDriver.bind(other_node, input_schema=json_request)
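As a usage sketch (assuming the graph above has been deployed with ``serve.run`` and is served at the root route), the adapter then parses the JSON body of incoming requests:

.. code-block:: python

    import requests
    from ray import serve

    serve.run(dag)
    # json_request parses the JSON body before it reaches the deployment graph.
    resp = requests.post("http://127.0.0.1:8000/", json={"user_input": 1})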
Embedded in Bring Your Own ``FastAPI`` Application
""""""""""""""""""""""""""""""""""""""""""""""""""
You can also bring the adapter to your own FastAPI app using
`Depends <https://fastapi.tiangolo.com/tutorial/dependencies/#import-depends>`_.
The input schema will automatically be part of the generated OpenAPI schema with FastAPI.
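A minimal sketch of this pattern, assuming ``json_to_ndarray`` as the adapter (the endpoint path and app wiring are illustrative):

.. code-block:: python

    from fastapi import Depends, FastAPI
    from ray.serve.http_adapters import json_to_ndarray

    app = FastAPI()

    @app.post("/endpoint")
    async def endpoint(np_array=Depends(json_to_ndarray)):
        # np_array is the numpy array the adapter produced from the request body.
        ...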
@ -200,10 +235,13 @@ It has the following schema for input:
.. autopydantic_model:: ray.serve.http_adapters.NdArray
List of Built-in Adapters
"""""""""""""""""""""""""
Here is a list of built-in adapters; please feel free to `contribute more <https://github.com/ray-project/ray/issues/new/choose>`_!
.. automodule:: ray.serve.http_adapters
:members: json_to_ndarray, image_to_ndarray, starlette_request, json_request
Configuring HTTP Server Locations

View file

@ -77,6 +77,6 @@ class DAGDriver(SimpleSchemaIngress):
self.dag_handle = dag_handle
super().__init__(input_schema)
async def predict(self, *args, **kwargs):
"""Perform inference directly without HTTP."""
return await self.dag_handle.remote(*args, **kwargs)

View file

@ -1,5 +1,5 @@
from io import BytesIO
from typing import Any, Dict, List, Optional, Union
from fastapi import File
from pydantic import BaseModel, Field
@ -58,6 +58,11 @@ def starlette_request(
return request
async def json_request(request: starlette.requests.Request) -> Dict[str, Any]:
"""Return the JSON object from request body."""
return await request.json()
@require_packages(["PIL"])
def image_to_ndarray(img: bytes = File(...)) -> np.ndarray:
"""Accepts a PIL-readable file from an HTTP form and converts

View file

@ -6,6 +6,7 @@ import starlette.requests
from starlette.testclient import TestClient
from ray.serve.drivers import DAGDriver, SimpleSchemaIngress, load_input_schema
from ray.serve.http_adapters import json_request
from ray.experimental.dag.input_node import InputNode
from ray import serve
import ray
@ -100,5 +101,26 @@ def test_dag_driver_custom_schema(serve_instance):
assert resp.json() == 100
@serve.deployment
def combine(*args):
return list(args)
def test_dag_driver_partial_input(serve_instance):
with InputNode() as inp:
dag = DAGDriver.bind(
combine.bind(echo.bind(inp[0]), echo.bind(inp[1]), echo.bind(inp[2])),
input_schema=json_request,
)
handle = serve.run(dag)
assert ray.get(handle.predict.remote([1, 2, [3, 4]])) == [1, 2, [3, 4]]
assert ray.get(handle.predict.remote(1, 2, [3, 4])) == [1, 2, [3, 4]]
resp = requests.post("http://127.0.0.1:8000/", json=[1, 2, [3, 4]])
print(resp.text)
resp.raise_for_status()
assert resp.json() == [1, 2, [3, 4]]
if __name__ == "__main__":
sys.exit(pytest.main(["-v", "-s", __file__]))