"""Utilities for the energy flow calculation.""" from __future__ import annotations import itertools import multiprocessing import numpy as np import scipy import scipy.integrate import scipy.optimize from typing import Iterator, Optional, Any, Callable, Union from lmfit import minimize, Parameters from numpy.polynomial import Polynomial from tqdm import tqdm from pathlib import Path import sys import shutil import hashlib import logging import json from functools import singledispatch, singledispatchmethod import scipy.interpolate import copy import ray import numbers import matplotlib.pyplot as plt from hops.util.dynamic_matrix import DynamicMatrix, ConstantMatrix import opt_einsum as oe import gc import math import time from hops.core.hierarchy_data import HIData Aggregate = tuple[int, np.ndarray, np.ndarray] EnsembleReturn = Union[Aggregate, list[Aggregate]] class EnsembleValue: def __init__( self, value: Union[Aggregate, list[Aggregate], tuple[np.ndarray, np.ndarray]] ): if ( isinstance(value, tuple) and len(value) == 2 and isinstance(value[0], np.ndarray) and isinstance(value[1], np.ndarray) ): self._value: list[Aggregate] = [[0, *value]] # type:ignore else: self._value: list[Aggregate] = ( # type:ignore value if (isinstance(value, list) or isinstance(value, np.ndarray)) else [value] ) @property def final_aggregate(self): return self._value[-1] @property def N(self): return self.final_aggregate[0] @property def value(self): return self.final_aggregate[1] @property def σ(self): return self.final_aggregate[2] @property def Ns(self): return [N for N, _, _ in self._value] @property def values(self): return [val for _, val, _ in self._value] @property def σs(self): return [σ for _, _, σ in self._value] @property def aggregate_iterator(self): for agg in self._value: yield agg @property def ensemble_value_iterator(self): for agg in self._value: yield EnsembleValue(agg) @property def mean(self): N, val, σ = self.final_aggregate return EnsembleValue( [(N, val.mean().copy(), np.sqrt((σ.copy() ** 2).sum() / val.size ** 2))] ) @property def max(self): N, val, σ = self.final_aggregate max_index = np.argmax(val) return EnsembleValue([(N, val[max_index].copy(), σ[max_index].copy())]) @property def min(self): N, val, σ = self.final_aggregate min_index = np.argmin(val) return EnsembleValue([(N, val[min_index].copy(), σ[min_index].copy())]) def __getitem__(self, index): return EnsembleValue(self._value[index]) def slice(self, slc: Union[np.ndarray, slice]): results = [] for N, val, σ in self.aggregate_iterator: results.append((N, val[slc], σ[slc])) return EnsembleValue(results) def __len__(self) -> int: return len(self._value) def for_bath(self, bath: int): if self.num_baths == 1 and len(self.value.shape) in [0, 1]: return self return EnsembleValue([(N, val[bath], σ[bath]) for N, val, σ in self._value]) @property def num_baths(self) -> int: shape = self.value.shape return self.value.shape[0] if len(shape) > 1 else 1 def sum_baths(self) -> EnsembleValue: final = self.for_bath(0) for i in range(1, self.num_baths): final = final + self.for_bath(i) return final def insert(self, value: Aggregate): where = len(self._value) for i, (N, _, _) in enumerate(self._value): if N > value[0]: where = i break self._value.insert(where, value) def insert_multi(self, values: list[Aggregate]): for value in values: self.insert(value) def consistency(self, other: Union[EnsembleValue, np.ndarray]) -> float: diff = abs( self[-1] - (other[-1] if isinstance(other, self.__class__) else other) ) diff_val = diff.for_bath(0).value return ( (diff.value < diff.σ).sum() / (len(diff_val) if hasattr(diff_val, "__len__") else 1) * 100 ) def integrate(self, τ: np.ndarray) -> EnsembleValue: """Calculate the antiderivative along a 'time axis' ``τ``.""" results = [] for N, val, σ in self.aggregate_iterator: results.append((N, *integrate_array(val, τ, σ))) return EnsembleValue(results) def __abs__(self) -> "EnsembleValue": out = [] for N, value, σ in self._value: out.append((N, abs(value), σ)) return EnsembleValue(out) def __add__( self, other: Union["EnsembleValue", float, int, np.ndarray] ) -> EnsembleValue: if type(self) == type(other): if len(self) != len(other): raise RuntimeError("Can only add values of equal length.") left = self._value right = other._value out = [] for left_i, right_i in zip(left, right): if left_i[0] < right_i[0]: samples = left_i[0] σ = np.sqrt( left_i[2] ** 2 + right_i[2] ** 2 * (right_i[0] - 1) / (left_i[0] - 1) ).real else: samples = right_i[0] σ = np.sqrt( left_i[2] ** 2 * (left_i[0] - 1) / (right_i[0] - 1) + right_i[2] ** 2 ).real out.append( ( samples, left_i[1] + right_i[1], σ, ) ) return EnsembleValue(out) if isinstance(other, tuple): new = copy.deepcopy(self) new.insert(other) return new if isinstance(other, list) and isinstance(other[0], tuple): new = copy.deepcopy(self) new.insert_multi(other) return new if isinstance(other, numbers.Number) or isinstance(other, np.ndarray): out = [] for N, value, σ in self.aggregate_iterator: out.append((N, value + other, σ)) return EnsembleValue(out) return NotImplemented __radd__ = __add__ def __mul__(self, other: Union["EnsembleValue", float, int, np.ndarray]): if ( isinstance(other, float) or isinstance(other, int) or isinstance(other, np.ndarray) ): return EnsembleValue( [(N, val * other, np.abs(σ * other)) for N, val, σ in self._value] ) if type(self) == type(other): if len(self) != len(other): raise RuntimeError("Can only multiply values of equal length.") left = self._value right = other._value out = [] for left_i, right_i in zip(left, right): if left_i[0] != right_i[0]: raise RuntimeError("Can only multiply equal sample counts.") out.append( ( left_i[0], left_i[1] * right_i[1], np.sqrt( (right_i[1] * left_i[2]) ** 2 + (left_i[1] * right_i[2]) ** 2 ).real, ) ) return EnsembleValue(out) return NotImplemented __rmul__ = __mul__ def __sub__( self, other: Union["EnsembleValue", float, int, np.ndarray] ) -> "EnsembleValue": if ( type(self) == type(other) or isinstance(other, float) or isinstance(other, int) or isinstance(other, np.ndarray) ): return self + (-1 * other) return NotImplemented def __rsub__(self, other: Union["EnsembleValue", float, int]) -> "EnsembleValue": if ( type(self) == type(other) or isinstance(other, float) or isinstance(other, int) ): return (self * -1) + other return NotImplemented def __repr__(self) -> str: return f"{self.__class__.__name__}({self._value})" def ensemble_return_scale(left: float, right: EnsembleReturn) -> EnsembleReturn: """Scales ``right`` by ``left``.""" single_return = False if not isinstance(right, list): single_return = True right = [right] result = [(N, left * val, np.abs(left * σ)) for N, val, σ in right] return result[0] if single_return else result def ensemble_return_add(left: EnsembleReturn, right: EnsembleReturn) -> EnsembleReturn: """ Adds the values of ``left`` and ``right``. The standard deviations are calculated correctly by adding the variances. """ single_return = False if not isinstance(left, list): assert not isinstance( right, list ), "Both ensemble returns have to be of the same shape" single_return = True left = [left] right = [right] assert isinstance(right, list) out = [] for left_i, right_i in zip(left, right): if left_i[0] != right_i[0]: raise RuntimeError("Can only add equal sample counts.") out.append( ( left_i[0], left_i[1] + right_i[1], np.sqrt(left_i[2] ** 2 + right_i[2] ** 2).real, ) ) return out[0] if single_return else out class BCF: r"""A parameter object to hold information about a BCF. The BCFs will be expanded into a sum of exponentials like :math:`\alpha(\tau) = \sum_k G_k \cdot \exp(-W_k\cdot\tau)`. You can either give the BCFs as parameter or the coefficients. If you give the BCFs, the fit will be performed automatically. Calling this object will call the wrapped BCF function. :param resolution: the precision in the sampling for the fit, ``t_max/precision`` points will be used :param num_terms: the number of terms of the expansion of the BCF expansion """ def __init__( self, t_max: float, function: Optional[Callable[[np.ndarray], np.ndarray]] = None, num_terms: Optional[int] = None, resolution: Optional[float] = None, factors: Optional[np.ndarray] = None, exponents: Optional[np.ndarray] = None, ): #: the maximum simulation time self.t_max = t_max if function is not None: #: the BCF as python function, will be set to the exponential #: expansion if the BCF coefficients are given. self.function = function if num_terms is None or resolution is None: raise ValueError( "Either give the function, the number of terms and the resolution or the coefficients." ) _exponents, _factors = fit_α( self.function, num_terms, self.t_max, int(self.t_max / resolution), ) #: the factors in the BCF expansion self.factors = _factors #: the exponents in the BCF expansion self.exponents = _exponents else: if factors is None or exponents is None: raise ValueError( "Either give the function and number of terms or the coefficients." ) assert factors is not None assert exponents is not None self.factors = factors self.exponents = exponents if self.factors.size != self.exponents.size: raise ValueError( "Factors and exponents have to have the same dimension." ) self.function = self.approx def approx(self, t: np.ndarray) -> np.ndarray: """The BCF as exponential expansion.""" return α_apprx(t, self.factors, self.exponents) def __call__(self, t: np.ndarray): return self.function(t) def apply_operator(ψ: np.ndarray, op: np.ndarray) -> np.ndarray: """ Applies the operator ``op`` to each element of the time series ψ of the dimensions ``(*, dim)`` where ``dim`` is the hilbert space dimension. """ return np.array((op @ ψ.T).T) def sandwhich_operator( ψ: np.ndarray, op: np.ndarray, normalize: bool = False, real: bool = False, ) -> np.ndarray: """ Applies the operator ``op`` to each element of the time series ``ψ`` of the dimensions ``(*, dim)`` where ``dim`` is the hilbert space dimension and sandwiches ``ψ`` onto it from the left. If ``normalize`` is :any:`True` then the value will be divided by the squared norm. If ``real`` is :any:`True`, the real part is returned. """ exp_val = np.sum(ψ.conj() * apply_operator(ψ, op), axis=1) if normalize: exp_val /= np.sum(ψ.conj() * ψ, axis=1).real if real: exp_val = np.real(exp_val) return exp_val def operator_expectation( ρ: np.ndarray, op: np.ndarray, real: bool = False ) -> np.ndarray: """Calculates the expecation value of ``op`` as a time series. :param ρ: The state as time series. ``(time, dim-sys, dim-sys)`` :param op: The operator. :param real: Whether to take the real part. :returns: the expectation value """ expect = np.einsum("ijk,kj", ρ, op) if real: expect = np.real(expect) return expect def operator_expectation_ensemble( ψs: Iterator[np.ndarray], op: Union[np.ndarray, DynamicMatrix], t: np.ndarray, normalize: bool = False, real: bool = False, **kwargs, ) -> EnsembleValue: """Calculates the expecation value of ``op`` as a time series. :param ψs: A collection of stochastic trajectories. Each element should have the shape ``(time, dim-sys)``. :param op: The operator. :param N: Number of samples to take. :param real: Whether to take the real part. All the other kwargs are passed on to :any:`ensemble_mean`. :returns: the expectation value """ if isinstance(op, ConstantMatrix): op = op(0) if isinstance(op, DynamicMatrix): calc_sandwhich = oe.contract_expression( "ti,tij,tj->t", (len(t), op.shape[0]), op(t), (len(t), op.shape[0]), constants=[1], ) else: calc_sandwhich = oe.contract_expression( "ti,ij,tj->t", (len(t), op.shape[0]), op, (len(t), op.shape[0]), constants=[1], ) def op_exp_task(ψ: np.ndarray): sandwhiches: np.ndarray = calc_sandwhich(ψ.conj(), ψ) # type: ignore if normalize: sandwhiches /= np.sum(ψ.conj() * ψ, axis=1).real if real: sandwhiches = sandwhiches.real return sandwhiches return ensemble_mean(ψs, op_exp_task, **kwargs) def mulitply_hierarchy(left: np.ndarray, right: np.ndarray) -> np.ndarray: """Multiply each hierarchy member with a member of ``left`` for each time step. :param left: array of shape ``(hierarchy-width,)`` :param right: array of shape ``(time-steps, hierarchy-width, system-dimension)`` """ return left[None, :, None] * right def dot_with_hierarchy(left: np.ndarray, right: np.ndarray) -> np.ndarray: r"""Calculates :math:`\sum_k \langle\mathrm{left} | \mathrm{right}^{(e_k)}\rangle` for each time step. :param left: array of shape ``(time-steps, system-dimension, hierarchy-width,)`` :param right: array of shape ``(time-steps, hierarchy-width, system-dimension)`` """ return np.sum(left[:, None, :] * right, axis=(1, 2)) def α_apprx(τ: np.ndarray, G: np.ndarray, W: np.ndarray) -> np.ndarray: r""" Calculate exponential expansion $\sum_i G_i \exp(W_i * τ)$ of the BCF along ``τ``. :param τ: the time :param G: pefactors :param W: exponents :returns: the exponential expansion evaluated at ``τ`` """ return np.sum( G[np.newaxis, :] * np.exp(-W[np.newaxis, :] * (τ[:, np.newaxis])), axis=1, ) def integrate_array( arr: np.ndarray, t: np.ndarray, err: Optional[np.ndarray] ) -> Union[np.ndarray, tuple[np.ndarray, np.ndarray]]: """ Calculates the antiderivative of the function sampled in ``arr`` along ``t`` using spline interpolation. Optionally the error ``err`` is being integrated alongside. """ multiple_baths = len(arr.shape) > 1 if not multiple_baths: arr = arr[None, ...] if err is not None: err = err[None, ...] splines = [scipy.interpolate.UnivariateSpline(t, y, s=0, k=5) for y in arr] integral = np.array([spline.antiderivative()(t) for spline in splines]) if err is not None: dt = t[1:] - t[:-1] err_sum = [ np.concatenate(([0], np.cumsum(((e[1:] ** 2 + e[:-1] ** 2) / 4) * dt ** 2))) for e in err ] err_integral = np.sqrt(err_sum).real if multiple_baths: return integral, err_integral return integral[0], err_integral[0] if multiple_baths: return integral return integral[0] ############################################################################### # Ensemble Mean # ############################################################################### class WelfordAggregator: __slots__ = ["n", "mean", "_m_2"] def __init__(self, first_value: np.ndarray): self.n = 1 self.mean = first_value self._m_2 = np.zeros_like(first_value) def update(self, new_value: np.ndarray): self.n += 1 delta = new_value - self.mean self.mean += delta / self.n delta2 = new_value - self.mean self._m_2 += np.abs(delta) * np.abs(delta2) @property def sample_variance(self) -> np.ndarray: return self._m_2 / (self.n - 1) @property def ensemble_variance(self) -> np.ndarray: return self.sample_variance / self.n @property def ensemble_std(self) -> np.ndarray: return np.sqrt(self.ensemble_variance) class JSONEncoder(json.JSONEncoder): """ A custom encoder to serialize objects occuring in :any:`ensemble_mean`. """ @singledispatchmethod def default(self, obj: Any): if hasattr(obj, "__bfkey__"): return f"<{type(obj)} ({obj.__bfkey__()})>" return obj.__dict__ if hasattr(obj, "__dict__") else "" @default.register def _(self, arr: np.ndarray): return {"type": "array", "value": arr.tolist()} @default.register def _(self, integer: np.int64): return int(integer) @default.register def _(self, obj: complex): return {"type": "complex", "re": obj.real, "im": obj.imag} def _object_hook(dct: dict[str, Any]): """A custom decoder for the types introduced in :any:`JSONEncoder`.""" if "type" in dct: type = dct["type"] if type == "array": return np.array(dct["value"]) if type == "complex": return dct["re"] + 1j * dct["im"] return dct def _grouper(n: int, iterable: Iterator[Any]): """Groups the iteartor into tuples of at most length ``n``.""" while True: chunk = tuple(itertools.islice(iterable, n)) if not chunk: return yield chunk @ray.remote def _ensemble_remote_function(function, chunk: tuple, index: int): res = np.array([np.array(function(arg)) for arg in chunk]) return res, index def ensemble_mean( arg_iter: Iterator[Any], function: Callable[..., np.ndarray], N: Optional[int] = None, every: Optional[int] = None, save: Optional[str] = None, overwrite_cache: bool = False, chunk_size: Optional[int] = None, in_flight: Optional[int] = None, gc_sleep: float = 0, ) -> EnsembleValue: results = [] path = None json_meta_info = json.dumps( dict( N=N, every=every, function_name=function.__name__, first_iterator_value="", ), cls=JSONEncoder, ensure_ascii=False, ).encode("utf-8") if save: key = hashlib.sha256(json_meta_info).hexdigest() path = Path("results") / Path( f"{save}_{function.__name__}_{N}_{every}_{key}.npy" ) if not overwrite_cache and path.exists(): logging.debug(f"Loading cache from: {path}") results = np.load(str(path), allow_pickle=True) return EnsembleValue([tuple(res) for res in results]) first_result = function(next(arg_iter)) aggregate = WelfordAggregator(first_result) if N == 1: return EnsembleValue([(1, aggregate.mean, np.zeros_like(aggregate.mean))]) if chunk_size is None: chunk_size = max(100000 // (first_result.size * first_result.itemsize), 1) logging.debug(f"Setting chunk size to {chunk_size}.") num_chunks = math.ceil((N - 1) / chunk_size) if N is not None else None chunk_iterator = iter( tqdm( zip( _grouper( chunk_size, itertools.islice(arg_iter, None, N - 1 if N else None), ), itertools.count(0), ), total=num_chunks, desc="Loading", ) ) finished = [] processing_refs = [] chunks = {} in_flight = in_flight or int(ray.available_resources().get("CPU", 0)) * 2 function_on_store = ray.put(function) highest_index = 0 while True: try: next_val = next(chunk_iterator) except StopIteration: next_val = None if len(processing_refs) > in_flight or not next_val: finished, processing_refs = ray.wait( processing_refs, num_returns=len(processing_refs) - in_flight if next_val is not None else len(processing_refs), fetch_local=True, ) has_downloaded = len(finished) > 0 for result in finished: res_chunk, idx = ray.get(result) # print( # res_chunk[0].size * len(res_chunk) * res_chunk[0].itemsize / 1024 / 1024 # ) chunks[idx] = res_chunk finished = [] if has_downloaded: while highest_index in chunks: next_chunk = chunks[highest_index] del chunks[highest_index] len_chunk = len(next_chunk) - 1 for i, res in enumerate(next_chunk): aggregate.update(res) if ( every is not None and (aggregate.n % every) == 0 or aggregate.n == N or (not next_val and not chunks and i == len_chunk) ): results.append( ( aggregate.n, aggregate.mean.copy(), aggregate.ensemble_std.copy(), ) ) highest_index += 1 if gc_sleep and gc_sleep > 0: gc.collect() time.sleep(gc_sleep) # wait for the ray store to catch on if next_val: chunk_ref = ray.put(next_val[0]) processing_refs.append( _ensemble_remote_function.remote( function_on_store, chunk_ref, next_val[1] ) ) else: break del ( chunk_iterator, function_on_store, finished, chunk_ref, chunks, processing_refs, ) if path: path.parent.mkdir(parents=True, exist_ok=True) logging.info(f"Writing cache to: {path}") with path.open("wb") as f: np.save(f, np.array(results, dtype="object")) with path.with_suffix(".json").open("wb") as f: f.write(json_meta_info) del aggregate return EnsembleValue(results) class BCFDist(scipy.stats.rv_continuous): """A distribution based on the absolute value of the BCF.""" def __init__(self, α: Callable[[np.ndarray], np.ndarray], **kwargs): super().__init__(**kwargs) self._α = α self._norm = scipy.integrate.quad( lambda t: np.abs(self._α(np.array([t]))), 0, np.inf ) def _pdf(self, x: np.ndarray): return np.abs(self._α(x)) / self._norm def fit_α( α: Callable[[np.ndarray], np.ndarray], n: int, t_max: float, support_points: int = 1000, with_cache: bool = True, ) -> tuple[np.ndarray, np.ndarray]: """ Fit the BCF ``α`` to a sum of ``n`` exponentials up to ``t_max`` using a number of ``support_points``. The fit result will be cached if ``with_cache`` is :any:`True`. """ max_sol = scipy.optimize.minimize( lambda t: -np.abs(α(np.array([t])))[0], [0], bounds=((0, np.inf),) ) max = -max_sol.fun t_at_max = max_sol.x[0] t_tail = scipy.optimize.newton( lambda t: np.abs(α(np.array([t]))[0]) - max / 100, t_at_max + 0.1, ) assert isinstance(t_tail, float), "Could not find tail time." # norm = scipy.integrate.quad(lambda t: np.abs(α(np.array([t]))), 0, t_tail)[0] # hit_prop = norm / (max * t_tail) # rng = np.random.default_rng(1) # ts = rng.random(size=int(support_points * 2 / 3 / hit_prop) + 1) * t_tail # ys = rng.random(size=len(ts)) * max # mask = ys < np.abs(α(ts)) # ts = ts[mask] ts = np.linspace(0, t_tail, int(support_points * 2 / 3) + 1) ts = np.append(ts, np.linspace(t_tail, t_max, int(support_points * 1 / 3))) ys = α(ts) data_key = hash((np.array([ts, ys]).data.tobytes(), n)) cache_path = Path(".cache") / "bcf_fit" / f"{data_key}.npy" logging.info(f"Looking up bcf fit at {cache_path}.") if with_cache and cache_path.exists(): logging.info(f"Loading bcf fit from {cache_path}.") w, g = np.load(cache_path) return w, g # ts = np.linspace(0, t_max, support_points) def residual(fit_params, x, data): resid = 0 w = np.array([fit_params[f"w{i}"] for i in range(n)]) + 1j * np.array( [fit_params[f"wi{i}"] for i in range(n)] ) g = np.array([fit_params[f"g{i}"] for i in range(n)]) + 1j * np.array( [fit_params[f"gi{i}"] for i in range(n)] ) resid = data - α_apprx(x, g, w) return resid.view(float) fit_params = Parameters() for i in range(n): fit_params.add(f"w{i}", value=0.1, min=0) fit_params.add(f"wi{i}", value=0.1) fit_params.add(f"g{i}", value=0.1) fit_params.add(f"gi{i}", value=0) # if i == n - 1: # expr_im = "0" # # expr_re = "0" # for j in range(n - 1): # expr_im += f"+gi{j}" # # expr_re += f"+g{j}" # fit_params.add(f"gi{i}", expr=f"-({expr_im})") # else: # fit_params.add(f"gi{i}", value=0) out = minimize(residual, fit_params, args=(ts, ys), method="least_squares") w = np.array([out.params[f"w{i}"] for i in range(n)]) + 1j * np.array( [out.params[f"wi{i}"] for i in range(n)] ) g = np.array([out.params[f"g{i}"] for i in range(n)]) + 1j * np.array( [out.params[f"gi{i}"] for i in range(n)] ) if with_cache: cache_path.parent.mkdir(parents=True, exist_ok=True) np.save(cache_path, (w, g)) return w, g def except_element(array: np.ndarray, index: int) -> np.ndarray: """Returns the ``array`` except the element with ``index``.""" mask = np.array([i != index for i in range(array.size)]) return array[mask] def poly_real(p: Polynomial) -> Polynomial: """Return the real part of ``p``.""" new = p.copy() new.coef = p.coef.real return new def uni_to_gauss(x: np.ndarray): """Transforms ``x`` into ``len(x)/2`` normal distributed numbers.""" n = len(x) // 2 phi = x[:n] * 2 * np.pi r = np.sqrt(-np.log(x[n:])) return r * np.exp(1j * phi) def expand_t(f): def wrapped(self, t): t = np.expand_dims(np.asarray(t), axis=0) return f(self, t) return wrapped def operator_expectation_from_data( data: HIData, op: Union[np.ndarray, DynamicMatrix], **kwargs, ) -> EnsembleValue: """Calculates the expectation value of a system observable ``op``. The ``kwargs`` is passed on to :any:`util.operator_expectation_ensemble`. :param data: The data instance that contains the trajectories. Does not have to be opened yet. :returns: the expectation value of the observable ``op`` for each time step """ with data as d: if "save" in kwargs: kwargs["save"] += "_" + data.get_hi_key_hash() return operator_expectation_ensemble( ψs=d.valid_sample_iterator(d.stoc_traj), op=op, t=d.get_time(), normalize=d.get_hi_key().HiP.nonlinear, **(dict(N=d.samples) | kwargs), )