我使用的是flink,系统中有一个json字符串流,其中包含动态变化的字段和嵌套字段。因此,我无法将这个传入的json模拟并转换为静态pojo,而只能依赖于Map。
我的第一个转换是使用gson解析将json字符串流转换为map对象流,然后将map Package 到一个名为data的dto中。
(inside the first map transformation)
LinkedTreeMap map = gson.fromJson(input, LinkedTreeMap.class);
Data data = new Data(map); // Data has getters, setters for the map and implements Serializable
当这个转换处理之后,我试图将结果流输入到我的自定义flink接收器时,问题就出现了。调用函数不会在接收器中被调用。但是,如果我从这个包含dto的Map更改为一个基本体或一个没有Map的常规dto,那么sink可以工作。
我的dto看起来像这样:
public class FakeDTO {
private String id;
private LinkedTreeMap map; // com.google.gson.internal
// getters and setters
// constructors, empty and with fields
我尝试了以下两种解决方案:
env.getConfig().addDefaultKryoSerializer(LinkedTreeMap.class,MapSerializer.class;
env.getConfig().disableGenericTypes();
在这种情况下,有什么Maven建议我可以用吗?
1条答案
按热度按时间x9ybnkn61#
我能解决这个问题。在我的flink日志中,我看到一个名为reflectionserializerfactory类的kryo文件没有找到。我在maven中更新了kryo版本,并为我的Map使用了flink文档中说flink支持的Map类型。
只需确保在代码中指定泛型类型,并在pojo for maps中添加getter和setter。
我还使用.returns(xyz.class)类型删除来避免类型擦除的影响。