add basic time derivative to FFT

This commit is contained in:
Valentin Boettcher 2021-11-10 15:48:43 +01:00
parent e04a1bc889
commit 15be09454f
2 changed files with 91 additions and 7 deletions

View file

@ -18,7 +18,7 @@ and the classes implementing the ``StocProc`` interface.
Function on mudule level
------------------------
.. autofunction:: stocproc.loggin_setup
.. autofunction:: stocproc.logging_setup
.. autofunction:: stocproc.version
.. autofunction:: stocproc.version_full

View file

@ -1,5 +1,6 @@
import abc
from functools import partial
from typing import Optional
import numpy as np
import time
@ -80,6 +81,7 @@ class StocProc(abc.ABC):
:param seed: if not ``None`` seed the random number generator with ``seed``
:param t_axis: an explicit definition of times t_k (may be non equidistant)
:param scale: passes ``scale`` to :py:func:`set_scale`
:param calc_deriv: whether to calculate the derivative of the stochastic process
Note: :py:func:`new_process` is **not** called on init. If you want to evaluate a particular
realization of the stocastic process, a new sample needs to be drawn by calling :py:func:`new_process`.
@ -87,14 +89,18 @@ class StocProc(abc.ABC):
"""
def __init__(self, t_max=None, num_grid_points=None, seed=None, scale=1):
def __init__(
self, t_max=None, num_grid_points=None, seed=None, scale=1, calc_deriv=False
):
self.t_max = t_max
self.num_grid_points = num_grid_points
self.t = np.linspace(0, t_max, num_grid_points)
self._z = None
self._interpolator = None
self._interpolator_dot = None
self._seed = seed
self.calc_deriv = calc_deriv
if seed is not None:
np.random.seed(seed)
self._one_over_sqrt_2 = 1 / np.sqrt(2)
@ -126,6 +132,21 @@ class StocProc(abc.ABC):
else:
return self._interpolator(t)
def dot(self, t: Optional[np.ndarray] = None) -> np.ndarray:
r"""Returns the derivative of the stochastic process.
Works the same as :meth:`__call__` in all other regards.
"""
if self._z_dot is None:
raise RuntimeError(
"StocProc has NO random data, call 'new_process' to generate a new random process"
)
if t is None:
return self._z_dot
else:
return self._interpolator_dot(t)
@abc.abstractmethod
def calc_z(self, y):
r"""*abstract method*
@ -138,10 +159,34 @@ class StocProc(abc.ABC):
"""
pass
@abc.abstractmethod
def calc_z_dot(self, y):
r"""*abstract method*
An implementation needs to map :math:`M` independent complex
valued and Gaussian distributed random variables :math:`Y_m`
(with :math:`\langle Y_m \rangle = 0 = \langle Y_m Y_{m'}\rangle` and :math:`\langle Y_m Y^\ast_{m'} = \delta_{mm'}\rangle`) to the discrete time derivative of stochastic
process :math:`z_n = z(t_n)`.
:return: the derivative of the discrete time stochastic process :math:`z_n`,
array of complex numbers
"""
pass
def _calc_scaled_z(self, y):
r"""scaled the discrete process z with sqrt(scale), such that <z_i z^ast_j> = scale bcf(i,j)"""
r"""
scaled the discrete process z with sqrt(scale), such that <z_i
z^ast_j> = scale bcf(i,j)
"""
return self.sqrt_scale * self.calc_z(y)
def _calc_scaled_z_dot(self, y):
r"""
scaled derivative of the discrete process z with sqrt(scale),
such that <z_i z^ast_j> = scale bcf(i,j)
"""
return self.sqrt_scale * self.calc_z_dot(y)
@abc.abstractmethod
def get_num_y(self):
r"""*abstract method*
@ -191,6 +236,10 @@ class StocProc(abc.ABC):
)
self._z = self._calc_scaled_z(y)
if self.calc_deriv:
self._z_dot = self._calc_scaled_z_dot(y)
log.debug(
"proc_cnt:{} new process generated [{:.2e}s]".format(
self._proc_cnt, time.time() - t0
@ -198,6 +247,12 @@ class StocProc(abc.ABC):
)
t0 = time.time()
self._interpolator = fcSpline.FCS(x_low=0, x_high=self.t_max, y=self._z)
if self.calc_deriv:
self._interpolator_dot = fcSpline.FCS(
x_low=0, x_high=self.t_max, y=self._z_dot
)
log.debug("created interpolator [{:.2e}s]".format(time.time() - t0))
def set_scale(self, scale):
@ -429,6 +484,7 @@ class StocProc_FFT(StocProc):
seed=None,
negative_frequencies=False,
scale=1,
calc_deriv: bool = False,
):
self.key = self.get_key(
alpha=alpha, t_max=t_max, intgr_tol=intgr_tol, intpl_tol=intpl_tol
@ -478,13 +534,20 @@ class StocProc_FFT(StocProc):
t_max = (num_grid_points - 1) * dt
super().__init__(
t_max=t_max, num_grid_points=num_grid_points, seed=seed, scale=scale
t_max=t_max,
num_grid_points=num_grid_points,
seed=seed,
scale=scale,
calc_deriv=calc_deriv,
)
self.yl = spectral_density(dx * np.arange(N) + a + dx / 2) * dx / np.pi
self.omega_min = a + dx / 2
self.omega_k = dx * np.arange(N) + self.omega_min
self.yl = spectral_density(self.omega_k) * dx / np.pi
self.yl = np.sqrt(self.yl)
self.omega_min_correction = np.exp(
-1j * (a + dx / 2) * self.t
(-1j * self.omega_min * self.t)
) # self.t is from the parent class
@staticmethod
@ -499,25 +562,35 @@ class StocProc_FFT(StocProc):
return (
self.yl,
self.num_grid_points,
self.omega_min,
self.omega_min_correction,
self.omega_k,
self.t_max,
self._seed,
self.scale,
self.key,
self.calc_deriv,
)
def __setstate__(self, state):
(
self.yl,
num_grid_points,
self.omega_min,
self.omega_min_correction,
self.omega_k,
t_max,
seed,
scale,
self.key,
calc_deriv,
) = state
super().__init__(
t_max=t_max, num_grid_points=num_grid_points, seed=seed, scale=scale
t_max=t_max,
num_grid_points=num_grid_points,
seed=seed,
scale=scale,
calc_deriv=calc_deriv,
)
def calc_z(self, y):
@ -528,10 +601,21 @@ class StocProc_FFT(StocProc):
and return values :math:`z_n` with :math:`t_n <= t_\mathrm{max}`.
"""
z_fft = np.fft.fft(self.yl * y)
z = z_fft[0 : self.num_grid_points] * self.omega_min_correction
return z
def calc_z_dot(self, y: np.ndarray) -> np.ndarray:
r"""Calculate the discrete time stochastic process derivative using FFT algorithm
and return values :math:`\dot{z}_n` with :math:`t_n <= t_\mathrm{max}`.
"""
z_dot_fft = np.fft.fft(-1j * self.omega_k * self.yl * y)
z_dot = z_dot_fft[0 : self.num_grid_points] * self.omega_min_correction
return z_dot
def get_num_y(self):
r"""The number of independent random variables :math:`Y_m` is given by the number of discrete nodes
used by the Fast Fourier Transform algorithm.