我正在开发一个scala应用程序,其中使用了kafka。我的Kafka消费代码如下。
def getValues(topic: String): String = {
val props = new Properties()
props.put("group.id", "test")
props.put("bootstrap.servers", "localhost:9092")
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("auto.offset.reset", "earliest")
val consumer: KafkaConsumer[String, String] = new KafkaConsumer[String, String](props)
val topicPartition = new TopicPartition(topic, 0)
consumer.assign(util.Collections.singletonList(topicPartition))
val offset = consumer.position(topicPartition) - 1
val record = consumer.poll(Duration.ofMillis(500)).asScala
for (data <- record)
if(data.offset() == offset) val value = data.value()
return value
}
在这里,我只想返回最新的值。当我运行应用程序时,会得到以下日志:
Resetting offset for partition topic-0 to offset 0
因为这个 val offset = consumer.position(topicPartition) - 1
变为-1,data.offset()给出所有偏移的列表。结果我没有得到最新的值。为什么会自动将偏移量重置为0?我怎样才能纠正它?我的代码有什么错误?或者我可以从最新的偏移量得到值?
2条答案
按热度按时间6vl6ewon1#
在这条线上,
props.put("auto.offset.reset", "earliest")
,设置参数auto.offset.reset
你的Kafka消费者earliest
,将偏移量重置为最早。如果需要最新的值,应该使用latest
相反。你可以在这里找到文件。gwo2fgha2#
你在找
seek
方法,该方法根据javadocs-“覆盖使用者将在下一次轮询(超时)时使用的获取偏移量”。还要确保您正在设置
对您的代码进行这两个修改后,下面的内容对我来说只起到了
value
部分的最新偏移量0
在所选主题中: