apache kafka@kafkalistener方法中的数据库事务

rsaldnfx  于 2021-06-04  发布在  Kafka
关注(0)|答案(0)|浏览(221)

我读了很多关于这个主题的问题和答案,但我不能解决我的问题。
我用kafka和spring数据jdbc初始化了一个springboot项目。我想做的是
配置kafka jdbc连接器,以便将postgresql数据库中的记录更改推送到kafka主题中
设置一个kafka使用者,以便通过将其插入另一个postgresql db来使用推送到主题中的记录。
对于第1点,一切正常。对于第二点,我遇到了一些问题。
这就是项目的组织方式

com.migration
 - MigrationApplication.java
com.migration.config
 - KafkaConsumerConfig.java
com.migration.db
 - JDBCConfig.java
 - RecordRepository.java
com.migration.listener
 - MessageListener.java
com.migration.model
 - Record.java
 - AbstractRecord.java
 - PostgresRecord.java

这就是 MessageListener

@EnableJdbcRepositories("com.migration.db")
@Transactional
@Configuration
public class MessageListener {
    @Autowired
    private RecordRepository repository;

    @KafkaListener(topics={"author"}, groupId = "migrator", containerFactory = "migratorKafkaListenerContainerFactory")
    public void listenGroupMigrator(Record record) {
        repository.insert(message);
        throw new RuntimeException();
    }

我认为很清楚,它设置了一个kafka消费者,以便收听“author”主题,并通过将其插入db来消费记录。
如你所见,里面 listenGroupMigrator() 方法在插入到记录的db中时执行,然后抛出 RuntimeException 因为我在检查 @Transactional 工作,如果执行回滚。
但如果不是,则不会执行回滚,即使用 @Transactional .
为了完整性,这些是其他类 RecordRepository

@Repository
public class RecordRepository {
    public RecordRepository() {}

    public void insert(Record record) {
        JDBCConfig jdbcConfig = new JDBCConfig();
        SimpleJdbcInsert messageInsert = new SimpleJdbcInsert(jdbcConfig.postgresDataSource());

        messageInsert.withTableName(record.tableName()).execute(record.content());
    }
}
``` `JDBCConfig` 班

@Configuration
public class JDBCConfig {

@Bean
public DataSource postgresDataSource() {
    DriverManagerDataSource dataSource = new DriverManagerDataSource();
    dataSource.setDriverClassName("org.postgresql.Driver");
    dataSource.setUrl("jdbc:postgresql://localhost:5432/db");
    dataSource.setUsername("postgres");
    dataSource.setPassword("root");

    return dataSource;
}

}
``` KafkaConsumerConfig 班级:

@EnableKafka
@Configuration
public class KafkaConsumerConfig {
    @Value(value = "${kafka.bootstrap-server}")
    private String bootstrapServer;

    private <T extends Record> ConsumerFactory<String, T> consumerFactory(String groupId, Class<T> clazz) {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>(clazz));

    }

    private <T extends Record> ConcurrentKafkaListenerContainerFactory<String, T> kafkaListenerContainerFactory(String groupId, Class<T> clazz) {
        ConcurrentKafkaListenerContainerFactory<String, T> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory(groupId, clazz));
        return factory;
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, PostgresRecord> migratorKafkaListenerContainerFactory() {
        return kafkaListenerContainerFactory("migrator", PostgresRecord.class);
    }
}
``` `MigrationApplication` 班

@SpringBootApplication
public class MigrationApplication {
public static void main(String[] args) {
ConfigurableApplicationContext context = SpringApplication.run(MigrationApplication.class, args);
MessageListener listener = context.getBean(MessageListener.class);
}
}

我怎么才能做这个 `listenGroupMigrator` 事务性方法?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题