我正在使用下面的程序,并在anaconda(spyder)中运行它,用python创建从kafka到spark streaming的数据管道
import sys
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from uuid import uuid1
import os
## Step 1: Initialize sparkcontext
spark_context = SparkContext(appName="Transformation Application")
### Step 2: Initialize streaming context
ssc = StreamingContext(spark_context, 5)
def utf8_decoder(s):
""" Decode the unicode as UTF-8 """
if s is None:
return None
return s.decode('utf-8')
message = KafkaUtils.createDirectStream(ssc,topics=['testtopic'],kafkaParams={"metadata.broker.list":"localhost:9092","key.deserializer": "org.springframework.kafka.support.serializer.JsonDeserializer","value.deserializer": "org.springframework.kafka.support.serializer.JsonDeserializer"},fromOffsets=None,messageHandler=None,keyDecoder=utf8_decoder,valueDecoder=utf8_decoder)
message
words = message.map(lambda x: x[1]).flatMap(lambda x: x.split(" "))
wordcount=words.map(lambda x: (x,1)).reduceByKey(lambda a,b:a+b)
wordcount.pprint()
当我打印消息、单词、单词时,如果没有得到正确的结果,则会得到十六进制值。
message
Out[16]: <pyspark.streaming.kafka.KafkaDStream at 0x23f8b1f8248>
wordcount
Out[18]: <pyspark.streaming.dstream.TransformedDStream at 0x23f8b2324c8>
在我的主题(testtopic)中,我生成了一个字符串-“您好,您好”,然后wordcount应该给出每个单词的计数,但它给出了一些编码的十六进制值
暂无答案!
目前还没有任何答案,快来回答吧!