为什么pyspark udf函数只能在一个内核上运行?

wqsoz72f  于 2022-11-01  发布在  Spark
关注(0)|答案(2)|浏览(172)

假设我有一个只有一列text的 Dataframe ,我想写一个udf函数,从这个 Dataframe 的每一行中提取多个文本跨度。我有以下函数:

@F.udf(returnType=T.ArrayType(T.StringType()))
def generate_text_spans(text):
    spans = []
    # performs some processing on text and fills spans variable
    return spans

df = df.withColumn('spans', generate_text_spans(F.col('text')))
df.collect()

问题是,当我运行这段代码时,整个过程只发生在一个内核上,而我有将近100个内核专用于我的pyspark(其他所有内核都很顺利,但这一任务只在一个内核上工作)。
我真的很感谢你的任何想法。

weylhg0b

weylhg0b1#

使用重新分区

df = df.repartition(100).withColumn('spans', generate_text_spans(F.col('text')))
toiithl6

toiithl62#

显然,当你persist()你的数据和show()它时,就会发生这种情况。删除这些行对我来说很有用。现在所有的内核都被使用了。

相关问题