from .stocproc import *
from .class_stocproc_kle import *
import gquad
from .class_stocproc import StocProc_FFT
from .class_stocproc import StocProc_KLE
from class_stocproc import StocProc_FFT
from class_stocproc import StocProc_KLE
from class_stocproc_kle import StocProc
import gquad

import numpy as np
from scipy.interpolate import InterpolatedUnivariateSpline
from . import StocProc
class ComplexInterpolatedUnivariateSpline(object):
Univariant spline interpolator from scpi.interpolate in a convenient fashion to
interpolate real and imaginary parts of complex data
def __init__(self, x, y, k=2):
self.re_spline = InterpolatedUnivariateSpline(x, np.real(y))
self.im_spline = InterpolatedUnivariateSpline(x, np.imag(y))
def __call__(self, t):
return self.re_spline(t) + 1j*self.im_spline(t)
class _absStocProc(object):
Abstract base class to stochastic process interface
general work flow:
- Specify the time axis of interest [0, t_max] and it resolution (number of grid points), :math:`t_i = i \frac{t_max}{N_t-1}.
- To evaluate the stochastic process at these points, a mapping from :math:`N_z` normal distributed
random complex numbers with :math:`\langle y_i y_j^\ast \rangle = 2 \delta_{ij}`
to the stochastic process :math:`z_{t_i}` is needed and depends on the implemented method (:py:func:`_calc_z').
- A new process should be generated by calling :py:func:`new_process'.
- When the __call__ method is invoked the results will be interpolated between the :math:`z_t_i`.
def __init__(self, t_max, num_grid_points, seed=None, verbose=1, k=3):
:param t_max: specify time axis as [0, t_max]
:param num_grid_points: number of equidistant times on that axis
:param seed: if not ``None`` set seed to ``seed``
:param verbose: 0: no output, 1: informational output, 2: (eventually some) debug info
:param k: polynomial degree used for spline interpolation
self._verbose = verbose
self.t = np.linspace(0, t_max, num_grid_points)
self._z = None
self._interpolator = None
self._k = k
if seed is not None:
def __call__(self, t):
:param t: time to evaluate the stochastic process as, float of array of floats
evaluates the stochastic process via spline interpolation between the discrete process
if self._interpolator is None:
if self._verbose > 1:
print("setup interpolator ...")
self._interpolator = ComplexInterpolatedUnivariateSpline(self.t, self._z, k=self._k)
if self._verbose > 1:
return self._interpolator(t)
def _calc_z(self, y):
maps the normal distributed complex valued random variables y to the stochastic process
:return: the stochastic process, array of complex numbers
def get_num_y(self):
:return: number of complex random variables needed to calculate the stochastic process
def get_time(self):
:return: time axis
return self.t
def get_z(self):
use :py:func:`new_process` to generate a new process
:return: the current process
return self._z
def new_process(self, y=None, seed=None):
generate a new process by evaluating :py:func:`_calc_z'
When ``y`` is given use these random numbers as input for :py:func:`_calc_z`
otherwise generate a new set of random numbers.
:param y: independent normal distributed complex valued random variables with :math:`\sig_{ij}^2 = \langle y_i y_j^\ast \rangle = 2 \delta_{ij}
:param seed: if not ``None`` set seed to ``seed`` before generating samples
self._interpolator = None
if seed != None:
if self._verbose > 0:
print("use seed", seed)
if y is None:
#random complex normal samples
if self._verbose > 1:
print("generate samples ...")
y = np.random.normal(size = 2*self.get_num_y()).view(np.complex)
if self._verbose > 1:
self._z = self._calc_z(y)
class StocProc_KLE(_absStocProc):
class to simulate stochastic processes using KLE method
- Solve fredholm equation on grid with ``ng_fredholm nodes`` (trapezoidal_weights).
If case ``ng_fredholm`` is ``None`` set ``ng_fredholm = num_grid_points``. In general it should
hold ``ng_fredholm < num_grid_points`` and ``num_grid_points = 10*ng_fredholm`` might be a good ratio.
- Calculate discrete stochastic process (using interpolation solution of fredholm equation) with num_grid_points nodes
- invoke spline interpolator when calling
def __init__(self, r_tau, t_max, num_grid_points, ng_fredholm=None, seed=None, sig_min=1e-5, verbose=1, k=3):
:param r_tau: auto correlation function of the stochastic process
:param t_max: specify time axis as [0, t_max]
:param num_grid_points: number of equidistant times on that axis
:param ng_fredholm: number of discrete time used to solve the underlying fredholm equation, ``None`` set ``ng_fredholm = num_grid_points``
:param seed: if not ``None`` set seed to ``seed``
:param sig_min: eigenvalue threshold (see KLE method to generate stochastic processes)
:param verbose: 0: no output, 1: informational output, 2: (eventually some) debug info
:param k: polynomial degree used for spline interpolation
if ng_fredholm is None:
ng_fredholm = num_grid_points
if num_grid_points < ng_fredholm:
print("WARNING: found 'num_grid_points < ng_fredholm' -> set 'num_grid_points == ng_fredholm'")
# if ng_fredholm == num_grid_points -> no kle_fredholm interpolation is needed
if ng_fredholm == num_grid_points:
self.kle_interp = False
self.kle_interp = True
self.stocproc = StocProc.new_instance_with_trapezoidal_weights(r_tau = r_tau,
t_max = t_max,
ng = ng_fredholm,
sig_min = sig_min,
seed = seed,
verbose = verbose)
super().__init__(t_max=t_max, num_grid_points=num_grid_points, seed=seed, verbose=verbose, k=k)
# this is only needed access the underlaying stocproc KLE class
# in a convenient fashion
self._r_tau = self.stocproc._r_tau
self._s = self.stocproc._s
self._A = self.stocproc._A
self.num_y = self.stocproc._num_ev
self.verbose = verbose
def _cal_z(self, y):
uses the underlaying stocproc class to generate the process (see :py:class:`StocProc` for details)
if self.kle_interp:
return self.stocproc.x_t_array(self.t)
return self.stocproc.x_for_initial_time_grid()
def get_num_y(self):
return self.num_y
class StocProc_FFT(object):
Simulate Stochastic Process using FFT method
def __init__(self, spectral_density, t_max, num_grid_points, seed=None, verbose=1, k=3):
super().__init__(t_max = t_max,
num_grid_points = num_grid_points,
seed = seed,
verbose = verbose,
k = k)
self.n_dft = self.num_grid_points * 2 - 1
delta_t = self.t_max / (self.num_grid_points-1)
self.delta_omega = 2 * np.pi / (delta_t * self.n_dft)
#omega axis
omega = self.delta_omega*np.arange(self.n_dft)
#reshape for multiplication with matrix xi
self.sqrt_spectral_density_times_sqrt_delta_omega_over_sqrt_2 = np.sqrt(spectral_density(omega)) * np.sqrt(self.delta_omega) / np.sqrt(2)
if self.verbose > 0:
print("stoc proc fft, spectral density sampling information:")
print(" t_max :", (t_max))
print(" ng :", (num_grid_points))
print(" omega_max :", (self.delta_omega * self.n_dft))
print(" delta_omega:", (self.delta_omega))
def _cal_z(self, y):
weighted_integrand = self.sqrt_spectral_density_times_sqrt_delta_omega_over_sqrt_2 * y
#compute integral using fft routine
if self.verbose > 1:
print("calc process via fft ...")
z = np.fft.fft(weighted_integrand)[0:self.num_grid_points]
if self.verbose > 1:
return z
def get_num_y(self):
return self.n_dft

import functools
import numpy as np
import pickle
from . import solve_hom_fredholm
from . import get_mid_point_weights
from . import get_trapezoidal_weights_times
from . import gquad
class StocProc(object):
r"""Simulate Stochastic Process using Karhunen-Loève expansion
The :py:class:`StocProc` class provides methods to simulate stochastic processes
:math:`X(t)` in a given time interval :math:`[0,t_\mathrm{max}]`
with given autocorrelation function
:math:`R(\tau) = R(t-s) = \langle X(t)X^\ast(s)\rangle`. The method is similar to
the one described and implemented in :py:func:`stochastic_process_kle`.
The :py:class:`StocProc` class extends the functionality of the
:py:func:`stochastic_process_kle` routine by providing an interpolation
method based on the numeric solution of the Fredholm equation.
Since the time discrete solutions :math:`u_i(s_j)` of the Fredholm equation
are best interpolates using
.. math:: u_i(t) = \frac{1}{\lambda_i}\sum_j w_j R(t-s_j) u_i(s_j)
with :math:`s_j` and :math:`w_j` being the time grid points and integration weights
for the numeric integrations as well as :math:`\lambda_i` and :math:`u_i` being
the eigenvalues the time discrete eigenvectors of the discrete Fredholm equation
(see Ref. [1]).
From that is follows that a good interpolation formula for the stochastic process
is given by
.. math:: X(t) = \sum_i \sqrt{\lambda_i} Y_i u_i(t) = \sum_{i,j} \frac{Y_i}{\sqrt{\lambda_i}}w_j R(t-s_j) u_i(s_j)
where the :math:`Y_i` are independent normal distributed random variables
with variance one.
For extracting values of the Stochastic Process you may use:
:py:func:`x`: returns the value of the Stochastic Process for a
single time :math:`t`
:py:func:`x_t_array`: returns the value of the Stochastic Process for
all values of the `numpy.ndarray` a single time :math:`t_\mathrm{array}`.
:py:func:`x_for_initial_time_grid`: returns the value of the Stochastic Process for
the times given to the constructor in order to discretize the Fredholm
equation. This is equivalent to calling :py:func:`stochastic_process_kle` with the
same weights :math:`w_i` and time grid points :math:`s_i`.
To generate a new process call :py:func:`new_process`.
To generate a new sample use :py:func:`new_process`.
:param r_tau: function object of the one parameter correlation function
:math:`R(\tau) = R (t-s) = \langle X(t) X^\ast(s) \rangle`
:param t: list of grid points for the time axis
:param w: appropriate weights to integrate along the time axis using the
grid points given by :py:obj:`t`
:param seed: seed for the random number generator used
:param sig_min: minimal standard deviation :math:`\sigma_i` the random variable :math:`X_i = \sigma_i Y_i`,
viewed as coefficient for the base function :math:`u_i(t)`, must have to be considered as
significant for the Karhunen-Loève expansion (note: :math:`\sigma_i` results from the
square root of the eigenvalue :math:`\lambda_i`)
For further reading see :py:func:`stochastic_process_kle`.
[1] Press, W.H., Teukolsky, S.A., Vetterling, W.T., Flannery, B.P.,
2007. Numerical Recipes 3rd Edition: The Art of Scientific Computing,
Auflage: 3. ed. Cambridge University Press, Cambridge, UK ; New York. (pp. 989)
_dump_members = ['_r_tau',
def __init__(self,
r_tau = None,
t = None,
w = None,
seed = None,
sig_min = 1e-4,
fname = None,
cache_size = 1024,
verbose = 1):
self.verbose = verbose
if fname is None:
assert r_tau is not None
self._r_tau = r_tau
assert t is not None
self._s = t
self._num_gp = len(self._s)
assert w is not None
self._w = w
t_row = self._s.reshape(1, self._num_gp)
t_col = self._s.reshape(self._num_gp, 1)
# correlation matrix
# r_tau(t-s) -> integral/sum over s -> s must be row in EV equation
r = self._r_tau(t_col-t_row)
# solve discrete Fredholm equation
# eig_val = lambda
# eig_vec = u(t)
self._eig_val, self._eig_vec = solve_hom_fredholm(r, w, sig_min**2, verbose=self.verbose)
self.my_cache_decorator = functools.lru_cache(maxsize=cache_size, typed=False)
self.x = self.my_cache_decorator(self._x)
self.new_process(seed = seed)
def new_instance_by_name(cls, name, r_tau, t_max, ng, seed, sig_min):
known_names = ['trapezoidal', 'mid_point', 'gauss_legendre']
if name == 'trapezoidal':
ob = cls.new_instance_with_trapezoidal_weights(r_tau, t_max, ng, seed, sig_min)
elif name == 'mid_point':
ob = cls.new_instance_with_mid_point_weights(r_tau, t_max, ng, seed, sig_min)
elif name == 'gauss_legendre':
ob = cls.new_instance_with_gauss_legendre_weights(r_tau, t_max, ng, seed, sig_min)
raise RuntimeError("unknown name '{}' to create StocProc instance\nknown names are {}".format(name, known_names)) = name
return ob
def new_instance_with_trapezoidal_weights(cls, r_tau, t_max, ng, seed=None, sig_min=0, verbose=1):
t, w = get_trapezoidal_weights_times(t_max, ng)
return cls(r_tau, t, w, seed, sig_min, verbose=verbose)
def new_instance_with_mid_point_weights(cls, r_tau, t_max, ng, seed=None, sig_min=0, verbose=1):
t, w = get_mid_point_weights(t_max, ng)
return cls(r_tau, t, w, seed, sig_min, verbose=verbose)
def new_instance_with_gauss_legendre_weights(cls, r_tau, t_max, ng, seed=None, sig_min=0, verbose=1):
t, w = gquad.gauss_nodes_weights_legendre(n=ng, low=0, high=t_max)
return cls(r_tau, t, w, seed, sig_min, verbose=verbose)
def __load(self, fname):
with open(fname, 'rb') as f:
for m in self._dump_members:
setattr(self, m, pickle.load(f))
def __calc_missing(self):
self._num_gp = len(self._s)
self._sqrt_eig_val = np.sqrt(self._eig_val)
self._num_ev = len(self._eig_val)
self._A = self._w.reshape(self._num_gp,1) * self._eig_vec / self._sqrt_eig_val.reshape(1, self._num_ev)
def __dump(self, fname):
with open(fname, 'wb') as f:
for m in self._dump_members:
pickle.dump(getattr(self, m), f, pickle.HIGHEST_PROTOCOL)
def __getstate__(self):
return [getattr(self, atr) for atr in self._dump_members]
def __setstate__(self, state):
for i, atr_value in enumerate(state):
setattr(self, self._dump_members[i], atr_value)
def save_to_file(self, fname):
def get_name(self):
if hasattr(self, 'name'):
return 'unknown'
def new_process(self, yi=None, seed=None):
r"""setup new process
Generates a new set of independent normal random variables :math:`Y_i`
which correspondent to the expansion coefficients of the
Karhunen-Loève expansion for the stochastic process
.. math:: X(t) = \sum_i \sqrt{\lambda_i} Y_i u_i(t)
:param seed: a seed my be given which is passed to the random number generator
if seed != None:
if yi is None:
if self.verbose > 1:
print("generate samples ...")
self._Y = np.random.normal(size=2*self._num_ev).view(np.complex).reshape(self._num_ev,1)
if self.verbose > 1:
self._Y = yi.reshape(self._num_ev,1)
def x_for_initial_time_grid(self):
r"""Get process on initial time grid
Returns the value of the Stochastic Process for
the times given to the constructor in order to discretize the Fredholm
equation. This is equivalent to calling :py:func:`stochastic_process_kle` with the
same weights :math:`w_i` and time grid points :math:`s_i`.
tmp = self._Y / np.sqrt(2) * self._sqrt_eig_val.reshape(self._num_ev,1)
if self.verbose > 1:
print("calc process via matrix prod ...")
res = np.tensordot(tmp, self._eig_vec, axes=([0],[1])).flatten()
if self.verbose > 1:
return res
def time_grid(self):
return self._s
def __call__(self, t):
if isinstance(t, np.ndarray):
return self.x_t_array(t)
return self.x(t)
def _x(self, t):
# _Y # (N_ev, 1 )
tmp = self._Y*self._r_tau(t-self._s.reshape(1, self._num_gp))
# (N_ev, N_gp)
# A # (N_gp, N_ev)
return np.tensordot(tmp, self._A, axes=([1,0],[0,1]))
def get_cache_info(self):
return self.x.cache_info()
def clear_cache(self):
def x_t_array(self, t_array):
t_array = t_array.reshape(1,1,len(t_array)) # (1 , 1 , N_t)
tmp = (self._Y.reshape(self._num_ev,1,1) *
# (N_ev, N_gp, N_t)
# A # (N_gp, N_ev)
# A_j,i = w_j / sqrt(lambda_i) u_i(s_j)
if self.verbose > 1:
print("calc process via matrix prod ...")
res = np.tensordot(tmp, self._A, axes=([1,0],[0,1]))
if self.verbose > 1:
return res
def u_i_mem_save(self, delta_t_fac, i):
a = delta_t_fac
assert isinstance(a, int)
T = self._s[-1]
N1 = len(self._s)
N2 = a * (N1 - 1) + 1
alpha_k = self._r_tau(np.linspace(-T, T, 2*N2 - 1))
print("WARNING! this needs to be cythonized")
u_res = np.zeros(shape=N2, dtype=np.complex)
for j in range(N2):
for l in range(N1):
k = j - a*l + N2-1
u_res[j] += self._w[l] * alpha_k[k] * self._eig_vec[l, i]
return u_res / self._eig_val[i]
def u_i(self, t_array, i):
r"""get eigenfunction of index i
Returns the i-th eigenfunction corresponding to the i-th eigenvalue
of the discrete Fredholm equation using the interpolation scheme:
.. math:: u_i(t) = \frac{1}{\lambda_i}\sum_j w_j R(t-s_j) u_i(s_j)
:param t_array: 1D time array for which the eigenfunction :math:`u_i`
will be evaluated.
:param i: index of the eigenfunction
:return: 1D array of length ``len(t_array)``
t_array = t_array.reshape(1,len(t_array)) # (1 , N_t)
tmp = self._r_tau(t_array-self._s.reshape(self._num_gp,1))
# (N_gp, N_t)
# A # (N_gp, N_ev)
# A_j,i = w_j / sqrt(lambda_i) u_i(s_j)
return 1/self._sqrt_eig_val[i]*np.tensordot(tmp, self._A[:,i], axes=([0],[0]))
def u_i_all(self, t_array):
r"""get all eigenfunctions
Returns all eigenfunctions of the discrete Fredholm equation using
the interpolation scheme:
.. math:: u_i(t) = \frac{1}{\lambda_i}\sum_j w_j R(t-s_j) u_i(s_j)
:param t_array: 1D time array for which the eigenfunction :math:`u_i`
will be evaluated.
:return: 2D array of shape ``(len(t_array), number_of_eigenvalues=self._num_ev)``
(note, the number of eigenvalues may be smaller than the number
of grid points because of the selections mechanism implemented
by the value of ``sig_min``)
t_array = t_array.reshape(1,len(t_array)) # (1 , N_t)
tmp = self._r_tau(t_array-self._s.reshape(self._num_gp,1))
# (N_gp, N_t)
# A # (N_gp, N_ev)
# A_j,i = w_j / sqrt(lambda_i) u_i(s_j)
return np.tensordot(tmp, 1/self._sqrt_eig_val.reshape(1,self._num_ev) * self._A, axes=([0],[0]))
def eigen_vector_i(self, i):
r"""Returns the i-th eigenvector (solution of the discrete Fredhom equation)"""
return self._eig_vec[:,i]
def eigen_vector_i_all(self):
r"""Returns all eigenvectors (solutions of the discrete Fredhom equation)
Note: Note: The maximum number of eigenvalues / eigenfunctions is given by
the number of time grid points passed to the constructor. But due to the
threshold ``sig_min`` (see :py:class:`StocProc`) only those
eigenvalues and corresponding eigenfunctions which satisfy
:math:`\mathtt{sig_{toll}} \geq \sqrt{\lambda_i}` are kept.
return self._eig_vec
def lambda_i(self, i):
r"""Returns the i-th eigenvalue."""
return self._eig_val[i]
def lambda_i_all(self):
r"""Returns all eigenvalues."""
return self._eig_val
def num_ev(self):
r"""Returns the number of eigenvalues / eigenfunctions used
Note: The maximum number of eigenvalues / eigenfunctions is given by
the number of time grid points passed to the constructor. But due to the
threshold ``sig_min`` (see :py:class:`StocProc`) only those
eigenvalues and corresponding eigenfunctions which satisfy
:math:`\mathtt{sig_{toll}} \geq \sqrt{\lambda_i}` are kept.
return self._num_ev
def recons_corr(self, t_array):
r"""computes the interpolated correlation functions
For the Karhunen-Loève expansion of a stochastic process the
correlation function can be expressed as follows:
.. math:: R(t,s) = \langle X(t)X^\ast(s)\rangle = \sum_{n,m} \langle X_n X^\ast_m \rangle u_n(t) u^\ast_m(s) = \sum_n \lambda_n u_n(t) u^\ast_n(s)
With that one can do a consistency check whether the finite set of basis functions
for the expansion (the solutions of the discrete Fredholm equation) is good
enough to reproduce the given correlation function.
u_i_all_t = self.u_i_all(t_array) #(N_gp, N_ev)
u_i_all_ast_s = np.conj(u_i_all_t) #(N_gp, N_ev)
lambda_i_all = self.lambda_i_all() #(N_ev)
tmp = lambda_i_all.reshape(1, self._num_ev) * u_i_all_t #(N_gp, N_ev)
return np.tensordot(tmp, u_i_all_ast_s, axes=([1],[1]))
def recons_corr_single_s(self, t_array, s):
assert False, "something is wrong here"
u_i_all_t = self.u_i_all(t_array) #(N_gp, N_ev)
u_i_all_ast_s = np.conj(self.u_i_all(np.asarray([s]))) #(1, N_ev)
lambda_i_all = self.lambda_i_all() #(N_ev)
tmp = lambda_i_all.reshape(1, self._num_ev) * u_i_all_t #(N_gp, N_ev)
return np.tensordot(tmp, u_i_all_ast_s, axes=([1],[1]))[:,0]
def mean_error(r_t_s, r_t_s_exact):
r"""mean error of the correlation function as function of s
.. math:: \mathtt{err} = \frac{1}{T}\int_0^T |r_\mathm{KLE}(t,r) - r_\mathrm{exact}(t,s)|^2 dt
:return: the mean error ``err``
err = np.mean(np.abs(r_t_s - r_t_s_exact), axis = 0)
return err
def max_error(r_t_s, r_t_s_exact):
return np.max(np.abs(r_t_s - r_t_s_exact), axis = 0)
def max_rel_error(r_t_s, r_t_s_exact):
return np.max(np.abs(r_t_s - r_t_s_exact) / np.abs(r_t_s_exact))
def auto_grid_points(r_tau, t_max, tol = 1e-8, err_method = max_error, name = 'mid_point', sig_min = 1e-4):
err = 1
ng = 64
seed = None
err_method_name = err_method.__name__
print("start auto_grid_points, determine ng ...")
#exponential increase to get below error threshold
while err > tol:
ng *= 2
ng_fine = ng*10
t_fine = np.linspace(0, t_max, ng_fine)
print("new process with {} weights ...".format(name))
stoc_proc = StocProc.new_instance_by_name(name, r_tau, t_max, ng, seed, sig_min)
print("reconstruct correlation function ({} points)...".format(ng_fine))
r_t_s = stoc_proc.recons_corr(t_fine)
print("calculate exact correlation function ...")
r_t_s_exact = r_tau(t_fine.reshape(ng_fine,1) - t_fine.reshape(1, ng_fine))
print("calculate error using {} ...".format(err_method_name))
err = np.max(err_method(r_t_s, r_t_s_exact))
print("ng {} -> err {:.3e}".format(ng, err))
ng_low = ng // 2
ng_high = ng
while (ng_high - ng_low) > 1:
print("ng_l", ng_low)
print("ng_h", ng_high)
ng = (ng_low + ng_high) // 2
print("ng", ng)
ng_fine = ng*10
t_fine = np.linspace(0, t_max, ng_fine)
print("new process with {} weights ...".format(name))
stoc_proc = StocProc.new_instance_by_name(name, r_tau, t_max, ng, seed, sig_min)
print("reconstruct correlation function ({} points)...".format(ng_fine))
r_t_s = stoc_proc.recons_corr(t_fine)
print("calculate exact correlation function ...")
r_t_s_exact = r_tau(t_fine.reshape(ng_fine,1) - t_fine.reshape(1, ng_fine))
print("calculate error using {} ...".format(err_method_name))
err = np.max(err_method(r_t_s, r_t_s_exact))
print("ng {} -> err {:.3e}".format(ng, err))
if err > tol:
print(" err > tol!")
print(" ng_l -> ", ng)
ng_low = ng
print(" err <= tol!")
print(" ng_h -> ", ng)
ng_high = ng
return ng_high

.. todo:: implement convenient classes with fixed weights
import numpy as np
import functools
import pickle
import sys
import os
import gquad
from scipy.interpolate import InterpolatedUnivariateSpline
class ComplexInterpolatedUnivariateSpline(object):
def __init__(self, x, y, k=2):
self.re_spline = InterpolatedUnivariateSpline(x, np.real(y))
self.im_spline = InterpolatedUnivariateSpline(x, np.imag(y))
def __call__(self, t):
return self.re_spline(t) + 1j*self.im_spline(t)
class StocProc(object):
r"""Simulate Stochastic Process using Karhunen-Loève expansion
import numpy as np
The :py:class:`StocProc` class provides methods to simulate stochastic processes
:math:`X(t)` in a given time interval :math:`[0,t_\mathrm{max}]`
with given autocorrelation function
:math:`R(\tau) = R(t-s) = \langle X(t)X^\ast(s)\rangle`. The method is similar to
the one described and implemented in :py:func:`stochastic_process_kle`.
The :py:class:`StocProc` class extends the functionality of the
:py:func:`stochastic_process_kle` routine by providing an interpolation
method based on the numeric solution of the Fredholm equation.
Since the time discrete solutions :math:`u_i(s_j)` of the Fredholm equation
are best interpolates using
.. math:: u_i(t) = \frac{1}{\lambda_i}\sum_j w_j R(t-s_j) u_i(s_j)
with :math:`s_j` and :math:`w_j` being the time grid points and integration weights
for the numeric integrations as well as :math:`\lambda_i` and :math:`u_i` being
the eigenvalues the time discrete eigenvectors of the discrete Fredholm equation
(see Ref. [1]).
From that is follows that a good interpolation formula for the stochastic process
is given by
.. math:: X(t) = \sum_i \sqrt{\lambda_i} Y_i u_i(t) = \sum_{i,j} \frac{Y_i}{\sqrt{\lambda_i}}w_j R(t-s_j) u_i(s_j)
where the :math:`Y_i` are independent normal distributed random variables
with variance one.
For extracting values of the Stochastic Process you may use:
:py:func:`x`: returns the value of the Stochastic Process for a
single time :math:`t`
:py:func:`x_t_array`: returns the value of the Stochastic Process for
all values of the `numpy.ndarray` a single time :math:`t_\mathrm{array}`.
:py:func:`x_for_initial_time_grid`: returns the value of the Stochastic Process for
the times given to the constructor in order to discretize the Fredholm
equation. This is equivalent to calling :py:func:`stochastic_process_kle` with the
same weights :math:`w_i` and time grid points :math:`s_i`.
To generate a new process call :py:func:`new_process`.
To generate a new sample use :py:func:`new_process`.
:param r_tau: function object of the one parameter correlation function
:math:`R(\tau) = R (t-s) = \langle X(t) X^\ast(s) \rangle`
:param t: list of grid points for the time axis
:param w: appropriate weights to integrate along the time axis using the
grid points given by :py:obj:`t`
:param seed: seed for the random number generator used
:param sig_min: minimal standard deviation :math:`\sigma_i` the random variable :math:`X_i = \sigma_i Y_i`,
viewed as coefficient for the base function :math:`u_i(t)`, must have to be considered as
significant for the Karhunen-Loève expansion (note: :math:`\sigma_i` results from the
square root of the eigenvalue :math:`\lambda_i`)
For further reading see :py:func:`stochastic_process_kle`.
[1] Press, W.H., Teukolsky, S.A., Vetterling, W.T., Flannery, B.P.,
2007. Numerical Recipes 3rd Edition: The Art of Scientific Computing,
Auflage: 3. ed. Cambridge University Press, Cambridge, UK ; New York. (pp. 989)
_dump_members = ['_r_tau',
def __init__(self,
r_tau = None,
t = None,
w = None,
seed = None,
sig_min = 1e-4,
fname = None,
cache_size = 1024,
verbose = 1):
self.verbose = verbose
if fname is None:
assert r_tau is not None
self._r_tau = r_tau
assert t is not None
self._s = t
self._num_gp = len(self._s)
assert w is not None
self._w = w
t_row = self._s.reshape(1, self._num_gp)
t_col = self._s.reshape(self._num_gp, 1)
# correlation matrix
# r_tau(t-s) -> integral/sum over s -> s must be row in EV equation
r = self._r_tau(t_col-t_row)
# solve discrete Fredholm equation
# eig_val = lambda
# eig_vec = u(t)
self._eig_val, self._eig_vec = solve_hom_fredholm(r, w, sig_min**2, verbose=self.verbose)
self.my_cache_decorator = functools.lru_cache(maxsize=cache_size, typed=False)
self.x = self.my_cache_decorator(self._x)
self.new_process(seed = seed)
def new_instance_by_name(cls, name, r_tau, t_max, ng, seed, sig_min):
known_names = ['trapezoidal', 'mid_point', 'gauss_legendre']
if name == 'trapezoidal':
ob = cls.new_instance_with_trapezoidal_weights(r_tau, t_max, ng, seed, sig_min)
elif name == 'mid_point':
ob = cls.new_instance_with_mid_point_weights(r_tau, t_max, ng, seed, sig_min)
elif name == 'gauss_legendre':
ob = cls.new_instance_with_gauss_legendre_weights(r_tau, t_max, ng, seed, sig_min)
raise RuntimeError("unknown name '{}' to create StocProc instance\nknown names are {}".format(name, known_names)) = name
return ob
def new_instance_with_trapezoidal_weights(cls, r_tau, t_max, ng, seed, sig_min, verbose=1):
t, w = get_trapezoidal_weights_times(t_max, ng)
return cls(r_tau, t, w, seed, sig_min, verbose=verbose)
def new_instance_with_mid_point_weights(cls, r_tau, t_max, ng, seed, sig_min, verbose=1):
t, w = get_mid_point_weights(t_max, ng)
return cls(r_tau, t, w, seed, sig_min, verbose=verbose)
def new_instance_with_gauss_legendre_weights(cls, r_tau, t_max, ng, seed, sig_min, verbose=1):
t, w = gquad.gauss_nodes_weights_legendre(n=ng, low=0, high=t_max)
return cls(r_tau, t, w, seed, sig_min, verbose=verbose)
def __load(self, fname):
with open(fname, 'rb') as f:
for m in self._dump_members:
setattr(self, m, pickle.load(f))
def __calc_missing(self):
self._num_gp = len(self._s)
self._sqrt_eig_val = np.sqrt(self._eig_val)
self._num_ev = len(self._eig_val)
self._w /= np.sqrt(2)
self._A = self._w.reshape(self._num_gp,1) * self._eig_vec / self._sqrt_eig_val.reshape(1, self._num_ev)
def __dump(self, fname):
with open(fname, 'wb') as f:
for m in self._dump_members:
pickle.dump(getattr(self, m), f, pickle.HIGHEST_PROTOCOL)
def __getstate__(self):
return [getattr(self, atr) for atr in self._dump_members]
def __setstate__(self, state):
for i, atr_value in enumerate(state):
setattr(self, self._dump_members[i], atr_value)
def save_to_file(self, fname):
def get_name(self):
if hasattr(self, 'name'):
return 'unknown'
def new_process(self, yi=None, seed=None):
r"""setup new process
Generates a new set of independent normal random variables :math:`Y_i`
which correspondent to the expansion coefficients of the
Karhunen-Loève expansion for the stochastic process
.. math:: X(t) = \sum_i \sqrt{\lambda_i} Y_i u_i(t)
:param seed: a seed my be given which is passed to the random number generator
if seed != None:
if yi is None:
if self.verbose > 1:
print("generate samples ...")
self._Y = np.random.normal(size=2*self._num_ev).view(np.complex).reshape(self._num_ev,1)
if self.verbose > 1:
self._Y = yi.reshape(self._num_ev,1)
def x_for_initial_time_grid(self):
r"""Get process on initial time grid
Returns the value of the Stochastic Process for
the times given to the constructor in order to discretize the Fredholm
equation. This is equivalent to calling :py:func:`stochastic_process_kle` with the
same weights :math:`w_i` and time grid points :math:`s_i`.
tmp = self._Y / np.sqrt(2) * self._sqrt_eig_val.reshape(self._num_ev,1)
if self.verbose > 1:
print("calc process via matrix prod ...")
res = np.tensordot(tmp, self._eig_vec, axes=([0],[1])).flatten()
if self.verbose > 1:
return res
def time_grid(self):
return self._s
def __call__(self, t):
if isinstance(t, np.ndarray):
return self.x_t_array(t)
return self.x(t)
def _x(self, t):
# _Y # (N_ev, 1 )
tmp = self._Y*self._r_tau(t-self._s.reshape(1, self._num_gp))
# (N_ev, N_gp)
# A # (N_gp, N_ev)
return np.tensordot(tmp, self._A, axes=([1,0],[0,1]))
def get_cache_info(self):
return self.x.cache_info()
def clear_cache(self):
def x_t_array(self, t_array):
t_array = t_array.reshape(1,1,len(t_array)) # (1 , 1 , N_t)
tmp = (self._Y.reshape(self._num_ev,1,1) *
# (N_ev, N_gp, N_t)
# A # (N_gp, N_ev)
# A_j,i = w_j / sqrt(lambda_i) u_i(s_j)
if self.verbose > 1:
print("calc process via matrix prod ...")
res = np.tensordot(tmp, self._A, axes=([1,0],[0,1]))
if self.verbose > 1:
return res
def u_i(self, t_array, i):
r"""get eigenfunction of index i
Returns the i-th eigenfunction corresponding to the i-th eigenvalue
of the discrete Fredholm equation using the interpolation scheme:
.. math:: u_i(t) = \frac{1}{\lambda_i}\sum_j w_j R(t-s_j) u_i(s_j)
:param t_array: 1D time array for which the eigenfunction :math:`u_i`
will be evaluated.
:param i: index of the eigenfunction
:return: 1D array of length ``len(t_array)``
t_array = t_array.reshape(1,len(t_array)) # (1 , N_t)
tmp = self._r_tau(t_array-self._s.reshape(self._num_gp,1))
# (N_gp, N_t)
# A # (N_gp, N_ev)
# A_j,i = w_j / sqrt(lambda_i) u_i(s_j)
return np.sqrt(2)/self._sqrt_eig_val[i]*np.tensordot(tmp, self._A[:,i], axes=([0],[0]))
def u_i_all(self, t_array):
r"""get all eigenfunctions
Returns all eigenfunctions of the discrete Fredholm equation using
the interpolation scheme:
.. math:: u_i(t) = \frac{1}{\lambda_i}\sum_j w_j R(t-s_j) u_i(s_j)
:param t_array: 1D time array for which the eigenfunction :math:`u_i`
will be evaluated.
:return: 2D array of shape ``(len(t_array), number_of_eigenvalues=self._num_ev)``
(note, the number of eigenvalues may be smaller than the number
of grid points because of the selections mechanism implemented
by the value of ``sig_min``)
t_array = t_array.reshape(1,len(t_array)) # (1 , N_t)
tmp = self._r_tau(t_array-self._s.reshape(self._num_gp,1))
# (N_gp, N_t)
# A # (N_gp, N_ev)
# A_j,i = w_j / sqrt(lambda_i) u_i(s_j)
return np.tensordot(tmp, np.sqrt(2)/self._sqrt_eig_val.reshape(1,self._num_ev) * self._A, axes=([0],[0]))
def eigen_vector_i(self, i):
r"""Returns the i-th eigenvector (solution of the discrete Fredhom equation)"""
return self._eig_vec[:,i]
def eigen_vector_i_all(self):
r"""Returns all eigenvectors (solutions of the discrete Fredhom equation)
Note: Note: The maximum number of eigenvalues / eigenfunctions is given by
the number of time grid points passed to the constructor. But due to the
threshold ``sig_min`` (see :py:class:`StocProc`) only those
eigenvalues and corresponding eigenfunctions which satisfy
:math:`\mathtt{sig_{toll}} \geq \sqrt{\lambda_i}` are kept.
return self._eig_vec
def lambda_i(self, i):
r"""Returns the i-th eigenvalue."""
return self._eig_val[i]
def lambda_i_all(self):
r"""Returns all eigenvalues."""
return self._eig_val
def num_ev(self):
r"""Returns the number of eigenvalues / eigenfunctions used
Note: The maximum number of eigenvalues / eigenfunctions is given by
the number of time grid points passed to the constructor. But due to the
threshold ``sig_min`` (see :py:class:`StocProc`) only those
eigenvalues and corresponding eigenfunctions which satisfy
:math:`\mathtt{sig_{toll}} \geq \sqrt{\lambda_i}` are kept.
return self._num_ev
def recons_corr(self, t_array):
r"""computes the interpolated correlation functions
For the Karhunen-Loève expansion of a stochastic process the
correlation function can be expressed as follows:
.. math:: R(t,s) = \langle X(t)X^\ast(s)\rangle = \sum_{n,m} \langle X_n X^\ast_m \rangle u_n(t) u^\ast_m(s) = \sum_n \lambda_n u_n(t) u^\ast_n(s)
With that one can do a consistency check whether the finite set of basis functions
for the expansion (the solutions of the discrete Fredholm equation) is good
enough to reproduce the given correlation function.
u_i_all_t = self.u_i_all(t_array) #(N_gp, N_ev)
u_i_all_ast_s = np.conj(u_i_all_t) #(N_gp, N_ev)
lambda_i_all = self.lambda_i_all() #(N_ev)
tmp = lambda_i_all.reshape(1, self._num_ev) * u_i_all_t #(N_gp, N_ev)
return np.tensordot(tmp, u_i_all_ast_s, axes=([1],[1]))
def recons_corr_single_s(self, t_array, s):
u_i_all_t = self.u_i_all(t_array) #(N_gp, N_ev)
u_i_all_ast_s = np.conj(self.u_i_all(np.asarray([s]))) #(1, N_ev)
lambda_i_all = self.lambda_i_all() #(N_ev)
tmp = lambda_i_all.reshape(1, self._num_ev) * u_i_all_t #(N_gp, N_ev)
return np.tensordot(tmp, u_i_all_ast_s, axes=([1],[1]))[:,0]
class StocProc_KLE(StocProc):
def __init__(self, r_tau, t_max, num_grid_points, ng_fredholm=None, seed=None, sig_min=1e-5, verbose=1):
class to simulate stocastic processes using KLE method
- solve fredholm equation on grid with ng_fregholm nodes (trapezoidal_weights)
- calculate process (using interpolation solution of fredholm equation) with num_grid_points nodes
- ivoke cubic spline interpolator when calling
if ng_fredholm is None:
ng_fredholm = num_grid_points
if num_grid_points < ng_fredholm:
print("WARNING: found 'num_grid_points < ng_fredholm' -> set 'num_grid_points == ng_fredholm'")
if ng_fredholm == num_grid_points:
self.kle_interp = False
self.kle_interp = True
self.stocproc = StocProc.new_instance_with_trapezoidal_weights(r_tau = r_tau,
t_max = t_max,
ng = ng_fredholm,
sig_min = sig_min,
seed = seed,
verbose = verbose)
self._r_tau = self.stocproc._r_tau
self._s = self.stocproc._s
self._A = self.stocproc._A
self.stocproc.verbose = verbose
self.verbose = verbose
self.t = np.linspace(0, t_max, num_grid_points)
self._z = self.__cal_z()
if self.verbose > 1:
print("setup interpolator ...")
self.interpolator = ComplexInterpolatedUnivariateSpline(self.t, self._z, k=3)
if self.verbose > 1:
def __cal_z(self):
if self.kle_interp:
return self.stocproc.x_t_array(self.t)
return self.stocproc.x_for_initial_time_grid()
def __call__(self, t):
return self.interpolator(t)
def get_time(self):
return self.t
def get_z(self):
return self._z
def new_process(self, yi = None, seed = None):
self.stocproc.new_process(yi=yi, seed=seed)
self._z = self.__cal_z()
if self.verbose > 1:
print("setup interpolator ...")
self.interpolator = ComplexInterpolatedUnivariateSpline(self.t, self._z, k=3)
if self.verbose > 1:
# def reconst_corr_zero(self, t_array):
class StocProc_FFT(object):
Simulate Stochastic Process using FFT method
def __init__(self, spectral_density, t_max, num_grid_points, seed=None, verbose=1):
self.verbose = verbose
self.t_max = t_max
self.num_grid_points = num_grid_points
self.n_dft = self.num_grid_points * 2 - 1
delta_t = self.t_max / (self.num_grid_points-1)
self.delta_omega = 2 * np.pi / (delta_t * self.n_dft)
#omega axis
omega = self.delta_omega*np.arange(self.n_dft)
#reshape for multiplication with matrix xi
self.sqrt_spectral_density_times_sqrt_delta_omega_over_sqrt_2 = np.sqrt(spectral_density(omega)) * np.sqrt(self.delta_omega) / np.sqrt(2)
if self.verbose > 0:
print("stoc proc fft, spectral density sampling information:")
print(" t_max :", (t_max))
print(" ng :", (num_grid_points))
print(" omega_max :", (self.delta_omega * self.n_dft))
print(" delta_omega:", (self.delta_omega))
self.interpolator = None
self.t = np.linspace(0, self.t_max, self.num_grid_points)
self.new_process(seed = seed)
def __call__(self, t):
if self.interpolator is None:
if self.verbose > 1:
print("setup interpolator ...")
self.interpolator = ComplexInterpolatedUnivariateSpline(self.t, self._z, k=3)
if self.verbose > 1:
return self.interpolator(t)
def get_time(self):
return self.t
def get_z(self):
return self._z
def new_process(self, yi = None, seed = None):
self.interpolator = None
if seed != None:
if self.verbose > 0:
print("use seed", seed)
if yi is None:
#random complex normal samples
if self.verbose > 1:
print("generate samples ...")
yi = np.random.normal(size = 2*self.n_dft).view(np.complex)
if self.verbose > 1:
#each row contain a different integrand
weighted_integrand = self.sqrt_spectral_density_times_sqrt_delta_omega_over_sqrt_2 * yi
#compute integral using fft routine
if self.verbose > 1:
print("calc process via fft ...")
self._z = np.fft.fft(weighted_integrand)[0:self.num_grid_points]
if self.verbose > 1:
def _mean_error(r_t_s, r_t_s_exact):
r"""mean error of the correlation function as function of s
.. math:: \mathtt{err} = \frac{1}{T}\int_0^T |r_\mathm{KLE}(t,r) - r_\mathrm{exact}(t,s)|^2 dt
:return: the mean error ``err``
err = np.mean(np.abs(r_t_s - r_t_s_exact), axis = 0)
return err
def _max_error(r_t_s, r_t_s_exact):
return np.max(np.abs(r_t_s - r_t_s_exact), axis = 0)
def _max_rel_error(r_t_s, r_t_s_exact):
return np.max(np.abs(r_t_s - r_t_s_exact) / np.abs(r_t_s_exact))
def auto_grid_points(r_tau, t_max, tol = 1e-8, err_method = _max_error, name = 'mid_point', sig_min = 1e-4):
err = 1
ng = 64
seed = None
err_method_name = err_method.__name__
print("start auto_grid_points, determine ng ...")
#exponential increase to get below error threshold
while err > tol:
ng *= 2
ng_fine = ng*10
t_fine = np.linspace(0, t_max, ng_fine)
print("new process with {} weights ...".format(name))
stoc_proc = StocProc.new_instance_by_name(name, r_tau, t_max, ng, seed, sig_min)
print("reconstruct correlation function ({} points)...".format(ng_fine))
r_t_s = stoc_proc.recons_corr(t_fine)
print("calculate exact correlation function ...")
r_t_s_exact = r_tau(t_fine.reshape(ng_fine,1) - t_fine.reshape(1, ng_fine))
print("calculate error using {} ...".format(err_method_name))
err = np.max(err_method(r_t_s, r_t_s_exact))
print("ng {} -> err {:.3e}".format(ng, err))
ng_low = ng // 2
ng_high = ng
while (ng_high - ng_low) > 1:
print("ng_l", ng_low)
print("ng_h", ng_high)
ng = (ng_low + ng_high) // 2
print("ng", ng)
ng_fine = ng*10
t_fine = np.linspace(0, t_max, ng_fine)
print("new process with {} weights ...".format(name))
stoc_proc = StocProc.new_instance_by_name(name, r_tau, t_max, ng, seed, sig_min)
print("reconstruct correlation function ({} points)...".format(ng_fine))
r_t_s = stoc_proc.recons_corr(t_fine)
print("calculate exact correlation function ...")
r_t_s_exact = r_tau(t_fine.reshape(ng_fine,1) - t_fine.reshape(1, ng_fine))
print("calculate error using {} ...".format(err_method_name))
err = np.max(err_method(r_t_s, r_t_s_exact))
print("ng {} -> err {:.3e}".format(ng, err))
if err > tol:
print(" err > tol!")
print(" ng_l -> ", ng)
ng_low = ng
print(" err <= tol!")
print(" ng_h -> ", ng)
ng_high = ng
return ng_high
def solve_hom_fredholm(r, w, eig_val_min, verbose=1):
r"""Solves the discrete homogeneous Fredholm equation of the second kind

import pathlib
p = pathlib.PosixPath(os.path.abspath(__file__))
sys.path.insert(0, str(p.parent.parent / 'stocproc'))
sys.path.insert(0, str(p.parent.parent))
import stocproc as sp
@ -561,10 +561,12 @@ def test_auto_grid_points():
r_tau = lambda tau : corr(tau, s_param, gamma_s_plus_1)
# time interval [0,T]
t_max = 15
ng_interpolation = 1000
tol = 1e-8
ng = sp.auto_grid_points(r_tau, t_max, ng_interpolation, tol, sig_min=0)
ng = sp.auto_grid_points(r_tau = r_tau,
t_max = t_max,
tol = tol,
sig_min = 0)
def test_chache():
@ -639,7 +641,7 @@ def show_auto_grid_points_result():
# name = 'trapezoidal'
# name = 'gauss_legendre'
ng = sp.auto_grid_points(r_tau, t_max, ng_interpolation, tol, name=name, sig_min=sig_min)
ng = sp.auto_grid_points(r_tau, t_max, tol, name=name, sig_min=sig_min)
t, w = sp.get_trapezoidal_weights_times(t_max, ng)
stoc_proc = sp.StocProc(r_tau, t, w, seed, sig_min)
@ -647,14 +649,52 @@ def show_auto_grid_points_result():
r_t_s_exact = r_tau(t_large.reshape(ng_interpolation,1) - t_large.reshape(1, ng_interpolation))
diff = sp._mean_error(r_t_s, r_t_s_exact)
diff_max = sp._max_error(r_t_s, r_t_s_exact)
diff = sp.mean_error(r_t_s, r_t_s_exact)
diff_max = sp.max_error(r_t_s, r_t_s_exact)
plt.plot(t_large, diff)
plt.plot(t_large, diff_max)
def test_ui_mem_save():
s_param = 1
gamma_s_plus_1 = gamma(s_param+1)
r_tau = lambda tau : corr(tau, s_param, gamma_s_plus_1)
t_max = 1
N1 = 100
a = 5
N2 = a*(N1 - 1) + 1
t_fine = np.linspace(0, t_max, N2)
assert abs( (t_max/(N1-1)) - a*(t_fine[1]-t_fine[0]) ) < 1e-14, "{}".format(abs( (t_max/(N1-1)) - (t_fine[1]-t_fine[0]) ))
stoc_proc = sp.StocProc.new_instance_with_trapezoidal_weights(r_tau, t_max, ng=N1, sig_min = 1e-4)
for i in range(stoc_proc.num_ev()):
ui_ms = stoc_proc.u_i_mem_save(delta_t_fac=a, i=i)
ui = stoc_proc.u_i(t_fine, i)
# plt.plot(t_fine, np.real(ui_ms), color='k')
# plt.plot(t_fine, np.imag(ui_ms), color='k')
# plt.plot(t_fine, np.real(ui), color='r')
# plt.plot(t_fine, np.imag(ui), color='r')
# plt.plot(stoc_proc._s, np.real(stoc_proc._eig_vec[:,i]), marker = 'o', ls='', color='b')
# plt.plot(stoc_proc._s, np.imag(stoc_proc._eig_vec[:,i]), marker = 'o', ls='', color='b')
# plt.grid()
assert np.allclose(ui_ms, ui), "{}".format(max(np.abs(ui_ms - ui)))
if __name__ == "__main__":
# test_stochastic_process_KLE_correlation_function(plot=False)
@ -664,9 +704,10 @@ if __name__ == "__main__":
# test_stocproc_KLE_splineinterpolation(plot=False)
# test_stochastic_process_FFT_interpolation(plot=False)
# test_stocProc_eigenfunction_extraction()
# test_orthonomality()
# test_auto_grid_points()
# show_auto_grid_points_result()
# test_chache()
# test_dump_load()