如何使用kafka主题中的最新偏移量

iyfamqjs  于 2021-06-04  发布在  Kafka
关注(0)|答案(2)|浏览(606)

我正在开发一个scala应用程序,其中使用了kafka。我的Kafka消费代码如下。

def getValues(topic: String): String  = {

  val props = new Properties()
  props.put("group.id", "test")
  props.put("bootstrap.servers", "localhost:9092")
  props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") 
  props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  props.put("auto.offset.reset", "earliest")
  val consumer: KafkaConsumer[String, String] = new KafkaConsumer[String, String](props)

  val topicPartition = new TopicPartition(topic, 0)
  consumer.assign(util.Collections.singletonList(topicPartition))
  val offset = consumer.position(topicPartition) - 1
  val record = consumer.poll(Duration.ofMillis(500)).asScala
  for (data <- record)
    if(data.offset() == offset) val value = data.value()
  return value
}

在这里,我只想返回最新的值。当我运行应用程序时,会得到以下日志:

Resetting offset for partition topic-0 to offset 0

因为这个 val offset = consumer.position(topicPartition) - 1 变为-1,data.offset()给出所有偏移的列表。结果我没有得到最新的值。为什么会自动将偏移量重置为0?我怎样才能纠正它?我的代码有什么错误?或者我可以从最新的偏移量得到值?

6vl6ewon

6vl6ewon1#

在这条线上, props.put("auto.offset.reset", "earliest") ,设置参数 auto.offset.reset 你的Kafka消费者 earliest ,将偏移量重置为最早。如果需要最新的值,应该使用 latest 相反。你可以在这里找到文件。

gwo2fgha

gwo2fgha2#

你在找 seek 方法,该方法根据javadocs-“覆盖使用者将在下一次轮询(超时)时使用的获取偏移量”。
还要确保您正在设置

props.put("auto.offset.reset", "latest")

对您的代码进行这两个修改后,下面的内容对我来说只起到了 value 部分的最新偏移量 0 在所选主题中:

import java.time.Duration
import java.util.Properties
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import collection.JavaConverters._

def getValues(topic: String): String  = {
    val props = new Properties()
    props.put("group.id", "test")
    props.put("bootstrap.servers", "localhost:9092")
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("auto.offset.reset", "latest")
    val consumer: KafkaConsumer[String, String] = new KafkaConsumer[String, String](props)

    val topicPartition = new TopicPartition(topic, 0)
    consumer.assign(java.util.Collections.singletonList(topicPartition))
    val offset = consumer.position(topicPartition) - 1
    consumer.seek(topicPartition, offset)
    val record = consumer.poll(Duration.ofMillis(500)).asScala
    for (data <- record) {
      val value: String = data.value() // you are only reading one message if no new messages flow into the Kafka topic
    }
    value
}

相关问题