How do I resolve an NPE at com.esotericsoftware.kryo.Kryo.readObject when using Apache Flink?

Posted by csbfibhn on 2021-06-25 in Flink

I am using Flink with a custom Kryo serializer for my POJO class, but I get:

Caused by: java.lang.NullPointerException
    at MyTreeSerializer.read(MyTreeSerializer.java:36)
    at MyTreeSerializer.read(MyTreeSerializer.java:11)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:172)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:414)
    ... 16 more

Here are the details.
Kryo version: 2.24.0
My POJO class:

import java.util.TreeMap;

public class MyTree extends TreeMap<String, Object> {
    private String id;

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }
}

Serializer for the POJO:
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.esotericsoftware.kryo.serializers.MapSerializer;

public class MyTreeSerializer extends Serializer<MyTree> {

    public MyTreeSerializer() {
    }

    @Override
    public void write(Kryo kryo, Output output, MyTree object) {
        output.writeString(object.getId());
        kryo.writeObject(output, object, new MapSerializer());

    }

    @Override
    public MyTree read(Kryo kryo, Input input, Class<MyTree> type) {
        String id = input.readString();
        System.out.println("Serialized Id " + id);
        MyTree myTree = kryo.readObject(input, type, new MapSerializer());
        System.out.println("Serialized Object " + myTree);
        myTree.setId(id);
        return myTree;
    }
}
Flink streaming main program:
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

public class MultiSinkTest {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Setting Serializer
        env.getConfig().addDefaultKryoSerializer(MyTree.class, MyTreeSerializer.class);

        DataStreamSource<String> data = env.fromElements("1", "2");

        DataStream<MyTree> returns = data.map(new MapFunction<String, MyTree>() {
            @Override
            public MyTree map(String s) throws Exception {
                MyTree myTree = new MyTree();
                myTree.setId(s);
                myTree.put("name", "sohi");
                return myTree;
            }
        }).returns(MyTree.class);

        returns.addSink(new SinkFunction<MyTree>() {
            @Override
            public void invoke(MyTree myTree) throws Exception {
                System.out.println("==> " + myTree.toString());
            }
        });

        env.execute();
    }
}

With the code above, only the id is serialized, not the map part of MyTree.
But if I replace

env.getConfig().addDefaultKryoSerializer(MyTree.class, MyTreeSerializer.class);

with

env.getConfig().addDefaultKryoSerializer(MyTree.class, MapSerializer.class);

then the id is not serialized, but the map is.
I just need help understanding why it does not work when I use MyTreeSerializer.class.
Thanks in advance.

Answer #1 (zbq4xfa0)

The following line in `MyTreeSerializer` returns null:

    MyTree myTree = kryo.readObject(input, type, new MapSerializer());

That is also why `myTree.setId(id)` throws the NullPointerException.
When you use `MapSerializer`, it works fine (except, of course, for deserializing the id), because `MyTree` extends `TreeMap`, which implements `Map`.
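
To illustrate why a map-only serializer loses the id, here is a small sketch in plain Java with no Kryo involved. It assumes that a map serializer rebuilds the object by creating a fresh instance and re-inserting the entries, which is imitated here with `putAll`. The `MapOnlyCopy` class and its `copyEntries` helper are hypothetical names used only for this illustration.

```java
import java.util.TreeMap;

// Same shape as the MyTree class from the question.
class MyTree extends TreeMap<String, Object> {
    private String id;

    public String getId() { return id; }

    public void setId(String id) { this.id = id; }
}

// Hypothetical helper: stands in for what a map-only serializer can see.
class MapOnlyCopy {
    // Creates a new MyTree and copies the map entries, nothing else.
    static MyTree copyEntries(MyTree original) {
        MyTree copy = new MyTree();
        copy.putAll(original); // entries are copied; the id field is not
        return copy;
    }

    public static void main(String[] args) {
        MyTree tree = new MyTree();
        tree.setId("1");
        tree.put("name", "sohi");

        MyTree copy = copyEntries(tree);
        System.out.println("entries: " + copy);    // prints "entries: {name=sohi}"
        System.out.println("id: " + copy.getId()); // prints "id: null"
    }
}
```

The `id` field lives outside the map entries, so anything that only walks the `Map` interface never sees it.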
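In your implementation of `MyTreeSerializer`, you are trying to deserialize a member of class `MyTree` from a `MyTree` object. It is as if `MyTreeSerializer` expected an object like the one in the following sample code: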

public class MyTree extends TreeMap {
    private String id;
    private MyTree myTree;

    public String getId() {
        return id;
    }
    public void setId(String id) {
        this.id = id;
    }

    public MyTree getMyTree() {
        return myTree;
    }

    public void setMyTree(MyTree myTree) {
        this.myTree = myTree;
    } 
}
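I think you need to take a look at `MapSerializer` and either extend it or use it as the basis for your own implementation in order to serialize and deserialize MyTree objects.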
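
One way to follow that suggestion might look like the sketch below. It assumes the Kryo 2.24 API, where `MapSerializer` has overridable `write`, `read`, and `copy` methods, and it assumes `MyTree` is public with a no-argument constructor so that Kryo can instantiate it. The names `MyTreeKryoSketch`, `MyTreeMapSerializer`, and `roundTrip` are hypothetical, and the sketch is untested against the Flink job from the question.

```java
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.esotericsoftware.kryo.serializers.MapSerializer;

import java.util.Map;
import java.util.TreeMap;

// Wrapper so the sketch fits in one file; in a real project MyTree and the
// serializer would each be a public class in its own file.
public class MyTreeKryoSketch {

    // Same shape as the MyTree class from the question.
    public static class MyTree extends TreeMap<String, Object> {
        private String id;

        public String getId() { return id; }

        public void setId(String id) { this.id = id; }
    }

    // Hypothetical serializer: handles the id itself and delegates the
    // map entries to MapSerializer.
    public static class MyTreeMapSerializer extends MapSerializer {
        @Override
        public void write(Kryo kryo, Output output, Map map) {
            output.writeString(((MyTree) map).getId()); // id first
            super.write(kryo, output, map);             // then the entries
        }

        @Override
        public Map read(Kryo kryo, Input input, Class<Map> type) {
            String id = input.readString();             // same order as write
            MyTree tree = (MyTree) super.read(kryo, input, type);
            tree.setId(id);
            return tree;
        }

        @Override
        public Map copy(Kryo kryo, Map original) {
            MyTree copy = (MyTree) super.copy(kryo, original);
            copy.setId(((MyTree) original).getId());    // carry the id over too
            return copy;
        }
    }

    // Serializes a MyTree to bytes and reads it back with the serializer above.
    static MyTree roundTrip(MyTree tree) {
        Kryo kryo = new Kryo();
        kryo.register(MyTree.class, new MyTreeMapSerializer());

        Output output = new Output(1024, -1); // buffer grows as needed
        kryo.writeObject(output, tree);

        Input input = new Input(output.toBytes());
        return kryo.readObject(input, MyTree.class);
    }

    public static void main(String[] args) {
        MyTree tree = new MyTree();
        tree.setId("1");
        tree.put("name", "sohi");

        MyTree back = roundTrip(tree);
        System.out.println("id=" + back.getId() + ", entries=" + back);
    }
}
```

Registration in the Flink job would then presumably use the same call as in the question, `env.getConfig().addDefaultKryoSerializer(MyTree.class, MyTreeMapSerializer.class);`. The `copy` override is included because the stack trace in the question passes through `KryoSerializer.copy`; without it, a copy made by `MapSerializer.copy` would likely drop the id.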
