mirror of
https://github.com/vale981/stocproc
synced 2025-03-05 09:41:42 -05:00
intermed work
This commit is contained in:
parent
116071b8ea
commit
8fdf4ef777
4 changed files with 292 additions and 109 deletions
|
@ -0,0 +1 @@
|
|||
from .stocproc import *
|
256
stocproc.py
256
stocproc.py
|
@ -182,18 +182,15 @@ class StocProc(object):
|
|||
# solve discrete Fredholm equation
|
||||
# eig_val = lambda
|
||||
# eig_vec = u(t)
|
||||
self._eig_val, self._eig_vec = solve_hom_fredholm(r, w, sig_min**2)
|
||||
self._sqrt_eig_val = np.sqrt(self._eig_val)
|
||||
self._num_ev = len(self._eig_val)
|
||||
self._A = self._w.reshape(self._num_gp,1) * self._eig_vec / self._sqrt_eig_val.reshape(1, self._num_ev)
|
||||
|
||||
self._eig_val, self._eig_vec = solve_hom_fredholm(r, w, sig_min**2, verbose=self.verbose)
|
||||
else:
|
||||
self.__load(fname)
|
||||
self.__calc_missing()
|
||||
|
||||
self.__calc_missing()
|
||||
|
||||
self.my_cache_decorator = functools.lru_cache(maxsize=cache_size, typed=False)
|
||||
self.x = self.my_cache_decorator(self._x)
|
||||
self.new_process(seed)
|
||||
self.new_process(seed = seed)
|
||||
|
||||
@classmethod
|
||||
def new_instance_by_name(cls, name, r_tau, t_max, ng, seed, sig_min):
|
||||
|
@ -211,19 +208,19 @@ class StocProc(object):
|
|||
ob.name = name
|
||||
return ob
|
||||
@classmethod
|
||||
def new_instance_with_trapezoidal_weights(cls, r_tau, t_max, ng, seed, sig_min):
|
||||
def new_instance_with_trapezoidal_weights(cls, r_tau, t_max, ng, seed, sig_min, verbose=1):
|
||||
t, w = get_trapezoidal_weights_times(t_max, ng)
|
||||
return cls(r_tau, t, w, seed, sig_min)
|
||||
return cls(r_tau, t, w, seed, sig_min, verbose=verbose)
|
||||
|
||||
@classmethod
|
||||
def new_instance_with_mid_point_weights(cls, r_tau, t_max, ng, seed, sig_min):
|
||||
def new_instance_with_mid_point_weights(cls, r_tau, t_max, ng, seed, sig_min, verbose=1):
|
||||
t, w = get_mid_point_weights(t_max, ng)
|
||||
return cls(r_tau, t, w, seed, sig_min)
|
||||
return cls(r_tau, t, w, seed, sig_min, verbose=verbose)
|
||||
|
||||
@classmethod
|
||||
def new_instance_with_gauss_legendre_weights(cls, r_tau, t_max, ng, seed, sig_min):
|
||||
def new_instance_with_gauss_legendre_weights(cls, r_tau, t_max, ng, seed, sig_min, verbose=1):
|
||||
t, w = gquad.gauss_nodes_weights_legendre(n=ng, low=0, high=t_max)
|
||||
return cls(r_tau, t, w, seed, sig_min)
|
||||
return cls(r_tau, t, w, seed, sig_min, verbose=verbose)
|
||||
|
||||
def __load(self, fname):
|
||||
with open(fname, 'rb') as f:
|
||||
|
@ -234,7 +231,8 @@ class StocProc(object):
|
|||
self._num_gp = len(self._s)
|
||||
self._sqrt_eig_val = np.sqrt(self._eig_val)
|
||||
self._num_ev = len(self._eig_val)
|
||||
self._A = self._w.reshape(self._num_gp,1) * self._eig_vec / self._sqrt_eig_val.reshape(1, self._num_ev)
|
||||
self._w /= np.sqrt(2)
|
||||
self._A = self._w.reshape(self._num_gp,1) * self._eig_vec / self._sqrt_eig_val.reshape(1, self._num_ev)
|
||||
|
||||
|
||||
def __dump(self, fname):
|
||||
|
@ -259,7 +257,7 @@ class StocProc(object):
|
|||
else:
|
||||
return 'unknown'
|
||||
|
||||
def new_process(self, seed = None):
|
||||
def new_process(self, yi=None, seed=None):
|
||||
r"""setup new process
|
||||
|
||||
Generates a new set of independent normal random variables :math:`Y_i`
|
||||
|
@ -274,7 +272,14 @@ class StocProc(object):
|
|||
np.random.seed(seed)
|
||||
|
||||
self.clear_cache()
|
||||
self._Y = 1/np.sqrt(2)*np.random.normal(size=2*self._num_ev).view(np.complex).reshape(self._num_ev,1)
|
||||
if yi is None:
|
||||
if self.verbose > 1:
|
||||
print("generate samples ...")
|
||||
self._Y = np.random.normal(size=2*self._num_ev).view(np.complex).reshape(self._num_ev,1)
|
||||
if self.verbose > 1:
|
||||
print("done!")
|
||||
else:
|
||||
self._Y = yi.reshape(self._num_ev,1)
|
||||
|
||||
|
||||
def x_for_initial_time_grid(self):
|
||||
|
@ -285,8 +290,14 @@ class StocProc(object):
|
|||
equation. This is equivalent to calling :py:func:`stochastic_process_kle` with the
|
||||
same weights :math:`w_i` and time grid points :math:`s_i`.
|
||||
"""
|
||||
tmp = self._Y * self._sqrt_eig_val.reshape(self._num_ev,1)
|
||||
return np.tensordot(tmp, self._eig_vec, axes=([0],[1])).flatten()
|
||||
tmp = self._Y / np.sqrt(2) * self._sqrt_eig_val.reshape(self._num_ev,1)
|
||||
if self.verbose > 1:
|
||||
print("calc process via matrix prod ...")
|
||||
res = np.tensordot(tmp, self._eig_vec, axes=([0],[1])).flatten()
|
||||
if self.verbose > 1:
|
||||
print("done!")
|
||||
|
||||
return res
|
||||
|
||||
def time_grid(self):
|
||||
return self._s
|
||||
|
@ -295,9 +306,8 @@ class StocProc(object):
|
|||
if isinstance(t, np.ndarray):
|
||||
return self.x_t_array(t)
|
||||
else:
|
||||
return self._x(t)
|
||||
return self.x(t)
|
||||
|
||||
@functools.lru_cache(maxsize=1024, typed=False)
|
||||
def _x(self, t):
|
||||
# _Y # (N_ev, 1 )
|
||||
tmp = self._Y*self._r_tau(t-self._s.reshape(1, self._num_gp))
|
||||
|
@ -319,7 +329,12 @@ class StocProc(object):
|
|||
# (N_ev, N_gp, N_t)
|
||||
# A # (N_gp, N_ev)
|
||||
# A_j,i = w_j / sqrt(lambda_i) u_i(s_j)
|
||||
return np.tensordot(tmp, self._A, axes=([1,0],[0,1]))
|
||||
if self.verbose > 1:
|
||||
print("calc process via matrix prod ...")
|
||||
res = np.tensordot(tmp, self._A, axes=([1,0],[0,1]))
|
||||
if self.verbose > 1:
|
||||
print("done!")
|
||||
return res
|
||||
|
||||
|
||||
def u_i(self, t_array, i):
|
||||
|
@ -340,7 +355,7 @@ class StocProc(object):
|
|||
# (N_gp, N_t)
|
||||
# A # (N_gp, N_ev)
|
||||
# A_j,i = w_j / sqrt(lambda_i) u_i(s_j)
|
||||
return 1/self._sqrt_eig_val[i]*np.tensordot(tmp, self._A[:,i], axes=([0],[0]))
|
||||
return np.sqrt(2)/self._sqrt_eig_val[i]*np.tensordot(tmp, self._A[:,i], axes=([0],[0]))
|
||||
|
||||
def u_i_all(self, t_array):
|
||||
r"""get all eigenfunctions
|
||||
|
@ -362,7 +377,7 @@ class StocProc(object):
|
|||
# (N_gp, N_t)
|
||||
# A # (N_gp, N_ev)
|
||||
# A_j,i = w_j / sqrt(lambda_i) u_i(s_j)
|
||||
return np.tensordot(tmp, 1/self._sqrt_eig_val.reshape(1,self._num_ev) * self._A, axes=([0],[0]))
|
||||
return np.tensordot(tmp, np.sqrt(2)/self._sqrt_eig_val.reshape(1,self._num_ev) * self._A, axes=([0],[0]))
|
||||
|
||||
def eigen_vector_i(self, i):
|
||||
r"""Returns the i-th eigenvector (solution of the discrete Fredhom equation)"""
|
||||
|
@ -425,19 +440,83 @@ class StocProc(object):
|
|||
tmp = lambda_i_all.reshape(1, self._num_ev) * u_i_all_t #(N_gp, N_ev)
|
||||
return np.tensordot(tmp, u_i_all_ast_s, axes=([1],[1]))[:,0]
|
||||
|
||||
class StocProc_KLE(StocProc):
|
||||
def __init__(self, r_tau, t_max, num_grid_points, ng_fredholm=None, seed=None, sig_min=1e-5, verbose=1):
|
||||
"""
|
||||
class to simulate stocastic processes using KLE method
|
||||
- solve fredholm equation on grid with ng_fregholm nodes (trapezoidal_weights)
|
||||
- calculate process (using interpolation solution of fredholm equation) with num_grid_points nodes
|
||||
- ivoke cubic spline interpolator when calling
|
||||
"""
|
||||
|
||||
if ng_fredholm is None:
|
||||
ng_fredholm = num_grid_points
|
||||
|
||||
if num_grid_points < ng_fredholm:
|
||||
print("WARNING: found 'num_grid_points < ng_fredholm' -> set 'num_grid_points == ng_fredholm'")
|
||||
|
||||
if ng_fredholm == num_grid_points:
|
||||
self.kle_interp = False
|
||||
else:
|
||||
self.kle_interp = True
|
||||
|
||||
self.stocproc = StocProc.new_instance_with_trapezoidal_weights(r_tau = r_tau,
|
||||
t_max = t_max,
|
||||
ng = ng_fredholm,
|
||||
sig_min = sig_min,
|
||||
seed = seed,
|
||||
verbose = verbose)
|
||||
self._r_tau = self.stocproc._r_tau
|
||||
self._s = self.stocproc._s
|
||||
self._A = self.stocproc._A
|
||||
self.stocproc.verbose = verbose
|
||||
self.verbose = verbose
|
||||
self.t = np.linspace(0, t_max, num_grid_points)
|
||||
self._z = self.__cal_z()
|
||||
if self.verbose > 1:
|
||||
print("setup interpolator ...")
|
||||
self.interpolator = ComplexInterpolatedUnivariateSpline(self.t, self._z, k=3)
|
||||
if self.verbose > 1:
|
||||
print("done!")
|
||||
|
||||
def __cal_z(self):
|
||||
if self.kle_interp:
|
||||
return self.stocproc.x_t_array(self.t)
|
||||
else:
|
||||
return self.stocproc.x_for_initial_time_grid()
|
||||
|
||||
def __call__(self, t):
|
||||
return self.interpolator(t)
|
||||
|
||||
def get_time(self):
|
||||
return self.t
|
||||
|
||||
def get_z(self):
|
||||
return self._z
|
||||
|
||||
def new_process(self, yi = None, seed = None):
|
||||
self.stocproc.new_process(yi=yi, seed=seed)
|
||||
self._z = self.__cal_z()
|
||||
if self.verbose > 1:
|
||||
print("setup interpolator ...")
|
||||
self.interpolator = ComplexInterpolatedUnivariateSpline(self.t, self._z, k=3)
|
||||
if self.verbose > 1:
|
||||
print("done!")
|
||||
|
||||
|
||||
|
||||
# def reconst_corr_zero(self, t_array):
|
||||
class StocProc_FFT(object):
|
||||
r"""
|
||||
Simulate Stochastic Process using FFT method
|
||||
"""
|
||||
def __init__(self, spectral_density, t_max, num_grid_points, seed = None, verbose=1):
|
||||
self.verbose = verbose
|
||||
self.t_max = t_max
|
||||
def __init__(self, spectral_density, t_max, num_grid_points, seed=None, verbose=1):
|
||||
self.verbose = verbose
|
||||
self.t_max = t_max
|
||||
self.num_grid_points = num_grid_points
|
||||
self.n_dft = self.num_grid_points * 2 - 1
|
||||
delta_t = self.t_max / (self.num_grid_points-1)
|
||||
self.delta_omega = 2 * np.pi / (delta_t * self.n_dft)
|
||||
self.n_dft = self.num_grid_points * 2 - 1
|
||||
delta_t = self.t_max / (self.num_grid_points-1)
|
||||
self.delta_omega = 2 * np.pi / (delta_t * self.n_dft)
|
||||
|
||||
#omega axis
|
||||
omega = self.delta_omega*np.arange(self.n_dft)
|
||||
|
@ -445,36 +524,56 @@ class StocProc_FFT(object):
|
|||
self.sqrt_spectral_density_times_sqrt_delta_omega_over_sqrt_2 = np.sqrt(spectral_density(omega)) * np.sqrt(self.delta_omega) / np.sqrt(2)
|
||||
|
||||
if self.verbose > 0:
|
||||
print(" omega_max : {:.2}".format(self.delta_omega * self.n_dft))
|
||||
print(" delta_omega: {:.2}".format(self.delta_omega))
|
||||
print("stoc proc fft, spectral density sampling information:")
|
||||
print(" t_max :", (t_max))
|
||||
print(" ng :", (num_grid_points))
|
||||
|
||||
print(" omega_max :", (self.delta_omega * self.n_dft))
|
||||
print(" delta_omega:", (self.delta_omega))
|
||||
|
||||
self.interpolator = None
|
||||
self.t = np.linspace(0, self.t_max, self.num_grid_points)
|
||||
self.new_process(seed = seed)
|
||||
|
||||
def __call__(self, t):
|
||||
if self.interpolator is None:
|
||||
if self.verbose > 1:
|
||||
print("setup interpolator ...")
|
||||
self.interpolator = ComplexInterpolatedUnivariateSpline(self.t, self._z, k=3)
|
||||
if self.verbose > 1:
|
||||
print("done!")
|
||||
return self.interpolator(t)
|
||||
|
||||
def get_time(self):
|
||||
return self.t
|
||||
|
||||
def get_z(self):
|
||||
return self._z
|
||||
|
||||
def new_process(self, seed = None):
|
||||
def new_process(self, yi = None, seed = None):
|
||||
self.interpolator = None
|
||||
if seed != None:
|
||||
if self.verbose > 0:
|
||||
print("use seed", seed)
|
||||
np.random.seed(seed)
|
||||
#random complex normal samples
|
||||
yi = np.random.normal(size = 2*self.n_dft).view(np.complex)
|
||||
if yi is None:
|
||||
#random complex normal samples
|
||||
if self.verbose > 1:
|
||||
print("generate samples ...")
|
||||
yi = np.random.normal(size = 2*self.n_dft).view(np.complex)
|
||||
if self.verbose > 1:
|
||||
print("done")
|
||||
#each row contain a different integrand
|
||||
weighted_integrand = self.sqrt_spectral_density_times_sqrt_delta_omega_over_sqrt_2 * yi
|
||||
#compute integral using fft routine
|
||||
self.z_ast = np.fft.fft(weighted_integrand)[0:self.num_grid_points]
|
||||
if self.verbose > 1:
|
||||
print("calc process via fft ...")
|
||||
self._z = np.fft.fft(weighted_integrand)[0:self.num_grid_points]
|
||||
if self.verbose > 1:
|
||||
print("done")
|
||||
|
||||
|
||||
def x_for_initial_time_grid(self):
|
||||
return self.z_ast
|
||||
|
||||
def __call__(self, t):
|
||||
if self.interpolator is None:
|
||||
self.interpolator = ComplexInterpolatedUnivariateSpline(self.get_time_axis(), self.z_ast, k=3)
|
||||
|
||||
return self.interpolator(t)
|
||||
|
||||
def get_time_axis(self):
|
||||
#corresponding time axis
|
||||
return np.linspace(0, self.t_max, self.num_grid_points)
|
||||
|
||||
|
||||
def _mean_error(r_t_s, r_t_s_exact):
|
||||
|
@ -484,64 +583,65 @@ def _mean_error(r_t_s, r_t_s_exact):
|
|||
|
||||
:return: the mean error ``err``
|
||||
"""
|
||||
len_t, len_s = r_t_s.shape
|
||||
abs_sqare = np.abs(r_t_s - r_t_s_exact)**2
|
||||
|
||||
abs_sqare[0,:] /= 2
|
||||
abs_sqare[-1,:] /= 2
|
||||
|
||||
err = np.sum(abs_sqare, axis = 0) / len_t
|
||||
err = np.mean(np.abs(r_t_s - r_t_s_exact), axis = 0)
|
||||
return err
|
||||
|
||||
def _max_error(r_t_s, r_t_s_exact):
|
||||
return np.max(np.abs(r_t_s - r_t_s_exact))
|
||||
return np.max(np.abs(r_t_s - r_t_s_exact), axis = 0)
|
||||
|
||||
def _max_rel_error(r_t_s, r_t_s_exact):
|
||||
return np.max(np.abs(r_t_s - r_t_s_exact) / np.abs(r_t_s_exact))
|
||||
|
||||
def auto_grid_points(r_tau, t_max, ng_interpolation, tol = 1e-8, err_method = _max_error, name = 'mid_point', sig_min = 1e-4):
|
||||
def auto_grid_points(r_tau, t_max, tol = 1e-8, err_method = _max_error, name = 'mid_point', sig_min = 1e-4):
|
||||
err = 1
|
||||
ng = 1
|
||||
ng = 64
|
||||
seed = None
|
||||
t_large = np.linspace(0, t_max, ng_interpolation)
|
||||
err_method_name = err_method.__name__
|
||||
print("start auto_grid_points, determine ng ...")
|
||||
#exponential increase to get below error threshold
|
||||
while err > tol:
|
||||
ng *= 2
|
||||
ng_fine = ng*10
|
||||
t_fine = np.linspace(0, t_max, ng_fine)
|
||||
print("#"*40)
|
||||
print("new process with {} weights ...".format(name))
|
||||
stoc_proc = StocProc.new_instance_by_name(name, r_tau, t_max, ng, seed, sig_min)
|
||||
print(" new process with {} weights".format(stoc_proc.get_name()))
|
||||
r_t_s = stoc_proc.recons_corr(t_large)
|
||||
r_t_s_exact = r_tau(t_large.reshape(ng_interpolation,1) - t_large.reshape(1, ng_interpolation))
|
||||
|
||||
err = err_method(r_t_s, r_t_s_exact)
|
||||
|
||||
print(" ng {} -> err {:.2e}".format(ng, err))
|
||||
print("reconstruct correlation function ({} points)...".format(ng_fine))
|
||||
r_t_s = stoc_proc.recons_corr(t_fine)
|
||||
print("calculate exact correlation function ...")
|
||||
r_t_s_exact = r_tau(t_fine.reshape(ng_fine,1) - t_fine.reshape(1, ng_fine))
|
||||
print("calculate error using {} ...".format(err_method_name))
|
||||
err = np.max(err_method(r_t_s, r_t_s_exact))
|
||||
print("ng {} -> err {:.3e}".format(ng, err))
|
||||
|
||||
ng_low = ng // 2
|
||||
ng_high = ng
|
||||
|
||||
while (ng_high - ng_low) > 1:
|
||||
print("#"*40)
|
||||
print(" ng_l", ng_low)
|
||||
print(" ng_h", ng_high)
|
||||
print("ng_l", ng_low)
|
||||
print("ng_h", ng_high)
|
||||
ng = (ng_low + ng_high) // 2
|
||||
print(" ng", ng)
|
||||
|
||||
print("ng", ng)
|
||||
ng_fine = ng*10
|
||||
t_fine = np.linspace(0, t_max, ng_fine)
|
||||
print("new process with {} weights ...".format(name))
|
||||
stoc_proc = StocProc.new_instance_by_name(name, r_tau, t_max, ng, seed, sig_min)
|
||||
|
||||
r_t_s = stoc_proc.recons_corr(t_large)
|
||||
r_t_s_exact = r_tau(t_large.reshape(ng_interpolation,1) - t_large.reshape(1, ng_interpolation))
|
||||
|
||||
err = err_method(r_t_s, r_t_s_exact)
|
||||
|
||||
print(" ng {} -> err {:.2e}".format(ng, err))
|
||||
print("reconstruct correlation function ({} points)...".format(ng_fine))
|
||||
r_t_s = stoc_proc.recons_corr(t_fine)
|
||||
print("calculate exact correlation function ...")
|
||||
r_t_s_exact = r_tau(t_fine.reshape(ng_fine,1) - t_fine.reshape(1, ng_fine))
|
||||
print("calculate error using {} ...".format(err_method_name))
|
||||
err = np.max(err_method(r_t_s, r_t_s_exact))
|
||||
print("ng {} -> err {:.3e}".format(ng, err))
|
||||
if err > tol:
|
||||
print(" err > tol!")
|
||||
print(" ng_l -> ", ng)
|
||||
print(" err > tol!")
|
||||
print(" ng_l -> ", ng)
|
||||
ng_low = ng
|
||||
else:
|
||||
print(" err <= tol!")
|
||||
print(" ng_h -> ", ng)
|
||||
print(" err <= tol!")
|
||||
print(" ng_h -> ", ng)
|
||||
ng_high = ng
|
||||
|
||||
|
||||
|
|
|
@ -1,9 +1,14 @@
|
|||
import gquad
|
||||
from scipy.integrate import quad
|
||||
import numpy as np
|
||||
import numpy.polynomial as pln
|
||||
import pytest
|
||||
|
||||
import os
|
||||
import sys
|
||||
path = os.path.dirname(__file__)
|
||||
sys.path.append(path)
|
||||
import gquad
|
||||
|
||||
|
||||
def scp_laguerre(p1,p2):
|
||||
f = lambda x: p1(x)*p2(x)*np.exp(-x)
|
||||
|
|
137
test_stocproc.py
137
test_stocproc.py
|
@ -182,6 +182,7 @@ def test_func_vs_class_KLE_FFT():
|
|||
sig_min = sig_min)
|
||||
x_t_array_class = stoc_proc.x_for_initial_time_grid()
|
||||
|
||||
print(np.max(np.abs(x_t_array_func - x_t_array_class)))
|
||||
assert np.all(x_t_array_func == x_t_array_class), "stochastic_process_kle vs. StocProc Class not identical"
|
||||
|
||||
x_t_array_func, t = sp.stochastic_process_fft(spectral_density = J,
|
||||
|
@ -195,7 +196,7 @@ def test_func_vs_class_KLE_FFT():
|
|||
num_grid_points = ng,
|
||||
seed = seed)
|
||||
|
||||
x_t_array_class = stoc_proc.x_for_initial_time_grid()
|
||||
x_t_array_class = stoc_proc.get_z()
|
||||
|
||||
print(np.max(np.abs(x_t_array_func - x_t_array_class)))
|
||||
assert np.all(x_t_array_func == x_t_array_class), "stochastic_process_fft vs. StocProc Class not identical"
|
||||
|
@ -214,7 +215,7 @@ def test_stochastic_process_KLE_interpolation(plot=False):
|
|||
ng_fine = ng*3
|
||||
|
||||
seed = 0
|
||||
sig_min = 1e-2
|
||||
sig_min = 1e-5
|
||||
|
||||
stoc_proc = sp.StocProc.new_instance_by_name(name = 'trapezoidal',
|
||||
r_tau = r_tau,
|
||||
|
@ -229,13 +230,12 @@ def test_stochastic_process_KLE_interpolation(plot=False):
|
|||
ns = 6000
|
||||
|
||||
x_t_samples = np.empty(shape=(ns, ng_fine), dtype=np.complex)
|
||||
|
||||
|
||||
print("generate samples ...")
|
||||
for n in range(ns):
|
||||
if n % 100 == 0:
|
||||
print(n, ns)
|
||||
stoc_proc.new_process()
|
||||
x_t_samples[n] = stoc_proc(finer_t)
|
||||
|
||||
print("done!")
|
||||
ac_kle_int_conj, ac_kle_int_not_conj = sp.auto_correlation(x_t_samples)
|
||||
|
||||
t_grid = np.linspace(0, t_max, ng_fine)
|
||||
|
@ -293,6 +293,93 @@ def test_stochastic_process_KLE_interpolation(plot=False):
|
|||
assert max_diff_not_conj < 4e-2
|
||||
assert max_diff_conj < 4e-2
|
||||
|
||||
def test_stocproc_KLE_splineinterpolation(plot=False):
|
||||
s_param = 1
|
||||
gamma_s_plus_1 = gamma(s_param+1)
|
||||
# two parameter correlation function -> correlation matrix
|
||||
r_tau = lambda tau : corr(tau, s_param, gamma_s_plus_1)
|
||||
# time interval [0,T]
|
||||
t_max = 15
|
||||
# number of subintervals
|
||||
# leads to N+1 grid points
|
||||
ng_fredholm = 60
|
||||
ng_kle_interp = ng_fredholm*3
|
||||
ng_fine = ng_fredholm*30
|
||||
|
||||
seed = 0
|
||||
sig_min = 1e-5
|
||||
stoc_proc = sp.StocProc_KLE(r_tau, t_max, ng_kle_interp, ng_fredholm, seed, sig_min)
|
||||
|
||||
finer_t = np.linspace(0, t_max, ng_fine)
|
||||
|
||||
ns = 6000
|
||||
|
||||
ac_conj = np.zeros(shape=(ng_fine, ng_fine), dtype=np.complex)
|
||||
ac_not_conj = np.zeros(shape=(ng_fine, ng_fine), dtype=np.complex)
|
||||
print("generate samples ...")
|
||||
for n in range(ns):
|
||||
stoc_proc.new_process()
|
||||
x_t = stoc_proc(finer_t)
|
||||
ac_conj += x_t.reshape(ng_fine, 1) * np.conj(x_t.reshape(1, ng_fine))
|
||||
ac_not_conj += x_t.reshape(ng_fine, 1) * x_t.reshape(1, ng_fine)
|
||||
print("done!")
|
||||
ac_conj /= ns
|
||||
ac_not_conj /= ns
|
||||
|
||||
t_grid = np.linspace(0, t_max, ng_fine)
|
||||
ac_true = r_tau(t_grid.reshape(ng_fine, 1) - t_grid.reshape(1, ng_fine))
|
||||
|
||||
max_diff_conj = np.max(np.abs(ac_true - ac_conj))
|
||||
print("max diff <x(t) x^ast(s)>: {:.2e}".format(max_diff_conj))
|
||||
|
||||
max_diff_not_conj = np.max(np.abs(ac_not_conj))
|
||||
print("max diff <x(t) x(s)>: {:.2e}".format(max_diff_not_conj))
|
||||
|
||||
if plot:
|
||||
v_min_real = np.floor(np.min(np.real(ac_true)))
|
||||
v_max_real = np.ceil(np.max(np.real(ac_true)))
|
||||
|
||||
v_min_imag = np.floor(np.min(np.imag(ac_true)))
|
||||
v_max_imag = np.ceil(np.max(np.imag(ac_true)))
|
||||
|
||||
fig, ax = plt.subplots(nrows=3, ncols=3, figsize=(10,12))
|
||||
ax[0,0].set_title(r"exact $\mathrm{re}\left(\langle x(t) x^\ast(s) \rangle\right)$")
|
||||
ax[0,0].imshow(np.real(ac_true), interpolation='none', vmin=v_min_real, vmax=v_max_real)
|
||||
ax[0,1].set_title(r"exact $\mathrm{im}\left(\langle x(t) x^\ast(s) \rangle\right)$")
|
||||
ax[0,1].imshow(np.imag(ac_true), interpolation='none', vmin=v_min_imag, vmax=v_max_imag)
|
||||
|
||||
ax[1,0].set_title(r"KLE $\mathrm{re}\left(\langle x(t) x^\ast(s) \rangle\right)$")
|
||||
ax[1,0].imshow(np.real(ac_conj), interpolation='none', vmin=v_min_real, vmax=v_max_real)
|
||||
ax[1,1].set_title(r"KLE $\mathrm{im}\left(\langle x(t) x^\ast(s) \rangle\right)$")
|
||||
ax[1,1].imshow(np.imag(ac_conj), interpolation='none', vmin=v_min_imag, vmax=v_max_imag)
|
||||
|
||||
ax[2,0].set_title(r"KLE $\mathrm{re}\left(\langle x(t) x(s) \rangle\right)$")
|
||||
ax[2,0].imshow(np.real(ac_not_conj), interpolation='none', vmin=v_min_real, vmax=v_max_real)
|
||||
ax[2,1].set_title(r"KLE $\mathrm{im}\left(\langle x(t) x(s) \rangle\right)$")
|
||||
ax[2,1].imshow(np.imag(ac_not_conj), interpolation='none', vmin=v_min_imag, vmax=v_max_imag)
|
||||
|
||||
ax[0,2].set_title(r"FFT log rel diff")
|
||||
im02 = ax[0,2].imshow(np.log10(np.abs(ac_conj - ac_true) / np.abs(ac_true)), interpolation='none')
|
||||
divider02 = make_axes_locatable(ax[0,2])
|
||||
cax02 = divider02.append_axes("right", size="10%", pad=0.05)
|
||||
cbar02 = plt.colorbar(im02, cax=cax02)
|
||||
|
||||
ax[1,2].set_title(r"FFT rel diff")
|
||||
im12 = ax[1,2].imshow(np.abs(ac_conj - ac_true) / np.abs(ac_true), interpolation='none')
|
||||
divider12 = make_axes_locatable(ax[1,2])
|
||||
cax12 = divider12.append_axes("right", size="10%", pad=0.05)
|
||||
cbar12 = plt.colorbar(im12, cax=cax12)
|
||||
|
||||
ax[2,2].set_title(r"FFT abs diff")
|
||||
im22 = ax[2,2].imshow(np.abs(ac_conj - ac_true), interpolation='none')
|
||||
divider22 = make_axes_locatable(ax[2,2])
|
||||
cax22 = divider22.append_axes("right", size="10%", pad=0.05)
|
||||
cbar22 = plt.colorbar(im22, cax=cax22)
|
||||
|
||||
plt.show()
|
||||
|
||||
assert max_diff_not_conj < 4e-2
|
||||
assert max_diff_conj < 4e-2
|
||||
|
||||
def test_stochastic_process_FFT_interpolation(plot=False):
|
||||
s_param = 1
|
||||
|
@ -320,18 +407,13 @@ def test_stochastic_process_FFT_interpolation(plot=False):
|
|||
|
||||
ac_conj = np.zeros(shape=(ng_fine, ng_fine), dtype=np.complex)
|
||||
ac_not_conj = np.zeros(shape=(ng_fine, ng_fine), dtype=np.complex)
|
||||
|
||||
|
||||
print("generate samples ...")
|
||||
for n in range(ns):
|
||||
if (n % 1000) == 0:
|
||||
print(n, ns)
|
||||
stoc_proc.new_process()
|
||||
x_t = stoc_proc(finer_t)
|
||||
ac_conj += x_t.reshape(ng_fine, 1) * np.conj(x_t.reshape(1, ng_fine))
|
||||
ac_not_conj += x_t.reshape(ng_fine, 1) * x_t.reshape(1, ng_fine)
|
||||
|
||||
|
||||
|
||||
print("done!")
|
||||
ac_conj /= ns
|
||||
ac_not_conj /= ns
|
||||
|
||||
|
@ -479,7 +561,7 @@ def test_auto_grid_points():
|
|||
# time interval [0,T]
|
||||
t_max = 15
|
||||
ng_interpolation = 1000
|
||||
tol = 1e-16
|
||||
tol = 1e-8
|
||||
|
||||
ng = sp.auto_grid_points(r_tau, t_max, ng_interpolation, tol, sig_min=0)
|
||||
print(ng)
|
||||
|
@ -506,7 +588,7 @@ def test_chache():
|
|||
for t_i in t.keys():
|
||||
for i in range(t[t_i]):
|
||||
total += 1
|
||||
stocproc.x(t_i)
|
||||
stocproc(t_i)
|
||||
|
||||
ci = stocproc.get_cache_info()
|
||||
assert ci.hits == total - misses
|
||||
|
@ -538,25 +620,23 @@ def test_dump_load():
|
|||
assert np.all(x_t == x_t_2)
|
||||
|
||||
|
||||
|
||||
|
||||
def show_auto_grid_points_result():
|
||||
s_param = 1
|
||||
gamma_s_plus_1 = gamma(s_param+1)
|
||||
# two parameter correlation function -> correlation matrix
|
||||
r_tau = lambda tau : corr(tau, s_param, gamma_s_plus_1)
|
||||
# time interval [0,T]
|
||||
t_max = 3
|
||||
t_max = 15
|
||||
ng_interpolation = 1000
|
||||
tol = 1e-10
|
||||
tol = 1e-8
|
||||
seed = None
|
||||
sig_min = 0
|
||||
|
||||
t_large = np.linspace(0, t_max, ng_interpolation)
|
||||
|
||||
name = 'mid_point'
|
||||
name = 'trapezoidal'
|
||||
name = 'gauss_legendre'
|
||||
# name = 'trapezoidal'
|
||||
# name = 'gauss_legendre'
|
||||
|
||||
ng = sp.auto_grid_points(r_tau, t_max, ng_interpolation, tol, name=name, sig_min=sig_min)
|
||||
|
||||
|
@ -570,25 +650,22 @@ def show_auto_grid_points_result():
|
|||
diff_max = sp._max_error(r_t_s, r_t_s_exact)
|
||||
|
||||
plt.plot(t_large, diff)
|
||||
plt.axhline(y=diff_max)
|
||||
plt.plot(t_large, diff_max)
|
||||
plt.yscale('log')
|
||||
plt.grid()
|
||||
plt.show()
|
||||
|
||||
def test_fft_func_vs_class():
|
||||
pass
|
||||
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
# test_stochastic_process_KLE_correlation_function(plot=False)
|
||||
# test_stochastic_process_FFT_correlation_function(plot=False)
|
||||
# test_func_vs_class_KLE_FFT()
|
||||
test_stochastic_process_KLE_interpolation(plot=True)
|
||||
test_stochastic_process_FFT_interpolation(plot=True)
|
||||
# test_stochastic_process_KLE_interpolation(plot=False)
|
||||
# test_stocproc_KLE_splineinterpolation(plot=False)
|
||||
# test_stochastic_process_FFT_interpolation(plot=False)
|
||||
# test_stocProc_eigenfunction_extraction()
|
||||
# test_orthonomality()
|
||||
# test_auto_grid_points()
|
||||
# show_auto_grid_points_result()
|
||||
show_auto_grid_points_result()
|
||||
# test_chache()
|
||||
# test_dump_load()
|
||||
pass
|
Loading…
Add table
Reference in a new issue