Merge pull request #7 from cimatosa/pypi-setup-files

Pypi setup files
This commit is contained in:
Richard Hartmann 2015-01-06 17:06:05 +01:00
commit 1e6e5ae0fb
23 changed files with 503 additions and 32 deletions

4
MANIFEST.in Normal file
View file

@ -0,0 +1,4 @@
include examples/*.py
include examples/advanced/*.py
include doc/*
include README.md

View file

@ -1,4 +1,16 @@
jobmanager jobmanager
========== ==========
easy distributed computing based on the python class SyncManager for remote communication and python module multiprocessing for local parallelism Easy distributed computing based on the python class SyncManager for remote communication and python module multiprocessing for local parallelism.
### Testing on Linux
After cloning into jobmanager, create a virtual environment
virtualenv --system-site-packages ve_jm
source ve_jm/bin/activate
Install jobmanager into the environment and run a simple example.
python setupy.py install
python examples/simple_example.py

View file

View file

@ -5,10 +5,11 @@ from __future__ import division, print_function
import matplotlib.pyplot as plt import matplotlib.pyplot as plt
import numpy as np import numpy as np
import os import os
from os.path import split, dirname, abspath
from scipy.optimize import minimize from scipy.optimize import minimize
import sys import sys
sys.path.append(os.path.abspath("../")) sys.path.append(split(split(dirname(abspath(__file__)))[0])[0])
import jobmanager as jm import jobmanager as jm
from calculations import * from calculations import *

View file

@ -4,13 +4,16 @@ from __future__ import division, print_function
import multiprocessing as mp import multiprocessing as mp
import numpy as np import numpy as np
from os.path import split, dirname, abspath
import sys import sys
import time import time
sys.path.append('../') sys.path.append(split(dirname(abspath(__file__)))[0])
import jobmanager as jm import jobmanager as jm
class Example_Client(jm.JobManager_Client): class Example_Client(jm.JobManager_Client):
def __init__(self): def __init__(self):
# start quiet client (verbopse=0) # start quiet client (verbopse=0)
@ -29,7 +32,7 @@ class Example_Server(jm.JobManager_Server):
def __init__(self): def __init__(self):
# server show status information (verbose=1) # server show status information (verbose=1)
super(Example_Server, self).__init__(authkey='simple example', super(Example_Server, self).__init__(authkey='simple example',
verbose=2) verbose=1)
self.final_result = 1 self.final_result = 1
@ -44,17 +47,19 @@ class Example_Server(jm.JobManager_Server):
print("final_result:", self.final_result) print("final_result:", self.final_result)
def run_server(): def run_server():
server = Example_Server() server = Example_Server()
for i in range(5000): for i in range(5000):
server.put_arg(np.random.rand()) server.put_arg(np.random.rand())
server.start() server.start()
def run_client(): def run_client():
client = Example_Client() client = Example_Client()
client.start() client.start()
if __name__ == "__main__": if __name__ == "__main__":
p_server = mp.Process(target=run_server) p_server = mp.Process(target=run_server)
p_server.start() p_server.start()

110
examples/wrapper_example.py Normal file
View file

@ -0,0 +1,110 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
""" How to wrap or decorate a function with a progress bar.
"""
from __future__ import division, print_function
from os.path import split, dirname, abspath
import sys
import time
sys.path.append(split(dirname(abspath(__file__)))[0])
import jobmanager as jm
@jm.decorators.ProgressBar
def decorated_function_alpha(an_argument, c, m):
""" A simple example of a ProgressBar-decorated function.
The arguments `c` and `m` are the counter and maximal counter
variables of the ProgressBar. They are instances of
`multiprocessing.Value`.
"""
m.value=10
c.value=0
for i in range(10):
# this is were things are computed
c.value += 1
time.sleep(.2)
return an_argument
@jm.decorators.ProgressBar
def decorated_function_beta(an_argument, jmc, jmm):
""" A simple example of a ProgressBar-decorated function.
In comparison to `decorated_function_alpha`, we now have the
arguments `jmc` and `jmm`. Jobmanager automatically detects
arguments that are registered in
`jobmanager.jobmanager.validCountKwargs`.
Note that we do not need to set the value of jmc to zero, as the
ProgressBar initiates the variable with zero.
"""
jmm.value=10
for i in range(10):
# this is were things are computed
jmc.value += 1
time.sleep(.2)
return an_argument
@jm.decorators.ProgressBar
def decorated_function_gamma(arg, jmc, jmm, kwarg="2"):
""" A simple example of a ProgressBar-decorated function.
In comparison to `decorated_function_alpha`, we now have the
arguments `jmc` and `jmm`. Jobmanager automatically detects
arguments that are registered in
`jobmanager.jobmanager.validCountKwargs`.
Note that we do not need to set the value of jmc to zero, as the
ProgressBar initiates the variable with zero.
"""
jmm.value=10
for i in range(10):
# this is were things are computed
jmc.value += 1
time.sleep(.2)
return "{} {}".format(arg, kwarg)
def wrapped_function_beta(an_argument, jmc=None, jmm=None):
""" A simple example of a ProgressBar-decorated function.
In comparison to `decorated_function_beta`, the count arguments
became keyword arguments. The function works with and without
the ProgressBar.
"""
if jmm is not None:
jmm.value=10
for i in range(10):
# this is were things are computed
if jmc is not None:
jmc.value += 1
time.sleep(.2)
return an_argument
if __name__ == "__main__":
##d ecorated
retd1 = decorated_function_alpha("argument")
retd2 = decorated_function_beta("argument")
retd3 = decorated_function_gamma("argument", kwarg="test")
## wrapped
# When using the wrapper, you can define arguments for
# `jm.progress.ProgressBar`.
pb = jm.decorators.ProgressBar(wrapped_function_beta,
interval=.05)
retw1 = pb("argument")
# or
retw2 = jm.decorators.ProgressBar(wrapped_function_beta)("arg")
print(retd1, retd2, retd3, sep=" | ")
print(retw1, retw2, sep=" | ")

17
jobmanager/__init__.py Normal file
View file

@ -0,0 +1,17 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from .jm_version import __version__
from .jobmanager import *
from . import clients
from . import decorators
from . import servers
from . import progress
# ode_wrapper requires scipy
try:
from . import ode_wrapper
except ImportError:
warnings.warn("Submodule 'ode_wrapper' is not available."+\
" Reason: {}.".format(sys.exc_info()[1].message))

View file

@ -1,16 +1,19 @@
import os #!/usr/bin/env python
import sys # -*- coding: utf-8 -*-
import traceback
from .jobmanager import JobManager_Client
import ode_wrapper
""" """
The clients module The clients module
This module provides special subclasses of the JobManager_Client This module provides special subclasses of the JobManager_Client
""" """
import os
import sys
import traceback
from .jobmanager import JobManager_Client
from . import ode_wrapper
class Integration_Client_CPLX(JobManager_Client): class Integration_Client_CPLX(JobManager_Client):
""" """
@ -60,7 +63,7 @@ class Integration_Client_CPLX(JobManager_Client):
f(t, x, arg_1, arg_2, ... const_arg_1, const_arg_2, ...) f(t, x, arg_1, arg_2, ... const_arg_1, const_arg_2, ...)
""" """
def __init__(self, **kwargs): def __init__(self, **kwargs):
super().__init__(**kwargs) super(Integration_Client_CPLX, self).__init__(**kwargs)
@staticmethod @staticmethod
def func(arg, const_arg, c, m): def func(arg, const_arg, c, m):
@ -96,6 +99,8 @@ class Integration_Client_CPLX(JobManager_Client):
# t0, t1, N, f, args, x0, integrator, verbose, c, **kwargs # t0, t1, N, f, args, x0, integrator, verbose, c, **kwargs
return ode_wrapper.integrate_cplx(c=c, args=args_dgl, **kwargs) return ode_wrapper.integrate_cplx(c=c, args=args_dgl, **kwargs)
class Integration_Client_REAL(JobManager_Client): class Integration_Client_REAL(JobManager_Client):
""" """
A JobManager_Client subclass to integrate a set of complex real ODE. A JobManager_Client subclass to integrate a set of complex real ODE.
@ -105,7 +110,7 @@ class Integration_Client_REAL(JobManager_Client):
performance issue and 'zvode' is obviously not supported. performance issue and 'zvode' is obviously not supported.
""" """
def __init__(self, **kwargs): def __init__(self, **kwargs):
super().__init__(**kwargs) super(Integration_Client_REAL, self).__init__(**kwargs)
@staticmethod @staticmethod
def func(arg, const_arg, c, m): def func(arg, const_arg, c, m):
@ -146,6 +151,8 @@ class Integration_Client_REAL(JobManager_Client):
# t0, t1, N, f, args, x0, integrator, verbose, c, **kwargs # t0, t1, N, f, args, x0, integrator, verbose, c, **kwargs
return ode_wrapper.integrate_real(c=c, args=args_dgl, **kwargs) return ode_wrapper.integrate_real(c=c, args=args_dgl, **kwargs)
class FunctionCall_Client(JobManager_Client): class FunctionCall_Client(JobManager_Client):
@staticmethod @staticmethod
def func(arg, const_arg, c, m): def func(arg, const_arg, c, m):

196
jobmanager/decorators.py Normal file
View file

@ -0,0 +1,196 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
""" Implements decorators/wrappers for simple use-cases of jobmanager.
"""
from __future__ import division, print_function
import multiprocessing as mp
import time
from . import progress
from .jobmanager import getCountKwargs, validCountKwargs
__all__ = ["ProgressBar"]
class ProgressBar(object):
""" A wrapper/decorator with a text-based progress bar.
Methods:
- __init__
- __call__
Examples
--------
>>> from jobmanager.decorators import ProgressBar
>>> import time
>>>
>>> @ProgressBar
>>> def my_func(arg, kwarg="1", count=None, max_count=None):
>>> maxval = 100
>>> if max_count is not None:
>>> max_count.value = maxval
>>>
>>> for i in range(maxval):
>>> if count is not None:
>>> count.value += 1
>>> time.sleep(0.05)
>>>
>>> return arg+kwarg
>>>
>>> my_func_1("one argument", kwarg=" second argument")
# The progress of my_func is monitored on stdout.
one argument second argument
Notes
-----
You can also use this class as a wrapper and tune parameters of the
progress bar.
>>> wrapper = ProgressBar(my_func, interval=.1)
>>> result = wrapper("wrapped function", kwarg=" test")
"""
def __init__(self, func, *args, **kwargs):
""" Initiates the wrapper objet.
A function can be wrapped by decorating it with
`ProgressBar` or by instantiating `ProgressBar` and
subsequently calling it with the arguments for `func`.
Parameters
----------
func : callable
The method that is wrapped/decorated. It must accept the
two keyword-arguments `count` and `max_count` (or `c` and
`m`). The method `func` increments `count.value` up to
`max_count.value` (`c.value`, `m.value`).
*args : list
Arguments for `jobmanager.ProgressBar`.
**kwargs : dict
Keyword-arguments for `jobmanager.ProgressBar`.
Notes
-----
`func` must accept `count` and `max_count` (or `c`, `m`) and
properly set their `.value` properties. This wrapper
automatically creates the necessary `multiprocessing.Value`
objects.
"""
self.__name__ = func.__name__ # act like the function
self.func = func
self.args = args
self.kwargs = kwargs
# works with Python 2.7 and 3.3
valid = func.__code__.co_varnames[:func.__code__.co_argcount]
# Check arguments
self.cm = getCountKwargs(func)
if self.cm is None:
raise ValueError(
"The wrapped function `{}` ".format(func.func_name)+
"must accept one of the folling pairs of "+
"keyword arguments:{}".format(validCountKwargs))
def __call__(self, *args, **kwargs):
""" Calls `func` - previously defined in `__init__`.
Parameters
----------
*args : list
Arguments for `func`.
**kwargs : dict
Keyword-arguments for `func`.
"""
# check if the kwarg is already given
# (e.g. by a function that is nested.
if not kwargs.has_key(self.cm[0]) or kwargs[self.cm[0]] is None:
# count
kwargs[self.cm[0]] = progress.UnsignedIntValue(0)
if not kwargs.has_key(self.cm[1]) or kwargs[self.cm[1]] is None:
# max_count
kwargs[self.cm[1]] = progress.UnsignedIntValue(0)
with progress.ProgressBar(kwargs[self.cm[0]], kwargs[self.cm[1]],
*self.args, **self.kwargs) as pb:
pb.start()
return self.func(*args, **kwargs)
def decorate_module_ProgressBar(module, **kwargs):
""" Decorates all decoratable functions in a module with a
ProgressBar.
You can prevent wrapping of a function by not specifying the keyword
arguments as defined in `jobmanager.jobmanager.validCountKwargs` or
by defining a function `_jm_decorate_{func}".
**kwargs are keyword arguments for ProgressBar
Note that decorating all functions in a module might lead to
strange behavior of the progress bar for nested functions.
"""
vdict = module.__dict__
for key in list(vdict.keys()):
if hasattr(vdict[key], "__call__"):
if getCountKwargs(vdict[key]) is not None:
newid = "_jm_decorate_{}".format(key)
if hasattr(module, newid):
warings.warn("Wrapping of {} prevented by module.".
format(key))
else:
# copy old function
setattr(module, newid, vdict[key])
# create new function
wrapper = ProgressBar(getattr(module, newid), **kwargs)
# set new function
setattr(module, key, wrapper)
if (kwargs.has_key("verbose") and
kwargs["verbose"] > 0):
print("Jobmanager wrapped {}.{}".format(
module.__name__, key))
@ProgressBar
def _my_func_1(arg, kwarg="1", count=None, max_count=None):
maxval = 100
if max_count is not None:
max_count.value = maxval
for i in range(maxval):
if count is not None:
count.value += 1
time.sleep(0.02)
return arg+kwarg
def _my_func_2(arg, c, m, kwarg="2"):
maxval = 100
m.value += maxval
for i in range(maxval):
c.value += 1
time.sleep(0.02)
return arg+kwarg
def _test_ProgressBar():
result1 = _my_func_1("decorated function", kwarg=" 1")
print(result1)
wrapper = ProgressBar(_my_func_2, interval=.1)
result2 = wrapper("wrapped function", kwarg=" 2")
print(result2)

3
jobmanager/jm_version.py Normal file
View file

@ -0,0 +1,3 @@
# put this into a separate file so we don't have a problem installing
# with pip (unmet dependencies upon import).
__version__ = "0.1.0"

View file

@ -16,6 +16,15 @@ import sys
import time import time
import traceback import traceback
# This is a list of all python objects that will be imported upon
# initialization during module import (see __init__.py)
__all__ = ["JobManager_Client",
"JobManager_Local",
"JobManager_Server",
"hashDict",
]
# Magic conversion from 3 to 2 # Magic conversion from 3 to 2
if sys.version_info[0] == 2: if sys.version_info[0] == 2:
# Python 2 # Python 2
@ -73,6 +82,15 @@ The class JobManager_Client
# a list of all names of the implemented python signals # a list of all names of the implemented python signals
all_signals = [s for s in dir(signal) if (s.startswith('SIG') and s[3] != '_')] all_signals = [s for s in dir(signal) if (s.startswith('SIG') and s[3] != '_')]
# keyword arguments that define counting in wrapped functions
validCountKwargs = [
[ "count", "count_max"],
[ "count", "max_count"],
[ "c", "m"],
[ "jmc", "jmm"],
]
def getDateForFileName(includePID = False): def getDateForFileName(includePID = False):
"""returns the current date-time and optionally the process id in the format """returns the current date-time and optionally the process id in the format
YYYY_MM_DD_hh_mm_ss_pid YYYY_MM_DD_hh_mm_ss_pid
@ -83,6 +101,23 @@ def getDateForFileName(includePID = False):
name += "_{}".format(os.getpid()) name += "_{}".format(os.getpid())
return name return name
def getCountKwargs(func):
""" Returns a list ["count kwarg", "count_max kwarg"] for a
given function. Valid combinations are defined in
`jobmanager.jobmanager.validCountKwargs`.
Returns None if no keyword arguments are found.
"""
# Get all arguments of the function
func_args = func.__code__.co_varnames[:func.__code__.co_argcount]
for pair in validCountKwargs:
if ( pair[0] in func_args and pair[1] in func_args ):
return pair
# else
return None
def copyQueueToList(q): def copyQueueToList(q):
res_list = [] res_list = []
res_q = myQueue() res_q = myQueue()
@ -96,6 +131,7 @@ def copyQueueToList(q):
return res_q, res_list return res_q, res_list
class hashDict(dict): class hashDict(dict):
def __hash__(self): def __hash__(self):
try: try:
@ -860,14 +896,25 @@ class JobManager_Client(object):
tg_1 = tg_0 = tp_1 = tp_0 = tf_1 = tf_0 = 0 tg_1 = tg_0 = tp_1 = tp_0 = tf_1 = tf_0 = 0
args_of_func = inspect.getfullargspec(func).args
# check for func definition without status members count, max_count # check for func definition without status members count, max_count
if len(args_of_func) == 2: #args_of_func = inspect.getfullargspec(func).args
#if len(args_of_func) == 2:
count_args = getCountKwargs(func)
print(count_args)
if count_args is None:
if verbose > 1: if verbose > 1:
print("{}: found function without status information".format(identifier)) print("{}: found function without status information".format(identifier))
m.value = 0 # setting max_count to -1 will hide the progress bar m.value = 0 # setting max_count to -1 will hide the progress bar
_func = lambda arg, const_arg, c, m : func(arg, const_arg) _func = lambda arg, const_arg, c, m : func(arg, const_arg)
elif count_args != ["c", "m"]:
# Allow other arguments, such as ["jmc", "jmm"] as defined
# in `validCountKwargs`.
# Here we translate to "c" and "m".
def _func(arg, const_arg, c, m):
arg[count_args[0]] = c
arg[count_args[1]] = m
else: else:
_func = func _func = func
@ -1051,7 +1098,7 @@ class JobManager_Client(object):
sigterm='ign') as self.pbc : sigterm='ign') as self.pbc :
self.pbc.start() self.pbc.start()
for i in range(self.nproc): for i in range(self.nproc):
reset_pbc = lambda: pbc.reset(i) reset_pbc = lambda: self.pbc.reset(i)
p = mp.Process(target=self.__worker_func, args=(self.func, p = mp.Process(target=self.__worker_func, args=(self.func,
self.nice, self.nice,
self.verbose, self.verbose,
@ -1157,3 +1204,4 @@ class JobManager_Local(JobManager_Server):
identifier='local_client', identifier='local_client',
timeout=2, timeout=2,
verbose=self.verbose_client) verbose=self.verbose_client)

View file

@ -1,3 +1,5 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import sqlitedict import sqlitedict
from .jobmanager import JobManager_Server from .jobmanager import JobManager_Server
from collections import namedtuple from collections import namedtuple

5
setup.cfg Normal file
View file

@ -0,0 +1,5 @@
[egg_info]
tag_build =
tag_date = 0
tag_svn_revision = 0

53
setup.py Normal file
View file

@ -0,0 +1,53 @@
#!/usr/bin/env python
# To create a distribution package for pip or easy-install:
# python setup.py sdist
from setuptools import setup, find_packages
from os.path import join, dirname, realpath
from warnings import warn
name='jobmanager'
try:
import sys
import os
sys.path.append(os.path.dirname(os.path.abspath(__file__))+"/"+name)
# get version number
from jm_version import __version__ as version
except:
version = "unknown"
setup(
name=name,
author='Richard Hartmann',
#author_email='richard.hartmann...',
url='https://github.com/cimatosa/jobmanager',
version=version,
packages=[name],
package_dir={name: name},
license="MIT",
description='Python job manager for parallel computing.',
long_description="""easy distributed computing based on the python
class SyncManager for remote communication
and python module multiprocessing for local
parallelism.""",
install_requires=["sqlitedict", "NumPy>=1.5.1"],
# tests: psutil
keywords=["multiprocessing", "queue", "parallel",
"progress", "manager", "job"],
classifiers= [
'Operating System :: OS Independent',
#'Programming Language :: Python :: 2.7', #Todo
'Programming Language :: Python :: 3.2',
'Programming Language :: Python :: 3.3',
'Intended Audience :: Science/Research'
],
platforms=['ALL']
)

View file

@ -1,10 +1,12 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import sys import sys
import os from os.path import abspath, dirname, split
sys.path.append(os.path.dirname(__file__)) # Add parent directory to beginning of path variable
sys.path = [split(dirname(abspath(__file__)))[0]] + sys.path
import externals.jobmanager.jobmanager as jm import jobmanager as jm
import integrationClient
from scipy.integrate import ode from scipy.integrate import ode
import numpy as np import numpy as np
@ -15,12 +17,14 @@ from scipy.special import mathieu_sem, mathieu_cem, mathieu_a, mathieu_b
import multiprocessing as mp import multiprocessing as mp
import time import time
def dgl_mathieu(t, f, a, q): def dgl_mathieu(t, f, a, q):
f1, f2 = f[0], f[1] f1, f2 = f[0], f[1]
f1_dot = f2 f1_dot = f2
f2_dot = -(a - 2*q*np.cos(2*t))*f1 f2_dot = -(a - 2*q*np.cos(2*t))*f1
return [f1_dot, f2_dot] return [f1_dot, f2_dot]
def solve_mathiue_dgl(t0, tmax, N, m, q): def solve_mathiue_dgl(t0, tmax, N, m, q):
a = mathieu_a(m, q) a = mathieu_a(m, q)
y0 = mathieu_cem(m, q, 0) y0 = mathieu_cem(m, q, 0)
@ -110,7 +114,8 @@ def test_distributed_mathieu():
authkey = 'integration_jm' authkey = 'integration_jm'
with jm.JobManager_Local(client_class = integrationClient.Integration_Client_REAL,
with jm.JobManager_Local(client_class = jm.clients.Integration_Client_REAL,
authkey = authkey, authkey = authkey,
const_arg = const_arg, const_arg = const_arg,
nproc=1, nproc=1,

View file

@ -9,11 +9,12 @@ import signal
import multiprocessing as mp import multiprocessing as mp
import numpy as np import numpy as np
path = os.path.dirname(__file__) from os.path import abspath, dirname, split
sys.path.append(path) # Add parent directory to beginning of path variable
sys.path = [split(dirname(abspath(__file__)))[0]] + sys.path
from jobmanager import jobmanager, progress
import jobmanager
import progress
def test_Signal_to_SIG_IGN(): def test_Signal_to_SIG_IGN():
def f(): def f():

View file

@ -11,10 +11,12 @@ import sys
import time import time
import traceback import traceback
path = os.path.dirname(__file__) from os.path import abspath, dirname, split
sys.path.append(path)
import progress # Add parent directory to beginning of path variable
sys.path = [split(dirname(abspath(__file__)))[0]] + sys.path
from jobmanager import progress
def test_loop_basic(): def test_loop_basic():
""" """