我正在使用Redpanda与Flink进行流消息,并处理它们,每个作业都从同一主题和同一组中阅读,我在使用KafkaSource
消费数据消息时给组id,但它似乎没有采取它。
我尝试启用检查点并根据文档将commit.offsets.on.checkpoint
设置为true,但结果是一样的
KafkaSource<Map<String, Object>> logSource = KafkaSource.<Map<String, Object>>builder()
.setBootstrapServers(BOOTSTRAP_SERVER)
.setTopics("logs")
.setProperty("group.id", "group1")
.setProperty("commit.offsets.on.checkpoint", "true")
.setStartingOffsets(OffsetsInitializer.latest())
.setValueOnlyDeserializer(new LogDeserializer())
.build();
字符串
同样,如果同一组中的两个消费者从同一主题消费,则每个消费者应该消费唯一的记录,例如:如果我在主题X中发送了10个事件,并且两个消费者在同一组中从该主题消费,那么每个消费者不应该接收相同的事件,但在本例中并没有发生这种情况
2条答案
按热度按时间nukf8bse1#
如果你提交了多个Flink作业,这些作业都是从同一个主题阅读的,每个作业都会根据你用
setStartingOffsets
定义的策略开始读取。同样,如果同一组中的两个消费者从同一主题消费,则每个消费者应该消费唯一的记录,例如:如果我在主题X中发送了10个事件,并且两个消费者在同一组中从该主题消费,那么每个消费者不应该接收相同的事件,但在本例中并没有发生这种情况
Flink只使用消费者组,以便暴露消费者和消费组的进度以进行监控。它不利用消费者群体做其他事情。
kb5ga3dv2#
在第一个案例中,你有没有收到任何记录?