如何使用用户定义的函数解析spark中的json?

xxhby3vn  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(477)

我在做一个项目,包括Kafka,Spark和Hive。我有这样一个事件例子,

{"event": "OrderEvent", "messageid": "2db62eb5-de95-4ce8-8161-ab7552dc2fd7", "userid": "user-346", "lineitems": [{"productid": "product-784", "quantity": 3}, {"productid": "product-173", "quantity": 1}], "orderid": 50000}

有一个consumer作业订阅kafka主题并使用事件,然后将它们写入hdfs(我的hive表的位置)
我的问题是,我想为每一行编写一个函数,将json事件解析为字符串,但是 AttributeError: 'NoneType' object has no attribute 'repartition' 我的整个消费者工作就像

from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.sql import SQLContext
import json

class OrderEventConsumer:

def __init__(self):
    conf = SparkConf().setAppName('OrderEventConsumer')
    self.sc = SparkContext().getOrCreate(conf)
    self.sc.setLogLevel('ERROR')
    self.ssc = StreamingContext(self.sc, 5)
    self.ssc.checkpoint('/tmp/SparkCheckpoints')
    sqlContext = SQLContext(self.sc)

    # Kafka variables
    self.zkQuorum = 'localhost:2189'
    self.topic = 'test'  # 'prod-raw-product-view'

def format_event(self, rdd):
    for i in range(len(rdd['lineitems'])):
        yield '{},{},{},{},{},{}'.format(rdd['userid'], rdd['orderid'], rdd['lineitems'][i]['productid'],
                                         rdd['lineitems'][i]['quantity'], rdd['messageid'], rdd['event_time'])

def consume(self):
    kvs = KafkaUtils.createStream(self.ssc, self.zkQuorum, 'spark-streaming-consumer', {self.topic: 1})
    aRdd = kvs.map(lambda x: json.loads(x[1])) \
        .foreachRDD(lambda x: x.foreach(lambda x: self.format_event(x))) \
        .repartition(1) \
        .saveAsTextFiles('hdfs://node1/user/hive/warehouse/hb.db/fact_order/')
    self.ssc.start()
    self.ssc.awaitTermination()

if __name__ == '__main__':
    orderConsumer = OrderEventConsumer()
    orderConsumer.consume()

我想写一个文件到hdfs包含行项目计数时间行为每个事件。我该怎么办?
谢谢。

vjrehmav

vjrehmav1#

你不应该使用 foreachRDD 以及 foreach 函数-它们不返回任何数据。如果你想格式化你的东西,只要使用 map 就像你在上一行做的那样。
另外,不要使用 repartition ,但使用 coalesce -可以快得多
p、 如果你刚刚开始,我建议你使用spark结构化流媒体-它可能比spark流媒体更有效,更容易起诉。

相关问题