ruby kafka阅读所有消息并退出主题

ars1skjm  于 2021-06-04  发布在  Kafka
关注(0)|答案(0)|浏览(238)

我需要阅读来自kafka主题的所有消息,然后处理并退出(不需要永远像守护进程一样运行)。我写了一个如下的代码,如果topic中有消息,它就起作用了,如果topic是空的(或者没有提到组id的新消息),它会等到下一条消息到达,如果没有消息可以处理,我需要立即退出。请看看我的代码,并建议如果有任何更好的方法来实现这一点。我使用的是ruby kafka 1.3.0 gem

require 'kafka'
khost = 'xxx.xxx.xxx.xxx'
kport = 'xxxx'

kafka = Kafka.new(["#{khost}:#{kport}"] )
consumer = kafka.consumer(group_id: "my-consumer")
consumer.subscribe("my-topic")

consumer.each_batch do |batch|
    $msg = batch
    consumer.stop  # stop after reading first batch
end 

# Process messages here

$msg.messages.each do |message|
  puts message.value
end

我也找到了一个方法 kafka.fetch_messages 但是,我没有找到一个选项来维护 group_id 并跟踪已处理的消息,而不添加其他代码。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题