我们用的是Kafka喷口的风暴。当消息失败时,我们希望重放它们,但在某些情况下,错误的数据或代码错误将导致消息始终失败,因此我们将进入无限的重放周期。显然,当我们发现错误时,我们正在修复错误,但是希望我们的拓扑通常是容错的。一个元组被重放超过n次之后,我们如何确认它呢?
翻阅Kafka喷口的代码,我发现它的设计是使用指数退避计时器和pr状态的注解来重试:
“喷口不终止重试周期(我确信它不应该这样做,因为它不能报告关于中止请求失败的上下文),它只处理延迟重试。拓扑中的一个螺栓最终仍将调用ack()而不是fail(),以停止循环。”
我看到过stackoverflow的回复,建议写一个定制的喷口,但是如果有推荐的方法可以在一个螺栓中实现这一点的话,我宁愿不被困在维护Kafka喷口内部的定制补丁上。
用螺栓做这件事的正确方法是什么?我在元组中没有看到任何状态显示它被重放了多少次。
5条答案
按热度按时间lkaoscv71#
storm本身并不能为您的问题提供任何支持。因此,定制解决方案是唯一的出路。即使你不想修补
KafkaSpout
,我认为,引入计数器并打破其中的重放周期,将是最好的方法。另一种选择是,您也可以继承KafkaSpout
在子类中放置一个计数器。这当然有点类似于修补程序,但可能不那么麻烦,更容易实现。如果要使用螺栓,可以执行以下操作(这也需要对
KafkaSpout
或者它的一个子类)。为每个元组分配一个唯一的id作为附加属性(也许,已经有一个唯一的id可用;否则,您可以引入一个“counter id”或整个元组(即所有属性)来标识每个元组)。
在后面插入螺栓
KafkaSpout
通过fieldsGrouping
在id上(以确保重放的元组流到同一个bolt示例)。在你的螺栓内,使用
HashMap<ID,Counter>
缓冲所有元组并计算(重试)次数的。如果计数器小于阈值,则转发输入元组,以便由后面的实际拓扑处理它(当然,您需要适当地锚定元组)。如果计数大于阈值,则确认元组以打破循环,并从HashMap
(您可能还需要记录所有失败的元组)。为了从
HashMap
,每次在KafkaSpout
您需要将元组id转发给bolt,以便它可以从HashMap
. 只需为您的应用程序声明第二个输出流KafkaSpout
子类和覆盖Spout.ack(...)
(当然你需要打电话super.ack(...)
确保KafkaSpout
也得到确认)。不过,这种方法可能会消耗大量内存。另一种方法是在
HashMap
您还可以使用第三个流(与其他两个流一样连接到bolt),并在元组失败时转发元组id(即Spout.fail(...)
). 每次,bolt从第三个流接收到一个“fail”消息,计数器就会增加。只要没有进入HashMap
(或者未达到阈值),螺栓只是转发元组进行处理。这应该会减少使用的内存,但需要一些更多的逻辑来实现在您的喷口和螺栓。这两种方法都有缺点,即每个acked元组都会向新引入的bolt生成额外的消息(从而增加网络流量)。对于第二种方法,似乎只需要向bolt发送一个“ack”消息,以查找以前失败的元组。但是,您不知道哪些元组失败了,哪些元组没有失败。如果你想摆脱这种网络开销,你可以引入第二个
HashMap
在KafkaSpout
缓冲失败消息的ID。因此,只有成功重放失败的元组时,才能发送“ack”消息。当然,第三种方法使得要实现的逻辑更加复杂。不修改
KafkaSpout
在某种程度上,我认为你的问题没有解决的办法。我个人会修补KafkaSpout
或者用第三种方法HashMap
在KafkaSpout
子类和bolt(因为与前两个解决方案相比,它消耗的内存很少,并且不会给网络带来太多额外的负载)。b1zrtrql2#
基本上是这样的:
如果您部署拓扑,它们应该是生产级的(这是预期的某种质量级别,并且元组的数量较低)。
如果元组失败,请检查元组是否实际有效。
如果元组有效(例如,由于无法连接到外部数据库而无法插入元组,或类似的情况),请回复元组。
如果元组未形成且永远无法处理(例如,数据库id为文本,而数据库需要整数),则应该
ack
,您将永远无法修复此类问题或将其插入数据库。应该记录新的异常类型(以及元组内容本身)。您应该检查这些日志,并在将来生成验证元组的规则。并最终添加代码以在将来正确处理它们(etl)。
不要记录一切,否则你的日志文件将是巨大的,是非常有选择性的什么你记录。日志文件的内容应该是有用的,而不是一堆垃圾。
继续这样做,最终你将只涵盖所有的情况。
e0bqpujr3#
我们也面临着类似的数据,我们有坏数据进来,导致螺栓无限失败。
为了在运行时解决这个问题,我们又引入了一个bolt,将其命名为“debugbolt”,以供参考。因此,喷口首先将消息发送到此螺栓,然后此螺栓对错误消息执行所需的数据修复,然后将它们发送到所需的螺栓。这样一来,人们就可以即时修复数据错误。
另外,如果需要删除一些消息,实际上可以将ignoreflag从debugbolt传递到原始bolt,如果ignoreflag为true,则原始bolt应该只向spout发送一个ack而不进行处理。
uidvcgyl4#
我们只是让我们的bolt在错误流中发出坏元组并确认它。另一个bolt通过将错误写回Kafka主题来处理错误。这使我们能够轻松地引导正常与错误数据流通过拓扑。
唯一一种元组失败的情况是因为某些必需的资源处于脱机状态,例如网络连接、数据库等。。。这些是可重试的错误。其他任何内容都会被定向到错误流,以便进行适当的修复或处理。
当然,这都是假设您不希望发生任何数据丢失。如果你只想尝试一个最大的努力,并忽略几次后重试,那么我会看看其他的选择。
acruukt95#
据我所知,storm并没有对此提供内置支持。
我申请了以下提到的实施: