From 3d1b8c24fdcedd774c29678c9d4040fca792e83f Mon Sep 17 00:00:00 2001 From: SangBin Cho Date: Mon, 15 Jun 2020 18:38:56 -0700 Subject: [PATCH] [Dashboard] Dashboard pubsub hotfix. (#8944) --- python/ray/dashboard/node_stats.py | 9 ++- python/ray/dashboard/tests/test_node_stats.py | 61 +++++++++++++++++-- python/ray/gcs_utils.py | 3 + 3 files changed, 65 insertions(+), 8 deletions(-) diff --git a/python/ray/dashboard/node_stats.py b/python/ray/dashboard/node_stats.py index 0c037b83a..094b6a413 100644 --- a/python/ray/dashboard/node_stats.py +++ b/python/ray/dashboard/node_stats.py @@ -209,7 +209,7 @@ class NodeStats(threading.Thread): try: with self._node_stats_lock: channel = ray.utils.decode(x["channel"])\ - if "pattern" not in x\ + if "pattern" not in x or x["pattern"] is None\ else x["pattern"] data = x["data"] if channel == log_channel: @@ -250,9 +250,14 @@ class NodeStats(threading.Thread): "state": actor_data.state, "timestamp": actor_data.timestamp } - else: + elif channel == ray.gcs_utils.RAY_REPORTER_PUBSUB_PATTERN: data = json.loads(ray.utils.decode(data)) self._node_stats[data["hostname"]] = data + else: + logger.warning("Unexpected channel data received, " + "channel: {}, data: {}".format( + channel, + json.loads(ray.utils.decode(data)))) except Exception: logger.exception(traceback.format_exc()) diff --git a/python/ray/dashboard/tests/test_node_stats.py b/python/ray/dashboard/tests/test_node_stats.py index 80818299e..839d79d7c 100644 --- a/python/ray/dashboard/tests/test_node_stats.py +++ b/python/ray/dashboard/tests/test_node_stats.py @@ -1,14 +1,17 @@ -from ray.dashboard.node_stats import NodeStats -from ray.ray_constants import REDIS_DEFAULT_PASSWORD +import os + from datetime import datetime from time import sleep + +import ray import pytest +from ray.dashboard.node_stats import NodeStats +from ray.ray_constants import REDIS_DEFAULT_PASSWORD +from ray.test_utils import wait_for_condition -def test_basic(ray_start_with_dashboard): - """Dashboard test that starts a Ray cluster with a dashboard server running, - then hits the dashboard API and asserts that it receives sensible data.""" - redis_address = ray_start_with_dashboard["redis_address"] + +def start_node_stats(redis_address): redis_password = REDIS_DEFAULT_PASSWORD node_stats = NodeStats(redis_address, redis_password) node_stats.start() @@ -27,6 +30,52 @@ def test_basic(ray_start_with_dashboard): break except Exception: continue + return node_stats + + +def test_basic(ray_start_with_dashboard): + """Dashboard test that starts a Ray cluster with a dashboard server running, + then hits the dashboard API and asserts that it receives sensible data.""" + node_stats = start_node_stats(ray_start_with_dashboard["redis_address"]) + stats = node_stats.get_node_stats() + client_stats = stats and stats.get("clients") + assert len(client_stats) == 1 client = client_stats[0] assert len(client["workers"]) == 1 + + +def test_log_and_error_messages(ray_start_with_dashboard): + node_stats = start_node_stats(ray_start_with_dashboard["redis_address"]) + + LOG_MESSAGE = "LOG_MESSAGE" + LOG_TIMES = 3 + + @ray.remote + def generate_log(): + for _ in range(LOG_TIMES): + print(LOG_MESSAGE) + return os.getpid() + + pid = str(ray.get(generate_log.remote())) + stats = node_stats.get_node_stats() + client_stats = stats and stats.get("clients") + assert len(client_stats) == 1, "Client stats is not available." + hostname = client_stats[0]["hostname"] + + wait_for_condition( + lambda: len(node_stats.get_logs(hostname, pid)[pid]) == 3) + + @ray.remote + class Actor: + def get_pid(self): + return os.getpid() + + def generate_error(self): + raise Exception(LOG_MESSAGE) + + actor = Actor.remote() + pid = str(ray.get(actor.get_pid.remote())) + actor.generate_error.remote() + wait_for_condition( + lambda: len(node_stats.get_errors(hostname, pid)[pid]) == 1) diff --git a/python/ray/gcs_utils.py b/python/ray/gcs_utils.py index ab236e367..43051dfcc 100644 --- a/python/ray/gcs_utils.py +++ b/python/ray/gcs_utils.py @@ -55,6 +55,9 @@ XRAY_JOB_PATTERN = "JOB:*".encode("ascii") # Actor pub/sub updates RAY_ACTOR_PUBSUB_PATTERN = "ACTOR:*".encode("ascii") +# Reporter pub/sub updates +RAY_REPORTER_PUBSUB_PATTERN = "RAY_REPORTER.*".encode("ascii") + # These prefixes must be kept up-to-date with the TablePrefix enum in # gcs.proto. # TODO(rkn): We should use scoped enums, in which case we should be able to