如何在spring引导服务中测试我是否正确配置了chainedkafkatransactionmanager

py49o6xq  于 2021-07-24  发布在  Java
关注(0)|答案(1)|浏览(495)

我的spring引导服务需要使用一个主题之外的kafka事件,进行一些处理(包括用jpa写入db),然后在一个新主题上生成一些事件。无论发生什么情况,我都无法在不更新数据库的情况下发布事件,如果出现任何问题,那么我希望消费者的下一次轮询重试该事件。我的处理逻辑包括db更新是幂等的,所以重试就可以了
我想我已经实现了上面描述的一次语义https://docs.spring.io/spring-kafka/reference/html/#exactly-有一次,使用这样的链接KafkatTransactionManager:

@Bean
public ChainedKafkaTransactionManager chainedTransactionManager(JpaTransactionManager jpa, KafkaTransactionManager<?, ?> kafka) {
    kafka.setTransactionSynchronization(SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
    return new ChainedKafkaTransactionManager(kafka, jpa); 
}

@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
        ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
        ConsumerFactory<Object, Object> kafkaConsumerFactory,
        ChainedKafkaTransactionManager chainedTransactionManager) {

    ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    configurer.configure(factory, kafkaConsumerFactory);
    factory.getContainerProperties().setTransactionManager(chainedTransactionManager);

    return factory;
}

myapplication.yaml文件中的相关kafka配置如下所示:

kafka:
    ...
    consumer:
      group-id: myGroupId
      auto-offset-reset: earliest
      properties:
        isolation.level: read_committed
      ...
    producer:
      transaction-id-prefix: ${random.uuid}
      ...

因为提交顺序对我的应用程序至关重要,所以我想编写一个集成测试来证明提交是按照所需的顺序进行的,并且如果在提交到kafka的过程中发生错误,那么原始事件将再次被消耗。但是,我正在努力寻找一种好的方法来导致db提交和kafka提交之间的失败。
我有什么建议或其他方法可以做到这一点吗?
谢谢

mcdcgff0

mcdcgff01#

你可以用定制的 ProducerFactory 返回 MockProducer (由提供) kafka-clients .
设置 commitTransactionException 以便在ktm尝试提交事务时抛出它。
编辑
这是一个例子;它不使用chained tm,但这不会有什么区别。

@SpringBootApplication
public class So66018178Application {

    public static void main(String[] args) {
        SpringApplication.run(So66018178Application.class, args);
    }

    @KafkaListener(id = "so66018178", topics = "so66018178")
    public void listen(String in) {
        System.out.println(in);
    }

}
spring.kafka.producer.transaction-id-prefix=tx-
spring.kafka.consumer.auto-offset-reset=earliest
@SpringBootTest(classes = { So66018178Application.class, So66018178ApplicationTests.Config.class })
@EmbeddedKafka(bootstrapServersProperty = "spring.kafka.bootstrap-servers")
class So66018178ApplicationTests {

    @Autowired
    EmbeddedKafkaBroker broker;

    @Test
    void kafkaCommitFails(@Autowired KafkaListenerEndpointRegistry registry, @Autowired Config config)
            throws InterruptedException {

        registry.getListenerContainer("so66018178").stop();
        AtomicReference<Exception> listenerException = new AtomicReference<>();
        CountDownLatch latch = new CountDownLatch(1);
        ((ConcurrentMessageListenerContainer<String, String>) registry.getListenerContainer("so66018178"))
                .setAfterRollbackProcessor(new AfterRollbackProcessor<>() {

                    @Override
                    public void process(List<ConsumerRecord<String, String>> records, Consumer<String, String> consumer,
                            Exception exception, boolean recoverable) {

                        listenerException.set(exception);
                        latch.countDown();
                    }
                });
        registry.getListenerContainer("so66018178").start();

        Map<String, Object> props = KafkaTestUtils.producerProps(this.broker);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(props);
        KafkaTemplate<String, String> template = new KafkaTemplate<>(pf);
        template.send("so66018178", "test");
        assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
        assertThat(listenerException.get()).isInstanceOf(ListenerExecutionFailedException.class)
                .hasCause(config.exception);
    }

    @Configuration
    public static class Config {

        RuntimeException exception = new RuntimeException("test");

        @Bean
        public ProducerFactory<Object, Object> pf() {
            return new ProducerFactory<>() {

                @Override
                public Producer<Object, Object> createProducer() {
                    MockProducer<Object, Object> mockProducer = new MockProducer<>();
                    mockProducer.commitTransactionException = Config.this.exception;
                    return mockProducer;
                }

                @Override
                public Producer<Object, Object> createProducer(String txIdPrefix) {
                    Producer<Object, Object> producer = createProducer();
                    producer.initTransactions();
                    return producer;
                }

                @Override
                public boolean transactionCapable() {
                    return true;
                }

            };
        }

    }

}

相关问题