使用sparkDataframe阅读Kafka主题

7uzetpgm  于 2021-05-27  发布在  Spark
关注(0)|答案(3)|浏览(560)

我想在kafka主题的顶部创建dataframe,然后我想将该dataframe注册为temp表,以便对数据执行减号操作。我写了下面的代码。但是在查询已注册的表时,出现错误“org.apache.spark.sql.analysisexception:必须使用writestream.start();”执行流源查询

org.apache.spark.sql.types.DataType
org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types._

val df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "SERVER******").option("subscribe", "TOPIC_NAME").option("startingOffsets", "earliest").load()

df.printSchema()

val personStringDF = df.selectExpr("CAST(value AS STRING)")

val user_schema =StructType(Array(StructField("OEM",StringType,true),StructField("IMEI",StringType,true),StructField("CUSTOMER_ID",StringType,true),StructField("REQUEST_SOURCE",StringType,true),StructField("REQUESTER",StringType,true),StructField("REQUEST_TIMESTAMP",StringType,true),StructField("REASON_CODE",StringType,true)))

val personDF = personStringDF.select(from_json(col("value"),user_schema).as("data")).select("data.*")

personDF.registerTempTable("final_df1")

spark.sql("select * from final_df1").show

error:

deyfvvtc

deyfvvtc1#

流式Dataframe不支持 show() 方法。当你打电话的时候 start() 方法,它将启动一个后台线程将输入数据流式传输到接收器,并且由于您使用的是consolesink,它将数据输出到控制台。你不需要打电话 show() .
拆下下面的线,

personDF.registerTempTable("final_df1")
spark.sql("select * from final_df1").show

然后加上下面的行或相等的行,

val query1 = personDF.writeStream.queryName("final_df1").format("memory").outputMode("append").start()
query1.awaitTermination()
niknxzdl

niknxzdl2#

使用 memory 接收器而不是寄存器可清空。检查以下代码。

org.apache.spark.sql.types.DataType
org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types._

val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "SERVER******")
.option("subscribe", "TOPIC_NAME")
.option("startingOffsets", "earliest")
.load()

df.printSchema()

val personStringDF = df.selectExpr("CAST(value AS STRING)")

val user_schema =StructType(Array(StructField("OEM",StringType,true),StructField("IMEI",StringType,true),StructField("CUSTOMER_ID",StringType,true),StructField("REQUEST_SOURCE",StringType,true),StructField("REQUESTER",StringType,true),StructField("REQUEST_TIMESTAMP",StringType,true),StructField("REASON_CODE",StringType,true)))

val personDF = personStringDF.select(from_json(col("value"),user_schema).as("data")).select("data.*")

personDF
.writeStream
.outputMode("append")
.format("memory")
.queryName("final_df1").start()

spark.sql("select * from final_df1").show(10,false)
wlzqhblo

wlzqhblo3#

“org.apache.spark.sql.analysisexception:必须使用writestream.start();”执行具有流源的查询
我还使用了start()方法,得到以下错误。
2011年8月20日00:59:30错误streaming.microbatchexecution:查询final_df1[id=1a3e2ea4-2ec1-42f8-a5eb-8a12ce0fb3f5,runid=7059f3d2-21ec-43c4-b55a-8c735272bf0f]终止,错误为java.lang.abstractmethoderror
注意:编写这个脚本的主要目的是对这个数据编写减号查询,并将它与集群上的一个寄存器表进行比较。所以,总结一下,如果我要从oracle数据库发送kafka主题中的1000条记录,我将在oracle表的顶部创建dataframe,将其注册为temp表,并对kafka主题执行相同的操作。然后我想在源(oracle)和目标(kafka主题)之间运行减号查询。在源和目标之间执行100%数据验证(是否可以将kafka主题注册为临时表?)

相关问题