Generator
Defaultclass
Default.__init__(self, model, prior, summary, seed=None)
Generator
Parameters
----------
model : Simulator instance
Forward model
prior : Distribution or Mixture instance
Prior over parameters
summary : SummaryStats instance
Summary statistics
Attributes
----------
proposal : None or Distribution or Mixture instance
Proposal prior over parameters. If specified, will generate
samples given parameters drawn from proposal distribution rather
than samples drawn from prior when `gen` is called.
Source Code
| def __init__(self, model, prior, summary, seed=None):
self.model = model
self.prior = prior
self.summary = summary
self.proposal = None
self.seed = seed
self.rng = np.random.RandomState(seed=seed)
|
Default._feedback_forward_model(self, data)
Feedback step after forward model ran
Parameters
----------
data : np.array
Data
Returns
-------
response : str
Supported responses are in ['accept', 'discard']
Source Code
| @copy_ancestor_docstring
def _feedback_forward_model(self, data):
# See BaseGenerator for docstring
return 'accept'
|
Default._feedback_proposed_param(self, param)
Feedback step after parameter has been proposed
Parameters
----------
param : np.array
Parameter
Returns
-------
response : str
Supported responses are in ['accept', 'resample']
Source Code
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23 | @copy_ancestor_docstring
def _feedback_proposed_param(self, param):
# See BaseGenerator for docstring
# if prior is uniform, reject samples outside of bounds
# samples might be drawn outside bounds due to proposal
if isinstance(self.prior, dd.Uniform):
if np.any(param < self.prior.lower) or \
np.any(param > self.prior.upper):
return 'resample'
elif isinstance(self.prior, dd.IndependentJoint):
for j, p in enumerate(self.prior.dists):
ii = self.prior.dist_index_eachdim == j
if isinstance(p, dd.Uniform):
if np.any(param[:, ii] < p.lower) or \
np.any(param[:, ii] > p.upper):
return 'resample'
elif isinstance(p, dd.Gamma):
if np.any(param[:, ii] < p.offset):
return 'resample'
return 'accept'
|
Default._feedback_summary_stats(self, sum_stats)
Feedback step after summary stats were computed
Parameters
----------
sum_stats : np.array
Summary stats
Returns
-------
response : str
Supported responses are in ['accept', 'discard']
Source Code
| @copy_ancestor_docstring
def _feedback_summary_stats(self, sum_stats):
# See BaseGenerator for docstring
return 'accept'
|
Default.draw_params(self, n_samples, skip_feedback=False, prior_mixin=0, verbose=True, leave_pbar=True)
Source Code
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35 | def draw_params(self, n_samples, skip_feedback=False, prior_mixin=0, verbose=True, leave_pbar=True):
if not verbose:
pbar = no_tqdm()
else:
pbar = progressbar(total=n_samples, leave=leave_pbar)
desc = 'Draw parameters '
if type(verbose) == str:
desc += verbose
pbar.set_description(desc)
# collect valid parameter vectors from the prior
params = [] # list of parameter vectors
with pbar:
i = 0
while i < n_samples:
# sample parameter
if self.proposal is None or self.rng.random_sample() < prior_mixin:
proposed_param = self.prior.gen(n_samples=1) # dim params,
else:
proposed_param = self.proposal.gen(n_samples=1)
# check if parameter vector is valid
response = self._feedback_proposed_param(proposed_param)
if response == 'accept' or skip_feedback:
# add valid param vector to list
params.append(proposed_param.reshape(-1))
i += 1
pbar.update(1)
elif response == 'resample':
# continue without increment on i or updating the bar
continue
else:
raise ValueError('response not supported')
return params
|
Default.gen(self, n_samples, n_reps=1, skip_feedback=False, prior_mixin=0, minibatch=50, keep_data=True, verbose=True, leave_pbar=True)
Draw parameters and run forward model
Parameters
----------
n_samples : int
Number of samples
n_reps: int
Number of repetitions per parameter sample
skip_feedback: bool
If True, feedback checks on params, data and sum stats are skipped
verbose : bool or str
If False, will not display progress bars. If a string is passed,
it will be appended to the description of the progress bar.
Returns
-------
params : n_samples x n_reps x n_params
Parameters
stats : n_samples x n_reps x n_summary
Summary statistics of data
Source Code
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43 | def gen(self, n_samples, n_reps=1, skip_feedback=False, prior_mixin=0, minibatch=50, keep_data=True, verbose=True,
leave_pbar=True):
assert n_reps == 1, 'n_reps > 1 is not yet supported'
params = self.draw_params(n_samples=n_samples,
skip_feedback=skip_feedback,
prior_mixin=prior_mixin,
verbose=verbose,
leave_pbar=leave_pbar)
# Run forward model for params (in batches)
if not verbose:
pbar = no_tqdm()
else:
pbar = progressbar(total=len(params), leave=leave_pbar)
desc = 'Run simulations '
if type(verbose) == str:
desc += verbose
pbar.set_description(desc)
final_params = []
final_stats = [] # list of summary stats
with pbar:
for params_batch in self.iterate_minibatches(params, minibatch):
# run forward model for all params, each n_reps times
result = self.model.gen(params_batch, n_reps=n_reps, pbar=pbar)
stats, params = self.process_batch(params_batch, result,skip_feedback=skip_feedback)
final_params += params
final_stats += stats
# TODO: for n_reps > 1 duplicate params; reshape stats array
# n_samples x dim theta
params = np.array(final_params)
# n_samples x dim summary stats
stats = np.array(final_stats)
if len(final_stats)>0:
stats = stats.squeeze(axis=1)
return params, stats
|
Default.gen_newseed(self)
Generates a new random seed
Source Code
| def gen_newseed(self):
if self.seed is None:
return None
else:
return self.rng.randint(0, 2**31)
|
Default.iterate_minibatches(self, params, minibatch=50)
Source Code
| def iterate_minibatches(self, params, minibatch=50):
n_samples = len(params)
for i in range(0, n_samples - minibatch+1, minibatch):
yield params[i:i + minibatch]
rem_i = n_samples - (n_samples % minibatch)
if rem_i != n_samples:
yield params[rem_i:]
|
Default.process_batch(self, params_batch, result, skip_feedback=False)
Source Code
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37 | def process_batch(self, params_batch, result,skip_feedback=False):
ret_stats = []
ret_params = []
# for every datum in data, check validity
params_data_valid = [] # list of params with valid data
data_valid = [] # list of lists containing n_reps dicts with data
for param, datum in zip(params_batch, result):
# check validity
response = self._feedback_forward_model(datum)
if response == 'accept' or skip_feedback:
data_valid.append(datum)
# if data is accepted, accept the param as well
params_data_valid.append(param)
elif response == 'discard':
continue
else:
raise ValueError('response not supported')
# for every data in data, calculate summary stats
for param, datum in zip(params_data_valid, data_valid):
# calculate summary statistics
sum_stats = self.summary.calc(datum) # n_reps x dim stats
# check validity
response = self._feedback_summary_stats(sum_stats)
if response == 'accept' or skip_feedback:
ret_stats.append(sum_stats)
# if sum stats is accepted, accept the param as well
ret_params.append(param)
elif response == 'discard':
continue
else:
raise ValueError('response not supported')
return ret_stats, ret_params
|
Default.reseed(self, seed)
Carries out the following operations, in order:
1) Reseeds the master RNG for the generator object, using the input seed
2) Reseeds the prior from the master RNG. This may cause additional
distributions to be reseeded using the prior's RNG (e.g. if the prior is
a mixture)
3) Reseeds the simulator RNG, from the master RNG
4) Reseeds the proposal, if present
Source Code
| def reseed(self, seed):
self.rng.seed(seed=seed)
self.seed = seed
self.prior.reseed(self.gen_newseed())
if self.model is not None:
self.model.reseed(self.gen_newseed())
if self.proposal is not None:
self.proposal.reseed(self.gen_newseed())
|
MPGeneratorclass
MPGenerator.__del__(self)
Source Code
| def __del__(self):
self.stop_workers()
|
MPGenerator.__init__(self, models, prior, summary, rej=None, seed=None, verbose=False)
Generator supporting multiprocessing
Parameters
----------
models : List of simulator instances
Forward models
prior : Distribution or Mixture instance
Prior over parameters
summary : SummaryStats instance
Summary statistics
rej : Function
Rejection kernel
Attributes
----------
proposal : None or Distribution or Mixture instance
Proposal prior over parameters. If specified, will generate
samples given parameters drawn from proposal distribution rather
than samples drawn from prior when `gen` is called.
Source Code
| def __init__(self, models, prior, summary, rej=None, seed=None, verbose=False):
super().__init__(model=None, prior=prior, summary=summary, seed=seed)
self.rej = rej if rej is not None else default_MPGenerator_rej
self.verbose = verbose
self.models = models
|
MPGenerator._feedback_forward_model(self, data)
Feedback step after forward model ran
Parameters
----------
data : np.array
Data
Returns
-------
response : str
Supported responses are in ['accept', 'discard']
Source Code
| @copy_ancestor_docstring
def _feedback_forward_model(self, data):
# See BaseGenerator for docstring
return 'accept'
|
MPGenerator._feedback_proposed_param(self, param)
Feedback step after parameter has been proposed
Parameters
----------
param : np.array
Parameter
Returns
-------
response : str
Supported responses are in ['accept', 'resample']
Source Code
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23 | @copy_ancestor_docstring
def _feedback_proposed_param(self, param):
# See BaseGenerator for docstring
# if prior is uniform, reject samples outside of bounds
# samples might be drawn outside bounds due to proposal
if isinstance(self.prior, dd.Uniform):
if np.any(param < self.prior.lower) or \
np.any(param > self.prior.upper):
return 'resample'
elif isinstance(self.prior, dd.IndependentJoint):
for j, p in enumerate(self.prior.dists):
ii = self.prior.dist_index_eachdim == j
if isinstance(p, dd.Uniform):
if np.any(param[:, ii] < p.lower) or \
np.any(param[:, ii] > p.upper):
return 'resample'
elif isinstance(p, dd.Gamma):
if np.any(param[:, ii] < p.offset):
return 'resample'
return 'accept'
|
MPGenerator._feedback_summary_stats(self, sum_stats)
Feedback step after summary stats were computed
Parameters
----------
sum_stats : np.array
Summary stats
Returns
-------
response : str
Supported responses are in ['accept', 'discard']
Source Code
| def _feedback_summary_stats(self, sum_stats):
if self.rej(sum_stats):
return 'accept'
else:
return 'discard'
|
MPGenerator.draw_params(self, n_samples, skip_feedback=False, prior_mixin=0, verbose=True, leave_pbar=True)
Source Code
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35 | def draw_params(self, n_samples, skip_feedback=False, prior_mixin=0, verbose=True, leave_pbar=True):
if not verbose:
pbar = no_tqdm()
else:
pbar = progressbar(total=n_samples, leave=leave_pbar)
desc = 'Draw parameters '
if type(verbose) == str:
desc += verbose
pbar.set_description(desc)
# collect valid parameter vectors from the prior
params = [] # list of parameter vectors
with pbar:
i = 0
while i < n_samples:
# sample parameter
if self.proposal is None or self.rng.random_sample() < prior_mixin:
proposed_param = self.prior.gen(n_samples=1) # dim params,
else:
proposed_param = self.proposal.gen(n_samples=1)
# check if parameter vector is valid
response = self._feedback_proposed_param(proposed_param)
if response == 'accept' or skip_feedback:
# add valid param vector to list
params.append(proposed_param.reshape(-1))
i += 1
pbar.update(1)
elif response == 'resample':
# continue without increment on i or updating the bar
continue
else:
raise ValueError('response not supported')
return params
|
MPGenerator.filter_data(self, stats, params, skip_feedback=False)
Source Code
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 | def filter_data(self, stats, params, skip_feedback=False):
if skip_feedback == True:
return stats, params
ret_stats = []
ret_params = []
for stat, param in zip(stats, params):
response = self._feedback_summary_stats(stat)
if response == 'accept':
ret_stats.append(stat)
ret_params.append(param)
elif response == 'discard':
continue
else:
raise ValueError('response not supported')
return ret_stats, ret_params
|
MPGenerator.gen(self, n_samples, n_reps=1, skip_feedback=False, prior_mixin=0, verbose=True, **kwargs)
Draw parameters and run forward model
Parameters
----------
n_samples : int
Number of samples
n_reps: int
Number of repetitions per parameter sample
skip_feedback: bool
If True, feedback checks on params, data and sum stats are skipped
verbose : bool or str
If False, will not display progress bars. If a string is passed,
it will be appended to the description of the progress bar.
Returns
-------
params : n_samples x n_reps x n_params
Parameters
stats : n_samples x n_reps x n_summary
Summary statistics of data
Source Code
| def gen(self, n_samples, n_reps=1, skip_feedback=False, prior_mixin=0, verbose=True, **kwargs):
assert n_reps == 1, 'n_reps > 1 is not yet supported'
params = self.draw_params(n_samples=n_samples,
skip_feedback=skip_feedback,
prior_mixin=prior_mixin,
verbose=verbose)
return self.run_model(params, skip_feedback=skip_feedback, verbose=verbose, **kwargs)
|
MPGenerator.gen_newseed(self)
Generates a new random seed
Source Code
| def gen_newseed(self):
if self.seed is None:
return None
else:
return self.rng.randint(0, 2**31)
|
MPGenerator.iterate_minibatches(self, params, minibatch=50)
Source Code
| def iterate_minibatches(self, params, minibatch=50):
n_samples = len(params)
for i in range(0, n_samples - minibatch + 1, minibatch):
yield params[i:i + minibatch]
rem_i = n_samples - (n_samples % minibatch)
if rem_i != n_samples:
yield params[rem_i:]
|
MPGenerator.log(self, msg)
Source Code
| def log(self, msg):
if self.verbose:
print("Parent: {}".format(msg))
|
MPGenerator.process_batch(self, params_batch, result, skip_feedback=False)
Source Code
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37 | def process_batch(self, params_batch, result,skip_feedback=False):
ret_stats = []
ret_params = []
# for every datum in data, check validity
params_data_valid = [] # list of params with valid data
data_valid = [] # list of lists containing n_reps dicts with data
for param, datum in zip(params_batch, result):
# check validity
response = self._feedback_forward_model(datum)
if response == 'accept' or skip_feedback:
data_valid.append(datum)
# if data is accepted, accept the param as well
params_data_valid.append(param)
elif response == 'discard':
continue
else:
raise ValueError('response not supported')
# for every data in data, calculate summary stats
for param, datum in zip(params_data_valid, data_valid):
# calculate summary statistics
sum_stats = self.summary.calc(datum) # n_reps x dim stats
# check validity
response = self._feedback_summary_stats(sum_stats)
if response == 'accept' or skip_feedback:
ret_stats.append(sum_stats)
# if sum stats is accepted, accept the param as well
ret_params.append(param)
elif response == 'discard':
continue
else:
raise ValueError('response not supported')
return ret_stats, ret_params
|
MPGenerator.reseed(self, seed)
Carries out the following operations, in order:
1) Reseeds the master RNG for the generator object, using the input seed
2) Reseeds the prior from the master RNG. This may cause additional
distributions to be reseeded using the prior's RNG (e.g. if the prior is
a mixture)
3) Reseeds the simulator RNG, from the master RNG
4) Reseeds the proposal, if present
Source Code
| def reseed(self, seed):
self.rng.seed(seed=seed)
self.seed = seed
self.prior.reseed(self.gen_newseed())
for m in self.models:
m.reseed(self.gen_newseed())
if self.proposal is not None:
self.proposal.reseed(self.gen_newseed())
|
MPGenerator.run_model(self, params, minibatch=50, skip_feedback=False, keep_data=True, verbose=False)
Source Code
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59 | def run_model(self, params, minibatch=50, skip_feedback=False, keep_data=True, verbose=False):
# Run forward model for params (in batches)
if not verbose:
pbar = no_tqdm()
else:
pbar = progressbar(total=len(params))
desc = 'Run simulations '
if type(verbose) == str:
desc += verbose
pbar.set_description(desc)
self.start_workers()
final_params = []
final_stats = [] # list of summary stats
minibatches = self.iterate_minibatches(params, minibatch)
done = False
with pbar:
while not done:
active_list = []
for w, p in zip(self.workers, self.pipes):
try:
params_batch = next(minibatches)
except StopIteration:
done = True
break
active_list.append((w, p))
self.log("Dispatching to worker (len = {})".format(len(params_batch)))
p.send(params_batch)
self.log("Done")
n_remaining = len(active_list)
while n_remaining > 0:
self.log("Listening to worker")
msg = self.queue.get()
if type(msg) == int:
self.log("Received int")
pbar.update(msg)
elif type(msg) == tuple:
self.log("Received results")
stats, params = self.filter_data(*msg, skip_feedback=skip_feedback)
final_stats += stats
final_params += params
n_remaining -= 1
else:
self.log("Warning: Received unknown message of type {}".format(type(msg)))
self.stop_workers()
# TODO: for n_reps > 1 duplicate params; reshape stats array
# n_samples x n_reps x dim theta
params = np.array(final_params)
# n_samples x n_reps x dim summary stats
stats = np.array(final_stats)
stats = stats.squeeze(axis=1)
return params, stats
|
MPGenerator.start_workers(self)
Source Code
| def start_workers(self):
pipes = [ mp.Pipe(duplex=True) for m in self.models ]
self.queue = mp.Queue()
self.workers = [ Worker(i, self.queue, pipes[i][1], self.models[i], self.summary, seed=self.rng.randint(low=0,high=2**31), verbose=self.verbose) for i in range(len(self.models)) ]
self.pipes = [ p[0] for p in pipes ]
self.log("Starting workers")
for w in self.workers:
w.start()
self.log("Done")
|
MPGenerator.stop_workers(self)
Source Code
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 | def stop_workers(self):
if self.workers is None:
return
self.log("Closing")
for w, p in zip(self.workers, self.pipes):
self.log("Closing pipe")
p.close()
for w in self.workers:
self.log("Joining process")
w.join(timeout=1)
w.terminate()
self.queue.close()
self.workers = None
self.pipes = None
self.queue = None
|