flink1.9.0-状态反序列化在更改状态对象后失败

vs3odd8k  于 2021-06-24  发布在  Flink
关注(0)|答案(0)|浏览(508)

更改state对象后,我们的状态无法加载((不足为奇)
我们的状态对象如下所示:

public class StateHolder {
    private Set<Object1> objects1 = new HashSet<>();
    private Set<Object2> objects2 = new HashSet<>();
    // 4 more sets of objects, Object3 to Object6 let's call them that
    // no args constructor, and getters and setters
...
}

它的用法如下:

ValueStateDescriptor<StateHolder> aggregateValueStateDescriptor = new ValueStateDescriptor<>(
                getDescriptorNamePrefix(STATE_PREFIX, STATE_NAME_COMMAND_AGGREGATE, DATE_OF_STATE_CREATION),
                TypeInformation.of(new TypeHint<StateHolder>() {
                })
        );
        commandAggregateState = getRuntimeContext().getState(aggregateValueStateDescriptor);

最近,我们将字段添加到 Object3 . 我们将其设置为默认值。它是一个字符串,定义如下: private String newField = ""; 执行此操作后,我们得到以下异常:

com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 104
    at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)
    at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:411)
    at org.apache.flink.contrib.streaming.state.RocksDBValueState.value(RocksDBValueState.java:90)

显然,我们的集合正在使用kryoserializer反序列化,我想这是意料之中的事情。但估计并不理想。
我不明白的是,为什么会失败,以及如何解决这个问题?所以我们在 Object3 类,但反序列化失败 objects5 字段( Set<Object5> ). 新字段是否会导致缓冲区意外移动,从而从错误位置读取注册id?
我们的pojo符合文档中列出的所有规则(可能这与kryo反序列化失败无关):


* The class is public and standalone (no non-static inner class)
* The class has a public no-argument constructor
* All non-static, non-transient fields in the class (and all superclasses) are either public (and non-final) or have a public getter- and a setter- method that follows the Java beans naming conventions for getters and setters.

通过google异常,有一些建议,比如实现自定义序列化程序。你同意这是正确的方法吗?如果是这样的话,通过google,我只找到了如何定制avro和protobuff序列化程序,我们都没有使用它们来保存状态。看起来,我们的数据是用pojoserializer序列化的,pojoserializer使用不同的kryoserializer来反序列化我们的集合,其中一个显然失败了。我们应该创造新的习俗吗 PojoSerializer 有一个习惯 KryoSerializer 对于这个失败的领域,如果是的话,这是怎样的习惯 KryoSerializer 应该是什么样子?我们应该有符合 java.util.Set ?
感谢您阅读所有这些,如果这是重复的问题,我道歉。我找不到一个和这个差不多的。
更新
所以,在建议将新字段初始化移到noargsconstuctor的默认值之后,我们原来的异常就被 Package 了。

java.lang.RuntimeException: Error while getting state
at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:62)
at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:144)
at fake.package.pipeline.handler.codebook.function.CommandHandlerCodeBookFunction.initializeState(CommandHandlerCodeBookFunction.java:88)
at fake.package.pipeline.handler.Handler.open(Handler.java:104)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
at org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator.open(KeyedCoProcessOperator.java:62)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:532)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:396)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.StateMigrationException: Error while trying to migrate RocksDB state.
at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.migrateSerializedValue(AbstractRocksDBState.java:213)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.migrateStateValues(RocksDBKeyedStateBackend.java:603)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:532)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:482)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:643)
at org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:72)
at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:279)
at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:328)
at org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124)
at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60)
... 11 more
Caused by: com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 104
at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)
at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:411)
at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.migrateSerializedValue(AbstractRocksDBState.java:210)
... 21 more

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题