Change Python Redis client psubscribe -> subscribe. (#1261)

This commit is contained in:
Robert Nishihara 2017-11-26 23:29:37 -08:00 committed by Philipp Moritz
parent 2865128df0
commit f7c4f41df8

View file

@ -1476,12 +1476,12 @@ def print_error_messages(worker):
worker.error_message_pubsub_client = worker.redis_client.pubsub()
# Exports that are published after the call to
# error_message_pubsub_client.psubscribe and before the call to
# error_message_pubsub_client.subscribe and before the call to
# error_message_pubsub_client.listen will still be processed in the loop.
worker.error_message_pubsub_client.psubscribe("__keyspace@0__:ErrorKeys")
worker.error_message_pubsub_client.subscribe("__keyspace@0__:ErrorKeys")
num_errors_received = 0
# Get the exports that occurred before the call to psubscribe.
# Get the exports that occurred before the call to subscribe.
with worker.lock:
error_keys = worker.redis_client.lrange("ErrorKeys", 0, -1)
for error_key in error_keys:
@ -1589,13 +1589,13 @@ def fetch_and_execute_function_to_run(key, worker=global_worker):
def import_thread(worker, mode):
worker.import_pubsub_client = worker.redis_client.pubsub()
# Exports that are published after the call to
# import_pubsub_client.psubscribe and before the call to
# import_pubsub_client.subscribe and before the call to
# import_pubsub_client.listen will still be processed in the loop.
worker.import_pubsub_client.psubscribe("__keyspace@0__:Exports")
worker.import_pubsub_client.subscribe("__keyspace@0__:Exports")
# Keep track of the number of imports that we've imported.
num_imported = 0
# Get the exports that occurred before the call to psubscribe.
# Get the exports that occurred before the call to subscribe.
with worker.lock:
export_keys = worker.redis_client.lrange("Exports", 0, -1)
for key in export_keys:
@ -1627,7 +1627,7 @@ def import_thread(worker, mode):
try:
for msg in worker.import_pubsub_client.listen():
with worker.lock:
if msg["type"] == "psubscribe":
if msg["type"] == "subscribe":
continue
assert msg["data"] == b"rpush"
num_imports = worker.redis_client.llen("Exports")