a version that passes all tests in python3.4

This commit is contained in:
Richard Hartmann 2016-09-28 22:53:11 +02:00
parent 9ee13856f1
commit c0faa27e8a
3 changed files with 120 additions and 134 deletions

View file

@ -239,7 +239,7 @@ class JobManager_Client(object):
self.hide_progress = hide_progress self.hide_progress = hide_progress
log.info("init JobManager Client instance") log.info("init JobManager Client instance (pid %s)", os.getpid())
self.show_statusbar_for_jobs = show_statusbar_for_jobs self.show_statusbar_for_jobs = show_statusbar_for_jobs
log.debug("show_statusbar_for_jobs:%s", self.show_statusbar_for_jobs) log.debug("show_statusbar_for_jobs:%s", self.show_statusbar_for_jobs)
@ -578,8 +578,8 @@ class JobManager_Client(object):
log.debug("JobManager_Client.__worker_func at end (PID %s)", os.getpid()) log.debug("JobManager_Client.__worker_func at end (PID %s)", os.getpid())
pid = os.getpid() # pid = os.getpid()
os.kill(pid, signal.SIGTERM) # os.kill(pid, signal.SIGTERM)
# __p = psutil.Process(pid) # __p = psutil.Process(pid)
# for thr in __p.threads(): # for thr in __p.threads():
# if pid == thr.id: # if pid == thr.id:
@ -644,7 +644,7 @@ class JobManager_Client(object):
l = len(str(self.nproc)) l = len(str(self.nproc))
for i in range(self.nproc): for i in range(self.nproc):
prepend.append("w{0:0{1}}:".format(i+1, l)) prepend.append("w{0:0{1}}:".format(i+1, l))
with progress.ProgressBarCounterFancy(count = c, with progress.ProgressBarCounterFancy(count = c,
max_count = m_progress, max_count = m_progress,
interval = self.interval, interval = self.interval,
@ -683,32 +683,36 @@ class JobManager_Client(object):
p.start() p.start()
time.sleep(0.3) time.sleep(0.3)
time.sleep(self.interval/2) log.debug("all worker processes startes")
#time.sleep(self.interval/2)
log.debug("setup Signal_to_terminate_process_list handler")
exit_handler = Signal_to_terminate_process_list(process_list = self.procs, exit_handler = Signal_to_terminate_process_list(process_list = self.procs,
identifier_list = [progress.get_identifier(name = "worker{}".format(i+1), identifier_list = [progress.get_identifier(name = "worker{}".format(i+1),
pid = p.pid, pid = p.pid,
bold = True) for i, p in enumerate(self.procs)], bold = True) for i, p in enumerate(self.procs)],
signals = [signal.SIGTERM], signals = [signal.SIGTERM],
timeout = 2) timeout = 2)
interrupt_handler = Signal_handler_for_Jobmanager_client(client_object = self, log.debug("setup Signal_handler_for_Jobmanager_client handler")
exit_handler=exit_handler, Signal_handler_for_Jobmanager_client(client_object = self,
signals=[signal.SIGINT]) exit_handler=exit_handler,
signals=[signal.SIGINT])
for p in self.procs: for p in self.procs:
log.debug("join %s PID %s", p, p.pid)
p.join() p.join()
log.debug("process %s exitcode %s", p, p.exitcode)
# while p.is_alive(): # while p.is_alive():
# log.debug("still alive %s PID %s", p, p.pid) # log.debug("still alive %s PID %s", p, p.pid)
# p.join(timeout=self.interval) # p.join(timeout=self.interval)
# _proc = psutil.Process(p.pid) # _proc = psutil.Process(p.pid)
# log.debug(str(p.exitcode)) # log.debug(str(p.exitcode))
# log.debug(str(_proc.connections())) # log.debug(str(_proc.connections()))
# log.debug(str(_proc.children(recursive=True))) # log.debug(str(_proc.children(recursive=True)))
# log.debug(str(_proc.status())) # log.debug(str(_proc.status()))
# log.debug(str(_proc.threads())) # log.debug(str(_proc.threads()))
log.debug("process %s PID %s was joined", p, p.pid) log.debug("process %s PID %s was joined", p, p.pid)
@ -811,7 +815,7 @@ class JobManager_Server(object):
self.hide_progress = hide_progress self.hide_progress = hide_progress
log.debug("I'm the JobManager_Server main process") log.debug("I'm the JobManager_Server main process (pid %s)", os.getpid())
self.__wait_before_stop = 2 self.__wait_before_stop = 2
self.port = port self.port = port

View file

@ -69,7 +69,7 @@ def my_func_ProgressBarOverrideCount(c = None, m = None):
@decorators.ProgressBar @decorators.ProgressBar
def testing_decorated_func_calls_decorated_func( def _testing_decorated_func_calls_decorated_func(
c = decorators.progress.UnsignedIntValue(val=0), c = decorators.progress.UnsignedIntValue(val=0),
m = decorators.progress.UnsignedIntValue(val=1), m = decorators.progress.UnsignedIntValue(val=1),
): ):
@ -89,7 +89,7 @@ def testing_decorated_func_calls_decorated_func(
_my_func_1(arg=i, kwarg=0, sleep=0.005) _my_func_1(arg=i, kwarg=0, sleep=0.005)
def test_decorated_func_calls_decorated_func(): def test_decorated_func_calls_decorated_func():
testing_decorated_func_calls_decorated_func() _testing_decorated_func_calls_decorated_func()
def test_decorator(): def test_decorator():
c = progress.UnsignedIntValue(val=0) c = progress.UnsignedIntValue(val=0)

View file

@ -9,6 +9,7 @@ import multiprocessing as mp
import socket import socket
import signal import signal
import logging import logging
import datetime
from numpy import random from numpy import random
from os.path import abspath, dirname, split from os.path import abspath, dirname, split
@ -19,6 +20,11 @@ import jobmanager
import binfootprint import binfootprint
import progress import progress
if sys.version_info[0] == 2:
TIMEOUT = 300
elif sys.version_info[0] == 3:
TIMEOUT = 5
progress.log.setLevel(logging.ERROR) progress.log.setLevel(logging.ERROR)
from jobmanager.jobmanager import log as jm_log from jobmanager.jobmanager import log as jm_log
@ -167,9 +173,9 @@ def start_server(n, read_old_state=False, client_sleep=0.1):
def start_client(): def start_client():
print("START CLIENT") print("START CLIENT")
jm_client = jobmanager.JobManager_Client(server = SERVER, jm_client = jobmanager.JobManager_Client(server = SERVER,
authkey = AUTHKEY, authkey = AUTHKEY,
port = PORT, port = PORT,
nproc = 3, nproc = 3,
reconnect_tries = 0) reconnect_tries = 0)
jm_client.start() jm_client.start()
@ -287,11 +293,9 @@ def test_shutdown_server_while_client_running():
global PORT global PORT
n = 100 n = 100
timeout = 30
sigs = [('SIGTERM', signal.SIGTERM), ('SIGINT', signal.SIGINT)] sigs = [('SIGTERM', signal.SIGTERM), ('SIGINT', signal.SIGINT)]
sigs = [('SIGTERM', signal.SIGTERM)]
for signame, sig in sigs: for signame, sig in sigs:
PORT += 1 PORT += 1
@ -306,16 +310,19 @@ def test_shutdown_server_while_client_running():
p_client = mp.Process(target=start_client) p_client = mp.Process(target=start_client)
p_client.start() p_client.start()
time.sleep(1.5) time.sleep(2)
assert p_client.is_alive() assert p_client.is_alive()
print(" send {} to server".format(signame)) print(" send {} to server".format(signame))
os.kill(p_server.pid, sig) os.kill(p_server.pid, sig)
p_server.join(timeout) p_server.join(TIMEOUT)
assert not p_server.is_alive(), "server did not shut down on time" assert not p_server.is_alive(), "server did not shut down on time"
p_client.join(timeout) p_client.join(TIMEOUT)
assert not p_client.is_alive(), "client did not shut down on time" assert not p_client.is_alive(), "client did not shut down on time"
print("[+] server and client joined {}".format(datetime.datetime.now().isoformat()))
fname = 'jobmanager.dump' fname = 'jobmanager.dump'
with open(fname, 'rb') as f: with open(fname, 'rb') as f:
@ -366,56 +373,66 @@ def shutdown_client(sig):
""" """
global PORT global PORT
PORT += 1 PORT += 1
n = 300 n = 30
print("## terminate client with {} ##".format(progress.signal_dict[sig])) print("## terminate client with {} ##".format(progress.signal_dict[sig]))
p_server = None
p_client = None
try:
p_server = mp.Process(target=start_server, args=(n, )) p_server = mp.Process(target=start_server, args=(n, False, 0.1))
p_server.start() p_server.start()
time.sleep(2) time.sleep(0.5)
p_client = mp.Process(target=start_client) p_client = mp.Process(target=start_client)
p_client.start() p_client.start()
time.sleep(5) time.sleep(1.5)
print(" send {}".format(progress.signal_dict[sig])) print(" send {}".format(progress.signal_dict[sig]))
os.kill(p_client.pid, sig) os.kill(p_client.pid, sig)
assert p_client.is_alive() assert p_client.is_alive()
print("[+] still alive (assume shut down takes some time)") print("[+] still alive (assume shut down takes some time)")
p_client.join(5) p_client.join(5)
assert not p_client.is_alive(), "timeout for client shutdown reached" assert not p_client.is_alive(), "timeout for client shutdown reached"
print("[+] now terminated (timeout of 5s not reached)") print("[+] now terminated (timeout of 5s not reached)")
time.sleep(0.5) time.sleep(0.5)
p_client = mp.Process(target=start_client) p_client = mp.Process(target=start_client)
p_client.start() p_client.start()
p_client.join(30) p_client.join(TIMEOUT)
p_server.join(30) p_server.join(TIMEOUT)
assert not p_client.is_alive() assert not p_client.is_alive()
assert not p_server.is_alive() assert not p_server.is_alive()
print("[+] client and server terminated") print("[+] client and server terminated")
fname = 'jobmanager.dump' fname = 'jobmanager.dump'
with open(fname, 'rb') as f: with open(fname, 'rb') as f:
data = jobmanager.JobManager_Server.static_load(f) data = jobmanager.JobManager_Server.static_load(f)
assert len(data['args_dict']) == 0 assert len(data['args_dict']) == 0
print("[+] args_set is empty -> all args processed & none failed") print("[+] args_set is empty -> all args processed & none failed")
final_res_args_set = {a[0] for a in data['final_result']} final_res_args_set = {a[0] for a in data['final_result']}
set_ref = set(range(1,n)) set_ref = set(range(1,n))
intersect = set_ref - final_res_args_set intersect = set_ref - final_res_args_set
assert len(intersect) == 0, "final result does not contain all arguments!" assert len(intersect) == 0, "final result does not contain all arguments!"
print("[+] all arguments found in final_results") print("[+] all arguments found in final_results")
except:
if p_server is not None:
p_server.terminate()
if p_client is not None:
p_client.terminate()
raise
def test_check_fail(): def test_check_fail():
global PORT global PORT
@ -424,14 +441,14 @@ def test_check_fail():
def func(self, args, const_args, c, m): def func(self, args, const_args, c, m):
c.value = 0 c.value = 0
m.value = -1 m.value = -1
fail_on = [3,23,45,67,89] fail_on = [3,5,13]
time.sleep(0.1) time.sleep(0.1)
if args in fail_on: if args in fail_on:
raise RuntimeError("fail_on Error") raise RuntimeError("fail_on Error")
return os.getpid() return os.getpid()
n = 100 n = 20
p_server = mp.Process(target=start_server, args=(n,)) p_server = mp.Process(target=start_server, args=(n,))
p_server.start() p_server.start()
@ -490,7 +507,7 @@ def test_jobmanager_read_old_stat():
""" """
global PORT global PORT
PORT += 1 PORT += 1
n = 100 n = 50
p_server = mp.Process(target=start_server, args=(n,)) p_server = mp.Process(target=start_server, args=(n,))
p_server.start() p_server.start()
@ -499,9 +516,8 @@ def test_jobmanager_read_old_stat():
p_client = mp.Process(target=start_client) p_client = mp.Process(target=start_client)
p_client.start() p_client.start()
time.sleep(3) time.sleep(1.5)
# terminate server ... to start again using reload_from_dump # terminate server ... to start again using reload_from_dump
p_server.terminate() p_server.terminate()
@ -512,13 +528,13 @@ def test_jobmanager_read_old_stat():
assert not p_server.is_alive(), "the server did not terminate on time!" assert not p_server.is_alive(), "the server did not terminate on time!"
print("[+] client and server terminated") print("[+] client and server terminated")
time.sleep(2) time.sleep(1)
PORT += 1 PORT += 1
# start server using old dump # start server using old dump
p_server = mp.Process(target=start_server, args=(n,True)) p_server = mp.Process(target=start_server, args=(n,True))
p_server.start() p_server.start()
time.sleep(2) time.sleep(1)
p_client = mp.Process(target=start_client) p_client = mp.Process(target=start_client)
p_client.start() p_client.start()
@ -544,40 +560,6 @@ def test_jobmanager_read_old_stat():
assert len(intersect) == 0, "final result does not contain all arguments!" assert len(intersect) == 0, "final result does not contain all arguments!"
print("[+] all arguments found in final_results") print("[+] all arguments found in final_results")
# def test_hashedViewOnNumpyArray():
# s = set()
#
# a = np.ones(4)
# ah = jobmanager.hashableCopyOfNumpyArray(a)
#
# s.add(ah)
#
# b = np.ones(4, dtype=np.int32)
# bh = jobmanager.hashableCopyOfNumpyArray(b)
#
# # hash function independent of dtype
# assert hash(ah) == hash(bh)
# # overwritten equal operator ...
# assert ah == bh
# # ... makes such statements possible!
# assert bh in s
#
# # check if it is truly a copy, not just a view
# b[0] = 2
# assert bh[0] == 1
#
# c = np.ones(5)
# ch = jobmanager.hashableCopyOfNumpyArray(c)
# # different array
# assert not ch in s
#
# # check if shape is included in hash calculation
# bh2 = bh.reshape((2,2))
# assert bh2 not in s
#
# # just some redundant back conversion an checking
# bh2 = bh2.reshape((4,))
# assert bh2 in s
def test_client_status(): def test_client_status():
global PORT global PORT
@ -747,8 +729,8 @@ def test_hum_size():
assert humanize_size(1024**4) == '1024.00TB' assert humanize_size(1024**4) == '1024.00TB'
if __name__ == "__main__": if __name__ == "__main__":
jm_log.setLevel(logging.DEBUG) jm_log.setLevel(logging.WARNING)
progress.log.setLevel(logging.DEBUG) progress.log.setLevel(logging.ERROR)
if len(sys.argv) > 1: if len(sys.argv) > 1:
pass pass
else: else:
@ -759,18 +741,18 @@ if __name__ == "__main__":
# test_Signal_to_sys_exit, # test_Signal_to_sys_exit,
# test_Signal_to_terminate_process_list, # test_Signal_to_terminate_process_list,
# test_jobmanager_basic, # test_jobmanager_basic,
# test_jobmanager_server_signals, # test_jobmanager_server_signals,
test_shutdown_server_while_client_running, # test_shutdown_server_while_client_running,
# test_shutdown_client, # test_shutdown_client,
# test_check_fail, # test_chedck_fail,
# test_jobmanager_read_old_stat, test_jobmanager_read_old_stat,
# test_client_status, test_client_status,
# test_jobmanager_local, test_jobmanager_local,
# test_start_server_on_used_port, test_start_server_on_used_port,
# test_shared_const_arg, test_shared_const_arg,
# test_digest_rejected, test_digest_rejected,
# test_hum_size, test_hum_size,
lambda : print("END") lambda : print("END")
] ]