JSON data:
{
  "id": "34cx34fs987",
  "time_series": [
    {
      "time": "20200903 00:00:00",
      "value": 342342.12
    },
    {
      "time": "20200903 00:00:05",
      "value": 342421.88
    },
    {
      "time": "20200903 00:00:10",
      "value": 351232.92
    }
  ]
}
I receive this JSON from Kafka:
spark = SparkSession.builder.master('local').appName('test').getOrCreate()
df = spark.readStream.format("kafka")...
How can I transform df to get a DataFrame like the one below?
id           time               value
34cx34fs987  20200903 00:00:00  342342.12
34cx34fs987  20200903 00:00:05  342421.88
34cx34fs987  20200903 00:00:10  351232.92
2 Answers
bejyjqdl1#
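Using Scala:
If you define `schema` to match the JSON (a string `id` plus a `time_series` array of structs, each with a string `time` and a double `value`), you can use the Spark SQL built-in functions `from_json` and `explode`.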
```scala
import org.apache.spark.sql.functions._
import spark.implicits._

val df1 = df
  // Kafka delivers the payload as bytes in the `value` column
  .selectExpr("CAST(value AS STRING) AS json")
  // Parse the JSON string into a struct using the schema described above
  .select(from_json(col("json"), schema).as("data"))
  // explode emits one row per element of the time_series array
  .select(col("data.id").as("id"), explode(col("data.time_series")).as("time_series"))
  .select(col("id"), col("time_series.time").as("time"), col("time_series.value").as("value"))
```
```
+-----------+-----------------+---------+
|id         |time             |value    |
+-----------+-----------------+---------+
|34cx34fs987|20200903 00:00:00|342342.12|
|34cx34fs987|20200903 00:00:05|342421.88|
|34cx34fs987|20200903 00:00:10|351232.92|
+-----------+-----------------+---------+
```
qrjkbowd2#
Sample code in PySpark