Distributions
Discrete class
Discrete.__init__(self, p, seed=None)
Discrete distribution
Parameters
----------
p : list or np.array, 1d
Probabilities of elements, must sum to 1
seed : int or None
If provided, random number generator will be seeded
Source Code
def __init__(self, p, seed=None):
    """Discrete distribution over the integers 0..len(p)-1.

    Parameters
    ----------
    p : list or np.array, 1d
        Probabilities of elements, must sum to 1
    seed : int or None
        If provided, random number generator will be seeded
    """
    super().__init__(ndim=1, seed=seed)
    probs = np.asarray(p)
    assert probs.ndim == 1, 'p must be a 1-d array'
    assert np.isclose(np.sum(probs), 1), 'p must sum to 1'
    self.p = probs
|
Discrete.eval(self, x, ii=None, log=True)
Method to evaluate pdf
Parameters
----------
x : int or list or np.array
Rows are inputs to evaluate at
ii : list
A list of indices specifying which marginal to evaluate.
If None, the joint pdf is evaluated
log : bool, defaulting to True
If True, the log pdf is evaluated
Returns
-------
scalar
Source Code
@copy_ancestor_docstring
def eval(self, x, ii=None, log=True):
    # pmf evaluation is not implemented for the discrete distribution yet;
    # callers must catch NotImplementedError
    raise NotImplementedError("To be implemented")
|
Discrete.gen(self, n_samples=1, seed=None)
Method to generate samples
Parameters
----------
n_samples : int
Number of samples to generate
Returns
-------
n_samples x self.ndim
Source Code
@copy_ancestor_docstring
def gen(self, n_samples=1, seed=None):
    """Draw n_samples indices according to the stored probabilities."""
    # inverse-CDF sampling: each sample's index is the number of
    # cumulative bounds its uniform draw exceeds
    bounds = np.cumsum(self.p[:-1])[np.newaxis, :]
    u = self.rng.rand(n_samples, 1)
    idx = np.sum((u > bounds).astype(int), axis=1)
    return idx.reshape(-1, 1)
|
Discrete.gen_newseed(self)
Generates a new random seed
Source Code
def gen_newseed(self):
    """Generate a fresh seed from the RNG, or None when unseeded."""
    if self.seed is None:
        return None
    return self.rng.randint(0, 2**31)
|
Discrete.reseed(self, seed)
Reseeds the distribution's RNG
Source Code
def reseed(self, seed):
    """Reseed the distribution's RNG and remember the seed."""
    self.seed = seed
    self.rng.seed(seed=seed)
|
Gamma class
Gamma.__init__(self, alpha=1.0, beta=1.0, offset=0.0, seed=None)
Univariate (!) Gamma distribution
Parameters
----------
alpha : list, or np.array, 1d
Shape parameters
beta : list, or np.array, 1d
Inverse scale parameters
seed : int or None
If provided, random number generator will be seeded
Source Code
1
2
3
4
5
6
7
8
9
10
11
12
def __init__(self, alpha=1., beta=1., offset=0., seed=None):
    """Univariate Gamma distribution with shape alpha and inverse scale beta.

    Parameters
    ----------
    alpha : list, or np.array, 1d
        Shape parameters
    beta : list, or np.array, 1d
        Inverse scale parameters
    offset : float
        Shift applied to the support
    seed : int or None
        If provided, random number generator will be seeded
    """
    super().__init__(ndim=1, seed=seed)
    alpha, beta = np.atleast_1d(alpha), np.atleast_1d(beta)
    assert alpha.ndim == 1, 'alpha must be a 1-d array'
    assert alpha.size == beta.size, 'alpha and beta must match in size'
    assert np.all(alpha > 0.), 'Should be greater than zero.'
    assert np.all(beta > 0.), 'Should be greater than zero.'
    self.alpha, self.beta, self.offset = alpha, beta, offset
    # frozen scipy distribution backing eval(); scipy's scale is 1/beta
    self._gamma = gamma(a=alpha, scale=1. / beta)
|
Gamma.eval(self, x, ii=None, log=True)
Method to evaluate pdf
Parameters
----------
x : int or list or np.array
Rows are inputs to evaluate at
ii : list
A list of indices specifying which marginal to evaluate.
If None, the joint pdf is evaluated
log : bool, defaulting to True
If True, the log pdf is evaluated
Returns
-------
scalar
Source Code
1
2
3
4
5
6
7
8
9
10
11
@copy_ancestor_docstring
def eval(self, x, ii=None, log=True):
    # univariate distribution only, i.e. ii=[0] in any case; reject other
    # marginal requests explicitly (consistency fix: Poisson.eval already
    # guards against ii, Gamma silently ignored it)
    assert ii is None, 'this is a univariate Gamma, ii must be None.'
    # x should have a second dim with length 1, not more
    x = np.atleast_2d(x)
    assert x.shape[1] == 1, 'x needs second dim'
    assert not x.ndim > 2, 'no more than 2 dims in x'
    res = self._gamma.logpdf(x - self.offset) if log else self._gamma.pdf(x - self.offset)
    # reshape to (nbatch, )
    return res.reshape(-1)
|
Gamma.gen(self, n_samples=1, seed=None)
Method to generate samples
Parameters
----------
n_samples : int
Number of samples to generate
Returns
-------
n_samples x self.ndim
Source Code
@copy_ancestor_docstring
def gen(self, n_samples=1, seed=None):
    """Draw (n_samples, ndim) Gamma variates, shifted by the offset."""
    draws = self.rng.gamma(shape=self.alpha,
                           scale=1. / self.beta,
                           size=(n_samples, self.ndim))
    return draws + self.offset
|
Gamma.gen_newseed(self)
Generates a new random seed
Source Code
def gen_newseed(self):
    """Generate a fresh seed from the RNG, or None when unseeded."""
    if self.seed is None:
        return None
    return self.rng.randint(0, 2**31)
|
Gamma.reseed(self, seed)
Reseeds the distribution's RNG
Source Code
def reseed(self, seed):
    """Reseed the distribution's RNG and remember the seed."""
    self.seed = seed
    self.rng.seed(seed=seed)
|
Gaussian class
Gaussian.__imul__(self, other)
Incrementally multiply with another Gaussian
Source Code
1
2
3
4
5
6
7
8
9
10
11
12
13
def __imul__(self, other):
    """Incrementally multiply with another Gaussian, updating self in place."""
    assert isinstance(other, Gaussian)
    res = self * other
    # copy every derived parameter of the product onto self
    for attr in ('m', 'P', 'C', 'S', 'Pm', 'logdetP'):
        setattr(self, attr, getattr(res, attr))
    return res
|
Gaussian.__init__(self, m=None, P=None, U=None, S=None, Pm=None, seed=None)
Gaussian distribution
Initialize a gaussian pdf given a valid combination of its parameters.
Valid combinations are: m-P, m-U, m-S, Pm-P, Pm-U, Pm-S
Focus is on efficient multiplication, division and sampling.
Parameters
----------
m : list or np.array, 1d
Mean
P : list or np.array, 2d
Precision
U : list or np.array, 2d
Upper triangular precision factor such that U'U = P
S : list or np.array, 2d
Covariance
C : list or np.array, 2d
Upper or lower triangular covariance factor, in any case S = C'C
Pm : list or np.array, 1d
Precision times mean such that P*m = Pm
seed : int or None
If provided, random number generator will be seeded
Source Code
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
def __init__(self, m=None, P=None, U=None, S=None, Pm=None, seed=None):
    """Gaussian distribution.

    Initialize a gaussian pdf given a valid combination of its parameters.
    Valid combinations are: m-P, m-U, m-S, Pm-P, Pm-U, Pm-S.
    Focus is on efficient multiplication, division and sampling.

    Parameters
    ----------
    m : list or np.array, 1d
        Mean
    P : list or np.array, 2d
        Precision
    U : list or np.array, 2d
        Upper triangular precision factor such that U'U = P
    S : list or np.array, 2d
        Covariance
    Pm : list or np.array, 1d
        Precision times mean such that P*m = Pm
    seed : int or None
        If provided, random number generator will be seeded
    """
    assert m is None or np.asarray(m).ndim == 1
    assert P is None or np.asarray(P).ndim == 2
    assert U is None or np.asarray(U).ndim == 2
    assert S is None or np.asarray(S).ndim == 2
    assert Pm is None or np.asarray(Pm).ndim == 1
    # Python-2-style aliases so __div__ / __idiv__ keep working
    self.__div__ = self.__truediv__
    self.__idiv__ = self.__itruediv__
    if m is not None:
        m = np.asarray(m)
        self.m = m
        ndim = self.m.size
        if P is not None:
            P = np.asarray(P)
            L = np.linalg.cholesky(P)  # P=LL' (lower triag)
            self.P = P
            self.C = np.linalg.inv(L)  # C is lower triangular here
            # S = C'C = L^{-1}^T L^{-1} = (LL^T)^{-1}
            self.S = np.dot(self.C.T, self.C)
            self.Pm = np.dot(P, m)
            # log det P = 2 * sum(log diag(L)) since P = LL'
            self.logdetP = 2.0 * np.sum(np.log(np.diagonal(L)))
        elif U is not None:
            U = np.asarray(U)
            self.P = np.dot(U.T, U)
            self.C = np.linalg.inv(U.T)  # C is lower triangular here
            self.S = np.dot(self.C.T, self.C)
            self.Pm = np.dot(self.P, m)
            self.logdetP = 2.0 * np.sum(np.log(np.diagonal(U)))
        elif S is not None:
            S = np.asarray(S)
            self.P = np.linalg.inv(S)
            self.C = np.linalg.cholesky(S).T  # C is upper triangular here
            self.S = S
            self.Pm = np.dot(self.P, m)
            # det P = 1/det S, hence the sign flip on the Cholesky diagonal
            self.logdetP = -2.0 * np.sum(np.log(np.diagonal(self.C)))
        else:
            raise ValueError('Precision information missing')
    elif Pm is not None:
        # same three branches, but the mean is recovered from Pm = P*m
        Pm = np.asarray(Pm)
        self.Pm = Pm
        ndim = self.Pm.size
        if P is not None:
            P = np.asarray(P)
            L = np.linalg.cholesky(P)
            # L = np.linalg.cholesky(P + 0.001*np.identity(P.shape[0]))
            self.P = P
            self.C = np.linalg.inv(L)
            self.S = np.dot(self.C.T, self.C)
            self.m = np.linalg.solve(P, Pm)
            self.logdetP = 2.0 * np.sum(np.log(np.diagonal(L)))
        elif U is not None:
            U = np.asarray(U)
            self.P = np.dot(U.T, U)
            self.C = np.linalg.inv(U.T)
            self.S = np.dot(self.C.T, self.C)
            self.m = np.linalg.solve(self.P, Pm)
            self.logdetP = 2.0 * np.sum(np.log(np.diagonal(U)))
        elif S is not None:
            S = np.asarray(S)
            self.P = np.linalg.inv(S)
            self.C = np.linalg.cholesky(S).T
            self.S = S
            # m = S * Pm because S = P^{-1}
            self.m = np.dot(S, Pm)
            self.logdetP = -2.0 * np.sum(np.log(np.diagonal(self.C)))
        else:
            raise ValueError('Precision information missing')
    else:
        raise ValueError('Mean information missing')
    super().__init__(ndim, seed=seed)
|
Gaussian.__ipow__(self, power)
Incrementally raise gaussian to a power
Source Code
1
2
3
4
5
6
7
8
9
10
11
def __ipow__(self, power):
    """Incrementally raise to a power, updating self in place."""
    res = self ** power
    # copy every derived parameter of the result onto self
    for attr in ('m', 'P', 'C', 'S', 'Pm', 'logdetP'):
        setattr(self, attr, getattr(res, attr))
    return res
|
Gaussian.__itruediv__(self, other)
Incrementally divide by another Gaussian
Note that the resulting Gaussian might be improper.
Source Code
1
2
3
4
5
6
7
8
9
10
11
12
13
def __itruediv__(self, other):
    """Incrementally divide by another Gaussian (result may be improper)."""
    assert isinstance(other, Gaussian)
    res = self / other
    # copy every derived parameter of the quotient onto self
    for attr in ('m', 'P', 'C', 'S', 'Pm', 'logdetP'):
        setattr(self, attr, getattr(res, attr))
    return res
|
Gaussian.__mul__(self, other)
Multiply with another Gaussian
Source Code
def __mul__(self, other):
    """Multiply with another Gaussian: precisions and Pm vectors add."""
    assert isinstance(other, Gaussian)
    return Gaussian(P=self.P + other.P,
                    Pm=self.Pm + other.Pm,
                    seed=self.seed)
|
Gaussian.__pow__(self, power, modulo=None)
Raise Gaussian to a power and get another Gaussian
Source Code
def __pow__(self, power, modulo=None):
    """Raise to a power: scales both precision and Pm by the exponent."""
    return Gaussian(P=power * self.P, Pm=power * self.Pm, seed=self.seed)
|
Gaussian.__truediv__(self, other)
Divide by another Gaussian
Note that the resulting Gaussian might be improper.
Source Code
def __truediv__(self, other):
    """Divide by another Gaussian; the result may be improper (P not PD)."""
    assert isinstance(other, Gaussian)
    return Gaussian(P=self.P - other.P,
                    Pm=self.Pm - other.Pm,
                    seed=self.seed)
|
Gaussian.convert_to_T(self, dof)
Converts Gaussian to Student T
Parameters
----------
dof : int
Degrees of freedom
Source Code
def convert_to_T(self, dof):
    """Convert to a Student's T with the given degrees of freedom."""
    mean, cov = self.m, self.S
    return StudentsT(mean, cov, dof, seed=self.seed)
|
Gaussian.eval(self, x, ii=None, log=True)
Method to evaluate pdf
Parameters
----------
x : int or list or np.array
Rows are inputs to evaluate at
ii : list
A list of indices specifying which marginal to evaluate.
If None, the joint pdf is evaluated
log : bool, defaulting to True
If True, the log pdf is evaluated
Returns
-------
scalar
Source Code
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@copy_ancestor_docstring
def eval(self, x, ii=None, log=True):
    # See BaseDistribution.py for docstring
    x = np.atleast_2d(x)
    if ii is None:
        # joint log density computed directly from the precision matrix
        assert x.shape[1] == self.ndim, "incorrect data dimension"
        xm = x - self.m
        lp = -np.sum(np.dot(xm, self.P) * xm, axis=1)
        lp += self.logdetP - self.ndim * np.log(2.0 * np.pi)
        lp *= 0.5
    else:
        # marginal over the requested indices, via covariance sub-blocks
        ii = np.atleast_1d(ii)
        m = self.m[ii]
        S = self.S[ii][:, ii]
        if m.size == 1:  # single marginal
            x = x.reshape(-1, m.size)
        assert x.shape[1] == m.size
        # guard against rank-deficient marginal covariance before calling scipy
        if np.linalg.matrix_rank(S) == len(S[:, 0]):
            lp = scipy.stats.multivariate_normal.logpdf(x, m, S, allow_singular=True)
            # scipy returns a scalar for a single row; keep output 1-d
            lp = np.array([lp]) if x.shape[0] == 1 else lp
        else:
            raise ValueError('Rank deficiency in covariance matrix')
    res = lp if log else np.exp(lp)
    return res
|
Gaussian.gen(self, n_samples=1)
Method to generate samples
Parameters
----------
n_samples : int
Number of samples to generate
Returns
-------
n_samples x self.ndim
Source Code
@copy_ancestor_docstring
def gen(self, n_samples=1):
    """Draw samples as m + z.C with z standard normal, since S = C'C."""
    noise = self.rng.randn(n_samples, self.ndim)
    return np.dot(noise, self.C) + self.m
|
Gaussian.gen_newseed(self)
Generates a new random seed
Source Code
def gen_newseed(self):
    """Generate a fresh seed from the RNG, or None when unseeded."""
    if self.seed is None:
        return None
    return self.rng.randint(0, 2**31)
|
Gaussian.kl(self, other)
Calculates the KL divergence from this to another Gaussian
Direction of KL is KL(this | other)
Source Code
1
2
3
4
5
6
7
8
9
10
11
12
13
14
def kl(self, other):
    """KL(this || other) between two Gaussians, in nats."""
    assert isinstance(other, Gaussian)
    assert self.ndim == other.ndim
    # closed form: 0.5 * (tr(Pq Sp) + dm' Pq dm + log|Pp| - log|Pq| - k)
    trace_term = np.sum(other.P * self.S)
    dm = other.m - self.m
    quad_term = np.dot(dm, np.dot(other.P, dm))
    logdet_term = self.logdetP - other.logdetP
    return 0.5 * (trace_term + quad_term + logdet_term - self.ndim)
|
Gaussian.reseed(self, seed)
Reseeds the distribution's RNG
Source Code
def reseed(self, seed):
    """Reseed the distribution's RNG and remember the seed."""
    self.seed = seed
    self.rng.seed(seed=seed)
|
Gaussian.ztrans(self, mean, std)
Z-transform
Parameters
----------
mean : array
Mean vector
std : array
Std vector
Returns
-------
Gaussian distribution
Source Code
def ztrans(self, mean, std):
    """Z-transform: distribution of (x - mean) / std."""
    shifted_mean = (self.m - mean) / std
    scaled_cov = self.S / np.outer(std, std)
    return Gaussian(m=shifted_mean, S=scaled_cov, seed=self.seed)
|
Gaussian.ztrans_inv(self, mean, std)
Z-transform inverse
Parameters
----------
mean : array
Mean vector
std : array
Std vector
Returns
-------
Gaussian distribution
Source Code
def ztrans_inv(self, mean, std):
    """Inverse z-transform: distribution of std * x + mean."""
    shifted_mean = std * self.m + mean
    scaled_cov = np.outer(std, std) * self.S
    return Gaussian(m=shifted_mean, S=scaled_cov, seed=self.seed)
|
Logistic class
Logistic.__init__(self, mu=0.0, s=1.0, seed=None)
Distribution with independent dimensions and logistic marginals
Parameters
----------
mu : list, or np.array, 1d
Means
s : list, or np.array, 1d
Scale factors
seed : int or None
If provided, random number generator will be seeded
Source Code
1
2
3
4
5
6
7
8
9
10
11
def __init__(self, mu=0.0, s=1.0, seed=None):
    """Distribution with independent dimensions and logistic marginals.

    Parameters
    ----------
    mu : list, or np.array, 1d
        Means
    s : list, or np.array, 1d
        Scale factors
    seed : int or None
        If provided, random number generator will be seeded
    """
    mu, s = np.atleast_1d(mu), np.atleast_1d(s)
    if s.size == 1:
        # broadcast a scalar scale across all dimensions
        s = np.full(mu.size, s[0])
    params_ok = (s > 0).all() and np.isfinite(s).all() and np.isfinite(mu).all()
    params_ok = params_ok and np.isreal(s).all() and np.isreal(mu).all()
    assert params_ok, "bad params"
    assert s.ndim == 1 and mu.ndim == 1 and mu.size == s.size, "bad sizes"
    self.mu, self.s = mu, s
    super().__init__(ndim=mu.size, seed=seed)
|
Logistic.eval(self, x, ii=None, log=True)
Method to evaluate pdf
Parameters
----------
x : int or list or np.array
Rows are inputs to evaluate at
ii : list
A list of indices specifying which marginal to evaluate.
If None, the joint pdf is evaluated
log : bool, defaulting to True
If True, the log pdf is evaluated
Returns
-------
scalar
Source Code
1
2
3
4
5
6
7
8
9
10
11
12
13
@copy_ancestor_docstring
def eval(self, x, ii=None, log=True):
    """Evaluate (log) pdf; dimensions are independent logistic densities."""
    x = np.atleast_2d(x)
    assert x.shape[1] == self.ndim, "incorrect data dimension"
    if ii is None:
        ii = np.arange(self.ndim)
    z = (x - self.mu) / self.s
    # per-dimension log density: -z - log(s) - 2 log(1 + exp(-z))
    per_dim = -z - np.log(self.s) - 2.0 * np.log(1.0 + np.exp(-z))
    total = per_dim[:, ii].sum(axis=1)
    return total if log else np.exp(total)
|
Logistic.gen(self, n_samples=1)
Method to generate samples
Parameters
----------
n_samples : int
Number of samples to generate
Returns
-------
n_samples x self.ndim
Source Code
@copy_ancestor_docstring
def gen(self, n_samples=1):
    # BUG FIX: use the instance RNG (self.rng) instead of the global
    # np.random, so that seeding/reseeding is respected — consistent with
    # every other distribution's gen() in this module.
    u = self.rng.uniform(size=(n_samples, self.ndim))
    # inverse-CDF transform of the logistic distribution (logit of u)
    return self.mu + self.s * (np.log(u) - np.log(1 - u))
|
Logistic.gen_newseed(self)
Generates a new random seed
Source Code
def gen_newseed(self):
    """Generate a fresh seed from the RNG, or None when unseeded."""
    if self.seed is None:
        return None
    return self.rng.randint(0, 2**31)
|
Logistic.reseed(self, seed)
Reseeds the distribution's RNG
Source Code
def reseed(self, seed):
    """Reseed the distribution's RNG and remember the seed."""
    self.seed = seed
    self.rng.seed(seed=seed)
|
Poisson class
Poisson.__init__(self, mu=0.0, offset=0.0, seed=None)
Univariate (!) Poisson distribution
Parameters
----------
mu: shape parameter of the Poisson (Poisson rate)
offset: shift in the mean parameter, see scipy.stats.Poisson documentation.
seed : int or None
If provided, random number generator will be seeded
Source Code
def __init__(self, mu=0., offset=0., seed=None):
    """Univariate Poisson distribution.

    Parameters
    ----------
    mu : Poisson rate parameter
    offset : shift in the mean parameter (scipy.stats `loc`)
    seed : int or None
        If provided, random number generator will be seeded
    """
    super().__init__(ndim=1, seed=seed)
    mu = np.atleast_1d(mu)
    assert mu.ndim == 1, 'mu must be a 1-d array'
    assert offset >= 0, 'offset must not be negative'
    self.mu, self.offset = mu, offset
    # frozen scipy distribution backing eval() and gen()
    self._poisson = poisson(mu=mu, loc=offset)
|
Poisson.eval(self, x, ii=None, log=True)
Method to evaluate pdf
Parameters
----------
x : int or list or np.array
Rows are inputs to evaluate at
ii : list
A list of indices specifying which marginal to evaluate.
If None, the joint pdf is evaluated
log : bool, defaulting to True
If True, the log pdf is evaluated
Returns
-------
scalar
Source Code
1
2
3
4
5
6
7
8
9
10
11
12
@copy_ancestor_docstring
def eval(self, x, ii=None, log=True):
    """Evaluate the (log) pmf of the univariate Poisson."""
    assert ii is None, 'this is a univariate Poisson, ii must be None.'
    # x should have a second dim with length 1, not more
    x = np.atleast_2d(x)
    assert x.shape[1] == 1, 'x needs second dim'
    assert not x.ndim > 2, 'no more than 2 dims in x'
    batch = self._poisson.logpmf(x) if log else self._poisson.pmf(x)
    # flatten to (nbatch,)
    return batch.reshape(-1)
|
Poisson.gen(self, n_samples=1, seed=None)
Method to generate samples
Parameters
----------
n_samples : int
Number of samples to generate
Returns
-------
n_samples x self.ndim
Source Code
@copy_ancestor_docstring
def gen(self, n_samples=1, seed=None):
    """Draw (n_samples, ndim) Poisson variates using the instance RNG."""
    return self._poisson.rvs(random_state=self.rng,
                             size=(n_samples, self.ndim))
|
Poisson.gen_newseed(self)
Generates a new random seed
Source Code
def gen_newseed(self):
    """Generate a fresh seed from the RNG, or None when unseeded."""
    if self.seed is None:
        return None
    return self.rng.randint(0, 2**31)
|
Poisson.reseed(self, seed)
Reseeds the distribution's RNG
Source Code
def reseed(self, seed):
    """Reseed the distribution's RNG and remember the seed."""
    self.seed = seed
    self.rng.seed(seed=seed)
|
StudentsT class
StudentsT.__init__(self, m, S, dof, seed=None)
Student's T distribution
Parameters
----------
m : list or np.array, 1d
Mean
S : list or np.array, 1d
Covariance
dof : int
Degrees of freedom
seed : int or None
If provided, random number generator will be seeded
Source Code
1
2
3
4
5
6
7
8
9
10
11
12
13
def __init__(self, m, S, dof, seed=None):
    """Student's T distribution.

    Parameters
    ----------
    m : list or np.array, 1d
        Mean
    S : list or np.array, 2d
        Covariance
    dof : int
        Degrees of freedom
    seed : int or None
        If provided, random number generator will be seeded
    """
    assert dof > 0
    m = np.asarray(m)
    S = np.asarray(S)
    self.m = m
    self.dof = dof
    self.S = S
    self.P = np.linalg.inv(S)
    self.C = np.linalg.cholesky(S).T  # C is upper triangular here
    self.Pm = np.dot(self.P, m)
    # log det P = -log det S, via the Cholesky diagonal
    self.logdetP = -2.0 * np.sum(np.log(np.diagonal(self.C)))
    super().__init__(ndim=m.size, seed=seed)
|
StudentsT.eval(self, x, ii=None, log=True)
Method to evaluate pdf
Parameters
----------
x : int or list or np.array
Rows are inputs to evaluate at
ii : list
A list of indices specifying which marginal to evaluate.
If None, the joint pdf is evaluated
log : bool, defaulting to True
If True, the log pdf is evaluated
Returns
-------
scalar
Source Code
1
2
3
4
5
6
7
8
9
10
11
12
13
14
@copy_ancestor_docstring
def eval(self, x, ii=None, log=True):
    # marginals are not implemented for the Student's T
    if ii is not None:
        raise NotImplementedError
    xm = x - self.m
    lp = np.log(1 + np.sum(np.dot(xm, self.P) * xm, axis=1) / self.dof)
    lp *= -(self.dof + self.ndim) / 2.0
    # NUMERICAL FIX: use gammaln for the numerator as well — the original
    # np.log(scipy.special.gamma(z)) overflows to inf for moderate
    # dof + ndim (gamma(z) overflows float64 near z ~ 171), while the
    # denominator already used the stable gammaln.
    lp += scipy.special.gammaln((self.dof + self.ndim) / 2)
    lp -= scipy.special.gammaln(self.dof / 2) + \
        self.ndim / 2 * np.log(self.dof * np.pi) - 0.5 * self.logdetP
    res = lp if log else np.exp(lp)
    return res
|
StudentsT.gen(self, n_samples=1)
Method to generate samples
Parameters
----------
n_samples : int
Number of samples to generate
Returns
-------
n_samples x self.ndim
Source Code
@copy_ancestor_docstring
def gen(self, n_samples=1):
    """Sample via the scale-mixture form: normal draws over sqrt(chi2/dof)."""
    scale_sq = self.rng.chisquare(self.dof, n_samples) / self.dof
    normals = self.rng.multivariate_normal(np.zeros(self.ndim),
                                           self.S, (n_samples,))
    return self.m + normals / np.sqrt(scale_sq)[:, None]
|
StudentsT.gen_newseed(self)
Generates a new random seed
Source Code
def gen_newseed(self):
    """Generate a fresh seed from the RNG, or None when unseeded."""
    if self.seed is None:
        return None
    return self.rng.randint(0, 2**31)
|
StudentsT.reseed(self, seed)
Reseeds the distribution's RNG
Source Code
def reseed(self, seed):
    """Reseed the distribution's RNG and remember the seed."""
    self.seed = seed
    self.rng.seed(seed=seed)
|
Uniform.__init__(self, lower=0.0, upper=1.0, seed=None)
Uniform distribution
Parameters
----------
lower : list, or np.array, 1d
Lower bound(s)
upper : list, or np.array, 1d
Upper bound(s)
seed : int or None
If provided, random number generator will be seeded
Source Code
def __init__(self, lower=0., upper=1., seed=None):
    """Uniform distribution on the axis-aligned box [lower, upper].

    Parameters
    ----------
    lower : list, or np.array, 1d
        Lower bound(s)
    upper : list, or np.array, 1d
        Upper bound(s)
    seed : int or None
        If provided, random number generator will be seeded
    """
    lo, hi = np.atleast_1d(lower), np.atleast_1d(upper)
    assert lo.ndim == hi.ndim
    assert lo.ndim == 1
    self.lower, self.upper = lo, hi
    super().__init__(ndim=len(lo), seed=seed)
|
Uniform.eval(self, x, ii=None, log=True)
Method to evaluate pdf
Parameters
----------
x : int or list or np.array
Rows are inputs to evaluate at
ii : list
A list of indices specifying which marginal to evaluate.
If None, the joint pdf is evaluated
log : bool, defaulting to True
If True, the log pdf is evaluated
Returns
-------
scalar
Source Code
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
@copy_ancestor_docstring
def eval(self, x, ii=None, log=True):
    """Evaluate the (truncated) uniform density over the selected marginals."""
    if ii is None:
        ii = np.arange(self.ndim)
    else:
        ii = np.atleast_1d(ii)
    # allow a flat vector of samples for a single marginal
    x = x.reshape(-1, 1) if (x.ndim == 1 and ii.size == 1) else np.atleast_2d(x)
    assert x.ndim == 2 and ii.ndim == 1
    assert x.shape[1] == ii.size
    n_rows = x.shape[0]
    # constant density = 1 / box volume over the selected dimensions
    volume = np.prod(self.upper[ii] - self.lower[ii])
    density = (1.0 / volume) * np.ones((n_rows,))
    # zero the density outside the (open) box
    inside = (x > self.lower[ii]) & (x < self.upper[ii])
    density[np.prod(inside, axis=1) == 0] = 0
    if not log:
        return density
    if not inside.any():
        raise ValueError('log probability not defined outside of truncation')
    return np.log(density)
|
Uniform.gen(self, n_samples=1)
Method to generate samples
Parameters
----------
n_samples : int
Number of samples to generate
Returns
-------
n_samples x self.ndim
Source Code
@copy_ancestor_docstring
def gen(self, n_samples=1):
    """Draw uniform samples inside the box via affine-transformed rand()."""
    width = self.upper - self.lower
    return self.lower + self.rng.rand(n_samples, self.ndim) * width
|
Uniform.gen_newseed(self)
Generates a new random seed
Source Code
def gen_newseed(self):
    """Generate a fresh seed from the RNG, or None when unseeded."""
    if self.seed is None:
        return None
    return self.rng.randint(0, 2**31)
|
Uniform.reseed(self, seed)
Reseeds the distribution's RNG
Source Code
def reseed(self, seed):
    """Reseed the distribution's RNG and remember the seed."""
    self.seed = seed
    self.rng.seed(seed=seed)
|
MoG class
Abstract base class for mixture distributions
Distributions must at least implement abstract methods of this class.
Component distributions should be added to self.xs, which is a list
containing the distributions of individual components.
Parameters
----------
a : list or np.array, 1d
Mixing coefficients
ncomp : int
Number of components
ndim : int
Number of ndimensions of the component distributions
seed : int or None
If provided, random number generator will be seeded
MoG.__imul__(self, other)
Incrementally multiply with a single gaussian
Source Code
def __imul__(self, other):
    """In-place multiply with a single Gaussian."""
    assert isinstance(other, Gaussian)
    product = self * other
    self.a, self.xs = product.a, product.xs
    return product
|
MoG.__init__(self, a, ms=None, Ps=None, Us=None, Ss=None, xs=None, seed=None)
Mixture of Gaussians
Creates a MoG with a valid combination of parameters or an already given
list of Gaussian variables.
Parameters
----------
a : list or np.array, 1d
Mixing coefficients
ms : list, length n_components
Means
Ps : list, length n_components
Precisions
Us : list, length n_components
Precision factors such that U'U = P
Ss : list, length n_components
Covariances
xs : list, length n_components
List of gaussian variables
seed : int or None
If provided, random number generator will be seeded
Source Code
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
def __init__(
        self,
        a,
        ms=None,
        Ps=None,
        Us=None,
        Ss=None,
        xs=None,
        seed=None):
    """Mixture of Gaussians.

    Creates a MoG with a valid combination of parameters or an already
    given list of Gaussian variables.

    Parameters
    ----------
    a : list or np.array, 1d
        Mixing coefficients
    ms : list, length n_components
        Means
    Ps : list, length n_components
        Precisions
    Us : list, length n_components
        Precision factors such that U'U = P
    Ss : list, length n_components
        Covariances
    xs : list, length n_components
        List of gaussian variables
    seed : int or None
        If provided, random number generator will be seeded
    """
    # Python-2-style aliases so __div__ / __idiv__ keep working
    self.__div__ = self.__truediv__
    self.__idiv__ = self.__itruediv__
    if ms is not None:
        super().__init__(a=np.asarray(a), ncomp=len(ms),
                         ndim=np.asarray(ms[0]).ndim, seed=seed)
        # build one Gaussian per component; each component gets its own
        # seed derived from the master RNG
        if Ps is not None:
            self.xs = [Gaussian(m=m, P=P, seed=self.gen_newseed())
                       for m, P in zip(ms, Ps)]
        elif Us is not None:
            self.xs = [Gaussian(m=m, U=U, seed=self.gen_newseed())
                       for m, U in zip(ms, Us)]
        elif Ss is not None:
            self.xs = [Gaussian(m=m, S=S, seed=self.gen_newseed())
                       for m, S in zip(ms, Ss)]
        else:
            raise ValueError('Precision information missing')
    elif xs is not None:
        # components supplied directly as distribution objects
        super().__init__(a=np.asarray(a), ncomp=len(xs),
                         ndim=xs[0].ndim, seed=seed)
        self.xs = xs
    else:
        raise ValueError('Mean information missing')
|
MoG.__itruediv__(self, other)
Incrementally divide by a single gaussian
Source Code
def __itruediv__(self, other):
    """In-place divide by a single Gaussian (result may be improper)."""
    assert isinstance(other, Gaussian)
    quotient = self / other
    self.a, self.xs = quotient.a, quotient.xs
    return quotient
|
MoG.__mul__(self, other)
Multiply with a single gaussian
Source Code
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
def __mul__(self, other):
    """Multiply with a single Gaussian: components multiply, weights reweight."""
    assert isinstance(other, Gaussian)
    products = [x * other for x in self.xs]
    # log normalization constant of each component product
    log_consts = np.empty_like(self.a)
    for k, (comp, prod) in enumerate(zip(self.xs, products)):
        lc = comp.logdetP + other.logdetP - prod.logdetP
        lc -= np.dot(comp.m, np.dot(comp.P, comp.m)) + \
            np.dot(other.m, np.dot(other.P, other.m)) - \
            np.dot(prod.m, np.dot(prod.P, prod.m))
        log_consts[k] = 0.5 * lc
    # renormalize the mixing weights in log space
    log_a = np.log(self.a) + log_consts
    log_a -= scipy.special.logsumexp(log_a)
    return MoG(a=np.exp(log_a), xs=products, seed=self.seed)
|
MoG.__truediv__(self, other)
Divide by a single gaussian
Source Code
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
def __truediv__(self, other):
    """Divide by a single Gaussian (result may be improper)."""
    assert isinstance(other, Gaussian)
    quotients = [x / other for x in self.xs]
    # log normalization constant of each component quotient
    log_consts = np.empty_like(self.a)
    for k, (comp, quot) in enumerate(zip(self.xs, quotients)):
        lc = comp.logdetP - other.logdetP - quot.logdetP
        lc -= np.dot(comp.m, np.dot(comp.P, comp.m)) - \
            np.dot(other.m, np.dot(other.P, other.m)) - \
            np.dot(quot.m, np.dot(quot.P, quot.m))
        log_consts[k] = 0.5 * lc
    # renormalize the mixing weights in log space
    log_a = np.log(self.a) + log_consts
    log_a -= scipy.special.logsumexp(log_a)
    return MoG(a=np.exp(log_a), xs=quotients, seed=self.seed)
|
MoG.calc_mean_and_cov(self)
Calculate the mean vector and the covariance matrix of the MoG
Source Code
def calc_mean_and_cov(self):
    """Mixture mean and covariance from the component moments."""
    component_means = [x.m for x in self.xs]
    m = np.dot(self.a, np.array(component_means))
    # per-component second moments E[xx'], combined via total covariance
    second_moments = [x.S + np.outer(mu, mu)
                      for x, mu in zip(self.xs, component_means)]
    weighted = np.array([w * msq for w, msq in zip(self.a, second_moments)])
    S = np.sum(weighted, axis=0) - np.outer(m, m)
    return m, S
|
MoG.convert_to_E(self, beta=0.99)
Convert to Mixture of ellipsoidal distributions
Source Code
def convert_to_E(self, beta=0.99):
    """Convert to a mixture of ellipsoidal distributions."""
    return MoE(self.a, xs=self.xs, beta=beta, seed=self.seed)
|
MoG.convert_to_T(self, dofs)
Convert to Mixture of Student's T distributions
Parameters
----------
dofs : int or list of ints
Degrees of freedom of component distributions
Source Code
def convert_to_T(self, dofs):
    """Convert to a mixture of Student's T distributions.

    Parameters
    ----------
    dofs : int or list of ints
        Degrees of freedom of component distributions
    """
    # idiom fix: isinstance instead of type() == int, so int subclasses
    # (e.g. np.int subclassing int, IntEnum) are broadcast correctly too
    if isinstance(dofs, int):
        dofs = [dofs for i in range(len(self.xs))]
    ys = [x.convert_to_T(dof) for x, dof in zip(self.xs, dofs)]
    return MoT(self.a, xs=ys, seed=self.seed)
|
MoG.eval(self, x, ii=None, log=True)
Method to evaluate pdf
Parameters
----------
x : int or list or np.array
Rows are inputs to evaluate at
ii : list
A list of indices specifying which marginal to evaluate.
If None, the joint pdf is evaluated
log : bool, defaulting to True
If True, the log pdf is evaluated
Returns
-------
scalar
Source Code
1
2
3
4
5
6
7
8
9
10
11
12
@copy_ancestor_docstring
def eval(self, x, ii=None, log=True):
    """Evaluate the mixture (log) pdf as a weighted sum over components."""
    comp = np.array([c.eval(x, ii, log) for c in self.xs]).T
    if log:
        # log-sum-exp of component log densities plus log weights
        return scipy.special.logsumexp(comp + np.log(self.a), axis=1)
    return np.dot(comp, self.a)
|
MoG.gen(self, n_samples=1)
Method to generate samples
Parameters
----------
n_samples : int
Number of samples to generate
Returns
-------
n_samples x self.ndim
Source Code
@copy_ancestor_docstring
def gen(self, n_samples=1):
    """Sample: draw component assignments, sample each component, shuffle."""
    comp_idx = self.gen_comp(n_samples)  # n_samples,
    counts = [np.sum((comp_idx == k).astype(int))
              for k in range(self.n_components)]
    per_component = [x.gen(n) for x, n in zip(self.xs, counts)]
    samples = np.concatenate(per_component, axis=0)
    self.rng.shuffle(samples)
    return samples
|
MoG.gen_comp(self, n_samples)
Generate component index according to self.a
Source Code
def gen_comp(self, n_samples):
    """Draw n_samples component indices according to the mixing weights."""
    draws = self.discrete_sample.gen(n_samples)
    return draws.reshape(-1)  # n_samples,
|
MoG.gen_newseed(self)
Generates a new random seed
Source Code
def gen_newseed(self):
    """Generate a fresh seed from the RNG, or None when unseeded."""
    if self.seed is None:
        return None
    return self.rng.randint(0, 2**31)
|
MoG.kl(self, other, n_samples=10000)
Estimates the KL from this to another PDF
KL(this | other), using Monte Carlo
Source Code
def kl(self, other, n_samples=10000):
    """Monte-Carlo estimate of KL(this || other); returns (estimate, stderr)."""
    samples = self.gen(n_samples)
    log_p = self.eval(samples, log=True)
    log_q = other.eval(samples, log=True)
    diffs = log_p - log_q
    return np.mean(diffs), np.std(diffs, ddof=1) / np.sqrt(n_samples)
|
MoG.project_to_gaussian(self)
Returns a gaussian with the same mean and precision as the mog
Source Code
def project_to_gaussian(self):
    """Moment-match the mixture with a single Gaussian."""
    mean, cov = self.calc_mean_and_cov()
    return Gaussian(m=mean, S=cov, seed=self.seed)
|
MoG.prune_negligible_components(self, threshold)
Prune components
Removes all the components whose mixing coefficient is less
than a threshold.
Source Code
def prune_negligible_components(self, threshold):
    """Remove components lighter than threshold, redistributing their mass."""
    drop = np.nonzero((self.a < threshold).astype(int))[0]
    removed_mass = np.sum(self.a[drop])
    self.ncomp -= drop.size
    self.a = np.delete(self.a, drop)
    # spread the removed probability mass evenly over the survivors
    self.a += removed_mass / self.n_components
    self.xs = [x for i, x in enumerate(self.xs) if i not in drop]
|
MoG.reseed(self, seed)
Reseeds the following RNGs in the following order:
1) Master RNG for the mixture object, using the input seed
2) RNG for the discrete distribution used to sample components. The seed
is generated using the master RNG.
3) RNG for each mixture component, in order. Each seed is generated by
the master RNG.
Source Code
def reseed(self, seed):
    """Reseed master RNG first, then the component sampler, then each component."""
    self.rng.seed(seed=seed)
    self.seed = seed
    self.discrete_sample.reseed(seed=self.gen_newseed())
    for component in self.xs:
        component.reseed(seed=self.gen_newseed())
|
MoG.ztrans(self, mean, std)
Source Code
def ztrans(self, mean, std):
    """Component-wise z-transform of the mixture."""
    transformed = [x.ztrans(mean, std) for x in self.xs]
    return MoG(self.a, xs=transformed, seed=self.seed)
|
MoG.ztrans_inv(self, mean, std)
Source Code
def ztrans_inv(self, mean, std):
    """Component-wise inverse z-transform of the mixture."""
    transformed = [x.ztrans_inv(mean, std) for x in self.xs]
    return MoG(self.a, xs=transformed, seed=self.seed)
|
MoT class
Abstract base class for mixture distributions
Distributions must at least implement abstract methods of this class.
Component distributions should be added to self.xs, which is a list
containing the distributions of individual components.
Parameters
----------
a : list or np.array, 1d
Mixing coefficients
ncomp : int
Number of components
ndim : int
Number of ndimensions of the component distributions
seed : int or None
If provided, random number generator will be seeded
MoT.__init__(self, a, ms=None, Ss=None, dofs=None, xs=None, seed=None)
Mixture of Student's T distributions
Creates a MoT with a valid combination of parameters or an already given
list of gaussian variables.
Parameters
----------
a : list or 1d array
Mixing coefficients
ms : list of length n_components
Means
Ss : list of length n_components
Covariances
dofs: list of length n_components
Degrees of freedom
xs : list of length n_components
List of Student's T distributions
seed : int or None
If provided, random number generator will be seeded
Source Code
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
def __init__(self, a, ms=None, Ss=None, dofs=None, xs=None, seed=None):
    """Mixture of Student's T distributions.

    Creates a MoT with a valid combination of parameters or an already
    given list of Student's T variables.

    Parameters
    ----------
    a : list or 1d array
        Mixing coefficients
    ms : list of length n_components
        Means
    Ss : list of length n_components
        Covariances
    dofs : list of length n_components
        Degrees of freedom
    xs : list of length n_components
        List of Student's T distributions
    seed : int or None
        If provided, random number generator will be seeded
    """
    if ms is not None:
        super().__init__(a=np.asarray(a), ncomp=len(ms),
                         ndim=np.asarray(ms[0]).ndim, seed=seed)
        # one StudentsT per component, each with its own derived seed
        self.xs = [StudentsT(m=m, S=S, dof=dof, seed=self.gen_newseed())
                   for m, S, dof in zip(ms, Ss, dofs)]
    elif xs is not None:
        # components supplied directly as distribution objects
        super().__init__(a=np.asarray(a), ncomp=len(xs),
                         ndim=xs[0].ndim, seed=seed)
        self.xs = xs
    else:
        raise ValueError('Mean information missing')
|
MoT.eval(self, x, ii=None, log=True)
Method to evaluate pdf
Parameters
----------
x : int or list or np.array
Rows are inputs to evaluate at
ii : list
A list of indices specifying which marginal to evaluate.
If None, the joint pdf is evaluated
log : bool, defaulting to True
If True, the log pdf is evaluated
Returns
-------
scalar
Source Code
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@copy_ancestor_docstring
def eval(self, x, ii=None, log=True):
    """Evaluate the mixture (log) pdf as a weighted sum over components."""
    if ii is not None:
        raise NotImplementedError
    comp = np.array([c.eval(x, ii=None, log=log) for c in self.xs]).T
    if log:
        # log-sum-exp of component log densities plus log weights
        return scipy.special.logsumexp(comp + np.log(self.a), axis=1)
    return np.dot(comp, self.a)
|
MoT.gen(self, n_samples=1)
Method to generate samples
Parameters
----------
n_samples : int
Number of samples to generate
Returns
-------
n_samples x self.ndim
Source Code
@copy_ancestor_docstring
def gen(self, n_samples=1):
    """Sample: draw component assignments, sample each component, shuffle."""
    comp_idx = self.gen_comp(n_samples)  # n_samples,
    counts = [np.sum((comp_idx == k).astype(int))
              for k in range(self.n_components)]
    per_component = [x.gen(n) for x, n in zip(self.xs, counts)]
    samples = np.concatenate(per_component, axis=0)
    self.rng.shuffle(samples)
    return samples
|
MoT.gen_comp(self, n_samples)
Generate component index according to self.a
Source Code
def gen_comp(self, n_samples):
    """Draw n_samples component indices according to the mixing weights."""
    draws = self.discrete_sample.gen(n_samples)
    return draws.reshape(-1)  # n_samples,
|
MoT.gen_newseed(self)
Generates a new random seed
Source Code
def gen_newseed(self):
    """Generate a fresh seed from the RNG, or None when unseeded."""
    if self.seed is None:
        return None
    return self.rng.randint(0, 2**31)
|
MoT.kl(self, other, n_samples=10000)
Estimates the KL from this to another PDF
KL(this | other), using Monte Carlo
Source Code
def kl(self, other, n_samples=10000):
    """Monte-Carlo estimate of KL(this || other); returns (estimate, stderr)."""
    samples = self.gen(n_samples)
    log_p = self.eval(samples, log=True)
    log_q = other.eval(samples, log=True)
    diffs = log_p - log_q
    return np.mean(diffs), np.std(diffs, ddof=1) / np.sqrt(n_samples)
|
MoT.prune_negligible_components(self, threshold)
Prune components
Removes all the components whose mixing coefficient is less
than a threshold.
Source Code
def prune_negligible_components(self, threshold):
    """Remove components lighter than threshold, redistributing their mass."""
    drop = np.nonzero((self.a < threshold).astype(int))[0]
    removed_mass = np.sum(self.a[drop])
    self.ncomp -= drop.size
    self.a = np.delete(self.a, drop)
    # spread the removed probability mass evenly over the survivors
    self.a += removed_mass / self.n_components
    self.xs = [x for i, x in enumerate(self.xs) if i not in drop]
|
MoT.reseed(self, seed)
Reseeds the following RNGs in the following order:
1) Master RNG for the mixture object, using the input seed
2) RNG for the discrete distribution used to sample components. The seed
is generated using the master RNG.
3) RNG for each mixture component, in order. Each seed is generated by
the master RNG.
Source Code
def reseed(self, seed):
    """Reseed master RNG first, then the component sampler, then each component."""
    self.rng.seed(seed=seed)
    self.seed = seed
    self.discrete_sample.reseed(seed=self.gen_newseed())
    for component in self.xs:
        component.reseed(seed=self.gen_newseed())
|