如何知道Kafka风暴中的重试次数

ttygqcqt  于 2021-06-24  发布在  Storm
关注(0)|答案(2)|浏览(687)

我知道喷动配置 retryLimit 设置可以重新处理消息的次数。
关于 retryLimit ,这是我在spoutconfig.class中找到的消息:
指数后退重试设置。在bolt调用outputcollector.fail()之后,这些消息由exponentialbackoffmsgretrymanager用于重试消息。
我想知道,当元组在我的代码中的任何给定的bolt中被处理时,是否有任何方法可以知道重试的确切次数。
例如,如果我设置 retryLimit=5 它失败了(调用 OutputCollector.fail() )第一次重新处理第二次我想知道这个元组已经失败了1次。
我会很感激你的帮助。
谢谢。

eeq64g8w

eeq64g8w1#

对此没有内置支持。由Kafka记录产生的元组https://github.com/apache/storm/blob/master/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/recordtranslator.java 只取决于Kafka的记录,而不是重播的次数。
默认的recordtranslator将发出主题、分区和偏移量作为元组的一部分,因此您可以使用它们来检查您的Bolt以前是否见过元组(假设您有某种状态存储)。为什么螺栓需要知道元组失败了多少次?
编辑:
我认为我们没有将失败计数作为一个选项添加到发出的元组中的原因之一是它不可靠。由于一个元组的失败次数只存在于喷口中的内存中,因此您可能会遇到这样的情况:元组失败,喷口崩溃,并且您永远不会看到一个失败计数大于0的元组。
即使我们在喷口中有一个持久的状态存储,仍然会有失败的元组没有被标记为这样的情况,例如,如果喷口首先崩溃,而先前发出的元组随后失败。重新启动的喷口无法识别先前发出的元组,因此不会将其标记为失败。
在我看来,您实际上需要跟踪的是喷口是否多次发出元组,而不是喷口是否认为它以前失败过。
你也许可以用https://github.com/apache/storm/blob/master/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/kafkatuplelistener.java 以及 onEmit 跟踪已多次发射的偏移量。因为它是作为喷口的一部分运行的,所以当元组被确认时,清理状态应该非常简单。仍然有可能丢失一个失败的元组,因为 onEmit 在喷口发出元组后运行,因此如果喷口在发出元组后立即崩溃,则可能会错过失败。也许你可以先考虑一下是否可以围绕这个需求进行设计。

5anewei6

5anewei62#

据我所知,storm并没有对此提供内置支持。
我尝试了以下提到的实现:

public class AuditMessageWriter extends BaseBolt {

        private static final long serialVersionUID = 1L;
        Map<Object, Integer> failedTuple = new HashMap<>();

        public AuditMessageWriter() {

        }

        /**
         * {@inheritDoc}
         */
        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
            //any initialization if u want
        }

        /**
         * {@inheritDoc}
         */
        @Override
        public void execute(Tuple input) {
            try {

            //Write your processing logic
            collector.ack(input);
            } catch (Exception e2) {
            //In case of any exception save the tuple in failedTuple map with a count 1
            //Before adding the tuple in failedTuple map check the count and increase it and fail the tuple

            //if failure count reaches the limit (message reprocess limit) log that and remove from map and acknowledge the tuple
            log(input);
            ExceptionHandler.LogError(e2, "Message IO Exception");
            }

        }

        void log(Tuple input) {

            try {
                //Here u can pass result to dead queue or log that
//And ack the tuple 
            } catch (Exception e) {
                ExceptionHandler.LogError(e, "Exception while logging");
            }
        }

        @Override
        public void cleanup() {
            // To declare output fields.Not required in this alert.
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // To declare output fields.Not required in this alert.
        }

        @Override
        public Map<String, Object> getComponentConfiguration() {
            return null;
        }

    }

相关问题