我使用ibmspl代码创建了一个kafka操作符。现在我必须用这个kafka操作符替换现有ibmsplmm代码的输入。当前代码从不同的位置接收输入,现在我想将输入更改为kafka消费者。帮我解决这个问题。谢谢。
acruukt91#
很抱歉。因此,我创建了一个kafka操作符,它从python代码接收输入消息并将其写入文本文件。复合kafka_read_op{param expression$topic:“dns_ext”;图表
stream<rstring message> KafkaStream = KafkaConsumer() { param propertiesFile : getThisToolkitDir() + "/etc/consumer.properties" ; topic : $topic; } () as Sink = FileSink(KafkaStream) { param file : "/tmp/output.txt"; format : txt; append : true; flush : 1u; } //End of FileSink.
我有一个IBMSPL代码,它接收来自另一个spl代码的输入,看起来像,。public composite externaldnsimportsubscriptionwrapper(output importsubstream){param expression$subscription:getsubmissiontimevalue(“externalfeeder::subscription”,getcompiletimevalue(“externalfeeder::subscription”,“kind==\”\“”));键入$outputtype;graph stream importsubstream=import(){param applicationscope:“com.zel.streams.cti”;订阅:kind==“enrichmentdata”&&category==“externaldnszel”&&id==getchannel()//订阅:kind==“enrichmentdata”&&category==“externaldnszel”&&id==0;}现在我需要用我创建的kafka操作符替换上面的代码。所以我需要发送Kafka的输出来导入还是删除整个导入代码并在这里创建一个Kafka。如果我这样做,我需要改变在其他连接代码导入。我找不到任何示例可以将Kafka的输出发送到导入。
1条答案
按热度按时间acruukt91#
很抱歉。因此,我创建了一个kafka操作符,它从python代码接收输入消息并将其写入文本文件。
复合kafka_read_op{param expression$topic:“dns_ext”;图表
我有一个IBMSPL代码,它接收来自另一个spl代码的输入,看起来像,。
public composite externaldnsimportsubscriptionwrapper(output importsubstream){param expression$subscription:getsubmissiontimevalue(“externalfeeder::subscription”,getcompiletimevalue(“externalfeeder::subscription”,“kind==\”\“”));键入$outputtype;graph stream importsubstream=import(){param applicationscope:“com.zel.streams.cti”;订阅:kind==“enrichmentdata”&&category==“externaldnszel”&&id==getchannel()//订阅:kind==“enrichmentdata”&&category==“externaldnszel”&&id==0;}
现在我需要用我创建的kafka操作符替换上面的代码。所以我需要发送Kafka的输出来导入还是删除整个导入代码并在这里创建一个Kafka。如果我这样做,我需要改变在其他连接代码导入。我找不到任何示例可以将Kafka的输出发送到导入。