diff --git a/jobmanager/jobmanager.py b/jobmanager/jobmanager.py index ce564b7..1f7df3d 100644 --- a/jobmanager/jobmanager.py +++ b/jobmanager/jobmanager.py @@ -394,7 +394,8 @@ class JobManager_Client(object): host, port, authkey, - nthreads): + nthreads, + nproc): """ the wrapper spawned nproc times calling and handling self.func """ @@ -460,7 +461,7 @@ class JobManager_Client(object): tg_0 = time.time() try: log.debug("wait until local result q is almost empty") - while local_result_q.qsize() > 1: + while local_result_q.qsize() > nproc: time.sleep(0.1) log.debug("done waiting, call job_q_get") @@ -764,7 +765,8 @@ class JobManager_Client(object): self.server, # host self.port, # port self.authkey, # authkey - self.nthreads)) + self.nthreads, + self.nproc)) self.procs.append(p) @@ -1787,7 +1789,7 @@ class JobManager_Server(object): self.stat.stop() log.warning("timeout ({}s) exceeded -> quit server".format(self.timeout)) break - info_line.value = ("res_q size:{} {}/s {}, jobs: rem.:{}, "+ + info_line.value = ("res_q #{} {}/s {}|rem.:{}, "+ "done:{}, failed:{}, prog.:{}, "+ "timeout in:{}s").format(self.result_q.qsize(), data_speed, humanize_size(bytes_recieved), jobqsize, @@ -1796,12 +1798,12 @@ class JobManager_Server(object): numjobs.value - numresults.value - jobqsize, time_left).encode('utf-8') else: - info_line.value = ("result_q size:{} {}/s {}, jobs: remaining:{}, "+ - "done:{}, failed:{}, in progress:{}").format(self.result_q.qsize(), data_speed, humanize_size(bytes_recieved), - jobqsize, - markeditems, - failqsize, - numjobs.value - numresults.value - jobqsize).encode('utf-8') + info_line.value = ("res_q #{} {}/s {}|rem.:{}, "+ + "done:{}, failed:{}, prog.:{}").format(self.result_q.qsize(), data_speed, humanize_size(bytes_recieved), + jobqsize, + markeditems, + failqsize, + numjobs.value - numresults.value - jobqsize).encode('utf-8') log.info("infoline {}".format(info_line.value)) # allows for update of the info line try: