Iterating over a Spark DataFrame takes a huge amount of time and fails with OutOfMemoryError: GC overhead limit exceeded

yquaqz18 · posted 2021-06-08 in Hbase

I am processing 5 million records from an XML file. I load them into a Spark DataFrame and then try to load them into HBase using the DataFrame foreach method. Processing inside the foreach is extremely slow, and after a while it fails with an out-of-memory error. Can anyone suggest a solution or a better approach?
Code:

import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.client.{HBaseAdmin, HTable, Put, Result}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.security.UserGroupInformation
import org.apache.spark.sql.functions.{current_date, current_timestamp}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

val xmlSchemaXML = StructType(Array(
    StructField("A", StringType, nullable = true),
    StructField("B", StringType, nullable = true),
    StructField("C", StringType, nullable = true),
    StructField("D", StringType, nullable = true))
  )

  //Get File In DataFrame
  var dfXML = sqlContext.read.format("com.databricks.spark.xml")
    .option("rootTag", "ABC")
    .option("rowTag", "AB")
    .schema(xmlSchemaXML)
    .load("file:///home/xyz.xml")
    .withColumn("as_of_date", current_date())
    .withColumn("last_updated_date", current_timestamp())

  //Create HBase Configuration
  val hBaseConf = HBaseConfiguration.create()

  //Set HBase Configurations
  hBaseConf.set("hadoop.security.authentication", "kerberos")
  hBaseConf.set("hbase.zookeeper.quorum", cluster)
  hBaseConf.set("hbase.zookeeper.property.client.port", "2181")

  //Login Using KeyTab
  UserGroupInformation.setConfiguration(hBaseConf)
  UserGroupInformation.loginUserFromKeytab("user", "file:///tmp/keytab.keytab")

  println("Creating Connection With HBase...")

  val hBaseAdmin = new HBaseAdmin(hBaseConf)

  /***************Check if Table Already Exists or Create One***************/

  if (!hBaseAdmin.isTableAvailable("ns:table_name")) {
    println("ns:table_name does not exist...")
    val tableDescriptor = new HTableDescriptor(TableName.valueOf("ns:table_name"))
    val columnDescriptor = new HColumnDescriptor(Bytes.toBytes("cf"))
    columnDescriptor.setVersions(1, 15)

    try {
      tableDescriptor.addFamily(columnDescriptor)
      hBaseAdmin.createTable(tableDescriptor)
      println("ns:table_name created...")
    }
    catch {
      case _: Throwable => println("table creation failed...")
    }
  }

  /****************GET RECORD COUNT FROM ns:table_name****************/
  var rowKeyCount: Long = 0
  try {
    hBaseConf.set(TableInputFormat.INPUT_TABLE, "ns:table_name")
    val hBaseRDD = sc.newAPIHadoopRDD(hBaseConf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
    println("Number of Records found in ns:table_name: " + hBaseRDD.count())
    rowKeyCount = hBaseRDD.count()
    println("====================================================================================")
  }
  catch {
    case npe: NullPointerException =>
      println("Result NullPointerException: Table does not exist")
      rowKeyCount = 0
    case _: Throwable => println("table reading failed...")
  }

  println("----------------START READING DATA FROM DATAFRAME AND LOAD TO HBASE----------------")

  //Create HTable for ns:table_name
  val hTable = new HTable(hBaseConf, "ns:table_name")

  println("Total Rows in File: " + dfXML.count())

  var A = ""
  var B = ""
  var C = ""
  var D = ""
  var as_of_date = ""
  var last_updated_date = ""

  dfXML.limit(100000).collect().foreach(elem => {

    //println(elem)
    rowKeyCount = rowKeyCount + 1

    //println("0")
    if (elem.getString(0) == null)
      A = ""
    else
      A = elem.getString(0)

    //println("1")
    if (elem.getString(1) == null)
      B = ""
    else
      B = elem.getString(1)

    //println("2")
    if (elem.getString(2) == null)
      C = ""
    else
      C = elem.getString(2)

    //println("3")
    if (elem.getString(3) == null)
      D = ""
    else
      D = elem.getString(3)

    //println("4")        
    as_of_date = elem.getDate(4).toString

    //println("5")        
    last_updated_date = elem.getTimestamp(5).toString

    var put = new Put(rowKeyCount.toString.getBytes()); //Store RowKey

    put.addColumn("cf".getBytes(), "A".getBytes(), A.getBytes())
    put.addColumn("cf".getBytes(), "B".getBytes(), B.getBytes())
    put.addColumn("cf".getBytes(), "C".getBytes(), C.getBytes())
    put.addColumn("cf".getBytes(), "D".getBytes(), D.getBytes())
    put.addColumn("cf".getBytes(), "as_of_date".getBytes(), as_of_date.getBytes())
    put.addColumn("cf".getBytes(), "last_updated_date".getBytes(), last_updated_date.getBytes())

    //Commit to HBaseDB        
    hTable.put(put);
    //println(rowKeyCount + " : Record written to HBase...")

  })

  hTable.flushCommits();

yyhrrdl8 · 1#

What you need to do is increase the default 100 partitions to a number that is more sensible for your workload. Start with df.repartition(1000).foreachPartition(... and then see whether 1000 is too many or too few.
5 million records does not seem like a huge amount; most likely you have very wide records, or not enough heap allocated on the executors.
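
A minimal sketch of what that foreachPartition rewrite could look like, under a few assumptions: the HBase 1.x client API (ConnectionFactory / BufferedMutator), a UUID row key standing in for the question's running counter (a driver-side counter cannot be shared across executors), the same cluster variable as in the question, the answer's starting value of 1000 partitions, and the per-executor Kerberos login omitted.

import java.util.UUID
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.sql.Row

dfXML
  .repartition(1000)                                   // tune up or down for the workload
  .foreachPartition { partition: Iterator[Row] =>
    // One connection per partition, created on the executor rather than serialized from the driver.
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", cluster)        // same `cluster` value as in the question
    conf.set("hbase.zookeeper.property.client.port", "2181")
    val connection = ConnectionFactory.createConnection(conf)
    // BufferedMutator buffers puts and writes them to ns:table_name in batches.
    val mutator = connection.getBufferedMutator(TableName.valueOf("ns:table_name"))

    try {
      partition.foreach { row =>
        def str(i: Int) = Option(row.getString(i)).getOrElse("")
        // Placeholder row key: swap in whatever unique key the table actually needs.
        val put = new Put(Bytes.toBytes(UUID.randomUUID().toString))
        put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("A"), Bytes.toBytes(str(0)))
        put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("B"), Bytes.toBytes(str(1)))
        put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("C"), Bytes.toBytes(str(2)))
        put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("D"), Bytes.toBytes(str(3)))
        put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("as_of_date"),
          Bytes.toBytes(row.getDate(4).toString))
        put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("last_updated_date"),
          Bytes.toBytes(row.getTimestamp(5).toString))
        mutator.mutate(put)
      }
      mutator.flush()
    } finally {
      mutator.close()
      connection.close()
    }
  }

If the job still runs out of memory after replacing collect() with per-partition writes, the remaining lever from the answer is executor heap, for example a larger spark.executor.memory at submit time.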
