有点像我一个月前的一篇文章的分支。我有一个spark结构的蒸汽应用程序,我在读Kafka。这是我的代码的基本结构。
我创建了spark会话。
val spark = SparkSession
.builder
.appName("app_name")
.getOrCreate()
然后我在小溪里看书
val data_stream = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "server_list")
.option("subscribe", "topic")
.load()
在Kafka记录中,我将“值”转换为字符串。它从二进制转换为字符串。
val df = data_stream
.select($"value".cast("string") as "json")
基于预定义的模式,我尝试将json结构解析为列。但是,这里的问题是,如果数据是“坏的”或不同的格式,那么它与定义的模式不匹配。我需要筛选出与我的架构不匹配的行。不管它们是空的,数字,一些随机的文本,比如“你好”。如果它不是一个json,那么它就不应该继续到下一个dataframe进程
val df2 = df.select(from_json($"json", schema) as "data")
.select("data.*")
如果我通过console producer传入一个空的kafka消息,spark查询就会崩溃
java.util.nosuchelementexception:scala.collection.immutable.nil$.head(list。scala:420)在scala.collection.immutable.nil$.head(列表。scala:417)位于org.apache.spark.sql.catalyst.expressions.jsontostruct.nullsafeeval(jsonexpressions)。scala:500)在org.apache.spark.sql.catalyst.expressions.unaryexpression.eval(表达式。scala:325)位于org.apache.spark.sql.catalyst.expressions.generatedclass$specificpredicate.eval(未知源代码),位于org.apache.spark.sql.execution.filterexec$$anonfun$17$$anonfun$apply$2.apply(basicphysicaloperators)。scala:219)在org.apache.spark.sql.execution.filterexec$$anonfun$17$$anonfun$apply$2.apply(基本物理运算符)。scala:218)在scala.collection.iterator$$anon$13.hasnext(iterator。scala:463)在scala.collection.iterator$$anon$11.hasnext(iterator。scala:408)位于org.apache.spark.sql.execution.streaming.foreachsink$$anonfun$addbatch$1.apply(foreachsink)。scala:52)在org.apache.spark.sql.execution.streaming.foreachsink$$anonfun$addbatch$1.apply(foreachsink)。scala:49)在org.apache.spark.rdd.rdd$$anonfun$foreachpartition$1$$anonfun$apply$29.apply(rdd。scala:925)在org.apache.spark.rdd.rdd$$anonfun$foreachpartition$1$$anonfun$apply$29.apply(rdd。scala:925)在org.apache.spark.sparkcontext$$anonfun$runjob$5.apply(sparkcontext。scala:1944)在org.apache.spark.sparkcontext$$anonfun$runjob$5.apply(sparkcontext。scala:1944)在org.apache.spark.scheduler.resulttask.runtask(resulttask。scala:87)在org.apache.spark.scheduler.task.run(task。scala:99)
暂无答案!
目前还没有任何答案,快来回答吧!