support online data more conveniently

This commit is contained in:
Valentin Boettcher 2022-11-30 16:36:21 -05:00
parent feb1ec7c54
commit ca7e265aa3
No known key found for this signature in database
GPG key ID: E034E12B7AF56ACE
2 changed files with 133 additions and 79 deletions

View file

@ -101,6 +101,7 @@ def integrate(
single_process: bool = False,
stream_file: Optional[str] = None,
analyze: bool = False,
results_path: str = "results",
analyze_kwargs: Optional[dict] = None,
):
"""Integrate the hops equations for the model.
@ -145,7 +146,9 @@ def integrate(
logging.info("Starting analysis process.")
analysis_process = Process(
target=lambda: model.all_energies_online(
stream_pipe=stream_file, **analyze_kwargs
stream_pipe=stream_file,
results_directory=results_path,
**analyze_kwargs,
)
)
@ -162,11 +165,22 @@ def integrate(
with supervisor.get_data(True, stream=False) as data:
with model_db(data_path) as db:
db[hash] = {
dct = {
"model_config": model.to_dict(),
"data_path": str(Path(data.hdf5_name).relative_to(data_path)),
}
if analysis_process:
dct["analysis_files"] = {
"flow": model.online_flow_name,
"interaction": model.online_interaction_name,
"interaction_power": model.online_interaction_power_name,
"system": model.online_system_name,
"system_power": model.online_system_power_name,
}
db[hash] = dct
def get_data(
model: Model, data_path: str = "./.data", read_only: bool = True, **kwargs

View file

@ -13,12 +13,13 @@ from abc import ABC, abstractmethod
import qutip as qt
from hops.core.hierarchy_data import HIData
import hopsflow
from hopsflow.util import EnsembleValue
from hopsflow.util import EnsembleValue, WelfordAggregator
import hashlib
import hops.core.hierarchy_parameters as params
from collections.abc import Callable
from datetime import datetime
import pickle
import os
@dataclass
@ -283,9 +284,28 @@ class Model(ABC):
**kwargs,
)
def system_energy(self, data: HIData, **kwargs) -> EnsembleValue:
def try_get_online_data(self, path: str, results_path: str) -> EnsembleValue:
"""
Try to load the cached data from the online analysis with
filename ``path`` in the directory ``results_path``.
Raises a :any:`RuntimeError` if nothing is found.
"""
file_path = os.path.join(path, results_path)
if not os.path.exists(file_path):
raise RuntimeError(f"No data found under '{file_path}'.")
with open(file_path, "rb") as f:
res: WelfordAggregator = pickle.load(f)
return res.ensemble_value
def system_energy(
self, data: Optional[HIData] = None, results_path: str = "results", **kwargs
) -> EnsembleValue:
"""Calculates the system energy from the hierarchy data
``data``.
``data`` or, if not supplied, tries to load the online results from ``results_path``.
The ``kwargs`` are passed on to
:any:`hopsflow.util.ensemble_mean`.
@ -293,12 +313,18 @@ class Model(ABC):
:returns: See :any:`hopsflow.util.ensemble_mean`.
"""
if data is None:
return self.try_get_online_data(results_path, self.online_system_name)
operator = self.system
return self.system_expectation(data, operator, real=True, **kwargs)
def system_power(self, data: HIData, **kwargs) -> Optional[EnsembleValue]:
def system_power(
self, data: Optional[HIData] = None, results_path: str = "results", **kwargs
) -> Optional[EnsembleValue]:
"""Calculates the power based on the time dependency of the
system hamiltonian from ``data``.
system hamiltonian from ``data`` or, if not supplied, tries to
load the online results from ``results_path``.
The ``kwargs`` are passed on to
:any:`hopsflow.util.ensemble_mean`.
@ -307,6 +333,9 @@ class Model(ABC):
:any:`None` if the system is static.
"""
if data is None:
return self.try_get_online_data(results_path, self.online_system_power_name)
operator = self.system.derivative()
if (abs(operator(self.t)).sum() == 0).all():
@ -315,10 +344,11 @@ class Model(ABC):
return self.system_expectation(data, operator, real=True, **kwargs)
def energy_change_from_system_power(
self, data: HIData, **kwargs
self, data: Optional[HIData] = None, results_path: str = "results", **kwargs
) -> Optional[EnsembleValue]:
"""Calculates the integrated system power from the hierarchy
data ``data``.
data ``data`` or, if not supplied, tries to load the online
results from ``results_path``.
The ``kwargs`` are passed on to :any:`system_power`.
@ -332,9 +362,11 @@ class Model(ABC):
return None
def bath_energy_flow(self, data: HIData, **kwargs) -> EnsembleValue:
def bath_energy_flow(
self, data: Optional[HIData] = None, results_path: str = "results", **kwargs
) -> EnsembleValue:
"""Calculates the bath energy flow from the hierarchy data
``data``.
``data`` or, if not supplied, tries to load the online results from ``results_path``.
The ``kwargs`` are passed on to
:any:`hopsflow.util.heat_flow_ensemble`.
@ -342,6 +374,9 @@ class Model(ABC):
:returns: See :any:`hopsflow.util.heat_flow_ensemble`.
"""
if data is None:
return self.try_get_online_data(results_path, self.online_flow_name)
N, kwargs = _get_N_kwargs(kwargs, data)
return hopsflow.hopsflow.heat_flow_ensemble(
@ -354,25 +389,40 @@ class Model(ABC):
**kwargs,
)
def all_energies_online_from_cache(
self,
) -> tuple[
EnsembleValue, EnsembleValue, EnsembleValue, EnsembleValue, EnsembleValue
]:
@property
def online_flow_name(self):
"""The filename where the online flow is saved."""
names = ["flow", "interaction", "interaction_power", "system", "system_power"]
return f"flow_{self.hexhash}.pickle"
hexhash = self.hexhash
results = []
@property
def online_interaction_name(self):
"""The filename where the online interaction is saved."""
for name in names:
results.append(hopsflow.util.load_online_cache(f"{name}_{hexhash}"))
return f"interaction_{self.hexhash}.pickle"
return tuple(results)
@property
def online_interaction_power_name(self):
"""The filename where the online interaction power is saved."""
return f"interaction_power_{self.hexhash}.pickle"
@property
def online_system_name(self):
"""The filename where the online system is saved."""
return f"system_{self.hexhash}.pickle"
@property
def online_system_power_name(self):
"""The filename where the online system power is saved."""
return f"system_power_{self.hexhash}.pickle"
def all_energies_online(
self,
stream_pipe: str = "results.fifo",
results_directory: str = "results",
**kwargs,
) -> Optional[
tuple[EnsembleValue, EnsembleValue, EnsembleValue, EnsembleValue, EnsembleValue]
@ -387,27 +437,22 @@ class Model(ABC):
:returns: At tuple of :any:`hopsflow.util.EnsembleValue`.
"""
flow_name = f"flow_{self.hexhash}"
flow_worker = hopsflow.hopsflow.make_heat_flow_worker(
self.hopsflow_system, self.hopsflow_therm(self.t)
)
interaction_name = f"interaction_{self.hexhash}"
interaction_worker = hopsflow.hopsflow.make_interaction_worker(
self.hopsflow_system, self.hopsflow_therm(self.t), power=False
)
interaction_power_name = f"interaction_power_{self.hexhash}"
interaction_power_worker = hopsflow.hopsflow.make_interaction_worker(
self.hopsflow_system, self.hopsflow_therm(self.t), power=True
)
system_name = f"system_{self.hexhash}"
system_worker = hopsflow.util.make_operator_expectation_task(
self.system, self.t, normalize=True, real=True
)
system_power_name = f"system_power_{self.hexhash}"
system_power_worker = hopsflow.util.make_operator_expectation_task(
self.system.derivative(), self.t, normalize=True, real=True
)
@ -433,7 +478,7 @@ class Model(ABC):
) = pickle.load(fifo)
flow = hopsflow.util.ensemble_mean_online(
(psi0, aux_states, rng_seed),
flow_name,
os.path.join(results_directory, self.online_flow_name),
flow_worker,
idx,
**kwargs,
@ -441,7 +486,7 @@ class Model(ABC):
interaction = hopsflow.util.ensemble_mean_online(
(psi0, aux_states, rng_seed),
interaction_name,
os.path.join(results_directory, self.online_interaction_name),
interaction_worker,
idx,
**kwargs,
@ -449,7 +494,9 @@ class Model(ABC):
interaction_power = hopsflow.util.ensemble_mean_online(
(psi0, aux_states, rng_seed),
interaction_power_name,
os.path.join(
results_directory, self.online_interaction_power_name
),
interaction_power_worker,
idx,
**kwargs,
@ -457,7 +504,7 @@ class Model(ABC):
system = hopsflow.util.ensemble_mean_online(
(psi0),
system_name,
os.path.join(results_directory, self.online_system_name),
system_worker,
idx,
**kwargs,
@ -465,7 +512,7 @@ class Model(ABC):
system_power = hopsflow.util.ensemble_mean_online(
(psi0),
system_power_name,
os.path.join(results_directory, self.online_system_power_name),
system_power_worker,
idx,
**kwargs,
@ -476,9 +523,11 @@ class Model(ABC):
return flow, interaction, interaction_power, system, system_power
def interaction_energy(self, data: HIData, **kwargs) -> EnsembleValue:
def interaction_energy(
self, data: Optional[HIData] = None, results_path: str = "results", **kwargs
) -> EnsembleValue:
"""Calculates interaction energy from the hierarchy data
``data``.
``data`` or, if not supplied, tries to load the online results from ``results_path``.
The ``kwargs`` are passed on to
:any:`hopsflow.util.interaction_energy_ensemble`.
@ -486,6 +535,9 @@ class Model(ABC):
:returns: See :any:`hopsflow.util.interaction_energy_ensemble`.
"""
if data is None:
return self.try_get_online_data(results_path, self.online_interaction_name)
N, kwargs = _get_N_kwargs(kwargs, data)
return hopsflow.hopsflow.interaction_energy_ensemble(
@ -498,9 +550,11 @@ class Model(ABC):
**kwargs,
)
def interaction_power(self, data: HIData, **kwargs) -> EnsembleValue:
def interaction_power(
self, data: Optional[HIData] = None, results_path: str = "results", **kwargs
) -> EnsembleValue:
"""Calculates interaction power from the hierarchy data
``data``.
``data`` or, if not supplied, tries to load the online results from ``results_path``.
The ``kwargs`` are passed on to
:any:`hopsflow.util.interaction_energy_ensemble`.
@ -508,6 +562,11 @@ class Model(ABC):
:returns: See :any:`hopsflow.util.interaction_energy_ensemble`.
"""
if data is None:
return self.try_get_online_data(
results_path, self.online_interaction_power_name
)
N, kwargs = _get_N_kwargs(kwargs, data)
return hopsflow.hopsflow.interaction_energy_ensemble(
@ -522,58 +581,34 @@ class Model(ABC):
)
def energy_change_from_interaction_power(
self, data: HIData, **kwargs
self, data: Optional[HIData] = None, **kwargs
) -> EnsembleValue:
"""Calculates the integrated interaction power from the hierarchy data
``data``.
``data`` or, if not supplied, tries to load the online results from ``results_path``.
The ``kwargs`` are passed on to
:any:`hopsflow.util.energy_change_from_interaction_power`.
:returns: See :any:`hopsflow.util.energy_change_from_interaction_power`.
:any:`interaction_power`.
"""
N, kwargs = _get_N_kwargs(kwargs, data)
return self.interaction_power(data, **kwargs).integrate(self.t)
return hopsflow.hopsflow.energy_change_from_interaction_power(
np.array(data.time),
data.valid_sample_iterator(data.stoc_traj), # type: ignore
data.valid_sample_iterator(data.aux_states), # type: ignore
self.hopsflow_system,
(data.valid_sample_iterator(data.rng_seed), self.hopsflow_therm(data.time[:])), # type: ignore
N=N,
save=f"interaction_power_{self.hexhash}", # under the hood the power is used
**kwargs,
)
def bath_energy(self, data: HIData, **kwargs) -> EnsembleValue:
def bath_energy(self, data: Optional[HIData], **kwargs) -> EnsembleValue:
"""Calculates bath energy by integrating the bath energy flow
calculated from the ``data``.
calculated from the ``data`` or, if not supplied, tries to load
the online results from ``results_path``.
The ``kwargs`` are passed on to
:any:`hopsflow.bath_energy_from_flow`.
:returns: See :any:`hopsflow.bath_energy_from_flow`.
:any:`bath_energy_flow`.
"""
N, kwargs = _get_N_kwargs(kwargs, data)
return hopsflow.hopsflow.bath_energy_from_flow(
np.array(data.time),
data.valid_sample_iterator(data.stoc_traj), # type: ignore
data.valid_sample_iterator(data.aux_states), # type: ignore
self.hopsflow_system,
(data.valid_sample_iterator(data.rng_seed), self.hopsflow_therm(data.time[:])), # type: ignore
save=f"flow_{self.hexhash}", # under the hood the flow is used
N=N,
**kwargs,
)
return -1 * self.bath_energy_flow(data, **kwargs).integrate(self.t)
def interaction_energy_from_conservation(
self, data: HIData, **kwargs
self, data: Optional[HIData] = None, **kwargs
) -> EnsembleValue:
"""Calculates the interaction energy from energy conservations
calculated from the ``data``.
calculated from the ``data`` or, if not supplied, tries to load
the online results from ``results_path``.
The ``kwargs`` are passed on to
:any:`hopsflow.bath_energy_from_flow`.
@ -587,9 +622,10 @@ class Model(ABC):
return total - (system + bath)
def total_energy(self, data: HIData, **kwargs) -> EnsembleValue:
"""Calculates the total energy from the trajectories in
``data`` using energy bilance.
def total_energy(self, data: Optional[HIData] = None, **kwargs) -> EnsembleValue:
"""Calculates the total energy from the trajectories using
energy bilance in ``data`` or, if not supplied, tries to load
the online results from ``results_path``
The ``kwargs`` are passed on to :any:`bath_energy`,
:any:`system_energy` and :any:`interaction_energy`.
@ -605,9 +641,10 @@ class Model(ABC):
return total
def total_power(self, data: HIData, **kwargs) -> EnsembleValue:
def total_power(self, data: Optional[HIData] = None, **kwargs) -> EnsembleValue:
"""Calculates the total power from the trajectories in
``data``.
``data`` or, if not supplied, tries to load
the online results from ``results_path``.
The ``kwargs`` are passed on to :any:`system_power` and
:any:`interaction_power`.
@ -623,9 +660,12 @@ class Model(ABC):
return power
def total_energy_from_power(self, data: HIData, **kwargs) -> EnsembleValue:
def total_energy_from_power(
self, data: Optional[HIData] = None, **kwargs
) -> EnsembleValue:
"""Calculates the total energy from the trajectories in
``data`` using the integrated power.
``data``or, if not supplied, tries to load
the online results from ``results_path`` using the integrated power.
The ``kwargs`` are passed on to :any:`total_power`.