我在写一封信 kafka
使用java的消费者。我想保持消息的实时性,所以如果有太多消息等待消费,比如1000条或更多,我应该放弃未消费的消息,从最后一个偏移开始消费。
对于这个问题,我尝试比较上一个提交的偏移量和一个主题的结束偏移量(只有一个分区),如果这两个偏移量之间的差异大于一定的量,我会将主题的上一个提交的偏移量设置为下一个偏移量,这样我就可以放弃那些冗余的消息。
现在我的问题是如何得到一个主题的结尾偏移量,有人说我可以用老消费者,但是太复杂了,新消费者有这个功能吗?
我在写一封信 kafka
使用java的消费者。我想保持消息的实时性,所以如果有太多消息等待消费,比如1000条或更多,我应该放弃未消费的消息,从最后一个偏移开始消费。
对于这个问题,我尝试比较上一个提交的偏移量和一个主题的结束偏移量(只有一个分区),如果这两个偏移量之间的差异大于一定的量,我会将主题的上一个提交的偏移量设置为下一个偏移量,这样我就可以放弃那些冗余的消息。
现在我的问题是如何得到一个主题的结尾偏移量,有人说我可以用老消费者,但是太复杂了,新消费者有这个功能吗?
6条答案
按热度按时间xhv8bpkk1#
我开发了下面的代码来获取偏移量状态
vd8tlhqk2#
自kafka1.0.1以来,消费者有一个称为endoffsets的方法
public java.util.map endoffset(java.util.collection分区)
如果你需要完整的代码,请告诉我。。
请参考apache-kafka-1.0.1-javadoc
nbysray53#
上面的代码段返回给定主题和分区号的当前提交的消息偏移量。
hc8w905p4#
新的消费者也很复杂。
//assign the topic consumer.assign();
//seek to end of the topic consumer.seekToEnd();//the position is the latest offset consumer.position();
s1ag04yj5#
也可以使用kafka服务器命令行工具:
输出的格式为
<topicName>:<partitionID>:<offset>
,例如。t1:0:0
,请参见https://jaceklaskowski.gitbooks.io/apache-kafka/kafka-tools-getoffsetshell.html 更多细节。yruzcnhs6#
对于Kafka版本:0.10.1.1