mirror of
https://github.com/vale981/jobmanager
synced 2025-03-05 18:01:38 -05:00
work in progress on jobmanager module, to do: handle args_set and fail_q correctly
This commit is contained in:
parent
5c041bb2c4
commit
3fc03d4437
1 changed files with 19 additions and 11 deletions
|
@ -663,15 +663,13 @@ class JobManager_Server(object):
|
|||
if self._pid != pid:
|
||||
print(" NOTE: this routine was triggered as a subprocess which is NOT preferable!")
|
||||
|
||||
try:
|
||||
# self.p_reinsert_failure.start()
|
||||
|
||||
try:
|
||||
if self.verbose > 0:
|
||||
self.stat.start()
|
||||
if self.verbose > 1:
|
||||
print("PID {}: StatusBar started".format(self.stat.getpid()))
|
||||
|
||||
while len(self.args_set) > 0:
|
||||
while (len(self.args_set) - self.fail_q.qsize()) > 0:
|
||||
arg, result = self.result_q.get() #blocks until an item is available
|
||||
self.args_set.remove(arg)
|
||||
self.process_new_result(arg, result)
|
||||
|
@ -704,8 +702,6 @@ class JobManager_Server(object):
|
|||
pid = os.getpid()
|
||||
|
||||
self.stat.stop()
|
||||
# self.p_reinsert_failure.stop()
|
||||
# JobManager_Server.reinsert_failures(self.fail_q, self.job_q)
|
||||
|
||||
manager_pid = self.manager._process.pid
|
||||
|
||||
|
@ -904,22 +900,34 @@ class JobManager_Client(object):
|
|||
if verbose > 0:
|
||||
print("\nPID {}: EOFError, I guess the server went down, can't do anything, terminate now!".format(os.getpid()))
|
||||
return
|
||||
except:
|
||||
except SystemExit:
|
||||
if verbose > 0:
|
||||
print("\nPID {}: SystemExit, quit processing, reinsert current argument".format(os.getpid()))
|
||||
try:
|
||||
job_q.put(new_args, timeout=10)
|
||||
except queue.Full:
|
||||
if verbose > 0:
|
||||
print("\nPID {}: failed to reinsert argument, Server down? I quit!".format(os.getpid()))
|
||||
return
|
||||
else:
|
||||
err, val, trb = sys.exc_info()
|
||||
if verbose > 0:
|
||||
print("\nPID {}: caught exception {} -> report failure with arg {}".format(os.getpid(), err, new_args))
|
||||
print("\nPID {}: caught exception {}, report current argument has faild,".format(os.getpid(), err))
|
||||
print(" write traceback to file, continue processing next argument")
|
||||
|
||||
hostname = socket.gethostname()
|
||||
fail_q.put((new_args, err, hostname))
|
||||
|
||||
fname = 'traceback_args_{}_err_{}_{}.trb'.format(new_args, err, getDateForFileName(includePID=True))
|
||||
with open(fname, 'w') as f:
|
||||
traceback.print_tb(tb=trb, file=f)
|
||||
|
||||
return
|
||||
traceback.print_tb(tb=trb, file=f)
|
||||
|
||||
if verbose > 1:
|
||||
print("PID {}: JobManager_Client.__worker_func terminates".format(os.getpid()))
|
||||
|
||||
|
||||
|
||||
|
||||
def start(self):
|
||||
"""
|
||||
|
||||
|
|
Loading…
Add table
Reference in a new issue