scala—在ApacheSpark中,对于一些小数据集,当增加工人数量时,无法达到更好的加速

r7xajy2e  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(411)

利用spark算法对人口普查收入数据集进行随机森林分类。我的工人(w1和w2)有一个cpu(4核)。当我将spark配置为仅使用w1作为worker时,构建模型大约需要12秒。当我将spark配置为同时使用w1和w2作为worker时,再次需要12秒来构建模型。我可以看到,在运行代码时,这两个worker的cpu使用率都很高。不过,我希望达到一个较低的执行时间!。当我使用更大的数据集时,我可以达到更少的执行时间。
下面是我的代码片段:

import org.apache.spark.mllib.tree.RandomForest
import org.apache.spark.mllib.tree.model.RandomForestModel
import org.apache.spark.mllib.util.MLUtils

// Load and parse the data file.
val data = MLUtils.loadLibSVMFile(sc, "data/Census-income.libsvm")
// Split the data into training and test sets (30% held out for testing)
val splits = data.randomSplit(Array(0.7, 0.3))
val (trainingData, testData) = (splits(0), splits(1))

// Train a RandomForest model.
// Empty categoricalFeaturesInfo indicates all features are continuous.
val numClasses = 2
val categoricalFeaturesInfo = Map[Int, Int]()
val numTrees = 3 // Use more in practice.
val featureSubsetStrategy = "auto" // Let the algorithm choose.
val impurity = "gini"
val maxDepth = 4
val maxBins = 32

val model = RandomForest.trainClassifier(trainingData, numClasses, categoricalFeaturesInfo,
numTrees, featureSubsetStrategy, impurity, maxDepth, maxBins)

// Evaluate model on test instances and compute test error
val labelAndPreds = testData.map { point =>
  val prediction = model.predict(point.features)
(point.label, prediction)
}
val testErr = labelAndPreds.filter(r => r._1 != r._2).count.toDouble / testData.count()
println(s"Test Error = $testErr")

有什么问题?如有任何意见,我们将不胜感激。

tp5buhyn

tp5buhyn1#

阿姆达尔定律和冈瑟定律都适用于这里。
阿姆达尔定律本质上说,并行化带来的性能提升不能超过不可并行化的相对工作量的倒数(相对于总工作量):如果一半工作量是可并行化的,而一半工作量是不可并行化的,然后,添加无限多的worker最多可以使可并行的那一半是瞬时的,但不可并行的那一半是不变的:这样基本上速度就提高了一倍。
gunther定律反过来意味着每个添加的工人都会施加一些额外的争用和一致性延迟:添加工人只会在添加工人的收益超过这个非零性能惩罚时提高性能。
在spark作业中,有非零数量的不可并行工作(例如,为作业设置驱动程序)。可并行化的工作量受数据集大小的限制:数据集越小,不可并行化的工作量就越多。
此外,spark作业中完成的一些操作将导致争用和一致性延迟,从而进一步降低添加更多工人的好处(我对spark mllib是否执行此类操作没有经验)。

相关问题