[Dask] Dask Example Tests (#16346)

* add examples

* update dask docs

* add build file

* formatting

* fix ci command

* fix

* Update python/ray/util/dask/BUILD

* newline

* fix pytest fixtures

* fixes

* formatting

* fix shuffle example
This commit is contained in:
Amog Kamsetty 2021-06-12 20:25:45 -07:00 committed by GitHub
parent acb439e8f2
commit f9936c4252
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
12 changed files with 245 additions and 102 deletions

View file

@ -150,6 +150,8 @@
--test_tag_filters=-kubernetes,-jenkins_only,client_tests,-flaky
--test_env=RAY_CLIENT_MODE=1 --test_env=RAY_PROFILING=1
python/ray/tests/...
# Dask tests and examples.
- bazel test --config=ci $(./scripts/bazel_export_options) --build_tests_only --test_tag_filters=-client python/ray/util/dask/...
- label: ":python: (Medium A-J)"
conditions: ["RAY_CI_PYTHON_AFFECTED"]
commands:
@ -296,6 +298,7 @@
- cleanup() { if [ "${BUILDKITE_PULL_REQUEST}" = "false" ]; then ./ci/travis/upload_build_info.sh; fi }; trap cleanup EXIT
- TUNE_TESTING=1 PYTHON=3.7 INSTALL_HOROVOD=1 ./ci/travis/install-dependencies.sh
- rm -rf ./python/ray/thirdparty_files; ./ci/travis/ci.sh build
- bazel test --config=ci $(./scripts/bazel_export_options) --build_tests_only --test_tag_filters=client --test_env=RAY_CLIENT_MODE=1 python/ray/util/dask/...
- bazel test --config=ci $(./scripts/bazel_export_options) --build_tests_only --test_tag_filters=client,-flaky python/ray/tune/...
- bazel test --config=ci $(./scripts/bazel_export_options) --build_tests_only --test_tag_filters=client,-client_unit_tests python/ray/util/sgd/...
- bazel test --config=ci $(./scripts/bazel_export_options) --build_tests_only --test_tag_filters=client,-flaky python/ray/util/xgboost/...

View file

@ -31,33 +31,8 @@ any Dask `.compute() <https://docs.dask.org/en/latest/api.html#dask.compute>`__
call.
Here's an example:
.. code-block:: python
import ray
from ray.util.dask import ray_dask_get
import dask
import dask.array as da
import dask.dataframe as dd
import numpy as np
import pandas as pd
# Start Ray.
# Tip: If you're connecting to an existing cluster, use ray.init(address="auto").
ray.init()
d_arr = da.from_array(np.random.randint(0, 1000, size=(256, 256)))
# The Dask scheduler submits the underlying task graph to Ray.
d_arr.mean().compute(scheduler=ray_dask_get)
# Set the scheduler to ray_dask_get in your config so you don't have to specify it on
# each compute call.
dask.config.set(scheduler=ray_dask_get)
df = dd.from_pandas(pd.DataFrame(
np.random.randint(0, 100, size=(1024, 2)),
columns=["age", "grade"]), npartitions=2)
df.groupby(["age"]).mean().compute()
.. literalinclude:: ../../python/ray/util/dask/examples/dask_ray_scheduler_example.py
:language: python
.. note::
For execution on a Ray cluster, you should *not* use the
@ -121,42 +96,8 @@ aggregations): those downstream computations will be faster since that base coll
computation was kicked off early and referenced by all downstream computations, often
via shared memory.
.. code-block:: python
import ray
from ray.util.dask import ray_dask_get
import dask
import dask.array as da
import numpy as np
# Start Ray.
# Tip: If you're connecting to an existing cluster, use ray.init(address="auto").
ray.init()
# Set the scheduler to ray_dask_get in your config so you don't have to specify it on
# each compute call.
dask.config.set(scheduler=ray_dask_get)
d_arr = da.ones(100)
print(dask.base.collections_to_dsk([d_arr]))
# {('ones-c345e6f8436ff9bcd68ddf25287d27f3',
# 0): (functools.partial(<function _broadcast_trick_inner at 0x7f27f1a71f80>,
# dtype=dtype('float64')), (5,))}
# This submits all underlying Ray tasks to the cluster and returns a Dask array with
# the Ray futures inlined.
d_arr_p = d_arr.persist()
# Notice that the Ray ObjectRef is inlined. The dask.ones() task has been submitted
# to and is running on the Ray cluster.
dask.base.collections_to_dsk([d_arr_p])
# {('ones-c345e6f8436ff9bcd68ddf25287d27f3',
# 0): ObjectRef(8b4e50dc1ddac855ffffffffffffffffffffffff0100000001000000)}
# Future computations on this persisted Dask Array will be fast since we already
# started computing d_arr_p in the background.
d_arr_p.sum().compute()
d_arr_p.min().compute()
d_arr_p.max().compute()
.. literalinclude:: ../../python/ray/util/dask/examples/dask_ray_persist_example.py
:language: python
Custom optimization for Dask DataFrame shuffling
@ -168,31 +109,8 @@ Dask on Ray provides a Dask DataFrame optimizer that leverages Ray's ability to
execute multiple-return tasks in order to speed up shuffling by as much as 4x on Ray.
Simply set the ``dataframe_optimize`` configuration option to our optimizer function, similar to how you specify the Dask-on-Ray scheduler:
.. code-block:: python
import ray
from ray.util.dask import ray_dask_get, dataframe_optimize
import dask
import dask.dataframe as dd
import numpy as np
import pandas as pd
# Start Ray.
# Tip: If you're connecting to an existing cluster, use ray.init(address="auto").
ray.init()
# Set the scheduler to ray_dask_get, and set the Dask DataFrame optimizer to our
# custom optimization function, this time using the config setter as a context manager.
with dask.config.set(scheduler=ray_dask_get, dataframe_optimize=dataframe_optimize):
npartitions = 100
df = dd.from_pandas(pd.DataFrame(
np.random.randint(0, 100, size=(10000, 2)),
columns=["age", "grade"]), npartitions=npartitions)
# We set max_branch to infinity in order to ensure that the task-based shuffle
# happens in a single stage, which is required in order for our optimization to
# work.
df.set_index(
["age"], shuffle="tasks", max_branch=float("inf")).head(10, npartitions=-1)
.. literalinclude:: ../../python/ray/util/dask/examples/dask_ray_shuffle_optimization.py
:language: python
Callbacks
---------

View file

@ -125,8 +125,6 @@ py_test_module_list(
"test_command_runner.py",
"test_component_failures.py",
"test_coordinator_server.py",
"test_dask_callback.py",
"test_dask_scheduler.py",
"test_debug_tools.py",
"test_job.py",
"test_k8s_operator_unit_tests.py",
@ -239,9 +237,6 @@ py_test_module_list(
"test_basic_2.py",
"test_basic_3.py",
"test_asyncio.py",
"test_dask_callback.py",
"test_dask_scheduler.py",
"test_dask_optimization.py",
],
size = "medium",
extra_srcs = SRCS,

118
python/ray/util/dask/BUILD Normal file
View file

@ -0,0 +1,118 @@
# --------------------------------------------------------------------
# Unit tests living under python/ray/util/dask/tests.
# Each test has a plain target and a "client"-tagged twin that reuses
# the same source file. Keep the targets in alphabetical order.
# --------------------------------------------------------------------

py_test(
    name = "test_dask_callback",
    size = "small",
    srcs = ["tests/test_dask_callback.py"],
    tags = ["exclusive"],
    deps = [":dask_lib"],
)

py_test(
    name = "test_dask_callback_client_mode",
    size = "small",
    srcs = ["tests/test_dask_callback.py"],
    # The target name differs from the source file, so name the entry point.
    main = "test_dask_callback.py",
    tags = ["exclusive", "client"],
    deps = [":dask_lib"],
)

py_test(
    name = "test_dask_optimization",
    size = "small",
    srcs = ["tests/test_dask_optimization.py"],
    tags = ["exclusive"],
    deps = [":dask_lib"],
)

py_test(
    name = "test_dask_optimization_client_mode",
    size = "small",
    srcs = ["tests/test_dask_optimization.py"],
    main = "test_dask_optimization.py",
    tags = ["exclusive", "client"],
    deps = [":dask_lib"],
)

py_test(
    name = "test_dask_scheduler",
    size = "small",
    srcs = ["tests/test_dask_scheduler.py"],
    tags = ["exclusive"],
    deps = [":dask_lib"],
)

py_test(
    name = "test_dask_scheduler_client_mode",
    size = "small",
    srcs = ["tests/test_dask_scheduler.py"],
    main = "test_dask_scheduler.py",
    tags = ["exclusive", "client"],
    deps = [":dask_lib"],
)

# --------------------------------------------------------------------
# Runnable examples living under python/ray/util/dask/examples.
# Same layout as above: a plain target plus a "client"-tagged twin.
# Keep the targets in alphabetical order.
# --------------------------------------------------------------------

py_test(
    name = "dask_ray_persist_example",
    size = "medium",
    srcs = ["examples/dask_ray_persist_example.py"],
    tags = ["exclusive"],
    deps = [":dask_lib"],
)

py_test(
    name = "dask_ray_persist_example_client_mode",
    size = "medium",
    srcs = ["examples/dask_ray_persist_example.py"],
    main = "dask_ray_persist_example.py",
    tags = ["exclusive", "client"],
    deps = [":dask_lib"],
)

py_test(
    name = "dask_ray_scheduler_example",
    size = "medium",
    srcs = ["examples/dask_ray_scheduler_example.py"],
    tags = ["exclusive"],
    deps = [":dask_lib"],
)

py_test(
    name = "dask_ray_scheduler_example_client_mode",
    size = "medium",
    srcs = ["examples/dask_ray_scheduler_example.py"],
    main = "dask_ray_scheduler_example.py",
    tags = ["exclusive", "client"],
    deps = [":dask_lib"],
)

py_test(
    name = "dask_ray_shuffle_optimization",
    size = "medium",
    srcs = ["examples/dask_ray_shuffle_optimization.py"],
    tags = ["exclusive"],
    deps = [":dask_lib"],
)

py_test(
    name = "dask_ray_shuffle_optimization_client_mode",
    size = "medium",
    srcs = ["examples/dask_ray_shuffle_optimization.py"],
    main = "dask_ray_shuffle_optimization.py",
    tags = ["exclusive", "client"],
    deps = [":dask_lib"],
)

# Placeholder dependency shared by every target above: it exists only so
# that editing any Python file matched by the glob (everything in this
# package except tests/*.py) causes those tests to be re-run.
py_library(
    name = "dask_lib",
    srcs = glob(
        ["**/*.py"],
        exclude = ["tests/*.py"],
    ),
)

View file

@ -0,0 +1,36 @@
import ray
from ray.util.dask import ray_dask_get
import dask
import dask.array as da
# Start Ray.
# Tip: If connecting to an existing cluster, use ray.init(address="auto").
ray.init()

# Set the scheduler to ray_dask_get in your config so you don't
# have to specify it on each compute call.
dask.config.set(scheduler=ray_dask_get)

d_arr = da.ones(100)
# Show the task graph before anything has been submitted.
print(dask.base.collections_to_dsk([d_arr]))
# Example output (the key hash and function address will differ per run):
# {('ones-c345e6f8436ff9bcd68ddf25287d27f3',
# 0): (functools.partial(<function _broadcast_trick_inner at 0x7f27f1a71f80>,
# dtype=dtype('float64')), (100,))}

# This submits all underlying Ray tasks to the cluster and returns
# a Dask array with the Ray futures inlined.
d_arr_p = d_arr.persist()

# Notice that the Ray ObjectRef is inlined. The dask.ones() task has
# been submitted to and is running on the Ray cluster.
# The graph is printed (rather than just evaluated) so that running this
# file as a script actually displays the inlined ObjectRef.
print(dask.base.collections_to_dsk([d_arr_p]))
# Example output (the key hash and ObjectRef id will differ per run):
# {('ones-c345e6f8436ff9bcd68ddf25287d27f3',
# 0): ObjectRef(8b4e50dc1ddac855ffffffffffffffffffffffff0100000001000000)}

# Future computations on this persisted Dask Array will be fast since we
# already started computing d_arr_p in the background.
d_arr_p.sum().compute()
d_arr_p.min().compute()
d_arr_p.max().compute()

ray.shutdown()

View file

@ -0,0 +1,28 @@
import ray
from ray.util.dask import ray_dask_get
import dask
import dask.array as da
import dask.dataframe as dd
import numpy as np
import pandas as pd
# Bring up Ray. To attach to a cluster that is already running, call
# ray.init(address="auto") instead.
ray.init()

# Wrap a 256x256 NumPy array of random integers in a Dask array.
samples = np.random.randint(0, 1000, size=(256, 256))
d_arr = da.from_array(samples)

# Passing scheduler=ray_dask_get makes Dask submit the underlying task
# graph of this computation to Ray.
arr_mean = d_arr.mean()
arr_mean.compute(scheduler=ray_dask_get)

# Register ray_dask_get in the Dask config so that later compute() calls
# do not have to name the scheduler explicitly.
dask.config.set(scheduler=ray_dask_get)

# Build a two-partition Dask DataFrame with random "age"/"grade" columns
# and compute the per-age mean; no scheduler argument is passed this time.
records = pd.DataFrame(
    np.random.randint(0, 100, size=(1024, 2)), columns=["age", "grade"])
df = dd.from_pandas(records, npartitions=2)
mean_by_age = df.groupby(["age"]).mean()
mean_by_age.compute()

ray.shutdown()

View file

@ -0,0 +1,29 @@
import ray
from ray.util.dask import dataframe_optimize
import dask
import dask.dataframe as dd
import numpy as np
import pandas as pd
# Bring up Ray. To attach to a cluster that is already running, call
# ray.init(address="auto") instead.
ray.init()

# Point Dask's dataframe_optimize option at the custom optimization
# function, using dask.config.set as a context manager.
with dask.config.set(dataframe_optimize=dataframe_optimize):
    npartitions = 100
    frame = pd.DataFrame(
        np.random.randint(0, 100, size=(10000, 2)),
        columns=["age", "grade"])
    df = dd.from_pandas(frame, npartitions=npartitions)
    # max_branch is infinite so that the task-based shuffle runs as a
    # single stage, which the optimization requires in order to work.
    indexed = df.set_index(["age"], shuffle="tasks", max_branch=float("inf"))
    indexed.head(10, npartitions=-1)

ray.shutdown()

View file

View file

@ -5,6 +5,14 @@ import ray
from ray.util.dask import ray_dask_get, RayDaskCallback
@pytest.fixture
def ray_start_1_cpu():
    """Start Ray for one test and shut it down once the test is done.

    Yields whatever ``ray.init`` returns. Note that, despite the fixture's
    name, Ray is started with ``num_cpus=2``.
    """
    yield ray.init(num_cpus=2)
    # Everything after the yield is executed as the fixture's teardown.
    ray.shutdown()
@dask.delayed
def add(x, y):
    """Return ``x + y``; the function is wrapped with ``dask.delayed``."""
    total = x + y
    return total
@ -20,7 +28,7 @@ def test_callback_active():
assert not RayDaskCallback.ray_active
def test_presubmit_shortcircuit(ray_start_regular_shared):
def test_presubmit_shortcircuit(ray_start_1_cpu):
"""
Test that presubmit return short-circuits task submission, and that task's
result is set to the presubmit return value.
@ -41,7 +49,7 @@ def test_presubmit_shortcircuit(ray_start_regular_shared):
assert result == 0
def test_pretask_posttask_shared_state(ray_start_regular_shared):
def test_pretask_posttask_shared_state(ray_start_1_cpu):
"""
Test that pretask return value is passed to corresponding posttask
callback.
@ -61,7 +69,7 @@ def test_pretask_posttask_shared_state(ray_start_regular_shared):
assert result == 5
def test_postsubmit(ray_start_regular_shared):
def test_postsubmit(ray_start_1_cpu):
"""
Test that postsubmit is called after each task.
"""
@ -94,7 +102,7 @@ def test_postsubmit(ray_start_regular_shared):
assert result == 5
def test_postsubmit_all(ray_start_regular_shared):
def test_postsubmit_all(ray_start_1_cpu):
"""
Test that postsubmit_all is called once.
"""
@ -126,7 +134,7 @@ def test_postsubmit_all(ray_start_regular_shared):
assert result == 5
def test_finish(ray_start_regular_shared):
def test_finish(ray_start_1_cpu):
"""
Test that finish callback is called once.
"""
@ -158,7 +166,7 @@ def test_finish(ray_start_regular_shared):
assert result == 5
def test_multiple_callbacks(ray_start_regular_shared):
def test_multiple_callbacks(ray_start_1_cpu):
"""
Test that multiple callbacks are supported.
"""
@ -194,7 +202,7 @@ def test_multiple_callbacks(ray_start_regular_shared):
assert result == 5
def test_pretask_posttask_shared_state_multi(ray_start_regular_shared):
def test_pretask_posttask_shared_state_multi(ray_start_1_cpu):
"""
Test that pretask return values are passed to the correct corresponding
posttask callbacks when multiple callbacks are given.

View file

@ -7,8 +7,16 @@ import ray
from ray.util.client.common import ClientObjectRef
@pytest.fixture
def ray_start_1_cpu():
    """Start Ray for one test and shut it down once the test is done.

    Yields whatever ``ray.init`` returns. Note that, despite the fixture's
    name, Ray is started with ``num_cpus=2``.
    """
    yield ray.init(num_cpus=2)
    # Everything after the yield is executed as the fixture's teardown.
    ray.shutdown()
@unittest.skipIf(sys.platform == "win32", "Failing on Windows.")
def test_ray_dask_basic(ray_start_regular_shared):
def test_ray_dask_basic(ray_start_1_cpu):
from ray.util.dask import ray_dask_get
@ray.remote
@ -36,7 +44,7 @@ def test_ray_dask_basic(ray_start_regular_shared):
@unittest.skipIf(sys.platform == "win32", "Failing on Windows.")
def test_ray_dask_persist(ray_start_regular_shared):
def test_ray_dask_persist(ray_start_1_cpu):
from ray.util.dask import ray_dask_get
arr = da.ones(5) + 2
result = arr.persist(scheduler=ray_dask_get)