我正在尝试从KafkaSpout读取Kafka消息,并从json中设置从该消息解析的元组值。实际上,我正在创建一个额外的Bolt,它使用KafkaSpout中的json字符串解析一个名为“value”的元组字段。是否可以在Spout中设置这些值?
class ScanConfigKafkaSpout(kafkaUrl: String, kafkaGroup: String, kafkaTopic: String) : KafkaSpout<String, String>(
KafkaSpoutConfig
.builder(kafkaUrl, kafkaTopic)
.setProp(KEY_KAFKA_GROUP, "grp1")
.setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE)
.build()
), ComponentId {
override fun open(conf: MutableMap<String, Any>?, context: TopologyContext?, collector: SpoutOutputCollector?) {
try {
logger.debug("<${id()}> Opening ScanConfigKafkaSpout with ${conf.toString()}")
super.open(conf, context, collector)
logger.debug("<${id()}> ScanConfigKafkaSpout opened")
} catch (t: Throwable) {
logger.error("<${id()}> Error during opening CrawlScanConfigKafkaSpout", t)
}
}
override fun id(): String = SCAN_CONFIG_KAFKA_SPOUT
companion object {
private val logger = LoggerFactory.getLogger(ScanConfigKafkaSpout::class.java)
}
}
1条答案
按热度按时间klsxnrf11#
您可能需要从
IComponent
实现方法declareOutputFields(OutputFieldsDeclarer declarer
。Storm使用它来序列化属性值和元组配置。
如 * 数据模型 * 一节中的here所述,它表示:
拓扑中的每个节点都必须声明其发出的元组的输出字段。
也有一个java例子给出了该方法。