在同一Dataframe上触发迭代

yxyvkwin  于 2021-05-27  发布在  Spark
关注(0)|答案(0)|浏览(305)

我现在要处理的是用sparksql替换低频值。这意味着对于每个列,它将计算每个值的计数,如果它的计数低于我们给定的阈值(比如说6),我们将用我们指定的另一个值替换它。
通过sparksql和dataframe很难找到一种有效的方法来实现这一点,因为它涉及大量的洗牌和迭代。
下面显示的代码是我为测试其性能而实现的一种方法。然而,pyspark代码无法执行,因为转换链很长(其中有许多连接)。即使我没有启动一个操作,它也不能转换为rdd血统。它只是从来没有停止在一个互动jupyter笔记本细胞(我没有开始行动)。
有人能帮我分析一下原因吗?我能做些什么?

  1. for col in DC_columns:
  2. NAN_VALUE = CAT_NAN_VALUE if 'C' in col else INT_NAN_VALUE
  3. value_counts = df.select(col).groupby(col).count().filter('count < 6').select(col)
  4. df_low = df.join(value_counts, col,'left_semi')
  5. df_high = df.join(value_counts, col,'left_anti')
  6. df_low = df_low.withColumn(col, lit(NAN_VALUE))
  7. df = df_low.union(df_high)

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题