我有一个Dataframe,我想把它插入hbase。我看这个文件。
我的Dataframe是这样的:
--------------------
|id | name | address |
|--------------------|
|23 |marry |france |
|--------------------|
|87 |zied |italie |
--------------------
我使用以下代码创建hbase表:
val tableName = "two"
val conf = HBaseConfiguration.create()
if(!admin.isTableAvailable(tableName)) {
print("-----------------------------------------------------------------------------------------------------------")
val tableDesc = new HTableDescriptor(tableName)
tableDesc.addFamily(new HColumnDescriptor("z1".getBytes()))
admin.createTable(tableDesc)
}else{
print("Table already exists!!--------------------------------------------------------------------------------------")
}
现在如何将这个Dataframe插入hbase?
在另一个示例中,我成功地使用以下代码插入到hbase中:
val myTable = new HTable(conf, tableName)
for (i <- 0 to 1000) {
var p = new Put(Bytes.toBytes(""+i))
p.add("z1".getBytes(), "name".getBytes(), Bytes.toBytes(""+(i*5)))
p.add("z1".getBytes(), "age".getBytes(), Bytes.toBytes("2017-04-20"))
p.add("z2".getBytes(), "job".getBytes(), Bytes.toBytes(""+i))
p.add("z2".getBytes(), "salary".getBytes(), Bytes.toBytes(""+i))
myTable.put(p)
}
myTable.flushCommits()
但现在我陷入了困境,如何将Dataframe的每条记录插入hbase表。
谢谢你的时间和关注
3条答案
按热度按时间wnrlj8wa1#
使用answer进行代码格式化doc告诉:
其中sc.parallelize(data).todf是您的Dataframe。doc示例使用sc.parallelize(data).todf将scala集合转换为dataframe
你已经有你的Dataframe了,试着打电话给
它应该有用。医生很清楚。。。
升级版
给定一个具有指定模式的Dataframe,上面将创建一个具有5个区域的hbase表,并将Dataframe保存在其中。请注意,如果未指定hbasetablecatalog.newtable,则必须预先创建表。
它是关于数据分区的。每个hbase表可以有1…x个区域。你应该仔细挑选区域的数量。区域数低是不好的。高地区数字也不好。
wj8zmpe12#
下面是使用maven提供的hortonworks spark hbase连接器的完整示例。
此示例显示
如何检查hbase表是否存在
创建hbase表(如果不存在)
将Dataframe插入hbase表
在我的资源中提供了相关的站点xmls(“core site.xml”、“hbase site.xml”、“hdfs site.xml”)时,这对我很有用。
7lrncoxx3#
另一种方法是查看rdd.saveasnewapihadoopdataset,将数据插入hbase表。
裁判:https://sparkkb.wordpress.com/2015/05/04/save-javardd-to-hbase-using-saveasnewapihadoopdataset-spark-api-java-coding/