From c0faa27e8ae5745dee6aedb8adec07c96266565a Mon Sep 17 00:00:00 2001 From: Richard Hartmann Date: Wed, 28 Sep 2016 22:53:11 +0200 Subject: [PATCH] a version that passes all tests in python3.4 --- jobmanager/jobmanager.py | 46 +++++---- tests/test_decorators.py | 4 +- tests/test_jobmanager.py | 204 ++++++++++++++++++--------------------- 3 files changed, 120 insertions(+), 134 deletions(-) diff --git a/jobmanager/jobmanager.py b/jobmanager/jobmanager.py index 5da9491..c50a597 100644 --- a/jobmanager/jobmanager.py +++ b/jobmanager/jobmanager.py @@ -239,7 +239,7 @@ class JobManager_Client(object): self.hide_progress = hide_progress - log.info("init JobManager Client instance") + log.info("init JobManager Client instance (pid %s)", os.getpid()) self.show_statusbar_for_jobs = show_statusbar_for_jobs log.debug("show_statusbar_for_jobs:%s", self.show_statusbar_for_jobs) @@ -578,8 +578,8 @@ class JobManager_Client(object): log.debug("JobManager_Client.__worker_func at end (PID %s)", os.getpid()) - pid = os.getpid() - os.kill(pid, signal.SIGTERM) + # pid = os.getpid() + # os.kill(pid, signal.SIGTERM) # __p = psutil.Process(pid) # for thr in __p.threads(): # if pid == thr.id: @@ -644,7 +644,7 @@ class JobManager_Client(object): l = len(str(self.nproc)) for i in range(self.nproc): prepend.append("w{0:0{1}}:".format(i+1, l)) - + with progress.ProgressBarCounterFancy(count = c, max_count = m_progress, interval = self.interval, @@ -683,32 +683,36 @@ class JobManager_Client(object): p.start() time.sleep(0.3) - time.sleep(self.interval/2) + log.debug("all worker processes startes") + + #time.sleep(self.interval/2) + log.debug("setup Signal_to_terminate_process_list handler") exit_handler = Signal_to_terminate_process_list(process_list = self.procs, identifier_list = [progress.get_identifier(name = "worker{}".format(i+1), pid = p.pid, bold = True) for i, p in enumerate(self.procs)], signals = [signal.SIGTERM], timeout = 2) - - interrupt_handler = Signal_handler_for_Jobmanager_client(client_object = self, - exit_handler=exit_handler, - signals=[signal.SIGINT]) + + log.debug("setup Signal_handler_for_Jobmanager_client handler") + Signal_handler_for_Jobmanager_client(client_object = self, + exit_handler=exit_handler, + signals=[signal.SIGINT]) for p in self.procs: - log.debug("join %s PID %s", p, p.pid) - + p.join() + log.debug("process %s exitcode %s", p, p.exitcode) -# while p.is_alive(): -# log.debug("still alive %s PID %s", p, p.pid) -# p.join(timeout=self.interval) -# _proc = psutil.Process(p.pid) -# log.debug(str(p.exitcode)) -# log.debug(str(_proc.connections())) -# log.debug(str(_proc.children(recursive=True))) -# log.debug(str(_proc.status())) -# log.debug(str(_proc.threads())) + # while p.is_alive(): + # log.debug("still alive %s PID %s", p, p.pid) + # p.join(timeout=self.interval) + # _proc = psutil.Process(p.pid) + # log.debug(str(p.exitcode)) + # log.debug(str(_proc.connections())) + # log.debug(str(_proc.children(recursive=True))) + # log.debug(str(_proc.status())) + # log.debug(str(_proc.threads())) log.debug("process %s PID %s was joined", p, p.pid) @@ -811,7 +815,7 @@ class JobManager_Server(object): self.hide_progress = hide_progress - log.debug("I'm the JobManager_Server main process") + log.debug("I'm the JobManager_Server main process (pid %s)", os.getpid()) self.__wait_before_stop = 2 self.port = port diff --git a/tests/test_decorators.py b/tests/test_decorators.py index 4ee896f..967f8b9 100644 --- a/tests/test_decorators.py +++ b/tests/test_decorators.py @@ -69,7 +69,7 @@ def my_func_ProgressBarOverrideCount(c = None, m = None): @decorators.ProgressBar -def testing_decorated_func_calls_decorated_func( +def _testing_decorated_func_calls_decorated_func( c = decorators.progress.UnsignedIntValue(val=0), m = decorators.progress.UnsignedIntValue(val=1), ): @@ -89,7 +89,7 @@ def testing_decorated_func_calls_decorated_func( _my_func_1(arg=i, kwarg=0, sleep=0.005) def test_decorated_func_calls_decorated_func(): - testing_decorated_func_calls_decorated_func() + _testing_decorated_func_calls_decorated_func() def test_decorator(): c = progress.UnsignedIntValue(val=0) diff --git a/tests/test_jobmanager.py b/tests/test_jobmanager.py index 83a2299..009913b 100644 --- a/tests/test_jobmanager.py +++ b/tests/test_jobmanager.py @@ -9,6 +9,7 @@ import multiprocessing as mp import socket import signal import logging +import datetime from numpy import random from os.path import abspath, dirname, split @@ -19,6 +20,11 @@ import jobmanager import binfootprint import progress +if sys.version_info[0] == 2: + TIMEOUT = 300 +elif sys.version_info[0] == 3: + TIMEOUT = 5 + progress.log.setLevel(logging.ERROR) from jobmanager.jobmanager import log as jm_log @@ -167,9 +173,9 @@ def start_server(n, read_old_state=False, client_sleep=0.1): def start_client(): print("START CLIENT") - jm_client = jobmanager.JobManager_Client(server = SERVER, - authkey = AUTHKEY, - port = PORT, + jm_client = jobmanager.JobManager_Client(server = SERVER, + authkey = AUTHKEY, + port = PORT, nproc = 3, reconnect_tries = 0) jm_client.start() @@ -287,11 +293,9 @@ def test_shutdown_server_while_client_running(): global PORT n = 100 - timeout = 30 - + sigs = [('SIGTERM', signal.SIGTERM), ('SIGINT', signal.SIGINT)] - sigs = [('SIGTERM', signal.SIGTERM)] - + for signame, sig in sigs: PORT += 1 @@ -306,16 +310,19 @@ def test_shutdown_server_while_client_running(): p_client = mp.Process(target=start_client) p_client.start() - time.sleep(1.5) + time.sleep(2) assert p_client.is_alive() print(" send {} to server".format(signame)) os.kill(p_server.pid, sig) - p_server.join(timeout) + p_server.join(TIMEOUT) assert not p_server.is_alive(), "server did not shut down on time" - p_client.join(timeout) + p_client.join(TIMEOUT) assert not p_client.is_alive(), "client did not shut down on time" + + + print("[+] server and client joined {}".format(datetime.datetime.now().isoformat())) fname = 'jobmanager.dump' with open(fname, 'rb') as f: @@ -366,56 +373,66 @@ def shutdown_client(sig): """ global PORT PORT += 1 - n = 300 + n = 30 print("## terminate client with {} ##".format(progress.signal_dict[sig])) + + p_server = None + p_client = None + try: - p_server = mp.Process(target=start_server, args=(n, )) - p_server.start() - - time.sleep(2) - - p_client = mp.Process(target=start_client) - p_client.start() - - time.sleep(5) - - print(" send {}".format(progress.signal_dict[sig])) - os.kill(p_client.pid, sig) - assert p_client.is_alive() - print("[+] still alive (assume shut down takes some time)") - p_client.join(5) - assert not p_client.is_alive(), "timeout for client shutdown reached" - print("[+] now terminated (timeout of 5s not reached)") - - time.sleep(0.5) - - p_client = mp.Process(target=start_client) - p_client.start() - - p_client.join(30) - p_server.join(30) - - assert not p_client.is_alive() - assert not p_server.is_alive() - - print("[+] client and server terminated") - - fname = 'jobmanager.dump' - with open(fname, 'rb') as f: - data = jobmanager.JobManager_Server.static_load(f) - - assert len(data['args_dict']) == 0 - print("[+] args_set is empty -> all args processed & none failed") - - final_res_args_set = {a[0] for a in data['final_result']} - - set_ref = set(range(1,n)) - - intersect = set_ref - final_res_args_set - - assert len(intersect) == 0, "final result does not contain all arguments!" - print("[+] all arguments found in final_results") + p_server = mp.Process(target=start_server, args=(n, False, 0.1)) + p_server.start() + + time.sleep(0.5) + + p_client = mp.Process(target=start_client) + p_client.start() + + time.sleep(1.5) + + print(" send {}".format(progress.signal_dict[sig])) + os.kill(p_client.pid, sig) + assert p_client.is_alive() + print("[+] still alive (assume shut down takes some time)") + p_client.join(5) + assert not p_client.is_alive(), "timeout for client shutdown reached" + print("[+] now terminated (timeout of 5s not reached)") + + time.sleep(0.5) + + p_client = mp.Process(target=start_client) + p_client.start() + + p_client.join(TIMEOUT) + p_server.join(TIMEOUT) + + assert not p_client.is_alive() + assert not p_server.is_alive() + + print("[+] client and server terminated") + + fname = 'jobmanager.dump' + with open(fname, 'rb') as f: + data = jobmanager.JobManager_Server.static_load(f) + + assert len(data['args_dict']) == 0 + print("[+] args_set is empty -> all args processed & none failed") + + final_res_args_set = {a[0] for a in data['final_result']} + + set_ref = set(range(1,n)) + + intersect = set_ref - final_res_args_set + + assert len(intersect) == 0, "final result does not contain all arguments!" + print("[+] all arguments found in final_results") + except: + if p_server is not None: + p_server.terminate() + if p_client is not None: + p_client.terminate() + raise def test_check_fail(): global PORT @@ -424,14 +441,14 @@ def test_check_fail(): def func(self, args, const_args, c, m): c.value = 0 m.value = -1 - fail_on = [3,23,45,67,89] + fail_on = [3,5,13] time.sleep(0.1) if args in fail_on: raise RuntimeError("fail_on Error") return os.getpid() - n = 100 + n = 20 p_server = mp.Process(target=start_server, args=(n,)) p_server.start() @@ -490,7 +507,7 @@ def test_jobmanager_read_old_stat(): """ global PORT PORT += 1 - n = 100 + n = 50 p_server = mp.Process(target=start_server, args=(n,)) p_server.start() @@ -499,9 +516,8 @@ def test_jobmanager_read_old_stat(): p_client = mp.Process(target=start_client) p_client.start() - time.sleep(3) - - + time.sleep(1.5) + # terminate server ... to start again using reload_from_dump p_server.terminate() @@ -512,13 +528,13 @@ def test_jobmanager_read_old_stat(): assert not p_server.is_alive(), "the server did not terminate on time!" print("[+] client and server terminated") - time.sleep(2) + time.sleep(1) PORT += 1 # start server using old dump p_server = mp.Process(target=start_server, args=(n,True)) p_server.start() - time.sleep(2) + time.sleep(1) p_client = mp.Process(target=start_client) p_client.start() @@ -544,40 +560,6 @@ def test_jobmanager_read_old_stat(): assert len(intersect) == 0, "final result does not contain all arguments!" print("[+] all arguments found in final_results") -# def test_hashedViewOnNumpyArray(): -# s = set() -# -# a = np.ones(4) -# ah = jobmanager.hashableCopyOfNumpyArray(a) -# -# s.add(ah) -# -# b = np.ones(4, dtype=np.int32) -# bh = jobmanager.hashableCopyOfNumpyArray(b) -# -# # hash function independent of dtype -# assert hash(ah) == hash(bh) -# # overwritten equal operator ... -# assert ah == bh -# # ... makes such statements possible! -# assert bh in s -# -# # check if it is truly a copy, not just a view -# b[0] = 2 -# assert bh[0] == 1 -# -# c = np.ones(5) -# ch = jobmanager.hashableCopyOfNumpyArray(c) -# # different array -# assert not ch in s -# -# # check if shape is included in hash calculation -# bh2 = bh.reshape((2,2)) -# assert bh2 not in s -# -# # just some redundant back conversion an checking -# bh2 = bh2.reshape((4,)) -# assert bh2 in s def test_client_status(): global PORT @@ -747,8 +729,8 @@ def test_hum_size(): assert humanize_size(1024**4) == '1024.00TB' if __name__ == "__main__": - jm_log.setLevel(logging.DEBUG) - progress.log.setLevel(logging.DEBUG) + jm_log.setLevel(logging.WARNING) + progress.log.setLevel(logging.ERROR) if len(sys.argv) > 1: pass else: @@ -759,18 +741,18 @@ if __name__ == "__main__": # test_Signal_to_sys_exit, # test_Signal_to_terminate_process_list, -# test_jobmanager_basic, + # test_jobmanager_basic, # test_jobmanager_server_signals, - test_shutdown_server_while_client_running, +# test_shutdown_server_while_client_running, # test_shutdown_client, - # test_check_fail, - # test_jobmanager_read_old_stat, - # test_client_status, - # test_jobmanager_local, - # test_start_server_on_used_port, - # test_shared_const_arg, - # test_digest_rejected, - # test_hum_size, +# test_chedck_fail, + test_jobmanager_read_old_stat, + test_client_status, + test_jobmanager_local, + test_start_server_on_used_port, + test_shared_const_arg, + test_digest_rejected, + test_hum_size, lambda : print("END") ]