[serve] Deprecate client-based API in favor of process-wide singleton (#14696)

Edward Oakes 2021-03-17 09:39:54 -05:00 committed by GitHub
parent 69202c6a7d
commit aab7ccc466
53 changed files with 827 additions and 727 deletions
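In short, the deprecated pattern of threading a ``Client`` object through every call is replaced by module-level functions on ``ray.serve`` backed by a process-wide singleton. A hedged before/after sketch distilled from the doc and example changes below (the backend and endpoint names are illustrative):

.. code-block:: python

from ray import serve

def my_func(request):
    return "hello"

# Old, now-deprecated client-based style:
#   client = serve.start()
#   client.create_backend("my_backend", my_func)
#   client.create_endpoint("my_endpoint", backend="my_backend", route="/hello")
#   handle = client.get_handle("my_endpoint")

# New singleton style: serve.start() registers a process-wide client and the
# same operations become module-level calls.
serve.start()
serve.create_backend("my_backend", my_func)
serve.create_endpoint("my_endpoint", backend="my_backend", route="/hello")
handle = serve.get_handle("my_endpoint")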


@ -10,17 +10,17 @@ Splitting Traffic
=================
At times it may be useful to expose a single endpoint that is served by multiple backends.
You can do this by splitting the traffic for an endpoint between backends using :mod:`client.set_traffic <ray.serve.api.Client.set_traffic>`.
When calling :mod:`client.set_traffic <ray.serve.api.Client.set_traffic>`, you provide a dictionary of backend name to a float value that will be used to randomly route that portion of traffic (out of a total of 1.0) to the given backend.
You can do this by splitting the traffic for an endpoint between backends using :mod:`serve.set_traffic <ray.serve.api.set_traffic>`.
When calling :mod:`serve.set_traffic <ray.serve.api.set_traffic>`, you provide a dictionary of backend name to a float value that will be used to randomly route that portion of traffic (out of a total of 1.0) to the given backend.
For example, here we split traffic 50/50 between two backends:
.. code-block:: python
client.create_backend("backend1", MyClass1)
client.create_backend("backend2", MyClass2)
serve.create_backend("backend1", MyClass1)
serve.create_backend("backend2", MyClass2)
client.create_endpoint("fifty-fifty", backend="backend1", route="/fifty")
client.set_traffic("fifty-fifty", {"backend1": 0.5, "backend2": 0.5})
serve.create_endpoint("fifty-fifty", backend="backend1", route="/fifty")
serve.set_traffic("fifty-fifty", {"backend1": 0.5, "backend2": 0.5})
Each request is routed randomly between the backends in the traffic dictionary according to the provided weights.
Please see :ref:`session-affinity` for details on how to ensure that clients or users are consistently mapped to the same backend.
@ -28,52 +28,52 @@ Please see :ref:`session-affinity` for details on how to ensure that clients or
Canary Deployments
==================
:mod:`client.set_traffic <ray.serve.api.Client.set_traffic>` can be used to implement canary deployments, where one backend serves the majority of traffic, while a small fraction is routed to a second backend. This is especially useful for "canary testing" a new model on a small percentage of users, while the tried and true old model serves the majority. Once you are satisfied with the new model, you can reroute all traffic to it and remove the old model:
:mod:`serve.set_traffic <ray.serve.api.set_traffic>` can be used to implement canary deployments, where one backend serves the majority of traffic, while a small fraction is routed to a second backend. This is especially useful for "canary testing" a new model on a small percentage of users, while the tried and true old model serves the majority. Once you are satisfied with the new model, you can reroute all traffic to it and remove the old model:
.. code-block:: python
client.create_backend("default_backend", MyClass)
serve.create_backend("default_backend", MyClass)
# Initially, set all traffic to be served by the "default" backend.
client.create_endpoint("canary_endpoint", backend="default_backend", route="/canary-test")
serve.create_endpoint("canary_endpoint", backend="default_backend", route="/canary-test")
# Add a second backend and route 1% of the traffic to it.
client.create_backend("new_backend", MyNewClass)
client.set_traffic("canary_endpoint", {"default_backend": 0.99, "new_backend": 0.01})
serve.create_backend("new_backend", MyNewClass)
serve.set_traffic("canary_endpoint", {"default_backend": 0.99, "new_backend": 0.01})
# Add a third backend that serves another 1% of the traffic.
client.create_backend("new_backend2", MyNewClass2)
client.set_traffic("canary_endpoint", {"default_backend": 0.98, "new_backend": 0.01, "new_backend2": 0.01})
serve.create_backend("new_backend2", MyNewClass2)
serve.set_traffic("canary_endpoint", {"default_backend": 0.98, "new_backend": 0.01, "new_backend2": 0.01})
# Route all traffic to the new, better backend.
client.set_traffic("canary_endpoint", {"new_backend": 1.0})
serve.set_traffic("canary_endpoint", {"new_backend": 1.0})
# Or, if not so successful, revert to the "default" backend for all traffic.
client.set_traffic("canary_endpoint", {"default_backend": 1.0})
serve.set_traffic("canary_endpoint", {"default_backend": 1.0})
Incremental Rollout
===================
:mod:`client.set_traffic <ray.serve.api.Client.set_traffic>` can also be used to implement incremental rollout.
:mod:`serve.set_traffic <ray.serve.api.set_traffic>` can also be used to implement incremental rollout.
Here, we want to replace an existing backend with a new implementation by gradually increasing the proportion of traffic that it serves.
In the example below, we do this repeatedly in one script, but in practice this would likely happen over time across multiple scripts.
.. code-block:: python
client.create_backend("existing_backend", MyClass)
serve.create_backend("existing_backend", MyClass)
# Initially, all traffic is served by the existing backend.
client.create_endpoint("incremental_endpoint", backend="existing_backend", route="/incremental")
serve.create_endpoint("incremental_endpoint", backend="existing_backend", route="/incremental")
# Then we can slowly increase the proportion of traffic served by the new backend.
client.create_backend("new_backend", MyNewClass)
client.set_traffic("incremental_endpoint", {"existing_backend": 0.9, "new_backend": 0.1})
client.set_traffic("incremental_endpoint", {"existing_backend": 0.8, "new_backend": 0.2})
client.set_traffic("incremental_endpoint", {"existing_backend": 0.5, "new_backend": 0.5})
client.set_traffic("incremental_endpoint", {"new_backend": 1.0})
serve.create_backend("new_backend", MyNewClass)
serve.set_traffic("incremental_endpoint", {"existing_backend": 0.9, "new_backend": 0.1})
serve.set_traffic("incremental_endpoint", {"existing_backend": 0.8, "new_backend": 0.2})
serve.set_traffic("incremental_endpoint", {"existing_backend": 0.5, "new_backend": 0.5})
serve.set_traffic("incremental_endpoint", {"new_backend": 1.0})
# At any time, we can roll back to the existing backend.
client.set_traffic("incremental_endpoint", {"existing_backend": 1.0})
serve.set_traffic("incremental_endpoint", {"existing_backend": 1.0})
.. _session-affinity:
@ -93,7 +93,7 @@ The shard key can either be specified via the X-SERVE-SHARD-KEY HTTP header or :
requests.get("http://127.0.0.1:8000/api", headers={"X-SERVE-SHARD-KEY": session_id})
# Specifying the shard key in a call made via serve handle.
handle = client.get_handle("api_endpoint")
handle = serve.get_handle("api_endpoint")
handle.options(shard_key=session_id).remote(args)
.. _serve-shadow-testing:
@ -102,30 +102,30 @@ Shadow Testing
==============
Sometimes when deploying a new backend, you may want to test it out without affecting the results seen by users.
You can do this with :mod:`client.shadow_traffic <ray.serve.api.Client.shadow_traffic>`, which allows you to duplicate requests to multiple backends for testing while still having them served by the set of backends specified via :mod:`client.set_traffic <ray.serve.api.Client.set_traffic>`.
You can do this with :mod:`serve.shadow_traffic <ray.serve.api.shadow_traffic>`, which allows you to duplicate requests to multiple backends for testing while still having them served by the set of backends specified via :mod:`serve.set_traffic <ray.serve.api.set_traffic>`.
Metrics about these requests are recorded as usual so you can use them to validate model performance.
This is demonstrated in the example below, where we create an endpoint serviced by a single backend but shadow traffic to two other backends for testing.
.. code-block:: python
client.create_backend("existing_backend", MyClass)
serve.create_backend("existing_backend", MyClass)
# All traffic is served by the existing backend.
client.create_endpoint("shadowed_endpoint", backend="existing_backend", route="/shadow")
serve.create_endpoint("shadowed_endpoint", backend="existing_backend", route="/shadow")
# Create two new backends that we want to test.
client.create_backend("new_backend_1", MyNewClass)
client.create_backend("new_backend_2", MyNewClass)
serve.create_backend("new_backend_1", MyNewClass)
serve.create_backend("new_backend_2", MyNewClass)
# Shadow traffic to the two new backends. This does not influence the result
# of requests to the endpoint, but a proportion of requests are
# *additionally* sent to these backends.
# Send 50% of all queries to the endpoint new_backend_1.
client.shadow_traffic("shadowed_endpoint", "new_backend_1", 0.5)
serve.shadow_traffic("shadowed_endpoint", "new_backend_1", 0.5)
# Send 10% of all queries to the endpoint new_backend_2.
client.shadow_traffic("shadowed_endpoint", "new_backend_2", 0.1)
serve.shadow_traffic("shadowed_endpoint", "new_backend_2", 0.1)
# Stop shadowing traffic to the backends.
client.shadow_traffic("shadowed_endpoint", "new_backend_1", 0)
client.shadow_traffic("shadowed_endpoint", "new_backend_2", 0)
serve.shadow_traffic("shadowed_endpoint", "new_backend_1", 0)
serve.shadow_traffic("shadowed_endpoint", "new_backend_2", 0)


@ -16,14 +16,14 @@ the properties of a particular backend.
Sync and Async Handles
======================
Ray Serve offers two types of ``ServeHandle``. You can use the ``client.get_handle(..., sync=True|False)``
Ray Serve offers two types of ``ServeHandle``. You can use the ``serve.get_handle(..., sync=True|False)``
flag to toggle between them.
- When you set ``sync=True`` (the default), a synchronous handle is returned.
Calling ``handle.remote()`` should return a Ray ObjectRef.
- When you set ``sync=False``, an asyncio-based handle is returned. You need to
call it with ``await handle.remote()`` to return a Ray ObjectRef. To use ``await``,
you have to run ``client.get_handle`` and ``handle.remote`` in a Python asyncio event loop.
you have to run ``serve.get_handle`` and ``handle.remote`` in a Python asyncio event loop.
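A minimal sketch of both handle types described above, assuming an endpoint named ``"api_endpoint"`` already exists (the name is illustrative):

.. code-block:: python

import asyncio
import ray
from ray import serve

# Synchronous handle (the default): handle.remote() returns a Ray ObjectRef.
sync_handle = serve.get_handle("api_endpoint", sync=True)
print(ray.get(sync_handle.remote()))

# Asyncio handle: get_handle and handle.remote() must run inside an asyncio
# event loop, and handle.remote() must be awaited to obtain the ObjectRef.
async def query():
    async_handle = serve.get_handle("api_endpoint", sync=False)
    object_ref = await async_handle.remote()
    return await object_ref

print(asyncio.run(query()))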
The async handle has a performance advantage because it uses asyncio directly, as compared
to the sync handle, which talks to an asyncio event loop running in a thread. To learn more about
@ -54,7 +54,7 @@ Ray Serve supports capturing path parameters. For example, in a call of the for
.. code-block:: python
client.create_endpoint("my_endpoint", backend="my_backend", route="/api/{username}")
serve.create_endpoint("my_endpoint", backend="my_backend", route="/api/{username}")
the ``username`` parameter will be accessible in your backend code as follows:
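The handler body itself falls outside this hunk; a hedged sketch of what it might look like (the function name is illustrative, ``path_params`` comes from the Starlette request object):

.. code-block:: python

def my_backend(starlette_request):
    # Path parameters captured from the route template ("/api/{username}")
    # are exposed on the Starlette request.
    username = starlette_request.path_params["username"]
    return f"Hello, {username}!"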
@ -70,7 +70,7 @@ For example, suppose this route is used:
.. code-block:: python
client.create_endpoint(
serve.create_endpoint(
"complex", backend="f", route="/api/{user_id:int}/{number:float}")
Then for a query to the route ``/api/123/3.14``, the ``request.path_params`` dictionary


@ -19,8 +19,8 @@ There are three kinds of actors that are created to make up a Serve instance:
- Controller: A global actor unique to each Serve instance that manages
the control plane. The Controller is responsible for creating, updating, and
destroying other actors. Serve API calls like :mod:`client.create_backend <ray.serve.api.Client.create_backend>`,
:mod:`client.create_endpoint <ray.serve.api.Client.create_endpoint>` make remote calls to the Controller.
destroying other actors. Serve API calls like :mod:`create_backend <ray.serve.api.create_backend>`,
:mod:`create_endpoint <ray.serve.api.create_endpoint>` make remote calls to the Controller.
- Router: There is one router per node. Each router is a `Uvicorn <https://www.uvicorn.org/>`_ HTTP
server that accepts incoming requests, forwards them to replicas, and
responds once they are completed.


@ -13,9 +13,9 @@ To define a backend, you must first define the "handler" or the business logic y
The handler should take as input a `Starlette Request object <https://www.starlette.io/requests/>`_ and return any JSON-serializable object as output. For a more customizable response type, the handler may return a
`Starlette Response object <https://www.starlette.io/responses/>`_.
A backend is defined using :mod:`client.create_backend <ray.serve.api.Client.create_backend>`, and the implementation can be defined as either a function or a class.
A backend is defined using :mod:`create_backend <ray.serve.api.create_backend>`, and the implementation can be defined as either a function or a class.
Use a function when your response is stateless and a class when you might need to maintain some state (like a model).
When using a class, you can specify arguments to be passed to the constructor in :mod:`client.create_backend <ray.serve.api.Client.create_backend>`, shown below.
When using a class, you can specify arguments to be passed to the constructor in :mod:`create_backend <ray.serve.api.create_backend>`, shown below.
A backend consists of a number of *replicas*, which are individual copies of the function or class that are started in separate Ray Workers (processes).
@ -32,23 +32,23 @@ A backend consists of a number of *replicas*, which are individual copies of the
def __call__(self, starlette_request):
return self.msg
client.create_backend("simple_backend", handle_request)
serve.create_backend("simple_backend", handle_request)
# Pass in the message that the backend will return as an argument.
# If we call this backend, it will respond with "hello, world!".
client.create_backend("simple_backend_class", RequestHandler, "hello, world!")
serve.create_backend("simple_backend_class", RequestHandler, "hello, world!")
We can also list all available backends and delete them to reclaim resources.
Note that a backend cannot be deleted while it is in use by an endpoint, because then traffic to that endpoint could no longer be handled.
.. code-block:: python
>> client.list_backends()
>> serve.list_backends()
{
'simple_backend': {'num_replicas': 1},
'simple_backend_class': {'num_replicas': 1},
}
>> client.delete_backend("simple_backend")
>> client.list_backends()
>> serve.delete_backend("simple_backend")
>> serve.list_backends()
{
'simple_backend_class': {'num_replicas': 1},
}
@ -59,12 +59,12 @@ Exposing a Backend
While backends define the implementation of your request handling logic, endpoints allow you to expose them via HTTP.
Endpoints are "logical" and can have one or multiple backends that serve requests to them.
To create an endpoint, we simply need to specify a name for the endpoint, the name of a backend to handle requests to the endpoint, and the route and methods where it will be accessible.
By default endpoints are serviced only by the backend provided to :mod:`client.create_endpoint <ray.serve.api.Client.create_endpoint>`, but in some cases you may want to specify multiple backends for an endpoint, e.g., for A/B testing or incremental rollout.
By default endpoints are serviced only by the backend provided to :mod:`create_endpoint <ray.serve.api.create_endpoint>`, but in some cases you may want to specify multiple backends for an endpoint, e.g., for A/B testing or incremental rollout.
For information on how to do this, please see :ref:`serve-split-traffic`.
.. code-block:: python
client.create_endpoint("simple_endpoint", backend="simple_backend", route="/simple", methods=["GET"])
serve.create_endpoint("simple_endpoint", backend="simple_backend", route="/simple", methods=["GET"])
After creating the endpoint, it is now exposed by the HTTP server and handles requests using the specified backend.
We can query the model to verify that it's working.
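The query itself falls outside this hunk; a minimal sketch of what it might look like over HTTP, using the route created above and the default port assumed elsewhere in these docs:

.. code-block:: python

import requests

# The endpoint above was exposed at "/simple" with the GET method.
print(requests.get("http://127.0.0.1:8000/simple").text)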
@ -78,23 +78,23 @@ We can also query the endpoint using the :mod:`ServeHandle <ray.serve.handle.Ray
.. code-block:: python
handle = client.get_handle("simple_endpoint")
handle = serve.get_handle("simple_endpoint")
print(ray.get(handle.remote()))
To view all of the existing endpoints that have been created, use :mod:`client.list_endpoints <ray.serve.api.Client.list_endpoints>`.
To view all of the existing endpoints that have been created, use :mod:`serve.list_endpoints <ray.serve.api.list_endpoints>`.
.. code-block:: python
>>> client.list_endpoints()
>>> serve.list_endpoints()
{'simple_endpoint': {'route': '/simple', 'methods': ['GET'], 'traffic': {}}}
You can also delete an endpoint using :mod:`client.delete_endpoint <ray.serve.api.Client.delete_endpoint>`.
You can also delete an endpoint using :mod:`serve.delete_endpoint <ray.serve.api.delete_endpoint>`.
Endpoints and backends are independent, so deleting an endpoint will not delete its backends.
However, an endpoint must be deleted in order to delete the backends that serve its traffic.
.. code-block:: python
client.delete_endpoint("simple_endpoint")
serve.delete_endpoint("simple_endpoint")
.. _configuring-a-backend:
@ -106,7 +106,7 @@ scaling out, splitting traffic, or configuring the maximum number of in-flight r
All of these options are encapsulated in a ``BackendConfig`` object for each backend.
The ``BackendConfig`` for a running backend can be updated using
:mod:`client.update_backend_config <ray.serve.api.Client.update_backend_config>`.
:mod:`serve.update_backend_config <ray.serve.api.update_backend_config>`.
Scaling Out
-----------
@ -116,11 +116,11 @@ To scale out a backend to many processes, simply configure the number of replica
.. code-block:: python
config = {"num_replicas": 10}
client.create_backend("my_scaled_endpoint_backend", handle_request, config=config)
serve.create_backend("my_scaled_endpoint_backend", handle_request, config=config)
# scale it back down...
config = {"num_replicas": 2}
client.update_backend_config("my_scaled_endpoint_backend", config)
serve.update_backend_config("my_scaled_endpoint_backend", config)
This will scale up or down the number of replicas that can accept requests.
@ -140,7 +140,7 @@ following:
.. code-block:: python
config = {"num_gpus": 1}
client.create_backend("my_gpu_backend", handle_request, ray_actor_options=config)
serve.create_backend("my_gpu_backend", handle_request, ray_actor_options=config)
Fractional Resources
--------------------
@ -153,8 +153,8 @@ The same could be done to multiplex over CPUs.
.. code-block:: python
half_gpu_config = {"num_gpus": 0.5}
client.create_backend("my_gpu_backend_1", handle_request, ray_actor_options=half_gpu_config)
client.create_backend("my_gpu_backend_2", handle_request, ray_actor_options=half_gpu_config)
serve.create_backend("my_gpu_backend_1", handle_request, ray_actor_options=half_gpu_config)
serve.create_backend("my_gpu_backend_2", handle_request, ray_actor_options=half_gpu_config)
Configuring Parallelism with OMP_NUM_THREADS
--------------------------------------------
@ -176,7 +176,7 @@ If you *do* want to enable this parallelism in your Serve backend, just set OMP_
os.environ["OMP_NUM_THREADS"] = parallelism
# Download model weights, initialize model, etc.
client.create_backend("parallel_backend", MyBackend, 12)
serve.create_backend("parallel_backend", MyBackend, 12)
.. note::
@ -200,7 +200,7 @@ The `reconfigure` method is called when the class is created if `user_config`
is set. In particular, it's also called when new replicas are created in the
future if you scale up your backend later. The `reconfigure` method is also called
each time `user_config` is updated via
:mod:`client.update_backend_config <ray.serve.api.Client.update_backend_config>`.
:mod:`serve.update_backend_config <ray.serve.api.update_backend_config>`.
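A minimal sketch of the ``reconfigure`` pattern described above, modeled on the user-config example elsewhere in this diff (the class and tag names are illustrative):

.. code-block:: python

from ray import serve
from ray.serve import BackendConfig

class Threshold:
    def __init__(self):
        self.threshold = None  # populated by reconfigure()

    def reconfigure(self, config):
        # Called at startup (because user_config is set), on replicas added
        # later when scaling up, and whenever user_config is updated.
        self.threshold = config["threshold"]

    def __call__(self, request):
        return {"threshold": self.threshold}

serve.start()
serve.create_backend(
    "threshold", Threshold,
    config=BackendConfig(user_config={"threshold": 0.01}))
serve.create_endpoint("threshold", backend="threshold", route="/threshold")

# Updating user_config triggers reconfigure() on every replica.
serve.update_backend_config(
    "threshold", BackendConfig(user_config={"threshold": 0.99}))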
Dependency Management
=====================
@ -213,7 +213,7 @@ Currently this is supported using `conda <https://docs.conda.io/en/latest/>`_
via Ray's built-in ``runtime_env`` option for actors.
As with all other actor options, pass these in via ``ray_actor_options`` in
your call to
:mod:`client.create_backend <ray.serve.api.Client.create_backend>`.
:mod:`serve.create_backend <ray.serve.api.create_backend>`.
You must have a conda environment set up for each set of
dependencies you want to isolate. If using a multi-node cluster, the
desired conda environment must be present on all nodes.
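A short sketch mirroring the conda-environment example later in this diff (the environment name ``ray-tf1`` is assumed to already exist on every node):

.. code-block:: python

from ray import serve

def tf_version(request):
    import tensorflow as tf
    return "Tensorflow " + tf.__version__

serve.start()
# Replicas of this backend start inside the named conda environment.
serve.create_backend(
    "tf1", tf_version,
    ray_actor_options={"runtime_env": {"conda": "ray-tf1"}})
serve.create_endpoint("tf1", backend="tf1", route="/tf1")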
@ -229,7 +229,7 @@ Python versions must be the same in both environments.
.. note::
If a conda environment is not specified, your backend will be started in the
same conda environment as the client (the process calling
:mod:`client.create_backend <ray.serve.api.Client.create_backend>`) by
:mod:`serve.create_backend <ray.serve.api.create_backend>`) by
default. (When using :ref:`ray-client`, your backend will be started in the
conda environment that the Serve controller is running in, which by default is the
conda environment the remote Ray cluster was started in.)


@ -14,9 +14,8 @@ Lifetime of a Ray Serve Instance
================================
Ray Serve instances run on top of Ray clusters and are started using :mod:`serve.start <ray.serve.start>`.
:mod:`serve.start <ray.serve.start>` returns a :mod:`Client <ray.serve.api.Client>` object that can be used to create the backends and endpoints
that will be used to serve your Python code (including ML models).
The Serve instance will be torn down when the client object goes out of scope or the script exits.
Once :mod:`serve.start <ray.serve.start>` has been called, further API calls can be used to create the backends and endpoints that will be used to serve your Python code (including ML models).
The Serve instance will be torn down when the script exits.
When running on a long-lived Ray cluster (e.g., one started using ``ray start`` and connected
to using ``ray.init(address="auto")``), you can also deploy a Ray Serve instance as a long-running
@ -40,12 +39,12 @@ In general, **Option 2 is recommended for most users** because it allows you to
from ray import serve
# This will start Ray locally and start Serve on top of it.
client = serve.start()
serve.start()
def my_backend_func(request):
return "hello"
client.create_backend("my_backend", my_backend_func)
serve.create_backend("my_backend", my_backend_func)
# Serve will be shut down once the script exits, so keep it alive manually.
while True:
@ -66,12 +65,11 @@ In general, **Option 2 is recommended for most users** because it allows you to
# This will connect to the running Ray cluster.
ray.init(address="auto")
client = serve.connect()
def my_backend_func(request):
return "hello"
client.create_backend("my_backend", my_backend_func)
serve.create_backend("my_backend", my_backend_func)
Deploying on Kubernetes
=======================
@ -168,13 +166,13 @@ With the cluster now running, we can run a simple script to start Ray Serve and
# Connect to the running Ray cluster.
ray.init(address="auto")
# Bind on 0.0.0.0 to expose the HTTP server on external IPs.
client = serve.start(detached=True, http_options={"host": "0.0.0.0"})
serve.start(detached=True, http_options={"host": "0.0.0.0"})
def hello():
return "hello world"
client.create_backend("hello_backend", hello)
client.create_endpoint("hello_endpoint", backend="hello_backend", route="/hello")
serve.create_backend("hello_backend", hello)
serve.create_endpoint("hello_endpoint", backend="hello_backend", route="/hello")
Save this script locally as ``deploy.py`` and run it on the head node using ``ray submit``:


@ -16,14 +16,14 @@ See :doc:`deployment` for information about how to deploy serve.
How do I call an endpoint from Python code?
-------------------------------------------
Use :mod:`client.get_handle <ray.serve.api.Client.get_handle>` to get a handle to the endpoint,
Use :mod:`serve.get_handle <ray.serve.api.get_handle>` to get a handle to the endpoint,
then use :mod:`handle.remote <ray.serve.handle.RayServeHandle.remote>` to send requests to that
endpoint. This returns a Ray ObjectRef whose result can be waited for or retrieved using
``ray.wait`` or ``ray.get``, respectively.
.. code-block:: python
handle = client.get_handle("api_endpoint")
handle = serve.get_handle("api_endpoint")
ray.get(handle.remote(request))
@ -47,7 +47,7 @@ To call a method via Python, use :mod:`handle.options <ray.serve.handle.RayServe
self.count += inc
return True
handle = client.get_handle("endpoint_name")
handle = serve.get_handle("endpoint_name")
handle.options(method_name="other_method").remote(5)
The call is the same as a regular query except a different method is called
@ -65,7 +65,7 @@ You can return a `Starlette Response object <https://www.starlette.io/responses/
def f(starlette_request):
return Response('Hello, world!', status_code=123, media_type='text/plain')
client.create_backend("hello", f)
serve.create_backend("hello", f)
How do I enable CORS and other HTTP features?
---------------------------------------------


@ -32,8 +32,8 @@ You can also have Ray Serve batch requests for performance, which is especially
async def __call__(self, request):
return await self.handle_batch(request)
client.create_backend("counter1", BatchingExample)
client.create_endpoint("counter1", backend="counter1", route="/increment")
serve.create_backend("counter1", BatchingExample)
serve.create_endpoint("counter1", backend="counter1", route="/increment")
Please take a look at :ref:`Batching Tutorial<serve-batch-tutorial>` for a deep
dive.


@ -1,15 +1,22 @@
Serve API Reference
===================
Start or Connect to a Cluster
-----------------------------
Core APIs
---------
.. autofunction:: ray.serve.start
.. autofunction:: ray.serve.connect
Client API
----------
.. autoclass:: ray.serve.api.Client
:members: create_backend, list_backends, delete_backend, get_backend_config, update_backend_config, create_endpoint, list_endpoints, delete_endpoint, set_traffic, shadow_traffic, get_handle, shutdown
.. autofunction:: ray.serve.create_backend
.. autofunction:: ray.serve.list_backends
.. autofunction:: ray.serve.delete_backend
.. autofunction:: ray.serve.get_backend_config
.. autofunction:: ray.serve.update_backend_config
.. autofunction:: ray.serve.create_endpoint
.. autofunction:: ray.serve.list_endpoints
.. autofunction:: ray.serve.delete_endpoint
.. autofunction:: ray.serve.set_traffic
.. autofunction:: ray.serve.shadow_traffic
.. autofunction:: ray.serve.get_handle
.. autofunction:: ray.serve.shutdown
Backend Configuration
---------------------


@ -33,13 +33,11 @@ Next, start the Ray Serve runtime:
.. code-block:: python
client = serve.start()
Here ``client`` is a Ray Serve Client object. You will access most of the Ray Serve API via methods of this object.
serve.start()
.. warning::
When ``client`` goes out of scope, for example when you exit the interactive terminal or when you exit a Python script, Ray Serve will shut down.
When the Python script exits, Ray Serve will shut down.
If you would rather keep Ray Serve running in the background, see :doc:`deployment`.
Now we will define a simple Counter class. The goal is to serve this class behind an HTTP endpoint using Ray Serve.
@ -64,7 +62,7 @@ Now we are ready to deploy our class using Ray Serve. First, create a Ray Serve
.. code-block:: python
client.create_backend("my_backend", Counter)
serve.create_backend("my_backend", Counter)
Here we have assigned the tag ``"my_backend"`` to this backend, which we can use to identify this backend in the future.
@ -76,7 +74,7 @@ To complete the deployment, we will expose this backend over HTTP by creating a
.. code-block:: python
client.create_endpoint("my_endpoint", backend="my_backend", route="/counter")
serve.create_endpoint("my_endpoint", backend="my_backend", route="/counter")
Here ``"my_endpoint"`` is a tag used to identify this endpoint, and we have specified the backend to place behind the endpoint via the `backend` parameter.
The last parameter, ``route``, is the path at which our endpoint will be available over HTTP.
@ -86,4 +84,4 @@ In your browser, simply visit http://127.0.0.1:8000/counter, and you should see
If you keep refreshing the page, the count should increase, as expected.
You just built and ran your first Ray Serve application! Now you can dive into the :doc:`core-apis` to get a deeper understanding of Ray Serve.
For more interesting example applications, including integrations with popular machine learning frameworks and Python web servers, be sure to check out :doc:`tutorials/index`.
For more interesting example applications, including integrations with popular machine learning frameworks and Python web servers, be sure to check out :doc:`tutorials/index`.


@ -69,7 +69,7 @@ Here's how to run this example:
2. In the directory where the example files are saved, run ``python deploy_serve.py`` to deploy our Ray Serve endpoint.
.. note::
Because we have omitted the keyword argument ``route`` in ``client.create_endpoint()``, our endpoint will not be exposed over HTTP by Ray Serve.
Because we have omitted the keyword argument ``route`` in ``serve.create_endpoint()``, our endpoint will not be exposed over HTTP by Ray Serve.
3. Run ``gunicorn aiohttp_app:app --worker-class aiohttp.GunicornWebWorker --bind localhost:8001`` to start the AIOHTTP app using gunicorn. We bind to port 8001 because the Ray Dashboard is already using port 8000 by default.


@ -1,5 +1,8 @@
from ray.serve.api import (accept_batch, Client, connect, start,
get_replica_context)
from ray.serve.api import (
accept_batch, connect, start, get_replica_context, get_handle,
shadow_traffic, set_traffic, delete_backend, list_backends, create_backend,
get_backend_config, update_backend_config, list_endpoints, delete_endpoint,
create_endpoint, shutdown)
from ray.serve.batching import batch
from ray.serve.config import BackendConfig, HTTPOptions
from ray.serve.utils import ServeRequest
@ -10,13 +13,9 @@ import ray.worker
ray.worker.blocking_get_inside_async_warned = True
__all__ = [
"accept_batch",
"BackendConfig",
"batch",
"connect",
"Client",
"start",
"HTTPOptions",
"get_replica_context",
"ServeRequest",
"accept_batch", "BackendConfig", "batch", "connect", "start",
"HTTPOptions", "get_replica_context", "ServeRequest", "get_handle",
"shadow_traffic", "set_traffic", "delete_backend", "list_backends",
"create_backend", "get_backend_config", "update_backend_config",
"list_endpoints", "delete_endpoint", "create_endpoint", "shutdown"
]


@ -24,7 +24,20 @@ from ray.serve.router import RequestMetadata, Router
from ray.actor import ActorHandle
_INTERNAL_REPLICA_CONTEXT = None
global_async_loop = None
_global_async_loop = None
_global_client = None
def _get_global_client():
if _global_client is not None:
return _global_client
return connect()
def _set_global_client(client):
global _global_client
_global_client = client
@dataclass
@ -36,15 +49,15 @@ class ReplicaContext:
def create_or_get_async_loop_in_thread():
global global_async_loop
if global_async_loop is None:
global_async_loop = asyncio.new_event_loop()
global _global_async_loop
if _global_async_loop is None:
_global_async_loop = asyncio.new_event_loop()
thread = threading.Thread(
daemon=True,
target=global_async_loop.run_forever,
target=_global_async_loop.run_forever,
)
thread.start()
return global_async_loop
return _global_async_loop
def _set_internal_replica_context(backend_tag, replica_tag, controller_name):
@ -55,9 +68,15 @@ def _set_internal_replica_context(backend_tag, replica_tag, controller_name):
def _ensure_connected(f: Callable) -> Callable:
@wraps(f)
def check(self, *args, **kwargs):
def check(self, *args, _internal=False, **kwargs):
if self._shutdown:
raise RayServeException("Client has already been shut down.")
if not _internal:
logger.warning(
"The client-based API is being deprecated in favor of global "
"API calls (e.g., `serve.create_backend()`). Please replace "
"all instances of `client.api_call()` with "
"`serve.api_call()`.")
return f(self, *args, **kwargs)
return check
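For context, a hedged sketch of what this wrapper means for user code: existing client-based calls keep working but log the deprecation warning, while the new module-level functions pass ``_internal=True`` and stay silent:

.. code-block:: python

from ray import serve

def hello(request):
    return "hello"

client = serve.start()

# Deprecated style: still functional, but _ensure_connected now logs
# "The client-based API is being deprecated in favor of global API calls...".
client.create_backend("old_style", hello)

# Preferred style: routes through the global client with _internal=True,
# so no warning is emitted.
serve.create_backend("new_style", hello)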
@ -174,7 +193,6 @@ class Client:
self._shutdown = True
@_ensure_connected
def _wait_for_goal(self, result_object_id: ray.ObjectRef) -> bool:
goal_id: Optional[UUID] = ray.get(result_object_id)
if goal_id is not None:
@ -216,7 +234,7 @@ class Client:
"methods must be a list of strings, but got type {}".format(
type(methods)))
endpoints = self.list_endpoints()
endpoints = self.list_endpoints(_internal=True)
if endpoint_name in endpoints:
methods_old = endpoints[endpoint_name]["methods"]
route_old = endpoints[endpoint_name]["route"]
@ -365,7 +383,7 @@ class Client:
reconfigure method of the backend. The reconfigure method is
called if "user_config" is not None.
"""
if backend_tag in self.list_backends().keys():
if backend_tag in self.list_backends(_internal=True).keys():
raise ValueError(
"Cannot create backend. "
"Backend '{}' is already registered.".format(backend_tag))
@ -633,7 +651,9 @@ def start(
raise TimeoutError(
"HTTP proxies not available after {HTTP_PROXY_TIMEOUT}s.")
return Client(controller, controller_name, detached=detached)
client = Client(controller, controller_name, detached=detached)
_set_global_client(client)
return client
def connect() -> Client:
@ -642,8 +662,8 @@ def connect() -> Client:
If calling from the driver program, the Serve instance on this Ray cluster
must first have been initialized using `serve.start(detached=True)`.
If called from within a backend, will connect to the same Serve instance
that the backend is running in.
If called from within a backend, this will connect to the same Serve
instance that the backend is running in.
"""
# Initialize ray if needed.
@ -666,7 +686,231 @@ def connect() -> Client:
"call `serve.start(detached=True) to start "
"one.")
return Client(controller, controller_name, detached=True)
client = Client(controller, controller_name, detached=True)
_set_global_client(client)
return client
def shutdown() -> None:
"""Completely shut down the connected Serve instance.
Shuts down all processes and deletes all state associated with the
instance.
"""
if _global_client is None:
return
_get_global_client().shutdown()
_set_global_client(None)
def create_endpoint(endpoint_name: str,
*,
backend: str = None,
route: Optional[str] = None,
methods: List[str] = ["GET"]) -> None:
"""Create a service endpoint given route_expression.
Args:
endpoint_name (str): A name to associate with the endpoint.
backend (str, required): The backend that will serve requests to
this endpoint. To change this or split traffic among backends,
use `serve.set_traffic`.
route (str, optional): A string beginning with "/". The HTTP server will
use the string to match the path.
methods(List[str], optional): The HTTP methods that are valid for
this endpoint.
"""
return _get_global_client().create_endpoint(
endpoint_name,
backend=backend,
route=route,
methods=methods,
_internal=True)
def delete_endpoint(endpoint: str) -> None:
"""Delete the given endpoint.
Does not delete any associated backends.
"""
return _get_global_client().delete_endpoint(endpoint, _internal=True)
def list_endpoints() -> Dict[str, Dict[str, Any]]:
"""Returns a dictionary of all registered endpoints.
The dictionary keys are endpoint names and values are dictionaries
of the form: {"methods": List[str], "traffic": Dict[str, float]}.
"""
return _get_global_client().list_endpoints(_internal=True)
def update_backend_config(
backend_tag: str,
config_options: Union[BackendConfig, Dict[str, Any]]) -> None:
"""Update a backend configuration for a backend tag.
Keys not specified in the passed config will be left unchanged.
Args:
backend_tag(str): A registered backend.
config_options(dict, serve.BackendConfig): Backend config options
to update. Either a BackendConfig object or a dict mapping
strings to values for the following supported options:
- "num_replicas": number of processes to start up that
will handle requests to this backend.
- "max_batch_size": the maximum number of requests that will
be processed in one batch by this backend.
- "batch_wait_timeout": time in seconds that backend replicas
will wait for a full batch of requests before
processing a partial batch.
- "max_concurrent_queries": the maximum number of queries
that will be sent to a replica of this backend
without receiving a response.
- "user_config" (experimental): Arguments to pass to the
reconfigure method of the backend. The reconfigure method is
called if "user_config" is not None.
"""
return _get_global_client().update_backend_config(
backend_tag, config_options, _internal=True)
def get_backend_config(backend_tag: str) -> BackendConfig:
"""Get the backend configuration for a backend tag.
Args:
backend_tag(str): A registered backend.
"""
return _get_global_client().get_backend_config(backend_tag, _internal=True)
def create_backend(
backend_tag: str,
backend_def: Union[Callable, Type[Callable], str],
*init_args: Any,
ray_actor_options: Optional[Dict] = None,
config: Optional[Union[BackendConfig, Dict[str, Any]]] = None) -> None:
"""Create a backend with the provided tag.
Args:
backend_tag (str): a unique tag assigned to identify this backend.
backend_def (callable, class, str): a function or class
implementing __call__ and returning a JSON-serializable object
or a Starlette Response object. A string import path can also
be provided (e.g., "my_module.MyClass"), in which case the
underlying function or class will be imported dynamically in
the worker replicas.
*init_args (optional): the arguments to pass to the class
initialization method. Not valid if backend_def is a function.
ray_actor_options (optional): options to be passed into the
@ray.remote decorator for the backend actor.
config (dict, serve.BackendConfig, optional): configuration options
for this backend. Either a BackendConfig, or a dictionary
mapping strings to values for the following supported options:
- "num_replicas": number of processes to start up that
will handle requests to this backend.
- "max_batch_size": the maximum number of requests that will
be processed in one batch by this backend.
- "batch_wait_timeout": time in seconds that backend replicas
will wait for a full batch of requests before processing a
partial batch.
- "max_concurrent_queries": the maximum number of queries that
will be sent to a replica of this backend without receiving a
response.
- "user_config" (experimental): Arguments to pass to the
reconfigure method of the backend. The reconfigure method is
called if "user_config" is not None.
"""
return _get_global_client().create_backend(
backend_tag,
backend_def,
*init_args,
ray_actor_options=ray_actor_options,
config=config,
_internal=True)
def list_backends() -> Dict[str, BackendConfig]:
"""Returns a dictionary of all registered backends.
Dictionary maps backend tags to backend config objects.
"""
return _get_global_client().list_backends(_internal=True)
def delete_backend(backend_tag: str, force: bool = False) -> None:
"""Delete the given backend.
The backend must not currently be used by any endpoints.
Args:
backend_tag (str): The backend tag to be deleted.
force (bool): Whether or not to force the deletion, without waiting
for graceful shutdown. Defaults to false.
"""
return _get_global_client().delete_backend(
backend_tag, force=force, _internal=True)
def set_traffic(endpoint_name: str,
traffic_policy_dictionary: Dict[str, float]) -> None:
"""Associate a service endpoint with traffic policy.
Example:
>>> serve.set_traffic("service-name", {
"backend:v1": 0.5,
"backend:v2": 0.5
})
Args:
endpoint_name (str): A registered service endpoint.
traffic_policy_dictionary (dict): a dictionary mapping backend names
to their traffic weights. The weights must sum to 1.
"""
return _get_global_client().set_traffic(
endpoint_name, traffic_policy_dictionary, _internal=True)
def shadow_traffic(endpoint_name: str, backend_tag: str,
proportion: float) -> None:
"""Shadow traffic from an endpoint to a backend.
The specified proportion of requests will be duplicated and sent to the
backend. Responses of the duplicated traffic will be ignored.
The backend must not already be in use.
To stop shadowing traffic to a backend, call `shadow_traffic` with
proportion equal to 0.
Args:
endpoint_name (str): A registered service endpoint.
backend_tag (str): A registered backend.
proportion (float): The proportion of traffic from 0 to 1.
"""
return _get_global_client().shadow_traffic(
endpoint_name, backend_tag, proportion, _internal=True)
def get_handle(endpoint_name: str,
missing_ok: Optional[bool] = False,
sync: bool = True) -> Union[RayServeHandle, RayServeSyncHandle]:
"""Retrieve RayServeHandle for service endpoint to invoke it from Python.
Args:
endpoint_name (str): A registered service endpoint.
missing_ok (bool): If true, then Serve won't check that the endpoint is
registered. False by default.
sync (bool): If true, then Serve will return a ServeHandle that
works everywhere. Otherwise, Serve will return a ServeHandle
that's only usable in an asyncio loop.
Returns:
RayServeHandle
"""
return _get_global_client().get_handle(
endpoint_name, missing_ok=missing_ok, sync=sync, _internal=True)
def get_replica_context() -> ReplicaContext:


@ -7,10 +7,7 @@ from ray import serve
# Connect to the running Ray cluster.
ray.init(address="auto")
# Connect to the running Ray Serve instance.
client = serve.connect()
my_handle = client.get_handle("my_endpoint") # Returns a ServeHandle object.
my_handle = serve.get_handle("my_endpoint") # Returns a ServeHandle object.
# Define our AIOHTTP request handler.


@ -4,7 +4,6 @@ from ray import serve
import requests
ray.init(address="auto")
client = serve.connect()
logger = logging.getLogger("ray")
@ -19,8 +18,8 @@ class Counter:
return {"count": self.count}
client.create_backend("my_backend", Counter)
client.create_endpoint("my_endpoint", backend="my_backend", route="/counter")
serve.create_backend("my_backend", Counter)
serve.create_endpoint("my_endpoint", backend="my_backend", route="/counter")
for i in range(10):
requests.get("http://127.0.0.1:8000/counter")


@ -2,23 +2,23 @@ import requests
from ray import serve
import tensorflow as tf
client = serve.start()
serve.start()
def tf_version(request):
return ("Tensorflow " + tf.__version__)
client.create_backend(
serve.create_backend(
"tf1", tf_version, ray_actor_options={"runtime_env": {
"conda": "ray-tf1"
}})
client.create_endpoint("tf1", backend="tf1", route="/tf1")
client.create_backend(
serve.create_endpoint("tf1", backend="tf1", route="/tf1")
serve.create_backend(
"tf2", tf_version, ray_actor_options={"runtime_env": {
"conda": "ray-tf2"
}})
client.create_endpoint("tf2", backend="tf2", route="/tf2")
serve.create_endpoint("tf2", backend="tf2", route="/tf2")
print(requests.get("http://127.0.0.1:8000/tf1").text) # Tensorflow 1.15.0
print(requests.get("http://127.0.0.1:8000/tf2").text) # Tensorflow 2.3.0


@ -12,7 +12,7 @@ serve_handle = None
@app.on_event("startup") # Code to be run when the server starts.
async def startup_event():
ray.init(address="auto") # Connect to the running Ray cluster.
client = serve.start(http_host=None) # Start the Ray Serve client.
serve.start(http_host=None) # Start the Ray Serve instance.
# Define a callable class to use for our Ray Serve backend.
class GPT2:
@ -24,12 +24,12 @@ async def startup_event():
# Set up a Ray Serve backend with the desired number of replicas.
backend_config = serve.BackendConfig(num_replicas=2)
client.create_backend("gpt-2", GPT2, config=backend_config)
client.create_endpoint("generate", backend="gpt-2")
serve.create_backend("gpt-2", GPT2, config=backend_config)
serve.create_endpoint("generate", backend="gpt-2")
# Get a handle to our Ray Serve endpoint so we can query it in Python.
global serve_handle
serve_handle = client.get_handle("generate")
serve_handle = serve.get_handle("generate")
@app.get("/generate")


@ -6,7 +6,7 @@ client = serve.start()
# Include your class as input to the ImportedBackend constructor.
import_path = "ray.serve.utils.MockImportedBackend"
client.create_backend("imported", import_path, "input_arg")
client.create_endpoint("imported", backend="imported", route="/imported")
serve.create_backend("imported", import_path, "input_arg")
serve.create_endpoint("imported", backend="imported", route="/imported")
print(requests.get("http://127.0.0.1:8000/imported").text)


@ -3,7 +3,7 @@ from ray import serve
import requests
ray.init(num_cpus=4)
client = serve.start()
serve.start()
class Counter:
@ -16,11 +16,11 @@ class Counter:
# Form a backend from our class and connect it to an endpoint.
client.create_backend("my_backend", Counter)
client.create_endpoint("my_endpoint", backend="my_backend", route="/counter")
serve.create_backend("my_backend", Counter)
serve.create_endpoint("my_endpoint", backend="my_backend", route="/counter")
# Query our endpoint in two different ways: from HTTP and from Python.
print(requests.get("http://127.0.0.1:8000/counter").json())
# > {"count": 1}
print(ray.get(client.get_handle("my_endpoint").remote()))
print(ray.get(serve.get_handle("my_endpoint").remote()))
# > {"count": 2}


@ -3,7 +3,7 @@ from ray import serve
import requests
ray.init(num_cpus=4)
client = serve.start()
serve.start()
def say_hello(request):
@ -11,11 +11,11 @@ def say_hello(request):
# Form a backend from our function and connect it to an endpoint.
client.create_backend("my_backend", say_hello)
client.create_endpoint("my_endpoint", backend="my_backend", route="/hello")
serve.create_backend("my_backend", say_hello)
serve.create_endpoint("my_endpoint", backend="my_backend", route="/hello")
# Query our endpoint in two different ways: from HTTP and from Python.
print(requests.get("http://127.0.0.1:8000/hello?name=serve").text)
# > hello serve!
print(ray.get(client.get_handle("my_endpoint").remote(name="serve")))
print(ray.get(serve.get_handle("my_endpoint").remote(name="serve")))
# > hello serve!


@ -5,7 +5,7 @@ from ray.util import metrics
import time
ray.init(address="auto")
client = serve.start()
serve.start()
class MyBackendClass:
@ -23,10 +23,10 @@ class MyBackendClass:
self.my_counter.inc()
client.create_backend("my_backend", MyBackendClass)
client.create_endpoint("my_endpoint", backend="my_backend")
serve.create_backend("my_backend", MyBackendClass)
serve.create_endpoint("my_endpoint", backend="my_backend")
handle = client.get_handle("my_endpoint")
handle = serve.get_handle("my_endpoint")
while (True):
ray.get(handle.remote(excellent=True))
time.sleep(1)


@ -4,7 +4,7 @@ from ray import serve
import requests
ray.init()
client = serve.start()
serve.start()
logger = logging.getLogger("ray")
@ -13,7 +13,7 @@ def f(request):
logger.info("Some info!")
client.create_backend("my_backend", f)
client.create_endpoint("my_endpoint", backend="my_backend", route="/f")
serve.create_backend("my_backend", f)
serve.create_endpoint("my_endpoint", backend="my_backend", route="/f")
requests.get("http://127.0.0.1:8000/f")


@ -4,16 +4,16 @@ from ray import serve
import time
ray.init(address="auto")
client = serve.start()
serve.start()
def f(request):
time.sleep(1)
client.create_backend("f", f)
client.create_endpoint("f", backend="f")
serve.create_backend("f", f)
serve.create_endpoint("f", backend="f")
handle = client.get_handle("f")
handle = serve.get_handle("f")
while (True):
ray.get(handle.remote())


@ -4,7 +4,7 @@ import ray
from ray import serve
ray.init(num_cpus=8)
client = serve.start()
serve.start()
# Our pipeline will be structured as follows:
# - Input comes in, the composed model sends it to model_one
@ -27,9 +27,8 @@ def model_two(request):
class ComposedModel:
def __init__(self):
client = serve.connect()
self.model_one = client.get_handle("model_one")
self.model_two = client.get_handle("model_two")
self.model_one = serve.get_handle("model_one")
self.model_two = serve.get_handle("model_two")
# This method can be called concurrently!
async def __call__(self, starlette_request):
@ -45,17 +44,17 @@ class ComposedModel:
return result
client.create_backend("model_one", model_one)
client.create_endpoint("model_one", backend="model_one")
serve.create_backend("model_one", model_one)
serve.create_endpoint("model_one", backend="model_one")
client.create_backend("model_two", model_two)
client.create_endpoint("model_two", backend="model_two")
serve.create_backend("model_two", model_two)
serve.create_endpoint("model_two", backend="model_two")
# max_concurrent_queries is optional. By default, if you pass in an async
# function, Ray Serve sets the limit to a high number.
client.create_backend(
serve.create_backend(
"composed_backend", ComposedModel, config={"max_concurrent_queries": 10})
client.create_endpoint(
serve.create_endpoint(
"composed", backend="composed_backend", route="/composed")
for _ in range(5):


@ -6,7 +6,7 @@ from ray import serve
from ray.serve import BackendConfig
ray.init()
client = serve.start()
serve.start()
class Threshold:
@ -24,10 +24,10 @@ class Threshold:
backend_config = BackendConfig(user_config={"threshold": 0.01})
client.create_backend("threshold", Threshold, config=backend_config)
client.create_endpoint("threshold", backend="threshold", route="/threshold")
serve.create_backend("threshold", Threshold, config=backend_config)
serve.create_endpoint("threshold", backend="threshold", route="/threshold")
print(requests.get("http://127.0.0.1:8000/threshold").text) # true, probably
backend_config = BackendConfig(user_config={"threshold": 0.99})
client.update_backend_config("threshold", backend_config)
serve.update_backend_config("threshold", backend_config)
print(requests.get("http://127.0.0.1:8000/threshold").text) # false, probably


@ -36,9 +36,9 @@ class BatchAdder:
# __doc_deploy_begin__
ray.init(num_cpus=8)
client = serve.start()
client.create_backend("adder:v0", BatchAdder)
client.create_endpoint(
serve.start()
serve.create_backend("adder:v0", BatchAdder)
serve.create_endpoint(
"adder", backend="adder:v0", route="/adder", methods=["GET"])
# __doc_deploy_end__
@ -61,7 +61,7 @@ print("Result returned:", results)
# __doc_query_end__
# __doc_query_handle_begin__
handle = client.get_handle("adder")
handle = serve.get_handle("adder")
print(handle)
# Output
# RayServeHandle(


@ -69,9 +69,9 @@ ray.init(address="auto")
# now we initialize /connect to the Ray service
# listen on 0.0.0.0 to make the HTTP server accessible from other machines.
client = serve.start(http_host="0.0.0.0")
client.create_backend("lr:v1", BoostingModel)
client.create_endpoint("iris_classifier", backend="lr:v1", route="/regressor")
serve.start(http_host="0.0.0.0")
serve.create_backend("lr:v1", BoostingModel)
serve.create_endpoint("iris_classifier", backend="lr:v1", route="/regressor")
# __doc_create_deploy_end__
# __doc_query_begin__
@ -163,7 +163,6 @@ class BoostingModelv2:
# now we initialize /connect to the Ray service
client = serve.connect()
client.create_backend("lr:v2", BoostingModelv2)
client.set_traffic("iris_classifier", {"lr:v2": 0.25, "lr:v1": 0.75})
serve.create_backend("lr:v2", BoostingModelv2)
serve.set_traffic("iris_classifier", {"lr:v2": 0.25, "lr:v1": 0.75})
# __doc_create_deploy_2_end__


@ -48,9 +48,9 @@ class ImageModel:
ray.init(num_cpus=8)
# __doc_deploy_begin__
client = serve.start()
client.create_backend("resnet18:v0", ImageModel)
client.create_endpoint(
serve.start()
serve.create_backend("resnet18:v0", ImageModel)
serve.create_endpoint(
"predictor",
backend="resnet18:v0",
route="/image_predict",


@ -53,9 +53,9 @@ checkpoint_path = train_ppo_model()
ray.init(num_cpus=8)
# __doc_deploy_begin__
client = serve.start()
client.create_backend("ppo", ServePPOModel, checkpoint_path)
client.create_endpoint("ppo-endpoint", backend="ppo", route="/cartpole-ppo")
serve.start()
serve.create_backend("ppo", ServePPOModel, checkpoint_path)
serve.create_endpoint("ppo-endpoint", backend="ppo", route="/cartpole-ppo")
# __doc_deploy_end__
# __doc_query_begin__


@ -73,9 +73,9 @@ class BoostingModel:
ray.init(num_cpus=8)
# __doc_deploy_begin__
client = serve.start()
client.create_backend("lr:v1", BoostingModel)
client.create_endpoint("iris_classifier", backend="lr:v1", route="/regressor")
serve.start()
serve.create_backend("lr:v1", BoostingModel)
serve.create_endpoint("iris_classifier", backend="lr:v1", route="/regressor")
# __doc_deploy_end__
# __doc_query_begin__


@ -71,9 +71,9 @@ class TFMnistModel:
ray.init(num_cpus=8)
# __doc_deploy_begin__
client = serve.start()
client.create_backend("tf:v1", TFMnistModel, TRAINED_MODEL_PATH)
client.create_endpoint("tf_classifier", backend="tf:v1", route="/mnist")
serve.start()
serve.create_backend("tf:v1", TFMnistModel, TRAINED_MODEL_PATH)
serve.create_endpoint("tf_classifier", backend="tf:v1", route="/mnist")
# __doc_deploy_end__
# __doc_query_begin__


@ -13,9 +13,9 @@ def echo(starlette_request):
return ["hello " + starlette_request.query_params.get("name", "serve!")]
client = serve.start()
client.create_backend("echo:v1", echo)
client.create_endpoint("my_endpoint", backend="echo:v1", route="/echo")
serve.start()
serve.create_backend("echo:v1", echo)
serve.create_endpoint("my_endpoint", backend="echo:v1", route="/echo")
while True:
resp = requests.get("http://127.0.0.1:8000/echo").json()


@ -37,9 +37,9 @@ class MagicCounter:
return base_number + self.increment
client = serve.start()
client.create_backend("counter:v1", MagicCounter, 42) # increment=42
client.create_endpoint("magic_counter", backend="counter:v1", route="/counter")
serve.start()
serve.create_backend("counter:v1", MagicCounter, 42) # increment=42
serve.create_endpoint("magic_counter", backend="counter:v1", route="/counter")
print("Sending ten queries via HTTP")
for i in range(10):
@ -51,7 +51,7 @@ for i in range(10):
time.sleep(0.2)
print("Sending ten queries via Python")
handle = client.get_handle("magic_counter")
handle = serve.get_handle("magic_counter")
for i in range(10):
print("> Pinging handle.remote(base_number={})".format(i))
result = ray.get(handle.remote(base_number=i))


@ -45,9 +45,9 @@ class MagicCounter:
return await self.handle_batch(base_number)
client = serve.start()
client.create_backend("counter:v1", MagicCounter, 42) # increment=42
client.create_endpoint("magic_counter", backend="counter:v1", route="/counter")
serve.start()
serve.create_backend("counter:v1", MagicCounter, 42) # increment=42
serve.create_endpoint("magic_counter", backend="counter:v1", route="/counter")
print("Sending ten queries via HTTP")
for i in range(10):
@ -59,7 +59,7 @@ for i in range(10):
time.sleep(0.2)
print("Sending ten queries via Python")
handle = client.get_handle("magic_counter")
handle = serve.get_handle("magic_counter")
for i in range(10):
print("> Pinging handle.remote(base_number={})".format(i))
result = ray.get(handle.remote(base_number=i))


@ -24,11 +24,11 @@ class MagicCounter:
return await self.handle_batch(base_number)
client = serve.start()
client.create_backend("counter:v1", MagicCounter, 42) # increment=42
client.create_endpoint("magic_counter", backend="counter:v1", route="/counter")
serve.start()
serve.create_backend("counter:v1", MagicCounter, 42) # increment=42
serve.create_endpoint("magic_counter", backend="counter:v1", route="/counter")
handle = client.get_handle("magic_counter")
handle = serve.get_handle("magic_counter")
future_list = []
# fire 30 requests


@ -38,9 +38,9 @@ def echo(_):
raise Exception("Something went wrong...")
client = serve.start()
serve.start()
client.create_endpoint("my_endpoint", backend="echo:v1", route="/echo")
serve.create_endpoint("my_endpoint", backend="echo:v1", route="/echo")
for _ in range(2):
resp = requests.get("http://127.0.0.1:8000/echo").json()
@ -49,6 +49,6 @@ for _ in range(2):
print("...Sleeping for 2 seconds...")
time.sleep(2)
handle = client.get_handle("my_endpoint")
handle = serve.get_handle("my_endpoint")
print("Invoke from python will raise exception with traceback:")
ray.get(handle.remote())


@ -7,7 +7,7 @@ import ray.serve as serve
# initialize ray serve system.
ray.init(num_cpus=10)
client = serve.start()
serve.start()
# a backend can be a function or class.
@ -17,16 +17,16 @@ def echo_v1(starlette_request):
return response
client.create_backend("echo:v1", echo_v1)
serve.create_backend("echo:v1", echo_v1)
# An endpoint is associated with an HTTP path and traffic to the endpoint
# will be serviced by the echo:v1 backend.
client.create_endpoint("my_endpoint", backend="echo:v1", route="/echo")
serve.create_endpoint("my_endpoint", backend="echo:v1", route="/echo")
print(requests.get("http://127.0.0.1:8000/echo", timeout=0.5).text)
# The service will be reachable from http
print(ray.get(client.get_handle("my_endpoint").remote(response="hello")))
print(ray.get(serve.get_handle("my_endpoint").remote(response="hello")))
# as well as within the ray system.
@ -37,10 +37,10 @@ def echo_v2(starlette_request):
return "something new"
client.create_backend("echo:v2", echo_v2)
serve.create_backend("echo:v2", echo_v2)
# The two backends will now split the traffic 50%-50%.
client.set_traffic("my_endpoint", {"echo:v1": 0.5, "echo:v2": 0.5})
serve.set_traffic("my_endpoint", {"echo:v1": 0.5, "echo:v2": 0.5})
# Observe requests are now split between two backends.
for _ in range(10):
@ -48,5 +48,5 @@ for _ in range(10):
time.sleep(0.2)
# You can also change the number of replicas for each backend independently.
client.update_backend_config("echo:v1", {"num_replicas": 2})
client.update_backend_config("echo:v2", {"num_replicas": 2})
serve.update_backend_config("echo:v1", {"num_replicas": 2})
serve.update_backend_config("echo:v2", {"num_replicas": 2})

View file

@ -6,7 +6,7 @@ import ray.serve as serve
import time
# Initialize ray serve instance.
client = serve.start()
serve.start()
# A backend can be a function or class.
@ -15,32 +15,32 @@ def echo_v1(_, response="hello from python!"):
return f"echo_v1({response})"
client.create_backend("echo_v1", echo_v1)
client.create_endpoint("echo_v1", backend="echo_v1", route="/echo_v1")
serve.create_backend("echo_v1", echo_v1)
serve.create_endpoint("echo_v1", backend="echo_v1", route="/echo_v1")
def echo_v2(_, relay=""):
return f"echo_v2({relay})"
client.create_backend("echo_v2", echo_v2)
client.create_endpoint("echo_v2", backend="echo_v2", route="/echo_v2")
serve.create_backend("echo_v2", echo_v2)
serve.create_endpoint("echo_v2", backend="echo_v2", route="/echo_v2")
def echo_v3(_, relay=""):
return f"echo_v3({relay})"
client.create_backend("echo_v3", echo_v3)
client.create_endpoint("echo_v3", backend="echo_v3", route="/echo_v3")
serve.create_backend("echo_v3", echo_v3)
serve.create_endpoint("echo_v3", backend="echo_v3", route="/echo_v3")
def echo_v4(_, relay1="", relay2=""):
return f"echo_v4({relay1} , {relay2})"
client.create_backend("echo_v4", echo_v4)
client.create_endpoint("echo_v4", backend="echo_v4", route="/echo_v4")
serve.create_backend("echo_v4", echo_v4)
serve.create_endpoint("echo_v4", backend="echo_v4", route="/echo_v4")
"""
The pipeline created is as follows -
"my_endpoint1"
@ -62,10 +62,10 @@ The pipeline created is as follows -
"""
# get the handle of the endpoints
handle1 = client.get_handle("echo_v1")
handle2 = client.get_handle("echo_v2")
handle3 = client.get_handle("echo_v3")
handle4 = client.get_handle("echo_v4")
handle1 = serve.get_handle("echo_v1")
handle2 = serve.get_handle("echo_v2")
handle3 = serve.get_handle("echo_v3")
handle4 = serve.get_handle("echo_v4")
start = time.time()
print("Start firing to the pipeline: {} s".format(time.time()))

View file

@ -30,10 +30,10 @@ def echo_v2(_):
return "v2"
client = serve.start()
serve.start()
client.create_backend("echo:v1", echo_v1)
client.create_endpoint("my_endpoint", backend="echo:v1", route="/echo")
serve.create_backend("echo:v1", echo_v1)
serve.create_endpoint("my_endpoint", backend="echo:v1", route="/echo")
for _ in range(3):
resp = requests.get("http://127.0.0.1:8000/echo").json()
@ -42,8 +42,8 @@ for _ in range(3):
print("...Sleeping for 2 seconds...")
time.sleep(2)
client.create_backend("echo:v2", echo_v2)
client.set_traffic("my_endpoint", {"echo:v1": 0.5, "echo:v2": 0.5})
serve.create_backend("echo:v2", echo_v2)
serve.set_traffic("my_endpoint", {"echo:v1": 0.5, "echo:v2": 0.5})
while True:
resp = requests.get("http://127.0.0.1:8000/echo").json()
print(pformat_color_json(resp))

View file

@ -7,6 +7,7 @@ import starlette.responses
import starlette.routing
import ray
from ray import serve
from ray.exceptions import RayTaskError
from ray.serve.common import EndpointTag
from ray.serve.constants import LongPollKey
@ -45,7 +46,7 @@ class ServeStarletteEndpoint:
headers = {k.decode(): v.decode() for k, v in scope["headers"]}
if self.handle is None:
self.handle = self.client.get_handle(self.endpoint_tag, sync=False)
self.handle = serve.get_handle(self.endpoint_tag, sync=False)
object_ref = await self.handle.options(
method_name=headers.get("X-SERVE-CALL-METHOD".lower(),

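The proxy above now obtains its handle lazily via the module-level serve.get_handle(..., sync=False). A sketch of the two handle flavors this relies on, using an illustrative echo endpoint (not part of this diff): a sync handle is resolved with ray.get, while an async handle's .remote() is awaited to obtain an ObjectRef that is itself awaited.

.. code-block:: python

    import asyncio

    import ray
    from ray import serve

    ray.init()
    serve.start()

    def echo(*args):
        return "hello"

    serve.create_backend("echo", echo)
    serve.create_endpoint("echo", backend="echo")

    # Sync handle: blocking driver code resolves results with ray.get().
    sync_handle = serve.get_handle("echo", sync=True)
    assert ray.get(sync_handle.remote()) == "hello"

    # Async handle: inside a coroutine (as in the HTTP proxy), .remote() is
    # awaited to obtain an ObjectRef, which is then awaited for the result.
    async def call_async():
        async_handle = serve.get_handle("echo", sync=False)
        object_ref = await async_handle.remote()
        return await object_ref

    assert asyncio.get_event_loop().run_until_complete(call_async()) == "hello"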
View file

@ -36,12 +36,12 @@ def _shared_serve_instance():
@pytest.fixture
def serve_instance(_shared_serve_instance):
yield _shared_serve_instance
controller = _shared_serve_instance._controller
controller = serve.api._global_client._controller
# Clear all state between tests to avoid naming collisions.
for endpoint in ray.get(controller.get_all_endpoints.remote()):
_shared_serve_instance.delete_endpoint(endpoint)
serve.delete_endpoint(endpoint)
for backend in ray.get(controller.get_all_backends.remote()).keys():
_shared_serve_instance.delete_backend(backend, force=True)
serve.delete_backend(backend, force=True)
@pytest.fixture

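The fixture above reaches the controller through serve.api._global_client, the module attribute where serve.start() now stores the client. As a rough mental model only (an assumption about the shape of ray.serve.api, not its actual source), the module-level functions behave like thin wrappers that forward to that singleton:

.. code-block:: python

    # Self-contained toy model of the delegation pattern; names other than
    # _global_client are illustrative.
    _global_client = None

    class Client:
        """Stand-in for the controller-backed client serve.start() creates."""

        def create_backend(self, name, target, *args, **kwargs):
            print(f"create_backend({name!r})")

        def create_endpoint(self, name, **kwargs):
            print(f"create_endpoint({name!r})")

    def start(**kwargs):
        global _global_client
        _global_client = Client()
        return _global_client  # still returned, but no longer required

    def _get_global_client():
        if _global_client is None:
            raise Exception("serve.start() has not been called in this process.")
        return _global_client

    # Public module-level functions simply delegate to the singleton.
    def create_backend(*args, **kwargs):
        return _get_global_client().create_backend(*args, **kwargs)

    def create_endpoint(*args, **kwargs):
        return _get_global_client().create_endpoint(*args, **kwargs)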
View file

@ -1,7 +1,6 @@
import asyncio
import time
import requests
import pytest
import ray
@ -11,29 +10,25 @@ from ray.serve.config import BackendConfig
def test_serve_forceful_shutdown(serve_instance):
client = serve_instance
def sleeper(_):
while True:
time.sleep(1000)
client.create_backend(
serve.create_backend(
"sleeper",
sleeper,
config=BackendConfig(experimental_graceful_shutdown_timeout_s=1))
client.create_endpoint("sleeper", backend="sleeper")
handle = client.get_handle("sleeper")
serve.create_endpoint("sleeper", backend="sleeper")
handle = serve.get_handle("sleeper")
ref = handle.remote()
client.delete_endpoint("sleeper")
client.delete_backend("sleeper")
serve.delete_endpoint("sleeper")
serve.delete_backend("sleeper")
with pytest.raises(ray.exceptions.RayActorError):
ray.get(ref)
def test_serve_graceful_shutdown(serve_instance):
client = serve_instance
signal = SignalActor.remote()
class WaitBackend:
@ -43,7 +38,7 @@ def test_serve_graceful_shutdown(serve_instance):
await signal_actor.wait.remote()
return ["" for _ in range(len(requests))]
client.create_backend(
serve.create_backend(
"wait",
WaitBackend,
config=BackendConfig(
@ -53,8 +48,8 @@ def test_serve_graceful_shutdown(serve_instance):
experimental_graceful_shutdown_wait_loop_s=0.5,
experimental_graceful_shutdown_timeout_s=1000,
))
client.create_endpoint("wait", backend="wait")
handle = client.get_handle("wait")
serve.create_endpoint("wait", backend="wait")
handle = serve.get_handle("wait")
refs = [handle.remote(signal) for _ in range(10)]
# Wait for all the queries to be enqueued
@ -63,9 +58,8 @@ def test_serve_graceful_shutdown(serve_instance):
@ray.remote(num_cpus=0)
def do_blocking_delete():
client = serve.connect()
client.delete_endpoint("wait")
client.delete_backend("wait")
serve.delete_endpoint("wait")
serve.delete_backend("wait")
# Now delete the backend. This should trigger the shutdown sequence.
delete_ref = do_blocking_delete.remote()
@ -83,47 +77,7 @@ def test_serve_graceful_shutdown(serve_instance):
ray.get(delete_ref)
def test_multiple_instances():
route = "/api"
backend = "backend"
endpoint = "endpoint"
client1 = serve.start(http_port=8001)
def function(_):
return "hello1"
client1.create_backend(backend, function)
client1.create_endpoint(endpoint, backend=backend, route=route)
assert requests.get("http://127.0.0.1:8001" + route).text == "hello1"
# Create a second cluster on port 8002. Create an endpoint and backend with
# the same names and check that they don't collide.
client2 = serve.start(http_port=8002)
def function(_):
return "hello2"
client2.create_backend(backend, function)
client2.create_endpoint(endpoint, backend=backend, route=route)
assert requests.get("http://127.0.0.1:8001" + route).text == "hello1"
assert requests.get("http://127.0.0.1:8002" + route).text == "hello2"
# Check that deleting the backend in one cluster doesn't affect the other.
client2.delete_endpoint(endpoint)
client2.delete_backend(backend)
assert requests.get("http://127.0.0.1:8001" + route).text == "hello1"
# Check that the first client still works.
client1.delete_endpoint(endpoint)
client1.delete_backend(backend)
def test_parallel_start(serve_instance):
client = serve_instance
# Test the ability to start multiple replicas in parallel.
# In the past, when Serve scaled up a backend, it did so one by one and
# waited for each replica to initialize. This test avoids this by preventing
@ -153,9 +107,9 @@ def test_parallel_start(serve_instance):
return "Ready"
config = BackendConfig(num_replicas=2)
client.create_backend("p:v0", LongStartingServable, config=config)
client.create_endpoint("test-parallel", backend="p:v0")
handle = client.get_handle("test-parallel")
serve.create_backend("p:v0", LongStartingServable, config=config)
serve.create_endpoint("test-parallel", backend="p:v0")
handle = serve.get_handle("test-parallel")
ray.get(handle.remote(), timeout=10)

View file

@ -10,20 +10,16 @@ import starlette.responses
import ray
from ray import serve
from ray.test_utils import wait_for_condition
from ray.serve.constants import SERVE_PROXY_NAME
from ray.serve.exceptions import RayServeException
from ray.serve.config import BackendConfig
from ray.serve.utils import (format_actor_name, get_random_letters)
from ray.serve.utils import get_random_letters
def test_e2e(serve_instance):
client = serve_instance
def function(starlette_request):
return {"method": starlette_request.method}
client.create_backend("echo:v1", function)
client.create_endpoint(
serve.create_backend("echo:v1", function)
serve.create_endpoint(
"endpoint", backend="echo:v1", route="/api", methods=["GET", "POST"])
resp = requests.get("http://127.0.0.1:8000/api").json()["method"]
@ -34,14 +30,12 @@ def test_e2e(serve_instance):
def test_starlette_response(serve_instance):
client = serve_instance
def basic_response(_):
return starlette.responses.Response(
"Hello, world!", media_type="text/plain")
client.create_backend("basic_response", basic_response)
client.create_endpoint(
serve.create_backend("basic_response", basic_response)
serve.create_endpoint(
"basic_response", backend="basic_response", route="/basic_response")
assert requests.get(
"http://127.0.0.1:8000/basic_response").text == "Hello, world!"
@ -50,8 +44,8 @@ def test_starlette_response(serve_instance):
return starlette.responses.HTMLResponse(
"<html><body><h1>Hello, world!</h1></body></html>")
client.create_backend("html_response", html_response)
client.create_endpoint(
serve.create_backend("html_response", html_response)
serve.create_endpoint(
"html_response", backend="html_response", route="/html_response")
assert requests.get(
"http://127.0.0.1:8000/html_response"
@ -60,8 +54,8 @@ def test_starlette_response(serve_instance):
def plain_text_response(_):
return starlette.responses.PlainTextResponse("Hello, world!")
client.create_backend("plain_text_response", plain_text_response)
client.create_endpoint(
serve.create_backend("plain_text_response", plain_text_response)
serve.create_endpoint(
"plain_text_response",
backend="plain_text_response",
route="/plain_text_response")
@ -71,8 +65,8 @@ def test_starlette_response(serve_instance):
def json_response(_):
return starlette.responses.JSONResponse({"hello": "world"})
client.create_backend("json_response", json_response)
client.create_endpoint(
serve.create_backend("json_response", json_response)
serve.create_endpoint(
"json_response", backend="json_response", route="/json_response")
assert requests.get("http://127.0.0.1:8000/json_response").json()[
"hello"] == "world"
@ -81,8 +75,8 @@ def test_starlette_response(serve_instance):
return starlette.responses.RedirectResponse(
url="http://127.0.0.1:8000/basic_response")
client.create_backend("redirect_response", redirect_response)
client.create_endpoint(
serve.create_backend("redirect_response", redirect_response)
serve.create_endpoint(
"redirect_response",
backend="redirect_response",
route="/redirect_response")
@ -98,8 +92,8 @@ def test_starlette_response(serve_instance):
return starlette.responses.StreamingResponse(
slow_numbers(), media_type="text/plain")
client.create_backend("streaming_response", streaming_response)
client.create_endpoint(
serve.create_backend("streaming_response", streaming_response)
serve.create_endpoint(
"streaming_response",
backend="streaming_response",
route="/streaming_response")
@ -108,8 +102,6 @@ def test_starlette_response(serve_instance):
def test_backend_user_config(serve_instance):
client = serve_instance
class Counter:
def __init__(self):
self.count = 10
@ -121,9 +113,9 @@ def test_backend_user_config(serve_instance):
self.count = config["count"]
config = BackendConfig(num_replicas=2, user_config={"count": 123, "b": 2})
client.create_backend("counter", Counter, config=config)
client.create_endpoint("counter", backend="counter")
handle = client.get_handle("counter")
serve.create_backend("counter", Counter, config=config)
serve.create_endpoint("counter", backend="counter")
handle = serve.get_handle("counter")
def check(val, num_replicas):
pids_seen = set()
@ -136,23 +128,21 @@ def test_backend_user_config(serve_instance):
wait_for_condition(lambda: check("123", 2))
client.update_backend_config("counter", BackendConfig(num_replicas=3))
serve.update_backend_config("counter", BackendConfig(num_replicas=3))
wait_for_condition(lambda: check("123", 3))
config = BackendConfig(user_config={"count": 456})
client.update_backend_config("counter", config)
serve.update_backend_config("counter", config)
wait_for_condition(lambda: check("456", 3))
def test_call_method(serve_instance):
client = serve_instance
class CallMethod:
def method(self, request):
return "hello"
client.create_backend("backend", CallMethod)
client.create_endpoint("endpoint", backend="backend", route="/api")
serve.create_backend("backend", CallMethod)
serve.create_endpoint("endpoint", backend="backend", route="/api")
# Test HTTP path.
resp = requests.get(
@ -162,69 +152,59 @@ def test_call_method(serve_instance):
assert resp.text == "hello"
# Test serve handle path.
handle = client.get_handle("endpoint")
handle = serve.get_handle("endpoint")
assert ray.get(handle.options(method_name="method").remote()) == "hello"
def test_no_route(serve_instance):
client = serve_instance
def func(_, i=1):
return 1
client.create_backend("backend:1", func)
client.create_endpoint("noroute-endpoint", backend="backend:1")
service_handle = client.get_handle("noroute-endpoint")
serve.create_backend("backend:1", func)
serve.create_endpoint("noroute-endpoint", backend="backend:1")
service_handle = serve.get_handle("noroute-endpoint")
result = ray.get(service_handle.remote(i=1))
assert result == 1
def test_reject_duplicate_backend(serve_instance):
client = serve_instance
def f():
pass
def g():
pass
client.create_backend("backend", f)
serve.create_backend("backend", f)
with pytest.raises(ValueError):
client.create_backend("backend", g)
serve.create_backend("backend", g)
def test_reject_duplicate_route(serve_instance):
client = serve_instance
def f():
pass
client.create_backend("backend", f)
serve.create_backend("backend", f)
route = "/foo"
client.create_endpoint("bar", backend="backend", route=route)
serve.create_endpoint("bar", backend="backend", route=route)
with pytest.raises(ValueError):
client.create_endpoint("foo", backend="backend", route=route)
serve.create_endpoint("foo", backend="backend", route=route)
def test_reject_duplicate_endpoint(serve_instance):
client = serve_instance
def f():
pass
client.create_backend("backend", f)
serve.create_backend("backend", f)
endpoint_name = "foo"
client.create_endpoint(endpoint_name, backend="backend", route="/ok")
serve.create_endpoint(endpoint_name, backend="backend", route="/ok")
with pytest.raises(ValueError):
client.create_endpoint(
serve.create_endpoint(
endpoint_name, backend="backend", route="/different")
def test_reject_duplicate_endpoint_and_route(serve_instance):
client = serve_instance
class SimpleBackend(object):
def __init__(self, message):
self.message = message
@ -232,44 +212,26 @@ def test_reject_duplicate_endpoint_and_route(serve_instance):
def __call__(self, *args, **kwargs):
return {"message": self.message}
client.create_backend("backend1", SimpleBackend, "First")
client.create_backend("backend2", SimpleBackend, "Second")
serve.create_backend("backend1", SimpleBackend, "First")
serve.create_backend("backend2", SimpleBackend, "Second")
client.create_endpoint("test", backend="backend1", route="/test")
serve.create_endpoint("test", backend="backend1", route="/test")
with pytest.raises(ValueError):
client.create_endpoint("test", backend="backend2", route="/test")
def test_no_http(serve_instance):
client = serve.start(http_host=None)
assert len(ray.get(client._controller.get_http_proxies.remote())) == 0
def hello(*args):
return "hello"
client.create_backend("backend", hello)
client.create_endpoint("endpoint", backend="backend")
assert ray.get(client.get_handle("endpoint").remote()) == "hello"
serve.create_endpoint("test", backend="backend2", route="/test")
def test_set_traffic_missing_data(serve_instance):
client = serve_instance
endpoint_name = "foobar"
backend_name = "foo_backend"
client.create_backend(backend_name, lambda: 5)
client.create_endpoint(endpoint_name, backend=backend_name)
serve.create_backend(backend_name, lambda: 5)
serve.create_endpoint(endpoint_name, backend=backend_name)
with pytest.raises(ValueError):
client.set_traffic(endpoint_name, {"nonexistent_backend": 1.0})
serve.set_traffic(endpoint_name, {"nonexistent_backend": 1.0})
with pytest.raises(ValueError):
client.set_traffic("nonexistent_endpoint_name", {backend_name: 1.0})
serve.set_traffic("nonexistent_endpoint_name", {backend_name: 1.0})
def test_scaling_replicas(serve_instance):
client = serve_instance
class Counter:
def __init__(self):
self.count = 0
@ -279,9 +241,9 @@ def test_scaling_replicas(serve_instance):
return self.count
config = BackendConfig(num_replicas=2)
client.create_backend("counter:v1", Counter, config=config)
serve.create_backend("counter:v1", Counter, config=config)
client.create_endpoint("counter", backend="counter:v1", route="/increment")
serve.create_endpoint("counter", backend="counter:v1", route="/increment")
counter_result = []
for _ in range(10):
@ -292,7 +254,7 @@ def test_scaling_replicas(serve_instance):
assert max(counter_result) < 10
update_config = BackendConfig(num_replicas=1)
client.update_backend_config("counter:v1", update_config)
serve.update_backend_config("counter:v1", update_config)
counter_result = []
for _ in range(10):
@ -304,8 +266,6 @@ def test_scaling_replicas(serve_instance):
def test_updating_config(serve_instance):
client = serve_instance
class BatchSimple:
def __init__(self):
self.count = 0
@ -315,15 +275,15 @@ def test_updating_config(serve_instance):
return [1] * len(request)
config = BackendConfig(max_batch_size=2, num_replicas=3)
client.create_backend("bsimple:v1", BatchSimple, config=config)
client.create_endpoint("bsimple", backend="bsimple:v1", route="/bsimple")
serve.create_backend("bsimple:v1", BatchSimple, config=config)
serve.create_endpoint("bsimple", backend="bsimple:v1", route="/bsimple")
controller = client._controller
controller = serve.api._global_client._controller
old_replica_tag_list = list(
ray.get(controller._all_replica_handles.remote())["bsimple:v1"].keys())
update_config = BackendConfig(max_batch_size=5)
client.update_backend_config("bsimple:v1", update_config)
serve.update_backend_config("bsimple:v1", update_config)
new_replica_tag_list = list(
ray.get(controller._all_replica_handles.remote())["bsimple:v1"].keys())
new_all_tag_list = []
@ -338,41 +298,39 @@ def test_updating_config(serve_instance):
def test_delete_backend(serve_instance):
client = serve_instance
def function(_):
return "hello"
client.create_backend("delete:v1", function)
client.create_endpoint(
serve.create_backend("delete:v1", function)
serve.create_endpoint(
"delete_backend", backend="delete:v1", route="/delete-backend")
assert requests.get("http://127.0.0.1:8000/delete-backend").text == "hello"
# Check that we can't delete the backend while it's in use.
with pytest.raises(ValueError):
client.delete_backend("delete:v1")
serve.delete_backend("delete:v1")
client.create_backend("delete:v2", function)
client.set_traffic("delete_backend", {"delete:v1": 0.5, "delete:v2": 0.5})
serve.create_backend("delete:v2", function)
serve.set_traffic("delete_backend", {"delete:v1": 0.5, "delete:v2": 0.5})
with pytest.raises(ValueError):
client.delete_backend("delete:v1")
serve.delete_backend("delete:v1")
# Check that the backend can be deleted once it's no longer in use.
client.set_traffic("delete_backend", {"delete:v2": 1.0})
client.delete_backend("delete:v1")
serve.set_traffic("delete_backend", {"delete:v2": 1.0})
serve.delete_backend("delete:v1")
# Check that we can no longer use the previously deleted backend.
with pytest.raises(ValueError):
client.set_traffic("delete_backend", {"delete:v1": 1.0})
serve.set_traffic("delete_backend", {"delete:v1": 1.0})
def function2(_):
return "olleh"
# Check that we can now reuse the previously deleted backend's tag.
client.create_backend("delete:v1", function2)
client.set_traffic("delete_backend", {"delete:v1": 1.0})
serve.create_backend("delete:v1", function2)
serve.set_traffic("delete_backend", {"delete:v1": 1.0})
for _ in range(10):
try:
@ -388,44 +346,40 @@ def test_delete_backend(serve_instance):
@pytest.mark.parametrize("route", [None, "/delete-endpoint"])
def test_delete_endpoint(serve_instance, route):
client = serve_instance
def function(_):
return "hello"
backend_name = "delete-endpoint:v1"
client.create_backend(backend_name, function)
serve.create_backend(backend_name, function)
endpoint_name = "delete_endpoint" + str(route)
client.create_endpoint(endpoint_name, backend=backend_name, route=route)
client.delete_endpoint(endpoint_name)
serve.create_endpoint(endpoint_name, backend=backend_name, route=route)
serve.delete_endpoint(endpoint_name)
# Check that we can reuse a deleted endpoint name and route.
client.create_endpoint(endpoint_name, backend=backend_name, route=route)
serve.create_endpoint(endpoint_name, backend=backend_name, route=route)
if route is not None:
assert requests.get(
"http://127.0.0.1:8000/delete-endpoint").text == "hello"
else:
handle = client.get_handle(endpoint_name)
handle = serve.get_handle(endpoint_name)
assert ray.get(handle.remote()) == "hello"
# Check that deleting the endpoint doesn't delete the backend.
client.delete_endpoint(endpoint_name)
client.create_endpoint(endpoint_name, backend=backend_name, route=route)
serve.delete_endpoint(endpoint_name)
serve.create_endpoint(endpoint_name, backend=backend_name, route=route)
if route is not None:
assert requests.get(
"http://127.0.0.1:8000/delete-endpoint").text == "hello"
else:
handle = client.get_handle(endpoint_name)
handle = serve.get_handle(endpoint_name)
assert ray.get(handle.remote()) == "hello"
@pytest.mark.parametrize("route", [None, "/shard"])
def test_shard_key(serve_instance, route):
client = serve_instance
# Create five backends that return different integers.
num_backends = 5
traffic_dict = {}
@ -436,11 +390,11 @@ def test_shard_key(serve_instance, route):
backend_name = "backend-split-" + str(i)
traffic_dict[backend_name] = 1.0 / num_backends
client.create_backend(backend_name, function)
serve.create_backend(backend_name, function)
client.create_endpoint(
serve.create_endpoint(
"endpoint", backend=list(traffic_dict.keys())[0], route=route)
client.set_traffic("endpoint", traffic_dict)
serve.set_traffic("endpoint", traffic_dict)
def do_request(shard_key):
if route is not None:
@ -448,7 +402,7 @@ def test_shard_key(serve_instance, route):
headers = {"X-SERVE-SHARD-KEY": shard_key}
result = requests.get(url, headers=headers).text
else:
handle = client.get_handle("endpoint").options(shard_key=shard_key)
handle = serve.get_handle("endpoint").options(shard_key=shard_key)
result = ray.get(handle.options(shard_key=shard_key).remote())
return result
@ -464,20 +418,18 @@ def test_shard_key(serve_instance, route):
def test_list_endpoints(serve_instance):
client = serve_instance
def f():
pass
client.create_backend("backend", f)
client.create_backend("backend2", f)
client.create_backend("backend3", f)
client.create_endpoint(
serve.create_backend("backend", f)
serve.create_backend("backend2", f)
serve.create_backend("backend3", f)
serve.create_endpoint(
"endpoint", backend="backend", route="/api", methods=["GET", "POST"])
client.create_endpoint("endpoint2", backend="backend2", methods=["POST"])
client.shadow_traffic("endpoint", "backend3", 0.5)
serve.create_endpoint("endpoint2", backend="backend2", methods=["POST"])
serve.shadow_traffic("endpoint", "backend3", 0.5)
endpoints = client.list_endpoints()
endpoints = serve.list_endpoints()
assert "endpoint" in endpoints
assert endpoints["endpoint"] == {
"route": "/api",
@ -500,88 +452,55 @@ def test_list_endpoints(serve_instance):
"shadows": {}
}
client.delete_endpoint("endpoint")
assert "endpoint2" in client.list_endpoints()
serve.delete_endpoint("endpoint")
assert "endpoint2" in serve.list_endpoints()
client.delete_endpoint("endpoint2")
assert len(client.list_endpoints()) == 0
serve.delete_endpoint("endpoint2")
assert len(serve.list_endpoints()) == 0
def test_list_backends(serve_instance):
client = serve_instance
@serve.accept_batch
def f():
pass
config1 = BackendConfig(max_batch_size=10)
client.create_backend("backend", f, config=config1)
backends = client.list_backends()
serve.create_backend("backend", f, config=config1)
backends = serve.list_backends()
assert len(backends) == 1
assert "backend" in backends
assert backends["backend"].max_batch_size == 10
config2 = BackendConfig(num_replicas=10)
client.create_backend("backend2", f, config=config2)
backends = client.list_backends()
serve.create_backend("backend2", f, config=config2)
backends = serve.list_backends()
assert len(backends) == 2
assert backends["backend2"].num_replicas == 10
client.delete_backend("backend")
backends = client.list_backends()
serve.delete_backend("backend")
backends = serve.list_backends()
assert len(backends) == 1
assert "backend2" in backends
client.delete_backend("backend2")
assert len(client.list_backends()) == 0
serve.delete_backend("backend2")
assert len(serve.list_backends()) == 0
def test_endpoint_input_validation(serve_instance):
client = serve_instance
def f():
pass
client.create_backend("backend", f)
serve.create_backend("backend", f)
with pytest.raises(TypeError):
client.create_endpoint("endpoint")
serve.create_endpoint("endpoint")
with pytest.raises(TypeError):
client.create_endpoint("endpoint", route="/hello")
serve.create_endpoint("endpoint", route="/hello")
with pytest.raises(TypeError):
client.create_endpoint("endpoint", backend=2)
client.create_endpoint("endpoint", backend="backend")
def test_shutdown():
def f():
pass
client = serve.start(http_port=8003)
client.create_backend("backend", f)
client.create_endpoint("endpoint", backend="backend")
client.shutdown()
with pytest.raises(RayServeException):
client.list_backends()
def check_dead():
for actor_name in [
client._controller_name,
format_actor_name(SERVE_PROXY_NAME, client._controller_name)
]:
try:
ray.get_actor(actor_name)
return False
except ValueError:
pass
return True
wait_for_condition(check_dead)
serve.create_endpoint("endpoint", backend=2)
serve.create_endpoint("endpoint", backend="backend")
def test_shadow_traffic(serve_instance):
client = serve_instance
@ray.remote
class RequestCounter:
def __init__(self):
@ -611,15 +530,15 @@ def test_shadow_traffic(serve_instance):
ray.get(counter.record.remote("backend4"))
return "oops"
client.create_backend("backend1", f)
client.create_backend("backend2", f_shadow_1)
client.create_backend("backend3", f_shadow_2)
client.create_backend("backend4", f_shadow_3)
serve.create_backend("backend1", f)
serve.create_backend("backend2", f_shadow_1)
serve.create_backend("backend3", f_shadow_2)
serve.create_backend("backend4", f_shadow_3)
client.create_endpoint("endpoint", backend="backend1", route="/api")
client.shadow_traffic("endpoint", "backend2", 1.0)
client.shadow_traffic("endpoint", "backend3", 0.5)
client.shadow_traffic("endpoint", "backend4", 0.1)
serve.create_endpoint("endpoint", backend="backend1", route="/api")
serve.shadow_traffic("endpoint", "backend2", 1.0)
serve.shadow_traffic("endpoint", "backend3", 0.5)
serve.shadow_traffic("endpoint", "backend4", 0.1)
start = time.time()
num_requests = 100
@ -642,41 +561,7 @@ def test_shadow_traffic(serve_instance):
wait_for_condition(check_requests)
def test_connect(serve_instance):
client = serve_instance
# Check that you can have multiple clients to the same detached instance.
client2 = serve.connect()
assert client._controller_name == client2._controller_name
# Check that you can have detached and non-detached instances.
client3 = serve.start(http_port=8004)
assert client3._controller_name != client._controller_name
# Check that you can call serve.connect() from within a backend for both
# detached and non-detached instances.
def connect_in_backend(_):
client = serve.connect()
client.create_backend("backend-ception", connect_in_backend)
return client._controller_name
client.create_backend("connect_in_backend", connect_in_backend)
client.create_endpoint("endpoint", backend="connect_in_backend")
handle = client.get_handle("endpoint")
assert ray.get(handle.remote()) == client._controller_name
assert "backend-ception" in client.list_backends().keys()
client3.create_backend("connect_in_backend", connect_in_backend)
client3.create_endpoint("endpoint", backend="connect_in_backend")
handle = client3.get_handle("endpoint")
assert ray.get(handle.remote()) == client3._controller_name
assert "backend-ception" in client3.list_backends().keys()
def test_starlette_request(serve_instance):
client = serve_instance
async def echo_body(starlette_request):
data = await starlette_request.body()
return data
@ -686,8 +571,8 @@ def test_starlette_request(serve_instance):
# Long string to test serialization of multiple messages.
long_string = "x" * 10 * UVICORN_HIGH_WATER_MARK
client.create_backend("echo:v1", echo_body)
client.create_endpoint(
serve.create_backend("echo:v1", echo_body)
serve.create_endpoint(
"endpoint", backend="echo:v1", route="/api", methods=["GET", "POST"])
resp = requests.post("http://127.0.0.1:8000/api", data=long_string).text
@ -695,16 +580,14 @@ def test_starlette_request(serve_instance):
def test_variable_routes(serve_instance):
client = serve_instance
def f(starlette_request):
return starlette_request.path_params
client.create_backend("f", f)
client.create_endpoint("basic", backend="f", route="/api/{username}")
serve.create_backend("f", f)
serve.create_endpoint("basic", backend="f", route="/api/{username}")
# Test multiple variables and test type conversion
client.create_endpoint(
serve.create_endpoint(
"complex", backend="f", route="/api/{user_id:int}/{number:float}")
assert requests.get("http://127.0.0.1:8000/api/scaly").json() == {

View file

@ -8,8 +8,6 @@ from ray.serve.config import BackendConfig
def test_batching(serve_instance):
client = serve_instance
class BatchingExample:
def __init__(self):
self.count = 0
@ -22,12 +20,12 @@ def test_batching(serve_instance):
# set the max batch size
config = BackendConfig(max_batch_size=5, batch_wait_timeout=1)
client.create_backend("counter:v11", BatchingExample, config=config)
client.create_endpoint(
serve.create_backend("counter:v11", BatchingExample, config=config)
serve.create_endpoint(
"counter1", backend="counter:v11", route="/increment2")
future_list = []
handle = client.get_handle("counter1")
handle = serve.get_handle("counter1")
for _ in range(20):
f = handle.remote(temp=1)
future_list.append(f)
@ -40,8 +38,6 @@ def test_batching(serve_instance):
def test_batching_exception(serve_instance):
client = serve_instance
class NoListReturned:
def __init__(self):
self.count = 0
@ -52,17 +48,15 @@ def test_batching_exception(serve_instance):
# Set the max batch size.
config = BackendConfig(max_batch_size=5)
client.create_backend("exception:v1", NoListReturned, config=config)
client.create_endpoint("exception-test", backend="exception:v1")
serve.create_backend("exception:v1", NoListReturned, config=config)
serve.create_endpoint("exception-test", backend="exception:v1")
handle = client.get_handle("exception-test")
handle = serve.get_handle("exception-test")
with pytest.raises(ray.exceptions.RayTaskError):
assert ray.get(handle.remote(temp=1))
def test_app_level_batching(serve_instance):
client = serve_instance
class BatchingExample:
def __init__(self):
self.count = 0
@ -77,12 +71,12 @@ def test_app_level_batching(serve_instance):
return await self.handle_batch(request)
# set the max batch size
client.create_backend("counter:v11", BatchingExample)
client.create_endpoint(
serve.create_backend("counter:v11", BatchingExample)
serve.create_endpoint(
"counter1", backend="counter:v11", route="/increment2")
future_list = []
handle = client.get_handle("counter1")
handle = serve.get_handle("counter1")
for _ in range(20):
f = handle.remote(temp=1)
future_list.append(f)
@ -95,8 +89,6 @@ def test_app_level_batching(serve_instance):
def test_app_level_batching_exception(serve_instance):
client = serve_instance
class NoListReturned:
def __init__(self):
self.count = 0
@ -109,10 +101,10 @@ def test_app_level_batching_exception(serve_instance):
return await self.handle_batch(request)
# Set the max batch size.
client.create_backend("exception:v1", NoListReturned)
client.create_endpoint("exception-test", backend="exception:v1")
serve.create_backend("exception:v1", NoListReturned)
serve.create_endpoint("exception-test", backend="exception:v1")
handle = client.get_handle("exception-test")
handle = serve.get_handle("exception-test")
with pytest.raises(ray.exceptions.RayTaskError):
assert ray.get(handle.remote(temp=1))

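For the config-driven batching exercised above, a condensed sketch of the moving parts, assuming only the serve.accept_batch decorator and the BackendConfig options that appear in these tests:

.. code-block:: python

    import ray
    from ray import serve
    from ray.serve.config import BackendConfig

    ray.init()
    serve.start()

    @serve.accept_batch
    def batcher(requests):
        # Invoked with a list of requests when batching kicks in; it must
        # return one result per request, in order.
        return ["hello"] * len(requests)

    config = BackendConfig(max_batch_size=5, batch_wait_timeout=1)
    serve.create_backend("batcher", batcher, config=config)
    serve.create_endpoint("batcher", backend="batcher")

    handle = serve.get_handle("batcher")
    refs = [handle.remote(temp=1) for _ in range(20)]
    assert ray.get(refs) == ["hello"] * 20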
View file

@ -1,21 +1,21 @@
import pytest
import ray
from ray import serve
def test_controller_inflight_requests_clear(serve_instance):
client = serve_instance
initial_number_reqs = ray.get(
client._controller._num_pending_goals.remote())
controller = serve.api._global_client._controller
initial_number_reqs = ray.get(controller._num_pending_goals.remote())
def function(_):
return "hello"
client.create_backend("tst", function)
client.create_endpoint("end_pt", backend="tst")
serve.create_backend("tst", function)
serve.create_endpoint("end_pt", backend="tst")
assert ray.get(client._controller._num_pending_goals.remote()
) - initial_number_reqs == 0
assert ray.get(
controller._num_pending_goals.remote()) - initial_number_reqs == 0
if __name__ == "__main__":

View file

@ -5,6 +5,7 @@ import time
import pytest
import ray
from ray import serve
from ray.test_utils import wait_for_condition
from ray.serve.config import BackendConfig, ReplicaConfig
@ -22,13 +23,11 @@ def request_with_retries(endpoint, timeout=30):
def test_controller_failure(serve_instance):
client = serve_instance
def function(_):
return "hello1"
client.create_backend("controller_failure:v1", function)
client.create_endpoint(
serve.create_backend("controller_failure:v1", function)
serve.create_endpoint(
"controller_failure",
backend="controller_failure:v1",
route="/controller_failure")
@ -40,7 +39,7 @@ def test_controller_failure(serve_instance):
response = request_with_retries("/controller_failure", timeout=30)
assert response.text == "hello1"
ray.kill(client._controller, no_restart=False)
ray.kill(serve.api._global_client._controller, no_restart=False)
for _ in range(10):
response = request_with_retries("/controller_failure", timeout=30)
@ -49,10 +48,10 @@ def test_controller_failure(serve_instance):
def function(_):
return "hello2"
ray.kill(client._controller, no_restart=False)
ray.kill(serve.api._global_client._controller, no_restart=False)
client.create_backend("controller_failure:v2", function)
client.set_traffic("controller_failure", {"controller_failure:v2": 1.0})
serve.create_backend("controller_failure:v2", function)
serve.set_traffic("controller_failure", {"controller_failure:v2": 1.0})
def check_controller_failure():
response = request_with_retries("/controller_failure", timeout=30)
@ -63,14 +62,14 @@ def test_controller_failure(serve_instance):
def function(_):
return "hello3"
ray.kill(client._controller, no_restart=False)
client.create_backend("controller_failure_2", function)
ray.kill(client._controller, no_restart=False)
client.create_endpoint(
ray.kill(serve.api._global_client._controller, no_restart=False)
serve.create_backend("controller_failure_2", function)
ray.kill(serve.api._global_client._controller, no_restart=False)
serve.create_endpoint(
"controller_failure_2",
backend="controller_failure_2",
route="/controller_failure_2")
ray.kill(client._controller, no_restart=False)
ray.kill(serve.api._global_client._controller, no_restart=False)
for _ in range(10):
response = request_with_retries("/controller_failure", timeout=30)
@ -79,20 +78,19 @@ def test_controller_failure(serve_instance):
assert response.text == "hello3"
def _kill_http_proxies(client):
http_proxies = ray.get(client._controller.get_http_proxies.remote())
def _kill_http_proxies():
http_proxies = ray.get(
serve.api._global_client._controller.get_http_proxies.remote())
for http_proxy in http_proxies.values():
ray.kill(http_proxy, no_restart=False)
def test_http_proxy_failure(serve_instance):
client = serve_instance
def function(_):
return "hello1"
client.create_backend("proxy_failure:v1", function)
client.create_endpoint(
serve.create_backend("proxy_failure:v1", function)
serve.create_endpoint(
"proxy_failure", backend="proxy_failure:v1", route="/proxy_failure")
assert request_with_retries("/proxy_failure", timeout=1.0).text == "hello1"
@ -101,21 +99,21 @@ def test_http_proxy_failure(serve_instance):
response = request_with_retries("/proxy_failure", timeout=30)
assert response.text == "hello1"
_kill_http_proxies(client)
_kill_http_proxies()
def function(_):
return "hello2"
client.create_backend("proxy_failure:v2", function)
client.set_traffic("proxy_failure", {"proxy_failure:v2": 1.0})
serve.create_backend("proxy_failure:v2", function)
serve.set_traffic("proxy_failure", {"proxy_failure:v2": 1.0})
for _ in range(10):
response = request_with_retries("/proxy_failure", timeout=30)
assert response.text == "hello2"
def _get_worker_handles(client, backend):
controller = client._controller
def _get_worker_handles(backend):
controller = serve.api._global_client._controller
backend_dict = ray.get(controller._all_replica_handles.remote())
return list(backend_dict[backend].values())
@ -124,21 +122,19 @@ def _get_worker_handles(client, backend):
# Test that a worker dying unexpectedly causes it to restart and continue
# serving requests.
def test_worker_restart(serve_instance):
client = serve_instance
class Worker1:
def __call__(self, *args):
return os.getpid()
client.create_backend("worker_failure:v1", Worker1)
client.create_endpoint(
serve.create_backend("worker_failure:v1", Worker1)
serve.create_endpoint(
"worker_failure", backend="worker_failure:v1", route="/worker_failure")
# Get the PID of the worker.
old_pid = request_with_retries("/worker_failure", timeout=1).text
# Kill the worker.
handles = _get_worker_handles(client, "worker_failure:v1")
handles = _get_worker_handles("worker_failure:v1")
assert len(handles) == 1
ray.kill(handles[0], no_restart=False)
@ -156,8 +152,6 @@ def test_worker_restart(serve_instance):
# unexpectedly, the others continue to serve requests.
@pytest.mark.skipif(sys.platform == "win32", reason="Failing on Windows.")
def test_worker_replica_failure(serve_instance):
client = serve_instance
@ray.remote
class Counter:
def __init__(self):
@ -181,10 +175,10 @@ def test_worker_replica_failure(serve_instance):
return self.index
counter = Counter.remote()
client.create_backend("replica_failure", Worker, counter)
client.update_backend_config(
serve.create_backend("replica_failure", Worker, counter)
serve.update_backend_config(
"replica_failure", BackendConfig(num_replicas=2))
client.create_endpoint(
serve.create_endpoint(
"replica_failure", backend="replica_failure", route="/replica_failure")
# Wait until both replicas have been started.
@ -201,7 +195,7 @@ def test_worker_replica_failure(serve_instance):
raise TimeoutError("Timed out waiting for replicas after 30s.")
# Kill one of the replicas.
handles = _get_worker_handles(client, "replica_failure")
handles = _get_worker_handles("replica_failure")
assert len(handles) == 2
ray.kill(handles[0], no_restart=False)
@ -218,12 +212,10 @@ def test_worker_replica_failure(serve_instance):
def test_create_backend_idempotent(serve_instance):
client = serve_instance
def f(_):
return "hello"
controller = client._controller
controller = serve.api._global_client._controller
replica_config = ReplicaConfig(f)
backend_config = BackendConfig(num_replicas=1)
@ -235,21 +227,19 @@ def test_create_backend_idempotent(serve_instance):
replica_config)))
assert len(ray.get(controller.get_all_backends.remote())) == 1
client.create_endpoint(
serve.create_endpoint(
"my_endpoint", backend="my_backend", route="/my_route")
assert requests.get("http://127.0.0.1:8000/my_route").text == "hello"
def test_create_endpoint_idempotent(serve_instance):
client = serve_instance
def f(_):
return "hello"
client.create_backend("my_backend", f)
serve.create_backend("my_backend", f)
controller = client._controller
controller = serve.api._global_client._controller
for i in range(10):
ray.get(

View file

@ -6,13 +6,11 @@ from ray import serve
@pytest.mark.asyncio
async def test_async_handle_serializable(serve_instance):
client = serve_instance
def f(_):
return "hello"
client.create_backend("f", f)
client.create_endpoint("f", backend="f")
serve.create_backend("f", f)
serve.create_endpoint("f", backend="f")
@ray.remote
class TaskActor:
@ -21,7 +19,7 @@ async def test_async_handle_serializable(serve_instance):
output = await ref
return output
handle = client.get_handle("f", sync=False)
handle = serve.get_handle("f", sync=False)
task_actor = TaskActor.remote()
result = await task_actor.task.remote(handle)
@ -29,47 +27,42 @@ async def test_async_handle_serializable(serve_instance):
def test_sync_handle_serializable(serve_instance):
client = serve_instance
def f(_):
return "hello"
client.create_backend("f", f)
client.create_endpoint("f", backend="f")
serve.create_backend("f", f)
serve.create_endpoint("f", backend="f")
@ray.remote
def task(handle):
return ray.get(handle.remote())
handle = client.get_handle("f", sync=True)
handle = serve.get_handle("f", sync=True)
result_ref = task.remote(handle)
assert ray.get(result_ref) == "hello"
def test_handle_in_endpoint(serve_instance):
client = serve_instance
class Endpoint1:
def __call__(self, starlette_request):
return "hello"
class Endpoint2:
def __init__(self):
client = serve.connect()
self.handle = client.get_handle("endpoint1")
self.handle = serve.get_handle("endpoint1")
def __call__(self, _):
return ray.get(self.handle.remote())
client.create_backend("endpoint1:v0", Endpoint1)
client.create_endpoint(
serve.create_backend("endpoint1:v0", Endpoint1)
serve.create_endpoint(
"endpoint1",
backend="endpoint1:v0",
route="/endpoint1",
methods=["GET", "POST"])
client.create_backend("endpoint2:v0", Endpoint2)
client.create_endpoint(
serve.create_backend("endpoint2:v0", Endpoint2)
serve.create_endpoint(
"endpoint2",
backend="endpoint2:v0",
route="/endpoint2",
@ -79,8 +72,6 @@ def test_handle_in_endpoint(serve_instance):
def test_handle_http_args(serve_instance):
client = serve_instance
class Endpoint:
async def __call__(self, request):
return {
@ -90,8 +81,8 @@ def test_handle_http_args(serve_instance):
"json": await request.json()
}
client.create_backend("backend", Endpoint)
client.create_endpoint(
serve.create_backend("backend", Endpoint)
serve.create_endpoint(
"endpoint", backend="backend", route="/endpoint", methods=["POST"])
ground_truth = {
@ -113,7 +104,7 @@ def test_handle_http_args(serve_instance):
headers=ground_truth["headers"],
json=ground_truth["json"]).json()
handle = client.get_handle("endpoint")
handle = serve.get_handle("endpoint")
resp_handle = ray.get(
handle.options(
http_method=ground_truth["method"],
@ -127,20 +118,18 @@ def test_handle_http_args(serve_instance):
def test_handle_inject_starlette_request(serve_instance):
client = serve_instance
def echo_request_type(request):
return str(type(request))
client.create_backend("echo:v0", echo_request_type)
client.create_endpoint("echo", backend="echo:v0", route="/echo")
serve.create_backend("echo:v0", echo_request_type)
serve.create_endpoint("echo", backend="echo:v0", route="/echo")
def wrapper_model(web_request):
handle = serve.connect().get_handle("echo")
handle = serve.get_handle("echo")
return ray.get(handle.remote(web_request))
client.create_backend("wrapper:v0", wrapper_model)
client.create_endpoint("wrapper", backend="wrapper:v0", route="/wrapper")
serve.create_backend("wrapper:v0", wrapper_model)
serve.create_endpoint("wrapper", backend="wrapper:v0", route="/wrapper")
for route in ["/echo", "/wrapper"]:
resp = requests.get(f"http://127.0.0.1:8000{route}")
@ -152,8 +141,6 @@ def test_handle_option_chaining(serve_instance):
# https://github.com/ray-project/ray/issues/12802
# https://github.com/ray-project/ray/issues/12798
client = serve_instance
class MultiMethod:
def method_a(self, _):
return "method_a"
@ -164,12 +151,12 @@ def test_handle_option_chaining(serve_instance):
def __call__(self, _):
return "__call__"
client.create_backend("m", MultiMethod)
client.create_endpoint("m", backend="m")
serve.create_backend("m", MultiMethod)
serve.create_endpoint("m", backend="m")
# get_handle should give you a clean handle
handle1 = client.get_handle("m").options(method_name="method_a")
handle2 = client.get_handle("m")
handle1 = serve.get_handle("m").options(method_name="method_a")
handle2 = serve.get_handle("m")
# options().options() override should work
handle3 = handle1.options(method_name="method_b")

View file

@ -1,24 +1,23 @@
import ray
from ray import serve
from ray.serve.config import BackendConfig
def test_imported_backend(serve_instance):
client = serve_instance
config = BackendConfig(user_config="config", max_batch_size=2)
client.create_backend(
serve.create_backend(
"imported",
"ray.serve.utils.MockImportedBackend",
"input_arg",
config=config)
client.create_endpoint("imported", backend="imported")
serve.create_endpoint("imported", backend="imported")
# Basic sanity check.
handle = client.get_handle("imported")
handle = serve.get_handle("imported")
assert ray.get(handle.remote()) == {"arg": "input_arg", "config": "config"}
# Check that updating backend config works.
client.update_backend_config(
serve.update_backend_config(
"imported", BackendConfig(user_config="new_config"))
assert ray.get(handle.remote()) == {
"arg": "input_arg",
@ -30,10 +29,10 @@ def test_imported_backend(serve_instance):
assert ray.get(handle.remote("hello")) == "hello"
# Check that functions work as well.
client.create_backend(
serve.create_backend(
"imported_func",
"ray.serve.utils.mock_imported_function",
config=BackendConfig(max_batch_size=2))
client.create_endpoint("imported_func", backend="imported_func")
handle = client.get_handle("imported_func")
serve.create_endpoint("imported_func", backend="imported_func")
handle = serve.get_handle("imported_func")
assert ray.get(handle.remote("hello")) == "hello"

View file

@ -12,14 +12,12 @@ from ray.serve.utils import block_until_http_ready
def test_serve_metrics(serve_instance):
client = serve_instance
@serve.accept_batch
def batcher(starlette_requests):
return ["hello"] * len(starlette_requests)
client.create_backend("metrics", batcher)
client.create_endpoint("metrics", backend="metrics", route="/metrics")
serve.create_backend("metrics", batcher)
serve.create_endpoint("metrics", backend="metrics", route="/metrics")
# send 10 concurrent requests
url = "http://127.0.0.1:8000/metrics"
@ -72,9 +70,6 @@ def test_serve_metrics(serve_instance):
def test_backend_logger(serve_instance):
# Tests that backend tag and replica tag appear in Serve log output.
client = serve_instance
logger = logging.getLogger("ray")
class Counter:
@ -85,8 +80,8 @@ def test_backend_logger(serve_instance):
self.count += 1
logger.info(f"count: {self.count}")
client.create_backend("my_backend", Counter)
client.create_endpoint(
serve.create_backend("my_backend", Counter)
serve.create_endpoint(
"my_endpoint", backend="my_backend", route="/counter")
f = io.StringIO()
with redirect_stderr(f):

View file

@ -1,26 +1,24 @@
import ray
import ray.test_utils
from ray import serve
def test_new_driver(serve_instance):
client = serve_instance
script = """
import ray
ray.init(address="{}")
from ray import serve
client = serve.connect()
def driver(starlette_request):
return "OK!"
client.create_backend("driver", driver)
client.create_endpoint("driver", backend="driver", route="/driver")
serve.create_backend("driver", driver)
serve.create_endpoint("driver", backend="driver", route="/driver")
""".format(ray.worker._global_node._redis_address)
ray.test_utils.run_string_as_driver(script)
handle = client.get_handle("driver")
handle = serve.get_handle("driver")
assert ray.get(handle.remote()) == "OK!"

View file

@ -23,29 +23,27 @@ def ray_client_instance():
def test_ray_client(ray_client_instance):
ray.util.connect(ray_client_instance)
client = serve.start(detached=True)
serve.start(detached=True)
# TODO(edoakes): disconnecting and reconnecting causes the test to
# spuriously hang.
# ray.util.disconnect()
# ray.util.connect(ray_client_instance)
# client = serve.connect()
def f(*args):
return "hello"
client.create_backend("test1", f)
client.create_endpoint("test1", backend="test1", route="/hello")
serve.create_backend("test1", f)
serve.create_endpoint("test1", backend="test1", route="/hello")
assert requests.get("http://localhost:8000/hello").text == "hello"
# TODO(edoakes): the below tests currently hang.
# assert ray.get(client.get_handle("test1").remote()) == "hello"
# assert ray.get(serve.get_handle("test1").remote()) == "hello"
ray.util.disconnect()
# ray.util.connect(ray_client_instance)
# client = serve.connect()
# client.delete_endpoint("test1")
# client.delete_backend("test1")
# serve.delete_endpoint("test1")
# serve.delete_backend("test1")
if __name__ == "__main__":

View file

@ -11,8 +11,6 @@ from ray.test_utils import SignalActor
def test_np_in_composed_model(serve_instance):
client = serve_instance
# https://github.com/ray-project/ray/issues/9441
# AttributeError: 'bytes' object has no attribute 'readonly'
# in cloudpickle _from_numpy_buffer
@ -22,18 +20,17 @@ def test_np_in_composed_model(serve_instance):
class ComposedModel:
def __init__(self):
client = serve.connect()
self.model = client.get_handle("sum_model")
self.model = serve.get_handle("sum_model")
async def __call__(self, _request):
data = np.ones((10, 10))
result = await self.model.remote(data=data)
return result
client.create_backend("sum_model", sum_model)
client.create_endpoint("sum_model", backend="sum_model")
client.create_backend("model", ComposedModel)
client.create_endpoint(
serve.create_backend("sum_model", sum_model)
serve.create_endpoint("sum_model", backend="sum_model")
serve.create_backend("model", ComposedModel)
serve.create_endpoint(
"model", backend="model", route="/model", methods=["GET"])
result = requests.get("http://127.0.0.1:8000/model")
@ -43,17 +40,15 @@ def test_np_in_composed_model(serve_instance):
def test_backend_worker_memory_growth(serve_instance):
# https://github.com/ray-project/ray/issues/12395
client = serve_instance
def gc_unreachable_objects(starlette_request):
gc.set_debug(gc.DEBUG_SAVEALL)
gc.collect()
return len(gc.garbage)
client.create_backend("model", gc_unreachable_objects)
client.create_endpoint("model", backend="model", route="/model")
serve.create_backend("model", gc_unreachable_objects)
serve.create_endpoint("model", backend="model", route="/model")
handle = client.get_handle("model")
handle = serve.get_handle("model")
for _ in range(10):
result = requests.get("http://127.0.0.1:8000/model")
@ -67,7 +62,6 @@ def test_backend_worker_memory_growth(serve_instance):
def test_ref_in_handle_input(serve_instance):
client = serve_instance
# https://github.com/ray-project/ray/issues/12593
unblock_worker_signal = SignalActor.remote()
@ -76,9 +70,9 @@ def test_ref_in_handle_input(serve_instance):
data = await serve_request.body()
assert not isinstance(data, ray.ObjectRef)
client.create_backend("ref", blocked_by_ref)
client.create_endpoint("ref", backend="ref")
handle = client.get_handle("ref")
serve.create_backend("ref", blocked_by_ref)
serve.create_endpoint("ref", backend="ref")
handle = serve.get_handle("ref")
# Pass in a ref that's not ready yet
ref = unblock_worker_signal.wait.remote()

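The composed-model test above also shows the pattern for chaining backends after this change: a backend creates a handle to another endpoint directly with serve.get_handle (no serve.connect() needed) and awaits the returned ObjectRef. A stripped-down sketch with an illustrative adder endpoint in place of sum_model:

.. code-block:: python

    import ray
    from ray import serve

    ray.init()
    serve.start()

    def adder(_):
        return 2

    serve.create_backend("adder", adder)
    serve.create_endpoint("adder", backend="adder")

    class ComposedModel:
        def __init__(self):
            # Handles can be created inside a backend without serve.connect();
            # the process-wide instance is picked up automatically.
            self.adder = serve.get_handle("adder")

        async def __call__(self, _request):
            # .remote() returns an ObjectRef; await it for the result.
            partial = await self.adder.remote()
            return partial + 1

    serve.create_backend("composed", ComposedModel)
    serve.create_endpoint("composed", backend="composed", route="/composed")

    assert ray.get(serve.get_handle("composed").remote()) == 3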
View file

@ -12,62 +12,134 @@ import ray
from ray import serve
from ray._private.cluster_utils import Cluster
from ray.serve.constants import SERVE_PROXY_NAME
from ray.serve.exceptions import RayServeException
from ray.serve.utils import (block_until_http_ready, get_all_node_ids,
format_actor_name)
from ray.test_utils import wait_for_condition
from ray._private.services import new_port
def test_detached_deployment():
@pytest.fixture
def ray_shutdown():
yield
serve.shutdown()
ray.shutdown()
@pytest.fixture
def ray_cluster():
cluster = Cluster()
yield cluster
serve.shutdown()
ray.shutdown()
cluster.shutdown()
def test_shutdown(ray_shutdown):
def f():
pass
ray.init(num_cpus=16)
serve.start(http_port=8003)
serve.create_backend("backend", f)
serve.create_endpoint("endpoint", backend="backend")
actor_names = [
serve.api._global_client._controller_name,
format_actor_name(SERVE_PROXY_NAME,
serve.api._global_client._controller_name,
get_all_node_ids()[0][0])
]
def check_alive():
alive = True
for actor_name in actor_names:
try:
ray.get_actor(actor_name)
except ValueError:
alive = False
return alive
wait_for_condition(check_alive)
serve.shutdown()
with pytest.raises(RayServeException):
serve.list_backends()
def check_dead():
for actor_name in actor_names:
try:
ray.get_actor(actor_name)
return False
except ValueError:
pass
return True
wait_for_condition(check_dead)
def test_detached_deployment(ray_cluster):
# https://github.com/ray-project/ray/issues/11437
cluster = Cluster()
cluster = ray_cluster
head_node = cluster.add_node(node_ip_address="127.0.0.1", num_cpus=6)
# Create first job, check we can run a simple serve endpoint
ray.init(head_node.address)
first_job_id = ray.get_runtime_context().job_id
client = serve.start(detached=True)
client.create_backend("f", lambda _: "hello")
client.create_endpoint("f", backend="f")
assert ray.get(client.get_handle("f").remote()) == "hello"
serve.start(detached=True)
serve.create_backend("f", lambda _: "hello")
serve.create_endpoint("f", backend="f")
assert ray.get(serve.get_handle("f").remote()) == "hello"
serve.api._global_client = None
ray.shutdown()
# Create the second job, make sure we can still create new backends.
ray.init(head_node.address)
assert ray.get_runtime_context().job_id != first_job_id
client = serve.connect()
client.create_backend("g", lambda _: "world")
client.create_endpoint("g", backend="g")
assert ray.get(client.get_handle("g").remote()) == "world"
serve.create_backend("g", lambda _: "world")
serve.create_endpoint("g", backend="g")
assert ray.get(serve.get_handle("g").remote()) == "world"
# Test passed, clean up.
client.shutdown()
ray.shutdown()
cluster.shutdown()
@pytest.mark.parametrize("detached", [True, False])
def test_connect(detached, ray_shutdown):
# Check that you can use the Serve API from within a backend for both
# detached and non-detached instances.
ray.init(num_cpus=16)
serve.start(detached=detached)
def connect_in_backend(_):
serve.create_backend("backend-ception", connect_in_backend)
serve.create_backend("connect_in_backend", connect_in_backend)
serve.create_endpoint("endpoint", backend="connect_in_backend")
ray.get(serve.get_handle("endpoint").remote())
assert "backend-ception" in serve.list_backends().keys()
@pytest.mark.skipif(
not hasattr(socket, "SO_REUSEPORT"),
reason=("Port sharing only works on newer verion of Linux. "
"This test can only be ran when port sharing is supported."))
def test_multiple_routers():
cluster = Cluster()
def test_multiple_routers(ray_cluster):
cluster = ray_cluster
head_node = cluster.add_node(num_cpus=4)
cluster.add_node(num_cpus=4)
ray.init(head_node.address)
node_ids = ray.state.node_ids()
assert len(node_ids) == 2
client = serve.start(http_options=dict(port=8005, location="EveryNode"))
serve.start(http_options=dict(port=8005, location="EveryNode"))
def get_proxy_names():
proxy_names = []
for node_id, _ in get_all_node_ids():
proxy_names.append(
format_actor_name(SERVE_PROXY_NAME, client._controller_name,
format_actor_name(SERVE_PROXY_NAME,
serve.api._global_client._controller_name,
node_id))
return proxy_names
@ -124,12 +196,8 @@ def test_multiple_routers():
wait_for_condition(third_actor_removed)
ray.get(block_until_http_ready.remote("http://127.0.0.1:8005/-/routes"))
# Clean up the nodes (otherwise Ray will segfault).
ray.shutdown()
cluster.shutdown()
def test_middleware():
def test_middleware(ray_shutdown):
from starlette.middleware import Middleware
from starlette.middleware.cors import CORSMiddleware
@ -157,18 +225,14 @@ def test_middleware():
resp = requests.get(f"{root}/-/routes", headers=headers)
assert resp.headers["access-control-allow-origin"] == "*"
ray.shutdown()
def test_http_proxy_fail_loudly():
def test_http_proxy_fail_loudly(ray_shutdown):
# Test that if the http server fails to start, serve.start should fail.
with pytest.raises(ValueError):
serve.start(http_options={"host": "bad.ip.address"})
ray.shutdown()
def test_no_http():
def test_no_http(ray_shutdown):
# The following should have the same effect.
options = [
{
@ -191,9 +255,10 @@ def test_no_http():
},
]
ray.init()
for option in options:
client = serve.start(**option)
ray.init(num_cpus=16)
for i, option in enumerate(options):
print(f"[{i+1}/{len(options)}] Running with {option}")
serve.start(**option)
# Only controller actor should exist
live_actors = [
@ -201,13 +266,22 @@ def test_no_http():
if actor["State"] == ray.gcs_utils.ActorTableData.ALIVE
]
assert len(live_actors) == 1
controller = serve.api._global_client._controller
assert len(ray.get(controller.get_http_proxies.remote())) == 0
client.shutdown()
ray.shutdown()
# Test that the handle still works.
def hello(*args):
return "hello"
serve.create_backend("backend", hello)
serve.create_endpoint("endpoint", backend="backend")
assert ray.get(serve.get_handle("endpoint").remote()) == "hello"
serve.shutdown()
def test_http_head_only():
cluster = Cluster()
def test_http_head_only(ray_cluster):
cluster = ray_cluster
head_node = cluster.add_node(num_cpus=4)
cluster.add_node(num_cpus=4)
@ -215,10 +289,7 @@ def test_http_head_only():
node_ids = ray.state.node_ids()
assert len(node_ids) == 2
client = serve.start(http_options={
"port": new_port(),
"location": "HeadOnly"
})
serve.start(http_options={"port": new_port(), "location": "HeadOnly"})
# Only the controller and head node actor should be started
assert len(ray.actors()) == 2
@ -230,10 +301,6 @@ def test_http_head_only():
}
assert cpu_per_nodes == {2, 4}
client.shutdown()
ray.shutdown()
cluster.shutdown()
if __name__ == "__main__":
sys.exit(pytest.main(["-v", "-s", __file__]))

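The new ray_shutdown fixture and test_shutdown above pin down the lifecycle of the process-wide instance: every serve.start() is paired with serve.shutdown(), after which module-level calls such as serve.list_backends() raise RayServeException. A condensed sketch of that lifecycle as a standalone script:

.. code-block:: python

    import ray
    from ray import serve
    from ray.serve.exceptions import RayServeException

    ray.init(num_cpus=4)
    serve.start()  # starts the controller and HTTP proxy actors

    def hello(*args):
        return "hello"

    serve.create_backend("backend", hello)
    serve.create_endpoint("endpoint", backend="backend")
    assert ray.get(serve.get_handle("endpoint").remote()) == "hello"

    # Tear the Serve actors down before shutting Ray down; afterwards the
    # module-level API raises until serve.start() is called again.
    serve.shutdown()
    try:
        serve.list_backends()
    except RayServeException:
        pass

    ray.shutdown()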
View file

@ -69,8 +69,10 @@ async def test_future_chaining():
def test_import_attr():
assert import_attr("ray.serve.Client") == ray.serve.api.Client
assert import_attr("ray.serve.api.Client") == ray.serve.api.Client
assert (import_attr("ray.serve.BackendConfig") ==
ray.serve.config.BackendConfig)
assert (import_attr("ray.serve.config.BackendConfig") ==
ray.serve.config.BackendConfig)
policy_cls = import_attr("ray.serve.controller.TrafficPolicy")
assert policy_cls == ray.serve.controller.TrafficPolicy