dataframe—从kafka主题中读取数据,并使用scala spark将数据存储到列表中

gab6jxml  于 2021-07-14  发布在  Spark
关注(0)|答案(0)|浏览(186)

我正在阅读Kafka的一个主题,做一些数据处理动作,比如删除不必要的字符和url,然后想从dataframe生成一个列表,并使用dataframe结构做进一步的数据分析部分。最后我想把最后的结果写成一个新的Kafka主题

val readStream = existingSparkSession
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", hostAddress)
      .option("subscribe", "myTopic.raw") 
      .load()

//do some preprocessing tasks

val myDataframe = preprocessing()

val finalDataframe
//convert dataframe into a list

val x = myDataframe.toDF()
    val myList =   x.select("value").rdd.map(r => r(0)).collect.toList
for(element <- myList) {

 finalDataframe =  dataAnalysis(element)
}

//write back to kafka

val writeStream = finalDataframe 
     .writeStream
     .outputMode("append")
     .format("kafka")
     .option("kafka.bootstrap.servers", hostAddress)
     .option("topic", "myRaw.test")
     .start()
     writeStream.awaitAnyTermination()

但是我得到了以下错误
线程“main”org.apache.spark.sql.analysisexception中出现异常:必须使用writestream.start()执行具有流源的查询

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题