c++ 如何改进cwiseProduct操作?

dhxwm5r4  于 2023-01-22  发布在  其他
关注(0)|答案(2)|浏览(817)

我在代码中多次调用了这个函数:

void Grid::computeFVarsSigma(const int DFAType,
                                const Matrix& D_sigma,
                                const Matrix& Phi,
                                const Matrix& DPhiDx,
                                const Matrix& DPhiDy,
                                const Matrix& DPhiDz,
                                Matrix& Rho,
                                Matrix& DRhoDx,
                                Matrix& DRhoDy,
                                Matrix& DRhoDz)
{
    // auto PhiD = Phi * D_sigma;
    Rho = ((Phi * D_sigma).cwiseProduct(Phi)).rowwise().sum();

    if (DFAType == 1)
    {
        DRhoDx = 2. * ((Phi * D_sigma).cwiseProduct(DPhiDx)).rowwise().sum();
        DRhoDy = 2. * ((Phi * D_sigma).cwiseProduct(DPhiDy)).rowwise().sum();
        DRhoDz = 2. * ((Phi * D_sigma).cwiseProduct(DPhiDz)).rowwise().sum();
    }
}

对于我进行基准测试的用例,输入数组具有以下维度:

D_sigma     42 42
Phi     402264 42
DPhiDx  402264 42
DPhiDy  402264 42
DPhiDz  402264 42

这个函数被调用12次的平均时间是0.621 seconds,用std::chrono::high_resolution_clock测量。我在用g++ 7.5.0编译的AMD锐龙5中运行这些计算。我可以提升编译器版本,但我现在最感兴趣的是代码优化。
我想探索的一个想法是将DRhoDx、DRhoDy和DRhoDz的cwiseProduct计算直接存储在一个3xNGridPoints矩阵中,但我还不知道如何实现。
是否有任何其他的操作,我可以尝试改善这一功能?
感谢您的评分
我想感谢@chatz和@Homer512提供的非常好的建议。我对@chatz提出的一行优化非常满意,然而,@Homer512建议在性能上有了巨大的变化,如下图所示(特别感谢@Homer512!)。我一定会使用这两个建议作为改进代码其他部分的起点。
注意,我使用的是double,在下图中,return param和return tuple分别代表将输出作为元组和参数返回的同一个函数。x1c 0d1x

bzzcjhmw

bzzcjhmw1#

我将逐步进行优化。首先,我们建立一个基本值。
你没有给予你的Matrix类型一个类型定义。所以我把它定义为Eigen::MatrixXf。同样,为了我自己的理智,我重新定义了各种Rho向量。注意,Eigen偶尔会优化向量的代码路径,而矩阵恰好是向量。所以这样做是一个好主意,而且它使阅读代码更容易。

using Matrix = Eigen::MatrixXf;
using Vector = Eigen::VectorXf;

namespace {
void compute(const Matrix& Phi, const Matrix& D_sigma, const Matrix& DPhi,
             float factor, Vector& Rho)
{
    Rho = (Phi * D_sigma).cwiseProduct(DPhi).rowwise().sum() * factor;
}
} /* namespace anonymous */

void computeFVarsSigma(const int DFAType, const Matrix& D_sigma,
        const Matrix& Phi, const Matrix& DPhiDx, const Matrix& DPhiDy,
        const Matrix& DPhiDz, Vector& Rho, Vector& DRhoDx, Vector& DRhoDy,
        Vector& DRhoDz)
{
    compute(Phi, D_sigma, Phi, 1.f, Rho);
    if (DFAType == 1) {
        compute(Phi, D_sigma, DPhiDx, 2.f, DRhoDx);
        compute(Phi, D_sigma, DPhiDy, 2.f, DRhoDy);
        compute(Phi, D_sigma, DPhiDz, 2.f, DRhoDz);
    }
}

@chtz提出的第一个优化是为此缓存矩阵乘法. Don't use auto,如Eigen文档中所述。

namespace {
void compute(const Matrix& PhiD, const Matrix& DPhi, float factor, Vector& Rho)
{
    Rho = PhiD.cwiseProduct(DPhi).rowwise().sum() * factor;
}
} /* namespace anonymous */

void computeFVarsSigma(const int DFAType, const Matrix& D_sigma,
        const Matrix& Phi, const Matrix& DPhiDx, const Matrix& DPhiDy,
        const Matrix& DPhiDz, Vector& Rho, Vector& DRhoDx, Vector& DRhoDy,
        Vector& DRhoDz)
{
    const Matrix PhiD = Phi * D_sigma;
    compute(PhiD, Phi, 1.f, Rho);
    if (DFAType == 1) {
        compute(PhiD, DPhiDx, 2.f, DRhoDx);
        compute(PhiD, DPhiDy, 2.f, DRhoDy);
        compute(PhiD, DPhiDz, 2.f, DRhoDz);
    }
}

现在在我的系统上这是3. 15倍的速度。
第二步是减少执行块操作所需的内存量,其思想非常简单:我们受到内存带宽的限制,特别是矩阵-矩阵乘积比较“薄”,而且它对后面的步骤也有帮助。
这里我选择了一个384行的块大小。我的经验法则是输入和输出应该适合L2缓存(128-256 kB,可能由2个线程共享),并且它应该是16的倍数,以便在整个板上进行良好的矢量化。384 rows * 42 columns * 4 byte per float = 64 kiB。根据其他标量类型的需要进行调整,但从我的测试来看,它实际上不是很敏感。
注意使用Eigen::Ref或适当的模板以避免复制,正如我在compute helper函数中所做的那样。

namespace {
void compute(const Matrix& PhiD, const Eigen::Ref<const Matrix>& DPhi,
             float factor, Eigen::Ref<Vector> Rho)
{
    Rho = PhiD.cwiseProduct(DPhi).rowwise().sum() * factor;
}
} /* namespace anonymous */

void computeFVarsSigma(const int DFAType, const Matrix& D_sigma,
        const Matrix& Phi, const Matrix& DPhiDx, const Matrix& DPhiDy,
        const Matrix& DPhiDz, Vector& Rho, Vector& DRhoDx, Vector& DRhoDy,
        Vector& DRhoDz)
{
    const Eigen::Index n = Phi.rows(), blocksize = 384;
    Rho.resize(n);
    if(DFAType == 1)
        for(Vector* vec: {&DRhoDx, &DRhoDy, &DRhoDz})
            vec->resize(n);
    Matrix PhiD;
    for(Eigen::Index i = 0; i < n; i += blocksize) {
        const Eigen::Index cur = std::min(blocksize, n - i);
        PhiD.noalias() = Phi.middleRows(i, cur) * D_sigma;
        compute(PhiD, Phi.middleRows(i, cur), 1.f, Rho.segment(i, cur));
        if (DFAType == 1) {
            compute(PhiD, DPhiDx.middleRows(i, cur), 2.f,
                    DRhoDx.segment(i, cur));
            compute(PhiD, DPhiDy.middleRows(i, cur), 2.f,
                    DRhoDy.segment(i, cur));
            compute(PhiD, DPhiDz.middleRows(i, cur), 2.f,
                    DRhoDz.segment(i, cur));
        }
    }
}

这是又一次加速,提高了1.75倍。
现在我们有了这个,我们可以很容易地并行化。Eigen可以在内部并行化矩阵-矩阵乘法,但不能在外部并行化其余的,所以我们都在外部并行化。分块版本工作得更好,因为它可以让所有线程一直处于忙碌状态,而且它可以更好地利用系统的L2缓存容量。用-fopenmp编译

namespace {
void compute(const Matrix& PhiD, const Eigen::Ref<const Matrix>& DPhi,
             float factor, Eigen::Ref<Vector> Rho)
{
    Rho = PhiD.cwiseProduct(DPhi).rowwise().sum() * factor;
}
} /* namespace anonymous */

void computeFVarsSigma(const int DFAType, const Matrix& D_sigma,
        const Matrix& Phi, const Matrix& DPhiDx, const Matrix& DPhiDy,
        const Matrix& DPhiDz, Vector& Rho, Vector& DRhoDx, Vector& DRhoDy,
        Vector& DRhoDz)
{
    const Eigen::Index n = Phi.rows(), blocksize = 384;
    Rho.resize(n);
    if(DFAType == 1)
        for(Vector* vec: {&DRhoDx, &DRhoDy, &DRhoDz})
            vec->resize(n);
#   pragma omp parallel
    {
        Matrix PhiD;
#       pragma omp for nowait
        for(Eigen::Index i = 0; i < n; i += blocksize) {
            const Eigen::Index cur = std::min(blocksize, n - i);
            PhiD.noalias() = Phi.middleRows(i, cur) * D_sigma;
            compute(PhiD, Phi.middleRows(i, cur), 1.f, Rho.segment(i, cur));
            if (DFAType == 1) {
                compute(PhiD, DPhiDx.middleRows(i, cur), 2.f,
                        DRhoDx.segment(i, cur));
                compute(PhiD, DPhiDy.middleRows(i, cur), 2.f,
                        DRhoDy.segment(i, cur));
                compute(PhiD, DPhiDz.middleRows(i, cur), 2.f,
                        DRhoDz.segment(i, cur));
            }
        }
    }
}

有趣的是,这并没有给我的系统带来巨大的好处,在8核/ 16线程的情况下,只有1.25倍的好处。我还没有调查实际的瓶颈是什么。我猜是我的主内存带宽。每核带宽较低和/或每节点带宽较高的系统(Xenons、Threadrippers)可能会受益更多。
最后一个建议,但那是情境:转置Phi和DPhiDx/y/z矩阵。这允许对以列为主的矩阵(例如used by Eigen)进行两个进一步的优化:
1.一般的矩阵-矩阵乘法以A.transpose() * B的形式表示时速度最快,将Phi中的元素转置后可以写成PhiD = D_sigma.transpose() * Phi
1.列缩减比行缩减快,但列数非常少的情况除外,如MatrixX4f

namespace {
void compute(const Matrix& PhiD, const Eigen::Ref<const Matrix>& DPhi,
             float factor, Eigen::Ref<Vector> Rho)
{
    Rho = PhiD.cwiseProduct(DPhi).colwise().sum() * factor;
}
} /* namespace anonymous */

void computeFVarsSigma(const int DFAType, const Matrix& D_sigma,
        const Matrix& Phi, const Matrix& DPhiDx, const Matrix& DPhiDy,
        const Matrix& DPhiDz, Vector& Rho, Vector& DRhoDx, Vector& DRhoDy,
        Vector& DRhoDz)
{
    const Eigen::Index n = Phi.cols(), blocksize = 384;
    Rho.resize(n);
    if(DFAType == 1)
        for(Vector* vec: {&DRhoDx, &DRhoDy, &DRhoDz})
            vec->resize(n);
#   pragma omp parallel
    {
        Matrix PhiD;
#       pragma omp for nowait
        for(Eigen::Index i = 0; i < n; i += blocksize) {
            const Eigen::Index cur = std::min(blocksize, n - i);
            PhiD.noalias() = D_sigma.transpose() * Phi.middleCols(i, cur);
            compute(PhiD, Phi.middleCols(i, cur), 1.f, Rho.segment(i, cur));
            if (DFAType == 1) {
                compute(PhiD, DPhiDx.middleCols(i, cur), 2.f,
                        DRhoDx.segment(i, cur));
                compute(PhiD, DPhiDy.middleCols(i, cur), 2.f,
                        DRhoDy.segment(i, cur));
                compute(PhiD, DPhiDz.middleCols(i, cur), 2.f,
                        DRhoDz.segment(i, cur));
            }
        }
    }
}

这又带来了1.14倍的加速。如果内部维度从42增长到接近100或1000,并且如果上面的瓶颈不那么明显,我会假设一些更大的优势。

通过分解改进

对于(Phi * D_sigma).cwiseProduct(Phi).rowwise().sum()的情况,可以使用一个简单的技巧:
p为Phi的行向量,SD_sigmad为这一行的标量结果。

d = p * S * p'

如果S是半正定的,我们可以使用LDLT分解:

S = P' * L * D * L' * P

分解为置换矩阵X1 M15 N1 X、下三角矩阵X1 M16 N1 X和对角矩阵X1 M17 N1 X。
由此得出:

d = p * P' * L * D * L' * P * p'
d = (p * P') * (L * sqrt(D)) * (sqrt(D) * L') * (P * p')
d = ||(P * p) * (L * sqrt(D))||^2

(P * p)是一个简单的置换。(L * sqrt(D))是另一个快速而简单的运算,因为D只是一个对角矩阵。(P * p)向量与(L * sqrt(D))矩阵的最终乘法也比以前便宜,因为L是一个三角矩阵。所以你可以使用Eigen的triangularView<Eigen::Lower>来保存运算。
由于分解可能会失败,您必须提供原始方法作为后备。

izkcnapc

izkcnapc2#

设M=402264,N=42,那么Phi*D_sigma乘积需要M*N² FMA运算,cwiseProduct需要和M*N FMA运算。如果你只计算Phi * D_sigma一次,你可以保护一些重要的工作,但你需要实际评估结果,例如:

Matrix PhiD = Phi * D_sigma;  // DO NOT USE `auto` HERE!
 Rho = PhiD.cwiseProduct(Phi).rowwise().sum();
 if(...) // etc

相关问题