mirror of
https://github.com/vale981/master-thesis
synced 2025-03-06 18:41:38 -05:00
131 lines
3.7 KiB
Python
131 lines
3.7 KiB
Python
|
from types import ModuleType
|
|||
|
from typing import Callable, Tuple, Union, Iterator
|
|||
|
from lmfit import minimize, Parameters
|
|||
|
import matplotlib.pyplot as plt
|
|||
|
import matplotlib
|
|||
|
import numpy as np
|
|||
|
from numpy.polynomial import Polynomial
|
|||
|
from contextlib import contextmanager
|
|||
|
from pathlib import Path
|
|||
|
import h5py
|
|||
|
from hopsflow import hopsflow
|
|||
|
|
|||
|
|
|||
|
# def get_n_samples(config: ModuleType) -> int:
|
|||
|
# """Get the number of samples from ``stg``."""
|
|||
|
# with stg_helper.get_hierarchy_data(stg, read_only=True) as hd:
|
|||
|
# samp = hd.get_samples()
|
|||
|
# return samp if isinstance(samp, int) else 0
|
|||
|
|
|||
|
|
|||
|
# def has_all_samples(stg: ModuleType) -> bool:
|
|||
|
# return stg.__HI_number_of_samples == get_n_samples(stg)
|
|||
|
|
|||
|
|
|||
|
# def has_all_samples_checker(stg: ModuleType) -> Tuple[str, Callable[..., bool]]:
|
|||
|
# return "Has all samples?", lambda _: has_all_samples(stg)
|
|||
|
|
|||
|
|
|||
|
# def hopsflow_systemparams(stg: ModuleType):
|
|||
|
# system_params = stg_helper.get_system_param(stg)
|
|||
|
# return hopsflow.SystemParams(
|
|||
|
# system_params.L.todense(), stg.__g, stg.__w, stg.__bcf_scale, stg.__HI_nonlinear
|
|||
|
# )
|
|||
|
|
|||
|
|
|||
|
# def hopsflow_thermparams(stg: ModuleType, τ: np.ndarray):
|
|||
|
# ξ = stg_helper.get_eta_therm(stg)
|
|||
|
# ξ.calc_deriv = True
|
|||
|
|
|||
|
# return hopsflow.ThermalParams(
|
|||
|
# ξ=ξ,
|
|||
|
# τ=τ,
|
|||
|
# num_deriv=False,
|
|||
|
# rand_skip=stg.__HI_rand_skip if hasattr(stg, "__HI_rand_skip") else 0,
|
|||
|
# )
|
|||
|
|
|||
|
|
|||
|
def peruse_hierarchy_files(base: str) -> Iterator[h5py.File]:
|
|||
|
p = Path(base)
|
|||
|
for i in p.glob("*/*.h5"):
|
|||
|
f = h5py.File(i, "r")
|
|||
|
yield f
|
|||
|
f.close()
|
|||
|
|
|||
|
|
|||
|
def α_apprx(τ, g, w):
|
|||
|
return np.sum(
|
|||
|
g[np.newaxis, :] * np.exp(-w[np.newaxis, :] * (τ[:, np.newaxis])), axis=1
|
|||
|
)
|
|||
|
|
|||
|
|
|||
|
def fit_α(
|
|||
|
α: Callable[[np.ndarray], np.ndarray],
|
|||
|
n: int,
|
|||
|
t_max: float,
|
|||
|
support_points: Union[int, np.ndarray] = 1000,
|
|||
|
) -> Tuple[np.ndarray, np.ndarray]:
|
|||
|
"""
|
|||
|
Fit the BCF ``α`` to a sum of ``n`` exponentials up to
|
|||
|
``t_max`` using a number of ``support_points``.
|
|||
|
"""
|
|||
|
|
|||
|
def residual(fit_params, x, data):
|
|||
|
resid = 0
|
|||
|
w = np.array([fit_params[f"w{i}"] for i in range(n)]) + 1j * np.array(
|
|||
|
[fit_params[f"wi{i}"] for i in range(n)]
|
|||
|
)
|
|||
|
g = np.array([fit_params[f"g{i}"] for i in range(n)]) + 1j * np.array(
|
|||
|
[fit_params[f"gi{i}"] for i in range(n)]
|
|||
|
)
|
|||
|
resid = data - α_apprx(x, g, w)
|
|||
|
|
|||
|
return resid.view(float)
|
|||
|
|
|||
|
fit_params = Parameters()
|
|||
|
for i in range(n):
|
|||
|
fit_params.add(f"g{i}", value=0.1)
|
|||
|
fit_params.add(f"gi{i}", value=0.1)
|
|||
|
fit_params.add(f"w{i}", value=0.1)
|
|||
|
fit_params.add(f"wi{i}", value=0.1)
|
|||
|
|
|||
|
ts = np.asarray(support_points)
|
|||
|
if ts.size < 2:
|
|||
|
ts = np.linspace(0, t_max, support_points)
|
|||
|
|
|||
|
out = minimize(residual, fit_params, args=(ts, α(ts)))
|
|||
|
|
|||
|
w = np.array([out.params[f"w{i}"] for i in range(n)]) + 1j * np.array(
|
|||
|
[out.params[f"wi{i}"] for i in range(n)]
|
|||
|
)
|
|||
|
g = np.array([out.params[f"g{i}"] for i in range(n)]) + 1j * np.array(
|
|||
|
[out.params[f"gi{i}"] for i in range(n)]
|
|||
|
)
|
|||
|
|
|||
|
return w, g
|
|||
|
|
|||
|
|
|||
|
###############################################################################
|
|||
|
# Numpy Hacks #
|
|||
|
###############################################################################
|
|||
|
|
|||
|
|
|||
|
def e_i(i: int, size: int) -> np.ndarray:
|
|||
|
r"""Cartesian base vector :math:`e_i`."""
|
|||
|
|
|||
|
vec = np.zeros(size)
|
|||
|
vec[i] = 1
|
|||
|
return vec
|
|||
|
|
|||
|
|
|||
|
def except_element(array: np.ndarray, index: int) -> np.ndarray:
|
|||
|
mask = [i != index for i in range(array.size)]
|
|||
|
return array[mask]
|
|||
|
|
|||
|
|
|||
|
def poly_real(p: Polynomial) -> Polynomial:
|
|||
|
"""Return the real part of ``p``."""
|
|||
|
new = p.copy()
|
|||
|
new.coef = p.coef.real
|
|||
|
return new
|