From 1bf01ef09882e52fd15a08ccf4c9370c973c975e Mon Sep 17 00:00:00 2001 From: Richard Hartmann Date: Mon, 3 Jun 2019 21:48:37 +0200 Subject: [PATCH] added speed to status infoline --- jobmanager/jobmanager.py | 29 ++++++++++++++++++++++------- 1 file changed, 22 insertions(+), 7 deletions(-) diff --git a/jobmanager/jobmanager.py b/jobmanager/jobmanager.py index 6d9e698..b928a27 100644 --- a/jobmanager/jobmanager.py +++ b/jobmanager/jobmanager.py @@ -85,8 +85,6 @@ myQueue = queue.Queue AuthenticationError = mp.AuthenticationError def humanize_size(size_in_bytes): - """convert a speed in counts per second to counts per [s, min, h, d], choosing the smallest value greater zero. - """ thr = 99 scales = [1024, 1024, 1024] units = ['k', 'M', 'G', 'T'] @@ -898,6 +896,7 @@ class ClosableQueue_Data(object): self.log_prefix = 'ClosableQueue:{} - '.format(name) else: self.log_prefix = '' + self.bytes_recieved = 0 def new_conn(self): conn1, conn2 = mp.Pipe() @@ -923,7 +922,9 @@ class ClosableQueue_Data(object): try: cmd = conn.recv() log.debug("{}listener got cmd '{}'".format(self.log_prefix, cmd)) - args = conn.recv() + args_bytes = conn.recv_bytes() + self.bytes_recieved += len(args_bytes) + args = pickle.loads(args_bytes) if cmd == '#put': self._exec_cmd(self.put, conn, args) elif cmd == '#get': @@ -956,6 +957,7 @@ class ClosableQueue_Data(object): with self.lock: self.is_closed = True log.debug("{}queue closed".format(self.log_prefix)) + class ClosableQueue(object): def __init__(self, name=None): @@ -1000,6 +1002,12 @@ class ClosableQueue(object): def close(self): return self._communicate('#close', tuple()) + def get_bytes_recieved(self): + if self.data: + return self.data.bytes_recieved + else: + raise RuntimeError("client side has no counter for bytes_revieved") + class ArgsContainerQueue(object): def __init__(self, put_conn, get_conn): self.put_conn = put_conn @@ -1755,12 +1763,17 @@ class JobManager_Server(object): if not self.hide_progress: self.stat.start() + old_bytes = self.result_q.get_bytes_recieved() + old_time = time.time() while numresults.value < numjobs.value: numjobs.value = self.job_q.put_items() failqsize = self.fail_q.qsize() jobqsize = self.job_q.qsize() markeditems = self.job_q.marked_items() numresults.value = failqsize + markeditems + bytes_recieved = self.result_q.get_bytes_recieved() + curr_time = time.time() + data_speed = humanize_size((bytes_recieved - old_bytes) / (curr_time - old_time)) if (self.timeout is not None): time_left = int(self.timeout - self.__wait_before_stop - (datetime.now() - self.start_time).total_seconds()) if time_left < 0: @@ -1768,22 +1781,24 @@ class JobManager_Server(object): self.stat.stop() log.warning("timeout ({}s) exceeded -> quit server".format(self.timeout)) break - info_line.value = ("res_q size:{}, jobs: rem.:{}, "+ + info_line.value = ("res_q size:{} {}/s, jobs: rem.:{}, "+ "done:{}, failed:{}, prog.:{}, "+ - "timeout in:{}s").format(self.result_q.qsize(), + "timeout in:{}s").format(self.result_q.qsize(), data_speed, jobqsize, markeditems, failqsize, numjobs.value - numresults.value - jobqsize, time_left).encode('utf-8') else: - info_line.value = ("result_q size:{}, jobs: remaining:{}, "+ - "done:{}, failed:{}, in progress:{}").format(self.result_q.qsize(), + info_line.value = ("result_q size:{} {}/s, jobs: remaining:{}, "+ + "done:{}, failed:{}, in progress:{}").format(self.result_q.qsize(), data_speed, jobqsize, markeditems, failqsize, numjobs.value - numresults.value - jobqsize).encode('utf-8') log.info("infoline {}".format(info_line.value)) + old_bytes = bytes_recieved + old_time = time # allows for update of the info line try: arg, result = self.result_q.get(timeout=self.msg_interval)