Looking for an alternative to ChainedKafkaTransactionManager, which is now marked @Deprecated

Posted by 20jt8wwn on 2023-10-15 in Apache

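Below is my current configuration using ChainedKafkaTransactionManager, which coordinates a KafkaTransactionManager and a JpaTransactionManager. Since ChainedKafkaTransactionManager is deprecated, I am looking for a replacement that provides the same behavior.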

@EnableKafka
@Configuration
@RequiredArgsConstructor
public class DunningCycleKafkaConfiguration {

    private final KafkaConfigurationProperties kafkaConfigurationProperties;

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();

        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, kafkaConfigurationProperties.getConsumer().getEnableAutoCommit());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConfigurationProperties.getConsumer().getGroupId());
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, kafkaConfigurationProperties.getConsumer().getKeyDeserializer());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, kafkaConfigurationProperties.getConsumer().getValueDeserializer());
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfigurationProperties.getBootstrapServers());
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, kafkaConfigurationProperties.getConsumer().getMaxPollRecords());
       
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            AfterRollbackProcessor<Object, Object> processor,
            ChainedKafkaTransactionManager<Object, Object> chainedKafkaTransactionManager) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setRecordInterceptor(new KafkaConsumerInterceptor());
        ContainerProperties containerProps = factory.getContainerProperties();
        containerProps.setAckMode(ContainerProperties.AckMode.valueOf(kafkaConfigurationProperties.getListener().getAckMode()));
        factory.setAfterRollbackProcessor(processor);
        factory.getContainerProperties().setTransactionManager(chainedKafkaTransactionManager);
        return factory;
    }

    @Bean
    public JpaTransactionManager transactionManager(EntityManagerFactory entityManagerFactory) {
        return new JpaTransactionManager(entityManagerFactory);
    }

    @Bean
    public ChainedKafkaTransactionManager<Object, Object> chainedKafkaTransactionManager(
            JpaTransactionManager transactionManager,
            KafkaTransactionManager<?, ?> kafkaTransactionManager) {
        return new ChainedKafkaTransactionManager<>(kafkaTransactionManager, transactionManager);
    }

    @Bean
    @Primary
    public KafkaTransactionManager<Object, Object> kafkaTransactionManager(ProducerFactory<Object, Object> producerFactory) {
        return new KafkaTransactionManager<>(producerFactory);
    }

    @Bean
    public AfterRollbackProcessor<Object, Object> processor()
    {
        DefaultAfterRollbackProcessor<Object, Object> processor = new DefaultAfterRollbackProcessor<>(
                new FixedBackOff(1000L, 3L));

        processor.addNotRetryableExceptions(DataIntegrityViolationException.class);
        processor.addNotRetryableExceptions(IllegalStateException.class);
        processor.addNotRetryableExceptions(RestClientException.class);
        processor.addNotRetryableExceptions(NullPointerException.class);
        processor.addNotRetryableExceptions(NumberFormatException.class);
        processor.addNotRetryableExceptions(IllegalArgumentException.class);
        processor.addNotRetryableExceptions(NoSuchMethodException.class);
        processor.addNotRetryableExceptions(JsonParseException.class);
        processor.addNotRetryableExceptions(MessageConversionException.class);
        return processor;
    }
}

I tried putting @Transactional on the bean methods, but I don't know whether this is the right approach.

@Bean
@Primary
@Transactional
public KafkaTransactionManager<Object, Object> kafkaTransactionManager(ProducerFactory<Object, Object> producerFactory) {
    return new KafkaTransactionManager<>(producerFactory);
}

@Bean
@Transactional
public JpaTransactionManager transactionManager(EntityManagerFactory entityManagerFactory) {
    return new JpaTransactionManager(entityManagerFactory);
}
Answer 1, by tpgth1q7

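You don't need @Transactional on the transaction manager (TM) bean definitions; it has no useful effect there.
Put @Transactional on the listener method instead (using the JPA TM), and inject only the KafkaTransactionManager (KTM) into the listener container. This behaves very much like the ChainedKafkaTransactionManager (CKTM): when the listener returns, the JPA transaction commits, immediately followed by the Kafka transaction.
Similarly, if the listener throws an exception, both transactions roll back.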
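
The arrangement described above could look roughly like the following configuration fragment. It is only a sketch, assuming spring-kafka and Spring's JPA support are on the classpath, that annotation-driven transactions are enabled (for example by Spring Boot), and that the JpaTransactionManager, ConsumerFactory, and AfterRollbackProcessor beans from the question still exist. The class names `ListenerContainerSketch` and `DunningCycleListener` and the topic name `dunning-cycle` are hypothetical. Because the question marks the KafkaTransactionManager as @Primary, the listener names the JPA transaction manager bean explicitly.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.AfterRollbackProcessor;
import org.springframework.kafka.transaction.KafkaTransactionManager;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

@EnableKafka
@Configuration
class ListenerContainerSketch {

    @Bean
    ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory,
            AfterRollbackProcessor<Object, Object> processor,
            KafkaTransactionManager<Object, Object> kafkaTransactionManager) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.setAfterRollbackProcessor(processor);
        // Only the Kafka transaction manager goes into the container;
        // the container starts the (outer) Kafka transaction per delivery.
        factory.getContainerProperties().setTransactionManager(kafkaTransactionManager);
        return factory;
    }
}

@Component
class DunningCycleListener {

    // "transactionManager" is the JpaTransactionManager bean name from the question.
    // It is named explicitly because the KafkaTransactionManager is @Primary there.
    @KafkaListener(topics = "dunning-cycle") // hypothetical topic name
    @Transactional("transactionManager")
    public void listen(String message) {
        // JPA work goes here. The JPA transaction commits when this method returns,
        // then the container commits the Kafka transaction. An exception thrown
        // from this method rolls both back.
    }
}
```

With this in place, the `chainedKafkaTransactionManager` bean from the question should no longer be needed.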
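Note that, as with the CKTM, this is only best-effort 1PC (one-phase commit): the JPA commit may succeed and the Kafka transaction may still roll back, so you need to handle possible duplicate deliveries.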
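
One common way to tolerate such redeliveries is to make the database side idempotent. The following is a minimal, library-free sketch of that idea and is not part of the answer itself: the class `DuplicateGuardSketch` and its methods are hypothetical, and the in-memory set stands in for what would more likely be a table with a unique constraint, written in the same JPA transaction as the business data.

```java
import java.util.HashSet;
import java.util.Set;

public class DuplicateGuardSketch {

    // Stand-in for a "processed records" table with a unique key.
    private final Set<String> processedIds = new HashSet<>();
    private int sideEffects = 0;

    /** Applies the record once; returns false when it was already applied. */
    public boolean handle(String topic, int partition, long offset) {
        String recordId = topic + "-" + partition + "@" + offset;
        if (!processedIds.add(recordId)) {
            return false; // redelivered after a Kafka rollback: skip the DB work
        }
        sideEffects++;    // the business update would happen here
        return true;
    }

    public int sideEffects() {
        return sideEffects;
    }

    public static void main(String[] args) {
        DuplicateGuardSketch guard = new DuplicateGuardSketch();
        System.out.println(guard.handle("dunning-cycle", 0, 42L)); // true
        System.out.println(guard.handle("dunning-cycle", 0, 42L)); // false
        System.out.println(guard.sideEffects());                   // 1
    }
}
```

The record identity used here (topic, partition, offset) is one possible choice; a business key carried in the message would work as well.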
https://docs.spring.io/spring-kafka/docs/current/reference/html/#transactions
