Writing a Spark DataFrame to HBase using the Phoenix connector

pobjuy32 asked on 2021-06-08 in HBase

I have a Hive table that points to an HBase table, and a Spark job that builds a Dataset whose schema matches that HBase table. I am saving this DataFrame to the HBase table with the code below.

sql.write().format("org.apache.phoenix.spark")
    .mode(SaveMode.Overwrite).option("table", targetTable)
    .option("zkUrl", "localhost:2181:/hbase-unsecure)
    .insertInto(targetTable);

When I execute it, I get the following error:

java.lang.NullPointerException
at org.apache.phoenix.hive.PhoenixStorageHandler.configureJobProperties(PhoenixStorageHandler.java:185)
at org.apache.phoenix.hive.PhoenixStorageHandler.configureOutputJobProperties(PhoenixStorageHandler.java:130)
at org.apache.spark.sql.hive.HiveTableUtil$.configureJobPropertiesForStorageHandler(TableReader.scala:324)
at org.apache.spark.sql.hive.SparkHiveWriterContainer.<init>(hiveWriterContainers.scala:67)
at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.sideEffectResult$lzycompute(InsertIntoHiveTable.scala:226)
at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.sideEffectResult(InsertIntoHiveTable.scala:142)
at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.doExecute(InsertIntoHiveTable.scala:310)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:86)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:86)
at org.apache.spark.sql.DataFrameWriter.insertInto(DataFrameWriter.scala:259)
at org.apache.spark.sql.DataFrameWriter.insertInto(DataFrameWriter.scala:239)
at com.lti.unitrax.data.load.IncrementalHiveTableLoadUnitraxMain.fullDataLoad(IncrementalHiveTableLoadUnitraxMain.java:166)
at com.lti.unitrax.data.load.TestDataLoad.main(TestDataLoad.java:38)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:627)

Any help is much appreciated.
I am using Spark 2 on an HDP cluster.


o3imoua4 (answer 1)

I know I'm late to the party, but I came across this post and think my answer may help.
Here is what I did in my implementation:
df = the DataFrame
zookeeperURL = the ZooKeeper URL of the cluster
_tgtTable = the table to write the data into

df.write.format("org.apache.phoenix.spark")
  .mode(org.apache.spark.sql.SaveMode.Overwrite)
  .options(collection.immutable.Map("zkUrl" -> zookeeperURL, "table" -> _tgtTable)) 
  .save()
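
Since the question uses the Java DataFrameWriter API, the same write can be sketched in Java roughly as follows. This is a minimal sketch assuming `sql` is the Dataset from the question and `targetTable` is the Phoenix table name; the key change from the original code is calling save() instead of insertInto(targetTable), so the write goes through the Phoenix connector rather than the Hive/PhoenixStorageHandler path shown in the stack trace.

// Java sketch (assumes `sql` and `targetTable` from the question)
sql.write()
   .format("org.apache.phoenix.spark")
   .mode(SaveMode.Overwrite)
   .option("table", targetTable)
   .option("zkUrl", "localhost:2181:/hbase-unsecure")
   .save();  // save() hands the write to the Phoenix connector, not the Hive table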
