Saving a PySpark DataFrame to HBase

dxxyhpgq · posted on 2021-06-08 in HBase

Hi, I am following a linked example to write my PySpark DataFrame to HBase, but I get the error shown below. I don't know what I am doing wrong; it seems I am missing something in the configuration or a required jar.
Here is the code I am trying to use:

df = sc.parallelize([('a', '1.0'), ('b', '2.0')]).toDF(schema=['col0', 'col1'])

catalog = ''.join("""{
    "table":{"namespace":"default", "name":"testtable"},
    "rowkey":"key",
    "columns":{
        "col0":{"cf":"rowkey", "col":"key", "type":"string"},
        "col1":{"cf":"cf", "col":"col1", "type":"string"}
    }
}""".split())

# write to hbase

df.write \
    .options(catalog=catalog) \
    .format('org.apache.spark.sql.execution.datasources.hbase') \
    .mode("overwrite") \
    .option("zkUrl", "host1,host2,host3:2181") \
    .save()

# reading

df_read = spark.read \
    .options(catalog=catalog) \
    .format('org.apache.spark.sql.execution.datasources.hbase') \
    .load()
df_read.show()
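
As a side note, the same catalog string can also be built with json.dumps instead of the split/join trick; this is just an equivalent sketch that produces the same compact JSON, not something I expect to change the behaviour:

import json

# Same catalog as above, assembled with json.dumps so Python validates the
# JSON structure instead of relying on manual whitespace stripping.
catalog = json.dumps({
    "table": {"namespace": "default", "name": "testtable"},
    "rowkey": "key",
    "columns": {
        "col0": {"cf": "rowkey", "col": "key", "type": "string"},
        "col1": {"cf": "cf", "col": "col1", "type": "string"}
    }
})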

I start PySpark from a kernel.json whose PYSPARK_SUBMIT_ARGS are:

"PYSPARK_SUBMIT_ARGS": "--master yarn --jars hbase_spark_jar/hbase-0.90.2.jar,/hbase_spark_jar/hbase-client-1.3.1.jar,hbase_spark_jar/spark-avro_2.11-3.0.1.jar,/hbase_spark_jar/hbase-spark-1.2.0-cdh5.7.3.jar,/hbase_spark_jar/shc-1.0.0-2.0-s_2.11.jar --files /etc/hbase/2.5.0.0-1245/0/hbase-site.xml  --executor-memory 8g --executor-cores 4 --num-executors 4  pyspark-shell"

This is the error I get:

Py4JJavaError: An error occurred while calling o90.save.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 5.0 failed 4 times, most recent failure: Lost task 3.3 in stage 5.0 (TID 73, hostname): java.lang.NullPointerException
at org.apache.hadoop.hbase.mapreduce.TableOutputFormat$TableRecordWriter.close(TableOutputFormat.java:107)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12$$anonfun$apply$5.apply$mcV$sp(PairRDDFunctions.scala:1124)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1343)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1124)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1095)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1450)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1438)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1437)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1437)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1659)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1618)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1607)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1871)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1884)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1904)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1151)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1078)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1078)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
at org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:1078)
at org.apache.spark.sql.execution.datasources.hbase.HBaseRelation.insert(HBaseRelation.scala:160)
at org.apache.spark.sql.execution.datasources.hbase.DefaultSource.createRelation(HBaseRelation.scala:59)
at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:429)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:211)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:128)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:211)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
at org.apache.hadoop.hbase.mapreduce.TableOutputFormat$TableRecordWriter.close(TableOutputFormat.java:107)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12$$anonfun$apply$5.apply$mcV$sp(PairRDDFunctions.scala:1124)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1343)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1124)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1095)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
... 1 more

No answers yet.
