风暴Kafka喷口上的最大元组重放次数

koaltpgm  于 2021-06-08  发布在  Kafka
关注(0)|答案(5)|浏览(360)

我们用的是Kafka喷口的风暴。当消息失败时,我们希望重放它们,但在某些情况下,错误的数据或代码错误将导致消息始终失败,因此我们将进入无限的重放周期。显然,当我们发现错误时,我们正在修复错误,但是希望我们的拓扑通常是容错的。一个元组被重放超过n次之后,我们如何确认它呢?
翻阅Kafka喷口的代码,我发现它的设计是使用指数退避计时器和pr状态的注解来重试:
“喷口不终止重试周期(我确信它不应该这样做,因为它不能报告关于中止请求失败的上下文),它只处理延迟重试。拓扑中的一个螺栓最终仍将调用ack()而不是fail(),以停止循环。”
我看到过stackoverflow的回复,建议写一个定制的喷口,但是如果有推荐的方法可以在一个螺栓中实现这一点的话,我宁愿不被困在维护Kafka喷口内部的定制补丁上。
用螺栓做这件事的正确方法是什么?我在元组中没有看到任何状态显示它被重放了多少次。

lkaoscv7

lkaoscv71#

storm本身并不能为您的问题提供任何支持。因此,定制解决方案是唯一的出路。即使你不想修补 KafkaSpout ,我认为,引入计数器并打破其中的重放周期,将是最好的方法。另一种选择是,您也可以继承 KafkaSpout 在子类中放置一个计数器。这当然有点类似于修补程序,但可能不那么麻烦,更容易实现。
如果要使用螺栓,可以执行以下操作(这也需要对 KafkaSpout 或者它的一个子类)。
为每个元组分配一个唯一的id作为附加属性(也许,已经有一个唯一的id可用;否则,您可以引入一个“counter id”或整个元组(即所有属性)来标识每个元组)。
在后面插入螺栓 KafkaSpout 通过 fieldsGrouping 在id上(以确保重放的元组流到同一个bolt示例)。
在你的螺栓内,使用 HashMap<ID,Counter> 缓冲所有元组并计算(重试)次数的。如果计数器小于阈值,则转发输入元组,以便由后面的实际拓扑处理它(当然,您需要适当地锚定元组)。如果计数大于阈值,则确认元组以打破循环,并从 HashMap (您可能还需要记录所有失败的元组)。
为了从 HashMap ,每次在 KafkaSpout 您需要将元组id转发给bolt,以便它可以从 HashMap . 只需为您的应用程序声明第二个输出流 KafkaSpout 子类和覆盖 Spout.ack(...) (当然你需要打电话 super.ack(...) 确保 KafkaSpout 也得到确认)。
不过,这种方法可能会消耗大量内存。另一种方法是在 HashMap 您还可以使用第三个流(与其他两个流一样连接到bolt),并在元组失败时转发元组id(即 Spout.fail(...) ). 每次,bolt从第三个流接收到一个“fail”消息,计数器就会增加。只要没有进入 HashMap (或者未达到阈值),螺栓只是转发元组进行处理。这应该会减少使用的内存,但需要一些更多的逻辑来实现在您的喷口和螺栓。
这两种方法都有缺点,即每个acked元组都会向新引入的bolt生成额外的消息(从而增加网络流量)。对于第二种方法,似乎只需要向bolt发送一个“ack”消息,以查找以前失败的元组。但是,您不知道哪些元组失败了,哪些元组没有失败。如果你想摆脱这种网络开销,你可以引入第二个 HashMapKafkaSpout 缓冲失败消息的ID。因此,只有成功重放失败的元组时,才能发送“ack”消息。当然,第三种方法使得要实现的逻辑更加复杂。
不修改 KafkaSpout 在某种程度上,我认为你的问题没有解决的办法。我个人会修补 KafkaSpout 或者用第三种方法 HashMapKafkaSpout 子类和bolt(因为与前两个解决方案相比,它消耗的内存很少,并且不会给网络带来太多额外的负载)。

b1zrtrql

b1zrtrql2#

基本上是这样的:
如果您部署拓扑,它们应该是生产级的(这是预期的某种质量级别,并且元组的数量较低)。
如果元组失败,请检查元组是否实际有效。
如果元组有效(例如,由于无法连接到外部数据库而无法插入元组,或类似的情况),请回复元组。
如果元组未形成且永远无法处理(例如,数据库id为文本,而数据库需要整数),则应该 ack ,您将永远无法修复此类问题或将其插入数据库。
应该记录新的异常类型(以及元组内容本身)。您应该检查这些日志,并在将来生成验证元组的规则。并最终添加代码以在将来正确处理它们(etl)。
不要记录一切,否则你的日志文件将是巨大的,是非常有选择性的什么你记录。日志文件的内容应该是有用的,而不是一堆垃圾。
继续这样做,最终你将只涵盖所有的情况。

e0bqpujr

e0bqpujr3#

我们也面临着类似的数据,我们有坏数据进来,导致螺栓无限失败。
为了在运行时解决这个问题,我们又引入了一个bolt,将其命名为“debugbolt”,以供参考。因此,喷口首先将消息发送到此螺栓,然后此螺栓对错误消息执行所需的数据修复,然后将它们发送到所需的螺栓。这样一来,人们就可以即时修复数据错误。
另外,如果需要删除一些消息,实际上可以将ignoreflag从debugbolt传递到原始bolt,如果ignoreflag为true,则原始bolt应该只向spout发送一个ack而不进行处理。

uidvcgyl

uidvcgyl4#

我们只是让我们的bolt在错误流中发出坏元组并确认它。另一个bolt通过将错误写回Kafka主题来处理错误。这使我们能够轻松地引导正常与错误数据流通过拓扑。
唯一一种元组失败的情况是因为某些必需的资源处于脱机状态,例如网络连接、数据库等。。。这些是可重试的错误。其他任何内容都会被定向到错误流,以便进行适当的修复或处理。
当然,这都是假设您不希望发生任何数据丢失。如果你只想尝试一个最大的努力,并忽略几次后重试,那么我会看看其他的选择。

acruukt9

acruukt95#

据我所知,storm并没有对此提供内置支持。
我申请了以下提到的实施:

public class AuditMessageWriter extends BaseBolt {

        private static final long serialVersionUID = 1L;
        Map<Object, Integer> failedTuple = new HashMap<>();

        public AuditMessageWriter() {

        }

        /**
         * {@inheritDoc}
         */
        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
            //any initialization if u want
        }

        /**
         * {@inheritDoc}
         */
        @Override
        public void execute(Tuple input) {
            try {

            //Write your processing logic
            collector.ack(input);
            } catch (Exception e2) {
            //In case of any exception save the tuple in failedTuple map with a count 1
            //Before adding the tuple in failedTuple map check the count and increase it and fail the tuple

            //if failure count reaches the limit (message reprocess limit) log that and remove from map and acknowledge the tuple
            log(input);
            ExceptionHandler.LogError(e2, "Message IO Exception");
            }

        }

        void log(Tuple input) {

            try {
                //Here u can pass result to dead queue or log that
//And ack the tuple 
            } catch (Exception e) {
                ExceptionHandler.LogError(e, "Exception while logging");
            }
        }

        @Override
        public void cleanup() {
            // To declare output fields.Not required in this alert.
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // To declare output fields.Not required in this alert.
        }

        @Override
        public Map<String, Object> getComponentConfiguration() {
            return null;
        }

    }

相关问题