为了降低代码的难度,我允许重新启动spark流系统来使用新的批处理大小,但需要保持以前的进度(允许丢失正在处理的批处理)。如果我使用 checkpoint 在spark流媒体中,当应用程序重新启动时,它不能更改所有配置。所以我想通过修改源代码来实现这个函数,但是我不知道从哪里开始。希望能给我一些指导,告诉我困难。
checkpoint
fykwrbwg1#
既然你说的是批量大小,我假设你问的是spark流,而不是结构化流。有一种方法可以通过编程设置批处理间隔的值,有关文档,请参阅此链接。建造商 StreamingContext 接受 duration 类的对象,该对象定义批处理间隔。您可以通过在代码中硬编码来传递批处理间隔大小,这将要求您在每次需要更改批处理间隔时都构建jar文件,相反,您可以从配置文件中获取它,这样您就不需要每次都构建代码。注意:您必须在应用程序的配置文件中设置此属性,而不是在spark的配置文件中。您可以更改批处理间隔的配置并重新启动应用程序,这不会导致任何有关检查点的问题。
StreamingContext
duration
val sparkConf: SparkConf = new SparkConf() .setAppName("app-name") .setMaster("app-master") val ssc: StreamingContext = new StreamingContext(sparkConf, Seconds(config.getInt("batch-interval")))
干杯!!
1条答案
按热度按时间fykwrbwg1#
既然你说的是批量大小,我假设你问的是spark流,而不是结构化流。
有一种方法可以通过编程设置批处理间隔的值,有关文档,请参阅此链接。
建造商
StreamingContext
接受duration
类的对象,该对象定义批处理间隔。您可以通过在代码中硬编码来传递批处理间隔大小,这将要求您在每次需要更改批处理间隔时都构建jar文件,相反,您可以从配置文件中获取它,这样您就不需要每次都构建代码。
注意:您必须在应用程序的配置文件中设置此属性,而不是在spark的配置文件中。
您可以更改批处理间隔的配置并重新启动应用程序,这不会导致任何有关检查点的问题。
干杯!!