Kafka镜子制造者不同版本之间的时间戳

4smxwvx5  于 2021-06-07  发布在  Kafka
关注(0)|答案(3)|浏览(437)

我想使用kafka mirrormaker将所有事件从kafka集群版本0.8镜像到另一个kafka集群版本1.0,两个集群都应该保持生产状态。
问题是旧的kafka(版本0.8)存储的消息没有时间戳字段(时间戳是消息的一部分)。
我正在寻找一种方法,使kafka mirrormaker能够向kafka cluster 1.0生成带有时间戳的消息,这些消息将从消息中提取出来(事件时间而不是处理时间)。
有人知道如何使用Kafka·米罗马克或其他工具吗?

lnlaulya

lnlaulya1#

您可能需要配置kafkatopic,使数据镜像到其上 message.timestamp.type=LogAppendTime . 当读取v0.8.x中的数据时,如您所知,它将没有任何时间戳。当mirrormaker脚本将此消息发布到v0.10.x(或更高版本)时 message.timestamp.type=LogAppendTime 配置后,将记录时间戳。

vcudknz3

vcudknz32#

您可以使用mirrormaker 0.8将数据引入1.0集群,然后使用kafka streams应用程序或mirrormaker消息处理程序(使用1.0版本和集群内镜像)进行转换。下面是一个示例消息处理程序。https://github.com/gwenshap/kafka-examples/blob/master/mirrormakerhandler/src/main/java/com/shapira/examples/topicswitchinghandler.java
无论哪种方式,如果您想要消息中的时间戳,您必须首先将它带到1.0集群,然后在那里处理它。否则,您需要让一些应用程序在一个版本上读取和解析消息,然后在消息格式的新版本上创建一个新记录。这就限制了你重新处理的能力。我只是将旧数据拉入新集群,为新数据格式创建一个新主题,同时逐步淘汰旧格式。

w8biq8rn

w8biq8rn3#

正如您所指出的,mirror maker在添加kafka时无法读取0.10之前的时间戳。
nifi和streamset还不支持设置记录时间戳。
最好的选择是要求您反序列化来自消费者的消息,提取记录中的时间戳,并创建和发送带有该时间戳的producerrecord。spark/flink是使它运行得更快的最佳选择

相关问题