added signalDelay to fix deadlock problems on client timeout; also work in progress to make the tests run

This commit is contained in:
Richard Hartmann 2018-06-28 15:13:21 +02:00
parent 2bc4fadbf1
commit 779119c617
7 changed files with 404 additions and 155 deletions
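
For orientation, a minimal sketch (not part of this commit) of the pattern the change introduces: the worker no longer guards queue access with a shared q_lock; instead each queue operation is wrapped in sig_delay, so a SIGTERM arriving mid-operation is recorded and re-emitted only after the operation has finished, which avoids the deadlock previously triggered by the client timeout. The helper process_one and the argument func are hypothetical; job_q_get, local_result_q and sig_delay are taken from the diff below.

import signal
from jobmanager.signalDelay import sig_delay

def process_one(job_q_get, local_result_q, func):
    # a SIGTERM delivered inside either block is caught by sig_delay
    # and re-emitted only after the queue operation has completed
    with sig_delay([signal.SIGTERM]):
        arg = job_q_get()
    res = func(arg)
    with sig_delay([signal.SIGTERM]):
        local_result_q.put((arg, res))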

View file

@ -14,9 +14,7 @@ branches:
python:
- '2.7'
- '3.4'
- '3.5'
- '3.6'
before_install:
- sudo apt-get install libhdf5-openmpi-dev

View file

@ -1,3 +1,3 @@
# put this into a separate file so we don't have a problem installing
# with pip (unmet dependencies upon import).
__version__ = "0.2.0"
__version__ = "0.3.0"

View file

@ -2,7 +2,7 @@
# -*- coding: utf-8 -*-
"""jobmanager module
Richard Hartmann 2014
Richard Hartmann 2014-2018
This module provides an easy way to implement distributed computing
@ -53,6 +53,7 @@ import logging
import threading
import ctypes
from shutil import rmtree
from .signalDelay import sig_delay
log = logging.getLogger(__name__)
@ -64,41 +65,14 @@ __all__ = ["JobManager_Client",
"getDateForFileName"
]
# Magic conversion from 3 to 2
if sys.version_info[0] == 2:
# Python 2
import Queue as queue
class getfullargspec(object):
"A quick and dirty replacement for getfullargspec for Python 2"
def __init__(self, f):
self.args, self.varargs, self.varkw, self.defaults = \
inspect.getargspec(f)
self.annotations = getattr(f, '__annotations__', {})
inspect.getfullargspec = getfullargspec
# IOError (socket.error) exception handling
import errno
class JMConnectionError(Exception):
pass
class JMConnectionRefusedError(JMConnectionError):
pass
class JMConnectionResetError(JMConnectionError):
pass
input_promt = raw_input
else:
# Python 3
import queue
JMConnectionError = ConnectionError
JMConnectionRefusedError = ConnectionRefusedError
JMConnectionResetError = ConnectionResetError
import queue
input_promt = input
JMConnectionError = ConnectionError
JMConnectionRefusedError = ConnectionRefusedError
JMConnectionResetError = ConnectionResetError
input_promt = input
class JMHostNotReachableError(JMConnectionError):
pass
@ -405,9 +379,8 @@ class JobManager_Client(object):
log = logging.getLogger(__name__+'.'+progress.get_identifier(name='worker{}'.format(i+1), bold=False))
log.setLevel(loglevel)
q_lock = threading.Lock()
Signal_to_sys_exit(signals=[signal.SIGTERM], lock = q_lock)
Signal_to_sys_exit(signals=[signal.SIGTERM])
Signal_to_SIG_IGN(signals=[signal.SIGINT])
n = os.nice(0)
@ -461,11 +434,13 @@ class JobManager_Client(object):
try:
log.debug("wait until local result q is almost empty")
while local_result_q.qsize() > 1:
time.sleep(1)
time.sleep(0.1)
log.debug("done waiting, call job_q_get")
with q_lock:
with sig_delay([signal.SIGTERM]):
arg = job_q_get()
log.debug("process {}".format(arg))
log.debug("process {}".format(arg))
# regular case, just stop working when empty job_q was found
except queue.Empty:
log.info("finds empty job queue, processed %s jobs", cnt)
@ -516,7 +491,7 @@ class JobManager_Client(object):
log.debug("put arg to local fail_q")
try:
with q_lock:
with sig_delay([signal.SIGTERM]):
local_fail_q.put((arg, err.__name__, hostname))
# handle SystemExit in outer try ... except
except SystemExit as e:
@ -537,7 +512,7 @@ class JobManager_Client(object):
else:
try:
tp_0 = time.time()
with q_lock:
with sig_delay([signal.SIGTERM]):
local_result_q.put((arg, res))
tp_1 = time.time()
time_queue += (tp_1-tp_0)
@ -569,7 +544,7 @@ class JobManager_Client(object):
log.warning("SystemExit, quit processing, reinsert current argument, please wait")
log.debug("put arg back to local job_q")
try:
with q_lock:
with sig_delay([signal.SIGTERM]):
local_job_q.put(arg)
# handle SystemExit in outer try ... except
except SystemExit as e:
@ -633,8 +608,13 @@ class JobManager_Client(object):
m_progress = None
prepend = []
infoline = progress.StringValue(num_of_bytes=12)
infoline = None
if self.timeout is None:
infoline = None
else:
infoline = progress.StringValue(num_of_bytes=32)
infoline.value = "timeout in: {}s".format(int(self.timeout - (time.time() - self.init_time))).encode('utf-8')
# try:
# worker_stdout_queue = mp.Queue(-1)
@ -772,13 +752,16 @@ class JobManager_Client(object):
for p in self.procs:
while p.is_alive():
if (self.timeout is not None) and (self.timeout < time.time() - self.init_time):
log.debug("TIMEOUT for Client reached, terminate worker process %s", p.pid)
p.terminate()
log.debug("wait for worker process %s to be joined ...", p.pid)
p.join()
log.debug("worker process %s was joined", p.pid)
break
if self.timeout is not None:
elps_time = int(time.time() - self.init_time)
if self.timeout < elps_time:
log.debug("TIMEOUT for Client reached, terminate worker process %s", p.pid)
p.terminate()
log.debug("wait for worker process %s to be joined ...", p.pid)
p.join()
log.debug("worker process %s was joined", p.pid)
break
infoline.value = "timeout in: {}s".format(int(self.timeout - elps_time)).encode('utf-8')
p.join(10)
log.debug("worker process %s exitcode %s", p.pid, p.exitcode)
log.debug("worker process %s was joined", p.pid)
@ -859,14 +842,18 @@ class ContainerClosedError(Exception):
pass
class ClosableQueue_Data(object):
def __init__(self):
def __init__(self, name=None):
self.q = myQueue()
self.is_closed = False
self.lock = threading.Lock()
if name:
self.log_prefix = 'ClosableQueue:{} - '.format(name)
else:
self.log_prefix = ''
def new_conn(self):
conn1, conn2 = mp.Pipe()
log.debug("create new pair of connections, {}".format(conn2))
log.debug("{}create new pair of connections, {}".format(self.log_prefix, conn2))
thr_listener = threading.Thread(target=self._listener, args=(conn1, ))
thr_listener.daemon = True
thr_listener.start()
@ -877,7 +864,7 @@ class ClosableQueue_Data(object):
try:
res = f(*args)
except Exception as e:
log.debug("exec_cmd '{}' raises exception '{}'".format(f.__name__, type(e)))
log.debug("exec_cmd '{}' sends exception '{}'".format(f.__name__, type(e)))
conn.send( ('#exc', e) )
else:
log.debug("exec_cmd succesfull")
@ -887,7 +874,7 @@ class ClosableQueue_Data(object):
while True:
try:
cmd = conn.recv()
log.debug("listener got cmd '{}'".format(cmd))
log.debug("{}listener got cmd '{}'".format(self.log_prefix, cmd))
args = conn.recv()
if cmd == '#put':
self._exec_cmd(self.put, conn, args)
@ -900,7 +887,7 @@ class ClosableQueue_Data(object):
else:
raise ValueError("unknown command '{}'".format(cmd))
except EOFError:
log.debug("listener got EOFError, stop thread")
log.debug("{}listener got EOFError, stop thread".format(self.log_prefix))
break
def put(self, item, block=True, timeout=None):
@ -920,13 +907,14 @@ class ClosableQueue_Data(object):
def close(self):
with self.lock:
self.is_closed = True
log.debug("queue closed")
log.debug("{}queue closed".format(self.log_prefix))
class ClosableQueue(object):
def __init__(self):
self.data = ClosableQueue_Data()
def __init__(self, name=None):
self.data = ClosableQueue_Data(name)
self.conn = self.data.new_conn()
self.lock = threading.Lock()
self.name = name
def client(self):
cls = ClosableQueue.__new__(ClosableQueue)
@ -1368,10 +1356,8 @@ class JobManager_Server(object):
fname = None
self.job_q = ArgsContainer(fname)
self.result_q = ClosableQueue()
self.fail_q = ClosableQueue()
self.numjobs = progress.UnsignedIntValue(0)
self.result_q = ClosableQueue(name='result_q')
self.fail_q = ClosableQueue(name='fail_q')
self.stat = None
@ -1546,11 +1532,23 @@ class JobManager_Server(object):
for fail_item in data['fail_list']:
self.fail_q.put(fail_item)
log.debug("load: len(final_result): {}".format(len(self.final_result)))
log.debug("load: job_q.qsize: {}".format(self.job_q.qsize()))
log.debug("load: job_q.marked_items: {}".format(self.job_q.marked_items()))
log.debug("load: job_q.gotten_items: {}".format(self.job_q.gotten_items()))
log.debug("load: job_q.unmarked_items: {}".format(self.job_q.unmarked_items()))
log.debug("load: len(fail_q): {}".format(self.fail_q.qsize()))
def __dump(self, f):
pickle.dump(self.final_result, f, protocol=pickle.HIGHEST_PROTOCOL)
log.debug("dump: len(final_result): {}".format(len(self.final_result)))
pickle.dump(self.job_q, f, protocol=pickle.HIGHEST_PROTOCOL)
log.debug("dump: job_q.qsize: {}".format(self.job_q.qsize()))
log.debug("dump: job_q.marked_items: {}".format(self.job_q.marked_items()))
log.debug("dump: job_q.gotten_items: {}".format(self.job_q.gotten_items()))
log.debug("dump: job_q.unmarked_items: {}".format(self.job_q.unmarked_items()))
fail_list = []
try:
while True:
@ -1558,6 +1556,7 @@ class JobManager_Server(object):
except queue.Empty:
pass
pickle.dump(fail_list, f, protocol=pickle.HIGHEST_PROTOCOL)
log.debug("dump: len(fail_q): {}".format(len(fail_list)))
def read_old_state(self, fname_dump=None):
@ -1595,8 +1594,7 @@ class JobManager_Server(object):
# the actual shared queue
self.job_q.put(copy.copy(a))
self.numjobs.value += 1
#with self._numjobs.get_lock():
# self._numjobs.value += 1
@ -1630,17 +1628,7 @@ class JobManager_Server(object):
log.critical("do not run JobManager_Server.start() in a subprocess")
raise RuntimeError("do not run JobManager_Server.start() in a subprocess")
# if (self.numjobs - self.numresults) != len(self.args_dict):
# log.debug("numjobs: %s\n" +
# "numresults: %s\n" +
# "len(self.args_dict): %s", self.numjobs, self.numresults, len(self.args_dict))
#
# log.critical(
# "inconsistency detected! (self.numjobs - self.numresults) != len(self.args_dict)! use JobManager_Server.put_arg to put arguments to the job_q")
# raise RuntimeError(
# "inconsistency detected! (self.numjobs - self.numresults) != len(self.args_dict)! use JobManager_Server.put_arg to put arguments to the job_q")
jobqsize = self.job_q.qsize()
jobqsize = self.job_q.qsize()
if jobqsize == 0:
log.info("no jobs to process! use JobManager_Server.put_arg to put arguments to the job_q")
return
@ -1667,11 +1655,15 @@ class JobManager_Server(object):
info_line = progress.StringValue(num_of_bytes=100)
numresults = progress.UnsignedIntValue(0)
#numjobs = progress.UnsignedIntValue(self.job_q.put_items())
numresults = progress.UnsignedIntValue(self.job_q.marked_items() + self.fail_q.qsize())
numjobs = progress.UnsignedIntValue(self.job_q.put_items())
log.debug("at start: number of jobs: {}".format(numjobs.value))
log.debug("at start: number of results: {}".format(numresults.value))
with progress.ProgressBarFancy(count = numresults,
max_count = self.numjobs,
max_count = numjobs,
interval = self.msg_interval,
speed_calc_cycles = self.speed_calc_cycles,
sigint = 'ign',
@ -1680,7 +1672,8 @@ class JobManager_Server(object):
if not self.hide_progress:
self.stat.start()
while numresults.value < self.numjobs.value:
while numresults.value < numjobs.value:
numjobs.value = self.job_q.put_items()
failqsize = self.fail_q.qsize()
jobqsize = self.job_q.qsize()
markeditems = self.job_q.marked_items()
@ -1692,13 +1685,13 @@ class JobManager_Server(object):
self.stat.stop()
log.warning("timeout ({}s) exceeded -> quit server".format(self.timeout))
break
info_line.value = ("result_q size:{}, jobs: remaining:{}, "+
"done:{}, failed:{}, in progress:{}, "+
info_line.value = ("res_q size:{}, jobs: rem.:{}, "+
"done:{}, failed:{}, prog.:{}, "+
"timeout in:{}s").format(self.result_q.qsize(),
jobqsize,
markeditems,
failqsize,
self.numjobs.value - numresults.value - jobqsize,
numjobs.value - numresults.value - jobqsize,
time_left).encode('utf-8')
else:
info_line.value = ("result_q size:{}, jobs: remaining:{}, "+
@ -1706,7 +1699,7 @@ class JobManager_Server(object):
jobqsize,
markeditems,
failqsize,
self.numjobs.value - numresults.value - jobqsize).encode('utf-8')
numjobs.value - numresults.value - jobqsize).encode('utf-8')
log.info("infoline {}".format(info_line.value))
# allows for update of the info line
try:
@ -1875,9 +1868,7 @@ class Signal_to_SIG_IGN(object):
class Signal_to_sys_exit(object):
def __init__(self, signals=[signal.SIGINT, signal.SIGTERM], lock=threading.Lock()):
self.lock = lock
self.c = 0
def __init__(self, signals=[signal.SIGINT, signal.SIGTERM]):
for s in signals:
signal.signal(s, self._handler)
@ -1886,16 +1877,8 @@ class Signal_to_sys_exit(object):
sys.exit('exit due to signal {}'.format(progress.signal_dict[signal]))
def _handler(self, signal, frame):
if self.c == 1:
log.warning("PID %s: received signal %s more than once, ignore lock", os.getpid(), progress.signal_dict[signal])
self._exit(signal)
self.c += 1
while not self.lock.acquire(timeout = 10):
log.warning("sys exit is stalled while aquiring lock")
try:
self._exit(signal)
finally:
self.lock.release()
self._exit(signal)
class Signal_to_terminate_process_list(object):
@ -2109,6 +2092,7 @@ def emergency_dump(arg, res, emergency_dump_path, host, port, authkey):
pickle.dump(res, f)
def check_if_host_is_reachable_unix_ping(adr, timeout=2, retry=5):
output = ''
for i in range(retry):
try:
cmd = 'ping -c 1 -W {} {} '.format(int(timeout), adr)
@ -2117,6 +2101,7 @@ def check_if_host_is_reachable_unix_ping(adr, timeout=2, retry=5):
except subprocess.CalledProcessError as e:
# on exception, resume with loop
log.warning("CalledProcessError on ping with message: %s", e)
output = e.output
continue
else:
# no exception, ping was successful, return without error
@ -2125,7 +2110,7 @@ def check_if_host_is_reachable_unix_ping(adr, timeout=2, retry=5):
# no early return happened, ping was never successful, raise error
log.error("ping failed after %s retries", retry)
raise JMHostNotReachableError("could not reach host '{}'\nping error reads: {}".format(adr, e.output))
raise JMHostNotReachableError("could not reach host '{}'\nping error reads: {}".format(adr, output))
def proxy_operation_decorator_python3(proxy, operation, reconnect_wait=2, reconnect_tries=3, ping_timeout=2, ping_retry=5):

jobmanager/signalDelay.py Normal file
View file

@ -0,0 +1,105 @@
"""
The signalDelay module provides a decorator (and context manager) to protect a function
from being interrupted via the signaling mechanism.
When a protected function receives a signal, the signal is stored
and re-emitted AFTER the function has terminated.
"""
import logging
import traceback
import signal
import os
log = logging.getLogger(__name__)
SIG_MAP = {}
# read the signal names from the signal module and
# provide the mapping from the numeric values to the names
for s in signal.__dict__:
if s.startswith("SIG") and s[3] != '_':
SIG_MAP[getattr(signal, s)] = s
class SigHandler(object):
""" a handler class which
- stores the received signals
- and allows to re-emit them via os.kill
"""
def __init__(self):
self.sigs_caught = []
def __call__(self, sig, frame): # a callable suitable for signal.signal
"""
the actual handler function
which will be called when a signal is received
all it does is append the signal to the sigs_caught list
"""
log.info("caught sig '{}'".format(SIG_MAP[sig]))
log.debug("frame: {}".format(traceback.format_stack(frame)))
self.sigs_caught.append(sig)
def emit(self): # emit the signals
"""
re-emits all the saved signals in the same order in which they were caught
"""
l = len(self.sigs_caught)
if l > 0:
log.info("caught {} signal(s)".format(l))
for s in self.sigs_caught:
log.info("emit signal '{}'".format(SIG_MAP[s]))
os.kill(os.getpid(), s)
class sig_delay(object):
"""
a decorator / context manager object to prevent a function / context from being
interrupted by signals
init takes a list of signals which will be delayed
"""
def __init__(self, sigs):
self.sigs = sigs
self.sigh = None
def _setup(self):
"""
save the current signal handlers for the signals in the given signal list
and install the special handler 'SigHandler' which only captures the received
signals in a list
"""
self.sigh = SigHandler()
self.old_handlers = []
log.debug("setup alternative signal handles for {}".format([SIG_MAP[s] for s in self.sigs]))
for s in self.sigs:
self.old_handlers.append(signal.getsignal(s))
signal.signal(s, self.sigh)
def _restore(self):
"""
reinstall the original handler for each signal of the given signal list
and trigger the SigHandler's 'emit' function to re-emit the signals
"""
log.debug("restore signal handles")
for i, s in enumerate(self.sigs):
signal.signal(s, self.old_handlers[i])
self.sigh.emit()
def __call__(self, func):
"""
enables the decorator style
"""
def _protected_func(*args, **kwargs):
with self:
log.debug("call function ...")
return func(*args, **kwargs)  # pass the wrapped function's return value through
return _protected_func
def __enter__(self):
self._setup()
log.debug("signalDelay context entered ...")
def __exit__(self, exc_type, exc_val, exc_tb):
log.debug("signalDelay context left!")
self._restore()
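
An illustrative usage sketch of the new module (not part of the file above), assuming jobmanager is importable; both forms delay the listed signals until the protected region is left and then re-emit them via os.kill:

import signal
import time
from jobmanager.signalDelay import sig_delay

# context-manager form: a SIGINT (Ctrl-C) arriving during the sleep is
# recorded and re-raised only after the with-block has been left
with sig_delay([signal.SIGINT]):
    time.sleep(5)

# decorator form: the whole function body is protected in the same way
@sig_delay([signal.SIGINT])
def critical_section():
    time.sleep(5)

critical_section()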

View file

@ -75,10 +75,7 @@ if __name__ == "__main__":
"progress", "manager", "job", "persistent data", "scheduler"],
classifiers= [
'Operating System :: OS Independent',
#'Programming Language :: Python :: 2.7', #Todo
#'Programming Language :: Python :: 3.2', # also not very well tested
#'Programming Language :: Python :: 3.3', # also not very well tested
'Programming Language :: Python :: 3.4',
'Programming Language :: Python :: 3.6',
'Intended Audience :: Science/Research'
],
platforms=['ALL'],

View file

@ -1,13 +1,9 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import division, print_function
import os
import sys
import time
import multiprocessing as mp
from multiprocessing import util
# util.log_to_stderr()
from multiprocessing.managers import BaseManager
import socket
import signal
@ -16,18 +12,15 @@ import datetime
import threading
from numpy import random
import pytest
import shutil
from os.path import abspath, dirname, split
import pathlib
sys.path.insert(0, str(pathlib.Path(__file__).parent.parent))
# Add parent directory to beginning of path variable
sys.path = [split(dirname(abspath(__file__)))[0]] + sys.path
import binfootprint
import progression as progress
if sys.version_info[0] == 2:
TIMEOUT = 300
elif sys.version_info[0] == 3:
TIMEOUT = 15
TIMEOUT = 15
import warnings
warnings.filterwarnings('ignore', module='traitlets', append=False, category=DeprecationWarning)
@ -35,7 +28,10 @@ warnings.filterwarnings('error', append=True)
import jobmanager
logging.getLogger('jobmanager').setLevel(logging.WARNING)
logging.getLogger('jobmanager').setLevel(logging.INFO)
# logging.getLogger('jobmanager').setLevel(logging.INFO)
# logging.getLogger('jobmanager.jobmanager.JobManager_Server').setLevel(logging.INFO)
logging.getLogger('jobmanager.signalDelay').setLevel(logging.INFO)
logging.getLogger('progression').setLevel(logging.ERROR)
logging.basicConfig(level = logging.INFO)
@ -559,9 +555,16 @@ def test_jobmanager_read_old_stat():
p_client.join(10)
p_server.join(10)
assert not p_client.is_alive(), "the client did not terminate on time!"
assert not p_server.is_alive(), "the server did not terminate on time!"
try:
assert not p_client.is_alive(), "the client did not terminate on time!"
except AssertionError:
p_client.terminate()
raise
assert p_client.exitcode == 0
assert p_server.exitcode == 0
print("[+] client and server terminated")
@ -572,9 +575,10 @@ def test_jobmanager_read_old_stat():
p_server = mp.Process(target=start_server, args=(n,True))
p_server.start()
time.sleep(1)
time.sleep(2)
p_client = mp.Process(target=start_client)
print("trigger start client")
p_client.start()
p_client.join(30)
@ -637,16 +641,25 @@ def test_client_status():
def test_jobmanager_local():
global PORT
PORT += 1
args = range(1,200)
args = range(1,201)
client_sleep = 0.1
num_client = 4
t0 = time.time()
with jobmanager.JobManager_Local(client_class = jobmanager.JobManager_Client,
authkey = AUTHKEY,
port = PORT,
const_arg = 0.1,
nproc = 4) as jm_server:
const_arg = client_sleep,
nproc = num_client) as jm_server:
jm_server.args_from_list(args)
jm_server.start()
assert jm_server.all_successfully_processed()
t1 = time.time()
print("local JM, nproc {}".format(num_client))
print("used time : {:.3}s".format(t1-t0))
print("ideal time: {}s".format(len(args)*client_sleep/num_client))
def test_start_server_on_used_port():
global PORT
@ -780,14 +793,37 @@ def test_ArgsContainer():
from shutil import rmtree
import shelve
fname = "test.shelve"
try:
os.remove(fname)
except FileNotFoundError:
pass
try:
os.remove(fname+'.db')
except FileNotFoundError:
pass
# simple test on shelve, close is needed to write data to disk
s = shelve.open("test")
s = shelve.open(fname)
s['a'] = 1
s.close()
s2 = shelve.open("test")
s2 = shelve.open(fname)
assert s2['a'] == 1
try:
os.remove(fname)
except FileNotFoundError:
pass
try:
os.remove(fname+'.db')
except FileNotFoundError:
pass
path = 'argscont'
# remove old test data
@ -978,6 +1014,9 @@ def test_ArgsContainer_BaseManager():
assert ac_inst.qsize() == 200
ac_inst.clear()
def test_ArgsContainer_BaseManager_in_subprocess():
from jobmanager.jobmanager import ArgsContainer
@ -1075,6 +1114,8 @@ def test_ArgsContainer_BaseManager_in_subprocess():
print("caught ContainerClosedError")
else:
assert False
ac_inst.clear()
def test_havy_load_on_ArgsContainer():
from jobmanager.jobmanager import ArgsContainer
@ -1129,6 +1170,7 @@ def test_havy_load_on_ArgsContainer():
time.sleep(1)
print(ac_inst.qsize())
ac_inst.clear()
def test_ClosableQueue():
@ -1264,36 +1306,36 @@ def test_ClosableQueue_with_manager():
if __name__ == "__main__":
logging.getLogger('jobmanager').setLevel(logging.DEBUG)
logging.getLogger('jobmanager').setLevel(logging.INFO)
if len(sys.argv) > 1:
pass
else:
func = [
test_ArgsContainer,
test_ArgsContainer_BaseManager,
test_ArgsContainer_BaseManager_in_subprocess,
test_havy_load_on_ArgsContainer,
# test_ClosableQueue,
# test_ClosableQueue_with_manager,
# test_hum_size,
# test_Signal_to_SIG_IGN,
# test_Signal_to_sys_exit,
# test_Signal_to_terminate_process_list,
# test_jobmanager_static_client_call,
# test_start_server_with_no_args,
# test_start_server,
# test_client,
# test_jobmanager_basic,
# test_jobmanager_server_signals,
# test_shutdown_server_while_client_running,
# test_shutdown_client,
# test_jobmanager_read_old_stat,
# test_client_status,
# test_jobmanager_local,
# test_start_server_on_used_port,
# test_shared_const_arg,
# test_digest_rejected,
# test_hum_size,
# test_ArgsContainer,
# test_ArgsContainer_BaseManager,
# test_ArgsContainer_BaseManager_in_subprocess,
# test_havy_load_on_ArgsContainer,
# test_ClosableQueue,
# test_ClosableQueue_with_manager,
# test_hum_size,
# test_Signal_to_SIG_IGN,
# test_Signal_to_sys_exit,
# test_Signal_to_terminate_process_list,
# test_jobmanager_static_client_call,
# test_start_server_with_no_args,
# test_start_server,
# test_client,
# test_jobmanager_basic,
# test_jobmanager_server_signals,
# test_shutdown_server_while_client_running,
# test_shutdown_client,
# test_jobmanager_read_old_stat,
# test_client_status,
test_jobmanager_local,
# test_start_server_on_used_port,
# test_shared_const_arg,
# test_digest_rejected,
# test_hum_size,
lambda : print("END")
]
@ -1303,4 +1345,10 @@ if __name__ == "__main__":
print('## {}'.format(f.__name__))
print()
f()
#time.sleep(1)
#time.sleep(1)
for f in os.listdir('./'):
if f.endswith('.dump'):
os.remove('./{}'.format(f))
elif f.endswith('_jobqdb'):
shutil.rmtree('./{}'.format(f))

tests/test_signalDelay.py Normal file
View file

@ -0,0 +1,116 @@
import pathlib
import sys
sys.path.insert(0, str(pathlib.Path(__file__).parent.parent))
import logging
import multiprocessing as mp
from jobmanager import signalDelay
import os
import signal
import time
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
ch.setFormatter(logging.Formatter(fmt="%(asctime)s|%(name)s|%(levelname)s|%(msg)s"))
signalDelay.log.setLevel(logging.DEBUG)
signalDelay.log.addHandler(ch)
v = mp.Value('I')
sleep_time = 0.1
def no_output():
return # early return: the output redirection below is currently disabled
f = open('/dev/null', 'w')
sys.stdout = f
sys.stderr = f
def test_sd():
def _important_func(v):
no_output()
try:
for i in range(10):
v.value = i
time.sleep(sleep_time)
except KeyboardInterrupt:
# this prevents the traceback
pass
@signalDelay.sig_delay([signal.SIGINT])
def _important_func_with_dec(v):
_important_func(v)
# call _important_func in a subprocess and send SIGINT after 2*sleep_time seconds
# the SIGINT interrupts the loop right away,
# so v should be smaller than 5
p = mp.Process(target=_important_func, args=(v,))
p.start()
time.sleep(2*sleep_time)
os.kill(p.pid, signal.SIGINT)
p.join()
assert v.value < 5
assert p.exitcode == 0 # since the KeyboardInterrupt Error is caught and ignored
# call _important_func_with_dec in a subprocess and send SIGINT after 2*sleep_time seconds
# the SIGINT is delayed until the decorated function has returned,
# so the loop completes and v should be exactly 9
p = mp.Process(target=_important_func_with_dec, args=(v,))
p.start()
time.sleep(2*sleep_time)
os.kill(p.pid, signal.SIGINT)
p.join()
assert v.value == 9
assert p.exitcode == 1 # since the SIGINT is re-emitted only after the protected scope
# has been left, the KeyboardInterrupt can no longer be caught
def test_sd_ctx():
def _important_func(v):
no_output()
try:
for i in range(10):
v.value = i
time.sleep(sleep_time)
except KeyboardInterrupt:
# this prevents the traceback
pass
def _important_func_with_dec(v):
with signalDelay.sig_delay([signal.SIGINT]):
_important_func(v)
# call _important_func in a subprocess and send SIGINT after 2*sleep_time seconds
# the SIGINT interrupts the loop right away,
# so v should be smaller than 5
p = mp.Process(target=_important_func, args=(v,))
p.start()
time.sleep(2*sleep_time)
os.kill(p.pid, signal.SIGINT)
p.join()
assert v.value < 5
assert p.exitcode == 0 # since the KeyboardInterrupt Error is caught and ignored
# call _important_func_with_dec in a subprocess and send SIGINT after 2*sleep_time seconds
# the SIGINT is delayed until the sig_delay context has been left,
# so the loop completes and v should be exactly 9
p = mp.Process(target=_important_func_with_dec, args=(v,))
p.start()
time.sleep(2*sleep_time)
os.kill(p.pid, signal.SIGINT)
p.join()
assert v.value == 9
assert p.exitcode == 1 # since the SIGINT is re-emitted only after the protected scope
# has been left, the KeyboardInterrupt can no longer be caught
if __name__ == "__main__":
test_sd()
test_sd_ctx()