mongodb 在节点应用程序中序列化SQS fifo队列的消费者

8cdiaqws  于 8个月前  发布在  Go
关注(0)|答案(2)|浏览(81)

我试图在SQS队列的消费者中实现序列化,SQS队列在多个nodeJs示例中运行。也就是说,我试图做的是确保内容以相同的顺序完成,一次一件事。我最初的想法是在documentDb(mongo)中放置某种可能使用锁文档的信号量机制,这是服务的主存储库,通过使用变更流机制等待对该文档的更改。每个示例都有自己的标识符,锁文件将包含拥有锁的人的ID。不确定这是否是最好的方法,所以我想问,是否有更好的方法来实现这一点?或者稍微修改一下就可以了?我主要担心的是可能会挨饿的情况,这将违背所有这一切的目的,也可以放置一个顺序计数器,每个示例跟踪它们在顺序中的距离,并且当它们试图获取锁时,它们可以将它们的顺序计数器与锁上有什么,如果他们落后了,就会继续等待。虽然这看起来很脆弱,如果一个示例没有完成序列步骤就死亡了,可能会拖延一切。有什么想法吗?
代码看起来像这样

const LockSchema = new Schema({
  id: String,
  owner: String,
  createdAt: { type: Date, required: true, default: Date.now },
});

LockSchema.index({ createdAt: 1 }, { expireAfterSeconds: 600 });

const Lock = mongoose.model("queue-lock", LockSchema, "locks");

const filter = [{
    $match: {
            { operationType: "delete" }
        }
    }];

const options = { fullDocument: 'updateLookup' };

let lockStream = Lock.watch(filter, options)

lockStream.on('change', event => {
  if (!event.fullDocument) {// deleted
    result = await Lock.findOneAndUpdate(
      { id: "migration-lock" },
      { $setOnInsert: { owner: whoAmI } },
      { upsert: true, new: true }
    ).lean();
    if (result.owner === whoAmI) {
      // proud owner of lock, do stuff
      await Lock.deleteOne({ id: "queue-lock" }); // when done

    } else {
      // keep waiting
    }
  }

});

字符串

qoefvg9y

qoefvg9y1#

我想做的是确保这些东西是按照它们进来的顺序来做的,一次做一件事。
可以使用FIFO queues
发送到FIFO队列中同一组的消息将按照发送时的顺序读取。在已请求的消息从队列中删除之前,不会再发送来自同一组的消息。
我主要担心的是示例可能会变得饥饿,这将破坏所有这些的目的,因为如果幸运的话,一个示例可能会连续两次获得锁。
您是否关心消息的顺序或哪个示例处理什么?
同一队列中同一组中的消息假定什么处理它们无关紧要。
如果你有本质上不同的示例(具有不同的计算能力,不同的规格等),那么你应该将你的消息路由到不同的队列中。
如果处理的顺序很重要,那么您应该向消息引入一个偶然的依赖关系(仅在处理了后续处理消息的发起者之后才发布后续处理消息)。

d4so4syb

d4so4syb2#

对于任何人正面临着同样的问题,我将张贴在答案部分(因为它对我有用).经过更多的研究,我决定利用一个名为node-raft-redis的现有npm包它让你有示例决定在它们之间投票,并选出一个“领导者”,然后在我的情况下,它将是唯一一个消费消息的示例.这些示例做的工作比这多得多,因此,如果一个示例比其他示例做得多一点,那么它将使事情发生很小的偏差。

相关问题