Merge branch 'master' into dev

Richard Hartmann 2016-11-21 17:07:34 +01:00
commit 3fac795ecd
2 changed files with 191 additions and 121 deletions


@@ -40,11 +40,14 @@ import pickle
import signal
import socket
import sys
import random
import time
import traceback
import warnings
import binfootprint as bf
import progression as progress
#import queuelib
import shelve
import logging
import threading
import ctypes
@@ -183,10 +186,10 @@ class JobManager_Client(object):
show_counter_only = False,
interval = 0.3,
emergency_dump_path = '.',
job_q_get_timeout = 1,
job_q_put_timeout = 10,
result_q_put_timeout = 300,
fail_q_put_timeout = 10,
#job_q_get_timeout = 1,
#job_q_put_timeout = 10,
#result_q_put_timeout = 300,
#fail_q_put_timeout = 10,
reconnect_wait = 2,
reconnect_tries = 3,
ping_timeout = 2,
@@ -248,14 +251,14 @@ class JobManager_Client(object):
self.interval = interval
log.debug("interval:%s", self.interval)
self._job_q_get_timeout = job_q_get_timeout
log.debug("_job_q_get_timeout:%s", self._job_q_get_timeout)
self._job_q_put_timeout = job_q_put_timeout
log.debug("_job_q_put_timeout:%s", self._job_q_put_timeout)
self._result_q_put_timeout = result_q_put_timeout
log.debug("_result_q_put_timeout:%s", self._result_q_put_timeout)
self._fail_q_put_timeout = fail_q_put_timeout
log.debug("_fail_q_put_timeout:%s", self._fail_q_put_timeout)
#self._job_q_get_timeout = job_q_get_timeout
#log.debug("_job_q_get_timeout:%s", self._job_q_get_timeout)
#self._job_q_put_timeout = job_q_put_timeout
#log.debug("_job_q_put_timeout:%s", self._job_q_put_timeout)
#self._result_q_put_timeout = result_q_put_timeout
#log.debug("_result_q_put_timeout:%s", self._result_q_put_timeout)
#self._fail_q_put_timeout = fail_q_put_timeout
#log.debug("_fail_q_put_timeout:%s", self._fail_q_put_timeout)
self.reconnect_wait = reconnect_wait
log.debug("reconnect_wait:%s", self.reconnect_wait)
self.reconnect_tries = reconnect_tries
@@ -396,7 +399,7 @@ class JobManager_Client(object):
reset_pbc,
njobs,
emergency_dump_path,
job_q_get_timeout,
# job_q_get_timeout,
host,
port,
authkey):
@@ -457,25 +460,26 @@ class JobManager_Client(object):
njobs -= 1
# try to get an item from the job_q
try:
tg_0 = time.time()
arg = job_q_get(block = True, timeout = job_q_get_timeout)
tg_1 = time.time()
time_queue += (tg_1-tg_0)
try:
arg = job_q_get()
# regular case, just stop working when empty job_q was found
except queue.Empty:
log.info("finds empty job queue, processed %s jobs", cnt)
break
# handle SystemExit in outer try ... except
except SystemExit as e:
arg = None
log.warning('getting arg from job_q failed due to SystemExit')
raise e
# job_q.get failed -> server down?
except Exception as e:
arg = None
log.error("Error when calling 'job_q_get'")
handle_unexpected_queue_error(e)
break
tg_1 = time.time()
time_queue += (tg_1-tg_0)
# try to process the retrieved argument
try:
@@ -548,6 +552,9 @@ class JobManager_Client(object):
# note SIGINT, SIGTERM -> SystemExit is achieved by overwriting the
# default signal handlers
except SystemExit:
if arg is None:
log.warning("SystemExit, quit processing, no argument to reinsert")
else:
log.warning("SystemExit, quit processing, reinsert current argument, please wait")
log.debug("put arg back to local job_q")
try:
@@ -590,7 +597,6 @@ class JobManager_Client(object):
if not self.connected:
raise JMConnectionError("Can not start Client with no connection to server (shared objects are not available)")
log.info("STARTING CLIENT\nserver:%s authkey:%s port:%s num proc:%s", self.server, self.authkey.decode(), self.port, self.nproc)
c = []
@@ -607,6 +613,8 @@ class JobManager_Client(object):
if not self.show_counter_only:
m_set_by_function = m_progress
else:
m_progress = None
prepend = []
infoline = progress.StringValue(num_of_bytes=12)
@@ -643,41 +651,40 @@ class JobManager_Client(object):
result_q_put = proxy_operation_decorator(proxy=result_q, operation='put', **kwargs)
fail_q_put = proxy_operation_decorator(proxy=fail_q, operation='put', **kwargs)
def pass_job_q_put(job_q_put, local_job_q, timeout):
def pass_job_q_put(job_q_put, local_job_q):
# log.debug("this is thread thr_job_q_put with tid %s", ctypes.CDLL('libc.so.6').syscall(186))
while True:
data = local_job_q.get()
job_q_put(data, timeout=timeout)
job_q_put(data)
# log.debug("stopped thread thr_job_q_put with tid %s", ctypes.CDLL('libc.so.6').syscall(186))
def pass_result_q_put(result_q_put, local_result_q, timeout):
def pass_result_q_put(result_q_put, local_result_q):
log.debug("this is thread thr_result_q_put with tid %s", ctypes.CDLL('libc.so.6').syscall(186))
try:
while True:
data = local_result_q.get()
result_q_put(data, timeout=timeout)
result_q_put(data)
except Exception as e:
log.error("thr_result_q_put caught error %s", type(e))
log.info(traceback.format_exc())
log.debug("stopped thread thr_result_q_put with tid %s", ctypes.CDLL('libc.so.6').syscall(186))
def pass_fail_q_put(fail_q_put, local_fail_q, timeout):
def pass_fail_q_put(fail_q_put, local_fail_q):
# log.debug("this is thread thr_fail_q_put with tid %s", ctypes.CDLL('libc.so.6').syscall(186))
while True:
data = local_fail_q.get()
fail_q_put(data, timeout=timeout)
log.info("put {} to failq".format(data))
fail_q_put(data)
# log.debug("stopped thread thr_fail_q_put with tid %s", ctypes.CDLL('libc.so.6').syscall(186))
thr_job_q_put = threading.Thread(target=pass_job_q_put , args=(job_q_put , local_job_q , self._job_q_put_timeout))
thr_job_q_put = threading.Thread(target=pass_job_q_put , args=(job_q_put , local_job_q))
thr_job_q_put.daemon = True
thr_result_q_put = threading.Thread(target=pass_result_q_put, args=(result_q_put, local_result_q, self._result_q_put_timeout))
thr_result_q_put = threading.Thread(target=pass_result_q_put, args=(result_q_put, local_result_q))
thr_result_q_put.daemon = True
thr_fail_q_put = threading.Thread(target=pass_fail_q_put , args=(fail_q_put , local_fail_q , self._fail_q_put_timeout))
thr_fail_q_put = threading.Thread(target=pass_fail_q_put , args=(fail_q_put , local_fail_q))
thr_fail_q_put.daemon = True
thr_job_q_put.start()
thr_result_q_put.start()
thr_fail_q_put.start()
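
For context on the change above: the per-put timeouts were dropped, so each pass_*_q_put function is now a daemon thread that simply blocks on its local queue and forwards every item through the manager proxy (retry behaviour presumably stays in proxy_operation_decorator). A minimal standalone sketch of that forwarder pattern, with purely illustrative names that are not part of the jobmanager API:

import queue
import threading
import time

def forwarder(local_q, remote_put):
    # block on the local queue and push each item through the
    # (possibly slow) remote put callable -- no per-put timeout
    while True:
        item = local_q.get()
        remote_put(item)

local_q = queue.Queue()
received = []    # stand-in for the server-side proxy queue

thr = threading.Thread(target=forwarder, args=(local_q, received.append))
thr.daemon = True    # dies with the main process, as in the code above
thr.start()

for i in range(3):
    local_q.put(i)
time.sleep(0.1)      # give the forwarder a moment to drain the queue
print(received)      # -> [0, 1, 2]

Because the threads are daemonic, a put that blocks forever no longer prevents the client process from exiting.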
@@ -709,7 +716,7 @@ class JobManager_Client(object):
reset_pbc, # reset_pbc
self.njobs, # njobs
self.emergency_dump_path, # emergency_dump_path
self._job_q_get_timeout, # job_q_get_timeout
#self._job_q_get_timeout, # job_q_get_timeout
self.server, # host
self.port, # port
self.authkey)) # authkey
@@ -819,6 +826,59 @@ def get_shared_status(ss):
else:
return ss.value
class PersistentQueue(object):
def __init__(self, fname):
self._path = fname
self.q = shelve.open(self._path)
self.head = 0
self.tail = 0
def close(self):
self.q.close()
os.remove(self._path)
def qsize(self):
# log.info("qsize: this is thread thr_job_q_put with tid %s", ctypes.CDLL('libc.so.6').syscall(186))
# log.info("qsize {} ({},{})".format(self.tail - self.head, self.tail, self.head))
return self.tail - self.head
def put(self, item):
# log.info("put item {}".format(item))
self.q[str(self.tail)] = item
self.tail += 1
def get(self):
try:
item = self.q[str(self.head)]
except KeyError:
raise queue.Empty
self.head += 1
# log.info("get item {}".format(item))
return item
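
PersistentQueue above is a minimal FIFO on top of shelve: items are stored under stringified integer keys, head is the next key to read and tail the next key to write. A usage sketch, assuming the class is in scope (the file name is illustrative):

import queue   # PersistentQueue raises queue.Empty when drained

pq = PersistentQueue('demo.jobqdb')
pq.put({'arg': 1})
pq.put({'arg': 2})
print(pq.qsize())    # -> 2
print(pq.get())      # -> {'arg': 1}
print(pq.get())      # -> {'arg': 2}
try:
    pq.get()         # nothing left
except queue.Empty:
    print('drained')
pq.close()           # closes the shelve and removes the file

Note that head and tail live only in memory, so the on-disk file buffers items that pass through the queue; it is not, by itself, a queue that survives a restart.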
RAND_STR_ASCII_IDX_LIST = list(range(48,58)) + list(range(65,91)) + list(range(97,123))
def rand_str(l = 8):
s = ''
for i in range(l):
s += chr(random.choice(RAND_STR_ASCII_IDX_LIST))
return s
def _new_rand_file_name(path='.', end='', l=8):
c = 0
while True:
fname = rand_str(l) + end
full_name = os.path.join(path, fname)
if not os.path.exists(full_name):
return fname
c += 1
if c > 10:
l += 2
c = 0
print("INFO: increase random file name length to", l)
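
rand_str draws from the 62 alphanumerics listed in RAND_STR_ASCII_IDX_LIST (ASCII 48-57, 65-90, 97-122), and _new_rand_file_name keeps drawing until the name is unused in path, growing the length by two after every ten collisions. For example (outputs are random):

print(rand_str())                          # e.g. 'c2Nw8rQe'
print(_new_rand_file_name(end='.jobqdb'))  # e.g. 'Xo91tBfa.jobqdb'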
class JobManager_Server(object):
"""general usage:
@@ -873,7 +933,9 @@ class JobManager_Server(object):
speed_calc_cycles = 50,
keep_new_result_in_memory = False,
hide_progress = False,
show_statistics = True):
show_statistics = True,
job_q_on_disk = False,
job_q_on_disk_path = '.'):
"""
authkey [string] - authentication key used by the SyncManager.
Server and Client must have the same authkey.
@@ -951,9 +1013,15 @@ class JobManager_Server(object):
# NOTE: it only works using multiprocessing.Queue()
# the Queue class from the module queue does NOT work
self.job_q_on_disk = job_q_on_disk
if job_q_on_disk:
fname = _new_rand_file_name(job_q_on_disk_path, '.jobqdb')
self.job_q = PersistentQueue(fname)
else:
self.job_q = myQueue() # queue holding args to process
self.result_q = myQueue() # queue holding returned results
self.fail_q = myQueue() # queue holding args where processing failed
self.manager = None
self.hostname = socket.gethostname()
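
The two new keyword arguments wired in above let the server keep its job queue on disk. A hedged construction sketch, following the pattern used in the tests below; authkey, port and the argument values are illustrative:

import jobmanager

with jobmanager.JobManager_Server(authkey            = 'testing',
                                  port               = 42524,
                                  const_arg          = 0.1,
                                  job_q_on_disk      = True,
                                  job_q_on_disk_path = '.') as jm_server:
    jm_server.args_from_list(range(1, 100))   # enqueue the work items
    jm_server.start()                         # assumed entry point running the serve loop

With job_q_on_disk=True the arguments land in a randomly named .jobqdb shelve file under job_q_on_disk_path instead of a multiprocessing queue, which keeps large argument sets out of memory.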
@@ -1065,10 +1133,9 @@ class JobManager_Server(object):
- if job_q is not empty dump remaining job_q
"""
# will only be False when _shutdown was started in subprocess
self.job_q.close()
self.__stop_SyncManager()
log.debug("SyncManager stopped!")
# do user defined final processing
self.process_final_result()
log.debug("process_final_result done!")
@@ -1165,6 +1232,7 @@ class JobManager_Server(object):
pickle.dump(self.args_dict, f, protocol=pickle.HIGHEST_PROTOCOL)
pickle.dump(self.args_list, f, protocol=pickle.HIGHEST_PROTOCOL)
fail_list = []
log.info("__dump: failq size {}".format(self.fail_q.qsize()))
try:
while True:
fail_list.append(self.fail_q.get_nowait())
@@ -1231,7 +1299,7 @@ class JobManager_Server(object):
def print_jm_ready(self):
# please overwrite for individual hooks to notify that the server process runs
print("jobmanager awaits client results")
print("{} awaits client results".format(self.__class__.__name__))
def bring_him_up(self):
if not self.__start_SyncManager():
@@ -1258,6 +1326,11 @@ class JobManager_Server(object):
else:
log.info("started (host:%s authkey:%s port:%s jobs:%s)", self.hostname, self.authkey.decode(), self.port,
self.numjobs)
print("{} started (host:{} authkey:{} port:{} jobs:{})".format(self.__class__.__name__,
self.hostname,
self.authkey.decode(),
self.port,
self.numjobs))
Signal_to_sys_exit(signals=[signal.SIGTERM, signal.SIGINT])
@@ -1270,6 +1343,7 @@ class JobManager_Server(object):
When finished, or on exception call stop() afterwards to shut down gracefully.
"""
info_line = progress.StringValue(num_of_bytes=100)
with progress.ProgressBarFancy(count=self._numresults,
max_count=self._numjobs,
@@ -1286,6 +1360,8 @@ class JobManager_Server(object):
self.job_q.qsize(),
self.numresults).encode(
'utf-8')
log.info("infoline {}".format(info_line.value))
log.info("failq size {}".format(self.fail_q.qsize()))
# allows for update of the info line
try:


@@ -36,7 +36,7 @@ jm_log.setLevel(logging.WARNING)
import warnings
warnings.filterwarnings('error')
#warnings.filterwarnings('always', category=DeprecationWarning)
#warnings.filterwarnings('always', category=ImportWarning)
AUTHKEY = 'testing'
PORT = random.randint(10000, 60000)
@@ -160,7 +160,7 @@ def test_Signal_to_terminate_process_list():
def start_server(n, read_old_state=False, client_sleep=0.1, hide_progress=False):
def start_server(n, read_old_state=False, client_sleep=0.1, hide_progress=False, job_q_on_disk=False):
print("START SERVER")
args = range(1,n)
with jobmanager.JobManager_Server(authkey = AUTHKEY,
@@ -168,7 +168,8 @@ def start_server(n, read_old_state=False, client_sleep=0.1, hide_progress=False)
msg_interval = 1,
const_arg = client_sleep,
fname_dump = 'jobmanager.dump',
hide_progress = hide_progress) as jm_server:
hide_progress = hide_progress,
job_q_on_disk = job_q_on_disk) as jm_server:
if not read_old_state:
jm_server.args_from_list(args)
else:
@@ -182,9 +183,6 @@ def start_client(hide_progress=True):
port = PORT,
nproc = 3,
reconnect_tries = 0,
job_q_put_timeout = 1,
result_q_put_timeout = 1,
fail_q_put_timeout = 1,
hide_progress = hide_progress)
jm_client.start()
@@ -222,10 +220,7 @@ def test_jobmanager_static_client_call():
authkey = AUTHKEY,
port = PORT,
nproc = 3,
reconnect_tries = 0,
job_q_put_timeout = 1,
result_q_put_timeout = 1,
fail_q_put_timeout = 1)
reconnect_tries = 0)
jm_client.func(arg=1, const_arg=1)
@@ -247,10 +242,7 @@ def test_client():
authkey = AUTHKEY,
port = PORT,
nproc = 3,
reconnect_tries = 0,
job_q_put_timeout = 1,
result_q_put_timeout = 1,
fail_q_put_timeout = 1)
reconnect_tries = 0)
jmc.start()
p_server.join(5)
@@ -271,15 +263,17 @@ def test_jobmanager_basic():
check if all arguments are found in final_result of dump
"""
global PORT
for jqd in [False, True]:
PORT += 1
n = 5
p_server = None
p_client = None
try:
# start a server
p_server = mp.Process(target=start_server, args=(n,False))
p_server = mp.Process(target=start_server, args=(n,False), kwargs={'job_q_on_disk': jqd})
p_server.start()
time.sleep(0.5)
# server needs to be running
@@ -470,7 +464,7 @@ def shutdown_client(sig):
try:
p_server = mp.Process(target=start_server, args=(n, False, 0.1))
p_server = mp.Process(target=start_server, args=(n, False, 0.4))
p_server.start()
time.sleep(0.5)
@@ -828,10 +822,10 @@ def test_hum_size():
assert humanize_size(1024**4) == '1024.00TB'
if __name__ == "__main__":
jm_log.setLevel(logging.DEBUG)
progress.log.setLevel(logging.DEBUG)
jm_log.setLevel(logging.INFO)
# progress.log.setLevel(logging.DEBUG)
# jm_log.setLevel(logging.ERROR)
# progress.log.setLevel(logging.ERROR)
progress.log.setLevel(logging.ERROR)
if len(sys.argv) > 1:
pass
@@ -846,14 +840,14 @@ if __name__ == "__main__":
# test_start_server_with_no_args,
# test_start_server,
# test_client,
# test_jobmanager_basic,
test_jobmanager_basic,
# test_jobmanager_server_signals,
# test_shutdown_server_while_client_running,
# test_shutdown_client,
# test_check_fail,
# test_jobmanager_read_old_stat,
# test_client_status,
test_jobmanager_local,
# test_jobmanager_local,
# test_start_server_on_used_port,
# test_shared_const_arg,
# test_digest_rejected,