消息在喷口和螺栓之间掉落

uurity8g  于 2021-06-06  发布在  Kafka
关注(0)|答案(2)|浏览(317)

我实现了一个heron拓扑,它从kafka队列读取消息。因此,我的拓扑有一个kafka喷口和一个统计从队列中读取的消息数的螺栓。
当我说 10000 消息进入Kafka队列,我可以看到所有的消息正在收到的Kafka喷口在苍鹭拓扑,但有几个消息正在螺栓丢失。
以下是heron的拓扑设置

Config config = Config.newBuilder()
                    .setUserConfig("topology.max.spout.pending", 100000)

                    .setUserConfig("topology.message.timeout.secs", 100000)
                    .setNumContainers(1)
                    .setPerContainerCpu(3)
                    .setPerContainerRamInGigabytes(4)
                    .setDeliverySemantics("ATLEAST_ONCE")
                    .build();

任何提示都会有帮助。
编辑:我正在使用heron的streamlet api。我把计数螺栓换成了 log 但是在的日志中看到了同样的消息 log 螺栓

processingGraphBuilder.newSource(kafkaSource)
                      .log();

编辑2:我通过完全删除streamlet api解决了这个问题。我重新实现了所有使用基本喷口和螺栓api,并在喷口 Package 。这解决了这个问题。我猜这是因为streamlet api中的喷口没有发生回缩

s6fujrry

s6fujrry1#

答案很简单:不应该掉下去。
有几个问题:-在heronui中,您的喷口的所有时间发射和确认计数是多少?-在heronui中,螺栓的所有时间执行、确认和失败计数是多少?

am46iovg

am46iovg2#

当您说消息被丢弃时,您看到的是失败计数度量中注册的失败,还是bolt中的execute计数与喷口的emit计数不一致?
风暴兼容模式下的度量是基于一个样本计算的(我认为默认值是5%)。因此,计数可能会超出这一差距。例如,根据流的采样时间,可以发送100个元组,执行计数可以是80或120。

相关问题