Spring的嵌入Kafka的生产者不等待消费者的认可

sf6xfgos  于 2022-11-23  发布在  Spring
关注(0)|答案(1)|浏览(102)

我希望测试中的生产者等待被测类中的消费者通过调用Acknowledgement.acknowledge()进行确认。

spring:
  kafka:
    consumer:
      group-id: test-group
      bootstrap-servers: ${spring.embedded.kafka.brokers}
      enable-auto-commit: false
      auto-offset-reset: earliest
      max-poll-records: 1

我的kafkaListenerContainerFactory初始化如下:

@Bean
    public ConcurrentKafkaListenerContainerFactory<String, MessageType> kafkaListenerContainerFactory(
            ConsumerFactory<String, MessageType> consumerFactory)
    {
        ConcurrentKafkaListenerContainerFactory<String, MessageType> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        factory.setErrorHandler(new LoggingErrorHandler());
        factory.setConcurrency(numListeners);
        return factory;
    }

我也在我的producer配置中将acks设置为all。即使我在等待future返回,producer似乎也没有阻塞。

producer.send(new ProducerRecord<Integer, String>(TOPIC, key, value)).get();

如何在不添加某种sleep的情况下,使测试中的生成方阻塞,直到使用方确认?

6vl6ewon

6vl6ewon1#

你不能;生产者和消费者是独立的; acks仅仅是确认代理已经接收并保护了记录;这与消费者方面无关。
使用者需要自己的逻辑来告诉生成器它已收到记录。

相关问题