Merge pull request #15 from cimatosa/reserve-tty

Reserve tty
This commit is contained in:
Paul Müller 2015-02-12 16:56:14 +01:00
commit 8283a9ade0
3 changed files with 468 additions and 362 deletions

View file

@ -153,6 +153,8 @@ class ProgressBar(object):
pb.start()
return self.func(**callargs)
class ProgressBarExtended(ProgressBar):
"""
extends the ProgressBar such that
@ -219,6 +221,7 @@ class ProgressBarExtended(ProgressBar):
return self.func(**callargs)
class ProgressBarOverrideCount(ProgressBar):
def __call__(self, *args, **kwargs):
""" Calls `func` - previously defined in `__init__`.

View file

@ -2,15 +2,16 @@
# -*- coding: utf-8 -*-
from __future__ import division, print_function
import copy
import math
import multiprocessing as mp
import os
import time
import traceback
import signal
import sys
import os
import math
import subprocess
import copy
import sys
import warnings
if sys.version_info[0] == 2:
# Python 2
@ -19,267 +20,6 @@ else:
# Python 3
import queue
myQueue = mp.Queue
# a mapping from the numeric values of the signals to their names used in the
# standard python module signals
signal_dict = {}
for s in dir(signal):
if s.startswith('SIG') and s[3] != '_':
n = getattr(signal, s)
if n in signal_dict:
signal_dict[n] += ('/'+s)
else:
signal_dict[n] = s
ESC_NO_CHAR_ATTR = "\033[0m"
ESC_BOLD = "\033[1m"
ESC_DIM = "\033[2m"
ESC_UNDERLINED = "\033[4m"
ESC_BLINK = "\033[5m"
ESC_INVERTED = "\033[7m"
ESC_HIDDEN = "\033[8m"
ESC_RESET_BOLD = "\033[21m"
ESC_RESET_DIM = "\033[22m"
ESC_RESET_UNDERLINED = "\033[24m"
ESC_RESET_BLINK = "\033[25m"
ESC_RESET_INVERTED = "\033[27m"
ESC_RESET_HIDDEN = "\033[28m"
ESC_DEFAULT = "\033[39m"
ESC_BLACK = "\033[30m"
ESC_RED = "\033[31m"
ESC_GREEN = "\033[32m"
ESC_YELLOW = "\033[33m"
ESC_BLUE = "\033[34m"
ESC_MAGENTA = "\033[35m"
ESC_CYAN = "\033[36m"
ESC_LIGHT_GREY = "\033[37m"
ESC_DARK_GREY = "\033[90m"
ESC_LIGHT_RED = "\033[91m"
ESC_LIGHT_GREEN = "\033[92m"
ESC_LIGHT_YELLOW = "\033[93m"
ESC_LIGHT_BLUE = "\033[94m"
ESC_LIGHT_MAGENTA = "\033[95m"
ESC_LIGHT_CYAN = "\033[96m"
ESC_WHITE = "\033[97m"
ESC_SEQ_SET = [ESC_NO_CHAR_ATTR,
ESC_BOLD,
ESC_DIM,
ESC_UNDERLINED,
ESC_BLINK,
ESC_INVERTED,
ESC_HIDDEN,
ESC_RESET_BOLD,
ESC_RESET_DIM,
ESC_RESET_UNDERLINED,
ESC_RESET_BLINK,
ESC_RESET_INVERTED,
ESC_RESET_HIDDEN,
ESC_DEFAULT,
ESC_BLACK,
ESC_RED,
ESC_GREEN,
ESC_YELLOW,
ESC_BLUE,
ESC_MAGENTA,
ESC_CYAN,
ESC_LIGHT_GREY,
ESC_DARK_GREY,
ESC_LIGHT_RED,
ESC_LIGHT_GREEN,
ESC_LIGHT_YELLOW,
ESC_LIGHT_BLUE,
ESC_LIGHT_MAGENTA,
ESC_LIGHT_CYAN,
ESC_WHITE]
def ESC_MOVE_LINE_UP(n):
return "\033[{}A".format(n)
def ESC_MOVE_LINE_DOWN(n):
return "\033[{}B".format(n)
def remove_ESC_SEQ_from_string(s):
for esc_seq in ESC_SEQ_SET:
s = s.replace(esc_seq, '')
return s
def len_string_without_ESC(s):
return len(remove_ESC_SEQ_from_string(s))
def humanize_time(secs):
"""convert second in to hh:mm:ss format
"""
mins, secs = divmod(secs, 60)
hours, mins = divmod(mins, 60)
return '{:02d}:{:02d}:{:02d}'.format(int(hours), int(mins), int(secs))
def humanize_speed(c_per_sec):
"""convert a speed in counts per second to counts per [s, min, h, d], choosing the smallest value greater zero.
"""
scales = [60, 60, 24]
units = ['c/s', 'c/min', 'c/h', 'c/d']
speed = c_per_sec
i = 0
if speed > 0:
while (speed < 1) and (i < len(scales)):
speed *= scales[i]
i += 1
return "{:.1f}{}".format(speed, units[i])
def UnsignedIntValue(val=0):
return mp.Value('I', val, lock=True)
def FloatValue(val=0.):
return mp.Value('d', val, lock=True)
def printQueue(q, lock=None):
if lock is not None:
lock.acquire()
res = []
for i in range(q.qsize()):
item = q.get()
res.append(copy.deepcopy(item[0]))
q.put(item)
if lock is not None:
lock.release()
print(res)
def check_process_termination(proc, identifier, timeout, verbose=0, auto_kill_on_last_resort = False):
proc.join(timeout)
if not proc.is_alive():
if verbose > 1:
print("{}: loop termination within given timeout of {}s SUCCEEDED!".format(identifier, timeout))
return True
# process still runs -> send SIGTERM -> see what happens
if verbose > 0:
print("{}: loop termination within given timeout of {}s FAILED!".format(identifier, timeout))
proc.terminate()
new_timeout = 3*timeout
proc.join(new_timeout)
if not proc.is_alive():
if verbose > 0:
print("{}: loop termination via SIGTERM with timeout of {}s SUCCEEDED!".format(identifier, new_timeout))
return True
if verbose > 0:
print("{}: loop termination via SIGTERM with timeout of {}s FAILED!".format(identifier, new_timeout))
answer = 'y' if auto_kill_on_last_resort else '_'
while True:
if answer == 'y':
print("{}: send SIGKILL to".format(identifier))
os.kill(proc.pid, signal.SIGKILL)
time.sleep(0.1)
if not proc.is_alive():
print("{}: has stopped running!".format(identifier))
return True
else:
print("{}: still running!".format(identifier))
answer = '_'
while not answer in 'yn':
print("Do you want to send SIGKILL to '{}'? [y/n]: ".format(identifier), end='')
sys.stdout.flush()
answer = sys.stdin.readline()[:-1]
if answer == 'n':
while not answer in 'yn':
print("Do you want let the process '{}' running? [y/n]: ".format(identifier), end='')
sys.stdout.flush()
answer = sys.stdin.readline()[:-1]
if answer == 'y':
print("{}: keeps running".format(identifier))
return False
def get_identifier(name=None, pid=None, bold=True):
if pid == None:
pid = os.getpid()
if bold:
esc_bold = ESC_BOLD
esc_no_char_attr = ESC_NO_CHAR_ATTR
else:
esc_bold = ""
esc_no_char_attr = ""
if name == None:
return "{}PID {}{}".format(esc_bold, pid, esc_no_char_attr)
else:
return "{}{} ({}){}".format(esc_bold, name, pid, esc_no_char_attr)
def get_terminal_width(default=80, name=None, verbose=0):
identifier = get_identifier(name=name)
try:
out = subprocess.check_output(["tput", "cols"])
width = int(out.decode("utf-8").strip())
if verbose > 1:
print("{}: determined terminal width to {}".format(identifier, width))
return width
except Exception as e:
if verbose > 0:
print("{}: failed to determine the width of the terminal".format(identifier))
if verbose > 1:
if isinstance(e, subprocess.CalledProcessError):
print("calling 'tput cols' returned: {}".format(e.output.decode('utf-8')))
traceback.print_exc()
return default
class SIG_handler_Loop(object):
"""class to setup signal handling for the Loop class
Note: each subprocess receives the default signal handling from it's parent.
If the signal function from the module signal is evoked within the subprocess
this default behavior can be overwritten.
The init function receives a shared memory boolean object which will be set
false in case of signal detection. Since the Loop class will check the state
of this boolean object before each repetition, the loop will stop when
a signal was receives.
"""
def __init__(self, shared_mem_run, sigint, sigterm, identifier, verbose=0):
self.shared_mem_run = shared_mem_run
self.set_signal(signal.SIGINT, sigint)
self.set_signal(signal.SIGTERM, sigterm)
self.verbose=verbose
self.identifier = identifier
if self.verbose > 1:
print("{}: setup signal handler for loop (SIGINT:{}, SIGTERM:{})".format(self.identifier, sigint, sigterm))
def set_signal(self, sig, handler_str):
if handler_str == 'ign':
signal.signal(sig, self._ignore_signal)
elif handler_str == 'stop':
signal.signal(sig, self._stop_on_signal)
else:
raise TypeError("unknown signal hander string '{}'".format(handler_str))
def _ignore_signal(self, signal, frame):
pass
def _stop_on_signal(self, signal, frame):
if self.verbose > 0:
print("{}: received sig {} -> set run false".format(self.identifier, signal_dict[signal]))
self.shared_mem_run.value = False
class Loop(object):
"""
@ -371,7 +111,6 @@ class Loop(object):
print("{}: is still running on context exit".format(self._identifier))
self.__cleanup()
def __cleanup(self):
"""
Wait at most twice as long as the given repetition interval
@ -398,7 +137,6 @@ class Loop(object):
print("{}: cleanup successful".format(self._identifier))
self._proc = None
@staticmethod
def _wrapper_func(func, args, shared_mem_run, shared_mem_pause, interval, verbose, sigint, sigterm, name):
"""to be executed as a seperate process (that's why this functions is declared static)
@ -475,7 +213,6 @@ class Loop(object):
if self.is_alive():
self._proc.join(timeout)
def getpid(self):
"""
return the process id of the spawned process
@ -522,6 +259,8 @@ class Loop(object):
def run(self, run):
self._run.value = run
class Progress(Loop):
"""
Abstract Progress Loop
@ -633,7 +372,6 @@ class Progress(Loop):
else:
max_count = [None] * self.len
self.start_time = []
self.speed_calc_cycles = speed_calc_cycles
@ -680,8 +418,22 @@ class Progress(Loop):
self.add_args = {}
# setup loop class
super(Progress, self).__init__(func=Progress.show_stat_wrapper_multi,
# before printing any output to stout, we can now check this
# variable to see if any other ProgressBar has reserved that
# terminal.
if (self.__class__.__name__ in TERMINAL_PRINT_LOOP_CLASSES):
self.terminal_reserved = terminal_reserve()
if not self.terminal_reserved:
warnings.warn("tty reserved, not printing progress!")
func = lambda x: None
self.show_on_exit = False
else:
func = Progress.show_stat_wrapper_multi
else:
self.terminal_reserved = False
# setup loop class with func
super(Progress, self).__init__(func=func,
args=(self.count,
self.last_count,
self.start_time,
@ -704,15 +456,19 @@ class Progress(Loop):
auto_kill_on_last_resort=True)
def __exit__(self, *exc_args):
"""
will terminate loop process
""" Tear things down
show a last progress -> see the full 100% on exit
- will terminate loop process
- show a last progress -> see the full 100% on exit
- releases terminal reservation
"""
super(Progress, self).__exit__(*exc_args)
if self.show_on_exit:
self._show_stat()
print('\n'*(self.len-1))
if self.terminal_reserved:
terminal_unreserve()
if self.show_on_exit:
self._show_stat()
print('\n'*(self.len-1))
def _show_stat(self):
"""
@ -733,6 +489,7 @@ class Progress(Loop):
self.len,
self.add_args,
self.lock)
@staticmethod
def show_stat_wrapper_multi(count,
last_count,
@ -788,7 +545,6 @@ class Progress(Loop):
NOTE: show_stat is purely abstract and need to be reimplemented to
achieve a specific progress display.
"""
count_value = count.value
start_time_value = start_time.value
current_time = time.time()
@ -899,7 +655,6 @@ class Progress(Loop):
verbose = self.verbose,
auto_kill_on_last_resort = True)
def _reset_all(self):
"""
reset all progress information
@ -919,7 +674,6 @@ class Progress(Loop):
self.lock[i].release()
self.start_time[i].value = time.time()
def reset(self, i = None):
"""
convenient function to reset progress information
@ -933,6 +687,7 @@ class Progress(Loop):
self._reset_i(i)
# super(Progress, self).start()
class ProgressBar(Progress):
"""
Implements a Progress bar (progress par) similar to the one known from 'wget'
@ -967,7 +722,6 @@ class ProgressBar(Progress):
sigterm=sigterm,
name=name)
@staticmethod
def show_stat(count_value, max_count_value, prepend, speed, tet, eta, width, i, **kwargs):
if max_count_value is None:
@ -994,43 +748,6 @@ class ProgressBar(Progress):
print(s1+s2+s3)
# class ProgressCounter(Progress):
# """
# simple Progress counter, not using the max_count information
# """
# def __init__(self,
# count,
# max_count=None,
# prepend=None,
# speed_calc_cycles=10,
# width='auto',
# interval=1,
# verbose=0,
# sigint='stop',
# sigterm='stop',
# name='progress_counter'):
#
# super(ProgressCounter, self).__init__(count=count,
# max_count=max_count,
# prepend=prepend,
# speed_calc_cycles=speed_calc_cycles,
# width=width,
# interval=interval,
# verbose = verbose,
# sigint=sigint,
# sigterm=sigterm,
# name=name)
#
# @staticmethod
# def show_stat(count_value, max_count_value, prepend, speed, tet, eta, width, i, **kwargs):
# if max_count_value is not None:
# max_count_str = "/{}".format(max_count_value)
# else:
# max_count_value = count_value + 1
# max_count_str = ""
#
# s = "{}{} [{}{}] ({})".format(prepend, humanize_time(tet), count_value, max_count_str, humanize_speed(speed))
# print(s)
class ProgressBarCounter(Progress):
"""
@ -1141,9 +858,9 @@ class ProgressBarCounter(Progress):
s2 = "="*a + ">" + " "*b
s_c = s_c+s1+s2+s3
print(s_c + ' '*(width - len_string_without_ESC(s_c)))
class ProgressSilentDummy(Progress):
def __init__(self, **kwargs):
pass
@ -1171,3 +888,362 @@ class ProgressSilentDummy(Progress):
def resume(self):
pass
class SIG_handler_Loop(object):
"""class to setup signal handling for the Loop class
Note: each subprocess receives the default signal handling from it's parent.
If the signal function from the module signal is evoked within the subprocess
this default behavior can be overwritten.
The init function receives a shared memory boolean object which will be set
false in case of signal detection. Since the Loop class will check the state
of this boolean object before each repetition, the loop will stop when
a signal was receives.
"""
def __init__(self, shared_mem_run, sigint, sigterm, identifier, verbose=0):
self.shared_mem_run = shared_mem_run
self.set_signal(signal.SIGINT, sigint)
self.set_signal(signal.SIGTERM, sigterm)
self.verbose=verbose
self.identifier = identifier
if self.verbose > 1:
print("{}: setup signal handler for loop (SIGINT:{}, SIGTERM:{})".format(self.identifier, sigint, sigterm))
def set_signal(self, sig, handler_str):
if handler_str == 'ign':
signal.signal(sig, self._ignore_signal)
elif handler_str == 'stop':
signal.signal(sig, self._stop_on_signal)
else:
raise TypeError("unknown signal hander string '{}'".format(handler_str))
def _ignore_signal(self, signal, frame):
pass
def _stop_on_signal(self, signal, frame):
if self.verbose > 0:
print("{}: received sig {} -> set run false".format(self.identifier, signal_dict[signal]))
self.shared_mem_run.value = False
# class ProgressCounter(Progress):
# """
# simple Progress counter, not using the max_count information
# """
# def __init__(self,
# count,
# max_count=None,
# prepend=None,
# speed_calc_cycles=10,
# width='auto',
# interval=1,
# verbose=0,
# sigint='stop',
# sigterm='stop',
# name='progress_counter'):
#
# super(ProgressCounter, self).__init__(count=count,
# max_count=max_count,
# prepend=prepend,
# speed_calc_cycles=speed_calc_cycles,
# width=width,
# interval=interval,
# verbose = verbose,
# sigint=sigint,
# sigterm=sigterm,
# name=name)
#
# @staticmethod
# def show_stat(count_value, max_count_value, prepend, speed, tet, eta, width, i, **kwargs):
# if max_count_value is not None:
# max_count_str = "/{}".format(max_count_value)
# else:
# max_count_value = count_value + 1
# max_count_str = ""
#
# s = "{}{} [{}{}] ({})".format(prepend, humanize_time(tet), count_value, max_count_str, humanize_speed(speed))
# print(s)
def ESC_MOVE_LINE_UP(n):
return "\033[{}A".format(n)
def ESC_MOVE_LINE_DOWN(n):
return "\033[{}B".format(n)
def FloatValue(val=0.):
return mp.Value('d', val, lock=True)
def UnsignedIntValue(val=0):
return mp.Value('I', val, lock=True)
def check_process_termination(proc, identifier, timeout, verbose=0, auto_kill_on_last_resort = False):
proc.join(timeout)
if not proc.is_alive():
if verbose > 1:
print("{}: loop termination within given timeout of {}s SUCCEEDED!".format(identifier, timeout))
return True
# process still runs -> send SIGTERM -> see what happens
if verbose > 0:
print("{}: loop termination within given timeout of {}s FAILED!".format(identifier, timeout))
proc.terminate()
new_timeout = 3*timeout
proc.join(new_timeout)
if not proc.is_alive():
if verbose > 0:
print("{}: loop termination via SIGTERM with timeout of {}s SUCCEEDED!".format(identifier, new_timeout))
return True
if verbose > 0:
print("{}: loop termination via SIGTERM with timeout of {}s FAILED!".format(identifier, new_timeout))
answer = 'y' if auto_kill_on_last_resort else '_'
while True:
if answer == 'y':
print("{}: send SIGKILL to".format(identifier))
os.kill(proc.pid, signal.SIGKILL)
time.sleep(0.1)
if not proc.is_alive():
print("{}: has stopped running!".format(identifier))
return True
else:
print("{}: still running!".format(identifier))
answer = '_'
while not answer in 'yn':
print("Do you want to send SIGKILL to '{}'? [y/n]: ".format(identifier), end='')
sys.stdout.flush()
answer = sys.stdin.readline()[:-1]
if answer == 'n':
while not answer in 'yn':
print("Do you want let the process '{}' running? [y/n]: ".format(identifier), end='')
sys.stdout.flush()
answer = sys.stdin.readline()[:-1]
if answer == 'y':
print("{}: keeps running".format(identifier))
return False
def get_identifier(name=None, pid=None, bold=True):
if pid == None:
pid = os.getpid()
if bold:
esc_bold = ESC_BOLD
esc_no_char_attr = ESC_NO_CHAR_ATTR
else:
esc_bold = ""
esc_no_char_attr = ""
if name == None:
return "{}PID {}{}".format(esc_bold, pid, esc_no_char_attr)
else:
return "{}{} ({}){}".format(esc_bold, name, pid, esc_no_char_attr)
def get_terminal_width(default=80, name=None, verbose=0):
identifier = get_identifier(name=name)
try:
out = subprocess.check_output(["tput", "cols"])
width = int(out.decode("utf-8").strip())
if verbose > 1:
print("{}: determined terminal width to {}".format(identifier, width))
return width
except Exception as e:
if verbose > 0:
print("{}: failed to determine the width of the terminal".format(identifier))
if verbose > 1:
if isinstance(e, subprocess.CalledProcessError):
print("calling 'tput cols' returned: {}".format(e.output.decode('utf-8')))
traceback.print_exc()
return default
def humanize_speed(c_per_sec):
"""convert a speed in counts per second to counts per [s, min, h, d], choosing the smallest value greater zero.
"""
scales = [60, 60, 24]
units = ['c/s', 'c/min', 'c/h', 'c/d']
speed = c_per_sec
i = 0
if speed > 0:
while (speed < 1) and (i < len(scales)):
speed *= scales[i]
i += 1
return "{:.1f}{}".format(speed, units[i])
def humanize_time(secs):
"""convert second in to hh:mm:ss format
"""
mins, secs = divmod(secs, 60)
hours, mins = divmod(mins, 60)
return '{:02d}:{:02d}:{:02d}'.format(int(hours), int(mins), int(secs))
def len_string_without_ESC(s):
return len(remove_ESC_SEQ_from_string(s))
def printQueue(q, lock=None):
if lock is not None:
lock.acquire()
res = []
for i in range(q.qsize()):
item = q.get()
res.append(copy.deepcopy(item[0]))
q.put(item)
if lock is not None:
lock.release()
print(res)
def remove_ESC_SEQ_from_string(s):
for esc_seq in ESC_SEQ_SET:
s = s.replace(esc_seq, '')
return s
def terminal_reserve():
""" Registers the terminal (stdout) for printing.
Useful to prevent multiple processes from writing progress bars
to stdout.
It is currently handled with a simple list.
One process (server) prints to stdout and a couple of subprocesses
do not print to the same stdout, because the server has reserved it.
Of course, the clients have to be nice and check with
terminal_reserve first if they should (not) print.
Nothing is locked.
Returns
-------
True if reservation was successfull, false if there already is a
reservation.
"""
reservation = False
if sys.stdin.isatty():
name = os.ttyname(sys.stdin.fileno())
if not name in TERMINAL_RESERVATION:
TERMINAL_RESERVATION.append(name)
reservation = True
print(name)
return reservation
def terminal_unreserve():
""" Unregisters the terminal (stdout) for printing.
see terminal_reserved for more information
Returns
-------
None
"""
if sys.stdin.isatty():
name = os.ttyname(sys.stdin.fileno())
if name in TERMINAL_RESERVATION:
TERMINAL_RESERVATION.remove(name)
myQueue = mp.Queue
# a mapping from the numeric values of the signals to their names used in the
# standard python module signals
signal_dict = {}
for s in dir(signal):
if s.startswith('SIG') and s[3] != '_':
n = getattr(signal, s)
if n in signal_dict:
signal_dict[n] += ('/'+s)
else:
signal_dict[n] = s
ESC_NO_CHAR_ATTR = "\033[0m"
ESC_BOLD = "\033[1m"
ESC_DIM = "\033[2m"
ESC_UNDERLINED = "\033[4m"
ESC_BLINK = "\033[5m"
ESC_INVERTED = "\033[7m"
ESC_HIDDEN = "\033[8m"
ESC_RESET_BOLD = "\033[21m"
ESC_RESET_DIM = "\033[22m"
ESC_RESET_UNDERLINED = "\033[24m"
ESC_RESET_BLINK = "\033[25m"
ESC_RESET_INVERTED = "\033[27m"
ESC_RESET_HIDDEN = "\033[28m"
ESC_DEFAULT = "\033[39m"
ESC_BLACK = "\033[30m"
ESC_RED = "\033[31m"
ESC_GREEN = "\033[32m"
ESC_YELLOW = "\033[33m"
ESC_BLUE = "\033[34m"
ESC_MAGENTA = "\033[35m"
ESC_CYAN = "\033[36m"
ESC_LIGHT_GREY = "\033[37m"
ESC_DARK_GREY = "\033[90m"
ESC_LIGHT_RED = "\033[91m"
ESC_LIGHT_GREEN = "\033[92m"
ESC_LIGHT_YELLOW = "\033[93m"
ESC_LIGHT_BLUE = "\033[94m"
ESC_LIGHT_MAGENTA = "\033[95m"
ESC_LIGHT_CYAN = "\033[96m"
ESC_WHITE = "\033[97m"
ESC_SEQ_SET = [ESC_NO_CHAR_ATTR,
ESC_BOLD,
ESC_DIM,
ESC_UNDERLINED,
ESC_BLINK,
ESC_INVERTED,
ESC_HIDDEN,
ESC_RESET_BOLD,
ESC_RESET_DIM,
ESC_RESET_UNDERLINED,
ESC_RESET_BLINK,
ESC_RESET_INVERTED,
ESC_RESET_HIDDEN,
ESC_DEFAULT,
ESC_BLACK,
ESC_RED,
ESC_GREEN,
ESC_YELLOW,
ESC_BLUE,
ESC_MAGENTA,
ESC_CYAN,
ESC_LIGHT_GREY,
ESC_DARK_GREY,
ESC_LIGHT_RED,
ESC_LIGHT_GREEN,
ESC_LIGHT_YELLOW,
ESC_LIGHT_BLUE,
ESC_LIGHT_MAGENTA,
ESC_LIGHT_CYAN,
ESC_WHITE]
# terminal reservation list, see terminal_reserve
TERMINAL_RESERVATION = list()
# these are classes that print progress bars, see terminal_reserve
TERMINAL_PRINT_LOOP_CLASSES = ["ProgressBar", "ProgressBarCounter"]

View file

@ -13,17 +13,19 @@ sys.path = [split(dirname(abspath(__file__)))[0]] + sys.path
from jobmanager import decorators, progress
@decorators.ProgressBar
def _my_func_1(arg,
kwarg = "1",
count = decorators.progress.UnsignedIntValue(val=0),
max_count = decorators.progress.UnsignedIntValue(val=1)):
max_count = decorators.progress.UnsignedIntValue(val=1),
sleep = 0.02):
maxval = 100
max_count.value = maxval
for i in range(maxval):
count.value += 1
time.sleep(0.02)
time.sleep(sleep)
return arg+kwarg
@ -42,26 +44,12 @@ def _my_func_2(arg,
return arg+kwarg
def test_ProgressBar():
result1 = _my_func_1("decorated function", kwarg=" 1")
print(result1)
wrapper = decorators.ProgressBar(_my_func_2, interval=.1)
result2 = wrapper("wrapped function", kwarg=" 2")
print(result2)
@decorators.ProgressBar
def my_func(c, m):
for i in range(m.value):
c.value = i
time.sleep(0.02)
def test_decorator():
c = progress.UnsignedIntValue(val=0)
m = progress.UnsignedIntValue(val=100)
my_func(c=c, m=m)
my_func(c, m)
def my_func_ProgressBarOverrideCount(c = None, m = None):
maxVal = 100
@ -74,16 +62,33 @@ def my_func_ProgressBarOverrideCount(c = None, m = None):
c.value = i
def test_ProgressBarOverrideCount():
print("normal call -> no decoration")
my_func_ProgressBarOverrideCount()
print("done!")
print()
@decorators.ProgressBar
def test_decorated_func_calls_decorated_func(
c = decorators.progress.UnsignedIntValue(val=0),
m = decorators.progress.UnsignedIntValue(val=1),
):
""" This function calls a function that has been decorated with
a progress bar. Only the progressbar of this function here is
displayed.
"""
maxval = 10
m.value = maxval
c.value = 0
for i in range(maxval):
c.value += 1
time.sleep(0.02)
# This function is decorated, but because we have implemented
# methods for reserving a terminal, the function does not print
# anything - which makes the output pretty.
_my_func_1(arg=i, kwarg=0, sleep=0.01)
def test_decorator():
c = progress.UnsignedIntValue(val=0)
m = progress.UnsignedIntValue(val=100)
my_func(c=c, m=m)
my_func(c, m)
my_func_ProgressBarOverrideCount_dec = decorators.ProgressBarOverrideCount(my_func_ProgressBarOverrideCount)
print("with decorator")
my_func_ProgressBarOverrideCount_dec()
print("done!")
def test_extended_PB_get_access_to_progress_bar():
def my_func(c, m, **kwargs):
@ -108,6 +113,7 @@ def test_extended_PB_get_access_to_progress_bar():
print("call non decorated func")
my_func(c, m)
def test_extended_PB_progress_bar_off():
c = progress.UnsignedIntValue(val=0)
m = progress.UnsignedIntValue(val=20)
@ -144,6 +150,27 @@ def test_extended_PB_progress_bar_off():
def test_ProgressBar():
result1 = _my_func_1("decorated function", kwarg=" 1")
print(result1)
wrapper = decorators.ProgressBar(_my_func_2, interval=.1)
result2 = wrapper("wrapped function", kwarg=" 2")
print(result2)
def test_ProgressBarOverrideCount():
print("normal call -> no decoration")
my_func_ProgressBarOverrideCount()
print("done!")
print()
my_func_ProgressBarOverrideCount_dec = decorators.ProgressBarOverrideCount(my_func_ProgressBarOverrideCount)
print("with decorator")
my_func_ProgressBarOverrideCount_dec()
print("done!")
if __name__ == "__main__":
# test_ProgressBar()