我有一个应用程序,在其中我处理一个流并将其转换为另一个流。以下是一个示例:
public void run(final String... args) {
final Serde<Event> eventSerde = new EventSerde();
final Properties props = streamingConfig.getProperties(
applicationName,
concurrency,
Serdes.String(),
eventSerde
);
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE);
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, EventTimestampExtractor.class);
final StreamsBuilder builder = new StreamsBuilder();
KStream<String, Event> eventStream = builder.stream(inputStream);
final Serde<Device> deviceSerde = new DeviceSerde();
eventStream
.map((key, event) -> {
final Device device = modelMapper.map(event, Device.class);
return new KeyValue<>(key, device);
})
.to("device_topic", Produced.with(Serdes.String(), deviceSerde));
final Topology topology = builder.build();
final KafkaStreams streams = new KafkaStreams(topology, props);
streams.start();
}
以下是有关该应用程序的一些详细信息:
Spring Boot 1.5.17
Kafka 2.1.0
Kafka Streams 2.1.0
Spring Kafka 1.3.6
尽管在输入流中的消息中设置了时间戳,但我还放置了timestampextractor的实现,以确保在所有消息中附加了正确的时间戳(因为其他生产者可能会将消息发送到同一主题中)。
在代码中,我接收到一个事件流,我基本上将它们转换成不同的对象,并最终将这些对象路由到不同的流中。
我试图了解在这种特殊情况下,我设置的初始时间戳是否仍然附加到发布到device\u主题的消息上。
接收端(设备流)如下所示:
@KafkaListener(topics = "device_topic")
public void onDeviceReceive(final Device device, @Header(KafkaHeaders.RECEIVED_TIMESTAMP) final long timestamp) {
log.trace("[{}] Received device: {}", timestamp, device);
}
不幸的是,打印的时间戳似乎是挂钟时间。这是预期的行为还是我遗漏了什么?
1条答案
按热度按时间pjngdqdw1#
SpringKafka1.3.x使用了非常旧的0.11客户端;也许它不会传播时间戳。我刚刚测试了Boot2.1.3和SpringKafka2.2.4,时间戳传播正常。。。
和