使用pyspark将数组值分解为多列

shyt4zoc  于 2021-05-24  发布在  Spark
关注(0)|答案(2)|浏览(663)

我是pyspark的新手,我想以这样一种方式分解数组值,即每个值都被分配给一个新列。我尝试使用explode,但无法获得所需的输出。下面是我的输出
现有产量
架构:
所需输出:
这是密码

from pyspark.sql import *
from pyspark.sql.functions import explode
if __name__ == "__main__":
spark = SparkSession.builder \
    .master("local[3]") \
    .appName("DataOps") \
    .getOrCreate()

dataFrameJSON = spark.read \
    .option("multiLine", True) \
    .option("mode", "PERMISSIVE") \
    .json("data.json")

dataFrameJSON.printSchema()
sub_DF = dataFrameJSON.select(explode("values.line").alias("new_values"))
sub_DF.printSchema()

sub_DF2 = sub_DF.select("new_values.*")
sub_DF2.printSchema()
sub_DF.show(truncate=False)

new_DF = sub_DF2.select("id", "period.*", "property")
new_DF.show(truncate=False)
new_DF.printSchema()

这是数据:

{
        "values" : {
            "line" : [
                {
                    "id" : 1,
                    "period" : {
                        "start_ts" : "2020-01-01T00:00:00",
                        "end_ts" : "2020-01-01T00:15:00"
                    },
                    "property" : [
                        {
                            "name" : "PID",
                            "val" : "P120E12345678"
                        },
                        {
                            "name" : "EngID",
                            "val" : "PANELID00000000"
                        },
                        {
                            "name" : "TownIstat",
                            "val" : "12058091"
                        },
                        {
                            "name" : "ActiveEng",
                            "val" : "5678.1"
                        }
                    ]
                }
}
jvidinwx

jvidinwx1#

这是一个通用的解决方案,即使json很混乱(元素的顺序不同或者缺少一些元素),它也能工作
你得先压平, regexp_replace 拆分“属性”列并最终透视。这也避免了新列名的硬编码。
构建Dataframe:

from pyspark.sql.types import *
from pyspark.sql import functions as F
from pyspark.sql.functions import col
from pyspark.sql.functions import *

schema = StructType([StructField("id", IntegerType()), StructField("start_ts", StringType()), StructField("end_ts", StringType()), \
    StructField("property", ArrayType(StructType(  [StructField("name", StringType()),  StructField("val", StringType())]    )))])

data = [[1, "2010", "2020", [["PID", "P123"], ["Eng", "PA111"], ["Town", "999"], ["Act", "123.1"]]],\
         [2, "2011", "2012", [["PID", "P456"], ["Eng", "PA222"], ["Town", "777"], ["Act", "234.1"]]]]

df = spark.createDataFrame(data,schema=schema)

df.show(truncate=False)
+---+--------+------+------------------------------------------------------+    
|id |start_ts|end_ts|property                                              |
+---+--------+------+------------------------------------------------------+
|1  |2010    |2020  |[[PID, P123], [Eng, PA111], [Town, 999], [Act, 123.1]]|
|2  |2011    |2012  |[[PID, P456], [Eng, PA222], [Town, 777], [Act, 234.1]]|
+---+--------+------+------------------------------------------------------+

展平和旋转:

df_flatten = df.rdd.flatMap(lambda x: [(x[0],x[1], x[2], y) for y in x[3]]).toDF(['id', 'start_ts', 'end_ts', 'property'])\
            .select('id', 'start_ts', 'end_ts', col("property").cast("string"))

df_split = df_flatten.select('id', 'start_ts', 'end_ts', regexp_replace(df_flatten.property, "[\[\]]", "").alias("replacced_col"))\
                .withColumn("arr", split(col("replacced_col"), ", "))\
                .select(col("arr")[0].alias("col1"), col("arr")[1].alias("col2"), 'id', 'start_ts', 'end_ts')

final_df = df_split.groupby(df_split.id,)\
                        .pivot("col1")\
                        .agg(first("col2"))\
                        .join(df,'id').drop("property")

输出:

final_df.show()
+---+-----+-----+----+----+--------+------+
| id|  Act|  Eng| PID|Town|start_ts|end_ts|
+---+-----+-----+----+----+--------+------+
|  1|123.1|PA111|P123| 999|    2010|  2020|
|  2|234.1|PA222|P456| 777|    2011|  2012|
+---+-----+-----+----+----+--------+------+
5f0d552i

5f0d552i2#

你能用数据代替截图吗?
同时,假设 df 我们需要做的是创建一个新的Dataframe,同时导出 vals 从上一个 property 数组到新列,并删除 property 最后一列:

from pyspark.sql.functions import col
output_df = df.withColumn("PID", col("property")[0].val).withColumn("EngID", col("property")[1].val).withColumn("TownIstat", col("property")[2].val).withColumn("ActiveEng", col("property")[3].val).drop("property")

万一 element 属于 ArrayType 使用以下选项:

from pyspark.sql.functions import col
output_df = df.withColumn("PID", col("property")[0][1]).withColumn("EngID", col("property")[1][1]).withColumn("TownIstat", col("property")[2][1]).withColumn("ActiveEng", col("property")[3][1]).drop("property")

explode将数组分解为新行,而不是新列,请参见:pyspark explode

相关问题