nodejs:kafkajsprotocolerror:组成员支持的协议与现有成员的协议不兼容

eit6fx6z  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(866)

我尝试使用mongodb debezium连接器从kafka捕获数据,但尝试使用kafkajs读取数据时出错:

KafkaJSProtocolError: The group member's supported protocols are incompatible with those of existing members

我正在使用docker图像来捕获数据。
以下是我的步骤:
启动zookeeper

docker run -it --rm --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 debezium/zookeeper:latest

开始Kafka

docker run -it --rm --name kafka -p 9092:9092 --link zookeeper:zookeeper debezium/kafka:latest

我已经用复制模式运行mongodb了
启动debezium kafka连接

docker run -it --rm --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets -e STATUS_STORAGE_TOPIC=my_connect_statuses --link zookeeper:zookeeper --link kafka:kafka  debezium/connect:latest

然后发布mongodb连接器配置

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{ "name": "mongodb-connector", "config": { "connector.class": "io.debezium.connector.mongodb.MongoDbConnector", "mongodb.hosts": "rs0/abc.com:27017", "mongodb.name": "fullfillment", "collection.whitelist": "mongodev.test", "mongodb.user": "kafka", "mongodb.password": "kafka01" } }'

有了这个,如果我运行一个watcher docker容器,我就能够在控制台中以json格式保存数据

docker run -it --name watchermongo --rm --link zookeeper:zookeeper --link kafka:kafka debezium/kafka:0.9 watch-topic -a -k fullfillment.mongodev.test

但我想在应用程序中捕获这些数据,以便对其进行操作、处理并推送到elasticsearch。我用的是

https://github.com/tulios/kafkajs

但是当我运行消费代码时,我得到了一个错误。。下面是代码示例

//'use strict';

// clientId=connect-1, groupId=1

const { Kafka } = require('kafkajs')

const kafka = new Kafka({

  clientId: 'connect-1',

  brokers: ['localhost:9092', 'localhost:9093']

})

// Consuming

const consumer = kafka.consumer({ groupId: '1' })

var consumeMessage = async () => {

await consumer.connect()

await consumer.subscribe({ topic: 'fullfillment.mongodev.test' })

await consumer.run({

  eachMessage: async ({ topic, partition, message }) => {

    console.log({

      value: message.value.toString(),

    })

  },

})

}

consumeMessage();

KafkaJSProtocolError: The group member's supported protocols are incompatible with those of existing members
vwoqyblh

vwoqyblh1#

您不应该在connect和kafkajs消费者中使用相同的groupid。如果你这样做了,他们将是同一个消费群体的一部分,这意味着消息将只被一个或另一个消费,如果它甚至工作。
如果您将kafkajs消费者的groupid更改为独特的内容,它应该可以工作。
请注意,默认情况下,新的kafkajs消费组将从最新偏移量开始消费,因此它不会消费已经生成的消息。可以使用 fromBeginning 中的标志 consumer.subscribe 打电话。看到了吗https://kafka.js.org/docs/consuming#from-开始

相关问题