我的spring引导服务需要使用一个主题之外的kafka事件,进行一些处理(包括用jpa写入db),然后在一个新主题上生成一些事件。无论发生什么情况,我都无法在不更新数据库的情况下发布事件,如果出现任何问题,那么我希望消费者的下一次轮询重试该事件。我的处理逻辑包括db更新是幂等的,所以重试就可以了
我想我已经实现了上面描述的一次语义https://docs.spring.io/spring-kafka/reference/html/#exactly-有一次,使用这样的链接KafkatTransactionManager:
@Bean
public ChainedKafkaTransactionManager chainedTransactionManager(JpaTransactionManager jpa, KafkaTransactionManager<?, ?> kafka) {
kafka.setTransactionSynchronization(SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
return new ChainedKafkaTransactionManager(kafka, jpa);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
ConsumerFactory<Object, Object> kafkaConsumerFactory,
ChainedKafkaTransactionManager chainedTransactionManager) {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
configurer.configure(factory, kafkaConsumerFactory);
factory.getContainerProperties().setTransactionManager(chainedTransactionManager);
return factory;
}
myapplication.yaml文件中的相关kafka配置如下所示:
kafka:
...
consumer:
group-id: myGroupId
auto-offset-reset: earliest
properties:
isolation.level: read_committed
...
producer:
transaction-id-prefix: ${random.uuid}
...
因为提交顺序对我的应用程序至关重要,所以我想编写一个集成测试来证明提交是按照所需的顺序进行的,并且如果在提交到kafka的过程中发生错误,那么原始事件将再次被消耗。但是,我正在努力寻找一种好的方法来导致db提交和kafka提交之间的失败。
我有什么建议或其他方法可以做到这一点吗?
谢谢
1条答案
按热度按时间mcdcgff01#
你可以用定制的
ProducerFactory
返回MockProducer
(由提供)kafka-clients
.设置
commitTransactionException
以便在ktm尝试提交事务时抛出它。编辑
这是一个例子;它不使用chained tm,但这不会有什么区别。