changed time.time() to time.perf_counter()

This commit is contained in:
Richard Hartmann 2021-02-11 17:11:39 +01:00
parent da6e1d74b6
commit 2bdaba2367
2 changed files with 62 additions and 38 deletions

View file

@ -213,7 +213,8 @@ class JobManager_Client(object):
log_level = logging.WARNING,
ask_on_sigterm = True,
nthreads = 1,
status_output_for_srun = False):
status_output_for_srun = False,
emtpy_lines_at_end = 0):
"""
server [string] - ip address or hostname where the JobManager_Server is running
@ -334,6 +335,8 @@ class JobManager_Client(object):
self.ask_on_sigterm = ask_on_sigterm
self.status_output_for_srun = status_output_for_srun
self.emtpy_lines_at_end = emtpy_lines_at_end
print("ADD LINES ABOVE", self.emtpy_lines_at_end)
def connect(self):
if self.manager_objects is None:
@ -494,7 +497,7 @@ class JobManager_Client(object):
try:
log.debug("wait until local result q is almost empty")
while local_result_q.qsize() > nproc:
time.sleep(0.1)
time.sleep(0.5)
log.debug("done waiting, call job_q_get")
with sig_delay([signal.SIGTERM]):
@ -520,17 +523,18 @@ class JobManager_Client(object):
log.error("Error when calling 'job_q_get'")
handle_unexpected_queue_error(e)
break
tg_1 = time.time()
tg_1 = time.perf_counter()
time_queue += (tg_1-tg_0)
# try to process the retrieved argument
try:
tf_0 = time.time()
tf_0 = time.perf_counter()
log.debug("START crunching _func")
res = _func(arg, const_arg, c, m)
log.debug("DONE crunching _func")
tf_1 = time.time()
time_calc += (tf_1-tf_0)
tf_1 = time.perf_counter()
time_calc_this = (tf_1-tf_0)
time_calc += time_calc_this
# handle SystemExit in outer try ... except
except SystemExit as e:
raise e
@ -571,12 +575,14 @@ class JobManager_Client(object):
# - try to send the result back to the server
else:
try:
tp_0 = time.time()
tp_0 = time.perf_counter()
with sig_delay([signal.SIGTERM]):
bin_data = pickle.dumps((arg, res))
bin_data = pickle.dumps({'arg': arg,
'res': res,
'time': time_calc_this})
local_result_q.put(bin_data)
log.debug('put result to local result_q, done!')
tp_1 = time.time()
tp_1 = time.perf_counter()
time_queue += (tp_1-tp_0)
# handle SystemExit in outer try ... except
@ -743,9 +749,9 @@ class JobManager_Client(object):
data = local_result_q.get()
log.debug("result_q client forward...\n{}".format(data))
with result_q_put_pending_lock:
t0 = time.time()
t0 = time.perf_counter()
result_q_put(data)
t1 = time.time()
t1 = time.perf_counter()
with bytes_send.get_lock():
bytes_send.value += len(data)
with time_result_q_put.get_lock():
@ -791,7 +797,8 @@ class JobManager_Client(object):
prepend = prepend,
sigint = 'ign',
sigterm = 'ign',
info_line = infoline) as self.pbc :
info_line = infoline,
emtpy_lines_at_end = self.emtpy_lines_at_end) as self.pbc:
if (not self.hide_progress) and self.show_statusbar_for_jobs:
self.pbc.start()
@ -1506,6 +1513,10 @@ class JobManager_Server(object):
self.jm_ready_callback = jm_ready_callback
self.single_job_max_time = 0
self.single_job_min_time = 10**10
self.single_job_acu_time = 0
self.single_job_cnt = 0
@staticmethod
@ -1650,10 +1661,18 @@ class JobManager_Server(object):
l = len(id1)
id2 = ' '*l + "| "
print()
dt = datetime.now() - self.start_time
print("{} start at {} | runtime {:.3e}s".format(id1, self.start_time, dt.seconds))
print("{}total number of jobs : {}".format(id1, all_jobs))
print("{} processed : {}".format(id2, all_processed))
print("{} succeeded : {}".format(id2, succeeded))
print("{} failed : {}".format(id2, failed))
if self.single_job_cnt > 0:
print("{} timing in sec: min {:.3e} | max {:.3e} | avr {:.3e}".format(id2, self.single_job_min_time,
self.single_job_max_time,
self.single_job_acu_time / self.single_job_cnt))
all_not_processed = all_jobs - all_processed
not_queried = self.number_of_jobs()
@ -1816,11 +1835,10 @@ class JobManager_Server(object):
log.debug("at start: number of results: {}".format(numresults.value))
speed_q = myQueue()
curr_time = time.time()
time_stamp = time.perf_counter()
bytes_recieved = 0
for i in range(15):
speed_q.put((bytes_recieved, curr_time))
speed_q.put((bytes_recieved, time_stamp))
with progress.ProgressBarFancy(count = numresults,
max_count = numjobs,
@ -1841,12 +1859,12 @@ class JobManager_Server(object):
jobqsize = self.number_of_jobs()
markeditems = self.job_q.marked_items()
numresults.value = failqsize + markeditems
if (time.time() - curr_time) > self.msg_interval:
old_bytes, old_time = speed_q.get()
if (time.perf_counter() - time_stamp) > self.msg_interval:
old_bytes, old_time_stamp = speed_q.get()
curr_time = time.time()
speed_q.put((bytes_recieved, curr_time))
data_speed = humanize_size((bytes_recieved - old_bytes) / (curr_time - old_time))
time_stamp = time.perf_counter()
speed_q.put((bytes_recieved, time_stamp))
data_speed = humanize_size((bytes_recieved - old_bytes) / (time_stamp - old_time_stamp))
if (self.timeout is not None):
time_left = int(self.timeout - self.__wait_before_stop - (datetime.now() - self.start_time).total_seconds())
@ -1874,19 +1892,25 @@ class JobManager_Server(object):
# allows for update of the info line
try:
bin_data = self.result_q.get(timeout=self.msg_interval)
arg, result = pickle.loads(bin_data)
data_dict = pickle.loads(bin_data)
bytes_recieved += len(bin_data)
del bin_data
except queue.Empty:
continue
# print("got arg", arg)
self.job_q.mark(arg)
arg = data_dict['arg']
res = data_dict['res']
single_job_time = data_dict['time']
self.job_q.mark(data_dict['arg'])
# print("has been marked!")
log.debug("received {}".format(arg))
self.process_new_result(arg, result)
log.debug("received {}".format(data_dict['arg']))
self.process_new_result(arg, res)
self.single_job_max_time = max(self.single_job_max_time, single_job_time)
self.single_job_min_time = min(self.single_job_min_time, single_job_time)
self.single_job_acu_time += single_job_time
self.single_job_cnt += 1
if not self.keep_new_result_in_memory:
del arg
del result
del data_dict
self.stat = None
@ -2296,19 +2320,19 @@ class proxy_operation_decorator(object):
log.debug("operation '{}' successfully executed")
return res
t0 = time.time()
t0 = time.perf_counter()
c = 0
reconnect_wait = self.reconnect_wait
while True:
reconnect_wait *= 1.2
t1 = time.time()
t1 = time.perf_counter()
check_if_host_is_reachable_unix_ping(adr = self.dest[0][0],
timeout = self.ping_timeout,
retry = self.ping_retry)
log.debug("ping time: {:.2f}s".format(time.time() - t1))
log.debug("ping time: {:.2f}s".format(time.perf_counter() - t1))
log.debug("establish connection to %s", self.dest)
try:
t1 = time.time()
t1 = time.perf_counter()
self.proxy._connect()
except Exception as e:
log.warning("establishing connection to %s FAILED due to '%s'", self.dest, type(e))
@ -2320,11 +2344,11 @@ class proxy_operation_decorator(object):
log.info("wait %s seconds and retry", reconnect_wait)
time.sleep(reconnect_wait)
continue
log.debug("self.proxy._connect() time: {:.2f}s".format(time.time() - t1))
log.debug("self.proxy._connect() time: {:.2f}s".format(time.perf_counter() - t1))
log.debug("execute operation '%s' -> %s", self.operation, self.dest)
try:
t1 = time.time()
t1 = time.perf_counter()
res = self.o(*args, **kwargs)
except queue.Empty as e:
log.info("operation '%s' -> %s FAILED due to '%s'", self.operation, self.dest, type(e))
@ -2351,8 +2375,8 @@ class proxy_operation_decorator(object):
else:
handler_unexpected_error(e)
else: # SUCCESS -> return True
log.debug("self.o(*args, **kwargs) time: {:.2f}s".format(time.time() - t1))
log.debug("operation '{}' successfully executed (overall time {:.2f}s)".format(self.operation, time.time() - t0))
log.debug("self.o(*args, **kwargs) time: {:.2f}s".format(time.perf_counter() - t1))
log.debug("operation '{}' successfully executed (overall time {:.2f}s)".format(self.operation, time.perf_counter() - t0))
return res
log.debug("close connection to %s", self.dest)

View file

@ -231,10 +231,10 @@ def integrate_cplx(c, t0, t1, N, f, args, x0, integrator, res_dim=None, x_to_res
log.info("integration summary\n"+
"integration time {:.2g}ms ({:.2%})\n".format(t_int*1000, t_int / (t_int + t_conv))+
" f_dot eval {:.2g}ms ({:.2%})\n".format(time_as_list[0]*1000, time_as_list[0] / (t_int + t_conv))+
" f_dot eval cnt {} -> average time per eval {:.2g}ms\n".format(time_as_list[1], time_as_list[0]*1000 / time_as_list[1]) +
"data conversion time {:.2g}ms ({:.2%})\n".format(t_conv*1000, t_conv / (t_int + t_conv)))
"integration time {:.3g}ms ({:.2%})\n".format(t_int*1000, t_int / (t_int + t_conv))+
" f_dot eval {:.3g}ms ({:.2%})\n".format(time_as_list[0]*1000, time_as_list[0] / (t_int + t_conv))+
" f_dot eval cnt {} -> average time per eval {:.3g}ms\n".format(time_as_list[1], time_as_list[0]*1000 / time_as_list[1]) +
"data conversion time {:.3g}ms ({:.2%})\n".format(t_conv*1000, t_conv / (t_int + t_conv)))
return t, x, None
def integrate_real(c, t0, t1, N, f, args, x0, integrator, verbose=0, res_dim=None, x_to_res=None, **kwargs):