diff --git a/python/ray/experimental/serve/__init__.py b/python/ray/experimental/serve/__init__.py
index 7218feef1..05d391876 100644
--- a/python/ray/experimental/serve/__init__.py
+++ b/python/ray/experimental/serve/__init__.py
@@ -1,4 +1,5 @@
 import sys
+from ray.experimental.serve.policy import RoutePolicy
 
 if sys.version_info < (3, 0):
     raise ImportError("serve is Python 3 only.")
@@ -8,5 +9,5 @@ from ray.experimental.serve.api import (init, create_backend, create_endpoint,
 __all__ = [
     "init", "create_backend", "create_endpoint", "link", "split", "get_handle",
-    "stat", "scale"
+    "stat", "scale", "RoutePolicy"
 ]
diff --git a/python/ray/experimental/serve/api.py b/python/ray/experimental/serve/api.py
index 8dd3d18d9..9e19d437c 100644
--- a/python/ray/experimental/serve/api.py
+++ b/python/ray/experimental/serve/api.py
@@ -13,6 +13,7 @@ from ray.experimental.serve.task_runner import RayServeMixin, TaskRunnerActor
 from ray.experimental.serve.utils import (block_until_http_ready,
                                           get_random_letters)
 from ray.experimental.serve.exceptions import RayServeException
+from ray.experimental.serve.policy import RoutePolicy
 
 global_state = None
 
@@ -40,7 +41,9 @@ def init(kv_store_connector=None,
          http_host=DEFAULT_HTTP_HOST,
          http_port=DEFAULT_HTTP_PORT,
          ray_init_kwargs={"object_store_memory": int(1e8)},
-         gc_window_seconds=3600):
+         gc_window_seconds=3600,
+         queueing_policy=RoutePolicy.Random,
+         policy_kwargs={}):
     """Initialize a serve cluster.
 
     If serve cluster has already initialized, this function will just return.
@@ -63,9 +66,11 @@ def init(kv_store_connector=None,
         gc_window_seconds(int): How long will we keep the metric data in
             memory. Data older than the gc_window will be deleted. The default
             is 3600 seconds, which is 1 hour.
+        queueing_policy(RoutePolicy): The queueing policy that selects a
+            backend for each query. (Default: RoutePolicy.Random)
+        policy_kwargs: Keyword arguments used to instantiate the queueing policy.
     """
     global global_state
-    # Noop if global_state is no longer None
    if global_state is not None:
        return
 
@@ -92,7 +97,8 @@ def init(kv_store_connector=None,
 
     global_state = GlobalState(nursery)
     global_state.init_or_get_http_server(host=http_host, port=http_port)
-    global_state.init_or_get_router()
+    global_state.init_or_get_router(
+        queueing_policy=queueing_policy, policy_kwargs=policy_kwargs)
     global_state.init_or_get_metric_monitor(
         gc_window_seconds=gc_window_seconds)
 
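Note: the two new example scripts below exercise FixedPacking and RoundRobin; this PR adds no example script for PowerOfTwo. Under the same API it would be selected as in this minimal sketch (not part of the patch; PowerOfTwoPolicyQueue defines no constructor arguments, so policy_kwargs stays empty):

```python
from ray.experimental import serve

# Sketch only: select the PowerOfTwo router policy at init time.
serve.init(blocking=True, queueing_policy=serve.RoutePolicy.PowerOfTwo)
```
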
+""" +import time + +import requests + +from ray.experimental import serve +from ray.experimental.serve.utils import pformat_color_json + + +def echo_v1(_): + return "v1" + + +def echo_v2(_): + return "v2" + + +# specify the router policy as FixedPacking with packing num as 5 +serve.init( + blocking=True, + queueing_policy=serve.RoutePolicy.FixedPacking, + policy_kwargs={"packing_num": 5}) + +# create a service +serve.create_endpoint("my_endpoint", "/echo", blocking=True) + +# create first backend +serve.create_backend(echo_v1, "echo:v1") + +# create second backend +serve.create_backend(echo_v2, "echo:v2") + +# link and split the service to two backends +serve.split("my_endpoint", {"echo:v1": 0.5, "echo:v2": 0.5}) + +while True: + resp = requests.get("http://127.0.0.1:8000/echo").json() + print(pformat_color_json(resp)) + + print("...Sleeping for 2 seconds...") + time.sleep(2) diff --git a/python/ray/experimental/serve/examples/echo_round_robin.py b/python/ray/experimental/serve/examples/echo_round_robin.py new file mode 100644 index 000000000..01b74b92e --- /dev/null +++ b/python/ray/experimental/serve/examples/echo_round_robin.py @@ -0,0 +1,41 @@ +""" +Example showing round robin policy. The outputs from +v1 and v2 will be (almost) interleaved as queries get processed. +""" +import time + +import requests + +from ray.experimental import serve +from ray.experimental.serve.utils import pformat_color_json + + +def echo_v1(_): + return "v1" + + +def echo_v2(_): + return "v2" + + +# specify the router policy as RoundRobin +serve.init(blocking=True, queueing_policy=serve.RoutePolicy.RoundRobin) + +# create a service +serve.create_endpoint("my_endpoint", "/echo", blocking=True) + +# create first backend +serve.create_backend(echo_v1, "echo:v1") + +# create second backend +serve.create_backend(echo_v2, "echo:v2") + +# link and split the service to two backends +serve.split("my_endpoint", {"echo:v1": 0.5, "echo:v2": 0.5}) + +while True: + resp = requests.get("http://127.0.0.1:8000/echo").json() + print(pformat_color_json(resp)) + + print("...Sleeping for 2 seconds...") + time.sleep(2) diff --git a/python/ray/experimental/serve/global_state.py b/python/ray/experimental/serve/global_state.py index be133e8f7..d60a80dec 100644 --- a/python/ray/experimental/serve/global_state.py +++ b/python/ray/experimental/serve/global_state.py @@ -6,7 +6,8 @@ from ray.experimental.serve.kv_store_service import ( BackendTable, RoutingTable, TrafficPolicyTable) from ray.experimental.serve.metric import (MetricMonitor, start_metric_monitor_loop) -from ray.experimental.serve.queues import CentralizedQueuesActor + +from ray.experimental.serve.policy import RoutePolicy from ray.experimental.serve.server import HTTPActor @@ -36,9 +37,9 @@ class ActorNursery: self.bootstrap_state = dict() - def start_actor(self, actor_cls, init_args, tag): + def start_actor(self, actor_cls, tag, init_args=(), init_kwargs={}): """Start an actor and add it to the nursery""" - handle = actor_cls.remote(*init_args) + handle = actor_cls.remote(*init_args, **init_kwargs) self.actor_handles[handle] = tag return [handle] @@ -100,20 +101,41 @@ class GlobalState: if "http_server" not in self.actor_handle_cache: [handle] = ray.get( self.actor_nursery_handle.start_actor.remote( - HTTPActor, init_args=(), tag="http_server")) + HTTPActor, tag="http_server")) + handle.run.remote(host=host, port=port) self.refresh_actor_handle_cache() return self.actor_handle_cache["http_server"] - def init_or_get_router(self): - if "queue_actor" not in 
@@ -100,20 +101,41 @@ class GlobalState:
         if "http_server" not in self.actor_handle_cache:
             [handle] = ray.get(
                 self.actor_nursery_handle.start_actor.remote(
-                    HTTPActor, init_args=(), tag="http_server"))
+                    HTTPActor, tag="http_server"))
+
             handle.run.remote(host=host, port=port)
             self.refresh_actor_handle_cache()
 
         return self.actor_handle_cache["http_server"]
 
-    def init_or_get_router(self):
-        if "queue_actor" not in self.actor_handle_cache:
+    def _get_queueing_policy(self, default_policy):
+        return_policy = default_policy
+        # If a queue actor tagged "queue_actor::<p.name>" is already
+        # running (i.e. the serve nursery already exists via
+        # ray.experimental.get_actor(SERVE_NURSERY_NAME)), reuse policy p.
+        for p in RoutePolicy:
+            queue_actor_tag = "queue_actor::" + p.name
+            if queue_actor_tag in self.actor_handle_cache:
+                return_policy = p
+                break
+        return return_policy
+
+    def init_or_get_router(self,
+                           queueing_policy=RoutePolicy.Random,
+                           policy_kwargs={}):
+        # resolve the queueing policy, preferring any already-running router
+        self.queueing_policy = self._get_queueing_policy(
+            default_policy=queueing_policy)
+        queue_actor_tag = "queue_actor::" + self.queueing_policy.name
+        if queue_actor_tag not in self.actor_handle_cache:
             [handle] = ray.get(
                 self.actor_nursery_handle.start_actor.remote(
-                    CentralizedQueuesActor, init_args=(), tag="queue_actor"))
+                    self.queueing_policy.value,
+                    init_kwargs=policy_kwargs,
+                    tag=queue_actor_tag))
             handle.register_self_handle.remote(handle)
             self.refresh_actor_handle_cache()
 
-        return self.actor_handle_cache["queue_actor"]
+        return self.actor_handle_cache[queue_actor_tag]
 
     def init_or_get_metric_monitor(self, gc_window_seconds=3600):
         if "metric_monitor" not in self.actor_handle_cache:
diff --git a/python/ray/experimental/serve/policy.py b/python/ray/experimental/serve/policy.py
new file mode 100644
index 000000000..91998f3ec
--- /dev/null
+++ b/python/ray/experimental/serve/policy.py
@@ -0,0 +1,17 @@
+from enum import Enum
+from ray.experimental.serve.queues import (
+    RoundRobinPolicyQueueActor, RandomPolicyQueueActor,
+    PowerOfTwoPolicyQueueActor, FixedPackingPolicyQueueActor)
+
+
+class RoutePolicy(Enum):
+    """
+    An enum for registering backend selection policies.
+    Add a name and the corresponding queue actor class here;
+    Serve will then support the new policy, and it can be selected
+    by passing the name to `serve.init`.
+    """
+    Random = RandomPolicyQueueActor
+    RoundRobin = RoundRobinPolicyQueueActor
+    PowerOfTwo = PowerOfTwoPolicyQueueActor
+    FixedPacking = FixedPackingPolicyQueueActor
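Note: as the RoutePolicy docstring says, registering a policy is a one-line change. A hypothetical `MyPolicyQueueActor` (an assumed subclass of the queue classes added in queues.py below) would be registered like this:

```python
class RoutePolicy(Enum):
    Random = RandomPolicyQueueActor
    RoundRobin = RoundRobinPolicyQueueActor
    PowerOfTwo = PowerOfTwoPolicyQueueActor
    FixedPacking = FixedPackingPolicyQueueActor
    MyPolicy = MyPolicyQueueActor  # hypothetical new policy
```
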
diff --git a/python/ray/experimental/serve/queues.py b/python/ray/experimental/serve/queues.py
index 023c554e5..62d96862f 100644
--- a/python/ray/experimental/serve/queues.py
+++ b/python/ray/experimental/serve/queues.py
@@ -4,6 +4,7 @@ import numpy as np
 import ray
 
 from ray.experimental.serve.utils import logger
+import itertools
 
 from blist import sortedlist
 import time
@@ -148,8 +149,7 @@ class CentralizedQueues:
 
     def link(self, service, backend):
         logger.debug("Link %s with %s", service, backend)
-        self.traffic[service][backend] = 1.0
-        self.flush()
+        self.set_traffic(service, {backend: 1.0})
 
     def set_traffic(self, service, traffic_dict):
         logger.debug("Setting traffic for service %s to %s", service,
@@ -173,24 +173,8 @@
         }
         return list(backends_in_policy.intersection(available_workers))
 
-    def _flush(self):
-        # perform traffic splitting for requests
-        for service, queue in self.queues.items():
-            # while there are incoming requests and there are backends
-            while len(queue) and len(self.traffic[service]):
-                backend_names = list(self.traffic[service].keys())
-                backend_weights = list(self.traffic[service].values())
-                # TODO(alind): is random choice good for deadline awareness?
-                # putting query in a buffer of a non available backend may
-                # not be good
-                chosen_backend = np.random.choice(
-                    backend_names, p=backend_weights).squeeze()
-
-                request = queue.popleft()
-                # maintain a sorted list in the buffer queue of the backend
-                self.buffer_queues[chosen_backend].add(request)
-
-        # distach buffer queues to work queues
+    # Flushes the buffer queues and assigns work to available workers.
+    def _flush_buffer(self):
         for service in self.queues.keys():
             ready_backends = self._get_available_backends(service)
             for backend in ready_backends:
@@ -207,9 +191,33 @@
                 )
                 work.replica_handle._ray_serve_call.remote(request)
 
+    # Selects a backend and moves queries from the service queue into
+    # that backend's buffer queue; each policy implements its own logic.
+    def _flush_service_queue(self):
+        """
+        Expected Implementation:
+            The implementer is expected to access and manipulate
+            self.queues        : dict[str, Deque]
+            self.buffer_queues : dict[str, sortedlist]
+        Register the implemented policies in policy.py.
+        Expected Behavior:
+            After this call, the Deque of every service in self.queues
+            that is linked to at least one backend must be empty,
+            regardless of the backend selection policy implemented.
+        """
+        pass
+
+    # _flush drains the service queues and then the buffer queues.
+    def _flush(self):
+        self._flush_service_queue()
+        self._flush_buffer()
+
 
-@ray.remote
 class CentralizedQueuesActor(CentralizedQueues):
+    """
+    A wrapper class for converting policy queue classes into Ray
+    actors. This is needed to make the `flush` call asynchronous.
+    """
     self_handle = None
 
     def register_self_handle(self, handle_to_this_actor):
@@ -220,3 +228,167 @@ class CentralizedQueuesActor(CentralizedQueues):
             self.self_handle._flush.remote()
         else:
             self._flush()
+
+
+class RandomPolicyQueue(CentralizedQueues):
+    """
+    A wrapper class for the Random policy. This backend selection policy
+    is `Stateless`, meaning current backend selection decisions do not
+    depend on previous ones. The Random policy samples a backend for
+    every query according to the backend weights. This policy uses the
+    weights assigned to backends.
+    """
+
+    def _flush_service_queue(self):
+        # perform traffic splitting for requests
+        for service, queue in self.queues.items():
+            # while there are incoming requests and there are backends
+            while len(queue) and len(self.traffic[service]):
+                backend_names = list(self.traffic[service].keys())
+                backend_weights = list(self.traffic[service].values())
+                # randomly choose a backend for every query
+                chosen_backend = np.random.choice(
+                    backend_names, p=backend_weights).squeeze()
+
+                request = queue.popleft()
+                self.buffer_queues[chosen_backend].add(request)
+
+
+@ray.remote
+class RandomPolicyQueueActor(RandomPolicyQueue, CentralizedQueuesActor):
+    pass
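Note: per the `_flush_service_queue` contract above, a new policy only has to drain `self.queues` into `self.buffer_queues`. A minimal hypothetical sketch following the Random policy's structure (class names assumed, not part of the patch):

```python
class LeastLoadedPolicyQueue(CentralizedQueues):
    """Hypothetical policy: send each query to the backend with the
    shortest buffer queue, ignoring the backend weights."""

    def _flush_service_queue(self):
        for service, queue in self.queues.items():
            while len(queue) and len(self.traffic[service]):
                # pick the linked backend with the fewest buffered queries
                chosen_backend = min(
                    self.traffic[service],
                    key=lambda b: len(self.buffer_queues[b]))
                self.buffer_queues[chosen_backend].add(queue.popleft())


@ray.remote
class LeastLoadedPolicyQueueActor(LeastLoadedPolicyQueue,
                                  CentralizedQueuesActor):
    pass
```
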
+
+
+class RoundRobinPolicyQueue(CentralizedQueues):
+    """
+    A wrapper class for the RoundRobin policy. This backend selection
+    policy is `Stateful`, meaning current backend selection decisions
+    depend on previous ones. RoundRobinPolicy assigns queries to the
+    backends of a service in an interleaved manner. Consider backends
+    A and B linked to a service: queries will be assigned to them in
+    the order [A, B, A, B, ...]. This policy doesn't use the weights
+    assigned to backends.
+    """
+
+    # Saves the round robin position (i.e. the next backend
+    # to assign) for every service
+    round_robin_iterator_map = {}
+
+    def set_traffic(self, service, traffic_dict):
+        logger.debug("Setting traffic for service %s to %s", service,
+                     traffic_dict)
+        self.traffic[service] = traffic_dict
+        backend_names = list(self.traffic[service].keys())
+        self.round_robin_iterator_map[service] = itertools.cycle(backend_names)
+        self.flush()
+
+    def _flush_service_queue(self):
+        # perform traffic splitting for requests
+        for service, queue in self.queues.items():
+            # if there are incoming requests and there are backends
+            if len(queue) and len(self.traffic[service]):
+                while len(queue):
+                    # choose the next backend from the persistent
+                    # round robin state
+                    chosen_backend = next(
+                        self.round_robin_iterator_map[service])
+                    request = queue.popleft()
+                    self.buffer_queues[chosen_backend].add(request)
+
+
+@ray.remote
+class RoundRobinPolicyQueueActor(RoundRobinPolicyQueue,
+                                 CentralizedQueuesActor):
+    pass
+
+
+class PowerOfTwoPolicyQueue(CentralizedQueues):
+    """
+    A wrapper class for the PowerOfTwo policy. This backend selection
+    policy is `Stateless`, meaning current backend selection decisions
+    do not depend on previous ones. The PowerOfTwo policy randomly
+    samples two backends (say A and B among A, B, C) according to the
+    backend weights and chooses the less loaded of the two. This policy
+    uses the weights assigned to backends.
+    """
+
+    def _flush_service_queue(self):
+        # perform traffic splitting for requests
+        for service, queue in self.queues.items():
+            # while there are incoming requests and there are backends
+            while len(queue) and len(self.traffic[service]):
+                backend_names = list(self.traffic[service].keys())
+                backend_weights = list(self.traffic[service].values())
+                if len(self.traffic[service]) >= 2:
+                    # randomly pick 2 backends
+                    backend1, backend2 = np.random.choice(
+                        backend_names, 2, p=backend_weights)
+
+                    # compare the buffer queue lengths of the two
+                    # backends and pick the one with fewer queries
+                    # waiting in its buffer
+                    if (len(self.buffer_queues[backend1]) <= len(
+                            self.buffer_queues[backend2])):
+                        chosen_backend = backend1
+                    else:
+                        chosen_backend = backend2
+                else:
+                    chosen_backend = np.random.choice(
+                        backend_names, p=backend_weights).squeeze()
+                request = queue.popleft()
+                self.buffer_queues[chosen_backend].add(request)
+
+
+@ray.remote
+class PowerOfTwoPolicyQueueActor(PowerOfTwoPolicyQueue,
+                                 CentralizedQueuesActor):
+    pass
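Note: the two-choices step above can be sanity-checked in isolation. A standalone sketch with plain integers standing in for buffer queue lengths (like the PR's code, `np.random.choice` samples with replacement, so the two picks can coincide):

```python
import numpy as np

# Mock buffer queue lengths per backend.
buffer_lens = {"A": 3, "B": 0, "C": 7}
names, weights = ["A", "B", "C"], [0.5, 0.25, 0.25]

# Sample two backends by weight, keep the less loaded one.
backend1, backend2 = np.random.choice(names, 2, p=weights)
chosen = (backend1
          if buffer_lens[backend1] <= buffer_lens[backend2]
          else backend2)
print(chosen)
```
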
+
+
+class FixedPackingPolicyQueue(CentralizedQueues):
+    """
+    A wrapper class for the FixedPacking policy. This backend selection
+    policy is `Stateful`, meaning current backend selection decisions
+    depend on previous ones. FixedPackingPolicy is packed round robin:
+    the first packing_num queries go to 'backend-1', the next
+    packing_num queries go to 'backend-2', and so on, where 'backend-1'
+    and 'backend-2' serve the same service. This policy doesn't use the
+    weights assigned to backends.
+
+    """
+
+    def __init__(self, packing_num=3):
+        # Saves the packed round robin position
+        # for every service
+        self.fixed_packing_iterator_map = {}
+        self.packing_num = packing_num
+        super().__init__()
+
+    def set_traffic(self, service, traffic_dict):
+        logger.debug("Setting traffic for service %s to %s", service,
+                     traffic_dict)
+        self.traffic[service] = traffic_dict
+        backend_names = list(self.traffic[service].keys())
+        self.fixed_packing_iterator_map[service] = itertools.cycle(
+            itertools.chain.from_iterable(
+                itertools.repeat(x, self.packing_num) for x in backend_names))
+        self.flush()
+
+    def _flush_service_queue(self):
+        # perform traffic splitting for requests
+        for service, queue in self.queues.items():
+            # if there are incoming requests and there are backends
+            if len(queue) and len(self.traffic[service]):
+                while len(queue):
+                    # choose the next backend from the persistent
+                    # packed round robin state
+                    chosen_backend = next(
+                        self.fixed_packing_iterator_map[service])
+                    request = queue.popleft()
+                    self.buffer_queues[chosen_backend].add(request)
+
+
+@ray.remote
+class FixedPackingPolicyQueueActor(FixedPackingPolicyQueue,
+                                   CentralizedQueuesActor):
+    pass
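Note: the packed iterator built in `set_traffic` above is easy to verify on its own; `itertools.cycle` caches the chained sequence, so the packing repeats forever:

```python
import itertools

packing_num = 3
backends = ["backend-1", "backend-2"]

# Repeat each backend packing_num times, then cycle the whole sequence.
it = itertools.cycle(
    itertools.chain.from_iterable(
        itertools.repeat(b, packing_num) for b in backends))

print([next(it) for _ in range(8)])
# ['backend-1', 'backend-1', 'backend-1',
#  'backend-2', 'backend-2', 'backend-2',
#  'backend-1', 'backend-1']
```
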
diff --git a/python/ray/experimental/serve/tests/test_queue.py b/python/ray/experimental/serve/tests/test_queue.py
index 9d74da089..181be51cd 100644
--- a/python/ray/experimental/serve/tests/test_queue.py
+++ b/python/ray/experimental/serve/tests/test_queue.py
@@ -1,6 +1,8 @@
 import pytest
 import ray
-from ray.experimental.serve.queues import CentralizedQueues
+from ray.experimental.serve.queues import RandomPolicyQueue
+from ray.experimental.serve.queues import (RoundRobinPolicyQueue,
+                                           FixedPackingPolicyQueue)
 
 
 @pytest.fixture(scope="session")
@@ -21,7 +23,7 @@ def task_runner_mock_actor():
 
 
 def test_single_prod_cons_queue(serve_instance, task_runner_mock_actor):
-    q = CentralizedQueues()
+    q = RandomPolicyQueue()
     q.link("svc", "backend")
 
     result_object_id = q.enqueue_request("svc", 1, "kwargs", None)
@@ -35,7 +37,7 @@ def test_single_prod_cons_queue(serve_instance, task_runner_mock_actor):
 
 
 def test_slo(serve_instance, task_runner_mock_actor):
-    q = CentralizedQueues()
+    q = RandomPolicyQueue()
     q.link("svc", "backend")
 
     for i in range(10):
@@ -48,7 +50,7 @@ def test_slo(serve_instance, task_runner_mock_actor):
 
 
 def test_alter_backend(serve_instance, task_runner_mock_actor):
-    q = CentralizedQueues()
+    q = RandomPolicyQueue()
 
     q.set_traffic("svc", {"backend-1": 1})
     result_object_id = q.enqueue_request("svc", 1, "kwargs", None)
@@ -68,7 +70,7 @@ def test_alter_backend(serve_instance, task_runner_mock_actor):
 
 
 def test_split_traffic(serve_instance, task_runner_mock_actor):
-    q = CentralizedQueues()
+    q = RandomPolicyQueue()
 
     q.set_traffic("svc", {"backend-1": 0.5, "backend-2": 0.5})
     # assume 50% split, the probability of all 20 requests goes to a
@@ -84,8 +86,50 @@ def test_split_traffic(serve_instance, task_runner_mock_actor):
     assert [g.request_args for g in got_work] == [1, 1]
 
 
+def test_split_traffic_round_robin(serve_instance, task_runner_mock_actor):
+    q = RoundRobinPolicyQueue()
+    q.set_traffic("svc", {"backend-1": 0.5, "backend-2": 0.5})
+    # since the round robin policy is stateful, two consecutive queries
+    # are sent to two different backends
+    for _ in range(2):
+        q.enqueue_request("svc", 1, "kwargs", None)
+    q.dequeue_request("backend-1", task_runner_mock_actor)
+    result_one = ray.get(task_runner_mock_actor.get_recent_call.remote())
+    q.dequeue_request("backend-2", task_runner_mock_actor)
+    result_two = ray.get(task_runner_mock_actor.get_recent_call.remote())
+
+    got_work = [result_one, result_two]
+    assert [g.request_args for g in got_work] == [1, 1]
+
+
+def test_split_traffic_fixed_packing(serve_instance, task_runner_mock_actor):
+    packing_num = 4
+    q = FixedPackingPolicyQueue(packing_num=packing_num)
+    q.set_traffic("svc", {"backend-1": 0.5, "backend-2": 0.5})
+
+    # fire twice as many queries as the packing number
+    for i in range(2 * packing_num):
+        q.enqueue_request("svc", i, "kwargs", None)
+
+    # both backends will get an equal number of queries
+    # since this is packed round robin
+    for _ in range(packing_num):
+        q.dequeue_request("backend-1", task_runner_mock_actor)
+
+    result_one = ray.get(task_runner_mock_actor.get_recent_call.remote())
+
+    for _ in range(packing_num):
+        q.dequeue_request("backend-2", task_runner_mock_actor)
+
+    result_two = ray.get(task_runner_mock_actor.get_recent_call.remote())
+
+    got_work = [result_one, result_two]
+    assert [g.request_args
+            for g in got_work] == [packing_num - 1, 2 * packing_num - 1]
+
+
 def test_queue_remove_replicas(serve_instance, task_runner_mock_actor):
-    q = CentralizedQueues()
+    q = RandomPolicyQueue()
     q.dequeue_request("backend", task_runner_mock_actor)
     q.remove_and_destory_replica("backend", task_runner_mock_actor)
     assert len(q.workers["backend"]) == 0
diff --git a/python/ray/experimental/serve/tests/test_task_runner.py b/python/ray/experimental/serve/tests/test_task_runner.py
index d5182c7dc..158d88fc4 100644
--- a/python/ray/experimental/serve/tests/test_task_runner.py
+++ b/python/ray/experimental/serve/tests/test_task_runner.py
@@ -2,7 +2,7 @@ import pytest
 import ray
 
 import ray.experimental.serve.context as context
-from ray.experimental.serve.queues import CentralizedQueuesActor
+from ray.experimental.serve.queues import RoundRobinPolicyQueueActor
 from ray.experimental.serve.task_runner import (
     RayServeMixin, TaskRunner, TaskRunnerActor, wrap_to_ray_error)
@@ -21,7 +21,7 @@ def test_runner_wraps_error():
 
 
 def test_runner_actor(serve_instance):
-    q = CentralizedQueuesActor.remote()
+    q = RoundRobinPolicyQueueActor.remote()
 
     def echo(flask_request, i=None):
         return i
@@ -48,7 +48,7 @@ def test_runner_actor(serve_instance):
 
 
 def test_ray_serve_mixin(serve_instance):
-    q = CentralizedQueuesActor.remote()
+    q = RoundRobinPolicyQueueActor.remote()
 
     CONSUMER_NAME = "runner-cls"
     PRODUCER_NAME = "prod-cls"
@@ -83,7 +83,7 @@ def test_ray_serve_mixin(serve_instance):
 
 
 def test_task_runner_check_context(serve_instance):
-    q = CentralizedQueuesActor.remote()
+    q = RoundRobinPolicyQueueActor.remote()
 
     def echo(flask_request, i=None):
         # Accessing the flask_request without web context should throw.