我希望测试中的生产者等待被测类中的消费者通过调用Acknowledgement.acknowledge()
进行确认。
spring:
kafka:
consumer:
group-id: test-group
bootstrap-servers: ${spring.embedded.kafka.brokers}
enable-auto-commit: false
auto-offset-reset: earliest
max-poll-records: 1
我的kafkaListenerContainerFactory
初始化如下:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, MessageType> kafkaListenerContainerFactory(
ConsumerFactory<String, MessageType> consumerFactory)
{
ConcurrentKafkaListenerContainerFactory<String, MessageType> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
factory.setErrorHandler(new LoggingErrorHandler());
factory.setConcurrency(numListeners);
return factory;
}
我也在我的producer配置中将acks
设置为all
。即使我在等待future返回,producer似乎也没有阻塞。
producer.send(new ProducerRecord<Integer, String>(TOPIC, key, value)).get();
如何在不添加某种sleep
的情况下,使测试中的生成方阻塞,直到使用方确认?
1条答案
按热度按时间6vl6ewon1#
你不能;生产者和消费者是独立的;
acks
仅仅是确认代理已经接收并保护了记录;这与消费者方面无关。使用者需要自己的逻辑来告诉生成器它已收到记录。