[Serve] Added support for no http route services (#7010)

This commit is contained in:
Alind Khare 2020-02-17 14:31:30 -05:00 committed by GitHub
parent 42aea966ff
commit c6d768be14
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 74 additions and 11 deletions

View file

@ -1,6 +1,7 @@
import inspect
from functools import wraps
from tempfile import mkstemp
from multiprocessing import cpu_count
import numpy as np
@ -13,7 +14,7 @@ from ray.experimental.serve.global_state import (GlobalState,
from ray.experimental.serve.kv_store_service import SQLiteKVStore
from ray.experimental.serve.task_runner import RayServeMixin, TaskRunnerActor
from ray.experimental.serve.utils import (block_until_http_ready,
get_random_letters)
get_random_letters, expand)
from ray.experimental.serve.exceptions import RayServeException
from ray.experimental.serve.backend_config import BackendConfig
from ray.experimental.serve.policy import RoutePolicy
@ -64,6 +65,7 @@ def accept_batch(f):
def init(kv_store_connector=None,
kv_store_path=None,
blocking=False,
start_server=True,
http_host=DEFAULT_HTTP_HOST,
http_port=DEFAULT_HTTP_PORT,
ray_init_kwargs={
@ -87,6 +89,8 @@ def init(kv_store_connector=None,
kv_store_path (str, path): Path to the SQLite table.
blocking (bool): If true, the function will wait for the HTTP server to
be healthy, and other components to be ready before returns.
start_server (bool): If true, `serve.init` starts http server.
(Default: True)
http_host (str): Host for HTTP server. Default to "0.0.0.0".
http_port (int): Port for HTTP server. Default to 8000.
ray_init_kwargs (dict): Argument passed to ray.init, if there is no ray
@ -132,18 +136,19 @@ def init(kv_store_connector=None,
nursery = start_initial_state(kv_store_connector)
global_state = GlobalState(nursery)
global_state.init_or_get_http_server(host=http_host, port=http_port)
if start_server:
global_state.init_or_get_http_server(host=http_host, port=http_port)
global_state.init_or_get_router(
queueing_policy=queueing_policy, policy_kwargs=policy_kwargs)
global_state.init_or_get_metric_monitor(
gc_window_seconds=gc_window_seconds)
if blocking:
if start_server and blocking:
block_until_http_ready("http://{}:{}".format(http_host, http_port))
@_ensure_connected
def create_endpoint(endpoint_name, route, blocking=True):
def create_endpoint(endpoint_name, route=None, blocking=True):
"""Create a service endpoint given route_expression.
Args:
@ -396,7 +401,8 @@ def split(endpoint_name, traffic_policy_dictionary):
traffic_policy_dictionary (dict): a dictionary maps backend names
to their traffic weights. The weights must sum to 1.
"""
assert endpoint_name in global_state.route_table.list_service().values()
assert endpoint_name in expand(
global_state.route_table.list_service(include_headless=True).values())
assert isinstance(traffic_policy_dictionary,
dict), "Traffic policy must be dictionary"
@ -430,7 +436,8 @@ def get_handle(endpoint_name, relative_slo_ms=None, absolute_slo_ms=None):
Returns:
RayServeHandle
"""
assert endpoint_name in global_state.route_table.list_service().values()
assert endpoint_name in expand(
global_state.route_table.list_service(include_headless=True).values())
# Delay import due to it's dependency on global_state
from ray.experimental.serve.handle import RayServeHandle

View file

@ -21,3 +21,6 @@ ASYNC_CONCURRENCY = int(1e6)
#: Default latency SLO
DEFAULT_LATENCY_SLO_MS = 1e9
#: Key for storing no http route services
NO_ROUTE_KEY = "NO_ROUTE"

View file

@ -6,6 +6,8 @@ from ray import cloudpickle as pickle
import ray.experimental.internal_kv as ray_kv
from ray.experimental.serve.utils import logger
from typing import Union
from ray.experimental.serve.constants import NO_ROUTE_KEY
class NamespacedKVStore(ABC):
@ -174,7 +176,7 @@ class RoutingTable:
self.routing_table = kv_connector("routing_table")
self.request_count = 0
def register_service(self, route: str, service: str):
def register_service(self, route: Union[str, None], service: str):
"""Create an entry in the routing table
Args:
@ -184,12 +186,27 @@ class RoutingTable:
"""
logger.debug("[KV] Registering route {} to service {}.".format(
route, service))
self.routing_table.put(route, service)
def list_service(self):
"""Returns the routing table."""
self.request_count += 1
# put no route services in default key
if route is None:
no_http_services = json.loads(
self.routing_table.get(NO_ROUTE_KEY, "[]"))
no_http_services.append(service)
self.routing_table.put(NO_ROUTE_KEY, json.dumps(no_http_services))
else:
self.routing_table.put(route, service)
def list_service(self, include_headless=False):
"""Returns the routing table.
Args:
include_headless: If True, returns a no route services (headless)
services with normal services. (Default: False)
"""
table = self.routing_table.as_dict()
if include_headless:
table[NO_ROUTE_KEY] = json.loads(table.get(NO_ROUTE_KEY, "[]"))
else:
table.pop(NO_ROUTE_KEY, None)
return table
def get_request_count(self):

View file

@ -5,6 +5,7 @@ import requests
from ray.experimental import serve
from ray.experimental.serve import BackendConfig
import ray
from ray.experimental.serve.constants import NO_ROUTE_KEY
def test_e2e(serve_instance):
@ -37,6 +38,26 @@ def test_e2e(serve_instance):
assert resp == "OK"
def test_no_route(serve_instance):
serve.create_endpoint("noroute-endpoint", blocking=True)
global_state = serve.api._get_global_state()
result = global_state.route_table.list_service(include_headless=True)
assert result[NO_ROUTE_KEY] == ["noroute-endpoint"]
without_headless_result = global_state.route_table.list_service()
assert NO_ROUTE_KEY not in without_headless_result
def func(_, i=1):
return 1
serve.create_backend(func, "backend:1")
serve.link("noroute-endpoint", "backend:1")
service_handle = serve.get_handle("noroute-endpoint")
result = ray.get(service_handle.remote(i=1))
assert result == 1
def test_scaling_replicas(serve_instance):
class Counter:
def __init__(self):

View file

@ -10,6 +10,21 @@ import requests
from pygments import formatters, highlight, lexers
from ray.experimental.serve.context import FakeFlaskRequest, TaskContext
from ray.experimental.serve.http_util import build_flask_request
import itertools
def expand(l):
"""
Implements a nested flattening of a list.
Example:
>>> serve.utils.expand([1,2,[3,4,5],6])
[1,2,3,4,5,6]
>>> serve.utils.expand(["a", ["b", "c"], "d", ["e", "f"]])
["a", "b", "c", "d", "e", "f"]
"""
return list(
itertools.chain.from_iterable(
[x if isinstance(x, list) else [x] for x in l]))
def parse_request_item(request_item):