flink 1.4 avroutils错误

jobtbby3  于 2021-06-25  发布在  Flink
关注(0)|答案(2)|浏览(595)

我尝试在Flink1.4上提交一个作业,得到以下异常。
你知道怎么解决这个问题吗?

Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:897)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:840)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:840)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.VerifyError: Bad type on operand stack
Exception Details:
  Location:
    org/apache/flink/formats/avro/utils/AvroKryoSerializerUtils.addAvroGenericDataArrayRegistration(Ljava/util/LinkedHashMap;)V @23: invokespecial
  Reason:
Type 'org/apache/flink/api/java/typeutils/runtime/kryo/Serializers$SpecificInstanceCollectionSerializerForArrayList' (current frame, stack[7]) is not assignable to 'com/esotericsoftware/kryo/Serializer'
  Current Frame:
bci: @23
flags: { }
locals: { 'org/apache/flink/formats/avro/utils/AvroKryoSerializerUtils', 'java/util/LinkedHashMap' }
stack: { 'java/util/LinkedHashMap', 'java/lang/String', uninitialized 6, uninitialized 6, 'java/lang/Class', uninitialized 12, uninitialized 12, 'org/apache/flink/api/java/typeutils/runtime/kryo/Serializers$SpecificInstanceCollectionSerializerForArrayList' }
   Bytecode:
0x0000000: 2b12 05b6 000b bb00 0c59 1205 bb00 0d59
0x0000010: bb00 0659 b700 0eb7 000f b700 10b6 0011
0x0000020: 57b1                                   

at java.lang.Class.getDeclaredConstructors0(Native Method)
at java.lang.Class.privateGetDeclaredConstructors(Class.java:2671)
at java.lang.Class.getConstructor0(Class.java:3075)
at java.lang.Class.getConstructor(Class.java:1825)
at org.apache.flink.api.java.typeutils.AvroUtils.getAvroUtils(AvroUtils.java:48)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.buildKryoRegistrations(KryoSerializer.java:481)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.<init>(KryoSerializer.java:119)
at org.apache.flink.api.java.typeutils.GenericTypeInfo.createSerializer(GenericTypeInfo.java:90)
at org.apache.flink.api.java.typeutils.TupleTypeInfo.createSerializer(TupleTypeInfo.java:107)
at org.apache.flink.api.java.typeutils.TupleTypeInfo.createSerializer(TupleTypeInfo.java:52)
at org.apache.flink.api.java.typeutils.ListTypeInfo.createSerializer(ListTypeInfo.java:102)
at org.apache.flink.api.common.state.StateDescriptor.initializeSerializerUnlessSet(StateDescriptor.java:253)
at org.apache.flink.runtime.state.DefaultOperatorStateBackend.getListState(DefaultOperatorStateBackend.java:520)
at org.apache.flink.runtime.state.DefaultOperatorStateBackend.getUnionListState(DefaultOperatorStateBackend.java:165)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.initializeState(FlinkKafkaConsumerBase.java:692)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:259)
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:694)
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:682)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:748)
qyyhg6bp

qyyhg6bp1#

多亏了前面回答的“toch”,他给了我解决问题的方向

<exclude>com.esotericsoftware.kryo:kryo</exclude>

已经被排除在我们的 pom.xml 我注意到 kryo-shaded 包括在内
我被加入了pom

<exclude>com.esotericsoftware:kryo-shaded</exclude>

它解决了问题

qyswt5oh

qyswt5oh2#

这是作业jar中的kryo lib和flink中已经加载的kryo lib之间的冲突。
实际上,serializers$specificinstancecollectionserializerforarraylist通过specificinstancecollectionserializer和collectionserializer从com.esotericsoftware.kryo.serializer继承(请参阅flink中的serializers.java和kryo中的collectionserializer.java)。
我们通过从job jar中排除kryo lib来修复它。在gradle build.gradle中,它是通过(如果您使用shadowjar插件)完成的:

configurations {
  runtime.exclude module: 'kryo' , group: 'com.esotericsoftware.kryo'
}

在maven pom.xml中,它是通过(如果您使用shade插件)完成的:

<exclude>com.esotericsoftware.kryo:kryo</exclude>

相关问题