我正在尝试为banks用例之一使用幂等生产者和事务实现一次语义。
我创造了这样的制片人:
String topicName = "exonce";
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 1);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("enable.idempotence", "true");
props.put("transactional.id", "1");
props.put("acks", "all");
props.put("transaction.timeout.ms","160000");
Thread.currentThread().setContextClassLoader(null);
Producer<String, String> producer = new KafkaProducer
<String, String>(props);
producer.initTransactions();
producer.beginTransaction();
producer.send(new ProducerRecord<String, String>(topicName, "val_"));
producer.commitTransaction();
System.out.println("Message sent successfully");
producer.close();
但是我不能在用户端得到任何东西&而且我也看不到sysout:“messagesentmessfully”至少sysout应该出现/显示。
程序没有结束,它在等待某些事情发生(没有错误/异常)。正在等待:producer.inittransactions();线
这是日志:
2012年7月17日08:46:36 info producer.producerconfig:producerconfig值:acks=all
batch.size=16384
bootstrap.servers=[localhost:9092]
buffer.memory=33554432
client.id=
compression.type=无
connections.max.idle.ms=540000个
enable.idempotence=真
interceptor.classes=空
key.serializer=类org.apache.kafka.common.serialization.stringserializer
linger.ms=1
最大block.ms=60000
每个连接的最大飞行请求数=5
最大请求大小=1048576
metadata.max.age.ms=300000
metric.reporters=[]
metrics.num.samples=2个
metrics.recording.level=信息
metrics.sample.window.ms=30000
partitioner.class=类org.apache.kafka.clients.producer.internals.defaultpartitioner
receive.buffer.bytes=32768
reconnect.backoff.max.ms=1000
重新连接.backoff.ms=50
request.timeout.ms=30000
重试次数=1
retry.backoff.ms=100
transaction.timeout.ms=160000
事务id=1
value.serializer=类org.apache.kafka.common.serialization.stringserializer
17/07/12 08:46:36 info producer.kafkaproducer:示例化了事务生产者。
17/07/12 08:46:36 info producer.kafkaproducer:正在将默认的max.in.flight.requests.per.connection覆盖为1,因为启用了幂等。
2012年7月17日08:46:37 info utils.appinfoparser:Kafka版本:0.11.0.0
2012年7月17日08:46:37 info utils.appinfoparser:kafka commitid:cb8625948210849f
2012年7月17日08:46:37 info.transactionmanager:[transactionalid 1]producerid设置为-1,epoch为-1
我不知道我在这里犯了什么错误。
我用的是Kafka-0.11.0.0
如果你需要更多的信息,请告诉我。
感谢您的帮助和支持。
谢谢
拉吉
3条答案
按热度按时间p5fdfcr11#
更新了答案,尝试在javadoc中运行示例,以测试代理是否正确配置为0.11和0.11客户端协议
vd2z7a6w2#
这是我在遵循默认情况下需要3个代理的提示后发现的。此外,默认情况下,事务需要2个同步副本。
因此,在基于docker的环境中,我减少了broker上的这两个设置:
这些设置对应于https://kafka.apache.org/0110/documentation.html#brokerconfigs
注意:isr代表同步副本
还有一点:如果您使用的是confluent的默认平台设置(而不是docker容器),那么这些设置已经预先配置好了。
pcww981p3#
这是单节点安装吗?你能检查一下你的server.log吗
__transaction_state
主题创建正确吗?它需要3个副本才能创建,并且只在第一个副本上创建initTransactions
请求。因此,如果没有足够的代理,则主题的创建将失败,并且initTransactions
请求可能永远阻止。