我在apache flink上运行一个程序。我有个错误:
Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
如何在scala/java中检查对象的序列化方法?如何检查kryo是如何序列化的?
编辑:完全异常如下:
Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
Caused by: java.io.IOException: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:143)
at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
Caused by: java.lang.IndexOutOfBoundsException: Index: 103, Size: 1
at java.util.ArrayList.rangeCheck(ArrayList.java:635)
at java.util.ArrayList.get(ArrayList.java:411)
at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:222)
at org.apache.flink.api.common.typeutils.base.GenericArraySerializer.deserialize(GenericArraySerializer.java:123)
at org.apache.flink.api.common.typeutils.base.GenericArraySerializer.deserialize(GenericArraySerializer.java:34)
at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:107)
at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:100)
at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:28)
at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:125)
... 5 more
编辑2:
这是我们使用的flink数据集的类型:
(Long, String, Long, Long, Char, Array[GValue], Array[List[GValue]], Int, Long, Long, Long, Long, Long)
哪里:
sealed trait GValue extends Serializable with Comparable[GValue] with Ordered[GValue]{
def compare(o : GValue) : Int = {
o match {
case GDouble(v) => this.asInstanceOf[GDouble].v compare v
case GString(v) => this.asInstanceOf[GString].v compare v
case GInt(v) => this.asInstanceOf[GInt].v compare v
case GNull() => 0
}
}
def equal(o : GValue) : Boolean = {
o match {
case GInt(value) => try{value.equals(o.asInstanceOf[GInt].v)} catch { case e : Throwable => false }
case GDouble(value) => try{value.equals(o.asInstanceOf[GDouble].v)} catch { case e : Throwable => false }
case GString(value) => try{value.equals(o.asInstanceOf[GString].v)} catch { case e : Throwable => false }
case GNull() => o.isInstanceOf[GNull]
case _ => false
}
}
override def compareTo(o: GValue): Int = {
o match {
case GInt(value) => try{value.compareTo(o.asInstanceOf[GInt].v)} catch { case e : Throwable => 0 }
case GDouble(value) => try{value.compareTo(o.asInstanceOf[GDouble].v)} catch { case e : Throwable => 0 }
case GString(value) => try{value.compareTo(o.asInstanceOf[GString].v)} catch { case e : Throwable => 0 }
case GNull() => 0
case _ => 0
}
}
}
/**
* Represents a @GValue that contains an integer
* @deprecated
* @param v
*/
case class GInt(v: Int) extends GValue{
def this() = this(0)
override def toString() : String = {
v.toString
}
override def equals(other : Any) : Boolean = {
other match {
case GInt(value) => value.equals(v)
case _ => false
}
}
}
/**
* Represents a @GValue that contains a number as a @Double
* @param v number
*/
case class GDouble(v: Double) extends GValue {//with Ordered[GDouble]{
def this() = this(0.0)
override def toString() : String = {
val dfs = new DecimalFormatSymbols(Locale.ENGLISH);
val df = new DecimalFormat("#.########", dfs);
df.setRoundingMode(RoundingMode.FLOOR);
df.format(v)
}
override def equals(other : Any) : Boolean = {
other match {
case GDouble(value) => value.equals(v)
case _ => false
}
}
}
/**
* Represents a @GValue that contains a @String
* @param v string
*/
case class GString(v: String) extends GValue{
def this() = this(".")
override def toString() : String = {
v.toString
}
override def equals(other : Any) : Boolean = {
other match {
case GString(value) => value.equals(v)
case _ => false
}
}
}
case class GNull() extends GValue{
override def toString() : String = {
"null"
}
override def equals(other : Any) : Boolean = {
other match {
case GNull() => true
case _ => false
}
}
}
暂无答案!
目前还没有任何答案,快来回答吧!