Spark Structured Streaming: read nested JSON from Kafka and flatten it

66bbxpm5 · posted 2021-05-27 · in Spark

The JSON payload:

```json
{
  "id": "34cx34fs987",
  "time_series": [
    {
      "time": "20200903 00:00:00",
      "value": 342342.12
    },
    {
      "time": "20200903 00:00:05",
      "value": 342421.88
    },
    {
      "time": "20200903 00:00:10",
      "value": 351232.92
    }
  ]
}
```

I read the JSON from Kafka:

```python
spark = SparkSession.builder.master('local').appName('test').getOrCreate()
df = spark.readStream.format("kafka")...
```

How can I transform `df` to get a DataFrame like this:

```
id          time              value
34cx34fs987 20200903 00:00:00 342342.12
34cx34fs987 20200903 00:00:05 342421.88
34cx34fs987 20200903 00:00:10 351232.92
```

bejyjqdl1#

Using Scala:

If you define the schema as

```scala
import org.apache.spark.sql.types._

val schema: StructType = new StructType()
  .add("id", StringType)
  .add("time_series", ArrayType(new StructType()
    .add("time", StringType)
    .add("value", DoubleType)
  ))
```

then you can use the Spark SQL built-in functions `from_json` and `explode`:

```scala
import org.apache.spark.sql.functions._
import spark.implicits._

val df1 = df
  .selectExpr("CAST(value AS STRING) as json")
  .select(from_json('json, schema).as("data"))
  .select(col("data.id").as("id"), explode(col("data.time_series")).as("time_series"))
  .select(col("id"), col("time_series.time").as("time"), col("time_series.value").as("value"))
```

Your output will be:

```
+-----------+-----------------+---------+
|id         |time             |value    |
+-----------+-----------------+---------+
|34cx34fs987|20200903 00:00:00|342342.12|
|34cx34fs987|20200903 00:00:05|342421.88|
|34cx34fs987|20200903 00:00:10|351232.92|
+-----------+-----------------+---------+
```


qrjkbowd2#

Sample code in PySpark:

```python
from pyspark.sql import functions as f

# df is assumed here to be the already-parsed DataFrame, i.e. after
# applying from_json with the schema, not the raw Kafka DataFrame.
df2 = df.select("id", f.explode("time_series").alias("col"))
df2.select("id", "col.time", "col.value").show()
```
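For intuition, the flattening that `explode` performs is just pairing the top-level `id` with each element of the `time_series` array. A minimal plain-Python sketch of the target transformation (no Spark required, using the JSON from the question):

```python
import json

# Sample payload from the question; in a real job this would be the
# Kafka message value after casting it to a string.
payload = """
{
  "id": "34cx34fs987",
  "time_series": [
    {"time": "20200903 00:00:00", "value": 342342.12},
    {"time": "20200903 00:00:05", "value": 342421.88},
    {"time": "20200903 00:00:10", "value": 351232.92}
  ]
}
"""

record = json.loads(payload)

# One output row per array element, each carrying the parent id --
# the same expansion that explode() performs on an array column.
rows = [
    (record["id"], entry["time"], entry["value"])
    for entry in record["time_series"]
]

for row in rows:
    print(row)
```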
