pyspark:(广播)在最近的datetimes/unix上连接两个数据集

x9ybnkn6  于 2021-05-24  发布在  Spark
关注(0)|答案(1)|浏览(558)

我正在使用pyspark,几乎要放弃我的问题了。我有两个数据集:一个非常非常非常大的数据集(集合a)和一个非常小的数据集(集合b)。它们的形式如下:

Data set A:
variable   | timestampA
---------------------------------
x          | 2015-01-01 09:29:21
y          | 2015-01-01 12:01:57

Data set B:
different information | timestampB
-------------------------------------------
info a                | 2015-01-01 09:30:00
info b                | 2015-01-01 09:30:00
info a                | 2015-01-01 12:00:00
info b                | 2015-01-01 12:00:00

a有许多行,每行都有不同的时间戳。b每隔几分钟就有一个时间戳。这里的主要问题是,在这两个数据集中没有匹配的精确时间戳。
我的目标是在最近的时间戳上加入数据集。另一个问题出现了,因为我想以一种特定的方式加入。对于a中的每个条目,我希望在复制a中的条目时Map最接近时间戳的整个信息。所以,结果应该是这样的:

Final data set
variable   | timestampA          | information     | timestampB
--------------------------------------------------------------------------
x          | 2015-01-01 09:29:21 | info a          | 2015-01-01 09:30:00
x          | 2015-01-01 09:29:21 | info b          | 2015-01-01 09:30:00
y          | 2015-01-01 12:01:57 | info a          | 2015-01-01 12:00:00
y          | 2015-01-01 12:01:57 | info b          | 2015-01-01 12:00:00

我对pyspark(也是stackoverflow)很陌生。我想我可能需要使用一个窗口函数和/或广播连接,但我真的没有点开始,并将感谢任何帮助。谢谢您!

mnowg1ta

mnowg1ta1#

你可以用 broadcast 为了避免混乱。
如果理解正确,你有时间戳 set_B 哪些是确定间隔的结果?如果是这样,您可以执行以下操作:

from pyspark.sql import functions as F

# assuming 5 minutes is your interval in set_B

interval = 'INTERVAL {} SECONDS'.format(5 * 60 / 2)

res = set_A.join(F.broadcast(set_B), (set_A['timestampA'] > (set_B['timestampB'] - F.expr(interval))) & (set_A['timestampA'] <= (set_B['timestampB'] + F.expr(interval))))

输出:

+--------+-------------------+------+-------------------+
|variable|         timestampA|  info|         timestampB|
+--------+-------------------+------+-------------------+
|       x|2015-01-01 09:29:21|info a|2015-01-01 09:30:00|
|       x|2015-01-01 09:29:21|info b|2015-01-01 09:30:00|
|       y|2015-01-01 12:01:57|info a|2015-01-01 12:00:00|
|       y|2015-01-01 12:01:57|info b|2015-01-01 12:00:00|
+--------+-------------------+------+-------------------+

如果没有确定的间隔,则只交叉连接,然后查找 min(timestampA - timestampB) 间歇可以达到目的。你可以用窗口功能和 row_number 功能如下:

w = Window.partitionBy('variable', 'info').orderBy(F.abs(F.col('timestampA').cast('int') - F.col('timestampB').cast('int')))

res = res.withColumn('rn', F.row_number().over(w)).filter('rn = 1').drop('rn')

相关问题