# bachelor_thesis/prog/python/qqgg/monte_carlo.py

"""
Simple monte carlo integration implementation.
Author: Valentin Boettcher <hiro@protagon.space>
"""
2020-04-22 11:26:13 +02:00
import os
import os.path
import pathlib
import time
import collections.abc
import functools
from typing import Union
from multiprocessing import Pool, cpu_count
from dataclasses import dataclass

import numpy as np
import joblib
from scipy.optimize import minimize_scalar, root, shgo

import utility

def _process_interval(interval):
interval = np.asarray(interval)
if len(interval.shape) > 1:
return np.array([_process_interval(i) for i in interval])
assert len(interval) == 2, "An interval has two endpoints"
a, b = interval
if b < a:
a, b = b, a
return np.array([a, b])
@dataclass
class IntegrationResult:
"""
A simple container class to hold the result, uncertainty and
sampling size of the naive monte-carlo integration.
"""
result: float
sigma: float
N: int
@property
def combined_result(self):
"""
        Get the result and accuracy combined as a tuple.
        """
        return self.result, self.sigma
@utility.numpy_cache("cache")
def integrate(
f,
interval,
epsilon=0.01,
seed=None,
num_points=None,
adapt=True,
return_sample=False,
**kwargs,
) -> IntegrationResult:
"""Monte-Carlo integrates the function `f` over an interval.
If the integrand is multi-dimensional it must accept an array of
argument arrays. Think about your axes!
:param f: function of one variable, kwargs are passed to it
:param tuple interval: a 2-tuple of numbers, specifiying the
integration range
:param epsilon: desired accuracy
:param seed: the seed for the rng, if not specified, the system
time is used
2020-05-03 11:08:37 +02:00
:param bool return_sample: wether to return the utilized sample
of f as second return value
:returns: the integration result
2020-04-08 12:01:45 +02:00
:rtype: IntegrationResult
"""
    interval = _process_interval(interval)
    if len(interval.shape) == 1:
        # recurse with a 2-d interval array, forwarding all options
        return integrate(
            f, [interval], epsilon, seed, num_points, adapt, return_sample, **kwargs
        )

    if seed:
        np.random.seed(seed)

    integration_volume = (interval[:, 1] - interval[:, 0]).prod()
    dimension = len(interval)
# guess the correct N
probe_points = np.random.uniform(
interval[:, 0],
interval[:, 1],
(int(integration_volume * dimension * 100), dimension),
).T
if num_points is None:
prelim_std = integration_volume * f(*probe_points, **kwargs).std()
epsilon = epsilon if prelim_std > epsilon else prelim_std / 10
num_points = int((prelim_std / epsilon) ** 2 * 1.1 + 1)
# now we iterate until we hit the desired epsilon
while True:
points = np.random.uniform(
interval[:, 0], interval[:, 1], (num_points, len(interval))
).T
sample = f(*points, **kwargs)
integral = np.sum(sample) / num_points * integration_volume
        # the sample standard deviation gets multiplied by the
        # integration volume, because it is the standard deviation of
        # the integral, and divided by sqrt(num_points - 1)
        sample_std = np.std(sample) * integration_volume
        deviation = sample_std * np.sqrt(1 / (num_points - 1))
        if not adapt or deviation < epsilon:
            result = IntegrationResult(integral, deviation, num_points)
            return (result, sample) if return_sample else result
        # otherwise, refine the sample size guess; the factor 1.1 adds
        # some headroom so the target accuracy is overshot slightly
        num_points = int((sample_std / epsilon) ** 2 * 1.1)
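
# A minimal usage sketch (illustrative only, not part of the original
# module): integrate x^2 over [0, 1], whose exact value is 1/3.  It
# assumes the `utility.numpy_cache` decorator is transparent when no
# caching is requested.
def _example_integrate():
    res = integrate(lambda x: x ** 2, (0, 1), epsilon=1e-3, seed=42)
    return res.combined_result  # roughly (1/3, <= 1e-3)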
def _negate(f):
"""A helper that multiplies the given function with -1."""
@functools.wraps(f)
def negated(*args, **kwargs):
return -f(*args, **kwargs)
return negated
def find_upper_bound(f, interval, **kwargs):
"""Find the upper bound of a function.
:param f: function of one scalar and some kwargs that are passed
on to it
:param interval: interval to look in
:returns: the upper bound of the function
:rtype: float
"""
upper_bound = minimize_scalar(
lambda *args: -f(*args, **kwargs), bounds=interval, method="bounded"
)
if upper_bound.success:
return -upper_bound.fun
else:
raise RuntimeError("Could not find an upper bound.")
def find_upper_bound_vector(f, interval):
    """Find the upper bound of `f` on the multi-dimensional `interval`
    using the `shgo` global optimizer."""

result = shgo(_negate(f), bounds=interval, options=dict(maxfev=10000))
if not result.success:
raise RuntimeError("Could not find an upper bound.", result)
upper_bound = -result.fun
return upper_bound
def sample_unweighted_vector(
    f, interval, seed=None, upper_bound=None, report_efficiency=False, total=1,
):
    """Sample a multi-dimensional distribution proportional to `f` on
    `interval` by hit and miss.  Implemented as a generator, analogous
    to `sample_unweighted`."""

    dimension = len(interval)
interval = _process_interval(interval)
if seed:
np.random.seed(seed)
if upper_bound is None:
upper_bound = find_upper_bound_vector(f, interval)
def allocate_random_chunk():
return np.random.uniform(
[*interval[:, 0], 0], [*interval[:, 1], 1], [1, 1 + dimension],
)
total_points = 0
total_accepted = 0
while True:
points = allocate_random_chunk()
if report_efficiency:
total_points += 1
arg = points[:, 0:-1][0]
if f(arg) >= points[:, -1] * upper_bound:
if report_efficiency:
total_accepted += 1
yield (arg, total_accepted / total_points,) if report_efficiency else arg
return
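
# An illustrative sketch (not part of the original module): draw a few
# hundred points distributed like f on the unit square.  The generator
# is consumed with `zip`, mirroring how `sample_unweighted_array` uses it.
def _example_sample_unweighted_vector():
    f = lambda x: 1 + x[0] * x[1]  # positive on the unit square, bounded by 2
    gen = sample_unweighted_vector(f, [[0, 1], [0, 1]], seed=1, upper_bound=2)
    return np.array([point for _, point in zip(range(200), gen)])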
def sample_weighted_vector(f, interval, num_samples=1, seed=None):
    """Return `num_samples` uniformly distributed points from `interval`
    together with their weights `f(point)`."""

dimension = len(interval)
interval = _process_interval(interval)
if seed:
np.random.seed(seed)
sample_points = np.random.uniform(
interval[:, 0], interval[:, 1], [num_samples, dimension],
)
return sample_points, np.array([f(sample) for sample in sample_points])
def sample_unweighted(
f,
interval,
upper_bound=None,
seed=None,
chunk_size=100,
report_efficiency=False,
**kwargs,
):
"""Samples a distribution proportional to f by hit and miss.
Implemented as a generator.
:param f: function of one scalar to sample, should be positive,
superflous kwargs are passed to it
:param interval: the interval to sample from
:param upper_bound: an upper bound to the function, optional
:param seed: the seed for the rng, if not specified, the system
time is used
:param chunk_size: the size of the chunks of random numbers
allocated per unit interval
:yields: random nubers following the distribution of f
:rtype: float
"""
2020-03-31 13:21:45 +02:00
interval = _process_interval(interval)
interval_length = interval[1] - interval[0]
if seed:
np.random.seed(seed)
upper_bound_fn, upper_bound_integral, upper_bound_integral_inverse = (
None,
None,
None,
)
# i know....
if not upper_bound:
upper_bound_value = find_upper_bound(f, interval, **kwargs)
def upper_bound_fn(x):
return upper_bound_value
def upper_bound_integral(x):
return upper_bound_value * x
def upper_bound_integral_inverse(y):
return y / upper_bound_value
elif len(upper_bound) == 2:
upper_bound_fn, upper_bound_integral = upper_bound
def upper_inv(points): # not for performance right now...
return np.array(
[
root(
lambda y: upper_bound_integral(y) - x, x0=0, jac=upper_bound_fn
).x
for x in points
]
).T
upper_bound_integral_inverse = upper_inv
    elif len(upper_bound) == 3:
        upper_bound_fn, upper_bound_integral, upper_bound_integral_inverse = upper_bound

    else:
        raise ValueError(
            "The upper bound must be `None` or a two or three element sequence!"
        )
def allocate_random_chunk():
return np.random.uniform(
[upper_bound_integral(interval[0]), 0],
[upper_bound_integral(interval[1]), 1],
[int(chunk_size * interval_length), 2],
)
total_points = 0
total_accepted = 0
while True:
points = allocate_random_chunk()
points[:, 0] = upper_bound_integral_inverse(points[:, 0])
sample_points = points[:, 0][
np.where(f(points[:, 0]) > points[:, 1] * upper_bound_fn(points[:, 0]))
]
        if report_efficiency:
            # count candidate points (rows), not array elements
            total_points += len(points)
            total_accepted += sample_points.size
for point in sample_points:
yield (point, total_accepted / total_points) if report_efficiency else point
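
# A usage sketch (illustrative only): draw samples with density
# proportional to x^2 on [0, 1] by consuming the generator with `islice`.
def _example_sample_unweighted():
    from itertools import islice

    gen = sample_unweighted(lambda x: x ** 2, (0, 1), seed=7)
    return np.fromiter(islice(gen, 1000), dtype=float)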
@dataclass
class VegasIntegrationResult(IntegrationResult):
    """
    A simple container class to hold the result, uncertainty and
    sampling size of the VEGAS monte-carlo integration.

    The ascertained increment borders, the number of vegas iterations
    and (optionally) the maximum needed for subsequent stratified
    sampling are available as well.
    """

    increment_borders: np.ndarray
    vegas_iterations: int
    maximum: Union[float, None] = None
def reshuffle_increments(
    integral_steps,
    integral,
    interval_lengths,
    num_increments,
    increment_borders,
    alpha,
    K,
):
    """Redistribute the increment borders so that every increment
    contributes roughly equally, following the damped rule of the
    VEGAS paper."""

    # alpha controls the convergence speed
    μ = np.abs(integral_steps) / integral
    new_increments = (K * ((μ - 1) / (np.log(μ))) ** alpha).astype(int)
group_size = new_increments.sum() / num_increments
new_increment_borders = np.empty_like(increment_borders)
# this whole code does a very simple thing: it eats up
# sub-increments until it has `group_size` of them
i = 0 # position in increment count list
    j = 0  # position in new_increment_borders
# the number of sub-increments still available
rest = new_increments[0]
# the number of sub-increments needed to fill one increment
head = group_size
# the current position in the interval relative to its
# beginning
current = 0
while i < num_increments and (j < (num_increments - 1)):
if new_increments[i] == 0:
i += 1
rest = new_increments[i]
current_increment_size = interval_lengths[i] / new_increments[i]
if head <= rest:
current += head * current_increment_size
new_increment_borders[j] = current
rest -= head
head = group_size
j += 1
else:
current += rest * current_increment_size
head -= rest
i += 1
rest = new_increments[i]
return new_increment_borders
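
# A toy illustration (not used by the module) of the damping formula
# K * ((mu - 1) / ln(mu))**alpha that decides how many sub-increments
# each increment receives; the mu values here are made up.
def _example_increment_weights():
    mu = np.array([0.1, 0.2, 0.3, 0.4])
    alpha, K = 1.5, 1000
    return (K * ((mu - 1) / np.log(mu)) ** alpha).astype(int)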
def integrate_vegas(
f,
interval,
seed=None,
num_increments=5,
epsilon=1e-3,
increment_epsilon=1e-2,
alpha=1.5,
acumulate=True,
vegas_point_density=1000,
**kwargs,
) -> VegasIntegrationResult:
    """Integrate the given function (in one dimension) with the VEGAS
    algorithm to reduce variance.  This implementation follows the
    description given in G. P. Lepage, Journal of Computational
    Physics 27, 192-203 (1978).  All iterations contribute to the
    final result.

    :param f: function of one variable, kwargs are passed to it
    :param tuple interval: a 2-tuple of numbers, specifying the
        integration range
    :param seed: the seed for the rng, if not specified, the system
        time is used
    :param num_increments: the number of increments in which to divide
        the interval
    :param epsilon: the desired accuracy of the integral
    :param increment_epsilon: the breaking condition, if the magnitude
        of the difference between the increment positions in
        subsequent iterations does not change more than this value,
        the computation is considered to have converged
    :param alpha: controls the convergence speed, should be
        between 1 and 2 (the lower the faster)
    :param acumulate: whether to combine the results of all vegas
        iterations into the final result
    :param vegas_point_density: the number of random points per unit
        interval used during the vegas iterations

    :returns: the integral, the standard deviation and an array of
        increment borders which can be used in subsequent sampling

    :rtype: VegasIntegrationResult
    """
interval = _process_interval(interval)
interval_length = interval[1] - interval[0]
if seed:
np.random.seed(seed)
# no clever logic is being used to define the vegas iteration
# sample density for the sake of simplicity
points_per_increment = int(vegas_point_density * interval_length / num_increments)
# start with equally sized intervals
interval_borders = np.linspace(*interval, num_increments + 1, endpoint=True)
def evaluate_integrand(interval_borders, interval_lengths, samples_per_increment):
intervals = np.array((interval_borders[:-1], interval_borders[1:]))
sample_points = np.random.uniform(
*intervals, (samples_per_increment, num_increments)
).T
weighted_f_values = f(sample_points, **kwargs) * interval_lengths[:, None]
# the mean here has absorbed the num_increments
integral_steps = weighted_f_values.mean(axis=1)
integral = integral_steps.sum()
variance = (
(f(sample_points, **kwargs).std(axis=1) * interval_lengths) ** 2
).sum() / (samples_per_increment - 1)
return integral, integral_steps, variance
K = num_increments * 1000
increment_borders = interval_borders[1:-1] - interval_borders[0]
integrals = []
variances = []
vegas_iterations, integral, variance = 0, 0, 0
while True:
vegas_iterations += 1
interval_lengths = interval_borders[1:] - interval_borders[:-1]
integral, integral_steps, variance = evaluate_integrand(
interval_borders, interval_lengths, points_per_increment
)
integrals.append(integral)
variances.append(variance)
# it is debatable to pass so much that could be recomputed...
new_increment_borders = reshuffle_increments(
integral_steps,
integral,
interval_lengths,
num_increments,
increment_borders,
alpha,
K,
)
interval_borders[1:-1] = interval_borders[0] + increment_borders
if (
np.linalg.norm(increment_borders - new_increment_borders)
< increment_epsilon
):
break
increment_borders = new_increment_borders
interval_lengths = interval_borders[1:] - interval_borders[:-1]
# brute force increase of the sample size
if np.sqrt(variance) >= epsilon:
tick = 3
while True:
integral, _, variance = evaluate_integrand(
interval_borders, interval_lengths, points_per_increment
)
integrals.append(integral)
variances.append(variance)
if np.sqrt(variance) <= epsilon:
break
# adaptive scaling of sample size incrementation
points_per_increment += int(
1000 ** np.log(tick) * interval_length / num_increments
)
tick += 2
# as a bonus, we utilize all prior integration results
if acumulate:
integrals = np.array(integrals)
variances = np.array(variances)
integral = np.sum(integrals ** 3 / variances ** 2) / np.sum(
integrals ** 2 / variances ** 2
)
variance = 1 / np.sqrt(np.sum(integrals ** 2 / variances ** 2)) * integral
return VegasIntegrationResult(
integral,
np.sqrt(variance),
points_per_increment * num_increments,
interval_borders,
vegas_iterations,
)
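
# A usage sketch (illustrative only): integrate sin on [0, pi]
# (exact value 2) and keep the adapted grid for later sampling.
def _example_integrate_vegas():
    res = integrate_vegas(
        np.sin, (0, np.pi), seed=0, num_increments=10, epsilon=1e-2
    )
    return res.combined_result, res.increment_borders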
def sample_stratified(
f, increment_borders, seed=None, chunk_size=100, report_efficiency=False, **kwargs
):
"""Samples a distribution proportional to f by hit and miss.
Implemented as a generator.
:param f: function of one scalar to sample, should be positive,
superflous kwargs are passed to it
:param interval: the interval to sample from
:param seed: the seed for the rng, if not specified, the system
time is used
:param chunk_size: the size of the chunks of random numbers
allocated per unit interval
:yields: random nubers following the distribution of f
"""
    increment_count = increment_borders.size - 1
    increment_lengths = increment_borders[1:] - increment_borders[:-1]
    weights = increment_count * increment_lengths
    increment_chunk = int(chunk_size / increment_count)
    chunk_size = increment_chunk * increment_count
upper_bound = np.array(
[
find_upper_bound(
lambda x: f(x, **kwargs) * weight, [left_border, right_border]
)
for weight, left_border, right_border in zip(
weights, increment_borders[:-1], increment_borders[1:]
)
]
).max()
total_samples = 0
total_accepted = 0
while True:
increment_samples = np.random.uniform(
increment_borders[:-1],
increment_borders[1:],
[increment_chunk, increment_count],
)
increment_y_samples = np.random.uniform(
0, 1, [increment_chunk, increment_count]
)
f_weighted = f(increment_samples) * weights # numpy magic at work here
mask = f_weighted > increment_y_samples * upper_bound
if report_efficiency:
total_samples += chunk_size
total_accepted += np.count_nonzero(mask)
for point in increment_samples[mask]:
yield (
point,
total_accepted / total_samples,
) if report_efficiency else point
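
# A sketch of the intended workflow (illustrative only): run the VEGAS
# grid adaptation first, then sample stratified over its increments.
def _example_sample_stratified():
    from itertools import islice

    grid = integrate_vegas(
        np.sin, (0, np.pi), seed=0, num_increments=10, epsilon=1e-2
    ).increment_borders
    gen = sample_stratified(np.sin, increment_borders=grid)
    return np.fromiter(islice(gen, 1000), dtype=float)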
class SamplingWorker:
    """Helper that draws a batch of samples from the module-level
    function `_FUN` in a worker process."""

def __init__(self, num_samples, **kwargs):
self.num_samples = num_samples
self.kwargs = kwargs
def draw_sample(self):
global _FUN
seed = int.from_bytes(os.urandom(2), "big")
return sample_unweighted_array(self.num_samples, _FUN, **self.kwargs, seed=seed)
_FUN = None
@utility.numpy_cache("cache")
def sample_unweighted_array(
num,
f,
interval=None,
increment_borders=None,
report_efficiency=False,
proc=None,
status_path=None,
**kwargs,
2020-04-08 17:00:11 +02:00
):
"""Sample `num` elements from a distribution. The rest of the
arguments is analogous to `sample_unweighted`.
"""
global _FUN
sample_arr = None
eff = None
# some multiprocessing magic
if proc is not None:
if isinstance(proc, str) and proc == "auto":
proc = cpu_count()
result = None
num_per_worker = int(num / proc + 1)
_FUN = f # there is no other way :(
workers = [
SamplingWorker(
num_samples=num_per_worker,
interval=interval,
increment_borders=increment_borders,
report_efficiency=report_efficiency,
status_path=status_path,
proc=None,
cache=None,
**kwargs,
)
for _ in range(proc)
]
with Pool(proc) as p:
result = p.map(SamplingWorker.draw_sample, workers)
result = np.array(result)
if report_efficiency:
eff = result[:, 1].mean()
sample_arr = np.concatenate(result[:, 0])[0:num]
else:
sample_arr = np.concatenate(result)[0:num]
else:
samples = None
if interval is not None:
interval = np.array(interval)
vectorized = len(interval.shape) > 1
sample_arr = np.empty((num, interval.shape[0]) if vectorized else num)
if len(interval.shape) > 1:
samples = sample_unweighted_vector(
f,
interval,
report_efficiency=report_efficiency,
total=num,
**kwargs,
)
else:
if "chunk_size" not in kwargs:
kwargs["chunk_size"] = num * 10
samples = sample_unweighted(
f, interval, report_efficiency=report_efficiency, **kwargs
)
elif increment_borders is not None:
            if isinstance(increment_borders[0], collections.abc.Iterable):
sample_arr = np.empty((num, len(increment_borders)))
samples = sample_stratified_vector(
f, increment_borders, report_efficiency=report_efficiency, **kwargs,
)
else:
sample_arr = np.empty(num)
samples = sample_stratified(
f,
increment_borders=increment_borders,
report_efficiency=report_efficiency,
**kwargs,
)
else:
raise TypeError("Neiter interval nor increment_borders specified!")
fifo = None
if status_path:
if not pathlib.Path(status_path).exists():
os.mkfifo(status_path)
fifo = open(status_path, "w")
start_time = time.monotonic()
last_time = start_time
for i, sample in zip(range(num), samples):
if fifo:
this_time = time.monotonic()
if this_time - last_time > 1:
δ = this_time - start_time
total = i + 1
speed = total / δ
try:
fifo.write(
f"{os.getpid()}: {i:<10} "
f"{num-total:<10} {speed:4.1f} 1/sec "
f"{(total / num) * 100:2.1f}%"
f"{(num-total)/speed/60:4.0f} min\n"
)
fifo.flush()
except BrokenPipeError:
pass
last_time = this_time # after the work is before the work is ...
if report_efficiency:
sample_arr[i], _ = sample
else:
sample_arr[i] = sample
eff = next(samples)[1] if report_efficiency else None
fifo and fifo.close()
return (sample_arr, eff) if report_efficiency else sample_arr
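
# A usage sketch (illustrative only): draw 10000 samples distributed
# like x^2 on [0, 1]; pass proc="auto" to spread the work over all
# cores.  Caching via `utility.numpy_cache` is assumed to be
# transparent here.
def _example_sample_unweighted_array():
    return sample_unweighted_array(10_000, lambda x: x ** 2, interval=(0, 1))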
# Geezus! Such code duplication..., but I don't want to break
# backwards compatibility.
@utility.numpy_cache("cache")
def integrate_vegas_nd(
f,
intervals,
seed=None,
num_increments=5,
epsilon=1e-3,
increment_epsilon=1e-2,
alpha=1.5,
vegas_point_density=1000,
proc="auto",
num_points_per_cube=None,
**kwargs,
) -> VegasIntegrationResult:
"""Integrate the given function (in n-dimensions) with the vegas
algorithm to reduce variance. This implementation follows the
description given in JOURNAL OF COMPUTATIONAL 27, 192-203 (1978).
:param f: function of one variable, kwargs are passed to it
:param tuple interval: a 2-tuple of numbers, specifiying the
integration range
:param seed: the seed for the rng, if not specified, the system
time is used
:param num_increments: the number increments in which to divide
the interval
:param point_density: the number of random points per unit
interval
:param increment_epsilon: the breaking condition, if the magnitude
of the difference between the increment positions in
subsequent iterations does not change more then epsilon the
computation is considered to have converged
:param alpha: controls the the convergence speed, should be
between 1 and 2 (the lower the faster)
:returns: the intregal, the standard deviation, an array of
increment borders which can be used in subsequent
sampling
:rtype: tuple
"""
intervals = np.asarray(_process_interval(intervals))
ndim = len(intervals)
integration_volume = (intervals[:, 1] - intervals[:, 0]).prod()
    if not isinstance(num_increments, collections.abc.Iterable):
num_increments = np.ones(ndim).astype(int) * num_increments
else:
num_increments = np.asarray(num_increments)
num_cubes = num_increments.prod()
if seed:
np.random.seed(seed)
if proc == "auto":
proc = cpu_count()
# no clever logic is being used to define the vegas iteration
# sample density for the sake of simplicity
points_per_cube = num_points_per_cube or int(
vegas_point_density * integration_volume / num_cubes
)
# start with equally sized intervals
increment_borders = [
np.linspace(*interval, num_increments[i] + 1, endpoint=True)
for i, interval in enumerate(intervals)
]
def evaluate_stripe(interval_borders):
cubes = generate_cubes(interval_borders)
result = 0
for cube in cubes:
vol = get_integration_volume(cube)
points = np.random.uniform(
cube[:, 0], cube[:, 1], (points_per_cube, ndim)
).T
result += ((f(*points, **kwargs) * vol) ** 2).sum()
return np.sqrt(result)
def generate_integral_steps(interval_borders, dimension):
borders = interval_borders[dimension]
stripes = np.array([borders[:-1], borders[1:]]).T
ms = np.empty(len(stripes))
for i, stripe in enumerate(stripes):
interval_borders[dimension] = stripe
ms[i] = evaluate_stripe(interval_borders)
# * num_increments gets normalized away
return ms / ms.sum()
K = 1000 * num_increments
vegas_iterations, integral, variance = 0, 0, 0
while True:
vegas_iterations += 1
new_increment_borders = []
remainder = 0
for dim in range(ndim):
increment_weights = generate_integral_steps(increment_borders.copy(), dim)
nonzero_increments = increment_weights[increment_weights > 0]
remainder += (nonzero_increments.max() - nonzero_increments.min()) * len(
nonzero_increments
)
new_borders = reshuffle_increments_nd(
increment_weights,
num_increments[dim],
increment_borders[dim],
alpha,
K[dim],
)
new_increment_borders.append(new_borders)
remainder /= ndim
increment_borders = new_increment_borders
if abs(remainder) < increment_epsilon:
break
# brute force increase of the sample size
cubes = generate_cubes(increment_borders)
volumes = [get_integration_volume(cube) for cube in cubes]
cube_samples = [np.empty(0) for _ in cubes]
total_points_per_cube = 0
maximum = None
@joblib.delayed
def evaluate_integrand(cube):
points = np.random.uniform(cube[:, 0], cube[:, 1], (points_per_cube, ndim)).T
return f(*points, **kwargs)
with joblib.Parallel(n_jobs=proc) as parallel:
while True:
integral = variance = 0
total_points_per_cube += points_per_cube
samples = parallel([evaluate_integrand(cube) for cube in cubes])
for sample, vol, i in zip(samples, volumes, range(num_cubes)):
# let's re-use the samples from earlier runs
cube_samples[i] = np.concatenate([cube_samples[i], sample]).T
points = cube_samples[i] * vol
integral += points.mean()
variance += (
cube_samples[i].var() * (vol ** 2) / (total_points_per_cube - 1)
)
local_maximum = points.max() * num_cubes
if not maximum or local_maximum > maximum:
maximum = local_maximum
if np.sqrt(variance) <= epsilon:
break
# adaptive scaling of sample size incrementation
points_per_cube *= int((variance / epsilon ** 2) * 1.1)
return VegasIntegrationResult(
integral,
np.sqrt(variance),
total_points_per_cube * num_cubes,
increment_borders,
vegas_iterations,
maximum,
)
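
# A usage sketch (illustrative only): integrate x*y over the unit
# square (exact value 1/4).  It assumes the `utility.numpy_cache`
# decorator and the joblib-based parallelism work transparently here;
# a loose increment_epsilon keeps the grid adaptation short.
def _example_integrate_vegas_nd():
    res = integrate_vegas_nd(
        lambda x, y: x * y,
        [[0, 1], [0, 1]],
        seed=0,
        num_increments=4,
        epsilon=1e-2,
        increment_epsilon=5e-2,
    )
    return res.combined_result, res.maximum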
def generate_cubes(interval_borders):
"""Given an array of interval borders, return a list of hypercube
edges to fill the whole volume."""
intervals = [np.array([border[:-1], border[1:]]).T for border in interval_borders]
ndim = len(intervals)
axis = np.arange(ndim)
mesh_axes = [np.arange(len(ax_intervals)) for ax_intervals in intervals]
grid = np.array(np.meshgrid(*mesh_axes)).T
grid = grid.reshape(int(grid.size / ndim), ndim).astype(int)
return np.array(
[[intervals[n][i] for (n, i) in zip(axis, indices)] for indices in grid]
)
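
# A small sketch (illustrative only): two increments along x and one
# along y yield two hypercubes covering [0, 1] x [0, 2].
def _example_generate_cubes():
    borders = [np.array([0.0, 0.5, 1.0]), np.array([0.0, 2.0])]
    return generate_cubes(borders)  # shape (2, 2, 2)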
def get_integration_volume(interval):
    """Return the volume of the hypercube described by `interval`."""
    return (interval[:, 1] - interval[:, 0]).prod()
def reshuffle_increments_nd(
    μ, num_increments, increment_borders, alpha, K,
):
    """Per-dimension increment redistribution used by `integrate_vegas_nd`,
    analogous to `reshuffle_increments`."""

if num_increments == 1:
return increment_borders
# alpha controls the convergence speed
new_increments = np.empty_like(μ)
if np.any(μ == 1):
return increment_borders
mask = μ > 0
μ = μ[mask]
new_increments[mask] = (K * ((μ - 1) / (np.log(μ))) ** alpha).astype(int)
new_increments[np.logical_not(mask)] = 1
group_size = int(new_increments.sum() / num_increments)
new_increment_borders = np.empty_like(increment_borders[1:-1])
interval_lengths = increment_borders[1:] - increment_borders[:-1]
# this whole code does a very simple thing: it eats up
# sub-increments until it has `group_size` of them
i = 0 # position in increment count list
    j = 0  # position in new_increment_borders
# the number of sub-increments still available
rest = new_increments[0]
# the number of sub-increments needed to fill one increment
head = group_size
# the current position in the interval relative to its
# beginning
current = 0
while i < num_increments and (j < (num_increments - 1)):
if new_increments[i] == 0:
i += 1
rest = new_increments[i]
current_increment_size = interval_lengths[i] / new_increments[i]
if head <= rest:
current += head * current_increment_size
new_increment_borders[j] = current
rest -= head
head = group_size
j += 1
else:
current += rest * current_increment_size
head -= rest
i += 1
rest = new_increments[i]
return (
np.array(
[0, *new_increment_borders, increment_borders[-1] - increment_borders[0]]
)
+ increment_borders[0]
)
# NOTE: Finding the upper bound happens in the Integration step!
def sample_stratified_vector(
    f, increment_borders, upper_bound, seed=None, report_efficiency=False
):
    """Sample a multi-dimensional distribution proportional to `f` by hit
    and miss, stratified over the hypercubes defined by `increment_borders`
    (as returned by `integrate_vegas_nd`).  Implemented as a generator."""

ndim = len(increment_borders)
if seed:
np.random.seed(seed)
def allocate_random_chunk(cube):
return np.random.uniform([*cube[:, 0], 0], [*cube[:, 1], 1], [1, 1 + ndim],)
total_points = 0
total_accepted = 0
cubes = generate_cubes(increment_borders)
num_cubes = len(cubes)
while True:
np.random.shuffle(cubes)
for cube in cubes:
volume = get_integration_volume(cube)
points = allocate_random_chunk(cube)
if report_efficiency:
total_points += 1
args = points[0][:-1]
if f(*args) * num_cubes * volume >= points[0][-1] * upper_bound:
if report_efficiency:
total_accepted += 1
yield (
args.T,
total_accepted / total_points,
) if report_efficiency else args.T
return
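

# A sketch of the intended workflow (illustrative only): reuse the grid
# and the maximum found by `integrate_vegas_nd` to draw stratified,
# unweighted samples of f on the unit square.
def _example_sample_stratified_vector():
    f = lambda x, y: 1 + x * y
    res = integrate_vegas_nd(
        f, [[0, 1], [0, 1]], seed=0, num_increments=4, epsilon=1e-2
    )
    gen = sample_stratified_vector(f, res.increment_borders, upper_bound=res.maximum)
    return np.array([point for _, point in zip(range(100), gen)])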