将hbase加载到sparkDataframe时出现异常

9jyewag0  于 2021-06-07  发布在  Hbase
关注(0)|答案(0)|浏览(300)

我尝试将 HBase 中的数据加载到 Spark DataFrame,代码如下:

// Catalog JSON handed to the connector through HBaseTableCatalog.tableCatalog.
// It describes table "emp_data" in namespace "default" with four string columns:
// UID is bound to the row key (cf "rowkey"); LAST_NAME, FIRST_NAME and ALIASES
// are read from the "AML_DATA" column family.
// A plain (non-interpolated) literal is used because nothing is spliced in.
def catalog: String =
  """{
    |"table":{"namespace":"default", "name":"emp_data"},
    |"rowkey":"key",
    |"columns":{
    |"UID":{"cf":"rowkey", "col":"key", "type":"string"},
    |"LAST_NAME":{"cf":"AML_DATA", "col":"last_name", "type":"string"},
    |"FIRST_NAME":{"cf":"AML_DATA", "col":"first_name", "type":"string"},
    |"ALIASES":{"cf":"AML_DATA", "col":"aliases", "type":"string"}
    |}
    |}""".stripMargin
// Obtain (or reuse) the Spark session. No master or HBase settings are set here.
val spark = SparkSession.builder()
  .appName("HBaseSparkRead")
  .getOrCreate()
// Kept so that any later code expecting a SQLContext handle still compiles.
val sqlContext = spark.sqlContext

// Only one implicits import: spark.implicits._ and sqlContext.implicits._ expose
// the same member names, and wildcard-importing both in one scope makes each of
// them ambiguous (and therefore unusable) wherever an implicit is needed.
import spark.implicits._

// Read the table described by `catalog` through the connector's data source.
// The reference is never reassigned, so it is a val rather than a var.
val newHbaseDF = spark.read
  .options(Map(HBaseTableCatalog.tableCatalog -> catalog))
  .format("org.apache.spark.sql.execution.datasources.hbase")
  .load()

// In the reported run printSchema() succeeded; the exception was raised by
// show(), the first call here that materialises rows.
newHbaseDF.printSchema()
newHbaseDF.show(false) // false = do not truncate long cell values

printSchema() 可以正常输出如下 schema:

root
 |-- UID: string (nullable = true)
 |-- LAST_NAME: string (nullable = true)
 |-- FIRST_NAME: string (nullable = true)
 |-- ALIASES: string (nullable = true)

但是当我调用 show() 显示表数据时,抛出了如下异常:

2020-03-24 11:59:01,859 ERROR executor.Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: org.apache.spark.unsafe.types.UTF8String is not a valid external type for schema of string
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, UID), StringType), true, false) AS UID#174
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 1, LAST_NAME), StringType), true, false) AS LAST_NAME#175
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 2, FIRST_NAME), StringType), true, false) AS FIRST_NAME#176
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 3, ALIASES), StringType), true, false) AS ALIASES#177
        at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:344)
        at org.apache.spark.sql.execution.datasources.DataSourceStrategy.$anonfun$toCatalystRDD$2(DataSourceStrategy.scala:415)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:726)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:321)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:872)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:872)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:127)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:441)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:444)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: org.apache.spark.unsafe.types.UTF8String is not a valid external type for schema of string
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.If_0$(Unknown Source)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.writeFields_0_0$(Unknown Source)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
        at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:340)
        ... 19 more

我可以在 hbase shell 中正常扫描这张表。我已经查看过“在 Scala Spark 的 Row 中添加新的 UTF8String 列时出错”这一问题及其回答,但仍然不知道如何解决我遇到的问题。
我测试的环境是:

hadoop: 3.2.1
hbase: 2.2.3
spark: 3.0.0-preview2-hadoop3.2
scala lib: 2.12.10
shc: 1.1.3-2.4 modified for scala 2.12
java: openjdk version "1.8.0_242"

请帮我解决这个问题,谢谢!

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题