Kafka消费者进入一个无休止的循环

6xfqseft  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(346)

我使用kafka队列来保存消费者应用程序要检索的一些对象,并对其执行一些操作。
问题:如果消费者的处理时间超过2小时,Kafka似乎会一次又一次地回馈同一个物体
代码:

private static Queue queue = new LinkedList();
  while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("\n\n\n Kafka has :[" + record.offset());
                queue.add(record.value());
            }
            System.out.println("\n\n\n Kafka has :[" + records.count());
            if (queue != null) {
                maintainQueue();
            }
        }
6ojccjat

6ojccjat1#

我使用的是Kafka版本0.10.1.0
我们已通过更新 ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to "false" 和添加 consumer.commitSync(); 之后 consumer.poll(Long.MAX_VALUE); 参考文献:
kafka-如何使用高级使用者提交每条消息之后的偏移量?
http://www.slideshare.net/jjkoshy/offset-management-in-kafka

相关问题