ray/dashboard/modules/snapshot/tests/test_snapshot.py
Nikita Vemuri 8fc3409676
[dashboard] Add component_activities API (#25996)
Add /api/component_activities to the dashboard snapshot router. The endpoint returns whether various Ray components are considered active.
The response currently contains an entry only for drivers; entries for other components will be added on request in follow-ups.
2022-06-30 13:39:01 -07:00

257 lines
8.7 KiB
Python

import os
import sys
import json
import jsonschema
import hashlib
import time
import pprint
import pytest
import requests
import ray
from ray import serve
from ray.serve.constants import SERVE_NAMESPACE
from ray._private.test_utils import (
format_web_url,
run_string_as_driver,
run_string_as_driver_nonblocking,
)
from ray.dashboard import dashboard
from ray.dashboard.modules.snapshot.snapshot_head import RayActivityResponse
from ray.dashboard.tests.conftest import * # noqa
def test_inactive_component_activities(call_ray_start):
    """Check that the driver component is reported inactive with no drivers.

    Requests ``/api/component_activities`` from the dashboard at the
    hard-coded address ``127.0.0.1:8265``, validates the payload against
    ``component_activities_schema.json`` and asserts that the ``driver``
    entry is inactive and carries no reason.

    Args:
        call_ray_start: Fixture defined outside this file; presumably it
            starts a cluster whose dashboard listens on the default port
            (TODO confirm). Its value is not used directly.
    """
    # Verify no activity in response if no active drivers
    response = requests.get("http://127.0.0.1:8265/api/component_activities")
    response.raise_for_status()

    # Validate schema of response
    data = response.json()
    schema_path = os.path.join(
        os.path.dirname(dashboard.__file__),
        "modules/snapshot/component_activities_schema.json",
    )
    pprint.pprint(data)
    # Load the schema through a context manager so the file handle is closed
    # deterministically instead of being leaked until garbage collection.
    with open(schema_path) as schema_file:
        schema = json.load(schema_file)
    jsonschema.validate(instance=data, schema=schema)

    # Validate ray_activity_response field can be cast to RayActivityResponse object
    driver_ray_activity_response = RayActivityResponse(**data["driver"])
    assert not driver_ray_activity_response.is_active
    assert driver_ray_activity_response.reason is None
def test_active_component_activities(ray_start_with_dashboard):
    """Check that connected drivers are counted as active.

    Launches three non-blocking driver scripts (two in ``my_namespace`` and
    one in a namespace prefixed with ``_ray_internal_job_info_``), then
    queries ``/api/component_activities``, validates the payload against
    ``component_activities_schema.json`` and asserts the reported number of
    active drivers.

    Args:
        ray_start_with_dashboard: Fixture value providing the dashboard
            address under the ``"webui_url"`` key.
    """
    # Verify drivers which don't have namespace starting with _ray_internal_job_info_
    # are considered active.
    driver_template = """
import ray
ray.init(address="auto", namespace="{namespace}")
"""
    run_string_as_driver_nonblocking(driver_template.format(namespace="my_namespace"))
    run_string_as_driver_nonblocking(driver_template.format(namespace="my_namespace"))
    run_string_as_driver_nonblocking(
        driver_template.format(namespace="_ray_internal_job_info_id1")
    )

    # Wait 1 sec for drivers to start
    time.sleep(1)

    # Verify drivers are considered active after running script
    webui_url = ray_start_with_dashboard["webui_url"]
    webui_url = format_web_url(webui_url)
    response = requests.get(f"{webui_url}/api/component_activities")
    response.raise_for_status()

    # Validate schema of response
    data = response.json()
    schema_path = os.path.join(
        os.path.dirname(dashboard.__file__),
        "modules/snapshot/component_activities_schema.json",
    )
    pprint.pprint(data)
    # Load the schema through a context manager so the file handle is closed
    # deterministically instead of being leaked until garbage collection.
    with open(schema_path) as schema_file:
        schema = json.load(schema_file)
    jsonschema.validate(instance=data, schema=schema)

    # Validate ray_activity_response field can be cast to RayActivityResponse object
    driver_ray_activity_response = RayActivityResponse(**data["driver"])
    assert driver_ray_activity_response.is_active
    # Drivers with namespace starting with "_ray_internal_job_info_" are not
    # considered active drivers. Three active drivers are the two
    # run with namespace "my_namespace" and the one started
    # from ray_start_with_dashboard
    assert driver_ray_activity_response.reason == "Number of active drivers: 3"
def test_snapshot(ray_start_with_dashboard):
    """Check the ``/api/snapshot`` payload after running three actor drivers.

    Runs three blocking driver scripts that each create one ``Pinger`` actor
    (detached and named, non-detached and named, non-detached and unnamed),
    then fetches the snapshot, validates it against ``snapshot_schema.json``
    and asserts on the actor, job and deployment entries plus the reported
    Ray commit and version.

    Args:
        ray_start_with_dashboard: Fixture value providing the cluster
            ``"address"`` and the dashboard ``"webui_url"``.
    """
    # The script body must keep valid Python indentation: it is executed
    # verbatim as a separate driver process.
    driver_template = """
import ray
ray.init(address="{address}", namespace="my_namespace")
@ray.remote
class Pinger:
    def ping(self):
        return "pong"
a = Pinger.options(lifetime={lifetime}, name={name}).remote()
ray.get(a.ping.remote())
"""
    address = ray_start_with_dashboard["address"]
    detached_driver = driver_template.format(
        address=address, lifetime="'detached'", name="'abc'"
    )
    named_driver = driver_template.format(
        address=address, lifetime="None", name="'xyz'"
    )
    unnamed_driver = driver_template.format(
        address=address, lifetime="None", name="None"
    )

    run_string_as_driver(detached_driver)
    run_string_as_driver(named_driver)
    run_string_as_driver(unnamed_driver)

    webui_url = ray_start_with_dashboard["webui_url"]
    webui_url = format_web_url(webui_url)
    response = requests.get(f"{webui_url}/api/snapshot")
    response.raise_for_status()
    data = response.json()
    schema_path = os.path.join(
        os.path.dirname(dashboard.__file__), "modules/snapshot/snapshot_schema.json"
    )
    pprint.pprint(data)
    # Load the schema through a context manager so the file handle is closed
    # deterministically instead of being leaked until garbage collection.
    with open(schema_path) as schema_file:
        schema = json.load(schema_file)
    jsonschema.validate(instance=data, schema=schema)

    snapshot = data["data"]["snapshot"]
    assert len(snapshot["actors"]) == 3
    assert len(snapshot["jobs"]) == 4
    assert len(snapshot["deployments"]) == 0

    # Only the actor entries are inspected; the actor ids are not needed.
    for entry in snapshot["actors"].values():
        assert entry["jobId"] in snapshot["jobs"]
        assert entry["actorClass"] == "Pinger"
        assert entry["startTime"] >= 0
        if entry["isDetached"]:
            # A detached actor is expected to still be running (no end time).
            assert entry["endTime"] == 0, entry
        else:
            assert entry["endTime"] > 0, entry
        assert "runtimeEnv" in entry

    assert snapshot["rayCommit"] == ray.__commit__
    assert snapshot["rayVersion"] == ray.__version__
@pytest.mark.parametrize("ray_start_with_dashboard", [{"num_cpus": 4}], indirect=True)
def test_serve_snapshot(ray_start_with_dashboard):
    """Test reconnecting to detached Serve application.

    A separate driver starts a detached Serve instance, deploys ``my_func``
    and deploys-then-deletes ``my_func_deleted``. This process then connects
    with ``detached=False`` and deploys ``my_func_nondetached``. Finally the
    ``/api/snapshot`` payload is validated against ``snapshot_schema.json``
    and the three deployment entries (running, deleted, non-detached) are
    checked field by field.

    Args:
        ray_start_with_dashboard: Fixture value (parametrized with
            ``num_cpus=4``) providing the cluster ``"address"`` and the
            dashboard ``"webui_url"``.
    """
    # The script body must keep valid Python indentation: it is executed
    # verbatim as a separate driver process.
    detached_serve_driver_script = f"""
import ray
from ray import serve
ray.init(
    address="{ray_start_with_dashboard['address']}",
    namespace="serve")
serve.start(detached=True)
@serve.deployment
def my_func(request):
    return "hello"
my_func.deploy()
@serve.deployment(version="v1")
def my_func_deleted(request):
    return "hello"
my_func_deleted.deploy()
my_func_deleted.delete()
"""

    run_string_as_driver(detached_serve_driver_script)
    assert requests.get("http://127.0.0.1:8000/my_func").text == "hello"

    # Connect to the running Serve application with detached=False.
    serve.start(detached=False)

    @serve.deployment(version="v1")
    def my_func_nondetached(request):
        return "hello"

    my_func_nondetached.deploy()

    assert requests.get("http://127.0.0.1:8000/my_func_nondetached").text == "hello"

    webui_url = ray_start_with_dashboard["webui_url"]
    webui_url = format_web_url(webui_url)
    response = requests.get(f"{webui_url}/api/snapshot")
    response.raise_for_status()
    data = response.json()
    schema_path = os.path.join(
        os.path.dirname(dashboard.__file__), "modules/snapshot/snapshot_schema.json"
    )
    pprint.pprint(data)
    # Load the schema through a context manager so the file handle is closed
    # deterministically instead of being leaked until garbage collection.
    with open(schema_path) as schema_file:
        schema = json.load(schema_file)
    jsonschema.validate(instance=data, schema=schema)

    snapshot = data["data"]["snapshot"]
    deployments = snapshot["deployments"]
    assert len(deployments) == 3

    # Deployment entries are keyed by the SHA-1 hex digest of their name.
    entry = deployments[hashlib.sha1("my_func".encode()).hexdigest()]
    assert entry["name"] == "my_func"
    assert entry["version"] is None
    assert entry["namespace"] == SERVE_NAMESPACE
    assert entry["httpRoute"] == "/my_func"
    assert entry["className"] == "my_func"
    assert entry["status"] == "RUNNING"
    assert entry["rayJobId"] is not None
    assert entry["startTime"] > 0
    assert entry["endTime"] == 0

    assert len(entry["actors"]) == 1
    actor_id = next(iter(entry["actors"]))
    metadata = snapshot["actors"][actor_id]["metadata"]["serve"]
    assert metadata["deploymentName"] == "my_func"
    assert metadata["version"] is None
    assert len(metadata["replicaTag"]) > 0

    entry_deleted = deployments[hashlib.sha1("my_func_deleted".encode()).hexdigest()]
    assert entry_deleted["name"] == "my_func_deleted"
    assert entry_deleted["version"] == "v1"
    assert entry_deleted["namespace"] == SERVE_NAMESPACE
    assert entry_deleted["httpRoute"] is None
    assert entry_deleted["className"] == "my_func_deleted"
    assert entry_deleted["status"] == "DELETED"
    # Check the deleted deployment's own job id; the previous assertion
    # re-checked ``entry`` (my_func), which looked like a copy-paste slip.
    assert entry_deleted["rayJobId"] is not None
    assert entry_deleted["startTime"] > 0
    assert entry_deleted["endTime"] > entry_deleted["startTime"]

    entry_nondetached = deployments[
        hashlib.sha1("my_func_nondetached".encode()).hexdigest()
    ]
    assert entry_nondetached["name"] == "my_func_nondetached"
    assert entry_nondetached["version"] == "v1"
    assert entry_nondetached["namespace"] == SERVE_NAMESPACE
    assert entry_nondetached["httpRoute"] == "/my_func_nondetached"
    assert entry_nondetached["className"] == "my_func_nondetached"
    assert entry_nondetached["status"] == "RUNNING"
    assert entry_nondetached["rayJobId"] is not None
    assert entry_nondetached["startTime"] > 0
    assert entry_nondetached["endTime"] == 0

    assert len(entry_nondetached["actors"]) == 1
    actor_id = next(iter(entry_nondetached["actors"]))
    metadata = snapshot["actors"][actor_id]["metadata"]["serve"]
    assert metadata["deploymentName"] == "my_func_nondetached"
    assert metadata["version"] == "v1"
    assert len(metadata["replicaTag"]) > 0

    # Remove the deployment created by this process.
    my_func_nondetached.delete()
if __name__ == "__main__":
    # Running this file directly invokes pytest on it in verbose mode and
    # uses pytest's return value as the process exit status.
    pytest_args = ["-v", __file__]
    exit_status = pytest.main(pytest_args)
    sys.exit(exit_status)