spark:repartitionbyrange创建多个文件

brvekthn  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(1032)

我写的数据Parquet文件如下-

df.repartitionByRange($"key", rand)
  .write
  .option("maxRecordsPerFile", 5000)
  .partitionBy("key")
  .parquet("somelocation")

我使用了一个字符串列(键)来划分城市,因为我有更多的过滤器基于此。
即使在指定 maxRecordsPerFile ,在一个分区文件夹中创建了多个小文件(数十条或数百条记录)。

2exbekwf

2exbekwf1#

好的,下面的用例可以帮助你解决问题,
术语:
1maxrecordsperfile—限制每个文件写入的最大记录数。
2按范围重新分区(10,$“id”)
repartitionbyrange(numpartitions:int,partitionexprs:column*)
它将通过将分区表达式拆分为分区表达式/numpartitions equal records splits来创建numpartitions。
在将数据写入磁盘时改进压缩,
基于内存分区
为了将数据正确地写入磁盘,您几乎总是需要首先将数据重新分区到内存中。
三。partitionedby(“您想写的目录”)
方法,指定是否应将数据写入文件夹中的磁盘。默认情况下,spark不会将数据写入嵌套文件夹中的磁盘。
磁盘级分区
案例1: input rows - 1000 , repartition-10 ,maxrecordsperfile=inputrows/repartitioncount。 1000/10=100 . 导致 10 part-xx 记录数相等的文件( 100 records in each file )在磁盘级分区目录中( partition=1 )

import org.apache.spark.sql.functions.{col, lit, when}
val df=spark.range(1000)
val df1=df.withColumn("partitioncol",lit("1"))

df1.repartitionByRange(10, $"id").write.option("maxRecordsPerFile", 100).partitionBy("partitioncol").parquet("/FileStore/import-stage/all4")

案例2: input rows - 1000 , repartition-10 ,maxrecordsperfile>输入/重新分区计数。 1000 . 再次导致 10 part-xx files 记录数相等( 100 records in each file )在磁盘级分区目录中( partition=1 )

import org.apache.spark.sql.functions.{col, lit, when}
val df=spark.range(1000)
val df1=df.withColumn("partitioncol",lit("1"))

df1.repartitionByRange(10, $"id").write.option("maxRecordsPerFile", 1000).partitionBy("partitioncol").parquet("/FileStore/import-stage/all4")

案例3: input rows - 1000 , repartition-10 ,maxrecordsperfile<inputrows/repartitioncount,示例= 10 . 导致 100 part-xx files 记录数相等( 10 records in each file )在磁盘级分区目录中( partition=1 )

import org.apache.spark.sql.functions.{col, lit, when}
val df=spark.range(1000)
val df1=df.withColumn("partitioncol",lit("1"))

df1.repartitionByRange(10, $"id").write.option("maxRecordsPerFile", 10).partitionBy("partitioncol").parquet("/FileStore/import-stage/all4")

相关问题