嗨,我最近创建了一个程序来提取和保存数据到Kafka。
为了测试,我把1到20000的数据放在kafka中,用下面的代码读取记录并打印出来。如果没有暂停和恢复,输出在1到20000之间正常。但是当运行pause&resume时,打印中有一个空数字。
在我的程序中经常使用暂停和恢复。所以它似乎影响了偏移值。
你经历过同样的事情吗?
consumer.handler(record -> {
try {
JsonObject json = record.value();
final String value = json.getString("data_source");
System.out.println(value); //value=record.Number
count++;
if (count%1000==0) {
consumer.pause();
System.out.println("Pause & Resume >>>>>>>>");
count=0;
consumer.resume();
}
结果:[1,2,3,(空数),10,11,12,~(空数)~,199981999920000]
暂无答案!
目前还没有任何答案,快来回答吧!