apache flink检查点时出现cep问题“找不到条目的id”

1tu0hz3e  于 2021-06-25  发布在  Flink
关注(0)|答案(1)|浏览(347)

当检查点打开时,使用简单的cep循环模式

private Pattern<Tuple2<Integer, SimpleBinaryEvent>, ?> alertPattern = Pattern.<Tuple2<Integer, SimpleBinaryEvent>>begin("start").where(checkStatusOn)
        .followedBy("middle").where(checkStatusOn).times(2)
        .next("end").where(checkStatusOn).within(Time.minutes(5))

我看到了失败。
simplebinaryevent是

public class SimpleBinaryEvent implements Serializable {

private int id;
private int sequence;
private boolean status;
private long time;

public SimpleBinaryEvent(int id, int sequence, boolean status , long time) {
    this.id = id;
    this.sequence = sequence;
    this.status = status;
    this.time = time;
}
public int getId() {
    return id;
}
public int getSequence() {
    return sequence;
}
public boolean isStatus() {
    return status;
}
public long getTime() {
    return time;
}
@Override
public boolean equals(Object o) {
    if (this == o) return true;
    if (o == null || getClass() != o.getClass()) return false;

    SimpleBinaryEvent that = (SimpleBinaryEvent) o;

    if (getId() != that.getId()) return false;
    if (isStatus() != that.isStatus()) return false;
    if (getSequence() != that.getSequence()) return false;
    return getTime() == that.getTime();
}

@Override
public int hashCode() {
    //return Objects.hash(getId(),isStatus(), getSequence(),getTime());
    int result = getId();
    result = 31 * result + (isStatus() ? 1 : 0);
    result = 31 * result + getSequence();
    result = 31 * result + (int) (getTime() ^ (getTime() >>> 32));
    return result;
}

@Override
public String toString() {
    return "SimpleBinaryEvent{" +
            "id='" + id + '\'' +
            ", status=" + status +
            ", sequence=" + sequence +
            ", time=" + time +
            '}';
}

}
故障原因:

Caused by: java.lang.Exception: Could not materialize checkpoint 2 for operator KeyedCEPPatternOperator -> Map (1/1).
... 6 more
Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalStateException: Could not find id for entry: SharedBufferEntry(ValueTimeWrapper((1,SimpleBinaryEvent{id='1', status=true, sequence=95, time=1505503380000}), 1505503380000, 0),....

我确信我已经按它应该的方式实现了equals()和hashcode()。我也试过objects.hashcode。在其他情况下,我在sharedbuffer.tostring()上有循环引用(因此也有stackoverflow),它再次指出了引用的问题(相等和不相等)。如果不打开检查点,它将按预期工作。我正在本地群集上运行。cep生产准备好了吗?
我使用的是1.3.2 flink

ssgvzors

ssgvzors1#

非常感谢您尝试了图书馆和报告这一点!
随着越来越多的功能添加到该库中,该库正在积极开发中。1.3是这个库的第一个版本,它的语义如此丰富,因此我们希望看到1)人们如何使用它,2)是否有任何bug。所以我想说,它不是100%的生产准备,但它是不远。
现在对于手头的问题,我想您正在使用rocksdb进行检查点,对吗?我假设的原因是,使用rocksdb,在每个水印(在事件时间)您都会反序列化必要的状态(例如 NFA ),处理一些事件,然后在将其放回rocksdb之前再次序列化。
文件系统状态后端不是这种情况,只有在检查点设置时才序列化状态,只有在恢复时才读取并反序列化状态。所以在本例中,假设您说过,如果不检查您的工作是否正常,那么只有在从失败中恢复之后,您才会看到这个问题。
问题的根源可能是 equals() / hashcode() 是bug(似乎不是这样),或者在序列化/反序列化cep状态的方式上有问题。
你能不能提供一个最小的事件输入序列来产生这种情况?这对再现问题非常有帮助。
多谢,科斯塔斯

相关问题