使用pythonsdk在spark上运行apachebeam wordcount管道时并行度低

8tntrjer  于 2021-05-17  发布在  Spark
关注(0)|答案(1)|浏览(571)

我对spark集群的配置和pyspark管道的运行非常有经验,但我只是从beam开始。因此,我尝试在spark portablerunner上对pyspark和beam python sdk进行苹果对苹果的比较(运行在同一个小型spark集群上,每个集群有4个Worker,每个Worker有4个内核和8gb ram),我已经确定了一个相当大的数据集的wordcount工作,将结果存储在Parquet表中。
因此,我下载了50gb的wikipedia文本文件,拆分成大约100个未压缩的文件,并将它们存储在目录中 /mnt/nfs_drive/wiki_files/ ( /mnt/nfs_drive 是安装在所有工作机上的nfs驱动器)。
首先,我运行以下pyspark wordcount脚本:

from pyspark.sql import SparkSession, Row
from operator import add
wiki_files = '/mnt/nfs_drive/wiki_files/*'

spark = SparkSession.builder.appName("WordCountSpark").getOrCreate()

spark_counts = spark.read.text(wiki_files).rdd.map(lambda r: r['value']) \
    .flatMap(lambda x: x.split(' ')) \
    .map(lambda x: (x, 1)) \
    .reduceByKey(add) \
    .map(lambda x: Row(word=x[0], count=x[1]))

spark.createDataFrame(spark_counts).write.parquet(path='/mnt/nfs_drive/spark_output', mode='overwrite')

这个脚本运行得非常好,大约8分钟就将Parquet文件输出到所需的位置。将主阶段(读取和拆分令牌)划分为合理数量的任务,以便有效地使用集群:

我现在正试图用beam和便携式转轮来达到同样的效果。首先,我使用以下命令启动了spark作业服务器(在spark主节点上):

docker run --rm --net=host -e SPARK_EXECUTOR_MEMORY=8g apache/beam_spark_job_server:2.25.0 --spark-master-url=spark://localhost:7077

然后,在主节点和工作节点上,我运行sdk线束,如下所示:

docker run --net=host -d --rm -v /mnt/nfs_drive:/mnt/nfs_drive apache/beam_python3.6_sdk:2.25.0 --worker_pool

现在spark cluster已设置为运行束流管道,我可以提交以下脚本:

import apache_beam as beam
import pyarrow
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io import fileio

options = PipelineOptions([
    "--runner=PortableRunner",
    "--job_endpoint=localhost:8099",
    "--environment_type=EXTERNAL",
    "--environment_config=localhost:50000",
    "--job_name=WordCountBeam"
])

wiki_files = '/mnt/nfs_drive/wiki_files/*'

p = beam.Pipeline(options=options)
beam_counts = (
    p
    | fileio.MatchFiles(wiki_files)
    | beam.Map(lambda x: x.path)
    | beam.io.ReadAllFromText()
    | 'ExtractWords' >> beam.FlatMap(lambda x: x.split(' '))
    | beam.combiners.Count.PerElement()
    | beam.Map(lambda x: {'word': x[0], 'count': x[1]})
)

_ = beam_counts | 'Write' >> beam.io.WriteToParquet('/mnt/nfs_drive/beam_output',
      pyarrow.schema(
          [('word', pyarrow.binary()), ('count', pyarrow.int64())]
      )
)

result = p.run().wait_until_finish()

代码提交成功后,我可以在spark ui上看到作业,工人正在执行它。但是,即使运行超过1小时,也不会产生任何输出!
因此,我想确保我的设置没有问题,所以我在一个较小的数据集(只有一个wiki文件)上运行了完全相同的脚本。这个过程在大约3.5分钟内成功完成(相同数据集上的spark wordcount需要16秒!)。
我想知道beam怎么会那么慢,所以我开始研究beam管道通过作业服务器提交给spark的dag。我注意到spark工作的大部分时间都花在以下阶段:

这只是分为两个任务,如下所示:

打印调试行表明,此任务是执行“繁重工作”(即从wiki文件和拆分令牌中读取行)的地方-但是,由于此操作仅在两个任务中发生,因此工作最多将分配给两个工作人员。同样有趣的是,在50gb的大数据集上运行会在相同的dag上产生相同数量的任务。
我很不确定如何进一步进行。似乎beam管道降低了并行性,但我不确定这是由于作业服务器对管道的次优转换,还是应该以其他方式指定ptransforms以增加spark上的并行性。
感谢您的建议!

a11xaf1n

a11xaf1n1#

管道的文件io部分可以通过使用 apache_beam.io.textio.ReadFromText(file_pattern='/mnt/nfs_drive/wiki_files/*') .
融合是防止并行的另一个原因。解决办法是加入一个 apache_beam.transforms.util.Reshuffle 读入所有文件后。

相关问题