如何找到rdd的大小

eblbsuwk  于 2021-05-31  发布在  Hadoop
关注(0)|答案(5)|浏览(442)

我有 RDD[Row] ,需要将其持久化到第三方存储库。但是这个第三方存储库在一次调用中最多可以接受5 mb。
所以我想根据rdd中数据的大小而不是rdd中的行数来创建分区。
我怎样才能找到一个 RDD 并在此基础上创建分区?

sshcrbum

sshcrbum1#

正如justin和wang所提到的,要得到rdd的大小并不是一件简单的事情。我们可以做个估计。
我们可以采样一个rdd,然后使用sizeestimator得到样本的大小。正如wang和justin所提到的,基于离线采样的大小数据,比如说,x行离线使用ygb,z行在运行时可能会使用z*y/xgb
下面是示例scala代码,用于获取rdd的大小/估计值。
我对scala和spark还不熟悉。下面的示例可以用更好的方式编写

def getTotalSize(rdd: RDD[Row]): Long = {
  // This can be a parameter
  val NO_OF_SAMPLE_ROWS = 10l;
  val totalRows = rdd.count();
  var totalSize = 0l
  if (totalRows > NO_OF_SAMPLE_ROWS) {
    val sampleRDD = rdd.sample(true, NO_OF_SAMPLE_ROWS)
    val sampleRDDSize = getRDDSize(sampleRDD)
    totalSize = sampleRDDSize.*(totalRows)./(NO_OF_SAMPLE_ROWS)
  } else {
    // As the RDD is smaller than sample rows count, we can just calculate the total RDD size
    totalSize = getRDDSize(rdd)
  }

  totalSize
}

def getRDDSize(rdd: RDD[Row]) : Long = {
    var rddSize = 0l
    val rows = rdd.collect()
    for (i <- 0 until rows.length) {
       rddSize += SizeEstimator.estimate(rows.apply(i).toSeq.map { value => value.asInstanceOf[AnyRef] })
    }

    rddSize
}
1yjd4xko

1yjd4xko2#

如果您实际使用集群上的大数据,那么可以使用这个版本——即,它消除了collect。

def calcRDDSize(rdd: RDD[Row]): Long = {
  rdd.map(_.mkString(",").getBytes("UTF-8").length.toLong)
     .reduce(_+_) //add the sizes together
}

def estimateRDDSize( rdd: RDD[Row], fraction: Double ) : Long = {
  val sampleRDD = rdd.sample(true,fraction)
  val sampleRDDsize = calcRDDSize(sampleRDD)
  println(s"sampleRDDsize is ${sampleRDDsize/(1024*1024)} MB")

  val sampleAvgRowSize = sampleRDDsize / sampleRDD.count()
  println(s"sampleAvgRowSize is $sampleAvgRowSize")

  val totalRows = rdd.count()
  println(s"totalRows is $totalRows")

  val estimatedTotalSize = totalRows * sampleAvgRowSize
  val formatter = java.text.NumberFormat.getIntegerInstance
  val estimateInMB = formatter.format(estimatedTotalSize/(1024*1024))
  println(s"estimatedTotalSize is ${estimateInMB} MB")

  return estimatedTotalSize
}

// estimate using 15% of data
val size = estimateRDDSize(df.rdd,0.15)
q7solyqu

q7solyqu3#

一种直接的方法是调用以下命令,这取决于您是否希望以序列化形式存储数据,然后转到spark ui的“存储”页面,您应该能够计算出rdd(内存+磁盘)的总大小:

rdd.persist(StorageLevel.MEMORY_AND_DISK)

or

rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)

在运行时计算准确的内存大小并不容易。不过,您可以尝试在运行时进行估计:基于脱机采样的大小数据,例如,x行脱机使用y gb,运行时z行可能会使用z*y/x gb;这与贾斯汀先前提出的类似。
希望这能有所帮助。

myss37ts

myss37ts4#

我认为rdd.count()会给出rdd中元素的数量

pdtvr36n

pdtvr36n5#

这将取决于诸如序列化之类的因素,因此它不是简单的。但是,您可以获取一个样本集,并对该样本数据运行一些实验,从中推断。

相关问题