added speed to status infoline

This commit is contained in:
Richard Hartmann 2019-06-03 21:48:37 +02:00
parent 6922a596ff
commit 1bf01ef098

View file

@ -85,8 +85,6 @@ myQueue = queue.Queue
AuthenticationError = mp.AuthenticationError
def humanize_size(size_in_bytes):
"""convert a speed in counts per second to counts per [s, min, h, d], choosing the smallest value greater zero.
"""
thr = 99
scales = [1024, 1024, 1024]
units = ['k', 'M', 'G', 'T']
@ -898,6 +896,7 @@ class ClosableQueue_Data(object):
self.log_prefix = 'ClosableQueue:{} - '.format(name)
else:
self.log_prefix = ''
self.bytes_recieved = 0
def new_conn(self):
conn1, conn2 = mp.Pipe()
@ -923,7 +922,9 @@ class ClosableQueue_Data(object):
try:
cmd = conn.recv()
log.debug("{}listener got cmd '{}'".format(self.log_prefix, cmd))
args = conn.recv()
args_bytes = conn.recv_bytes()
self.bytes_recieved += len(args_bytes)
args = pickle.loads(args_bytes)
if cmd == '#put':
self._exec_cmd(self.put, conn, args)
elif cmd == '#get':
@ -956,6 +957,7 @@ class ClosableQueue_Data(object):
with self.lock:
self.is_closed = True
log.debug("{}queue closed".format(self.log_prefix))
class ClosableQueue(object):
def __init__(self, name=None):
@ -1000,6 +1002,12 @@ class ClosableQueue(object):
def close(self):
return self._communicate('#close', tuple())
def get_bytes_recieved(self):
if self.data:
return self.data.bytes_recieved
else:
raise RuntimeError("client side has no counter for bytes_revieved")
class ArgsContainerQueue(object):
def __init__(self, put_conn, get_conn):
self.put_conn = put_conn
@ -1755,12 +1763,17 @@ class JobManager_Server(object):
if not self.hide_progress:
self.stat.start()
old_bytes = self.result_q.get_bytes_recieved()
old_time = time.time()
while numresults.value < numjobs.value:
numjobs.value = self.job_q.put_items()
failqsize = self.fail_q.qsize()
jobqsize = self.job_q.qsize()
markeditems = self.job_q.marked_items()
numresults.value = failqsize + markeditems
bytes_recieved = self.result_q.get_bytes_recieved()
curr_time = time.time()
data_speed = humanize_size((bytes_recieved - old_bytes) / (curr_time - old_time))
if (self.timeout is not None):
time_left = int(self.timeout - self.__wait_before_stop - (datetime.now() - self.start_time).total_seconds())
if time_left < 0:
@ -1768,22 +1781,24 @@ class JobManager_Server(object):
self.stat.stop()
log.warning("timeout ({}s) exceeded -> quit server".format(self.timeout))
break
info_line.value = ("res_q size:{}, jobs: rem.:{}, "+
info_line.value = ("res_q size:{} {}/s, jobs: rem.:{}, "+
"done:{}, failed:{}, prog.:{}, "+
"timeout in:{}s").format(self.result_q.qsize(),
"timeout in:{}s").format(self.result_q.qsize(), data_speed,
jobqsize,
markeditems,
failqsize,
numjobs.value - numresults.value - jobqsize,
time_left).encode('utf-8')
else:
info_line.value = ("result_q size:{}, jobs: remaining:{}, "+
"done:{}, failed:{}, in progress:{}").format(self.result_q.qsize(),
info_line.value = ("result_q size:{} {}/s, jobs: remaining:{}, "+
"done:{}, failed:{}, in progress:{}").format(self.result_q.qsize(), data_speed,
jobqsize,
markeditems,
failqsize,
numjobs.value - numresults.value - jobqsize).encode('utf-8')
log.info("infoline {}".format(info_line.value))
old_bytes = bytes_recieved
old_time = time
# allows for update of the info line
try:
arg, result = self.result_q.get(timeout=self.msg_interval)