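I know that in Spark we can set different pools to FAIR or FIFO and that the behavior can differ. However, in fairscheduler.xml we can also set an individual pool's scheduling mode to FAIR or FIFO, and in several tests the two settings seemed to behave identically. So I looked at the Spark source code, where the scheduling algorithm is defined like this: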
```scala
/**
 * An interface for sort algorithm
 * FIFO: FIFO algorithm between TaskSetManagers
 * FS: FS algorithm between Pools, and FIFO or FS within Pools
 */
private[spark] trait SchedulingAlgorithm {
  def comparator(s1: Schedulable, s2: Schedulable): Boolean
}

private[spark] class FIFOSchedulingAlgorithm extends SchedulingAlgorithm {
  override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
    val priority1 = s1.priority
    val priority2 = s2.priority
    var res = math.signum(priority1 - priority2)
    if (res == 0) {
      val stageId1 = s1.stageId
      val stageId2 = s2.stageId
      res = math.signum(stageId1 - stageId2)
    }
    res < 0
  }
}

private[spark] class FairSchedulingAlgorithm extends SchedulingAlgorithm {
  override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
    val minShare1 = s1.minShare
    val minShare2 = s2.minShare
    val runningTasks1 = s1.runningTasks
    val runningTasks2 = s2.runningTasks
    val s1Needy = runningTasks1 < minShare1
    val s2Needy = runningTasks2 < minShare2
    val minShareRatio1 = runningTasks1.toDouble / math.max(minShare1, 1.0)
    val minShareRatio2 = runningTasks2.toDouble / math.max(minShare2, 1.0)
    val taskToWeightRatio1 = runningTasks1.toDouble / s1.weight.toDouble
    val taskToWeightRatio2 = runningTasks2.toDouble / s2.weight.toDouble
    var compare = 0
    if (s1Needy && !s2Needy) {
      return true
    } else if (!s1Needy && s2Needy) {
      return false
    } else if (s1Needy && s2Needy) {
      compare = minShareRatio1.compareTo(minShareRatio2)
    } else {
      compare = taskToWeightRatio1.compareTo(taskToWeightRatio2)
    }
    if (compare < 0) {
      true
    } else if (compare > 0) {
      false
    } else {
      s1.name < s2.name
    }
  }
}
```
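To check what the fair comparator returns inside a single pool, here is a minimal standalone copy of its decision logic. It is a sketch, not Spark code: the hypothetical `Sched` case class stands in for Spark's `Schedulable` trait so it runs without Spark, and the minShare 0 / weight 1 values used in the checks are assumed per-task-set values chosen for illustration. Following the quoted code, two entries with identical minShare, weight and runningTasks fall through to the `s1.name < s2.name` tie-break, while an entry with fewer running tasks sorts first.

```scala
// Hypothetical stand-in for Spark's Schedulable: only the fields the comparator reads.
final case class Sched(name: String, minShare: Int, weight: Int, runningTasks: Int)

object FairComparatorSketch {
  // Same decision order as FairSchedulingAlgorithm.comparator above:
  // 1) an entry below its minShare ("needy") wins over one that is not,
  // 2) otherwise compare the relevant ratio (minShare ratio or tasks-per-weight),
  // 3) on an exact tie, compare names.
  def comparator(s1: Sched, s2: Sched): Boolean = {
    val s1Needy = s1.runningTasks < s1.minShare
    val s2Needy = s2.runningTasks < s2.minShare
    if (s1Needy && !s2Needy) true
    else if (!s1Needy && s2Needy) false
    else {
      val compare =
        if (s1Needy && s2Needy)
          java.lang.Double.compare(
            s1.runningTasks.toDouble / math.max(s1.minShare.toDouble, 1.0),
            s2.runningTasks.toDouble / math.max(s2.minShare.toDouble, 1.0))
        else
          java.lang.Double.compare(
            s1.runningTasks.toDouble / s1.weight.toDouble,
            s2.runningTasks.toDouble / s2.weight.toDouble)
      if (compare < 0) true
      else if (compare > 0) false
      else s1.name < s2.name // tie-break on name
    }
  }
}
```

With identical statistics, `comparator(Sched("a", 0, 1, 4), Sched("b", 0, 1, 4))` is `true` and the reversed call is `false`, so the ordering comes from the names; with `Sched("z", 0, 1, 1)` against `Sched("a", 0, 1, 5)`, the entry with fewer running tasks comes first regardless of name.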
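In FairSchedulingAlgorithm, if s1 and s2 come from the same pool, then minShare, runningTasks and weight should all have the same values, so the comparator would always return false. In other words, scheduling within the pool is not fair but FIFO. My fairscheduler.xml looks like this: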
```xml
<allocations>
  <pool name="default">
    <schedulingMode>FAIR</schedulingMode>
    <weight>3</weight>
    <minShare>2</minShare>
  </pool>
  <pool name="cubepublishing">
    <schedulingMode>FAIR</schedulingMode>
    <weight>1</weight>
    <minShare>0</minShare>
  </pool>
</allocations>
```
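To see how the fair comparator orders the two pools defined above, here is a small simulation sketch. It assumes that tasks are handed out one at a time to whichever pool sorts first and that no task finishes during the simulation; Spark's real task-offer loop is more involved. `PoolState`, `PoolShareSketch` and `assign` are hypothetical names, not Spark classes.

```scala
// Hypothetical stand-in for a pool's scheduling state (not Spark's Pool class).
final case class PoolState(name: String, weight: Int, minShare: Int, runningTasks: Int)

object PoolShareSketch {
  // Same decision order as FairSchedulingAlgorithm.comparator:
  // needy pools (below minShare) first, then the lower ratio, then the name.
  def fairLess(s1: PoolState, s2: PoolState): Boolean = {
    val s1Needy = s1.runningTasks < s1.minShare
    val s2Needy = s2.runningTasks < s2.minShare
    if (s1Needy && !s2Needy) true
    else if (!s1Needy && s2Needy) false
    else {
      val compare =
        if (s1Needy && s2Needy)
          java.lang.Double.compare(
            s1.runningTasks.toDouble / math.max(s1.minShare.toDouble, 1.0),
            s2.runningTasks.toDouble / math.max(s2.minShare.toDouble, 1.0))
        else
          java.lang.Double.compare(
            s1.runningTasks.toDouble / s1.weight.toDouble,
            s2.runningTasks.toDouble / s2.weight.toDouble)
      if (compare != 0) compare < 0 else s1.name < s2.name
    }
  }

  // Give `slots` tasks, one at a time, to the pool that currently sorts first.
  // Assumption: running tasks never finish during the simulation.
  def assign(initial: Vector[PoolState], slots: Int): Vector[PoolState] =
    (1 to slots).foldLeft(initial) { (pools, _) =>
      val next = pools.sortWith(fairLess).head
      pools.map(p =>
        if (p.name == next.name) p.copy(runningTasks = p.runningTasks + 1) else p)
    }
}
```

Calling `PoolShareSketch.assign` with `PoolState("default", 3, 2, 0)` and `PoolState("cubepublishing", 1, 0, 0)` for 12 slots ends with 9 tasks in `default` and 3 in `cubepublishing`: `default` first receives its minShare of 2, and after that the split follows the 3:1 weight ratio, with exact ties going to `cubepublishing` because of the name tie-break.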
And spark.scheduler.mode is set as follows:
```
# job scheduler
spark.scheduler.mode FAIR
spark.scheduler.allocation.file conf/fairscheduler.xml
```
Thanks for your help!
1 answer

pkwftd7m:
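When you submit a job to the cluster with spark-submit or by other means, it is handed to the Spark scheduler, which is responsible for materializing the job's logical plan. Spark has two scheduling modes: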
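1. FIFO: By default, Spark's scheduler runs jobs in FIFO fashion. Each job is divided into stages (e.g., map and reduce phases). The first job gets priority on all available resources while its stages have tasks to launch, then the second job gets priority, and so on. If the jobs at the head of the queue don't need the whole cluster, later jobs can start running right away; but if the jobs at the head of the queue are large, later jobs may be delayed significantly.
2. FAIR: The fair scheduler also supports grouping jobs into pools and setting different scheduling options (e.g., weight) for each pool. This is useful, for example, for creating a high-priority pool for more important jobs, or for grouping each user's jobs together and giving users equal shares regardless of how many concurrent jobs they have, instead of giving jobs equal shares. This approach is modeled after the Hadoop Fair Scheduler.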
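The FIFO ordering in item 1 corresponds to `FIFOSchedulingAlgorithm.comparator` in the question: task sets are ordered by priority first and by stage id second. The sketch below re-expresses that rule on a hypothetical `TaskSetInfo` case class, assuming as a simplification that `priority` holds the id of the job that submitted the task set, so earlier jobs sort first.

```scala
// Hypothetical stand-in for a task set: only the two fields the FIFO comparator reads.
// Assumption: `priority` is the id of the submitting job (lower = submitted earlier).
final case class TaskSetInfo(priority: Int, stageId: Int)

object FifoOrderSketch {
  // Lower priority value first; ties broken by lower stageId.
  // Same ordering as the signum-based FIFOSchedulingAlgorithm.comparator,
  // written with direct comparisons instead of subtraction.
  def fifoLess(s1: TaskSetInfo, s2: TaskSetInfo): Boolean =
    if (s1.priority != s2.priority) s1.priority < s2.priority
    else s1.stageId < s2.stageId

  // Order a queue of task sets the way the FIFO mode would pick them.
  def order(taskSets: Seq[TaskSetInfo]): Seq[TaskSetInfo] =
    taskSets.sortWith(fifoLess)
}
```

For example, ordering `TaskSetInfo(1, 5)`, `TaskSetInfo(0, 3)` and `TaskSetInfo(0, 2)` puts both task sets of job 0 (stage 2, then stage 3) ahead of job 1's, which is why a large job at the head of the queue can hold back everything behind it.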
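Without any intervention, newly submitted jobs go into the default pool, but you can set a job's pool by adding the spark.scheduler.pool "local property" to the SparkContext in the thread that submits the job, for example `sc.setLocalProperty("spark.scheduler.pool", "pool1")`.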