在apache flink中处理毒消息

p5cysglq  于 2021-06-21  发布在  Flink
关注(0)|答案(1)|浏览(376)

我试图找出用apache flink处理有毒消息/未处理异常的最佳实践。我们的工作是对物联网设备的位置数据进行实时事件处理。可能出现以下两种情况:
数据在某些方面是错误的-例如无效值
由于一些我们没有预料到的边缘情况,数据触发了一个bug。
目前,我所有的数据处理都因为一条消息而停止。
我看到两个建议:
捕获异常-这需要我用一些东西来 Package 每个逻辑片段,以捕获每个运行时异常
使用边输出作为dlq的一种-从我可以看出,这似乎是#1的一种变体,我必须捕获所有异常并将它们发送到边输出。
除了用异常处理来 Package 每一个逻辑片段之外,真的没有其他方法可以做到这一点吗?是否没有通用的方法来捕获异常并且不继续处理?

jk9hmnmh

jk9hmnmh1#

我认为这样做的目的不是捕获所有类型的异常并将它们发送到其他地方,而是要有经过良好测试且运行良好的代码,并且只对无效的输入使用死信。
所以典型的管道是

source => validate => ... => sink
                  \=> dead letter queue

一旦您的记录通过validate操作符,您就希望所有错误都冒出来,因为这些操作符中的任何错误都可能导致损坏的聚合和数据,这些数据一旦写入就无法轻松还原。
验证步骤可以与您概述的两种方法中的任何一种一起使用。通常,边输出具有更好的语义,但最终可能会得到更多的代码。
现在,您可能有一个具有高sla的服务,并且实际上希望它生成输出,即使它只是为了生成数据而损坏。或者您有一个简单的转换管道,在这里您将错过一些事件,但保留大部分(下游可以处理不完整的数据)。那么您就需要用try-catch Package 所有操作符的代码了。但是,您通常仍然只对脆弱的操作符执行此操作,而不是对所有操作符执行此操作。琐碎的操作符应该经过测试,然后被信任才能工作。此外,通常只捕获特定类型的异常,以将范围限制为可能发生的预期异常类型。
你可能想知道为什么flink没有将它作为默认模式合并。据我所知,有两个原因:
如果flink默默地忽略任何类型的异常并向辅助接收器发送额外的消息,那么flink如何确保抛出操作符随后处于正常状态?它如何避免由于未执行清理代码而可能发生的任何类型的泄漏?
在java中更常见的是让开发人员显式地解释异常和异常处理。也不能直接看到需求是什么:您是否只希望输入?是否还要存储异常?可能影响结果的操作员状态如何?当在给定的时间窗口内接收到太多错误时,flink是否仍会失败?它很快就成为了一个巨大的特性,因为在一个理想的世界中,高质量的数据被接收和正确处理,根本不应该发生这样的事情。
因此,虽然对于您的案例来说似乎很容易,因为您确切地知道要存储哪种类型的信息,但要为所有目的提供一个解决方案并不容易,特别是因为与通用解决方案相比,用户必须编写的额外代码很小。
你能做的就是把大部分复杂的逻辑事物提取到一个 ProcessFunction 并使用您概述的侧输出。因为它是一个中心部分,所以只需要编写一次side输出函数。如果这样做了多次,您可以提取一个helper函数,将实际代码作为 RunnableWithException lambda隐藏了所有的边输出逻辑。一定要多用点 finally 阻止以确保状态正常。
我还会添加一些it案例,并使用突变测试来更快地强化你的管道。如果您将测试数据保持内联,那么变体也可能精确地模拟您的意外数据问题,这样您的validate操作符就更完整了。

相关问题