Kafka消费者在提交补偿和重新平衡方面失败

wko9yo5t  于 2021-06-08  发布在  Kafka
关注(0)|答案(0)|浏览(418)

我有一个Kafka消费者,它只订阅了一个主题。在某个时间点,在正常工作后,我在日志中收到以下消息:

  1. Line 25694: 2017-05-15 17:59:53.656 [INFO ] [MeasureConsumerExecutor] AbstractCoordinator - Attempt to heart beat failed since the group is rebalancing, try to re-join group.
  2. Line 25739: 2017-05-15 18:01:39.745 [INFO ] [MeasureConsumerExecutor] AbstractCoordinator - Marking the coordinator 2147483647 dead.
  3. Line 25740: 2017-05-15 18:01:39.745 [WARN ] [MeasureConsumerExecutor] ConsumerCoordinator - Auto offset commit failed: null
  4. Line 25766: 2017-05-15 18:10:52.539 [INFO ] [MeasureConsumerExecutor] AbstractCoordinator - Marking the coordinator 2147483647 dead.
  5. Line 25789: 2017-05-15 18:25:51.036 [INFO ] [MeasureConsumerExecutor] AbstractCoordinator - Marking the coordinator 2147483647 dead.
  6. Line 25790: 2017-05-15 18:25:52.241 [WARN ] [MeasureConsumerExecutor] ConsumerCoordinator - Auto offset commit failed: null
  7. Line 25796: 2017-05-15 18:31:10.354 [INFO ] [MeasureConsumerExecutor] AbstractCoordinator - Marking the coordinator 2147483647 dead.
  8. Line 25797: 2017-05-15 18:31:24.101 [INFO ] [MeasureConsumerExecutor] EventConsumer - run() - WARN - msg: KafkaConsumer will be CLOSED!

我的代码非常简单:

  1. private final AtomicBoolean closed = new AtomicBoolean(false); ...
  2. ...
  3. ...
  4. try {
  5. while (!closed.get()) {
  6. ConsumerRecords<String, Message> records = kafkaConsumer.poll(Long.MAX_VALUE);
  7. for (ConsumerRecord<String, Message> record : records) {
  8. Message message = record.value();
  9. messageArrived(message);
  10. }
  11. }
  12. logger.info("run() - NOTIFY - msg: idConsumer = [{}] HAS !closed.get() = [{}]", consumerId, !closed.get());
  13. } catch (WakeupException wakeupException) {
  14. logger.error("run() - ERROR - msg: Error on Consumer [{}] caused by = [{}]", getConsumerId(), wakeupException.getMessage(), wakeupException);
  15. // Ignore exception if closing
  16. if (!closed.get())
  17. throw wakeupException;
  18. } catch (KafkaException kafkaException) {
  19. logger.error("run() - ERROR - msg: Error on Consumer [" + getConsumerId() + "] caused by = [" + kafkaException.getMessage() + "]", kafkaException);
  20. } catch (Exception exception) {
  21. logger.error("run() - ERROR - msg: Error on Consumer [" + getConsumerId() + "] caused by = [" + exception.getMessage() + "]", exception);
  22. } finally {
  23. logger.info("run() - WARN - msg: KafkaConsumer will be CLOSED!");
  24. if (null != kafkaConsumer) {
  25. kafkaConsumer.close();
  26. }
  27. }
  28. }

奇怪的是,我得到了最后一个警告日志(“kafkaconsumer will be closed”),而没有进入异常日志(因此显然没有异常),而且“closed”变量没有任何更改。
我有多个类似这样的消费者在不同的主题上并行运行,但我认为这是不相关的。代理位于同一子网中的不同物理计算机上。
你能给我一些关于这里发生的事情的提示吗?我如何处理这个问题以防止消费者断开连接或者至少能够从中恢复?
非常感谢。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题