我有问题要解决。我有一个Kafka生产者产生的消息(XML)到一个特定的主题。现在,这是一个流数据,我正在使用spark流从主题中获取这些消息。我正在将这些消息写入控制台输出。现在,需要对这些消息执行一个操作(它们基本上是xmls),这些xmls需要被解析,我也为此编写了解析器。现在,当我将xml硬编码为字符串变量时,解析器工作正常。但是,我需要从sparkDataframe获取XML。这就是我到目前为止从Kafka那里得到信息所做的。
val spark = SparkSession.builder
.appName("kafka-ingestion")
.master("local")
.config("spark.sql.orc.impl","native")
.getOrCreate()
import spark.implicits._
// Gets the xpath
//val xpaths = get_xpaths(filename)
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "xmlpoc")
.option("encoding","UTF-8")
.option("startingOffsets", "earliest")
.load()
val df2 = df.selectExpr( "CAST(value AS STRING)")
df2.writeStream.format("console")
.outputMode("append")
.start().awaitTermination()
// Now perform parsing on each value of df2.
当我这样做时,我可以看到控制台Dataframe中的所有消息。但是,我希望对所有这些消息执行一个操作(比如这里的解析)。我不知道怎么做。我尝试了自定义项,但它们在我的值列之外添加了另一列。所以,似乎什么都没用。谢谢你的帮助。
暂无答案!
目前还没有任何答案,快来回答吧!