我的spark流应用程序从kafka获取数据并对其进行处理。
在应用程序失败的情况下,大量的数据存储在kafka中,在下一次启动spark streaming应用程序时,它会崩溃,因为一次消耗了太多的数据。因为我的应用程序不关心过去的数据,所以只使用当前(最新)的数据完全可以。
我找到了“auto.reset.offest”选项,它在spark中的行为略有不同。它删除存储在zookeeper中的偏移量(如果已配置)。然而,尽管它的行为出人意料,它应该从删除后的最新数据中获取数据。
但我发现不是。我看到在使用数据之前,所有的偏移量都被清除了。然后,由于默认行为,它应该按预期获取数据。但由于数据太多,它仍然崩溃。
当我使用“kafka console consumer”清理最新版本的偏移量和使用数据并运行我的应用程序时,它按预期工作。
因此,看起来“auto.reset.offset”不起作用,spark streaming中的kafka consumer从“最小”偏移量获取数据作为默认值。
你知道如何使用最新的spark流媒体中的Kafka数据吗?
我使用的是spark-1.0.0和kafka-2.10-0.8.1。
提前谢谢。
1条答案
按热度按时间e4yzc0pl1#
我想你把酒店的名字拼错了。正确的键是auto.offset.reset,而不是auto.reset.offest
更多信息请点击此处:http://kafka.apache.org/documentation.html#configuration
希望这有帮助。