PySpark Streaming + Kafka word count does not print any results

Posted by 1hdlvixo on 2021-06-07 in Kafka

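This is my first time working with Kafka and Spark Streaming, and I am trying to run the word count script below. It is a fairly standard script that appears in many blog posts. For some reason, however, Spark Streaming does not print the word counts. It throws no errors; it just never shows the counts. I have tested the topic with the console consumer, and the messages show up correctly. I even tried using `foreachRDD` to look at the incoming lines, but nothing showed up there either.
Thanks in advance!
Versions: kafka_2.11-0.8.2.2, Spark 2.2.1, spark-streaming-kafka-0-8-assembly_2.11-2.2.1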

```python
from __future__ import print_function
import sys
from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils  # from the spark-streaming-kafka-0-8 package
from pyspark.sql.context import SQLContext

sc = SparkContext(appName="PythonStreamingKafkaWordCount")
# Raw strings so the Windows backslashes are never read as escape sequences
sc.setCheckpointDir(r'c:\Playground\spark\logs')
ssc = StreamingContext(sc, 10)  # 10-second batch interval
ssc.checkpoint(r'c:\Playground\spark\logs')

# Expects two arguments: the ZooKeeper quorum and the topic name
zkQuorum, topic = sys.argv[1:]
print(str(zkQuorum))
print(str(topic))

# Receiver-based Kafka stream; {topic: 1} = one consumer thread for this topic
kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})
lines = kvs.map(lambda x: x[1])  # records are (key, message) tuples; keep the message
print(kvs)  # prints the DStream object itself, not any of its data
counts = lines.flatMap(lambda line: line.split(" ")) \
    .map(lambda word: (word, 1)) \
    .reduceByKey(lambda a, b: a + b)
counts.pprint(num=10)  # prints the first 10 elements of every batch

ssc.start()
ssc.awaitTermination()
```
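To separate the counting logic from the streaming setup, here is a plain-Python sketch (no Spark required) of what each 10-second batch is supposed to compute. It assumes, as the script does, that the receiver delivers `(key, message)` tuples with a `None` key when the producer sends no key; the function name `count_batch` is hypothetical.

```python
from collections import Counter
from typing import Iterable, List, Optional, Tuple


def count_batch(records: Iterable[Tuple[Optional[str], str]]) -> List[Tuple[str, int]]:
    """Mimic flatMap(split(" ")) -> map((word, 1)) -> reduceByKey(a + b) for one batch."""
    counts = Counter()
    for _key, message in records:        # like kvs.map(lambda x: x[1])
        for word in message.split(" "):  # like flatMap(lambda line: line.split(" "))
            counts[word] += 1            # like map((word, 1)) followed by reduceByKey
    return list(counts.items())          # Counter keeps first-seen order


# Messages as the producer sends them: lines from readlines() keep their "\n"
batch = [(None, "to be or not to be\n"), (None, "be quick\n")]
print(count_batch(batch)[:10])  # pprint(num=10) also shows at most 10 elements
# [('to', 2), ('be', 2), ('or', 1), ('not', 1), ('be\n', 1), ('quick\n', 1)]
```

If this logic produces counts but the stream prints nothing, the problem is in how the job is run rather than in the transformations. Spark may print the pairs in a different order. Note also that `split(" ")` leaves the trailing newline attached to the last word of each line; `line.split()` splits on any whitespace and avoids that.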

Producer code:

```python
import sys, os
import time
from kafka import KafkaProducer
from kafka.errors import KafkaError

producer = KafkaProducer(bootstrap_servers="localhost:9092")
topic = "KafkaSparkWordCount"

def read_file(fileName):
    with open(fileName) as f:
        print('started reading...')
        contents = f.readlines()
        for content in contents:
            # Send each line (including its trailing newline) as one message
            future = producer.send(topic, content.encode('utf-8'))
            try:
                future.get(timeout=10)  # block until the broker acknowledges the send
            except KafkaError as e:
                print(e)
                break
            print('.', end='', flush=True)
            time.sleep(0.2)
    print('done')

if __name__ == '__main__':
    read_file(r'C:\PlayGround\spark\BookText.txt')
```
Answer #1, by rslzwgfq:

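How many cores are you using?
Spark Streaming needs at least two cores: one for the receiver and one for processing the received data. `KafkaUtils.createStream` is receiver-based, so if the job runs with `local` or `local[1]`, the only thread is taken by the receiver, no batch is ever processed, and `pprint()` prints nothing, with at most a warning in the log instead of an error. Use `local[2]` or higher.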
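The rule can be checked with simple arithmetic: in local mode each receiver (one per `createStream` call) holds one thread for as long as the stream runs, so the master must provide more threads than there are receivers. The plain-Python sketch below only models that arithmetic; the helper names `local_threads` and `batches_can_run` are hypothetical, and it handles only the `local`, `local[N]` and `local[*]` forms. The actual fix is to launch the job with `spark-submit --master local[2] ...` or to pass `SparkConf().setMaster("local[2]")` to the `SparkContext`.

```python
import os
import re


def local_threads(master: str) -> int:
    """Worker threads for a local master URL: local, local[N] or local[*]."""
    if master == "local":
        return 1
    match = re.fullmatch(r"local\[(\d+|\*)\]", master)
    if match is None:
        raise ValueError(f"unsupported master URL in this sketch: {master!r}")
    if match.group(1) == "*":
        return os.cpu_count() or 1  # local[*] uses all logical cores
    return int(match.group(1))


def batches_can_run(master: str, num_receivers: int = 1) -> bool:
    """Each receiver pins one thread; processing needs at least one more."""
    return local_threads(master) > num_receivers


for master in ("local", "local[1]", "local[2]"):
    print(master, batches_can_run(master))
# local False
# local[1] False
# local[2] True
```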
