kafka日志中缺少偏移量-简单使用者无法继续

5cnsuln7  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(334)

我有一个3节点Kafka集群设置。我用暴风来读Kafka的留言。我的系统中的每个主题都有7个分区。
现在我面临一个奇怪的问题。直到三天前,一切正常。但是,现在看来我的storm拓扑无法从2个分区读取具体的数据-#1和#4。
我试图深入研究这个问题,发现在我的kafka日志中,对于这两个分区,都缺少一个偏移量,即在5964511之后,下一个偏移量是5964513,而不是5964512。
由于缺少偏移量,简单使用者无法进行下一个偏移量。我是做错了什么还是一个已知的错误?
这种行为的原因可能是什么?
我使用以下代码读取有效偏移量的窗口:

  1. public static long getLastOffset(SimpleConsumer consumer, String topic, int partition,
  2. long whichTime, String clientName) {
  3. TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
  4. Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfoMap = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
  5. requestInfoMap.put(topicAndPartition, new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.LatestTime(), 100));
  6. OffsetRequest request = new OffsetRequest( requestInfoMap, kafka.api.OffsetRequest.CurrentVersion() , clientName);
  7. OffsetResponse response = consumer.getOffsetsBefore(request);
  8. long[] validOffsets = response.offsets(topic, partition);
  9. for (long validOffset : validOffsets) {
  10. System.out.println(validOffset + " : ");
  11. }
  12. long largestOffset = validOffsets[0];
  13. long smallestOffset = validOffsets[validOffsets.length - 1];
  14. System.out.println(smallestOffset + " : " + largestOffset );
  15. return largestOffset;
  16. }

这将提供以下输出:

  1. 4529948 : 6000878

所以,我提供的偏移量在偏移量范围内。

sdnqo3pr

sdnqo3pr1#

很抱歉回答晚了,但是。。。
我为这种情况编写了代码,方法是使用一个长示例var来保存下一个要读取的偏移量,然后在获取之后检查返回的fetchresponse haserror()是否正确。如果有错误,我将下一个偏移值更改为合理的值(可以是下一个偏移或上一个可用偏移),然后重试。

相关问题