Distributions

Discrete class

Discrete.__init__(self, p, seed=None)

Discrete distribution

Parameters
----------
p : list or np.array, 1d
    Probabilities of elements, must sum to 1
seed : int or None
    If provided, random number generator will be seeded
Source Code
def __init__(self, p, seed=None):

    super().__init__(ndim=1, seed=seed)

    p = np.asarray(p)
    assert p.ndim == 1, 'p must be a 1-d array'
    assert np.isclose(np.sum(p), 1), 'p must sum to 1'
    self.p = p
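
Example (a minimal sketch; it assumes Discrete has been imported from this package's distribution module):

import numpy as np

d = Discrete(p=[0.2, 0.3, 0.5], seed=42)
samples = d.gen(n_samples=1000)   # shape (1000, 1), values in {0, 1, 2}
freqs = np.bincount(samples.ravel(), minlength=3) / 1000.0
print(freqs)                      # close to [0.2, 0.3, 0.5]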

Discrete.eval(self, x, ii=None, log=True)

Method to evaluate pdf

Parameters
----------
x : int or list or np.array
    Rows are inputs to evaluate at
ii : list
    A list of indices specifying which marginal to evaluate.
    If None, the joint pdf is evaluated
log : bool, defaulting to True
    If True, the log pdf is evaluated

Returns
-------
scalar
Source Code
@copy_ancestor_docstring
def eval(self, x, ii=None, log=True):
    raise NotImplementedError("To be implemented")

Discrete.gen(self, n_samples=1, seed=None)

Method to generate samples

Parameters
----------
n_samples : int
    Number of samples to generate

Returns
-------
n_samples x self.ndim
Source Code
@copy_ancestor_docstring
def gen(self, n_samples=1, seed=None):
    # See BaseDistribution.py for docstring
    c = np.cumsum(self.p[:-1])[np.newaxis, :]  # cdf
    r = self.rng.rand(n_samples, 1)
    return np.sum((r > c).astype(int), axis=1).reshape(-1, 1)
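
gen draws categorical samples by inverse-CDF sampling: each uniform draw r is compared against the cumulative probabilities, and the number of thresholds it exceeds is the sampled index. A standalone NumPy sketch of the same idea:

import numpy as np

p = np.array([0.2, 0.3, 0.5])
c = np.cumsum(p[:-1])[np.newaxis, :]        # interior CDF values: [0.2, 0.5]
r = np.random.rand(4, 1)                    # one uniform draw per sample
idx = np.sum((r > c).astype(int), axis=1)   # count of thresholds exceeded = category index
print(idx)                                  # e.g. [2 0 1 2]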

Discrete.gen_newseed(self)

Generates a new random seed
Source Code
def gen_newseed(self):

    if self.seed is None:
        return None
    else:
        return self.rng.randint(0, 2**31)

Discrete.reseed(self, seed)

Reseeds the distribution's RNG
Source Code
def reseed(self, seed):

    self.rng.seed(seed=seed)
    self.seed = seed

Gamma class

Gamma.__init__(self, alpha=1.0, beta=1.0, offset=0.0, seed=None)

Univariate (!) Gamma distribution

Parameters
----------
alpha : list, or np.array, 1d
    Shape parameters
beta : list, or np.array, 1d
    Inverse scale parameters
seed : int or None
    If provided, random number generator will be seeded
Source Code
def __init__(self, alpha=1., beta=1., offset=0., seed=None):

    super().__init__(ndim=1, seed=seed)

    alpha, beta = np.atleast_1d(alpha), np.atleast_1d(beta)
    assert alpha.ndim == 1, 'alpha must be a 1-d array'
    assert alpha.size == beta.size, 'alpha and beta must match in size'
    assert np.all(alpha > 0.), 'Should be greater than zero.'
    assert np.all(beta > 0.), 'Should be greater than zero.'
    self.alpha = alpha
    self.beta = beta
    self.offset = offset
    self._gamma = gamma(a=alpha, scale=1./beta)
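
Example (a minimal sketch; it assumes Gamma has been imported from this package's distribution module):

g = Gamma(alpha=2.0, beta=0.5, offset=1.0, seed=0)
x = g.gen(n_samples=5)       # shape (5, 1), samples lie above the offset
logp = g.eval(x, log=True)   # shape (5,), log pdf at the sampled points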

Gamma.eval(self, x, ii=None, log=True)

Method to evaluate pdf

Parameters
----------
x : int or list or np.array
    Rows are inputs to evaluate at
ii : list
    A list of indices specifying which marginal to evaluate.
    If None, the joint pdf is evaluated
log : bool, defaulting to True
    If True, the log pdf is evaluated

Returns
-------
scalar
Source Code
@copy_ancestor_docstring
def eval(self, x, ii=None, log=True):
    # univariate distribution only, i.e. ii=[0] in any case

    # x should have a second dim with length 1, not more
    x = np.atleast_2d(x)
    assert x.shape[1] == 1, 'x needs second dim'
    assert not x.ndim > 2, 'no more than 2 dims in x'

    res = self._gamma.logpdf(x-self.offset) if log else self._gamma.pdf(x-self.offset)
    # reshape to (nbatch, )
    return res.reshape(-1)

Gamma.gen(self, n_samples=1, seed=None)

Method to generate samples

Parameters
----------
n_samples : int
    Number of samples to generate

Returns
-------
n_samples x self.ndim
Source Code
@copy_ancestor_docstring
def gen(self, n_samples=1, seed=None):
    # See BaseDistribution.py for docstring

    x = self.rng.gamma(shape=self.alpha,
                       scale=1./self.beta,
                       size=(n_samples, self.ndim)) + self.offset
    return x

Gamma.gen_newseed(self)

Generates a new random seed
Source Code
def gen_newseed(self):

    if self.seed is None:
        return None
    else:
        return self.rng.randint(0, 2**31)

Gamma.reseed(self, seed)

Reseeds the distribution's RNG
Source Code
def reseed(self, seed):

    self.rng.seed(seed=seed)
    self.seed = seed

Gaussian class

Gaussian.__imul__(self, other)

Incrementally multiply with another Gaussian
Source Code
def __imul__(self, other):

    assert isinstance(other, Gaussian)

    res = self * other

    self.m = res.m
    self.P = res.P
    self.C = res.C
    self.S = res.S
    self.Pm = res.Pm
    self.logdetP = res.logdetP

    return res

Gaussian.__init__(self, m=None, P=None, U=None, S=None, Pm=None, seed=None)

Gaussian distribution

Initialize a gaussian pdf given a valid combination of its parameters.
Valid combinations are: m-P, m-U, m-S, Pm-P, Pm-U, Pm-S

Focus is on efficient multiplication, division and sampling.

Parameters
----------
m : list or np.array, 1d
    Mean
P : list or np.array, 2d
    Precision
U : list or np.array, 2d
    Upper triangular precision factor such that U'U = P
S : list or np.array, 2d
    Covariance
C : list or np.array, 2d
    Upper or lower triangular covariance factor, in any case S = C'C
Pm : list or np.array, 1d
    Precision times mean such that P*m = Pm
seed : int or None
    If provided, random number generator will be seeded
Source Code
def __init__(self, m=None, P=None, U=None, S=None, Pm=None, seed=None):

    assert m is None or np.asarray(m).ndim == 1
    assert P is None or np.asarray(P).ndim == 2
    assert U is None or np.asarray(U).ndim == 2
    assert S is None or np.asarray(S).ndim == 2
    assert Pm is None or np.asarray(Pm).ndim == 1

    self.__div__ = self.__truediv__
    self.__idiv__ = self.__itruediv__

    if m is not None:
        m = np.asarray(m)
        self.m = m
        ndim = self.m.size

        if P is not None:
            P = np.asarray(P)
            L = np.linalg.cholesky(P)  # P=LL' (lower triag)
            self.P = P
            self.C = np.linalg.inv(L)  # C is lower triangular here
            # S = C'C = L^{-1}^T L^{-1} = (LL^T)^{-1}
            self.S = np.dot(self.C.T, self.C)
            self.Pm = np.dot(P, m)
            self.logdetP = 2.0 * np.sum(np.log(np.diagonal(L)))

        elif U is not None:
            U = np.asarray(U)
            self.P = np.dot(U.T, U)
            self.C = np.linalg.inv(U.T)  # C is lower triangular here
            self.S = np.dot(self.C.T, self.C)
            self.Pm = np.dot(self.P, m)
            self.logdetP = 2.0 * np.sum(np.log(np.diagonal(U)))

        elif S is not None:
            S = np.asarray(S)
            self.P = np.linalg.inv(S)
            self.C = np.linalg.cholesky(S).T  # C is upper triangular here
            self.S = S
            self.Pm = np.dot(self.P, m)
            self.logdetP = -2.0 * np.sum(np.log(np.diagonal(self.C)))

        else:
            raise ValueError('Precision information missing')

    elif Pm is not None:
        Pm = np.asarray(Pm)
        self.Pm = Pm
        ndim = self.Pm.size

        if P is not None:
            P = np.asarray(P)
            L = np.linalg.cholesky(P)
            # L = np.linalg.cholesky(P + 0.001*np.identity(P.shape[0]))
            self.P = P
            self.C = np.linalg.inv(L)
            self.S = np.dot(self.C.T, self.C)
            self.m = np.linalg.solve(P, Pm)
            self.logdetP = 2.0 * np.sum(np.log(np.diagonal(L)))

        elif U is not None:
            U = np.asarray(U)
            self.P = np.dot(U.T, U)
            self.C = np.linalg.inv(U.T)
            self.S = np.dot(self.C.T, self.C)
            self.m = np.linalg.solve(self.P, Pm)
            self.logdetP = 2.0 * np.sum(np.log(np.diagonal(U)))

        elif S is not None:
            S = np.asarray(S)
            self.P = np.linalg.inv(S)
            self.C = np.linalg.cholesky(S).T
            self.S = S
            self.m = np.dot(S, Pm)
            self.logdetP = -2.0 * np.sum(np.log(np.diagonal(self.C)))

        else:
            raise ValueError('Precision information missing')

    else:
        raise ValueError('Mean information missing')

    super().__init__(ndim, seed=seed)
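
Whatever combination is supplied, the constructor fills in m, P, C, S, Pm and logdetP so that later operations are cheap. A minimal sketch of two equivalent ways to build the same 2-d Gaussian (assuming Gaussian has been imported from this package's distribution module):

import numpy as np

m = np.array([0.0, 1.0])
S = np.array([[2.0, 0.3],
              [0.3, 1.0]])
P = np.linalg.inv(S)

g1 = Gaussian(m=m, S=S)       # mean and covariance
g2 = Gaussian(Pm=P @ m, P=P)  # natural (information-form) parameters
# both objects represent the same density; evaluations agree up to floating-point error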

Gaussian.__ipow__(self, power)

Incrementally raise gaussian to a power
Source Code
def __ipow__(self, power):

    res = self ** power

    self.m = res.m
    self.P = res.P
    self.C = res.C
    self.S = res.S
    self.Pm = res.Pm
    self.logdetP = res.logdetP

    return res

Gaussian.__itruediv__(self, other)

Incrementally divide by another Gaussian

Note that the resulting Gaussian might be improper.
Source Code
def __itruediv__(self, other):

    assert isinstance(other, Gaussian)

    res = self / other

    self.m = res.m
    self.P = res.P
    self.C = res.C
    self.S = res.S
    self.Pm = res.Pm
    self.logdetP = res.logdetP

    return res

Gaussian.__mul__(self, other)

Multiply with another Gaussian
Source Code
def __mul__(self, other):

    assert isinstance(other, Gaussian)

    P = self.P + other.P
    Pm = self.Pm + other.Pm

    return Gaussian(P=P, Pm=Pm, seed=self.seed)
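
Multiplication is done in information form: the precisions and the precision-weighted means simply add, P = P1 + P2 and Pm = P1 m1 + P2 m2, which is why the class keeps P and Pm precomputed. A sketch, reusing g1 and g2 from the constructor example above:

post = g1 * g2  # unnormalized product of the two densities
# equivalently, by hand:
post_manual = Gaussian(P=g1.P + g2.P, Pm=g1.Pm + g2.Pm)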

Gaussian.__pow__(self, power, modulo=None)

Raise Gaussian to a power and get another Gaussian
Source Code
def __pow__(self, power, modulo=None):

    P = power * self.P
    Pm = power * self.Pm

    return Gaussian(P=P, Pm=Pm, seed=self.seed)

Gaussian.__truediv__(self, other)

Divide by another Gaussian

Note that the resulting Gaussian might be improper.
Source Code
def __truediv__(self, other):

    assert isinstance(other, Gaussian)

    P = self.P - other.P
    Pm = self.Pm - other.Pm

    return Gaussian(P=P, Pm=Pm, seed=self.seed)

Gaussian.convert_to_T(self, dof)

Converts Gaussian to Student T

Parameters
----------
dof : int
    Degrees of freedom
Source Code
def convert_to_T(self, dof):

    return StudentsT(self.m, self.S, dof, seed=self.seed)

Gaussian.eval(self, x, ii=None, log=True)

Method to evaluate pdf

Parameters
----------
x : int or list or np.array
    Rows are inputs to evaluate at
ii : list
    A list of indices specifying which marginal to evaluate.
    If None, the joint pdf is evaluated
log : bool, defaulting to True
    If True, the log pdf is evaluated

Returns
-------
scalar
Source Code
@copy_ancestor_docstring
def eval(self, x, ii=None, log=True):
    # See BaseDistribution.py for docstring
    x = np.atleast_2d(x)

    if ii is None:
        assert x.shape[1] == self.ndim, "incorrect data dimension"
        xm = x - self.m
        lp = -np.sum(np.dot(xm, self.P) * xm, axis=1)
        lp += self.logdetP - self.ndim * np.log(2.0 * np.pi)
        lp *= 0.5

    else:
        ii = np.atleast_1d(ii)
        m = self.m[ii]
        S = self.S[ii][:, ii]
        if m.size == 1:  # single marginal
            x = x.reshape(-1, m.size)
        assert x.shape[1] == m.size
        if np.linalg.matrix_rank(S) == S.shape[0]:  # S is full rank
            lp = scipy.stats.multivariate_normal.logpdf(x, m, S, allow_singular=True)
            lp = np.array([lp]) if x.shape[0] == 1 else lp
        else:
            raise ValueError('Rank deficiency in covariance matrix')

    res = lp if log else np.exp(lp)
    return res
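
When ii is given, the corresponding sub-mean and sub-covariance are sliced out and the marginal is evaluated with scipy.stats.multivariate_normal. A sketch (assuming Gaussian has been imported from this package's distribution module):

import numpy as np

g = Gaussian(m=[0.0, 1.0], S=[[2.0, 0.3], [0.3, 1.0]])
x = g.gen(n_samples=5)              # shape (5, 2)
lp_joint = g.eval(x, log=True)      # joint log pdf, shape (5,)
lp_first = g.eval(x[:, 0], ii=[0])  # log pdf of the first marginal only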

Gaussian.gen(self, n_samples=1)

Method to generate samples

Parameters
----------
n_samples : int
    Number of samples to generate

Returns
-------
n_samples x self.ndim
Source Code
@copy_ancestor_docstring
def gen(self, n_samples=1):
    # See BaseDistribution.py for docstring
    z = self.rng.randn(n_samples, self.ndim)
    samples = np.dot(z, self.C) + self.m
    return samples

Gaussian.gen_newseed(self)

Generates a new random seed
Source Code
def gen_newseed(self):

    if self.seed is None:
        return None
    else:
        return self.rng.randint(0, 2**31)

Gaussian.kl(self, other)

Calculates the KL divergence from this to another Gaussian

Direction of KL is KL(this | other)
Source Code
def kl(self, other):

    assert isinstance(other, Gaussian)
    assert self.ndim == other.ndim

    t1 = np.sum(other.P * self.S)

    m = other.m - self.m
    t2 = np.dot(m, np.dot(other.P, m))

    t3 = self.logdetP - other.logdetP

    t = 0.5 * (t1 + t2 + t3 - self.ndim)

    return t
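
This is the closed-form Gaussian KL divergence, KL(p || q) = 0.5 * (tr(P_q S_p) + (m_q - m_p)' P_q (m_q - m_p) + logdet P_p - logdet P_q - d). A quick 1-d sanity check against the textbook formula (a sketch, assuming Gaussian has been imported):

import numpy as np

p = Gaussian(m=[0.0], S=[[1.0]])
q = Gaussian(m=[1.0], S=[[2.0]])

# 1-d formula: log(s_q / s_p) + (s_p**2 + (m_p - m_q)**2) / (2 * s_q**2) - 0.5
expected = np.log(np.sqrt(2.0)) + (1.0 + 1.0) / (2.0 * 2.0) - 0.5
assert np.isclose(p.kl(q), expected)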

Gaussian.reseed(self, seed)

Reseeds the distribution's RNG
Source Code
def reseed(self, seed):

    self.rng.seed(seed=seed)
    self.seed = seed

Gaussian.ztrans(self, mean, std)

Z-transform

Parameters
----------
mean : array
    Mean vector
std : array
    Std vector

Returns
-------
Gaussian distribution
Source Code
def ztrans(self, mean, std):

    m = (self.m - mean) / std
    S = self.S / np.outer(std, std)
    return Gaussian(m=m, S=S, seed=self.seed)

Gaussian.ztrans_inv(self, mean, std)

Z-transform inverse

Parameters
----------
mean : array
    Mean vector
std : array
    Std vector

Returns
-------
Gaussian distribution
Source Code
def ztrans_inv(self, mean, std):

    m = std * self.m + mean
    S = np.outer(std, std) * self.S
    return Gaussian(m=m, S=S, seed=self.seed)
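
ztrans standardizes the Gaussian coordinate-wise ((x - mean) / std) and ztrans_inv undoes it, so chaining the two recovers the original parameters. A sketch (assuming Gaussian has been imported):

import numpy as np

g = Gaussian(m=[1.0, -2.0], S=[[1.0, 0.2], [0.2, 4.0]])
mean, std = np.array([0.5, -1.0]), np.array([1.0, 2.0])

z = g.ztrans(mean, std)         # standardized copy
back = z.ztrans_inv(mean, std)  # round trip
assert np.allclose(back.m, g.m) and np.allclose(back.S, g.S)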

Logistic class

Logistic.__init__(self, mu=0.0, s=1.0, seed=None)

Distribution with independent dimensions and logistic marginals

Parameters
----------
mu : list, or np.array, 1d
    Means
s : list, or np.array, 1d
    Scale factors
seed : int or None
    If provided, random number generator will be seeded
Source Code
def __init__(self, mu=0.0, s=1.0, seed=None):

    mu, s = np.atleast_1d(mu), np.atleast_1d(s)
    if s.size == 1:
        s = np.full(mu.size, s[0])

    assert (s > 0).all() and np.isfinite(s).all() and np.isfinite(mu).all() and np.isreal(s).all() and \
           np.isreal(mu).all(), "bad params"
    assert s.ndim == 1 and mu.ndim == 1 and mu.size == s.size, "bad sizes"
    self.mu, self.s = mu, s

    super().__init__(ndim=mu.size, seed=seed)

Logistic.eval(self, x, ii=None, log=True)

Method to evaluate pdf

Parameters
----------
x : int or list or np.array
    Rows are inputs to evaluate at
ii : list
    A list of indices specifying which marginal to evaluate.
    If None, the joint pdf is evaluated
log : bool, defaulting to True
    If True, the log pdf is evaluated

Returns
-------
scalar
Source Code
@copy_ancestor_docstring
def eval(self, x, ii=None, log=True):
    # See BaseDistribution.py for docstring
    x = np.atleast_2d(x)
    assert x.shape[1] == self.ndim, "incorrect data dimension"

    if ii is None:
        ii = np.arange(self.ndim)

    z = (x - self.mu) / self.s
    logp_eachdim = -z - np.log(self.s) - 2.0 * np.log(1.0 + np.exp(-z))
    logp = logp_eachdim[:, ii].sum(axis=1)

    return logp if log else np.exp(logp)

Logistic.gen(self, n_samples=1)

Method to generate samples

Parameters
----------
n_samples : int
    Number of samples to generate

Returns
-------
n_samples x self.ndim
Source Code
@copy_ancestor_docstring
def gen(self, n_samples=1):
    # See BaseDistribution.py for docstring
    u = np.random.uniform(size=(n_samples, self.ndim))  # note: uses the global NumPy RNG rather than the seeded self.rng
    return self.mu + self.s * (np.log(u) - np.log(1 - u))

Logistic.gen_newseed(self)

Generates a new random seed
Source Code
def gen_newseed(self):

    if self.seed is None:
        return None
    else:
        return self.rng.randint(0, 2**31)

Logistic.reseed(self, seed)

Reseeds the distribution's RNG
Source Code
def reseed(self, seed):

    self.rng.seed(seed=seed)
    self.seed = seed

Poisson class

Poisson.__init__(self, mu=0.0, offset=0.0, seed=None)

Univariate (!) Poisson distribution

Parameters
----------
mu : float or np.array, 1d
    Shape parameter of the Poisson (Poisson rate)
offset : float
    Shift in the mean parameter, see the scipy.stats.poisson documentation
seed : int or None
    If provided, random number generator will be seeded
Source Code
def __init__(self, mu=0., offset=0., seed=None):

    super().__init__(ndim=1, seed=seed)

    mu = np.atleast_1d(mu)
    assert mu.ndim == 1, 'mu must be a 1-d array'
    assert offset >= 0, 'offset must not be negative'

    self.mu = mu
    self.offset = offset
    self._poisson = poisson(mu=mu, loc=offset)
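
Example (a minimal sketch; it assumes Poisson has been imported from this package's distribution module). eval works on the pmf, so x should contain integer counts:

po = Poisson(mu=3.0, seed=1)
x = po.gen(n_samples=4)        # shape (4, 1) array of counts
logp = po.eval(x, log=True)    # shape (4,), log pmf at the sampled counts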

Poisson.eval(self, x, ii=None, log=True)

Method to evaluate pdf

Parameters
----------
x : int or list or np.array
    Rows are inputs to evaluate at
ii : list
    A list of indices specifying which marginal to evaluate.
    If None, the joint pdf is evaluated
log : bool, defaulting to True
    If True, the log pdf is evaluated

Returns
-------
scalar
Source Code
@copy_ancestor_docstring
def eval(self, x, ii=None, log=True):
    # univariate distribution only, i.e. ii=[0] in any case
    assert ii is None, 'this is a univariate Poisson, ii must be None.'

    # x should have a second dim with length 1, not more
    x = np.atleast_2d(x)
    assert x.shape[1] == 1, 'x needs second dim'
    assert not x.ndim > 2, 'no more than 2 dims in x'

    res = self._poisson.logpmf(x) if log else self._poisson.pmf(x)
    # reshape to (nbatch, )
    return res.reshape(-1)

Poisson.gen(self, n_samples=1, seed=None)

Method to generate samples

Parameters
----------
n_samples : int
    Number of samples to generate

Returns
-------
n_samples x self.ndim
Source Code
@copy_ancestor_docstring
def gen(self, n_samples=1, seed=None):
    # See BaseDistribution.py for docstring

    x = self._poisson.rvs(random_state=self.rng, size=(n_samples, self.ndim))
    return x

Poisson.gen_newseed(self)

Generates a new random seed
Source Code
def gen_newseed(self):

    if self.seed is None:
        return None
    else:
        return self.rng.randint(0, 2**31)

Poisson.reseed(self, seed)

Reseeds the distribution's RNG
Source Code
def reseed(self, seed):

    self.rng.seed(seed=seed)
    self.seed = seed

StudentsT class

StudentsT.__init__(self, m, S, dof, seed=None)

Student's T distribution

Parameters
----------
m : list or np.array, 1d
    Mean
S : list or np.array, 2d
    Covariance
dof : int
    Degrees of freedom
seed : int or None
    If provided, random number generator will be seeded
Source Code
def __init__(self, m, S, dof, seed=None):

    m = np.asarray(m)
    self.m = m
    self.dof = dof
    assert(dof > 0)

    S = np.asarray(S)
    self.P = np.linalg.inv(S)
    self.C = np.linalg.cholesky(S).T  # C is upper triangular here
    self.S = S
    self.Pm = np.dot(self.P, m)
    self.logdetP = -2.0 * np.sum(np.log(np.diagonal(self.C)))
    super().__init__(ndim=m.size, seed=seed)

StudentsT.eval(self, x, ii=None, log=True)

Method to evaluate pdf

Parameters
----------
x : int or list or np.array
    Rows are inputs to evaluate at
ii : list
    A list of indices specifying which marginal to evaluate.
    If None, the joint pdf is evaluated
log : bool, defaulting to True
    If True, the log pdf is evaluated

Returns
-------
scalar
Source Code
@copy_ancestor_docstring
def eval(self, x, ii=None, log=True):
    # See BaseDistribution.py for docstring
    if ii is not None:
        raise NotImplementedError

    xm = x - self.m
    lp = np.log(1 + np.sum(np.dot(xm, self.P) * xm, axis=1) / self.dof)
    lp *= -(self.dof + self.ndim) / 2.0
    lp += np.log(scipy.special.gamma((self.dof + self.ndim) / 2))
    lp -= scipy.special.gammaln(self.dof / 2) + \
        self.ndim / 2 * np.log(self.dof * np.pi) - 0.5 * self.logdetP

    res = lp if log else np.exp(lp)
    return res

StudentsT.gen(self, n_samples=1)

Method to generate samples

Parameters
----------
n_samples : int
    Number of samples to generate

Returns
-------
n_samples x self.ndim
Source Code
@copy_ancestor_docstring
def gen(self, n_samples=1):
    # See BaseDistribution.py for docstring
    u = self.rng.chisquare(self.dof, n_samples) / self.dof
    y = self.rng.multivariate_normal(np.zeros(self.ndim),
                                      self.S, (n_samples,))
    return self.m + y / np.sqrt(u)[:, None]

StudentsT.gen_newseed(self)

Generates a new random seed
Source Code
def gen_newseed(self):

    if self.seed is None:
        return None
    else:
        return self.rng.randint(0, 2**31)

StudentsT.reseed(self, seed)

Reseeds the distribution's RNG
Source Code
def reseed(self, seed):

    self.rng.seed(seed=seed)
    self.seed = seed

Uniform class

Uniform.__init__(self, lower=0.0, upper=1.0, seed=None)

Uniform distribution

Parameters
----------
lower : list, or np.array, 1d
    Lower bound(s)
upper : list, or np.array, 1d
    Upper bound(s)
seed : int or None
    If provided, random number generator will be seeded
Source Code
def __init__(self, lower=0., upper=1., seed=None):

    self.lower = np.atleast_1d(lower)
    self.upper = np.atleast_1d(upper)

    assert self.lower.ndim == self.upper.ndim
    assert self.lower.ndim == 1

    super().__init__(ndim=len(self.lower), seed=seed)
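
Example of a 2-d box prior (a minimal sketch; it assumes Uniform has been imported from this package's distribution module):

import numpy as np

u = Uniform(lower=[-1.0, 0.0], upper=[1.0, 10.0], seed=7)
theta = u.gen(n_samples=3)      # shape (3, 2), each row inside the box
p = u.eval(theta, log=False)    # constant density 1 / (2 * 10) inside the box
assert np.allclose(p, 1.0 / 20.0)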

Uniform.eval(self, x, ii=None, log=True)

Method to evaluate pdf

Parameters
----------
x : int or list or np.array
    Rows are inputs to evaluate at
ii : list
    A list of indices specifying which marginal to evaluate.
    If None, the joint pdf is evaluated
log : bool, defaulting to True
    If True, the log pdf is evaluated

Returns
-------
scalar
Source Code
@copy_ancestor_docstring
def eval(self, x, ii=None, log=True):
    # See BaseDistribution.py for docstring
    if ii is None:
        ii = np.arange(self.ndim)
    else:
        ii = np.atleast_1d(ii)

    if x.ndim == 1 and ii.size == 1:
        x = x.reshape(-1, 1)
    else:
        x = np.atleast_2d(x)

    assert x.ndim == 2 and ii.ndim == 1
    assert x.shape[1] == ii.size

    N = x.shape[0]

    p = 1.0 / np.prod(self.upper[ii] - self.lower[ii])
    p = p * np.ones((N,))  # broadcasting

    # truncation of density
    ind = (x > self.lower[ii]) & (x < self.upper[ii])
    p[np.prod(ind, axis=1) == 0] = 0

    if log:
        if not ind.any():
            raise ValueError('log probability not defined outside of truncation')
        else:
            return np.log(p)
    else:
        return p

Uniform.gen(self, n_samples=1)

Method to generate samples

Parameters
----------
n_samples : int
    Number of samples to generate

Returns
-------
n_samples x self.ndim
Source Code
@copy_ancestor_docstring
def gen(self, n_samples=1):
    # See BaseDistribution.py for docstring
    ms = self.rng.rand(n_samples, self.ndim) * (self.upper - self.lower) + self.lower
    return ms

Uniform.gen_newseed(self)

Generates a new random seed
Source Code
def gen_newseed(self):

    if self.seed is None:
        return None
    else:
        return self.rng.randint(0, 2**31)

Uniform.reseed(self, seed)

Reseeds the distribution's RNG
Source Code
def reseed(self, seed):

    self.rng.seed(seed=seed)
    self.seed = seed

MoG class

Abstract base class for mixture distributions

Distributions must at least implement abstract methods of this class.

Component distributions should be added to self.xs, which is a list
containing the distributions of individual components.

Parameters
----------
a : list or np.array, 1d
    Mixing coefficients
ncomp : int
    Number of components
ndim : int
    Number of dimensions of the component distributions
seed : int or None
    If provided, random number generator will be seeded

MoG.__imul__(self, other)

Incrementally multiply with a single gaussian
Source Code
def __imul__(self, other):

    assert isinstance(other, Gaussian)

    res = self * other

    self.a = res.a
    self.xs = res.xs

    return res

MoG.__init__(self, a, ms=None, Ps=None, Us=None, Ss=None, xs=None, seed=None)

Mixture of Gaussians

Creates a MoG with a valid combination of parameters or an already given
list of Gaussian variables.

Parameters
----------
a : list or np.array, 1d
    Mixing coefficients
ms : list, length n_components
    Means
Ps : list, length n_components
    Precisions
Us : list, length n_components
    Precision factors such that U'U = P
Ss : list, length n_components
    Covariances
xs : list, length n_components
    List of gaussian variables
seed : int or None
    If provided, random number generator will be seeded
Source Code
def __init__(
        self,
        a,
        ms=None,
        Ps=None,
        Us=None,
        Ss=None,
        xs=None,
        seed=None):

    self.__div__ = self.__truediv__
    self.__idiv__ = self.__itruediv__

    if ms is not None:
        super().__init__(a=np.asarray(a), ncomp=len(ms),
                         ndim=np.asarray(ms[0]).ndim, seed=seed)

        if Ps is not None:
            self.xs = [Gaussian(m=m, P=P, seed=self.gen_newseed())
                       for m, P in zip(ms, Ps)]

        elif Us is not None:
            self.xs = [Gaussian(m=m, U=U, seed=self.gen_newseed())
                       for m, U in zip(ms, Us)]

        elif Ss is not None:
            self.xs = [Gaussian(m=m, S=S, seed=self.gen_newseed())
                       for m, S in zip(ms, Ss)]

        else:
            raise ValueError('Precision information missing')

    elif xs is not None:
        super().__init__(a=np.asarray(a), ncomp=len(xs),
                         ndim=xs[0].ndim, seed=seed)
        self.xs = xs

    else:
        raise ValueError('Mean information missing')
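
Example of building a two-component mixture from means and covariances (a minimal sketch; it assumes MoG has been imported from this package's distribution module):

import numpy as np

ms = [np.array([0.0, 0.0]), np.array([3.0, 3.0])]
Ss = [np.eye(2), 0.5 * np.eye(2)]

mog = MoG(a=[0.3, 0.7], ms=ms, Ss=Ss, seed=0)
x = mog.gen(n_samples=500)      # shape (500, 2)
logp = mog.eval(x, log=True)    # shape (500,)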

MoG.__itruediv__(self, other)

Incrementally divide by a single gaussian
Source Code
def __itruediv__(self, other):

    assert isinstance(other, Gaussian)

    res = self / other

    self.a = res.a
    self.xs = res.xs

    return res

MoG.__mul__(self, other)

Multiply with a single gaussian
Source Code
def __mul__(self, other):

    assert isinstance(other, Gaussian)

    ys = [x * other for x in self.xs]

    lcs = np.empty_like(self.a)

    for i, (x, y) in enumerate(zip(self.xs, ys)):

        lcs[i] = x.logdetP + other.logdetP - y.logdetP
        lcs[i] -= np.dot(x.m, np.dot(x.P, x.m)) + \
            np.dot(other.m, np.dot(other.P, other.m)) - \
            np.dot(y.m, np.dot(y.P, y.m))
        lcs[i] *= 0.5

    la = np.log(self.a) + lcs
    la -= scipy.special.logsumexp(la)
    a = np.exp(la)

    return MoG(a=a, xs=ys, seed=self.seed)

MoG.__truediv__(self, other)

Divide by a single gaussian
Source Code
def __truediv__(self, other):

    assert isinstance(other, Gaussian)

    ys = [x / other for x in self.xs]

    lcs = np.empty_like(self.a)

    for i, (x, y) in enumerate(zip(self.xs, ys)):

        lcs[i] = x.logdetP - other.logdetP - y.logdetP
        lcs[i] -= np.dot(x.m, np.dot(x.P, x.m)) - \
            np.dot(other.m, np.dot(other.P, other.m)) - \
            np.dot(y.m, np.dot(y.P, y.m))
        lcs[i] *= 0.5

    la = np.log(self.a) + lcs
    la -= scipy.special.logsumexp(la)
    a = np.exp(la)

    return MoG(a=a, xs=ys, seed=self.seed)

MoG.calc_mean_and_cov(self)

Calculate the mean vector and the covariance matrix of the MoG
Source Code
def calc_mean_and_cov(self):

    ms = [x.m for x in self.xs]
    m = np.dot(self.a, np.array(ms))

    msqs = [x.S + np.outer(mi, mi) for x, mi in zip(self.xs, ms)]
    S = np.sum(
        np.array([a * msq for a, msq in zip(self.a, msqs)]), axis=0) - np.outer(m, m)

    return m, S
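
These are the standard moment-matching identities: m = sum_k a_k m_k and S = sum_k a_k (S_k + m_k m_k') - m m'. Continuing the MoG construction sketch above:

m, S = mog.calc_mean_and_cov()
# m is the mixing-weighted average of the component means;
# S combines the averaged covariances with the spread between component means
g = mog.project_to_gaussian()   # Gaussian with exactly this m and S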

MoG.convert_to_E(self, beta=0.99)

Convert to Mixture of ellipsoidal distributions
Source Code
def convert_to_E(self, beta=0.99):

    return MoE(self.a, xs=self.xs, seed=self.seed, beta=beta)

MoG.convert_to_T(self, dofs)

Convert to Mixture of Student's T distributions

Parameters
----------
dofs : int or list of ints
    Degrees of freedom of component distributions
Source Code
def convert_to_T(self, dofs):

    if type(dofs) == int:
        dofs = [dofs for i in range(len(self.xs))]
    ys = [x.convert_to_T(dof) for x, dof in zip(self.xs, dofs)]
    return MoT(self.a, xs=ys, seed=self.seed)

MoG.eval(self, x, ii=None, log=True)

Method to evaluate pdf

Parameters
----------
x : int or list or np.array
    Rows are inputs to evaluate at
ii : list
    A list of indices specifying which marginal to evaluate.
    If None, the joint pdf is evaluated
log : bool, defaulting to True
    If True, the log pdf is evaluated

Returns
-------
scalar
Source Code
@copy_ancestor_docstring
def eval(self, x, ii=None, log=True):
    # See BaseMixture.py for docstring
    ps = np.array([c.eval(x, ii, log) for c in self.xs]).T
    res = scipy.special.logsumexp(ps + np.log(self.a), axis=1) if log \
        else np.dot(ps, self.a)

    return res

MoG.gen(self, n_samples=1)

Method to generate samples

Parameters
----------
n_samples : int
    Number of samples to generate

Returns
-------
n_samples x self.ndim
Source Code
@copy_ancestor_docstring
def gen(self, n_samples=1):
    # See BaseMixture.py for docstring
    ii = self.gen_comp(n_samples)  # n_samples,

    ns = [np.sum((ii == i).astype(int)) for i in range(self.n_components)]
    samples = [x.gen(n) for x, n in zip(self.xs, ns)]
    samples = np.concatenate(samples, axis=0)
    self.rng.shuffle(samples)

    return samples

MoG.gen_comp(self, n_samples)

Generate component index according to self.a
Source Code
def gen_comp(self, n_samples):

    return self.discrete_sample.gen(n_samples).reshape(-1)  # n_samples,

MoG.gen_newseed(self)

Generates a new random seed
Source Code
def gen_newseed(self):

    if self.seed is None:
        return None
    else:
        return self.rng.randint(0, 2**31)

MoG.kl(self, other, n_samples=10000)

Estimates the KL from this to another PDF

KL(this | other), using Monte Carlo
Source Code
def kl(self, other, n_samples=10000):

    x = self.gen(n_samples)
    lp = self.eval(x, log=True)
    lq = other.eval(x, log=True)
    t = lp - lq

    res = np.mean(t)
    err = np.std(t, ddof=1) / np.sqrt(n_samples)

    return res, err

MoG.project_to_gaussian(self)

Returns a gaussian with the same mean and precision as the mog
Source Code
def project_to_gaussian(self):

    m, S = self.calc_mean_and_cov()
    return Gaussian(m=m, S=S, seed=self.seed)

MoG.prune_negligible_components(self, threshold)

Prune components

Removes all the components whose mixing coefficient is less
than a threshold.
Source Code
def prune_negligible_components(self, threshold):

    ii = np.nonzero((self.a < threshold).astype(int))[0]
    total_del_a = np.sum(self.a[ii])
    del_count = ii.size

    self.ncomp -= del_count
    self.a = np.delete(self.a, ii)
    self.a += total_del_a / self.n_components
    self.xs = [x for i, x in enumerate(self.xs) if i not in ii]

MoG.reseed(self, seed)

Reseeds the following RNGs in the following order:
1) Master RNG for the mixture object, using the input seed
2) RNG for the discrete distribution used to sample components. The seed
is generated using the master RNG.
3) RNG for each mixture component, in order. Each seed is generated by
the master RNG.
Source Code
def reseed(self, seed):

    self.rng.seed(seed=seed)
    self.seed = seed
    self.discrete_sample.reseed(seed=self.gen_newseed())
    for x in self.xs:
        x.reseed(seed=self.gen_newseed())

MoG.ztrans(self, mean, std)

Z-transform
Source Code
def ztrans(self, mean, std):

    xs = [x.ztrans(mean, std) for x in self.xs]
    return MoG(self.a, xs=xs, seed=self.seed)

MoG.ztrans_inv(self, mean, std)

Z-transform inverse
Source Code
def ztrans_inv(self, mean, std):

    xs = [x.ztrans_inv(mean, std) for x in self.xs]
    return MoG(self.a, xs=xs, seed=self.seed)

MoT class

Abstract base class for mixture distributions

Distributions must at least implement abstract methods of this class.

Component distributions should be added to self.xs, which is a list
containing the distributions of individual components.

Parameters
----------
a : list or np.array, 1d
    Mixing coefficients
ncomp : int
    Number of components
ndim : int
    Number of dimensions of the component distributions
seed : int or None
    If provided, random number generator will be seeded

MoT.__init__(self, a, ms=None, Ss=None, dofs=None, xs=None, seed=None)

Mixture of Student's T distributions

Creates a MoT with a valid combination of parameters or an already given
list of Student's T variables.

Parameters
----------
a : list or 1d array
    Mixing coefficients
ms : list of length n_components
    Means
Ss : list of length n_components
    Covariances
dofs: list of length n_components
    Degrees of freedom
xs : list of length n_components
    List of Student's T distributions
seed : int or None
    If provided, random number generator will be seeded
Source Code
def __init__(self, a, ms=None, Ss=None, dofs=None, xs=None, seed=None):

    if ms is not None:
        super().__init__(a=np.asarray(a), ncomp=len(ms),
                         ndim=np.asarray(ms[0]).ndim, seed=seed)
        self.xs = [StudentsT(m=m, S=S, dof=dof, seed=self.gen_newseed())
                   for m, S, dof in zip(ms, Ss, dofs)]
    elif xs is not None:
        super().__init__(a=np.asarray(a), ncomp=len(xs),
                         ndim=xs[0].ndim, seed=seed)
        self.xs = xs
    else:
        raise ValueError('Mean information missing')

MoT.eval(self, x, ii=None, log=True)

Method to evaluate pdf

Parameters
----------
x : int or list or np.array
    Rows are inputs to evaluate at
ii : list
    A list of indices specifying which marginal to evaluate.
    If None, the joint pdf is evaluated
log : bool, defaulting to True
    If True, the log pdf is evaluated

Returns
-------
scalar
Source Code
@copy_ancestor_docstring
def eval(self, x, ii=None, log=True):
    # See BaseMixture.py for docstring
    if ii is not None:
        raise NotImplementedError

    ps = np.array([c.eval(x, ii=None, log=log) for c in self.xs]).T
    res = scipy.special.logsumexp(ps + np.log(self.a), axis=1) if log \
        else np.dot(ps, self.a)

    return res

MoT.gen(self, n_samples=1)

Method to generate samples

Parameters
----------
n_samples : int
    Number of samples to generate

Returns
-------
n_samples x self.ndim
Source Code
@copy_ancestor_docstring
def gen(self, n_samples=1):
    # See BaseMixture.py for docstring
    ii = self.gen_comp(n_samples)  # n_samples,

    ns = [np.sum((ii == i).astype(int)) for i in range(self.n_components)]
    samples = [x.gen(n) for x, n in zip(self.xs, ns)]
    samples = np.concatenate(samples, axis=0)
    self.rng.shuffle(samples)

    return samples

MoT.gen_comp(self, n_samples)

Generate component index according to self.a
Source Code
def gen_comp(self, n_samples):

    return self.discrete_sample.gen(n_samples).reshape(-1)  # n_samples,

MoT.gen_newseed(self)

Generates a new random seed
Source Code
def gen_newseed(self):

    if self.seed is None:
        return None
    else:
        return self.rng.randint(0, 2**31)

MoT.kl(self, other, n_samples=10000)

Estimates the KL from this to another PDF

KL(this | other), using Monte Carlo
Source Code
def kl(self, other, n_samples=10000):

    x = self.gen(n_samples)
    lp = self.eval(x, log=True)
    lq = other.eval(x, log=True)
    t = lp - lq

    res = np.mean(t)
    err = np.std(t, ddof=1) / np.sqrt(n_samples)

    return res, err

MoT.prune_negligible_components(self, threshold)

Prune components

Removes all the components whose mixing coefficient is less
than a threshold.
Source Code
def prune_negligible_components(self, threshold):

    ii = np.nonzero((self.a < threshold).astype(int))[0]
    total_del_a = np.sum(self.a[ii])
    del_count = ii.size

    self.ncomp -= del_count
    self.a = np.delete(self.a, ii)
    self.a += total_del_a / self.n_components
    self.xs = [x for i, x in enumerate(self.xs) if i not in ii]

MoT.reseed(self, seed)

Reseeds the following RNGs in the following order:
1) Master RNG for the mixture object, using the input seed
2) RNG for the discrete distribution used to sample components. The seed
is generated using the master RNG.
3) RNG for each mixture component, in order. Each seed is generated by
the master RNG.
Source Code
def reseed(self, seed):

    self.rng.seed(seed=seed)
    self.seed = seed
    self.discrete_sample.reseed(seed=self.gen_newseed())
    for x in self.xs:
        x.reseed(seed=self.gen_newseed())