diff --git a/ci/travis/ci.sh b/ci/travis/ci.sh index 8bfbaf086..7cec0359a 100755 --- a/ci/travis/ci.sh +++ b/ci/travis/ci.sh @@ -152,8 +152,6 @@ test_python() { -python/ray/tests:test_global_gc -python/ray/tests:test_job -python/ray/tests:test_memstat - -python/ray/tests:test_multiprocessing # flaky, causes subsequent tests to fail - -python/ray/tests:test_multiprocessing_client_mode -python/ray/tests:test_multi_node_3 -python/ray/tests:test_object_manager # OOM on test_object_directory_basic -python/ray/tests:test_resource_demand_scheduler diff --git a/python/ray/tests/test_multiprocessing.py b/python/ray/tests/test_multiprocessing.py index 415f87d45..1f9b511fa 100644 --- a/python/ray/tests/test_multiprocessing.py +++ b/python/ray/tests/test_multiprocessing.py @@ -1,5 +1,6 @@ import os import sys +import platform import pytest import tempfile import time @@ -10,7 +11,7 @@ import math import ray from ray._private.test_utils import SignalActor -from ray.util.multiprocessing import Pool, TimeoutError +from ray.util.multiprocessing import Pool, TimeoutError, JoinableQueue def teardown_function(function): @@ -370,9 +371,9 @@ def test_starmap(pool): assert pool.starmap(lambda x, y: x + y, zip([1, 2], [3, 4])) == [4, 6] -@pytest.mark.skipif(sys.platform == "win32", reason="Hangs in windows") def test_callbacks(pool_4_processes, pool_4_processes_python_multiprocessing_lib): - callback_queue = queue.Queue() + Queue = JoinableQueue if platform.system() == "Windows" else queue.Queue + callback_queue = Queue() def callback(result): callback_queue.put(result) @@ -405,7 +406,7 @@ def test_callbacks(pool_4_processes, pool_4_processes_python_multiprocessing_lib for callback_type in test_callback_types: # Reinitialize queue to track number of callback calls made by # the current process_pool and callback_type in map_async - callback_queue = queue.Queue() + callback_queue = Queue() indices, error_indices = list(range(100)), [] if callback_type == "error callback": diff --git a/python/ray/util/multiprocessing/__init__.py b/python/ray/util/multiprocessing/__init__.py index f9b741f1e..5b390439f 100644 --- a/python/ray/util/multiprocessing/__init__.py +++ b/python/ray/util/multiprocessing/__init__.py @@ -1,5 +1,5 @@ -from multiprocessing import TimeoutError +from multiprocessing import TimeoutError, JoinableQueue from .pool import Pool -__all__ = ["Pool", "TimeoutError"] +__all__ = ["Pool", "TimeoutError", "JoinableQueue"]