MongoDB NodeJS process hangs after processing a large number of records

isr3a4wc · posted 2023-05-17 in Go

I have a NodeJS process with the code snippet below. I am trying to cache 450k transactions in memory (transactionObj).

const transactionObj = {};
const transactionCursor = target_db.collection("transaction").find();
let processTxns = 0;

for await (const transactionDoc of transactionCursor) {
  processTxns++;
  if (transactionDoc.orderId && transactionDoc.orderId[0]) {
    const orderIdString = String(transactionDoc.orderId[0].oid);
    if (!orderIdString) continue;
    // Keep only the most recent transaction document per orderId
    const existingDoc = transactionObj[orderIdString];
    if (!existingDoc || existingDoc._id < transactionDoc._id) {
      transactionObj[orderIdString] = transactionDoc;
    }
  }
  console.log("Process Txns: " + processTxns);
}

When I execute this code, the process hangs after processing a certain number of records, which varies from run to run by around 16K or 20K. I tried using a Map instead of a plain object (roughly as sketched after this paragraph), but got the same behavior. I also tried passing the argument

node --max-old-space-size=6144

but the problem persists. This code runs on a Windows server with 32GB of RAM, and I do not see heavy memory usage while the code runs (less than 50%).
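
The Map variant I tried looked roughly like this (a sketch of the same loop; the only intended change is the container type):

const transactionMap = new Map(); // Map instead of a plain object, same dedup logic
const transactionCursor = target_db.collection("transaction").find();
let processTxns = 0;

for await (const transactionDoc of transactionCursor) {
  processTxns++;
  if (transactionDoc.orderId && transactionDoc.orderId[0]) {
    const orderIdString = String(transactionDoc.orderId[0].oid);
    if (!orderIdString) continue;
    const existingDoc = transactionMap.get(orderIdString);
    if (!existingDoc || existingDoc._id < transactionDoc._id) {
      transactionMap.set(orderIdString, transactionDoc);
    }
  }
}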

kqlmhetl 1#

If these operations are memory-intensive, you should process them in chunks. One option is to implement pagination in the query, fetching a limited number of transactions at a time and processing them.

const CHUNK_SIZE = 10;

async function* readChunksFromDB() {
  const numberOfTransactions = await target_db
    .collection("transaction")
    .countDocuments();
  const numberOfChunks = Math.ceil(numberOfTransactions / CHUNK_SIZE);

  for (let i = 0; i < numberOfChunks; i++) {
    // skip()/limit() paginate the query; toArray() materializes one chunk
    const chunk = await target_db
      .collection("transaction")
      .find()
      .skip(i * CHUNK_SIZE)
      .limit(CHUNK_SIZE)
      .toArray();

    yield chunk;
  }
}

async function processChunks() {
  const transactionObj = {};
  let processTxns = 0;
  for await (const chunk of readChunksFromDB()) {
    for (const transactionDoc of chunk) {
      processTxns++;
      if (transactionDoc.orderId && transactionDoc.orderId[0]) {
        const orderIdString = String(transactionDoc.orderId[0].oid);
        if (!orderIdString) continue;
        const existingDoc = transactionObj[orderIdString];
        if (!existingDoc || existingDoc._id < transactionDoc._id) {
          transactionObj[orderIdString] = transactionDoc;
        }
      }
      console.log("Process Txns: " + processTxns);
    }
  }
}
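
As a quick usage sketch (the invocation and error handling below are illustrative, not part of the original code), the chunked processing would be kicked off like this:

processChunks()
  .then(() => console.log("All chunks processed"))
  .catch((err) => console.error("Chunk processing failed:", err));

Note that skip-based pagination re-scans the skipped documents for every chunk, so on a very large collection a range query on _id (keyset pagination) may scale better; the generator above simply follows the skip/limit pattern.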

Alternatively, use Streams in a pipeline, which come with a built-in backpressure mechanism. Streams are the recommended NodeJS solution for working with large amounts of data:

const { Readable, Transform } = require("stream");

const transactionObj = {};

// Readable.from() on the async generator yields each chunk (an array of
// documents) in object mode, so the Transform must also run in objectMode
// instead of receiving Buffers.
const transactionsStream = Readable.from(readChunksFromDB());
const processTransactionStream = new Transform({
  objectMode: true,
  transform(transactions, encoding, callback) {
    for (const transactionDoc of transactions) {
      if (transactionDoc.orderId && transactionDoc.orderId[0]) {
        const orderIdString = String(transactionDoc.orderId[0].oid);
        if (orderIdString) {
          // Keep only the most recent transaction document per orderId
          const existingDoc = transactionObj[orderIdString];
          if (!existingDoc || existingDoc._id < transactionDoc._id) {
            transactionObj[orderIdString] = transactionDoc;
          }
        }
      }
      // Forward each processed document downstream
      this.push(transactionDoc);
    }

    callback();
  },
});

transactionsStream.pipe(processTransactionStream).on("data", (transactionDoc) => {
  console.log(transactionDoc);
});

In both cases you will have to maintain a separate structure for the deduplication, but you will gain a lot in terms of performance.
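
For example, once processing finishes, the deduplicated result can be read out of the accumulator (a minimal sketch, assuming transactionObj is kept in a scope that outlives the processing loop, as in the stream example above):

// transactionObj holds one (latest) document per orderId
const dedupedTransactions = Object.values(transactionObj);
console.log(`Unique orders cached: ${dedupedTransactions.length}`);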

https://blog.appsignal.com/2022/02/02/use-streams-to-build-high-performing-nodejs-applications.html
