How to direct-stream (Kafka) a JSON file in Spark and convert it to a DataFrame?

g52tjvyc · posted 2021-06-06 · in Kafka

I wrote a direct-stream (Kafka) word-count program that counts words as files are fed in (on the producer side).
Code:

import sys

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

## Constants

APP_NAME = "PythonStreamingDirectKafkaWordCount"

## OTHER FUNCTIONS/CLASSES

def main():
    if len(sys.argv) != 3:
        print("Usage: direct_kafka_wordcount.py <broker_list> <topic>", file=sys.stderr)
        sys.exit(-1)

    sc = SparkContext(appName=APP_NAME)
    ssc = StreamingContext(sc, 2)  # 2-second batch interval

    brokers, topic = sys.argv[1:]
    # Direct (receiver-less) stream: each record arrives as a (key, value) tuple
    kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
    lines = kvs.map(lambda x: x[1])  # keep only the message value
    counts = lines.flatMap(lambda line: line.split(" ")) \
        .map(lambda word: (word, 1)) \
        .reduceByKey(lambda a, b: a + b)
    counts.pprint()

    ssc.start()
    ssc.awaitTermination()

if __name__ == "__main__":
    main()
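(A note on running it: with the legacy DStream Kafka integration used here, the script is typically launched via spark-submit with the spark-streaming-kafka-0-8 connector for your Spark/Scala version on the classpath, passing the broker list and the topic as the two positional arguments.)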

What I need now is to convert the incoming JSON files into a Spark DataFrame, using the DStream.


ohtdti5x 1#

This should work:
Once the variable `kvs` holds the transformed DStream, you simply map over it and pass the data to a handler function, like this:

data = kvs.map( lambda kv: kv[1] )  # keep only the Kafka message value
data.foreachRDD( lambda rdd: readMyRddsFromKafkaStream( rdd ) )
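Note that foreachRDD is an output operation: the handler runs once per batch interval, including intervals in which no Kafka messages arrived, which is why the handler below skips empty RDDs.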

Then define the handler function, which builds a DataFrame from the JSON data:

from pyspark.sql import SparkSession

def readMyRddsFromKafkaStream( readRdd ):
  # foreachRDD also fires for empty batches; nothing to do then
  if readRdd.isEmpty():
    return
  # Reuse the streaming job's SparkContext to get (or create) a SparkSession
  spark = SparkSession.builder.config( conf=readRdd.context.getConf() ).getOrCreate()
  # Put the RDD into a DataFrame (each RDD element must be one complete JSON document)
  df = spark.read.json( readRdd )
  df.createOrReplaceTempView( "temporary_table" )
  df = spark.sql( """
    SELECT
      *
    FROM
      temporary_table
  """ )
  df.show()
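For newer Spark versions (3.x), where pyspark.streaming.kafka no longer exists, the same Kafka-JSON-to-DataFrame flow is typically written with Structured Streaming instead. Here is a minimal sketch, assuming a broker at localhost:9092, a topic named my_topic, and a one-field JSON schema (all three are placeholders to adjust); it also requires the spark-sql-kafka-0-10 package matching your Spark/Scala version on the classpath:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StringType, StructField, StructType

spark = SparkSession.builder.appName("KafkaJsonStructured").getOrCreate()

# Placeholder schema: replace the fields with those of your actual JSON documents
schema = StructType([StructField("word", StringType())])

raw = (spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")  # placeholder broker
    .option("subscribe", "my_topic")                      # placeholder topic
    .load())

# Kafka delivers the payload as bytes; cast to string, then parse the JSON
parsed = (raw
    .select(from_json(col("value").cast("string"), schema).alias("doc"))
    .select("doc.*"))

query = parsed.writeStream.format("console").start()
query.awaitTermination()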

Hope it helps, my friend :)
