# API Reference¶

## Inference¶

### sbi.inference.base.infer(simulator, prior, method, num_simulations, num_workers=1)¶

Runs simulation-based inference and returns the posterior.

This function provides a simple interface to run sbi. Inference is run for a single round and hence the returned posterior $$p(\theta|x)$$ can be sampled and evaluated for any $$x$$ (i.e. it is amortized).

The scope of this function is limited to the most essential features of sbi. For more flexibility (e.g. multi-round inference, different density estimators) please use the flexible interface described here: https://www.mackelab.org/sbi/tutorial/02_flexible_interface/

Parameters:

Name Type Description Default
simulator Callable

A function that takes parameters $$\theta$$ and maps them to simulations, or observations, x, $$\mathrm{sim}(\theta)\to x$$. Any regular Python callable (i.e. function or class with __call__ method) can be used.

required
prior Distribution

A probability distribution that expresses prior knowledge about the parameters, e.g. which ranges are meaningful for them. Any object with .log_prob()and .sample() (for example, a PyTorch distribution) can be used.

required
method str

What inference method to use. Either of SNPE, SNLE or SNRE.

required
num_simulations int

Number of simulation calls. More simulations means a longer runtime, but a better posterior estimate.

required
num_workers int

Number of parallel workers to use for simulations.

1

Returns: Posterior over parameters conditional on observations (amortized).

Source code in sbi/inference/base.py
def infer(
simulator: Callable,
prior: Distribution,
method: str,
num_simulations: int,
num_workers: int = 1,
) -> NeuralPosterior:
r"""Runs simulation-based inference and returns the posterior.

This function provides a simple interface to run sbi. Inference is run for a single
round and hence the returned posterior $p(\theta|x)$ can be sampled and evaluated
for any $x$ (i.e. it is amortized).

The scope of this function is limited to the most essential features of sbi. For
more flexibility (e.g. multi-round inference, different density estimators) please
use the flexible interface described here:
https://www.mackelab.org/sbi/tutorial/02_flexible_interface/

Args:
simulator: A function that takes parameters $\theta$ and maps them to
simulations, or observations, x, $\mathrm{sim}(\theta)\to x$. Any
regular Python callable (i.e. function or class with __call__ method)
can be used.
prior: A probability distribution that expresses prior knowledge about the
parameters, e.g. which ranges are meaningful for them. Any
object with .log_prob()and .sample() (for example, a PyTorch
distribution) can be used.
method: What inference method to use. Either of SNPE, SNLE or SNRE.
num_simulations: Number of simulation calls. More simulations means a longer
runtime, but a better posterior estimate.
num_workers: Number of parallel workers to use for simulations.

Returns: Posterior over parameters conditional on observations (amortized).
"""

try:
method_fun: Callable = getattr(sbi.inference, method.upper())
except AttributeError:
raise NameError(
"Method not available. method must be one of 'SNPE', 'SNLE', 'SNRE'."
)

simulator, prior = prepare_for_sbi(simulator, prior)

inference = method_fun(prior=prior)
theta, x = simulate_for_sbi(
simulator=simulator,
proposal=prior,
num_simulations=num_simulations,
num_workers=num_workers,
)
_ = inference.append_simulations(theta, x).train()
posterior = inference.build_posterior()

return posterior


### sbi.utils.user_input_checks.prepare_for_sbi(simulator, prior)¶

Prepare simulator, prior and for usage in sbi.

One of the goals is to allow you to use sbi with inputs computed in numpy.

Attempts to meet the following requirements by reshaping and type-casting:

• the simulator function receives as input and returns a Tensor.
• the simulator can simulate batches of parameters and return batches of data.
• the prior does not produce batches and samples and evaluates to Tensor.
• the output shape is a torch.Size((1,N)) (i.e, has a leading batch dimension 1).

If this is not possible, a suitable exception will be raised.

Parameters:

Name Type Description Default
simulator Callable

Simulator as provided by the user.

required
prior

Prior as provided by the user.

required

Returns:

Type Description
Tuple[Callable, torch.distributions.distribution.Distribution]

Tuple (simulator, prior, x_shape) checked and matching the requirements of sbi.

Source code in sbi/utils/user_input_checks.py
def prepare_for_sbi(simulator: Callable, prior) -> Tuple[Callable, Distribution]:
"""Prepare simulator, prior and for usage in sbi.

One of the goals is to allow you to use sbi with inputs computed in numpy.

Attempts to meet the following requirements by reshaping and type-casting:

- the simulator function receives as input and returns a Tensor.<br/>
- the simulator can simulate batches of parameters and return batches of data.<br/>
- the prior does not produce batches and samples and evaluates to Tensor.<br/>
- the output shape is a torch.Size((1,N)) (i.e, has a leading batch dimension 1).

If this is not possible, a suitable exception will be raised.

Args:
simulator: Simulator as provided by the user.
prior: Prior as provided by the user.

Returns:
Tuple (simulator, prior, x_shape) checked and matching the requirements of sbi.
"""

# Check prior, return PyTorch prior.
prior, _, prior_returns_numpy = process_prior(prior)

# Check simulator, returns PyTorch simulator able to simulate batches.
simulator = process_simulator(simulator, prior, prior_returns_numpy)

# Consistency check after making ready for sbi.
check_sbi_inputs(simulator, prior)

return simulator, prior


### sbi.inference.base.simulate_for_sbi(simulator, proposal, num_simulations, num_workers=1, simulation_batch_size=1, show_progress_bar=True)¶

Returns ($$\theta, x$$) pairs obtained from sampling the proposal and simulating.

This function performs two steps:

• Sample parameters $$\theta$$ from the proposal.
• Simulate these parameters to obtain $$x$$.

Parameters:

Name Type Description Default
simulator Callable

A function that takes parameters $$\theta$$ and maps them to simulations, or observations, x, $$\text{sim}(\theta)\to x$$. Any regular Python callable (i.e. function or class with __call__ method) can be used.

required
proposal Any

Probability distribution that the parameters $$\theta$$ are sampled from.

required
num_simulations int

Number of simulations that are run.

required
num_workers int

Number of parallel workers to use for simulations.

1
simulation_batch_size int

Number of parameter sets that the simulator maps to data x at once. If None, we simulate all parameter sets at the same time. If >= 1, the simulator has to process data of shape (simulation_batch_size, parameter_dimension).

1
show_progress_bar bool

Whether to show a progress bar for simulating. This will not affect whether there will be a progressbar while drawing samples from the proposal.

True

Returns: Sampled parameters $$\theta$$ and simulation-outputs $$x$$.

Source code in sbi/inference/base.py
def simulate_for_sbi(
simulator: Callable,
proposal: Any,
num_simulations: int,
num_workers: int = 1,
simulation_batch_size: int = 1,
show_progress_bar: bool = True,
) -> Tuple[Tensor, Tensor]:
r"""Returns ($\theta, x$) pairs obtained from sampling the proposal and simulating.

This function performs two steps:

- Sample parameters $\theta$ from the proposal.
- Simulate these parameters to obtain $x$.

Args:
simulator: A function that takes parameters $\theta$ and maps them to
simulations, or observations, x, $\text{sim}(\theta)\to x$. Any
regular Python callable (i.e. function or class with __call__ method)
can be used.
proposal: Probability distribution that the parameters $\theta$ are sampled
from.
num_simulations: Number of simulations that are run.
num_workers: Number of parallel workers to use for simulations.
simulation_batch_size: Number of parameter sets that the simulator
maps to data x at once. If None, we simulate all parameter sets at the
same time. If >= 1, the simulator has to process data of shape
(simulation_batch_size, parameter_dimension).
show_progress_bar: Whether to show a progress bar for simulating. This will not
affect whether there will be a progressbar while drawing samples from the
proposal.

Returns: Sampled parameters $\theta$ and simulation-outputs $x$.
"""

theta = proposal.sample((num_simulations,))

x = simulate_in_batches(
simulator, theta, simulation_batch_size, num_workers, show_progress_bar
)

return theta, x


###  sbi.inference.snpe.snpe_a.SNPE_A (PosteriorEstimator) ¶

Source code in sbi/inference/snpe/snpe_a.py
class SNPE_A(PosteriorEstimator):
def __init__(
self,
prior: Optional[Distribution] = None,
density_estimator: Union[str, Callable] = "mdn_snpe_a",
num_components: int = 10,
device: str = "cpu",
logging_level: Union[int, str] = "WARNING",
summary_writer: Optional[TensorboardSummaryWriter] = None,
show_progress_bars: bool = True,
):
r"""SNPE-A .

 _Fast epsilon-free Inference of Simulation Models with Bayesian Conditional
Density Estimation_, Papamakarios et al., NeurIPS 2016,
https://arxiv.org/abs/1605.06376.

This class implements SNPE-A. SNPE-A trains across multiple rounds with a
maximum-likelihood-loss. This will make training converge to the proposal
posterior instead of the true posterior. To correct for this, SNPE-A applies a
post-hoc correction after training. This correction has to be performed
analytically. Thus, SNPE-A is limited to Gaussian distributions for all but the
last round. In the last round, SNPE-A can use a Mixture of Gaussians.

Args:
prior: A probability distribution that expresses prior knowledge about the
parameters, e.g. which ranges are meaningful for them. Any
object with .log_prob()and .sample() (for example, a PyTorch
distribution) can be used.
density_estimator: If it is a string (only "mdn_snpe_a" is valid), use a
pre-configured mixture of densities network. Alternatively, a function
that builds a custom neural network can be provided. The function will
be called with the first batch of simulations (theta, x), which can
thus be used for shape inference and potentially for z-scoring. It
needs to return a PyTorch nn.Module implementing the density
estimator. The density estimator needs to provide the methods
.log_prob and .sample(). Note that until the last round only a
single (multivariate) Gaussian component is used for training (see
Algorithm 1 in ). In the last round, this component is replicated
num_components times, its parameters are perturbed with a very small
noise, and then the last training round is done with the expanded
Gaussian mixture as estimator for the proposal posterior.
num_components: Number of components of the mixture of Gaussians in the
last round. This overrides the num_components value passed to
posterior_nn().
device: Training device, e.g., "cpu", "cuda" or "cuda:{0, 1, ...}".
logging_level: Minimum severity of messages to log. One of the strings
INFO, WARNING, DEBUG, ERROR and CRITICAL.
summary_writer: A tensorboard SummaryWriter to control, among others, log
file location (default is <current working directory>/logs.)
show_progress_bars: Whether to show a progressbar during training.
"""

# Catch invalid inputs.
if not ((density_estimator == "mdn_snpe_a") or callable(density_estimator)):
raise TypeError(
"The density_estimator passed to SNPE_A needs to be a "
"callable or the string 'mdn_snpe_a'!"
)

# num_components will be used to replicate the Gaussian in the last round.
self._num_components = num_components
self._ran_final_round = False

# WARNING: sneaky trick ahead. We proxy the parent's train here,
# requiring the signature to have num_atoms, save it for use below, and
# continue. It's sneaky because we are using the object (self) as a namespace
# to pass arguments between functions, and that's implicit state management.
kwargs = utils.del_entries(
locals(),
entries=("self", "__class__", "num_components"),
)
super().__init__(**kwargs)

def train(
self,
final_round: bool = False,
training_batch_size: int = 50,
learning_rate: float = 5e-4,
validation_fraction: float = 0.1,
stop_after_epochs: int = 20,
max_num_epochs: int = 2**31 - 1,
clip_max_norm: Optional[float] = 5.0,
calibration_kernel: Optional[Callable] = None,
resume_training: bool = False,
force_first_round_loss: bool = False,
retrain_from_scratch: bool = False,
show_train_summary: bool = False,
dataloader_kwargs: Optional[Dict] = None,
component_perturbation: float = 5e-3,
) -> nn.Module:
r"""Return density estimator that approximates the proposal posterior.

 _Fast epsilon-free Inference of Simulation Models with Bayesian Conditional
Density Estimation_, Papamakarios et al., NeurIPS 2016,
https://arxiv.org/abs/1605.06376.

Training is performed with maximum likelihood on samples from the latest round,
which leads the algorithm to converge to the proposal posterior.

Args:
final_round: Whether we are in the last round of training or not. For all
but the last round, Algorithm 1 from  is executed. In last the
round, Algorithm 2 from  is executed once.
training_batch_size: Training batch size.
learning_rate: Learning rate for Adam optimizer.
validation_fraction: The fraction of data to use for validation.
stop_after_epochs: The number of epochs to wait for improvement on the
validation set before terminating training.
max_num_epochs: Maximum number of epochs to run. If reached, we stop
training even when the validation loss is still decreasing. Otherwise,
we train until validation loss increases (see also stop_after_epochs).
clip_max_norm: Value at which to clip the total gradient norm in order to
prevent exploding gradients. Use None for no clipping.
calibration_kernel: A function to calibrate the loss with respect to the
simulations x. See Lueckmann, Gonçalves et al., NeurIPS 2017.
resume_training: Can be used in case training time is limited, e.g. on a
cluster. If True, the split between train and validation set, the
optimizer, the number of epochs, and the best validation log-prob will
be restored from the last time .train() was called.
force_first_round_loss: If True, train with maximum likelihood,
i.e., potentially ignoring the correction for using a proposal
distribution different from the prior.
force_first_round_loss: If True, train with maximum likelihood,
regardless of the proposal distribution.
retrain_from_scratch: Whether to retrain the conditional density
estimator for the posterior from scratch each round. Not supported for
SNPE-A.
show_train_summary: Whether to print the number of epochs and validation
loss and leakage after the training.
dataloader_kwargs: Additional or updated kwargs to be passed to the training
and validation dataloaders (like, e.g., a collate_fn)
component_perturbation: The standard deviation applied to all weights and
biases when, in the last round, the Mixture of Gaussians is build from
a single Gaussian. This value can be problem-specific and also depends
on the number of mixture components.

Returns:
Density estimator that approximates the distribution $p(\theta|x)$.
"""

assert not retrain_from_scratch, """Retraining from scratch is not supported in SNPE-A yet. The reason for
this is that, if we reininitialized the density estimator, the z-scoring would
change, which would break the posthoc correction. This is a pure implementation
issue."""

kwargs = utils.del_entries(
locals(),
entries=("self", "__class__", "final_round", "component_perturbation"),
)

# SNPE-A always discards the prior samples.

self._round = max(self._data_round_index)

if final_round:
# If there is (will be) only one round, train with Algorithm 2 from .
if self._round == 0:
self._build_neural_net = partial(
self._build_neural_net, num_components=self._num_components
)
# Run Algorithm 2 from .
elif not self._ran_final_round:
# Now switch to the specified number of components. This method will
# only be used if retrain_from_scratch=True. Otherwise,
# the MDN will be built from replicating the single-component net for
# num_component times (via _expand_mog()).
self._build_neural_net = partial(
self._build_neural_net, num_components=self._num_components
)

# Extend the MDN to the originally desired number of components.
self._expand_mog(eps=component_perturbation)
else:
warnings.warn(
"You have already run SNPE-A with final_round=True. Running it"
"again with this setting will not allow computing the posthoc"
"correction applied in SNPE-A. Thus, you will get an error when "
"calling .build_posterior() after training.",
UserWarning,
)
else:
# Run Algorithm 1 from .
# Wrap the function that builds the MDN such that we can make
# sure that there is only one component when running.
self._build_neural_net = partial(self._build_neural_net, num_components=1)

if final_round:
self._ran_final_round = True

return super().train(**kwargs)

def correct_for_proposal(
self,
density_estimator: Optional[TorchModule] = None,
) -> "SNPE_A_MDN":
r"""Build mixture of Gaussians that approximates the posterior.

Returns a SNPE_A_MDN object, which applies the posthoc-correction required in
SNPE-A.

Args:
density_estimator: The density estimator that the posterior is based on.
If None, use the latest neural density estimator that was trained.

Returns:
Posterior $p(\theta|x)$  with .sample() and .log_prob() methods.
"""
if density_estimator is None:
density_estimator = deepcopy(
self._neural_net
)  # PosteriorEstimator.train() also returns a deepcopy, mimic this here
# If internal net is used device is defined.
device = self._device
else:
# Otherwise, infer it from the device of the net parameters.
device = str(next(density_estimator.parameters()).device)

# Set proposal of the density estimator.
# This also evokes the z-scoring correction if necessary.
if (
self._proposal_roundwise[-1] is self._prior
or self._proposal_roundwise[-1] is None
):
proposal = self._prior
assert isinstance(
proposal, (MultivariateNormal, utils.BoxUniform)
), """Prior must be torch.distributions.MultivariateNormal or sbi.utils.
BoxUniform"""
else:
assert isinstance(
self._proposal_roundwise[-1], DirectPosterior
), """The proposal you passed to append_simulations is neither the prior
nor a DirectPosterior. SNPE-A currently only supports these scenarios.
"""
proposal = self._proposal_roundwise[-1]

# Create the SNPE_A_MDN
wrapped_density_estimator = SNPE_A_MDN(
flow=density_estimator, proposal=proposal, prior=self._prior, device=device
)
return wrapped_density_estimator

def build_posterior(
self,
density_estimator: Optional[TorchModule] = None,
prior: Optional[Distribution] = None,
) -> "DirectPosterior":
r"""Build posterior from the neural density estimator.

This method first corrects the estimated density with correct_for_proposal
and then returns a DirectPosterior.

Args:
density_estimator: The density estimator that the posterior is based on.
If None, use the latest neural density estimator that was trained.
prior: Prior distribution.

Returns:
Posterior $p(\theta|x)$  with .sample() and .log_prob() methods.
"""
if prior is None:
assert (
self._prior is not None
), """You did not pass a prior. You have to pass the prior either at
initialization inference = SNPE_A(prior) or to .build_posterior
(prior=prior)."""
prior = self._prior

wrapped_density_estimator = self.correct_for_proposal(
density_estimator=density_estimator
)
self._posterior = DirectPosterior(
posterior_estimator=wrapped_density_estimator,
prior=prior,
)
return deepcopy(self._posterior)

def _log_prob_proposal_posterior(
self,
theta: Tensor,
x: Tensor,
proposal: Optional[Any],
) -> Tensor:
"""Return the log-probability of the proposal posterior.

For SNPE-A this is the same as self._neural_net.log_prob(theta, x) in
_loss() to be found in snpe_base.py.

Args:
theta: Batch of parameters θ.
x: Batch of data.
masks: Mask that is True for prior samples in the batch in order to train
them with prior loss.
proposal: Proposal distribution.

Returns: Log-probability of the proposal posterior.
"""
return self._neural_net.log_prob(theta, x)

def _expand_mog(self, eps: float = 1e-5):
"""
Replicate a singe Gaussian trained with Algorithm 1 before continuing
with Algorithm 2. The weights and biases of the associated MDN layers
are repeated num_components times, slightly perturbed to break the
symmetry such that the gradients in the subsequent training are not
all identical.

Args:
eps: Standard deviation for the random perturbation.
"""
assert isinstance(self._neural_net._distribution, MultivariateGaussianMDN)

# Increase the number of components
self._neural_net._distribution._num_components = self._num_components

# Expand the 1-dim Gaussian.
for name, param in self._neural_net.named_parameters():
if any(
key in name for key in ["logits", "means", "unconstrained", "upper"]
):
if "bias" in name:
param.data = param.data.repeat(self._num_components)
param.grad = None  # let autograd construct a new gradient
elif "weight" in name:
param.data = param.data.repeat(self._num_components, 1)
param.grad = None  # let autograd construct a new gradient


#### __init__(self, prior=None, density_estimator='mdn_snpe_a', num_components=10, device='cpu', logging_level='WARNING', summary_writer=None, show_progress_bars=True) special ¶

SNPE-A .

 Fast epsilon-free Inference of Simulation Models with Bayesian Conditional Density Estimation, Papamakarios et al., NeurIPS 2016, https://arxiv.org/abs/1605.06376.

This class implements SNPE-A. SNPE-A trains across multiple rounds with a maximum-likelihood-loss. This will make training converge to the proposal posterior instead of the true posterior. To correct for this, SNPE-A applies a post-hoc correction after training. This correction has to be performed analytically. Thus, SNPE-A is limited to Gaussian distributions for all but the last round. In the last round, SNPE-A can use a Mixture of Gaussians.

Parameters:

Name Type Description Default
prior Optional[torch.distributions.distribution.Distribution]

A probability distribution that expresses prior knowledge about the parameters, e.g. which ranges are meaningful for them. Any object with .log_prob()and .sample() (for example, a PyTorch distribution) can be used.

None
density_estimator Union[str, Callable]

If it is a string (only “mdn_snpe_a” is valid), use a pre-configured mixture of densities network. Alternatively, a function that builds a custom neural network can be provided. The function will be called with the first batch of simulations (theta, x), which can thus be used for shape inference and potentially for z-scoring. It needs to return a PyTorch nn.Module implementing the density estimator. The density estimator needs to provide the methods .log_prob and .sample(). Note that until the last round only a single (multivariate) Gaussian component is used for training (see Algorithm 1 in ). In the last round, this component is replicated num_components times, its parameters are perturbed with a very small noise, and then the last training round is done with the expanded Gaussian mixture as estimator for the proposal posterior.

'mdn_snpe_a'
num_components int

Number of components of the mixture of Gaussians in the last round. This overrides the num_components value passed to posterior_nn().

10
device str

Training device, e.g., “cpu”, “cuda” or “cuda:{0, 1, …}”.

'cpu'
logging_level Union[int, str]

Minimum severity of messages to log. One of the strings INFO, WARNING, DEBUG, ERROR and CRITICAL.

'WARNING'
summary_writer Optional[Writer]

A tensorboard SummaryWriter to control, among others, log file location (default is <current working directory>/logs.)

None
show_progress_bars bool

Whether to show a progressbar during training.

True
Source code in sbi/inference/snpe/snpe_a.py
def __init__(
self,
prior: Optional[Distribution] = None,
density_estimator: Union[str, Callable] = "mdn_snpe_a",
num_components: int = 10,
device: str = "cpu",
logging_level: Union[int, str] = "WARNING",
summary_writer: Optional[TensorboardSummaryWriter] = None,
show_progress_bars: bool = True,
):
r"""SNPE-A .

 _Fast epsilon-free Inference of Simulation Models with Bayesian Conditional
Density Estimation_, Papamakarios et al., NeurIPS 2016,
https://arxiv.org/abs/1605.06376.

This class implements SNPE-A. SNPE-A trains across multiple rounds with a
maximum-likelihood-loss. This will make training converge to the proposal
posterior instead of the true posterior. To correct for this, SNPE-A applies a
post-hoc correction after training. This correction has to be performed
analytically. Thus, SNPE-A is limited to Gaussian distributions for all but the
last round. In the last round, SNPE-A can use a Mixture of Gaussians.

Args:
prior: A probability distribution that expresses prior knowledge about the
parameters, e.g. which ranges are meaningful for them. Any
object with .log_prob()and .sample() (for example, a PyTorch
distribution) can be used.
density_estimator: If it is a string (only "mdn_snpe_a" is valid), use a
pre-configured mixture of densities network. Alternatively, a function
that builds a custom neural network can be provided. The function will
be called with the first batch of simulations (theta, x), which can
thus be used for shape inference and potentially for z-scoring. It
needs to return a PyTorch nn.Module implementing the density
estimator. The density estimator needs to provide the methods
.log_prob and .sample(). Note that until the last round only a
single (multivariate) Gaussian component is used for training (see
Algorithm 1 in ). In the last round, this component is replicated
num_components times, its parameters are perturbed with a very small
noise, and then the last training round is done with the expanded
Gaussian mixture as estimator for the proposal posterior.
num_components: Number of components of the mixture of Gaussians in the
last round. This overrides the num_components value passed to
posterior_nn().
device: Training device, e.g., "cpu", "cuda" or "cuda:{0, 1, ...}".
logging_level: Minimum severity of messages to log. One of the strings
INFO, WARNING, DEBUG, ERROR and CRITICAL.
summary_writer: A tensorboard SummaryWriter to control, among others, log
file location (default is <current working directory>/logs.)
show_progress_bars: Whether to show a progressbar during training.
"""

# Catch invalid inputs.
if not ((density_estimator == "mdn_snpe_a") or callable(density_estimator)):
raise TypeError(
"The density_estimator passed to SNPE_A needs to be a "
"callable or the string 'mdn_snpe_a'!"
)

# num_components will be used to replicate the Gaussian in the last round.
self._num_components = num_components
self._ran_final_round = False

# WARNING: sneaky trick ahead. We proxy the parent's train here,
# requiring the signature to have num_atoms, save it for use below, and
# continue. It's sneaky because we are using the object (self) as a namespace
# to pass arguments between functions, and that's implicit state management.
kwargs = utils.del_entries(
locals(),
entries=("self", "__class__", "num_components"),
)
super().__init__(**kwargs)


#### append_simulations(self, theta, x, proposal=None, data_device=None) inherited ¶

Store parameters and simulation outputs to use them for later training.

Data are stored as entries in lists for each type of variable (parameter/data).

Stores $$\theta$$, $$x$$, prior_masks (indicating if simulations are coming from the prior or not) and an index indicating which round the batch of simulations came from.

Parameters:

Name Type Description Default
theta Tensor

Parameter sets.

required
x Tensor

Simulation outputs.

required
proposal Optional[sbi.inference.posteriors.direct_posterior.DirectPosterior]

The distribution that the parameters $$\theta$$ were sampled from. Pass None if the parameters were sampled from the prior. If not None, it will trigger a different loss-function.

None
data_device Optional[str]

Where to store the data, default is on the same device where the training is happening. If training a large dataset on a GPU with not much VRAM can set to ‘cpu’ to store data on system memory instead.

None

Returns:

Type Description
PosteriorEstimator

NeuralInference object (returned so that this function is chainable).

Source code in sbi/inference/snpe/snpe_a.py
def append_simulations(
self,
theta: Tensor,
x: Tensor,
proposal: Optional[DirectPosterior] = None,
data_device: Optional[str] = None,
) -> "PosteriorEstimator":
r"""Store parameters and simulation outputs to use them for later training.

Data are stored as entries in lists for each type of variable (parameter/data).

Stores $\theta$, $x$, prior_masks (indicating if simulations are coming from the
prior or not) and an index indicating which round the batch of simulations came
from.

Args:
theta: Parameter sets.
x: Simulation outputs.
proposal: The distribution that the parameters $\theta$ were sampled from.
Pass None if the parameters were sampled from the prior. If not
None, it will trigger a different loss-function.
data_device: Where to store the data, default is on the same device where
the training is happening. If training a large dataset on a GPU with not
much VRAM can set to 'cpu' to store data on system memory instead.

Returns:
NeuralInference object (returned so that this function is chainable).
"""

is_valid_x, num_nans, num_infs = handle_invalid_x(x, True)  # Hardcode to True

x = x[is_valid_x]
theta = theta[is_valid_x]

# Check for problematic z-scoring
warn_if_zscoring_changes_data(x)
warn_on_invalid_x(num_nans, num_infs, True)
warn_on_invalid_x_for_snpec_leakage(
num_nans, num_infs, True, type(self).__name__, self._round
)

if data_device is None:
data_device = self._device

theta, x = validate_theta_and_x(
theta, x, data_device=data_device, training_device=self._device
)
self._check_proposal(proposal)

if (
proposal is None
or proposal is self._prior
or (
isinstance(proposal, RestrictedPrior) and proposal._prior is self._prior
)
):
# The _data_round_index will later be used to infer if one should train
# with MLE loss or with atomic loss (see, in train():
# self._round = max(self._data_round_index))
self._data_round_index.append(0)
else:
if not self._data_round_index:
# This catches a pretty specific case: if, in the first round, one
# passes data that does not come from the prior.
self._data_round_index.append(1)
else:
self._data_round_index.append(max(self._data_round_index) + 1)

self._theta_roundwise.append(theta)
self._x_roundwise.append(x)

self._proposal_roundwise.append(proposal)

if self._prior is None or isinstance(self._prior, ImproperEmpirical):
if proposal is not None:
raise ValueError(
"You had not passed a prior at initialization, but now you "
"passed a proposal. If you want to run multi-round SNPE, you have "
"to specify a prior (set the .prior argument or re-initialize "
"the object with a prior distribution). If the samples you passed "
"to append_simulations() were sampled from the prior, you can "
"run single-round inference with "
"append_simulations(..., proposal=None)."
)
theta_prior = self.get_simulations()
self._prior = ImproperEmpirical(theta_prior, ones(theta_prior.shape))

return self


#### build_posterior(self, density_estimator=None, prior=None)¶

Build posterior from the neural density estimator.

This method first corrects the estimated density with correct_for_proposal and then returns a DirectPosterior.

Parameters:

Name Type Description Default
density_estimator Optional[Module]

The density estimator that the posterior is based on. If None, use the latest neural density estimator that was trained.

None
prior Optional[torch.distributions.distribution.Distribution]

Prior distribution.

None

Returns:

Type Description
DirectPosterior

Posterior $$p(\theta|x)$$ with .sample() and .log_prob() methods.

Source code in sbi/inference/snpe/snpe_a.py
def build_posterior(
self,
density_estimator: Optional[TorchModule] = None,
prior: Optional[Distribution] = None,
) -> "DirectPosterior":
r"""Build posterior from the neural density estimator.

This method first corrects the estimated density with correct_for_proposal
and then returns a DirectPosterior.

Args:
density_estimator: The density estimator that the posterior is based on.
If None, use the latest neural density estimator that was trained.
prior: Prior distribution.

Returns:
Posterior $p(\theta|x)$  with .sample() and .log_prob() methods.
"""
if prior is None:
assert (
self._prior is not None
), """You did not pass a prior. You have to pass the prior either at
initialization inference = SNPE_A(prior) or to .build_posterior
(prior=prior)."""
prior = self._prior

wrapped_density_estimator = self.correct_for_proposal(
density_estimator=density_estimator
)
self._posterior = DirectPosterior(
posterior_estimator=wrapped_density_estimator,
prior=prior,
)
return deepcopy(self._posterior)


#### correct_for_proposal(self, density_estimator=None)¶

Build mixture of Gaussians that approximates the posterior.

Returns a SNPE_A_MDN object, which applies the posthoc-correction required in SNPE-A.

Parameters:

Name Type Description Default
density_estimator Optional[Module]

The density estimator that the posterior is based on. If None, use the latest neural density estimator that was trained.

None

Returns:

Type Description
SNPE_A_MDN

Posterior $$p(\theta|x)$$ with .sample() and .log_prob() methods.

Source code in sbi/inference/snpe/snpe_a.py
def correct_for_proposal(
self,
density_estimator: Optional[TorchModule] = None,
) -> "SNPE_A_MDN":
r"""Build mixture of Gaussians that approximates the posterior.

Returns a SNPE_A_MDN object, which applies the posthoc-correction required in
SNPE-A.

Args:
density_estimator: The density estimator that the posterior is based on.
If None, use the latest neural density estimator that was trained.

Returns:
Posterior $p(\theta|x)$  with .sample() and .log_prob() methods.
"""
if density_estimator is None:
density_estimator = deepcopy(
self._neural_net
)  # PosteriorEstimator.train() also returns a deepcopy, mimic this here
# If internal net is used device is defined.
device = self._device
else:
# Otherwise, infer it from the device of the net parameters.
device = str(next(density_estimator.parameters()).device)

# Set proposal of the density estimator.
# This also evokes the z-scoring correction if necessary.
if (
self._proposal_roundwise[-1] is self._prior
or self._proposal_roundwise[-1] is None
):
proposal = self._prior
assert isinstance(
proposal, (MultivariateNormal, utils.BoxUniform)
), """Prior must be torch.distributions.MultivariateNormal or sbi.utils.
BoxUniform"""
else:
assert isinstance(
self._proposal_roundwise[-1], DirectPosterior
), """The proposal you passed to append_simulations is neither the prior
nor a DirectPosterior. SNPE-A currently only supports these scenarios.
"""
proposal = self._proposal_roundwise[-1]

# Create the SNPE_A_MDN
wrapped_density_estimator = SNPE_A_MDN(
flow=density_estimator, proposal=proposal, prior=self._prior, device=device
)
return wrapped_density_estimator


#### get_dataloaders(self, starting_round=0, training_batch_size=50, validation_fraction=0.1, resume_training=False, dataloader_kwargs=None) inherited ¶

Return dataloaders for training and validation.

Parameters:

Name Type Description Default
dataset

holding all theta and x, optionally masks.

required
training_batch_size int

training arg of inference methods.

50
resume_training bool

Whether the current call is resuming training so that no new training and validation indices into the dataset have to be created.

False
dataloader_kwargs Optional[dict]

Additional or updated kwargs to be passed to the training and validation dataloaders (like, e.g., a collate_fn).

None

Returns:

Type Description
Tuple[torch.utils.data.dataloader.DataLoader, torch.utils.data.dataloader.DataLoader]

Tuple of dataloaders for training and validation.

Source code in sbi/inference/snpe/snpe_a.py
def get_dataloaders(
self,
starting_round: int = 0,
training_batch_size: int = 50,
validation_fraction: float = 0.1,
resume_training: bool = False,
dataloader_kwargs: Optional[dict] = None,
"""Return dataloaders for training and validation.

Args:
dataset: holding all theta and x, optionally masks.
training_batch_size: training arg of inference methods.
resume_training: Whether the current call is resuming training so that no
new training and validation indices into the dataset have to be created.
dataloader_kwargs: Additional or updated kwargs to be passed to the training
and validation dataloaders (like, e.g., a collate_fn).

Returns:
Tuple of dataloaders for training and validation.

"""

#
theta, x, prior_masks = self.get_simulations(starting_round)

dataset = data.TensorDataset(theta, x, prior_masks)

# Get total number of training examples.
num_examples = theta.size(0)
# Select random train and validation splits from (theta, x) pairs.
num_training_examples = int((1 - validation_fraction) * num_examples)
num_validation_examples = num_examples - num_training_examples

if not resume_training:
# Seperate indicies for training and validation
permuted_indices = torch.randperm(num_examples)
self.train_indices, self.val_indices = (
permuted_indices[:num_training_examples],
permuted_indices[num_training_examples:],
)

# Create training and validation loaders using a subset sampler.
# Intentionally use dicts to define the default dataloader args
# Then, use dataloader_kwargs to override (or add to) any of these defaults
# https://stackoverflow.com/questions/44784577/in-method-call-args-how-to-override-keyword-argument-of-unpacked-dict
"batch_size": min(training_batch_size, num_training_examples),
"drop_last": True,
"sampler": SubsetRandomSampler(self.train_indices.tolist()),
}
"batch_size": min(training_batch_size, num_validation_examples),
"shuffle": False,
"drop_last": True,
"sampler": SubsetRandomSampler(self.val_indices.tolist()),
}
if dataloader_kwargs is not None:



#### get_simulations(self, starting_round=0) inherited ¶

Returns all $$\theta$$, $$x$$, and prior_masks from rounds >= starting_round.

If requested, do not return invalid data.

Parameters:

Name Type Description Default
starting_round int

The earliest round to return samples from (we start counting from zero).

0
exclude_invalid_x

Whether to exclude simulation outputs x=NaN or x=±∞ during training.

required
warn_on_invalid

Whether to give out a warning if invalid simulations were found.

required

Returns: Parameters, simulation outputs, prior masks.

Source code in sbi/inference/snpe/snpe_a.py
def get_simulations(
self,
starting_round: int = 0,
) -> Tuple[Tensor, Tensor, Tensor]:
r"""Returns all $\theta$, $x$, and prior_masks from rounds >= starting_round.

If requested, do not return invalid data.

Args:
starting_round: The earliest round to return samples from (we start counting
from zero).
exclude_invalid_x: Whether to exclude simulation outputs x=NaN or x=±∞
during training.
warn_on_invalid: Whether to give out a warning if invalid simulations were
found.

Returns: Parameters, simulation outputs, prior masks.
"""

theta = get_simulations_since_round(
self._theta_roundwise, self._data_round_index, starting_round
)
x = get_simulations_since_round(
self._x_roundwise, self._data_round_index, starting_round
)
)

return theta, x, prior_masks


#### train(self, final_round=False, training_batch_size=50, learning_rate=0.0005, validation_fraction=0.1, stop_after_epochs=20, max_num_epochs=2147483647, clip_max_norm=5.0, calibration_kernel=None, resume_training=False, force_first_round_loss=False, retrain_from_scratch=False, show_train_summary=False, dataloader_kwargs=None, component_perturbation=0.005)¶

Return density estimator that approximates the proposal posterior.

 Fast epsilon-free Inference of Simulation Models with Bayesian Conditional Density Estimation, Papamakarios et al., NeurIPS 2016, https://arxiv.org/abs/1605.06376.

Training is performed with maximum likelihood on samples from the latest round, which leads the algorithm to converge to the proposal posterior.

Parameters:

Name Type Description Default
final_round bool

Whether we are in the last round of training or not. For all but the last round, Algorithm 1 from  is executed. In last the round, Algorithm 2 from  is executed once.

False
training_batch_size int

Training batch size.

50
learning_rate float

Learning rate for Adam optimizer.

0.0005
validation_fraction float

The fraction of data to use for validation.

0.1
stop_after_epochs int

The number of epochs to wait for improvement on the validation set before terminating training.

20
max_num_epochs int

Maximum number of epochs to run. If reached, we stop training even when the validation loss is still decreasing. Otherwise, we train until validation loss increases (see also stop_after_epochs).

2147483647
clip_max_norm Optional[float]

Value at which to clip the total gradient norm in order to prevent exploding gradients. Use None for no clipping.

5.0
calibration_kernel Optional[Callable]

A function to calibrate the loss with respect to the simulations x. See Lueckmann, Gonçalves et al., NeurIPS 2017.

None
resume_training bool

Can be used in case training time is limited, e.g. on a cluster. If True, the split between train and validation set, the optimizer, the number of epochs, and the best validation log-prob will be restored from the last time .train() was called.

False
force_first_round_loss bool

If True, train with maximum likelihood, i.e., potentially ignoring the correction for using a proposal distribution different from the prior.

False
force_first_round_loss bool

If True, train with maximum likelihood, regardless of the proposal distribution.

False
retrain_from_scratch bool

Whether to retrain the conditional density estimator for the posterior from scratch each round. Not supported for SNPE-A.

False
show_train_summary bool

Whether to print the number of epochs and validation loss and leakage after the training.

False
dataloader_kwargs Optional[Dict]

Additional or updated kwargs to be passed to the training and validation dataloaders (like, e.g., a collate_fn)

None
component_perturbation float

The standard deviation applied to all weights and biases when, in the last round, the Mixture of Gaussians is build from a single Gaussian. This value can be problem-specific and also depends on the number of mixture components.

0.005

Returns:

Type Description
Module

Density estimator that approximates the distribution $$p(\theta|x)$$.

Source code in sbi/inference/snpe/snpe_a.py
def train(
self,
final_round: bool = False,
training_batch_size: int = 50,
learning_rate: float = 5e-4,
validation_fraction: float = 0.1,
stop_after_epochs: int = 20,
max_num_epochs: int = 2**31 - 1,
clip_max_norm: Optional[float] = 5.0,
calibration_kernel: Optional[Callable] = None,
resume_training: bool = False,
force_first_round_loss: bool = False,
retrain_from_scratch: bool = False,
show_train_summary: bool = False,
dataloader_kwargs: Optional[Dict] = None,
component_perturbation: float = 5e-3,
) -> nn.Module:
r"""Return density estimator that approximates the proposal posterior.

 _Fast epsilon-free Inference of Simulation Models with Bayesian Conditional
Density Estimation_, Papamakarios et al., NeurIPS 2016,
https://arxiv.org/abs/1605.06376.

Training is performed with maximum likelihood on samples from the latest round,
which leads the algorithm to converge to the proposal posterior.

Args:
final_round: Whether we are in the last round of training or not. For all
but the last round, Algorithm 1 from  is executed. In last the
round, Algorithm 2 from  is executed once.
training_batch_size: Training batch size.
learning_rate: Learning rate for Adam optimizer.
validation_fraction: The fraction of data to use for validation.
stop_after_epochs: The number of epochs to wait for improvement on the
validation set before terminating training.
max_num_epochs: Maximum number of epochs to run. If reached, we stop
training even when the validation loss is still decreasing. Otherwise,
we train until validation loss increases (see also stop_after_epochs).
clip_max_norm: Value at which to clip the total gradient norm in order to
prevent exploding gradients. Use None for no clipping.
calibration_kernel: A function to calibrate the loss with respect to the
simulations x. See Lueckmann, Gonçalves et al., NeurIPS 2017.
resume_training: Can be used in case training time is limited, e.g. on a
cluster. If True, the split between train and validation set, the
optimizer, the number of epochs, and the best validation log-prob will
be restored from the last time .train() was called.
force_first_round_loss: If True, train with maximum likelihood,
i.e., potentially ignoring the correction for using a proposal
distribution different from the prior.
force_first_round_loss: If True, train with maximum likelihood,
regardless of the proposal distribution.
retrain_from_scratch: Whether to retrain the conditional density
estimator for the posterior from scratch each round. Not supported for
SNPE-A.
show_train_summary: Whether to print the number of epochs and validation
loss and leakage after the training.
dataloader_kwargs: Additional or updated kwargs to be passed to the training
and validation dataloaders (like, e.g., a collate_fn)
component_perturbation: The standard deviation applied to all weights and
biases when, in the last round, the Mixture of Gaussians is build from
a single Gaussian. This value can be problem-specific and also depends
on the number of mixture components.

Returns:
Density estimator that approximates the distribution $p(\theta|x)$.
"""

assert not retrain_from_scratch, """Retraining from scratch is not supported in SNPE-A yet. The reason for
this is that, if we reininitialized the density estimator, the z-scoring would
change, which would break the posthoc correction. This is a pure implementation
issue."""

kwargs = utils.del_entries(
locals(),
entries=("self", "__class__", "final_round", "component_perturbation"),
)

# SNPE-A always discards the prior samples.

self._round = max(self._data_round_index)

if final_round:
# If there is (will be) only one round, train with Algorithm 2 from .
if self._round == 0:
self._build_neural_net = partial(
self._build_neural_net, num_components=self._num_components
)
# Run Algorithm 2 from .
elif not self._ran_final_round:
# Now switch to the specified number of components. This method will
# only be used if retrain_from_scratch=True. Otherwise,
# the MDN will be built from replicating the single-component net for
# num_component times (via _expand_mog()).
self._build_neural_net = partial(
self._build_neural_net, num_components=self._num_components
)

# Extend the MDN to the originally desired number of components.
self._expand_mog(eps=component_perturbation)
else:
warnings.warn(
"You have already run SNPE-A with final_round=True. Running it"
"again with this setting will not allow computing the posthoc"
"correction applied in SNPE-A. Thus, you will get an error when "
"calling .build_posterior() after training.",
UserWarning,
)
else:
# Run Algorithm 1 from .
# Wrap the function that builds the MDN such that we can make
# sure that there is only one component when running.
self._build_neural_net = partial(self._build_neural_net, num_components=1)

if final_round:
self._ran_final_round = True

return super().train(**kwargs)


###  sbi.inference.snpe.snpe_c.SNPE_C (PosteriorEstimator) ¶

Source code in sbi/inference/snpe/snpe_c.py
class SNPE_C(PosteriorEstimator):
def __init__(
self,
prior: Optional[Distribution] = None,
density_estimator: Union[str, Callable] = "maf",
device: str = "cpu",
logging_level: Union[int, str] = "WARNING",
summary_writer: Optional[TensorboardSummaryWriter] = None,
show_progress_bars: bool = True,
):
r"""SNPE-C / APT .

 _Automatic Posterior Transformation for Likelihood-free Inference_,
Greenberg et al., ICML 2019, https://arxiv.org/abs/1905.07488.

This class implements two loss variants of SNPE-C: the non-atomic and the atomic
version. The atomic loss of SNPE-C can be used for any density estimator,
i.e. also for normalizing flows. However, it suffers from leakage issues. On
the other hand, the non-atomic loss can only be used only if the proposal
distribution is a mixture of Gaussians, the density estimator is a mixture of
Gaussians, and the prior is either Gaussian or Uniform. It does not suffer from
leakage issues. At the beginning of each round, we print whether the non-atomic
or the atomic version is used.

In this codebase, we will automatically switch to the non-atomic loss if the
following criteria are fulfilled:<br/>
- proposal is a DirectPosterior with density_estimator mdn, as built
with utils.sbi.posterior_nn().<br/>
- the density estimator is a mdn, as built with
utils.sbi.posterior_nn().<br/>
- isinstance(prior, MultivariateNormal) (from torch.distributions) or
isinstance(prior, sbi.utils.BoxUniform)

Note that custom implementations of any of these densities (or estimators) will
not trigger the non-atomic loss, and the algorithm will fall back onto using
the atomic loss.

Args:
prior: A probability distribution that expresses prior knowledge about the
parameters, e.g. which ranges are meaningful for them.
density_estimator: If it is a string, use a pre-configured network of the
provided type (one of nsf, maf, mdn, made). Alternatively, a function
that builds a custom neural network can be provided. The function will
be called with the first batch of simulations (theta, x), which can
thus be used for shape inference and potentially for z-scoring. It
needs to return a PyTorch nn.Module implementing the density
estimator. The density estimator needs to provide the methods
.log_prob and .sample().
device: Training device, e.g., "cpu", "cuda" or "cuda:{0, 1, ...}".
logging_level: Minimum severity of messages to log. One of the strings
INFO, WARNING, DEBUG, ERROR and CRITICAL.
summary_writer: A tensorboard SummaryWriter to control, among others, log
file location (default is <current working directory>/logs.)
show_progress_bars: Whether to show a progressbar during training.
"""

kwargs = del_entries(locals(), entries=("self", "__class__"))
super().__init__(**kwargs)

def train(
self,
num_atoms: int = 10,
training_batch_size: int = 50,
learning_rate: float = 5e-4,
validation_fraction: float = 0.1,
stop_after_epochs: int = 20,
max_num_epochs: int = 2**31 - 1,
clip_max_norm: Optional[float] = 5.0,
calibration_kernel: Optional[Callable] = None,
resume_training: bool = False,
force_first_round_loss: bool = False,
discard_prior_samples: bool = False,
use_combined_loss: bool = False,
retrain_from_scratch: bool = False,
show_train_summary: bool = False,
dataloader_kwargs: Optional[Dict] = None,
) -> nn.Module:
r"""Return density estimator that approximates the distribution $p(\theta|x)$.

Args:
num_atoms: Number of atoms to use for classification.
training_batch_size: Training batch size.
learning_rate: Learning rate for Adam optimizer.
validation_fraction: The fraction of data to use for validation.
stop_after_epochs: The number of epochs to wait for improvement on the
validation set before terminating training.
max_num_epochs: Maximum number of epochs to run. If reached, we stop
training even when the validation loss is still decreasing. Otherwise,
we train until validation loss increases (see also stop_after_epochs).
clip_max_norm: Value at which to clip the total gradient norm in order to
prevent exploding gradients. Use None for no clipping.
calibration_kernel: A function to calibrate the loss with respect to the
simulations x. See Lueckmann, Gonçalves et al., NeurIPS 2017.
resume_training: Can be used in case training time is limited, e.g. on a
cluster. If True, the split between train and validation set, the
optimizer, the number of epochs, and the best validation log-prob will
be restored from the last time .train() was called.
force_first_round_loss: If True, train with maximum likelihood,
i.e., potentially ignoring the correction for using a proposal
distribution different from the prior.
discard_prior_samples: Whether to discard samples simulated in round 1, i.e.
from the prior. Training may be sped up by ignoring such less targeted
samples.
use_combined_loss: Whether to train the neural net also on prior samples
using maximum likelihood in addition to training it on all samples using
atomic loss. The extra MLE loss helps prevent density leaking with
bounded priors.
retrain_from_scratch: Whether to retrain the conditional density
estimator for the posterior from scratch each round.
show_train_summary: Whether to print the number of epochs and validation
loss and leakage after the training.
dataloader_kwargs: Additional or updated kwargs to be passed to the training
and validation dataloaders (like, e.g., a collate_fn)

Returns:
Density estimator that approximates the distribution $p(\theta|x)$.
"""

# WARNING: sneaky trick ahead. We proxy the parent's train here,
# requiring the signature to have num_atoms, save it for use below, and
# continue. It's sneaky because we are using the object (self) as a namespace
# to pass arguments between functions, and that's implicit state management.
self._num_atoms = num_atoms
self._use_combined_loss = use_combined_loss
kwargs = del_entries(
locals(), entries=("self", "__class__", "num_atoms", "use_combined_loss")
)

self._round = max(self._data_round_index)

if self._round > 0:
# Set the proposal to the last proposal that was passed by the user. For
# atomic SNPE, it does not matter what the proposal is. For non-atomic
# SNPE, we only use the latest data that was passed, i.e. the one from the
# last proposal.
proposal = self._proposal_roundwise[-1]
self.use_non_atomic_loss = (
isinstance(proposal.posterior_estimator._distribution, mdn)
and isinstance(self._neural_net._distribution, mdn)
and check_dist_class(
self._prior, class_to_check=(Uniform, MultivariateNormal)
)
)

algorithm = "non-atomic" if self.use_non_atomic_loss else "atomic"
print(f"Using SNPE-C with {algorithm} loss")

if self.use_non_atomic_loss:
# Take care of z-scoring, pre-compute and store prior terms.
self._set_state_for_mog_proposal()

return super().train(**kwargs)

def _set_state_for_mog_proposal(self) -> None:
"""Set state variables that are used at each training step of non-atomic SNPE-C.

Three things are computed:
1) Check if z-scoring was requested. To do so, we check if the _transform
argument of the net had been a CompositeTransform. See pyknos mdn.py.
2) Define a (potentially standardized) prior. It's standardized if z-scoring
3) Compute (Precision * mean) for the prior. This quantity is used at every
training step if the prior is Gaussian.
"""

self.z_score_theta = isinstance(self._neural_net._transform, CompositeTransform)

self._set_maybe_z_scored_prior()

if isinstance(self._maybe_z_scored_prior, MultivariateNormal):
self.prec_m_prod_prior = torch.mv(
self._maybe_z_scored_prior.precision_matrix,  # type: ignore
self._maybe_z_scored_prior.loc,  # type: ignore
)

def _set_maybe_z_scored_prior(self) -> None:
r"""Compute and store potentially standardized prior (if z-scoring was done).

The proposal posterior is:
$pp(\theta|x) = 1/Z * q(\theta|x) * prop(\theta) / p(\theta)$

Let's denote z-scored theta by a: a = (theta - mean) / std
p(\theta)$and draw samples from the posterior with MCMC or rejection sampling. Note that, in the case of single-round SNRE_A / AALR, it is possible to evaluate the log-probability of the **normalized** posterior, but sampling still requires MCMC (or rejection sampling). Args: density_estimator: The density estimator that the posterior is based on. If None, use the latest neural density estimator that was trained. prior: Prior distribution. sample_with: Method to use for sampling from the posterior. Must be one of [mcmc | rejection | vi]. mcmc_method: Method used for MCMC sampling, one of slice_np, slice, hmc, nuts. Currently defaults to slice_np for a custom numpy implementation of slice sampling; select hmc, nuts or slice for Pyro-based sampling. vi_method: Method used for VI, one of [rKL, fKL, IW, alpha]. Note that some of the methods admit a mode seeking property (e.g. rKL) whereas some admit a mass covering one (e.g fKL). mcmc_parameters: Additional kwargs passed to MCMCPosterior. vi_parameters: Additional kwargs passed to VIPosterior. rejection_sampling_parameters: Additional kwargs passed to RejectionPosterior. Returns: Posterior$p(\theta|x)$with .sample() and .log_prob() methods (the returned log-probability is unnormalized). """ if prior is None: assert ( self._prior is not None ), """You did not pass a prior. You have to pass the prior either at initialization inference = SNRE(prior) or to .build_posterior (prior=prior).""" prior = self._prior else: check_prior(prior) if density_estimator is None: ratio_estimator = self._neural_net # If internal net is used device is defined. device = self._device else: ratio_estimator = density_estimator # Otherwise, infer it from the device of the net parameters. device = next(density_estimator.parameters()).device.type potential_fn, theta_transform = ratio_estimator_based_potential( ratio_estimator=ratio_estimator, prior=prior, x_o=None ) if sample_with == "mcmc": self._posterior = MCMCPosterior( potential_fn=potential_fn, theta_transform=theta_transform, proposal=prior, method=mcmc_method, device=device, x_shape=self._x_shape, **mcmc_parameters, ) elif sample_with == "rejection": self._posterior = RejectionPosterior( potential_fn=potential_fn, proposal=prior, device=device, x_shape=self._x_shape, **rejection_sampling_parameters, ) elif sample_with == "vi": self._posterior = VIPosterior( potential_fn=potential_fn, theta_transform=theta_transform, prior=prior, # type: ignore vi_method=vi_method, device=device, x_shape=self._x_shape, **vi_parameters, ) else: raise NotImplementedError # Store models at end of each round. self._model_bank.append(deepcopy(self._posterior)) return deepcopy(self._posterior)  #### get_dataloaders(self, starting_round=0, training_batch_size=50, validation_fraction=0.1, resume_training=False, dataloader_kwargs=None) inherited ¶ Return dataloaders for training and validation. Parameters: Name Type Description Default dataset holding all theta and x, optionally masks. required training_batch_size int training arg of inference methods. 50 resume_training bool Whether the current call is resuming training so that no new training and validation indices into the dataset have to be created. False dataloader_kwargs Optional[dict] Additional or updated kwargs to be passed to the training and validation dataloaders (like, e.g., a collate_fn). None Returns: Type Description Tuple[torch.utils.data.dataloader.DataLoader, torch.utils.data.dataloader.DataLoader] Tuple of dataloaders for training and validation. Source code in sbi/inference/snre/snre_b.py def get_dataloaders( self, starting_round: int = 0, training_batch_size: int = 50, validation_fraction: float = 0.1, resume_training: bool = False, dataloader_kwargs: Optional[dict] = None, ) -> Tuple[data.DataLoader, data.DataLoader]: """Return dataloaders for training and validation. Args: dataset: holding all theta and x, optionally masks. training_batch_size: training arg of inference methods. resume_training: Whether the current call is resuming training so that no new training and validation indices into the dataset have to be created. dataloader_kwargs: Additional or updated kwargs to be passed to the training and validation dataloaders (like, e.g., a collate_fn). Returns: Tuple of dataloaders for training and validation. """ # theta, x, prior_masks = self.get_simulations(starting_round) dataset = data.TensorDataset(theta, x, prior_masks) # Get total number of training examples. num_examples = theta.size(0) # Select random train and validation splits from (theta, x) pairs. num_training_examples = int((1 - validation_fraction) * num_examples) num_validation_examples = num_examples - num_training_examples if not resume_training: # Seperate indicies for training and validation permuted_indices = torch.randperm(num_examples) self.train_indices, self.val_indices = ( permuted_indices[:num_training_examples], permuted_indices[num_training_examples:], ) # Create training and validation loaders using a subset sampler. # Intentionally use dicts to define the default dataloader args # Then, use dataloader_kwargs to override (or add to) any of these defaults # https://stackoverflow.com/questions/44784577/in-method-call-args-how-to-override-keyword-argument-of-unpacked-dict train_loader_kwargs = { "batch_size": min(training_batch_size, num_training_examples), "drop_last": True, "sampler": SubsetRandomSampler(self.train_indices.tolist()), } val_loader_kwargs = { "batch_size": min(training_batch_size, num_validation_examples), "shuffle": False, "drop_last": True, "sampler": SubsetRandomSampler(self.val_indices.tolist()), } if dataloader_kwargs is not None: train_loader_kwargs = dict(train_loader_kwargs, **dataloader_kwargs) val_loader_kwargs = dict(val_loader_kwargs, **dataloader_kwargs) train_loader = data.DataLoader(dataset, **train_loader_kwargs) val_loader = data.DataLoader(dataset, **val_loader_kwargs) return train_loader, val_loader  #### get_simulations(self, starting_round=0) inherited ¶ Returns all $$\theta$$, $$x$$, and prior_masks from rounds >= starting_round. If requested, do not return invalid data. Parameters: Name Type Description Default starting_round int The earliest round to return samples from (we start counting from zero). 0 exclude_invalid_x Whether to exclude simulation outputs x=NaN or x=±∞ during training. required warn_on_invalid Whether to give out a warning if invalid simulations were found. required Returns: Parameters, simulation outputs, prior masks. Source code in sbi/inference/snre/snre_b.py def get_simulations( self, starting_round: int = 0, ) -> Tuple[Tensor, Tensor, Tensor]: r"""Returns all$\theta$,$x$, and prior_masks from rounds >= starting_round. If requested, do not return invalid data. Args: starting_round: The earliest round to return samples from (we start counting from zero). exclude_invalid_x: Whether to exclude simulation outputs x=NaN or x=±∞ during training. warn_on_invalid: Whether to give out a warning if invalid simulations were found. Returns: Parameters, simulation outputs, prior masks. """ theta = get_simulations_since_round( self._theta_roundwise, self._data_round_index, starting_round ) x = get_simulations_since_round( self._x_roundwise, self._data_round_index, starting_round ) prior_masks = get_simulations_since_round( self._prior_masks, self._data_round_index, starting_round ) return theta, x, prior_masks  #### train(self, num_atoms=10, training_batch_size=50, learning_rate=0.0005, validation_fraction=0.1, stop_after_epochs=20, max_num_epochs=2147483647, clip_max_norm=5.0, resume_training=False, discard_prior_samples=False, retrain_from_scratch=False, show_train_summary=False, dataloader_kwargs=None)¶ Return classifier that approximates the ratio $$p(\theta,x)/p(\theta)p(x)$$. Parameters: Name Type Description Default num_atoms int Number of atoms to use for classification. 10 training_batch_size int Training batch size. 50 learning_rate float Learning rate for Adam optimizer. 0.0005 validation_fraction float The fraction of data to use for validation. 0.1 stop_after_epochs int The number of epochs to wait for improvement on the validation set before terminating training. 20 max_num_epochs int Maximum number of epochs to run. If reached, we stop training even when the validation loss is still decreasing. Otherwise, we train until validation loss increases (see also stop_after_epochs). 2147483647 clip_max_norm Optional[float] Value at which to clip the total gradient norm in order to prevent exploding gradients. Use None for no clipping. 5.0 resume_training bool Can be used in case training time is limited, e.g. on a cluster. If True, the split between train and validation set, the optimizer, the number of epochs, and the best validation log-prob will be restored from the last time .train() was called. False discard_prior_samples bool Whether to discard samples simulated in round 1, i.e. from the prior. Training may be sped up by ignoring such less targeted samples. False retrain_from_scratch bool Whether to retrain the conditional density estimator for the posterior from scratch each round. False show_train_summary bool Whether to print the number of epochs and validation loss and leakage after the training. False dataloader_kwargs Optional[Dict] Additional or updated kwargs to be passed to the training and validation dataloaders (like, e.g., a collate_fn) None Returns: Type Description Module Classifier that approximates the ratio $$p(\theta,x)/p(\theta)p(x)$$. Source code in sbi/inference/snre/snre_b.py def train( self, num_atoms: int = 10, training_batch_size: int = 50, learning_rate: float = 5e-4, validation_fraction: float = 0.1, stop_after_epochs: int = 20, max_num_epochs: int = 2**31 - 1, clip_max_norm: Optional[float] = 5.0, resume_training: bool = False, discard_prior_samples: bool = False, retrain_from_scratch: bool = False, show_train_summary: bool = False, dataloader_kwargs: Optional[Dict] = None, ) -> nn.Module: r"""Return classifier that approximates the ratio$p(\theta,x)/p(\theta)p(x)$. Args: num_atoms: Number of atoms to use for classification. training_batch_size: Training batch size. learning_rate: Learning rate for Adam optimizer. validation_fraction: The fraction of data to use for validation. stop_after_epochs: The number of epochs to wait for improvement on the validation set before terminating training. max_num_epochs: Maximum number of epochs to run. If reached, we stop training even when the validation loss is still decreasing. Otherwise, we train until validation loss increases (see also stop_after_epochs). clip_max_norm: Value at which to clip the total gradient norm in order to prevent exploding gradients. Use None for no clipping. resume_training: Can be used in case training time is limited, e.g. on a cluster. If True, the split between train and validation set, the optimizer, the number of epochs, and the best validation log-prob will be restored from the last time .train() was called. discard_prior_samples: Whether to discard samples simulated in round 1, i.e. from the prior. Training may be sped up by ignoring such less targeted samples. retrain_from_scratch: Whether to retrain the conditional density estimator for the posterior from scratch each round. show_train_summary: Whether to print the number of epochs and validation loss and leakage after the training. dataloader_kwargs: Additional or updated kwargs to be passed to the training and validation dataloaders (like, e.g., a collate_fn) Returns: Classifier that approximates the ratio$p(\theta,x)/p(\theta)p(x)$. """ kwargs = del_entries(locals(), entries=("self", "__class__")) return super().train(**kwargs)  ###  sbi.inference.abc.mcabc.MCABC (ABCBASE) ¶ Source code in sbi/inference/abc/mcabc.py class MCABC(ABCBASE): def __init__( self, simulator: Callable, prior, distance: Union[str, Callable] = "l2", num_workers: int = 1, simulation_batch_size: int = 1, show_progress_bars: bool = True, ): r"""Monte-Carlo Approximate Bayesian Computation (Rejection ABC) .  Pritchard, J. K., Seielstad, M. T., Perez-Lezaun, A., & Feldman, M. W. (1999). Population growth of human Y chromosomes: a study of Y chromosome microsatellites. Molecular biology and evolution, 16(12), 1791-1798. Args: simulator: A function that takes parameters$\theta$and maps them to simulations, or observations, x,$\mathrm{sim}(\theta)\to x$. Any regular Python callable (i.e. function or class with __call__ method) can be used. prior: A probability distribution that expresses prior knowledge about the parameters, e.g. which ranges are meaningful for them. Any object with .log_prob()and .sample() (for example, a PyTorch distribution) can be used. distance: Distance function to compare observed and simulated data. Can be a custom function or one of l1, l2, mse. num_workers: Number of parallel workers to use for simulations. simulation_batch_size: Number of parameter sets that the simulator maps to data x at once. If None, we simulate all parameter sets at the same time. If >= 1, the simulator has to process data of shape (simulation_batch_size, parameter_dimension). show_progress_bars: Whether to show a progressbar during simulation and sampling. """ super().__init__( simulator=simulator, prior=prior, distance=distance, num_workers=num_workers, simulation_batch_size=simulation_batch_size, show_progress_bars=show_progress_bars, ) def __call__( self, x_o: Union[Tensor, ndarray], num_simulations: int, eps: Optional[float] = None, quantile: Optional[float] = None, lra: bool = False, sass: bool = False, sass_fraction: float = 0.25, sass_expansion_degree: int = 1, kde: bool = False, kde_kwargs: Dict[str, Any] = {}, return_summary: bool = False, ) -> Union[Tuple[Tensor, dict], Tuple[KDEWrapper, dict], Tensor, KDEWrapper]: r"""Run MCABC and return accepted parameters or KDE object fitted on them. Args: x_o: Observed data. num_simulations: Number of simulations to run. eps: Acceptance threshold$\epsilon$for distance between observed and simulated data. quantile: Upper quantile of smallest distances for which the corresponding parameters are returned, e.g, q=0.01 will return the top 1%. Exactly one of quantile or eps have to be passed. lra: Whether to run linear regression adjustment as in Beaumont et al. 2002 sass: Whether to determine semi-automatic summary statistics as in Fearnhead & Prangle 2012. sass_fraction: Fraction of simulation budget used for the initial sass run. sass_expansion_degree: Degree of the polynomial feature expansion for the sass regression, default 1 - no expansion. kde: Whether to run KDE on the accepted parameters to return a KDE object from which one can sample. kde_kwargs: kwargs for performing KDE: 'bandwidth='; either a float, or a string naming a bandwidth heuristics, e.g., 'cv' (cross validation), 'silvermann' or 'scott', default 'cv'. 'transform': transform applied to the parameters before doing KDE. 'sample_weights': weights associated with samples. See 'get_kde' for more details return_summary: Whether to return the distances and data corresponding to the accepted parameters. Returns: theta (if kde False): accepted parameters kde (if kde True): KDE object based on accepted parameters from which one can .sample() and .log_prob(). summary (if summary True): dictionary containing the accepted paramters (if kde True), distances and simulated data x. """ # Exactly one of eps or quantile need to be passed. assert (eps is not None) ^ ( quantile is not None ), "Eps or quantile must be passed, but not both." # Run SASS and change the simulator and x_o accordingly. if sass: num_pilot_simulations = int(sass_fraction * num_simulations) self.logger.info( f"Running SASS with {num_pilot_simulations} pilot samples." ) num_simulations -= num_pilot_simulations pilot_theta = self.prior.sample((num_pilot_simulations,)) pilot_x = self._batched_simulator(pilot_theta) sass_transform = self.get_sass_transform( pilot_theta, pilot_x, sass_expansion_degree ) simulator = lambda theta: sass_transform(self._batched_simulator(theta)) x_o = sass_transform(x_o) else: simulator = self._batched_simulator # Simulate and calculate distances. theta = self.prior.sample((num_simulations,)) x = simulator(theta) # Infer shape of x to test and set x_o. self.x_shape = x.unsqueeze(0).shape self.x_o = process_x(x_o, self.x_shape) distances = self.distance(self.x_o, x) # Select based on acceptance threshold epsilon. if eps is not None: is_accepted = distances < eps num_accepted = is_accepted.sum().item() assert num_accepted > 0, f"No parameters accepted, eps={eps} too small" theta_accepted = theta[is_accepted] distances_accepted = distances[is_accepted] x_accepted = x[is_accepted] # Select based on quantile on sorted distances. elif quantile is not None: num_top_samples = int(num_simulations * quantile) sort_idx = torch.argsort(distances) theta_accepted = theta[sort_idx][:num_top_samples] distances_accepted = distances[sort_idx][:num_top_samples] x_accepted = x[sort_idx][:num_top_samples] else: raise ValueError("One of epsilon or quantile has to be passed.") # Maybe adjust theta with LRA. if lra: self.logger.info("Running Linear regression adjustment.") final_theta = self.run_lra(theta_accepted, x_accepted, observation=self.x_o) else: final_theta = theta_accepted if kde: self.logger.info( f"""KDE on {final_theta.shape} samples with bandwidth option {kde_kwargs["bandwidth"] if "bandwidth" in kde_kwargs else "cv"}. Beware that KDE can give unreliable results when used with too few samples and in high dimensions.""" ) kde_dist = get_kde(final_theta, **kde_kwargs) if return_summary: return ( kde_dist, dict(theta=final_theta, distances=distances_accepted, x=x_accepted), ) else: return kde_dist elif return_summary: return final_theta, dict(distances=distances_accepted, x=x_accepted) else: return final_theta  #### __call__(self, x_o, num_simulations, eps=None, quantile=None, lra=False, sass=False, sass_fraction=0.25, sass_expansion_degree=1, kde=False, kde_kwargs={}, return_summary=False) special ¶ Run MCABC and return accepted parameters or KDE object fitted on them. Parameters: Name Type Description Default x_o Union[torch.Tensor, numpy.ndarray] Observed data. required num_simulations int Number of simulations to run. required eps Optional[float] Acceptance threshold $$\epsilon$$ for distance between observed and simulated data. None quantile Optional[float] Upper quantile of smallest distances for which the corresponding parameters are returned, e.g, q=0.01 will return the top 1%. Exactly one of quantile or eps have to be passed. None lra bool Whether to run linear regression adjustment as in Beaumont et al. 2002 False sass bool Whether to determine semi-automatic summary statistics as in Fearnhead & Prangle 2012. False sass_fraction float Fraction of simulation budget used for the initial sass run. 0.25 sass_expansion_degree int Degree of the polynomial feature expansion for the sass regression, default 1 - no expansion. 1 kde bool Whether to run KDE on the accepted parameters to return a KDE object from which one can sample. False kde_kwargs Dict[str, Any] kwargs for performing KDE: ‘bandwidth=’; either a float, or a string naming a bandwidth heuristics, e.g., ‘cv’ (cross validation), ‘silvermann’ or ‘scott’, default ‘cv’. ‘transform’: transform applied to the parameters before doing KDE. ‘sample_weights’: weights associated with samples. See ‘get_kde’ for more details {} return_summary bool Whether to return the distances and data corresponding to the accepted parameters. False Returns: Type Description theta (if kde False) accepted parameters kde (if kde True): KDE object based on accepted parameters from which one can .sample() and .log_prob(). summary (if summary True): dictionary containing the accepted paramters (if kde True), distances and simulated data x. Source code in sbi/inference/abc/mcabc.py def __call__( self, x_o: Union[Tensor, ndarray], num_simulations: int, eps: Optional[float] = None, quantile: Optional[float] = None, lra: bool = False, sass: bool = False, sass_fraction: float = 0.25, sass_expansion_degree: int = 1, kde: bool = False, kde_kwargs: Dict[str, Any] = {}, return_summary: bool = False, ) -> Union[Tuple[Tensor, dict], Tuple[KDEWrapper, dict], Tensor, KDEWrapper]: r"""Run MCABC and return accepted parameters or KDE object fitted on them. Args: x_o: Observed data. num_simulations: Number of simulations to run. eps: Acceptance threshold$\epsilon$for distance between observed and simulated data. quantile: Upper quantile of smallest distances for which the corresponding parameters are returned, e.g, q=0.01 will return the top 1%. Exactly one of quantile or eps have to be passed. lra: Whether to run linear regression adjustment as in Beaumont et al. 2002 sass: Whether to determine semi-automatic summary statistics as in Fearnhead & Prangle 2012. sass_fraction: Fraction of simulation budget used for the initial sass run. sass_expansion_degree: Degree of the polynomial feature expansion for the sass regression, default 1 - no expansion. kde: Whether to run KDE on the accepted parameters to return a KDE object from which one can sample. kde_kwargs: kwargs for performing KDE: 'bandwidth='; either a float, or a string naming a bandwidth heuristics, e.g., 'cv' (cross validation), 'silvermann' or 'scott', default 'cv'. 'transform': transform applied to the parameters before doing KDE. 'sample_weights': weights associated with samples. See 'get_kde' for more details return_summary: Whether to return the distances and data corresponding to the accepted parameters. Returns: theta (if kde False): accepted parameters kde (if kde True): KDE object based on accepted parameters from which one can .sample() and .log_prob(). summary (if summary True): dictionary containing the accepted paramters (if kde True), distances and simulated data x. """ # Exactly one of eps or quantile need to be passed. assert (eps is not None) ^ ( quantile is not None ), "Eps or quantile must be passed, but not both." # Run SASS and change the simulator and x_o accordingly. if sass: num_pilot_simulations = int(sass_fraction * num_simulations) self.logger.info( f"Running SASS with {num_pilot_simulations} pilot samples." ) num_simulations -= num_pilot_simulations pilot_theta = self.prior.sample((num_pilot_simulations,)) pilot_x = self._batched_simulator(pilot_theta) sass_transform = self.get_sass_transform( pilot_theta, pilot_x, sass_expansion_degree ) simulator = lambda theta: sass_transform(self._batched_simulator(theta)) x_o = sass_transform(x_o) else: simulator = self._batched_simulator # Simulate and calculate distances. theta = self.prior.sample((num_simulations,)) x = simulator(theta) # Infer shape of x to test and set x_o. self.x_shape = x.unsqueeze(0).shape self.x_o = process_x(x_o, self.x_shape) distances = self.distance(self.x_o, x) # Select based on acceptance threshold epsilon. if eps is not None: is_accepted = distances < eps num_accepted = is_accepted.sum().item() assert num_accepted > 0, f"No parameters accepted, eps={eps} too small" theta_accepted = theta[is_accepted] distances_accepted = distances[is_accepted] x_accepted = x[is_accepted] # Select based on quantile on sorted distances. elif quantile is not None: num_top_samples = int(num_simulations * quantile) sort_idx = torch.argsort(distances) theta_accepted = theta[sort_idx][:num_top_samples] distances_accepted = distances[sort_idx][:num_top_samples] x_accepted = x[sort_idx][:num_top_samples] else: raise ValueError("One of epsilon or quantile has to be passed.") # Maybe adjust theta with LRA. if lra: self.logger.info("Running Linear regression adjustment.") final_theta = self.run_lra(theta_accepted, x_accepted, observation=self.x_o) else: final_theta = theta_accepted if kde: self.logger.info( f"""KDE on {final_theta.shape} samples with bandwidth option {kde_kwargs["bandwidth"] if "bandwidth" in kde_kwargs else "cv"}. Beware that KDE can give unreliable results when used with too few samples and in high dimensions.""" ) kde_dist = get_kde(final_theta, **kde_kwargs) if return_summary: return ( kde_dist, dict(theta=final_theta, distances=distances_accepted, x=x_accepted), ) else: return kde_dist elif return_summary: return final_theta, dict(distances=distances_accepted, x=x_accepted) else: return final_theta  #### __init__(self, simulator, prior, distance='l2', num_workers=1, simulation_batch_size=1, show_progress_bars=True) special ¶ Monte-Carlo Approximate Bayesian Computation (Rejection ABC) .  Pritchard, J. K., Seielstad, M. T., Perez-Lezaun, A., & Feldman, M. W. (1999). Population growth of human Y chromosomes: a study of Y chromosome microsatellites. Molecular biology and evolution, 16(12), 1791-1798. Parameters: Name Type Description Default simulator Callable A function that takes parameters $$\theta$$ and maps them to simulations, or observations, x, $$\mathrm{sim}(\theta)\to x$$. Any regular Python callable (i.e. function or class with __call__ method) can be used. required prior A probability distribution that expresses prior knowledge about the parameters, e.g. which ranges are meaningful for them. Any object with .log_prob()and .sample() (for example, a PyTorch distribution) can be used. required distance Union[str, Callable] Distance function to compare observed and simulated data. Can be a custom function or one of l1, l2, mse. 'l2' num_workers int Number of parallel workers to use for simulations. 1 simulation_batch_size int Number of parameter sets that the simulator maps to data x at once. If None, we simulate all parameter sets at the same time. If >= 1, the simulator has to process data of shape (simulation_batch_size, parameter_dimension). 1 show_progress_bars bool Whether to show a progressbar during simulation and sampling. True Source code in sbi/inference/abc/mcabc.py def __init__( self, simulator: Callable, prior, distance: Union[str, Callable] = "l2", num_workers: int = 1, simulation_batch_size: int = 1, show_progress_bars: bool = True, ): r"""Monte-Carlo Approximate Bayesian Computation (Rejection ABC) .  Pritchard, J. K., Seielstad, M. T., Perez-Lezaun, A., & Feldman, M. W. (1999). Population growth of human Y chromosomes: a study of Y chromosome microsatellites. Molecular biology and evolution, 16(12), 1791-1798. Args: simulator: A function that takes parameters$\theta$and maps them to simulations, or observations, x,$\mathrm{sim}(\theta)\to x$. Any regular Python callable (i.e. function or class with __call__ method) can be used. prior: A probability distribution that expresses prior knowledge about the parameters, e.g. which ranges are meaningful for them. Any object with .log_prob()and .sample() (for example, a PyTorch distribution) can be used. distance: Distance function to compare observed and simulated data. Can be a custom function or one of l1, l2, mse. num_workers: Number of parallel workers to use for simulations. simulation_batch_size: Number of parameter sets that the simulator maps to data x at once. If None, we simulate all parameter sets at the same time. If >= 1, the simulator has to process data of shape (simulation_batch_size, parameter_dimension). show_progress_bars: Whether to show a progressbar during simulation and sampling. """ super().__init__( simulator=simulator, prior=prior, distance=distance, num_workers=num_workers, simulation_batch_size=simulation_batch_size, show_progress_bars=show_progress_bars, )  #### choose_distance_function(distance_type='l2') inherited ¶ Return distance function for given distance type. Source code in sbi/inference/abc/mcabc.py @staticmethod def choose_distance_function(distance_type: str = "l2") -> Callable: """Return distance function for given distance type.""" if distance_type == "mse": distance = lambda xo, x: torch.mean((xo - x) ** 2, dim=-1) elif distance_type == "l2": distance = lambda xo, x: torch.norm((xo - x), dim=-1) elif distance_type == "l1": distance = lambda xo, x: torch.mean(abs(xo - x), dim=-1) else: raise ValueError(r"Distance {distance_type} not supported.") def distance_fun(observed_data: Tensor, simulated_data: Tensor) -> Tensor: """Return distance over batch dimension. Args: observed_data: Observed data, could be 1D. simulated_data: Batch of simulated data, has batch dimension. Returns: Torch tensor with batch of distances. """ assert simulated_data.ndim == 2, "simulated data needs batch dimension" return distance(observed_data, simulated_data) return distance_fun  #### get_sass_transform(theta, x, expansion_degree=1, sample_weight=None) inherited ¶ Return semi-automatic summary statitics function. Running weighted linear regressin as in Fearnhead & Prandle 2012: https://arxiv.org/abs/1004.1112 Source code in sbi/inference/abc/mcabc.py @staticmethod def get_sass_transform( theta: torch.Tensor, x: torch.Tensor, expansion_degree: int = 1, sample_weight=None, ) -> Callable: """Return semi-automatic summary statitics function. Running weighted linear regressin as in Fearnhead & Prandle 2012: https://arxiv.org/abs/1004.1112 Following implementation in https://abcpy.readthedocs.io/en/latest/_modules/abcpy/statistics.html#Identity and https://pythonhosted.org/abcpy/_modules/abcpy/summaryselections.html#Semiautomatic """ expansion = PolynomialFeatures(degree=expansion_degree, include_bias=False) # Transform x, remove intercept. x_expanded = expansion.fit_transform(x) sumstats_map = np.zeros((x_expanded.shape, theta.shape)) for parameter_idx in range(theta.shape): regression_model = LinearRegression(fit_intercept=True) regression_model.fit( X=x_expanded, y=theta[:, parameter_idx], sample_weight=sample_weight ) sumstats_map[:, parameter_idx] = regression_model.coef_ sumstats_map = torch.tensor(sumstats_map, dtype=torch.float32) def sumstats_transform(x): x_expanded = torch.tensor(expansion.fit_transform(x), dtype=torch.float32) return x_expanded.mm(sumstats_map) return sumstats_transform  #### run_lra(theta, x, observation, sample_weight=None) inherited ¶ Return parameters adjusted with linear regression adjustment. Implementation as in Beaumont et al. 2002: https://arxiv.org/abs/1707.01254 Source code in sbi/inference/abc/mcabc.py @staticmethod def run_lra( theta: torch.Tensor, x: torch.Tensor, observation: torch.Tensor, sample_weight=None, ) -> torch.Tensor: """Return parameters adjusted with linear regression adjustment. Implementation as in Beaumont et al. 2002: https://arxiv.org/abs/1707.01254 """ theta_adjusted = theta for parameter_idx in range(theta.shape): regression_model = LinearRegression(fit_intercept=True) regression_model.fit( X=x, y=theta[:, parameter_idx], sample_weight=sample_weight, ) theta_adjusted[:, parameter_idx] += regression_model.predict( observation.reshape(1, -1) ) theta_adjusted[:, parameter_idx] -= regression_model.predict(x) return theta_adjusted  ###  sbi.inference.abc.smcabc.SMCABC (ABCBASE) ¶ Source code in sbi/inference/abc/smcabc.py class SMCABC(ABCBASE): def __init__( self, simulator: Callable, prior: Distribution, distance: Union[str, Callable] = "l2", num_workers: int = 1, simulation_batch_size: int = 1, show_progress_bars: bool = True, kernel: Optional[str] = "gaussian", algorithm_variant: str = "C", ): r"""Sequential Monte Carlo Approximate Bayesian Computation. We distinguish between three different SMC methods here: - A: Toni et al. 2010 (Phd Thesis) - B: Sisson et al. 2007 (with correction from 2009) - C: Beaumont et al. 2009 In Toni et al. 2010 we find an overview of the differences on page 34: - B: same as A except for resampling of weights if the effective sampling size is too small. - C: same as A except for calculation of the covariance of the perturbation kernel: the kernel covariance is a scaled version of the covariance of the previous population. Args: simulator: A function that takes parameters$\theta$and maps them to simulations, or observations, x,$\mathrm{sim}(\theta)\to x$. Any regular Python callable (i.e. function or class with __call__ method) can be used. prior: A probability distribution that expresses prior knowledge about the parameters, e.g. which ranges are meaningful for them. Any object with .log_prob()and .sample() (for example, a PyTorch distribution) can be used. distance: Distance function to compare observed and simulated data. Can be a custom function or one of l1, l2, mse. num_workers: Number of parallel workers to use for simulations. simulation_batch_size: Number of parameter sets that the simulator maps to data x at once. If None, we simulate all parameter sets at the same time. If >= 1, the simulator has to process data of shape (simulation_batch_size, parameter_dimension). show_progress_bars: Whether to show a progressbar during simulation and sampling. kernel: Perturbation kernel. algorithm_variant: Indicating the choice of algorithm variant, A, B, or C. """ super().__init__( simulator=simulator, prior=prior, distance=distance, num_workers=num_workers, simulation_batch_size=simulation_batch_size, show_progress_bars=show_progress_bars, ) kernels = ("gaussian", "uniform") assert ( kernel in kernels ), f"Kernel '{kernel}' not supported. Choose one from {kernels}." self.kernel = kernel algorithm_variants = ("A", "B", "C") assert algorithm_variant in algorithm_variants, ( f"SMCABC variant '{algorithm_variant}' not supported, choose one from" " {algorithm_variants}." ) self.algorithm_variant = algorithm_variant self.distance_to_x0 = None self.simulation_counter = 0 self.num_simulations = 0 # Define simulator that keeps track of budget. def simulate_with_budget(theta): self.simulation_counter += theta.shape return self._batched_simulator(theta) self._simulate_with_budget = simulate_with_budget def __call__( self, x_o: Union[Tensor, ndarray], num_particles: int, num_initial_pop: int, num_simulations: int, epsilon_decay: float, distance_based_decay: bool = False, ess_min: Optional[float] = None, kernel_variance_scale: float = 1.0, use_last_pop_samples: bool = True, return_summary: bool = False, kde: bool = False, kde_kwargs: Dict[str, Any] = {}, kde_sample_weights: bool = False, lra: bool = False, lra_with_weights: bool = False, sass: bool = False, sass_fraction: float = 0.25, sass_expansion_degree: int = 1, ) -> Union[Tensor, KDEWrapper, Tuple[Tensor, dict], Tuple[KDEWrapper, dict]]: r"""Run SMCABC and return accepted parameters or KDE object fitted on them. Args: x_o: Observed data. num_particles: Number of particles in each population. num_initial_pop: Number of simulations used for initial population. num_simulations: Total number of possible simulations. epsilon_decay: Factor with which the acceptance threshold$\epsilon$decays. distance_based_decay: Whether the$\epsilon$decay is constant over populations or calculated from the previous populations distribution of distances. ess_min: Threshold of effective sampling size for resampling weights. Not used when None (default). kernel_variance_scale: Factor for scaling the perturbation kernel variance. use_last_pop_samples: Whether to fill up the current population with samples from the previous population when the budget is used up. If False, the current population is discarded and the previous population is returned. lra: Whether to run linear regression adjustment as in Beaumont et al. 2002 lra_with_weights: Whether to run lra as weighted linear regression with SMC weights sass: Whether to determine semi-automatic summary statistics as in Fearnhead & Prangle 2012. sass_fraction: Fraction of simulation budget used for the initial sass run. sass_expansion_degree: Degree of the polynomial feature expansion for the sass regression, default 1 - no expansion. kde: Whether to run KDE on the accepted parameters to return a KDE object from which one can sample. kde_kwargs: kwargs for performing KDE: 'bandwidth='; either a float, or a string naming a bandwidth heuristics, e.g., 'cv' (cross validation), 'silvermann' or 'scott', default 'cv'. 'transform': transform applied to the parameters before doing KDE. 'sample_weights': weights associated with samples. See 'get_kde' for more details kde_sample_weights: Whether perform weighted KDE with SMC weights or on raw particles. return_summary: Whether to return a dictionary with all accepted particles, weights, etc. at the end. Returns: theta (if kde False): accepted parameters of the last population. kde (if kde True): KDE object fitted on accepted parameters, from which one can .sample() and .log_prob(). summary (if return_summary True): dictionary containing the accepted paramters (if kde True), distances and simulated data x of all populations. """ pop_idx = 0 self.num_simulations = num_simulations # Pilot run for SASS. if sass: num_pilot_simulations = int(sass_fraction * num_simulations) self.logger.info( f"Running SASS with {num_pilot_simulations} pilot samples." ) sass_transform = self.run_sass_set_xo( num_particles, num_pilot_simulations, x_o, lra, sass_expansion_degree ) # Udpate simulator and xo x_o = sass_transform(self.x_o) def sass_simulator(theta): self.simulation_counter += theta.shape return sass_transform(self._batched_simulator(theta)) self._simulate_with_budget = sass_simulator # run initial population particles, epsilon, distances, x = self._set_xo_and_sample_initial_population( x_o, num_particles, num_initial_pop ) log_weights = torch.log(1 / num_particles * ones(num_particles)) self.logger.info( ( f"population={pop_idx}, eps={epsilon}, ess={1.0}, " f"num_sims={num_initial_pop}" ) ) all_particles = [particles] all_log_weights = [log_weights] all_distances = [distances] all_epsilons = [epsilon] all_x = [x] while self.simulation_counter < self.num_simulations: pop_idx += 1 # Decay based on quantile of distances from previous pop. if distance_based_decay: epsilon = self._get_next_epsilon( all_distances[pop_idx - 1], epsilon_decay ) # Constant decay. else: epsilon *= epsilon_decay # Get kernel variance from previous pop. self.kernel_variance = self.get_kernel_variance( all_particles[pop_idx - 1], torch.exp(all_log_weights[pop_idx - 1]), samples_per_dim=500, kernel_variance_scale=kernel_variance_scale, ) particles, log_weights, distances, x = self._sample_next_population( particles=all_particles[pop_idx - 1], log_weights=all_log_weights[pop_idx - 1], distances=all_distances[pop_idx - 1], epsilon=epsilon, x=all_x[pop_idx - 1], use_last_pop_samples=use_last_pop_samples, ) # Resample population if effective sampling size is too small. if ess_min is not None: particles, log_weights = self.resample_if_ess_too_small( particles, log_weights, ess_min, pop_idx ) self.logger.info( ( f"population={pop_idx} done: eps={epsilon:.6f}," f" num_sims={self.simulation_counter}." ) ) # collect results all_particles.append(particles) all_log_weights.append(log_weights) all_distances.append(distances) all_epsilons.append(epsilon) all_x.append(x) # Maybe run LRA and adjust weights. if lra: self.logger.info("Running Linear regression adjustment.") adjusted_particles, adjusted_weights = self.run_lra_update_weights( particles=all_particles[-1], xs=all_x[-1], observation=process_x(x_o), log_weights=all_log_weights[-1], lra_with_weights=lra_with_weights, ) final_particles = adjusted_particles else: final_particles = all_particles[-1] if kde: self.logger.info( f"""KDE on {final_particles.shape} samples with bandwidth option {kde_kwargs["bandwidth"] if "bandwidth" in kde_kwargs else "cv"}. Beware that KDE can give unreliable results when used with too few samples and in high dimensions.""" ) # Maybe get particles weights from last population for weighted KDE. if kde_sample_weights: kde_kwargs["sample_weights"] = all_log_weights[-1].exp() kde_dist = get_kde(final_particles, **kde_kwargs) if return_summary: return ( kde_dist, dict( particles=all_particles, weights=all_log_weights, epsilons=all_epsilons, distances=all_distances, xs=all_x, ), ) else: return kde_dist if return_summary: return ( final_particles, dict( particles=all_particles, weights=all_log_weights, epsilons=all_epsilons, distances=all_distances, xs=all_x, ), ) else: return final_particles def _set_xo_and_sample_initial_population( self, x_o: Array, num_particles: int, num_initial_pop: int, ) -> Tuple[Tensor, float, Tensor, Tensor]: """Return particles, epsilon and distances of initial population.""" assert ( num_particles <= num_initial_pop ), "number of initial round simulations must be greater than population size" theta = self.prior.sample((num_initial_pop,)) x = self._simulate_with_budget(theta) # Infer x shape to test and set x_o. self.x_shape = x.unsqueeze(0).shape self.x_o = process_x(x_o, self.x_shape) distances = self.distance(self.x_o, x) sortidx = torch.argsort(distances) particles = theta[sortidx][:num_particles] # Take last accepted distance as epsilon. initial_epsilon = distances[sortidx][num_particles - 1] if not torch.isfinite(initial_epsilon): initial_epsilon = 1e8 return ( particles, initial_epsilon, distances[sortidx][:num_particles], x[sortidx][:num_particles], ) def _sample_next_population( self, particles: Tensor, log_weights: Tensor, distances: Tensor, epsilon: float, x: Tensor, use_last_pop_samples: bool = True, ) -> Tuple[Tensor, Tensor, Tensor, Tensor]: """Return particles, weights and distances of new population.""" new_particles = [] new_log_weights = [] new_distances = [] new_x = [] num_accepted_particles = 0 num_particles = particles.shape while num_accepted_particles < num_particles: # Upperbound for batch size to not exceed simulation budget. num_batch = min( num_particles - num_accepted_particles, self.num_simulations - self.simulation_counter, ) # Sample from previous population and perturb. particle_candidates = self._sample_and_perturb( particles, torch.exp(log_weights), num_samples=num_batch ) # Simulate and select based on distance. x_candidates = self._simulate_with_budget(particle_candidates) dists = self.distance(self.x_o, x_candidates) is_accepted = dists <= epsilon num_accepted_batch = is_accepted.sum().item() if num_accepted_batch > 0: new_particles.append(particle_candidates[is_accepted]) new_log_weights.append( self._calculate_new_log_weights( particle_candidates[is_accepted], particles, log_weights, ) ) new_distances.append(dists[is_accepted]) new_x.append(x_candidates[is_accepted]) num_accepted_particles += num_accepted_batch # If simulation budget was exceeded and we still need particles, take # previous population or fill up with previous population. if ( self.simulation_counter >= self.num_simulations and num_accepted_particles < num_particles ): if use_last_pop_samples: num_remaining = num_particles - num_accepted_particles self.logger.info( f"""Simulation Budget exceeded, filling up with {num_remaining} samples from last population.""" ) # Some new particles have been accepted already, therefore # fill up the remaining once with old particles and weights. new_particles.append(particles[:num_remaining, :]) # Recalculate weights with new particles. new_log_weights = [ self._calculate_new_log_weights( torch.cat(new_particles), particles, log_weights, ) ] new_distances.append(distances[:num_remaining]) new_x.append(x[:num_remaining]) else: self.logger.info( "Simulation Budget exceeded, returning previous population." ) new_particles = [particles] new_log_weights = [log_weights] new_distances = [distances] new_x = [x] break # collect lists of tensors into tensors new_particles = torch.cat(new_particles) new_log_weights = torch.cat(new_log_weights) new_distances = torch.cat(new_distances) new_x = torch.cat(new_x) # normalize the new weights new_log_weights -= torch.logsumexp(new_log_weights, dim=0) # Return sorted wrt distances. sort_idx = torch.argsort(new_distances) return ( new_particles[sort_idx], new_log_weights[sort_idx], new_distances[sort_idx], new_x[sort_idx], ) def _get_next_epsilon(self, distances: Tensor, quantile: float) -> float: """Return epsilon for next round based on quantile of this round's distances. Note: distances are made unique to avoid repeated distances from simulations that result in the same observation. Args: distances: The distances accepted in this round. quantile: Quantile in the distance distribution to determine new epsilon. Returns: epsilon: Epsilon for the next population. """ # Take unique distances to skip same distances simulations (return is sorted). distances = torch.unique(distances) # Cumsum as cdf proxy. distances_cdf = torch.cumsum(distances, dim=0) / distances.sum() # Take the q quantile of distances. try: qidx = torch.where(distances_cdf >= quantile) except IndexError: self.logger.warning( ( f"Accepted unique distances={distances} don't match " f"quantile={quantile:.2f}. Selecting last distance." ) ) qidx = -1 # The new epsilon is given by that distance. return distances[qidx].item() def _calculate_new_log_weights( self, new_particles: Tensor, old_particles: Tensor, old_log_weights: Tensor, ) -> Tensor: """Return new log weights following formulas in publications A,B anc C.""" # Prior can be batched across new particles. prior_log_probs = self.prior.log_prob(new_particles) # Contstruct function to get kernel log prob for given old particle. # The kernel is centered on each old particle as in all three variants (A,B,C). def kernel_log_prob(new_particle): return self.get_new_kernel(old_particles).log_prob(new_particle) # We still have to loop over particles here because # the kernel log probs are already batched across old particles. log_weighted_sum = tensor( [ torch.logsumexp(old_log_weights + kernel_log_prob(new_particle), dim=0) for new_particle in new_particles ], dtype=torch.float32, ) # new weights are prior probs over weighted sum: return prior_log_probs - log_weighted_sum @staticmethod def sample_from_population_with_weights( particles: Tensor, weights: Tensor, num_samples: int = 1 ) -> Tensor: """Return samples from particles sampled with weights.""" # define multinomial with weights as probs multi = Multinomial(probs=weights) # sample num samples, with replacement samples = multi.sample(sample_shape=torch.Size((num_samples,))) # get indices of success trials indices = torch.where(samples) # return those indices from trace return particles[indices] def _sample_and_perturb( self, particles: Tensor, weights: Tensor, num_samples: int = 1 ) -> Tensor: """Sample and perturb batch of new parameters from trace. Reject sampled and perturbed parameters outside of prior. """ num_accepted = 0 parameters = [] while num_accepted < num_samples: parms = self.sample_from_population_with_weights( particles, weights, num_samples=num_samples - num_accepted ) # Create kernel on params and perturb. parms_perturbed = self.get_new_kernel(parms).sample() is_within_prior = within_support(self.prior, parms_perturbed) num_accepted += int(is_within_prior.sum().item()) if num_accepted > 0: parameters.append(parms_perturbed[is_within_prior]) return torch.cat(parameters) def get_kernel_variance( self, particles: Tensor, weights: Tensor, samples_per_dim: int = 100, kernel_variance_scale: float = 1.0, ) -> Tensor: if self.kernel == "gaussian": # For variant C, Beaumont et al. 2009, the kernel variance comes from the # previous population. if self.algorithm_variant == "C": # Calculate weighted covariance of particles. population_cov = torch.tensor( np.atleast_2d(np.cov(particles, rowvar=False, aweights=weights)), dtype=torch.float32, ) # Make sure variance is nonsingular. try: torch.cholesky(kernel_variance_scale * population_cov) except RuntimeError: self.logger.warning( """"Singular particle covariance, using unit covariance.""" ) population_cov = torch.eye(particles.shape) return kernel_variance_scale * population_cov # While for Toni et al. and Sisson et al. it comes from the parameter # ranges. elif self.algorithm_variant in ("A", "B"): particle_ranges = self.get_particle_ranges( particles, weights, samples_per_dim=samples_per_dim ) return kernel_variance_scale * torch.diag(particle_ranges) else: raise ValueError(f"Variant, '{self.algorithm_variant}' not supported.") elif self.kernel == "uniform": # Variance spans the range of parameters for every dimension. return kernel_variance_scale * self.get_particle_ranges( particles, weights, samples_per_dim=samples_per_dim ) else: raise ValueError(f"Kernel, '{self.kernel}' not supported.") def get_new_kernel(self, thetas: Tensor) -> Distribution: """Return new kernel distribution for a given set of paramters.""" if self.kernel == "gaussian": assert self.kernel_variance.ndim == 2 return MultivariateNormal( loc=thetas, covariance_matrix=self.kernel_variance ) elif self.kernel == "uniform": low = thetas - self.kernel_variance high = thetas + self.kernel_variance # Move batch shape to event shape to get Uniform that is multivariate in # parameter dimension. return Uniform(low=low, high=high).to_event(1) else: raise ValueError(f"Kernel, '{self.kernel}' not supported.") def resample_if_ess_too_small( self, particles: Tensor, log_weights: Tensor, ess_min: float, pop_idx: int, ) -> Tuple[Tensor, Tensor]: """Return resampled particles and uniform weights if effectice sampling size is too small. """ num_particles = particles.shape ess = (1 / torch.sum(torch.exp(2.0 * log_weights), dim=0)) / num_particles # Resampling of weights for low ESS only for Sisson et al. 2007. if ess < ess_min: self.logger.info(f"ESS={ess:.2f} too low, resampling pop {pop_idx}...") # First resample, then set to uniform weights as in Sisson et al. 2007. particles = self.sample_from_population_with_weights( particles, torch.exp(log_weights), num_samples=num_particles ) log_weights = torch.log(1 / num_particles * ones(num_particles)) return particles, log_weights def run_lra_update_weights( self, particles: Tensor, xs: Tensor, observation: Tensor, log_weights: Tensor, lra_with_weights: bool, ) -> Tuple[Tensor, Tensor]: """Return particles and weights adjusted with LRA. Runs (weighted) linear regression from xs onto particles to adjust the particles. Updates the SMC weights according to the new particles. """ adjusted_particels = self.run_lra( theta=particles, x=xs, observation=observation, sample_weight=log_weights.exp() if lra_with_weights else None, ) # Update SMC weights with LRA adjusted weights adjusted_log_weights = self._calculate_new_log_weights( new_particles=adjusted_particels, old_particles=particles, old_log_weights=log_weights, ) return adjusted_particels, adjusted_log_weights def run_sass_set_xo( self, num_particles: int, num_pilot_simulations: int, x_o, lra: bool = False, sass_expansion_degree: int = 1, ) -> Callable: """Return transform for semi-automatic summary statistics. Runs an single round of rejection abc with fixed budget and accepts num_particles simulations to run the regression for sass. Sets self.x_o once the x_shape can be derived from simulations. """ (pilot_particles, _, _, pilot_xs,) = self._set_xo_and_sample_initial_population( x_o, num_particles, num_pilot_simulations ) # Adjust with LRA. if lra: pilot_particles = self.run_lra(pilot_particles, pilot_xs, self.x_o) sass_transform = self.get_sass_transform( pilot_particles, pilot_xs, expansion_degree=sass_expansion_degree, sample_weight=None, ) return sass_transform def get_particle_ranges( self, particles: Tensor, weights: Tensor, samples_per_dim: int = 100 ) -> Tensor: """Return range of particles in each parameter dimension.""" # get weighted samples samples = self.sample_from_population_with_weights( particles, weights, num_samples=samples_per_dim * particles.shape, ) # Variance spans the range of particles for every dimension. particle_ranges = samples.max(0).values - samples.min(0).values assert particle_ranges.ndim < 2 return particle_ranges  #### __call__(self, x_o, num_particles, num_initial_pop, num_simulations, epsilon_decay, distance_based_decay=False, ess_min=None, kernel_variance_scale=1.0, use_last_pop_samples=True, return_summary=False, kde=False, kde_kwargs={}, kde_sample_weights=False, lra=False, lra_with_weights=False, sass=False, sass_fraction=0.25, sass_expansion_degree=1) special ¶ Run SMCABC and return accepted parameters or KDE object fitted on them. Parameters: Name Type Description Default x_o Union[torch.Tensor, numpy.ndarray] Observed data. required num_particles int Number of particles in each population. required num_initial_pop int Number of simulations used for initial population. required num_simulations int Total number of possible simulations. required epsilon_decay float Factor with which the acceptance threshold $$\epsilon$$ decays. required distance_based_decay bool Whether the $$\epsilon$$ decay is constant over populations or calculated from the previous populations distribution of distances. False ess_min Optional[float] Threshold of effective sampling size for resampling weights. Not used when None (default). None kernel_variance_scale float Factor for scaling the perturbation kernel variance. 1.0 use_last_pop_samples bool Whether to fill up the current population with samples from the previous population when the budget is used up. If False, the current population is discarded and the previous population is returned. True lra bool Whether to run linear regression adjustment as in Beaumont et al. 2002 False lra_with_weights bool Whether to run lra as weighted linear regression with SMC weights False sass bool Whether to determine semi-automatic summary statistics as in Fearnhead & Prangle 2012. False sass_fraction float Fraction of simulation budget used for the initial sass run. 0.25 sass_expansion_degree int Degree of the polynomial feature expansion for the sass regression, default 1 - no expansion. 1 kde bool Whether to run KDE on the accepted parameters to return a KDE object from which one can sample. False kde_kwargs Dict[str, Any] kwargs for performing KDE: ‘bandwidth=’; either a float, or a string naming a bandwidth heuristics, e.g., ‘cv’ (cross validation), ‘silvermann’ or ‘scott’, default ‘cv’. ‘transform’: transform applied to the parameters before doing KDE. ‘sample_weights’: weights associated with samples. See ‘get_kde’ for more details {} kde_sample_weights bool Whether perform weighted KDE with SMC weights or on raw particles. False return_summary bool Whether to return a dictionary with all accepted particles, weights, etc. at the end. False Returns: Type Description theta (if kde False) accepted parameters of the last population. kde (if kde True): KDE object fitted on accepted parameters, from which one can .sample() and .log_prob(). summary (if return_summary True): dictionary containing the accepted paramters (if kde True), distances and simulated data x of all populations. Source code in sbi/inference/abc/smcabc.py def __call__( self, x_o: Union[Tensor, ndarray], num_particles: int, num_initial_pop: int, num_simulations: int, epsilon_decay: float, distance_based_decay: bool = False, ess_min: Optional[float] = None, kernel_variance_scale: float = 1.0, use_last_pop_samples: bool = True, return_summary: bool = False, kde: bool = False, kde_kwargs: Dict[str, Any] = {}, kde_sample_weights: bool = False, lra: bool = False, lra_with_weights: bool = False, sass: bool = False, sass_fraction: float = 0.25, sass_expansion_degree: int = 1, ) -> Union[Tensor, KDEWrapper, Tuple[Tensor, dict], Tuple[KDEWrapper, dict]]: r"""Run SMCABC and return accepted parameters or KDE object fitted on them. Args: x_o: Observed data. num_particles: Number of particles in each population. num_initial_pop: Number of simulations used for initial population. num_simulations: Total number of possible simulations. epsilon_decay: Factor with which the acceptance threshold$\epsilon$decays. distance_based_decay: Whether the$\epsilon$decay is constant over populations or calculated from the previous populations distribution of distances. ess_min: Threshold of effective sampling size for resampling weights. Not used when None (default). kernel_variance_scale: Factor for scaling the perturbation kernel variance. use_last_pop_samples: Whether to fill up the current population with samples from the previous population when the budget is used up. If False, the current population is discarded and the previous population is returned. lra: Whether to run linear regression adjustment as in Beaumont et al. 2002 lra_with_weights: Whether to run lra as weighted linear regression with SMC weights sass: Whether to determine semi-automatic summary statistics as in Fearnhead & Prangle 2012. sass_fraction: Fraction of simulation budget used for the initial sass run. sass_expansion_degree: Degree of the polynomial feature expansion for the sass regression, default 1 - no expansion. kde: Whether to run KDE on the accepted parameters to return a KDE object from which one can sample. kde_kwargs: kwargs for performing KDE: 'bandwidth='; either a float, or a string naming a bandwidth heuristics, e.g., 'cv' (cross validation), 'silvermann' or 'scott', default 'cv'. 'transform': transform applied to the parameters before doing KDE. 'sample_weights': weights associated with samples. See 'get_kde' for more details kde_sample_weights: Whether perform weighted KDE with SMC weights or on raw particles. return_summary: Whether to return a dictionary with all accepted particles, weights, etc. at the end. Returns: theta (if kde False): accepted parameters of the last population. kde (if kde True): KDE object fitted on accepted parameters, from which one can .sample() and .log_prob(). summary (if return_summary True): dictionary containing the accepted paramters (if kde True), distances and simulated data x of all populations. """ pop_idx = 0 self.num_simulations = num_simulations # Pilot run for SASS. if sass: num_pilot_simulations = int(sass_fraction * num_simulations) self.logger.info( f"Running SASS with {num_pilot_simulations} pilot samples." ) sass_transform = self.run_sass_set_xo( num_particles, num_pilot_simulations, x_o, lra, sass_expansion_degree ) # Udpate simulator and xo x_o = sass_transform(self.x_o) def sass_simulator(theta): self.simulation_counter += theta.shape return sass_transform(self._batched_simulator(theta)) self._simulate_with_budget = sass_simulator # run initial population particles, epsilon, distances, x = self._set_xo_and_sample_initial_population( x_o, num_particles, num_initial_pop ) log_weights = torch.log(1 / num_particles * ones(num_particles)) self.logger.info( ( f"population={pop_idx}, eps={epsilon}, ess={1.0}, " f"num_sims={num_initial_pop}" ) ) all_particles = [particles] all_log_weights = [log_weights] all_distances = [distances] all_epsilons = [epsilon] all_x = [x] while self.simulation_counter < self.num_simulations: pop_idx += 1 # Decay based on quantile of distances from previous pop. if distance_based_decay: epsilon = self._get_next_epsilon( all_distances[pop_idx - 1], epsilon_decay ) # Constant decay. else: epsilon *= epsilon_decay # Get kernel variance from previous pop. self.kernel_variance = self.get_kernel_variance( all_particles[pop_idx - 1], torch.exp(all_log_weights[pop_idx - 1]), samples_per_dim=500, kernel_variance_scale=kernel_variance_scale, ) particles, log_weights, distances, x = self._sample_next_population( particles=all_particles[pop_idx - 1], log_weights=all_log_weights[pop_idx - 1], distances=all_distances[pop_idx - 1], epsilon=epsilon, x=all_x[pop_idx - 1], use_last_pop_samples=use_last_pop_samples, ) # Resample population if effective sampling size is too small. if ess_min is not None: particles, log_weights = self.resample_if_ess_too_small( particles, log_weights, ess_min, pop_idx ) self.logger.info( ( f"population={pop_idx} done: eps={epsilon:.6f}," f" num_sims={self.simulation_counter}." ) ) # collect results all_particles.append(particles) all_log_weights.append(log_weights) all_distances.append(distances) all_epsilons.append(epsilon) all_x.append(x) # Maybe run LRA and adjust weights. if lra: self.logger.info("Running Linear regression adjustment.") adjusted_particles, adjusted_weights = self.run_lra_update_weights( particles=all_particles[-1], xs=all_x[-1], observation=process_x(x_o), log_weights=all_log_weights[-1], lra_with_weights=lra_with_weights, ) final_particles = adjusted_particles else: final_particles = all_particles[-1] if kde: self.logger.info( f"""KDE on {final_particles.shape} samples with bandwidth option {kde_kwargs["bandwidth"] if "bandwidth" in kde_kwargs else "cv"}. Beware that KDE can give unreliable results when used with too few samples and in high dimensions.""" ) # Maybe get particles weights from last population for weighted KDE. if kde_sample_weights: kde_kwargs["sample_weights"] = all_log_weights[-1].exp() kde_dist = get_kde(final_particles, **kde_kwargs) if return_summary: return ( kde_dist, dict( particles=all_particles, weights=all_log_weights, epsilons=all_epsilons, distances=all_distances, xs=all_x, ), ) else: return kde_dist if return_summary: return ( final_particles, dict( particles=all_particles, weights=all_log_weights, epsilons=all_epsilons, distances=all_distances, xs=all_x, ), ) else: return final_particles  #### __init__(self, simulator, prior, distance='l2', num_workers=1, simulation_batch_size=1, show_progress_bars=True, kernel='gaussian', algorithm_variant='C') special ¶ Sequential Monte Carlo Approximate Bayesian Computation. We distinguish between three different SMC methods here: - A: Toni et al. 2010 (Phd Thesis) - B: Sisson et al. 2007 (with correction from 2009) - C: Beaumont et al. 2009 In Toni et al. 2010 we find an overview of the differences on page 34: - B: same as A except for resampling of weights if the effective sampling size is too small. - C: same as A except for calculation of the covariance of the perturbation kernel: the kernel covariance is a scaled version of the covariance of the previous population. Parameters: Name Type Description Default simulator Callable A function that takes parameters $$\theta$$ and maps them to simulations, or observations, x, $$\mathrm{sim}(\theta)\to x$$. Any regular Python callable (i.e. function or class with __call__ method) can be used. required prior Distribution A probability distribution that expresses prior knowledge about the parameters, e.g. which ranges are meaningful for them. Any object with .log_prob()and .sample() (for example, a PyTorch distribution) can be used. required distance Union[str, Callable] Distance function to compare observed and simulated data. Can be a custom function or one of l1, l2, mse. 'l2' num_workers int Number of parallel workers to use for simulations. 1 simulation_batch_size int Number of parameter sets that the simulator maps to data x at once. If None, we simulate all parameter sets at the same time. If >= 1, the simulator has to process data of shape (simulation_batch_size, parameter_dimension). 1 show_progress_bars bool Whether to show a progressbar during simulation and sampling. True kernel Optional[str] Perturbation kernel. 'gaussian' algorithm_variant str Indicating the choice of algorithm variant, A, B, or C. 'C' Source code in sbi/inference/abc/smcabc.py def __init__( self, simulator: Callable, prior: Distribution, distance: Union[str, Callable] = "l2", num_workers: int = 1, simulation_batch_size: int = 1, show_progress_bars: bool = True, kernel: Optional[str] = "gaussian", algorithm_variant: str = "C", ): r"""Sequential Monte Carlo Approximate Bayesian Computation. We distinguish between three different SMC methods here: - A: Toni et al. 2010 (Phd Thesis) - B: Sisson et al. 2007 (with correction from 2009) - C: Beaumont et al. 2009 In Toni et al. 2010 we find an overview of the differences on page 34: - B: same as A except for resampling of weights if the effective sampling size is too small. - C: same as A except for calculation of the covariance of the perturbation kernel: the kernel covariance is a scaled version of the covariance of the previous population. Args: simulator: A function that takes parameters$\theta$and maps them to simulations, or observations, x,$\mathrm{sim}(\theta)\to x$. Any regular Python callable (i.e. function or class with __call__ method) can be used. prior: A probability distribution that expresses prior knowledge about the parameters, e.g. which ranges are meaningful for them. Any object with .log_prob()and .sample() (for example, a PyTorch distribution) can be used. distance: Distance function to compare observed and simulated data. Can be a custom function or one of l1, l2, mse. num_workers: Number of parallel workers to use for simulations. simulation_batch_size: Number of parameter sets that the simulator maps to data x at once. If None, we simulate all parameter sets at the same time. If >= 1, the simulator has to process data of shape (simulation_batch_size, parameter_dimension). show_progress_bars: Whether to show a progressbar during simulation and sampling. kernel: Perturbation kernel. algorithm_variant: Indicating the choice of algorithm variant, A, B, or C. """ super().__init__( simulator=simulator, prior=prior, distance=distance, num_workers=num_workers, simulation_batch_size=simulation_batch_size, show_progress_bars=show_progress_bars, ) kernels = ("gaussian", "uniform") assert ( kernel in kernels ), f"Kernel '{kernel}' not supported. Choose one from {kernels}." self.kernel = kernel algorithm_variants = ("A", "B", "C") assert algorithm_variant in algorithm_variants, ( f"SMCABC variant '{algorithm_variant}' not supported, choose one from" " {algorithm_variants}." ) self.algorithm_variant = algorithm_variant self.distance_to_x0 = None self.simulation_counter = 0 self.num_simulations = 0 # Define simulator that keeps track of budget. def simulate_with_budget(theta): self.simulation_counter += theta.shape return self._batched_simulator(theta) self._simulate_with_budget = simulate_with_budget  #### choose_distance_function(distance_type='l2') inherited ¶ Return distance function for given distance type. Source code in sbi/inference/abc/smcabc.py @staticmethod def choose_distance_function(distance_type: str = "l2") -> Callable: """Return distance function for given distance type.""" if distance_type == "mse": distance = lambda xo, x: torch.mean((xo - x) ** 2, dim=-1) elif distance_type == "l2": distance = lambda xo, x: torch.norm((xo - x), dim=-1) elif distance_type == "l1": distance = lambda xo, x: torch.mean(abs(xo - x), dim=-1) else: raise ValueError(r"Distance {distance_type} not supported.") def distance_fun(observed_data: Tensor, simulated_data: Tensor) -> Tensor: """Return distance over batch dimension. Args: observed_data: Observed data, could be 1D. simulated_data: Batch of simulated data, has batch dimension. Returns: Torch tensor with batch of distances. """ assert simulated_data.ndim == 2, "simulated data needs batch dimension" return distance(observed_data, simulated_data) return distance_fun  #### get_new_kernel(self, thetas)¶ Return new kernel distribution for a given set of paramters. Source code in sbi/inference/abc/smcabc.py def get_new_kernel(self, thetas: Tensor) -> Distribution: """Return new kernel distribution for a given set of paramters.""" if self.kernel == "gaussian": assert self.kernel_variance.ndim == 2 return MultivariateNormal( loc=thetas, covariance_matrix=self.kernel_variance ) elif self.kernel == "uniform": low = thetas - self.kernel_variance high = thetas + self.kernel_variance # Move batch shape to event shape to get Uniform that is multivariate in # parameter dimension. return Uniform(low=low, high=high).to_event(1) else: raise ValueError(f"Kernel, '{self.kernel}' not supported.")  #### get_particle_ranges(self, particles, weights, samples_per_dim=100)¶ Return range of particles in each parameter dimension. Source code in sbi/inference/abc/smcabc.py def get_particle_ranges( self, particles: Tensor, weights: Tensor, samples_per_dim: int = 100 ) -> Tensor: """Return range of particles in each parameter dimension.""" # get weighted samples samples = self.sample_from_population_with_weights( particles, weights, num_samples=samples_per_dim * particles.shape, ) # Variance spans the range of particles for every dimension. particle_ranges = samples.max(0).values - samples.min(0).values assert particle_ranges.ndim < 2 return particle_ranges  #### get_sass_transform(theta, x, expansion_degree=1, sample_weight=None) inherited ¶ Return semi-automatic summary statitics function. Running weighted linear regressin as in Fearnhead & Prandle 2012: https://arxiv.org/abs/1004.1112 Source code in sbi/inference/abc/smcabc.py @staticmethod def get_sass_transform( theta: torch.Tensor, x: torch.Tensor, expansion_degree: int = 1, sample_weight=None, ) -> Callable: """Return semi-automatic summary statitics function. Running weighted linear regressin as in Fearnhead & Prandle 2012: https://arxiv.org/abs/1004.1112 Following implementation in https://abcpy.readthedocs.io/en/latest/_modules/abcpy/statistics.html#Identity and https://pythonhosted.org/abcpy/_modules/abcpy/summaryselections.html#Semiautomatic """ expansion = PolynomialFeatures(degree=expansion_degree, include_bias=False) # Transform x, remove intercept. x_expanded = expansion.fit_transform(x) sumstats_map = np.zeros((x_expanded.shape, theta.shape)) for parameter_idx in range(theta.shape): regression_model = LinearRegression(fit_intercept=True) regression_model.fit( X=x_expanded, y=theta[:, parameter_idx], sample_weight=sample_weight ) sumstats_map[:, parameter_idx] = regression_model.coef_ sumstats_map = torch.tensor(sumstats_map, dtype=torch.float32) def sumstats_transform(x): x_expanded = torch.tensor(expansion.fit_transform(x), dtype=torch.float32) return x_expanded.mm(sumstats_map) return sumstats_transform  #### resample_if_ess_too_small(self, particles, log_weights, ess_min, pop_idx)¶ Return resampled particles and uniform weights if effectice sampling size is too small. Source code in sbi/inference/abc/smcabc.py def resample_if_ess_too_small( self, particles: Tensor, log_weights: Tensor, ess_min: float, pop_idx: int, ) -> Tuple[Tensor, Tensor]: """Return resampled particles and uniform weights if effectice sampling size is too small. """ num_particles = particles.shape ess = (1 / torch.sum(torch.exp(2.0 * log_weights), dim=0)) / num_particles # Resampling of weights for low ESS only for Sisson et al. 2007. if ess < ess_min: self.logger.info(f"ESS={ess:.2f} too low, resampling pop {pop_idx}...") # First resample, then set to uniform weights as in Sisson et al. 2007. particles = self.sample_from_population_with_weights( particles, torch.exp(log_weights), num_samples=num_particles ) log_weights = torch.log(1 / num_particles * ones(num_particles)) return particles, log_weights  #### run_lra(theta, x, observation, sample_weight=None) inherited ¶ Return parameters adjusted with linear regression adjustment. Implementation as in Beaumont et al. 2002: https://arxiv.org/abs/1707.01254 Source code in sbi/inference/abc/smcabc.py @staticmethod def run_lra( theta: torch.Tensor, x: torch.Tensor, observation: torch.Tensor, sample_weight=None, ) -> torch.Tensor: """Return parameters adjusted with linear regression adjustment. Implementation as in Beaumont et al. 2002: https://arxiv.org/abs/1707.01254 """ theta_adjusted = theta for parameter_idx in range(theta.shape): regression_model = LinearRegression(fit_intercept=True) regression_model.fit( X=x, y=theta[:, parameter_idx], sample_weight=sample_weight, ) theta_adjusted[:, parameter_idx] += regression_model.predict( observation.reshape(1, -1) ) theta_adjusted[:, parameter_idx] -= regression_model.predict(x) return theta_adjusted  #### run_lra_update_weights(self, particles, xs, observation, log_weights, lra_with_weights)¶ Return particles and weights adjusted with LRA. Runs (weighted) linear regression from xs onto particles to adjust the particles. Updates the SMC weights according to the new particles. Source code in sbi/inference/abc/smcabc.py def run_lra_update_weights( self, particles: Tensor, xs: Tensor, observation: Tensor, log_weights: Tensor, lra_with_weights: bool, ) -> Tuple[Tensor, Tensor]: """Return particles and weights adjusted with LRA. Runs (weighted) linear regression from xs onto particles to adjust the particles. Updates the SMC weights according to the new particles. """ adjusted_particels = self.run_lra( theta=particles, x=xs, observation=observation, sample_weight=log_weights.exp() if lra_with_weights else None, ) # Update SMC weights with LRA adjusted weights adjusted_log_weights = self._calculate_new_log_weights( new_particles=adjusted_particels, old_particles=particles, old_log_weights=log_weights, ) return adjusted_particels, adjusted_log_weights  #### run_sass_set_xo(self, num_particles, num_pilot_simulations, x_o, lra=False, sass_expansion_degree=1)¶ Return transform for semi-automatic summary statistics. Runs an single round of rejection abc with fixed budget and accepts num_particles simulations to run the regression for sass. Sets self.x_o once the x_shape can be derived from simulations. Source code in sbi/inference/abc/smcabc.py def run_sass_set_xo( self, num_particles: int, num_pilot_simulations: int, x_o, lra: bool = False, sass_expansion_degree: int = 1, ) -> Callable: """Return transform for semi-automatic summary statistics. Runs an single round of rejection abc with fixed budget and accepts num_particles simulations to run the regression for sass. Sets self.x_o once the x_shape can be derived from simulations. """ (pilot_particles, _, _, pilot_xs,) = self._set_xo_and_sample_initial_population( x_o, num_particles, num_pilot_simulations ) # Adjust with LRA. if lra: pilot_particles = self.run_lra(pilot_particles, pilot_xs, self.x_o) sass_transform = self.get_sass_transform( pilot_particles, pilot_xs, expansion_degree=sass_expansion_degree, sample_weight=None, ) return sass_transform  #### sample_from_population_with_weights(particles, weights, num_samples=1) staticmethod ¶ Return samples from particles sampled with weights. Source code in sbi/inference/abc/smcabc.py @staticmethod def sample_from_population_with_weights( particles: Tensor, weights: Tensor, num_samples: int = 1 ) -> Tensor: """Return samples from particles sampled with weights.""" # define multinomial with weights as probs multi = Multinomial(probs=weights) # sample num samples, with replacement samples = multi.sample(sample_shape=torch.Size((num_samples,))) # get indices of success trials indices = torch.where(samples) # return those indices from trace return particles[indices]  ## Posteriors¶ ###  sbi.inference.posteriors.direct_posterior.DirectPosterior (NeuralPosterior) ¶ Posterior $$p(\theta|x_o)$$ with log_prob() and sample() methods, only applicable to SNPE. SNPE trains a neural network to directly approximate the posterior distribution. However, for bounded priors, the neural network can have leakage: it puts non-zero mass in regions where the prior is zero. The DirectPosterior class wraps the trained network to deal with these cases. Specifically, this class offers the following functionality: - correct the calculation of the log probability such that it compensates for the leakage. - reject samples that lie outside of the prior bounds. This class can not be used in combination with SNLE or SNRE. Source code in sbi/inference/posteriors/direct_posterior.py class DirectPosterior(NeuralPosterior): r"""Posterior$p(\theta|x_o)$with log_prob() and sample() methods, only applicable to SNPE.<br/><br/> SNPE trains a neural network to directly approximate the posterior distribution. However, for bounded priors, the neural network can have leakage: it puts non-zero mass in regions where the prior is zero. The DirectPosterior class wraps the trained network to deal with these cases.<br/><br/> Specifically, this class offers the following functionality:<br/> - correct the calculation of the log probability such that it compensates for the leakage.<br/> - reject samples that lie outside of the prior bounds.<br/><br/> This class can not be used in combination with SNLE or SNRE. """ def __init__( self, posterior_estimator: flows.Flow, prior: Distribution, theta_transform: Optional[TorchTransform] = None, max_sampling_batch_size: int = 10_000, device: Optional[str] = None, x_shape: Optional[torch.Size] = None, ): """ Args: prior: Prior distribution with .log_prob() and .sample(). posterior_estimator: The trained neural posterior. theta_transform: Custom transform to perform MAP optimization in unconstrained space. If None (default), a suitable transform is built from the prior support. In order to not use a transform at all, pass an identity transform, e.g., theta_transform=torch.distrbutions. transforms. identity_transform(). max_sampling_batch_size: Batchsize of samples being drawn from the proposal at every iteration. device: Training device, e.g., "cpu", "cuda" or "cuda:0". If None, potential_fn.device is used. x_shape: Shape of a single simulator output. If passed, it is used to check the shape of the observed data and give a descriptive error. """ # Because DirectPosterior does not take the potential_fn as input, it # builds it itself. The potential_fn and theta_transform are used only for # obtaining the MAP. check_prior(prior) potential_fn, theta_transform = posterior_estimator_based_potential( posterior_estimator, prior, x_o=None, theta_transform=theta_transform ) super().__init__( potential_fn=potential_fn, theta_transform=theta_transform, device=device, x_shape=x_shape, ) self.prior = prior self.posterior_estimator = posterior_estimator self.max_sampling_batch_size = max_sampling_batch_size self._leakage_density_correction_factor = None self._purpose = """It samples the posterior network and rejects samples that lie outside of the prior bounds.""" def sample( self, sample_shape: Shape = torch.Size(), x: Optional[Tensor] = None, max_sampling_batch_size: int = 10_000, sample_with: Optional[str] = None, show_progress_bars: bool = True, ): r"""Return samples from posterior distribution$p(\theta|x)$. Args: sample_shape: Desired shape of samples that are drawn from posterior. If sample_shape is multidimensional we simply draw sample_shape.numel() samples and then reshape into the desired shape. sample_with: This argument only exists to keep backward-compatibility with sbi v0.17.2 or older. If it is set, we instantly raise an error. show_progress_bars: Whether to show sampling progress monitor. """ num_samples = torch.Size(sample_shape).numel() x = self._x_else_default_x(x) max_sampling_batch_size = ( self.max_sampling_batch_size if max_sampling_batch_size is None else max_sampling_batch_size ) if sample_with is not None: raise ValueError( f"You set sample_with={sample_with}. As of sbi v0.18.0, setting " f"sample_with is no longer supported. You have to rerun " f".build_posterior(sample_with={sample_with})." ) samples = rejection_sample_posterior_within_prior( posterior_nn=self.posterior_estimator, prior=self.prior, x=x, num_samples=num_samples, show_progress_bars=show_progress_bars, max_sampling_batch_size=max_sampling_batch_size, ) return samples def log_prob( self, theta: Tensor, x: Optional[Tensor] = None, norm_posterior: bool = True, track_gradients: bool = False, leakage_correction_params: Optional[dict] = None, ) -> Tensor: r"""Returns the log-probability of the posterior$p(\theta|x)$. Args: theta: Parameters$\theta$. norm_posterior: Whether to enforce a normalized posterior density. Renormalization of the posterior is useful when some probability falls out or leaks out of the prescribed prior support. The normalizing factor is calculated via rejection sampling, so if you need speedier but unnormalized log posterior estimates set here norm_posterior=False. The returned log posterior is set to -∞ outside of the prior support regardless of this setting. track_gradients: Whether the returned tensor supports tracking gradients. This can be helpful for e.g. sensitivity analysis, but increases memory consumption. leakage_correction_params: A dict of keyword arguments to override the default values of leakage_correction(). Possible options are: num_rejection_samples, force_update, show_progress_bars, and rejection_sampling_batch_size. These parameters only have an effect if norm_posterior=True. Returns: (len(θ),)-shaped log posterior probability$\log p(\theta|x)$for θ in the support of the prior, -∞ (corresponding to 0 probability) outside. """ x = self._x_else_default_x(x) # TODO Train exited here, entered after sampling? self.posterior_estimator.eval() theta = ensure_theta_batched(torch.as_tensor(theta)) theta_repeated, x_repeated = match_theta_and_x_batch_shapes(theta, x) with torch.set_grad_enabled(track_gradients): # Evaluate on device, move back to cpu for comparison with prior. unnorm_log_prob = self.posterior_estimator.log_prob( theta_repeated, context=x_repeated ) # Force probability to be zero outside prior support. in_prior_support = within_support(self.prior, theta_repeated) masked_log_prob = torch.where( in_prior_support, unnorm_log_prob, torch.tensor(float("-inf"), dtype=torch.float32, device=self._device), ) if leakage_correction_params is None: leakage_correction_params = dict() # use defaults log_factor = ( log(self.leakage_correction(x=x, **leakage_correction_params)) if norm_posterior else 0 ) return masked_log_prob - log_factor @torch.no_grad() def leakage_correction( self, x: Tensor, num_rejection_samples: int = 10_000, force_update: bool = False, show_progress_bars: bool = False, rejection_sampling_batch_size: int = 10_000, ) -> Tensor: r"""Return leakage correction factor for a leaky posterior density estimate. The factor is estimated from the acceptance probability during rejection sampling from the posterior. This is to avoid re-estimating the acceptance probability from scratch whenever log_prob is called and norm_posterior=True. Here, it is estimated only once for self.default_x and saved for later. We re-evaluate only whenever a new x is passed. Arguments: num_rejection_samples: Number of samples used to estimate correction factor. show_progress_bars: Whether to show a progress bar during sampling. rejection_sampling_batch_size: Batch size for rejection sampling. Returns: Saved or newly-estimated correction factor (as a scalar Tensor). """ def acceptance_at(x: Tensor) -> Tensor: return rejection_sample_posterior_within_prior( posterior_nn=self.posterior_estimator, prior=self.prior, x=x.to(self._device), num_samples=num_rejection_samples, show_progress_bars=show_progress_bars, sample_for_correction_factor=True, max_sampling_batch_size=rejection_sampling_batch_size, ) # Check if the provided x matches the default x (short-circuit on identity). is_new_x = self.default_x is None or ( x is not self.default_x and (x != self.default_x).any() ) not_saved_at_default_x = self._leakage_density_correction_factor is None if is_new_x: # Calculate at x; don't save. return acceptance_at(x) elif not_saved_at_default_x or force_update: # Calculate at default_x; save. assert self.default_x is not None self._leakage_density_correction_factor = acceptance_at(self.default_x) return self._leakage_density_correction_factor # type: ignore def map( self, x: Optional[Tensor] = None, num_iter: int = 1_000, num_to_optimize: int = 100, learning_rate: float = 0.01, init_method: Union[str, Tensor] = "posterior", num_init_samples: int = 1_000, save_best_every: int = 10, show_progress_bars: bool = False, force_update: bool = False, ) -> Tensor: r"""Returns the maximum-a-posteriori estimate (MAP). The method can be interrupted (Ctrl-C) when the user sees that the log-probability converges. The best estimate will be saved in self._map and can be accessed with self.map(). The MAP is obtained by running gradient ascent from a given number of starting positions (samples from the posterior with the highest log-probability). After the optimization is done, we select the parameter set that has the highest log-probability after the optimization. Warning: The default values used by this function are not well-tested. They might require hand-tuning for the problem at hand. For developers: if the prior is a BoxUniform, we carry out the optimization in unbounded space and transform the result back into bounded space. Args: x: Deprecated - use .set_default_x() prior to .map(). num_iter: Number of optimization steps that the algorithm takes to find the MAP. learning_rate: Learning rate of the optimizer. init_method: How to select the starting parameters for the optimization. If it is a string, it can be either [posterior, prior], which samples the respective distribution num_init_samples times. If it is a tensor, the tensor will be used as init locations. num_init_samples: Draw this number of samples from the posterior and evaluate the log-probability of all of them. num_to_optimize: From the drawn num_init_samples, use the num_to_optimize with highest log-probability as the initial points for the optimization. save_best_every: The best log-probability is computed, saved in the map-attribute, and printed every save_best_every-th iteration. Computing the best log-probability creates a significant overhead (thus, the default is 10.) show_progress_bars: Whether or not to show a progressbar for sampling from the posterior. force_update: Whether to re-calculate the MAP when x is unchanged and have a cached value. log_prob_kwargs: Will be empty for SNLE and SNRE. Will contain {'norm_posterior': True} for SNPE. Returns: The MAP estimate. """ return super().map( x=x, num_iter=num_iter, num_to_optimize=num_to_optimize, learning_rate=learning_rate, init_method=init_method, num_init_samples=num_init_samples, save_best_every=save_best_every, show_progress_bars=show_progress_bars, force_update=force_update, )  #### default_x: Optional[torch.Tensor] inherited property writable ¶ Return default x used by .sample(), .log_prob as conditioning context. #### __init__(self, posterior_estimator, prior, theta_transform=None, max_sampling_batch_size=10000, device=None, x_shape=None) special ¶ Parameters: Name Type Description Default prior Distribution Prior distribution with .log_prob() and .sample(). required posterior_estimator Flow The trained neural posterior. required theta_transform Optional[torch Transform] Custom transform to perform MAP optimization in unconstrained space. If None (default), a suitable transform is built from the prior support. In order to not use a transform at all, pass an identity transform, e.g., theta_transform=torch.distrbutions. transforms. identity_transform(). None max_sampling_batch_size int Batchsize of samples being drawn from the proposal at every iteration. 10000 device Optional[str] Training device, e.g., “cpu”, “cuda” or “cuda:0”. If None, potential_fn.device is used. None x_shape Optional[torch.Size] Shape of a single simulator output. If passed, it is used to check the shape of the observed data and give a descriptive error. None Source code in sbi/inference/posteriors/direct_posterior.py def __init__( self, posterior_estimator: flows.Flow, prior: Distribution, theta_transform: Optional[TorchTransform] = None, max_sampling_batch_size: int = 10_000, device: Optional[str] = None, x_shape: Optional[torch.Size] = None, ): """ Args: prior: Prior distribution with .log_prob() and .sample(). posterior_estimator: The trained neural posterior. theta_transform: Custom transform to perform MAP optimization in unconstrained space. If None (default), a suitable transform is built from the prior support. In order to not use a transform at all, pass an identity transform, e.g., theta_transform=torch.distrbutions. transforms. identity_transform(). max_sampling_batch_size: Batchsize of samples being drawn from the proposal at every iteration. device: Training device, e.g., "cpu", "cuda" or "cuda:0". If None, potential_fn.device is used. x_shape: Shape of a single simulator output. If passed, it is used to check the shape of the observed data and give a descriptive error. """ # Because DirectPosterior does not take the potential_fn as input, it # builds it itself. The potential_fn and theta_transform are used only for # obtaining the MAP. check_prior(prior) potential_fn, theta_transform = posterior_estimator_based_potential( posterior_estimator, prior, x_o=None, theta_transform=theta_transform ) super().__init__( potential_fn=potential_fn, theta_transform=theta_transform, device=device, x_shape=x_shape, ) self.prior = prior self.posterior_estimator = posterior_estimator self.max_sampling_batch_size = max_sampling_batch_size self._leakage_density_correction_factor = None self._purpose = """It samples the posterior network and rejects samples that lie outside of the prior bounds."""  #### leakage_correction(self, x, num_rejection_samples=10000, force_update=False, show_progress_bars=False, rejection_sampling_batch_size=10000)¶ Return leakage correction factor for a leaky posterior density estimate. The factor is estimated from the acceptance probability during rejection sampling from the posterior. This is to avoid re-estimating the acceptance probability from scratch whenever log_prob is called and norm_posterior=True. Here, it is estimated only once for self.default_x and saved for later. We re-evaluate only whenever a new x is passed. Parameters: Name Type Description Default num_rejection_samples int Number of samples used to estimate correction factor. 10000 show_progress_bars bool Whether to show a progress bar during sampling. False rejection_sampling_batch_size int Batch size for rejection sampling. 10000 Returns: Type Description Tensor Saved or newly-estimated correction factor (as a scalar Tensor). Source code in sbi/inference/posteriors/direct_posterior.py @torch.no_grad() def leakage_correction( self, x: Tensor, num_rejection_samples: int = 10_000, force_update: bool = False, show_progress_bars: bool = False, rejection_sampling_batch_size: int = 10_000, ) -> Tensor: r"""Return leakage correction factor for a leaky posterior density estimate. The factor is estimated from the acceptance probability during rejection sampling from the posterior. This is to avoid re-estimating the acceptance probability from scratch whenever log_prob is called and norm_posterior=True. Here, it is estimated only once for self.default_x and saved for later. We re-evaluate only whenever a new x is passed. Arguments: num_rejection_samples: Number of samples used to estimate correction factor. show_progress_bars: Whether to show a progress bar during sampling. rejection_sampling_batch_size: Batch size for rejection sampling. Returns: Saved or newly-estimated correction factor (as a scalar Tensor). """ def acceptance_at(x: Tensor) -> Tensor: return rejection_sample_posterior_within_prior( posterior_nn=self.posterior_estimator, prior=self.prior, x=x.to(self._device), num_samples=num_rejection_samples, show_progress_bars=show_progress_bars, sample_for_correction_factor=True, max_sampling_batch_size=rejection_sampling_batch_size, ) # Check if the provided x matches the default x (short-circuit on identity). is_new_x = self.default_x is None or ( x is not self.default_x and (x != self.default_x).any() ) not_saved_at_default_x = self._leakage_density_correction_factor is None if is_new_x: # Calculate at x; don't save. return acceptance_at(x) elif not_saved_at_default_x or force_update: # Calculate at default_x; save. assert self.default_x is not None self._leakage_density_correction_factor = acceptance_at(self.default_x) return self._leakage_density_correction_factor # type: ignore  #### log_prob(self, theta, x=None, norm_posterior=True, track_gradients=False, leakage_correction_params=None)¶ Returns the log-probability of the posterior $$p(\theta|x)$$. Parameters: Name Type Description Default theta Tensor Parameters $$\theta$$. required norm_posterior bool Whether to enforce a normalized posterior density. Renormalization of the posterior is useful when some probability falls out or leaks out of the prescribed prior support. The normalizing factor is calculated via rejection sampling, so if you need speedier but unnormalized log posterior estimates set here norm_posterior=False. The returned log posterior is set to -∞ outside of the prior support regardless of this setting. True track_gradients bool Whether the returned tensor supports tracking gradients. This can be helpful for e.g. sensitivity analysis, but increases memory consumption. False leakage_correction_params Optional[dict] A dict of keyword arguments to override the default values of leakage_correction(). Possible options are: num_rejection_samples, force_update, show_progress_bars, and rejection_sampling_batch_size. These parameters only have an effect if norm_posterior=True. None Returns: Type Description Tensor (len(θ),)-shaped log posterior probability $$\log p(\theta|x)$$ for θ in the support of the prior, -∞ (corresponding to 0 probability) outside. Source code in sbi/inference/posteriors/direct_posterior.py def log_prob( self, theta: Tensor, x: Optional[Tensor] = None, norm_posterior: bool = True, track_gradients: bool = False, leakage_correction_params: Optional[dict] = None, ) -> Tensor: r"""Returns the log-probability of the posterior$p(\theta|x)$. Args: theta: Parameters$\theta$. norm_posterior: Whether to enforce a normalized posterior density. Renormalization of the posterior is useful when some probability falls out or leaks out of the prescribed prior support. The normalizing factor is calculated via rejection sampling, so if you need speedier but unnormalized log posterior estimates set here norm_posterior=False. The returned log posterior is set to -∞ outside of the prior support regardless of this setting. track_gradients: Whether the returned tensor supports tracking gradients. This can be helpful for e.g. sensitivity analysis, but increases memory consumption. leakage_correction_params: A dict of keyword arguments to override the default values of leakage_correction(). Possible options are: num_rejection_samples, force_update, show_progress_bars, and rejection_sampling_batch_size. These parameters only have an effect if norm_posterior=True. Returns: (len(θ),)-shaped log posterior probability$\log p(\theta|x)$for θ in the support of the prior, -∞ (corresponding to 0 probability) outside. """ x = self._x_else_default_x(x) # TODO Train exited here, entered after sampling? self.posterior_estimator.eval() theta = ensure_theta_batched(torch.as_tensor(theta)) theta_repeated, x_repeated = match_theta_and_x_batch_shapes(theta, x) with torch.set_grad_enabled(track_gradients): # Evaluate on device, move back to cpu for comparison with prior. unnorm_log_prob = self.posterior_estimator.log_prob( theta_repeated, context=x_repeated ) # Force probability to be zero outside prior support. in_prior_support = within_support(self.prior, theta_repeated) masked_log_prob = torch.where( in_prior_support, unnorm_log_prob, torch.tensor(float("-inf"), dtype=torch.float32, device=self._device), ) if leakage_correction_params is None: leakage_correction_params = dict() # use defaults log_factor = ( log(self.leakage_correction(x=x, **leakage_correction_params)) if norm_posterior else 0 ) return masked_log_prob - log_factor  #### map(self, x=None, num_iter=1000, num_to_optimize=100, learning_rate=0.01, init_method='posterior', num_init_samples=1000, save_best_every=10, show_progress_bars=False, force_update=False)¶ Returns the maximum-a-posteriori estimate (MAP). The method can be interrupted (Ctrl-C) when the user sees that the log-probability converges. The best estimate will be saved in self._map and can be accessed with self.map(). The MAP is obtained by running gradient ascent from a given number of starting positions (samples from the posterior with the highest log-probability). After the optimization is done, we select the parameter set that has the highest log-probability after the optimization. Warning: The default values used by this function are not well-tested. They might require hand-tuning for the problem at hand. For developers: if the prior is a BoxUniform, we carry out the optimization in unbounded space and transform the result back into bounded space. Parameters: Name Type Description Default x Optional[torch.Tensor] Deprecated - use .set_default_x() prior to .map(). None num_iter int Number of optimization steps that the algorithm takes to find the MAP. 1000 learning_rate float Learning rate of the optimizer. 0.01 init_method Union[str, torch.Tensor] How to select the starting parameters for the optimization. If it is a string, it can be either [posterior, prior], which samples the respective distribution num_init_samples times. If it is a tensor, the tensor will be used as init locations. 'posterior' num_init_samples int Draw this number of samples from the posterior and evaluate the log-probability of all of them. 1000 num_to_optimize int From the drawn num_init_samples, use the num_to_optimize with highest log-probability as the initial points for the optimization. 100 save_best_every int The best log-probability is computed, saved in the map-attribute, and printed every save_best_every-th iteration. Computing the best log-probability creates a significant overhead (thus, the default is 10.) 10 show_progress_bars bool Whether or not to show a progressbar for sampling from the posterior. False force_update bool Whether to re-calculate the MAP when x is unchanged and have a cached value. False log_prob_kwargs Will be empty for SNLE and SNRE. Will contain {‘norm_posterior’: True} for SNPE. required Returns: Type Description Tensor The MAP estimate. Source code in sbi/inference/posteriors/direct_posterior.py def map( self, x: Optional[Tensor] = None, num_iter: int = 1_000, num_to_optimize: int = 100, learning_rate: float = 0.01, init_method: Union[str, Tensor] = "posterior", num_init_samples: int = 1_000, save_best_every: int = 10, show_progress_bars: bool = False, force_update: bool = False, ) -> Tensor: r"""Returns the maximum-a-posteriori estimate (MAP). The method can be interrupted (Ctrl-C) when the user sees that the log-probability converges. The best estimate will be saved in self._map and can be accessed with self.map(). The MAP is obtained by running gradient ascent from a given number of starting positions (samples from the posterior with the highest log-probability). After the optimization is done, we select the parameter set that has the highest log-probability after the optimization. Warning: The default values used by this function are not well-tested. They might require hand-tuning for the problem at hand. For developers: if the prior is a BoxUniform, we carry out the optimization in unbounded space and transform the result back into bounded space. Args: x: Deprecated - use .set_default_x() prior to .map(). num_iter: Number of optimization steps that the algorithm takes to find the MAP. learning_rate: Learning rate of the optimizer. init_method: How to select the starting parameters for the optimization. If it is a string, it can be either [posterior, prior], which samples the respective distribution num_init_samples times. If it is a tensor, the tensor will be used as init locations. num_init_samples: Draw this number of samples from the posterior and evaluate the log-probability of all of them. num_to_optimize: From the drawn num_init_samples, use the num_to_optimize with highest log-probability as the initial points for the optimization. save_best_every: The best log-probability is computed, saved in the map-attribute, and printed every save_best_every-th iteration. Computing the best log-probability creates a significant overhead (thus, the default is 10.) show_progress_bars: Whether or not to show a progressbar for sampling from the posterior. force_update: Whether to re-calculate the MAP when x is unchanged and have a cached value. log_prob_kwargs: Will be empty for SNLE and SNRE. Will contain {'norm_posterior': True} for SNPE. Returns: The MAP estimate. """ return super().map( x=x, num_iter=num_iter, num_to_optimize=num_to_optimize, learning_rate=learning_rate, init_method=init_method, num_init_samples=num_init_samples, save_best_every=save_best_every, show_progress_bars=show_progress_bars, force_update=force_update, )  #### potential(self, theta, x=None, track_gradients=False) inherited ¶ Evaluates $$\theta$$ under the potential that is used to sample the posterior. The potential is the unnormalized log-probability of $$\theta$$ under the posterior. Parameters: Name Type Description Default theta Tensor Parameters $$\theta$$. required track_gradients bool Whether the returned tensor supports tracking gradients. This can be helpful for e.g. sensitivity analysis, but increases memory consumption. False Source code in sbi/inference/posteriors/direct_posterior.py def potential( self, theta: Tensor, x: Optional[Tensor] = None, track_gradients: bool = False ) -> Tensor: r"""Evaluates$\theta$under the potential that is used to sample the posterior. The potential is the unnormalized log-probability of$\theta$under the posterior. Args: theta: Parameters$\theta$. track_gradients: Whether the returned tensor supports tracking gradients. This can be helpful for e.g. sensitivity analysis, but increases memory consumption. """ self.potential_fn.set_x(self._x_else_default_x(x)) theta = ensure_theta_batched(torch.as_tensor(theta)) return self.potential_fn( theta.to(self._device), track_gradients=track_gradients )  #### sample(self, sample_shape=torch.Size([]), x=None, max_sampling_batch_size=10000, sample_with=None, show_progress_bars=True)¶ Return samples from posterior distribution $$p(\theta|x)$$. Parameters: Name Type Description Default sample_shape Union[torch.Size, Tuple[int, ...]] Desired shape of samples that are drawn from posterior. If sample_shape is multidimensional we simply draw sample_shape.numel() samples and then reshape into the desired shape. torch.Size([]) sample_with Optional[str] This argument only exists to keep backward-compatibility with sbi v0.17.2 or older. If it is set, we instantly raise an error. None show_progress_bars bool Whether to show sampling progress monitor. True Source code in sbi/inference/posteriors/direct_posterior.py def sample( self, sample_shape: Shape = torch.Size(), x: Optional[Tensor] = None, max_sampling_batch_size: int = 10_000, sample_with: Optional[str] = None, show_progress_bars: bool = True, ): r"""Return samples from posterior distribution$p(\theta|x)$. Args: sample_shape: Desired shape of samples that are drawn from posterior. If sample_shape is multidimensional we simply draw sample_shape.numel() samples and then reshape into the desired shape. sample_with: This argument only exists to keep backward-compatibility with sbi v0.17.2 or older. If it is set, we instantly raise an error. show_progress_bars: Whether to show sampling progress monitor. """ num_samples = torch.Size(sample_shape).numel() x = self._x_else_default_x(x) max_sampling_batch_size = ( self.max_sampling_batch_size if max_sampling_batch_size is None else max_sampling_batch_size ) if sample_with is not None: raise ValueError( f"You set sample_with={sample_with}. As of sbi v0.18.0, setting " f"sample_with is no longer supported. You have to rerun " f".build_posterior(sample_with={sample_with})." ) samples = rejection_sample_posterior_within_prior( posterior_nn=self.posterior_estimator, prior=self.prior, x=x, num_samples=num_samples, show_progress_bars=show_progress_bars, max_sampling_batch_size=max_sampling_batch_size, ) return samples  #### set_default_x(self, x) inherited ¶ Set new default x for .sample(), .log_prob to use as conditioning context. Reset the MAP stored for the old default x if applicable. This is a pure convenience to avoid having to repeatedly specify x in calls to .sample() and .log_prob() - only$ heta$needs to be passed. This convenience is particularly useful when the posterior is focused, i.e. has been trained over multiple rounds to be accurate in the vicinity of a particular x=x_o (you can check if your posterior object is focused by printing it). NOTE: this method is chainable, i.e. will return the NeuralPosterior object so that calls like posterior.set_default_x(my_x).sample(mytheta) are possible. Parameters: Name Type Description Default x Tensor The default observation to set for the posterior $$p( heta|x)$$. required Returns: Type Description NeuralPosterior NeuralPosterior that will use a default x when not explicitly passed. Source code in sbi/inference/posteriors/direct_posterior.py def set_default_x(self, x: Tensor) -> "NeuralPosterior": """Set new default x for .sample(), .log_prob to use as conditioning context. Reset the MAP stored for the old default x if applicable. This is a pure convenience to avoid having to repeatedly specify x in calls to .sample() and .log_prob() - only$\theta$needs to be passed. This convenience is particularly useful when the posterior is focused, i.e. has been trained over multiple rounds to be accurate in the vicinity of a particular x=x_o (you can check if your posterior object is focused by printing it). NOTE: this method is chainable, i.e. will return the NeuralPosterior object so that calls like posterior.set_default_x(my_x).sample(mytheta) are possible. Args: x: The default observation to set for the posterior$p(\theta|x)$. Returns: NeuralPosterior that will use a default x when not explicitly passed. """ self._x = process_x( x, x_shape=self._x_shape, allow_iid_x=self.potential_fn.allow_iid_x ).to(self._device) self._map = None return self  ###  sbi.inference.posteriors.importance_posterior.ImportanceSamplingPosterior (NeuralPosterior) ¶ Provides importance sampling to sample from the posterior. SNLE or SNRE train neural networks to approximate the likelihood(-ratios). ImportanceSamplingPosterior allows to estimate the posterior log-probability by estimating the normlalization constant with importance sampling. It also allows to perform importance sampling (with .sample()) and to draw approximate samples with sampling-importance-resampling (SIR) (with .sir_sample()) Source code in sbi/inference/posteriors/importance_posterior.py class ImportanceSamplingPosterior(NeuralPosterior): r"""Provides importance sampling to sample from the posterior.<br/><br/> SNLE or SNRE train neural networks to approximate the likelihood(-ratios). ImportanceSamplingPosterior allows to estimate the posterior log-probability by estimating the normlalization constant with importance sampling. It also allows to perform importance sampling (with .sample()) and to draw approximate samples with sampling-importance-resampling (SIR) (with .sir_sample()) """ def __init__( self, potential_fn: Callable, proposal: Any, theta_transform: Optional[TorchTransform] = None, method: str = "sir", oversampling_factor: int = 32, max_sampling_batch_size: int = 10_000, device: Optional[str] = None, x_shape: Optional[torch.Size] = None, ): """ Args: potential_fn: The potential function from which to draw samples. proposal: The proposal distribution. theta_transform: Transformation that is applied to parameters. Is not used during but only when calling .map(). method: Either of [sir|importance]. This sets the behavior of the .sample() method. With sir, approximate posterior samples are generated with sampling importance resampling (SIR). With importance, the .sample() method returns a tuple of samples and corresponding importance weights. oversampling_factor: Number of proposed samples from which only one is selected based on its importance weight. max_sampling_batch_size: The batch size of samples being drawn from the proposal at every iteration. device: Device on which to sample, e.g., "cpu", "cuda" or "cuda:0". If None, potential_fn.device is used. x_shape: Shape of a single simulator output. If passed, it is used to check the shape of the observed data and give a descriptive error. """ super().__init__( potential_fn, theta_transform=theta_transform, device=device, x_shape=x_shape, ) self.proposal = proposal self._normalization_constant = None self.method = method self.oversampling_factor = oversampling_factor self.max_sampling_batch_size = max_sampling_batch_size self._purpose = ( "It provides sampling-importance resampling (SIR) to .sample() from the " "posterior and can evaluate the _unnormalized_ posterior density with " ".log_prob()." ) def log_prob( self, theta: Tensor, x: Optional[Tensor] = None, track_gradients: bool = False, normalization_constant_params: Optional[dict] = None, ) -> Tensor: r"""Returns the log-probability of theta under the posterior. The normalization constant is estimated with importance sampling. Args: theta: Parameters$\theta$. track_gradients: Whether the returned tensor supports tracking gradients. This can be helpful for e.g. sensitivity analysis, but increases memory consumption. normalization_constant_params: Parameters passed on to estimate_normalization_constant(). Returns: len($\theta$)-shaped log-probability. """ x = self._x_else_default_x(x) self.potential_fn.set_x(x) theta = ensure_theta_batched(torch.as_tensor(theta)) with torch.set_grad_enabled(track_gradients): potential_values = self.potential_fn( theta.to(self._device), track_gradients=track_gradients ) if normalization_constant_params is None: normalization_constant_params = dict() # use defaults normalization_constant = self.estimate_normalization_constant( x, **normalization_constant_params ) return (potential_values - torch.log(normalization_constant)).to( self._device ) @torch.no_grad() def estimate_normalization_constant( self, x: Tensor, num_samples: int = 10_000, force_update: bool = False ) -> Tensor: """Returns the normalization constant via importance sampling. Args: num_samples: Number of importance samples used for the estimate. force_update: Whether to re-calculate the normlization constant when x is unchanged and have a cached value. """ # Check if the provided x matches the default x (short-circuit on identity). is_new_x = self.default_x is None or ( x is not self.default_x and (x != self.default_x).any() ) not_saved_at_default_x = self._normalization_constant is None if is_new_x: # Calculate at x; don't save. _, log_importance_weights = importance_sample( self.potential_fn, proposal=self.proposal, num_samples=num_samples, ) return torch.mean(torch.exp(log_importance_weights)) elif not_saved_at_default_x or force_update: # Calculate at default_x; save. assert self.default_x is not None _, log_importance_weights = importance_sample( self.potential_fn, proposal=self.proposal, num_samples=num_samples, ) self._normalization_constant = torch.mean(torch.exp(log_importance_weights)) return self._normalization_constant.to(self._device) # type: ignore def sample( self, sample_shape: Shape = torch.Size(), x: Optional[Tensor] = None, oversampling_factor: int = 32, max_sampling_batch_size: int = 10_000, sample_with: Optional[str] = None, ) -> Union[Tensor, Tuple[Tensor, Tensor]]: """Return samples from the approximate posterior distribution. Args: sample_shape: _description_ x: _description_ """ if sample_with is not None: raise ValueError( f"You set sample_with={sample_with}. As of sbi v0.18.0, setting " f"sample_with is no longer supported. You have to rerun " f".build_posterior(sample_with={sample_with})." ) self.potential_fn.set_x(self._x_else_default_x(x)) if self.method == "sir": return self._sir_sample( sample_shape, oversampling_factor=oversampling_factor, max_sampling_batch_size=max_sampling_batch_size, ) elif self.method == "importance": return self._importance_sample(sample_shape) else: raise NameError def _importance_sample( self, sample_shape: Shape = torch.Size(), ) -> Tuple[Tensor, Tensor]: """Returns samples from the proposal and log of their importance weights. Args: sample_shape: Desired shape of samples that are drawn from posterior. sample_with: This argument only exists to keep backward-compatibility with sbi v0.17.2 or older. If it is set, we instantly raise an error. Returns: Samples and logarithm of corresponding importance weights. """ num_samples = torch.Size(sample_shape).numel() samples, log_importance_weights = importance_sample( self.potential_fn, proposal=self.proposal, num_samples=num_samples, ) samples = samples.reshape((*sample_shape, -1)).to(self._device) return samples, log_importance_weights.to(self._device) def _sir_sample( self, sample_shape: Shape = torch.Size(), oversampling_factor: int = 32, max_sampling_batch_size: int = 10_000, show_progress_bars: bool = True, ): r"""Returns approximate samples from posterior$p(\theta|x)$via SIR. Args: sample_shape: Desired shape of samples that are drawn from posterior. If sample_shape is multidimensional we simply draw sample_shape.numel() samples and then reshape into the desired shape. x: Observed data. sample_with: This argument only exists to keep backward-compatibility with sbi v0.17.2 or older. If it is set, we instantly raise an error. oversampling_factor: Number of proposed samples form which only one is selected based on its importance weight. max_sampling_batch_size: The batchsize of samples being drawn from the proposal at every iteration. Used only in sir_sample(). show_progress_bars: Whether to show sampling progress monitor. Returns: Samples from posterior. """ # Replace arguments that were not passed with their default. oversampling_factor = ( self.oversampling_factor if oversampling_factor is None else oversampling_factor ) max_sampling_batch_size = ( self.max_sampling_batch_size if max_sampling_batch_size is None else max_sampling_batch_size ) num_samples = torch.Size(sample_shape).numel() samples = sampling_importance_resampling( self.potential_fn, proposal=self.proposal, num_samples=num_samples, oversampling_factor=oversampling_factor, show_progress_bars=show_progress_bars, max_sampling_batch_size=max_sampling_batch_size, device=self._device, ) return samples.reshape((*sample_shape, -1)).to(self._device) def map( self, x: Optional[Tensor] = None, num_iter: int = 1_000, num_to_optimize: int = 100, learning_rate: float = 0.01, init_method: Union[str, Tensor] = "proposal", num_init_samples: int = 1_000, save_best_every: int = 10, show_progress_bars: bool = False, force_update: bool = False, ) -> Tensor: r"""Returns the maximum-a-posteriori estimate (MAP). The method can be interrupted (Ctrl-C) when the user sees that the log-probability converges. The best estimate will be saved in self._map and can be accessed with self.map(). The MAP is obtained by running gradient ascent from a given number of starting positions (samples from the posterior with the highest log-probability). After the optimization is done, we select the parameter set that has the highest log-probability after the optimization. Warning: The default values used by this function are not well-tested. They might require hand-tuning for the problem at hand. For developers: if the prior is a BoxUniform, we carry out the optimization in unbounded space and transform the result back into bounded space. Args: x: Deprecated - use .set_default_x() prior to .map(). num_iter: Number of optimization steps that the algorithm takes to find the MAP. learning_rate: Learning rate of the optimizer. init_method: How to select the starting parameters for the optimization. If it is a string, it can be either [posterior, prior], which samples the respective distribution num_init_samples times. If it is a tensor, the tensor will be used as init locations. num_init_samples: Draw this number of samples from the posterior and evaluate the log-probability of all of them. num_to_optimize: From the drawn num_init_samples, use the num_to_optimize with highest log-probability as the initial points for the optimization. save_best_every: The best log-probability is computed, saved in the map-attribute, and printed every save_best_every-th iteration. Computing the best log-probability creates a significant overhead (thus, the default is 10.) show_progress_bars: Whether or not to show a progressbar for sampling from the posterior. force_update: Whether to re-calculate the MAP when x is unchanged and have a cached value. log_prob_kwargs: Will be empty for SNLE and SNRE. Will contain {'norm_posterior': True} for SNPE. Returns: The MAP estimate. """ return super().map( x=x, num_iter=num_iter, num_to_optimize=num_to_optimize, learning_rate=learning_rate, init_method=init_method, num_init_samples=num_init_samples, save_best_every=save_best_every, show_progress_bars=show_progress_bars, force_update=force_update, )  #### default_x: Optional[torch.Tensor] inherited property writable ¶ Return default x used by .sample(), .log_prob as conditioning context. #### __init__(self, potential_fn, proposal, theta_transform=None, method='sir', oversampling_factor=32, max_sampling_batch_size=10000, device=None, x_shape=None) special ¶ Parameters: Name Type Description Default potential_fn Callable The potential function from which to draw samples. required proposal Any The proposal distribution. required theta_transform Optional[torch Transform] Transformation that is applied to parameters. Is not used during but only when calling .map(). None method str Either of [sir|importance]. This sets the behavior of the .sample() method. With sir, approximate posterior samples are generated with sampling importance resampling (SIR). With importance, the .sample() method returns a tuple of samples and corresponding importance weights. 'sir' oversampling_factor int Number of proposed samples from which only one is selected based on its importance weight. 32 max_sampling_batch_size int The batch size of samples being drawn from the proposal at every iteration. 10000 device Optional[str] Device on which to sample, e.g., “cpu”, “cuda” or “cuda:0”. If None, potential_fn.device is used. None x_shape Optional[torch.Size] Shape of a single simulator output. If passed, it is used to check the shape of the observed data and give a descriptive error. None Source code in sbi/inference/posteriors/importance_posterior.py def __init__( self, potential_fn: Callable, proposal: Any, theta_transform: Optional[TorchTransform] = None, method: str = "sir", oversampling_factor: int = 32, max_sampling_batch_size: int = 10_000, device: Optional[str] = None, x_shape: Optional[torch.Size] = None, ): """ Args: potential_fn: The potential function from which to draw samples. proposal: The proposal distribution. theta_transform: Transformation that is applied to parameters. Is not used during but only when calling .map(). method: Either of [sir|importance]. This sets the behavior of the .sample() method. With sir, approximate posterior samples are generated with sampling importance resampling (SIR). With importance, the .sample() method returns a tuple of samples and corresponding importance weights. oversampling_factor: Number of proposed samples from which only one is selected based on its importance weight. max_sampling_batch_size: The batch size of samples being drawn from the proposal at every iteration. device: Device on which to sample, e.g., "cpu", "cuda" or "cuda:0". If None, potential_fn.device is used. x_shape: Shape of a single simulator output. If passed, it is used to check the shape of the observed data and give a descriptive error. """ super().__init__( potential_fn, theta_transform=theta_transform, device=device, x_shape=x_shape, ) self.proposal = proposal self._normalization_constant = None self.method = method self.oversampling_factor = oversampling_factor self.max_sampling_batch_size = max_sampling_batch_size self._purpose = ( "It provides sampling-importance resampling (SIR) to .sample() from the " "posterior and can evaluate the _unnormalized_ posterior density with " ".log_prob()." )  #### estimate_normalization_constant(self, x, num_samples=10000, force_update=False)¶ Returns the normalization constant via importance sampling. Parameters: Name Type Description Default num_samples int Number of importance samples used for the estimate. 10000 force_update bool Whether to re-calculate the normlization constant when x is unchanged and have a cached value. False Source code in sbi/inference/posteriors/importance_posterior.py @torch.no_grad() def estimate_normalization_constant( self, x: Tensor, num_samples: int = 10_000, force_update: bool = False ) -> Tensor: """Returns the normalization constant via importance sampling. Args: num_samples: Number of importance samples used for the estimate. force_update: Whether to re-calculate the normlization constant when x is unchanged and have a cached value. """ # Check if the provided x matches the default x (short-circuit on identity). is_new_x = self.default_x is None or ( x is not self.default_x and (x != self.default_x).any() ) not_saved_at_default_x = self._normalization_constant is None if is_new_x: # Calculate at x; don't save. _, log_importance_weights = importance_sample( self.potential_fn, proposal=self.proposal, num_samples=num_samples, ) return torch.mean(torch.exp(log_importance_weights)) elif not_saved_at_default_x or force_update: # Calculate at default_x; save. assert self.default_x is not None _, log_importance_weights = importance_sample( self.potential_fn, proposal=self.proposal, num_samples=num_samples, ) self._normalization_constant = torch.mean(torch.exp(log_importance_weights)) return self._normalization_constant.to(self._device) # type: ignore  #### log_prob(self, theta, x=None, track_gradients=False, normalization_constant_params=None)¶ Returns the log-probability of theta under the posterior. The normalization constant is estimated with importance sampling. Parameters: Name Type Description Default theta Tensor Parameters $$\theta$$. required track_gradients bool Whether the returned tensor supports tracking gradients. This can be helpful for e.g. sensitivity analysis, but increases memory consumption. False normalization_constant_params Optional[dict] Parameters passed on to estimate_normalization_constant(). None Returns: Type Description Tensor len($\theta$)-shaped log-probability. Source code in sbi/inference/posteriors/importance_posterior.py def log_prob( self, theta: Tensor, x: Optional[Tensor] = None, track_gradients: bool = False, normalization_constant_params: Optional[dict] = None, ) -> Tensor: r"""Returns the log-probability of theta under the posterior. The normalization constant is estimated with importance sampling. Args: theta: Parameters$\theta$. track_gradients: Whether the returned tensor supports tracking gradients. This can be helpful for e.g. sensitivity analysis, but increases memory consumption. normalization_constant_params: Parameters passed on to estimate_normalization_constant(). Returns: len($\theta$)-shaped log-probability. """ x = self._x_else_default_x(x) self.potential_fn.set_x(x) theta = ensure_theta_batched(torch.as_tensor(theta)) with torch.set_grad_enabled(track_gradients): potential_values = self.potential_fn( theta.to(self._device), track_gradients=track_gradients ) if normalization_constant_params is None: normalization_constant_params = dict() # use defaults normalization_constant = self.estimate_normalization_constant( x, **normalization_constant_params ) return (potential_values - torch.log(normalization_constant)).to( self._device )  #### map(self, x=None, num_iter=1000, num_to_optimize=100, learning_rate=0.01, init_method='proposal', num_init_samples=1000, save_best_every=10, show_progress_bars=False, force_update=False)¶ Returns the maximum-a-posteriori estimate (MAP). The method can be interrupted (Ctrl-C) when the user sees that the log-probability converges. The best estimate will be saved in self._map and can be accessed with self.map(). The MAP is obtained by running gradient ascent from a given number of starting positions (samples from the posterior with the highest log-probability). After the optimization is done, we select the parameter set that has the highest log-probability after the optimization. Warning: The default values used by this function are not well-tested. They might require hand-tuning for the problem at hand. For developers: if the prior is a BoxUniform, we carry out the optimization in unbounded space and transform the result back into bounded space. Parameters: Name Type Description Default x Optional[torch.Tensor] Deprecated - use .set_default_x() prior to .map(). None num_iter int Number of optimization steps that the algorithm takes to find the MAP. 1000 learning_rate float Learning rate of the optimizer. 0.01 init_method Union[str, torch.Tensor] How to select the starting parameters for the optimization. If it is a string, it can be either [posterior, prior], which samples the respective distribution num_init_samples times. If it is a tensor, the tensor will be used as init locations. 'proposal' num_init_samples int Draw this number of samples from the posterior and evaluate the log-probability of all of them. 1000 num_to_optimize int From the drawn num_init_samples, use the num_to_optimize with highest log-probability as the initial points for the optimization. 100 save_best_every int The best log-probability is computed, saved in the map-attribute, and printed every save_best_every-th iteration. Computing the best log-probability creates a significant overhead (thus, the default is 10.) 10 show_progress_bars bool Whether or not to show a progressbar for sampling from the posterior. False force_update bool Whether to re-calculate the MAP when x is unchanged and have a cached value. False log_prob_kwargs Will be empty for SNLE and SNRE. Will contain {‘norm_posterior’: True} for SNPE. required Returns: Type Description Tensor The MAP estimate. Source code in sbi/inference/posteriors/importance_posterior.py def map( self, x: Optional[Tensor] = None, num_iter: int = 1_000, num_to_optimize: int = 100, learning_rate: float = 0.01, init_method: Union[str, Tensor] = "proposal", num_init_samples: int = 1_000, save_best_every: int = 10, show_progress_bars: bool = False, force_update: bool = False, ) -> Tensor: r"""Returns the maximum-a-posteriori estimate (MAP). The method can be interrupted (Ctrl-C) when the user sees that the log-probability converges. The best estimate will be saved in self._map and can be accessed with self.map(). The MAP is obtained by running gradient ascent from a given number of starting positions (samples from the posterior with the highest log-probability). After the optimization is done, we select the parameter set that has the highest log-probability after the optimization. Warning: The default values used by this function are not well-tested. They might require hand-tuning for the problem at hand. For developers: if the prior is a BoxUniform, we carry out the optimization in unbounded space and transform the result back into bounded space. Args: x: Deprecated - use .set_default_x() prior to .map(). num_iter: Number of optimization steps that the algorithm takes to find the MAP. learning_rate: Learning rate of the optimizer. init_method: How to select the starting parameters for the optimization. If it is a string, it can be either [posterior, prior], which samples the respective distribution num_init_samples times. If it is a tensor, the tensor will be used as init locations. num_init_samples: Draw this number of samples from the posterior and evaluate the log-probability of all of them. num_to_optimize: From the drawn num_init_samples, use the num_to_optimize with highest log-probability as the initial points for the optimization. save_best_every: The best log-probability is computed, saved in the map-attribute, and printed every save_best_every-th iteration. Computing the best log-probability creates a significant overhead (thus, the default is 10.) show_progress_bars: Whether or not to show a progressbar for sampling from the posterior. force_update: Whether to re-calculate the MAP when x is unchanged and have a cached value. log_prob_kwargs: Will be empty for SNLE and SNRE. Will contain {'norm_posterior': True} for SNPE. Returns: The MAP estimate. """ return super().map( x=x, num_iter=num_iter, num_to_optimize=num_to_optimize, learning_rate=learning_rate, init_method=init_method, num_init_samples=num_init_samples, save_best_every=save_best_every, show_progress_bars=show_progress_bars, force_update=force_update, )  #### potential(self, theta, x=None, track_gradients=False) inherited ¶ Evaluates $$\theta$$ under the potential that is used to sample the posterior. The potential is the unnormalized log-probability of $$\theta$$ under the posterior. Parameters: Name Type Description Default theta Tensor Parameters $$\theta$$. required track_gradients bool Whether the returned tensor supports tracking gradients. This can be helpful for e.g. sensitivity analysis, but increases memory consumption. False Source code in sbi/inference/posteriors/importance_posterior.py def potential( self, theta: Tensor, x: Optional[Tensor] = None, track_gradients: bool = False ) -> Tensor: r"""Evaluates$\theta$under the potential that is used to sample the posterior. The potential is the unnormalized log-probability of$\theta$under the posterior. Args: theta: Parameters$\theta$. track_gradients: Whether the returned tensor supports tracking gradients. This can be helpful for e.g. sensitivity analysis, but increases memory consumption. """ self.potential_fn.set_x(self._x_else_default_x(x)) theta = ensure_theta_batched(torch.as_tensor(theta)) return self.potential_fn( theta.to(self._device), track_gradients=track_gradients )  #### sample(self, sample_shape=torch.Size([]), x=None, oversampling_factor=32, max_sampling_batch_size=10000, sample_with=None)¶ Return samples from the approximate posterior distribution. Parameters: Name Type Description Default sample_shape Union[torch.Size, Tuple[int, ...]] description torch.Size([]) x Optional[torch.Tensor] description None Source code in sbi/inference/posteriors/importance_posterior.py def sample( self, sample_shape: Shape = torch.Size(), x: Optional[Tensor] = None, oversampling_factor: int = 32, max_sampling_batch_size: int = 10_000, sample_with: Optional[str] = None, ) -> Union[Tensor, Tuple[Tensor, Tensor]]: """Return samples from the approximate posterior distribution. Args: sample_shape: _description_ x: _description_ """ if sample_with is not None: raise ValueError( f"You set sample_with={sample_with}. As of sbi v0.18.0, setting " f"sample_with is no longer supported. You have to rerun " f".build_posterior(sample_with={sample_with})." ) self.potential_fn.set_x(self._x_else_default_x(x)) if self.method == "sir": return self._sir_sample( sample_shape, oversampling_factor=oversampling_factor, max_sampling_batch_size=max_sampling_batch_size, ) elif self.method == "importance": return self._importance_sample(sample_shape) else: raise NameError  #### set_default_x(self, x) inherited ¶ Set new default x for .sample(), .log_prob to use as conditioning context. Reset the MAP stored for the old default x if applicable. This is a pure convenience to avoid having to repeatedly specify x in calls to .sample() and .log_prob() - only$ heta$needs to be passed. This convenience is particularly useful when the posterior is focused, i.e. has been trained over multiple rounds to be accurate in the vicinity of a particular x=x_o (you can check if your posterior object is focused by printing it). NOTE: this method is chainable, i.e. will return the NeuralPosterior object so that calls like posterior.set_default_x(my_x).sample(mytheta) are possible. Parameters: Name Type Description Default x Tensor The default observation to set for the posterior $$p( heta|x)$$. required Returns: Type Description NeuralPosterior NeuralPosterior that will use a default x when not explicitly passed. Source code in sbi/inference/posteriors/importance_posterior.py def set_default_x(self, x: Tensor) -> "NeuralPosterior": """Set new default x for .sample(), .log_prob to use as conditioning context. Reset the MAP stored for the old default x if applicable. This is a pure convenience to avoid having to repeatedly specify x in calls to .sample() and .log_prob() - only$\theta$needs to be passed. This convenience is particularly useful when the posterior is focused, i.e. has been trained over multiple rounds to be accurate in the vicinity of a particular x=x_o (you can check if your posterior object is focused by printing it). NOTE: this method is chainable, i.e. will return the NeuralPosterior object so that calls like posterior.set_default_x(my_x).sample(mytheta) are possible. Args: x: The default observation to set for the posterior$p(\theta|x)$. Returns: NeuralPosterior that will use a default x when not explicitly passed. """ self._x = process_x( x, x_shape=self._x_shape, allow_iid_x=self.potential_fn.allow_iid_x ).to(self._device) self._map = None return self  ###  sbi.inference.posteriors.mcmc_posterior.MCMCPosterior (NeuralPosterior) ¶ Provides MCMC to sample from the posterior. SNLE or SNRE train neural networks to approximate the likelihood(-ratios). MCMCPosterior allows to sample from the posterior with MCMC. Source code in sbi/inference/posteriors/mcmc_posterior.py class MCMCPosterior(NeuralPosterior): r"""Provides MCMC to sample from the posterior.<br/><br/> SNLE or SNRE train neural networks to approximate the likelihood(-ratios). MCMCPosterior allows to sample from the posterior with MCMC. """ def __init__( self, potential_fn: Callable, proposal: Any, theta_transform: Optional[TorchTransform] = None, method: str = "slice_np", thin: int = 10, warmup_steps: int = 10, num_chains: int = 1, init_strategy: str = "resample", init_strategy_parameters: Dict[str, Any] = {}, init_strategy_num_candidates: Optional[int] = None, num_workers: int = 1, device: Optional[str] = None, x_shape: Optional[torch.Size] = None, ): """ Args: potential_fn: The potential function from which to draw samples. proposal: Proposal distribution that is used to initialize the MCMC chain. theta_transform: Transformation that will be applied during sampling. Allows to perform MCMC in unconstrained space. method: Method used for MCMC sampling, one of slice_np, slice_np_vectorized, slice, hmc, nuts. slice_np is a custom numpy implementation of slice sampling. slice_np_vectorized is identical to slice_np, but if num_chains>1, the chains are vectorized for slice_np_vectorized whereas they are run sequentially for slice_np. The samplers hmc, nuts or slice sample with Pyro. thin: The thinning factor for the chain. warmup_steps: The initial number of samples to discard. num_chains: The number of chains. init_strategy: The initialisation strategy for chains; proposal will draw init locations from proposal, whereas sir will use Sequential- Importance-Resampling (SIR). SIR initially samples init_strategy_num_candidates from the proposal, evaluates all of them under the potential_fn and proposal, and then resamples the initial locations with weights proportional to exp(potential_fn - proposal.log_prob. resample is the same as sir but uses exp(potential_fn) as weights. init_strategy_parameters: Dictionary of keyword arguments passed to the init strategy, e.g., for init_strategy=sir this could be num_candidate_samples, i.e., the number of candidates to to find init locations (internal default is 1000), or device. init_strategy_num_candidates: Number of candidates to to find init locations in init_strategy=sir (deprecated, use init_strategy_parameters instead). num_workers: number of cpu cores used to parallelize mcmc device: Training device, e.g., "cpu", "cuda" or "cuda:0". If None, potential_fn.device is used. x_shape: Shape of a single simulator output. If passed, it is used to check the shape of the observed data and give a descriptive error. """ super().__init__( potential_fn, theta_transform=theta_transform, device=device, x_shape=x_shape, ) self.proposal = proposal self.method = method self.thin = thin self.warmup_steps = warmup_steps self.num_chains = num_chains self.init_strategy = init_strategy self.init_strategy_parameters = init_strategy_parameters self.num_workers = num_workers self._posterior_sampler = None # Hardcode parameter name to reduce clutter kwargs. self.param_name = "theta" if init_strategy_num_candidates is not None: warn( """Passing init_strategy_num_candidates is deprecated as of sbi v0.19.0. Instead, use e.g., init_strategy_parameters={"num_candidate_samples": 1000}""" ) self.init_strategy_parameters[ "num_candidate_samples" ] = init_strategy_num_candidates self.potential_ = self._prepare_potential(method) self._purpose = ( "It provides MCMC to .sample() from the posterior and " "can evaluate the _unnormalized_ posterior density with .log_prob()." ) @property def mcmc_method(self) -> str: """Returns MCMC method.""" return self._mcmc_method @mcmc_method.setter def mcmc_method(self, method: str) -> None: """See set_mcmc_method.""" self.set_mcmc_method(method) @property def posterior_sampler(self): """Returns sampler created by sample.""" return self._posterior_sampler def set_mcmc_method(self, method: str) -> "NeuralPosterior": """Sets sampling method to for MCMC and returns NeuralPosterior. Args: method: Method to use. Returns: NeuralPosterior for chainable calls. """ self._mcmc_method = method return self def log_prob( self, theta: Tensor, x: Optional[Tensor] = None, track_gradients: bool = False ) -> Tensor: r"""Returns the log-probability of theta under the posterior. Args: theta: Parameters$\theta$. track_gradients: Whether the returned tensor supports tracking gradients. This can be helpful for e.g. sensitivity analysis, but increases memory consumption. Returns: len($\theta$)-shaped log-probability. """ warn( """.log_prob() is deprecated for methods that can only evaluate the log-probability up to a normalizing constant. Use .potential() instead.""" ) warn("The log-probability is unnormalized!") self.potential_fn.set_x(self._x_else_default_x(x)) theta = ensure_theta_batched(torch.as_tensor(theta)) return self.potential_fn( theta.to(self._device), track_gradients=track_gradients ) def sample( self, sample_shape: Shape = torch.Size(), x: Optional[Tensor] = None, method: Optional[str] = None, thin: Optional[int] = None, warmup_steps: Optional[int] = None, num_chains: Optional[int] = None, init_strategy: Optional[str] = None, init_strategy_parameters: Optional[Dict[str, Any]] = None, init_strategy_num_candidates: Optional[int] = None, mcmc_parameters: Dict = {}, mcmc_method: Optional[str] = None, sample_with: Optional[str] = None, num_workers: Optional[int] = None, show_progress_bars: bool = True, ) -> Union[Tensor, Tuple[Tensor, InferenceData]]: r"""Return samples from posterior distribution$p(\theta|x)\$ with MCMC.

Check the __init__() method for a description of all arguments as well as
their default values.

Args:
sample_shape: Desired shape of samples that are drawn from posterior. If
sample_shape is multidimensional we simply draw sample_shape.numel()
samples and then reshape into the desired shape.
mcmc_parameters: Dictionary that is passed only to support the API of
sbi v0.17.2 or older.
mcmc_method: This argument only exists to keep backward-compatibility with
sbi v0.17.2 or older. Please use method instead.
sample_with: This argument only exists to keep backward-compatibility with
sbi v0.17.2 or older. If it is set, we instantly raise an error.
show_progress_bars: Whether to show sampling progress monitor.

Returns:
Samples from posterior.
"""
self.potential_fn.set_x(self._x_else_default_x(x))

# Replace arguments that were not passed with their default.
method = self.method if method is None else method
thin = self.thin if thin is None else thin
warmup_steps = self.warmup_steps if warmup_steps is None else warmup_steps
num_chains = self.num_chains if num_chains is None else num_chains
init_strategy = self.init_strategy if init_strategy is None else init_strategy
num_workers = self.num_workers if num_workers is None else num_workers
init_strategy_parameters = (
self.init_strategy_parameters
if init_strategy_parameters is None
else init_strategy_parameters
)
if init_strategy_num_candidates is not None:
warn(
"""Passing init_strategy_num_candidates is deprecated as of sbi
v0.19.0. Instead, use e.g.,
init_strategy_parameters={"num_candidate_samples": 1000}"""
)
self.init_strategy_parameters[
"num_candidate_samples"
] = init_strategy_num_candidates
if sample_with is not None:
raise ValueError(
f"You set sample_with={sample_with}. As of sbi v0.18.0, setting "
f"sample_with is no longer supported. You have to rerun "
f".build_posterior(sample_with={sample_with})."
)
if mcmc_method is not None:
warn(
"You passed mcmc_method to .sample(). As of sbi v0.18.0, this "
"is deprecated and will be removed in a future release. Use method "
"instead of mcmc_method."
)
method = mcmc_method
if mcmc_parameters:
warn(
"You passed mcmc_parameters to .sample(). As of sbi v0.18.0, this "
"is deprecated and will be removed in a future release. Instead, pass "
"the variable to .sample() directly, e.g. "
"posterior.sample((1,), num_chains=5)."
)
# The following lines are only for backwards compatibility with sbi v0.17.2 or
# older.
m_p = mcmc_parameters  # define to shorten the variable name
method = _maybe_use_dict_entry(method, "mcmc_method", m_p)
thin = _maybe_use_dict_entry(thin, "thin", m_p)
warmup_steps = _maybe_use_dict_entry(warmup_steps, "warmup_steps", m_p)
num_chains = _maybe_use_dict_entry(num_chains, "num_chains", m_p)
init_strategy = _maybe_use_dict_entry(init_strategy, "init_strategy", m_p)
self.potential_ = self._prepare_potential(method)  # type: ignore

initial_params = self._get_initial_params(
init_strategy,  # type: ignore
num_chains,  # type: ignore
num_workers,
show_progress_bars,
**init_strategy_parameters,
)
num_samples = torch.Size(sample_shape).numel()

track_gradients = method in ("hmc", "nuts")
if method in ("slice_np", "slice_np_vectorized"):
transformed_samples = self._slice_np_mcmc(
num_samples=num_samples,
potential_function=self.potential_,
initial_params=initial_params,
thin=thin,  # type: ignore
warmup_steps=warmup_steps,  # type: ignore
vectorized=(method == "slice_np_vectorized"),
num_workers=num_workers,
show_progress_bars=show_progress_bars,
)
elif method in ("hmc", "nuts", "slice"):
transformed_samples = self._pyro_mcmc(
num_samples=num_samples,
potential_function=self.potential_,
initial_params=initial_params,
mcmc_method=method,  # type: ignore
thin=thin,  # type: ignore
warmup_steps=warmup_steps,  # type: ignore
num_chains=num_chains,
show_progress_bars=show_progress_bars,
)
else:
raise NameError

samples = self.theta_transform.inv(transformed_samples)

return samples.reshape((*sample_shape, -1))  # type: ignore

def _build_mcmc_init_fn(
self,
proposal: Any,
potential_fn: Callable,
transform: torch_tf.Transform,
init_strategy: str,
**kwargs,
) -> Callable:
"""Return function that, when called, creates an initial parameter set for MCMC.

Args:
proposal: Proposal distribution.
potential_fn: Potential function that the candidate samples are weighted
with.
init_strategy: Specifies the initialization method. Either of
[proposal|sir|resample|latest_sample].
kwargs: Passed on to init function. This way, init specific keywords can
be set through mcmc_parameters. Unused arguments will be absorbed by
the intitialization method.

Returns: Initialization function.
"""
if init_strategy == "proposal" or init_strategy == "prior":
if init_strategy == "prior":
warn(
"You set init_strategy=prior. As of sbi v0.18.0, this is "
"deprecated and it will be removed in a future release. Use "
"init_strategy=proposal instead."
)
return lambda: proposal_init(proposal, transform=transform, **kwargs)
elif init_strategy == "sir":
warn(
"As of sbi v0.19.0, the behavior of the SIR initialization for MCMC "
"has changed. If you wish to restore the behavior of sbi v0.18.0, set "
"init_strategy='resample'."
)
return lambda: sir_init(
proposal, potential_fn, transform=transform, **kwargs
)
elif init_strategy == "resample":
return lambda: resample_given_potential_fn(
proposal, potential_fn, transform=transform, **kwargs
)
elif init_strategy == "latest_sample":
latest_sample = IterateParameters(self._mcmc_init_params, **kwargs)
return latest_sample
else:
raise NotImplementedError

def _get_initial_params(
self,
init_strategy: str,
num_chains: int,
num_workers: int,
show_progress_bars: bool,
**kwargs,
) -> Tensor:
"""Return initial parameters for MCMC obtained with given init strategy.

Parallelizes across CPU cores only for SIR.

Args:
init_strategy: Specifies the initialization method. Either of
[proposal|sir|resample|latest_sample].
num_chains: number of MCMC chains, generates initial params for each
num_workers: number of CPU cores for parallization
show_progress_bars: whether to show progress bars for SIR init
kwargs: Passed on to _build_mcmc_init_fn.

Returns:
Tensor: initial parameters, one for each chain
"""
# Build init function
init_fn = self._build_mcmc_init_fn(
self.proposal,
self.potential_fn,
transform=self.theta_transform,
init_strategy=init_strategy,  # type: ignore
**kwargs,
)

# Parallelize inits for resampling only.
if num_workers > 1 and (init_strategy == "resample" or init_strategy == "sir"):

def seeded_init_fn(seed):
torch.manual_seed(seed)
return init_fn()

seeds = torch.randint(high=2**31, size=(num_chains,))

# Generate initial params parallelized over num_workers.
with tqdm_joblib(
tqdm(
range(num_chains),  # type: ignore
disable=not show_progress_bars,
desc=f"""Generating {num_chains} MCMC inits with {num_workers}
workers.""",
total=num_chains,
)
):
initial_params = torch.cat(
Parallel(n_jobs=num_workers)(
delayed(seeded_init_fn)(seed) for seed in seeds
)
)
else:
initial_params = torch.cat(
[init_fn() for _ in range(num_chains)]  # type: ignore
)

return initial_params

def _slice_np_mcmc(
self,
num_samples: int,
potential_function: Callable,
initial_params: Tensor,
thin: int,
warmup_steps: int,
vectorized: bool = False,
num_workers: int = 1,
init_width: Union[float, ndarray] = 0.01,
show_progress_bars: bool = True,
) -> Tensor:
`