Spring KafkaListener disconnects and stops consuming: CommitFailedException

ehxuflar, posted on 2023-11-16 in Apache

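I have a Kafka topic that (intentionally) never purges its messages. I want to be able to keep consuming its messages even if my consumer has been offline for days, weeks, or months. As far as I understand, this may not be possible, so here is the solution I started with.
While running it, I got the exception below, and after that my @KafkaListener never seemed to consume another message, even though new messages kept being published to "topic1". I tried restarting my Spring Boot application. I even tried resetting the group offsets back to 0 with the following command: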

kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group aggregator-txnstokafka-group-ethereum --topic topic1 --reset-offsets --to-earliest --execute

But consumption never resumed.

How do I get my @KafkaListener to resume consuming messages?

The exception:

2023-10-05 11:14:31,755 ERROR org.springframework.kafka.core.DefaultKafkaProducerFactory [aggregator-txnstokafka-listener-ethereum-0-C-1] - commitTransaction failed: CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@7de7fc0]
org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state
        at org.apache.kafka.clients.producer.internals.TransactionManager.maybeFailWithError(TransactionManager.java:1125)
        at org.apache.kafka.clients.producer.internals.TransactionManager.lambda$beginCommit$2(TransactionManager.java:373)
        at org.apache.kafka.clients.producer.internals.TransactionManager.handleCachedTransactionRequestResult(TransactionManager.java:1231)
        at org.apache.kafka.clients.producer.internals.TransactionManager.beginCommit(TransactionManager.java:372)
        at org.apache.kafka.clients.producer.KafkaProducer.commitTransaction(KafkaProducer.java:755)
        at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.commitTransaction(DefaultKafkaProducerFactory.java:1156)
        at org.springframework.kafka.core.KafkaResourceHolder.commit(KafkaResourceHolder.java:58)
        at org.springframework.kafka.transaction.KafkaTransactionManager.doCommit(KafkaTransactionManager.java:186)
        at org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:743)
        at org.springframework.transaction.support.AbstractPlatformTransactionManager.commit(AbstractPlatformTransactionManager.java:711)
        at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:152)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeInTransaction(KafkaMessageListenerContainer.java:2387)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListenerInTx(KafkaMessageListenerContainer.java:2353)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2329)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:2003)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1373)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1364)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1255)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.apache.kafka.clients.consumer.CommitFailedException: Transaction offset Commit failed due to consumer group metadata mismatch: The coordinator is not aware of this member.
        at org.apache.kafka.clients.producer.internals.TransactionManager$TxnOffsetCommitHandler.handleResponse(TransactionManager.java:1771)
        at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1322)
        at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
        at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:583)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:575)
        at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:418)
        at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:316)
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:243)
        ... 1 more
2023-10-05 11:14:31,760 ERROR org.springframework.kafka.listener.KafkaMessageListenerContainer [aggregator-txnstokafka-listener-ethereum-0-C-1] - Transaction rolled back
org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state
        at org.apache.kafka.clients.producer.internals.TransactionManager.maybeFailWithError(TransactionManager.java:1125)
        at org.apache.kafka.clients.producer.internals.TransactionManager.lambda$beginCommit$2(TransactionManager.java:373)
        at org.apache.kafka.clients.producer.internals.TransactionManager.handleCachedTransactionRequestResult(TransactionManager.java:1231)
        at org.apache.kafka.clients.producer.internals.TransactionManager.beginCommit(TransactionManager.java:372)
        at org.apache.kafka.clients.producer.KafkaProducer.commitTransaction(KafkaProducer.java:755)
        at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.commitTransaction(DefaultKafkaProducerFactory.java:1156)
        at org.springframework.kafka.core.KafkaResourceHolder.commit(KafkaResourceHolder.java:58)
        at org.springframework.kafka.transaction.KafkaTransactionManager.doCommit(KafkaTransactionManager.java:186)
        at org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:743)
        at org.springframework.transaction.support.AbstractPlatformTransactionManager.commit(AbstractPlatformTransactionManager.java:711)
        at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:152)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeInTransaction(KafkaMessageListenerContainer.java:2387)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListenerInTx(KafkaMessageListenerContainer.java:2353)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2329)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:2003)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1373)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1364)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1255)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.apache.kafka.clients.consumer.CommitFailedException: Transaction offset Commit failed due to consumer group metadata mismatch: The coordinator is not aware of this member.
        at org.apache.kafka.clients.producer.internals.TransactionManager$TxnOffsetCommitHandler.handleResponse(TransactionManager.java:1771)
        at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1322)
        at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
        at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:583)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:575)
        at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:418)
        at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:316)
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:243)
        ... 1 more


On the server, I configured:

offsets.retention.minutes=5256000
group.max.session.timeout.ms=2147483647


In my client, I configured:

session.timeout.ms=2147483646
max.poll.interval.ms=900000
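
To put these values in perspective, here is a minimal sketch in plain Java (no Kafka dependency) that converts them into readable durations and checks that the client's session.timeout.ms does not exceed the broker's group.max.session.timeout.ms, since the broker rejects session timeouts outside its allowed range. The class and method names are hypothetical and exist only for illustration; the numbers are the ones listed above.

```java
import java.time.Duration;

// Hypothetical helper: only restates the settings listed above as durations.
class TimeoutSummary {
    // Broker-side settings from this question.
    static final long OFFSETS_RETENTION_MINUTES = 5_256_000L;
    static final long GROUP_MAX_SESSION_TIMEOUT_MS = 2_147_483_647L;
    // Client-side settings from this question.
    static final long SESSION_TIMEOUT_MS = 2_147_483_646L;
    static final long MAX_POLL_INTERVAL_MS = 900_000L;

    // Whole days that committed offsets are retained.
    static long retentionDays() {
        return Duration.ofMinutes(OFFSETS_RETENTION_MINUTES).toDays();
    }

    // Whole days covered by the client's session timeout.
    static long sessionTimeoutDays() {
        return Duration.ofMillis(SESSION_TIMEOUT_MS).toDays();
    }

    // Whole minutes allowed between two poll() calls.
    static long maxPollMinutes() {
        return Duration.ofMillis(MAX_POLL_INTERVAL_MS).toMinutes();
    }

    // True when the client's session timeout is within the broker's upper bound.
    static boolean sessionTimeoutAllowed(long sessionTimeoutMs, long groupMaxSessionTimeoutMs) {
        return sessionTimeoutMs <= groupMaxSessionTimeoutMs;
    }

    public static void main(String[] args) {
        System.out.println("offsets.retention.minutes ~ " + retentionDays() + " days");
        System.out.println("session.timeout.ms        ~ " + sessionTimeoutDays() + " days");
        System.out.println("max.poll.interval.ms      ~ " + maxPollMinutes() + " minutes");
        System.out.println("session timeout within broker maximum: "
                + sessionTimeoutAllowed(SESSION_TIMEOUT_MS, GROUP_MAX_SESSION_TIMEOUT_MS));
    }
}
```

The point of the comparison is that the three settings cover very different time scales: offsets are kept for about ten years and the session timeout is over three weeks, but the listener still has to call poll() again within 15 minutes.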


Below is the code snippet that pulls from topic1 and publishes to topic2 (the rest of the functionality has been stripped out for brevity).

@Autowired
@Qualifier("evmBlockToKakfaTemplate")
KafkaTemplate<Number, Object> destTemplate;

@KafkaListener(id = "aggregator-txnstokafka-listener-ethereum", groupId = "aggregator-txnstokafka-group-ethereum", topics = "topic1", containerFactory = "evmTxnsToKafkaContainerFactory")
public void onMessage(ConsumerRecord<Long, String> record, Acknowledgment acknowledgement) {
  // Do some stuff
  destTemplate.send("topic2", key, value);
}

@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> evmTxnsToKafkaContainerFactory(
        KafkaTransactionManager<Number, Object> evmTxnAggregatorTransactionManager,
        ConsumerFactory<? super String, ? super String> evmTxnToKakfaConsumerFactory) {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.getContainerProperties().setTransactionManager(evmTxnAggregatorTransactionManager);
    factory.setConsumerFactory(evmTxnToKakfaConsumerFactory);
    factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
    factory.setCommonErrorHandler(new TransactionAggregatorErrorHandler());
    factory.setConcurrency(1);
    factory.setBatchListener(false);

    return factory;
}

@Bean
public KafkaTemplate<Number, Object> evmTxnToKakfaTemplate() {
    return new KafkaTemplate<Number, Object>(evmTxnToKakfaProducerFactory());
}

@Bean
public DefaultKafkaProducerFactory<Number, Object> evmTxnToKakfaProducerFactory() {
    Map<String, Object> config = new HashMap<>();
    config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, destKafkaBroker);
    config.put(ProducerConfig.CLIENT_ID_CONFIG, "aggregator-txnstokafka-" + chain);
    config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
    config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName());
    config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
    config.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "aggregator-txnstokafka-" + chain + "-");
    return new DefaultKafkaProducerFactory<>(config);
}

@Bean
public ConsumerFactory<? super String, ? super BlockchainDataAggregator> evmTxnToKakfaConsumerFactory() {
    Map<String, Object> config = new HashMap<>();
    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, srcKafkaBroker);
    config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class.getName());
    config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    config.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.toString().toLowerCase(Locale.ROOT));
    config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "2147483646"); // Integer max value. About 24.8 days
    //config.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, ...);
    config.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "900000"); // 15 minutes
    return new DefaultKafkaConsumerFactory<>(config);
}

@Bean
KafkaTransactionManager<Number, Object> evmTxnAggregatorTransactionManager(
        DefaultKafkaProducerFactory<Number, Object> evmTxnToKakfaProducerFactory) {
    KafkaTransactionManager<Number, Object> kafkaTransactionManager = new KafkaTransactionManager<>(
            evmTxnToKakfaProducerFactory);
    kafkaTransactionManager
            .setTransactionSynchronization(AbstractPlatformTransactionManager.SYNCHRONIZATION_ALWAYS);
    return kafkaTransactionManager;
}


We are using:

  • Kafka 2.12-3.2.0 on Ubuntu
  • Java 18
  • Spring Boot Starter 2.7.4 with Spring Kafka
  • "topic1" has 1 partition with 3 replicas (intentionally)
Answer #1 (zd287kbt)

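The requirements here are a bit confusing. However, there are a few things you can check.
1. What retention bytes value is configured for the topic?
2. When you reset the consumer to earliest, try changing the consumer group name and see whether the problem persists. This commit-failed exception can occur if you try to access an invalid offset.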
