当我试图用avro在带有各自模式的数据上运行kafka consumer时,它返回一个错误“avroruntimeexception:malformed data”。长度为负:-40“。我看到其他人在将字节数组转换为json、avro读写和kafka avro binary*编码器时也遇到过类似的问题。我还引用了这个消费群体的例子,这些例子都很有用,但是到目前为止对这个错误没有帮助。。它一直工作到代码的这一部分(第73行)
decoder decoder=decoderfactory.get().binarydecoder(bytearrayinputstream,null);
我试过其他解码器并打印出bytearrayinputstream变量的内容,它看起来像我相信的序列化avro数据的样子(在消息中我可以看到模式和一些数据以及一些格式错误的数据),我使用.available()方法打印出了可用的字节,该方法返回594。我很难理解为什么会发生这种错误。apachenifi用于从hdfs生成具有相同模式的kafka流。如果有任何帮助,我将不胜感激。
1条答案
按热度按时间4xy9mtcn1#
也许问题在于nifi写入(编码)avro数据的方式与消费者应用程序读取(解码)数据的方式不匹配。
简而言之,avro的api提供了两种不同的序列化方法:
创建正确的avro文件:对数据记录进行编码,但也要将avro模式嵌入到一种前导码(via)中
org.apache.avro.file.{DataFileWriter/DataFileReader}
). 将模式嵌入avro文件非常有意义,因为(a)avro文件的“有效负载”通常比嵌入的avro模式大几个数量级,(b)然后您可以复制或移动这些文件,并且仍然确保您可以再次读取它们,而无需咨询任何人或事。只编码数据记录,即不嵌入模式(通过
org.apache.avro.io.{BinaryEncoder/BinaryDecoder}
; 请注意包名称中的差异:io
这里vs。file
上面)。例如,当avro对写入Kafka主题的消息进行编码时,这种方法通常是受欢迎的,因为与上面的变体1相比,您不需要将avro模式重新嵌入到每个消息中,假设您的(非常合理的)策略是,对于同一Kafka主题,消息使用相同的avro模式进行格式化/编码。这是一个显著的优点,因为在流数据上下文中,运动数据记录中的数据通常比上面描述的静止avro文件中的数据小得多(通常在100字节到几百kb之间)(通常是数百或数千mb);因此avro模式的大小相对较大,因此在向kafka写入2000条数据记录时,不希望将其嵌入2000x。缺点是,您必须“以某种方式”跟踪avro模式如何Map到Kafka主题——或者更准确地说,您必须以某种方式跟踪消息是用哪个avro模式编码的,而不必直接嵌入模式。好消息是,kafka生态系统(avro schema registry)中提供了透明地执行此操作的工具。因此,与变体1相比,变体2以牺牲便利性为代价提高了效率。其效果是,编码的avro数据的“wire格式”看起来会有所不同,这取决于您是使用上面的(1)还是(2)。
我对apachenifi不是很熟悉,但是快速查看源代码(例如convertavrotojson.java)会发现它使用的是variant 1,即它在avro记录旁边嵌入了avro模式。但是,您的消费代码使用
DecoderFactory.get().binaryDecoder()
因此变量2(没有嵌入模式)。也许这就解释了你一直遇到的错误?