diff --git a/jobmanager/jobmanager.py b/jobmanager/jobmanager.py index 6025d82..896fc6a 100644 --- a/jobmanager/jobmanager.py +++ b/jobmanager/jobmanager.py @@ -31,6 +31,7 @@ The class JobManager_Client from __future__ import division, print_function import copy +from datetime import datetime import inspect import multiprocessing as mp from multiprocessing.managers import BaseManager, RemoteError @@ -46,31 +47,14 @@ import traceback import warnings import binfootprint as bf import progression as progress -#import queuelib import shelve +import hashlib import logging import threading import ctypes - -# taken from here: https://mail.python.org/pipermail/python-list/2010-November/591474.html -class MultiLineFormatter(logging.Formatter): - def format(self, record): - _str = logging.Formatter.format(self, record) - header = _str.split(record.message)[0] - _str = _str.replace('\n', '\n' + ' '*len(header)) - return _str - +from shutil import rmtree log = logging.getLogger(__name__) -log.setLevel(logging.INFO) -console_hand = logging.StreamHandler(stream = sys.stderr) -console_hand.setLevel(logging.DEBUG) -fmt = MultiLineFormatter('%(asctime)s %(name)s %(levelname)s : %(message)s') -console_hand.setFormatter(fmt) -log.addHandler(console_hand) - - -from datetime import datetime # This is a list of all python objects that will be imported upon # initialization during module import (see __init__.py) @@ -120,7 +104,7 @@ else: class JMHostNotReachableError(JMConnectionError): pass -myQueue = mp.Queue +myQueue = queue.Queue AuthenticationError = mp.AuthenticationError def humanize_size(size_in_bytes): @@ -301,7 +285,6 @@ class JobManager_Client(object): self.procs = [] self.manager_objects = None # will be set via connect() - self.connect() # get shared objects from server def connect(self): if self.manager_objects is None: @@ -467,6 +450,10 @@ class JobManager_Client(object): except queue.Empty: log.info("finds empty job queue, processed %s jobs", cnt) break + except 
ContainerClosedError: + log.info("job queue was closed, processed %s jobs", cnt) + break + # handle SystemExit in outer try ... except except SystemExit as e: arg = None @@ -594,6 +581,7 @@ class JobManager_Client(object): retruns when all subprocesses have terminated """ + self.connect() # get shared objects from server if not self.connected: raise JMConnectionError("Can not start Client with no connection to server (shared objetcs are not available)") @@ -826,36 +814,161 @@ def get_shared_status(ss): else: return ss.value -class PersistentQueue(object): - def __init__(self, fname): - self._path = fname - self.q = shelve.open(self._path) - self.head = 0 - self.tail = 0 - - def close(self): - self.q.close() - os.remove(self._path) - - def qsize(self): - # log.info("qsize: this is thread thr_job_q_put with tid %s", ctypes.CDLL('libc.so.6').syscall(186)) - # log.info("qsize {} ({},{})".format(self.tail - self.head, self.tail, self.head)) - return self.tail - self.head +class ContainerClosedError(Exception): + pass + +class ClosableQueue(object): + def __init__(self): + self.q = myQueue() + self._closed = False def put(self, item): - # log.info("put item {}".format(item)) - self.q[str(self.tail)] = item - self.tail += 1 + if self._closed: + raise ContainerClosedError + self.q.put(item) + + def get(self, timeout=None): + return self.q.get(timeout=timeout) + + def qsize(self): + return self.q.qsize() + + def close(self): + self._close = True + + +class ArgsContainer(object): + def __init__(self, path=None): + self._path = path + self._lock = threading.Lock() + + if self._path is None: + self.data = {} + else: + self._open_shelve() + + self._closed = False + self._not_gotten_ids = set() + self._marked_ids = set() + self._max_id = 0 + + def _open_shelve(self, new_shelve=True): + if os.path.exists(self._path): + if os.path.isfile(self._path): + raise RuntimeWarning("can not create shelve, path '{}' is an existing file".format(self._path)) + else: + os.makedirs(self._path) 
+ fname = os.path.join(self._path, 'args') + if os.path.exists(fname) and new_shelve: + raise RuntimeError("a shelve with name {} already exists".format(fname)) + + self.data = shelve.open(fname) + + def __getstate__(self): + with self._lock: + if self._path is None: + return (self.data, self._not_gotten_ids, self._marked_ids, self._max_id) + else: + return (self._path, self._not_gotten_ids, self._marked_ids, self._max_id) + + def __setstate__(self, state): + tmp, tmp_not_gotten_ids, self._marked_ids, self._max_id = state + # the not gotten ones are all items except the markes ones + # the old gotten ones which are not marked where lost + self._not_gotten_ids = set(range(self._max_id)) - self._marked_ids + if isinstance(tmp, dict): + self.data = tmp + self._path = None + else: + self._path = tmp + self._open_shelve(new_shelve=False) + self._closed = False + self._lock = threading.Lock() + + def close(self): + self._closed = True + + def close_shelve(self): + if self._path is not None: + self.data.close() + + def clear(self): + with self._lock: + if self._path is not None: + self.data.close() + rmtree(self._path) + else: + self.data.clear() + self._closed = False + self._not_gotten_ids = set() + self._marked_ids = set() + + def qsize(self): + return len(self._not_gotten_ids) + + def put_items(self): + s = len(self.data)//2 + return s + + def marked_items(self): + return len(self._marked_ids) + + def gotten_items(self): + return self.put_items() - self.qsize() + + def unmarked_items(self): + return self.put_items() - self.marked_items() + + def put(self, item): + with self._lock: + if self._closed: + raise ContainerClosedError + + item_hash = hashlib.sha256(bf.dump(item)).hexdigest() + if item_hash in self.data: + item_id = self.data[item_hash] + if (item_id in self._not_gotten_ids) or (item_id in self._marked_ids): + # the item has either not 'gotten' yet or is 'marked' + # in both cases a reinsert is not allowed + msg = ("do not add the same argument twice! 
If you are sure, they are not the same, "+ + "there might be an error with the binfootprint mehtods or a hash collision!") + log.critical(msg) + raise ValueError(msg) + else: + # the item is allready known, but has been 'gotten' and not marked yet + # thefore a reinster it allowd + self._not_gotten_ids.add(item_id) + else: + str_id = '_'+str(self._max_id) + self.data[str_id] = item + self.data[item_hash] = self._max_id + self._not_gotten_ids.add(self._max_id) + self._max_id += 1 def get(self): - try: - item = self.q[str(self.head)] - except KeyError: - raise queue.Empty + with self._lock: + if self._closed: + raise ContainerClosedError + try: + get_idx = self._not_gotten_ids.pop() + except KeyError: + raise queue.Empty + + str_id = '_' + str(get_idx) + item = self.data[str_id] + return item + + def mark(self, item): + with self._lock: + item_hash = hashlib.sha256(bf.dump(item)).hexdigest() + item_id = self.data[item_hash] + if item_id in self._not_gotten_ids: + raise ValueError("item not gotten yet, can not be marked") + if item_id in self._marked_ids: + raise RuntimeWarning("item allready marked") + self._marked_ids.add(item_id) + - self.head += 1 - # log.info("get item {}".format(item)) - return item RAND_STR_ASCII_IDX_LIST = list(range(48,58)) + list(range(65,91)) + list(range(97,123)) @@ -865,10 +978,10 @@ def rand_str(l = 8): s += chr(random.choice(RAND_STR_ASCII_IDX_LIST)) return s -def _new_rand_file_name(path='.', end='', l=8): +def _new_rand_file_name(path='.', pre='', end='', l=8): c = 0 while True: - fname = rand_str(l) + end + fname = pre + rand_str(l) + end full_name = os.path.join(path, fname) if not os.path.exists(full_name): return fname @@ -877,7 +990,11 @@ def _new_rand_file_name(path='.', end='', l=8): if c > 10: l += 2 c = 0 - print("INFO: increase random file name length to", l) + print("INFO: increase random file name length to", l) + + +class JobManager_Manager(BaseManager): + pass class JobManager_Server(object): """general usage: @@ 
-1001,97 +1118,65 @@ class JobManager_Server(object): # from the set if it was caught by the result_q # so iff all results have been processed successfully, # the args_dict will be empty - self.args_dict = dict() # has the bin footprint in it - self.args_list = [] # has the actual args object + #self.args_dict = dict() # has the bin footprint in it + #self.args_list = [] # has the actual args object - # thread safe integer values - self._numresults = mp.Value('i', 0) # count the successfully processed jobs - self._numjobs = mp.Value('i', 0) # overall number of jobs # final result as list, other types can be achieved by subclassing self.final_result = [] # NOTE: it only works using multiprocessing.Queue() # the Queue class from the module queue does NOT work - self.job_q_on_disk = job_q_on_disk - if job_q_on_disk: - fname = _new_rand_file_name(job_q_on_disk_path, '.jobqdb') - self.job_q = PersistentQueue(fname) - else: - self.job_q = myQueue() # queue holding args to process - self.result_q = myQueue() # queue holding returned results - self.fail_q = myQueue() # queue holding args where processing failed - - self.manager = None + + self.manager_server = None self.hostname = socket.gethostname() + self.job_q_on_disk = job_q_on_disk + self.job_q_on_disk_path = job_q_on_disk_path + if self.job_q_on_disk: + fname = _new_rand_file_name(path = self.job_q_on_disk_path, pre='.',end='_jobqdb') + else: + fname = None - def __stop_SyncManager(self): - if self.manager == None: - return - - manager_proc = self.manager._process - # stop SyncManager - self.manager.shutdown() - - progress.check_process_termination(proc = manager_proc, - prefix = 'SyncManager: ', - timeout = 2, - auto_kill_on_last_resort = True) - - def __check_bind(self): + self.job_q = ArgsContainer(fname) + self.result_q = ClosableQueue() + self.fail_q = ClosableQueue() + + @staticmethod + def _check_bind(host, port): try: s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - s.bind((self.hostname, self.port)) 
+ s.bind((host, port)) except: - log.critical("test bind to %s:%s failed", self.hostname, self.port) + log.critical("test bind to %s:%s failed", host, port) raise finally: s.close() - - def __start_SyncManager(self): - self.__check_bind() - - class JobQueueManager(BaseManager): - pass - + + def _start_manager_thread(self): + self._check_bind(self.hostname, self.port) + # make job_q, result_q, fail_q, const_arg available via network - JobQueueManager.register('get_job_q', callable=lambda: self.job_q) - JobQueueManager.register('get_result_q', callable=lambda: self.result_q) - JobQueueManager.register('get_fail_q', callable=lambda: self.fail_q) - JobQueueManager.register('get_const_arg', callable=lambda: self.const_arg) + JobManager_Manager.register('get_job_q', callable=lambda: self.job_q, exposed=['get', 'put', 'qsize']) + JobManager_Manager.register('get_result_q', callable=lambda: self.result_q, exposed=['get', 'put', 'qsize']) + JobManager_Manager.register('get_fail_q', callable=lambda: self.fail_q, exposed=['get', 'put', 'qsize']) + JobManager_Manager.register('get_const_arg', callable=lambda: self.const_arg) address=('', self.port) #ip='' means local authkey=self.authkey - - self.manager = JobQueueManager(address, authkey) - - # start manager with non default signal handling given by - # the additional init function setup_SIG_handler_manager - - try: - self.manager.start(setup_SIG_handler_manager) - except EOFError as e: - log.error("can not start SyncManager on %s:%s\n"+ - "this is usually the case when the port used is not available!", self.hostname, self.port) - - manager_proc = self.manager._process - manager_identifier = progress.get_identifier(name='SyncManager') - progress.check_process_termination(proc = manager_proc, - prefix = manager_identifier, - timeout = 0.3, - auto_kill_on_last_resort = True) - - self.manager = None - return False + self.manager_server = JobManager_Manager(address, authkey).get_server() - log.info("SyncManager with PID %s 
started on %s:%s (%s)", self.manager._process.pid, self.hostname, self.port, authkey) - return True + server_thr = threading.Thread(target=self.manager_server.serve_forever) + server_thr.daemon = True + server_thr.start() + + m_test = BaseManager(('localhost', self.port), authkey) + try: + m_test.connect() + except: + raise ConnectionError("test conntect to JobManager_Manager failed") + + log.info("JobManager_Manager thread started on %s:%s (%s)", self.hostname, self.port, authkey) - def __restart_SyncManager(self): - self.__stop_SyncManager() - if not self.__start_SyncManager(): - log.critical("faild to restart SyncManager") - raise RuntimeError("could not start server") def __enter__(self): return self @@ -1112,19 +1197,19 @@ class JobManager_Server(object): # causes exception traceback to be printed return False - @property - def numjobs(self): - return self._numjobs.value - @numjobs.setter - def numjobs(self, numjobs): - self._numjobs.value = numjobs - - @property - def numresults(self): - return self._numresults.value - @numresults.setter - def numresults(self, numresults): - self._numresults.value = numresults +# @property +# def numjobs(self): +# return self._numjobs.value +# @numjobs.setter +# def numjobs(self, numjobs): +# self._numjobs.value = numjobs +# +# @property +# def numresults(self): +# return self._numresults.value +# @numresults.setter +# def numresults(self, numresults): +# self._numresults.value = numresults def shutdown(self): """"stop all spawned processes and clean up @@ -1133,13 +1218,21 @@ class JobManager_Server(object): - if job_q is not empty dump remaining job_q """ # will only be False when _shutdown was started in subprocess - self.job_q.close() - self.__stop_SyncManager() - log.debug("SyncManager stopped!") + if sys.version_info[0] == 3: + if self.manager_server is not None: + self.manager_server.stop_event.set() + log.debug("set stopEvent for JobManager_Manager server") + self.manager_server = None + +# self.job_q.close() +# 
self.result_q.close() +# self.fail_q.close() +# log.debug("queues closed!") + # do user defined final processing self.process_final_result() log.debug("process_final_result done!") - + # print(self.fname_dump) if self.fname_dump is not None: if self.fname_dump == 'auto': @@ -1147,7 +1240,8 @@ class JobManager_Server(object): else: fname = self.fname_dump - log.info("dump current state to '%s'", fname) + log.info("dump current state to '%s'", fname) + with open(fname, 'wb') as f: self.__dump(f) @@ -1161,15 +1255,14 @@ class JobManager_Server(object): assert self._pid == os.getpid() self.show_statistics() - - log.info("JobManager_Server was successfully shut down") def show_statistics(self): if self.show_stat: - all_jobs = self.numjobs - succeeded = self.numresults + all_jobs = self.job_q.put_items() + succeeded = self.job_q.marked_items() failed = self.fail_q.qsize() + all_processed = succeeded + failed id1 = self.__class__.__name__+" " @@ -1188,61 +1281,40 @@ class JobManager_Server(object): print("{} not processed : {}".format(id2, all_not_processed)) print("{} queried : {}".format(id2, queried_but_not_processed)) print("{} not queried yet : {}".format(id2, not_queried)) - print("{}len(args_dict) : {}".format(id2, len(self.args_dict))) - if (all_not_processed + failed) != len(self.args_dict): - log.error("'all_not_processed != len(self.args_dict)' something is inconsistent!") - + def all_successfully_processed(self): - return self.numjobs == self.numresults + return self.job_q.qsize() == 0 @staticmethod def static_load(f): data = {} - data['numjobs'] = pickle.load(f) - data['numresults'] = pickle.load(f) data['final_result'] = pickle.load(f) - data['args_dict'] = pickle.load(f) - data['args_list'] = pickle.load(f) - - fail_list = pickle.load(f) - data['fail_set'] = {bf.dump(fail_item[0]) for fail_item in fail_list} - - data['fail_q'] = myQueue() - data['job_q'] = myQueue() - - for fail_item in fail_list: - data['fail_q'].put_nowait(fail_item) - for arg in 
data['args_dict']: - if arg not in data['fail_set']: - arg_idx = data['args_dict'][arg] - data['job_q'].put_nowait(data['args_list'][arg_idx]) - + data['job_q'] = pickle.load(f) + data['fail_list'] = pickle.load(f) return data def __load(self, f): data = JobManager_Server.static_load(f) - for key in ['numjobs', 'numresults', 'final_result', - 'args_dict', 'args_list', 'fail_q','job_q']: - self.__setattr__(key, data[key]) + self.final_result = data['final_result'] + self.job_q = data['job_q'] + for fail_item in data['fail_list']: + self.fail_q.put(fail_item) + + def __dump(self, f): - pickle.dump(self.numjobs, f, protocol=pickle.HIGHEST_PROTOCOL) - pickle.dump(self.numresults, f, protocol=pickle.HIGHEST_PROTOCOL) pickle.dump(self.final_result, f, protocol=pickle.HIGHEST_PROTOCOL) - pickle.dump(self.args_dict, f, protocol=pickle.HIGHEST_PROTOCOL) - pickle.dump(self.args_list, f, protocol=pickle.HIGHEST_PROTOCOL) + pickle.dump(self.job_q, f, protocol=pickle.HIGHEST_PROTOCOL) fail_list = [] - log.info("__dump: failq size {}".format(self.fail_q.qsize())) try: while True: - fail_list.append(self.fail_q.get_nowait()) + fail_list.append(self.fail_q.get(timeout = 0)) except queue.Empty: pass pickle.dump(fail_list, f, protocol=pickle.HIGHEST_PROTOCOL) def read_old_state(self, fname_dump=None): - if fname_dump == None: fname_dump = self.fname_dump if fname_dump == 'auto': @@ -1263,21 +1335,23 @@ class JobManager_Server(object): def put_arg(self, a): """add argument a to the job_q """ - bfa = bf.dump(a) - if bfa in self.args_dict: - log.critical("do not add the same argument twice! If you are sure, they are not the same, there might be an error with the binfootprint mehtods!") - raise ValueError("do not add the same argument twice! If you are sure, they are not the same, there might be an error with the binfootprint mehtods!") + #hash_bfa = hashlib.sha256(bf.dump(a)).digest() +# if hash_bfa in self.args_dict: +# msg = ("do not add the same argument twice! 
If you are sure, they are not the same, "+ +# "there might be an error with the binfootprint mehtods or a hash collision!") +# log.critical(msg) +# raise ValueError(msg) # this dict associates an unique index with each argument 'a' # or better with its binary footprint - self.args_dict[bfa] = len(self.args_list) + #self.args_dict[hash_bfa] = len(self.args_list) + #self.args_list.append(a) - self.args_list.append(a) # the actual shared queue - self.job_q.put(copy.copy(a)) + self.job_q.put(a) - with self._numjobs.get_lock(): - self._numjobs.value += 1 + #with self._numjobs.get_lock(): + # self._numjobs.value += 1 def args_from_list(self, args): """serialize a list of arguments to the job_q @@ -1302,35 +1376,35 @@ class JobManager_Server(object): print("{} awaits client results".format(self.__class__.__name__)) def bring_him_up(self): - if not self.__start_SyncManager(): - log.critical("could not start server") - raise RuntimeError("could not start server") + + self._start_manager_thread() if self._pid != os.getpid(): log.critical("do not run JobManager_Server.start() in a subprocess") raise RuntimeError("do not run JobManager_Server.start() in a subprocess") - if (self.numjobs - self.numresults) != len(self.args_dict): - log.debug("numjobs: %s\n" + - "numresults: %s\n" + - "len(self.args_dict): %s", self.numjobs, self.numresults, len(self.args_dict)) +# if (self.numjobs - self.numresults) != len(self.args_dict): +# log.debug("numjobs: %s\n" + +# "numresults: %s\n" + +# "len(self.args_dict): %s", self.numjobs, self.numresults, len(self.args_dict)) +# +# log.critical( +# "inconsistency detected! (self.numjobs - self.numresults) != len(self.args_dict)! use JobManager_Server.put_arg to put arguments to the job_q") +# raise RuntimeError( +# "inconsistency detected! (self.numjobs - self.numresults) != len(self.args_dict)! use JobManager_Server.put_arg to put arguments to the job_q") - log.critical( - "inconsistency detected! 
(self.numjobs - self.numresults) != len(self.args_dict)! use JobManager_Server.put_arg to put arguments to the job_q") - raise RuntimeError( - "inconsistency detected! (self.numjobs - self.numresults) != len(self.args_dict)! use JobManager_Server.put_arg to put arguments to the job_q") - - if self.numjobs == 0: + jobqsize = self.job_q.qsize() + if jobqsize == 0: log.info("no jobs to process! use JobManager_Server.put_arg to put arguments to the job_q") return else: log.info("started (host:%s authkey:%s port:%s jobs:%s)", self.hostname, self.authkey.decode(), self.port, - self.numjobs) + jobqsize) print("{} started (host:{} authkey:{} port:{} jobs:{})".format(self.__class__.__name__, self.hostname, self.authkey.decode(), self.port, - self.numjobs)) + jobqsize)) Signal_to_sys_exit(signals=[signal.SIGTERM, signal.SIGINT]) @@ -1345,40 +1419,39 @@ class JobManager_Server(object): """ info_line = progress.StringValue(num_of_bytes=100) - with progress.ProgressBarFancy(count=self._numresults, - max_count=self._numjobs, - interval=self.msg_interval, - speed_calc_cycles=self.speed_calc_cycles, - sigint='ign', - sigterm='ign', + + numresults = progress.UnsignedIntValue(0) + numjobs = progress.UnsignedIntValue(self.job_q.put_items()) + + with progress.ProgressBarFancy(count = numresults, + max_count = numjobs, + interval = self.msg_interval, + speed_calc_cycles = self.speed_calc_cycles, + sigint = 'ign', + sigterm = 'ign', info_line=info_line) as stat: if not self.hide_progress: stat.start() - while (len(self.args_dict) - self.fail_q.qsize()) > 0: - info_line.value = "result_q size:{}, job_q size:{}, recieved results:{}".format(self.result_q.qsize(), - self.job_q.qsize(), - self.numresults).encode( - 'utf-8') + while numresults.value < numjobs.value: + failqsize = self.fail_q.qsize() + jobqsize = self.job_q.qsize() + markeditems = self.job_q.marked_items() + numresults.value = failqsize + markeditems + info_line.value = ("result_q size:{}, jobs: remaining:{}, "+ + "done:{}, 
failed:{}, in progress:{}").format(self.result_q.qsize(), + jobqsize, + markeditems, + failqsize, + numjobs.value - numresults.value - jobqsize).encode('utf-8') log.info("infoline {}".format(info_line.value)) - log.info("failq size {}".format(self.fail_q.qsize())) - # allows for update of the info line try: arg, result = self.result_q.get(timeout=self.msg_interval) except queue.Empty: continue - bf_arg = bf.dump(arg) - if bf_arg not in self.args_dict: - log.warning( - "got an argument that is not listed in the args_dict (probably crunshed twice, uups) -> will be skipped") - del arg - del result - continue - - del self.args_dict[bf_arg] - self.numresults += 1 + self.job_q.mark(arg) self.process_new_result(arg, result) if not self.keep_new_result_in_memory: del arg diff --git a/jobmanager/servers.py b/jobmanager/servers.py index ccbf8a9..b57e070 100644 --- a/jobmanager/servers.py +++ b/jobmanager/servers.py @@ -16,9 +16,12 @@ class PersistentData_Server(JobManager_Server): msg_interval=1, fname_dump=None, speed_calc_cycles=50, - overwrite=False): + overwrite=False, + **kwargs): - JobManager_Server.__init__(self, authkey, const_arg=const_arg, port=port, verbose=verbose, msg_interval=msg_interval, fname_dump=fname_dump, speed_calc_cycles=speed_calc_cycles) + JobManager_Server.__init__(self, authkey, const_arg=const_arg, port=port, verbose=verbose, + msg_interval=msg_interval, fname_dump=fname_dump, + speed_calc_cycles=speed_calc_cycles, **kwargs) self.pds = persistent_data_structure self.overwrite = overwrite if self.overwrite: diff --git a/tests/test_clients.py b/tests/test_clients.py index 4e5301b..7d06b43 100644 --- a/tests/test_clients.py +++ b/tests/test_clients.py @@ -5,24 +5,20 @@ from os.path import abspath, dirname, split # Add parent directory to beginning of path variable sys.path = [split(dirname(abspath(__file__)))[0]] + sys.path - import numpy as np from scipy.integrate import ode from scipy.special import mathieu_sem, mathieu_cem, mathieu_a, mathieu_b 
import time -import warnings -#from mpl_toolkits.mplot3d import Axes3D +import warnings try: from matplotlib import cm import matplotlib.pyplot as plt except ImportError: warnings.warn("Plotting options not available."+\ " Reason: {}.".format(sys.exc_info()[1])) - - -warnings.filterwarnings('error') -#warnings.filterwarnings('always', category=DeprecationWarning) +warnings.filterwarnings('ignore', module='traitlets', append=False, category=DeprecationWarning) +warnings.filterwarnings('error', append=True) import jobmanager as jm @@ -121,10 +117,10 @@ def test_distributed_mathieu(): const_arg['verbose'] = 0 authkey = 'integration_jm' - + PORT = np.random.randint(10000, 60000) with jm.JobManager_Local(client_class = jm.clients.Integration_Client_REAL, authkey = authkey, - port = 42520, + port = PORT, const_arg = const_arg, nproc=1, niceness_clients=0, diff --git a/tests/test_jobmanager.py b/tests/test_jobmanager.py index f5b1a71..67cbe3e 100644 --- a/tests/test_jobmanager.py +++ b/tests/test_jobmanager.py @@ -6,6 +6,7 @@ import os import sys import time import multiprocessing as mp +from multiprocessing.managers import BaseManager import socket import signal import logging @@ -18,8 +19,6 @@ from os.path import abspath, dirname, split # Add parent directory to beginning of path variable sys.path = [split(dirname(abspath(__file__)))[0]] + sys.path - -import jobmanager import binfootprint import progression as progress @@ -28,20 +27,30 @@ if sys.version_info[0] == 2: elif sys.version_info[0] == 3: TIMEOUT = 15 -progress.log.setLevel(logging.ERROR) - -from jobmanager.jobmanager import log as jm_log - -jm_log.setLevel(logging.WARNING) - import warnings -warnings.filterwarnings('error') -#warnings.filterwarnings('always', category=ImportWarning) +warnings.filterwarnings('ignore', module='traitlets', append=False, category=DeprecationWarning) +warnings.filterwarnings('error', append=True) + +import jobmanager + +#logging.getLogger('jobmanager').setLevel(logging.INFO) 
+logging.getLogger('progression').setLevel(logging.ERROR) +logging.basicConfig(level = logging.INFO) + + AUTHKEY = 'testing' PORT = random.randint(10000, 60000) SERVER = socket.gethostname() +def test_WarningError(): + try: + warnings.warn("test warning, should be an error") + except: + pass + else: + assert False + def test_Signal_to_SIG_IGN(): from jobmanager.jobmanager import Signal_to_SIG_IGN global PORT @@ -187,6 +196,8 @@ def start_client(hide_progress=True): jm_client.start() def test_start_server_with_no_args(): + global PORT + PORT += 1 n = 10 args = range(1,n) @@ -197,6 +208,8 @@ def test_start_server_with_no_args(): jm_server.start() def test_start_server(): + global PORT + PORT += 1 n = 10 args = range(1,n) @@ -216,6 +229,8 @@ def test_start_server(): jm_server.start() def test_jobmanager_static_client_call(): + global PORT + PORT += 1 jm_client = jobmanager.JobManager_Client(server = SERVER, authkey = AUTHKEY, port = PORT, @@ -292,10 +307,10 @@ def test_jobmanager_basic(): assert not p_client.is_alive(), "the client did not terminate on time!" # client must not throw an exception assert p_client.exitcode == 0, "the client raised an exception" - p_server.join(3) + p_server.join(5) # server should have come down assert not p_server.is_alive(), "the server did not terminate on time!" 
- + assert p_server.exitcode == 0, "the server raised an exception" print("[+] client and server terminated") fname = 'jobmanager.dump' @@ -339,20 +354,17 @@ def test_jobmanager_server_signals(): print("[+] still alive (assume shut down takes some time)") p_server.join(timeout) assert not p_server.is_alive(), "timeout for server shutdown reached" + assert p_server.exitcode == 0, "the server raised an exception" print("[+] now terminated (timeout of {}s not reached)".format(timeout)) fname = 'jobmanager.dump' with open(fname, 'rb') as f: data = jobmanager.JobManager_Server.static_load(f) - - args_set = set(data['args_dict'].keys()) - args_ref = range(1,n) - ref_set = set() - for a in args_ref: - ref_set.add(binfootprint.dump(a)) - - assert len(args_set) == len(ref_set) - assert len(ref_set - args_set) == 0 + + ac = data['job_q'] + assert ac.qsize() == n-1 + assert ac.marked_items() == 0 + print("[+] args_set from dump contains all arguments") except: if p_server is not None: @@ -385,7 +397,7 @@ def test_shutdown_server_while_client_running(): p_client = None try: - p_server = mp.Process(target=start_server, args=(n,False,1)) + p_server = mp.Process(target=start_server, args=(n,False,0.1)) p_server.start() time.sleep(0.5) assert p_server.is_alive() @@ -400,8 +412,10 @@ def test_shutdown_server_while_client_running(): p_server.join(TIMEOUT) assert not p_server.is_alive(), "server did not shut down on time" + assert p_server.exitcode == 0, "the server raised an exception" p_client.join(TIMEOUT) assert not p_client.is_alive(), "client did not shut down on time" + assert p_client.exitcode == 0, "the client raised an exception" print("[+] server and client joined {}".format(datetime.datetime.now().isoformat())) @@ -410,7 +424,8 @@ def test_shutdown_server_while_client_running(): with open(fname, 'rb') as f: data = jobmanager.JobManager_Server.static_load(f) - args_set = set(data['args_dict'].keys()) + ac = data['job_q'] + args_set = 
{binfootprint.dump(ac.data['_'+str(id_)]) for id_ in ac._not_gotten_ids} final_result = data['final_result'] final_res_args = {binfootprint.dump(a[0]) for a in final_result} @@ -499,7 +514,8 @@ def shutdown_client(sig): with open(fname, 'rb') as f: data = jobmanager.JobManager_Server.static_load(f) - assert len(data['args_dict']) == 0 + ac = data['job_q'] + assert ac.qsize() == 0 print("[+] args_set is empty -> all args processed & none failed") final_res_args_set = {a[0] for a in data['final_result']} @@ -517,70 +533,6 @@ def shutdown_client(sig): p_client.terminate() raise -def test_check_fail(): - global PORT - PORT += 1 - class Client_Random_Error(jobmanager.JobManager_Client): - def func(self, args, const_args, c, m): - c.value = 0 - m.value = -1 - fail_on = [3,5,13] - time.sleep(0.1) - if args in fail_on: - raise RuntimeError("fail_on Error") - return os.getpid() - - - n = 20 - p_server = mp.Process(target=start_server, args=(n,)) - p_server.start() - - time.sleep(1) - - print("START CLIENT") - jm_client = Client_Random_Error(server=SERVER, - authkey=AUTHKEY, - port=PORT, - nproc=0) - - p_client = mp.Process(target=jm_client.start) - p_client.start() - - try: - assert p_server.is_alive() - assert p_client.is_alive() - except: - p_client.terminate() - p_server.terminate() - raise - - print("[+] server and client running") - - p_server.join(60) - p_client.join(60) - - assert not p_server.is_alive() - assert not p_client.is_alive() - - print("[+] server and client stopped") - - fname = 'jobmanager.dump' - with open(fname, 'rb') as f: - data = jobmanager.JobManager_Server.static_load(f) - - - set_ref = {binfootprint.dump(a) for a in range(1,n)} - - args_set = set(data['args_dict'].keys()) - assert args_set == data['fail_set'] - - final_result_args_set = {binfootprint.dump(a[0]) for a in data['final_result']} - - all_set = final_result_args_set | data['fail_set'] - - assert len(set_ref - all_set) == 0, "final result union with reported failure do not correspond to 
all args!" - print("[+] all argumsents found in final_results | reported failure") - def test_jobmanager_read_old_stat(): """ start server, start client, start process trivial jobs, @@ -609,6 +561,8 @@ def test_jobmanager_read_old_stat(): assert not p_client.is_alive(), "the client did not terminate on time!" assert not p_server.is_alive(), "the server did not terminate on time!" + assert p_client.exitcode == 0 + assert p_server.exitcode == 0 print("[+] client and server terminated") time.sleep(1) @@ -627,6 +581,8 @@ def test_jobmanager_read_old_stat(): assert not p_client.is_alive(), "the client did not terminate on time!" assert not p_server.is_alive(), "the server did not terminate on time!" + assert p_client.exitcode == 0 + assert p_server.exitcode == 0 print("[+] client and server terminated") fname = 'jobmanager.dump' @@ -717,23 +673,19 @@ def test_start_server_on_used_port(): p1.start() time.sleep(1) - - other_error = False - + try: start_server2() except (RuntimeError, OSError) as e: print("caught Exception '{}' {}".format(type(e).__name__, e)) except: - other_error = True - - time.sleep(1) - p1.terminate() - time.sleep(1) - p1.join() - - assert not other_error - + raise + finally: + time.sleep(1) + p1.terminate() + time.sleep(1) + p1.join() + def test_shared_const_arg(): global PORT PORT += 1 @@ -820,6 +772,190 @@ def test_hum_size(): assert humanize_size(1024**2) == '1.00GB' assert humanize_size(1024**3) == '1.00TB' assert humanize_size(1024**4) == '1024.00TB' + +def test_ArgsContainer(): + from jobmanager.jobmanager import ArgsContainer + from shutil import rmtree + path = 'argscont' + try: + rmtree(path) + except FileNotFoundError: + pass + + for p in [None, path]: + + ac = ArgsContainer(p) + for arg in 'abcde': + ac.put(arg) + + assert ac.qsize() == 5 + + item1 = ac.get() + item2 = ac.get() + + assert ac.qsize() == 3 + assert ac.marked_items() == 0 + assert ac.unmarked_items() == 5 + + # reinserting a non marked item is allowed + ac.put(item1) + assert 
def put_from_subprocess(port):
    """Child-process worker: connect to the ArgsContainer proxy served on
    localhost:*port* and feed it the args 1..199.

    ``ArgsContainer.put`` raises ValueError for an item that was already
    marked on the server side; since the parent process feeds an overlapping
    range concurrently, that is an expected race outcome and simply ends the
    feed early.  Connection/registration errors, on the other hand, must NOT
    be swallowed here: they propagate and make this process exit nonzero,
    which the parent asserts against (previously the broad try/except hid
    them and the exit-code assertion was vacuous).
    """
    class MM_remote(BaseManager):
        pass

    MM_remote.register('get_job_q')
    m = MM_remote(('localhost', port), b'test_argscomnt')
    m.connect()
    ac = m.get_job_q()
    try:
        for item in range(1, 200):
            ac.put(item)
            time.sleep(0.01)
    except ValueError:
        # item already marked on the server -- expected race, stop feeding
        pass


def test_ArgsContainer_BaseManager():
    """Serve an ArgsContainer through a BaseManager and fill it concurrently
    from this process (args 200..1) and a subprocess (args 1..199).

    Duplicate arguments are merged by the container, so exactly 200 distinct
    args must end up queued.  The test runs twice: once with a shelve-backed
    container (``path``) and once fully in memory (``None``).
    """
    from jobmanager.jobmanager import ArgsContainer
    global PORT

    path = 'argscont'
    from shutil import rmtree
    try:
        rmtree(path)
    except FileNotFoundError:
        pass

    for p in [path, None]:
        PORT += 1
        ac_inst = ArgsContainer(p)
        #ac_inst.close_shelve()

        class MM(BaseManager):
            pass
        MM.register('get_job_q', callable=lambda: ac_inst, exposed=['put', 'get'])

        # Bind the port NOW via a default argument: reading the global PORT
        # inside the thread body would be a late-binding closure that races
        # with the next loop iteration's `PORT += 1`.
        def start_manager_thread(port=PORT):
            m = MM(('', port), b'test_argscomnt')
            m.get_server().serve_forever()
        server_thr = threading.Thread(target=start_manager_thread, daemon=True)

        class MM_remote(BaseManager):
            pass
        MM_remote.register('get_job_q')
        m = MM_remote(('localhost', PORT), b'test_argscomnt')

        server_thr.start()
        m.connect()

        pr = mp.Process(target=put_from_subprocess, args=(PORT,))
        pr.start()

        try:
            # feed the same value range from this end, in reverse order
            for arg in range(200, 0, -1):
                ac_inst.put(arg)
                time.sleep(0.01)
        except ValueError:
            # arg already marked -- expected when racing the subprocess
            pass

        pr.join()
        assert pr.exitcode == 0
        print(ac_inst.qsize())
        assert ac_inst.qsize() == 200
+# test_jobmanager_basic, + test_jobmanager_server_signals, +# test_shutdown_server_while_client_running, +# test_shutdown_client, +# test_jobmanager_read_old_stat, +# test_client_status, +# test_jobmanager_local, +# test_start_server_on_used_port, +# test_shared_const_arg, +# test_digest_rejected, +# test_hum_size, + lambda : print("END") ] for f in func: print() diff --git a/tests/test_servers.py b/tests/test_servers.py deleted file mode 100644 index 03d27bc..0000000 --- a/tests/test_servers.py +++ /dev/null @@ -1,13 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- -import sys -from os.path import abspath, dirname, split - -# Add parent directory to beginning of path variable -sys.path = [split(dirname(abspath(__file__)))[0]] + sys.path - -import jobmanager as jm - - -if __name__ == "__main__": - pass