我尝试将行组收集到表示为向量的滑动窗口中。
给定示例输入:
+---+-----+-----+
| id|Label|group|
+---+-----+-----+
| A| T| 1|
| B| T| 1|
| C| F| 2|
| D| F| 2|
| E| F| 3|
| F| T| 3|
| G| F| 3|
| H| T| 3|
+---+-----+-----+
预期产出为:
windows_size = 3
stride = 1
id_padding = ''
label_padding = 'f'
+-----+-------------+-------------+
|group| Windows| Labels|
+-----+-------------+-------------+
| 1| [A, B, '']| [T, T, f]|
| 2| [C, D, '']| [F, F, f]|
| 3| [E, F, G]| [F, T, F]|
| 3| [F, G, H]| [T, F, T]|
+-----+-------------+-------------+
我最近的尝试产生了没有填充的滚动窗口。下面是我的代码:
from pyspark.sql import functions as F
from pyspark.sql import Window
data = [
("A", "T", 1),
("B", "T", 1),
("C", "F", 2),
("D", "F", 2),
("E", "F", 3),
("F", "T", 3),
("G", "F", 3),
("H", "T", 3),
]
df = spark.createDataFrame(data, ['id', 'label', 'group'])
grouping = 3
w2 = Window.partitionBy('group').orderBy('id')
df = df.withColumn("rows",((F.row_number().over(w2)-1) / grouping).astype('int') )
df.groupBy('group', 'rows')\
.agg(F.collect_list('id').alias("Windows"), F.collect_list('Label').alias("Labels"))\
.drop('rows') \
.orderBy('group').show()
我试着寻找这种方法的变体,可能是通过执行一个SQL查询,比如this case,或者使用一些内置的SQL函数,比如ROWS N PRECEDING,但是我没有找到我想要的。大多数来自Web的结果都集中在时间滑动窗口上,但是我试着在行上做。
任何帮助都将不胜感激。
- 编辑:**
我想我找到了一个解决填充感谢this answer。
我仍然需要组织行在滑动窗口虽然...
2条答案
按热度按时间nhn9ugyo1#
下面是一个可能的解决方案(不是最优雅的,但仍然有效)。
在窗口定义中,使用
.rowsBetween
创建指定size
的滑动窗口;0
表示当前行。我建议您一步一步地浏览解决方案,一次一行代码,以理解其背后的逻辑。
xcitsw882#
为了扩展@Ric S的答案,我还需要考虑步幅。
我的解决方案是根据步幅值及其与win_size的比率来处理条件和变换:
如果输入比问题中给出的输入稍大,则其输出为: