我有一个如下所示的 Dataframe :
rdd = spark.sparkContext.parallelize([
(22,'fl1.variant,fl2.variant,fl3.control','xxx','yyy','zzz'),
(23,'fl1.variant,fl2.neither,fl3.control','xxx','yyy','zzz'),
(24,'fl4.variant,fl2.variant,fl4.variant','xxx1','yyy1','zzz1'),
(25,'fl3.control,fl3.control,fl3.variant','xxx1','yyy1','zzz1')
])
df = rdd.toDF(['Date','Type','Data1','Data2','Data3'])
示例数据:
| 日期|型号|数据1|数据2|数据3|
| - -|- -|- -|- -|- -|
| 二十二个|fl 1.变量,fl 2.变量,fl 3.控制|xxx|年|兹兹|
| 二十三个|fl 1.变异体,fl 2.两者都不,fl 3.对照|xxx|年|兹兹|
| 二十四|fl 4.变体,fl 2.变体,fl 4.变体|第1章|yyy1| zzz 1语言|
| 二十五个|fl 3.控制,fl 3.控制,fl 3.变量|第1章|yyy1| zzz 1语言|
我需要根据Data1
、Data2
和Data3
列标识Type
列数据的不同值。Type
列的数据类型是由,
分隔的字符串。
基于Data1
、Data2
、Data3
,我需要合并所有数据并确定Type
列的唯一值。
预期输出:
| 数据1|数据2|数据3|类型列表(_L)|
| - -|- -|- -|- -|
| xxx|年|兹兹|[fl 1.变体,fl 2.变体,fl 3.对照,fl 2.两者都不]|
| 第1章|yyy1| zzz 1语言|[fl 4.变量,fl 2.变量,fl 3.控制,fl 3.变量]|
我尝试了如下方法,但无法获得预期的非重复值
df1 = df.sort("Data1","Data2","Data3","Type"). \
groupBy("Data1","Data2","Data3"). \
agg(func.collect_set("Type").cast(func.StringType())). \
withColumnRenamed("CAST(collect_set(Type) AS STRING)", "Type_list")
| 数据1|数据2|数据3|类型列表(_L)|
| - -|- -|- -|- -|
| xxx|年|兹兹|[fl 1.变体,fl 2.变体,fl 3.对照,fl 1.变体,fl 2.两者皆不是,fl 3.对照]|
| 第1章|yyy1| zzz 1语言|[fl 4.变量,fl 2.变量,fl 4.变量,fl 3.控制,fl 3.控制,fl 3.变量]|
df2 = df1.select("Data1","Data2","Data3",func.array_distinct(func.split("Type_list" , ",")))
| 数据1|数据2|数据3|数组_distinct(split(类型列表,,,-1))|
| - -|- -|- -|- -|
| xxx|年|兹兹|fl 1.变体,fl 2.变体,fl 3.对照,fl 1.变体,fl 2.两者都不,fl 3.对照|
| 第1章|yyy1| zzz 1语言|fl 4.变体,fl 2.变体,fl 4.变体,fl 3.对照,fl 3.对照,fl 3.变体|
1条答案
按热度按时间mrwjdhj31#
先
split
,然后array_distinct(flatten(collect_list()))