将spark df存储到hbase

c9qzyr3d  于 2021-06-08  发布在  Hbase
关注(0)|答案(1)|浏览(442)

我试图以一种有效的方式将spark数据集存储到hbase。当我们尝试用java中的lambda做类似的事情时:

  1. sparkDF.foreach(l->this.hBaseConnector.persistMappingToHBase(l,"name_of_hBaseTable") );

函数persistmappingtohbase使用hbase java客户机(put)存储在hbase中。

  1. I get an exception: Exception in thread "main" org.apache.spark.SparkException: Task not serializable

然后我们尝试了这个:

  1. sparkDF.foreachPartition(partition -> {
  2. final HBaseConnector hBaseConnector = new HBaseConnector();
  3. hBaseConnector.connect(hbaseProps);
  4. while (partition.hasNext()) {
  5. hBaseConnector.persistMappingToHBase(partition.next());
  6. }
  7. hBaseConnector.closeConnection();
  8. });

这似乎是工作,但似乎相当低效,我猜是因为我们创建并关闭了Dataframe的每一行的连接。
将spark ds存储到hbase的好方法是什么?我看到ibm开发的连接器,但从未使用过。

wnavrhmk

wnavrhmk1#

以下内容可用于将内容保存到hbase

  1. val hbaseConfig = HBaseConfiguration.create
  2. hbaseConfig.set("hbase.zookeeper.quorum", "xx.xxx.xxx.xxx")
  3. hbaseConfig.set("hbase.zookeeper.property.clientPort", "2181")
  4. val job = Job.getInstance(hbaseConfig)
  5. job.setOutputFormatClass(classOf[TableOutputFormat[_]])
  6. job.getConfiguration.set(TableOutputFormat.OUTPUT_TABLE, "test_table")
  7. val result = sparkDF.map(row -> {
  8. // Using UUID as my rowkey, you can use your own rowkey
  9. val put = new Put(Bytes.toBytes(UUID.randomUUID().toString))
  10. // setting the value of each row to Put object
  11. ....
  12. ....
  13. new Tuple2[ImmutableBytesWritable, Put](new ImmutableBytesWritable(), put)
  14. });
  15. // save result to hbase table
  16. result.saveAsNewAPIHadoopDataset(job.getConfiguration)

我的数据库中有以下依赖项 build.sbt 文件

  1. libraryDependencies += "org.apache.hbase" % "hbase-common" % "1.3.0"
  2. libraryDependencies += "org.apache.hbase" % "hbase-client" % "1.3.0"
  3. libraryDependencies += "org.apache.hbase" % "hbase-server" % "1.3.0"
展开查看全部

相关问题