diff --git a/jobmanager/jobmanager.py b/jobmanager/jobmanager.py index 62d2f59..ff1029d 100644 --- a/jobmanager/jobmanager.py +++ b/jobmanager/jobmanager.py @@ -1291,7 +1291,8 @@ class JobManager_Server(object): hide_progress = False, show_statistics = True, job_q_on_disk = False, - job_q_on_disk_path = '.'): + job_q_on_disk_path = '.', + timeout = None): """ authkey [string] - authentication key used by the SyncManager. Server and Client must have the same authkey. @@ -1374,6 +1375,9 @@ class JobManager_Server(object): self.stat = None + self.timeout = timeout + self.start_time = datetime.now() + @staticmethod def _check_bind(host, port): @@ -1680,13 +1684,29 @@ class JobManager_Server(object): failqsize = self.fail_q.qsize() jobqsize = self.job_q.qsize() markeditems = self.job_q.marked_items() - numresults.value = failqsize + markeditems - info_line.value = ("result_q size:{}, jobs: remaining:{}, "+ - "done:{}, failed:{}, in progress:{}").format(self.result_q.qsize(), - jobqsize, - markeditems, - failqsize, - self.numjobs.value - numresults.value - jobqsize).encode('utf-8') + numresults.value = failqsize + markeditems + if self.timeout: + time_left = int(self.timeout - self.__wait_before_stop - (datetime.now() - self.start_time).total_seconds()) + if time_left < 0: + if self.stat: + self.stat.stop() + log.warning("timeout ({}s) exceeded -> quit server".format(self.timeout)) + break + info_line.value = ("result_q size:{}, jobs: remaining:{}, "+ + "done:{}, failed:{}, in progress:{}, "+ + "timeout in:{}s").format(self.result_q.qsize(), + jobqsize, + markeditems, + failqsize, + self.numjobs.value - numresults.value - jobqsize, + time_left).encode('utf-8') + else: + info_line.value = ("result_q size:{}, jobs: remaining:{}, "+ + "done:{}, failed:{}, in progress:{}").format(self.result_q.qsize(), + jobqsize, + markeditems, + failqsize, + self.numjobs.value - numresults.value - jobqsize).encode('utf-8') log.info("infoline {}".format(info_line.value)) # allows for update of the info line try: