“keyby”中的异常处理

hsvhsicv  于 2021-06-21  发布在  Flink
关注(0)|答案(2)|浏览(505)

可能由于代码中的错误或缺乏验证,进入flink作业的数据会触发异常。我的目标是提供一致的异常处理方法,我们的团队可以在flink作业中使用这种方法,而不会导致生产中的任何停机。
重启策略在这里似乎不适用,因为:
简单的重启并不能解决问题,我们陷入了重启循环
我们不能简单地跳过这个事件
它们可以是好的oome或一些暂时的问题
我们不能添加自定义的
“keyby”函数中的try/catch块没有完全帮助,因为:
在异常处理之后,无法跳过“keyby”中的事件
示例代码:

env.addSource(kafkaConsumer)
    .keyBy(keySelector) // must return one result for one entry
    .flatMap(mapFunction) // we can skip some entries here in case of errors
    .addSink(new PrintSinkFunction<>());
env.execute("Flink Application");

我希望能够跳过“keyby”和类似方法中引起问题的事件的处理,这些方法应该只返回一个结果。

k97glaaz

k97glaaz1#

除了@phanhuy152(在我看来完全合法)的建议之外,为什么不呢 filter 之前 keyBy ?

env.addSource(kafkaConsumer)
    .filter(invalidKeys)
    .keyBy(keySelector) // must return one result for one entry
    .flatMap(mapFunction) // we can skip some entries here in case of errors
    .addSink(new PrintSinkFunction<>());
env.execute("Flink Application");
yv5phkfx

yv5phkfx2#

您能为 keyBy 在这种情况下要回来吗?然后你的 flatMap 函数遇到这样的值时可以跳过吗?

相关问题