mirror of
https://github.com/vale981/jobmanager
synced 2025-03-05 18:01:38 -05:00
handle errors during proxy operation like 'get' and 'put' -> give helpful error messages and try to reconnect in certain circumstances
This commit is contained in:
parent
d7f4163956
commit
b042b2447a
2 changed files with 180 additions and 75 deletions
|
@ -34,7 +34,7 @@ import copy
|
|||
import functools
|
||||
import inspect
|
||||
import multiprocessing as mp
|
||||
from multiprocessing.managers import BaseManager
|
||||
from multiprocessing.managers import BaseManager, RemoteError
|
||||
import numpy as np
|
||||
import os
|
||||
import pickle
|
||||
|
@ -73,9 +73,10 @@ else:
|
|||
sys.path.append(os.path.dirname(__file__))
|
||||
import progress
|
||||
|
||||
myQueue = mp.Queue
|
||||
|
||||
class FatalConnectionResetError(ConnectionError):
|
||||
pass
|
||||
|
||||
|
||||
class JobManager_Client(object):
|
||||
"""
|
||||
|
@ -211,7 +212,7 @@ class JobManager_Client(object):
|
|||
# def _get_sync_manager_data(manager):
|
||||
|
||||
@staticmethod
|
||||
def _get_manager_objects(server, port, authkey, identifier, verbose = 0):
|
||||
def _get_manager_objects(server, port, authkey, identifier, verbose=0, reconnect_wait=2, reconnect_tries=3):
|
||||
"""
|
||||
connects to the server and get registered shared objects such as
|
||||
job_q, result_q, fail_q
|
||||
|
@ -228,27 +229,19 @@ class JobManager_Client(object):
|
|||
ServerQueueManager.register('get_const_arg')
|
||||
|
||||
manager = ServerQueueManager(address=(server, port), authkey=authkey)
|
||||
|
||||
try:
|
||||
manager.connect()
|
||||
except:
|
||||
if verbose > 0:
|
||||
print("{}: connecting to {}:{} authkey '{}' FAILED!".format(identifier, server, port, authkey.decode('utf8')))
|
||||
|
||||
err, val, trb = sys.exc_info()
|
||||
print("caught exception {}: {}".format(err.__name__, val))
|
||||
|
||||
if err == ConnectionRefusedError:
|
||||
print("make sure the server is up!")
|
||||
|
||||
if verbose > 1:
|
||||
traceback.print_exception(err, val, trb)
|
||||
return None
|
||||
else:
|
||||
if verbose > 1:
|
||||
print("{}: connecting to {}:{} authkey '{}' SUCCEEDED!".format(identifier, server, port, authkey.decode('utf8')))
|
||||
|
||||
|
||||
try:
|
||||
call_connect(connect = manager.connect,
|
||||
dest = address_authkey_from_manager(manager),
|
||||
verbose = verbose,
|
||||
identifier = identifier,
|
||||
reconnect_wait = reconnect_wait,
|
||||
reconnect_tries = reconnect_tries)
|
||||
except:
|
||||
print("{}: FAILED to connect to {}".format(identifier, address_authkey_from_manager(manager)))
|
||||
return None
|
||||
|
||||
|
||||
job_q = manager.get_job_q()
|
||||
if verbose > 1:
|
||||
print("{}: found job_q with {} jobs".format(identifier, job_q.qsize()))
|
||||
|
@ -257,7 +250,6 @@ class JobManager_Client(object):
|
|||
fail_q = manager.get_fail_q()
|
||||
# deep copy const_arg from manager -> non shared object in local memory
|
||||
const_arg = copy.deepcopy(manager.get_const_arg())
|
||||
|
||||
|
||||
return job_q, result_q, fail_q, const_arg, manager
|
||||
|
||||
|
@ -338,7 +330,30 @@ class JobManager_Client(object):
|
|||
print("{}: found standard keyword arguments: [c, m]".format(identifier))
|
||||
_func = func
|
||||
|
||||
|
||||
job_q_get = proxy_operation_decorator(proxy = job_q,
|
||||
operation = 'get',
|
||||
verbose = verbose,
|
||||
identifier = identifier,
|
||||
reconnect_wait = 2,
|
||||
reconnect_tries = 3)
|
||||
job_q_put = proxy_operation_decorator(proxy = job_q,
|
||||
operation = 'put',
|
||||
verbose = verbose,
|
||||
identifier = identifier,
|
||||
reconnect_wait = 2,
|
||||
reconnect_tries = 3)
|
||||
result_q_put = proxy_operation_decorator(proxy = result_q,
|
||||
operation = 'put',
|
||||
verbose = verbose,
|
||||
identifier = identifier,
|
||||
reconnect_wait = 2,
|
||||
reconnect_tries = 3)
|
||||
fail_q_put = proxy_operation_decorator(proxy = fail_q,
|
||||
operation = 'put',
|
||||
verbose = verbose,
|
||||
identifier = identifier,
|
||||
reconnect_wait = 2,
|
||||
reconnect_tries = 3)
|
||||
|
||||
# supposed to catch SystemExit, which will shut the client down quietly
|
||||
try:
|
||||
|
@ -354,7 +369,7 @@ class JobManager_Client(object):
|
|||
# try to get an item from the job_q
|
||||
try:
|
||||
tg_0 = time.time()
|
||||
arg = job_q.get(block = True, timeout = 0.1)
|
||||
arg = job_q_get(block = True, timeout = 0.1)
|
||||
tg_1 = time.time()
|
||||
time_queue += (tg_1-tg_0)
|
||||
|
||||
|
@ -409,7 +424,7 @@ class JobManager_Client(object):
|
|||
print("{}: try to send send failed arg to fail_q ...".format(identifier), end='')
|
||||
sys.stdout.flush()
|
||||
try:
|
||||
fail_q.put((arg, err.__name__, hostname), timeout=10)
|
||||
fail_q_put((arg, err.__name__, hostname), timeout=10)
|
||||
# handle SystemExit in outer try ... except
|
||||
except SystemExit as e:
|
||||
if verbose > 1:
|
||||
|
@ -428,46 +443,19 @@ class JobManager_Client(object):
|
|||
# processing the retrieved arguments succeeded
|
||||
# - try to send the result back to the server
|
||||
else:
|
||||
success = False
|
||||
wait = 1
|
||||
max_try = 10
|
||||
i_try = 0
|
||||
err_msg = ""
|
||||
while not success:
|
||||
try:
|
||||
if i_try > max_try:
|
||||
# this will be caught by the general Exception handler below
|
||||
raise FatalConnectionResetError("tried {} time to _connect the result_q to the server\n".format(max_try)+
|
||||
"but always caught 'ConnectionResetError' with message\n"+
|
||||
err_msg)
|
||||
|
||||
tp_0 = time.time()
|
||||
result_q.put((arg, res))
|
||||
tp_1 = time.time()
|
||||
time_queue += (tp_1-tp_0)
|
||||
success = True
|
||||
# dont know why 'ConnectionResetError: [Errno 104] Connection reset by peer'
|
||||
# occurred, but this might be a work around
|
||||
# try to reconnect and put again, try couple times
|
||||
# wait some time in between
|
||||
except ConnectionResetError as e:
|
||||
if self.vebose > 0:
|
||||
print("{}: ConnectionResetError ({}) wait {}s and try to reconnect".format(identifier,
|
||||
e.args[0],
|
||||
wait))
|
||||
err_msg += "{}\n".format(e.args)
|
||||
time.sleep(wait)
|
||||
result_q._connect()
|
||||
wait *= 2
|
||||
i_try += 1
|
||||
|
||||
# handle SystemExit in outer try ... except
|
||||
except SystemExit as e:
|
||||
raise e
|
||||
try:
|
||||
tp_0 = time.time()
|
||||
result_q_put((arg, res))
|
||||
tp_1 = time.time()
|
||||
time_queue += (tp_1-tp_0)
|
||||
|
||||
except Exception as e:
|
||||
JobManager_Client._handle_unexpected_queue_error(e, verbose, identifier)
|
||||
break
|
||||
# handle SystemExit in outer try ... except
|
||||
except SystemExit as e:
|
||||
raise e
|
||||
|
||||
except Exception as e:
|
||||
JobManager_Client._handle_unexpected_queue_error(e, verbose, identifier)
|
||||
break
|
||||
|
||||
cnt += 1
|
||||
reset_pbc()
|
||||
|
@ -516,10 +504,10 @@ class JobManager_Client(object):
|
|||
retruns when all subprocesses have terminated
|
||||
"""
|
||||
|
||||
print("{}: starting client with connection to server:{} authkey:{} port:{}".format(self._identifier, self.server, self.authkey.decode(), self.port))
|
||||
|
||||
if not self.connected:
|
||||
raise ConnectionError("Can not start Client with no connection to server (shared objetcs are not available)")
|
||||
|
||||
print("{}: starting client with connection to server:{} authkey:{} port:{}".format(self._identifier, self.server, self.authkey.decode(), self.port))
|
||||
|
||||
if self.verbose > 1:
|
||||
print("{}: start {} processes to work on the remote queue".format(self._identifier, self.nproc))
|
||||
|
@ -1167,6 +1155,8 @@ class JobManager_Local(JobManager_Server):
|
|||
timeout=2,
|
||||
verbose=self.verbose_client)
|
||||
|
||||
class RemoteKeyError(RemoteError):
|
||||
pass
|
||||
|
||||
class Signal_handler_for_Jobmanager_client(object):
|
||||
def __init__(self, client_object, exit_handler, signals=[signal.SIGINT], verbose=0):
|
||||
|
@ -1284,6 +1274,63 @@ class hashableCopyOfNumpyArray(np.ndarray):
|
|||
return np.all(np.equal(self, other))
|
||||
|
||||
|
||||
def address_authkey_from_proxy(proxy):
|
||||
return list(proxy._address_to_local.keys())[0], proxy._authkey.decode()
|
||||
|
||||
def address_authkey_from_manager(manager):
|
||||
return manager.address, manager._authkey.decode()
|
||||
|
||||
def call_connect(connect, dest, verbose=1, identifier='', reconnect_wait=2, reconnect_tries=3):
|
||||
identifier = mod_id(identifier)
|
||||
c = 0
|
||||
while True:
|
||||
|
||||
try: # here we try re establish the connection
|
||||
if verbose > 1:
|
||||
print("{}try connecting to {}".format(identifier, dest))
|
||||
connect()
|
||||
|
||||
except ConnectionResetError as e: # ... when the destination hangs up on us
|
||||
if verbose > 0: # during connect this might be due to firewall settings
|
||||
# or other TPC connections controlling mechanisms
|
||||
traceback.print_exc(limit=1)
|
||||
print("{}connection to {} could not be established due to 'ConnectionResetError'".format(identifier, dest))
|
||||
if verbose > 1:
|
||||
print("during connect this Error might be due to firewall settings\n"+
|
||||
"or other TPC connections controlling mechanisms!")
|
||||
c += 1
|
||||
if c > reconnect_tries:
|
||||
raise ConnectionError("{}connection to {} FAILED, ".format(identifier, address_authkey_from_proxy(queue_proxy))+
|
||||
"{} retries were NOT successfull".format(reconnect_tries))
|
||||
if verbose > 0:
|
||||
traceback.print_exc(limit=1)
|
||||
print("{}try connecting to {} again in {} seconds".format(identifier, dest, reconnect_wait))
|
||||
time.sleep(reconnect_wait)
|
||||
|
||||
except ConnectionRefusedError as e: # ... when the destination refuses our connection
|
||||
# - no matching Manager object instanciated at destination
|
||||
# (eg. wrong port, not running at all)
|
||||
if verbose > 0:
|
||||
print("{}connection to {} could NOT be established due to 'ConnectionRefusedError'".format(identifier, dest))
|
||||
if verbose > 1:
|
||||
print("this usually means that no matching Manager object was instanciated at destination side!")
|
||||
raise e
|
||||
|
||||
except mp.AuthenticationError as e: # ... when the destination refuses our connection due
|
||||
# authkey missmatch
|
||||
if verbose > 0:
|
||||
print("{}connection to {} could NOT be established due to 'AuthenticationError'".format(identifier, dest))
|
||||
if verbose > 1:
|
||||
print("authkey specified does not match the authkey at destination side!")
|
||||
raise e
|
||||
|
||||
else: # no exception
|
||||
if verbose > 1:
|
||||
print("{}connection to {} successfully stablished".format(identifier, dest))
|
||||
return True # SUCCESS -> return True
|
||||
|
||||
|
||||
|
||||
def copyQueueToList(q):
|
||||
res_list = []
|
||||
res_q = myQueue()
|
||||
|
@ -1325,6 +1372,69 @@ def getDateForFileName(includePID = False):
|
|||
name += "_{}".format(os.getpid())
|
||||
return name
|
||||
|
||||
def mod_id(identifier):
|
||||
if identifier != '':
|
||||
identifier = identifier.strip()
|
||||
if identifier[-1] != ':':
|
||||
identifier += ':'
|
||||
identifier += ' '
|
||||
|
||||
return identifier
|
||||
|
||||
def proxy_operation_decorator(proxy, operation, verbose=1, identifier='', reconnect_wait=2, reconnect_tries=3):
|
||||
identifier = mod_id(identifier)
|
||||
o = getattr(proxy, operation)
|
||||
dest = address_authkey_from_proxy(proxy)
|
||||
|
||||
def _operation(*args, **kwargs):
|
||||
while True:
|
||||
try: # here we try to put new data to the queue
|
||||
if verbose > 1:
|
||||
print("{}execute operation '{}' -> {}".format(identifier, operation, dest))
|
||||
res = o(*args, **kwargs)
|
||||
|
||||
except ConnectionResetError as e: # ... when the destination hangs up on us
|
||||
if verbose > 0:
|
||||
traceback.print_exc(limit=1)
|
||||
print("{}try to reconnect".format(identifier))
|
||||
call_connect(proxy._connect, dest, verbose, identifier, reconnect_wait, reconnect_tries)
|
||||
|
||||
except BrokenPipeError as e:
|
||||
if verbose > 0:
|
||||
print("{}operation '{}' -> {} FAILED due to 'BrokenPipeError'".format(identifier, operation, dest))
|
||||
if verbose > 1:
|
||||
print("this usually means that an established was closed, does not exists anymore")
|
||||
print("the server probably went down")
|
||||
raise e
|
||||
|
||||
except EOFError as e:
|
||||
if verbose > 0:
|
||||
print("{}operation '{}' -> {} FAILED due to 'EOFError'".format(identifier, operation, dest))
|
||||
if verbose > 1:
|
||||
print("This usually means that server did not replay, although the connection is still there.")
|
||||
print("This is due to the fact that the connection is in 'timewait' status for about 60s")
|
||||
print("after the server went down. After that time the connection will not exist anymore.")
|
||||
raise e
|
||||
except RemoteError as e:
|
||||
if verbose > 0:
|
||||
print("{}operation '{}' -> {} FAILED due to 'RemoteError'".format(identifier, operation, dest))
|
||||
|
||||
if 'KeyError' in e.args[0]:
|
||||
if verbose > 1:
|
||||
print("'KeyError' detected in RemoteError message!")
|
||||
print("This hints to the fact that actual instace of the shared object on the server")
|
||||
print("side has changed, for example due to a server restart")
|
||||
print("you need to reinstanciate the proxy object")
|
||||
raise RemoteKeyError(e.args[0])
|
||||
else:
|
||||
raise e
|
||||
|
||||
else: # SUCCESS -> return True
|
||||
if verbose > 1:
|
||||
print("{}operation '{}' successfully executed".format(identifier, operation))
|
||||
return res
|
||||
|
||||
return _operation
|
||||
|
||||
def setup_SIG_handler_manager():
|
||||
"""
|
||||
|
@ -1358,11 +1468,6 @@ def try_pickle(obj, show_exception=False):
|
|||
return False
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
myQueue = mp.Queue
|
||||
|
||||
# a list of all names of the implemented python signals
|
||||
all_signals = [s for s in dir(signal) if (s.startswith('SIG') and s[3] != '_')]
|
||||
|
||||
|
|
|
@ -704,7 +704,7 @@ if __name__ == "__main__":
|
|||
#
|
||||
# test_jobmanager_basic,
|
||||
# test_jobmanager_server_signals,
|
||||
test_shutdown_server_while_client_running,
|
||||
# test_shutdown_server_while_client_running,
|
||||
# test_shutdown_client,
|
||||
# test_check_fail,
|
||||
# test_jobmanager_read_old_stat,
|
||||
|
@ -714,7 +714,7 @@ if __name__ == "__main__":
|
|||
# test_jobmanager_local,
|
||||
# test_start_server_on_used_port,
|
||||
# test_shared_const_arg,
|
||||
# test_digest_rejected,
|
||||
test_digest_rejected,
|
||||
|
||||
lambda : print("END")
|
||||
]
|
||||
|
|
Loading…
Add table
Reference in a new issue