我使用flink(最新通过git)从Kafka流到Cassandra。为了简化单元测试,我通过dagger添加了依赖注入。
objectgraph似乎正在正确设置自身,但flink将“内部对象”标记为“不可序列化”。如果我直接包含这些对象,它们会工作-那么有什么区别?
类实现了mapfunction和@inject,分别为cassandra和读取配置文件注入了一个模块。
有没有一种方法来建立这个,所以我可以使用后期绑定或Flink使这不可能?
编辑:
fwiw依赖注入(通过dagger)和richmapfunction不能共存。匕首不允许你在定义中包含任何有扩展的对象。
进一步:
通过dagger lazy示例化的对象也不会序列化。
线程“main”org.apache.flink.api.common.invalidprogramexception中出现异常:object com.someapp。savemap@2e029d61 不可序列化
...
原因:java.io.notserializableexception:dagger.internal.lazybinding$1
2条答案
按热度按时间sbdsn5lh1#
在深入讨论问题的细节之前,先了解一下apache flink中函数的可序列化性的一些背景知识:
可串行化
ApacheFlink使用java序列化(java.io.serializable)来发布函数对象(这里是
MapFunction
)给那些并行执行它们的工人。因此,函数需要是可序列化的:函数不能包含任何不可序列化的字段,即不是基元(int,long,double,…)且不实现的类型java.io.Serializable
.处理不可序列化结构的典型方法是延迟初始化它们。
延迟初始化
在flink函数中使用不可序列化类型的一种方法是延迟初始化它们。保存这些类型的字段仍然是
null
当函数被序列化以发送时,并且仅在worker对函数进行反序列化之后设置。例如,在scala中,可以简单地使用惰性字段
lazy val x = new NonSerializableType()
. 这个NonSerializableType
类型实际上只在第一次访问变量时创建x
,通常在工人身上。因此,类型可以是不可序列化的,因为x
当函数被序列化为传送到worker时为null。在java中,可以初始化
open()
函数的方法,如果你使它成为一个丰富的函数。丰富的功能(如RichMapFunction
)是基本函数的扩展版本(这里是MapFunction
)给你生命周期的方法,比如open()
以及close()
.惰性依赖注入
我不太熟悉依赖注入,但dagger似乎也提供了类似于惰性依赖的东西,这可能有助于解决类似scala中惰性变量的问题:
50pmv0ei2#
我也面临类似的问题。有两种方法可以不反序列化依赖项。
使依赖关系保持静态,但这并不总是可能的。它也会扰乱你的代码设计。
使用transient:通过将依赖项声明为transient,您的意思是它们不是对象持久状态的一部分,也不应该是序列化的一部分。
这在使用外部库时尤其有用,因为您无法更改外部库的实现以使其可序列化。