Spark Streaming updateStateByKey fails

1tuwyuhd posted on 2021-05-17 in Spark
Follow (0) | Answers (0) | Views (312)

I'm working on a task where I need to keep a running total of data across batches in a Python Spark Streaming job. I'm using updateStateByKey (near the end of the code below):

    import sys
    import os
    from pyspark import SparkContext, SparkConf
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.kafka import KafkaUtils

    if __name__ == "__main__":
        # Create the Spark context and a streaming context with a 1-second batch interval
        sc = SparkContext(appName="PythonStreamingDirectKafkaCount")
        ssc = StreamingContext(sc, 1)
        # Checkpoint directory, required by updateStateByKey
        ssc.checkpoint("file:///tmp/spark")

        brokers, topic = sys.argv[1:]
        print(brokers)
        print(topic)
        sc.setLogLevel("WARN")

        # Connect to Kafka
        kafkaParams = {"metadata.broker.list": brokers}
        kafkaStream = KafkaUtils.createDirectStream(ssc, [topic], kafkaParams)

        def parse_log_line(line):
            (uuid, timestamp, url, user, region, browser, platform, cd, ttf) = line.strip().split(",")
            hour = timestamp[0:13]
            return (url, 1)

        lines = kafkaStream.map(lambda x: x[1])
        parsed_lines = lines.map(parse_log_line)
        clicks = parsed_lines.reduceByKey(lambda a, b: a + b)
        clicks.pprint()

        def countKeys(newValues, lastSum):
            # newValues: this batch's counts for the key; lastSum: previous state (None on the first batch)
            if lastSum is None:
                lastSum = 0
            return sum(newValues, lastSum)

        # The problem is here
        sum_clicks = clicks.updateStateByKey(countKeys)
        # I tried this but it didn't help
        # sum_clicks = clicks.updateStateByKey(countKeys, numPartitions=2)
        sum_clicks.pprint()

        ssc.start()
        ssc.awaitTermination()
        ssc.stop()
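As I understand it, updateStateByKey calls the update function once per key and per batch, passing the list of that batch's new values and the previous state (None the first time a key appears), so countKeys on its own should behave like this plain-Python check (the sample numbers are made up):

    def countKeys(newValues, lastSum):
        if lastSum is None:
            lastSum = 0
        return sum(newValues, lastSum)

    print(countKeys([1, 1, 1], None))  # first batch for a key: 0 + 3 -> 3
    print(countKeys([2], 3))           # a later batch: 3 + 2 -> 5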

The error message points at the pprint() call, but I think that's only because it's the output operation that triggers evaluation. The error is:

    py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
    : org.apache.spark.SparkException: Checkpoint RDD has a different number of partitions from original RDD. Original RDD [ID: 195, num of partitions: 2]; Checkpoint RDD [ID: 267, num of partitions: 0].

It says the original RDD and the checkpoint RDD have different numbers of partitions, but specifying numPartitions=2 made no difference.
Does anyone know what I'm doing wrong? Thanks!
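For what it's worth, below is a stripped-down sketch of the same updateStateByKey pattern, fed from a local socket instead of Kafka (modeled on Spark's stateful network word count example), that I would use to check whether the state logic itself is at fault. The host, port, and checkpoint path are just placeholders:

    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext

    if __name__ == "__main__":
        sc = SparkContext(appName="UpdateStateByKeySketch")
        sc.setLogLevel("WARN")
        ssc = StreamingContext(sc, 1)
        # updateStateByKey still needs a checkpoint directory; fresh placeholder path
        ssc.checkpoint("file:///tmp/spark-sketch")

        # Lines from a local socket stand in for the Kafka stream, one URL per line
        lines = ssc.socketTextStream("localhost", 9999)
        clicks = lines.map(lambda url: (url, 1)).reduceByKey(lambda a, b: a + b)

        def countKeys(newValues, lastSum):
            # Add this batch's counts for the key to the previous running total
            return sum(newValues, lastSum or 0)

        clicks.updateStateByKey(countKeys).pprint()

        ssc.start()
        ssc.awaitTermination()

Running it with spark-submit --master "local[2]" and feeding URLs through nc -lk 9999 should print totals that keep growing across batches, which is the behaviour I'm after in the Kafka job.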

No answers yet.

