diff --git a/python/ray/worker.py b/python/ray/worker.py index 3d84b03ba..223b351a5 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -1476,12 +1476,12 @@ def print_error_messages(worker): worker.error_message_pubsub_client = worker.redis_client.pubsub() # Exports that are published after the call to - # error_message_pubsub_client.psubscribe and before the call to + # error_message_pubsub_client.subscribe and before the call to # error_message_pubsub_client.listen will still be processed in the loop. - worker.error_message_pubsub_client.psubscribe("__keyspace@0__:ErrorKeys") + worker.error_message_pubsub_client.subscribe("__keyspace@0__:ErrorKeys") num_errors_received = 0 - # Get the exports that occurred before the call to psubscribe. + # Get the exports that occurred before the call to subscribe. with worker.lock: error_keys = worker.redis_client.lrange("ErrorKeys", 0, -1) for error_key in error_keys: @@ -1589,13 +1589,13 @@ def fetch_and_execute_function_to_run(key, worker=global_worker): def import_thread(worker, mode): worker.import_pubsub_client = worker.redis_client.pubsub() # Exports that are published after the call to - # import_pubsub_client.psubscribe and before the call to + # import_pubsub_client.subscribe and before the call to # import_pubsub_client.listen will still be processed in the loop. - worker.import_pubsub_client.psubscribe("__keyspace@0__:Exports") + worker.import_pubsub_client.subscribe("__keyspace@0__:Exports") # Keep track of the number of imports that we've imported. num_imported = 0 - # Get the exports that occurred before the call to psubscribe. + # Get the exports that occurred before the call to subscribe. with worker.lock: export_keys = worker.redis_client.lrange("Exports", 0, -1) for key in export_keys: @@ -1627,7 +1627,7 @@ def import_thread(worker, mode): try: for msg in worker.import_pubsub_client.listen(): with worker.lock: - if msg["type"] == "psubscribe": + if msg["type"] == "subscribe": continue assert msg["data"] == b"rpush" num_imports = worker.redis_client.llen("Exports")