This commit is contained in:
Richard Hartmann 2016-10-28 15:47:50 +02:00
parent af824bbf6d
commit 0fd682cb99
4 changed files with 0 additions and 1228 deletions

View file

@ -1,690 +0,0 @@
# -*- coding: utf8 -*-
"""
advanced class to do all sort of things with KLE generated stochastic processes
"""
from __future__ import print_function, division
import functools
import numpy as np
import pickle
import sys
from .stocproc import solve_hom_fredholm
from .stocproc import get_mid_point_weights
from .stocproc import get_trapezoidal_weights_times
from .stocproc import get_simpson_weights_times
import gquad
from . import stocproc_c
from scipy.integrate import quad
from scipy.interpolate import InterpolatedUnivariateSpline
from itertools import product
from math import fsum
class StocProc(object):
r"""Simulate Stochastic Process using Karhunen-Loève expansion
The :py:class:`StocProc` class provides methods to simulate stochastic processes
:math:`X(t)` in a given time interval :math:`[0,t_\mathrm{max}]`
with given autocorrelation function
:math:`R(\tau) = R(t-s) = \langle X(t)X^\ast(s)\rangle`. The method is similar to
the one described and implemented in :py:func:`stochastic_process_kle`.
The :py:class:`StocProc` class extends the functionality of the
:py:func:`stochastic_process_kle` routine by providing an interpolation
method based on the numeric solution of the Fredholm equation.
Since the time discrete solutions :math:`u_i(s_j)` of the Fredholm equation
are best interpolates using
.. math:: u_i(t) = \frac{1}{\lambda_i}\sum_j w_j R(t-s_j) u_i(s_j)
with :math:`s_j` and :math:`w_j` being the time grid points and integration weights
for the numeric integrations as well as :math:`\lambda_i` and :math:`u_i` being
the eigenvalues the time discrete eigenvectors of the discrete Fredholm equation
(see Ref. [1]).
From that is follows that a good interpolation formula for the stochastic process
is given by
.. math:: X(t) = \sum_i \sqrt{\lambda_i} Y_i u_i(t) = \sum_{i,j} \frac{Y_i}{\sqrt{\lambda_i}}w_j R(t-s_j) u_i(s_j)
where the :math:`Y_i` are independent normal distributed random variables
with variance one.
For extracting values of the Stochastic Process you may use:
:py:func:`x`: returns the value of the Stochastic Process for a
single time :math:`t`
:py:func:`x_t_array`: returns the value of the Stochastic Process for
all values of the `numpy.ndarray` a single time :math:`t_\mathrm{array}`.
:py:func:`x_for_initial_time_grid`: returns the value of the Stochastic Process for
the times given to the constructor in order to discretize the Fredholm
equation. This is equivalent to calling :py:func:`stochastic_process_kle` with the
same weights :math:`w_i` and time grid points :math:`s_i`.
To generate a new process call :py:func:`new_process`.
To generate a new sample use :py:func:`new_process`.
:param r_tau: function object of the one parameter correlation function
:math:`R(\tau) = R (t-s) = \langle X(t) X^\ast(s) \rangle`
:param t: list of grid points for the time axis
:param w: appropriate weights to integrate along the time axis using the
grid points given by :py:obj:`t`
:param seed: seed for the random number generator used
:param sig_min: minimal standard deviation :math:`\sigma_i` the random variable :math:`X_i = \sigma_i Y_i`,
viewed as coefficient for the base function :math:`u_i(t)`, must have to be considered as
significant for the Karhunen-Loève expansion (note: :math:`\sigma_i` results from the
square root of the eigenvalue :math:`\lambda_i`)
For further reading see :py:func:`stochastic_process_kle`.
References:
[1] Press, W.H., Teukolsky, S.A., Vetterling, W.T., Flannery, B.P.,
2007. Numerical Recipes 3rd Edition: The Art of Scientific Computing,
Auflage: 3. ed. Cambridge University Press, Cambridge, UK ; New York. (pp. 989)
"""
_dump_members = ['_r_tau',
'_s',
'_w',
'_eig_val',
'_eig_vec']
def __init__(self,
r_tau = None,
t = None,
w = None,
seed = None,
sig_min = 1e-4,
fname = None,
cache_size = 1024,
verbose = 1,
align_eig_vec = False):
self.verbose = verbose
self._one_over_sqrt_2 = 1/np.sqrt(2)
if fname is None:
assert r_tau is not None
self._r_tau = r_tau
assert t is not None
self._s = t
self._num_gp = len(self._s)
assert w is not None
self._w = w
r = StocProc._calc_corr_matrix(self._s, self._r_tau)
# solve discrete Fredholm equation
# eig_val = lambda
# eig_vec = u(t)
self._eig_val, self._eig_vec = solve_hom_fredholm(r, w, sig_min**2, verbose=self.verbose)
if align_eig_vec:
for i in range(self._eig_vec.shape[1]):
s = np.sum(self._eig_vec[:,i])
phase = np.exp(1j*np.arctan2(np.real(s), np.imag(s)))
self._eig_vec[:,i]/= phase
else:
self.__load(fname)
self.__calc_missing()
self.my_cache_decorator = functools.lru_cache(maxsize=cache_size, typed=False)
self.x = self.my_cache_decorator(self._x)
self.new_process(seed = seed)
@staticmethod
def _calc_corr_matrix(s, bcf):
"""calculates the matrix alpha_ij = bcf(t_i-s_j)
calls bcf only for s-s_0 and reconstructs the rest
"""
n_ = len(s)
bcf_n_plus = bcf(s-s[0])
# [bcf(-3) , bcf(-2) , bcf(-1) , bcf(0), bcf(1), bcf(2), bcf(3)]
# == [bcf(3)^\ast, bcf(2)^\ast, bcf(1)^\ast, bcf(0), bcf(1), bcf(2), bcf(3)]
bcf_n = np.hstack((np.conj(bcf_n_plus[-1:0:-1]), bcf_n_plus))
# we want
# r = bcf(0) bcf(-1), bcf(-2)
# bcf(1) bcf( 0), bcf(-1)
# bcf(2) bcf( 1), bcf( 0)
r = np.empty(shape=(n_,n_), dtype = np.complex128)
for i in range(n_):
idx = n_-1-i
r[:,i] = bcf_n[idx:idx+n_]
return r
@classmethod
def new_instance_by_name(cls, name, r_tau, t_max, ng, seed, sig_min, verbose=1, align_eig_vec=False):
"""create a new StocProc instance where the weights are given by name"""
known_names = ['trapezoidal', 'mid_point', 'simpson', 'gauss_legendre']
if name == 'trapezoidal':
ob = cls.new_instance_with_trapezoidal_weights(r_tau, t_max, ng, seed, sig_min, verbose, align_eig_vec)
elif name == 'mid_point':
ob = cls.new_instance_with_mid_point_weights(r_tau, t_max, ng, seed, sig_min, verbose, align_eig_vec)
elif name == 'simpson':
ob = cls.new_instance_with_simpson_weights(r_tau, t_max, ng, seed, sig_min, verbose, align_eig_vec)
elif name == 'gauss_legendre':
ob = cls.new_instance_with_gauss_legendre_weights(r_tau, t_max, ng, seed, sig_min, verbose, align_eig_vec)
else:
raise RuntimeError("unknown name '{}' to create StocProc instance\nknown names are {}".format(name, known_names))
ob.name = name
return ob
@classmethod
def new_instance_with_trapezoidal_weights(cls, r_tau, t_max, ng, seed=None, sig_min=0, verbose=1, align_eig_vec=False):
"""use trapezoidal weights (see :py:func:`get_trapezoidal_weights_times`)"""
t, w = get_trapezoidal_weights_times(t_max, ng)
return cls(r_tau, t, w, seed, sig_min, verbose=verbose, align_eig_vec=align_eig_vec)
@classmethod
def new_instance_with_simpson_weights(cls, r_tau, t_max, ng, seed=None, sig_min=0, verbose=1, align_eig_vec=False):
"""use simpson weights (see :py:func:`get_simpson_weights_times`)"""
t, w = get_simpson_weights_times(t_max, ng)
return cls(r_tau, t, w, seed, sig_min, verbose=verbose, align_eig_vec=align_eig_vec)
@classmethod
def new_instance_with_mid_point_weights(cls, r_tau, t_max, ng, seed=None, sig_min=0, verbose=1, align_eig_vec=False):
"""use mid-point weights (see :py:func:`get_mid_point_weights`)"""
t, w = get_mid_point_weights(t_max, ng)
return cls(r_tau, t, w, seed, sig_min, verbose=verbose, align_eig_vec=align_eig_vec)
@classmethod
def new_instance_with_gauss_legendre_weights(cls, r_tau, t_max, ng, seed=None, sig_min=0, verbose=1, align_eig_vec=False):
"""use gauss legendre weights (see :py:func:`gauss_nodes_weights_legendre`)"""
t, w = gquad.gauss_nodes_weights_legendre(n=ng, low=0, high=t_max)
return cls(r_tau, t, w, seed, sig_min, verbose=verbose, align_eig_vec=align_eig_vec)
def __load(self, fname):
with open(fname, 'rb') as f:
for m in self._dump_members:
setattr(self, m, pickle.load(f))
def __calc_missing(self):
self._num_gp = len(self._s)
self._sqrt_eig_val = np.sqrt(self._eig_val)
self._num_ev = len(self._eig_val)
self._A = self._w.reshape(self._num_gp,1) * self._eig_vec / self._sqrt_eig_val.reshape(1, self._num_ev)
def __dump(self, fname):
with open(fname, 'wb') as f:
for m in self._dump_members:
pickle.dump(getattr(self, m), f, pickle.HIGHEST_PROTOCOL)
def __getstate__(self):
return [getattr(self, atr) for atr in self._dump_members]
def __setstate__(self, state):
for i, atr_value in enumerate(state):
setattr(self, self._dump_members[i], atr_value)
self.__calc_missing()
def save_to_file(self, fname):
self.__dump(fname)
def get_name(self):
if hasattr(self, 'name'):
return self.name
else:
return 'unknown'
def new_process(self, yi=None, seed=None):
r"""setup new process
Generates a new set of independent normal random variables :math:`Y_i`
which correspondent to the expansion coefficients of the
Karhunen-Loève expansion for the stochastic process
.. math:: X(t) = \sum_i \sqrt{\lambda_i} Y_i u_i(t)
:param seed: a seed my be given which is passed to the random number generator
"""
if seed != None:
np.random.seed(seed)
self.clear_cache()
if yi is None:
if self.verbose > 1:
print("generate samples ...")
self._Y = np.random.normal(scale = self._one_over_sqrt_2, size=2*self._num_ev).view(np.complex).reshape(self._num_ev,1)
if self.verbose > 1:
print("done!")
else:
self._Y = yi.reshape(self._num_ev,1)
self._a_tmp = np.tensordot(self._Y[:,0], self._A, axes=([0],[1]))
def x_for_initial_time_grid(self):
r"""Get process on initial time grid
Returns the value of the Stochastic Process for
the times given to the constructor in order to discretize the Fredholm
equation. This is equivalent to calling :py:func:`stochastic_process_kle` with the
same weights :math:`w_i` and time grid points :math:`s_i`.
"""
tmp = self._Y * self._sqrt_eig_val.reshape(self._num_ev,1)
if self.verbose > 1:
print("calc process via matrix prod ...")
res = np.tensordot(tmp, self._eig_vec, axes=([0],[1])).flatten()
if self.verbose > 1:
print("done!")
return res
def time_grid(self):
return self._s
def __call__(self, t):
if isinstance(t, np.ndarray):
return self.x_t_array(t)
else:
return self.x(t)
def _x(self, t):
"""calculates the stochastic process at time t"""
R = self._r_tau(t-self._s)
res = np.tensordot(R, self._a_tmp, axes=([0],[0]))
return res
def get_cache_info(self):
return self.x.cache_info()
def clear_cache(self):
self.x.cache_clear()
def x_t_array(self, t_array):
"""calculates the stochastic process at several times [t_i]"""
R = self._r_tau(t_array.reshape(1,-1,)-self._s.reshape(-1, 1)) # because t_array can be anything
# it can not be optimized with _calc_corr_matrix
res = np.tensordot(R, self._a_tmp, axes=([0],[0]))
return res
def x_t_mem_save(self, delta_t_fac, kahanSum=False):
"""calculate the stochastic process (SP) for a certain class fine grids
when the SP is setup with n grid points, which means we know the eigenfunctions
for the n discrete times t_i = i/(n-1)*t_max, i = 0,1,...n-1
with delta_t = t_max/(n-1)
it is efficient to calculate the interpolated process
for finer grids with delta_t_fine = delta_t/delta_t_fac
because we only need to know the bcf on the finer grid
"""
a = delta_t_fac
N1 = len(self._s)
N2 = a * (N1 - 1) + 1
T = self._s[-1]
alpha_k = self._r_tau(np.linspace(-T, T, 2*N2 - 1))
return stocproc_c.z_t(delta_t_fac = delta_t_fac,
N1 = N1,
alpha_k = alpha_k,
a_tmp = self._a_tmp,
kahanSum = kahanSum)
def x_t_fsum(self, t):
"""slow fsum variant for testing / development reasons
"""
alpha_k = self._r_tau(t - self._s)
terms = np.asarray([self._Y[a] * alpha_k[i] * self._A[i, a] for (a,i) in product(range(self._num_ev), range(self._num_gp))])
re = fsum(np.real(terms))
im = fsum(np.imag(terms))
return re + 1j*im
def u_i_mem_save(self, delta_t_fac, i):
"""efficient evaluation of the interpolated eigen function on special subgrids"""
a = delta_t_fac
N1 = len(self._s)
N2 = a * (N1 - 1) + 1
T = self._s[-1]
alpha_k = self._r_tau(np.linspace(-T, T, 2*N2 - 1))
return stocproc_c.eig_func_interp(delta_t_fac,
self._s,
alpha_k,
self._w,
self._eig_val[i],
self._eig_vec[:, i])
def u_i(self, t_array, i):
r"""get eigenfunction of index i
Returns the i-th eigenfunction corresponding to the i-th eigenvalue
of the discrete Fredholm equation using the interpolation scheme:
.. math:: u_i(t) = \frac{1}{\lambda_i}\sum_j w_j R(t-s_j) u_i(s_j)
:param t_array: 1D time array for which the eigenfunction :math:`u_i`
will be evaluated.
:param i: index of the eigenfunction
:return: 1D array of length ``len(t_array)``
scales like len(t_array)*num_gp
"""
t_array = t_array.reshape(1,len(t_array)) # (1 , N_t)
tmp = self._r_tau(t_array-self._s.reshape(self._num_gp,1))
# (N_gp, N_t)
# A # (N_gp, N_ev)
# A_j,i = w_j / sqrt(lambda_i) u_i(s_j)
return 1/self._sqrt_eig_val[i]*np.tensordot(tmp, self._A[:,i], axes=([0],[0]))
def u_i_all(self, t_array):
r"""get all eigenfunctions
Returns all eigenfunctions of the discrete Fredholm equation using
the interpolation scheme:
.. math:: u_i(t) = \frac{1}{\lambda_i}\sum_j w_j R(t-s_j) u_i(s_j)
:param t_array: 1D time array for which the eigenfunction :math:`u_i`
will be evaluated.
:return: 2D array of shape ``(len(t_array), number_of_eigenvalues=self._num_ev)``
(note, the number of eigenvalues may be smaller than the number
of grid points because of the selections mechanism implemented
by the value of ``sig_min``)
scales like len(t_array)*num_gp*num_ev
"""
t_array = t_array.reshape(1,len(t_array)) # (1 , N_t)
tmp = self._r_tau(t_array-self._s.reshape(self._num_gp,1))
# (N_gp, N_t)
# A # (N_gp, N_ev)
# A_j,i = w_j / sqrt(lambda_i) u_i(s_j)
return np.tensordot(tmp, 1/self._sqrt_eig_val.reshape(1,self._num_ev) * self._A, axes=([0],[0]))
def u_i_all_mem_save(self, delta_t_fac):
"""efficient evaluation of the interpolated eigen function on special subgrids"""
a = delta_t_fac
N1 = len(self._s)
N2 = a * (N1 - 1) + 1
T = self._s[-1]
alpha_k = self._r_tau(np.linspace(-T, T, 2*N2 - 1))
return stocproc_c.eig_func_all_interp(delta_t_fac = delta_t_fac,
time_axis = self._s,
alpha_k = alpha_k,
weights = self._w,
eigen_val = self._eig_val,
eigen_vec = self._eig_vec)
# print("WARNING! this needs to be cythonized")
# u_res = np.zeros(shape=(N2, self.num_ev()), dtype=np.complex)
# for i in range(self.num_ev()):
# for j in range(N2):
# for l in range(N1):
# k = j - a*l + N2-1
# u_res[j, i] += self._w[l] * alpha_k[k] * self._eig_vec[l, i]
#
# u_res[:, i] /= self._eig_val[i]
#
# return u_res
def t_mem_save(self, delta_t_fac):
T = self._s[-1]
N = len(self._s)
return np.linspace(0, T, delta_t_fac*(N-1) + 1)
def eigen_vector_i(self, i):
r"""Returns the i-th eigenvector (solution of the discrete Fredhom equation)"""
return self._eig_vec[:,i]
def eigen_vector_i_all(self):
r"""Returns all eigenvectors (solutions of the discrete Fredhom equation)
Note: Note: The maximum number of eigenvalues / eigenfunctions is given by
the number of time grid points passed to the constructor. But due to the
threshold ``sig_min`` (see :py:class:`StocProc`) only those
eigenvalues and corresponding eigenfunctions which satisfy
:math:`\mathtt{sig_{toll}} \geq \sqrt{\lambda_i}` are kept.
"""
return self._eig_vec
def lambda_i(self, i):
r"""Returns the i-th eigenvalue."""
return self._eig_val[i]
def lambda_i_all(self):
r"""Returns all eigenvalues."""
return self._eig_val
def num_ev(self):
r"""Returns the number of eigenvalues / eigenfunctions used
Note: The maximum number of eigenvalues / eigenfunctions is given by
the number of time grid points passed to the constructor. But due to the
threshold ``sig_min`` (see :py:class:`StocProc`) only those
eigenvalues and corresponding eigenfunctions which satisfy
:math:`\mathtt{sig_{toll}} \geq \sqrt{\lambda_i}` are kept.
"""
return self._num_ev
def recons_corr(self, t_array):
r"""computes the interpolated correlation functions
For the Karhunen-Loève expansion of a stochastic process the
correlation function can be expressed as follows:
.. math:: R(t,s) = \langle X(t)X^\ast(s)\rangle = \sum_{n,m} \langle X_n X^\ast_m \rangle u_n(t) u^\ast_m(s) = \sum_n \lambda_n u_n(t) u^\ast_n(s)
With that one can do a consistency check whether the finite set of basis functions
for the expansion (the solutions of the discrete Fredholm equation) is good
enough to reproduce the given correlation function.
"""
u_i_all_t = self.u_i_all(t_array) #(N_gp, N_ev)
u_i_all_ast_s = np.conj(u_i_all_t) #(N_gp, N_ev)
lambda_i_all = self.lambda_i_all() #(N_ev)
tmp = lambda_i_all.reshape(1, self._num_ev) * u_i_all_t #(N_gp, N_ev)
return np.tensordot(tmp, u_i_all_ast_s, axes=([1],[1]))
def recons_corr_single_s(self, t_array, s):
assert False, "something is wrong here"
u_i_all_t = self.u_i_all(t_array) #(N_gp, N_ev)
u_i_all_ast_s = np.conj(self.u_i_all(np.asarray([s]))) #(1, N_ev)
lambda_i_all = self.lambda_i_all() #(N_ev)
tmp = lambda_i_all.reshape(1, self._num_ev) * u_i_all_t #(N_gp, N_ev)
return np.tensordot(tmp, u_i_all_ast_s, axes=([1],[1]))[:,0]
def recons_corr_memsave(self, delta_t_fac):
u_i_all_t = self.u_i_all_mem_save(delta_t_fac) #(N_gp, N_ev)
u_i_all_ast_s = np.conj(u_i_all_t) #(N_gp, N_ev)
lambda_i_all = self.lambda_i_all() #(N_ev)
tmp = lambda_i_all.reshape(1, self._num_ev) * u_i_all_t #(N_gp, N_ev)
return np.tensordot(tmp, u_i_all_ast_s, axes=([1],[1]))
def get_num_ef(self, rel_threshold):
G = self._sqrt_eig_val
return get_num_ef(G, rel_threshold)
def get_largest_indices(self, rel_threshold):
G = self._sqrt_eig_val
return get_largest_indices(G, rel_threshold)
def check_integral_eq(self, index, delta_t_fac = 4, num_t = 50):
t = self.t_mem_save(delta_t_fac)
u_t_discrete = self.u_i_mem_save(delta_t_fac, index)
tmax = self._s[-1]
G = self._sqrt_eig_val[index]
bcf = self._r_tau
data, norm = check_integral_eq(G, u_t_discrete, t, tmax, bcf, num_t)
return u_t_discrete, data, norm
def get_num_ef(G, rel_threshold):
# print("WARNING: debugging check for sorted G still active!")
# g_old = np.Inf
# for g in G:
# assert g_old >= g
# g_old = g
# G must be in decreasing order
return int(sum(G/max(G) >= rel_threshold))
def get_largest_indices(G, rel_threshold):
print("WARNING: debugging check for sorted G still active!")
g_old = np.Inf
for g in G:
assert g_old >= g
g_old = g
# G must be in decreasing order
idx = sum(G/max(G) >= rel_threshold)
idx_selection = np.arange(0, idx)
return idx_selection
def check_integral_eq(G, U, t_U, tmax, bcf, num_t=50, limit=5000, c=None):
u_t = ComplexInterpolatedUnivariateSpline(t_U, U, k=3)
data = np.empty(shape=(num_t, 2), dtype = np.complex128)
tau = np.linspace(0, tmax, num_t)
for i, tau_ in enumerate(tau):
data[i, 0] = complex_quad(lambda s: bcf(tau_-s) * u_t(s), 0, tmax, limit=limit)
data[i, 1] = G**2*u_t(tau_)
if c is not None:
with c.get_lock():
c.value += 1
norm = quad(lambda s: np.abs(u_t(s))**2, 0, tmax, limit=limit)[0]
return data, norm
def mean_error(r_t_s, r_t_s_exact):
r"""mean error of the correlation function as function of s
.. math:: \mathrm{err} = \frac{1}{T}\int_0^T |r_\mathrm{KLE}(t,r) - r_\mathrm{exact}(t,s)|^2 dt
:return: the mean error ``err``
"""
err = np.mean(np.abs(r_t_s - r_t_s_exact), axis = 0)
return err
def max_error(r_t_s, r_t_s_exact):
return np.max(np.abs(r_t_s - r_t_s_exact), axis = 0)
def max_rel_error(r_t_s, r_t_s_exact):
return np.max(np.abs(r_t_s - r_t_s_exact) / np.abs(r_t_s_exact))
def recons_corr_and_get_bcf(T, ng, w, eig_val, eig_vec, bcf):
"""
doing things here again for efficiency reasons
"""
delta_t_fac = 2
N1 = ng
N2 = delta_t_fac * (N1 - 1) + 1
bcf_n_plus = bcf(np.linspace(0, T, N2))
bcf_n = np.hstack((np.conj(bcf_n_plus[-1:0:-1]), bcf_n_plus))
u_i_all_t = stocproc_c.eig_func_all_interp(delta_t_fac = delta_t_fac,
time_axis = np.linspace(0, T, N1),
alpha_k = bcf_n,
weights = w,
eigen_val = eig_val,
eigen_vec = eig_vec)
u_i_all_ast_s = np.conj(u_i_all_t) #(N_gp, N_ev)
num_ev = len(eig_val)
tmp = eig_val.reshape(1, num_ev) * u_i_all_t #(N_gp, N_ev)
recs_bcf = np.tensordot(tmp, u_i_all_ast_s, axes=([1],[1]))
refc_bcf = np.empty(shape=(N2,N2), dtype = np.complex128)
for i in range(N2):
idx = N2-1-i
refc_bcf[:,i] = bcf_n[idx:idx+N2]
return recs_bcf, refc_bcf
def auto_grid_points(r_tau, t_max, tol = 1e-3, err_method = max_rel_error, name = 'simpson', sig_min = 1e-6, verbose=1):
err = 1
c = 2
seed = None
err_method_name = err_method.__name__
if verbose > 0:
print("start auto_grid_points, determine ng ...")
#exponential increase to get below error threshold
while err > tol:
c *= 2
ng = 2*c + 1
ng_fine = ng*2-1
if verbose == 1:
print("ng:{} new proc ({}) ... ".format(ng, name), end='')
sys.stdout.flush()
if verbose > 1:
print("#"*40)
print("c", c, "ng", ng)
print("new process with {} weights ...".format(name))
stoc_proc = StocProc.new_instance_by_name(name, r_tau, t_max, ng, seed, sig_min, verbose-1)
if verbose > 1:
print("reconstruct correlation function ({} points)...".format(ng_fine))
r_t_s, r_t_s_exact = recons_corr_and_get_bcf(T = t_max,
ng = ng,
w = stoc_proc._w,
eig_val = stoc_proc._eig_val,
eig_vec = stoc_proc._eig_vec,
bcf = r_tau)
if verbose > 1:
print("calculate error using {} ...".format(err_method_name))
err = np.max(err_method(r_t_s, r_t_s_exact))
if verbose > 0:
print("err {:.3e}".format(err))
c_low = c // 2
c_high = c
while (c_high - c_low) > 1:
if verbose > 1:
print("#"*40)
print("c_low", c_low)
print("c_high", c_high)
c = (c_low + c_high) // 2
ng = 2*c + 1
ng_fine = ng * 2 - 1
if verbose > 1:
print("c", c)
print("ng", ng)
print("ng_fine", ng_fine)
if verbose == 1:
print("ng:{} new proc ({}) ... ".format(ng, name), end='')
sys.stdout.flush()
if verbose > 1:
print("new process with {} weights ({} points)...".format(name, ng))
stoc_proc = StocProc.new_instance_by_name(name, r_tau, t_max, ng, seed, sig_min, verbose-1)
if verbose > 1:
print("reconstruct correlation function ({} points)...".format(ng_fine))
r_t_s, r_t_s_exact = recons_corr_and_get_bcf(T = t_max,
ng = ng,
w = stoc_proc._w,
eig_val = stoc_proc._eig_val,
eig_vec = stoc_proc._eig_vec,
bcf = r_tau)
if verbose > 1:
print("calculate error using {} ...".format(err_method_name))
err = np.max(err_method(r_t_s, r_t_s_exact))
if verbose > 0:
print("err {:.3e}".format(err))
if err > tol:
if verbose > 1:
print(" err > tol!")
print(" c_low -> ", c)
c_low = c
else:
if verbose > 1:
print(" err <= tol!")
print(" c_high -> ", c)
c_high = c
return ng

View file

@ -1,101 +0,0 @@
# -*- coding: utf8 -*-
from __future__ import print_function, division
"""
module to generate the weights and nodes for Guass quadrature
inspired by pyOrthpol (https://pypi.python.org/pypi/orthpol)
as well as the original fortran resource from
Gautschi, W. (1994). Algorithm 726:
ORTHPOLa package of routines for generating orthogonal polynomials
and Gauss-type quadrature rules.
ACM Transactions on Mathematical Software (TOMS), 20(1), 2162.
doi:10.1145/174603.174605
"""
import numpy as np
import numpy.polynomial as pln
from scipy.linalg import eig_banded
from scipy.special import gamma
def _recur_laguerre(n, al=0.):
r"""Calculate the recursion coefficients leading to the
Laguerre polynomials motivated by the Gauss quadrature
formula for integrals with exponential weights ~exp(-x)
see Theodore Seio Chihara,
An Introduction to Orthogonal Polynomials, 1978, p.217
"""
nrange = np.arange(n)
a = 2*nrange + al + 1
b = nrange*(nrange+al)
b[0] = gamma(al + 1.)
return (a, b)
def gauss_nodes_weights_laguerre(n, al=0.):
r"""
.. math::
\int_0^\infty dx \; f(x) x^\alpha \exp(-x) ~= \sum_{i=1}^n w_i f(x_i)
"""
a, b = _recur_laguerre(n, al)
return _gauss_nodes_weights(a, b)
def _recur_legendre(n):
nrange = np.arange(n, dtype = float)
a = np.zeros(n)
b = nrange**2 / ((2*nrange - 1)*(2*nrange + 1))
b[0] = 2
return (a, b)
def gauss_nodes_weights_legendre(n, low=-1, high=1):
r"""
.. math::
\int_{-1}^{1} dx \; f(x) ~= \sum_{i=1}^n w_i f(x_i)
"""
a, b = _recur_legendre(n)
x, w= _gauss_nodes_weights(a, b)
fac = (high-low)/2
return (x + 1)*fac + low, fac*w
def _gauss_nodes_weights(a,b):
r"""Calculate the nodes and weights for given
recursion coefficients assuming a normalized
weights functions.
see Walter Gautschi, Algorithm 726: ORTHPOL;
a Package of Routines for Generating Orthogonal
Polynomials and Gauss-type Quadrature Rules, 1994
"""
assert len(a) == len(b)
a_band = np.vstack((np.sqrt(b),a))
w, v = eig_banded(a_band)
nodes = w # eigenvalues
weights = b[0] * v[0,:]**2 # first component of each eigenvector
# the prefactor b[0] from the original paper
# accounts for the weights of unnormalized weight functions
return nodes, weights
def get_poly(a, b):
n = len(a)
assert len(b) == n
p = []
p.append( 0 )
p.append( pln.Polynomial(coef=(1,)) )
x = pln.Polynomial(coef=(0,1))
for i in range(n):
p_i = (x - a[i]) * p[-1] - b[i] * p[-2]
p.append( p_i )
return p[1:]

View file

@ -1,117 +0,0 @@
# -*- coding: utf8 -*-
from __future__ import print_function, division
import numpy as np
from scipy.optimize import brentq
from . import class_stocproc
def get_param_single_lorentz(tmax, dw_max, eta, gamma, wc, x=1e-4, verbose=0):
d = gamma * np.sqrt(1/x - 1)
w_min = wc - d
w_max = wc + d
if verbose > 0:
print('w_min :{:.3}'.format(w_min))
print('w_max :{:.3}'.format(w_max))
C = (w_max - w_min)*tmax / 2 / np.pi
N = int(np.ceil((2 + C)/2 + np.sqrt( (2+C)**2 / 4 - 1)))
dw = (w_max - w_min)/N
if verbose > 0:
print('N: {}'.format(N))
print('-> dw: {:.3}'.format(dw))
if dw <= dw_max:
if verbose > 0:
print('dw <= dw_max: {:.3}'.format(dw_max))
return N, w_min, tmax
else:
if verbose > 0:
print('dw > dw_max: {:.3}'.format(dw_max))
print('adjust tmax and N to fulfill dw <= dw_max')
N = int(np.ceil((w_max - w_min) / dw_max)) - 1
dt = 2*np.pi / (dw_max*N)
tmax_ = dt*N
if verbose > 0:
print('N: {}'.format(N))
print('-> tmax: {:.3}'.format(tmax_))
assert tmax_ > tmax, "tmax_={} > tmax={} FAILED".format(tmax_, tmax)
return N, w_min, tmax
def get_param_ohmic(t_max, spec_dens, x=1e-12, verbose=0):
fmin_spec_dens = lambda w: abs(spec_dens(w)) - spec_dens.maximum_val()*x
w_pos = spec_dens.maximum_at()
w_neg = 2*w_pos
while fmin_spec_dens(w_neg) > 0:
w_neg *= 2
omega_max = brentq(fmin_spec_dens, w_pos, w_neg)
if verbose > 0:
print("omega_max from threshold condition: J(w_max) = x = {:.3g} <-> w_max = {:.3g}".format(x, omega_max))
dw = np.pi / t_max
if verbose > 0:
print("dw:{:.3}".format(dw))
ng_omega = np.ceil(omega_max / dw) # next larger integer
ng_omega = np.ceil(ng_omega / 2) * 2 - 1 # next lager odd integer
ng_t = (ng_omega + 1) / 2 # so this becomes an integer
delta_t = 2 * np.pi / (dw * ng_omega)
sp_t_max = ng_t * delta_t
if verbose > 0:
print("result ng_t: {}".format(ng_t))
print("result sp_t_max: {:.3g}".format(sp_t_max))
return ng_t, sp_t_max
def show_ohmic_sp(ng_t, sp_t_max, spec_dens, seed, ax, t_max):
try:
n = len(ng_t)
except:
n = None
try:
m = len(sp_t_max)
except:
m = None
if (n is None) and m is not None:
ng_t = [ng_t] * m
elif (n is not None) and m is None:
sp_t_max = [sp_t_max] * n
elif (n is not None) and (m is not None):
if n != m:
raise ValueError("len(ng_t) == len(sp_t_max) FAILED")
else:
ng_t = [ng_t]
sp_t_max = [sp_t_max]
for i in range(len(ng_t)):
spfft = class_stocproc.StocProc_FFT(spectral_density = spec_dens,
t_max = sp_t_max[i],
num_grid_points = ng_t[i],
seed = seed)
spfft.new_process()
t = np.linspace(0, sp_t_max[i], ng_t[i])
t_interp = np.linspace(0, sp_t_max[i], 10*ng_t[i])
t = t[np.where(t < t_max)]
t_interp = t_interp[np.where(t_interp < t_max)]
eta = spfft(t)
eta_interp = spfft(t_interp)
p, = ax.plot(t_interp, np.real(eta_interp))
ax.plot(t_interp, np.imag(eta_interp), color=p.get_color(), ls='--')
ax.plot(t, np.real(eta), color=p.get_color(), ls='', marker = '.')
ax.plot(t, np.imag(eta), color=p.get_color(), ls='', marker = '.')

View file

@ -35,327 +35,7 @@ def complex_quad(func, a, b, **kw_args):
I_im = quad(func_im, a, b, **kw_args)[0]
return I_re + 1j * I_im
def solve_hom_fredholm(r, w, eig_val_min, verbose=1):
r"""Solves the discrete homogeneous Fredholm equation of the second kind
.. math:: \int_0^{t_\mathrm{max}} \mathrm{d}s R(t-s) u(s) = \lambda u(t)
Quadrature approximation of the integral gives a discrete representation of the
basis functions and leads to a regular eigenvalue problem.
.. math:: \sum_i w_i R(t_j-s_i) u(s_i) = \lambda u(t_j) \equiv \mathrm{diag(w_i)} \cdot R \cdot u = \lambda u
Note: If :math:`t_i = s_i \forall i` the matrix :math:`R(t_j-s_i)` is
a hermitian matrix. In order to preserve hermiticity for arbitrary :math:`w_i`
one defines the diagonal matrix :math:`D = \mathrm{diag(\sqrt{w_i})}`
with leads to the equivalent expression:
.. math:: D \cdot R \cdot D \cdot D \cdot u = \lambda D \cdot u \equiv \tilde R \tilde u = \lambda \tilde u
where :math:`\tilde R` is hermitian and :math:`u = D^{-1}\tilde u`
Setting :math:`t_i = s_i` and due to the definition of the correlation function the
matrix :math:`r_{ij} = R(t_i, s_j)` is hermitian.
:param r: correlation matrix :math:`R(t_j-s_i)`
:param w: integrations weights :math:`w_i`
(they have to correspond to the discrete time :math:`t_i`)
:param eig_val_min: discards all eigenvalues and eigenvectos with
:math:`\lambda_i < \mathtt{eig\_val\_min} / \mathrm{max}(\lambda)`
:return: eigenvalues, eigenvectos (eigenvectos are stored in the normal numpy fashion, ordered in decreasing order)
"""
# weighted matrix r due to quadrature weights
if verbose > 0:
print("build matrix ...")
# d = np.diag(np.sqrt(w))
# r = np.dot(d, np.dot(r, d))
n = len(w)
w_sqrt = np.sqrt(w)
r = w_sqrt.reshape(n,1) * r * w_sqrt.reshape(1,n)
if verbose > 0:
print("solve eigenvalue equation ...")
eig_val, eig_vec = scipy_eigh(r, overwrite_a=True) # eig_vals in ascending
# use only eigenvalues larger than sig_min**2
min_idx = sum(eig_val < eig_val_min)
eig_val = eig_val[min_idx:][::-1]
eig_vec = eig_vec[:, min_idx:][:, ::-1]
num_of_functions = len(eig_val)
if verbose > 0:
print("use {} / {} eigenfunctions (sig_min = {})".format(num_of_functions, len(w), np.sqrt(eig_val_min)))
# inverse scale of the eigenvectors
# d_inverse = np.diag(1/np.sqrt(w))
# eig_vec = np.dot(d_inverse, eig_vec)
eig_vec = np.reshape(1/w_sqrt, (n,1)) * eig_vec
if verbose > 0:
print("done!")
return eig_val, eig_vec
def stochastic_process_kle(r_tau, t, w, num_samples, seed = None, sig_min = 1e-4, verbose=1):
r"""Simulate Stochastic Process using Karhunen-Loève expansion
Simulate :math:`N_\mathrm{S}` wide-sense stationary stochastic processes
with given correlation :math:`R(\tau) = \langle X(t) X^\ast(s) \rangle = R (t-s)`.
Expanding :math:`X(t)` in a KarhunenLoève manner
.. math:: X(t) = \sum_i X_i u_i(t)
with
.. math::
\langle X_i X^\ast_j \rangle = \lambda_i \delta_{i,j} \qquad \langle u_i | u_j \rangle =
\int_0^{t_\mathrm{max}} u_i(t) u^\ast_i(t) dt = \delta_{i,j}
where :math:`\lambda_i` and :math:`u_i` are solutions of the following
homogeneous Fredholm equation of the second kind.
.. math:: \int_0^{t_\mathrm{max}} \mathrm{d}s R(t-s) u(s) = \lambda u(t)
Discrete solutions are provided by :py:func:`solve_hom_fredholm`.
With these solutions and expressing the random variables :math:`X_i` through
independent normal distributed random variables :math:`Y_i` with variance one
the Stochastic Process at discrete times :math:`t_j` can be calculates as follows
.. math:: X(t_j) = \sum_i Y_i \sqrt{\lambda_i} u_i(t_j)
To calculate the Stochastic Process for abitrary times
:math:`t \in [0,t_\mathrm{max}]` use :py:class:`StocProc`.
References:
[1] Kobayashi, H., Mark, B.L., Turin, W.,
2011. Probability, Random Processes, and Statistical Analysis,
Cambridge University Press, Cambridge. (pp. 363)
[2] Press, W.H., Teukolsky, S.A., Vetterling, W.T., Flannery, B.P.,
2007. Numerical Recipes 3rd Edition: The Art of Scientific Computing,
Auflage: 3. ed. Cambridge University Press, Cambridge, UK ; New York. (pp. 989)
:param r_tau: function object of the one parameter correlation function :math:`R(\tau) = R (t-s) = \langle X(t) X^\ast(s) \rangle`
:param t: list of grid points for the time axis
:param w: appropriate weights to integrate along the time axis using the grid points given by :py:obj:`t`
:param num_samples: number of stochastic process to sample
:param seed: seed for the random number generator used
:param sig_min: minimal standard deviation :math:`\sigma_i` the random variable :math:`X_i`
viewed as coefficient for the base function :math:`u_i(t)` must have to be considered as
significant for the Karhunen-Loève expansion (note: :math:`\sigma_i` results from the
square root of the eigenvalue :math:`\lambda_i`)
:return: returns a 2D array of the shape (num_samples, len(t)). Each row of the returned array contains one sample of the
stochastic process.
"""
if verbose > 0:
print("__ stochastic_process __")
if seed != None:
np.random.seed(seed)
t_row = t
t_col = t_row[:,np.newaxis]
# correlation matrix
# r_tau(t-s) -> integral/sum over s -> s must be row in EV equation
r_old = r_tau(t_col-t_row)
n_ = len(t)
bcf_n_plus = r_tau(t-t[0])
# [bcf(-3) , bcf(-2) , bcf(-1) , bcf(0), bcf(1), bcf(2), bcf(3)]
# == [bcf(3)^\ast, bcf(2)^\ast, bcf(1)^\ast, bcf(0), bcf(1), bcf(2), bcf(3)]
bcf_n = np.hstack((np.conj(bcf_n_plus[-1:0:-1]), bcf_n_plus))
# we want
# r = bcf(0) bcf(-1), bcf(-2)
# bcf(1) bcf( 0), bcf(-1)
# bcf(2) bcf( 1), bcf( 0)
r = np.empty(shape=(n_,n_), dtype = np.complex128)
for i in range(n_):
idx = n_-1-i
r[:,i] = bcf_n[idx:idx+n_]
assert np.max(np.abs(r_old - r)) < 1e-14
# solve discrete Fredholm equation
# eig_val = lambda
# eig_vec = u(t)
eig_val, eig_vec = solve_hom_fredholm(r, w, sig_min**2)
num_of_functions = len(eig_val)
# generate samples
sig = np.sqrt(eig_val).reshape(1, num_of_functions) # variance of the random quantities of the Karhunen-Loève expansion
if verbose > 0:
print("generate samples ...")
x = np.random.normal(scale=1/np.sqrt(2), size=(2*num_samples*num_of_functions)).view(np.complex).reshape(num_of_functions, num_samples).T
# random quantities all aligned for num_samples samples
x_t_array = np.tensordot(x*sig, eig_vec, axes=([1],[1])) # multiplication with the eigenfunctions == base of Karhunen-Loève expansion
if verbose > 0:
print("done!")
return x_t_array
def _stochastic_process_alternative_samples(num_samples, num_of_functions, t, sig, eig_vec, seed):
r"""used for debug and test reasons
generate each sample independently in a for loop
should be slower than using numpy's array operations to do it all at once
"""
np.random.seed(seed)
x_t_array = np.empty(shape=(num_samples, len(t)), dtype = complex)
for i in range(num_samples):
x = np.random.normal(size=num_of_functions) * sig
x_t_array[i,:] = np.dot(eig_vec, x.T)
return x_t_array
def get_mid_point_weights(t_max, num_grid_points):
r"""Returns the time gridpoints and wiehgts for numeric integration via **mid point rule**.
The idea is to use :math:`N_\mathrm{GP}` time grid points located in the middle
each of the :math:`N_\mathrm{GP}` equally distributed subintervals of :math:`[0, t_\mathrm{max}]`.
.. math:: t_i = \left(i + \frac{1}{2}\right)\frac{t_\mathrm{max}}{N_\mathrm{GP}} \qquad i = 0,1, ... N_\mathrm{GP} - 1
The corresponding trivial weights for integration are
.. math:: w_i = \Delta t = \frac{t_\mathrm{max}}{N_\mathrm{GP}} \qquad i = 0,1, ... N_\mathrm{GP} - 1
:param t_max: end of the interval for the time grid :math:`[0,t_\mathrm{max}]`
(note: this would corespond to an integration from :math:`0-\Delta t / 2`
to :math:`t_\mathrm{max}+\Delta t /2`)
:param num_grid_points: number of
"""
# generate mid points
t, delta_t = np.linspace(0, t_max, num_grid_points, retstep = True)
# equal weights for grid points
w = np.ones(num_grid_points)*delta_t
return t, w
def stochastic_process_mid_point_weight(r_tau, t_max, num_grid_points, num_samples, seed = None, sig_min = 1e-4):
r"""Simulate Stochastic Process using Karhunen-Loève expansion with **mid point rule** for integration
The idea is to use :math:`N_\mathrm{GP}` time grid points located in the middle
each of the :math:`N_\mathrm{GP}` equally distributed subintervals of :math:`[0, t_\mathrm{max}]`.
.. math:: t_i = \left(i + \frac{1}{2}\right)\frac{t_\mathrm{max}}{N_\mathrm{GP}} \qquad i = 0,1, ... N_\mathrm{GP} - 1
The corresponding trivial weights for integration are
.. math:: w_i = \Delta t = \frac{t_\mathrm{max}}{N_\mathrm{GP}} \qquad i = 0,1, ... N_\mathrm{GP} - 1
Since the autocorrelation function depends solely on the time difference :math:`\tau` the static shift for :math:`t_i` does not
alter matrix used to solve the Fredholm equation. So for the reason of convenience the time grid points are not centered at
the middle of the intervals, but run from 0 to :math:`t_\mathrm{max}` equally distributed.
Calling :py:func:`stochastic_process` with these calculated :math:`t_i, w_i` gives the corresponding processes.
:param t_max: right end of the considered time interval :math:`[0,t_\mathrm{max}]`
:param num_grid_points: :math:`N_\mathrm{GP}` number of time grid points used for the discretization of the
integral of the Fredholm integral (see :py:func:`stochastic_process`)
:return: returns the tuple (set of stochastic processes, time grid points)
See :py:func:`stochastic_process` for other parameters
"""
t,w = get_trapezoidal_weights_times(t_max, num_grid_points)
return stochastic_process_kle(r_tau, t, w, num_samples, seed, sig_min), t
def get_trapezoidal_weights_times(t_max, num_grid_points):
# generate mid points
t, delta_t = np.linspace(0, t_max, num_grid_points, retstep = True)
# equal weights for grid points
w = np.ones(num_grid_points)*delta_t
w[0] /= 2
w[-1] /= 2
return t, w
def stochastic_process_trapezoidal_weight(r_tau, t_max, num_grid_points, num_samples, seed = None, sig_min = 1e-4):
r"""Simulate Stochastic Process using Karhunen-Loève expansion with **trapezoidal rule** for integration
.. math:: t_i = i \frac{t_\mathrm{max}}{N_\mathrm{GP}} = i \Delta t \qquad i = 0,1, ... N_\mathrm{GP}
The corresponding weights for integration are
.. math:: w_0 = w_{N_\mathrm{GP}} = \frac{\Delta t}{2}, \qquad w_i = \Delta t = \qquad i = 1, ... N_\mathrm{GP} - 1
Calling :py:func:`stochastic_process` with these calculated :math:`t_i, w_i` gives the corresponding processes.
:param t_max: right end of the considered time interval :math:`[0,t_\mathrm{max}]`
:param num_grid_points: :math:`N_\mathrm{GP}` number of time grid points used for the discretization of the
integral of the Fredholm integral (see :py:func:`stochastic_process`)
:return: returns the tuple (set of stochastic processes, time grid points)
See :py:func:`stochastic_process` for other parameters
"""
t, w = get_trapezoidal_weights_times(t_max, num_grid_points)
return stochastic_process_kle(r_tau, t, w, num_samples, seed, sig_min), t
def get_simpson_weights_times(t_max, num_grid_points):
if num_grid_points % 2 == 0:
raise RuntimeError("simpson weight need odd number of grid points, but git ng={}".format(num_grid_points))
# generate mid points
t, delta_t = np.linspace(0, t_max, num_grid_points, retstep = True)
# equal weights for grid points
w = np.empty(num_grid_points, dtype=np.float64)
w[0::2] = 2/3*delta_t
w[1::2] = 4/3*delta_t
w[0] = 1/3*delta_t
w[-1] = 1/3*delta_t
return t, w
def stochastic_process_simpson_weight(r_tau, t_max, num_grid_points, num_samples, seed = None, sig_min = 1e-4):
r"""Simulate Stochastic Process using Karhunen-Loève expansion with **simpson rule** for integration
Calling :py:func:`stochastic_process` with these calculated :math:`t_i, w_i` gives the corresponding processes.
:param t_max: right end of the considered time interval :math:`[0,t_\mathrm{max}]`
:param num_grid_points: :math:`N_\mathrm{GP}` number of time grid points (need to be odd) used for the discretization of the
integral of the Fredholm integral (see :py:func:`stochastic_process`)
:return: returns the tuple (set of stochastic processes, time grid points)
See :py:func:`stochastic_process` for other parameters
"""
t, w = get_simpson_weights_times(t_max, num_grid_points)
return stochastic_process_kle(r_tau, t, w, num_samples, seed, sig_min), t
def _FT(f, x_max, N, x_min=0):
"""
calculate g(y) = int_x_min^x_max dx f(x) exp(-1j x y) using fft
when setting x' = x - x_min we get
g(y) = int_0^x_max-x_min dx' f(x'+x_min) exp(-1j x' y) exp(-1j x_min y) = exp(-1j x_min y) * int_0^x_max-x_min dx' f(x'+x_min) exp(-1j x' y)
and in a discrete fashion with N gp such that x'_i = dx * i and dx = (N-1) / (x_max - x_min)
further we find dy = 2pi / N / dx so we get y_k = dy * k
g_k = exp(-1j x_min y_k) * sum_0^N dx f(x_i + x_min) exp(-1j dx * i dy * k)
using dx * dk = 2 pi / N we end up with
g_k = exp(-1j x_min y_k) * dx * sum_0^N f(x_i + x_min) exp(-1j 2 pi i k / N)
= exp(-1j x_min y_k) * dx * FFT( f(x_i + x_min) )
"""
x, dx = np.linspace(x_min, x_max, N, retstep=True)
f_i = f(x)
dy = 2*np.pi / dx / N
y_k = np.linspace(0, dy*(N-1), N)
return np.exp(-1j * x_min * y_k) * dx * np.fft.fft(f_i), y_k
def stochastic_process_fft(spectral_density, t_max, num_grid_points, num_samples, seed = None, verbose=1, omega_min=0):
r"""Simulate Stochastic Process using FFT method