How to prevent PySpark from duplicating data when using explode()?

Posted by gzszwxb4 on 2021-05-17 in Spark
[{
        "Data": [{
                "Customer": [{
                        "Prices": {
                            "USD": [[86, "2.18"], [172, "1.67"], [344, "1.52"]]
                        },
                        "Seller": {
                            "Name": "Customer1"
                        }
                    }, {
                        "Prices": {
                            "USD": [[1, "1.99"], [100, "1.55"], [500, "1.24"]]
                        },
                        "Seller": {
                            "Name": "Customer2"
                        }
                    }
                ]
            }
        ],
        "PartNumber": "ABC"
    }
]

Using the JSON above, I'm trying to build a table that lists each customer together with that customer's own price breaks.

from pyspark.sql.functions import col, explode

# dfJsonFile is the DataFrame loaded from the JSON above
df1 = dfJsonFile.withColumn("Customer", explode("Data.Customer"))

df2 = df1.withColumn("PriceArray", explode("Customer.Prices.USD")
                    ).withColumn("PriceBreaks", explode("PriceArray"))

df3 = df2.withColumn("Quantity", col("PriceBreaks").getItem(0)
        ).withColumn("Price", col("PriceBreaks").getItem(1))

df4 = df3.select("Customer.Seller.Name", "PartNumber", "Quantity", "Price")
df4.show(truncate=False)
+----------------------+----------+--------+-----+
|Name                  |PartNumber|Quantity|Price|
+----------------------+----------+--------+-----+
|[Customer1, Customer2]|ABC       |86      |2.18 |
|[Customer1, Customer2]|ABC       |172     |1.67 |
|[Customer1, Customer2]|ABC       |344     |1.52 |
|[Customer1, Customer2]|ABC       |1       |1.99 |
|[Customer1, Customer2]|ABC       |100     |1.55 |
|[Customer1, Customer2]|ABC       |500     |1.24 |
+----------------------+----------+--------+-----+

I need to split the customers out, but if I explode the Name array I get duplicated (incorrect) results:

df5 = df4.withColumn("Customer", explode("Name"))

df5.select("Customer", "PartNumber", "Quantity", "Price").show()
+---------+----------+--------+-----+
| Customer|PartNumber|Quantity|Price|
+---------+----------+--------+-----+
|Customer1|       ABC|      86| 2.18|
|Customer2|       ABC|      86| 2.18|
|Customer1|       ABC|     172| 1.67|
|Customer2|       ABC|     172| 1.67|
|Customer1|       ABC|     344| 1.52|
|Customer2|       ABC|     344| 1.52|
|Customer1|       ABC|       1| 1.99|
|Customer2|       ABC|       1| 1.99|
|Customer1|       ABC|     100| 1.55|
|Customer2|       ABC|     100| 1.55|
|Customer1|       ABC|     500| 1.24|
|Customer2|       ABC|     500| 1.24|
+---------+----------+--------+-----+
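To see where the extra rows come from, here is a plain-Python model of the pipeline above. No Spark is involved: the `customers` list is a simplified stand-in for the JSON, and quantities are ints here rather than strings. Because the first explode leaves both customers in one array, the names travel as a list on every price row, and exploding that list pairs every price with every name.

```python
# Plain-Python model (no Spark) of the question's pipeline.
customers = [
    {"Name": "Customer1", "USD": [[86, "2.18"], [172, "1.67"], [344, "1.52"]]},
    {"Name": "Customer2", "USD": [[1, "1.99"], [100, "1.55"], [500, "1.24"]]},
]

# Customer.Seller.Name on an *array* of customers gives an array of names,
# and Customer.Prices.USD gives an array of per-customer price lists.
names = [c["Name"] for c in customers]
usd_per_customer = [c["USD"] for c in customers]

# explode("Customer.Prices.USD") then explode("PriceArray"): six price rows,
# each still carrying the full list of names.
price_rows = [(names, pb) for price_array in usd_per_customer for pb in price_array]

# explode("Name"): every price row is paired with every name (a cross product).
exploded = [(name, qty, price) for names_, (qty, price) in price_rows for name in names_]

print(len(price_rows), len(exploded))  # 6 12
```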

What am I doing wrong? This is the result I'm trying to get:

Customer    Quantity    Price
Customer1   86          2.18
Customer1   172         1.67
Customer1   344         1.52
Customer2   1           1.99
Customer2   100         1.55
Customer2   500         1.24
Answer by qmelpv7a:

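The problem is that you add each exploded value as a new column with withColumn. After the first explode, Customer still holds the array of both customers, so Customer.Seller.Name is [Customer1, Customer2] on every row, and exploding Customer.Prices.USD flattens both customers' prices together with no link back to the seller. Instead, select only the columns you don't want duplicated and then explode them, like this: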

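from pyspark.sql.functions import col, explode

df1 = dfJsonFile.withColumn("Customer", explode("Data.Customer"))
# Explode the customer array so each row holds one customer struct (Prices, Seller)
df2 = df1.select(explode("Customer")).select("col.*")
# Keep the seller name and explode only that customer's price pairs
df3 = df2.select(col("Seller.Name").alias("name"), explode("Prices.USD"))
df3.show()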
+---------+-----------+
|     name|        col|
+---------+-----------+
|Customer1| [86, 2.18]|
|Customer1|[172, 1.67]|
|Customer1|[344, 1.52]|
|Customer2|  [1, 1.99]|
|Customer2|[100, 1.55]|
|Customer2|[500, 1.24]|
+---------+-----------+
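The same plain-Python model, reordered the way this answer suggests (again a simplified stand-in for the JSON, not Spark code): iterating over customers first and then over each customer's own prices keeps every price attached to the right seller.

```python
# Plain-Python model (no Spark) of the answer's order of operations.
customers = [
    {"Name": "Customer1", "USD": [[86, "2.18"], [172, "1.67"], [344, "1.52"]]},
    {"Name": "Customer2", "USD": [[1, "1.99"], [100, "1.55"], [500, "1.24"]]},
]

rows = [
    (c["Name"], qty, price)
    for c in customers            # explode("Customer"): one row per customer
    for qty, price in c["USD"]    # explode("Prices.USD"): that customer's prices only
]

for row in rows:
    print(row)
```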

I'm sure you can take it from here :-)
