使用MongoDB Streams监视文档创建

5gfr0r5j  于 2022-11-22  发布在  Go
关注(0)|答案(2)|浏览(220)

我想使用MongoDB流在每次将特定类型的数据插入到集合中时触发一个事件。
我已经找到了一些与我所寻找的内容大致相似的东西,但这只适用于变更流,而不适用于插入。
你知道我该怎么做吗
我使用Mongodb驱动程序和Nodejs来完成这一工作,因此我的代码如下所示:

const MongoClient = require('mongodb').MongoClient;

const uri = 'mongodb://localhost:27017/?replicaSet=rs0';
MongoClient.connect(uri, function(err, client) {

const db = client.db('mydb');

// Connect using MongoClient
var filter = [{
    $match: {
        $or: [
            { $or: [{"receipt.receiver": "newdexpocket"}, {"act.account": "newdexpocket"}] }]
    }
}];

var options = { fullDocument: 'updateLookup' };
db.collection('somecollection').watch(filter, options).on('create', data => 
  {
    console.log(data);
  });
});
  • 是否需要在筛选器中指定operationType
  • 我还需要获取fullDocument,但显然updateLookup不是合适的工具,我应该使用什么?
  • 我可以为on事件使用哪些选项?我使用了create,但我甚至不确定它是否存在,是吗?

很抱歉所有这些问题,但我正在努力找到一些答案,在官方文件。
解决方案:
注意不要忘记在您的请求中的fullDocument;- )

function watch_insert(con, db, coll) {
  console.log(new Date() + ' watching: ' + coll);

  const insert_pipeline = [ { $match:
                    { 
                        operationType: 'insert',
                        $or: [
                            { "fullDocument.receipt.receiver": "newdexpocket" },
                            { "fullDocument.act.account": "newdexpocket" }
                        ]
                    }
                }];

  con.db(db).collection(coll).watch(insert_pipeline)
    .on('change', data => {
      console.log(data)
    });
}

async function run(uri) {
    try {
        con = await MongoClient.connect(uri, {"useNewUrlParser": true});
        watch_insert(con, 'EOS', 'action_traces');
    } catch (err) {
        console.log(err);
    }
}
gojuced7

gojuced71#

您需要:
1.指定operationType: 'insert'。因为您不想监视更新,所以不需要updateLookup
1.为包含operationType的筛选器创建一个正确的aggregation pipeline
1.汇总管缐会筛选watch()传回的文件。“变更事件”页面中有一个输出范例。
watch()会传回ChangeStream。它会引发closechangeenderror事件。如需详细信息,请参阅ChangeStream
下面是一个完整的更改流示例,它侦听数据库test集合test上的insert操作。它将输出包含字段{a: 1}'fullDocument.a': 1)的文档,并将忽略更新、a的其他值的插入或没有字段a的任何内容。

const MongoClient = require('mongodb').MongoClient
const uri = 'mongodb://localhost:27017/test?replicaSet=replset'

const insert_pipeline = [
  {$match: {operationType: 'insert', 'fullDocument.a': 1}}
]

function watch_insert(con, db, coll) {
  console.log(new Date() + ' watching: ' + coll)
  con.db(db).collection(coll).watch(insert_pipeline)
    .on('change', data => {
      console.log(data)
    })
}

async function run() {
  con = await MongoClient.connect(uri, {"useNewUrlParser": true})
  watch_insert(con, 'test', 'test')
}

run()
8xiog9wr

8xiog9wr2#

# 适用 于 macOS 用户 的 解决 方案 , 使用 自制 软件

你 可以 通过 寻找 一些 要点 的 解决 方案 来 适应 你 的 变体
1.停止 mongo 服务 :x1月 1 日
1.编辑 /opt/homebrew/etc/mongod.conf
1.添加 此行 :

replication:
       replSetName: "rs0"

中 的 每 一 个
1.保存 文件 并 打开 mongo shell :mongo
1.执行 此 指令 :rs.initiate()
1.退出 mongo shell :exit
1.启动 mongo 服务 :brew services start mongodb-community
1.在 mongo 客户 端 配置 中 , 添加 以下 选项 :replicaSet=rs0
并且 " The $changeStream stage is only supported on replica sets because I'm not using a replicaSet " 错误 将 消失 !

相关问题