从kafka消费者处获取最新数据

6pp0gazn  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(700)

我在scala应用程序中使用kafka。我的代码如下。

object KafkaConsumerSubscribeApp extends App {

  val props:Properties = new Properties()
  props.put("group.id", "test")
  props.put("bootstrap.servers","localhost:9092")
  props.put("key.deserializer",
      "org.apache.kafka.common.serialization.StringDeserializer") 
  Props.put("value.deserializer",
      "org.apache.kafka.common.serialization.StringDeserializer")
props.put("auto.offset.reset", "earliest")
props.put("group.id", "consumer-group")
 val consumer: KafkaConsumer[String, String] = new KafkaConsumer[String, String](props)
    consumer.subscribe(util.Collections.singletonList(topic))
    val record = consumer.poll(5000).asScala.toList.map(_.value())

}

这给了我一个值列表。但我只需要最新偏移量的值。我试着在网上搜索一些例子,但不知道怎么做。如何从最新偏移量中获取值?

nwo49xxi

nwo49xxi1#

以下属性将偏移值设置为最新(您将开始阅读那些只有在您的消费者启动并运行之后才会出现在您的主题中的值)。

props.put("auto.offset.reset", "latest")

相关问题