我正在从Flink1.10.2迁移到Flink1.14.5,但我找不到如何将时间戳写入Kafka sink。
producer.setWriteTimestampToKafka(true)
使用Flink 14时,我的输出中的时间戳等于源代码中的ts,但使用旧版本或使用Table API(Flink 14)时,我的输出中的Kafka记录的时间戳为当前时间戳。在所有情况下,我都使用Event Time。
我的来源:
KafkaSource.builder()
.setTopics("topic_source")
.setBootstrapServers("bootstrap.servers")
.setGroupId("group.id")
.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
.setProperty("commit.offsets.on.checkpoint", "true")
.setValueOnlyDeserializer(schema)
.build()
"我的Flume"
KafkaSink.builder()
.setBootstrapServers("bootstrap.servers")
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopicSelector(topicSelector)
.setValueSerializationSchema(schema)
.build()
)
.setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
.setKafkaProducerConfig(props)
.build()
浮水印:
val wmStrategy: WatermarkStrategy[AnritsuA] = WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofMinutes(1))
val stream = env.fromSource(kafka_source, wmStrategy, jobArgs("topic_source"))
谁能告诉我为什么会这样?
1条答案
按热度按时间hs1rzwqc1#
如果您使用DataStream API,您可以设置KafkaRecordSerializationSchema并创建一个您认为合适的
KafkaRecord
。默认情况下,KafkaRecordSerializationSchemaBuilder使用记录的事件时间戳(例如输入记录的时间戳),这通常是丰富事件时所需的时间戳。
如果你只是设置了
null
,Kafka会在接收到记录时附加当前时间戳。如果这是Table API的行为,那么这可能是一个bug而不是一个特性(但似乎对你有用)。或者,如果在处理过程中根本不需要时间戳,也可以在源上分配时间戳。或者直接切换到处理时间。写入的时间戳将默认为在源上分配的时间戳。