我正在使用pyspark[spark2.3.1]和hbase1.2.1,我想知道使用pyspark访问hbase的最佳方式是什么?
我做了一些初步的搜索,发现像使用shc这样的选项很少-core:1.1.1-2.1-s_2.11.jar 这是可以实现的,但无论我在哪里尝试寻找一些示例,大多数地方的代码都是用scala编写的,或者示例也是基于scala的。我尝试在pyspark中实现基本代码:
from pyspark import SparkContext
from pyspark.sql import SQLContext
def main():
sc = SparkContext()
sqlc = SQLContext(sc)
data_source_format = 'org.apache.spark.sql.execution.datasources.hbase'
catalog = ''.join("""{
"table":{"namespace":"default", "name":"firsttable"},
"rowkey":"key",
"columns":{
"firstcol":{"cf":"rowkey", "col":"key", "type":"string"},
"secondcol":{"cf":"d", "col":"colname", "type":"string"}
}
}""".split())
df = sqlc.read.options(catalog=catalog).format(data_source_format).load()
df.select("secondcol").show()
# entry point for PySpark application
if __name__ == '__main__':
main()
并使用:
spark-submit --master yarn-client --files /opt/hbase-1.1.2/conf/hbase-site.xml --packages com.hortonworks:shc-core:1.1.1-2.1-s_2.11 --jars /home/ubuntu/hbase-spark-2.0.0-alpha4.jar HbaseMain2.py
它将返回空白输出:
+---------+
|secondcol|
+---------+
+---------+
我不知道我做错了什么?也不确定做这件事的最佳方法是什么??
如有任何推荐信,将不胜感激。
当做
1条答案
按热度按时间lymgl2op1#
最后,使用shc,我可以使用pyspark代码连接到hbase-1.2.1和spark-2.3.1。以下是我的工作:
我所有的hadoop[namenode、datanode、nodemanager、resourcemanager]&hbase[hmaster、hregionserver、hquorumpeer]deamons都在我的ec2示例上启动并运行。
我将emp.csv文件放在hdfs location/test/emp.csv,其中包含以下数据:
我用以下代码创建了readwritehbase.py文件[用于从hdfs读取emp.csv文件,然后在hbase中首先创建tblemployee,将数据推入tblemployee,然后再次从同一个表中读取一些数据并在控制台上显示]:
在vm控制台上使用以下命令运行此脚本:
中间结果:读取csv文件后:
最终输出:从hbase表读取数据后:
注意:在创建hbase表并将数据插入hbase表时,它希望numberofregions应该大于3,因此我添加了
options(catalog=writeCatalog, newtable=5)
向hbase添加数据时