我必须查询hbase,然后使用spark和scala处理数据。我的问题是,在我的解决方案中,我获取hbase表中的所有数据,然后进行筛选,这不是一种有效的方法,因为它占用了太多内存。所以我想直接做过滤器,怎么做?
def HbaseSparkQuery(table: String, gatewayINPUT: String, sparkContext: SparkContext): DataFrame = {
val sqlContext = new SQLContext(sparkContext)
import sqlContext.implicits._
val conf = HBaseConfiguration.create()
val tableName = table
conf.set("hbase.zookeeper.quorum", "localhost")
conf.set("hbase.master", "localhost:60000")
conf.set(TableInputFormat.INPUT_TABLE, tableName)
val hBaseRDD = sparkContext.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
val DATAFRAME = hBaseRDD.map(x => {
(Bytes.toString(x._2.getValue(Bytes.toBytes("header"), Bytes.toBytes("gatewayIMEA"))),
Bytes.toString(x._2.getValue(Bytes.toBytes("header"), Bytes.toBytes("eventTime"))),
Bytes.toString(x._2.getValue(Bytes.toBytes("node"), Bytes.toBytes("imei"))),
Bytes.toString(x._2.getValue(Bytes.toBytes("measure"), Bytes.toBytes("rssi"))))
}).toDF()
.withColumnRenamed("_1", "GatewayIMEA")
.withColumnRenamed("_2", "EventTime")
.withColumnRenamed("_3", "ap")
.withColumnRenamed("_4", "RSSI")
.filter($"GatewayIMEA" === gatewayINPUT)
DATAFRAME
}
正如您在我的代码中看到的,我在创建Dataframe之后,在加载hbase数据之后执行过滤器。。
提前谢谢你的回答
2条答案
按热度按时间pod7payv1#
这是我找到的解决办法
要做的是设置输入表,设置过滤器,使用过滤器进行扫描,并将扫描结果转换为rdd,然后将rdd转换为Dataframe(可选)
要进行多重筛选:
g6baxovj2#
您可以使用带有 predicate 下推的spark hbase连接器。例如https://spark-packages.org/package/huawei-spark/spark-sql-on-hbase