Replace queue.Queue with multiprocessing.JoinableQueue (#21860)

Reason for not using `queue.Queue` for multiprocessing purposes on Windows is at https://stackoverflow.com/a/37244276 and in the second reply to https://stackoverflow.com/a/37245300
And reason for using `multiprocessing.JoinableQueue` over `multiprocessing.Queue` is https://stackoverflow.com/a/30725121

AFAIK, this is because in Windows each process gets it own `Queue` and hence nothing is shared among those processes. When `multiprocessing.Queue` is used, changes in it are shared via pipes internally along with proper locks.
This commit is contained in:
Gagandeep Singh 2022-02-15 22:31:17 +05:30 committed by GitHub
parent 37ef372a10
commit a8341dfc29
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 7 additions and 8 deletions

View file

@ -152,8 +152,6 @@ test_python() {
-python/ray/tests:test_global_gc -python/ray/tests:test_global_gc
-python/ray/tests:test_job -python/ray/tests:test_job
-python/ray/tests:test_memstat -python/ray/tests:test_memstat
-python/ray/tests:test_multiprocessing # flaky, causes subsequent tests to fail
-python/ray/tests:test_multiprocessing_client_mode
-python/ray/tests:test_multi_node_3 -python/ray/tests:test_multi_node_3
-python/ray/tests:test_object_manager # OOM on test_object_directory_basic -python/ray/tests:test_object_manager # OOM on test_object_directory_basic
-python/ray/tests:test_resource_demand_scheduler -python/ray/tests:test_resource_demand_scheduler

View file

@ -1,5 +1,6 @@
import os import os
import sys import sys
import platform
import pytest import pytest
import tempfile import tempfile
import time import time
@ -10,7 +11,7 @@ import math
import ray import ray
from ray._private.test_utils import SignalActor from ray._private.test_utils import SignalActor
from ray.util.multiprocessing import Pool, TimeoutError from ray.util.multiprocessing import Pool, TimeoutError, JoinableQueue
def teardown_function(function): def teardown_function(function):
@ -370,9 +371,9 @@ def test_starmap(pool):
assert pool.starmap(lambda x, y: x + y, zip([1, 2], [3, 4])) == [4, 6] assert pool.starmap(lambda x, y: x + y, zip([1, 2], [3, 4])) == [4, 6]
@pytest.mark.skipif(sys.platform == "win32", reason="Hangs in windows")
def test_callbacks(pool_4_processes, pool_4_processes_python_multiprocessing_lib): def test_callbacks(pool_4_processes, pool_4_processes_python_multiprocessing_lib):
callback_queue = queue.Queue() Queue = JoinableQueue if platform.system() == "Windows" else queue.Queue
callback_queue = Queue()
def callback(result): def callback(result):
callback_queue.put(result) callback_queue.put(result)
@ -405,7 +406,7 @@ def test_callbacks(pool_4_processes, pool_4_processes_python_multiprocessing_lib
for callback_type in test_callback_types: for callback_type in test_callback_types:
# Reinitialize queue to track number of callback calls made by # Reinitialize queue to track number of callback calls made by
# the current process_pool and callback_type in map_async # the current process_pool and callback_type in map_async
callback_queue = queue.Queue() callback_queue = Queue()
indices, error_indices = list(range(100)), [] indices, error_indices = list(range(100)), []
if callback_type == "error callback": if callback_type == "error callback":

View file

@ -1,5 +1,5 @@
from multiprocessing import TimeoutError from multiprocessing import TimeoutError, JoinableQueue
from .pool import Pool from .pool import Pool
__all__ = ["Pool", "TimeoutError"] __all__ = ["Pool", "TimeoutError", "JoinableQueue"]