我正在尝试Kafka在 java 的跨国制片人。
喜欢
producer.initTransactions();
try {
producer.beginTransaction();
producer.send(rec, new Callback() {
public void onCompletion(RecordMetadata metadata, Exception e) {
if (e != null)
e.printStackTrace();
System.out.println("The offset of the record we just sent is: " + metadata.offset());
});
producer.commitTransaction();
}catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e){
producer.close();
}catch(KafkaException e) {
producer.abortTransaction();
}catch (Exception x){}
producer.close();
它没有抛出任何错误。以及 send
也在Kafka推送消息,它是可用的。
我看到的经纪人的日志是这样的:
[2017-10-30 19:30:56,574] INFO Updated PartitionLeaderEpoch. New: {epoch:0, offset:0}, Current: {epoch:-1, offset-1} for Partition: __transaction_state-11. Cache now contains 0 entries. (kafka.server.epoch.LeaderEpochFileCache)
[2017-10-30 19:31:19,379] INFO [Transaction Coordinator 1001]: Initialized transactionalId TXN_ID:0.28508215642368573189913137 with producerId 11 and producer epoch 0 on partition __transaction_state-11 (kafka.coordinator.transaction.TransactionCoordinator)
5分钟后我找到了这个代理日志[2017-10-30 19:36:44123]信息[代理1001上的组元数据管理器]:在0毫秒内删除了0个过期的偏移量(kafka.coordinator.group.groupmetadatamanager)
在这里我只能看到 the transaction is initialized
但是没有更多的提交日志或者其他的东西。
在producer configs中我附加了
transactional.id=<some random transaction ID>
enable.idempotence=true
如前所述 Note that enable.idempotence must be enabled if a TransactionalId is configured. The default is empty, which means transactions cannot be used.
我找到了一个陈述 Producer: Send an OffsetCommitRequest to commit the input state associated with the end of that transaction
Kafka文献中
这是不是意味着我必须告诉你 Offset
我想承诺?
我不知道制片人怎么了
这是我的生产者调试日志:
1180 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.Sender - [TransactionalId TXN_ID:0.6069296543148491816257436] Sending transactional request (type=InitProducerIdRequest, transactionalId=TXN_ID:0.6069296543148491816257436, transactionTimeoutMs=60000) to node 127.0.0.1:9090 (id: 1001 rack: null)
1317 [kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.producer.internals.TransactionManager - [TransactionalId TXN_ID:0.6069296543148491816257436] ProducerId set to 13 with epoch 0
1317 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager - [TransactionalId TXN_ID:0.6069296543148491816257436] Transition from state INITIALIZING to READY
1317 [main] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager - [TransactionalId TXN_ID:0.6069296543148491816257436] Transition from state READY to IN_TRANSACTION
1323 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - Sending metadata request (type=MetadataRequest, topics=topic) to node -1
1337 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.Metadata - Updated cluster metadata version 2 to Cluster(id = 0WtNXiFvT_C6V9Uo1zPGVQ, nodes = [127.0.0.1:9090 (id: 1001 rack: null)], partitions = [Partition(topic = topic, partition = 0, leader = 1001, replicas = [1001], isr = [1001])])
1362 [main] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager - [TransactionalId TXN_ID:0.6069296543148491816257436] Begin adding new partition topic-0 to transaction
1386 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager - [TransactionalId TXN_ID:0.6069296543148491816257436] Enqueuing transactional request (type=AddPartitionsToTxnRequest, transactionalId=TXN_ID:0.6069296543148491816257436, producerId=13, producerEpoch=0, partitions=[topic-0])
1387 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.Sender - [TransactionalId TXN_ID:0.6069296543148491816257436] Sending transactional request (type=AddPartitionsToTxnRequest, transactionalId=TXN_ID:0.6069296543148491816257436, producerId=13, producerEpoch=0, partitions=[topic-0]) to node 127.0.0.1:9090 (id: 1001 rack: null)
1389 [main] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager - [TransactionalId TXN_ID:0.6069296543148491816257436] Transition from state IN_TRANSACTION to COMMITTING_TRANSACTION
1392 [main] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager - [TransactionalId TXN_ID:0.6069296543148491816257436] Enqueuing transactional request (type=EndTxnRequest, transactionalId=TXN_ID:0.6069296543148491816257436, producerId=13, producerEpoch=0, result=COMMIT)
1437 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager - [TransactionalId TXN_ID:0.6069296543148491816257436] Successfully added partitions [topic-0] to transaction
1439 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.RecordAccumulator - Assigning sequence number 0 from producer (producerId=13, epoch=0) to dequeued batch from partition topic-0 bound for 127.0.0.1:9090 (id: 1001 rack: null).
1444 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name topic.topic.records-per-batch
1444 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name topic.topic.bytes
1445 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name topic.topic.compression-rate
1445 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name topic.topic.record-retries
1445 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name topic.topic.record-errors
1453 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.Sender - Incremented sequence number for topic-partition topic-0 to 1
The offset of the record we just sent is: 13
1455 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.Sender - [TransactionalId TXN_ID:0.6069296543148491816257436] Sending transactional request (type=EndTxnRequest, transactionalId=TXN_ID:0.6069296543148491816257436, producerId=13, producerEpoch=0, result=COMMIT) to node 127.0.0.1:9090 (id: 1001 rack: null)
1457 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager - [TransactionalId TXN_ID:0.6069296543148491816257436] Transition from state COMMITTING_TRANSACTION to READY
我想是的,在提交事务之前我遗漏了一些东西。在消费者中,如果我设置了 READ_COMMITTED
. 如果不是这样的话,它是正常工作的,甚至我也收到消息,我正在使用事务生产者生产。
我的用于读取事务性消息的消费代码
configProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9090");
configProperties.put("group.id","new-group-id");
configProperties.put("enable.auto.commit", "true");
configProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
configProperties.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
消费者正在订阅 topic
主题和
我的使用者调试控制台日志是:
126032 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Added READ_COMMITTED fetch request for partition topic-0 at offset 1 to node 127.0.0.1:9090 (id: 1001 rack: null)
126032 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Sending READ_COMMITTED fetch for partitions [topic-0] to broker 127.0.0.1:9090 (id: 1001 rack: null)
126551 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Fetch READ_COMMITTED at offset 1 for partition topic-0 returned fetch data (error=NONE, highWaterMark=17, lastStableOffset = 0, logStartOffset = 0, abortedTransactions = [], recordsSizeInBytes=0)
在这是重复相同的3行连续和我有13作为最高偏移值。在消费者中,我不能消费信息。
我有一个1节点群集的设置,我在3节点上尝试过,它也显示出相同的结果。
感谢您的帮助。
1条答案
按热度按时间bcs8qyzn1#
终于对我来说开始起作用了。
我说不清到底是什么问题。但从我的观察来看,这是应该的
WINDOWS OS
.如果我们的代理在windows机器上,它就不能按预期工作。如果代理在linux机器上,那么它工作正常。
我的观察:
windows计算机中事务状态的日志转储段
在上面的日志中,您可以很容易地看到事务状态为
Empty
,Ongoing
以及PrepareCommit
. 但是没有提到提交/事务的完成。但在我的producer控制台日志中,它表示为事务已完成,因此肯定存在一些问题。
linux机器中事务状态的日志转储段
但在这里我们可以很容易地发现,总共有4种不同的状态被提及。
Empty
,Ongoing
,PrepareCommit
以及CompleteCommit
. 这实际上使交易完成。因此,我们也可以重用提交id。
因此,我可以得出结论,如果您想使用事务,那么现在就在linux上而不是windows上使用kafka代理。