我对storm还比较陌生,最近我将我的bolt改为继承irichbolt而不是basebasicbolt,这意味着我现在负责根据自己的逻辑对tuple进行确认和失败。
我的拓扑结构是这样的:bolta向boltb和c发出相同的元组,每个元组都将数据持久化到cassandra。这些操作不是幂等的,并且包括对两个不同计数器列族的更新。我只对失败元组和在cassandra的某些异常中重放它感兴趣(不是读/写超时,只有queryconstancy或validation异常)。问题是,如果bolt b失败,相同的元组将从喷口重放,并再次发送到bolt c,后者已经成功地保存了its数据,从而创建了错误的数据。
我已经试着理解如何准确地确认(通过阅读:http://www.slideshare.net/andreaiacono/storm-44638254)但我不明白在我描述的情况下会发生什么。
我认为正确解决这个问题的唯一方法是使用相同的输入源创建另一个喷口:喷口1->bolt a->bolt b和喷口1'->bolt a'->bolt c',或者将两个列族的数据合并到一个中,将它们保存在同一批处理语句中,该批处理语句在bolt b和c中完成。
这是正确的还是我遗漏了什么?有没有其他可能的解决方案来正确地确认这些元组?
谢谢。
1条答案
按热度按时间jdzmm42g1#
您没有在bolt b或c中说明要等待多长时间来重试失败的更新,但是您可以添加更多的流,而不是在bolt b中彻底失败元组。将蝎子尾输出流从螺栓b添加回同一螺栓b。如果bolt b中的更新失败,则将元组写入scorpion tail输出流,以便它作为输入再次返回bolt b,只是从第二个流返回。您可以充实元组以保存时间戳,这样您在bolt b上对新流的处理逻辑就可以查看上次尝试的时间,如果没有足够的时间,您可以再次将其写入scorpion尾流。当然你也会为博尔特c做同样的事。
如果您想等待很长时间来重试tuple(风暴术语中的long),您可以用kafka主题替换蝎子尾流以及必要的喷口。