mirror of
https://github.com/vale981/ray
synced 2025-03-06 02:21:39 -05:00
Add queue data structures (#2261)
This commit is contained in:
parent
404bfc5da2
commit
f5c46c7765
3 changed files with 297 additions and 0 deletions
|
@ -185,6 +185,7 @@ script:
|
||||||
- python python/ray/local_scheduler/test/test.py
|
- python python/ray/local_scheduler/test/test.py
|
||||||
- python python/ray/global_scheduler/test/test.py
|
- python python/ray/global_scheduler/test/test.py
|
||||||
|
|
||||||
|
- python -m pytest python/ray/test/test_queue.py
|
||||||
- python -m pytest test/xray_test.py
|
- python -m pytest test/xray_test.py
|
||||||
|
|
||||||
- python test/runtest.py
|
- python test/runtest.py
|
||||||
|
|
164
python/ray/experimental/queue.py
Normal file
164
python/ray/experimental/queue.py
Normal file
|
@ -0,0 +1,164 @@
|
||||||
|
from __future__ import absolute_import
|
||||||
|
from __future__ import division
|
||||||
|
from __future__ import print_function
|
||||||
|
|
||||||
|
from collections import deque
|
||||||
|
import time
|
||||||
|
|
||||||
|
import ray
|
||||||
|
|
||||||
|
|
||||||
|
class Empty(Exception):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class Full(Exception):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class Queue(object):
|
||||||
|
"""Queue implementation on Ray.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
maxsize (int): maximum size of the queue. If zero, size is unboundend.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, maxsize=0):
|
||||||
|
self.maxsize = maxsize
|
||||||
|
self.actor = _QueueActor.remote(maxsize)
|
||||||
|
|
||||||
|
def qsize(self):
|
||||||
|
"""The size of the queue."""
|
||||||
|
return ray.get(self.actor.qsize.remote())
|
||||||
|
|
||||||
|
def empty(self):
|
||||||
|
"""Whether the queue is empty."""
|
||||||
|
return ray.get(self.actor.qsize.remote())
|
||||||
|
|
||||||
|
def full(self):
|
||||||
|
"""Whether the queue is full."""
|
||||||
|
return ray.get(self.actor.full.remote())
|
||||||
|
|
||||||
|
def put(self, item, block=True, timeout=None):
|
||||||
|
"""Adds an item to the queue.
|
||||||
|
|
||||||
|
Uses polling if block=True, so there is no guarantee of order if
|
||||||
|
multiple producers put to the same full queue.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
Full if the queue is full and blocking is False.
|
||||||
|
"""
|
||||||
|
if self.maxsize <= 0:
|
||||||
|
self.actor.put.remote(item)
|
||||||
|
elif not block:
|
||||||
|
if not ray.get(self.actor.put.remote(item)):
|
||||||
|
raise Full
|
||||||
|
elif timeout is None:
|
||||||
|
# Polling
|
||||||
|
# Use a not_full condition variable or promise?
|
||||||
|
while not ray.get(self.actor.put.remote(item)):
|
||||||
|
# Consider adding time.sleep here
|
||||||
|
pass
|
||||||
|
elif timeout < 0:
|
||||||
|
raise ValueError("'timeout' must be a non-negative number")
|
||||||
|
else:
|
||||||
|
endtime = time.time() + timeout
|
||||||
|
# Polling
|
||||||
|
# Use a condition variable or switch to promise?
|
||||||
|
success = False
|
||||||
|
while not success and time.time() < endtime:
|
||||||
|
success = ray.get(self.actor.put.remote(item))
|
||||||
|
if not success:
|
||||||
|
raise Full
|
||||||
|
|
||||||
|
def get(self, block=True, timeout=None):
|
||||||
|
"""Gets an item from the queue.
|
||||||
|
|
||||||
|
Uses polling if block=True, so there is no guarantee of order if
|
||||||
|
multiple consumers get from the same empty queue.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
The next item in the queue.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
Empty if the queue is empty and blocking is False.
|
||||||
|
"""
|
||||||
|
if not block:
|
||||||
|
success, item = ray.get(self.actor.get.remote())
|
||||||
|
if not success:
|
||||||
|
raise Empty
|
||||||
|
elif timeout is None:
|
||||||
|
# Polling
|
||||||
|
# Use a not_empty condition variable or return a promise?
|
||||||
|
success, item = ray.get(self.actor.get.remote())
|
||||||
|
while not success:
|
||||||
|
# Consider adding time.sleep here
|
||||||
|
success, item = ray.get(self.actor.get.remote())
|
||||||
|
elif timeout < 0:
|
||||||
|
raise ValueError("'timeout' must be a non-negative number")
|
||||||
|
else:
|
||||||
|
endtime = time.time() + timeout
|
||||||
|
# Polling
|
||||||
|
# Use a not_full condition variable or return a promise?
|
||||||
|
success = False
|
||||||
|
while not success and time.time() < endtime:
|
||||||
|
success, item = ray.get(self.actor.get.remote())
|
||||||
|
if not success:
|
||||||
|
raise Empty
|
||||||
|
return item
|
||||||
|
|
||||||
|
def put_nowait(self, item):
|
||||||
|
"""Equivalent to put(item, block=False).
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
Full if the queue is full.
|
||||||
|
"""
|
||||||
|
return self.put(item, block=False)
|
||||||
|
|
||||||
|
def get_nowait(self):
|
||||||
|
"""Equivalent to get(item, block=False).
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
Empty if the queue is empty.
|
||||||
|
"""
|
||||||
|
return self.get(block=False)
|
||||||
|
|
||||||
|
|
||||||
|
@ray.remote
|
||||||
|
class _QueueActor(object):
|
||||||
|
def __init__(self, maxsize):
|
||||||
|
self.maxsize = maxsize
|
||||||
|
self._init(maxsize)
|
||||||
|
|
||||||
|
def qsize(self):
|
||||||
|
return self._qsize()
|
||||||
|
|
||||||
|
def empty(self):
|
||||||
|
return not self._qsize()
|
||||||
|
|
||||||
|
def full(self):
|
||||||
|
return 0 < self.maxsize <= self._qsize()
|
||||||
|
|
||||||
|
def put(self, item):
|
||||||
|
if self.maxsize > 0 and self._qsize() >= self.maxsize:
|
||||||
|
return False
|
||||||
|
self._put(item)
|
||||||
|
return True
|
||||||
|
|
||||||
|
def get(self):
|
||||||
|
if not self._qsize():
|
||||||
|
return False, None
|
||||||
|
return True, self._get()
|
||||||
|
|
||||||
|
# Override these for different queue implementations
|
||||||
|
def _init(self, maxsize):
|
||||||
|
self.queue = deque()
|
||||||
|
|
||||||
|
def _qsize(self):
|
||||||
|
return len(self.queue)
|
||||||
|
|
||||||
|
def _put(self, item):
|
||||||
|
self.queue.append(item)
|
||||||
|
|
||||||
|
def _get(self):
|
||||||
|
return self.queue.popleft()
|
132
python/ray/test/test_queue.py
Normal file
132
python/ray/test/test_queue.py
Normal file
|
@ -0,0 +1,132 @@
|
||||||
|
from __future__ import absolute_import
|
||||||
|
from __future__ import division
|
||||||
|
from __future__ import print_function
|
||||||
|
|
||||||
|
import time
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
import ray
|
||||||
|
|
||||||
|
from ray.experimental.queue import Queue, Empty, Full
|
||||||
|
|
||||||
|
|
||||||
|
def start_ray():
|
||||||
|
if not ray.worker.global_worker.connected:
|
||||||
|
ray.init()
|
||||||
|
|
||||||
|
|
||||||
|
@ray.remote
|
||||||
|
def get_async(queue, block, timeout, sleep):
|
||||||
|
time.sleep(sleep)
|
||||||
|
return queue.get(block, timeout)
|
||||||
|
|
||||||
|
|
||||||
|
@ray.remote
|
||||||
|
def put_async(queue, item, block, timeout, sleep):
|
||||||
|
time.sleep(sleep)
|
||||||
|
queue.put(item, block, timeout)
|
||||||
|
|
||||||
|
|
||||||
|
def test_simple_use():
|
||||||
|
start_ray()
|
||||||
|
q = Queue()
|
||||||
|
|
||||||
|
items = list(range(10))
|
||||||
|
|
||||||
|
for item in items:
|
||||||
|
q.put(item)
|
||||||
|
|
||||||
|
for item in items:
|
||||||
|
assert item == q.get()
|
||||||
|
|
||||||
|
|
||||||
|
def test_async():
|
||||||
|
start_ray()
|
||||||
|
q = Queue()
|
||||||
|
|
||||||
|
items = set(range(10))
|
||||||
|
producers = [ # noqa
|
||||||
|
put_async.remote(q, item, True, None, 0.5) for item in items
|
||||||
|
]
|
||||||
|
consumers = [get_async.remote(q, True, None, 0) for _ in items]
|
||||||
|
|
||||||
|
result = set(ray.get(consumers))
|
||||||
|
|
||||||
|
assert items == result
|
||||||
|
|
||||||
|
|
||||||
|
def test_put():
|
||||||
|
start_ray()
|
||||||
|
q = Queue(1)
|
||||||
|
|
||||||
|
item = 0
|
||||||
|
q.put(item, block=False)
|
||||||
|
assert q.get() == item
|
||||||
|
|
||||||
|
item = 1
|
||||||
|
q.put(item, timeout=0.2)
|
||||||
|
assert q.get() == item
|
||||||
|
|
||||||
|
with pytest.raises(ValueError):
|
||||||
|
q.put(0, timeout=-1)
|
||||||
|
|
||||||
|
q.put(0)
|
||||||
|
with pytest.raises(Full):
|
||||||
|
q.put_nowait(1)
|
||||||
|
|
||||||
|
with pytest.raises(Full):
|
||||||
|
q.put(1, timeout=0.2)
|
||||||
|
|
||||||
|
q.get()
|
||||||
|
q.put(1)
|
||||||
|
|
||||||
|
get_id = get_async.remote(q, False, None, 0.2)
|
||||||
|
q.put(2)
|
||||||
|
|
||||||
|
assert ray.get(get_id) == 1
|
||||||
|
|
||||||
|
|
||||||
|
def test_get():
|
||||||
|
start_ray()
|
||||||
|
q = Queue()
|
||||||
|
|
||||||
|
item = 0
|
||||||
|
q.put(item)
|
||||||
|
assert q.get(block=False) == item
|
||||||
|
|
||||||
|
item = 1
|
||||||
|
q.put(item)
|
||||||
|
assert q.get(timeout=0.2) == item
|
||||||
|
|
||||||
|
with pytest.raises(ValueError):
|
||||||
|
q.get(timeout=-1)
|
||||||
|
|
||||||
|
with pytest.raises(Empty):
|
||||||
|
q.get_nowait()
|
||||||
|
|
||||||
|
with pytest.raises(Empty):
|
||||||
|
q.get(timeout=0.2)
|
||||||
|
|
||||||
|
item = 0
|
||||||
|
put_async.remote(q, item, True, None, 0.2)
|
||||||
|
assert q.get() == item
|
||||||
|
|
||||||
|
|
||||||
|
def test_qsize():
|
||||||
|
start_ray()
|
||||||
|
q = Queue()
|
||||||
|
|
||||||
|
items = list(range(10))
|
||||||
|
size = 0
|
||||||
|
|
||||||
|
assert q.qsize() == size
|
||||||
|
|
||||||
|
for item in items:
|
||||||
|
q.put(item)
|
||||||
|
size += 1
|
||||||
|
assert q.qsize() == size
|
||||||
|
|
||||||
|
for item in items:
|
||||||
|
assert q.get() == item
|
||||||
|
size -= 1
|
||||||
|
assert q.qsize() == size
|
Loading…
Add table
Reference in a new issue