some intermed state

This commit is contained in:
cimatosa 2015-01-22 15:27:13 +01:00
parent 4a747ac9e6
commit 7f3411b4ea
5 changed files with 195 additions and 69 deletions

View file

@ -750,7 +750,7 @@ class JobManager_Server(object):
try: try:
arg, result = self.result_q.get(timeout=1) arg, result = self.result_q.get(timeout=1)
self.args_set.remove(arg) self.args_set.remove(arg)
self.numresults = self.numjobs - (len(self.args_set) - self.fail_q.qsize()) self.numresults = self.numjobs - len(self.args_set)
self.process_new_result(arg, result) self.process_new_result(arg, result)
except queue.Empty: except queue.Empty:
pass pass
@ -849,10 +849,21 @@ class JobManager_Client(object):
self.procs = [] self.procs = []
self.manager_objects = self.get_manager_objects() self.manager_objects = None # will be set via connect()
self.connect() # get shared objects from server
self.pbc = None self.pbc = None
def connect(self):
if self.manager_objects is None:
self.manager_objects = self.get_manager_objects()
else:
if self.verbose > 0:
print("{}: already connected (at least shared object are available)".format(self._identifier))
@property
def connected(self):
return self.manager_objects is not None
def get_manager_objects(self): def get_manager_objects(self):
return JobManager_Client._get_manager_objects(self.server, return JobManager_Client._get_manager_objects(self.server,
@ -951,13 +962,6 @@ class JobManager_Client(object):
Signal_to_sys_exit(signals=[signal.SIGTERM]) Signal_to_sys_exit(signals=[signal.SIGTERM])
Signal_to_SIG_IGN(signals=[signal.SIGINT]) Signal_to_SIG_IGN(signals=[signal.SIGINT])
if manager_objects is None:
manager_objects = JobManager_Client._get_manager_objects(server, port, authkey, identifier, verbose)
if manager_objects == None:
if verbose > 1:
print("{}: no shared object recieved, terminate!".format(identifier))
sys.exit(1)
job_q, result_q, fail_q, const_arg = manager_objects job_q, result_q, fail_q, const_arg = manager_objects
n = os.nice(0) n = os.nice(0)
@ -1145,6 +1149,10 @@ class JobManager_Client(object):
retruns when all subprocesses have terminated retruns when all subprocesses have terminated
""" """
if not self.connected:
raise ConnectionError("Can not start Client with no connection to server (shared objetcs are not available)")
if self.verbose > 1: if self.verbose > 1:
print("{}: start {} processes to work on the remote queue".format(self._identifier, self.nproc)) print("{}: start {} processes to work on the remote queue".format(self._identifier, self.nproc))
@ -1234,6 +1242,7 @@ class JobManager_Local(JobManager_Server):
speed_calc_cycles=speed_calc_cycles) speed_calc_cycles=speed_calc_cycles)
self.client_class = client_class self.client_class = client_class
self.port = port
self.nproc = nproc self.nproc = nproc
self.delay = delay self.delay = delay
self.verbose_client=verbose_client self.verbose_client=verbose_client
@ -1243,6 +1252,7 @@ class JobManager_Local(JobManager_Server):
@staticmethod @staticmethod
def _start_client(authkey, def _start_client(authkey,
port,
client_class, client_class,
nproc=0, nproc=0,
nice=19, nice=19,
@ -1256,6 +1266,7 @@ class JobManager_Local(JobManager_Server):
time.sleep(delay) time.sleep(delay)
client = client_class(server='localhost', client = client_class(server='localhost',
authkey=authkey, authkey=authkey,
port=port,
nproc=nproc, nproc=nproc,
nice=nice, nice=nice,
verbose=verbose, verbose=verbose,
@ -1268,6 +1279,7 @@ class JobManager_Local(JobManager_Server):
def start(self): def start(self):
p_client = mp.Process(target=JobManager_Local._start_client, p_client = mp.Process(target=JobManager_Local._start_client,
args=(self.authkey, args=(self.authkey,
self.port,
self.client_class, self.client_class,
self.nproc, self.nproc,
self.niceness_clients, self.niceness_clients,

View file

@ -1,9 +1,22 @@
import sqlitedict as sqd import sqlitedict as sqd
from os.path import abspath, join, exists from os.path import abspath, join, exists
import os import os
import sys
import shutil import shutil
import traceback import traceback
if sys.version_info[0] == 2:
# fixes keyword problems with python 2.x
os_remove = os.remove
def new_remove(path):
os_remove(path)
os.remove = new_remove
os_rmdir = os.rmdir
def new_rmdir(path):
os_rmdir(path)
os.rmdir = new_rmdir
MAGIC_SIGN = 0xff4a87 MAGIC_SIGN = 0xff4a87
KEY_COUNTER = '0' KEY_COUNTER = '0'

View file

@ -4,7 +4,83 @@ cd $(dirname $0)
OUTFILE='pytest.html' OUTFILE='pytest.html'
PYTHON="python"
PYTHON2_7="python2.7"
PYTHON3_4="python3.4"
PYLIST=( $PYTHON )
CLEAN="yes"
while getopts ":p:ahn" opt; do
case $opt in
a)
echo "run all!" >&2
PYLIST=( $PYTHON2_7 $PYTHON3_4 )
;;
p)
if [ "$OPTARG" = "2.7" ]; then
echo "run with python2.7"
PYLIST=( $PYTHON2_7 )
elif [ "$OPTARG" = "3.4" ]; then
echo "run with python3.4"
PYLIST=( $PYTHON3_4 )
else
echo "arguent $OPTARG for -p not unterstood"
exit 1
fi
;;
n)
CLEAN="no"
;;
h)
echo "run PYTEST included in runtests.py"
echo "and write the output in html format to '$OUTFILE'"
echo "to choose the python version use the following switches"
echo ""
echo " -p VER"
echo " VER = 2.7 -> uses $PYTHON2_7"
echo " VER = 3.4 -> uses $PYTHON3_4"
echo ""
echo " -a"
echo " runs both, $PYTHON2_7 and $PYTHON3_4"
echo ""
echo "if neither -p nor -a are given, the default interpreter '$PYTHON' is used"
echo ""
echo " -n"
echo " no cleanup after all"
echo ""
exit 0
;;
\?)
echo "Invalid option: -$OPTARG" >&2
exit 1
;;
:)
echo "option $OPTARG requires an argument"
exit 1
;;
esac
done
echo "Working directory: $(pwd)" echo "Working directory: $(pwd)"
echo "Running py.test ..." rm -f -v $OUTFILE
(date; python runtests.py --color=yes) | aha --black --title "pytest output for jobmanager module"> $OUTFILE touch $OUTFILE
echo "Done! (output written to $OUTFILE)" for py in ${PYLIST[@]}; do
$py --version
echo "Running py.test ..."
(echo ""; date; $py runtests.py --color=yes) | tee -a $OUTFILE
echo "Done!"
done
cat $OUTFILE | aha --black --title "pytest output for jobmanager module" > $OUTFILE
if [ "$CLEAN" = "yes" ]; then
rm -f *.trb
rm -f *.dump
rm -f *.db
fi
echo "ALL DONE! (output written to $OUTFILE)"

View file

@ -116,6 +116,7 @@ def test_distributed_mathieu():
with jm.JobManager_Local(client_class = jm.clients.Integration_Client_REAL, with jm.JobManager_Local(client_class = jm.clients.Integration_Client_REAL,
authkey = authkey, authkey = authkey,
port = 42525,
const_arg = const_arg, const_arg = const_arg,
nproc=1, nproc=1,
verbose_client=2, verbose_client=2,

View file

@ -16,6 +16,9 @@ sys.path = [split(dirname(abspath(__file__)))[0]] + sys.path
from jobmanager import jobmanager, progress from jobmanager import jobmanager, progress
PORT = 42525
AUTHKEY = 'testing'
def test_Signal_to_SIG_IGN(): def test_Signal_to_SIG_IGN():
def f(): def f():
@ -118,11 +121,11 @@ def test_Signal_to_terminate_process_list():
def start_server(n, read_old_state=False, verbose=1): def start_server(n, read_old_state=False, verbose=1):
print("START SERVER") print("START SERVER")
args = range(1,n) args = range(1,n)
authkey = 'testing' with jobmanager.JobManager_Server(authkey = AUTHKEY,
with jobmanager.JobManager_Server(authkey=authkey, port = PORT,
verbose=verbose, verbose = verbose,
msg_interval=1, msg_interval = 1,
fname_dump='jobmanager.dump') as jm_server: fname_dump = 'jobmanager.dump') as jm_server:
if not read_old_state: if not read_old_state:
jm_server.args_from_list(args) jm_server.args_from_list(args)
else: else:
@ -131,10 +134,11 @@ def start_server(n, read_old_state=False, verbose=1):
def start_client(verbose=1): def start_client(verbose=1):
print("START CLIENT") print("START CLIENT")
jm_client = jobmanager.JobManager_Client(server='localhost', jm_client = jobmanager.JobManager_Client(server = 'localhost',
authkey='testing', authkey = AUTHKEY,
port=42524, port = PORT,
nproc=0, verbose=verbose) nproc = 0,
verbose = verbose)
jm_client.start() jm_client.start()
def test_jobmanager_basic(): def test_jobmanager_basic():
@ -367,16 +371,21 @@ def test_check_fail():
print("START CLIENT") print("START CLIENT")
jm_client = Client_Random_Error(server='localhost', jm_client = Client_Random_Error(server='localhost',
authkey='testing', authkey=AUTHKEY,
port=42524, port=PORT,
nproc=0, nproc=0,
verbose=verbose) verbose=verbose)
p_client = mp.Process(target=jm_client.start) p_client = mp.Process(target=jm_client.start)
p_client.start() p_client.start()
assert p_server.is_alive() try:
assert p_client.is_alive() assert p_server.is_alive()
assert p_client.is_alive()
except:
p_client.terminate()
p_server.terminate()
raise
print("[+] server and client running") print("[+] server and client running")
@ -537,20 +546,20 @@ def test_client_status():
return os.getpid() return os.getpid()
client = Client_With_Status(server='localhost', client = Client_With_Status(server = 'localhost',
authkey='testing', authkey = AUTHKEY,
port=42524, port = PORT,
nproc=4, nproc = 4,
verbose=1) verbose = 1)
client.start() client.start()
p_server.join() p_server.join()
def test_jobmanager_local(): def test_jobmanager_local():
args = range(1,200) args = range(1,200)
authkey = 'testing'
with jobmanager.JobManager_Local(client_class = jobmanager.JobManager_Client, with jobmanager.JobManager_Local(client_class = jobmanager.JobManager_Client,
authkey=authkey, authkey = AUTHKEY,
verbose=1, port = PORT,
verbose = 1,
verbose_client=0, verbose_client=0,
) as jm_server: ) as jm_server:
jm_server.args_from_list(args) jm_server.args_from_list(args)
@ -560,7 +569,8 @@ def test_start_server_on_used_port():
def start_server(): def start_server():
const_arg = None const_arg = None
arg = [10,20,30] arg = [10,20,30]
with jobmanager.JobManager_Server(authkey='test_shared_const_arg', with jobmanager.JobManager_Server(authkey = AUTHKEY,
port = PORT,
const_arg=const_arg, const_arg=const_arg,
fname_dump=None) as server: fname_dump=None) as server:
server.args_from_list(arg) server.args_from_list(arg)
@ -569,7 +579,8 @@ def test_start_server_on_used_port():
def start_server2(): def start_server2():
const_arg = None const_arg = None
arg = [10,20,30] arg = [10,20,30]
with jobmanager.JobManager_Server(authkey='test_shared_const_arg', with jobmanager.JobManager_Server(authkey=AUTHKEY,
port = PORT,
const_arg=const_arg, const_arg=const_arg,
fname_dump=None) as server: fname_dump=None) as server:
server.args_from_list(arg) server.args_from_list(arg)
@ -600,7 +611,8 @@ def test_shared_const_arg():
def start_server(): def start_server():
const_arg = {1:1, 2:2, 3:3} const_arg = {1:1, 2:2, 3:3}
arg = [10,20,30] arg = [10,20,30]
with jobmanager.JobManager_Server(authkey='test_shared_const_arg', with jobmanager.JobManager_Server(authkey=AUTHKEY,
port = PORT,
const_arg=const_arg, const_arg=const_arg,
fname_dump=None) as server: fname_dump=None) as server:
server.args_from_list(arg) server.args_from_list(arg)
@ -617,7 +629,8 @@ def test_shared_const_arg():
return None return None
client = myClient(server='localhost', client = myClient(server='localhost',
authkey='test_shared_const_arg', authkey=AUTHKEY,
port = PORT,
nproc=1, nproc=1,
verbose=2) verbose=2)
@ -637,47 +650,58 @@ def test_shared_const_arg():
time.sleep(1) time.sleep(1)
p1.join() p1.join()
def test_digest_rejected():
n = 10
p_server = mp.Process(target=start_server, args=(n,False,0))
p_server.start()
def _test_interrupt_server(): time.sleep(1)
start_server(n = 100)
def _test_interrupt_client(): class Client_With_Status(jobmanager.JobManager_Client):
def func(self, args, const_args, c, m):
class DoNothing_Client(jobmanager.JobManager_Client): m.value = 100
@staticmethod for i in range(m.value):
def func(arg, const_arg): c.value = i+1
while True: time.sleep(0.05)
time.sleep(10)
c = DoNothing_Client(server='localhost', authkey = 'testing', verbose=2, show_statusbar_for_jobs=True)
c.start()
return os.getpid()
client = Client_With_Status(server = 'localhost',
authkey = AUTHKEY+' not the same',
port = PORT,
nproc = 4,
verbose = 2)
try:
client.start()
except ConnectionError as e:
print("Not an error: caught '{}' with message '{}'".format(e.__class__.__name__, e))
p_server.terminate()
p_server.join()
if __name__ == "__main__": if __name__ == "__main__":
if len(sys.argv) > 1: if len(sys.argv) > 1:
if sys.argv[1] == 'server': pass
start_server(n = 100)
else: else:
func = [ func = [
# test_Signal_to_SIG_IGN, # test_Signal_to_SIG_IGN,
# test_Signal_to_sys_exit, # test_Signal_to_sys_exit,
# test_Signal_to_terminate_process_list, # test_Signal_to_terminate_process_list,
# #
# test_jobmanager_basic, # test_jobmanager_basic,
# test_jobmanager_server_signals, # test_jobmanager_server_signals,
# test_shutdown_server_while_client_running, # test_shutdown_server_while_client_running,
# test_shutdown_client, # test_shutdown_client,
# test_check_fail, # test_check_fail,
test_jobmanager_read_old_stat, # test_jobmanager_read_old_stat,
# test_hashDict, # test_hashDict,
# test_hashedViewOnNumpyArray, # test_hashedViewOnNumpyArray,
# test_client_status, # test_client_status,
# test_jobmanager_local, # test_jobmanager_local,
# test_start_server_on_used_port, # test_start_server_on_used_port,
# test_shared_const_arg, # test_shared_const_arg,
test_digest_rejected,
lambda : print("END") lambda : print("END")
] ]