make it possible to use directory as user source directory that doesn't contain worker.py (#297)

This commit is contained in:
Philipp Moritz 2016-07-26 18:39:06 -07:00 committed by Robert Nishihara
parent 2981fae26d
commit b5215f1e6a
3 changed files with 44 additions and 100 deletions

@@ -97,41 +97,31 @@ block until the installation has completed. The standard output from the nodes
will be redirected to your terminal.
5. To check that the installation succeeded, you can ssh to each node, cd into
the directory `ray/test/`, and run the tests (e.g., `python runtest.py`).
6. Create a directory (for example, `mkdir ~/example_ray_code`) containing the
worker `worker.py` code along with the code for any modules imported by
`worker.py`. For example,
```
cp ray/scripts/default_worker.py ~/example_ray_code/worker.py
cp ray/scripts/example_functions.py ~/example_ray_code/
```
7. Start the cluster (the scheduler, object stores, and workers) with the
command `cluster.start_ray("~/example_ray_code")`, where the second argument is
the local path to the worker code that you would like to use. This command will
copy the worker code to each node and will start the cluster. After completing
successfully, this command will print out a command that can be run on the head
node to attach a shell (the driver) to the cluster. For example,
6. Start the cluster (the scheduler, object stores, and workers) with the
command `cluster.start_ray("~/example_ray_code")`, where the argument is
the local path to the directory that contains your Python code. This command will
copy this source code to each node and will start the cluster. Each worker that
is started will have a local copy of the `~/example_ray_code` directory on its
PYTHONPATH. After completing successfully, this command will print out a command
that can be run on the head node to attach a shell (the driver) to the cluster.
For example,
```
cd "$RAY_HOME/../user_source_files/example_ray_code";
source "$RAY_HOME/setup-env.sh";
python "$RAY_HOME/scripts/shell.py" --scheduler-address=12.34.56.789:10001 --objstore-address=12.34.56.789:20001 --worker-address=12.34.56.789:30001 --attach
```
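As an illustration, a user source directory might contain nothing but ordinary
Python modules. The file name and contents below are hypothetical (they are not
part of this commit); the point is that no `worker.py` is required anymore.
```
# ~/example_ray_code/helpers.py -- a hypothetical module placed in the user
# source directory; note that no worker.py needs to be present anymore.

def greet(name):
  """Return a greeting string; any plain Python module works here."""
  return "Hello, {}!".format(name)
```
Because `cluster.start_ray` copies the whole directory to every node and the
workers put it on their PYTHONPATH, code run on the workers can simply
`import helpers`.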
8. Note that there are several more commands that can be run from within
7. Note that there are several more commands that can be run from within
`cluster.py`.
- `cluster.install_ray()` - This pulls the Ray source code on each node,
builds all of the third party libraries, and builds the project itself.
- `cluster.start_ray(worker_directory, num_workers_per_node=10)` - This
- `cluster.start_ray(user_source_directory, num_workers_per_node=10)` - This
starts a scheduler process on the head node, and it starts an object store
and some workers on each node.
- `cluster.stop_ray()` - This shuts down the cluster (killing all of the
processes).
- `cluster.restart_workers(worker_directory, num_workers_per_node=10)` -
This kills the current workers and starts new workers using the worker
code from the given directory. Currently, this can only be run when there
are no tasks executing on any of the workers.
- `cluster.update_ray()` - This pulls the latest Ray source code and builds
it.
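Putting the commands above together, a typical interactive session might look
like the following sketch. It assumes a `cluster` object has already been
constructed as described in the earlier setup steps; the constructor arguments
are not shown here.
```
# Hypothetical session driving the cluster (assumes `cluster` is an
# already-constructed RayCluster object from the setup steps above).
cluster.install_ray()                    # clone and build Ray on every node
cluster.start_ray("~/example_ray_code",  # copy user code and start the cluster
                  num_workers_per_node=10)
# ... attach a driver with the printed shell command and run jobs ...
cluster.stop_ray()                       # kill all Ray processes on the nodes
```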

@@ -144,7 +144,7 @@ def start_worker(worker_path, scheduler_address, objstore_address, worker_addres
if local:
all_processes.append((p, worker_address))
def start_node(scheduler_address, node_ip_address, num_workers, worker_path=None):
def start_node(scheduler_address, node_ip_address, num_workers, worker_path=None, user_source_directory=None):
"""Start an object store and associated workers in the cluster setting.
This starts an object store and the associated workers when Ray is being used
@@ -156,7 +156,9 @@ def start_node(scheduler_address, node_ip_address, num_workers, worker_path=None
node_ip_address (str): ip address (without port) of the node this function
is run on
num_workers (int): the number of workers to be started on this node
worker_path (str): path of the source code that will be run on the worker
worker_path (str): path of the Python worker script that will be run on the worker
user_source_directory (str): path to the user's code the workers will import
modules from
"""
objstore_address = address(node_ip_address, new_objstore_port())
start_objstore(scheduler_address, objstore_address, local=False)
@@ -164,7 +166,7 @@ def start_node(scheduler_address, node_ip_address, num_workers, worker_path=None
if worker_path is None:
worker_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "../../../scripts/default_worker.py")
for _ in range(num_workers):
start_worker(worker_path, scheduler_address, objstore_address, address(node_ip_address, new_worker_port()), local=False)
start_worker(worker_path, scheduler_address, objstore_address, address(node_ip_address, new_worker_port()), user_source_directory=user_source_directory, local=False)
time.sleep(0.5)
def start_workers(scheduler_address, objstore_address, num_workers, worker_path):
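For reference, the new keyword argument is used as in the sketch below, which
mirrors the command that `cluster.py` issues over ssh; the IP addresses and the
remote directory path are placeholders, not values from this commit.
```
# Sketch of starting an object store and workers on one node, pointing the
# workers at a user source directory instead of a custom worker.py.
import ray

ray.services.start_node("12.34.56.789:10001",  # scheduler address
                        "12.34.56.790",        # this node's IP address
                        10,                    # number of workers to start
                        user_source_directory="/home/ubuntu/user_source_files/example_ray_code")
```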

@@ -130,7 +130,7 @@ class RayCluster(object):
""".format(self.installation_directory, self.installation_directory)
self._run_command_over_ssh_on_all_nodes_in_parallel(install_ray_command)
def start_ray(self, worker_directory, num_workers_per_node=10):
def start_ray(self, user_source_directory, num_workers_per_node=10):
"""Start Ray on a cluster.
This method is used to start Ray on a cluster. It will ssh to the head node,
@@ -139,13 +139,13 @@ class RayCluster(object):
workers.
Args:
worker_directory (str): The path to the local directory containing the
worker source code. This directory must contain a file worker.py which
is the code run by the worker processes.
user_source_directory (str): The path to the local directory containing the
user's source code. Files and directories in this directory can be used
as modules in remote functions.
num_workers_per_node (int): The number of workers to start on each node.
"""
# First update the worker code on the nodes.
remote_worker_path = self._update_worker_code(worker_directory)
remote_user_source_directory = self._update_user_code(user_source_directory)
scripts_directory = os.path.join(self.installation_directory, "ray/scripts")
# Start the scheduler
@@ -164,8 +164,8 @@ class RayCluster(object):
start_workers_command = """
cd "{}";
source ../setup-env.sh;
python -c "import ray; ray.services.start_node(\\\"{}:10001\\\", \\\"{}\\\", {}, worker_path=\\\"{}\\\")" > start_workers.out 2> start_workers.err < /dev/null &
""".format(scripts_directory, self.node_private_ip_addresses[0], self.node_private_ip_addresses[i], num_workers_per_node, remote_worker_path)
python -c "import ray; ray.services.start_node(\\\"{}:10001\\\", \\\"{}\\\", {}, user_source_directory=\\\"{}\\\")" > start_workers.out 2> start_workers.err < /dev/null &
""".format(scripts_directory, self.node_private_ip_addresses[0], self.node_private_ip_addresses[i], num_workers_per_node, remote_user_source_directory)
start_workers_commands.append(start_workers_command)
self._run_command_over_ssh_on_all_nodes_in_parallel(start_workers_commands)
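The `\\\"` sequences in these command templates go through two rounds of
unescaping, as the comments in the original code note. The sketch below just
makes the intermediate forms explicit, using a shortened, hypothetical fragment
and a placeholder IP address.
```
# What appears in the cluster.py source (a shortened, hypothetical fragment):
fragment = "python -c \"import ray; print(\\\"{}\\\")\""
# Round 1: Python parses the string literal, so the text sent over ssh is
#   python -c "import ray; print(\"12.34.56.789\")"
# Round 2: the remote shell strips the backslash escapes inside the double
# quotes, so the remote Python interpreter finally executes
#   import ray; print("12.34.56.789")
print(fragment.format("12.34.56.789"))  # shows the round-1 form
```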
@@ -173,54 +173,10 @@ class RayCluster(object):
setup_env_path = os.path.join(self.installation_directory, "ray/setup-env.sh")
shell_script_path = os.path.join(self.installation_directory, "ray/scripts/shell.py")
print """
cd "{}";
source "{}";
python "{}" --scheduler-address={}:10001 --objstore-address={}:20001 --worker-address={}:30001 --attach
""".format(setup_env_path, shell_script_path, self.node_private_ip_addresses[0], self.node_private_ip_addresses[0], self.node_private_ip_addresses[0])
def restart_workers(self, worker_directory, num_workers_per_node=10):
"""Restart the workers on the cluster.
This method is used for restarting the workers in the cluster, for example,
to use new application code. This is done without shutting down the
scheduler or the object stores so that work is not thrown away. It also does
not shut down any drivers.
Args:
worker_directory (str): The path to the local directory containing the
worker source code. This directory must contain a file worker.py which
is the code run by the worker processes.
num_workers_per_node (int): The number of workers to start on each node.
"""
# First update the worker code on the nodes.
remote_worker_path = self._update_worker_code(worker_directory)
scripts_directory = os.path.join(self.installation_directory, "ray/scripts")
head_node_ip_address = self.node_ip_addresses[0]
head_node_private_ip_address = self.node_private_ip_addresses[0]
scheduler_address = "{}:10001".format(head_node_private_ip_address) # This needs to be the address of the currently running scheduler, which was presumably created in _start_ray.
objstore_address = "{}:20001".format(head_node_private_ip_address) # This needs to be the address of the currently running object store, which was presumably created in _start_ray.
shell_address = "{}:{}".format(head_node_private_ip_address, np.random.randint(30000, 40000)) # This address must be currently unused. In particular, it cannot be the address of any currently running shell.
# Kill the current workers by attaching a driver to the scheduler and calling ray.kill_workers()
# The triple backslashes are used for two rounds of escaping, something like \\\" -> \" -> "
kill_workers_command = """
cd "{}";
source ../setup-env.sh;
python -c "import ray; ray.connect(\\\"{}\\\", \\\"{}\\\", \\\"{}\\\", is_driver=True); ray.kill_workers()"
""".format(scripts_directory, scheduler_address, objstore_address, shell_address)
self._run_command_over_ssh(head_node_ip_address, kill_workers_command)
# Start new workers on each node
# The triple backslashes are used for two rounds of escaping, something like \\\" -> \" -> "
start_workers_commands = []
for i, node_ip_address in enumerate(self.node_ip_addresses):
start_workers_command = """
cd "{}";
source ../setup-env.sh;
python -c "import ray; ray.services.start_workers(\\\"{}:10001\\\", \\\"{}:20001\\\", {}, worker_path=\\\"{}\\\")" > start_workers.out 2> start_workers.err < /dev/null &
""".format(scripts_directory, self.node_private_ip_addresses[0], self.node_private_ip_addresses[i], num_workers_per_node, remote_worker_path)
start_workers_commands.append(start_workers_command)
self._run_command_over_ssh_on_all_nodes_in_parallel(start_workers_commands)
""".format(remote_user_source_directory, setup_env_path, shell_script_path, self.node_private_ip_addresses[0], self.node_private_ip_addresses[0], self.node_private_ip_addresses[0])
def stop_ray(self):
"""Kill all of the processes in the Ray cluster.
@@ -249,33 +205,30 @@ class RayCluster(object):
""".format(ray_directory)
self._run_command_over_ssh_on_all_nodes_in_parallel(update_cluster_command)
def _update_worker_code(self, worker_directory):
"""Update the worker code on each node in the cluster.
def _update_user_code(self, user_source_directory):
"""Update the user's source code on each node in the cluster.
This method is used to update the worker source code on each node in the
cluster. The local worker_directory will be copied under ray_worker_files in
the installation_directory. For example, if installation_directory is
"/d/e/f" and we call _update_worker_code("~/a/b/c"), then the contents of
"~/a/b/c" on the local machine will be copied to "/d/e/f/ray_worker_files/c"
on each node in the cluster.
This method is used to update the user's source code on each node in the
cluster. The local user_source_directory will be copied under user_source_files in
the Ray installation directory on the worker node. For example, if the Ray
installation directory is "/d/e/f" and we call _update_user_code("~/a/b/c"),
then the contents of "~/a/b/c" on the local machine will be copied to
"/d/e/f/user_source_files/c" on each node in the cluster.
Args:
worker_directory (str): The path on the local machine to the directory
that contains the worker code. This directory must contain a file
worker.py.
user_source_directory (str): The path on the local machine to the directory
that contains the user's source code.
Returns:
A string with the path to the user's source directory on the remote
nodes.
"""
worker_directory = os.path.expanduser(worker_directory)
if not os.path.isdir(worker_directory):
raise Exception("Directory {} does not exist.".format(worker_directory))
if not os.path.exists(os.path.join(worker_directory, "worker.py")):
raise Exception("Directory {} does not contain a file named worker.py.".format(worker_directory))
# If worker_directory is "/a/b/c", then local_directory_name is "c".
local_directory_name = os.path.split(os.path.realpath(worker_directory))[1]
remote_directory = os.path.join(self.installation_directory, "ray_worker_files", local_directory_name)
user_source_directory = os.path.expanduser(user_source_directory)
if not os.path.isdir(user_source_directory):
raise Exception("Directory {} does not exist.".format(user_source_directory))
# If user_source_directory is "/a/b/c", then local_directory_name is "c".
local_directory_name = os.path.split(os.path.realpath(user_source_directory))[1]
remote_directory = os.path.join(self.installation_directory, "user_source_files", local_directory_name)
# Remove and recreate the directory on the node.
recreate_directory_command = """
rm -r "{}";
@@ -286,13 +239,12 @@ class RayCluster(object):
def copy_function(node_ip_address):
copy_command = """
scp -r -i {} {}/* {}@{}:{}/
""".format(self.key_file, worker_directory, self.username, node_ip_address, remote_directory)
""".format(self.key_file, user_source_directory, self.username, node_ip_address, remote_directory)
subprocess.call([copy_command], shell=True)
inputs = [(node_ip_address,) for node_ip_address in node_ip_addresses]
self._run_parallel_functions(len(self.node_ip_addresses) * [copy_function], inputs)
# Return the path to worker.py on the remote nodes.
remote_worker_path = os.path.join(remote_directory, "worker.py")
return remote_worker_path
# Return the source directory path on the remote nodes
return remote_directory
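As a concrete illustration of the path mapping performed by `_update_user_code`,
here is a small sketch; the installation directory `/home/ubuntu` is an
assumption used only for the example.
```
import os

# Assumed values for the illustration only.
installation_directory = "/home/ubuntu"
user_source_directory = os.path.expanduser("~/example_ray_code")

# Same computation as in _update_user_code above.
local_directory_name = os.path.split(os.path.realpath(user_source_directory))[1]
remote_directory = os.path.join(installation_directory, "user_source_files",
                                local_directory_name)
# remote_directory is now "/home/ubuntu/user_source_files/example_ray_code",
# which is the directory that gets recreated on, and scp'd to, every node.
print(remote_directory)
```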
def _is_valid_ip(ip_address):
"""Check if ip_addess is a valid IPv4 address.