new ArgsContainer passes tests in Py34 and Py35

This commit is contained in:
Richard Hartmann 2016-11-21 17:08:23 +01:00
parent ff4002d779
commit b16fbe9dec
5 changed files with 562 additions and 368 deletions

View file

@ -31,6 +31,7 @@ The class JobManager_Client
from __future__ import division, print_function
import copy
from datetime import datetime
import inspect
import multiprocessing as mp
from multiprocessing.managers import BaseManager, RemoteError
@ -46,31 +47,14 @@ import traceback
import warnings
import binfootprint as bf
import progression as progress
#import queuelib
import shelve
import hashlib
import logging
import threading
import ctypes
# taken from here: https://mail.python.org/pipermail/python-list/2010-November/591474.html
class MultiLineFormatter(logging.Formatter):
    """Formatter that aligns continuation lines of a multi-line message.

    The part of the formatted record that precedes the message (timestamp,
    logger name, level, ...) is measured, and every newline in the output is
    padded with that many spaces so wrapped lines start under the message.
    """

    def format(self, record):
        formatted = logging.Formatter.format(self, record)
        # everything before the message text is the fixed header
        header = formatted.split(record.message)[0]
        padded_newline = '\n' + ' ' * len(header)
        return formatted.replace('\n', padded_newline)
from shutil import rmtree
log = logging.getLogger(__name__)
log.setLevel(logging.INFO)
console_hand = logging.StreamHandler(stream = sys.stderr)
console_hand.setLevel(logging.DEBUG)
fmt = MultiLineFormatter('%(asctime)s %(name)s %(levelname)s : %(message)s')
console_hand.setFormatter(fmt)
log.addHandler(console_hand)
from datetime import datetime
# This is a list of all python objects that will be imported upon
# initialization during module import (see __init__.py)
@ -120,7 +104,7 @@ else:
class JMHostNotReachableError(JMConnectionError):
    """Specialization of JMConnectionError; raised by the connection logic
    elsewhere in this module when the server host can not be contacted."""
    pass
myQueue = mp.Queue
myQueue = queue.Queue
AuthenticationError = mp.AuthenticationError
def humanize_size(size_in_bytes):
@ -301,7 +285,6 @@ class JobManager_Client(object):
self.procs = []
self.manager_objects = None # will be set via connect()
self.connect() # get shared objects from server
def connect(self):
if self.manager_objects is None:
@ -467,6 +450,10 @@ class JobManager_Client(object):
except queue.Empty:
log.info("finds empty job queue, processed %s jobs", cnt)
break
except ContainerClosedError:
log.info("job queue was closed, processed %s jobs", cnt)
break
# handle SystemExit in outer try ... except
except SystemExit as e:
arg = None
@ -594,6 +581,7 @@ class JobManager_Client(object):
returns when all subprocesses have terminated
"""
self.connect() # get shared objects from server
if not self.connected:
raise JMConnectionError("Can not start Client with no connection to server (shared objetcs are not available)")
@ -826,36 +814,161 @@ def get_shared_status(ss):
else:
return ss.value
class PersistentQueue(object):
def __init__(self, fname):
self._path = fname
self.q = shelve.open(self._path)
self.head = 0
self.tail = 0
def close(self):
self.q.close()
os.remove(self._path)
def qsize(self):
# log.info("qsize: this is thread thr_job_q_put with tid %s", ctypes.CDLL('libc.so.6').syscall(186))
# log.info("qsize {} ({},{})".format(self.tail - self.head, self.tail, self.head))
return self.tail - self.head
class ContainerClosedError(Exception):
    """Raised when putting items into a container/queue that was closed."""
    pass


class ClosableQueue(object):
    """Thin wrapper around ``myQueue()`` that can be closed.

    After :meth:`close` has been called, any further :meth:`put` raises
    ``ContainerClosedError``; items already queued can still be retrieved
    with :meth:`get`.
    """

    def __init__(self):
        self.q = myQueue()          # underlying queue instance
        self._closed = False        # True once close() was called

    def put(self, item):
        """Enqueue `item`; raises ContainerClosedError if the queue is closed."""
        if self._closed:
            raise ContainerClosedError
        self.q.put(item)

    def get(self, timeout=None):
        """Dequeue one item, blocking up to `timeout` seconds (None = forever)."""
        return self.q.get(timeout=timeout)

    def qsize(self):
        """Return the approximate number of queued items."""
        return self.q.qsize()

    def close(self):
        """Mark the queue as closed so subsequent put() calls raise.

        BUG FIX: the original assigned ``self._close`` (a new, unused
        attribute) instead of ``self._closed``, so closing had no effect.
        Also dropped two stray lines left over from the removed
        PersistentQueue.put implementation.
        """
        self._closed = True
class ArgsContainer(object):
    """Set-like container for job arguments with get/mark bookkeeping.

    Each argument is stored under an integer id.  Its binary footprint
    (``bf.dump``) is hashed with sha256 so duplicates can be detected.
    The container tracks three states per item:

      * not gotten yet  (id in ``_not_gotten_ids``)
      * gotten          (id removed from ``_not_gotten_ids``)
      * marked as done  (id in ``_marked_ids``)

    Storage is either an in-memory dict (``path is None``) or a shelve
    located in the directory ``path``.  Pickling (``__getstate__`` /
    ``__setstate__``) preserves marked ids; items that were gotten but not
    marked are treated as not gotten again after unpickling.
    """

    def __init__(self, path=None):
        self._path = path
        self._lock = threading.Lock()
        if self._path is None:
            self.data = {}
        else:
            self._open_shelve()
        self._closed = False
        self._not_gotten_ids = set()
        self._marked_ids = set()
        self._max_id = 0

    def _open_shelve(self, new_shelve=True):
        """Open (or create) the backing shelve in the directory ``self._path``."""
        if os.path.exists(self._path):
            if os.path.isfile(self._path):
                raise RuntimeWarning("can not create shelve, path '{}' is an existing file".format(self._path))
        else:
            os.makedirs(self._path)
        fname = os.path.join(self._path, 'args')
        if os.path.exists(fname) and new_shelve:
            raise RuntimeError("a shelve with name {} already exists".format(fname))
        self.data = shelve.open(fname)

    def __getstate__(self):
        # the lock itself can not be pickled; for the shelve variant only the
        # path is stored, the data stays on disk
        with self._lock:
            if self._path is None:
                return (self.data, self._not_gotten_ids, self._marked_ids, self._max_id)
            else:
                return (self._path, self._not_gotten_ids, self._marked_ids, self._max_id)

    def __setstate__(self, state):
        tmp, tmp_not_gotten_ids, self._marked_ids, self._max_id = state
        # the not-gotten ids become all ids except the marked ones:
        # items that were gotten but not marked before pickling are
        # considered not gotten again
        self._not_gotten_ids = set(range(self._max_id)) - self._marked_ids
        if isinstance(tmp, dict):
            self.data = tmp
            self._path = None
        else:
            self._path = tmp
            self._open_shelve(new_shelve=False)
        self._closed = False
        self._lock = threading.Lock()

    def close(self):
        """Disallow further put() / get() calls (they raise ContainerClosedError)."""
        self._closed = True

    def close_shelve(self):
        """Close the backing shelve (no-op for the in-memory variant)."""
        if self._path is not None:
            self.data.close()

    def clear(self):
        """Remove all stored items and reset the bookkeeping sets."""
        with self._lock:
            if self._path is not None:
                self.data.close()
                rmtree(self._path)
            else:
                self.data.clear()
            self._closed = False
            self._not_gotten_ids = set()
            self._marked_ids = set()

    def qsize(self):
        """Number of items that have not been gotten yet."""
        return len(self._not_gotten_ids)

    def put_items(self):
        """Total number of distinct items ever put (each item has two keys)."""
        s = len(self.data)//2
        return s

    def marked_items(self):
        return len(self._marked_ids)

    def gotten_items(self):
        return self.put_items() - self.qsize()

    def unmarked_items(self):
        return self.put_items() - self.marked_items()

    def put(self, item):
        """Add `item`.

        Re-adding is only allowed for items that were gotten but not yet
        marked; adding an item that is still pending or already marked
        raises ValueError.
        """
        with self._lock:
            if self._closed:
                raise ContainerClosedError
            item_hash = hashlib.sha256(bf.dump(item)).hexdigest()
            if item_hash in self.data:
                item_id = self.data[item_hash]
                if (item_id in self._not_gotten_ids) or (item_id in self._marked_ids):
                    # the item has either not been 'gotten' yet or is 'marked';
                    # in both cases a reinsert is not allowed
                    msg = ("do not add the same argument twice! If you are sure, they are not the same, "+
                           "there might be an error with the binfootprint mehtods or a hash collision!")
                    log.critical(msg)
                    raise ValueError(msg)
                else:
                    # the item is already known, but has been 'gotten' and not
                    # marked yet, therefore a reinsert is allowed
                    self._not_gotten_ids.add(item_id)
            else:
                str_id = '_'+str(self._max_id)
                self.data[str_id] = item
                self.data[item_hash] = self._max_id
                self._not_gotten_ids.add(self._max_id)
                self._max_id += 1

    def get(self):
        """Return an arbitrary not-yet-gotten item; raises queue.Empty when none left.

        BUG FIX: removed four stray lines left over from the deleted
        PersistentQueue.get (head-index lookup) that preceded the lock.
        """
        with self._lock:
            if self._closed:
                raise ContainerClosedError
            try:
                get_idx = self._not_gotten_ids.pop()
            except KeyError:
                raise queue.Empty
            str_id = '_' + str(get_idx)
            item = self.data[str_id]
            return item

    def mark(self, item):
        """Mark a previously gotten `item` as processed.

        Raises ValueError if the item was not gotten yet and RuntimeWarning
        if it is already marked.

        BUG FIX: removed three stray trailing lines left over from the
        deleted PersistentQueue.get (head increment and return).
        """
        with self._lock:
            item_hash = hashlib.sha256(bf.dump(item)).hexdigest()
            item_id = self.data[item_hash]
            if item_id in self._not_gotten_ids:
                raise ValueError("item not gotten yet, can not be marked")
            if item_id in self._marked_ids:
                raise RuntimeWarning("item allready marked")
            self._marked_ids.add(item_id)
RAND_STR_ASCII_IDX_LIST = list(range(48,58)) + list(range(65,91)) + list(range(97,123))
@ -865,10 +978,10 @@ def rand_str(l = 8):
s += chr(random.choice(RAND_STR_ASCII_IDX_LIST))
return s
def _new_rand_file_name(path='.', end='', l=8):
def _new_rand_file_name(path='.', pre='', end='', l=8):
c = 0
while True:
fname = rand_str(l) + end
fname = pre + rand_str(l) + end
full_name = os.path.join(path, fname)
if not os.path.exists(full_name):
return fname
@ -877,7 +990,11 @@ def _new_rand_file_name(path='.', end='', l=8):
if c > 10:
l += 2
c = 0
print("INFO: increase random file name length to", l)
print("INFO: increase random file name length to", l)
class JobManager_Manager(BaseManager):
    # Concrete BaseManager subclass; the server registers get_job_q /
    # get_result_q / get_fail_q / get_const_arg on it to expose the queues
    # over the network.
    pass
class JobManager_Server(object):
"""general usage:
@ -1001,97 +1118,65 @@ class JobManager_Server(object):
# from the set if it was caught by the result_q
# so iff all results have been processed successfully,
# the args_dict will be empty
self.args_dict = dict() # has the bin footprint in it
self.args_list = [] # has the actual args object
#self.args_dict = dict() # has the bin footprint in it
#self.args_list = [] # has the actual args object
# thread safe integer values
self._numresults = mp.Value('i', 0) # count the successfully processed jobs
self._numjobs = mp.Value('i', 0) # overall number of jobs
# final result as list, other types can be achieved by subclassing
self.final_result = []
# NOTE: it only works using multiprocessing.Queue()
# the Queue class from the module queue does NOT work
self.job_q_on_disk = job_q_on_disk
if job_q_on_disk:
fname = _new_rand_file_name(job_q_on_disk_path, '.jobqdb')
self.job_q = PersistentQueue(fname)
else:
self.job_q = myQueue() # queue holding args to process
self.result_q = myQueue() # queue holding returned results
self.fail_q = myQueue() # queue holding args where processing failed
self.manager = None
self.manager_server = None
self.hostname = socket.gethostname()
self.job_q_on_disk = job_q_on_disk
self.job_q_on_disk_path = job_q_on_disk_path
if self.job_q_on_disk:
fname = _new_rand_file_name(path = self.job_q_on_disk_path, pre='.',end='_jobqdb')
else:
fname = None
def __stop_SyncManager(self):
if self.manager == None:
return
manager_proc = self.manager._process
# stop SyncManager
self.manager.shutdown()
progress.check_process_termination(proc = manager_proc,
prefix = 'SyncManager: ',
timeout = 2,
auto_kill_on_last_resort = True)
def __check_bind(self):
self.job_q = ArgsContainer(fname)
self.result_q = ClosableQueue()
self.fail_q = ClosableQueue()
@staticmethod
def _check_bind(host, port):
try:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind((self.hostname, self.port))
s.bind((host, port))
except:
log.critical("test bind to %s:%s failed", self.hostname, self.port)
log.critical("test bind to %s:%s failed", host, port)
raise
finally:
s.close()
def __start_SyncManager(self):
self.__check_bind()
class JobQueueManager(BaseManager):
pass
def _start_manager_thread(self):
self._check_bind(self.hostname, self.port)
# make job_q, result_q, fail_q, const_arg available via network
JobQueueManager.register('get_job_q', callable=lambda: self.job_q)
JobQueueManager.register('get_result_q', callable=lambda: self.result_q)
JobQueueManager.register('get_fail_q', callable=lambda: self.fail_q)
JobQueueManager.register('get_const_arg', callable=lambda: self.const_arg)
JobManager_Manager.register('get_job_q', callable=lambda: self.job_q, exposed=['get', 'put', 'qsize'])
JobManager_Manager.register('get_result_q', callable=lambda: self.result_q, exposed=['get', 'put', 'qsize'])
JobManager_Manager.register('get_fail_q', callable=lambda: self.fail_q, exposed=['get', 'put', 'qsize'])
JobManager_Manager.register('get_const_arg', callable=lambda: self.const_arg)
address=('', self.port) #ip='' means local
authkey=self.authkey
self.manager = JobQueueManager(address, authkey)
# start manager with non default signal handling given by
# the additional init function setup_SIG_handler_manager
try:
self.manager.start(setup_SIG_handler_manager)
except EOFError as e:
log.error("can not start SyncManager on %s:%s\n"+
"this is usually the case when the port used is not available!", self.hostname, self.port)
manager_proc = self.manager._process
manager_identifier = progress.get_identifier(name='SyncManager')
progress.check_process_termination(proc = manager_proc,
prefix = manager_identifier,
timeout = 0.3,
auto_kill_on_last_resort = True)
self.manager = None
return False
self.manager_server = JobManager_Manager(address, authkey).get_server()
log.info("SyncManager with PID %s started on %s:%s (%s)", self.manager._process.pid, self.hostname, self.port, authkey)
return True
server_thr = threading.Thread(target=self.manager_server.serve_forever)
server_thr.daemon = True
server_thr.start()
m_test = BaseManager(('localhost', self.port), authkey)
try:
m_test.connect()
except:
raise ConnectionError("test conntect to JobManager_Manager failed")
log.info("JobManager_Manager thread started on %s:%s (%s)", self.hostname, self.port, authkey)
def __restart_SyncManager(self):
self.__stop_SyncManager()
if not self.__start_SyncManager():
log.critical("faild to restart SyncManager")
raise RuntimeError("could not start server")
def __enter__(self):
return self
@ -1112,19 +1197,19 @@ class JobManager_Server(object):
# causes exception traceback to be printed
return False
@property
def numjobs(self):
return self._numjobs.value
@numjobs.setter
def numjobs(self, numjobs):
self._numjobs.value = numjobs
@property
def numresults(self):
return self._numresults.value
@numresults.setter
def numresults(self, numresults):
self._numresults.value = numresults
# @property
# def numjobs(self):
# return self._numjobs.value
# @numjobs.setter
# def numjobs(self, numjobs):
# self._numjobs.value = numjobs
#
# @property
# def numresults(self):
# return self._numresults.value
# @numresults.setter
# def numresults(self, numresults):
# self._numresults.value = numresults
def shutdown(self):
""""stop all spawned processes and clean up
@ -1133,13 +1218,21 @@ class JobManager_Server(object):
- if job_q is not empty dump remaining job_q
"""
# will only be False when _shutdown was started in subprocess
self.job_q.close()
self.__stop_SyncManager()
log.debug("SyncManager stopped!")
if sys.version_info[0] == 3:
if self.manager_server is not None:
self.manager_server.stop_event.set()
log.debug("set stopEvent for JobManager_Manager server")
self.manager_server = None
# self.job_q.close()
# self.result_q.close()
# self.fail_q.close()
# log.debug("queues closed!")
# do user defined final processing
self.process_final_result()
log.debug("process_final_result done!")
# print(self.fname_dump)
if self.fname_dump is not None:
if self.fname_dump == 'auto':
@ -1147,7 +1240,8 @@ class JobManager_Server(object):
else:
fname = self.fname_dump
log.info("dump current state to '%s'", fname)
log.info("dump current state to '%s'", fname)
with open(fname, 'wb') as f:
self.__dump(f)
@ -1161,15 +1255,14 @@ class JobManager_Server(object):
assert self._pid == os.getpid()
self.show_statistics()
log.info("JobManager_Server was successfully shut down")
def show_statistics(self):
if self.show_stat:
all_jobs = self.numjobs
succeeded = self.numresults
all_jobs = self.job_q.put_items()
succeeded = self.job_q.marked_items()
failed = self.fail_q.qsize()
all_processed = succeeded + failed
id1 = self.__class__.__name__+" "
@ -1188,61 +1281,40 @@ class JobManager_Server(object):
print("{} not processed : {}".format(id2, all_not_processed))
print("{} queried : {}".format(id2, queried_but_not_processed))
print("{} not queried yet : {}".format(id2, not_queried))
print("{}len(args_dict) : {}".format(id2, len(self.args_dict)))
if (all_not_processed + failed) != len(self.args_dict):
log.error("'all_not_processed != len(self.args_dict)' something is inconsistent!")
def all_successfully_processed(self):
return self.numjobs == self.numresults
return self.job_q.qsize() == 0
@staticmethod
def static_load(f):
data = {}
data['numjobs'] = pickle.load(f)
data['numresults'] = pickle.load(f)
data['final_result'] = pickle.load(f)
data['args_dict'] = pickle.load(f)
data['args_list'] = pickle.load(f)
fail_list = pickle.load(f)
data['fail_set'] = {bf.dump(fail_item[0]) for fail_item in fail_list}
data['fail_q'] = myQueue()
data['job_q'] = myQueue()
for fail_item in fail_list:
data['fail_q'].put_nowait(fail_item)
for arg in data['args_dict']:
if arg not in data['fail_set']:
arg_idx = data['args_dict'][arg]
data['job_q'].put_nowait(data['args_list'][arg_idx])
data['job_q'] = pickle.load(f)
data['fail_list'] = pickle.load(f)
return data
def __load(self, f):
data = JobManager_Server.static_load(f)
for key in ['numjobs', 'numresults', 'final_result',
'args_dict', 'args_list', 'fail_q','job_q']:
self.__setattr__(key, data[key])
self.final_result = data['final_result']
self.job_q = data['job_q']
for fail_item in data['fail_list']:
self.fail_q.put(fail_item)
def __dump(self, f):
pickle.dump(self.numjobs, f, protocol=pickle.HIGHEST_PROTOCOL)
pickle.dump(self.numresults, f, protocol=pickle.HIGHEST_PROTOCOL)
pickle.dump(self.final_result, f, protocol=pickle.HIGHEST_PROTOCOL)
pickle.dump(self.args_dict, f, protocol=pickle.HIGHEST_PROTOCOL)
pickle.dump(self.args_list, f, protocol=pickle.HIGHEST_PROTOCOL)
pickle.dump(self.job_q, f, protocol=pickle.HIGHEST_PROTOCOL)
fail_list = []
log.info("__dump: failq size {}".format(self.fail_q.qsize()))
try:
while True:
fail_list.append(self.fail_q.get_nowait())
fail_list.append(self.fail_q.get(timeout = 0))
except queue.Empty:
pass
pickle.dump(fail_list, f, protocol=pickle.HIGHEST_PROTOCOL)
def read_old_state(self, fname_dump=None):
if fname_dump == None:
fname_dump = self.fname_dump
if fname_dump == 'auto':
@ -1263,21 +1335,23 @@ class JobManager_Server(object):
def put_arg(self, a):
"""add argument a to the job_q
"""
bfa = bf.dump(a)
if bfa in self.args_dict:
log.critical("do not add the same argument twice! If you are sure, they are not the same, there might be an error with the binfootprint mehtods!")
raise ValueError("do not add the same argument twice! If you are sure, they are not the same, there might be an error with the binfootprint mehtods!")
#hash_bfa = hashlib.sha256(bf.dump(a)).digest()
# if hash_bfa in self.args_dict:
# msg = ("do not add the same argument twice! If you are sure, they are not the same, "+
# "there might be an error with the binfootprint mehtods or a hash collision!")
# log.critical(msg)
# raise ValueError(msg)
# this dict associates an unique index with each argument 'a'
# or better with its binary footprint
self.args_dict[bfa] = len(self.args_list)
#self.args_dict[hash_bfa] = len(self.args_list)
#self.args_list.append(a)
self.args_list.append(a)
# the actual shared queue
self.job_q.put(copy.copy(a))
self.job_q.put(a)
with self._numjobs.get_lock():
self._numjobs.value += 1
#with self._numjobs.get_lock():
# self._numjobs.value += 1
def args_from_list(self, args):
"""serialize a list of arguments to the job_q
@ -1302,35 +1376,35 @@ class JobManager_Server(object):
print("{} awaits client results".format(self.__class__.__name__))
def bring_him_up(self):
if not self.__start_SyncManager():
log.critical("could not start server")
raise RuntimeError("could not start server")
self._start_manager_thread()
if self._pid != os.getpid():
log.critical("do not run JobManager_Server.start() in a subprocess")
raise RuntimeError("do not run JobManager_Server.start() in a subprocess")
if (self.numjobs - self.numresults) != len(self.args_dict):
log.debug("numjobs: %s\n" +
"numresults: %s\n" +
"len(self.args_dict): %s", self.numjobs, self.numresults, len(self.args_dict))
# if (self.numjobs - self.numresults) != len(self.args_dict):
# log.debug("numjobs: %s\n" +
# "numresults: %s\n" +
# "len(self.args_dict): %s", self.numjobs, self.numresults, len(self.args_dict))
#
# log.critical(
# "inconsistency detected! (self.numjobs - self.numresults) != len(self.args_dict)! use JobManager_Server.put_arg to put arguments to the job_q")
# raise RuntimeError(
# "inconsistency detected! (self.numjobs - self.numresults) != len(self.args_dict)! use JobManager_Server.put_arg to put arguments to the job_q")
log.critical(
"inconsistency detected! (self.numjobs - self.numresults) != len(self.args_dict)! use JobManager_Server.put_arg to put arguments to the job_q")
raise RuntimeError(
"inconsistency detected! (self.numjobs - self.numresults) != len(self.args_dict)! use JobManager_Server.put_arg to put arguments to the job_q")
if self.numjobs == 0:
jobqsize = self.job_q.qsize()
if jobqsize == 0:
log.info("no jobs to process! use JobManager_Server.put_arg to put arguments to the job_q")
return
else:
log.info("started (host:%s authkey:%s port:%s jobs:%s)", self.hostname, self.authkey.decode(), self.port,
self.numjobs)
jobqsize)
print("{} started (host:{} authkey:{} port:{} jobs:{})".format(self.__class__.__name__,
self.hostname,
self.authkey.decode(),
self.port,
self.numjobs))
jobqsize))
Signal_to_sys_exit(signals=[signal.SIGTERM, signal.SIGINT])
@ -1345,40 +1419,39 @@ class JobManager_Server(object):
"""
info_line = progress.StringValue(num_of_bytes=100)
with progress.ProgressBarFancy(count=self._numresults,
max_count=self._numjobs,
interval=self.msg_interval,
speed_calc_cycles=self.speed_calc_cycles,
sigint='ign',
sigterm='ign',
numresults = progress.UnsignedIntValue(0)
numjobs = progress.UnsignedIntValue(self.job_q.put_items())
with progress.ProgressBarFancy(count = numresults,
max_count = numjobs,
interval = self.msg_interval,
speed_calc_cycles = self.speed_calc_cycles,
sigint = 'ign',
sigterm = 'ign',
info_line=info_line) as stat:
if not self.hide_progress:
stat.start()
while (len(self.args_dict) - self.fail_q.qsize()) > 0:
info_line.value = "result_q size:{}, job_q size:{}, recieved results:{}".format(self.result_q.qsize(),
self.job_q.qsize(),
self.numresults).encode(
'utf-8')
while numresults.value < numjobs.value:
failqsize = self.fail_q.qsize()
jobqsize = self.job_q.qsize()
markeditems = self.job_q.marked_items()
numresults.value = failqsize + markeditems
info_line.value = ("result_q size:{}, jobs: remaining:{}, "+
"done:{}, failed:{}, in progress:{}").format(self.result_q.qsize(),
jobqsize,
markeditems,
failqsize,
numjobs.value - numresults.value - jobqsize).encode('utf-8')
log.info("infoline {}".format(info_line.value))
log.info("failq size {}".format(self.fail_q.qsize()))
# allows for update of the info line
try:
arg, result = self.result_q.get(timeout=self.msg_interval)
except queue.Empty:
continue
bf_arg = bf.dump(arg)
if bf_arg not in self.args_dict:
log.warning(
"got an argument that is not listed in the args_dict (probably crunshed twice, uups) -> will be skipped")
del arg
del result
continue
del self.args_dict[bf_arg]
self.numresults += 1
self.job_q.mark(arg)
self.process_new_result(arg, result)
if not self.keep_new_result_in_memory:
del arg

View file

@ -16,9 +16,12 @@ class PersistentData_Server(JobManager_Server):
msg_interval=1,
fname_dump=None,
speed_calc_cycles=50,
overwrite=False):
overwrite=False,
**kwargs):
JobManager_Server.__init__(self, authkey, const_arg=const_arg, port=port, verbose=verbose, msg_interval=msg_interval, fname_dump=fname_dump, speed_calc_cycles=speed_calc_cycles)
JobManager_Server.__init__(self, authkey, const_arg=const_arg, port=port, verbose=verbose,
msg_interval=msg_interval, fname_dump=fname_dump,
speed_calc_cycles=speed_calc_cycles, **kwargs)
self.pds = persistent_data_structure
self.overwrite = overwrite
if self.overwrite:

View file

@ -5,24 +5,20 @@ from os.path import abspath, dirname, split
# Add parent directory to beginning of path variable
sys.path = [split(dirname(abspath(__file__)))[0]] + sys.path
import numpy as np
from scipy.integrate import ode
from scipy.special import mathieu_sem, mathieu_cem, mathieu_a, mathieu_b
import time
import warnings
#from mpl_toolkits.mplot3d import Axes3D
import warnings
try:
from matplotlib import cm
import matplotlib.pyplot as plt
except ImportError:
warnings.warn("Plotting options not available."+\
" Reason: {}.".format(sys.exc_info()[1]))
warnings.filterwarnings('error')
#warnings.filterwarnings('always', category=DeprecationWarning)
warnings.filterwarnings('ignore', module='traitlets', append=False, category=DeprecationWarning)
warnings.filterwarnings('error', append=True)
import jobmanager as jm
@ -121,10 +117,10 @@ def test_distributed_mathieu():
const_arg['verbose'] = 0
authkey = 'integration_jm'
PORT = np.random.randint(10000, 60000)
with jm.JobManager_Local(client_class = jm.clients.Integration_Client_REAL,
authkey = authkey,
port = 42520,
port = PORT,
const_arg = const_arg,
nproc=1,
niceness_clients=0,

View file

@ -6,6 +6,7 @@ import os
import sys
import time
import multiprocessing as mp
from multiprocessing.managers import BaseManager
import socket
import signal
import logging
@ -18,8 +19,6 @@ from os.path import abspath, dirname, split
# Add parent directory to beginning of path variable
sys.path = [split(dirname(abspath(__file__)))[0]] + sys.path
import jobmanager
import binfootprint
import progression as progress
@ -28,20 +27,30 @@ if sys.version_info[0] == 2:
elif sys.version_info[0] == 3:
TIMEOUT = 15
progress.log.setLevel(logging.ERROR)
from jobmanager.jobmanager import log as jm_log
jm_log.setLevel(logging.WARNING)
import warnings
warnings.filterwarnings('error')
#warnings.filterwarnings('always', category=ImportWarning)
warnings.filterwarnings('ignore', module='traitlets', append=False, category=DeprecationWarning)
warnings.filterwarnings('error', append=True)
import jobmanager
#logging.getLogger('jobmanager').setLevel(logging.INFO)
logging.getLogger('progression').setLevel(logging.ERROR)
logging.basicConfig(level = logging.INFO)
AUTHKEY = 'testing'
PORT = random.randint(10000, 60000)
SERVER = socket.gethostname()
def test_WarningError():
try:
warnings.warn("test warning, should be an error")
except:
pass
else:
assert False
def test_Signal_to_SIG_IGN():
from jobmanager.jobmanager import Signal_to_SIG_IGN
global PORT
@ -187,6 +196,8 @@ def start_client(hide_progress=True):
jm_client.start()
def test_start_server_with_no_args():
global PORT
PORT += 1
n = 10
args = range(1,n)
@ -197,6 +208,8 @@ def test_start_server_with_no_args():
jm_server.start()
def test_start_server():
global PORT
PORT += 1
n = 10
args = range(1,n)
@ -216,6 +229,8 @@ def test_start_server():
jm_server.start()
def test_jobmanager_static_client_call():
global PORT
PORT += 1
jm_client = jobmanager.JobManager_Client(server = SERVER,
authkey = AUTHKEY,
port = PORT,
@ -292,10 +307,10 @@ def test_jobmanager_basic():
assert not p_client.is_alive(), "the client did not terminate on time!"
# client must not throw an exception
assert p_client.exitcode == 0, "the client raised an exception"
p_server.join(3)
p_server.join(5)
# server should have come down
assert not p_server.is_alive(), "the server did not terminate on time!"
assert p_server.exitcode == 0, "the server raised an exception"
print("[+] client and server terminated")
fname = 'jobmanager.dump'
@ -339,20 +354,17 @@ def test_jobmanager_server_signals():
print("[+] still alive (assume shut down takes some time)")
p_server.join(timeout)
assert not p_server.is_alive(), "timeout for server shutdown reached"
assert p_server.exitcode == 0, "the server raised an exception"
print("[+] now terminated (timeout of {}s not reached)".format(timeout))
fname = 'jobmanager.dump'
with open(fname, 'rb') as f:
data = jobmanager.JobManager_Server.static_load(f)
args_set = set(data['args_dict'].keys())
args_ref = range(1,n)
ref_set = set()
for a in args_ref:
ref_set.add(binfootprint.dump(a))
assert len(args_set) == len(ref_set)
assert len(ref_set - args_set) == 0
ac = data['job_q']
assert ac.qsize() == n-1
assert ac.marked_items() == 0
print("[+] args_set from dump contains all arguments")
except:
if p_server is not None:
@ -385,7 +397,7 @@ def test_shutdown_server_while_client_running():
p_client = None
try:
p_server = mp.Process(target=start_server, args=(n,False,1))
p_server = mp.Process(target=start_server, args=(n,False,0.1))
p_server.start()
time.sleep(0.5)
assert p_server.is_alive()
@ -400,8 +412,10 @@ def test_shutdown_server_while_client_running():
p_server.join(TIMEOUT)
assert not p_server.is_alive(), "server did not shut down on time"
assert p_server.exitcode == 0, "the server raised an exception"
p_client.join(TIMEOUT)
assert not p_client.is_alive(), "client did not shut down on time"
assert p_client.exitcode == 0, "the client raised an exception"
print("[+] server and client joined {}".format(datetime.datetime.now().isoformat()))
@ -410,7 +424,8 @@ def test_shutdown_server_while_client_running():
with open(fname, 'rb') as f:
data = jobmanager.JobManager_Server.static_load(f)
args_set = set(data['args_dict'].keys())
ac = data['job_q']
args_set = {binfootprint.dump(ac.data['_'+str(id_)]) for id_ in ac._not_gotten_ids}
final_result = data['final_result']
final_res_args = {binfootprint.dump(a[0]) for a in final_result}
@ -499,7 +514,8 @@ def shutdown_client(sig):
with open(fname, 'rb') as f:
data = jobmanager.JobManager_Server.static_load(f)
assert len(data['args_dict']) == 0
ac = data['job_q']
assert ac.qsize() == 0
print("[+] args_set is empty -> all args processed & none failed")
final_res_args_set = {a[0] for a in data['final_result']}
@ -517,70 +533,6 @@ def shutdown_client(sig):
p_client.terminate()
raise
def test_check_fail():
    """Run server + a client whose func fails on selected args; verify the
    dump reports exactly the failed args and that failures plus final
    results cover all submitted arguments."""
    global PORT
    PORT += 1
    class Client_Random_Error(jobmanager.JobManager_Client):
        def func(self, args, const_args, c, m):
            c.value = 0
            m.value = -1
            # these argument values deliberately trigger a failure
            fail_on = [3,5,13]
            time.sleep(0.1)
            if args in fail_on:
                raise RuntimeError("fail_on Error")
            return os.getpid()

    n = 20
    p_server = mp.Process(target=start_server, args=(n,))
    p_server.start()
    time.sleep(1)
    print("START CLIENT")
    jm_client = Client_Random_Error(server=SERVER,
                                    authkey=AUTHKEY,
                                    port=PORT,
                                    nproc=0)
    p_client = mp.Process(target=jm_client.start)
    p_client.start()

    try:
        assert p_server.is_alive()
        assert p_client.is_alive()
    except:
        # clean up the spawned processes before propagating the failure
        p_client.terminate()
        p_server.terminate()
        raise

    print("[+] server and client running")

    p_server.join(60)
    p_client.join(60)

    assert not p_server.is_alive()
    assert not p_client.is_alive()
    print("[+] server and client stopped")
    fname = 'jobmanager.dump'
    with open(fname, 'rb') as f:
        data = jobmanager.JobManager_Server.static_load(f)

    # the remaining args in the dump must be exactly the reported failures
    set_ref = {binfootprint.dump(a) for a in range(1,n)}
    args_set = set(data['args_dict'].keys())
    assert args_set == data['fail_set']

    # every submitted argument appears either in the final result or as a failure
    final_result_args_set = {binfootprint.dump(a[0]) for a in data['final_result']}
    all_set = final_result_args_set | data['fail_set']
    assert len(set_ref - all_set) == 0, "final result union with reported failure do not correspond to all args!"
    print("[+] all argumsents found in final_results | reported failure")
def test_jobmanager_read_old_stat():
"""
start server, start client, start process trivial jobs,
@ -609,6 +561,8 @@ def test_jobmanager_read_old_stat():
assert not p_client.is_alive(), "the client did not terminate on time!"
assert not p_server.is_alive(), "the server did not terminate on time!"
assert p_client.exitcode == 0
assert p_server.exitcode == 0
print("[+] client and server terminated")
time.sleep(1)
@ -627,6 +581,8 @@ def test_jobmanager_read_old_stat():
assert not p_client.is_alive(), "the client did not terminate on time!"
assert not p_server.is_alive(), "the server did not terminate on time!"
assert p_client.exitcode == 0
assert p_server.exitcode == 0
print("[+] client and server terminated")
fname = 'jobmanager.dump'
@ -717,23 +673,19 @@ def test_start_server_on_used_port():
p1.start()
time.sleep(1)
other_error = False
try:
start_server2()
except (RuntimeError, OSError) as e:
print("caught Exception '{}' {}".format(type(e).__name__, e))
except:
other_error = True
time.sleep(1)
p1.terminate()
time.sleep(1)
p1.join()
assert not other_error
raise
finally:
time.sleep(1)
p1.terminate()
time.sleep(1)
p1.join()
def test_shared_const_arg():
global PORT
PORT += 1
@ -820,6 +772,190 @@ def test_hum_size():
assert humanize_size(1024**2) == '1.00GB'
assert humanize_size(1024**3) == '1.00TB'
assert humanize_size(1024**4) == '1024.00TB'
def test_ArgsContainer():
    """Exercise the ArgsContainer state machine (put/get/mark, reinsertion
    rules, pickling round-trip) for both the in-memory and the on-disk
    (shelve) variant."""
    from jobmanager.jobmanager import ArgsContainer
    from shutil import rmtree
    path = 'argscont'
    try:
        rmtree(path)
    except FileNotFoundError:
        pass
    # run the identical scenario for the dict-backed and shelve-backed container
    for p in [None, path]:
        ac = ArgsContainer(p)
        for arg in 'abcde':
            ac.put(arg)
        assert ac.qsize() == 5

        item1 = ac.get()
        item2 = ac.get()
        assert ac.qsize() == 3
        assert ac.marked_items() == 0
        assert ac.unmarked_items() == 5

        # reinserting a non-marked item is allowed
        ac.put(item1)
        assert ac.qsize() == 4
        assert ac.marked_items() == 0
        assert ac.unmarked_items() == 5

        # marking an item that has not been gotten yet fails
        try:
            ac.mark(item1)
        except ValueError:
            pass
        else:
            assert False
        assert ac.qsize() == 4
        assert ac.marked_items() == 0
        assert ac.unmarked_items() == 5

        ac.mark(item2)
        assert ac.qsize() == 4
        assert ac.marked_items() == 1
        assert ac.unmarked_items() == 4

        # already marked items can not be reinserted
        try:
            ac.put(item2)
        except:
            pass
        else:
            assert False
        assert ac.qsize() == 4
        assert ac.marked_items() == 1
        assert ac.unmarked_items() == 4

        # remarking raises a RuntimeWarning
        try:
            ac.mark(item2)
        except RuntimeWarning:
            pass
        else:
            assert False

        item3 = ac.get()
        assert ac.qsize() == 3
        assert ac.marked_items() == 1
        assert ac.unmarked_items() == 4

        import pickle
        ac_dump = pickle.dumps(ac)
        del ac
        ac2 = pickle.loads(ac_dump)
        # note: on loading, the not-gotten ids are all ids except the
        # marked ones, so gotten-but-unmarked items count as pending again
        assert ac2.qsize() == 4
        from jobmanager.jobmanager import bf, hashlib
        item3_hash = hashlib.sha256(bf.dump(item3)).hexdigest()
        assert item3_hash in ac2.data
        assert ac2.marked_items() == 1
        assert ac2.unmarked_items() == 4

        ac2.get()
        ac2.get()
        ac2.get()
        item = ac2.get()
        assert ac2.qsize() == 0
        assert ac2.marked_items() == 1
        assert ac2.unmarked_items() == 4

        ac2.mark(item)
        assert ac2.qsize() == 0
        assert ac2.marked_items() == 2
        assert ac2.unmarked_items() == 3

        # draining an empty container raises queue.Empty
        import queue
        try:
            ac2.get()
        except queue.Empty:
            pass
        else:
            assert False

        ac2.clear()
def put_from_subprocess(port):
    """Connect to the job queue exposed on `port` and feed it items 1..199.

    A ValueError (duplicate-argument rejection from the container) ends the
    feeding loop silently; any other exception propagates.
    """
    class MM_remote(BaseManager):
        pass
    try:
        MM_remote.register('get_job_q')
        remote = MM_remote(('localhost', port), b'test_argscomnt')
        remote.connect()
        job_q = remote.get_job_q()
        for value in range(1, 200):
            job_q.put(value)
            time.sleep(0.01)
    except ValueError:
        pass
def test_ArgsContainer_BaseManager():
    """Share an ArgsContainer via a BaseManager server thread and fill it
    concurrently from this process and a subprocess; each side stops on the
    duplicate-argument ValueError, so exactly 200 distinct items remain."""
    from jobmanager.jobmanager import ArgsContainer
    global PORT
    path = 'argscont'
    from shutil import rmtree
    try:
        rmtree(path)
    except FileNotFoundError:
        pass

    # run for the shelve-backed and the in-memory container
    for p in [path, None]:
        PORT += 1
        ac_inst = ArgsContainer(p)
        #ac_inst.close_shelve()
        class MM(BaseManager):
            pass
        MM.register('get_job_q', callable=lambda: ac_inst, exposed = ['put', 'get'])
        def start_manager_thread():
            m = MM(('', PORT), b'test_argscomnt')
            m.get_server().serve_forever()
        server_thr = threading.Thread(target=start_manager_thread, daemon=True)

        class MM_remote(BaseManager):
            pass
        MM_remote.register('get_job_q')
        m = MM_remote(('localhost', PORT), b'test_argscomnt')

        server_thr.start()
        m.connect()

        # subprocess feeds 1..199 ascending while we feed 200..1 descending
        pr = mp.Process(target = put_from_subprocess, args=(PORT,))
        pr.start()

        try:
            for arg in range(200, 0,-1):
                ac_inst.put(arg)
                time.sleep(0.01)
        except ValueError:
            # duplicate insert detected once the other side got there first
            pass

        pr.join()
        assert pr.exitcode == 0
        print(ac_inst.qsize())
        assert ac_inst.qsize() == 200
if __name__ == "__main__":
jm_log.setLevel(logging.INFO)
@ -831,29 +967,28 @@ if __name__ == "__main__":
pass
else:
func = [
# test_hum_size,
# test_Signal_to_SIG_IGN,
# test_Signal_to_sys_exit,
# test_Signal_to_terminate_process_list,
# test_jobmanager_static_client_call,
# test_start_server_with_no_args,
# test_start_server,
# test_client,
test_jobmanager_basic,
# test_jobmanager_server_signals,
# test_shutdown_server_while_client_running,
# test_shutdown_client,
# test_check_fail,
# test_jobmanager_read_old_stat,
# test_client_status,
# test_jobmanager_local,
# test_start_server_on_used_port,
# test_shared_const_arg,
# test_digest_rejected,
# test_hum_size,
lambda : print("END")
# test_ArgsContainer,
# test_ArgsContainer_BaseManager,
# test_hum_size,
# test_Signal_to_SIG_IGN,
# test_Signal_to_sys_exit,
# test_Signal_to_terminate_process_list,
# test_jobmanager_static_client_call,
# test_start_server_with_no_args,
# test_start_server,
# test_client,
# test_jobmanager_basic,
test_jobmanager_server_signals,
# test_shutdown_server_while_client_running,
# test_shutdown_client,
# test_jobmanager_read_old_stat,
# test_client_status,
# test_jobmanager_local,
# test_start_server_on_used_port,
# test_shared_const_arg,
# test_digest_rejected,
# test_hum_size,
lambda : print("END")
]
for f in func:
print()

View file

@ -1,13 +0,0 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import sys
from os.path import abspath, dirname, split
# Add parent directory to beginning of path variable
sys.path = [split(dirname(abspath(__file__)))[0]] + sys.path
import jobmanager as jm
if __name__ == "__main__":
pass