在本地执行模式下停止/启动kafka使用者/生产者流

qoefvg9y  于 2021-06-25  发布在  Flink
关注(0)|答案(1)|浏览(327)

设置:
java 8
flink 1.2(mac osx)
Kafka0.10.0(virtualbox/ubuntu)
弗林Kafka消费者010
flinkkafkaproducer010公司
创建了一个简单的示例程序来使用来自一个kafka主题的1m消息并生成到另一个主题-以本地执行模式运行。两个主题都有32个分区。
当我让它从头到尾运行时,它会消耗并生成所有消息。如果在完成之前先启动然后停止(sigint),然后再重新启动,则生产者只接收原始消息的一个子集。
我已经为消费者确认了我的补偿,它读取了所有100万条信息。

final StreamExecutionEnvironment env = 
    StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(32);
env.enableCheckpointing(1000L, CheckpointingMode.EXACTLY_ONCE);

--

producer.setFlushOnCheckpoint(true);
producer.setLogFailuresOnly(false);

在本地执行模式下,这是预期的吗?是否需要启用保存点来停止和重新启动流作业?出现这种情况时,生产者似乎没有提交所有消息。
提前谢谢!

cclgggtu

cclgggtu1#

首先,在随后的运行中,它只接收消息的一个子集,因为 FlinkKafkaConsumer 正在使用Kafka中提交的偏移量作为起始位置。目前,避免这种情况的唯一方法是在发行版中(最多 1.2.0 从现在起)总是分配一个新的 group.id . 在下一版本中,将有以下新选项:https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/connectors/kafka.html#kafka-消费者开始位置配置。
另请注意,kafka中的承诺补偿根本不用于flink中的一次处理保证。flink只依赖于检查点偏移量。更多细节可以在上面链接中的flink kafka连接器文档中找到。

相关问题