diff --git a/jobmanager/jobmanager.py b/jobmanager/jobmanager.py index 04fa4a2..5202be9 100644 --- a/jobmanager/jobmanager.py +++ b/jobmanager/jobmanager.py @@ -213,7 +213,8 @@ class JobManager_Client(object): log_level = logging.WARNING, ask_on_sigterm = True, nthreads = 1, - status_output_for_srun = False): + status_output_for_srun = False, + emtpy_lines_at_end = 0): """ server [string] - ip address or hostname where the JobManager_Server is running @@ -334,6 +335,8 @@ class JobManager_Client(object): self.ask_on_sigterm = ask_on_sigterm self.status_output_for_srun = status_output_for_srun + self.emtpy_lines_at_end = emtpy_lines_at_end + print("ADD LINES ABOVE", self.emtpy_lines_at_end) def connect(self): if self.manager_objects is None: @@ -494,7 +497,7 @@ class JobManager_Client(object): try: log.debug("wait until local result q is almost empty") while local_result_q.qsize() > nproc: - time.sleep(0.1) + time.sleep(0.5) log.debug("done waiting, call job_q_get") with sig_delay([signal.SIGTERM]): @@ -520,17 +523,18 @@ class JobManager_Client(object): log.error("Error when calling 'job_q_get'") handle_unexpected_queue_error(e) break - tg_1 = time.time() + tg_1 = time.perf_counter() time_queue += (tg_1-tg_0) # try to process the retrieved argument try: - tf_0 = time.time() + tf_0 = time.perf_counter() log.debug("START crunching _func") res = _func(arg, const_arg, c, m) log.debug("DONE crunching _func") - tf_1 = time.time() - time_calc += (tf_1-tf_0) + tf_1 = time.perf_counter() + time_calc_this = (tf_1-tf_0) + time_calc += time_calc_this # handle SystemExit in outer try ... except except SystemExit as e: raise e @@ -571,12 +575,14 @@ class JobManager_Client(object): # - try to send the result back to the server else: try: - tp_0 = time.time() + tp_0 = time.perf_counter() with sig_delay([signal.SIGTERM]): - bin_data = pickle.dumps((arg, res)) + bin_data = pickle.dumps({'arg': arg, + 'res': res, + 'time': time_calc_this}) local_result_q.put(bin_data) log.debug('put result to local result_q, done!') - tp_1 = time.time() + tp_1 = time.perf_counter() time_queue += (tp_1-tp_0) # handle SystemExit in outer try ... except @@ -743,9 +749,9 @@ class JobManager_Client(object): data = local_result_q.get() log.debug("result_q client forward...\n{}".format(data)) with result_q_put_pending_lock: - t0 = time.time() + t0 = time.perf_counter() result_q_put(data) - t1 = time.time() + t1 = time.perf_counter() with bytes_send.get_lock(): bytes_send.value += len(data) with time_result_q_put.get_lock(): @@ -791,7 +797,8 @@ class JobManager_Client(object): prepend = prepend, sigint = 'ign', sigterm = 'ign', - info_line = infoline) as self.pbc : + info_line = infoline, + emtpy_lines_at_end = self.emtpy_lines_at_end) as self.pbc: if (not self.hide_progress) and self.show_statusbar_for_jobs: self.pbc.start() @@ -1506,6 +1513,10 @@ class JobManager_Server(object): self.jm_ready_callback = jm_ready_callback + self.single_job_max_time = 0 + self.single_job_min_time = 10**10 + self.single_job_acu_time = 0 + self.single_job_cnt = 0 @staticmethod @@ -1650,10 +1661,18 @@ class JobManager_Server(object): l = len(id1) id2 = ' '*l + "| " + print() + dt = datetime.now() - self.start_time + print("{} start at {} | runtime {:.3e}s".format(id1, self.start_time, dt.seconds)) + print("{}total number of jobs : {}".format(id1, all_jobs)) print("{} processed : {}".format(id2, all_processed)) print("{} succeeded : {}".format(id2, succeeded)) print("{} failed : {}".format(id2, failed)) + if self.single_job_cnt > 0: + print("{} timing in sec: min {:.3e} | max {:.3e} | avr {:.3e}".format(id2, self.single_job_min_time, + self.single_job_max_time, + self.single_job_acu_time / self.single_job_cnt)) all_not_processed = all_jobs - all_processed not_queried = self.number_of_jobs() @@ -1816,11 +1835,10 @@ class JobManager_Server(object): log.debug("at start: number of results: {}".format(numresults.value)) speed_q = myQueue() - curr_time = time.time() + time_stamp = time.perf_counter() bytes_recieved = 0 for i in range(15): - speed_q.put((bytes_recieved, curr_time)) - + speed_q.put((bytes_recieved, time_stamp)) with progress.ProgressBarFancy(count = numresults, max_count = numjobs, @@ -1841,12 +1859,12 @@ class JobManager_Server(object): jobqsize = self.number_of_jobs() markeditems = self.job_q.marked_items() numresults.value = failqsize + markeditems - if (time.time() - curr_time) > self.msg_interval: - old_bytes, old_time = speed_q.get() + if (time.perf_counter() - time_stamp) > self.msg_interval: + old_bytes, old_time_stamp = speed_q.get() - curr_time = time.time() - speed_q.put((bytes_recieved, curr_time)) - data_speed = humanize_size((bytes_recieved - old_bytes) / (curr_time - old_time)) + time_stamp = time.perf_counter() + speed_q.put((bytes_recieved, time_stamp)) + data_speed = humanize_size((bytes_recieved - old_bytes) / (time_stamp - old_time_stamp)) if (self.timeout is not None): time_left = int(self.timeout - self.__wait_before_stop - (datetime.now() - self.start_time).total_seconds()) @@ -1874,19 +1892,25 @@ class JobManager_Server(object): # allows for update of the info line try: bin_data = self.result_q.get(timeout=self.msg_interval) - arg, result = pickle.loads(bin_data) + data_dict = pickle.loads(bin_data) bytes_recieved += len(bin_data) del bin_data except queue.Empty: continue # print("got arg", arg) - self.job_q.mark(arg) + arg = data_dict['arg'] + res = data_dict['res'] + single_job_time = data_dict['time'] + self.job_q.mark(data_dict['arg']) # print("has been marked!") - log.debug("received {}".format(arg)) - self.process_new_result(arg, result) + log.debug("received {}".format(data_dict['arg'])) + self.process_new_result(arg, res) + self.single_job_max_time = max(self.single_job_max_time, single_job_time) + self.single_job_min_time = min(self.single_job_min_time, single_job_time) + self.single_job_acu_time += single_job_time + self.single_job_cnt += 1 if not self.keep_new_result_in_memory: - del arg - del result + del data_dict self.stat = None @@ -2296,19 +2320,19 @@ class proxy_operation_decorator(object): log.debug("operation '{}' successfully executed") return res - t0 = time.time() + t0 = time.perf_counter() c = 0 reconnect_wait = self.reconnect_wait while True: reconnect_wait *= 1.2 - t1 = time.time() + t1 = time.perf_counter() check_if_host_is_reachable_unix_ping(adr = self.dest[0][0], timeout = self.ping_timeout, retry = self.ping_retry) - log.debug("ping time: {:.2f}s".format(time.time() - t1)) + log.debug("ping time: {:.2f}s".format(time.perf_counter() - t1)) log.debug("establish connection to %s", self.dest) try: - t1 = time.time() + t1 = time.perf_counter() self.proxy._connect() except Exception as e: log.warning("establishing connection to %s FAILED due to '%s'", self.dest, type(e)) @@ -2320,11 +2344,11 @@ class proxy_operation_decorator(object): log.info("wait %s seconds and retry", reconnect_wait) time.sleep(reconnect_wait) continue - log.debug("self.proxy._connect() time: {:.2f}s".format(time.time() - t1)) + log.debug("self.proxy._connect() time: {:.2f}s".format(time.perf_counter() - t1)) log.debug("execute operation '%s' -> %s", self.operation, self.dest) try: - t1 = time.time() + t1 = time.perf_counter() res = self.o(*args, **kwargs) except queue.Empty as e: log.info("operation '%s' -> %s FAILED due to '%s'", self.operation, self.dest, type(e)) @@ -2351,8 +2375,8 @@ class proxy_operation_decorator(object): else: handler_unexpected_error(e) else: # SUCCESS -> return True - log.debug("self.o(*args, **kwargs) time: {:.2f}s".format(time.time() - t1)) - log.debug("operation '{}' successfully executed (overall time {:.2f}s)".format(self.operation, time.time() - t0)) + log.debug("self.o(*args, **kwargs) time: {:.2f}s".format(time.perf_counter() - t1)) + log.debug("operation '{}' successfully executed (overall time {:.2f}s)".format(self.operation, time.perf_counter() - t0)) return res log.debug("close connection to %s", self.dest) diff --git a/jobmanager/ode_wrapper.py b/jobmanager/ode_wrapper.py index e0587af..54aedd5 100644 --- a/jobmanager/ode_wrapper.py +++ b/jobmanager/ode_wrapper.py @@ -231,10 +231,10 @@ def integrate_cplx(c, t0, t1, N, f, args, x0, integrator, res_dim=None, x_to_res log.info("integration summary\n"+ - "integration time {:.2g}ms ({:.2%})\n".format(t_int*1000, t_int / (t_int + t_conv))+ - " f_dot eval {:.2g}ms ({:.2%})\n".format(time_as_list[0]*1000, time_as_list[0] / (t_int + t_conv))+ - " f_dot eval cnt {} -> average time per eval {:.2g}ms\n".format(time_as_list[1], time_as_list[0]*1000 / time_as_list[1]) + - "data conversion time {:.2g}ms ({:.2%})\n".format(t_conv*1000, t_conv / (t_int + t_conv))) + "integration time {:.3g}ms ({:.2%})\n".format(t_int*1000, t_int / (t_int + t_conv))+ + " f_dot eval {:.3g}ms ({:.2%})\n".format(time_as_list[0]*1000, time_as_list[0] / (t_int + t_conv))+ + " f_dot eval cnt {} -> average time per eval {:.3g}ms\n".format(time_as_list[1], time_as_list[0]*1000 / time_as_list[1]) + + "data conversion time {:.3g}ms ({:.2%})\n".format(t_conv*1000, t_conv / (t_int + t_conv))) return t, x, None def integrate_real(c, t0, t1, N, f, args, x0, integrator, verbose=0, res_dim=None, x_to_res=None, **kwargs):