Richard Hartmann 2016-10-10 12:18:07 +02:00
commit 5e4eb24a89
2 changed files with 130 additions and 98 deletions

View file

@ -191,7 +191,8 @@ class JobManager_Client(object):
reconnect_tries = 3,
ping_timeout = 2,
ping_retry = 3,
hide_progress = False):
hide_progress = False,
use_special_SIG_INT_handler = True):
"""
server [string] - IP address or hostname where the JobManager_Server is running
@ -236,6 +237,7 @@ class JobManager_Client(object):
warnings.warn("verbose is deprecated", DeprecationWarning)
self.hide_progress = hide_progress
self.use_special_SIG_INT_handler = use_special_SIG_INT_handler
log.info("init JobManager Client instance (pid %s)", os.getpid())
@ -721,18 +723,26 @@ class JobManager_Client(object):
log.debug("all worker processes startes")
#time.sleep(self.interval/2)
log.debug("setup Signal_to_terminate_process_list handler")
if self.use_special_SIG_INT_handler:
exit_handler_signals = [signal.SIGTERM]
jm_client_special_interrupt_signals = [signal.SIGINT]
else:
exit_handler_signals = [signal.SIGTERM, signal.SIGINT]
jm_client_special_interrupt_signals = []
log.debug("setup Signal_to_terminate_process_list handler for signals %s", exit_handler_signals)
exit_handler = Signal_to_terminate_process_list(process_list = self.procs,
identifier_list = [progress.get_identifier(name = "worker{}".format(i+1),
pid = p.pid,
bold = True) for i, p in enumerate(self.procs)],
signals = [signal.SIGTERM],
signals = exit_handler_signals,
timeout = 2)
log.debug("setup Signal_handler_for_Jobmanager_client handler")
log.debug("setup Signal_handler_for_Jobmanager_client handler for signals %s", jm_client_special_interrupt_signals)
Signal_handler_for_Jobmanager_client(client_object = self,
exit_handler=exit_handler,
signals=[signal.SIGINT])
exit_handler = exit_handler,
signals = jm_client_special_interrupt_signals)
for p in self.procs:
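
The branch above decides which signals terminate the worker processes outright and which are handed to the client's special interrupt handler. A standard-library sketch of the same routing idea (the handler bodies are placeholders, not the jobmanager implementations):

import signal

def route_signals(use_special_SIG_INT_handler, terminate_workers, special_interrupt):
    # mirror the branch above: SIGTERM always terminates the workers,
    # SIGINT either goes to the special client handler or to the same exit path
    if use_special_SIG_INT_handler:
        exit_signals, special_signals = [signal.SIGTERM], [signal.SIGINT]
    else:
        exit_signals, special_signals = [signal.SIGTERM, signal.SIGINT], []
    for s in exit_signals:
        signal.signal(s, lambda signum, frame: terminate_workers())
    for s in special_signals:
        signal.signal(s, lambda signum, frame: special_interrupt())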
@ -799,7 +809,15 @@ class JobManager_Client(object):
def set_shared_status(ss, v):
if ss is not None:
ss.value = v
def get_shared_status(ss):
if ss is None:
return None
else:
return ss.value
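
Both helpers tolerate a missing status object; a short sketch assuming the shared status is a multiprocessing.Value (the diff only shows that it exposes a .value attribute):

import multiprocessing as mp

ss = mp.Value('i', 0)            # shared integer status
set_shared_status(ss, 3)         # writes ss.value = 3
print(get_shared_status(ss))     # 3
print(get_shared_status(None))   # None -- no status object, no error
set_shared_status(None, 3)       # silently ignored for the same reason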
class JobManager_Server(object):
"""general usage:
@ -854,7 +872,8 @@ class JobManager_Server(object):
fname_dump = 'auto',
speed_calc_cycles = 50,
keep_new_result_in_memory = False,
hide_progress = False):
hide_progress = False,
show_statistics = True):
"""
authkey [string] - authentication key used by the SyncManager.
Server and Client must have the same authkey.
@ -894,6 +913,7 @@ class JobManager_Server(object):
warnings.warn("verbose is deprecated", DeprecationWarning)
self.hide_progress = hide_progress
self.show_stat = show_statistics
log.debug("I'm the JobManager_Server main process (pid %s)", os.getpid())
@ -1079,30 +1099,31 @@ class JobManager_Server(object):
log.info("JobManager_Server was successfully shut down")
def show_statistics(self):
all_jobs = self.numjobs
succeeded = self.numresults
failed = self.fail_q.qsize()
all_processed = succeeded + failed
id1 = self.__class__.__name__+" "
l = len(id1)
id2 = ' '*l + "| "
print("{}total number of jobs : {}".format(id1, all_jobs))
print("{} processed : {}".format(id2, all_processed))
print("{} succeeded : {}".format(id2, succeeded))
print("{} failed : {}".format(id2, failed))
all_not_processed = all_jobs - all_processed
not_queried = self.job_q.qsize()
queried_but_not_processed = all_not_processed - not_queried
print("{} not processed : {}".format(id2, all_not_processed))
print("{} queried : {}".format(id2, queried_but_not_processed))
print("{} not queried yet : {}".format(id2, not_queried))
print("{}len(args_dict) : {}".format(id2, len(self.args_dict)))
if (all_not_processed + failed) != len(self.args_dict):
log.error("'all_not_processed != len(self.args_dict)' something is inconsistent!")
if self.show_stat:
all_jobs = self.numjobs
succeeded = self.numresults
failed = self.fail_q.qsize()
all_processed = succeeded + failed
id1 = self.__class__.__name__+" "
l = len(id1)
id2 = ' '*l + "| "
print("{}total number of jobs : {}".format(id1, all_jobs))
print("{} processed : {}".format(id2, all_processed))
print("{} succeeded : {}".format(id2, succeeded))
print("{} failed : {}".format(id2, failed))
all_not_processed = all_jobs - all_processed
not_queried = self.job_q.qsize()
queried_but_not_processed = all_not_processed - not_queried
print("{} not processed : {}".format(id2, all_not_processed))
print("{} queried : {}".format(id2, queried_but_not_processed))
print("{} not queried yet : {}".format(id2, not_queried))
print("{}len(args_dict) : {}".format(id2, len(self.args_dict)))
if (all_not_processed + failed) != len(self.args_dict):
log.error("'all_not_processed != len(self.args_dict)' something is inconsistent!")
def all_successfully_processed(self):
return self.numjobs == self.numresults
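
The counts printed by show_statistics() above follow simple bookkeeping; with made-up numbers, the quantities relate like this (the final comparison is the consistency check at the end of the method):

all_jobs      = 100
succeeded     = 80            # results received
failed        = 5             # entries in fail_q
not_queried   = 10            # still sitting in job_q

all_processed             = succeeded + failed                # 85
all_not_processed         = all_jobs - all_processed          # 15
queried_but_not_processed = all_not_processed - not_queried   # 5

# args_dict keeps every argument until its result arrives, including failed ones,
# so the method expects len(args_dict) == all_not_processed + failed == 20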
@ -1212,77 +1233,87 @@ class JobManager_Server(object):
# please overwrite for individual hooks to notify that the server process runs
print("jobmanager awaits client results")
def start(self):
"""
Starts to loop over incoming results.
When finished, or on exception, call stop() afterwards to shut down gracefully.
"""
def bring_him_up(self):
if not self.__start_SyncManager():
log.critical("could not start server")
raise RuntimeError("could not start server")
if self._pid != os.getpid():
log.critical("do not run JobManager_Server.start() in a subprocess")
raise RuntimeError("do not run JobManager_Server.start() in a subprocess")
if (self.numjobs - self.numresults) != len(self.args_dict):
log.debug("numjobs: %s\n"+
"numresults: %s\n"+
log.debug("numjobs: %s\n" +
"numresults: %s\n" +
"len(self.args_dict): %s", self.numjobs, self.numresults, len(self.args_dict))
log.critical("inconsistency detected! (self.numjobs - self.numresults) != len(self.args_dict)! use JobManager_Server.put_arg to put arguments to the job_q")
raise RuntimeError("inconsistency detected! (self.numjobs - self.numresults) != len(self.args_dict)! use JobManager_Server.put_arg to put arguments to the job_q")
log.critical(
"inconsistency detected! (self.numjobs - self.numresults) != len(self.args_dict)! use JobManager_Server.put_arg to put arguments to the job_q")
raise RuntimeError(
"inconsistency detected! (self.numjobs - self.numresults) != len(self.args_dict)! use JobManager_Server.put_arg to put arguments to the job_q")
if self.numjobs == 0:
log.warning("no jobs to process! use JobManager_Server.put_arg to put arguments to the job_q")
log.info("no jobs to process! use JobManager_Server.put_arg to put arguments to the job_q")
return
else:
log.info("started (host:%s authkey:%s port:%s jobs:%s)", self.hostname, self.authkey.decode(), self.port, self.numjobs)
log.info("started (host:%s authkey:%s port:%s jobs:%s)", self.hostname, self.authkey.decode(), self.port,
self.numjobs)
Signal_to_sys_exit(signals=[signal.SIGTERM, signal.SIGINT])
log.debug("start processing incoming results")
info_line = progress.StringValue(num_of_bytes=100)
log.debug("ready for processing incoming results")
self.print_jm_ready()
with progress.ProgressBarFancy(count = self._numresults,
max_count = self._numjobs,
interval = self.msg_interval,
speed_calc_cycles = self.speed_calc_cycles,
sigint = 'ign',
sigterm = 'ign',
info_line = info_line) as stat:
def join(self):
"""
Starts to loop over incoming results.
When finished, or on exception, call stop() afterwards to shut down gracefully.
"""
info_line = progress.StringValue(num_of_bytes=100)
with progress.ProgressBarFancy(count=self._numresults,
max_count=self._numjobs,
interval=self.msg_interval,
speed_calc_cycles=self.speed_calc_cycles,
sigint='ign',
sigterm='ign',
info_line=info_line) as stat:
if not self.hide_progress:
stat.start()
while (len(self.args_dict) - self.fail_q.qsize()) > 0:
info_line.value = "result_q size:{}, job_q size:{}, recieved results:{}".format(self.result_q.qsize(),
self.job_q.qsize(),
self.numresults).encode('utf-8')
info_line.value = "result_q size:{}, job_q size:{}, recieved results:{}".format(self.result_q.qsize(),
self.job_q.qsize(),
self.numresults).encode(
'utf-8')
# allows for update of the info line
try:
arg, result = self.result_q.get(timeout = self.msg_interval)
arg, result = self.result_q.get(timeout=self.msg_interval)
except queue.Empty:
continue
bf_arg = bf.dump(arg)
if bf_arg not in self.args_dict:
log.warning("got an argument that is not listed in the args_dict (probably crunshed twice, uups) -> will be skipped")
log.warning(
"got an argument that is not listed in the args_dict (probably crunshed twice, uups) -> will be skipped")
del arg
del result
continue
del self.args_dict[bf_arg]
self.numresults += 1
self.process_new_result(arg, result)
if not self.keep_new_result_in_memory:
del arg
del result
log.debug("wait %ss before trigger clean up", self.__wait_before_stop)
time.sleep(self.__wait_before_stop)
def start(self):
self.bring_him_up()
self.join()
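
start() is now just bring_him_up() followed by join(), so a subclass can slot its own setup between the two steps, which is exactly what JobManager_Local does below. Plain server usage is unchanged; a minimal sketch (authkey value assumed, stop()/shutdown and a process_new_result override omitted):

server = JobManager_Server(authkey = 'secret')
for i in range(10):
    server.put_arg(i)   # put_arg is the method named in the log messages above
server.start()          # equivalent to server.bring_him_up(); server.join()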
class JobManager_Local(JobManager_Server):
@ -1301,14 +1332,15 @@ class JobManager_Local(JobManager_Server):
msg_interval = 1,
fname_dump = 'auto',
speed_calc_cycles = 50):
super(JobManager_Local, self).__init__(authkey = authkey,
const_arg = const_arg,
port = port,
verbose = verbose,
msg_interval = msg_interval,
fname_dump = fname_dump,
speed_calc_cycles = speed_calc_cycles)
JobManager_Server.__init__(self,
authkey = authkey,
const_arg = const_arg,
port = port,
verbose = verbose,
msg_interval = msg_interval,
fname_dump = fname_dump,
speed_calc_cycles = speed_calc_cycles)
self.client_class = client_class
self.port = port
@ -1324,15 +1356,11 @@ class JobManager_Local(JobManager_Server):
port,
client_class,
nproc = 0,
nice = 19,
delay = 1,
nice = 19,
verbose = None,
show_statusbar_for_jobs = False,
show_counter_only = False): # ignore the signals, because any signal bringing the server down
# will cause an error in the client-server communication,
# therefore the clients will also quit
Signal_to_SIG_IGN(signals=[signal.SIGINT, signal.SIGTERM])
time.sleep(delay)
show_counter_only = False):
client = client_class(server='localhost',
authkey = authkey,
port = port,
@ -1340,8 +1368,10 @@ class JobManager_Local(JobManager_Server):
nice = nice,
verbose = verbose,
show_statusbar_for_jobs = show_statusbar_for_jobs,
show_counter_only = show_counter_only)
show_counter_only = show_counter_only,
use_special_SIG_INT_handler = False)
Signal_to_sys_exit(signals=[signal.SIGINT, signal.SIGTERM])
client.start()
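
The locally spawned client no longer ignores SIGINT/SIGTERM behind a delay; it disables its special SIGINT handler and, judging by the name Signal_to_sys_exit, converts both signals into a normal interpreter exit. A standard-library sketch of that idea (not the jobmanager class itself; the exit code is an arbitrary choice):

import signal
import sys

def _to_sys_exit(signum, frame):
    # let SIGINT/SIGTERM unwind the client process cleanly instead of being ignored
    sys.exit(128 + signum)

for s in (signal.SIGINT, signal.SIGTERM):
    signal.signal(s, _to_sys_exit)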
@ -1352,12 +1382,14 @@ class JobManager_Local(JobManager_Server):
self.client_class,
self.nproc,
self.niceness_clients,
self.delay,
self.verbose_client,
self.show_statusbar_for_jobs,
self.show_counter_only))
JobManager_Local.bring_him_up(self)
p_client.start()
super(JobManager_Local, self).start()
JobManager_Local.join(self)
progress.check_process_termination(p_client,
prefix = 'local_client',
timeout = 2)

View file

@ -846,18 +846,18 @@ if __name__ == "__main__":
# test_start_server_with_no_args,
# test_start_server,
# test_client,
test_jobmanager_basic,
test_jobmanager_server_signals,
test_shutdown_server_while_client_running,
test_shutdown_client,
test_check_fail,
test_jobmanager_read_old_stat,
test_client_status,
# test_jobmanager_basic,
# test_jobmanager_server_signals,
# test_shutdown_server_while_client_running,
# test_shutdown_client,
# test_check_fail,
# test_jobmanager_read_old_stat,
# test_client_status,
test_jobmanager_local,
test_start_server_on_used_port,
test_shared_const_arg,
test_digest_rejected,
test_hum_size,
# test_start_server_on_used_port,
# test_shared_const_arg,
# test_digest_rejected,
# test_hum_size,
lambda : print("END")
]