numpy: speeding up an iterative algorithm in Python

7eumitmz posted 12 months ago in Python

I am implementing the forward recursion of a hidden Markov model; the necessary steps (a)-(b) are shown below.
[image: forward-recursion equations (a)-(b)]
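
For reference, the scaled recursion that the code below implements is (a sketch reconstructed from the implementation, using standard HMM forward-algorithm notation, where b_j(o_t) is the Gaussian-mixture emission density of state j):

alpha_1(j) = pi_j * b_j(o_1)                              c_1 = 1 / sum_j alpha_1(j)
alpha_t(j) = ( sum_i alpha_{t-1}(i) * a_ij ) * b_j(o_t)   c_t = 1 / sum_j alpha_t(j)

Each column of alphas is multiplied by its scaling factor c_t to avoid numerical underflow.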
Here is my implementation:

from scipy.stats import norm
import numpy as np
import random

def gmmdensity(obs, w, mu, sd):
    # compute probability density function of gaussian mixture 
    gauss_mixt = norm.pdf(obs, mu, sd)[:,None]*w
    return gauss_mixt

def alpha(obs, states, A, pi, w, mu, sigma):    
    dens = np.sum(gmmdensity(obs, w, mu, sigma), axis = 2)
    # scaling factor is used to renormalize probabilities in order to
    # avoid numerical underflow
    scaling_factor = np.ones(len(obs))
    alpha_matrix = np.zeros((len(states), len(obs)))
    
    # for t = 0
    alpha_matrix[:,0] = pi*dens[0]
    scaling_factor[0] = 1/np.sum(alpha_matrix[:,0], axis = 0)
    alpha_matrix[:,0] *= scaling_factor[0]
    
    # for t == 1:T
    for t in range(1, len(obs)):
        alpha_matrix[:,t] = np.matmul(alpha_matrix[:,t-1], A)*dens[t]
        scaling_factor[t] = 1/np.sum(alpha_matrix[:,t], axis = 0)
        alpha_matrix[:,t] *= scaling_factor[t]

    return alpha_matrix, scaling_factor

Let's generate some data to run the algorithm on:

obs = np.concatenate((np.random.normal(0, 1, size = 500), 
                        np.random.normal(1.5, 1, size = 500))).reshape(-1,1)
N = 2 # number of hidden states
M = 3 # number of mixture components
states = list(range(N))
pi = np.array([0.5, 0.5]) # initial probabilities
A = np.array([[0.8, 0.2], [0.3, 0.7]]) # transition matrix
mu = np.array([np.min(obs), np.median(obs), np.max(obs)]) # means of mixture components
sigma = np.array([1, 1, 1]) # standard deviations of mixture components
w = np.array([[0.2, 0.3, 0.5], [0.6, 0.2, 0.2]]) # weights of mixture components


Let's see how fast the algorithm is:

%timeit alpha(obs, states, A, pi, w, mu, sigma)
13.6 ms ± 1.24 ms per loop (mean ± std. dev. of 7 runs, 100 loops each)


Is it possible to make this code faster? I have thought about using numba or cython, but it has never fully worked in this kind of situation.

Answer 1 (s2j5cfk0)

I profiled this code and found the following ideas helped speed it up.

  • Repeatedly storing/loading the alpha_matrix[:,t] vector is slow. It is faster to keep the vector being worked on in a local variable until you are done with it. The same goes for scaling_factor[t].
  • alpha_matrix[:,t] has poor memory locality: it reads one element, skips several, reads another, and so on. Since you use alpha_matrix[:,t] everywhere, I found the indexing easier if you swap the two indices. Once the computation is done, the matrix can be transposed back to its original orientation, so the rest of your code does not need to change.
  • On an array with only one axis, np.sum(..., axis=0) gives the same result as np.sum() without the axis argument, but passing the axis argument is slower.
  • Multiplying by the w array creates a large (1000, 2, 3) array, whose last dimension is then summed. You can combine these two steps with einsum() (see the short sketch after this list). This is faster because it avoids creating the large intermediate array. I copied this code from Using Python numpy einsum to obtain dot product between 2 Matrices.
  • scipy.stats.norm.pdf() is surprisingly slow here. I think SciPy defines the PDF using a finite difference of the CDF? I re-coded it with the formula for the normal PDF and found it to be 3x faster.
  • The main loop of the algorithm can be extracted into a new function that we speed up with Numba. (This provides most of the speedup, but I prefer to do this kind of thing last, because Numba functions cannot be profiled.)
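
A minimal sketch of the einsum() replacement (my own check, assuming the arrays from the question's benchmark setup):

pdf = norm.pdf(obs, mu, sigma)                  # shape (1000, 3)
dens_old = np.sum(pdf[:, None] * w, axis=2)     # builds a (1000, 2, 3) intermediate
dens_new = np.einsum('ij,kj->ik', pdf, w)       # same result, no large intermediate
assert np.allclose(dens_old, dens_new)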

On the provided benchmark this adds up to roughly a 40x speedup:

%timeit alpha_orig(obs, states, A, pi, w, mu, sigma)
14.4 ms ± 159 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
%timeit alpha(obs, states, A, pi, w, mu, sigma)
308 µs ± 2.02 µs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)

Full code:

from scipy.stats import norm
import numpy as np
import random
import numba as nb

@nb.njit()
def fast_normal_pdf(x, mu, sigma):
    x = (x - mu) / sigma
    one_over_sqrt_2pi = 1 / np.sqrt(2 * np.pi)
    return (1 / sigma) * one_over_sqrt_2pi * np.exp(-0.5*x*x)

def gmmdensitysum(obs, w, mu, sd):
    # normal pdf per (observation, component), then weighted sum over mixture components
    pdf = fast_normal_pdf(obs, mu, sd)
    return np.einsum('ij,kj->ik', pdf, w)

@nb.njit()
def alpha_inner(N, alpha_matrix, dens, scaling_factor, A):
    # main recursion; keep the current row in local variables to avoid
    # repeatedly reading/writing slices of alpha_matrix
    for t in range(1, N):
        current_alpha = (alpha_matrix[t-1] @ A) * dens[t]
        current_scaling_factor = 1 / current_alpha.sum()
        current_alpha *= current_scaling_factor
        scaling_factor[t] = current_scaling_factor
        alpha_matrix[t] = current_alpha

def alpha(obs, states, A, pi, w, mu, sigma):
    dens = gmmdensitysum(obs, w, mu, sigma)
    # scaling factor is used to renormalize probabilities in order to
    # avoid numerical underflow
    scaling_factor = np.ones(len(obs))
    alpha_matrix = np.zeros((len(obs), len(states)))
    
    # for t = 0
    alpha_matrix[0] = pi * dens[0]
    scaling_factor[0] = 1 / np.sum(alpha_matrix[0])
    alpha_matrix[0] *= scaling_factor[0]

    alpha_inner(len(obs), alpha_matrix, dens, scaling_factor, A)

    return alpha_matrix.T, scaling_factor
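
A quick sanity check against the question's original implementation (here called alpha_orig, as in the timings above):

a_new, s_new = alpha(obs, states, A, pi, w, mu, sigma)
a_old, s_old = alpha_orig(obs, states, A, pi, w, mu, sigma)
assert np.allclose(a_new, a_old)
assert np.allclose(s_new, s_old)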

Answer 2 (3mpgtkmj)

TL;DR: Numba can speed this code up a lot, especially when basic loops are used. This answer comes relatively late compared to the others, but it provides by far the fastest implementation so far.

This is indeed a typical case where Numba can dramatically speed up the computation. On my machine, np.matmul takes more than 1 µs to multiply a vector of only 2 elements by a 2x2 matrix, while this should clearly take no more than 0.01 µs (assuming the data is in the L1 cache). Most of that time is lost in NumPy call overhead. np.sum takes > 3 µs when it should take on the order of a few nanoseconds, for the same reason: it is just the sum of two numbers!
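
A minimal sketch of that overhead claim (my own illustration; absolute numbers are machine dependent):

import numpy as np
v = np.array([0.5, 0.5])
A = np.array([[0.8, 0.2], [0.3, 0.7]])
%timeit np.matmul(v, A)   # on the order of a microsecond: mostly NumPy call overhead
%timeit np.sum(v)         # likewise, although the actual work is adding two numbers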
To solve this efficiently in Numba, you need to use basic loops and avoid creating new temporary arrays (allocations are expensive inside such basic loops). Note that gmmdensity cannot easily be converted to Numba because it calls SciPy's norm.pdf, which AFAIK Numba does not support yet. Here is a fast implementation:

import numba as nb
import numpy as np

@nb.njit('(int64, int64, float64[:,::1], float64[:,::1], float64[::1])')
def fast_iteration(obsLen, statesLen, dens, A, pi):
    # scaling factor is used to renormalize probabilities in order to
    # avoid numerical underflow
    scaling_factor = np.ones(obsLen)
    alpha_matrix = np.zeros((statesLen, obsLen))
    
    # for t = 0
    alpha_matrix[:,0] = pi*dens[0]
    scaling_factor[0] = 1/np.sum(alpha_matrix[:,0], axis = 0)
    alpha_matrix[:,0] *= scaling_factor[0]
    tmp = np.zeros(statesLen)

    for t in range(1, obsLen):
        # Line: alpha_matrix[:,t] = np.matmul(alpha_matrix[:,t-1], A)*dens[t]
        for i in range(statesLen):
            s = 0.0
            for k in range(statesLen):
                s += alpha_matrix[k, t-1] * A[k, i]
            tmp[i] = s * dens[t, i]

        # Line: scaling_factor[t] = 1/np.sum(alpha_matrix[:,t], axis = 0)
        s = 0.0
        for i in range(statesLen):
            s += tmp[i]
        scaling_factor[t] = 1.0 / s

        # Line: alpha_matrix[:,t] *= scaling_factor[t]
        for i in range(statesLen):
            alpha_matrix[i, t] = tmp[i] * scaling_factor[t]

    return alpha_matrix, scaling_factor

@nb.njit('(float64[:,::1], float64[::1], float64[::1])')
def fast_normal_pdf(obs, mu, sigma):
    n, m = obs.shape[0], mu.size
    assert obs.shape[1] == 1
    assert mu.size == sigma.size
    inv_sigma = 1.0 / sigma
    one_over_sqrt_2pi = 1 / np.sqrt(2 * np.pi)
    result = np.empty((n, m))
    for i in range(n):
        for j in range(m):
            tmp = (obs[i, 0] - mu[j]) * inv_sigma[j]
            tmp = np.exp(-0.5 * tmp * tmp)
            result[i, j] = one_over_sqrt_2pi * inv_sigma[j] * tmp
    return result

@nb.njit('(float64[:,::1], float64[:,::1], float64[::1], float64[::1])')
def fast_gmmdensitysum(obs, w, mu, sigma):
    pdf = fast_normal_pdf(obs, mu, sigma)
    result = np.zeros((pdf.shape[0], w.shape[0]))
    for i in range(pdf.shape[0]):
        for k in range(w.shape[0]):
            s = 0.0
            for j in range(pdf.shape[1]):
                s += pdf[i, j] * w[k, j]
            result[i, k] = s
    return result

def fast_alpha(obs, states, A, pi, w, mu, sigma):    
    dens = fast_gmmdensitysum(obs, w, mu, sigma)
    return fast_iteration(len(obs), len(states), dens, A, pi)

# Usage

obs = np.concatenate((np.random.normal(0, 1, size = 500), 
                        np.random.normal(1.5, 1, size = 500))).reshape(-1,1)
N = 2 # number of hidden states
M = 3 # number of mixture components
states = list(range(N))
pi = np.array([0.5, 0.5]) # initial probabilities
A = np.array([[0.8, 0.2], [0.3, 0.7]]) # transition matrix
mu = np.array([np.min(obs), np.median(obs), np.max(obs)]) # means of mixture components
sigma = np.array([1, 1, 1], dtype=np.float64) # standard deviations of mixture components
w = np.array([[0.2, 0.3, 0.5], [0.6, 0.2, 0.2]]) # weights of mixture components
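
For example, it can then be called like the original (a usage sketch of my own):

a, s = fast_alpha(obs, states, A, pi, w, mu, sigma)   # a: shape (2, 1000), s: shape (1000,)
%timeit fast_alpha(obs, states, A, pi, w, mu, sigma)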

Note that the dtype of sigma is now provided explicitly, so that it matches the Numba signature. fast_normal_pdf is an optimized implementation inspired by @NickODell's good answer.

Benchmarks

Here are the performance results on my machine with an i5-9600KF CPU:

Initial code:        10417.4 µs      x1
Andrej Kesely:         974.6 µs     x11
Nick ODell:            335.1 µs     x31
This implementation:    53.9 µs    x193    <------


While this implementation is larger than the others, it is clearly the fastest one. It is 193 times faster than the initial implementation on my machine, and more than 6 times faster than the fastest answer so far.
fast_gmmdensitysum is the most expensive part (>60% of the time), especially the np.exp function (~40%), which is hard to optimize further.

Answer 3 (rkkpypqq)

Here is a Numba version of the functions (note: the Numba version raises NumbaPerformanceWarning: '@' is faster on contiguous arrays; it probably needs the input matrices rearranged(?)):

import random
from timeit import timeit

import numpy as np
from numba import njit
from scipy.stats import norm

def gmmdensity(obs, w, mu, sd):
    # compute probability density function of gaussian mixture
    gauss_mixt = norm.pdf(obs, mu, sd)[:, None] * w
    return gauss_mixt

def alpha(obs, states, A, pi, w, mu, sigma):
    dens = np.sum(gmmdensity(obs, w, mu, sigma), axis=2)
    # scaling factor is used to renormalize probabilities in order to
    # avoid numerical underflow
    scaling_factor = np.ones(len(obs))
    alpha_matrix = np.zeros((len(states), len(obs)))

    # for t = 0
    alpha_matrix[:, 0] = pi * dens[0]
    scaling_factor[0] = 1 / np.sum(alpha_matrix[:, 0], axis=0)
    alpha_matrix[:, 0] *= scaling_factor[0]

    # for t == 1:T
    for t in range(1, len(obs)):
        alpha_matrix[:, t] = np.matmul(alpha_matrix[:, t - 1], A) * dens[t]
        scaling_factor[t] = 1 / np.sum(alpha_matrix[:, t], axis=0)
        alpha_matrix[:, t] *= scaling_factor[t]

    return alpha_matrix, scaling_factor

@njit
def pdf_numba(x, loc, scale):
    x = (x - loc) / scale
    return (np.exp(-(x**2) / 2) / np.sqrt(2 * np.pi)) / scale

@njit
def gmmdensity_numba(obs, w, mu, sd):
    # compute probability density function of gaussian mixture
    gauss_mixt = pdf_numba(obs, mu, sd)[:, None] * w
    return gauss_mixt

@njit
def alpha_numba(obs, states, A, pi, w, mu, sigma):
    dens = np.sum(gmmdensity_numba(obs, w, mu, sigma), axis=2)
    # scaling factor is used to renormalize probabilities in order to
    # avoid numerical underflow

    scaling_factor = np.ones(len(obs))
    alpha_matrix = np.zeros((len(states), len(obs)))

    # for t = 0

    alpha_matrix[:, 0] = pi * dens[0]
    scaling_factor[0] = 1 / np.sum(alpha_matrix[:, 0], axis=0)
    alpha_matrix[:, 0] *= scaling_factor[0]

    # for t == 1:T
    for t in range(1, len(obs)):
        # NumbaPerformanceWarning: '@' is faster on contiguous arrays
        # probably needs alpha_matrix rearranged(?)
        x = alpha_matrix[:, t - 1] @ A

        alpha_matrix[:, t] = x * dens[t]
        scaling_factor[t] = 1 / np.sum(alpha_matrix[:, t], axis=0)
        alpha_matrix[:, t] *= scaling_factor[t]

    return alpha_matrix, scaling_factor

np.random.seed(41)
obs = np.concatenate(
    (np.random.normal(0, 1, size=500), np.random.normal(1.5, 1, size=500))
).reshape(-1, 1)
N = 2  # number of hidden states
M = 3  # number of mixture components
states = np.arange(N)
pi = np.array([0.5, 0.5])  # initial probabilities
A = np.array([[0.8, 0.2], [0.3, 0.7]])  # transition matrix
mu = np.array([np.min(obs), np.median(obs), np.max(obs)])  # means of mixture components
sigma = np.array([1, 1, 1])  # standard deviations of mixture components
w = np.array([[0.2, 0.3, 0.5], [0.6, 0.2, 0.2]])  # weights of mixture components

a_1, s_1 = alpha(obs, states, A, pi, w, mu, sigma)
a_2, s_2 = alpha_numba(obs, states, A, pi, w, mu, sigma)

assert np.allclose(a_1, a_2)
assert np.allclose(s_1, s_2)

t1 = timeit("alpha(obs, states, A, pi, w, mu, sigma)", number=100, globals=globals())
t2 = timeit(
    "alpha_numba(obs, states, A, pi, w, mu, sigma)", number=100, globals=globals()
)
print(t1 / 100)
print(t2 / 100)

This prints on my machine (AMD 5700X):

0.005626231823116541
0.0003328173188492656


That is a ~17x speedup.
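
A minimal sketch of the rearrangement hinted at above (my own variant, reusing gmmdensity_numba from the code above, not part of the original answer): if alpha is stored time-major, alpha_matrix[t - 1] is a contiguous row, which avoids the warning; the result is transposed back at the end so callers still see the (states, observations) layout.

@njit
def alpha_numba_rowmajor(obs, states, A, pi, w, mu, sigma):
    dens = np.sum(gmmdensity_numba(obs, w, mu, sigma), axis=2)
    scaling_factor = np.ones(len(obs))
    alpha_matrix = np.zeros((len(obs), len(states)))  # time-major: each row is contiguous

    # t = 0
    alpha_matrix[0] = pi * dens[0]
    scaling_factor[0] = 1 / np.sum(alpha_matrix[0])
    alpha_matrix[0] *= scaling_factor[0]

    # t = 1:T
    for t in range(1, len(obs)):
        x = alpha_matrix[t - 1] @ A          # contiguous row @ contiguous matrix
        x *= dens[t]
        scaling_factor[t] = 1 / np.sum(x)
        alpha_matrix[t] = x * scaling_factor[t]

    return alpha_matrix.T, scaling_factor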
