[Serve] Added deadline awareness (#6442)

* [Serve] Added deadline awareness

Added deadline awareness while enqueuing a query
Using Blist sorted-list implementation (ascending order) to get queries according to their specified deadlines. [buffer_queues]
Exposed slo_ms via handle/http request
Added slo example 
The queries in the example will be executed in almost the reverse of the order in which they are fired
Added slo pytest
Added check for slo_ms to not be negative
Included the changes suggested

* Linting Corrections

* Adding the code changes suggested by format.sh

* Added the suggested changes

Added justification for blist
Added blist in travis/ci/install-dependencies.sh

* Fixed linting issues

* Added blist to ray/doc/requirements-doc.txt
This commit is contained in:
alindkhare 2019-12-11 19:41:54 -05:00 committed by Simon Mo
parent 0ec613c95a
commit 76e678d775
8 changed files with 160 additions and 11 deletions

View file

@ -36,7 +36,7 @@ elif [[ "$PYTHON" == "3.5" ]] && [[ "$platform" == "linux" ]]; then
export PATH="$HOME/miniconda/bin:$PATH"
pip install -q scipy tensorflow cython==0.29.0 gym opencv-python-headless pyyaml pandas==0.24.2 requests \
feather-format lxml openpyxl xlrd py-spy setproctitle pytest-timeout networkx tabulate psutil aiohttp \
uvicorn dataclasses pygments werkzeug kubernetes flask grpcio pytest-sugar pytest-rerunfailures
uvicorn dataclasses pygments werkzeug kubernetes flask grpcio pytest-sugar pytest-rerunfailures blist
elif [[ "$PYTHON" == "2.7" ]] && [[ "$platform" == "macosx" ]]; then
# Install miniconda.
wget -q https://repo.continuum.io/miniconda/Miniconda2-4.5.4-MacOSX-x86_64.sh -O miniconda.sh -nv
@ -52,7 +52,7 @@ elif [[ "$PYTHON" == "3.5" ]] && [[ "$platform" == "macosx" ]]; then
export PATH="$HOME/miniconda/bin:$PATH"
pip install -q cython==0.29.0 tensorflow gym opencv-python-headless pyyaml pandas==0.24.2 requests \
feather-format lxml openpyxl xlrd py-spy setproctitle pytest-timeout networkx tabulate psutil aiohttp \
uvicorn dataclasses pygments werkzeug kubernetes flask grpcio pytest-sugar pytest-rerunfailures
uvicorn dataclasses pygments werkzeug kubernetes flask grpcio pytest-sugar pytest-rerunfailures blist
elif [[ "$LINT" == "1" ]]; then
sudo apt-get update
sudo apt-get install -y build-essential curl unzip

View file

@ -23,3 +23,4 @@ flask
uvicorn
pygments
werkzeug
blist

View file

@ -0,0 +1,64 @@
"""
SLO [reverse] example of ray.serve module
"""
import time
import requests
import ray
import ray.experimental.serve as serve
# initialize ray serve system.
# blocking=True will wait for HTTP server to be ready to serve request.
serve.init(blocking=True)
# an endpoint is associated with an http URL.
serve.create_endpoint("my_endpoint", "/echo")
# a backend can be a function or class.
# it can be made to be invoked from web as well as python.
def echo_v1(flask_request, response="hello from python!"):
if serve.context.web:
response = flask_request.url
return response
serve.create_backend(echo_v1, "echo:v1")
serve.link("my_endpoint", "echo:v1")
# wait for routing table to get populated
time.sleep(2)
# slo (10 milliseconds deadline) can be specified via http
slo_ms = 10.0
print("> [HTTP] Pinging http://127.0.0.1:8000/echo?slo_ms={}".format(slo_ms))
print(
requests.get("http://127.0.0.1:8000/echo?slo_ms={}".format(slo_ms)).json())
# get the handle of the endpoint
handle = serve.get_handle("my_endpoint")
future_list = []
# fire 10 requests with slo's in the (almost) reverse order of the order in
# which remote procedure call is done
for r in range(10):
slo_ms = 1000 - 100 * r
response = "hello from request: {} slo: {}".format(r, slo_ms)
print("> [REMOTE] Pinging handle.remote(response='{}',slo_ms={})".format(
response, slo_ms))
# slo can be specified via remote function
f = handle.remote(response=response, slo_ms=slo_ms)
future_list.append(f)
# get results of queries as they complete
# should be completed (almost) according to the order of their slo time
left_futures = future_list
while left_futures:
completed_futures, remaining_futures = ray.wait(left_futures, timeout=0.05)
if len(completed_futures) > 0:
result = ray.get(completed_futures[0])
print(result)
left_futures = remaining_futures

View file

@ -36,12 +36,25 @@ class RayServeHandle:
raise RayServeException(
"handle.remote must be invoked with keyword arguments.")
# get slo_ms before enqueuing the query
request_slo_ms = kwargs.pop("slo_ms", None)
if request_slo_ms is not None:
try:
request_slo_ms = float(request_slo_ms)
if request_slo_ms < 0:
raise ValueError(
"Request SLO must be positive, it is {}".format(
request_slo_ms))
except ValueError as e:
raise RayServeException(str(e))
result_object_id_bytes = ray.get(
self.router_handle.enqueue_request.remote(
service=self.endpoint_name,
request_args=(),
request_kwargs=kwargs,
request_context=TaskContext.Python))
request_context=TaskContext.Python,
request_slo_ms=request_slo_ms))
return ray.ObjectID(result_object_id_bytes)
def get_traffic_policy(self):

View file

@ -4,6 +4,8 @@ import numpy as np
import ray
from ray.experimental.serve.utils import logger
from blist import sortedlist
import time
class Query:
@ -11,6 +13,7 @@ class Query:
request_args,
request_kwargs,
request_context,
request_slo_ms,
result_object_id=None):
self.request_args = request_args
self.request_kwargs = request_kwargs
@ -21,6 +24,15 @@ class Query:
else:
self.result_object_id = result_object_id
# Service level objective, stored as an absolute deadline:
# milliseconds since the unix epoch by which the query should complete.
self.request_slo_ms = request_slo_ms
    # Comparator used by the blist sortedlist in the router's buffer queues
    # so queries are kept in ascending order of request_slo_ms
    # (earliest absolute deadline first).
    def __lt__(self, other):
        return self.request_slo_ms < other.request_slo_ms
class WorkIntent:
def __init__(self, replica_handle):
@ -74,7 +86,14 @@ class CentralizedQueues:
self.workers = defaultdict(deque)
# backend_name -> worker payload queue
self.buffer_queues = defaultdict(deque)
# using blist sortedlist for deadline awareness
# blist is chosen because:
# 1. pop operation should be O(1) (amortized)
# (helpful even for batched pop)
# 2. There should not be significant overhead in
# maintaining the sorted list.
# 3. The blist implementation is fast and uses C extensions.
self.buffer_queues = defaultdict(sortedlist)
def is_ready(self):
return True
@ -88,9 +107,23 @@ class CentralizedQueues:
for backend_name, queue in self.buffer_queues.items()
}
def enqueue_request(self, service, request_args, request_kwargs,
request_context):
query = Query(request_args, request_kwargs, request_context)
    def enqueue_request(self,
                        service,
                        request_args,
                        request_kwargs,
                        request_context,
                        request_slo_ms=None):
        """Add a query to the service's queue and flush the queues.

        Args:
            service: name of the endpoint this query targets.
            request_args: positional arguments forwarded to the backend.
            request_kwargs: keyword arguments forwarded to the backend.
            request_context: TaskContext (Python or Web) of the caller.
            request_slo_ms: relative deadline in milliseconds within which
                the query should be answered; None means no deadline
                (plain FIFO behaviour).

        Returns:
            The serialized (binary) ObjectID holding the query's result.
        """
        if request_slo_ms is None:
            # If no SLO is specified, use a very large relative deadline so
            # un-deadlined queries sort after explicitly-deadlined ones.
            request_slo_ms = 1e9
        # Convert the relative deadline into an absolute wall-clock time in
        # milliseconds; since arrival time is added in, this also preserves
        # FIFO ordering among queries that share the same SLO.
        request_slo_ms += (time.time() * 1000)
        query = Query(request_args, request_kwargs, request_context,
                      request_slo_ms)
        self.queues[service].append(query)
        self.flush()
        return query.result_object_id.binary()
@ -147,11 +180,15 @@ class CentralizedQueues:
while len(queue) and len(self.traffic[service]):
backend_names = list(self.traffic[service].keys())
backend_weights = list(self.traffic[service].values())
# TODO(alind): is random choice good for deadline awareness?
# putting query in a buffer of a non available backend may
# not be good
chosen_backend = np.random.choice(
backend_names, p=backend_weights).squeeze()
request = queue.popleft()
self.buffer_queues[chosen_backend].append(request)
# maintain a sorted list in the buffer queue of the backend
self.buffer_queues[chosen_backend].add(request)
# dispatch buffer queues to work queues
for service in self.queues.keys():
@ -165,7 +202,7 @@ class CentralizedQueues:
work_queue = self.workers[backend]
while len(buffer_queue) and len(work_queue):
request, work = (
buffer_queue.popleft(),
buffer_queue.pop(0),
work_queue.popleft(),
)
work.replica_handle._ray_serve_call.remote(request)

View file

@ -8,6 +8,7 @@ from ray.experimental.async_api import _async_init, as_future
from ray.experimental.serve.constants import HTTP_ROUTER_CHECKER_INTERVAL_S
from ray.experimental.serve.context import TaskContext
from ray.experimental.serve.utils import BytesEncoder
from urllib.parse import parse_qs
class JSONResponse:
@ -128,13 +129,33 @@ class HTTPProxy:
endpoint_name = self.route_table_cache[current_path]
http_body_bytes = await self.receive_http_body(scope, receive, send)
# get slo_ms before enqueuing the query
query_string = scope["query_string"].decode("ascii")
query_kwargs = parse_qs(query_string)
request_slo_ms = query_kwargs.pop("slo_ms", None)
if request_slo_ms is not None:
try:
if len(request_slo_ms) != 1:
raise ValueError(
"Multiple SLO specified, please specific only one.")
request_slo_ms = request_slo_ms[0]
request_slo_ms = float(request_slo_ms)
if request_slo_ms < 0:
raise ValueError(
"Request SLO must be positive, it is {}".format(
request_slo_ms))
except ValueError as e:
await JSONResponse({"error": str(e)})(scope, receive, send)
return
result_object_id_bytes = await as_future(
self.serve_global_state.init_or_get_router()
.enqueue_request.remote(
service=endpoint_name,
request_args=(scope, http_body_bytes),
request_kwargs=dict(),
request_context=TaskContext.Web))
request_context=TaskContext.Web,
request_slo_ms=request_slo_ms))
result = await as_future(ray.ObjectID(result_object_id_bytes))

View file

@ -34,6 +34,19 @@ def test_single_prod_cons_queue(serve_instance, task_runner_mock_actor):
assert ray.get(ray.ObjectID(result_object_id)) == 2
def test_slo(serve_instance, task_runner_mock_actor):
    """Queries enqueued with decreasing SLOs should be dequeued in
    deadline order (reverse of enqueue order), not arrival order."""
    queue = CentralizedQueues()
    queue.link("svc", "backend")

    # Enqueue 10 queries whose deadlines shrink as the index grows, so the
    # last query enqueued carries the tightest SLO.
    for index in range(10):
        deadline_ms = 1000 - 100 * index
        queue.enqueue_request(
            "svc", index, "kwargs", None, request_slo_ms=deadline_ms)

    # Dequeue all 10: they should arrive tightest-deadline first, i.e. in
    # reverse enqueue order (9, 8, ..., 0).
    for expected in reversed(range(10)):
        queue.dequeue_request("backend", task_runner_mock_actor)
        delivered = ray.get(task_runner_mock_actor.get_recent_call.remote())
        assert delivered.request_args == expected
def test_alter_backend(serve_instance, task_runner_mock_actor):
q = CentralizedQueues()

View file

@ -79,7 +79,7 @@ extras = {
],
"debug": ["psutil", "setproctitle", "py-spy >= 0.2.0"],
"dashboard": ["aiohttp", "google", "grpcio", "psutil", "setproctitle"],
"serve": ["uvicorn", "pygments", "werkzeug", "flask", "pandas"],
"serve": ["uvicorn", "pygments", "werkzeug", "flask", "pandas", "blist"],
"tune": ["tabulate"],
}