我在一个模块上工作,要求有一个生产者,我们使用kafka作为数据产生的队列,并将其提供给消费者。
现在在consumer中,我尝试至少实现一次消息传递场景。
为此,我必须汇集来自kafka的消息,然后使用这些消息。在使用这些消息之后,我将调用consumer.commitasync(offset,callback)。
我想知道会发生什么
案例1)。当commitasync()api从未被调用时(假设在调用这个api之前发生了异常);但事实并非如此,消费者再也得不到这些数据了。
案例2)。如果使用者重新启动。
下面是使用使用者设置的属性的代码片段
private Properties getConsumerProperties() {
final Properties props = new Properties();
props.put(BOOTSTRAP_SERVERS_CONFIG, "server");
props.put(GROUP_ID_CONFIG, "groupName");
props.put(ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(HEARTBEAT_INTERVAL_MS_CONFIG, heartBeatinterval);
props.put(METADATA_MAX_AGE_CONFIG, metaDataMaxAge);
props.put(SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
props.put(AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
props.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
return props;
}
现在在消费的基础上,一些属性设置;我有3个主题,为每个主题创建3个消费者(因为Kafka有3个分区和3个代理)。
对于数据的消耗…我根据从kafka接收到的某些属性来标识数据包..并将其传递给相关的主题(我为不同的主题使用了不同的线程池,并根据数据包中的属性创建任务并提交给线程池),处理之后,我调用consumer.commitasync(offset,callback)。
我期待着同样的消息会再次从Kafka的情况下commitasync是不是呼吁一些数据包…但我惊讶的是,它不会回来…我错过了什么。有什么样的设置,我们需要做的ApacheKafka以及至少一个。
请建议。
1条答案
按热度按时间k75qkfdt1#
在你的问题中有几件事需要解决。
在讨论如何实现至少一次行为的建议之前,我将尝试解决以下两种情况:
案例1)。当commitasync()api从未被调用时(假设在调用这个api之前发生了异常);但事实并非如此,消费者再也得不到这些数据了。
您的消费者无法获得旧数据的原因可能是
enable.auto.commit
属性,设置为true
默认情况下,将在后台定期提交偏移量。因此,后续运行的使用者将找到要处理的偏移量,并只等待新数据/消息到达。案例2)。如果使用者重新启动。
这也是类似的,即如果使用者在重新启动后找到一个提交的偏移量来处理,它将从该偏移量开始使用,无论该偏移量是由于
enable.auto.commit
属性设置为true
或者通过调用commitAsync()/commitSync()
明确地。现在,转到如何实现至少一次行为的部分-我可以想到以下两种方法:
如果要控制提交偏移量,请将“enable.auto.commit”属性设置为
false
然后调用commitSync()
或者commitAsync()
在回调函数中处理重试。注意:同步与异步提交的选择将取决于您的延迟预算和任何其他要求。所以,这里不要过多地讨论这些细节。
另一个选项是使用自动偏移提交功能,即设置
enable.auto.commit
至true
以及auto.commit.interval.ms
一个可接受的数字(同样,基于您对提交偏移量的频率的要求)。我认为Kafka的默认行为至少集中在一次语义上,因此应该相当直接。
我希望这有帮助!