From 662a7c11b5b4b6d2868d105eda4fc72afb9f948f Mon Sep 17 00:00:00 2001 From: cimatosa Date: Mon, 19 Sep 2016 17:06:43 +0200 Subject: [PATCH] major chages in the organization of the JM package, also added logging system to be able to handle subprocess logs -> this is going to be pypy version 0.2 --- jobmanager/__init__.py | 1 - jobmanager/jobmanager.py | 698 ++++++++--------- jobmanager/progress.py | 1571 -------------------------------------- tests/test_jobmanager.py | 6 +- tests/test_progress.py | 815 -------------------- 5 files changed, 309 insertions(+), 2782 deletions(-) delete mode 100644 jobmanager/progress.py delete mode 100644 tests/test_progress.py diff --git a/jobmanager/__init__.py b/jobmanager/__init__.py index ce38a1d..afb8bdd 100644 --- a/jobmanager/__init__.py +++ b/jobmanager/__init__.py @@ -34,7 +34,6 @@ from .jobmanager import * from . import clients from . import decorators -from . import progress from . import servers from . import ode_wrapper from . import binfootprint diff --git a/jobmanager/jobmanager.py b/jobmanager/jobmanager.py index 3cf75af..72b9332 100644 --- a/jobmanager/jobmanager.py +++ b/jobmanager/jobmanager.py @@ -48,6 +48,25 @@ import traceback import warnings from . import binfootprint as bf +import logging + +# taken from here: https://mail.python.org/pipermail/python-list/2010-November/591474.html +class MultiLineFormatter(logging.Formatter): + def format(self, record): + _str = logging.Formatter.format(self, record) + header = str.split(record.message)[0] + _str = _str.replace('\n', '\n' + ' '*len(header)) + return _str + +default_log = logging.getLogger(__name__) +default_log.setLevel(logging.INFO) +cons_hand = logging.StreamHandler(stream = sys.stderr) +cons_hand.setLevel(logging.DEBUG) +fmt = MultiLineFormatter('%(asctime)s %(name)s %(levelname)s : %(message)s') +cons_hand.setFormatter(fmt) +default_log.addHandler(cons_hand) + + from datetime import datetime # This is a list of all python objects that will be imported upon @@ -55,9 +74,7 @@ from datetime import datetime __all__ = ["JobManager_Client", "JobManager_Local", "JobManager_Server", - "getDateForFileName", - "hashDict", - "hashableCopyOfNumpyArray" + "getDateForFileName" ] @@ -115,33 +132,16 @@ def humanize_size(size_in_bytes): return "{:.2f}{}B".format(size_in_bytes, units[i]) def get_user(): - try: - out = subprocess.check_output('id -un', shell=True).decode().strip() - return out - except Exception as e: - print("failed to determine user") - print(e) - return None + out = subprocess.check_output('id -un', shell=True).decode().strip() + return out def get_user_process_limit(): - try: - out = subprocess.check_output('ulimit -u', shell=True).decode().strip() - return int(out) - except Exception as e: - print("failed to determine maximum number of user processeses") - print(e) - return None + out = subprocess.check_output('ulimit -u', shell=True).decode().strip() + return int(out) def get_user_num_process(): - try: - out = subprocess.check_output('ps ut | wc -l', shell=True).decode().strip() - return int(out)-2 - except Exception as e: - print("failed to determine current number of processes") - print(e) - return None - - + out = subprocess.check_output('ps ut | wc -l', shell=True).decode().strip() + return int(out)-2 class JobManager_Client(object): @@ -185,7 +185,12 @@ class JobManager_Client(object): emergency_dump_path = '.', result_q_timeout = 30, job_q_timeout = 0.1, - fail_q_timeout = 10): + fail_q_timeout = 10, + logging_level = None, + reconnect_wait = 2, + reconnect_tries = 3, + ping_timeout = 2, + ping_retry = 3): """ server [string] - ip address or hostname where the JobManager_Server is running @@ -216,49 +221,77 @@ class JobManager_Client(object): verbose [int] - 0: quiet, 1: status only, 2: debug messages """ - - self.show_statusbar_for_jobs = show_statusbar_for_jobs - self.show_counter_only = show_counter_only - self.interval = interval - self.verbose = verbose - - self._result_q_timeout = result_q_timeout - self._job_q_timeout = job_q_timeout - self._fail_q_timeout = fail_q_timeout - + self._pid = os.getpid() self._sid = os.getsid(self._pid) - self._identifier = progress.get_identifier(name=self.__class__.__name__, pid=self._pid) - if self.verbose > 1: - print("{}: init".format(self._identifier)) - + _identifier = progress.get_identifier(name=self.__class__.__name__, pid=self._pid) + self.log = logging.getLogger(_identifier) + + if logging_level is None: + if verbose == 0: + self.log.setLevel(logging.ERROR) + elif verbose == 1: + self.log.setLevel(logging.INFO) + else: + self.log.setLevel(logging.DEBUG) + else: + self.log.setLevel(logging_level) + self.log.addHandler(cons_hand) + + self.log.info("init JobManager Client instance") + + self.show_statusbar_for_jobs = show_statusbar_for_jobs + self.log.debug("show_statusbar_for_jobs:{}", self.show_statusbar_for_jobs) + self.show_counter_only = show_counter_only + self.log.debug("show_counter_only:{}", self.show_counter_only) + self.interval = interval + self.log.debug("interval:{}", self.interval) + + self._result_q_timeout = result_q_timeout + self.log.debug("_result_q_timeout:{}", self._result_q_timeout) + self._job_q_timeout = job_q_timeout + self.log.debug("_job_q_timeout:{}", self._job_q_timeout) + self._fail_q_timeout = fail_q_timeout + self.log.debug("_fail_q_timeout:{}", self._fail_q_timeout) + self.reconnect_wait = reconnect_wait + self.log.debug("reconnect_wait:{}", self.reconnect_wait) + self.reconnect_tries = reconnect_tries + self.log.debug("reconnect_tries:{}", self.reconnect_tries) + self.ping_timeout = ping_timeout + self.log.debug("ping_timeout:{}", self.ping_timeout) + self.ping_retry = ping_retry + self.log.debug("ping_retry:{}", self.ping_retry) + if no_warnings: - import warnings warnings.filterwarnings("ignore") - if self.verbose > 1: - print("{}: ignore all warnings".format(self._identifier)) + self.log.info("ignore all warnings") self.server = server + self.log.debug("server:{}", self.server) if isinstance(authkey, bytearray): self.authkey = authkey else: self.authkey = bytearray(authkey, encoding='utf8') + self.log.debug("authkey:{}", self.authkey) self.port = port + self.log.debug("port:{}", self.port) self.nice = nice + self.log.debug("nice:{}", self.nice) if nproc > 0: self.nproc = nproc else: self.nproc = mp.cpu_count() + nproc if self.nproc <= 0: raise RuntimeError("Invalid Number of Processes\ncan not spawn {} processes (cores found: {}, cores NOT to use: {} = -nproc)".format(self.nproc, mp.cpu_count(), abs(nproc))) - # internally, njobs must be negative for infinite jobs - if njobs == 0: + self.log.debug("nproc:{}", self.nproc) + if njobs == 0: # internally, njobs must be negative for infinite jobs njobs -= 1 self.njobs = njobs + self.log.debug("njobs:{}", self.njobs) self.emergency_dump_path = emergency_dump_path + self.log.debug("emergency_dump_path:{}", self.emergency_dump_path) - self.procs = [] - + self.procs = [] self.manager_objects = None # will be set via connect() self.connect() # get shared objects from server @@ -266,10 +299,9 @@ class JobManager_Client(object): def connect(self): if self.manager_objects is None: - self.manager_objects = self.get_manager_objects() + self.manager_objects = self.create_manager_objects() else: - if self.verbose > 0: - print("{}: already connected (at least shared object are available)".format(self._identifier)) + self.log.info("already connected (at least shared object are available)") @property def connected(self): @@ -278,17 +310,7 @@ class JobManager_Client(object): def _dump_result_to_local_storage(self, res): pass - def get_manager_objects(self): - return JobManager_Client._get_manager_objects(self.server, - self.port, - self.authkey, - self._identifier, - self.verbose) -# @staticmethod -# def _get_sync_manager_data(manager): - - @staticmethod - def _get_manager_objects(server, port, authkey, identifier, verbose=0, reconnect_wait=2, reconnect_tries=3): + def create_manager_objects(self): """ connects to the server and get registered shared objects such as job_q, result_q, fail_q @@ -304,23 +326,20 @@ class JobManager_Client(object): ServerQueueManager.register('get_fail_q') ServerQueueManager.register('get_const_arg') - manager = ServerQueueManager(address=(server, port), authkey=authkey) + manager = ServerQueueManager(address=(self.server, self.port), authkey=self.authkey) try: call_connect(connect = manager.connect, dest = address_authkey_from_manager(manager), - verbose = verbose, - identifier = identifier, - reconnect_wait = reconnect_wait, - reconnect_tries = reconnect_tries) + reconnect_wait = self.reconnect_wait, + reconnect_tries = self.reconnect_tries, + log = self.log) except: - print("{}: FAILED to connect to {}".format(identifier, address_authkey_from_manager(manager))) + self.log.warning("FAILED to connect to {}", address_authkey_from_manager(manager)) return None - job_q = manager.get_job_q() - if verbose > 1: - print("{}: found job_q with {} jobs".format(identifier, job_q.qsize())) + self.log.info("found job_q with {} jobs", job_q.qsize()) result_q = manager.get_result_q() fail_q = manager.get_fail_q() @@ -329,19 +348,6 @@ class JobManager_Client(object): return job_q, result_q, fail_q, const_arg, manager - def get_overall_memory_cunsumption(self): - vsize = 0 - # Example: /bin/ps -o vsize= --sid 23928 - #proc = sp.Popen(['ps', '-o', 'vsize=', '--sid', str(self._sid)], stdout=sp.PIPE, stderr=None, shell=False) - proc = sp.Popen(['ps', '-o', 'rss=', '--sid', str(self._sid)], stdout=sp.PIPE, stderr=None, shell=False) - (stdout, _stderr) = proc.communicate() - print("sid", self._sid) - print(stdout) - # Iterate over each process within the process tree of our process session - # (this ensures that we include processes launched by a child bash script, etc.) - for line in stdout.split(): - vsize += int(line.strip()) - return vsize*1024 # in bytes @staticmethod def func(arg, const_arg): @@ -366,29 +372,19 @@ class JobManager_Client(object): return os.getpid() @staticmethod - def _handle_unexpected_queue_error(e, verbose, identifier): - print("{}: unexpected Error {}, I guess the server went down, can't do anything, terminate now!".format(identifier, e)) - if verbose > 0: - traceback.print_exc() - - @staticmethod - def __emergency_dump(arg, res, path, identifier): - now = datetime.now().isoformat() - pid = os.getpid() - fname = "{}_pid_{}".format(now, pid) - full_path = os.path.join(path, fname) - print("{}: emergency dump (arg, res) to {}".format(identifier, full_path)) - with open(full_path, 'wb') as f: - pickle.dump(arg, f) - pickle.dump(res, f) - - @staticmethod - def __worker_func(func, nice, verbose, server, port, authkey, i, manager_objects, c, m, reset_pbc, njobs, - emergency_dump_path, job_q_timeout, fail_q_timeout, result_q_timeout): + def __worker_func(func, nice, loglevel, server, port, authkey, i, manager_objects, c, m, reset_pbc, njobs, + emergency_dump_path, job_q_timeout, fail_q_timeout, result_q_timeout, stdout_queue, + reconnect_wait, reconnect_tries, ping_timeout, ping_retry): """ the wrapper spawned nproc times calling and handling self.func """ - identifier = progress.get_identifier(name='worker{}'.format(i+1)) + log = logging.getLogger(progress.get_identifier(name='worker{}'.format(i+1))) + log.setLevel(loglevel) + + queue_hand = logging.handlers.QueueHandler(stdout_queue) + queue_hand.setFormatter(fmt) + log.addHandler(queue_hand) + Signal_to_sys_exit(signals=[signal.SIGTERM]) Signal_to_SIG_IGN(signals=[signal.SIGINT]) @@ -398,11 +394,9 @@ class JobManager_Client(object): try: n = os.nice(nice - n) except PermissionError: - if verbose > 0: - print("{}: changing niceness not permitted! run with niceness {}".format(identifier, n)) + log.warning("changing niceness not permitted! run with niceness {}", n) - if verbose > 1: - print("{}: now alive, niceness {}".format(identifier, n)) + log.debug("worker function now alive, niceness {}", n) cnt = 0 time_queue = 0. time_calc = 0. @@ -415,13 +409,11 @@ class JobManager_Client(object): count_args = getCountKwargs(func) if count_args is None: - if verbose > 1: - print("{}: found function without status information".format(identifier)) + log.warning("found function without status information (progress will not work)") m.value = 0 # setting max_count to -1 will hide the progress bar _func = lambda arg, const_arg, c, m : func(arg, const_arg) elif count_args != ["c", "m"]: - if verbose > 1: - print("{}: found counter keyword arguments: {}".format(identifier, count_args)) + log.debug("found counter keyword arguments: {}", count_args) # Allow other arguments, such as ["jmc", "jmm"] as defined # in `validCountKwargs`. # Here we translate to "c" and "m". @@ -430,35 +422,20 @@ class JobManager_Client(object): count_args[1]: m} return func(arg, const_arg, **kwargs) else: - if verbose > 1: - print("{}: found standard keyword arguments: [c, m]".format(identifier)) + log.debug("found standard keyword arguments: [c, m]") _func = func - job_q_get = proxy_operation_decorator(proxy = job_q, - operation = 'get', - verbose = verbose, - identifier = identifier, - reconnect_wait = 2, - reconnect_tries = 3) - job_q_put = proxy_operation_decorator(proxy = job_q, - operation = 'put', - verbose = verbose, - identifier = identifier, - reconnect_wait = 2, - reconnect_tries = 3) - result_q_put = proxy_operation_decorator(proxy = result_q, - operation = 'put', - verbose = verbose, - identifier = identifier, - reconnect_wait = 2, - reconnect_tries = 3) - fail_q_put = proxy_operation_decorator(proxy = fail_q, - operation = 'put', - verbose = verbose, - identifier = identifier, - reconnect_wait = 2, - reconnect_tries = 3) - + kwargs = {'reconnect_wait' : reconnect_wait, + 'reconnect_tries': reconnect_tries, + 'log' : log, + 'ping_timeout' : ping_timeout, + 'ping_retry' : ping_retry} + + job_q_get = proxy_operation_decorator(proxy = job_q, operation = 'get', **kwargs) + job_q_put = proxy_operation_decorator(proxy = job_q, operation = 'put', **kwargs) + result_q_put = proxy_operation_decorator(proxy = result_q, operation = 'put', **kwargs) + fail_q_put = proxy_operation_decorator(proxy = fail_q, operation = 'put', **kwargs) + # supposed to catch SystemExit, which will shut the client down quietly try: @@ -479,22 +456,24 @@ class JobManager_Client(object): # regular case, just stop working when empty job_q was found except queue.Empty: - if verbose > 0: - print("{}: finds empty job queue, processed {} jobs".format(identifier, cnt)) + log.info("finds empty job queue, processed {} jobs", cnt) break # handle SystemExit in outer try ... except except SystemExit as e: + log.warning('getting arg from job_q failed due to SystemExit') raise e # job_q.get failed -> server down? except Exception as e: - print("{}: Error when calling 'job_q_get'".format(identifier)) - JobManager_Client._handle_unexpected_queue_error(e, verbose, identifier) + log.error("Error when calling 'job_q_get'") + handle_unexpected_queue_error(e, log) break # try to process the retrieved argument try: tf_0 = time.time() + log.debug("START crunching _func") res = _func(arg, const_arg, c, m) + log.debug("DONE crunching _func") tf_1 = time.time() time_calc += (tf_1-tf_0) # handle SystemExit in outer try ... except @@ -505,104 +484,86 @@ class JobManager_Client(object): # - try to inform the server of the failure except: err, val, trb = sys.exc_info() - if verbose > 0: - print("{}: caught exception '{}' when crunching 'func'".format(identifier, err.__name__)) - - if verbose > 1: - traceback.print_exc() + log.error("caught exception '{}' when crunching 'func'\n{}", err.__name__, traceback.print_exc()) # write traceback to file hostname = socket.gethostname() fname = 'traceback_err_{}_{}.trb'.format(err.__name__, getDateForFileName(includePID=True)) - - if verbose > 0: - print(" write exception to file {} ... ".format(fname), end='') - sys.stdout.flush() + + log.info("write exception to file {}", fname) with open(fname, 'w') as f: traceback.print_exception(etype=err, value=val, tb=trb, file=f) - if verbose > 0: - print("done") - print(" continue processing next argument.") - - # try to inform the server of the failure - if verbose > 1: - print("{}: try to send send failed arg to fail_q ...".format(identifier), end='') - sys.stdout.flush() + + log.debug("try to send send failed arg to fail_q") try: fail_q_put((arg, err.__name__, hostname), block = True, timeout=fail_q_timeout) # handle SystemExit in outer try ... except except SystemExit as e: - if verbose > 1: - print(" FAILED!") + log.warning('sending arg to fail_q failed due to SystemExit') raise e # fail_q.put failed -> server down? except Exception as e: - if verbose > 1: - print(" FAILED!") - JobManager_Client._handle_unexpected_queue_error(e, verbose, identifier) + log.error('sending arg to fail_q failed') + handle_unexpected_queue_error(e, log) break else: - if verbose > 1: - print(" done!") + log.debug('sending arg to fail_q was successful') # processing the retrieved arguments succeeded # - try to send the result back to the server else: try: - tp_0 = time.time() + tp_0 = time.time() result_q_put((arg, res), block = True, timeout=result_q_timeout) tp_1 = time.time() time_queue += (tp_1-tp_0) # handle SystemExit in outer try ... except except SystemExit as e: + log.warning('sending result to result_q failed due to SystemExit') raise e except Exception as e: - print("{}: Error when calling 'result_q_put'".format(identifier)) - JobManager_Client.__emergency_dump(arg, res, emergency_dump_path, identifier) - JobManager_Client._handle_unexpected_queue_error(e, verbose, identifier) + log.error('sending result to result_q failed due to {}', type(e)) + emergency_dump(arg, res, emergency_dump_path, log) + handle_unexpected_queue_error(e, log) break del res cnt += 1 reset_pbc() + log.debug("continue with next arg") # considered as normal exit caused by some user interaction, SIGINT, SIGTERM # note SIGINT, SIGTERM -> SystemExit is achieved by overwriting the # default signal handlers except SystemExit: - if verbose > 0: - print("{}: SystemExit, quit processing, reinsert current argument".format(identifier)) - - if verbose > 1: - print("{}: try to put arg back to job_q ...".format(identifier), end='') - sys.stdout.flush() + log.warning("SystemExit, quit processing, reinsert current argument, please wait") + log.debug("try to put arg back to job_q") try: job_q.put(arg, timeout=fail_q_timeout) # handle SystemExit in outer try ... except except SystemExit as e: - if verbose > 1: - print(" FAILED!") + log.error("put arg back to job_q failed due to SystemExit") raise e # fail_q.put failed -> server down? except Exception as e: - if verbose > 1: - print(" FAILED!") - JobManager_Client._handle_unexpected_queue_error(e, verbose, identifier) + log.error("put arg back to job_q failed due to {}", type(e)) + JobManager_Client._handle_unexpected_queue_error(e, log) else: - if verbose > 1: - print(" done!") - - if verbose > 0: - try: - print("{}: pure calculation time: {} single task average: {}".format(identifier, progress.humanize_time(time_calc), progress.humanize_time(time_calc / cnt) )) - print("{}: calculation:{:.2%} communication:{:.2%}".format(identifier, time_calc/(time_calc+time_queue), time_queue/(time_calc+time_queue))) - except: - pass - if verbose > 1: - print("{}: JobManager_Client.__worker_func at end (PID {})".format(identifier, os.getpid())) + log.debug("putting arg back to job_q was successful") + + try: + sta = progress.humanize_time(time_calc / cnt) + except: + sta = 'invalid' + + log.info("pure calculation time: {} single task average: {}\ncalculation:{:.2%} communication:{:.2%}", + progress.humanize_time(time_calc), sta, + time_calc/(time_calc+time_queue), time_queue/(time_calc+time_queue)) + + log.debug("JobManager_Client.__worker_func at end (PID {})", os.getpid()) def start(self): """ @@ -616,10 +577,8 @@ class JobManager_Client(object): if not self.connected: raise JMConnectionError("Can not start Client with no connection to server (shared objetcs are not available)") - print("{}: starting client with connection to server:{} authkey:{} port:{}".format(self._identifier, self.server, self.authkey.decode(), self.port)) - if self.verbose > 1: - print("{}: start {} processes to work on the remote queue".format(self._identifier, self.nproc)) + self.log.info("STARTING CLIENT\nserver:{} authkey:{} port:{} num proc:{}", self.server, self.authkey.decode(), self.port, self.nproc) c = [] for i in range(self.nproc): @@ -644,6 +603,9 @@ class JobManager_Client(object): prepend = [] infoline = progress.StringValue(num_of_bytes=12) infoline = None + + worker_stdout_queue = mp.SimpleQueue() + l = len(str(self.nproc)) for i in range(self.nproc): prepend.append("w{0:0{1}}:".format(i+1, l)) @@ -674,7 +636,12 @@ class JobManager_Client(object): self.emergency_dump_path, self._job_q_timeout, self._fail_q_timeout, - self._result_q_timeout)) + self._result_q_timeout, + worker_stdout_queue, + self.reconnect_wait, + self.reconnect_tries, + self.ping_timeout, + self.ping_retry)) self.procs.append(p) p.start() time.sleep(0.3) @@ -686,7 +653,7 @@ class JobManager_Client(object): pid = p.pid, bold = True) for i, p in enumerate(self.procs)], signals = [signal.SIGTERM], - verbose = self.verbose, + log = self.log, timeout = 2) interrupt_handler = Signal_handler_for_Jobmanager_client(client_object = self, @@ -1164,9 +1131,9 @@ class JobManager_Server(object): print("{}: started (host:{} authkey:{} port:{} jobs:{})".format(self._identifier, self.hostname, self.authkey.decode(), self.port, self.numjobs)) Signal_to_sys_exit(signals=[signal.SIGTERM, signal.SIGINT], verbose = self.verbose) - pid = os.getpid() - user = get_user() - max_proc = get_user_process_limit() +# pid = os.getpid() +# user = get_user() +# max_proc = get_user_process_limit() if self.verbose > 1: print("{}: start processing incoming results".format(self._identifier)) @@ -1190,11 +1157,9 @@ class JobManager_Server(object): stat.start() while (len(self.args_dict) - self.fail_q.qsize()) > 0: - info_line.value = "result_q size:{}, job_q size:{}, recieved results:{}, proc (curr/max): {}/{}".format(self.result_q.qsize(), - self.job_q.qsize(), - self.numresults, - get_user_num_process(), - max_proc).encode('utf-8') + info_line.value = "result_q size:{}, job_q size:{}, recieved results:{}".format(self.result_q.qsize(), + self.job_q.qsize(), + self.numresults).encode('utf-8') # allows for update of the info line try: @@ -1366,25 +1331,23 @@ class Signal_to_terminate_process_list(object): """ SIGINT and SIGTERM will call terminate for process given in process_list """ - def __init__(self, identifier, process_list, identifier_list, signals = [signal.SIGINT, signal.SIGTERM], verbose=0, timeout=2): - self.identifier = identifier + def __init__(self, process_list, identifier_list, signals = [signal.SIGINT, signal.SIGTERM], log=default_log, timeout=2): + self.log = log self.process_list = process_list self.identifier_list = identifier_list - self.verbose = verbose self.timeout = timeout for s in signals: signal.signal(s, self._handler) def _handler(self, signal, frame): - if self.verbose > 0: - print("{}: received sig {} -> terminate all given subprocesses".format(self.identifier, progress.signal_dict[signal])) + self.log.debug("received sig {} -> terminate all given subprocesses", progress.signal_dict[signal]) for i, p in enumerate(self.process_list): p.terminate() progress.check_process_termination(proc = p, identifier = self.identifier_list[i], timeout = self.timeout, - verbose = self.verbose, + log = self.log, auto_kill_on_last_resort=False) @@ -1426,24 +1389,21 @@ def address_authkey_from_proxy(proxy): def address_authkey_from_manager(manager): return manager.address, manager._authkey.decode() -def call_connect_python3(connect, dest, verbose=1, identifier='', reconnect_wait=2, reconnect_tries=3): - identifier = mod_id(identifier) +def call_connect_python3(connect, dest, reconnect_wait=2, reconnect_tries=3, log=log): c = 0 while True: try: # here we try re establish the connection - if verbose > 1: - print("{}try connecting to {}".format(identifier, dest)) + log.debug("try connecting to {}", dest) connect() except Exception as e: - if verbose > 0: - print("{}connection to {} could not be established due to '{}'".format(identifier, dest, type(e))) - print(traceback.format_stack()[-3].strip()) + log.error("connection to {} could not be established due to '{}'", dest, type(e)) + log.error(traceback.format_stack()[-3].strip()) if type(e) is ConnectionResetError: # ... when the destination hangs up on us - c = handler_connection_reset(identifier, dest, c, reconnect_wait, reconnect_tries, verbose) + c = handler_connection_reset(dest, c, reconnect_wait, reconnect_tries, log) elif type(e) is ConnectionRefusedError: # ... when the destination refuses our connection - handler_connection_refused(e, identifier, dest, verbose) + handler_connection_refused(e, dest, log) elif type(e) is AuthenticationError : # ... when the destination refuses our connection due authkey missmatch handler_authentication_error(e, identifier, dest, verbose) elif type(e) is RemoteError: # ... when the destination send us an error message @@ -1549,287 +1509,241 @@ def getDateForFileName(includePID = False): name += "_{}".format(os.getpid()) return name -def handler_authentication_error(e, identifier, dest, verbose): - if verbose > 1: - print("authkey specified does not match the authkey at destination side!") +def handler_authentication_error(e, dest, log): + log.error("authentication error") + log.info("Authkey specified does not match the authkey at destination side!") raise e -def handler_broken_pipe_error(e, verbose): - if verbose > 1: - print("this usually means that an established was closed, does not exists anymore") - print("the server probably went down") +def handler_broken_pipe_error(e, log): + log.error("broken pip error") + log.info("This usually means that an established connection was closed\n") + log.info("does not exists anymore, probably the server went down") raise e -def handler_connection_refused(e, identifier, dest, verbose): - if verbose > 1: - print("this usually means that no matching Manager object was instanciated at destination side!") - print("either there is no Manager running at all, or it is listening to another port.") +def handler_connection_refused(e, dest, log): + log.error("connection refused error") + log.info("This usually means that no matching Manager object was instanciated at destination side!") + log.info("Either there is no Manager running at all, or it is listening to another port.") raise JMConnectionRefusedError(e) -def handler_connection_reset(identifier, dest, c, reconnect_wait, reconnect_tries, verbose): - if verbose > 1: - print("during connect this Error might be due to firewall settings\n"+ - "or other TPC connections controlling mechanisms!") +def handler_connection_reset(dest, c, reconnect_wait, reconnect_tries, log): + log.error("connection reset error") + log.info("During 'connect' this error might be due to firewall settings"+ + "or other TPC connections controlling mechanisms!") c += 1 if c > reconnect_tries: - raise JMConnectionError("{}connection to {} FAILED, ".format(identifier, dest)+ + log.error("maximium number of reconnects {} was reached", reconnect_tries) + raise JMConnectionError("connection to {} FAILED, ".format(dest)+ "{} retries were NOT successfull".format(reconnect_tries)) - if verbose > 0: - traceback.print_exc(limit=1) - print("{}try connecting to {} again in {} seconds".format(identifier, dest, reconnect_wait)) + log.debug("try connecting to {} again in {} seconds", dest, reconnect_wait) time.sleep(reconnect_wait) return c -def handler_eof_error(e, verbose): - if verbose > 1: - print("This usually means that server did not replay, although the connection is still there.") - print("This is due to the fact that the connection is in 'timewait' status for about 60s") - print("after the server went down. After that time the connection will not exist anymore.") +def handler_eof_error(e, log): + log.error("EOF error") + log.info("This usually means that server did not replay, although the connection is still there.\n"+ + "This is due to the fact that the connection is in 'timewait' status for about 60s\n"+ + "after the server went down inappropriately.") raise e -def handler_remote_error(e, verbose, dest): - if verbose > 1: - print("the server {} send an RemoteError message!".format(dest)) +def handler_remote_error(e, dest, log): + log.error("remote error") + log.info("The server {} send an RemoteError message!\n{}", dest, e.args[0]) raise RemoteError(e.args[0]) -def handler_remote_key_error(e, verbose, dest): - if verbose > 1: - print("'KeyError' detected in RemoteError message from server {}!".format(dest)) - print("This hints to the fact that the actual instace of the shared object on the server") - print("side has changed, for example due to a server restart") - print("you need to reinstanciate the proxy object") +def handler_remote_key_error(e, dest, log): + log.error("remote key error") + log.info("'KeyError' detected in RemoteError message from server {}!\n"+ + "This hints to the fact that the actual instace of the shared object on the server side has changed,\n"+ + "for example due to a server restart you need to reinstanciate the proxy object.", dest) raise RemoteKeyError(e.args[0]) -def handler_remote_value_error(e, verbose, dest): - if verbose > 1: - print("'ValueError' due to 'unsupported pickle protocol' detected in RemoteError from server {}!".format(dest)) - print("You might have tried to connect to a SERVER running with an OLDER python version") - print("At this stage (and probably for ever) this should be avoided") +def handler_remote_value_error(e, dest, log): + log.error("remote value error") + log.info("'ValueError' due to 'unsupported pickle protocol' detected in RemoteError from server {}!\n"+ + "You might have tried to connect to a SERVER running with an OLDER python version.\n"+ + "At this stage (and probably for ever) this should be avoided!", dest) raise RemoteValueError(e.args[0]) -def handler_value_error(e, verbose): +def handler_value_error(e, log): + log.error("value error") if 'unsupported pickle protocol' in e.args[0]: - if verbose > 1: - print("'ValueError' due to 'unsupported pickle protocol'!") - print("You might have tried to connect to a SERVER running with an NEWER python version") - print("At this stage (and probably for ever) this should be avoided") + log.info("'ValueError' due to 'unsupported pickle protocol'!\n" + "You might have tried to connect to a SERVER running with an NEWER python version.\n" + "At this stage (and probably for ever) this should be avoided.\n") raise e -def handler_unexpected_error(e, verbose): +def handler_unexpected_error(e, log): + log.error("unexpected error of type {} and args {}", type(e), e.args) raise e -def mod_id(identifier): - if identifier != '': - identifier = identifier.strip() - if identifier[-1] != ':': - identifier += ':' - identifier += ' ' - - return identifier +def handle_unexpected_queue_error(e, log): + log.error("unexpected error of type {} and args {}\n"+ + "I guess the server went down, can't do anything, terminate now!", type(e), e.args) + log.debug("show traceback.print_exc()\n{}", traceback.print_exc()) -def check_if_host_is_reachable_unix_ping(adr, timeout=2, retry=5): +def emergency_dump(arg, res, path, log): + now = datetime.now().isoformat() + pid = os.getpid() + fname = "{}_pid_{}".format(now, pid) + full_path = os.path.join(path, fname) + log.warning("emergency dump (arg, res) to {}", full_path) + with open(full_path, 'wb') as f: + pickle.dump(arg, f) + pickle.dump(res, f) + +def check_if_host_is_reachable_unix_ping(adr, timeout=2, retry=5, log=default_log): for i in range(retry): try: - subprocess.check_output(['ping', '-c 1', '-W {}'.format(int(timeout)), adr]) + cmd = 'ping -c 1 -W {} {} '.format(int(timeout), adr) + log.debug("[{}/{}]call: {}", i+1, retry, cmd) + subprocess.check_output(cmd, shell = True) except subprocess.CalledProcessError as e: # on exception, resume with loop + log.warning("CalledProcessError on ping with message: {}", e) continue else: # no exception, ping was successful, return without error + log.debug("ping was succesfull") return - # no early return happend, ping was never successful, raise error + # no early return happend, ping was never successful, raise error + log.error("ping failed after {} retries", retry) raise JMHostNotReachableError("could not reach host '{}'\nping error reads: {}".format(adr, e.output)) -def proxy_operation_decorator_python3(proxy, operation, verbose=1, identifier='', reconnect_wait=2, reconnect_tries=3): - identifier = mod_id(identifier) +def proxy_operation_decorator_python3(proxy, operation, reconnect_wait=2, reconnect_tries=3, log=default_log, ping_timeout=2, ping_retry=5): o = getattr(proxy, operation) dest = address_authkey_from_proxy(proxy) def _operation(*args, **kwargs): c = 0 while True: - check_if_host_is_reachable_unix_ping(adr=dest[0][0]) - - if verbose > 1: - print("{}establish connection to {}".format(identifier, dest)) + check_if_host_is_reachable_unix_ping(adr = dest[0][0], + timeout = ping_timeout, + retry = ping_retry, + log = log) + log.debug("establish connection to {}", dest) try: proxy._connect() except Exception as e: - if verbose > 0: - print("{}establishing connection to {} FAILED due to '{}'".format(identifier, dest, type(e))) - print(traceback.format_stack()[-3].strip()) - print("{}wait {} seconds and retry".format(identifier, reconnect_wait)) + log.warning("establishing connection to {} FAILED due to '{}'", dest, type(e)) + log.debug("show traceback.format_stack()[-3]\n{}", traceback.format_stack()[-3].strip()) + log.info("wait {} seconds and retry", reconnect_wait) c += 1 if c > reconnect_tries: - if verbose > 0: - print("{}reached maximum number of reconnect tries, raise exception".format(identifier)) + log.error("reached maximum number of reconnect tries {}, raise exception", reconnect_tries) raise e time.sleep(reconnect_wait) continue - if verbose > 1: - print("{}execute operation '{}' -> {}".format(identifier, operation, dest)) + log.debug("execute operation '{}' -> {}", operation, dest) try: res = o(*args, **kwargs) except Exception as e: - if verbose > 0: - print("{}operation '{}' -> {} FAILED due to '{}'".format(identifier, operation, dest, type(e))) - print(traceback.format_stack()[-3].strip()) - + log.warning("operation '{}' -> {} FAILED due to '{}'", operation, dest, type(e)) + log.debug("show traceback.format_stack()[-3]\n{}", traceback.format_stack()[-3].strip()) if type(e) is ConnectionResetError: - if verbose > 0: - traceback.print_exc(limit=1) - print("{}wait {} seconds and retry".format(identifier, reconnect_wait)) + log.debug("show traceback.print_exc(limit=1))") + log.debug(traceback.print_exc(limit=1)) + c += 1 if c > reconnect_tries: - if verbose > 0: - print("{}reached maximum number of reconnect tries, raise exception".format(identifier)) + log.error("reached maximum number of reconnect tries {}", reconnect_tries) raise e + + log.info("wait {} seconds and retry", reconnect_wait) time.sleep(reconnect_wait) continue elif type(e) is BrokenPipeError: - handler_broken_pipe_error(e, verbose) + handler_broken_pipe_error(e, log) elif type(e) is EOFError: - handler_eof_error(e, verbose) + handler_eof_error(e, log) else: - handler_unexpected_error(e, verbose) - else: # SUCCESS -> return True - if verbose > 1: - print("{}operation '{}' successfully executed".format(identifier, operation)) + handler_unexpected_error(e, log) + else: # SUCCESS -> return True + log.debug("operation '{}' successfully executed", operation) return res - if verbose > 1: - print("{}close connection to {}".format(identifier, dest)) + log.debug("close connection to {}", dest) try: proxy._tls.connection.close() except Exception as e: - if verbose > 0: - print("{}closeing connection to {} FAILED due to {}".format(identifier, dest, type(e))) - print(traceback.format_stack()[-3].strip()) + log.error("closeing connection to {} FAILED due to {}", dest, type(e)) + log.info("show traceback.format_stack()[-3]\n{}", traceback.format_stack()[-3].strip()) return _operation -def proxy_operation_decorator_python2(proxy, operation, verbose=1, identifier='', reconnect_wait=2, reconnect_tries=3): - identifier = mod_id(identifier) +def proxy_operation_decorator_python2(proxy, operation, reconnect_wait=2, reconnect_tries=3, log=default_log, ping_timeout=2, ping_retry=5): o = getattr(proxy, operation) dest = address_authkey_from_proxy(proxy) def _operation(*args, **kwargs): c = 0 while True: - check_if_host_is_reachable_unix_ping(adr=dest[0]) - - if verbose > 1: - print("{}establish connection to {}".format(identifier, dest)) + check_if_host_is_reachable_unix_ping(adr = dest[0][0], + timeout = ping_timeout, + retry = ping_retry, + log = log) + log.debug("establish connection to {}", dest) try: - o._connect() + proxy._connect() except Exception as e: - if verbose > 0: - print("{}establishing connection to {} FAILED due to '{}'".format(identifier, dest, type(e))) - print(traceback.format_stack()[-3].strip()) - print("wait {} seconds and retry".format(reconnect_wait)) + log.warning("establishing connection to {} FAILED due to '{}'", dest, type(e)) + log.debug("show traceback.format_stack()[-3]\n{}", traceback.format_stack()[-3].strip()) + log.info("wait {} seconds and retry", reconnect_wait) c += 1 if c > reconnect_tries: - if verbose > 0: - print("reached maximum number of reconnect tries, raise exception") + log.error("reached maximum number of reconnect tries {}, raise exception", reconnect_tries) raise e time.sleep(reconnect_wait) continue - if verbose > 1: - print("{}execute operation '{}' -> {}".format(identifier, operation, dest)) + log.debug("execute operation '{}' -> {}", operation, dest) try: res = o(*args, **kwargs) except Exception as e: - if verbose > 0: - print("{}operation '{}' -> {} FAILED due to '{}'".format(identifier, operation, dest, type(e))) - print(traceback.format_stack()[-3].strip()) - + log.warning("operation '{}' -> {} FAILED due to '{}'", operation, dest, type(e)) + log.debug("show traceback.format_stack()[-3]\n{}", traceback.format_stack()[-3].strip()) + if type(e) is IOError: - if verbose > 0: - print("{} with args {}".format(type(e), e.args)) + log.debug("{} with args {}", type(e), e.args) err_code = e.args[0] if err_code == errno.ECONNRESET: # ... when the destination hangs up on us - if verbose > 0: - print("wait {} seconds and retry".format(reconnect_wait)) + log.debug("show traceback.print_exc(limit=1))") + log.debug(traceback.print_exc(limit=1)) + c += 1 if c > reconnect_tries: - if verbose > 0: - print("reached maximum number of reconnect tries, raise exception") + log.error("reached maximum number of reconnect tries {}", reconnect_tries) raise e + + log.info("wait {} seconds and retry", reconnect_wait) time.sleep(reconnect_wait) continue else: - handler_unexpected_error(e, verbose) + handler_unexpected_error(e, log) elif type(e) is BrokenPipeError: - handler_broken_pipe_error(e, verbose) + handler_broken_pipe_error(e, log) elif type(e) is EOFError: - handler_eof_error(e, verbose) + handler_eof_error(e, log) else: - handler_unexpected_error(e, verbose) - else: # SUCCESS -> return True - if verbose > 1: - print("{}operation '{}' successfully executed".format(identifier, operation)) + handler_unexpected_error(e, log) + else: # SUCCESS -> return True + log.debug("operation '{}' successfully executed", operation) return res - - if verbose > 1: - print("{}close connection to {}".format(identifier, dest)) + + log.debug("close connection to {}", dest) try: - o._tls.connection.close() + proxy._tls.connection.close() except Exception as e: - if verbose > 0: - print("{}closeing connection to {} FAILED due to {}".format(identifier, dest, type(e))) - print(traceback.format_stack()[-3].strip()) + log.error("closeing connection to {} FAILED due to {}", dest, type(e)) + log.info("show traceback.format_stack()[-3]\n{}", traceback.format_stack()[-3].strip()) return _operation -# def proxy_operation_decorator_python2(proxy, operation, verbose=1, identifier='', reconnect_wait=2, reconnect_tries=3): -# identifier = mod_id(identifier) -# o = getattr(proxy, operation) -# dest = address_authkey_from_proxy(proxy) -# -# def _operation(*args, **kwargs): -# while True: -# try: # here we try to put new data to the queue -# if verbose > 1: -# print("{}execute operation '{}' -> {}".format(identifier, operation, dest)) -# res = o(*args, **kwargs) -# -# except Exception as e: -# if verbose > 0: -# print("{}operation '{}' -> {} FAILED due to '{}'".format(identifier, operation, dest, type(e))) -# print(traceback.format_stack()[-3].strip()) -# -# if type(e) is IOError: -# if verbose > 0: -# print("{} with args {}".format(type(e), e.args)) -# err_code = e.args[0] -# if err_code == errno.ECONNRESET: # ... when the destination hangs up on us -# if verbose > 0: -# traceback.print_exc(limit=1) -# print("{}try to reconnect".format(identifier)) -# call_connect(proxy._connect, dest, verbose, identifier, reconnect_wait, reconnect_tries) -# else: -# handler_unexpected_error(e, verbose) -# -# elif type(e) is BrokenPipeError: -# handler_broken_pipe_error(e, verbose) -# elif type(e) is EOFError: -# handler_eof_error(e, verbose) -# else: -# handler_unexpected_error(e, verbose) -# -# else: # SUCCESS -> return True -# if verbose > 1: -# print("{}operation '{}' successfully executed".format(identifier, operation)) -# return res -# -# return _operation - proxy_operation_decorator = proxy_operation_decorator_python2 if sys.version_info[0] == 2 else proxy_operation_decorator_python3 def setup_SIG_handler_manager(): diff --git a/jobmanager/progress.py b/jobmanager/progress.py deleted file mode 100644 index b9235f5..0000000 --- a/jobmanager/progress.py +++ /dev/null @@ -1,1571 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- -from __future__ import division, print_function - -import copy -import datetime -import math -import multiprocessing as mp -import signal -import subprocess as sp -import sys -import time -import traceback -import os -import warnings - -try: - from shutil import get_terminal_size as shutil_get_terminal_size -except ImportError: - shutil_get_terminal_size = None - -if sys.version_info[0] == 2: - old_math_ceil = math.ceil - def my_int_ceil(f): - return int(old_math_ceil(f)) - - math.ceil = my_int_ceil - -# Magic conversion from 3 to 2 -if sys.version_info[0] == 2: - _jm_compatible_bytearray = lambda x: x -else: - _jm_compatible_bytearray = bytearray - - -class Loop(object): - """ - class to run a function periodically an seperate process. - - In case the called function returns True, the loop will stop. - Otherwise a time interval given by interval will be slept before - another execution is triggered. - - The shared memory variable _run (accessible via the class property run) - also determines if the function if executed another time. If set to False - the execution stops. - - For safe cleanup (and in order to catch any Errors) - it is advisable to instantiate this class - using 'with' statement as follows: - - with Loop(**kwargs) as my_loop: - my_loop.start() - ... - - this will guarantee you that the spawned loop process is - down when exiting the 'with' scope. - - The only circumstance where the process is still running is - when you set auto_kill_on_last_resort to False and answer the - question to send SIGKILL with no. - """ - def __init__(self, - func, - args=(), - interval = 1, - verbose=0, - sigint='stop', - sigterm='stop', - name=None, - auto_kill_on_last_resort=False): - """ - func [callable] - function to be called periodically - - args [tuple] - arguments passed to func when calling - - intervall [pos number] - time to "sleep" between each call - - verbose [pos integer] - specifies the level of verbosity - [0--silent, 1--important information, 2--some debug info] - - sigint [string] - signal handler string to set SIGINT behavior (see below) - - sigterm [string] - signal handler string to set SIGTERM behavior (see below) - - name [string] - use this name in messages instead of the PID - - auto_kill_on_last_resort [bool] - If set False (default), ask user to send SIGKILL - to loop process in case normal stop and SIGTERM failed. If set True, send SIDKILL - without asking. - - the signal handler string may be one of the following - ing: ignore the incoming signal - stop: set the shared memory boolean to false -> prevents the loop from - repeating -> subprocess terminates when func returns and sleep time interval - has passed. - """ - self.func = func - self.args = args - self.interval = interval - self._run = mp.Value('b', False) - self._pause = mp.Value('b', False) - self.verbose = verbose - self._proc = None - self._sigint = sigint - self._sigterm = sigterm - self._name = name - self._auto_kill_on_last_resort = auto_kill_on_last_resort - - if not hasattr(self, '_identifier'): - self._identifier = None - - - def __enter__(self): - return self - - def __exit__(self, *exc_args): - # normal exit - if not self.is_alive(): - if self.verbose > 1: - print("{}: has stopped on context exit".format(self._identifier)) - return - - # loop still runs on context exit -> __cleanup - if self.verbose > 1: - print("{}: is still running on context exit".format(self._identifier)) - self.__cleanup() - - def __cleanup(self): - """ - Wait at most twice as long as the given repetition interval - for the _wrapper_function to terminate. - - If after that time the _wrapper_function has not terminated, - send SIGTERM to and the process. - - Wait at most five times as long as the given repetition interval - for the _wrapper_function to terminate. - - If the process still running send SIGKILL automatically if - auto_kill_on_last_resort was set True or ask the - user to confirm sending SIGKILL - """ - # set run to False and wait some time -> see what happens - self.run = False - if check_process_termination(proc=self._proc, - identifier=self._identifier, - timeout=2*self.interval, - verbose=self.verbose, - auto_kill_on_last_resort=self._auto_kill_on_last_resort): - if self.verbose > 1: - print("{}: cleanup successful".format(self._identifier)) - self._proc = None - self._identifier = get_identifier(self._name, 'not started') - else: - raise RuntimeError("{}: cleanup FAILED!".format(self._identifier)) - - - @staticmethod - def _wrapper_func(func, args, shared_mem_run, shared_mem_pause, interval, verbose, sigint, sigterm, name): - """to be executed as a separate process (that's why this functions is declared static) - """ - # implement the process specific signal handler - identifier = get_identifier(name) - SIG_handler_Loop(shared_mem_run, sigint, sigterm, identifier, verbose) - - while shared_mem_run.value: - # in pause mode, simply sleep - if shared_mem_pause.value: - quit_loop = False - else: - # if not pause mode -> call func and see what happens - try: - quit_loop = func(*args) - except: - err, val, trb = sys.exc_info() - print(ESC_NO_CHAR_ATTR, end='') - sys.stdout.flush() - if verbose > 0: - print("{}: error {} occurred in Loop class calling 'func(*args)'".format(identifier, err)) - traceback.print_exc() - return - - if quit_loop is True: - return - - time.sleep(interval) - - if verbose > 1: - print("{}: _wrapper_func terminates gracefully".format(identifier)) - - def start(self): - """ - uses multiprocess Process to call _wrapper_func in subprocess - """ - - if self.is_alive(): - if self.verbose > 0: - print("{}: is already running".format(self._identifier)) - return - - self.run = True - self._proc = mp.Process(target = Loop._wrapper_func, - args = (self.func, self.args, self._run, self._pause, self.interval, - self.verbose, self._sigint, self._sigterm, self._name), - name=self._name) - self._proc.start() - self._identifier = get_identifier(self._name, self.getpid()) - if self.verbose > 1: - print("{}: started as new loop process".format(self._identifier)) - - def stop(self): - """ - stops the process triggered by start - - Setting the shared memory boolean run to false, which should prevent - the loop from repeating. Call __cleanup to make sure the process - stopped. After that we could trigger start() again. - """ - #self.run = False# - if self.is_alive(): - self._proc.terminate() - else: - if self.verbose > 0: - print("PID None: there is no running loop to stop") - return - - self.__cleanup() - - - def join(self, timeout): - """ - calls join for the spawned process with given timeout - """ - if self.is_alive(): - self._proc.join(timeout) - - def getpid(self): - """ - return the process id of the spawned process - """ - return self._proc.pid - - def is_alive(self): - if self._proc == None: - return False - else: - return self._proc.is_alive() - - def pause(self): - if not self.run: - if self.verbose > 0: - print("{} is not running -> can not pause".format(self._identifier)) - - if self._pause.value == True: - if self.verbose > 1: - print("{} is already in pause mode!".format(self._identifier)) - self._pause.value = True - - def resume(self): - if not self.run: - if self.verbose > 0: - print("{} is not running -> can not resume".format(self._identifier)) - - if self._pause.value == False: - if self.verbose > 1: - print("{} is not in pause mode -> can not resume!".format(self._identifier)) - - self._pause.value = False - - @property - def run(self): - """ - makes the shared memory boolean accessible as class attribute - - Set run False, the loop will stop repeating. - Calling start, will set run True and start the loop again as a new process. - """ - return self._run.value - @run.setter - def run(self, run): - self._run.value = run - - - -class Progress(Loop): - """ - Abstract Progress Loop - - Uses Loop class to implement a repeating function to display the progress - of multiples processes. - - In the simplest case, given only a list of shared memory values 'count' (NOTE: - a single value will be automatically mapped to a one element list), - the total elapses time (TET) and a speed estimation are calculated for - each process and passed to the display function show_stat. - - This functions needs to be implemented in a subclass. It is intended to show - the progress of ONE process in one line. - - When max_count is given (in general as list of shared memory values, a single - shared memory value will be mapped to a one element list) the time to go TTG - will also be calculated and passed tow show_stat. - - Information about the terminal width will be retrieved when setting width='auto'. - If supported by the terminal emulator the width in characters of the terminal - emulator will be stored in width and also passed to show_stat. - Otherwise, a default width of 80 characters will be chosen. - - Also you can specify a fixed width by setting width to the desired number. - - NOTE: in order to achieve multiline progress special escape sequences are used - which may not be supported by all terminal emulators. - - example: - - c1 = UnsignedIntValue(val=0) - c2 = UnsignedIntValue(val=0) - - c = [c1, c2] - prepend = ['c1: ', 'c2: '] - with ProgressCounter(count=c, prepend=prepend) as sc: - sc.start() - while True: - i = np.random.randint(0,2) - with c[i].get_lock(): - c[i].value += 1 - - if c[0].value == 1000: - break - - time.sleep(0.01) - - using start() within the 'with' statement ensures that the subprocess - which is started by start() in order to show the progress repeatedly - will be terminated on context exit. Otherwise one has to make sure - that at some point the stop() routine is called. When dealing with - signal handling and exception this might become tricky, so that the - use of the 'with' statement is highly encouraged. - """ - def __init__(self, - count, - max_count = None, - prepend = None, - width = 'auto', - speed_calc_cycles = 10, - interval = 1, - verbose = 0, - sigint = 'stop', - sigterm = 'stop', - name = 'progress', - info_line = None): - """ - count [mp.Value] - shared memory to hold the current state, (list or single value) - - max_count [mp.Value] - shared memory holding the final state, (None, list or single value), - may be changed by external process without having to explicitly tell this class. - If None, no TTG and relative progress can be calculated -> TTG = None - - prepend [string] - string to put in front of the progress output, (None, single string - or of list of strings) - - interval [int] - seconds to wait between progress print - - speed_calc_cycles [int] - use the current (time, count) as - well as the (old_time, old_count) read by the show_stat function - speed_calc_cycles calls before to calculate the speed as follows: - s = count - old_count / (time - old_time) - - verbose, sigint, sigterm -> see loop class - """ - self.name = name - self._identifier = get_identifier(self.name, pid='not started') - - try: - for c in count: - assert isinstance(c, mp.sharedctypes.Synchronized), "each element of 'count' must be if the type multiprocessing.sharedctypes.Synchronized" - self.is_multi = True - except TypeError: - assert isinstance(count, mp.sharedctypes.Synchronized), "'count' must be if the type multiprocessing.sharedctypes.Synchronized" - self.is_multi = False - count = [count] - - self.len = len(count) - - if max_count is not None: - if self.is_multi: - try: - for m in max_count: - assert isinstance(m, mp.sharedctypes.Synchronized), "each element of 'max_count' must be if the type multiprocessing.sharedctypes.Synchronized" - except TypeError: - raise TypeError("'max_count' must be iterable") - else: - assert isinstance(max_count, mp.sharedctypes.Synchronized), "'max_count' must be of the type multiprocessing.sharedctypes.Synchronized" - max_count = [max_count] - else: - max_count = [None] * self.len - - self.start_time = [] - self.speed_calc_cycles = speed_calc_cycles - - self.width = width - - self.q = [] - self.prepend = [] - self.lock = [] - self.last_count = [] - self.last_old_count = [] - self.last_old_time = [] - for i in range(self.len): - self.q.append(myQueue()) # queue to save the last speed_calc_cycles - # (time, count) information to calculate speed - self.last_count.append(UnsignedIntValue()) - self.last_old_count.append(UnsignedIntValue()) - self.last_old_time.append(FloatValue()) - self.lock.append(mp.Lock()) - self.start_time.append(FloatValue(val=time.time())) - if prepend is None: - # no prepend given - self.prepend.append('') - else: - try: - # assume list of prepend, (needs to be a sequence) - # except if prepend is an instance of string - # the assert will cause the except to be executed - assert not isinstance(prepend, str) - self.prepend.append(prepend[i]) - except: - # list fails -> assume single prepend for all - self.prepend.append(prepend) - - self.max_count = max_count # list of multiprocessing value type - self.count = count # list of multiprocessing value type - - self.interval = interval - self.verbose = verbose - - self.show_on_exit = False - self.add_args = {} - - self.info_line = info_line - - # setup loop class with func - super(Progress, self).__init__(func=Progress.show_stat_wrapper_multi, - args=(self.count, - self.last_count, - self.start_time, - self.max_count, - self.speed_calc_cycles, - self.width, - self.q, - self.last_old_count, - self.last_old_time, - self.prepend, - self.__class__.show_stat, - self.len, - self.add_args, - self.lock, - self.info_line), - interval=interval, - verbose=verbose, - sigint=sigint, - sigterm=sigterm, - name=name, - auto_kill_on_last_resort=True) - - def __exit__(self, *exc_args): - self.stop() - - - @staticmethod - def _calc(count, - last_count, - start_time, - max_count, - speed_calc_cycles, - q, - last_old_count, - last_old_time, - lock): - """ - do the pre calculations in order to get TET, speed, TTG - and call the actual display routine show_stat with these arguments - - NOTE: show_stat is purely abstract and need to be reimplemented to - achieve a specific progress display. - """ - count_value = count.value - start_time_value = start_time.value - current_time = time.time() - - if last_count.value != count_value: - # some progress happened - - with lock: - # save current state (count, time) to queue - - q.put((count_value, current_time)) - - # get older state from queue (or initial state) - # to to speed estimation - if q.qsize() > speed_calc_cycles: - old_count_value, old_time = q.get() - else: - old_count_value, old_time = 0, start_time_value - - last_count.value = count_value - last_old_count.value = old_count_value - last_old_time.value = old_time - else: - # progress has not changed since last call - # use also old (cached) data from the queue - old_count_value, old_time = last_old_count.value, last_old_time.value - - if (max_count is None): - max_count_value = None - else: - max_count_value = max_count.value - - tet = (current_time - start_time_value) - speed = (count_value - old_count_value) / (current_time - old_time) - if (speed == 0) or (max_count_value is None) or (max_count_value == 0): - ttg = None - else: - ttg = math.ceil((max_count_value - count_value) / speed) - - return count_value, max_count_value, speed, tet, ttg - - def _reset_all(self): - """ - reset all progress information - """ - for i in range(self.len): - self._reset_i(i) - - def _reset_i(self, i): - """ - reset i-th progress information - """ - self.count[i].value=0 - self.lock[i].acquire() - for x in range(self.q[i].qsize()): - self.q[i].get() - - self.lock[i].release() - self.start_time[i].value = time.time() - - def _show_stat(self): - """ - convenient functions to call the static show_stat_wrapper_multi with - the given class members - """ - Progress.show_stat_wrapper_multi(self.count, - self.last_count, - self.start_time, - self.max_count, - self.speed_calc_cycles, - self.width, - self.q, - self.last_old_count, - self.last_old_time, - self.prepend, - self.__class__.show_stat, - self.len, - self.add_args, - self.lock, - self.info_line, - no_move_up=True) - - def reset(self, i = None): - """ - convenient function to reset progress information - - i [None, int] - None: reset all, int: reset process indexed by i - """ -# super(Progress, self).stop() - if i is None: - self._reset_all() - else: - self._reset_i(i) -# super(Progress, self).start() - - @staticmethod - def show_stat(count_value, max_count_value, prepend, speed, tet, ttg, width, **kwargs): - """ - re implement this function in a subclass - - count_value - current value of progress - - max_count_value - maximum value the progress can take - - prepend - some extra string to be put for example in front of the - progress display - - speed - estimated speed in counts per second (use for example humanize_speed - to get readable information in string format) - - tet - total elapsed time in seconds (use for example humanize_time - to get readable information in string format) - - ttg - time to go in seconds (use for example humanize_time - to get readable information in string format) - """ - raise NotImplementedError - - @staticmethod - def show_stat_wrapper(count, - last_count, - start_time, - max_count, - speed_calc_cycles, - width, - q, - last_old_count, - last_old_time, - prepend, - show_stat_function, - add_args, - i, - lock): - count_value, max_count_value, speed, tet, ttg, = Progress._calc(count, - last_count, - start_time, - max_count, - speed_calc_cycles, - q, - last_old_count, - last_old_time, - lock) - return show_stat_function(count_value, max_count_value, prepend, speed, tet, ttg, width, i, **add_args) - - @staticmethod - def show_stat_wrapper_multi(count, - last_count, - start_time, - max_count, - speed_calc_cycles, - width, - q, - last_old_count, - last_old_time, - prepend, - show_stat_function, - len_, - add_args, - lock, - info_line, - no_move_up=False): - """ - call the static method show_stat_wrapper for each process - """ -# print(ESC_BOLD, end='') -# sys.stdout.flush() - for i in range(len_): - Progress.show_stat_wrapper(count[i], - last_count[i], - start_time[i], - max_count[i], - speed_calc_cycles, - width, - q[i], - last_old_count[i], - last_old_time[i], - prepend[i], - show_stat_function, - add_args, - i, - lock[i]) - n = len_ - if info_line is not None: - s = info_line.value.decode('utf-8') - s = s.split('\n') - n += len(s) - for si in s: - if width == 'auto': - width = get_terminal_width() - if len(si) > width: - si = si[:width] - print("{0:<{1}}".format(si, width)) - - if no_move_up: - n = 0 - - print(ESC_MOVE_LINE_UP(n) + ESC_NO_CHAR_ATTR, end='') - sys.stdout.flush() - - def start(self): - # before printing any output to stout, we can now check this - # variable to see if any other ProgressBar has reserved that - # terminal. - - if (self.__class__.__name__ in TERMINAL_PRINT_LOOP_CLASSES): - if not terminal_reserve(progress_obj=self, verbose=self.verbose, identifier=self._identifier): - if self.verbose > 1: - print("{}: tty already reserved, NOT starting the progress loop!".format(self._identifier)) - return - - super(Progress, self).start() - self.show_on_exit = True - - def stop(self, make_sure_its_down = False): - """ - trigger clean up by hand, needs to be done when not using - context management via 'with' statement - - - will terminate loop process - - show a last progress -> see the full 100% on exit - - releases terminal reservation - """ - self._auto_kill_on_last_resort = make_sure_its_down - - super(Progress, self).stop() - terminal_unreserve(progress_obj=self, verbose=self.verbose, identifier=self._identifier) - - if self.show_on_exit: - self._show_stat() - print() - self.show_on_exit = False - - -class ProgressBar(Progress): - """ - implements a progress bar similar to the one known from 'wget' or 'pv' - """ - def __init__(self, - count, - max_count=None, - width='auto', - prepend=None, - speed_calc_cycles=10, - interval=1, - verbose=0, - sigint='stop', - sigterm='stop', - name='progress_bar', - info_line=None): - """ - width [int/'auto'] - the number of characters used to show the Progress bar, - use 'auto' to determine width from terminal information -> see _set_width - """ - super(ProgressBar, self).__init__(count=count, - max_count=max_count, - prepend=prepend, - speed_calc_cycles=speed_calc_cycles, - width=width, - interval=interval, - verbose = verbose, - sigint=sigint, - sigterm=sigterm, - name=name, - info_line=info_line) - - self._PRE_PREPEND = ESC_NO_CHAR_ATTR + ESC_RED - self._POST_PREPEND = ESC_BOLD + ESC_GREEN - - @staticmethod - def show_stat(count_value, max_count_value, prepend, speed, tet, ttg, width, i, **kwargs): - if max_count_value is None: - # only show current absolute progress as number and estimated speed - print("{}{}{}{} [{}] #{} ".format(ESC_NO_CHAR_ATTR + ESC_RED, - prepend, - ESC_BOLD + ESC_GREEN, - humanize_time(tet), humanize_speed(speed), count_value)) - else: - if width == 'auto': - width = get_terminal_width() - - # deduce relative progress and show as bar on screen - if ttg is None: - s3 = "] TTG --" - else: - s3 = "] TTG {}".format(humanize_time(ttg)) - - s1 = "{}{}{}{} [{}] [".format(ESC_NO_CHAR_ATTR + ESC_RED, - prepend, - ESC_BOLD + ESC_GREEN, - humanize_time(tet), - humanize_speed(speed)) - - l = len_string_without_ESC(s1+s3) - - if max_count_value != 0: - l2 = width - l - 1 - a = int(l2 * count_value / max_count_value) - b = l2 - a - s2 = "="*a + ">" + " "*b - else: - s2 = " "*(width - l) - - print(s1+s2+s3) - - -class ProgressBarCounter(Progress): - """ - records also the time of each reset and calculates the speed - of the resets. - - shows the TET since init (not effected by reset) - the speed of the resets (number of finished processed per time) - and the number of finished processes - - after that also show a progress of each process - max_count > 0 and not None -> bar - max_count == None -> absolute count statistic - max_count == 0 -> hide process statistic at all - """ - def __init__(self, - count, - max_count=None, - prepend=None, - speed_calc_cycles_bar=10, - speed_calc_cycles_counter=5, - width='auto', - interval=1, - verbose=0, - sigint='stop', - sigterm='stop', - name='progress_bar_counter', - info_line=None): - - super(ProgressBarCounter, self).__init__(count=count, - max_count=max_count, - prepend=prepend, - speed_calc_cycles=speed_calc_cycles_bar, - width=width, - interval=interval, - verbose = verbose, - sigint=sigint, - sigterm=sigterm, - name=name, - info_line=info_line) - - self.counter_count = [] - self.counter_q = [] - self.counter_speed = [] - for i in range(self.len): - self.counter_count.append(UnsignedIntValue(val=0)) - self.counter_q.append(myQueue()) - self.counter_speed.append(FloatValue()) - - self.counter_speed_calc_cycles = speed_calc_cycles_counter - self.init_time = time.time() - - self.add_args['counter_count'] = self.counter_count - self.add_args['counter_speed'] = self.counter_speed - self.add_args['init_time'] = self.init_time - - def get_counter_count(self, i=0): - return self.counter_count[i].value - - def _reset_i(self, i): - c = self.counter_count[i] - with c.get_lock(): - c.value += 1 - - count_value = c.value - q = self.counter_q[i] - - current_time = time.time() - q.put((count_value, current_time)) - - if q.qsize() > self.counter_speed_calc_cycles: - old_count_value, old_time = q.get() - else: - old_count_value, old_time = 0, self.init_time - - speed = (count_value - old_count_value) / (current_time - old_time) - - self.counter_speed[i].value = speed - - super(ProgressBarCounter, self)._reset_i(i) - - @staticmethod - def show_stat(count_value, max_count_value, prepend, speed, tet, ttg, width, i, **kwargs): - counter_count = kwargs['counter_count'][i] - counter_speed = kwargs['counter_speed'][i] - counter_tet = time.time() - kwargs['init_time'] - - s_c = "{}{}{}{} [{}] #{}".format(ESC_NO_CHAR_ATTR + ESC_RED, - prepend, - ESC_BOLD + ESC_GREEN, - humanize_time(counter_tet), - humanize_speed(counter_speed.value), - counter_count.value) - - if width == 'auto': - width = get_terminal_width() - - if max_count_value != 0: - s_c += ' - ' - - if max_count_value is None: - s_c = "{}{} [{}] #{} ".format(s_c, humanize_time(tet), humanize_speed(speed), count_value) - else: - if ttg is None: - s3 = "] TTG --" - else: - s3 = "] TTG {}".format(humanize_time(ttg)) - - s1 = "{} [{}] [".format(humanize_time(tet), humanize_speed(speed)) - - l = len_string_without_ESC(s1 + s3 + s_c) - l2 = width - l - 1 - - a = int(l2 * count_value / max_count_value) - b = l2 - a - s2 = "="*a + ">" + " "*b - s_c = s_c+s1+s2+s3 - - print(s_c + ' '*(width - len_string_without_ESC(s_c))) - -class ProgressBarFancy(Progress): - """ - implements a progress bar where the color indicates the current status - similar to the bars known from 'htop' - """ - def __init__(self, - count, - max_count=None, - width='auto', - prepend=None, - speed_calc_cycles=10, - interval=1, - verbose=0, - sigint='stop', - sigterm='stop', - name='progress_bar', - info_line=None): - """ - width [int/'auto'] - the number of characters used to show the Progress bar, - use 'auto' to determine width from terminal information -> see _set_width - """ - if not self.__class__.__name__ in TERMINAL_PRINT_LOOP_CLASSES: - TERMINAL_PRINT_LOOP_CLASSES.append(self.__class__.__name__) - - super(ProgressBarFancy, self).__init__(count=count, - max_count=max_count, - prepend=prepend, - speed_calc_cycles=speed_calc_cycles, - width=width, - interval=interval, - verbose = verbose, - sigint=sigint, - sigterm=sigterm, - name=name, - info_line=info_line) - - @staticmethod - def get_d(s1, s2, width, lp, lps): - d = width-len(remove_ESC_SEQ_from_string(s1))-len(remove_ESC_SEQ_from_string(s2))-2-lp-lps - if d >= 0: - d1 = d // 2 - d2 = d - d1 - return s1, s2, d1, d2 - - @staticmethod - def full_stat(p, tet, speed, ttg, eta, ort, repl_ch, width, lp, lps): - s1 = "TET {} {:>12} TTG {}".format(tet, speed, ttg) - s2 = "ETA {} ORT {}".format(eta, ort) - return ProgressBarFancy.get_d(s1, s2, width, lp, lps) - - - @staticmethod - def full_minor_stat(p, tet, speed, ttg, eta, ort, repl_ch, width, lp, lps): - s1 = "E {} {:>12} G {}".format(tet, speed, ttg) - s2 = "A {} O {}".format(eta, ort) - return ProgressBarFancy.get_d(s1, s2, width, lp, lps) - - @staticmethod - def reduced_1_stat(p, tet, speed, ttg, eta, ort, repl_ch, width, lp, lps): - s1 = "E {} {:>12} G {}".format(tet, speed, ttg) - s2 = "O {}".format(ort) - return ProgressBarFancy.get_d(s1, s2, width, lp, lps) - - @staticmethod - def reduced_2_stat(p, tet, speed, ttg, eta, ort, repl_ch, width, lp, lps): - s1 = "E {} G {}".format(tet, ttg) - s2 = "O {}".format(ort) - return ProgressBarFancy.get_d(s1, s2, width, lp, lps) - - @staticmethod - def reduced_3_stat(p, tet, speed, ttg, eta, ort, repl_ch, width, lp, lps): - s1 = "E {} G {}".format(tet, ttg) - s2 = '' - return ProgressBarFancy.get_d(s1, s2, width, lp, lps) - - @staticmethod - def reduced_4_stat(p, tet, speed, ttg, eta, ort, repl_ch, width, lp, lps): - s1 = '' - s2 = '' - return ProgressBarFancy.get_d(s1, s2, width, lp, lps) - - @staticmethod - def kw_bold(s, ch_after): - kws = ['TET', 'TTG', 'ETA', 'ORT', 'E', 'G', 'A', 'O'] - for kw in kws: - for c in ch_after: - s = s.replace(kw + c, ESC_BOLD + kw + ESC_RESET_BOLD + c) - - return s - - @staticmethod - def _stat(count_value, max_count_value, prepend, speed, tet, ttg, width, i, **kwargs): - if max_count_value is None: - # only show current absolute progress as number and estimated speed - stat = "{}{} [{}] #{} ".format(prepend, humanize_time(tet), humanize_speed(speed), count_value) - else: - if width == 'auto': - width = get_terminal_width() - # deduce relative progress - try: - p = count_value / max_count_value - except ZeroDivisionError: - p = 1 - if p < 1: - ps = " {:.1%} ".format(p) - else: - ps = " {:.0%} ".format(p) - - if ttg is None: - eta = '--' - ort = None - else: - eta = datetime.datetime.fromtimestamp(time.time() + ttg).strftime("%Y%m%d_%H:%M:%S") - ort = tet + ttg - - tet = humanize_time(tet) - speed = '['+humanize_speed(speed)+']' - ttg = humanize_time(ttg) - ort = humanize_time(ort) - repl_ch = '-' - lp = len(prepend) - - args = p, tet, speed, ttg, eta, ort, repl_ch, width, lp, len(ps) - - res = ProgressBarFancy.full_stat(*args) - if res is None: - res = ProgressBarFancy.full_minor_stat(*args) - if res is None: - res = ProgressBarFancy.reduced_1_stat(*args) - if res is None: - res = ProgressBarFancy.reduced_2_stat(*args) - if res is None: - res = ProgressBarFancy.reduced_3_stat(*args) - if res is None: - res = ProgressBarFancy.reduced_4_stat(*args) - - if res is not None: - s1, s2, d1, d2 = res - s = s1 + ' '*d1 + ps + ' '*d2 + s2 - - s_before = s[:math.ceil(width*p)].replace(' ', repl_ch) - if (len(s_before) > 0) and (s_before[-1] == repl_ch): - s_before = s_before[:-1] + '>' - s_after = s[math.ceil(width*p):] - - s_before = ProgressBarFancy.kw_bold(s_before, ch_after=[repl_ch, '>']) - s_after = ProgressBarFancy.kw_bold(s_after, ch_after=[' ']) - stat = prepend + ESC_BOLD + '[' + ESC_RESET_BOLD + ESC_LIGHT_GREEN + s_before + ESC_DEFAULT + s_after + ESC_BOLD + ']' + ESC_NO_CHAR_ATTR - else: - ps = ps.strip() - if p == 1: - ps = ' '+ps - stat = prepend + ps - - return stat - - @staticmethod - def show_stat(count_value, max_count_value, prepend, speed, tet, ttg, width, i, **kwargs): - stat = ProgressBarFancy._stat(count_value, max_count_value, prepend, speed, tet, ttg, width, i, **kwargs) - print(stat) - -class ProgressBarCounterFancy(ProgressBarCounter): - @staticmethod - def show_stat(count_value, max_count_value, prepend, speed, tet, ttg, width, i, **kwargs): - counter_count = kwargs['counter_count'][i] - counter_speed = kwargs['counter_speed'][i] - counter_tet = time.time() - kwargs['init_time'] - - s_c = "{}{}{}{} {:>12} #{}".format(ESC_RED, - prepend, - ESC_NO_CHAR_ATTR, - humanize_time(counter_tet), - '['+humanize_speed(counter_speed.value)+']', - counter_count.value) - - if width == 'auto': - width = get_terminal_width() - - if max_count_value != 0: - s_c += ' ' - - if max_count_value is None: - s_c = "{}{} {:>12} #{} ".format(s_c, humanize_time(tet), '['+humanize_speed(speed)+']', count_value) - else: - _width = width - len_string_without_ESC(s_c) - s_c += ProgressBarFancy._stat(count_value, max_count_value, '', speed, tet, ttg, _width, i) - - print(s_c + ' '*(width - len_string_without_ESC(s_c))) - - -class ProgressSilentDummy(Progress): - def __init__(self, **kwargs): - raise DeprecationWarning("do not use this dummy class, if you want a silent progress, simply do not trigger 'start'!") - pass - - def __exit__(self, *exc_args): - pass - - def start(self): - pass - - def _reset_i(self, i): - pass - - def reset(self, i): - pass - - def _reset_all(self): - pass - - def stop(self): - pass - - def pause(self): - pass - - def resume(self): - pass - - -class SIG_handler_Loop(object): - """class to setup signal handling for the Loop class - - Note: each subprocess receives the default signal handling from it's parent. - If the signal function from the module signal is evoked within the subprocess - this default behavior can be overwritten. - - The init function receives a shared memory boolean object which will be set - false in case of signal detection. Since the Loop class will check the state - of this boolean object before each repetition, the loop will stop when - a signal was receives. - """ - def __init__(self, shared_mem_run, sigint, sigterm, identifier, verbose=0): - self.shared_mem_run = shared_mem_run - self.set_signal(signal.SIGINT, sigint) - self.set_signal(signal.SIGTERM, sigterm) - self.verbose=verbose - self.identifier = identifier - if self.verbose > 1: - print("{}: setup signal handler for loop (SIGINT:{}, SIGTERM:{})".format(self.identifier, sigint, sigterm)) - - def set_signal(self, sig, handler_str): - if handler_str == 'ign': - signal.signal(sig, self._ignore_signal) - elif handler_str == 'stop': - signal.signal(sig, self._stop_on_signal) - else: - raise TypeError("unknown signal hander string '{}'".format(handler_str)) - - def _ignore_signal(self, signal, frame): - pass - - def _stop_on_signal(self, signal, frame): - if self.verbose > 1: - print("{}: received sig {} -> set run false".format(self.identifier, signal_dict[signal])) - self.shared_mem_run.value = False - - -# class ProgressCounter(Progress): -# """ -# simple Progress counter, not using the max_count information -# """ -# def __init__(self, -# count, -# max_count=None, -# prepend=None, -# speed_calc_cycles=10, -# width='auto', -# interval=1, -# verbose=0, -# sigint='stop', -# sigterm='stop', -# name='progress_counter'): -# -# super(ProgressCounter, self).__init__(count=count, -# max_count=max_count, -# prepend=prepend, -# speed_calc_cycles=speed_calc_cycles, -# width=width, -# interval=interval, -# verbose = verbose, -# sigint=sigint, -# sigterm=sigterm, -# name=name) -# -# @staticmethod -# def show_stat(count_value, max_count_value, prepend, speed, tet, ttg, width, i, **kwargs): -# if max_count_value is not None: -# max_count_str = "/{}".format(max_count_value) -# else: -# max_count_value = count_value + 1 -# max_count_str = "" -# -# s = "{}{} [{}{}] ({})".format(prepend, humanize_time(tet), count_value, max_count_str, humanize_speed(speed)) -# print(s) - - -def ESC_MOVE_LINE_UP(n): - return "\033[{}A".format(n) - - -def ESC_MOVE_LINE_DOWN(n): - return "\033[{}B".format(n) - - -def FloatValue(val=0.): - return mp.Value('d', val, lock=True) - - -def UnsignedIntValue(val=0): - return mp.Value('I', val, lock=True) - - -def StringValue(num_of_bytes): - return mp.Array('c', _jm_compatible_bytearray(num_of_bytes), lock=True) - - -def check_process_termination(proc, identifier, timeout, verbose=0, auto_kill_on_last_resort = False): - proc.join(timeout) - if not proc.is_alive(): - if verbose > 1: - print("{}: loop termination within given timeout of {}s SUCCEEDED!".format(identifier, timeout)) - return True - - # process still runs -> send SIGTERM -> see what happens - if verbose > 0: - print("{}: loop termination within given timeout of {}s FAILED!".format(identifier, timeout)) - - proc.terminate() - - new_timeout = 3*timeout - proc.join(new_timeout) - if not proc.is_alive(): - if verbose > 0: - print("{}: loop termination via SIGTERM with timeout of {}s SUCCEEDED!".format(identifier, new_timeout)) - return True - - if verbose > 0: - print("{}: loop termination via SIGTERM with timeout of {}s FAILED!".format(identifier, new_timeout)) - - answer = 'y' if auto_kill_on_last_resort else '_' - while True: - if answer == 'y': - print("{}: send SIGKILL to".format(identifier)) - os.kill(proc.pid, signal.SIGKILL) - time.sleep(0.1) - - if not proc.is_alive(): - print("{}: has stopped running!".format(identifier)) - return True - else: - print("{}: still running!".format(identifier)) - - answer = '_' - while not answer in 'yn': - print("Do you want to send SIGKILL to '{}'? [y/n]: ".format(identifier), end='') - sys.stdout.flush() - answer = sys.stdin.readline()[:-1] - - - if answer == 'n': - while not answer in 'yn': - print("Do you want let the process '{}' running? [y/n]: ".format(identifier), end='') - sys.stdout.flush() - answer = sys.stdin.readline()[:-1] - if answer == 'y': - print("{}: keeps running".format(identifier)) - return False - - -def get_identifier(name=None, pid=None, bold=True): - if pid == None: - pid = os.getpid() - - if bold: - esc_bold = ESC_BOLD - esc_no_char_attr = ESC_NO_CHAR_ATTR - else: - esc_bold = "" - esc_no_char_attr = "" - - if name == None: - return "{}PID {}{}".format(esc_bold, pid, esc_no_char_attr) - else: - return "{}{} ({}){}".format(esc_bold, name, pid, esc_no_char_attr) - - -def get_terminal_size(defaultw=80): - """ Checks various methods to determine the terminal size - - - Methods: - - shutil.get_terminal_size (only Python3) - - fcntl.ioctl - - subprocess.check_output - - os.environ - - Parameters - ---------- - defaultw : int - Default width of terminal. - - - Returns - ------- - width, height : int - Width and height of the terminal. If one of them could not be - found, None is return in its place. - - """ - if hasattr(shutil_get_terminal_size, "__call__"): - return shutil_get_terminal_size() - else: - try: - import fcntl, termios, struct - fd = 0 - hw = struct.unpack('hh', fcntl.ioctl(fd, termios.TIOCGWINSZ, - '1234')) - return (hw[1], hw[0]) - except: - try: - out = sp.check_output(["tput", "cols"]) - width = int(out.decode("utf-8").strip()) - return (width, None) - except: - try: - hw = (os.environ['LINES'], os.environ['COLUMNS']) - return (hw[1], hw[0]) - except: - return (defaultw, None) - - -def get_terminal_width(default=80, name=None, verbose=0): - id = get_identifier(name=name) - try: - width = get_terminal_size(defaultw=default)[0] - except: - width = default - if verbose > 1: - print("{}: use terminal width {}".format(id, width)) - return width - - -def humanize_speed(c_per_sec): - """convert a speed in counts per second to counts per [s, min, h, d], choosing the smallest value greater zero. - """ - scales = [60, 60, 24] - units = ['c/s', 'c/min', 'c/h', 'c/d'] - speed = c_per_sec - i = 0 - if speed > 0: - while (speed < 1) and (i < len(scales)): - speed *= scales[i] - i += 1 - - return "{:.1f}{}".format(speed, units[i]) - - -def humanize_time(secs): - """convert second in to hh:mm:ss format - """ - if secs is None: - return '--' - - mins, secs = divmod(secs, 60) - hours, mins = divmod(mins, 60) - return '{:02d}:{:02d}:{:02d}'.format(int(hours), int(mins), int(secs)) - - - -def len_string_without_ESC(s): - return len(remove_ESC_SEQ_from_string(s)) - - -def printQueue(q, lock=None): - if lock is not None: - lock.acquire() - - res = [] - for i in range(q.qsize()): - item = q.get() - res.append(copy.deepcopy(item[0])) - q.put(item) - - if lock is not None: - lock.release() - - print(res) - - -def remove_ESC_SEQ_from_string(s): - for esc_seq in ESC_SEQ_SET: - s = s.replace(esc_seq, '') - return s - - -def terminal_reserve(progress_obj, terminal_obj=None, verbose=0, identifier=None): - """ Registers the terminal (stdout) for printing. - - Useful to prevent multiple processes from writing progress bars - to stdout. - - One process (server) prints to stdout and a couple of subprocesses - do not print to the same stdout, because the server has reserved it. - Of course, the clients have to be nice and check with - terminal_reserve first if they should (not) print. - Nothing is locked. - - Returns - ------- - True if reservation was successful (or if we have already reserved this tty), - False if there already is a reservation from another instance. - """ - if terminal_obj is None: - terminal_obj = sys.stdout - - if identifier is None: - identifier = '' - else: - identifier = identifier + ': ' - - if terminal_obj in TERMINAL_RESERVATION: # terminal was already registered - if verbose > 1: - print("{}this terminal {} has already been added to reservation list".format(identifier, terminal_obj)) - - if TERMINAL_RESERVATION[terminal_obj] is progress_obj: - if verbose > 1: - print("{}we {} have already reserved this terminal {}".format(identifier, progress_obj, terminal_obj)) - return True - else: - if verbose > 1: - print("{}someone else {} has already reserved this terminal {}".format(identifier, TERMINAL_RESERVATION[terminal_obj], terminal_obj)) - return False - else: # terminal not yet registered - if verbose > 1: - print("{}terminal {} was reserved for us {}".format(identifier, terminal_obj, progress_obj)) - TERMINAL_RESERVATION[terminal_obj] = progress_obj - return True - - -def terminal_unreserve(progress_obj, terminal_obj=None, verbose=0, identifier=None): - """ Unregisters the terminal (stdout) for printing. - - an instance (progress_obj) can only unreserve the tty (terminal_obj) when it also reserved it - - see terminal_reserved for more information - - Returns - ------- - None - """ - - if terminal_obj is None: - terminal_obj =sys.stdout - - if identifier is None: - identifier = '' - else: - identifier = identifier + ': ' - - po = TERMINAL_RESERVATION.get(terminal_obj) - if po is None: - if verbose > 1: - print("{}terminal {} was not reserved, nothing happens".format(identifier, terminal_obj)) - else: - if po is progress_obj: - if verbose > 1: - print("{}terminal {} now unreserned".format(identifier, terminal_obj)) - del TERMINAL_RESERVATION[terminal_obj] - else: - if verbose > 1: - print("{}you {} can NOT unreserve terminal {} be cause it was reserved by {}".format(identifier, progress_obj, terminal_obj, po)) - - -myQueue = mp.Queue - -# a mapping from the numeric values of the signals to their names used in the -# standard python module signals -signal_dict = {} -for s in dir(signal): - if s.startswith('SIG') and s[3] != '_': - n = getattr(signal, s) - if n in signal_dict: - signal_dict[n] += ('/'+s) - else: - signal_dict[n] = s - -ESC_NO_CHAR_ATTR = "\033[0m" - -ESC_BOLD = "\033[1m" -ESC_DIM = "\033[2m" -ESC_UNDERLINED = "\033[4m" -ESC_BLINK = "\033[5m" -ESC_INVERTED = "\033[7m" -ESC_HIDDEN = "\033[8m" - -# not widely supported, use '22' instead -# ESC_RESET_BOLD = "\033[21m" - -ESC_RESET_DIM = "\033[22m" -ESC_RESET_BOLD = ESC_RESET_DIM - -ESC_RESET_UNDERLINED = "\033[24m" -ESC_RESET_BLINK = "\033[25m" -ESC_RESET_INVERTED = "\033[27m" -ESC_RESET_HIDDEN = "\033[28m" - -ESC_DEFAULT = "\033[39m" -ESC_BLACK = "\033[30m" -ESC_RED = "\033[31m" -ESC_GREEN = "\033[32m" -ESC_YELLOW = "\033[33m" -ESC_BLUE = "\033[34m" -ESC_MAGENTA = "\033[35m" -ESC_CYAN = "\033[36m" -ESC_LIGHT_GREY = "\033[37m" -ESC_DARK_GREY = "\033[90m" -ESC_LIGHT_RED = "\033[91m" -ESC_LIGHT_GREEN = "\033[92m" -ESC_LIGHT_YELLOW = "\033[93m" -ESC_LIGHT_BLUE = "\033[94m" -ESC_LIGHT_MAGENTA = "\033[95m" -ESC_LIGHT_CYAN = "\033[96m" -ESC_WHITE = "\033[97m" - -ESC_SEQ_SET = [ESC_NO_CHAR_ATTR, - ESC_BOLD, - ESC_DIM, - ESC_UNDERLINED, - ESC_BLINK, - ESC_INVERTED, - ESC_HIDDEN, - ESC_RESET_BOLD, - ESC_RESET_DIM, - ESC_RESET_UNDERLINED, - ESC_RESET_BLINK, - ESC_RESET_INVERTED, - ESC_RESET_HIDDEN, - ESC_DEFAULT, - ESC_BLACK, - ESC_RED, - ESC_GREEN, - ESC_YELLOW, - ESC_BLUE, - ESC_MAGENTA, - ESC_CYAN, - ESC_LIGHT_GREY, - ESC_DARK_GREY, - ESC_LIGHT_RED, - ESC_LIGHT_GREEN, - ESC_LIGHT_YELLOW, - ESC_LIGHT_BLUE, - ESC_LIGHT_MAGENTA, - ESC_LIGHT_CYAN, - ESC_WHITE] - -# terminal reservation list, see terminal_reserve -TERMINAL_RESERVATION = {} -# these are classes that print progress bars, see terminal_reserve -TERMINAL_PRINT_LOOP_CLASSES = ["ProgressBar", "ProgressBarCounter"] diff --git a/tests/test_jobmanager.py b/tests/test_jobmanager.py index cb42f94..724341f 100644 --- a/tests/test_jobmanager.py +++ b/tests/test_jobmanager.py @@ -166,12 +166,12 @@ def test_jobmanager_basic(): global PORT PORT += 1 n = 10 - p_server = mp.Process(target=start_server, args=(n,)) + p_server = mp.Process(target=start_server, args=(n,False,2)) p_server.start() time.sleep(1) - p_client = mp.Process(target=start_client) + p_client = mp.Process(target=start_client, args=(2,)) p_client.start() p_client.join(30) @@ -911,7 +911,7 @@ if __name__ == "__main__": test_jobmanager_basic, # test_jobmanager_server_signals, # test_shutdown_server_while_client_running, - test_shutdown_client, +# test_shutdown_client, # test_check_fail, # test_jobmanager_read_old_stat, # test_client_status, diff --git a/tests/test_progress.py b/tests/test_progress.py deleted file mode 100644 index 2b9904a..0000000 --- a/tests/test_progress.py +++ /dev/null @@ -1,815 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- -from __future__ import division, print_function - -import multiprocessing as mp -import numpy as np -import os -import warnings - -try: - import psutil -except ImportError: - warnings.warn("can not import 'psutil' -> some tests will not work") - -import signal -import sys -import time -import traceback - -from os.path import abspath, dirname, split - -# Add parent directory to beginning of path variable -sys.path = [split(dirname(abspath(__file__)))[0]] + sys.path - -from jobmanager import progress - -def _safe_assert_not_loop_is_alive(loop): - try: - assert not loop.is_alive() - except AssertionError: - os.kill(loop.getpid(), signal.SIGKILL) - raise - - -def test_loop_basic(): - """ - run function f in loop - - check if it is alive after calling start() - check if it is NOT alive after calling stop() - """ - f = lambda: print(" I'm process {}".format(os.getpid())) - loop = progress.Loop(func=f, interval=0.8, verbose=2) - loop.start() - - time.sleep(1) - - assert loop.is_alive() - print("[+] loop started") - - time.sleep(1) - - loop.stop() - - _safe_assert_not_loop_is_alive(loop) - print("[+] loop stopped") - -def test_loop_signals(): - f = lambda: print(" I'm process {}".format(os.getpid())) - loop = progress.Loop(func=f, interval=0.8, verbose=2, sigint='stop', sigterm='stop') - - print("## stop on SIGINT ##") - loop.start() - time.sleep(1) - loop.is_alive() - - pid = loop.getpid() - print(" send SIGINT") - os.kill(pid, signal.SIGINT) - time.sleep(1) - _safe_assert_not_loop_is_alive(loop) - print("[+] loop stopped running") - - print("## stop on SIGTERM ##") - loop.start() - time.sleep(1) - pid = loop.getpid() - print(" send SIGTERM") - os.kill(pid, signal.SIGTERM) - time.sleep(1) - _safe_assert_not_loop_is_alive(loop) - print("[+] loop stopped running") - - print("## ignore SIGINT ##") - loop = progress.Loop(func=f, interval=0.8, verbose=0, sigint='ign', sigterm='ign') - - loop.start() - time.sleep(1) - pid = loop.getpid() - os.kill(pid, signal.SIGINT) - print(" send SIGINT") - time.sleep(1) - assert loop.is_alive() - print("[+] loop still running") - print(" send SIGKILL") - os.kill(pid, signal.SIGKILL) - time.sleep(1) - assert not loop.is_alive() - print("[+] loop stopped running") - - print("## ignore SIGTERM ##") - loop.start() - time.sleep(1) - pid = loop.getpid() - print(" send SIGTERM") - os.kill(pid, signal.SIGTERM) - time.sleep(1) - assert loop.is_alive() - print("[+] loop still running") - print(" send SIGKILL") - os.kill(pid, signal.SIGKILL) - time.sleep(1) - assert not loop.is_alive() - print("[+] loop stopped running") - - -def non_stopping_function(): - print(" I'm pid", os.getpid()) - print(" I'm NOT going to stop") - - while True: # sleep will be interrupted by signal - time.sleep(1) # while True just calls sleep again - # only SIGKILL helps - - -def normal_function(): - print(" I'm pid", os.getpid()) - -def long_sleep_function(): - print(" I'm pid", os.getpid()) - print(" I will sleep for seven years") - time.sleep(60*60*12*356*7) - -def test_loop_normal_stop(): - with progress.Loop(func = normal_function, - verbose = 2, - name = 'loop') as loop: - loop.start() - time.sleep(0.1) - assert loop.is_alive() - print("[+] normal loop running") - - _safe_assert_not_loop_is_alive(loop) - print("[+] normal loop stopped") - -def test_loop_need_sigterm_to_stop(): - with progress.Loop(func = long_sleep_function, - verbose = 2, - name = 'loop') as loop: - loop.start() - time.sleep(0.1) - assert loop.is_alive() - print("[+] sleepy loop running") - - _safe_assert_not_loop_is_alive(loop) - print("[+] sleepy loop stopped") - -def test_loop_need_sigkill_to_stop(): - with progress.Loop(func = non_stopping_function, - verbose = 2, - name = 'loop', - auto_kill_on_last_resort = True) as loop: - loop.start() - time.sleep(0.1) - assert loop.is_alive() - print("[+] NON stopping loop running") - - _safe_assert_not_loop_is_alive(loop) - print("[+] NON stopping loop stopped") - -def test_why_with_statement(): - """ - here we demonstrate why you should use the with statement - """ - class ErrorLoop(progress.Loop): - def raise_error(self): - raise RuntimeError("on purpose error") - v=2 - - def t(shared_mem_pid): - l = ErrorLoop(func=normal_function, verbose=v) - l.start() - time.sleep(0.2) - shared_mem_pid.value = l.getpid() - l.raise_error() - l.stop() - - def t_with(shared_mem_pid): - with ErrorLoop(func=normal_function, verbose=v) as l: - l.start() - time.sleep(0.2) - shared_mem_pid.value = l.getpid() - l.raise_error() - l.stop() - - print("## start without with statement ...") - - # the pid of the loop process, which is spawned inside 't' - subproc_pid = progress.UnsignedIntValue() - - p = mp.Process(target=t, args=(subproc_pid, )) - p.start() - time.sleep(0.3) - print("## now an exception gets raised ... but you don't see it!") - time.sleep(3) - print("## ... and the loop is still running so we have to kill the process") - - p.terminate() - p.join(1) - - try: - assert not p.is_alive() - print("## ... done!") - p_sub = psutil.Process(subproc_pid.value) - if p_sub.is_running(): - print("## terminate loop process from extern ...") - p_sub.terminate() - - p_sub.wait(1) - assert not p_sub.is_running() - print("## process with PID {} terminated!".format(subproc_pid.value)) - except: - pass - else: - if p_sub.is_running(): - os.kill(subproc_pid.value, signal.SIGKILL) - finally: - if p.is_alive(): - os.kill(p.pid, signal.SIGKILL) - - - time.sleep(3) - - print("\n##\n## now to the same with the with statement ...") - p = mp.Process(target=t_with, args=(subproc_pid, )) - p.start() - - time.sleep(3) - print("## no special care must be taken ... cool eh!") - - print("## ALL DONE! (there is no control when the exception from the loop get printed)") - - p.join(1) - - try: - assert not p.is_alive() - finally: - if p.is_alive(): - # kill loop - p_sub = psutil.Process(subproc_pid.value) - if p_sub.is_running(): - os.kill(subproc_pid.value, signal.SIGKILL) - # kill sub process - os.kill(p.pid, signal.SIGKILL) - - - -def test_progress_bar(): - """ - deprecated, due to missing with - """ - count = progress.UnsignedIntValue() - max_count = progress.UnsignedIntValue(100) - sb = progress.ProgressBar(count, max_count, verbose=2) - assert not sb.is_alive() - - sb.start() - time.sleep(2) - assert sb.is_alive() - pid = sb.getpid() - - # call start on already running PB - sb.start() - time.sleep(2) - assert pid == sb.getpid() - - sb.stop() - _safe_assert_not_loop_is_alive(sb) - - time.sleep(2) - # call stop on not running PB - sb.stop() - time.sleep(2) - -def test_progress_bar_with_statement(): - print("TERMINAL_RESERVATION", progress.TERMINAL_RESERVATION) - count = progress.UnsignedIntValue() - max_count = progress.UnsignedIntValue(100) - with progress.ProgressBar(count, max_count, verbose=2) as sb: - assert not sb.is_alive() - - sb.start() - time.sleep(0.2) - assert sb.is_alive() - pid = sb.getpid() - - # call start on already running PB - sb.start() - time.sleep(0.2) - assert pid == sb.getpid() - - _safe_assert_not_loop_is_alive(sb) - - time.sleep(0.2) - sb.stop() - -def test_progress_bar_multi(): - print("TERMINAL_RESERVATION", progress.TERMINAL_RESERVATION) - n = 4 - max_count_value = 100 - - count = [] - max_count = [] - prepend = [] - for i in range(n): - count.append(progress.UnsignedIntValue(0)) - max_count.append(progress.UnsignedIntValue(max_count_value)) - prepend.append('_{}_: '.format(i)) - - with progress.ProgressBar(count=count, - max_count=max_count, - interval=0.2, - speed_calc_cycles=10, - width='auto', - verbose=2, - sigint='stop', - sigterm='stop', - name='sb multi', - prepend=prepend) as sbm: - - sbm.start() - - for x in range(500): - i = np.random.randint(low=0, high=n) - with count[i].get_lock(): - count[i].value += 1 - - if count[i].value > 100: - sbm.reset(i) - - time.sleep(0.02) - - -def test_status_counter(): - c = progress.UnsignedIntValue(val=0) - m = None - - with progress.ProgressBar(count=c, - max_count=m, - interval=0.2, - speed_calc_cycles=100, - verbose=2, - sigint='ign', - sigterm='ign', - name='sc', - prepend='') as sc: - - sc.start() - while True: - with c.get_lock(): - c.value += 1 - - if c.value == 100: - break - - time.sleep(0.01) - -def test_status_counter_multi(): - c1 = progress.UnsignedIntValue(val=0) - c2 = progress.UnsignedIntValue(val=0) - - c = [c1, c2] - prepend = ['c1: ', 'c2: '] - with progress.ProgressBar(count=c, prepend=prepend, verbose=2) as sc: - sc.start() - while True: - i = np.random.randint(0,2) - with c[i].get_lock(): - c[i].value += 1 - - if c[0].value == 100: - break - - time.sleep(0.01) - -def test_intermediate_prints_while_running_progess_bar(): - c = progress.UnsignedIntValue(val=0) - try: - with progress.ProgressBar(count=c, verbose=2, interval=0.3) as sc: - sc.start() - while True: - with c.get_lock(): - c.value += 1 - - if c.value == 100: - sc.stop() - print("intermediate message") - sc.start() - - if c.value == 400: - break - - time.sleep(0.01) - except: - print("IN EXCEPTION TEST") - traceback.print_exc() - - -def test_intermediate_prints_while_running_progess_bar_multi(): - c1 = progress.UnsignedIntValue(val=0) - c2 = progress.UnsignedIntValue(val=0) - - c = [c1,c2] - with progress.ProgressBar(count=c, verbose=2, interval=0.3) as sc: - sc.start() - while True: - i = np.random.randint(0,2) - with c[i].get_lock(): - c[i].value += 1 - - if c[0].value == 100: - sc.stop() - print("intermediate message") - sc.start() - - if c[0].value == 400: - break - - time.sleep(0.01) - -def test_progress_bar_counter(): - c1 = progress.UnsignedIntValue(val=0) - c2 = progress.UnsignedIntValue(val=0) - - maxc = 30 - m1 = progress.UnsignedIntValue(val=maxc) - m2 = progress.UnsignedIntValue(val=maxc) - - c = [c1, c2] - m = [m1, m2] - - t0 = time.time() - - pp = ['a ', 'b '] - - with progress.ProgressBarCounter(count=c, max_count=m, verbose=1, interval=0.2, prepend = pp) as sc: - sc.start() - while True: - i = np.random.randint(0,2) - with c[i].get_lock(): - c[i].value += 1 - if c[i].value > maxc: - sc.reset(i) - - time.sleep(0.0432) - if (time.time() - t0) > 15: - break - -def test_progress_bar_counter_non_max(): - c1 = progress.UnsignedIntValue(val=0) - c2 = progress.UnsignedIntValue(val=0) - - c = [c1, c2] - maxc = 30 - t0 = time.time() - - with progress.ProgressBarCounter(count=c, verbose=1, interval=0.2) as sc: - sc.start() - while True: - i = np.random.randint(0,2) - with c[i].get_lock(): - c[i].value += 1 - if c[i].value > maxc: - sc.reset(i) - - time.sleep(0.0432) - if (time.time() - t0) > 15: - break - -def test_progress_bar_counter_hide_bar(): - c1 = progress.UnsignedIntValue(val=0) - c2 = progress.UnsignedIntValue(val=0) - - m1 = progress.UnsignedIntValue(val=0) - - c = [c1, c2] - m = [m1, m1] - maxc = 30 - t0 = time.time() - - with progress.ProgressBarCounter(count=c, max_count=m, verbose=1, interval=0.2) as sc: - sc.start() - while True: - i = np.random.randint(0,2) - with c[i].get_lock(): - c[i].value += 1 - if c[i].value > maxc: - sc.reset(i) - - time.sleep(0.0432) - if (time.time() - t0) > 15: - break - -def test_progress_bar_slow_change(): - max_count_value = 100 - - count = progress.UnsignedIntValue(0) - max_count = progress.UnsignedIntValue(max_count_value) - - with progress.ProgressBar(count=count, - max_count=max_count, - interval=0.1, - speed_calc_cycles=5) as sbm: - - sbm.start() - - for i in range(max_count_value): - time.sleep(1) - count.value = i - -def test_progress_bar_start_stop(): - max_count_value = 20 - - count = progress.UnsignedIntValue(0) - max_count = progress.UnsignedIntValue(max_count_value) - - with progress.ProgressBar(count=count, - max_count=max_count, - interval=0.5, - speed_calc_cycles=5, - verbose=1) as sbm: - - sbm.start() - - for i in range(max_count_value): - time.sleep(0.1) - count.value = i+1 - if i == 10: - sbm.stop(make_sure_its_down = True) - print("this will not overwrite the progressbar, because we stopped it explicitly") - sbm.start() - print("this WILL overwrite the progressbar, because we are still inside it's context (still running)") - - print() - print("create a progress bar, but do not start") - with progress.ProgressBar(count=count, - max_count=max_count, - interval=0.5, - speed_calc_cycles=5, - verbose=1) as sbm: - pass - print("this is after progress.__exit__, there should be no prints from the progress") - -def test_progress_bar_fancy(): - count = progress.UnsignedIntValue() - max_count = progress.UnsignedIntValue(100) - with progress.ProgressBarFancy(count, max_count, verbose=1, interval=0.2, width='auto') as sb: - sb.start() - for i in range(100): - count.value = i+1 - time.sleep(0.3) - -def test_progress_bar_multi_fancy(): - n = 4 - max_count_value = 100 - - count = [] - max_count = [] - prepend = [] - for i in range(n): - count.append(progress.UnsignedIntValue(0)) - max_count.append(progress.UnsignedIntValue(max_count_value)) - prepend.append('_{}_:'.format(i)) - - with progress.ProgressBarFancy(count=count, - max_count=max_count, - interval=0.2, - speed_calc_cycles=10, - width='auto', - verbose=2, - sigint='stop', - sigterm='stop', - name='sb multi', - prepend=prepend) as sbm: - - sbm.start() - - for x in range(500): - i = np.random.randint(low=0, high=n) - with count[i].get_lock(): - count[i].value += 1 - - if count[i].value > 100: - sbm.reset(i) - - time.sleep(0.02) - -def test_progress_bar_fancy_small(): - count = progress.UnsignedIntValue() - m = 15 - max_count = progress.UnsignedIntValue(m) - with progress.ProgressBarFancy(count, max_count, verbose=1, interval=0.2, width='auto') as sb: - sb.start() - for i in range(m): - count.value = i+1 - time.sleep(0.1) - - with progress.ProgressBarFancy(count, max_count, verbose=1, interval=0.2, width=80) as sb: - sb.start() - for i in range(m): - count.value = i+1 - time.sleep(0.1) - - with progress.ProgressBarFancy(count, max_count, verbose=1, interval=0.2, width=70) as sb: - sb.start() - for i in range(m): - count.value = i+1 - time.sleep(0.1) - - with progress.ProgressBarFancy(count, max_count, verbose=1, interval=0.2, width=60) as sb: - sb.start() - for i in range(m): - count.value = i+1 - time.sleep(0.1) - - with progress.ProgressBarFancy(count, max_count, verbose=1, interval=0.2, width=50) as sb: - sb.start() - for i in range(m): - count.value = i+1 - time.sleep(0.1) - - with progress.ProgressBarFancy(count, max_count, verbose=1, interval=0.2, width=40) as sb: - sb.start() - for i in range(m): - count.value = i+1 - time.sleep(0.1) - - with progress.ProgressBarFancy(count, max_count, verbose=1, interval=0.2, width=30) as sb: - sb.start() - for i in range(m): - count.value = i+1 - time.sleep(0.1) - - with progress.ProgressBarFancy(count, max_count, verbose=1, interval=0.2, width=20) as sb: - sb.start() - for i in range(m): - count.value = i+1 - time.sleep(0.1) - - with progress.ProgressBarFancy(count, max_count, verbose=1, interval=0.2, width=10) as sb: - sb.start() - for i in range(m): - count.value = i+1 - time.sleep(0.1) - - with progress.ProgressBarFancy(count, max_count, verbose=1, interval=0.2, width=5) as sb: - sb.start() - for i in range(m): - count.value = i+1 - time.sleep(0.1) - -def test_progress_bar_counter_fancy(): - c1 = progress.UnsignedIntValue(val=0) - c2 = progress.UnsignedIntValue(val=0) - - maxc = 30 - m1 = progress.UnsignedIntValue(val=maxc) - m2 = progress.UnsignedIntValue(val=maxc) - - c = [c1, c2] - m = [m1, m2] - - t0 = time.time() - - pp = ['a ', 'b '] - - with progress.ProgressBarCounterFancy(count=c, max_count=m, verbose=1, interval=0.2, prepend = pp) as sc: - sc.start() - while True: - i = np.random.randint(0,2) - with c[i].get_lock(): - c[i].value += 1 - if c[i].value > maxc: - sc.reset(i) - - time.sleep(0.0432) - if (time.time() - t0) > 15: - break - -def test_progress_bar_counter_fancy_non_max(): - c1 = progress.UnsignedIntValue(val=0) - c2 = progress.UnsignedIntValue(val=0) - - c = [c1, c2] - maxc = 30 - t0 = time.time() - - with progress.ProgressBarCounterFancy(count=c, verbose=1, interval=0.2) as sc: - sc.start() - while True: - i = np.random.randint(0,2) - with c[i].get_lock(): - c[i].value += 1 - if c[i].value > maxc: - sc.reset(i) - - time.sleep(0.0432) - if (time.time() - t0) > 15: - break - -def test_progress_bar_counter_fancy_hide_bar(): - c1 = progress.UnsignedIntValue(val=0) - c2 = progress.UnsignedIntValue(val=0) - - m1 = progress.UnsignedIntValue(val=0) - - c = [c1, c2] - m = [m1, m1] - maxc = 30 - t0 = time.time() - - with progress.ProgressBarCounterFancy(count=c, max_count=m, verbose=1, interval=0.2) as sc: - sc.start() - while True: - i = np.random.randint(0,2) - with c[i].get_lock(): - c[i].value += 1 - if c[i].value > maxc: - sc.reset(i) - - time.sleep(0.0432) - if (time.time() - t0) > 15: - break - -def test_info_line(): - c1 = progress.UnsignedIntValue(val=0) - s = progress.StringValue(80) - m1 = progress.UnsignedIntValue(val=30) - - with progress.ProgressBarFancy(count=c1, max_count=m1, verbose=1, interval=0.2, info_line=s) as sc: - sc.start() - while True: - c1.value = c1.value + 1 - if c1.value > 10: - s.value = b'info_line\nline2' - time.sleep(0.1) - if c1.value >= m1.value: - break - -def test_change_prepend(): - c1 = progress.UnsignedIntValue(val=0) - m1 = progress.UnsignedIntValue(val=30) - with progress.ProgressBarFancy(count=c1, max_count=m1, verbose=1, interval=0.2) as sc: - sc.start() - while True: - c1.value = c1.value + 1 - sc.prepend = [str(c1.value)] - time.sleep(0.1) - if c1.value >= m1.value: - break - -def test_stop_progress_with_large_interval(): - c1 = progress.UnsignedIntValue(val=0) - m1 = progress.UnsignedIntValue(val=10) - with progress.ProgressBarFancy(count=c1, max_count=m1, verbose=1, interval=3) as sc: - sc.start() - while True: - c1.value = c1.value + 1 - time.sleep(0.1) - if c1.value >= m1.value: - break - print("done inner loop") - print("done progress") - - -if __name__ == "__main__": - func = [ -# test_loop_basic, -# test_loop_signals, -# test_loop_normal_stop, -# test_loop_need_sigterm_to_stop, -# test_loop_need_sigkill_to_stop, -# test_why_with_statement, -# test_progress_bar, -# test_progress_bar_with_statement, -# test_progress_bar_multi, -# test_status_counter, -# test_status_counter_multi, -# test_intermediate_prints_while_running_progess_bar, -# test_intermediate_prints_while_running_progess_bar_multi, -# test_progress_bar_counter, -# test_progress_bar_counter_non_max, -# test_progress_bar_counter_hide_bar, -# test_progress_bar_slow_change, -# test_progress_bar_start_stop, -# test_progress_bar_fancy, -# test_progress_bar_multi_fancy, -# test_progress_bar_fancy_small, -# test_progress_bar_counter_fancy, -# test_progress_bar_counter_fancy_non_max, -# test_progress_bar_counter_fancy_hide_bar, -# test_info_line, -# test_change_prepend, - test_stop_progress_with_large_interval, - lambda: print("END") - ] - - for f in func: - print() - print('#'*80) - print('## {}'.format(f.__name__)) - print() - f() - time.sleep(1) -