如何基于时间戳获取kafka消息

mbjcgjjk  于 2021-06-04  发布在  Kafka
关注(0)|答案(2)|浏览(646)

我正在开发一个应用程序,其中我使用Kafka和技术是scala。我的Kafka消费代码如下:

  1. val props = new Properties()
  2. props.put("group.id", "test")
  3. props.put("bootstrap.servers", "localhost:9092")
  4. props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  5. props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  6. props.put("auto.offset.reset", "earliest")
  7. props.put("group.id", "consumer-group")
  8. val consumer: KafkaConsumer[String, String] = new KafkaConsumer[String, String](props)
  9. consumer.subscribe(util.Collections.singletonList(topic))
  10. val record = consumer.poll(Duration.ofMillis(500)).asScala.toList

它给我所有的记录,但问题是我已经在Kafka消费者的数据,这可能会导致重复的数据意味着相同的关键数据可以在主题中已经存在。有没有什么方法可以让我从某个特定的时间检索数据。是指在轮询之前,我是否可以计算当前时间并仅检索在该时间之后出现的记录。我有什么办法可以做到吗?

0yg35tkg

0yg35tkg1#

您可以在kafkaconsumerapi中使用offsetsfortimes方法。

代码

  1. import java.time.Duration
  2. import java.util.Properties
  3. import org.apache.kafka.clients.consumer.KafkaConsumer
  4. import org.apache.kafka.common.TopicPartition
  5. import collection.JavaConverters._
  6. object OffsetsForTime extends App {
  7. implicit def toJavaOffsetQuery(offsetQuery: Map[TopicPartition, scala.Long]): java.util.Map[TopicPartition, java.lang.Long] =
  8. offsetQuery
  9. .map { case (tp, time) => tp -> new java.lang.Long(time) }
  10. .asJava
  11. val topic = "myInTopic"
  12. val timestamp: Long = 1595971151000L
  13. val props = new Properties()
  14. props.put("group.id", "group-id1337")
  15. props.put("bootstrap.servers", "localhost:9092")
  16. props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  17. props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  18. props.put("auto.offset.reset", "earliest")
  19. val consumer: KafkaConsumer[String, String] = new KafkaConsumer[String, String](props)
  20. val topicPartition = new TopicPartition(topic, 0)
  21. consumer.assign(java.util.Collections.singletonList(topicPartition))
  22. // dummy poll before calling seek
  23. consumer.poll(Duration.ofMillis(500))
  24. // get next available offset after given timestamp
  25. val (_, offsetAndTimestamp) = consumer.offsetsForTimes(Map(topicPartition -> timestamp)).asScala.head
  26. // seek to offset
  27. consumer.seek(topicPartition, offsetAndTimestamp.offset())
  28. // poll data
  29. val record = consumer.poll(Duration.ofMillis(500)).asScala.toList
  30. for (data <- record) {
  31. println(s"Timestamp: ${data.timestamp()}, Key: ${data.key()}, Value: ${data.value()}")
  32. }
  33. }

试验

  1. ./kafka/current/bin/kafconsole-consumer.sh --bootstrap-server localhost:9092 --topic myInTopic --from-beginning --property print.value=true --property print.timestamp=true
  2. CreateTime:1595971142560 1_old
  3. CreateTime:1595971147697 2_old
  4. CreateTime:1595971150136 3_old
  5. CreateTime:1595971192649 1_new
  6. CreateTime:1595971194489 2_new
  7. CreateTime:1595971196416 3_new

将时间戳选择为 3_old 以及 1_new 只使用“新”消息。

输出

  1. Timestamp: 1595971192649, Key: null, Value: 1_new
  2. Timestamp: 1595971194489, Key: null, Value: 2_new
  3. Timestamp: 1595971196416, Key: null, Value: 3_new
展开查看全部
dzjeubhm

dzjeubhm2#

使用任何给定时间戳的唯一方法是
查找
offsetsForTimes seek 至和 commitSync 这个结果
开始轮询
但是,您需要意识到数据流是连续的,以后可能还会有重复的键。
如果您在数据中有相同的键,您只希望看到最新的,那么最好使用ktable

相关问题