我现在要处理的是用sparksql替换低频值。这意味着对于每个列,它将计算每个值的计数,如果它的计数低于我们给定的阈值(比如说6),我们将用我们指定的另一个值替换它。
通过sparksql和dataframe很难找到一种有效的方法来实现这一点,因为它涉及大量的洗牌和迭代。
下面显示的代码是我为测试其性能而实现的一种方法。然而,pyspark代码无法执行,因为转换链很长(其中有许多连接)。即使我没有启动一个操作,它也不能转换为rdd血统。它只是从来没有停止在一个互动jupyter笔记本细胞(我没有开始行动)。
有人能帮我分析一下原因吗?我能做些什么?
for col in DC_columns:
NAN_VALUE = CAT_NAN_VALUE if 'C' in col else INT_NAN_VALUE
value_counts = df.select(col).groupby(col).count().filter('count < 6').select(col)
df_low = df.join(value_counts, col,'left_semi')
df_high = df.join(value_counts, col,'left_anti')
df_low = df_low.withColumn(col, lit(NAN_VALUE))
df = df_low.union(df_high)
暂无答案!
目前还没有任何答案,快来回答吧!