使用sparksql从hbase获取所有记录

wmtdaxz3  于 2021-06-09  发布在  Hbase
关注(0)|答案(1)|浏览(577)

我正在尝试读取hbase表中的所有记录。下面是代码片段。

SparkContext sparkContext = new SparkContext(conf);

    SQLContext sqlContext = new SQLContext(sparkContext);

    Configuration hbaseConf = HBaseConfiguration.create();

    hbaseConf.set("hbase.master", "localhost:60000");
    hbaseConf.setInt("timeout", 120000);
    hbaseConf.set("hbase.zookeeper.quorum", "localhost");
    hbaseConf.set("zookeeper.znode.parent", "/hbase-unsecure");
    hbaseConf.set(TableInputFormat.INPUT_TABLE, "Test");

    DataFrame df = sqlContext.createDataFrame(sparkContext.newAPIHadoopRDD(hbaseConf, TableInputFormat.class, ImmutableBytesWritable.class, Result.class), TestBean.class);

    df.registerTempTable("TempTest");
    df.show();

df.show() 我在犯错误 java.lang.IllegalArgumentException: object is not an instance of declaring class at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 任何提示,为什么我要面对这个问题。

brccelvz

brccelvz1#

您正尝试从由以下数据对组成的rdd创建Dataframe:

org.apache.hadoop.hbase.io.ImmutableBytesWritable     
org.apache.hadoop.hbase.client.Result

您需要阅读您的hbaserdd:

val hBaseRDD = sc.newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat], 
  classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
  classOf[org.apache.hadoop.hbase.client.Result]);

然后将(immutablebyteswritable,result)元组转换为result的rdd:

val resultRDD = hBaseRDD.map(tuple => tuple._2)

然后将行转换为rdd,这些行可以转换为Dataframe。
作为一个示例,我们假设您有一个带有键的hbase表,该键包含两个值“value1\u value2”,您可以使用以下方法解析该键(由“\u”指定):

val keyValueRDD = resultRDD.map(result =>     (Bytes.toString(result.getRow()).split("_")(0),   Bytes.toString(result.getRow()).split("_")(1), Bytes.toFloat(result.value())))

现在,您可以创建一个Dataframe,其中的值在“\”分隔键中:

import sqlContext.implicits._
   val df = keyValueRDD.toDF("value1", "value2");
   df.registerTempTable("Table")
   sqlContext.sql("SELECT * FROM Table Limit 5").show()

为了将hbase表完全Map到Dataframe,您需要:
创建case类:(在对象外部)

case class TestRow(rowkey: String, value1: String, value2: String, value3: Float, value4: Float)

将列族定义为字节:

final val cfTest = "te"
final val cfTestBytes = Bytes.toBytes(cfTest)

分析结果:

object TestRow {
    def parseTestRow(result: Result): TestRow = {
      val rowkey = Bytes.toString(result.getRow())

      val p0 = rowkey
      val p1 = Bytes.toString(result.getValue(cfTestBytes, Bytes.toBytes("currency")))
      val p2 = Bytes.toString(result.getValue(cfTestBytes, Bytes.toBytes("asat")))
      val p3 = Bytes.toFloat(result.getValue(cfTestBytes, Bytes.toBytes("m_aed")))
      val p4 = Bytes.toFloat(result.getValue(cfTestBytes, Bytes.toBytes("m_usd")))
      TestRow(p0, p1, p2, p3, p4)
    }
  }

创建Dataframe

val hBaseRDD = sc.newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat], 
classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
classOf[org.apache.hadoop.hbase.client.Result]);  

val resultRDD = hBaseRDD.map(tuple => tuple._2)
val testRDD = resultRDD.map(TestRow.parseTestRow)
import sqlContext.implicits._
val testDF = testRDD.toDF()
testDF.registerTempTable("Test")
sqlContext.sql("SELECT count(*) FROM Test").show()

相关问题