ray/dashboard/modules/serve/tests/test_serve_head.py

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

248 lines
8.3 KiB
Python
Raw Normal View History

import copy
import subprocess
import sys
from typing import Dict
import pytest
import requests
import ray
from ray import serve
from ray._private.test_utils import wait_for_condition
GET_OR_PUT_URL = "http://localhost:8265/api/serve/deployments/"
STATUS_URL = "http://localhost:8265/api/serve/deployments/status"
@pytest.fixture
def ray_start_stop():
subprocess.check_output(["ray", "start", "--head"])
yield
subprocess.check_output(["ray", "stop", "--force"])
def deploy_and_check_config(config: Dict):
put_response = requests.put(GET_OR_PUT_URL, json=config, timeout=30)
assert put_response.status_code == 200
print("PUT request sent successfully.")
# Config should be immediately retrievable
get_response = requests.get(GET_OR_PUT_URL, timeout=15)
assert get_response.status_code == 200
assert get_response.json() == config
print("GET request returned correct config.")
def test_put_get(ray_start_stop):
config1 = {
"import_path": (
"ray.serve.tests.test_config_files.test_dag.conditional_dag.serve_dag"
),
"runtime_env": {},
"deployments": [
{
"name": "Multiplier",
"user_config": {"factor": 1},
},
{
"name": "Adder",
"ray_actor_options": {
"runtime_env": {"env_vars": {"override_increment": "1"}}
},
},
],
}
# Use empty dictionary for Adder's ray_actor_options.
config2 = copy.deepcopy(config1)
config2["deployments"][1]["ray_actor_options"] = {}
config3 = {
"import_path": "ray.serve.tests.test_config_files.world.DagNode",
}
# Ensure the REST API is idempotent
num_iterations = 2
for iteration in range(1, num_iterations + 1):
print(f"*** Starting Iteration {iteration}/{num_iterations} ***\n")
print("Sending PUT request for config1.")
deploy_and_check_config(config1)
wait_for_condition(
lambda: requests.post("http://localhost:8000/", json=["ADD", 2]).json()
== "3 pizzas please!",
timeout=15,
)
wait_for_condition(
lambda: requests.post("http://localhost:8000/", json=["MUL", 2]).json()
== "-4 pizzas please!",
timeout=15,
)
print("Deployments are live and reachable over HTTP.\n")
print("Sending PUT request for config2.")
deploy_and_check_config(config2)
wait_for_condition(
lambda: requests.post("http://localhost:8000/", json=["ADD", 2]).json()
== "4 pizzas please!",
timeout=15,
)
print("Adder deployment updated correctly.\n")
print("Sending PUT request for config3.")
deploy_and_check_config(config3)
wait_for_condition(
lambda: requests.post("http://localhost:8000/").text == "wonderful world",
timeout=15,
)
print("Deployments are live and reachable over HTTP.\n")
def test_delete(ray_start_stop):
config = {
"import_path": "dir.subdir.a.add_and_sub.serve_dag",
"runtime_env": {
"working_dir": (
"https://github.com/ray-project/test_dag/archive/"
"76a741f6de31df78411b1f302071cde46f098418.zip"
)
},
"deployments": [
{
"name": "Subtract",
"ray_actor_options": {
"runtime_env": {
"py_modules": [
(
"https://github.com/ray-project/test_module/archive/"
"aa6f366f7daa78c98408c27d917a983caa9f888b.zip"
)
]
}
},
}
],
}
# Ensure the REST API is idempotent
num_iterations = 2
for iteration in range(1, num_iterations + 1):
print(f"*** Starting Iteration {iteration}/{num_iterations} ***\n")
print("Sending PUT request for config.")
deploy_and_check_config(config)
wait_for_condition(
lambda: requests.post("http://localhost:8000/", json=["ADD", 1]).json()
== 2,
timeout=15,
)
wait_for_condition(
lambda: requests.post("http://localhost:8000/", json=["SUB", 1]).json()
== -1,
timeout=15,
)
print("Deployments are live and reachable over HTTP.\n")
print("Sending DELETE request for config.")
delete_response = requests.delete(GET_OR_PUT_URL, timeout=15)
assert delete_response.status_code == 200
print("DELETE request sent successfully.")
# Make sure all deployments are deleted
wait_for_condition(
lambda: len(requests.get(GET_OR_PUT_URL, timeout=15).json()["deployments"])
== 0,
timeout=15,
)
print("GET request returned empty config successfully.")
with pytest.raises(requests.exceptions.HTTPError):
requests.post("http://localhost:8000/", json=["ADD", 1]).raise_for_status()
with pytest.raises(requests.exceptions.HTTPError):
requests.post("http://localhost:8000/", json=["SUB", 1]).raise_for_status()
print("Deployments have been deleted and are not reachable.\n")
def test_get_status(ray_start_stop):
print("Checking status info before any deployments.")
status_response = requests.get(STATUS_URL, timeout=30)
assert status_response.status_code == 200
print("Retrieved status info successfully.")
serve_status = status_response.json()
assert len(serve_status["deployment_statuses"]) == 0
assert serve_status["app_status"]["status"] == "RUNNING"
assert serve_status["app_status"]["deployment_timestamp"] == 0
assert serve_status["app_status"]["message"] == ""
print("Status info on fresh Serve application is correct.\n")
config = {
"import_path": "ray.serve.tests.test_config_files.world.DagNode",
}
print("Deploying config.")
deploy_and_check_config(config)
wait_for_condition(
lambda: requests.post("http://localhost:8000/").text == "wonderful world",
timeout=15,
)
print("Deployments are live and reachable over HTTP.\n")
status_response = requests.get(STATUS_URL, timeout=30)
assert status_response.status_code == 200
print("Retrieved status info successfully.")
serve_status = status_response.json()
deployment_statuses = serve_status["deployment_statuses"]
assert len(deployment_statuses) == 2
expected_deployment_names = {"f", "BasicDriver"}
for deployment_status in deployment_statuses:
assert deployment_status["name"] in expected_deployment_names
expected_deployment_names.remove(deployment_status["name"])
assert deployment_status["status"] in {"UPDATING", "HEALTHY"}
assert deployment_status["message"] == ""
assert len(expected_deployment_names) == 0
print("Deployments' statuses are correct.")
assert serve_status["app_status"]["status"] in {"DEPLOYING", "RUNNING"}
assert serve_status["app_status"]["deployment_timestamp"] > 0
assert serve_status["app_status"]["message"] == ""
print("Serve app status is correct.")
def test_serve_namespace(ray_start_stop):
"""
Check that the Dashboard's Serve can interact with the Python API
when they both start in the "serve" namespace.
"""
config = {
"import_path": "ray.serve.tests.test_config_files.world.DagNode",
}
print("Deploying config.")
deploy_and_check_config(config)
wait_for_condition(
lambda: requests.post("http://localhost:8000/").text == "wonderful world",
timeout=15,
)
print("Deployments are live and reachable over HTTP.\n")
ray.init(address="auto", namespace="serve")
client = serve.start()
print("Connected to Serve with Python API.")
serve_status = client.get_serve_status()
assert (
len(serve_status.deployment_statuses) == 2
and serve_status.get_deployment_status("f") is not None
)
print("Successfully retrieved deployment statuses with Python API.")
print("Shutting down Python API.")
serve.shutdown()
if __name__ == "__main__":
sys.exit(pytest.main(["-v", __file__]))