Flink throws java.io.NotSerializableException

tgabmvqs · posted 2021-06-07 in Kafka

I wrote a custom KeyedDeserializationSchema to deserialize Kafka messages, and I use it like this:

import java.util.Properties

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema

object Job {
  case class KafkaMsg[K, V](
    key: K, value: V, topic: String, partition: Int, offset: Long)

  trait Deser[A] {
    def deser(a: Array[Byte]): A
  }

  object Deser {

    def apply[A](implicit sh: Deser[A]): Deser[A] = sh
    def deser[A: Deser](a: Array[Byte]) = Deser[A].deser(a)

    implicit val stringDeser: Deser[String] =
      new Deser[String] {
        def deser(a: Array[Byte]): String = ""
      }

    implicit val longDeser: Deser[Long] =
      new Deser[Long] {
        def deser(a: Array[Byte]): Long = 0
      }
  }

  class TypedKeyedDeserializationSchema[
    K: Deser: TypeInformation,
    V: Deser: TypeInformation
  ] extends KeyedDeserializationSchema[KafkaMsg[K, V]] {

    def deserialize(key:   Array[Byte],
                    value: Array[Byte],
                    topic: String,
                    partition: Int,
                    offset:    Long
    ): KafkaMsg[K, V] =
      KafkaMsg(Deser[K].deser(key),
               Deser[V].deser(value),
               topic,
               partition,
               offset
      )

    def isEndOfStream(e: KafkaMsg[K, V]): Boolean = false

    def getProducedType(): TypeInformation[KafkaMsg[K, V]] =
      createTypeInformation
  }

  def main(args: Array[String]) {
    val properties = new Properties
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("group.id", "flink-test")

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val stream = env
        .addSource(new FlinkKafkaConsumer011(
                     "topic",
                     new TypedKeyedDeserializationSchema[String, Long],
                     properties
                   ))
        .print

    env.execute("Flink Scala API Skeleton")
  }
}

This gives me:

[error] Caused by: java.io.NotSerializableException: l7.Job$Deser$$anon$7
[error]         at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
[error]         at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
[error]         at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
[error]         at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
[error]         at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
[error]         at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
[error]         at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
[error]         at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
[error]         at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
[error]         at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
[error]         at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:315)
[error]         at org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:170)
[error]         at org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:164)
[error]         at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.scalaClean(StreamExecutionEnvironment.scala:670)
[error]         at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.scala:600)
[error]         at l7.Job$.main(Job.scala:89)
[error]         at l7.Job.main(Job.scala)

The problem is obviously in my implementation of the Deser type class, but I don't understand what exactly causes this error or how to fix it.

Answer 1 (dvtswwa3):

Yes, the cause of this error is that Deser, unlike TypeInformation, does not extend/implement Serializable. To see why that matters, first ask yourself: why do I need to declare implicit val stringDeser and implicit val longDeser at all?
The answer is that when the Scala compiler sees a context bound of the form K: Deser: TypeInformation, it desugars it into implicit evidence parameters on the class. So your code is transformed into something like this:

class TypedKeyedDeserializationSchema[K, V](implicit val kDeserEv: Deser[K],
                                            val kTypeInfoEv: TypeInformation[K],
                                            val vDeserEv: Deser[V],
                                            val vTypeInfoEv: TypeInformation[V]) extends KeyedDeserializationSchema[KafkaMsg[K, V]] {

  def deserialize(key: Array[Byte],
                  value: Array[Byte],
                  topic: String,
                  partition: Int,
                  offset: Long
                 ): KafkaMsg[K, V] =
    KafkaMsg(kDeserEv.deser(key),
      vDeserEv.deser(value),
      topic,
      partition,
      offset
    )

  def isEndOfStream(e: KafkaMsg[K, V]): Boolean = false

  def getProducedType(): TypeInformation[KafkaMsg[K, V]] = createTypeInformation
}

It should now be clear that an instance of TypedKeyedDeserializationSchema[String, Long] actually contains two fields, of types Deser[String] and Deser[Long], whose values come from the implicit vals you declared above. So when Flink tries to ensure that the function you pass to it is Serializable, the check fails.
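You can reproduce that failing check in miniature, without Flink. The names below (Codec, Holder, SerializationCheck) are hypothetical stand-ins, not from the original code: a type-class trait that does not extend Serializable, and a Serializable class that captures an anonymous instance of it as a field, just like the desugared schema class does.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// A type-class trait that does NOT extend Serializable.
trait Codec[A] {
  def decode(a: Array[Byte]): A
}

// Serializable itself, but it holds a Codec field -- like the evidence
// parameters captured by TypedKeyedDeserializationSchema.
class Holder(val codec: Codec[String]) extends Serializable

object SerializationCheck {
  val holder = new Holder(new Codec[String] {
    def decode(a: Array[Byte]): String = new String(a, "UTF-8")
  })

  // Roughly what Flink's ClosureCleaner.ensureSerializable does: push the
  // object through plain Java serialization and see whether it survives.
  def isJavaSerializable(obj: AnyRef): Boolean =
    try {
      val oos = new ObjectOutputStream(new ByteArrayOutputStream())
      oos.writeObject(obj)
      oos.close()
      true
    } catch {
      case _: NotSerializableException => false
    }
}
```

Here SerializationCheck.isJavaSerializable(SerializationCheck.holder) returns false: Holder itself is Serializable, but its codec field is an anonymous class whose supertype is not, which is exactly the l7.Job$Deser$$anon$7 failure in the stack trace above.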
The fix is now obvious: make your trait Deser[A] extend Serializable:

trait Deser[A] extends Serializable {
  def deser(a: Array[Byte]): A
}
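As a quick sanity check of the fix (a sketch; FixCheck and javaSerializes are hypothetical helper names): once the trait extends Serializable, an anonymous instance passes the same plain-Java-serialization round trip that Flink's ClosureCleaner performs before shipping the schema to the cluster.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import scala.util.Try

// The same type-class trait as in the question, now extending Serializable.
trait Deser[A] extends Serializable {
  def deser(a: Array[Byte]): A
}

object FixCheck {
  // An anonymous instance, just like the implicit vals in the question.
  val stringDeser: Deser[String] = new Deser[String] {
    def deser(a: Array[Byte]): String = new String(a, "UTF-8")
  }

  // Mimics Flink's pre-flight check: true iff the object Java-serializes.
  def javaSerializes(obj: AnyRef): Boolean =
    Try {
      val oos = new ObjectOutputStream(new ByteArrayOutputStream())
      oos.writeObject(obj)
      oos.close()
    }.isSuccess
}
```

With the Serializable trait, FixCheck.javaSerializes(FixCheck.stringDeser) now returns true, so addSource no longer throws.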
