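I want to use Spring to send multiple messages to a Kafka topic in a single transaction. The consumer should only be able to read the messages after I commit manually. I tried the code below, but the consumer receives each message one by one before the commit.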
    // Producer settings; a transactional.id makes the producer transactional.
    @Bean(name = "txnProducerConfig")
    public Map<String, Object> txnConfigs() {
        Map<String, Object> properties = new HashMap<>();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "txn-");
        properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        properties.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "15000");
        return properties;
    }

    // Producer factory built from the transactional configuration above.
    @Bean
    public DefaultKafkaProducerFactory<String, String> txnFactory() {
        return new DefaultKafkaProducerFactory<>(txnConfigs());
    }

    // Template used to send messages inside a transaction.
    @Bean(name = "txnTemplate")
    public KafkaTemplate<String, String> txnTemplate() {
        return new KafkaTemplate<>(txnFactory());
    }
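
    // Both sends run in one transaction; it commits when the callback returns normally.
    txnTemplate.executeInTransaction(operations -> {
        // Send through the callback argument, which is bound to the transactional producer.
        operations.send(TOPIC, message + 1);
        operations.send(TOPIC, message + 2);
        return true;
    });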
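
One thing that may be worth checking, although the consumer setup is not shown in the post: Kafka consumers default to isolation.level=read_uncommitted, and with that setting they receive records from a transaction that is still open. Below is a minimal sketch of consumer properties with read_committed set, written with plain string keys so it runs without extra libraries. The class name, group id, and broker address are assumptions for illustration, not taken from the original code.

```java
import java.util.HashMap;
import java.util.Map;

public class ReadCommittedConsumerProps {

    // Builds consumer properties using standard Kafka config keys.
    // The broker address and group id are placeholder assumptions.
    public static Map<String, Object> consumerProps() {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "txn-demo-group"); // hypothetical group id
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Deliver only records that belong to committed transactions.
        props.put("isolation.level", "read_committed");
        return props;
    }

    public static void main(String[] args) {
        // Print the isolation level that a consumer built from this map would use.
        System.out.println(consumerProps().get("isolation.level"));
    }
}
```

A map like this could be passed to a consumer factory in place of the existing consumer configuration. Whether it resolves the behaviour described above depends on how the consumer is currently configured.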