diff --git a/jobmanager.py b/jobmanager.py index c96cb87..d6b5b47 100644 --- a/jobmanager.py +++ b/jobmanager.py @@ -23,7 +23,6 @@ import fcntl import termios import struct - """jobmanager module Richard Hartmann 2014 @@ -147,7 +146,8 @@ class Loop(object): run=False, verbose=0, sigint='stop', - sigterm='stop'): + sigterm='stop', + name=None): """ func [callable] - function to be called periodically @@ -178,6 +178,7 @@ class Loop(object): self._proc = None self._sigint = sigint self._sigterm = sigterm + self._name = name if run: self.start() @@ -212,11 +213,12 @@ class Loop(object): """ self.run = True self._proc = mp.Process(target = Loop._wrapper_func, - args = (self.func, self.args, self._run, self.interval, - self.verbose, self._sigint, self._sigterm)) + args = (self.func, self.args, self._run, self.interval, + self.verbose, self._sigint, self._sigterm), + name=self._name) self._proc.start() if self.verbose > 1: - print("PID {}: I'm a new loop process".format(self._proc.pid)) + print("PID {}: I'm a new loop process ({})".format(self._proc.pid, self._name)) def stop(self): """ @@ -310,7 +312,8 @@ class StatusBar(Loop): run=True, verbose=0, sigint='stop', - sigterm='stop'): + sigterm='stop', + name='statusbar'): """ The init will also start to display the status bar if run was set True. Otherwise use the inherited method start() to start the show_stat loop. @@ -350,7 +353,8 @@ class StatusBar(Loop): super().__init__(func=StatusBar.show_stat, args=(self.count, self.start_time, self.max_count, self.width, self.speed_calc_cycles, self.q), - interval=interval, run=run, verbose=verbose, sigint=sigint, sigterm=sigterm) + interval=interval, run=run, verbose=verbose, + sigint=sigint, sigterm=sigterm, name=name) def __set_width(self, width): """ @@ -450,10 +454,12 @@ class Signal_to_SIG_IGN(object): self.verbose = verbose for s in signals: signal.signal(s, self._handler) - def _handler(self, signal, frame): + def _handler(self, sig, frame): if self.verbose > 0: - print("PID {}: received signal {} -> will be ignored".format(os.getpid(), signal_dict[signal])) - + print("PID {}: received signal {} -> will be ignored".format(os.getpid(), signal_dict[sig])) + + + class Signal_to_sys_exit(object): def __init__(self, signals=[signal.SIGINT, signal.SIGTERM], verbose=0): self.verbose = verbose @@ -524,14 +530,14 @@ class JobManager_Server(object): a SIGKILL. """ def __init__(self, - authkey, - fname_for_final_result_dump, - const_args=None, - port=42524, - verbose=1, - msg_interval=1, - fname_for_job_q_dump='auto', - fname_for_fail_q_dump='auto'): + authkey, + fname_for_final_result_dump, + const_args=None, + port=42524, + verbose=1, + msg_interval=1, + fname_for_args_dump='auto', + fname_for_fail_dump='auto'): """ authkey [string] - authentication key used by the SyncManager. Server and Client must have the same authkey. @@ -553,6 +559,7 @@ class JobManager_Server(object): """ self.verbose = verbose self._pid = os.getpid() + self._pid_start = None if self.verbose > 1: print("PID {}: I'm the JobManager_Server main process".format(self._pid)) @@ -563,7 +570,10 @@ class JobManager_Server(object): self.fname_for_final_result_dump = fname_for_final_result_dump self.const_args = copy.copy(const_args) - self.fname_for_job_q_dump = fname_for_job_q_dump + self.fname_for_args_dump = fname_for_args_dump + self.fname_for_fail_dump = fname_for_fail_dump + + self.msg_interval = msg_interval # to do some redundant checking, might be removed # the args_set holds all arguments to be processed @@ -586,7 +596,7 @@ class JobManager_Server(object): JobQueueManager.register('get_job_q', callable=lambda: self.job_q) JobQueueManager.register('get_result_q', callable=lambda: self.result_q) JobQueueManager.register('get_fail_q', callable=lambda: self.fail_q) - JobQueueManager.register('get_const_args', callable=lambda: self.const_args) + JobQueueManager.register('get_const_args', callable=lambda: self.const_args, exposed=["__iter__"]) address=('', self.port) #ip='' means local authkey=self.authkey @@ -609,16 +619,11 @@ class JobManager_Server(object): self.final_result = [] self.stat = None - if self.verbose > 0: - self.stat = StatusBar(count = self.numresults, max_count = self.numjobs, - interval=msg_interval, speed_calc_cycles=10, - verbose = self.verbose, sigint='ign', sigterm='ign') - self.err_log = [] - self.p_reinsert_failure = Loop(func=JobManager_Server.reinsert_failures, - args=(self.fail_q, self.job_q), - interval=1, run=False) +# self.p_reinsert_failure = Loop(func=JobManager_Server.reinsert_failures, +# args=(self.fail_q, self.job_q), +# interval=1, run=False) def put_arg(self, a): """add argument a to the job_q @@ -677,13 +682,24 @@ class JobManager_Server(object): When finished, or on exception call stop() afterwards to shout down gracefully. """ + if self._pid != os.getpid(): + raise RuntimeError("do not run JobManager_Server.start() in a subprocess") + + if self.numjobs.value != len(self.args_set): + raise RuntimeError("use JobManager_Server.put_arg to put arguments to the job_q") + + if self.verbose > 0: + self.stat = StatusBar(count = self.numresults, max_count = self.numjobs, + interval=self.msg_interval, speed_calc_cycles=10, + verbose = self.verbose, sigint='ign', sigterm='ign') + Signal_to_sys_exit(signals=[signal.SIGTERM, signal.SIGINT], verbose = self.verbose) pid = os.getpid() - if self.verbose > 1: - print("PID {}: JobManager_Server starts processing incoming results".format(pid)) - if self._pid != pid: - print(" NOTE: this routine was triggered as a subprocess which is NOT preferable!") +# if self.verbose > 1: +# print("PID {}: JobManager_Server starts processing incoming results".format(pid)) +# if self._pid != pid: +# print(" NOTE: this routine was triggered as a subprocess which is NOT preferable!") try: if self.verbose > 0: @@ -692,11 +708,14 @@ class JobManager_Server(object): print("PID {}: StatusBar started".format(self.stat.getpid())) while (len(self.args_set) - self.fail_q.qsize()) > 0: - arg, result = self.result_q.get() #blocks until an item is available - self.args_set.remove(arg) - self.process_new_result(arg, result) - with self.numresults.get_lock(): - self.numresults.value += 1 + try: + arg, result = self.result_q.get(timeout=1) #blocks until an item is available + self.args_set.remove(arg) + self.process_new_result(arg, result) + self.numresults.value = self.numjobs.value - (len(self.args_set) - self.fail_q.qsize()) + except queue.Empty: + pass + except: if self.verbose > 0: err, val, trb = sys.exc_info() @@ -713,9 +732,9 @@ class JobManager_Server(object): print("PID {}: wait {}s before trigger clean up".format(pid, self.__wait_before_stop)) time.sleep(self.__wait_before_stop) finally: - self.stop() - - def stop(self): + self._shoutdown() + + def _shoutdown(self): """"stop all spawned processes and clean up - call process_final_result to handle all collected result @@ -754,49 +773,42 @@ class JobManager_Server(object): if self.verbose > 0: print("done!") - if not self.job_q.empty(): - if self.fname_for_job_q_dump == 'auto': - fname = "jobmanager_server_job_queue_dump_{}".format(getDateForFileName(includePID=False)) + if len(self.args_set) > 0: + if self.fname_for_args_dump == 'auto': + fname = "jobmanager_server_args_{}.dump".format(getDateForFileName(includePID=False)) else: - fname = self.fname_for_job_q_dump + fname = self.fname_for_args_dump - if self.verbose > 0: - print("PID {}: args_set not empty -> dump to file {} ... ".format(pid, fname), end='', flush=True) + print("PID {}: args_set not empty -> dump to file '{}' ... ".format(pid, fname), end='', flush=True) args = list(self.args_set) with open(fname, 'wb') as f: pickle.dump(args, f, protocol=pickle.HIGHEST_PROTOCOL) - if self.verbose > 0: - print("done!") + print("done!") if not self.fail_q.empty(): - if self.fname_for_fail_q_dump == 'auto': - fname = "jobmanager_server_fail_queue_dump_{}".format(getDateForFileName(includePID=False)) + if self.fname_for_fail_dump == 'auto': + fname = "jobmanager_server_fail_{}.dump".format(getDateForFileName(includePID=False)) else: - fname = self.fname_for_fail_q_dump + fname = self.fname_for_fail_dump - if self.verbose > 0: - print("PID {}: there are reported failures -> dump to file {} ... ".format(pid, fname), end='', flush=True) + print("PID {}: there are reported failures -> dump to file '{}' ... ".format(pid, fname), end='', flush=True) fail_list = [] while not self.fail_q.empty(): - new_args, err, hostname, fname_tb = self.fail_q.get() - fail_list.append((new_args, err, hostname, fname_tb)) + new_args, err_name, hostname = self.fail_q.get() + fail_list.append((new_args, err_name, hostname)) + if self.verbose > 1: + print("'{}' on {}:{}".format(err_name, hostname, new_args)) - with open(fname, 'rb') as f: + with open(fname, 'wb') as f: pickle.dump(fail_list, f, protocol=pickle.HIGHEST_PROTOCOL) - if self.verbose > 0: - print("done!") + print("done!") - if self.verbose > 1: - for f in fail_list: - print(new_args, err, hostname, fname_tb) - - if self.verbose > 0: - print("PID {}: JobManager_Server was successfully shout down".format(pid)) + print("PID {}: JobManager_Server was successfully shout down".format(pid)) class JobManager_Client(object): @@ -868,28 +880,35 @@ class JobManager_Client(object): self.procs = [] + + @staticmethod + def _get_manager_object(ip, port, authkey, verbose=0): + """ + connects to the server and get registered shared objects such as + job_q, result_q, fail_q, const_args + """ class ServerQueueManager(SyncManager): pass ServerQueueManager.register('get_job_q') ServerQueueManager.register('get_result_q') ServerQueueManager.register('get_fail_q') - ServerQueueManager.register('get_const_args') + ServerQueueManager.register('get_const_args', exposed="__iter__") - self.manager = ServerQueueManager(address=(self.ip, self.port), authkey=self.authkey) + manager = ServerQueueManager(address=(ip, port), authkey=authkey) - if self.verbose > 0: - print('PIC {}: SyncManager connecting to {}:{} ... '.format(self._pid, self.ip, self.port), end='', flush=True) - self.manager.connect() - if self.verbose > 0: + if verbose > 0: + print('PIC {}: connecting to {}:{} ... '.format(os.getpid(), ip, port), end='', flush=True) + manager.connect() + if verbose > 0: print('done!') - self.job_q = self.manager.get_job_q() - self.result_q = self.manager.get_result_q() - self.fail_q = self.manager.get_fail_q() - self.const_args = self.manager.get_const_args() - - + job_q = manager.get_job_q() + result_q = manager.get_result_q() + fail_q = manager.get_fail_q() + const_args = manager.get_const_args() + return job_q, result_q, fail_q, const_args + @staticmethod def func(args, const_args): """ @@ -907,12 +926,14 @@ class JobManager_Client(object): @staticmethod - def __worker_func(func, const_args, job_q, result_q, nice, fail_q, verbose): + def __worker_func(func, nice, verbose, ip, port, authkey): """ the wrapper spawned nproc trimes calling and handling self.func """ Signal_to_sys_exit(signals=[signal.SIGTERM, signal.SIGINT]) + job_q, result_q, fail_q, const_args = JobManager_Client._get_manager_object(ip, port, authkey, verbose) + n = os.nice(0) n = os.nice(nice - n) c = 0 @@ -927,7 +948,7 @@ class JobManager_Client(object): # regular case, just stop woring when empty job_q was found except queue.Empty: - if verbose > 1: + if verbose > 0: print("\nPID {}: finds empty job queue, processed {} jobs".format(os.getpid(), c)) return @@ -956,19 +977,16 @@ class JobManager_Client(object): return # some unexpected Exception - # write argument to fail_q, save traceback - # continue wirkung + # write argument, exception name and hostname to fail_q, save traceback + # continue workung except: err, val, trb = sys.exc_info() - if verbose > 0: - print("\nPID {}: caught exception {}, report failure of current argument to server ... ".format(os.getpid(), err), end='', flush=True) + print("\nPID {}: caught exception '{}', report failure of current argument to server ... ".format(os.getpid(), err.__name__), end='', flush=True) hostname = socket.gethostname() - fname = 'traceback_args_{}_err_{}_{}.trb'.format(new_args, err, getDateForFileName(includePID=True)) -# print(fail_q.qsize()) - -# fail_q.put(new_args, timeout=10) + fname = 'traceback_err_{}_{}.trb'.format(err.__name__, getDateForFileName(includePID=True)) + fail_q.put((new_args, err.__name__, hostname), timeout=10) if verbose > 0: print("done") @@ -979,7 +997,7 @@ class JobManager_Client(object): if verbose > 0: print("done") - print(" continue processing next argument.") + print(" continue processing next argument.") if verbose > 1: print("PID {}: JobManager_Client.__worker_func terminates".format(os.getpid())) @@ -987,17 +1005,24 @@ class JobManager_Client(object): def start(self): """ + starts a number of nproc subprocess to work on the job_q + SIGTERM and SIGINT are managed to terminate all subprocesses + + retruns when all subprocesses have terminated """ if self.verbose > 1: print("PID {}: start {} processes to work on the remote queue".format(os.getpid(), self.nproc)) for i in range(self.nproc): - p = mp.Process(target=self.__worker_func, args=(self.func, self.const_args, - self.job_q, self.result_q, - self.nice, self.fail_q, - self.verbose)) + p = mp.Process(target=self.__worker_func, args=(self.func, + self.nice, + self.verbose, + self.ip, + self.port, + self.authkey)) self.procs.append(p) p.start() + time.sleep(0.3) Signal_to_terminate_process_list(process_list = self.procs, verbose=self.verbose) diff --git a/tests.py b/tests.py index 8737169..387a62d 100644 --- a/tests.py +++ b/tests.py @@ -90,8 +90,8 @@ def start_server(verbose, n=30): jm_server = jobmanager.JobManager_Server(authkey=authkey, fname_for_final_result_dump='final_result.dump', verbose=verbose, - fname_for_job_q_dump='job_q.dump', - fname_for_fail_q_dump='fail_q.dump') + fname_for_args_dump='args.dump', + fname_for_fail_dump='fail.dump') jm_server.args_from_list(args) jm_server.start() @@ -148,13 +148,13 @@ def test_jobmanager_server_signals(): assert not p_server.is_alive(), "timeout for server shutdown reached" print("[+] now terminated (timeout of 15s not reached)") - fname = 'job_q.dump' + fname = 'args.dump' with open(fname, 'rb') as f: args = pickle.load(f) for i,a in enumerate(range(1,30)): - assert a == args[i], "checking the job_q.dump failed" - print("[+] job_q.dump contains all arguments") + assert a == args[i], "checking the args.dump failed" + print("[+] args.dump contains all arguments") print("## TEST SIGINT ##") @@ -169,13 +169,13 @@ def test_jobmanager_server_signals(): assert not p_server.is_alive(), "timeout for server shutdown reached" print("[+] now terminated (timeout of 15s not reached)") - fname = 'job_q.dump' + fname = 'args.dump' with open(fname, 'rb') as f: args = pickle.load(f) for i,a in enumerate(range(1,30)): - assert a == args[i], "checking the job_q.dump failed" - print("[+] job_q.dump contains all arguments") + assert a == args[i], "checking the args.dump failed" + print("[+] args.dump contains all arguments") def test_shutdown_server_while_client_running(): """ @@ -184,9 +184,9 @@ def test_shutdown_server_while_client_running(): start client stop server -> client should catch exception, but can't do anything, - writing to fail_q won't work, because server went down + writing to fail won't work, because server went down - check if the final_result and the job_q dump end up to include + check if the final_result and the args dump end up to include all arguments given """ p_server = mp.Process(target=start_server, args=(0,1000)) @@ -207,7 +207,7 @@ def test_shutdown_server_while_client_running(): assert not p_server.is_alive() assert not p_client.is_alive() - fname = 'job_q.dump' + fname = 'args.dump' with open(fname, 'rb') as f: args = pickle.load(f) @@ -226,7 +226,7 @@ def test_shutdown_server_while_client_running(): if len(intersec_set) == 0: print("[+] no arguments lost!") - assert len(intersec_set) == 0, "job_q.dump and final_result_dump do NOT contain all arguments -> some must have been lost!" + assert len(intersec_set) == 0, "args.dump and final_result_dump do NOT contain all arguments -> some must have been lost!" def test_shutdown_client(): shutdown_client(signal.SIGTERM) @@ -292,10 +292,72 @@ def shutdown_client(sig): assert len(intersect) == 0, "final result does not contain all arguments!" print("[+] all arguments found in final_results") +def test_check_fail(): + class Client_Random_Error(jobmanager.JobManager_Client): + def func(self, args, const_args): + fail_on = [3,23,45,67,89] + time.sleep(0.1) + if args in fail_on: + raise RuntimeError("fail_on Error") + return os.getpid() + + + n = 100 + verbose=0 + p_server = mp.Process(target=start_server, args=(0,n)) + p_server.start() + + time.sleep(1) + + print("START CLIENT") + jm_client = Client_Random_Error(ip='localhost', + authkey='testing', + port=42524, + nproc=0, + verbose=verbose) + + p_client = mp.Process(target=jm_client.start) + p_client.start() + + assert p_server.is_alive() + assert p_client.is_alive() + + print("[+] server and client running") + + p_server.join(60) + p_client.join(60) + + assert not p_server.is_alive() + assert not p_client.is_alive() + + print("[+] server and client stopped") + + fname = 'final_result.dump' + with open(fname, 'rb') as f: + final_res = pickle.load(f) + + final_res_args_set = {a[0] for a in final_res} + + fname = 'fail.dump' + with open(fname, 'rb') as f: + fail = pickle.load(f) + + fail_set = {a[0] for a in fail} + + set_ref = set(range(1,n)) + + all_set = final_res_args_set | fail_set + assert len(set_ref - all_set) == 0, "final result union with reported failure do not correspond to all args!" + print("[+] all arguments found in final_results | reported failure") + def test_Signal_to_SIG_IGN(): def f(): jobmanager.Signal_to_SIG_IGN() - time.sleep(10) + print("before sleep") + while True: + time.sleep(1) + print("after sleep") + p = mp.Process(target=f) p.start() @@ -383,47 +445,18 @@ def test_Signal_to_terminate_process_list(): time.sleep(0.5) print("send SIGINT") os.kill(p_mother.pid, signal.SIGINT) - -def test_check_fail_q(): - class Client_Random_Error(jobmanager.JobManager_Client): - def func(self, args, const_args): - time.sleep(0.1) - x = rand() - if x < 0.1: - assert False - return os.getpid() - - n = 100 - verbose=2 - p_server = mp.Process(target=start_server, args=(0,n)) - p_server.start() - - time.sleep(1) - - jm_client = Client_Random_Error(ip='localhost', authkey='testing', port=42524, nproc=1, verbose=verbose) - - p_client = mp.Process(target=jm_client.start) - p_client.start() if __name__ == "__main__": -# test_Signal_to_SIG_IGN() -# test_Signal_to_sys_exit() -# test_Signal_to_terminate_process_list() -# test_loop_basic() -# test_loop_signals() -# test_jobmanager_basic() -# test_jobmanager_server_signals() -# test_shutdown_server_while_client_running() -# test_shutdown_client() - test_check_fail_q() - - -""" -MEMO: - everything from the fail_q is also considered process - - remove from args_set - - keep track for summary at the end - -""" - + test_Signal_to_SIG_IGN() + test_Signal_to_sys_exit() + test_Signal_to_terminate_process_list() + test_loop_basic() + test_loop_signals() + test_jobmanager_basic() + test_jobmanager_server_signals() + test_shutdown_server_while_client_running() + test_shutdown_client() + test_check_fail() + pass + \ No newline at end of file