save work in progress

This commit is contained in:
cimatosa 2016-09-28 16:14:01 +02:00
parent 521a29320e
commit 4dfa906017
5 changed files with 190 additions and 320 deletions

View file

@@ -49,6 +49,7 @@ import warnings
 import binfootprint as bf
 import progress
 import logging
+import psutil

 # try:
 #     from logging.handlers import QueueHandler
@@ -299,7 +300,13 @@ class JobManager_Client(object):
     def connect(self):
         if self.manager_objects is None:
-            self.manager_objects = self.create_manager_objects()
+            try:
+                self.manager_objects = self.create_manager_objects()
+            except Exception as e:
+                log.critical("creating manager objects failed due to {}".format(type(e)))
+                log.info(traceback.format_exc())
+                raise
         else:
             log.info("already connected (at least shared object are available)")
@@ -465,7 +472,7 @@ class JobManager_Client(object):
                 # job_q.get failed -> server down?
                 except Exception as e:
                     log.error("Error when calling 'job_q_get'")
-                    handle_unexpected_queue_error(e, log)
+                    handle_unexpected_queue_error(e)
                     break

                 # try to process the retrieved argument
@@ -504,7 +511,7 @@ class JobManager_Client(object):
                 # fail_q.put failed -> server down?
                 except Exception as e:
                     log.error('sending arg to fail_q failed')
-                    handle_unexpected_queue_error(e, log)
+                    handle_unexpected_queue_error(e)
                     break
                 else:
                     log.debug('sending arg to fail_q was successful')
@@ -525,8 +532,8 @@ class JobManager_Client(object):
                 except Exception as e:
                     log.error('sending result to result_q failed due to %s', type(e))
-                    emergency_dump(arg, res, emergency_dump_path, log)
-                    handle_unexpected_queue_error(e, log)
+                    emergency_dump(arg, res, emergency_dump_path)
+                    handle_unexpected_queue_error(e)
                     break

                 del res
@@ -550,7 +557,7 @@ class JobManager_Client(object):
                 # fail_q.put failed -> server down?
                 except Exception as e:
                     log.error("put arg back to job_q failed due to %s", type(e))
-                    JobManager_Client._handle_unexpected_queue_error(e, log)
+                    handle_unexpected_queue_error(e)
                 else:
                     log.debug("putting arg back to job_q was successful")
@@ -568,6 +575,10 @@ class JobManager_Client(object):
             log.info(stat)

         log.debug("JobManager_Client.__worker_func at end (PID %s)", os.getpid())
+        # log.debug("trigger sys.exit(0)")
+        # raise SystemExit
+        # log.debug("sys.exit(0) was triggered")

     def start(self):
         """
@@ -670,9 +681,15 @@ class JobManager_Client(object):
         for p in self.procs:
             log.debug("join %s PID %s", p, p.pid)
-            while p.is_alive():
-                log.debug("still alive %s PID %s", p, p.pid)
-                p.join(timeout=self.interval)
+            p.join()
+            # while p.is_alive():
+            #     log.debug("still alive %s PID %s", p, p.pid)
+            #     p.join(timeout=self.interval)
+            # _proc = psutil.Process(p.pid)
+            # log.debug(str(p.exitcode))
+            # log.debug(str(_proc.connections()))
+            # log.debug(str(_proc.children(recursive=True)))
+            # log.debug(str(_proc.status()))
             log.debug("process %s PID %s was joined", p, p.pid)
@@ -926,6 +943,9 @@ class JobManager_Server(object):
         """
         # will only be False when _shutdown was started in subprocess
+        self.__stop_SyncManager()
+        log.debug("SyncManager stopped!")
+
         # do user defined final processing
         self.process_final_result()
         log.debug("process_final_result done!")
@@ -952,8 +972,7 @@ class JobManager_Server(object):
         self.show_statistics()

-        self.__stop_SyncManager()
-        log.debug("SyncManager stopped!")
         log.info("JobManager_Server was successfully shut down")

     def show_statistics(self):
@@ -1333,28 +1352,28 @@ def call_connect_python3(connect, dest, reconnect_wait=2, reconnect_tries=3):
             log.error(traceback.format_stack()[-3].strip())
             if type(e) is ConnectionResetError:             # ... when the destination hangs up on us
-                c = handler_connection_reset(dest, c, reconnect_wait, reconnect_tries, log)
+                c = handler_connection_reset(dest, c, reconnect_wait, reconnect_tries)
             elif type(e) is ConnectionRefusedError:         # ... when the destination refuses our connection
-                handler_connection_refused(e, dest, log)
+                handler_connection_refused(e, dest)
             elif type(e) is AuthenticationError :           # ... when the destination refuses our connection due authkey missmatch
-                handler_authentication_error(e, dest, log)
+                handler_authentication_error(e, dest)
             elif type(e) is RemoteError:                    # ... when the destination send us an error message
                 if 'KeyError' in e.args[0]:
-                    handler_remote_key_error(e, dest, log)
+                    handler_remote_key_error(e, dest)
                 elif 'ValueError: unsupported pickle protocol:' in e.args[0]:
-                    handler_remote_value_error(e, dest, log)
+                    handler_remote_value_error(e, dest)
                 else:
-                    handler_remote_error(e, dest, log)
+                    handler_remote_error(e, dest)
             elif type(e) is ValueError:
-                handler_value_error(e, log)
+                handler_value_error(e)
             else:                                           # any other exception
-                handler_unexpected_error(e, log)
+                handler_unexpected_error(e)
         else:                                               # no exception
             log.debug("connection to %s successfully established".format(dest))
             return True

-def call_connect_python2(connect, dest, verbose=1, reconnect_wait=2, reconnect_tries=3):
+def call_connect_python2(connect, dest, reconnect_wait=2, reconnect_tries=3):
     c = 0
     while True:
         try:                                                # here we try re establish the connection
@@ -1369,24 +1388,24 @@ def call_connect_python2(connect, dest, verbose=1, reconnect_wait=2, reconnect_tries=3):
             log.error("caught %s with args %s", type(e), e.args)
                 err_code = e.args[0]
                 if err_code == errno.ECONNRESET:            # ... when the destination hangs up on us
-                    c = handler_connection_reset(dest, c, reconnect_wait, reconnect_tries, verbose)
+                    c = handler_connection_reset(dest, c, reconnect_wait, reconnect_tries)
                 elif err_code == errno.ECONNREFUSED:        # ... when the destination refuses our connection
-                    handler_connection_refused(e, dest, verbose)
+                    handler_connection_refused(e, dest)
                 else:
-                    handler_unexpected_error(e, verbose)
+                    handler_unexpected_error(e)
             elif type(e) is AuthenticationError :           # ... when the destination refuses our connection due authkey missmatch
-                handler_authentication_error(e, dest, verbose)
+                handler_authentication_error(e, dest)
             elif type(e) is RemoteError:                    # ... when the destination send us an error message
                 if 'KeyError' in e.args[0]:
-                    handler_remote_key_error(e, verbose, dest)
+                    handler_remote_key_error(e, dest)
                 elif 'ValueError: unsupported pickle protocol:' in e.args[0]:
-                    handler_remote_value_error(e, verbose, dest)
+                    handler_remote_value_error(e, dest)
                 else:
-                    handler_remote_error(e, verbose, dest)
+                    handler_remote_error(e, dest)
             elif type(e) is ValueError:
-                handler_value_error(e, verbose)
+                handler_value_error(e)
             else:                                           # any other exception
-                handler_unexpected_error(e, verbose)
+                handler_unexpected_error(e)
         else:                                               # no exception
             log.debug("connection to %s successfully established", dest)
@@ -1435,24 +1454,24 @@ def getDateForFileName(includePID = False):
         name += "_{}".format(os.getpid())
     return name

-def handler_authentication_error(e, dest, log):
+def handler_authentication_error(e, dest):
     log.error("authentication error")
     log.info("Authkey specified does not match the authkey at destination side!")
     raise e

-def handler_broken_pipe_error(e, log):
+def handler_broken_pipe_error(e):
     log.error("broken pip error")
     log.info("This usually means that an established connection was closed\n")
     log.info("does not exists anymore, probably the server went down")
     raise e

-def handler_connection_refused(e, dest, log):
+def handler_connection_refused(e, dest):
     log.error("connection refused error")
     log.info("This usually means that no matching Manager object was instanciated at destination side!")
     log.info("Either there is no Manager running at all, or it is listening to another port.")
     raise JMConnectionRefusedError(e)

-def handler_connection_reset(dest, c, reconnect_wait, reconnect_tries, log):
+def handler_connection_reset(dest, c, reconnect_wait, reconnect_tries):
     log.error("connection reset error")
     log.info("During 'connect' this error might be due to firewall settings"+
              "or other TPC connections controlling mechanisms!")
@@ -1465,33 +1484,33 @@ def handler_connection_reset(dest, c, reconnect_wait, reconnect_tries):
         time.sleep(reconnect_wait)
     return c

-def handler_eof_error(e, log):
+def handler_eof_error(e):
     log.error("EOF error")
     log.info("This usually means that server did not replay, although the connection is still there.\n"+
              "This is due to the fact that the connection is in 'timewait' status for about 60s\n"+
              "after the server went down inappropriately.")
     raise e

-def handler_remote_error(e, dest, log):
+def handler_remote_error(e, dest):
     log.error("remote error")
     log.info("The server %s send an RemoteError message!\n%s", dest, e.args[0])
     raise RemoteError(e.args[0])

-def handler_remote_key_error(e, dest, log):
+def handler_remote_key_error(e, dest):
     log.error("remote key error")
     log.info("'KeyError' detected in RemoteError message from server %s!\n"+
              "This hints to the fact that the actual instace of the shared object on the server side has changed,\n"+
              "for example due to a server restart you need to reinstanciate the proxy object.", dest)
     raise RemoteKeyError(e.args[0])

-def handler_remote_value_error(e, dest, log):
+def handler_remote_value_error(e, dest):
     log.error("remote value error")
     log.info("'ValueError' due to 'unsupported pickle protocol' detected in RemoteError from server %s!\n"+
              "You might have tried to connect to a SERVER running with an OLDER python version.\n"+
              "At this stage (and probably for ever) this should be avoided!", dest)
     raise RemoteValueError(e.args[0])

-def handler_value_error(e, log):
+def handler_value_error(e):
     log.error("value error")
     if 'unsupported pickle protocol' in e.args[0]:
         log.info("'ValueError' due to 'unsupported pickle protocol'!\n"
@@ -1499,16 +1518,16 @@ def handler_value_error(e, log):
                  "At this stage (and probably for ever) this should be avoided.\n")
     raise e

-def handler_unexpected_error(e, log):
+def handler_unexpected_error(e):
     log.error("unexpected error of type %s and args %s", type(e), e.args)
     raise e

-def handle_unexpected_queue_error(e, log):
+def handle_unexpected_queue_error(e):
     log.error("unexpected error of type %s and args %s\n"+
               "I guess the server went down, can't do anything, terminate now!", type(e), e.args)
     log.debug(traceback.print_exc())

-def emergency_dump(arg, res, path, log):
+def emergency_dump(arg, res, path):
     now = datetime.now().isoformat()
     pid = os.getpid()
     fname = "{}_pid_{}".format(now, pid)
@@ -1566,7 +1585,6 @@ def proxy_operation_decorator_python3(proxy, operation, reconnect_wait=2, reconnect_tries=3):
         try:
             res = o(*args, **kwargs)
         except queue.Empty as e:
-
             log.info("operation '%s' -> %s FAILED due to '%s'", operation, dest, type(e))
             raise e
@@ -1586,11 +1604,11 @@ def proxy_operation_decorator_python3(proxy, operation, reconnect_wait=2, reconnect_tries=3):
                 time.sleep(reconnect_wait)
                 continue
             elif type(e) is BrokenPipeError:
-                handler_broken_pipe_error(e, log)
+                handler_broken_pipe_error(e)
             elif type(e) is EOFError:
-                handler_eof_error(e, log)
+                handler_eof_error(e)
             else:
-                handler_unexpected_error(e, log)
+                handler_unexpected_error(e)
         else:                                               # SUCCESS -> return True
             log.debug("operation '%s' successfully executed", operation)
             return res
@@ -1627,11 +1645,14 @@ def proxy_operation_decorator_python2(proxy, operation, reconnect_wait=2, reconnect_tries=3):
                 raise e
             time.sleep(reconnect_wait)
             continue

         log.debug("execute operation '%s' -> %s", operation, dest)
         try:
             res = o(*args, **kwargs)
+        except queue.Empty as e:
+            log.info("operation '%s' -> %s FAILED due to '%s'", operation, dest, type(e))
+            raise e
         except Exception as e:
             log.warning("operation '%s' -> %s FAILED due to '%s'", operation, dest, type(e))
             log.debug("show traceback.format_stack()[-3]\n%s", traceback.format_stack()[-3].strip())
@@ -1652,11 +1673,11 @@ def proxy_operation_decorator_python2(proxy, operation, reconnect_wait=2, reconnect_tries=3):
                     time.sleep(reconnect_wait)
                     continue
                 else:
-                    handler_unexpected_error(e, log)
+                    handler_unexpected_error(e)
             elif type(e) is EOFError:
-                handler_eof_error(e, log)
+                handler_eof_error(e)
             else:
-                handler_unexpected_error(e, log)
+                handler_unexpected_error(e)
         else:                                               # SUCCESS -> return True
             log.debug("operation '%s' successfully executed", operation)
             return res
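
Note on the pattern behind most of this file's hunks: the explicit log argument is dropped from emergency_dump, handle_unexpected_queue_error, and the handler_* helpers, which now rely on a logger bound at module scope. A minimal sketch of that pattern, assuming the module creates its logger once at the top (the helper name below is illustrative):

    import logging

    # created once per module; every helper picks it up from module scope
    log = logging.getLogger(__name__)

    def handler_example(e):
        # no `log` parameter needed anymore
        log.error("unexpected error of type %s and args %s", type(e), e.args)
        raise e

This removes a parameter every call site had to thread through, at the cost of making the logger a module-level dependency.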

View file

@@ -1,67 +1,9 @@
 #!/usr/bin/env python
 # -*- coding: utf-8 -*-

+import binfootprint
 from .jobmanager import JobManager_Server
-import pickle
-
-def recursive_scan_for_instance(obj, type, explicit_exclude = None ):
-    """
-        try to do some recursive check to see whether 'obj' is of type
-        'type' or contains items of 'type' type.
-
-        if obj is a mapping (like dict) this will only check
-        for item iterated over via
-
-            for item in obj
-
-        which corresponds to the keys in the dict case.
-
-        The explicit_exclude argument may be a tuple of types for
-        some explicit checking in the sense that if obj is an
-        instance of one of the type given by explicit_exclude
-        we know it is NOT an instance of type.
-    """
-    # check this object for type
-    if isinstance(obj, type):
-        return True
-
-    # check for some explicit types in order to conclude
-    # that obj is not of type
-    # see dict example
-    if explicit_exclude is not None:
-        if isinstance(obj, explicit_exclude):
-            return False
-
-    # if not of desired type, try to iterate and check each item for type
-    try:
-        for i in obj:
-            # return True, if object is of type or contains type
-            if recursive_scan_for_instance(i, type) == True:
-                return True
-    except:
-        pass
-
-    # either object is not iterable and not of type, or each item is not of type -> return False
-    return False
-
-def recursive_scan_for_dict_instance(obj):
-    # here we explicitly check against the 'str' class
-    # as it is iterable, but can not contain an dict as item, only characters
-    return recursive_scan_for_instance(obj, type=dict, explicit_exclude=(str, ))
-
-def data_as_binary_key(data):
-    # since the hash value of a string is randomly seeded each time python
-    # is started -> the order of the entries in a dict is not guaranteed
-    # and therefore its binary representation may vary
-    # this forbids to use dicts as a key for persistent data structures
-    if recursive_scan_for_dict_instance(data):
-        raise RuntimeError("data used as 'key' must not include dictionaries!")
-
-    # protocol 2 ensures backwards compatibility
-    # (at least here) down to Python 2.3
-    return pickle.dumps(data, protocol=2)

 class PersistentData_Server(JobManager_Server):

     def __init__(self,
@@ -91,13 +33,13 @@ class PersistentData_Server(JobManager_Server):
         return True, if a new data set was added (key not already in pds)
         otherwise false
         """
-        key = data_as_binary_key(arg.id)
+        key = binfootprint.dump(arg.id)
         has_key = key in self.pds
         self.pds[key] = (arg, result)
         return not has_key

     def put_arg(self, a):
-        a_bin = data_as_binary_key(a.id)
+        a_bin = binfootprint.dump(a.id)
         if self.overwrite or (not a_bin in self.pds):
             JobManager_Server.put_arg(self, a)
             return True
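
Note: the removed data_as_binary_key relied on pickle.dumps, whose byte output for containers is not guaranteed to be stable across interpreter runs (string hash randomization can change dict iteration order), which is why it had to reject dicts outright. binfootprint.dump is designed to yield a deterministic byte representation, so it can serve directly as a persistent key. A minimal sketch, with an illustrative argument id:

    import binfootprint

    arg_id = ('model_A', 42, 3.14)   # illustrative argument id

    # equal data gives identical bytes on every run, so the result is
    # safe as a key in a persistent data structure
    key = binfootprint.dump(arg_id)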

View file

@@ -8,6 +8,7 @@ OUTFILE='pytest_out'
 PYTHON="python"
 PYTHON2_7="python2.7"
 PYTHON3_4="python3.4"
+PYTHON3_5="python3.5"

 PYLIST=( $PYTHON )
@@ -17,7 +18,7 @@ while getopts ":p:ahn" opt; do
     case $opt in
         a)
             echo "run all!" >&2
-            PYLIST=( $PYTHON2_7 $PYTHON3_4 )
+            PYLIST=( $PYTHON2_7 $PYTHON3_4 $PYTHON3_5 )
             ;;
         p)
             if [ "$OPTARG" = "2.7" ]; then

View file

@@ -23,7 +23,7 @@ progress.log.setLevel(logging.ERROR)
 from jobmanager.jobmanager import log as jm_log

-jm_log.setLevel(logging.INFO)
+jm_log.setLevel(logging.WARNING)

 import warnings
 warnings.filterwarnings('error')
@@ -186,20 +186,24 @@ def test_jobmanager_basic():
     p_client = None

     try:
+        # start a server
         p_server = mp.Process(target=start_server, args=(n,False))
         p_server.start()
         time.sleep(0.5)
+        # server needs to be running
         assert p_server.is_alive()

+        # start client
         p_client = mp.Process(target=start_client)
         p_client.start()
         p_client.join(3)
+        # client should have processed all
         assert not p_client.is_alive(), "the client did not terminate on time!"
+        # client must not throw an exception
         assert p_client.exitcode == 0, "the client raised an exception"

         p_server.join(3)
+        # server should have come down
         assert not p_server.is_alive(), "the server did not terminate on time!"
         print("[+] client and server terminated")
@@ -207,84 +211,67 @@ def test_jobmanager_basic():
         fname = 'jobmanager.dump'
         with open(fname, 'rb') as f:
             data = jobmanager.JobManager_Server.static_load(f)

         final_res_args_set = {a[0] for a in data['final_result']}

         set_ref = set(range(1,n))
         intersect = set_ref - final_res_args_set

         assert len(intersect) == 0, "final result does not contain all arguments!"
         print("[+] all arguments found in final_results")
     except:
         if p_server is not None:
-            os.kill(p_server.pid, signal.SIGINT)
+            p_server.terminate()
         if p_client is not None:
-            os.kill(p_client.pid, signal.SIGINT)
+            p_client.terminate()
         raise

 def test_jobmanager_server_signals():
+    """
+        start a server (no client), shutdown, check dump
+    """
     global PORT
-    PORT += 1
-    print("## TEST SIGTERM ##")
-    p_server = mp.Process(target=start_server, args=(30,))
-    p_server.start()
-    time.sleep(1)
-    print("    send SIGTERM")
-    os.kill(p_server.pid, signal.SIGTERM)
-    assert p_server.is_alive()
-    print("[+] still alive (assume shut down takes some time)")
-    p_server.join(15)
-    assert not p_server.is_alive(), "timeout for server shutdown reached"
-    print("[+] now terminated (timeout of 15s not reached)")
-
-    fname = 'jobmanager.dump'
-    with open(fname, 'rb') as f:
-        data = jobmanager.JobManager_Server.static_load(f)
-
-    args_set = set(data['args_dict'].keys())
-    args_ref = range(1,30)
-    ref_set = set()
-    for a in args_ref:
-        ref_set.add(binfootprint.dump(a))
-
-    assert len(args_set) == len(ref_set)
-    assert len(ref_set - args_set) == 0
-    print("[+] args_set from dump contains all arguments")
-
-    PORT += 1
-    print("## TEST SIGINT ##")
-    p_server = mp.Process(target=start_server, args=(30,))
-    p_server.start()
-    time.sleep(1)
-    print("    send SIGINT")
-    os.kill(p_server.pid, signal.SIGINT)
-    assert p_server.is_alive()
-    print("[+] still alive (assume shut down takes some time)")
-    p_server.join(15)
-    assert not p_server.is_alive(), "timeout for server shutdown reached"
-    print("[+] now terminated (timeout of 15s not reached)")
-
-    fname = 'jobmanager.dump'
-    with open(fname, 'rb') as f:
-        data = jobmanager.JobManager_Server.static_load(f)
-
-    args_set = set(data['args_dict'].keys())
-    args_ref = range(1,30)
-    ref_set = set()
-    for a in args_ref:
-        ref_set.add(binfootprint.dump(a))
-
-    assert len(args_set) == len(ref_set)
-    assert len(ref_set - args_set) == 0
-    print("[+] args_set from dump contains all arguments")
+    timeout = 5
+    n = 15
+    sigs = [('SIGTERM', signal.SIGTERM), ('SIGINT', signal.SIGINT)]
+
+    for signame, sig in sigs:
+        PORT += 1
+        p_server = None
+        try:
+            print("## TEST {} ##".format(signame))
+            p_server = mp.Process(target=start_server, args=(n,))
+            p_server.start()
+            time.sleep(0.5)
+            assert p_server.is_alive()
+            print("    send {}".format(signame))
+            os.kill(p_server.pid, sig)
+            print("[+] still alive (assume shut down takes some time)")
+            p_server.join(timeout)
+            assert not p_server.is_alive(), "timeout for server shutdown reached"
+            print("[+] now terminated (timeout of {}s not reached)".format(timeout))
+
+            fname = 'jobmanager.dump'
+            with open(fname, 'rb') as f:
+                data = jobmanager.JobManager_Server.static_load(f)
+
+            args_set = set(data['args_dict'].keys())
+            args_ref = range(1,n)
+            ref_set = set()
+            for a in args_ref:
+                ref_set.add(binfootprint.dump(a))
+
+            assert len(args_set) == len(ref_set)
+            assert len(ref_set - args_set) == 0
+            print("[+] args_set from dump contains all arguments")
+        except:
+            if p_server is not None:
+                p_server.terminate()
+            raise

 def test_shutdown_server_while_client_running():
     """
-        start server with 1000 elements in queue
+        start server with 100 elements in queue

         start client
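
Note: test_jobmanager_server_signals now runs one loop over (name, signal) pairs instead of two copy-pasted SIGTERM/SIGINT blocks, with shorter timeouts and terminate-on-failure cleanup. A self-contained sketch of the same pattern, using a dummy target in place of start_server (helper names are illustrative):

    import multiprocessing as mp
    import os
    import signal
    import time

    def dummy_server():
        # stand-in for start_server; sleeps until a signal arrives
        try:
            time.sleep(60)
        except KeyboardInterrupt:
            pass                      # SIGINT is delivered as KeyboardInterrupt

    def check_shutdown_on(sig, timeout=5):
        # hypothetical helper condensing the block the test loops over
        p = mp.Process(target=dummy_server)
        p.start()
        time.sleep(0.5)
        assert p.is_alive()
        os.kill(p.pid, sig)
        p.join(timeout)
        assert not p.is_alive(), "no shutdown within {}s".format(timeout)

    if __name__ == "__main__":
        for signame, sig in [('SIGTERM', signal.SIGTERM), ('SIGINT', signal.SIGINT)]:
            print("## TEST {} ##".format(signame))
            check_shutdown_on(sig)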
@@ -296,61 +283,66 @@ def test_shutdown_server_while_client_running():
         all arguments given
     """
     global PORT
-    PORT += 1
-    n = 1000
-    sigs = [('SIGTERM', signal.SIGTERM)]
-
-    p_server = mp.Process(target=start_server, args=(n,))
-    p_server.start()
-    time.sleep(1)
-
-    p_client = mp.Process(target=start_client)
-    p_client.start()
-    time.sleep(2)
-
-    os.kill(p_server.pid, signal.SIGTERM)
-
-    p_server.join(200)
-    p_client.join(200)
-
-    try:
-        assert not p_server.is_alive()
-    except:
-        p_server.terminate()
-        raise
-
-    try:
-        assert not p_client.is_alive()
-    except:
-        p_client.terminate()
-        raise
-
-    fname = 'jobmanager.dump'
-    with open(fname, 'rb') as f:
-        data = jobmanager.JobManager_Server.static_load(f)
-
-    args_set = set(data['args_dict'].keys())
-    final_result = data['final_result']
-
-    final_res_args = {binfootprint.dump(a[0]) for a in final_result}
-
-    args_ref = range(1,n)
-    set_ref = set()
-    for a in args_ref:
-        set_ref.add(binfootprint.dump(a))
-
-    set_recover = set(args_set) | set(final_res_args)
-
-    intersec_set = set_ref-set_recover
-
-    if len(intersec_set) == 0:
-        print("[+] no arguments lost!")
-    assert len(intersec_set) == 0, "NOT all arguments found in dump!"
+    n = 100
+    timeout = 10
+    sigs = [('SIGTERM', signal.SIGTERM), ('SIGINT', signal.SIGINT)]
+
+    for signame, sig in sigs:
+        PORT += 1
+        p_server = None
+        p_client = None
+        try:
+            p_server = mp.Process(target=start_server, args=(n,))
+            p_server.start()
+            time.sleep(0.5)
+            assert p_server.is_alive()
+
+            p_client = mp.Process(target=start_client)
+            p_client.start()
+            time.sleep(4)
+            assert p_client.is_alive()
+
+            print("    send {} to server".format(signame))
+            os.kill(p_server.pid, sig)
+            p_server.join(timeout)
+            assert not p_server.is_alive(), "server did not shut down on time"
+            p_client.join(timeout)
+            assert not p_client.is_alive(), "client did not shut down on time"
+
+            fname = 'jobmanager.dump'
+            with open(fname, 'rb') as f:
+                data = jobmanager.JobManager_Server.static_load(f)
+
+            args_set = set(data['args_dict'].keys())
+            final_result = data['final_result']
+
+            final_res_args = {binfootprint.dump(a[0]) for a in final_result}
+
+            args_ref = range(1,n)
+            set_ref = set()
+            for a in args_ref:
+                set_ref.add(binfootprint.dump(a))
+
+            set_recover = set(args_set) | set(final_res_args)
+
+            intersec_set = set_ref-set_recover
+
+            if len(intersec_set) == 0:
+                print("[+] no arguments lost!")
+            assert len(intersec_set) == 0, "NOT all arguments found in dump!"
+        except:
+            if p_server is not None:
+                p_server.terminate()
+            if p_client is not None:
+                p_client.terminate()
+            raise

 def test_shutdown_client():
     shutdown_client(signal.SIGTERM)
@@ -752,7 +744,9 @@ def test_hum_size():
     assert humanize_size(1024**3) == '1.00TB'
     assert humanize_size(1024**4) == '1024.00TB'

 if __name__ == "__main__":
+    jm_log.setLevel(logging.DEBUG)
+    progress.log.setLevel(logging.DEBUG)
     if len(sys.argv) > 1:
         pass
     else:
@@ -763,10 +757,10 @@ if __name__ == "__main__":
         #   test_Signal_to_sys_exit,
         #   test_Signal_to_terminate_process_list,
-        test_jobmanager_basic,
+        #   test_jobmanager_basic,
         #   test_jobmanager_server_signals,
-        #   test_shutdown_server_while_client_running,
+        test_shutdown_server_while_client_running,
         #   test_shutdown_client,
         #   test_check_fail,
         #   test_jobmanager_read_old_stat,
         #   test_client_status,

View file

@@ -7,95 +7,7 @@ from os.path import abspath, dirname, split
 sys.path = [split(dirname(abspath(__file__)))[0]] + sys.path

 import jobmanager as jm

-def bin_list():
-    import pickle
-    a = ['cvbnm', 'qwert', 'asdfg']
-    print(pickle.dumps(a))
-
-def bin_dict():
-    import pickle
-    a = {'cvbnm': 1, 'qwert': 2, 'asdfg': 3}
-    print(pickle.dumps(a))
-
-def see_bin_data():
-    import multiprocessing as mp
-    mp.set_start_method('spawn')
-    for i in range(10):
-        p = mp.Process(target = bin_list)
-        p.start()
-        p.join()
-
-    for i in range(10):
-        p = mp.Process(target = bin_dict)
-        p.start()
-        p.join()
-
-def test_recursive_type_scan():
-    d = {'a': 1, 'b':2}
-    l = [2, 3]
-    t = ('3','4')
-    i = 1
-    f = 3.4
-    s = 's'
-
-    a1 = [l, t, d]
-    a2 = [l, t]
-    a3 = (d, d, d)
-    a4 = (i, l, a1)
-
-    ###
-    #  GENERAL DICT SCAN
-    ###
-    assert jm.servers.recursive_scan_for_instance(obj=d, type=dict) == True
-    assert jm.servers.recursive_scan_for_instance(obj=l, type=dict) == False
-    assert jm.servers.recursive_scan_for_instance(obj=t, type=dict) == False
-    assert jm.servers.recursive_scan_for_instance(obj=i, type=dict) == False
-    assert jm.servers.recursive_scan_for_instance(obj=f, type=dict) == False
-    assert jm.servers.recursive_scan_for_instance(obj=s, type=dict) == False
-    assert jm.servers.recursive_scan_for_instance(obj=a1, type=dict) == True
-    assert jm.servers.recursive_scan_for_instance(obj=a2, type=dict) == False
-    assert jm.servers.recursive_scan_for_instance(obj=a3, type=dict) == True
-    assert jm.servers.recursive_scan_for_instance(obj=a4, type=dict) == True
-
-    ###
-    #  SPECIFIC DICT SCAN
-    ###
-    assert jm.servers.recursive_scan_for_dict_instance(obj=d) == True
-    assert jm.servers.recursive_scan_for_dict_instance(obj=l) == False
-    assert jm.servers.recursive_scan_for_dict_instance(obj=t) == False
-    assert jm.servers.recursive_scan_for_dict_instance(obj=i) == False
-    assert jm.servers.recursive_scan_for_dict_instance(obj=f) == False
-    assert jm.servers.recursive_scan_for_dict_instance(obj=s) == False
-    assert jm.servers.recursive_scan_for_dict_instance(obj=a1) == True
-    assert jm.servers.recursive_scan_for_dict_instance(obj=a2) == False
-    assert jm.servers.recursive_scan_for_dict_instance(obj=a3) == True
-    assert jm.servers.recursive_scan_for_dict_instance(obj=a4) == True
-
-    ###
-    #  INT SCAN
-    ###
-    assert jm.servers.recursive_scan_for_instance(obj = i, type=int) == True
-    assert jm.servers.recursive_scan_for_instance(obj = l, type=int) == True
-    assert jm.servers.recursive_scan_for_instance(obj = a1, type=int) == True
-    assert jm.servers.recursive_scan_for_instance(obj = a2, type=int) == True
-    assert jm.servers.recursive_scan_for_instance(obj = a4, type=int) == True
-
-    print(jm.servers.recursive_scan_for_instance(obj = d, type=int))
-    print(jm.servers.recursive_scan_for_instance(obj = a3, type=int))
-
-    for i in d:
-        print(i)

 if __name__ == "__main__":
-    test_recursive_type_scan()
-    # see_bin_data()
+    pass