DataFrame from Kafka streaming data in PySpark

Asked by zbq4xfa0 on 2021-07-14 · Spark

Is it possible to "store" or process the streaming output as a DataFrame, so that I can build real-time analytics/summaries on the data?
I have the following code, which consumes data from a Kafka topic (a sketch of one possible DataFrame approach follows the code block):

    import findspark
    findspark.init("/home/apps/spark")

    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.kafka import KafkaUtils

    # sc.stop()  # only needed when a SparkContext is already running (e.g. in a notebook)
    sc = SparkContext(appName="KafkaStreaming-0")
    ssc = StreamingContext(sc, 5)  # 5-second micro-batches

    kafkaParams = {"metadata.broker.list": "localhost:9090"}
    directKafkaStream = KafkaUtils.createDirectStream(ssc, ["MyTopic"], kafkaParams)

    def update_func(new_values, last_sum):
        # maintain a running total per key across batches
        return sum(new_values) + (last_sum or 0)

    checkpointDir = "file:///home/spark/checkpoint"
    ssc.checkpoint(checkpointDir)  # required by updateStateByKey

    lines = directKafkaStream.map(lambda x: x[1])  # keep only the message value
    counts = lines.flatMap(lambda line: line.split("\t")) \
        .map(lambda word: (word, 1)) \
        .updateStateByKey(update_func)
    counts.pprint()

    ssc.start()
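Within the legacy DStream API used above, the usual way to get at a DataFrame is foreachRDD, which hands each micro-batch to a function where a SparkSession can convert it. Below is a minimal sketch of that pattern applied to the counts stream; the record layout (each token being an "ID,Category" string such as "551GEF,Category_A") and the helper name process_batch are my assumptions, not something from the original code:

    from pyspark.sql import Row, SparkSession
    from pyspark.sql import functions as F

    def process_batch(time, rdd):
        # 'counts' emits (token, running_total) pairs; skip empty micro-batches
        if rdd.isEmpty():
            return
        spark = SparkSession.builder.getOrCreate()
        # assumed layout: token is "ID,Category", so the second field is the category
        rows = rdd.map(lambda kv: Row(Category=kv[0].split(",")[1], total=kv[1]))
        df = spark.createDataFrame(rows)
        df.groupBy("Category").agg(F.sum("total").alias("Total_Count")).show()

    # must be registered on the stream before ssc.start() is called
    counts.foreachRDD(process_batch)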

The counts.pprint() call prints batches like this:

    -------------------------------------------
    Time: 2021-04-17 15:47:10
    -------------------------------------------
    ('551GEF,Category_A', 1)
    ('558PSX,Category_B', 1)
    ('512SED,Category_B', 1)
What I would like instead is a running count summarized by category:

    +----------+-----------+
    |  Category|Total_Count|
    +----------+-----------+
    |Category_A|          1|
    |Category_B|          2|
    +----------+-----------+
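For comparison, newer Spark releases drop the DStream Kafka receiver entirely, and the Structured Streaming API reads Kafka straight into a streaming DataFrame, which makes this kind of grouped running count a short pipeline. The following is a minimal sketch of that alternative, not the approach in the post; it assumes the spark-sql-kafka package is on the classpath and the same "ID,Category" tab-separated record layout as above:

    from pyspark.sql import SparkSession
    from pyspark.sql import functions as F

    spark = SparkSession.builder.appName("KafkaStreaming-0").getOrCreate()

    # read the topic as an unbounded streaming DataFrame
    raw = (spark.readStream
           .format("kafka")
           .option("kafka.bootstrap.servers", "localhost:9090")
           .option("subscribe", "MyTopic")
           .load())

    # value is binary; split on tabs (mirrors the flatMap above),
    # then take the second comma-separated field as the category (assumed layout)
    tokens = F.explode(F.split(F.col("value").cast("string"), "\t")).alias("token")
    summary = (raw.select(tokens)
               .withColumn("Category", F.split("token", ",").getItem(1))
               .groupBy("Category")
               .agg(F.count("*").alias("Total_Count")))

    # 'complete' mode re-emits the full running count table on each trigger
    query = (summary.writeStream
             .outputMode("complete")
             .format("console")
             .start())
    query.awaitTermination()

With outputMode("complete"), Spark keeps the aggregation state across micro-batches itself, so no explicit updateStateByKey or checkpoint bookkeeping is needed in user code.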
