我想读取事务的元数据(kafka0.11.0.1支持),这样我就可以判断特定事务id的事务是否已提交。目前我正在从状态主题获取密钥和值,但它是某种编码格式。下面是我在轮询事务状态主题时收到的一些相同的键/值:key=1000000mm,value=� � ���� +'���������)
����
dba5bblo1#
就像Kafka所做的那样,如何从消费者的主题中解读echo "exclude.internal.topics=false" > consumer.config kafka-console-consumer --consumer.config consumer.config --formatter "kafka.coordinator.transaction.TransactionLog\$TransactionLogMessageFormatter" --bootstrap-server localhost:9092 --topic __transaction_state --from-beginning
kafka-console-consumer --consumer.config consumer.config --formatter "kafka.coordinator.transaction.TransactionLog\$TransactionLogMessageFormatter" --bootstrap-server localhost:9092 --topic __transaction_state --from-beginning
kdfy810k2#
您可以查看 TransactionLogMessageParser 类内部 kafka/tools/DumpLogSegments.scala 以文件为例。它使用 readTxnRecordValue 函数来自 TransactionLog 班级。此函数的第一个参数可以通过 readTxnRecordKey 同一类的函数。
TransactionLogMessageParser
kafka/tools/DumpLogSegments.scala
readTxnRecordValue
TransactionLog
readTxnRecordKey
2条答案
按热度按时间dba5bblo1#
就像Kafka所做的那样,如何从消费者的主题中解读
echo "exclude.internal.topics=false" > consumer.config
kafka-console-consumer --consumer.config consumer.config --formatter "kafka.coordinator.transaction.TransactionLog\$TransactionLogMessageFormatter" --bootstrap-server localhost:9092 --topic __transaction_state --from-beginning
kdfy810k2#
您可以查看
TransactionLogMessageParser
类内部kafka/tools/DumpLogSegments.scala
以文件为例。它使用readTxnRecordValue
函数来自TransactionLog
班级。此函数的第一个参数可以通过readTxnRecordKey
同一类的函数。