pytorch 在Python中使用GPU在大型数组上执行tan()和arctan()的最快方法?

mtb9vblg  于 2023-10-20  发布在  Python
关注(0)|答案(2)|浏览(105)

这本质上是this question的扩展。这里提供的答案是使用数学库来实现函数。不幸的是,数学库only works on scalars,而不是数组。当我尝试将njit与np.tan()和np.arctan()一起使用时,它使用的是我的CPU,而不是我的GPU:

import numpy as np
import numba as nb

@nb.njit(fastmath=True, parallel=True)
def f1(a):
    np.tan(a)
    return np.arctan(a)

我有一个大的数组(3 mil),我需要在上面执行tan()和arctan()。我有8000万行数据要处理,但我可以一次批处理一些数据以将其放入内存中,并且我可以将这些80 mil划分为多个CPU/GPU作业以帮助并行化。如果GPU速度更快,我想使用GPU来实现所有这些功能。Numba会为此工作吗?像PyTorch这样的东西会在这里工作吗?或者会有太多的开销吗?在大型数组上执行递归函数的最快方法是什么?
编辑:这些组成每行的3 mil列数组是在运行时动态生成的。我可以一次批处理多达20个这样的函数,而不会耗尽内存,然后为2个函数发送整个(20,3 mil)数组并保存结果(从而为下一批20个函数释放内存)。所以我只是在寻找在(20,3 mil)数组上执行tan()和arctan()的最快方法,我将一次循环20行数据。CPU还是GPU?Numba还是PyTorch?@njit还是@vectorize?

iswrvxsc

iswrvxsc1#

我将向您展示几个实现。在这篇文章的最后有一些基准测试的代码,所以请自己尝试一下,看看哪一个最适合你。
这是使用numba jit的基本实现。

import math
import numba as nb
import numpy as np

@nb.njit("float64[:](float64[:])", parallel=True)
def numba_jit_cpu(a):
    out = np.empty_like(a)
    for i in nb.prange(a.shape[0]):
        out[i] = math.atan(a[i])
    return out

关于你的第一个问题,关键是使用math模块,而不是numpy。
下面是20* 300万个元素的基准测试结果。

numba_jit_cpu: 121 ms

有400万批,所以总共需要大约5.6天。
然而,实际上,超过一半的时间花在分配out上,这是存储输出的地方。因此,我们可以让函数接受out作为参数并写入它,就像许多numpy API一样。

@nb.njit("void(float64[:],float64[:])", parallel=True)
def numba_jit_cpu_buf(a, out):
    for i in nb.prange(a.shape[0]):
        out[i] = math.atan(a[i])
numba_jit_cpu_buf: 51 ms

现在总共需要2.4天。这是我目前能提供的最快的方法。
作为比较,让我们比较分配和写入相同大小的内存所需的时间。

def single_thread_allocation(a):
    return np.ones_like(a)

def single_thread_write(a, x):
    x[:] = a
single_thread_allocation: 87 ms
single_thread_write: 27 ms

正如你所看到的,numba_jit_cpu_buf比分配快,只比写入慢2倍。如果你想要一个比这更快的方法,你必须优化每一个内存分配和写入。这种优化是必须的。
但既然我已经尝试过了,这里有一些其他的方法。第一个是使用numba的vectorize。

@nb.vectorize("float64(float64)", target="parallel", nopython=True)
def numba_vectorize_cpu(a):
    return math.atan(a)

@nb.vectorize("float64(float64)", target="cuda")
def numba_vectorize_cuda(a):
    return math.atan(a)

请注意,要使用cuda选项,您需要安装cudatoolkit。
这些都很容易实现,但最有效的基于数字的解决方案之一。

numba_vectorize_cpu: 101 ms
numba_vectorize_cuda: 213 ms

它在GPU上实际上较慢的原因肯定是因为数据传输是瓶颈。下面你可以看到numba需要多长时间转移。

from numba import cuda

def numba_copy_only(a):
    ac = cuda.to_device(a)
    return ac.copy_to_host()
numba_copy_only: 188 ms

请记住,仅传输数据的时间就比numba_jit_cpu_buf长三倍多。如果你的数据是float32,还有可能,但如果是float64,就没有机会了。
让我们也试试pytorch解决方案。

import torch

class Model(torch.nn.Module):
    def forward(self, x):
        return torch.atan(x)

def pytorch_cpu(model, a):
    x = torch.from_numpy(a)
    return model(x).numpy()

def pytorch_cuda(model, a):
    x = torch.from_numpy(a).to("cuda")
    return model(x).cpu().numpy()

def pytorch_copy_only(a):
    x = torch.from_numpy(a).to("cuda")
    return x.cpu().numpy()
pytorch_cpu: 112 ms
pytorch_cuda: 186 ms
pytorch_copy_only: 180 ms

没多大区别
应该注意的是,理论上可以通过利用DataLoader的特性(如预取和固定内存)来使其更快。然而,这并不容易,你可能会发现自己正在进入兔子洞。请记住,只要有一个额外的内存分配或两个写入,它就会比numba_jit_cpu_buf慢。
下面是剩下的benchmark代码

import timeit

def baseline(a):
    return np.arctan(a)

def benchmark():
    rng = np.random.default_rng(0)
    batch = rng.random((20, 3 * 1000 * 1000), dtype=np.float64) * np.pi - np.pi / 2

    expected = baseline(batch)

    n = 100
    elapsed = timeit.timeit(lambda: baseline(batch), number=n) / n
    print(f"baseline: {elapsed*1000:.0f} ms")

    elapsed = timeit.timeit(lambda: single_thread_allocation(batch), number=n) / n
    print(f"single_thread_allocation: {elapsed * 1000:.0f} ms")

    buf = np.zeros_like(batch)
    elapsed = timeit.timeit(lambda: single_thread_write(batch, buf), number=n) / n
    print(f"single_thread_write: {elapsed * 1000:.0f} ms")

    assert np.allclose(expected.ravel(), numba_jit_cpu(batch.ravel()))
    elapsed = timeit.timeit(lambda: numba_jit_cpu(batch.ravel()), number=n) / n
    print(f"numba_jit_cpu: {elapsed * 1000:.0f} ms")

    buf = np.zeros_like(batch)
    numba_jit_cpu_buf(batch.ravel(), buf.ravel())
    assert np.allclose(expected, buf)
    elapsed = timeit.timeit(lambda: numba_jit_cpu_buf(batch.ravel(), buf.ravel()), number=n) / n
    print(f"numba_jit_cpu_buf: {elapsed * 1000:.0f} ms")

    assert np.allclose(expected, numba_vectorize_cpu(batch))
    elapsed = timeit.timeit(lambda: numba_vectorize_cpu(batch), number=n) / n
    print(f"numba_vectorize_cpu: {elapsed * 1000:.0f} ms")

    assert np.allclose(expected, numba_vectorize_cuda(batch))
    elapsed = timeit.timeit(lambda: numba_vectorize_cuda(batch), number=n) / n
    print(f"numba_vectorize_cuda: {elapsed * 1000:.0f} ms")

    elapsed = timeit.timeit(lambda: numba_copy_only(batch), number=n) / n
    print(f"numba_copy_only: {elapsed * 1000:.0f} ms")

    model = Model().eval()
    assert np.allclose(expected, pytorch_cpu(model, batch))
    elapsed = timeit.timeit(lambda: pytorch_cpu(model, batch), number=n) / n
    print(f"pytorch_cpu: {elapsed * 1000:.0f} ms")

    model = model.to("cuda")
    assert np.allclose(expected, pytorch_cuda(model, batch))
    elapsed = timeit.timeit(lambda: pytorch_cuda(model, batch), number=n) / n
    print(f"pytorch_cuda: {elapsed * 1000:.0f} ms")

    elapsed = timeit.timeit(lambda: pytorch_copy_only(batch), number=n) / n
    print(f"pytorch_copy_only: {elapsed * 1000:.0f} ms")

if __name__ == "__main__":
    benchmark()
wh6knrhe

wh6knrhe2#

从磁盘上阅读720 TB将是瓶颈,如果您所做的唯一计算是arctan。最好将arctan嵌入到产生720 TB数据的算法中,或者将真正使用该数据的算法中。否则,仅为arctan流式传输720 TB将破坏CPU的所有缓存,在SSD上检查并消耗大量能源。但是,在数据生成算法的最后一行嵌入arctan将避免上述所有负面部分,除非必须更改现有代码。
假设arctan仅包含约30 FLOP,则每4字节仅需要30 FLOP,或者对于4 GB/s仅需要30 GFLOP/s。如果你有一个额定为4GB/s串行阅读性能的SSD,那么你只需要一个30 GFLOP/s的CPU核心。当前的处理器为并行代码提供约5倍的性能,为标量版本提供约0.5倍的性能。可能需要第二个CPU核心,CPU上的Numba已经很好了。

相关问题