add on the fly analyzer

This commit is contained in:
Valentin Boettcher 2022-11-30 12:28:31 -05:00
parent 1d9603b962
commit 9357f40559
No known key found for this signature in database
GPG key ID: E034E12B7AF56ACE

View file

@ -20,6 +20,7 @@ import logging
import copy
import os
import numpy as np
from multiprocessing import Process
@contextmanager
@ -99,14 +100,25 @@ def integrate(
clear_pd: bool = False,
single_process: bool = False,
stream_file: Optional[str] = None,
analyze: bool = False,
analyze_kwargs: Optional[dict] = None,
):
"""Integrate the hops equations for the model.
A call to :any:`ray.init` may be required.
:param n: The number of samples to be integrated.
:param clear_pd: Whether to clear the data file and redo the integration.
:param clear_pd: Whether to clear the data file and redo the
integration.
:param single_process: Whether to integrate with a single process.
:param stream_file: The path to the fifo that the trajectories are
to be streamed to.
:param analyze: Whether to analyze the results streamed to the
``stream_file`` using :any:`hopsflow`.
Only applies when using the ``stream_file`` option.
:param analyze_kwargs: Keyword arguments passed to :any:`hopsflow.util.ensemble_mean_online`.
"""
hash = model.hexhash
@ -122,11 +134,32 @@ def integrate(
stream_file=stream_file,
)
analysis_process = None
if stream_file is not None and analyze:
if not os.path.exists(stream_file):
os.mkfifo(stream_file)
if analyze_kwargs is None:
analyze_kwargs = dict()
logging.info("Starting analysis process.")
analysis_process = Process(
target=lambda: model.all_energies_online(
stream_pipe=stream_file, **analyze_kwargs
)
)
analysis_process.start()
logging.info(f"Started analysis process with pid {analysis_process.pid}.")
if single_process:
supervisor.integrate_single_process(clear_pd)
else:
supervisor.integrate(clear_pd)
if analysis_process:
analysis_process.join()
with supervisor.get_data(True, stream=False) as data:
with model_db(data_path) as db:
db[hash] = {