Merge branch 'master' of https://github.com/cimatosa/jobmanager into develop_pool

Travis CI 2015-08-25 10:37:23 +02:00
commit aee4fbf86c
18 changed files with 476 additions and 221 deletions

.travis.yml Normal file
View file

@ -0,0 +1,53 @@
language: python
env:
global:
- GH_REF: github.com/cimatosa/jobmanager.git
- secure: Q3YVqRTs3QLspdzHgYMMuOoMypn7yetkb7v0LFe/68AwxZAwEkpCF24FHL/cEH9khc99AGbDIST81undY6kJLWLEDHYIWaoKUbC8OmaT7H6MUdfLtznsv1NnOLut9Z3So/mTMGQHCmTga9NKsuFpDMxg/oxp3wS915ar6+PLh3w=
notifications:
email: false
python:
- '3.4'
- '2.7'
matrix:
allow_failures:
- python: '2.7'
before_install:
# required for testing
- sudo apt-get update -qq -y
- sudo apt-get install openssl
- wget https://repo.continuum.io/miniconda/Miniconda-latest-Linux-x86_64.sh -O miniconda.sh
- chmod +x miniconda.sh
- "./miniconda.sh -b"
- export PATH=/home/travis/miniconda/bin:$PATH
- conda update --yes conda
## The following lines are necessary for a test that needs Python 2 and 3
# Install python2 and python3 anaconda (required for testing)
# python2
- conda create -y -n my_py2.7 python=2 pip numpy scipy matplotlib
- source activate my_py2.7
- pip install sqlitedict psutil
- source deactivate
# python3
- conda create -y -n my_py3.4 python=3 pip numpy scipy matplotlib
- source activate my_py3.4
- pip install sqlitedict psutil
- source deactivate
# add the two environments to the path
- export PATH=/home/travis/miniconda/envs/my_py3.4/bin:$PATH
- export PATH=/home/travis/miniconda/envs/my_py2.7/bin:$PATH
# activate the correct environment
- if [[ $TRAVIS_PYTHON_VERSION == 2.7 ]]; then source activate my_py2.7; fi
- if [[ $TRAVIS_PYTHON_VERSION == 3.4 ]]; then source activate my_py3.4; fi
- which python
- which python2.7
- which python3.4
install:
- pip freeze
script:
- travis_wait python setup.py test
after_success:
- git config credential.helper "store --file=.git/credentials"
- echo "https://${GH_TOKEN}:@github.com" > .git/credentials
- if [[ $TRAVIS_PYTHON_VERSION == 3.4 ]]; then travis_retry conda install --yes sphinx; fi
- if [[ $TRAVIS_PYTHON_VERSION == 3.4 ]]; then pip install numpydoc; fi
- if [[ $TRAVIS_PYTHON_VERSION == 3.4 ]]; then python doc/deploy_ghpages.py; fi
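
The Python 2.7 and 3.4 environments are installed side by side because the autoproxy test further down starts a manager under the respective other interpreter. A minimal sketch of how a test can pick the second interpreter from the PATH set up above (the inline command is only illustrative):

import subprocess
import sys

# assumption: both interpreters are on the PATH, as exported in the config above
other = "python2.7" if sys.version_info[0] == 3 else "python3.4"
proc = subprocess.Popen([other, "-c", "import sys; print(sys.version)"])
proc.wait()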

View file

@ -1,5 +1,8 @@
jobmanager
==========
[![PyPI](http://img.shields.io/pypi/v/jobmanager.svg)](https://pypi.python.org/pypi/jobmanager)
[![Travis](http://img.shields.io/travis/cimatosa/jobmanager.svg?label=tests)](https://travis-ci.org/cimatosa/jobmanager)
Easy distributed computing based on the python class SyncManager for remote communication and python module multiprocessing for local parallelism.
@ -23,3 +26,5 @@ Running an example
Running tests
python setup.py test
[travis test]

View file

@ -6,12 +6,3 @@ Install [numpydoc](https://pypi.python.org/pypi/numpydoc):
To compile the documentation, run
python setup.py build_sphinx
To upload the documentation to gh-pages, run
python setup.py commit_doc
or
python doc/commit_gh-pages.py

View file

@ -1,59 +0,0 @@
#!/usr/bin/env python
from __future__ import print_function
import os
from os.path import abspath,dirname
import sys
import subprocess as sp
os.chdir(dirname(dirname(abspath(__file__))))
def checkout_master(stashed):
# go back to master
sp.check_output(["git", 'checkout', 'master'])
if stashed:
print("Applying saved stash.")
# get last stash?
sp.check_output(["git", 'stash', 'apply'])
def checkout_ghpages():
# checkout the gh-pages branch
try:
sp.check_output(["git", 'checkout', 'gh-pages'])
except:
print("Automatically stashing current changes.")
sp.check_output(["git", 'stash'])
stashed = True
sp.check_output(["git", 'checkout', 'gh-pages'])
else:
stashed = False
return stashed
stashed = checkout_ghpages()
# copy built files
if os.system("cp -r ./build/sphinx/html/* ./") != 0:
checkout_master(stashed)
sys.exit()
for item in os.listdir("./build/sphinx/html/"):
# Make sure we have added all files from html
if not item.startswith("."):
os.system("git add ./{}".format(item))
# commit changes
if len(sp.check_output(["git", "diff", "HEAD"]).strip()) > 0:
sp.check_output(["git", 'commit', '-a', '-m', '"automated doc upload"'])
# push
try:
sp.check_output(["git", 'push'])
except:
print("Could not push to gh-pages.")
checkout_master(stashed)

View file

@ -139,7 +139,7 @@ exclude_patterns = ['_build']
# The theme to use for HTML and HTML Help pages. See the documentation for
# a list of builtin themes.
html_theme = 'default'
html_theme = 'classic'
# Theme options are theme-specific and customize the look and feel of a theme
# further. For a list of options available for each theme, see the

doc/deploy_ghpages.py Normal file
View file

@ -0,0 +1,81 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Publish the documentation on GitHub Pages.
Prerequisites
-------------
1. Create empty gh-pages branch:
git branch gh-pages
git checkout gh-pages
git symbolic-ref HEAD refs/heads/gh-pages
rm .git/index
git clean -fdx
2. Setup sphinx.
python setup.py build_sphinx
should create a build/sphinx/html folder in the repository root.
3. Create GitHub repo token and encrypt it
gem install travis
travis encrypt GH_TOKEN="<token>" --add
4. Add the encrypted token to .travis.yml
env:
global:
- GH_REF: github.com/<your name>/<your repo>.git
- secure: "jdcn3kM/dI0zvVTn0UKgal8Br+745Qc1plaKXHcoKhwcwN+0Q1y5H1BnaF0KV2dWWeExVXMpqQMLOCylUSUmd30+hFqUgd3gFQ+oh9pF/+N72uzjnxHAyVjai5Lh7QnjN0SLCd2/xLYwaUIHjWbWsr5t2vK9UuyphZ6/F+7OHf+u8BErviE9HUunD7u4Q2XRaUF0oHuF8stoWbJgnQZtUZFr+qS1Gc3vF6/KBkMqjnq/DgBV61cWsnVUS1HVak/sGClPRXZMSGyz8d63zDxfA5NDO6AbPVgK02k+QV8KQCyIX7of8rBvBmWkBYGw5RnaeETLIAf6JrCKMiQzlJQZiMyLUvd/WflSIBKJyr5YmUKCjFkwvbKKvCU3WBUxFT2p7trKZip5JWg37OMvOAO8eiatf2FC1klNly1KHADU88QqNoi/0y2R/a+1Csrl8Gr/lXZkW4mMkI2due9epLwccDJtMF8Xc39EqRR46xA7Lx9vy7szYW5lLux3zwx1tH40wV6/dX4ZVFoWp/zfJw7TKdOHuOwjZuOuKp/shfJs94G9YCu7bBtvrGv9qCH2KiSgm1NJviwcsZWsVHaq1nP0LliDE7EM3Q0mnkYzlvfOOhA2G5Ka3rHl1RFj7+WYzO5GaAFWU7piP/kdBwc0Mu+hb6PMoy0oeLt39BDr29bNKMs="
5. Add the deploy command to .travis.yml
after_success:
- git config credential.helper "store --file=.git/credentials"
- echo "https://${GH_TOKEN}:@github.com" > .git/credentials
- if [[ $TRAVIS_PYTHON_VERSION == 3.4 ]]; then pip install numpydoc sphinx; fi
- if [[ $TRAVIS_PYTHON_VERSION == 3.4 ]]; then python doc/deploy_ghpages.py; fi
"""
from __future__ import print_function
import os
from os.path import dirname, abspath
import subprocess as sp
# go to root of repository
os.chdir(dirname(dirname(abspath(__file__))))
# build sphinx
sp.check_output("python setup.py build_sphinx", shell=True)
# clone into new folder the gh-pages branch
sp.check_output("git config --global user.email 'travis@example.com'", shell=True)
sp.check_output("git config --global user.name 'Travis CI'", shell=True)
sp.check_output("git config --global credential.helper 'store --file=.git/credentials'", shell=True)
sp.check_output("echo 'https://${GH_TOKEN}:@github.com' > .git/credentials", shell=True)
sp.check_output("git clone --depth 1 -b gh-pages https://${GH_TOKEN}@${GH_REF} gh_pages", shell=True)
# copy everything from ./build/sphinx/html to ./gh_pages
#sp.check_output("cp -r ./build/sphinx/html/* ./gh_pages/", shell=True)
sp.check_output("rsync -rt --del --exclude='.git' --exclude='.nojekyll' ./build/sphinx/html/* ./gh_pages/", shell=True)
# commit changes
os.chdir("gh_pages")
sp.check_output("echo 'https://${GH_TOKEN}:@github.com' > .git/credentials", shell=True)
sp.check_output("git add --all ./*", shell=True)
try:
sp.check_output("git commit -a -m 'travis bot build {} [ci skip]'".format(os.getenv("TRAVIS_COMMIT")), shell=True)
except:
pass #nothing to do
else:
sp.check_output("git push --force --quiet origin gh-pages", shell=True)

View file

@ -1072,18 +1072,22 @@ class JobManager_Server(object):
else:
Progress = progress.ProgressSilentDummy
info_line = progress.StringValue(num_of_bytes=80)
with Progress(count = self._numresults,
max_count = self._numjobs,
interval = self.msg_interval,
speed_calc_cycles = self.speed_calc_cycles,
verbose = self.verbose,
sigint = 'ign',
sigterm='ign') as stat:
sigterm = 'ign',
info_line = info_line) as stat:
stat.start()
while (len(self.args_set) - self.fail_q.qsize()) > 0:
try:
info_line.value = "result_q size: {}".format(self.result_q.qsize()).encode('utf-8')
arg, result = self.result_q.get(timeout=1)
self.args_set.remove(arg)
self.numresults = self.numjobs - len(self.args_set)
@ -1260,7 +1264,7 @@ class Signal_to_terminate_process_list(object):
def _handler(self, signal, frame):
if self.verbose > 0:
print(": received sig {} -> terminate all given subprocesses".format(self.identifier, progress.signal_dict[signal]))
print("{}: received sig {} -> terminate all given subprocesses".format(self.identifier, progress.signal_dict[signal]))
for i, p in enumerate(self.process_list):
p.terminate()
progress.check_process_termination(proc = p,
@ -1338,7 +1342,7 @@ def call_connect_python3(connect, dest, verbose=1, identifier='', reconnect_wait
else: # no exception
if verbose > 1:
print("{}connection to {} successfully stablished".format(identifier, dest))
print("{}connection to {} successfully established".format(identifier, dest))
return True
def call_connect_python2(connect, dest, verbose=1, identifier='', reconnect_wait=2, reconnect_tries=3):
@ -1381,7 +1385,7 @@ def call_connect_python2(connect, dest, verbose=1, identifier='', reconnect_wait
else: # no exception
if verbose > 1:
print("{}connection to {} successfully stablished".format(identifier, dest))
print("{}connection to {} successfully established".format(identifier, dest))
return True # SUCCESS -> return True
call_connect = call_connect_python2 if sys.version_info[0] == 2 else call_connect_python3
@ -1441,6 +1445,7 @@ def handler_broken_pipe_error(e, verbose):
def handler_connection_refused(e, identifier, dest, verbose):
if verbose > 1:
print("this usually means that no matching Manager object was instanciated at destination side!")
print("either there is no Manager running at all, or it is listening to another port.")
raise JMConnectionRefusedError(e)
def handler_connection_reset(identifier, dest, c, reconnect_wait, reconnect_tries, verbose):

View file

@ -44,7 +44,10 @@ class PersistentDataStructure(object):
self._name = name
self._path = abspath(path)
if not exists(self._path):
raise RuntimeError("given path does not exists ({} -> {})".format(path, self._path))
print("given path does not exists ({} -> {})".format(path, self._path))
print("create path")
os.makedirs(self._path)
self.verbose = verbose

View file

@ -340,7 +340,8 @@ class Progress(Loop):
verbose = 0,
sigint = 'stop',
sigterm = 'stop',
name='progress'):
name = 'progress',
info_line = None):
"""
count [mp.Value] - shared memory to hold the current state, (list or single value)
@ -382,7 +383,7 @@ class Progress(Loop):
except TypeError:
raise TypeError("'max_count' must be iterable")
else:
assert isinstance(max_count, mp.sharedctypes.Synchronized), "'max_count' must be if the type multiprocessing.sharedctypes.Synchronized"
assert isinstance(max_count, mp.sharedctypes.Synchronized), "'max_count' must be of the type multiprocessing.sharedctypes.Synchronized"
max_count = [max_count]
else:
max_count = [None] * self.len
@ -429,6 +430,8 @@ class Progress(Loop):
self.show_on_exit = False
self.add_args = {}
self.info_line = info_line
# setup loop class with func
super(Progress, self).__init__(func=Progress.show_stat_wrapper_multi,
args=(self.count,
@ -444,7 +447,8 @@ class Progress(Loop):
self.__class__.show_stat,
self.len,
self.add_args,
self.lock),
self.lock,
self.info_line),
interval=interval,
verbose=verbose,
sigint=sigint,
@ -551,7 +555,9 @@ class Progress(Loop):
self.__class__.show_stat,
self.len,
self.add_args,
self.lock)
self.lock,
self.info_line,
no_move_up=True)
def reset(self, i = None):
"""
@ -629,7 +635,9 @@ class Progress(Loop):
show_stat_function,
len_,
add_args,
lock):
lock,
info_line,
no_move_up=False):
"""
call the static method show_stat_wrapper for each process
"""
@ -650,7 +658,22 @@ class Progress(Loop):
add_args,
i,
lock[i])
print(ESC_MOVE_LINE_UP(len_) + ESC_NO_CHAR_ATTR, end='')
n = len_
if info_line is not None:
s = info_line.value.decode('utf-8')
s = s.split('\n')
n += len(s)
for si in s:
if width == 'auto':
width = get_terminal_width()
if len(si) > width:
si = si[:width]
print("{0:<{1}}".format(si, width))
if no_move_up:
n = 0
print(ESC_MOVE_LINE_UP(n) + ESC_NO_CHAR_ATTR, end='')
sys.stdout.flush()
def start(self):
@ -683,7 +706,7 @@ class Progress(Loop):
if self.show_on_exit:
self._show_stat()
print('\n'*(self.len-1))
print()
self.show_on_exit = False
@ -701,7 +724,8 @@ class ProgressBar(Progress):
verbose=0,
sigint='stop',
sigterm='stop',
name='progress_bar'):
name='progress_bar',
info_line=None):
"""
width [int/'auto'] - the number of characters used to show the Progress bar,
use 'auto' to determine width from terminal information -> see _set_width
@ -715,7 +739,8 @@ class ProgressBar(Progress):
verbose = verbose,
sigint=sigint,
sigterm=sigterm,
name=name)
name=name,
info_line=info_line)
self._PRE_PREPEND = ESC_NO_CHAR_ATTR + ESC_RED
self._POST_PREPEND = ESC_BOLD + ESC_GREEN
@ -782,7 +807,8 @@ class ProgressBarCounter(Progress):
verbose=0,
sigint='stop',
sigterm='stop',
name='progress_bar_counter'):
name='progress_bar_counter',
info_line=None):
super(ProgressBarCounter, self).__init__(count=count,
max_count=max_count,
@ -793,7 +819,8 @@ class ProgressBarCounter(Progress):
verbose = verbose,
sigint=sigint,
sigterm=sigterm,
name=name)
name=name,
info_line=info_line)
self.counter_count = []
self.counter_q = []
@ -889,7 +916,8 @@ class ProgressBarFancy(Progress):
verbose=0,
sigint='stop',
sigterm='stop',
name='progress_bar'):
name='progress_bar',
info_line=None):
"""
width [int/'auto'] - the number of characters used to show the Progress bar,
use 'auto' to determine width from terminal information -> see _set_width
@ -906,7 +934,8 @@ class ProgressBarFancy(Progress):
verbose = verbose,
sigint=sigint,
sigterm=sigterm,
name=name)
name=name,
info_line=info_line)
@staticmethod
def get_d(s1, s2, width, lp, lps):
@ -1180,6 +1209,9 @@ def FloatValue(val=0.):
def UnsignedIntValue(val=0):
return mp.Value('I', val, lock=True)
def StringValue(num_of_bytes):
return mp.Array('c', bytearray(num_of_bytes), lock=True)
def check_process_termination(proc, identifier, timeout, verbose=0, auto_kill_on_last_resort = False):
proc.join(timeout)
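
The new info_line parameter threads a shared StringValue through Progress and its subclasses, so an extra status line is printed below the bar and the cursor is moved back up afterwards; test_info_line below exercises it. A minimal usage sketch, assuming the jobmanager package from this commit is importable:

import time
from jobmanager import progress

count = progress.UnsignedIntValue(val=0)
max_count = progress.UnsignedIntValue(val=50)
info = progress.StringValue(80)   # shared 80-byte buffer shown below the bar

with progress.ProgressBarFancy(count=count, max_count=max_count,
                               verbose=1, interval=0.2, info_line=info) as pb:
    pb.start()
    for i in range(50):
        count.value = i + 1
        # the info line expects bytes, as in JobManager_Server above
        info.value = "working on item {}".format(count.value).encode('utf-8')
        time.sleep(0.05)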

View file

@ -85,7 +85,16 @@ class PersistentData_Server(JobManager_Server):
print("{}: overwriting existing data is DISABLED".format(self._identifier))
def process_new_result(self, arg, result):
self.pds[data_as_binary_key(arg.id)] = (arg, result)
"""
use arg.id as key to store the pair (arg, result) in the database
return True if a new data set was added (key not already in pds),
otherwise False
"""
key = data_as_binary_key(arg.id)
has_key = key in self.pds
self.pds[key] = (arg, result)
return not has_key
def put_arg(self, a):
a_bin = data_as_binary_key(a.id)
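
process_new_result now reports whether the key was actually new; a stand-alone sketch of the same check-then-store pattern, using a plain dict in place of the persistent store (the helper name is hypothetical):

def store_result(pds, key, arg, result):
    # True only if this key was not present before the write
    has_key = key in pds
    pds[key] = (arg, result)
    return not has_key

db = {}
assert store_result(db, b'job-1', 'arg-1', 42) is True    # new data set
assert store_result(db, b'job-1', 'arg-1', 43) is False   # overwrite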

View file

@ -70,7 +70,7 @@ if __name__ == "__main__":
description=description,
long_description=longdescription,
install_requires=["sqlitedict>=1.2.0", "NumPy>=1.5.1"],
tests_require=["psutil"],
tests_require=["psutil", "scipy"],
keywords=["multiprocessing", "queue", "parallel", "distributed", "computing",
"progress", "manager", "job", "persistent data", "scheduler"],
classifiers= [

View file

@ -0,0 +1,43 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import division, print_function
import os
import sys
import time
import multiprocessing as mp
import numpy as np
import traceback
from os.path import abspath, dirname, split
# Add parent directory to beginning of path variable
sys.path = [split(dirname(abspath(__file__)))[0]] + sys.path
from jobmanager import jobmanager, progress
jobmanager.Signal_to_sys_exit(verbose=2)
class MyManager_Server(jobmanager.BaseManager):
pass
port = int(sys.argv[1])
authkey = sys.argv[2]
q = jobmanager.myQueue()
MyManager_Server.register('get_q', callable = lambda: q)
m = MyManager_Server(address = ('', port), authkey = bytearray(authkey, encoding='utf8'))
m.start()
print("MyManager_Server started (manager process at {})".format(m._process.pid))
sys.stdout.flush()
try:
while True:
time.sleep(10)
print("MyManager_Server is running, time:", time.time())
sys.stdout.flush()
finally:
print("MyManager_Server stopping")
sys.stdout.flush()
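
The client counterpart of this helper appears in the test changes below (autoproxy_connect); a minimal connect sketch, with the address, authkey, and register call being assumptions not shown in this diff:

from jobmanager import jobmanager

class MyManager_Client(jobmanager.BaseManager):
    pass

# on the client side the typeid is registered without a callable
MyManager_Client.register('get_q')

m = MyManager_Client(address=('localhost', 43333),            # placeholder port
                     authkey=bytearray('q', encoding='utf8'))  # placeholder authkey
m.connect()
q = m.get_q()   # proxy to the queue served by start_autoproxy_server.py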

View file

@ -126,7 +126,7 @@ def test_distributed_mathieu():
with jm.JobManager_Local(client_class = jm.clients.Integration_Client_REAL,
authkey = authkey,
port = 42525,
port = 42520,
const_arg = const_arg,
nproc=1,
verbose_client=2,

View file

@ -62,8 +62,9 @@ def my_func_ProgressBarOverrideCount(c = None, m = None):
c.value = i
@decorators.ProgressBar
def test_decorated_func_calls_decorated_func(
def testing_decorated_func_calls_decorated_func(
c = decorators.progress.UnsignedIntValue(val=0),
m = decorators.progress.UnsignedIntValue(val=1),
):
@ -82,6 +83,9 @@ def test_decorated_func_calls_decorated_func(
# anything - which makes the output pretty.
_my_func_1(arg=i, kwarg=0, sleep=0.005)
def test_decorated_func_calls_decorated_func():
testing_decorated_func_calls_decorated_func()
def test_decorator():
c = progress.UnsignedIntValue(val=0)
m = progress.UnsignedIntValue(val=100)

View file

@ -9,6 +9,7 @@ import signal
import multiprocessing as mp
import numpy as np
import traceback
import socket
import subprocess
import signal
@ -18,11 +19,14 @@ sys.path = [split(dirname(abspath(__file__)))[0]] + sys.path
from jobmanager import jobmanager, progress
PORT = 42525
AUTHKEY = 'testing'
PORT = 42525
SERVER = socket.gethostname()
def test_Signal_to_SIG_IGN():
global PORT
PORT += 1
def f():
jobmanager.Signal_to_SIG_IGN()
print("before sleep")
@ -56,6 +60,8 @@ def test_Signal_to_SIG_IGN():
print("[+] terminated")
def test_Signal_to_sys_exit():
global PORT
PORT += 1
def f():
jobmanager.Signal_to_sys_exit()
while True:
@ -91,6 +97,8 @@ def test_Signal_to_sys_exit():
print("[+] terminated")
def test_Signal_to_terminate_process_list():
global PORT
PORT += 1
def child_proc():
jobmanager.Signal_to_sys_exit()
try:
@ -106,7 +114,7 @@ def test_Signal_to_terminate_process_list():
p.append(mp.Process(target=child_proc))
p[-1].start()
jobmanager.Signal_to_terminate_process_list(p)
jobmanager.Signal_to_terminate_process_list(identifier='mother_proc', process_list=p, identifier_list=["proc_{}".format(i+1) for i in range(n)], verbose=2)
print("spawned {} processes".format(n))
for i in range(n):
p[i].join()
@ -114,7 +122,8 @@ def test_Signal_to_terminate_process_list():
p_mother = mp.Process(target=mother_proc)
p_mother.start()
time.sleep(0.5)
time.sleep(1.5)
assert p_mother.is_alive()
print("send SIGINT")
os.kill(p_mother.pid, signal.SIGINT)
@ -136,7 +145,7 @@ def start_server(n, read_old_state=False, verbose=1):
def start_client(verbose=1):
print("START CLIENT")
jm_client = jobmanager.JobManager_Client(server = 'localhost',
jm_client = jobmanager.JobManager_Client(server = SERVER,
authkey = AUTHKEY,
port = PORT,
nproc = 0,
@ -151,6 +160,8 @@ def test_jobmanager_basic():
check if all arguments are found in final_result of dump
"""
global PORT
PORT += 1
n = 10
p_server = mp.Process(target=start_server, args=(n,))
p_server.start()
@ -183,6 +194,8 @@ def test_jobmanager_basic():
def test_jobmanager_server_signals():
global PORT
PORT += 1
print("## TEST SIGTERM ##")
p_server = mp.Process(target=start_server, args=(30,))
p_server.start()
@ -206,7 +219,7 @@ def test_jobmanager_server_signals():
assert len(ref_set - args_set) == 0
print("[+] args_set from dump contains all arguments")
PORT += 1
print("## TEST SIGINT ##")
p_server = mp.Process(target=start_server, args=(30,))
p_server.start()
@ -243,6 +256,8 @@ def test_shutdown_server_while_client_running():
check if the final_result and the args dump end up to include
all arguments given
"""
global PORT
PORT += 1
n = 1000
@ -251,6 +266,7 @@ def test_shutdown_server_while_client_running():
time.sleep(1)
PORT += 1
p_client = mp.Process(target=start_client, args=(2,))
p_client.start()
@ -313,7 +329,8 @@ def shutdown_client(sig):
if server does not terminate on time, something must be wrong with args_set
check if the final_result contain all arguments given
"""
global PORT
PORT += 1
n = 300
print("## terminate client with {} ##".format(progress.signal_dict[sig]))
@ -366,6 +383,8 @@ def shutdown_client(sig):
print("[+] all arguments found in final_results")
def test_check_fail():
global PORT
PORT += 1
class Client_Random_Error(jobmanager.JobManager_Client):
def func(self, args, const_args, c, m):
c.value = 0
@ -385,7 +404,7 @@ def test_check_fail():
time.sleep(1)
print("START CLIENT")
jm_client = Client_Random_Error(server='localhost',
jm_client = Client_Random_Error(server=SERVER,
authkey=AUTHKEY,
port=PORT,
nproc=0,
@ -438,6 +457,8 @@ def test_jobmanager_read_old_stat():
check if all arguments are found in final_result of dump
"""
global PORT
PORT += 1
n = 100
p_server = mp.Process(target=start_server, args=(n,))
p_server.start()
@ -459,7 +480,7 @@ def test_jobmanager_read_old_stat():
print("[+] client and server terminated")
time.sleep(2)
PORT += 1
p_server = mp.Process(target=start_server, args=(n,True))
p_server.start()
@ -546,6 +567,8 @@ def test_hashedViewOnNumpyArray():
assert bh2 in s
def test_client_status():
global PORT
PORT += 1
n = 10
p_server = mp.Process(target=start_server, args=(n,False,0))
p_server.start()
@ -561,7 +584,7 @@ def test_client_status():
return os.getpid()
client = Client_With_Status(server = 'localhost',
client = Client_With_Status(server = SERVER,
authkey = AUTHKEY,
port = PORT,
nproc = 4,
@ -570,6 +593,8 @@ def test_client_status():
p_server.join()
def test_jobmanager_local():
global PORT
PORT += 1
args = range(1,200)
with jobmanager.JobManager_Local(client_class = jobmanager.JobManager_Client,
authkey = AUTHKEY,
@ -581,6 +606,8 @@ def test_jobmanager_local():
jm_server.start()
def test_start_server_on_used_port():
global PORT
PORT += 1
def start_server():
const_arg = None
arg = [10,20,30]
@ -623,6 +650,8 @@ def test_start_server_on_used_port():
assert not other_error
def test_shared_const_arg():
global PORT
PORT += 1
def start_server():
const_arg = {1:1, 2:2, 3:3}
arg = [10,20,30]
@ -643,7 +672,7 @@ def test_shared_const_arg():
print(os.getpid(), arg, const_arg)
return None
client = myClient(server='localhost',
client = myClient(server=SERVER,
authkey=AUTHKEY,
port = PORT,
nproc=1,
@ -651,6 +680,7 @@ def test_shared_const_arg():
client.start()
PORT += 1
p1 = mp.Process(target=start_server)
p2 = mp.Process(target=start_client)
@ -666,6 +696,8 @@ def test_shared_const_arg():
p1.join()
def test_digest_rejected():
global PORT
PORT += 1
n = 10
p_server = mp.Process(target=start_server, args=(n,False,0))
p_server.start()
@ -681,7 +713,7 @@ def test_digest_rejected():
return os.getpid()
client = Client_With_Status(server = 'localhost',
client = Client_With_Status(server = SERVER,
authkey = AUTHKEY+' not the same',
port = PORT,
nproc = 4,
@ -695,27 +727,44 @@ def test_digest_rejected():
p_server.join()
def test_exception():
global PORT
class MyManager_Client(jobmanager.BaseManager):
pass
def autoproxy_server(which_python, port, authkey, outfile):
libpath = os.path.dirname(os.__file__)
python_env = os.environ.copy()
envpath = "{LIB}:{LIB}/site-packages".format(LIB=libpath)
envpath += ":{LIB}/lib-old".format(LIB=libpath)
envpath += ":{LIB}/lib-tk".format(LIB=libpath)
envpath += ":{LIB}/lib-dynload".format(LIB=libpath)
envpath += ":{LIB}/plat-linux2".format(LIB=libpath)
# env will be
# "/usr/lib/python2.7" for python 2
# "/usr/lib/python3.4" for python 3
if which_python == 2:
python_interpreter = "python2.7"
python_env = {"PYTHONPATH": "/usr/lib/python2.7"}
envpath = envpath.replace("3.4", "2.7")
elif which_python == 3:
python_interpreter = "python3.4"
python_env = {"PYTHONPATH": "/usr/lib/python3.4"}
envpath = envpath.replace("2.7", "3.4")
else:
raise ValueError("'which_python' must be 2 or 3")
python_env["PYTHONPATH"] = envpath
path = dirname(abspath(__file__))
cmd = [python_interpreter,
"{}/start_autoproxy_server.py".format(path),
str(port),
authkey]
print("+"*40)
print("start an autoproxy server with command")
print(cmd)
print("and environment")
print(python_env)
print("+"*40)
return subprocess.Popen(cmd, env=python_env, stdout=outfile, stderr=subprocess.STDOUT)
def autoproxy_connect(server, port, authkey):
@ -724,57 +773,77 @@ def test_exception():
m = MyManager_Client(address = (server, port),
authkey = bytearray(authkey, encoding='utf8'))
jobmanager.call_connect(m.connect, dest = jobmanager.address_authkey_from_manager(m), verbose=1)
jobmanager.call_connect(m.connect, dest = jobmanager.address_authkey_from_manager(m), verbose=2)
return m
for p_version_server in [2, 3]:
port = np.random.randint(20000, 30000)
PORT += 2 # plus two because we also check for wrong port
port = PORT
authkey = 'q'
with open("ap_server.out", 'w') as outfile:
p_server = autoproxy_server(p_version_server, port, authkey, outfile)
print("autoproxy server running with PID {}".format(p_server.pid))
time.sleep(1)
print("running tests ...")
try:
print("running tests with python {} ...".format(sys.version_info[0]))
print()
if sys.version_info[0] == 3:
print("we are using python 3 ... try to connect ...")
try:
autoproxy_connect(server=SERVER, port=port, authkey=authkey)
except jobmanager.RemoteValueError as e:
if p_version_server == 2:
print("that is ok, because the server is running on python2") # the occurrence of this Exception is normal
print()
else:
print("RemoteValueError error")
raise # reraise exception
except Exception as e:
print("unexpected error {}".format(e))
raise
elif sys.version_info[0] == 2:
print("we are using python 2 ... try to connect ...")
try:
autoproxy_connect(server='localhost', port=port, authkey=authkey)
except jobmanager.RemoteValueError:
if (sys.version_info[0] == 3) and (p_version_server == 2):
print("that is ok") # the occurrence of this Exception is normal
pass
else:
raise # reraise exception
except ValueError:
if (sys.version_info[0] == 2) and (p_version_server == 3):
print("that is ok") # the occurrence of this Exception is normal
pass
autoproxy_connect(server=SERVER, port=port, authkey=authkey)
except ValueError as e:
if p_version_server == 3:
print("that is ok, because the server is running on python3") # the occurrence of this Exception is normal
print()
else:
print("JMConnectionRefusedError error")
raise # reraise exception
except Exception as e:
print("unexpected error {}".format(e))
raise
# all the following only for the same python versions
if (sys.version_info[0] != p_version_server):
continue
try:
autoproxy_connect(server='localhost', port=port+1, authkey=authkey)
print("try to connect to server, use wrong port")
autoproxy_connect(server=SERVER, port=port+1, authkey=authkey)
except jobmanager.JMConnectionRefusedError:
print("that is ok")
print()
except:
raise
try:
autoproxy_connect(server='localhost', port=port, authkey=authkey+'_')
print("try to connect to server, use wrong authkey")
autoproxy_connect(server=SERVER, port=port, authkey=authkey+'_')
except jobmanager.AuthenticationError:
print("that is ok")
print()
except:
raise
m = autoproxy_connect(server='localhost', port=port, authkey=authkey)
m = autoproxy_connect(server=SERVER, port=port, authkey=authkey)
print("try pass some data forth and back ...")
q = m.get_q()
@ -787,9 +856,6 @@ def test_exception():
assert s1 == s2
finally:
print()
print("tests done! terminate server ...".format())
@ -811,6 +877,12 @@ def test_exception():
print("server terminated with exitcode {}".format(r))
with open("ap_server.out", 'r') as outfile:
print("+"*40)
print("this is the server output:")
for l in outfile:
print(" {}".format(l[:-1]))
print("+"*40)
@ -819,13 +891,13 @@ if __name__ == "__main__":
pass
else:
func = [
# test_Signal_to_SIG_IGN,
# test_Signal_to_sys_exit,
# test_Signal_to_terminate_process_list,
test_Signal_to_SIG_IGN,
test_Signal_to_sys_exit,
test_Signal_to_terminate_process_list,
#
# test_jobmanager_basic,
# test_jobmanager_server_signals,
# test_shutdown_server_while_client_running,
test_jobmanager_basic,
test_jobmanager_server_signals,
test_shutdown_server_while_client_running,
# test_shutdown_client,
# test_check_fail,
# test_jobmanager_read_old_stat,
@ -836,7 +908,7 @@ if __name__ == "__main__":
# test_start_server_on_used_port,
# test_shared_const_arg,
# test_digest_rejected,
test_exception,
# test_exception,
lambda : print("END")
]
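
Most tests above now follow the same two-line pattern, bumping the module-level PORT before binding, presumably so that consecutive tests do not try to rebind a port the previous server has not yet released. The pattern in isolation (the helper function is hypothetical; the tests inline these lines):

PORT = 42525

def _next_port():
    # bump the shared module-level port, as each test does via
    # 'global PORT; PORT += 1'
    global PORT
    PORT += 1
    return PORT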

View file

@ -732,6 +732,21 @@ def test_progress_bar_counter_fancy_hide_bar():
if (time.time() - t0) > 15:
break
def test_info_line():
c1 = progress.UnsignedIntValue(val=0)
s = progress.StringValue(80)
m1 = progress.UnsignedIntValue(val=30)
with progress.ProgressBarFancy(count=c1, max_count=m1, verbose=1, interval=0.2, info_line=s) as sc:
sc.start()
while True:
c1.value = c1.value + 1
if c1.value > 10:
s.value = b'info_line\nline2'
time.sleep(0.1)
if c1.value >= m1.value:
break
if __name__ == "__main__":
func = [
# test_loop_basic,
@ -752,12 +767,13 @@ if __name__ == "__main__":
# test_progress_bar_counter_hide_bar,
# test_progress_bar_slow_change,
# test_progress_bar_start_stop,
test_progress_bar_fancy,
test_progress_bar_multi_fancy,
test_progress_bar_fancy_small,
test_progress_bar_counter_fancy,
test_progress_bar_counter_fancy_non_max,
test_progress_bar_counter_fancy_hide_bar,
# test_progress_bar_fancy,
# test_progress_bar_multi_fancy,
# test_progress_bar_fancy_small,
# test_progress_bar_counter_fancy,
# test_progress_bar_counter_fancy_non_max,
# test_progress_bar_counter_fancy_hide_bar,
test_info_line,
lambda: print("END")
]