I have the following code:
output = (assignations
    .join(activations, ['customer_id', 'external_id'], 'left')
    .join(redeemers, ['customer_id', 'external_id'], 'left')
    .groupby('external_id')
    .agg(f.expr('COUNT(DISTINCT(CASE WHEN assignation = 1 THEN customer_id ELSE NULL END))').alias('assigned'),
         f.expr('COUNT(DISTINCT(CASE WHEN activation = 1 THEN customer_id ELSE NULL END))').alias('activated'),
         f.expr('COUNT(DISTINCT(CASE WHEN redeemer = 1 THEN customer_id ELSE NULL END))').alias('redeemed'))
)
This code produces the following output:
external_id assigned activated redeemed
DISC0000089309 31968 901 491
DISC0000089428 31719 893 514
DISC0000089283 2617 60 39
My idea was to convert the CASE WHEN part into more Pythonic/PySpark-style code, so I tried the following:
output = (assignations
    .join(activations, ['customer_id', 'external_id'], 'left')
    .join(redeemers, ['customer_id', 'external_id'], 'left')
    .groupby('external_id')
    .agg(f.count(f.when(f.col('assignation') == 1, True).alias('assigned')),
         f.count(f.when(f.col('activation') == 1, True).alias('activated')),
         f.count(f.when(f.col('redeemer') == 1, True).alias('redeem'))
    ))
The problem is that the output is not the same; the numbers don't match. How can I convert the code to get the same output?
1 Answer
You can use f.countDistinct to get the equivalent of COUNT(DISTINCT ...) in Spark SQL. Your attempt differs for two reasons: f.count counts every non-null value, including duplicates, so it counts matching rows rather than distinct customers; and the .alias calls are attached to the inner when(...) column rather than to the aggregate, so the column names are silently dropped:
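A minimal sketch of the corrected aggregation, assuming the same DataFrames (assignations, activations, redeemers) and column names as in the question:

import pyspark.sql.functions as f

output = (assignations
    .join(activations, ['customer_id', 'external_id'], 'left')
    .join(redeemers, ['customer_id', 'external_id'], 'left')
    .groupby('external_id')
    # when(...) without otherwise() yields NULL for non-matching rows,
    # and countDistinct ignores NULLs, so each aggregate reproduces
    # COUNT(DISTINCT CASE WHEN ... = 1 THEN customer_id ELSE NULL END)
    .agg(f.countDistinct(f.when(f.col('assignation') == 1, f.col('customer_id'))).alias('assigned'),
         f.countDistinct(f.when(f.col('activation') == 1, f.col('customer_id'))).alias('activated'),
         f.countDistinct(f.when(f.col('redeemer') == 1, f.col('customer_id'))).alias('redeemed'))
)

Note that when(...) returns customer_id itself (not True) so that distinct customers are counted, and .alias is applied to the result of countDistinct rather than to the inner column.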