我正在用scala做spark2.3的一些特性工程。
我在spark数据框的一列中有ip地址
.
然后我用 data.groupBy("ip").count()
获取每个ip地址的频率列表。这看起来像
现在我想把每个频率Map到原始的Dataframe。我本该去的地方
ip | freq |
-- | |
123 | 3 |
567 | 7 |
857 | 10 |
123 | 3 |
解决这样一个问题的有效方法是什么?
我正在用scala做spark2.3的一些特性工程。
我在spark数据框的一列中有ip地址
.
然后我用 data.groupBy("ip").count()
获取每个ip地址的频率列表。这看起来像
现在我想把每个频率Map到原始的Dataframe。我本该去的地方
ip | freq |
-- | |
123 | 3 |
567 | 7 |
857 | 10 |
123 | 3 |
解决这样一个问题的有效方法是什么?
1条答案
按热度按时间ne5o7dgx1#
我开发了超过10亿行的管道,我就是这样做的。
w=window.partitionby('id')
df.withcolumn('freq',f.count('id').over(w)).show()
这要简单得多,可读性好,最重要的是效率高。它不聚合数据,因此不需要创建两个df对象并进行连接。
前面的答案不能很好地扩展到大数据中,主要是因为由于额外的洗牌,连接成本很高。