我们使用javainputdstream<consumerrecord<string,string>>从apachekafka读取消息(值:json string),加入一些oracledb数据并写入elasticsearch。
我们实现了spark streaming-kafka integration guide中描述的偏移管理,但是现在我们才意识到偏移管理对我们不起作用,并且如果当前小批量中出现故障,那么流不会再次读取消息。即使我们跳过这一行,它也不会再次读取消息:
((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);
我们将代码分解为以下内容,并期望流最终在循环中反复读取相同的消息,但事实并非如此:
stream.foreachRDD(recordRDD -> {
final OffsetRange[] offsetRanges = ((HasOffsetRanges) recordRDD.rdd()).offsetRanges();
if (!recordRDD.isEmpty()) {
LOGGER.info("Processing some Data: " + recordRDD.rdd().count());
}
});
consumer config param enable.auto.commit设置为false,在初始化javainputdstream之后,它也会显示在日志中。我们在测试中的嵌入式kafka代理和dev stage上的kafka服务器都面临同样的问题。目前两者都以独立模式运行。
我们尝试的是:
代理配置:increase offsets.commit.timeout.ms
使用者/流配置:将isolation.level设置为“read\u committed”
使用者/流配置:将auto.offset.reset设置为最早
spark:将spark.streaming.unpersist设置为false
spark:增加spark.streaming.kafka.maxretries的值
流:将streamingphaseduration调整为比小批量所需的时间更长
流:启用检查点
流:更改位置策略
这些都不起作用,我们似乎搜索了整个网络,却没有找到问题的原因。流似乎忽略了enable.auto.commit配置,只是在读取当前rdd的消息之后才提交。无论我们尝试什么,我们的信息流只会一直精确地读取每条消息一次。
有什么不同的方法或事实我遗漏了吗?
1条答案
按热度按时间rryofs0p1#
在更多的测试之后,我们发现只有当流在实际批处理过程中停止/崩溃时,手动提交才起作用。如果流停止并重新开始,它将再次使用失败的数据。
因此,我们现在所做的是在检测到故障时直接停止流
javaStreamingContext.stop(false)
. 在此之后,流由调度器再次启动,调度器验证流是否在正常的时间段内处于活动状态,如果没有,则启动它。这不是一个优雅的解决方案,但它首先适用于我们。