我想测试一个Kafka的例子:制作人:
object ProducerApp extends App {
val topic = "topicTest"
val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
val producer = new KafkaProducer[String, String](props)
for(i <- 0 to 20)
{
val record = new ProducerRecord(topic, "key "+i," value "+i)
producer.send(record)
Thread.sleep(100)
}
}
消费者:
object ConsumerApp extends App {
val topic = "topicTest"
val properties = new Properties
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer")
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
val consumer = new KafkaConsumer[String, String](properties)
consumer.subscribe(scala.List(topic).asJava)
while (true) {
val records:ConsumerRecords[String,String] = consumer.poll(200)
println("records size "+records.count())
}
}
主题“topictest”是用一个分区创建的。
预期结果是:
records size 21
records size 21
records size 21
records size 21
...
但得到的结果是:
records size 21
records size 0
records size 21
records size 0
records size 21
records size 0
...
消费者轮流阅读记录。我想知道原因。谢谢您
1条答案
按热度按时间ahy6op9u1#
在我看来,你看到这种行为的原因可能是你设置的短暂超时
poll()
. 超时200
毫秒可能不足以重试内部pollOnce()
(链接)偏移重置后。一次之后消费者超时pollOnce()
它需要偏移量重置,并且不返回任何记录。在应用程序的下一个循环中poll()
所谓的偏移量已经是我们需要的了。所以呢200
毫秒可能足以检索记录。在第三次呼叫时,上述行为重复。请注意,调用
seekToBeginning()
不会立即重置偏移量,但会将分区标记为偏移量重置。下一次调用poll()
或者position()
.试着增加压力
poll()
超时,希望能消除record size 0
你得到的结果。