minor add ons related to the JM_Local class

This commit is contained in:
Richard Hartmann 2020-04-06 21:25:55 +02:00
parent 5d9329dd5b
commit 0f64ece8e8

View file

@ -124,6 +124,24 @@ ServerQueueManager.register('get_result_q')
ServerQueueManager.register('get_fail_q')
ServerQueueManager.register('get_const_arg')
def parse_nproc(nproc):
# nproc specifies n explicitly
if nproc >= 1:
n = int(nproc)
# nproc specifies n by telling how many core should be left unused
elif nproc <= 0:
n = int(mp.cpu_count() + nproc)
if n <= 0:
raise RuntimeError(
"Invalid Number of Processes\ncan not spawn {} processes (cores found: {}, cores NOT to use: {} = -nproc)".format(
n, mp.cpu_count(), abs(nproc)))
# nproc specifies n as the fraction of all cores available, however, at least one
else:
n = max(int(nproc * mp.cpu_count()), 1)
return n
class JobManager_Client(object):
"""
Calls the functions self.func with arguments fetched from the job_q.
@ -226,7 +244,7 @@ class JobManager_Client(object):
self._sid = os.getsid(self._pid)
if verbose is not None:
log.warning("verbose is deprecated, only allowed for compatibility")
log.warning("\nverbose is deprecated, only allowed for compatibility")
warnings.warn("verbose is deprecated", DeprecationWarning)
self.hide_progress = hide_progress
@ -273,16 +291,7 @@ class JobManager_Client(object):
log.debug("port:%s", self.port)
self.nice = nice
log.debug("nice:%s", self.nice)
if nproc >= 1:
self.nproc = int(nproc)
elif nproc <= 0:
self.nproc = int(mp.cpu_count() + nproc)
if self.nproc <= 0:
raise RuntimeError("Invalid Number of Processes\ncan not spawn {} processes (cores found: {}, cores NOT to use: {} = -nproc)".format(self.nproc, mp.cpu_count(), abs(nproc)))
else:
self.nproc = int(nproc*mp.cpu_count())
if self.nproc == 0:
self.nproc = 1
self.nproc = parse_nproc(nproc)
log.debug("nproc:%s", self.nproc)
self.nthreads = nthreads
@ -428,7 +437,7 @@ class JobManager_Client(object):
count_args = progress.getCountKwargs(func)
if count_args is None:
log.warning("found function without status information (progress will not work)")
log.info("found function without status information (progress will not work)")
m.value = 0 # setting max_count to -1 will hide the progress bar
_func = lambda arg, const_arg, c, m : func(arg, const_arg)
elif count_args != ["c", "m"]:
@ -1289,7 +1298,7 @@ class ArgsContainer(object):
if item_id in self._not_gotten_ids:
raise ValueError("item not gotten yet, can not be marked")
if item_id in self._marked_ids:
raise RuntimeWarning("item allready marked")
raise RuntimeWarning("item already marked")
self._marked_ids.add(item_id)
@ -1408,6 +1417,9 @@ class JobManager_Server(object):
This init actually starts the SyncManager as a new process. As a next step
the job_q has to be filled, see put_arg().
"""
assert fname_dump is None
global log
log = logging.getLogger(__name__+'.'+self.__class__.__name__)
log.setLevel(log_level)
@ -1538,6 +1550,7 @@ class JobManager_Server(object):
return self
def __exit__(self, err, val, trb):
print("\n############## in JM SERVER EXIT\n")
# KeyboardInterrupt via SIGINT will be mapped to SystemExit
# SystemExit is considered non erroneous
if (err is None) or (err == SystemExit):
@ -1579,7 +1592,6 @@ class JobManager_Server(object):
self.show_statistics()
# print(self.fname_dump)
if self.fname_dump is not None:
if self.fname_dump == 'auto':
fname = "{}_{}.dump".format(self.authkey.decode('utf8'), getDateForFileName(includePID=False))
@ -1624,7 +1636,7 @@ class JobManager_Server(object):
print("{} failed : {}".format(id2, failed))
all_not_processed = all_jobs - all_processed
not_queried = self.job_q.qsize()
not_queried = self.number_of_jobs()
queried_but_not_processed = all_not_processed - not_queried
print("{} not processed : {}".format(id2, all_not_processed))
@ -1632,7 +1644,7 @@ class JobManager_Server(object):
print("{} not queried yet : {}".format(id2, not_queried))
def all_successfully_processed(self):
return self.job_q.qsize() == 0
return self.number_of_jobs() == 0
@staticmethod
def static_load(f):
@ -1650,7 +1662,7 @@ class JobManager_Server(object):
self.fail_q.put(fail_item)
log.debug("load: len(final_result): {}".format(len(self.final_result)))
log.debug("load: job_q.qsize: {}".format(self.job_q.qsize()))
log.debug("load: job_q.qsize: {}".format(self.number_of_jobs()))
log.debug("load: job_q.marked_items: {}".format(self.job_q.marked_items()))
log.debug("load: job_q.gotten_items: {}".format(self.job_q.gotten_items()))
log.debug("load: job_q.unmarked_items: {}".format(self.job_q.unmarked_items()))
@ -1661,7 +1673,7 @@ class JobManager_Server(object):
pickle.dump(self.final_result, f, protocol=pickle.HIGHEST_PROTOCOL)
log.debug("dump: len(final_result): {}".format(len(self.final_result)))
pickle.dump(self.job_q, f, protocol=pickle.HIGHEST_PROTOCOL)
log.debug("dump: job_q.qsize: {}".format(self.job_q.qsize()))
log.debug("dump: job_q.qsize: {}".format(self.number_of_jobs()))
log.debug("dump: job_q.marked_items: {}".format(self.job_q.marked_items()))
log.debug("dump: job_q.gotten_items: {}".format(self.job_q.gotten_items()))
log.debug("dump: job_q.unmarked_items: {}".format(self.job_q.unmarked_items()))
@ -1714,6 +1726,9 @@ class JobManager_Server(object):
#with self._numjobs.get_lock():
# self._numjobs.value += 1
def number_of_jobs(self):
return self.job_q.qsize()
def args_from_list(self, args):
"""serialize a list of arguments to the job_q
@ -1737,7 +1752,7 @@ class JobManager_Server(object):
# please overwrite for individual hooks to notify that the server process runs
print("{} awaits client results".format(self.__class__.__name__))
def bring_him_up(self):
def bring_him_up(self, no_sys_exit_on_signal=False):
self._start_manager()
@ -1745,7 +1760,7 @@ class JobManager_Server(object):
log.critical("do not run JobManager_Server.start() in a subprocess")
raise RuntimeError("do not run JobManager_Server.start() in a subprocess")
jobqsize = self.job_q.qsize()
jobqsize = self.number_of_jobs()
if jobqsize == 0:
log.info("no jobs to process! use JobManager_Server.put_arg to put arguments to the job_q")
return
@ -1757,8 +1772,10 @@ class JobManager_Server(object):
self.authkey.decode(),
self.port,
jobqsize))
Signal_to_sys_exit(signals=[signal.SIGTERM, signal.SIGINT])
if no_sys_exit_on_signal:
log.info("no_sys_exit_on_signal was set to True. It's the users responsability to call the 'shutdown' method.")
else:
Signal_to_sys_exit(signals=[signal.SIGTERM, signal.SIGINT])
log.debug("ready for processing incoming results")
self.print_jm_ready()
@ -1805,7 +1822,7 @@ class JobManager_Server(object):
while numresults.value < numjobs.value:
numjobs.value = self.job_q.put_items()
failqsize = self.fail_q.qsize()
jobqsize = self.job_q.qsize()
jobqsize = self.number_of_jobs()
markeditems = self.job_q.marked_items()
numresults.value = failqsize + markeditems
if (time.time() - curr_time) > self.msg_interval:
@ -1874,23 +1891,15 @@ class JobManager_Local(object):
authkey = 'local_jobmanager',
nproc = -1,
const_arg = None,
port = 42524,
verbose = None,
port = 42524,
verbose_client = None,
show_statusbar_for_jobs = False,
show_counter_only = False,
niceness_clients = 19,
msg_interval = 1,
fname_dump = 'auto',
speed_calc_cycles = 50):
niceness_clients = 19):
self.server = server_class(authkey = authkey,
const_arg = const_arg,
port = port,
verbose = verbose,
msg_interval = msg_interval,
fname_dump = fname_dump,
speed_calc_cycles = speed_calc_cycles,
**server_init_kwargs)
self.client_class = client_class
@ -1902,6 +1911,24 @@ class JobManager_Local(object):
self.show_statusbar_for_jobs = show_statusbar_for_jobs
self.show_counter_only = show_counter_only
self.niceness_clients = niceness_clients
self.p_client = None
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
if self.server.stat:
self.server.stat.stop() # stop the progress bar to see all messages
if self.p_client is not None:
self.p_client.terminate()
print("join client ...")
self.p_client.join()
print("client has joined")
print("shutdown server")
self.server.shutdown()
@staticmethod
def _start_client(authkey,
@ -1922,7 +1949,7 @@ class JobManager_Local(object):
verbose = verbose,
show_statusbar_for_jobs = show_statusbar_for_jobs,
show_counter_only = show_counter_only,
use_special_SIG_INT_handler = False,
ask_on_sigterm = False,
**client_init_kwargs)
Signal_to_sys_exit(signals=[signal.SIGINT, signal.SIGTERM])
@ -1930,25 +1957,28 @@ class JobManager_Local(object):
def start(self):
p_client = mp.Process(target=JobManager_Local._start_client,
n = parse_nproc(self.nproc)
nproc = min(self.server.number_of_jobs(), n)
self.p_client = mp.Process(target=JobManager_Local._start_client,
args=(self.authkey,
self.port,
self.client_class,
self.client_init_kwargs,
self.nproc,
nproc,
self.niceness_clients,
self.verbose_client,
self.show_statusbar_for_jobs,
self.show_counter_only))
self.server.bring_him_up()
p_client.start()
try:
self.server.join()
finally:
progress.check_process_termination(p_client,
prefix = 'local_client',
timeout = 2)
self.server.bring_him_up(no_sys_exit_on_signal=True)
if nproc == 0:
self.p_client = None
return
self.p_client.start()
self.server.join()
class RemoteKeyError(RemoteError):
pass
@ -2242,6 +2272,8 @@ class proxy_operation_decorator(object):
def __call__(self, *args, **kwargs):
try:
res = self.o(*args, **kwargs)
except queue.Empty as e:
log.debug("operation '%s' -> %s FAILED due to '%s' -> retry", self.operation, self.dest, type(e))
except Exception as e:
log.warning("operation '%s' -> %s FAILED due to '%s' -> retry", self.operation, self.dest, type(e))
else: