我实现了一个heron拓扑,它从kafka队列读取消息。因此,我的拓扑有一个kafka喷口和一个统计从队列中读取的消息数的螺栓。
当我说 10000
消息进入Kafka队列,我可以看到所有的消息正在收到的Kafka喷口在苍鹭拓扑,但有几个消息正在螺栓丢失。
以下是heron的拓扑设置
Config config = Config.newBuilder()
.setUserConfig("topology.max.spout.pending", 100000)
.setUserConfig("topology.message.timeout.secs", 100000)
.setNumContainers(1)
.setPerContainerCpu(3)
.setPerContainerRamInGigabytes(4)
.setDeliverySemantics("ATLEAST_ONCE")
.build();
任何提示都会有帮助。
编辑:我正在使用heron的streamlet api。我把计数螺栓换成了 log
但是在的日志中看到了同样的消息 log
螺栓
processingGraphBuilder.newSource(kafkaSource)
.log();
编辑2:我通过完全删除streamlet api解决了这个问题。我重新实现了所有使用基本喷口和螺栓api,并在喷口 Package 。这解决了这个问题。我猜这是因为streamlet api中的喷口没有发生回缩
2条答案
按热度按时间s6fujrry1#
答案很简单:不应该掉下去。
有几个问题:-在heronui中,您的喷口的所有时间发射和确认计数是多少?-在heronui中,螺栓的所有时间执行、确认和失败计数是多少?
am46iovg2#
当您说消息被丢弃时,您看到的是失败计数度量中注册的失败,还是bolt中的execute计数与喷口的emit计数不一致?
风暴兼容模式下的度量是基于一个样本计算的(我认为默认值是5%)。因此,计数可能会超出这一差距。例如,根据流的采样时间,可以发送100个元组,执行计数可以是80或120。