From a9c731edd311f8ee3768a19f7e27287c337cd13c Mon Sep 17 00:00:00 2001 From: Nikita Vemuri Date: Wed, 25 Aug 2021 08:39:32 -0700 Subject: [PATCH] [serve] Remove requirement to specify namespace for serve.start(detached=True) (#17470) --- dashboard/modules/actor/tests/test_actor.py | 3 +- .../modules/snapshot/tests/test_snapshot.py | 2 +- doc/source/serve/deployment.rst | 4 ++ python/ray/serve/api.py | 44 +++++++++------ python/ray/serve/backend_state.py | 3 +- python/ray/serve/backend_worker.py | 9 ++- python/ray/serve/http_proxy.py | 9 +-- python/ray/serve/http_state.py | 6 +- python/ray/serve/tests/conftest.py | 2 +- python/ray/serve/tests/test_persistence.py | 2 +- python/ray/serve/tests/test_ray_client.py | 12 ++-- python/ray/serve/tests/test_standalone.py | 55 ++++++++++++++----- python/ray/tests/conftest.py | 2 +- python/ray/tests/test_actor_advanced.py | 2 +- python/ray/tests/test_list_actors.py | 2 +- python/ray/tests/test_multi_node_2.py | 2 +- python/ray/tests/test_multi_node_3.py | 2 +- python/ray/tests/test_runtime_env.py | 4 +- 18 files changed, 106 insertions(+), 59 deletions(-) diff --git a/dashboard/modules/actor/tests/test_actor.py b/dashboard/modules/actor/tests/test_actor.py index 51a4e1fff..d771f9092 100644 --- a/dashboard/modules/actor/tests/test_actor.py +++ b/dashboard/modules/actor/tests/test_actor.py @@ -274,7 +274,8 @@ def test_actor_pubsub(disable_aiohttp_cache, ray_start_with_dashboard): # be published. elif actor_data_dict["state"] in ("ALIVE", "DEAD"): assert actor_data_dict.keys() == { - "state", "address", "timestamp", "pid", "creationTaskException" + "state", "address", "timestamp", "pid", + "creationTaskException", "rayNamespace" } else: raise Exception("Unknown state: {}".format( diff --git a/dashboard/modules/snapshot/tests/test_snapshot.py b/dashboard/modules/snapshot/tests/test_snapshot.py index 55b6fa55b..06e5ab067 100644 --- a/dashboard/modules/snapshot/tests/test_snapshot.py +++ b/dashboard/modules/snapshot/tests/test_snapshot.py @@ -173,7 +173,7 @@ my_func_deleted.delete() "my_func_nondetached".encode()).hexdigest()] assert entry_nondetached["name"] == "my_func_nondetached" assert entry_nondetached["version"] == "v1" - assert entry_nondetached["namespace"] == "" + assert entry_nondetached["namespace"] == "default_test_namespace" assert entry_nondetached["httpRoute"] == "/my_func_nondetached" assert entry_nondetached["className"] == "my_func_nondetached" assert entry_nondetached["status"] == "RUNNING" diff --git a/doc/source/serve/deployment.rst b/doc/source/serve/deployment.rst index 67b4ac23e..200b97f5d 100644 --- a/doc/source/serve/deployment.rst +++ b/doc/source/serve/deployment.rst @@ -23,6 +23,10 @@ service using ``serve.start(detached=True)``. In this case, the Serve instance w run on the Ray cluster even after the script that calls it exits. If you want to run another script to update the Serve instance, you can run another script that connects to the same Ray cluster and makes further API calls (e.g., to create, update, or delete a deployment). Note that there can only be one detached Serve instance on each Ray cluster. +All non-detached Serve instances will be started in the current namespace that was specified when connecting to the cluster. If a namespace is specified for a detached Serve instance, it will be used. Otherwise if the current namespace is anonymous, the Serve instance will be started in the ``serve`` namespace. + +If ``serve.start()`` is called again in a process in which there is already a running Serve instance, Serve will re-connect to the existing instance (regardless of whether the original instance was detached or not). To reconnect to a Serve instance that exists in the Ray cluster but not in the current process, connect to the cluster with the same namespace that was specified when starting the instance and run ``serve.start()``. + Deploying on a Single Node ========================== diff --git a/python/ray/serve/api.py b/python/ray/serve/api.py index 2c9dbc945..386db0cee 100644 --- a/python/ray/serve/api.py +++ b/python/ray/serve/api.py @@ -40,6 +40,19 @@ _UUID_RE = re.compile( "[a-f0-9]{8}-[a-f0-9]{4}-4[a-f0-9]{3}-[89aAbB][a-f0-9]{3}-[a-f0-9]{12}") +def _get_controller_namespace(detached): + controller_namespace = ray.get_runtime_context().namespace + + if not detached: + return controller_namespace + + # Start controller in "serve" namespace if detached and currently + # in anonymous namespace. + if _UUID_RE.fullmatch(controller_namespace) is not None: + controller_namespace = "serve" + return controller_namespace + + def _get_global_client(): if _global_client is not None: return _global_client @@ -137,7 +150,10 @@ class Client: started = time.time() while True: try: - ray.get_actor(self._controller_name) + controller_namespace = _get_controller_namespace( + self._detached) + ray.get_actor( + self._controller_name, namespace=controller_namespace) if time.time() - started > 5: logger.warning( "Waited 5s for Serve to shutdown gracefully but " @@ -331,9 +347,7 @@ def start( Args: detached (bool): Whether not the instance should be detached from this script. If set, the instance will live on the Ray cluster until it is - explicitly stopped with serve.shutdown(). This should *not* be set in - an anonymous Ray namespace because you will not be able to reconnect - to the instance after the script exits. + explicitly stopped with serve.shutdown(). http_options (Optional[Dict, serve.HTTPOptions]): Configuration options for HTTP proxy. You can pass in a dictionary or HTTPOptions object with fields: @@ -368,22 +382,12 @@ def start( if not ray.is_initialized(): ray.init(namespace="serve") - current_namespace = ray.get_runtime_context().namespace - if detached: - if _UUID_RE.fullmatch(current_namespace) is not None: - raise RuntimeError( - "serve.start(detached=True) should not be called in anonymous " - "Ray namespaces because you won't be able to reconnect to the " - "Serve instance after the script exits. If you want to start " - "a long-lived Serve instance, provide a namespace when " - "connecting to Ray. See the documentation for more details: " - "https://docs.ray.io/en/master/namespaces.html?highlight=namespace#using-namespaces." # noqa: E501 - ) + controller_namespace = _get_controller_namespace(detached) try: client = _get_global_client() logger.info("Connecting to existing Serve instance in namespace " - f"'{current_namespace}'.") + f"'{controller_namespace}'.") return client except RayServeException: pass @@ -409,6 +413,7 @@ def start( resources={ get_current_node_resource_key(): 0.01 }, + namespace=controller_namespace, ).remote( controller_name, http_options, @@ -429,7 +434,7 @@ def start( client = Client(controller, controller_name, detached=detached) _set_global_client(client) logger.info(f"Started{' detached ' if detached else ' '}Serve instance in " - f"namespace '{current_namespace}'.") + f"namespace '{controller_namespace}'.") return client @@ -452,12 +457,15 @@ def _connect() -> Client: # ensure that the correct instance is connected to. if _INTERNAL_REPLICA_CONTEXT is None: controller_name = SERVE_CONTROLLER_NAME + controller_namespace = _get_controller_namespace(detached=True) else: controller_name = _INTERNAL_REPLICA_CONTEXT._internal_controller_name + controller_namespace = _get_controller_namespace(detached=False) # Try to get serve controller if it exists try: - controller = ray.get_actor(controller_name) + controller = ray.get_actor( + controller_name, namespace=controller_namespace) except ValueError: raise RayServeException("There is no " "instance running on this Ray cluster. Please " diff --git a/python/ray/serve/backend_state.py b/python/ray/serve/backend_state.py index e0255e831..4e7a5c63c 100644 --- a/python/ray/serve/backend_state.py +++ b/python/ray/serve/backend_state.py @@ -114,7 +114,8 @@ class ActorReplicaWrapper: **backend_info.replica_config.ray_actor_options).remote( self._backend_tag, self._replica_tag, backend_info.replica_config.init_args, - backend_info.backend_config, self._controller_name) + backend_info.backend_config, self._controller_name, + self._detached) self._ready_obj_ref = self._actor_handle.reconfigure.remote( backend_info.backend_config.user_config) diff --git a/python/ray/serve/backend_worker.py b/python/ray/serve/backend_worker.py index beaf37c3d..104990678 100644 --- a/python/ray/serve/backend_worker.py +++ b/python/ray/serve/backend_worker.py @@ -40,8 +40,8 @@ def create_backend_replica(name: str, serialized_backend_def: bytes): # TODO(architkulkarni): Add type hints after upgrading cloudpickle class RayServeWrappedReplica(object): async def __init__(self, backend_tag, replica_tag, init_args, - backend_config: BackendConfig, - controller_name: str): + backend_config: BackendConfig, controller_name: str, + detached: bool): backend = cloudpickle.loads(serialized_backend_def) if inspect.isfunction(backend): @@ -75,7 +75,10 @@ def create_backend_replica(name: str, serialized_backend_def: bytes): servable_object=_callable) assert controller_name, "Must provide a valid controller_name" - controller_handle = ray.get_actor(controller_name) + controller_namespace = ray.serve.api._get_controller_namespace( + detached) + controller_handle = ray.get_actor( + controller_name, namespace=controller_namespace) self.backend = RayServeReplica(_callable, backend_config, is_function, controller_handle) diff --git a/python/ray/serve/http_proxy.py b/python/ray/serve/http_proxy.py index 977daaf8a..47dbdb74c 100644 --- a/python/ray/serve/http_proxy.py +++ b/python/ray/serve/http_proxy.py @@ -165,10 +165,10 @@ class HTTPProxy: """This class is meant to be instantiated and run by an ASGI HTTP server. >>> import uvicorn - >>> uvicorn.run(HTTPProxy(controller_name)) + >>> uvicorn.run(HTTPProxy(controller_name, controller_namespace)) """ - def __init__(self, controller_name: str): + def __init__(self, controller_name: str, controller_namespace: str): # Set the controller name so that serve will connect to the # controller instance this proxy is running in. ray.serve.api._set_internal_replica_context(None, None, @@ -187,7 +187,7 @@ class HTTPProxy: self.prefix_router = LongestPrefixRouter(get_handle) self.long_poll_client = LongPollClient( - ray.get_actor(controller_name), { + ray.get_actor(controller_name, namespace=controller_namespace), { LongPollNamespace.ROUTE_TABLE: self._update_routes, }, call_in_event_loop=asyncio.get_event_loop()) @@ -261,6 +261,7 @@ class HTTPProxyActor: host: str, port: int, controller_name: str, + controller_namespace: str, http_middlewares: List[ "starlette.middleware.Middleware"] = []): # noqa: F821 self.host = host @@ -268,7 +269,7 @@ class HTTPProxyActor: self.setup_complete = asyncio.Event() - self.app = HTTPProxy(controller_name) + self.app = HTTPProxy(controller_name, controller_namespace) self.wrapped_app = self.app for middleware in http_middlewares: diff --git a/python/ray/serve/http_state.py b/python/ray/serve/http_state.py index cec02c4bf..532acbc62 100644 --- a/python/ray/serve/http_state.py +++ b/python/ray/serve/http_state.py @@ -21,6 +21,8 @@ class HTTPState: def __init__(self, controller_name: str, detached: bool, config: HTTPOptions): self._controller_name = controller_name + self._controller_namespace = ray.serve.api._get_controller_namespace( + detached) self._detached = detached self._config = config self._proxy_actors: Dict[NodeId, ActorHandle] = dict() @@ -67,7 +69,8 @@ class HTTPState: name = format_actor_name(SERVE_PROXY_NAME, self._controller_name, node_id) try: - proxy = ray.get_actor(name) + proxy = ray.get_actor( + name, namespace=self._controller_namespace) except ValueError: logger.info("Starting HTTP proxy with name '{}' on node '{}' " "listening on '{}:{}'".format( @@ -87,6 +90,7 @@ class HTTPState: self._config.host, self._config.port, controller_name=self._controller_name, + controller_namespace=self._controller_namespace, http_middlewares=self._config.middlewares) self._proxy_actors[node_id] = proxy diff --git a/python/ray/serve/tests/conftest.py b/python/ray/serve/tests/conftest.py index b5c4836cf..1e635dd1b 100644 --- a/python/ray/serve/tests/conftest.py +++ b/python/ray/serve/tests/conftest.py @@ -21,7 +21,7 @@ def _shared_serve_instance(): # Overriding task_retry_delay_ms to relaunch actors more quickly ray.init( num_cpus=36, - namespace="", + namespace="default_test_namespace", _metrics_export_port=9999, _system_config={ "metrics_report_interval_ms": 1000, diff --git a/python/ray/serve/tests/test_persistence.py b/python/ray/serve/tests/test_persistence.py index 221478aac..11201b10a 100644 --- a/python/ray/serve/tests/test_persistence.py +++ b/python/ray/serve/tests/test_persistence.py @@ -6,7 +6,7 @@ from ray import serve def test_new_driver(serve_instance): script = """ import ray -ray.init(address="{}", namespace="") +ray.init(address="{}", namespace="default_test_namespace") from ray import serve diff --git a/python/ray/serve/tests/test_ray_client.py b/python/ray/serve/tests/test_ray_client.py index 9f7a4f24b..7bc2d54aa 100644 --- a/python/ray/serve/tests/test_ray_client.py +++ b/python/ray/serve/tests/test_ray_client.py @@ -29,7 +29,7 @@ def ray_client_instance(scope="module"): @pytest.fixture def serve_with_client(ray_client_instance): - ray.util.connect(ray_client_instance, namespace="") + ray.util.connect(ray_client_instance, namespace="default_test_namespace") assert ray.util.client.ray.is_connected() yield @@ -40,11 +40,11 @@ def serve_with_client(ray_client_instance): @pytest.mark.skipif(sys.platform != "linux", reason="Buggy on MacOS + Windows") def test_ray_client(ray_client_instance): - ray.util.connect(ray_client_instance, namespace="") + ray.util.connect(ray_client_instance, namespace="default_test_namespace") start = """ import ray -ray.util.connect("{}", namespace="") +ray.util.connect("{}", namespace="default_test_namespace") from ray import serve @@ -54,7 +54,7 @@ serve.start(detached=True) deploy = """ import ray -ray.util.connect("{}", namespace="") +ray.util.connect("{}", namespace="default_test_namespace") from ray import serve @@ -71,7 +71,7 @@ f.deploy() delete = """ import ray -ray.util.connect("{}", namespace="") +ray.util.connect("{}", namespace="default_test_namespace") from ray import serve @@ -83,7 +83,7 @@ serve.get_deployment("test1").delete() fastapi = """ import ray -ray.util.connect("{}", namespace="") +ray.util.connect("{}", namespace="default_test_namespace") from ray import serve from fastapi import FastAPI diff --git a/python/ray/serve/tests/test_standalone.py b/python/ray/serve/tests/test_standalone.py index fa15b1e45..f8a0051f6 100644 --- a/python/ray/serve/tests/test_standalone.py +++ b/python/ray/serve/tests/test_standalone.py @@ -5,6 +5,7 @@ requires a shared Serve instance. import os import sys import socket +from typing import Optional import pytest import requests @@ -48,8 +49,9 @@ def test_shutdown(ray_shutdown): f.deploy() + serve_controller_name = serve.api._global_client._controller_name actor_names = [ - serve.api._global_client._controller_name, + serve_controller_name, format_actor_name(SERVE_PROXY_NAME, serve.api._global_client._controller_name, get_all_node_ids()[0][0]) @@ -59,7 +61,12 @@ def test_shutdown(ray_shutdown): alive = True for actor_name in actor_names: try: - ray.get_actor(actor_name) + if actor_name == serve_controller_name: + ray.get_actor( + actor_name, + namespace=ray.get_runtime_context().namespace) + else: + ray.get_actor(actor_name) except ValueError: alive = False return alive @@ -73,7 +80,12 @@ def test_shutdown(ray_shutdown): def check_dead(): for actor_name in actor_names: try: - ray.get_actor(actor_name) + if actor_name == serve_controller_name: + ray.get_actor( + actor_name, + namespace=ray.get_runtime_context().namespace) + else: + ray.get_actor(actor_name) return False except ValueError: pass @@ -386,18 +398,6 @@ def test_serve_shutdown(ray_shutdown): assert len(serve.list_deployments()) == 1 -def test_detached_namespace_warning(ray_shutdown): - ray.init() - - # Can't start detached instance in anonymous namespace. - with pytest.raises(RuntimeError, match="anonymous Ray namespace"): - serve.start(detached=True) - - # Can start non-detached instance in anonymous namespace. - serve.start() - ray.shutdown() - - def test_detached_namespace_default_ray_init(ray_shutdown): # Can start detached instance when ray is not initialized. serve.start(detached=True) @@ -409,5 +409,30 @@ def test_detached_instance_in_non_anonymous_namespace(ray_shutdown): serve.start(detached=True) +@pytest.mark.parametrize("namespace", [None, "test_namespace"]) +@pytest.mark.parametrize("detached", [True, False]) +def test_serve_controller_namespace(ray_shutdown, namespace: Optional[str], + detached: bool): + """ + Tests the serve controller is started in the current namespace if not + anonymous or in the "serve" namespace if no namespace is specified. + When the controller is started in the "serve" namespace, this also tests + that we can get the serve controller from another namespace. + """ + + ray.init(namespace=namespace) + serve.start(detached=detached) + client = serve.api._global_client + if namespace: + controller_namespace = namespace + elif detached: + controller_namespace = "serve" + else: + controller_namespace = ray.get_runtime_context().namespace + + assert ray.get_actor( + client._controller_name, namespace=controller_namespace) + + if __name__ == "__main__": sys.exit(pytest.main(["-v", "-s", __file__])) diff --git a/python/ray/tests/conftest.py b/python/ray/tests/conftest.py index 92e6bef28..1d76a8987 100644 --- a/python/ray/tests/conftest.py +++ b/python/ray/tests/conftest.py @@ -35,7 +35,7 @@ def get_default_fixture_ray_kwargs(): "num_cpus": 1, "object_store_memory": 150 * 1024 * 1024, "dashboard_port": None, - "namespace": "", + "namespace": "default_test_namespace", "_system_config": system_config, } return ray_kwargs diff --git a/python/ray/tests/test_actor_advanced.py b/python/ray/tests/test_actor_advanced.py index 813597100..cf2f9d025 100644 --- a/python/ray/tests/test_actor_advanced.py +++ b/python/ray/tests/test_actor_advanced.py @@ -693,7 +693,7 @@ def test_detached_actor(ray_start_regular): create_actor_name = "DetachedActor" driver_script = """ import ray -ray.init(address="{}", namespace="") +ray.init(address="{}", namespace="default_test_namespace") name = "{}" assert ray.util.list_named_actors() == [name] diff --git a/python/ray/tests/test_list_actors.py b/python/ray/tests/test_list_actors.py index 966b54f73..004007775 100644 --- a/python/ray/tests/test_list_actors.py +++ b/python/ray/tests/test_list_actors.py @@ -93,7 +93,7 @@ def test_list_named_actors_detached(ray_start_regular): driver_script = """ import ray -ray.init(address="{}", namespace="") +ray.init(address="{}", namespace="default_test_namespace") @ray.remote class A: diff --git a/python/ray/tests/test_multi_node_2.py b/python/ray/tests/test_multi_node_2.py index af00b1cc2..d43bd9dd0 100644 --- a/python/ray/tests/test_multi_node_2.py +++ b/python/ray/tests/test_multi_node_2.py @@ -291,7 +291,7 @@ def test_detached_actor_autoscaling(ray_start_cluster_head): ray.get(main_actor.ping.remote()) ray.shutdown() - ray.init(address=cluster.address, namespace="") + ray.init(address=cluster.address, namespace="default_test_namespace") main_actor = ray.get_actor("main") num_to_start = int(ray.available_resources().get("CPU", 0) + 1) diff --git a/python/ray/tests/test_multi_node_3.py b/python/ray/tests/test_multi_node_3.py index fa50226aa..ec6e8fb51 100644 --- a/python/ray/tests/test_multi_node_3.py +++ b/python/ray/tests/test_multi_node_3.py @@ -332,7 +332,7 @@ from ray._private.test_utils import Semaphore def remote_print(s, file=None): print(s, file=file) -ray.init(address="{}", namespace="") +ray.init(address="{}", namespace="default_test_namespace") driver_wait = ray.get_actor("{}") main_wait = ray.get_actor("main_wait") diff --git a/python/ray/tests/test_runtime_env.py b/python/ray/tests/test_runtime_env.py index 37a1fe732..3f318af71 100644 --- a/python/ray/tests/test_runtime_env.py +++ b/python/ray/tests/test_runtime_env.py @@ -39,12 +39,12 @@ try: if os.environ.get("USE_RAY_CLIENT"): - ray.client("{address}").env({runtime_env}).namespace("").connect() + ray.client("{address}").env({runtime_env}).namespace("default_test_namespace").connect() else: ray.init(address="{address}", job_config=job_config, logging_level=logging.DEBUG, - namespace="" + namespace="default_test_namespace" ) except ValueError: print("ValueError")