Apache Spark 广播变量和Map分区

noj0wjuj  于 2022-11-30  发布在  Apache
关注(0)|答案(1)|浏览(154)

内容

pySpark 中,我使用以下代码向所有节点广播一个变量:

sc = spark.sparkContext # Get context

# Extract stopwords from a file in hdfs
# The result looks like stopwords = {"and", "foo", "bar" ... }
stopwords = set([line[0] for line in csv.reader(open(SparkFiles.get("stopwords.txt"), 'r'))])

# The set of stopwords is broadcasted now
stopwords = sc.broadcast(stopwords)

在广播stopwords之后,我想让它在mapPartitions中可访问:

# Some dummy-dataframe
df = spark.createDataFrame([(["TESTA and TESTB"], ), (["TESTB and TESTA"], )], ["text"])

# The method which will be applied to mapPartitions
def stopwordRemoval(partition, passed_broadcast):
    """
    Removes stopwords from "text"-column.

    @partition: iterator-object of partition.
    @passed_stopwords: Lookup-table for stopwords.
    """

    # Now the broadcast is passed
    passed_stopwords = passed_broadcast.value

    for row in partition:
        yield [" ".join((word for word in row["text"].split(" ") if word not in passed_stopwords))]

# re-partitioning in order to get mapPartitions working
df = df.repartition(2)

# Now apply the method
df = df.select("text").rdd \
        .mapPartitions(lambda partition: stopwordRemoval(partition, stopwords)) \
        .toDF()

# Result
df.show()

#Result:
+------------+
| text       |
+------------+
|TESTA TESTB |
|TESTB TESTA |
+------------+

问题

尽管它能工作,但我不太确定这是否是广播变量的正确用法。因此,我的问题是:
1.当我以演示的方式将广播传递给mapParitions时,是否正确执行了广播?
1.在mapParitions中使用广播是否有用,因为无论如何,停用词都将随函数分发到所有节点(停用词从不重用)?
第二个问题与this question有关,它部分地回答了我自己的问题。这就是为什么我也选择问这个问题。

g6ll5ycj

g6ll5ycj1#

过了一段时间,我读到了一些额外的信息,这些信息回答了我的问题。因此,我想分享我的见解。

**问题1:**当我以演示的方式将广播传递给mapParitions时,广播是否正确执行?

首先要注意的是,SparkContext.broadcast()是一个封装变量的 Package 器,可以在文档中读取。此 Package 器序列化变量并将信息添加到执行图,以在节点上分发此序列化形式。调用广播.value-参数是在使用时再次反序列化变量的命令。此外,文档状态为:
创建广播变量后,应在集群上运行的任何函数中使用该变量来代替值v,以便v [变量]不会多次发送到节点。
其次,我发现了几个来源,它们说明这可以用于UDF(* 用户定义函数 *),例如heremapPartitions()udf()应该被认为是类似的,因为在pySpark的情况下,它们都将数据传递给各自节点上的Python示例。
关于这一点,重要的部分是:反序列化必须是Python函数(udf()或任何传递给mapPartitions()的函数)本身的一部分,这意味着它的.value参数不能作为函数参数传递。
由此可见,广播的方式做得对:braodcasted Package 函数会当做参数传递,而且变数会在stopwordRemoval()内还原序列化。

**问题2:**mapParitions中使用广播是否有用,因为停用词将随函数分发到所有节点(停用词从不重用)?

它记录了只有当序列化为手头的任务产生任何价值时才有优势。
以这种方式广播的数据会以序列化的形式快取,并在执行每个工作之前还原序列化。这表示只有当多个阶段的工作需要相同的数据,或必须以还原序列化的形式快取数据时,明确建立广播变数才有用。
当您有一个大型引用要广播到群集时,可能会出现这种情况:
[...]Spark还尝试使用有效的广播算法来分发广播变量以减少通信成本。
如果这适用于您的广播,则广播具有优势。

相关问题