我有一个用例,其中我有一个消息,必须推送到Kafka的一些主题。目前在较高级别上,该方法如下所示:
pushToTopics(String msg){
pushToTopicA(msg);
pushToTopicB(msg);
pushToTopicC(msg);
.
.
.
pushToTopicN(msg);
}
每个pushtotopopicx(msg)都有一个条件,当满足该条件时,应该将消息发布到相应的主题。现在,所有这些逻辑都在终端螺栓上,为了推送消息,我们使用kafkaproducer。
我正在研究如何将其分解为特定主题的螺栓,更重要的是使用kafkabolts来推送消息。
风暴(1.2.2版)有可能吗?我最近看到一个pr被合并了,它可以创建自定义回调,但是我们没有。
2条答案
按热度按时间hmtdttj41#
kafkabolt可以根据元组决定发送到哪个主题。你可以用一个分割螺栓把你的信息分割成
N
消息,每个都有不同的目标主题,然后将它们全部发送到kafkabolt。5cg8jx4n2#
我最终解决这个问题的方法是创建单独的流,每个流都绑定到目标主题。然后通过特定流上的collector.emit,我能够将消息分散到各个bolt上,最终使用kafkabolt推送到kafka。