是否可以“存储”或处理流输出作为Dataframe,以创建数据的实时分析/摘要?
我有以下代码从Kafka消费者那里捕获数据:
import findspark
findspark.init("/home/apps/spark")
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
sc.stop()
sc = SparkContext(appName="KafkaStreaming-0")
ssc = StreamingContext(sc, 5)
kafkaParams = {"metadata.broker.list": "localhost:9090"}
directKafkaStream = KafkaUtils.createDirectStream(ssc, ["MyTopic"], kafkaParams)
def update_func(new_val, last_sum):
return sum(new_val) + (last_sum or 0)
checkpointDir = "file:///home/spark/checkpoint"
ssc.checkpoint(checkpointDir)
lines = directKafkaStream.map(lambda x: x[1])
counts = lines.flatMap(lambda line: line.split("\t")) \
.map(lambda word: (word, 1)) \
.updateStateByKey(update_func)
counts.pprint()
ssc.start()
它返回以下内容:
-------------------------------------------
Time: 2021-04-17 15:47:10
-------------------------------------------
('551GEF,Category_A', 1)
('558PSX,Category_B', 1)
('512SED,Category_B', 1)
我想按“类别”创建一个计数摘要:
+-----------+------------+
|Category |Total_Count |
+-----------+------------+
|Category_A | 1 |
+-----------+------------+
|Category_B | 2 |
+-----------+------------+
暂无答案!
目前还没有任何答案,快来回答吧!