ApacheKafka—当一个流中的消息Map到另一个流时,它的时间戳会发生什么变化?

btxsgosb  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(287)

我有一个应用程序,在其中我处理一个流并将其转换为另一个流。以下是一个示例:

public void run(final String... args) {
    final Serde<Event> eventSerde = new EventSerde();

    final Properties props = streamingConfig.getProperties(
        applicationName,
        concurrency,
        Serdes.String(),
        eventSerde
    );

    props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE);
    props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, EventTimestampExtractor.class);

    final StreamsBuilder builder = new StreamsBuilder();

    KStream<String, Event> eventStream = builder.stream(inputStream);

    final Serde<Device> deviceSerde = new DeviceSerde();

    eventStream
        .map((key, event) -> {
            final Device device = modelMapper.map(event, Device.class);

            return new KeyValue<>(key, device);
        })
        .to("device_topic", Produced.with(Serdes.String(), deviceSerde));

    final Topology topology = builder.build();
    final KafkaStreams streams = new KafkaStreams(topology, props);

    streams.start();
}

以下是有关该应用程序的一些详细信息:

Spring Boot 1.5.17
Kafka 2.1.0
Kafka Streams 2.1.0
Spring Kafka 1.3.6

尽管在输入流中的消息中设置了时间戳,但我还放置了timestampextractor的实现,以确保在所有消息中附加了正确的时间戳(因为其他生产者可能会将消息发送到同一主题中)。
在代码中,我接收到一个事件流,我基本上将它们转换成不同的对象,并最终将这些对象路由到不同的流中。
我试图了解在这种特殊情况下,我设置的初始时间戳是否仍然附加到发布到device\u主题的消息上。
接收端(设备流)如下所示:

@KafkaListener(topics = "device_topic")
public void onDeviceReceive(final Device device, @Header(KafkaHeaders.RECEIVED_TIMESTAMP) final long timestamp) {
    log.trace("[{}] Received device: {}", timestamp, device);
}

不幸的是,打印的时间戳似乎是挂钟时间。这是预期的行为还是我遗漏了什么?

pjngdqdw

pjngdqdw1#

SpringKafka1.3.x使用了非常旧的0.11客户端;也许它不会传播时间戳。我刚刚测试了Boot2.1.3和SpringKafka2.2.4,时间戳传播正常。。。

@SpringBootApplication
@EnableKafkaStreams
public class So54771130Application {

    public static void main(String[] args) {
        SpringApplication.run(So54771130Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> {
            template.send("so54771130", 0, 42L, null, "baz");
        };
    }

    @Bean
    public KStream<String, String> stream(StreamsBuilder builder) {
        KStream<String, String> stream = builder.stream("so54771130");
        stream
            .map((k, v) -> {
                System.out.println("Mapping:"  + v);
                return new KeyValue<>(null, "bar");
            })
            .to("so54771130-1");
        return stream;
    }

    @Bean
    public NewTopic topic1() {
        return new NewTopic("so54771130", 1, (short) 1);
    }

    @Bean
    public NewTopic topic2() {
        return new NewTopic("so54771130-1", 1, (short) 1);
    }

    @KafkaListener(id = "so54771130", topics = "so54771130-1")
    public void listen(String in, @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts) {
        System.out.println(in + "@" + ts);
    }

}

Mapping:baz
bar@42

相关问题