当我运行下面的代码时。日志打印:
类scala.collection.mutable.listbuffer不包含字段scala$collection$mutable$listbuffer$$start的setter
类scala.collection.mutable.listbuffer不能用作pojo类型,因为并非所有字段都是有效的pojo字段,必须作为generictype处理。
代码:
private lazy val schoolDescriptor = new ListStateDescriptor[School]("schoolDescriptor", classOf[School])
context.globalState.getListSate(schoolDescriptor).update(ListBuffer(new School))
类定义:
class School {
var classes: ListBuffer[Class] = ListBuffer()
}
class Class {
var students: ListBuffer[Class] = ListBuffer()
}
class Student {
var name = ""
}
如果pojo有listbuffer type字段,并且listbuffer的元素也有listbuffer type字段,我该怎么办?
1条答案
按热度按时间vsnjm48y1#
在你关于不变性的问题的评论中已经有了一些提示。
一般来说,我还建议这样做,因为当您使用flink state时,一般的api约定是,如果您更新您的state对象(schooldescriptor),您必须用它调用state#update。
这可能适用于堆状态而不调用update(api并不总是保证),但不适用于rocksdb state backend。如果使用纯pojo[1],序列化也会容易得多。
在非pojo的情况下,一般的方法是实现您的定制 org.apache.flink.api.common.typeutils.typeserializer或注册自定义序列化程序[2]以使用另一个状态描述符构造函数: liststatedescriptor(字符串名称, typeserializer)或重构类以支持开箱即用的序列化[3]。
来自安德烈
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/types_serialization.html#rules-对于pojo类型
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/custom_serializers.html
[3] https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/types_serialization.html