Filter a DataFrame map column by a list of keys in PySpark

Posted by nvbavucw on 2022-11-16 in Apache

I have a DataFrame:

d1 = [({'the town': 1, 'County Council s': 2, 'email':5},2),
      ({'Mayor': 2, 'Indiana': 2}, 4),
      ({'Congress': 2, 'Justice': 2,'country': 2, 'veterans':1},6)
]
df1 = spark.createDataFrame(d1, ['dct', 'count'])
df1.show()

ignore_lst = ['County Council s', 'emal','Indiana']
filter_lst = ['Congress','town','Mayor', 'Indiana']

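I want to write two functions: the first should keep only the keys of the dct column that are not in ignore_lst, and the second should keep only the keys that are in filter_lst.
The result should be two new columns containing dictionaries whose keys have been filtered by ignore_lst and filter_lst respectively.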

Answer #1 (mfpqipee)

These two UDFs should be enough for your case:

from pyspark.sql.functions import col, udf

d1 = [({'the town': 1, 'County Council s': 2, 'email':5},2),
      ({'Mayor': 2, 'Indiana': 2}, 4),
      ({'Congress': 2, 'Justice': 2,'country': 2, 'veterans':1},6)
]
ignore_lst = ['County Council s', 'emal','Indiana']
filter_lst = ['Congress','town','Mayor', 'Indiana']

df1 = spark.createDataFrame(d1, ['dct', 'count'])

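# No return type is declared, so each UDF result is treated as a string;
# that is why the new columns print as {k=v} in the output below.
@udf
def apply_ignore_lst(dct):
    # Keep only the entries whose key is NOT in ignore_lst.
    return {k: v for k, v in dct.items() if k not in ignore_lst}

@udf
def apply_filter_lst(dct):
    # Keep only the entries whose key IS in filter_lst.
    return {k: v for k, v in dct.items() if k in filter_lst}

# The second UDF runs on the output of the first, not on the original dct.
(df1
 .withColumn("apply_ignore_lst", apply_ignore_lst(col("dct")))
 .withColumn("apply_filter_lst", apply_filter_lst(col("apply_ignore_lst")))
 .show(truncate=False))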

+----------------------------------------------------------+-----+----------------------------------------------+----------------+
|dct                                                       |count|apply_ignore_lst                              |apply_filter_lst|
+----------------------------------------------------------+-----+----------------------------------------------+----------------+
|{the town -> 1, County Council s -> 2, email -> 5}        |2    |{the town=1, email=5}                         |{}              |
|{Indiana -> 2, Mayor -> 2}                                |4    |{Mayor=2}                                     |{Mayor=2}       |
|{Justice -> 2, Congress -> 2, country -> 2, veterans -> 1}|6    |{Congress=2, Justice=2, country=2, veterans=1}|{Congress=2}    |
+----------------------------------------------------------+-----+----------------------------------------------+----------------+
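
A note on the output above: `@udf` without a return type defaults to a string result, which is why the two new columns print as `{the town=1, email=5}` rather than as real maps. If map columns are wanted, one option is to declare a `MapType` return type. The following is only a minimal sketch under assumptions: the helper names `drop_keys`, `keep_keys`, and `add_map_columns` are made up for illustration, and `LongType` assumes the values were inferred as longs (as Python ints are).

```python
def drop_keys(dct, keys):
    """Return a copy of dct without the entries whose key is in keys."""
    if dct is None:
        return None
    return {k: v for k, v in dct.items() if k not in keys}


def keep_keys(dct, keys):
    """Return a copy of dct with only the entries whose key is in keys."""
    if dct is None:
        return None
    return {k: v for k, v in dct.items() if k in keys}


def add_map_columns(df, ignore_lst, filter_lst):
    """Hypothetical helper: add both filtered columns as real map columns."""
    # Imported here so the pure-Python helpers above can be used without Spark.
    from pyspark.sql import functions as F
    from pyspark.sql.types import LongType, MapType, StringType

    # Assumption: dct is map<string, bigint>, as inferred from str -> int dicts.
    map_type = MapType(StringType(), LongType())
    ignore_udf = F.udf(lambda d: drop_keys(d, ignore_lst), map_type)
    filter_udf = F.udf(lambda d: keep_keys(d, filter_lst), map_type)

    # Same chaining as above: the filter step runs on the ignore step's output.
    return (df
            .withColumn("apply_ignore_lst", ignore_udf(F.col("dct")))
            .withColumn("apply_filter_lst", filter_udf(F.col("apply_ignore_lst"))))
```

With the data from the question this would be called as `add_map_columns(df1, ignore_lst, filter_lst).show(truncate=False)`, and the new columns should then display in the `{key -> value}` map style instead of as strings.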

Answer #2 (to94eoyn)

This can be done in a one-liner using map_filter (available since Spark 3.1):

df1 \
  .withColumn("ignored", F.map_filter("dct", lambda k, _: ~k.isin(ignore_lst))) \
  .withColumn("filtered", F.map_filter("dct", lambda k, _: k.isin(filter_lst)))

Full example:

from pyspark.sql import functions as F

d1 = [({'the town': 1, 'County Council s': 2, 'email':5},2),
      ({'Mayor': 2, 'Indiana': 2}, 4),
      ({'Congress': 2, 'Justice': 2,'country': 2, 'veterans':1},6)
]
df1 = spark.createDataFrame(d1, ['dct', 'count'])

ignore_lst = ['County Council s', 'emal', 'Indiana']
filter_lst = ['Congress', 'town', 'Mayor', 'Indiana']

df1 = df1 \
        .withColumn("ignored", F.map_filter("dct", lambda k, _: ~k.isin(ignore_lst))) \
        .withColumn("filtered", F.map_filter("dct", lambda k, _: k.isin(filter_lst)))

df1.select("ignored", "filtered").show(truncate=False)

[Out]:
+----------------------------------------------------------+--------------------------+
|ignored                                                   |filtered                  |
+----------------------------------------------------------+--------------------------+
|{the town -> 1, email -> 5}                               |{}                        |
|{Mayor -> 2}                                              |{Indiana -> 2, Mayor -> 2}|
|{Justice -> 2, Congress -> 2, country -> 2, veterans -> 1}|{Congress -> 2}           |
+----------------------------------------------------------+--------------------------+
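
The second row of `filtered` here (`{Indiana -> 2, Mayor -> 2}`) differs from the `{Mayor=2}` in answer #1. That appears to be because this answer filters the original `dct` column, while answer #1 applies the filter to the column that has already had the ignored keys removed. A pure-Python sketch of the two orderings, with the hypothetical helper names `drop_ignored` and `keep_filtered`:

```python
ignore_lst = ['County Council s', 'emal', 'Indiana']
filter_lst = ['Congress', 'town', 'Mayor', 'Indiana']


def drop_ignored(dct):
    # Keep the entries whose key is not in ignore_lst (exact match).
    return {k: v for k, v in dct.items() if k not in ignore_lst}


def keep_filtered(dct):
    # Keep the entries whose key is in filter_lst (exact match).
    return {k: v for k, v in dct.items() if k in filter_lst}


row = {'Mayor': 2, 'Indiana': 2}

independent = keep_filtered(row)            # filter the original map
chained = keep_filtered(drop_ignored(row))  # filter the already-cleaned map

print(independent)  # {'Mayor': 2, 'Indiana': 2}
print(chained)      # {'Mayor': 2}
```

To get the chained behavior with map_filter, the second call can read from the `"ignored"` column instead of `"dct"`. Matching is exact in both answers, so `'town'` does not match the key `'the town'` and `'emal'` does not match `'email'`, which is why the first row keeps `email` and ends up with an empty `filtered` map.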
