Problem with a PySpark Spark Streaming data pipeline

mfuanj7w posted on 2021-05-17 in Spark

I am running the program below in Anaconda (Spyder) to build a data pipeline in Python from Kafka to Spark Streaming:

import sys
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from uuid import uuid1
import os

## Step 1: Initialize sparkcontext

spark_context = SparkContext(appName="Transformation Application")

### Step 2: Initialize streaming context

ssc = StreamingContext(spark_context, 5)

def utf8_decoder(s):
    """ Decode the unicode as UTF-8 """
    if s is None:
        return None
    return s.decode('utf-8')

message = KafkaUtils.createDirectStream(
    ssc,
    topics=['testtopic'],
    kafkaParams={
        "metadata.broker.list": "localhost:9092",
        "key.deserializer": "org.springframework.kafka.support.serializer.JsonDeserializer",
        "value.deserializer": "org.springframework.kafka.support.serializer.JsonDeserializer",
    },
    fromOffsets=None,
    messageHandler=None,
    keyDecoder=utf8_decoder,
    valueDecoder=utf8_decoder,
)
message
words = message.map(lambda x: x[1]).flatMap(lambda x: x.split(" "))
wordcount=words.map(lambda x: (x,1)).reduceByKey(lambda a,b:a+b)
wordcount.pprint()
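
For reference, the transformation chain in the program can be mimicked on a single in-memory batch in plain Python. This is only a sketch of what the chain is meant to compute per batch, not of how Spark executes it. The `raw_batch` records are hypothetical sample data (they are not from the question), and the sketch assumes each Kafka record arrives as a `(key, value)` pair of raw bytes.

```python
def utf8_decoder(s):
    """Decode raw bytes as UTF-8 (same helper as in the program)."""
    if s is None:
        return None
    return s.decode("utf-8")

# Hypothetical batch of (key, value) records as raw bytes; keys are unset.
raw_batch = [(None, b"spark kafka spark"), (None, b"kafka")]

# keyDecoder / valueDecoder step: bytes -> str
decoded = [(utf8_decoder(k), utf8_decoder(v)) for k, v in raw_batch]

# message.map(lambda x: x[1]): keep only the value of each record
values = [record[1] for record in decoded]

# .flatMap(lambda x: x.split(" ")): one flat list of words
words = [word for line in values for word in line.split(" ")]

# .map(lambda x: (x, 1)).reduceByKey(lambda a, b: a + b): sum the ones per word
counts = {}
for word, one in [(w, 1) for w in words]:
    counts[word] = counts.get(word, 0) + one

print(sorted(counts.items()))  # [('kafka', 2), ('spark', 2)]
```
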

When I print message, words, or wordcount, I do not get the expected results; I get hexadecimal values instead.

message
Out[16]: <pyspark.streaming.kafka.KafkaDStream at 0x23f8b1f8248>

wordcount
Out[18]: <pyspark.streaming.dstream.TransformedDStream at 0x23f8b2324c8>

In my topic (testtopic) I produced the string "您好,您好". wordcount should then give the count of each word, but instead it shows some encoded hexadecimal values.
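
One likely reading of the output above: values such as `0x23f8b1f8248` are not stream data. They are the memory address that Python's default `repr` prints for any object that does not define its own `__repr__`. A DStream only describes a computation, so displaying the variable shows the object, never its contents. The sketch below illustrates this with a toy class; `LazyStream`, `transform`, and `run_once` are hypothetical names invented for the illustration and are not part of PySpark, and the input line is made-up sample data.

```python
class LazyStream:
    """Toy stand-in for a lazy stream: records steps, computes nothing yet."""

    def __init__(self, source, steps=()):
        self._source = source        # callable that returns one batch (a list)
        self._steps = tuple(steps)   # functions applied to the batch, in order

    def transform(self, fn):
        # Returns a new lazy object; no data is read or processed here.
        return LazyStream(self._source, self._steps + (fn,))

    def run_once(self):
        # Only this call pulls a batch and applies the recorded steps.
        batch = self._source()
        for fn in self._steps:
            batch = fn(batch)
        return batch


def split_words(batch):
    """Flatten a batch of lines into a list of words."""
    return [word for line in batch for word in line.split(" ")]


stream = LazyStream(lambda: ["hello world hello"])  # hypothetical input
words = stream.transform(split_words)

description = repr(words)   # class name plus a hex memory address
result = words.run_once()   # the data appears only once the pipeline runs

print(description)
print(result)
```

In the posted program, the step that actually runs the pipeline is missing: `wordcount.pprint()` only registers an output operation, and Spark Streaming starts processing batches once `ssc.start()` is called, usually followed by `ssc.awaitTermination()`. The counts are then printed to the driver's standard output for each 5-second batch rather than returned as the value of `wordcount`.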
