Flink: Kryo serialization error when reading Parquet files from S3

c0vxltue  posted on 2021-06-21  in Flink

We want to consume Parquet files from S3.
My code snippet is below. The input files are protobuf-encoded Parquet files, and the protobuf class is StandardLog.Pageview.

import com.twitter.chill.protobuf.ProtobufSerializer;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.scala.hadoop.mapreduce.HadoopInputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.parquet.proto.ProtoParquetInputFormat;
import org.apache.hadoop.fs.Path;
import scala.Tuple2;

public class ParquetReadJob {
    public static void main(String... args) throws Exception {

        ExecutionEnvironment ee = ExecutionEnvironment.getExecutionEnvironment();
        ee.getConfig().registerTypeWithKryoSerializer(StandardLog.Pageview.class, ProtobufSerializer.class);
        String path = args[0];

        Job job = Job.getInstance();
        job.setInputFormatClass(ProtoParquetInputFormat.class);

        HadoopInputFormat<Void, StandardLog.Pageview> hadoopIF =
                new HadoopInputFormat<> (new ProtoParquetInputFormat<>(), Void.class, StandardLog.Pageview.class, job);

        ProtoParquetInputFormat.addInputPath(job, new Path(path));
        DataSource<Tuple2<Void, StandardLog.Pageview>> dataSet = ee.createInput(hadoopIF).setParallelism(10);

        dataSet.print();
    }
}

It always fails with this error:

com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
Serialization trace:
supportCrtSize_ (access.Access$AdPositionInfo)
adPositionInfo_ (access.Access$AccessRequest)
accessRequest_ (com.adshonor.proto.StandardLog$Pageview$Builder)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
    at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:730)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:109)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
    at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:730)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:113)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
    at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:42)
    at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:33)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:315)
    at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:106)
    at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
    at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
    at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
    at org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:216)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.UnsupportedOperationException
    at java.util.Collections$UnmodifiableCollection.add(Collections.java:1055)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:109)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
    ... 23 more

Can anyone tell me how to write a batch program that can consume this kind of file?

uoifb46i  #1

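I ran into this problem too. I found a fix in a pending pull request for Flink's protobuf support, and it solved the problem for me.
You need to add the NonLazyProtobufSerializer and ProtobufKryoSerializer classes to your project and register NonLazyProtobufSerializer as the default Kryo serializer for the Message type: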

env.getConfig().addDefaultKryoSerializer(Message.class, NonLazyProtobufSerializer.class);
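
For context on why a dedicated serializer is needed at all: the "Caused by" part of the trace in the question shows Kryo's CollectionSerializer calling add on a java.util.Collections$UnmodifiableCollection, which means Kryo fell back to field-by-field handling and tried to refill a read-only list. The sketch below reproduces only that failure using the JDK. The class UnmodifiableAddDemo and its refill helper are hypothetical names that imitate what a generic collection deserializer does; they are not Kryo or Flink code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

public class UnmodifiableAddDemo {

    // Hypothetical stand-in for a generic collection deserializer:
    // it refills the target collection one element at a time via add().
    public static boolean refill(Collection<Integer> target, List<Integer> decoded) {
        try {
            for (Integer value : decoded) {
                target.add(value);
            }
            return true;
        } catch (UnsupportedOperationException e) {
            // Same exception type as in the "Caused by" section of the trace.
            return false;
        }
    }

    public static void main(String[] args) {
        List<Integer> decoded = Arrays.asList(1, 2, 3);

        // A plain ArrayList accepts add(), so the refill succeeds.
        Collection<Integer> mutable = new ArrayList<Integer>();
        System.out.println("ArrayList refilled: " + refill(mutable, decoded));

        // An unmodifiable wrapper rejects add(), which is the failure in the trace.
        Collection<Integer> readOnly =
                Collections.unmodifiableList(new ArrayList<Integer>());
        System.out.println("Unmodifiable list refilled: " + refill(readOnly, decoded));
    }
}
```

A serializer that goes through protobuf's own encoding avoids touching these internal collections, which is presumably why registering one makes the error go away.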

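From the author's Javadoc:
This is a workaround for an issue that surfaces when consuming a data source from Kafka in a Flink table environment. For fields declared with type 'string' in the .proto file, the corresponding field on the Java class has the declared type 'Object'. The actual type of these fields on objects returned by Message.parseFrom(byte[]) is 'ByteArray'. But the getter methods for these fields return 'String', lazily replacing the underlying ByteArray field with a String when necessary.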
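
The lazy replacement described in that Javadoc can be illustrated with a small JDK-only sketch. Everything here is an assumption made for illustration: LazyStringFieldDemo is a hypothetical class, not generated protobuf code, and a plain byte[] stands in for whatever byte container the real generated class uses. It only shows that a field declared as Object can hold one type right after parsing and another after the getter has run, which is awkward for a serializer that inspects fields directly.

```java
import java.nio.charset.StandardCharsets;

public class LazyStringFieldDemo {

    // Declared as Object, as the Javadoc describes: holds raw bytes right
    // after "parsing" and a String once the getter has been called.
    private Object name_;

    public LazyStringFieldDemo(byte[] utf8Bytes) {
        this.name_ = utf8Bytes;
    }

    // Getter that lazily swaps the raw bytes for a decoded String.
    public String getName() {
        if (name_ instanceof String) {
            return (String) name_;
        }
        String decoded = new String((byte[]) name_, StandardCharsets.UTF_8);
        name_ = decoded;
        return decoded;
    }

    // What a field-by-field serializer would observe at this moment.
    public Class<?> storedType() {
        return name_.getClass();
    }

    public static void main(String[] args) {
        LazyStringFieldDemo message =
                new LazyStringFieldDemo("home".getBytes(StandardCharsets.UTF_8));

        System.out.println("Before getter: " + message.storedType().getSimpleName());
        System.out.println("Value: " + message.getName());
        System.out.println("After getter: " + message.storedType().getSimpleName());
    }
}
```

Judging by its name, NonLazyProtobufSerializer resolves such fields before handing the message to Kryo, but I have not verified that against the pull request.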
Hope this helps.
