我试图广播连接,但我不知道如何解决序列化问题。
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
messages.map(_._2).filter(_.length > 0).foreachRDD(rdd => {
val hbaseConf = HBaseConfiguration.create
hbaseConf.set("hbase.rootdir", "hdfs://xxx.xxx.xxx.xxx:9000/hbase")
hbaseConf.set("hbase.zookeeper.quorum", "Master,slave1,slave2")
val connection = ConnectionFactory.createConnection(hbaseConf)
val hbaseBr = ssc.sparkContext.broadcast(connection)
rdd.foreach(x => {
DataHandlingUtil.dataHandle(x, nameMap, dictBroadcast, platformMapBr, hbaseBr.value)
})
})
ssc.start()
ssc.awaitTermination()
1条答案
按热度按时间yhuiod9q1#
您应该使用以下代码使每个执行器创建连接:
更好的版本:
注意:从spark streaming编程指南中复制了以上代码:https://spark.apache.org/docs/1.2.0/streaming-programming-guide.html
另一种选择是使用hbasecontext,它内置了bulkget、bulkput和bulkdelete方法。
以下是示例代码:
关于hbasecontext的一句话:所有spark和hbase集成的根源是hbasecontext。hbasecontext接收hbase配置并将它们推送到spark执行器。这允许我们在静态位置为每个spark executor建立hbase连接。
有关详细信息,请转到此链接https://hbase.apache.org/book.html