我用Kafka风暴整合。kafka将数据加载到队列中,kafka-spout将提取数据和进程。我有下面的设计。
Kafka -> Queue -> KafkaSpout -> Process1 Bolt -> Process2 Bolt
问题是,如果process2 bolt处理数据花费的时间较长,则kafkaspout将失败,并再次尝试从队列中读取数据,这将导致重复记录。
如果博尔特的处理速度很慢,为什么Kafka普特会将其视为失败?解决办法是什么?是否有任何超时或任何类似的属性,我必须设置在风暴?
1条答案
按热度按时间ttisahbt1#
如果处理一个元组花费的时间太长,storm将失败,默认为30秒。由于storm保证处理,一旦失败,kafka喷口将重放相同的消息,直到元组成功处理。
来自doc
如果元组的消息树未能在指定的超时时间内完全处理,则认为元组失败。可以使用config.topology\ message\ u timeout\ u secs配置在特定于拓扑的基础上配置此超时,默认值为30秒