我需要阅读来自kafka主题的所有消息,然后处理并退出(不需要永远像守护进程一样运行)。我写了一个如下的代码,如果topic中有消息,它就起作用了,如果topic是空的(或者没有提到组id的新消息),它会等到下一条消息到达,如果没有消息可以处理,我需要立即退出。请看看我的代码,并建议如果有任何更好的方法来实现这一点。我使用的是ruby kafka 1.3.0 gem
require 'kafka'
khost = 'xxx.xxx.xxx.xxx'
kport = 'xxxx'
kafka = Kafka.new(["#{khost}:#{kport}"] )
consumer = kafka.consumer(group_id: "my-consumer")
consumer.subscribe("my-topic")
consumer.each_batch do |batch|
$msg = batch
consumer.stop # stop after reading first batch
end
# Process messages here
$msg.messages.each do |message|
puts message.value
end
我也找到了一个方法 kafka.fetch_messages
但是,我没有找到一个选项来维护 group_id
并跟踪已处理的消息,而不添加其他代码。
暂无答案!
目前还没有任何答案,快来回答吧!