我有以下代码:
DStream.map {
_.message()
}.foreachRDD { rdd =>
rdd.foreachPartition{iter =>
val conf = HBaseUtils.configureHBase("iemployee")
val connection = ConnectionFactory.createConnection(conf)
val table = connection.getTable(TableName.valueOf("""iemployee"""))
iter.foreach{elem =>
/* loop through the records in the partition and push them out to the DB */
}
}
有人能告诉我连接对象 val connection = ConnectionFactory.createConnection(conf)
这里是每个分区中使用的同一个连接对象(因为我从不关闭它),还是会为每个分区创建一个新的连接对象?
1条答案
按热度按时间0yycz8jy1#
每个分区的新连接示例。。
请参阅以下连接工厂的代码和文档。还有人提到,它的呼叫者有责任关闭连接。
希望这有帮助!!