Kafka苏美尔读取所有记录

f45qwnt8  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(246)

我想测试一个Kafka的例子:制作人:

object ProducerApp extends App {

val topic = "topicTest"
val  props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
val producer = new KafkaProducer[String, String](props)
for(i <- 0 to 20)
{    
val record = new ProducerRecord(topic, "key "+i," value "+i)    
producer.send(record)    
Thread.sleep(100)    
}
}

消费者:

object ConsumerApp extends App {
val topic = "topicTest"  
val properties = new Properties
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer")  
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")  
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
val consumer = new KafkaConsumer[String, String](properties)  
consumer.subscribe(scala.List(topic).asJava)    
while (true) {
val records:ConsumerRecords[String,String] = consumer.poll(200)
println("records size "+records.count())    
}  
}

主题“topictest”是用一个分区创建的。
预期结果是:

records size 21
records size 21
records size 21
records size 21
...

但得到的结果是:

records size 21
records size 0
records size 21
records size 0
records size 21
records size 0
...

消费者轮流阅读记录。我想知道原因。谢谢您

ahy6op9u

ahy6op9u1#

在我看来,你看到这种行为的原因可能是你设置的短暂超时 poll() . 超时 200 毫秒可能不足以重试内部 pollOnce() (链接)偏移重置后。一次之后消费者超时 pollOnce() 它需要偏移量重置,并且不返回任何记录。在应用程序的下一个循环中 poll() 所谓的偏移量已经是我们需要的了。所以呢 200 毫秒可能足以检索记录。在第三次呼叫时,上述行为重复。
请注意,调用 seekToBeginning() 不会立即重置偏移量,但会将分区标记为偏移量重置。下一次调用 poll() 或者 position() .
试着增加压力 poll() 超时,希望能消除 record size 0 你得到的结果。

相关问题