I am working on a task where I need to maintain a running total of data across batches in a PySpark Streaming job. I am using updateStateByKey (near the end of the code below):
import sys
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import os
if __name__ == "__main__":
    # Create Spark Context
    sc = SparkContext(appName="PythonStreamingDirectKafkaCount")
    ssc = StreamingContext(sc, 1)
    # Checkpoint for backups
    ssc.checkpoint("file:///tmp/spark")
    brokers, topic = sys.argv[1:]
    print(brokers)
    print(topic)
    sc.setLogLevel("WARN")
    # Connect to Kafka
    kafkaParams = {"metadata.broker.list": brokers}
    kafkaStream = KafkaUtils.createDirectStream(ssc, [topic], kafkaParams)

    def parse_log_line(line):
        # Keep only the URL as the key; each record counts as one click
        (uuid, timestamp, url, user, region, browser, platform, cd, ttf) = line.strip().split(",")
        hour = timestamp[0:13]
        return (url, 1)

    lines = kafkaStream.map(lambda x: x[1])
    parsed_lines = lines.map(parse_log_line)
    clicks = parsed_lines.reduceByKey(lambda a, b: a + b)
    clicks.pprint()

    def countKeys(newValues, lastSum):
        # Add this batch's counts to the running total (lastSum is None the first time a key is seen)
        if lastSum is None:
            lastSum = 0
        return sum(newValues, lastSum)

    # The problem is here
    sum_clicks = clicks.updateStateByKey(countKeys)
    # I tried this but it didn't help
    # sum_clicks = clicks.updateStateByKey(countKeys, numPartitions=2)
    sum_clicks.pprint()

    ssc.start()
    ssc.awaitTermination()
    ssc.stop()
The relevant part of the error message appears when pprint() is called, but I think that is only because pprint() triggers evaluation. The error is:
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Checkpoint RDD has a different number of partitions from original RDD. Original RDD [ID: 195, num of partitions: 2]; Checkpoint RDD [ID: 267, num of partitions: 0].
It says the number of partitions differs between the original RDD and the checkpoint RDD, but I tried specifying numPartitions=2 and it made no difference.
Does anyone know what I am doing wrong? Thanks!
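For reference, the checkpoint-recovery pattern from the Spark Streaming programming guide wraps the whole pipeline setup in a factory function and obtains the context with StreamingContext.getOrCreate, so that a run restarted from an existing checkpoint rebuilds the same DStream lineage (a stale checkpoint directory left over from an earlier, differently-configured run can otherwise cause this kind of mismatch). Below is a minimal sketch under those assumptions; create_context and CHECKPOINT_DIR are illustrative names, and the brokers/topic arguments are assumed to match the original job.

import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

CHECKPOINT_DIR = "file:///tmp/spark"  # assumed to match the original job

def create_context():
    # Build the SparkContext, StreamingContext and the whole DStream graph here,
    # so a context restored from the checkpoint has the same lineage.
    sc = SparkContext(appName="PythonStreamingDirectKafkaCount")
    sc.setLogLevel("WARN")
    ssc = StreamingContext(sc, 1)
    ssc.checkpoint(CHECKPOINT_DIR)

    brokers, topic = sys.argv[1:]
    kafkaStream = KafkaUtils.createDirectStream(
        ssc, [topic], {"metadata.broker.list": brokers})

    def countKeys(newValues, lastSum):
        # Add this batch's counts to the running total
        return sum(newValues, lastSum or 0)

    clicks = kafkaStream.map(lambda x: x[1]) \
        .map(lambda line: (line.strip().split(",")[2], 1)) \
        .reduceByKey(lambda a, b: a + b)
    clicks.updateStateByKey(countKeys).pprint()
    return ssc

if __name__ == "__main__":
    # getOrCreate rebuilds the context from the checkpoint if one exists,
    # otherwise it calls create_context() to build a fresh one.
    ssc = StreamingContext.getOrCreate(CHECKPOINT_DIR, create_context)
    ssc.start()
    ssc.awaitTermination()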