如何使用kafka节点从一个主题开始获取消息?

xa9qqrwz  于 2021-06-07  发布在  Kafka
关注(0)|答案(3)|浏览(424)

我按照这个教程在ubuntu14.04服务器上安装了kafka。为生产者和消费者提供的例子效果良好。
下面是producer命令:

echo "Hello, World" | ~/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic TutorialTopic > /dev/null

下面是consumer命令:

~/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic TutorialTopic --from-beginning

但是,我尝试在node.js中编写一个类似的使用者,它不会显示在创建使用者之前主题中存在的消息。这是我的密码。需要什么额外的配置来完成我的尝试?

var kafka = require('kafka-node')
var Consumer = kafka.Consumer
var client = new kafka.Client("localhost:2181/")
var consumer = new Consumer(
    client,
    [
      { topic: 'TutorialTopic', partition: 0, offset: 0}
    ],
    {
      fromOffset: true
    }
  );

consumer.on('message', function (message) {
  console.log("received message", message);
});
bq3bfh9z

bq3bfh9z1#

我想现在发生的是以前的事 consumer.on(...) 你的名字叫什么 kafka-node 已经读过了。相反,请尝试以下方法:

var consumer = new Consumer(
  client,
  [],
  {fromOffset: true}
);

consumer.on('message', function (message) {
  console.log("received message", message);
});

consumer.addTopics([
  { topic: 'TutorialTopic', partition: 0, offset: 0}
], () => console.log("topic added"));
pw9qyyiw

pw9qyyiw2#

使用node v6.11.0和kafka node 1.6.2时,上述内容对我来说不起作用
以下代码没有:

var kafka = require('kafka-node'),
    Consumer = kafka.Consumer,
    client = new kafka.Client("localhost:2181/"),
    consumer = new Consumer(
         client,
        [
              { topic: 'TutorialTopic', partition: 0, offset: 0 }
        ],
        { fromOffset: true }         
    );

 consumer.on('message', function (message) 
 {
     console.log(message);
 });

 consumer.on('error', function (err) 
{
    console.log('ERROR: ' + err.toString());
});
vyswwuz2

vyswwuz23#

Add fromOffset to earliest in consumer config  

var consumer = new Consumer(
client,
[
  { topic: 'TutorialTopic', partition: 0, offset: 0}
],
{
  fromOffset: 'earliest',
});

相关问题