From 92010ca5b584343a5403e42b2584775814605967 Mon Sep 17 00:00:00 2001 From: Robert Nishihara Date: Thu, 22 Dec 2016 21:54:19 -0800 Subject: [PATCH] Check that we can connect to Redis and that there aren't existing redis clients on the same node in start_ray.py (#148) --- lib/python/ray/services.py | 48 ++++++++++++++++++++++++++------------ lib/python/ray/worker.py | 2 +- scripts/start_ray.py | 25 ++++++++++++++++++++ 3 files changed, 59 insertions(+), 16 deletions(-) diff --git a/lib/python/ray/services.py b/lib/python/ray/services.py index 3733bdb59..fc9b9c034 100644 --- a/lib/python/ray/services.py +++ b/lib/python/ray/services.py @@ -72,6 +72,38 @@ def cleanup(): def all_processes_alive(): return all([p.poll() is None for p in all_processes]) +def wait_for_redis_to_start(redis_host, redis_port, num_retries=5): + """Wait for a Redis server to be available. + + This is accomplished by creating a Redis client and sending a random command + to the server until the command gets through. + + Args: + redis_host (str): The IP address of the redis server. + redis_port (int): The port of the redis server. + num_retries (int): The number of times to try connecting with redis. The + client will sleep for one second between attempts. + + Raises: + Exception: An exception is raised if we could not connect with Redis. + """ + redis_client = redis.StrictRedis(host=redis_host, port=redis_port) + # Wait for the Redis server to start. + counter = 0 + while counter < num_retries: + try: + # Run some random command and see if it worked. + redis_client.client_list() + except redis.ConnectionError as e: + # Wait a little bit. + time.sleep(1) + print("Failed to connect to the redis server, retrying.") + counter += 1 + else: + break + if counter == num_retries: + raise Exception("Unable to connect to Redis. If the Redis instance is on a different machine, check that your firewall is configured properly.") + def start_redis(num_retries=20, cleanup=True, redirect_output=False): """Start a Redis server. @@ -115,22 +147,8 @@ def start_redis(num_retries=20, cleanup=True, redirect_output=False): # Create a Redis client just for configuring Redis. redis_client = redis.StrictRedis(host="127.0.0.1", port=port) - # Wait for the Redis server to start. - counter = 0 - while counter < num_retries: - try: - # Run some random command and see if it worked. - redis_client.client_list() - except redis.ConnectionError as e: - # Wait a little bit. - time.sleep(1) - counter += 1 - else: - break - if counter == num_retries: - raise Exception("The Redis server did not start properly.") - + wait_for_redis_to_start("127.0.0.1", port) # Configure Redis to generate keyspace notifications. TODO(rkn): Change this # to only generate notifications for the export keys. redis_client.config_set("notify-keyspace-events", "Kl") diff --git a/lib/python/ray/worker.py b/lib/python/ray/worker.py index df6fbfdde..bd880ecd1 100644 --- a/lib/python/ray/worker.py +++ b/lib/python/ray/worker.py @@ -658,7 +658,7 @@ def get_address_info_from_redis_helper(redis_address, node_ip_address): "local_scheduler_socket_name": local_schedulers[0][b"local_scheduler_socket_name"].decode("ascii")} return client_info -def get_address_info_from_redis(redis_address, node_ip_address, num_retries=10): +def get_address_info_from_redis(redis_address, node_ip_address, num_retries=5): counter = 0 while True: try: diff --git a/scripts/start_ray.py b/scripts/start_ray.py index 667fcd531..b2f119b19 100644 --- a/scripts/start_ray.py +++ b/scripts/start_ray.py @@ -3,6 +3,7 @@ from __future__ import division from __future__ import print_function import argparse +import redis import ray.services as services @@ -12,6 +13,22 @@ parser.add_argument("--redis-address", required=False, type=str, help="the addre parser.add_argument("--num-workers", default=10, required=False, type=int, help="the number of workers to start on this node") parser.add_argument("--head", action="store_true", help="provide this argument for the head node") +def check_no_existing_redis_clients(node_ip_address, redis_address): + redis_host, redis_port = redis_address.split(":") + redis_client = redis.StrictRedis(host=redis_host, port=int(redis_port)) + # The client table prefix must be kept in sync with the file + # "src/common/redis_module/ray_redis_module.c" where it is defined. + REDIS_CLIENT_TABLE_PREFIX = "CL:" + client_keys = redis_client.keys("{}*".format(REDIS_CLIENT_TABLE_PREFIX)) + # Filter to clients on the same node and do some basic checking. + for key in client_keys: + info = redis_client.hgetall(key) + assert b"ray_client_id" in info + assert b"node_ip_address" in info + assert b"client_type" in info + if info[b"node_ip_address"].decode("ascii") == node_ip_address: + raise Exception("This Redis instance is already connected to clients with this IP address.") + if __name__ == "__main__": args = parser.parse_args() @@ -32,6 +49,14 @@ if __name__ == "__main__": # Start Ray on a non-head node. if args.redis_address is None: raise Exception("If --head is not passed in, --redis-address must be provided.") + redis_host, redis_port = args.redis_address.split(":") + # Wait for the Redis server to be started. And throw an exception if we + # can't connect to it. + services.wait_for_redis_to_start(redis_host, int(redis_port)) + # Check that there aren't already Redis clients with the same IP address + # connected with this Redis instance. This raises an exception if the Redis + # server already has clients on this node. + check_no_existing_redis_clients(args.node_ip_address, args.redis_address) address_info = services.start_ray_node(node_ip_address=args.node_ip_address, redis_address=args.redis_address, num_workers=args.num_workers,