I wrote code for a direct-stream (Kafka) word count, where the input file is supplied on the producer side.
Code:
from pyspark import SparkConf, SparkContext
from operator import add
import sys
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

## Constants
APP_NAME = "PythonStreamingDirectKafkaWordCount"

## OTHER FUNCTIONS/CLASSES
def main():
    sc = SparkContext(appName="PythonStreamingDirectKafkaWordCount")
    ssc = StreamingContext(sc, 2)
    brokers, topic = sys.argv[1:]
    kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
    lines = kvs.map(lambda x: x[1])
    counts = lines.flatMap(lambda line: line.split(" ")) \
        .map(lambda word: (word, 1)) \
        .reduceByKey(lambda a, b: a+b)
    counts.pprint()
    ssc.start()
    ssc.awaitTermination()

if __name__ == "__main__":
    main()
I need to convert the input JSON file into a Spark DataFrame using the DStream.
1 Answer
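This should work:
Once the variable kvs holds the TransformedDStream, you only need to map over the DStream to extract the message values and pass each resulting RDD to a handler function. You have to define that handler function yourself; it should create the DataFrame from the JSON data.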
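The steps described above could be sketched roughly as follows. This is only an illustration, not the answer's original snippet: the names parse_json_line, handle_rdd and attach_handler are made up for this example, and it assumes Spark 2.x (where SparkSession is available) and that every Kafka message value is a single JSON object encoded as a string.

```python
import json


def parse_json_line(line):
    """Return the decoded JSON object as a dict, or None if the input is not one."""
    try:
        record = json.loads(line)
    except (ValueError, TypeError):  # malformed JSON, or a None message value
        return None
    return record if isinstance(record, dict) else None


def handle_rdd(rdd):
    """Hypothetical handler: turn one micro-batch of JSON strings into a DataFrame."""
    if rdd.isEmpty():
        return None  # nothing arrived during this batch interval
    # Imported lazily so the pure-Python helper above works without Spark installed.
    from pyspark.sql import SparkSession
    spark = SparkSession.builder.getOrCreate()  # reuses the running SparkContext
    # Assumption: one JSON object per message; drop anything that does not parse.
    json_lines = rdd.filter(lambda line: parse_json_line(line) is not None)
    df = spark.read.json(json_lines)  # schema is inferred from the batch
    df.show()
    return df


def attach_handler(kvs):
    """Wire the handler to the Kafka DStream of (key, value) pairs."""
    values = kvs.map(lambda kv: kv[1])  # keep only the message value
    values.foreachRDD(handle_rdd)
```

In the question's main(), this would presumably be used by calling attach_handler(kvs) before ssc.start(). Schema inference runs again on every batch, so if the JSON layout is known in advance, passing an explicit schema to the reader is likely the more stable choice.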
Hope this helps, my friend :)