Remove previous version of ray.serve (#5541)

This commit is contained in:
Simon Mo 2019-08-26 19:28:19 -07:00 committed by Robert Nishihara
parent f1dcce5a47
commit 948b1b09e8
16 changed files with 0 additions and 851 deletions

View file

@ -1,76 +0,0 @@
Ray Serve Module
``ray.experimental.serve`` is a module for publishing your actors to
interact with outside world.
Use Case
Serve machine learning model
Scalable anayltics query
Composible pipelines
``ray.experimental.serve`` is implemented in a three-tiered system. Each
tier can scale horizontally.
In the following illustration, call chain goes from top to bottom. Each
box is one or more replicated ray actors.
+-------------------+ +-----------------+ +------------+
Frontend | HTTP Frontend | | Arrow RPC | | ... |
Tier | | | | | |
+-------------------+ +-----------------+ +------------+
+--------------------+ +-------------------+
Router | Default Router | | Deadline Aware |
Tier | | | Router |
+--------------------+ +-------------------+
+----------------+ +--------------+ +-------------+
Managed | Managed Actor | | ... | | ... |
Actor | Replica | | | | |
Tier +----------------+ +--------------+ +-------------+
Frontend Tier
The frontend tier is repsonsible for interface with the world. Currently
``ray.experimental.serve`` provides implementation for - HTTP Frontend
And we are planning to add support for - Arrow RPC - zeromq
Router Tier
The router tier receives calls from frontend and route them to the
managed actors. Routers both *route* and *queue* incoming queries.
``ray.experimental.serve`` has native support for (micro-)batching
In addition, we implemented a deadline aware routers that will put high
priority queries in the front of the queue so they will be delivered
Managed Actor Tier
Managed actors will be managed by routers. These actors can contains
arbitrary methods. Methods in the actors class are assumed to be able to
take into a single input. To fully utilize the vectorized instructions, like
``np.sum``, you can use the ``@batched_input`` decorator, it will run your method
in on a micro-batch.

View file

@ -1,28 +0,0 @@
"""A module for serving from actors.
The ray.experimental.serve module is a module for publishing your actors to
interact with the outside world.
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import sys
assert sys.version_info >= (3, ), (
"ray.experimental.serve is a python3 only library")
from ray.experimental.serve.router import (DeadlineAwareRouter,
SingleQuery) # noqa: E402
from ray.experimental.serve.frontend import HTTPFrontendActor # noqa: E402
from ray.experimental.serve.mixin import (RayServeMixin,
batched_input) # noqa: E402
__all__ = [

View file

@ -1,47 +0,0 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import numpy as np
import ray
from ray.experimental.serve import RayServeMixin, batched_input
class VectorizedAdder(RayServeMixin):
"""Actor that adds scaler_increment to input batch.
result = np.array(input_batch) + scaler_increment
def __init__(self, scaler_increment): = scaler_increment
def __call__(self, input_batch):
arr = np.array(input_batch)
arr +=
return arr.tolist()
class ScalerAdder(RayServeMixin):
"""Actor that adds a scaler_increment to a single input."""
def __init__(self, scaler_increment): = scaler_increment
def __call__(self, input_scaler):
return input_scaler +
class VectorDouble(RayServeMixin):
"""Actor that doubles the batched input."""
def __call__(self, batched_vectors):
matrix = np.array(batched_vectors)
matrix *= 2
return [v.tolist() for v in matrix]

View file

@ -1,29 +0,0 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import ray
from ray.experimental.serve import RayServeMixin, batched_input
class Counter(RayServeMixin):
"""Return the query id. Used for testing router."""
def __init__(self):
self.counter = 0
def __call__(self, batched_input):
self.counter += 1
return self.counter
class CustomCounter(RayServeMixin):
"""Return the query id. Used for testing `serve_method` signature."""
serve_method = "count"
def count(self, input_batch):
return [1 for _ in range(len(input_batch))]

View file

@ -1,41 +0,0 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import time
import ray
from ray.experimental.serve import RayServeMixin, batched_input
class SleepOnFirst(RayServeMixin):
"""Sleep on the first request, return batch size.
Used for testing the DeadlineAwareRouter.
def __init__(self, sleep_time):
self.nap_time = sleep_time
def __call__(self, input_batch):
return [len(input_batch) for _ in range(len(input_batch))]
class SleepCounter(RayServeMixin):
"""Sleep on input argument seconds, return the query id.
Used to test the DeadlineAwareRouter.
def __init__(self):
self.counter = 0
def __call__(self, inp):
self.counter += 1
return self.counter

View file

@ -1,7 +0,0 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from ray.experimental.serve.frontend.http_frontend import HTTPFrontendActor
__all__ = ["HTTPFrontendActor"]

View file

@ -1,72 +0,0 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import time
import uvicorn
from starlette.applications import Starlette
from starlette.responses import JSONResponse
import ray
def unwrap(future):
"""Unwrap the result from ray.experimental.server router.
Router returns a list of object ids when you call them.
return ray.get(future)[0]
class HTTPFrontendActor:
"""HTTP API for an Actor. This exposes /{actor_name} endpoint for query.
GET /{actor_name} or POST /{actor_name}
Content-type: application/json
"slo_ms": float,
"input": any
Content-type: application/json
"success": bool,
"actor": str,
"result": any
def __init__(self, ip="", port=8080, router="DefaultRouter"):
self.ip = ip
self.port = port
self.router = ray.experimental.named_actors.get_actor(router)
def start(self):
default_app = Starlette()
@default_app.route("/{actor}", methods=["GET", "POST"])
async def dispatch_remote_function(request):
data = await request.json()
actor_name = request.path_params["actor"]
slo_seconds = data.pop("slo_ms") / 1000
deadline = time.perf_counter() + slo_seconds
inp = data.pop("input")
result_future = unwrap(, inp, deadline))
# TODO(simon): change to asyncio ray.get
result = ray.get(result_future)
return JSONResponse({
"success": True,
"actor": actor_name,
"result": result
}), host=self.ip, port=self.port)

View file

@ -1,63 +0,0 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import traceback
from typing import List
import ray
from ray.experimental.serve import SingleQuery
def batched_input(func):
"""Decorator to mark an actor method as accepting only a single input.
By default methods accept a batch.
func.ray_serve_batched_input = True
return func
def _execute_and_seal_error(method, arg, method_name):
"""Execute method with arg and return the result.
If the method fails, return a RayTaskError so it can be sealed in the
resultOID and retried by user.
return method(arg)
except Exception:
return ray.worker.RayTaskError(method_name, traceback.format_exc())
class RayServeMixin:
"""Enable a ray actor to interact with ray.serve
class MyActor(RayServeMixin):
# This is optional, by default it is "__call__"
serve_method = 'my_method'
def my_method(self, arg):
serve_method = "__call__"
def _dispatch(self, input_batch: List[SingleQuery]):
"""Helper method to dispatch a batch of input to self.serve_method."""
method = getattr(self, self.serve_method)
if hasattr(method, "ray_serve_batched_input"):
batch = [ for inp in input_batch]
result = _execute_and_seal_error(method, batch, self.serve_method)
for res, inp in zip(result, input_batch):
ray.worker.global_worker.put_object(inp.result_object_id, res)
for inp in input_batch:
result = _execute_and_seal_error(method,,

View file

@ -1,21 +0,0 @@
Helper methods for dealing with ray.ObjectID
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import ray
def unwrap(future):
return ray.get(future)[0]
def get_new_oid():
worker = ray.worker.global_worker
oid = ray._raylet.compute_put_id(worker.current_task_id,
worker.task_context.put_index += 1
return oid

View file

@ -1,26 +0,0 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from ray.experimental.serve.router.routers import (DeadlineAwareRouter,
import ray
def start_router(router_class, router_name):
"""Wrapper for starting a router and register it.
router_class: The router class to instantiate.
router_name: The name to give to the router.
A handle to newly started router actor.
handle = router_class.remote(router_name)
ray.experimental.register_actor(router_name, handle)
return handle
__all__ = ["DeadlineAwareRouter", "SingleQuery"]

View file

@ -1,209 +0,0 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from collections import defaultdict
from functools import total_ordering
from typing import Callable, Dict, List, Set, Tuple
import ray
from ray.experimental.serve.object_id import get_new_oid
from ray.experimental.serve.utils.priority_queue import PriorityQueue
lambda name: ("Actor {} is not registered with this router. Please use "
"'router.register_actor.remote(...)' "
"to register it.").format(name))
# Use @total_ordering so we can sort SingleQuery
class SingleQuery:
"""A data container for a query.
data: The request data.
result_object_id: The result object ID.
deadline: The deadline in seconds.
def __init__(self, data, result_object_id: ray.ObjectID,
deadline_s: float): = data
self.result_object_id = result_object_id
self.deadline = deadline_s
def __lt__(self, other):
return self.deadline < other.deadline
def __eq__(self, other):
return self.deadline == other.deadline
class DeadlineAwareRouter:
"""DeadlineAwareRouter is a router that is aware of deadlines.
It takes into consideration the deadline attached to each query. It will
reorder incoming query based on their deadlines.
def __init__(self, router_name):
# Runtime Data
self.query_queues: Dict[str, PriorityQueue] = defaultdict(
self.running_queries: Dict[ray.ObjectID,] = {}
self.actor_handles: Dict[str, List[]] = (
# Actor Metadata
self.managed_actors: Dict[str,] = {}
self.actor_init_arguments: Dict[str, Tuple[List, Dict]] = {}
self.max_batch_size: Dict[str, int] = {}
# Router Metadata = router_name
def start(self):
"""Kick off the router loop"""
# Note: This is meant for hiding the complexity for a user
# facing method.
# Because the `loop` api can be hard to understand.
def register_actor(
actor_name: str,
init_args: List = [],
init_kwargs: dict = {},
num_replicas: int = 1,
max_batch_size: int = -1, # Unbounded batch size
"""Register a new managed actor.
self.managed_actors[actor_name] = actor_class
self.actor_init_arguments[actor_name] = (init_args, init_kwargs)
self.max_batch_size[actor_name] = max_batch_size
actor_name, num_replicas)
def set_replica(self, actor_name, new_replica_count):
"""Scale a managed actor according to new_replica_count."""
assert actor_name in self.managed_actors, (
current_replicas = len(self.actor_handles[actor_name])
# Increase the number of replicas
if new_replica_count > current_replicas:
for _ in range(new_replica_count - current_replicas):
args = self.actor_init_arguments[actor_name][0]
kwargs = self.actor_init_arguments[actor_name][1]
new_actor_handle = self.managed_actors[actor_name].remote(
*args, **kwargs)
# Decrease the number of replicas
if new_replica_count < current_replicas:
for _ in range(current_replicas - new_replica_count):
# Note actor destructor will be called after all remaining
# calls finish. Therefore it's safe to call del here.
del self.actor_handles[actor_name][-1]
def call(self, actor_name, data, deadline_s):
"""Enqueue a request to one of the actor managed by this router.
List[ray.ObjectID] with length 1, the object ID wrapped inside is
the result object ID when the query is executed.
assert actor_name in self.managed_actors, (
result_object_id = get_new_oid()
# Here, 'data_object_id' is either an ObjectID or an actual object.
# When it is an ObjectID, this is an optimization to avoid creating
# an extra copy of 'data' in the object store.
data_object_id = ray.worker.global_worker._current_task.arguments()[1]
SingleQuery(data_object_id, result_object_id, deadline_s))
return [result_object_id]
def loop(self):
"""Main loop for router. It will does the following things:
1. Check which running actors finished.
2. Iterate over free actors and request queues, dispatch requests batch
to free actors.
3. Tail recursively schedule itself.
# 1. Check which running actors finished.
ready_oids, _ = ray.wait(
for ready_oid in ready_oids:
busy_actors: Set[] = set(
# 2. Iterate over free actors and request queues, dispatch requests
# batch to free actors.
for actor_name, queue in self.query_queues.items():
# try to drain the queue
for actor_handle in self.actor_handles[actor_name]:
if len(queue) == 0:
if actor_handle in busy_actors:
# A free actor found. Dispatch queries.
batch = self._get_next_batch(actor_name)
assert len(batch)
batch_result_object_id = actor_handle._dispatch.remote(batch)
self._mark_running(batch_result_object_id, actor_handle)
# 3. Tail recursively schedule itself.
def _get_next_batch(self, actor_name: str) -> List[SingleQuery]:
"""Get next batch of request for the actor whose name is provided."""
assert actor_name in self.query_queues, (
inputs = []
batch_size = self.max_batch_size[actor_name]
if batch_size == -1:
inp = self.query_queues[actor_name].try_pop()
while inp:
inp = self.query_queues[actor_name].try_pop()
for _ in range(batch_size):
inp = self.query_queues[actor_name].try_pop()
if inp:
return inputs
def _mark_running(self, batch_oid: ray.ObjectID,
"""Mark actor_handle as running identified by batch_oid.
This means that if batch_oid is fullfilled, then actor_handle must be
self.running_queries[batch_oid] = actor_handle

View file

@ -1,68 +0,0 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import numpy as np
import pytest
import ray
from ray.experimental.serve import SingleQuery
from ray.experimental.serve.examples.adder import ScalerAdder, VectorizedAdder
from ray.experimental.serve.examples.counter import Counter, CustomCounter
from ray.experimental.serve.object_id import get_new_oid
def ray_start():
def generated_inputs():
deadline = 11111.11
inputs = []
input_arr = np.arange(10)
for i in input_arr:
oid = get_new_oid()
SingleQuery(data=i, result_object_id=oid, deadline_s=deadline))
return inputs
def test_vadd(ray_start, generated_inputs):
adder = VectorizedAdder.remote(INCREMENT)
inputs = generated_inputs
oids = [inp.result_object_id for inp in inputs]
input_data = [ for inp in inputs]
result_arr = np.array(ray.get(oids))
assert np.array_equal(result_arr, np.array(input_data) + INCREMENT)
def test_batched_input(ray_start, generated_inputs):
counter = Counter.remote()
oids = [inp.result_object_id for inp in generated_inputs]
returned_query_ids = np.array(ray.get(oids))
assert np.array_equal(returned_query_ids, np.arange(1, 11))
def test_custom_method(ray_start, generated_inputs):
dummy = CustomCounter.remote()
oids = [inp.result_object_id for inp in generated_inputs]
returned_query_ids = np.array(ray.get(oids))
assert np.array_equal(returned_query_ids, np.ones(10))
def test_exception(ray_start):
adder = ScalerAdder.remote(INCREMENT)
query = SingleQuery("this can't be added with int", get_new_oid(), 10)
with pytest.raises(ray.worker.RayTaskError):

View file

@ -1,91 +0,0 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import time
import numpy as np
import pytest
import ray
from ray.experimental.serve.examples.adder import ScalerAdder, VectorizedAdder
from ray.experimental.serve.examples.halt import SleepCounter, SleepOnFirst
from ray.experimental.serve.object_id import unwrap
from ray.experimental.serve.router import DeadlineAwareRouter, start_router
def router():
# We need at least 5 workers so resource won't be oversubscribed
# The following two blobs are equivalent
# handle = DeadlineAwareRouter.remote("DefaultTestRouter")
# ray.experimental.register_actor("DefaultTestRouter", handle)
# handle.start.remote()
# handle = start_router(DeadlineAwareRouter, "DefaultRouter")
handle = start_router(DeadlineAwareRouter, "DefaultRouter")
"VAdder", VectorizedAdder,
init_kwargs={"scaler_increment": 1}) # init args
"SAdder", ScalerAdder, init_kwargs={"scaler_increment": 2})
"SleepFirst", SleepOnFirst, init_kwargs={"sleep_time": 1})
"SleepCounter", SleepCounter, max_batch_size=1)
yield handle
def now():
return time.perf_counter()
def test_throw_assert(router: DeadlineAwareRouter, now: float):
ray.get("Action-Not-Exist", "input", now + 1))
except ray.worker.RayTaskError as e:
assert "AssertionError" in e.traceback_str
def test_vector_adder(router: DeadlineAwareRouter, now: float):
result = unwrap("VAdder", 42, now + 1))
assert isinstance(result, ray.ObjectID)
assert ray.get(result) == 43
def test_scaler_adder(router: DeadlineAwareRouter, now: float):
result = unwrap("SAdder", 42, now + 1))
assert isinstance(result, ray.ObjectID)
assert ray.get(result) == 44
def test_batching_ability(router: DeadlineAwareRouter, now: float):
first = unwrap("SleepFirst", 1, now + 1))
rest = [
unwrap("SleepFirst", 1, now + 1)) for _ in range(10)
assert ray.get(first) == 1
assert np.alltrue(np.array(ray.get(rest)) == 10)
def test_deadline_priority(router: DeadlineAwareRouter, now: float):
# first sleep 2 seconds
first = unwrap("SleepCounter", 2, now + 1))
# then send a request to with deadline farther away
second = unwrap("SleepCounter", 0, now + 10))
# and a request with sooner deadline
third = unwrap("SleepCounter", 0, now + 1))
id_1, id_2, id_3 = ray.get([first, second, third])
assert id_1 < id_3 < id_2

View file

@ -1,46 +0,0 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import time
import pytest
import requests
import ray
from ray.experimental.serve import DeadlineAwareRouter
from ray.experimental.serve.examples.adder import VectorizedAdder
from ray.experimental.serve.frontend import HTTPFrontendActor
from ray.experimental.serve.router import start_router
ROUTER_NAME = "DefaultRouter"
def get_router():
# We need this many workers so resource are not oversubscribed
router = start_router(DeadlineAwareRouter, ROUTER_NAME)
yield router
def test_http_basic(get_router):
router = get_router
a = HTTPFrontendActor.remote(router=ROUTER_NAME)
"VAdder", VectorizedAdder, init_kwargs={"scaler_increment": 1})
for _ in range(NUMBER_OF_TRIES):
url = ""
payload = {"input": 10, "slo_ms": 1000}
resp = requests.request("POST", url, json=payload)
except Exception:
# it is possible that the actor is not yet instantiated
assert resp.json() == {"success": True, "actor": "VAdder", "result": 11}

View file

@ -1,27 +0,0 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import heapq
class PriorityQueue:
"""A min-heap class wrapping heapq module."""
def __init__(self):
self.q = []
def push(self, item):
heapq.heappush(self.q, item)
def pop(self):
return heapq.heappop(self.q)
def try_pop(self):
if len(self.q) == 0:
return None
return self.pop()
def __len__(self):
return len(self.q)