在pyspark中,如何有效地替换字符串Dataframe中多个regex模式的所有示例?

hwazgwia  于 2021-05-29  发布在  Hadoop
关注(0)|答案(1)|浏览(453)

我在hadoop中有一个表,它包含70亿个字符串,这些字符串本身可以包含任何内容。我需要从包含字符串的列中删除每个名称。例如字符串“john goted to the park”,我需要从中删除“john”,理想情况下只需替换为“[name]”。
在“约翰和玛丽走向市场”的情况下,输出将是“[name]和[name]走向市场”。
为了支持这一点,我有一个最常出现的20k名字的有序列表。
我可以访问hue(hive、impala)和zeppelin(spark、python和libraries)来执行这个。
我曾在db中尝试过这种方法,但由于无法更新列或迭代变量,因此无法启动,因此使用python和pyspark似乎是最好的选择,特别是考虑到计算的数量(20k个名称*7bil个输入字符串)


# nameList contains ['John','Emma',etc]

def removeNames(line, nameList):
    str_line= line[0]
    for name in nameList:
        rx = f"(^| |[[:^alpha:]])({name})( |$|[[:^alpha:]])"
        str_line = re.sub(rx,'[NAME]', str_line)
    str_line= [str_line]
    return tuple(str_line)

df = session.sql("select free_text from table")
rdd = df.rdd.map(lambda line: removeNames(line, nameList))
rdd.toDF().show()

代码正在执行,但即使我将输入文本限制为1000行(对于spark来说这算不了什么),也要花一个半小时的时间,而且在最终输出中这些行实际上没有被替换。
我想知道的是:为什么map没有实际更新rdd的行,我怎样才能使它更有效,从而在合理的时间内执行呢?
这是我第一次张贴,所以如果有必要的信息丢失,我会填写尽可能多。
谢谢您!

o7jaxewo

o7jaxewo1#

如果您对此仍感兴趣,请使用 udf (您的 removeNames 函数)spark将您的所有数据序列化到主节点,基本上取消了使用spark以分布式方式执行此操作的功能。按照评论中建议的方法,如果你同意 regexp_replace() 方法,spark将能够保持分布式节点上的所有数据,保持所有数据的分布式并提高性能。

相关问题