From 796969e935b61e746edccc98404d12a34011c79e Mon Sep 17 00:00:00 2001 From: Richard Hartmann Date: Tue, 5 Jan 2021 15:23:46 +0100 Subject: [PATCH] added jm_ready_callback --- jobmanager/jobmanager.py | 48 ++++++++++++++++++++++++++-------------- 1 file changed, 32 insertions(+), 16 deletions(-) diff --git a/jobmanager/jobmanager.py b/jobmanager/jobmanager.py index 2bd640f..6cbf22f 100644 --- a/jobmanager/jobmanager.py +++ b/jobmanager/jobmanager.py @@ -110,10 +110,27 @@ def set_mkl_threads(n): if n == 0: print("MKL threads not set!") else: - mkl_rt = ctypes.CDLL('libmkl_rt.so') - #print("mkl_set_num_threads", n) - mkl_rt.MKL_Set_Num_Threads(n) - #print("MKL threads", mkl_rt.mkl_get_max_threads()) + noMKL = noOB = False + try: + mkl_rt = ctypes.CDLL('libmkl_rt.so') + mkl_rt.MKL_Set_Num_Threads(n) + print("MKL threads set to", mkl_rt.mkl_get_max_threads()) + except OSError: + noMKL=True + + try: + openblas = ctypes.CDLL('libopenblas.so') + openblas.openblas_set_num_threads(n) + print("openblas threads set to", openblas.openblas_get_num_threads()) + except OSError: + noOB=True + + if noMKL and noOB: + warnings.warn("num_threads could not be set, MKL / openblas not found") + + print() + + class ServerQueueManager(BaseManager): @@ -243,6 +260,8 @@ class JobManager_Client(object): self._pid = os.getpid() self._sid = os.getsid(self._pid) + self.init_time = time.time() + if verbose is not None: log.warning("\nverbose is deprecated, only allowed for compatibility") warnings.warn("verbose is deprecated", DeprecationWarning) @@ -312,7 +331,6 @@ class JobManager_Client(object): if (self.timeout is not None) and (self.timeout < 0): log.warning("negative timeout! client will not start") - self.init_time = time.time() self.ask_on_sigterm = ask_on_sigterm self.status_output_for_srun = status_output_for_srun @@ -1388,7 +1406,8 @@ class JobManager_Server(object): job_q_on_disk_path = '.', timeout = None, log_level = logging.WARNING, - status_file_name = None): + status_file_name = None, + jm_ready_callback = lambda : print("jm ready")): """ authkey [string] - authentication key used by the SyncManager. Server and Client must have the same authkey. @@ -1420,6 +1439,9 @@ class JobManager_Server(object): assert fname_dump is None + self.timeout = timeout + self.start_time = datetime.now() + global log log = logging.getLogger(__name__+'.'+self.__class__.__name__) log.setLevel(log_level) @@ -1481,9 +1503,7 @@ class JobManager_Server(object): self.stat = None - self.timeout = timeout - self.start_time = datetime.now() - + self.jm_ready_callback = jm_ready_callback @@ -1614,8 +1634,8 @@ class JobManager_Server(object): log.info("JobManager_Server was successfully shut down") - if self.status_file: - os.remove(self.status_file) +# if self.status_file: +# os.remove(self.status_file) def show_statistics(self): if self.show_stat: @@ -1747,10 +1767,6 @@ class JobManager_Server(object): """to implement user defined final processing""" pass - def print_jm_ready(self): - # please overwrite for individual hooks to notify that the server process runs - pass - def bring_him_up(self, no_sys_exit_on_signal=False): self._start_manager() @@ -1777,7 +1793,7 @@ class JobManager_Server(object): Signal_to_sys_exit(signals=[signal.SIGTERM, signal.SIGINT]) log.debug("ready for processing incoming results") - self.print_jm_ready() + self.jm_ready_callback() if self.status_file: with open(self.status_file, 'w') as f: