diff --git a/jobmanager/jobmanager.py b/jobmanager/jobmanager.py index d18f820..b60745e 100644 --- a/jobmanager/jobmanager.py +++ b/jobmanager/jobmanager.py @@ -456,26 +456,27 @@ class JobManager_Client(object): while njobs != 0: njobs -= 1 - # try to get an item from the job_q - try: - tg_0 = time.time() + # try to get an item from the job_q + tg_0 = time.time() + try: arg = job_q_get(block = True, timeout = job_q_get_timeout) - tg_1 = time.time() - time_queue += (tg_1-tg_0) - # regular case, just stop working when empty job_q was found except queue.Empty: log.info("finds empty job queue, processed %s jobs", cnt) break # handle SystemExit in outer try ... except except SystemExit as e: + arg = None log.warning('getting arg from job_q failed due to SystemExit') raise e # job_q.get failed -> server down? except Exception as e: + arg = None log.error("Error when calling 'job_q_get'") handle_unexpected_queue_error(e) break + tg_1 = time.time() + time_queue += (tg_1-tg_0) # try to process the retrieved argument try: @@ -548,20 +549,23 @@ class JobManager_Client(object): # note SIGINT, SIGTERM -> SystemExit is achieved by overwriting the # default signal handlers except SystemExit: - log.warning("SystemExit, quit processing, reinsert current argument, please wait") - log.debug("put arg back to local job_q") - try: - local_job_q.put(arg) - # handle SystemExit in outer try ... except - except SystemExit as e: - log.error("puting arg back to local job_q failed due to SystemExit") - raise e - # fail_q.put failed -> server down? - except Exception as e: - log.error("puting arg back to local job_q failed due to %s", type(e)) - handle_unexpected_queue_error(e) + if arg is None: + log.warning("SystemExit, quit processing, no argument to reinsert") else: - log.debug("putting arg back to local job_q was successful") + log.warning("SystemExit, quit processing, reinsert current argument, please wait") + log.debug("put arg back to local job_q") + try: + local_job_q.put(arg) + # handle SystemExit in outer try ... except + except SystemExit as e: + log.error("puting arg back to local job_q failed due to SystemExit") + raise e + # fail_q.put failed -> server down? + except Exception as e: + log.error("puting arg back to local job_q failed due to %s", type(e)) + handle_unexpected_queue_error(e) + else: + log.debug("putting arg back to local job_q was successful") try: sta = progress.humanize_time(time_calc / cnt) @@ -589,7 +593,6 @@ class JobManager_Client(object): if not self.connected: raise JMConnectionError("Can not start Client with no connection to server (shared objetcs are not available)") - log.info("STARTING CLIENT\nserver:%s authkey:%s port:%s num proc:%s", self.server, self.authkey.decode(), self.port, self.nproc) @@ -607,6 +610,8 @@ class JobManager_Client(object): if not self.show_counter_only: m_set_by_function = m_progress + else: + m_progress = None prepend = [] infoline = progress.StringValue(num_of_bytes=12)