我是pyspark的新手,我想以这样一种方式分解数组值,即每个值都被分配给一个新列。我尝试使用explode,但无法获得所需的输出。下面是我的输出
现有产量
架构:
所需输出:
这是密码
from pyspark.sql import *
from pyspark.sql.functions import explode
if __name__ == "__main__":
spark = SparkSession.builder \
.master("local[3]") \
.appName("DataOps") \
.getOrCreate()
dataFrameJSON = spark.read \
.option("multiLine", True) \
.option("mode", "PERMISSIVE") \
.json("data.json")
dataFrameJSON.printSchema()
sub_DF = dataFrameJSON.select(explode("values.line").alias("new_values"))
sub_DF.printSchema()
sub_DF2 = sub_DF.select("new_values.*")
sub_DF2.printSchema()
sub_DF.show(truncate=False)
new_DF = sub_DF2.select("id", "period.*", "property")
new_DF.show(truncate=False)
new_DF.printSchema()
这是数据:
{
"values" : {
"line" : [
{
"id" : 1,
"period" : {
"start_ts" : "2020-01-01T00:00:00",
"end_ts" : "2020-01-01T00:15:00"
},
"property" : [
{
"name" : "PID",
"val" : "P120E12345678"
},
{
"name" : "EngID",
"val" : "PANELID00000000"
},
{
"name" : "TownIstat",
"val" : "12058091"
},
{
"name" : "ActiveEng",
"val" : "5678.1"
}
]
}
}
2条答案
按热度按时间jvidinwx1#
这是一个通用的解决方案,即使json很混乱(元素的顺序不同或者缺少一些元素),它也能工作
你得先压平,
regexp_replace
拆分“属性”列并最终透视。这也避免了新列名的硬编码。构建Dataframe:
展平和旋转:
输出:
5f0d552i2#
你能用数据代替截图吗?
同时,假设
df
我们需要做的是创建一个新的Dataframe,同时导出vals
从上一个property
数组到新列,并删除property
最后一列:万一
element
属于ArrayType
使用以下选项:explode将数组分解为新行,而不是新列,请参见:pyspark explode