如何处理kafka作为源的结构化流的每个值?

b09cbbtk  于 2021-06-06  发布在  Kafka
关注(0)|答案(0)|浏览(239)

我有问题要解决。我有一个Kafka生产者产生的消息(XML)到一个特定的主题。现在,这是一个流数据,我正在使用spark流从主题中获取这些消息。我正在将这些消息写入控制台输出。现在,需要对这些消息执行一个操作(它们基本上是xmls),这些xmls需要被解析,我也为此编写了解析器。现在,当我将xml硬编码为字符串变量时,解析器工作正常。但是,我需要从sparkDataframe获取XML。这就是我到目前为止从Kafka那里得到信息所做的。

val spark = SparkSession.builder
  .appName("kafka-ingestion")
  .master("local")
  .config("spark.sql.orc.impl","native")
  .getOrCreate()

 import spark.implicits._

 // Gets the xpath
 //val xpaths = get_xpaths(filename)
 val df = spark
   .readStream
   .format("kafka")
   .option("kafka.bootstrap.servers", "localhost:9092")
   .option("subscribe", "xmlpoc")
   .option("encoding","UTF-8")
   .option("startingOffsets", "earliest")
   .load()

val df2 = df.selectExpr( "CAST(value AS STRING)")

df2.writeStream.format("console")
  .outputMode("append")
  .start().awaitTermination()

// Now perform parsing on each value of df2.

当我这样做时,我可以看到控制台Dataframe中的所有消息。但是,我希望对所有这些消息执行一个操作(比如这里的解析)。我不知道怎么做。我尝试了自定义项,但它们在我的值列之外添加了另一列。所以,似乎什么都没用。谢谢你的帮助。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题