我用的是Kafka版本 0.10.2.1
还有我的项目的spring boot。
我有一个主题的5个分区,可以被运行在不同机器上的多个使用者(具有相同的组id)使用。
我面临的问题是:
我收到了Kafka警告日志中的一条消息的副本 Auto offset commit failed for group my-consumer-group: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
正如日志所示,这个问题的产生是因为kafka消费者未能提交。
以下是关于我的用例的一些细节:
我有一个主题的多个消费者 My-Topic
属于同一组id的 my-consumer-group
使用者使用来自kafka的消息,应用业务逻辑并将处理后的数据存储在 Cassandra
消费来自kafka的消息、应用业务逻辑并将其保存到cassandra的过程大约需要10毫秒。
我使用以下代码创建kafka消费bean
@Configuration
@EnableKafka
public class KafkaConsumer {
@Value("${spring.kafka.bootstrap-servers}")
private String brokerURL;
@Value("${spring.kafka.session.timeout}")
private int sessionTimeout;
@Value("${spring.kafka.consumer.my-group-id}")
private String groupId;
@Value("${spring.kafka.listener.concurrency}")
private int concurrency;
@Value("${spring.kafka.listener.poll-timeout}")
private int timeout;
@Value("${spring.kafka.consumer.enable-auto-commit}")
private boolean autoCommit;
@Value("${spring.kafka.consumer.auto-commit-interval}")
private String autoCommitInterval;
@Value("${spring.kafka.consumer.auto-offset-reset}")
private String autoOffsetReset;
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(concurrency);
factory.getContainerProperties().setPollTimeout(timeout);
return factory;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> propsMap = new HashMap<>();
propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerURL);
propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
return propsMap;
}
}
这些是我正在使用的Kafka配置
spring.kafka.listener.concurrency=2
spring.kafka.listener.poll-timeout=3000
spring.kafka.consumer.auto-commit-interval=1000
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.session.timeout=50000
spring.kafka.connection.timeout=10000
spring.kafka.topic.partition=5
spring.kafka.message.replication=2
我主要关注的是属于同一消费群体的多个kafka消费者对消息的重复读取,在我的应用程序中,我必须避免重复输入数据库。
你能帮我检查一下我上面的Kafka配置和Kafka消费代码吗?这样我就可以避免重复阅读了。
1条答案
按热度按时间vfwfrxfs1#
简单的答案是不要使用
autoCommit
-它是按计划进行的。相反,让容器进行提交;使用
AckMode
RECORD
.然而,你仍然应该使你的代码幂等-总是有可能重新交付;只是使用更可靠的提交策略时,概率会更小。