diff --git a/hopsflow/hopsflow.py b/hopsflow/hopsflow.py index 7fdbfeb..9e0e5b4 100644 --- a/hopsflow/hopsflow.py +++ b/hopsflow/hopsflow.py @@ -18,6 +18,7 @@ from numpy.typing import NDArray import opt_einsum as oe from hops.core.hierarchy_data import HIData from hops.core.hierarchy_parameters import HIParams +from typing import Callable import copy ############################################################################### @@ -498,10 +499,37 @@ def heat_flow_ensemble( :returns: the value of the flow for each time step """ - if therm_args is None and only_therm: - raise ValueError("Can't calculate only thermal part if therm_args are None.") + flow_worker = make_heat_flow_worker( + params, therm_args[1] if therm_args else None, only_therm + ) - thermal = therm_args[1] if therm_args else None + return util.ensemble_mean( + iter(zip(ψ_0s, ψ_1s, therm_args[0])) + if therm_args + else iter(zip(ψ_0s, ψ_1s, itertools.repeat(0))), + flow_worker, + **kwargs, + ) + + +def make_heat_flow_worker( + params: SystemParams, + thermal: Optional[ThermalParams] = None, + only_therm: bool = False, +) -> Callable[..., np.ndarray]: + """Constructs a worker function that takes ``(first hierarchy + state, aux states, thermal process seed)`` and returns the flow. + + :param params: a parameter object for the system, see + :any:`SystemParams` + :param therm: the parameter object for the nonzero temperature, + see :any:`ThermalParams` + :param only_therm: whether to only calculate the thermal part of + the flow + """ + + if thermal is None and only_therm: + raise ValueError("Can't calculate only thermal part if therm_args are None.") def flow_worker(ψs: tuple[np.ndarray, np.ndarray, int]): ψ_0, ψ_1, seed = ψs @@ -519,13 +547,7 @@ def heat_flow_ensemble( return flow - return util.ensemble_mean( - iter(zip(ψ_0s, ψ_1s, therm_args[0])) - if therm_args - else iter(zip(ψ_0s, ψ_1s, itertools.repeat(0))), - flow_worker, - **kwargs, - ) + return flow_worker def heat_flow_from_data( @@ -563,6 +585,34 @@ def heat_flow_from_data( ) +def make_interaction_worker( + params: SystemParams, + thermal: Optional[ThermalParams] = None, + power: bool = False, +): + """ + :param params: a parameter object for the system, see + :any:`SystemParams` + :param thermal: the thermal object, see :any:`ThermalParams` + :param power: whether to calculate the interaction power + """ + + def interaction_energy_task( + ψs: Tuple[np.ndarray, np.ndarray, int], + ) -> np.ndarray: + ψ_0, ψ_1, seeds = ψs + run = HOPSRun(ψ_0, ψ_1, params, power) # type: ignore + energy = interaction_energy_coupling(run, params) # type: ignore + + if thermal is not None: + therm_run = ThermalRunParams(thermal, seeds) # type: ignore + energy += interaction_energy_therm(run, therm_run) + + return energy + + return interaction_energy_task + + def interaction_energy_ensemble( ψ_0s: Iterator[np.ndarray], ψ_1s: Iterator[np.ndarray], @@ -589,20 +639,7 @@ def interaction_energy_ensemble( thermal = therm_args[1] if therm_args else None - def interaction_energy_task( - ψs: Tuple[np.ndarray, np.ndarray, int], - ) -> np.ndarray: - ψ_0, ψ_1, seeds = ψs - - run = HOPSRun(ψ_0, ψ_1, params, power) # type: ignore - energy = interaction_energy_coupling(run, params) # type: ignore - - if thermal is not None: - therm_run = ThermalRunParams(thermal, seeds) # type: ignore - energy += interaction_energy_therm(run, therm_run) - - return energy - + interaction_energy_task = make_interaction_worker(params, thermal, power) return util.ensemble_mean( iter(zip(ψ_0s, ψ_1s, therm_args[0])) if therm_args diff --git a/hopsflow/util.py b/hopsflow/util.py index 6af1b7a..63107f3 100644 --- a/hopsflow/util.py +++ b/hopsflow/util.py @@ -29,6 +29,7 @@ import opt_einsum as oe import gc import math import time +import pickle from hops.core.hierarchy_data import HIData Aggregate = tuple[int, np.ndarray, np.ndarray] @@ -491,27 +492,18 @@ def operator_expectation( return expect -def operator_expectation_ensemble( - ψs: Iterator[np.ndarray], +def make_operator_expectation_task( op: Union[np.ndarray, DynamicMatrix], t: np.ndarray, normalize: bool = False, real: bool = False, - **kwargs, -) -> EnsembleValue: - """Calculates the expecation value of ``op`` as a time series. - - :param ψs: A collection of stochastic trajectories. Each - element should have the shape ``(time, dim-sys)``. - :param op: The operator. - :param N: Number of samples to take. - :param real: Whether to take the real part. - - All the other kwargs are passed on to :any:`ensemble_mean`. - - :returns: the expectation value +): """ + Returns a function that takes the first hierarchy states and + returns the expectation value of the operator op. + For the arguments see :any:`operator_expectation_ensemble`. + """ if isinstance(op, ConstantMatrix): op = op(0) @@ -543,6 +535,31 @@ def operator_expectation_ensemble( return sandwhiches + return op_exp_task + + +def operator_expectation_ensemble( + ψs: Iterator[np.ndarray], + op: Union[np.ndarray, DynamicMatrix], + t: np.ndarray, + normalize: bool = False, + real: bool = False, + **kwargs, +) -> EnsembleValue: + """Calculates the expecation value of ``op`` as a time series. + + :param ψs: A collection of stochastic trajectories. Each + element should have the shape ``(time, dim-sys)``. + :param op: The operator. + :param N: Number of samples to take. + :param real: Whether to take the real part. + + All the other kwargs are passed on to :any:`ensemble_mean`. + + :returns: the expectation value + """ + + op_exp_task = make_operator_expectation_task(op, t, normalize, real) return ensemble_mean(ψs, op_exp_task, **kwargs) @@ -642,6 +659,9 @@ class WelfordAggregator: @property def sample_variance(self) -> np.ndarray: + if self.n == 1: + return np.zeros_like(self.mean) + return self._m_2 / (self.n - 1) @property @@ -652,6 +672,10 @@ class WelfordAggregator: def ensemble_std(self) -> np.ndarray: return np.sqrt(self.ensemble_variance) + @property + def ensemble_value(self) -> EnsembleValue: + return EnsembleValue([(self.n, self.mean, self.ensemble_std)]) + class JSONEncoder(json.JSONEncoder): """ @@ -709,6 +733,52 @@ def _ensemble_remote_function(function, chunk: tuple, index: int): return res, index +def get_online_data_path(save: str): + return Path("results") / Path(f"online_{save}.npy") + + +def load_online_cache(save: str): + path = get_online_data_path(save) + + with path.open("rb") as agg_file: + aggregate = pickle.load(agg_file) + + return aggregate.ensemble_value + + +def ensemble_mean_online( + args: Any, + save: str, + function: Callable[..., np.ndarray], +) -> Optional[EnsembleValue]: + path = get_online_data_path(save) + + if args is None: + result = None + else: + result = function(args) + + if np.isnan(np.sum(result)): + result = None + + if path.exists(): + with path.open("rb") as agg_file: + aggregate = pickle.load(agg_file) + if result is not None: + aggregate.update(result) + + else: + if result is None: + raise RuntimeError("No cache and no result.") + + aggregate = WelfordAggregator(result) + + with path.open("wb") as agg_file: + pickle.dump(aggregate, agg_file) + + return aggregate.ensemble_value + + def ensemble_mean( arg_iter: Iterator[Any], function: Callable[..., np.ndarray],