clean up calibration plot code

This commit is contained in:
Valentin Boettcher 2024-08-21 10:59:22 -04:00
parent 1f45ebb196
commit 79b42021a5
5 changed files with 365 additions and 255 deletions

View file

@ -1,89 +1,62 @@
"""A demonstration of the ringdown spectroscopy protocol."""
from rabifun.system import *
from rabifun.plots import *
from rabifun.utilities import *
from ringfit.utils import WelfordAggregator
from rabifun.analysis import *
from ringfit.data import ScanData
import functools
import multiprocessing
import copy
from scipy.ndimage import rotate
# %% interactive
class WelfordAggregator:
"""A class to aggregate values using the Welford algorithm.
def solve_shot(
params: Params, t: np.ndarray, t_before: np.ndarray, t_after: np.ndarray
):
"""A worker function to solve for the time evolution in separate processes.
The Welford algorithm is an online algorithm to calculate the mean
and variance of a series of values.
The aggregator keeps track of the number of samples the mean and
the variance. Aggregation of identical values is prevented by
checking the sample index. Tracking can be disabled by setting
the initial index to ``None``.
See also the `Wikipedia article
<https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Online_algorithm>`_.
:param first_value: The first value to aggregate.
:param params: The parameters of the system.
:param t: The time axis.
:param t_before: The time axis before the EOM is switched off.
:param t_after: The time axis after the EOM is switched off.
"""
__slots__ = ["n", "mean", "_m_2"]
def __init__(self, first_value: np.ndarray):
self.n = 1
self.mean = first_value
self._m_2 = np.zeros_like(first_value)
def update(self, new_value: np.ndarray):
"""Updates the aggregator with a new value."""
self.n += 1
delta = new_value - self.mean
self.mean += delta / self.n
delta2 = new_value - self.mean
self._m_2 += np.abs(delta) * np.abs(delta2)
@property
def sample_variance(self) -> np.ndarray:
"""
The empirical sample variance. (:math:`\sqrt{N-1}`
normalization.)
"""
if self.n == 1:
return np.zeros_like(self.mean)
return self._m_2 / (self.n - 1)
@property
def ensemble_variance(self) -> np.ndarray:
"""The ensemble variance."""
return self.sample_variance / self.n
@property
def ensemble_std(self) -> np.ndarray:
"""The ensemble standard deviation."""
return np.sqrt(self.ensemble_variance)
def solve_shot(t, params, t_before, t_after):
solution = solve(t, params)
amps = solution.y[::, len(t_before) - 1 :]
return t_after, amps
def make_shots(params, total_lifetimes, eom_range, eom_steps, num_freq):
def make_shots(
params: Params,
total_lifetimes: float,
eom_range: tuple[float, float],
eom_steps: int,
num_freq: int = 1,
):
"""Generate a series of shots with varying EOM frequencies.
The implementation here slightly varies the off time of the laser
so as to introduce some random relative phases of the modes.
:param params: The parameters of the system.
:param total_lifetimes: The total time of the experiment in
lifetimes.
:param eom_range: The range of EOM frequencies in units of
:any:`params.Ω`.
:param eom_steps: The number of steps in the EOM frequency range.
:param num_freq: The number of frequencies to drive. If a number
greater than 1 is given, the EOM will be driven at multiple
frequencies where the highest frequency is the base frequency
plus an consecutive integer multiples of :any:`params.Ω`.
"""
solutions = []
analyze_time = params.lifetimes(total_lifetimes) - params.laser_off_time
t_after = time_axis(params, total_time=analyze_time, resolution=0.01)
pool = multiprocessing.Pool()
shot_params = []
rng = np.random.default_rng(seed=0)
off_time = params.laser_off_time or 0
analyze_time = params.lifetimes(total_lifetimes) - off_time
t_after = time_axis(params, total_time=analyze_time, resolution=0.01)
for step in range(eom_steps):
base = params.Ω * (
@ -97,43 +70,59 @@ def make_shots(params, total_lifetimes, eom_range, eom_steps, num_freq):
)
current_params.drive_phases = rng.uniform(0, 2 * np.pi, size=num_freq)
off_time = rng.normal(params.laser_off_time, 0.1 * params.laser_off_time)
off_time = rng.normal(off_time, 0.1 * params.laser_off_time)
params.laser_off_time
current_params.laser_off_time = off_time
current_params.laser_off_time = None # off_time
current_params.drive_off_time = off_time
current_params.total_lifetimes = (off_time + analyze_time) / params.lifetimes(1)
t_before = time_axis(params, total_time=off_time, resolution=0.01)
t = np.concatenate([t_before[:-1], t_after + t_before[-1]])
shot_params.append((current_params, t, t_before, t_after))
shot_params.append((t, current_params, t_before, t_after))
pool = multiprocessing.Pool()
solutions = pool.starmap(solve_shot, shot_params)
return solutions
def process_shots(solutions, noise_amplitude, params):
def process_shots(
params: Params,
solutions: list[tuple[np.ndarray, np.ndarray]],
noise_amplitude: float,
num_freq: int,
):
"""
Calculates the normalized average Fourier power spectrum of a
series of experimental (simulated) shots.
:param params: The parameters of the system.
:param solutions: A list of solutions to process returned by
:any:`solve_shot`.
:param noise_amplitude: The amplitude of the noise to add to the
signal.
The amplitude is normalized by 2/η which is roughly the steady
state signal amplitude if a bath mode is excited resonantly by
a unit-strength input.
:param num_freq: The number of frequencies to drive. See
:any:`make_shots` for details.
"""
rng = np.random.default_rng(seed=0)
# let us get a measure calibrate the noise strength
signals = []
for t, amps in solutions:
signal = output_signal(t, amps, params)
signals.append((t, signal))
noise_amplitude *= 2 * 2 * np.pi / params.η
noise_amplitude /= params.η * np.pi
aggregate = None
for t, signal in signals:
for t, amps in solutions:
signal = output_signal(t, amps, params)
signal += rng.normal(scale=noise_amplitude, size=len(signal))
window = (0, t[-1])
freq, fft = fourier_transform(
t,
signal,
low_cutoff=0.1 * params.Ω,
high_cutoff=params.Ω * 5,
low_cutoff=0.3 * params.Ω,
high_cutoff=params.Ω * (1 + num_freq),
window=window,
)
@ -145,87 +134,128 @@ def process_shots(solutions, noise_amplitude, params):
else:
aggregate.update(power)
assert aggregate is not None # appease pyright
max_power = np.max(aggregate.mean)
return (freq, aggregate.mean / max_power, aggregate.ensemble_std / max_power)
def plot_power_spectrum(
ax_spectrum, freq, average_power_spectrum, σ_power_spectrum, params, annotate=True
def process_and_plot_results(
params: Params,
ax: plt.Axes,
freq: np.ndarray,
average_power_spectrum: np.ndarray,
σ_power_spectrum: np.ndarray,
annotate: bool = True,
):
# ax_spectrum.plot(freq, average_power_spectrum)
runtime = RuntimeParams(params)
"""
Fits the ringdown spectrum and plots the results.
:param params: The parameters of the system.
:param ax: The axis to plot on.
:param freq: The frequency array.
:param average_power_spectrum: The average power spectrum obtained from :any:`process_shots`.
:param σ_power_spectrum: The standard deviation of the power
spectrum.
:param annotate: Whether to annotate the plot with peak and mode positions.
"""
ringdown_params = RingdownParams(
fω_shift=params.measurement_detuning,
mode_window=(4, 4),
mode_window=(params.N, params.N),
fΩ_guess=params.Ω,
fδ_guess=params.Ω * params.δ,
η_guess=0.5,
absolute_low_cutoff=0.1 * params.Ω,
absolute_low_cutoff=0.3 * params.Ω,
)
peak_info = find_peaks(
freq, average_power_spectrum, ringdown_params, prominence=0.05, height=0.1
freq,
average_power_spectrum,
ringdown_params,
prominence=0.05,
height=0.1,
σ_power=σ_power_spectrum,
)
peak_info, lm_result = refine_peaks(
peak_info, ringdown_params, height_cutoff=0.05, σ=σ_power_spectrum
)
peak_info = refine_peaks(peak_info, ringdown_params, height_cutoff=0.05)
peak_info.power = average_power_spectrum
plot_spectrum_and_peak_info(
ax_spectrum, peak_info, ringdown_params, annotate=annotate
)
if lm_result is not None:
# print(lm_result.fit_report())
plot_spectrum_and_peak_info(ax, peak_info, annotate=annotate)
if peak_info.lm_result is not None:
fine_freq = np.linspace(freq.min(), freq.max(), 5000)
fine_fit = lm_result.eval(ω=fine_freq)
ax_spectrum.plot(fine_freq, fine_fit, color="red")
ax_spectrum.set_ylim(-0.1, max(1, fine_fit.max() * 1.1))
fine_fit = peak_info.lm_result.eval(ω=fine_freq)
ax.plot(fine_freq, fine_fit - peak_info.noise_floor, color="C3", zorder=-100)
ax.set_ylim(-0.1, max(1, fine_fit.max() * 1.1))
print(runtime.Ωs.real / (2 * np.pi))
ax.set_xlabel("Frequency (MHz)")
for i, peak_freq in enumerate(runtime.Ωs):
pos = np.abs(
params.measurement_detuning
- peak_freq.real / (2 * np.pi)
+ params.δ * params.Ω
+ params.laser_detuning,
)
ax_spectrum.axvline(
pos,
color="black",
alpha=0.5,
linestyle="--",
zorder=-100,
)
ax_spectrum.axvspan(
pos - peak_freq.imag / (2 * np.pi),
pos + peak_freq.imag / (2 * np.pi),
color="black",
alpha=0.05,
linestyle="--",
zorder=-100,
)
if annotate:
annotate_ringodown_mode_positions(params, ax)
def generate_data(
Ω=13,
η=0.2,
g_0=0.5,
η_factor=5,
noise_amplitude=0.3,
laser_detuning=0,
laser_on_mode=0,
N=10,
eom_ranges=(0.5, 2.5),
eom_steps=20,
small_loop_detuning=0,
excitation_lifetimes=2,
measurement_lifetimes=4,
num_freq=3,
extra_title="",
):
η = 0.2
Ω = 13
"""Simulate and plot the ringdown spectroscopy protocol.
The idea is to have the laser on ``laser_on_mode`` and to sweep
the EOM frequency over a range of values given in ``eom_ranges``
in ``eom_steps`` steps. For each step, the laser and EOM are
inputting into the system for a time given by
``excitation_lifetimes``. Then, the ringdown signal is collected
for a time given by ``measurement_lifetimes``. (Lifetime units
are given by ``η``.) The resulting power spectra are averaged and
then fitted.
:param Ω: The FSR of the system.
:param η: The decay rate of the system.
:param g_0: The coupling strength of the system in units of
:any:`Ω`. Note that the effective coupling strength between
the ``A`` site and the bath modes is reduced by a factor of
:math:`\sqrt{2}`.
:param η_factor: The factor by which the decay rate of the A site
is greater.
:param noise_amplitude: The amplitude of the noise to add to the
signal. See :any:`process_shots` for details.
:param laser_detuning: The detuning of the laser from the the mode
it is exciting.
:param laser_on_mode: The mode that the laser is exciting.
:param N: The number of bath modes.
:param eom_ranges: The range of EOM frequencies in units of
:any:`Ω`.
:param eom_steps: The number of steps in the EOM frequency range.
:param excitation_lifetimes: The time the EOM is driving the
system.
:param measurement_lifetimes: The time the system is left to ring
down.
Note that the laser is not turned off during the ringdown.
:param num_freq: The number of frequencies to drive. See
:any:`make_shots` for details.
:param extra_title: A string to add to the title of the plot.
:returns: The figure containing the plot.
"""
final_laser_detuning = laser_detuning + (
0 if laser_on_mode == 0 else (laser_on_mode - 1 / 4) * Ω
)
params = Params(
η=η,
@ -233,8 +263,8 @@ def generate_data(
Ω=Ω,
δ=1 / 4,
ω_c=0.1,
g_0=g_0,
laser_detuning=laser_detuning,
g_0=g_0 * num_freq, # as it would be normalized otherwise
laser_detuning=final_laser_detuning,
N=N,
N_couplings=N,
measurement_detuning=0,
@ -242,8 +272,8 @@ def generate_data(
rwa=False,
flat_energies=False,
correct_lamb_shift=0,
laser_off_time=0,
small_loop_detuning=small_loop_detuning,
laser_off_time=None,
small_loop_detuning=0,
drive_override=(np.array([]), np.array([])),
)
@ -258,84 +288,42 @@ def generate_data(
num_freq,
)
(sol_on_res) = make_shots(
params,
excitation_lifetimes + measurement_lifetimes,
((1 + params.δ), (1 + params.δ)),
1,
num_freq,
)
(sol_on_res_bath) = make_shots(
params,
excitation_lifetimes + measurement_lifetimes,
((1 + params.δ * 1.1), (1 + params.δ * 1.1)),
1,
num_freq,
)
freq, average_power_spectrum, σ_power_spectrum = process_shots(
solutions, noise_amplitude, params
)
_, spectrum_on_resonance, σ_power_spectrum_on_resonance = process_shots(
sol_on_res, noise_amplitude, params
)
_, spectrum_on_resonance_bath, σ_power_spectrum_on_resonance_bath = process_shots(
sol_on_res_bath, noise_amplitude, params
params,
solutions,
noise_amplitude,
num_freq,
)
fig = make_figure()
fig = make_figure(extra_title, figsize=(10, 6))
fig.clear()
ax = fig.subplots()
fig.suptitle(f"""
Spectroscopy Protocol V2
Ω/2π = {params.Ω}MHz, η/2π = {params.η}MHz, g_0 = {params.g_0}Ω, N = {params.N}
noise amplitude = {noise_amplitude} * 2/η, η_A = {η_factor} x η, EOM stepped from {eom_ranges[0]:.2f}Ω to {eom_ranges[1]:.2f}Ω in {eom_steps} steps
total time = {(excitation_lifetimes + measurement_lifetimes) * eom_steps / (params.η * 1e6)}s
""")
ax_multi, ax_single, ax_single_bath = fig.subplot_mosaic("AA\nBC").values()
plot_power_spectrum(
ax_multi, freq, average_power_spectrum, σ_power_spectrum, params
process_and_plot_results(params, ax, freq, average_power_spectrum, σ_power_spectrum)
ax.text(
0.01,
0.95,
f"""$Ω/2π = {params.Ω}$MHz
$η/2π = {params.η}MHz$
$g_0 = {params.g_0}Ω$
$N = {params.N}$
noise = ${noise_amplitude * 2}$
$η_A = {η_factor}η$
EOM range = {eom_ranges[0]:.2f}Ω to {eom_ranges[1]:.2f}Ω
EOM steps = {eom_steps}
excitation time = {excitation_lifetimes} lifetimes
measurement time = {measurement_lifetimes} lifetimes
on mode = {laser_on_mode}
laser detuning = {laser_detuning}
num freq = {num_freq}
total time = {(excitation_lifetimes + measurement_lifetimes) * eom_steps / (params.η * 1e6)}s""",
transform=ax.transAxes,
ha="left",
va="top",
size=10,
bbox=dict(boxstyle="round", facecolor="wheat", alpha=0.5),
)
plot_power_spectrum(
ax_single,
freq,
spectrum_on_resonance,
σ_power_spectrum_on_resonance,
params,
annotate=False,
)
plot_power_spectrum(
ax_single_bath,
freq,
spectrum_on_resonance_bath,
σ_power_spectrum_on_resonance_bath,
params,
annotate=False,
)
runtime = RuntimeParams(params)
for ax in [ax_multi, ax_single, ax_single_bath]:
ax.set_xlabel("Frequency (MHz)")
ax.sharex(ax_multi)
ax.sharey(ax_multi)
ax_ticks = ax.twiny()
ax_ticks.sharey(ax)
ax_ticks.set_xticks(runtime.ringdown_frequencies)
ax_ticks.set_xticklabels(
[mode_name(i, params.N) for i in range(2 * params.N + 2)]
)
ax_ticks.plot(freq, np.zeros_like(freq), alpha=0)
ax_ticks.set_xlim(ax.get_xlim())
ax_multi.set_title("Averaged Power Spectrum")
ax_single.set_title("Single-shot, No detuning")
ax_single_bath.set_title("Single-shot, EOM 10% detuned")
# ax_spectrum.set_yscale(yscale)
ax.set_title(extra_title)
fig.tight_layout()
return fig
@ -344,15 +332,31 @@ def generate_data(
# %% save
if __name__ == "__main__":
fig = generate_data(
g_0=0.5,
g_0=0.2,
η_factor=5,
noise_amplitude=5e-3,
noise_amplitude=0.3,
N=5,
eom_ranges=(0.7, 0.9), # (1.9, 2.1),
eom_ranges=(0.7, 0.9),
eom_steps=100,
small_loop_detuning=0,
laser_detuning=0,
excitation_lifetimes=1,
measurement_lifetimes=3,
laser_on_mode=0,
excitation_lifetimes=2,
measurement_lifetimes=4,
num_freq=4,
extra_title="Laser on A site",
)
fig = generate_data(
g_0=0.2,
η_factor=5,
noise_amplitude=0.3,
N=5,
eom_ranges=(1.2, 1.3),
eom_steps=100,
laser_detuning=0,
laser_on_mode=-1,
excitation_lifetimes=2,
measurement_lifetimes=4,
num_freq=1,
extra_title="Laser on Bath Mode",
)

View file

@ -121,7 +121,7 @@ class RingdownPeakData:
"""The fft frequency array."""
power: np.ndarray
"""The power spectrum of the fft."""
"""The normalized power spectrum of the fft."""
peaks: np.ndarray
"""The indices of the peaks."""
@ -132,6 +132,9 @@ class RingdownPeakData:
peak_info: dict
"""The information from :any:`scipy.signal.find_peaks`."""
σ_power: np.ndarray | None = None
"""The standard deviation of the power spectrum."""
peak_widths: np.ndarray | None = None
"""
The widths of the peaks.
@ -146,10 +149,23 @@ class RingdownPeakData:
lorentz_params: list | None = None
"""The lorentzian fit params to be fed into :any:`lorentzian`."""
lm_result: lmfit.model.ModelResult | None = None
"""The fit result from :any:`lmfit`."""
noise_floor: float = 0
"""The noise floor of the spectrum."""
@property
def is_refined(self) -> bool:
"""Whether the peaks have been refined with :any:`refine_peaks`."""
return self.peak_widths is not None
return self.lm_result is not None
def __post_init__(self):
norm = np.max(self.power)
self.power /= norm
if self.σ_power is not None:
self.σ_power /= norm
def find_peaks(
@ -158,6 +174,7 @@ def find_peaks(
params: RingdownParams,
prominence: float = 0.005,
height: float = 0.1,
σ_power: np.ndarray | None = None,
) -> RingdownPeakData:
"""Determine the peaks of the power spectrum of the
ringdown data.
@ -168,7 +185,7 @@ def find_peaks(
:param prominence: The prominence (vertical distance of peak from
surrounding valleys) of the peaks.
:param height: The minimum height of the peaks.
:param σ_power: The standard deviation of the power spectrum.
"""
freq_step = freq[1] - freq[0]
@ -195,6 +212,7 @@ def find_peaks(
peak_freqs=peak_freqs,
peak_info=peak_info,
power=power_spectrum,
σ_power=σ_power,
)
@ -210,6 +228,11 @@ def filter_peaks(
to_be_deleted: list = [],
):
deleted_peaks = []
if not peaks.is_refined:
return peaks
peaks = dataclasses.replace(peaks)
for i in reversed(range(len(peaks.peak_freqs))):
A, ω0, γ = peaks.lorentz_params[i]
Δω0, Δγ = peaks.Δpeak_freqs[i], peaks.Δpeak_widths[i]
@ -221,11 +244,11 @@ def filter_peaks(
or A > 5
or Δγ > uncertainty_threshold * params.fΩ_guess
):
np.delete(peaks.peaks, i)
np.delete(peaks.peak_freqs, i)
np.delete(peaks.Δpeak_freqs, i)
np.delete(peaks.peak_widths, i)
np.delete(peaks.Δpeak_widths, i)
peaks.peaks = np.delete(peaks.peaks, i)
peaks.peak_freqs = np.delete(peaks.peak_freqs, i)
peaks.Δpeak_freqs = np.delete(peaks.Δpeak_freqs, i)
peaks.peak_widths = np.delete(peaks.peak_widths, i)
peaks.Δpeak_widths = np.delete(peaks.Δpeak_widths, i)
del peaks.lorentz_params[i]
deleted_peaks.append(i)
@ -243,10 +266,14 @@ def refine_peaks(
params: RingdownParams,
uncertainty_threshold: float = 0.2,
height_cutoff: float = 0.1,
σ: np.ndarray | None = None,
):
) -> RingdownPeakData:
"""
Refine the peak positions and frequencies by fitting Lorentzians.
Refine the peak positions and frequencies by fitting a sum of
Lorentzians. The peaks are filtered according to the
``height_cutoff``, ``uncertainty_threshold`` and other criteria
and the fit repeated until nothing changes. The results are
stored in a copy of ``peaks``, among them the last successful
:any:`lmfit` fit result.
:param peaks: The peak data.
:param params: The ringdown parameters.
@ -256,7 +283,7 @@ def refine_peaks(
"""
if len(peaks.peaks) == 0:
return peaks, None
return peaks
peaks = dataclasses.replace(peaks)
freqs = peaks.freq
@ -268,8 +295,7 @@ def refine_peaks(
scaled_power = power
if σ is None:
σ = np.zeros_like(power)
σ = np.zeros_like(power) if peaks.σ_power is None else peaks.σ_power
for i, (A, ω0) in enumerate(zip(peaks.peak_info["peak_heights"], peak_freqs)):
model = lmfit.Model(lorentzian, prefix=f"peak_{i}_")
@ -319,7 +345,7 @@ def refine_peaks(
peaks.lorentz_params = [None] * len(peaks.peak_freqs)
for i in reversed(range(len(peaks.peak_freqs))):
for i in range(len(peaks.peak_freqs)):
peak_prefix = f"peak_{i}_"
A, ω0, γ = (
@ -342,13 +368,16 @@ def refine_peaks(
peaks.lorentz_params[i] = A, ω0, γ
peaks.lm_result = lm_result
peaks.noise_floor = lm_result.best_values["offset"]
before_filter = len(peaks.peaks)
peaks = filter_peaks(peaks, params, uncertainty_threshold, height_cutoff)
if len(peaks.peaks) < before_filter:
return refine_peaks(peaks, params, uncertainty_threshold, height_cutoff)
return peaks, lm_result
return peaks
class StepType(Enum):

View file

@ -213,53 +213,43 @@ def plot_rwa_vs_real_amplitudes(ax, solution_no_rwa, solution_rwa, params, **kwa
return no_rwa_lines, rwa_lines
def plot_spectrum_and_peak_info(
ax, peaks: RingdownPeakData, params: RingdownParams, annotate=False
):
def plot_spectrum_and_peak_info(ax, peaks: RingdownPeakData, annotate=False):
"""Plot the fft spectrum with peaks.
:param ax: The axis to plot on.
:param peaks: The peak data.
:param params: The ringdown parameters.
:param annotate: Whether to annotate the peaks.
"""
ax.clear()
ax.plot(
peaks.freq,
peaks.power,
peaks.power - peaks.noise_floor,
label="FFT Power",
color="C0",
linewidth=0.5,
linewidth=0.1,
marker="o",
markersize=2,
markersize=3,
)
fine_freq = np.linspace(peaks.freq[0], peaks.freq[-1], 1000)
if annotate:
for i, (freq, Δfreq, lorentz) in enumerate(
zip(peaks.peak_freqs, peaks.Δpeak_freqs, peaks.lorentz_params)
):
# ax.plot(
# freq,
# max(lorentz[0], 1),
# "x",
# label="Peaks",
# color="C2",
# )
roundfreq, rounderr = scientific_round(freq, Δfreq)
t = ax.text(
freq,
ax.get_ylim()[1] * 0.9,
lorentz[0] * 1.1,
rf"{i} (${roundfreq}\pm {rounderr}$)",
fontsize=20,
)
t.set_bbox(dict(facecolor="white", alpha=1, edgecolor="white"))
t.set_bbox(dict(facecolor="white", alpha=0.8, edgecolor="white"))
ax.plot(
peaks.freq,
lorentzian(peaks.freq, *lorentz),
fine_freq,
lorentzian(fine_freq, *lorentz),
"--",
color="C2",
alpha=0.5,
@ -268,12 +258,43 @@ def plot_spectrum_and_peak_info(
ax.set_title("FFT Spectrum")
ax.set_xlabel("ω [linear]")
ax.set_ylabel("Power")
ax.axvline(
params.fω_shift,
color="gray",
linestyle="--",
zorder=-10,
label="Frequency Shift",
)
ax.set_xlim(peaks.freq[0], peaks.freq[-1])
ax.legend()
def annotate_ringodown_mode_positions(params: Params, ax):
"""
Add y ticks and vertical lines indicating the theoretical
frequencies and widths of the modes in that can appear in the
ringdown spectrum.
:param params: The system parameters.
:param ax: The pyplot axis to annotate.
"""
runtime = RuntimeParams(params)
ax_ticks = ax.twiny()
ax_ticks.sharey(ax)
ax_ticks.set_xticks(runtime.ringdown_frequencies)
ax_ticks.set_xticklabels([mode_name(i, params.N) for i in range(2 * params.N + 2)])
ax_ticks.set_xlim(ax.get_xlim())
for pos, peak_freq in zip(runtime.ringdown_frequencies, runtime.Ωs):
ax.axvline(
pos,
color="black",
alpha=0.5,
linestyle="--",
zorder=-100,
)
ax.axvspan(
pos - peak_freq.imag / (2 * np.pi),
pos + peak_freq.imag / (2 * np.pi),
color="black",
alpha=0.05,
linestyle="--",
zorder=-100,
)
return ax

View file

@ -125,7 +125,7 @@ class Params:
"""
return n / self.Ω
def lifetimes(self, n: float):
def lifetimes(self, n: float) -> float:
"""
Returns the number of lifetimes of the system that correspond to
`n` cycles.

View file

@ -46,3 +46,59 @@ def smoothe_signal(
window = int(window_size / time_step)
return uniform_filter1d(signal, window)
class WelfordAggregator:
"""A class to aggregate values using the Welford algorithm.
The Welford algorithm is an online algorithm to calculate the mean
and variance of a series of values.
The aggregator keeps track of the number of samples the mean and
the variance. Aggregation of identical values is prevented by
checking the sample index. Tracking can be disabled by setting
the initial index to ``None``.
See also the `Wikipedia article
<https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Online_algorithm>`_.
:param first_value: The first value to aggregate.
"""
__slots__ = ["n", "mean", "_m_2"]
def __init__(self, first_value: np.ndarray):
self.n: int = 1
self.mean: np.ndarray = first_value
self._m_2 = np.zeros_like(first_value)
def update(self, new_value: np.ndarray):
"""Updates the aggregator with a new value."""
self.n += 1
delta = new_value - self.mean
self.mean += delta / self.n
delta2 = new_value - self.mean
self._m_2 += np.abs(delta) * np.abs(delta2)
@property
def sample_variance(self) -> np.ndarray:
"""
The empirical sample variance. (:math:`\sqrt{N-1}`
normalization.)
"""
if self.n == 1:
return np.zeros_like(self.mean)
return self._m_2 / (self.n - 1)
@property
def ensemble_variance(self) -> np.ndarray:
"""The ensemble variance."""
return self.sample_variance / self.n
@property
def ensemble_std(self) -> np.ndarray:
"""The ensemble standard deviation."""
return np.sqrt(self.ensemble_variance)