我在做一个项目,包括Kafka,Spark和Hive。我有这样一个事件例子,
{"event": "OrderEvent", "messageid": "2db62eb5-de95-4ce8-8161-ab7552dc2fd7", "userid": "user-346", "lineitems": [{"productid": "product-784", "quantity": 3}, {"productid": "product-173", "quantity": 1}], "orderid": 50000}
有一个consumer作业订阅kafka主题并使用事件,然后将它们写入hdfs(我的hive表的位置)
我的问题是,我想为每一行编写一个函数,将json事件解析为字符串,但是 AttributeError: 'NoneType' object has no attribute 'repartition'
我的整个消费者工作就像
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.sql import SQLContext
import json
class OrderEventConsumer:
def __init__(self):
conf = SparkConf().setAppName('OrderEventConsumer')
self.sc = SparkContext().getOrCreate(conf)
self.sc.setLogLevel('ERROR')
self.ssc = StreamingContext(self.sc, 5)
self.ssc.checkpoint('/tmp/SparkCheckpoints')
sqlContext = SQLContext(self.sc)
# Kafka variables
self.zkQuorum = 'localhost:2189'
self.topic = 'test' # 'prod-raw-product-view'
def format_event(self, rdd):
for i in range(len(rdd['lineitems'])):
yield '{},{},{},{},{},{}'.format(rdd['userid'], rdd['orderid'], rdd['lineitems'][i]['productid'],
rdd['lineitems'][i]['quantity'], rdd['messageid'], rdd['event_time'])
def consume(self):
kvs = KafkaUtils.createStream(self.ssc, self.zkQuorum, 'spark-streaming-consumer', {self.topic: 1})
aRdd = kvs.map(lambda x: json.loads(x[1])) \
.foreachRDD(lambda x: x.foreach(lambda x: self.format_event(x))) \
.repartition(1) \
.saveAsTextFiles('hdfs://node1/user/hive/warehouse/hb.db/fact_order/')
self.ssc.start()
self.ssc.awaitTermination()
if __name__ == '__main__':
orderConsumer = OrderEventConsumer()
orderConsumer.consume()
我想写一个文件到hdfs包含行项目计数时间行为每个事件。我该怎么办?
谢谢。
1条答案
按热度按时间vjrehmav1#
你不应该使用
foreachRDD
以及foreach
函数-它们不返回任何数据。如果你想格式化你的东西,只要使用map
就像你在上一行做的那样。另外,不要使用
repartition
,但使用coalesce
-可以快得多p、 如果你刚刚开始,我建议你使用spark结构化流媒体-它可能比spark流媒体更有效,更容易起诉。