nestjs-kafka有效负载过滤

643ylb08  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(572)

我将nestjs与kafkajs结合使用,我有两个微服务订阅相同的eventpattern,但是其中一个微服务只需要eventpattern的单一负载类型,例如:
事件模式::点

value: {
    jobType: "job1",
    otherStuff: {}
}

value: {
    jobType: "job2",
    otherStuff: {}
}

value: {
    jobType: "job3",
    otherStuff: {}
}

然而,在microservice1中,我只关心dothing->job1
需要dothing->job2、job3等。
有没有一种方法可以让我把这个过滤放进nest,而不是一个简单的if一旦消费完成。
欢迎任何建议。

kgsdhlau

kgsdhlau1#

看起来所有eventpattern对象都属于一个主题。
为不同的微服务维护不同的使用者组,并让微服务决定要处理的消息(使用kafkajs,示例如下..)

const consumer = kafka.consumer({ groupId: 'microservice-1' })
await consumer.connect()
await consumer.subscribe({ topic: 'topic-A' })
await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
            if( shouldProcess(message.value) ) {
                // process them
            }
        })
    },
})

您可能需要将消息转换为json并检查 event.jobType 并进行相应的处理。
如果你想避免 if 对于每种作业类型,都有一个微服务感兴趣的作业类型的白名单(或者)使用正则表达式来匹配作业类型。
对另一个微服务重复相同的操作,除了 group.id

相关问题