Determine node IP address programmatically. (#151)

* Determine node IP address programmatically.

* Factor out methods for getting node IP addresses.

* Address comments.
This commit is contained in:
Robert Nishihara 2016-12-23 15:31:40 -08:00 committed by Alexey Tumanov
parent 8d90c9f432
commit 241c955707
3 changed files with 36 additions and 7 deletions

View file

@ -7,6 +7,7 @@ import os
import random import random
import redis import redis
import signal import signal
import socket
import string import string
import subprocess import subprocess
import sys import sys
@ -72,6 +73,21 @@ def cleanup():
def all_processes_alive(): def all_processes_alive():
return all([p.poll() is None for p in all_processes]) return all([p.poll() is None for p in all_processes])
def get_node_ip_address(address="8.8.8.8:53"):
    """Determine the IP address of the local node.

    A UDP socket is "connected" to the given address and the local address
    that the socket was bound to for that connection is returned.

    Args:
        address (str): The IP address and port, in the form "host:port", of
            any known live service on the network you care about.

    Returns:
        The IP address of the current node, as a string.

    Raises:
        ValueError: If address is not of the form "host:port" or the port is
            not an integer.
    """
    host, port = address.split(":")
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        s.connect((host, int(port)))
        return s.getsockname()[0]
    finally:
        # Always release the file descriptor, even if connect() fails, so
        # that repeated calls do not leak sockets.
        s.close()
def wait_for_redis_to_start(redis_host, redis_port, num_retries=5): def wait_for_redis_to_start(redis_host, redis_port, num_retries=5):
"""Wait for a Redis server to be available. """Wait for a Redis server to be available.

View file

@ -674,7 +674,7 @@ def get_address_info_from_redis(redis_address, node_ip_address, num_retries=5):
time.sleep(1) time.sleep(1)
counter += 1 counter += 1
def init(node_ip_address="127.0.0.1", redis_address=None, start_ray_local=False, object_id_seed=None, num_workers=None, num_local_schedulers=None, driver_mode=SCRIPT_MODE): def init(node_ip_address=None, redis_address=None, start_ray_local=False, object_id_seed=None, num_workers=None, num_local_schedulers=None, driver_mode=SCRIPT_MODE):
"""Either connect to an existing Ray cluster or start one and connect to it. """Either connect to an existing Ray cluster or start one and connect to it.
This method handles two cases. Either a Ray cluster already exists and we This method handles two cases. Either a Ray cluster already exists and we
@ -735,12 +735,13 @@ def init(node_ip_address="127.0.0.1", redis_address=None, start_ray_local=False,
else: else:
if redis_address is None: if redis_address is None:
raise Exception("If start_ray_local=False, then redis_address must be provided.") raise Exception("If start_ray_local=False, then redis_address must be provided.")
if node_ip_address is None:
raise Exception("If start_ray_local=False, then node_ip_address must be provided.")
if num_workers is not None: if num_workers is not None:
raise Exception("If start_ray_local=False, then num_workers must not be provided.") raise Exception("If start_ray_local=False, then num_workers must not be provided.")
if num_local_schedulers is not None: if num_local_schedulers is not None:
raise Exception("If start_ray_local=False, then num_local_schedulers must not be provided.") raise Exception("If start_ray_local=False, then num_local_schedulers must not be provided.")
# Get the node IP address if one is not provided.
if node_ip_address is None:
node_ip_address = services.get_node_ip_address(redis_address)
# Get the address info of the processes to connect to from Redis. # Get the address info of the processes to connect to from Redis.
info = get_address_info_from_redis(redis_address, node_ip_address) info = get_address_info_from_redis(redis_address, node_ip_address)
# Connect this driver to Redis, the object store, and the local scheduler. The # Connect this driver to Redis, the object store, and the local scheduler. The

View file

@ -8,7 +8,7 @@ import redis
import ray.services as services import ray.services as services
parser = argparse.ArgumentParser(description="Parse addresses for the worker to connect to.") parser = argparse.ArgumentParser(description="Parse addresses for the worker to connect to.")
parser.add_argument("--node-ip-address", required=True, type=str, help="the ip address of the worker's node") parser.add_argument("--node-ip-address", required=False, type=str, help="the IP address of the worker's node")
parser.add_argument("--redis-address", required=False, type=str, help="the address to use for Redis") parser.add_argument("--redis-address", required=False, type=str, help="the address to use for Redis")
parser.add_argument("--num-workers", default=10, required=False, type=int, help="the number of workers to start on this node") parser.add_argument("--num-workers", default=10, required=False, type=int, help="the number of workers to start on this node")
parser.add_argument("--head", action="store_true", help="provide this argument for the head node") parser.add_argument("--head", action="store_true", help="provide this argument for the head node")
@ -41,7 +41,13 @@ if __name__ == "__main__":
# Start Ray on the head node. # Start Ray on the head node.
if args.redis_address is not None: if args.redis_address is not None:
raise Exception("If --head is passed in, a Redis server will be started, so a Redis address should not be provided.") raise Exception("If --head is passed in, a Redis server will be started, so a Redis address should not be provided.")
address_info = services.start_ray_local(node_ip_address=args.node_ip_address, # Get the node IP address if one is not provided.
if args.node_ip_address is None:
node_ip_address = services.get_node_ip_address()
else:
node_ip_address = args.node_ip_address
print("Using IP address {} for this node.".format(node_ip_address))
address_info = services.start_ray_local(node_ip_address=node_ip_address,
num_workers=args.num_workers, num_workers=args.num_workers,
cleanup=False, cleanup=False,
redirect_output=True) redirect_output=True)
@ -53,11 +59,17 @@ if __name__ == "__main__":
# Wait for the Redis server to be started. And throw an exception if we # Wait for the Redis server to be started. And throw an exception if we
# can't connect to it. # can't connect to it.
services.wait_for_redis_to_start(redis_host, int(redis_port)) services.wait_for_redis_to_start(redis_host, int(redis_port))
# Get the node IP address if one is not provided. When it is omitted, the
# address is looked up via services.get_node_ip_address using the Redis
# address.
if args.node_ip_address is None:
    node_ip_address = services.get_node_ip_address(args.redis_address)
else:
    # Use the address given on the command line. The name must match the one
    # used below, otherwise the print and later calls raise a NameError.
    node_ip_address = args.node_ip_address
print("Using IP address {} for this node.".format(node_ip_address))
# Check that there aren't already Redis clients with the same IP address # Check that there aren't already Redis clients with the same IP address
# connected with this Redis instance. This raises an exception if the Redis # connected with this Redis instance. This raises an exception if the Redis
# server already has clients on this node. # server already has clients on this node.
check_no_existing_redis_clients(args.node_ip_address, args.redis_address) check_no_existing_redis_clients(node_ip_address, args.redis_address)
address_info = services.start_ray_node(node_ip_address=args.node_ip_address, address_info = services.start_ray_node(node_ip_address=node_ip_address,
redis_address=args.redis_address, redis_address=args.redis_address,
num_workers=args.num_workers, num_workers=args.num_workers,
cleanup=False, cleanup=False,