pandas: How to create an account_id-user_id mapping DataFrame from an account_id-date-cluster_id mapping DataFrame (PySpark)?

mklgxw1f  posted on 2023-01-01  in  Spark

I have a mapping table (account_id, date, cluster_id).
Example:
| account_id | date | cluster_id |
| --- | --- | --- |
| 1 | 2018 | A |
| 2 | 2018 | A |
| 2 | 2019 | B |
| 3 | 2019 | B |
| 1 | 2020 | C |
| 4 | 2020 | C |
| 1 | 2021 | D |
| 5 | 2021 | A |
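Rules:

  • Accounts that share a cluster_id belong to the same user, but only within a specific date. An account can therefore have several cluster_ids across different dates.
  • A cluster_id on a specific date can contain multiple account_ids.
  • A cluster_id can appear on multiple dates, but that does not necessarily mean those rows belong to the same user.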
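
To make the rules concrete, here is a minimal plain-Python sketch (not PySpark) that groups the example rows by the pair (date, cluster_id). The names `rows` and `accounts_by_group` are made up for illustration. It shows that cluster A in 2018 and cluster A in 2021 end up as two separate groups, as the third rule requires.

```python
from collections import defaultdict

# Rows copied from the example table: (account_id, date, cluster_id).
rows = [
    (1, 2018, "A"), (2, 2018, "A"),
    (2, 2019, "B"), (3, 2019, "B"),
    (1, 2020, "C"), (4, 2020, "C"),
    (1, 2021, "D"), (5, 2021, "A"),
]

# The grouping key is the (date, cluster_id) pair, never cluster_id alone.
accounts_by_group = defaultdict(set)
for account_id, date, cluster_id in rows:
    accounts_by_group[(date, cluster_id)].add(account_id)

# Print each group with the accounts it contains.
for key in sorted(accounts_by_group):
    print(key, sorted(accounts_by_group[key]))
```

Grouping alone is not the final answer: account 2 appears in both the 2018 A group and the 2019 B group, so those groups still have to be merged.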

Using this table, I want to create an account_id-user_id mapping.
Expected output:
| account_id | user_id |
| --- | --- |
| 1 | 1 |
| 2 | 1 |
| 3 | 1 |
| 4 | 1 |
| 5 | 2 |

    • The actual user_id values do not matter, as long as the correct accounts are linked together.
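
The expected output can be read as the connected components of a graph in which two accounts are linked whenever they share a (date, cluster_id) group. The following is a plain-Python union-find sketch of that reading, not a PySpark solution. The function name `build_user_map` and the choice to number users by their smallest account_id are assumptions made for this example.

```python
def build_user_map(rows):
    """Map account_id -> user_id by merging accounts that share a (date, cluster_id) group."""
    parent = {}

    def find(x):
        # Return the representative of x's set, shortening the path on the way.
        parent.setdefault(x, x)
        while parent[x] != x:
            parent[x] = parent[parent[x]]
            x = parent[x]
        return x

    def union(a, b):
        # Merge the two sets, keeping the smaller account_id as the root.
        root_a, root_b = find(a), find(b)
        if root_a != root_b:
            parent[max(root_a, root_b)] = min(root_a, root_b)

    first_account = {}  # (date, cluster_id) -> first account seen in that group
    for account_id, date, cluster_id in rows:
        find(account_id)  # register the account even if it never shares a group
        key = (date, cluster_id)
        if key in first_account:
            union(account_id, first_account[key])
        else:
            first_account[key] = account_id

    # Number the components 1, 2, ... in order of their smallest account_id.
    roots = sorted({find(a) for a in list(parent)})
    user_id_of_root = {root: i + 1 for i, root in enumerate(roots)}
    return {a: user_id_of_root[find(a)] for a in sorted(parent)}


rows = [
    (1, 2018, "A"), (2, 2018, "A"),
    (2, 2019, "B"), (3, 2019, "B"),
    (1, 2020, "C"), (4, 2020, "C"),
    (1, 2021, "D"), (5, 2021, "A"),
]
user_map = build_user_map(rows)
print(user_map)
```

For the example rows this links accounts 1 to 4 into one user and leaves account 5 on its own, which matches the expected table.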

I tried starting like this, but I don't think it is correct. I'm open to any ideas.

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Give every (year, month, cluster_id) combination its own intermediate group ID
df_user_mapping = cluster_df.withColumn('intermediate_group_id', F.dense_rank().over(Window.orderBy('year','month','cluster_id'))).selectExpr('account_id','month','year','cluster_id as group_id','intermediate_group_id')
inter_groups = df_user_mapping.groupBy('account_id').agg(F.collect_set('intermediate_group_id').alias('intermediate_groups'))
groups = df_user_mapping.groupBy('account_id','year','month').agg(F.collect_set('group_id').alias('groups'))
joined = groups.join(inter_groups, ['account_id'], 'outer')
inter_explode = joined.select('account_id','groups','year','month',F.explode('intermediate_groups').alias('intermediate_group_id'))
group_explode = inter_explode.select('account_id','intermediate_group_id','year','month',F.explode('groups').alias('group_id'))
first_ids = (group_explode
    .withColumn('first_id_inter', F.min('account_id').over(Window.partitionBy('intermediate_group_id')))
    .withColumn('first_id_group', F.min('first_id_inter').over(Window.partitionBy('group_id','year','month')))
    .withColumn('first_id', F.min('first_id_group').over(Window.partitionBy('account_id'))))
final = first_ids.selectExpr('account_id','first_id as user_id').distinct()

Please let me know if you have any ideas!
Starter template:

example = [('1',1,2018,'A'),
           ('2',1,2018,'A'),
           ('2',1,2019,'B'),
           ('3',1,2019,'B'),
           ('1',1,2020,'C'),
           ('4',1,2020,'C'),
           ('1',1,2021,'D'),
           ('5',1,2021,'A'),
          ]
example_df = spark.createDataFrame(example, ['account_id','month','year','cluster_id'])

kyks70gy1#

Try this:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Assumes df has the columns account_id, date and cluster_id, as in the question's table.

# Steps 1-2: Assign a unique user_id to every (cluster_id, date) group.
# dense_rank over a global ordering replaces the Python loop over groups,
# because a PySpark groupBy result cannot be iterated like a pandas one.
# Note: a window without partitionBy moves all rows into a single partition.
group_window = Window.orderBy('cluster_id', 'date')
df = df.withColumn('user_id', F.dense_rank().over(group_window))

# Steps 3-4: For each account_id, take the user_id of its earliest row (by date)
# and apply it to all rows of that account, without collecting rows to the driver.
account_window = Window.partitionBy('account_id').orderBy('date')
df = df.withColumn('user_id', F.first('user_id').over(account_window))

# Caveat: this is a single pass. An account is tied only to its own earliest group,
# so accounts that are linked through a chain of other accounts are not merged.
df = df.sort(['account_id', 'date'])

# The resulting dataframe should have the account_id - user_id mapping
df.show()
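
A single pass that copies each account's first group ID will not merge accounts that are only linked through a chain. In the example, account 3 is connected to account 1 only via account 2. One way to handle this is to repeat a "take the smallest label" step until nothing changes. Below is a plain-Python sketch of that idea. The function name `propagate_labels` is hypothetical, and the labels it returns are representative account_ids rather than 1, 2, ... (the question says the actual values do not matter). In PySpark, the two passes would presumably map to a `groupBy(...).agg(F.min(...))` followed by a join, repeated in a driver-side loop. That translation is an assumption and is not shown here.

```python
def propagate_labels(rows):
    """Give every account the smallest account_id reachable through shared groups."""
    # Start with each account labelled by its own ID.
    label = {account_id: account_id for account_id, _, _ in rows}
    changed = True
    while changed:
        changed = False
        # Pass 1: smallest current label inside each (date, cluster_id) group.
        group_min = {}
        for account_id, date, cluster_id in rows:
            key = (date, cluster_id)
            current = label[account_id]
            group_min[key] = min(group_min.get(key, current), current)
        # Pass 2: each account adopts the smallest label among its groups.
        for account_id, date, cluster_id in rows:
            best = group_min[(date, cluster_id)]
            if best < label[account_id]:
                label[account_id] = best
                changed = True
    return label


rows = [
    (1, 2018, "A"), (2, 2018, "A"),
    (2, 2019, "B"), (3, 2019, "B"),
    (1, 2020, "C"), (4, 2020, "C"),
    (1, 2021, "D"), (5, 2021, "A"),
]
labels = propagate_labels(rows)
print(labels)
```

On the example rows, account 3 picks up label 2 in the first round and label 1 in the second, which is why more than one pass is needed.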
