我正在尝试将要用于构建实时数据管道的技术,但在将内容导出到文件时遇到了一些问题。
我已经设置了一个本地kafka集群和一个node.js生产者,它只发送一条简单的文本消息来测试功能并粗略估计实现的复杂性。
这是Spark流的工作,是读取Kafka,我正试图让它写入一个文件。
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
# Create a local StreamingContext with two working thread and batch interval of 1 second
sc = SparkContext("local[2]", "KafkaStreamingConsumer")
ssc = StreamingContext(sc, 10)
kafkaStream = KafkaUtils.createStream(ssc, "localhost:2181", "consumer-group", {"test": 1})
kafkaStream.saveAsTextFile('out.txt')
print 'Event recieved in window: ', kafkaStream.pprint()
ssc.start()
ssc.awaitTermination()
我在提交spark作业时看到的错误是:
kafkaStream.saveAsTextFile('out.txt')
AttributeError: 'TransformedDStream' object has no attribute 'saveAsTextFile'
没有对数据执行任何计算或转换,我只想构建流。我需要更改/添加什么才能导出文件中的数据?
1条答案
按热度按时间wlwcrazw1#
http://spark.apache.org/docs/latest/api/python/pyspark.streaming.html
savastextfiles(注意复数形式)
saveastextfile(单数)是rdd上的方法,而不是数据流。