使用pyspark处理kafka流中的数据

uubf1zoe  于 2021-05-29  发布在  Spark
关注(0)|答案(0)|浏览(226)

Kafka消费者的控制台是什么样子的:

["2017-12-31 16:06:01", 12472391, 1]
["2017-12-31 16:06:01", 12472097, 1]
["2017-12-31 16:05:59", 12471979, 1]
["2017-12-31 16:05:59", 12472099, 0]
["2017-12-31 16:05:59", 12472054, 0]
["2017-12-31 16:06:00", 12472318, 0]
["2017-12-31 16:06:00", 12471979, 0]

我想使用pyspark在指定的时间段后获取列表中的每个值或这些值的df。
我尝试过:

sc = SparkContext(appName='PythonStreamingDirectKafka')
sc.setLogLevel("WARN")
spark = SparkSession(sc)
ssc = StreamingContext(sc, 10)

brokers, topic = sys.argv[1:]

kvs = KafkaUtils.createDirectStream(ssc, [topic],
    {'metadata.broker.list': brokers})

lines = kvs.map(lambda x: x[1])
text =  lines.flatMap(lambda line: line.split(" ")).pprint()

ssc.start()
ssc.awaitTermination()

上面的文本变量是一个dstream对象,我不知道如何操作或转换它。浏览了很多博客和文档。
我想将信息提取到python列表或df中,以便对其进行操作
如果有任何帮助我会非常感激的。谢谢~

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题