我正在做一个个人的Pypark项目学习的目的,我有一个特殊的问题。
我有一个包含n列的Dataframe(df),在这个Dataframe中,我想从下一列中减去每一列(例如。 col1 - col2
, col2 - col3
, ..., col(N+1) - colN
)并将结果差异列保存在另一个Dataframe中。
我通过解析一个json生成这个df,保存到一个dataframe(模式:dates列,每个项目的列),将列转换成行(每个日期有一个items列和列),然后在spark df中转换它。我这样做是因为spark中的逐行操作似乎很难实现。
我移动第一列 Items
一个新的Dataframe(ndf),所以我只剩下下面的模式(标头由日期组成,数据仅为整数):
日期1日期2日期3…日期1049898…022313580…014312114…0917973…0
我想从列date1的整数中减去列date2的整数(例如。 df.Date1 - df.Date2
)以及得到的值列(其中较大列的标题为- Date1
)要保存/附加到已经存在的ndfDataframe(我之前移动列的Dataframe)中。然后继续减去列date2和列date3( df.Date2 - df.Date3
),依此类推直到列 Date(N+1) - DateN
,然后停止。
先前从items列创建的新dataframe(ndf)如下所示:
项目日期2…项目160…项目28855…项目3218…项目4126。。。
实际上,我想看看每件物品从一个日期到下一个日期的数量。
我在想做一个for循环。比如:
# get list of column headers
dates = df.columns
# for index and header in list
for idx, date in enumerate(dates):
if idx < len(dates)-1:
# calculate df columns subtraction and add differences column to ndf
df = df.withColumn(f'diff-{date}', F.when((df[date] - df[dates[idx+1]]) < 0, 0)
.otherwise(df[date] - df[dates[idx+1]]))
ndf = ndf.join(df.select(f'diff-{date}'), how='full')
但这是非常缓慢的,我有一种感觉,for循环并没有真正考虑到spark的优点,它可能比使用map/lambda慢得多。
1条答案
按热度按时间7kjnsjlb1#
我找到了两个解决方案:
对于转置的Dataframe,正如我在上面的问题中提到的,reddit r/dataengineering上的一位用户帮助我解决了这个问题:
如果我不转置Dataframe,我可以应用窗口函数,正如@mck在对问题的评论中所建议的那样。我更喜欢这种方式,因为我避免转置,列数也将保持不变。这个关于pyspark窗口函数的资源对我理解它们的工作方式非常有帮助:
日期站点1项目2…日期1104223…日期298135…日期39880。。。