无法手动提交工作(java)

ncgqoxb0  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(386)

我们使用javainputdstream<consumerrecord<string,string>>从apachekafka读取消息(值:json string),加入一些oracledb数据并写入elasticsearch。
我们实现了spark streaming-kafka integration guide中描述的偏移管理,但是现在我们才意识到偏移管理对我们不起作用,并且如果当前小批量中出现故障,那么流不会再次读取消息。即使我们跳过这一行,它也不会再次读取消息:

((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);

我们将代码分解为以下内容,并期望流最终在循环中反复读取相同的消息,但事实并非如此:

stream.foreachRDD(recordRDD -> {
   final OffsetRange[] offsetRanges = ((HasOffsetRanges) recordRDD.rdd()).offsetRanges();
   if (!recordRDD.isEmpty()) {
      LOGGER.info("Processing some Data: " + recordRDD.rdd().count());
   }
});

consumer config param enable.auto.commit设置为false,在初始化javainputdstream之后,它也会显示在日志中。我们在测试中的嵌入式kafka代理和dev stage上的kafka服务器都面临同样的问题。目前两者都以独立模式运行。
我们尝试的是:
代理配置:increase offsets.commit.timeout.ms
使用者/流配置:将isolation.level设置为“read\u committed”
使用者/流配置:将auto.offset.reset设置为最早
spark:将spark.streaming.unpersist设置为false
spark:增加spark.streaming.kafka.maxretries的值
流:将streamingphaseduration调整为比小批量所需的时间更长
流:启用检查点
流:更改位置策略
这些都不起作用,我们似乎搜索了整个网络,却没有找到问题的原因。流似乎忽略了enable.auto.commit配置,只是在读取当前rdd的消息之后才提交。无论我们尝试什么,我们的信息流只会一直精确地读取每条消息一次。
有什么不同的方法或事实我遗漏了吗?

rryofs0p

rryofs0p1#

在更多的测试之后,我们发现只有当流在实际批处理过程中停止/崩溃时,手动提交才起作用。如果流停止并重新开始,它将再次使用失败的数据。
因此,我们现在所做的是在检测到故障时直接停止流 javaStreamingContext.stop(false) . 在此之后,流由调度器再次启动,调度器验证流是否在正常的时间段内处于活动状态,如果没有,则启动它。
这不是一个优雅的解决方案,但它首先适用于我们。

相关问题