added stopEvent mechanism for JMServer join

This commit is contained in:
Richard Hartmann 2021-03-02 11:25:01 +01:00
parent 2bdaba2367
commit dcbb0ead64

View file

@ -214,7 +214,7 @@ class JobManager_Client(object):
ask_on_sigterm = True,
nthreads = 1,
status_output_for_srun = False,
emtpy_lines_at_end = 0):
emtpy_lines_at_end = 0):
"""
server [string] - ip address or hostname where the JobManager_Server is running
@ -336,7 +336,6 @@ class JobManager_Client(object):
self.ask_on_sigterm = ask_on_sigterm
self.status_output_for_srun = status_output_for_srun
self.emtpy_lines_at_end = emtpy_lines_at_end
print("ADD LINES ABOVE", self.emtpy_lines_at_end)
def connect(self):
if self.manager_objects is None:
@ -782,7 +781,7 @@ class JobManager_Client(object):
thr_update_infoline = threading.Thread(target=update_infoline, args=(infoline, local_result_q, bytes_send, time_result_q_put))
thr_update_infoline.daemon = True
thr_job_q_put.start()
thr_job_q_put.start()
thr_result_q_put.start()
thr_fail_q_put.start()
thr_update_infoline.start()
@ -1812,6 +1811,7 @@ class JobManager_Server(object):
else:
Signal_to_sys_exit(signals=[signal.SIGTERM, signal.SIGINT])
log.debug("ready for processing incoming results")
self.jm_ready_callback()
@ -1819,7 +1819,7 @@ class JobManager_Server(object):
with open(self.status_file, 'w') as f:
f.write('ready')
def join(self):
def join(self, stopEvent = None):
"""
starts to loop over incoming results
@ -1854,6 +1854,12 @@ class JobManager_Server(object):
data_speed = 0
while numresults.value < numjobs.value:
if stopEvent is not None:
if stopEvent.is_set():
log.info('received externally set stop event -> leave join loop')
break
numjobs.value = self.job_q.put_items()
failqsize = self.fail_q.qsize()
jobqsize = self.number_of_jobs()