numpy Python中复杂矩阵计算的矢量化

fkaflof6  于 2023-11-18  发布在  Python
关注(0)|答案(3)|浏览(120)

我有一个3x 3的矩阵,我可以计算如下

e_matrix = np.array([m[i]*m[j]-n[i]*n[j] for i in range(3) for j in range(3)])

字符串
其中mn都是长度为3的向量。
现在假设mn是形状为(K,3)的矩阵,我想做一个类似于上面的计算,得到一个形状为(3,3,K)e_matrix
我意识到我可以做一个天真的方法,例如。

e_matrix = np.zeros((3,3,K))

for k in range(K):
    e_matrix[:,:,k] = np.array([m[k,i]*m[k,j]-n[k,i]*n[k,j] for i in range(3) for j in range(3)])


是否存在矢量化方法?理想情况下,它可以与Numba/JIT编译一起工作(所以没有,例如np.tensordot)。

a0zr77ik

a0zr77ik1#

通常在raw numpy中通过broadcasting

K = 4
m = np.random.rand(K, 3)
n = np.random.rand(K, 3)

# Shapes become (3, K)
m, n = m.T, n.T

# Shapes become:
# (3, 1, K)
# (1, 3, K)
# ========= *
# (3, 3, K)
out = m[:, None] * m[None, :] - n[:, None] * n[None, :]

字符串
我想这和伦巴是兼容的,但我不太熟悉...

ryoqjall

ryoqjall2#

这是使用numba的“矢量化”变体。

import numpy as np
import numba

@numba.guvectorize("(k),(k)->(k,k)", nopython=True)
def nb_vec_impl(m, n, out):
    for i in range(3):
        for j in range(3):
            out[i, j] = m[i] * m[j] - n[i] * n[j]

def nb_vec(m, n):
    K = len(m)
    e_matrix = np.zeros((3, 3, K))  # Notice the position of K.
    # The matrix you want is (3,3,K), but this function accepts (K,3,3).
    # Instead of transposing the results, the same thing can be done by passing a transposed buffer.
    nb_vec_impl(m, n, e_matrix.T)
    return e_matrix

字符串
当K非常大(例如,100,000)时,该函数很快,但不幸的是,当K很小时,由于开销很大,该函数非常慢。
如果性能是你的优先考虑,我更喜欢一个简单的jit函数。

import numpy as np
import numba

@numba.njit
def nb_jit(m, n):
    K = len(m)
    e_matrix = np.zeros((3, 3, K))

    for k in range(K):
        for i in range(3):
            for j in range(3):
                e_matrix[i, j, k] = m[k, i] * m[k, j] - n[k, i] * n[k, j]

    return e_matrix


基准:

import timeit

import numba
import numpy as np

def native(m, n):
    K = len(m)
    e_matrix = np.zeros((3, 3, K))

    for k in range(K):
        e_matrix[:, :, k] = np.array([[m[k, i] * m[k, j] - n[k, i] * n[k, j] for i in range(3)] for j in range(3)])

    return e_matrix

def broadcast(m, n):
    m, n = m.T, n.T
    return m[:, None] * m[None, :] - n[:, None] * n[None, :]

@numba.njit
def broadcast_nb(m, n):
    m, n = m.T, n.T
    return m[:, None] * m[None, :] - n[:, None] * n[None, :]

@numba.njit
def func2(m, n):
    m, n = m.T, n.T
    x, y = m.shape
    return m.reshape(x, 1, y) * m.reshape(1, x, y) - n.reshape(1, x, y) * n.reshape(x, 1, y)

@numba.njit
def nb_jit(m, n):
    K = len(m)
    e_matrix = np.zeros((3, 3, K))

    for k in range(K):
        for i in range(3):
            for j in range(3):
                e_matrix[i, j, k] = m[k, i] * m[k, j] - n[k, i] * n[k, j]

    return e_matrix

@numba.guvectorize("(k),(k)->(k,k)", nopython=True)
def nb_vec_impl(m, n, out):
    for i in range(3):
        for j in range(3):
            out[i, j] = m[i] * m[j] - n[i] * n[j]

def nb_vec(m, n):
    K = len(m)
    e_matrix = np.zeros((3, 3, K))
    nb_vec_impl(m, n, e_matrix.T)  # numba can accept transposed arrays.
    return e_matrix

def main():
    rng = np.random.default_rng(0)
    K = 100_000
    # K = 100
    m = rng.random((K, 3))
    n = rng.random((K, 3))

    candidates = [native, broadcast, broadcast_nb, func2, nb_jit, nb_vec]

    expected = native(m, n)
    # expected = broadcast(m, n)

    for f in candidates:
        assert np.array_equal(f(m, n), expected), f"{f.__name__}"
        n_run = 10
        elapsed = timeit.timeit(lambda: f(m, n), number=n_run) / n_run
        print(f"{f.__name__}: {elapsed * 1000:.3f} ms")

if __name__ == "__main__":
    main()


结果(K = 100_000):

native: 778.374 ms
broadcast: 13.581 ms
broadcast_nb: 2.848 ms
func2: 6.207 ms
nb_jit: 1.998 ms
nb_vec: 1.891 ms


结果(K = 100):

native: 0.798 ms
broadcast: 0.016 ms
broadcast_nb: 0.004 ms
func2: 0.007 ms
nb_jit: 0.002 ms
nb_vec: 0.127 ms

p4rjhz4m

p4rjhz4m3#

您可以通过更改[:, None]的.来使Chrysophylaxs' answer与numba(<0. 57)njit兼容,以使用ndarray.reshape()代替,因为这是supported by numba

import numpy as np
import numba as nb

K = 4
m = np.random.rand(K, 3)
n = np.random.rand(K, 3)

m, n = m.T, n.T

def func(m, n):
    return m[:, None] * m[None, :] - n[:, None] * n[None, :]

@nb.njit
def func2(m, n):
    assert m.shape == n.shape
    x, y = m.shape
    return m.reshape(x, 1, y) * m.reshape(1, x, y) - n.reshape(
        1, x, y
    ) * n.reshape(x, 1, y)

assert np.array_equal(func(m, n), func2(m, n))
%timeit func(m, n)
%timeit func2(m, n)

字符串
输出量:

3.02 µs ± 50 ns per loop (mean ± std. dev. of 7 runs, 100,000 loops each)
661 ns ± 7.64 ns per loop (mean ± std. dev. of 7 runs, 1,000,000 loops each)

相关问题