KafkaListener reads messages twice

vddsk6oq · asked 2021-06-06 · in Kafka
Follow (0) | Answers (1) | Views (355)

With the configuration below, when we scale the Spring Boot containers out to 10 JVMs, the number of consumed events is randomly higher than the number published. For example, if 320k messages are published, the consumers sometimes record around 320.5k events, and so on.

```java
// Consumer container bean
private static final int CONCURRENCY = 1;

@Bean
public Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "topic1");
    props.put("enable.auto.commit", "false");
    //props.put("isolation.level", "read_committed");
    return props;
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    //factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.RECORD);
    factory.getContainerProperties().setPollTimeout(3000);
    factory.setConcurrency(CONCURRENCY);
    return factory;
}

// Listener
@KafkaListener(id = "claimserror", topics = "${kafka.topic.dataintakeclaimsdqerrors}",
        groupId = "topic1", containerFactory = "kafkaListenerContainerFactory")
public void receiveClaimErrors(String event, Acknowledgment ack) throws JsonProcessingException {
    // save event to table ..
}
```
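One thing worth noting about the code above: `enable.auto.commit` is `false` and the listener takes an `Acknowledgment`, but the container's ack mode is left commented out and `ack.acknowledge()` is never called, so offsets can lag behind processing and a rebalance (likely when scaling to 10 JVMs) redelivers already-processed records. A sketch of the manual-ack wiring, assuming the spring-kafka 2.x `ContainerProperties.AckMode` API (the surrounding bean names are taken from the question):

```java
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    // A manual ack mode is required when the listener method takes an Acknowledgment
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
    factory.setConcurrency(CONCURRENCY);
    return factory;
}

@KafkaListener(id = "claimserror", topics = "${kafka.topic.dataintakeclaimsdqerrors}",
        groupId = "topic1", containerFactory = "kafkaListenerContainerFactory")
public void receiveClaimErrors(String event, Acknowledgment ack) {
    // save event to table ..
    ack.acknowledge(); // commit the offset only after the save has succeeded
}
```

Even with this, Kafka's consumer groups are at-least-once: a crash between the save and the acknowledge still redelivers, which is why a consumer-side duplicate check remains necessary.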

Update: with the changes below it now seems to work correctly. I only had to add a duplicate check in the consumer to handle the case where a consumer fails.

```java
@Bean
public Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "topic1");
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
    props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "-1");
    //props.put("isolation.level", "read_committed");
    return props;
}
```
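The "duplicate check" mentioned in the update is not shown, but it amounts to an idempotent-consumer guard: remember which message keys have been processed and skip any key seen before. A minimal in-memory sketch (the class and `eventKey` naming are illustrative; in production the "seen" record should live in the same database transaction as the saved event so the check survives restarts):

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Minimal idempotent-consumer guard: each event key is processed at most once.
public class DuplicateGuard {
    private final Set<String> seen = ConcurrentHashMap.newKeySet();

    /** Returns true only the first time a given key is offered. */
    public boolean firstTime(String eventKey) {
        // Set.add returns false when the key is already present
        return seen.add(eventKey);
    }
}
```

In the listener this wraps the save: `if (guard.firstTime(key)) { saveEvent(event); }`, so a redelivered record after a rebalance or crash is silently dropped.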
Answer 1 · mnowg1ta

You can try setting ENABLE_IDEMPOTENCE_CONFIG to true. This will help ensure that the producer writes exactly one copy of each message to the stream.
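In producer configuration terms, the suggestion looks roughly like this (a sketch; the `producerConfigs` bean name and serializer choices are assumptions, not from the question). Note that idempotence only deduplicates producer-side retries within a producer session; it does not prevent consumer-side redelivery after a rebalance:

```java
@Bean
public Map<String, Object> producerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    // Idempotence makes retried sends result in exactly one copy per message;
    // it implies acks=all and bounds max.in.flight.requests.per.connection.
    props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
    return props;
}
```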
