Spark按列重新分区,每列的分区数动态变化

3lxsmp7m  于 2023-10-23  发布在  Apache
关注(0)|答案(2)|浏览(206)

如何根据列中项目的数量对DataFrame进行分区。假设我们有一个DataFrame,其中有100人(列为first_namecountry),我们想为一个国家的每10人创建一个分区。
如果我们的数据集包含80个来自中国的人,15个来自法国的人,5个来自古巴的人,那么我们需要为中国划分8个分区,为法国划分2个分区,为古巴划分1个分区。
以下是无法工作的代码:

  • df.repartition($"country"):这将为中国创建一个分区,为法国创建一个分区,为古巴创建一个分区
  • df.repartition(8, $"country", rand):这将为每个国家创建最多8个分区,因此它应该为中国创建8个分区,但法国和古巴分区是未知的。法国可以在8个分区中,古巴可以在5个分区中。有关详细信息,请参阅this answer

以下是repartition()文档:

当我查看repartition()方法时,我甚至没有看到一个接受三个参数的方法,所以看起来有些行为没有被记录下来。
有没有什么方法可以动态地为每列设置分区的数量?这将使创建分区数据集变得更加容易。

i2byvkas

i2byvkas1#

由于spark对数据进行分区的方式,你将无法准确地实现这一点。Spark会接受你在repartition中指定的列,将该值散列为64 b长,然后将该值与分区数取模。这样,分区数是确定性的。它以这种方式工作的原因是,连接还需要连接左右两侧的分区数匹配以确保散列在两侧是相同的。
“我们希望为一个国家的每10个人创建一个分区。”
你到底想在这里完成什么?在一个分区中只有10行可能对性能很糟糕。你是否试图创建一个分区表,其中每个分区中的文件都保证只有x行?
“df.repartition($“country”):这将为中国创建一个分区,为法国创建一个分区,为古巴创建一个分区”
这实际上将创建一个具有默认的shuffle分区数的加密框架,该分区数按国家进行散列处理

def repartition(partitionExprs: Column*): Dataset[T] = {
    repartition(sparkSession.sessionState.conf.numShufflePartitions, partitionExprs: _*)
  }

“df.repartition(8,$“country”,兰德):这将为每个国家创建最多8个分区,因此它应该为中国创建8个分区,但法国和古巴分区是未知的。法国可以在8个分区中,古巴可以在5个分区中。有关详细信息,请参阅此答案。”
像wise这是微妙的错误。只有8个分区与国家基本上随机 Shuffle 在这8个分区。
编辑:最后一点要澄清的是, Dataframe 重分区的工作方式与写入时不同,你做了一个partitionBy(...)方法。partitionBy操作spark首先获取所有的spark分区,而不是每个spark分区。它将其切片到一个表partitionBy partition中,然后将每个partitionBy分区写入与partitionBy列对应的文件夹。

r7xajy2e

r7xajy2e2#

下面的代码将为每个数据文件(sample dataset is here)创建10行:

val outputPath = new java.io.File("./tmp/partitioned_lake5/").getCanonicalPath
df
  .repartition(col("person_country"))
  .write
  .option("maxRecordsPerFile", 10)
  .partitionBy("person_country")
  .csv(outputPath)

以下是Spark 2.2之前的代码,它将为每个数据文件创建大约10行:

val desiredRowsPerPartition = 10

val joinedDF = df
  .join(countDF, Seq("person_country"))
  .withColumn(
    "my_secret_partition_key",
    (rand(10) * col("count") / desiredRowsPerPartition).cast(IntegerType)
  )

val outputPath = new java.io.File("./tmp/partitioned_lake6/").getCanonicalPath
joinedDF
  .repartition(col("person_country"), col("my_secret_partition_key"))
  .drop("count", "my_secret_partition_key")
  .write
  .partitionBy("person_country")
  .csv(outputPath)

相关问题