Pattern: Tree of tasks
======================
In this pattern, remote tasks are spawned recursively to sort a list. Within the definition of a remote function, it is possible for the function to invoke itself (``quick_sort_distributed.remote``), so a single call to the task triggers the dispatch of multiple tasks.
Example use case
----------------
You have a large list of items that you need to process recursively (e.g., by sorting).
We call ``ray.get`` only after both remote function invocations have been launched. This allows you to maximize parallelism in the workload.

Notice in the execution times below that with smaller, finer-grained tasks, the non-distributed version is faster; however, as task execution time increases, that is, as the task with the larger list takes longer, the distributed version is faster. The takeaway is that fine-grained tasks are a Ray anti-pattern.
.. code-block:: python

    lesser = quick_sort_distributed.remote(lesser)
    greater = quick_sort_distributed.remote(greater)
    ray.get(lesser) + [pivot] + ray.get(greater)
.. figure:: tree-of-tasks.svg

    Tree of tasks
Code example
------------
.. code-block:: python

    import ray

    def partition(collection):
        # Use the last element as the first pivot
        pivot = collection.pop()
        greater, lesser = [], []
        for element in collection:
            if element > pivot:
                greater.append(element)
            else:
                lesser.append(element)
        return lesser, pivot, greater

    def quick_sort(collection):
        if len(collection) <= 200000:  # magic number
            return sorted(collection)
        else:
            lesser, pivot, greater = partition(collection)
            lesser = quick_sort(lesser)
            greater = quick_sort(greater)
            return lesser + [pivot] + greater

    @ray.remote
    def quick_sort_distributed(collection):
        # Tiny tasks are an antipattern.
        # Thus, in our example we have a "magic number" to
        # toggle when distributed recursion should be used vs
        # when the sorting should be done in place. The rule
        # of thumb is that the duration of an individual task
        # should be at least 1 second.
        if len(collection) <= 200000:  # magic number
            return sorted(collection)
        else:
            lesser, pivot, greater = partition(collection)
            lesser = quick_sort_distributed.remote(lesser)
            greater = quick_sort_distributed.remote(greater)
            return ray.get(lesser) + [pivot] + ray.get(greater)

    if __name__ == "__main__":
        from numpy import random
        import time

        ray.init()
        for size in [200000, 4000000, 8000000, 10000000, 20000000]:
            print(f"Array size: {size}")
            unsorted = random.randint(1000000, size=(size)).tolist()
            s = time.time()
            quick_sort(unsorted)
            print(f"Sequential execution: {(time.time() - s):.3f}")
            s = time.time()
            # Put the large object in the object store and pass only the reference.
            unsorted_obj = ray.put(unsorted)
            ray.get(quick_sort_distributed.remote(unsorted_obj))
            print(f"Distributed execution: {(time.time() - s):.3f}")
            print("--" * 10)
.. code-block:: text

    Array size: 200000
    Sequential execution: 0.040
    Distributed execution: 0.152
    --------------------
    Array size: 4000000
    Sequential execution: 6.161
    Distributed execution: 5.779
    --------------------
    Array size: 8000000
    Sequential execution: 15.459
    Distributed execution: 11.282
    --------------------
    Array size: 10000000
    Sequential execution: 20.671
    Distributed execution: 13.132
    --------------------
    Array size: 20000000
    Sequential execution: 47.352
    Distributed execution: 36.213
    --------------------