如何在pyspark的for循环中插入自定义函数?

i1icjdpr  于 2021-07-13  发布在  Spark
关注(0)|答案(1)|浏览(398)

我在azure databricks的spark中面临一个挑战。我有一个数据集

+------------------+----------+-------------------+---------------+
|     OpptyHeaderID|   OpptyID|               Date|BaseAmountMonth|
+------------------+----------+-------------------+---------------+
|0067000000i6ONPAA2|OP-0164615|2014-07-27 00:00:00|    4375.800000|
|0065w0000215k5kAAA|OP-0218055|2020-12-23 00:00:00|    4975.000000|
+------------------+----------+-------------------+---------------+

现在我需要使用循环函数将行附加到此Dataframe。我想在pyspark中复制下面的函数。

Result = ()
for i in (1:12)
{
   select a.PootyHeaderID
          ,a.OpptyID
          ,dateadd(MONTH, i, a.Date) as Date
          ,BaseAmountMonth
   from FinalOut
   Result = Result.Append()
   print(i)  
}

每个附加行中的日期必须有一个后续月份(滚动12个月)。应该是这样的。

+------------------+----------+-------------------+---------------+
|     OpptyHeaderID|   OpptyID|               Date|BaseAmountMonth|
+------------------+----------+-------------------+---------------+
|0067000000i6ONPAA2|OP-0164615|2014-07-27 00:00:00|    4375.800000|
|0067000000i6ONPAA2|OP-0164615|2014-08-27 00:00:00|    4375.800000|
|0067000000i6ONPAA2|OP-0164615|2014-09-27 00:00:00|    4375.800000|
                              .
                              .
                              .
|0067000000i6ONPAA2|OP-0164615|2015-06-27 00:00:00|    4375.800000|
|0065w0000215k5kAAA|OP-0218055|2020-12-23 00:00:00|    4975.000000|
|0065w0000215k5kAAA|OP-0218055|2021-01-23 00:00:00|    4975.000000|    
|0065w0000215k5kAAA|OP-0218055|2021-02-23 00:00:00|    4975.000000|    
                               .
                               .
                               .    
|0065w0000215k5kAAA|OP-0218055|2021-11-23 00:00:00|    4975.000000|    
+------------------+----------+-------------------+---------------+

[编辑1]
如何基于另一个字段使间隔长度动态化?

+------------------+----------+-------------------+---------------+--------+
|     OpptyHeaderID|   OpptyID|               Date|BaseAmountMonth|Interval|
+------------------+----------+-------------------+---------------+--------+
|0067000000i6ONPAA2|OP-0164615|2014-07-27 00:00:00|    4375.800000|      12|
|0065w0000215k5kAAA|OP-0218055|2020-12-23 00:00:00|    4975.000000|       7|
+------------------+----------+-------------------+---------------+--------+
rryofs0p

rryofs0p1#

您可以分解一系列时间戳:

import pyspark.sql.functions as F

df2 = df.withColumn(
    'Date',
    F.expr("""
        explode(
            sequence(
                timestamp(Date),
                add_months(timestamp(Date), `Interval` - 1),
                interval 1 month
            )
        )
    """)
)

df2.show(99)
+------------------+----------+-------------------+---------------+--------+
|     OpptyHeaderID|   OpptyID|               Date|BaseAmountMonth|Interval|
+------------------+----------+-------------------+---------------+--------+
|0067000000i6ONPAA2|OP-0164615|2014-07-27 00:00:00|    4375.800000|      12|
|0067000000i6ONPAA2|OP-0164615|2014-08-27 00:00:00|    4375.800000|      12|
|0067000000i6ONPAA2|OP-0164615|2014-09-27 00:00:00|    4375.800000|      12|
|0067000000i6ONPAA2|OP-0164615|2014-10-27 00:00:00|    4375.800000|      12|
|0067000000i6ONPAA2|OP-0164615|2014-11-27 00:00:00|    4375.800000|      12|
|0067000000i6ONPAA2|OP-0164615|2014-12-27 00:00:00|    4375.800000|      12|
|0067000000i6ONPAA2|OP-0164615|2015-01-27 00:00:00|    4375.800000|      12|
|0067000000i6ONPAA2|OP-0164615|2015-02-27 00:00:00|    4375.800000|      12|
|0067000000i6ONPAA2|OP-0164615|2015-03-27 00:00:00|    4375.800000|      12|
|0067000000i6ONPAA2|OP-0164615|2015-04-27 00:00:00|    4375.800000|      12|
|0067000000i6ONPAA2|OP-0164615|2015-05-27 00:00:00|    4375.800000|      12|
|0067000000i6ONPAA2|OP-0164615|2015-06-27 00:00:00|    4375.800000|      12|
|0065w0000215k5kAAA|OP-0218055|2020-12-23 00:00:00|    4975.000000|       7|
|0065w0000215k5kAAA|OP-0218055|2021-01-23 00:00:00|    4975.000000|       7|
|0065w0000215k5kAAA|OP-0218055|2021-02-23 00:00:00|    4975.000000|       7|
|0065w0000215k5kAAA|OP-0218055|2021-03-23 00:00:00|    4975.000000|       7|
|0065w0000215k5kAAA|OP-0218055|2021-04-23 00:00:00|    4975.000000|       7|
|0065w0000215k5kAAA|OP-0218055|2021-05-23 00:00:00|    4975.000000|       7|
|0065w0000215k5kAAA|OP-0218055|2021-06-23 00:00:00|    4975.000000|       7|
+------------------+----------+-------------------+---------------+--------+

相关问题