我正在运行一些实验来测试apache flink的容错能力。我目前正在使用hibench框架,为flink实现wordcount微基准测试。
我注意到,如果在执行期间杀死taskmanager,则在自动“重新部署”之后会恢复flink操作符的状态,但会丢失从基准发送到kafka的许多(全部?)元组(存储在kafka中,但在flink中未收到)。
看来复苏之后 FlinkKafkaConsumer
(基准测试使用flinkkafcumer08)代替从故障前最后一次偏移量读取的开始读取,从最新的可用偏移量开始读取(丢失故障期间发送的所有事件)。
有什么建议吗?
谢谢!
1条答案
按热度按时间wtzytmuj1#
问题出在hibench框架本身和flink的最新版本上。
为了在kafka使用者中使用“setstartfromgroupoffsets()”方法,我必须在基准中更新flink的版本。