This commit is contained in:
Richard Hartmann 2016-10-28 22:37:49 +02:00
commit 2b82c04e95
15 changed files with 1358 additions and 3461 deletions

225
doc/Makefile Normal file
View file

@ -0,0 +1,225 @@
# Makefile for Sphinx documentation
#
# You can set these variables from the command line.
SPHINXOPTS =
SPHINXBUILD = sphinx-build
PAPER =
BUILDDIR = build
# Internal variables.
# PAPEROPT_* pick the LaTeX paper size via PAPER (a4 or letter, empty = default);
# ALLSPHINXOPTS is the common option set passed to sphinx-build, where the
# trailing "source" is the documentation source directory.
PAPEROPT_a4 = -D latex_paper_size=a4
PAPEROPT_letter = -D latex_paper_size=letter
ALLSPHINXOPTS = -d $(BUILDDIR)/doctrees $(PAPEROPT_$(PAPER)) $(SPHINXOPTS) source
# the i18n builder cannot share the environment and doctrees with the others
I18NSPHINXOPTS = $(PAPEROPT_$(PAPER)) $(SPHINXOPTS) source
# Self-documenting target: lists every supported builder below.
# Keep this list in sync when adding/removing targets.
.PHONY: help
help:
@echo "Please use \`make <target>' where <target> is one of"
@echo " html to make standalone HTML files"
@echo " dirhtml to make HTML files named index.html in directories"
@echo " singlehtml to make a single large HTML file"
@echo " pickle to make pickle files"
@echo " json to make JSON files"
@echo " htmlhelp to make HTML files and a HTML help project"
@echo " qthelp to make HTML files and a qthelp project"
@echo " applehelp to make an Apple Help Book"
@echo " devhelp to make HTML files and a Devhelp project"
@echo " epub to make an epub"
@echo " epub3 to make an epub3"
@echo " latex to make LaTeX files, you can set PAPER=a4 or PAPER=letter"
@echo " latexpdf to make LaTeX files and run them through pdflatex"
@echo " latexpdfja to make LaTeX files and run them through platex/dvipdfmx"
@echo " text to make text files"
@echo " man to make manual pages"
@echo " texinfo to make Texinfo files"
@echo " info to make Texinfo files and run them through makeinfo"
@echo " gettext to make PO message catalogs"
@echo " changes to make an overview of all changed/added/deprecated items"
@echo " xml to make Docutils-native XML files"
@echo " pseudoxml to make pseudoxml-XML files for display purposes"
@echo " linkcheck to check all external links for integrity"
@echo " doctest to run all doctests embedded in the documentation (if enabled)"
@echo " coverage to run coverage check of the documentation (if enabled)"
@echo " dummy to check syntax errors of document sources"
# NOTE(review): recipe lines must start with a hard TAB; the leading tabs
# appear lost in this copy -- confirm against the original file.
# Remove every build product below BUILDDIR (doctrees included).
.PHONY: clean
clean:
rm -rf $(BUILDDIR)/*
# Each of the following targets invokes sphinx-build with the matching
# builder (-b <name>) and writes into its own BUILDDIR subdirectory.
.PHONY: html
html:
$(SPHINXBUILD) -b html $(ALLSPHINXOPTS) $(BUILDDIR)/html
@echo
@echo "Build finished. The HTML pages are in $(BUILDDIR)/html."
.PHONY: dirhtml
dirhtml:
$(SPHINXBUILD) -b dirhtml $(ALLSPHINXOPTS) $(BUILDDIR)/dirhtml
@echo
@echo "Build finished. The HTML pages are in $(BUILDDIR)/dirhtml."
.PHONY: singlehtml
singlehtml:
$(SPHINXBUILD) -b singlehtml $(ALLSPHINXOPTS) $(BUILDDIR)/singlehtml
@echo
@echo "Build finished. The HTML page is in $(BUILDDIR)/singlehtml."
.PHONY: pickle
pickle:
$(SPHINXBUILD) -b pickle $(ALLSPHINXOPTS) $(BUILDDIR)/pickle
@echo
@echo "Build finished; now you can process the pickle files."
.PHONY: json
json:
$(SPHINXBUILD) -b json $(ALLSPHINXOPTS) $(BUILDDIR)/json
@echo
@echo "Build finished; now you can process the JSON files."
.PHONY: htmlhelp
htmlhelp:
$(SPHINXBUILD) -b htmlhelp $(ALLSPHINXOPTS) $(BUILDDIR)/htmlhelp
@echo
@echo "Build finished; now you can run HTML Help Workshop with the" \
".hhp project file in $(BUILDDIR)/htmlhelp."
.PHONY: qthelp
qthelp:
$(SPHINXBUILD) -b qthelp $(ALLSPHINXOPTS) $(BUILDDIR)/qthelp
@echo
@echo "Build finished; now you can run "qcollectiongenerator" with the" \
".qhcp project file in $(BUILDDIR)/qthelp, like this:"
@echo "# qcollectiongenerator $(BUILDDIR)/qthelp/StocProc.qhcp"
@echo "To view the help file:"
@echo "# assistant -collectionFile $(BUILDDIR)/qthelp/StocProc.qhc"
.PHONY: applehelp
applehelp:
$(SPHINXBUILD) -b applehelp $(ALLSPHINXOPTS) $(BUILDDIR)/applehelp
@echo
@echo "Build finished. The help book is in $(BUILDDIR)/applehelp."
@echo "N.B. You won't be able to view it unless you put it in" \
"~/Library/Documentation/Help or install it in your application" \
"bundle."
.PHONY: devhelp
devhelp:
$(SPHINXBUILD) -b devhelp $(ALLSPHINXOPTS) $(BUILDDIR)/devhelp
@echo
@echo "Build finished."
@echo "To view the help file:"
@echo "# mkdir -p $$HOME/.local/share/devhelp/StocProc"
@echo "# ln -s $(BUILDDIR)/devhelp $$HOME/.local/share/devhelp/StocProc"
@echo "# devhelp"
.PHONY: epub
epub:
$(SPHINXBUILD) -b epub $(ALLSPHINXOPTS) $(BUILDDIR)/epub
@echo
@echo "Build finished. The epub file is in $(BUILDDIR)/epub."
.PHONY: epub3
epub3:
$(SPHINXBUILD) -b epub3 $(ALLSPHINXOPTS) $(BUILDDIR)/epub3
@echo
@echo "Build finished. The epub3 file is in $(BUILDDIR)/epub3."
.PHONY: latex
latex:
$(SPHINXBUILD) -b latex $(ALLSPHINXOPTS) $(BUILDDIR)/latex
@echo
@echo "Build finished; the LaTeX files are in $(BUILDDIR)/latex."
@echo "Run \`make' in that directory to run these through (pdf)latex" \
"(use \`make latexpdf' here to do that automatically)."
# latexpdf chains the latex builder with the generated Makefile's all-pdf
# rule; $(MAKE) (not plain make) keeps -j/-n propagation working.
.PHONY: latexpdf
latexpdf:
$(SPHINXBUILD) -b latex $(ALLSPHINXOPTS) $(BUILDDIR)/latex
@echo "Running LaTeX files through pdflatex..."
$(MAKE) -C $(BUILDDIR)/latex all-pdf
@echo "pdflatex finished; the PDF files are in $(BUILDDIR)/latex."
.PHONY: latexpdfja
# Build LaTeX sources, then produce PDFs via platex/dvipdfmx (Japanese docs).
# Fix: the final message claimed "pdflatex finished" although this target
# runs platex/dvipdfmx, not pdflatex.
latexpdfja:
	$(SPHINXBUILD) -b latex $(ALLSPHINXOPTS) $(BUILDDIR)/latex
	@echo "Running LaTeX files through platex and dvipdfmx..."
	$(MAKE) -C $(BUILDDIR)/latex all-pdf-ja
	@echo "dvipdfmx finished; the PDF files are in $(BUILDDIR)/latex."
# Plain-text, man-page and Texinfo builders; each writes into its own
# BUILDDIR subdirectory via the corresponding sphinx builder.
.PHONY: text
text:
$(SPHINXBUILD) -b text $(ALLSPHINXOPTS) $(BUILDDIR)/text
@echo
@echo "Build finished. The text files are in $(BUILDDIR)/text."
.PHONY: man
man:
$(SPHINXBUILD) -b man $(ALLSPHINXOPTS) $(BUILDDIR)/man
@echo
@echo "Build finished. The manual pages are in $(BUILDDIR)/man."
.PHONY: texinfo
texinfo:
$(SPHINXBUILD) -b texinfo $(ALLSPHINXOPTS) $(BUILDDIR)/texinfo
@echo
@echo "Build finished. The Texinfo files are in $(BUILDDIR)/texinfo."
@echo "Run \`make' in that directory to run these through makeinfo" \
"(use \`make info' here to do that automatically)."
.PHONY: info
# Build Texinfo sources, then run them through makeinfo via the generated
# Makefile in the texinfo build directory.
# Fix: use $(MAKE) instead of a literal `make` so that recursive invocation
# inherits the jobserver (-j) and dry-run (-n) flags, consistent with the
# latexpdf/latexpdfja targets above.
info:
	$(SPHINXBUILD) -b texinfo $(ALLSPHINXOPTS) $(BUILDDIR)/texinfo
	@echo "Running Texinfo files through makeinfo..."
	$(MAKE) -C $(BUILDDIR)/texinfo info
	@echo "makeinfo finished; the Info files are in $(BUILDDIR)/texinfo."
# gettext uses I18NSPHINXOPTS (no shared doctrees, see variable definition).
.PHONY: gettext
gettext:
$(SPHINXBUILD) -b gettext $(I18NSPHINXOPTS) $(BUILDDIR)/locale
@echo
@echo "Build finished. The message catalogs are in $(BUILDDIR)/locale."
.PHONY: changes
changes:
$(SPHINXBUILD) -b changes $(ALLSPHINXOPTS) $(BUILDDIR)/changes
@echo
@echo "The overview file is in $(BUILDDIR)/changes."
.PHONY: linkcheck
linkcheck:
$(SPHINXBUILD) -b linkcheck $(ALLSPHINXOPTS) $(BUILDDIR)/linkcheck
@echo
@echo "Link check complete; look for any errors in the above output " \
"or in $(BUILDDIR)/linkcheck/output.txt."
.PHONY: doctest
doctest:
$(SPHINXBUILD) -b doctest $(ALLSPHINXOPTS) $(BUILDDIR)/doctest
@echo "Testing of doctests in the sources finished, look at the " \
"results in $(BUILDDIR)/doctest/output.txt."
.PHONY: coverage
coverage:
$(SPHINXBUILD) -b coverage $(ALLSPHINXOPTS) $(BUILDDIR)/coverage
@echo "Testing of coverage in the sources finished, look at the " \
"results in $(BUILDDIR)/coverage/python.txt."
.PHONY: xml
xml:
$(SPHINXBUILD) -b xml $(ALLSPHINXOPTS) $(BUILDDIR)/xml
@echo
@echo "Build finished. The XML files are in $(BUILDDIR)/xml."
.PHONY: pseudoxml
pseudoxml:
$(SPHINXBUILD) -b pseudoxml $(ALLSPHINXOPTS) $(BUILDDIR)/pseudoxml
@echo
@echo "Build finished. The pseudo-XML files are in $(BUILDDIR)/pseudoxml."
# dummy builds nothing; useful as a fast syntax check of the sources.
.PHONY: dummy
dummy:
$(SPHINXBUILD) -b dummy $(ALLSPHINXOPTS) $(BUILDDIR)/dummy
@echo
@echo "Build finished. Dummy builder generates no files."

351
doc/source/conf.py Normal file
View file

@ -0,0 +1,351 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# StocProc documentation build configuration file, created by
# sphinx-quickstart on Fri Oct 28 15:22:00 2016.
#
# This file is execfile()d with the current directory set to its
# containing dir.
#
# Note that not all possible configuration values are present in this
# autogenerated file.
#
# All configuration values have a default; values that are commented out
# serve to show the default.
# If extensions (or modules to document with autodoc) are in another directory,
# add these directories to sys.path here. If the directory is relative to the
# documentation root, use os.path.abspath to make it absolute, like shown here.
#
import os
import sys
sys.path.insert(0, os.path.abspath('../../'))
# -- General configuration ------------------------------------------------
# If your documentation needs a minimal Sphinx version, state it here.
#
# needs_sphinx = '1.0'
# Add any Sphinx extension module names here, as strings. They can be
# extensions coming with Sphinx (named 'sphinx.ext.*') or your custom
# ones.
extensions = [
'sphinx.ext.autodoc',
'sphinx.ext.intersphinx',
'sphinx.ext.todo',
'sphinx.ext.coverage',
'sphinx.ext.mathjax',
'sphinx.ext.viewcode',
'sphinx.ext.githubpages',
]
# Add any paths that contain templates here, relative to this directory.
templates_path = ['_templates']
# The suffix(es) of source filenames.
# You can specify multiple suffix as a list of string:
#
# source_suffix = ['.rst', '.md']
source_suffix = '.rst'
# The encoding of source files.
#
# source_encoding = 'utf-8-sig'
# The master toctree document.
master_doc = 'index'
# General information about the project.
project = 'StocProc'
copyright = '2016, Richard Hartmann'
author = 'Richard Hartmann'
# The version info for the project you're documenting, acts as replacement for
# |version| and |release|, also used in various other places throughout the
# built documents.
#
# The short X.Y version.
version = '0.2'
# The full version, including alpha/beta/rc tags.
release = '0.2.0'
# The language for content autogenerated by Sphinx. Refer to documentation
# for a list of supported languages.
#
# This is also used if you do content translation via gettext catalogs.
# Usually you set "language" from the command line for these cases.
# NOTE(review): Sphinx >= 5 warns on None and expects a string such as 'en';
# fine for the Sphinx version this file was generated with (2016).
language = None
# There are two options for replacing |today|: either, you set today to some
# non-false value, then it is used:
#
# today = ''
#
# Else, today_fmt is used as the format for a strftime call.
#
# today_fmt = '%B %d, %Y'
# List of patterns, relative to source directory, that match files and
# directories to ignore when looking for source files.
# This patterns also effect to html_static_path and html_extra_path
exclude_patterns = []
# The reST default role (used for this markup: `text`) to use for all
# documents.
#
# default_role = None
# If true, '()' will be appended to :func: etc. cross-reference text.
#
# add_function_parentheses = True
# If true, the current module name will be prepended to all description
# unit titles (such as .. function::).
#
# add_module_names = True
# If true, sectionauthor and moduleauthor directives will be shown in the
# output. They are ignored by default.
#
# show_authors = False
# The name of the Pygments (syntax highlighting) style to use.
pygments_style = 'sphinx'
# A list of ignored prefixes for module index sorting.
# modindex_common_prefix = []
# If true, keep warnings as "system message" paragraphs in the built documents.
# keep_warnings = False
# If true, `todo` and `todoList` produce output, else they produce nothing.
todo_include_todos = True
# -- Options for HTML output ----------------------------------------------
# The theme to use for HTML and HTML Help pages. See the documentation for
# a list of builtin themes.
#
html_theme = 'alabaster'
# Theme options are theme-specific and customize the look and feel of a theme
# further. For a list of options available for each theme, see the
# documentation.
#
# html_theme_options = {}
# Add any paths that contain custom themes here, relative to this directory.
# html_theme_path = []
# The name for this set of Sphinx documents.
# "<project> v<release> documentation" by default.
#
# html_title = 'StocProc v0.2.0'
# A shorter title for the navigation bar. Default is the same as html_title.
#
# html_short_title = None
# The name of an image file (relative to this directory) to place at the top
# of the sidebar.
#
# html_logo = None
# The name of an image file (relative to this directory) to use as a favicon of
# the docs. This file should be a Windows icon file (.ico) being 16x16 or 32x32
# pixels large.
#
# html_favicon = None
# Add any paths that contain custom static files (such as style sheets) here,
# relative to this directory. They are copied after the builtin static files,
# so a file named "default.css" will overwrite the builtin "default.css".
html_static_path = ['_static']
# Add any extra paths that contain custom files (such as robots.txt or
# .htaccess) here, relative to this directory. These files are copied
# directly to the root of the documentation.
#
# html_extra_path = []
# If not None, a 'Last updated on:' timestamp is inserted at every page
# bottom, using the given strftime format.
# The empty string is equivalent to '%b %d, %Y'.
#
# html_last_updated_fmt = None
# If true, SmartyPants will be used to convert quotes and dashes to
# typographically correct entities.
#
# html_use_smartypants = True
# Custom sidebar templates, maps document names to template names.
#
# html_sidebars = {}
# Additional templates that should be rendered to pages, maps page names to
# template names.
#
# html_additional_pages = {}
# If false, no module index is generated.
#
# html_domain_indices = True
# If false, no index is generated.
#
# html_use_index = True
# If true, the index is split into individual pages for each letter.
#
# html_split_index = False
# If true, links to the reST sources are added to the pages.
#
# html_show_sourcelink = True
# If true, "Created using Sphinx" is shown in the HTML footer. Default is True.
#
# html_show_sphinx = True
# If true, "(C) Copyright ..." is shown in the HTML footer. Default is True.
#
# html_show_copyright = True
# If true, an OpenSearch description file will be output, and all pages will
# contain a <link> tag referring to it. The value of this option must be the
# base URL from which the finished HTML is served.
#
# html_use_opensearch = ''
# This is the file name suffix for HTML files (e.g. ".xhtml").
# html_file_suffix = None
# Language to be used for generating the HTML full-text search index.
# Sphinx supports the following languages:
# 'da', 'de', 'en', 'es', 'fi', 'fr', 'hu', 'it', 'ja'
# 'nl', 'no', 'pt', 'ro', 'ru', 'sv', 'tr', 'zh'
#
# html_search_language = 'en'
# A dictionary with options for the search language support, empty by default.
# 'ja' uses this config value.
# 'zh' user can custom change `jieba` dictionary path.
#
# html_search_options = {'type': 'default'}
# The name of a javascript file (relative to the configuration directory) that
# implements a search results scorer. If empty, the default will be used.
#
# html_search_scorer = 'scorer.js'
# Output file base name for HTML help builder.
htmlhelp_basename = 'StocProcdoc'
# -- Options for LaTeX output ---------------------------------------------
latex_elements = {
# The paper size ('letterpaper' or 'a4paper').
#
# 'papersize': 'letterpaper',
# The font size ('10pt', '11pt' or '12pt').
#
# 'pointsize': '10pt',
# Additional stuff for the LaTeX preamble.
#
# 'preamble': '',
# Latex figure (float) alignment
#
# 'figure_align': 'htbp',
}
# Grouping the document tree into LaTeX files. List of tuples
# (source start file, target name, title,
# author, documentclass [howto, manual, or own class]).
latex_documents = [
(master_doc, 'StocProc.tex', 'StocProc Documentation',
'Richard Hartmann', 'manual'),
]
# The name of an image file (relative to this directory) to place at the top of
# the title page.
#
# latex_logo = None
# For "manual" documents, if this is true, then toplevel headings are parts,
# not chapters.
#
# latex_use_parts = False
# If true, show page references after internal links.
#
# latex_show_pagerefs = False
# If true, show URL addresses after external links.
#
# latex_show_urls = False
# Documents to append as an appendix to all manuals.
#
# latex_appendices = []
# If false, will not define \strong, \code, \titleref, \crossref ... but only
# \sphinxstrong, ..., \sphinxtitleref, ... To help avoid clash with user added
# packages.
#
# latex_keep_old_macro_names = True
# If false, no module index is generated.
#
# latex_domain_indices = True
# -- Options for manual page output ---------------------------------------
# One entry per manual page. List of tuples
# (source start file, name, description, authors, manual section).
man_pages = [
(master_doc, 'stocproc', 'StocProc Documentation',
[author], 1)
]
# If true, show URL addresses after external links.
#
# man_show_urls = False
# -- Options for Texinfo output -------------------------------------------
# Grouping the document tree into Texinfo files. List of tuples
# (source start file, target name, title, author,
# dir menu entry, description, category)
texinfo_documents = [
(master_doc, 'StocProc', 'StocProc Documentation',
author, 'StocProc', 'One line description of project.',
'Miscellaneous'),
]
# Documents to append as an appendix to all manuals.
#
# texinfo_appendices = []
# If false, no module index is generated.
#
# texinfo_domain_indices = True
# How to display URL addresses: 'footnote', 'no', or 'inline'.
#
# texinfo_show_urls = 'footnote'
# If true, do not generate a @detailmenu in the "Top" node's menu.
#
# texinfo_no_detailmenu = False
# Example configuration for intersphinx: refer to the Python standard library.
# Fix: use the named-key form {'name': (target URI, inventory)} -- the bare-URI
# key form {'uri': None} is deprecated by Sphinx; the named form works on all
# relevant Sphinx versions.
intersphinx_mapping = {'python': ('https://docs.python.org/', None)}

1
doc/source/index.rst Normal file
View file

@ -0,0 +1 @@
.. automodule:: stocproc

View file

@ -1,11 +1,32 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Stochastic Process Module
=========================

This module contains two different implementations for generating stochastic processes for a
given auto correlation function. Both methods are based on a time discrete process, however cubic
spline interpolation is assured to be valid within a given tolerance.

* simulate stochastic processes using Karhunen-Loève expansion :py:func:`stocproc.StocProc_KLE_tol`

  Setting up the class involves solving an eigenvalue problem which grows with
  the time interval the process is simulated on. Further generating a new process
  involves a multiplication with that matrix, therefore it scales quadratically with the
  time interval. Nonetheless it turns out that this method requires less random numbers
  than the Fast-Fourier method.

* simulate stochastic processes using Fast-Fourier method :py:func:`stocproc.StocProc_FFT_tol`

  Setting up this class is quite efficient as it only calculates values of the
  associated spectral density. The number scales linear with the time interval of interest. However to achieve
  sufficient accuracy many of these values are required. As the generation of a new process is based on
  a Fast-Fourier-Transform over these values, this part is comparably lengthy.
"""
# Fix: the module docstring above must precede any statement (previously it
# was placed after the imports and therefore never assigned to __doc__).
from . import stocproc_c
from . import stocproc
from . import class_stocproc_kle
from . import class_stocproc
from . import gquad
from . import helper
from . import method_fft

# package version; kept as a plain module attribute for backward compatibility
version = '0.2.0'
# conventional alias (backward-compatible addition)
__version__ = version

# public API re-exports
from .stocproc import StocProc_FFT_tol
from .stocproc import StocProc_KLE
from .stocproc import StocProc_KLE_tol

View file

@ -1,492 +0,0 @@
# -*- coding: utf8 -*-
from __future__ import print_function, division
import numpy as np
from scipy.interpolate import InterpolatedUnivariateSpline
from scipy.integrate import quad
#from .class_stocproc_kle import StocProc
from .stocproc import solve_hom_fredholm
from .stocproc import get_simpson_weights_times
from . import method_kle
from . import method_fft
from . import stocproc_c
import logging
log = logging.getLogger(__name__)
class ComplexInterpolatedUnivariateSpline(object):
    r"""
    Univariate spline interpolator from scipy.interpolate in a convenient fashion to
    interpolate real and imaginary parts of complex data.

    Fix: the spline degree ``k`` was accepted but never forwarded to the
    underlying splines (they silently used scipy's default ``k=3``); it is now
    passed through, honoring both the declared default ``k=2`` and callers
    that request a specific degree.
    """
    def __init__(self, x, y, k=2):
        """
        :param x: strictly increasing sample positions
        :param y: complex sample values at ``x``
        :param k: spline degree (requires ``len(x) > k``)
        """
        # fit the real and imaginary channels with two independent real splines
        self.re_spline = InterpolatedUnivariateSpline(x, np.real(y), k=k)
        self.im_spline = InterpolatedUnivariateSpline(x, np.imag(y), k=k)

    def __call__(self, t):
        """Evaluate the complex interpolant at time(s) ``t``."""
        return self.re_spline(t) + 1j * self.im_spline(t)
def complex_quad(func, a, b, **kw_args):
    """
    Integrate a complex-valued function over [a, b].

    scipy's ``quad`` handles only real integrands, so the real and imaginary
    parts are integrated separately and recombined.  Extra keyword arguments
    are forwarded to ``scipy.integrate.quad``.
    """
    def real_part(t):
        return np.real(func(t))

    def imag_part(t):
        return np.imag(func(t))

    re_val = quad(real_part, a, b, **kw_args)[0]
    im_val = quad(imag_part, a, b, **kw_args)[0]
    return re_val + 1j * im_val
class _absStocProc(object):
r"""
Abstract base class to stochastic process interface
general work flow:
- Specify the time axis of interest [0, t_max] and it resolution (number of grid points), :math:`t_i = i \frac{t_max}{N_t-1}.
- To evaluate the stochastic process at these points, a mapping from :math:`N_z` normal distributed
random complex numbers with :math:`\langle y_i y_j^\ast \rangle = 2 \delta_{ij}`
to the stochastic process :math:`z_{t_i}` is needed and depends on the implemented method (:py:func:`_calc_z').
- A new process should be generated by calling :py:func:`new_process'.
- When the __call__ method is invoked the results will be interpolated between the :math:`z_t_i`.
"""
def __init__(self, t_max, num_grid_points, seed=None, verbose=1, k=3):
r"""
:param t_max: specify time axis as [0, t_max]
:param num_grid_points: number of equidistant times on that axis
:param seed: if not ``None`` set seed to ``seed``
:param verbose: 0: no output, 1: informational output, 2: (eventually some) debug info
:param k: polynomial degree used for spline interpolation
"""
self._verbose = verbose
self.t_max = t_max
self.num_grid_points = num_grid_points
self.t = np.linspace(0, t_max, num_grid_points)
self._z = None
self._interpolator = None
self._k = k
if seed is not None:
np.random.seed(seed)
self._one_over_sqrt_2 = 1/np.sqrt(2)
def __call__(self, t):
r"""
:param t: time to evaluate the stochastic process as, float of array of floats
evaluates the stochastic process via spline interpolation between the discrete process
"""
if self._z is None:
raise RuntimeError("StocProc_FFT has NO random data, call 'new_process' to generate a new random process")
if self._interpolator is None:
if self._verbose > 1:
print("setup interpolator ...")
self._interpolator = ComplexInterpolatedUnivariateSpline(self.t, self._z, k=self._k)
if self._verbose > 1:
print("done!")
return self._interpolator(t)
def _calc_z(self, y):
r"""
maps the normal distributed complex valued random variables y to the stochastic process
:return: the stochastic process, array of complex numbers
"""
pass
def get_num_y(self):
r"""
:return: number of complex random variables needed to calculate the stochastic process
"""
pass
def get_time(self):
r"""
:return: time axis
"""
return self.t
def get_z(self):
r"""
use :py:func:`new_process` to generate a new process
:return: the current process
"""
return self._z
def new_process(self, y=None, seed=None):
r"""
generate a new process by evaluating :py:func:`_calc_z'
When ``y`` is given use these random numbers as input for :py:func:`_calc_z`
otherwise generate a new set of random numbers.
:param y: independent normal distributed complex valued random variables with :math:`\sig_{ij}^2 = \langle y_i y_j^\ast \rangle = 2 \delta_{ij}
:param seed: if not ``None`` set seed to ``seed`` before generating samples
"""
self._interpolator = None
if seed != None:
if self._verbose > 0:
print("use seed", seed)
np.random.seed(seed)
if y is None:
#random complex normal samples
if self._verbose > 1:
print("generate samples ...")
y = np.random.normal(scale=self._one_over_sqrt_2, size = 2*self.get_num_y()).view(np.complex)
if self._verbose > 1:
print("done")
self._z = self._calc_z(y)
# NOTE(review): leading indentation appears lost in this copy of the file;
# code lines below are kept byte-identical, comments only are added.
class StocProc_KLE(_absStocProc):
r"""
class to simulate stochastic processes using KLE method
- Solve fredholm equation on grid with ``ng_fredholm nodes`` (trapezoidal_weights).
If case ``ng_fredholm`` is ``None`` set ``ng_fredholm = num_grid_points``. In general it should
hold ``ng_fredholm < num_grid_points`` and ``num_grid_points = 10*ng_fredholm`` might be a good ratio.
- Calculate discrete stochastic process (using interpolation solution of fredholm equation) with num_grid_points nodes
- invoke spline interpolator when calling
"""
def __init__(self, r_tau, t_max, ng_fredholm, ng_fac=4, seed=None, sig_min=1e-5, verbose=1, k=3, align_eig_vec=False):
r"""
:param r_tau: auto correlation function of the stochastic process
:param t_max: specify time axis as [0, t_max]
:param ng_fredholm: number of grid points used to discretize the Fredholm equation
:param ng_fac: refinement factor for the fine grid (ng_fine = ng_fac*(ng-1)+1)
:param seed: if not ``None`` set seed to ``seed``
:param sig_min: eigenvalue threshold (see KLE method to generate stochastic processes)
:param verbose: 0: no output, 1: informational output, 2: (eventually some) debug info
:param k: polynomial degree used for spline interpolation
:param align_eig_vec: if True, rotate each eigenvector to a common phase convention
"""
self.verbose = verbose
self.ng_fac = ng_fac
# ng_fac == 1 means the fine grid equals the Fredholm grid: no KLE interpolation needed
if ng_fac == 1:
self.kle_interp = False
else:
self.kle_interp = True
# Simpson quadrature nodes t and weights w on [0, t_max]
t, w = method_kle.get_simpson_weights_times(t_max, ng_fredholm)
self._one_over_sqrt_2 = 1/np.sqrt(2)
self._r_tau = r_tau
self._s = t
self._num_gp = len(self._s)
self._w = w
r = self._calc_corr_matrix(self._s, self._r_tau)
# solve discrete Fredholm equation
# eig_val = lambda
# eig_vec = u(t)
self._eig_val, self._eig_vec = method_kle.solve_hom_fredholm(r, w, sig_min**2, verbose=self.verbose-1)
if align_eig_vec:
for i in range(self._eig_vec.shape[1]):
s = np.sum(self._eig_vec[:,i])
# NOTE(review): arctan2(real, imag) argument order is unusual -- the
# standard phase angle would be arctan2(imag, real); confirm intent.
phase = np.exp(1j*np.arctan2(np.real(s), np.imag(s)))
self._eig_vec[:,i]/= phase
self.__calc_missing()
# fine grid resolution used for the generated (interpolated) process
ng_fine = self.ng_fac * (self._num_gp - 1) + 1
self.alpha_k = self._calc_corr_min_t_plus_t(s = np.linspace(0, t_max, ng_fine),
bcf = self._r_tau)
super().__init__(t_max=t_max, num_grid_points=ng_fine, seed=seed, verbose=verbose, k=k)
self.num_y = self._num_ev
self.verbose = verbose
@staticmethod
def _calc_corr_min_t_plus_t(s, bcf):
# one-sided bcf values; the negative side follows by conjugate symmetry
bcf_n_plus = bcf(s-s[0])
# [bcf(-3) , bcf(-2) , bcf(-1) , bcf(0), bcf(1), bcf(2), bcf(3)]
# == [bcf(3)^\ast, bcf(2)^\ast, bcf(1)^\ast, bcf(0), bcf(1), bcf(2), bcf(3)]
return np.hstack((np.conj(bcf_n_plus[-1:0:-1]), bcf_n_plus))
@staticmethod
def _calc_corr_matrix(s, bcf):
"""calculates the matrix alpha_ij = bcf(t_i-s_j)
calls bcf only for s-s_0 and reconstructs the rest
"""
n_ = len(s)
bcf_n = StocProc_KLE._calc_corr_min_t_plus_t(s, bcf)
# we want
# r = bcf(0) bcf(-1), bcf(-2)
# bcf(1) bcf( 0), bcf(-1)
# bcf(2) bcf( 1), bcf( 0)
r = np.empty(shape=(n_,n_), dtype = np.complex128)
for i in range(n_):
idx = n_-1-i
r[:,i] = bcf_n[idx:idx+n_]
return r
def __calc_missing(self):
# derived quantities of the eigen decomposition; _A maps the random
# coefficients Y to the discrete process on the Fredholm grid
self._num_gp = len(self._s)
self._sqrt_eig_val = np.sqrt(self._eig_val)
self._num_ev = len(self._eig_val)
self._A = self._w.reshape(self._num_gp,1) * self._eig_vec / self._sqrt_eig_val.reshape(1, self._num_ev)
def _x_t_mem_save(self, kahanSum=False):
"""calculate the stochastic process (SP) for a certain class fine grids
when the SP is setup with n grid points, which means we know the eigenfunctions
for the n discrete times t_i = i/(n-1)*t_max, i = 0,1,...n-1
with delta_t = t_max/(n-1)
it is efficient to calculate the interpolated process
for finer grids with delta_t_fine = delta_t/delta_t_fac
because we only need to know the bcf on the finer grid
"""
# delegated to the compiled helper (project C extension)
return stocproc_c.z_t(delta_t_fac = self.ng_fac,
N1 = self._num_gp,
alpha_k = self.alpha_k,
a_tmp = self._a_tmp,
kahanSum = kahanSum)
def _x_for_initial_time_grid(self):
r"""Get process on initial time grid
Returns the value of the Stochastic Process for
the times given to the constructor in order to discretize the Fredholm
equation. This is equivalent to calling :py:func:`stochastic_process_kle` with the
same weights :math:`w_i` and time grid points :math:`s_i`.
"""
tmp = self._Y * self._sqrt_eig_val.reshape(self._num_ev,1)
if self.verbose > 1:
print("calc process via matrix prod ...")
res = np.tensordot(tmp, self._eig_vec, axes=([0],[1])).flatten()
if self.verbose > 1:
print("done!")
return res
def _new_process(self, yi=None, seed=None):
r"""setup new process
Generates a new set of independent normal random variables :math:`Y_i`
which correspondent to the expansion coefficients of the
Karhunen-Loève expansion for the stochastic process
.. math:: X(t) = \sum_i \sqrt{\lambda_i} Y_i u_i(t)
:param yi: optional externally supplied expansion coefficients
:param seed: a seed my be given which is passed to the random number generator
"""
# NOTE(review): prefer "seed is not None" (PEP 8); left unchanged in this
# comment-only pass.
if seed != None:
np.random.seed(seed)
if yi is None:
if self.verbose > 1:
print("generate samples ...")
# NOTE(review): np.complex was removed in NumPy >= 1.24; the builtin
# complex is the drop-in replacement for the .view() dtype.
self._Y = np.random.normal(scale = self._one_over_sqrt_2, size=2*self._num_ev).view(np.complex).reshape(self._num_ev,1)
if self.verbose > 1:
print("done!")
else:
self._Y = yi.reshape(self._num_ev,1)
self._a_tmp = np.tensordot(self._Y[:,0], self._A, axes=([0],[1]))
def _calc_z(self, y):
r"""
uses the underlaying stocproc class to generate the process (see :py:class:`StocProc` for details)
"""
self._new_process(y)
if self.kle_interp:
#return self.stocproc.x_t_array(np.linspace(0, self.t_max, self.num_grid_points))
return self._x_t_mem_save(kahanSum=True)
else:
return self._x_for_initial_time_grid()
def get_num_y(self):
return self.num_y
# NOTE(review): leading indentation appears lost in this copy of the file;
# code lines below are kept byte-identical, comments only are added.
class StocProc_KLE_tol(StocProc_KLE):
r"""
same as StocProc_KLE except that ng_fredholm is determined from given tolerance

The constructor repeatedly sets up :py:class:`StocProc_KLE` with increasing
grid sizes until the reconstructed auto correlation function matches the
reference within ``tol`` (see :py:func:`_auto_grid_points`).
"""
def __init__(self, tol, **kwargs):
# target relative error for the reconstructed correlation function
self.tol = tol
self._auto_grid_points(**kwargs)
def _init_StocProc_KLE_and_get_error(self, ng, **kwargs):
# set up the parent with ng Fredholm nodes and measure the relative
# reconstruction error of the correlation function on the fine grid
super().__init__(ng_fredholm=ng, **kwargs)
ng_fine = self.ng_fac*(ng-1)+1
#t_fine = np.linspace(0, self.t_max, ng_fine)
# eigenfunctions interpolated onto the fine grid (project C extension)
u_i_all_t = stocproc_c.eig_func_all_interp(delta_t_fac = self.ng_fac,
time_axis = self._s,
alpha_k = self.alpha_k,
weights = self._w,
eigen_val = self._eig_val,
eigen_vec = self._eig_vec)
u_i_all_ast_s = np.conj(u_i_all_t) #(N_gp, N_ev)
num_ev = len(self._eig_val)
tmp = self._eig_val.reshape(1, num_ev) * u_i_all_t #(N_gp, N_ev)
# reconstructed bcf from the truncated eigen expansion
recs_bcf = np.tensordot(tmp, u_i_all_ast_s, axes=([1],[1]))
# reference bcf matrix assembled from alpha_k (Toeplitz layout)
refc_bcf = np.empty(shape=(ng_fine,ng_fine), dtype = np.complex128)
for i in range(ng_fine):
idx = ng_fine-1-i
refc_bcf[:,i] = self.alpha_k[idx:idx+ng_fine]
# maximum relative deviation over all pairs of times
err = np.max(np.abs(recs_bcf-refc_bcf)/np.abs(refc_bcf))
return err
def _auto_grid_points(self, **kwargs):
err = np.inf
c = 2
#exponential increase to get below error threshold
while err > self.tol:
c *= 2
ng = 2*c + 1
print("ng {}".format(ng), end='', flush=True)
err = self._init_StocProc_KLE_and_get_error(ng, **kwargs)
print(" -> err {:.3g}".format(err))
# bisection between the last failing and first succeeding grid size
# to find the smallest ng meeting the tolerance
c_low = c // 2
c_high = c
while (c_high - c_low) > 1:
c = (c_low + c_high) // 2
ng = 2*c + 1
print("ng {}".format(ng), end='', flush=True)
err = self._init_StocProc_KLE_and_get_error(ng, **kwargs)
print(" -> err {:.3g}".format(err))
if err > self.tol:
c_low = c
else:
c_high = c
class StocProc_FFT(_absStocProc):
    r"""
    Simulate Stochastic Process using FFT method

    The spectral density is sampled on an equidistant frequency grid; a new
    process is obtained by weighting complex Gaussian samples with the square
    root of the sampled spectral density and applying an FFT
    (see :py:func:`_calc_z`).
    """
    def __init__(self, spectral_density, t_max, num_grid_points, seed=None, verbose=0, k=3, omega_min=0):
        r"""
        :param spectral_density: callable evaluated on the omega grid
        :param t_max: specify time axis as [0, t_max]
        :param num_grid_points: number of equidistant times on that axis
        :param seed: if not ``None`` set seed to ``seed``
        :param verbose: 0: no output, 1: informational output, 2: (eventually some) debug info
        :param k: polynomial degree used for spline interpolation
        :param omega_min: lower bound of the sampled frequency axis
        """
        super().__init__(t_max=t_max,
                         num_grid_points=num_grid_points,
                         seed=seed,
                         verbose=verbose,
                         k=k)
        # DFT length: 2*N - 1 frequency samples
        self.n_dft = num_grid_points * 2 - 1
        dt = t_max / (num_grid_points - 1)
        self.delta_omega = 2 * np.pi / (dt * self.n_dft)
        self.omega_min = omega_min
        time_axis = np.arange(num_grid_points) * dt
        # phase factor compensating the shift of the frequency axis by omega_min
        self.omega_min_correction = np.exp(-1j * self.omega_min * time_axis)
        # omega axis
        omega = self.delta_omega * np.arange(self.n_dft)
        # reshape for multiplication with matrix xi
        self.sqrt_spectral_density_over_pi_times_sqrt_delta_omega = np.sqrt(spectral_density(omega + self.omega_min)) * np.sqrt(self.delta_omega / np.pi)
        if self._verbose > 0:
            print("stoc proc fft, spectral density sampling information:")
            print(" t_max :", (t_max))
            print(" ng :", (num_grid_points))
            print(" omega_min :", (self.omega_min))
            print(" omega_max :", (self.omega_min + self.delta_omega * self.n_dft))
            print(" delta_omega:", (self.delta_omega))

    def _calc_z(self, y):
        r"""
        Map the complex Gaussian samples ``y`` to the discrete process via FFT.

        :return: the stochastic process on the time grid, array of complex numbers
        """
        integrand = self.sqrt_spectral_density_over_pi_times_sqrt_delta_omega * y
        # compute integral using fft routine
        if self._verbose > 1:
            print("calc process via fft ...")
        z = np.fft.fft(integrand)[:self.num_grid_points] * self.omega_min_correction
        if self._verbose > 1:
            print("done")
        return z

    def get_num_y(self):
        r"""
        :return: number of complex random variables needed (equals the DFT length)
        """
        return self.n_dft
class StocProc_FFT_tol(_absStocProc):
    r"""
    Simulate Stochastic Process using FFT method

    In contrast to StocProc_FFT, the frequency integration interval and the
    number of FFT sample points are chosen automatically such that the given
    integration tolerance (intgr_tol) and interpolation tolerance (intpl_tol)
    are met (delegated to the helpers in method_fft).
    """
    def __init__(self, spectral_density, t_max, bcf_ref, intgr_tol=1e-3, intpl_tol=1e-3,
                 seed=None, verbose=0, k=3, negative_frequencies=False, method='simps'):
        # :param spectral_density: callable J(w), sampled on the frequency grid
        # :param bcf_ref: reference bath correlation function; its Fourier
        #                 relation to J (times pi) is used to estimate the error
        # :param method: quadrature weighting for the FFT, 'simps' or 'midp'
        if not negative_frequencies:
            log.debug("non neg freq only")
            # assume the spectral_density is 0 for w<0
            # and decays fast for large w
            b = method_fft.find_integral_boundary(integrand = spectral_density,
                                                  tol       = intgr_tol**2,
                                                  ref_val   = 1,
                                                  max_val   = 1e6,
                                                  x0        = 1)
            log.debug("upper int bound b {:.3e}".format(b))
            # refine b and determine N, frequency step dx and time step dt
            # such that both tolerances hold
            b, N, dx, dt = method_fft.calc_ab_N_dx_dt(integrand = spectral_density,
                                                      intgr_tol = intgr_tol,
                                                      intpl_tol = intpl_tol,
                                                      tmax      = t_max,
                                                      a         = 0,
                                                      b         = b,
                                                      ft_ref    = lambda tau:bcf_ref(tau)*np.pi,
                                                      N_max     = 2**20,
                                                      method    = method)
            log.debug("required tol result in N {}".format(N))
            a = 0
        else:
            # assume the spectral_density is non zero also for w<0
            # but decays fast for large |w|
            b = method_fft.find_integral_boundary(integrand = spectral_density,
                                                  tol       = intgr_tol**2,
                                                  ref_val   = 1,
                                                  max_val   = 1e6,
                                                  x0        = 1)
            a = method_fft.find_integral_boundary(integrand = spectral_density,
                                                  tol       = intgr_tol**2,
                                                  ref_val   = -1,
                                                  max_val   = 1e6,
                                                  x0        = -1)
            # calc_ab_N_dx_dt returns the (possibly enlarged) interval length;
            # rescale a and b to keep their original ratio while matching it
            b_minus_a, N, dx, dt = method_fft.calc_ab_N_dx_dt(integrand = spectral_density,
                                                              intgr_tol = intgr_tol,
                                                              intpl_tol = intpl_tol,
                                                              tmax      = t_max,
                                                              a         = a,
                                                              b         = b,
                                                              ft_ref    = lambda tau:bcf_ref(tau)*np.pi,
                                                              N_max     = 2**20,
                                                              method    = method)
            b = b*b_minus_a/(b-a)
            a = b-b_minus_a
        # snap t_max onto the time grid implied by dt
        num_grid_points = int(np.ceil(t_max/dt))+1
        t_max = (num_grid_points-1)*dt
        super().__init__(t_max           = t_max,
                         num_grid_points = num_grid_points,
                         seed            = seed,
                         verbose         = verbose,
                         k               = k)

        self.n_dft = N
        omega = dx*np.arange(self.n_dft)
        if method == 'simps':
            # Simpson-weighted integrand values, then sqrt for Gaussian weighting
            self.yl = spectral_density(omega + a) * dx / np.pi
            self.yl = method_fft.get_fourier_integral_simps_weighted_values(self.yl)
            self.yl = np.sqrt(self.yl)
            self.omega_min_correction = np.exp(-1j*a*self.t)  #self.t is from the parent class
        elif method == 'midp':
            # midpoint rule: sample at the center of each frequency bin
            self.yl = spectral_density(omega + a + dx/2) * dx / np.pi
            self.yl = np.sqrt(self.yl)
            self.omega_min_correction = np.exp(-1j*(a+dx/2)*self.t)  #self.t is from the parent class
        else:
            raise ValueError("unknown method '{}'".format(method))

    def _calc_z(self, y):
        """Map the i.i.d. complex Gaussian samples ``y`` to one process realization."""
        z = np.fft.fft(self.yl * y)[0:self.num_grid_points] * self.omega_min_correction
        return z

    def get_num_y(self):
        """Number of random variables consumed per realization."""
        return self.n_dft

View file

@ -1,704 +0,0 @@
# -*- coding: utf8 -*-
"""
advanced class to do all sort of things with KLE generated stochastic processes
"""
from __future__ import print_function, division
import functools
import numpy as np
import pickle
import sys
from .stocproc import solve_hom_fredholm
from .stocproc import get_mid_point_weights
from .stocproc import get_trapezoidal_weights_times
from .stocproc import get_simpson_weights_times
import gquad
from . import stocproc_c
from scipy.integrate import quad
from scipy.interpolate import InterpolatedUnivariateSpline
from itertools import product
from math import fsum
class ComplexInterpolatedUnivariateSpline(object):
    """Spline interpolation for complex-valued samples.

    Wraps two real ``InterpolatedUnivariateSpline`` instances, one for the
    real and one for the imaginary part of ``y``.

    :param x: 1D array of strictly increasing sample positions
    :param y: complex sample values at ``x``
    :param k: spline degree (requires ``len(x) > k``)
    """
    def __init__(self, x, y, k=2):
        # bug fix: forward k to the underlying splines; previously the
        # parameter was accepted but silently ignored (scipy default used)
        self.re_spline = InterpolatedUnivariateSpline(x, np.real(y), k=k)
        self.im_spline = InterpolatedUnivariateSpline(x, np.imag(y), k=k)

    def __call__(self, t):
        # recombine the independently interpolated parts
        return self.re_spline(t) + 1j*self.im_spline(t)
def complex_quad(func, a, b, **kw_args):
    """Quadrature of a complex-valued integrand over [a, b].

    Integrates the real and imaginary parts separately with
    ``scipy.integrate.quad`` and recombines the results.
    Extra keyword arguments are passed through to ``quad``.
    """
    real_part = quad(lambda t: np.real(func(t)), a, b, **kw_args)[0]
    imag_part = quad(lambda t: np.imag(func(t)), a, b, **kw_args)[0]
    return real_part + 1j*imag_part
class StocProc(object):
    r"""Simulate Stochastic Process using Karhunen-Loève expansion

    The :py:class:`StocProc` class provides methods to simulate stochastic processes
    :math:`X(t)` in a given time interval :math:`[0,t_\mathrm{max}]`
    with given autocorrelation function
    :math:`R(\tau) = R(t-s) = \langle X(t)X^\ast(s)\rangle`. The method is similar to
    the one described and implemented in :py:func:`stochastic_process_kle`.

    The :py:class:`StocProc` class extends the functionality of the
    :py:func:`stochastic_process_kle` routine by providing an interpolation
    method based on the numeric solution of the Fredholm equation.
    Since the time discrete solutions :math:`u_i(s_j)` of the Fredholm equation
    are best interpolates using

    .. math:: u_i(t) = \frac{1}{\lambda_i}\sum_j w_j R(t-s_j) u_i(s_j)

    with :math:`s_j` and :math:`w_j` being the time grid points and integration weights
    for the numeric integrations as well as :math:`\lambda_i` and :math:`u_i` being
    the eigenvalues the time discrete eigenvectors of the discrete Fredholm equation
    (see Ref. [1]).

    From that is follows that a good interpolation formula for the stochastic process
    is given by

    .. math:: X(t) = \sum_i \sqrt{\lambda_i} Y_i u_i(t) = \sum_{i,j} \frac{Y_i}{\sqrt{\lambda_i}}w_j R(t-s_j) u_i(s_j)

    where the :math:`Y_i` are independent normal distributed random variables
    with variance one.

    For extracting values of the Stochastic Process you may use:
        :py:func:`x`: returns the value of the Stochastic Process for a
        single time :math:`t`

        :py:func:`x_t_array`: returns the value of the Stochastic Process for
        all values of the `numpy.ndarray` a single time :math:`t_\mathrm{array}`.

        :py:func:`x_for_initial_time_grid`: returns the value of the Stochastic Process for
        the times given to the constructor in order to discretize the Fredholm
        equation. This is equivalent to calling :py:func:`stochastic_process_kle` with the
        same weights :math:`w_i` and time grid points :math:`s_i`.

    To generate a new sample use :py:func:`new_process`.

    :param r_tau: function object of the one parameter correlation function
        :math:`R(\tau) = R (t-s) = \langle X(t) X^\ast(s) \rangle`
    :param t: list of grid points for the time axis
    :param w: appropriate weights to integrate along the time axis using the
        grid points given by :py:obj:`t`
    :param seed: seed for the random number generator used
    :param sig_min: minimal standard deviation :math:`\sigma_i` the random variable
        :math:`X_i = \sigma_i Y_i`, viewed as coefficient for the base function
        :math:`u_i(t)`, must have to be considered as significant for the
        Karhunen-Loève expansion (note: :math:`\sigma_i` results from the
        square root of the eigenvalue :math:`\lambda_i`)

    For further reading see :py:func:`stochastic_process_kle`.

    References:
        [1] Press, W.H., Teukolsky, S.A., Vetterling, W.T., Flannery, B.P.,
        2007. Numerical Recipes 3rd Edition: The Art of Scientific Computing,
        Auflage: 3. ed. Cambridge University Press, Cambridge, UK ; New York. (pp. 989)
    """
    # attributes persisted by __dump/__load and by the pickle protocol
    # (__getstate__/__setstate__); everything else is derived in __calc_missing
    _dump_members = ['_r_tau',
                     '_s',
                     '_w',
                     '_eig_val',
                     '_eig_vec']

    def __init__(self,
                 r_tau         = None,
                 t             = None,
                 w             = None,
                 seed          = None,
                 sig_min       = 1e-4,
                 fname         = None,
                 cache_size    = 1024,
                 verbose       = 1,
                 align_eig_vec = False):
        self.verbose = verbose
        self._one_over_sqrt_2 = 1/np.sqrt(2)
        if fname is None:
            # fresh setup: solve the discrete Fredholm equation for the given
            # correlation function, time grid and integration weights
            assert r_tau is not None
            self._r_tau = r_tau

            assert t is not None
            self._s = t
            self._num_gp = len(self._s)

            assert w is not None
            self._w = w

            r = StocProc._calc_corr_matrix(self._s, self._r_tau)
            # solve discrete Fredholm equation
            # eig_val = lambda
            # eig_vec = u(t)
            self._eig_val, self._eig_vec = solve_hom_fredholm(r, w, sig_min**2, verbose=self.verbose)
            if align_eig_vec:
                # fix the arbitrary global phase of each eigenvector so that
                # results are reproducible across eigensolver runs
                # NOTE(review): arctan2(real, imag) swaps the usual (y, x)
                # argument order for the angle -- confirm intended phase convention
                for i in range(self._eig_vec.shape[1]):
                    s = np.sum(self._eig_vec[:,i])
                    phase = np.exp(1j*np.arctan2(np.real(s), np.imag(s)))
                    self._eig_vec[:,i] /= phase
        else:
            # restore a previously dumped eigensystem from file
            self.__load(fname)

        self.__calc_missing()

        # memoize single-time evaluations; self.x is the cached wrapper of _x
        self.my_cache_decorator = functools.lru_cache(maxsize=cache_size, typed=False)
        self.x = self.my_cache_decorator(self._x)
        self.new_process(seed = seed)

    @staticmethod
    def _calc_corr_matrix(s, bcf):
        """calculates the matrix alpha_ij = bcf(t_i-s_j)

        calls bcf only for s-s_0 and reconstructs the rest via the hermitian
        symmetry bcf(-tau) = bcf(tau)^*
        """
        n_ = len(s)
        bcf_n_plus = bcf(s-s[0])
        #    [bcf(-3)    , bcf(-2)    , bcf(-1)    , bcf(0), bcf(1), bcf(2), bcf(3)]
        # == [bcf(3)^\ast, bcf(2)^\ast, bcf(1)^\ast, bcf(0), bcf(1), bcf(2), bcf(3)]
        bcf_n = np.hstack((np.conj(bcf_n_plus[-1:0:-1]), bcf_n_plus))
        # we want
        # r = bcf(0) bcf(-1), bcf(-2)
        #     bcf(1) bcf( 0), bcf(-1)
        #     bcf(2) bcf( 1), bcf( 0)
        r = np.empty(shape=(n_,n_), dtype = np.complex128)
        for i in range(n_):
            idx = n_-1-i
            r[:,i] = bcf_n[idx:idx+n_]
        return r

    @classmethod
    def new_instance_by_name(cls, name, r_tau, t_max, ng, seed, sig_min, verbose=1, align_eig_vec=False):
        """create a new StocProc instance where the weights are given by name"""
        known_names = ['trapezoidal', 'mid_point', 'simpson', 'gauss_legendre']
        if name == 'trapezoidal':
            ob = cls.new_instance_with_trapezoidal_weights(r_tau, t_max, ng, seed, sig_min, verbose, align_eig_vec)
        elif name == 'mid_point':
            ob = cls.new_instance_with_mid_point_weights(r_tau, t_max, ng, seed, sig_min, verbose, align_eig_vec)
        elif name == 'simpson':
            ob = cls.new_instance_with_simpson_weights(r_tau, t_max, ng, seed, sig_min, verbose, align_eig_vec)
        elif name == 'gauss_legendre':
            ob = cls.new_instance_with_gauss_legendre_weights(r_tau, t_max, ng, seed, sig_min, verbose, align_eig_vec)
        else:
            raise RuntimeError("unknown name '{}' to create StocProc instance\nknown names are {}".format(name, known_names))
        ob.name = name
        return ob

    @classmethod
    def new_instance_with_trapezoidal_weights(cls, r_tau, t_max, ng, seed=None, sig_min=0, verbose=1, align_eig_vec=False):
        """use trapezoidal weights (see :py:func:`get_trapezoidal_weights_times`)"""
        t, w = get_trapezoidal_weights_times(t_max, ng)
        return cls(r_tau, t, w, seed, sig_min, verbose=verbose, align_eig_vec=align_eig_vec)

    @classmethod
    def new_instance_with_simpson_weights(cls, r_tau, t_max, ng, seed=None, sig_min=0, verbose=1, align_eig_vec=False):
        """use simpson weights (see :py:func:`get_simpson_weights_times`)"""
        t, w = get_simpson_weights_times(t_max, ng)
        return cls(r_tau, t, w, seed, sig_min, verbose=verbose, align_eig_vec=align_eig_vec)

    @classmethod
    def new_instance_with_mid_point_weights(cls, r_tau, t_max, ng, seed=None, sig_min=0, verbose=1, align_eig_vec=False):
        """use mid-point weights (see :py:func:`get_mid_point_weights`)"""
        t, w = get_mid_point_weights(t_max, ng)
        return cls(r_tau, t, w, seed, sig_min, verbose=verbose, align_eig_vec=align_eig_vec)

    @classmethod
    def new_instance_with_gauss_legendre_weights(cls, r_tau, t_max, ng, seed=None, sig_min=0, verbose=1, align_eig_vec=False):
        """use gauss legendre weights (see :py:func:`gauss_nodes_weights_legendre`)"""
        t, w = gquad.gauss_nodes_weights_legendre(n=ng, low=0, high=t_max)
        return cls(r_tau, t, w, seed, sig_min, verbose=verbose, align_eig_vec=align_eig_vec)

    def __load(self, fname):
        # read the persisted members in the fixed _dump_members order
        with open(fname, 'rb') as f:
            for m in self._dump_members:
                setattr(self, m, pickle.load(f))

    def __calc_missing(self):
        # derive the quantities not persisted by __dump from the eigensystem;
        # _A[j,i] = w_j u_i(s_j) / sqrt(lambda_i) (used by the interpolation formula)
        self._num_gp = len(self._s)
        self._sqrt_eig_val = np.sqrt(self._eig_val)
        self._num_ev = len(self._eig_val)
        self._A = self._w.reshape(self._num_gp,1) * self._eig_vec / self._sqrt_eig_val.reshape(1, self._num_ev)

    def __dump(self, fname):
        # write the persisted members in the fixed _dump_members order
        with open(fname, 'wb') as f:
            for m in self._dump_members:
                pickle.dump(getattr(self, m), f, pickle.HIGHEST_PROTOCOL)

    def __getstate__(self):
        # pickle protocol: persist only the members listed in _dump_members
        return [getattr(self, atr) for atr in self._dump_members]

    def __setstate__(self, state):
        for i, atr_value in enumerate(state):
            setattr(self, self._dump_members[i], atr_value)
        self.__calc_missing()

    def save_to_file(self, fname):
        self.__dump(fname)

    def get_name(self):
        # 'name' is only set when constructed via new_instance_by_name
        if hasattr(self, 'name'):
            return self.name
        else:
            return 'unknown'

    def new_process(self, yi=None, seed=None):
        r"""setup new process

        Generates a new set of independent normal random variables :math:`Y_i`
        which correspondent to the expansion coefficients of the
        Karhunen-Loève expansion for the stochastic process

        .. math:: X(t) = \sum_i \sqrt{\lambda_i} Y_i u_i(t)

        :param yi: optional, use these expansion coefficients instead of
            drawing random ones (reshaped to (num_ev, 1))
        :param seed: a seed my be given which is passed to the random number generator
        """
        if seed != None:
            np.random.seed(seed)
        # invalidate cached single-time evaluations of the old realization
        self.clear_cache()
        if yi is None:
            if self.verbose > 1:
                print("generate samples ...")
            # 2*num_ev normals with std 1/sqrt(2) viewed as num_ev complex
            # numbers -> complex variance one
            self._Y = np.random.normal(scale = self._one_over_sqrt_2, size=2*self._num_ev).view(np.complex).reshape(self._num_ev,1)
            if self.verbose > 1:
                print("done!")
        else:
            self._Y = yi.reshape(self._num_ev,1)

        # precompute sum_i Y_i A_{j,i} so that evaluation at any time t only
        # needs a dot product with R(t - s_j)
        self._a_tmp = np.tensordot(self._Y[:,0], self._A, axes=([0],[1]))

    def x_for_initial_time_grid(self):
        r"""Get process on initial time grid

        Returns the value of the Stochastic Process for
        the times given to the constructor in order to discretize the Fredholm
        equation. This is equivalent to calling :py:func:`stochastic_process_kle` with the
        same weights :math:`w_i` and time grid points :math:`s_i`.
        """
        tmp = self._Y * self._sqrt_eig_val.reshape(self._num_ev,1)
        if self.verbose > 1:
            print("calc process via matrix prod ...")
        res = np.tensordot(tmp, self._eig_vec, axes=([0],[1])).flatten()
        if self.verbose > 1:
            print("done!")
        return res

    def time_grid(self):
        return self._s

    def __call__(self, t):
        # scalar times go through the lru-cached path, arrays are vectorized
        if isinstance(t, np.ndarray):
            return self.x_t_array(t)
        else:
            return self.x(t)

    def _x(self, t):
        """calculates the stochastic process at time t"""
        # interpolation formula: X(t) = sum_j R(t - s_j) a_tmp_j
        R = self._r_tau(t-self._s)
        res = np.tensordot(R, self._a_tmp, axes=([0],[0]))
        return res

    def get_cache_info(self):
        return self.x.cache_info()

    def clear_cache(self):
        self.x.cache_clear()

    def x_t_array(self, t_array):
        """calculates the stochastic process at several times [t_i]"""
        R = self._r_tau(t_array.reshape(1,-1,)-self._s.reshape(-1, 1))  # because t_array can be anything
                                                                        # it can not be optimized with _calc_corr_matrix
        res = np.tensordot(R, self._a_tmp, axes=([0],[0]))
        return res

    def x_t_mem_save(self, delta_t_fac, kahanSum=False):
        """calculate the stochastic process (SP) for a certain class fine grids

        when the SP is setup with n grid points, which means we know the eigenfunctions
        for the n discrete times t_i = i/(n-1)*t_max, i = 0,1,...n-1
        with delta_t = t_max/(n-1)
        it is efficient to calculate the interpolated process
        for finer grids with delta_t_fine = delta_t/delta_t_fac
        because we only need to know the bcf on the finer grid
        """
        a = delta_t_fac
        N1 = len(self._s)
        N2 = a * (N1 - 1) + 1
        T = self._s[-1]
        # bcf sampled on the fine symmetric grid [-T, T]
        alpha_k = self._r_tau(np.linspace(-T, T, 2*N2 - 1))
        return stocproc_c.z_t(delta_t_fac = delta_t_fac,
                              N1          = N1,
                              alpha_k     = alpha_k,
                              a_tmp       = self._a_tmp,
                              kahanSum    = kahanSum)

    def x_t_fsum(self, t):
        """slow fsum variant for testing / development reasons
        """
        alpha_k = self._r_tau(t - self._s)
        terms = np.asarray([self._Y[a] * alpha_k[i] * self._A[i, a] for (a,i) in product(range(self._num_ev), range(self._num_gp))])
        re = fsum(np.real(terms))
        im = fsum(np.imag(terms))
        return re + 1j*im

    def u_i_mem_save(self, delta_t_fac, i):
        """efficient evaluation of the interpolated eigen function on special subgrids"""
        a = delta_t_fac
        N1 = len(self._s)
        N2 = a * (N1 - 1) + 1
        T = self._s[-1]
        alpha_k = self._r_tau(np.linspace(-T, T, 2*N2 - 1))
        return stocproc_c.eig_func_interp(delta_t_fac,
                                          self._s,
                                          alpha_k,
                                          self._w,
                                          self._eig_val[i],
                                          self._eig_vec[:, i])

    def u_i(self, t_array, i):
        r"""get eigenfunction of index i

        Returns the i-th eigenfunction corresponding to the i-th eigenvalue
        of the discrete Fredholm equation using the interpolation scheme:

        .. math:: u_i(t) = \frac{1}{\lambda_i}\sum_j w_j R(t-s_j) u_i(s_j)

        :param t_array: 1D time array for which the eigenfunction :math:`u_i`
            will be evaluated.
        :param i: index of the eigenfunction
        :return: 1D array of length ``len(t_array)``

        scales like len(t_array)*num_gp
        """
        t_array = t_array.reshape(1,len(t_array))               # (1   , N_t)
        tmp = self._r_tau(t_array-self._s.reshape(self._num_gp,1))
        # (N_gp, N_t)
        # A                                                     # (N_gp, N_ev)
        # A_j,i = w_j / sqrt(lambda_i) u_i(s_j)
        return 1/self._sqrt_eig_val[i]*np.tensordot(tmp, self._A[:,i], axes=([0],[0]))

    def u_i_all(self, t_array):
        r"""get all eigenfunctions

        Returns all eigenfunctions of the discrete Fredholm equation using
        the interpolation scheme:

        .. math:: u_i(t) = \frac{1}{\lambda_i}\sum_j w_j R(t-s_j) u_i(s_j)

        :param t_array: 1D time array for which the eigenfunction :math:`u_i`
            will be evaluated.
        :return: 2D array of shape ``(len(t_array), number_of_eigenvalues=self._num_ev)``
            (note, the number of eigenvalues may be smaller than the number
            of grid points because of the selections mechanism implemented
            by the value of ``sig_min``)

        scales like len(t_array)*num_gp*num_ev
        """
        t_array = t_array.reshape(1,len(t_array))               # (1   , N_t)
        tmp = self._r_tau(t_array-self._s.reshape(self._num_gp,1))
        # (N_gp, N_t)
        # A                                                     # (N_gp, N_ev)
        # A_j,i = w_j / sqrt(lambda_i) u_i(s_j)
        return np.tensordot(tmp, 1/self._sqrt_eig_val.reshape(1,self._num_ev) * self._A, axes=([0],[0]))

    def u_i_all_mem_save(self, delta_t_fac):
        """efficient evaluation of the interpolated eigen function on special subgrids"""
        a = delta_t_fac
        N1 = len(self._s)
        N2 = a * (N1 - 1) + 1
        T = self._s[-1]
        alpha_k = self._r_tau(np.linspace(-T, T, 2*N2 - 1))
        # the summation over the fine grid is delegated to the cython helper
        return stocproc_c.eig_func_all_interp(delta_t_fac = delta_t_fac,
                                              time_axis   = self._s,
                                              alpha_k     = alpha_k,
                                              weights     = self._w,
                                              eigen_val   = self._eig_val,
                                              eigen_vec   = self._eig_vec)

    def t_mem_save(self, delta_t_fac):
        # fine time grid matching the *_mem_save evaluation routines
        T = self._s[-1]
        N = len(self._s)
        return np.linspace(0, T, delta_t_fac*(N-1) + 1)

    def eigen_vector_i(self, i):
        r"""Returns the i-th eigenvector (solution of the discrete Fredhom equation)"""
        return self._eig_vec[:,i]

    def eigen_vector_i_all(self):
        r"""Returns all eigenvectors (solutions of the discrete Fredhom equation)

        Note: The maximum number of eigenvalues / eigenfunctions is given by
        the number of time grid points passed to the constructor. But due to the
        threshold ``sig_min`` (see :py:class:`StocProc`) only those
        eigenvalues and corresponding eigenfunctions which satisfy
        :math:`\mathtt{sig_{toll}} \geq \sqrt{\lambda_i}` are kept.
        """
        return self._eig_vec

    def lambda_i(self, i):
        r"""Returns the i-th eigenvalue."""
        return self._eig_val[i]

    def lambda_i_all(self):
        r"""Returns all eigenvalues."""
        return self._eig_val

    def num_ev(self):
        r"""Returns the number of eigenvalues / eigenfunctions used

        Note: The maximum number of eigenvalues / eigenfunctions is given by
        the number of time grid points passed to the constructor. But due to the
        threshold ``sig_min`` (see :py:class:`StocProc`) only those
        eigenvalues and corresponding eigenfunctions which satisfy
        :math:`\mathtt{sig_{toll}} \geq \sqrt{\lambda_i}` are kept.
        """
        return self._num_ev

    def recons_corr(self, t_array):
        r"""computes the interpolated correlation functions

        For the Karhunen-Loève expansion of a stochastic process the
        correlation function can be expressed as follows:

        .. math:: R(t,s) = \langle X(t)X^\ast(s)\rangle = \sum_{n,m} \langle X_n X^\ast_m \rangle u_n(t) u^\ast_m(s) = \sum_n \lambda_n u_n(t) u^\ast_n(s)

        With that one can do a consistency check whether the finite set of basis functions
        for the expansion (the solutions of the discrete Fredholm equation) is good
        enough to reproduce the given correlation function.
        """
        u_i_all_t = self.u_i_all(t_array)                       #(N_gp, N_ev)
        u_i_all_ast_s = np.conj(u_i_all_t)                      #(N_gp, N_ev)
        lambda_i_all = self.lambda_i_all()                      #(N_ev)
        tmp = lambda_i_all.reshape(1, self._num_ev) * u_i_all_t #(N_gp, N_ev)
        return np.tensordot(tmp, u_i_all_ast_s, axes=([1],[1]))

    def recons_corr_single_s(self, t_array, s):
        # intentionally disabled by the author -- do not use until the
        # underlying issue is resolved
        assert False, "something is wrong here"
        u_i_all_t = self.u_i_all(t_array)                       #(N_gp, N_ev)
        u_i_all_ast_s = np.conj(self.u_i_all(np.asarray([s])))  #(1, N_ev)
        lambda_i_all = self.lambda_i_all()                      #(N_ev)
        tmp = lambda_i_all.reshape(1, self._num_ev) * u_i_all_t #(N_gp, N_ev)
        return np.tensordot(tmp, u_i_all_ast_s, axes=([1],[1]))[:,0]

    def recons_corr_memsave(self, delta_t_fac):
        # same consistency check as recons_corr but on the special fine grid
        u_i_all_t = self.u_i_all_mem_save(delta_t_fac)          #(N_gp, N_ev)
        u_i_all_ast_s = np.conj(u_i_all_t)                      #(N_gp, N_ev)
        lambda_i_all = self.lambda_i_all()                      #(N_ev)
        tmp = lambda_i_all.reshape(1, self._num_ev) * u_i_all_t #(N_gp, N_ev)
        return np.tensordot(tmp, u_i_all_ast_s, axes=([1],[1]))

    def get_num_ef(self, rel_threshold):
        # delegate to the module level helper using sigma_i = sqrt(lambda_i)
        G = self._sqrt_eig_val
        return get_num_ef(G, rel_threshold)

    def get_largest_indices(self, rel_threshold):
        # delegate to the module level helper using sigma_i = sqrt(lambda_i)
        G = self._sqrt_eig_val
        return get_largest_indices(G, rel_threshold)

    def check_integral_eq(self, index, delta_t_fac = 4, num_t = 50):
        # compare lhs and rhs of the Fredholm equation for one eigenpair
        t = self.t_mem_save(delta_t_fac)
        u_t_discrete = self.u_i_mem_save(delta_t_fac, index)
        tmax = self._s[-1]
        G = self._sqrt_eig_val[index]
        bcf = self._r_tau
        data, norm = check_integral_eq(G, u_t_discrete, t, tmax, bcf, num_t)
        return u_t_discrete, data, norm
def get_num_ef(G, rel_threshold):
    """Count the leading entries of G that are significant.

    G must be sorted in decreasing order. An entry counts as significant
    when its value relative to the largest entry reaches ``rel_threshold``.
    """
    g_max = max(G)
    return int(sum(g / g_max >= rel_threshold for g in G))
def get_largest_indices(G, rel_threshold):
    """Indices of the leading significant entries of G.

    G must be sorted in decreasing order; an entry is significant when its
    value relative to the largest entry reaches ``rel_threshold``.

    :return: ``np.arange(0, idx)`` with idx the number of significant entries

    Cleanup: removed the leftover debugging print/assert loop (it announced
    itself as "debugging check ... still active") to match the sibling
    ``get_num_ef`` where the same check is already commented out.
    """
    idx = sum(G/max(G) >= rel_threshold)
    return np.arange(0, idx)
def check_integral_eq(G, U, t_U, tmax, bcf, num_t=50, limit=5000, c=None):
    """Numerically verify that (G, U) solve the homogeneous Fredholm equation.

    For num_t sample times tau in [0, tmax] the left hand side
    ``int_0^tmax bcf(tau - s) u(s) ds`` and the right hand side
    ``G**2 * u(tau)`` are evaluated, with u the cubic spline through (t_U, U).

    :param c: optional shared counter with ``get_lock()``/``value`` (e.g. a
        ``multiprocessing.Value``) incremented once per sample time
    :return: (data, norm) where data[:, 0] is the lhs, data[:, 1] the rhs,
        and norm the L2 norm squared of u on [0, tmax]
    """
    u_spline = ComplexInterpolatedUnivariateSpline(t_U, U, k=3)
    tau_grid = np.linspace(0, tmax, num_t)
    data = np.empty(shape=(num_t, 2), dtype=np.complex128)
    for i, tau in enumerate(tau_grid):
        data[i, 0] = complex_quad(lambda s: bcf(tau - s) * u_spline(s), 0, tmax, limit=limit)
        data[i, 1] = G**2 * u_spline(tau)
        if c is not None:
            with c.get_lock():
                c.value += 1
    norm = quad(lambda s: np.abs(u_spline(s))**2, 0, tmax, limit=limit)[0]
    return data, norm
def mean_error(r_t_s, r_t_s_exact):
    r"""Mean absolute deviation of the reconstructed correlation function,
    averaged over the first axis (the t axis), as a function of s:

    .. math:: \mathrm{err}(s) = \frac{1}{N_t} \sum_t |r_\mathrm{KLE}(t,s) - r_\mathrm{exact}(t,s)|

    :return: 1D array with the mean error for each s
    """
    deviation = np.abs(r_t_s - r_t_s_exact)
    return deviation.mean(axis=0)
def max_error(r_t_s, r_t_s_exact):
    """Maximum absolute deviation along the first axis, one value per s."""
    deviation = np.abs(r_t_s - r_t_s_exact)
    return deviation.max(axis=0)
def max_rel_error(r_t_s, r_t_s_exact):
    """Largest pointwise deviation relative to the exact value (scalar)."""
    abs_deviation = np.abs(r_t_s - r_t_s_exact)
    return np.max(abs_deviation / np.abs(r_t_s_exact))
def recons_corr_and_get_bcf(T, ng, w, eig_val, eig_vec, bcf):
    """
    doing things here again for efficiency reasons

    Reconstructs the correlation function from the given eigensystem on a
    grid refined by a factor of 2 (recs_bcf) and also assembles the exact
    reference correlation matrix from bcf on the same grid (refc_bcf),
    so the caller can compute an interpolation error without creating a
    second StocProc instance.

    :return: (recs_bcf, refc_bcf), both of shape (N2, N2)
    """
    delta_t_fac = 2
    N1 = ng
    N2 = delta_t_fac * (N1 - 1) + 1
    bcf_n_plus = bcf(np.linspace(0, T, N2))
    # bcf on [-T, T] via hermitian symmetry bcf(-tau) = bcf(tau)^*
    bcf_n = np.hstack((np.conj(bcf_n_plus[-1:0:-1]), bcf_n_plus))
    # interpolated eigenfunctions on the refined grid (cython helper)
    u_i_all_t = stocproc_c.eig_func_all_interp(delta_t_fac = delta_t_fac,
                                               time_axis   = np.linspace(0, T, N1),
                                               alpha_k     = bcf_n,
                                               weights     = w,
                                               eigen_val   = eig_val,
                                               eigen_vec   = eig_vec)
    u_i_all_ast_s = np.conj(u_i_all_t)          #(N_gp, N_ev)
    num_ev = len(eig_val)
    tmp = eig_val.reshape(1, num_ev) * u_i_all_t #(N_gp, N_ev)
    # KLE reconstruction: R(t,s) = sum_i lambda_i u_i(t) u_i(s)^*
    recs_bcf = np.tensordot(tmp, u_i_all_ast_s, axes=([1],[1]))
    # reference matrix R_exact[i,j] = bcf(t_i - s_j), read off from bcf_n
    refc_bcf = np.empty(shape=(N2,N2), dtype = np.complex128)
    for i in range(N2):
        idx = N2-1-i
        refc_bcf[:,i] = bcf_n[idx:idx+N2]
    return recs_bcf, refc_bcf
def auto_grid_points(r_tau, t_max, tol = 1e-3, err_method = max_rel_error, name = 'simpson', sig_min = 1e-6, verbose=1):
    """Determine the number of grid points ng needed so that the KLE
    reconstruction of the correlation function meets the tolerance ``tol``.

    Strategy: double the grid parameter c (with ng = 2*c + 1) until the error
    drops below tol, then bisect between the last failing and the first
    passing value of c.

    :param r_tau: one parameter correlation function R(tau)
    :param t_max: length of the time interval [0, t_max]
    :param tol: target for the reconstruction error
    :param err_method: error measure applied to (reconstructed, exact), e.g.
        :py:func:`max_rel_error` or :py:func:`max_error`
    :param name: integration weight scheme, see StocProc.new_instance_by_name
    :param sig_min: eigenvalue threshold passed to StocProc
    :return: the determined number of grid points ng
    """
    err = 1
    c = 2
    seed = None
    err_method_name = err_method.__name__
    if verbose > 0:
        print("start auto_grid_points, determine ng ...")
    #exponential increase to get below error threshold
    while err > tol:
        c *= 2
        ng = 2*c + 1
        ng_fine = ng*2-1
        if verbose == 1:
            print("ng:{} new proc ({}) ... ".format(ng, name), end='')
            sys.stdout.flush()
        if verbose > 1:
            print("#"*40)
            print("c", c, "ng", ng)
            print("new process with {} weights ...".format(name))
        stoc_proc = StocProc.new_instance_by_name(name, r_tau, t_max, ng, seed, sig_min, verbose-1)
        if verbose > 1:
            print("reconstruct correlation function ({} points)...".format(ng_fine))
        # compare the reconstruction against the exact bcf on the fine grid
        r_t_s, r_t_s_exact = recons_corr_and_get_bcf(T       = t_max,
                                                     ng      = ng,
                                                     w       = stoc_proc._w,
                                                     eig_val = stoc_proc._eig_val,
                                                     eig_vec = stoc_proc._eig_vec,
                                                     bcf     = r_tau)
        if verbose > 1:
            print("calculate error using {} ...".format(err_method_name))
        err = np.max(err_method(r_t_s, r_t_s_exact))
        if verbose > 0:
            print("err {:.3e}".format(err))

    # bisection: c_low fails (err > tol), c_high passes (err <= tol)
    c_low = c // 2
    c_high = c
    while (c_high - c_low) > 1:
        if verbose > 1:
            print("#"*40)
            print("c_low", c_low)
            print("c_high", c_high)
        c = (c_low + c_high) // 2
        ng = 2*c + 1
        ng_fine = ng * 2 - 1
        if verbose > 1:
            print("c", c)
            print("ng", ng)
            print("ng_fine", ng_fine)
        if verbose == 1:
            print("ng:{} new proc ({}) ... ".format(ng, name), end='')
            sys.stdout.flush()
        if verbose > 1:
            print("new process with {} weights ({} points)...".format(name, ng))
        stoc_proc = StocProc.new_instance_by_name(name, r_tau, t_max, ng, seed, sig_min, verbose-1)
        if verbose > 1:
            print("reconstruct correlation function ({} points)...".format(ng_fine))
        r_t_s, r_t_s_exact = recons_corr_and_get_bcf(T       = t_max,
                                                     ng      = ng,
                                                     w       = stoc_proc._w,
                                                     eig_val = stoc_proc._eig_val,
                                                     eig_vec = stoc_proc._eig_vec,
                                                     bcf     = r_tau)
        if verbose > 1:
            print("calculate error using {} ...".format(err_method_name))
        err = np.max(err_method(r_t_s, r_t_s_exact))
        if verbose > 0:
            print("err {:.3e}".format(err))
        if err > tol:
            if verbose > 1:
                print("    err > tol!")
                print("    c_low -> ", c)
            c_low = c
        else:
            if verbose > 1:
                print("    err <= tol!")
                print("    c_high -> ", c)
            c_high = c
    return ng

View file

@ -1,101 +0,0 @@
# -*- coding: utf8 -*-
from __future__ import print_function, division
"""
module to generate the weights and nodes for Guass quadrature
inspired by pyOrthpol (https://pypi.python.org/pypi/orthpol)
as well as the original fortran resource from
Gautschi, W. (1994). Algorithm 726:
ORTHPOLa package of routines for generating orthogonal polynomials
and Gauss-type quadrature rules.
ACM Transactions on Mathematical Software (TOMS), 20(1), 2162.
doi:10.1145/174603.174605
"""
import numpy as np
import numpy.polynomial as pln
from scipy.linalg import eig_banded
from scipy.special import gamma
def _recur_laguerre(n, al=0.):
r"""Calculate the recursion coefficients leading to the
Laguerre polynomials motivated by the Gauss quadrature
formula for integrals with exponential weights ~exp(-x)
see Theodore Seio Chihara,
An Introduction to Orthogonal Polynomials, 1978, p.217
"""
nrange = np.arange(n)
a = 2*nrange + al + 1
b = nrange*(nrange+al)
b[0] = gamma(al + 1.)
return (a, b)
def gauss_nodes_weights_laguerre(n, al=0.):
    r"""Nodes x_i and weights w_i of the n-point Gauss-Laguerre rule:

    .. math::
        \int_0^\infty dx \; f(x) x^\alpha \exp(-x) ~= \sum_{i=1}^n w_i f(x_i)
    """
    recur_a, recur_b = _recur_laguerre(n, al)
    return _gauss_nodes_weights(recur_a, recur_b)
def _recur_legendre(n):
nrange = np.arange(n, dtype = float)
a = np.zeros(n)
b = nrange**2 / ((2*nrange - 1)*(2*nrange + 1))
b[0] = 2
return (a, b)
def gauss_nodes_weights_legendre(n, low=-1, high=1):
    r"""Nodes x_i and weights w_i of the n-point Gauss-Legendre rule on
    [low, high]:

    .. math::
        \int_{low}^{high} dx \; f(x) ~= \sum_{i=1}^n w_i f(x_i)
    """
    recur_a, recur_b = _recur_legendre(n)
    nodes, weights = _gauss_nodes_weights(recur_a, recur_b)
    # affine map from the reference interval [-1, 1] to [low, high]
    scale = (high - low)/2
    return (nodes + 1)*scale + low, scale*weights
def _gauss_nodes_weights(a,b):
r"""Calculate the nodes and weights for given
recursion coefficients assuming a normalized
weights functions.
see Walter Gautschi, Algorithm 726: ORTHPOL;
a Package of Routines for Generating Orthogonal
Polynomials and Gauss-type Quadrature Rules, 1994
"""
assert len(a) == len(b)
a_band = np.vstack((np.sqrt(b),a))
w, v = eig_banded(a_band)
nodes = w # eigenvalues
weights = b[0] * v[0,:]**2 # first component of each eigenvector
# the prefactor b[0] from the original paper
# accounts for the weights of unnormalized weight functions
return nodes, weights
def get_poly(a, b):
    """Build the monic orthogonal polynomials defined by the three-term
    recursion p_i = (x - a[i]) p_{i-1} - b[i] p_{i-2}.

    :return: list of n+1 Polynomial objects [p_0, p_1, ..., p_n]
    """
    n = len(a)
    assert len(b) == n
    x = pln.Polynomial(coef=(0, 1))
    # seed the recursion: p_{-1} = 0, p_0 = 1
    polys = [0, pln.Polynomial(coef=(1,))]
    for a_i, b_i in zip(a, b):
        polys.append((x - a_i) * polys[-1] - b_i * polys[-2])
    return polys[1:]

View file

@ -1,117 +0,0 @@
# -*- coding: utf8 -*-
from __future__ import print_function, division
import numpy as np
from scipy.optimize import brentq
from . import class_stocproc
def get_param_single_lorentz(tmax, dw_max, eta, gamma, wc, x=1e-4, verbose=0):
    """Determine FFT sampling parameters (N, w_min, tmax) for a single
    Lorentzian spectral density centered at wc with width gamma.

    :param tmax: requested length of the time interval
    :param dw_max: maximal allowed frequency step
    :param eta: NOTE(review): accepted for interface compatibility but unused
        in this routine -- confirm whether it should enter the calculation
    :param x: relative height at which the Lorentzian is cut off
    :return: (N, w_min, tmax) -- if dw_max forces an adjustment, the returned
        tmax is the enlarged value consistent with N and dw_max
    """
    # half width where the Lorentzian has dropped to the fraction x of its peak
    d = gamma * np.sqrt(1/x - 1)
    w_min = wc - d
    w_max = wc + d
    if verbose > 0:
        print('w_min :{:.3}'.format(w_min))
        print('w_max :{:.3}'.format(w_max))

    C = (w_max - w_min)*tmax / 2 / np.pi
    N = int(np.ceil((2 + C)/2 + np.sqrt( (2+C)**2 / 4 - 1)))
    dw = (w_max - w_min)/N
    if verbose > 0:
        print('N: {}'.format(N))
        print('-> dw: {:.3}'.format(dw))

    if dw <= dw_max:
        if verbose > 0:
            print('dw <= dw_max: {:.3}'.format(dw_max))
        return N, w_min, tmax
    else:
        if verbose > 0:
            print('dw > dw_max: {:.3}'.format(dw_max))
            print('adjust tmax and N to fulfill dw <= dw_max')
        N = int(np.ceil((w_max - w_min) / dw_max)) - 1
        dt = 2*np.pi / (dw_max*N)
        tmax_ = dt*N
        if verbose > 0:
            print('N: {}'.format(N))
            print('-> tmax: {:.3}'.format(tmax_))
        assert tmax_ > tmax, "tmax_={} > tmax={} FAILED".format(tmax_, tmax)
        # bug fix: return the adjusted tmax_ -- the function previously
        # returned the unmodified tmax although N was recomputed (and the
        # verbose output announced the adjusted value)
        return N, w_min, tmax_
def get_param_ohmic(t_max, spec_dens, x=1e-12, verbose=0):
    """Determine (ng_t, sp_t_max) grid parameters for an ohmic spectral density.

    The cutoff frequency omega_max is found where |J(w)| has decayed to the
    fraction x of its maximum; the grid is then sized so that the frequency
    step pi/t_max resolves [0, omega_max] with an odd number of points.

    NOTE(review): spec_dens must provide maximum_val() and maximum_at() --
    confirm against the spectral density class used by the caller.
    """
    def threshold(w):
        # positive while |J(w)| exceeds the fraction x of the peak value
        return abs(spec_dens(w)) - spec_dens.maximum_val()*x

    # bracket the decay point: start right of the maximum and keep doubling
    # until the density has dropped below the threshold
    w_lo = spec_dens.maximum_at()
    w_hi = 2*w_lo
    while threshold(w_hi) > 0:
        w_hi *= 2
    omega_max = brentq(threshold, w_lo, w_hi)
    if verbose > 0:
        print("omega_max from threshold condition: J(w_max) = x = {:.3g} <-> w_max = {:.3g}".format(x, omega_max))

    dw = np.pi / t_max
    if verbose > 0:
        print("dw:{:.3}".format(dw))

    ng_omega = np.ceil(omega_max / dw)        # next larger integer
    ng_omega = np.ceil(ng_omega / 2) * 2 - 1  # next lager odd integer
    ng_t = (ng_omega + 1) / 2                 # so this becomes an integer
    delta_t = 2 * np.pi / (dw * ng_omega)
    sp_t_max = ng_t * delta_t
    if verbose > 0:
        print("result ng_t: {}".format(ng_t))
        print("result sp_t_max: {:.3g}".format(sp_t_max))
    return ng_t, sp_t_max
def show_ohmic_sp(ng_t, sp_t_max, spec_dens, seed, ax, t_max):
    """Plot realizations of the FFT stochastic process for one or several
    (ng_t, sp_t_max) parameter pairs onto the matplotlib axes ``ax``.

    ng_t and sp_t_max may each be a scalar or a sequence; a scalar is
    broadcast to the length of the other argument. Real parts are drawn as
    solid lines, imaginary parts dashed, grid-point values as dots.
    Only times below t_max are shown.
    """
    # detect whether the arguments are sequences (len defined) or scalars
    try:
        n = len(ng_t)
    except:
        n = None

    try:
        m = len(sp_t_max)
    except:
        m = None

    # broadcast scalar arguments so both lists have equal length
    if (n is None) and m is not None:
        ng_t = [ng_t] * m
    elif (n is not None) and m is None:
        sp_t_max = [sp_t_max] * n
    elif (n is not None) and (m is not None):
        if n != m:
            raise ValueError("len(ng_t) == len(sp_t_max) FAILED")
    else:
        ng_t = [ng_t]
        sp_t_max = [sp_t_max]

    for i in range(len(ng_t)):
        spfft = class_stocproc.StocProc_FFT(spectral_density = spec_dens,
                                            t_max            = sp_t_max[i],
                                            num_grid_points  = ng_t[i],
                                            seed             = seed)
        spfft.new_process()
        # coarse grid (the process grid points) and a 10x finer grid for the
        # interpolated curve; restrict both to [0, t_max)
        t = np.linspace(0, sp_t_max[i], ng_t[i])
        t_interp = np.linspace(0, sp_t_max[i], 10*ng_t[i])
        t = t[np.where(t < t_max)]
        t_interp = t_interp[np.where(t_interp < t_max)]
        eta = spfft(t)
        eta_interp = spfft(t_interp)
        p, = ax.plot(t_interp, np.real(eta_interp))
        ax.plot(t_interp, np.imag(eta_interp), color=p.get_color(), ls='--')
        ax.plot(t, np.real(eta), color=p.get_color(), ls='', marker = '.')
        ax.plot(t, np.imag(eta), color=p.get_color(), ls='', marker = '.')

View file

@ -1,19 +1,11 @@
from __future__ import division, print_function
from scipy.optimize import brentq
from scipy.interpolate import InterpolatedUnivariateSpline
from numpy.fft import rfft as np_rfft
import numpy as np
import logging
log = logging.getLogger(__name__)
class ComplexInterpolatedUnivariateSpline(object):
def __init__(self, x, y, k=3):
self.k = k
self.re_spline = InterpolatedUnivariateSpline(x, np.real(y), k=k)
self.im_spline = InterpolatedUnivariateSpline(x, np.imag(y), k=k)
def __call__(self, t):
return self.re_spline(t) + 1j*self.im_spline(t)
from .tools import ComplexInterpolatedUnivariateSpline
def find_integral_boundary(integrand, tol, ref_val, max_val, x0):
"""
@ -30,27 +22,57 @@ def find_integral_boundary(integrand, tol, ref_val, max_val, x0):
1/|x-ref_val| > max_val
this assured that the function does not search forever
"""
_max_num_iteration = 100
_i = 0
assert x0 != 0
if integrand(ref_val) <= tol:
raise ValueError("the integrand at ref_val needs to be greater that tol")
# find the left boundary called a
if integrand(x0+ref_val) > tol:
log.debug("ref_value for search: {} tol: {}".format(ref_val, tol))
I = integrand(x0+ref_val)
if I > tol:
log.debug("x={:.3e} I(x+ref_val) = {:.3e} > tol -> veer x away from ref_value".format(x0, I))
x = 2*x0
while integrand(x+ref_val) > tol:
if abs(x-ref_val) > max_val:
I = integrand(x + ref_val)
while I > tol:
log.debug("x={:.3e} I(x+ref_val) = {:.3e}".format(x, I))
if abs(x) > max_val:
raise RuntimeError("|x-ref_val| > max_val was reached")
x *= 2
x *= 2
I = integrand(x + ref_val)
_i += 1
if _i > _max_num_iteration:
raise RuntimeError("iteration limit reached")
log.debug("x={:.3e} I(x+ref_val) = {:.3e} < tol".format(x, I))
a = brentq(lambda x: integrand(x)-tol, x+ref_val, x0+ref_val)
elif integrand(x0+ref_val) < tol:
log.debug("found I(a={:.3e}) = {:.3e} = tol".format(a, integrand(a)))
elif I < tol:
log.debug("x={:.3e} I(x+ref_val) = {:.3e} < tol -> approach x towards ref_value".format(x0, I))
x = x0/2
while integrand(x+ref_val) < tol:
if (1/abs(x-ref_val)) > max_val:
I = integrand(x + ref_val)
while I < tol:
log.debug("x={:.3e} I(x+ref_val) = {:.3e}".format(x, I))
if (1/abs(x)) > max_val:
raise RuntimeError("1/|x-ref_val| > max_val was reached")
x /= 2
a = brentq(lambda x: integrand(x)-tol, x+ref_val, x0+ref_val)
I = integrand(x+ref_val)
_i += 1
if _i > _max_num_iteration:
raise RuntimeError("iteration limit reached")
log.debug("x={:.3e} I(x+ref_val) = {:.3e} > tol".format(x, I))
log.debug("search for root in interval [{:.3e},{:.3e}]".format(x0+ref_val, x+ref_val))
a = brentq(lambda x_: integrand(x_)-tol, x+ref_val, x0+ref_val)
log.debug("found I(a={:.3e}) = {:.3e} = tol".format(a, integrand(a)))
else:
a = x0
log.debug("I(ref_val) = tol -> no search necessary")
log.debug("return a={:.5g}".format(a))
return a
def find_integral_boundary_auto(integrand, tol, ref_val=0, max_val=1e6,
@ -61,21 +83,25 @@ def find_integral_boundary_auto(integrand, tol, ref_val=0, max_val=1e6,
ref_val_right = ref_val if ref_val_right is None else ref_val_right
max_val_left = max_val if max_val_left is None else max_val_left
max_val_right = max_val if max_val_right is None else max_val_right
log.debug("trigger left search")
a = find_integral_boundary(integrand, tol, ref_val=ref_val_left, max_val=max_val_left, x0=-1)
log.debug("trigger right search")
b = find_integral_boundary(integrand, tol, ref_val=ref_val_right, max_val=max_val_right, x0=+1)
return a,b
def fourier_integral_midpoint(integrand, a, b, N):
    """Approximate the Fourier integral int_a^b dx integrand(x) exp(-1j*tau*x)
    by a Riemann sum with N terms using uniform midpoint weights.

    :param integrand: callable evaluated on the midpoint grid
    :param a: left integration boundary
    :param b: right integration boundary
    :param N: number of grid points / summands
    :return: tuple (tau, values) where tau is the conjugate (frequency) axis
        with spacing 2*pi/(b-a) and values are the FFT-based integral
        approximations at those tau.
    """
    log.debug("integrate over [{:.3e},{:.3e}] using {} points".format(a,b,N))
    delta_x = (b-a)/N
    # FIX: the spacing of the conjugate axis was assigned twice (identical
    # duplicated statement, dead code) -- assign it once.
    delta_k = 2*np.pi/(b-a)
    # sample the integrand at the midpoints a + dx/2, a + 3dx/2, ...
    yl = integrand(np.linspace(a+delta_x/2, b+delta_x/2, N, endpoint=False))
    fft_vals = np_rfft(yl)
    tau = np.arange(len(fft_vals))*delta_k
    log.debug("yields d_x={:.3e}, d_k={:.3e} kmax={:.3e}".format(delta_x, delta_k, tau[-1]))
    # the phase factor shifts the FFT result from the grid starting at 0 to
    # the grid starting at the first midpoint a + dx/2
    return tau, delta_x*np.exp(-1j*tau*(a+delta_x/2))*fft_vals
def get_fourier_integral_simps_weighted_values(yl):
@ -110,7 +136,7 @@ def fourier_integral_simps(integrand, a, b, N):
return tau, delta_x*np.exp(-1j*tau*a)*fft_vals
def get_N_for_accurate_fourier_integral(integrand, a, b, t_max, tol, ft_ref, N_max = 2**18, method='simps'):
def get_N_for_accurate_fourier_integral(integrand, a, b, t_max, tol, ft_ref, N_max = 2**20, method='simps'):
"""
chooses N such that the approximated Fourier integral
meets the exact solution within a given tolerance of the
@ -124,7 +150,7 @@ def get_N_for_accurate_fourier_integral(integrand, a, b, t_max, tol, ft_ref, N_m
raise ValueError("unknown method '{}'".format(method))
log.debug("fft integral from {:.3e} to {:.3e}".format(a, b))
log.debug("error estimation up to tmax {:.3e}".format(t_max))
log.debug("error estimation up to tmax {:.3e} (tol={:.3e}".format(t_max, tol))
i = 4
while True:
@ -135,7 +161,10 @@ def get_N_for_accurate_fourier_integral(integrand, a, b, t_max, tol, ft_ref, N_m
rd = np.max(np.abs(ft_tau[idx] - ft_ref_tau) / np.abs(ft_ref_tau))
log.debug("useing fft for integral N with {} yields max rd {:.3e} (tol {:.3e})".format(N, rd, tol))
if rd < tol:
log.debug("reached rd ({:.3e}) < tol ({:.3e}), return N={}".format(rd, tol, N))
return N
if N > N_max:
raise RuntimeError("maximum number of points for Fourier Transform reached")
i += 1
@ -156,7 +185,7 @@ def get_dt_for_accurate_interpolation(t_max, tol, ft_ref):
return t_max/(N/sub_sampl)
N*=2
def calc_ab_N_dx_dt(integrand, intgr_tol, intpl_tol, tmax, a, b, ft_ref, N_max = 2**15, method='simps'):
def calc_ab_N_dx_dt(integrand, intgr_tol, intpl_tol, tmax, a, b, ft_ref, N_max = 2**20, method='simps'):
N = get_N_for_accurate_fourier_integral(integrand, a, b,
t_max = tmax,
tol = intgr_tol,

View file

@ -1,7 +1,11 @@
import numpy as np
from scipy.linalg import eigh as scipy_eigh
import time
def solve_hom_fredholm(r, w, eig_val_min, verbose=1):
import logging
log = logging.getLogger(__name__)
def solve_hom_fredholm(r, w, eig_val_min):
r"""Solves the discrete homogeneous Fredholm equation of the second kind
.. math:: \int_0^{t_\mathrm{max}} \mathrm{d}s R(t-s) u(s) = \lambda u(t)
@ -31,41 +35,21 @@ def solve_hom_fredholm(r, w, eig_val_min, verbose=1):
:return: eigenvalues, eigenvectos (eigenvectos are stored in the normal numpy fashion, ordered in decreasing order)
"""
t0 = time.time()
# weighted matrix r due to quadrature weights
if verbose > 0:
print("build matrix ...")
# d = np.diag(np.sqrt(w))
# r = np.dot(d, np.dot(r, d))
n = len(w)
w_sqrt = np.sqrt(w)
r = w_sqrt.reshape(n,1) * r * w_sqrt.reshape(1,n)
if verbose > 0:
print("solve eigenvalue equation ...")
eig_val, eig_vec = scipy_eigh(r, overwrite_a=True) # eig_vals in ascending
# use only eigenvalues larger than sig_min**2
min_idx = sum(eig_val < eig_val_min)
eig_val = eig_val[min_idx:][::-1]
eig_vec = eig_vec[:, min_idx:][:, ::-1]
log.debug("discrete fredholm equation of size {} solved [{:.2e}]".format(n, time.time()-t0))
num_of_functions = len(eig_val)
if verbose > 0:
print("use {} / {} eigenfunctions (sig_min = {})".format(num_of_functions, len(w), np.sqrt(eig_val_min)))
# inverse scale of the eigenvectors
# d_inverse = np.diag(1/np.sqrt(w))
# eig_vec = np.dot(d_inverse, eig_vec)
log.debug("use {} / {} eigenfunctions (sig_min = {})".format(num_of_functions, len(w), np.sqrt(eig_val_min)))
eig_vec = np.reshape(1/w_sqrt, (n,1)) * eig_vec
if verbose > 0:
print("done!")
return eig_val, eig_vec
def get_mid_point_weights(t_max, num_grid_points):

View file

@ -1,549 +1,382 @@
# -*- coding: utf8 -*-
#
# Copyright 2014 Richard Hartmann
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
# MA 02110-1301, USA.
"""
**Stochastic Process Module**
This module contains various methods to generate stochastic processes for a
given correlation function. There are two different kinds of generators. The one kind
allows to generate the process for a given time grid, where as the other one generates a
time continuous process in such a way that it allows to "correctly" interpolate between the
solutions of the time discrete version.
**time discrete methods:**
:py:func:`stochastic_process_kle`
Simulate Stochastic Process using Karhunen-Loève expansion
This method still needs explicit integrations weights for the
numeric integrations. For convenience you can use
:py:func:`stochastic_process_mid_point_weight` simplest approach, for
test reasons only, uses :py:func:`get_mid_point_weights`
to calculate the weights
:py:func:`stochastic_process_trapezoidal_weight` little more sophisticated,
uses :py:func:`get_trapezoidal_weights_times` to calculate the weights
:py:func:`stochastic_process_simpson_weight`,
**so far for general use**, uses :py:func:`get_simpson_weights_times`
to calculate the weights
:py:func:`stochastic_process_fft`
Simulate Stochastic Process using FFT method
**time continuous methods:**
:py:class:`StocProc`
Simulate Stochastic Process using Karhunen-Loève expansion and allows
for correct interpolation. This class still needs explicit integrations
weights for the numeric integrations (use :py:func:`get_trapezoidal_weights_times`
for general purposes).
.. todo:: implement convenient classes with fixed weights
"""
from __future__ import print_function, division
from .stocproc_c import auto_correlation as auto_correlation_c
import sys
import os
from warnings import warn
sys.path.append(os.path.dirname(__file__))
import numpy as np
from scipy.linalg import eigh as scipy_eigh
from collections import namedtuple
import time
stocproc_key_type = namedtuple(typename = 'stocproc_key_type',
field_names = ['bcf', 't_max', 'ng', 'tol', 'cubatur_type', 'sig_min', 'ng_fac'] )
def solve_hom_fredholm(r, w, eig_val_min, verbose=1):
r"""Solves the discrete homogeneous Fredholm equation of the second kind
.. math:: \int_0^{t_\mathrm{max}} \mathrm{d}s R(t-s) u(s) = \lambda u(t)
Quadrature approximation of the integral gives a discrete representation of the
basis functions and leads to a regular eigenvalue problem.
.. math:: \sum_i w_i R(t_j-s_i) u(s_i) = \lambda u(t_j) \equiv \mathrm{diag(w_i)} \cdot R \cdot u = \lambda u
Note: If :math:`t_i = s_i \forall i` the matrix :math:`R(t_j-s_i)` is
a hermitian matrix. In order to preserve hermiticity for arbitrary :math:`w_i`
one defines the diagonal matrix :math:`D = \mathrm{diag(\sqrt{w_i})}`
with leads to the equivalent expression:
.. math:: D \cdot R \cdot D \cdot D \cdot u = \lambda D \cdot u \equiv \tilde R \tilde u = \lambda \tilde u
where :math:`\tilde R` is hermitian and :math:`u = D^{-1}\tilde u`
Setting :math:`t_i = s_i` and due to the definition of the correlation function the
matrix :math:`r_{ij} = R(t_i, s_j)` is hermitian.
:param r: correlation matrix :math:`R(t_j-s_i)`
:param w: integrations weights :math:`w_i`
(they have to correspond to the discrete time :math:`t_i`)
:param eig_val_min: discards all eigenvalues and eigenvectos with
:math:`\lambda_i < \mathtt{eig\_val\_min} / \mathrm{max}(\lambda)`
:return: eigenvalues, eigenvectos (eigenvectos are stored in the normal numpy fashion, ordered in decreasing order)
"""
# weighted matrix r due to quadrature weights
if verbose > 0:
print("build matrix ...")
# d = np.diag(np.sqrt(w))
# r = np.dot(d, np.dot(r, d))
n = len(w)
w_sqrt = np.sqrt(w)
r = w_sqrt.reshape(n,1) * r * w_sqrt.reshape(1,n)
if verbose > 0:
print("solve eigenvalue equation ...")
eig_val, eig_vec = scipy_eigh(r, overwrite_a=True) # eig_vals in ascending
from . import method_kle
from . import method_fft
from . import stocproc_c
from .tools import ComplexInterpolatedUnivariateSpline
# use only eigenvalues larger than sig_min**2
min_idx = sum(eig_val < eig_val_min)
eig_val = eig_val[min_idx:][::-1]
eig_vec = eig_vec[:, min_idx:][:, ::-1]
num_of_functions = len(eig_val)
if verbose > 0:
print("use {} / {} eigenfunctions (sig_min = {})".format(num_of_functions, len(w), np.sqrt(eig_val_min)))
# inverse scale of the eigenvectors
# d_inverse = np.diag(1/np.sqrt(w))
# eig_vec = np.dot(d_inverse, eig_vec)
eig_vec = np.reshape(1/w_sqrt, (n,1)) * eig_vec
import logging
log = logging.getLogger(__name__)
if verbose > 0:
print("done!")
class _absStocProc(object):
r"""
Abstract base class to stochastic process interface
return eig_val, eig_vec
def stochastic_process_kle(r_tau, t, w, num_samples, seed = None, sig_min = 1e-4, verbose=1):
    r"""Simulate Stochastic Process using Karhunen-Loève expansion

    Simulate :math:`N_\mathrm{S}` wide-sense stationary stochastic processes
    with given correlation :math:`R(\tau) = \langle X(t) X^\ast(s) \rangle = R(t-s)`.

    Expanding :math:`X(t)` in a Karhunen-Loève manner

    .. math:: X(t) = \sum_i X_i u_i(t)

    where :math:`\lambda_i` and :math:`u_i` are solutions of the homogeneous
    Fredholm equation of the second kind

    .. math:: \int_0^{t_\mathrm{max}} \mathrm{d}s R(t-s) u(s) = \lambda u(t)

    Discrete solutions are provided by :py:func:`solve_hom_fredholm`.
    Expressing the random variables :math:`X_i` through independent normal
    distributed random variables :math:`Y_i` with variance one, the process at
    the discrete times :math:`t_j` is

    .. math:: X(t_j) = \sum_i Y_i \sqrt{\lambda_i} u_i(t_j)

    References:
        [1] Kobayashi, H., Mark, B.L., Turin, W.,
        2011. Probability, Random Processes, and Statistical Analysis,
        Cambridge University Press, Cambridge. (pp. 363)

        [2] Press, W.H., Teukolsky, S.A., Vetterling, W.T., Flannery, B.P.,
        2007. Numerical Recipes 3rd Edition: The Art of Scientific Computing,
        Auflage: 3. ed. Cambridge University Press, Cambridge, UK ; New York. (pp. 989)

    :param r_tau: function object of the one parameter correlation function
        :math:`R(\tau) = R(t-s) = \langle X(t) X^\ast(s) \rangle`
    :param t: list of grid points for the time axis
    :param w: appropriate weights to integrate along the time axis using the
        grid points given by :py:obj:`t`
    :param num_samples: number of stochastic processes to sample
    :param seed: seed for the random number generator used
    :param sig_min: minimal standard deviation :math:`\sigma_i` the random variable :math:`X_i`
        viewed as coefficient for the base function :math:`u_i(t)` must have to be considered as
        significant for the Karhunen-Loève expansion (note: :math:`\sigma_i` results from the
        square root of the eigenvalue :math:`\lambda_i`)
    :param verbose: if > 0 print progress information to stdout

    :return: returns a 2D array of the shape (num_samples, len(t)). Each row of the returned array contains one sample of the
        stochastic process.
    """
    if verbose > 0:
        print("__ stochastic_process __")
    if seed != None:
        np.random.seed(seed)
    t_row = t
    t_col = t_row[:,np.newaxis]
    # correlation matrix
    # r_tau(t-s) -> integral/sum over s -> s must be row in EV equation
    r_old = r_tau(t_col-t_row)
    n_ = len(t)
    # R is stationary (depends only on t-s), so the full matrix can be
    # reconstructed from the values R(t_i - t_0) and their conjugates
    bcf_n_plus = r_tau(t-t[0])
    #    [bcf(-3)    , bcf(-2)    , bcf(-1)    , bcf(0), bcf(1), bcf(2), bcf(3)]
    # == [bcf(3)^\ast, bcf(2)^\ast, bcf(1)^\ast, bcf(0), bcf(1), bcf(2), bcf(3)]
    bcf_n = np.hstack((np.conj(bcf_n_plus[-1:0:-1]), bcf_n_plus))
    # we want
    # r = bcf(0) bcf(-1), bcf(-2)
    #     bcf(1) bcf( 0), bcf(-1)
    #     bcf(2) bcf( 1), bcf( 0)
    r = np.empty(shape=(n_,n_), dtype = np.complex128)
    for i in range(n_):
        idx = n_-1-i
        r[:,i] = bcf_n[idx:idx+n_]
    # cross-check the Toeplitz reconstruction against the directly built matrix
    assert np.max(np.abs(r_old - r)) < 1e-14
    # solve discrete Fredholm equation
    # eig_val = lambda
    # eig_vec = u(t)
    eig_val, eig_vec = solve_hom_fredholm(r, w, sig_min**2)
    num_of_functions = len(eig_val)
    # generate samples
    # standard deviation of the random coefficients of the KL expansion
    sig = np.sqrt(eig_val).reshape(1, num_of_functions)
    if verbose > 0:
        print("generate samples ...")
    # complex normal samples with <|x|^2> = 1 (scale 1/sqrt(2) per component)
    # NOTE(review): np.complex is a deprecated alias in newer NumPy versions --
    # consider np.complex128; left untouched here.
    x = np.random.normal(scale=1/np.sqrt(2), size=(2*num_samples*num_of_functions)).view(np.complex).reshape(num_of_functions, num_samples).T
    # random quantities all aligned for num_samples samples
    x_t_array = np.tensordot(x*sig, eig_vec, axes=([1],[1])) # multiplication with the eigenfunctions == base of Karhunen-Loève expansion
    if verbose > 0:
        print("done!")
    return x_t_array
def _stochastic_process_alternative_samples(num_samples, num_of_functions, t, sig, eig_vec, seed):
r"""used for debug and test reasons
generate each sample independently in a for loop
should be slower than using numpy's array operations to do it all at once
"""
np.random.seed(seed)
x_t_array = np.empty(shape=(num_samples, len(t)), dtype = complex)
for i in range(num_samples):
x = np.random.normal(size=num_of_functions) * sig
x_t_array[i,:] = np.dot(eig_vec, x.T)
return x_t_array
def get_mid_point_weights(t_max, num_grid_points):
    r"""Return the time grid points and weights for numeric integration via the **mid point rule**.

    The idea is to use :math:`N_\mathrm{GP}` time grid points located in the middle
    of each of the :math:`N_\mathrm{GP}` equally distributed subintervals of :math:`[0, t_\mathrm{max}]`.

    .. math:: t_i = \left(i + \frac{1}{2}\right)\frac{t_\mathrm{max}}{N_\mathrm{GP}} \qquad i = 0,1, ... N_\mathrm{GP} - 1

    The corresponding trivial weights for integration are

    .. math:: w_i = \Delta t = \frac{t_\mathrm{max}}{N_\mathrm{GP}} \qquad i = 0,1, ... N_\mathrm{GP} - 1

    :param t_max: end of the interval for the time grid :math:`[0,t_\mathrm{max}]`
        (note: this would correspond to an integration from :math:`0-\Delta t / 2`
        to :math:`t_\mathrm{max}+\Delta t /2`)
    :param num_grid_points: number of grid points
    :return: tuple (grid, weights)
    """
    grid, step = np.linspace(0, t_max, num_grid_points, retstep=True)
    # every node carries the same weight delta_t
    weights = np.ones(num_grid_points) * step
    return grid, weights
def stochastic_process_mid_point_weight(r_tau, t_max, num_grid_points, num_samples, seed = None, sig_min = 1e-4):
    r"""Simulate Stochastic Process using Karhunen-Loève expansion with **mid point rule** for integration

    The idea is to use :math:`N_\mathrm{GP}` time grid points located in the middle
    of each of the :math:`N_\mathrm{GP}` equally distributed subintervals of :math:`[0, t_\mathrm{max}]`.

    .. math:: t_i = \left(i + \frac{1}{2}\right)\frac{t_\mathrm{max}}{N_\mathrm{GP}} \qquad i = 0,1, ... N_\mathrm{GP} - 1

    The corresponding trivial weights for integration are

    .. math:: w_i = \Delta t = \frac{t_\mathrm{max}}{N_\mathrm{GP}} \qquad i = 0,1, ... N_\mathrm{GP} - 1

    Since the autocorrelation function depends solely on the time difference :math:`\tau` the static shift for :math:`t_i` does not
    alter matrix used to solve the Fredholm equation. So for the reason of convenience the time grid points are not centered at
    the middle of the intervals, but run from 0 to :math:`t_\mathrm{max}` equally distributed.

    Calling :py:func:`stochastic_process` with these calculated :math:`t_i, w_i` gives the corresponding processes.

    :param t_max: right end of the considered time interval :math:`[0,t_\mathrm{max}]`
    :param num_grid_points: :math:`N_\mathrm{GP}` number of time grid points used for the discretization of the
        integral of the Fredholm integral (see :py:func:`stochastic_process`)

    :return: returns the tuple (set of stochastic processes, time grid points)

    See :py:func:`stochastic_process` for other parameters
    """
    # FIX: this previously called get_trapezoidal_weights_times, which halves
    # the weights at the boundary nodes -- contradicting both the function name
    # and the uniform weights w_i = delta_t documented above. Use the matching
    # mid point weights instead.
    t, w = get_mid_point_weights(t_max, num_grid_points)
    return stochastic_process_kle(r_tau, t, w, num_samples, seed, sig_min), t
def get_trapezoidal_weights_times(t_max, num_grid_points):
    """Return an equidistant grid on [0, t_max] together with the weights of
    the composite trapezoidal rule: delta_t everywhere except half of that at
    the two boundary nodes.

    :param t_max: right end of the time interval [0, t_max]
    :param num_grid_points: number of equally spaced grid points
    :return: tuple (grid, weights)
    """
    grid, step = np.linspace(0, t_max, num_grid_points, retstep=True)
    weights = np.ones(num_grid_points) * step
    # boundary nodes only count half in the trapezoidal rule
    weights[0] *= 0.5
    weights[-1] *= 0.5
    return grid, weights
def stochastic_process_trapezoidal_weight(r_tau, t_max, num_grid_points, num_samples, seed = None, sig_min = 1e-4):
    r"""Simulate Stochastic Process using Karhunen-Loève expansion with **trapezoidal rule** for integration

    .. math:: t_i = i \frac{t_\mathrm{max}}{N_\mathrm{GP}} = i \Delta t \qquad i = 0,1, ... N_\mathrm{GP}

    The corresponding weights for integration are

    .. math:: w_0 = w_{N_\mathrm{GP}} = \frac{\Delta t}{2}, \qquad w_i = \Delta t \qquad i = 1, ... N_\mathrm{GP} - 1

    Calling :py:func:`stochastic_process` with these calculated :math:`t_i, w_i` gives the corresponding processes.

    :param t_max: right end of the considered time interval :math:`[0,t_\mathrm{max}]`
    :param num_grid_points: :math:`N_\mathrm{GP}` number of time grid points used for the discretization of the
        integral of the Fredholm integral (see :py:func:`stochastic_process`)

    :return: returns the tuple (set of stochastic processes, time grid points)

    See :py:func:`stochastic_process` for other parameters
    """
    times, weights = get_trapezoidal_weights_times(t_max, num_grid_points)
    processes = stochastic_process_kle(r_tau, times, weights, num_samples, seed, sig_min)
    return processes, times
def get_simpson_weights_times(t_max, num_grid_points):
    """Return an equidistant grid on [0, t_max] together with the weights of
    the composite Simpson rule.

    The weight pattern is delta_t * [1/3, 4/3, 2/3, 4/3, ..., 4/3, 1/3].

    :param t_max: right end of the time interval [0, t_max]
    :param num_grid_points: number of grid points, must be ODD for the
        composite Simpson rule
    :return: tuple (grid, weights)
    :raises RuntimeError: if num_grid_points is even
    """
    if num_grid_points % 2 == 0:
        # FIX: error message typo -- said "git ng=" instead of "got ng="
        raise RuntimeError("simpson weight need odd number of grid points, but got ng={}".format(num_grid_points))
    # generate grid points
    t, delta_t = np.linspace(0, t_max, num_grid_points, retstep = True)
    w = np.empty(num_grid_points, dtype=np.float64)
    # interior pattern: even indices 2/3, odd indices 4/3 ...
    w[0::2] = 2/3*delta_t
    w[1::2] = 4/3*delta_t
    # ... and boundary nodes carry 1/3
    w[0]  = 1/3*delta_t
    w[-1] = 1/3*delta_t
    return t, w
def stochastic_process_simpson_weight(r_tau, t_max, num_grid_points, num_samples, seed = None, sig_min = 1e-4):
    r"""Simulate Stochastic Process using Karhunen-Loève expansion with **simpson rule** for integration

    Calling :py:func:`stochastic_process` with these calculated :math:`t_i, w_i` gives the corresponding processes.

    :param t_max: right end of the considered time interval :math:`[0,t_\mathrm{max}]`
    :param num_grid_points: :math:`N_\mathrm{GP}` number of time grid points (need to be odd) used for the discretization of the
        integral of the Fredholm integral (see :py:func:`stochastic_process`)

    :return: returns the tuple (set of stochastic processes, time grid points)

    See :py:func:`stochastic_process` for other parameters
    """
    times, weights = get_simpson_weights_times(t_max, num_grid_points)
    processes = stochastic_process_kle(r_tau, times, weights, num_samples, seed, sig_min)
    return processes, times
def _FT(f, x_max, N, x_min=0):
"""
calculate g(y) = int_x_min^x_max dx f(x) exp(-1j x y) using fft
when setting x' = x - x_min we get
g(y) = int_0^x_max-x_min dx' f(x'+x_min) exp(-1j x' y) exp(-1j x_min y) = exp(-1j x_min y) * int_0^x_max-x_min dx' f(x'+x_min) exp(-1j x' y)
and in a discrete fashion with N gp such that x'_i = dx * i and dx = (N-1) / (x_max - x_min)
further we find dy = 2pi / N / dx so we get y_k = dy * k
g_k = exp(-1j x_min y_k) * sum_0^N dx f(x_i + x_min) exp(-1j dx * i dy * k)
using dx * dk = 2 pi / N we end up with
g_k = exp(-1j x_min y_k) * dx * sum_0^N f(x_i + x_min) exp(-1j 2 pi i k / N)
= exp(-1j x_min y_k) * dx * FFT( f(x_i + x_min) )
"""
x, dx = np.linspace(x_min, x_max, N, retstep=True)
f_i = f(x)
dy = 2*np.pi / dx / N
y_k = np.linspace(0, dy*(N-1), N)
return np.exp(-1j * x_min * y_k) * dx * np.fft.fft(f_i), y_k
def stochastic_process_fft(spectral_density, t_max, num_grid_points, num_samples, seed = None, verbose=1, omega_min=0):
    r"""Simulate Stochastic Process using FFT method

    This method works only for correlation functions of the form

    .. math:: \alpha(\tau) = \int_0^{\omega_\mathrm{max}} \mathrm{d}\omega \, \frac{J(\omega)}{\pi} e^{-\mathrm{i}\omega \tau}

    where :math:`J(\omega)` is a real non negative spectral density. The
    integral is approximated by a Riemann sum and the process is written as

    .. math:: X(t) = \sum_{k=0}^{N-1} \sqrt{\frac{\Delta \omega J(\omega_k)}{\pi}} X_k e^{-\mathrm{i}\omega_k t}

    with complex random variables :math:`X_k` such that :math:`\langle X_k \rangle = 0`,
    :math:`\langle X_k X_{k'}\rangle = 0` and :math:`\langle X_k X^\ast_{k'}\rangle = \Delta \omega \delta_{k,k'}`,
    which reproduces the Riemann approximated correlation function. Evaluating
    :math:`X` on the discrete time axis via the DFT requires

    .. math:: 1 = \frac{\Delta \omega \Delta t}{2 \pi} N

    Since :math:`J(\omega)` is real, :math:`X(t_l) = X^\ast(t_{N-l})`, so for a
    process on :math:`n` time grid points :math:`N = 2n-1` DFT points are used,
    with :math:`\Delta t = t_\mathrm{max} / (n-1)` and
    :math:`\Delta \omega = 2 \pi / (\Delta t N)`.

    :param spectral_density: the spectral density :math:`J(\omega)` as callable function object
    :param t_max: :math:`[0,t_\mathrm{max}]` is the interval for which the process will be calculated
    :param num_grid_points: number :math:`n` of equally distributed times :math:`t_k` on the interval :math:`[0,t_\mathrm{max}]`
        for which the process will be evaluated
    :param num_samples: number of independent processes to be returned
    :param seed: seed passed to the random number generator used
    :param verbose: if > 0 print progress information to stdout
    :param omega_min: shift of the frequency axis; the spectral density is
        evaluated at ``omega + omega_min`` and the phase factor
        ``exp(-1j * omega_min * t)`` corrects the resulting process

    :return: returns the tuple (2D array of the set of stochastic processes,
        1D array of time grid points). Each row of the stochastic process
        array contains one sample of the stochastic process.
    """
    if verbose > 0:
        print("__ stochastic_process_fft __")
    # number of DFT points: 2n-1 so that n time grid points are independent
    n_dft = num_grid_points * 2 - 1
    delta_t = t_max / (num_grid_points-1)
    # frequency spacing fixed by the DFT consistency condition dw*dt*N = 2*pi
    delta_omega = 2 * np.pi / (delta_t * n_dft)
    t = np.linspace(0, t_max, num_grid_points)
    # phase correction for the shifted frequency axis (omega_min != 0)
    omega_min_correction = np.exp(-1j * omega_min * t).reshape(1,-1)
    #omega axis
    omega = delta_omega*np.arange(n_dft)
    #reshape for multiplication with matrix xi
    sqrt_spectral_density = np.sqrt(spectral_density(omega + omega_min)).reshape((1, n_dft))
    if seed != None:
        np.random.seed(seed)
    if verbose > 0:
        print(" omega_max : {:.2}".format(delta_omega * n_dft))
        print(" delta_omega: {:.2}".format(delta_omega))
        print("generate samples ...")
    #random complex normal samples
    # NOTE(review): np.complex is a deprecated alias in newer NumPy versions
    xi = (np.random.normal(scale=1/np.sqrt(2), size = (2*num_samples*n_dft)).view(np.complex)).reshape(num_samples, n_dft)
    #each row contain a different integrand
    weighted_integrand = sqrt_spectral_density * np.sqrt(delta_omega / np.pi) * xi
    #compute integral using fft routine; keep only the first n time points
    z_ast = np.fft.fft(weighted_integrand, axis = 1)[:, 0:num_grid_points] * omega_min_correction
    #corresponding time axis
    if verbose > 0:
        print("done!")
    return z_ast, t
def auto_correlation_numpy(x, verbose=1):
    r"""Deprecated pure-numpy auto correlation estimator.

    Averages over the sample axis and returns the pair
    (<x(t) x*(s)>, <x(t) x(s)>) as two 2D matrices.

    :param x: 2D array of shape (num_samples, num_time_points); each row is
        one realization of the process
    :param verbose: if > 0 print progress information to stdout
    :return: tuple of two 2D arrays (conjugated and non-conjugated correlation)
    :raises TypeError: if ``x`` is not a 2D array
    """
    warn("use 'auto_correlation' instead", DeprecationWarning)

    # handle type error
    if x.ndim != 2:
        raise TypeError('expected 2D numpy array, but {} given'.format(type(x)))

    n_samp, n_time = x.shape
    row = x.reshape(n_samp, 1, n_time)
    col = x.reshape(n_samp, n_time, 1)

    if verbose > 0:
        print("calculate auto correlation function ...")
    alpha = np.mean(col * np.conj(row), axis=0)
    alpha_prime = np.mean(col * row, axis=0)
    if verbose > 0:
        print("done!")

    return alpha, alpha_prime
"""
def __init__(self, t_max, num_grid_points, seed=None, k=3):
    r"""Set up the common state of a stochastic process generator.

    :param t_max: specify time axis as [0, t_max]
    :param num_grid_points: number of equidistant times on that axis
    :param seed: if not ``None`` set the numpy global RNG seed to ``seed``
    :param k: polynomial degree used for spline interpolation
    """
    self.t_max = t_max
    self.num_grid_points = num_grid_points
    # discrete time axis the process realizations live on
    self.t = np.linspace(0, t_max, num_grid_points)
    # current realization; None until new_process() is called
    self._z = None
    # lazily created spline interpolator for off-grid evaluation
    self._interpolator = None
    self._k = k
    self._seed = seed
    if seed is not None:
        np.random.seed(seed)
    # cached scale so each complex normal component has total variance 1
    self._one_over_sqrt_2 = 1/np.sqrt(2)
    # counts how many processes have been generated (for logging)
    self._proc_cnt = 0
    log.debug("init StocProc with t_max {} and {} grid points".format(t_max, num_grid_points))
def auto_correlation(x, verbose=1):
r"""Computes the auto correlation function for a set of wide-sense stationary stochastic processes
def __call__(self, t=None):
    r"""Evaluate the current realization of the stochastic process.

    :param t: time(s) to evaluate the stochastic process at, float or array
        of floats; if ``None`` return the raw discrete realization
    :return: process value(s); off-grid times are obtained via spline
        interpolation between the discrete grid values
    :raises RuntimeError: if no realization exists yet (call
        :py:func:`new_process` first)
    """
    if self._z is None:
        raise RuntimeError("StocProc_FFT has NO random data, call 'new_process' to generate a new random process")
    if t is None:
        return self._z
    else:
        # build the interpolator lazily; it is invalidated by new_process()
        if self._interpolator is None:
            t0 = time.time()
            self._interpolator = ComplexInterpolatedUnivariateSpline(self.t, self._z, k=self._k)
            log.debug("created interpolator [{:.2e}s]".format(time.time() - t0))
        return self._interpolator(t)
Computes the auto correlation function for the given set :math:`{X_i(t)}` of stochastic processes:
def _calc_z(self, y):
    r"""Map the independent complex normal random variables ``y`` to the
    discrete stochastic process (abstract hook -- implemented by subclasses).

    :param y: 1D array of independent complex normal random numbers
    :return: the stochastic process, array of complex numbers
    """
    pass
.. math:: \alpha(s, t) = \langle X(t)X^\ast(s) \rangle
def get_num_y(self):
    r"""Abstract hook -- implemented by subclasses.

    :return: number of complex random variables needed to calculate the
        stochastic process
    """
    pass
For wide-sense stationary processes :math:`\alpha` is independent of :math:`s`.
def get_time(self):
    r"""
    :return: the discrete time axis (1D array) the process is defined on
    """
    return self.t
:param x: 2D array of the shape (num_samples, num_time_points) containing the set of stochastic processes where each row represents one process
def get_z(self):
    r"""Return the current discrete realization of the process.

    Use :py:func:`new_process` to generate a new realization; until then this
    is ``None``.

    :return: the current process (array of complex numbers) or ``None``
    """
    return self._z
:return: 2D array containing the correlation function as function of :math:`s, t`
def new_process(self, y=None, seed=None):
    r"""Generate a new realization of the process by evaluating :py:func:`_calc_z`.

    When ``y`` is given use these random numbers as input for
    :py:func:`_calc_z`, otherwise draw a fresh set of complex normal samples.

    :param y: independent complex normal random variables with
        :math:`\langle y_i y_j^\ast \rangle = 2 \delta_{ij}` (total variance 2)
    :param seed: if not ``None`` set the RNG seed to ``seed`` before sampling
    """
    t0 = time.time()
    # the cached interpolator belongs to the previous realization
    self._interpolator = None
    self._proc_cnt += 1
    if seed != None:
        log.info("use fixed seed ({})for new process".format(seed))
        np.random.seed(seed)
    if y is None:
        #random complex normal samples
        y = np.random.normal(scale=self._one_over_sqrt_2, size = 2*self.get_num_y()).view(np.complex)
    self._z = self._calc_z(y)
    log.debug("proc_cnt:{} new process generated [{:.2e}s]".format(self._proc_cnt, time.time() - t0))
class StocProc_KLE(_absStocProc):
r"""
class to simulate stochastic processes using KLE method
- Solve fredholm equation on grid with ``ng_fredholm nodes`` (trapezoidal_weights).
If case ``ng_fredholm`` is ``None`` set ``ng_fredholm = num_grid_points``. In general it should
hold ``ng_fredholm < num_grid_points`` and ``num_grid_points = 10*ng_fredholm`` might be a good ratio.
- Calculate discrete stochastic process (using interpolation solution of fredholm equation) with num_grid_points nodes
- invoke spline interpolator when calling
"""
def __init__(self, r_tau, t_max, ng_fredholm, ng_fac=4, seed=None, sig_min=1e-5, k=3, align_eig_vec=False):
r"""
:param r_tau: auto correlation function of the stochastic process
:param t_max: specify time axis as [0, t_max]
:param seed: if not ``None`` set seed to ``seed``
:param sig_min: eigenvalue threshold (see KLE method to generate stochastic processes)
:param verbose: 0: no output, 1: informational output, 2: (eventually some) debug info
:param k: polynomial degree used for spline interpolation
"""
# this lengthy part will be skipped when init class from dump, as _A and alpha_k will be stored
t0 = time.time()
t, w = method_kle.get_simpson_weights_times(t_max, ng_fredholm)
r = self._calc_corr_matrix(t, r_tau)
_eig_val, _eig_vec = method_kle.solve_hom_fredholm(r, w, sig_min ** 2)
if align_eig_vec:
for i in range(_eig_vec.shape[1]):
s = np.sum(_eig_vec[:, i])
phase = np.exp(1j * np.arctan2(np.real(s), np.imag(s)))
_eig_vec[:, i] /= phase
_sqrt_eig_val = np.sqrt(_eig_val)
_A = w.reshape(-1, 1) * _eig_vec / _sqrt_eig_val.reshape(1, -1)
ng_fine = ng_fac * (ng_fredholm - 1) + 1
alpha_k = self._calc_corr_min_t_plus_t(s=np.linspace(0, t_max, ng_fine), bcf=r_tau)
log.debug("new KLE StocProc class prepared [{:.2e}]".format(time.time() - t0))
data = (_A, alpha_k, seed, k, t_max, ng_fac)
self.__setstate__(data)
# needed for getkey / getstate
self.key = (r_tau, t_max, ng_fredholm, ng_fac, sig_min, align_eig_vec)
# save these guys as they are needed to estimate the autocorrelation
self._s = t
self._w = w
self._eig_val = _eig_val
self._eig_vec = _eig_vec
def _getkey(self):
return self.__class__.__name__, self.key
def __getstate__(self):
return self._A, self.alpha_k, self._seed, self._k, self.t_max, self.ng_fac
def __setstate__(self, state):
    """Restore the process from the tuple produced by __getstate__ and re-init the base class."""
    self._A, self.alpha_k, seed, k, t_max, self.ng_fac = state
    # eigenfunction interpolation is only needed when the grid is refined
    self.kle_interp = self.ng_fac != 1
    self._one_over_sqrt_2 = 1 / np.sqrt(2)
    num_gp, self.num_y = self._A.shape
    # fine grid size derived from the coarse Fredholm grid and the refinement factor
    ng_fine = self.ng_fac * (num_gp - 1) + 1
    super().__init__(t_max=t_max, num_grid_points=ng_fine, seed=seed, k=k)
@staticmethod
def _calc_corr_min_t_plus_t(s, bcf):
    """Evaluate bcf on the non-negative lags s - s[0] and extend to negative
    lags using the Hermitian symmetry bcf(-t) == conj(bcf(t)).

    Returns [bcf(-t_n) ... bcf(-t_1), bcf(0), bcf(t_1) ... bcf(t_n)].
    """
    pos_lags = bcf(s - s[0])
    # mirror indices n-1 .. 1 (exclude lag 0) and conjugate for negative lags
    neg_lags = np.conj(pos_lags[:0:-1])
    return np.concatenate((neg_lags, pos_lags))
@staticmethod
def _calc_corr_matrix(s, bcf):
    """calculates the matrix alpha_ij = bcf(t_i-s_j)

    calls bcf only for s-s_0 and reconstructs the rest
    """
    n = len(s)
    # full lag line [bcf(-t_max) ... bcf(0) ... bcf(t_max)], length 2n-1
    lag_line = StocProc_KLE._calc_corr_min_t_plus_t(s, bcf)
    # column j of the result is a length-n window of lag_line shifted so that
    # r[i, j] == bcf(t_i - s_j)  (Toeplitz structure)
    r = np.empty((n, n), dtype=np.complex128)
    for col in range(n):
        start = n - 1 - col
        r[:, col] = lag_line[start:start + n]
    return r
def __calc_missing(self):
    # hook inherited from the parent class; not supported by this subclass
    raise NotImplementedError
def _calc_z(self, y):
    """Map the Gaussian sample vector y to one realization z(t) of the process."""
    if not self.kle_interp:
        # ng_fac == 1: evaluate the KLE directly at the Fredholm nodes
        # NOTE(review): self._sqrt_eig_val and self._eig_vec are set in __init__
        # but not restored by __setstate__ -- this branch may fail for an
        # instance rebuilt from a dump; verify.
        return np.tensordot(y * self._sqrt_eig_val, self._eig_vec, axes=([0], [1])).flatten()
    # refined grid: combine samples with the precomputed matrix _A and let the
    # cython routine interpolate the eigenfunctions onto the fine grid
    coeffs = np.tensordot(y, self._A, axes=([0], [1]))
    n_coarse = self._A.shape[0]
    return stocproc_c.z_t(delta_t_fac = self.ng_fac,
                          N1 = n_coarse,
                          alpha_k = self.alpha_k,
                          a_tmp = coeffs,
                          kahanSum = True)
def get_num_y(self):
    """Return the number of independent Gaussian random variables (eigenvalues kept)."""
    return self.num_y
class StocProc_KLE_tol(StocProc_KLE):
r"""
same as StocProc_KLE except that ng_fredholm is determined from given tolerance
"""
# handle type error
if x.ndim != 2:
raise TypeError('expected 2D numpy array, but {} given'.format(type(x)))
if verbose > 0:
print("calculate auto correlation function ...")
res = auto_correlation_c(x)
if verbose > 0:
print("done!")
return res
def __init__(self, tol=1e-2, **kwargs):
    """
    :param tol: relative error threshold used to determine ng_fredholm automatically
    :param kwargs: forwarded to StocProc_KLE.__init__ (r_tau, t_max, ng_fac, ...)
    """
    self.tol = tol
    self._auto_grid_points(**kwargs)
    # overwrite ng_fredholm (index 2) in the key from StocProc_KLE with the value of tol
    # self.key = (r_tau, t_max, ng_fredholm, ng_fac, sig_min, align_eig_vec)
    self.key = (self.key[0], self.key[1], tol, self.key[3],self.key[4], self.key[5])
def auto_correlation_zero(x, s_0_idx = 0):
# handle type error
if x.ndim != 2:
raise TypeError('expected 2D numpy array, but {} given'.format(type(x)))
def _init_StocProc_KLE_and_get_error(self, ng, **kwargs):
    """Initialize the parent StocProc_KLE with ng Fredholm grid points and return
    the maximum relative error of the reconstructed correlation function on the
    fine grid (used by _auto_grid_points to pick ng).
    """
    super().__init__(ng_fredholm=ng, **kwargs)
    ng_fine = self.ng_fac*(ng-1)+1
    # eigenfunctions interpolated from the coarse Fredholm grid onto the fine grid
    u_i_all_t = stocproc_c.eig_func_all_interp(delta_t_fac = self.ng_fac,
                                               time_axis = self._s,
                                               alpha_k = self.alpha_k,
                                               weights = self._w,
                                               eigen_val = self._eig_val,
                                               eigen_vec = self._eig_vec)
    u_i_all_ast_s = np.conj(u_i_all_t) #(N_gp, N_ev)
    num_ev = len(self._eig_val)
    tmp = self._eig_val.reshape(1, num_ev) * u_i_all_t #(N_gp, N_ev)
    # reconstructed correlation: sum_i lambda_i u_i(t) conj(u_i(s))
    recs_bcf = np.tensordot(tmp, u_i_all_ast_s, axes=([1],[1]))
    # reference correlation assembled from the exact alpha values (Toeplitz columns)
    refc_bcf = np.empty(shape=(ng_fine,ng_fine), dtype = np.complex128)
    for i in range(ng_fine):
        idx = ng_fine-1-i
        refc_bcf[:,i] = self.alpha_k[idx:idx+ng_fine]
    # maximum relative deviation over the whole (t, s) grid
    err = np.max(np.abs(recs_bcf-refc_bcf)/np.abs(refc_bcf))
    return err
num_samples = x.shape[0]
x_s_0 = x[:,s_0_idx].reshape(num_samples,1)
return np.mean(x * np.conj(x_s_0), axis = 0), np.mean(x * x_s_0, axis = 0)
def _auto_grid_points(self, **kwargs):
    """Find a small ng = 2*c + 1 whose KLE reconstruction error is <= self.tol.

    Phase 1 doubles c until the error drops below the tolerance; phase 2
    bisects between the last failing and the first succeeding value of c.
    """
    err = np.inf
    c = 2
    # exponential growth until the error threshold is reached
    while err > self.tol:
        c *= 2
        ng = 2 * c + 1
        err = self._init_StocProc_KLE_and_get_error(ng, **kwargs)
        log.info("ng {} -> err {:.3e}".format(ng, err))
    # bisection invariant: lo fails the tolerance, hi satisfies it
    lo, hi = c // 2, c
    while hi - lo > 1:
        mid = (lo + hi) // 2
        ng = 2 * mid + 1
        err = self._init_StocProc_KLE_and_get_error(ng, **kwargs)
        log.info("ng {} -> err {:.3e}".format(ng, err))
        if err > self.tol:
            lo = mid
        else:
            hi = mid
class StocProc_FFT_tol(_absStocProc):
    r"""
    Simulate Stochastic Process using FFT method

    The integration and interpolation tolerances determine the frequency
    discretization (N, dx) and the time step dt via method_fft helpers.
    """
    def __init__(self, spectral_density, t_max, bcf_ref, intgr_tol=1e-2, intpl_tol=1e-2,
                 seed=None, k=3, negative_frequencies=False, method='simps'):
        """
        :param spectral_density: spectral density J(w) of the process
        :param t_max: specify time axis as [0, t_max]
        :param bcf_ref: reference bath correlation function used to check the Fourier integral
        :param intgr_tol: tolerance for the discrete Fourier integral
        :param intpl_tol: tolerance for the spline interpolation in time
        :param seed: if not ``None`` set seed to ``seed``
        :param k: polynomial degree used for spline interpolation
        :param negative_frequencies: if True, also integrate over w < 0
        :param method: quadrature weighting, 'simps' or 'midp'
        """
        if not negative_frequencies:
            log.debug("non neg freq only")
            # assume the spectral_density is 0 for w<0
            # and decays fast for large w
            b = method_fft.find_integral_boundary(integrand = spectral_density,
                                                  tol = intgr_tol**2,
                                                  ref_val = 1,
                                                  max_val = 1e6,
                                                  x0 = 1)
            log.debug("upper int bound b {:.3e}".format(b))
            # refine b and determine N (number of frequency samples), dx and dt
            # such that both the integral and interpolation tolerances are met;
            # ft_ref carries the pi convention of the Fourier transform pair
            b, N, dx, dt = method_fft.calc_ab_N_dx_dt(integrand = spectral_density,
                                                      intgr_tol = intgr_tol,
                                                      intpl_tol = intpl_tol,
                                                      tmax = t_max,
                                                      a = 0,
                                                      b = b,
                                                      ft_ref = lambda tau:bcf_ref(tau)*np.pi,
                                                      N_max = 2**24,
                                                      method = method)
            log.debug("required tol result in N {}".format(N))
            a = 0
        else:
            # assume the spectral_density is non zero also for w<0
            # but decays fast for large |w|
            b = method_fft.find_integral_boundary(integrand = spectral_density,
                                                  tol = intgr_tol**2,
                                                  ref_val = 1,
                                                  max_val = 1e6,
                                                  x0 = 1)
            a = method_fft.find_integral_boundary(integrand = spectral_density,
                                                  tol = intgr_tol**2,
                                                  ref_val = -1,
                                                  max_val = 1e6,
                                                  x0 = -1)
            # NOTE(review): the boundaries are scaled by 1000 before calling
            # calc_ab_N_dx_dt and then rescaled below -- the intent of this
            # factor is not evident from this file; confirm against method_fft.
            b_minus_a, N, dx, dt = method_fft.calc_ab_N_dx_dt(integrand = spectral_density,
                                                              intgr_tol = intgr_tol,
                                                              intpl_tol = intpl_tol,
                                                              tmax = t_max,
                                                              a = a*1000,
                                                              b = b*1000,
                                                              ft_ref = lambda tau:bcf_ref(tau)*np.pi,
                                                              N_max = 2**24,
                                                              method = method)
            # rescale (a, b) so that b - a equals the returned interval length
            # while keeping the original ratio
            b = b*b_minus_a/(b-a)
            a = b-b_minus_a
        # snap t_max onto the dt grid (t_max may grow slightly)
        num_grid_points = int(np.ceil(t_max/dt))+1
        t_max = (num_grid_points-1)*dt
        super().__init__(t_max = t_max,
                         num_grid_points = num_grid_points,
                         seed = seed,
                         k = k)
        # frequency samples and quadrature weights folded into yl; the square
        # root appears because z(t) is built from yl * y with Gaussian y
        omega = dx*np.arange(N)
        if method == 'simps':
            self.yl = spectral_density(omega + a) * dx / np.pi
            self.yl = method_fft.get_fourier_integral_simps_weighted_values(self.yl)
            self.yl = np.sqrt(self.yl)
            # phase factor accounting for the lower integration boundary a
            self.omega_min_correction = np.exp(-1j*a*self.t) #self.t is from the parent class
        elif method == 'midp':
            # midpoint rule: sample at the interval centers
            self.yl = spectral_density(omega + a + dx/2) * dx / np.pi
            self.yl = np.sqrt(self.yl)
            self.omega_min_correction = np.exp(-1j*(a+dx/2)*self.t) #self.t is from the parent class
        else:
            raise ValueError("unknown method '{}'".format(method))

    def __getstate__(self):
        # everything needed to rebuild the process without re-running the
        # boundary/tolerance search above
        return self.yl, self.num_grid_points, self.omega_min_correction, self.t_max, self._seed, self._k

    def __setstate__(self, state):
        # restore precomputed weights and re-init the base class
        self.yl, num_grid_points, self.omega_min_correction, t_max, seed, k = state
        super().__init__(t_max = t_max,
                         num_grid_points = num_grid_points,
                         seed = seed,
                         k = k)

    def _calc_z(self, y):
        # one realization: FFT of the weighted Gaussian samples, truncated to
        # the time grid and corrected for the lower frequency boundary
        z = np.fft.fft(self.yl * y)[0:self.num_grid_points] * self.omega_min_correction
        return z

    def get_num_y(self):
        """Return the number of independent Gaussian random variables (frequency samples)."""
        return len(self.yl)

83
stocproc/tools.py Normal file
View file

@ -0,0 +1,83 @@
# -*- coding: utf8 -*-
from __future__ import print_function, division
from scipy.interpolate import InterpolatedUnivariateSpline
from scipy.integrate import quad
from .stocproc_c import auto_correlation as auto_correlation_c
import sys
import os
from warnings import warn
sys.path.append(os.path.dirname(__file__))
import numpy as np
from scipy.linalg import eigh as scipy_eigh
from collections import namedtuple
# cache-key layout identifying one stochastic-process configuration
stocproc_key_type = namedtuple(typename = 'stocproc_key_type',
                               field_names = ['bcf', 't_max', 'ng', 'tol', 'cubatur_type', 'sig_min', 'ng_fac'] )
class ComplexInterpolatedUnivariateSpline(object):
    """Spline interpolation for complex-valued samples.

    Real and imaginary parts are interpolated independently with
    scipy's InterpolatedUnivariateSpline of degree ``k``.
    """
    def __init__(self, x, y, k=2):
        """
        :param x: 1D array of strictly increasing sample positions
        :param y: complex sample values at ``x``
        :param k: spline degree (1 <= k <= 5)
        """
        # bug fix: ``k`` was accepted but never forwarded, so both splines
        # silently used scipy's default degree (3) regardless of the argument
        self.re_spline = InterpolatedUnivariateSpline(x, np.real(y), k=k)
        self.im_spline = InterpolatedUnivariateSpline(x, np.imag(y), k=k)

    def __call__(self, t):
        """Evaluate the complex interpolant at ``t`` (scalar or array)."""
        return self.re_spline(t) + 1j * self.im_spline(t)
def auto_correlation_numpy(x, verbose=1):
    """Deprecated pure-numpy auto-correlation; use 'auto_correlation' instead.

    :param x: 2D array (num_samples, num_time_points), one process per row
    :param verbose: print progress messages when > 0
    :return: tuple (<X(t) X^*(s)>, <X(t) X(s)>), each (num_time_points, num_time_points)
    """
    warn("use 'auto_correlation' instead", DeprecationWarning)

    if x.ndim != 2:
        raise TypeError('expected 2D numpy array, but {} given'.format(type(x)))

    num_samples, num_time_points = x.shape
    # broadcast samples against each other: axis 1 -> t, axis 2 -> s
    samples_t = x.reshape(num_samples, num_time_points, 1)
    samples_s = x.reshape(num_samples, 1, num_time_points)

    if verbose > 0:
        print("calculate auto correlation function ...")
    corr = np.mean(samples_t * np.conj(samples_s), axis = 0)
    corr_no_conj = np.mean(samples_t * samples_s, axis = 0)
    if verbose > 0:
        print("done!")
    return corr, corr_no_conj
def auto_correlation(x, verbose=1):
    r"""Computes the auto correlation function for a set of wide-sense stationary stochastic processes

    Computes the auto correlation function for the given set :math:`{X_i(t)}` of stochastic processes:

    .. math:: \alpha(s, t) = \langle X(t)X^\ast(s) \rangle

    For wide-sense stationary processes :math:`\alpha` is independent of :math:`s`.

    :param x: 2D array of the shape (num_samples, num_time_points) containing the set of stochastic processes where each row represents one process

    :return: 2D array containing the correlation function as function of :math:`s, t`
    """
    if x.ndim != 2:
        raise TypeError('expected 2D numpy array, but {} given'.format(type(x)))

    if verbose > 0:
        print("calculate auto correlation function ...")
    # delegate the quadratic-cost accumulation to the cython implementation
    result = auto_correlation_c(x)
    if verbose > 0:
        print("done!")
    return result
def auto_correlation_zero(x, s_0_idx = 0):
    """Auto correlation with respect to a single reference time index.

    :param x: 2D array (num_samples, num_time_points), one process per row
    :param s_0_idx: column index of the reference time s_0
    :return: tuple (<X(t) X^*(s_0)>, <X(t) X(s_0)>), each of shape (num_time_points,)
    """
    if x.ndim != 2:
        raise TypeError('expected 2D numpy array, but {} given'.format(type(x)))
    # reference column, kept 2D for broadcasting against x
    ref = x[:, s_0_idx, None]
    return np.mean(x * np.conj(ref), axis = 0), np.mean(x * ref, axis = 0)

View file

@ -1,140 +0,0 @@
from scipy.integrate import quad
import numpy as np
import numpy.polynomial as pln
import pytest
import os
import sys
import pathlib
p = pathlib.PosixPath(os.path.abspath(__file__))
sys.path.insert(0, str(p.parent.parent))
from stocproc import gquad
def scp_laguerre(p1,p2):
    """Scalar product <p1, p2> with Laguerre weight exp(-x) on [0, inf).

    Returns the (value, abserr) tuple produced by scipy.integrate.quad.
    """
    integrand = lambda x: p1(x) * p2(x) * np.exp(-x)
    return quad(integrand, 0, np.inf)
def scp_legendre(p1,p2):
    """Scalar product <p1, p2> with unit weight on [-1, 1].

    Returns the (value, abserr) tuple produced by scipy.integrate.quad.
    """
    integrand = lambda x: p1(x) * p2(x)
    return quad(integrand, -1, 1)
def orthogonality(p, scp, tol_0, tol_1):
    """Normalize the polynomials in-place w.r.t. scp, then verify pairwise orthonormality.

    :param p: list of callables supporting scalar division; modified in place
    :param scp: scalar product; returns a tuple whose first entry is the value
    :param tol_0: allowed deviation from 0 for off-diagonal products
    :param tol_1: allowed deviation from 1 for diagonal products
    """
    n = len(p)
    # normalize each polynomial to unit norm
    for i in range(n):
        val = scp(p[i], p[i])
        p[i] /= np.sqrt(val[0])
    # verify <p_i|p_j> == delta_ij within the given tolerances
    for i in range(n):
        for j in range(i, n):
            val = scp(p[i], p[j])
            print("test <p_{}|p_{}> = {:+.2e}".format(i,j,val[0]))
            if i == j:
                assert abs(val[0]-1) < tol_1, "error: {}".format(abs(val[0]-1))
            else:
                assert abs(val[0]) < tol_0, "error: {}".format(abs(val[0]))
def test_orthogonality_laguerre():
    """Generalized-Laguerre recurrence coefficients must yield an orthonormal basis."""
    degree = 12
    alpha = 0
    rec_a, rec_b = gquad._recur_laguerre(degree, alpha)
    polys = gquad.get_poly(rec_a, rec_b)
    orthogonality(polys, scp=scp_laguerre, tol_0=1e-10, tol_1=1e-10)
def test_orthogonality_legendre():
    """Legendre recurrence coefficients must yield an orthonormal basis."""
    degree = 12
    rec_a, rec_b = gquad._recur_legendre(degree)
    polys = gquad.get_poly(rec_a, rec_b)
    orthogonality(polys, scp=scp_legendre, tol_0=1e-10, tol_1=1e-10)
# due to the lack of python 3 compatible orthpol package
@pytest.mark.xfail
def test_compare_with_orthpol():
    """Compare gquad recurrence coefficients against the orthpol reference."""
    # NOTE(review): the module `op` (orthpol) is never imported in this file,
    # so the test currently fails with a NameError -- covered by the xfail
    # marker above until a python-3 compatible orthpol is available.
    n = 50
    ipoly = 7 # Laguerre
    al = 0
    be = 0 # not used
    a_op, b_op, ierr = op.drecur(n, ipoly, al, be)
    a, b = gquad._recur_laguerre(n, al)
    assert np.allclose(a, a_op)
    # note: the recur coef b[0] has no influence on the recursion formula,
    # because it is multiplied by the polynomial of index -1 which is defined to be zero
    # furthermore this coef does not occur when calculating the nodes and weights
    assert np.allclose(b[1:], b_op[1:])
    # repeat with a non-trivial alpha
    al = 1.2
    a_op, b_op, ierr = op.drecur(n, ipoly, al, be)
    a, b = gquad._recur_laguerre(n, al)
    assert np.allclose(a, a_op)
    assert np.allclose(b[1:], b_op[1:])
def test_integration_legendre():
    """Gauss-Legendre nodes/weights must integrate polynomials of degree 2n-1 exactly."""
    n = 12
    np.random.seed(0)
    num_samples = 10
    for _ in range(num_samples):
        low = np.random.rand()
        high = np.random.rand()
        x, w = gquad.gauss_nodes_weights_legendre(n, low, high)
        coeff = 10*np.random.randn(2*n-1)
        p = pln.Polynomial(coef=coeff)
        # sanity check: Polynomial evaluation agrees with the explicit power sum
        a = 0.5
        p_direct = 0
        for i, c in enumerate(coeff):
            p_direct += c * a**i
        assert abs(p(a) - p_direct) < 1e-14, "error: {:.2e}".format(abs(p(a) - p_direct))
        # exact antiderivative vs. Gauss quadrature
        exact = p.integ(m=1, lbnd=low)(high)
        approx = np.sum(w*p(x))
        diff = abs(exact - approx)
        print("diff: {:.2e}".format(diff))
        assert diff < 1e-14
def test_compare_with_scipy_laguerre():
    """gquad Laguerre nodes/weights must match numpy.polynomial.laguerre.laggauss."""
    alpha = 0
    for n in [3,7,11,20,52,100]:
        x, w = gquad.gauss_nodes_weights_laguerre(n, alpha)
        x_ref, w_ref = pln.laguerre.laggauss(deg=n)
        dx = np.abs(x - x_ref)
        dw = np.abs(w - w_ref)
        print("degree:", n)
        print("max diff x: {:.2e}".format(max(dx)))
        print("max diff w: {:.2e}".format(max(dw)))
        assert max(dx) < 1e-12
        assert max(dw) < 1e-12
def test_compare_with_scipy_legendre():
    """gquad Legendre nodes/weights must match numpy.polynomial.legendre.leggauss."""
    for n in [3,7,11,20,52,100,200,500]:
        x, w = gquad.gauss_nodes_weights_legendre(n)
        x_ref, w_ref = pln.legendre.leggauss(deg=n)
        dx = np.abs(x - x_ref)
        dw = np.abs(w - w_ref)
        print("degree:", n)
        print("max diff x: {:.2e}".format(max(dx)))
        print("max diff w: {:.2e}".format(max(dw)))
        assert max(dx) < 1e-12
        assert max(dw) < 1e-12
if __name__ == "__main__":
    # manual entry point: only the Legendre/scipy comparison runs by default;
    # enable the other checks as needed
    # test_orthogonality_laguerre()
    # test_orthogonality_legendre()
    # test_integration_legendre()
    # test_compare_with_scipy_laguerre()
    test_compare_with_scipy_legendre()

View file

@ -5,8 +5,11 @@ import numpy as np
import math
from scipy.special import gamma as gamma_func
import scipy.integrate as sp_int
import matplotlib.pyplot as plt
from math import fsum
try:
import matplotlib.pyplot as plt
except ImportError:
print("matplotlib not found -> any plotting will crash")
import pathlib
p = pathlib.PosixPath(os.path.abspath(__file__))
@ -14,6 +17,9 @@ sys.path.insert(0, str(p.parent.parent))
import stocproc as sp
import logging
def test_find_integral_boundary():
def f(x):
return np.exp(-(x)**2)
@ -66,7 +72,9 @@ def test_find_integral_boundary():
def fourier_integral_trapz(integrand, a, b, N):
"""
approximates int_a^b dx integrand(x) by the riemann sum with N terms
this function is here and not in method_fft because it has almost no
advantage over the modpoint method. so only for testing purposes.
"""
yl = integrand(np.linspace(a, b, N+1, endpoint=True))
yl[0] = yl[0]/2
@ -83,8 +91,7 @@ def fourier_integral_trapz(integrand, a, b, N):
def fourier_integral_simple_test(integrand, a, b, N):
delta_x = (b-a)/N
delta_k = 2*np.pi/(b-a)
#x = np.arange(N)*delta_x+a
x = np.linspace(a, b, N, endpoint = False) + delta_x/2
k = np.arange(N//2+1)*delta_k
@ -106,7 +113,6 @@ def fourier_integral_trapz_simple_test(integrand, a, b, N):
delta_x = (b-a)/N
delta_k = 2*np.pi*N/(b-a)/(N+1)
#x = np.arange(N)*delta_x+a
x = np.linspace(a, b, N+1, endpoint = True)
k = np.arange((N+1)//2+1)*delta_k
@ -184,7 +190,7 @@ def test_fourier_integral_finite_boundary():
idx = np.where(np.logical_and(tau < 75, np.isfinite(rd)))
tau = tau[idx]
rd = rd[idx]
plt.plot(tau, rd, label='trapz N:{}'.format(N))
# plt.plot(tau, rd, label='trapz N:{}'.format(N))
mrd_trapz = np.max(rd)
N = 513
@ -194,7 +200,7 @@ def test_fourier_integral_finite_boundary():
idx = np.where(np.logical_and(tau < 75, np.isfinite(rd)))
tau = tau[idx]
rd = rd[idx]
plt.plot(tau, rd, label='simps N:{}'.format(N))
# plt.plot(tau, rd, label='simps N:{}'.format(N))
mrd_simps = np.max(rd)
assert mrd_simps < mrd_trapz, "mrd_simps ({:.3e}) >= mrd_trapz ({:.3e})".format(mrd_simps, mrd_trapz)
@ -234,10 +240,10 @@ def test_fourier_integral_infinite_boundary():
# sys.exit()
a,b = sp.method_fft.find_integral_boundary_auto(integrand=intg, tol=1e-12, ref_val=1)
print(a,b)
N = 2**18
for N in [2**16, 2**18, 2**20]:
errs = [8e-5, 1e-5, 1.3e-6]
for i, N in enumerate([2**16, 2**18, 2**20]):
tau, bcf_n = sp.method_fft.fourier_integral_midpoint(intg, a, b, N=N)
bcf_ref_n = bcf_ref(tau)
@ -248,8 +254,8 @@ def test_fourier_integral_infinite_boundary():
bcf_n = bcf_n[idx]
bcf_ref_n = bcf_ref_n[idx]
rd_trapz = np.abs(bcf_ref_n-bcf_n)/np.abs(bcf_ref_n)
p, = plt.plot(tau, rd_trapz, label="trapz N {}".format(N))
rd_mp = np.abs(bcf_ref_n-bcf_n)/np.abs(bcf_ref_n)
# p, = plt.plot(tau, rd_mp, label="trapz N {}".format(N))
tau, bcf_n = sp.method_fft.fourier_integral_simps(intg, a, b=b, N=N-1)
@ -261,23 +267,22 @@ def test_fourier_integral_infinite_boundary():
bcf_n = bcf_n[idx]
bcf_ref_n = bcf_ref_n[idx]
rd = np.abs(bcf_ref_n-bcf_n)/np.abs(bcf_ref_n)
plt.plot(tau, rd, label="simps N {}".format(N), color=p.get_color(), ls='--')
rd_sm = np.abs(bcf_ref_n-bcf_n)/np.abs(bcf_ref_n)
# plt.plot(tau, rd_sm, label="simps N {}".format(N), color=p.get_color(), ls='--')
t_ = 3
print(a,b)
x_simps, dx = np.linspace(a,b,N-1, endpoint=True, retstep=True)
I = sp_int.simps(intg(x_simps)*np.exp(-1j*x_simps*t_), dx=dx)
err = np.abs(I-bcf_ref(t_))/np.abs(bcf_ref(t_))
plt.plot(t_, err, marker='o', color='g')
assert np.max(rd_trapz) < 5*1e-4, "max rd_trapz = {:.3e}".format(np.max(rd_trapz))
# plt.legend(loc='lower right')
# plt.grid()
# plt.yscale('log')
# plt.show()
assert np.max(rd_mp) < errs[i]
assert np.max(rd_sm) < errs[i]
# plt.plot(t_, err, marker='o', color='g')
# plt.legend(loc='lower right')
# plt.grid()
# plt.yscale('log')
# plt.show()
def test_get_N_for_accurate_fourier_integral():
s = 0.5
@ -285,8 +290,8 @@ def test_get_N_for_accurate_fourier_integral():
intg = lambda x: osd(x, s, wc)
bcf_ref = lambda t: gamma_func(s + 1) * wc**(s+1) * (1 + 1j*wc * t)**(-(s+1))
a,b = sp.method_fft.find_integral_boundary_auto(integrand=intg, tol=1e-12, ref_val=1)
N = sp.method_fft.get_N_for_accurate_fourier_integral(intg, a, b, t_max=40, tol=1e-3, ft_ref=bcf_ref, N_max = 2**15, method='simps')
a,b = sp.method_fft.find_integral_boundary_auto(integrand=intg, tol=1e-12, ref_val=1)
N = sp.method_fft.get_N_for_accurate_fourier_integral(intg, a, b, t_max=40, tol=1e-3, ft_ref=bcf_ref, N_max = 2**20, method='simps')
print(N)
def test_get_dt_for_accurate_interpolation():
@ -298,7 +303,7 @@ def test_get_dt_for_accurate_interpolation():
print(dt)
def test_sclicing():
yl = np.ones(10)
yl = np.ones(10, dtype=int)
yl = sp.method_fft.get_fourier_integral_simps_weighted_values(yl)
assert yl[0] == 2/6
assert yl[1] == 8/6
@ -319,8 +324,7 @@ def test_calc_abN():
tol = 1e-3
tmax=40
method='simps'
a,b = sp.method_fft.find_integral_boundary_auto(integrand=intg, tol=1e-12, ref_val=1)
ab, N, dx, dt = sp.method_fft.calc_ab_N_dx_dt(integrand = intg,
intgr_tol = tol,
@ -334,8 +338,9 @@ def test_calc_abN():
if __name__ == "__main__":
test_find_integral_boundary()
test_fourier_integral_finite_boundary()
logging.basicConfig(level=logging.INFO)
# test_find_integral_boundary()
# test_fourier_integral_finite_boundary()
test_fourier_integral_infinite_boundary()
test_get_N_for_accurate_fourier_integral()
test_get_dt_for_accurate_interpolation()

File diff suppressed because it is too large Load diff