我对spark集群的配置和pyspark管道的运行非常有经验,但我只是从beam开始。因此,我尝试在spark portablerunner上对pyspark和beam python sdk进行苹果对苹果的比较(运行在同一个小型spark集群上,每个集群有4个Worker,每个Worker有4个内核和8gb ram),我已经确定了一个相当大的数据集的wordcount工作,将结果存储在Parquet表中。
因此,我下载了50gb的wikipedia文本文件,拆分成大约100个未压缩的文件,并将它们存储在目录中 /mnt/nfs_drive/wiki_files/
( /mnt/nfs_drive
是安装在所有工作机上的nfs驱动器)。
首先,我运行以下pyspark wordcount脚本:
from pyspark.sql import SparkSession, Row
from operator import add
wiki_files = '/mnt/nfs_drive/wiki_files/*'
spark = SparkSession.builder.appName("WordCountSpark").getOrCreate()
spark_counts = spark.read.text(wiki_files).rdd.map(lambda r: r['value']) \
.flatMap(lambda x: x.split(' ')) \
.map(lambda x: (x, 1)) \
.reduceByKey(add) \
.map(lambda x: Row(word=x[0], count=x[1]))
spark.createDataFrame(spark_counts).write.parquet(path='/mnt/nfs_drive/spark_output', mode='overwrite')
这个脚本运行得非常好,大约8分钟就将Parquet文件输出到所需的位置。将主阶段(读取和拆分令牌)划分为合理数量的任务,以便有效地使用集群:
我现在正试图用beam和便携式转轮来达到同样的效果。首先,我使用以下命令启动了spark作业服务器(在spark主节点上):
docker run --rm --net=host -e SPARK_EXECUTOR_MEMORY=8g apache/beam_spark_job_server:2.25.0 --spark-master-url=spark://localhost:7077
然后,在主节点和工作节点上,我运行sdk线束,如下所示:
docker run --net=host -d --rm -v /mnt/nfs_drive:/mnt/nfs_drive apache/beam_python3.6_sdk:2.25.0 --worker_pool
现在spark cluster已设置为运行束流管道,我可以提交以下脚本:
import apache_beam as beam
import pyarrow
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io import fileio
options = PipelineOptions([
"--runner=PortableRunner",
"--job_endpoint=localhost:8099",
"--environment_type=EXTERNAL",
"--environment_config=localhost:50000",
"--job_name=WordCountBeam"
])
wiki_files = '/mnt/nfs_drive/wiki_files/*'
p = beam.Pipeline(options=options)
beam_counts = (
p
| fileio.MatchFiles(wiki_files)
| beam.Map(lambda x: x.path)
| beam.io.ReadAllFromText()
| 'ExtractWords' >> beam.FlatMap(lambda x: x.split(' '))
| beam.combiners.Count.PerElement()
| beam.Map(lambda x: {'word': x[0], 'count': x[1]})
)
_ = beam_counts | 'Write' >> beam.io.WriteToParquet('/mnt/nfs_drive/beam_output',
pyarrow.schema(
[('word', pyarrow.binary()), ('count', pyarrow.int64())]
)
)
result = p.run().wait_until_finish()
代码提交成功后,我可以在spark ui上看到作业,工人正在执行它。但是,即使运行超过1小时,也不会产生任何输出!
因此,我想确保我的设置没有问题,所以我在一个较小的数据集(只有一个wiki文件)上运行了完全相同的脚本。这个过程在大约3.5分钟内成功完成(相同数据集上的spark wordcount需要16秒!)。
我想知道beam怎么会那么慢,所以我开始研究beam管道通过作业服务器提交给spark的dag。我注意到spark工作的大部分时间都花在以下阶段:
这只是分为两个任务,如下所示:
打印调试行表明,此任务是执行“繁重工作”(即从wiki文件和拆分令牌中读取行)的地方-但是,由于此操作仅在两个任务中发生,因此工作最多将分配给两个工作人员。同样有趣的是,在50gb的大数据集上运行会在相同的dag上产生相同数量的任务。
我很不确定如何进一步进行。似乎beam管道降低了并行性,但我不确定这是由于作业服务器对管道的次优转换,还是应该以其他方式指定ptransforms以增加spark上的并行性。
感谢您的建议!
1条答案
按热度按时间a11xaf1n1#
管道的文件io部分可以通过使用
apache_beam.io.textio.ReadFromText(file_pattern='/mnt/nfs_drive/wiki_files/*')
.融合是防止并行的另一个原因。解决办法是加入一个
apache_beam.transforms.util.Reshuffle
读入所有文件后。