emergency dump for local result_q

This commit is contained in:
Richard Hartmann 2016-10-01 00:12:03 +02:00
parent 4bccdb3fa4
commit 773ddc788c
2 changed files with 21 additions and 13 deletions

View file

@ -501,20 +501,20 @@ class JobManager_Client(object):
with open(fname, 'w') as f:
traceback.print_exception(etype=err, value=val, tb=trb, file=f)
log.debug("try to send send failed arg to fail_q")
log.debug("put arg to local fail_q")
try:
local_fail_q.put((arg, err.__name__, hostname))
# handle SystemExit in outer try ... except
except SystemExit as e:
log.warning('sending arg to fail_q failed due to SystemExit')
log.warning('putting arg to local fail_q failed due to SystemExit')
raise e
# fail_q.put failed -> server down?
except Exception as e:
log.error('sending arg to fail_q failed')
log.error('putting arg to local fail_q failed')
handle_unexpected_queue_error(e)
break
else:
log.debug('sending arg to fail_q was successful')
log.debug('putting arg to local fail_q was successful')
# processing the retrieved arguments succeeded
# - try to send the result back to the server
@ -527,12 +527,12 @@ class JobManager_Client(object):
# handle SystemExit in outer try ... except
except SystemExit as e:
log.warning('sending result to result_q failed due to SystemExit')
log.warning('putting result to local result_q failed due to SystemExit')
raise e
except Exception as e:
log.error('sending result to result_q failed due to %s', type(e))
emergency_dump(arg, res, emergency_dump_path)
log.error('putting result to local result_q failed due to %s', type(e))
emergency_dump(arg, res, emergency_dump_path, host, port, authkey)
handle_unexpected_queue_error(e)
break
@ -547,19 +547,19 @@ class JobManager_Client(object):
# default signal handlers
except SystemExit:
log.warning("SystemExit, quit processing, reinsert current argument, please wait")
log.debug("try to put arg back to job_q")
log.debug("put arg back to local job_q")
try:
local_job_q.put(arg)
# handle SystemExit in outer try ... except
except SystemExit as e:
log.error("put arg back to job_q failed due to SystemExit")
log.error("puting arg back to local job_q failed due to SystemExit")
raise e
# fail_q.put failed -> server down?
except Exception as e:
log.error("put arg back to job_q failed due to %s", type(e))
log.error("puting arg back to local job_q failed due to %s", type(e))
handle_unexpected_queue_error(e)
else:
log.debug("putting arg back to job_q was successful")
log.debug("putting arg back to local job_q was successful")
try:
sta = progress.humanize_time(time_calc / cnt)
@ -760,7 +760,10 @@ class JobManager_Client(object):
log.debug("allow the thread thr_result_q_put to process items")
time.sleep(1)
else:
log.warning("the thread thr_result_q_put has died, can not process remaining items")
log.warning("the thread thr_result_q_put has died, dump remaining results")
while (not local_result_q.empty()):
arg, res = local_result_q.get()
emergency_dump(arg, res, self.emergency_dump_path, self.server, self.port, self.authkey)
break
while (not local_fail_q.empty()):

View file

@ -12,8 +12,10 @@ import logging
import datetime
import threading
from numpy import random
import pytest
from os.path import abspath, dirname, split
# Add parent directory to beginning of path variable
sys.path = [split(dirname(abspath(__file__)))[0]] + sys.path
@ -225,7 +227,10 @@ def test_jobmanager_static_client_call():
result_q_put_timeout = 1,
fail_q_put_timeout = 1)
jm_client.func(arg=1, const_arg=1)
@pytest.mark.skipif(sys.version_info.major == 2,
reason="causes unknown trouble")
def test_client():
global PORT
PORT += 1