pandas\udf级联循环

nnvyjq4y  于 2021-05-27  发布在  Spark
关注(0)|答案(2)|浏览(524)

我对pandas比较陌生,最近一直在使用用户定义的函数。
我的数据集如下:

|header|planned|
|  a   |   1   |
|  a   |   2   |
|  a   |   3   |
|  a   |   4   |
|  a   |   5   |
|  b   |   1   |
|  b   |   2   |
|  b   |   3   |
|  b   |   4   |
|  b   |   5   |

我必须在列中连接值 planned 以两行为一组得到如下结果:

|header|planned|p_cat|
|  a   |   1   | 1_2 | 
|  a   |   2   | 2_3 |
|  a   |   3   | 3_4 |
|  a   |   4   | 4_5 |
|  a   |   5   |     |
|  b   |   1   | 1_2 |
|  b   |   2   | 2_3 |
|  b   |   3   | 3_4 |
|  b   |   4   | 4_5 |
|  b   |   5   |     |

列中的数字 planned 不按此特定顺序排列,但始终是整数。
我的自定义项:

schema = ds_adh.schema

    @pandas_udf(schema, PandasUDFType.GROUPED_MAP)
    def concat_operations(ds_op):

        s = ds_op['planned']

        for index in range(ds_op['planned'].count()-1):

            # clearly working only for the last index
            couple = str([s.iloc[index]]) + '_' + str([s.iloc[index+1]])

        ds_op_new = ds_op

        ds_op_new ['p_cat'] = couple

        return ds_op_new

ds_adh = ds_adh.orderBy("time")
ds_adh = ds_adh.groupBy("header").apply(concat_operations)

我的问题:
连接本身不起作用
我不知道怎么把连接推进去 couple 对于循环的所有迭代
我也尝试过pandaseries,但没有成功。
以下是我在代码中遇到的错误:

IndexError: single positional indexer is out-of-bounds
v7pvogib

v7pvogib1#

使用内置窗口 lead 函数 partitionBy 在计划列的标题和orderby上,如 udf 会降低性能。

from pyspark.sql import *
from pyspark.sql.functions import *
w=Window.partitionBy("header").orderBy("planned")
df.withColumn("p_cat", when(lead(col("planned"),1).over(w).isNull(),lit("")).otherwise(concat_ws("_",col("planned"),lead(col("planned"),1).over(w)))).show()

# +------+-------+-----+

# |header|planned|p_cat|

# +------+-------+-----+

# |     a|      1|  1_2|

# |     a|      2|  2_3|

# |     a|      3|  3_4|

# |     a|      4|  4_5|

# |     a|      5|     |

# |     b|      1|  1_2|

# |     b|      2|  2_3|

# |     b|      3|  3_4|

# |     b|      4|  4_5|

# |     b|      5|     |

# +------+-------+-----+
rqcrx0a6

rqcrx0a62#

如果这是一个实际问题,您可以使用pyspark内置,如下所示:

import pyspark.sql.functions as F
w = Window.partitionBy("header").orderBy("idx")

(df.withColumn("idx",F.monotonically_increasing_id())
   .withColumn("Lead",F.lead("planned").over(w))
   .withColumn("p_cat",F.when(F.col("Lead").isNull(),'')
   .otherwise(F.concat_ws("_","planned","Lead")))
   .orderBy("idx").drop("idx","Lead")).show()
+------+-------+-----+
|header|planned|p_cat|
+------+-------+-----+
|     a|      1|  1_2|
|     a|      2|  2_3|
|     a|      3|  3_4|
|     a|      4|  4_5|
|     a|      5|     |
|     b|      1|  1_2|
|     b|      2|  2_3|
|     b|      3|  3_4|
|     b|      4|  4_5|
|     b|      5|     |
+------+-------+-----+

相关问题