Spring BootKafka监听器vs消费者

x33g5p2x  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(393)

有什么区别?Kafka消费者和Kafka主义者可以互换使用吗?

pbpqsu0x

pbpqsu0x1#

这个 @KafkaListener 是用于 ConcurrentMessageListenerContainer ,它会在周围生成多个内部侦听器 KafkaConsumer .
不同的是 KafkaConsumer 当您调用其 poll() 只要你需要。侦听器抽象将围绕它有一个无限循环 poll() 每当记录出现在 poll() . 我们有一个任务执行器,它运行如下逻辑:

while (isRunning()) {
            try {
                pollAndInvoke();
            }
            catch (@SuppressWarnings(UNUSED) WakeupException e) {
                // Ignore, we're stopping
            }
            catch (NoOffsetForPartitionException nofpe) {
                this.fatalError = true;
                ListenerConsumer.this.logger.error("No offset and no reset policy", nofpe);
                break;
            }
            catch (Exception e) {
                handleConsumerException(e);
            }
            catch (Error e) { // NOSONAR - rethrown
                Runnable runnable = KafkaMessageListenerContainer.this.emergencyStop;
                if (runnable != null) {
                    runnable.run();
                }
                this.logger.error("Stopping container due to an Error", e);
                wrapUp();
                throw e;
            }
        }

这个 KafkaConsumer.poll() 在那叫什么 pollAndInvoke(); .

相关问题