PySpark DataFrame with a JSON column: aggregate JSON elements into a new column and remove duplicates

yfwxisqw · posted 2021-05-27 in Spark

I am trying to read a PySpark DataFrame with a JSON column on Databricks.
DataFrame:

    year month json_col
    2010 09    [{"p_id":"vfdvtbe"}, {"p_id":"cdscs"}, {"p_id":"usdvwq"}]
    2010 09    [{"p_id":"ujhbe"}, {"p_id":"cdscs"}, {"p_id":"yjev"}]
    2007 10    [{"p_id":"ukerge"}, {"p_id":"ikrtw"}, {"p_id":"ikwca"}]
    2007 10    [{"p_id":"unvwq"}, {"p_id":"cqwcq"}, {"p_id":"ikwca"}]

I need a new DataFrame with all duplicate "p_id" values removed, aggregated by year and month:

    year month p_id (string)
    2010 09    ["vfdvtbe", "cdscs", "usdvwq", "ujhbe", "yjev"]
    2007 10    ["ukerge", "ikrtw", "ikwca", "unvwq", "cqwcq"]

The new column "p_id" is an array of strings. I also want to count how many distinct "p_id" values there are for each year and month, and to remove elements that are duplicated within the same year and month.
My code:

    from pyspark.sql.types import *
    from pyspark.sql import functions as F

    schema = ArrayType(StructType([
        StructField('p_id', StringType(), True)
    ]))
    # this second assignment overrides the struct schema above
    schema = ArrayType(MapType(StringType(), StringType()))

    # ff is the source DataFrame with columns year, month, json_col
    t = (ff.withColumn("data", F.explode(F.from_json(F.col("json_col"), schema)))
           .withColumn("data", F.when(F.col("data")["p_id"].cast("string").isNotNull(),
                                      F.col("data")["p_id"]))
           .filter(F.col("data").isNotNull())
           .drop("json_col"))
    display(t)

I am not sure whether this removes the duplicates.
Thanks.


zrfyljdw 1#

Use the flatten, array_distinct, groupBy and collect_list functions for this case. Example:

```
df.show(10,False)

+----+-----+---------------------------------------------------------+
|year|month|json_col                                                 |
+----+-----+---------------------------------------------------------+
|2010|09   |[{"p_id":"vfdvtbe"}, {"p_id":"cdscs"}, {"p_id":"usdvwq"}]|
|2010|09   |[{"p_id":"ujhbe"}, {"p_id":"cdscs"}, {"p_id":"yjev"}]    |
|2007|10   |[{"p_id":"ukerge"}, {"p_id":"ikrtw"}, {"p_id":"ikwca"}]  |
|2007|10   |[{"p_id":"unvwq"}, {"p_id":"cqwcq"}, {"p_id":"ikwca"}]   |
+----+-----+---------------------------------------------------------+

from pyspark.sql.types import *
from pyspark.sql.functions import *

# schema for the JSON array of {"p_id": ...} objects
schema = ArrayType(StructType(
    [
        StructField('p_id', StringType(), True)
    ]
))

# parse json_col and extract the p_id values into an array per row
df1 = df.withColumn("ff", from_json(col("json_col"), schema)) \
    .select("year", "month", expr('transform(ff, f -> f.p_id)').alias("tmp"))

# collect the arrays per year/month, flatten them and drop duplicates
df1.groupBy("year", "month") \
    .agg(to_json(array_distinct(flatten(collect_list(col("tmp"))))).alias("p_id")) \
    .show(10, False)

+----+-----+-------------------------------------------+
|year|month|p_id                                       |
+----+-----+-------------------------------------------+
|2010|09   |["vfdvtbe","cdscs","usdvwq","ujhbe","yjev"]|
|2007|10   |["ukerge","ikrtw","ikwca","unvwq","cqwcq"] |
+----+-----+-------------------------------------------+
```
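The question also asks for the number of distinct p_id values per year and month, and for p_id to be an array of strings rather than a JSON string. A minimal sketch of one way to get both, assuming the same df and schema as above (the p_id_count column name is just illustrative): dropping to_json keeps p_id as an array<string>, and size gives the per-group count.

```
from pyspark.sql.types import ArrayType, StructType, StructField, StringType
from pyspark.sql.functions import (from_json, col, expr, collect_list,
                                   flatten, array_distinct, size)

# assumes df has the columns year, month, json_col shown in the question
schema = ArrayType(StructType([StructField("p_id", StringType(), True)]))

result = (
    df.withColumn("ff", from_json(col("json_col"), schema))
      .select("year", "month", expr("transform(ff, f -> f.p_id)").alias("tmp"))
      .groupBy("year", "month")
      # keep p_id as an array<string> column instead of a JSON string
      .agg(array_distinct(flatten(collect_list(col("tmp")))).alias("p_id"))
      # number of distinct p_id values per year/month (illustrative column name)
      .withColumn("p_id_count", size(col("p_id")))
)
result.show(10, False)
```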

