diff --git a/stocproc/__init__.py b/stocproc/__init__.py index 1b6126a..230ad69 100644 --- a/stocproc/__init__.py +++ b/stocproc/__init__.py @@ -1,8 +1,10 @@ from .stocproc import * +from .class_stocproc_kle import * -import gquad +from .class_stocproc import StocProc_FFT +from .class_stocproc import StocProc_KLE -from class_stocproc import StocProc_FFT -from class_stocproc import StocProc_KLE -from class_stocproc_kle import StocProc \ No newline at end of file + + +import gquad \ No newline at end of file diff --git a/stocproc/class_stocproc.py b/stocproc/class_stocproc.py new file mode 100644 index 0000000..680b778 --- /dev/null +++ b/stocproc/class_stocproc.py @@ -0,0 +1,218 @@ +import numpy as np +from scipy.interpolate import InterpolatedUnivariateSpline +from . import StocProc + +class ComplexInterpolatedUnivariateSpline(object): + r""" + Univariant spline interpolator from scpi.interpolate in a convenient fashion to + interpolate real and imaginary parts of complex data + """ + def __init__(self, x, y, k=2): + self.re_spline = InterpolatedUnivariateSpline(x, np.real(y)) + self.im_spline = InterpolatedUnivariateSpline(x, np.imag(y)) + + def __call__(self, t): + return self.re_spline(t) + 1j*self.im_spline(t) + +class _absStocProc(object): + r""" + Abstract base class to stochastic process interface + + general work flow: + - Specify the time axis of interest [0, t_max] and it resolution (number of grid points), :math:`t_i = i \frac{t_max}{N_t-1}. + - To evaluate the stochastic process at these points, a mapping from :math:`N_z` normal distributed + random complex numbers with :math:`\langle y_i y_j^\ast \rangle = 2 \delta_{ij}` + to the stochastic process :math:`z_{t_i}` is needed and depends on the implemented method (:py:func:`_calc_z'). + - A new process should be generated by calling :py:func:`new_process'. + - When the __call__ method is invoked the results will be interpolated between the :math:`z_t_i`. + + + """ + def __init__(self, t_max, num_grid_points, seed=None, verbose=1, k=3): + r""" + :param t_max: specify time axis as [0, t_max] + :param num_grid_points: number of equidistant times on that axis + :param seed: if not ``None`` set seed to ``seed`` + :param verbose: 0: no output, 1: informational output, 2: (eventually some) debug info + :param k: polynomial degree used for spline interpolation + """ + self._verbose = verbose + self.t = np.linspace(0, t_max, num_grid_points) + self._z = None + self._interpolator = None + self._k = k + if seed is not None: + np.random.seed(seed) + + def __call__(self, t): + r""" + :param t: time to evaluate the stochastic process as, float of array of floats + evaluates the stochastic process via spline interpolation between the discrete process + """ + if self._interpolator is None: + if self._verbose > 1: + print("setup interpolator ...") + self._interpolator = ComplexInterpolatedUnivariateSpline(self.t, self._z, k=self._k) + if self._verbose > 1: + print("done!") + + return self._interpolator(t) + + def _calc_z(self, y): + r""" + maps the normal distributed complex valued random variables y to the stochastic process + + :return: the stochastic process, array of complex numbers + """ + pass + + def get_num_y(self): + r""" + :return: number of complex random variables needed to calculate the stochastic process + """ + pass + + def get_time(self): + r""" + :return: time axis + """ + return self.t + + def get_z(self): + r""" + use :py:func:`new_process` to generate a new process + :return: the current process + """ + return self._z + + def new_process(self, y=None, seed=None): + r""" + generate a new process by evaluating :py:func:`_calc_z' + + When ``y`` is given use these random numbers as input for :py:func:`_calc_z` + otherwise generate a new set of random numbers. + + :param y: independent normal distributed complex valued random variables with :math:`\sig_{ij}^2 = \langle y_i y_j^\ast \rangle = 2 \delta_{ij} + :param seed: if not ``None`` set seed to ``seed`` before generating samples + """ + self._interpolator = None + if seed != None: + if self._verbose > 0: + print("use seed", seed) + np.random.seed(seed) + if y is None: + #random complex normal samples + if self._verbose > 1: + print("generate samples ...") + y = np.random.normal(size = 2*self.get_num_y()).view(np.complex) + if self._verbose > 1: + print("done") + + self._z = self._calc_z(y) + +class StocProc_KLE(_absStocProc): + r""" + class to simulate stochastic processes using KLE method + - Solve fredholm equation on grid with ``ng_fredholm nodes`` (trapezoidal_weights). + If case ``ng_fredholm`` is ``None`` set ``ng_fredholm = num_grid_points``. In general it should + hold ``ng_fredholm < num_grid_points`` and ``num_grid_points = 10*ng_fredholm`` might be a good ratio. + - Calculate discrete stochastic process (using interpolation solution of fredholm equation) with num_grid_points nodes + - invoke spline interpolator when calling + """ + def __init__(self, r_tau, t_max, num_grid_points, ng_fredholm=None, seed=None, sig_min=1e-5, verbose=1, k=3): + r""" + :param r_tau: auto correlation function of the stochastic process + :param t_max: specify time axis as [0, t_max] + :param num_grid_points: number of equidistant times on that axis + :param ng_fredholm: number of discrete time used to solve the underlying fredholm equation, ``None`` set ``ng_fredholm = num_grid_points`` + :param seed: if not ``None`` set seed to ``seed`` + :param sig_min: eigenvalue threshold (see KLE method to generate stochastic processes) + :param verbose: 0: no output, 1: informational output, 2: (eventually some) debug info + :param k: polynomial degree used for spline interpolation + + """ + if ng_fredholm is None: + ng_fredholm = num_grid_points + + if num_grid_points < ng_fredholm: + print("WARNING: found 'num_grid_points < ng_fredholm' -> set 'num_grid_points == ng_fredholm'") + + # if ng_fredholm == num_grid_points -> no kle_fredholm interpolation is needed + if ng_fredholm == num_grid_points: + self.kle_interp = False + else: + self.kle_interp = True + + self.stocproc = StocProc.new_instance_with_trapezoidal_weights(r_tau = r_tau, + t_max = t_max, + ng = ng_fredholm, + sig_min = sig_min, + seed = seed, + verbose = verbose) + + super().__init__(t_max=t_max, num_grid_points=num_grid_points, seed=seed, verbose=verbose, k=k) + + # this is only needed access the underlaying stocproc KLE class + # in a convenient fashion + self._r_tau = self.stocproc._r_tau + self._s = self.stocproc._s + self._A = self.stocproc._A + self.num_y = self.stocproc._num_ev + + self.verbose = verbose + + def _cal_z(self, y): + r""" + uses the underlaying stocproc class to generate the process (see :py:class:`StocProc` for details) + """ + self.stocproc.new_process(y) + + if self.kle_interp: + return self.stocproc.x_t_array(self.t) + else: + return self.stocproc.x_for_initial_time_grid() + + def get_num_y(self): + return self.num_y + + +class StocProc_FFT(object): + r""" + Simulate Stochastic Process using FFT method + """ + def __init__(self, spectral_density, t_max, num_grid_points, seed=None, verbose=1, k=3): + super().__init__(t_max = t_max, + num_grid_points = num_grid_points, + seed = seed, + verbose = verbose, + k = k) + + self.n_dft = self.num_grid_points * 2 - 1 + delta_t = self.t_max / (self.num_grid_points-1) + self.delta_omega = 2 * np.pi / (delta_t * self.n_dft) + + #omega axis + omega = self.delta_omega*np.arange(self.n_dft) + #reshape for multiplication with matrix xi + self.sqrt_spectral_density_times_sqrt_delta_omega_over_sqrt_2 = np.sqrt(spectral_density(omega)) * np.sqrt(self.delta_omega) / np.sqrt(2) + + if self.verbose > 0: + print("stoc proc fft, spectral density sampling information:") + print(" t_max :", (t_max)) + print(" ng :", (num_grid_points)) + + print(" omega_max :", (self.delta_omega * self.n_dft)) + print(" delta_omega:", (self.delta_omega)) + + def _cal_z(self, y): + weighted_integrand = self.sqrt_spectral_density_times_sqrt_delta_omega_over_sqrt_2 * y + #compute integral using fft routine + if self.verbose > 1: + print("calc process via fft ...") + z = np.fft.fft(weighted_integrand)[0:self.num_grid_points] + if self.verbose > 1: + print("done") + return z + + def get_num_y(self): + return self.n_dft \ No newline at end of file diff --git a/stocproc/class_stocproc_kle.py b/stocproc/class_stocproc_kle.py new file mode 100644 index 0000000..0b04d86 --- /dev/null +++ b/stocproc/class_stocproc_kle.py @@ -0,0 +1,464 @@ +import functools +import numpy as np +import pickle + +from . import solve_hom_fredholm +from . import get_mid_point_weights +from . import get_trapezoidal_weights_times +from . import gquad + + +class StocProc(object): + r"""Simulate Stochastic Process using Karhunen-Loève expansion + + The :py:class:`StocProc` class provides methods to simulate stochastic processes + :math:`X(t)` in a given time interval :math:`[0,t_\mathrm{max}]` + with given autocorrelation function + :math:`R(\tau) = R(t-s) = \langle X(t)X^\ast(s)\rangle`. The method is similar to + the one described and implemented in :py:func:`stochastic_process_kle`. + + The :py:class:`StocProc` class extends the functionality of the + :py:func:`stochastic_process_kle` routine by providing an interpolation + method based on the numeric solution of the Fredholm equation. + Since the time discrete solutions :math:`u_i(s_j)` of the Fredholm equation + are best interpolates using + + .. math:: u_i(t) = \frac{1}{\lambda_i}\sum_j w_j R(t-s_j) u_i(s_j) + + with :math:`s_j` and :math:`w_j` being the time grid points and integration weights + for the numeric integrations as well as :math:`\lambda_i` and :math:`u_i` being + the eigenvalues the time discrete eigenvectors of the discrete Fredholm equation + (see Ref. [1]). + + From that is follows that a good interpolation formula for the stochastic process + is given by + + .. math:: X(t) = \sum_i \sqrt{\lambda_i} Y_i u_i(t) = \sum_{i,j} \frac{Y_i}{\sqrt{\lambda_i}}w_j R(t-s_j) u_i(s_j) + + where the :math:`Y_i` are independent normal distributed random variables + with variance one. + + For extracting values of the Stochastic Process you may use: + :py:func:`x`: returns the value of the Stochastic Process for a + single time :math:`t` + + :py:func:`x_t_array`: returns the value of the Stochastic Process for + all values of the `numpy.ndarray` a single time :math:`t_\mathrm{array}`. + + :py:func:`x_for_initial_time_grid`: returns the value of the Stochastic Process for + the times given to the constructor in order to discretize the Fredholm + equation. This is equivalent to calling :py:func:`stochastic_process_kle` with the + same weights :math:`w_i` and time grid points :math:`s_i`. + + To generate a new process call :py:func:`new_process`. + + To generate a new sample use :py:func:`new_process`. + + :param r_tau: function object of the one parameter correlation function + :math:`R(\tau) = R (t-s) = \langle X(t) X^\ast(s) \rangle` + :param t: list of grid points for the time axis + :param w: appropriate weights to integrate along the time axis using the + grid points given by :py:obj:`t` + :param seed: seed for the random number generator used + :param sig_min: minimal standard deviation :math:`\sigma_i` the random variable :math:`X_i = \sigma_i Y_i`, + viewed as coefficient for the base function :math:`u_i(t)`, must have to be considered as + significant for the Karhunen-Loève expansion (note: :math:`\sigma_i` results from the + square root of the eigenvalue :math:`\lambda_i`) + + For further reading see :py:func:`stochastic_process_kle`. + + References: + [1] Press, W.H., Teukolsky, S.A., Vetterling, W.T., Flannery, B.P., + 2007. Numerical Recipes 3rd Edition: The Art of Scientific Computing, + Auflage: 3. ed. Cambridge University Press, Cambridge, UK ; New York. (pp. 989) + + """ + + _dump_members = ['_r_tau', + '_s', + '_w', + '_eig_val', + '_eig_vec'] + + def __init__(self, + r_tau = None, + t = None, + w = None, + seed = None, + sig_min = 1e-4, + fname = None, + cache_size = 1024, + verbose = 1): + + self.verbose = verbose + if fname is None: + + assert r_tau is not None + self._r_tau = r_tau + + assert t is not None + self._s = t + self._num_gp = len(self._s) + + assert w is not None + self._w = w + + t_row = self._s.reshape(1, self._num_gp) + t_col = self._s.reshape(self._num_gp, 1) + # correlation matrix + # r_tau(t-s) -> integral/sum over s -> s must be row in EV equation + r = self._r_tau(t_col-t_row) + + # solve discrete Fredholm equation + # eig_val = lambda + # eig_vec = u(t) + self._eig_val, self._eig_vec = solve_hom_fredholm(r, w, sig_min**2, verbose=self.verbose) + else: + self.__load(fname) + + self.__calc_missing() + + self.my_cache_decorator = functools.lru_cache(maxsize=cache_size, typed=False) + self.x = self.my_cache_decorator(self._x) + self.new_process(seed = seed) + + @classmethod + def new_instance_by_name(cls, name, r_tau, t_max, ng, seed, sig_min): + known_names = ['trapezoidal', 'mid_point', 'gauss_legendre'] + + if name == 'trapezoidal': + ob = cls.new_instance_with_trapezoidal_weights(r_tau, t_max, ng, seed, sig_min) + elif name == 'mid_point': + ob = cls.new_instance_with_mid_point_weights(r_tau, t_max, ng, seed, sig_min) + elif name == 'gauss_legendre': + ob = cls.new_instance_with_gauss_legendre_weights(r_tau, t_max, ng, seed, sig_min) + else: + raise RuntimeError("unknown name '{}' to create StocProc instance\nknown names are {}".format(name, known_names)) + + ob.name = name + return ob + + @classmethod + def new_instance_with_trapezoidal_weights(cls, r_tau, t_max, ng, seed=None, sig_min=0, verbose=1): + t, w = get_trapezoidal_weights_times(t_max, ng) + return cls(r_tau, t, w, seed, sig_min, verbose=verbose) + + @classmethod + def new_instance_with_mid_point_weights(cls, r_tau, t_max, ng, seed=None, sig_min=0, verbose=1): + t, w = get_mid_point_weights(t_max, ng) + return cls(r_tau, t, w, seed, sig_min, verbose=verbose) + + @classmethod + def new_instance_with_gauss_legendre_weights(cls, r_tau, t_max, ng, seed=None, sig_min=0, verbose=1): + t, w = gquad.gauss_nodes_weights_legendre(n=ng, low=0, high=t_max) + return cls(r_tau, t, w, seed, sig_min, verbose=verbose) + + def __load(self, fname): + with open(fname, 'rb') as f: + for m in self._dump_members: + setattr(self, m, pickle.load(f)) + + def __calc_missing(self): + self._num_gp = len(self._s) + self._sqrt_eig_val = np.sqrt(self._eig_val) + self._num_ev = len(self._eig_val) + self._A = self._w.reshape(self._num_gp,1) * self._eig_vec / self._sqrt_eig_val.reshape(1, self._num_ev) + + + def __dump(self, fname): + with open(fname, 'wb') as f: + for m in self._dump_members: + pickle.dump(getattr(self, m), f, pickle.HIGHEST_PROTOCOL) + + def __getstate__(self): + return [getattr(self, atr) for atr in self._dump_members] + + def __setstate__(self, state): + for i, atr_value in enumerate(state): + setattr(self, self._dump_members[i], atr_value) + self.__calc_missing() + + def save_to_file(self, fname): + self.__dump(fname) + + def get_name(self): + if hasattr(self, 'name'): + return self.name + else: + return 'unknown' + + def new_process(self, yi=None, seed=None): + r"""setup new process + + Generates a new set of independent normal random variables :math:`Y_i` + which correspondent to the expansion coefficients of the + Karhunen-Loève expansion for the stochastic process + + .. math:: X(t) = \sum_i \sqrt{\lambda_i} Y_i u_i(t) + + :param seed: a seed my be given which is passed to the random number generator + """ + if seed != None: + np.random.seed(seed) + + self.clear_cache() + if yi is None: + if self.verbose > 1: + print("generate samples ...") + self._Y = np.random.normal(size=2*self._num_ev).view(np.complex).reshape(self._num_ev,1) + if self.verbose > 1: + print("done!") + else: + self._Y = yi.reshape(self._num_ev,1) + + + def x_for_initial_time_grid(self): + r"""Get process on initial time grid + + Returns the value of the Stochastic Process for + the times given to the constructor in order to discretize the Fredholm + equation. This is equivalent to calling :py:func:`stochastic_process_kle` with the + same weights :math:`w_i` and time grid points :math:`s_i`. + """ + tmp = self._Y / np.sqrt(2) * self._sqrt_eig_val.reshape(self._num_ev,1) + if self.verbose > 1: + print("calc process via matrix prod ...") + res = np.tensordot(tmp, self._eig_vec, axes=([0],[1])).flatten() + if self.verbose > 1: + print("done!") + + return res + + def time_grid(self): + return self._s + + def __call__(self, t): + if isinstance(t, np.ndarray): + return self.x_t_array(t) + else: + return self.x(t) + + def _x(self, t): + # _Y # (N_ev, 1 ) + tmp = self._Y*self._r_tau(t-self._s.reshape(1, self._num_gp)) + # (N_ev, N_gp) + # A # (N_gp, N_ev) + return np.tensordot(tmp, self._A, axes=([1,0],[0,1])) + + def get_cache_info(self): + return self.x.cache_info() + + def clear_cache(self): + self.x.cache_clear() + + + def x_t_array(self, t_array): + t_array = t_array.reshape(1,1,len(t_array)) # (1 , 1 , N_t) + tmp = (self._Y.reshape(self._num_ev,1,1) * + self._r_tau(t_array-self._s.reshape(1,self._num_gp,1))) + # (N_ev, N_gp, N_t) + # A # (N_gp, N_ev) + # A_j,i = w_j / sqrt(lambda_i) u_i(s_j) + if self.verbose > 1: + print("calc process via matrix prod ...") + res = np.tensordot(tmp, self._A, axes=([1,0],[0,1])) + if self.verbose > 1: + print("done!") + return res + + def u_i_mem_save(self, delta_t_fac, i): + r""" + """ + a = delta_t_fac + assert isinstance(a, int) + T = self._s[-1] + N1 = len(self._s) + N2 = a * (N1 - 1) + 1 + alpha_k = self._r_tau(np.linspace(-T, T, 2*N2 - 1)) + + print(alpha_k[N2-1]) + + print("WARNING! this needs to be cythonized") + u_res = np.zeros(shape=N2, dtype=np.complex) + for j in range(N2): + for l in range(N1): + k = j - a*l + N2-1 + u_res[j] += self._w[l] * alpha_k[k] * self._eig_vec[l, i] + + return u_res / self._eig_val[i] + + + def u_i(self, t_array, i): + r"""get eigenfunction of index i + + Returns the i-th eigenfunction corresponding to the i-th eigenvalue + of the discrete Fredholm equation using the interpolation scheme: + + .. math:: u_i(t) = \frac{1}{\lambda_i}\sum_j w_j R(t-s_j) u_i(s_j) + + :param t_array: 1D time array for which the eigenfunction :math:`u_i` + will be evaluated. + :param i: index of the eigenfunction + :return: 1D array of length ``len(t_array)`` + """ + t_array = t_array.reshape(1,len(t_array)) # (1 , N_t) + tmp = self._r_tau(t_array-self._s.reshape(self._num_gp,1)) + # (N_gp, N_t) + # A # (N_gp, N_ev) + # A_j,i = w_j / sqrt(lambda_i) u_i(s_j) + return 1/self._sqrt_eig_val[i]*np.tensordot(tmp, self._A[:,i], axes=([0],[0])) + + def u_i_all(self, t_array): + r"""get all eigenfunctions + + Returns all eigenfunctions of the discrete Fredholm equation using + the interpolation scheme: + + .. math:: u_i(t) = \frac{1}{\lambda_i}\sum_j w_j R(t-s_j) u_i(s_j) + + :param t_array: 1D time array for which the eigenfunction :math:`u_i` + will be evaluated. + :return: 2D array of shape ``(len(t_array), number_of_eigenvalues=self._num_ev)`` + (note, the number of eigenvalues may be smaller than the number + of grid points because of the selections mechanism implemented + by the value of ``sig_min``) + """ + t_array = t_array.reshape(1,len(t_array)) # (1 , N_t) + tmp = self._r_tau(t_array-self._s.reshape(self._num_gp,1)) + # (N_gp, N_t) + # A # (N_gp, N_ev) + # A_j,i = w_j / sqrt(lambda_i) u_i(s_j) + return np.tensordot(tmp, 1/self._sqrt_eig_val.reshape(1,self._num_ev) * self._A, axes=([0],[0])) + + def eigen_vector_i(self, i): + r"""Returns the i-th eigenvector (solution of the discrete Fredhom equation)""" + return self._eig_vec[:,i] + + def eigen_vector_i_all(self): + r"""Returns all eigenvectors (solutions of the discrete Fredhom equation) + + Note: Note: The maximum number of eigenvalues / eigenfunctions is given by + the number of time grid points passed to the constructor. But due to the + threshold ``sig_min`` (see :py:class:`StocProc`) only those + eigenvalues and corresponding eigenfunctions which satisfy + :math:`\mathtt{sig_{toll}} \geq \sqrt{\lambda_i}` are kept. + """ + return self._eig_vec + + def lambda_i(self, i): + r"""Returns the i-th eigenvalue.""" + return self._eig_val[i] + + def lambda_i_all(self): + r"""Returns all eigenvalues.""" + return self._eig_val + + def num_ev(self): + r"""Returns the number of eigenvalues / eigenfunctions used + + Note: The maximum number of eigenvalues / eigenfunctions is given by + the number of time grid points passed to the constructor. But due to the + threshold ``sig_min`` (see :py:class:`StocProc`) only those + eigenvalues and corresponding eigenfunctions which satisfy + :math:`\mathtt{sig_{toll}} \geq \sqrt{\lambda_i}` are kept. + """ + return self._num_ev + + def recons_corr(self, t_array): + r"""computes the interpolated correlation functions + + For the Karhunen-Loève expansion of a stochastic process the + correlation function can be expressed as follows: + + .. math:: R(t,s) = \langle X(t)X^\ast(s)\rangle = \sum_{n,m} \langle X_n X^\ast_m \rangle u_n(t) u^\ast_m(s) = \sum_n \lambda_n u_n(t) u^\ast_n(s) + + With that one can do a consistency check whether the finite set of basis functions + for the expansion (the solutions of the discrete Fredholm equation) is good + enough to reproduce the given correlation function. + """ + u_i_all_t = self.u_i_all(t_array) #(N_gp, N_ev) + u_i_all_ast_s = np.conj(u_i_all_t) #(N_gp, N_ev) + lambda_i_all = self.lambda_i_all() #(N_ev) + + tmp = lambda_i_all.reshape(1, self._num_ev) * u_i_all_t #(N_gp, N_ev) + + return np.tensordot(tmp, u_i_all_ast_s, axes=([1],[1])) + + def recons_corr_single_s(self, t_array, s): + assert False, "something is wrong here" + u_i_all_t = self.u_i_all(t_array) #(N_gp, N_ev) + u_i_all_ast_s = np.conj(self.u_i_all(np.asarray([s]))) #(1, N_ev) + lambda_i_all = self.lambda_i_all() #(N_ev) + tmp = lambda_i_all.reshape(1, self._num_ev) * u_i_all_t #(N_gp, N_ev) + return np.tensordot(tmp, u_i_all_ast_s, axes=([1],[1]))[:,0] + +def mean_error(r_t_s, r_t_s_exact): + r"""mean error of the correlation function as function of s + + .. math:: \mathtt{err} = \frac{1}{T}\int_0^T |r_\mathm{KLE}(t,r) - r_\mathrm{exact}(t,s)|^2 dt + + :return: the mean error ``err`` + """ + + err = np.mean(np.abs(r_t_s - r_t_s_exact), axis = 0) + return err + +def max_error(r_t_s, r_t_s_exact): + return np.max(np.abs(r_t_s - r_t_s_exact), axis = 0) + +def max_rel_error(r_t_s, r_t_s_exact): + return np.max(np.abs(r_t_s - r_t_s_exact) / np.abs(r_t_s_exact)) + +def auto_grid_points(r_tau, t_max, tol = 1e-8, err_method = max_error, name = 'mid_point', sig_min = 1e-4): + err = 1 + ng = 64 + seed = None + err_method_name = err_method.__name__ + print("start auto_grid_points, determine ng ...") + #exponential increase to get below error threshold + while err > tol: + ng *= 2 + ng_fine = ng*10 + t_fine = np.linspace(0, t_max, ng_fine) + print("#"*40) + print("new process with {} weights ...".format(name)) + stoc_proc = StocProc.new_instance_by_name(name, r_tau, t_max, ng, seed, sig_min) + print("reconstruct correlation function ({} points)...".format(ng_fine)) + r_t_s = stoc_proc.recons_corr(t_fine) + print("calculate exact correlation function ...") + r_t_s_exact = r_tau(t_fine.reshape(ng_fine,1) - t_fine.reshape(1, ng_fine)) + print("calculate error using {} ...".format(err_method_name)) + err = np.max(err_method(r_t_s, r_t_s_exact)) + print("ng {} -> err {:.3e}".format(ng, err)) + + ng_low = ng // 2 + ng_high = ng + + while (ng_high - ng_low) > 1: + print("#"*40) + print("ng_l", ng_low) + print("ng_h", ng_high) + ng = (ng_low + ng_high) // 2 + print("ng", ng) + ng_fine = ng*10 + t_fine = np.linspace(0, t_max, ng_fine) + print("new process with {} weights ...".format(name)) + stoc_proc = StocProc.new_instance_by_name(name, r_tau, t_max, ng, seed, sig_min) + print("reconstruct correlation function ({} points)...".format(ng_fine)) + r_t_s = stoc_proc.recons_corr(t_fine) + print("calculate exact correlation function ...") + r_t_s_exact = r_tau(t_fine.reshape(ng_fine,1) - t_fine.reshape(1, ng_fine)) + print("calculate error using {} ...".format(err_method_name)) + err = np.max(err_method(r_t_s, r_t_s_exact)) + print("ng {} -> err {:.3e}".format(ng, err)) + if err > tol: + print(" err > tol!") + print(" ng_l -> ", ng) + ng_low = ng + else: + print(" err <= tol!") + print(" ng_h -> ", ng) + ng_high = ng + + + return ng_high \ No newline at end of file diff --git a/stocproc/stocproc.py b/stocproc/stocproc.py index bc1e2c7..b790d01 100644 --- a/stocproc/stocproc.py +++ b/stocproc/stocproc.py @@ -56,597 +56,11 @@ solutions of the time discrete version. .. todo:: implement convenient classes with fixed weights """ -import numpy as np -import functools -import pickle - import sys import os - sys.path.append(os.path.dirname(__file__)) - -import gquad - -from scipy.interpolate import InterpolatedUnivariateSpline - -class ComplexInterpolatedUnivariateSpline(object): - def __init__(self, x, y, k=2): - self.re_spline = InterpolatedUnivariateSpline(x, np.real(y)) - self.im_spline = InterpolatedUnivariateSpline(x, np.imag(y)) - - def __call__(self, t): - return self.re_spline(t) + 1j*self.im_spline(t) - - -class StocProc(object): - r"""Simulate Stochastic Process using Karhunen-Loève expansion +import numpy as np - The :py:class:`StocProc` class provides methods to simulate stochastic processes - :math:`X(t)` in a given time interval :math:`[0,t_\mathrm{max}]` - with given autocorrelation function - :math:`R(\tau) = R(t-s) = \langle X(t)X^\ast(s)\rangle`. The method is similar to - the one described and implemented in :py:func:`stochastic_process_kle`. - - The :py:class:`StocProc` class extends the functionality of the - :py:func:`stochastic_process_kle` routine by providing an interpolation - method based on the numeric solution of the Fredholm equation. - Since the time discrete solutions :math:`u_i(s_j)` of the Fredholm equation - are best interpolates using - - .. math:: u_i(t) = \frac{1}{\lambda_i}\sum_j w_j R(t-s_j) u_i(s_j) - - with :math:`s_j` and :math:`w_j` being the time grid points and integration weights - for the numeric integrations as well as :math:`\lambda_i` and :math:`u_i` being - the eigenvalues the time discrete eigenvectors of the discrete Fredholm equation - (see Ref. [1]). - - From that is follows that a good interpolation formula for the stochastic process - is given by - - .. math:: X(t) = \sum_i \sqrt{\lambda_i} Y_i u_i(t) = \sum_{i,j} \frac{Y_i}{\sqrt{\lambda_i}}w_j R(t-s_j) u_i(s_j) - - where the :math:`Y_i` are independent normal distributed random variables - with variance one. - - For extracting values of the Stochastic Process you may use: - :py:func:`x`: returns the value of the Stochastic Process for a - single time :math:`t` - - :py:func:`x_t_array`: returns the value of the Stochastic Process for - all values of the `numpy.ndarray` a single time :math:`t_\mathrm{array}`. - - :py:func:`x_for_initial_time_grid`: returns the value of the Stochastic Process for - the times given to the constructor in order to discretize the Fredholm - equation. This is equivalent to calling :py:func:`stochastic_process_kle` with the - same weights :math:`w_i` and time grid points :math:`s_i`. - - To generate a new process call :py:func:`new_process`. - - To generate a new sample use :py:func:`new_process`. - - :param r_tau: function object of the one parameter correlation function - :math:`R(\tau) = R (t-s) = \langle X(t) X^\ast(s) \rangle` - :param t: list of grid points for the time axis - :param w: appropriate weights to integrate along the time axis using the - grid points given by :py:obj:`t` - :param seed: seed for the random number generator used - :param sig_min: minimal standard deviation :math:`\sigma_i` the random variable :math:`X_i = \sigma_i Y_i`, - viewed as coefficient for the base function :math:`u_i(t)`, must have to be considered as - significant for the Karhunen-Loève expansion (note: :math:`\sigma_i` results from the - square root of the eigenvalue :math:`\lambda_i`) - - For further reading see :py:func:`stochastic_process_kle`. - - References: - [1] Press, W.H., Teukolsky, S.A., Vetterling, W.T., Flannery, B.P., - 2007. Numerical Recipes 3rd Edition: The Art of Scientific Computing, - Auflage: 3. ed. Cambridge University Press, Cambridge, UK ; New York. (pp. 989) - - """ - - _dump_members = ['_r_tau', - '_s', - '_w', - '_eig_val', - '_eig_vec'] - - def __init__(self, - r_tau = None, - t = None, - w = None, - seed = None, - sig_min = 1e-4, - fname = None, - cache_size = 1024, - verbose = 1): - - self.verbose = verbose - if fname is None: - - assert r_tau is not None - self._r_tau = r_tau - - assert t is not None - self._s = t - self._num_gp = len(self._s) - - assert w is not None - self._w = w - - t_row = self._s.reshape(1, self._num_gp) - t_col = self._s.reshape(self._num_gp, 1) - # correlation matrix - # r_tau(t-s) -> integral/sum over s -> s must be row in EV equation - r = self._r_tau(t_col-t_row) - - # solve discrete Fredholm equation - # eig_val = lambda - # eig_vec = u(t) - self._eig_val, self._eig_vec = solve_hom_fredholm(r, w, sig_min**2, verbose=self.verbose) - else: - self.__load(fname) - - self.__calc_missing() - - self.my_cache_decorator = functools.lru_cache(maxsize=cache_size, typed=False) - self.x = self.my_cache_decorator(self._x) - self.new_process(seed = seed) - - @classmethod - def new_instance_by_name(cls, name, r_tau, t_max, ng, seed, sig_min): - known_names = ['trapezoidal', 'mid_point', 'gauss_legendre'] - - if name == 'trapezoidal': - ob = cls.new_instance_with_trapezoidal_weights(r_tau, t_max, ng, seed, sig_min) - elif name == 'mid_point': - ob = cls.new_instance_with_mid_point_weights(r_tau, t_max, ng, seed, sig_min) - elif name == 'gauss_legendre': - ob = cls.new_instance_with_gauss_legendre_weights(r_tau, t_max, ng, seed, sig_min) - else: - raise RuntimeError("unknown name '{}' to create StocProc instance\nknown names are {}".format(name, known_names)) - - ob.name = name - return ob - @classmethod - def new_instance_with_trapezoidal_weights(cls, r_tau, t_max, ng, seed, sig_min, verbose=1): - t, w = get_trapezoidal_weights_times(t_max, ng) - return cls(r_tau, t, w, seed, sig_min, verbose=verbose) - - @classmethod - def new_instance_with_mid_point_weights(cls, r_tau, t_max, ng, seed, sig_min, verbose=1): - t, w = get_mid_point_weights(t_max, ng) - return cls(r_tau, t, w, seed, sig_min, verbose=verbose) - - @classmethod - def new_instance_with_gauss_legendre_weights(cls, r_tau, t_max, ng, seed, sig_min, verbose=1): - t, w = gquad.gauss_nodes_weights_legendre(n=ng, low=0, high=t_max) - return cls(r_tau, t, w, seed, sig_min, verbose=verbose) - - def __load(self, fname): - with open(fname, 'rb') as f: - for m in self._dump_members: - setattr(self, m, pickle.load(f)) - - def __calc_missing(self): - self._num_gp = len(self._s) - self._sqrt_eig_val = np.sqrt(self._eig_val) - self._num_ev = len(self._eig_val) - self._w /= np.sqrt(2) - self._A = self._w.reshape(self._num_gp,1) * self._eig_vec / self._sqrt_eig_val.reshape(1, self._num_ev) - - - def __dump(self, fname): - with open(fname, 'wb') as f: - for m in self._dump_members: - pickle.dump(getattr(self, m), f, pickle.HIGHEST_PROTOCOL) - - def __getstate__(self): - return [getattr(self, atr) for atr in self._dump_members] - - def __setstate__(self, state): - for i, atr_value in enumerate(state): - setattr(self, self._dump_members[i], atr_value) - self.__calc_missing() - - def save_to_file(self, fname): - self.__dump(fname) - - def get_name(self): - if hasattr(self, 'name'): - return self.name - else: - return 'unknown' - - def new_process(self, yi=None, seed=None): - r"""setup new process - - Generates a new set of independent normal random variables :math:`Y_i` - which correspondent to the expansion coefficients of the - Karhunen-Loève expansion for the stochastic process - - .. math:: X(t) = \sum_i \sqrt{\lambda_i} Y_i u_i(t) - - :param seed: a seed my be given which is passed to the random number generator - """ - if seed != None: - np.random.seed(seed) - - self.clear_cache() - if yi is None: - if self.verbose > 1: - print("generate samples ...") - self._Y = np.random.normal(size=2*self._num_ev).view(np.complex).reshape(self._num_ev,1) - if self.verbose > 1: - print("done!") - else: - self._Y = yi.reshape(self._num_ev,1) - - - def x_for_initial_time_grid(self): - r"""Get process on initial time grid - - Returns the value of the Stochastic Process for - the times given to the constructor in order to discretize the Fredholm - equation. This is equivalent to calling :py:func:`stochastic_process_kle` with the - same weights :math:`w_i` and time grid points :math:`s_i`. - """ - tmp = self._Y / np.sqrt(2) * self._sqrt_eig_val.reshape(self._num_ev,1) - if self.verbose > 1: - print("calc process via matrix prod ...") - res = np.tensordot(tmp, self._eig_vec, axes=([0],[1])).flatten() - if self.verbose > 1: - print("done!") - - return res - - def time_grid(self): - return self._s - - def __call__(self, t): - if isinstance(t, np.ndarray): - return self.x_t_array(t) - else: - return self.x(t) - - def _x(self, t): - # _Y # (N_ev, 1 ) - tmp = self._Y*self._r_tau(t-self._s.reshape(1, self._num_gp)) - # (N_ev, N_gp) - # A # (N_gp, N_ev) - return np.tensordot(tmp, self._A, axes=([1,0],[0,1])) - - def get_cache_info(self): - return self.x.cache_info() - - def clear_cache(self): - self.x.cache_clear() - - - def x_t_array(self, t_array): - t_array = t_array.reshape(1,1,len(t_array)) # (1 , 1 , N_t) - tmp = (self._Y.reshape(self._num_ev,1,1) * - self._r_tau(t_array-self._s.reshape(1,self._num_gp,1))) - # (N_ev, N_gp, N_t) - # A # (N_gp, N_ev) - # A_j,i = w_j / sqrt(lambda_i) u_i(s_j) - if self.verbose > 1: - print("calc process via matrix prod ...") - res = np.tensordot(tmp, self._A, axes=([1,0],[0,1])) - if self.verbose > 1: - print("done!") - return res - - - def u_i(self, t_array, i): - r"""get eigenfunction of index i - - Returns the i-th eigenfunction corresponding to the i-th eigenvalue - of the discrete Fredholm equation using the interpolation scheme: - - .. math:: u_i(t) = \frac{1}{\lambda_i}\sum_j w_j R(t-s_j) u_i(s_j) - - :param t_array: 1D time array for which the eigenfunction :math:`u_i` - will be evaluated. - :param i: index of the eigenfunction - :return: 1D array of length ``len(t_array)`` - """ - t_array = t_array.reshape(1,len(t_array)) # (1 , N_t) - tmp = self._r_tau(t_array-self._s.reshape(self._num_gp,1)) - # (N_gp, N_t) - # A # (N_gp, N_ev) - # A_j,i = w_j / sqrt(lambda_i) u_i(s_j) - return np.sqrt(2)/self._sqrt_eig_val[i]*np.tensordot(tmp, self._A[:,i], axes=([0],[0])) - - def u_i_all(self, t_array): - r"""get all eigenfunctions - - Returns all eigenfunctions of the discrete Fredholm equation using - the interpolation scheme: - - .. math:: u_i(t) = \frac{1}{\lambda_i}\sum_j w_j R(t-s_j) u_i(s_j) - - :param t_array: 1D time array for which the eigenfunction :math:`u_i` - will be evaluated. - :return: 2D array of shape ``(len(t_array), number_of_eigenvalues=self._num_ev)`` - (note, the number of eigenvalues may be smaller than the number - of grid points because of the selections mechanism implemented - by the value of ``sig_min``) - """ - t_array = t_array.reshape(1,len(t_array)) # (1 , N_t) - tmp = self._r_tau(t_array-self._s.reshape(self._num_gp,1)) - # (N_gp, N_t) - # A # (N_gp, N_ev) - # A_j,i = w_j / sqrt(lambda_i) u_i(s_j) - return np.tensordot(tmp, np.sqrt(2)/self._sqrt_eig_val.reshape(1,self._num_ev) * self._A, axes=([0],[0])) - - def eigen_vector_i(self, i): - r"""Returns the i-th eigenvector (solution of the discrete Fredhom equation)""" - return self._eig_vec[:,i] - - def eigen_vector_i_all(self): - r"""Returns all eigenvectors (solutions of the discrete Fredhom equation) - - Note: Note: The maximum number of eigenvalues / eigenfunctions is given by - the number of time grid points passed to the constructor. But due to the - threshold ``sig_min`` (see :py:class:`StocProc`) only those - eigenvalues and corresponding eigenfunctions which satisfy - :math:`\mathtt{sig_{toll}} \geq \sqrt{\lambda_i}` are kept. - """ - return self._eig_vec - - def lambda_i(self, i): - r"""Returns the i-th eigenvalue.""" - return self._eig_val[i] - - def lambda_i_all(self): - r"""Returns all eigenvalues.""" - return self._eig_val - - def num_ev(self): - r"""Returns the number of eigenvalues / eigenfunctions used - - Note: The maximum number of eigenvalues / eigenfunctions is given by - the number of time grid points passed to the constructor. But due to the - threshold ``sig_min`` (see :py:class:`StocProc`) only those - eigenvalues and corresponding eigenfunctions which satisfy - :math:`\mathtt{sig_{toll}} \geq \sqrt{\lambda_i}` are kept. - """ - return self._num_ev - - def recons_corr(self, t_array): - r"""computes the interpolated correlation functions - - For the Karhunen-Loève expansion of a stochastic process the - correlation function can be expressed as follows: - - .. math:: R(t,s) = \langle X(t)X^\ast(s)\rangle = \sum_{n,m} \langle X_n X^\ast_m \rangle u_n(t) u^\ast_m(s) = \sum_n \lambda_n u_n(t) u^\ast_n(s) - - With that one can do a consistency check whether the finite set of basis functions - for the expansion (the solutions of the discrete Fredholm equation) is good - enough to reproduce the given correlation function. - """ - u_i_all_t = self.u_i_all(t_array) #(N_gp, N_ev) - u_i_all_ast_s = np.conj(u_i_all_t) #(N_gp, N_ev) - lambda_i_all = self.lambda_i_all() #(N_ev) - - tmp = lambda_i_all.reshape(1, self._num_ev) * u_i_all_t #(N_gp, N_ev) - - return np.tensordot(tmp, u_i_all_ast_s, axes=([1],[1])) - - def recons_corr_single_s(self, t_array, s): - u_i_all_t = self.u_i_all(t_array) #(N_gp, N_ev) - u_i_all_ast_s = np.conj(self.u_i_all(np.asarray([s]))) #(1, N_ev) - lambda_i_all = self.lambda_i_all() #(N_ev) - tmp = lambda_i_all.reshape(1, self._num_ev) * u_i_all_t #(N_gp, N_ev) - return np.tensordot(tmp, u_i_all_ast_s, axes=([1],[1]))[:,0] - -class StocProc_KLE(StocProc): - def __init__(self, r_tau, t_max, num_grid_points, ng_fredholm=None, seed=None, sig_min=1e-5, verbose=1): - """ - class to simulate stocastic processes using KLE method - - solve fredholm equation on grid with ng_fregholm nodes (trapezoidal_weights) - - calculate process (using interpolation solution of fredholm equation) with num_grid_points nodes - - ivoke cubic spline interpolator when calling - """ - - if ng_fredholm is None: - ng_fredholm = num_grid_points - - if num_grid_points < ng_fredholm: - print("WARNING: found 'num_grid_points < ng_fredholm' -> set 'num_grid_points == ng_fredholm'") - - if ng_fredholm == num_grid_points: - self.kle_interp = False - else: - self.kle_interp = True - - self.stocproc = StocProc.new_instance_with_trapezoidal_weights(r_tau = r_tau, - t_max = t_max, - ng = ng_fredholm, - sig_min = sig_min, - seed = seed, - verbose = verbose) - self._r_tau = self.stocproc._r_tau - self._s = self.stocproc._s - self._A = self.stocproc._A - self.stocproc.verbose = verbose - self.verbose = verbose - self.t = np.linspace(0, t_max, num_grid_points) - self._z = self.__cal_z() - if self.verbose > 1: - print("setup interpolator ...") - self.interpolator = ComplexInterpolatedUnivariateSpline(self.t, self._z, k=3) - if self.verbose > 1: - print("done!") - - def __cal_z(self): - if self.kle_interp: - return self.stocproc.x_t_array(self.t) - else: - return self.stocproc.x_for_initial_time_grid() - - def __call__(self, t): - return self.interpolator(t) - - def get_time(self): - return self.t - - def get_z(self): - return self._z - - def new_process(self, yi = None, seed = None): - self.stocproc.new_process(yi=yi, seed=seed) - self._z = self.__cal_z() - if self.verbose > 1: - print("setup interpolator ...") - self.interpolator = ComplexInterpolatedUnivariateSpline(self.t, self._z, k=3) - if self.verbose > 1: - print("done!") - - - -# def reconst_corr_zero(self, t_array): -class StocProc_FFT(object): - r""" - Simulate Stochastic Process using FFT method - """ - def __init__(self, spectral_density, t_max, num_grid_points, seed=None, verbose=1): - self.verbose = verbose - self.t_max = t_max - self.num_grid_points = num_grid_points - self.n_dft = self.num_grid_points * 2 - 1 - delta_t = self.t_max / (self.num_grid_points-1) - self.delta_omega = 2 * np.pi / (delta_t * self.n_dft) - - #omega axis - omega = self.delta_omega*np.arange(self.n_dft) - #reshape for multiplication with matrix xi - self.sqrt_spectral_density_times_sqrt_delta_omega_over_sqrt_2 = np.sqrt(spectral_density(omega)) * np.sqrt(self.delta_omega) / np.sqrt(2) - - if self.verbose > 0: - print("stoc proc fft, spectral density sampling information:") - print(" t_max :", (t_max)) - print(" ng :", (num_grid_points)) - - print(" omega_max :", (self.delta_omega * self.n_dft)) - print(" delta_omega:", (self.delta_omega)) - - self.interpolator = None - self.t = np.linspace(0, self.t_max, self.num_grid_points) - self.new_process(seed = seed) - - def __call__(self, t): - if self.interpolator is None: - if self.verbose > 1: - print("setup interpolator ...") - self.interpolator = ComplexInterpolatedUnivariateSpline(self.t, self._z, k=3) - if self.verbose > 1: - print("done!") - return self.interpolator(t) - - def get_time(self): - return self.t - - def get_z(self): - return self._z - - def new_process(self, yi = None, seed = None): - self.interpolator = None - if seed != None: - if self.verbose > 0: - print("use seed", seed) - np.random.seed(seed) - if yi is None: - #random complex normal samples - if self.verbose > 1: - print("generate samples ...") - yi = np.random.normal(size = 2*self.n_dft).view(np.complex) - if self.verbose > 1: - print("done") - #each row contain a different integrand - weighted_integrand = self.sqrt_spectral_density_times_sqrt_delta_omega_over_sqrt_2 * yi - #compute integral using fft routine - if self.verbose > 1: - print("calc process via fft ...") - self._z = np.fft.fft(weighted_integrand)[0:self.num_grid_points] - if self.verbose > 1: - print("done") - - - - - -def _mean_error(r_t_s, r_t_s_exact): - r"""mean error of the correlation function as function of s - - .. math:: \mathtt{err} = \frac{1}{T}\int_0^T |r_\mathm{KLE}(t,r) - r_\mathrm{exact}(t,s)|^2 dt - - :return: the mean error ``err`` - """ - - err = np.mean(np.abs(r_t_s - r_t_s_exact), axis = 0) - return err - -def _max_error(r_t_s, r_t_s_exact): - return np.max(np.abs(r_t_s - r_t_s_exact), axis = 0) - -def _max_rel_error(r_t_s, r_t_s_exact): - return np.max(np.abs(r_t_s - r_t_s_exact) / np.abs(r_t_s_exact)) - -def auto_grid_points(r_tau, t_max, tol = 1e-8, err_method = _max_error, name = 'mid_point', sig_min = 1e-4): - err = 1 - ng = 64 - seed = None - err_method_name = err_method.__name__ - print("start auto_grid_points, determine ng ...") - #exponential increase to get below error threshold - while err > tol: - ng *= 2 - ng_fine = ng*10 - t_fine = np.linspace(0, t_max, ng_fine) - print("#"*40) - print("new process with {} weights ...".format(name)) - stoc_proc = StocProc.new_instance_by_name(name, r_tau, t_max, ng, seed, sig_min) - print("reconstruct correlation function ({} points)...".format(ng_fine)) - r_t_s = stoc_proc.recons_corr(t_fine) - print("calculate exact correlation function ...") - r_t_s_exact = r_tau(t_fine.reshape(ng_fine,1) - t_fine.reshape(1, ng_fine)) - print("calculate error using {} ...".format(err_method_name)) - err = np.max(err_method(r_t_s, r_t_s_exact)) - print("ng {} -> err {:.3e}".format(ng, err)) - - ng_low = ng // 2 - ng_high = ng - - while (ng_high - ng_low) > 1: - print("#"*40) - print("ng_l", ng_low) - print("ng_h", ng_high) - ng = (ng_low + ng_high) // 2 - print("ng", ng) - ng_fine = ng*10 - t_fine = np.linspace(0, t_max, ng_fine) - print("new process with {} weights ...".format(name)) - stoc_proc = StocProc.new_instance_by_name(name, r_tau, t_max, ng, seed, sig_min) - print("reconstruct correlation function ({} points)...".format(ng_fine)) - r_t_s = stoc_proc.recons_corr(t_fine) - print("calculate exact correlation function ...") - r_t_s_exact = r_tau(t_fine.reshape(ng_fine,1) - t_fine.reshape(1, ng_fine)) - print("calculate error using {} ...".format(err_method_name)) - err = np.max(err_method(r_t_s, r_t_s_exact)) - print("ng {} -> err {:.3e}".format(ng, err)) - if err > tol: - print(" err > tol!") - print(" ng_l -> ", ng) - ng_low = ng - else: - print(" err <= tol!") - print(" ng_h -> ", ng) - ng_high = ng - - - return ng_high - def solve_hom_fredholm(r, w, eig_val_min, verbose=1): r"""Solves the discrete homogeneous Fredholm equation of the second kind diff --git a/tests/test_stocproc.py b/tests/test_stocproc.py index 9e43a33..ea0475c 100644 --- a/tests/test_stocproc.py +++ b/tests/test_stocproc.py @@ -31,7 +31,7 @@ import os import pathlib p = pathlib.PosixPath(os.path.abspath(__file__)) -sys.path.insert(0, str(p.parent.parent / 'stocproc')) +sys.path.insert(0, str(p.parent.parent)) import stocproc as sp @@ -561,10 +561,12 @@ def test_auto_grid_points(): r_tau = lambda tau : corr(tau, s_param, gamma_s_plus_1) # time interval [0,T] t_max = 15 - ng_interpolation = 1000 tol = 1e-8 - ng = sp.auto_grid_points(r_tau, t_max, ng_interpolation, tol, sig_min=0) + ng = sp.auto_grid_points(r_tau = r_tau, + t_max = t_max, + tol = tol, + sig_min = 0) print(ng) def test_chache(): @@ -639,7 +641,7 @@ def show_auto_grid_points_result(): # name = 'trapezoidal' # name = 'gauss_legendre' - ng = sp.auto_grid_points(r_tau, t_max, ng_interpolation, tol, name=name, sig_min=sig_min) + ng = sp.auto_grid_points(r_tau, t_max, tol, name=name, sig_min=sig_min) t, w = sp.get_trapezoidal_weights_times(t_max, ng) stoc_proc = sp.StocProc(r_tau, t, w, seed, sig_min) @@ -647,14 +649,52 @@ def show_auto_grid_points_result(): r_t_s_exact = r_tau(t_large.reshape(ng_interpolation,1) - t_large.reshape(1, ng_interpolation)) - diff = sp._mean_error(r_t_s, r_t_s_exact) - diff_max = sp._max_error(r_t_s, r_t_s_exact) + diff = sp.mean_error(r_t_s, r_t_s_exact) + diff_max = sp.max_error(r_t_s, r_t_s_exact) plt.plot(t_large, diff) plt.plot(t_large, diff_max) plt.yscale('log') plt.grid() plt.show() + +def test_ui_mem_save(): + s_param = 1 + gamma_s_plus_1 = gamma(s_param+1) + r_tau = lambda tau : corr(tau, s_param, gamma_s_plus_1) + t_max = 1 + + N1 = 100 + a = 5 + N2 = a*(N1 - 1) + 1 + + t_fine = np.linspace(0, t_max, N2) + + assert abs( (t_max/(N1-1)) - a*(t_fine[1]-t_fine[0]) ) < 1e-14, "{}".format(abs( (t_max/(N1-1)) - (t_fine[1]-t_fine[0]) )) + + stoc_proc = sp.StocProc.new_instance_with_trapezoidal_weights(r_tau, t_max, ng=N1, sig_min = 1e-4) + + for i in range(stoc_proc.num_ev()): + + ui_ms = stoc_proc.u_i_mem_save(delta_t_fac=a, i=i) + ui = stoc_proc.u_i(t_fine, i) +# plt.plot(t_fine, np.real(ui_ms), color='k') +# plt.plot(t_fine, np.imag(ui_ms), color='k') +# +# plt.plot(t_fine, np.real(ui), color='r') +# plt.plot(t_fine, np.imag(ui), color='r') +# +# plt.plot(stoc_proc._s, np.real(stoc_proc._eig_vec[:,i]), marker = 'o', ls='', color='b') +# plt.plot(stoc_proc._s, np.imag(stoc_proc._eig_vec[:,i]), marker = 'o', ls='', color='b') +# +# plt.grid() +# +# plt.show() + + assert np.allclose(ui_ms, ui), "{}".format(max(np.abs(ui_ms - ui))) + + + if __name__ == "__main__": # test_stochastic_process_KLE_correlation_function(plot=False) @@ -664,9 +704,10 @@ if __name__ == "__main__": # test_stocproc_KLE_splineinterpolation(plot=False) # test_stochastic_process_FFT_interpolation(plot=False) # test_stocProc_eigenfunction_extraction() - test_orthonomality() - test_auto_grid_points() - show_auto_grid_points_result() - test_chache() - test_dump_load() +# test_orthonomality() +# test_auto_grid_points() +# show_auto_grid_points_result() +# test_chache() +# test_dump_load() + test_ui_mem_save() pass \ No newline at end of file