spark insert到hbase slow

vuv7lop3  于 2021-06-02  发布在  Hadoop
关注(0)|答案(3)|浏览(357)

我正在使用spark插入hbase,但速度很慢。60000张唱片需要2-3分钟。我要保存大约一千万张唱片。

object WriteToHbase extends Serializable {
    def main(args: Array[String]) {
        val csvRows: RDD[Array[String] = ...
        val dateFormatter = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")
        val usersRDD = csvRows.map(row => {
            new UserTable(row(0), row(1), row(2), row(9), row(10), row(11))
        })
        processUsers(sc: SparkContext, usersRDD, dateFormatter)
    })
}

def processUsers(sc: SparkContext, usersRDD: RDD[UserTable], dateFormatter: DateTimeFormatter): Unit = {

    usersRDD.foreachPartition(part => {
        val conf = HBaseConfiguration.create()
        val table = new HTable(conf, tablename)

        part.foreach(userRow => {
            val id = userRow.id
            val name = userRow.name
            val date1 = dateFormatter.parseDateTime(userRow.date1)
            val hRow = new Put(Bytes.toBytes(id))
            hRow.add(cf, q, Bytes.toBytes(date1))
            hRow.add(cf, q, Bytes.toBytes(name))
            ...
            table.put(hRow)
        })
        table.flushCommits()
        table.close()
    })
}

我在spark submit中使用这个:

--num-executors 2 --driver-memory 2G --executor-memory 2G --executor-cores 2
njthzxwz

njthzxwz1#

您必须考虑将传入数据分发到spark作业的方法。在当前的foreachpartition方法中,您还必须查看map、maptopair等转换。您需要评估整个dag生命周期,以及在哪里可以节省更多时间。
在实现并行性的基础上,可以调用spark的saveasnewapihadoopdataset动作,在hbase中进行更快速、更并行的写操作。比如:

JavaPairRDD<ImmutableBytesWritable, Put> yourFinalRDD = yourRDD.<SparkTransformation>{()};    
yourFinalRDD.saveAsNewAPIHadoopDataset(yourHBaseConfiguration);

注意:其中,hbaseconfiguration将是一个单例,并且将是executor节点上的单个对象,用于在任务之间共享
请让我知道,如果这个伪代码不适用于您或发现任何困难的相同。

dly7yett

dly7yett2#

它很慢,因为实现没有利用数据的接近性;服务器中的spark rdd可以传输到另一台服务器上运行的hbase regionserver。
目前还没有spark的rrd操作来高效地使用hbase数据存储。

nnsrf1az

nnsrf1az3#

htable中有一个批处理api,你可以尝试以100-500个put数据包的形式发送put请求,我认为它可以加快你的速度。它为每个操作返回单独的结果,所以您可以根据需要检查失败的put。

public void batch(List<? extends Row> actions, Object[] results)

https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/client/htable.html#batch%28java.util.list,%20java.lang.object[]%29

相关问题