SpringKafka—如何在生成消息时获取时间戳(事件时间)

c9qzyr3d  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(701)

我需要在kafka消费者应用程序中获取消息生成时的时间戳(事件时间)。我知道timestampextractor可以与kafka流一起使用,但我的要求不同,因为我没有使用流来使用消息。
我的Kafka制作人如下:

  1. @Override
  2. public void run(ApplicationArguments args) throws Exception {
  3. List<String> names = Arrays.asList("priya", "dyser", "Ray", "Mark", "Oman", "Larry");
  4. List<String> pages = Arrays.asList("blog", "facebook", "instagram", "news", "youtube", "about");
  5. Runnable runnable = () -> {
  6. String rPage = pages.get(new Random().nextInt(pages.size()));
  7. String rName = pages.get(new Random().nextInt(names.size()));
  8. PageViewEvent pageViewEvent = new PageViewEvent(rName, rPage, Math.random() > .5 ? 10 : 1000);
  9. Message<PageViewEvent> message = MessageBuilder
  10. .withPayload(pageViewEvent).
  11. setHeader(KafkaHeaders.MESSAGE_KEY, pageViewEvent.getUserId().getBytes())
  12. .build();
  13. try {
  14. this.pageViewsOut.send(message);
  15. log.info("sent " + message);
  16. } catch (Exception e) {
  17. log.error(e);
  18. }
  19. };

kafka consumer是使用spring kafka@kafkalistener实现的。

  1. @KafkaListener(topics = "test1" , groupId = "json", containerFactory = "kafkaListenerContainerFactory")
  2. public void receive(@Payload PageViewEvent data,@Headers MessageHeaders headers) {
  3. LOG.info("Message received");
  4. LOG.info("received data='{}'", data);
  5. }

集装箱工厂配置

  1. @Bean
  2. public ConsumerFactory<String,PageViewEvent > priceEventConsumerFactory() {
  3. Map<String, Object> props = new HashMap<>();
  4. props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
  5. props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  6. props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
  7. props.put(ConsumerConfig.GROUP_ID_CONFIG, "json");
  8. props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
  9. return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>(PageViewEvent.class));
  10. }
  11. @Bean
  12. public ConcurrentKafkaListenerContainerFactory<String, PageViewEvent> priceEventsKafkaListenerContainerFactory() {
  13. ConcurrentKafkaListenerContainerFactory<String, PageViewEvent> factory =
  14. new ConcurrentKafkaListenerContainerFactory<>();
  15. factory.setConsumerFactory(priceEventConsumerFactory());
  16. return factory;
  17. }

当我打印时正在发送消息的制作人给我以下数据:
[payload=pageviewevent(userid=blog,page=about,duration=10),headers={id=8ebdad85-e2f7-958f-500e-4560ac0970e5,kafka\u messagekey=[b@71975e1a,contenttype=application/json,timestamp=1553041963803}]
它确实有一个生成的时间戳。如何获取用spring kafka生成的时间戳消息?

ylamdve6

ylamdve61#

received\u timestamp表示接收到的是记录的时间戳,而不是接收到的时间戳。。我们避免将其放入时间戳中,以避免无意中传播到出站消息。

相关问题