我想弄清楚如何为protobuf状态启用模式演化。我们的flink程序是用scala2.11编写的,我们使用flink1.10.1和scalapb生成protobuf代码。rocksdb用作状态后端。
我在写一封信 TypeSerializer
对于由4个双字段组成的protobuf消息。这是一个poc,所以我选择的消息格式非常简单。
Flink的这方面似乎没有充分的记载。我尽可能地实现了所有必需的方法 TypeSerializer
除外 snapshotConfiguration
. 中途我发现 ProtobufSerializer
, ProtobufTypeSerializer
以及 ProtobufTypeSerializerSnapshot
JavaAPI文档中的类,这些类在文章中根本没有提到。
现在,具体问题:
我该怎么写 snapshotConfiguration
方法?我在这里画了一个空白,特别是在序列化程序快照中包含什么,以便能够确定序列化程序之间的兼容性( TypeSerializerSnapshot#resolveSchemaCompatibility
).
是什么 ProtobufSerializer
以及 ProtobufTypeSerializer
为了什么?我应该实现它们而不是 TypeSerializer
为了我的需要?
我不确定我写了什么 getLength
正确地。我假设4个double需要4 x 24=96字节,但是protobuf消息的长度不是恒定的,即使是4个double。
你的孩子怎么了 TypeSerializer
接受另一个t并在t不可变时尝试重用它的方法?在我的情况下,他们抛出了一个例外。他们是否被称为 isImmutableType = true
?
暂无答案!
目前还没有任何答案,快来回答吧!