How do I write a Spark DataFrame to HBase?

x7rlezfr asked on 2021-06-09 in HBase

I am trying to write a Spark DataFrame into HBase and have followed a couple of other blogs, one of which is this, but it isn't working.
However, I can successfully read from HBase into a DataFrame. Some posts use the org.apache.hadoop.hbase.spark format and others org.apache.spark.sql.execution.datasources.hbase, and I am not sure which one to use. Spark 2.2.2; HBase 1.4.7; Scala 2.11.12; and Hortonworks SHC 1.1.0-2.1-s_2.11 from here.
Here is the code:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog

case class UserMessageRecord(
                          rowkey: String,
                          Name: String,
                          Number: String,
                          message: String,
                          lastTS: String
                        ) // this has been defined outside of the object scope

// msgTemplate, timeStamp and catalog are defined elsewhere (not shown in the post)
val example = List(UserMessageRecord("86325980047644033486", "enrique", "123455678", msgTemplate, timeStamp))

import spark.sqlContext.implicits._

val userDF = example.toDF()

// write to HBase
userDF.write
  .options(Map(HBaseTableCatalog.tableCatalog -> catalog))
  .format("org.apache.spark.sql.execution.datasources.hbase")
  .save() // exception here

// read from HBase and it's working fine
def withCatalog(cat: String): DataFrame = {
  spark.sqlContext
    .read
    .options(Map(HBaseTableCatalog.tableCatalog -> cat))
    .format("org.apache.spark.sql.execution.datasources.hbase")
    .load()
}

val df = withCatalog(catalog)
df.show()
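
The catalog string referenced above is not shown in the post. For context, an SHC catalog is a JSON document that maps DataFrame columns to an HBase table and row key; a minimal sketch matching UserMessageRecord could look like the following (the table name user_messages and the column family cf are assumptions, not taken from the post):

def catalog: String = s"""{
  |"table":{"namespace":"default", "name":"user_messages"},
  |"rowkey":"key",
  |"columns":{
  |"rowkey":{"cf":"rowkey", "col":"key", "type":"string"},
  |"Name":{"cf":"cf", "col":"Name", "type":"string"},
  |"Number":{"cf":"cf", "col":"Number", "type":"string"},
  |"message":{"cf":"cf", "col":"message", "type":"string"},
  |"lastTS":{"cf":"cf", "col":"lastTS", "type":"string"}
  |}
  |}""".stripMargin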

The exception is as follows:
Exception in thread "main" java.lang.NullPointerException
    at org.apache.hadoop.hbase.security.UserProvider.instantiate(UserProvider.java:122)
    at org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory.java:214)
    at org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory.java:119)
    at org.apache.hadoop.hbase.mapreduce.TableOutputFormat.checkOutputSpecs(TableOutputFormat.java:177)
    at org.apache.spark.internal.io.SparkHadoopMapReduceWriter$.write(SparkHadoopMapReduceWriter.scala:76)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1085)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1085)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1085)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
    at org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:1084)
    at org.apache.spark.sql.execution.datasources.hbase.HBaseRelation.insert(HBaseRelation.scala:218)
    at org.apache.spark.sql.execution.datasources.hbase.DefaultSource.createRelation(HBaseRelation.scala:61)
    at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:469)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:48)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:609)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:233)
    at HbaseConnectionTest.HbaseLoadUsingSpark$.main(HbaseLoadUsingSpark.scala:85)
    at HbaseConnectionTest.HbaseLoadUsingSpark.main(HbaseLoadUsingSpark.scala)

wa7juj8i (answer 1)

As discussed here, I made an additional configuration change on the SparkSession builder and the exception went away. However, the underlying reason and the proper fix are still not clear to me.

val spark: SparkSession = SparkSession.builder()
  .master("local[1]")
  .appName("HbaseSparkWrite")
  // skip Hadoop output spec validation; the NPE above is thrown from TableOutputFormat.checkOutputSpecs
  .config("spark.hadoop.validateOutputSpecs", false)
  .getOrCreate()
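
With output spec validation disabled, Spark no longer calls TableOutputFormat.checkOutputSpecs (the frame where the NullPointerException originates), so the SHC write path can proceed. For reference, SHC's write examples also pass HBaseTableCatalog.newTable so the connector can create the target table if it does not already exist; a minimal sketch reusing userDF and catalog from the question (the region count "5" is just an example value):

userDF.write
  .options(Map(
    HBaseTableCatalog.tableCatalog -> catalog,
    HBaseTableCatalog.newTable -> "5")) // ask SHC to create the table with 5 regions if it is missing
  .format("org.apache.spark.sql.execution.datasources.hbase")
  .save()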
