我正在处理一个包含 500 万条记录的 XML 文件。我先把它们加载到 Spark DataFrame 中,然后尝试用 DataFrame 的 foreach 方法把它们写入 HBase。结果要么在 foreach 刚处理了很少的数据后就出现内存不足(OOM)错误,要么加载速度非常慢。有人能提出解决方案或更好的做法吗?
代码:
// Explicit schema for the XML rows: four nullable string columns A–D.
val xmlSchemaXML = StructType(
  Seq("A", "B", "C", "D").map(field => StructField(field, StringType, nullable = true))
)
//Get File In DataFrame
// spark-xml: each <AB> element under the root <ABC> becomes one row, parsed
// against the explicit schema above. The two extra columns stamp every row
// with the date/time of this load run.
// FIX: val instead of var — the frame is never reassigned in this script.
val dfXML = sqlContext.read.format("com.databricks.spark.xml")
  .option("rootTag", "ABC")
  .option("rowTag", "AB")
  .schema(xmlSchemaXML)
  .load("file:///home/xyz.xml")
  .withColumn("as_of_date", current_date())
  .withColumn("last_updated_date", current_timestamp())
//Create HBase Configuration
val hBaseConf = HBaseConfiguration.create()
//Set HBase Configurations
// Kerberos-secured cluster: authentication mode plus the ZooKeeper quorum
// (host list in `cluster`, client port 2181) that the HBase client uses to
// locate regions.
hBaseConf.set("hadoop.security.authentication", "kerberos")
hBaseConf.set("hbase.zookeeper.quorum", cluster)
hBaseConf.set("hbase.zookeeper.property.client.port", "2181")
//Login Using KeyTab
// Login must happen before the first HBase RPC; all subsequent client calls
// in this script run as this principal.
UserGroupInformation.setConfiguration(hBaseConf)
// NOTE(review): loginUserFromKeytab normally takes a plain local filesystem
// path — confirm this Hadoop version accepts a "file://" URI here.
UserGroupInformation.loginUserFromKeytab("user", "file:///tmp/keytab.keytab")
println("Creating Connection With HBase...")
// Admin handle used below for table DDL (HBaseAdmin is the pre-1.0 admin API).
val hBaseAdmin = new HBaseAdmin(hBaseConf)
/***************Check if Table Already Exists or Create One***************/
import scala.util.control.NonFatal
// Create ns:table_name with a single column family "cf" if it is missing.
if (!hBaseAdmin.isTableAvailable("ns:table_name")) {
  println("ns:table_name does not exist...")
  // FIX: the HBase client API has no TableD; HTableDescriptor takes a TableName.
  val tableDescriptor = new HTableDescriptor(TableName.valueOf("ns:table_name"))
  val columnDescriptor = new HColumnDescriptor(Bytes.toBytes("cf"))
  // Keep between 1 (min) and 15 (max) versions of each cell.
  columnDescriptor.setVersions(1, 15)
  try {
    tableDescriptor.addFamily(columnDescriptor)
    hBaseAdmin.createTable(tableDescriptor)
    println("ns:table_name created...")
  }
  catch {
    // FIX: NonFatal instead of a bare Throwable wildcard — don't swallow
    // OOM/interrupts, and report the actual cause instead of discarding it.
    case NonFatal(e) => println("table creation failed... " + e)
  }
}
/****************GET RECORD COUNT FROM ns:table_name****************/
import scala.util.control.NonFatal
// Seed the row-key counter with the number of rows already in the table so
// new keys continue after the existing ones.
var rowKeyCount: Long = 0
try {
  hBaseConf.set(TableInputFormat.INPUT_TABLE, "ns:table_name")
  val hBaseRDD = sc.newAPIHadoopRDD(hBaseConf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
  // FIX: count() triggers a full table scan — the original computed it twice
  // (once for the println, once for the assignment). Compute once and reuse.
  rowKeyCount = hBaseRDD.count()
  println("Number of Records found in ns:table_name: " + rowKeyCount)
  println("====================================================================================")
}
catch {
  // FIX: the specific case must come first — with the Throwable wildcard
  // listed first, the NullPointerException branch was unreachable dead code.
  case npe: NullPointerException =>
    println("Result NullPointerException: Table does not exist")
    rowKeyCount = 0
  case NonFatal(e) => println("table reading failed... " + e)
}
println("----------------START READING DATA FROM DATAFRAME AND LOAD TO HBASE----------------")
//Create HTable for ns:table_name
val hTable = new HTable(hBaseConf, "ns:table_name")
// FIX: buffer puts client-side and ship them in bulk at flushCommits()
// instead of one blocking RPC per row.
hTable.setAutoFlush(false)
println("Total Rows in File: " + dfXML.count())

// HBase Put rejects null cell values, so map a missing column to "".
def orEmpty(s: String): String = if (s == null) "" else s

val cf = Bytes.toBytes("cf")
// FIX: the original collect() materialized every row on the driver at once —
// that is what drives the driver out of memory on large inputs.
// rdd.toLocalIterator streams one partition at a time to the driver instead.
// NOTE(review): the original capped the load at 100,000 rows; kept as-is —
// confirm whether the cap was a debugging leftover before removing it.
dfXML.limit(100000).rdd.toLocalIterator.foreach { elem =>
  rowKeyCount = rowKeyCount + 1
  // Sequential row key continuing after the rows already in the table.
  val put = new Put(Bytes.toBytes(rowKeyCount.toString))
  put.addColumn(cf, Bytes.toBytes("A"), Bytes.toBytes(orEmpty(elem.getString(0))))
  put.addColumn(cf, Bytes.toBytes("B"), Bytes.toBytes(orEmpty(elem.getString(1))))
  put.addColumn(cf, Bytes.toBytes("C"), Bytes.toBytes(orEmpty(elem.getString(2))))
  put.addColumn(cf, Bytes.toBytes("D"), Bytes.toBytes(orEmpty(elem.getString(3))))
  // as_of_date / last_updated_date come from current_date()/current_timestamp()
  // added at load time, so they are never null here.
  put.addColumn(cf, Bytes.toBytes("as_of_date"), Bytes.toBytes(elem.getDate(4).toString))
  put.addColumn(cf, Bytes.toBytes("last_updated_date"), Bytes.toBytes(elem.getTimestamp(5).toString))
  //Commit to HBaseDB (buffered until flushCommits below)
  hTable.put(put)
}
hTable.flushCommits();
1条答案
按热度按时间yyhrrdl81#
您需要做的是把默认的分区数(通常约 100 个)调整为更适合您工作负载的数量。可以先从下面的写法开始尝试:
df.repartition(1000).foreachPartition(...)
然后观察 1000 个分区是太多还是太少。500 万条记录并不算很大的数据量;更可能的原因是单条记录非常大,或者没有给执行器(executor)分配足够的堆内存。