kafka使用者未使用java中的所有记录

odopli94  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(373)

我编写了一个kafka消费者从主题中获取所有记录,然后只移动到下一步,但它没有获取所有记录。

while (fetchedRecord) {

            ConsumerRecords<String, String> records = consumer.poll(10);
            System.out.println("Waiting for Records from Party-Resolved-Update");
            Thread.sleep(40000);
            for (ConsumerRecord<String, String> record : records) {

                System.out.println("Record fetched");
                end++;
                System.out.println(record.value());
                if (!StringUtils.isEmpty(record.value())) {
                    fetchedRecord = false;
                    response = record.value();

                    FSecurity sc = new FSecurity();
                    sc.init();
                    decryptResponse.add(sc.decryptData(response));
                    System.out.println("decryptResponse=" + decryptResponse);
                }

            }

        }
ao218c7q

ao218c7q1#

请正确描述你的问题。
假设您不能使用下一组记录。
解决方案要么提供 enable.auto.commit 作为 true ,或添加 consumer.commitSync() 语句。这将允许您轮询下一个偏移记录。

相关问题