From 9357f405599c89ea35f1e5f226aae08536ded047 Mon Sep 17 00:00:00 2001
From: Valentin Boettcher
Date: Wed, 30 Nov 2022 12:28:31 -0500
Subject: [PATCH] add on the fly analyzer

---
 hiro_models/model_auxiliary.py | 35 +++++++++++++++++++++++++++++++++-
 1 file changed, 34 insertions(+), 1 deletion(-)

diff --git a/hiro_models/model_auxiliary.py b/hiro_models/model_auxiliary.py
index 995786c..5599491 100644
--- a/hiro_models/model_auxiliary.py
+++ b/hiro_models/model_auxiliary.py
@@ -20,6 +20,7 @@ import logging
 import copy
 import os
 import numpy as np
+from multiprocessing import Process
 
 
 @contextmanager
@@ -99,14 +100,25 @@ def integrate(
     clear_pd: bool = False,
     single_process: bool = False,
     stream_file: Optional[str] = None,
+    analyze: bool = False,
+    analyze_kwargs: Optional[dict] = None,
 ):
     """Integrate the hops equations for the model.
 
     A call to :any:`ray.init` may be required.
 
     :param n: The number of samples to be integrated.
-    :param clear_pd: Whether to clear the data file and redo the integration.
+    :param clear_pd: Whether to clear the data file and redo the
+        integration.
     :param single_process: Whether to integrate with a single process.
+    :param stream_file: The path to the fifo that the trajectories are
+        to be streamed to.
+    :param analyze: Whether to analyze the results streamed to the
+        ``stream_file`` using :any:`hopsflow`.
+
+        Only applies when using the ``stream_file`` option.
+
+    :param analyze_kwargs: Keyword arguments passed to :any:`hopsflow.util.ensemble_mean_online`.
     """
 
     hash = model.hexhash
@@ -122,11 +134,32 @@ def integrate(
         stream_file=stream_file,
     )
 
+    analysis_process = None
+    if stream_file is not None and analyze:
+        if not os.path.exists(stream_file):
+            os.mkfifo(stream_file)
+
+        if analyze_kwargs is None:
+            analyze_kwargs = dict()
+
+        logging.info("Starting analysis process.")
+        analysis_process = Process(
+            target=lambda: model.all_energies_online(
+                stream_pipe=stream_file, **analyze_kwargs
+            )
+        )
+
+        analysis_process.start()
+        logging.info(f"Started analysis process with pid {analysis_process.pid}.")
+
     if single_process:
         supervisor.integrate_single_process(clear_pd)
     else:
         supervisor.integrate(clear_pd)
 
+    if analysis_process:
+        analysis_process.join()
+
     with supervisor.get_data(True, stream=False) as data:
         with model_db(data_path) as db:
             db[hash] = {
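
Note (not part of the patch): with this change, passing ``analyze=True`` together with ``stream_file`` makes ``integrate`` spawn a ``multiprocessing.Process`` that reads trajectories from the fifo and runs the ``hopsflow`` analysis concurrently with the integration; the ``join()`` before ``get_data`` ensures the analysis has finished before results are collected. A minimal usage sketch follows. It assumes ``integrate`` takes the model as its first argument and the sample count as ``n`` (as suggested by the docstring), and that ``model`` is some ``hiro_models`` model exposing ``all_energies_online``; the fifo path and sample count are illustrative.

    from hiro_models.model_auxiliary import integrate

    def run_with_online_analysis(model, fifo_path: str = "trajectories.fifo", samples: int = 1000):
        """Integrate `model` while analyzing the streamed trajectories on the fly.

        `model` is assumed to expose `all_energies_online`, as required by the patch.
        """
        integrate(
            model,
            n=samples,              # number of trajectories to integrate
            stream_file=fifo_path,  # fifo; created via os.mkfifo if it does not exist
            analyze=True,           # start the hopsflow analysis in a child process
        )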