intermediate work on const_arg copy vs. shared, handling of the port-already-in-use error, some output highlighting

cimatosa 2015-01-14 15:07:01 +01:00
parent 9d42345970
commit ba967893f7
3 changed files with 231 additions and 84 deletions
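The main theme of this commit is whether a client sees const_arg as the one object living on the server's SyncManager (a shared proxy, so mutations are visible to the server and to all workers) or as its own private copy (the new copy_const_arg flag). A minimal, self-contained sketch of that distinction using only the standard library; everything below is illustrative and not part of the diff:

import copy
import multiprocessing as mp

def worker(const_arg, make_private_copy):
    if make_private_copy:
        # analogue of copy_const_arg=True: work on a private copy; whether
        # copy.deepcopy of a manager *proxy* really detaches the data is
        # exactly what the type() prints added to _get_manager_objects probe
        const_arg = copy.deepcopy(const_arg)
    const_arg.append(mp.current_process().pid)

if __name__ == '__main__':
    with mp.Manager() as m:
        shared = m.list([1, 2, 3])   # one list living in the manager process
        p = mp.Process(target=worker, args=(shared, False))
        p.start()
        p.join()
        print(list(shared))          # contains the worker's pid: it was shared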

View file

@@ -231,11 +231,10 @@ class Signal_handler_for_Jobmanager_client(object):
self.client_object.pbc.pause()
try:
r = input("<q> - quit, <i> - server info: ")
r = input(progress.ESC_BOLD + progress.ESC_LIGHT_RED+"<q> - quit, <i> - server info: " + progress.ESC_NO_CHAR_ATTR)
except:
r = 'q'
if r == 'i':
self._show_server_info()
elif r == 'q':
@@ -360,53 +359,55 @@ class JobManager_Server(object):
The SyncManager is started as a new process by start(), not here in __init__.
Before calling start(), the job_q has to be filled, see put_arg().
"""
self.verbose = verbose
self._pid = os.getpid()
self._pid_start = None
self._identifier = progress.get_identifier(name=self.__class__.__name__, pid=self._pid)
if self.verbose > 1:
print("{}: I'm the JobManager_Server main process".format(self._identifier))
self.__wait_before_stop = 2
self.port = port
if isinstance(authkey, bytearray):
self.authkey = authkey
else:
self.authkey = bytearray(authkey, encoding='utf8')
try:
self.verbose = verbose
self._pid = os.getpid()
self._pid_start = None
self._identifier = progress.get_identifier(name=self.__class__.__name__, pid=self._pid)
if self.verbose > 1:
print("{}: I'm the JobManager_Server main process".format(self._identifier))
self.const_arg = const_arg
self.fname_dump = fname_dump
self.msg_interval = msg_interval
self.speed_calc_cycles = speed_calc_cycles
# to do some redundant checking, might be removed
# the args_set holds all arguments to be processed
# in contrast to the job_q, an argument will only be removed
# from the set if it was caught by the result_q
# so iff all results have been processed successfully,
# the args_set will be empty
self.args_set = set()
# thread safe integer values
self._numresults = mp.Value('i', 0) # count the successfully processed jobs
self._numjobs = mp.Value('i', 0) # overall number of jobs
# final result as list, other types can be achieved by subclassing
self.final_result = []
# NOTE: it only works using multiprocessing.Queue()
# the Queue class from the module queue does NOT work
self.job_q = myQueue() # queue holding args to process
self.result_q = myQueue() # queue holding returned results
self.fail_q = myQueue() # queue holding args where processing failed
self.manager = None
self.__start_SyncManager()
self.__wait_before_stop = 2
self.port = port
if isinstance(authkey, bytearray):
self.authkey = authkey
else:
self.authkey = bytearray(authkey, encoding='utf8')
self.const_arg = const_arg
self.fname_dump = fname_dump
self.msg_interval = msg_interval
self.speed_calc_cycles = speed_calc_cycles
# to do some redundant checking, might be removed
# the args_set holds all arguments to be processed
# in contrast to the job_q, an argument will only be removed
# from the set if it was caught by the result_q
# so iff all results have been processed successfully,
# the args_set will be empty
self.args_set = set()
# thread safe integer values
self._numresults = mp.Value('i', 0) # count the successfully processed jobs
self._numjobs = mp.Value('i', 0) # overall number of jobs
# final result as list, other types can be achieved by subclassing
self.final_result = []
# NOTE: it only works using multiprocessing.Queue()
# the Queue class from the module queue does NOT work
self.job_q = myQueue() # queue holding args to process
self.result_q = myQueue() # queue holding returned results
self.fail_q = myQueue() # queue holding args where processing failed
self.manager = None
except:
print("RERAISE in __init__")
raise
def __stop_SyncManager(self):
if self.manager == None:
@@ -427,32 +428,50 @@ class JobManager_Server(object):
def __start_SyncManager(self):
class JobQueueManager(SyncManager):
pass
# make job_q, result_q, fail_q, const_arg available via network
JobQueueManager.register('get_job_q', callable=lambda: self.job_q)
JobQueueManager.register('get_result_q', callable=lambda: self.result_q)
JobQueueManager.register('get_fail_q', callable=lambda: self.fail_q)
JobQueueManager.register('get_const_arg', callable=lambda: self.const_arg)
address=('', self.port) # ip='' binds to all interfaces
authkey=self.authkey
self.manager = JobQueueManager(address, authkey)
try:
# make job_q, result_q, fail_q, const_arg available via network
JobQueueManager.register('get_job_q', callable=lambda: self.job_q)
JobQueueManager.register('get_result_q', callable=lambda: self.result_q)
JobQueueManager.register('get_fail_q', callable=lambda: self.fail_q)
JobQueueManager.register('get_const_arg', callable=lambda: self.const_arg, exposed=['__iter__'])
# start manager with non default signal handling given by
# the additional init function setup_SIG_handler_manager
self.manager.start(setup_SIG_handler_manager)
self.hostname = socket.gethostname()
address=('', self.port) # ip='' binds to all interfaces
authkey=self.authkey
self.manager = JobQueueManager(address, authkey)
self.hostname = socket.gethostname()
# start manager with non default signal handling given by
# the additional init function setup_SIG_handler_manager
self.manager.start(setup_SIG_handler_manager)
except EOFError as e:
print("{}: can not start {} on {}:{}".format(progress.ESC_RED + self._identifier, self.__class__.__name__, self.hostname, self.port))
print("{}: this is usually the case when the port used in not available!".format(progress.ESC_RED + self._identifier))
manager_proc = self.manager._process
manager_identifier = progress.get_identifier(name='SyncManager')
progress.check_process_termination(proc=manager_proc,
identifier=manager_identifier,
timeout=0.3,
verbose=self.verbose,
auto_kill_on_last_resort=True)
self.manager = None
return False
if self.verbose > 1:
print("{}: started on {}:{} with authkey '{}'".format(progress.get_identifier('SyncManager', self.manager._process.pid),
self.hostname,
self.port,
authkey))
return True
def __restart_SyncManager(self):
self.__stop_SyncManager()
self.__start_SyncManager()
if not self.__start_SyncManager():
raise RuntimeError("could not start server")
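The EOFError branch above relies on the fact that when the SyncManager's server process cannot bind the requested port, BaseManager.start() surfaces the failure in the parent as an EOFError while waiting for the child's startup message. A minimal sketch of that failure mode outside of jobmanager; the manager class, port and authkey below are placeholders, not the project's API:

from multiprocessing.managers import SyncManager

class DemoManager(SyncManager):
    pass

def try_start(port, authkey=b'demo'):
    m = DemoManager(address=('', port), authkey=authkey)
    try:
        # the server child dies if it cannot bind the port; the parent then
        # sees an EOFError instead of a successful start
        m.start()
    except EOFError:
        print("port {} appears to be in use already".format(port))
        return None
    return m

if __name__ == '__main__':
    first = try_start(42524)
    second = try_start(42524)   # expected to take the EOFError branch
    if first is not None:
        first.shutdown()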
def __enter__(self):
return self
@@ -460,17 +479,23 @@ class JobManager_Server(object):
def __exit__(self, err, val, trb):
# KeyboardInterrupt via SIGINT will be mapped to SystemExit
# SystemExit is considered non-erroneous
if err == SystemExit:
if (err is None) or (err == SystemExit):
if self.verbose > 0:
print("{}: normal shutdown caused by SystemExit".format(self._identifier))
# bring everything down, dump status to file
self.shoutdown()
# no exception traceback will be printed
elif err != None:
return True
else:
if self.verbose > 0:
print("{}: shutdown due to exception {}".format(progress.ESC_RED + self._identifier, err))
# bring everything down, dump status to file
self.shoutdown()
# causes exception traceback to be printed
traceback.print_exception(err, val, trb)
return False
# bring everything down, dump status to file
self.shoutdown()
return True
@property
def numjobs(self):
@@ -533,7 +558,9 @@ class JobManager_Server(object):
all_processed = succeeded + failed
id = self._identifier + ": "
id2 = progress.ESC_HIDDEN + self._identifier + progress.ESC_RESET_HIDDEN + "| "
stripped_id = progress.remove_ESC_SEQ_from_string(self._identifier)
l = len(stripped_id)
id2 = ' '*l + "| "
print("{}total number of jobs : {}".format(id, all_jobs))
print("{} processed : {}".format(id2, all_processed))
@@ -668,6 +695,10 @@ class JobManager_Server(object):
When finished, or on an exception, call stop() afterwards to shut down gracefully.
"""
if not self.__start_SyncManager():
raise RuntimeError("could not start server")
if self._pid != os.getpid():
raise RuntimeError("do not run JobManager_Server.start() in a subprocess")
@@ -754,7 +785,8 @@ class JobManager_Client(object):
verbose=1,
show_statusbar_for_jobs=True,
show_counter_only=False,
interval=0.3):
interval=0.3,
copy_const_arg=False):
"""
server [string] - ip address or hostname where the JobManager_Server is running
@@ -782,6 +814,7 @@ class JobManager_Client(object):
self.show_counter_only = show_counter_only
self.interval = interval
self.verbose = verbose
self.copy_const_arg = copy_const_arg
self._pid = os.getpid()
self._identifier = progress.get_identifier(name=self.__class__.__name__, pid=self._pid)
@@ -818,10 +851,11 @@ class JobManager_Client(object):
self.port,
self.authkey,
self._identifier,
self.verbose)
self.verbose,
self.copy_const_arg)
@staticmethod
def _get_manager_objects(server, port, authkey, identifier, verbose=0):
def _get_manager_objects(server, port, authkey, identifier, verbose = 0, copy_const_arg = False):
"""
connects to the server and get registered shared objects such as
job_q, result_q, fail_q, const_arg
@@ -864,6 +898,11 @@ class JobManager_Client(object):
result_q = manager.get_result_q()
fail_q = manager.get_fail_q()
const_arg = manager.get_const_arg()
print("manager:", type(const_arg))
if copy_const_arg:
const_arg = copy.deepcopy(const_arg)
print("copied :", type(const_arg))
return job_q, result_q, fail_q, const_arg
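For reference, the connect side implemented by _get_manager_objects above follows the standard BaseManager client pattern: register the same names the server exposes (without callables), connect, then fetch the proxies. A self-contained sketch under assumed values; the address, port and authkey are placeholders:

import copy
from multiprocessing.managers import SyncManager

class ClientManager(SyncManager):
    pass

# register the names the server exposes; the client side needs no callables
for name in ('get_job_q', 'get_result_q', 'get_fail_q', 'get_const_arg'):
    ClientManager.register(name)

def get_manager_objects(server='localhost', port=42524, authkey=b'demo',
                        copy_const_arg=False):
    m = ClientManager(address=(server, port), authkey=authkey)
    m.connect()                      # raises if no server is reachable
    job_q = m.get_job_q()
    result_q = m.get_result_q()
    fail_q = m.get_fail_q()
    const_arg = m.get_const_arg()    # proxy to the server-side object
    if copy_const_arg:
        # the experiment this commit starts: try to give each client its own copy
        const_arg = copy.deepcopy(const_arg)
    return job_q, result_q, fail_q, const_arg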
@staticmethod

View file

@@ -32,7 +32,6 @@ for s in dir(signal):
else:
signal_dict[n] = s
ESC_NO_CHAR_ATTR = "\033[0m"
ESC_BOLD = "\033[1m"
@@ -67,12 +66,48 @@ ESC_LIGHT_MAGENTA = "\033[95m"
ESC_LIGHT_CYAN = "\033[96m"
ESC_WHITE = "\033[97m"
ESC_SEQ_SET = [ESC_NO_CHAR_ATTR,
ESC_BOLD,
ESC_DIM,
ESC_UNDERLINED,
ESC_BLINK,
ESC_INVERTED,
ESC_HIDDEN,
ESC_RESET_BOLD,
ESC_RESET_DIM,
ESC_RESET_UNDERLINED,
ESC_RESET_BLINK,
ESC_RESET_INVERTED,
ESC_RESET_HIDDEN,
ESC_DEFAULT,
ESC_BLACK,
ESC_RED,
ESC_GREEN,
ESC_YELLOW,
ESC_BLUE,
ESC_MAGENTA,
ESC_CYAN,
ESC_LIGHT_GREY,
ESC_DARK_GREY,
ESC_LIGHT_RED,
ESC_LIGHT_GREEN,
ESC_LIGHT_YELLOW,
ESC_LIGHT_BLUE,
ESC_LIGHT_MAGENTA,
ESC_LIGHT_CYAN,
ESC_WHITE]
def ESC_MOVE_LINE_UP(n):
return "\033[{}A".format(n)
def ESC_MOVE_LINE_DOWN(n):
return "\033[{}B".format(n)
def remove_ESC_SEQ_from_string(s):
for esc_seq in ESC_SEQ_SET:
s = s.replace(esc_seq, '')
return s
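The new ESC_SEQ_SET together with remove_ESC_SEQ_from_string is what lets the server pad its multi-line status output to the visible width of a coloured identifier (the stripped_id / id2 change above). A small sketch of that use, assuming only the names defined in this file; the identifier string is made up:

# illustrative only: align follow-up lines under a coloured identifier
identifier = ESC_BOLD + ESC_LIGHT_RED + "JobManager_Server [4711]" + ESC_NO_CHAR_ATTR

visible = remove_ESC_SEQ_from_string(identifier)  # the text without colour codes
pad = ' ' * len(visible) + "| "                   # same visible width as the identifier

print(identifier + ": total number of jobs : 42")
print(pad + " processed : 40")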
def humanize_time(secs):
"""convert second in to hh:mm:ss format

View file

@@ -553,6 +553,76 @@ def test_jobmanager_local():
jm_server.args_from_list(args)
jm_server.start()
def test_start_server_on_used_port():
def start_server():
const_arg = None
arg = [10,20,30]
with jobmanager.JobManager_Server(authkey='test_shared_const_arg',
const_arg=const_arg,
fname_dump=None) as server:
server.args_from_list(arg)
server.start()
def start_server2():
const_arg = None
arg = [10,20,30]
with jobmanager.JobManager_Server(authkey='test_shared_const_arg',
const_arg=const_arg,
fname_dump=None) as server:
server.args_from_list(arg)
server.start()
p1 = mp.Process(target=start_server)
p1.start()
time.sleep(1)
start_server2()
print(os.getpid())
time.sleep(1)
p1.terminate()
time.sleep(1)
p1.join()
def test_shared_const_arg():
def start_server():
const_arg = [1,2,3]
arg = [10,20,30]
with jobmanager.JobManager_Server(authkey='test_shared_const_arg',
const_arg=const_arg,
fname_dump=None) as server:
server.args_from_list(arg)
server.start()
print("const_arg at server side", const_arg)
def start_client():
class myClient(jobmanager.JobManager_Client):
@staticmethod
def func(arg, const_arg):
const_arg.append(os.getpid())
# func is a staticmethod, so there is no self here; identify the worker by its pid
print(os.getpid(), arg, const_arg)
return None
p1 = mp.Process(target=start_server)
p2 = mp.Process(target=start_client)
p1.start()
time.sleep(1)
p2.start()
p2.join()
time.sleep(1)
p1.terminate()
time.sleep(1)
p1.join()
def _test_interrupt_server():
start_server(n = 100)
@@ -591,15 +661,18 @@ if __name__ == "__main__":
# test_hashedViewOnNumpyArray,
# test_client_status,
# test_jobmanager_local,
test_start_server_on_used_port,
# test_shared_const_arg,
lambda : print("END")
]
# for f in func:
# print()
# print('#'*80)
# print('## {}'.format(f.__name__))
# print()
# f()
# time.sleep(1)
for f in func:
print()
print('#'*80)
print('## {}'.format(f.__name__))
print()
f()
time.sleep(1)
_test_interrupt_client()
# _test_interrupt_client()