如何根据列中项目的数量对DataFrame进行分区。假设我们有一个DataFrame,其中有100人(列为first_name
和country
),我们想为一个国家的每10人创建一个分区。
如果我们的数据集包含80个来自中国的人,15个来自法国的人,5个来自古巴的人,那么我们需要为中国划分8个分区,为法国划分2个分区,为古巴划分1个分区。
以下是无法工作的代码:
df.repartition($"country")
:这将为中国创建一个分区,为法国创建一个分区,为古巴创建一个分区df.repartition(8, $"country", rand)
:这将为每个国家创建最多8个分区,因此它应该为中国创建8个分区,但法国和古巴分区是未知的。法国可以在8个分区中,古巴可以在5个分区中。有关详细信息,请参阅this answer。
以下是repartition()
文档:
当我查看repartition()
方法时,我甚至没有看到一个接受三个参数的方法,所以看起来有些行为没有被记录下来。
有没有什么方法可以动态地为每列设置分区的数量?这将使创建分区数据集变得更加容易。
2条答案
按热度按时间i2byvkas1#
由于spark对数据进行分区的方式,你将无法准确地实现这一点。Spark会接受你在repartition中指定的列,将该值散列为64 b长,然后将该值与分区数取模。这样,分区数是确定性的。它以这种方式工作的原因是,连接还需要连接左右两侧的分区数匹配以确保散列在两侧是相同的。
“我们希望为一个国家的每10个人创建一个分区。”
你到底想在这里完成什么?在一个分区中只有10行可能对性能很糟糕。你是否试图创建一个分区表,其中每个分区中的文件都保证只有x行?
“df.repartition($“country”):这将为中国创建一个分区,为法国创建一个分区,为古巴创建一个分区”
这实际上将创建一个具有默认的shuffle分区数的加密框架,该分区数按国家进行散列处理
“df.repartition(8,$“country”,兰德):这将为每个国家创建最多8个分区,因此它应该为中国创建8个分区,但法国和古巴分区是未知的。法国可以在8个分区中,古巴可以在5个分区中。有关详细信息,请参阅此答案。”
像wise这是微妙的错误。只有8个分区与国家基本上随机 Shuffle 在这8个分区。
编辑:最后一点要澄清的是, Dataframe 重分区的工作方式与写入时不同,你做了一个partitionBy(...)方法。partitionBy操作spark首先获取所有的spark分区,而不是每个spark分区。它将其切片到一个表partitionBy partition中,然后将每个partitionBy分区写入与partitionBy列对应的文件夹。
r7xajy2e2#
下面的代码将为每个数据文件(sample dataset is here)创建10行:
以下是Spark 2.2之前的代码,它将为每个数据文件创建大约10行: