假设我有一个只有一列text
的 Dataframe ,我想写一个udf函数,从这个 Dataframe 的每一行中提取多个文本跨度。我有以下函数:
@F.udf(returnType=T.ArrayType(T.StringType()))
def generate_text_spans(text):
spans = []
# performs some processing on text and fills spans variable
return spans
df = df.withColumn('spans', generate_text_spans(F.col('text')))
df.collect()
问题是,当我运行这段代码时,整个过程只发生在一个内核上,而我有将近100个内核专用于我的pyspark(其他所有内核都很顺利,但这一任务只在一个内核上工作)。
我真的很感谢你的任何想法。
2条答案
按热度按时间weylhg0b1#
使用重新分区
toiithl62#
显然,当你
persist()
你的数据和show()
它时,就会发生这种情况。删除这些行对我来说很有用。现在所有的内核都被使用了。