前面我们学习了幂等生产者,知道了幂等生产者存在的两个局限不支持跨回话和多分区以及它的优点就是使用简单,针对这两个不足我们看一下kafka 的另外一个解决方案——事务
其实看到这里我们大致知道了,全部失败的情况下不能保证数据不写入kafka,因为kafka 并没有回滚机制,所以失败的情况下需要consumer 不可见,就是consumer 不去读取失败的数据。
producer.initTransactions();
try {
producer.beginTransaction();
producer.send(record1);
producer.send(record2);
producer.commitTransaction();
} catch (KafkaException e) {
producer.abortTransaction();
}
这段代码能够保证 Record1 和 Record2 被当作一个事务统一提交到 Kafka,要么它们全部提交成功,要么全部写入失败。实际上即使写入失败,Kafka 也会把它们写入到底层的日志中,也就是说 Consumer 还是会看到这些消息。
这里我们的生产者偶数值,奇数值是不提交的
public class TransactionProducer {
private static final Logger logger = LoggerFactory.getLogger(IdempotenceProducer.class);
private static KafkaProducer<String, String> producer = null;
/* 初始化生产者 */
static {
Properties configs = initConfig();
producer = new KafkaProducer<String, String>(configs);
}
/* 初始化配置 */
private static Properties initConfig() {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("key.serializer", StringSerializer.class.getName());
props.put("value.serializer", StringSerializer.class.getName());
// 必须设置的
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "1v1_bu_stu_reg");
props.put("retries", 3);
return props;
}
public static void main(String[] args) throws InterruptedException {
//消息实体
ProducerRecord<String, String> record = null;
producer.initTransactions();
for (int i = 0; i < 10000; i++) {
record = new ProducerRecord<String, String>("test", "value" + i);
producer.beginTransaction();
//发送消息
producer.send(record);
producer.send(record);
if (i %2 ==0){
producer.commitTransaction();
}else {
producer.abortTransaction();
}
TimeUnit.MILLISECONDS.sleep(1000);
}
producer.close();
}
}
这里注意下,我们除了配置开启事务和事务ID 之外,还需要设置重试次数,否则会得到如下错误
Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka producer
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:430)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:298)
at com.kingcall.clients.producer.transaction.TransactionProducer.<clinit>(TransactionProducer.java:22)
Caused by: org.apache.kafka.common.config.ConfigException: Must set retries to non-zero when using the idempotent producer.
at org.apache.kafka.clients.producer.KafkaProducer.configureRetries(KafkaProducer.java:545)
at org.apache.kafka.clients.producer.KafkaProducer.newSender(KafkaProducer.java:458)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:419)
... 2 more
我们先用命令行来演示一下,我们如果开启消费者相关配置,我们是会消费到事务失败的消息的,也就是失败的消息已经写入了kafka 了
从上面的消费信息来看,好像我们的消费者确实只消费到了偶数值,那意味着奇数值没提交吗,也就是失败的消息没有提交到kafka 吗,其实不是这是和kafka 的批量提交有关的,也就是说是因为producer 根本没有提交,然后事务abort
的话,缓存里的消息就没有发送了,这里我们在发送消息的时候执行一下等待,等待producer 将数据发送出去之后我们再abort
事务,生产者和事务提交之间加一行代码TimeUnit.MILLISECONDS.sleep(1000);
,然后我们看一下效果
producer.beginTransaction();
//发送消息
producer.send(record);
TimeUnit.MILLISECONDS.sleep(1000);
if (i %2 ==0){
producer.commitTransaction();
}else {
producer.abortTransaction();
}
这次我们就看到了事务失败的时候提交的数据了
我们保持生产者代码不变,然后通过在消费者端配置不消费事务失败的数据
public class TransactionConsumer {
private static KafkaConsumer<String,String> consumer;
/** * 初始化消费者 */
static {
Properties configs = initConfig();
consumer = new KafkaConsumer<String, String>(configs);
consumer.subscribe(Arrays.asList("test"));
}
/** * 初始化配置 */
private static Properties initConfig(){
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("enable.auto.commit",true);
props.put("auto.commit.interval.ms", 1000);
props.put("session.timeout.ms", 30000);
props.put("max.poll.records", 1000);
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
// 只消费 read_committed数据
props.put("isolation.level","read_committed");
return props;
}
public static void main(String[] args) {
while (true) {
// 这里的参数指的是轮询的时间间隔,也就是多长时间去拉一次数据
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(3000));
records.forEach((ConsumerRecord<String, String> record)->{
System.out.println(record.value());
});
}
}
}
我们看一下消费结果
我们看到值消费了事务提交的数据
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://blog.csdn.net/king14bhhb/article/details/114777443
内容来源于网络,如有侵权,请联系作者删除!