From 779119c6179f5c9720bdb51dc2f7b47527f6715e Mon Sep 17 00:00:00 2001 From: Richard Hartmann Date: Thu, 28 Jun 2018 15:13:21 +0200 Subject: [PATCH] added signalDelay to fix dead lock problems on client timeout, also work in progress to make the test run --- .travis.yml | 4 +- jobmanager/jm_version.py | 2 +- jobmanager/jobmanager.py | 185 ++++++++++++++++++-------------------- jobmanager/signalDelay.py | 105 ++++++++++++++++++++++ setup.py | 5 +- tests/test_jobmanager.py | 142 +++++++++++++++++++---------- tests/test_signalDelay.py | 116 ++++++++++++++++++++++++ 7 files changed, 404 insertions(+), 155 deletions(-) create mode 100644 jobmanager/signalDelay.py create mode 100644 tests/test_signalDelay.py diff --git a/.travis.yml b/.travis.yml index fc4fc02..88dfa87 100644 --- a/.travis.yml +++ b/.travis.yml @@ -14,9 +14,7 @@ branches: python: -- '2.7' -- '3.4' -- '3.5' +- '3.6' before_install: - sudo apt-get install libhdf5-openmpi-dev diff --git a/jobmanager/jm_version.py b/jobmanager/jm_version.py index 4a83951..0dda9b8 100644 --- a/jobmanager/jm_version.py +++ b/jobmanager/jm_version.py @@ -1,3 +1,3 @@ # put this into a separate file so we don't have a problem installing # with pip (unmet dependencies upon import). -__version__ = "0.2.0" +__version__ = "0.3.0" diff --git a/jobmanager/jobmanager.py b/jobmanager/jobmanager.py index ae1bcae..802b2da 100644 --- a/jobmanager/jobmanager.py +++ b/jobmanager/jobmanager.py @@ -2,7 +2,7 @@ # -*- coding: utf-8 -*- """jobmanager module -Richard Hartmann 2014 +Richard Hartmann 2014-2018 This module provides an easy way to implement distributed computing @@ -53,6 +53,7 @@ import logging import threading import ctypes from shutil import rmtree +from .signalDelay import sig_delay log = logging.getLogger(__name__) @@ -64,41 +65,14 @@ __all__ = ["JobManager_Client", "getDateForFileName" ] -# Magic conversion from 3 to 2 -if sys.version_info[0] == 2: - # Python 2 - import Queue as queue - class getfullargspec(object): - "A quick and dirty replacement for getfullargspec for Python 2" - def __init__(self, f): - self.args, self.varargs, self.varkw, self.defaults = \ - inspect.getargspec(f) - self.annotations = getattr(f, '__annotations__', {}) - inspect.getfullargspec = getfullargspec - - # IOError (socket.error) expection handling - import errno - - class JMConnectionError(Exception): - pass - - class JMConnectionRefusedError(JMConnectionError): - pass - - class JMConnectionResetError(JMConnectionError): - pass - input_promt = raw_input - -else: - # Python 3 - import queue - - JMConnectionError = ConnectionError - JMConnectionRefusedError = ConnectionRefusedError - JMConnectionResetError = ConnectionResetError +import queue - input_promt = input +JMConnectionError = ConnectionError +JMConnectionRefusedError = ConnectionRefusedError +JMConnectionResetError = ConnectionResetError + +input_promt = input class JMHostNotReachableError(JMConnectionError): pass @@ -405,9 +379,8 @@ class JobManager_Client(object): log = logging.getLogger(__name__+'.'+progress.get_identifier(name='worker{}'.format(i+1), bold=False)) log.setLevel(loglevel) - q_lock = threading.Lock() - Signal_to_sys_exit(signals=[signal.SIGTERM], lock = q_lock) + Signal_to_sys_exit(signals=[signal.SIGTERM]) Signal_to_SIG_IGN(signals=[signal.SIGINT]) n = os.nice(0) @@ -461,11 +434,13 @@ class JobManager_Client(object): try: log.debug("wait until local result q is almost empty") while local_result_q.qsize() > 1: - time.sleep(1) + time.sleep(0.1) log.debug("done waiting, call job_q_get") - with q_lock: + + with sig_delay([signal.SIGTERM]): arg = job_q_get() - log.debug("process {}".format(arg)) + log.debug("process {}".format(arg)) + # regular case, just stop working when empty job_q was found except queue.Empty: log.info("finds empty job queue, processed %s jobs", cnt) @@ -516,7 +491,7 @@ class JobManager_Client(object): log.debug("put arg to local fail_q") try: - with q_lock: + with sig_delay([signal.SIGTERM]): local_fail_q.put((arg, err.__name__, hostname)) # handle SystemExit in outer try ... except except SystemExit as e: @@ -537,7 +512,7 @@ class JobManager_Client(object): else: try: tp_0 = time.time() - with q_lock: + with sig_delay([signal.SIGTERM]): local_result_q.put((arg, res)) tp_1 = time.time() time_queue += (tp_1-tp_0) @@ -569,7 +544,7 @@ class JobManager_Client(object): log.warning("SystemExit, quit processing, reinsert current argument, please wait") log.debug("put arg back to local job_q") try: - with q_lock: + with sig_delay([signal.SIGTERM]): local_job_q.put(arg) # handle SystemExit in outer try ... except except SystemExit as e: @@ -633,8 +608,13 @@ class JobManager_Client(object): m_progress = None prepend = [] - infoline = progress.StringValue(num_of_bytes=12) - infoline = None + + if self.timeout is None: + infoline = None + else: + infoline = progress.StringValue(num_of_bytes=32) + infoline.value = "timeout in: {}s".format(int(self.timeout - (time.time() - self.init_time))).encode('utf-8') + # try: # worker_stdout_queue = mp.Queue(-1) @@ -772,13 +752,16 @@ class JobManager_Client(object): for p in self.procs: while p.is_alive(): - if (self.timeout is not None) and (self.timeout < time.time() - self.init_time): - log.debug("TIMEOUT for Client reached, terminate worker process %s", p.pid) - p.terminate() - log.debug("wait for worker process %s to be joined ...", p.pid) - p.join() - log.debug("worker process %s was joined", p.pid) - break + if self.timeout is not None: + elps_time = int(time.time() - self.init_time) + if self.timeout < elps_time: + log.debug("TIMEOUT for Client reached, terminate worker process %s", p.pid) + p.terminate() + log.debug("wait for worker process %s to be joined ...", p.pid) + p.join() + log.debug("worker process %s was joined", p.pid) + break + infoline.value = "timeout in: {}s".format(int(self.timeout - elps_time)).encode('utf-8') p.join(10) log.debug("worker process %s exitcode %s", p.pid, p.exitcode) log.debug("worker process %s was joined", p.pid) @@ -859,14 +842,18 @@ class ContainerClosedError(Exception): pass class ClosableQueue_Data(object): - def __init__(self): + def __init__(self, name=None): self.q = myQueue() self.is_closed = False self.lock = threading.Lock() + if name: + self.log_prefix = 'ClosableQueue:{} - '.format(name) + else: + self.log_prefix = '' def new_conn(self): conn1, conn2 = mp.Pipe() - log.debug("create new pair of connections, {}".format(conn2)) + log.debug("{}create new pair of connections, {}".format(self.log_prefix, conn2)) thr_listener = threading.Thread(target=self._listener, args=(conn1, )) thr_listener.daemon = True thr_listener.start() @@ -877,7 +864,7 @@ class ClosableQueue_Data(object): try: res = f(*args) except Exception as e: - log.debug("exec_cmd '{}' raises exception '{}'".format(f.__name__, type(e))) + log.debug("exec_cmd '{}' sends exception '{}'".format(f.__name__, type(e))) conn.send( ('#exc', e) ) else: log.debug("exec_cmd succesfull") @@ -887,7 +874,7 @@ class ClosableQueue_Data(object): while True: try: cmd = conn.recv() - log.debug("listener got cmd '{}'".format(cmd)) + log.debug("{}listener got cmd '{}'".format(self.log_prefix, cmd)) args = conn.recv() if cmd == '#put': self._exec_cmd(self.put, conn, args) @@ -900,7 +887,7 @@ class ClosableQueue_Data(object): else: raise ValueError("unknown command '{}'".format(cmd)) except EOFError: - log.debug("listener got EOFError, stop thread") + log.debug("{}listener got EOFError, stop thread".format(self.log_prefix)) break def put(self, item, block=True, timeout=None): @@ -920,13 +907,14 @@ class ClosableQueue_Data(object): def close(self): with self.lock: self.is_closed = True - log.debug("queue closed") + log.debug("{}queue closed".format(self.log_prefix)) class ClosableQueue(object): - def __init__(self): - self.data = ClosableQueue_Data() + def __init__(self, name=None): + self.data = ClosableQueue_Data(name) self.conn = self.data.new_conn() self.lock = threading.Lock() + self.name = name def client(self): cls = ClosableQueue.__new__(ClosableQueue) @@ -1368,10 +1356,8 @@ class JobManager_Server(object): fname = None self.job_q = ArgsContainer(fname) - self.result_q = ClosableQueue() - self.fail_q = ClosableQueue() - - self.numjobs = progress.UnsignedIntValue(0) + self.result_q = ClosableQueue(name='result_q') + self.fail_q = ClosableQueue(name='fail_q') self.stat = None @@ -1546,11 +1532,23 @@ class JobManager_Server(object): for fail_item in data['fail_list']: self.fail_q.put(fail_item) + log.debug("load: len(final_result): {}".format(len(self.final_result))) + log.debug("load: job_q.qsize: {}".format(self.job_q.qsize())) + log.debug("load: job_q.marked_items: {}".format(self.job_q.marked_items())) + log.debug("load: job_q.gotten_items: {}".format(self.job_q.gotten_items())) + log.debug("load: job_q.unmarked_items: {}".format(self.job_q.unmarked_items())) + log.debug("load: len(fail_q): {}".format(self.fail_q.qsize())) + - def __dump(self, f): pickle.dump(self.final_result, f, protocol=pickle.HIGHEST_PROTOCOL) + log.debug("dump: len(final_result): {}".format(len(self.final_result))) pickle.dump(self.job_q, f, protocol=pickle.HIGHEST_PROTOCOL) + log.debug("dump: job_q.qsize: {}".format(self.job_q.qsize())) + log.debug("dump: job_q.marked_items: {}".format(self.job_q.marked_items())) + log.debug("dump: job_q.gotten_items: {}".format(self.job_q.gotten_items())) + log.debug("dump: job_q.unmarked_items: {}".format(self.job_q.unmarked_items())) + fail_list = [] try: while True: @@ -1558,6 +1556,7 @@ class JobManager_Server(object): except queue.Empty: pass pickle.dump(fail_list, f, protocol=pickle.HIGHEST_PROTOCOL) + log.debug("dump: len(fail_q): {}".format(len(fail_list))) def read_old_state(self, fname_dump=None): @@ -1595,8 +1594,7 @@ class JobManager_Server(object): # the actual shared queue self.job_q.put(copy.copy(a)) - self.numjobs.value += 1 - + #with self._numjobs.get_lock(): # self._numjobs.value += 1 @@ -1630,17 +1628,7 @@ class JobManager_Server(object): log.critical("do not run JobManager_Server.start() in a subprocess") raise RuntimeError("do not run JobManager_Server.start() in a subprocess") -# if (self.numjobs - self.numresults) != len(self.args_dict): -# log.debug("numjobs: %s\n" + -# "numresults: %s\n" + -# "len(self.args_dict): %s", self.numjobs, self.numresults, len(self.args_dict)) -# -# log.critical( -# "inconsistency detected! (self.numjobs - self.numresults) != len(self.args_dict)! use JobManager_Server.put_arg to put arguments to the job_q") -# raise RuntimeError( -# "inconsistency detected! (self.numjobs - self.numresults) != len(self.args_dict)! use JobManager_Server.put_arg to put arguments to the job_q") - - jobqsize = self.job_q.qsize() + jobqsize = self.job_q.qsize() if jobqsize == 0: log.info("no jobs to process! use JobManager_Server.put_arg to put arguments to the job_q") return @@ -1667,11 +1655,15 @@ class JobManager_Server(object): info_line = progress.StringValue(num_of_bytes=100) - numresults = progress.UnsignedIntValue(0) - #numjobs = progress.UnsignedIntValue(self.job_q.put_items()) - + numresults = progress.UnsignedIntValue(self.job_q.marked_items() + self.fail_q.qsize()) + numjobs = progress.UnsignedIntValue(self.job_q.put_items()) + + log.debug("at start: number of jobs: {}".format(numjobs.value)) + log.debug("at start: number of results: {}".format(numresults.value)) + + with progress.ProgressBarFancy(count = numresults, - max_count = self.numjobs, + max_count = numjobs, interval = self.msg_interval, speed_calc_cycles = self.speed_calc_cycles, sigint = 'ign', @@ -1680,7 +1672,8 @@ class JobManager_Server(object): if not self.hide_progress: self.stat.start() - while numresults.value < self.numjobs.value: + while numresults.value < numjobs.value: + numjobs.value = self.job_q.put_items() failqsize = self.fail_q.qsize() jobqsize = self.job_q.qsize() markeditems = self.job_q.marked_items() @@ -1692,13 +1685,13 @@ class JobManager_Server(object): self.stat.stop() log.warning("timeout ({}s) exceeded -> quit server".format(self.timeout)) break - info_line.value = ("result_q size:{}, jobs: remaining:{}, "+ - "done:{}, failed:{}, in progress:{}, "+ + info_line.value = ("res_q size:{}, jobs: rem.:{}, "+ + "done:{}, failed:{}, prog.:{}, "+ "timeout in:{}s").format(self.result_q.qsize(), jobqsize, markeditems, failqsize, - self.numjobs.value - numresults.value - jobqsize, + numjobs.value - numresults.value - jobqsize, time_left).encode('utf-8') else: info_line.value = ("result_q size:{}, jobs: remaining:{}, "+ @@ -1706,7 +1699,7 @@ class JobManager_Server(object): jobqsize, markeditems, failqsize, - self.numjobs.value - numresults.value - jobqsize).encode('utf-8') + numjobs.value - numresults.value - jobqsize).encode('utf-8') log.info("infoline {}".format(info_line.value)) # allows for update of the info line try: @@ -1875,9 +1868,7 @@ class Signal_to_SIG_IGN(object): class Signal_to_sys_exit(object): - def __init__(self, signals=[signal.SIGINT, signal.SIGTERM], lock=threading.Lock()): - self.lock = lock - self.c = 0 + def __init__(self, signals=[signal.SIGINT, signal.SIGTERM]): for s in signals: signal.signal(s, self._handler) @@ -1886,16 +1877,8 @@ class Signal_to_sys_exit(object): sys.exit('exit due to signal {}'.format(progress.signal_dict[signal])) def _handler(self, signal, frame): - if self.c == 1: - log.warning("PID %s: received signal %s more than once, ignore lock", os.getpid(), progress.signal_dict[signal]) - self._exit(signal) - self.c += 1 - while not self.lock.acquire(timeout = 10): - log.warning("sys exit is stalled while aquiring lock") - try: - self._exit(signal) - finally: - self.lock.release() + self._exit(signal) + class Signal_to_terminate_process_list(object): @@ -2109,6 +2092,7 @@ def emergency_dump(arg, res, emergency_dump_path, host, port, authkey): pickle.dump(res, f) def check_if_host_is_reachable_unix_ping(adr, timeout=2, retry=5): + output = '' for i in range(retry): try: cmd = 'ping -c 1 -W {} {} '.format(int(timeout), adr) @@ -2117,6 +2101,7 @@ def check_if_host_is_reachable_unix_ping(adr, timeout=2, retry=5): except subprocess.CalledProcessError as e: # on exception, resume with loop log.warning("CalledProcessError on ping with message: %s", e) + output = e.output continue else: # no exception, ping was successful, return without error @@ -2125,7 +2110,7 @@ def check_if_host_is_reachable_unix_ping(adr, timeout=2, retry=5): # no early return happend, ping was never successful, raise error log.error("ping failed after %s retries", retry) - raise JMHostNotReachableError("could not reach host '{}'\nping error reads: {}".format(adr, e.output)) + raise JMHostNotReachableError("could not reach host '{}'\nping error reads: {}".format(adr, output)) def proxy_operation_decorator_python3(proxy, operation, reconnect_wait=2, reconnect_tries=3, ping_timeout=2, ping_retry=5): diff --git a/jobmanager/signalDelay.py b/jobmanager/signalDelay.py new file mode 100644 index 0000000..1c1028b --- /dev/null +++ b/jobmanager/signalDelay.py @@ -0,0 +1,105 @@ +""" + The signalDelay module provides a decorator to protect a function + from being interrupted via signaling mechanism. + + When a protected function receives a signal, the signal will be stored + and emitted again AFTER the function has terminated. +""" + +import logging +import traceback +import signal +import os + +log = logging.getLogger(__name__) + +SIG_MAP = {} +# read the signal names from the signal module and +# provide the emapping from the numeric value to the names +for s in signal.__dict__: + if s.startswith("SIG") and s[3] != '_': + SIG_MAP[getattr(signal, s)] = s + +class SigHandler(object): + """ a handler class which + - stores the receives signals + - and allows to re-emit them via os.kill + """ + def __init__(self): + self.sigs_caught = [] + + def __call__(self, sig, frame): # a callable suitable to signal.signal + """ + the actual handler function + + which will be called when a signal is received + + all it does is appending the signal the the sigs_caught list + """ + log.info("caught sig '{}'".format(SIG_MAP[sig])) + log.debug("frame: {}".format(traceback.format_stack(frame))) + self.sigs_caught.append(sig) + + def emit(self): # emit the signals + """ + reemits all the saved signals in the same order as they were caught + """ + l = len(self.sigs_caught) + if l > 0: + log.info("caught {} signal(s)".format(l)) + for s in self.sigs_caught: + log.info("emit signal '{}'".format(SIG_MAP[s])) + os.kill(os.getpid(), s) + +class sig_delay(object): + """ + a decorator / context manager object to prevent a function / context to be interrupted + by signals + + init takes a list of signals which will be delayed + """ + def __init__(self, sigs): + self.sigs = sigs + self.sigh = None + + def _setup(self): + """ + save the current signal handlers for the signals in the given signal list + and implement the special handler 'SigHandler' which only captures the reveived + signals in a list + """ + self.sigh = SigHandler() + self.old_handlers = [] + log.debug("setup alternative signal handles for {}".format([SIG_MAP[s] for s in self.sigs])) + for s in self.sigs: + self.old_handlers.append(signal.getsignal(s)) + signal.signal(s, self.sigh) + + def _restore(self): + """ + reimplement the original handler for each signal of the given signal list + and trigger the SigHandler 'emit' function to reemit the signals + """ + log.debug("restore signal handles") + for i, s in enumerate(self.sigs): + signal.signal(s, self.old_handlers[i]) + self.sigh.emit() + + def __call__(self, func): + """ + enables the decorator style + """ + def _protected_func(*args, **kwargs): + with self: + log.debug("call function ...") + func(*args, **kwargs) + + return _protected_func + + def __enter__(self): + self._setup() + log.debug("signalDelay context entered ...") + + def __exit__(self, exc_type, exc_val, exc_tb): + log.debug("signalDelay context left!") + self._restore() \ No newline at end of file diff --git a/setup.py b/setup.py index 4757204..8bd3a48 100644 --- a/setup.py +++ b/setup.py @@ -75,10 +75,7 @@ if __name__ == "__main__": "progress", "manager", "job", "persistent data", "scheduler"], classifiers= [ 'Operating System :: OS Independent', - #'Programming Language :: Python :: 2.7', #Todo - #'Programming Language :: Python :: 3.2', # also not very well tested - #'Programming Language :: Python :: 3.3', # also not very well tested - 'Programming Language :: Python :: 3.4', + 'Programming Language :: Python :: 3.6', 'Intended Audience :: Science/Research' ], platforms=['ALL'], diff --git a/tests/test_jobmanager.py b/tests/test_jobmanager.py index 3927ce3..6d08ee9 100644 --- a/tests/test_jobmanager.py +++ b/tests/test_jobmanager.py @@ -1,13 +1,9 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- -from __future__ import division, print_function - import os import sys import time import multiprocessing as mp -from multiprocessing import util -# util.log_to_stderr() from multiprocessing.managers import BaseManager import socket import signal @@ -16,18 +12,15 @@ import datetime import threading from numpy import random import pytest +import shutil -from os.path import abspath, dirname, split +import pathlib +sys.path.insert(0, str(pathlib.Path(__file__).parent.parent)) -# Add parent directory to beginning of path variable -sys.path = [split(dirname(abspath(__file__)))[0]] + sys.path import binfootprint import progression as progress -if sys.version_info[0] == 2: - TIMEOUT = 300 -elif sys.version_info[0] == 3: - TIMEOUT = 15 +TIMEOUT = 15 import warnings warnings.filterwarnings('ignore', module='traitlets', append=False, category=DeprecationWarning) @@ -35,7 +28,10 @@ warnings.filterwarnings('error', append=True) import jobmanager -logging.getLogger('jobmanager').setLevel(logging.WARNING) +logging.getLogger('jobmanager').setLevel(logging.INFO) +# logging.getLogger('jobmanager').setLevel(logging.INFO) +# logging.getLogger('jobmanager.jobmanager.JobManager_Server').setLevel(logging.INFO) +logging.getLogger('jobmanager.signalDelay').setLevel(logging.INFO) logging.getLogger('progression').setLevel(logging.ERROR) logging.basicConfig(level = logging.INFO) @@ -559,9 +555,16 @@ def test_jobmanager_read_old_stat(): p_client.join(10) p_server.join(10) - - assert not p_client.is_alive(), "the client did not terminate on time!" + + assert not p_server.is_alive(), "the server did not terminate on time!" + + try: + assert not p_client.is_alive(), "the client did not terminate on time!" + except AssertionError: + p_client.terminate() + raise + assert p_client.exitcode == 0 assert p_server.exitcode == 0 print("[+] client and server terminated") @@ -572,9 +575,10 @@ def test_jobmanager_read_old_stat(): p_server = mp.Process(target=start_server, args=(n,True)) p_server.start() - time.sleep(1) + time.sleep(2) p_client = mp.Process(target=start_client) + print("trigger start client") p_client.start() p_client.join(30) @@ -637,16 +641,25 @@ def test_client_status(): def test_jobmanager_local(): global PORT PORT += 1 - args = range(1,200) + args = range(1,201) + client_sleep = 0.1 + num_client = 4 + t0 = time.time() with jobmanager.JobManager_Local(client_class = jobmanager.JobManager_Client, authkey = AUTHKEY, port = PORT, - const_arg = 0.1, - nproc = 4) as jm_server: + const_arg = client_sleep, + nproc = num_client) as jm_server: jm_server.args_from_list(args) jm_server.start() assert jm_server.all_successfully_processed() + t1 = time.time() + print("local JM, nproc {}".format(num_client)) + print("used time : {:.3}s".format(t1-t0)) + print("ideal time: {}s".format(len(args)*client_sleep/num_client)) + + def test_start_server_on_used_port(): global PORT @@ -780,14 +793,37 @@ def test_ArgsContainer(): from shutil import rmtree import shelve + fname = "test.shelve" + + try: + os.remove(fname) + except FileNotFoundError: + pass + + try: + os.remove(fname+'.db') + except FileNotFoundError: + pass # simple test on shelve, close is needed to write data to disk - s = shelve.open("test") + s = shelve.open(fname) s['a'] = 1 s.close() - s2 = shelve.open("test") + s2 = shelve.open(fname) assert s2['a'] == 1 + try: + os.remove(fname) + except FileNotFoundError: + pass + + try: + os.remove(fname+'.db') + except FileNotFoundError: + pass + + + path = 'argscont' # remove old test data @@ -978,6 +1014,9 @@ def test_ArgsContainer_BaseManager(): assert ac_inst.qsize() == 200 + ac_inst.clear() + + def test_ArgsContainer_BaseManager_in_subprocess(): from jobmanager.jobmanager import ArgsContainer @@ -1075,6 +1114,8 @@ def test_ArgsContainer_BaseManager_in_subprocess(): print("caught ContainerClosedError") else: assert False + + ac_inst.clear() def test_havy_load_on_ArgsContainer(): from jobmanager.jobmanager import ArgsContainer @@ -1129,6 +1170,7 @@ def test_havy_load_on_ArgsContainer(): time.sleep(1) print(ac_inst.qsize()) + ac_inst.clear() def test_ClosableQueue(): @@ -1264,36 +1306,36 @@ def test_ClosableQueue_with_manager(): if __name__ == "__main__": - logging.getLogger('jobmanager').setLevel(logging.DEBUG) + logging.getLogger('jobmanager').setLevel(logging.INFO) if len(sys.argv) > 1: pass else: func = [ - test_ArgsContainer, - test_ArgsContainer_BaseManager, - test_ArgsContainer_BaseManager_in_subprocess, - test_havy_load_on_ArgsContainer, -# test_ClosableQueue, -# test_ClosableQueue_with_manager, -# test_hum_size, -# test_Signal_to_SIG_IGN, -# test_Signal_to_sys_exit, -# test_Signal_to_terminate_process_list, -# test_jobmanager_static_client_call, -# test_start_server_with_no_args, -# test_start_server, -# test_client, -# test_jobmanager_basic, -# test_jobmanager_server_signals, -# test_shutdown_server_while_client_running, -# test_shutdown_client, -# test_jobmanager_read_old_stat, -# test_client_status, -# test_jobmanager_local, -# test_start_server_on_used_port, -# test_shared_const_arg, -# test_digest_rejected, -# test_hum_size, + # test_ArgsContainer, + # test_ArgsContainer_BaseManager, + # test_ArgsContainer_BaseManager_in_subprocess, + # test_havy_load_on_ArgsContainer, + # test_ClosableQueue, + # test_ClosableQueue_with_manager, + # test_hum_size, + # test_Signal_to_SIG_IGN, + # test_Signal_to_sys_exit, + # test_Signal_to_terminate_process_list, + # test_jobmanager_static_client_call, + # test_start_server_with_no_args, + # test_start_server, + # test_client, + # test_jobmanager_basic, + # test_jobmanager_server_signals, + # test_shutdown_server_while_client_running, + # test_shutdown_client, + # test_jobmanager_read_old_stat, + # test_client_status, + test_jobmanager_local, + # test_start_server_on_used_port, + # test_shared_const_arg, + # test_digest_rejected, + # test_hum_size, lambda : print("END") ] @@ -1303,4 +1345,10 @@ if __name__ == "__main__": print('## {}'.format(f.__name__)) print() f() - #time.sleep(1) \ No newline at end of file + #time.sleep(1) + + for f in os.listdir('./'): + if f.endswith('.dump'): + os.remove('./{}'.format(f)) + elif f.endswith('_jobqdb'): + shutil.rmtree('./{}'.format(f)) \ No newline at end of file diff --git a/tests/test_signalDelay.py b/tests/test_signalDelay.py new file mode 100644 index 0000000..bbe3b7f --- /dev/null +++ b/tests/test_signalDelay.py @@ -0,0 +1,116 @@ +import pathlib +import sys +sys.path.insert(0, str(pathlib.Path(__file__).parent.parent)) + +import logging +import multiprocessing as mp +from jobmanager import signalDelay +import os +import signal + +import time + + + + +ch = logging.StreamHandler() +ch.setLevel(logging.DEBUG) +ch.setFormatter(logging.Formatter(fmt="%(asctime)s|%(name)s|%(levelname)s|%(msg)s")) + +signalDelay.log.setLevel(logging.DEBUG) +signalDelay.log.addHandler(ch) + +v = mp.Value('I') + +sleep_time = 0.1 + + +def no_output(): + return + f = open('/dev/null', 'w') + sys.stdout = f + sys.stderr = f + +def test_sd(): + + def _important_func(v): + no_output() + try: + for i in range(10): + v.value = i + time.sleep(sleep_time) + except KeyboardInterrupt: + # this prevents the traceback + pass + + @signalDelay.sig_delay([signal.SIGINT]) + def _important_func_with_dec(v): + _important_func(v) + + # call _important_func in a subprocess and send SIGINT after 1 second + # the subprocess will terminate immediately + # v should be smaller than 5 + p = mp.Process(target=_important_func, args=(v,)) + p.start() + time.sleep(2*sleep_time) + os.kill(p.pid, signal.SIGINT) + p.join() + assert v.value < 5 + assert p.exitcode == 0 # since the KeyboardInterrupt Error is caught and ignored + + + # call _important_func in a subprocess and send SIGINT after 1 second + # the subprocess will terminate immediately + # v should be smaller than 5 + p = mp.Process(target=_important_func_with_dec, args=(v,)) + p.start() + time.sleep(2*sleep_time) + os.kill(p.pid, signal.SIGINT) + p.join() + assert v.value == 9 + assert p.exitcode == 1 # since the SIGINT is reemited again after the scope + # of _important_func the KeyboardInterrupt Error can not be caught + + +def test_sd_ctx(): + def _important_func(v): + no_output() + try: + for i in range(10): + v.value = i + time.sleep(sleep_time) + except KeyboardInterrupt: + # this prevents the traceback + pass + + def _important_func_with_dec(v): + with signalDelay.sig_delay([signal.SIGINT]): + _important_func(v) + + # call _important_func in a subprocess and send SIGINT after 1 second + # the subprocess will terminate immediately + # v should be smaller than 5 + p = mp.Process(target=_important_func, args=(v,)) + p.start() + time.sleep(2*sleep_time) + os.kill(p.pid, signal.SIGINT) + p.join() + assert v.value < 5 + assert p.exitcode == 0 # since the KeyboardInterrupt Error is caught and ignored + + + # call _important_func in a subprocess and send SIGINT after 1 second + # the subprocess will terminate immediately + # v should be smaller than 5 + p = mp.Process(target=_important_func_with_dec, args=(v,)) + p.start() + time.sleep(2*sleep_time) + os.kill(p.pid, signal.SIGINT) + p.join() + assert v.value == 9 + assert p.exitcode == 1 # since the SIGINT is reemited again after the scope + # of _important_func the KeyboardInterrupt Error can not be caught + +if __name__ == "__main__": + test_sd() + test_sd_ctx() \ No newline at end of file