如何将spark流保存到本地pc和hdfs?

eiee3dmh  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(448)

此数据正在流化,无法以元组形式保存在本地磁盘或HDF中。从pyspark导入sparkconf,sparkcontext

from operator import add
import sys
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

## Constants

APP_NAME = "PythonStreamingDirectKafkaWordCount"

## OTHER FUNCTIONS/CLASSES

def main():
    sc = SparkContext(appName="PythonStreamingDirectKafkaWordCount")
    ssc = StreamingContext(sc, 2)

    brokers, topic = sys.argv[1:]
    kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
    lines = kvs.map(lambda x: x[1])
    counts = lines.flatMap(lambda line: line.split(" ")) \
        .map(lambda word: (word, 1)) \
        .reduceByKey(lambda a, b: a+b)
    def process(RDD):
        #RDD.pprint()
        kvs2=RDD.map()
        kvs2.saveAsTextFiles('path')

    #kvs.foreachRDD(lambda x: process(x))
    #kvs1=kvs.map(lambda x: x)
    kvs.pprint()

    kvs.saveAsTextFiles('path','txt')

    ssc.start()
    ssc.awaitTermination()
if __name__ == "__main__":

   main()
pjngdqdw

pjngdqdw1#

在这一行:

kvs.saveAsTextFiles('path','txt')

您存储的是原始流,而不是具有元组的流。而是从计数中存储:

counts.saveAsTextFiles('path','txt')

请注意保存在“path”中提供的目录下的工作节点上的文件。
pyspark api不支持保存到hdfs,因为对于最新版本,其他语言都有saveashadoopfiles。链接到文档。

相关问题