可能由于代码中的错误或缺乏验证,进入flink作业的数据会触发异常。我的目标是提供一致的异常处理方法,我们的团队可以在flink作业中使用这种方法,而不会导致生产中的任何停机。
重启策略在这里似乎不适用,因为:
简单的重启并不能解决问题,我们陷入了重启循环
我们不能简单地跳过这个事件
它们可以是好的oome或一些暂时的问题
我们不能添加自定义的
“keyby”函数中的try/catch块没有完全帮助,因为:
在异常处理之后,无法跳过“keyby”中的事件
示例代码:
env.addSource(kafkaConsumer)
.keyBy(keySelector) // must return one result for one entry
.flatMap(mapFunction) // we can skip some entries here in case of errors
.addSink(new PrintSinkFunction<>());
env.execute("Flink Application");
我希望能够跳过“keyby”和类似方法中引起问题的事件的处理,这些方法应该只返回一个结果。
2条答案
按热度按时间k97glaaz1#
除了@phanhuy152(在我看来完全合法)的建议之外,为什么不呢
filter
之前keyBy
?yv5phkfx2#
您能为
keyBy
在这种情况下要回来吗?然后你的flatMap
函数遇到这样的值时可以跳过吗?