Allow negative values for reconnect_tries and ping_retry, meaning: keep trying to reconnect indefinitely

This commit is contained in:
Richard Hartmann 2019-01-03 10:27:12 +01:00
parent 935c783b93
commit e321ce0cae

View file

@ -519,7 +519,7 @@ class JobManager_Client(object):
tp_0 = time.time() tp_0 = time.time()
with sig_delay([signal.SIGTERM]): with sig_delay([signal.SIGTERM]):
local_result_q.put((arg, res)) local_result_q.put((arg, res))
log.warning('put result to local result_q, done!') log.debug('put result to local result_q, done!')
tp_1 = time.time() tp_1 = time.time()
time_queue += (tp_1-tp_0) time_queue += (tp_1-tp_0)
@ -1163,6 +1163,9 @@ class ArgsContainer(object):
raise ContainerClosedError raise ContainerClosedError
item_hash = hashlib.sha256(bf.dump(item)).hexdigest() item_hash = hashlib.sha256(bf.dump(item)).hexdigest()
# print("ADD arg with hash", item_hash)
# print(item)
# print()
if item_hash in self.data: if item_hash in self.data:
item_id = self.data[item_hash] item_id = self.data[item_hash]
if (item_id in self._not_gotten_ids) or (item_id in self._marked_ids): if (item_id in self._not_gotten_ids) or (item_id in self._marked_ids):
@ -1197,11 +1200,19 @@ class ArgsContainer(object):
str_id = '_' + str(get_idx) str_id = '_' + str(get_idx)
item = self.data[str_id] item = self.data[str_id]
item_hash = hashlib.sha256(bf.dump(item)).hexdigest()
# print("GET item with hash", item_hash)
# print(item)
# print()
return item return item
def mark(self, item): def mark(self, item):
with self._lock: with self._lock:
item_hash = hashlib.sha256(bf.dump(item)).hexdigest() item_hash = hashlib.sha256(bf.dump(item)).hexdigest()
# print("MARK item with hash", item_hash)
# print(item)
# print()
item_id = self.data[item_hash] item_id = self.data[item_hash]
#print("mark", item_id, self._not_gotten_ids, self._marked_ids) #print("mark", item_id, self._not_gotten_ids, self._marked_ids)
if item_id in self._not_gotten_ids: if item_id in self._not_gotten_ids:
@ -1724,7 +1735,9 @@ class JobManager_Server(object):
arg, result = self.result_q.get(timeout=self.msg_interval) arg, result = self.result_q.get(timeout=self.msg_interval)
except queue.Empty: except queue.Empty:
continue continue
# print("got arg", arg)
self.job_q.mark(arg) self.job_q.mark(arg)
# print("has been marked!")
log.debug("received {}".format(arg)) log.debug("received {}".format(arg))
self.process_new_result(arg, result) self.process_new_result(arg, result)
if not self.keep_new_result_in_memory: if not self.keep_new_result_in_memory:
@ -2111,7 +2124,9 @@ def emergency_dump(arg, res, emergency_dump_path, host, port, authkey):
def check_if_host_is_reachable_unix_ping(adr, timeout=2, retry=5): def check_if_host_is_reachable_unix_ping(adr, timeout=2, retry=5):
output = '' output = ''
for i in range(retry):
i = 0
while True:
try: try:
cmd = 'ping -c 1 -W {} {} '.format(int(timeout), adr) cmd = 'ping -c 1 -W {} {} '.format(int(timeout), adr)
log.debug("[%s/%s]call: %s", i+1, retry, cmd) log.debug("[%s/%s]call: %s", i+1, retry, cmd)
@ -2125,6 +2140,10 @@ def check_if_host_is_reachable_unix_ping(adr, timeout=2, retry=5):
# no exception, ping was successful, return without error # no exception, ping was successful, return without error
log.debug("ping was succesfull") log.debug("ping was succesfull")
return return
i += 1
if i >= retry:
break
# no early return happend, ping was never successful, raise error # no early return happend, ping was never successful, raise error
log.error("ping failed after %s retries", retry) log.error("ping failed after %s retries", retry)