从kafka消息获取kafka记录时间戳

lsmd5eda  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(1180)

我想要生产者在Kafka主题中插入消息的时间戳。
在Kafka消费端,我想提取时间戳。

class Producer {
  def main(args: Array[String]): Unit = {
    writeToKafka("quick-start")
  }
  def writeToKafka(topic: String): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9094")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)
    val record = new ProducerRecord[String, String](topic, "key", "value")
    producer.send(record)
    producer.close()
  }
}

class Consumer {
  def main(args: Array[String]): Unit = {
    consumeFromKafka("quick-start")
  }
  def consumeFromKafka(topic: String) = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9094")
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("auto.offset.reset", "latest")
    props.put("group.id", "consumer-group")
    val consumer: KafkaConsumer[String, String] = new KafkaConsumer[String, String](props)
    consumer.subscribe(util.Arrays.asList(topic))
    while (true) {
      val record = consumer.poll(1000).asScala
      for (data <- record.iterator)
        println(data.value())
    }
  }
}

Kafka有办法做到吗?否则,我将不得不发送一个额外的领域从生产者到主题。

bvjxkvbb

bvjxkvbb1#

Kafka从v0.10开始提供了一种方法
在这个版本中,所有消息都有一个时间戳信息 data.timestamp ,其中的信息类型由代理上的配置“message.timestamp.type”控制。值应为 CreateTime 或者 LogAppendTime .
在此版本之前,您必须手动实现它,通常通过修改数据结构来实现。

相关问题