正确并行pyspark文本分析应用程序

lx0bsm1f  于 2021-05-22  发布在  Spark
关注(0)|答案(1)|浏览(492)

热释光;我博士通过增加分区的数量来修复我的代码,使之等于文件的数量,但是我不明白为什么这样做,或者这是不是最好的方法。任何意见都将不胜感激。
我一直在尝试通过 SparkConf 但这通常会导致内存开销错误(超过5gb的默认分配)。解决办法是 spark.default.parallelism 到690(s3中690个单独文件的100 GB)。
难道每个执行者一次不能处理多个文件吗?
函数 process_files 由以下部分组成:
从s3下载文本文件
计算一个单词的示例
产量计数和其他元数据

def run():
    '''docstring for run'''
    conf = SparkConf() \
        .set("spark.default.parallelism", 690)
    sc = SparkContext(
        appName='spark-cc-analysis',
        conf=conf)
    sqlc = SQLContext(sparkContext=sc)

    filename = config.input_file
    pathlist = pathlist_from_csv(filename)

    rdd = sc.parallelize(pathlist)
    results = rdd.mapPartitions(process_files).collect()

    columns = ['file_name','timestamp','entity','entity_count']
    df = sqlc.createDataFrame(results,columns)
    df.show()
    output = config.output
    df.write.mode('overwrite').parquet(output)

并行度设置为100时发生的内存开销错误

WARN YarnSchedulerBackend$YarnSchedulerEndpoint: Requesting driver to remove executor 13 for 
reason Container killed by YARN for exceeding memory limits.  5.1 GB of 5.0 GB physical memory 
used. Consider boosting spark.yarn.executor.memoryOverhead or disabling yarn.nodemanager.vmem-
check-enabled because of YARN-4714.
e37o9pze

e37o9pze1#

这完全取决于文件的大小和Map的操作。这里没有人能告诉你为什么你的执行者使用了和它一样多的内存。
您将不得不通过日志分析来调试执行器的资源消耗。或者如果成本不是问题,你可以增加内存而不用担心。

相关问题