有什么区别?Kafka消费者和Kafka主义者可以互换使用吗?
pbpqsu0x1#
这个 @KafkaListener 是用于 ConcurrentMessageListenerContainer ,它会在周围生成多个内部侦听器 KafkaConsumer .不同的是 KafkaConsumer 当您调用其 poll() 只要你需要。侦听器抽象将围绕它有一个无限循环 poll() 每当记录出现在 poll() . 我们有一个任务执行器,它运行如下逻辑:
@KafkaListener
ConcurrentMessageListenerContainer
KafkaConsumer
poll()
while (isRunning()) { try { pollAndInvoke(); } catch (@SuppressWarnings(UNUSED) WakeupException e) { // Ignore, we're stopping } catch (NoOffsetForPartitionException nofpe) { this.fatalError = true; ListenerConsumer.this.logger.error("No offset and no reset policy", nofpe); break; } catch (Exception e) { handleConsumerException(e); } catch (Error e) { // NOSONAR - rethrown Runnable runnable = KafkaMessageListenerContainer.this.emergencyStop; if (runnable != null) { runnable.run(); } this.logger.error("Stopping container due to an Error", e); wrapUp(); throw e; } }
这个 KafkaConsumer.poll() 在那叫什么 pollAndInvoke(); .
KafkaConsumer.poll()
pollAndInvoke();
1条答案
按热度按时间pbpqsu0x1#
这个
@KafkaListener
是用于ConcurrentMessageListenerContainer
,它会在周围生成多个内部侦听器KafkaConsumer
.不同的是
KafkaConsumer
当您调用其poll()
只要你需要。侦听器抽象将围绕它有一个无限循环poll()
每当记录出现在poll()
. 我们有一个任务执行器,它运行如下逻辑:这个
KafkaConsumer.poll()
在那叫什么pollAndInvoke();
.