diff --git a/python/ray/dashboard/node_stats.py b/python/ray/dashboard/node_stats.py index 4a88f57b8..0c037b83a 100644 --- a/python/ray/dashboard/node_stats.py +++ b/python/ray/dashboard/node_stats.py @@ -186,8 +186,8 @@ class NodeStats(threading.Thread): p.subscribe(error_channel) logger.info("NodeStats: subscribed to {}".format(error_channel)) - actor_channel = ray.gcs_utils.TablePubsub.Value("ACTOR_PUBSUB") - p.subscribe(actor_channel) + actor_channel = ray.gcs_utils.RAY_ACTOR_PUBSUB_PATTERN + p.psubscribe(actor_channel) logger.info("NodeStats: subscribed to {}".format(actor_channel)) current_actor_table = ray.actors() @@ -208,7 +208,9 @@ class NodeStats(threading.Thread): for x in p.listen(): try: with self._node_stats_lock: - channel = ray.utils.decode(x["channel"]) + channel = ray.utils.decode(x["channel"])\ + if "pattern" not in x\ + else x["pattern"] data = x["data"] if channel == log_channel: data = json.loads(ray.utils.decode(data)) @@ -230,11 +232,11 @@ class NodeStats(threading.Thread): "timestamp": error_data.timestamp, "type": error_data.type }) - elif channel == str(actor_channel): - gcs_entry = ray.gcs_utils.PubSubMessage.FromString( + elif channel == actor_channel: + pubsub_msg = ray.gcs_utils.PubSubMessage.FromString( data) actor_data = ray.gcs_utils.ActorTableData.FromString( - gcs_entry.entries[0]) + pubsub_msg.data) addr = (actor_data.address.ip_address, str(actor_data.address.port)) owner_addr = (actor_data.owner_address.ip_address, diff --git a/python/ray/gcs_utils.py b/python/ray/gcs_utils.py index 41068ca5e..ab236e367 100644 --- a/python/ray/gcs_utils.py +++ b/python/ray/gcs_utils.py @@ -52,6 +52,9 @@ XRAY_HEARTBEAT_BATCH_PATTERN = "HEARTBEAT_BATCH:".encode("ascii") # xray job updates XRAY_JOB_PATTERN = "JOB:*".encode("ascii") +# Actor pub/sub updates +RAY_ACTOR_PUBSUB_PATTERN = "ACTOR:*".encode("ascii") + # These prefixes must be kept up-to-date with the TablePrefix enum in # gcs.proto. # TODO(rkn): We should use scoped enums, in which case we should be able to diff --git a/python/ray/tests/test_metrics.py b/python/ray/tests/test_metrics.py index 591ca2f44..b2dd41f43 100644 --- a/python/ray/tests/test_metrics.py +++ b/python/ray/tests/test_metrics.py @@ -405,11 +405,6 @@ def test_memory_dashboard(shutdown_only): """Wait until the new fresh memory table is ready.""" global prev_memory_table memory_table = get_memory_table() - from pprint import pprint - print("Current") - pprint(memory_table) - print("Prev") - pprint(prev_memory_table) is_ready = memory_table["group"] != prev_memory_table prev_memory_table = memory_table["group"] return is_ready