still working on some error estimation for the fft variant

This commit is contained in:
Richard Hartmann 2016-11-01 17:10:58 +01:00
parent 2e690392a0
commit 3e2307b016
4 changed files with 275 additions and 124 deletions

View file

@ -1,13 +1,25 @@
from __future__ import division, print_function
from scipy.optimize import brentq
from scipy.optimize import minimize
from numpy.fft import rfft as np_rfft
import numpy as np
import logging
log = logging.getLogger(__name__)
from .tools import ComplexInterpolatedUnivariateSpline
import logging
import numpy as np
from numpy.fft import rfft as np_rfft
from scipy.integrate import quad
from scipy.optimize import brentq
from scipy.optimize import basinhopping
import sys
import warnings
warnings.simplefilter('error')
MAX_FLOAT = sys.float_info.max
log = logging.getLogger(__name__)
class FTReferenceError(Exception):
pass
def find_integral_boundary(integrand, tol, ref_val, max_val, x0):
"""
searches for the point x_0 where integrand(x_tol) = tol
@ -30,11 +42,10 @@ def find_integral_boundary(integrand, tol, ref_val, max_val, x0):
if I_at_ref <= tol:
raise ValueError("the integrand at ref_val needs to be greater that tol")
log.debug("ref_value for search: {} tol: {}".format(ref_val, tol))
log.info("ref_value for search: {} tol: {}".format(ref_val, tol))
I = integrand(x0+ref_val)
if I/I_at_ref > tol:
log.debug("x={:.3e} I(x+ref_val) = {:.3e} > tol I(rev_val) -> veer x away from ref_value".format(x0, I))
x = 2*x0
@ -75,7 +86,7 @@ def find_integral_boundary(integrand, tol, ref_val, max_val, x0):
a = x0
log.debug("I(ref_val) = tol -> no search necessary")
log.debug("return a={:.5g}".format(a))
log.info("return {:.5g}".format(a))
return a
def find_integral_boundary_auto(integrand, tol, ref_val=0, max_val=1e6,
@ -87,9 +98,9 @@ def find_integral_boundary_auto(integrand, tol, ref_val=0, max_val=1e6,
max_val_left = max_val if max_val_left is None else max_val_left
max_val_right = max_val if max_val_right is None else max_val_right
log.debug("trigger left search")
log.info("trigger left search")
a = find_integral_boundary(integrand, tol, ref_val=ref_val_left, max_val=max_val_left, x0=-1)
log.debug("trigger right search")
log.info("trigger right search")
b = find_integral_boundary(integrand, tol, ref_val=ref_val_right, max_val=max_val_right, x0=+1)
return a,b
@ -98,13 +109,13 @@ def fourier_integral_midpoint(integrand, a, b, N):
approximates int_a^b dx integrand(x) by the riemann sum with N terms
and the most simplest uniform midpoint weights
"""
log.debug("integrate over [{:.3e},{:.3e}] using {} points".format(a,b,N))
#log.debug("integrate over [{:.3e},{:.3e}] using {} points".format(a,b,N))
delta_x = (b-a)/N
delta_k = 2*np.pi/(b-a)
yl = integrand(np.linspace(a+delta_x/2, b+delta_x/2, N, endpoint=False))
fft_vals = np_rfft(yl)
tau = np.arange(len(fft_vals))*delta_k
log.debug("yields d_x={:.3e}, d_k={:.3e} kmax={:.3e}".format(delta_x, delta_k, tau[-1]))
#log.debug("yields d_x={:.3e}, d_k={:.3e} kmax={:.3e}".format(delta_x, delta_k, tau[-1]))
return tau, delta_x*np.exp(-1j*tau*(a+delta_x/2))*fft_vals
def get_fourier_integral_simps_weighted_values(yl):
@ -127,7 +138,6 @@ def fourier_integral_simps(integrand, a, b, N):
approximates int_a^b dx integrand(x) by the riemann sum with N terms
using simpson integration scheme
"""
delta_x = (b-a)/(N-1)
delta_k = 2*np.pi/N/delta_x
l = np.arange(0, N)
@ -139,62 +149,109 @@ def fourier_integral_simps(integrand, a, b, N):
return tau, delta_x*np.exp(-1j*tau*a)*fft_vals
def get_N_a_b_for_accurate_fourier_integral(integrand, a, b, t_max, tol, ft_ref, opt_b_only, N_max = 2**20, method='midp'):
def f_opt_b_only(x, a, integrand, N, t_max):
def _relDiff(xRef, x):
diff = np.abs(xRef - x)
norm_xRef = np.abs(xRef)
with warnings.catch_warnings():
warnings.simplefilter("ignore")
res = np.where(diff == 0, 0, diff / norm_xRef)
idx0 = np.where(np.logical_and(norm_xRef == 0, diff != 0))
res[idx0] = MAX_FLOAT
return res
def _f_opt_b_only(x, a, integrand, N, t_max, ft_ref):
b = x[0]
if np.isnan(b):
return 300
tau, ft_tau = fourier_integral_midpoint(integrand, a, b, N)
idx = np.where(tau <= t_max)
ft_ref_tau = ft_ref(tau[idx])
rd = np.max(_relDiff(ft_ref_tau, ft_tau[idx]))
return np.log10(rd)
def _f_opt(x, integrand, N, t_max, ft_ref):
a, b = x[0], x[1]
if np.isnan(a) or np.isnan(b):
return 300
tau, ft_tau = fourier_integral_midpoint(integrand, a, b, N)
idx = np.where(tau <= t_max)
ft_ref_tau = ft_ref(tau[idx])
rd = np.max(_relDiff(ft_ref_tau, ft_tau[idx]))
return np.log10(rd)
class _AcceptTest(object):
def __init__(self, N, tmax):
self.tmax = tmax
self._tmp = np.pi * N
def __call__(self, x):
a, b = x[0], x[1]
tmax_new = self._tmp / (b - a)
return tmax_new - self.tmax
class _AcceptTest_b_onbly(object):
def __init__(self, N, a, tmax):
self.tmax = tmax
self.a = a
self._tmp = np.pi * N
def __call__(self, x):
b = x[0]
tau, ft_tau = fourier_integral(integrand, a, b, N)
idx = np.where(tau <= t_max)
ft_ref_tau = ft_ref(tau[idx])
rd = np.max(np.abs(ft_tau[idx] - ft_ref_tau) / np.abs(ft_ref_tau))
return np.log10(rd)
tmax_new = self._tmp / (b - self.a)
return tmax_new - self.tmax
def f_opt(x, integrand, N, t_max):
a = x[0]
b = x[1]
tau, ft_tau = fourier_integral(integrand, a, b, N)
idx = np.where(tau <= t_max)
ft_ref_tau = ft_ref(tau[idx])
rd = np.max(np.abs(ft_tau[idx] - ft_ref_tau) / np.abs(ft_ref_tau))
return np.log10(rd)
def opt_integral_boundaries(integrand, a, b, t_max, ft_ref, opt_b_only, N):
log.debug("optimize integral boundary N:{} [{:.3e},{:.3e}]".format(N, a, b))
if opt_b_only:
act = _AcceptTest_b_onbly(N, a, t_max)
r = basinhopping(_f_opt_b_only, [b], niter=150,
minimizer_kwargs={"method": "slsqp",
"args": (a, integrand, N, t_max, ft_ref),
"constraints": {"type": "ineq", "fun": act}},
stepsize=0.1 * (b - a))
b = r.x[0]
else:
act = _AcceptTest(N, t_max)
r = basinhopping(_f_opt, [a, b], niter=150,
minimizer_kwargs={"method" : "slsqp",
"args" : (integrand, N, t_max, ft_ref),
"constraints": {"type": "ineq", "fun": act}},
stepsize=0.1*(b-a))
a, b = r.x[0], r.x[1]
rd = 10 ** r.fun
log.info("optimization yields max rd {:.3e} and new boundaries [{:.2e},{:.2e}]".format(rd, a, b))
return rd, a, b
def get_N_a_b_for_accurate_fourier_integral(integrand, a, b, t_max, tol, ft_ref, opt_b_only, N_max = 2**20):
"""
chooses N such that the approximated Fourier integral
meets the exact solution within a given tolerance of the
relative deviation for a given interval of interest
"""
if method == 'midp':
fourier_integral = fourier_integral_midpoint
elif method == 'simps':
fourier_integral = fourier_integral_simps
log.info("error estimation up to tmax {:.3e} (tol={:.3e})".format(t_max, tol))
if opt_b_only:
I0 = quad(integrand, a, np.inf)[0]
else:
raise ValueError("unknown method '{}'".format(method))
log.debug("fft integral from {:.3e} to {:.3e}".format(a, b))
log.debug("error estimation up to tmax {:.3e} (tol={:.3e}".format(t_max, tol))
I0 = quad(integrand, -np.inf, np.inf)[0]
ft_ref_0 = ft_ref(0)
rd = np.abs(ft_ref_0 - I0) / np.abs(ft_ref_0)
log.debug("ft_ref check yields rd {:.3e}".format(rd))
if rd > 1e-6:
raise FTReferenceError("it seems that 'ft_ref' is not the fourier transform of 'integrand'")
i = 10
while True:
N = 2**i
if opt_b_only:
r = minimize(f_opt_b_only, x0=[b], args=(a, integrand, N, t_max), method='bfgs')
r = minimize(f_opt_b_only, x0=[r.x[0]], args=(a, integrand, N, t_max), method='slsqp')
b = r.x[0]
else:
r = minimize(f_opt, x0=[a,b], args=(integrand, N, t_max), method='bfgs')
r = minimize(f_opt, x0=[r.x[0], r.x[1]], args=(integrand, N, t_max), method='slsqp')
a = r.x[0]
b = r.x[1]
rd = 10**r.fun
log.debug("using fft for integral [{:.2e},{:.2e}] N {} yields max rd {:.3e} (tol {:.3e})".format(a, b, N, rd, tol))
rd, a, b = opt_integral_boundaries(integrand=integrand, a=a, b=b, t_max=t_max, ft_ref=ft_ref,
opt_b_only=opt_b_only, N=N)
if rd < tol:
log.debug("reached rd ({:.3e}) < tol ({:.3e}), return N={}".format(rd, tol, N))
log.info("reached rd ({:.3e}) < tol ({:.3e}), return N={}".format(rd, tol, N))
return N, a, b
if N > N_max:
raise RuntimeError("maximum number of points for Fourier Transform reached")
i += 1
def get_dt_for_accurate_interpolation(t_max, tol, ft_ref):
@ -215,37 +272,33 @@ def get_dt_for_accurate_interpolation(t_max, tol, ft_ref):
N*=2
def calc_ab_N_dx_dt(integrand, intgr_tol, intpl_tol, tmax, a, b, ft_ref, opt_b_only, N_max = 2**20, method='midp'):
def calc_ab_N_dx_dt(integrand, intgr_tol, intpl_tol, t_max, a, b, ft_ref, opt_b_only, N_max = 2**20):
N, a, b = get_N_a_b_for_accurate_fourier_integral(integrand, a, b,
t_max = tmax,
t_max = t_max,
tol = intgr_tol,
ft_ref = ft_ref,
opt_b_only=opt_b_only,
N_max = N_max,
method = method)
dt = get_dt_for_accurate_interpolation(t_max = tmax,
N_max = N_max)
dt = get_dt_for_accurate_interpolation(t_max = t_max,
tol = intpl_tol,
ft_ref = ft_ref)
if method == 'simps':
dx = (b-a)/(N-1)
elif method == 'midp':
dx = (b-a)/N
dx = (b-a)/N
N_min = 2*np.pi/dx/dt
if N_min <= N:
log.debug("dt criterion fulfilled")
return N,a,b
log.info("dt criterion fulfilled")
return a, b, N, dx, dt
else:
log.debug("down scale dx and dt to match new power of 2 N")
log.info("down scale dx and dt to match new power of 2 N")
N = 2**int(np.ceil(np.log2(N_min)))
scale = np.sqrt(N_min/N)
assert scale <= 1
#scale = np.sqrt(N_min/N)
#assert scale <= 1
scale = 1
dx_new = scale*dx
if method == 'simps':
b_minus_a = dx_new*(N-1)
elif method == 'midp':
b_minus_a = dx_new*N
b_minus_a = dx_new*N
dt_new = 2*np.pi/dx_new/N
if opt_b_only:
@ -255,6 +308,11 @@ def calc_ab_N_dx_dt(integrand, intgr_tol, intpl_tol, tmax, a, b, ft_ref, opt_b_o
b += delta/2
a -= delta/2
rd, a, b = opt_integral_boundaries(integrand=integrand, a=a, b=b, t_max=t_max, ft_ref=ft_ref,
opt_b_only=opt_b_only, N=N)
log.debug("rd after final optimization:{:.3e}".format(rd))
return a, b, N, dx_new, dt_new

View file

@ -293,29 +293,29 @@ class StocProc_FFT_tol(_absStocProc):
Simulate Stochastic Process using FFT method
"""
def __init__(self, spectral_density, t_max, bcf_ref, intgr_tol=1e-2, intpl_tol=1e-2,
seed=None, k=3, negative_frequencies=False, method='simps'):
seed=None, k=3, negative_frequencies=False):
if not negative_frequencies:
log.debug("non neg freq only")
log.info("non neg freq only")
# assume the spectral_density is 0 for w<0
# and decays fast for large w
b = method_fft.find_integral_boundary(integrand = spectral_density,
tol = intgr_tol**2,
tol = intgr_tol**2,
ref_val = 1,
max_val = 1e6,
x0 = 1)
log.debug("upper int bound b {:.3e}".format(b))
b, N, dx, dt = method_fft.calc_ab_N_dx_dt(integrand = spectral_density,
intgr_tol = intgr_tol,
intpl_tol = intpl_tol,
tmax = t_max,
a = 0,
b = b,
ft_ref = lambda tau:bcf_ref(tau)*np.pi,
N_max = 2**24,
method = method)
log.debug("required tol result in N {}".format(N))
a = 0
log.info("upper int bound b {:.3e}".format(b))
a, b, N, dx, dt = method_fft.calc_ab_N_dx_dt(integrand = spectral_density,
intgr_tol = intgr_tol,
intpl_tol = intpl_tol,
t_max = t_max,
a = 0,
b = b,
ft_ref = lambda tau:bcf_ref(tau)*np.pi,
opt_b_only= True,
N_max = 2**24)
log.info("required tol results in N {}".format(N))
else:
log.info("use neg freq")
# assume the spectral_density is non zero also for w<0
# but decays fast for large |w|
b = method_fft.find_integral_boundary(integrand = spectral_density,
@ -328,19 +328,17 @@ class StocProc_FFT_tol(_absStocProc):
ref_val = -1,
max_val = 1e6,
x0 = -1)
b_minus_a, N, dx, dt = method_fft.calc_ab_N_dx_dt(integrand = spectral_density,
a, b, N, dx, dt = method_fft.calc_ab_N_dx_dt(integrand = spectral_density,
intgr_tol = intgr_tol,
intpl_tol = intpl_tol,
tmax = t_max,
a = a*1000,
b = b*1000,
ft_ref = lambda tau:bcf_ref(tau)*np.pi,
N_max = 2**24,
method = method)
b = b*b_minus_a/(b-a)
a = b-b_minus_a
t_max = t_max,
a = a,
b = b,
ft_ref = lambda tau:bcf_ref(tau)*np.pi,
opt_b_only= False,
N_max = 2**24)
log.info("required tol result in N {}".format(N))
num_grid_points = int(np.ceil(t_max/dt))+1
t_max = (num_grid_points-1)*dt
@ -350,18 +348,9 @@ class StocProc_FFT_tol(_absStocProc):
k = k)
omega = dx*np.arange(N)
if method == 'simps':
self.yl = spectral_density(omega + a) * dx / np.pi
self.yl = method_fft.get_fourier_integral_simps_weighted_values(self.yl)
self.yl = np.sqrt(self.yl)
self.omega_min_correction = np.exp(-1j*a*self.t) #self.t is from the parent class
elif method == 'midp':
self.yl = spectral_density(omega + a + dx/2) * dx / np.pi
self.yl = np.sqrt(self.yl)
self.omega_min_correction = np.exp(-1j*(a+dx/2)*self.t) #self.t is from the parent class
else:
raise ValueError("unknown method '{}'".format(method))
self.yl = spectral_density(omega + a + dx/2) * dx / np.pi
self.yl = np.sqrt(self.yl)
self.omega_min_correction = np.exp(-1j*(a+dx/2)*self.t) #self.t is from the parent class
def __getstate__(self):
return self.yl, self.num_grid_points, self.omega_min_correction, self.t_max, self._seed, self._k

View file

@ -328,8 +328,8 @@ def test_calc_abN():
a,b = sp.method_fft.find_integral_boundary_auto(integrand=intg, tol=1e-2, ref_val=1)
a, b, N, dx, dt = sp.method_fft.calc_ab_N_dx_dt(integrand = intg,
intgr_tol = tol,
intpl_tol = tol,
tmax = tmax,
intpl_tol = tol,
t_max = tmax,
a = 0,
b = b,
ft_ref = bcf_ref,

View file

@ -40,14 +40,15 @@ def spectral_density(omega):
def stocproc_metatest(stp, num_samples, tol, corr, plot):
print("generate samples")
x_t_array_KLE = np.empty(shape=(num_samples, stp.num_grid_points), dtype=np.complex128)
t = np.linspace(0, stp.t_max, 487)
x_t_array_KLE = np.empty(shape=(num_samples, 487), dtype=np.complex128)
for i in range(num_samples):
stp.new_process()
x_t_array_KLE[i] = stp()
t = stp.t
x_t_array_KLE[i] = stp(t)
autoCorr_KLE_conj, autoCorr_KLE_not_conj = sp.tools.auto_correlation(x_t_array_KLE)
ac_true = corr(t.reshape(stp.num_grid_points, 1) - t.reshape(1, stp.num_grid_points))
ac_true = corr(t.reshape(487, 1) - t.reshape(1, 487))
max_diff_conj = np.max(np.abs(ac_true - autoCorr_KLE_conj))
print("max diff <x(t) x^ast(s)>: {:.2e}".format(max_diff_conj))
@ -237,21 +238,123 @@ def test_lorentz_SD(plot=False):
t_max = 15
num_samples = 1000
num_samples = 5000
tol = 3e-2
stp = sp.StocProc_KLE(r_tau=lac,
t_max=t_max,
ng_fredholm=201,
ng_fac=4,
ng_fredholm=1025,
ng_fac=2,
seed=0)
#stocproc_metatest(stp, num_samples, tol, lac, True)
t = stp.t
stp.new_process()
plt.plot(t, np.abs(stp()))
num_samples = 50000
stp = sp.StocProc_FFT_tol(lsp, t_max, lac, negative_frequencies=True, seed=0, intgr_tol=1e-2, intpl_tol=9*1e-5)
stocproc_metatest(stp, num_samples, tol, lac, True)
stp = sp.StocProc_FFT_tol(lsp, t_max, lac, negative_frequencies=True, seed=0, intgr_tol=1e-2, intpl_tol=1e-9)
t = stp.t
stp.new_process()
plt.plot(t, np.abs(stp()), ls='', marker='.')
stp = sp.StocProc_FFT_tol(lsp, t_max, lac, negative_frequencies=True, seed=0, intgr_tol=1e-2, intpl_tol=1e-11)
t = stp.t
stp.new_process()
plt.plot(t, np.abs(stp()), ls='', marker='.')
plt.show()
return
print("generate samples")
t = np.linspace(0, stp.t_max, 487)
t_fine = np.linspace(0, stp.t_max, 4870)
t_ref_list = [0,1,7]
# for num_samples in [100, 1000]:
# x_t0 = np.zeros(shape=(3), dtype=np.complex128)
# for i in range(num_samples):
# stp.new_process()
# x_t = stp(t)
# for j, ti in enumerate(t_ref_list):
# x_t0[j] += np.conj(stp(ti)) * stp(ti)
# x_t0 /= num_samples
# print(np.abs(1-x_t0))
num_samples = 1000
x_t_array = np.zeros(shape=(487, 3), dtype=np.complex128)
for i in range(num_samples):
stp.new_process()
x_t = stp(t)
for j, ti in enumerate(t_ref_list):
x_t_array[:, j] += x_t * np.conj(stp(ti))
x_t_array /= num_samples
for j, ti in enumerate(t_ref_list):
p, = plt.plot(t, np.abs(x_t_array[:, j]))
plt.plot(t_fine, np.abs(lac(t_fine-ti)), color=p.get_color())
plt.show()
#num_samples = 1000
#stp = sp.StocProc_FFT_tol(lsp, t_max, lac, negative_frequencies=True, seed=0, intgr_tol=1e-2, intpl_tol=1e-2)
#stocproc_metatest(stp, num_samples, tol, lambda tau:lac(tau)/np.pi, plot)
def test_subohmic_SD(plot=False):
def lsp(omega):
return omega ** _S_ * np.exp(-omega)
def lac(tau):
return (1 + 1j*(tau))**(-(_S_+1)) * _GAMMA_S_PLUS_1 / np.pi
t_max = 15
stp = sp.StocProc_KLE(r_tau=lac,
t_max=t_max,
ng_fredholm=1025,
ng_fac=2,
seed=0)
t = stp.t
stp.new_process()
plt.plot(t, np.abs(stp()))
stp = sp.StocProc_FFT_tol(lsp, t_max, lac, negative_frequencies=False, seed=0, intgr_tol=1e-3, intpl_tol=1e-3)
t = stp.t
stp.new_process()
plt.plot(t, np.abs(stp()))
plt.show()
return
print("generate samples")
t = np.linspace(0, stp.t_max, 487)
t_fine = np.linspace(0, stp.t_max, 4870)
t_ref_list = [0,1,7]
num_samples = 1000
x_t_array = np.zeros(shape=(487, 3), dtype=np.complex128)
for i in range(num_samples):
stp.new_process()
x_t = stp(t)
for j, ti in enumerate(t_ref_list):
x_t_array[:, j] += x_t * np.conj(stp(ti))
x_t_array /= num_samples
for j, ti in enumerate(t_ref_list):
p, = plt.plot(t, np.abs(x_t_array[:, j]))
plt.plot(t_fine, np.abs(lac(t_fine-ti)), color=p.get_color())
plt.show()
@ -259,7 +362,7 @@ def test_lorentz_SD(plot=False):
if __name__ == "__main__":
import logging
logging.basicConfig(level=logging.DEBUG)
logging.basicConfig(level=logging.INFO)
#logging.basicConfig(level=logging.INFO)
# test_stochastic_process_KLE_correlation_function(plot=True)
# test_stochastic_process_FFT_correlation_function(plot=True)
@ -267,5 +370,6 @@ if __name__ == "__main__":
# test_stocproc_dump_load()
test_lorentz_SD(plot=False)
test_lorentz_SD(plot=True)
# test_subohmic_SD()
pass