我使用SpringKafka运行kafka消费者服务。我已将enable.auto.commit设置为false,将ackmode设置为manual\u immediate。同时使用ConcurrentKafkalListenerContainerFactory,并发性等于10。(我的主题中的分区数)
现在我在听我的主题,它很少有偏移,它滞后。我正在调试模式下运行我的应用程序,当我在kafkalistener方法中得到第一条消息时,我会强制停止我的应用程序。
应用程序在调用之前停止 acknowledgment.acknowledge()
但当我检查任何滞后时,它显示为零。
侦听器代码
@KafkaListener(topics = "baeldung")
public void listen(ConsumerRecord<?, ?> message, Acknowledgment acknowledgment) {
log.info("logging");
System.out.println("Received Messasge in: "+ " " + message.value().toString());
acknowledgment.acknowledge();
System.out.println("done");
}
调试指针设置为 System.out.println("Received Messasge in: "+ " " + message.value().toString());
消费者属性:
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapAddress);
props.put(
ConsumerConfig.GROUP_ID_CONFIG,
groupId);
props.put(
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
props.put(
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
props.put(
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
false);
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConcurrency(10);
factory.setConsumerFactory(consumerFactory());
factory
.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
return factory;
}
主题在运行消费代码之前描述:
kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group testMayank2
Consumer group 'testMayank2' has no active members.
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
testMayank2 baeldung 5 135 146 11 - - -
testMayank2 baeldung 9 140 145 5 - - -
testMayank2 baeldung 6 140 145 5 - - -
testMayank2 baeldung 1 144 148 4 - - -
testMayank2 baeldung 2 141 145 4 - - -
testMayank2 baeldung 0 144 148 4 - - -
testMayank2 baeldung 7 142 147 5 - - -
testMayank2 baeldung 3 142 147 5 - - -
testMayank2 baeldung 8 141 146 5 - - -
testMayank2 baeldung 4 142 146 4 - - -
主题运行消费代码后描述:
kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group testMayank2
Warning: Consumer group 'testMayank2' is rebalancing.
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
testMayank2 baeldung 5 146 146 0 - - -
testMayank2 baeldung 9 140 145 5 - - -
testMayank2 baeldung 6 145 145 0 - - -
testMayank2 baeldung 1 144 148 4 - - -
testMayank2 baeldung 2 141 145 4 - - -
testMayank2 baeldung 0 148 148 0 - - -
testMayank2 baeldung 7 147 147 0 - - -
testMayank2 baeldung 3 147 147 0 - - -
testMayank2 baeldung 8 146 146 0 - - -
testMayank2 baeldung 4 146 146 0 - - -
spring boot版本:2.2.6.release kafka版本:2.3.1
我做错什么了吗?在异常关机期间,是否期望提交偏移量?
暂无答案!
目前还没有任何答案,快来回答吧!