mirror of
https://github.com/vale981/jobmanager
synced 2025-03-05 09:51:38 -05:00
cleanup and tried to improve module decorator
This commit is contained in:
parent
df0d2e165d
commit
98275679b0
2 changed files with 19 additions and 5 deletions
|
@ -22,7 +22,7 @@ def func(x):
|
||||||
# Create list of parameters
|
# Create list of parameters
|
||||||
a = list()
|
a = list()
|
||||||
for i in range(10):
|
for i in range(10):
|
||||||
a.append((i,2.34))
|
a.append([i,2.34])
|
||||||
|
|
||||||
# mp.Pool example:
|
# mp.Pool example:
|
||||||
p_mp = mp.Pool()
|
p_mp = mp.Pool()
|
||||||
|
|
|
@ -9,6 +9,7 @@ import multiprocessing as mp
|
||||||
import random
|
import random
|
||||||
import multiprocessing.pool as _mpool
|
import multiprocessing.pool as _mpool
|
||||||
import time
|
import time
|
||||||
|
from types import ModuleType
|
||||||
import warnings
|
import warnings
|
||||||
|
|
||||||
from . import clients
|
from . import clients
|
||||||
|
@ -53,7 +54,10 @@ class _Pool_Client(JobManager_Client):
|
||||||
|
|
||||||
def func(self, args, *_):
|
def func(self, args, *_):
|
||||||
"""simply return the current argument"""
|
"""simply return the current argument"""
|
||||||
return self._func(args)
|
if not isinstance(args, list):
|
||||||
|
# convert tuple to list
|
||||||
|
args = list(args)
|
||||||
|
return self._func(*args)
|
||||||
|
|
||||||
|
|
||||||
class Pool(_mpool.Pool):
|
class Pool(_mpool.Pool):
|
||||||
|
@ -81,7 +85,7 @@ class Pool(_mpool.Pool):
|
||||||
return self.map_async(*args, **kwargs)
|
return self.map_async(*args, **kwargs)
|
||||||
|
|
||||||
def map(self, *args, **kwargs):
|
def map(self, *args, **kwargs):
|
||||||
return self.map_async(*args, **kwargs)
|
return self.map_async(*args, **kwargs).get()
|
||||||
|
|
||||||
def map_async(self, func, iterable, chunksize=None, callback=None):
|
def map_async(self, func, iterable, chunksize=None, callback=None):
|
||||||
'''
|
'''
|
||||||
|
@ -90,7 +94,7 @@ class Pool(_mpool.Pool):
|
||||||
|
|
||||||
'''
|
'''
|
||||||
assert self._state == _mpool.RUN
|
assert self._state == _mpool.RUN
|
||||||
if not hasattr(iterable, '__len__'):
|
if not isinstance(iterable, list):
|
||||||
iterable = list(iterable)
|
iterable = list(iterable)
|
||||||
|
|
||||||
if chunksize is not None:
|
if chunksize is not None:
|
||||||
|
@ -111,7 +115,7 @@ class Pool(_mpool.Pool):
|
||||||
p_server.join()
|
p_server.join()
|
||||||
|
|
||||||
# call q.get() to obtain the results
|
# call q.get() to obtain the results
|
||||||
return q.get()
|
return q
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _run_jm_client(func, authkey):
|
def _run_jm_client(func, authkey):
|
||||||
|
@ -452,3 +456,13 @@ def decorate_module_ProgressBar(module, decorator=ProgressBar, **kwargs):
|
||||||
print("Jobmanager wrapped {}.{}".format(
|
print("Jobmanager wrapped {}.{}".format(
|
||||||
module.__name__, key))
|
module.__name__, key))
|
||||||
|
|
||||||
|
elif vdict[key] == mp.Pool:
|
||||||
|
# replace mp.Pool
|
||||||
|
setattr(module, vdict[key], Pool)
|
||||||
|
elif isinstance(vdict[key], ModuleType):
|
||||||
|
# replace mp.Pool in submodules
|
||||||
|
subdict = vdict[key].__dict__
|
||||||
|
for skey in list(subdict.keys()):
|
||||||
|
if subdict[skey] == mp.pool.Pool:
|
||||||
|
setattr(vdict[key], subdict[skey], Pool)
|
||||||
|
|
Loading…
Add table
Reference in a new issue