我在spark submit中使用以下jar:
--jars $(pwd)/commons-httpclient-3.1.jar,
$(pwd)/postgresql-42.2.5.jar,
$(pwd)/hbase-client-2.1.0.jar,
$(pwd)/hbase-spark-1.0.0.jar,
$(pwd)/shc-core-1.1.3-2.4-s_2.11.jar,
$(pwd)/hbase-0.94.21.jar,
$(pwd)/hbase-common-2.1.0.jar,
$(pwd)/hbase-protocol-shaded-2.1.10.jar,
$(pwd)/hbase-protocol-2.1.0.jar,
$(pwd)/hbase-server-2.1.0.jar,
$(pwd)/hadoop-client-2.5.0.jar,
$(pwd)/htrace-core-3.2.0-incubating.jar,
$(pwd)/hbase-shaded-miscellaneous-2.1.0.jar
运行时出现以下错误:
Caused by: java.lang.reflect.InvocationTargetException
Caused by: java.lang.NoSuchMethodError: org.apache.hadoop.hbase.util.Bytes.createMaxByteArray(I)[B
下面是我的代码:
import org.apache.hadoop.hbase.spark.datasources.HBaseTableCatalog
import org.apache.spark.sql.{DataFrame, SparkSession}
object Main {

  /**
   * Fetch data from Postgres via Spark's JDBC data source.
   *
   * @param sparkSession active SparkSession used to open the JDBC reader
   * @return DataFrame holding the rows produced by the subquery
   */
  def fetch(sparkSession: SparkSession): DataFrame = {
    // The JDBC "dbtable" option accepts a parenthesised subquery, but it MUST
    // carry an alias ("(select ...) alias"). The original line had mismatched
    // quotes ("(select * from table ") srceinfo") and did not compile.
    val query = "(select * from table) srceinfo"
    // NOTE(review): dburl/dbusername/dbpwd/driver are not defined in this
    // snippet — confirm they are in scope (e.g. fields or config constants).
    val dbOptions: Map[String, String] = Map(
      "url" -> dburl,
      "dbtable" -> query,
      "user" -> dbusername,
      "password" -> dbpwd,
      "driver" -> driver
    )
    // Last expression is the return value; no `return` keyword needed.
    sparkSession.read.format("jdbc").options(dbOptions).load()
  }

  /**
   * Write the DataFrame into the HBase table `address_mdm` through the
   * SHC (Spark-HBase Connector) data source.
   *
   * @param data DataFrame whose columns match the catalog mapping below
   */
  def saveToHbase(data: DataFrame): Unit = {
    // SHC catalog: "address_id" is the row key; the remaining columns map to
    // the address_details / location_details column families.
    val schema =
      """{
        |"table":{"namespace":"default","name":"address_mdm"},
        |"rowkey":"address_id",
        |"columns":{
        |"address_id":{"cf":"rowkey","col":"address_id","type":"integer"},
        |"address_line_1":{"cf":"address_details","col":"address_line_1","type":"string"},
        |"city":{"cf":"address_details","col":"city","type":"string"},
        |"test_id":{"cf":"location_details","col":"test_id","type":"string"}
        |}
        |}""".stripMargin
    val hbaseOptions = Map(
      HBaseTableCatalog.tableCatalog -> schema,
      // newTable = "4": create the table if absent, pre-split into 4 regions.
      HBaseTableCatalog.newTable -> "4"
    )
    data.write.options(hbaseOptions)
      .format("org.apache.spark.sql.execution.datasources.hbase")
      .save()
  }

  /** Obtain (or lazily create) the application's SparkSession. */
  def getSession(): SparkSession =
    SparkSession.builder().appName("DataMigrator").getOrCreate()

  /** Entry point: copy rows from Postgres into HBase. */
  def main(args: Array[String]): Unit = {
    val sparkSession = getSession()
    val data = fetch(sparkSession)
    saveToHbase(data)
  }
}
暂无答案!
目前还没有任何答案,快来回答吧!