如何在hdfs中设置文件的行组大小?

nzkunb0c  于 2021-06-01  发布在  Hadoop
关注(0)|答案(1)|浏览(568)

我正在hdfs中运行一些关于块大小(dfs.block.size)和行组大小(parquet.block.size)的实验。
我在hdfs中有一个很大的数据集,我想用不同的块大小和行组大小来复制数据以进行测试。我可以使用以下方法复制具有不同块大小的数据:

hdfs dfs -D dfs.block.size=67108864 -D parquet.block.size=67108864 -cp /new_sample_parquet /new_sample_parquet_64M

但只有dfs.block.size被更改。我正在和你核实 hdfs dfs -stat 对于块大小,以及 parquet-tools meta 行组大小。事实上,如果我替换 parquet.block.sizeblah.blah.blah 它有同样的效果。我甚至进了Spark壳,把火点燃了 parquet.block.size 属性手动使用

sc.hadoopConfiguration.setInt("parquet.block.size", 67108864).

我正在使用hadoop3.1.0。我得到了 parquet.block.size 从这里开始。
下面是我尝试输出的前10行

row group 1:                    RC:4140100 TS:150147503 OFFSET:4
row group 2:                    RC:3520100 TS:158294646 OFFSET:59176084
row group 3:                    RC:880100 TS:80122359 OFFSET:119985867
row group 4:                    RC:583579 TS:197303521 OFFSET:149394540
row group 5:                    RC:585594 TS:194850776 OFFSET:213638039
row group 6:                    RC:2620100 TS:130170698 OFFSET:277223867
row group 7:                    RC:2750100 TS:136761819 OFFSET:332088066
row group 8:                    RC:1790100 TS:86766854 OFFSET:389772650
row group 9:                    RC:2620100 TS:125876377 OFFSET:428147454
row group 10:                   RC:1700100 TS:83791047 OFFSET:483600973

如您所知,ts(总大小)远大于64mb(67108864字节)
我目前的理论是:
我在spark shell里做这个:

sc.hadoopConfiguration.setInt("parquet.block.size", 67108864)
val a = spark.read.parquet("my_sample_data")
a.rdd.getNumPartitions // 1034
val s = a.coalesce(27)
s.write.format("parquet").mode("Overwrite").options(Map("dfs.block.size" -> "67108864")).save("/my_new_sample_data")

所以可能是因为我的输入数据已经有1034个分区了。我真的不确定。我的数据每行大约有118列。

tzxcd3kk

tzxcd3kk1#

这个 parquet.block.size 属性只影响Parquet地板编写器。这个 hdfs dfs -cp 另一方面,命令复制文件而不考虑其内容。这个 parquet.block.size 属性因此被忽略 hdfs dfs -cp .
假设您有一个应用程序,根据配置文件以jpg或png格式截图。你用 cp 命令。当然,即使您在配置文件中更改了所需的图像格式 cp 命令将始终以原始文件的图像格式创建输出文件,而不考虑配置文件。配置文件仅由屏幕截图应用程序使用,而不是由 cp . 这就是 parquet.block.size 房地产也是如此。
更改块大小的方法是重写文件。你说过你有 spark-shell . 通过发出

sc.hadoopConfiguration.setInt("parquet.block.size", 67108864)
var df = spark.read.parquet("/path/to/input.parquet")
df.write.parquet("/path/to/output")

更新:由于您在下面的评论中提到它对您不起作用,我做了一个实验,并将会议记录发布在下面:

$ spark-shell
scala> sc.hadoopConfiguration.setInt("parquet.block.size", 200000)
scala> var df = spark.read.parquet("/tmp/infile.parquet")
df: org.apache.spark.sql.DataFrame = [field0000: binary, field0001: binary ... 78 more fields]
scala> df.write.parquet("/tmp/200K")
scala> df.write.format("parquet").mode("Overwrite").options(Map("parquet.block.size" -> "300000")).save("/tmp/300K")
scala> :quit
$ hadoop fs -copyToLocal /tmp/{200K,300K} /tmp
$ parquet-tools meta /tmp/infile.parquet | grep "row group" | head -n 3
row group 1:  RC:4291 TS:5004800 OFFSET:4
row group 2:  RC:3854 TS:4499360 OFFSET:5004804
row group 3:  RC:4293 TS:5004640 OFFSET:10000000
$ parquet-tools meta /tmp/200K/part-00000-* | grep "row group" | head -n 3
row group 1:   RC:169 TS:202080 OFFSET:4
row group 2:   RC:168 TS:201760 OFFSET:190164
row group 3:   RC:169 TS:203680 OFFSET:380324
$ parquet-tools meta /tmp/300K/part-00000-* | grep "row group" | head -n 3
row group 1:   RC:254 TS:302720 OFFSET:4
row group 2:   RC:255 TS:303280 OFFSET:284004
row group 3:   RC:263 TS:303200 OFFSET:568884

通过查看ts值,可以看到输入文件的行组大小为4.5-5m,输出文件的行组大小分别为200k和300k。这表明使用 sc.hadoopConfiguration 成为“default”,而您在下面的注解中提到的另一个方法 df.options 覆盖此默认值。
更新2:现在你已经发布了你的输出,我可以看到发生了什么。在您的例子中,压缩正在进行,增加了适合行组的数据量。行组大小适用于压缩数据,但ts显示未压缩数据的大小。但是,可以通过减去行组的起始偏移量来推断行组的大小。例如,第一行组的压缩大小为59176084-4=59176080字节或更小(因为也可以进行填充)。我将您的结果复制到计算机上的/tmp/rowgroups.dat中,并通过发出以下命令来计算您的行组大小:

$ cat /tmp/rowgroups.dat | sed 's/.*OFFSET://' | numinterval
59176080
60809783
29408673
64243499
63585828
54864199
57684584
38374804
55453519

(the) numinterval 命令位于 num-utils 如您所见,您的所有行组都小于您指定的行组大小(它们不完全符合规定尺寸的原因是parquet-1337。)

相关问题