使用spring在单个事务中向kafka发送消息列表

pinkon5k  于 2021-08-20  发布在  Java
关注(0)|答案(0)|浏览(186)

我想使用spring在一个事务中向kafka topic发送多条消息。仅在手动提交之后,消息应由使用者读取。尝试如下所示,但消费者在提交前会逐个接收每条消息。

@Bean(name = "txnProducerConfig")
public Map<String, Object> txnConfigs() {
    Map<String, Object> properties = new HashMap<>();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "txn-");
    properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
    properties.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "15000");
    return properties;
}

@Bean
public DefaultKafkaProducerFactory<String, String> txnFactory() {
    return new DefaultKafkaProducerFactory<>(txnConfigs());
}

@Bean(name = "txnTemplate")
public KafkaTemplate<String, String> txnTemplate() {
    return new KafkaTemplate<>(txnFactory());
}
txnTemplate.executeInTransaction(y -> {
    txnTemplate.send(TOPIC, message + 1);
    txnTemplate.send(TOPIC, message + 2);
    return true;
});

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题