我有一个从不清除消息的Kafka主题(故意)。我想继续使用它的消息,即使我的消费者离线了几天/几周/几个月。据我所知,这可能是不可能的,所以我从我的解决方案开始。
在运行它的时候,我得到了以下异常,之后我的@ Kafkafka似乎再也没有消费过消息,即使新消息一直被放到“topic 1”上。我试着重新启动我的Sping Boot 应用。我甚至试着用以下命令将组偏移量重置回0:
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group aggregator-txnstokafka-group-ethereum --topic topic1 --reset-offsets --to-earliest --execute
字符串
但消费从未恢复。
如何让我的@ Kafkafka恢复消费消息?
例外情况:
2023-10-05 11:14:31,755 ERROR org.springframework.kafka.core.DefaultKafkaProducerFactory [aggregator-txnstokafka-listener-ethereum-0-C-1] - commitTransaction failed: CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@7de7fc0]
org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state
at org.apache.kafka.clients.producer.internals.TransactionManager.maybeFailWithError(TransactionManager.java:1125)
at org.apache.kafka.clients.producer.internals.TransactionManager.lambda$beginCommit$2(TransactionManager.java:373)
at org.apache.kafka.clients.producer.internals.TransactionManager.handleCachedTransactionRequestResult(TransactionManager.java:1231)
at org.apache.kafka.clients.producer.internals.TransactionManager.beginCommit(TransactionManager.java:372)
at org.apache.kafka.clients.producer.KafkaProducer.commitTransaction(KafkaProducer.java:755)
at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.commitTransaction(DefaultKafkaProducerFactory.java:1156)
at org.springframework.kafka.core.KafkaResourceHolder.commit(KafkaResourceHolder.java:58)
at org.springframework.kafka.transaction.KafkaTransactionManager.doCommit(KafkaTransactionManager.java:186)
at org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:743)
at org.springframework.transaction.support.AbstractPlatformTransactionManager.commit(AbstractPlatformTransactionManager.java:711)
at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:152)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeInTransaction(KafkaMessageListenerContainer.java:2387)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListenerInTx(KafkaMessageListenerContainer.java:2353)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2329)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:2003)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1373)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1364)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1255)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.apache.kafka.clients.consumer.CommitFailedException: Transaction offset Commit failed due to consumer group metadata mismatch: The coordinator is not aware of this member.
at org.apache.kafka.clients.producer.internals.TransactionManager$TxnOffsetCommitHandler.handleResponse(TransactionManager.java:1771)
at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1322)
at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:583)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:575)
at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:418)
at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:316)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:243)
... 1 more
2023-10-05 11:14:31,760 ERROR org.springframework.kafka.listener.KafkaMessageListenerContainer [aggregator-txnstokafka-listener-ethereum-0-C-1] - Transaction rolled back
org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state
at org.apache.kafka.clients.producer.internals.TransactionManager.maybeFailWithError(TransactionManager.java:1125)
at org.apache.kafka.clients.producer.internals.TransactionManager.lambda$beginCommit$2(TransactionManager.java:373)
at org.apache.kafka.clients.producer.internals.TransactionManager.handleCachedTransactionRequestResult(TransactionManager.java:1231)
at org.apache.kafka.clients.producer.internals.TransactionManager.beginCommit(TransactionManager.java:372)
at org.apache.kafka.clients.producer.KafkaProducer.commitTransaction(KafkaProducer.java:755)
at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.commitTransaction(DefaultKafkaProducerFactory.java:1156)
at org.springframework.kafka.core.KafkaResourceHolder.commit(KafkaResourceHolder.java:58)
at org.springframework.kafka.transaction.KafkaTransactionManager.doCommit(KafkaTransactionManager.java:186)
at org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:743)
at org.springframework.transaction.support.AbstractPlatformTransactionManager.commit(AbstractPlatformTransactionManager.java:711)
at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:152)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeInTransaction(KafkaMessageListenerContainer.java:2387)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListenerInTx(KafkaMessageListenerContainer.java:2353)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2329)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:2003)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1373)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1364)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1255)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.apache.kafka.clients.consumer.CommitFailedException: Transaction offset Commit failed due to consumer group metadata mismatch: The coordinator is not aware of this member.
at org.apache.kafka.clients.producer.internals.TransactionManager$TxnOffsetCommitHandler.handleResponse(TransactionManager.java:1771)
at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1322)
at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:583)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:575)
at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:418)
at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:316)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:243)
... 1 more
型
在服务器上,我配置了:
offsets.retention.minutes=5256000
group.max.session.timeout.ms=2147483647
型
在我的客户端中,我配置了
session.timeout.ms=2147483646
max.poll.interval.ms=900000
型
下面是从topic 1拉取并发布到topic 2的代码片段(为了简洁起见,这里剥离了其余的功能)。
@Autowired
@Qualifier("evmBlockToKakfaTemplate")
KafkaTemplate<Number, Object> destTemplate;
@KafkaListener(id = "aggregator-txnstokafka-listener-ethereum", groupId = "aggregator-txnstokafka-group-ethereum", topics = "topic1", containerFactory = "evmTxnsToKafkaContainerFactory")
public void onMessage(ConsumerRecord<Long, String> record, Acknowledgment acknowledgement) {
// Do some stuff
destTemplate.send("topic2", key, value);
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> evmTxnsToKafkaContainerFactory(
KafkaTransactionManager<Number, Object> evmTxnAggregatorTransactionManager,
ConsumerFactory<? super String, ? super String> evmTxnToKakfaConsumerFactory) {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.getContainerProperties().setTransactionManager(evmTxnAggregatorTransactionManager);
factory.setConsumerFactory(evmTxnToKakfaConsumerFactory);
factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
factory.setCommonErrorHandler(new TransactionAggregatorErrorHandler());
factory.setConcurrency(1);
factory.setBatchListener(false);
return factory;
}
@Bean
public KafkaTemplate<Number, Object> evmTxnToKakfaTemplate() {
return new KafkaTemplate<Number, Object>(evmTxnToKakfaProducerFactory());
}
@Bean
public DefaultKafkaProducerFactory<Number, Object> evmTxnToKakfaProducerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, destKafkaBroker);
config.put(ProducerConfig.CLIENT_ID_CONFIG, "aggregator-txnstokafka-" + chain);
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName());
config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
config.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "aggregator-txnstokafka-" + chain + "-");
return new DefaultKafkaProducerFactory<>(config);
}
@Bean
public ConsumerFactory<? super String, ? super BlockchainDataAggregator> evmTxnToKakfaConsumerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, srcKafkaBroker);
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class.getName());
config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
config.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.toString().toLowerCase(Locale.ROOT));
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "2147483646"); // Integer max value. About 24.8 days
//config.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, ...);
config.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "900000"); // 15 minutes
return new DefaultKafkaConsumerFactory<>(config);
}
@Bean
KafkaTransactionManager<Number, Object> evmTxnAggregatorTransactionManager(
DefaultKafkaProducerFactory<Number, Object> evmTxnToKakfaProducerFactory) {
KafkaTransactionManager<Number, Object> kafkaTransactionManager = new KafkaTransactionManager<>(
evmTxnToKakfaProducerFactory);
kafkaTransactionManager
.setTransactionSynchronization(AbstractPlatformTransactionManager.SYNCHRONIZATION_ALWAYS);
return kafkaTransactionManager;
}
型
我们用...
- Ubuntu上的Kafka 2.12-3.2.0
- Java 18
- Sping Boot Starter 2.7.4 with Spring Kafka
- “topic 1”是1个分区,有3个复制体(有意)
1条答案
按热度按时间zd287kbt1#
有点困惑的要求在这里.然而,你可以检查几件事.
1.为该主题指定的保留字节数是多少
1.当您将使用者重置为最早时,请尝试更改使用者组名称并查看问题是否仍然存在。如果您尝试访问无效的偏移量,则可能会发生此提交失败异常。