HOPSFlow-Paper/otto_utilities.py

811 lines
22 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import matplotlib.pyplot as plt
import plot_utils as pu
from hiro_models.otto_cycle import OttoEngine, SmoothlyInterpolatdPeriodicMatrix
from hops.util.dynamic_matrix import *
import numpy as np
import figsaver as fs
import hiro_models.model_auxiliary as aux
from typing import Iterable
import qutip as qt
import itertools
def plot_power_eff_convergence(models, steady_idx=2):
f, (a_power, a_efficiency) = plt.subplots(ncols=2)
a_efficiency.set_yscale("log")
for model in models:
try:
Ns = model.power(steady_idx=steady_idx).Ns
a_power.plot(Ns, model.power(steady_idx=steady_idx).values)
a_efficiency.plot(
Ns, np.abs(model.efficiency(steady_idx=steady_idx).values)
)
except:
pass
a_power.set_xlabel("$N$")
a_power.set_ylabel("$P$")
a_efficiency.set_xlabel("$N$")
a_efficiency.set_ylabel("$\eta$")
return f, (a_power, a_efficiency)
@pu.wrap_plot
def plot_powers_and_efficiencies(x, models, steady_idx=2, ax=None, xlabel=""):
powers = [-model.power(steady_idx=steady_idx).value for model in models]
powers_σ = [model.power(steady_idx=steady_idx).σ for model in models]
ax.axhline(0, color="lightgray")
system_powers = [
val_relative_to_steady(
model,
-1 * model.system_power().integrate(model.t) * 1 / model.Θ,
steady_idx=steady_idx,
)[1].value[-1]
for model in models
]
system_powers_σ = [
val_relative_to_steady(
model,
-1 * model.system_power().integrate(model.t) * 1 / model.Θ,
steady_idx=steady_idx,
)[1].σ[-1]
for model in models
]
interaction_powers = [
val_relative_to_steady(
model,
-1 * model.interaction_power().sum_baths().integrate(model.t) * 1 / model.Θ,
steady_idx=steady_idx,
)[1].value[-1]
for model in models
]
interaction_powers_σ = [
val_relative_to_steady(
model,
-1 * model.interaction_power().sum_baths().integrate(model.t) * 1 / model.Θ,
steady_idx=steady_idx,
)[1].σ[-1]
for model in models
]
efficiencies = np.array(
[100 * model.efficiency(steady_idx=steady_idx).value for model in models]
)
efficiencies_σ = np.array(
[100 * model.efficiency(steady_idx=steady_idx).σ for model in models]
)
mask = efficiencies > 0
a2 = ax.twinx()
ax.errorbar(x, powers, yerr=powers_σ, marker=".", label=r"$\bar{P}$")
ax.errorbar(
x,
system_powers,
yerr=system_powers_σ,
marker=".",
label=r"$\bar{P}_{\mathrm{sys}}$",
)
ax.errorbar(
x,
interaction_powers,
yerr=interaction_powers_σ,
marker=".",
label=r"$\bar{P}_{\mathrm{int}}$",
)
ax.legend()
lines = a2.errorbar(
np.asarray(x)[mask],
efficiencies[mask],
yerr=efficiencies_σ[mask],
marker="*",
color="C4",
label=r"$\eta$",
)
a2.legend(loc="upper left")
ax.set_xlabel(xlabel)
ax.set_ylabel(r"$\bar{P}$", color="C0")
a2.set_ylabel(r"$\eta$", color="C4")
return ax, a2
def plot_multi_powers_and_efficiencies(
x, multi_models, titles, steady_idx=2, xlabel=""
):
fig, axs = plt.subplots(nrows=2, ncols=2)
(efficiency, power, system_power, interaction_power) = axs.flatten()
markers = itertools.cycle((".", "+", "*", ",", "o"))
for models, title, marker in zip(multi_models, titles, [".", "^", "*"]):
powers = [-model.power(steady_idx=steady_idx).value for model in models]
powers_σ = [model.power(steady_idx=steady_idx).σ for model in models]
system_powers = [
val_relative_to_steady(
model,
-1 * model.system_power().integrate(model.t) * 1 / model.Θ,
steady_idx=steady_idx,
)[1].value[-1]
for model in models
]
system_powers_σ = [
val_relative_to_steady(
model,
-1 * model.system_power().integrate(model.t) * 1 / model.Θ,
steady_idx=steady_idx,
)[1].σ[-1]
for model in models
]
interaction_powers = [
val_relative_to_steady(
model,
-1
* model.interaction_power().sum_baths().integrate(model.t)
* 1
/ model.Θ,
steady_idx=steady_idx,
)[1].value[-1]
for model in models
]
interaction_powers_σ = [
val_relative_to_steady(
model,
-1
* model.interaction_power().sum_baths().integrate(model.t)
* 1
/ model.Θ,
steady_idx=steady_idx,
)[1].σ[-1]
for model in models
]
efficiencies = np.array(
[100 * model.efficiency(steady_idx=steady_idx).value for model in models]
)
efficiencies_σ = np.array(
[100 * model.efficiency(steady_idx=steady_idx).σ for model in models]
)
mask = efficiencies > 0
power.plot(x, powers, marker=marker)
system_power.plot(
x,
system_powers,
marker=marker,
)
interaction_power.plot(
x,
interaction_powers,
marker=marker,
)
efficiency.plot(
np.asarray(x)[mask],
efficiencies[mask],
marker=marker,
label=title,
)
efficiency.set_title(r"$\eta$")
power.set_title(r"$\bar{P}$")
system_power.set_title(
r"$\bar{P}_{\mathrm{sys}}$",
)
interaction_power.set_title(
r"$\bar{P}_{\mathrm{int}}$",
)
fig.supxlabel(xlabel)
fig.legend(loc="lower center", bbox_to_anchor=(0.5, -0.1), ncol=3)
return fig, axs
@pu.wrap_plot
def plot_cycle(model: OttoEngine, ax=None):
assert ax is not None
ax.plot(
model.t, model.coupling_operators[0].operator_norm(model.t) * 2, label=r"$L_c$"
)
ax.plot(
model.t, model.coupling_operators[1].operator_norm(model.t) * 2, label=r"$L_h$"
)
ax.plot(
model.t,
(model.H.operator_norm(model.t)) / model.H.operator_norm(model.τ_compressed),
label="$H_{\mathrm{sys}}$",
)
ax.set_xlim((0, model.Θ))
ax.set_xlabel(r"$\tau$")
ax.set_ylabel(r"Operator Norm")
ax.legend()
@pu.wrap_plot
def plot_cycles(
models: list[OttoEngine],
ax=None,
H_for_all=False,
H=True,
L_for_all=True,
bath=None,
legend=False,
):
assert ax is not None
model = models[0]
if H:
ax.plot(
model.t,
(model.H.operator_norm(model.t))
/ model.H.operator_norm(model.τ_compressed),
label=f"$H_1$",
)
for index, name in enumerate(["c", "h"]):
if bath is None or bath == index:
ax.plot(
model.t,
model.coupling_operators[index].operator_norm(model.t) * 2,
label=rf"$L_{{{name},1}}$",
)
ax.set_xlim((0, model.Θ))
ax.set_xlabel(r"$\tau$")
ax.set_ylabel(r"Operator Norm")
for i, model in enumerate(models[1:]):
if H and H_for_all:
ax.plot(
model.t,
(model.H.operator_norm(model.t))
/ model.H.operator_norm(model.τ_compressed),
label=f"$H_1$",
)
if L_for_all:
for index, name in enumerate(["c", "h"]):
if bath is None or bath == index:
ax.plot(
model.t,
model.coupling_operators[index].operator_norm(model.t) * 2,
label=rf"$L_{{{name},{i+2}}}$",
)
legend and ax.legend()
@pu.wrap_plot
def plot_sd_overview(model: OttoEngine, ax=None):
assert ax is not None
gaps = model.energy_gaps
ω = np.linspace(0.0001, gaps[-1] + gaps[0], 1000)
for ω_i, label, i in zip(gaps, ["Cold", "Hot"], range(len(gaps))):
lines = ax.plot(
ω,
model.full_thermal_spectral_density(i)(ω) * model.bcf_scales[i],
label=f"{label} $T={model.T[i]}$",
)
ax.plot(
ω,
model.spectral_density(i)(ω) * model.bcf_scales[i],
label=f"{label} $T=0$",
color=pu.lighten_color(lines[0].get_color()),
linestyle="--",
)
ax.plot(
ω_i,
model.full_thermal_spectral_density(i)(ω_i) * model.bcf_scales[i],
marker="o",
color=lines[0].get_color(),
)
# plt.plot(ω, model.full_thermal_spectral_density(1)(ω) * model.bcf_scales[1])
# plt.plot(
# 2, model.full_thermal_spectral_density(1)(2) * model.bcf_scales[1], marker="o"
# )
ax.set_xlabel(r"$\omega$")
ax.set_ylabel(r"Spectral Density")
ax.legend()
def full_report(model):
cyc = plot_cycle(model)
sd = plot_sd_overview(model)
f, a = plot_energy(model)
pu.plot_with_σ(model.t, model.total_energy(), ax=a)
power = model.power()
η = model.efficiency() * 100
print(
fs.tex_value(power.value, err=power.σ, prefix="P="),
)
print(
fs.tex_value(η.value, err=η.σ, prefix=r"\eta="),
)
def plot_energy(model):
f, a = pu.plot_energy_overview(
model,
strobe_data=model.strobe,
hybrid=True,
bath_names=["cold", "hot"],
online=True,
)
a.legend()
return f, a
def integrate_online(model, n, stream_folder=None, **kwargs):
aux.integrate(
model,
n,
stream_file=("" if stream_folder is None else stream_folder)
+ f"results_{model.hexhash}.fifo",
analyze=True,
**kwargs,
)
def get_sample_count(model):
try:
with aux.get_data(model) as d:
return d.samples
except:
return 0
def integrate_online_multi(models, n, *args, increment=1000, **kwargs):
target = increment
while target <= n:
current_target = min([n, target])
for model in models:
count = get_sample_count(model)
if count < current_target:
integrate_online(model, current_target, *args, **kwargs)
target += increment
@pu.wrap_plot
def plot_3d_heatmap(models, value_accessor, x_spec, y_spec, normalize=False, ax=None):
value_dict = {}
x_labels = set()
y_labels = set()
for model in models:
x_label = x_spec(model)
y_label = y_spec(model)
value = value_accessor(model)
if x_label not in value_dict:
value_dict[x_label] = {}
if y_label in value_dict[x_label]:
raise ValueError(
f"Dublicate value for model with x={x_label}, y={y_label}."
)
value_dict[x_label][y_label] = value_accessor(model)
x_labels.add(x_label)
y_labels.add(y_label)
x_labels = np.sort(list(x_labels))
y_labels = np.sort(list(y_labels))
_xx, _yy = np.meshgrid(x_labels, y_labels, indexing="ij")
x, y = _xx.ravel(), _yy.ravel()
values = np.fromiter((value_dict[_x][_y] for _x, _y in zip(x, y)), dtype=float)
dx = x_labels[1] - x_labels[0]
dy = y_labels[1] - y_labels[0]
x -= dx / 2
y -= dy / 2
normalized_values = abs(values) - abs(values).min()
normalized_values /= abs(normalized_values).max()
cmap = plt.get_cmap("plasma")
colors = [cmap(power) for power in normalized_values]
ax.bar3d(
x,
y,
np.zeros_like(values),
dx,
dy,
values / abs(values).max() if normalize else values,
color=colors,
)
ax.set_xticks(x_labels)
ax.set_yticks(y_labels)
@pu.wrap_plot
def plot_contour(
models, value_accessor, x_spec, y_spec, normalize=False, ax=None, levels=None
):
value_dict = {}
x_labels = set()
y_labels = set()
for model in models:
x_label = x_spec(model)
y_label = y_spec(model)
value = value_accessor(model)
if x_label not in value_dict:
value_dict[x_label] = {}
if y_label in value_dict[x_label]:
raise ValueError(
f"Dublicate value for model with x={x_label}, y={y_label}."
)
value_dict[x_label][y_label] = value_accessor(model)
x_labels.add(x_label)
y_labels.add(y_label)
x_labels = np.sort(list(x_labels))
y_labels = np.sort(list(y_labels))
_xx, _yy = np.meshgrid(x_labels, y_labels, indexing="ij")
x, y = _xx.ravel(), _yy.ravel()
values = (
np.fromiter((value_dict[_x][_y] for _x, _y in zip(x, y)), dtype=float)
.reshape(len(x_labels), len(y_labels))
.T
)
normalized_values = abs(values) - abs(values).min()
normalized_values /= abs(normalized_values).max()
cont = ax.contourf(
x_labels,
y_labels,
values / abs(values).max() if normalize else values,
levels=levels,
)
ax.set_xticks(x_labels)
ax.set_yticks(y_labels)
return cont, (x_labels, y_labels, values)
def get_steady_times(model, steady_idx, shift=0):
shift_idx = int(1 / model.dt * shift)
begin_idx = model.strobe[1][steady_idx] - shift_idx
end_idx = -shift_idx if shift != 0 else -2
return model.t[begin_idx - 1 : end_idx]
def val_relative_to_steady(model, val, steady_idx, shift=0, absolute=False):
shift_idx = int(1 / model.dt * shift)
begin_idx = model.strobe[1][steady_idx] - shift_idx
end_idx = -shift_idx if shift != 0 else -2
final_value = val.slice(slice(begin_idx - 1, end_idx, 1))
if not absolute:
final_value = final_value - val.slice(begin_idx - 1)
return (model.t[begin_idx - 1 : end_idx], final_value)
def timings(τ_c, τ_i):
τ_th = (1 - 2 * τ_c) / 2
τ_i_on = τ_th - 2 * τ_i
timings_H = (0, τ_c, τ_c + τ_th, 2 * τ_c + τ_th)
timings_L_hot = (τ_c, τ_c + τ_i, τ_c + τ_i + τ_i_on, τ_c + 2 * τ_i + τ_i_on)
timings_L_cold = tuple(time + timings_H[2] for time in timings_L_hot)
return timings_H, (timings_L_cold, timings_L_hot)
def model_description(model):
return model.description
@pu.wrap_plot
def plot_steady_energy_changes(
models,
steady_idx=2,
label_fn=model_description,
bath=None,
ax=None,
with_shift=False,
shift_min_inter=False,
):
times, inters, systems = [], [], []
for model in models:
t, inter = val_relative_to_steady(
model,
(
model.interaction_power().sum_baths()
if bath is None
else model.interaction_power().for_bath(bath)
).integrate(model.t),
steady_idx,
shift=model.L_shift[0] if with_shift else 0,
)
t, sys = val_relative_to_steady(
model,
model.system_power().sum_baths().integrate(model.t),
steady_idx,
)
inters.append(inter)
systems.append(sys)
times.append(t)
if shift_min_inter:
for i, inter in enumerate(inters):
length = len(inter.value)
inters[i] -= (inter.slice(slice(0, length // 3))).max.value
for inter, sys, t, model in zip(inters, systems, times, models):
print(model.L_shift)
_, _, (l, _) = pu.plot_with_σ(
t,
-1 * inter,
ax=ax,
label=rf"$W_\mathrm{{int}}$ {label_fn(model)}",
linestyle="--",
)
pu.plot_with_σ(
t,
-1 * sys,
ax=ax,
label=rf"$W_\mathrm{{sys}}$ {label_fn(model)}",
color=l[0].get_color(),
)
ax.set_xlabel(r"$\tau$")
ax.set_ylabel(r"$W$")
ax.legend()
def add_arrow(line, start_ind=None, direction="right", size=15, color=None):
"""
add an arrow to a line.
line: Line2D object
position: x-position of the arrow. If None, mean of xdata is taken
direction: 'left' or 'right'
size: size of the arrow in fontsize points
color: if None, line color is taken.
"""
if color is None:
color = line.get_color()
xdata = line.get_xdata()
ydata = line.get_ydata()
if direction == "right":
end_ind = start_ind + 1
else:
end_ind = start_ind - 1
line.axes.annotate(
"",
xytext=(xdata[start_ind], ydata[start_ind]),
xy=(xdata[end_ind], ydata[end_ind]),
arrowprops=dict(arrowstyle="->", color=color),
size=size,
)
@pu.wrap_plot
def plot_state_change_diagram(modulation, value, phase_indices, ax=None):
all_modulation = model.coupling_operators[bath].operator_norm(t)
phase_indices = (
np.array(model.coupling_operators[bath]._matrix._timings) * (len(t) - 1)
).astype(np.int64)
modulation_windowed = all_modulation[phase_indices[0] : phase_indices[-1]]
value_windowed = inter.value[phase_indices[0] : phase_indices[-1]]
ax.plot(modulation_windowed, value_windowed, linewidth=3, color="cornflowerblue")
ax.add_collection(
color_gradient(
modulation_windowed, value_windowed, "cornflowerblue", "red", linewidth=3
)
)
for begin, end in zip(phase_indices[:-1], phase_indices[1:]):
ax.scatter(modulation[begin], value[begin], zorder=100, marker=".", s=200)
for i, index in enumerate(phase_indices[:-1]):
ax.text(
modulation[index] + np.max(modulation) * 0.02,
value[index] + np.max(np.abs(value)) * 0.01,
str(i + 1),
)
return fig, ax
def get_modulation_and_value(model, operator, value, steady_idx=2):
shift = 0
timing_operator = operator
while not isinstance(timing_operator, SmoothlyInterpolatdPeriodicMatrix):
if isinstance(operator, Shift):
shift = operator._delta
timing_operator = operator._matrix
if isinstance(operator, DynamicMatrixSum):
timing_operator = operator._left
t, value = val_relative_to_steady(model, value, steady_idx, absolute=True)
all_modulation = operator.operator_norm(t)
all_modulation_deriv = operator.derivative().operator_norm(t)
timings = np.array(timing_operator._timings)
phase_indices = (((timings + shift / model.Θ) % 1) * (len(t) - 1)).astype(np.int64)
values = np.zeros_like(all_modulation)
np.divide(
value.value, all_modulation, where=np.abs(all_modulation) > 1e-3, out=values
),
return (
values,
all_modulation,
phase_indices,
)
def plot_modulation_system_diagram(model, steady_idx):
fig, ax = plt.subplots()
t, system = val_relative_to_steady(
model,
(model.system_energy().sum_baths()),
steady_idx,
)
modulation = model.H.operator_norm(t)
ax.plot(modulation, system.value)
ax.set_xlabel(r"$||H_\mathrm{S}||$")
ax.set_ylabel(r"$\langle{H_\mathrm{S}}\rangle$")
return fig, ax
def plot_steady_work_baths(models, steady_idx=2, label_fn=model_description):
fig, ax = plt.subplots()
for model in models:
t, inter_c = val_relative_to_steady(
model,
(model.interaction_power().for_bath(0)).integrate(model.t),
steady_idx,
)
t, inter_h = val_relative_to_steady(
model,
(model.interaction_power().for_bath(1)).integrate(model.t),
steady_idx,
)
pu.plot_with_σ(
t,
inter_c,
ax=ax,
label=rf"$W_\mathrm{{int, c}}$ {label_fn(model)}",
)
pu.plot_with_σ(
t,
inter_h,
ax=ax,
label=rf"$W_\mathrm{{int, h}}$ {label_fn(model)}",
linestyle="--",
)
ax.set_xlabel(r"$\tau$")
ax.set_ylabel(r"$W$")
ax.legend()
return fig, ax
@pu.wrap_plot
def plot_bloch_components(model, ax=None, **kwargs):
with aux.get_data(model) as data:
ρ = data.rho_t_accum.mean[:]
σ_ρ = data.rho_t_accum.ensemble_std[:]
xs = np.einsum("tij,ji->t", ρ, qt.sigmax().full()).real
ys = np.einsum("tij,ji->t", ρ, qt.sigmay().full()).real
zs = np.einsum("tij,ji->t", ρ, qt.sigmaz().full()).real
ax.plot(
model.t,
zs,
**(dict(label=r"$\langle \sigma_z\rangle$", color="C1") | kwargs),
)
ax.plot(
model.t,
xs,
**(dict(label=r"$\langle \sigma_x\rangle$", color="C2") | kwargs),
)
ax.plot(
model.t,
ys,
**(dict(label=r"$\langle \sigma_y\rangle$", color="C3") | kwargs),
)
ax.legend()
ax.set_xlabel(r"$\tau$")
@pu.wrap_plot
def plot_energy_deviation(models, ax=None, labels=None):
ax.set_xlabel(r"$\tau$")
ax.set_ylabel(r"$\Delta||H||/\max||H||$")
for i, model in enumerate(models):
ax.plot(
model.t,
abs(model.total_energy_from_power().value - model.total_energy().value)
/ max(abs(model.total_energy_from_power().value)),
label=labels[i] if labels else None,
)
if labels:
ax.legend()
def max_energy_error(models, steady_idx=None):
deviations = np.zeros(len(models))
for i, model in enumerate(models):
if steady_idx is None:
deviations[i] = abs(
model.total_energy_from_power().value - model.total_energy().value
).max()
else:
deviations[i] = abs(
val_relative_to_steady(
model, model.total_energy_from_power(), steady_idx=steady_idx
)[1].value
- val_relative_to_steady(
model, model.total_energy(), steady_idx=steady_idx
)[1].value
).max()
return round(deviations.max() * 100)