mirror of
https://github.com/vale981/jobmanager
synced 2025-03-04 17:31:39 -05:00
Merge branch 'dev' of https://github.com/cimatosa/jobmanager into dev
This commit is contained in:
commit
4dcb650be9
1 changed file with 80 additions and 43 deletions
|
@ -85,14 +85,14 @@ myQueue = queue.Queue
|
|||
AuthenticationError = mp.AuthenticationError
|
||||
|
||||
def humanize_size(size_in_bytes):
    """Return *size_in_bytes* as a short human-readable string.

    The value is divided by 1024 for as long as it exceeds 999 and a
    larger unit prefix is still available.  The result is formatted with
    four significant digits, followed by the unit prefix and ``B``.

    Values that need no scaling are reported in plain bytes, e.g.
    ``humanize_size(500) == '500B'`` and ``humanize_size(2048) == '2kB'``.

    :param size_in_bytes: size (or rate) expressed in bytes, int or float
    :return: formatted string such as ``'1.5kB'`` or ``'1GB'``
    """
    thr = 999
    step = 1024
    # The prefix list is indexed by the number of divisions performed, so
    # index 0 must be the "no prefix" entry.  Without it every result is
    # labelled one unit too large (500 bytes would read '500kB').
    units = ['', 'k', 'M', 'G', 'T']
    i = 0
    # Stop at the last prefix so units[i] can never run out of range.
    while (size_in_bytes > thr) and (i < len(units) - 1):
        size_in_bytes = size_in_bytes / step
        i += 1
    return "{:.4g}{}B".format(size_in_bytes, units[i])
|
||||
|
||||
def get_user():
|
||||
out = subprocess.check_output('id -un', shell=True).decode().strip()
|
||||
|
@ -543,7 +543,8 @@ class JobManager_Client(object):
|
|||
try:
|
||||
tp_0 = time.time()
|
||||
with sig_delay([signal.SIGTERM]):
|
||||
local_result_q.put((arg, res))
|
||||
bin_data = pickle.dumps((arg, res))
|
||||
local_result_q.put(bin_data)
|
||||
log.debug('put result to local result_q, done!')
|
||||
tp_1 = time.time()
|
||||
time_queue += (tp_1-tp_0)
|
||||
|
@ -565,6 +566,7 @@ class JobManager_Client(object):
|
|||
reset_pbc()
|
||||
log.debug("continue with next arg")
|
||||
|
||||
|
||||
# considered as normal exit caused by some user interaction, SIGINT, SIGTERM
|
||||
# note SIGINT, SIGTERM -> SystemExit is achieved by overwriting the
|
||||
# default signal handlers
|
||||
|
@ -587,7 +589,7 @@ class JobManager_Client(object):
|
|||
handle_unexpected_queue_error(e)
|
||||
else:
|
||||
log.debug("putting arg back to local job_q was successful")
|
||||
|
||||
finally:
|
||||
try:
|
||||
sta = progress.humanize_time(time_calc / cnt)
|
||||
except:
|
||||
|
@ -600,7 +602,7 @@ class JobManager_Client(object):
|
|||
pass
|
||||
|
||||
log.info(stat)
|
||||
|
||||
print("client {}:{}\n".format(i, stat))
|
||||
log.debug("JobManager_Client.__worker_func at end (PID %s)", os.getpid())
|
||||
|
||||
def start(self):
|
||||
|
@ -663,6 +665,8 @@ class JobManager_Client(object):
|
|||
local_fail_q = mp.Queue()
|
||||
|
||||
infoline = progress.StringValue(num_of_bytes=64)
|
||||
bytes_send = mp.Value('L', 0) # unsigned long (at least 4 bytes, platform dependent)
|
||||
time_result_q_put = mp.Value('d', 0) # 8 byte float (double)
|
||||
|
||||
|
||||
|
||||
|
@ -680,9 +684,13 @@ class JobManager_Client(object):
|
|||
job_q_put_pending_lock = threading.Lock()
|
||||
fail_q_put_pending_lock = threading.Lock()
|
||||
|
||||
def update_infoline(infoline, local_result_q):
|
||||
def update_infoline(infoline, local_result_q, bytes_send, time_result_q_put):
|
||||
while True:
|
||||
infoline.value = "local res_q {}".format(local_result_q.qsize()).encode('utf-8')
|
||||
if time_result_q_put.value > 0:
|
||||
speed = humanize_size(bytes_send.value / time_result_q_put.value) + "/s"
|
||||
else:
|
||||
speed = ''
|
||||
infoline.value = "local res_q {} {}".format(local_result_q.qsize(), speed).encode('utf-8')
|
||||
if self.timeout:
|
||||
infoline.value += " timeout in: {}s".format(int(self.timeout - (time.time() - self.init_time))).encode('utf-8')
|
||||
time.sleep(1)
|
||||
|
@ -698,15 +706,22 @@ class JobManager_Client(object):
|
|||
# log.debug("stopped thread thr_job_q_put with tid %s", ctypes.CDLL('libc.so.6').syscall(186))
|
||||
|
||||
|
||||
def pass_result_q_put(result_q_put, local_result_q, result_q_put_pending_lock):
|
||||
def pass_result_q_put(result_q_put, local_result_q, result_q_put_pending_lock, bytes_send, time_result_q_put):
|
||||
log.debug("this is thread thr_result_q_put with tid %s", ctypes.CDLL('libc.so.6').syscall(186))
|
||||
try:
|
||||
while True:
|
||||
data = local_result_q.get()
|
||||
log.debug("result_q client forward...\n{}".format(data))
|
||||
with result_q_put_pending_lock:
|
||||
t0 = time.time()
|
||||
result_q_put(data)
|
||||
log.debug("result_q client forward, done!".format(data))
|
||||
t1 = time.time()
|
||||
with bytes_send.get_lock():
|
||||
bytes_send.value += len(data)
|
||||
with time_result_q_put.get_lock():
|
||||
time_result_q_put.value += (t1 - t0)
|
||||
del data
|
||||
log.debug("result_q client forward, done! ({:.2f}s)".format(t1 - t0))
|
||||
except Exception as e:
|
||||
log.error("thr_result_q_put caught error %s", type(e))
|
||||
log.info(traceback.format_exc())
|
||||
|
@ -723,12 +738,12 @@ class JobManager_Client(object):
|
|||
|
||||
thr_job_q_put = threading.Thread(target=pass_job_q_put , args=(job_q_put , local_job_q, job_q_put_pending_lock))
|
||||
thr_job_q_put.daemon = True
|
||||
thr_result_q_put = threading.Thread(target=pass_result_q_put, args=(result_q_put, local_result_q, result_q_put_pending_lock))
|
||||
thr_result_q_put = threading.Thread(target=pass_result_q_put, args=(result_q_put, local_result_q, result_q_put_pending_lock, bytes_send, time_result_q_put))
|
||||
thr_result_q_put.daemon = True
|
||||
thr_fail_q_put = threading.Thread(target=pass_fail_q_put , args=(fail_q_put , local_fail_q, fail_q_put_pending_lock))
|
||||
thr_fail_q_put.daemon = True
|
||||
|
||||
thr_update_infoline = threading.Thread(target=update_infoline, args=(infoline, local_result_q))
|
||||
thr_update_infoline = threading.Thread(target=update_infoline, args=(infoline, local_result_q, bytes_send, time_result_q_put))
|
||||
thr_update_infoline.daemon = True
|
||||
|
||||
thr_job_q_put.start()
|
||||
|
@ -797,7 +812,7 @@ class JobManager_Client(object):
|
|||
pid = p.pid,
|
||||
bold = True) for i, p in enumerate(self.procs)],
|
||||
signals = exit_handler_signals,
|
||||
timeout = 2)
|
||||
timeout = 15)
|
||||
|
||||
log.debug("setup Signal_handler_for_Jobmanager_client handler for signals %s", jm_client_special_interrupt_signals)
|
||||
Signal_handler_for_Jobmanager_client(client_object = self,
|
||||
|
@ -1449,8 +1464,8 @@ class JobManager_Server(object):
|
|||
fname = None
|
||||
|
||||
self.job_q = ArgsContainer(fname)
|
||||
self.result_q = ClosableQueue(name='result_q')
|
||||
self.fail_q = ClosableQueue(name='fail_q')
|
||||
self.result_q = mp.Queue() # ClosableQueue(name='result_q')
|
||||
self.fail_q = mp.Queue() # ClosableQueue(name='fail_q')
|
||||
|
||||
self.stat = None
|
||||
|
||||
|
@ -1480,13 +1495,13 @@ class JobManager_Server(object):
|
|||
JobManager_Manager.register('get_const_arg', callable=lambda: self.const_arg)
|
||||
|
||||
|
||||
rc = self.result_q.client()
|
||||
fc = self.fail_q.client()
|
||||
JobManager_Manager.register('get_result_q', callable=lambda: rc, exposed=['get', 'put', 'qsize'])
|
||||
JobManager_Manager.register('get_fail_q', callable=lambda: fc, exposed=['get', 'put', 'qsize'])
|
||||
#rc = self.result_q.client()
|
||||
#fc = self.fail_q.client()
|
||||
#JobManager_Manager.register('get_result_q', callable=lambda: rc, exposed=['get', 'put', 'qsize'])
|
||||
#JobManager_Manager.register('get_fail_q', callable=lambda: fc, exposed=['get', 'put', 'qsize'])
|
||||
|
||||
# JobManager_Manager.register('get_result_q', callable=lambda: self.result_q, exposed=['get', 'put', 'qsize'])
|
||||
# JobManager_Manager.register('get_fail_q', callable=lambda: self.fail_q, exposed=['get', 'put', 'qsize'])
|
||||
JobManager_Manager.register('get_result_q', callable=lambda: self.result_q, exposed=['get', 'put', 'qsize'])
|
||||
JobManager_Manager.register('get_fail_q', callable=lambda: self.fail_q, exposed=['get', 'put', 'qsize'])
|
||||
|
||||
|
||||
|
||||
|
@ -1548,8 +1563,8 @@ class JobManager_Server(object):
|
|||
with open(self.status_file, 'w') as f:
|
||||
f.write('stop')
|
||||
self.job_q.close()
|
||||
self.result_q.close()
|
||||
self.fail_q.close()
|
||||
#self.result_q.close()
|
||||
#self.fail_q.close()
|
||||
log.debug("queues closed!")
|
||||
|
||||
self._stop_manager()
|
||||
|
@ -1765,6 +1780,12 @@ class JobManager_Server(object):
|
|||
log.debug("at start: number of jobs: {}".format(numjobs.value))
|
||||
log.debug("at start: number of results: {}".format(numresults.value))
|
||||
|
||||
speed_q = myQueue()
|
||||
curr_time = time.time()
|
||||
bytes_recieved = 0
|
||||
for i in range(15):
|
||||
speed_q.put((bytes_recieved, curr_time))
|
||||
|
||||
|
||||
with progress.ProgressBarFancy(count = numresults,
|
||||
max_count = numjobs,
|
||||
|
@ -1776,8 +1797,8 @@ class JobManager_Server(object):
|
|||
if not self.hide_progress:
|
||||
self.stat.start()
|
||||
|
||||
bytes_recieved = self.result_q.get_bytes_recieved()
|
||||
curr_time = time.time()
|
||||
|
||||
|
||||
data_speed = 0
|
||||
while numresults.value < numjobs.value:
|
||||
numjobs.value = self.job_q.put_items()
|
||||
|
@ -1786,11 +1807,10 @@ class JobManager_Server(object):
|
|||
markeditems = self.job_q.marked_items()
|
||||
numresults.value = failqsize + markeditems
|
||||
if (time.time() - curr_time) > self.msg_interval:
|
||||
old_time = curr_time
|
||||
old_bytes = bytes_recieved
|
||||
old_bytes, old_time = speed_q.get()
|
||||
|
||||
bytes_recieved = self.result_q.get_bytes_recieved()
|
||||
curr_time = time.time()
|
||||
speed_q.put((bytes_recieved, curr_time))
|
||||
data_speed = humanize_size((bytes_recieved - old_bytes) / (curr_time - old_time))
|
||||
|
||||
if (self.timeout is not None):
|
||||
|
@ -1818,7 +1838,10 @@ class JobManager_Server(object):
|
|||
log.info("infoline {}".format(info_line.value))
|
||||
# allows for update of the info line
|
||||
try:
|
||||
arg, result = self.result_q.get(timeout=self.msg_interval)
|
||||
bin_data = self.result_q.get(timeout=self.msg_interval)
|
||||
arg, result = pickle.loads(bin_data)
|
||||
bytes_recieved += len(bin_data)
|
||||
del bin_data
|
||||
except queue.Empty:
|
||||
continue
|
||||
# print("got arg", arg)
|
||||
|
@ -1961,7 +1984,7 @@ class Signal_handler_for_Jobmanager_client(object):
|
|||
elif r == 'k':
|
||||
for p in self.exit_handler.process_list:
|
||||
print("send SIGKILL to", p)
|
||||
os.kill(p, signal.SIGKILL)
|
||||
os.kill(p.pid, signal.SIGKILL)
|
||||
else:
|
||||
print("input '{}' ignored".format(r))
|
||||
|
||||
|
@ -2208,15 +2231,27 @@ class proxy_operation_decorator(object):
|
|||
self.ping_retry = ping_retry
|
||||
|
||||
def __call__(self, *args, **kwargs):
|
||||
try:
|
||||
res = self.o(*args, **kwargs)
|
||||
except Exception as e:
|
||||
log.warning("operation '%s' -> %s FAILED due to '%s' -> retry", self.operation, self.dest, type(e))
|
||||
else:
|
||||
log.debug("operation '{}' successfully executed")
|
||||
return res
|
||||
|
||||
t0 = time.time()
|
||||
c = 0
|
||||
reconnect_wait = self.reconnect_wait
|
||||
while True:
|
||||
reconnect_wait *= 1.2
|
||||
t1 = time.time()
|
||||
check_if_host_is_reachable_unix_ping(adr = self.dest[0][0],
|
||||
timeout = self.ping_timeout,
|
||||
retry = self.ping_retry)
|
||||
log.debug("ping time: {:.2f}s".format(time.time() - t1))
|
||||
log.debug("establish connection to %s", self.dest)
|
||||
try:
|
||||
t1 = time.time()
|
||||
self.proxy._connect()
|
||||
except Exception as e:
|
||||
log.warning("establishing connection to %s FAILED due to '%s'", self.dest, type(e))
|
||||
|
@ -2228,10 +2263,11 @@ class proxy_operation_decorator(object):
|
|||
log.info("wait %s seconds and retry", reconnect_wait)
|
||||
time.sleep(reconnect_wait)
|
||||
continue
|
||||
|
||||
log.debug("self.proxy._connect() time: {:.2f}s".format(time.time() - t1))
|
||||
log.debug("execute operation '%s' -> %s", self.operation, self.dest)
|
||||
|
||||
try:
|
||||
t1 = time.time()
|
||||
res = self.o(*args, **kwargs)
|
||||
except queue.Empty as e:
|
||||
log.info("operation '%s' -> %s FAILED due to '%s'", self.operation, self.dest, type(e))
|
||||
|
@ -2258,7 +2294,8 @@ class proxy_operation_decorator(object):
|
|||
else:
|
||||
handler_unexpected_error(e)
|
||||
else: # SUCCESS -> return True
|
||||
log.debug("operation '%s' successfully executed", self.operation)
|
||||
log.debug("self.o(*args, **kwargs) time: {:.2f}s".format(time.time() - t1))
|
||||
log.debug("operation '{}' successfully executed (overall time {:.2f}s)".format(self.operation, time.time() - t0))
|
||||
return res
|
||||
|
||||
log.debug("close connection to %s", self.dest)
|
||||
|
|
Loading…
Add table
Reference in a new issue