added jm_ready_callback

This commit is contained in:
Richard Hartmann 2021-01-05 15:23:46 +01:00
parent 1c12072ffd
commit 796969e935

View file

@ -110,10 +110,27 @@ def set_mkl_threads(n):
if n == 0:
print("MKL threads not set!")
else:
noMKL = noOB = False
try:
mkl_rt = ctypes.CDLL('libmkl_rt.so')
#print("mkl_set_num_threads", n)
mkl_rt.MKL_Set_Num_Threads(n)
#print("MKL threads", mkl_rt.mkl_get_max_threads())
print("MKL threads set to", mkl_rt.mkl_get_max_threads())
except OSError:
noMKL=True
try:
openblas = ctypes.CDLL('libopenblas.so')
openblas.openblas_set_num_threads(n)
print("openblas threads set to", openblas.openblas_get_num_threads())
except OSError:
noOB=True
if noMKL and noOB:
warnings.warn("num_threads could not be set, MKL / openblas not found")
print()
class ServerQueueManager(BaseManager):
@ -243,6 +260,8 @@ class JobManager_Client(object):
self._pid = os.getpid()
self._sid = os.getsid(self._pid)
self.init_time = time.time()
if verbose is not None:
log.warning("\nverbose is deprecated, only allowed for compatibility")
warnings.warn("verbose is deprecated", DeprecationWarning)
@ -312,7 +331,6 @@ class JobManager_Client(object):
if (self.timeout is not None) and (self.timeout < 0):
log.warning("negative timeout! client will not start")
self.init_time = time.time()
self.ask_on_sigterm = ask_on_sigterm
self.status_output_for_srun = status_output_for_srun
@ -1388,7 +1406,8 @@ class JobManager_Server(object):
job_q_on_disk_path = '.',
timeout = None,
log_level = logging.WARNING,
status_file_name = None):
status_file_name = None,
jm_ready_callback = lambda : print("jm ready")):
"""
authkey [string] - authentication key used by the SyncManager.
Server and Client must have the same authkey.
@ -1420,6 +1439,9 @@ class JobManager_Server(object):
assert fname_dump is None
self.timeout = timeout
self.start_time = datetime.now()
global log
log = logging.getLogger(__name__+'.'+self.__class__.__name__)
log.setLevel(log_level)
@ -1481,9 +1503,7 @@ class JobManager_Server(object):
self.stat = None
self.timeout = timeout
self.start_time = datetime.now()
self.jm_ready_callback = jm_ready_callback
@ -1614,8 +1634,8 @@ class JobManager_Server(object):
log.info("JobManager_Server was successfully shut down")
if self.status_file:
os.remove(self.status_file)
# if self.status_file:
# os.remove(self.status_file)
def show_statistics(self):
if self.show_stat:
@ -1747,10 +1767,6 @@ class JobManager_Server(object):
"""to implement user defined final processing"""
pass
def print_jm_ready(self):
# please overwrite for individual hooks to notify that the server process runs
pass
def bring_him_up(self, no_sys_exit_on_signal=False):
self._start_manager()
@ -1777,7 +1793,7 @@ class JobManager_Server(object):
Signal_to_sys_exit(signals=[signal.SIGTERM, signal.SIGINT])
log.debug("ready for processing incoming results")
self.print_jm_ready()
self.jm_ready_callback()
if self.status_file:
with open(self.status_file, 'w') as f: