在Flink 1.14 +中设置将时间戳写入Kafka

iyfjxgzm  于 2022-12-09  发布在  Apache
关注(0)|答案(1)|浏览(496)

我正在从Flink1.10.2迁移到Flink1.14.5,但我找不到如何将时间戳写入Kafka sink。

producer.setWriteTimestampToKafka(true)

使用Flink 14时,我的输出中的时间戳等于源代码中的ts,但使用旧版本或使用Table API(Flink 14)时,我的输出中的Kafka记录的时间戳为当前时间戳。在所有情况下,我都使用Event Time。

我的来源:

KafkaSource.builder()
          .setTopics("topic_source")
          .setBootstrapServers("bootstrap.servers")
          .setGroupId("group.id")
          .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
          .setProperty("commit.offsets.on.checkpoint", "true")
          .setValueOnlyDeserializer(schema)
          .build()

"我的Flume"

KafkaSink.builder()
        .setBootstrapServers("bootstrap.servers")
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
            .setTopicSelector(topicSelector)
            .setValueSerializationSchema(schema)
            .build()
        )
        .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
        .setKafkaProducerConfig(props)
        .build()

浮水印:

val wmStrategy: WatermarkStrategy[AnritsuA] = WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofMinutes(1))
val stream = env.fromSource(kafka_source, wmStrategy, jobArgs("topic_source"))

谁能告诉我为什么会这样?

hs1rzwqc

hs1rzwqc1#

如果您使用DataStream API,您可以设置KafkaRecordSerializationSchema并创建一个您认为合适的KafkaRecord
默认情况下,KafkaRecordSerializationSchemaBuilder使用记录的事件时间戳(例如输入记录的时间戳),这通常是丰富事件时所需的时间戳。
如果你只是设置了null,Kafka会在接收到记录时附加当前时间戳。如果这是Table API的行为,那么这可能是一个bug而不是一个特性(但似乎对你有用)。
或者,如果在处理过程中根本不需要时间戳,也可以在源上分配时间戳。或者直接切换到处理时间。写入的时间戳将默认为在源上分配的时间戳。

相关问题