diff --git a/jobmanager/jobmanager.py b/jobmanager/jobmanager.py index 2e8e5e5..6025d82 100644 --- a/jobmanager/jobmanager.py +++ b/jobmanager/jobmanager.py @@ -40,11 +40,14 @@ import pickle import signal import socket import sys +import random import time import traceback import warnings import binfootprint as bf import progression as progress +#import queuelib +import shelve import logging import threading import ctypes @@ -183,10 +186,10 @@ class JobManager_Client(object): show_counter_only = False, interval = 0.3, emergency_dump_path = '.', - job_q_get_timeout = 1, - job_q_put_timeout = 10, - result_q_put_timeout = 300, - fail_q_put_timeout = 10, + #job_q_get_timeout = 1, + #job_q_put_timeout = 10, + #result_q_put_timeout = 300, + #fail_q_put_timeout = 10, reconnect_wait = 2, reconnect_tries = 3, ping_timeout = 2, @@ -248,14 +251,14 @@ class JobManager_Client(object): self.interval = interval log.debug("interval:%s", self.interval) - self._job_q_get_timeout = job_q_get_timeout - log.debug("_job_q_get_timeout:%s", self._job_q_get_timeout) - self._job_q_put_timeout = job_q_put_timeout - log.debug("_job_q_put_timeout:%s", self._job_q_put_timeout) - self._result_q_put_timeout = result_q_put_timeout - log.debug("_result_q_put_timeout:%s", self._result_q_put_timeout) - self._fail_q_put_timeout = fail_q_put_timeout - log.debug("_fail_q_put_timeout:%s", self._fail_q_put_timeout) + #self._job_q_get_timeout = job_q_get_timeout + #log.debug("_job_q_get_timeout:%s", self._job_q_get_timeout) + #self._job_q_put_timeout = job_q_put_timeout + #log.debug("_job_q_put_timeout:%s", self._job_q_put_timeout) + #self._result_q_put_timeout = result_q_put_timeout + #log.debug("_result_q_put_timeout:%s", self._result_q_put_timeout) + #self._fail_q_put_timeout = fail_q_put_timeout + #log.debug("_fail_q_put_timeout:%s", self._fail_q_put_timeout) self.reconnect_wait = reconnect_wait log.debug("reconnect_wait:%s", self.reconnect_wait) self.reconnect_tries = reconnect_tries @@ -396,7 +399,7 @@ class JobManager_Client(object): reset_pbc, njobs, emergency_dump_path, - job_q_get_timeout, +# job_q_get_timeout, host, port, authkey): @@ -456,26 +459,27 @@ class JobManager_Client(object): while njobs != 0: njobs -= 1 - # try to get an item from the job_q - try: - tg_0 = time.time() - arg = job_q_get(block = True, timeout = job_q_get_timeout) - tg_1 = time.time() - time_queue += (tg_1-tg_0) - + # try to get an item from the job_q + tg_0 = time.time() + try: + arg = job_q_get() # regular case, just stop working when empty job_q was found except queue.Empty: log.info("finds empty job queue, processed %s jobs", cnt) break # handle SystemExit in outer try ... except except SystemExit as e: + arg = None log.warning('getting arg from job_q failed due to SystemExit') raise e # job_q.get failed -> server down? except Exception as e: + arg = None log.error("Error when calling 'job_q_get'") handle_unexpected_queue_error(e) break + tg_1 = time.time() + time_queue += (tg_1-tg_0) # try to process the retrieved argument try: @@ -548,20 +552,23 @@ class JobManager_Client(object): # note SIGINT, SIGTERM -> SystemExit is achieved by overwriting the # default signal handlers except SystemExit: - log.warning("SystemExit, quit processing, reinsert current argument, please wait") - log.debug("put arg back to local job_q") - try: - local_job_q.put(arg) - # handle SystemExit in outer try ... except - except SystemExit as e: - log.error("puting arg back to local job_q failed due to SystemExit") - raise e - # fail_q.put failed -> server down? - except Exception as e: - log.error("puting arg back to local job_q failed due to %s", type(e)) - handle_unexpected_queue_error(e) + if arg is None: + log.warning("SystemExit, quit processing, no argument to reinsert") else: - log.debug("putting arg back to local job_q was successful") + log.warning("SystemExit, quit processing, reinsert current argument, please wait") + log.debug("put arg back to local job_q") + try: + local_job_q.put(arg) + # handle SystemExit in outer try ... except + except SystemExit as e: + log.error("puting arg back to local job_q failed due to SystemExit") + raise e + # fail_q.put failed -> server down? + except Exception as e: + log.error("puting arg back to local job_q failed due to %s", type(e)) + handle_unexpected_queue_error(e) + else: + log.debug("putting arg back to local job_q was successful") try: sta = progress.humanize_time(time_calc / cnt) @@ -589,7 +596,6 @@ class JobManager_Client(object): if not self.connected: raise JMConnectionError("Can not start Client with no connection to server (shared objetcs are not available)") - log.info("STARTING CLIENT\nserver:%s authkey:%s port:%s num proc:%s", self.server, self.authkey.decode(), self.port, self.nproc) @@ -607,6 +613,8 @@ class JobManager_Client(object): if not self.show_counter_only: m_set_by_function = m_progress + else: + m_progress = None prepend = [] infoline = progress.StringValue(num_of_bytes=12) @@ -643,40 +651,39 @@ class JobManager_Client(object): result_q_put = proxy_operation_decorator(proxy=result_q, operation='put', **kwargs) fail_q_put = proxy_operation_decorator(proxy=fail_q, operation='put', **kwargs) - def pass_job_q_put(job_q_put, local_job_q, timeout): + def pass_job_q_put(job_q_put, local_job_q): # log.debug("this is thread thr_job_q_put with tid %s", ctypes.CDLL('libc.so.6').syscall(186)) while True: data = local_job_q.get() - job_q_put(data, timeout=timeout) + job_q_put(data) # log.debug("stopped thread thr_job_q_put with tid %s", ctypes.CDLL('libc.so.6').syscall(186)) - def pass_result_q_put(result_q_put, local_result_q, timeout): + def pass_result_q_put(result_q_put, local_result_q): log.debug("this is thread thr_result_q_put with tid %s", ctypes.CDLL('libc.so.6').syscall(186)) try: while True: data = local_result_q.get() - result_q_put(data, timeout=timeout) + result_q_put(data) except Exception as e: log.error("thr_result_q_put caught error %s", type(e)) log.info(traceback.format_exc()) log.debug("stopped thread thr_result_q_put with tid %s", ctypes.CDLL('libc.so.6').syscall(186)) - def pass_fail_q_put(fail_q_put, local_fail_q, timeout): + def pass_fail_q_put(fail_q_put, local_fail_q): # log.debug("this is thread thr_fail_q_put with tid %s", ctypes.CDLL('libc.so.6').syscall(186)) while True: data = local_fail_q.get() - fail_q_put(data, timeout=timeout) + log.info("put {} to failq".format(data)) + fail_q_put(data) # log.debug("stopped thread thr_fail_q_put with tid %s", ctypes.CDLL('libc.so.6').syscall(186)) - thr_job_q_put = threading.Thread(target=pass_job_q_put , args=(job_q_put , local_job_q , self._job_q_put_timeout)) + thr_job_q_put = threading.Thread(target=pass_job_q_put , args=(job_q_put , local_job_q)) thr_job_q_put.daemon = True - thr_result_q_put = threading.Thread(target=pass_result_q_put, args=(result_q_put, local_result_q, self._result_q_put_timeout)) + thr_result_q_put = threading.Thread(target=pass_result_q_put, args=(result_q_put, local_result_q)) thr_result_q_put.daemon = True - thr_fail_q_put = threading.Thread(target=pass_fail_q_put , args=(fail_q_put , local_fail_q , self._fail_q_put_timeout)) + thr_fail_q_put = threading.Thread(target=pass_fail_q_put , args=(fail_q_put , local_fail_q)) thr_fail_q_put.daemon = True - - thr_job_q_put.start() thr_result_q_put.start() @@ -709,7 +716,7 @@ class JobManager_Client(object): reset_pbc, # reset_pbc self.njobs, # njobs self.emergency_dump_path, # emergency_dump_path - self._job_q_get_timeout, # job_q_get_timeout + #self._job_q_get_timeout, # job_q_get_timeout self.server, # host self.port, # port self.authkey)) # authkey @@ -818,6 +825,59 @@ def get_shared_status(ss): return None else: return ss.value + +class PersistentQueue(object): + def __init__(self, fname): + self._path = fname + self.q = shelve.open(self._path) + self.head = 0 + self.tail = 0 + + def close(self): + self.q.close() + os.remove(self._path) + + def qsize(self): + # log.info("qsize: this is thread thr_job_q_put with tid %s", ctypes.CDLL('libc.so.6').syscall(186)) + # log.info("qsize {} ({},{})".format(self.tail - self.head, self.tail, self.head)) + return self.tail - self.head + + def put(self, item): + # log.info("put item {}".format(item)) + self.q[str(self.tail)] = item + self.tail += 1 + + def get(self): + try: + item = self.q[str(self.head)] + except KeyError: + raise queue.Empty + + self.head += 1 + # log.info("get item {}".format(item)) + return item + + +RAND_STR_ASCII_IDX_LIST = list(range(48,58)) + list(range(65,91)) + list(range(97,123)) +def rand_str(l = 8): + s = '' + for i in range(l): + s += chr(random.choice(RAND_STR_ASCII_IDX_LIST)) + return s + +def _new_rand_file_name(path='.', end='', l=8): + c = 0 + while True: + fname = rand_str(l) + end + full_name = os.path.join(path, fname) + if not os.path.exists(full_name): + return fname + + c += 1 + if c > 10: + l += 2 + c = 0 + print("INFO: increase random file name length to", l) class JobManager_Server(object): """general usage: @@ -873,7 +933,9 @@ class JobManager_Server(object): speed_calc_cycles = 50, keep_new_result_in_memory = False, hide_progress = False, - show_statistics = True): + show_statistics = True, + job_q_on_disk = False, + job_q_on_disk_path = '.'): """ authkey [string] - authentication key used by the SyncManager. Server and Client must have the same authkey. @@ -950,10 +1012,16 @@ class JobManager_Server(object): self.final_result = [] # NOTE: it only works using multiprocessing.Queue() - # the Queue class from the module queue does NOT work - self.job_q = myQueue() # queue holding args to process + # the Queue class from the module queue does NOT work + self.job_q_on_disk = job_q_on_disk + if job_q_on_disk: + fname = _new_rand_file_name(job_q_on_disk_path, '.jobqdb') + self.job_q = PersistentQueue(fname) + else: + self.job_q = myQueue() # queue holding args to process self.result_q = myQueue() # queue holding returned results self.fail_q = myQueue() # queue holding args where processing failed + self.manager = None self.hostname = socket.gethostname() @@ -1065,10 +1133,9 @@ class JobManager_Server(object): - if job_q is not empty dump remaining job_q """ # will only be False when _shutdown was started in subprocess - - self.__stop_SyncManager() + self.job_q.close() + self.__stop_SyncManager() log.debug("SyncManager stopped!") - # do user defined final processing self.process_final_result() log.debug("process_final_result done!") @@ -1165,6 +1232,7 @@ class JobManager_Server(object): pickle.dump(self.args_dict, f, protocol=pickle.HIGHEST_PROTOCOL) pickle.dump(self.args_list, f, protocol=pickle.HIGHEST_PROTOCOL) fail_list = [] + log.info("__dump: failq size {}".format(self.fail_q.qsize())) try: while True: fail_list.append(self.fail_q.get_nowait()) @@ -1231,7 +1299,7 @@ class JobManager_Server(object): def print_jm_ready(self): # please overwrite for individual hooks to notify that the server process runs - print("jobmanager awaits client results") + print("{} awaits client results".format(self.__class__.__name__)) def bring_him_up(self): if not self.__start_SyncManager(): @@ -1258,6 +1326,11 @@ class JobManager_Server(object): else: log.info("started (host:%s authkey:%s port:%s jobs:%s)", self.hostname, self.authkey.decode(), self.port, self.numjobs) + print("{} started (host:{} authkey:{} port:{} jobs:{})".format(self.__class__.__name__, + self.hostname, + self.authkey.decode(), + self.port, + self.numjobs)) Signal_to_sys_exit(signals=[signal.SIGTERM, signal.SIGINT]) @@ -1270,6 +1343,7 @@ class JobManager_Server(object): When finished, or on exception call stop() afterwards to shut down gracefully. """ + info_line = progress.StringValue(num_of_bytes=100) with progress.ProgressBarFancy(count=self._numresults, max_count=self._numjobs, @@ -1286,6 +1360,8 @@ class JobManager_Server(object): self.job_q.qsize(), self.numresults).encode( 'utf-8') + log.info("infoline {}".format(info_line.value)) + log.info("failq size {}".format(self.fail_q.qsize())) # allows for update of the info line try: diff --git a/tests/test_jobmanager.py b/tests/test_jobmanager.py index c89ef5c..f5b1a71 100644 --- a/tests/test_jobmanager.py +++ b/tests/test_jobmanager.py @@ -36,7 +36,7 @@ jm_log.setLevel(logging.WARNING) import warnings warnings.filterwarnings('error') -#warnings.filterwarnings('always', category=DeprecationWarning) +#warnings.filterwarnings('always', category=ImportWarning) AUTHKEY = 'testing' PORT = random.randint(10000, 60000) @@ -160,7 +160,7 @@ def test_Signal_to_terminate_process_list(): -def start_server(n, read_old_state=False, client_sleep=0.1, hide_progress=False): +def start_server(n, read_old_state=False, client_sleep=0.1, hide_progress=False, job_q_on_disk=False): print("START SERVER") args = range(1,n) with jobmanager.JobManager_Server(authkey = AUTHKEY, @@ -168,7 +168,8 @@ def start_server(n, read_old_state=False, client_sleep=0.1, hide_progress=False) msg_interval = 1, const_arg = client_sleep, fname_dump = 'jobmanager.dump', - hide_progress = hide_progress) as jm_server: + hide_progress = hide_progress, + job_q_on_disk = job_q_on_disk) as jm_server: if not read_old_state: jm_server.args_from_list(args) else: @@ -182,9 +183,6 @@ def start_client(hide_progress=True): port = PORT, nproc = 3, reconnect_tries = 0, - job_q_put_timeout = 1, - result_q_put_timeout = 1, - fail_q_put_timeout = 1, hide_progress = hide_progress) jm_client.start() @@ -222,10 +220,7 @@ def test_jobmanager_static_client_call(): authkey = AUTHKEY, port = PORT, nproc = 3, - reconnect_tries = 0, - job_q_put_timeout = 1, - result_q_put_timeout = 1, - fail_q_put_timeout = 1) + reconnect_tries = 0) jm_client.func(arg=1, const_arg=1) @@ -247,10 +242,7 @@ def test_client(): authkey = AUTHKEY, port = PORT, nproc = 3, - reconnect_tries = 0, - job_q_put_timeout = 1, - result_q_put_timeout = 1, - fail_q_put_timeout = 1) + reconnect_tries = 0) jmc.start() p_server.join(5) @@ -271,56 +263,58 @@ def test_jobmanager_basic(): check if all arguments are found in final_result of dump """ global PORT - PORT += 1 - n = 5 - - p_server = None - p_client = None - - try: - # start a server - p_server = mp.Process(target=start_server, args=(n,False)) - p_server.start() - time.sleep(0.5) - # server needs to be running - assert p_server.is_alive() + - # start client - p_client = mp.Process(target=start_client) - p_client.start() - - p_client.join(10) - # client should have processed all - if sys.version_info.major == 2: - p_client.terminate() - p_client.join(3) + for jqd in [False, True]: + PORT += 1 + n = 5 + p_server = None + p_client = None - assert not p_client.is_alive(), "the client did not terminate on time!" - # client must not throw an exception - assert p_client.exitcode == 0, "the client raised an exception" - p_server.join(3) - # server should have come down - assert not p_server.is_alive(), "the server did not terminate on time!" - - print("[+] client and server terminated") - - fname = 'jobmanager.dump' - with open(fname, 'rb') as f: - data = jobmanager.JobManager_Server.static_load(f) + try: + # start a server + p_server = mp.Process(target=start_server, args=(n,False), kwargs={'job_q_on_disk': jqd}) + p_server.start() + time.sleep(0.5) + # server needs to be running + assert p_server.is_alive() - - final_res_args_set = {a[0] for a in data['final_result']} - set_ref = set(range(1,n)) - intersect = set_ref - final_res_args_set - - assert len(intersect) == 0, "final result does not contain all arguments!" - print("[+] all arguments found in final_results") - except: - if p_server is not None: - p_server.terminate() - if p_client is not None: - p_client.terminate() - raise + # start client + p_client = mp.Process(target=start_client) + p_client.start() + + p_client.join(10) + # client should have processed all + if sys.version_info.major == 2: + p_client.terminate() + p_client.join(3) + + assert not p_client.is_alive(), "the client did not terminate on time!" + # client must not throw an exception + assert p_client.exitcode == 0, "the client raised an exception" + p_server.join(3) + # server should have come down + assert not p_server.is_alive(), "the server did not terminate on time!" + + print("[+] client and server terminated") + + fname = 'jobmanager.dump' + with open(fname, 'rb') as f: + data = jobmanager.JobManager_Server.static_load(f) + + + final_res_args_set = {a[0] for a in data['final_result']} + set_ref = set(range(1,n)) + intersect = set_ref - final_res_args_set + + assert len(intersect) == 0, "final result does not contain all arguments!" + print("[+] all arguments found in final_results") + except: + if p_server is not None: + p_server.terminate() + if p_client is not None: + p_client.terminate() + raise def test_jobmanager_server_signals(): """ @@ -470,7 +464,7 @@ def shutdown_client(sig): try: - p_server = mp.Process(target=start_server, args=(n, False, 0.1)) + p_server = mp.Process(target=start_server, args=(n, False, 0.4)) p_server.start() time.sleep(0.5) @@ -577,7 +571,7 @@ def test_check_fail(): set_ref = {binfootprint.dump(a) for a in range(1,n)} - args_set = set(data['args_dict'].keys()) + args_set = set(data['args_dict'].keys()) assert args_set == data['fail_set'] final_result_args_set = {binfootprint.dump(a[0]) for a in data['final_result']} @@ -828,10 +822,10 @@ def test_hum_size(): assert humanize_size(1024**4) == '1024.00TB' if __name__ == "__main__": - jm_log.setLevel(logging.DEBUG) - progress.log.setLevel(logging.DEBUG) + jm_log.setLevel(logging.INFO) +# progress.log.setLevel(logging.DEBUG) # jm_log.setLevel(logging.ERROR) -# progress.log.setLevel(logging.ERROR) + progress.log.setLevel(logging.ERROR) if len(sys.argv) > 1: pass @@ -846,14 +840,14 @@ if __name__ == "__main__": # test_start_server_with_no_args, # test_start_server, # test_client, -# test_jobmanager_basic, + test_jobmanager_basic, # test_jobmanager_server_signals, # test_shutdown_server_while_client_running, # test_shutdown_client, # test_check_fail, # test_jobmanager_read_old_stat, # test_client_status, - test_jobmanager_local, +# test_jobmanager_local, # test_start_server_on_used_port, # test_shared_const_arg, # test_digest_rejected,