diff --git a/doc/using-ray-on-a-cluster.md b/doc/using-ray-on-a-cluster.md
index 77b764743..3edb9dd85 100644
--- a/doc/using-ray-on-a-cluster.md
+++ b/doc/using-ray-on-a-cluster.md
@@ -97,41 +97,31 @@ block until the installation has completed. The standard output from the nodes
 will be redirected to your terminal.
 5. To check that the installation succeeded, you can ssh to each node, cd into
 the directory `ray/test/`, and run the tests (e.g., `python runtest.py`).
-6. Create a directory (for example, `mkdir ~/example_ray_code`) containing the
-worker `worker.py` code along with the code for any modules imported by
-`worker.py`. For example,
-
-    ```
-    cp ray/scripts/default_worker.py ~/example_ray_code/worker.py
-    cp ray/scripts/example_functions.py ~/example_ray_code/
-    ```
-
-7. Start the cluster (the scheduler, object stores, and workers) with the
-command `cluster.start_ray("~/example_ray_code")`, where the second argument is
-the local path to the worker code that you would like to use. This command will
-copy the worker code to each node and will start the cluster. After completing
-successfully, this command will print out a command that can be run on the head
-node to attach a shell (the driver) to the cluster. For example,
+6. Start the cluster (the scheduler, object stores, and workers) with the
+command `cluster.start_ray("~/example_ray_code")`, where the argument is
+the local path to the directory that contains your Python code. This command will
+copy this source code to each node and will start the cluster. Each worker that
+is started will have a local copy of the ~/example_ray_code directory in their
+PYTHONPATH. After completing successfully, this command will print out a command
+that can be run on the head node to attach a shell (the driver) to the cluster.
+For example,
 
     ```
+    cd "$RAY_HOME/../user_source_files/example_ray_code";
     source "$RAY_HOME/setup-env.sh";
     python "$RAY_HOME/scripts/shell.py" --scheduler-address=12.34.56.789:10001 --objstore-address=12.34.56.789:20001 --worker-address=12.34.56.789:30001 --attach
     ```
 
-8. Note that there are several more commands that can be run from within
+7. Note that there are several more commands that can be run from within
 `cluster.py`.
 
   - `cluster.install_ray()` - This pulls the Ray source code on each node,
   builds all of the third party libraries, and builds the project itself.
-  - `cluster.start_ray(worker_directory, num_workers_per_node=10)` - This
+  - `cluster.start_ray(user_source_directory, num_workers_per_node=10)` - This
   starts a scheduler process on the head node, and it starts an object store
   and some workers on each node.
   - `cluster.stop_ray()` - This shuts down the cluster (killing all of the
   processes).
-  - `cluster.restart_workers(worker_directory, num_workers_per_node=10)` -
-  This kills the current workers and starts new workers using the worker
-  code from the given file. Currently, this can only run when there are no
-  tasks currently executing on any of the workers.
   - `cluster.update_ray()` - This pulls the latest Ray source code and builds
   it.
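The documentation change above describes the driver-side workflow. As a rough sketch of how the renamed API is meant to be used (assuming `cluster` is the `RayCluster` object created in the earlier, unchanged steps of `using-ray-on-a-cluster.md`, and that `~/example_ray_code` is just the placeholder directory name used throughout the doc):

```python
# Sketch only: run from a Python shell on your local machine, with `cluster`
# constructed as described earlier in the document (not shown in this diff).
cluster.install_ray()                       # clone and build Ray on every node
cluster.start_ray("~/example_ray_code",     # copy this directory to every node, put it on
                  num_workers_per_node=10)  # each worker's PYTHONPATH, and start the cluster
# ... run the printed command on the head node to attach a driver shell ...
cluster.stop_ray()                          # kill the scheduler, object stores, and workers
```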
diff --git a/lib/python/ray/services.py b/lib/python/ray/services.py
index 68708e87f..0c9c772bc 100644
--- a/lib/python/ray/services.py
+++ b/lib/python/ray/services.py
@@ -144,7 +144,7 @@ def start_worker(worker_path, scheduler_address, objstore_address, worker_addres
   if local:
     all_processes.append((p, worker_address))
 
-def start_node(scheduler_address, node_ip_address, num_workers, worker_path=None):
+def start_node(scheduler_address, node_ip_address, num_workers, worker_path=None, user_source_directory=None):
   """Start an object store and associated workers in the cluster setting.
 
   This starts an object store and the associated workers when Ray is being used
@@ -156,7 +156,9 @@ def start_node(scheduler_address, node_ip_address, num_workers, worker_path=None
     node_ip_address (str): ip address (without port) of the node this function
       is run on
     num_workers (int): the number of workers to be started on this node
-    worker_path (str): path of the source code that will be run on the worker
+    worker_path (str): path of the Python worker script that will be run on the worker
+    user_source_directory (str): path to the user's code the workers will import
+      modules from
   """
   objstore_address = address(node_ip_address, new_objstore_port())
   start_objstore(scheduler_address, objstore_address, local=False)
@@ -164,7 +166,7 @@ def start_node(scheduler_address, node_ip_address, num_workers, worker_path=None
   if worker_path is None:
     worker_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "../../../scripts/default_worker.py")
   for _ in range(num_workers):
-    start_worker(worker_path, scheduler_address, objstore_address, address(node_ip_address, new_worker_port()), local=False)
+    start_worker(worker_path, scheduler_address, objstore_address, address(node_ip_address, new_worker_port()), user_source_directory=user_source_directory, local=False)
   time.sleep(0.5)
 
 def start_workers(scheduler_address, objstore_address, num_workers, worker_path):
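The `start_node` change above is what `cluster.py` invokes over ssh on each machine (see the next file in this diff). Roughly, each node ends up running something like the sketch below; the addresses, worker count, and remote path are illustrative placeholders, not values taken from this change:

```python
# Sketch of the per-node call that start_ray assembles into a `python -c` command.
import ray

ray.services.start_node(
    "12.34.56.789:10001",  # scheduler address on the head node (placeholder)
    "12.34.56.790",        # private IP address of this node (placeholder)
    10,                    # number of workers to start on this node
    user_source_directory="/home/ubuntu/user_source_files/example_ray_code")
```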
diff --git a/scripts/cluster.py b/scripts/cluster.py
index 02d24310a..1e160da7a 100644
--- a/scripts/cluster.py
+++ b/scripts/cluster.py
@@ -130,7 +130,7 @@ class RayCluster(object):
     """.format(self.installation_directory, self.installation_directory)
     self._run_command_over_ssh_on_all_nodes_in_parallel(install_ray_command)
 
-  def start_ray(self, worker_directory, num_workers_per_node=10):
+  def start_ray(self, user_source_directory, num_workers_per_node=10):
     """Start Ray on a cluster.
 
     This method is used to start Ray on a cluster. It will ssh to the head node,
@@ -139,13 +139,13 @@ class RayCluster(object):
     workers.
 
     Args:
-      worker_directory (str): The path to the local directory containing the
-        worker source code. This directory must contain a file worker.py which
-        is the code run by the worker processes.
+      user_source_directory (str): The path to the local directory containing the
+        user's source code. Files and directories in this directory can be used
+        as modules in remote functions.
       num_workers_per_node (int): The number workers to start on each node.
     """
     # First update the worker code on the nodes.
-    remote_worker_path = self._update_worker_code(worker_directory)
+    remote_user_source_directory = self._update_user_code(user_source_directory)
     scripts_directory = os.path.join(self.installation_directory, "ray/scripts")
 
     # Start the scheduler
@@ -164,8 +164,8 @@ class RayCluster(object):
       start_workers_command = """
        cd "{}";
        source ../setup-env.sh;
-        python -c "import ray; ray.services.start_node(\\\"{}:10001\\\", \\\"{}\\\", {}, worker_path=\\\"{}\\\")" > start_workers.out 2> start_workers.err < /dev/null &
-      """.format(scripts_directory, self.node_private_ip_addresses[0], self.node_private_ip_addresses[i], num_workers_per_node, remote_worker_path)
+        python -c "import ray; ray.services.start_node(\\\"{}:10001\\\", \\\"{}\\\", {}, user_source_directory=\\\"{}\\\")" > start_workers.out 2> start_workers.err < /dev/null &
+      """.format(scripts_directory, self.node_private_ip_addresses[0], self.node_private_ip_addresses[i], num_workers_per_node, remote_user_source_directory)
       start_workers_commands.append(start_workers_command)
     self._run_command_over_ssh_on_all_nodes_in_parallel(start_workers_commands)
 
@@ -173,54 +173,10 @@ class RayCluster(object):
     setup_env_path = os.path.join(self.installation_directory, "ray/setup-env.sh")
     shell_script_path = os.path.join(self.installation_directory, "ray/scripts/shell.py")
     print """
+      cd "{}";
       source "{}";
       python "{}" --scheduler-address={}:10001 --objstore-address={}:20001 --worker-address={}:30001 --attach
-    """.format(setup_env_path, shell_script_path, self.node_private_ip_addresses[0], self.node_private_ip_addresses[0], self.node_private_ip_addresses[0])
-
-  def restart_workers(self, worker_directory, num_workers_per_node=10):
-    """Restart the workers on the cluster.
-
-    This method is used for restarting the workers in the cluster, for example,
-    to use new application code. This is done without shutting down the
-    scheduler or the object stores so that work is not thrown away. It also does
-    not shut down any drivers.
-
-    Args:
-      worker_directory (str): The path to the local directory containing the
-        worker source code. This directory must contain a file worker.py which
-        is the code run by the worker processes.
-      num_workers_per_node (int): The number workers to start on each node.
-    """
-    # First update the worker code on the nodes.
-    remote_worker_path = self._update_worker_code(worker_directory)
-
-    scripts_directory = os.path.join(self.installation_directory, "ray/scripts")
-    head_node_ip_address = self.node_ip_addresses[0]
-    head_node_private_ip_address = self.node_private_ip_addresses[0]
-    scheduler_address = "{}:10001".format(head_node_private_ip_address) # This needs to be the address of the currently running scheduler, which was presumably created in _start_ray.
-    objstore_address = "{}:20001".format(head_node_private_ip_address) # This needs to be the address of the currently running object store, which was presumably created in _start_ray.
-    shell_address = "{}:{}".format(head_node_private_ip_address, np.random.randint(30000, 40000)) # This address must be currently unused. In particular, it cannot be the address of any currently running shell.
-
-    # Kill the current workers by attaching a driver to the scheduler and calling ray.kill_workers()
-    # The triple backslashes are used for two rounds of escaping, something like \\\" -> \" -> "
-    kill_workers_command = """
-      cd "{}";
-      source ../setup-env.sh;
-      python -c "import ray; ray.connect(\\\"{}\\\", \\\"{}\\\", \\\"{}\\\", is_driver=True); ray.kill_workers()"
-    """.format(scripts_directory, scheduler_address, objstore_address, shell_address)
-    self._run_command_over_ssh(head_node_ip_address, kill_workers_command)
-
-    # Start new workers on each node
-    # The triple backslashes are used for two rounds of escaping, something like \\\" -> \" -> "
-    start_workers_commands = []
-    for i, node_ip_address in enumerate(self.node_ip_addresses):
-      start_workers_command = """
-        cd "{}";
-        source ../setup-env.sh;
-        python -c "import ray; ray.services.start_workers(\\\"{}:10001\\\", \\\"{}:20001\\\", {}, worker_path=\\\"{}\\\")" > start_workers.out 2> start_workers.err < /dev/null &
-      """.format(scripts_directory, self.node_private_ip_addresses[0], self.node_private_ip_addresses[i], num_workers_per_node, remote_worker_path)
-      start_workers_commands.append(start_workers_command)
-    self._run_command_over_ssh_on_all_nodes_in_parallel(start_workers_commands)
+    """.format(remote_user_source_directory, setup_env_path, shell_script_path, self.node_private_ip_addresses[0], self.node_private_ip_addresses[0], self.node_private_ip_addresses[0])
 
   def stop_ray(self):
     """Kill all of the processes in the Ray cluster.
@@ -249,33 +205,30 @@ class RayCluster(object):
     """.format(ray_directory)
     self._run_command_over_ssh_on_all_nodes_in_parallel(update_cluster_command)
 
-  def _update_worker_code(self, worker_directory):
-    """Update the worker code on each node in the cluster.
+  def _update_user_code(self, user_source_directory):
+    """Update the user's source code on each node in the cluster.
 
-    This method is used to update the worker source code on each node in the
-    cluster. The local worker_directory will be copied under ray_worker_files in
-    the installation_directory. For example, if installation_directory is
-    "/d/e/f" and we call _update_worker_code("~/a/b/c"), then the contents of
-    "~/a/b/c" on the local machine will be copied to "/d/e/f/ray_worker_files/c"
-    on each node in the cluster.
+    This method is used to update the user's source code on each node in the
+    cluster. The local user_source_directory will be copied under user_source_files in
+    the ray installation directory on the worker node. For example, if the ray
+    installation directory is "/d/e/f" and we call _update_user_code("~/a/b/c"),
+    then the contents of "~/a/b/c" on the local machine will be copied to
+    "/d/e/f/user_source_files/c" on each node in the cluster.
 
     Args:
-      worker_directory (str): The path on the local machine to the directory
-        that contains the worker code. This directory must contain a file
-        worker.py.
+      user_source_directory (str): The path on the local machine to the directory
+        that contains the user's source code.
 
     Returns:
       A string with the path to the source code of the worker on the remote nodes.
 
     """
-    worker_directory = os.path.expanduser(worker_directory)
-    if not os.path.isdir(worker_directory):
-      raise Exception("Directory {} does not exist.".format(worker_directory))
-    if not os.path.exists(os.path.join(worker_directory, "worker.py")):
-      raise Exception("Directory {} does not contain a file named worker.py.".format(worker_directory))
-    # If worker_directory is "/a/b/c", then local_directory_name is "c".
-    local_directory_name = os.path.split(os.path.realpath(worker_directory))[1]
-    remote_directory = os.path.join(self.installation_directory, "ray_worker_files", local_directory_name)
+    user_source_directory = os.path.expanduser(user_source_directory)
+    if not os.path.isdir(user_source_directory):
+      raise Exception("Directory {} does not exist.".format(user_source_directory))
+    # If user_source_directory is "/a/b/c", then local_directory_name is "c".
+    local_directory_name = os.path.split(os.path.realpath(user_source_directory))[1]
+    remote_directory = os.path.join(self.installation_directory, "user_source_files", local_directory_name)
     # Remove and recreate the directory on the node.
     recreate_directory_command = """
       rm -r "{}";
@@ -286,13 +239,12 @@ class RayCluster(object):
     def copy_function(node_ip_address):
       copy_command = """
         scp -r -i {} {}/* {}@{}:{}/
-      """.format(self.key_file, worker_directory, self.username, node_ip_address, remote_directory)
+      """.format(self.key_file, user_source_directory, self.username, node_ip_address, remote_directory)
       subprocess.call([copy_command], shell=True)
     inputs = [(node_ip_address,) for node_ip_address in node_ip_addresses]
     self._run_parallel_functions(len(self.node_ip_addresses) * [copy_function], inputs)
-    # Return the path to worker.py on the remote nodes.
-    remote_worker_path = os.path.join(remote_directory, "worker.py")
-    return remote_worker_path
+    # Return the source directory path on the remote nodes
+    return remote_directory
 
 def _is_valid_ip(ip_address):
   """Check if ip_addess is a valid IPv4 address.
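For reference, the path handling described in the rewritten `_update_user_code` docstring works out as in this small sketch, using the docstring's own example values ("/d/e/f" and "~/a/b/c"):

```python
# Sketch of how the remote destination directory is derived in _update_user_code.
import os

installation_directory = "/d/e/f"                      # Ray installation directory on each node
user_source_directory = os.path.expanduser("~/a/b/c")  # local directory with the user's code

# If user_source_directory ends in ".../a/b/c", then local_directory_name is "c".
local_directory_name = os.path.split(os.path.realpath(user_source_directory))[1]
remote_directory = os.path.join(installation_directory, "user_source_files",
                                local_directory_name)  # -> "/d/e/f/user_source_files/c"
```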