使用pythonspark直接方法时如何从检查点恢复?

bxgwgixi  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(331)

看过官方文件后,我试着用 checkpointgetOrCreate 在Spark流中。一些片段:

def get_ssc():
    sc = SparkContext("yarn-client")
    ssc = StreamingContext(sc, 10)  # calc every 10s
    ks = KafkaUtils.createDirectStream(
        ssc, ['lucky-track'], {"metadata.broker.list": KAFKA_BROKER})
    process_data(ks)

    ssc.checkpoint(CHECKPOINT_DIR)
    return ssc

if __name__ == '__main__':
    ssc = StreamingContext.getOrCreate(CHECKPOINT_DIR, get_ssc)

    ssc.start()
    ssc.awaitTermination()

代码可以很好地用于recover,但是恢复的上下文总是在旧的进程函数上工作。这意味着即使我更改了map/reduce函数代码,它也根本不起作用。
直到现在,spark(1.5.2)仍然不支持python的任意偏移。那么,我该怎么做才能让它正常工作呢?

8wigbo56

8wigbo561#

这种行为是“设计的”,对于java/scalaspark应用程序也是有效的。整个代码在检查点时被序列化。如果代码更改,检查点数据应该被截断。

相关问题