work in progress to fix speed issues

This commit is contained in:
Richard Hartmann 2019-06-03 20:52:36 +02:00
parent 909007a53d
commit 6922a596ff

View file

@ -29,9 +29,9 @@ The class JobManager_Client
""" """
import copy import copy
import ctypes #import ctypes
from datetime import datetime from datetime import datetime
import inspect #import inspect
import multiprocessing as mp import multiprocessing as mp
from multiprocessing.managers import BaseManager, RemoteError from multiprocessing.managers import BaseManager, RemoteError
import subprocess import subprocess
@ -46,6 +46,7 @@ import time
import traceback import traceback
import warnings import warnings
import binfootprint as bf import binfootprint as bf
import pathlib
import progression as progress import progression as progress
import shelve import shelve
import hashlib import hashlib
@ -639,13 +640,6 @@ class JobManager_Client(object):
prepend = [] prepend = []
if self.timeout is None:
infoline = None
else:
infoline = progress.StringValue(num_of_bytes=32)
infoline.value = "timeout in: {}s".format(int(self.timeout - (time.time() - self.init_time))).encode('utf-8')
# try: # try:
# worker_stdout_queue = mp.Queue(-1) # worker_stdout_queue = mp.Queue(-1)
# listener = QueueListener(worker_stdout_queue, console_hand) # listener = QueueListener(worker_stdout_queue, console_hand)
@ -667,6 +661,10 @@ class JobManager_Client(object):
local_result_q = mp.Queue() local_result_q = mp.Queue()
local_fail_q = mp.Queue() local_fail_q = mp.Queue()
infoline = progress.StringValue(num_of_bytes=64)
kwargs = {'reconnect_wait': self.reconnect_wait, kwargs = {'reconnect_wait': self.reconnect_wait,
'reconnect_tries': self.reconnect_tries, 'reconnect_tries': self.reconnect_tries,
'ping_timeout': self.ping_timeout, 'ping_timeout': self.ping_timeout,
@ -680,6 +678,13 @@ class JobManager_Client(object):
result_q_put_pending_lock = threading.Lock() result_q_put_pending_lock = threading.Lock()
job_q_put_pending_lock = threading.Lock() job_q_put_pending_lock = threading.Lock()
fail_q_put_pending_lock = threading.Lock() fail_q_put_pending_lock = threading.Lock()
def update_infoline(infoline, local_result_q):
while True:
infoline.value = "local res_q {}".format(local_result_q.qsize()).encode('utf-8')
if self.timeout:
infoline.value += " timeout in: {}s".format(int(self.timeout - (time.time() - self.init_time))).encode('utf-8')
time.sleep(1)
def pass_job_q_put(job_q_put, local_job_q, job_q_put_pending_lock): def pass_job_q_put(job_q_put, local_job_q, job_q_put_pending_lock):
# log.debug("this is thread thr_job_q_put with tid %s", ctypes.CDLL('libc.so.6').syscall(186)) # log.debug("this is thread thr_job_q_put with tid %s", ctypes.CDLL('libc.so.6').syscall(186))
@ -722,9 +727,13 @@ class JobManager_Client(object):
thr_fail_q_put = threading.Thread(target=pass_fail_q_put , args=(fail_q_put , local_fail_q, fail_q_put_pending_lock)) thr_fail_q_put = threading.Thread(target=pass_fail_q_put , args=(fail_q_put , local_fail_q, fail_q_put_pending_lock))
thr_fail_q_put.daemon = True thr_fail_q_put.daemon = True
thr_update_infoline = threading.Thread(target=update_infoline, args=(infoline, local_result_q))
thr_update_infoline.daemon = True
thr_job_q_put.start() thr_job_q_put.start()
thr_result_q_put.start() thr_result_q_put.start()
thr_fail_q_put.start() thr_fail_q_put.start()
thr_update_infoline.start()
with progress.ProgressBarCounterFancy(count = c, with progress.ProgressBarCounterFancy(count = c,
max_count = m_progress, max_count = m_progress,
@ -1331,7 +1340,8 @@ class JobManager_Server(object):
job_q_on_disk = False, job_q_on_disk = False,
job_q_on_disk_path = '.', job_q_on_disk_path = '.',
timeout = None, timeout = None,
log_level = logging.WARNING): log_level = logging.WARNING,
status_file_name = None):
""" """
authkey [string] - authentication key used by the SyncManager. authkey [string] - authentication key used by the SyncManager.
Server and Client must have the same authkey. Server and Client must have the same authkey.
@ -1367,6 +1377,16 @@ class JobManager_Server(object):
self._pid = os.getpid() self._pid = os.getpid()
self._pid_start = None self._pid_start = None
if status_file_name:
self.status_file = pathlib.Path(status_file_name)
if self.status_file.exists():
raise FileExistsError("the status file '{}' already exists".format(status_file_name))
with open(self.status_file, 'w') as f:
f.write('init')
else:
self.status_file = None
if verbose is not None: if verbose is not None:
log.warning("verbose is deprecated, only allowed for compatibility") log.warning("verbose is deprecated, only allowed for compatibility")
warnings.warn("verbose is deprecated", DeprecationWarning) warnings.warn("verbose is deprecated", DeprecationWarning)
@ -1417,6 +1437,7 @@ class JobManager_Server(object):
self.start_time = datetime.now() self.start_time = datetime.now()
@staticmethod @staticmethod
def _check_bind(host, port): def _check_bind(host, port):
@ -1502,6 +1523,9 @@ class JobManager_Server(object):
- if job_q is not empty dump remaining job_q - if job_q is not empty dump remaining job_q
""" """
# will only be False when _shutdown was started in subprocess # will only be False when _shutdown was started in subprocess
if self.status_file:
with open(self.status_file, 'w') as f:
f.write('stop')
self.job_q.close() self.job_q.close()
self.result_q.close() self.result_q.close()
self.fail_q.close() self.fail_q.close()
@ -1541,6 +1565,8 @@ class JobManager_Server(object):
log.info("JobManager_Server was successfully shut down") log.info("JobManager_Server was successfully shut down")
if self.status_file:
os.remove(self.status_file)
def show_statistics(self): def show_statistics(self):
if self.show_stat: if self.show_stat:
@ -1699,6 +1725,10 @@ class JobManager_Server(object):
log.debug("ready for processing incoming results") log.debug("ready for processing incoming results")
self.print_jm_ready() self.print_jm_ready()
if self.status_file:
with open(self.status_file, 'w') as f:
f.write('ready')
def join(self): def join(self):
""" """
starts to loop over incoming results starts to loop over incoming results