trying to write mp.pool alternative with jm

This commit is contained in:
Paul Müller 2015-05-07 00:01:54 +02:00
parent 39b9baa705
commit 134765f661
2 changed files with 105 additions and 2 deletions

View file

@ -17,7 +17,7 @@ import jobmanager as jm
class Example_Client(jm.JobManager_Client):
def __init__(self):
# start quiet client (verbopse=0)
# start quiet client (verbose=0)
super(Example_Client, self).__init__(server="localhost",
authkey='simple example',
verbose=0)

View file

@ -5,14 +5,117 @@
from __future__ import division, print_function
from inspect import getcallargs
import multiprocessing as mp
import multiprocessing.pool as _mpool
import warnings
from . import clients
from . import progress
from .jobmanager import getCountKwargs, validCountKwargs
from .jobmanager import getCountKwargs, validCountKwargs, JobManager_Server, JobManager_Client
#__all__ = ["ProgressBar", "ProgressBarOverrideCount"]
class _Pool_Server(JobManager_Server):
def __init__(self, authkey):
# server show status information (verbose=1)
super(_Pool_Server, self).__init__(authkey=authkey,
verbose=1)
self.final_result = 1
def process_new_result(self, arg, result):
"""over write final_result with the new incoming result
if the new result is smaller then the final_result"""
print(arg, result)
if self.final_result > result:
self.final_result = result
def process_final_result(self):
print("final_result:", self.final_result)
class _Pool_Client(JobManager_Client):
def __init__(self, server, authkey, verbose):
super(_Pool_Client, self).__init__(server="localhost",
authkey=authkey,
verbose=verbose)
@staticmethod
def func(args):
"""simply return the current argument"""
print(args)
return args
class Pool(_mpool.Pool):
"""
A progressbar-decorated version of `multiprocessing.Pool`
The methods `map` and `map_async` work as expected.
"""
__doc__ = __doc__ + _mpool.Pool.__doc__
def __init__(self, processes=None, initializer=None,
initargs=(), maxtasksperchild=None):
_mpool.Pool.__init__(self, processes, initializer, initargs,
maxtasksperchild)
# the self.map method calls the self.map_async method
def map_async(self, func, iterable, chunksize=None, callback=None):
'''
Asynchronous equivalent of `map()` builtin.
'''
assert self._state == _mpool.RUN
if not hasattr(iterable, '__len__'):
iterable = list(iterable)
if chunksize is None:
chunksize, extra = divmod(len(iterable), len(self._pool) * 4)
if extra:
chunksize += 1
if len(iterable) == 0:
chunksize = 0
p_server = mp.Process(target=Pool._run_jm_server, args=(iterable,))
p_server.start()
import time
time.sleep(1)
p_client = mp.Process(target=Pool._run_jm_client)
p_client.start()
#task_batches = _mpool.Pool._get_tasks(func, iterable, chunksize)
#result = MapResult(self._cache, chunksize, len(iterable), callback)
#
#self._taskqueue.put((((result._job, i, mapstar, (x,), {})
# for i, x in enumerate(task_batches)), None))
p_client.join()
p_server.join()
@staticmethod
def _run_jm_client():
client = _Pool_Client("localhost", "map", verbose=1)
#client.func = self._jm_func
client.start()
@staticmethod
def _run_jm_server(iterable):
#iterable = self._jm_iterable
# Create jobmanager server and client
with _Pool_Server("map") as server:
for it in iterable:
print(it)
server.put_arg(it)
server.start()
class ProgressBar(object):
""" A wrapper/decorator with a text-based progress bar.