我是Apache风暴的新手。
我正在尝试使用apachekafka、storm和espercep引擎开发一个实时流处理系统。
为此,我有一个kafkaspout,它将向bolts发出流(它有我的cep查询)来过滤流。
我已经创建了一个拓扑,我正在尝试在本地集群上运行它
问题是在mybolt中运行的cep查询需要成批的元组来对流执行窗口操作。在我的拓扑结构中,kafkaspout一次只发送一个元组到bolt进行处理。所以我的cep查询没有按预期工作。
我在Storm中使用默认的Kafka。有没有办法我可以发送多个不同的元组一次螺栓?一些配置调整可以做到这一点,或者我需要使我的自定义Kafka了吗?
请帮忙!!
我的拓扑:
topologybuilder=新建topologybuilder();
builder.setspout(“kafkaspout”,new kafkaspout<>(kafkaspoutconfig.builder(“localhost:”+9092,“weatherdata”).setprop(consumerconfig.group\u id\u config,“weather consumer group”).build()),4);
builder.setbolt(“a”,new featureselectionbolt(),2).globalgrouping(“kafkaspout”);
builder.setbolt(“b”,new trenddetectionbolt(),2).shufflegrouping(“a”)
我用两个螺栓和一个喷口。
我的esper查询运行在螺栓a中
从weatherevent中选择first(e),last(e)。win:length(3) 作为e
在这里,我试图从事件流中获取长度为3的窗口中的第一个和最后一个事件。但我得到相同的第一个和最后一个事件,因为kafkaspout一次只发送一个元组。
1条答案
按热度按时间o75abkj41#
喷口不能做到这一点,但你可以使用风暴的窗口支持https://storm.apache.org/releases/2.0.0-snapshot/windowing.html,或者只是编写一个聚合螺栓并将其放在喷口和拓扑的其余部分之间。
所以你的拓扑应该是
spout -> aggregator -> feature selection -> trend detection
.我建议您尝试使用内置的窗口支持,但是如果您想编写自己的聚合,您的bolt实际上只需要接收一些元组(例如3),并发出一个包含所有值的新元组。
聚合器螺栓应该执行以下操作
这样就得到了一个包含3个输入元组内容的元组。