spark不能读取整行hbase的数据,只能读取最后一个属性的值

8ehkhllq  于 2021-06-08  发布在  Hbase
关注(0)|答案(1)|浏览(365)

为什么我无法在终端中获取完整的hbase数据

host = 'localhost'
table = 'student'
conf = {"hbase.zookeeper.quorum": host, "hbase.mapreduce.inputtable": table}
keyConv = "org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter"
valueConv = "org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter"
hbase_rdd = sc.newAPIHadoopRDD("org.apache.hadoop.hbase.mapreduce.TableInputFormat","org.apache.hadoop.hbase.io.ImmutableBytesWritable","org.apache.hadoop.hbase.client.Result",keyConverter=keyConv,valueConverter=valueConv,conf=conf)
hbase_rdd.collect()
[('1', '23'), ('2', '24'), ('3', '10')]

但是原始数据 Hbase 像这样:

ROW                   COLUMN+CELL                                               
1                    column=info:age, timestamp=1525153512915, value=23        
1                    column=info:gender, timestamp=1525153501730, value=F      
1                    column=info:name, timestamp=1525153481472, value=lihuan   
2                    column=info:age, timestamp=1525153553378, value=24        
2                    column=info:gender, timestamp=1525153542869, value=F      
2                    column=info:name, timestamp=1525153531737, value=sunzhesi 
3                    column=info:age, timestamp=1525157971696, value=10        
3                    column=info:gender, timestamp=1525157958967, value=M      
3                    column=info:name, timestamp=1525157941132, value=axin

系统环境:ubuntu16.04;Python3.5.2;Spark2.3.0;hadoop2.9.0版本;hbase1.4.2版

wkyowqbh

wkyowqbh1#

实际上,我不确定当您像以前那样使用newapihadooprdd时会发生什么,但是当我尝试从hbase扫描数据时,我将“hbase.mapreduce.scan”添加到conf中。因此,也许可以尝试添加如下内容:

from py4j.java_gateway import java_import
from binascii import b2a_base64
jvm = sc._gateway.jvm

java_import(jvm, "org.apache.hadoop.hbase.client.Scan")
java_import(jvm, "org.apache.hadoop.hbase.util.Bytes")
java_import(jvm, "org.apache.hadoop.hbase.protobuf.ProtobufUtil")

to_bytes = lambda x: jvm.Bytes.toBytesBinary(x)

scan = jvm.Scan()
scan.setStartRow(to_bytes(YOUR_START_ROW))
scan.setStopRow(to_bytes(YOUR_STOP_ROW))
scan.addFamily(to_bytes(YOUR_COLUMN_FAMILY_KEY))

scan_proto_bytes = jvm.ProtobufUtil.toScan(scan).toByteArray()
scan_str = b2a_base64(str(scan_proto_bytes))

        conf = {"hbase.mapreduce.inputtable" : table,
            "hbase.mapreduce.scan" : scan_str,
            "hbase.zookeeper.quorum" : host}

相关问题