我有一个简单的storm拓扑,它从kafka读取数据,解析和提取消息字段。我想通过一个字段值过滤元组流,并对另一个字段值执行计数聚合。我怎么能在Storm中做到这一点?我还没有找到相应的元组方法(filter,aggregate),所以我应该直接对字段值执行这些函数吗?
这是一个拓扑:
topologyBuilder.setSpout("kafka_spout", new KafkaSpout(spoutConfig), 1)
topologyBuilder.setBolt("parser_bolt", new ParserBolt()).shuffleGrouping("kafka_spout")
topologyBuilder.setBolt("transformer_bolt", new KafkaTwitterBolt()).shuffleGrouping("parser_bolt")
val config = new Config()
cluster.submitTopology("kafkaTest", config, topologyBuilder.createTopology())
我已经设置了kafkatwitterbolt,用于对解析字段进行计数和过滤。我设法只过滤整个值列表,而不是按特定字段:
class KafkaTwitterBolt() extends BaseBasicBolt{
override def execute(input: Tuple, collector: BasicOutputCollector): Unit = {
val tweetValues = input.getValues.asScala.toList
val filterTweets = tweetValues
.map(_.toString)
.filter(_ contains "big data")
val resultAllValues = new Values(filterTweets)
collector.emit(resultAllValues)
}
override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
declarer.declare(new Fields("created_at", "id", "text", "source", "timestamp_ms",
"user.id", "user.name", "user.location", "user.url", "user.description", "user.followers_count",
"user.friends_count", "user.lang", "user.favorite_count", "entities.hashtags"))
}
}
2条答案
按热度按时间lmvvr0a81#
你的答案是https://stackoverflow.com/a/59805582/8845188 有点不对劲。storm core api允许过滤和聚合,您只需自己编写逻辑即可。
过滤螺栓只是一个螺栓,丢弃一些元组,并传递其他元组。例如,以下螺栓将根据字符串字段过滤出元组:
聚合螺栓只是收集多个元组的螺栓,然后发出锚定在原始元组中的新聚合元组:
注意,对于聚合,您需要扩展
BaseRichBolt
并手动进行确认,因为您希望延迟确认元组,直到它包含在聚合元组中。6rqinv9w2#
原来storm core api不允许这样,为了在任何战场上执行过滤,都应该使用三叉戟(它有内置的过滤功能)。代码如下所示:
过滤函数本身: