我正在使用S3中的一个高度嵌套的不可拆分的JSON格式输入数据集。文件的大小可以有很大的不同-最小为10 KB,而其他为300 MB。
当使用下面的代码阅读文件时,并且仅仅进行简单的重新分区以获得所需数量的分区会导致任务分散-大多数任务在几秒钟内完成,但有一个任务会持续几个小时,然后遇到内存问题(心跳丢失/堆空间等)我重新分区是为了随机化分区到文件的Map,因为spark可能按顺序阅读文件,而同一目录中的文件往往具有相同的性质-均为大/均为小等。
df = spark.read.json('s3://my/parent/directory')
df.repartition(396)
# Settings (few):
default parallelism = 396
total number of cores = 400
我尝试的:
1.我认为输入分区(s3分区而不是spark)方案(文件夹层次结构)可能会导致这种分区倾斜的问题,其中一些s3文件夹(技术上的“前缀”)只有一个文件,而其他文件夹有数千个,所以我使用hashcode将输入转换为扁平的目录结构,其中每个文件夹只有一个文件:
之前:
/parent1
/file1
/file2
.
.
/file1000
/parent2/
/file1
现在:
hashcode=FEFRE#$#$$#FE/parent1/file1
hashcode=#$#$#Cdvfvf@#/parent1/file1
但是没有任何效果。
1.我也尝试过使用非常大的集群--认为即使存在输入偏差--那么多的内存应该能够处理更大的文件,但我仍然陷入了零散的任务中。
当我检查文件数量时(每个文件由于其嵌套-不可拆分的性质而成为 Dataframe 中的一行)分配给每个分区-我看到分配的文件数量在2到32之间。这是因为Spark基于spark.sql.files.maxPartitionBytes
在分区中拾取文件-并且可能它只分配两个文件大小巨大的文件,而当文件大小较小时,单个分区中的文件会多得多?
任何建议,使工作正常工作,并分配任务均匀-给定大小的输入文件是不能改变的东西,由于输入文件的性质。
1条答案
按热度按时间66bbxpm51#
伟大的工作扁平化的文件,以提高读取速度。前缀,因为你似乎明白是有关桶和桶读取速度是有关的文件数量下的每个前缀和他们的大小。你采取的方法将阅读速度比你原来的战略。它不会帮助你的数据本身的歪斜。
您可以考虑的一件事是,原始数据和工作数据不需要是同一组文件,而是有一种策略来获取数据,然后对数据进行预处理以提高性能。
也就是说,保持原始数据的现有格式,然后以更方便的格式制作数据副本,以便进行监管查询。(Parquet是使用S3的最佳选择)。
1.土地数据a“着陆区”
1.根据需要处理存储在登录区中的数据,将其转换为便于查询的可拆分格式。(“预处理文件夹”)
1.处理原始数据后,将其移至“已处理文件夹”。(使用现有的平面文件夹结构。)如果需要重新构建表或更改表格式,则此处理表非常重要。
1.创建一个
view
,它是“着陆区”和“预处理”文件夹中数据的并集。这将为您提供一个包含最新数据的性能表。S3被设计成一个长期廉价的存储器。它不是为了快速运行而设计的,但是他们一直在努力使它变得更好。