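I need to get the timestamp at which a message was produced (the event time) in a Kafka consumer application. I know that TimestampExtractor can be used with Kafka Streams, but my requirement is different because I am not using Streams to consume the messages.
My Kafka producer is as follows: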
@Override
public void run(ApplicationArguments args) throws Exception {
    List<String> names = Arrays.asList("priya", "dyser", "Ray", "Mark", "Oman", "Larry");
    List<String> pages = Arrays.asList("blog", "facebook", "instagram", "news", "youtube", "about");
    Runnable runnable = () -> {
        String rPage = pages.get(new Random().nextInt(pages.size()));
        String rName = pages.get(new Random().nextInt(names.size()));
        PageViewEvent pageViewEvent = new PageViewEvent(rName, rPage, Math.random() > .5 ? 10 : 1000);
        Message<PageViewEvent> message = MessageBuilder
                .withPayload(pageViewEvent)
                .setHeader(KafkaHeaders.MESSAGE_KEY, pageViewEvent.getUserId().getBytes())
                .build();
        try {
            this.pageViewsOut.send(message);
            log.info("sent " + message);
        } catch (Exception e) {
            log.error(e);
        }
    };
The Kafka consumer is implemented with Spring Kafka's @KafkaListener:
@KafkaListener(topics = "test1", groupId = "json", containerFactory = "kafkaListenerContainerFactory")
public void receive(@Payload PageViewEvent data, @Headers MessageHeaders headers) {
    LOG.info("Message received");
    LOG.info("received data='{}'", data);
}
Container factory configuration:
@Bean
public ConsumerFactory<String, PageViewEvent> priceEventConsumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "json");
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>(PageViewEvent.class));
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, PageViewEvent> priceEventsKafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, PageViewEvent> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(priceEventConsumerFactory());
    return factory;
}
When I print the message that the producer is sending, I get the following data:
[payload=PageViewEvent(userId=blog, page=about, duration=10), headers={id=8ebdad85-e2f7-958f-500e-4560ac0970e5, kafka_messageKey=[B@71975e1a, contentType=application/json, timestamp=1553041963803}]
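So the message does carry a timestamp from when it was produced. How can I get that produced timestamp on the consumer side using Spring Kafka?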
1 Answer
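RECEIVED_TIMESTAMP (KafkaHeaders.RECEIVED_TIMESTAMP, header name kafka_receivedTimestamp) means it is the timestamp from the record that was received, not the time at which it was received. We avoid putting it in TIMESTAMP to avoid inadvertently propagating it to an outbound message.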
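To make the answer concrete, here is a minimal sketch of reading that header and turning it into an event time. It is written against a plain `Map<String, Object>` so that it runs without Spring on the classpath; the class name `EventTimeExample`, the helper `eventTime`, and the sample value (reused from the log in the question purely for illustration) are assumptions, not part of the original post.

```java
import java.time.Instant;
import java.util.Map;
import java.util.Optional;

public class EventTimeExample {

    // String value of KafkaHeaders.RECEIVED_TIMESTAMP in Spring Kafka.
    static final String RECEIVED_TIMESTAMP = "kafka_receivedTimestamp";

    /**
     * Hypothetical helper: reads the record timestamp (epoch milliseconds)
     * from the headers map and converts it to an Instant.
     * Returns empty if the header is missing or is not a number.
     */
    static Optional<Instant> eventTime(Map<String, Object> headers) {
        Object raw = headers.get(RECEIVED_TIMESTAMP);
        if (raw instanceof Number) {
            return Optional.of(Instant.ofEpochMilli(((Number) raw).longValue()));
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // Stand-in for the headers a listener would receive; the value is
        // illustrative only.
        Map<String, Object> headers = Map.of(RECEIVED_TIMESTAMP, 1553041963803L);
        System.out.println(eventTime(headers));
    }
}
```

In the listener from the question, the `headers` parameter is a Spring `MessageHeaders`, which implements `Map<String, Object>`, so a helper like this could be called as `eventTime(headers)` inside `receive(...)`. Alternatively, the value can be bound directly by adding a parameter such as `@Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts` to the listener method.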