我有一个将pyspark流数据转换为Dataframe的代码。我需要将这个Dataframe存储到hbase中。另外帮我写代码。
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import Row, SparkSession
def getSparkSessionInstance(sparkConf):
if ('sparkSessionSingletonInstance' not in globals()):
globals()['sparkSessionSingletonInstance'] = SparkSession\
.builder\
.config(conf=sparkConf)\
.getOrCreate()
return globals()['sparkSessionSingletonInstance']
if __name__ == "__main__":
if len(sys.argv) != 3:
print("Usage: sql_network_wordcount.py <hostname> <port> ",
file=sys.stderr)
exit(-1)
host, port = sys.argv[1:]
sc = SparkContext(appName="PythonSqlNetworkWordCount")
ssc = StreamingContext(sc, 1)
lines = ssc.socketTextStream(host, int(port))
def process(time, rdd):
print("========= %s =========" % str(time))
try:
words = rdd.map(lambda line :line.split(" ")).collect()
spark = getSparkSessionInstance(rdd.context.getConf())
linesDataFrame = spark.createDataFrame(words,schema=["lat","lon"])
linesDataFrame.show()
except :
pass
lines.foreachRDD(process)
ssc.start()
ssc.awaitTermination()
1条答案
按热度按时间iswrvxsc1#
您可以使用spark hbase connector从spark访问hbase,它提供了一个低层和底层的api
RDD
以及Dataframes
.连接器要求您定义
Schema
用于hbase表。下面是为名为的hbase表定义的模式示例table1
,行键作为键和许多列(col1-col8)。请注意rowkey
还必须详细定义为列(col0),该列具有特定的cf(rowkey)。根据Dataframe的架构定义目录后,可以使用以下方法将Dataframe写入hbase:
要从hbase读取数据:
在提交spark应用程序时,需要包含spark hbase connector包,如下所示。