我在scala应用程序中使用kafka。我的代码如下。
object KafkaConsumerSubscribeApp extends App {
val props:Properties = new Properties()
props.put("group.id", "test")
props.put("bootstrap.servers","localhost:9092")
props.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer")
Props.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer")
props.put("auto.offset.reset", "earliest")
props.put("group.id", "consumer-group")
val consumer: KafkaConsumer[String, String] = new KafkaConsumer[String, String](props)
consumer.subscribe(util.Collections.singletonList(topic))
val record = consumer.poll(5000).asScala.toList.map(_.value())
}
这给了我一个值列表。但我只需要最新偏移量的值。我试着在网上搜索一些例子,但不知道怎么做。如何从最新偏移量中获取值?
1条答案
按热度按时间nwo49xxi1#
以下属性将偏移值设置为最新(您将开始阅读那些只有在您的消费者启动并运行之后才会出现在您的主题中的值)。