我正在阅读Kafka的一个主题,做一些数据处理动作,比如删除不必要的字符和url,然后想从dataframe生成一个列表,并使用dataframe结构做进一步的数据分析部分。最后我想把最后的结果写成一个新的Kafka主题
val readStream = existingSparkSession
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", hostAddress)
.option("subscribe", "myTopic.raw")
.load()
//do some preprocessing tasks
val myDataframe = preprocessing()
val finalDataframe
//convert dataframe into a list
val x = myDataframe.toDF()
val myList = x.select("value").rdd.map(r => r(0)).collect.toList
for(element <- myList) {
finalDataframe = dataAnalysis(element)
}
//write back to kafka
val writeStream = finalDataframe
.writeStream
.outputMode("append")
.format("kafka")
.option("kafka.bootstrap.servers", hostAddress)
.option("topic", "myRaw.test")
.start()
writeStream.awaitAnyTermination()
但是我得到了以下错误
线程“main”org.apache.spark.sql.analysisexception中出现异常:必须使用writestream.start()执行具有流源的查询
暂无答案!
目前还没有任何答案,快来回答吧!