Storm: how to parse a Kafka message in a KafkaSpout and set tuple values

xxhby3vn posted on 2022-12-09 in Apache

I am trying to read Kafka messages with a KafkaSpout and set tuple values parsed from the JSON in each message. Currently I create an extra Bolt that parses a tuple field named "value", which holds the JSON string coming from the KafkaSpout. Is it possible to set these values in the Spout instead?

class ScanConfigKafkaSpout(kafkaUrl: String, kafkaGroup: String, kafkaTopic: String) : KafkaSpout<String, String>(
    KafkaSpoutConfig
        .builder(kafkaUrl, kafkaTopic)
        .setProp(KEY_KAFKA_GROUP, "grp1")
        .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE)
        .build()
), ComponentId {
    override fun open(conf: MutableMap<String, Any>?, context: TopologyContext?, collector: SpoutOutputCollector?) {
        try {
            logger.debug("<${id()}> Opening ScanConfigKafkaSpout with $conf")
            super.open(conf, context, collector)
            logger.debug("<${id()}> ScanConfigKafkaSpout opened")
        } catch (t: Throwable) {
            logger.error("<${id()}> Error during opening ScanConfigKafkaSpout", t)
        }
    }

    override fun id(): String = SCAN_CONFIG_KAFKA_SPOUT

    companion object {
        private val logger = LoggerFactory.getLogger(ScanConfigKafkaSpout::class.java)
    }

}
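The bolt the question describes only needs to pull one string field out of the JSON payload. Stripped of the Storm types, that step is a plain string-to-value function; a minimal stdlib-only Java sketch of it (the extractField helper, the sample payload, and the field name are illustrative assumptions, and a real bolt would use a JSON library such as Jackson):

```java
import java.util.Optional;

public class JsonFieldSketch {
    // Naive extraction of a top-level string field from a flat JSON object.
    // Good enough for a sketch; it does not handle nesting or escaped quotes.
    static Optional<String> extractField(String json, String field) {
        String key = "\"" + field + "\":\"";
        int start = json.indexOf(key);
        if (start < 0) return Optional.empty();
        start += key.length();
        int end = json.indexOf('"', start);
        if (end < 0) return Optional.empty();
        return Optional.of(json.substring(start, end));
    }

    public static void main(String[] args) {
        // In the bolt's execute(), this result would be emitted as a tuple value.
        System.out.println(extractField("{\"value\":\"scan-1\"}", "value").orElse("<missing>"));
    }
}
```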

klsxnrf11#

You probably need to implement the method declareOutputFields(OutputFieldsDeclarer declarer) from IComponent.
Storm uses it to determine the schema of the tuples your component emits.
As described here in the *Data model* section, it states:

Every node in a topology must declare the output fields for the tuples it emits.

A Java example of the method is also given there.

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
  declarer.declare(new Fields("double", "triple"));
}
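On the original question of setting the values in the spout itself: as far as I know, storm-kafka-client also lets you pass a record translator to KafkaSpoutConfig.Builder (setRecordTranslator), which pairs a record-to-values function with a Fields schema so the spout emits already-parsed fields. The core invariant is the same one declareOutputFields enforces: the declared field list and every emitted values list must match in arity and order. A Storm-free sketch of the tutorial's double/triple example above (class and method names are illustrative, not Storm API):

```java
import java.util.Arrays;
import java.util.List;

public class DoubleTripleSketch {
    // Declared schema: must match, in order and arity, every emitted values list.
    static final List<String> FIELDS = Arrays.asList("double", "triple");

    // What the bolt's execute() would compute before calling collector.emit(...).
    static List<Object> toValues(int input) {
        return Arrays.asList(input * 2, input * 3);
    }

    public static void main(String[] args) {
        List<Object> values = toValues(4);
        // Storm checks this pairing via declareOutputFields at topology setup.
        if (values.size() != FIELDS.size())
            throw new AssertionError("schema/values arity mismatch");
        System.out.println(FIELDS + " -> " + values);
    }
}
```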
