Apache Spark 对非确定性表达式进行重新分区

bybem2ql  于 2022-11-16  发布在  Apache
关注(0)|答案(1)|浏览(126)

我想写这样的代码:

df.repartition(42, monotonically_increasing_id() / lit(10000))

这段代码是否会因为重新分区中的非确定性表达式而破坏某些东西?我知道这段代码将变成确定性的HashPartitioning。
提醒我的是,Spark在应用RoundRobin分区之前会在内部对分区进行排序,这是由于它的不确定性。
动机:我希望我的DF被重组成更大的块,以具有一些数据同质性,以获得更好的压缩。
RangePartitioning速度太慢,并且可能存在与非确定性类似的问题。
我试着执行这段代码,它工作正常,但是我想确保它对节点故障有弹性。

ozxc1zmp

ozxc1zmp1#

是的,这段代码将变成HashPartitioning。只有当你提供分区数给修复函数,但没有任何重新分区表达式时,才会使用循环调度。
在你的情况下,我认为你应该是好的。让我们看看什么Spark是生产在其计划,对我们来说最重要的部分是在这里:
(2)项目[代码生成ID:1]输出1:[单调递增标识()AS_不确定性#64L]输入:[]
(3)交换输入1:[_nondeterministic#64L]参数:散列分区((cast(_nondeterministic#64L as double)/ 10000.0),42),重新分区次数,[id=#231]
因此,我们有两个阶段,第一个阶段是从monotonally_increasing_id获取值的项目,然后是哈希分区
假设我们的输入有10个分区,我们执行项目,然后成功交换了9个分区,但1个失败,需要重新计算。在此阶段,计算了分区1-9的数据,但对于分区10,需要再次调用monotonically_increasing_id()。
Spark 3.0 source code for this function
看起来这个函数是不确定的,因为它的结果取决于分区号。所以问题是,如果在重新计算分区号的过程中,分区号发生了变化,现在我还没有答案。但如果它没有变化(这是我的期望)你会得到相同的值,如果它在变化,你会得到不同的值,你的数据可能会有一点不同的分布,但在你的情况下仍然是可以的(数据分布应该非常相似)。

相关问题