use pipes to make argscontainer work in multiprocess env

This commit is contained in:
Richard Hartmann 2016-11-27 22:51:34 +01:00
parent 585bf96c63
commit 0aa88b4ec7
2 changed files with 189 additions and 89 deletions

View file

@@ -63,7 +63,6 @@ __all__ = ["JobManager_Client",
"JobManager_Server",
"getDateForFileName"
]
# Magic conversion from 3 to 2
if sys.version_info[0] == 2:
@@ -837,59 +836,112 @@ class ClosableQueue(object):
self._close = True
class ArgsContainerQueue(object):
def __init__(self, q_in, q_out):
self._q_in = q_in
self._q_out = q_out
self.timeout = 1
def __init__(self, put_conn, get_conn):
self.put_conn = put_conn
self.get_conn = get_conn
def put(self, item):
self._q_in.put(item)
self.put_conn.send(item)
kind, res = self.put_conn.recv()
if kind == '#suc':
return
elif kind == '#exc':
raise res
else:
raise RuntimeError("unknown kind '{}'".format(kind))
def get(self):
return self._q_out.get(timeout = self.timeout)
self.get_conn.send('#GET')
kind, res = self.get_conn.recv()
if kind == '#res':
return res
elif kind == '#exc':
raise res
else:
raise RuntimeError("unknown kind '{}'".format(kind))
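The put/get methods above are thin request/response wrappers around a multiprocessing Pipe: put sends the item and waits for '#suc'/'#exc', get sends '#GET' and waits for '#res'/'#exc'. A minimal standalone sketch of that pattern (illustrative names only, not part of the module) could look like this:

    # sketch: a background thread answers '#GET' requests over a Pipe,
    # mirroring the '#res' / '#exc' convention used by ArgsContainerQueue.get
    import multiprocessing as mp
    import threading
    import queue

    def serve_gets(conn, source):
        # reply to each '#GET' with ('#res', item) or ('#exc', exception_type)
        while True:
            try:
                msg = conn.recv()
            except EOFError:
                break
            if msg == '#GET':
                try:
                    conn.send(('#res', source.get_nowait()))
                except Exception as e:
                    conn.send(('#exc', type(e)))

    if __name__ == '__main__':
        src = queue.Queue()
        src.put('some_arg')
        server_end, client_end = mp.Pipe()
        threading.Thread(target=serve_gets, args=(server_end, src), daemon=True).start()

        client_end.send('#GET')
        print(client_end.recv())   # ('#res', 'some_arg')
        client_end.send('#GET')
        print(client_end.recv())   # ('#exc', <class 'queue.Empty'>) once drained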
class ArgsContainer(object):
def __init__(self, path=None, new_shelve=True):
r"""a container for the arguments hold by the jobmanager server
and fed to the jobmanager clients
With the option to keep the data on the disk while providing
a Queue like thread save interface. For multiprocessing the 'get_Queue'
method returns again a Queue like object, that can be passes to subprocesses
to access the actual container class.
Additional features:
- items may only be inserted once via 'put'
- if an item was drawn using 'get', it may be reinserted using 'put'
- items are identified via hash values, using sha256
- items that were drawn using 'get' can be marked using 'mark'
- items that are 'marked' can not be reinserted
- the class is pickable, when unpickled, ALL items that are NOT marked
will be accessible via 'get'
These Features allows the following workflow for the Server/Client communication.
The Client used 'get' to draw an item. Onces successfully processed the client
returns the item and its results. The item will be marked once it was received by the server.
Now the item is 'save', even when shutting down the server, dumping its state and restarting
the server, the item will not be calculated again.
"""
def __init__(self, path=None):
self._path = path
self._lock = threading.Lock()
if self._path is None:
self.data = {}
else:
self._open_shelve(new_shelve)
self._open_shelve()
self._closed = False
self._not_gotten_ids = set()
self._marked_ids = set()
self._max_id = 0
self._q_in = mp.Queue()
self._q_out = mp.Queue()
self.q_out_size = 10
def get_queue(self):
c_get_1, c_get_2 = mp.Pipe()
c_put_1, c_put_2 = mp.Pipe()
thr_in_reader = threading.Thread(target=self._receiver)
thr_in_reader.daemon = True
#thr_in_reader.start()
thr_sender = threading.Thread(target=self._sender, args=(c_get_1, ))
thr_sender.daemon = True
thr_sender.start()
thr_out_sender = threading.Thread(target=self._sender)
thr_out_sender.daemon = True
#thr_out_sender.start()
thr_receiver = threading.Thread(target=self._receiver, args=(c_put_1,))
thr_receiver.daemon = True
thr_receiver.start()
return ArgsContainerQueue(put_conn=c_put_2, get_conn=c_get_2)
def _receiver(self):
def _receiver(self, conn):
while True:
item = self._q_in.get()
self.put(item)
def _sender(self):
while self._q_out.qsize() < self.q_out_size:
try:
self._q_out.put(self.get())
except queue.Empty:
item = conn.recv()
except EOFError:
break
try:
self.put(item)
except Exception as e:
conn.send( ('#exc', type(e)) )
else:
conn.send( ('#suc', None) )
def _sender(self, conn):
while True:
try:
msg = conn.recv()
except EOFError:
break
if msg == '#GET':
try:
conn.send( ('#res', self.get()) )
except Exception as e:
conn.send( ('#exc', type(e)) )
else:
raise RuntimeError("reveived unknown message '{}'".format(msg))
def _open_shelve(self, new_shelve=True):
if os.path.exists(self._path):
@@ -897,14 +949,11 @@ class ArgsContainer(object):
raise RuntimeWarning("can not create shelve, path '{}' is an existing file".format(self._path))
if new_shelve:
raise RuntimeError("a shelve with name {} already exists".format(self._path))
print("shelve exists")
else:
os.makedirs(self._path)
fname = os.path.join(self._path, 'args')
fname = os.path.abspath(os.path.join(self._path, 'args'))
self.data = shelve.open(fname)
print("open shelve", fname, len(self.data), type(self.data))
def __getstate__(self):
@@ -919,7 +968,6 @@ class ArgsContainer(object):
# the not gotten ones are all items except the marked ones
# the old gotten ones which are not marked were lost
self._not_gotten_ids = set(range(self._max_id)) - self._marked_ids
print("getstate", tmp)
if isinstance(tmp, dict):
self.data = tmp
self._path = None
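For reference, the put/get/mark workflow described in the ArgsContainer docstring above boils down to roughly the following (a sketch against the interface shown in this diff, using the in-memory path=None variant; not taken from the test suite):

    # sketch of the bookkeeping described in the docstring: put once, get,
    # mark after the result arrived; a pickled/unpickled container re-offers
    # every item that was not marked
    import pickle
    from jobmanager.jobmanager import ArgsContainer

    ac = ArgsContainer()        # path=None keeps everything in memory
    for arg in range(3):
        ac.put(arg)

    item = ac.get()             # a client draws an item
    # ... the client processes it and returns the result ...
    ac.mark(item)               # the server marks it once the result arrived

    ac_restored = pickle.loads(pickle.dumps(ac))
    # the marked item stays 'safe'; the two unmarked ones are offered again
    assert ac_restored.qsize() == 2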

View file

@@ -7,7 +7,7 @@ import sys
import time
import multiprocessing as mp
from multiprocessing import util
util.log_to_stderr()
# util.log_to_stderr()
from multiprocessing.managers import BaseManager
import socket
import signal
@@ -780,6 +780,8 @@ def test_ArgsContainer():
from shutil import rmtree
import shelve
# simple test on shelve, close is needed to write data to disk
s = shelve.open("test")
s['a'] = 1
s.close()
@@ -787,50 +789,21 @@ def test_ArgsContainer():
assert s2['a'] == 1
path = 'argscont'
ac = ArgsContainer(path, new_shelve=False)
ac.data['1'] = 'item11'
print(len(ac.data))
ac.close()
time.sleep(1)
fname = '/home/cima/uni/myGit/hierarchies/libs/externals/dev/jobmanager_package/tests/argscont/args'
s = shelve.open(fname)
time.sleep(1)
print(len(s))
return
# remove old test data
try:
rmtree(path)
except FileNotFoundError:
pass
ac = ArgsContainer(path)
try:
try: # error as ac already exists on disk
ac = ArgsContainer(path)
except RuntimeError:
pass
else:
assert False
ac.clear()
ac = ArgsContainer(path)
ac.put(1)
i = ac.get()
print(i)
ac2 = ArgsContainer(path, new_shelve=False)
print(ac2.qsize())
item = ac2.get()
print(item)
ac.clear()
return
ac.clear() # remove ac from disk
#for p in [None, path]:
for p in [path]:
@@ -840,8 +813,6 @@ def test_ArgsContainer():
ac.put(arg)
assert ac.qsize() == 5
print(len(ac.data), type(ac.data))
item1 = ac.get()
item2 = ac.get()
@@ -899,36 +870,17 @@ def test_ArgsContainer():
import pickle
ac_dump = pickle.dumps(ac)
print(len(ac.data), type(ac.data))
fname = os.path.join(path, 'args')
print(fname)
s = shelve.open(fname, flag='c')
print(len(s), type(s))
# del ac
# ac2 = pickle.loads(ac_dump)
return
ac.close_shelve() # the shelve needs to be closed so that the data gets flushed to disk
# note here, when loading, the _not_gotten_ids are all ids
# except the marked ones
ac2 = pickle.loads(ac_dump)
assert ac2.qsize() == 4
from jobmanager.jobmanager import bf, hashlib
item3_hash = hashlib.sha256(bf.dump(item3)).hexdigest()
print(item3)
print(item3_hash)
for k in ac2.data:
print(k)
assert item3_hash in ac2.data
assert ac2.marked_items() == 1
assert ac2.unmarked_items() == 4
@@ -1026,6 +978,105 @@ def test_ArgsContainer_BaseManager():
assert ac_inst.qsize() == 200
def test_ArgsContainer_BaseManager_in_subprocess():
from jobmanager.jobmanager import ArgsContainer
from jobmanager.jobmanager import ContainerClosedError
import queue
global PORT
path = 'argscont'
from shutil import rmtree
try:
rmtree(path)
except FileNotFoundError:
pass
for p in [path, None]:
PORT += 1
ac_inst = ArgsContainer(p)
ac_inst.put('a')
assert ac_inst.qsize() == 1
assert ac_inst.gotten_items() == 0
assert ac_inst.marked_items() == 0
class MM(BaseManager):
pass
q = ac_inst.get_queue()
MM.register('get_job_q', callable=lambda: q, exposed=['put', 'get'])
m = MM(('', PORT), b'test_argscomnt')
m.start()
class MM_remote(BaseManager):
pass
MM_remote.register('get_job_q')
mr = MM_remote(('localhost', PORT), b'test_argscomnt')
mr.connect()
acr = mr.get_job_q()
acr.put('b')
acr.put('c')
time.sleep(0.1)
assert ac_inst.qsize() == 3
assert ac_inst.gotten_items() == 0
assert ac_inst.marked_items() == 0
it = ac_inst.get()
assert ac_inst.qsize() == 2
assert ac_inst.gotten_items() == 1
assert ac_inst.marked_items() == 0
ac_inst.mark(it)
assert ac_inst.qsize() == 2
assert ac_inst.gotten_items() == 1
assert ac_inst.marked_items() == 1
it = acr.get()
assert ac_inst.qsize() == 1
assert ac_inst.gotten_items() == 2
assert ac_inst.marked_items() == 1
ac_inst.mark(it)
assert ac_inst.qsize() == 1
assert ac_inst.gotten_items() == 2
assert ac_inst.marked_items() == 2
acr.get()
assert ac_inst.qsize() == 0
try:
acr.get()
except queue.Empty:
print("caught queue.Empty")
else:
assert False
acr.put('e')
time.sleep(0.1)
assert ac_inst.qsize() == 1
ac_inst.close()
try:
acr.put('f')
except ContainerClosedError:
print("caught ContainerClosedError")
else:
assert False
try:
acr.get()
except ContainerClosedError:
print("caught ContainerClosedError")
else:
assert False
def test_unbind_adresse():
from multiprocessing import managers
from multiprocessing import connection
@@ -1129,8 +1180,9 @@ if __name__ == "__main__":
pass
else:
func = [
test_ArgsContainer,
# test_ArgsContainer,
# test_ArgsContainer_BaseManager,
test_ArgsContainer_BaseManager_in_subprocess,
# test_hum_size,
# test_Signal_to_SIG_IGN,
# test_Signal_to_sys_exit,