This is my ChainedKafkaTransactionManager setup, which coordinates the KafkaTransactionManager and the JpaTransactionManager. Since ChainedKafkaTransactionManager has been deprecated, I am looking for a replacement that achieves the same behavior.
    @EnableKafka
    @Configuration
    @RequiredArgsConstructor
    public class DunningCycleKafkaConfiguration {

        private final KafkaConfigurationProperties kafkaConfigurationProperties;

        @Bean
        public ConsumerFactory<String, String> consumerFactory() {
            Map<String, Object> props = new HashMap<>();
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, kafkaConfigurationProperties.getConsumer().getEnableAutoCommit());
            props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConfigurationProperties.getConsumer().getGroupId());
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, kafkaConfigurationProperties.getConsumer().getKeyDeserializer());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, kafkaConfigurationProperties.getConsumer().getValueDeserializer());
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfigurationProperties.getBootstrapServers());
            props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, kafkaConfigurationProperties.getConsumer().getMaxPollRecords());
            return new DefaultKafkaConsumerFactory<>(props);
        }

        @Bean
        public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
                AfterRollbackProcessor<Object, Object> processor,
                ChainedKafkaTransactionManager<Object, Object> chainedKafkaTransactionManager) {
            ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory());
            factory.setRecordInterceptor(new KafkaConsumerInterceptor());
            ContainerProperties containerProps = factory.getContainerProperties();
            containerProps.setAckMode(ContainerProperties.AckMode.valueOf(kafkaConfigurationProperties.getListener().getAckMode()));
            factory.setAfterRollbackProcessor(processor);
            factory.getContainerProperties().setTransactionManager(chainedKafkaTransactionManager);
            return factory;
        }

        @Bean
        public JpaTransactionManager transactionManager(EntityManagerFactory entityManagerFactory) {
            return new JpaTransactionManager(entityManagerFactory);
        }

        @Bean
        public ChainedKafkaTransactionManager<Object, Object> chainedKafkaTransactionManager(
                JpaTransactionManager transactionManager,
                KafkaTransactionManager<?, ?> kafkaTransactionManager) {
            return new ChainedKafkaTransactionManager<>(kafkaTransactionManager, transactionManager);
        }

        @Bean
        @Primary
        public KafkaTransactionManager<Object, Object> kafkaTransactionManager(ProducerFactory<Object, Object> producerFactory) {
            return new KafkaTransactionManager<>(producerFactory);
        }

        @Bean
        public AfterRollbackProcessor<Object, Object> processor() {
            DefaultAfterRollbackProcessor<Object, Object> processor = new DefaultAfterRollbackProcessor<>(
                    new FixedBackOff(1000L, 3L));
            processor.addNotRetryableExceptions(DataIntegrityViolationException.class);
            processor.addNotRetryableExceptions(IllegalStateException.class);
            processor.addNotRetryableExceptions(RestClientException.class);
            processor.addNotRetryableExceptions(NullPointerException.class);
            processor.addNotRetryableExceptions(NumberFormatException.class);
            processor.addNotRetryableExceptions(IllegalArgumentException.class);
            processor.addNotRetryableExceptions(NoSuchMethodException.class);
            processor.addNotRetryableExceptions(JsonParseException.class);
            processor.addNotRetryableExceptions(MessageConversionException.class);
            return processor;
        }
    }
I tried annotating the bean methods with @Transactional, but I don't know whether that is the right approach.
    @Bean
    @Primary
    @Transactional
    public KafkaTransactionManager<Object, Object> kafkaTransactionManager(ProducerFactory<Object, Object> producerFactory) {
        return new KafkaTransactionManager<>(producerFactory);
    }

    @Bean
    @Transactional
    public JpaTransactionManager transactionManager(EntityManagerFactory entityManagerFactory) {
        return new JpaTransactionManager(entityManagerFactory);
    }
1 Answer
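You don't need @Transactional on the TM bean definitions; it has no meaning there. You only need @Transactional on the listener method (using the JPA TM), and you inject only the KTM into the container. This behaves very much like the CKTM: when the listener exits, the JPA transaction commits, followed immediately by the Kafka transaction. Similarly, if the listener throws an exception, both transactions roll back.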
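A minimal sketch of that arrangement, assuming the bean names from the question (transactionManager for the JPA TM, kafkaTransactionManager for the KTM) and that annotation-driven transaction management is enabled. The listener class, topic name, and method body are hypothetical placeholders, and the record interceptor and ack mode from the question are left out for brevity. Because the KTM is marked @Primary in the question, the JPA TM is selected by name on the listener.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.AfterRollbackProcessor;
import org.springframework.kafka.transaction.KafkaTransactionManager;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

@EnableKafka
@Configuration
class DunningCycleKafkaConfiguration {

    @Bean
    ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory,
            AfterRollbackProcessor<Object, Object> processor,
            KafkaTransactionManager<Object, Object> kafkaTransactionManager) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.setAfterRollbackProcessor(processor);
        // Only the Kafka TM goes into the container; no chained TM bean is defined.
        factory.getContainerProperties().setTransactionManager(kafkaTransactionManager);
        return factory;
    }
}

// Hypothetical listener; class name, topic, and body are illustrative only.
@Component
class DunningCycleListener {

    @KafkaListener(topics = "dunning-cycle", containerFactory = "kafkaListenerContainerFactory")
    @Transactional("transactionManager") // selects the JPA TM by bean name
    public void listen(String message) {
        // JPA work goes here. The JPA transaction commits when this method
        // returns; the container then commits the Kafka transaction.
        // An exception thrown from here rolls both back.
    }
}
```

The JpaTransactionManager and KafkaTransactionManager bean definitions stay as in the question, without @Transactional on them; only the ChainedKafkaTransactionManager bean is removed.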
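Note that, as with the CKTM, this is only best-effort 1PC: the JPA commit can succeed and the Kafka transaction can then roll back, so you need to handle possible duplicates.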
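One way to handle such duplicates is to make the listener idempotent by remembering which records have already been processed. The sketch below is illustrative only: the class and method names are hypothetical, and it keeps the keys in memory purely to show the check. In a real application the key would more likely be stored in the database inside the same JPA transaction as the business data (for example under a unique constraint), so that it survives restarts and commits atomically with the work.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper: tracks which (topic, partition, offset) records were handled.
final class ProcessedRecordTracker {

    private final Set<String> seen = ConcurrentHashMap.newKeySet();

    /**
     * Returns true the first time a record position is seen and false on any
     * redelivery, so the caller can skip work that was already committed.
     */
    boolean markIfNew(String topic, int partition, long offset) {
        return seen.add(topic + "-" + partition + "@" + offset);
    }
}
```

A listener would call markIfNew with the record's topic, partition, and offset before doing its JPA work, and return early when it gets false.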
https://docs.spring.io/spring-kafka/docs/current/reference/html/#transactions