如何使用结构化流从kafka读取json格式的记录?

gstyhher  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(370)

我尝试使用结构化流方法,使用基于dataframe/datasetapi的spark流来从kafka加载数据流。
我使用:
Spark2.10
Kafka0.10
spark-sql-kafka-0-10型
spark kafka数据源已定义基础架构:

|key|value|topic|partition|offset|timestamp|timestampType|

我的数据是json格式的,它们存储在value列中。我正在寻找一种方法,如何从value列中提取底层模式,并将接收到的Dataframe更新为value中存储的列?我尝试了下面的方法,但不起作用:

val columns = Array("column1", "column2") // column names
 val rawKafkaDF = sparkSession.sqlContext.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers","localhost:9092")
  .option("subscribe",topic)
  .load()
  val columnsToSelect = columns.map( x => new Column("value." + x))
  val kafkaDF = rawKafkaDF.select(columnsToSelect:_*)

  // some analytics using stream dataframe kafkaDF

  val query = kafkaDF.writeStream.format("console").start()
  query.awaitTermination()

我有个例外 org.apache.spark.sql.AnalysisException: Can't extract value from value#337; 因为在创建流的时候,里面的值是未知的。。。
你有什么建议吗?

exdqitrt

exdqitrt1#

从Spark的Angular value 只是一个字节序列。它不知道序列化格式或内容。要提取文件,必须首先对其进行解析。
如果将数据序列化为json字符串,则有两个选项。你可以
cast valueStringType 使用 from_json 并提供一个模式:

import org.apache.spark.sql.types._
import org.apache.spark.sql.functions.from_json

val schema: StructType = StructType(Seq(
  StructField("column1", ???),
  StructField("column2", ???)
))

rawKafkaDF.select(from_json($"value".cast(StringType), schema))

或者 castStringType ,按路径提取字段,使用 get_json_object :

import org.apache.spark.sql.functions.get_json_object

val columns: Seq[String] = ???

val exprs = columns.map(c => get_json_object($"value", s"$$.$c"))

rawKafkaDF.select(exprs: _*)

以及 cast 稍后,选择所需的类型。

相关问题