[serve] Remove requirement to specify namespace for serve.start(detached=True) (#17470)

This commit is contained in:
Nikita Vemuri 2021-08-25 08:39:32 -07:00 committed by GitHub
parent 4c3276644e
commit a9c731edd3
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
18 changed files with 106 additions and 59 deletions

View file

@ -274,7 +274,8 @@ def test_actor_pubsub(disable_aiohttp_cache, ray_start_with_dashboard):
# be published. # be published.
elif actor_data_dict["state"] in ("ALIVE", "DEAD"): elif actor_data_dict["state"] in ("ALIVE", "DEAD"):
assert actor_data_dict.keys() == { assert actor_data_dict.keys() == {
"state", "address", "timestamp", "pid", "creationTaskException" "state", "address", "timestamp", "pid",
"creationTaskException", "rayNamespace"
} }
else: else:
raise Exception("Unknown state: {}".format( raise Exception("Unknown state: {}".format(

View file

@ -173,7 +173,7 @@ my_func_deleted.delete()
"my_func_nondetached".encode()).hexdigest()] "my_func_nondetached".encode()).hexdigest()]
assert entry_nondetached["name"] == "my_func_nondetached" assert entry_nondetached["name"] == "my_func_nondetached"
assert entry_nondetached["version"] == "v1" assert entry_nondetached["version"] == "v1"
assert entry_nondetached["namespace"] == "" assert entry_nondetached["namespace"] == "default_test_namespace"
assert entry_nondetached["httpRoute"] == "/my_func_nondetached" assert entry_nondetached["httpRoute"] == "/my_func_nondetached"
assert entry_nondetached["className"] == "my_func_nondetached" assert entry_nondetached["className"] == "my_func_nondetached"
assert entry_nondetached["status"] == "RUNNING" assert entry_nondetached["status"] == "RUNNING"

View file

@ -23,6 +23,10 @@ service using ``serve.start(detached=True)``. In this case, the Serve instance w
run on the Ray cluster even after the script that calls it exits. If you want to run another script run on the Ray cluster even after the script that calls it exits. If you want to run another script
to update the Serve instance, you can run another script that connects to the same Ray cluster and makes further API calls (e.g., to create, update, or delete a deployment). Note that there can only be one detached Serve instance on each Ray cluster. to update the Serve instance, you can run another script that connects to the same Ray cluster and makes further API calls (e.g., to create, update, or delete a deployment). Note that there can only be one detached Serve instance on each Ray cluster.
All non-detached Serve instances will be started in the current namespace that was specified when connecting to the cluster. If a namespace is specified for a detached Serve instance, it will be used. Otherwise if the current namespace is anonymous, the Serve instance will be started in the ``serve`` namespace.
If ``serve.start()`` is called again in a process in which there is already a running Serve instance, Serve will re-connect to the existing instance (regardless of whether the original instance was detached or not). To reconnect to a Serve instance that exists in the Ray cluster but not in the current process, connect to the cluster with the same namespace that was specified when starting the instance and run ``serve.start()``.
Deploying on a Single Node Deploying on a Single Node
========================== ==========================

View file

@ -40,6 +40,19 @@ _UUID_RE = re.compile(
"[a-f0-9]{8}-[a-f0-9]{4}-4[a-f0-9]{3}-[89aAbB][a-f0-9]{3}-[a-f0-9]{12}") "[a-f0-9]{8}-[a-f0-9]{4}-4[a-f0-9]{3}-[89aAbB][a-f0-9]{3}-[a-f0-9]{12}")
def _get_controller_namespace(detached):
controller_namespace = ray.get_runtime_context().namespace
if not detached:
return controller_namespace
# Start controller in "serve" namespace if detached and currently
# in anonymous namespace.
if _UUID_RE.fullmatch(controller_namespace) is not None:
controller_namespace = "serve"
return controller_namespace
def _get_global_client(): def _get_global_client():
if _global_client is not None: if _global_client is not None:
return _global_client return _global_client
@ -137,7 +150,10 @@ class Client:
started = time.time() started = time.time()
while True: while True:
try: try:
ray.get_actor(self._controller_name) controller_namespace = _get_controller_namespace(
self._detached)
ray.get_actor(
self._controller_name, namespace=controller_namespace)
if time.time() - started > 5: if time.time() - started > 5:
logger.warning( logger.warning(
"Waited 5s for Serve to shutdown gracefully but " "Waited 5s for Serve to shutdown gracefully but "
@ -331,9 +347,7 @@ def start(
Args: Args:
detached (bool): Whether not the instance should be detached from this detached (bool): Whether not the instance should be detached from this
script. If set, the instance will live on the Ray cluster until it is script. If set, the instance will live on the Ray cluster until it is
explicitly stopped with serve.shutdown(). This should *not* be set in explicitly stopped with serve.shutdown().
an anonymous Ray namespace because you will not be able to reconnect
to the instance after the script exits.
http_options (Optional[Dict, serve.HTTPOptions]): Configuration options http_options (Optional[Dict, serve.HTTPOptions]): Configuration options
for HTTP proxy. You can pass in a dictionary or HTTPOptions object for HTTP proxy. You can pass in a dictionary or HTTPOptions object
with fields: with fields:
@ -368,22 +382,12 @@ def start(
if not ray.is_initialized(): if not ray.is_initialized():
ray.init(namespace="serve") ray.init(namespace="serve")
current_namespace = ray.get_runtime_context().namespace controller_namespace = _get_controller_namespace(detached)
if detached:
if _UUID_RE.fullmatch(current_namespace) is not None:
raise RuntimeError(
"serve.start(detached=True) should not be called in anonymous "
"Ray namespaces because you won't be able to reconnect to the "
"Serve instance after the script exits. If you want to start "
"a long-lived Serve instance, provide a namespace when "
"connecting to Ray. See the documentation for more details: "
"https://docs.ray.io/en/master/namespaces.html?highlight=namespace#using-namespaces." # noqa: E501
)
try: try:
client = _get_global_client() client = _get_global_client()
logger.info("Connecting to existing Serve instance in namespace " logger.info("Connecting to existing Serve instance in namespace "
f"'{current_namespace}'.") f"'{controller_namespace}'.")
return client return client
except RayServeException: except RayServeException:
pass pass
@ -409,6 +413,7 @@ def start(
resources={ resources={
get_current_node_resource_key(): 0.01 get_current_node_resource_key(): 0.01
}, },
namespace=controller_namespace,
).remote( ).remote(
controller_name, controller_name,
http_options, http_options,
@ -429,7 +434,7 @@ def start(
client = Client(controller, controller_name, detached=detached) client = Client(controller, controller_name, detached=detached)
_set_global_client(client) _set_global_client(client)
logger.info(f"Started{' detached ' if detached else ' '}Serve instance in " logger.info(f"Started{' detached ' if detached else ' '}Serve instance in "
f"namespace '{current_namespace}'.") f"namespace '{controller_namespace}'.")
return client return client
@ -452,12 +457,15 @@ def _connect() -> Client:
# ensure that the correct instance is connected to. # ensure that the correct instance is connected to.
if _INTERNAL_REPLICA_CONTEXT is None: if _INTERNAL_REPLICA_CONTEXT is None:
controller_name = SERVE_CONTROLLER_NAME controller_name = SERVE_CONTROLLER_NAME
controller_namespace = _get_controller_namespace(detached=True)
else: else:
controller_name = _INTERNAL_REPLICA_CONTEXT._internal_controller_name controller_name = _INTERNAL_REPLICA_CONTEXT._internal_controller_name
controller_namespace = _get_controller_namespace(detached=False)
# Try to get serve controller if it exists # Try to get serve controller if it exists
try: try:
controller = ray.get_actor(controller_name) controller = ray.get_actor(
controller_name, namespace=controller_namespace)
except ValueError: except ValueError:
raise RayServeException("There is no " raise RayServeException("There is no "
"instance running on this Ray cluster. Please " "instance running on this Ray cluster. Please "

View file

@ -114,7 +114,8 @@ class ActorReplicaWrapper:
**backend_info.replica_config.ray_actor_options).remote( **backend_info.replica_config.ray_actor_options).remote(
self._backend_tag, self._replica_tag, self._backend_tag, self._replica_tag,
backend_info.replica_config.init_args, backend_info.replica_config.init_args,
backend_info.backend_config, self._controller_name) backend_info.backend_config, self._controller_name,
self._detached)
self._ready_obj_ref = self._actor_handle.reconfigure.remote( self._ready_obj_ref = self._actor_handle.reconfigure.remote(
backend_info.backend_config.user_config) backend_info.backend_config.user_config)

View file

@ -40,8 +40,8 @@ def create_backend_replica(name: str, serialized_backend_def: bytes):
# TODO(architkulkarni): Add type hints after upgrading cloudpickle # TODO(architkulkarni): Add type hints after upgrading cloudpickle
class RayServeWrappedReplica(object): class RayServeWrappedReplica(object):
async def __init__(self, backend_tag, replica_tag, init_args, async def __init__(self, backend_tag, replica_tag, init_args,
backend_config: BackendConfig, backend_config: BackendConfig, controller_name: str,
controller_name: str): detached: bool):
backend = cloudpickle.loads(serialized_backend_def) backend = cloudpickle.loads(serialized_backend_def)
if inspect.isfunction(backend): if inspect.isfunction(backend):
@ -75,7 +75,10 @@ def create_backend_replica(name: str, serialized_backend_def: bytes):
servable_object=_callable) servable_object=_callable)
assert controller_name, "Must provide a valid controller_name" assert controller_name, "Must provide a valid controller_name"
controller_handle = ray.get_actor(controller_name) controller_namespace = ray.serve.api._get_controller_namespace(
detached)
controller_handle = ray.get_actor(
controller_name, namespace=controller_namespace)
self.backend = RayServeReplica(_callable, backend_config, self.backend = RayServeReplica(_callable, backend_config,
is_function, controller_handle) is_function, controller_handle)

View file

@ -165,10 +165,10 @@ class HTTPProxy:
"""This class is meant to be instantiated and run by an ASGI HTTP server. """This class is meant to be instantiated and run by an ASGI HTTP server.
>>> import uvicorn >>> import uvicorn
>>> uvicorn.run(HTTPProxy(controller_name)) >>> uvicorn.run(HTTPProxy(controller_name, controller_namespace))
""" """
def __init__(self, controller_name: str): def __init__(self, controller_name: str, controller_namespace: str):
# Set the controller name so that serve will connect to the # Set the controller name so that serve will connect to the
# controller instance this proxy is running in. # controller instance this proxy is running in.
ray.serve.api._set_internal_replica_context(None, None, ray.serve.api._set_internal_replica_context(None, None,
@ -187,7 +187,7 @@ class HTTPProxy:
self.prefix_router = LongestPrefixRouter(get_handle) self.prefix_router = LongestPrefixRouter(get_handle)
self.long_poll_client = LongPollClient( self.long_poll_client = LongPollClient(
ray.get_actor(controller_name), { ray.get_actor(controller_name, namespace=controller_namespace), {
LongPollNamespace.ROUTE_TABLE: self._update_routes, LongPollNamespace.ROUTE_TABLE: self._update_routes,
}, },
call_in_event_loop=asyncio.get_event_loop()) call_in_event_loop=asyncio.get_event_loop())
@ -261,6 +261,7 @@ class HTTPProxyActor:
host: str, host: str,
port: int, port: int,
controller_name: str, controller_name: str,
controller_namespace: str,
http_middlewares: List[ http_middlewares: List[
"starlette.middleware.Middleware"] = []): # noqa: F821 "starlette.middleware.Middleware"] = []): # noqa: F821
self.host = host self.host = host
@ -268,7 +269,7 @@ class HTTPProxyActor:
self.setup_complete = asyncio.Event() self.setup_complete = asyncio.Event()
self.app = HTTPProxy(controller_name) self.app = HTTPProxy(controller_name, controller_namespace)
self.wrapped_app = self.app self.wrapped_app = self.app
for middleware in http_middlewares: for middleware in http_middlewares:

View file

@ -21,6 +21,8 @@ class HTTPState:
def __init__(self, controller_name: str, detached: bool, def __init__(self, controller_name: str, detached: bool,
config: HTTPOptions): config: HTTPOptions):
self._controller_name = controller_name self._controller_name = controller_name
self._controller_namespace = ray.serve.api._get_controller_namespace(
detached)
self._detached = detached self._detached = detached
self._config = config self._config = config
self._proxy_actors: Dict[NodeId, ActorHandle] = dict() self._proxy_actors: Dict[NodeId, ActorHandle] = dict()
@ -67,7 +69,8 @@ class HTTPState:
name = format_actor_name(SERVE_PROXY_NAME, self._controller_name, name = format_actor_name(SERVE_PROXY_NAME, self._controller_name,
node_id) node_id)
try: try:
proxy = ray.get_actor(name) proxy = ray.get_actor(
name, namespace=self._controller_namespace)
except ValueError: except ValueError:
logger.info("Starting HTTP proxy with name '{}' on node '{}' " logger.info("Starting HTTP proxy with name '{}' on node '{}' "
"listening on '{}:{}'".format( "listening on '{}:{}'".format(
@ -87,6 +90,7 @@ class HTTPState:
self._config.host, self._config.host,
self._config.port, self._config.port,
controller_name=self._controller_name, controller_name=self._controller_name,
controller_namespace=self._controller_namespace,
http_middlewares=self._config.middlewares) http_middlewares=self._config.middlewares)
self._proxy_actors[node_id] = proxy self._proxy_actors[node_id] = proxy

View file

@ -21,7 +21,7 @@ def _shared_serve_instance():
# Overriding task_retry_delay_ms to relaunch actors more quickly # Overriding task_retry_delay_ms to relaunch actors more quickly
ray.init( ray.init(
num_cpus=36, num_cpus=36,
namespace="", namespace="default_test_namespace",
_metrics_export_port=9999, _metrics_export_port=9999,
_system_config={ _system_config={
"metrics_report_interval_ms": 1000, "metrics_report_interval_ms": 1000,

View file

@ -6,7 +6,7 @@ from ray import serve
def test_new_driver(serve_instance): def test_new_driver(serve_instance):
script = """ script = """
import ray import ray
ray.init(address="{}", namespace="") ray.init(address="{}", namespace="default_test_namespace")
from ray import serve from ray import serve

View file

@ -29,7 +29,7 @@ def ray_client_instance(scope="module"):
@pytest.fixture @pytest.fixture
def serve_with_client(ray_client_instance): def serve_with_client(ray_client_instance):
ray.util.connect(ray_client_instance, namespace="") ray.util.connect(ray_client_instance, namespace="default_test_namespace")
assert ray.util.client.ray.is_connected() assert ray.util.client.ray.is_connected()
yield yield
@ -40,11 +40,11 @@ def serve_with_client(ray_client_instance):
@pytest.mark.skipif(sys.platform != "linux", reason="Buggy on MacOS + Windows") @pytest.mark.skipif(sys.platform != "linux", reason="Buggy on MacOS + Windows")
def test_ray_client(ray_client_instance): def test_ray_client(ray_client_instance):
ray.util.connect(ray_client_instance, namespace="") ray.util.connect(ray_client_instance, namespace="default_test_namespace")
start = """ start = """
import ray import ray
ray.util.connect("{}", namespace="") ray.util.connect("{}", namespace="default_test_namespace")
from ray import serve from ray import serve
@ -54,7 +54,7 @@ serve.start(detached=True)
deploy = """ deploy = """
import ray import ray
ray.util.connect("{}", namespace="") ray.util.connect("{}", namespace="default_test_namespace")
from ray import serve from ray import serve
@ -71,7 +71,7 @@ f.deploy()
delete = """ delete = """
import ray import ray
ray.util.connect("{}", namespace="") ray.util.connect("{}", namespace="default_test_namespace")
from ray import serve from ray import serve
@ -83,7 +83,7 @@ serve.get_deployment("test1").delete()
fastapi = """ fastapi = """
import ray import ray
ray.util.connect("{}", namespace="") ray.util.connect("{}", namespace="default_test_namespace")
from ray import serve from ray import serve
from fastapi import FastAPI from fastapi import FastAPI

View file

@ -5,6 +5,7 @@ requires a shared Serve instance.
import os import os
import sys import sys
import socket import socket
from typing import Optional
import pytest import pytest
import requests import requests
@ -48,8 +49,9 @@ def test_shutdown(ray_shutdown):
f.deploy() f.deploy()
serve_controller_name = serve.api._global_client._controller_name
actor_names = [ actor_names = [
serve.api._global_client._controller_name, serve_controller_name,
format_actor_name(SERVE_PROXY_NAME, format_actor_name(SERVE_PROXY_NAME,
serve.api._global_client._controller_name, serve.api._global_client._controller_name,
get_all_node_ids()[0][0]) get_all_node_ids()[0][0])
@ -59,7 +61,12 @@ def test_shutdown(ray_shutdown):
alive = True alive = True
for actor_name in actor_names: for actor_name in actor_names:
try: try:
ray.get_actor(actor_name) if actor_name == serve_controller_name:
ray.get_actor(
actor_name,
namespace=ray.get_runtime_context().namespace)
else:
ray.get_actor(actor_name)
except ValueError: except ValueError:
alive = False alive = False
return alive return alive
@ -73,7 +80,12 @@ def test_shutdown(ray_shutdown):
def check_dead(): def check_dead():
for actor_name in actor_names: for actor_name in actor_names:
try: try:
ray.get_actor(actor_name) if actor_name == serve_controller_name:
ray.get_actor(
actor_name,
namespace=ray.get_runtime_context().namespace)
else:
ray.get_actor(actor_name)
return False return False
except ValueError: except ValueError:
pass pass
@ -386,18 +398,6 @@ def test_serve_shutdown(ray_shutdown):
assert len(serve.list_deployments()) == 1 assert len(serve.list_deployments()) == 1
def test_detached_namespace_warning(ray_shutdown):
ray.init()
# Can't start detached instance in anonymous namespace.
with pytest.raises(RuntimeError, match="anonymous Ray namespace"):
serve.start(detached=True)
# Can start non-detached instance in anonymous namespace.
serve.start()
ray.shutdown()
def test_detached_namespace_default_ray_init(ray_shutdown): def test_detached_namespace_default_ray_init(ray_shutdown):
# Can start detached instance when ray is not initialized. # Can start detached instance when ray is not initialized.
serve.start(detached=True) serve.start(detached=True)
@ -409,5 +409,30 @@ def test_detached_instance_in_non_anonymous_namespace(ray_shutdown):
serve.start(detached=True) serve.start(detached=True)
@pytest.mark.parametrize("namespace", [None, "test_namespace"])
@pytest.mark.parametrize("detached", [True, False])
def test_serve_controller_namespace(ray_shutdown, namespace: Optional[str],
detached: bool):
"""
Tests the serve controller is started in the current namespace if not
anonymous or in the "serve" namespace if no namespace is specified.
When the controller is started in the "serve" namespace, this also tests
that we can get the serve controller from another namespace.
"""
ray.init(namespace=namespace)
serve.start(detached=detached)
client = serve.api._global_client
if namespace:
controller_namespace = namespace
elif detached:
controller_namespace = "serve"
else:
controller_namespace = ray.get_runtime_context().namespace
assert ray.get_actor(
client._controller_name, namespace=controller_namespace)
if __name__ == "__main__": if __name__ == "__main__":
sys.exit(pytest.main(["-v", "-s", __file__])) sys.exit(pytest.main(["-v", "-s", __file__]))

View file

@ -35,7 +35,7 @@ def get_default_fixture_ray_kwargs():
"num_cpus": 1, "num_cpus": 1,
"object_store_memory": 150 * 1024 * 1024, "object_store_memory": 150 * 1024 * 1024,
"dashboard_port": None, "dashboard_port": None,
"namespace": "", "namespace": "default_test_namespace",
"_system_config": system_config, "_system_config": system_config,
} }
return ray_kwargs return ray_kwargs

View file

@ -693,7 +693,7 @@ def test_detached_actor(ray_start_regular):
create_actor_name = "DetachedActor" create_actor_name = "DetachedActor"
driver_script = """ driver_script = """
import ray import ray
ray.init(address="{}", namespace="") ray.init(address="{}", namespace="default_test_namespace")
name = "{}" name = "{}"
assert ray.util.list_named_actors() == [name] assert ray.util.list_named_actors() == [name]

View file

@ -93,7 +93,7 @@ def test_list_named_actors_detached(ray_start_regular):
driver_script = """ driver_script = """
import ray import ray
ray.init(address="{}", namespace="") ray.init(address="{}", namespace="default_test_namespace")
@ray.remote @ray.remote
class A: class A:

View file

@ -291,7 +291,7 @@ def test_detached_actor_autoscaling(ray_start_cluster_head):
ray.get(main_actor.ping.remote()) ray.get(main_actor.ping.remote())
ray.shutdown() ray.shutdown()
ray.init(address=cluster.address, namespace="") ray.init(address=cluster.address, namespace="default_test_namespace")
main_actor = ray.get_actor("main") main_actor = ray.get_actor("main")
num_to_start = int(ray.available_resources().get("CPU", 0) + 1) num_to_start = int(ray.available_resources().get("CPU", 0) + 1)

View file

@ -332,7 +332,7 @@ from ray._private.test_utils import Semaphore
def remote_print(s, file=None): def remote_print(s, file=None):
print(s, file=file) print(s, file=file)
ray.init(address="{}", namespace="") ray.init(address="{}", namespace="default_test_namespace")
driver_wait = ray.get_actor("{}") driver_wait = ray.get_actor("{}")
main_wait = ray.get_actor("main_wait") main_wait = ray.get_actor("main_wait")

View file

@ -39,12 +39,12 @@ try:
if os.environ.get("USE_RAY_CLIENT"): if os.environ.get("USE_RAY_CLIENT"):
ray.client("{address}").env({runtime_env}).namespace("").connect() ray.client("{address}").env({runtime_env}).namespace("default_test_namespace").connect()
else: else:
ray.init(address="{address}", ray.init(address="{address}",
job_config=job_config, job_config=job_config,
logging_level=logging.DEBUG, logging_level=logging.DEBUG,
namespace="" namespace="default_test_namespace"
) )
except ValueError: except ValueError:
print("ValueError") print("ValueError")