java在storm拓扑中存储中间数据

2ledvvac  于 2021-06-24  发布在  Storm
关注(0)|答案(1)|浏览(279)

我正在读Kafka两个主题的资料。可以描述为: Topic1 data content: VehicleRegistrationNo, Timestamp, Location Topic2 data content: VehicleRegistrationNo, Timestamp, Speed 我需要根据最接近的时间戳合并这两条消息,并将元组作为消息输出 VehicleRegistrationNo, Timestamp, Speed, Location . 我通过两个喷口阅读这些主题 S1 以及 S2 . 然后螺栓 MergeS1andS2 从这些喷口获取输入,并作为: if (message from S1): save present message from S1 along with 2 previous messages (3 consecutive locations) to LocationHashMap elseif (message from S2): get locations details from LocationHashmap and merge speed for same Vehicle with location info, then send details to next bolt as tuple 我知道hashmap不是在多节点中存储数据的有效方法。所以我读了关于trident和redis来存储中间数据。在这个可以在分布式拓扑中工作的senario中,我应该使用什么来存储中间数据。

kognpnkq

kognpnkq1#

任何没有sql的数据库都可以做到这一点。选择唯一标识元组的键,而不管主题来自哪个。逻辑是这样的:
尝试从数据库中查找元组。
如果数据库中不存在元组,请将从主题中获取的元组存储到数据库中。
如果存在元组,则合并数据库元组和主题元组的内容,并将结果元组存储回数据库(覆盖数据库中上一个元组的内容)

相关问题