master-thesis/python/energy_flow_proper/hopsflow/util.py

234 lines
6.7 KiB
Python
Raw Normal View History

2021-11-04 15:33:08 +01:00
"""Utilities for the energy flow calculation."""
import itertools
import functools
import multiprocessing
2021-11-04 15:33:08 +01:00
import numpy as np
2021-11-11 16:09:37 +01:00
import scipy
2021-11-25 20:14:52 +01:00
from typing import Iterator, Optional, Any, Callable, Union
from lmfit import minimize, Parameters
from numpy.polynomial import Polynomial
2021-11-04 15:33:08 +01:00
def apply_operator(ψ: np.ndarray, op: np.ndarray) -> np.ndarray:
"""
Applies the operator ``op`` to each element of the time series
ψ of the dimensions ``(*, dim)`` where ``dim`` is the hilbert
space dimension.
"""
return np.array((op @ ψ.T).T)
def sandwhich_operator(
ψ: np.ndarray, op: np.ndarray, normalize: bool = False
) -> np.ndarray:
"""
Applies the operator ``op`` to each element of the time
series ψ of the dimensions ``(*, dim)`` where ``dim`` is the
hilbert space dimension and sandwiches ``ψ`` onto it from the
2021-11-25 20:14:52 +01:00
left. If ``normalize`` is :any:`True` then the value will be
divided by the squared norm.
"""
exp_val = np.sum(ψ.conj() * apply_operator(ψ, op), axis=1)
if normalize:
exp_val /= np.sum(ψ.conj() * ψ, axis=1).real
return exp_val
2021-11-11 16:09:37 +01:00
def operator_expectation(ρ: np.ndarray, op: np.ndarray) -> np.ndarray:
"""Calculates the expecation value of ``op`` as a time series.
:param ρ: The state as time series. ``(time, dim-sys, dim-sys)``
:param op: The operator.
:returns: the expectation value
"""
return np.einsum("ijk,kj", ρ, op).real
def operator_expectation_ensemble(
ψs: Iterator[np.ndarray], op: np.ndarray, N: Optional[int], normalize: bool = False
) -> np.ndarray:
"""Calculates the expecation value of ``op`` as a time series.
:param ψs: A collection of stochastic trajectories. Each
element should have the shape ``(time, dim-sys)``.
:param op: The operator.
:param N: Number of samples to take.
:returns: the expectation value
"""
return ensemble_mean(ψs, sandwhich_operator, N, const_args=(op, normalize))
2021-11-04 15:33:08 +01:00
def mulitply_hierarchy(left: np.ndarray, right: np.ndarray) -> np.ndarray:
"""Multiply each hierarchy member with a member of ``left`` for each time step.
:param left: array of shape ``(hierarchy-width,)``
:param right: array of shape ``(time-steps, hierarchy-width, system-dimension)``
"""
return left[None, :, None] * right
def dot_with_hierarchy(left: np.ndarray, right: np.ndarray) -> np.ndarray:
2021-11-25 20:14:52 +01:00
r"""Calculates :math:`\sum_k \langle\mathrm{left} | \mathrm{right}^{(e_k)}\rangle` for
2021-11-04 15:33:08 +01:00
each time step.
:param left: array of shape ``(time-steps, system-dimension, hierarchy-width,)``
:param right: array of shape ``(time-steps, hierarchy-width, system-dimension)``
"""
2021-11-11 16:09:37 +01:00
return np.sum(left[:, None, :] * right, axis=(1, 2))
def α_apprx(τ: np.ndarray, G: np.ndarray, W: np.ndarray) -> np.ndarray:
r"""
Calculate exponential expansion $\sum_i G_i \exp(W_i * τ)$ of the
BCF along ``τ``.
:param τ: the time
:param G: pefactors
:param W: exponents
:returns: the exponential expansion evaluated at ``τ``
"""
return np.sum(
G[np.newaxis, :] * np.exp(-W[np.newaxis, :] * (τ[:, np.newaxis])), axis=1
)
def integrate_array(arr: np.ndarray, t: np.ndarray) -> np.ndarray:
"""
Calculates the antiderivative of the function sampled in ``arr``
along ``t``.
"""
2021-11-15 13:34:48 +01:00
return scipy.integrate.cumulative_trapezoid(arr, t, initial=0)
###############################################################################
# Ensemble Mean #
###############################################################################
2021-11-25 20:14:52 +01:00
_ENSEMBLE_MEAN_ARGS: tuple = tuple()
_ENSEMBLE_MEAN_KWARGS: dict = dict()
def _ENSEMBLE_FUNC(_, *args, **kwargs):
return _
def _ensemble_mean_call(arg) -> np.ndarray:
global _ENSEMBLE_MEAN_ARGS
global _ENSEMBLE_MEAN_KWARGS
return _ENSEMBLE_FUNC(arg, *_ENSEMBLE_MEAN_ARGS, **_ENSEMBLE_MEAN_KWARGS)
2021-11-25 20:14:52 +01:00
def _ensemble_mean_init(func: Callable, args: tuple, kwargs: dict):
global _ENSEMBLE_FUNC
global _ENSEMBLE_MEAN_ARGS
global _ENSEMBLE_MEAN_KWARGS
_ENSEMBLE_FUNC = func
_ENSEMBLE_MEAN_ARGS = args
_ENSEMBLE_MEAN_KWARGS = kwargs
# TODO: Use paramspec
def ensemble_mean(
arg_iter: Iterator[Any],
function: Callable[..., np.ndarray],
N: Optional[int] = None,
2021-11-25 20:14:52 +01:00
const_args: tuple = tuple(),
const_kwargs: dict = dict(),
n_proc: Optional[int] = None,
):
result = function(next(arg_iter), *const_args)
if not n_proc:
n_proc = multiprocessing.cpu_count()
with multiprocessing.Pool(
processes=n_proc,
initializer=_ensemble_mean_init,
initargs=(function, const_args, const_kwargs),
) as pool:
result_iter = pool.imap_unordered(
_ensemble_mean_call,
itertools.islice(arg_iter, None, N - 1 if N else None),
2021-11-15 13:34:48 +01:00
1,
)
n = 1
for res in result_iter:
result += res
n += 1
return result / n
2021-11-25 20:14:52 +01:00
def fit_α(
α: Callable[[np.ndarray], np.ndarray],
n: int,
t_max: float,
support_points: Union[int, np.ndarray] = 1000,
) -> tuple[np.ndarray, np.ndarray]:
"""
Fit the BCF ``α`` to a sum of ``n`` exponentials up to
``t_max`` using a number of ``support_points``.
"""
def residual(fit_params, x, data):
resid = 0
w = np.array([fit_params[f"w{i}"] for i in range(n)]) + 1j * np.array(
[fit_params[f"wi{i}"] for i in range(n)]
)
g = np.array([fit_params[f"g{i}"] for i in range(n)]) + 1j * np.array(
[fit_params[f"gi{i}"] for i in range(n)]
)
resid = data - α_apprx(x, g, w)
return resid.view(float)
fit_params = Parameters()
for i in range(n):
fit_params.add(f"g{i}", value=0.1)
fit_params.add(f"gi{i}", value=0.1)
fit_params.add(f"w{i}", value=0.1)
fit_params.add(f"wi{i}", value=0.1)
ts = np.asarray(support_points)
if ts.size < 2:
ts = np.linspace(0, t_max, support_points)
out = minimize(residual, fit_params, args=(ts, α(ts)))
w = np.array([out.params[f"w{i}"] for i in range(n)]) + 1j * np.array(
[out.params[f"wi{i}"] for i in range(n)]
)
g = np.array([out.params[f"g{i}"] for i in range(n)]) + 1j * np.array(
[out.params[f"gi{i}"] for i in range(n)]
)
return w, g
def except_element(array: np.ndarray, index: int) -> np.ndarray:
"""Returns the ``array`` except the element with ``index``."""
mask = [i != index for i in range(array.size)]
return array[mask]
def poly_real(p: Polynomial) -> Polynomial:
"""Return the real part of ``p``."""
new = p.copy()
new.coef = p.coef.real
return new