ApacheKafka精确一次实现不发送消息

yftpprvb  于 2021-06-08  发布在  Kafka
关注(0)|答案(3)|浏览(425)

我正在尝试为banks用例之一使用幂等生产者和事务实现一次语义。
我创造了这样的制片人:

String topicName = "exonce";

Properties props = new Properties();

props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 1);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

props.put("enable.idempotence", "true");
props.put("transactional.id", "1");
props.put("acks", "all");
props.put("transaction.timeout.ms","160000");

Thread.currentThread().setContextClassLoader(null);
Producer<String, String> producer = new KafkaProducer
   <String, String>(props);
producer.initTransactions();
producer.beginTransaction();

producer.send(new ProducerRecord<String, String>(topicName,           "val_"));

producer.commitTransaction();
System.out.println("Message sent successfully");
producer.close();

但是我不能在用户端得到任何东西&而且我也看不到sysout:“messagesentmessfully”至少sysout应该出现/显示。
程序没有结束,它在等待某些事情发生(没有错误/异常)。正在等待:producer.inittransactions();线
这是日志:
2012年7月17日08:46:36 info producer.producerconfig:producerconfig值:acks=all
batch.size=16384
bootstrap.servers=[localhost:9092]
buffer.memory=33554432
client.id=
compression.type=无
connections.max.idle.ms=540000个
enable.idempotence=真
interceptor.classes=空
key.serializer=类org.apache.kafka.common.serialization.stringserializer
linger.ms=1
最大block.ms=60000
每个连接的最大飞行请求数=5
最大请求大小=1048576
metadata.max.age.ms=300000
metric.reporters=[]
metrics.num.samples=2个
metrics.recording.level=信息
metrics.sample.window.ms=30000
partitioner.class=类org.apache.kafka.clients.producer.internals.defaultpartitioner
receive.buffer.bytes=32768
reconnect.backoff.max.ms=1000
重新连接.backoff.ms=50
request.timeout.ms=30000
重试次数=1
retry.backoff.ms=100
transaction.timeout.ms=160000
事务id=1
value.serializer=类org.apache.kafka.common.serialization.stringserializer
17/07/12 08:46:36 info producer.kafkaproducer:示例化了事务生产者。
17/07/12 08:46:36 info producer.kafkaproducer:正在将默认的max.in.flight.requests.per.connection覆盖为1,因为启用了幂等。
2012年7月17日08:46:37 info utils.appinfoparser:Kafka版本:0.11.0.0
2012年7月17日08:46:37 info utils.appinfoparser:kafka commitid:cb8625948210849f
2012年7月17日08:46:37 info.transactionmanager:[transactionalid 1]producerid设置为-1,epoch为-1
我不知道我在这里犯了什么错误。
我用的是Kafka-0.11.0.0
如果你需要更多的信息,请告诉我。
感谢您的帮助和支持。
谢谢
拉吉

p5fdfcr1

p5fdfcr11#

更新了答案,尝试在javadoc中运行示例,以测试代理是否正确配置为0.11和0.11客户端协议

Properties props = new Properties();

props.put("bootstrap.servers", "localhost:9092");
 props.put("transactional.id", "my-transactional-id");
 Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());

 producer.initTransactions();

 try {
     producer.beginTransaction();
     for (int i = 0; i < 100; i++)
         producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));
     producer.commitTransaction();
 } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
     // We can't recover from these exceptions, so our only option is to close the producer and exit.
     producer.close();
 } catch (KafkaException e) {
     // For all other exceptions, just abort the transaction and try again.
     producer.abortTransaction();
 }
 producer.close();
vd2z7a6w

vd2z7a6w2#

这是我在遵循默认情况下需要3个代理的提示后发现的。此外,默认情况下,事务需要2个同步副本。
因此,在基于docker的环境中,我减少了broker上的这两个设置:

KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1

这些设置对应于https://kafka.apache.org/0110/documentation.html#brokerconfigs

transaction.state.log.min.isr
transaction.state.log.replication.factor

注意:isr代表同步副本
还有一点:如果您使用的是confluent的默认平台设置(而不是docker容器),那么这些设置已经预先配置好了。

pcww981p

pcww981p3#

这是单节点安装吗?你能检查一下你的server.log吗 __transaction_state 主题创建正确吗?它需要3个副本才能创建,并且只在第一个副本上创建 initTransactions 请求。因此,如果没有足够的代理,则主题的创建将失败,并且 initTransactions 请求可能永远阻止。

相关问题