在编写之前,我曾尝试使用spark-aque动态合并shuffle分区。默认情况下,spark会创建太多的小文件。然而,aqe特性声称启用它将优化这一点,并将小文件合并到更大的文件中。这对于像我这样的awss3用户来说非常重要,因为当以后尝试读取小文件时,小文件太多会导致网络拥塞。
以下是我的spark配置:
[('spark.executor.extraJavaOptions', '-XX:+UseG1GC'),
('spark.executor.id', 'driver'),
('spark.driver.extraJavaOptions', '-XX:+UseG1GC'),
('spark.driver.memory', '16g'),
('spark.sql.adaptive.enabled', 'true'),
('spark.app.name', 'pyspark-shell'),
('spark.sql.adaptive.coalescePartitions.minPartitionNum', '5'),
('spark.app.startTime', '1614929855179'),
('spark.sql.adaptive.coalescePartitions.enabled', 'true'),
('spark.driver.port', '34447'),
('spark.executor.memory', '16g'),
('spark.driver.host', '2b7345ffcf3e'),
('spark.rdd.compress', 'true'),
('spark.serializer.objectStreamReset', '100'),
('spark.master', 'local[*]'),
('spark.submit.pyFiles', ''),
('spark.submit.deployMode', 'client'),
('spark.app.id', 'local-1614929856024'),
('spark.ui.showConsoleProgress', 'true')]
aqe所需的参数都已启用,我还看到 AdaptiveSparkPlan isFinalPlan=true
在执行计划中。当我运行一个小任务(读取一个csv,做一些计算,做一个连接操作并写入parquet)时,它仍然在parquet文件夹中生成太多的小文件。是我遗漏了什么,还是这个功能没有做到它所承诺的?
暂无答案!
目前还没有任何答案,快来回答吧!