PySpark: create sliding windows from rows, with padding

hgqdbh6s · posted 2023-01-13 · in Apache

I am trying to collect groups of rows into sliding windows represented as vectors.
Given this example input:

+---+-----+-----+
| id|Label|group|
+---+-----+-----+
|  A|    T|    1|
|  B|    T|    1|
|  C|    F|    2|
|  D|    F|    2|
|  E|    F|    3|
|  F|    T|    3|
|  G|    F|    3|
|  H|    T|    3|
+---+-----+-----+

The expected output is:

window_size = 3
stride = 1
id_padding = ''
label_padding = 'f'
+-----+-------------+-------------+
|group|      Windows|       Labels|
+-----+-------------+-------------+
|    1|   [A, B, '']|    [T, T, f]|
|    2|   [C, D, '']|    [F, F, f]|
|    3|    [E, F, G]|    [F, T, F]|
|    3|    [F, G, H]|    [T, F, T]|
+-----+-------------+-------------+
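For clarity, here is a plain-Python sketch of the transformation I am after, applied per group (`window_size = 3`, `stride = 1`; the function name is mine, purely illustrative):

```python
def sliding_windows(seq, size, stride, pad):
    """Fixed-length windows over one group's rows; a group shorter
    than `size` yields a single right-padded window."""
    if len(seq) < size:
        return [list(seq) + [pad] * (size - len(seq))]
    return [list(seq[i:i + size])
            for i in range(0, len(seq) - size + 1, stride)]

# group 1 is padded; group 3 yields two overlapping windows
print(sliding_windows(['A', 'B'], 3, 1, ''))            # [['A', 'B', '']]
print(sliding_windows(['E', 'F', 'G', 'H'], 3, 1, ''))  # [['E', 'F', 'G'], ['F', 'G', 'H']]
```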

My latest attempt produces tumbling windows without padding. Here is my code:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import Window

spark = SparkSession.builder.getOrCreate()

data = [
    ("A", "T", 1),
    ("B", "T", 1),
    ("C", "F", 2),
    ("D", "F", 2),
    ("E", "F", 3),
    ("F", "T", 3),
    ("G", "F", 3),
    ("H", "T", 3),
]
df = spark.createDataFrame(data, ['id', 'label', 'group'])

grouping = 3

w2 = Window.partitionBy('group').orderBy('id')

df = df.withColumn("rows", ((F.row_number().over(w2) - 1) / grouping).astype('int'))
df.groupBy('group', 'rows')\
  .agg(F.collect_list('id').alias("Windows"), F.collect_list('Label').alias("Labels"))\
  .drop('rows') \
  .orderBy('group').show()
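For comparison, the integer division by `grouping` above buckets rows into consecutive non-overlapping chunks, so the equivalent plain-Python behaviour is tumbling, not sliding:

```python
def tumbling_windows(seq, size):
    # consecutive non-overlapping chunks; the last one may be short
    return [list(seq[i:i + size]) for i in range(0, len(seq), size)]

print(tumbling_windows(['E', 'F', 'G', 'H'], 3))  # [['E', 'F', 'G'], ['H']]
```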

I tried looking for variations on this approach, perhaps by running a SQL query as in this case, or by using built-in SQL window frames such as ROWS N PRECEDING, but I did not find what I was looking for. Most results on the web focus on time-based sliding windows, whereas I am trying to window over rows.
Any help would be greatly appreciated.

**Edit:**

I think I found a solution for the padding thanks to this answer.
I still need to organize the rows into sliding windows, though...

nhn9ugyo1#

Here is a possible solution (not the most elegant, but it works).
In the window definition, `.rowsBetween` is used to create a sliding window of the specified `size`; 0 denotes the current row.
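To see what that frame collects, here is a rough plain-Python equivalent of `collect_list` over `rowsBetween(0, size - 1)` within one ordered partition (illustrative only):

```python
rows = ['E', 'F', 'G', 'H']  # one partition, already ordered by id
size = 3

# for each row: the current value plus up to size-1 following values;
# the trailing frames come out shorter, hence the padding and filtering
frames = [rows[i:i + size] for i in range(len(rows))]
print(frames)  # [['E', 'F', 'G'], ['F', 'G', 'H'], ['G', 'H'], ['H']]
```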

import pyspark.sql.functions as F
from pyspark.sql.window import Window

# parameters
size = 3
id_padding = "''"  # the padding value is the two-character string ''
label_padding = 'f'

# windows
w = Window.partitionBy('group')
w_ordered = Window.partitionBy('group').orderBy('id')
w_ordered_limited = Window.partitionBy('group').orderBy('id').rowsBetween(0, size - 1)

(df.select(
  'group',
  F.collect_list('id').over(w_ordered_limited).alias('Windows'),
  F.collect_list('Label').over(w_ordered_limited).alias('Groups'),
  F.count('group').over(w).alias('n'),
  F.row_number().over(w_ordered).alias('n_row')
  )
  # pad arrays and then slice them to the desired `size`
  .withColumn('Windows', F.when(F.col('n') < size, F.slice(F.concat('Windows', F.array_repeat(F.lit(id_padding), size - 1)), 1, size))
                          .otherwise(F.col('Windows')))
  .withColumn('Groups',  F.when(F.col('n') < size, F.slice(F.concat('Groups', F.array_repeat(F.lit(label_padding), size - 1)), 1, size))
                          .otherwise(F.col('Groups')))
  # filter out useless rows
  .filter( ((F.col('n') < size) & (F.col('n_row') == 1)) 
           | ((F.col('n') >= size) & (F.size('Windows') == size)))
  .drop('n', 'n_row')
 ).show()

+-----+----------+---------+
|group|   Windows|   Groups|
+-----+----------+---------+
|    1|[A, B, '']|[T, T, f]|
|    2|[C, D, '']|[F, F, f]|
|    3| [E, F, G]|[F, T, F]|
|    3| [F, G, H]|[T, F, T]|
+-----+----------+---------+

I suggest going through the solution step by step, one line of code at a time, to understand the logic behind it.
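In particular, the pad-then-slice step (`concat` with `array_repeat`, then `slice`) corresponds to this plain-Python idiom:

```python
size = 3
label_padding = 'f'

labels = ['T', 'T']  # a group shorter than `size`
# append size-1 padding values, then cut back to exactly `size`
padded = (labels + [label_padding] * (size - 1))[:size]
print(padded)  # ['T', 'T', 'f']
```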

xcitsw882#

To extend @Ric S's answer, I also needed to take the stride into account.
My solution handles the filter condition and the padding transformation based on the stride value and its ratio to the window size:

import pyspark.sql.functions as F
from pyspark.sql.window import Window

# parameters
size = 5
stride = 2
id_padding = "''"  # the padding value is the two-character string ''
label_padding = 'f'

# windows
w = Window.partitionBy('group')
w_ordered = Window.partitionBy('group').orderBy('id')
w_ordered_limited = Window.partitionBy('group').orderBy('id').rowsBetween(0, size - 1)

if stride == 1:
  filter_cond = (F.col('n') >= size) & (F.size('Windows') == size)
else:
  filter_cond = (F.col('n') >= size) & (F.col('n_row') < size) & (F.col('n_row') % stride == 1) & (F.size('Windows') == size)

if size % stride != 0:
  transf = lambda dfcol, pad: F.slice(F.concat(dfcol, F.array_repeat(F.lit(pad), size - 1)), 1, size)
else:
  transf = lambda dfcol, pad: F.when(F.col('n') < size, F.slice(F.concat(dfcol, F.array_repeat(F.lit(pad), size - 1)), 1, size)) \
                              .otherwise(F.col(dfcol))

(df.select(
  'group',
  F.collect_list('id').over(w_ordered_limited).alias('Windows'),
  F.collect_list('Label').over(w_ordered_limited).alias('Groups'),
  F.count('group').over(w).alias('n'),
  F.row_number().over(w_ordered).alias('n_row')
  )
  # pad each array with its own padding value, then slice to the desired `size`
  .withColumn('Windows', transf('Windows', id_padding))
  .withColumn('Groups',  transf('Groups', label_padding))
  # filter out useless rows
  .filter( ((F.col('n') < size) & (F.col('n_row') == 1)) 
           | (filter_cond))
  .drop('n', 'n_row')
 ).show()

With an input slightly larger than the one given in the question, this outputs:

+-----+------------------+---------------+
|group|           Windows|         Groups|
+-----+------------------+---------------+
|    1|[A, B, '', '', '']|[T, T, f, f, f]|
|    2|[C, D, '', '', '']|[F, F, f, f, f]|
|    3|   [E, F, G, H, I]|[F, T, F, T, T]|
|    3|  [G, H, I, J, '']|[F, T, T, T, f]|
+-----+------------------+---------------+
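Putting the stride logic in plain-Python terms (a sketch of the intended per-group semantics, assuming each column is padded with its own padding value; `strided_windows` is an illustrative name, not part of any API):

```python
def strided_windows(seq, size, stride, pad):
    # a group shorter than `size` yields one right-padded window
    if len(seq) < size:
        return [list(seq) + [pad] * (size - len(seq))]
    out = []
    for start in range(0, len(seq), stride):
        w = list(seq[start:start + size])
        if len(w) == size:
            out.append(w)
        elif start < size - 1:
            # mirror the n_row < size filter: pad a trailing partial
            # window only while its start row is early enough
            out.append(w + [pad] * (size - len(w)))
    return out

print(strided_windows(['E', 'F', 'G', 'H', 'I', 'J'], 5, 2, ''))
# [['E', 'F', 'G', 'H', 'I'], ['G', 'H', 'I', 'J', '']]
```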
