From dcbb0ead64342283b6ef8862be155e9e858dad63 Mon Sep 17 00:00:00 2001 From: Richard Hartmann Date: Tue, 2 Mar 2021 11:25:01 +0100 Subject: [PATCH] added stopEvent mechanism for JMServer join --- jobmanager/jobmanager.py | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/jobmanager/jobmanager.py b/jobmanager/jobmanager.py index 5202be9..f0f85f2 100644 --- a/jobmanager/jobmanager.py +++ b/jobmanager/jobmanager.py @@ -214,7 +214,7 @@ class JobManager_Client(object): ask_on_sigterm = True, nthreads = 1, status_output_for_srun = False, - emtpy_lines_at_end = 0): + emtpy_lines_at_end = 0): """ server [string] - ip address or hostname where the JobManager_Server is running @@ -336,7 +336,6 @@ class JobManager_Client(object): self.ask_on_sigterm = ask_on_sigterm self.status_output_for_srun = status_output_for_srun self.emtpy_lines_at_end = emtpy_lines_at_end - print("ADD LINES ABOVE", self.emtpy_lines_at_end) def connect(self): if self.manager_objects is None: @@ -782,7 +781,7 @@ class JobManager_Client(object): thr_update_infoline = threading.Thread(target=update_infoline, args=(infoline, local_result_q, bytes_send, time_result_q_put)) thr_update_infoline.daemon = True - thr_job_q_put.start() + thr_job_q_put.start() thr_result_q_put.start() thr_fail_q_put.start() thr_update_infoline.start() @@ -1812,6 +1811,7 @@ class JobManager_Server(object): else: Signal_to_sys_exit(signals=[signal.SIGTERM, signal.SIGINT]) + log.debug("ready for processing incoming results") self.jm_ready_callback() @@ -1819,7 +1819,7 @@ class JobManager_Server(object): with open(self.status_file, 'w') as f: f.write('ready') - def join(self): + def join(self, stopEvent = None): """ starts to loop over incoming results @@ -1854,6 +1854,12 @@ class JobManager_Server(object): data_speed = 0 while numresults.value < numjobs.value: + + if stopEvent is not None: + if stopEvent.is_set(): + log.info('received externally set stop event -> leave join loop') + break + numjobs.value = self.job_q.put_items() failqsize = self.fail_q.qsize() jobqsize = self.number_of_jobs()