pyspark: keep only the rows of a DataFrame that are duplicated on certain columns

falq053o  asked on 2022-11-01 in Spark

I have a Spark DataFrame like this:

+---+-----+------+----+------------+------------+
| ID|  ID2|Number|Name|Opening_Hour|Closing_Hour|
+---+-----+------+----+------------+------------+
|ALT|  QWA|     6|null|    08:59:00|    23:30:00|
|ALT|AUTRE|     2|null|    08:58:00|    23:29:00|
|TDR|  QWA|     3|null|    08:57:00|    23:28:00|
|ALT| TEST|     4|null|    08:56:00|    23:27:00|
|ALT|  QWA|     6|null|    08:55:00|    23:26:00|
|ALT|  QWA|     2|null|    08:54:00|    23:25:00|
|ALT|  QWA|     2|null|    08:53:00|    23:24:00|
+---+-----+------+----+------------+------------+

I want a new DataFrame containing only the rows that are not unique with respect to the three columns "ID", "ID2", and "Number".
That means I want this DataFrame:

+---+-----+------+----+------------+------------+
| ID|  ID2|Number|Name|Opening_Hour|Closing_Hour|
+---+-----+------+----+------------+------------+
|ALT|  QWA|     6|null|    08:59:00|    23:30:00|
|ALT|  QWA|     2|null|    08:53:00|    23:24:00|
+---+-----+------+----+------------+------------+

Or perhaps a DataFrame containing all of the duplicates:

+---+-----+------+----+------------+------------+
| ID|  ID2|Number|Name|Opening_Hour|Closing_Hour|
+---+-----+------+----+------------+------------+
|ALT|  QWA|     6|null|    08:59:00|    23:30:00|
|ALT|  QWA|     6|null|    08:55:00|    23:26:00|
|ALT|  QWA|     2|null|    08:54:00|    23:25:00|
|ALT|  QWA|     2|null|    08:53:00|    23:24:00|
+---+-----+------+----+------------+------------+

5us2dqdw1#

One way to do this is to use a pyspark.sql.Window to add a column that counts, for each row, how many times its ("ID", "ID2", "Number") combination occurs. Then select only the rows where that duplicate count is greater than 1.

import pyspark.sql.functions as f
from pyspark.sql import Window

# Count how many rows share each ('ID', 'ID2', 'Number') combination
w = Window.partitionBy('ID', 'ID2', 'Number')
df.select('*', f.count('ID').over(w).alias('dupeCount'))\
    .where('dupeCount > 1')\
    .drop('dupeCount')\
    .show()

# +---+---+------+----+------------+------------+
# | ID|ID2|Number|Name|Opening_Hour|Closing_Hour|
# +---+---+------+----+------------+------------+
# |ALT|QWA|     2|null|    08:54:00|    23:25:00|
# |ALT|QWA|     2|null|    08:53:00|    23:24:00|
# |ALT|QWA|     6|null|    08:59:00|    23:30:00|
# |ALT|QWA|     6|null|    08:55:00|    23:26:00|
# +---+---+------+----+------------+------------+

I used pyspark.sql.functions.count() to count the number of rows in each group. This returns a DataFrame containing all of the duplicates (the second output shown above).
If you wanted only one row per ("ID", "ID2", "Number") combination, you could use another Window to order the rows.
For example, below I add another column for the row_number and select only the rows where the duplicate count is greater than 1 and the row number is equal to 1. This guarantees one row per grouping.

# Second window to number the rows within each group; ordering by the
# partition keys means every row in a group ties, so the row that ends
# up with rowNum == 1 is effectively arbitrary
w2 = Window.partitionBy('ID', 'ID2', 'Number').orderBy('ID', 'ID2', 'Number')
df.select(
        '*',
        f.count('ID').over(w).alias('dupeCount'),
        f.row_number().over(w2).alias('rowNum')
    )\
    .where('(dupeCount > 1) AND (rowNum = 1)')\
    .drop('dupeCount', 'rowNum')\
    .show()

# +---+---+------+----+------------+------------+
# | ID|ID2|Number|Name|Opening_Hour|Closing_Hour|
# +---+---+------+----+------------+------------+
# |ALT|QWA|     2|null|    08:54:00|    23:25:00|
# |ALT|QWA|     6|null|    08:59:00|    23:30:00|
# +---+---+------+----+------------+------------+
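
Alternatively, if any representative row per group will do, one possible shortcut is to filter on the duplicate count and then drop duplicates on the key columns. A minimal sketch, reusing w and f from above:

# Keep one (arbitrary) row per duplicated ('ID', 'ID2', 'Number') combination
df.withColumn('dupeCount', f.count('ID').over(w))\
    .where('dupeCount > 1')\
    .dropDuplicates(['ID', 'ID2', 'Number'])\
    .drop('dupeCount')\
    .show()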

yduiuuwa2#

Here is a way to do it without using a Window.
A DataFrame with the duplicates (drop_duplicates keeps one row per ('ID', 'ID2', 'Number') combination, and exceptAll, available in Spark 2.4+, subtracts those rows from the original, leaving the extra copies):

df.exceptAll(df.drop_duplicates(['ID', 'ID2', 'Number'])).show()

# +---+---+------+------------+------------+
# | ID|ID2|Number|Opening_Hour|Closing_Hour|
# +---+---+------+------------+------------+
# |ALT|QWA|     2|    08:53:00|    23:24:00|
# |ALT|QWA|     6|    08:55:00|    23:26:00|
# +---+---+------+------------+------------+

A DataFrame with all of the duplicates (using a left_anti join against the key combinations that occur exactly once):

df.join(df.groupBy('ID', 'ID2', 'Number')\
          .count().where('count = 1').drop('count'),
        on=['ID', 'ID2', 'Number'],
        how='left_anti').show()

# +---+---+------+------------+------------+
# | ID|ID2|Number|Opening_Hour|Closing_Hour|
# +---+---+------+------------+------------+
# |ALT|QWA|     2|    08:54:00|    23:25:00|
# |ALT|QWA|     2|    08:53:00|    23:24:00|
# |ALT|QWA|     6|    08:59:00|    23:30:00|
# |ALT|QWA|     6|    08:55:00|    23:26:00|
# +---+---+------+------------+------------+
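
If you only need the duplicated key combinations themselves rather than the full rows, a plain groupBy/count is enough. A minimal sketch:

# Just the duplicated ('ID', 'ID2', 'Number') combinations and how often each occurs
df.groupBy('ID', 'ID2', 'Number').count().where('count > 1').show()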

pgccezyw3#

To expand on pault's really great answer: I often need to subset a DataFrame to only the entries that are repeated x times, and since I need to do this very often, I turned it into a function that I import at the start of my scripts together with a lot of other helper functions:

import pyspark.sql.functions as f
from pyspark.sql import Window
def get_entries_with_frequency(df, cols, num):
  """
  This function will filter the dataframe df down to all the rows that
  have the same values in cols num times. Example: If num=3, col="cartype", 
  then the function will only return rows where a certain cartype occurs exactly 3 times
  in the dataset. If col "cartype" contains the following:
  ["Mazda", "Seat", "Seat", "VW", "Mercedes", "VW", "VW", "Mercedes", "Seat"],
  then the function will only return rows containing "VW" or "Seat" 
  since these occur exactly 3 times.

  df: Pyspark dataframe
  cols: Either string column name or list of strings for multiple columns.
  num: int - The exact number of times a value (or combination of values,
       if cols is a list) has to appear in df.
  """
  if isinstance(cols, str):
    cols = [cols]
  w = Window.partitionBy(cols)
  return df.select('*', f.count(cols[0]).over(w).alias('dupeCount'))\
           .where("dupeCount = {}".format(num))\
           .drop('dupeCount')
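
As a usage sketch, assuming df is the DataFrame from the question: each duplicated ('ID', 'ID2', 'Number') combination there occurs exactly twice, so the following call returns all four duplicated rows.

# Rows whose ('ID', 'ID2', 'Number') combination appears exactly 2 times
get_entries_with_frequency(df, ['ID', 'ID2', 'Number'], 2).show()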
