mirror of
https://github.com/vale981/jobmanager
synced 2025-03-04 17:31:39 -05:00
added control of the MKL multi threading for each sub process
This commit is contained in:
parent
44cfb9d512
commit
909007a53d
1 changed files with 28 additions and 3 deletions
|
@ -29,6 +29,7 @@ The class JobManager_Client
|
|||
|
||||
"""
|
||||
import copy
|
||||
import ctypes
|
||||
from datetime import datetime
|
||||
import inspect
|
||||
import multiprocessing as mp
|
||||
|
@ -54,6 +55,9 @@ import ctypes
|
|||
from shutil import rmtree
|
||||
from .signalDelay import sig_delay
|
||||
|
||||
|
||||
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
# This is a list of all python objects that will be imported upon
|
||||
|
@ -103,6 +107,13 @@ def get_user_num_process():
|
|||
out = subprocess.check_output('ps ut | wc -l', shell=True).decode().strip()
|
||||
return int(out)-2
|
||||
|
||||
def set_mkl_threads(n):
    """
    Limit the number of threads the Intel MKL runtime uses in the calling process.

    n [integer] - number of MKL threads; 0 (or None) leaves the MKL setting untouched

    Returns True if the thread count was handed to MKL, False otherwise.

    A missing or unusable MKL runtime library is reported through the logger
    instead of raising, so a caller on a machine without MKL keeps running.
    """
    # logging.getLogger returns the same logger object for the same name, so
    # this is the logger a module level logging.getLogger(__name__) yields
    logger = logging.getLogger(__name__)

    if not n:
        logger.info("MKL threads not set!")
        return False

    try:
        # CDLL raises OSError if the shared library can not be loaded,
        # the attribute lookup raises AttributeError if the symbol is missing
        mkl_rt = ctypes.CDLL('libmkl_rt.so')
        mkl_set_num_threads = mkl_rt.MKL_Set_Num_Threads
    except (OSError, AttributeError) as e:
        logger.warning("MKL threads not set! (MKL runtime not usable: %s)", e)
        return False

    # MKL_Set_Num_Threads takes its argument by value as a C int
    mkl_set_num_threads(ctypes.c_int(int(n)))
    return True
|
||||
|
||||
|
||||
class ServerQueueManager(BaseManager):
|
||||
pass
|
||||
|
@ -163,7 +174,8 @@ class JobManager_Client(object):
|
|||
use_special_SIG_INT_handler = True,
|
||||
timeout = None,
|
||||
log_level = logging.WARNING,
|
||||
ask_on_sigterm = True):
|
||||
ask_on_sigterm = True,
|
||||
nthreads = 1):
|
||||
"""
|
||||
server [string] - ip address or hostname where the JobManager_Server is running
|
||||
|
||||
|
@ -179,6 +191,10 @@ class JobManager_Client(object):
|
|||
zero: number of spawned processes == number cpu cores
|
||||
|
||||
negative integer: number of spawned processes == number cpu cores - |nproc|
|
||||
|
||||
between 0 and 1: ... fraction
|
||||
|
||||
nthreads [integer] - number of MKL threads for each subprocess
|
||||
|
||||
njobs [integer] - total number of jobs to run per process
|
||||
|
||||
|
@ -266,6 +282,9 @@ class JobManager_Client(object):
|
|||
if self.nproc == 0:
|
||||
self.nproc = 1
|
||||
log.debug("nproc:%s", self.nproc)
|
||||
|
||||
self.nthreads = nthreads
|
||||
|
||||
if njobs == 0: # internally, njobs must be negative for infinite jobs
|
||||
njobs -= 1
|
||||
self.njobs = njobs
|
||||
|
@ -375,7 +394,8 @@ class JobManager_Client(object):
|
|||
# job_q_get_timeout,
|
||||
host,
|
||||
port,
|
||||
authkey):
|
||||
authkey,
|
||||
nthreads):
|
||||
"""
|
||||
the wrapper spawned nproc times calling and handling self.func
|
||||
"""
|
||||
|
@ -419,6 +439,10 @@ class JobManager_Client(object):
|
|||
else:
|
||||
log.debug("found standard keyword arguments: [c, m]")
|
||||
_func = func
|
||||
|
||||
log.debug("set mkl threads to {}".format(nthreads))
|
||||
set_mkl_threads(nthreads)
|
||||
|
||||
|
||||
|
||||
|
||||
|
@ -732,7 +756,8 @@ class JobManager_Client(object):
|
|||
#self._job_q_get_timeout, # job_q_get_timeout
|
||||
self.server, # host
|
||||
self.port, # port
|
||||
self.authkey)) # authkey
|
||||
self.authkey, # authkey
|
||||
self.nthreads))
|
||||
|
||||
|
||||
self.procs.append(p)
|
||||
|
|
Loading…
Add table
Reference in a new issue