mirror of
https://github.com/vale981/ray
synced 2025-03-09 12:56:46 -04:00

It's important to show how Ray can be used for easily parallelizable independent tasks. I put this together to demonstrate how to di this.
364 lines
10 KiB
Text
364 lines
10 KiB
Text
{
|
|
"cells": [
|
|
{
|
|
"cell_type": "markdown",
|
|
"id": "515dffba",
|
|
"metadata": {},
|
|
"source": [
|
|
"# Using Ray for Highly Parallelizable Tasks\n",
|
|
"\n",
|
|
"While Ray can be used for very complex parallelization tasks,\n",
|
|
"often we just want to do something simple in parallel.\n",
|
|
"For example, we may have 100,000 time series to process with exactly the same algorithm,\n",
|
|
"and each one takes a minute of processing.\n",
|
|
"\n",
|
|
"Clearly running it on a single processor is prohibitive: this would take 70 days.\n",
|
|
"Even if we managed to use 8 processors on a single machine,\n",
|
|
"that would bring it down to 9 days. But if we can use 8 machines, each with 16 cores,\n",
|
|
"it can be done in about 12 hours.\n",
|
|
"\n",
|
|
"How can we use Ray for these types of task? \n",
|
|
"\n",
|
|
"We take the simple example of computing the digits of pi.\n",
|
|
"The algorithm is simple: generate random x and y, and if ``x^2 + y^2 < 1``, it's\n",
|
|
"inside the circle, we count as in. This actually turns out to be pi/4\n",
|
|
"(remembering your high school math).\n",
|
|
"\n",
|
|
"The following code (and this notebook) assumes you have already set up your Ray cluster and that you are running on the head node. For more details on how to set up a Ray cluster please see the [Ray Cluster Quickstart Guide](https://docs.ray.io/en/master/cluster/quickstart.html). \n"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 2,
|
|
"id": "8e3e7c4f",
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"import ray\n",
|
|
"import random\n",
|
|
"import time\n",
|
|
"import math\n",
|
|
"from fractions import Fraction"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 3,
|
|
"id": "92d2461b",
|
|
"metadata": {
|
|
"scrolled": true,
|
|
"tags": [
|
|
"remove-output"
|
|
]
|
|
},
|
|
"outputs": [
|
|
{
|
|
"name": "stderr",
|
|
"output_type": "stream",
|
|
"text": [
|
|
"INFO:anyscale.snapshot_util:Synced git objects for /home/ray/workspace-project-waleed_test1 to /efs/workspaces/shared_objects in 0.07651424407958984s.\n",
|
|
"INFO:anyscale.snapshot_util:Created snapshot for /home/ray/workspace-project-waleed_test1 at /tmp/snapshot_2022-05-16T16:38:57.388956_otbjcv41.zip of size 1667695 in 0.014925718307495117s.\n",
|
|
"INFO:anyscale.snapshot_util:Content hashes b'f4fcea43e90a69d561bf323a07691536' vs b'f4fcea43e90a69d561bf323a07691536'\n",
|
|
"INFO:anyscale.snapshot_util:Content hash unchanged, not saving new snapshot.\n",
|
|
"INFO:ray.worker:Connecting to existing Ray cluster at address: 172.31.78.11:9031\n",
|
|
"2022-05-16 16:38:57,451\tINFO packaging.py:269 -- Pushing file package 'gcs://_ray_pkg_bf4a08129b7b19b96a1701be1151f9a8.zip' (1.59MiB) to Ray cluster...\n",
|
|
"2022-05-16 16:38:57,470\tINFO packaging.py:278 -- Successfully pushed file package 'gcs://_ray_pkg_bf4a08129b7b19b96a1701be1151f9a8.zip'.\n"
|
|
]
|
|
},
|
|
{
|
|
"name": "stdout",
|
|
"output_type": "stream",
|
|
"text": [
|
|
"Updated runtime env to {'working_dir': '/efs/workspaces/expwrk_aXjrEWxgAfCazC2KCUCttum5/snapshots/snapshot_2022-05-16T00:38:47.798071_auto_p0mfj5qr.zip'}\n"
|
|
]
|
|
},
|
|
{
|
|
"data": {
|
|
"text/plain": [
|
|
"RayContext(dashboard_url='127.0.0.1:8265', python_version='3.8.5', ray_version='2.0.0.dev0', ray_commit='e2ee2140f97ca08b70fd0f7561038b7f8d958d63', address_info={'node_ip_address': '172.31.78.11', 'raylet_ip_address': '172.31.78.11', 'redis_address': None, 'object_store_address': '/tmp/ray/session_2022-05-16_16-09-56_740551_146/sockets/plasma_store', 'raylet_socket_name': '/tmp/ray/session_2022-05-16_16-09-56_740551_146/sockets/raylet', 'webui_url': '127.0.0.1:8265', 'session_dir': '/tmp/ray/session_2022-05-16_16-09-56_740551_146', 'metrics_export_port': 55904, 'gcs_address': '172.31.78.11:9031', 'address': '172.31.78.11:9031', 'node_id': 'a9667bf72f15c8289ed547e67b90d8098ff2771386b88774f2f33201'})"
|
|
]
|
|
},
|
|
"execution_count": 3,
|
|
"metadata": {},
|
|
"output_type": "execute_result"
|
|
}
|
|
],
|
|
"source": [
|
|
"# Let's start Ray\n",
|
|
"ray.init(address='auto')"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "markdown",
|
|
"id": "b96f2eb9",
|
|
"metadata": {},
|
|
"source": [
|
|
"We use the ``@ray.remote`` decorator to create a Ray task.\n",
|
|
"A task is like a function, except the result is returned asynchronously.\n",
|
|
"\n",
|
|
"It also may not run on the local machine, it may run elsewhere in the cluster.\n",
|
|
"This way you can run multiple tasks in parallel,\n",
|
|
"beyond the limit of the number of processors you can have in a single machine."
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 4,
|
|
"id": "ece9887c",
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"@ray.remote\n",
|
|
"def pi4_sample(sample_count):\n",
|
|
" \"\"\"pi4_sample runs sample_count experiments, and returns the \n",
|
|
" fraction of time it was inside the circle. \n",
|
|
" \"\"\"\n",
|
|
" in_count = 0\n",
|
|
" for i in range(sample_count):\n",
|
|
" x = random.random()\n",
|
|
" y = random.random()\n",
|
|
" if x*x + y*y <= 1:\n",
|
|
" in_count += 1\n",
|
|
" return Fraction(in_count, sample_count)\n"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "markdown",
|
|
"id": "05bf8675",
|
|
"metadata": {},
|
|
"source": [
|
|
"To get the result of a future, we use ray.get() which \n",
|
|
"blocks until the result is complete. "
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 5,
|
|
"id": "9d9a3509",
|
|
"metadata": {},
|
|
"outputs": [
|
|
{
|
|
"name": "stdout",
|
|
"output_type": "stream",
|
|
"text": [
|
|
"Running 1000000 tests took 1.4935967922210693 seconds\n"
|
|
]
|
|
}
|
|
],
|
|
"source": [
|
|
"SAMPLE_COUNT = 1000 * 1000\n",
|
|
"start = time.time() \n",
|
|
"future = pi4_sample.remote(sample_count = SAMPLE_COUNT)\n",
|
|
"pi4 = ray.get(future)\n",
|
|
"end = time.time()\n",
|
|
"dur = end - start\n",
|
|
"print(f'Running {SAMPLE_COUNT} tests took {dur} seconds')"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "markdown",
|
|
"id": "cc17429d",
|
|
"metadata": {},
|
|
"source": [
|
|
"Now let's see how good our approximation is."
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 7,
|
|
"id": "42d4c464",
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"pi = pi4 * 4"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 8,
|
|
"id": "4009bee0",
|
|
"metadata": {},
|
|
"outputs": [
|
|
{
|
|
"data": {
|
|
"text/plain": [
|
|
"3.143024"
|
|
]
|
|
},
|
|
"execution_count": 8,
|
|
"metadata": {},
|
|
"output_type": "execute_result"
|
|
}
|
|
],
|
|
"source": [
|
|
"float(pi)"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 9,
|
|
"id": "d19155d6",
|
|
"metadata": {},
|
|
"outputs": [
|
|
{
|
|
"data": {
|
|
"text/plain": [
|
|
"0.0004554042254233261"
|
|
]
|
|
},
|
|
"execution_count": 9,
|
|
"metadata": {},
|
|
"output_type": "execute_result"
|
|
}
|
|
],
|
|
"source": [
|
|
"abs(pi-math.pi)/pi"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "markdown",
|
|
"id": "ddb3b095",
|
|
"metadata": {},
|
|
"source": [
|
|
"Meh. A little off -- that's barely 4 decimal places.\n",
|
|
"Why don't we do it a 100,000 times as much? Let's do 100 billion!"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 10,
|
|
"id": "b7b9cff9",
|
|
"metadata": {},
|
|
"outputs": [
|
|
{
|
|
"name": "stdout",
|
|
"output_type": "stream",
|
|
"text": [
|
|
"Doing 100000 batches\n"
|
|
]
|
|
}
|
|
],
|
|
"source": [
|
|
"FULL_SAMPLE_COUNT = 100 * 1000 * 1000 * 1000 # 100 billion samples! \n",
|
|
"BATCHES = int(FULL_SAMPLE_COUNT / SAMPLE_COUNT)\n",
|
|
"print(f'Doing {BATCHES} batches')\n",
|
|
"results = []\n",
|
|
"for _ in range(BATCHES):\n",
|
|
" results.append(pi4_sample.remote())\n",
|
|
"output = ray.get(results)"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "markdown",
|
|
"id": "94264de4",
|
|
"metadata": {},
|
|
"source": [
|
|
"Notice that in the above, we generated a list with 100,000 futures.\n",
|
|
"Now all we do is have to do is wait for the result.\n",
|
|
"\n",
|
|
"Depending on your ray cluster's size, this might take a few minutes.\n",
|
|
"But to give you some idea, if we were to do it on a single machine,\n",
|
|
"when I ran this it took 0.4 seconds.\n",
|
|
"\n",
|
|
"On a single core, that means we're looking at 0.4 * 100000 = about 11 hours. \n",
|
|
"\n",
|
|
"Here's what the Dashboard looks like: \n",
|
|
"\n",
|
|
"\n",
|
|
"\n",
|
|
"So now, rather than just a single core working on this,\n",
|
|
"I have 168 working on the task together. And its ~80% efficient."
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 12,
|
|
"id": "76eba02d",
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"pi = sum(output)*4/len(output)"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 13,
|
|
"id": "ede2bd8c",
|
|
"metadata": {},
|
|
"outputs": [
|
|
{
|
|
"data": {
|
|
"text/plain": [
|
|
"3.14159518188"
|
|
]
|
|
},
|
|
"execution_count": 13,
|
|
"metadata": {},
|
|
"output_type": "execute_result"
|
|
}
|
|
],
|
|
"source": [
|
|
"float(pi)"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 14,
|
|
"id": "bb62cb27",
|
|
"metadata": {},
|
|
"outputs": [
|
|
{
|
|
"data": {
|
|
"text/plain": [
|
|
"8.047791203506436e-07"
|
|
]
|
|
},
|
|
"execution_count": 14,
|
|
"metadata": {},
|
|
"output_type": "execute_result"
|
|
}
|
|
],
|
|
"source": [
|
|
"abs(pi-math.pi)/pi"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "markdown",
|
|
"id": "30d12e50",
|
|
"metadata": {},
|
|
"source": [
|
|
"Not bad at all -- we're off by a millionth. "
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": null,
|
|
"id": "1b36747b",
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": []
|
|
}
|
|
],
|
|
"metadata": {
|
|
"celltoolbar": "Tags",
|
|
"kernelspec": {
|
|
"display_name": "Python 3 (ipykernel)",
|
|
"language": "python",
|
|
"name": "python3"
|
|
},
|
|
"language_info": {
|
|
"codemirror_mode": {
|
|
"name": "ipython",
|
|
"version": 3
|
|
},
|
|
"file_extension": ".py",
|
|
"mimetype": "text/x-python",
|
|
"name": "python",
|
|
"nbconvert_exporter": "python",
|
|
"pygments_lexer": "ipython3",
|
|
"version": "3.8.13"
|
|
}
|
|
},
|
|
"nbformat": 4,
|
|
"nbformat_minor": 5
|
|
}
|