[Dashboard] Dashboard pubsub hotfix. (#8944)

This commit is contained in:
SangBin Cho 2020-06-15 18:38:56 -07:00 committed by GitHub
parent 4afa2b304a
commit 3d1b8c24fd
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 65 additions and 8 deletions

View file

@ -209,7 +209,7 @@ class NodeStats(threading.Thread):
try:
with self._node_stats_lock:
channel = ray.utils.decode(x["channel"])\
if "pattern" not in x\
if "pattern" not in x or x["pattern"] is None\
else x["pattern"]
data = x["data"]
if channel == log_channel:
@ -250,9 +250,14 @@ class NodeStats(threading.Thread):
"state": actor_data.state,
"timestamp": actor_data.timestamp
}
else:
elif channel == ray.gcs_utils.RAY_REPORTER_PUBSUB_PATTERN:
data = json.loads(ray.utils.decode(data))
self._node_stats[data["hostname"]] = data
else:
logger.warning("Unexpected channel data received, "
"channel: {}, data: {}".format(
channel,
json.loads(ray.utils.decode(data))))
except Exception:
logger.exception(traceback.format_exc())

View file

@ -1,14 +1,17 @@
from ray.dashboard.node_stats import NodeStats
from ray.ray_constants import REDIS_DEFAULT_PASSWORD
import os
from datetime import datetime
from time import sleep
import ray
import pytest
from ray.dashboard.node_stats import NodeStats
from ray.ray_constants import REDIS_DEFAULT_PASSWORD
from ray.test_utils import wait_for_condition
def test_basic(ray_start_with_dashboard):
"""Dashboard test that starts a Ray cluster with a dashboard server running,
then hits the dashboard API and asserts that it receives sensible data."""
redis_address = ray_start_with_dashboard["redis_address"]
def start_node_stats(redis_address):
redis_password = REDIS_DEFAULT_PASSWORD
node_stats = NodeStats(redis_address, redis_password)
node_stats.start()
@ -27,6 +30,52 @@ def test_basic(ray_start_with_dashboard):
break
except Exception:
continue
return node_stats
def test_basic(ray_start_with_dashboard):
"""Dashboard test that starts a Ray cluster with a dashboard server running,
then hits the dashboard API and asserts that it receives sensible data."""
node_stats = start_node_stats(ray_start_with_dashboard["redis_address"])
stats = node_stats.get_node_stats()
client_stats = stats and stats.get("clients")
assert len(client_stats) == 1
client = client_stats[0]
assert len(client["workers"]) == 1
def test_log_and_error_messages(ray_start_with_dashboard):
node_stats = start_node_stats(ray_start_with_dashboard["redis_address"])
LOG_MESSAGE = "LOG_MESSAGE"
LOG_TIMES = 3
@ray.remote
def generate_log():
for _ in range(LOG_TIMES):
print(LOG_MESSAGE)
return os.getpid()
pid = str(ray.get(generate_log.remote()))
stats = node_stats.get_node_stats()
client_stats = stats and stats.get("clients")
assert len(client_stats) == 1, "Client stats is not available."
hostname = client_stats[0]["hostname"]
wait_for_condition(
lambda: len(node_stats.get_logs(hostname, pid)[pid]) == 3)
@ray.remote
class Actor:
def get_pid(self):
return os.getpid()
def generate_error(self):
raise Exception(LOG_MESSAGE)
actor = Actor.remote()
pid = str(ray.get(actor.get_pid.remote()))
actor.generate_error.remote()
wait_for_condition(
lambda: len(node_stats.get_errors(hostname, pid)[pid]) == 1)

View file

@ -55,6 +55,9 @@ XRAY_JOB_PATTERN = "JOB:*".encode("ascii")
# Actor pub/sub updates
RAY_ACTOR_PUBSUB_PATTERN = "ACTOR:*".encode("ascii")
# Reporter pub/sub updates
RAY_REPORTER_PUBSUB_PATTERN = "RAY_REPORTER.*".encode("ascii")
# These prefixes must be kept up-to-date with the TablePrefix enum in
# gcs.proto.
# TODO(rkn): We should use scoped enums, in which case we should be able to