Flink defaults to Kryo serialization even for POJOs and Avro SpecificRecords

Posted by 7cjasjjr on 2022-12-16 in Apache

I'm trying to build a POC of Flink state schema evolution, using Flink 1.15.0 and Java 11. I created three data classes, one for each serialization type:

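  1. io.peleg.kryo.User - uses java.time.Instant, which I know is not supported by Flink's POJO serialization.
  2. io.peleg.pojo.User - uses only the classic boxed primitives Integer, Long and String. Getters, setters and constructors are generated with Lombok.
  3. io.peleg.avro.User - generated from an Avro schema with the Avro Maven plugin.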
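
For each class I wrote a streaming job that uses a time window to buffer elements and collect them into a list.
For each class I then tried the following:

  1. Run the job.
  2. Stop it with a savepoint.
  3. Add a field to the data class.
  4. Resubmit the job from the savepoint.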
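
Whether Flink uses its PojoSerializer or falls back to Kryo depends on whether the class meets Flink's documented POJO rules: the class is public (standalone or a static nested class), it has a public no-argument constructor, and every non-static, non-transient field is either public or has a getFoo()/setFoo() pair. The sketch below is a rough, stdlib-only approximation of those rules, not Flink's TypeExtractor, so it ignores field types and superclass fields; the sample classes PublicFieldsUser and NoDefaultCtorUser are hypothetical. A class that only has an all-args constructor (for example, a Lombok class annotated with @AllArgsConstructor but not @NoArgsConstructor) is one way to end up with a Kryo-serialized generic type; whether that applies to io.peleg.pojo.User depends on the Lombok annotations it uses.

```java
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;

// Rough approximation of the POJO rules from the Flink documentation.
// NOT Flink's TypeExtractor: field types and superclass fields are not checked.
public class PojoRuleCheck {

    /** Returns reasons why cls would likely not be treated as a Flink POJO. */
    public static List<String> violations(Class<?> cls) {
        List<String> problems = new ArrayList<>();
        int mods = cls.getModifiers();
        if (!Modifier.isPublic(mods)) {
            problems.add("class is not public");
        }
        if (cls.isMemberClass() && !Modifier.isStatic(mods)) {
            problems.add("non-static inner class");
        }
        boolean hasPublicNoArgCtor = false;
        for (Constructor<?> c : cls.getConstructors()) { // public constructors only
            if (c.getParameterCount() == 0) {
                hasPublicNoArgCtor = true;
            }
        }
        if (!hasPublicNoArgCtor) {
            problems.add("no public no-argument constructor");
        }
        for (Field f : cls.getDeclaredFields()) { // only the class's own fields
            int fm = f.getModifiers();
            if (Modifier.isStatic(fm) || Modifier.isTransient(fm) || f.isSynthetic()) {
                continue; // not part of the POJO's serialized state
            }
            if (Modifier.isPublic(fm)) {
                continue;
            }
            String cap = Character.toUpperCase(f.getName().charAt(0)) + f.getName().substring(1);
            boolean getter = hasPublicMethod(cls, "get" + cap, 0)
                    || (f.getType() == boolean.class && hasPublicMethod(cls, "is" + cap, 0));
            boolean setter = hasPublicMethod(cls, "set" + cap, 1);
            if (!getter || !setter) {
                problems.add("field '" + f.getName() + "' is neither public nor has getter/setter");
            }
        }
        return problems;
    }

    private static boolean hasPublicMethod(Class<?> cls, String name, int params) {
        for (Method m : cls.getMethods()) {
            if (m.getName().equals(name) && m.getParameterCount() == params) {
                return true;
            }
        }
        return false;
    }

    // Hypothetical class with public fields and a public no-arg constructor.
    public static class PublicFieldsUser {
        public long id;
        public String name;
        public PublicFieldsUser() {}
    }

    // Hypothetical class with getters/setters but only an all-args constructor.
    public static class NoDefaultCtorUser {
        private String name;
        public NoDefaultCtorUser(String name) { this.name = name; }
        public String getName() { return name; }
        public void setName(String name) { this.name = name; }
    }

    public static void main(String[] args) {
        System.out.println(violations(PublicFieldsUser.class));  // []
        System.out.println(violations(NoDefaultCtorUser.class)); // [no public no-argument constructor]
    }
}
```

For an actual job, the authoritative answer is the TypeInformation Flink extracts for the class; a check like this only narrows down which rule a class is likely to break.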
For every data class, resubmitting from the savepoint failed with the following exception:
java.lang.Exception: Exception while creating StreamOperatorStateContext.
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:255)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:268)
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:700)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:676)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:643)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
    at java.base/java.lang.Thread.run(Unknown Source)
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for WindowOperator_3983d6bb2f0a45b638461bc99138f22f_(2/2) from any of the 1 provided restore options.
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:346)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:164)
    ... 11 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore heap backend
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.restoreState(HeapKeyedStateBackendBuilder.java:172)
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:106)
    at org.apache.flink.runtime.state.hashmap.HashMapStateBackend.createKeyedStateBackend(HashMapStateBackend.java:143)
    at org.apache.flink.runtime.state.hashmap.HashMapStateBackend.createKeyedStateBackend(HashMapStateBackend.java:74)
    at org.apache.flink.runtime.state.StateBackend.createKeyedStateBackend(StateBackend.java:140)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:329)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
    ... 13 more
Caused by: com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: Index 83 out of bounds for length 3
Serialization trace:
favoriteColor (io.peleg.avro.User)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:402)
    at org.apache.flink.runtime.state.heap.HeapSavepointRestoreOperation.readKVStateData(HeapSavepointRestoreOperation.java:219)
    at org.apache.flink.runtime.state.heap.HeapSavepointRestoreOperation.readKeyGroupStateData(HeapSavepointRestoreOperation.java:149)
    at org.apache.flink.runtime.state.heap.HeapSavepointRestoreOperation.restore(HeapSavepointRestoreOperation.java:125)
    at org.apache.flink.runtime.state.heap.HeapSavepointRestoreOperation.restore(HeapSavepointRestoreOperation.java:57)
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.restoreState(HeapKeyedStateBackendBuilder.java:169)
    ... 20 more
Caused by: java.lang.IndexOutOfBoundsException: Index 83 out of bounds for length 3
    at java.base/jdk.internal.util.Preconditions.outOfBounds(Unknown Source)
    at java.base/jdk.internal.util.Preconditions.outOfBoundsCheckIndex(Unknown Source)
    at java.base/jdk.internal.util.Preconditions.checkIndex(Unknown Source)
    at java.base/java.util.Objects.checkIndex(Unknown Source)
    at java.base/java.util.ArrayList.get(Unknown Source)
    at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
    at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
    at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:728)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:113)
    ... 31 more

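Since Flink does not support state schema evolution for Kryo-serialized types, I expected this exception for io.peleg.kryo.User.
However, it looks like every class ended up being serialized with Kryo rather than with the POJO or Avro serializer.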
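
The serialization trace above shows Kryo's CollectionSerializer and FieldSerializer reading the restored state. Kryo's FieldSerializer writes field values one after another without storing field names, so bytes written with an old class layout cannot be read safely with a new layout. The sketch below is a simplified analogy using DataOutputStream, not Kryo's wire format; the field names and the position of the added field are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;

// Simplified analogy of a positional, schema-less encoding (the idea behind
// Kryo's FieldSerializer). This is NOT Kryo's real wire format.
public class PositionalEncodingDemo {

    // Old class layout: name, favoriteNumber. Values only, no field names.
    public static byte[] writeOld(String name, int favoriteNumber) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(name);
            out.writeInt(favoriteNumber);
        }
        return bytes.toByteArray();
    }

    // Reading with the same layout works.
    public static String readOld(byte[] data) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
        String name = in.readUTF();
        int favoriteNumber = in.readInt();
        return name + "/" + favoriteNumber;
    }

    // New class layout: a hypothetical favoriteColor field now sits before
    // favoriteNumber. Nothing in the bytes says which layout wrote them.
    public static String readNew(byte[] data) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
        String name = in.readUTF();
        // Reads the first 2 bytes of favoriteNumber as a string length (0 for the value 7).
        String favoriteColor = in.readUTF();
        // Only 2 bytes are left, so this throws EOFException.
        int favoriteNumber = in.readInt();
        return name + "/" + favoriteColor + "/" + favoriteNumber;
    }

    public static void main(String[] args) throws IOException {
        byte[] saved = writeOld("alice", 7);
        System.out.println(readOld(saved)); // alice/7
        try {
            System.out.println(readNew(saved));
        } catch (EOFException e) {
            System.out.println("restore with new layout failed: " + e);
        }
    }
}
```

Here the misalignment ends in an EOFException; with Kryo, misread bytes can instead surface as errors such as the IndexOutOfBoundsException in the trace above.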
I ran the job on a Flink cluster created with docker compose:

version: "2.2"
services:
  jobmanager:
    image: flink:latest
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager

  taskmanager:
    image: flink:latest
    depends_on:
      - jobmanager
    command: taskmanager
    scale: 1
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 2

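All of my code is public on GitHub here.
What I want to achieve is to successfully restore the job from a savepoint taken with an older version of the POJO that has fewer or more fields.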
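
This kind of evolution works when the state snapshot carries enough schema information to match fields by name. Flink's documentation for POJO types states that fields can be added (initialized to the Java default value) or removed (dropped), but field types cannot change. The sketch below is a simplified, stdlib-only analogy of that name-based matching using reflection; it is not Flink's PojoSerializer, and UserV1/UserV2 are hypothetical classes.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified analogy of name-based state migration; NOT Flink's PojoSerializer.
public class NameBasedMigrationDemo {

    // Hypothetical "old" version of the class.
    public static class UserV1 {
        public String name;
        public Integer favoriteNumber;
        public String obsoleteField;   // removed in V2
        public UserV1() {}
    }

    // Hypothetical "new" version of the class.
    public static class UserV2 {
        public String name;
        public Integer favoriteNumber;
        public String favoriteColor;   // added in V2
        public UserV2() {}
    }

    // "Snapshot": keep every field value together with the field's name.
    public static Map<String, Object> snapshot(Object pojo) throws IllegalAccessException {
        Map<String, Object> byName = new LinkedHashMap<>();
        for (Field f : pojo.getClass().getFields()) {
            if (!Modifier.isStatic(f.getModifiers())) {
                byName.put(f.getName(), f.get(pojo));
            }
        }
        return byName;
    }

    // "Restore": match fields by name. Fields missing from the snapshot keep their
    // Java default (null here); names unknown to the target class are ignored.
    public static <T> T restore(Map<String, Object> byName, Class<T> target)
            throws ReflectiveOperationException {
        T result = target.getDeclaredConstructor().newInstance();
        for (Field f : target.getFields()) {
            if (!Modifier.isStatic(f.getModifiers()) && byName.containsKey(f.getName())) {
                f.set(result, byName.get(f.getName()));
            }
        }
        return result;
    }

    public static void main(String[] args) throws ReflectiveOperationException {
        UserV1 old = new UserV1();
        old.name = "alice";
        old.favoriteNumber = 7;
        old.obsoleteField = "dropped";

        UserV2 restored = restore(snapshot(old), UserV2.class);
        System.out.println(restored.name + " " + restored.favoriteNumber + " " + restored.favoriteColor);
        // alice 7 null
    }
}
```

Because values are matched by name, the added favoriteColor stays null and obsoleteField is dropped; a changed field type would make Field.set throw IllegalArgumentException instead of migrating, which mirrors the documented restriction on changing field types.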


Answer #1

I ran an experiment with this POJO class:

import java.time.Instant;

public class Event {

    public long id;
    public String data;
    public Instant timestamp;

    // Public no-argument constructor, required for Flink's POJO serializer.
    public Event() {}

    public Event(final long id, final String data, final Instant timestamp) {
        this.id = id;
        this.data = data;
        this.timestamp = timestamp;
    }

    // ...
}

In the IntelliJ debugger I could see that Flink provides an InstantSerializer for the Instant field and uses a PojoSerializer for this Event class.

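I don't know where things went wrong in your setup, but you can switch off the Kryo fallback with

env.getConfig().disableGenericTypes();

This will help with the experiment: Flink then throws an exception whenever it encounters a type that would be serialized with Kryo, instead of silently falling back to it.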
