I wrote a custom `KeyedDeserializationSchema` to deserialize Kafka messages and use it like this:
```scala
import java.util.Properties

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema

object Job {
  case class KafkaMsg[K, V](
      key: K, value: V, topic: String, partition: Int, offset: Long)

  trait Deser[A] {
    def deser(a: Array[Byte]): A
  }

  object Deser {
    def apply[A](implicit sh: Deser[A]): Deser[A] = sh
    def deser[A: Deser](a: Array[Byte]) = Deser[A].deser(a)

    implicit val stringDeser: Deser[String] =
      new Deser[String] {
        def deser(a: Array[Byte]): String = ""
      }

    implicit val longDeser: Deser[Long] =
      new Deser[Long] {
        def deser(a: Array[Byte]): Long = 0
      }
  }

  class TypedKeyedDeserializationSchema[
      K: Deser: TypeInformation,
      V: Deser: TypeInformation
  ] extends KeyedDeserializationSchema[KafkaMsg[K, V]] {
    def deserialize(key: Array[Byte],
                    value: Array[Byte],
                    topic: String,
                    partition: Int,
                    offset: Long): KafkaMsg[K, V] =
      KafkaMsg(Deser[K].deser(key),
               Deser[V].deser(value),
               topic,
               partition,
               offset)

    def isEndOfStream(e: KafkaMsg[K, V]): Boolean = false

    def getProducedType(): TypeInformation[KafkaMsg[K, V]] =
      createTypeInformation
  }

  def main(args: Array[String]): Unit = {
    val properties = new Properties
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("group.id", "flink-test")

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream = env
      .addSource(new FlinkKafkaConsumer011(
        "topic",
        new TypedKeyedDeserializationSchema[String, Long],
        properties
      ))
      .print

    env.execute("Flink Scala API Skeleton")
  }
}
```
This gives me:
```
[error] Caused by: java.io.NotSerializableException: l7.Job$Deser$$anon$7
[error] at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
[error] at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
[error] at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
[error] at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
[error] at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
[error] at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
[error] at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
[error] at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
[error] at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
[error] at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
[error] at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:315)
[error] at org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:170)
[error] at org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:164)
[error] at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.scalaClean(StreamExecutionEnvironment.scala:670)
[error] at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.scala:600)
[error] at l7.Job$.main(Job.scala:89)
[error] at l7.Job.main(Job.scala)
```
The problem clearly lies in my `Deser` implementation, but I don't understand what exactly causes this error or how to fix it.
1 Answer
Yes, the cause of this error is that `Deser`, unlike `TypeInformation`, does not extend/implement `Serializable`. To understand why that matters, first ask yourself: why do I need to declare `implicit val stringDeser` and `implicit val longDeser` at all? The answer is what the Scala compiler does when it sees a context bound of the form `K: Deser: TypeInformation`: it rewrites each bound into an implicit constructor parameter, an evidence object that is stored as a field of the class. So an object of type `TypedKeyedDeserializationSchema[String, Long]` actually contains two fields, of types `Deser[String]` and `Deser[Long]`, holding the values from the `implicit val`s you declared above. When Flink tries to ensure that the function passed to it is `Serializable`, the check therefore fails on those fields. Now the fix is obvious: make your trait `Deser[A]` extend `Serializable`:
```scala
trait Deser[A] extends Serializable {
  def deser(a: Array[Byte]): A
}
```
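The mechanics above can be sketched without Flink. The snippet below is a simplified, hypothetical stand-in (the names `Schema` and `kDeserEv` are illustrative, not what scalac actually generates): it shows roughly what a context bound desugars to, and then verifies with a plain Java-serialization round trip, which is essentially the check Flink's `ClosureCleaner` performs, that instances serialize once `Deser` extends `Serializable`:

```scala
import java.io._

// Simplified stand-in for the question's type class, with the fix applied:
// the trait extends Serializable, so its instances can be serialized.
trait Deser[A] extends Serializable {
  def deser(a: Array[Byte]): A
}

implicit val stringDeser: Deser[String] = new Deser[String] {
  def deser(a: Array[Byte]): String = new String(a, "UTF-8")
}

// Roughly what `class Schema[K: Deser]` desugars to: the context bound
// becomes an implicit constructor parameter, stored here as a field of
// the instance (the parameter name is illustrative).
class Schema[K](implicit val kDeserEv: Deser[K]) extends Serializable {
  def deserialize(key: Array[Byte]): K = kDeserEv.deser(key)
}

// A Java-serialization round trip, essentially what Flink's
// ClosureCleaner does when it checks the source function.
def roundTrip[T](t: T): T = {
  val bos = new ByteArrayOutputStream()
  val oos = new ObjectOutputStream(bos)
  oos.writeObject(t) // would throw NotSerializableException before the fix
  oos.close()
  new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray))
    .readObject().asInstanceOf[T]
}

val copy = roundTrip(new Schema[String])
```

Note that serializing the schema drags the evidence fields along with it, which is exactly why the anonymous `Deser` instances showed up in the stack trace as `Job$Deser$$anon$7`.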