code cleanup with pool

This commit is contained in:
Paul Müller 2015-11-11 15:37:17 +01:00
parent e4403b0f3a
commit 6539a05546

View file

@ -8,11 +8,12 @@ from inspect import getcallargs
import multiprocessing as mp import multiprocessing as mp
import random import random
import multiprocessing.pool as _mpool import multiprocessing.pool as _mpool
import time
import warnings import warnings
from . import clients from . import clients
from . import progress from . import progress
from .jobmanager import getCountKwargs, validCountKwargs, JobManager_Server, JobManager_Client from .jobmanager import getCountKwargs, validCountKwargs, JobManager_Server, JobManager_Client, JMConnectionError
#__all__ = ["ProgressBar", "ProgressBarOverrideCount"] #__all__ = ["ProgressBar", "ProgressBarOverrideCount"]
@ -22,7 +23,7 @@ class _Pool_Server(JobManager_Server):
def __init__(self, authkey): def __init__(self, authkey):
# server show status information (verbose=1) # server show status information (verbose=1)
super(_Pool_Server, self).__init__(authkey=authkey, super(_Pool_Server, self).__init__(authkey=authkey,
verbose=2) verbose=1)
self.results = list() self.results = list()
def process_new_result(self, arg, result): def process_new_result(self, arg, result):
@ -79,6 +80,8 @@ class Pool(_mpool.Pool):
def _map_async(self, *args, **kwargs): def _map_async(self, *args, **kwargs):
return self.map_async(*args, **kwargs) return self.map_async(*args, **kwargs)
def map(self, *args, **kwargs):
return self.map_async(*args, **kwargs)
def map_async(self, func, iterable, chunksize=None, callback=None): def map_async(self, func, iterable, chunksize=None, callback=None):
''' '''
@ -90,13 +93,17 @@ class Pool(_mpool.Pool):
if not hasattr(iterable, '__len__'): if not hasattr(iterable, '__len__'):
iterable = list(iterable) iterable = list(iterable)
warnings.warn("chunksize not supported in jobmanager") if chunksize is not None:
warnings.warn("callback not yet implemented") raise NotImplementedError("chunksize not supported in jobmanager")
if callback is not None:
raise NotImplementedError("callback for jobmanager pool not yet implemented")
q = mp.Queue() q = mp.Queue()
p_server = mp.Process(target=Pool._run_jm_server, args=(q, iterable, self.authkey)) p_server = mp.Process(target=Pool._run_jm_server, args=(q, iterable, self.authkey))
p_server.start() p_server.start()
time.sleep(1)
p_client = mp.Process(target=Pool._run_jm_client, args=(func, self.authkey)) p_client = mp.Process(target=Pool._run_jm_client, args=(func, self.authkey))
p_client.start() p_client.start()
@ -112,6 +119,8 @@ class Pool(_mpool.Pool):
client._func = func client._func = func
client.start() client.start()
@staticmethod @staticmethod
def _run_jm_server(q, iterable, authkey): def _run_jm_server(q, iterable, authkey):
#iterable = self._jm_iterable #iterable = self._jm_iterable
@ -429,7 +438,7 @@ def decorate_module_ProgressBar(module, decorator=ProgressBar, **kwargs):
if getCountKwargs(vdict[key]) is not None: if getCountKwargs(vdict[key]) is not None:
newid = "_jm_decorate_{}".format(key) newid = "_jm_decorate_{}".format(key)
if hasattr(module, newid): if hasattr(module, newid):
warings.warn("Wrapping of {} prevented by module.". warnings.warn("Wrapping of {} prevented by module.".
format(key)) format(key))
else: else:
# copy old function # copy old function