pyspark从下一列减去dataframe列,并将结果保存到另一个dataframe

xt0899hw  于 2021-07-09  发布在  Spark
关注(0)|答案(1)|浏览(527)

我正在做一个个人的Pypark项目学习的目的,我有一个特殊的问题。
我有一个包含n列的Dataframe(df),在这个Dataframe中,我想从下一列中减去每一列(例如。 col1 - col2 , col2 - col3 , ..., col(N+1) - colN )并将结果差异列保存在另一个Dataframe中。
我通过解析一个json生成这个df,保存到一个dataframe(模式:dates列,每个项目的列),将列转换成行(每个日期有一个items列和列),然后在spark df中转换它。我这样做是因为spark中的逐行操作似乎很难实现。
我移动第一列 Items 一个新的Dataframe(ndf),所以我只剩下下面的模式(标头由日期组成,数据仅为整数):
日期1日期2日期3…日期1049898…022313580…014312114…0917973…0
我想从列date1的整数中减去列date2的整数(例如。 df.Date1 - df.Date2 )以及得到的值列(其中较大列的标题为- Date1 )要保存/附加到已经存在的ndfDataframe(我之前移动列的Dataframe)中。然后继续减去列date2和列date3( df.Date2 - df.Date3 ),依此类推直到列 Date(N+1) - DateN ,然后停止。
先前从items列创建的新dataframe(ndf)如下所示:
项目日期2…项目160…项目28855…项目3218…项目4126。。。
实际上,我想看看每件物品从一个日期到下一个日期的数量。
我在想做一个for循环。比如:


# get list of column headers

dates = df.columns

# for index and header in list

for idx, date in enumerate(dates):
    if idx < len(dates)-1:
        # calculate df columns subtraction and add differences column to ndf
        df = df.withColumn(f'diff-{date}', F.when((df[date] - df[dates[idx+1]]) < 0, 0)
                        .otherwise(df[date] - df[dates[idx+1]]))
        ndf = ndf.join(df.select(f'diff-{date}'), how='full')

但这是非常缓慢的,我有一种感觉,for循环并没有真正考虑到spark的优点,它可能比使用map/lambda慢得多。

7kjnsjlb

7kjnsjlb1#

我找到了两个解决方案:
对于转置的Dataframe,正如我在上面的问题中提到的,reddit r/dataengineering上的一位用户帮助我解决了这个问题:


# list to save column subtractions

col_defs = []

# grab the date columns

date_cols = df.columns[1:]

# for index and column

for i, date in enumerate(date_cols):
    if i > 0:
        # save the difference between each 2 columns to the list
        col_defs.append((df[date_cols[i - 1]] - df[date]).alias(date))

# result df containing only the items column and the differences for each date

result = df.select('county', *col_defs)

如果我不转置Dataframe,我可以应用窗口函数,正如@mck在对问题的评论中所建议的那样。我更喜欢这种方式,因为我避免转置,列数也将保持不变。这个关于pyspark窗口函数的资源对我理解它们的工作方式非常有帮助:
日期站点1项目2…日期1104223…日期298135…日期39880。。。


# list to save column subtractions

colDiffs= []

# get only the item columns

itemCols = df.columns[1:]

# Window function spec to partition the entire df and sort it by Dates descending as there are no dates that show multiple times.

windowSpec = Window.partitionBy().orderBy(F.col('Dates').desc())

# for each item column

for item in itemCols:
    # add a new column, itemdiff, to the df containing the same numbers but shifted up by one 
    # e.g. if a column X contains the numbers [1, 2, 3], applying the lead window function with 1 as argument, will shift everything up by 1 and the new Xdiff column will contain [2, 3, none]
    df = df.withColumn(f'{item}diff', lead(item, 1).over(windowSpec))
    # append the difference between the current and the lead colum to the list
    colDiffs.append((df[item] - df[f'{item}diff']).alias(item))

# get the final df containing the subtraction results

result = df.select('Dates', *colDiffs)

相关问题