我正试图实现一个完全一次生产者在Kafka。我已经阅读了文档并理解了它,以便:
只要生产者是幂等的
事务是使用事务api的(使用init.transaction的两阶段事务。。。以此类推)
消费者的行为被设置为“read\u committed”
我只会得到一次。
这是我目前的代码。如果我执行这个代码,我的主题总是重复的。我知道我关闭了生产者,幂等的性质就丢失了。因此,我创建了更多的记录,这些记录是在同一个提交中发送的,只有一个生产者,但是结果是一样的,我做错了什么,我想我需要实现sendoffsetstotransactions(),但是我不确定。
public class Producer_test {
public static void main(String[] args) {
String bootstrapServers ="127.0.0.1:9092";
String groupId="test_group";
String topic="test_topic";
// create Producer properties
Properties properties= new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,"true");
properties.setProperty(ProducerConfig.ACKS_CONFIG,"all");
properties.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,"1");
properties.setProperty(ProducerConfig.RETRIES_CONFIG,"3");
properties.setProperty(ProducerConfig.CLIENT_ID_CONFIG,"ClientId");
properties.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"100");
// create the producer
KafkaProducer<String,String> producer = new KafkaProducer<String, String>(properties);
// create a producer record
ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, "1");
producer.initTransactions();
try {
producer.beginTransaction();
producer.send(record);
producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
producer.close();
} catch (KafkaException e) {
producer.abortTransaction();
}
}
}
暂无答案!
目前还没有任何答案,快来回答吧!