如何在spring引导服务中测试我是否正确配置了chainedkafkatransactionmanager

py49o6xq  于 2021-07-24  发布在  Java
关注(0)|答案(1)|浏览(599)

我的spring引导服务需要使用一个主题之外的kafka事件,进行一些处理(包括用jpa写入db),然后在一个新主题上生成一些事件。无论发生什么情况,我都无法在不更新数据库的情况下发布事件,如果出现任何问题,那么我希望消费者的下一次轮询重试该事件。我的处理逻辑包括db更新是幂等的,所以重试就可以了
我想我已经实现了上面描述的一次语义https://docs.spring.io/spring-kafka/reference/html/#exactly-有一次,使用这样的链接KafkatTransactionManager:

  1. @Bean
  2. public ChainedKafkaTransactionManager chainedTransactionManager(JpaTransactionManager jpa, KafkaTransactionManager<?, ?> kafka) {
  3. kafka.setTransactionSynchronization(SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
  4. return new ChainedKafkaTransactionManager(kafka, jpa);
  5. }
  6. @Bean
  7. public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
  8. ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
  9. ConsumerFactory<Object, Object> kafkaConsumerFactory,
  10. ChainedKafkaTransactionManager chainedTransactionManager) {
  11. ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
  12. configurer.configure(factory, kafkaConsumerFactory);
  13. factory.getContainerProperties().setTransactionManager(chainedTransactionManager);
  14. return factory;
  15. }

myapplication.yaml文件中的相关kafka配置如下所示:

  1. kafka:
  2. ...
  3. consumer:
  4. group-id: myGroupId
  5. auto-offset-reset: earliest
  6. properties:
  7. isolation.level: read_committed
  8. ...
  9. producer:
  10. transaction-id-prefix: ${random.uuid}
  11. ...

因为提交顺序对我的应用程序至关重要,所以我想编写一个集成测试来证明提交是按照所需的顺序进行的,并且如果在提交到kafka的过程中发生错误,那么原始事件将再次被消耗。但是,我正在努力寻找一种好的方法来导致db提交和kafka提交之间的失败。
我有什么建议或其他方法可以做到这一点吗?
谢谢

mcdcgff0

mcdcgff01#

你可以用定制的 ProducerFactory 返回 MockProducer (由提供) kafka-clients .
设置 commitTransactionException 以便在ktm尝试提交事务时抛出它。
编辑
这是一个例子;它不使用chained tm,但这不会有什么区别。

  1. @SpringBootApplication
  2. public class So66018178Application {
  3. public static void main(String[] args) {
  4. SpringApplication.run(So66018178Application.class, args);
  5. }
  6. @KafkaListener(id = "so66018178", topics = "so66018178")
  7. public void listen(String in) {
  8. System.out.println(in);
  9. }
  10. }
  1. spring.kafka.producer.transaction-id-prefix=tx-
  2. spring.kafka.consumer.auto-offset-reset=earliest
  1. @SpringBootTest(classes = { So66018178Application.class, So66018178ApplicationTests.Config.class })
  2. @EmbeddedKafka(bootstrapServersProperty = "spring.kafka.bootstrap-servers")
  3. class So66018178ApplicationTests {
  4. @Autowired
  5. EmbeddedKafkaBroker broker;
  6. @Test
  7. void kafkaCommitFails(@Autowired KafkaListenerEndpointRegistry registry, @Autowired Config config)
  8. throws InterruptedException {
  9. registry.getListenerContainer("so66018178").stop();
  10. AtomicReference<Exception> listenerException = new AtomicReference<>();
  11. CountDownLatch latch = new CountDownLatch(1);
  12. ((ConcurrentMessageListenerContainer<String, String>) registry.getListenerContainer("so66018178"))
  13. .setAfterRollbackProcessor(new AfterRollbackProcessor<>() {
  14. @Override
  15. public void process(List<ConsumerRecord<String, String>> records, Consumer<String, String> consumer,
  16. Exception exception, boolean recoverable) {
  17. listenerException.set(exception);
  18. latch.countDown();
  19. }
  20. });
  21. registry.getListenerContainer("so66018178").start();
  22. Map<String, Object> props = KafkaTestUtils.producerProps(this.broker);
  23. props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
  24. DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(props);
  25. KafkaTemplate<String, String> template = new KafkaTemplate<>(pf);
  26. template.send("so66018178", "test");
  27. assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
  28. assertThat(listenerException.get()).isInstanceOf(ListenerExecutionFailedException.class)
  29. .hasCause(config.exception);
  30. }
  31. @Configuration
  32. public static class Config {
  33. RuntimeException exception = new RuntimeException("test");
  34. @Bean
  35. public ProducerFactory<Object, Object> pf() {
  36. return new ProducerFactory<>() {
  37. @Override
  38. public Producer<Object, Object> createProducer() {
  39. MockProducer<Object, Object> mockProducer = new MockProducer<>();
  40. mockProducer.commitTransactionException = Config.this.exception;
  41. return mockProducer;
  42. }
  43. @Override
  44. public Producer<Object, Object> createProducer(String txIdPrefix) {
  45. Producer<Object, Object> producer = createProducer();
  46. producer.initTransactions();
  47. return producer;
  48. }
  49. @Override
  50. public boolean transactionCapable() {
  51. return true;
  52. }
  53. };
  54. }
  55. }
  56. }
展开查看全部

相关问题