如何在分组的pyspark Dataframe 中获取组名和索引列表?

0kjbasz6  于 2022-09-21  发布在  Spark
关注(0)|答案(1)|浏览(250)

我想要这个Pandas代码的等价物。下面的PANDA代码生成atable名称和在其中找到atable名称的索引:

  1. import pandas as pd
  2. df1 = pd.DataFrame({
  3. 'atable': ['Users', 'Users', 'Domains', 'Domains', 'Locks'],
  4. 'column': ['col_1', 'col_2', 'col_a', 'col_b', 'col'],
  5. 'column_type':['varchar', 'varchar', 'int', 'varchar', 'varchar'],
  6. 'is_null': ['No', 'No', 'Yes', 'No', 'Yes'],
  7. })
  8. df1_grouped = df1.groupby('atable')
  9. # iterate over each group
  10. for group_name, df_group in df1_grouped.groups.items():
  11. print(group_name, df_group)

产出:

  1. Domains Int64Index([2, 3], dtype='int64')
  2. Locks Int64Index([4], dtype='int64')
  3. Users Int64Index([0, 1], dtype='int64')

Spark输出应如下所示:

  1. # +-------+--------------------+
  2. # |atable | sources|
  3. # +-------+--------------------+
  4. # |Domains| [2, 3] |
  5. # |Locks | [4] |
  6. # | Users | [0, 1] |
  7. # +-------+--------------------+
nafvub8i

nafvub8i1#

正如Samkart已经指出的那样,Spark Dataframe 中没有内在的顺序。如果您想保留原始 Dataframe 中的哪一行在分组操作中进入了哪个组的信息,可以使用monotonically_increasing_id为原始 Dataframe 中的每一行分配一个唯一的id,然后使用collect_list作为聚合函数:

  1. from pyspark.sql import functions as F
  2. df=spark.createDataFrame(df1)
  3. df.withColumn("id", F.monotonically_increasing_id())
  4. .groupBy('atable')
  5. .agg(F.collect_list('id'))
  6. .show(truncate=False)

产出:

  1. +-------+--------------------------+
  2. |atable |collect_list(id) |
  3. +-------+--------------------------+
  4. |Domains|[17179869184, 25769803776]|
  5. |Users |[0, 8589934592] |
  6. |Locks |[25769803777] |
  7. +-------+--------------------------+

monotonically_increasing_id创建的ID是唯一的,但不是顺序的。创建顺序ID(herehere)有一些“黑客”方法,但通常这些方法不是一个好主意,因为唯一的顺序ID不适合Spark的分布式本质。

展开查看全部

相关问题