如何使用结构化流媒体从spark发布到kafka?

t98cgbkg  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(568)

我正在编写一个spark应用程序,它从一个kafka主题读取消息,在db中查找记录,构造新消息并将它们发布到另一个kafka主题。我的代码是这样的-

val inputMessagesDataSet: DataSet[InputMessage] = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "server1")
  .option("subscribe", "input-kafka-topic1")
  .load()
  .select($"value")
  .mapPartitions{r =>
     val messages: Iterator[InputMessage] = parseMessages(r)
  }

inputMessagesDataSet
  .writeStream
  .foreachBatch(processMessages _)
  .trigger(trigger)
  .start
  .awaitTermination

def processMessages(inputMessageDataSet: Dataset[InputMessage]) = {
   // fetch stuff from DB and build a DataSet[OutputMessage]
   val outputMessagesDataSet: DataSet[OutputMessage] = ...
   // now queue to another kafka topic
  outputMessagesDataSet
      .writeStream
      .trigger(trigger)
      .format("kafka")
      .option("kafka.bootstrap.servers", "server1")
      .option("topic", "output-kafka-topic")
      .option("checkpointLocation", loc)
      .start
      .awaitTermination
}

但我有个错误 org.apache.spark.sql.AnalysisException: 'writeStream' can be called only on streaming Dataset/DataFrame; 在线 outputMessagesDataSet.writeStream 这似乎是因为 outputMessagesDataSet 不是使用创建的 readStream . 我之所以不建造 DataSet[OutputMessage] 原版 mapPartitions() 是因为获取db记录等所需的类不可序列化,所以它抛出 NotSerializableException .
如何为kafka构建新的数据集和队列?

rkkpypqq

rkkpypqq1#

foreachBatch 接受静态数据集,因此需要使用 write ,不是 writeStream 或者,你可以 writeStream.format("kafka") 不使用 forEachBatch

相关问题