如何将一个kafkaspout中的多个(不同的)元组同时发送到bolt?

gtlvzcf8  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(400)

我是Apache风暴的新手。
我正在尝试使用apachekafka、storm和espercep引擎开发一个实时流处理系统。
为此,我有一个kafkaspout,它将向bolts发出流(它有我的cep查询)来过滤流。
我已经创建了一个拓扑,我正在尝试在本地集群上运行它
问题是在mybolt中运行的cep查询需要成批的元组来对流执行窗口操作。在我的拓扑结构中,kafkaspout一次只发送一个元组到bolt进行处理。所以我的cep查询没有按预期工作。
我在Storm中使用默认的Kafka。有没有办法我可以发送多个不同的元组一次螺栓?一些配置调整可以做到这一点,或者我需要使我的自定义Kafka了吗?
请帮忙!!
我的拓扑:
topologybuilder=新建topologybuilder();
builder.setspout(“kafkaspout”,new kafkaspout<>(kafkaspoutconfig.builder(“localhost:”+9092,“weatherdata”).setprop(consumerconfig.group\u id\u config,“weather consumer group”).build()),4);
builder.setbolt(“a”,new featureselectionbolt(),2).globalgrouping(“kafkaspout”);
builder.setbolt(“b”,new trenddetectionbolt(),2).shufflegrouping(“a”)
我用两个螺栓和一个喷口。
我的esper查询运行在螺栓a中
从weatherevent中选择first(e),last(e)。win:length(3) 作为e
在这里,我试图从事件流中获取长度为3的窗口中的第一个和最后一个事件。但我得到相同的第一个和最后一个事件,因为kafkaspout一次只发送一个元组。

o75abkj4

o75abkj41#

喷口不能做到这一点,但你可以使用风暴的窗口支持https://storm.apache.org/releases/2.0.0-snapshot/windowing.html,或者只是编写一个聚合螺栓并将其放在喷口和拓扑的其余部分之间。
所以你的拓扑应该是 spout -> aggregator -> feature selection -> trend detection .
我建议您尝试使用内置的窗口支持,但是如果您想编写自己的聚合,您的bolt实际上只需要接收一些元组(例如3),并发出一个包含所有值的新元组。
聚合器螺栓应该执行以下操作

private List<Tuple> buffered;

execute(Tuple input) {
  if (buffered.size != 2) {
    buffered.add(input)
    return
  }
  Tuple first = buffered.get(0)
  Tuple second = buffered.get(1)
  Values aggregate = new Values(first.getValues(), second.getValues(), input.getValues())
  List<Tuple> anchors = List.of(first, second, input)
  collector.emit(anchors, aggregate)
  collector.ack(first, second, input)
  buffered.clear()
}

这样就得到了一个包含3个输入元组内容的元组。

相关问题