def getTotalSize(rdd: RDD[Row]): Long = {
// This can be a parameter
val NO_OF_SAMPLE_ROWS = 10l;
val totalRows = rdd.count();
var totalSize = 0l
if (totalRows > NO_OF_SAMPLE_ROWS) {
val sampleRDD = rdd.sample(true, NO_OF_SAMPLE_ROWS)
val sampleRDDSize = getRDDSize(sampleRDD)
totalSize = sampleRDDSize.*(totalRows)./(NO_OF_SAMPLE_ROWS)
} else {
// As the RDD is smaller than sample rows count, we can just calculate the total RDD size
totalSize = getRDDSize(rdd)
}
totalSize
}
def getRDDSize(rdd: RDD[Row]) : Long = {
var rddSize = 0l
val rows = rdd.collect()
for (i <- 0 until rows.length) {
rddSize += SizeEstimator.estimate(rows.apply(i).toSeq.map { value => value.asInstanceOf[AnyRef] })
}
rddSize
}
def calcRDDSize(rdd: RDD[Row]): Long = {
rdd.map(_.mkString(",").getBytes("UTF-8").length.toLong)
.reduce(_+_) //add the sizes together
}
def estimateRDDSize( rdd: RDD[Row], fraction: Double ) : Long = {
val sampleRDD = rdd.sample(true,fraction)
val sampleRDDsize = calcRDDSize(sampleRDD)
println(s"sampleRDDsize is ${sampleRDDsize/(1024*1024)} MB")
val sampleAvgRowSize = sampleRDDsize / sampleRDD.count()
println(s"sampleAvgRowSize is $sampleAvgRowSize")
val totalRows = rdd.count()
println(s"totalRows is $totalRows")
val estimatedTotalSize = totalRows * sampleAvgRowSize
val formatter = java.text.NumberFormat.getIntegerInstance
val estimateInMB = formatter.format(estimatedTotalSize/(1024*1024))
println(s"estimatedTotalSize is ${estimateInMB} MB")
return estimatedTotalSize
}
// estimate using 15% of data
val size = estimateRDDSize(df.rdd,0.15)
5条答案
按热度按时间sshcrbum1#
正如justin和wang所提到的,要得到rdd的大小并不是一件简单的事情。我们可以做个估计。
我们可以采样一个rdd,然后使用sizeestimator得到样本的大小。正如wang和justin所提到的,基于离线采样的大小数据,比如说,x行离线使用ygb,z行在运行时可能会使用z*y/xgb
下面是示例scala代码,用于获取rdd的大小/估计值。
我对scala和spark还不熟悉。下面的示例可以用更好的方式编写
1yjd4xko2#
如果您实际使用集群上的大数据,那么可以使用这个版本——即,它消除了collect。
q7solyqu3#
一种直接的方法是调用以下命令,这取决于您是否希望以序列化形式存储数据,然后转到spark ui的“存储”页面,您应该能够计算出rdd(内存+磁盘)的总大小:
在运行时计算准确的内存大小并不容易。不过,您可以尝试在运行时进行估计:基于脱机采样的大小数据,例如,x行脱机使用y gb,运行时z行可能会使用z*y/x gb;这与贾斯汀先前提出的类似。
希望这能有所帮助。
myss37ts4#
我认为rdd.count()会给出rdd中元素的数量
pdtvr36n5#
这将取决于诸如序列化之类的因素,因此它不是简单的。但是,您可以获取一个样本集,并对该样本数据运行一些实验,从中推断。