我编写了一个kafka消费者从主题中获取所有记录,然后只移动到下一步,但它没有获取所有记录。
while (fetchedRecord) {
ConsumerRecords<String, String> records = consumer.poll(10);
System.out.println("Waiting for Records from Party-Resolved-Update");
Thread.sleep(40000);
for (ConsumerRecord<String, String> record : records) {
System.out.println("Record fetched");
end++;
System.out.println(record.value());
if (!StringUtils.isEmpty(record.value())) {
fetchedRecord = false;
response = record.value();
FSecurity sc = new FSecurity();
sc.init();
decryptResponse.add(sc.decryptData(response));
System.out.println("decryptResponse=" + decryptResponse);
}
}
}
1条答案
按热度按时间ao218c7q1#
请正确描述你的问题。
假设您不能使用下一组记录。
解决方案要么提供
enable.auto.commit
作为true
,或添加consumer.commitSync()
语句。这将允许您轮询下一个偏移记录。