pyspark:如何过滤maptype列上的Dataframe(就像isin()的样式一样)

t2a7ltrp  于 2021-07-13  发布在  Spark
关注(0)|答案(2)|浏览(326)

当我想以 isin() ,最好的策略是什么?
所以基本上我想得到一个Dataframe的所有行,其中maptype列的内容匹配maptype列表中的一个条目-“instances”。也可以是该列上的联接,但迄今为止我尝试的所有方法都失败了,因为 EqualTo does not support ordering on type map .
除了使用isin()或join()的直接方法外,我还想到了使用 to_json() 然后过滤json字符串,但这似乎是随机排序的键,因此字符串比较也不可靠?
有什么简单的东西我错过了吗?你建议如何解决这个问题?
示例df:

+----+---------------------------------------------------------+
|key |metric                                                   |
+----+---------------------------------------------------------+
|123k|Map(metric1 -> 1.3, metric2 -> 6.3, metric3 -> 7.6)      |
|d23d|Map(metric1 -> 1.5, metric2 -> 2.0, metric3 -> 2.2)      |
|as3d|Map(metric1 -> 2.2, metric2 -> 4.3, metric3 -> 9.0)      |
+----+---------------------------------------------------------+

筛选器(伪代码):

df.where(metric.isin([
 Map(metric1 -> 1.3, metric2 -> 6.3, metric3 -> 7.6),
 Map(metric1 -> 1.5, metric2 -> 2.0, metric3 -> 2.2)
])

期望输出:

----+---------------------------------------------------------+
|key |metric                                                   |
+----+---------------------------------------------------------+
|123k|Map(metric1 -> 1.3, metric2 -> 6.3, metric3 -> 7.6)      |
|d23d|Map(metric1 -> 1.5, metric2 -> 2.0, metric3 -> 2.2)      |
+----+---------------------------------------------------------+
piah890a

piah890a1#

比较spark中的两个map列并不是很明显。对于第一个Map中的每个键,需要检查第二个Map中的值是否相同。钥匙也一样。
使用udf可能更简单,因为在python中可以检查dict相等性:

from pyspark.sql import functions as F

map_equals = F.udf(lambda x, y: x == y, BooleanType())

# create map1 literal to filter with

map1 = F.create_map(*[
    F.lit(x) for x in chain(*{"metric1": 1.3, "metric2": 6.3, "metric3": 7.6}.items())
])

df1 = df.filter(map_equals("metric", map1))

df1.show(truncate=False)

# +----+------------------------------------------------+

# |key |metric                                          |

# +----+------------------------------------------------+

# |123k|[metric1 -> 1.3, metric2 -> 6.3, metric3 -> 7.6]|

# +----+------------------------------------------------+

另一种方法是添加要作为列进行筛选的Map文本,并检查 metric 你从文字Map中得到相同的值。
下面是一个使用 transfrom 关于Map键数组 array_min 创建筛选器表达式(如果 array_min 返回true表示所有值相等):

filter_map_literal = F.create_map(*[
    F.lit(x) for x in chain(*{"metric1": 1.3, "metric2": 6.3, "metric3": 7.6}.items())
])

df1 = df.withColumn("filter_map", filter_map_literal).filter(
    F.array_min(F.expr("""transform(map_keys(metric),
                           x -> if(filter_map[x] = metric[x], true, false)
                    )""")
                )
).drop("filter_map")
a7qyws3x

a7qyws3x2#

不是比较Map相等性的最优雅的方法:您可以收集Map键,比较两个Map中每个键的值,并确保所有值都相同。我想最好构造一个过滤器df,并进行半连接,而不是使用 isin :
样品测向和过滤器测向:

df.show(truncate=False)
+----+------------------------------------------------+
|key |metric                                          |
+----+------------------------------------------------+
|123k|[metric1 -> 1.3, metric2 -> 6.3, metric3 -> 7.6]|
|d23d|[metric1 -> 1.5, metric2 -> 2.0, metric3 -> 2.2]|
|as3d|[metric1 -> 2.2, metric2 -> 4.3, metric3 -> 9.0]|
+----+------------------------------------------------+

filter_df = df.select('metric').limit(2)
filter_df.show(truncate=False)
+------------------------------------------------+
|metric                                          |
+------------------------------------------------+
|[metric1 -> 1.3, metric2 -> 6.3, metric3 -> 7.6]|
|[metric1 -> 1.5, metric2 -> 2.0, metric3 -> 2.2]|
+------------------------------------------------+

过滤方法:

import pyspark.sql.functions as F

result = df.alias('df').join(
    filter_df.alias('filter_df'),
    F.expr("""
        aggregate(
            transform(
                concat(map_keys(df.metric), map_keys(filter_df.metric)),
                x -> filter_df.metric[x] = df.metric[x]
            ),
            true,
            (acc, x) -> acc and x
        )"""),
     'left_semi'
)

result.show(truncate=False)
+----+------------------------------------------------+
|key |metric                                          |
+----+------------------------------------------------+
|123k|[metric1 -> 1.3, metric2 -> 6.3, metric3 -> 7.6]|
|d23d|[metric1 -> 1.5, metric2 -> 2.0, metric3 -> 2.2]|
+----+------------------------------------------------+

相关问题