我尝试从hbase加载数据到spark dataframe,如下所示
// HBase-to-Spark catalog mapping for table "default:emp_data".
// Row key is exposed as column UID; the three data columns live in
// column family AML_DATA. All values are read as strings.
// NOTE: `val` (not `def`) so the JSON is built once, not on every access.
val catalog =
s"""{
|"table":{"namespace":"default", "name":"emp_data"},
|"rowkey":"key",
|"columns":{
|"UID":{"cf":"rowkey", "col":"key", "type":"string"},
|"LAST_NAME":{"cf":"AML_DATA", "col":"last_name", "type":"string"},
|"FIRST_NAME":{"cf":"AML_DATA", "col":"first_name", "type":"string"},
|"ALIASES":{"cf":"AML_DATA", "col":"aliases", "type":"string"}
|}
|}""".stripMargin

val spark = SparkSession.builder()
.appName("HBaseSparkRead")
.getOrCreate()

// Only one implicits import is needed; `sqlContext.implicits._` is the
// same object as `spark.implicits._` and importing both can trigger
// ambiguous-implicit compile errors. (Neither is actually used below,
// but kept for the encoder conversions a fuller program would need.)
import spark.implicits._

// `val`: the DataFrame reference is never reassigned.
// NOTE(review): the runtime UTF8String-is-not-a-valid-external-type error
// in the question is not caused by this code — SHC 1.1.3-2.4 is compiled
// against Spark 2.4's ExpressionEncoder and is binary-incompatible with
// Spark 3.0. Use a connector built for Spark 3 (e.g. Apache HBase's
// hbase-spark / hbase-connectors module) or run on Spark 2.4.x.
val newHbaseDF = spark.read
.options(Map(HBaseTableCatalog.tableCatalog -> catalog))
.format("org.apache.spark.sql.execution.datasources.hbase")
.load()

newHbaseDF.printSchema()
newHbaseDF.show(false)
我可以得到如下模式
root
|-- UID: string (nullable = true)
|-- LAST_NAME: string (nullable = true)
|-- FIRST_NAME: string (nullable = true)
|-- ALIASES: string (nullable = true)
但是当我展示table的时候,我得到了一个例外
2020-03-24 11:59:01,859 ERROR executor.Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: org.apache.spark.unsafe.types.UTF8String is not a valid external type for schema of string
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, UID), StringType), true, false) AS UID#174
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 1, LAST_NAME), StringType), true, false) AS LAST_NAME#175
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 2, FIRST_NAME), StringType), true, false) AS FIRST_NAME#176
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 3, ALIASES), StringType), true, false) AS ALIASES#177
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:344)
at org.apache.spark.sql.execution.datasources.DataSourceStrategy.$anonfun$toCatalystRDD$2(DataSourceStrategy.scala:415)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:726)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:321)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:872)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:872)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:127)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:441)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:444)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: org.apache.spark.unsafe.types.UTF8String is not a valid external type for schema of string
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.If_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.writeFields_0_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:340)
... 19 more
我可以从 hbase shell 正常扫描这张表。我也查过与这个错误相关的问题和答案(例如"在 Spark Row 中添加 UTF8String 类型的列"一类的讨论),但仍然不知道如何解决这个问题。
我测试的环境是:
hadoop: 3.2.1
hbase: 2.2.3
spark: 3.0.0-preview2-hadoop3.2
scala lib: 2.12.10
shc: 1.1.3-2.4 modified for scala 2.12
java: openjdk version "1.8.0_242"
请帮我修一下。谢谢
暂无答案!
目前还没有任何答案,快来回答吧!