scala 使用EMR Spark从S3阅读大型数据集(10 TB)、对分区列执行过滤操作并写回S3的高效方式

ttisahbt  于 2023-10-18  发布在  Scala
关注(0)|答案(1)|浏览(141)

我想使用EMR Spark从S3读取一个大的(每天约10 TB)日志数据集,根据分区和非分区列进行一些过滤,并将结果写回S3。过滤后,每天剩下的数据应该少于1 TB。
数据集以这种格式进行分区:day/hour/col 1/col 2。每天每小时的数据量相对相似。这就是为什么我将数据分块,每天读取,并按小时划分查询。下面是一个示例查询:

for(date <- 1 to 30){
        var output_path = "s3a://dest-bucket/logs/day=%d/".format(date)
        spark.table("s3a://source-bucket/logs/day=%d/")
            .filter('col1.isin("A", "B", "C"))      // col1 is partitioned in source S3
            //.filter('col3.isin("X", "Y", "Z"))    --> Point 4
            .withColumn("hid", hash($"id") % 2000)  --> Point 5
            .select("col1","col2","col3","col4")
            // .repartition(col("hid"))             --> Point 5
            .repartition(col("hour"))               --> Point 1
            .write
            // .partitionBy("hid")
            .partitionBy("hour")                    --> Point 2
            .option("header","true").mode("overwrite").parquet(output_path)
}

目标是以尽可能快的速度执行查询,并尽量减少操作问题(OOM,丢失/失败的执行器等)。目前,我的查询需要10-15个小时每日志天。有时它会因为太多的执行者丢失而失败。但是,使用aws xml复制整个文件(无过滤)在单个节点上需要相同的量。因此,我希望使用大型集群加快查询速度。
为了达到这个目的,我有几个问题可以说明这个问题:
1.为了让Spark(v2.4.8)理解源代码是在列上分区的,我需要repartition(col(“hour”))语句吗?对分区数据源进行重新分区的利弊是什么?即使Spark进行了分区发现,并发现数据在多个列上进行了分区,但在知道数据在一个列(小时)上均匀分布的基础上进行重新分区是否有任何好处。
1.为了加快写入过程,我有一个partitionBy(“hour”)语句,使写入速度提高24倍。否则,Spark每次只写一个输出文件(基本上,从_temporary文件夹中移出的部分是顺序完成的)。我不明白原因。我怎样才能使执行器并行执行写操作?
1.如果我有一个书面的partitionBy,我会从repartition语句中受益吗?
1.我可以放弃非分区列col 3上的条件,如果这可以显著提高查询性能的话。
1.我的最终目标是根据用户的ID对用户进行散列分区,并使用该分区重写数据(一个名为HID的新列)。然而,重新 Shuffle 使集群失败。我在集群中有200个节点,每个节点有128 GB的内存(m5.8xlarge)。应该有足够的内存来保存数据。是否有任何方法可以优化基于新的未分区列的重新 Shuffle ?
请参阅下面的Spark配置我使用。

spark-shell  \
--master yarn  \
--conf "spark.executor.instances=1199" \
--conf "spark.default.parallelism=5995" \
--conf "spark.sql.shuffle.partitions=5995" \
--conf "spark.driver.cores=5" \
--conf "spark.driver.maxResultSize=20g" \
--conf "spark.driver.memory=18g" \
--conf "spark.driver.memoryOverhead=3g" \
--conf "spark.executor.cores=5" \
--conf "spark.executor.memory=18g" \
--conf "spark.executor.memoryOverhead=3g" \
--conf "spark.executor.heartbeatInterval=3600" \
--conf "spark.hadoop.orc.overwrite.output.file=true" \
--conf "spark.hadoop.parquet.enable.summary-metadata=false" \
--conf "spark.sql.execution.arrow.pyspark.enabled=true" \
--conf "spark.sql.execution.arrow.pyspark.fallback.enabled=true" \
--conf "spark.sql.parquet.mergeSchema=false" \
--conf "spark.sql.parquet.int96RebaseModeInRead=CORRECTED" \
--conf "spark.sql.debug.maxToStringFields=100" \
--conf "spark.hadoop.fs.s3.connection.maximum=1000" \
--conf "spark.hadoop.fs.s3.connection.timeout=300000" \
--conf "spark.hadoop.fs.s3.threads.core=250" \
--conf "spark.hadoop.fs.s3a.connection.maximum=1000" \
--conf "spark.hadoop.fs.s3a.connection.timeout=300000" \
--conf "spark.hadoop.fs.s3a.threads.core=250"\
--conf "spark.reducer.maxBlocksInFlightPerAddress=20"

任何建议都很感激。

7eumitmz

7eumitmz1#

你应该扩展外部 Shuffle 服务,增加ESS的npc线程到至少128。
也尝试增加npc客户端线程的执行器,以64至少。
否则,在shuffle期间将得到FetchFailedException
另外maxBlocksInFlightPerAddress = 20是超级小的,你通常有多少个shuffle输出块?

相关问题