work in progress

This commit is contained in:
Richard Hartmann 2014-09-09 17:15:37 +02:00
parent 661f08c7ac
commit bbed478c14
2 changed files with 203 additions and 145 deletions

View file

@ -23,7 +23,6 @@ import fcntl
import termios
import struct
"""jobmanager module
Richard Hartmann 2014
@ -147,7 +146,8 @@ class Loop(object):
run=False,
verbose=0,
sigint='stop',
sigterm='stop'):
sigterm='stop',
name=None):
"""
func [callable] - function to be called periodically
@ -178,6 +178,7 @@ class Loop(object):
self._proc = None
self._sigint = sigint
self._sigterm = sigterm
self._name = name
if run:
self.start()
@ -212,11 +213,12 @@ class Loop(object):
"""
self.run = True
self._proc = mp.Process(target = Loop._wrapper_func,
args = (self.func, self.args, self._run, self.interval,
self.verbose, self._sigint, self._sigterm))
args = (self.func, self.args, self._run, self.interval,
self.verbose, self._sigint, self._sigterm),
name=self._name)
self._proc.start()
if self.verbose > 1:
print("PID {}: I'm a new loop process".format(self._proc.pid))
print("PID {}: I'm a new loop process ({})".format(self._proc.pid, self._name))
def stop(self):
"""
@ -310,7 +312,8 @@ class StatusBar(Loop):
run=True,
verbose=0,
sigint='stop',
sigterm='stop'):
sigterm='stop',
name='statusbar'):
"""
The init will also start to display the status bar if run was set True.
Otherwise use the inherited method start() to start the show_stat loop.
@ -350,7 +353,8 @@ class StatusBar(Loop):
super().__init__(func=StatusBar.show_stat,
args=(self.count, self.start_time, self.max_count,
self.width, self.speed_calc_cycles, self.q),
interval=interval, run=run, verbose=verbose, sigint=sigint, sigterm=sigterm)
interval=interval, run=run, verbose=verbose,
sigint=sigint, sigterm=sigterm, name=name)
def __set_width(self, width):
"""
@ -450,10 +454,12 @@ class Signal_to_SIG_IGN(object):
self.verbose = verbose
for s in signals:
signal.signal(s, self._handler)
def _handler(self, signal, frame):
def _handler(self, sig, frame):
if self.verbose > 0:
print("PID {}: received signal {} -> will be ignored".format(os.getpid(), signal_dict[signal]))
print("PID {}: received signal {} -> will be ignored".format(os.getpid(), signal_dict[sig]))
class Signal_to_sys_exit(object):
def __init__(self, signals=[signal.SIGINT, signal.SIGTERM], verbose=0):
self.verbose = verbose
@ -524,14 +530,14 @@ class JobManager_Server(object):
a SIGKILL.
"""
def __init__(self,
authkey,
fname_for_final_result_dump,
const_args=None,
port=42524,
verbose=1,
msg_interval=1,
fname_for_job_q_dump='auto',
fname_for_fail_q_dump='auto'):
authkey,
fname_for_final_result_dump,
const_args=None,
port=42524,
verbose=1,
msg_interval=1,
fname_for_args_dump='auto',
fname_for_fail_dump='auto'):
"""
authkey [string] - authentication key used by the SyncManager.
Server and Client must have the same authkey.
@ -553,6 +559,7 @@ class JobManager_Server(object):
"""
self.verbose = verbose
self._pid = os.getpid()
self._pid_start = None
if self.verbose > 1:
print("PID {}: I'm the JobManager_Server main process".format(self._pid))
@ -563,7 +570,10 @@ class JobManager_Server(object):
self.fname_for_final_result_dump = fname_for_final_result_dump
self.const_args = copy.copy(const_args)
self.fname_for_job_q_dump = fname_for_job_q_dump
self.fname_for_args_dump = fname_for_args_dump
self.fname_for_fail_dump = fname_for_fail_dump
self.msg_interval = msg_interval
# to do some redundant checking, might be removed
# the args_set holds all arguments to be processed
@ -586,7 +596,7 @@ class JobManager_Server(object):
JobQueueManager.register('get_job_q', callable=lambda: self.job_q)
JobQueueManager.register('get_result_q', callable=lambda: self.result_q)
JobQueueManager.register('get_fail_q', callable=lambda: self.fail_q)
JobQueueManager.register('get_const_args', callable=lambda: self.const_args)
JobQueueManager.register('get_const_args', callable=lambda: self.const_args, exposed=["__iter__"])
address=('', self.port) #ip='' means local
authkey=self.authkey
@ -609,16 +619,11 @@ class JobManager_Server(object):
self.final_result = []
self.stat = None
if self.verbose > 0:
self.stat = StatusBar(count = self.numresults, max_count = self.numjobs,
interval=msg_interval, speed_calc_cycles=10,
verbose = self.verbose, sigint='ign', sigterm='ign')
self.err_log = []
self.p_reinsert_failure = Loop(func=JobManager_Server.reinsert_failures,
args=(self.fail_q, self.job_q),
interval=1, run=False)
# self.p_reinsert_failure = Loop(func=JobManager_Server.reinsert_failures,
# args=(self.fail_q, self.job_q),
# interval=1, run=False)
def put_arg(self, a):
"""add argument a to the job_q
@ -677,13 +682,24 @@ class JobManager_Server(object):
When finished, or on exception call stop() afterwards to shout down gracefully.
"""
if self._pid != os.getpid():
raise RuntimeError("do not run JobManager_Server.start() in a subprocess")
if self.numjobs.value != len(self.args_set):
raise RuntimeError("use JobManager_Server.put_arg to put arguments to the job_q")
if self.verbose > 0:
self.stat = StatusBar(count = self.numresults, max_count = self.numjobs,
interval=self.msg_interval, speed_calc_cycles=10,
verbose = self.verbose, sigint='ign', sigterm='ign')
Signal_to_sys_exit(signals=[signal.SIGTERM, signal.SIGINT], verbose = self.verbose)
pid = os.getpid()
if self.verbose > 1:
print("PID {}: JobManager_Server starts processing incoming results".format(pid))
if self._pid != pid:
print(" NOTE: this routine was triggered as a subprocess which is NOT preferable!")
# if self.verbose > 1:
# print("PID {}: JobManager_Server starts processing incoming results".format(pid))
# if self._pid != pid:
# print(" NOTE: this routine was triggered as a subprocess which is NOT preferable!")
try:
if self.verbose > 0:
@ -692,11 +708,14 @@ class JobManager_Server(object):
print("PID {}: StatusBar started".format(self.stat.getpid()))
while (len(self.args_set) - self.fail_q.qsize()) > 0:
arg, result = self.result_q.get() #blocks until an item is available
self.args_set.remove(arg)
self.process_new_result(arg, result)
with self.numresults.get_lock():
self.numresults.value += 1
try:
arg, result = self.result_q.get(timeout=1) #blocks until an item is available
self.args_set.remove(arg)
self.process_new_result(arg, result)
self.numresults.value = self.numjobs.value - (len(self.args_set) - self.fail_q.qsize())
except queue.Empty:
pass
except:
if self.verbose > 0:
err, val, trb = sys.exc_info()
@ -713,9 +732,9 @@ class JobManager_Server(object):
print("PID {}: wait {}s before trigger clean up".format(pid, self.__wait_before_stop))
time.sleep(self.__wait_before_stop)
finally:
self.stop()
def stop(self):
self._shoutdown()
def _shoutdown(self):
""""stop all spawned processes and clean up
- call process_final_result to handle all collected result
@ -754,49 +773,42 @@ class JobManager_Server(object):
if self.verbose > 0:
print("done!")
if not self.job_q.empty():
if self.fname_for_job_q_dump == 'auto':
fname = "jobmanager_server_job_queue_dump_{}".format(getDateForFileName(includePID=False))
if len(self.args_set) > 0:
if self.fname_for_args_dump == 'auto':
fname = "jobmanager_server_args_{}.dump".format(getDateForFileName(includePID=False))
else:
fname = self.fname_for_job_q_dump
fname = self.fname_for_args_dump
if self.verbose > 0:
print("PID {}: args_set not empty -> dump to file {} ... ".format(pid, fname), end='', flush=True)
print("PID {}: args_set not empty -> dump to file '{}' ... ".format(pid, fname), end='', flush=True)
args = list(self.args_set)
with open(fname, 'wb') as f:
pickle.dump(args, f, protocol=pickle.HIGHEST_PROTOCOL)
if self.verbose > 0:
print("done!")
print("done!")
if not self.fail_q.empty():
if self.fname_for_fail_q_dump == 'auto':
fname = "jobmanager_server_fail_queue_dump_{}".format(getDateForFileName(includePID=False))
if self.fname_for_fail_dump == 'auto':
fname = "jobmanager_server_fail_{}.dump".format(getDateForFileName(includePID=False))
else:
fname = self.fname_for_fail_q_dump
fname = self.fname_for_fail_dump
if self.verbose > 0:
print("PID {}: there are reported failures -> dump to file {} ... ".format(pid, fname), end='', flush=True)
print("PID {}: there are reported failures -> dump to file '{}' ... ".format(pid, fname), end='', flush=True)
fail_list = []
while not self.fail_q.empty():
new_args, err, hostname, fname_tb = self.fail_q.get()
fail_list.append((new_args, err, hostname, fname_tb))
new_args, err_name, hostname = self.fail_q.get()
fail_list.append((new_args, err_name, hostname))
if self.verbose > 1:
print("'{}' on {}:{}".format(err_name, hostname, new_args))
with open(fname, 'rb') as f:
with open(fname, 'wb') as f:
pickle.dump(fail_list, f, protocol=pickle.HIGHEST_PROTOCOL)
if self.verbose > 0:
print("done!")
print("done!")
if self.verbose > 1:
for f in fail_list:
print(new_args, err, hostname, fname_tb)
if self.verbose > 0:
print("PID {}: JobManager_Server was successfully shout down".format(pid))
print("PID {}: JobManager_Server was successfully shout down".format(pid))
class JobManager_Client(object):
@ -868,28 +880,35 @@ class JobManager_Client(object):
self.procs = []
@staticmethod
def _get_manager_object(ip, port, authkey, verbose=0):
"""
connects to the server and get registered shared objects such as
job_q, result_q, fail_q, const_args
"""
class ServerQueueManager(SyncManager):
pass
ServerQueueManager.register('get_job_q')
ServerQueueManager.register('get_result_q')
ServerQueueManager.register('get_fail_q')
ServerQueueManager.register('get_const_args')
ServerQueueManager.register('get_const_args', exposed="__iter__")
self.manager = ServerQueueManager(address=(self.ip, self.port), authkey=self.authkey)
manager = ServerQueueManager(address=(ip, port), authkey=authkey)
if self.verbose > 0:
print('PIC {}: SyncManager connecting to {}:{} ... '.format(self._pid, self.ip, self.port), end='', flush=True)
self.manager.connect()
if self.verbose > 0:
if verbose > 0:
print('PIC {}: connecting to {}:{} ... '.format(os.getpid(), ip, port), end='', flush=True)
manager.connect()
if verbose > 0:
print('done!')
self.job_q = self.manager.get_job_q()
self.result_q = self.manager.get_result_q()
self.fail_q = self.manager.get_fail_q()
self.const_args = self.manager.get_const_args()
job_q = manager.get_job_q()
result_q = manager.get_result_q()
fail_q = manager.get_fail_q()
const_args = manager.get_const_args()
return job_q, result_q, fail_q, const_args
@staticmethod
def func(args, const_args):
"""
@ -907,12 +926,14 @@ class JobManager_Client(object):
@staticmethod
def __worker_func(func, const_args, job_q, result_q, nice, fail_q, verbose):
def __worker_func(func, nice, verbose, ip, port, authkey):
"""
the wrapper spawned nproc trimes calling and handling self.func
"""
Signal_to_sys_exit(signals=[signal.SIGTERM, signal.SIGINT])
job_q, result_q, fail_q, const_args = JobManager_Client._get_manager_object(ip, port, authkey, verbose)
n = os.nice(0)
n = os.nice(nice - n)
c = 0
@ -927,7 +948,7 @@ class JobManager_Client(object):
# regular case, just stop woring when empty job_q was found
except queue.Empty:
if verbose > 1:
if verbose > 0:
print("\nPID {}: finds empty job queue, processed {} jobs".format(os.getpid(), c))
return
@ -956,19 +977,16 @@ class JobManager_Client(object):
return
# some unexpected Exception
# write argument to fail_q, save traceback
# continue wirkung
# write argument, exception name and hostname to fail_q, save traceback
# continue workung
except:
err, val, trb = sys.exc_info()
if verbose > 0:
print("\nPID {}: caught exception {}, report failure of current argument to server ... ".format(os.getpid(), err), end='', flush=True)
print("\nPID {}: caught exception '{}', report failure of current argument to server ... ".format(os.getpid(), err.__name__), end='', flush=True)
hostname = socket.gethostname()
fname = 'traceback_args_{}_err_{}_{}.trb'.format(new_args, err, getDateForFileName(includePID=True))
# print(fail_q.qsize())
# fail_q.put(new_args, timeout=10)
fname = 'traceback_err_{}_{}.trb'.format(err.__name__, getDateForFileName(includePID=True))
fail_q.put((new_args, err.__name__, hostname), timeout=10)
if verbose > 0:
print("done")
@ -979,7 +997,7 @@ class JobManager_Client(object):
if verbose > 0:
print("done")
print(" continue processing next argument.")
print(" continue processing next argument.")
if verbose > 1:
print("PID {}: JobManager_Client.__worker_func terminates".format(os.getpid()))
@ -987,17 +1005,24 @@ class JobManager_Client(object):
def start(self):
"""
starts a number of nproc subprocess to work on the job_q
SIGTERM and SIGINT are managed to terminate all subprocesses
retruns when all subprocesses have terminated
"""
if self.verbose > 1:
print("PID {}: start {} processes to work on the remote queue".format(os.getpid(), self.nproc))
for i in range(self.nproc):
p = mp.Process(target=self.__worker_func, args=(self.func, self.const_args,
self.job_q, self.result_q,
self.nice, self.fail_q,
self.verbose))
p = mp.Process(target=self.__worker_func, args=(self.func,
self.nice,
self.verbose,
self.ip,
self.port,
self.authkey))
self.procs.append(p)
p.start()
time.sleep(0.3)
Signal_to_terminate_process_list(process_list = self.procs, verbose=self.verbose)

141
tests.py
View file

@ -90,8 +90,8 @@ def start_server(verbose, n=30):
jm_server = jobmanager.JobManager_Server(authkey=authkey,
fname_for_final_result_dump='final_result.dump',
verbose=verbose,
fname_for_job_q_dump='job_q.dump',
fname_for_fail_q_dump='fail_q.dump')
fname_for_args_dump='args.dump',
fname_for_fail_dump='fail.dump')
jm_server.args_from_list(args)
jm_server.start()
@ -148,13 +148,13 @@ def test_jobmanager_server_signals():
assert not p_server.is_alive(), "timeout for server shutdown reached"
print("[+] now terminated (timeout of 15s not reached)")
fname = 'job_q.dump'
fname = 'args.dump'
with open(fname, 'rb') as f:
args = pickle.load(f)
for i,a in enumerate(range(1,30)):
assert a == args[i], "checking the job_q.dump failed"
print("[+] job_q.dump contains all arguments")
assert a == args[i], "checking the args.dump failed"
print("[+] args.dump contains all arguments")
print("## TEST SIGINT ##")
@ -169,13 +169,13 @@ def test_jobmanager_server_signals():
assert not p_server.is_alive(), "timeout for server shutdown reached"
print("[+] now terminated (timeout of 15s not reached)")
fname = 'job_q.dump'
fname = 'args.dump'
with open(fname, 'rb') as f:
args = pickle.load(f)
for i,a in enumerate(range(1,30)):
assert a == args[i], "checking the job_q.dump failed"
print("[+] job_q.dump contains all arguments")
assert a == args[i], "checking the args.dump failed"
print("[+] args.dump contains all arguments")
def test_shutdown_server_while_client_running():
"""
@ -184,9 +184,9 @@ def test_shutdown_server_while_client_running():
start client
stop server -> client should catch exception, but can't do anything,
writing to fail_q won't work, because server went down
writing to fail won't work, because server went down
check if the final_result and the job_q dump end up to include
check if the final_result and the args dump end up to include
all arguments given
"""
p_server = mp.Process(target=start_server, args=(0,1000))
@ -207,7 +207,7 @@ def test_shutdown_server_while_client_running():
assert not p_server.is_alive()
assert not p_client.is_alive()
fname = 'job_q.dump'
fname = 'args.dump'
with open(fname, 'rb') as f:
args = pickle.load(f)
@ -226,7 +226,7 @@ def test_shutdown_server_while_client_running():
if len(intersec_set) == 0:
print("[+] no arguments lost!")
assert len(intersec_set) == 0, "job_q.dump and final_result_dump do NOT contain all arguments -> some must have been lost!"
assert len(intersec_set) == 0, "args.dump and final_result_dump do NOT contain all arguments -> some must have been lost!"
def test_shutdown_client():
shutdown_client(signal.SIGTERM)
@ -292,10 +292,72 @@ def shutdown_client(sig):
assert len(intersect) == 0, "final result does not contain all arguments!"
print("[+] all arguments found in final_results")
def test_check_fail():
class Client_Random_Error(jobmanager.JobManager_Client):
def func(self, args, const_args):
fail_on = [3,23,45,67,89]
time.sleep(0.1)
if args in fail_on:
raise RuntimeError("fail_on Error")
return os.getpid()
n = 100
verbose=0
p_server = mp.Process(target=start_server, args=(0,n))
p_server.start()
time.sleep(1)
print("START CLIENT")
jm_client = Client_Random_Error(ip='localhost',
authkey='testing',
port=42524,
nproc=0,
verbose=verbose)
p_client = mp.Process(target=jm_client.start)
p_client.start()
assert p_server.is_alive()
assert p_client.is_alive()
print("[+] server and client running")
p_server.join(60)
p_client.join(60)
assert not p_server.is_alive()
assert not p_client.is_alive()
print("[+] server and client stopped")
fname = 'final_result.dump'
with open(fname, 'rb') as f:
final_res = pickle.load(f)
final_res_args_set = {a[0] for a in final_res}
fname = 'fail.dump'
with open(fname, 'rb') as f:
fail = pickle.load(f)
fail_set = {a[0] for a in fail}
set_ref = set(range(1,n))
all_set = final_res_args_set | fail_set
assert len(set_ref - all_set) == 0, "final result union with reported failure do not correspond to all args!"
print("[+] all arguments found in final_results | reported failure")
def test_Signal_to_SIG_IGN():
def f():
jobmanager.Signal_to_SIG_IGN()
time.sleep(10)
print("before sleep")
while True:
time.sleep(1)
print("after sleep")
p = mp.Process(target=f)
p.start()
@ -383,47 +445,18 @@ def test_Signal_to_terminate_process_list():
time.sleep(0.5)
print("send SIGINT")
os.kill(p_mother.pid, signal.SIGINT)
def test_check_fail_q():
class Client_Random_Error(jobmanager.JobManager_Client):
def func(self, args, const_args):
time.sleep(0.1)
x = rand()
if x < 0.1:
assert False
return os.getpid()
n = 100
verbose=2
p_server = mp.Process(target=start_server, args=(0,n))
p_server.start()
time.sleep(1)
jm_client = Client_Random_Error(ip='localhost', authkey='testing', port=42524, nproc=1, verbose=verbose)
p_client = mp.Process(target=jm_client.start)
p_client.start()
if __name__ == "__main__":
# test_Signal_to_SIG_IGN()
# test_Signal_to_sys_exit()
# test_Signal_to_terminate_process_list()
# test_loop_basic()
# test_loop_signals()
# test_jobmanager_basic()
# test_jobmanager_server_signals()
# test_shutdown_server_while_client_running()
# test_shutdown_client()
test_check_fail_q()
"""
MEMO:
everything from the fail_q is also considered process
- remove from args_set
- keep track for summary at the end
"""
test_Signal_to_SIG_IGN()
test_Signal_to_sys_exit()
test_Signal_to_terminate_process_list()
test_loop_basic()
test_loop_signals()
test_jobmanager_basic()
test_jobmanager_server_signals()
test_shutdown_server_while_client_running()
test_shutdown_client()
test_check_fail()
pass