mirror of
https://github.com/vale981/jobmanager
synced 2025-03-04 17:31:39 -05:00
remove python2 support, out proxy decorator in class style
This commit is contained in:
parent
a84ca661cd
commit
1ca9cb2174
1 changed files with 36 additions and 139 deletions
|
@ -28,8 +28,6 @@ following tasks:
|
|||
The class JobManager_Client
|
||||
|
||||
"""
|
||||
from __future__ import division, print_function
|
||||
|
||||
import copy
|
||||
from datetime import datetime
|
||||
import inspect
|
||||
|
@ -41,6 +39,7 @@ import pickle
|
|||
import signal
|
||||
import socket
|
||||
import sys
|
||||
|
||||
import random
|
||||
import time
|
||||
import traceback
|
||||
|
@ -1942,7 +1941,7 @@ def address_authkey_from_proxy(proxy):
|
|||
def address_authkey_from_manager(manager):
|
||||
return manager.address, manager._authkey.decode()
|
||||
|
||||
def call_connect_python3(connect, dest, reconnect_wait=2, reconnect_tries=3):
|
||||
def call_connect(connect, dest, reconnect_wait=2, reconnect_tries=3):
|
||||
c = 0
|
||||
while True:
|
||||
try: # here we try re establish the connection
|
||||
|
@ -1975,46 +1974,6 @@ def call_connect_python3(connect, dest, reconnect_wait=2, reconnect_tries=3):
|
|||
log.debug("connection to %s successfully established".format(dest))
|
||||
return True
|
||||
|
||||
def call_connect_python2(connect, dest, reconnect_wait=2, reconnect_tries=3):
|
||||
c = 0
|
||||
while True:
|
||||
try: # here we try re establish the connection
|
||||
log.debug("try connecting to %s", dest)
|
||||
connect()
|
||||
|
||||
except Exception as e:
|
||||
log.error("connection to %s could not be established due to '%s'", dest, type(e))
|
||||
log.info(traceback.format_stack()[-3].strip())
|
||||
|
||||
if type(e) is socket.error: # error in socket communication
|
||||
log.error("caught %s with args %s", type(e), e.args)
|
||||
err_code = e.args[0]
|
||||
if err_code == errno.ECONNRESET: # ... when the destination hangs up on us
|
||||
c = handler_connection_reset(dest, c, reconnect_wait, reconnect_tries)
|
||||
elif err_code == errno.ECONNREFUSED: # ... when the destination refuses our connection
|
||||
handler_connection_refused(e, dest)
|
||||
else:
|
||||
handler_unexpected_error(e)
|
||||
elif type(e) is AuthenticationError : # ... when the destination refuses our connection due authkey missmatch
|
||||
handler_authentication_error(e, dest)
|
||||
elif type(e) is RemoteError: # ... when the destination send us an error message
|
||||
if 'KeyError' in e.args[0]:
|
||||
handler_remote_key_error(e, dest)
|
||||
elif 'ValueError: unsupported pickle protocol:' in e.args[0]:
|
||||
handler_remote_value_error(e, dest)
|
||||
else:
|
||||
handler_remote_error(e, dest)
|
||||
elif type(e) is ValueError:
|
||||
handler_value_error(e)
|
||||
else: # any other exception
|
||||
handler_unexpected_error(e)
|
||||
|
||||
else: # no exception
|
||||
log.debug("connection to %s successfully established", dest)
|
||||
return True # SUCCESS -> return True
|
||||
|
||||
call_connect = call_connect_python2 if sys.version_info[0] == 2 else call_connect_python3
|
||||
|
||||
def copyQueueToList(q):
|
||||
res_list = []
|
||||
res_q = myQueue()
|
||||
|
@ -2150,47 +2109,56 @@ def check_if_host_is_reachable_unix_ping(adr, timeout=2, retry=5):
|
|||
raise JMHostNotReachableError("could not reach host '{}'\nping error reads: {}".format(adr, output))
|
||||
|
||||
|
||||
def proxy_operation_decorator_python3(proxy, operation, reconnect_wait=2, reconnect_tries=3, ping_timeout=2, ping_retry=5):
|
||||
o = getattr(proxy, operation)
|
||||
dest = address_authkey_from_proxy(proxy)
|
||||
|
||||
def _operation(*args, **kwargs):
|
||||
class proxy_operation_decorator(object):
|
||||
def __init__(self, proxy, operation, reconnect_wait=2, reconnect_tries=3, ping_timeout=2, ping_retry=5):
|
||||
self.proxy = proxy
|
||||
self.operation = operation
|
||||
self.o = getattr(proxy, operation)
|
||||
self.dest = address_authkey_from_proxy(proxy)
|
||||
self.reconnect_wait = reconnect_wait
|
||||
self.reconnect_tries = reconnect_tries
|
||||
self.ping_timeout = ping_timeout
|
||||
self.ping_retry = ping_retry
|
||||
|
||||
def __call__(self, *args, **kwargs):
|
||||
c = 0
|
||||
reconnect_wait = self.reconnect_wait
|
||||
while True:
|
||||
check_if_host_is_reachable_unix_ping(adr = dest[0][0],
|
||||
timeout = ping_timeout,
|
||||
retry = ping_retry)
|
||||
log.debug("establish connection to %s", dest)
|
||||
try:
|
||||
proxy._connect()
|
||||
reconnect_wait *= 1.2
|
||||
check_if_host_is_reachable_unix_ping(adr = self.dest[0][0],
|
||||
timeout = self.ping_timeout,
|
||||
retry = self.ping_retry)
|
||||
log.debug("establish connection to %s", self.dest)
|
||||
try:
|
||||
self.proxy._connect()
|
||||
except Exception as e:
|
||||
log.warning("establishing connection to %s FAILED due to '%s'", dest, type(e))
|
||||
log.warning("establishing connection to %s FAILED due to '%s'", self.dest, type(e))
|
||||
log.debug("show traceback.format_stack()[-3]\n%s", traceback.format_stack()[-3].strip())
|
||||
c += 1
|
||||
if (c > reconnect_tries) and (reconnect_tries > 0):
|
||||
log.error("reached maximum number of reconnect tries %s, raise exception", reconnect_tries)
|
||||
if (c > self.reconnect_tries) and (self.reconnect_tries > 0):
|
||||
log.error("reached maximum number of reconnect tries %s, raise exception", self.reconnect_tries)
|
||||
raise e
|
||||
log.info("wait %s seconds and retry", reconnect_wait)
|
||||
log.info("wait %s seconds and retry", reconnect_wait)
|
||||
time.sleep(reconnect_wait)
|
||||
continue
|
||||
|
||||
log.debug("execute operation '%s' -> %s", operation, dest)
|
||||
log.debug("execute operation '%s' -> %s", self.operation, self.dest)
|
||||
|
||||
try:
|
||||
res = o(*args, **kwargs)
|
||||
res = self.o(*args, **kwargs)
|
||||
except queue.Empty as e:
|
||||
log.info("operation '%s' -> %s FAILED due to '%s'", operation, dest, type(e))
|
||||
log.info("operation '%s' -> %s FAILED due to '%s'", self.operation, self.dest, type(e))
|
||||
raise e
|
||||
except Exception as e:
|
||||
log.warning("operation '%s' -> %s FAILED due to '%s'", operation, dest, type(e))
|
||||
log.warning("operation '%s' -> %s FAILED due to '%s'", self.operation, self.dest, type(e))
|
||||
log.debug("show traceback.format_stack()[-3]\n%s", traceback.format_stack()[-3].strip())
|
||||
if type(e) is ConnectionResetError:
|
||||
log.debug("show traceback.print_exc(limit=1))")
|
||||
log.debug(traceback.print_exc(limit=1))
|
||||
|
||||
c += 1
|
||||
if (c > reconnect_tries) and (reconnect_tries > 0):
|
||||
log.error("reached maximum number of reconnect tries %s", reconnect_tries)
|
||||
if (c > self.reconnect_tries) and (self.reconnect_tries > 0):
|
||||
log.error("reached maximum number of reconnect tries %s", self.reconnect_tries)
|
||||
raise e
|
||||
|
||||
log.info("wait %s seconds and retry", reconnect_wait)
|
||||
|
@ -2203,88 +2171,17 @@ def proxy_operation_decorator_python3(proxy, operation, reconnect_wait=2, reconn
|
|||
else:
|
||||
handler_unexpected_error(e)
|
||||
else: # SUCCESS -> return True
|
||||
log.debug("operation '%s' successfully executed", operation)
|
||||
log.debug("operation '%s' successfully executed", self.operation)
|
||||
return res
|
||||
|
||||
log.debug("close connection to %s", dest)
|
||||
log.debug("close connection to %s", self.dest)
|
||||
try:
|
||||
proxy._tls.connection.close()
|
||||
self.proxy._tls.connection.close()
|
||||
except Exception as e:
|
||||
log.error("closeing connection to %s FAILED due to %s", dest, type(e))
|
||||
log.error("closeing connection to %s FAILED due to %s", self.dest, type(e))
|
||||
log.info("show traceback.format_stack()[-3]\n%s", traceback.format_stack()[-3].strip())
|
||||
|
||||
return _operation
|
||||
|
||||
def proxy_operation_decorator_python2(proxy, operation, reconnect_wait=2, reconnect_tries=3, ping_timeout=2, ping_retry=5):
|
||||
o = getattr(proxy, operation)
|
||||
dest = address_authkey_from_proxy(proxy)
|
||||
|
||||
def _operation(*args, **kwargs):
|
||||
c = 0
|
||||
while True:
|
||||
check_if_host_is_reachable_unix_ping(adr = dest[0][0],
|
||||
timeout = ping_timeout,
|
||||
retry = ping_retry)
|
||||
log.debug("establishing connection to %s ...", dest)
|
||||
try:
|
||||
proxy._connect()
|
||||
except Exception as e:
|
||||
log.warning("establishing connection to %s FAILED due to '%s'", dest, type(e))
|
||||
log.debug("show traceback.format_stack()[-3]\n%s", traceback.format_stack()[-3].strip())
|
||||
c += 1
|
||||
if c > reconnect_tries:
|
||||
log.error("reached maximum number of reconnect tries %s, raise exception", reconnect_tries)
|
||||
raise e
|
||||
log.info("wait %s seconds and retry", reconnect_wait)
|
||||
time.sleep(reconnect_wait)
|
||||
continue
|
||||
|
||||
log.debug("execute operation '%s' -> %s", operation, dest)
|
||||
|
||||
try:
|
||||
res = o(*args, **kwargs)
|
||||
except queue.Empty as e:
|
||||
log.info("operation '%s' -> %s FAILED due to '%s'", operation, dest, type(e))
|
||||
raise e
|
||||
except Exception as e:
|
||||
log.warning("operation '%s' -> %s FAILED due to '%s'", operation, dest, type(e))
|
||||
log.debug("show traceback.format_stack()[-3]\n%s", traceback.format_stack()[-3].strip())
|
||||
|
||||
if type(e) is IOError:
|
||||
log.debug("%s with args %s", type(e), e.args)
|
||||
err_code = e.args[0]
|
||||
if err_code == errno.ECONNRESET: # ... when the destination hangs up on us
|
||||
log.debug("show traceback.print_exc(limit=1))")
|
||||
log.debug(traceback.print_exc(limit=1))
|
||||
|
||||
c += 1
|
||||
if c > reconnect_tries:
|
||||
log.error("reached maximum number of reconnect tries %s", reconnect_tries)
|
||||
raise e
|
||||
|
||||
log.info("wait %s seconds and retry", reconnect_wait)
|
||||
time.sleep(reconnect_wait)
|
||||
continue
|
||||
else:
|
||||
handler_unexpected_error(e)
|
||||
elif type(e) is EOFError:
|
||||
handler_eof_error(e)
|
||||
else:
|
||||
handler_unexpected_error(e)
|
||||
else: # SUCCESS -> return True
|
||||
log.debug("operation '%s' successfully executed", operation)
|
||||
return res
|
||||
|
||||
log.debug("close connection to %s", dest)
|
||||
try:
|
||||
proxy._tls.connection.close()
|
||||
except Exception as e:
|
||||
log.error("closeing connection to %s FAILED due to %s", dest, type(e))
|
||||
log.info("show traceback.format_stack()[-3]\n%s", traceback.format_stack()[-3].strip())
|
||||
|
||||
return _operation
|
||||
|
||||
proxy_operation_decorator = proxy_operation_decorator_python2 if sys.version_info[0] == 2 else proxy_operation_decorator_python3
|
||||
|
||||
def setup_SIG_handler_manager():
|
||||
"""
|
||||
|
|
Loading…
Add table
Reference in a new issue