diff --git a/hiro_models/model_auxiliary.py b/hiro_models/model_auxiliary.py index cd31be4..995786c 100644 --- a/hiro_models/model_auxiliary.py +++ b/hiro_models/model_auxiliary.py @@ -92,13 +92,21 @@ def integrate_multi(models: Sequence[Model], *args, **kwargs): integrate(model, *args, **kwargs) -def integrate(model: Model, n: int, data_path: str = "./.data", clear_pd: bool = False): +def integrate( + model: Model, + n: int, + data_path: str = "./.data", + clear_pd: bool = False, + single_process: bool = False, + stream_file: Optional[str] = None, +): """Integrate the hops equations for the model. A call to :any:`ray.init` may be required. :param n: The number of samples to be integrated. :param clear_pd: Whether to clear the data file and redo the integration. + :param single_process: Whether to integrate with a single process. """ hash = model.hexhash @@ -111,11 +119,15 @@ def integrate(model: Model, n: int, data_path: str = "./.data", clear_pd: bool = n, data_path=data_path, data_name=hash, + stream_file=stream_file, ) - supervisor.integrate(clear_pd) + if single_process: + supervisor.integrate_single_process(clear_pd) + else: + supervisor.integrate(clear_pd) - with supervisor.get_data(True) as data: + with supervisor.get_data(True, stream=False) as data: with model_db(data_path) as db: db[hash] = { "model_config": model.to_dict(), diff --git a/hiro_models/model_base.py b/hiro_models/model_base.py index 3242041..5f9e0a9 100644 --- a/hiro_models/model_base.py +++ b/hiro_models/model_base.py @@ -18,6 +18,7 @@ import hashlib import hops.core.hierarchy_parameters as params from collections.abc import Callable from datetime import datetime +import pickle @dataclass @@ -353,6 +354,123 @@ class Model(ABC): **kwargs, ) + def all_energies_online_from_cache( + self, + ) -> tuple[ + EnsembleValue, EnsembleValue, EnsembleValue, EnsembleValue, EnsembleValue + ]: + + names = ["flow", "interaction", "interaction_power", "system", "system_power"] + + hexhash = self.hexhash + results = [] + + for name in names: + results.append(hopsflow.util.load_online_cache(f"{name}_{hexhash}")) + + return tuple(results) + + def all_energies_online( + self, + stream_pipe: str = "results.fifo", + **kwargs, + ) -> Optional[ + tuple[EnsembleValue, EnsembleValue, EnsembleValue, EnsembleValue, EnsembleValue] + ]: + """Calculates the bath energy flow, the interaction energy, + the interaction power, the system energy and the system power + from the trajectories dumped into ``stream_pipe``. + + The ``kwargs`` are passed on to + :any:`hopsflow.util.ensemble_mean_online`. + + :returns: At tuple of :any:`hopsflow.util.EnsembleValue`. + """ + + flow_name = f"flow_{self.hexhash}" + flow_worker = hopsflow.hopsflow.make_heat_flow_worker( + self.hopsflow_system, self.hopsflow_therm(self.t) + ) + + interaction_name = f"interaction_{self.hexhash}" + interaction_worker = hopsflow.hopsflow.make_interaction_worker( + self.hopsflow_system, self.hopsflow_therm(self.t), power=False + ) + + interaction_power_name = f"interaction_power_{self.hexhash}" + interaction_power_worker = hopsflow.hopsflow.make_interaction_worker( + self.hopsflow_system, self.hopsflow_therm(self.t), power=True + ) + + system_name = f"system_{self.hexhash}" + system_worker = hopsflow.util.make_operator_expectation_task( + self.system, self.t, normalize=True, real=True + ) + + system_power_name = f"system_power_{self.hexhash}" + system_power_worker = hopsflow.util.make_operator_expectation_task( + self.system.derivative(), self.t, normalize=True, real=True + ) + + flow, interaction, interaction_power, system, system_power = ( + None, + None, + None, + None, + None, + ) + + with open(stream_pipe, "rb") as fifo: + while True: + try: + ( + _, + psi0, + aux_states, + _, + _, + rng_seed, + ) = pickle.load(fifo) + flow = hopsflow.util.ensemble_mean_online( + (psi0, aux_states, rng_seed), + flow_name, + flow_worker, + **kwargs, + ) + + interaction = hopsflow.util.ensemble_mean_online( + (psi0, aux_states, rng_seed), + interaction_name, + interaction_worker, + **kwargs, + ) + + interaction_power = hopsflow.util.ensemble_mean_online( + (psi0, aux_states, rng_seed), + interaction_power_name, + interaction_power_worker, + **kwargs, + ) + + system = hopsflow.util.ensemble_mean_online( + (psi0), + system_name, + system_worker, + **kwargs, + ) + + system_power = hopsflow.util.ensemble_mean_online( + (psi0), + system_power_name, + system_power_worker, + **kwargs, + ) + + except EOFError: + break + + return flow, interaction, interaction_power, system, system_power + def interaction_energy(self, data: HIData, **kwargs) -> EnsembleValue: """Calculates interaction energy from the hierarchy data ``data``. diff --git a/hiro_models/one_qubit_model.py b/hiro_models/one_qubit_model.py index c870b14..6005372 100644 --- a/hiro_models/one_qubit_model.py +++ b/hiro_models/one_qubit_model.py @@ -565,6 +565,12 @@ class QubitModelMutliBath(Model): The system hamiltonian :math:`H` with shape ``(2, 2)``. """ + streaming_mode: bool = False + """ + Whether to stream the trajectory to a fifo. When turned on, the + trajectories won't be saved to the data file. + """ + @property def coupling_operators(self) -> list[DynamicMatrix]: """The bath coupling operators :math:`L`.""" @@ -818,8 +824,11 @@ class QubitModelMutliBath(Model): seed=0, nonlinear=True, terminator=False, - result_type=params.ResultType.ZEROTH_AND_FIRST_ORDER, - accum_only=False, + result_type=params.ResultType.ZEROTH_ORDER_ONLY + if self.streaming_mode + else params.ResultType.ZEROTH_AND_FIRST_ORDER, + stream_result_type=params.ResultType.ZEROTH_AND_FIRST_ORDER, + accum_only=self.streaming_mode, rand_skip=None, truncation_scheme=trunc_scheme, save_therm_rng_seed=True,