Revert "Revert "[Dashboard][Serve] Move Serve related endpoints to dashboard agent"" (#26336)

This commit is contained in:
brucez-anyscale 2022-07-06 19:37:30 -07:00 committed by GitHub
parent b803792b58
commit f76d7b23f2
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
20 changed files with 257 additions and 59 deletions

View file

@ -63,7 +63,7 @@ class DashboardAgent:
log_dir=None,
metrics_export_port=None,
node_manager_port=None,
listen_port=0,
listen_port=ray_constants.DEFAULT_DASHBOARD_AGENT_LISTEN_PORT,
object_store_name=None,
raylet_name=None,
logging_params=None,
@ -329,7 +329,7 @@ if __name__ == "__main__":
"--listen-port",
required=False,
type=int,
default=0,
default=ray_constants.DEFAULT_DASHBOARD_AGENT_LISTEN_PORT,
help="Port for HTTP server to listen on",
)
parser.add_argument(

View file

@ -53,12 +53,20 @@ class HttpServerAgent:
self.runner = aiohttp.web.AppRunner(app)
await self.runner.setup()
site = aiohttp.web.TCPSite(
self.runner,
"127.0.0.1" if self.ip == "127.0.0.1" else "0.0.0.0",
self.listen_port,
)
await site.start()
try:
site = aiohttp.web.TCPSite(
self.runner,
"127.0.0.1" if self.ip == "127.0.0.1" else "0.0.0.0",
self.listen_port,
)
await site.start()
except OSError as e:
logger.error(
f"Agent port #{self.listen_port} already in use. "
"Failed to start agent. "
f"Ensure port #{self.listen_port} is available, and then try again."
)
raise e
self.http_host, self.http_port, *_ = site._server.sockets[0].getsockname()
logger.info(
"Dashboard agent http address: %s:%s", self.http_host, self.http_port

View file

@ -205,6 +205,14 @@ class SubmissionClient:
def _check_connection_and_version(
self, min_version: str = "1.9", version_error_message: str = None
):
self._check_connection_and_version_with_url(min_version, version_error_message)
def _check_connection_and_version_with_url(
self,
min_version: str = "1.9",
version_error_message: str = None,
url: str = "/api/version",
):
if version_error_message is None:
version_error_message = (
@ -212,7 +220,7 @@ class SubmissionClient:
)
try:
r = self._do_request("GET", "/api/version")
r = self._do_request("GET", url)
if r.status_code == 404:
raise RuntimeError(version_error_message)
r.raise_for_status()

View file

@ -19,9 +19,6 @@ from ray.experimental.internal_kv import (
JOB_ID_METADATA_KEY = "job_submission_id"
JOB_NAME_METADATA_KEY = "job_name"
# Version 0 -> 1: Added log streaming and changed behavior of job logs cli.
CURRENT_VERSION = "1"
class JobStatus(str, Enum):
"""An enumeration for describing the status of a job."""
@ -179,13 +176,6 @@ def validate_request_type(json_data: Dict[str, Any], request_type: dataclass) ->
return request_type(**json_data)
@dataclass
class VersionResponse:
version: str
ray_version: str
ray_commit: str
@dataclass
class JobSubmitRequest:
# Command to start execution, ex: "python script.py"

View file

@ -16,16 +16,18 @@ from ray._private.runtime_env.packaging import (
pin_runtime_env_uri,
)
from ray.dashboard.modules.job.common import (
CURRENT_VERSION,
http_uri_components_to_uri,
JobInfo,
JobSubmitRequest,
JobSubmitResponse,
JobStopResponse,
JobLogsResponse,
VersionResponse,
validate_request_type,
)
from ray.dashboard.modules.version import (
CURRENT_VERSION,
VersionResponse,
)
from ray.dashboard.modules.job.job_manager import JobManager
logger = logging.getLogger(__name__)

View file

@ -19,7 +19,8 @@ from ray._private.test_utils import (
wait_until_server_available,
)
from ray.dashboard.modules.dashboard_sdk import ClusterInfo, parse_cluster_info
from ray.dashboard.modules.job.common import CURRENT_VERSION, JobInfo
from ray.dashboard.modules.job.common import JobInfo
from ray.dashboard.modules.version import CURRENT_VERSION
from ray.dashboard.tests.conftest import * # noqa
from ray.job_submission import JobStatus, JobSubmissionClient
from ray.tests.conftest import _ray_start

View file

@ -19,7 +19,7 @@ DELETE_PATH = "/api/serve/deployments/"
class ServeSubmissionClient(SubmissionClient):
def __init__(
self,
dashboard_address: str,
dashboard_agent_address: str,
create_cluster_if_needed=False,
cookies: Optional[Dict[str, Any]] = None,
metadata: Optional[Dict[str, Any]] = None,
@ -31,17 +31,18 @@ class ServeSubmissionClient(SubmissionClient):
"installation: `pip install 'ray[default']``"
)
super().__init__(
address=dashboard_address,
address=dashboard_agent_address,
create_cluster_if_needed=create_cluster_if_needed,
cookies=cookies,
metadata=metadata,
headers=headers,
)
self._check_connection_and_version(
self._check_connection_and_version_with_url(
min_version="1.12",
version_error_message="Serve CLI is not supported on the Ray "
"cluster. Please ensure the cluster is "
"running Ray 1.12 or higher.",
url="/api/ray/version",
)
def deploy_application(self, config: Dict) -> None:

View file

@ -3,8 +3,15 @@ import logging
from aiohttp.web import Request, Response
import dataclasses
import ray
import aiohttp.web
import ray.dashboard.optional_utils as optional_utils
import ray.dashboard.utils as dashboard_utils
from ray.dashboard.modules.version import (
CURRENT_VERSION,
VersionResponse,
)
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
@ -13,11 +20,28 @@ routes = optional_utils.ClassMethodRouteTable
# NOTE (shrekris-anyscale): This class uses delayed imports for all
# Ray Serve-related modules. That way, users can use the Ray dashboard for
# Ray Serve-related modules. That way, users can use the Ray dashboard agent for
# non-Serve purposes without downloading Serve dependencies.
class ServeHead(dashboard_utils.DashboardHeadModule):
def __init__(self, dashboard_head):
super().__init__(dashboard_head)
class ServeAgent(dashboard_utils.DashboardAgentModule):
def __init__(self, dashboard_agent):
super().__init__(dashboard_agent)
# TODO: It's better to use `/api/version`.
# It requires a refactor of ClassMethodRouteTable to differentiate the server.
@routes.get("/api/ray/version")
async def get_version(self, req: Request) -> Response:
# NOTE(edoakes): CURRENT_VERSION should be bumped and checked on the
# client when we have backwards-incompatible changes.
resp = VersionResponse(
version=CURRENT_VERSION,
ray_version=ray.__version__,
ray_commit=ray.__commit__,
)
return Response(
text=json.dumps(dataclasses.asdict(resp)),
content_type="application/json",
status=aiohttp.web.HTTPOk.status_code,
)
@routes.get("/api/serve/deployments/")
@optional_utils.init_ray_and_catch_exceptions(connect_to_serve=True)

View file

@ -9,15 +9,21 @@ import requests
import ray
from ray import serve
from ray._private.test_utils import wait_for_condition
import ray._private.ray_constants as ray_constants
GET_OR_PUT_URL = "http://localhost:8265/api/serve/deployments/"
STATUS_URL = "http://localhost:8265/api/serve/deployments/status"
GET_OR_PUT_URL = "http://localhost:52365/api/serve/deployments/"
STATUS_URL = "http://localhost:52365/api/serve/deployments/status"
@pytest.fixture
def ray_start_stop():
subprocess.check_output(["ray", "stop", "--force"])
subprocess.check_output(["ray", "start", "--head"])
wait_for_condition(
lambda: requests.get("http://localhost:52365/api/ray/version").status_code
== 200,
timeout=15,
)
yield
subprocess.check_output(["ray", "stop", "--force"])
@ -248,5 +254,13 @@ def test_serve_namespace(ray_start_stop):
serve.shutdown()
def test_default_dashboard_agent_listen_port():
"""
Defaults in the code and the documentation assume
the dashboard agent listens to HTTP on port 52365.
"""
assert ray_constants.DEFAULT_DASHBOARD_AGENT_LISTEN_PORT == 52365
if __name__ == "__main__":
sys.exit(pytest.main(["-v", __file__]))

View file

@ -0,0 +1,11 @@
from dataclasses import dataclass
# Version 0 -> 1: Added log streaming and changed behavior of job logs cli.
CURRENT_VERSION = "1"
@dataclass
class VersionResponse:
version: str
ray_version: str
ray_commit: str

View file

@ -259,10 +259,13 @@ def init_ray_and_catch_exceptions(connect_to_serve: bool = False) -> Callable:
try:
if not ray.is_initialized():
try:
address = self._dashboard_head.gcs_address
address = self.get_gcs_address()
logger.info(f"Connecting to ray with address={address}")
# Init ray without logging to driver
# to avoid infinite logging issue.
ray.init(
address=address,
log_to_driver=False,
namespace=RAY_INTERNAL_DASHBOARD_NAMESPACE,
)
except Exception as e:

View file

@ -12,6 +12,7 @@ import time
import numpy as np
import pytest
import requests
import socket
import ray
import ray.dashboard.consts as dashboard_consts
@ -817,6 +818,7 @@ def test_gcs_check_alive(fast_gcs_failure_detection, ray_start_with_dashboard):
)
def test_dashboard_does_not_depend_on_serve():
"""Check that the dashboard can start without Serve."""
ray.shutdown()
with pytest.raises(ImportError):
from ray import serve # noqa: F401
@ -826,13 +828,117 @@ def test_dashboard_does_not_depend_on_serve():
# Ensure standard dashboard features, like snapshot, still work
response = requests.get(f"http://{ctx.dashboard_url}/api/snapshot")
assert response.status_code == 200
assert response.json()["result"] is True
assert "snapshot" in response.json()["data"]
agent_url = (
ctx.address_info["node_ip_address"]
+ ":"
+ str(ctx.address_info["dashboard_agent_listen_port"])
)
# Check that Serve-dependent features fail
response = requests.get(f"http://{ctx.dashboard_url}/api/serve/deployments/")
response = requests.get(f"http://{agent_url}/api/serve/deployments/")
assert response.status_code == 500
assert "ModuleNotFoundError" in response.text
@pytest.mark.skipif(
os.environ.get("RAY_DEFAULT") != "1",
reason="This test only works for default installation.",
)
def test_agent_does_not_depend_on_serve(shutdown_only):
"""Check that the dashboard agent can start without Serve."""
ray.shutdown()
with pytest.raises(ImportError):
from ray import serve # noqa: F401
ray.init(include_dashboard=True)
node = ray._private.worker._global_node
all_processes = node.all_processes
raylet_proc_info = all_processes[ray_constants.PROCESS_TYPE_RAYLET][0]
raylet_proc = psutil.Process(raylet_proc_info.process.pid)
wait_for_condition(lambda: search_agent(raylet_proc.children()))
agent_proc = search_agent(raylet_proc.children())
agent_pid = agent_proc.pid
check_agent_register(raylet_proc, agent_pid)
logger.info("Agent works.")
agent_url = node.node_ip_address + ":" + str(node.dashboard_agent_listen_port)
# Check that Serve-dependent features fail
response = requests.get(f"http://{agent_url}/api/serve/deployments/")
assert response.status_code == 500
# The agent should be dead if raylet exits.
raylet_proc.kill()
raylet_proc.wait()
agent_proc.wait(5)
@pytest.mark.skipif(
os.environ.get("RAY_MINIMAL") == "1" or os.environ.get("RAY_DEFAULT") == "1",
reason="This test is not supposed to work for minimal or default installation.",
)
def test_agent_port_conflict():
ray.shutdown()
# start ray and test agent works.
ray.init(include_dashboard=True)
node = ray._private.worker._global_node
agent_url = node.node_ip_address + ":" + str(node.dashboard_agent_listen_port)
wait_for_condition(
lambda: requests.get(f"http://{agent_url}/api/serve/deployments/").status_code
== 200
)
ray.shutdown()
# ocuppy the port with a socket.
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
wait_for_condition(
lambda: s.connect_ex(
("localhost", ray_constants.DEFAULT_DASHBOARD_AGENT_LISTEN_PORT)
)
!= 0
)
# start ray and the agent http server should fail
# to start due to port conflict, but the agent still starts.
ray.init(include_dashboard=True)
node = ray._private.worker._global_node
all_processes = node.all_processes
raylet_proc_info = all_processes[ray_constants.PROCESS_TYPE_RAYLET][0]
raylet_proc = psutil.Process(raylet_proc_info.process.pid)
wait_for_condition(lambda: search_agent(raylet_proc.children()))
agent_proc = search_agent(raylet_proc.children())
agent_pid = agent_proc.pid
check_agent_register(raylet_proc, agent_pid)
# Release the port from socket.
s.close()
agent_url = node.node_ip_address + ":" + str(node.dashboard_agent_listen_port)
# Check that Serve-dependent features fail.
try:
wait_for_condition(
lambda: requests.get(
f"http://{agent_url}/api/serve/deployments/"
).status_code
== 200
)
assert False
except Exception as e:
assert e is not None
@pytest.mark.skipif(

View file

@ -55,6 +55,9 @@ class DashboardAgentModule(abc.ABC):
dependencies.
"""
def get_gcs_address(self):
return self._dashboard_agent.gcs_address
class DashboardHeadModule(abc.ABC):
def __init__(self, dashboard_head):
@ -81,6 +84,9 @@ class DashboardHeadModule(abc.ABC):
dependencies.
"""
def get_gcs_address(self):
return self._dashboard_head.gcs_address
def dashboard_module(enable):
"""A decorator for dashboard module."""

View file

@ -306,44 +306,49 @@ As a side note, you could also package your deployment graph into a standalone P
### Using a Remote Cluster
By default, `serve deploy` deploys to a cluster running locally. However, you should also use `serve deploy` whenever you want to deploy your Serve application to a remote cluster. `serve deploy` takes in an optional `--address/-a` argument where you can specify the dashboard address of your remote Ray cluster. This address should be of the form:
By default, `serve deploy` deploys to a cluster running locally. However, you should also use `serve deploy` whenever you want to deploy your Serve application to a remote cluster. `serve deploy` takes in an optional `--address/-a` argument where you can specify your remote Ray cluster's dashboard agent address. This address should be of the form:
```
[YOUR_RAY_CLUSTER_URI]:[DASHBOARD PORT]
[YOUR_RAY_CLUSTER_URI]:[DASHBOARD AGENT PORT]
```
As an example, the address for the local cluster started by `ray start --head` is `http://127.0.0.1:8265`. We can explicitly deploy to this address using the command
As an example, the address for the local cluster started by `ray start --head` is `http://127.0.0.1:52365`. We can explicitly deploy to this address using the command
```console
$ serve deploy config_file.yaml -a http://127.0.0.1:8265
$ serve deploy config_file.yaml -a http://127.0.0.1:52365
```
The Ray dashboard's default port is 8265. This port may be different if:
* You explicitly set it using the `--dashboard-port` argument when running `ray start`.
* Port 8265 was unavailable when Ray started. In that case, the dashboard port is incremented until an available port is found. E.g. if 8265 is unavailable, the port becomes 8266. If that's unavailable, it becomes 8267, and so on.
The Ray dashboard agent's default port is 52365. You can set it to a different value using the `--dashboard-agent-listen-port` argument when running `ray start`."
:::{note}
If the port 52365 (or whichever port you specify with `--dashboard-agent-listen-port`) is unavailable when Ray starts, the dashboard agents HTTP server will fail. However, the dashboard agent and Ray will continue to run.
You can check if an agents HTTP server is running by sending a curl request: `curl http://{node_ip}:{dashboard_agent_port}/api/serve/deployments/`. If the request succeeds, the server is running on that node. If the request fails, the server is not running on that node. To launch the server on that node, terminate the process occupying the dashboard agents port, and restart Ray on that node.
:::
:::{tip}
By default, all the Serve CLI commands assume that you're working with a local cluster, so if you don't specify an `--address/-a` value, they use the Ray address associated with a local cluster started by `ray start --head`. However, if the `RAY_ADDRESS` environment variable is set, all Serve CLI commands will default to that value instead (unless you also specify an `--address/-a` value).
By default, all the Serve CLI commands assume that you're working with a local cluster. All Serve CLI commands, except `serve start` and `serve run` use the Ray agent address associated with a local cluster started by `ray start --head`. However, if the `RAY_AGENT_ADDRESS` environment variable is set, these Serve CLI commands will default to that value instead.
You can check this variable's value by running:
Similarly, `serve start` and `serve run`, use the Ray head node address associated with a local cluster by default. If the `RAY_ADDRESS` environment variable is set, they will use that value instead.
You can check `RAY_AGENT_ADDRESS`'s value by running:
```console
$ echo $RAY_ADDRESS
$ echo $RAY_AGENT_ADDRESS
```
You can set this variable by running the CLI command:
```console
$ export RAY_ADDRESS=[YOUR VALUE]
$ export RAY_AGENT_ADDRESS=[YOUR VALUE]
```
You can unset this variable by running the CLI command:
```console
$ unset RAY_ADDRESS
$ unset RAY_AGENT_ADDRESS
```
Check for this variable in your environment to make sure you're using your desired Ray address.
Check for this variable in your environment to make sure you're using your desired Ray agent address.
:::
(serve-in-production-inspecting)=

View file

@ -152,6 +152,8 @@ class Node:
self._ray_params = ray_params
self._config = ray_params._system_config or {}
self._dashboard_agent_listen_port = ray_params.dashboard_agent_listen_port
# Configure log rotation parameters.
self.max_bytes = int(
os.getenv("RAY_ROTATION_MAX_BYTES", ray_constants.LOGGING_ROTATE_BYTES)
@ -551,6 +553,11 @@ class Node:
"""Get the port that exposes metrics"""
return self._metrics_export_port
@property
def dashboard_agent_listen_port(self):
"""Get the dashboard agent's listen port"""
return self._dashboard_agent_listen_port
@property
def logging_config(self):
"""Get the logging config of the current node."""
@ -573,6 +580,7 @@ class Node:
"metrics_export_port": self._metrics_export_port,
"gcs_address": self.gcs_address,
"address": self.address,
"dashboard_agent_listen_port": self.dashboard_agent_listen_port,
}
def is_head(self):

View file

@ -87,6 +87,7 @@ class RayParams:
Defaults to 8265.
dashboard_agent_listen_port: The port for dashboard agents to listen on
for HTTP requests.
Defaults to 52365.
plasma_store_socket_name: If provided, it will specify the socket
name used by the plasma store.
raylet_socket_name: If provided, it will specify the socket path
@ -156,7 +157,9 @@ class RayParams:
include_dashboard: Optional[bool] = None,
dashboard_host: Optional[str] = ray_constants.DEFAULT_DASHBOARD_IP,
dashboard_port: Optional[bool] = ray_constants.DEFAULT_DASHBOARD_PORT,
dashboard_agent_listen_port: Optional[int] = 0,
dashboard_agent_listen_port: Optional[
int
] = ray_constants.DEFAULT_DASHBOARD_AGENT_LISTEN_PORT,
plasma_store_socket_name: Optional[str] = None,
raylet_socket_name: Optional[str] = None,
temp_dir: Optional[str] = None,

View file

@ -89,6 +89,7 @@ DEFAULT_DASHBOARD_IP = "127.0.0.1"
DEFAULT_DASHBOARD_PORT = 8265
DASHBOARD_ADDRESS = "dashboard"
PROMETHEUS_SERVICE_DISCOVERY_FILE = "prom_metrics_service_discovery.json"
DEFAULT_DASHBOARD_AGENT_LISTEN_PORT = 52365
# Default resource requirements for actors when no resource requirements are
# specified.
DEFAULT_ACTOR_METHOD_CPU_SIMPLE = 1

View file

@ -423,7 +423,7 @@ def debug(address):
@click.option(
"--dashboard-agent-listen-port",
type=int,
default=0,
default=ray_constants.DEFAULT_DASHBOARD_AGENT_LISTEN_PORT,
help="the port for dashboard agents to listen for http on.",
)
@click.option(

View file

@ -37,9 +37,9 @@ RAY_INIT_ADDRESS_HELP_STR = (
"using the RAY_ADDRESS environment variable."
)
RAY_DASHBOARD_ADDRESS_HELP_STR = (
"Address to use to query the Ray dashboard (defaults to "
"http://localhost:8265). Can also be specified using the "
"RAY_ADDRESS environment variable."
"Address to use to query the Ray dashboard agent (defaults to "
"http://localhost:52365). Can also be specified using the "
"RAY_AGENT_ADDRESS environment variable."
)
@ -122,7 +122,7 @@ def start(
@click.option(
"--address",
"-a",
default=os.environ.get("RAY_ADDRESS", "http://localhost:8265"),
default=os.environ.get("RAY_AGENT_ADDRESS", "http://localhost:52365"),
required=False,
type=str,
help=RAY_DASHBOARD_ADDRESS_HELP_STR,
@ -280,7 +280,7 @@ def run(
@click.option(
"--address",
"-a",
default=os.environ.get("RAY_ADDRESS", "http://localhost:8265"),
default=os.environ.get("RAY_AGENT_ADDRESS", "http://localhost:52365"),
required=False,
type=str,
help=RAY_DASHBOARD_ADDRESS_HELP_STR,
@ -307,7 +307,7 @@ def config(address: str):
@click.option(
"--address",
"-a",
default=os.environ.get("RAY_ADDRESS", "http://localhost:8265"),
default=os.environ.get("RAY_AGENT_ADDRESS", "http://localhost:52365"),
required=False,
type=str,
help=RAY_DASHBOARD_ADDRESS_HELP_STR,
@ -324,7 +324,7 @@ def status(address: str):
@click.option(
"--address",
"-a",
default=os.environ.get("RAY_ADDRESS", "http://localhost:8265"),
default=os.environ.get("RAY_AGENT_ADDRESS", "http://localhost:52365"),
required=False,
type=str,
help=RAY_DASHBOARD_ADDRESS_HELP_STR,

View file

@ -46,7 +46,13 @@ def assert_deployments_live(names: List[str]):
@pytest.fixture
def ray_start_stop():
subprocess.check_output(["ray", "stop", "--force"])
subprocess.check_output(["ray", "start", "--head"])
wait_for_condition(
lambda: requests.get("http://localhost:52365/api/ray/version").status_code
== 200,
timeout=15,
)
yield
subprocess.check_output(["ray", "stop", "--force"])
@ -59,7 +65,7 @@ def test_start_shutdown(ray_start_stop):
@pytest.mark.skipif(sys.platform == "win32", reason="File path incorrect on Windows.")
def test_deploy(ray_start_stop):
"""Deploys some valid config files and checks that the deployments work."""
ray.shutdown()
# Initialize serve in test to enable calling serve.list_deployments()
ray.init(address="auto", namespace=SERVE_NAMESPACE)
@ -107,7 +113,7 @@ def test_deploy(ray_start_stop):
print("Deploying arithmetic config.")
deploy_response = subprocess.check_output(
["serve", "deploy", arithmetic_file_name, "-a", "http://localhost:8265/"]
["serve", "deploy", arithmetic_file_name, "-a", "http://localhost:52365/"]
)
assert success_message_fragment in deploy_response
print("Deploy request sent successfully.")
@ -170,7 +176,7 @@ def test_status(ray_start_stop):
wait_for_condition(lambda: num_live_deployments() == 5, timeout=15)
status_response = subprocess.check_output(
["serve", "status", "-a", "http://localhost:8265/"]
["serve", "status", "-a", "http://localhost:52365/"]
)
serve_status = yaml.safe_load(status_response)
@ -420,6 +426,7 @@ def test_build(ray_start_stop, node):
@pytest.mark.parametrize("use_command", [True, False])
def test_idempotence_after_controller_death(ray_start_stop, use_command: bool):
"""Check that CLI is idempotent even if controller dies."""
ray.shutdown()
config_file_name = os.path.join(
os.path.dirname(__file__), "test_config_files", "basic_graph.yaml"