在集成测试中,在flink小型集群上测试流时遇到了一个问题。流Map生成的avro specifirecord pojo类(java)。
流作业是用scala编写的。
flink运行时正在崩溃,因为它无法示例化 org.apache.flink.formats.avro.utils.AvroKryoSerializerUtils
以下是堆栈跟踪:
stack: java.lang.ClassCastException: class org.apache.flink.formats.avro.utils.AvroKryoSerializerUtils
java.lang.RuntimeException: Could not instantiate org.apache.flink.formats.avro.utils.AvroKryoSerializerUtils.
at org.apache.flink.api.java.typeutils.AvroUtils.getAvroUtils(AvroUtils.java:53)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.buildKryoRegistrations(KryoSerializer.java:572)
我认为问题在于flink无法序列化avropojo类,因为该类中有多个嵌套的avropojo类。
我尝试为所有嵌套的pojo类类型添加所有类型信息,但仍然遇到相同的问题。
所以现在我想知道是否有人让flink作业使用生成的avropojo类和嵌套的avropojo类。所有类都继承类型specificrecord,并从avro模式生成。
是否需要编写某种特殊的序列化程序?对于这样一个处理scala或java中多个嵌套pojo类的序列化程序,是否有任何文档或示例?
还是完全不同的问题?
非常感谢您的帮助!
2条答案
按热度按时间odopli941#
我通过在进程函数中进行解析来实现它。
我必须将一个字符串解析为json,然后再解析为specifirecord类的一个特定字段的record类,该字段最终将出现在datasink中。
json的解析现在在另一个processfuncton中实现,现在可以工作了。在我将Map中的解析直接应用到数据流之前。
mnemlml82#
如果
flink-avro
不在类路径中。如果你用的是avro,我会完全禁用kryo来捕捉更细微的错误。