How can I make each executor read from and write to HBase?

ymdaylpp · posted 2021-06-09 in Hbase

I tried to broadcast the connection, but I don't know how to solve the serialization problem.

val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)

messages.map(_._2).filter(_.length > 0).foreachRDD(rdd => {
  val hbaseConf = HBaseConfiguration.create
  hbaseConf.set("hbase.rootdir", "hdfs://xxx.xxx.xxx.xxx:9000/hbase")
  hbaseConf.set("hbase.zookeeper.quorum", "Master,slave1,slave2")
  val connection = ConnectionFactory.createConnection(hbaseConf)

  // This is where the serialization problem comes from: org.apache.hadoop.hbase.client.Connection
  // is not serializable, so it cannot be broadcast to the executors
  val hbaseBr = ssc.sparkContext.broadcast(connection)
  rdd.foreach(x => {
    DataHandlingUtil.dataHandle(x, nameMap, dictBroadcast, platformMapBr, hbaseBr.value)
  })
})

ssc.start()
ssc.awaitTermination()

yhuiod9q · answer 1

You should use the following pattern so that each executor creates its own connection:

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    val connection = createNewConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    connection.close()
  }
}

A better version:

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // ConnectionPool is a static, lazily initialized pool of connections
    val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    ConnectionPool.returnConnection(connection)  // return to the pool for future reuse
  }
}

Note: the code above is copied from the Spark Streaming programming guide: https://spark.apache.org/docs/1.2.0/streaming-programming-guide.html
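
Applied to the question's HBase setup, the per-partition pattern might look something like the sketch below. This is only an illustration: the table name "my_table", column family "cf", and the row key and column derived from each record are placeholders, not values from the original code, and the real record handling would go where DataHandlingUtil.dataHandle is called in the question.

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
import org.apache.hadoop.hbase.util.Bytes

messages.map(_._2).filter(_.length > 0).foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // The connection is created on the executor, inside the partition,
    // so nothing needs to be serialized or broadcast from the driver.
    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set("hbase.zookeeper.quorum", "Master,slave1,slave2")
    val connection = ConnectionFactory.createConnection(hbaseConf)
    val table = connection.getTable(TableName.valueOf("my_table")) // placeholder table name
    try {
      partitionOfRecords.foreach { record =>
        val put = new Put(Bytes.toBytes(record.hashCode.toString)) // placeholder row key
        put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("payload"), Bytes.toBytes(record))
        table.put(put)
      }
    } finally {
      table.close()
      connection.close()
    }
  }
}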
Another option is to use HBaseContext, which has built-in bulkGet, bulkPut, and bulkDelete methods.
Here is sample code:

import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants}
import org.apache.hadoop.hbase.spark.HBaseContext
import org.apache.spark.{SparkConf, SparkContext}
val sc = new SparkContext(new SparkConf())
val hbaseConf = HBaseConfiguration.create()
hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, "hbase_URL")
hbaseConf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, 2181)
implicit val hbaseC = new HBaseContext(sc, hbaseConf)
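
As a rough sketch of what writing with it can look like (not part of the original answer): in the Apache hbase-spark module, where HBaseContext lives in org.apache.hadoop.hbase.spark, bulkPut takes an RDD, a TableName, and a function that turns each element into a Put. The table "my_table", column family "cf", qualifier "q", and the sample data below are placeholders.

import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes

// Hypothetical RDD of (rowKey, value) pairs built from the SparkContext above
val data = sc.parallelize(Seq(("row1", "v1"), ("row2", "v2")))

hbaseC.bulkPut[(String, String)](
  data,
  TableName.valueOf("my_table"),
  { case (rowKey, value) =>
    new Put(Bytes.toBytes(rowKey))
      .addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes(value))
  })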

A word about HBaseContext: the root of all Spark and HBase integration is the HBaseContext. The HBaseContext takes in HBase configurations and pushes them to the Spark executors. This allows us to have an HBase connection per Spark executor in a static location.
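
To tie this back to the original streaming job: the same module also has a streaming variant, streamBulkPut, which applies this per-executor connection handling directly to a DStream. The sketch below is only an assumption about how the question's messages stream could be written that way; it assumes the HBaseContext is built from ssc.sparkContext rather than a fresh SparkContext, "my_table", "cf", and the row key are again placeholders, and the exact signature can differ between hbase-spark versions.

import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes

hbaseC.streamBulkPut[String](
  messages.map(_._2).filter(_.length > 0),
  TableName.valueOf("my_table"),
  (record: String) =>
    new Put(Bytes.toBytes(record.hashCode.toString)) // placeholder row key
      .addColumn(Bytes.toBytes("cf"), Bytes.toBytes("payload"), Bytes.toBytes(record)))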
For more details, see https://hbase.apache.org/book.html
