work in progress, some tests are now running with the new structure and logging

This commit is contained in:
Richard Hartmann 2016-09-28 00:45:18 +02:00
parent d067219f13
commit 89b1029ec0
4 changed files with 145 additions and 287 deletions

View file

@ -50,6 +50,13 @@ import binfootprint as bf
import progress
import logging
# try:
# from logging.handlers import QueueHandler
# from logging.handlers import QueueListener
# except ImportError:
# warnings.warn("could not load QueueHandler/QueueListener (python version too old)\n"+
# "no coherent subprocess logging possible", ImportWarning)
# taken from here: https://mail.python.org/pipermail/python-list/2010-November/591474.html
class MultiLineFormatter(logging.Formatter):
def format(self, record):
@ -141,18 +148,6 @@ def get_user_num_process():
out = subprocess.check_output('ps ut | wc -l', shell=True).decode().strip()
return int(out)-2
def set_logging_level_from_verbose(verbose):
    """
    Translate the legacy integer 'verbose' switch into a level of the
    module logger 'log'.

    verbose [int or None] - 0: logging.ERROR, 1: logging.INFO,
    any other value: logging.DEBUG,
    None: the level is left as it is and only an info record is emitted
    """
    # nothing requested -> keep the current level, just leave a trace
    if verbose is None:
        log.info("logging level not changes, verbose is None")
        return

    # pick the level first, apply it in exactly one place
    if verbose == 0:
        level = logging.ERROR
    elif verbose == 1:
        level = logging.INFO
    else:
        level = logging.DEBUG
    log.setLevel(level)
class JobManager_Client(object):
"""
Calls the functions self.func with arguments fetched from the job_q.
@ -187,7 +182,7 @@ class JobManager_Client(object):
njobs = 0,
nice = 19,
no_warnings = False,
verbose = 1,
verbose = None,
show_statusbar_for_jobs = True,
show_counter_only = False,
interval = 0.3,
@ -198,7 +193,8 @@ class JobManager_Client(object):
reconnect_wait = 2,
reconnect_tries = 3,
ping_timeout = 2,
ping_retry = 3):
ping_retry = 3,
hide_progress = False):
"""
server [string] - ip address or hostname where the JobManager_Server is running
@ -230,11 +226,17 @@ class JobManager_Client(object):
verbose [int] - 0: quiet, 1: status only, 2: debug messages
"""
global log
log = logging.getLogger(__name__+'.'+self.__class__.__name__)
self._pid = os.getpid()
self._sid = os.getsid(self._pid)
set_logging_level_from_verbose(verbose)
self.hide_progress = (verbose == 0)
if verbose is not None:
log.warning("verbose is deprecated, only allowed for compatibility")
warnings.warn("verbose is deprecated", DeprecationWarning)
self.hide_progress = hide_progress
log.info("init JobManager Client instance")
@ -288,6 +290,8 @@ class JobManager_Client(object):
log.debug("njobs:%s", self.njobs)
self.emergency_dump_path = emergency_dump_path
log.debug("emergency_dump_path:%s", self.emergency_dump_path)
self.pbc = None
self.procs = []
self.manager_objects = None # will be set via connect()
@ -368,18 +372,19 @@ class JobManager_Client(object):
return os.getpid()
@staticmethod
def __worker_func(func, nice, loglevel, server, port, authkey, i, manager_objects, c, m, reset_pbc, njobs,
def __worker_func(func, nice, loglevel, server, port, authkey, i, manager_objects, c, m, reset_pbc, njobs,
emergency_dump_path, job_q_timeout, fail_q_timeout, result_q_timeout, stdout_queue,
reconnect_wait, reconnect_tries, ping_timeout, ping_retry):
"""
the wrapper spawned nproc times calling and handling self.func
"""
log = logging.getLogger(progress.get_identifier(name='worker{}'.format(i+1)))
global log
log = logging.getLogger(__name__+'.'+progress.get_identifier(name='worker{}'.format(i+1), bold=False))
log.setLevel(loglevel)
queue_hand = logging.handlers.QueueHandler(stdout_queue)
queue_hand.setFormatter(fmt)
log.addHandler(queue_hand)
# queue_hand = logging.handlers.QueueHandler(stdout_queue)
# queue_hand.setFormatter(fmt)
# log.addHandler(queue_hand)
Signal_to_sys_exit(signals=[signal.SIGTERM])
Signal_to_SIG_IGN(signals=[signal.SIGINT])
@ -553,10 +558,14 @@ class JobManager_Client(object):
sta = progress.humanize_time(time_calc / cnt)
except:
sta = 'invalid'
stat = "pure calculation time: {} single task average: {}".format(progress.humanize_time(time_calc), sta)
try:
stat += "\ncalculation:{:.2%} communication:{:.2%}".format(time_calc/(time_calc+time_queue), time_queue/(time_calc+time_queue))
except ZeroDivisionError:
pass
log.info("pure calculation time: %s single task average: %s\ncalculation:%.2f communication:%.2f",
progress.humanize_time(time_calc), sta,
100*time_calc/(time_calc+time_queue), 100*time_queue/(time_calc+time_queue))
log.info(stat)
log.debug("JobManager_Client.__worker_func at end (PID %s)", os.getpid())
@ -593,9 +602,18 @@ class JobManager_Client(object):
prepend = []
infoline = progress.StringValue(num_of_bytes=12)
infoline = None
worker_stdout_queue = mp.Queue(-1)
# try:
# worker_stdout_queue = mp.Queue(-1)
# listener = QueueListener(worker_stdout_queue, console_hand)
# listener.start()
# except NameError:
# log.error("QueueListener not available in this python version (need at least 3.2)\n"
# "this may result in incoherent logging")
# worker_stdout_queue = None
worker_stdout_queue = None
l = len(str(self.nproc))
for i in range(self.nproc):
prepend.append("w{0:0{1}}:".format(i+1, l))
@ -606,14 +624,13 @@ class JobManager_Client(object):
prepend = prepend,
sigint = 'ign',
sigterm = 'ign',
info_line = infoline,
logging_level = log.level) as pbc :
info_line = infoline) as self.pbc :
if (not self.hide_progress) and self.show_statusbar_for_jobs:
pbc.start()
self.pbc.start()
for i in range(self.nproc):
reset_pbc = lambda: pbc.reset(i)
reset_pbc = lambda: self.pbc.reset(i)
p = mp.Process(target=self.__worker_func, args=(self.func,
self.nice,
log.level,
@ -710,13 +727,14 @@ class JobManager_Server(object):
"""
def __init__(self,
authkey,
const_arg=None,
port=42524,
verbose=1,
msg_interval=1,
fname_dump='auto',
speed_calc_cycles=50,
keep_new_result_in_memory = False):
const_arg = None,
port = 42524,
verbose = None,
msg_interval = 1,
fname_dump = 'auto',
speed_calc_cycles = 50,
keep_new_result_in_memory = False,
hide_progress = False):
"""
authkey [string] - authentication key used by the SyncManager.
Server and Client must have the same authkey.
@ -726,7 +744,7 @@ class JobManager_Server(object):
port [int] - network port to use
verbose [int] - 0: quiet, 1: status only, 2: debug messages
verbose is deprecated; use log.setLevel to change verbosity
msg_interval [int] - number of second for the status bar to update
@ -744,11 +762,18 @@ class JobManager_Server(object):
This init actually starts the SyncManager as a new process. As a next step
the job_q has to be filled, see put_arg().
"""
"""
global log
log = logging.getLogger(__name__+'.'+self.__class__.__name__)
self._pid = os.getpid()
self._pid_start = None
set_logging_level_from_verbose(verbose)
self.hide_progress = (verbose == 0)
if verbose is not None:
log.warning("verbose is deprecated, only allowed for compatibility")
warnings.warn("verbose is deprecated", DeprecationWarning)
self.hide_progress = hide_progress
log.debug("I'm the JobManager_Server main process")
@ -802,17 +827,18 @@ class JobManager_Server(object):
progress.check_process_termination(proc = manager_proc,
prefix = 'SyncManager: ',
timeout = 2,
log = log,
timeout = 2,
auto_kill_on_last_resort = True)
def __check_bind(self):
    """
    Probe whether (self.hostname, self.port) can be bound.

    A throw-away TCP socket is bound to the address and closed again
    before the method returns, so the probe itself does not keep the
    address occupied.

    Raises whatever the socket creation or the bind raises. A failed
    bind is logged at critical level before the exception is re-raised.
    """
    # function-scope import, so the probe does not depend on the
    # module's import block
    from contextlib import closing

    # closing() calls s.close() on every exit path and does not require
    # the socket object itself to be a context manager. The socket is
    # created before the guarded region is entered, so cleanup never
    # touches a name that was not bound.
    with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s:
        try:
            s.bind((self.hostname, self.port))
        except Exception:
            log.critical("test bind to %s:%s failed", self.hostname, self.port)
            raise
def __start_SyncManager(self):
self.__check_bind()
@ -844,8 +870,7 @@ class JobManager_Server(object):
manager_identifier = progress.get_identifier(name='SyncManager')
progress.check_process_termination(proc = manager_proc,
prefix = manager_identifier,
timeout = 0.3,
log = log,
timeout = 0.3,
auto_kill_on_last_resort = True)
self.manager = None
@ -937,7 +962,7 @@ class JobManager_Server(object):
failed = self.fail_q.qsize()
all_processed = succeeded + failed
id1 = self.__class__.__name__
id1 = self.__class__.__name__+" "
l = len(id1)
id2 = ' '*l + "| "
@ -1102,8 +1127,7 @@ class JobManager_Server(object):
speed_calc_cycles = self.speed_calc_cycles,
sigint = 'ign',
sigterm = 'ign',
info_line = info_line,
logging_level = log.level) as stat:
info_line = info_line) as stat:
if not self.hide_progress:
stat.start()
@ -1137,8 +1161,8 @@ class JobManager_Local(JobManager_Server):
delay = 1,
const_arg = None,
port = 42524,
verbose = 1,
verbose_client = 0,
verbose = None,
verbose_client = None,
show_statusbar_for_jobs = False,
show_counter_only = False,
niceness_clients = 19,
@ -1170,7 +1194,7 @@ class JobManager_Local(JobManager_Server):
nproc = 0,
nice = 19,
delay = 1,
verbose = 0,
verbose = None,
show_statusbar_for_jobs = False,
show_counter_only = False): # ignore signal, because any signal bringing the server down
# will cause an error in the client server communication
@ -1205,8 +1229,7 @@ class JobManager_Local(JobManager_Server):
progress.check_process_termination(p_client,
prefix = 'local_client',
timeout = 2,
verbose = self.verbose_client)
timeout = 2)
class RemoteKeyError(RemoteError):
    # Marker subclass of RemoteError: adds no attributes or behaviour of
    # its own, it only gives callers a more specific type to catch.
    # NOTE(review): presumably stands for a KeyError raised on the remote
    # side of a proxy operation -- confirm where it is raised.
    pass
@ -1290,7 +1313,6 @@ class Signal_to_terminate_process_list(object):
progress.check_process_termination(proc = p,
prefix = self.identifier_list[i],
timeout = self.timeout,
log = log,
auto_kill_on_last_resort=False)
def address_authkey_from_proxy(proxy):
@ -1544,6 +1566,10 @@ def proxy_operation_decorator_python3(proxy, operation, reconnect_wait=2, reconn
try:
res = o(*args, **kwargs)
except queue.Empty as e:
log.info("operation '%s' -> %s FAILED due to '%s'", operation, dest, type(e))
raise e
except Exception as e:
log.warning("operation '%s' -> %s FAILED due to '%s'", operation, dest, type(e))
log.debug("show traceback.format_stack()[-3]\n%s", traceback.format_stack()[-3].strip())
@ -1627,8 +1653,6 @@ def proxy_operation_decorator_python2(proxy, operation, reconnect_wait=2, reconn
continue
else:
handler_unexpected_error(e, log)
elif type(e) is BrokenPipeError:
handler_broken_pipe_error(e, log)
elif type(e) is EOFError:
handler_eof_error(e, log)
else:

View file

@ -2,10 +2,10 @@
# -*- coding: utf-8 -*-
import sys
from os.path import abspath, dirname, split
# Add parent directory to beginning of path variable
sys.path = [split(dirname(abspath(__file__)))[0]] + sys.path
import multiprocessing as mp
import numpy as np
from scipy.integrate import ode
from scipy.special import mathieu_sem, mathieu_cem, mathieu_a, mathieu_b
@ -22,15 +22,10 @@ except ImportError:
warnings.filterwarnings('error')
# Add parent directory to beginning of path variable
sys.path = [split(dirname(abspath(__file__)))[0]] + sys.path
#warnings.filterwarnings('always', category=DeprecationWarning)
import jobmanager as jm
def dgl_mathieu(t, f, a, q):
f1, f2 = f[0], f[1]
f1_dot = f2
@ -132,7 +127,6 @@ def test_distributed_mathieu():
port = 42520,
const_arg = const_arg,
nproc=1,
verbose_client=2,
niceness_clients=0,
show_statusbar_for_jobs=False) as jm_int:
q_list = np.linspace(q_min, q_max, q_N)

View file

@ -2,19 +2,21 @@
# -*- coding: utf-8 -*-
from __future__ import division, print_function
import os
import sys
import time
from os.path import abspath, dirname, split
# Add parent directory to beginning of path variable
sys.path = [split(dirname(abspath(__file__)))[0]] + sys.path
from jobmanager import decorators, progress
from jobmanager import decorators\
import progress
import warnings
warnings.filterwarnings('error')
#warnings.filterwarnings('always', category=DeprecationWarning)
@decorators.ProgressBar

View file

@ -6,12 +6,10 @@ import os
import sys
import time
import multiprocessing as mp
import numpy as np
import traceback
import socket
import subprocess
import signal
import logging
from numpy import random
from os.path import abspath, dirname, split
# Add parent directory to beginning of path variable
@ -23,11 +21,16 @@ import progress
progress.log.setLevel(logging.ERROR)
from jobmanager.jobmanager import log as jm_log
jm_log.setLevel(logging.INFO)
import warnings
warnings.filterwarnings('error')
#warnings.filterwarnings('always', category=DeprecationWarning)
AUTHKEY = 'testing'
PORT = 42525
PORT = random.randint(10000, 60000)
SERVER = socket.gethostname()
def test_Signal_to_SIG_IGN():
@ -148,12 +151,11 @@ def test_Signal_to_terminate_process_list():
def start_server(n, read_old_state=False, verbose=1):
def start_server(n, read_old_state=False):
print("START SERVER")
args = range(1,n)
with jobmanager.JobManager_Server(authkey = AUTHKEY,
port = PORT,
verbose = verbose,
msg_interval = 1,
fname_dump = 'jobmanager.dump') as jm_server:
if not read_old_state:
@ -162,16 +164,13 @@ def start_server(n, read_old_state=False, verbose=1):
jm_server.read_old_state()
jm_server.start()
def start_client(verbose=1):
def start_client():
print("START CLIENT")
jm_client = jobmanager.JobManager_Client(server = SERVER,
authkey = AUTHKEY,
port = PORT,
nproc = 3,
verbose = verbose)
nproc = 3)
jm_client.start()
if verbose > 1:
print("jm_client returned")
def test_jobmanager_basic():
"""
@ -181,22 +180,26 @@ def test_jobmanager_basic():
"""
global PORT
PORT += 1
n = 10
p_server = mp.Process(target=start_server, args=(n,False,2))
p_server.start()
time.sleep(1)
assert p_server.is_alive()
n = 5
p_server = None
p_client = None
try:
p_client = mp.Process(target=start_client, args=(2,))
p_server = mp.Process(target=start_server, args=(n,False))
p_server.start()
time.sleep(0.5)
assert p_server.is_alive()
p_client = mp.Process(target=start_client)
p_client.start()
p_client.join(15)
p_client.join(3)
assert not p_client.is_alive(), "the client did not terminate on time!"
assert p_client.exitcode == 0, "the client raised an exception"
p_server.join(15)
p_server.join(3)
assert not p_server.is_alive(), "the server did not terminate on time!"
print("[+] client and server terminated")
@ -214,7 +217,10 @@ def test_jobmanager_basic():
assert len(intersect) == 0, "final result does not contain all arguments!"
print("[+] all arguments found in final_results")
except:
os.kill(p_server.pid, signal.SIGINT)
if p_server is not None:
os.kill(p_server.pid, signal.SIGINT)
if p_client is not None:
os.kill(p_client.pid, signal.SIGINT)
raise
@ -299,7 +305,7 @@ def test_shutdown_server_while_client_running():
time.sleep(1)
p_client = mp.Process(target=start_client, args=(2,))
p_client = mp.Process(target=start_client)
p_client.start()
time.sleep(2)
@ -432,7 +438,6 @@ def test_check_fail():
n = 100
verbose=2
p_server = mp.Process(target=start_server, args=(n,))
p_server.start()
@ -442,8 +447,7 @@ def test_check_fail():
jm_client = Client_Random_Error(server=SERVER,
authkey=AUTHKEY,
port=PORT,
nproc=0,
verbose=verbose)
nproc=0)
p_client = mp.Process(target=jm_client.start)
p_client.start()
@ -585,7 +589,7 @@ def test_client_status():
global PORT
PORT += 1
n = 10
p_server = mp.Process(target=start_server, args=(n,False,0))
p_server = mp.Process(target=start_server, args=(n,False))
p_server.start()
time.sleep(1)
@ -602,8 +606,7 @@ def test_client_status():
client = Client_With_Status(server = SERVER,
authkey = AUTHKEY,
port = PORT,
nproc = 4,
verbose = 1)
nproc = 4)
client.start()
p_server.join()
@ -614,8 +617,6 @@ def test_jobmanager_local():
with jobmanager.JobManager_Local(client_class = jobmanager.JobManager_Client,
authkey = AUTHKEY,
port = PORT,
verbose = 1,
verbose_client=0,
) as jm_server:
jm_server.args_from_list(args)
jm_server.start()
@ -690,8 +691,7 @@ def test_shared_const_arg():
client = myClient(server=SERVER,
authkey=AUTHKEY,
port = PORT,
nproc=1,
verbose=2)
nproc=1)
client.start()
@ -714,7 +714,7 @@ def test_digest_rejected():
global PORT
PORT += 1
n = 10
p_server = mp.Process(target=start_server, args=(n,False,0))
p_server = mp.Process(target=start_server, args=(n,False))
p_server.start()
time.sleep(1)
@ -731,173 +731,14 @@ def test_digest_rejected():
client = Client_With_Status(server = SERVER,
authkey = AUTHKEY+' not the same',
port = PORT,
nproc = 4,
verbose = 2)
nproc = 4)
try:
client.start()
except ConnectionError as e:
print("Not an error: caught '{}' with message '{}'".format(e.__class__.__name__, e))
p_server.terminate()
p_server.join()
def _test_exception():
global PORT
class MyManager_Client(jobmanager.BaseManager):
pass
def autoproxy_server(which_python, port, authkey, outfile):
libpath = os.path.dirname(os.__file__)
python_env = os.environ.copy()
envpath = "{LIB}:{LIB}/site-packages".format(LIB=libpath)
envpath += ":{LIB}/lib-old".format(LIB=libpath)
envpath += ":{LIB}/lib-tk".format(LIB=libpath)
envpath += ":{LIB}/lib-dynload".format(LIB=libpath)
envpath += ":{LIB}/plat-linux2".format(LIB=libpath)
# env will be
# "/usr/lib/python2.7" for python 2
# "/usr/lib/python3.4" for python 3
if which_python == 2:
python_interpreter = "python2.7"
envpath = envpath.replace("3.4", "2.7")
elif which_python == 3:
python_interpreter = "python3.4"
envpath = envpath.replace("2.7", "3.4")
else:
raise ValueError("'which_python' must be 2 or 3")
python_env["PYTHONPATH"] = envpath
path = dirname(abspath(__file__))
cmd = [python_interpreter,
"{}/start_autoproxy_server.py".format(path),
str(port),
authkey]
print("+"*40)
print("start an autoproxy server with command")
print(cmd)
print("and environment")
print(python_env)
print("+"*40)
return subprocess.Popen(cmd, env=python_env, stdout=outfile, stderr=subprocess.STDOUT)
def autoproxy_connect(server, port, authkey):
MyManager_Client.register('get_q')
m = MyManager_Client(address = (server, port),
authkey = bytearray(authkey, encoding='utf8'))
jobmanager.call_connect(m.connect, dest = jobmanager.address_authkey_from_manager(m), verbose=2)
return m
for p_version_server in [2, 3]:
PORT += 2 # plus two because we also check for wrong port
port = PORT
authkey = 'q'
with open("ap_server.out", 'w') as outfile:
p_server = autoproxy_server(p_version_server, port, authkey, outfile)
print("autoproxy server running with PID {}".format(p_server.pid))
time.sleep(1)
try:
print("running tests with python {} ...".format(sys.version_info[0]))
print()
if sys.version_info[0] == 3:
print("we are using python 3 ... try to connect ...")
try:
autoproxy_connect(server=SERVER, port=port, authkey=authkey)
except jobmanager.RemoteValueError as e:
if p_version_server == 2:
print("that is ok, because the server is running on python2") # the occurrence of this Exception is normal
print()
else:
print("RemoteValueError error")
raise # reraise exception
except Exception as e:
print("unexpected error {}".format(e))
raise
elif sys.version_info[0] == 2:
print("we are using python 2 ... try to connect ...")
try:
autoproxy_connect(server=SERVER, port=port, authkey=authkey)
except ValueError as e:
if p_version_server == 3:
print("that is ok, because the server is running on python3") # the occurrence of this Exception is normal
print()
else:
print("JMConnectionRefusedError error")
raise # reraise exception
except Exception as e:
print("unexpected error {}".format(e))
raise
# all the following only for the same python versions
if (sys.version_info[0] != p_version_server):
continue
try:
print("try to connect to server, use wrong port")
autoproxy_connect(server=SERVER, port=port+1, authkey=authkey)
except jobmanager.JMConnectionRefusedError:
print("that is ok")
print()
except:
raise
try:
print("try to connect to server, use wrong authkey")
autoproxy_connect(server=SERVER, port=port, authkey=authkey+'_')
except jobmanager.AuthenticationError:
print("that is ok")
print()
except:
raise
m = autoproxy_connect(server=SERVER, port=port, authkey=authkey)
print("try pass some data forth and back ...")
q = m.get_q()
q_get = jobmanager.proxy_operation_decorator_python3(q, 'get')
q_put = jobmanager.proxy_operation_decorator_python3(q, 'put')
s1 = 'hallo welt'
q_put(s1)
s2 = q_get()
assert s1 == s2
finally:
print()
print("tests done! terminate server ...".format())
p_server.send_signal(signal.SIGTERM)
t = time.time()
timeout = 10
r = None
while r is None:
r = p_server.poll()
time.sleep(1)
print("will kill server in {:.1f}s".format(timeout - (time.time() - t)))
if (time.time() - t) > timeout:
print("timeout exceeded, kill p_server")
print("the managers subprocess will still be running, and needs to be killed by hand")
p_server.send_signal(signal.SIGKILL)
break
print("server terminated with exitcode {}".format(r))
with open("ap_server.out", 'r') as outfile:
print("+"*40)
print("this is the server output:")
for l in outfile:
print(" {}".format(l[:-1]))
print("+"*40)
p_server.join()
def test_hum_size():
# bypassing the __all__ clause in jobmanagers __init__
@ -923,17 +764,17 @@ if __name__ == "__main__":
# test_Signal_to_terminate_process_list,
test_jobmanager_basic,
# test_jobmanager_server_signals,
# test_shutdown_server_while_client_running,
# test_shutdown_client,
# test_check_fail,
# test_jobmanager_read_old_stat,
# test_client_status,
# test_jobmanager_local,
# test_start_server_on_used_port,
# test_shared_const_arg,
# test_digest_rejected,
# test_hum_size,
# test_jobmanager_server_signals,
# test_shutdown_server_while_client_running,
# test_shutdown_client,
# test_check_fail,
# test_jobmanager_read_old_stat,
# test_client_status,
# test_jobmanager_local,
# test_start_server_on_used_port,
# test_shared_const_arg,
# test_digest_rejected,
# test_hum_size,
lambda : print("END")
]
@ -945,6 +786,3 @@ if __name__ == "__main__":
f()
time.sleep(1)
# _test_interrupt_client()