[core] Don't override external dashboard URL in internal KV store (#27901)

Fix a 2.0.0 release-blocker bug where the Ray State API and Jobs were not accessible if the override URL doesn't support adding additional subpaths. This PR keeps the localhost dashboard URL in the internal KV store and only applies the override to values printed or returned to the user.
images.githubusercontent.com/6900234/184809934-8d150874-90fe-4b45-a13d-bce1807047de.png">
This commit is contained in:
Nikita Vemuri 2022-08-16 22:48:05 -07:00 committed by GitHub
parent 4ad1b4c712
commit 4692e8d802
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 158 additions and 107 deletions

View file

@ -18,7 +18,6 @@ from ray.dashboard.datacenter import DataOrganizer
from ray.dashboard.utils import async_loop_forever from ray.dashboard.utils import async_loop_forever
from typing import Optional, Set from typing import Optional, Set
from urllib.parse import urlparse
try: try:
from grpc import aio as aiogrpc from grpc import aio as aiogrpc
@ -236,20 +235,9 @@ class DashboardHead:
if not self.minimal: if not self.minimal:
self.http_server = await self._configure_http_server(modules) self.http_server = await self._configure_http_server(modules)
http_host, http_port = self.http_server.get_address() http_host, http_port = self.http_server.get_address()
# Override with external ray dashboard URL if it exists.
dashboard_address = (
os.environ.get(ray_constants.RAY_OVERRIDE_DASHBOARD_URL)
if ray_constants.RAY_OVERRIDE_DASHBOARD_URL in os.environ
else f"{http_host}:{http_port}"
)
# Add http protocol to dashboard URL if it doesn't
# already contain a protocol.
if not urlparse(dashboard_address).scheme:
dashboard_address = "http://" + dashboard_address
internal_kv._internal_kv_put( internal_kv._internal_kv_put(
ray_constants.DASHBOARD_ADDRESS, ray_constants.DASHBOARD_ADDRESS,
dashboard_address, f"{http_host}:{http_port}",
namespace=ray_constants.KV_NAMESPACE_DASHBOARD, namespace=ray_constants.KV_NAMESPACE_DASHBOARD,
) )

View file

@ -0,0 +1 @@
from ray._private.test_utils import set_override_dashboard_url # noqa: F401

View file

@ -9,6 +9,7 @@ import pprint
import pytest import pytest
import requests import requests
import ray
from ray._private.test_utils import ( from ray._private.test_utils import (
format_web_url, format_web_url,
wait_for_condition, wait_for_condition,
@ -34,10 +35,24 @@ def _get_snapshot(address: str):
@pytest.mark.parametrize("address_suffix", ["", "/"]) # Trailing slash should succeed @pytest.mark.parametrize("address_suffix", ["", "/"]) # Trailing slash should succeed
@pytest.mark.parametrize(
"set_override_dashboard_url",
[
None,
"https://external_dashboard_url",
"https://external_dashboard_url/path1/?query_param1=val1&query_param2=val2",
"new_external_dashboard_url",
],
indirect=True,
)
def test_successful_job_status( def test_successful_job_status(
ray_start_with_dashboard, disable_aiohttp_cache, enable_test_module, address_suffix set_override_dashboard_url,
ray_start_with_dashboard,
disable_aiohttp_cache,
enable_test_module,
address_suffix,
): ):
address = ray_start_with_dashboard.address_info["webui_url"] address = ray._private.worker._global_node.webui_url
assert wait_until_server_available(address) assert wait_until_server_available(address)
address = format_web_url(address) address = format_web_url(address)
@ -104,7 +119,7 @@ def test_successful_job_status(
def test_failed_job_status( def test_failed_job_status(
ray_start_with_dashboard, disable_aiohttp_cache, enable_test_module, address_suffix ray_start_with_dashboard, disable_aiohttp_cache, enable_test_module, address_suffix
): ):
address = ray_start_with_dashboard.address_info["webui_url"] address = ray._private.worker._global_node.webui_url
assert wait_until_server_available(address) assert wait_until_server_available(address)
address = format_web_url(address) address = format_web_url(address)

View file

@ -16,7 +16,6 @@ import time
import traceback import traceback
from collections import defaultdict from collections import defaultdict
from typing import Dict, Optional from typing import Dict, Optional
import urllib
from filelock import FileLock from filelock import FileLock
@ -193,14 +192,8 @@ class Node:
# Initialize webui url # Initialize webui url
if head: if head:
self._webui_url = None self._webui_url = None
self._webui_url_with_protocol = None
else: else:
self._webui_url_with_protocol = ( self._webui_url = ray._private.services.get_webui_url_from_internal_kv()
ray._private.services.get_webui_url_from_internal_kv()
)
self._webui_url = self._remove_protocol_from_url(
self._webui_url_with_protocol
)
self._init_temp() self._init_temp()
@ -541,11 +534,6 @@ class Node:
"""Get the cluster's web UI url.""" """Get the cluster's web UI url."""
return self._webui_url return self._webui_url
@property
def webui_url_with_protocol(self):
"""Get the cluster's web UI URl including the URL protocol."""
return self._webui_url_with_protocol
@property @property
def raylet_socket_name(self): def raylet_socket_name(self):
"""Get the node's raylet socket name.""" """Get the node's raylet socket name."""
@ -879,10 +867,7 @@ class Node:
if we fail to start the API server. Otherwise it will print if we fail to start the API server. Otherwise it will print
a warning if we fail to start the API server. a warning if we fail to start the API server.
""" """
( self._webui_url, process_info = ray._private.services.start_api_server(
self._webui_url_with_protocol,
process_info,
) = ray._private.services.start_api_server(
include_dashboard, include_dashboard,
raise_on_failure, raise_on_failure,
self._ray_params.dashboard_host, self._ray_params.dashboard_host,
@ -896,7 +881,6 @@ class Node:
port=self._ray_params.dashboard_port, port=self._ray_params.dashboard_port,
redirect_logging=self.should_redirect_logs(), redirect_logging=self.should_redirect_logs(),
) )
self._webui_url = self._remove_protocol_from_url(self._webui_url_with_protocol)
assert ray_constants.PROCESS_TYPE_DASHBOARD not in self.all_processes assert ray_constants.PROCESS_TYPE_DASHBOARD not in self.all_processes
if process_info is not None: if process_info is not None:
self.all_processes[ray_constants.PROCESS_TYPE_DASHBOARD] = [ self.all_processes[ray_constants.PROCESS_TYPE_DASHBOARD] = [
@ -904,7 +888,7 @@ class Node:
] ]
self.get_gcs_client().internal_kv_put( self.get_gcs_client().internal_kv_put(
b"webui:url", b"webui:url",
self._webui_url_with_protocol.encode(), self._webui_url.encode(),
True, True,
ray_constants.KV_NAMESPACE_DASHBOARD, ray_constants.KV_NAMESPACE_DASHBOARD,
) )
@ -1490,19 +1474,6 @@ class Node:
external_storage.setup_external_storage(deserialized_config, self.session_name) external_storage.setup_external_storage(deserialized_config, self.session_name)
external_storage.reset_external_storage() external_storage.reset_external_storage()
def _remove_protocol_from_url(self, url: Optional[str]) -> str:
"""
Helper function to remove protocol from URL if it exists.
"""
if not url:
return url
parsed_url = urllib.parse.urlparse(url)
if parsed_url.scheme:
# Construct URL without protocol
scheme = "%s://" % parsed_url.scheme
return parsed_url.geturl().replace(scheme, "", 1)
return url
def _record_stats(self): def _record_stats(self):
# Initialize the internal kv so that the metrics can be put # Initialize the internal kv so that the metrics can be put
from ray._private.usage.usage_lib import TagKey, record_extra_usage_tag from ray._private.usage.usage_lib import TagKey, record_extra_usage_tag

View file

@ -126,7 +126,9 @@ MIN_RESOURCE_GRANULARITY = 0.0001
# Set this environment variable to populate the dashboard URL with # Set this environment variable to populate the dashboard URL with
# an external hosted Ray dashboard URL (e.g. because the # an external hosted Ray dashboard URL (e.g. because the
# dashboard is behind a proxy or load balancer). # dashboard is behind a proxy or load balancer). This only overrides
# the dashboard URL when returning or printing to a user through a public
# API, but not in the internal KV store.
RAY_OVERRIDE_DASHBOARD_URL = "RAY_OVERRIDE_DASHBOARD_URL" RAY_OVERRIDE_DASHBOARD_URL = "RAY_OVERRIDE_DASHBOARD_URL"

View file

@ -20,12 +20,14 @@ from typing import Any, Dict, List, Optional
import grpc import grpc
import numpy as np import numpy as np
import psutil # We must import psutil after ray because we bundle it with ray. import psutil # We must import psutil after ray because we bundle it with ray.
import pytest
import yaml import yaml
from grpc._channel import _InactiveRpcError from grpc._channel import _InactiveRpcError
import ray import ray
import ray._private.gcs_utils as gcs_utils import ray._private.gcs_utils as gcs_utils
import ray._private.memory_monitor as memory_monitor import ray._private.memory_monitor as memory_monitor
import ray._private.ray_constants as ray_constants
import ray._private.services import ray._private.services
import ray._private.utils import ray._private.utils
from ray._private.gcs_pubsub import GcsErrorSubscriber, GcsLogSubscriber from ray._private.gcs_pubsub import GcsErrorSubscriber, GcsLogSubscriber
@ -46,6 +48,18 @@ except (ImportError, ModuleNotFoundError):
raise ModuleNotFoundError("`prometheus_client` not found") raise ModuleNotFoundError("`prometheus_client` not found")
@pytest.fixture
def set_override_dashboard_url(monkeypatch, request):
    """Run a test with ``RAY_OVERRIDE_DASHBOARD_URL`` set to a chosen value.

    The value comes from ``request.param`` when the fixture is parametrized
    (``indirect=True``) and defaults to ``"https://external_dashboard_url"``
    otherwise. A falsy param (e.g. ``None``) means "no override".

    The change is made inside ``monkeypatch.context()`` and the ``yield``
    sits inside that context, so the environment is rolled back when the
    fixture is torn down.
    """
    override_url = getattr(request, "param", "https://external_dashboard_url")
    with monkeypatch.context() as m:
        if override_url:
            m.setenv(
                ray_constants.RAY_OVERRIDE_DASHBOARD_URL,
                override_url,
            )
        else:
            # "No override" must hold even if the variable is already set in
            # the ambient environment; raising=False makes this a no-op when
            # it is absent.
            m.delenv(ray_constants.RAY_OVERRIDE_DASHBOARD_URL, raising=False)
        yield
class RayTestTimeoutException(Exception): class RayTestTimeoutException(Exception):
"""Exception used to identify timeouts from test utilities.""" """Exception used to identify timeouts from test utilities."""

View file

@ -11,6 +11,7 @@ import sys
import threading import threading
import time import time
import traceback import traceback
import urllib
import warnings import warnings
from abc import ABCMeta, abstractmethod from abc import ABCMeta, abstractmethod
from collections.abc import Mapping from collections.abc import Mapping
@ -30,6 +31,7 @@ from typing import (
Union, Union,
overload, overload,
) )
from urllib.parse import urlparse
import colorama import colorama
import setproctitle import setproctitle
@ -889,9 +891,28 @@ def get_dashboard_url():
Returns: Returns:
The URL of the dashboard as a string. The URL of the dashboard as a string.
""" """
worker = global_worker if ray_constants.RAY_OVERRIDE_DASHBOARD_URL in os.environ:
worker.check_connected() return _remove_protocol_from_url(
return _global_node.webui_url os.environ.get(ray_constants.RAY_OVERRIDE_DASHBOARD_URL)
)
else:
worker = global_worker
worker.check_connected()
return _global_node.webui_url
def _remove_protocol_from_url(url: Optional[str]) -> str:
"""
Helper function to remove protocol from URL if it exists.
"""
if not url:
return url
parsed_url = urllib.parse.urlparse(url)
if parsed_url.scheme:
# Construct URL without protocol
scheme = f"{parsed_url.scheme}://"
return parsed_url.geturl().replace(scheme, "", 1)
return url
class BaseContext(metaclass=ABCMeta): class BaseContext(metaclass=ABCMeta):
@ -1471,7 +1492,15 @@ def init(
# Log a message to find the Ray address that we connected to and the # Log a message to find the Ray address that we connected to and the
# dashboard URL. # dashboard URL.
dashboard_url = _global_node.webui_url_with_protocol if ray_constants.RAY_OVERRIDE_DASHBOARD_URL in os.environ:
dashboard_url = os.environ.get(ray_constants.RAY_OVERRIDE_DASHBOARD_URL)
else:
dashboard_url = _global_node.webui_url
# Add http protocol to dashboard URL if it doesn't
# already contain a protocol.
if dashboard_url and not urlparse(dashboard_url).scheme:
dashboard_url = "http://" + dashboard_url
# We logged the address before attempting the connection, so we don't need # We logged the address before attempting the connection, so we don't need
# to log it again. # to log it again.
info_str = "Connected to Ray cluster." info_str = "Connected to Ray cluster."
@ -1479,7 +1508,7 @@ def init(
info_str = "Started a local Ray instance." info_str = "Started a local Ray instance."
if dashboard_url: if dashboard_url:
logger.info( logger.info(
info_str + " View the dashboard at %s%s%s%s%s.", info_str + " View the dashboard at %s%s%s %s%s",
colorama.Style.BRIGHT, colorama.Style.BRIGHT,
colorama.Fore.GREEN, colorama.Fore.GREEN,
dashboard_url, dashboard_url,
@ -1517,7 +1546,9 @@ def init(
hook() hook()
node_id = global_worker.core_worker.get_current_node_id() node_id = global_worker.core_worker.get_current_node_id()
return RayContext(dict(_global_node.address_info, node_id=node_id.hex())) global_node_address_info = _global_node.address_info.copy()
global_node_address_info["webui_url"] = _remove_protocol_from_url(dashboard_url)
return RayContext(dict(global_node_address_info, node_id=node_id.hex()))
# Functions to run as callback after a successful ray init. # Functions to run as callback after a successful ray init.

View file

@ -3,7 +3,6 @@ from abc import ABC
from dataclasses import dataclass, field, fields from dataclasses import dataclass, field, fields
from enum import Enum, unique from enum import Enum, unique
from typing import Dict, List, Optional, Set, Tuple, Union from typing import Dict, List, Optional, Set, Tuple, Union
from urllib.parse import urlparse
import ray import ray
import ray._private.ray_constants as ray_constants import ray._private.ray_constants as ray_constants
@ -937,9 +936,5 @@ def ray_address_to_api_server_url(address: Optional[str]) -> str:
"still alive." "still alive."
) )
) )
api_server_url = f"http://{api_server_url.decode()}"
# Add http protocol to API server URL if it doesn't return api_server_url
# already contain a protocol.
if not urlparse(api_server_url).scheme:
api_server_url = "http://" + api_server_url
return api_server_url.decode()

View file

@ -27,10 +27,11 @@ from ray._private.services import (
REDIS_EXECUTABLE, REDIS_EXECUTABLE,
_start_redis_instance, _start_redis_instance,
) )
from ray._private.test_utils import ( from ray._private.test_utils import ( # noqa: F401
get_and_run_node_killer, get_and_run_node_killer,
init_error_pubsub, init_error_pubsub,
init_log_pubsub, init_log_pubsub,
set_override_dashboard_url,
setup_tls, setup_tls,
teardown_tls, teardown_tls,
enable_external_redis, enable_external_redis,

View file

@ -13,18 +13,6 @@ from ray._private.test_utils import run_string_as_driver
from ray.util.client.ray_client_helpers import ray_start_client_server from ray.util.client.ray_client_helpers import ray_start_client_server
@pytest.fixture
def set_override_dashboard_url(request):
    """Set ``RAY_OVERRIDE_DASHBOARD_URL`` for one test, then restore it.

    The value comes from ``request.param`` when the fixture is parametrized
    and defaults to ``"https://external_dashboard_url"`` otherwise.
    """
    orig_external_dashboard_url = os.environ.get(RAY_OVERRIDE_DASHBOARD_URL)
    override_url = getattr(request, "param", "https://external_dashboard_url")
    os.environ[RAY_OVERRIDE_DASHBOARD_URL] = override_url

    yield

    # Put the environment back exactly as it was. If the variable was not set
    # before the test it must be removed again, otherwise the override leaks
    # into every test that runs afterwards.
    if orig_external_dashboard_url is None:
        os.environ.pop(RAY_OVERRIDE_DASHBOARD_URL, None)
    else:
        os.environ[RAY_OVERRIDE_DASHBOARD_URL] = orig_external_dashboard_url
def test_ray_init_context(shutdown_only): def test_ray_init_context(shutdown_only):
ctx = ray.init() ctx = ray.init()
assert ray.is_initialized() assert ray.is_initialized()
@ -115,38 +103,35 @@ def test_env_var_no_override():
"new_external_dashboard_url", "new_external_dashboard_url",
], ],
) )
def test_hosted_external_dashboard_url(override_url, shutdown_only): def test_hosted_external_dashboard_url(override_url, shutdown_only, monkeypatch):
""" """
Test setting external dashboard URL through environment variable. Test setting external dashboard URL through environment variable.
""" """
orig_external_dashboard_url = os.environ.get(RAY_OVERRIDE_DASHBOARD_URL) with monkeypatch.context() as m:
if override_url: if override_url:
os.environ[RAY_OVERRIDE_DASHBOARD_URL] = override_url m.setenv(
RAY_OVERRIDE_DASHBOARD_URL,
override_url,
)
if not override_url: expected_localhost_url = "127.0.0.1:8265"
# No external dashboard url if not override_url:
expected_webui_url_with_protocol = "http://127.0.0.1:8265" # No external dashboard url
expected_dashboard_url = "127.0.0.1:8265" expected_dashboard_url = "127.0.0.1:8265"
elif "://" in override_url: elif "://" in override_url:
# External dashboard url with https protocol included # External dashboard url with https protocol included
expected_webui_url_with_protocol = override_url expected_dashboard_url = override_url[override_url.index("://") + 3 :]
expected_dashboard_url = override_url[override_url.index("://") + 3 :] else:
else: # External dashboard url with no protocol
# External dashboard url with no protocol -- should default to http expected_dashboard_url = override_url
expected_webui_url_with_protocol = "http://" + override_url
expected_dashboard_url = override_url
info = ray.init(dashboard_port=8265) info = ray.init(dashboard_port=8265)
assert info.dashboard_url == expected_dashboard_url assert info.dashboard_url == expected_dashboard_url
assert info.address_info["webui_url"] == expected_dashboard_url assert info.address_info["webui_url"] == expected_dashboard_url
assert ( assert ray._private.worker._global_node.webui_url == expected_localhost_url
ray._private.worker._global_node.webui_url_with_protocol assert (
== expected_webui_url_with_protocol ray_address_to_api_server_url("auto") == "http://" + expected_localhost_url
) )
assert ray_address_to_api_server_url("auto") == expected_webui_url_with_protocol
if orig_external_dashboard_url:
os.environ[RAY_OVERRIDE_DASHBOARD_URL] = orig_external_dashboard_url
@pytest.mark.parametrize( @pytest.mark.parametrize(
@ -180,11 +165,8 @@ def test_hosted_external_dashboard_url_with_connecting_to_existing_cluster(
info = ray.init() info = ray.init()
assert info.dashboard_url == "external_dashboard_url" assert info.dashboard_url == "external_dashboard_url"
assert info.address_info["webui_url"] == "external_dashboard_url" assert info.address_info["webui_url"] == "external_dashboard_url"
assert ( assert ray._private.worker._global_node.webui_url == "127.0.0.1:8265"
ray._private.worker._global_node.webui_url_with_protocol assert ray_address_to_api_server_url("auto") == "http://" + "127.0.0.1:8265"
== "https://external_dashboard_url"
)
assert ray_address_to_api_server_url("auto") == "https://external_dashboard_url"
def test_shutdown_and_reset_global_worker(shutdown_only): def test_shutdown_and_reset_global_worker(shutdown_only):

View file

@ -1600,6 +1600,57 @@ def test_list_get_actors(shutdown_only):
print(list_actors()) print(list_actors())
@pytest.mark.skipif(
    sys.platform == "win32",
    reason="Failed on Windows",
)
@pytest.mark.parametrize(
    "override_url",
    [
        "https://external_dashboard_url",
        "https://external_dashboard_url/path1/?query_param1=val1&query_param2=val2",
        "new_external_dashboard_url",
    ],
)
def test_state_api_with_external_dashboard_override(
    shutdown_only, override_url, monkeypatch
):
    """List and fetch actors while the dashboard URL override is set."""
    with monkeypatch.context() as patched_env:
        if override_url:
            patched_env.setenv(ray_constants.RAY_OVERRIDE_DASHBOARD_URL, override_url)

        ray.init()

        @ray.remote
        class A:
            pass

        handle = A.remote()  # noqa

        def verify():
            # Listing: exactly one live actor, and it is the one created above.
            listed = list_actors()
            assert len(listed) == 1
            only_actor = listed[0]
            assert only_actor["state"] == "ALIVE"
            assert is_hex(only_actor["actor_id"])
            assert handle._actor_id.hex() == only_actor["actor_id"]

            # Lookup: fetching each actor by id matches its detailed listing.
            for detailed in list_actors(detail=True):
                fetched = get_actor(detailed["actor_id"])
                assert fetched is not None
                assert fetched == detailed

            return True

        wait_for_condition(verify)
        print(list_actors())
@pytest.mark.skipif( @pytest.mark.skipif(
sys.platform == "win32", sys.platform == "win32",
reason="Failed on Windows", reason="Failed on Windows",