内容
在 pySpark 中,我使用以下代码向所有节点广播一个变量:
sc = spark.sparkContext # Get context
# Extract stopwords from a file in hdfs
# The result looks like stopwords = {"and", "foo", "bar" ... }
stopwords = set([line[0] for line in csv.reader(open(SparkFiles.get("stopwords.txt"), 'r'))])
# The set of stopwords is broadcasted now
stopwords = sc.broadcast(stopwords)
在广播stopwords
之后,我想让它在mapPartitions
中可访问:
# Some dummy-dataframe
df = spark.createDataFrame([(["TESTA and TESTB"], ), (["TESTB and TESTA"], )], ["text"])
# The method which will be applied to mapPartitions
def stopwordRemoval(partition, passed_broadcast):
"""
Removes stopwords from "text"-column.
@partition: iterator-object of partition.
@passed_stopwords: Lookup-table for stopwords.
"""
# Now the broadcast is passed
passed_stopwords = passed_broadcast.value
for row in partition:
yield [" ".join((word for word in row["text"].split(" ") if word not in passed_stopwords))]
# re-partitioning in order to get mapPartitions working
df = df.repartition(2)
# Now apply the method
df = df.select("text").rdd \
.mapPartitions(lambda partition: stopwordRemoval(partition, stopwords)) \
.toDF()
# Result
df.show()
#Result:
+------------+
| text |
+------------+
|TESTA TESTB |
|TESTB TESTA |
+------------+
问题
尽管它能工作,但我不太确定这是否是广播变量的正确用法。因此,我的问题是:
1.当我以演示的方式将广播传递给mapParitions
时,是否正确执行了广播?
1.在mapParitions
中使用广播是否有用,因为无论如何,停用词都将随函数分发到所有节点(停用词从不重用)?
第二个问题与this question有关,它部分地回答了我自己的问题。这就是为什么我也选择问这个问题。
1条答案
按热度按时间g6ll5ycj1#
过了一段时间,我读到了一些额外的信息,这些信息回答了我的问题。因此,我想分享我的见解。
**问题1:**当我以演示的方式将广播传递给
mapParitions
时,广播是否正确执行?首先要注意的是,
SparkContext.broadcast()
是一个封装变量的 Package 器,可以在文档中读取。此 Package 器序列化变量并将信息添加到执行图,以在节点上分发此序列化形式。调用广播.value
-参数是在使用时再次反序列化变量的命令。此外,文档状态为:创建广播变量后,应在集群上运行的任何函数中使用该变量来代替值v,以便v [变量]不会多次发送到节点。
其次,我发现了几个来源,它们说明这可以用于
UDF
(* 用户定义函数 *),例如here。mapPartitions()
和udf()
应该被认为是类似的,因为在pySpark
的情况下,它们都将数据传递给各自节点上的Python示例。关于这一点,重要的部分是:反序列化必须是Python函数(
udf()
或任何传递给mapPartitions()
的函数)本身的一部分,这意味着它的.value
参数不能作为函数参数传递。由此可见,广播的方式做得对:braodcasted Package 函数会当做参数传递,而且变数会在
stopwordRemoval()
内还原序列化。**问题2:**在
mapParitions
中使用广播是否有用,因为停用词将随函数分发到所有节点(停用词从不重用)?它记录了只有当序列化为手头的任务产生任何价值时才有优势。
以这种方式广播的数据会以序列化的形式快取,并在执行每个工作之前还原序列化。这表示只有当多个阶段的工作需要相同的数据,或必须以还原序列化的形式快取数据时,明确建立广播变数才有用。
当您有一个大型引用要广播到群集时,可能会出现这种情况:
[...]Spark还尝试使用有效的广播算法来分发广播变量以减少通信成本。
如果这适用于您的广播,则广播具有优势。