mirror of
https://github.com/vale981/jobmanager
synced 2025-03-05 09:51:38 -05:00
added shutdown by timeout for server
This commit is contained in:
parent
d16d7f2532
commit
06b9b531dc
1 changed files with 28 additions and 8 deletions
|
@ -1291,7 +1291,8 @@ class JobManager_Server(object):
|
||||||
hide_progress = False,
|
hide_progress = False,
|
||||||
show_statistics = True,
|
show_statistics = True,
|
||||||
job_q_on_disk = False,
|
job_q_on_disk = False,
|
||||||
job_q_on_disk_path = '.'):
|
job_q_on_disk_path = '.',
|
||||||
|
timeout = None):
|
||||||
"""
|
"""
|
||||||
authkey [string] - authentication key used by the SyncManager.
|
authkey [string] - authentication key used by the SyncManager.
|
||||||
Server and Client must have the same authkey.
|
Server and Client must have the same authkey.
|
||||||
|
@ -1374,6 +1375,9 @@ class JobManager_Server(object):
|
||||||
|
|
||||||
self.stat = None
|
self.stat = None
|
||||||
|
|
||||||
|
self.timeout = timeout
|
||||||
|
self.start_time = datetime.now()
|
||||||
|
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _check_bind(host, port):
|
def _check_bind(host, port):
|
||||||
|
@ -1680,13 +1684,29 @@ class JobManager_Server(object):
|
||||||
failqsize = self.fail_q.qsize()
|
failqsize = self.fail_q.qsize()
|
||||||
jobqsize = self.job_q.qsize()
|
jobqsize = self.job_q.qsize()
|
||||||
markeditems = self.job_q.marked_items()
|
markeditems = self.job_q.marked_items()
|
||||||
numresults.value = failqsize + markeditems
|
numresults.value = failqsize + markeditems
|
||||||
info_line.value = ("result_q size:{}, jobs: remaining:{}, "+
|
if self.timeout:
|
||||||
"done:{}, failed:{}, in progress:{}").format(self.result_q.qsize(),
|
time_left = int(self.timeout - self.__wait_before_stop - (datetime.now() - self.start_time).total_seconds())
|
||||||
jobqsize,
|
if time_left < 0:
|
||||||
markeditems,
|
if self.stat:
|
||||||
failqsize,
|
self.stat.stop()
|
||||||
self.numjobs.value - numresults.value - jobqsize).encode('utf-8')
|
log.warning("timeout ({}s) exceeded -> quit server".format(self.timeout))
|
||||||
|
break
|
||||||
|
info_line.value = ("result_q size:{}, jobs: remaining:{}, "+
|
||||||
|
"done:{}, failed:{}, in progress:{}, "+
|
||||||
|
"timeout in:{}s").format(self.result_q.qsize(),
|
||||||
|
jobqsize,
|
||||||
|
markeditems,
|
||||||
|
failqsize,
|
||||||
|
self.numjobs.value - numresults.value - jobqsize,
|
||||||
|
time_left).encode('utf-8')
|
||||||
|
else:
|
||||||
|
info_line.value = ("result_q size:{}, jobs: remaining:{}, "+
|
||||||
|
"done:{}, failed:{}, in progress:{}").format(self.result_q.qsize(),
|
||||||
|
jobqsize,
|
||||||
|
markeditems,
|
||||||
|
failqsize,
|
||||||
|
self.numjobs.value - numresults.value - jobqsize).encode('utf-8')
|
||||||
log.info("infoline {}".format(info_line.value))
|
log.info("infoline {}".format(info_line.value))
|
||||||
# allows for update of the info line
|
# allows for update of the info line
|
||||||
try:
|
try:
|
||||||
|
|
Loading…
Add table
Reference in a new issue