为什么csr_matrix片A[index,:]在scipy中的速度这么快?

pkmbmrz7  于 2022-11-10  发布在  其他
关注(0)|答案(2)|浏览(154)

现在,我需要在C++中使用Eigen3.3.8来实现稀疏矩阵切片,但是我无法像Python中的scipy那样快速地实现。
例如,A是一个形状稀疏矩阵(1000000,1000000),index是我想从A中选择的列表。
当我这样写的时候:

A=A[index,:]

它花费了0.5114s。
当我这样写的时候:

def extractor(indices,N)::
    indptr=np.arange(len(indices)+1)
    data=np.ones(len(indices))
    shape=(len(indices),N)
    return sparse.csr_matrix((data,indices,indptr), shape=shape)
A=extractor(index,A.shape[0])*A

它花费3.381s。(这个想法来自Sparse matrix slicing using list of int)。
那么如何实现快速的CSR矩阵切片呢?

n3h0vuf2

n3h0vuf21#

对于一个更小的矩阵,我的提取器代码会更慢,但只有2倍。

In [4]: M = sparse.random(1000,1000,.1,'csr')

In [5]: M
Out[5]: 
<1000x1000 sparse matrix of type '<class 'numpy.float64'>'
    with 100000 stored elements in Compressed Sparse Row format>

In [6]: idx = np.arange(100)

In [7]: M[idx,:]
Out[7]: 
<100x1000 sparse matrix of type '<class 'numpy.float64'>'
    with 10116 stored elements in Compressed Sparse Row format>

In [8]: timeit M[idx,:]
173 µs ± 514 ns per loop (mean ± std. dev. of 7 runs, 10,000 loops each)

提取器:

In [10]: def extractor(indices,N):
    ...:     indptr=np.arange(len(indices)+1)
    ...:     data=np.ones(len(indices))
    ...:     shape=(len(indices),N)
    ...:     return sparse.csr_matrix((data,indices,indptr), shape=shape)

nnz匹配:

In [13]: extractor(idx, M.shape[0])*M
Out[13]: 
<100x1000 sparse matrix of type '<class 'numpy.float64'>'
    with 10116 stored elements in Compressed Sparse Row format>

稍微慢一点,但没有你的例子那么快。

In [14]: timeit extractor(idx, M.shape[0])*M
302 µs ± 1.02 µs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)

这比我5年前测试的要大。
因为我还有其他事情要做,所以我不打算尝试测试其他情况--更大的M和更长的idx
欢迎您探索sparse代码,看看他们是否改变了计算,可能以各种方式简化了它,我的重建没有捕捉到。

kt06eoxx

kt06eoxx2#

正如我之前在注解中提到的,你列出的两个方法是不等价的。带有索引列表的A[index]改变了输出的形状。它还可以置换或复制行。这可以像下面这样相当快地实现:

using CsrMatrixD = Eigen::SparseMatrix<double, Eigen::RowMajor>;

CsrMatrixD rowsFromCsr(const Eigen::Ref<const CsrMatrixD>& in,
                       const Eigen::Ref<const Eigen::VectorXi>& indices)
{
  using Triplet = Eigen::Triplet<double>;
  using InnerIterator = Eigen::Ref<const CsrMatrixD>::InnerIterator;
  std::vector<Triplet> triplets;
  int outrow = 0;
  for(int row: indices) {
    for(InnerIterator nonzero(in, row); nonzero; ++nonzero)
      triplets.emplace_back(outrow, nonzero.col(), nonzero.value());
    ++outrow;
  }
  CsrMatrixD rtrn(outrow, in.cols());
  rtrn.setFromTriplets(triplets.begin(), triplets.end());
  return rtrn;
}

如果索引是排序的,运行速度会更快,但在其他排列方式下仍然可以工作。InnerIterator可能是Eigen-3.4中的新功能。
第二个选项是与对角矩阵相乘,它保留了元素的原始形状和顺序。它可以像这样完成:

CsrMatrixD rowsFromCsr(const Eigen::Ref<const CsrMatrixD>& in,
                       const Eigen::Ref<const Eigen::VectorXi>& indices)
{
    Eigen::VectorXd diag = Eigen::VectorXd::Zero(in.rows());
    for(int i: indices)
      diag[i] = 1.;
    return diag.asDiagonal() * in;
}

这仍然是相当快的。遗憾的是,似乎没有一个版本使用稀疏向量作为对角线。在我的测试中,使用prune方法更快:

CsrMatrixD rowsFromCsr(const Eigen::Ref<const CsrMatrixD>& in,
                       const Eigen::Ref<const Eigen::VectorXi>& indices)
{
  std::vector<bool> bitmap(in.rows());
  for(int row: indices)
    bitmap[row] = true;
  CsrMatrixD rtrn = in;
  rtrn.prune([&bitmap](int row, int /*col*/, double /*value*/) noexcept -> bool {
    return bitmap[row];
  });
  rtrn.makeCompressed();
  return rtrn;
}

相关问题