pandas Pyspark -如何高效、重复地搜索相同的 Dataframe ?

fdbelqdn  于 2023-02-14  发布在  Spark
关注(0)|答案(2)|浏览(230)

我试图通过创建一个分组列来将 Dataframe 中的事务链接在一起。下面是我的数据集示例。
| 交易ID|金额|条形码|交易ID_自|交易ID_至|
| - ------|- ------|- ------|- ------|- ------|
| 一百二十|五十五点整|小行星|一百二十|一百三十三|
| 一百三十三|五十五点整|小行星|一百二十|一百三十三|
| 一百三十三|三十一点整|小行星10008|一百三十三|零|
| 一百三十五|二十点整|小行星|一百三十五|一百四十|
| 十个|二十点整|小行星|一百三十五|一百四十|
| 一百四十|八点整|小行星10056|一百四十|一百四十九|
| 一百四十|十七点整|小行星10067|一百四十九|零|
| 一百四十九|八点整|小行星10056|一百四十|一百四十九|
| 一百五十一|五点整|小行星10056|一百五十一|零|
我的目标是对可以通过条形码或一系列条形码链接的交易进行分组,对于上述数据集,预期输出将是:
| 交易ID|金额|条形码|交易ID_自|交易ID_至|组ID|
| - ------|- ------|- ------|- ------|- ------|- ------|
| 一百二十|五十五点整|小行星|一百二十|一百三十三|1个|
| 一百三十三|五十五点整|小行星|一百二十|一百三十三|1个|
| 一百三十三|三十一点整|小行星10008|一百三十三|零|1个|
| 一百三十五|二十点整|小行星|一百三十五|一百四十|第二章|
| 一百四十|二十点整|小行星|一百三十五|一百四十|第二章|
| 一百四十|八点整|小行星10056|一百四十|一百四十九|第二章|
| 一百四十九|八点整|小行星10056|一百四十|一百四十九|第二章|
| 一百五十一|五点整|小行星10058|一百五十一|零|三个|
组1由链接交易120和133的条形码10001形成,组2由链接交易135和140的条形码10022以及链接交易140和149的条形码10056形成。
分组中的交易数量是可变的,通常在1-100之间。在我的数据集中,大约有180,000个条形码和250,000个交易。
我已经尝试过这个问题,但是我的实现太慢了。我怀疑这是非常低效的,因为我重复地查询同一个表。
我的伪代码:

  • 每个条形码的编号:
  • #1.查找与该条形码关联的所有交易
  • #2.找到与这些交易相关的所有条形码
  • #3.最终,与#2中的条形码相关的所有交易
  • #4.重复2-3,直到没有发现进一步的交易
  • #5.重复1-4,直到不再有条形码,跳过之前看到的条形码

您是否有更简单/更快速的替代方法?或者有任何优化建议?

q1qsirdb

q1qsirdb1#

首先,我们创建一个列,显示 TransactionId_FromTransactionId_To 之间的最大值:

from pyspark.sql.functions import greatest, col

df = df.withColumn(
   'TransactionGreatestID',
   greatest(col('TransactionId_From'), col('TransactionId_To')
                  )

然后,您可以使用dense_rank函数根据 TransactionGreatestID 分配组:

from pyspark.sql.window import *
from pyspark.sql.functions import dense_rank

window = Window.orderBy('TransactionGreatestID')
df = df.withColumn('GroupId', dense_rank().over(window))

根据我的经验,这应该不会花太长的时间来运行。但是,对于上面的解决方案,我们假设数据结构是好的,并且给定 GroupId 的每一行都具有相同的 TransactionGreatestID 值。

j9per5c4

j9per5c42#

使用networkx

import networkx as nx

G = nx.from_pandas_edgelist(df[['TransactionId_From', 'TransactionId_To']].fillna('nan'),
                            source='TransactionId_From', target='TransactionId_To',
                            create_using=nx.DiGraph)
G.remove_node('nan')

groups = {n: i for i, s in
          enumerate(nx.weakly_connected_components(G), start=1)
          for n in s}

df['GroupId'] = df['TransactionId_From'].map(groups)

输出:

TransactionID  Amount  Barcode  TransactionId_From  TransactionId_To  GroupId
0            120    55.0    10001                 120             133.0        1
1            133    55.0    10001                 120             133.0        1
2            133    31.0    10008                 133               NaN        1
3            135    20.0    10022                 135             140.0        2
4             10    20.0    10022                 135             140.0        2
5            140     8.0    10056                 140             149.0        2
6            140    17.0    10067                 149               NaN        2
7            149     8.0    10056                 140             149.0        2
8            151     5.0    10056                 151               NaN        3

相关问题