看过官方文件后,我试着用 checkpoint
与 getOrCreate
在Spark流中。一些片段:
def get_ssc():
sc = SparkContext("yarn-client")
ssc = StreamingContext(sc, 10) # calc every 10s
ks = KafkaUtils.createDirectStream(
ssc, ['lucky-track'], {"metadata.broker.list": KAFKA_BROKER})
process_data(ks)
ssc.checkpoint(CHECKPOINT_DIR)
return ssc
if __name__ == '__main__':
ssc = StreamingContext.getOrCreate(CHECKPOINT_DIR, get_ssc)
ssc.start()
ssc.awaitTermination()
代码可以很好地用于recover,但是恢复的上下文总是在旧的进程函数上工作。这意味着即使我更改了map/reduce函数代码,它也根本不起作用。
直到现在,spark(1.5.2)仍然不支持python的任意偏移。那么,我该怎么做才能让它正常工作呢?
1条答案
按热度按时间8wigbo561#
这种行为是“设计的”,对于java/scalaspark应用程序也是有效的。整个代码在检查点时被序列化。如果代码更改,检查点数据应该被截断。