diff --git a/doc/source/deploy-on-yarn.rst b/doc/source/deploy-on-yarn.rst
index 91255c1c6..6e009fe88 100644
--- a/doc/source/deploy-on-yarn.rst
+++ b/doc/source/deploy-on-yarn.rst
@@ -32,7 +32,7 @@ A Ray job is configured to run as two `Skein services`:
    application.
 2. The ``ray-worker`` service that starts worker nodes that join the Ray
    cluster. You can change the number of instances in this configuration or at runtime
-   using ``skein scale`` to scale the cluster up/down.
+   using ``skein container scale`` to scale the cluster up/down.
 
 The specification for each service consists of necessary files and commands that
 will be run to start the service.
@@ -51,10 +51,11 @@ The specification for each service consists of necessary files and commands that
     script:
       ...
   ray-worker:
-    # There should only be one instance of the head node per cluster.
-    instances: 1
+    # Number of ray worker nodes to start initially.
+    # This can be scaled using 'skein container scale'.
+    instances: 3
     resources:
-      # The resources for the head node.
+      # The resources for the worker node.
       vcores: 1
       memory: 2048
     files:
@@ -82,7 +83,7 @@ Use the ``files`` option to specify files that will be copied into the YARN cont
       example.py: example.py
       # # A packaged python environment using `conda-pack`. Note that Skein
       # # doesn't require any specific way of distributing files, but this
-      # # is a good one for python projects.
+      # # is a good one for python projects. This is optional.
       # # See https://jcrist.github.io/skein/distributing-files.html
       # environment: environment.tar.gz
 
@@ -98,19 +99,13 @@ Start by activating a pre-existing environment for dependency management.
 
 .. code-block:: bash
 
-    source /home/rayonyarn/miniconda3/bin/activate
+    source environment/bin/activate
 
-Obtain the Skein Application ID which is used when pushing addresses to worker services.
+Register the Ray head address needed by the workers in the Skein key-value store.
 
 .. code-block:: bash
 
-    APP_ID=$(python -c 'import skein;print(skein.properties.application_id)')
-
-Register the Ray head addresses needed by the workers in the Skein key-value store using the Application ID.
-
-.. code-block:: bash
-
-    skein kv put --key=RAY_HEAD_ADDRESS --value=$(hostname -i) $APP_ID
+    skein kv put current --key=RAY_HEAD_ADDRESS --value=$(hostname -i)
 
 Start all the processes needed on the ray head node. By default, we set object store memory and
 heap memory to roughly 200 MB. This is conservative and should be set according to application needs.
@@ -130,51 +125,14 @@ Clean up all started processes even if the application fails or is killed.
 .. code-block:: bash
 
     ray stop
-    skein application shutdown $APP_ID
+    skein application shutdown current
 
 Putting things together, we have:
 
-.. code-block:: bash
-
-  services:
-    ray-head:
-      # There should only be one instance of the head node per cluster.
-      instances: 1
-      resources:
-        # The resources for the head node.
-        vcores: 1
-        memory: 2048
-      files:
-        # ray/doc/yarn/example.py
-        example.py: example.py
-        # # A packaged python environment using `conda-pack`. Note that Skein
-        # # doesn't require any specific way of distributing files, but this
-        # # is a good one for python projects.
-        # # See https://jcrist.github.io/skein/distributing-files.html
-        # environment: environment.tar.gz
-      script: |
-        # Activate the packaged conda environment
-        #  - source environment/bin/activate
-        # This activates a pre-existing environment for dependency management.
-        source /home/rayonyarn/miniconda3/bin/activate
-        # This obtains the Skein Application ID which is used when pushing addresses to worker services.
-        APP_ID=$(python -c 'import skein;print(skein.properties.application_id)')
-
-        # This register the Ray head addresses needed by the workers with the Skein key-value store.
-        skein kv put --key=RAY_HEAD_ADDRESS --value=$(hostname -i) $APP_ID
-
-        # This command starts all the processes needed on the ray head node.
-        # By default, we set object store memory and heap memory to roughly 200 MB. This is conservative
-        # and should be set according to application needs.
-        #
-        ray start --head --redis-port=6379 --object-store-memory=200000000 --memory 200000000 --num-cpus=1
-
-        # This executes the user script.
-        python example.py
-
-        # After the user script has executed, all started processes should also die.
-        ray stop
-        skein application shutdown $APP_ID
+.. literalinclude:: ../yarn/ray-skein.yaml
+   :language: yaml
+   :start-after: # Head service
+   :end-before: # Worker service
 
 
 Worker node commands
@@ -184,8 +142,7 @@ Fetch the address of the head node from the Skein key-value store.
 
 .. code-block:: bash
 
-    APP_ID=$(python -c 'import skein;print(skein.properties.application_id)')
-    RAY_HEAD_ADDRESS=$(skein kv get --key=RAY_HEAD_ADDRESS "$APP_ID")
+    RAY_HEAD_ADDRESS=$(skein kv get current --key=RAY_HEAD_ADDRESS)
 
 Start all of the processes needed on a ray worker node, blocking until killed by Skein/YARN via SIGTERM. After receiving SIGTERM, all started processes should also die (ray stop).
 
@@ -195,50 +152,18 @@ Start all of the processes needed on a ray worker node, blocking until killed by
 
 Putting things together, we have:
 
-.. code-block:: bash
-
-  services:
-    ...
-    ray-worker:
-      # The number of instances to start initially. This can be scaled
-      # dynamically later.
-      instances: 4
-      resources:
-        # The resources for the worker node
-        vcores: 1
-        memory: 2048
-      # files:
-      #   environment: environment.tar.gz
-      depends:
-        # Don't start any worker nodes until the head node is started
-        - ray-head
-      script: |
-        # Activate the packaged conda environment
-        #  - source environment/bin/activate
-        source /home/rayonyarn/miniconda3/bin/activate
-
-        # This command gets any addresses it needs (e.g. the head node) from
-        # the skein key-value store.
-        APP_ID=$(python -c 'import skein;print(skein.properties.application_id)')
-        RAY_HEAD_ADDRESS=$(skein kv get --key=RAY_HEAD_ADDRESS "$APP_ID")
-
-        # The below command starts all the processes needed on a ray worker node, blocking until killed with sigterm.
-        # After sigterm, all started processes should also die (ray stop).
-        ray start --object-store-memory=200000000 --memory 200000000 --num-cpus=1 --address=$RAY_HEAD_ADDRESS:6379 --block; ray stop
-
 
 Running a Job
 -------------
 
 Within your Ray script, use the following to connect to the started Ray cluster:
 
-.. code-block:: python
-
-    if __name__ == "__main__":
-        DRIVER_MEMORY = 100 * 1024 * 1024  # 100MB here, but set this based on the application (subject to the YARN container limit).
-        ray.init(
-            address="localhost:6379", driver_object_store_memory=DRIVER_MEMORY)
-        main()
+.. literalinclude:: ../yarn/example.py
+   :language: python
+   :start-after: if __name__ == "__main__"
 
 You can use the following command to launch the application as specified by the Skein YAML file.
 
@@ -253,7 +178,7 @@ Once it has been submitted, you can see the job running on the YARN dashboard.
 
 Cleaning Up
 -----------
 
-To clean up a running job, use the following:
+To clean up a running job, use the following (using the application ID):
 
 .. code-block:: bash
diff --git a/doc/yarn/example.py b/doc/yarn/example.py
index 3e23a23e3..949e055dc 100644
--- a/doc/yarn/example.py
+++ b/doc/yarn/example.py
@@ -47,6 +47,9 @@ def main():
 
 
 if __name__ == "__main__":
+    # 100 MB driver object store memory; set this based on the application
+    # (subject to the YARN container limit).
+    DRIVER_MEMORY = 100 * 1024 * 1024
     ray.init(
-        address="localhost:6379", driver_object_store_memory=100 * 1024 * 1024)
+        address="localhost:6379", driver_object_store_memory=DRIVER_MEMORY)
     main()
diff --git a/doc/yarn/ray-skein.yaml b/doc/yarn/ray-skein.yaml
index ffbe62b41..e23ddffff 100644
--- a/doc/yarn/ray-skein.yaml
+++ b/doc/yarn/ray-skein.yaml
@@ -1,6 +1,7 @@
 name: ray
 
 services:
+  # Head service.
   ray-head:
     # There should only be one instance of the head node per cluster.
     instances: 1
@@ -13,19 +14,15 @@ services:
       example.py: example.py
       # # A packaged python environment using `conda-pack`. Note that Skein
      # # doesn't require any specific way of distributing files, but this
-      # # is a good one for python projects.
+      # # is a good one for python projects. This is optional.
       # # See https://jcrist.github.io/skein/distributing-files.html
       # environment: environment.tar.gz
     script: |
       # Activate the packaged conda environment
       #  - source environment/bin/activate
-      # This activates a pre-existing environment for dependency management.
-      source /home/rayonyarn/miniconda3/bin/activate
-      # This obtains the Skein Application ID which is used when pushing addresses to worker services.
-      APP_ID=$(python -c 'import skein;print(skein.properties.application_id)')
-      # This register the Ray head addresses needed by the workers with the Skein key-value store.
-      skein kv put --key=RAY_HEAD_ADDRESS --value=$(hostname -i) $APP_ID
+      # This stores the Ray head address in the Skein key-value store so that the workers can retrieve it later.
+      skein kv put current --key=RAY_HEAD_ADDRESS --value=$(hostname -i)
 
       # This command starts all the processes needed on the ray head node.
       # By default, we set object store memory and heap memory to roughly 200 MB. This is conservative
       # and should be set according to application needs.
@@ -38,8 +35,8 @@ services:
 
       # After the user script has executed, all started processes should also die.
       ray stop
-      skein application shutdown $APP_ID
-
+      skein application shutdown current
+  # Worker service.
   ray-worker:
     # The number of instances to start initially. This can be scaled
     # dynamically later.
@@ -56,12 +53,10 @@ services:
     script: |
       # Activate the packaged conda environment
       #  - source environment/bin/activate
-      source /home/rayonyarn/miniconda3/bin/activate
 
       # This command gets any addresses it needs (e.g. the head node) from
       # the skein key-value store.
-      APP_ID=$(python -c 'import skein;print(skein.properties.application_id)')
-      RAY_HEAD_ADDRESS=$(skein kv get --key=RAY_HEAD_ADDRESS "$APP_ID")
+      RAY_HEAD_ADDRESS=$(skein kv get current --key=RAY_HEAD_ADDRESS)
 
       # The below command starts all the processes needed on a ray worker node, blocking until killed with sigterm.
       # After sigterm, all started processes should also die (ray stop).
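For reference, a complete driver in the spirit of ``doc/yarn/example.py`` might look like the sketch below. Only the ``ray.init`` call mirrors the file being patched; the body of ``main()`` is not shown in this diff, so the remote task here is purely illustrative.

.. code-block:: python

    import ray


    @ray.remote
    def square(x):
        # Illustrative task body; the real example.py defines its own workload.
        return x * x


    def main():
        # Fan a few tasks out to the YARN-hosted workers and collect the results.
        print(ray.get([square.remote(i) for i in range(4)]))


    if __name__ == "__main__":
        # 100 MB driver object store memory; set this based on the application
        # (subject to the YARN container limit).
        DRIVER_MEMORY = 100 * 1024 * 1024
        ray.init(
            address="localhost:6379", driver_object_store_memory=DRIVER_MEMORY)
        main()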