[serve] Make Dashboard start Serve in the "serve" namespace (#23198)

The Ray Dashboard starts Serve in the `"_ray_internal_dashboard"` namespace. However, Serve by default starts in the `"serve"` namespace. This causes surprising behavior when working with the Serve CLI and REST API.

This change makes the Ray Dashboard start Serve in the `"serve"` namespace, allowing the REST API to work intuitively with the Python API.
This commit is contained in:
shrekris-anyscale 2022-03-16 10:03:44 -07:00 committed by GitHub
parent eca5bcfc87
commit 34ebb3409e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 160 additions and 20 deletions

View file

@ -7,6 +7,8 @@ from typing import List, Dict, Set
import pytest
import requests
import ray
from ray import serve
GET_OR_PUT_URL = "http://localhost:8265/api/serve/deployments/"
@ -228,5 +230,28 @@ def test_get_status_info(ray_start_stop):
print(statuses)
def test_serve_namespace(ray_start_stop):
    """
    Check that the Dashboard's Serve can interact with the Python API
    when they both start in the "serve" namespace.
    """
    # Create a single deployment through the Dashboard's REST endpoint.
    deployment_config = {
        "name": "one",
        "num_replicas": 1,
        "route_prefix": "/one",
        "ray_actor_options": {"runtime_env": {"py_modules": [test_module_uri]}},
        "import_path": "test_module.test.one",
    }
    response = requests.put(
        GET_OR_PUT_URL, json={"deployments": [deployment_config]}, timeout=30
    )
    assert response.status_code == 200

    # Attach to the running cluster in the "serve" namespace and confirm the
    # Python API lists exactly the deployment that was created over REST.
    ray.init(address="auto", namespace="serve")
    serve.start()
    listed = serve.list_deployments()
    assert len(listed) == 1
    assert "one" in listed
    serve.shutdown()
if __name__ == "__main__":
    # Running this file directly executes its tests verbosely and uses
    # pytest's result as the process exit status.
    exit_code = pytest.main(["-v", __file__])
    sys.exit(exit_code)

View file

@ -269,9 +269,7 @@ def init_ray_and_catch_exceptions(connect_to_serve: bool = False) -> Callable:
raise e from None
if connect_to_serve:
# TODO(edoakes): this should probably run in the `serve`
# namespace.
serve.start(detached=True)
serve.start(detached=True, _override_controller_namespace="serve")
return await f(self, *args, **kwargs)
except Exception as e:
logger.exception(f"Unexpected error in handler: {e}")

View file

@ -81,7 +81,20 @@ _UUID_RE = re.compile(
_CLIENT_POLLING_INTERVAL_S: float = 1
def _get_controller_namespace(detached):
def _get_controller_namespace(
detached: bool, _override_controller_namespace: Optional[str] = None
):
"""Gets the controller's namespace.
Args:
detached (bool): Whether serve.start() was called with detached=True
_override_controller_namespace (Optional[str]): When set, this is the
controller's namespace
"""
if _override_controller_namespace is not None:
return _override_controller_namespace
controller_namespace = ray.get_runtime_context().namespace
if not detached:
@ -94,11 +107,11 @@ def _get_controller_namespace(detached):
return controller_namespace
def internal_get_global_client():
def internal_get_global_client(_override_controller_namespace: Optional[str] = None):
if _global_client is not None:
return _global_client
return _connect()
return _connect(_override_controller_namespace=_override_controller_namespace)
def _set_global_client(client):
@ -142,11 +155,16 @@ def _ensure_connected(f: Callable) -> Callable:
class Client:
def __init__(
self, controller: ActorHandle, controller_name: str, detached: bool = False
self,
controller: ActorHandle,
controller_name: str,
detached: bool = False,
_override_controller_namespace: Optional[str] = None,
):
self._controller: ServeController = controller
self._controller_name = controller_name
self._detached = detached
self._override_controller_namespace = _override_controller_namespace
self._shutdown = False
self._http_config: HTTPOptions = ray.get(controller.get_http_config.remote())
self._root_url = ray.get(controller.get_root_url.remote())
@ -206,7 +224,10 @@ class Client:
started = time.time()
while True:
try:
controller_namespace = _get_controller_namespace(self._detached)
controller_namespace = _get_controller_namespace(
self._detached,
self._override_controller_namespace,
)
ray.get_actor(self._controller_name, namespace=controller_namespace)
if time.time() - started > 5:
logger.warning(
@ -618,6 +639,7 @@ def start(
http_options: Optional[Union[dict, HTTPOptions]] = None,
dedicated_cpu: bool = False,
_checkpoint_path: str = DEFAULT_CHECKPOINT_PATH,
_override_controller_namespace: Optional[str] = None,
**kwargs,
) -> Client:
"""Initialize a serve instance.
@ -670,10 +692,14 @@ def start(
if not ray.is_initialized():
ray.init(namespace="serve")
controller_namespace = _get_controller_namespace(detached)
controller_namespace = _get_controller_namespace(
detached, _override_controller_namespace=_override_controller_namespace
)
try:
client = internal_get_global_client()
client = internal_get_global_client(
_override_controller_namespace=_override_controller_namespace
)
logger.info(
"Connecting to existing Serve instance in namespace "
f"'{controller_namespace}'."
@ -709,6 +735,7 @@ def start(
http_options,
_checkpoint_path,
detached=detached,
_override_controller_namespace=_override_controller_namespace,
)
proxy_handles = ray.get(controller.get_http_proxies.remote())
@ -723,7 +750,12 @@ def start(
"HTTP proxies not available after {HTTP_PROXY_TIMEOUT}s."
)
client = Client(controller, controller_name, detached=detached)
client = Client(
controller,
controller_name,
detached=detached,
_override_controller_namespace=_override_controller_namespace,
)
_set_global_client(client)
logger.info(
f"Started{' detached ' if detached else ' '}Serve instance in "
@ -732,7 +764,7 @@ def start(
return client
def _connect() -> Client:
def _connect(_override_controller_namespace: Optional[str] = None) -> Client:
"""Connect to an existing Serve instance on this Ray cluster.
If calling from the driver program, the Serve instance on this Ray cluster
@ -740,6 +772,11 @@ def _connect() -> Client:
If called from within a replica, this will connect to the same Serve
instance that the replica is running in.
Args:
_override_controller_namespace (Optional[str]): The namespace to use
when looking for the controller. If None, Serve recalculates the
controller's namespace using _get_controller_namespace().
"""
# Initialize ray if needed.
@ -751,7 +788,9 @@ def _connect() -> Client:
# ensure that the correct instance is connected to.
if _INTERNAL_REPLICA_CONTEXT is None:
controller_name = SERVE_CONTROLLER_NAME
controller_namespace = _get_controller_namespace(detached=True)
controller_namespace = _get_controller_namespace(
detached=True, _override_controller_namespace=_override_controller_namespace
)
else:
controller_name = _INTERNAL_REPLICA_CONTEXT._internal_controller_name
controller_namespace = _INTERNAL_REPLICA_CONTEXT._internal_controller_namespace
@ -767,7 +806,12 @@ def _connect() -> Client:
"one."
)
client = Client(controller, controller_name, detached=True)
client = Client(
controller,
controller_name,
detached=True,
_override_controller_namespace=_override_controller_namespace,
)
_set_global_client(client)
return client

View file

@ -67,6 +67,7 @@ class ServeController:
http_config: HTTPOptions,
checkpoint_path: str,
detached: bool = False,
_override_controller_namespace: Optional[str] = None,
):
# Used to read/write checkpoints.
self.controller_namespace = ray.get_runtime_context().namespace
@ -85,7 +86,12 @@ class ServeController:
self.long_poll_host = LongPollHost()
self.http_state = HTTPState(controller_name, detached, http_config)
self.http_state = HTTPState(
controller_name,
detached,
http_config,
_override_controller_namespace=_override_controller_namespace,
)
self.endpoint_state = EndpointState(self.kv_store, self.long_poll_host)
# Fetch all running actors in current cluster as source of current
# replica state for controller failure recovery
@ -96,6 +102,7 @@ class ServeController:
self.kv_store,
self.long_poll_host,
all_current_actor_names,
_override_controller_namespace=_override_controller_namespace,
)
# TODO(simon): move autoscaling related stuff into a manager.

View file

@ -136,12 +136,15 @@ class ActorReplicaWrapper:
controller_name: str,
replica_tag: ReplicaTag,
deployment_name: str,
_override_controller_namespace: Optional[str] = None,
):
self._actor_name = actor_name
self._placement_group_name = self._actor_name + "_placement_group"
self._detached = detached
self._controller_name = controller_name
self._controller_namespace = ray.serve.api._get_controller_namespace(detached)
self._controller_namespace = ray.serve.api._get_controller_namespace(
detached, _override_controller_namespace=_override_controller_namespace
)
self._replica_tag = replica_tag
self._deployment_name = deployment_name
@ -609,6 +612,7 @@ class DeploymentReplica(VersionedReplica):
replica_tag: ReplicaTag,
deployment_name: str,
version: DeploymentVersion,
_override_controller_namespace: Optional[str] = None,
):
self._actor = ActorReplicaWrapper(
f"{ReplicaName.prefix}{format_actor_name(replica_tag)}",
@ -616,6 +620,7 @@ class DeploymentReplica(VersionedReplica):
controller_name,
replica_tag,
deployment_name,
_override_controller_namespace=_override_controller_namespace,
)
self._controller_name = controller_name
self._deployment_name = deployment_name
@ -907,6 +912,7 @@ class DeploymentState:
detached: bool,
long_poll_host: LongPollHost,
_save_checkpoint_func: Callable,
_override_controller_namespace: Optional[str] = None,
):
self._name = name
@ -914,6 +920,9 @@ class DeploymentState:
self._detached: bool = detached
self._long_poll_host: LongPollHost = long_poll_host
self._save_checkpoint_func = _save_checkpoint_func
self._override_controller_namespace: Optional[
str
] = _override_controller_namespace
# Each time we set a new deployment goal, we're trying to save new
# DeploymentInfo and bring current deployment to meet new status.
@ -973,6 +982,7 @@ class DeploymentState:
replica_name.replica_tag,
replica_name.deployment_tag,
None,
_override_controller_namespace=self._override_controller_namespace,
)
new_deployment_replica.recover()
self._replicas.add(ReplicaState.RECOVERING, new_deployment_replica)
@ -1210,6 +1220,7 @@ class DeploymentState:
replica_name.replica_tag,
replica_name.deployment_tag,
self._target_version,
_override_controller_namespace=self._override_controller_namespace,
)
new_deployment_replica.start(self._target_info, self._target_version)
@ -1525,6 +1536,7 @@ class DeploymentStateManager:
kv_store: KVStoreBase,
long_poll_host: LongPollHost,
all_current_actor_names: List[str],
_override_controller_namespace: Optional[str] = None,
):
self._controller_name = controller_name
@ -1537,6 +1549,7 @@ class DeploymentStateManager:
detached,
long_poll_host,
self._save_checkpoint_func,
_override_controller_namespace=_override_controller_namespace,
)
self._deployment_states: Dict[str, DeploymentState] = dict()
self._deleted_deployment_metadata: Dict[str, DeploymentInfo] = OrderedDict()

View file

@ -1,6 +1,6 @@
import asyncio
import random
from typing import Dict, List, Tuple
from typing import Dict, List, Tuple, Optional
import ray
from ray.actor import ActorHandle
@ -28,13 +28,17 @@ class HTTPState:
controller_name: str,
detached: bool,
config: HTTPOptions,
_override_controller_namespace: Optional[str] = None,
# Used by unit testing
_start_proxies_on_init: bool = True,
):
self._controller_name = controller_name
self._controller_namespace = ray.serve.api._get_controller_namespace(detached)
self._controller_namespace = ray.serve.api._get_controller_namespace(
detached, _override_controller_namespace=_override_controller_namespace
)
self._detached = detached
self._config = config
self._override_controller_namespace = _override_controller_namespace
self._proxy_actors: Dict[NodeId, ActorHandle] = dict()
# Will populate self.proxy_actors with existing actors.

View file

@ -11,7 +11,6 @@ import ray
from ray import serve
from ray.tests.conftest import tmp_working_dir # noqa: F401, E501
from ray._private.test_utils import wait_for_condition
from ray.dashboard.optional_utils import RAY_INTERNAL_DASHBOARD_NAMESPACE
from ray.serve.scripts import _process_args_and_kwargs, _configure_runtime_env
@ -255,7 +254,7 @@ def test_deploy(ray_start_stop):
# Deploys some valid config files and checks that the deployments work
# Initialize serve in test to enable calling serve.list_deployments()
ray.init(address="auto", namespace=RAY_INTERNAL_DASHBOARD_NAMESPACE)
ray.init(address="auto", namespace="serve")
serve.start(detached=True)
# Create absolute file names to YAML config files

View file

@ -40,6 +40,7 @@ class MockReplicaActorWrapper:
controller_name: str,
replica_tag: ReplicaTag,
deployment_name: str,
_override_controller_namespace: Optional[str] = None,
):
self._actor_name = actor_name
self._replica_tag = replica_tag

View file

@ -1,9 +1,20 @@
import sys
import pytest
import requests
import ray
from ray import serve
from ray.serve.api import internal_get_global_client
@pytest.fixture
def shutdown_ray():
    """Shut Ray down, if it is initialized, both before and after the test."""

    def _shutdown_if_initialized():
        if ray.is_initialized():
            ray.shutdown()

    _shutdown_if_initialized()
    yield
    _shutdown_if_initialized()
def test_standalone_actor_outside_serve():
@ -26,5 +37,43 @@ def test_standalone_actor_outside_serve():
ray.shutdown()
@pytest.mark.parametrize("detached", [True, False])
def test_override_namespace(shutdown_ray, detached):
    """Check that serve.start() honors _override_controller_namespace."""
    driver_namespace = "ray_namespace"
    override_namespace = "controller_namespace"

    # The driver connects in one namespace while the controller is asked to
    # live in a different one.
    ray.init(namespace=driver_namespace)
    serve.start(detached=detached, _override_controller_namespace=override_namespace)

    # The lookup itself is the check: the handle is discarded, and
    # ray.get_actor is expected to raise if no actor with this name exists
    # in the override namespace.
    client = internal_get_global_client()
    ray.get_actor(client._controller_name, namespace=override_namespace)

    serve.shutdown()
@pytest.mark.parametrize("detached", [True, False])
def test_deploy_with_overriden_namespace(shutdown_ray, detached):
    """Test deploying and querying a deployment with an overridden namespace.

    Serve is started with ``_override_controller_namespace`` set to a
    namespace different from the one passed to ``ray.init()``. A deployment
    named ``f`` is then deployed twice, and each time the HTTP response must
    match the value returned by the most recently deployed version.
    """
    ray_namespace = "ray_namespace"
    controller_namespace = "controller_namespace"

    ray.init(namespace=ray_namespace)
    serve.start(detached=detached, _override_controller_namespace=controller_namespace)

    for iteration in range(2):

        @serve.deployment
        def f(*args):
            return f"{iteration}"

        f.deploy()
        # Bound the request so an unreachable HTTP proxy fails the test
        # instead of hanging it indefinitely.
        response = requests.get("http://localhost:8000/f", timeout=30)
        assert response.text == f"{iteration}"

    serve.shutdown()
if __name__ == "__main__":
    # Running this file directly executes its tests verbosely with output
    # capturing disabled, and uses pytest's result as the exit status.
    exit_code = pytest.main(["-v", "-s", __file__])
    sys.exit(exit_code)