我写了一个非常简单的flink流媒体作业,它使用 FlinkKafkaConsumer082
.
protected DataStream<String> getKafkaStream(StreamExecutionEnvironment env, String topic) {
Properties result = new Properties();
result.put("bootstrap.servers", getBrokerUrl());
result.put("zookeeper.connect", getZookeeperUrl());
result.put("group.id", getGroup());
return env.addSource(
new FlinkKafkaConsumer082<>(
topic,
new SimpleStringSchema(), result);
}
这是非常好的工作,每当我把Kafka的主题,它是由我的Flink工作接收和处理的东西。现在我试着看看如果我的flink工作因为某种原因不在线会发生什么。所以我关掉了flink的工作,一直给Kafka发信息。然后我又开始了我的flink工作,希望它能处理同时发送的消息。
然而,我得到了这个信息:
No prior offsets found for some partitions in topic collector.Customer. Fetched the following start offsets [FetchPartition {partition=0, offset=25}]
因此,它基本上忽略了自上次关闭flink作业以来出现的所有消息,只是在队列末尾开始读取。从 FlinkKafkaConsumer082
我收集到,它自动负责与kafka代理同步处理的偏移量。然而,情况似乎并非如此。
我使用单节点kafka安装(kafka发行版附带的安装)和单节点zookeper安装(也是与kafka发行版捆绑的安装)。
我怀疑这是某种配置错误或类似的东西,但我真的不知道从哪里开始寻找。有没有其他人有这个问题,也许解决了?
2条答案
按热度按时间bnlyeluc1#
我找到了原因。您需要在
StreamExecutionEnvironment
要使kafka连接器将处理过的偏移量写入zookeeper。如果不启用它,kafka连接器将不会写入上次读取的偏移量,因此在重新启动采集作业时,它将无法从那里恢复。所以一定要写:anatoly关于更改初始偏移量的建议可能仍然是一个好主意,以防检查点由于某种原因而失败。
brgchamk2#
https://kafka.apache.org/08/configuration.html
将auto.offset.reset设置为最小值(默认为最大值)
自动偏移重置:
当zookeeper中没有初始偏移或偏移超出范围时,该怎么办:
最小:自动将偏移重置为最小偏移
最大:自动将偏移量重置为最大偏移量
其他:向消费者抛出异常。
如果将其设置为“最大”,则当代理上其订阅的主题的分区数更改时,使用者可能会丢失一些消息。为了防止在添加分区期间丢失数据,请将auto.offset.reset设置为最小值
还要确保重新启动后getgroup()相同