我正在开发一个应用程序,其中我使用Kafka和技术是scala。我的Kafka消费代码如下:
val props = new Properties()
props.put("group.id", "test")
props.put("bootstrap.servers", "localhost:9092")
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("auto.offset.reset", "earliest")
props.put("group.id", "consumer-group")
val consumer: KafkaConsumer[String, String] = new KafkaConsumer[String, String](props)
consumer.subscribe(util.Collections.singletonList(topic))
val record = consumer.poll(Duration.ofMillis(500)).asScala.toList
它给我所有的记录,但问题是我已经在Kafka消费者的数据,这可能会导致重复的数据意味着相同的关键数据可以在主题中已经存在。有没有什么方法可以让我从某个特定的时间检索数据。是指在轮询之前,我是否可以计算当前时间并仅检索在该时间之后出现的记录。我有什么办法可以做到吗?
2条答案
按热度按时间0yg35tkg1#
您可以在kafkaconsumerapi中使用offsetsfortimes方法。
代码
试验
将时间戳选择为
3_old
以及1_new
只使用“新”消息。输出
dzjeubhm2#
使用任何给定时间戳的唯一方法是
查找
offsetsForTimes
seek
至和commitSync
这个结果开始轮询
但是,您需要意识到数据流是连续的,以后可能还会有重复的键。
如果您在数据中有相同的键,您只希望看到最新的,那么最好使用ktable