mirror of
https://github.com/vale981/jobmanager
synced 2025-03-04 17:31:39 -05:00
set arg to None when sysexit while in job_q_get -> no push back on fail_q
This commit is contained in:
parent
d70649c27f
commit
8df19ca344
1 changed files with 25 additions and 20 deletions
|
@ -456,26 +456,27 @@ class JobManager_Client(object):
|
|||
while njobs != 0:
|
||||
njobs -= 1
|
||||
|
||||
# try to get an item from the job_q
|
||||
try:
|
||||
tg_0 = time.time()
|
||||
# try to get an item from the job_q
|
||||
tg_0 = time.time()
|
||||
try:
|
||||
arg = job_q_get(block = True, timeout = job_q_get_timeout)
|
||||
tg_1 = time.time()
|
||||
time_queue += (tg_1-tg_0)
|
||||
|
||||
# regular case, just stop working when empty job_q was found
|
||||
except queue.Empty:
|
||||
log.info("finds empty job queue, processed %s jobs", cnt)
|
||||
break
|
||||
# handle SystemExit in outer try ... except
|
||||
except SystemExit as e:
|
||||
arg = None
|
||||
log.warning('getting arg from job_q failed due to SystemExit')
|
||||
raise e
|
||||
# job_q.get failed -> server down?
|
||||
except Exception as e:
|
||||
arg = None
|
||||
log.error("Error when calling 'job_q_get'")
|
||||
handle_unexpected_queue_error(e)
|
||||
break
|
||||
tg_1 = time.time()
|
||||
time_queue += (tg_1-tg_0)
|
||||
|
||||
# try to process the retrieved argument
|
||||
try:
|
||||
|
@ -548,20 +549,23 @@ class JobManager_Client(object):
|
|||
# note SIGINT, SIGTERM -> SystemExit is achieved by overwriting the
|
||||
# default signal handlers
|
||||
except SystemExit:
|
||||
log.warning("SystemExit, quit processing, reinsert current argument, please wait")
|
||||
log.debug("put arg back to local job_q")
|
||||
try:
|
||||
local_job_q.put(arg)
|
||||
# handle SystemExit in outer try ... except
|
||||
except SystemExit as e:
|
||||
log.error("puting arg back to local job_q failed due to SystemExit")
|
||||
raise e
|
||||
# fail_q.put failed -> server down?
|
||||
except Exception as e:
|
||||
log.error("puting arg back to local job_q failed due to %s", type(e))
|
||||
handle_unexpected_queue_error(e)
|
||||
if arg is None:
|
||||
log.warning("SystemExit, quit processing, no argument to reinsert")
|
||||
else:
|
||||
log.debug("putting arg back to local job_q was successful")
|
||||
log.warning("SystemExit, quit processing, reinsert current argument, please wait")
|
||||
log.debug("put arg back to local job_q")
|
||||
try:
|
||||
local_job_q.put(arg)
|
||||
# handle SystemExit in outer try ... except
|
||||
except SystemExit as e:
|
||||
log.error("puting arg back to local job_q failed due to SystemExit")
|
||||
raise e
|
||||
# fail_q.put failed -> server down?
|
||||
except Exception as e:
|
||||
log.error("puting arg back to local job_q failed due to %s", type(e))
|
||||
handle_unexpected_queue_error(e)
|
||||
else:
|
||||
log.debug("putting arg back to local job_q was successful")
|
||||
|
||||
try:
|
||||
sta = progress.humanize_time(time_calc / cnt)
|
||||
|
@ -589,7 +593,6 @@ class JobManager_Client(object):
|
|||
|
||||
if not self.connected:
|
||||
raise JMConnectionError("Can not start Client with no connection to server (shared objetcs are not available)")
|
||||
|
||||
|
||||
log.info("STARTING CLIENT\nserver:%s authkey:%s port:%s num proc:%s", self.server, self.authkey.decode(), self.port, self.nproc)
|
||||
|
||||
|
@ -607,6 +610,8 @@ class JobManager_Client(object):
|
|||
|
||||
if not self.show_counter_only:
|
||||
m_set_by_function = m_progress
|
||||
else:
|
||||
m_progress = None
|
||||
|
||||
prepend = []
|
||||
infoline = progress.StringValue(num_of_bytes=12)
|
||||
|
|
Loading…
Add table
Reference in a new issue