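According to the documentation, each partition of a topic is assigned to exactly one consumer within a consumer group. That does not seem to hold in my case (Kafka 2.0.1). I run 3 app servers in cluster mode and created a topic with 20 partitions, so the listener concurrency for this topic is set to 6 on each app server. When I start all 3 app servers, everything works fine (the 20 partitions are spread evenly across the 3 servers). At some point, however, a rebalance happens (I don't know why), and some partitions end up assigned to more than one consumer at the same time, which causes duplicate id_key errors in the DB.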
@KafkaListener(topics = MQConstant.HK_STOCK_DATA_TRADE, concurrency = "#{kafkaUtil.getConcurrency('" + MQConstant.HK_STOCK_DATA_TRADE + "')}", containerFactory = "defaultKafkaListenerContainerFactory")
public void hkTradeReceive0(ConsumerRecord<?, String> record)
public synchronized Integer getConcurrency(String topic) {
return getPartitionCount(topic) < HA_SERVER_COUNT_IN_CLUSTER ? 1 : getPartitionCount(topic) / HA_SERVER_COUNT_IN_CLUSTER;
}
public synchronized Integer getPartitionCount(String topic) {
if (!getAllTopics().contains(topic)) {
throw new InvalidParameterException("without target topic:" + topic);
}
return kafkaConsumer.partitionsFor(topic).size();
}
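As a side note on the numbers involved, the arithmetic in getConcurrency can be sketched in isolation. This is only an illustration, assuming 20 partitions, HA_SERVER_COUNT_IN_CLUSTER = 3, and that every listener container on every server joins the same group; the class and method names below are hypothetical and not part of the original code.

```java
// Hypothetical stand-alone sketch of the getConcurrency() arithmetic.
// It does not talk to Kafka; partition and server counts are passed in.
final class ConcurrencySketch {

    // Same rule as getConcurrency(): integer division, with a floor of 1.
    static int concurrency(int partitionCount, int serverCount) {
        return partitionCount < serverCount ? 1 : partitionCount / serverCount;
    }

    // Total consumers in the group if every server uses that concurrency.
    static int totalConsumers(int partitionCount, int serverCount) {
        return concurrency(partitionCount, serverCount) * serverCount;
    }

    public static void main(String[] args) {
        int partitions = 20; // from the question
        int servers = 3;     // assumed value of HA_SERVER_COUNT_IN_CLUSTER

        int perServer = concurrency(partitions, servers);
        int total = totalConsumers(partitions, servers);

        System.out.println("concurrency per server: " + perServer);
        System.out.println("consumers in the group: " + total);
        System.out.println("partitions beyond one per consumer: " + (partitions - total));
    }
}
```

With these inputs, integer division gives 6 consumers per server and 18 in total, so in a balanced assignment two consumers would each own two partitions. That is expected and is not the same thing as one partition being owned by two consumers.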
public Map<String, Object> getDefaultCommonPropertis() {
Map<String, Object> properties = new HashMap<String, Object>(16);
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, defaultConsumerMaxPollRecords);
properties.put(ConsumerConfig.GROUP_ID_CONFIG, defaultConsumerGroupId);
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, defaultConsumerAutoOffsetReset);
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, StickyAssignor.class.getName());
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return properties;
}
App server 1 log:
[stock-market-cal-hk] [2021-01-21 09:43:05.725] [INFO] [org.springframework.kafka.KafkaListenerEndpointContainer#11-4-C-1] [o.a.k.c.c.i.ConsumerCoordinator:280] [Consumer clientId=consumer-39, groupId=data_cal_hk] Setting newly assigned partitions [hk_stock_data_trade-16]
[stock-market-cal-hk] [2021-01-21 09:43:05.725] [INFO] [org.springframework.kafka.KafkaListenerEndpointContainer#11-3-C-1] [o.a.k.c.c.i.ConsumerCoordinator:280] [Consumer clientId=consumer-38, groupId=data_cal_hk] Setting newly assigned partitions [hk_stock_data_trade-15]
[stock-market-cal-hk] [2021-01-21 09:43:05.725] [INFO] [org.springframework.kafka.KafkaListenerEndpointContainer#11-0-C-1] [o.a.k.c.c.i.ConsumerCoordinator:280] [Consumer clientId=consumer-35, groupId=data_cal_hk] Setting newly assigned partitions [hk_stock_data_trade-12, hk_stock_data_trade-18]
[stock-market-cal-hk] [2021-01-21 09:43:05.725] [INFO] [org.springframework.kafka.KafkaListenerEndpointContainer#11-2-C-1] [o.a.k.c.c.i.ConsumerCoordinator:280] [Consumer clientId=consumer-37, groupId=data_cal_hk] Setting newly assigned partitions [hk_stock_data_trade-14]
[stock-market-cal-hk] [2021-01-21 09:43:05.729] [INFO] [org.springframework.kafka.KafkaListenerEndpointContainer#11-1-C-1] [o.a.k.c.c.i.ConsumerCoordinator:280] [Consumer clientId=consumer-36, groupId=data_cal_hk] Setting newly assigned partitions [hk_stock_data_trade-13, hk_stock_data_trade-19]
[stock-market-cal-hk] [2021-01-21 09:43:05.729] [INFO] [org.springframework.kafka.KafkaListenerEndpointContainer#11-5-C-1] [o.a.k.c.c.i.ConsumerCoordinator:280] [Consumer clientId=consumer-40, groupId=data_cal_hk] Setting newly assigned partitions [hk_stock_data_trade-17]
App server 2 log:
[stock-market-cal-hk] [2021-01-21 09:43:05.724] [INFO] [org.springframework.kafka.KafkaListenerEndpointContainer#11-0-C-1] [o.a.k.c.c.i.ConsumerCoordinator:280] [Consumer clientId=consumer-35, groupId=data_cal_hk] Setting newly assigned partitions [hk_stock_data_trade-12, hk_stock_data_trade-0]
[stock-market-cal-hk] [2021-01-21 09:43:05.724] [INFO] [org.springframework.kafka.KafkaListenerEndpointContainer#11-2-C-1] [o.a.k.c.c.i.ConsumerCoordinator:280] [Consumer clientId=consumer-37, groupId=data_cal_hk] Setting newly assigned partitions [hk_stock_data_trade-8, hk_stock_data_trade-17]
[stock-market-cal-hk] [2021-01-21 09:43:05.724] [INFO] [org.springframework.kafka.KafkaListenerEndpointContainer#11-4-C-1] [o.a.k.c.c.i.ConsumerCoordinator:280] [Consumer clientId=consumer-39, groupId=data_cal_hk] Setting newly assigned partitions [hk_stock_data_trade-10]
[stock-market-cal-hk] [2021-01-21 09:43:05.724] [INFO] [org.springframework.kafka.KafkaListenerEndpointContainer#11-5-C-1] [o.a.k.c.c.i.ConsumerCoordinator:280] [Consumer clientId=consumer-40, groupId=data_cal_hk] Setting newly assigned partitions [hk_stock_data_trade-11]
[stock-market-cal-hk] [2021-01-21 09:43:05.724] [INFO] [org.springframework.kafka.KafkaListenerEndpointContainer#11-1-C-1] [o.a.k.c.c.i.ConsumerCoordinator:280] [Consumer clientId=consumer-36, groupId=data_cal_hk] Setting newly assigned partitions [hk_stock_data_trade-1, hk_stock_data_trade-15]
[stock-market-cal-hk] [2021-01-21 09:43:05.725] [INFO] [org.springframework.kafka.KafkaListenerEndpointContainer#11-3-C-1] [o.a.k.c.c.i.ConsumerCoordinator:280] [Consumer clientId=consumer-38, groupId=data_cal_hk] Setting newly assigned partitions [hk_stock_data_trade-9, hk_stock_data_trade-18]
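I don't know whether I did something wrong in my Kafka configuration. The server logs clearly show that, for topic hk_stock_data_trade, partitions 17 and 18 (and, in the excerpts above, also 12 and 15) were each assigned to two different consumers, even though those consumers share the same group id (data_cal_hk).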
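To make the overlap concrete, here is a minimal sketch that intersects the partition numbers taken from the two log excerpts above. The partition lists are copied by hand from the "Setting newly assigned partitions" lines; the class and method names are hypothetical, and the sketch only compares these two snapshots rather than querying the broker.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper: finds partitions that appear in both servers' assignments.
final class AssignmentOverlap {

    // Returns the sorted set of partition numbers present in both lists.
    static Set<Integer> overlap(List<Integer> first, List<Integer> second) {
        Set<Integer> common = new TreeSet<>(first);
        common.retainAll(second);
        return common;
    }

    public static void main(String[] args) {
        // Partitions of hk_stock_data_trade logged by app server 1.
        List<Integer> serverOne = Arrays.asList(16, 15, 12, 18, 14, 13, 19, 17);
        // Partitions of hk_stock_data_trade logged by app server 2.
        List<Integer> serverTwo = Arrays.asList(12, 0, 8, 17, 10, 11, 1, 15, 9, 18);

        System.out.println("assigned on both servers: " + overlap(serverOne, serverTwo));
    }
}
```

For the two excerpts shown, this reports partitions 12, 15, 17 and 18 as assigned on both servers at roughly the same timestamp.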