check序列化方法

vwoqyblh  于 2021-06-24  发布在  Flink
关注(0)|答案(0)|浏览(607)

我在apache flink上运行一个程序。我有个错误:

Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.

如何在scala/java中检查对象的序列化方法?如何检查kryo是如何序列化的?
编辑:完全异常如下:

Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer. 
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
Caused by: java.io.IOException: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
        at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:143)
        at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
        at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
        at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
Caused by: java.lang.IndexOutOfBoundsException: Index: 103, Size: 1
        at java.util.ArrayList.rangeCheck(ArrayList.java:635)
        at java.util.ArrayList.get(ArrayList.java:411)
        at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
        at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:222)
        at org.apache.flink.api.common.typeutils.base.GenericArraySerializer.deserialize(GenericArraySerializer.java:123)
        at org.apache.flink.api.common.typeutils.base.GenericArraySerializer.deserialize(GenericArraySerializer.java:34)
        at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:107)
        at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:100)
        at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:28)
        at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
        at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:125)
        ... 5 more

编辑2:
这是我们使用的flink数据集的类型:

(Long, String, Long, Long, Char, Array[GValue], Array[List[GValue]], Int, Long, Long, Long, Long, Long)

哪里:

sealed trait GValue extends Serializable with Comparable[GValue] with Ordered[GValue]{
    def compare(o : GValue) : Int = {
      o match {
        case GDouble(v) => this.asInstanceOf[GDouble].v compare v
        case GString(v) => this.asInstanceOf[GString].v compare v
        case GInt(v) => this.asInstanceOf[GInt].v compare v
        case GNull() => 0
      }
    }
    def equal(o : GValue) : Boolean = {
      o match {
        case GInt(value) => try{value.equals(o.asInstanceOf[GInt].v)} catch { case e : Throwable => false }
        case GDouble(value) => try{value.equals(o.asInstanceOf[GDouble].v)} catch { case e : Throwable => false }
        case GString(value) => try{value.equals(o.asInstanceOf[GString].v)} catch { case e : Throwable => false }
        case GNull() => o.isInstanceOf[GNull]
        case _ => false
      }
    }
  override def compareTo(o: GValue): Int = {
    o match {
      case GInt(value) => try{value.compareTo(o.asInstanceOf[GInt].v)} catch { case e : Throwable => 0 }
      case GDouble(value) => try{value.compareTo(o.asInstanceOf[GDouble].v)} catch { case e : Throwable => 0 }
      case GString(value) => try{value.compareTo(o.asInstanceOf[GString].v)} catch { case e : Throwable => 0 }
      case GNull() => 0
      case _ => 0
    }
  }
  }

  /**
   * Represents a @GValue that contains an integer
   * @deprecated
   * @param v
   */
  case class GInt(v: Int) extends GValue{
    def this() = this(0)
    override def toString() : String = {
      v.toString
    }
    override def equals(other : Any) : Boolean = {
      other match {
        case GInt(value) => value.equals(v)
        case _ => false
      }
    }

  }

  /**
   * Represents a @GValue that contains a number as a @Double
   * @param v number
   */
  case class GDouble(v: Double) extends GValue {//with Ordered[GDouble]{

    def this() = this(0.0)
    override def toString() : String = {

      val dfs = new DecimalFormatSymbols(Locale.ENGLISH);
      val df = new DecimalFormat("#.########", dfs);
      df.setRoundingMode(RoundingMode.FLOOR);
      df.format(v)

    }
    override def equals(other : Any) : Boolean = {
      other match {
        case GDouble(value) => value.equals(v)
        case _ => false
      }
    }

  }

  /**
   * Represents a @GValue that contains a @String
   * @param v string
   */
  case class GString(v: String) extends GValue{
    def this() = this(".")
    override def toString() : String = {
      v.toString
    }
    override def equals(other : Any) : Boolean = {
      other match {
        case GString(value) => value.equals(v)
        case _ => false
      }
    }
  }

  case class GNull() extends GValue{
    override def toString() : String = {
      "null"
    }

    override def equals(other : Any) : Boolean = {
      other match {
        case GNull() => true
        case _ => false
      }
    }
  }

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题