flink序列化错误

i7uq4tfw  于 2021-06-25  发布在  Flink
关注(0)|答案(1)|浏览(630)

我试图在我的apache flink gelly图上运行标签传播协议。
这是我的密码:

Graph<String, Long, String> ugraph = Graph.fromDataSet(vertex, edgeSet, env).getUndirected();
        DataSet<Tuple2<String, Long>> idsWithInitialLabels = DataSetUtils
            .zipWithUniqueId(graph.getVertexIds())
            .map(new MapFunction<Tuple2<Long, String>, Tuple2<String, Long>>() {
                public Tuple2<String, Long> map(Tuple2<Long, String> tuple2) throws Exception {
                    return new Tuple2<String, Long>(tuple2.f1, tuple2.f0);
                }
            }); 
        DataSet<Vertex<String, Long>> verticesWithCommunity = graph.joinWithVertices(idsWithInitialLabels,
            new VertexJoinFunction<Long, Long>() {
            public Long vertexJoin(Long vertexValue, Long inputValue) {
            return inputValue;
        }})
    .run(new LabelPropagation<String, Long, String>(10));

我收到以下错误消息:
org.apache.flink.api.common.invalidprogramexception:对象org.apache.flink.graph。graph$applycogrouptovertexvalues@4dde0543 不能在org.apache.flink.api.java.closurecleaner.ensureserializable(closurecleaner)上序列化。java:99)在org.apache.flink.api.java.closurecleaner.clean(closurecleaner。java:61)在org.apache.flink.api.java.dataset.clean(数据集。java:186)位于org.apache.flink.api.java.operators.cogroupoperator$cogroupoperatorsets$cogroupoperatorsets预测$cogroupoperatorwithoutfunction.with(cogroupoperator)。java:619)在org.apache.flink.graph.graph.joinwithvertices(图。java:587)在tu.master.conceptdetection.textprocessor.clustering(textprocessor。java:405)在tu.master.conceptdetection.textprocessor$4.actionperformed(textprocessor。java:210)
感谢您的帮助:)

q0qdq0h2

q0qdq0h21#

我猜包含图形代码的类不是 Serializable . java中的匿名类实际上是非静态的内部类,这意味着它们引用了包含类的 this (见此答案)。如果包含类不是 Serializable ,的 this 引用不会序列化,匿名类也不会。
这就解释了为什么切换到lambda表达式会使其序列化。lambda表达式不是匿名类,因此它们不会自动捕获隐式 this 参考文献。
但它没有解释为什么要宣布 MapFunction 作为一个匿名类仍然有效。如果你还有这个密码,@nesrine,我会很好奇整个班级是什么样的。

相关问题