jobmanager/test_progress.py

448 lines
12 KiB
Python
Raw Normal View History

import os
import sys
import time
import signal
import multiprocessing as mp
import numpy as np
import psutil
import traceback
path = os.path.dirname(__file__)
sys.path.append(path)
import progress
def test_loop_basic():
"""
run function f in loop
check if it is alive after calling start()
check if it is NOT alive after calling stop()
"""
f = lambda: print(" I'm process {}".format(os.getpid()))
loop = progress.Loop(func=f, interval=0.8, verbose=2)
loop.start()
pid = loop.getpid()
time.sleep(1)
assert loop.is_alive()
print("[+] loop started")
time.sleep(1)
loop.stop()
assert not loop.is_alive()
print("[+] loop stopped")
def test_loop_signals():
f = lambda: print(" I'm process {}".format(os.getpid()))
loop = progress.Loop(func=f, interval=0.8, verbose=2, sigint='stop', sigterm='stop')
print("## stop on SIGINT ##")
loop.start()
time.sleep(1)
loop.is_alive()
pid = loop.getpid()
print(" send SIGINT")
os.kill(pid, signal.SIGINT)
time.sleep(1)
assert not loop.is_alive()
print("[+] loop stopped running")
print("## stop on SIGTERM ##")
loop.start()
time.sleep(1)
pid = loop.getpid()
print(" send SIGTERM")
os.kill(pid, signal.SIGTERM)
time.sleep(1)
assert not loop.is_alive()
print("[+] loop stopped running")
print("## ignore SIGINT ##")
loop = progress.Loop(func=f, interval=0.8, verbose=0, sigint='ign', sigterm='ign')
loop.start()
time.sleep(1)
pid = loop.getpid()
os.kill(pid, signal.SIGINT)
print(" send SIGINT")
time.sleep(1)
assert loop.is_alive()
print("[+] loop still running")
print(" send SIGKILL")
os.kill(pid, signal.SIGKILL)
time.sleep(1)
assert not loop.is_alive()
print("[+] loop stopped running")
print("## ignore SIGTERM ##")
loop.start()
time.sleep(1)
pid = loop.getpid()
print(" send SIGTERM")
os.kill(pid, signal.SIGTERM)
time.sleep(1)
assert loop.is_alive()
print("[+] loop still running")
print(" send SIGKILL")
os.kill(pid, signal.SIGKILL)
time.sleep(1)
assert not loop.is_alive()
print("[+] loop stopped running")
def non_stopping_function():
print(" I'm pid", os.getpid())
print(" I'm NOT going to stop")
while True: # sleep will be interrupted by signal
time.sleep(1) # while True just calls sleep again
# only SIGKILL helps
def normal_function():
print(" I'm pid", os.getpid())
def long_sleep_function():
print(" I'm pid", os.getpid())
print(" I will sleep for seven years")
time.sleep(60*60*12*356*7)
def test_loop_normal_stop():
with progress.Loop(func=normal_function,
verbose=2,
name='loop') as loop:
loop.start()
time.sleep(0.1)
assert loop.is_alive()
print("[+] normal loop running")
assert not loop.is_alive()
print("[+] normal loop stopped")
def test_loop_need_sigterm_to_stop():
with progress.Loop(func=long_sleep_function,
verbose=2,
name='loop') as loop:
loop.start()
time.sleep(0.1)
assert loop.is_alive()
print("[+] sleepy loop running")
assert not loop.is_alive()
print("[+] sleepy loop stopped")
def test_loop_need_sigkill_to_stop():
with progress.Loop(func=non_stopping_function,
verbose=2,
name='loop',
auto_kill_on_last_resort=True) as loop:
loop.start()
time.sleep(0.1)
assert loop.is_alive()
print("[+] NON stopping loop running")
assert not loop.is_alive()
print("[+] NON stopping loop stopped")
def test_why_with_statement():
class ErrorLoop(progress.Loop):
def raise_error(self):
raise RuntimeError("on purpose error")
v=2
def t(shared_mem_pid):
l = ErrorLoop(func=normal_function, verbose=v)
l.start()
time.sleep(0.2)
shared_mem_pid.value = l.getpid()
l.raise_error()
l.stop()
def t_with():
with ErrorLoop(func=normal_function, verbose=v) as l:
l.start()
time.sleep(0.2)
l.raise_error()
l.stop()
print("## start without with statement ...")
subproc_pid = progress.UnsignedIntValue()
p = mp.Process(target=t, args=(subproc_pid, ))
p.start()
time.sleep(0.3)
print("## now an exception gets raised ... but you don't see it!")
time.sleep(3)
print("## ... and the loop is still running so we have to kill the process")
p.terminate()
print("## ... done!")
p_sub = psutil.Process(subproc_pid.value)
if p_sub.is_running():
print("## terminate loop process from extern ...")
p_sub.terminate()
p_sub.wait(1)
assert not p_sub.is_running()
print("## process with PID {} terminated!".format(subproc_pid.value))
time.sleep(3)
print("\n##\n## now to the same with the with statement ...")
p = mp.Process(target=t_with)
p.start()
time.sleep(3)
print("## no special care must be taken ... cool eh!")
print("## ALL DONE! (and now comes some exception, strange!)")
def test_progress_bar():
"""
deprecated
"""
count = progress.UnsignedIntValue()
max_count = progress.UnsignedIntValue(100)
sb = progress.ProgressBar(count, max_count, verbose=2)
assert not sb.is_alive()
sb.start()
time.sleep(2)
assert sb.is_alive()
pid = sb.getpid()
sb.start()
time.sleep(2)
assert pid == sb.getpid()
sb.stop()
assert not sb.is_alive()
time.sleep(2)
sb.stop()
# sb._show_stat()
time.sleep(2)
#traceback.print_last()
def test_progress_bar_with_statement():
count = progress.UnsignedIntValue()
max_count = progress.UnsignedIntValue(100)
with progress.ProgressBar(count, max_count, verbose=2) as sb:
assert not sb.is_alive()
sb.start()
time.sleep(0.2)
assert sb.is_alive()
pid = sb.getpid()
sb.start()
time.sleep(0.2)
assert pid == sb.getpid()
assert not sb.is_alive()
time.sleep(0.2)
sb.stop()
def test_progress_bar_multi():
n = 4
max_count_value = 100
count = []
max_count = []
prepend = []
for i in range(n):
count.append(progress.UnsignedIntValue(0))
max_count.append(progress.UnsignedIntValue(max_count_value))
prepend.append('_{}_: '.format(i))
with progress.ProgressBar(count=count,
max_count=max_count,
interval=0.2,
speed_calc_cycles=10,
width='auto',
verbose=0,
sigint='stop',
sigterm='stop',
name='sb multi',
prepend=prepend) as sbm:
sbm.start()
for x in range(500):
i = np.random.randint(low=0, high=n)
with count[i].get_lock():
count[i].value += 1
if count[i].value > 100:
sbm.reset(i)
time.sleep(0.02)
def test_status_counter():
c = progress.UnsignedIntValue(val=0)
m = progress.UnsignedIntValue(val=100)
with progress.ProgressCounter(count=c,
max_count=m,
interval=0.2,
speed_calc_cycles=100,
verbose=2,
sigint='ign',
sigterm='ign',
name='sc',
prepend='') as sc:
sc.start()
while True:
with c.get_lock():
c.value += 1
if c.value == 100:
break
time.sleep(0.01)
def test_status_counter_multi():
c1 = progress.UnsignedIntValue(val=0)
c2 = progress.UnsignedIntValue(val=0)
c = [c1, c2]
prepend = ['c1: ', 'c2: ']
with progress.ProgressCounter(count=c, prepend=prepend, verbose=2) as sc:
sc.start()
while True:
i = np.random.randint(0,2)
with c[i].get_lock():
c[i].value += 1
if c[0].value == 100:
break
time.sleep(0.01)
def test_intermediate_prints_while_running_progess_bar():
c = progress.UnsignedIntValue(val=0)
2014-10-16 11:30:16 +02:00
try:
with progress.ProgressCounter(count=c, verbose=2, interval=0.3) as sc:
sc.start()
while True:
with c.get_lock():
c.value += 1
if c.value == 100:
print("intermediate message")
if c.value == 400:
break
2014-10-16 11:30:16 +02:00
time.sleep(0.01)
except:
print("IN EXCEPTION TEST")
traceback.print_exc()
def test_intermediate_prints_while_running_progess_bar_multi():
c1 = progress.UnsignedIntValue(val=0)
c2 = progress.UnsignedIntValue(val=0)
c = [c1,c2]
with progress.ProgressCounter(count=c, verbose=2, interval=0.3) as sc:
sc.start()
while True:
i = np.random.randint(0,2)
with c[i].get_lock():
c[i].value += 1
if c[0].value == 100:
print("intermediate message")
with c[i].get_lock():
c[i].value += 1
if c[0].value == 400:
break
time.sleep(0.01)
2014-10-16 11:30:16 +02:00
def test_progress_bar_counter():
c1 = progress.UnsignedIntValue(val=0)
c2 = progress.UnsignedIntValue(val=0)
maxc = 30
m1 = progress.UnsignedIntValue(val=maxc)
m2 = progress.UnsignedIntValue(val=maxc)
c = [c1, c2]
m = [m1, m2]
t0 = time.time()
with progress.ProgressBarCounter(count=c, max_count=m, verbose=1, interval=0.2) as sc:
sc.start()
while True:
i = np.random.randint(0,2)
with c[i].get_lock():
c[i].value += 1
if c[i].value > maxc:
sc.reset(i)
time.sleep(0.0432)
if (time.time() - t0) > 15:
break
def test_progress_bar_counter_non_max():
c1 = progress.UnsignedIntValue(val=0)
c2 = progress.UnsignedIntValue(val=0)
maxc = 30
m1 = progress.UnsignedIntValue(val=-1)
m2 = progress.UnsignedIntValue(val=-1)
c = [c1, c2]
m = [m1, m2]
t0 = time.time()
with progress.ProgressBarCounter(count=c, max_count=m, verbose=1, interval=0.2) as sc:
sc.start()
while True:
i = np.random.randint(0,2)
with c[i].get_lock():
c[i].value += 1
if c[i].value > maxc:
sc.reset(i)
time.sleep(0.0432)
if (time.time() - t0) > 15:
break
if __name__ == "__main__":
func = [
# test_loop_basic(),
# test_loop_signals(),
# test_loop_normal_stop(),
# test_loop_need_sigterm_to_stop(),
# test_loop_need_sigkill_to_stop(),
# test_why_with_statement,
# test_progress_bar,
# test_progress_bar_with_statement,
# test_progress_bar_multi,
# test_status_counter,
# test_status_counter_multi,
# test_intermediate_prints_while_running_progess_bar,
2014-10-16 11:30:16 +02:00
# test_intermediate_prints_while_running_progess_bar_multi,
# test_progress_bar_counter,
test_progress_bar_counter_non_max
]
2014-10-16 11:30:16 +02:00
for f in func:
print()
print('#'*80)
print('## {}'.format(f.__name__))
print()
f()
time.sleep(1)